#include "mqtt_client.h" #include #include MqttClient* MqttClient::_instance = nullptr; void MqttClient::mqttCallbackStatic(char* topic, byte* payload, unsigned int length) { if (_instance) _instance->mqttCallback(topic, payload, length); } void MqttClient::begin(Client& networkClient) { _instance = this; _client.setClient(networkClient); _client.setBufferSize(MQTT_BUFFER_SIZE); _client.setCallback(mqttCallbackStatic); _lastReconnectAttemptMs = 0; _reconnectIntervalMs = MQTT_RECONNECT_BASE_MS; } void MqttClient::setConfig(const DeviceConfig& config) { strlcpy(_deviceId, config.device_id, sizeof(_deviceId)); strlcpy(_broker, config.mqtt_broker, sizeof(_broker)); _port = config.mqtt_port; strlcpy(_username, config.mqtt_username, sizeof(_username)); strlcpy(_password, config.mqtt_password, sizeof(_password)); strlcpy(_topicPrefix, config.mqtt_topic_prefix, sizeof(_topicPrefix)); _client.setServer(_broker, _port); buildTopics(); Serial.printf("[MQTT] Configured - broker: %s:%d, device: %s\n", _broker, _port, _deviceId); } void MqttClient::buildTopics() { snprintf(_topicTelemetry, sizeof(_topicTelemetry), "%s/%s/telemetry", _topicPrefix, _deviceId); snprintf(_topicHeartbeat, sizeof(_topicHeartbeat), "%s/%s/heartbeat", _topicPrefix, _deviceId); snprintf(_topicCmd, sizeof(_topicCmd), "%s/%s/cmd", _topicPrefix, _deviceId); snprintf(_topicResp, sizeof(_topicResp), "%s/%s/resp", _topicPrefix, _deviceId); snprintf(_topicStatus, sizeof(_topicStatus), "%s/%s/status", _topicPrefix, _deviceId); } bool MqttClient::connect() { if (_broker[0] == '\0') { Serial.println("[MQTT] No broker configured, skipping"); return false; } Serial.printf("[MQTT] Connecting to %s:%d...\n", _broker, _port); String lwtPayload = buildLwtPayload(); bool connected; if (_username[0] != '\0') { connected = _client.connect(_deviceId, _username, _password, _topicStatus, 1, true, lwtPayload.c_str()); } else { connected = _client.connect(_deviceId, _topicStatus, 1, true, lwtPayload.c_str()); } if (connected) { Serial.println("[MQTT] Connected"); _reconnectIntervalMs = MQTT_RECONNECT_BASE_MS; subscribeToCommands(); publishOnlineStatus(); return true; } Serial.printf("[MQTT] Connection failed (rc=%d)\n", _client.state()); return false; } void MqttClient::disconnect() { if (_client.connected()) { JsonDocument doc; doc["status"] = "offline"; doc["ts"] = getEpochTime(); String payload; serializeJson(doc, payload); _client.publish(_topicStatus, payload.c_str(), true); _client.disconnect(); Serial.println("[MQTT] Disconnected"); } } bool MqttClient::isConnected() { return _client.connected(); } void MqttClient::loop() { if (_broker[0] == '\0') return; if (_client.connected()) { _client.loop(); return; } unsigned long now = millis(); if (now - _lastReconnectAttemptMs >= _reconnectIntervalMs) { _lastReconnectAttemptMs = now; if (connect()) { return; } // Exponential backoff _reconnectIntervalMs = min(_reconnectIntervalMs * 2, (unsigned long)MQTT_RECONNECT_MAX_MS); Serial.printf("[MQTT] Next retry in %lus\n", _reconnectIntervalMs / 1000); } } bool MqttClient::subscribeToCommands() { if (_client.subscribe(_topicCmd, 1)) { Serial.printf("[MQTT] Subscribed to %s\n", _topicCmd); return true; } Serial.printf("[MQTT] Subscribe failed: %s\n", _topicCmd); return false; } void MqttClient::publishOnlineStatus() { JsonDocument doc; doc["status"] = "online"; doc["ts"] = getEpochTime(); doc["fw"] = FW_VERSION; doc["dev"] = _deviceId; String payload; serializeJson(doc, payload); _client.publish(_topicStatus, payload.c_str(), true); Serial.println("[MQTT] Published online status"); } String MqttClient::buildLwtPayload() { JsonDocument doc; doc["status"] = "offline"; doc["ts"] = getEpochTime(); doc["dev"] = _deviceId; String payload; serializeJson(doc, payload); return payload; } void MqttClient::mqttCallback(char* topic, byte* payload, unsigned int length) { String topicStr(topic); String payloadStr; payloadStr.reserve(length); for (unsigned int i = 0; i < length; i++) { payloadStr += (char)payload[i]; } Serial.printf("[MQTT] Received on %s (%d bytes)\n", topic, length); if (_commandCallback) { _commandCallback(topicStr, payloadStr); } } bool MqttClient::publishTelemetry(const AcuvimData& data, const char* connType, int signal, const char* operatorName) { if (!_client.connected()) return false; JsonDocument doc; doc["ts"] = getEpochTime(); doc["dev"] = _deviceId; // Voltages JsonObject v = doc["v"].to(); v["a"] = serialized(String(data.voltage_a, 1)); v["b"] = serialized(String(data.voltage_b, 1)); v["c"] = serialized(String(data.voltage_c, 1)); v["ab"] = serialized(String(data.voltage_ab, 1)); v["bc"] = serialized(String(data.voltage_bc, 1)); v["ca"] = serialized(String(data.voltage_ca, 1)); // Currents JsonObject i = doc["i"].to(); i["a"] = serialized(String(data.current_a, 2)); i["b"] = serialized(String(data.current_b, 2)); i["c"] = serialized(String(data.current_c, 2)); // Power JsonObject p = doc["p"].to(); p["total"] = serialized(String(data.active_power, 3)); p["a"] = serialized(String(data.power_a, 3)); p["b"] = serialized(String(data.power_b, 3)); p["c"] = serialized(String(data.power_c, 3)); p["reactive"] = serialized(String(data.reactive_power, 3)); p["apparent"] = serialized(String(data.apparent_power, 3)); p["pf"] = serialized(String(data.power_factor, 3)); // Frequency doc["f"] = serialized(String(data.frequency, 2)); // Energy JsonObject e = doc["e"].to(); e["imp_act"] = serialized(String(data.import_active_energy, 1)); e["exp_act"] = serialized(String(data.export_active_energy, 1)); e["imp_react"] = serialized(String(data.import_reactive_energy, 1)); e["exp_react"] = serialized(String(data.export_reactive_energy, 1)); // Demand JsonObject d = doc["d"].to(); d["act"] = serialized(String(data.active_demand, 3)); d["max_act"] = serialized(String(data.max_active_demand, 3)); d["react"] = serialized(String(data.reactive_demand, 3)); // THD JsonObject thd = doc["thd"].to(); thd["va"] = serialized(String(data.thd_voltage_a, 2)); thd["vb"] = serialized(String(data.thd_voltage_b, 2)); thd["vc"] = serialized(String(data.thd_voltage_c, 2)); thd["ia"] = serialized(String(data.thd_current_a, 2)); thd["ib"] = serialized(String(data.thd_current_b, 2)); thd["ic"] = serialized(String(data.thd_current_c, 2)); // Connection info doc["conn"] = connType; doc["rssi"] = signal; if (operatorName && operatorName[0] != '\0') { doc["operator"] = operatorName; } String payload; serializeJson(doc, payload); bool ok = _client.publish(_topicTelemetry, payload.c_str()); if (!ok) { Serial.println("[MQTT] Telemetry publish failed"); } return ok; } bool MqttClient::publishResponse(const char* requestId, const char* payload) { if (!_client.connected()) return false; return _client.publish(_topicResp, payload); } bool MqttClient::publish(const char* topic, const char* payload, bool retain) { if (!_client.connected()) return false; return _client.publish(topic, payload, retain); } bool MqttClient::publishBuffered(const char* payload) { if (!_client.connected()) return false; return _client.publish(_topicTelemetry, payload); } uint32_t MqttClient::getEpochTime() { time_t now; time(&now); if (now < 1700000000) { return millis() / 1000; } return (uint32_t)now; }