AP05/mqtt_utils/mqtt_utils.c
2025-12-24 13:41:09 +08:00

821 lines
30 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#define _GNU_SOURCE
#include "MQTTAsync.h"
#include "MQTTClientPersistence.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#if defined(WIN32)
#include <windows.h>
#define sleep Sleep
#else
#include <unistd.h>
#include <sys/time.h>
#include <unistd.h>
#endif
#if defined(_WRS_KERNEL)
#include <OsWrapper.h>
#endif
#include <sys/time.h>
#include <time.h>
#include <semaphore.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>
#include "mqtt_utils.h"
// Global buffer holding the deviceName parsed from an incoming topic
// (written by mqtt_utils_message_arrived for lightOperate/invoke topics).
char g_mqtt_deviceName[256] = {0};
#include "main.h"
#define PRINT_TIME_TAG
#define DBG_TAG "mqtt_utils"
#define DBG_LVL DBG_INFO
#include "debug_print.h"
#define MQTT_ARRY_ARRY_SIZE(x) (sizeof(x)/sizeof(x[0]))
// Active MQTT configuration/handle; assigned by mqtt_utils_init().
mqtt_utils_t *mqtt_conf;
// Zero-initialised download parameters (not referenced by the functions in this part of the file).
mqtt_parm_download_t mqtt_parm_download={0};
// Failure counter passed to mqtt_net_failure(); reset to 0 when a connection succeeds.
int connect_failure_times = 0;
// OTA upgrade topic built in mqtt_utils_init() from the configured username.
char nativeInvokTopicName[1024] = "";
char nativeUpgradeTopicName[1024] = "";
// Array of topic filters to subscribe to (filled in mqtt_utils_init)
char subscribeTopics[10][1024] = {""};
// Number of valid entries in subscribeTopics.
int subscribeTopicCount = 0;
extern char softwareVersion[16];
// Connection flag: set true in mqtt_utils_connected, false in mqtt_utils_disconnected.
extern bool isMqttConnected;
/* Associates a Paho trace level with a printable label. */
typedef struct {
    enum MQTTASYNC_TRACE_LEVELS level; /* trace level value */
    char name[64];                     /* NUL-terminated label for that level */
} trace_level_name_t;
trace_level_name_t trace_level_name_list[] = {
{MQTTASYNC_TRACE_MAXIMUM, "MQTTASYNC_TRACE_MAXIMUM"},
{MQTTASYNC_TRACE_MEDIUM, "MQTTASYNC_TRACE_MEDIUM"},
{MQTTASYNC_TRACE_MINIMUM, "MQTTASYNC_TRACE_MINIMUM"},
{MQTTASYNC_TRACE_PROTOCOL, "MQTTASYNC_TRACE_PROTOCOL"},
{MQTTASYNC_TRACE_ERROR, "MQTTASYNC_TRACE_ERROR"},
{MQTTASYNC_TRACE_SEVERE, "MQTTASYNC_TRACE_SEVERE"},
{MQTTASYNC_TRACE_FATAL, "MQTTASYNC_TRACE_FATAL"},
};
/*
 * Publish an OTA "inform" event at QoS 2 on /iot/<productid>/<sn>/thing/ota/inform.
 *
 * sn        - device serial number used in the topic (must not be NULL)
 * msg_id    - unused by this function
 * data      - JSON object to serialise as the payload; when non-NULL this
 *             function takes ownership and releases it before returning
 * productid - product identifier used in the topic (must not be NULL)
 */
void mqtt_server_events_report(char *sn,char *msg_id,json_object *data,char *productid){
    char topic[128] = "";
    const char *payload = NULL;
    size_t payload_len = 0;

    (void)msg_id;

    if(data == NULL){
        LOG_I("data object is NULL\n");
        return;
    }
    // Passing NULL to "%s" is undefined behaviour, so refuse to build the topic.
    if (sn == NULL || productid == NULL || mqtt_conf == NULL) {
        LOG_I("mqtt_server_events_report: missing sn/productid or mqtt not initialized\n");
        json_object_put(data);
        return;
    }

    snprintf(topic, sizeof(topic), "/iot/%s/%s/thing/ota/inform",productid,sn);
    LOG_I("publish topic:[%s]\n", topic);

    // The returned string is owned by 'data' and stays valid until json_object_put().
    payload = json_object_to_json_string(data);
    if (payload != NULL) {
        payload_len = strlen(payload);
        // strlen() yields size_t; cast so the argument matches "%d".
        LOG_I("send payload[%d][%s]\n", (int)payload_len, payload);
        mqtt_utils_publish(mqtt_conf, topic, 2, payload, (int)payload_len);
    } else {
        LOG_I("mqtt_server_events_report: json serialization failed\n");
    }

    // Release the JSON object
    json_object_put(data);
}
/*
 * Publish a "success" (code 200) reply for a function call at QoS 2 on
 * $iot/v1/device/<sn>/functions/call/response.
 *
 * sn        - device id, used in the topic and as "deviceId" (must not be NULL)
 * msg_id    - echoed back as "messageId" (must not be NULL)
 * functions - JSON value attached as "payload"; ownership is taken by this
 *             function on every path (it is released together with the reply)
 */
void mqtt_server_reply(char *sn,char *msg_id,json_object *functions)
{
    char topic[128] = "";
    char time_buffer[24] = "";
    const char *payload = NULL;
    json_object *root = NULL;
    struct timeval tv;

    if (sn == NULL || msg_id == NULL) {
        LOG_I("mqtt_server_reply: sn or msg_id is NULL\n");
        json_object_put(functions);
        return;
    }

    // Millisecond timestamp: widen before multiplying (long may be 32-bit) and
    // convert the microsecond part to milliseconds instead of adding it raw.
    gettimeofday(&tv, NULL);
    snprintf(time_buffer, sizeof(time_buffer), "%lld",
             (long long)tv.tv_sec * 1000LL + (long long)tv.tv_usec / 1000LL);

    root = json_object_new_object();
    if(root == NULL){
        LOG_I("json_object_new_object error\n");
        // 'functions' was never attached to a parent, so release it here.
        json_object_put(functions);
        return;
    }

    json_object_object_add(root, "deviceId", json_object_new_string(sn));
    json_object_object_add(root, "messageId", json_object_new_string(msg_id));
    json_object_object_add(root, "timestamp", json_object_new_string(time_buffer));
    json_object_object_add(root, "code", json_object_new_int(200));
    json_object_object_add(root, "message", json_object_new_string("success"));
    // From here on 'functions' is owned by 'root'.
    json_object_object_add(root, "payload", functions);

    snprintf(topic, sizeof(topic), "$iot/v1/device/%s/functions/call/response",sn);
    LOG_I("publish topic:[%s]\n", topic);

    payload = json_object_to_json_string(root);
    if (payload != NULL) {
        const size_t payload_len = strlen(payload);
        // strlen() yields size_t; cast so the argument matches "%d".
        LOG_I("payload[%d][%s]\n", (int)payload_len, payload);
        mqtt_utils_publish(mqtt_conf, topic, 2, payload, (int)payload_len);
    } else {
        LOG_I("mqtt_server_reply: json serialization failed\n");
    }

    json_object_put(root);
}
// Yellow LED flashing state
static pthread_t pt_yellow_led_flash; // handle of the flasher thread created in mqtt_utils_connected
static volatile bool yellow_led_flash_running = false; // loop flag polled by thread_yellow_led_flash
static pthread_mutex_t yellow_led_mutex = PTHREAD_MUTEX_INITIALIZER; // held while starting/stopping the flasher
void *thread_yellow_led_flash(void *arg);
/*
 * Thread body that blinks the yellow LED (gpio114) while
 * yellow_led_flash_running stays true.
 *
 * Each cycle picks one random duration in [100000, 499999] microseconds and
 * uses it for both the "on" and the "off" phase. The LED is switched off
 * before the thread returns. The argument is unused; always returns NULL.
 */
void *thread_yellow_led_flash(void *arg) {
    enum { PHASE_MIN_US = 100000, PHASE_SPREAD_US = 400000 };

    (void)arg;
    LOG_I("Yellow LED flash thread started\n");

    // Seed the pseudo-random generator used for the phase length.
    srand((unsigned int)time(NULL));

    for (;;) {
        if (!yellow_led_flash_running) {
            break;
        }

        const int phase_us = PHASE_MIN_US + (rand() % PHASE_SPREAD_US);

        // LED on, hold for the random phase.
        system("echo 1 > /sys/class/gpio/gpio114/value");
        usleep(phase_us);

        // Stop request arrived during the "on" phase: leave without the "off" wait.
        if (!yellow_led_flash_running) {
            break;
        }

        // LED off, hold for the same phase.
        system("echo 0 > /sys/class/gpio/gpio114/value");
        usleep(phase_us);
    }

    // Guarantee the LED is dark once the thread ends.
    system("echo 0 > /sys/class/gpio/gpio114/value");
    LOG_I("Yellow LED flash thread exited\n");
    return NULL;
}
/*
 * React to a network/connection failure.
 *
 * Stops the yellow LED flasher (if it is running) and waits for it, forces
 * the LED off, then - when failure_times is non-NULL - increments the
 * counter and restarts the mt_server service once it has reached 5.
 */
void mqtt_net_failure(int *failure_times){
    // Stop the flasher under the mutex so start/stop cannot interleave.
    pthread_mutex_lock(&yellow_led_mutex);
    const bool flasher_active = yellow_led_flash_running;
    if (flasher_active) {
        yellow_led_flash_running = false;
        pthread_join(pt_yellow_led_flash, NULL);
        LOG_I("Yellow LED flash thread stopped\n");
    }
    pthread_mutex_unlock(&yellow_led_mutex);

    system("echo 0 > /sys/class/gpio/gpio114/value");

    if (failure_times == NULL) {
        return;
    }

    *failure_times += 1;
    LOG_I("mqtt net failure_times = %d\n", *failure_times);
    if (*failure_times < 5) {
        return;
    }
    system("systemctl restart mt_server");
}
/*
 * Log every name/value pair of a list terminated by an entry whose name is NULL.
 *
 * info - first element of the list; NULL is treated as an empty list
 *
 * Returns 1 when at least one pair was logged, otherwise 0.
 *
 * The previous version ended with "if (rc == 1) <commented-out log> return rc;",
 * which made the return conditional and left the function without a return
 * value when nothing was printed.
 */
int printVersionInfo(MQTTAsync_nameValue * info){
    int rc = 0;

    if (info == NULL) {
        return 0;
    }
    for (; info->name != NULL; info++) {
        LOG_I("%s: %s\n", info->name, info->value);
        rc = 1; /* at least one value printed */
    }
    return rc;
}
/*
 * Trace callback registered with MQTTAsync_setTraceCallback().
 *
 * Looks the level up in trace_level_name_list; the log statement for a match
 * is currently disabled, so the callback produces no output.
 */
void mqtt_utils_trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char *message){
    const int entry_count = MQTT_ARRY_ARRY_SIZE(trace_level_name_list);
    int idx = 0;

    (void)message;
    while (idx < entry_count) {
        if (trace_level_name_list[idx].level == level) {
            //LOG_I("%s, %s\n", trace_level_name_list[idx].name, message);
        }
        idx++;
    }
}
/*
 * Connection-lost callback (registered via MQTTAsync_setConnectionLostCallback).
 * Neither argument is used; the event is forwarded to mqtt_net_failure()
 * together with the global failure counter.
 */
void mqtt_utils_connection_lost(void *context, char *cause){
    (void)context;
    (void)cause;
    mqtt_net_failure(&connect_failure_times);
}
/*
 * Test whether an MQTT topic name matches a topic filter.
 *
 * The comparison walks both strings one '/'-separated level at a time:
 *   '+' as a whole level matches exactly one level (which may be empty);
 *   '#' as a whole level matches all remaining levels, and is only honoured
 *       when it is the final level of the filter;
 *   a filter ending in "/#" also matches the parent topic itself
 *       ("a/#" matches "a").
 *
 * filter - topic filter, may contain wildcards
 * topic  - concrete topic name
 *
 * Returns 1 on match, 0 otherwise (including when either argument is NULL).
 */
static int mqtt_utils_topic_match(const char *filter, const char *topic)
{
    if (filter == NULL || topic == NULL) {
        return 0;
    }

    for (;;) {
        const size_t fseg_len = strcspn(filter, "/");
        const size_t tseg_len = strcspn(topic, "/");

        if (fseg_len == 1 && filter[0] == '#') {
            // Multi-level wildcard swallows the rest, but must end the filter.
            return filter[1] == '\0';
        }

        if (!(fseg_len == 1 && filter[0] == '+')) {
            // Literal level: length and bytes must both agree.
            if (fseg_len != tseg_len || strncmp(filter, topic, fseg_len) != 0) {
                return 0;
            }
        }

        filter += fseg_len;
        topic += tseg_len;

        if (*topic == '\0') {
            if (*filter == '\0') {
                return 1;
            }
            // Topic exhausted: only a trailing "/#" may remain in the filter.
            return strcmp(filter, "/#") == 0;
        }
        if (*filter == '\0') {
            // Topic has more levels than the filter.
            return 0;
        }

        // Both strings sit on a '/' separator; step over it.
        filter++;
        topic++;
    }
}
/*
 * Extract the two variable levels from a topic of the exact form
 *   /sys/<field1>/<field2>/thing/service/lightOperate/invoke
 *
 * topic    - topic name to parse
 * out1     - receives <field1>, truncated to out1_len - 1 characters
 * out1_len - size of out1 in bytes (must be > 0)
 * out2     - receives <field2>, truncated to out2_len - 1 characters
 * out2_len - size of out2 in bytes (must be > 0)
 *
 * Returns 0 on success, -1 on invalid arguments or when the topic does not
 * have the expected shape. The outputs are left untouched on failure.
 */
static int mqtt_utils_parse_sys_lightOperate_invoke_topic(const char *topic, char *out1, size_t out1_len, char *out2,
                                                          size_t out2_len)
{
    char tmp1[256] = {0};
    char tmp2[256] = {0};
    int consumed = -1;

    if (topic == NULL || out1 == NULL || out2 == NULL || out1_len == 0 || out2_len == 0) {
        return -1;
    }

    // sscanf reports 2 conversions even if the literal suffix fails to match,
    // so %n is used to verify that the whole pattern - and the whole topic -
    // was consumed.
    if (sscanf(topic, "/sys/%255[^/]/%255[^/]/thing/service/lightOperate/invoke%n", tmp1, tmp2, &consumed) != 2) {
        return -1;
    }
    if (consumed < 0 || topic[consumed] != '\0') {
        return -1;
    }

    snprintf(out1, out1_len, "%s", tmp1);
    snprintf(out2, out2_len, "%s", tmp2);
    return 0;
}
/*
 * Message-arrived callback (registered via MQTTAsync_setMessageArrivedCallback).
 *
 * The topic is compared against every entry of subscribeTopics: entries that
 * contain '+' or '#' are matched with mqtt_utils_topic_match(), the others by
 * exact string comparison. For the first matching entry:
 *   - if it is the lightOperate/invoke wildcard filter, the deviceName level
 *     of the topic is stored in g_mqtt_deviceName;
 *   - a non-empty payload is copied into a NUL-terminated buffer (at most
 *     1023 bytes, longer payloads are truncated and logged) and handed to
 *     PutDataIntoMQueue().
 * Messages whose topic matches no entry are dropped.
 *
 * The message and the topic name are always released here. Always returns 1.
 */
int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message){
    static const char light_invoke_filter[] = "/sys/+/+/thing/service/lightOperate/invoke";

    (void)context;
    (void)topicLen;

    // topicName is checked up front: it is dereferenced by every comparison below.
    for (int i = 0; topicName != NULL && i < subscribeTopicCount; i++) {
        const char *filter = subscribeTopics[i];
        int matched;

        if (strchr(filter, '+') != NULL || strchr(filter, '#') != NULL) {
            matched = mqtt_utils_topic_match(filter, topicName);
        } else {
            matched = (strcmp(topicName, filter) == 0);
        }
        if (!matched) {
            continue;
        }

        if (strcmp(filter, light_invoke_filter) == 0) {
            char field1[256] = {0};
            char field2[256] = {0};
            if (mqtt_utils_parse_sys_lightOperate_invoke_topic(topicName, field1, sizeof(field1), field2,
                                                               sizeof(field2)) == 0) {
                LOG_I("lightOperate/invoke 匹配字段: productKey=%s, deviceName=%s\n", field1, field2);
                // Remember the deviceName in the global buffer
                memset(g_mqtt_deviceName, 0, sizeof(g_mqtt_deviceName));
                strncpy(g_mqtt_deviceName, field2, sizeof(g_mqtt_deviceName) - 1);
            } else {
                LOG_I("lightOperate/invoke topic字段解析失败: %s\n", topicName);
            }
        }

        if (message != NULL && message->payload != NULL && message->payloadlen > 0) {
            char payload_buf[1024] = {0};
            size_t copy_len = (size_t)message->payloadlen;
            if (copy_len >= sizeof(payload_buf)) {
                // Make the data loss visible instead of silently cutting the payload.
                LOG_I("mqtt payload truncated from %d to %d bytes\n", message->payloadlen,
                      (int)(sizeof(payload_buf) - 1));
                copy_len = sizeof(payload_buf) - 1;
            }
            memcpy(payload_buf, message->payload, copy_len);
            payload_buf[copy_len] = '\0';
            PutDataIntoMQueue(payload_buf);
        }
        // Only the first matching subscription handles the message.
        break;
    }

    if (message != NULL) {
        MQTTAsync_freeMessage(&message);
    }
    if (topicName != NULL) {
        MQTTAsync_free(topicName);
    }
    return 1;
}
/*
 * Connected callback (registered via MQTTAsync_setConnected).
 *
 * In order:
 *   1. resets the connection failure counter;
 *   2. subscribes (QoS 1) to every non-empty entry of subscribeTopics, plus
 *      the debug filters "#" (QoS 1) and "/test/public/topic" (QoS 0);
 *   3. starts the yellow LED flasher thread, or turns the LED constantly on
 *      if the thread cannot be created;
 *   4. calls station_status_report() and sets isMqttConnected;
 *   5. re-reports the login of every lightbar currently marked online;
 *   6. starts the light-off check and heartbeat check threads (once each).
 *
 * The flasher thread is deliberately left joinable: mqtt_net_failure() stops
 * it with pthread_join(), and joining a detached thread is undefined behaviour.
 */
void mqtt_utils_connected(void *context, char *cause){
    (void)context;
    (void)cause;

    connect_failure_times=0;

    // Subscribe to every configured topic
    for(int i = 0; i < subscribeTopicCount; i++) {
        if (subscribeTopics[i][0] == '\0') {
            LOG_I("错误订阅topic[%d]为空!\n", i);
            continue;
        }
        LOG_I("订阅topic[%d]: %s\n", i, subscribeTopics[i]);
        (void)mqtt_utils_subscribe(mqtt_conf, subscribeTopics[i], 1);
    }

    // NOTE(review): the next two subscriptions were added for debugging;
    // "#" asks the broker for every message - confirm they are still wanted.
    (void)mqtt_utils_subscribe(mqtt_conf, "#", 1);
    (void)mqtt_utils_subscribe(mqtt_conf, "/test/public/topic", 0);

    // Start the yellow LED flasher thread
    pthread_mutex_lock(&yellow_led_mutex);
    if (!yellow_led_flash_running) {
        yellow_led_flash_running = true;
        int ret = pthread_create(&pt_yellow_led_flash, NULL, thread_yellow_led_flash, NULL);
        if (ret != 0) {
            LOG_I("Failed to create yellow LED flash thread, using constant on\n");
            yellow_led_flash_running = false;
            system("echo 1 > /sys/class/gpio/gpio114/value");//yellow ok
        } else {
            // No pthread_detach here: the thread is joined in mqtt_net_failure().
            LOG_I("Yellow LED flash thread started\n");
        }
    }
    pthread_mutex_unlock(&yellow_led_mutex);

    station_status_report();

    // Mark the MQTT link as connected
    isMqttConnected = true;

    // Re-report the login of every lightbar that is already online
    typedef struct {
        uint32_t tagCode;
        time_t lastHeartbeat;
        bool isOnline;
    } lightbar_heartbeat_t;
    extern int lightbarHeartbeatCount;
    extern lightbar_heartbeat_t lightbarHeartbeat[];
    extern pthread_mutex_t heartbeatMutex;
    extern void report_lightbar_login(uint32_t tagCode);

    pthread_mutex_lock(&heartbeatMutex);
    LOG_I("Reporting login for %d online lightbars again\n", lightbarHeartbeatCount);
    for (int i = 0; i < lightbarHeartbeatCount; i++) {
        if (lightbarHeartbeat[i].isOnline) {
            LOG_I("Report again lightbar %08X login\n", lightbarHeartbeat[i].tagCode);
            report_lightbar_login(lightbarHeartbeat[i].tagCode);
        }
    }
    pthread_mutex_unlock(&heartbeatMutex);

    // Start the light-off check thread (only once)
    static bool light_off_thread_started = false;
    if (!light_off_thread_started) {
        extern pthread_t pt_light_off_check;
        extern void *thread_light_off_check(void *arg);
        int ret = pthread_create(&pt_light_off_check, NULL, thread_light_off_check, NULL);
        if(ret != 0) {
            LOG_I("pthread_create light_off_check fail\n");
        } else {
            LOG_I("pthread_create light_off_check success\n");
            pthread_detach(pt_light_off_check);
            light_off_thread_started = true;
        }
    }

    // Start the heartbeat check thread (only once)
    static bool heartbeat_thread_started = false;
    if (!heartbeat_thread_started) {
        extern pthread_t pt_heartbeat_check;
        extern void *thread_heartbeat_check(void *arg);
        int ret = pthread_create(&pt_heartbeat_check, NULL, thread_heartbeat_check, NULL);
        if(ret != 0){
            LOG_I("pthread_create heartbeat_check fail\n");
        } else {
            LOG_I("pthread_create heartbeat_check success\n");
            pthread_detach(pt_heartbeat_check);
            heartbeat_thread_started = true;
        }
    }
}
/*
 * Disconnected callback (registered via MQTTAsync_setDisconnected).
 * Ignores all arguments and clears the global isMqttConnected flag.
 */
void mqtt_utils_disconnected(void *context, MQTTProperties* props, enum MQTTReasonCodes rc){
    (void)context;
    (void)props;
    (void)rc;

    // The link is down: publishers consult this flag.
    isMqttConnected = false;
}
/*
 * onSuccess handler of the connect request: a successful connect clears the
 * global failure counter. Both arguments are unused.
 */
void mqtt_utils_on_connect_success(void *context, MQTTAsync_successData *response){
    (void)context;
    (void)response;
    connect_failure_times = 0;
}
/*
 * onFailure handler of the connect request. The failure details carried by
 * 'response' are not logged; the event is handed to mqtt_net_failure() with
 * the global failure counter.
 */
void mqtt_utils_on_connect_failure(void *context, MQTTAsync_failureData *response){
    (void)context;
    (void)response;
    mqtt_net_failure(&connect_failure_times);
}
/*
 * SSL error callback installed in the SSL options by mqtt_utils_init().
 * The error text is currently ignored. Always returns 0.
 */
static int mqtt_utils_on_ssl_error(const char *str, size_t len, void *context){
    (void)str;
    (void)len;
    (void)context;
    return 0;
}
/* onSuccess handler for publish requests; intentionally does nothing. */
void mqtt_utils_on_publish_success(void *context, MQTTAsync_successData *response){
    (void)context;
    (void)response;
}
/*
 * onFailure handler for publish requests. The failure code is not acted
 * upon; all diagnostics for it are disabled, so this is a no-op.
 */
void mqtt_utils_on_publish_failure(void *context, MQTTAsync_failureData *response){
    (void)context;
    (void)response;
}
/* onSuccess handler for subscribe requests; intentionally does nothing. */
void mqtt_utils_on_subscribe_success(void *context, MQTTAsync_successData *response){
    (void)context;
    (void)response;
}
/* onFailure handler for subscribe requests; the failure is currently ignored. */
void mqtt_utils_on_subscribe_failure(void *context, MQTTAsync_failureData *response){
    (void)context;
    (void)response;
}
/* onSuccess handler of the disconnect request issued by mqtt_utils_uninit(); no-op. */
void mqtt_utils_on_disconnect_success(void *context, MQTTAsync_successData *response){
    (void)context;
    (void)response;
}
/*
 * Start an asynchronous subscription.
 *
 * mqtt_utils - MQTT handle whose client and sub_opts are used
 * topic      - topic filter; must be a non-empty string
 * qos        - requested QoS
 *
 * Returns -1 when an argument is invalid or the client has not been created,
 * otherwise the return code of MQTTAsync_subscribe() (MQTTASYNC_SUCCESS when
 * the request was accepted). The connection state is not checked first.
 */
int mqtt_utils_subscribe(mqtt_utils_t *mqtt_utils, char *topic, int qos){
    int rc;

    if (topic == NULL || topic[0] == '\0') {
        return -1;
    }
    // Also guard the handle itself: it is dereferenced right below.
    if (mqtt_utils == NULL || mqtt_utils->client == NULL) {
        return -1;
    }

    rc = MQTTAsync_subscribe(mqtt_utils->client, topic, qos, &mqtt_utils->sub_opts);
    if (rc != MQTTASYNC_SUCCESS){
        // Same reporting style as mqtt_utils_publish(); callers often ignore rc.
        LOG_I("mqtt: failed to start subscribe, return code %s\n", MQTTAsync_strerror(rc));
    }
    return rc;
}
/*
 * Start an asynchronous publish using the handle's retain flag and pub_opts.
 *
 * mqtt_utils - MQTT handle
 * topic      - topic name to publish to
 * qos        - QoS of the message
 * data       - payload bytes (may be NULL only when datalen is 0)
 * datalen    - payload length in bytes
 *
 * Returns -1 when an argument is invalid or the client has not been created,
 * otherwise the return code of MQTTAsync_send().
 */
int mqtt_utils_publish(mqtt_utils_t *mqtt_utils, char *topic, int qos, const char *data, int datalen){
    int rc;

    if (mqtt_utils == NULL || mqtt_utils->client == NULL || topic == NULL ||
        datalen < 0 || (data == NULL && datalen > 0)) {
        LOG_I("mqtt: publish rejected, invalid argument or client not initialized\n");
        return -1;
    }

    rc = MQTTAsync_send(mqtt_utils->client, topic, datalen, data, qos, mqtt_utils->retain, &mqtt_utils->pub_opts);
    if (rc != MQTTASYNC_SUCCESS){
        LOG_I("mqtt: failed to start publish, return code %s\n", MQTTAsync_strerror(rc));
    }
    return rc;
}
/*
 * Create the MQTT client described by mqtt_config, register all callbacks and
 * start an asynchronous connect over tcp://<host>:<port>.
 *
 * Side effects: stores mqtt_config in the global mqtt_conf, fills
 * nativeInvokTopicName and the subscribeTopics table, and initialises the
 * handle's sub_opts / pub_opts.
 *
 * sub_opts / pub_opts are prepared BEFORE MQTTAsync_connect(): the connected
 * callback subscribes through mqtt_conf->sub_opts, so it must never observe
 * them uninitialised.
 *
 * Returns 0 when the connect request was started, -1 on invalid argument,
 * client creation failure or connect start failure, -2 when a callback could
 * not be registered. On failure after creation the client is destroyed.
 */
int mqtt_utils_init(mqtt_utils_t *mqtt_config)
{
    MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
    MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
    MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer; //used by MQTTAsync_subscribe
    MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer; //used by MQTTAsync_send
    char url[256] = "";
    int ret = -1;
    int rc = 0;

    if (mqtt_config == NULL) {
        LOG_I("mqtt_utils_init: mqtt_config is NULL\n");
        return -1;
    }
    mqtt_conf=mqtt_config;

    // Build the topic names up front so they are available in the connect callback.
    snprintf(nativeInvokTopicName, sizeof(nativeInvokTopicName), "/iot%s/thing/ota/upgrade", mqtt_conf->username);

    // Fill the subscription table (bounded writes; each slot is one row of subscribeTopics).
    subscribeTopicCount = 0;
    snprintf(subscribeTopics[subscribeTopicCount], sizeof(subscribeTopics[0]), "/iot/estation%s/ota", mqtt_conf->username);
    subscribeTopicCount++;
    snprintf(subscribeTopics[subscribeTopicCount], sizeof(subscribeTopics[0]), "/iot/estation%s/task", mqtt_conf->username);
    subscribeTopicCount++;
    snprintf(subscribeTopics[subscribeTopicCount], sizeof(subscribeTopics[0]), "/iot/estation%s/bind", mqtt_conf->username);
    subscribeTopicCount++;
    snprintf(subscribeTopics[subscribeTopicCount], sizeof(subscribeTopics[0]), "/iot/estation%s/group", mqtt_conf->username);
    subscribeTopicCount++;
    snprintf(subscribeTopics[subscribeTopicCount], sizeof(subscribeTopics[0]), "/iot%s/thing/ota/upgrade", mqtt_conf->username);
    subscribeTopicCount++;
    // A single wildcard filter covers the lightOperate messages of all devices.
    snprintf(subscribeTopics[subscribeTopicCount], sizeof(subscribeTopics[0]), "%s", "/sys/+/+/thing/service/lightOperate/invoke");
    subscribeTopicCount++;

    printVersionInfo(MQTTAsync_getVersionInfo());
    MQTTAsync_setTraceCallback(mqtt_utils_trace_callback);
    MQTTAsync_setTraceLevel(mqtt_conf->tracelevel);

    snprintf(url, sizeof(url), "tcp://%s:%d", mqtt_conf->host, mqtt_conf->port);

    create_opts.sendWhileDisconnected = 0;
    rc = MQTTAsync_createWithOptions(&mqtt_conf->client,
                                     url,
                                     mqtt_conf->clientid,
                                     MQTTCLIENT_PERSISTENCE_NONE,
                                     NULL,
                                     &create_opts);
    if (rc != MQTTASYNC_SUCCESS){
        LOG_I("MQTT客户端创建失败: %s\n", MQTTAsync_strerror(rc));
        return -1;
    }

    // Register every callback; any failure aborts with -2.
    ret = -2;
    rc = MQTTAsync_setConnected(mqtt_conf->client, NULL, mqtt_utils_connected);
    if (rc != MQTTASYNC_SUCCESS){
        goto error;
    }
    rc = MQTTAsync_setDisconnected(mqtt_conf->client, NULL, mqtt_utils_disconnected);
    if (rc != MQTTASYNC_SUCCESS){
        goto error;
    }
    rc = MQTTAsync_setMessageArrivedCallback(mqtt_conf->client, NULL, mqtt_utils_message_arrived);
    if (rc != MQTTASYNC_SUCCESS){
        goto error;
    }
    rc = MQTTAsync_setConnectionLostCallback(mqtt_conf->client, NULL, mqtt_utils_connection_lost);
    if (rc != MQTTASYNC_SUCCESS){
        goto error;
    }

    /* connect option */
    conn_opts.onSuccess = mqtt_utils_on_connect_success;
    conn_opts.onFailure = mqtt_utils_on_connect_failure;
    conn_opts.context = NULL;
    /*
     * Whatever the clean-session value, messages (QoS 0/1/2) published while
     * the device is offline are not received at that time.
     * With clean session = 1, messages sent during the offline period are
     * not delivered when the device comes back online.
     * With clean session = 0, they are delivered after reconnecting - every
     * message published to a topic is received, none is dropped.
     */
    conn_opts.cleansession = 1;
    conn_opts.automaticReconnect = 1;
    conn_opts.minRetryInterval = 1;   // minimum reconnect interval: 1 second
    conn_opts.maxRetryInterval = 60;  // maximum reconnect interval: 60 seconds
    conn_opts.keepAliveInterval = mqtt_conf->keepalive;
    conn_opts.username = mqtt_conf->username;
    conn_opts.password = mqtt_conf->password;
    conn_opts.MQTTVersion = mqtt_conf->MQTTVersion;

    if (mqtt_conf->will_topic) /* will options */{
        will_opts.message = mqtt_conf->will_payload;
        will_opts.topicName = mqtt_conf->will_topic;
        will_opts.qos = mqtt_conf->will_qos;
        will_opts.retained = mqtt_conf->will_retain;
        conn_opts.will = &will_opts;
    }

    if (mqtt_conf->insecure){
        ssl_opts.verify = 0;
        ssl_opts.CApath = mqtt_conf->capath;
        ssl_opts.keyStore = mqtt_conf->cert;
        ssl_opts.trustStore = mqtt_conf->cafile;
        ssl_opts.privateKey = mqtt_conf->key;
        ssl_opts.privateKeyPassword = mqtt_conf->keypass;
        ssl_opts.enabledCipherSuites = mqtt_conf->ciphers;
        ssl_opts.ssl_error_cb = mqtt_utils_on_ssl_error;
        ssl_opts.ssl_error_context = NULL;
        conn_opts.ssl = &ssl_opts;
    }

    // Response options must be ready before the connect is started, because
    // the connected callback may run as soon as MQTTAsync_connect() is called.
    pub_opts.onSuccess = mqtt_utils_on_publish_success;
    pub_opts.onFailure = mqtt_utils_on_publish_failure;
    pub_opts.context = NULL;
    sub_opts.onSuccess = mqtt_utils_on_subscribe_success;
    sub_opts.onFailure = mqtt_utils_on_subscribe_failure;
    sub_opts.context = NULL;
    mqtt_conf->sub_opts = sub_opts;
    mqtt_conf->pub_opts = pub_opts;

    rc = MQTTAsync_connect(mqtt_conf->client, &conn_opts);
    if (rc != MQTTASYNC_SUCCESS){
        LOG_I("MQTT连接启动失败: %s\n", MQTTAsync_strerror(rc));
        ret = -1;
        goto error;
    }

    LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);
    return 0;

error:
    // Do not leak the client created above when a later step fails.
    MQTTAsync_destroy(&mqtt_conf->client);
    return ret;
}
/*
 * Request a disconnect and release the client and the connect semaphore.
 *
 * mqtt_config - handle previously passed to mqtt_utils_init()
 *
 * Returns 0 on success, -1 when mqtt_config is NULL or the disconnect request
 * could not be started (in that case nothing is destroyed).
 *
 * NOTE(review): MQTTAsync_disconnect() is asynchronous and the client is
 * destroyed right after the request is queued - confirm this is intended.
 */
int mqtt_utils_uninit(mqtt_utils_t *mqtt_config){
    MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
    int rc = 0;

    LOG_I("%s\n",__func__);

    if (mqtt_config == NULL) {
        return -1;
    }

    disc_opts.onSuccess = mqtt_utils_on_disconnect_success;
    rc = MQTTAsync_disconnect(mqtt_config->client, &disc_opts);
    if (rc != MQTTASYNC_SUCCESS){
        LOG_I("mqtt: failed to start disconnect, return code: %s\n", MQTTAsync_strerror(rc));
        return -1;
    }

    MQTTAsync_destroy(&mqtt_config->client);
    sem_destroy(&mqtt_config->sem_connect);
    return 0;
}
/*
 * Simple connection status check that avoids client APIs which might block.
 *
 * Logs the current handle, topic and failure counter and, when mqtt_conf is
 * set, publishes a heartbeat message at QoS 0 on /iot/<username>/heartbeat.
 * The heartbeat counter is kept in a function-local static.
 */
void mqtt_utils_check_connection_status(void) {
    static int heartbeat_count = 0;

    LOG_I("=== MQTT连接状态检查 ===\n");
    // "%p" requires a void pointer argument.
    LOG_I("mqtt_conf指针: %p\n", (void *)mqtt_conf);

    if (mqtt_conf == NULL) {
        LOG_I("mqtt_conf为NULL\n");
    } else {
        char heartbeat_topic[128];
        char heartbeat_payload[256];
        int heartbeat_result;

        LOG_I("MQTT客户端指针: %p\n", (void *)mqtt_conf->client);
        LOG_I("当前订阅topic: %s\n", nativeInvokTopicName);
        LOG_I("连接失败次数: %d\n", connect_failure_times);

        // Send a heartbeat message to keep the connection active
        heartbeat_count++;
        snprintf(heartbeat_topic, sizeof(heartbeat_topic), "/iot/%s/heartbeat", mqtt_conf->username);
        // time_t is not guaranteed to be 'long'; widen explicitly to match the format.
        snprintf(heartbeat_payload, sizeof(heartbeat_payload), "{\"heartbeat\":%d,\"timestamp\":\"%lld\"}",
                 heartbeat_count, (long long)time(NULL));
        LOG_I("发送心跳消息: topic=%s, payload=%s\n", heartbeat_topic, heartbeat_payload);

        heartbeat_result = mqtt_utils_publish(mqtt_conf, heartbeat_topic, 0, heartbeat_payload,
                                              (int)strlen(heartbeat_payload));
        LOG_I("心跳消息发送结果: %d\n", heartbeat_result);
    }

    LOG_I("=== 连接状态检查完成 ===\n");
}
/*
 * Debug helper: subscribe (QoS 1) to a fixed list of candidate topic filters,
 * one per second, logging the result of each request. Does nothing when the
 * MQTT client has not been created.
 */
void mqtt_utils_try_different_topics(void) {
    LOG_I("=== 尝试订阅不同的topic格式 ===\n");

    if (mqtt_conf == NULL || mqtt_conf->client == NULL) {
        LOG_I("MQTT客户端未初始化跳过topic尝试\n");
        return;
    }

    // Candidate topic formats, NULL-terminated.
    char* test_topics[] = {
        "/iot/WcLightStrip/90A9F73002CD/thing/ota/upgrade",
        "/iot/WcLightStrip/90A9F73002CD/thing/+/+",
        "/iot/WcLightStrip/+/thing/ota/upgrade",
        "/iot/+/90A9F73002CD/thing/ota/upgrade",
        // '+' must occupy a whole level; the stray space that followed it
        // made this filter malformed.
        "/iot/+/+/thing/+/+",
        "iot/WcLightStrip/90A9F73002CD/thing/ota/upgrade",
        "iot/+/+/thing/+/+",
        NULL
    };

    for (int i = 0; test_topics[i] != NULL; i++) {
        LOG_I("尝试订阅topic: %s\n", test_topics[i]);
        int result = mqtt_utils_subscribe(mqtt_conf, test_topics[i], 1);
        LOG_I("订阅结果: %d\n", result);
        sleep(1); // wait one second before trying the next one
    }

    LOG_I("=== topic尝试完成 ===\n");
}