Compare commits

..

7 Commits

Author SHA1 Message Date
zzh
fc95e3561a 减少mqtt断联 2025-12-25 10:15:56 +08:00
zzh
f910a4d701 bug fix 2025-12-24 16:13:04 +08:00
zzh
67e579ac27 新增灭灯上报 2025-12-24 13:41:09 +08:00
zzh
30700b9178 更新亮灯成功上报,mqtt 连接成功时随机闪烁黄灯 2025-12-24 10:25:33 +08:00
zzh
72a94babe5 取消没必要的打印
增加灯条心跳漏报机制
增加模拟 topic
2025-12-23 11:19:36 +08:00
zzh
9ccab6c22f 修复编译错误 2025-12-22 16:21:29 +08:00
zzh
16b46d1b00 同步改版目录内容到 meituan_2.0_New_core_board 2025-12-22 11:53:28 +08:00
21 changed files with 1107 additions and 246 deletions

BIN
hiredis/alloc.o Normal file

Binary file not shown.

BIN
hiredis/async.o Normal file

Binary file not shown.

BIN
hiredis/hiredis-test Executable file

Binary file not shown.

BIN
hiredis/hiredis.o Normal file

Binary file not shown.

11
hiredis/hiredis.pc Normal file
View File

@ -0,0 +1,11 @@
prefix=/usr/local
exec_prefix=${prefix}
libdir=/usr/local/lib
includedir=/usr/local/include
pkgincludedir=/usr/local/include/hiredis
Name: hiredis
Description: Minimalistic C client library for Redis.
Version: 1.3.0
Libs: -L${libdir} -lhiredis
Cflags: -I${pkgincludedir} -I${includedir} -D_FILE_OFFSET_BITS=64

BIN
hiredis/libhiredis.a Normal file

Binary file not shown.

BIN
hiredis/libhiredis.so Executable file

Binary file not shown.

BIN
hiredis/net.o Normal file

Binary file not shown.

BIN
hiredis/read.o Normal file

Binary file not shown.

BIN
hiredis/sds.o Normal file

Binary file not shown.

BIN
hiredis/sockcompat.o Normal file

Binary file not shown.

BIN
hiredis/test.o Normal file

Binary file not shown.

849
main.c

File diff suppressed because it is too large Load Diff

View File

@ -2722,8 +2722,8 @@ static void setRetryLoopInterval(int keepalive)
if (proposed < 1) if (proposed < 1)
proposed = 1; proposed = 1;
else if (proposed > 5) else if (proposed > 60)
proposed = 5; proposed = 60; // 提高上限到60秒适应更长的keepalive
if (proposed < retryLoopInterval) if (proposed < retryLoopInterval)
retryLoopInterval = proposed; retryLoopInterval = proposed;
} }

View File

@ -1323,8 +1323,8 @@ static void setRetryLoopInterval(int keepalive)
if (proposed < 1) if (proposed < 1)
proposed = 1; proposed = 1;
else if (proposed > 5) else if (proposed > 60)
proposed = 5; proposed = 60; // 提高上限到60秒适应更长的keepalive
if (proposed < retryLoopInterval) if (proposed < retryLoopInterval)
retryLoopInterval = proposed; retryLoopInterval = proposed;
} }

View File

@ -633,7 +633,7 @@ void MQTTProtocol_keepalive(time_t now)
} }
else else
{ {
//LOG_I("PINGREQ send success\n"); LOG_I("PINGREQ send success at %ld\n", (long)now);
client->net.lastSent = now; client->net.lastSent = now;
client->ping_outstanding = 1; client->ping_outstanding = 1;
} }
@ -641,9 +641,25 @@ void MQTTProtocol_keepalive(time_t now)
} }
else else
{ {
Log(TRACE_PROTOCOL, -1, "PINGRESP not received in keepalive interval for client %s on socket %d, disconnecting", client->clientID, client->net.socket); // 增加容错允许ping_outstanding为2时再等待一个周期
LOG_I("%s:PINGRESP not received\n",__func__); if (client->ping_outstanding >= 2) {
MQTTProtocol_closeSession(client, 1); Log(TRACE_PROTOCOL, -1, "PINGRESP not received after 2 keepalive intervals for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
LOG_I("%s:PINGRESP not received after 2 attempts\n",__func__);
MQTTProtocol_closeSession(client, 1);
} else {
// 第二次发送PINGREQ
if (Socket_noPendingWrites(client->net.socket)) {
if (MQTTPacket_send_pingreq(&client->net, client->clientID) != TCPSOCKET_COMPLETE) {
Log(TRACE_PROTOCOL, -1, "Error sending second PINGREQ for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
LOG_I("%s:Error sending second PINGREQ for client %s on socket %d, disconnecting\n",__func__,client->clientID, client->net.socket);
MQTTProtocol_closeSession(client, 1);
} else {
LOG_I("Second PINGREQ send success at %ld\n", (long)now);
client->net.lastSent = now;
client->ping_outstanding++;
}
}
}
} }
} }
} }

View File

@ -178,7 +178,7 @@ int MQTTProtocol_handlePingresps(void* pack, int sock)
client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content); client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
Log(LOG_PROTOCOL, 21, NULL, sock, client->clientID); Log(LOG_PROTOCOL, 21, NULL, sock, client->clientID);
client->ping_outstanding = 0; client->ping_outstanding = 0;
//LOG_I("PINGRESP receive success\n"); LOG_I("PINGRESP received successfully, ping_outstanding cleared\n");
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
} }

View File

@ -25,6 +25,9 @@
#include <sys/types.h> #include <sys/types.h>
#include <sys/ipc.h> #include <sys/ipc.h>
#include <sys/msg.h> #include <sys/msg.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>
#include "mqtt_utils.h" #include "mqtt_utils.h"
@ -49,6 +52,7 @@ char nativeUpgradeTopicName[1024] = "";
char subscribeTopics[10][1024] = {""}; char subscribeTopics[10][1024] = {""};
int subscribeTopicCount = 0; int subscribeTopicCount = 0;
extern char softwareVersion[16]; extern char softwareVersion[16];
extern bool isMqttConnected;
typedef struct{ typedef struct{
enum MQTTASYNC_TRACE_LEVELS level; enum MQTTASYNC_TRACE_LEVELS level;
char name[64]; char name[64];
@ -119,8 +123,56 @@ json_error:
json_object_put(root); json_object_put(root);
} }
// 黄灯闪烁相关
static pthread_t pt_yellow_led_flash;
static volatile bool yellow_led_flash_running = false;
static pthread_mutex_t yellow_led_mutex = PTHREAD_MUTEX_INITIALIZER;
void *thread_yellow_led_flash(void *arg);
void *thread_yellow_led_flash(void *arg) {
LOG_I("Yellow LED flash thread started\n");
// 初始化随机种子
srand((unsigned int)time(NULL));
while (yellow_led_flash_running) {
// 生成随机延时 (100ms - 500ms)
int delay_us = 100000 + (rand() % 400000); // 100000-500000 微秒
// 点亮黄灯
system("echo 1 > /sys/class/gpio/gpio114/value");
// 随机延时保持亮
usleep(delay_us);
// 检查是否需要退出
if (!yellow_led_flash_running) break;
// 熄灭黄灯
system("echo 0 > /sys/class/gpio/gpio114/value");
// 随机延时保持灭
usleep(delay_us);
}
// 确保退出时灯是灭的
system("echo 0 > /sys/class/gpio/gpio114/value");
LOG_I("Yellow LED flash thread exited\n");
return NULL;
}
void mqtt_net_failure(int *failure_times){ void mqtt_net_failure(int *failure_times){
system("echo 0 > /sys/class/gpio/gpio113/value"); // 停止黄灯闪烁
pthread_mutex_lock(&yellow_led_mutex);
if (yellow_led_flash_running) {
yellow_led_flash_running = false;
pthread_join(pt_yellow_led_flash, NULL);
LOG_I("Yellow LED flash thread stopped\n");
}
pthread_mutex_unlock(&yellow_led_mutex);
system("echo 0 > /sys/class/gpio/gpio114/value");
if(failure_times != NULL){ if(failure_times != NULL){
(*failure_times)++; (*failure_times)++;
LOG_I("mqtt net failure_times = %d\n", *failure_times); LOG_I("mqtt net failure_times = %d\n", *failure_times);
@ -237,8 +289,8 @@ static int mqtt_utils_parse_sys_lightOperate_invoke_topic(const char *topic, cha
} }
int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message){ int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message){
LOG_I("---------------%s------------------\n",__func__); // LOG_I("---------------%s------------------\n",__func__);
LOG_I("收到消息topic: %s\n", topicName ? topicName : "(null)"); // LOG_I("收到消息topic: %s\n", topicName ? topicName : "(null)");
// 检查消息是否来自任何一个订阅的topic // 检查消息是否来自任何一个订阅的topic
int topic_matched = 0; int topic_matched = 0;
@ -265,7 +317,7 @@ int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQT
} }
} }
LOG_I("消息匹配topic[%d]: %s\n", i, subscribeTopics[i]); // LOG_I("消息匹配topic[%d]: %s\n", i, subscribeTopics[i]);
if (message != NULL && message->payload != NULL && message->payloadlen > 0) { if (message != NULL && message->payload != NULL && message->payloadlen > 0) {
char payload_buf[1024] = {0}; char payload_buf[1024] = {0};
size_t copy_len = (size_t)message->payloadlen; size_t copy_len = (size_t)message->payloadlen;
@ -276,7 +328,7 @@ int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQT
payload_buf[copy_len] = '\0'; payload_buf[copy_len] = '\0';
PutDataIntoMQueue(payload_buf); PutDataIntoMQueue(payload_buf);
} else { } else {
LOG_I("消息payload为空跳过入队\n"); // LOG_I("消息payload为空跳过入队\n");
} }
topic_matched = 1; topic_matched = 1;
break; break;
@ -284,7 +336,7 @@ int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQT
} }
if(!topic_matched) { if(!topic_matched) {
LOG_I("消息topic不匹配任何订阅的topic: %s\n", topicName); // LOG_I("消息topic不匹配任何订阅的topic: %s\n", topicName);
} }
MQTTAsync_freeMessage(&message); MQTTAsync_freeMessage(&message);
@ -295,15 +347,15 @@ int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQT
} }
void mqtt_utils_connected(void *context, char *cause){ void mqtt_utils_connected(void *context, char *cause){
LOG_I("=== mqtt_utils_connected 函数被调用 ===\n"); // LOG_I("=== mqtt_utils_connected 函数被调用 ===\n");
if (cause != NULL) { if (cause != NULL) {
LOG_I("%s cause:%s\n",__func__, cause); // LOG_I("%s cause:%s\n",__func__, cause);
} else { } else {
LOG_I("%s\n",__func__); // LOG_I("%s\n",__func__);
} }
connect_failure_times=0; connect_failure_times=0;
LOG_I(" MQTT连接成功开始订阅%d个topic\n", subscribeTopicCount); // LOG_I(" MQTT连接成功开始订阅%d个topic\n", subscribeTopicCount);
// 订阅所有设置的topic // 订阅所有设置的topic
for(int i = 0; i < subscribeTopicCount; i++) { for(int i = 0; i < subscribeTopicCount; i++) {
@ -314,29 +366,105 @@ void mqtt_utils_connected(void *context, char *cause){
LOG_I("订阅topic[%d]: %s\n", i, subscribeTopics[i]); LOG_I("订阅topic[%d]: %s\n", i, subscribeTopics[i]);
int subscribe_result = mqtt_utils_subscribe(mqtt_conf, subscribeTopics[i], 1); int subscribe_result = mqtt_utils_subscribe(mqtt_conf, subscribeTopics[i], 1);
LOG_I("订阅topic[%d]请求结果: %d\n", i, subscribe_result); // LOG_I("订阅topic[%d]请求结果: %d\n", i, subscribe_result);
} }
// 添加通配符订阅来接收所有消息(用于调试) // 添加通配符订阅来接收所有消息(用于调试)
LOG_I("=== 添加通配符订阅用于调试 ===\n"); // LOG_I("=== 添加通配符订阅用于调试 ===\n");
int wildcard_subscribe_result = mqtt_utils_subscribe(mqtt_conf, "#", 1); int wildcard_subscribe_result = mqtt_utils_subscribe(mqtt_conf, "#", 1);
LOG_I("通配符订阅请求结果: %d\n", wildcard_subscribe_result); // LOG_I("通配符订阅请求结果: %d\n", wildcard_subscribe_result);
LOG_I("=== 通配符订阅完成 ===\n"); // LOG_I("=== 通配符订阅完成 ===\n");
// 订阅公共topic用于测试 // 订阅公共topic用于测试
LOG_I("=== 订阅公共topic用于测试 ===\n"); // LOG_I("=== 订阅公共topic用于测试 ===\n");
int public_subscribe_result = mqtt_utils_subscribe(mqtt_conf, "/test/public/topic", 0); int public_subscribe_result = mqtt_utils_subscribe(mqtt_conf, "/test/public/topic", 0);
LOG_I("公共topic订阅请求结果: %d\n", public_subscribe_result); // LOG_I("公共topic订阅请求结果: %d\n", public_subscribe_result);
LOG_I("=== 公共topic订阅完成 ===\n"); // LOG_I("=== 公共topic订阅完成 ===\n");
system("echo 1 > /sys/class/gpio/gpio113/value");//yellow ok // 启动黄灯闪烁线程
pthread_mutex_lock(&yellow_led_mutex);
if (!yellow_led_flash_running) {
yellow_led_flash_running = true;
int ret = pthread_create(&pt_yellow_led_flash, NULL, thread_yellow_led_flash, NULL);
if (ret != 0) {
LOG_I("Failed to create yellow LED flash thread, using constant on\n");
yellow_led_flash_running = false;
system("echo 1 > /sys/class/gpio/gpio114/value");//yellow ok
} else {
pthread_detach(pt_yellow_led_flash);
LOG_I("Yellow LED flash thread started\n");
}
}
pthread_mutex_unlock(&yellow_led_mutex);
station_status_report(); station_status_report();
LOG_I("=== mqtt_utils_connected 函数执行完成 ===\n");
// 设置MQTT连接标志
isMqttConnected = true;
// LOG_I("MQTT connected, isMqttConnected = true\n");
// 补报所有已在线的灯条登录信息
typedef struct {
uint32_t tagCode;
time_t lastHeartbeat;
bool isOnline;
} lightbar_heartbeat_t;
extern int lightbarHeartbeatCount;
extern lightbar_heartbeat_t lightbarHeartbeat[];
extern pthread_mutex_t heartbeatMutex;
extern void report_lightbar_login(uint32_t tagCode);
pthread_mutex_lock(&heartbeatMutex);
LOG_I("Reporting login for %d online lightbars again\n", lightbarHeartbeatCount);
for (int i = 0; i < lightbarHeartbeatCount; i++) {
if (lightbarHeartbeat[i].isOnline) {
LOG_I("Report again lightbar %08X login\n", lightbarHeartbeat[i].tagCode);
report_lightbar_login(lightbarHeartbeat[i].tagCode);
// 添加短暂延迟避免MQTT缓冲区溢出
usleep(100 * 1000); // 100ms延迟
}
}
pthread_mutex_unlock(&heartbeatMutex);
// 启动灭灯检测线程(只启动一次)
static bool light_off_thread_started = false;
if (!light_off_thread_started) {
extern pthread_t pt_light_off_check;
extern void *thread_light_off_check(void *arg);
int ret = pthread_create(&pt_light_off_check, NULL, thread_light_off_check, NULL);
if(ret != 0) {
LOG_I("pthread_create light_off_check fail\n");
} else {
LOG_I("pthread_create light_off_check success\n");
pthread_detach(pt_light_off_check);
light_off_thread_started = true;
}
}
// 启动心跳检测线程(只启动一次)
static bool heartbeat_thread_started = false;
if (!heartbeat_thread_started) {
extern pthread_t pt_heartbeat_check;
extern void *thread_heartbeat_check(void *arg);
int ret = pthread_create(&pt_heartbeat_check, NULL, thread_heartbeat_check, NULL);
if(ret != 0){
LOG_I("pthread_create heartbeat_check fail\n");
} else {
LOG_I("pthread_create heartbeat_check success\n");
pthread_detach(pt_heartbeat_check);
heartbeat_thread_started = true;
}
}
// LOG_I("=== mqtt_utils_connected 函数执行完成 ===\n");
} }
void mqtt_utils_disconnected(void *context, MQTTProperties* props, enum MQTTReasonCodes rc){ void mqtt_utils_disconnected(void *context, MQTTProperties* props, enum MQTTReasonCodes rc){
//LOG_I("%s reason code %s",__func__ ,MQTTReasonCode_toString(rc)); //LOG_I("%s reason code %s",__func__ ,MQTTReasonCode_toString(rc));
//mqtt_led_net_status_judge(); //mqtt_led_net_status_judge();
// 设置MQTT连接标志为false
isMqttConnected = false;
// LOG_I("MQTT disconnected, set isMqttConnected = false\n");
} }
void mqtt_utils_on_connect_success(void *context, MQTTAsync_successData *response){ void mqtt_utils_on_connect_success(void *context, MQTTAsync_successData *response){
@ -347,16 +475,16 @@ void mqtt_utils_on_connect_success(void *context, MQTTAsync_successData *respons
void mqtt_utils_on_connect_failure(void *context, MQTTAsync_failureData *response){ void mqtt_utils_on_connect_failure(void *context, MQTTAsync_failureData *response){
//LOG_I("%s connect failed, rc %s\n", __func__, response ? MQTTAsync_strerror(response->code) : "none"); //LOG_I("%s connect failed, rc %s\n", __func__, response ? MQTTAsync_strerror(response->code) : "none");
// mqtt_led_net_status_judge(); // mqtt_led_net_status_judge();
LOG_I("MQTT连接失败 - 错误代码: %s\n", response ? MQTTAsync_strerror(response->code) : "未知错误"); // LOG_I("MQTT连接失败 - 错误代码: %s\n", response ? MQTTAsync_strerror(response->code) : "未知错误");
if (response && response->message) { if (response && response->message) {
LOG_I("MQTT连接失败 - 错误信息: %s\n", response->message); // LOG_I("MQTT连接失败 - 错误信息: %s\n", response->message);
} }
mqtt_net_failure(&connect_failure_times); mqtt_net_failure(&connect_failure_times);
} }
static int mqtt_utils_on_ssl_error(const char *str, size_t len, void *context){ static int mqtt_utils_on_ssl_error(const char *str, size_t len, void *context){
MQTTAsync client = (MQTTAsync)context; MQTTAsync client = (MQTTAsync)context;
LOG_I("%s ssl error: %s\n",__func__, str); // LOG_I("%s ssl error: %s\n",__func__, str);
return 0; return 0;
} }
@ -365,59 +493,59 @@ void mqtt_utils_on_publish_success(void *context, MQTTAsync_successData *respons
} }
void mqtt_utils_on_publish_failure(void *context, MQTTAsync_failureData *response){ void mqtt_utils_on_publish_failure(void *context, MQTTAsync_failureData *response){
LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code)); // LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
// 如果是连接相关的问题,记录详细信息 // 如果是连接相关的问题,记录详细信息
if (response && response->code == MQTTASYNC_OPERATION_INCOMPLETE) { if (response && response->code == MQTTASYNC_OPERATION_INCOMPLETE) {
LOG_I("发布失败:操作在完成前被丢弃,可能是连接状态不稳定\n"); // LOG_I("发布失败:操作在完成前被丢弃,可能是连接状态不稳定\n");
} else if (response && response->code == MQTTASYNC_DISCONNECTED) { } else if (response && response->code == MQTTASYNC_DISCONNECTED) {
LOG_I("发布失败:客户端已断开连接\n"); // LOG_I("发布失败:客户端已断开连接\n");
} }
} }
void mqtt_utils_on_subscribe_success(void *context, MQTTAsync_successData *response){ void mqtt_utils_on_subscribe_success(void *context, MQTTAsync_successData *response){
LOG_I("=== MQTT订阅成功回调被调用 ===\n"); // LOG_I("=== MQTT订阅成功回调被调用 ===\n");
LOG_I("MQTT订阅成功\n"); // LOG_I("MQTT订阅成功\n");
} }
void mqtt_utils_on_subscribe_failure(void *context, MQTTAsync_failureData *response){ void mqtt_utils_on_subscribe_failure(void *context, MQTTAsync_failureData *response){
LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code)); // LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
} }
void mqtt_utils_on_disconnect_success(void *context, MQTTAsync_successData *response){ void mqtt_utils_on_disconnect_success(void *context, MQTTAsync_successData *response){
LOG_I("%s\n",__func__); // LOG_I("%s\n",__func__);
//mqtt_led_net_status_judge(); //mqtt_led_net_status_judge();
} }
int mqtt_utils_subscribe(mqtt_utils_t *mqtt_utils, char *topic, int qos){ int mqtt_utils_subscribe(mqtt_utils_t *mqtt_utils, char *topic, int qos){
int rc = 0; int rc = 0;
LOG_I("开始订阅topic: %s, qos: %d\n", topic, qos); // LOG_I("开始订阅topic: %s, qos: %d\n", topic, qos);
LOG_I("MQTT客户端指针: %p\n", mqtt_utils->client); // LOG_I("MQTT客户端指针: %p\n", mqtt_utils->client);
LOG_I("订阅选项指针: %p\n", &mqtt_utils->sub_opts); // LOG_I("订阅选项指针: %p\n", &mqtt_utils->sub_opts);
// 检查参数有效性 // 检查参数有效性
if (topic == NULL || strlen(topic) == 0) { if (topic == NULL || strlen(topic) == 0) {
LOG_I("错误topic参数无效\n"); // LOG_I("错误topic参数无效\n");
return -1; return -1;
} }
if (mqtt_utils->client == NULL) { if (mqtt_utils->client == NULL) {
LOG_I("错误MQTT客户端未初始化\n"); // LOG_I("错误MQTT客户端未初始化\n");
return -1; return -1;
} }
// 跳过连接状态检查,直接进行订阅 // 跳过连接状态检查,直接进行订阅
LOG_I("跳过连接状态检查,直接进行订阅\n"); // LOG_I("跳过连接状态检查,直接进行订阅\n");
LOG_I("准备调用 MQTTAsync_subscribe...\n"); // LOG_I("准备调用 MQTTAsync_subscribe...\n");
rc = MQTTAsync_subscribe(mqtt_utils->client, topic, qos, &mqtt_utils->sub_opts); rc = MQTTAsync_subscribe(mqtt_utils->client, topic, qos, &mqtt_utils->sub_opts);
LOG_I("MQTTAsync_subscribe 调用完成,返回值: %d\n", rc); // LOG_I("MQTTAsync_subscribe 调用完成,返回值: %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){ if (rc != MQTTASYNC_SUCCESS){
LOG_I("订阅失败,错误代码: %s\n", MQTTAsync_strerror(rc)); // LOG_I("订阅失败,错误代码: %s\n", MQTTAsync_strerror(rc));
} else { } else {
LOG_I("订阅请求已发送,等待回调\n"); // LOG_I("订阅请求已发送,等待回调\n");
} }
return rc; return rc;
} }
@ -448,16 +576,16 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
int ret = -1; int ret = -1;
int rc = 0; int rc = 0;
LOG_I("=== 开始MQTT连接初始化 ===\n"); // LOG_I("=== 开始MQTT连接初始化 ===\n");
LOG_I("ClientId: %s\n", mqtt_conf->clientid); // LOG_I("ClientId: %s\n", mqtt_conf->clientid);
LOG_I("Username: %s\n", mqtt_conf->username); // LOG_I("Username: %s\n", mqtt_conf->username);
LOG_I("Password: %s\n", mqtt_conf->password); // LOG_I("Password: %s\n", mqtt_conf->password);
LOG_I("Host: %s\n", mqtt_conf->host); // LOG_I("Host: %s\n", mqtt_conf->host);
LOG_I("Port: %d\n", mqtt_conf->port); // LOG_I("Port: %d\n", mqtt_conf->port);
// 提前设置topic名称确保在连接回调中可用 // 提前设置topic名称确保在连接回调中可用
sprintf(nativeInvokTopicName,"/iot%s/thing/ota/upgrade",mqtt_conf->username); sprintf(nativeInvokTopicName,"/iot%s/thing/ota/upgrade",mqtt_conf->username);
LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName); // LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);
// 设置多个订阅topic // 设置多个订阅topic
subscribeTopicCount = 0; subscribeTopicCount = 0;
@ -466,13 +594,13 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
sprintf(subscribeTopics[subscribeTopicCount++], "/iot/estation%s/bind", mqtt_conf->username); sprintf(subscribeTopics[subscribeTopicCount++], "/iot/estation%s/bind", mqtt_conf->username);
sprintf(subscribeTopics[subscribeTopicCount++], "/iot/estation%s/group", mqtt_conf->username); sprintf(subscribeTopics[subscribeTopicCount++], "/iot/estation%s/group", mqtt_conf->username);
sprintf(subscribeTopics[subscribeTopicCount++], "/iot%s/thing/ota/upgrade", mqtt_conf->username); sprintf(subscribeTopics[subscribeTopicCount++], "/iot%s/thing/ota/upgrade", mqtt_conf->username);
sprintf(subscribeTopics[subscribeTopicCount++], "/sys/WcSubLightStrip/AD1000014C11/thing/service/lightOperate/invoke"); // 只使用通配符订阅覆盖所有设备的lightOperate消息
sprintf(subscribeTopics[subscribeTopicCount++], "/sys/+/+/thing/service/lightOperate/invoke"); sprintf(subscribeTopics[subscribeTopicCount++], "/sys/+/+/thing/service/lightOperate/invoke");
LOG_I("设置了%d个订阅topic:\n", subscribeTopicCount); // LOG_I("设置了%d个订阅topic:\n", subscribeTopicCount);
for(int i = 0; i < subscribeTopicCount; i++) { // for(int i = 0; i < subscribeTopicCount; i++) {
LOG_I("Topic[%d]: %s\n", i, subscribeTopics[i]); // LOG_I("Topic[%d]: %s\n", i, subscribeTopics[i]);
} // }
infos = MQTTAsync_getVersionInfo(); infos = MQTTAsync_getVersionInfo();
printVersionInfo(infos); printVersionInfo(infos);
@ -481,7 +609,7 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
//snprintf(url, sizeof(url), "%s://%s:%d", mqtt_conf->protocol, mqtt_conf->host, mqtt_conf->port); //snprintf(url, sizeof(url), "%s://%s:%d", mqtt_conf->protocol, mqtt_conf->host, mqtt_conf->port);
snprintf(url, sizeof(url), "tcp://%s:%d", mqtt_conf->host, mqtt_conf->port); snprintf(url, sizeof(url), "tcp://%s:%d", mqtt_conf->host, mqtt_conf->port);
LOG_I("MQTT URL: %s\n", url); // LOG_I("MQTT URL: %s\n", url);
create_opts.sendWhileDisconnected = 0; create_opts.sendWhileDisconnected = 0;
rc = MQTTAsync_createWithOptions(&mqtt_conf->client, rc = MQTTAsync_createWithOptions(&mqtt_conf->client,
@ -490,54 +618,54 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
MQTTCLIENT_PERSISTENCE_NONE, MQTTCLIENT_PERSISTENCE_NONE,
NULL, NULL,
&create_opts); &create_opts);
LOG_I("MQTTAsync_create rc = %d\n", rc); // LOG_I("MQTTAsync_create rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){ if (rc != MQTTASYNC_SUCCESS){
LOG_I("MQTT客户端创建失败: %s\n", MQTTAsync_strerror(rc)); LOG_I("MQTT客户端创建失败: %s\n", MQTTAsync_strerror(rc));
ret = -1; ret = -1;
goto error; goto error;
} }
LOG_I("MQTTAsync_create succes s\n"); // LOG_I("MQTTAsync_create success\n");
// 设置所有必要的回调函数 // 设置所有必要的回调函数
LOG_I("设置MQTT回调函数\n"); // LOG_I("设置MQTT回调函数\n");
rc = MQTTAsync_setConnected(mqtt_conf->client, NULL, mqtt_utils_connected); rc = MQTTAsync_setConnected(mqtt_conf->client, NULL, mqtt_utils_connected);
LOG_I("MQTTAsync_setConnected rc = %d\n", rc); // LOG_I("MQTTAsync_setConnected rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){ if (rc != MQTTASYNC_SUCCESS){
LOG_I("设置连接回调失败: %s\n", MQTTAsync_strerror(rc)); // LOG_I("设置连接回调失败: %s\n", MQTTAsync_strerror(rc));
ret = -2; ret = -2;
goto error; goto error;
} }
LOG_I("MQTTAsync_setConnected success\n"); // LOG_I("MQTTAsync_setConnected success\n");
rc = MQTTAsync_setDisconnected(mqtt_conf->client, NULL, mqtt_utils_disconnected); rc = MQTTAsync_setDisconnected(mqtt_conf->client, NULL, mqtt_utils_disconnected);
LOG_I("MQTTAsy nc_setDisconnected rc = %d\n", rc); // LOG_I("MQTTAsync_setDisconnected rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){ if (rc != MQTTASYNC_SUCCESS){
LOG_I("设置断开回调失败: %s\n", MQTTAsync_strerror(rc)); // LOG_I("设置断开回调失败: %s\n", MQTTAsync_strerror(rc));
ret = -2; ret = -2;
goto error; goto error;
} }
LOG_I("MQTTAsync_setDisconnected success\n"); // LOG_I("MQTTAsync_setDisconnected success\n");
// 设置消息到达回调 // 设置消息到达回调
rc = MQTTAsync_setMessageArrivedCallback(mqtt_conf->client, NULL, mqtt_utils_message_arrived); rc = MQTTAsync_setMessageArrivedCallback(mqtt_conf->client, NULL, mqtt_utils_message_arrived);
LOG_I("MQTTAsync_setMessageArrivedCallback rc = %d\n", rc); // LOG_I("MQTTAsync_setMessageArrivedCallback rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){ if (rc != MQTTASYNC_SUCCESS){
LOG_I("设置消息到达回调失败: %s\n", MQTTAsync_strerror(rc)); // LOG_I("设置消息到达回调失败: %s\n", MQTTAsync_strerror(rc));
ret = -2; ret = -2;
goto error; goto error;
} }
LOG_I("MQTTAsync_setMessageArrivedCallback success\n"); // LOG_I("MQTTAsync_setMessageArrivedCallback success\n");
// 设置连接丢失回调 // 设置连接丢失回调
rc = MQTTAsync_setConnectionLostCallback(mqtt_conf->client, NULL, mqtt_utils_connection_lost); rc = MQTTAsync_setConnectionLostCallback(mqtt_conf->client, NULL, mqtt_utils_connection_lost);
LOG_I("MQTTAsync_setConnectionLostCallback rc = %d\n", rc); // LOG_I("MQTTAsync_setConnectionLostCallback rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){ if (rc != MQTTASYNC_SUCCESS){
LOG_I("设置连接丢失回调失败: %s\n", MQTTAsync_strerror(rc)); // LOG_I("设置连接丢失回调失败: %s\n", MQTTAsync_strerror(rc));
ret = -2; ret = -2;
goto error; goto error;
} }
LOG_I("MQTTAsync_setConnectionLostCallback success\n"); // LOG_I("MQTTAsync_setConnectionLostCallback success\n");
/* connect option */ /* connect option */
conn_opts.onSuccess = mqtt_utils_on_connect_success; conn_opts.onSuccess = mqtt_utils_on_connect_success;
@ -579,15 +707,15 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
conn_opts.ssl = &ssl_opts; conn_opts.ssl = &ssl_opts;
} }
LOG_I("正在连接MQTT服务器...\n"); // LOG_I("正在连接MQTT服务器...\n");
rc = MQTTAsync_connect(mqtt_conf->client, &conn_opts); rc = MQTTAsync_connect(mqtt_conf->client, &conn_opts);
LOG_I("MQTTAsync_connect rc = %d\n", rc); // LOG_I("MQTTAsync_connect rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){ if (rc != MQTTASYNC_SUCCESS){
LOG_I("MQTT连接启动失败: %s\n", MQTTAsync_strerror(rc)); LOG_I("MQTT连接启动失败: %s\n", MQTTAsync_strerror(rc));
ret = -1; ret = -1;
goto error; goto error;
} }
LOG_I("MQTTAsync_connect 启动成功\n"); // LOG_I("MQTTAsync_connect 启动成功\n");
mqtt_conf->sub_opts = sub_opts; mqtt_conf->sub_opts = sub_opts;
mqtt_conf->pub_opts = pub_opts; mqtt_conf->pub_opts = pub_opts;
@ -604,12 +732,12 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName); LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);
// 验证回调函数设置 // 验证回调函数设置
LOG_I("=== 验证回调函数设置 ===\n"); // LOG_I("=== 验证回调函数设置 ===\n");
LOG_I("消息到达回调函数地址: %p\n", mqtt_utils_message_arrived); // LOG_I("消息到达回调函数地址: %p\n", mqtt_utils_message_arrived);
LOG_I("连接成功回调函数地址: %p\n", mqtt_utils_connected); // LOG_I("连接成功回调函数地址: %p\n", mqtt_utils_connected);
LOG_I("订阅成功回调函数地址: %p\n", mqtt_utils_on_subscribe_success); // LOG_I("订阅成功回调函数地址: %p\n", mqtt_utils_on_subscribe_success);
LOG_I("订阅失败回调函数地址: %p\n", mqtt_utils_on_subscribe_failure); // LOG_I("订阅失败回调函数地址: %p\n", mqtt_utils_on_subscribe_failure);
LOG_I("=== 回调函数验证完成 ===\n"); // LOG_I("=== 回调函数验证完成 ===\n");
ret = 0; ret = 0;
error: error:

View File

@ -67,7 +67,7 @@ typedef struct
extern mqtt_utils_t mqtt_config; extern mqtt_utils_t mqtt_config;
extern void PutDataIntoMQueue(char *payload); extern void PutDataIntoMQueue(char *payload);
extern char g_mqtt_deviceName[256]; extern char g_mqtt_deviceName[256]; // 从topic解析出的deviceName
typedef struct{ typedef struct{
char msg_messageId[32]; char msg_messageId[32];

View File

@ -3423,55 +3423,59 @@ int uart_data_receive_data_back(uart_utils_t *uart,uint16_t *parmAck,uint16_t *t
uint16_t *signCode,uint16_t *reserve,uint32_t *lableParm){ uint16_t *signCode,uint16_t *reserve,uint32_t *lableParm){
int ret = 0; int ret = 0;
jt_data_back_package_t jt_data_back_package; jt_data_back_package_t jt_data_back_package;
ret = uart_read_until_time(uart->uart_fd,(char *)&jt_data_back_package,sizeof(jt_data_back_package_t), 1000, 50); uint8_t *buf = (uint8_t *)&jt_data_back_package;
if(ret == sizeof(jt_data_back_package_t)){ int pkg_size = sizeof(jt_data_back_package_t);
//LOG_I("%s success\n", __func__);
if((jt_data_back_package.pre1 == 0x2424) ret = uart_read_until_time(uart->uart_fd,(char *)buf, pkg_size, 1000, 50);
&&(jt_data_back_package.pre2 == 0x2424) if(ret != pkg_size){
&&(jt_data_back_package.pre3 == 0x2424)){ return -1;
uint8_t version_1=(ntohs(jt_data_back_package.version)>>8)&0xFF;
uint8_t version_2=(ntohs(jt_data_back_package.version))&0xFF;
uint8_t signCode_1=(ntohs(jt_data_back_package.signCode)>>8)&0xFF;
uint8_t signCode_2=(ntohs(jt_data_back_package.signCode))&0xFF;
uint8_t reserve_1=(ntohs(jt_data_back_package.reserve)>>8)&0xFF;
uint8_t reserve_2=(ntohs(jt_data_back_package.reserve))&0xFF;
uint8_t lableParm_1=(ntohl(jt_data_back_package.lableParm)>>24)&0xFF;
uint8_t lableParm_2=(ntohl(jt_data_back_package.lableParm)>>16)&0xFF;
uint8_t lableParm_3=(ntohl(jt_data_back_package.lableParm)>>8)&0xFF;
uint8_t lableParm_4=(ntohl(jt_data_back_package.lableParm))&0xFF;
uint8_t test1[15] = {jt_data_back_package.len,jt_data_back_package.featureCode,
jt_data_back_package.count,jt_data_back_package.battery,version_1,version_2,
jt_data_back_package.ledCtrl,signCode_1,signCode_2,reserve_1,reserve_2,
lableParm_1,lableParm_2,lableParm_3,lableParm_4};
uint16_t crc_c=CRC16_XMODEM(test1,15);
//for(int i=0;i<15;i++){
// printf("%02x ",test1[i]);
//}
//LOG_I("%s:XModem_CRC16 = %04x\r\n",__func__,crc_c);
//LOG_I("CRC = %04x\r\n",ntohs(jt_data_back_package.crc));
*parmAck=ntohs(jt_data_back_package.pre4);
*tagCodeHead=ntohs(jt_data_back_package.tagHead);
*tagCode=ntohl(jt_data_back_package.tag);
*tagSignal=jt_data_back_package.signal;
*totalLen=jt_data_back_package.len;
*tagFeature=jt_data_back_package.featureCode;
*count=jt_data_back_package.count;
*batteryV=jt_data_back_package.battery;
*version=ntohs(jt_data_back_package.version);
*ledCtrl=jt_data_back_package.ledCtrl;
*signCode=ntohs(jt_data_back_package.signCode);
*reserve=ntohs(jt_data_back_package.reserve);
*lableParm=ntohl(jt_data_back_package.lableParm);
}else{
// LOG_I("%s failed\n", __func__);
ret = -2;
}
}else{
//LOG_I("%s, failed, time out\n", __func__);
ret = -1;
} }
return ret; // 检查包头
if(!((jt_data_back_package.pre1 == 0x2424)
&&(jt_data_back_package.pre2 == 0x2424)
&&(jt_data_back_package.pre3 == 0x2424))){
// 帧错位,尝试在已读数据中找包头 0x24 0x24
int sync_offset = -1;
for(int i = 1; i < pkg_size - 1; i++){
if(buf[i] == 0x24 && buf[i+1] == 0x24){
sync_offset = i;
break;
}
}
if(sync_offset > 0){
// 找到包头,左移数据并补读剩余字节
int remain = pkg_size - sync_offset;
memmove(buf, buf + sync_offset, remain);
int need = sync_offset;
int got = uart_read_until_time(uart->uart_fd, (char *)(buf + remain), need, 500, 50);
if(got != need){
return -2;
}
// 重新校验包头
if(!((jt_data_back_package.pre1 == 0x2424)
&&(jt_data_back_package.pre2 == 0x2424)
&&(jt_data_back_package.pre3 == 0x2424))){
return -2;
}
}else{
return -2;
}
}
// 解析数据
*parmAck=ntohs(jt_data_back_package.pre4);
*tagCodeHead=ntohs(jt_data_back_package.tagHead);
*tagCode=ntohl(jt_data_back_package.tag);
*tagSignal=jt_data_back_package.signal;
*totalLen=jt_data_back_package.len;
*tagFeature=jt_data_back_package.featureCode;
*count=jt_data_back_package.count;
*batteryV=jt_data_back_package.battery;
*version=ntohs(jt_data_back_package.version);
*ledCtrl=jt_data_back_package.ledCtrl;
*signCode=ntohs(jt_data_back_package.signCode);
*reserve=ntohs(jt_data_back_package.reserve);
*lableParm=ntohl(jt_data_back_package.lableParm);
return pkg_size;
} }

View File

@ -1414,52 +1414,51 @@ typedef struct
typedef struct typedef struct
{ {
uint8_t len; uint8_t len;
uint16_t customCode; uint16_t customCode;
uint8_t type; uint8_t type;
uint8_t bindStart; uint8_t bindStart;
uint8_t bindEnd; uint8_t bindEnd;
uint32_t idStart; uint32_t idStart;
uint32_t idEnd; uint32_t idEnd;
jt_led_or_group_package_t ledCtrl; jt_led_or_group_package_t ledCtrl;
uint16_t reserve; uint16_t reserve;
uint32_t crc; uint32_t crc;
}__attribute__((packed)) jt_id_package_t; }__attribute__((packed)) jt_id_package_t;
typedef struct typedef struct
{ {
uint16_t pre1;//$$ 0x2424 uint16_t pre1;//$$ 0x2424
uint16_t pre2;//$$ 0x2424 uint16_t pre2;//$$ 0x2424
uint16_t pre3;//$$ 0x2424 CC 0x4343 DD 0x4444 uint16_t pre3;//$$ 0x2424 CC 0x4343 DD 0x4444
uint16_t ack;//## 0x2323 complete TT 0x5454 timeout RR 0x5252 AP-reset EE 0x4545 AP-crc-error FF 0x4646 AP broadcast end uint16_t ack;//## 0x2323 complete TT 0x5454 timeout RR 0x5252 AP-reset EE 0x4545 AP-crc-error FF 0x4646 AP broadcast end
}__attribute__((packed)) jt_receive_package_t; }__attribute__((packed)) jt_receive_package_t;
typedef struct typedef struct
{ {
uint16_t v1; uint16_t v1;
uint16_t v2; uint16_t v2;
}__attribute__((packed)) jt_receive_version_package_t; }__attribute__((packed)) jt_receive_version_package_t;
typedef struct typedef struct
{ {
uint16_t pre1; uint16_t pre1;
uint16_t pre2; uint16_t pre2;
uint16_t pre3; uint16_t pre3;
uint16_t pre4; uint16_t pre4;
uint16_t tagHead; uint16_t tagHead;
uint32_t tag; uint32_t tag;
uint8_t signal; uint8_t signal;
uint8_t len; uint8_t len;
uint8_t featureCode; uint8_t featureCode;
uint8_t count; uint8_t count;
uint8_t battery; uint8_t battery;
uint16_t version; uint16_t version;
uint8_t ledCtrl; uint8_t ledCtrl;
uint16_t signCode; uint16_t signCode;
uint16_t reserve; uint16_t reserve;
uint32_t lableParm; uint32_t lableParm;
uint16_t crc; uint16_t crc;
}__attribute__((packed)) jt_data_back_package_t; }__attribute__((packed)) jt_data_back_package_t;
uint32_t CRC32_DIRECT(uint8_t *data, uint32_t length); uint32_t CRC32_DIRECT(uint8_t *data, uint32_t length);
uint16_t CRC16_XMODEM(uint8_t *puchMsg, uint32_t usDataLen); uint16_t CRC16_XMODEM(uint8_t *puchMsg, uint32_t usDataLen);
@ -1468,10 +1467,10 @@ int uart_data_send_head(uart_utils_t *uart,uint8_t func,uint8_t wakeup_time,uint
int uart_data_send_lighton_or_group(uart_utils_t *uart,jt_tag_package_t tags[],uint8_t tag_num); int uart_data_send_lighton_or_group(uart_utils_t *uart,jt_tag_package_t tags[],uint8_t tag_num);
int uart_data_send_lighton_by_group(uart_utils_t *uart,jt_group_package_t groups[],uint8_t tag_num); int uart_data_send_lighton_by_group(uart_utils_t *uart,jt_group_package_t groups[],uint8_t tag_num);
int uart_data_send_lighton_by_id(uart_utils_t *uart,uint8_t bind_start,uint8_t bind_end, int uart_data_send_lighton_by_id(uart_utils_t *uart,uint8_t bind_start,uint8_t bind_end,
uint32_t id_start,uint32_t id_end,jt_led_or_group_package_t led_ctrl,uint16_t reserve); uint32_t id_start,uint32_t id_end,jt_led_or_group_package_t led_ctrl,uint16_t reserve);
int uart_data_receive_data_back(uart_utils_t *uart,uint16_t *parmAck,uint16_t *tagCodeHead,uint32_t *tagCode,uint8_t *tagSignal, int uart_data_receive_data_back(uart_utils_t *uart,uint16_t *parmAck,uint16_t *tagCodeHead,uint32_t *tagCode,uint8_t *tagSignal,
uint8_t *totalLen,uint8_t *tagFeature,uint8_t *count,uint8_t *batteryV,uint16_t *version,uint8_t *ledCtrl, uint8_t *totalLen,uint8_t *tagFeature,uint8_t *count,uint8_t *batteryV,uint16_t *version,uint8_t *ledCtrl,
uint16_t *signCode,uint16_t *reserve,uint32_t *lableParm); uint16_t *signCode,uint16_t *reserve,uint32_t *lableParm);
int uart_data_receive_ack(uart_utils_t *uart,uint16_t *parm_ack); int uart_data_receive_ack(uart_utils_t *uart,uint16_t *parm_ack);
int uart_data_receive_version(uart_utils_t *uart); int uart_data_receive_version(uart_utils_t *uart);
#endif #endif