取消没必要的打印

增加灯条心跳漏报机制
增加模拟 topic
This commit is contained in:
zzh 2025-12-23 11:19:36 +08:00
parent 9ccab6c22f
commit 72a94babe5
2 changed files with 282 additions and 103 deletions

189
main.c
View File

@ -40,6 +40,7 @@ pthread_t pt_mqtt_status;
pthread_t pt_station_heartbeat;
pthread_t pt_simulate_light;
pthread_t pt_all_light;
pthread_t pt_simulate_mqtt_topic;
uart_utils_t uartSend = {0};
uart_utils_t uartRecvData = {0};
uart_utils_t uartRecvBack = {0};
@ -61,6 +62,7 @@ uint8_t allLightFlash=3; // 全场亮灯闪烁模式,默认闪烁
uint8_t allLightSound=0; // 全场亮灯蜂鸣,默认关闭
int allLightDuration=30; // 全场亮灯持续时间(秒)
bool isOtaEnable=false;
bool isMqttConnected=false; // MQTT连接状态标志
jt_only_tag_t onlyTags[200]={0};
int tagCount=0;
@ -143,6 +145,7 @@ void *thread_station_heartbeat(void *arg);
void *thread_heartbeat_check(void *arg);
void *thread_simulate_light(void *arg);
void *thread_all_light(void *arg);
void *thread_simulate_mqtt_topic(void *arg);
void update_lightbar_heartbeat(uint32_t tagCode);
void report_lightbar_login(uint32_t tagCode);
void report_lightbar_logout(uint32_t tagCode);
@ -253,7 +256,12 @@ void update_lightbar_heartbeat(uint32_t tagCode) {
if (!lightbarHeartbeat[found].isOnline) {
lightbarHeartbeat[found].isOnline = true;
LOG_I("Lightbar %08X back online\n", tagCode);
report_lightbar_login(tagCode);
// 只有MQTT连接后才上报登录
if (isMqttConnected) {
report_lightbar_login(tagCode);
} else {
LOG_I("MQTT not connected yet, skip lightbar login report\n");
}
}
} else if (lightbarHeartbeatCount < MAX_LIGHTBAR_NUM) {
// 新灯条心跳存储
@ -262,7 +270,12 @@ void update_lightbar_heartbeat(uint32_t tagCode) {
lightbarHeartbeat[lightbarHeartbeatCount].isOnline = true;
lightbarHeartbeatCount++;
LOG_I("New lightbar %08X registered, total: %d\n", tagCode, lightbarHeartbeatCount);
report_lightbar_login(tagCode);
// 只有MQTT连接后才上报登录
if (isMqttConnected) {
report_lightbar_login(tagCode);
} else {
LOG_I("MQTT not connected yet, skip lightbar login report\n");
}
}
pthread_mutex_unlock(&heartbeatMutex);
@ -303,33 +316,139 @@ void *thread_simulate_light(void *arg){
LOG_I("thread_simulate_light started\n");
char sim_payload[1024] = {0};
char device_specific_payload[1024] = {0};
char last_deviceName[256] = {0}; // 记录上次的deviceName
int first_run = 1; // 首次运行标志
while(1){
sleep(30); // 每30秒执行一次
// 从/root/payload文件读取内容
FILE *fp = fopen("/root/payload", "r");
if(fp == NULL){
LOG_I("simulate_light: cannot open /root/payload\n");
// 检查是否有从topic解析出的deviceName
if(strlen(g_mqtt_deviceName) > 0) {
// 检查deviceName是否有变化
if(strcmp(g_mqtt_deviceName, last_deviceName) != 0 || first_run) {
LOG_I("simulate_light: detected new deviceName from topic: %s\n", g_mqtt_deviceName);
// 保存当前deviceName
strcpy(last_deviceName, g_mqtt_deviceName);
first_run = 0;
// 立即响应使用deviceName构建设备特定的payload
snprintf(device_specific_payload, sizeof(device_specific_payload),
"{\"deviceId\":\"%s\",\"params\":{\"LightSwitch\":1}}",
g_mqtt_deviceName);
LOG_I("simulate_light: immediate response with device_specific payload: %s\n", device_specific_payload);
// 直接把设备特定的payload放入消息队列
PutDataIntoMQueue(device_specific_payload);
} else {
LOG_I("simulate_light: deviceName unchanged: %s, skipping duplicate simulation\n", g_mqtt_deviceName);
}
} else {
// 从/root/payload文件读取内容原有逻辑
FILE *fp = fopen("/root/payload", "r");
if(fp == NULL){
LOG_I("simulate_light: cannot open /root/payload and no deviceName from topic\n");
continue;
}
memset(sim_payload, 0, sizeof(sim_payload));
if(fgets(sim_payload, sizeof(sim_payload), fp) != NULL){
// 去掉换行符
int len = strlen(sim_payload);
if(len > 0 && sim_payload[len-1] == '\n'){
sim_payload[len-1] = '\0';
}
LOG_I("simulate_light: using payload from file\n");
LOG_I("payload: %s\n", sim_payload);
// 直接把payload放入消息队列由thread_mqtt_recv处理
PutDataIntoMQueue(sim_payload);
}
fclose(fp);
}
}
pthread_exit(NULL);
}
/*================================================================================*/
// 模拟MQTT topic线程 - 监控文件并模拟接收MQTT消息
void *thread_simulate_mqtt_topic(void *arg){
LOG_I("thread_simulate_mqtt_topic started\n");
char sim_topic[512] = {0};
char sim_payload[1024] = {0};
while(1){
sleep(2); // 每2秒检查一次
// 检查模拟topic文件
FILE *fp_topic = fopen("/root/sim_topic", "r");
if(fp_topic == NULL){
continue;
}
memset(sim_payload, 0, sizeof(sim_payload));
if(fgets(sim_payload, sizeof(sim_payload), fp) != NULL){
// 读取topic内容
memset(sim_topic, 0, sizeof(sim_topic));
if(fgets(sim_topic, sizeof(sim_topic), fp_topic) != NULL){
// 去掉换行符
int len = strlen(sim_payload);
if(len > 0 && sim_payload[len-1] == '\n'){
sim_payload[len-1] = '\0';
int len = strlen(sim_topic);
if(len > 0 && sim_topic[len-1] == '\n'){
sim_topic[len-1] = '\0';
}
LOG_I("simulate_light: putting payload into queue\n");
LOG_I("payload: %s\n", sim_payload);
// 直接把payload放入消息队列由thread_mqtt_recv处理
PutDataIntoMQueue(sim_payload);
// 检查是否是lightOperate/invoke topic
if(strstr(sim_topic, "/sys/") && strstr(sim_topic, "/thing/service/lightOperate/invoke")) {
// 解析topic获取deviceName
char productKey[256] = {0};
char deviceName[256] = {0};
// 使用类似mqtt_utils_parse_sys_lightOperate_invoke_topic的逻辑
if(sscanf(sim_topic, "/sys/%255[^/]/%255[^/]/thing/service/lightOperate/invoke", productKey, deviceName) == 2) {
LOG_I("simulate_mqtt_topic: parsed productKey=%s, deviceName=%s\n", productKey, deviceName);
// 设置全局变量模拟mqtt_utils_message_arrived的行为
memset(g_mqtt_deviceName, 0, sizeof(g_mqtt_deviceName));
strncpy(g_mqtt_deviceName, deviceName, sizeof(g_mqtt_deviceName) - 1);
LOG_I("simulate_mqtt_topic: set g_mqtt_deviceName=%s\n", g_mqtt_deviceName);
// 读取payload文件
FILE *fp_payload = fopen("/root/sim_payload", "r");
if(fp_payload != NULL) {
memset(sim_payload, 0, sizeof(sim_payload));
if(fgets(sim_payload, sizeof(sim_payload), fp_payload) != NULL) {
// 去掉换行符
len = strlen(sim_payload);
if(len > 0 && sim_payload[len-1] == '\n'){
sim_payload[len-1] = '\0';
}
LOG_I("simulate_mqtt_topic: sending payload to queue: %s\n", sim_payload);
// 将payload放入消息队列模拟接收到的MQTT消息
PutDataIntoMQueue(sim_payload);
}
fclose(fp_payload);
// 删除payload文件
unlink("/root/sim_payload");
} else {
// 使用默认payload
char default_payload[] = "{\"method\":\"thing.service.lightOperate\",\"params\":{\"LightSwitch\":1}}";
LOG_I("simulate_mqtt_topic: using default payload: %s\n", default_payload);
PutDataIntoMQueue(default_payload);
}
}
}
}
fclose(fp);
fclose(fp_topic);
// 删除topic文件
unlink("/root/sim_topic");
}
pthread_exit(NULL);
}
@ -510,7 +629,7 @@ void report_tag(void){
while(1){
sleep(60*10);
LOG_I("report_tag\n");
light_status_report();
//light_status_report();
}
}
@ -1834,8 +1953,13 @@ void *thread_uart_recv_back(void *arg){
while(1){
uart_data_receive_data_back(&uartRecvBack,&parmAck,&tagCodeHead,&tagCode,&tagSignal,&totalLen,&tagFeature,
&count,&batteryV,&version,&ledCtrl,&signCode,&reserve,&lableParm);
//LOG_I("recv_back:%04x,%04x,tag:%08x,%02x,%02x,%02x,%02x,battery:%02x,%04x,%02x,%04x,reserve:%04x,%08x\n",
//parmAck,tagCodeHead,tagCode,tagSignal,totalLen,tagFeature,count,batteryV,version,ledCtrl,signCode,reserve,lableParm);
// 打印除心跳外的所有接收数据
if(tagFeature != 0xFF) {
LOG_I("recv_back:%04x,%04x,tag:%08x,%02x,%02x,%02x,%02x,battery:%02x,%04x,%02x,%04x,reserve:%04x,%08x\n",
parmAck,tagCodeHead,tagCode,tagSignal,totalLen,tagFeature,count,batteryV,version,ledCtrl,signCode,reserve,lableParm);
}
PutDataIntoQueue(tagCode,batteryV,reserve);
if(tagFeature==0xFF){
//LOG_I("heart beat from lightbar %06X\n", tagCode);
@ -2074,7 +2198,7 @@ void addOnlyTag(uint32_t tagname,uint16_t battery,uint16_t reserve){
}
}
if(tagCount==100){
light_status_report();
//light_status_report();
}
}
@ -2711,13 +2835,14 @@ int main(int argc, char *argv[])
pthread_detach(pt_station_heartbeat);
}
ret = pthread_create(&pt_heartbeat_check,NULL,thread_heartbeat_check,NULL);
if(ret!=0){
LOG_I("pthread_create heartbeat_check fail\n");
}else{
LOG_I("pthread_create heartbeat_check success\n");
pthread_detach(pt_heartbeat_check);
}
// 心跳检测线程移到MQTT连接成功后启动避免MQTT未连接时上报失败
// ret = pthread_create(&pt_heartbeat_check,NULL,thread_heartbeat_check,NULL);
// if(ret!=0){
// LOG_I("pthread_create heartbeat_check fail\n");
// }else{
// LOG_I("pthread_create heartbeat_check success\n");
// pthread_detach(pt_heartbeat_check);
// }
#if 1
ret = pthread_create(&pt_simulate_light,NULL,thread_simulate_light,NULL);
@ -2735,6 +2860,14 @@ int main(int argc, char *argv[])
LOG_I("pthread_create all_light success\n");
pthread_detach(pt_all_light);
}
ret = pthread_create(&pt_simulate_mqtt_topic,NULL,thread_simulate_mqtt_topic,NULL);
if(ret!=0){
LOG_I("pthread_create simulate_mqtt_topic fail\n");
}else{
LOG_I("pthread_create simulate_mqtt_topic success\n");
pthread_detach(pt_simulate_mqtt_topic);
}
#endif
#if 0
readresult=file_to_buffer("mqttRawPassword",&len);

View File

@ -49,6 +49,7 @@ char nativeUpgradeTopicName[1024] = "";
char subscribeTopics[10][1024] = {""};
int subscribeTopicCount = 0;
extern char softwareVersion[16];
extern bool isMqttConnected;
typedef struct{
enum MQTTASYNC_TRACE_LEVELS level;
char name[64];
@ -237,8 +238,8 @@ static int mqtt_utils_parse_sys_lightOperate_invoke_topic(const char *topic, cha
}
int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message){
LOG_I("---------------%s------------------\n",__func__);
LOG_I("收到消息topic: %s\n", topicName ? topicName : "(null)");
// LOG_I("---------------%s------------------\n",__func__);
// LOG_I("收到消息topic: %s\n", topicName ? topicName : "(null)");
// 检查消息是否来自任何一个订阅的topic
int topic_matched = 0;
@ -256,16 +257,16 @@ int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQT
char field2[256] = {0};
if (mqtt_utils_parse_sys_lightOperate_invoke_topic(topicName, field1, sizeof(field1), field2,
sizeof(field2)) == 0) {
LOG_I("lightOperate/invoke 匹配字段: productKey=%s, deviceName=%s\n", field1, field2);
// LOG_I("lightOperate/invoke 匹配字段: productKey=%s, deviceName=%s\n", field1, field2);
// 将deviceName存储到全局变量中
memset(g_mqtt_deviceName, 0, sizeof(g_mqtt_deviceName));
strncpy(g_mqtt_deviceName, field2, sizeof(g_mqtt_deviceName) - 1);
} else {
LOG_I("lightOperate/invoke topic字段解析失败: %s\n", topicName);
// LOG_I("lightOperate/invoke topic字段解析失败: %s\n", topicName);
}
}
LOG_I("消息匹配topic[%d]: %s\n", i, subscribeTopics[i]);
// LOG_I("消息匹配topic[%d]: %s\n", i, subscribeTopics[i]);
if (message != NULL && message->payload != NULL && message->payloadlen > 0) {
char payload_buf[1024] = {0};
size_t copy_len = (size_t)message->payloadlen;
@ -276,7 +277,7 @@ int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQT
payload_buf[copy_len] = '\0';
PutDataIntoMQueue(payload_buf);
} else {
LOG_I("消息payload为空跳过入队\n");
// LOG_I("消息payload为空跳过入队\n");
}
topic_matched = 1;
break;
@ -284,7 +285,7 @@ int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQT
}
if(!topic_matched) {
LOG_I("消息topic不匹配任何订阅的topic: %s\n", topicName);
// LOG_I("消息topic不匹配任何订阅的topic: %s\n", topicName);
}
MQTTAsync_freeMessage(&message);
@ -295,15 +296,15 @@ int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQT
}
void mqtt_utils_connected(void *context, char *cause){
LOG_I("=== mqtt_utils_connected 函数被调用 ===\n");
// LOG_I("=== mqtt_utils_connected 函数被调用 ===\n");
if (cause != NULL) {
LOG_I("%s cause:%s\n",__func__, cause);
// LOG_I("%s cause:%s\n",__func__, cause);
} else {
LOG_I("%s\n",__func__);
// LOG_I("%s\n",__func__);
}
connect_failure_times=0;
LOG_I(" MQTT连接成功开始订阅%d个topic\n", subscribeTopicCount);
// LOG_I(" MQTT连接成功开始订阅%d个topic\n", subscribeTopicCount);
// 订阅所有设置的topic
for(int i = 0; i < subscribeTopicCount; i++) {
@ -314,29 +315,74 @@ void mqtt_utils_connected(void *context, char *cause){
LOG_I("订阅topic[%d]: %s\n", i, subscribeTopics[i]);
int subscribe_result = mqtt_utils_subscribe(mqtt_conf, subscribeTopics[i], 1);
LOG_I("订阅topic[%d]请求结果: %d\n", i, subscribe_result);
// LOG_I("订阅topic[%d]请求结果: %d\n", i, subscribe_result);
}
// 添加通配符订阅来接收所有消息(用于调试)
LOG_I("=== 添加通配符订阅用于调试 ===\n");
// LOG_I("=== 添加通配符订阅用于调试 ===\n");
int wildcard_subscribe_result = mqtt_utils_subscribe(mqtt_conf, "#", 1);
LOG_I("通配符订阅请求结果: %d\n", wildcard_subscribe_result);
LOG_I("=== 通配符订阅完成 ===\n");
// LOG_I("通配符订阅请求结果: %d\n", wildcard_subscribe_result);
// LOG_I("=== 通配符订阅完成 ===\n");
// 订阅公共topic用于测试
LOG_I("=== 订阅公共topic用于测试 ===\n");
// LOG_I("=== 订阅公共topic用于测试 ===\n");
int public_subscribe_result = mqtt_utils_subscribe(mqtt_conf, "/test/public/topic", 0);
LOG_I("公共topic订阅请求结果: %d\n", public_subscribe_result);
LOG_I("=== 公共topic订阅完成 ===\n");
// LOG_I("公共topic订阅请求结果: %d\n", public_subscribe_result);
// LOG_I("=== 公共topic订阅完成 ===\n");
system("echo 1 > /sys/class/gpio/gpio114/value");//yellow ok
station_status_report();
LOG_I("=== mqtt_utils_connected 函数执行完成 ===\n");
// 设置MQTT连接标志
isMqttConnected = true;
// LOG_I("MQTT connected, isMqttConnected = true\n");
// 补报所有已在线的灯条登录信息
typedef struct {
uint32_t tagCode;
time_t lastHeartbeat;
bool isOnline;
} lightbar_heartbeat_t;
extern int lightbarHeartbeatCount;
extern lightbar_heartbeat_t lightbarHeartbeat[];
extern pthread_mutex_t heartbeatMutex;
extern void report_lightbar_login(uint32_t tagCode);
pthread_mutex_lock(&heartbeatMutex);
LOG_I("Reporting login for %d online lightbars again\n", lightbarHeartbeatCount);
for (int i = 0; i < lightbarHeartbeatCount; i++) {
if (lightbarHeartbeat[i].isOnline) {
LOG_I("Report again lightbar %08X login\n", lightbarHeartbeat[i].tagCode);
report_lightbar_login(lightbarHeartbeat[i].tagCode);
}
}
pthread_mutex_unlock(&heartbeatMutex);
// 启动心跳检测线程(只启动一次)
static bool heartbeat_thread_started = false;
if (!heartbeat_thread_started) {
extern pthread_t pt_heartbeat_check;
extern void *thread_heartbeat_check(void *arg);
int ret = pthread_create(&pt_heartbeat_check, NULL, thread_heartbeat_check, NULL);
if(ret != 0){
LOG_I("pthread_create heartbeat_check fail\n");
} else {
LOG_I("pthread_create heartbeat_check success\n");
pthread_detach(pt_heartbeat_check);
heartbeat_thread_started = true;
}
}
// LOG_I("=== mqtt_utils_connected 函数执行完成 ===\n");
}
void mqtt_utils_disconnected(void *context, MQTTProperties* props, enum MQTTReasonCodes rc){
//LOG_I("%s reason code %s",__func__ ,MQTTReasonCode_toString(rc));
//mqtt_led_net_status_judge();
// 设置MQTT连接标志为false
isMqttConnected = false;
// LOG_I("MQTT disconnected, set isMqttConnected = false\n");
}
void mqtt_utils_on_connect_success(void *context, MQTTAsync_successData *response){
@ -347,16 +393,16 @@ void mqtt_utils_on_connect_success(void *context, MQTTAsync_successData *respons
void mqtt_utils_on_connect_failure(void *context, MQTTAsync_failureData *response){
//LOG_I("%s connect failed, rc %s\n", __func__, response ? MQTTAsync_strerror(response->code) : "none");
// mqtt_led_net_status_judge();
LOG_I("MQTT连接失败 - 错误代码: %s\n", response ? MQTTAsync_strerror(response->code) : "未知错误");
// LOG_I("MQTT连接失败 - 错误代码: %s\n", response ? MQTTAsync_strerror(response->code) : "未知错误");
if (response && response->message) {
LOG_I("MQTT连接失败 - 错误信息: %s\n", response->message);
// LOG_I("MQTT连接失败 - 错误信息: %s\n", response->message);
}
mqtt_net_failure(&connect_failure_times);
}
static int mqtt_utils_on_ssl_error(const char *str, size_t len, void *context){
MQTTAsync client = (MQTTAsync)context;
LOG_I("%s ssl error: %s\n",__func__, str);
// LOG_I("%s ssl error: %s\n",__func__, str);
return 0;
}
@ -365,59 +411,59 @@ void mqtt_utils_on_publish_success(void *context, MQTTAsync_successData *respons
}
void mqtt_utils_on_publish_failure(void *context, MQTTAsync_failureData *response){
LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
// LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
// 如果是连接相关的问题,记录详细信息
if (response && response->code == MQTTASYNC_OPERATION_INCOMPLETE) {
LOG_I("发布失败:操作在完成前被丢弃,可能是连接状态不稳定\n");
// LOG_I("发布失败:操作在完成前被丢弃,可能是连接状态不稳定\n");
} else if (response && response->code == MQTTASYNC_DISCONNECTED) {
LOG_I("发布失败:客户端已断开连接\n");
// LOG_I("发布失败:客户端已断开连接\n");
}
}
void mqtt_utils_on_subscribe_success(void *context, MQTTAsync_successData *response){
LOG_I("=== MQTT订阅成功回调被调用 ===\n");
LOG_I("MQTT订阅成功\n");
// LOG_I("=== MQTT订阅成功回调被调用 ===\n");
// LOG_I("MQTT订阅成功\n");
}
void mqtt_utils_on_subscribe_failure(void *context, MQTTAsync_failureData *response){
LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
// LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
}
void mqtt_utils_on_disconnect_success(void *context, MQTTAsync_successData *response){
LOG_I("%s\n",__func__);
// LOG_I("%s\n",__func__);
//mqtt_led_net_status_judge();
}
int mqtt_utils_subscribe(mqtt_utils_t *mqtt_utils, char *topic, int qos){
int rc = 0;
LOG_I("开始订阅topic: %s, qos: %d\n", topic, qos);
LOG_I("MQTT客户端指针: %p\n", mqtt_utils->client);
LOG_I("订阅选项指针: %p\n", &mqtt_utils->sub_opts);
// LOG_I("开始订阅topic: %s, qos: %d\n", topic, qos);
// LOG_I("MQTT客户端指针: %p\n", mqtt_utils->client);
// LOG_I("订阅选项指针: %p\n", &mqtt_utils->sub_opts);
// 检查参数有效性
if (topic == NULL || strlen(topic) == 0) {
LOG_I("错误topic参数无效\n");
// LOG_I("错误topic参数无效\n");
return -1;
}
if (mqtt_utils->client == NULL) {
LOG_I("错误MQTT客户端未初始化\n");
// LOG_I("错误MQTT客户端未初始化\n");
return -1;
}
// 跳过连接状态检查,直接进行订阅
LOG_I("跳过连接状态检查,直接进行订阅\n");
LOG_I("准备调用 MQTTAsync_subscribe...\n");
// LOG_I("跳过连接状态检查,直接进行订阅\n");
// LOG_I("准备调用 MQTTAsync_subscribe...\n");
rc = MQTTAsync_subscribe(mqtt_utils->client, topic, qos, &mqtt_utils->sub_opts);
LOG_I("MQTTAsync_subscribe 调用完成,返回值: %d\n", rc);
// LOG_I("MQTTAsync_subscribe 调用完成,返回值: %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("订阅失败,错误代码: %s\n", MQTTAsync_strerror(rc));
// LOG_I("订阅失败,错误代码: %s\n", MQTTAsync_strerror(rc));
} else {
LOG_I("订阅请求已发送,等待回调\n");
// LOG_I("订阅请求已发送,等待回调\n");
}
return rc;
}
@ -448,16 +494,16 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
int ret = -1;
int rc = 0;
LOG_I("=== 开始MQTT连接初始化 ===\n");
LOG_I("ClientId: %s\n", mqtt_conf->clientid);
LOG_I("Username: %s\n", mqtt_conf->username);
LOG_I("Password: %s\n", mqtt_conf->password);
LOG_I("Host: %s\n", mqtt_conf->host);
LOG_I("Port: %d\n", mqtt_conf->port);
// LOG_I("=== 开始MQTT连接初始化 ===\n");
// LOG_I("ClientId: %s\n", mqtt_conf->clientid);
// LOG_I("Username: %s\n", mqtt_conf->username);
// LOG_I("Password: %s\n", mqtt_conf->password);
// LOG_I("Host: %s\n", mqtt_conf->host);
// LOG_I("Port: %d\n", mqtt_conf->port);
// 提前设置topic名称确保在连接回调中可用
sprintf(nativeInvokTopicName,"/iot%s/thing/ota/upgrade",mqtt_conf->username);
LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);
// LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);
// 设置多个订阅topic
subscribeTopicCount = 0;
@ -469,10 +515,10 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
sprintf(subscribeTopics[subscribeTopicCount++], "/sys/WcSubLightStrip/AD1000014C11/thing/service/lightOperate/invoke");
sprintf(subscribeTopics[subscribeTopicCount++], "/sys/+/+/thing/service/lightOperate/invoke");
LOG_I("设置了%d个订阅topic:\n", subscribeTopicCount);
for(int i = 0; i < subscribeTopicCount; i++) {
LOG_I("Topic[%d]: %s\n", i, subscribeTopics[i]);
}
// LOG_I("设置了%d个订阅topic:\n", subscribeTopicCount);
// for(int i = 0; i < subscribeTopicCount; i++) {
// LOG_I("Topic[%d]: %s\n", i, subscribeTopics[i]);
// }
infos = MQTTAsync_getVersionInfo();
printVersionInfo(infos);
@ -481,7 +527,7 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
//snprintf(url, sizeof(url), "%s://%s:%d", mqtt_conf->protocol, mqtt_conf->host, mqtt_conf->port);
snprintf(url, sizeof(url), "tcp://%s:%d", mqtt_conf->host, mqtt_conf->port);
LOG_I("MQTT URL: %s\n", url);
// LOG_I("MQTT URL: %s\n", url);
create_opts.sendWhileDisconnected = 0;
rc = MQTTAsync_createWithOptions(&mqtt_conf->client,
@ -490,54 +536,54 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
MQTTCLIENT_PERSISTENCE_NONE,
NULL,
&create_opts);
LOG_I("MQTTAsync_create rc = %d\n", rc);
// LOG_I("MQTTAsync_create rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("MQTT客户端创建失败: %s\n", MQTTAsync_strerror(rc));
ret = -1;
goto error;
}
LOG_I("MQTTAsync_create succes s\n");
// LOG_I("MQTTAsync_create success\n");
// 设置所有必要的回调函数
LOG_I("设置MQTT回调函数\n");
// LOG_I("设置MQTT回调函数\n");
rc = MQTTAsync_setConnected(mqtt_conf->client, NULL, mqtt_utils_connected);
LOG_I("MQTTAsync_setConnected rc = %d\n", rc);
// LOG_I("MQTTAsync_setConnected rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("设置连接回调失败: %s\n", MQTTAsync_strerror(rc));
// LOG_I("设置连接回调失败: %s\n", MQTTAsync_strerror(rc));
ret = -2;
goto error;
}
LOG_I("MQTTAsync_setConnected success\n");
// LOG_I("MQTTAsync_setConnected success\n");
rc = MQTTAsync_setDisconnected(mqtt_conf->client, NULL, mqtt_utils_disconnected);
LOG_I("MQTTAsy nc_setDisconnected rc = %d\n", rc);
// LOG_I("MQTTAsync_setDisconnected rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("设置断开回调失败: %s\n", MQTTAsync_strerror(rc));
// LOG_I("设置断开回调失败: %s\n", MQTTAsync_strerror(rc));
ret = -2;
goto error;
}
LOG_I("MQTTAsync_setDisconnected success\n");
// LOG_I("MQTTAsync_setDisconnected success\n");
// 设置消息到达回调
rc = MQTTAsync_setMessageArrivedCallback(mqtt_conf->client, NULL, mqtt_utils_message_arrived);
LOG_I("MQTTAsync_setMessageArrivedCallback rc = %d\n", rc);
// LOG_I("MQTTAsync_setMessageArrivedCallback rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("设置消息到达回调失败: %s\n", MQTTAsync_strerror(rc));
// LOG_I("设置消息到达回调失败: %s\n", MQTTAsync_strerror(rc));
ret = -2;
goto error;
}
LOG_I("MQTTAsync_setMessageArrivedCallback success\n");
// LOG_I("MQTTAsync_setMessageArrivedCallback success\n");
// 设置连接丢失回调
rc = MQTTAsync_setConnectionLostCallback(mqtt_conf->client, NULL, mqtt_utils_connection_lost);
LOG_I("MQTTAsync_setConnectionLostCallback rc = %d\n", rc);
// LOG_I("MQTTAsync_setConnectionLostCallback rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("设置连接丢失回调失败: %s\n", MQTTAsync_strerror(rc));
// LOG_I("设置连接丢失回调失败: %s\n", MQTTAsync_strerror(rc));
ret = -2;
goto error;
}
LOG_I("MQTTAsync_setConnectionLostCallback success\n");
// LOG_I("MQTTAsync_setConnectionLostCallback success\n");
/* connect option */
conn_opts.onSuccess = mqtt_utils_on_connect_success;
@ -579,15 +625,15 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
conn_opts.ssl = &ssl_opts;
}
LOG_I("正在连接MQTT服务器...\n");
// LOG_I("正在连接MQTT服务器...\n");
rc = MQTTAsync_connect(mqtt_conf->client, &conn_opts);
LOG_I("MQTTAsync_connect rc = %d\n", rc);
// LOG_I("MQTTAsync_connect rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("MQTT连接启动失败: %s\n", MQTTAsync_strerror(rc));
ret = -1;
goto error;
}
LOG_I("MQTTAsync_connect 启动成功\n");
// LOG_I("MQTTAsync_connect 启动成功\n");
mqtt_conf->sub_opts = sub_opts;
mqtt_conf->pub_opts = pub_opts;
@ -604,12 +650,12 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);
// 验证回调函数设置
LOG_I("=== 验证回调函数设置 ===\n");
LOG_I("消息到达回调函数地址: %p\n", mqtt_utils_message_arrived);
LOG_I("连接成功回调函数地址: %p\n", mqtt_utils_connected);
LOG_I("订阅成功回调函数地址: %p\n", mqtt_utils_on_subscribe_success);
LOG_I("订阅失败回调函数地址: %p\n", mqtt_utils_on_subscribe_failure);
LOG_I("=== 回调函数验证完成 ===\n");
// LOG_I("=== 验证回调函数设置 ===\n");
// LOG_I("消息到达回调函数地址: %p\n", mqtt_utils_message_arrived);
// LOG_I("连接成功回调函数地址: %p\n", mqtt_utils_connected);
// LOG_I("订阅成功回调函数地址: %p\n", mqtt_utils_on_subscribe_success);
// LOG_I("订阅失败回调函数地址: %p\n", mqtt_utils_on_subscribe_failure);
// LOG_I("=== 回调函数验证完成 ===\n");
ret = 0;
error: