AP05/mqtt_utils/mqtt_utils.c
2025-12-05 18:35:10 +08:00

585 lines
21 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#define _GNU_SOURCE
#include "MQTTAsync.h"
#include "MQTTClientPersistence.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#if defined(WIN32)
#include <windows.h>
#define sleep Sleep
#else
#include <unistd.h>
#include <sys/time.h>
#include <unistd.h>
#endif
#if defined(_WRS_KERNEL)
#include <OsWrapper.h>
#endif
#include <sys/time.h>
#include <time.h>
#include <semaphore.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include "mqtt_utils.h"
#include "main.h"
#define PRINT_TIME_TAG
#define DBG_TAG "mqtt_utils"
#define DBG_LVL DBG_INFO
#include "debug_print.h"
/*
 * Element count of a true array (must not be used on a pointer).
 * The argument is parenthesized before indexing so that expression
 * arguments expand with the intended precedence.
 */
#define MQTT_ARRY_ARRY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
/* Active MQTT configuration; assigned by mqtt_utils_init() and read by the callbacks in this file. */
mqtt_utils_t *mqtt_conf;
/* Zero-initialized download parameters (not referenced in the visible part of this file). */
mqtt_parm_download_t mqtt_parm_download={0};
/* Failure counter: incremented through mqtt_net_failure(), cleared when a connection succeeds. */
int connect_failure_times = 0;
/* Topic name built in mqtt_utils_init() from the configured username. */
char nativeInvokTopicName[1024] = "";
/* Not written in the visible part of this file. */
char nativeUpgradeTopicName[1024] = "";
// Table of topics to subscribe to (10 slots of 1024 bytes each)
char subscribeTopics[10][1024] = {""};
/* Number of slots of subscribeTopics currently in use. */
int subscribeTopicCount = 0;
/* Defined in another translation unit. */
extern char softwareVersion[16];
/* Associates a Paho trace level with its printable name. */
typedef struct {
    enum MQTTASYNC_TRACE_LEVELS level;
    char name[64];
} trace_level_name_t;

/* Lookup table scanned by mqtt_utils_trace_callback(). */
trace_level_name_t trace_level_name_list[] = {
    { .level = MQTTASYNC_TRACE_MAXIMUM,  .name = "MQTTASYNC_TRACE_MAXIMUM"  },
    { .level = MQTTASYNC_TRACE_MEDIUM,   .name = "MQTTASYNC_TRACE_MEDIUM"   },
    { .level = MQTTASYNC_TRACE_MINIMUM,  .name = "MQTTASYNC_TRACE_MINIMUM"  },
    { .level = MQTTASYNC_TRACE_PROTOCOL, .name = "MQTTASYNC_TRACE_PROTOCOL" },
    { .level = MQTTASYNC_TRACE_ERROR,    .name = "MQTTASYNC_TRACE_ERROR"    },
    { .level = MQTTASYNC_TRACE_SEVERE,   .name = "MQTTASYNC_TRACE_SEVERE"   },
    { .level = MQTTASYNC_TRACE_FATAL,    .name = "MQTTASYNC_TRACE_FATAL"    },
};
/*
 * Publish an event report on "/iot/<productid>/<sn>/thing/ota/inform" at QoS 2.
 *
 * sn        - device serial number, used in the topic.
 * msg_id    - currently unused.
 * data      - JSON object to serialize and send; its reference is released
 *             by this function once the publish has been attempted.
 * productid - product identifier, used in the topic.
 *
 * Returns nothing; a NULL data object is logged and ignored.
 */
void mqtt_server_events_report(char *sn,char *msg_id,json_object *data,char *productid){
    char topic[128] = "";
    const char *payload = NULL;
    size_t payload_len = 0;

    (void)msg_id;

    if (data == NULL) {
        LOG_I("data object is NULL\n");
        return;
    }

    snprintf(topic, sizeof(topic), "/iot/%s/%s/thing/ota/inform", productid, sn);
    LOG_I("publish topic:[%s]\n", topic);

    payload = json_object_to_json_string(data);
    if (payload != NULL) {
        /* Measure once; %zu is the matching conversion for size_t. */
        payload_len = strlen(payload);
        LOG_I("send payload[%zu][%s]\n", payload_len, payload);
        mqtt_utils_publish(mqtt_conf, topic, 2, payload, (int)payload_len);
    } else {
        LOG_I("failed to serialize event payload\n");
    }

    /* Release the JSON object (the serialized string belongs to it). */
    json_object_put(data);
}
/*
 * Publish a success response for a function call on
 * "$iot/v1/device/<sn>/functions/call/response" at QoS 2.
 *
 * sn        - device id, used in the topic and as "deviceId".
 * msg_id    - echoed back as "messageId".
 * functions - JSON value attached as "payload"; it is handed over to the
 *             response object and released together with it.
 *
 * The "timestamp" field is the current wall-clock time in milliseconds.
 */
void mqtt_server_reply(char *sn,char *msg_id,json_object *functions)
{
    char topic[128] = "";
    const char *payload = NULL;
    json_object *root = NULL;
    struct timeval tv;
    char time_buffer[32] = "";
    long long now_ms = 0;
    size_t payload_len = 0;

    gettimeofday(&tv, NULL);
    /* Milliseconds = seconds * 1000 + microseconds / 1000; computed in
     * 64 bits so the product cannot overflow a 32-bit long. */
    now_ms = (long long)tv.tv_sec * 1000LL + (long long)tv.tv_usec / 1000LL;
    snprintf(time_buffer, sizeof(time_buffer), "%lld", now_ms);

    root = json_object_new_object();
    if (root == NULL) {
        LOG_I("json_object_new_object error\n");
        /* The success path consumes `functions` through root, so release it
         * here as well to keep ownership consistent for the caller. */
        json_object_put(functions);
        return;
    }

    json_object_object_add(root, "deviceId", json_object_new_string(sn));
    json_object_object_add(root, "messageId", json_object_new_string(msg_id));
    json_object_object_add(root, "timestamp", json_object_new_string(time_buffer));
    json_object_object_add(root, "code", json_object_new_int(200));
    json_object_object_add(root, "message", json_object_new_string("success"));
    json_object_object_add(root, "payload", functions);

    snprintf(topic, sizeof(topic), "$iot/v1/device/%s/functions/call/response", sn);
    LOG_I("publish topic:[%s]\n", topic);

    payload = json_object_to_json_string(root);
    if (payload != NULL) {
        payload_len = strlen(payload);
        LOG_I("payload[%zu][%s]\n", payload_len, payload);
        mqtt_utils_publish(mqtt_conf, topic, 2, payload, (int)payload_len);
    } else {
        LOG_I("failed to serialize reply payload\n");
    }

    /* Releases root and everything attached to it, including `functions`. */
    json_object_put(root);
}
/*
 * Record a network failure: write 0 to GPIO 113 and, when a counter is
 * supplied, increment it and restart the mt_server service once the count
 * reaches 5.
 *
 * failure_times - optional counter to bump; may be NULL.
 */
void mqtt_net_failure(int *failure_times){
    system("echo 0 > /sys/class/gpio/gpio113/value");

    if (failure_times == NULL) {
        return;
    }

    *failure_times += 1;
    LOG_I("mqtt net failure_times = %d\n", *failure_times);

    if (*failure_times < 5) {
        return;
    }
    system("systemctl restart mt_server");
}
/*
 * Log every name/value pair of a library information list.
 *
 * info - array terminated by an entry whose name is NULL; may be NULL.
 *
 * Returns 1 if at least one entry was logged, 0 otherwise.
 */
int printVersionInfo(MQTTAsync_nameValue * info){
    int rc = 0;

    if (info == NULL) {
        return 0;
    }

    for (; info->name != NULL; info++) {
        LOG_I("%s: %s\n", info->name, info->value);
        rc = 1; /* at least one value printed */
    }

    /* Return unconditionally: the previous version only returned when
     * rc == 1 and fell off the end of the function otherwise. */
    return rc;
}
/*
 * Trace hook registered with the MQTT library.
 * Walks the level/name table looking for entries that match `level`;
 * the actual log output is currently disabled, so nothing is emitted.
 */
void mqtt_utils_trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char *message){
    const int entries = MQTT_ARRY_ARRY_SIZE(trace_level_name_list);
    int idx = 0;

    while (idx < entries) {
        if (level == trace_level_name_list[idx].level) {
            /* trace output disabled:
             * LOG_I("%s, %s\n", trace_level_name_list[idx].name, message); */
        }
        idx++;
    }
}
/*
 * Connection-lost callback: counts the event as a network failure.
 * Neither argument is used.
 */
void mqtt_utils_connection_lost(void *context, char *cause){
    (void)context;
    (void)cause;

    mqtt_net_failure(&connect_failure_times);
}
/*
 * Message-arrived callback.
 *
 * If the topic equals one of the entries in subscribeTopics, the payload is
 * forwarded to PutDataIntoMQueue(); otherwise the message is only logged.
 * The message and topic buffers are always released before returning.
 *
 * Returns 1 to tell the library the message has been handled.
 */
int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message){
    int topic_matched = 0;

    (void)context;
    (void)topicLen;

    LOG_I("---------------%s------------------\n",__func__);
    LOG_I("收到消息topic: %s\n", topicName);

    /* Check whether the message belongs to one of the subscribed topics. */
    for (int i = 0; i < subscribeTopicCount; i++) {
        if (strcmp(topicName, subscribeTopics[i]) != 0) {
            continue;
        }
        LOG_I("消息匹配topic[%d]: %s\n", i, subscribeTopics[i]);

        /* The payload is length-delimited (payloadlen) and not guaranteed to
         * be NUL-terminated, so forward a terminated copy instead of the raw
         * pointer.
         * NOTE(review): assumes PutDataIntoMQueue() copies the data before
         * returning - the original code already freed the message right
         * after the call. Confirm against its implementation. */
        size_t len = 0;
        if (message->payload != NULL && message->payloadlen > 0) {
            len = (size_t)message->payloadlen;
        }
        char *text = malloc(len + 1);
        if (text == NULL) {
            LOG_I("payload copy allocation failed, message dropped\n");
        } else {
            if (len > 0) {
                memcpy(text, message->payload, len);
            }
            text[len] = '\0';
            PutDataIntoMQueue((void *)text);
            free(text);
        }

        topic_matched = 1;
        break;
    }

    if (!topic_matched) {
        LOG_I("消息topic不匹配任何订阅的topic: %s\n", topicName);
    }

    MQTTAsync_freeMessage(&message);
    MQTTAsync_free(topicName);
    return 1;
}
/*
 * Connected callback: clears the failure counter, subscribes to every
 * configured topic at QoS 1, adds two extra subscriptions ("#" and
 * "/test/public/topic"), writes 1 to GPIO 113 and reports station status.
 *
 * NOTE(review): the "#" and "/test/public/topic" subscriptions are labelled
 * as debug/test aids by their own log messages - confirm they are intended
 * to stay enabled.
 */
void mqtt_utils_connected(void *context, char *cause){
    LOG_I("=== mqtt_utils_connected 函数被调用 ===\n");
    if (cause == NULL) {
        LOG_I("%s\n",__func__);
    } else {
        LOG_I("%s cause:%s\n",__func__, cause);
    }

    connect_failure_times = 0;
    LOG_I(" MQTT连接成功开始订阅%d个topic\n", subscribeTopicCount);

    /* Subscribe to every configured topic, skipping empty slots. */
    for (int idx = 0; idx < subscribeTopicCount; ++idx) {
        char *filter = subscribeTopics[idx];

        if (filter[0] != '\0') {
            LOG_I("订阅topic[%d]: %s\n", idx, filter);
            int status = mqtt_utils_subscribe(mqtt_conf, filter, 1);
            LOG_I("订阅topic[%d]请求结果: %d\n", idx, status);
        } else {
            LOG_I("错误订阅topic[%d]为空!\n", idx);
        }
    }

    /* Wildcard subscription that receives every message (debug aid). */
    LOG_I("=== 添加通配符订阅用于调试 ===\n");
    int wildcard_status = mqtt_utils_subscribe(mqtt_conf, "#", 1);
    LOG_I("通配符订阅请求结果: %d\n", wildcard_status);
    LOG_I("=== 通配符订阅完成 ===\n");

    /* Public topic subscription (test aid). */
    LOG_I("=== 订阅公共topic用于测试 ===\n");
    int public_status = mqtt_utils_subscribe(mqtt_conf, "/test/public/topic", 0);
    LOG_I("公共topic订阅请求结果: %d\n", public_status);
    LOG_I("=== 公共topic订阅完成 ===\n");

    system( "echo 1 > /sys/class/gpio/gpio113/value"); /* yellow ok */
    station_status_report();
    LOG_I("=== mqtt_utils_connected 函数执行完成 ===\n");
}
/* Disconnected callback: intentionally does nothing; all arguments are ignored. */
void mqtt_utils_disconnected(void *context, MQTTProperties* props, enum MQTTReasonCodes rc){
    (void)context;
    (void)props;
    (void)rc;
}
/* Connect-success callback: resets the failure counter; arguments are ignored. */
void mqtt_utils_on_connect_success(void *context, MQTTAsync_successData *response){
    (void)context;
    (void)response;

    connect_failure_times = 0;
}
/*
 * Connect-failure callback: logs the error code (and message, when one is
 * supplied) and counts the event as a network failure.
 *
 * response - failure details from the library; may be NULL.
 */
void mqtt_utils_on_connect_failure(void *context, MQTTAsync_failureData *response){
    const char *reason = "未知错误";

    if (response != NULL) {
        reason = MQTTAsync_strerror(response->code);
    }
    LOG_I("MQTT连接失败 - 错误代码: %s\n", reason);

    if (response != NULL && response->message != NULL) {
        LOG_I("MQTT连接失败 - 错误信息: %s\n", response->message);
    }

    mqtt_net_failure(&connect_failure_times);
}
/*
 * SSL error callback: logs the error text handed over by the library.
 *
 * str     - error text; may be NULL.
 * len     - length of str in bytes; the log output is bounded by it.
 * context - unused.
 *
 * Always returns 0.
 * NOTE(review): the meaning of the return value is defined by the caller of
 * this callback - confirm 0 is the intended value.
 */
static int mqtt_utils_on_ssl_error(const char *str, size_t len, void *context){
    (void)context;

    if (str == NULL) {
        return 0;
    }

    /* Print at most `len` bytes rather than relying on a terminator. */
    LOG_I("%s ssl error: %.*s\n",__func__, (int)len, str);
    return 0;
}
/* Publish-success callback: intentionally does nothing; arguments are ignored. */
void mqtt_utils_on_publish_success(void *context, MQTTAsync_successData *response){
    (void)context;
    (void)response;
}
/*
 * Publish-failure callback: logs the error code and adds a hint for the two
 * connection-related codes.
 *
 * response - failure details from the library; may be NULL.
 */
void mqtt_utils_on_publish_failure(void *context, MQTTAsync_failureData *response){
    (void)context;

    /* Guard before the first dereference; the original read response->code
     * for the log line before its own NULL checks. */
    if (response == NULL) {
        LOG_I("%s rc %s\n",__func__, "none");
        return;
    }

    LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));

    /* Extra detail for connection-related failures. */
    if (response->code == MQTTASYNC_OPERATION_INCOMPLETE) {
        LOG_I("发布失败:操作在完成前被丢弃,可能是连接状态不稳定\n");
    } else if (response->code == MQTTASYNC_DISCONNECTED) {
        LOG_I("发布失败:客户端已断开连接\n");
    }
}
/* Subscribe-success callback: logs that the subscription was accepted. */
void mqtt_utils_on_subscribe_success(void *context, MQTTAsync_successData *response){
    (void)context;
    (void)response;

    LOG_I("=== MQTT订阅成功回调被调用 ===\n");
    LOG_I("MQTT订阅成功\n");
}
/*
 * Subscribe-failure callback: logs the error code.
 *
 * response - failure details from the library; may be NULL, in which case
 *            "none" is logged instead of dereferencing it.
 */
void mqtt_utils_on_subscribe_failure(void *context, MQTTAsync_failureData *response){
    (void)context;

    LOG_I("%s rc %s\n",__func__, response ? MQTTAsync_strerror(response->code) : "none");
}
/* Disconnect-success callback: logs its own name; arguments are ignored. */
void mqtt_utils_on_disconnect_success(void *context, MQTTAsync_successData *response){
    (void)context;
    (void)response;

    LOG_I("%s\n",__func__);
}
/*
 * Request a subscription to `topic` at the given QoS.
 *
 * mqtt_utils - client wrapper holding the handle and subscribe options.
 * topic      - topic filter; must be a non-empty string.
 * qos        - requested quality of service.
 *
 * Returns -1 when an argument is invalid or the client is not initialized,
 * otherwise the return code of MQTTAsync_subscribe(). The outcome of the
 * subscription itself is delivered through the sub_opts callbacks.
 */
int mqtt_utils_subscribe(mqtt_utils_t *mqtt_utils, char *topic, int qos){
    int rc = 0;

    /* Validate before anything is dereferenced or printed with %s. */
    if (topic == NULL || topic[0] == '\0') {
        LOG_I("错误topic参数无效\n");
        return -1;
    }
    if (mqtt_utils == NULL || mqtt_utils->client == NULL) {
        LOG_I("错误MQTT客户端未初始化\n");
        return -1;
    }

    LOG_I("开始订阅topic: %s, qos: %d\n", topic, qos);
    LOG_I("MQTT客户端指针: %p\n", (void *)mqtt_utils->client);
    LOG_I("订阅选项指针: %p\n", (void *)&mqtt_utils->sub_opts);

    /* No connection-state check is made; the request is issued directly. */
    LOG_I("跳过连接状态检查,直接进行订阅\n");
    LOG_I("准备调用 MQTTAsync_subscribe...\n");
    rc = MQTTAsync_subscribe(mqtt_utils->client, topic, qos, &mqtt_utils->sub_opts);
    LOG_I("MQTTAsync_subscribe 调用完成,返回值: %d\n", rc);

    if (rc != MQTTASYNC_SUCCESS) {
        LOG_I("订阅失败,错误代码: %s\n ", MQTTAsync_strerror(rc));
    } else {
        LOG_I("订阅请求已发送,等待回调\n");
    }
    return rc;
}
/*
 * Queue a message for publication.
 *
 * mqtt_utils - client wrapper holding the handle, retain flag and publish options.
 * topic      - destination topic.
 * qos        - quality of service for the message.
 * data       - payload bytes.
 * datalen    - payload length in bytes.
 *
 * Returns -1 when the wrapper, its client handle or the topic is NULL,
 * otherwise the return code of MQTTAsync_send().
 */
int mqtt_utils_publish(mqtt_utils_t *mqtt_utils, char *topic, int qos, const char *data, int datalen){
    int rc;

    /* mqtt_conf is NULL until mqtt_utils_init() has run, so callers that
     * publish through it can reach this function with a NULL wrapper. */
    if (mqtt_utils == NULL || mqtt_utils->client == NULL || topic == NULL) {
        LOG_I("mqtt: publish skipped, client or topic not available\n");
        return -1;
    }

    rc = MQTTAsync_send(mqtt_utils->client, topic, datalen, data, qos, mqtt_utils->retain, &mqtt_utils->pub_opts);
    if (rc != MQTTASYNC_SUCCESS) {
        LOG_I("mqtt: failed to start publish, return code %s\n", MQTTAsync_strerror(rc));
    }
    return rc;
}
/*
 * Create the MQTT client described by `mqtt_config`, register the callbacks
 * of this file and start an asynchronous connection.
 *
 * Side effects: stores `mqtt_config` in the global mqtt_conf, rebuilds
 * nativeInvokTopicName and the subscribeTopics table from the configured
 * username, and fills mqtt_config->sub_opts / pub_opts.
 *
 * Returns 0 when the connection attempt was started,
 *        -1 when the configuration is NULL or client creation / connect start fails,
 *        -2 when registering a callback fails.
 * On any failure after the client was created, the client is destroyed again.
 */
int mqtt_utils_init(mqtt_utils_t *mqtt_config)
{
    MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
    MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
    MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer; /* used by MQTTAsync_subscribe */
    MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer; /* used by MQTTAsync_send */
    MQTTAsync_nameValue *infos = NULL;
    char url[256] = "";
    int ret = -1;
    int rc = 0;
    int client_created = 0;

    if (mqtt_config == NULL) {
        LOG_I("mqtt_utils_init: configuration pointer is NULL\n");
        return -1;
    }
    mqtt_conf = mqtt_config;

    LOG_I("=== 开始MQTT连接初始化 ===\n");
    LOG_I("ClientId: %s\n", mqtt_conf->clientid);
    LOG_I("Username: %s\n", mqtt_conf->username);
    /* The password is deliberately not written to the log. */
    LOG_I("Password: ******\n");
    LOG_I("Host: %s\n", mqtt_conf->host);
    LOG_I("Port: %d\n", mqtt_conf->port);

    /* Build the topic names up front so they are available inside the
     * connected callback. snprintf bounds every write to its buffer. */
    snprintf(nativeInvokTopicName, sizeof(nativeInvokTopicName),
             "/iot%s/thing/ota/upgrade", mqtt_conf->username);
    LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);

    /* Fill the table of topics to subscribe to. */
    subscribeTopicCount = 0;
    snprintf(subscribeTopics[subscribeTopicCount++], sizeof(subscribeTopics[0]),
             "/iot/estation%s/ota", mqtt_conf->username);
    snprintf(subscribeTopics[subscribeTopicCount++], sizeof(subscribeTopics[0]),
             "/iot/estation%s/task", mqtt_conf->username);
    snprintf(subscribeTopics[subscribeTopicCount++], sizeof(subscribeTopics[0]),
             "/iot/estation%s/bind", mqtt_conf->username);
    snprintf(subscribeTopics[subscribeTopicCount++], sizeof(subscribeTopics[0]),
             "/iot/estation%s/group", mqtt_conf->username);
    snprintf(subscribeTopics[subscribeTopicCount++], sizeof(subscribeTopics[0]),
             "/iot%s/thing/ota/upgrade", mqtt_conf->username);
    snprintf(subscribeTopics[subscribeTopicCount++], sizeof(subscribeTopics[0]),
             "%s", "/sys/WcSubLightStrip/AD1000014C11/thing/service/lightOperate/invoke");
    LOG_I("设置了%d个订阅topic:\n", subscribeTopicCount);
    for (int i = 0; i < subscribeTopicCount; i++) {
        LOG_I("Topic[%d]: %s\n", i, subscribeTopics[i]);
    }

    infos = MQTTAsync_getVersionInfo();
    printVersionInfo(infos);
    MQTTAsync_setTraceCallback(mqtt_utils_trace_callback);
    MQTTAsync_setTraceLevel(mqtt_conf->tracelevel);

    /* The URL scheme is fixed to tcp://; mqtt_conf->protocol is not used. */
    snprintf(url, sizeof(url), "tcp://%s:%d", mqtt_conf->host, mqtt_conf->port);
    LOG_I("MQTT URL: %s\n", url);

    create_opts.sendWhileDisconnected = 0;
    rc = MQTTAsync_createWithOptions(&mqtt_conf->client,
                                     url,
                                     mqtt_conf->clientid,
                                     MQTTCLIENT_PERSISTENCE_NONE,
                                     NULL,
                                     &create_opts);
    LOG_I("MQTTAsync_create rc = %d\n", rc);
    if (rc != MQTTASYNC_SUCCESS) {
        LOG_I("MQTT客户端创建失败: %s\n", MQTTAsync_strerror(rc));
        ret = -1;
        goto error;
    }
    client_created = 1;
    LOG_I("MQTTAsync_create succes s\n");

    /* Register all required callbacks. */
    LOG_I("设置MQTT回调函数\n");
    rc = MQTTAsync_setConnected(mqtt_conf->client, NULL, mqtt_utils_connected);
    LOG_I("MQTTAsync_setConnected rc = %d\n", rc);
    if (rc != MQTTASYNC_SUCCESS) {
        LOG_I("设置连接回调失败: %s\n", MQTTAsync_strerror(rc));
        ret = -2;
        goto error;
    }
    LOG_I("MQTTAsync_setConnected success\n");

    rc = MQTTAsync_setDisconnected(mqtt_conf->client, NULL, mqtt_utils_disconnected);
    LOG_I("MQTTAsy nc_setDisconnected rc = %d\n", rc);
    if (rc != MQTTASYNC_SUCCESS) {
        LOG_I("设置断开回调失败: %s\n", MQTTAsync_strerror(rc));
        ret = -2;
        goto error;
    }
    LOG_I("MQTTAsync_setDisconnected success\n");

    /* Message-arrived callback. */
    rc = MQTTAsync_setMessageArrivedCallback(mqtt_conf->client, NULL, mqtt_utils_message_arrived);
    LOG_I("MQTTAsync_setMessageArrivedCallback rc = %d\n", rc);
    if (rc != MQTTASYNC_SUCCESS) {
        LOG_I("设置消息到达回调失败: %s\n", MQTTAsync_strerror(rc));
        ret = -2;
        goto error;
    }
    LOG_I("MQTTAsync_setMessageArrivedCallback success\n");

    /* Connection-lost callback. */
    rc = MQTTAsync_setConnectionLostCallback(mqtt_conf->client, NULL, mqtt_utils_connection_lost);
    LOG_I("MQTTAsync_setConnectionLostCallback rc = %d\n", rc);
    if (rc != MQTTASYNC_SUCCESS) {
        LOG_I("设置连接丢失回调失败: %s\n", MQTTAsync_strerror(rc));
        ret = -2;
        goto error;
    }
    LOG_I("MQTTAsync_setConnectionLostCallback success\n");

    /* connect options */
    conn_opts.onSuccess = mqtt_utils_on_connect_success;
    conn_opts.onFailure = mqtt_utils_on_connect_failure;
    conn_opts.context = NULL;
    /*
     * Whatever the clean-session value, messages of any QoS sent while the
     * device is offline are not delivered during that time.
     * With clean session = 1, messages sent while offline are not delivered
     * after the device comes back online either.
     * With clean session = 0, messages sent while offline are delivered once
     * the device is back online, one for each message published.
     */
    conn_opts.cleansession = 1;
    conn_opts.automaticReconnect = 1;
    conn_opts.minRetryInterval = 1;   /* minimum reconnect interval: 1 second */
    conn_opts.maxRetryInterval = 60;  /* maximum reconnect interval: 60 seconds */
    conn_opts.keepAliveInterval = mqtt_conf->keepalive;
    conn_opts.username = mqtt_conf->username;
    conn_opts.password = mqtt_conf->password;
    conn_opts.MQTTVersion = mqtt_conf->MQTTVersion;

    if (mqtt_conf->will_topic) { /* will options */
        will_opts.message = mqtt_conf->will_payload;
        will_opts.topicName = mqtt_conf->will_topic;
        will_opts.qos = mqtt_conf->will_qos;
        will_opts.retained = mqtt_conf->will_retain;
        conn_opts.will = &will_opts;
    }

    /* NOTE(review): SSL options are attached only when `insecure` is set,
     * while the URL above always uses tcp:// - confirm this is intended. */
    if (mqtt_conf->insecure) {
        ssl_opts.verify = 0;
        ssl_opts.CApath = mqtt_conf->capath;
        ssl_opts.keyStore = mqtt_conf->cert;
        ssl_opts.trustStore = mqtt_conf->cafile;
        ssl_opts.privateKey = mqtt_conf->key;
        ssl_opts.privateKeyPassword = mqtt_conf->keypass;
        ssl_opts.enabledCipherSuites = mqtt_conf->ciphers;
        ssl_opts.ssl_error_cb = mqtt_utils_on_ssl_error;
        ssl_opts.ssl_error_context = NULL;
        conn_opts.ssl = &ssl_opts;
    }

    /* Store the publish/subscribe response options BEFORE starting the
     * connection: mqtt_utils_connected() subscribes through
     * mqtt_conf->sub_opts and may be invoked as soon as the connect is
     * under way. */
    mqtt_conf->sub_opts = sub_opts;
    mqtt_conf->pub_opts = pub_opts;
    mqtt_conf->pub_opts.onSuccess = mqtt_utils_on_publish_success;
    mqtt_conf->pub_opts.onFailure = mqtt_utils_on_publish_failure;
    mqtt_conf->pub_opts.context = NULL;
    mqtt_conf->sub_opts.onSuccess = mqtt_utils_on_subscribe_success;
    mqtt_conf->sub_opts.onFailure = mqtt_utils_on_subscribe_failure;
    mqtt_conf->sub_opts.context = NULL;

    LOG_I("正在连接MQTT服务器...\n");
    rc = MQTTAsync_connect(mqtt_conf->client, &conn_opts);
    LOG_I("MQTTAsync_connect rc = %d\n", rc);
    if (rc != MQTTASYNC_SUCCESS) {
        LOG_I("MQTT连接启动失败: %s\n", MQTTAsync_strerror(rc));
        ret = -1;
        goto error;
    }
    LOG_I("MQTTAsync_connect 启动成功\n");

    LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);

    /* Log the callback addresses for verification. */
    LOG_I("=== 验证回调函数设置 ===\n");
    LOG_I("消息到达回调函数地址: %p\n", mqtt_utils_message_arrived);
    LOG_I("连接成功回调函数地址: %p\n", mqtt_utils_connected);
    LOG_I("订阅成功回调函数地址: %p\n", mqtt_utils_on_subscribe_success);
    LOG_I("订阅失败回调函数地址: %p\n", mqtt_utils_on_subscribe_failure);
    LOG_I("=== 回调函数验证完成 ===\n");
    ret = 0;

error:
    /* Do not leak the client handle when a later step failed. */
    if (ret != 0 && client_created) {
        MQTTAsync_destroy(&mqtt_conf->client);
    }
    return ret;
}
/*
 * Disconnect and tear down the client held by `mqtt_config`.
 *
 * The client handle and the connect semaphore are released even when the
 * disconnect request cannot be started, so the handle is not leaked.
 *
 * Returns 0 on success, -1 when `mqtt_config` is NULL or the disconnect
 * request failed to start.
 */
int mqtt_utils_uninit(mqtt_utils_t *mqtt_config){
    MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
    int rc = 0;
    int ret = 0;

    LOG_I("%s\n",__func__);

    if (mqtt_config == NULL) {
        return -1;
    }

    disc_opts.onSuccess = mqtt_utils_on_disconnect_success;
    rc = MQTTAsync_disconnect(mqtt_config->client, &disc_opts);
    if (rc != MQTTASYNC_SUCCESS) {
        LOG_I("mqtt: failed t o start disconnect, return code: %s\n", MQTTAsync_strerror(rc));
        ret = -1;
    }

    /* Always release the resources; the original skipped this on failure. */
    MQTTAsync_destroy(&mqtt_config->client);
    /* NOTE(review): sem_connect is not initialized anywhere in the visible
     * part of this file - confirm it is set up by the owner of the config. */
    sem_destroy(&mqtt_config->sem_connect);

    return ret;
}
/*
 * Simple connection status check that avoids library calls which might block.
 *
 * Logs the current client state and publishes a heartbeat message
 * ({"heartbeat":<counter>,"timestamp":"<unix seconds>"}) at QoS 0 on
 * "/iot<sep><username>/heartbeat" as formatted below. Does nothing but log
 * when mqtt_conf is NULL.
 */
void mqtt_utils_check_connection_status(void) {
    static int heartbeat_count = 0;
    char heartbeat_topic[128];
    char heartbeat_payload[256];

    LOG_I("=== MQTT连接状态检查 ===\n");
    LOG_I("mqtt_conf指针: %p\n", (void *)mqtt_conf);

    if (mqtt_conf == NULL) {
        LOG_I("mqtt_conf为NULL\n");
        LOG_I("=== 连接状态检查完成 ===\n");
        return;
    }

    LOG_I("MQTT客户端指针: %p\n", (void *)mqtt_conf->client);
    LOG_I("当前订阅topic: %s\n", nativeInvokTopicName);
    LOG_I("连接失败次数: %d\n", connect_failure_times);

    /* Send a heartbeat message to keep the connection active. */
    heartbeat_count++;
    snprintf(heartbeat_topic, sizeof(heartbeat_topic), "/iot/%s/heartbeat", mqtt_conf->username);
    /* time_t has no portable printf conversion; widen it explicitly. */
    snprintf(heartbeat_payload, sizeof(heartbeat_payload), "{\"heartbeat\":%d,\"timestamp\":\"%lld\"}",
             heartbeat_count, (long long)time(NULL));
    LOG_I("发送心跳消息: topic=%s, payload=%s\n", heartbeat_topic, heartbeat_payload);

    int heartbeat_result = mqtt_utils_publish(mqtt_conf, heartbeat_topic, 0, heartbeat_payload,
                                              (int)strlen(heartbeat_payload));
    LOG_I("心跳消息发送结果: %d\n", heartbeat_result);

    LOG_I("=== 连接状态检查完成 ===\n");
}
/*
 * Try subscribing to a fixed list of alternative topic filters at QoS 1,
 * pausing one second between attempts, and log each result.
 * Returns immediately when the client has not been initialized.
 */
void mqtt_utils_try_different_topics(void) {
    LOG_I("=== 尝试订阅不同的topic格式 ===\n");
    if (mqtt_conf == NULL || mqtt_conf->client == NULL) {
        LOG_I("MQTT客户端未初始化跳过topic尝试\n");
        return;
    }

    /* Candidate topic filters, terminated by NULL. */
    char* test_topics[] = {
        "/iot/WcLightStrip/90A9F73002CD/thing/ota/upgrade",
        "/iot/WcLightStrip/90A9F73002CD/thing/+/+",
        "/iot/WcLightStrip/+/thing/ota/upgrade",
        "/iot/+/90A9F73002CD/thing/ota/upgrade",
        /* '+' must occupy a whole topic level: the stray space that used to
         * follow the first '+' made this filter invalid. */
        "/iot/+/+/thing/+/+",
        "iot/WcLightStrip/90A9F73002CD/thing/ota/upgrade",
        "iot/+/+/thing/+/+",
        NULL
    };

    for (int i = 0; test_topics[i] != NULL; i++) {
        LOG_I("尝试订阅topic: %s\n", test_topics[i]);
        int result = mqtt_utils_subscribe(mqtt_conf, test_topics[i], 1);
        LOG_I("订阅结果: %d\n", result);
        sleep(1); /* wait one second before the next attempt */
    }
    LOG_I("=== topic尝试完成 ===\n");
}