Compare commits

...

7 Commits

Author SHA1 Message Date
zzh
fc95e3561a 减少mqtt断联 2025-12-25 10:15:56 +08:00
zzh
f910a4d701 bug fix 2025-12-24 16:13:04 +08:00
zzh
67e579ac27 新增灭灯上报 2025-12-24 13:41:09 +08:00
zzh
30700b9178 更新亮灯成功上报,mqtt 连接成功时随机闪烁黄灯 2025-12-24 10:25:33 +08:00
zzh
72a94babe5 取消没必要的打印
增加灯条心跳漏报机制
增加模拟 topic
2025-12-23 11:19:36 +08:00
zzh
9ccab6c22f 修复编译错误 2025-12-22 16:21:29 +08:00
zzh
16b46d1b00 同步改版目录内容到 meituan_2.0_New_core_board 2025-12-22 11:53:28 +08:00
21 changed files with 1362 additions and 357 deletions

BIN
hiredis/alloc.o Normal file

Binary file not shown.

BIN
hiredis/async.o Normal file

Binary file not shown.

BIN
hiredis/hiredis-test Executable file

Binary file not shown.

BIN
hiredis/hiredis.o Normal file

Binary file not shown.

11
hiredis/hiredis.pc Normal file
View File

@ -0,0 +1,11 @@
prefix=/usr/local
exec_prefix=${prefix}
libdir=/usr/local/lib
includedir=/usr/local/include
pkgincludedir=/usr/local/include/hiredis
Name: hiredis
Description: Minimalistic C client library for Redis.
Version: 1.3.0
Libs: -L${libdir} -lhiredis
Cflags: -I${pkgincludedir} -I${includedir} -D_FILE_OFFSET_BITS=64

BIN
hiredis/libhiredis.a Normal file

Binary file not shown.

BIN
hiredis/libhiredis.so Executable file

Binary file not shown.

BIN
hiredis/net.o Normal file

Binary file not shown.

BIN
hiredis/read.o Normal file

Binary file not shown.

BIN
hiredis/sds.o Normal file

Binary file not shown.

BIN
hiredis/sockcompat.o Normal file

Binary file not shown.

BIN
hiredis/test.o Normal file

Binary file not shown.

1088
main.c

File diff suppressed because it is too large Load Diff

View File

@ -2722,8 +2722,8 @@ static void setRetryLoopInterval(int keepalive)
if (proposed < 1)
proposed = 1;
else if (proposed > 5)
proposed = 5;
else if (proposed > 60)
proposed = 60; // 提高上限到60秒适应更长的keepalive
if (proposed < retryLoopInterval)
retryLoopInterval = proposed;
}

View File

@ -1323,8 +1323,8 @@ static void setRetryLoopInterval(int keepalive)
if (proposed < 1)
proposed = 1;
else if (proposed > 5)
proposed = 5;
else if (proposed > 60)
proposed = 60; // 提高上限到60秒适应更长的keepalive
if (proposed < retryLoopInterval)
retryLoopInterval = proposed;
}

View File

@ -633,7 +633,7 @@ void MQTTProtocol_keepalive(time_t now)
}
else
{
//LOG_I("PINGREQ send success\n");
LOG_I("PINGREQ send success at %ld\n", (long)now);
client->net.lastSent = now;
client->ping_outstanding = 1;
}
@ -641,9 +641,25 @@ void MQTTProtocol_keepalive(time_t now)
}
else
{
Log(TRACE_PROTOCOL, -1, "PINGRESP not received in keepalive interval for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
LOG_I("%s:PINGRESP not received\n",__func__);
MQTTProtocol_closeSession(client, 1);
// 增加容错允许ping_outstanding为2时再等待一个周期
if (client->ping_outstanding >= 2) {
Log(TRACE_PROTOCOL, -1, "PINGRESP not received after 2 keepalive intervals for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
LOG_I("%s:PINGRESP not received after 2 attempts\n",__func__);
MQTTProtocol_closeSession(client, 1);
} else {
// 第二次发送PINGREQ
if (Socket_noPendingWrites(client->net.socket)) {
if (MQTTPacket_send_pingreq(&client->net, client->clientID) != TCPSOCKET_COMPLETE) {
Log(TRACE_PROTOCOL, -1, "Error sending second PINGREQ for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
LOG_I("%s:Error sending second PINGREQ for client %s on socket %d, disconnecting\n",__func__,client->clientID, client->net.socket);
MQTTProtocol_closeSession(client, 1);
} else {
LOG_I("Second PINGREQ send success at %ld\n", (long)now);
client->net.lastSent = now;
client->ping_outstanding++;
}
}
}
}
}
}

View File

@ -178,7 +178,7 @@ int MQTTProtocol_handlePingresps(void* pack, int sock)
client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
Log(LOG_PROTOCOL, 21, NULL, sock, client->clientID);
client->ping_outstanding = 0;
//LOG_I("PINGRESP receive success\n");
LOG_I("PINGRESP received successfully, ping_outstanding cleared\n");
FUNC_EXIT_RC(rc);
return rc;
}

View File

@ -25,9 +25,15 @@
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>
#include "mqtt_utils.h"
// 全局变量用于存储从topic解析出的deviceName
char g_mqtt_deviceName[256] = {0};
#include "main.h"
#define PRINT_TIME_TAG
#define DBG_TAG "mqtt_utils"
@ -46,6 +52,7 @@ char nativeUpgradeTopicName[1024] = "";
char subscribeTopics[10][1024] = {""};
int subscribeTopicCount = 0;
extern char softwareVersion[16];
extern bool isMqttConnected;
typedef struct{
enum MQTTASYNC_TRACE_LEVELS level;
char name[64];
@ -116,8 +123,56 @@ json_error:
json_object_put(root);
}
// 黄灯闪烁相关
static pthread_t pt_yellow_led_flash;
static volatile bool yellow_led_flash_running = false;
static pthread_mutex_t yellow_led_mutex = PTHREAD_MUTEX_INITIALIZER;
void *thread_yellow_led_flash(void *arg);
void *thread_yellow_led_flash(void *arg) {
LOG_I("Yellow LED flash thread started\n");
// 初始化随机种子
srand((unsigned int)time(NULL));
while (yellow_led_flash_running) {
// 生成随机延时 (100ms - 500ms)
int delay_us = 100000 + (rand() % 400000); // 100000-500000 微秒
// 点亮黄灯
system("echo 1 > /sys/class/gpio/gpio114/value");
// 随机延时保持亮
usleep(delay_us);
// 检查是否需要退出
if (!yellow_led_flash_running) break;
// 熄灭黄灯
system("echo 0 > /sys/class/gpio/gpio114/value");
// 随机延时保持灭
usleep(delay_us);
}
// 确保退出时灯是灭的
system("echo 0 > /sys/class/gpio/gpio114/value");
LOG_I("Yellow LED flash thread exited\n");
return NULL;
}
void mqtt_net_failure(int *failure_times){
system("echo 0 > /sys/class/gpio/gpio113/value");
// 停止黄灯闪烁
pthread_mutex_lock(&yellow_led_mutex);
if (yellow_led_flash_running) {
yellow_led_flash_running = false;
pthread_join(pt_yellow_led_flash, NULL);
LOG_I("Yellow LED flash thread stopped\n");
}
pthread_mutex_unlock(&yellow_led_mutex);
system("echo 0 > /sys/class/gpio/gpio114/value");
if(failure_times != NULL){
(*failure_times)++;
LOG_I("mqtt net failure_times = %d\n", *failure_times);
@ -161,40 +216,146 @@ void mqtt_utils_connection_lost(void *context, char *cause){
mqtt_net_failure(&connect_failure_times);
}
static int mqtt_utils_topic_match(const char *filter, const char *topic)
{
size_t fi = 0;
size_t ti = 0;
if (filter == NULL || topic == NULL) {
return 0;
}
while (1) {
size_t fseg_len = 0;
size_t tseg_len = 0;
while (filter[fi + fseg_len] != '\0' && filter[fi + fseg_len] != '/') {
fseg_len++;
}
while (topic[ti + tseg_len] != '\0' && topic[ti + tseg_len] != '/') {
tseg_len++;
}
if (fseg_len == 1 && filter[fi] == '#') {
return 1;
}
if (fseg_len == 1 && filter[fi] == '+') {
} else {
if (fseg_len != tseg_len) {
return 0;
}
if (strncmp(filter + fi, topic + ti, fseg_len) != 0) {
return 0;
}
}
fi += fseg_len;
ti += tseg_len;
if (filter[fi] == '\0' && topic[ti] == '\0') {
return 1;
}
if (filter[fi] == '\0' || topic[ti] == '\0') {
return 0;
}
if (filter[fi] != '/' || topic[ti] != '/') {
return 0;
}
fi++;
ti++;
}
}
static int mqtt_utils_parse_sys_lightOperate_invoke_topic(const char *topic, char *out1, size_t out1_len, char *out2,
size_t out2_len)
{
char tmp1[256] = {0};
char tmp2[256] = {0};
if (topic == NULL || out1 == NULL || out2 == NULL || out1_len == 0 || out2_len == 0) {
return -1;
}
if (sscanf(topic, "/sys/%255[^/]/%255[^/]/thing/service/lightOperate/invoke", tmp1, tmp2) != 2) {
return -1;
}
snprintf(out1, out1_len, "%s", tmp1);
snprintf(out2, out2_len, "%s", tmp2);
return 0;
}
int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message){
LOG_I("---------------%s------------------\n",__func__);
LOG_I("收到消息topic: %s\n", topicName);
// LOG_I("---------------%s------------------\n",__func__);
// LOG_I("收到消息topic: %s\n", topicName ? topicName : "(null)");
// 检查消息是否来自任何一个订阅的topic
int topic_matched = 0;
for(int i = 0; i < subscribeTopicCount; i++) {
if(strcmp(topicName, subscribeTopics[i]) == 0) {
LOG_I("消息匹配topic[%d]: %s\n", i, subscribeTopics[i]);
PutDataIntoMQueue(message->payload);
int matched = 0;
if (strchr(subscribeTopics[i], '+') != NULL || strchr(subscribeTopics[i], '#') != NULL) {
matched = mqtt_utils_topic_match(subscribeTopics[i], topicName);
} else {
matched = (strcmp(topicName, subscribeTopics[i]) == 0);
}
if (matched) {
if (strcmp(subscribeTopics[i], "/sys/+/+/thing/service/lightOperate/invoke") == 0) {
char field1[256] = {0};
char field2[256] = {0};
if (mqtt_utils_parse_sys_lightOperate_invoke_topic(topicName, field1, sizeof(field1), field2,
sizeof(field2)) == 0) {
LOG_I("lightOperate/invoke 匹配字段: productKey=%s, deviceName=%s\n", field1, field2);
// 将deviceName存储到全局变量中
memset(g_mqtt_deviceName, 0, sizeof(g_mqtt_deviceName));
strncpy(g_mqtt_deviceName, field2, sizeof(g_mqtt_deviceName) - 1);
} else {
LOG_I("lightOperate/invoke topic字段解析失败: %s\n", topicName);
}
}
// LOG_I("消息匹配topic[%d]: %s\n", i, subscribeTopics[i]);
if (message != NULL && message->payload != NULL && message->payloadlen > 0) {
char payload_buf[1024] = {0};
size_t copy_len = (size_t)message->payloadlen;
if (copy_len >= sizeof(payload_buf)) {
copy_len = sizeof(payload_buf) - 1;
}
memcpy(payload_buf, message->payload, copy_len);
payload_buf[copy_len] = '\0';
PutDataIntoMQueue(payload_buf);
} else {
// LOG_I("消息payload为空跳过入队\n");
}
topic_matched = 1;
break;
}
}
if(!topic_matched) {
LOG_I("消息topic不匹配任何订阅的topic: %s\n", topicName);
// LOG_I("消息topic不匹配任何订阅的topic: %s\n", topicName);
}
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
if (topicName != NULL) {
MQTTAsync_free(topicName);
}
return 1;
}
void mqtt_utils_connected(void *context, char *cause){
LOG_I("=== mqtt_utils_connected 函数被调用 ===\n");
// LOG_I("=== mqtt_utils_connected 函数被调用 ===\n");
if (cause != NULL) {
LOG_I("%s cause:%s\n",__func__, cause);
// LOG_I("%s cause:%s\n",__func__, cause);
} else {
LOG_I("%s\n",__func__);
// LOG_I("%s\n",__func__);
}
connect_failure_times=0;
LOG_I(" MQTT连接成功开始订阅%d个topic\n", subscribeTopicCount);
// LOG_I(" MQTT连接成功开始订阅%d个topic\n", subscribeTopicCount);
// 订阅所有设置的topic
for(int i = 0; i < subscribeTopicCount; i++) {
@ -205,29 +366,105 @@ void mqtt_utils_connected(void *context, char *cause){
LOG_I("订阅topic[%d]: %s\n", i, subscribeTopics[i]);
int subscribe_result = mqtt_utils_subscribe(mqtt_conf, subscribeTopics[i], 1);
LOG_I("订阅topic[%d]请求结果: %d\n", i, subscribe_result);
// LOG_I("订阅topic[%d]请求结果: %d\n", i, subscribe_result);
}
// 添加通配 符订阅来接收所有消息(用于调试)
LOG_I("=== 添加通配符订阅用于调试 ===\n");
// 添加通配符订阅来接收所有消息(用于调试)
// LOG_I("=== 添加通配符订阅用于调试 ===\n");
int wildcard_subscribe_result = mqtt_utils_subscribe(mqtt_conf, "#", 1);
LOG_I("通配符订阅请求结果: %d\n", wildcard_subscribe_result);
LOG_I("=== 通配符订阅完成 ===\n");
// LOG_I("通配符订阅请求结果: %d\n", wildcard_subscribe_result);
// LOG_I("=== 通配符订阅完成 ===\n");
// 订阅公共 topic用于测试
LOG_I("=== 订阅公共topic用于测试 ===\n");
// 订阅公共topic用于测试
// LOG_I("=== 订阅公共topic用于测试 ===\n");
int public_subscribe_result = mqtt_utils_subscribe(mqtt_conf, "/test/public/topic", 0);
LOG_I("公共topic订阅请求结果: %d\n", public_subscribe_result);
LOG_I("=== 公共topic订阅完成 ===\n");
// LOG_I("公共topic订阅请求结果: %d\n", public_subscribe_result);
// LOG_I("=== 公共topic订阅完成 ===\n");
system( "echo 1 > /sys/class/gpio/gpio113/value");//yellow ok
// 启动黄灯闪烁线程
pthread_mutex_lock(&yellow_led_mutex);
if (!yellow_led_flash_running) {
yellow_led_flash_running = true;
int ret = pthread_create(&pt_yellow_led_flash, NULL, thread_yellow_led_flash, NULL);
if (ret != 0) {
LOG_I("Failed to create yellow LED flash thread, using constant on\n");
yellow_led_flash_running = false;
system("echo 1 > /sys/class/gpio/gpio114/value");//yellow ok
} else {
pthread_detach(pt_yellow_led_flash);
LOG_I("Yellow LED flash thread started\n");
}
}
pthread_mutex_unlock(&yellow_led_mutex);
station_status_report();
LOG_I("=== mqtt_utils_connected 函数执行完成 ===\n");
// 设置MQTT连接标志
isMqttConnected = true;
// LOG_I("MQTT connected, isMqttConnected = true\n");
// 补报所有已在线的灯条登录信息
typedef struct {
uint32_t tagCode;
time_t lastHeartbeat;
bool isOnline;
} lightbar_heartbeat_t;
extern int lightbarHeartbeatCount;
extern lightbar_heartbeat_t lightbarHeartbeat[];
extern pthread_mutex_t heartbeatMutex;
extern void report_lightbar_login(uint32_t tagCode);
pthread_mutex_lock(&heartbeatMutex);
LOG_I("Reporting login for %d online lightbars again\n", lightbarHeartbeatCount);
for (int i = 0; i < lightbarHeartbeatCount; i++) {
if (lightbarHeartbeat[i].isOnline) {
LOG_I("Report again lightbar %08X login\n", lightbarHeartbeat[i].tagCode);
report_lightbar_login(lightbarHeartbeat[i].tagCode);
// 添加短暂延迟避免MQTT缓冲区溢出
usleep(100 * 1000); // 100ms延迟
}
}
pthread_mutex_unlock(&heartbeatMutex);
// 启动灭灯检测线程(只启动一次)
static bool light_off_thread_started = false;
if (!light_off_thread_started) {
extern pthread_t pt_light_off_check;
extern void *thread_light_off_check(void *arg);
int ret = pthread_create(&pt_light_off_check, NULL, thread_light_off_check, NULL);
if(ret != 0) {
LOG_I("pthread_create light_off_check fail\n");
} else {
LOG_I("pthread_create light_off_check success\n");
pthread_detach(pt_light_off_check);
light_off_thread_started = true;
}
}
// 启动心跳检测线程(只启动一次)
static bool heartbeat_thread_started = false;
if (!heartbeat_thread_started) {
extern pthread_t pt_heartbeat_check;
extern void *thread_heartbeat_check(void *arg);
int ret = pthread_create(&pt_heartbeat_check, NULL, thread_heartbeat_check, NULL);
if(ret != 0){
LOG_I("pthread_create heartbeat_check fail\n");
} else {
LOG_I("pthread_create heartbeat_check success\n");
pthread_detach(pt_heartbeat_check);
heartbeat_thread_started = true;
}
}
// LOG_I("=== mqtt_utils_connected 函数执行完成 ===\n");
}
void mqtt_utils_disconnected(void *context, MQTTProperties* props, enum MQTTReasonCodes rc){
//LOG_I("%s reason code %s",__func__ ,MQTTReasonCode_toString(rc));
//mqtt_led_net_status_judge();
// 设置MQTT连接标志为false
isMqttConnected = false;
// LOG_I("MQTT disconnected, set isMqttConnected = false\n");
}
void mqtt_utils_on_connect_success(void *context, MQTTAsync_successData *response){
@ -238,16 +475,16 @@ void mqtt_utils_on_connect_success(void *context, MQTTAsync_successData *respons
void mqtt_utils_on_connect_failure(void *context, MQTTAsync_failureData *response){
//LOG_I("%s connect failed, rc %s\n", __func__, response ? MQTTAsync_strerror(response->code) : "none");
// mqtt_led_net_status_judge();
LOG_I("MQTT连接失败 - 错误代码: %s\n", response ? MQTTAsync_strerror(response->code) : "未知错误");
// LOG_I("MQTT连接失败 - 错误代码: %s\n", response ? MQTTAsync_strerror(response->code) : "未知错误");
if (response && response->message) {
LOG_I("MQTT连接失败 - 错误信息: %s\n", response->message);
// LOG_I("MQTT连接失败 - 错误信息: %s\n", response->message);
}
mqtt_net_failure(&connect_failure_times);
}
static int mqtt_utils_on_ssl_error(const char *str, size_t len, void *context){
MQTTAsync client = (MQTTAsync)context;
LOG_I("%s ssl error: %s\n",__func__, str);
// LOG_I("%s ssl error: %s\n",__func__, str);
return 0;
}
@ -256,64 +493,64 @@ void mqtt_utils_on_publish_success(void *context, MQTTAsync_successData *respons
}
void mqtt_utils_on_publish_failure(void *context, MQTTAsync_failureData *response){
LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
// LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
// 如果是连 接相关的问题,记录详细信息
// 如果是连接相关的问题,记录详细信息
if (response && response->code == MQTTASYNC_OPERATION_INCOMPLETE) {
LOG_I("发布失败:操作在完成前被丢弃,可能是连接状态不稳定\n");
// LOG_I("发布失败:操作在完成前被丢弃,可能是连接状态不稳定\n");
} else if (response && response->code == MQTTASYNC_DISCONNECTED) {
LOG_I("发布失败:客户端已断开连接\n");
// LOG_I("发布失败:客户端已断开连接\n");
}
}
void mqtt_utils_on_subscribe_success(void *context, MQTTAsync_successData *response){
LOG_I("=== MQTT订阅成功回调被调用 ===\n");
LOG_I("MQTT订阅成功\n");
// LOG_I("=== MQTT订阅成功回调被调用 ===\n");
// LOG_I("MQTT订阅成功\n");
}
void mqtt_utils_on_subscribe_failure(void *context, MQTTAsync_failureData *response){
LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
// LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
}
void mqtt_utils_on_disconnect_success(void *context, MQTTAsync_successData *response){
LOG_I("%s\n",__func__);
// LOG_I("%s\n",__func__);
//mqtt_led_net_status_judge();
}
int mqtt_utils_subscribe(mqtt_utils_t *mqtt_utils, char *topic, int qos){
int rc = 0;
LOG_I("开始订阅topic: %s, qos: %d\n", topic, qos);
LOG_I("MQTT客户端指针: %p\n", mqtt_utils->client);
LOG_I("订阅选项指针: %p\n", &mqtt_utils->sub_opts);
// LOG_I("开始订阅topic: %s, qos: %d\n", topic, qos);
// LOG_I("MQTT客户端指针: %p\n", mqtt_utils->client);
// LOG_I("订阅选项指针: %p\n", &mqtt_utils->sub_opts);
// 检查参数有效性
if (topic == NULL || strlen(topic) == 0) {
LOG_I("错误topic参数无效\n");
// LOG_I("错误topic参数无效\n");
return -1;
}
if (mqtt_utils->client == NULL) {
LOG_I("错误MQTT客户端未初始化\n");
// LOG_I("错误MQTT客户端未初始化\n");
return -1;
}
// 跳过连接状态检查,直接进行 订阅
LOG_I("跳过连接状态检查,直接进行订阅\n");
LOG_I("准备调用 MQTTAsync_subscribe...\n");
// 跳过连接状态检查,直接进行订阅
// LOG_I("跳过连接状态检查,直接进行订阅\n");
// LOG_I("准备调用 MQTTAsync_subscribe...\n");
rc = MQTTAsync_subscribe(mqtt_utils->client, topic, qos, &mqtt_utils ->sub_opts);
LOG_I("MQTTAsync_subscribe 调用完成,返回值: %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("订阅失败,错误代码: %s\n ", MQTTAsync_strerror(rc));
rc = MQTTAsync_subscribe(mqtt_utils->client, topic, qos, &mqtt_utils->sub_opts);
// LOG_I("MQTTAsync_subscribe 调用完成,返回值: %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
// LOG_I("订阅失败,错误代码: %s\n", MQTTAsync_strerror(rc));
} else {
LOG_I("订阅请求已发送,等待回调\n");
// LOG_I("订阅请求已发送,等待回调\n");
}
return rc;
}
int mqtt_utils_publish(mqtt_utils_t *mqtt_utils, char *topic, int qos, const char *data, int datalen){
int mqtt_utils_publish(mqtt_utils_t *mqtt_utils, char *topic, int qos, const char *data, int datalen){
int rc;
int isConnected = 0;
@ -339,16 +576,16 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
int ret = -1;
int rc = 0;
LOG_I("=== 开始MQTT连接初始化 ===\n");
LOG_I("ClientId: %s\n", mqtt_conf->clientid);
LOG_I("Username: %s\n", mqtt_conf->username);
LOG_I("Password: %s\n", mqtt_conf->password);
LOG_I("Host: %s\n", mqtt_conf->host);
LOG_I("Port: %d\n", mqtt_conf->port);
// LOG_I("=== 开始MQTT连接初始化 ===\n");
// LOG_I("ClientId: %s\n", mqtt_conf->clientid);
// LOG_I("Username: %s\n", mqtt_conf->username);
// LOG_I("Password: %s\n", mqtt_conf->password);
// LOG_I("Host: %s\n", mqtt_conf->host);
// LOG_I("Port: %d\n", mqtt_conf->port);
// 提前设置topic名称确保在连接回调中可用
sprintf(nativeInvokTopicName,"/iot%s/thing/ota/upgrade",mqtt_conf->username);
LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);
// LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);
// 设置多个订阅topic
subscribeTopicCount = 0;
@ -357,12 +594,13 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
sprintf(subscribeTopics[subscribeTopicCount++], "/iot/estation%s/bind", mqtt_conf->username);
sprintf(subscribeTopics[subscribeTopicCount++], "/iot/estation%s/group", mqtt_conf->username);
sprintf(subscribeTopics[subscribeTopicCount++], "/iot%s/thing/ota/upgrade", mqtt_conf->username);
sprintf(subscribeTopics[subscribeTopicCount++], "/sys/WcSubLightStrip/AD1000014C11/thing/service/lightOperate/invoke");
// 只使用通配符订阅覆盖所有设备的lightOperate消息
sprintf(subscribeTopics[subscribeTopicCount++], "/sys/+/+/thing/service/lightOperate/invoke");
LOG_I("设置了%d个订阅topic:\n", subscribeTopicCount);
for(int i = 0; i < subscribeTopicCount; i++) {
LOG_I("Topic[%d]: %s\n", i, subscribeTopics[i]);
}
// LOG_I("设置了%d个订阅topic:\n", subscribeTopicCount);
// for(int i = 0; i < subscribeTopicCount; i++) {
// LOG_I("Topic[%d]: %s\n", i, subscribeTopics[i]);
// }
infos = MQTTAsync_getVersionInfo();
printVersionInfo(infos);
@ -371,7 +609,7 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
//snprintf(url, sizeof(url), "%s://%s:%d", mqtt_conf->protocol, mqtt_conf->host, mqtt_conf->port);
snprintf(url, sizeof(url), "tcp://%s:%d", mqtt_conf->host, mqtt_conf->port);
LOG_I("MQTT URL: %s\n", url);
// LOG_I("MQTT URL: %s\n", url);
create_opts.sendWhileDisconnected = 0;
rc = MQTTAsync_createWithOptions(&mqtt_conf->client,
@ -380,54 +618,54 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
MQTTCLIENT_PERSISTENCE_NONE,
NULL,
&create_opts);
LOG_I("MQTTAsync_create rc = %d\n", rc);
// LOG_I("MQTTAsync_create rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("MQTT客户端创建失败: %s\n", MQTTAsync_strerror(rc));
ret = -1;
goto error;
}
LOG_I("MQTTAsync_create succes s\n");
// LOG_I("MQTTAsync_create success\n");
// 设置所有必要的回调函数
LOG_I("设置MQTT回调函数\n");
// LOG_I("设置MQTT回调函数\n");
rc = MQTTAsync_setConnected(mqtt_conf->client, NULL, mqtt_utils_connected);
LOG_I("MQTTAsync_setConnected rc = %d\n", rc);
// LOG_I("MQTTAsync_setConnected rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("设置连接回调失败: %s\n", MQTTAsync_strerror(rc));
// LOG_I("设置连接回调失败: %s\n", MQTTAsync_strerror(rc));
ret = -2;
goto error;
}
LOG_I("MQTTAsync_setConnected success\n");
// LOG_I("MQTTAsync_setConnected success\n");
rc = MQTTAsync_setDisconnected(mqtt_conf->client, NULL, mqtt_utils_disconnected);
LOG_I("MQTTAsy nc_setDisconnected rc = %d\n", rc);
// LOG_I("MQTTAsync_setDisconnected rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("设置断开回调失败: %s\n", MQTTAsync_strerror(rc));
// LOG_I("设置断开回调失败: %s\n", MQTTAsync_strerror(rc));
ret = -2;
goto error;
}
LOG_I("MQTTAsync_setDisconnected success\n");
// LOG_I("MQTTAsync_setDisconnected success\n");
// 设置消息到达回调
rc = MQTTAsync_setMessageArrivedCallback(mqtt_conf->client, NULL, mqtt_utils_message_arrived);
LOG_I("MQTTAsync_setMessageArrivedCallback rc = %d\n", rc);
// LOG_I("MQTTAsync_setMessageArrivedCallback rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("设置消息到达回调失败: %s\n", MQTTAsync_strerror(rc));
// LOG_I("设置消息到达回调失败: %s\n", MQTTAsync_strerror(rc));
ret = -2;
goto error;
}
LOG_I("MQTTAsync_setMessageArrivedCallback success\n");
// LOG_I("MQTTAsync_setMessageArrivedCallback success\n");
// 设置连接丢失回调
rc = MQTTAsync_setConnectionLostCallback(mqtt_conf->client, NULL, mqtt_utils_connection_lost);
LOG_I("MQTTAsync_setConnectionLostCallback rc = %d\n", rc);
// LOG_I("MQTTAsync_setConnectionLostCallback rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("设置连接丢失回调失败: %s\n", MQTTAsync_strerror(rc));
// LOG_I("设置连接丢失回调失败: %s\n", MQTTAsync_strerror(rc));
ret = -2;
goto error;
}
LOG_I("MQTTAsync_setConnectionLostCallback success\n");
// LOG_I("MQTTAsync_setConnectionLostCallback success\n");
/* connect option */
conn_opts.onSuccess = mqtt_utils_on_connect_success;
@ -469,15 +707,15 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
conn_opts.ssl = &ssl_opts;
}
LOG_I("正在连接MQTT服务器...\n");
// LOG_I("正在连接MQTT服务器...\n");
rc = MQTTAsync_connect(mqtt_conf->client, &conn_opts);
LOG_I("MQTTAsync_connect rc = %d\n", rc);
// LOG_I("MQTTAsync_connect rc = %d\n", rc);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("MQTT连接启动失败: %s\n", MQTTAsync_strerror(rc));
ret = -1;
goto error;
}
LOG_I("MQTTAsync_connect 启动成功\n");
// LOG_I("MQTTAsync_connect 启动成功\n");
mqtt_conf->sub_opts = sub_opts;
mqtt_conf->pub_opts = pub_opts;
@ -494,12 +732,12 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);
// 验证回调函数设置
LOG_I("=== 验证回调函数设置 ===\n");
LOG_I("消息到达回调函数地址: %p\n", mqtt_utils_message_arrived);
LOG_I("连接成功回调函数地址: %p\n", mqtt_utils_connected);
LOG_I("订阅成功回调函数地址: %p\n", mqtt_utils_on_subscribe_success);
LOG_I("订阅失败回调函数地址: %p\n", mqtt_utils_on_subscribe_failure);
LOG_I("=== 回调函数验证完成 ===\n");
// LOG_I("=== 验证回调函数设置 ===\n");
// LOG_I("消息到达回调函数地址: %p\n", mqtt_utils_message_arrived);
// LOG_I("连接成功回调函数地址: %p\n", mqtt_utils_connected);
// LOG_I("订阅成功回调函数地址: %p\n", mqtt_utils_on_subscribe_success);
// LOG_I("订阅失败回调函数地址: %p\n", mqtt_utils_on_subscribe_failure);
// LOG_I("=== 回调函数验证完成 ===\n");
ret = 0;
error:

View File

@ -67,6 +67,7 @@ typedef struct
extern mqtt_utils_t mqtt_config;
extern void PutDataIntoMQueue(char *payload);
extern char g_mqtt_deviceName[256]; // 从topic解析出的deviceName
typedef struct{
char msg_messageId[32];

View File

@ -3423,55 +3423,59 @@ int uart_data_receive_data_back(uart_utils_t *uart,uint16_t *parmAck,uint16_t *t
uint16_t *signCode,uint16_t *reserve,uint32_t *lableParm){
int ret = 0;
jt_data_back_package_t jt_data_back_package;
ret = uart_read_until_time(uart->uart_fd,(char *)&jt_data_back_package,sizeof(jt_data_back_package_t), 1000, 50);
if(ret == sizeof(jt_data_back_package_t)){
//LOG_I("%s success\n", __func__);
if((jt_data_back_package.pre1 == 0x2424)
&&(jt_data_back_package.pre2 == 0x2424)
&&(jt_data_back_package.pre3 == 0x2424)){
uint8_t version_1=(ntohs(jt_data_back_package.version)>>8)&0xFF;
uint8_t version_2=(ntohs(jt_data_back_package.version))&0xFF;
uint8_t signCode_1=(ntohs(jt_data_back_package.signCode)>>8)&0xFF;
uint8_t signCode_2=(ntohs(jt_data_back_package.signCode))&0xFF;
uint8_t reserve_1=(ntohs(jt_data_back_package.reserve)>>8)&0xFF;
uint8_t reserve_2=(ntohs(jt_data_back_package.reserve))&0xFF;
uint8_t lableParm_1=(ntohl(jt_data_back_package.lableParm)>>24)&0xFF;
uint8_t lableParm_2=(ntohl(jt_data_back_package.lableParm)>>16)&0xFF;
uint8_t lableParm_3=(ntohl(jt_data_back_package.lableParm)>>8)&0xFF;
uint8_t lableParm_4=(ntohl(jt_data_back_package.lableParm))&0xFF;
uint8_t test1[15] = {jt_data_back_package.len,jt_data_back_package.featureCode,
jt_data_back_package.count,jt_data_back_package.battery,version_1,version_2,
jt_data_back_package.ledCtrl,signCode_1,signCode_2,reserve_1,reserve_2,
lableParm_1,lableParm_2,lableParm_3,lableParm_4};
uint16_t crc_c=CRC16_XMODEM(test1,15);
//for(int i=0;i<15;i++){
// printf("%02x ",test1[i]);
//}
//LOG_I("%s:XModem_CRC16 = %04x\r\n",__func__,crc_c);
//LOG_I("CRC = %04x\r\n",ntohs(jt_data_back_package.crc));
*parmAck=ntohs(jt_data_back_package.pre4);
*tagCodeHead=ntohs(jt_data_back_package.tagHead);
*tagCode=ntohl(jt_data_back_package.tag);
*tagSignal=jt_data_back_package.signal;
*totalLen=jt_data_back_package.len;
*tagFeature=jt_data_back_package.featureCode;
*count=jt_data_back_package.count;
*batteryV=jt_data_back_package.battery;
*version=ntohs(jt_data_back_package.version);
*ledCtrl=jt_data_back_package.ledCtrl;
*signCode=ntohs(jt_data_back_package.signCode);
*reserve=ntohs(jt_data_back_package.reserve);
*lableParm=ntohl(jt_data_back_package.lableParm);
}else{
// LOG_I("%s failed\n", __func__);
ret = -2;
}
}else{
//LOG_I("%s, failed, time out\n", __func__);
ret = -1;
uint8_t *buf = (uint8_t *)&jt_data_back_package;
int pkg_size = sizeof(jt_data_back_package_t);
ret = uart_read_until_time(uart->uart_fd,(char *)buf, pkg_size, 1000, 50);
if(ret != pkg_size){
return -1;
}
return ret;
// 检查包头
if(!((jt_data_back_package.pre1 == 0x2424)
&&(jt_data_back_package.pre2 == 0x2424)
&&(jt_data_back_package.pre3 == 0x2424))){
// 帧错位,尝试在已读数据中找包头 0x24 0x24
int sync_offset = -1;
for(int i = 1; i < pkg_size - 1; i++){
if(buf[i] == 0x24 && buf[i+1] == 0x24){
sync_offset = i;
break;
}
}
if(sync_offset > 0){
// 找到包头,左移数据并补读剩余字节
int remain = pkg_size - sync_offset;
memmove(buf, buf + sync_offset, remain);
int need = sync_offset;
int got = uart_read_until_time(uart->uart_fd, (char *)(buf + remain), need, 500, 50);
if(got != need){
return -2;
}
// 重新校验包头
if(!((jt_data_back_package.pre1 == 0x2424)
&&(jt_data_back_package.pre2 == 0x2424)
&&(jt_data_back_package.pre3 == 0x2424))){
return -2;
}
}else{
return -2;
}
}
// 解析数据
*parmAck=ntohs(jt_data_back_package.pre4);
*tagCodeHead=ntohs(jt_data_back_package.tagHead);
*tagCode=ntohl(jt_data_back_package.tag);
*tagSignal=jt_data_back_package.signal;
*totalLen=jt_data_back_package.len;
*tagFeature=jt_data_back_package.featureCode;
*count=jt_data_back_package.count;
*batteryV=jt_data_back_package.battery;
*version=ntohs(jt_data_back_package.version);
*ledCtrl=jt_data_back_package.ledCtrl;
*signCode=ntohs(jt_data_back_package.signCode);
*reserve=ntohs(jt_data_back_package.reserve);
*lableParm=ntohl(jt_data_back_package.lableParm);
return pkg_size;
}

View File

@ -1414,52 +1414,51 @@ typedef struct
typedef struct
{
uint8_t len;
uint16_t customCode;
uint8_t type;
uint8_t bindStart;
uint8_t bindEnd;
uint32_t idStart;
uint32_t idEnd;
jt_led_or_group_package_t ledCtrl;
uint16_t reserve;
uint32_t crc;
uint8_t len;
uint16_t customCode;
uint8_t type;
uint8_t bindStart;
uint8_t bindEnd;
uint32_t idStart;
uint32_t idEnd;
jt_led_or_group_package_t ledCtrl;
uint16_t reserve;
uint32_t crc;
}__attribute__((packed)) jt_id_package_t;
typedef struct
{
uint16_t pre1;//$$ 0x2424
uint16_t pre2;//$$ 0x2424
uint16_t pre3;//$$ 0x2424 CC 0x4343 DD 0x4444
uint16_t ack;//## 0x2323 complete TT 0x5454 timeout RR 0x5252 AP-reset EE 0x4545 AP-crc-error FF 0x4646 AP broadcast end
uint16_t pre1;//$$ 0x2424
uint16_t pre2;//$$ 0x2424
uint16_t pre3;//$$ 0x2424 CC 0x4343 DD 0x4444
uint16_t ack;//## 0x2323 complete TT 0x5454 timeout RR 0x5252 AP-reset EE 0x4545 AP-crc-error FF 0x4646 AP broadcast end
}__attribute__((packed)) jt_receive_package_t;
typedef struct
{
uint16_t v1;
uint16_t v2;
uint16_t v1;
uint16_t v2;
}__attribute__((packed)) jt_receive_version_package_t;
typedef struct
{
uint16_t pre1;
uint16_t pre2;
uint16_t pre3;
uint16_t pre4;
uint16_t tagHead;
uint32_t tag;
uint8_t signal;
uint8_t len;
uint8_t featureCode;
uint8_t count;
uint8_t battery;
uint16_t version;
uint8_t ledCtrl;
uint16_t signCode;
uint16_t reserve;
uint32_t lableParm;
uint16_t crc;
uint16_t pre1;
uint16_t pre2;
uint16_t pre3;
uint16_t pre4;
uint16_t tagHead;
uint32_t tag;
uint8_t signal;
uint8_t len;
uint8_t featureCode;
uint8_t count;
uint8_t battery;
uint16_t version;
uint8_t ledCtrl;
uint16_t signCode;
uint16_t reserve;
uint32_t lableParm;
uint16_t crc;
}__attribute__((packed)) jt_data_back_package_t;
uint32_t CRC32_DIRECT(uint8_t *data, uint32_t length);
uint16_t CRC16_XMODEM(uint8_t *puchMsg, uint32_t usDataLen);
@ -1468,10 +1467,10 @@ int uart_data_send_head(uart_utils_t *uart,uint8_t func,uint8_t wakeup_time,uint
int uart_data_send_lighton_or_group(uart_utils_t *uart,jt_tag_package_t tags[],uint8_t tag_num);
int uart_data_send_lighton_by_group(uart_utils_t *uart,jt_group_package_t groups[],uint8_t tag_num);
int uart_data_send_lighton_by_id(uart_utils_t *uart,uint8_t bind_start,uint8_t bind_end,
uint32_t id_start,uint32_t id_end,jt_led_or_group_package_t led_ctrl,uint16_t reserve);
uint32_t id_start,uint32_t id_end,jt_led_or_group_package_t led_ctrl,uint16_t reserve);
int uart_data_receive_data_back(uart_utils_t *uart,uint16_t *parmAck,uint16_t *tagCodeHead,uint32_t *tagCode,uint8_t *tagSignal,
uint8_t *totalLen,uint8_t *tagFeature,uint8_t *count,uint8_t *batteryV,uint16_t *version,uint8_t *ledCtrl,
uint16_t *signCode,uint16_t *reserve,uint32_t *lableParm);
uint8_t *totalLen,uint8_t *tagFeature,uint8_t *count,uint8_t *batteryV,uint16_t *version,uint8_t *ledCtrl,
uint16_t *signCode,uint16_t *reserve,uint32_t *lableParm);
int uart_data_receive_ack(uart_utils_t *uart,uint16_t *parm_ack);
int uart_data_receive_version(uart_utils_t *uart);
#endif