Compare commits

..

7 Commits

Author SHA1 Message Date
zzh
fc95e3561a 减少mqtt断联 2025-12-25 10:15:56 +08:00
zzh
f910a4d701 bug fix 2025-12-24 16:13:04 +08:00
zzh
67e579ac27 新增灭灯上报 2025-12-24 13:41:09 +08:00
zzh
30700b9178 更新亮灯成功上报,mqtt 连接成功时随机闪烁黄灯 2025-12-24 10:25:33 +08:00
zzh
72a94babe5 取消没必要的打印
增加灯条心跳漏报机制
增加模拟 topic
2025-12-23 11:19:36 +08:00
zzh
9ccab6c22f 修复编译错误 2025-12-22 16:21:29 +08:00
zzh
16b46d1b00 同步改版目录内容到 meituan_2.0_New_core_board 2025-12-22 11:53:28 +08:00
21 changed files with 1107 additions and 246 deletions

BIN
hiredis/alloc.o Normal file

Binary file not shown.

BIN
hiredis/async.o Normal file

Binary file not shown.

BIN
hiredis/hiredis-test Executable file

Binary file not shown.

BIN
hiredis/hiredis.o Normal file

Binary file not shown.

11
hiredis/hiredis.pc Normal file
View File

@ -0,0 +1,11 @@
prefix=/usr/local
exec_prefix=${prefix}
libdir=/usr/local/lib
includedir=/usr/local/include
pkgincludedir=/usr/local/include/hiredis
Name: hiredis
Description: Minimalistic C client library for Redis.
Version: 1.3.0
Libs: -L${libdir} -lhiredis
Cflags: -I${pkgincludedir} -I${includedir} -D_FILE_OFFSET_BITS=64

BIN
hiredis/libhiredis.a Normal file

Binary file not shown.

BIN
hiredis/libhiredis.so Executable file

Binary file not shown.

BIN
hiredis/net.o Normal file

Binary file not shown.

BIN
hiredis/read.o Normal file

Binary file not shown.

BIN
hiredis/sds.o Normal file

Binary file not shown.

BIN
hiredis/sockcompat.o Normal file

Binary file not shown.

BIN
hiredis/test.o Normal file

Binary file not shown.

811
main.c

File diff suppressed because it is too large Load Diff

View File

@ -2722,8 +2722,8 @@ static void setRetryLoopInterval(int keepalive)
if (proposed < 1)
proposed = 1;
else if (proposed > 5)
proposed = 5;
else if (proposed > 60)
proposed = 60; // 提高上限到60秒适应更长的keepalive
if (proposed < retryLoopInterval)
retryLoopInterval = proposed;
}

View File

@ -1323,8 +1323,8 @@ static void setRetryLoopInterval(int keepalive)
if (proposed < 1)
proposed = 1;
else if (proposed > 5)
proposed = 5;
else if (proposed > 60)
proposed = 60; // 提高上限到60秒适应更长的keepalive
if (proposed < retryLoopInterval)
retryLoopInterval = proposed;
}

View File

@ -633,7 +633,7 @@ void MQTTProtocol_keepalive(time_t now)
}
else
{
//LOG_I("PINGREQ send success\n");
LOG_I("PINGREQ send success at %ld\n", (long)now);
client->net.lastSent = now;
client->ping_outstanding = 1;
}
@ -641,9 +641,25 @@ void MQTTProtocol_keepalive(time_t now)
}
else
{
Log(TRACE_PROTOCOL, -1, "PINGRESP not received in keepalive interval for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
LOG_I("%s:PINGRESP not received\n",__func__);
// 增加容错允许ping_outstanding为2时再等待一个周期
if (client->ping_outstanding >= 2) {
Log(TRACE_PROTOCOL, -1, "PINGRESP not received after 2 keepalive intervals for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
LOG_I("%s:PINGRESP not received after 2 attempts\n",__func__);
MQTTProtocol_closeSession(client, 1);
} else {
// 第二次发送PINGREQ
if (Socket_noPendingWrites(client->net.socket)) {
if (MQTTPacket_send_pingreq(&client->net, client->clientID) != TCPSOCKET_COMPLETE) {
Log(TRACE_PROTOCOL, -1, "Error sending second PINGREQ for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
LOG_I("%s:Error sending second PINGREQ for client %s on socket %d, disconnecting\n",__func__,client->clientID, client->net.socket);
MQTTProtocol_closeSession(client, 1);
} else {
LOG_I("Second PINGREQ send success at %ld\n", (long)now);
client->net.lastSent = now;
client->ping_outstanding++;
}
}
}
}
}
}

View File

@ -178,7 +178,7 @@ int MQTTProtocol_handlePingresps(void* pack, int sock)
client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
Log(LOG_PROTOCOL, 21, NULL, sock, client->clientID);
client->ping_outstanding = 0;
//LOG_I("PINGRESP receive success\n");
LOG_I("PINGRESP received successfully, ping_outstanding cleared\n");
FUNC_EXIT_RC(rc);
return rc;
}

View File

@ -25,6 +25,9 @@
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>
#include "mqtt_utils.h"
@ -49,6 +52,7 @@ char nativeUpgradeTopicName[1024] = "";
char subscribeTopics[10][1024] = {""};
int subscribeTopicCount = 0;
extern char softwareVersion[16];
extern bool isMqttConnected;
typedef struct{
enum MQTTASYNC_TRACE_LEVELS level;
char name[64];
@ -119,8 +123,56 @@ json_error:
json_object_put(root);
}
// 黄灯闪烁相关
static pthread_t pt_yellow_led_flash;
static volatile bool yellow_led_flash_running = false;
static pthread_mutex_t yellow_led_mutex = PTHREAD_MUTEX_INITIALIZER;
void *thread_yellow_led_flash(void *arg);
void *thread_yellow_led_flash(void *arg) {
LOG_I("Yellow LED flash thread started\n");
// 初始化随机种子
srand((unsigned int)time(NULL));
while (yellow_led_flash_running) {
// 生成随机延时 (100ms - 500ms)
int delay_us = 100000 + (rand() % 400000); // 100000-500000 微秒
// 点亮黄灯
system("echo 1 > /sys/class/gpio/gpio114/value");
// 随机延时保持亮
usleep(delay_us);
// 检查是否需要退出
if (!yellow_led_flash_running) break;
// 熄灭黄灯
system("echo 0 > /sys/class/gpio/gpio114/value");
// 随机延时保持灭
usleep(delay_us);
}
// 确保退出时灯是灭的
system("echo 0 > /sys/class/gpio/gpio114/value");
LOG_I("Yellow LED flash thread exited\n");
return NULL;
}
void mqtt_net_failure(int *failure_times){
system("echo 0 > /sys/class/gpio/gpio113/value");
// 停止黄灯闪烁
pthread_mutex_lock(&yellow_led_mutex);
if (yellow_led_flash_running) {
yellow_led_flash_running = false;
pthread_join(pt_yellow_led_flash, NULL);
LOG_I("Yellow LED flash thread stopped\n");
}
pthread_mutex_unlock(&yellow_led_mutex);
system("echo 0 > /sys/class/gpio/gpio114/value");
if(failure_times != NULL){
(*failure_times)++;
LOG_I("mqtt net failure_times = %d\n", *failure_times);
@ -237,8 +289,8 @@ static int mqtt_utils_parse_sys_lightOperate_invoke_topic(const char *topic, cha
}
int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message){
LOG_I("---------------%s------------------\n",__func__);
LOG_I("收到消息topic: %s\n", topicName ? topicName : "(null)");
// LOG_I("---------------%s------------------\n",__func__);
// LOG_I("收到消息topic: %s\n", topicName ? topicName : "(null)");
// 检查消息是否来自任何一个订阅的topic
int topic_matched = 0;
@ -265,7 +317,7 @@ int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQT
}
}
LOG_I("消息匹配topic[%d]: %s\n", i, subscribeTopics[i]);
// LOG_I("消息匹配topic[%d]: %s\n", i, subscribeTopics[i]);
if (message != NULL && message->payload != NULL && message->payloadlen > 0) {
char payload_buf[1024] = {0};
size_t copy_len = (size_t)message->payloadlen;
@ -276,7 +328,7 @@ int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQT
payload_buf[copy_len] = '\0';
PutDataIntoMQueue(payload_buf);
} else {
LOG_I("消息payload为空跳过入队\n");
// LOG_I("消息payload为空跳过入队\n");
}
topic_matched = 1;
break;
@ -284,7 +336,7 @@ int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQT
}
if(!topic_matched) {
LOG_I("消息topic不匹配任何订阅的topic: %s\n", topicName);
// LOG_I("消息topic不匹配任何订阅的topic: %s\n", topicName);
}
MQTTAsync_freeMessage(&message);
@ -295,15 +347,15 @@ int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQT
}
void mqtt_utils_connected(void *context, char *cause){
LOG_I("=== mqtt_utils_connected 函数被调用 ===\n");
// LOG_I("=== mqtt_utils_connected 函数被调用 ===\n");
if (cause != NULL) {
LOG_I("%s cause:%s\n",__func__, cause);
// LOG_I("%s cause:%s\n",__func__, cause);
} else {
LOG_I("%s\n",__func__);
// LOG_I("%s\n",__func__);
}
connect_failure_times=0;
LOG_I(" MQTT连接成功开始订阅%d个topic\n", subscribeTopicCount);
// LOG_I(" MQTT连接成功开始订阅%d个topic\n", subscribeTopicCount);
// 订阅所有设置的topic
for(int i = 0; i < subscribeTopicCount; i++) {
@ -314,29 +366,105 @@ void mqtt_utils_connected(void *context, char *cause){
LOG_I("订阅topic[%d]: %s\n", i, subscribeTopics[i]);
int subscribe_result = mqtt_utils_subscribe(mqtt_conf, subscribeTopics[i], 1);
LOG_I("订阅topic[%d]请求结果: %d\n", i, subscribe_result);
// LOG_I("订阅topic[%d]请求结果: %d\n", i, subscribe_result);
}
// 添加通配符订阅来接收所有消息(用于调试)
LOG_I("=== 添加通配符订阅用于调试 ===\n");
// LOG_I("=== 添加通配符订阅用于调试 ===\n");
int wildcard_subscribe_result = mqtt_utils_subscribe(mqtt_conf, "#", 1);
LOG_I("通配符订阅请求结果: %d\n", wildcard_subscribe_result);
LOG_I("=== 通配符订阅完成 ===\n");
// LOG_I("通配符订阅请求结果: %d\n", wildcard_subscribe_result);
// LOG_I("=== 通配符订阅完成 ===\n");
// 订阅公共topic用于测试
LOG_I("=== 订阅公共topic用于测试 ===\n");
// LOG_I("=== 订阅公共topic用于测试 ===\n");
int public_subscribe_result = mqtt_utils_subscribe(mqtt_conf, "/test/public/topic", 0);
LOG_I("公共topic订阅请求结果: %d\n", public_subscribe_result);
LOG_I("=== 公共topic订阅完成 ===\n");
// LOG_I("公共topic订阅请求结果: %d\n", public_subscribe_result);
// LOG_I("=== 公共topic订阅完成 ===\n");
system("echo 1 > /sys/class/gpio/gpio113/value");//yellow ok
// 启动黄灯闪烁线程
pthread_mutex_lock(&yellow_led_mutex);
if (!yellow_led_flash_running) {
yellow_led_flash_running = true;
int ret = pthread_create(&pt_yellow_led_flash, NULL, thread_yellow_led_flash, NULL);
if (ret != 0) {
LOG_I("Failed to create yellow LED flash thread, using constant on\n");
yellow_led_flash_running = false;
system("echo 1 > /sys/class/gpio/gpio114/value");//yellow ok
} else {
pthread_detach(pt_yellow_led_flash);
LOG_I("Yellow LED flash thread started\n");
}
}
pthread_mutex_unlock(&yellow_led_mutex);
station_status_report();
LOG_I("=== mqtt_utils_connected 函数执行完成 ===\n");
// 设置MQTT连接标志
isMqttConnected = true;
// LOG_I("MQTT connected, isMqttConnected = true\n");
// 补报所有已在线的灯条登录信息
typedef struct {
uint32_t tagCode;
time_t lastHeartbeat;
bool isOnline;
} lightbar_heartbeat_t;
extern int lightbarHeartbeatCount;
extern lightbar_heartbeat_t lightbarHeartbeat[];
extern pthread_mutex_t heartbeatMutex;
extern void report_lightbar_login(uint32_t tagCode);
pthread_mutex_lock(&heartbeatMutex);
LOG_I("Reporting login for %d online lightbars again\n", lightbarHeartbeatCount);
for (int i = 0; i < lightbarHeartbeatCount; i++) {
if (lightbarHeartbeat[i].isOnline) {
LOG_I("Report again lightbar %08X login\n", lightbarHeartbeat[i].tagCode);
report_lightbar_login(lightbarHeartbeat[i].tagCode);
// 添加短暂延迟避免MQTT缓冲区溢出
usleep(100 * 1000); // 100ms延迟
}
}
pthread_mutex_unlock(&heartbeatMutex);
// 启动灭灯检测线程(只启动一次)
static bool light_off_thread_started = false;
if (!light_off_thread_started) {
extern pthread_t pt_light_off_check;
extern void *thread_light_off_check(void *arg);
int ret = pthread_create(&pt_light_off_check, NULL, thread_light_off_check, NULL);
if(ret != 0) {
LOG_I("pthread_create light_off_check fail\n");
} else {
LOG_I("pthread_create light_off_check success\n");
pthread_detach(pt_light_off_check);
light_off_thread_started = true;
}
}
// 启动心跳检测线程(只启动一次)
static bool heartbeat_thread_started = false;
if (!heartbeat_thread_started) {
extern pthread_t pt_heartbeat_check;
extern void *thread_heartbeat_check(void *arg);
int ret = pthread_create(&pt_heartbeat_check, NULL, thread_heartbeat_check, NULL);
if(ret != 0){
LOG_I("pthread_create heartbeat_check fail\n");
} else {
LOG_I("pthread_create heartbeat_check success\n");
pthread_detach(pt_heartbeat_check);
heartbeat_thread_started = true;
}
}
// LOG_I("=== mqtt_utils_connected 函数执行完成 ===\n");
}
void mqtt_utils_disconnected(void *context, MQTTProperties* props, enum MQTTReasonCodes rc){
//LOG_I("%s reason code %s",__func__ ,MQTTReasonCode_toString(rc));
//mqtt_led_net_status_judge();
// 设置MQTT连接标志为false
isMqttConnected = false;
// LOG_I("MQTT disconnected, set isMqttConnected = false\n");
}
void mqtt_utils_on_connect_success(void *context, MQTTAsync_successData *response){
@ -347,16 +475,16 @@ void mqtt_utils_on_connect_success(void *context, MQTTAsync_successData *respons
void mqtt_utils_on_connect_failure(void *context, MQTTAsync_failureData *response){
//LOG_I("%s connect failed, rc %s\n", __func__, response ? MQTTAsync_strerror(response->code) : "none");
// mqtt_led_net_status_judge();
LOG_I("MQTT连接失败 - 错误代码: %s\n", response ? MQTTAsync_strerror(response->code) : "未知错误");
// LOG_I("MQTT连接失败 - 错误代码: %s\n", response ? MQTTAsync_strerror(response->code) : "未知错误");
if (response && response->message) {
LOG_I("MQTT连接失败 - 错误信息: %s\n", response->message);
// LOG_I("MQTT连接失败 - 错误信息: %s\n", response->message);
}
mqtt_net_failure(&connect_failure_times);
}
static int mqtt_utils_on_ssl_error(const char *str, size_t len, void *context){
MQTTAsync client = (MQTTAsync)context;
LOG_I("%s ssl error: %s\n",__func__, str);
// LOG_I("%s ssl error: %s\n",__func__, str);
return 0;
}
@ -365,59 +493,59 @@ void mqtt_utils_on_publish_success(void *context, MQTTAsync_successData *respons
}
void mqtt_utils_on_publish_failure(void *context, MQTTAsync_failureData *response){
LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
// LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
// 如果是连接相关的问题,记录详细信息
if (response && response->code == MQTTASYNC_OPERATION_INCOMPLETE) {
LOG_I("发布失败:操作在完成前被丢弃,可能是连接状态不稳定\n");
// LOG_I("发布失败:操作在完成前被丢弃,可能是连接状态不稳定\n");
} else if (response && response->code == MQTTASYNC_DISCONNECTED) {
LOG_I("发布失败:客户端已断开连接\n");
// LOG_I("发布失败:客户端已断开连接\n");
}
}
void mqtt_utils_on_subscribe_success(void *context, MQTTAsync_successData *response){
LOG_I("=== MQTT订阅成功回调被调用 ===\n");
LOG_I("MQTT订阅成功\n");
// LOG_I("=== MQTT订阅成功回调被调用 ===\n");
// LOG_I("MQTT订阅成功\n");
}
void mqtt_utils_on_subscribe_failure(void *context, MQTTAsync_failureData *response){
LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
// LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
}
void mqtt_utils_on_disconnect_success(void *context, MQTTAsync_successData *response){
LOG_I("%s\n",__func__);
// LOG_I("%s\n",__func__);
//mqtt_led_net_status_judge();
}
int mqtt_utils_subscribe(mqtt_utils_t *mqtt_utils, char *topic, int qos){
int rc = 0;
LOG_I("开始订阅topic: %s, qos: %d\n", topic, qos);
LOG_I("MQTT客户端指针: %p\n", mqtt_utils->client);
LOG_I("订阅选项指针: %p\n", &mqtt_utils->sub_opts);
// LOG_I("开始订阅topic: %s, qos: %d\n", topic, qos);
// LOG_I("MQTT客户端指针: %p\n", mqtt_utils->client);
// LOG_I("订阅选项指针: %p\n", &mqtt_utils->sub_opts);
// 检查参数有效性
if (topic == NULL || strlen(topic) == 0) {
LOG_I("错误topic参数无效\n");
// LOG_I("错误topic参数无效\n");
return -1;
}
if (mqtt_utils->client == NULL) {
LOG_I("错误MQTT客户端未初始化\n");
// LOG_I("错误MQTT客户端未初始化\n");
return -1;
}
// 跳过连接状态检查,直接进行订阅
LOG_I("跳过连接状态检查,直接进行订阅\n");
LOG_I("准备调用 MQTTAsync_subscribe...\n");
// LOG_I("跳过连接状态检查,直接进行订阅\n");
// LOG_I("准备调用 MQTTAsync_subscribe...\n");
rc = MQTTAsync_subscribe(mqtt_utils->client, topic, qos, &mqtt_utils->sub_opts);
LOG_I("MQTTAsync_subscribe 调用完成,返回值: %d\n", rc);
// LOG_I("MQTTAsync_subscribe 调用完成,返回值: %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("订阅失败,错误代码: %s\n", MQTTAsync_strerror(rc));
// LOG_I("订阅失败,错误代码: %s\n", MQTTAsync_strerror(rc));
} else {
LOG_I("订阅请求已发送,等待回调\n");
// LOG_I("订阅请求已发送,等待回调\n");
}
return rc;
}
@ -448,16 +576,16 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
int ret = -1;
int rc = 0;
LOG_I("=== 开始MQTT连接初始化 ===\n");
LOG_I("ClientId: %s\n", mqtt_conf->clientid);
LOG_I("Username: %s\n", mqtt_conf->username);
LOG_I("Password: %s\n", mqtt_conf->password);
LOG_I("Host: %s\n", mqtt_conf->host);
LOG_I("Port: %d\n", mqtt_conf->port);
// LOG_I("=== 开始MQTT连接初始化 ===\n");
// LOG_I("ClientId: %s\n", mqtt_conf->clientid);
// LOG_I("Username: %s\n", mqtt_conf->username);
// LOG_I("Password: %s\n", mqtt_conf->password);
// LOG_I("Host: %s\n", mqtt_conf->host);
// LOG_I("Port: %d\n", mqtt_conf->port);
// 提前设置topic名称确保在连接回调中可用
sprintf(nativeInvokTopicName,"/iot%s/thing/ota/upgrade",mqtt_conf->username);
LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);
// LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);
// 设置多个订阅topic
subscribeTopicCount = 0;
@ -466,13 +594,13 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
sprintf(subscribeTopics[subscribeTopicCount++], "/iot/estation%s/bind", mqtt_conf->username);
sprintf(subscribeTopics[subscribeTopicCount++], "/iot/estation%s/group", mqtt_conf->username);
sprintf(subscribeTopics[subscribeTopicCount++], "/iot%s/thing/ota/upgrade", mqtt_conf->username);
sprintf(subscribeTopics[subscribeTopicCount++], "/sys/WcSubLightStrip/AD1000014C11/thing/service/lightOperate/invoke");
// 只使用通配符订阅覆盖所有设备的lightOperate消息
sprintf(subscribeTopics[subscribeTopicCount++], "/sys/+/+/thing/service/lightOperate/invoke");
LOG_I("设置了%d个订阅topic:\n", subscribeTopicCount);
for(int i = 0; i < subscribeTopicCount; i++) {
LOG_I("Topic[%d]: %s\n", i, subscribeTopics[i]);
}
// LOG_I("设置了%d个订阅topic:\n", subscribeTopicCount);
// for(int i = 0; i < subscribeTopicCount; i++) {
// LOG_I("Topic[%d]: %s\n", i, subscribeTopics[i]);
// }
infos = MQTTAsync_getVersionInfo();
printVersionInfo(infos);
@ -481,7 +609,7 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
//snprintf(url, sizeof(url), "%s://%s:%d", mqtt_conf->protocol, mqtt_conf->host, mqtt_conf->port);
snprintf(url, sizeof(url), "tcp://%s:%d", mqtt_conf->host, mqtt_conf->port);
LOG_I("MQTT URL: %s\n", url);
// LOG_I("MQTT URL: %s\n", url);
create_opts.sendWhileDisconnected = 0;
rc = MQTTAsync_createWithOptions(&mqtt_conf->client,
@ -490,54 +618,54 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
MQTTCLIENT_PERSISTENCE_NONE,
NULL,
&create_opts);
LOG_I("MQTTAsync_create rc = %d\n", rc);
// LOG_I("MQTTAsync_create rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("MQTT客户端创建失败: %s\n", MQTTAsync_strerror(rc));
ret = -1;
goto error;
}
LOG_I("MQTTAsync_create succes s\n");
// LOG_I("MQTTAsync_create success\n");
// 设置所有必要的回调函数
LOG_I("设置MQTT回调函数\n");
// LOG_I("设置MQTT回调函数\n");
rc = MQTTAsync_setConnected(mqtt_conf->client, NULL, mqtt_utils_connected);
LOG_I("MQTTAsync_setConnected rc = %d\n", rc);
// LOG_I("MQTTAsync_setConnected rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("设置连接回调失败: %s\n", MQTTAsync_strerror(rc));
// LOG_I("设置连接回调失败: %s\n", MQTTAsync_strerror(rc));
ret = -2;
goto error;
}
LOG_I("MQTTAsync_setConnected success\n");
// LOG_I("MQTTAsync_setConnected success\n");
rc = MQTTAsync_setDisconnected(mqtt_conf->client, NULL, mqtt_utils_disconnected);
LOG_I("MQTTAsy nc_setDisconnected rc = %d\n", rc);
// LOG_I("MQTTAsync_setDisconnected rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("设置断开回调失败: %s\n", MQTTAsync_strerror(rc));
// LOG_I("设置断开回调失败: %s\n", MQTTAsync_strerror(rc));
ret = -2;
goto error;
}
LOG_I("MQTTAsync_setDisconnected success\n");
// LOG_I("MQTTAsync_setDisconnected success\n");
// 设置消息到达回调
rc = MQTTAsync_setMessageArrivedCallback(mqtt_conf->client, NULL, mqtt_utils_message_arrived);
LOG_I("MQTTAsync_setMessageArrivedCallback rc = %d\n", rc);
// LOG_I("MQTTAsync_setMessageArrivedCallback rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("设置消息到达回调失败: %s\n", MQTTAsync_strerror(rc));
// LOG_I("设置消息到达回调失败: %s\n", MQTTAsync_strerror(rc));
ret = -2;
goto error;
}
LOG_I("MQTTAsync_setMessageArrivedCallback success\n");
// LOG_I("MQTTAsync_setMessageArrivedCallback success\n");
// 设置连接丢失回调
rc = MQTTAsync_setConnectionLostCallback(mqtt_conf->client, NULL, mqtt_utils_connection_lost);
LOG_I("MQTTAsync_setConnectionLostCallback rc = %d\n", rc);
// LOG_I("MQTTAsync_setConnectionLostCallback rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("设置连接丢失回调失败: %s\n", MQTTAsync_strerror(rc));
// LOG_I("设置连接丢失回调失败: %s\n", MQTTAsync_strerror(rc));
ret = -2;
goto error;
}
LOG_I("MQTTAsync_setConnectionLostCallback success\n");
// LOG_I("MQTTAsync_setConnectionLostCallback success\n");
/* connect option */
conn_opts.onSuccess = mqtt_utils_on_connect_success;
@ -579,15 +707,15 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
conn_opts.ssl = &ssl_opts;
}
LOG_I("正在连接MQTT服务器...\n");
// LOG_I("正在连接MQTT服务器...\n");
rc = MQTTAsync_connect(mqtt_conf->client, &conn_opts);
LOG_I("MQTTAsync_connect rc = %d\n", rc);
// LOG_I("MQTTAsync_connect rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("MQTT连接启动失败: %s\n", MQTTAsync_strerror(rc));
ret = -1;
goto error;
}
LOG_I("MQTTAsync_connect 启动成功\n");
// LOG_I("MQTTAsync_connect 启动成功\n");
mqtt_conf->sub_opts = sub_opts;
mqtt_conf->pub_opts = pub_opts;
@ -604,12 +732,12 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);
// 验证回调函数设置
LOG_I("=== 验证回调函数设置 ===\n");
LOG_I("消息到达回调函数地址: %p\n", mqtt_utils_message_arrived);
LOG_I("连接成功回调函数地址: %p\n", mqtt_utils_connected);
LOG_I("订阅成功回调函数地址: %p\n", mqtt_utils_on_subscribe_success);
LOG_I("订阅失败回调函数地址: %p\n", mqtt_utils_on_subscribe_failure);
LOG_I("=== 回调函数验证完成 ===\n");
// LOG_I("=== 验证回调函数设置 ===\n");
// LOG_I("消息到达回调函数地址: %p\n", mqtt_utils_message_arrived);
// LOG_I("连接成功回调函数地址: %p\n", mqtt_utils_connected);
// LOG_I("订阅成功回调函数地址: %p\n", mqtt_utils_on_subscribe_success);
// LOG_I("订阅失败回调函数地址: %p\n", mqtt_utils_on_subscribe_failure);
// LOG_I("=== 回调函数验证完成 ===\n");
ret = 0;
error:

View File

@ -67,7 +67,7 @@ typedef struct
extern mqtt_utils_t mqtt_config;
extern void PutDataIntoMQueue(char *payload);
extern char g_mqtt_deviceName[256];
extern char g_mqtt_deviceName[256]; // 从topic解析出的deviceName
typedef struct{
char msg_messageId[32];

View File

@ -3423,34 +3423,47 @@ int uart_data_receive_data_back(uart_utils_t *uart,uint16_t *parmAck,uint16_t *t
uint16_t *signCode,uint16_t *reserve,uint32_t *lableParm){
int ret = 0;
jt_data_back_package_t jt_data_back_package;
ret = uart_read_until_time(uart->uart_fd,(char *)&jt_data_back_package,sizeof(jt_data_back_package_t), 1000, 50);
if(ret == sizeof(jt_data_back_package_t)){
//LOG_I("%s success\n", __func__);
if((jt_data_back_package.pre1 == 0x2424)
uint8_t *buf = (uint8_t *)&jt_data_back_package;
int pkg_size = sizeof(jt_data_back_package_t);
ret = uart_read_until_time(uart->uart_fd,(char *)buf, pkg_size, 1000, 50);
if(ret != pkg_size){
return -1;
}
// 检查包头
if(!((jt_data_back_package.pre1 == 0x2424)
&&(jt_data_back_package.pre2 == 0x2424)
&&(jt_data_back_package.pre3 == 0x2424)){
uint8_t version_1=(ntohs(jt_data_back_package.version)>>8)&0xFF;
uint8_t version_2=(ntohs(jt_data_back_package.version))&0xFF;
uint8_t signCode_1=(ntohs(jt_data_back_package.signCode)>>8)&0xFF;
uint8_t signCode_2=(ntohs(jt_data_back_package.signCode))&0xFF;
uint8_t reserve_1=(ntohs(jt_data_back_package.reserve)>>8)&0xFF;
uint8_t reserve_2=(ntohs(jt_data_back_package.reserve))&0xFF;
uint8_t lableParm_1=(ntohl(jt_data_back_package.lableParm)>>24)&0xFF;
uint8_t lableParm_2=(ntohl(jt_data_back_package.lableParm)>>16)&0xFF;
uint8_t lableParm_3=(ntohl(jt_data_back_package.lableParm)>>8)&0xFF;
uint8_t lableParm_4=(ntohl(jt_data_back_package.lableParm))&0xFF;
uint8_t test1[15] = {jt_data_back_package.len,jt_data_back_package.featureCode,
jt_data_back_package.count,jt_data_back_package.battery,version_1,version_2,
jt_data_back_package.ledCtrl,signCode_1,signCode_2,reserve_1,reserve_2,
lableParm_1,lableParm_2,lableParm_3,lableParm_4};
uint16_t crc_c=CRC16_XMODEM(test1,15);
//for(int i=0;i<15;i++){
// printf("%02x ",test1[i]);
//}
//LOG_I("%s:XModem_CRC16 = %04x\r\n",__func__,crc_c);
//LOG_I("CRC = %04x\r\n",ntohs(jt_data_back_package.crc));
&&(jt_data_back_package.pre3 == 0x2424))){
// 帧错位,尝试在已读数据中找包头 0x24 0x24
int sync_offset = -1;
for(int i = 1; i < pkg_size - 1; i++){
if(buf[i] == 0x24 && buf[i+1] == 0x24){
sync_offset = i;
break;
}
}
if(sync_offset > 0){
// 找到包头,左移数据并补读剩余字节
int remain = pkg_size - sync_offset;
memmove(buf, buf + sync_offset, remain);
int need = sync_offset;
int got = uart_read_until_time(uart->uart_fd, (char *)(buf + remain), need, 500, 50);
if(got != need){
return -2;
}
// 重新校验包头
if(!((jt_data_back_package.pre1 == 0x2424)
&&(jt_data_back_package.pre2 == 0x2424)
&&(jt_data_back_package.pre3 == 0x2424))){
return -2;
}
}else{
return -2;
}
}
// 解析数据
*parmAck=ntohs(jt_data_back_package.pre4);
*tagCodeHead=ntohs(jt_data_back_package.tagHead);
*tagCode=ntohl(jt_data_back_package.tag);
@ -3464,14 +3477,5 @@ int uart_data_receive_data_back(uart_utils_t *uart,uint16_t *parmAck,uint16_t *t
*signCode=ntohs(jt_data_back_package.signCode);
*reserve=ntohs(jt_data_back_package.reserve);
*lableParm=ntohl(jt_data_back_package.lableParm);
}else{
// LOG_I("%s failed\n", __func__);
ret = -2;
}
}else{
//LOG_I("%s, failed, time out\n", __func__);
ret = -1;
}
return ret;
return pkg_size;
}

View File

@ -1414,52 +1414,51 @@ typedef struct
typedef struct
{
uint8_t len;
uint16_t customCode;
uint8_t type;
uint8_t bindStart;
uint8_t bindEnd;
uint32_t idStart;
uint32_t idEnd;
jt_led_or_group_package_t ledCtrl;
uint16_t reserve;
uint32_t crc;
uint8_t len;
uint16_t customCode;
uint8_t type;
uint8_t bindStart;
uint8_t bindEnd;
uint32_t idStart;
uint32_t idEnd;
jt_led_or_group_package_t ledCtrl;
uint16_t reserve;
uint32_t crc;
}__attribute__((packed)) jt_id_package_t;
typedef struct
{
uint16_t pre1;//$$ 0x2424
uint16_t pre2;//$$ 0x2424
uint16_t pre3;//$$ 0x2424 CC 0x4343 DD 0x4444
uint16_t ack;//## 0x2323 complete TT 0x5454 timeout RR 0x5252 AP-reset EE 0x4545 AP-crc-error FF 0x4646 AP broadcast end
uint16_t pre1;//$$ 0x2424
uint16_t pre2;//$$ 0x2424
uint16_t pre3;//$$ 0x2424 CC 0x4343 DD 0x4444
uint16_t ack;//## 0x2323 complete TT 0x5454 timeout RR 0x5252 AP-reset EE 0x4545 AP-crc-error FF 0x4646 AP broadcast end
}__attribute__((packed)) jt_receive_package_t;
typedef struct
{
uint16_t v1;
uint16_t v2;
uint16_t v1;
uint16_t v2;
}__attribute__((packed)) jt_receive_version_package_t;
typedef struct
{
uint16_t pre1;
uint16_t pre2;
uint16_t pre3;
uint16_t pre4;
uint16_t tagHead;
uint32_t tag;
uint8_t signal;
uint8_t len;
uint8_t featureCode;
uint8_t count;
uint8_t battery;
uint16_t version;
uint8_t ledCtrl;
uint16_t signCode;
uint16_t reserve;
uint32_t lableParm;
uint16_t crc;
uint16_t pre1;
uint16_t pre2;
uint16_t pre3;
uint16_t pre4;
uint16_t tagHead;
uint32_t tag;
uint8_t signal;
uint8_t len;
uint8_t featureCode;
uint8_t count;
uint8_t battery;
uint16_t version;
uint8_t ledCtrl;
uint16_t signCode;
uint16_t reserve;
uint32_t lableParm;
uint16_t crc;
}__attribute__((packed)) jt_data_back_package_t;
uint32_t CRC32_DIRECT(uint8_t *data, uint32_t length);
uint16_t CRC16_XMODEM(uint8_t *puchMsg, uint32_t usDataLen);
@ -1468,10 +1467,10 @@ int uart_data_send_head(uart_utils_t *uart,uint8_t func,uint8_t wakeup_time,uint
int uart_data_send_lighton_or_group(uart_utils_t *uart,jt_tag_package_t tags[],uint8_t tag_num);
int uart_data_send_lighton_by_group(uart_utils_t *uart,jt_group_package_t groups[],uint8_t tag_num);
int uart_data_send_lighton_by_id(uart_utils_t *uart,uint8_t bind_start,uint8_t bind_end,
uint32_t id_start,uint32_t id_end,jt_led_or_group_package_t led_ctrl,uint16_t reserve);
uint32_t id_start,uint32_t id_end,jt_led_or_group_package_t led_ctrl,uint16_t reserve);
int uart_data_receive_data_back(uart_utils_t *uart,uint16_t *parmAck,uint16_t *tagCodeHead,uint32_t *tagCode,uint8_t *tagSignal,
uint8_t *totalLen,uint8_t *tagFeature,uint8_t *count,uint8_t *batteryV,uint16_t *version,uint8_t *ledCtrl,
uint16_t *signCode,uint16_t *reserve,uint32_t *lableParm);
uint8_t *totalLen,uint8_t *tagFeature,uint8_t *count,uint8_t *batteryV,uint16_t *version,uint8_t *ledCtrl,
uint16_t *signCode,uint16_t *reserve,uint32_t *lableParm);
int uart_data_receive_ack(uart_utils_t *uart,uint16_t *parm_ack);
int uart_data_receive_version(uart_utils_t *uart);
#endif