/*
 * MQTT helper layer: publish/subscribe wrappers and connection callbacks
 * built on the MQTTAsync_* API, with JSON payloads built via json_object_*.
 *
 * NOTE(review): this file appears to have been damaged by text extraction.
 * Confirm against version control before changing any code:
 *   - line breaks are collapsed, so each "//" comment now swallows the code
 *     that follows it on the same physical line;
 *   - the #include directives that presumably used angle brackets have lost
 *     their header names;
 *   - the text between "for (i=0;i" in mqtt_utils_trace_callback() and
 *     "payload);" is missing. mqtt_utils_connection_lost() and the head of
 *     mqtt_utils_message_arrived() are referenced by mqtt_utils_init() but
 *     do not appear here; presumably they were lost in that gap.
 *
 * Globals
 *   mqtt_conf               Set by mqtt_utils_init() to the caller's
 *                           configuration; read by the callbacks and the
 *                           report/reply helpers.
 *   connect_failure_times   Incremented through mqtt_net_failure(); reset to
 *                           0 in mqtt_utils_connected() and
 *                           mqtt_utils_on_connect_success().
 *   subscribeTopics /       Filled by mqtt_utils_init() (5 entries derived
 *   subscribeTopicCount     from the username); subscribed to in
 *                           mqtt_utils_connected().
 *   nativeInvokTopicName    Formatted in mqtt_utils_init(); only logged in
 *                           the code visible here.
 *
 * Functions
 *   mqtt_server_events_report(sn, msg_id, data, productid)
 *       Serializes data and publishes it at QoS 2 to
 *       "/iot/<productid>/<sn>/thing/ota/inform", then releases data with
 *       json_object_put(). Logs and returns without publishing when data is
 *       NULL. msg_id is not used.
 *   mqtt_server_reply(sn, msg_id, functions)
 *       Builds an object with deviceId, messageId, timestamp, code (200),
 *       message ("success") and payload (functions), publishes it at QoS 2
 *       to "$iot/v1/device/<sn>/functions/call/response", then releases the
 *       object with json_object_put().
 *   mqtt_net_failure(failure_times)
 *       Writes 0 to /sys/class/gpio/gpio113/value via system(). When
 *       failure_times is non-NULL, increments it and, once it reaches 5,
 *       runs "systemctl restart mt_server".
 *   printVersionInfo(info)
 *       Logs name/value pairs until an entry with a NULL name; rc is set to
 *       1 once at least one pair was logged.
 *   mqtt_utils_trace_callback(level, message)
 *       Body is incomplete in this file (see the damage note above).
 *   mqtt_utils_connected(context, cause)
 *       Resets connect_failure_times, subscribes at QoS 1 to every non-empty
 *       entry of subscribeTopics, then to "#" (QoS 1) and
 *       "/test/public/topic" (QoS 0), writes 1 to gpio113 and calls
 *       station_status_report().
 *   mqtt_utils_disconnected / mqtt_utils_on_publish_success
 *       Empty bodies (only commented-out code).
 *   mqtt_utils_on_connect_success
 *       Resets connect_failure_times.
 *   mqtt_utils_on_connect_failure
 *       Logs the error and calls mqtt_net_failure(&connect_failure_times).
 *   mqtt_utils_on_ssl_error
 *       Logs the error string and returns 0.
 *   mqtt_utils_on_publish_failure / mqtt_utils_on_subscribe_failure /
 *   mqtt_utils_on_subscribe_success / mqtt_utils_on_disconnect_success
 *       Log only.
 *   mqtt_utils_subscribe(mqtt_utils, topic, qos)
 *       Returns -1 for a NULL/empty topic or a NULL client; otherwise
 *       returns the result of MQTTAsync_subscribe().
 *   mqtt_utils_publish(mqtt_utils, topic, qos, data, datalen)
 *       Returns the result of MQTTAsync_send(), using mqtt_utils->retain and
 *       mqtt_utils->pub_opts.
 *   mqtt_utils_init(mqtt_config)
 *       Stores mqtt_config in mqtt_conf, formats the topic names, creates
 *       the client for "tcp://<host>:<port>" without persistence, registers
 *       the callbacks and starts the connect with automatic reconnect
 *       (retry interval 1..60). Returns 0 on success, -1 when client
 *       creation or the connect call fails, -2 when registering a callback
 *       fails.
 *   mqtt_utils_uninit(mqtt_config)
 *       Starts a disconnect; when that call succeeds, destroys the client
 *       and sem_connect and returns 0, otherwise returns -1.
 *   mqtt_utils_check_connection_status()
 *       Logs the current state and publishes a heartbeat JSON at QoS 0 to
 *       "/iot/<username>/heartbeat", using a static counter.
 *   mqtt_utils_try_different_topics()
 *       Subscribes at QoS 1 to a hard-coded list of test topics, calling
 *       sleep(1) after each one.
 *
 * NOTE(review): issues seen in the visible code, to be confirmed and fixed
 * once the file is restored:
 *   - mqtt_server_reply(): the timestamp is tv_sec*1000 + tv_usec, which adds
 *     microseconds to milliseconds; presumably tv_usec/1000 was intended.
 *     When root cannot be allocated, functions is not released.
 *   - mqtt_server_events_report()/mqtt_server_reply(): strlen() results are
 *     passed for a %d conversion (assuming LOG_I is printf-like).
 *   - printVersionInfo(): with the LOG_I after "if (rc == 1)" commented out,
 *     "return rc;" becomes the body of the if, so no value is returned when
 *     rc is 0.
 *   - mqtt_utils_on_publish_failure()/mqtt_utils_on_subscribe_failure():
 *     response->code is read without a NULL check on response.
 *   - mqtt_utils_subscribe(): topic is logged with %s and mqtt_utils is
 *     dereferenced before the argument checks.
 *   - mqtt_utils_init(): sprintf() into the 1024-byte topic buffers is not
 *     bounded by the username length; the password is written to the log;
 *     sub_opts/pub_opts are stored in mqtt_conf only after
 *     MQTTAsync_connect() has been started, while mqtt_utils_connected()
 *     uses sub_opts (possible race, confirm the callback threading); the
 *     client is not destroyed on the error paths after creation; the SSL
 *     options are applied only when "insecure" is set while the URL scheme
 *     is always tcp.
 *   - mqtt_utils_uninit(): MQTTAsync_destroy() is called right after the
 *     disconnect request is started; confirm this is safe for an
 *     asynchronous disconnect.
 *   - mqtt_utils_connected(): the "#" and "/test/public/topic"
 *     subscriptions are labelled as debug/test in their own comments.
 *   - Unused locals: client in mqtt_utils_on_ssl_error(), isConnected in
 *     mqtt_utils_publish().
 */
#define _GNU_SOURCE #include "MQTTAsync.h" #include "MQTTClientPersistence.h" #include #include #include #if defined(WIN32) #include #define sleep Sleep #else #include #include #include #endif #if defined(_WRS_KERNEL) #include #endif #include #include #include #include #include #include #include "mqtt_utils.h" #include "main.h" #define PRINT_TIME_TAG #define DBG_TAG "mqtt_utils" #define DBG_LVL DBG_INFO #include "debug_print.h" #define MQTT_ARRY_ARRY_SIZE(x) (sizeof(x)/sizeof(x[0])) mqtt_utils_t *mqtt_conf; mqtt_parm_download_t mqtt_parm_download={0}; int connect_failure_times = 0; char nativeInvokTopicName[1024] = ""; char nativeUpgradeTopicName[1024] = ""; // Array of topics to subscribe to. char subscribeTopics[5][1024] = {""}; int subscribeTopicCount = 0; extern char softwareVersion[16]; typedef struct{ enum MQTTASYNC_TRACE_LEVELS level; char name[64]; }trace_level_name_t; trace_level_name_t trace_level_name_list[] = { {MQTTASYNC_TRACE_MAXIMUM, "MQTTASYNC_TRACE_MAXIMUM"}, {MQTTASYNC_TRACE_MEDIUM, "MQTTASYNC_TRACE_MEDIUM"}, {MQTTASYNC_TRACE_MINIMUM, "MQTTASYNC_TRACE_MINIMUM"}, {MQTTASYNC_TRACE_PROTOCOL, "MQTTASYNC_TRACE_PROTOCOL"}, {MQTTASYNC_TRACE_ERROR, "MQTTASYNC_TRACE_ERROR"}, {MQTTASYNC_TRACE_SEVERE, "MQTTASYNC_TRACE_SEVERE"}, {MQTTASYNC_TRACE_FATAL, "MQTTASYNC_TRACE_FATAL"}, }; void mqtt_server_events_report(char *sn,char *msg_id,json_object *data,char *productid){ char topic[128] = ""; const char *payload = NULL; if(data == NULL){ LOG_I("data object is NULL\n"); return; } snprintf(topic, sizeof(topic), "/iot/%s/%s/thing/ota/inform",productid,sn); LOG_I("publish topic:[%s]\n", topic); payload = json_object_to_json_string(data); LOG_I("send payload[%d][%s]\n", strlen(payload), payload); mqtt_utils_publish(mqtt_conf, topic, 2, payload, strlen(payload)); // Release the JSON object. json_object_put(data); } void mqtt_server_reply(char *sn,char *msg_id,json_object *functions) { char topic[128] = ""; const char *payload = NULL; json_object *root = NULL; struct timeval tv; char time_buffer[16] = ""; 
gettimeofday(&tv, NULL); snprintf(time_buffer,sizeof(time_buffer),"%ld",tv.tv_sec*1000+tv.tv_usec); //LOG_I("%s\n", __func__); root = json_object_new_object(); if(root == NULL){ LOG_I("json_object_new_object error\n"); goto json_error; } json_object_object_add(root, "deviceId", json_object_new_string(sn)); json_object_object_add(root, "messageId", json_object_new_string(msg_id)); json_object_object_add(root, "timestamp", json_object_new_string(time_buffer)); json_object_object_add(root, "code", json_object_new_int(200)); json_object_object_add(root, "message", json_object_new_string("success")); json_object_object_add(root, "payload", functions); snprintf(topic, sizeof(topic), "$iot/v1/device/%s/functions/call/response",sn); LOG_I("publish topic:[%s]\n", topic); payload = json_object_to_json_string(root); LOG_I("payload[%d][%s]\n", strlen(payload), payload); mqtt_utils_publish(mqtt_conf, topic, 2, payload, strlen(payload)); json_error: json_object_put(root); } void mqtt_net_failure(int *failure_times){ system("echo 0 > /sys/class/gpio/gpio113/value"); if(failure_times != NULL){ (*failure_times)++; LOG_I("mqtt net failure_times = %d\n", *failure_times); if(*failure_times >= 5){ system("systemctl restart mt_server"); } } } int printVersionInfo(MQTTAsync_nameValue * info){ int rc = 0; //LOG_I("MQTT library information:\n"); while (info->name){ LOG_I("%s: %s\n", info->name, info->value); info++; rc = 1; /* at least one value printed */ } if (rc == 1) //LOG_I("\n"); return rc; } void mqtt_utils_trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char *message){ int i; int size = 0; size = MQTT_ARRY_ARRY_SIZE(trace_level_name_list); for (i=0;ipayload); topic_matched = 1; break; } } if(!topic_matched) { LOG_I("消息topic不匹配任何订阅的topic: %s\n", topicName); } MQTTAsync_freeMessage(&message); MQTTAsync_free(topicName); return 1; } void mqtt_utils_connected(void *context, char *cause){ LOG_I("=== mqtt_utils_connected 函数被调用 ===\n"); if (cause != NULL) { LOG_I("%s 
cause:%s\n",__func__, cause); } else { LOG_I("%s\n",__func__); } connect_failure_times=0; LOG_I(" MQTT连接成功,开始订阅%d个topic\n", subscribeTopicCount); // Subscribe to every configured topic. for(int i = 0; i < subscribeTopicCount; i++) { if (strlen(subscribeTopics[i]) == 0) { LOG_I("错误:订阅topic[%d]为空!\n", i); continue; } LOG_I("订阅topic[%d]: %s\n", i, subscribeTopics[i]); int subscribe_result = mqtt_utils_subscribe(mqtt_conf, subscribeTopics[i], 1); LOG_I("订阅topic[%d]请求结果: %d\n", i, subscribe_result); } // Add a wildcard subscription to receive all messages (for debugging). LOG_I("=== 添加通配符订阅用于调试 ===\n"); int wildcard_subscribe_result = mqtt_utils_subscribe(mqtt_conf, "#", 1); LOG_I("通配符订阅请求结果: %d\n", wildcard_subscribe_result); LOG_I("=== 通配符订阅完成 ===\n"); // Subscribe to a public topic for testing. LOG_I("=== 订阅公共topic用于测试 ===\n"); int public_subscribe_result = mqtt_utils_subscribe(mqtt_conf, "/test/public/topic", 0); LOG_I("公共topic订阅请求结果: %d\n", public_subscribe_result); LOG_I("=== 公共topic订阅完成 ===\n"); system( "echo 1 > /sys/class/gpio/gpio113/value");//yellow ok station_status_report(); LOG_I("=== mqtt_utils_connected 函数执行完成 ===\n"); } void mqtt_utils_disconnected(void *context, MQTTProperties* props, enum MQTTReasonCodes rc){ //LOG_I("%s reason code %s",__func__ ,MQTTReasonCode_toString(rc)); //mqtt_led_net_status_judge(); } void mqtt_utils_on_connect_success(void *context, MQTTAsync_successData *response){ //LOG_I("%s\n",__func__); connect_failure_times = 0; } void mqtt_utils_on_connect_failure(void *context, MQTTAsync_failureData *response){ //LOG_I("%s connect failed, rc %s\n", __func__, response ? MQTTAsync_strerror(response->code) : "none"); // mqtt_led_net_status_judge(); LOG_I("MQTT连接失败 - 错误代码: %s\n", response ? 
MQTTAsync_strerror(response->code) : "未知错误"); if (response && response->message) { LOG_I("MQTT连接失败 - 错误信息: %s\n", response->message); } mqtt_net_failure(&connect_failure_times); } static int mqtt_utils_on_ssl_error(const char *str, size_t len, void *context){ MQTTAsync client = (MQTTAsync)context; LOG_I("%s ssl error: %s\n",__func__, str); return 0; } void mqtt_utils_on_publish_success(void *context, MQTTAsync_successData *response){ //LOG_I("%s\n",__func__); } void mqtt_utils_on_publish_failure(void *context, MQTTAsync_failureData *response){ LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code)); // For connection-related failures, log more detail. if (response && response->code == MQTTASYNC_OPERATION_INCOMPLETE) { LOG_I("发布失败:操作在完成前被丢弃,可能是连接状态不稳定\n"); } else if (response && response->code == MQTTASYNC_DISCONNECTED) { LOG_I("发布失败:客户端已断开连接\n"); } } void mqtt_utils_on_subscribe_success(void *context, MQTTAsync_successData *response){ LOG_I("=== MQTT订阅成功回调被调用 ===\n"); LOG_I("MQTT订阅成功\n"); } void mqtt_utils_on_subscribe_failure(void *context, MQTTAsync_failureData *response){ LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code)); } void mqtt_utils_on_disconnect_success(void *context, MQTTAsync_successData *response){ LOG_I("%s\n",__func__); //mqtt_led_net_status_judge(); } int mqtt_utils_subscribe(mqtt_utils_t *mqtt_utils, char *topic, int qos){ int rc = 0; LOG_I("开始订阅topic: %s, qos: %d\n", topic, qos); LOG_I("MQTT客户端指针: %p\n", mqtt_utils->client); LOG_I("订阅选项指针: %p\n", &mqtt_utils->sub_opts); // Validate the arguments. if (topic == NULL || strlen(topic) == 0) { LOG_I("错误:topic参数无效\n"); return -1; } if (mqtt_utils->client == NULL) { LOG_I("错误:MQTT客户端未初始化\n"); return -1; } // Skip the connection-state check and subscribe directly. LOG_I("跳过连接状态检查,直接进行订阅\n"); LOG_I("准备调用 MQTTAsync_subscribe...\n"); rc = MQTTAsync_subscribe(mqtt_utils->client, topic, qos, &mqtt_utils ->sub_opts); LOG_I("MQTTAsync_subscribe 调用完成,返回值: %d\n", rc); if (rc != MQTTASYNC_SUCCESS){ LOG_I("订阅失败,错误代码: %s\n ", MQTTAsync_strerror(rc)); } else { 
LOG_I("订阅请求已发送,等待回调\n"); } return rc; } int mqtt_utils_publish(mqtt_utils_t *mqtt_utils, char *topic, int qos, const char *data, int datalen){ int rc; int isConnected = 0; //LOG_I("%s data of length %d\n",__func__, datalen); rc = MQTTAsync_send(mqtt_utils->client, topic, datalen, data, qos, mqtt_utils->retain, &mqtt_utils->pub_opts); if (rc != MQTTASYNC_SUCCESS){ LOG_I("mqtt: failed to start publish, return code %s\n", MQTTAsync_strerror(rc)); } return rc; } int mqtt_utils_init(mqtt_utils_t *mqtt_config) { mqtt_conf=mqtt_config; MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer; MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer; MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer; //used by MQTTAsync_subscribe MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer; //used by MQTTAsync_send MQTTAsync_nameValue *infos = NULL; char url[256] = ""; int ret = -1; int rc = 0; LOG_I("=== 开始MQTT连接初始化 ===\n"); LOG_I("ClientId: %s\n", mqtt_conf->clientid); LOG_I("Username: %s\n", mqtt_conf->username); LOG_I("Password: %s\n", mqtt_conf->password); LOG_I("Host: %s\n", mqtt_conf->host); LOG_I("Port: %d\n", mqtt_conf->port); // Set the topic name up front so it is available in the connected callback. sprintf(nativeInvokTopicName,"/iot%s/thing/ota/upgrade",mqtt_conf->username); LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName); // Set up the subscription topics. subscribeTopicCount = 0; sprintf(subscribeTopics[subscribeTopicCount++], "/iot/estation%s/ota", mqtt_conf->username); sprintf(subscribeTopics[subscribeTopicCount++], "/iot/estation%s/task", mqtt_conf->username); sprintf(subscribeTopics[subscribeTopicCount++], "/iot/estation%s/bind", mqtt_conf->username); sprintf(subscribeTopics[subscribeTopicCount++], "/iot/estation%s/group", mqtt_conf->username); sprintf(subscribeTopics[subscribeTopicCount++], 
"/iot%s/thing/ota/upgrade", mqtt_conf->username); LOG_I("设置了%d个订阅topic:\n", subscribeTopicCount); for(int i = 0; i < subscribeTopicCount; i++) { LOG_I("Topic[%d]: %s\n", i, subscribeTopics[i]); } infos = MQTTAsync_getVersionInfo(); printVersionInfo(infos); MQTTAsync_setTraceCallback(mqtt_utils_trace_callback); MQTTAsync_setTraceLevel(mqtt_conf->tracelevel); //snprintf(url, sizeof(url), "%s://%s:%d", mqtt_conf->protocol, mqtt_conf->host, mqtt_conf->port); snprintf(url, sizeof(url), "tcp://%s:%d", mqtt_conf->host, mqtt_conf->port); LOG_I("MQTT URL: %s\n", url); create_opts.sendWhileDisconnected = 0; rc = MQTTAsync_createWithOptions(&mqtt_conf->client, url, mqtt_conf->clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts); LOG_I("MQTTAsync_create rc = %d\n", rc); if (rc != MQTTASYNC_SUCCESS){ LOG_I("MQTT客户端创建失败: %s\n", MQTTAsync_strerror(rc)); ret = -1; goto error; } LOG_I("MQTTAsync_create succes s\n"); // Register all required callbacks. LOG_I("设置MQTT回调函数\n"); rc = MQTTAsync_setConnected(mqtt_conf->client, NULL, mqtt_utils_connected); LOG_I("MQTTAsync_setConnected rc = %d\n", rc); if (rc != MQTTASYNC_SUCCESS){ LOG_I("设置连接回调失败: %s\n", MQTTAsync_strerror(rc)); ret = -2; goto error; } LOG_I("MQTTAsync_setConnected success\n"); rc = MQTTAsync_setDisconnected(mqtt_conf->client, NULL, mqtt_utils_disconnected); LOG_I("MQTTAsy nc_setDisconnected rc = %d\n", rc); if (rc != MQTTASYNC_SUCCESS){ LOG_I("设置断开回调失败: %s\n", MQTTAsync_strerror(rc)); ret = -2; goto error; } LOG_I("MQTTAsync_setDisconnected success\n"); // Register the message-arrived callback. rc = MQTTAsync_setMessageArrivedCallback(mqtt_conf->client, NULL, mqtt_utils_message_arrived); LOG_I("MQTTAsync_setMessageArrivedCallback rc = %d\n", rc); if (rc != MQTTASYNC_SUCCESS){ LOG_I("设置消息到达回调失败: %s\n", MQTTAsync_strerror(rc)); ret = -2; goto error; } LOG_I("MQTTAsync_setMessageArrivedCallback success\n"); // Register the connection-lost callback. rc = MQTTAsync_setConnectionLostCallback(mqtt_conf->client, NULL, mqtt_utils_connection_lost); LOG_I("MQTTAsync_setConnectionLostCallback rc = %d\n", 
rc); if (rc != MQTTASYNC_SUCCESS){ LOG_I("设置连接丢失回调失败: %s\n", MQTTAsync_strerror(rc)); ret = -2; goto error; } LOG_I("MQTTAsync_setConnectionLostCallback success\n"); /* connect option */ conn_opts.onSuccess = mqtt_utils_on_connect_success; conn_opts.onFailure = mqtt_utils_on_connect_failure; conn_opts.context = NULL; /* Regardless of the clean session value, messages of QoS 0, 1 and 2 cannot be received while the device is offline. With clean session set to 1, messages of QoS 0, 1 and 2 sent while the device was offline are not received after it comes back online. With clean session set to 0, messages of QoS 0, 1 and 2 sent while the device was offline can still be received after it comes back online; if several messages were sent on the same topic, every one of them is received. */ conn_opts.cleansession = 1; conn_opts.automaticReconnect = 1; conn_opts.minRetryInterval = 1; // Minimum reconnect interval: 1 second. conn_opts.maxRetryInterval = 60; // Maximum reconnect interval: 60 seconds. conn_opts.keepAliveInterval = mqtt_conf->keepalive; conn_opts.username = mqtt_conf->username; conn_opts.password = mqtt_conf->password; conn_opts.MQTTVersion = mqtt_conf->MQTTVersion; if (mqtt_conf->will_topic) /* will options */{ will_opts.message = mqtt_conf->will_payload; will_opts.topicName = mqtt_conf->will_topic; will_opts.qos = mqtt_conf->will_qos; //LOG_I("qos:%d\n",will_opts.qos); will_opts.retained = mqtt_conf->will_retain; conn_opts.will = &will_opts; } if (mqtt_conf->insecure){ ssl_opts.verify = 0; ssl_opts.CApath = mqtt_conf->capath; ssl_opts.keyStore = mqtt_conf->cert; ssl_opts.trustStore = mqtt_conf->cafile; ssl_opts.privateKey = mqtt_conf->key; ssl_opts.privateKeyPassword = mqtt_conf->keypass; ssl_opts.enabledCipherSuites = mqtt_conf->ciphers; ssl_opts.ssl_error_cb = mqtt_utils_on_ssl_error; ssl_opts.ssl_error_context = NULL; conn_opts.ssl = &ssl_opts; } LOG_I("正在连接MQTT服务器...\n"); rc = MQTTAsync_connect(mqtt_conf->client, &conn_opts); LOG_I("MQTTAsync_connect rc = %d\n", rc); if (rc != MQTTASYNC_SUCCESS){ LOG_I("MQTT连接启动失败: %s\n", MQTTAsync_strerror(rc)); ret = -1; goto error; } LOG_I("MQTTAsync_connect 启动成功\n"); mqtt_conf->sub_opts = sub_opts; mqtt_conf->pub_opts = pub_opts; mqtt_conf->pub_opts.onSuccess = mqtt_utils_on_publish_success; mqtt_conf->pub_opts.onFailure = 
mqtt_utils_on_publish_failure; mqtt_conf->pub_opts.context = NULL; mqtt_conf->sub_opts.onSuccess = mqtt_utils_on_subscribe_success; mqtt_conf->sub_opts.onFailure = mqtt_utils_on_subscribe_failure; mqtt_conf->sub_opts.context = NULL; //sprintf(nativeInvokTopicName,"iot/10025/%s/message/adviceDevice",mqtt_conf->clientid); //sprintf(nativeInvokTopicName,"/iot%s/thing/ota/upgrade",mqtt_conf->username); LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName); // Verify the callback setup. LOG_I("=== 验证回调函数设置 ===\n"); LOG_I("消息到达回调函数地址: %p\n", mqtt_utils_message_arrived); LOG_I("连接成功回调函数地址: %p\n", mqtt_utils_connected); LOG_I("订阅成功回调函数地址: %p\n", mqtt_utils_on_subscribe_success); LOG_I("订阅失败回调函数地址: %p\n", mqtt_utils_on_subscribe_failure); LOG_I("=== 回调函数验证完成 ===\n"); ret = 0; error: return ret; } int mqtt_utils_uninit(mqtt_utils_t *mqtt_config){ LOG_I("%s\n",__func__); MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; int rc = 0; int ret = -1; disc_opts.onSuccess = mqtt_utils_on_disconnect_success; if ((rc = MQTTAsync_disconnect(mqtt_config->client, &disc_opts)) != MQTTASYNC_SUCCESS){ LOG_I("mqtt: failed t o start disconnect, return code: %s\n", MQTTAsync_strerror(rc)); ret = -1; goto error; } MQTTAsync_destroy(&mqtt_config->client); sem_destroy(&mqtt_config->sem_connect); //sem_destroy(&mqtt_config->sem_subscribe); ret = 0; error: return ret; } // Simple connection status check (avoids APIs that might block). void mqtt_utils_check_connection_status(void) { LOG_I("=== MQTT连接状态检查 ===\n"); LOG_I("mqtt_conf指针: %p\n", mqtt_conf); if (mqtt_conf != NULL) { LOG_I("MQTT客户端指针: %p\n", mqtt_conf->client); LOG_I("当前订阅topic: %s\n", nativeInvokTopicName); LOG_I("连接失败次数: %d\n", connect_failure_times); // Send a heartbeat message to keep the connection active. static int heartbeat_count = 0; heartbeat_count++; char heartbeat_topic[128]; char heartbeat_payload[256]; snprintf(heartbeat_topic, sizeof(heartbeat_topic), "/iot/%s/heartbeat", mqtt_conf->username); snprintf(heartbeat_payload, sizeof(heartbeat_payload), "{\"heartbeat\":%d,\"timestamp\":\"%ld\"}", 
heartbeat_count, time(NULL)); LOG_I("发送心跳消息: topic=%s, payload=%s\n", heartbeat_topic, heartbeat_payload); int heartbeat_result = mqtt_utils_publish(mqtt_conf, heartbeat_topic, 0, heartbeat_payload, strlen(heartbeat_payload)); LOG_I("心跳消息发送结果: %d\n", heartbeat_result); } else { LOG_I("mqtt_conf为NULL\n"); } LOG_I("=== 连接状态检查完成 ===\n"); } // Try subscribing with different topic formats. void mqtt_utils_try_different_topics(void) { LOG_I("=== 尝试订阅不同的topic格式 ===\n"); if (mqtt_conf == NULL || mqtt_conf->client == NULL) { LOG_I("MQTT客户端未初始化,跳过topic尝试\n"); return; } // Try different topic formats. char* test_topics[] = { "/iot/WcLightStrip/90A9F73002CD/thing/ota/upgrade", "/iot/WcLightStrip/90A9F73002CD/thing/+/+", "/iot/WcLightStrip/+/thing/ota/upgrade", "/iot/+/90A9F73002CD/thing/ota/upgrade", "/iot/+ /+/thing/+/+", "iot/WcLightStrip/90A9F73002CD/thing/ota/upgrade", "iot/+/+/thing/+/+", NULL }; for (int i = 0; test_topics[i] != NULL; i++) { LOG_I("尝试订阅topic: %s\n", test_topics[i]); int result = mqtt_utils_subscribe(mqtt_conf, test_topics[i], 1); LOG_I("订阅结果: %d\n", result); sleep(1); // Wait one second before trying the next one. } LOG_I("=== topic尝试完成 ===\n"); }