Compare commits
7 Commits
e74e13e372
...
fc95e3561a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fc95e3561a | ||
|
|
f910a4d701 | ||
|
|
67e579ac27 | ||
|
|
30700b9178 | ||
|
|
72a94babe5 | ||
|
|
9ccab6c22f | ||
|
|
16b46d1b00 |
BIN
hiredis/alloc.o
Normal file
BIN
hiredis/alloc.o
Normal file
Binary file not shown.
BIN
hiredis/async.o
Normal file
BIN
hiredis/async.o
Normal file
Binary file not shown.
BIN
hiredis/hiredis-test
Executable file
BIN
hiredis/hiredis-test
Executable file
Binary file not shown.
BIN
hiredis/hiredis.o
Normal file
BIN
hiredis/hiredis.o
Normal file
Binary file not shown.
11
hiredis/hiredis.pc
Normal file
11
hiredis/hiredis.pc
Normal file
@ -0,0 +1,11 @@
|
||||
prefix=/usr/local
|
||||
exec_prefix=${prefix}
|
||||
libdir=/usr/local/lib
|
||||
includedir=/usr/local/include
|
||||
pkgincludedir=/usr/local/include/hiredis
|
||||
|
||||
Name: hiredis
|
||||
Description: Minimalistic C client library for Redis.
|
||||
Version: 1.3.0
|
||||
Libs: -L${libdir} -lhiredis
|
||||
Cflags: -I${pkgincludedir} -I${includedir} -D_FILE_OFFSET_BITS=64
|
||||
BIN
hiredis/libhiredis.a
Normal file
BIN
hiredis/libhiredis.a
Normal file
Binary file not shown.
BIN
hiredis/libhiredis.so
Executable file
BIN
hiredis/libhiredis.so
Executable file
Binary file not shown.
BIN
hiredis/net.o
Normal file
BIN
hiredis/net.o
Normal file
Binary file not shown.
BIN
hiredis/read.o
Normal file
BIN
hiredis/read.o
Normal file
Binary file not shown.
BIN
hiredis/sds.o
Normal file
BIN
hiredis/sds.o
Normal file
Binary file not shown.
BIN
hiredis/sockcompat.o
Normal file
BIN
hiredis/sockcompat.o
Normal file
Binary file not shown.
BIN
hiredis/test.o
Normal file
BIN
hiredis/test.o
Normal file
Binary file not shown.
@ -2722,8 +2722,8 @@ static void setRetryLoopInterval(int keepalive)
|
||||
|
||||
if (proposed < 1)
|
||||
proposed = 1;
|
||||
else if (proposed > 5)
|
||||
proposed = 5;
|
||||
else if (proposed > 60)
|
||||
proposed = 60; // 提高上限到60秒,适应更长的keepalive
|
||||
if (proposed < retryLoopInterval)
|
||||
retryLoopInterval = proposed;
|
||||
}
|
||||
|
||||
@ -1323,8 +1323,8 @@ static void setRetryLoopInterval(int keepalive)
|
||||
|
||||
if (proposed < 1)
|
||||
proposed = 1;
|
||||
else if (proposed > 5)
|
||||
proposed = 5;
|
||||
else if (proposed > 60)
|
||||
proposed = 60; // 提高上限到60秒,适应更长的keepalive
|
||||
if (proposed < retryLoopInterval)
|
||||
retryLoopInterval = proposed;
|
||||
}
|
||||
|
||||
@ -633,7 +633,7 @@ void MQTTProtocol_keepalive(time_t now)
|
||||
}
|
||||
else
|
||||
{
|
||||
//LOG_I("PINGREQ send success\n");
|
||||
LOG_I("PINGREQ send success at %ld\n", (long)now);
|
||||
client->net.lastSent = now;
|
||||
client->ping_outstanding = 1;
|
||||
}
|
||||
@ -641,9 +641,25 @@ void MQTTProtocol_keepalive(time_t now)
|
||||
}
|
||||
else
|
||||
{
|
||||
Log(TRACE_PROTOCOL, -1, "PINGRESP not received in keepalive interval for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
|
||||
LOG_I("%s:PINGRESP not received\n",__func__);
|
||||
MQTTProtocol_closeSession(client, 1);
|
||||
// 增加容错:允许ping_outstanding为2时再等待一个周期
|
||||
if (client->ping_outstanding >= 2) {
|
||||
Log(TRACE_PROTOCOL, -1, "PINGRESP not received after 2 keepalive intervals for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
|
||||
LOG_I("%s:PINGRESP not received after 2 attempts\n",__func__);
|
||||
MQTTProtocol_closeSession(client, 1);
|
||||
} else {
|
||||
// 第二次发送PINGREQ
|
||||
if (Socket_noPendingWrites(client->net.socket)) {
|
||||
if (MQTTPacket_send_pingreq(&client->net, client->clientID) != TCPSOCKET_COMPLETE) {
|
||||
Log(TRACE_PROTOCOL, -1, "Error sending second PINGREQ for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
|
||||
LOG_I("%s:Error sending second PINGREQ for client %s on socket %d, disconnecting\n",__func__,client->clientID, client->net.socket);
|
||||
MQTTProtocol_closeSession(client, 1);
|
||||
} else {
|
||||
LOG_I("Second PINGREQ send success at %ld\n", (long)now);
|
||||
client->net.lastSent = now;
|
||||
client->ping_outstanding++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -178,7 +178,7 @@ int MQTTProtocol_handlePingresps(void* pack, int sock)
|
||||
client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
|
||||
Log(LOG_PROTOCOL, 21, NULL, sock, client->clientID);
|
||||
client->ping_outstanding = 0;
|
||||
//LOG_I("PINGRESP receive success\n");
|
||||
LOG_I("PINGRESP received successfully, ping_outstanding cleared\n");
|
||||
FUNC_EXIT_RC(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
@ -25,6 +25,9 @@
|
||||
#include <sys/types.h>
|
||||
#include <sys/ipc.h>
|
||||
#include <sys/msg.h>
|
||||
#include <pthread.h>
|
||||
#include <stdbool.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
#include "mqtt_utils.h"
|
||||
|
||||
@ -49,6 +52,7 @@ char nativeUpgradeTopicName[1024] = "";
|
||||
char subscribeTopics[10][1024] = {""};
|
||||
int subscribeTopicCount = 0;
|
||||
extern char softwareVersion[16];
|
||||
extern bool isMqttConnected;
|
||||
typedef struct{
|
||||
enum MQTTASYNC_TRACE_LEVELS level;
|
||||
char name[64];
|
||||
@ -119,8 +123,56 @@ json_error:
|
||||
json_object_put(root);
|
||||
}
|
||||
|
||||
// 黄灯闪烁相关
|
||||
static pthread_t pt_yellow_led_flash;
|
||||
static volatile bool yellow_led_flash_running = false;
|
||||
static pthread_mutex_t yellow_led_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
void *thread_yellow_led_flash(void *arg);
|
||||
|
||||
void *thread_yellow_led_flash(void *arg) {
|
||||
LOG_I("Yellow LED flash thread started\n");
|
||||
|
||||
// 初始化随机种子
|
||||
srand((unsigned int)time(NULL));
|
||||
|
||||
while (yellow_led_flash_running) {
|
||||
// 生成随机延时 (100ms - 500ms)
|
||||
int delay_us = 100000 + (rand() % 400000); // 100000-500000 微秒
|
||||
|
||||
// 点亮黄灯
|
||||
system("echo 1 > /sys/class/gpio/gpio114/value");
|
||||
|
||||
// 随机延时保持亮
|
||||
usleep(delay_us);
|
||||
|
||||
// 检查是否需要退出
|
||||
if (!yellow_led_flash_running) break;
|
||||
|
||||
// 熄灭黄灯
|
||||
system("echo 0 > /sys/class/gpio/gpio114/value");
|
||||
|
||||
// 随机延时保持灭
|
||||
usleep(delay_us);
|
||||
}
|
||||
|
||||
// 确保退出时灯是灭的
|
||||
system("echo 0 > /sys/class/gpio/gpio114/value");
|
||||
LOG_I("Yellow LED flash thread exited\n");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void mqtt_net_failure(int *failure_times){
|
||||
system("echo 0 > /sys/class/gpio/gpio113/value");
|
||||
// 停止黄灯闪烁
|
||||
pthread_mutex_lock(&yellow_led_mutex);
|
||||
if (yellow_led_flash_running) {
|
||||
yellow_led_flash_running = false;
|
||||
pthread_join(pt_yellow_led_flash, NULL);
|
||||
LOG_I("Yellow LED flash thread stopped\n");
|
||||
}
|
||||
pthread_mutex_unlock(&yellow_led_mutex);
|
||||
|
||||
system("echo 0 > /sys/class/gpio/gpio114/value");
|
||||
if(failure_times != NULL){
|
||||
(*failure_times)++;
|
||||
LOG_I("mqtt net failure_times = %d\n", *failure_times);
|
||||
@ -237,8 +289,8 @@ static int mqtt_utils_parse_sys_lightOperate_invoke_topic(const char *topic, cha
|
||||
}
|
||||
|
||||
int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message){
|
||||
LOG_I("---------------%s------------------\n",__func__);
|
||||
LOG_I("收到消息,topic: %s\n", topicName ? topicName : "(null)");
|
||||
// LOG_I("---------------%s------------------\n",__func__);
|
||||
// LOG_I("收到消息,topic: %s\n", topicName ? topicName : "(null)");
|
||||
|
||||
// 检查消息是否来自任何一个订阅的topic
|
||||
int topic_matched = 0;
|
||||
@ -265,7 +317,7 @@ int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQT
|
||||
}
|
||||
}
|
||||
|
||||
LOG_I("消息匹配topic[%d]: %s\n", i, subscribeTopics[i]);
|
||||
// LOG_I("消息匹配topic[%d]: %s\n", i, subscribeTopics[i]);
|
||||
if (message != NULL && message->payload != NULL && message->payloadlen > 0) {
|
||||
char payload_buf[1024] = {0};
|
||||
size_t copy_len = (size_t)message->payloadlen;
|
||||
@ -276,7 +328,7 @@ int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQT
|
||||
payload_buf[copy_len] = '\0';
|
||||
PutDataIntoMQueue(payload_buf);
|
||||
} else {
|
||||
LOG_I("消息payload为空,跳过入队\n");
|
||||
// LOG_I("消息payload为空,跳过入队\n");
|
||||
}
|
||||
topic_matched = 1;
|
||||
break;
|
||||
@ -284,7 +336,7 @@ int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQT
|
||||
}
|
||||
|
||||
if(!topic_matched) {
|
||||
LOG_I("消息topic不匹配任何订阅的topic: %s\n", topicName);
|
||||
// LOG_I("消息topic不匹配任何订阅的topic: %s\n", topicName);
|
||||
}
|
||||
|
||||
MQTTAsync_freeMessage(&message);
|
||||
@ -295,15 +347,15 @@ int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQT
|
||||
}
|
||||
|
||||
void mqtt_utils_connected(void *context, char *cause){
|
||||
LOG_I("=== mqtt_utils_connected 函数被调用 ===\n");
|
||||
// LOG_I("=== mqtt_utils_connected 函数被调用 ===\n");
|
||||
if (cause != NULL) {
|
||||
LOG_I("%s cause:%s\n",__func__, cause);
|
||||
// LOG_I("%s cause:%s\n",__func__, cause);
|
||||
} else {
|
||||
LOG_I("%s\n",__func__);
|
||||
// LOG_I("%s\n",__func__);
|
||||
}
|
||||
connect_failure_times=0;
|
||||
|
||||
LOG_I(" MQTT连接成功,开始订阅%d个topic\n", subscribeTopicCount);
|
||||
// LOG_I(" MQTT连接成功,开始订阅%d个topic\n", subscribeTopicCount);
|
||||
|
||||
// 订阅所有设置的topic
|
||||
for(int i = 0; i < subscribeTopicCount; i++) {
|
||||
@ -314,29 +366,105 @@ void mqtt_utils_connected(void *context, char *cause){
|
||||
|
||||
LOG_I("订阅topic[%d]: %s\n", i, subscribeTopics[i]);
|
||||
int subscribe_result = mqtt_utils_subscribe(mqtt_conf, subscribeTopics[i], 1);
|
||||
LOG_I("订阅topic[%d]请求结果: %d\n", i, subscribe_result);
|
||||
// LOG_I("订阅topic[%d]请求结果: %d\n", i, subscribe_result);
|
||||
}
|
||||
|
||||
// 添加通配符订阅来接收所有消息(用于调试)
|
||||
LOG_I("=== 添加通配符订阅用于调试 ===\n");
|
||||
// LOG_I("=== 添加通配符订阅用于调试 ===\n");
|
||||
int wildcard_subscribe_result = mqtt_utils_subscribe(mqtt_conf, "#", 1);
|
||||
LOG_I("通配符订阅请求结果: %d\n", wildcard_subscribe_result);
|
||||
LOG_I("=== 通配符订阅完成 ===\n");
|
||||
// LOG_I("通配符订阅请求结果: %d\n", wildcard_subscribe_result);
|
||||
// LOG_I("=== 通配符订阅完成 ===\n");
|
||||
|
||||
// 订阅公共topic用于测试
|
||||
LOG_I("=== 订阅公共topic用于测试 ===\n");
|
||||
// LOG_I("=== 订阅公共topic用于测试 ===\n");
|
||||
int public_subscribe_result = mqtt_utils_subscribe(mqtt_conf, "/test/public/topic", 0);
|
||||
LOG_I("公共topic订阅请求结果: %d\n", public_subscribe_result);
|
||||
LOG_I("=== 公共topic订阅完成 ===\n");
|
||||
// LOG_I("公共topic订阅请求结果: %d\n", public_subscribe_result);
|
||||
// LOG_I("=== 公共topic订阅完成 ===\n");
|
||||
|
||||
system("echo 1 > /sys/class/gpio/gpio113/value");//yellow ok
|
||||
// 启动黄灯闪烁线程
|
||||
pthread_mutex_lock(&yellow_led_mutex);
|
||||
if (!yellow_led_flash_running) {
|
||||
yellow_led_flash_running = true;
|
||||
int ret = pthread_create(&pt_yellow_led_flash, NULL, thread_yellow_led_flash, NULL);
|
||||
if (ret != 0) {
|
||||
LOG_I("Failed to create yellow LED flash thread, using constant on\n");
|
||||
yellow_led_flash_running = false;
|
||||
system("echo 1 > /sys/class/gpio/gpio114/value");//yellow ok
|
||||
} else {
|
||||
pthread_detach(pt_yellow_led_flash);
|
||||
LOG_I("Yellow LED flash thread started\n");
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&yellow_led_mutex);
|
||||
station_status_report();
|
||||
LOG_I("=== mqtt_utils_connected 函数执行完成 ===\n");
|
||||
|
||||
// 设置MQTT连接标志
|
||||
isMqttConnected = true;
|
||||
// LOG_I("MQTT connected, isMqttConnected = true\n");
|
||||
|
||||
// 补报所有已在线的灯条登录信息
|
||||
typedef struct {
|
||||
uint32_t tagCode;
|
||||
time_t lastHeartbeat;
|
||||
bool isOnline;
|
||||
} lightbar_heartbeat_t;
|
||||
extern int lightbarHeartbeatCount;
|
||||
extern lightbar_heartbeat_t lightbarHeartbeat[];
|
||||
extern pthread_mutex_t heartbeatMutex;
|
||||
extern void report_lightbar_login(uint32_t tagCode);
|
||||
|
||||
pthread_mutex_lock(&heartbeatMutex);
|
||||
LOG_I("Reporting login for %d online lightbars again\n", lightbarHeartbeatCount);
|
||||
for (int i = 0; i < lightbarHeartbeatCount; i++) {
|
||||
if (lightbarHeartbeat[i].isOnline) {
|
||||
LOG_I("Report again lightbar %08X login\n", lightbarHeartbeat[i].tagCode);
|
||||
report_lightbar_login(lightbarHeartbeat[i].tagCode);
|
||||
// 添加短暂延迟,避免MQTT缓冲区溢出
|
||||
usleep(100 * 1000); // 100ms延迟
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&heartbeatMutex);
|
||||
|
||||
// 启动灭灯检测线程(只启动一次)
|
||||
static bool light_off_thread_started = false;
|
||||
if (!light_off_thread_started) {
|
||||
extern pthread_t pt_light_off_check;
|
||||
extern void *thread_light_off_check(void *arg);
|
||||
int ret = pthread_create(&pt_light_off_check, NULL, thread_light_off_check, NULL);
|
||||
if(ret != 0) {
|
||||
LOG_I("pthread_create light_off_check fail\n");
|
||||
} else {
|
||||
LOG_I("pthread_create light_off_check success\n");
|
||||
pthread_detach(pt_light_off_check);
|
||||
light_off_thread_started = true;
|
||||
}
|
||||
}
|
||||
|
||||
// 启动心跳检测线程(只启动一次)
|
||||
static bool heartbeat_thread_started = false;
|
||||
if (!heartbeat_thread_started) {
|
||||
extern pthread_t pt_heartbeat_check;
|
||||
extern void *thread_heartbeat_check(void *arg);
|
||||
int ret = pthread_create(&pt_heartbeat_check, NULL, thread_heartbeat_check, NULL);
|
||||
if(ret != 0){
|
||||
LOG_I("pthread_create heartbeat_check fail\n");
|
||||
} else {
|
||||
LOG_I("pthread_create heartbeat_check success\n");
|
||||
pthread_detach(pt_heartbeat_check);
|
||||
heartbeat_thread_started = true;
|
||||
}
|
||||
}
|
||||
|
||||
// LOG_I("=== mqtt_utils_connected 函数执行完成 ===\n");
|
||||
}
|
||||
|
||||
void mqtt_utils_disconnected(void *context, MQTTProperties* props, enum MQTTReasonCodes rc){
|
||||
//LOG_I("%s reason code %s",__func__ ,MQTTReasonCode_toString(rc));
|
||||
//mqtt_led_net_status_judge();
|
||||
|
||||
// 设置MQTT连接标志为false
|
||||
isMqttConnected = false;
|
||||
// LOG_I("MQTT disconnected, set isMqttConnected = false\n");
|
||||
}
|
||||
|
||||
void mqtt_utils_on_connect_success(void *context, MQTTAsync_successData *response){
|
||||
@ -347,16 +475,16 @@ void mqtt_utils_on_connect_success(void *context, MQTTAsync_successData *respons
|
||||
void mqtt_utils_on_connect_failure(void *context, MQTTAsync_failureData *response){
|
||||
//LOG_I("%s connect failed, rc %s\n", __func__, response ? MQTTAsync_strerror(response->code) : "none");
|
||||
// mqtt_led_net_status_judge();
|
||||
LOG_I("MQTT连接失败 - 错误代码: %s\n", response ? MQTTAsync_strerror(response->code) : "未知错误");
|
||||
// LOG_I("MQTT连接失败 - 错误代码: %s\n", response ? MQTTAsync_strerror(response->code) : "未知错误");
|
||||
if (response && response->message) {
|
||||
LOG_I("MQTT连接失败 - 错误信息: %s\n", response->message);
|
||||
// LOG_I("MQTT连接失败 - 错误信息: %s\n", response->message);
|
||||
}
|
||||
mqtt_net_failure(&connect_failure_times);
|
||||
}
|
||||
|
||||
static int mqtt_utils_on_ssl_error(const char *str, size_t len, void *context){
|
||||
MQTTAsync client = (MQTTAsync)context;
|
||||
LOG_I("%s ssl error: %s\n",__func__, str);
|
||||
// LOG_I("%s ssl error: %s\n",__func__, str);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -365,59 +493,59 @@ void mqtt_utils_on_publish_success(void *context, MQTTAsync_successData *respons
|
||||
}
|
||||
|
||||
void mqtt_utils_on_publish_failure(void *context, MQTTAsync_failureData *response){
|
||||
LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
|
||||
// LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
|
||||
|
||||
|
||||
// 如果是连接相关的问题,记录详细信息
|
||||
if (response && response->code == MQTTASYNC_OPERATION_INCOMPLETE) {
|
||||
LOG_I("发布失败:操作在完成前被丢弃,可能是连接状态不稳定\n");
|
||||
// LOG_I("发布失败:操作在完成前被丢弃,可能是连接状态不稳定\n");
|
||||
} else if (response && response->code == MQTTASYNC_DISCONNECTED) {
|
||||
LOG_I("发布失败:客户端已断开连接\n");
|
||||
// LOG_I("发布失败:客户端已断开连接\n");
|
||||
}
|
||||
}
|
||||
|
||||
void mqtt_utils_on_subscribe_success(void *context, MQTTAsync_successData *response){
|
||||
LOG_I("=== MQTT订阅成功回调被调用 ===\n");
|
||||
LOG_I("MQTT订阅成功\n");
|
||||
// LOG_I("=== MQTT订阅成功回调被调用 ===\n");
|
||||
// LOG_I("MQTT订阅成功\n");
|
||||
}
|
||||
|
||||
void mqtt_utils_on_subscribe_failure(void *context, MQTTAsync_failureData *response){
|
||||
LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
|
||||
// LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
|
||||
}
|
||||
|
||||
void mqtt_utils_on_disconnect_success(void *context, MQTTAsync_successData *response){
|
||||
LOG_I("%s\n",__func__);
|
||||
// LOG_I("%s\n",__func__);
|
||||
//mqtt_led_net_status_judge();
|
||||
}
|
||||
|
||||
int mqtt_utils_subscribe(mqtt_utils_t *mqtt_utils, char *topic, int qos){
|
||||
int rc = 0;
|
||||
|
||||
LOG_I("开始订阅topic: %s, qos: %d\n", topic, qos);
|
||||
LOG_I("MQTT客户端指针: %p\n", mqtt_utils->client);
|
||||
LOG_I("订阅选项指针: %p\n", &mqtt_utils->sub_opts);
|
||||
// LOG_I("开始订阅topic: %s, qos: %d\n", topic, qos);
|
||||
// LOG_I("MQTT客户端指针: %p\n", mqtt_utils->client);
|
||||
// LOG_I("订阅选项指针: %p\n", &mqtt_utils->sub_opts);
|
||||
|
||||
// 检查参数有效性
|
||||
if (topic == NULL || strlen(topic) == 0) {
|
||||
LOG_I("错误:topic参数无效\n");
|
||||
// LOG_I("错误:topic参数无效\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mqtt_utils->client == NULL) {
|
||||
LOG_I("错误:MQTT客户端未初始化\n");
|
||||
// LOG_I("错误:MQTT客户端未初始化\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
// 跳过连接状态检查,直接进行订阅
|
||||
LOG_I("跳过连接状态检查,直接进行订阅\n");
|
||||
LOG_I("准备调用 MQTTAsync_subscribe...\n");
|
||||
// LOG_I("跳过连接状态检查,直接进行订阅\n");
|
||||
// LOG_I("准备调用 MQTTAsync_subscribe...\n");
|
||||
|
||||
rc = MQTTAsync_subscribe(mqtt_utils->client, topic, qos, &mqtt_utils->sub_opts);
|
||||
LOG_I("MQTTAsync_subscribe 调用完成,返回值: %d\n", rc);
|
||||
// LOG_I("MQTTAsync_subscribe 调用完成,返回值: %d\n", rc);
|
||||
if (rc != MQTTASYNC_SUCCESS){
|
||||
LOG_I("订阅失败,错误代码: %s\n", MQTTAsync_strerror(rc));
|
||||
// LOG_I("订阅失败,错误代码: %s\n", MQTTAsync_strerror(rc));
|
||||
} else {
|
||||
LOG_I("订阅请求已发送,等待回调\n");
|
||||
// LOG_I("订阅请求已发送,等待回调\n");
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
@ -448,16 +576,16 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
|
||||
int ret = -1;
|
||||
int rc = 0;
|
||||
|
||||
LOG_I("=== 开始MQTT连接初始化 ===\n");
|
||||
LOG_I("ClientId: %s\n", mqtt_conf->clientid);
|
||||
LOG_I("Username: %s\n", mqtt_conf->username);
|
||||
LOG_I("Password: %s\n", mqtt_conf->password);
|
||||
LOG_I("Host: %s\n", mqtt_conf->host);
|
||||
LOG_I("Port: %d\n", mqtt_conf->port);
|
||||
// LOG_I("=== 开始MQTT连接初始化 ===\n");
|
||||
// LOG_I("ClientId: %s\n", mqtt_conf->clientid);
|
||||
// LOG_I("Username: %s\n", mqtt_conf->username);
|
||||
// LOG_I("Password: %s\n", mqtt_conf->password);
|
||||
// LOG_I("Host: %s\n", mqtt_conf->host);
|
||||
// LOG_I("Port: %d\n", mqtt_conf->port);
|
||||
|
||||
// 提前设置topic名称,确保在连接回调中可用
|
||||
sprintf(nativeInvokTopicName,"/iot%s/thing/ota/upgrade",mqtt_conf->username);
|
||||
LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);
|
||||
// LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);
|
||||
|
||||
// 设置多个订阅topic
|
||||
subscribeTopicCount = 0;
|
||||
@ -466,13 +594,13 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
|
||||
sprintf(subscribeTopics[subscribeTopicCount++], "/iot/estation%s/bind", mqtt_conf->username);
|
||||
sprintf(subscribeTopics[subscribeTopicCount++], "/iot/estation%s/group", mqtt_conf->username);
|
||||
sprintf(subscribeTopics[subscribeTopicCount++], "/iot%s/thing/ota/upgrade", mqtt_conf->username);
|
||||
sprintf(subscribeTopics[subscribeTopicCount++], "/sys/WcSubLightStrip/AD1000014C11/thing/service/lightOperate/invoke");
|
||||
// 只使用通配符订阅,覆盖所有设备的lightOperate消息
|
||||
sprintf(subscribeTopics[subscribeTopicCount++], "/sys/+/+/thing/service/lightOperate/invoke");
|
||||
|
||||
LOG_I("设置了%d个订阅topic:\n", subscribeTopicCount);
|
||||
for(int i = 0; i < subscribeTopicCount; i++) {
|
||||
LOG_I("Topic[%d]: %s\n", i, subscribeTopics[i]);
|
||||
}
|
||||
// LOG_I("设置了%d个订阅topic:\n", subscribeTopicCount);
|
||||
// for(int i = 0; i < subscribeTopicCount; i++) {
|
||||
// LOG_I("Topic[%d]: %s\n", i, subscribeTopics[i]);
|
||||
// }
|
||||
|
||||
infos = MQTTAsync_getVersionInfo();
|
||||
printVersionInfo(infos);
|
||||
@ -481,7 +609,7 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
|
||||
|
||||
//snprintf(url, sizeof(url), "%s://%s:%d", mqtt_conf->protocol, mqtt_conf->host, mqtt_conf->port);
|
||||
snprintf(url, sizeof(url), "tcp://%s:%d", mqtt_conf->host, mqtt_conf->port);
|
||||
LOG_I("MQTT URL: %s\n", url);
|
||||
// LOG_I("MQTT URL: %s\n", url);
|
||||
create_opts.sendWhileDisconnected = 0;
|
||||
|
||||
rc = MQTTAsync_createWithOptions(&mqtt_conf->client,
|
||||
@ -490,54 +618,54 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
|
||||
MQTTCLIENT_PERSISTENCE_NONE,
|
||||
NULL,
|
||||
&create_opts);
|
||||
LOG_I("MQTTAsync_create rc = %d\n", rc);
|
||||
// LOG_I("MQTTAsync_create rc = %d\n", rc);
|
||||
if (rc != MQTTASYNC_SUCCESS){
|
||||
LOG_I("MQTT客户端创建失败: %s\n", MQTTAsync_strerror(rc));
|
||||
ret = -1;
|
||||
goto error;
|
||||
}
|
||||
LOG_I("MQTTAsync_create succes s\n");
|
||||
// LOG_I("MQTTAsync_create success\n");
|
||||
|
||||
// 设置所有必要的回调函数
|
||||
LOG_I("设置MQTT回调函数\n");
|
||||
// LOG_I("设置MQTT回调函数\n");
|
||||
|
||||
rc = MQTTAsync_setConnected(mqtt_conf->client, NULL, mqtt_utils_connected);
|
||||
LOG_I("MQTTAsync_setConnected rc = %d\n", rc);
|
||||
// LOG_I("MQTTAsync_setConnected rc = %d\n", rc);
|
||||
if (rc != MQTTASYNC_SUCCESS){
|
||||
LOG_I("设置连接回调失败: %s\n", MQTTAsync_strerror(rc));
|
||||
// LOG_I("设置连接回调失败: %s\n", MQTTAsync_strerror(rc));
|
||||
ret = -2;
|
||||
goto error;
|
||||
}
|
||||
LOG_I("MQTTAsync_setConnected success\n");
|
||||
// LOG_I("MQTTAsync_setConnected success\n");
|
||||
|
||||
rc = MQTTAsync_setDisconnected(mqtt_conf->client, NULL, mqtt_utils_disconnected);
|
||||
LOG_I("MQTTAsy nc_setDisconnected rc = %d\n", rc);
|
||||
// LOG_I("MQTTAsync_setDisconnected rc = %d\n", rc);
|
||||
if (rc != MQTTASYNC_SUCCESS){
|
||||
LOG_I("设置断开回调失败: %s\n", MQTTAsync_strerror(rc));
|
||||
// LOG_I("设置断开回调失败: %s\n", MQTTAsync_strerror(rc));
|
||||
ret = -2;
|
||||
goto error;
|
||||
}
|
||||
LOG_I("MQTTAsync_setDisconnected success\n");
|
||||
// LOG_I("MQTTAsync_setDisconnected success\n");
|
||||
|
||||
// 设置消息到达回调
|
||||
rc = MQTTAsync_setMessageArrivedCallback(mqtt_conf->client, NULL, mqtt_utils_message_arrived);
|
||||
LOG_I("MQTTAsync_setMessageArrivedCallback rc = %d\n", rc);
|
||||
// LOG_I("MQTTAsync_setMessageArrivedCallback rc = %d\n", rc);
|
||||
if (rc != MQTTASYNC_SUCCESS){
|
||||
LOG_I("设置消息到达回调失败: %s\n", MQTTAsync_strerror(rc));
|
||||
// LOG_I("设置消息到达回调失败: %s\n", MQTTAsync_strerror(rc));
|
||||
ret = -2;
|
||||
goto error;
|
||||
}
|
||||
LOG_I("MQTTAsync_setMessageArrivedCallback success\n");
|
||||
// LOG_I("MQTTAsync_setMessageArrivedCallback success\n");
|
||||
|
||||
// 设置连接丢失回调
|
||||
rc = MQTTAsync_setConnectionLostCallback(mqtt_conf->client, NULL, mqtt_utils_connection_lost);
|
||||
LOG_I("MQTTAsync_setConnectionLostCallback rc = %d\n", rc);
|
||||
// LOG_I("MQTTAsync_setConnectionLostCallback rc = %d\n", rc);
|
||||
if (rc != MQTTASYNC_SUCCESS){
|
||||
LOG_I("设置连接丢失回调失败: %s\n", MQTTAsync_strerror(rc));
|
||||
// LOG_I("设置连接丢失回调失败: %s\n", MQTTAsync_strerror(rc));
|
||||
ret = -2;
|
||||
goto error;
|
||||
}
|
||||
LOG_I("MQTTAsync_setConnectionLostCallback success\n");
|
||||
// LOG_I("MQTTAsync_setConnectionLostCallback success\n");
|
||||
|
||||
/* connect option */
|
||||
conn_opts.onSuccess = mqtt_utils_on_connect_success;
|
||||
@ -579,15 +707,15 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
|
||||
conn_opts.ssl = &ssl_opts;
|
||||
}
|
||||
|
||||
LOG_I("正在连接MQTT服务器...\n");
|
||||
// LOG_I("正在连接MQTT服务器...\n");
|
||||
rc = MQTTAsync_connect(mqtt_conf->client, &conn_opts);
|
||||
LOG_I("MQTTAsync_connect rc = %d\n", rc);
|
||||
// LOG_I("MQTTAsync_connect rc = %d\n", rc);
|
||||
if (rc != MQTTASYNC_SUCCESS){
|
||||
LOG_I("MQTT连接启动失败: %s\n", MQTTAsync_strerror(rc));
|
||||
ret = -1;
|
||||
goto error;
|
||||
}
|
||||
LOG_I("MQTTAsync_connect 启动成功\n");
|
||||
// LOG_I("MQTTAsync_connect 启动成功\n");
|
||||
|
||||
mqtt_conf->sub_opts = sub_opts;
|
||||
mqtt_conf->pub_opts = pub_opts;
|
||||
@ -604,12 +732,12 @@ int mqtt_utils_init(mqtt_utils_t *mqtt_config)
|
||||
LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);
|
||||
|
||||
// 验证回调函数设置
|
||||
LOG_I("=== 验证回调函数设置 ===\n");
|
||||
LOG_I("消息到达回调函数地址: %p\n", mqtt_utils_message_arrived);
|
||||
LOG_I("连接成功回调函数地址: %p\n", mqtt_utils_connected);
|
||||
LOG_I("订阅成功回调函数地址: %p\n", mqtt_utils_on_subscribe_success);
|
||||
LOG_I("订阅失败回调函数地址: %p\n", mqtt_utils_on_subscribe_failure);
|
||||
LOG_I("=== 回调函数验证完成 ===\n");
|
||||
// LOG_I("=== 验证回调函数设置 ===\n");
|
||||
// LOG_I("消息到达回调函数地址: %p\n", mqtt_utils_message_arrived);
|
||||
// LOG_I("连接成功回调函数地址: %p\n", mqtt_utils_connected);
|
||||
// LOG_I("订阅成功回调函数地址: %p\n", mqtt_utils_on_subscribe_success);
|
||||
// LOG_I("订阅失败回调函数地址: %p\n", mqtt_utils_on_subscribe_failure);
|
||||
// LOG_I("=== 回调函数验证完成 ===\n");
|
||||
|
||||
ret = 0;
|
||||
error:
|
||||
|
||||
@ -67,7 +67,7 @@ typedef struct
|
||||
|
||||
extern mqtt_utils_t mqtt_config;
|
||||
extern void PutDataIntoMQueue(char *payload);
|
||||
extern char g_mqtt_deviceName[256];
|
||||
extern char g_mqtt_deviceName[256]; // 从topic解析出的deviceName
|
||||
|
||||
typedef struct{
|
||||
char msg_messageId[32];
|
||||
|
||||
@ -3423,55 +3423,59 @@ int uart_data_receive_data_back(uart_utils_t *uart,uint16_t *parmAck,uint16_t *t
|
||||
uint16_t *signCode,uint16_t *reserve,uint32_t *lableParm){
|
||||
int ret = 0;
|
||||
jt_data_back_package_t jt_data_back_package;
|
||||
ret = uart_read_until_time(uart->uart_fd,(char *)&jt_data_back_package,sizeof(jt_data_back_package_t), 1000, 50);
|
||||
if(ret == sizeof(jt_data_back_package_t)){
|
||||
//LOG_I("%s success\n", __func__);
|
||||
if((jt_data_back_package.pre1 == 0x2424)
|
||||
&&(jt_data_back_package.pre2 == 0x2424)
|
||||
&&(jt_data_back_package.pre3 == 0x2424)){
|
||||
uint8_t version_1=(ntohs(jt_data_back_package.version)>>8)&0xFF;
|
||||
uint8_t version_2=(ntohs(jt_data_back_package.version))&0xFF;
|
||||
uint8_t signCode_1=(ntohs(jt_data_back_package.signCode)>>8)&0xFF;
|
||||
uint8_t signCode_2=(ntohs(jt_data_back_package.signCode))&0xFF;
|
||||
uint8_t reserve_1=(ntohs(jt_data_back_package.reserve)>>8)&0xFF;
|
||||
uint8_t reserve_2=(ntohs(jt_data_back_package.reserve))&0xFF;
|
||||
uint8_t lableParm_1=(ntohl(jt_data_back_package.lableParm)>>24)&0xFF;
|
||||
uint8_t lableParm_2=(ntohl(jt_data_back_package.lableParm)>>16)&0xFF;
|
||||
uint8_t lableParm_3=(ntohl(jt_data_back_package.lableParm)>>8)&0xFF;
|
||||
uint8_t lableParm_4=(ntohl(jt_data_back_package.lableParm))&0xFF;
|
||||
|
||||
uint8_t test1[15] = {jt_data_back_package.len,jt_data_back_package.featureCode,
|
||||
jt_data_back_package.count,jt_data_back_package.battery,version_1,version_2,
|
||||
jt_data_back_package.ledCtrl,signCode_1,signCode_2,reserve_1,reserve_2,
|
||||
lableParm_1,lableParm_2,lableParm_3,lableParm_4};
|
||||
uint16_t crc_c=CRC16_XMODEM(test1,15);
|
||||
//for(int i=0;i<15;i++){
|
||||
// printf("%02x ",test1[i]);
|
||||
//}
|
||||
//LOG_I("%s:XModem_CRC16 = %04x\r\n",__func__,crc_c);
|
||||
//LOG_I("CRC = %04x\r\n",ntohs(jt_data_back_package.crc));
|
||||
|
||||
*parmAck=ntohs(jt_data_back_package.pre4);
|
||||
*tagCodeHead=ntohs(jt_data_back_package.tagHead);
|
||||
*tagCode=ntohl(jt_data_back_package.tag);
|
||||
*tagSignal=jt_data_back_package.signal;
|
||||
*totalLen=jt_data_back_package.len;
|
||||
*tagFeature=jt_data_back_package.featureCode;
|
||||
*count=jt_data_back_package.count;
|
||||
*batteryV=jt_data_back_package.battery;
|
||||
*version=ntohs(jt_data_back_package.version);
|
||||
*ledCtrl=jt_data_back_package.ledCtrl;
|
||||
*signCode=ntohs(jt_data_back_package.signCode);
|
||||
*reserve=ntohs(jt_data_back_package.reserve);
|
||||
*lableParm=ntohl(jt_data_back_package.lableParm);
|
||||
}else{
|
||||
// LOG_I("%s failed\n", __func__);
|
||||
ret = -2;
|
||||
}
|
||||
}else{
|
||||
//LOG_I("%s, failed, time out\n", __func__);
|
||||
ret = -1;
|
||||
uint8_t *buf = (uint8_t *)&jt_data_back_package;
|
||||
int pkg_size = sizeof(jt_data_back_package_t);
|
||||
|
||||
ret = uart_read_until_time(uart->uart_fd,(char *)buf, pkg_size, 1000, 50);
|
||||
if(ret != pkg_size){
|
||||
return -1;
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
||||
// 检查包头
|
||||
if(!((jt_data_back_package.pre1 == 0x2424)
|
||||
&&(jt_data_back_package.pre2 == 0x2424)
|
||||
&&(jt_data_back_package.pre3 == 0x2424))){
|
||||
// 帧错位,尝试在已读数据中找包头 0x24 0x24
|
||||
int sync_offset = -1;
|
||||
for(int i = 1; i < pkg_size - 1; i++){
|
||||
if(buf[i] == 0x24 && buf[i+1] == 0x24){
|
||||
sync_offset = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(sync_offset > 0){
|
||||
// 找到包头,左移数据并补读剩余字节
|
||||
int remain = pkg_size - sync_offset;
|
||||
memmove(buf, buf + sync_offset, remain);
|
||||
int need = sync_offset;
|
||||
int got = uart_read_until_time(uart->uart_fd, (char *)(buf + remain), need, 500, 50);
|
||||
if(got != need){
|
||||
return -2;
|
||||
}
|
||||
// 重新校验包头
|
||||
if(!((jt_data_back_package.pre1 == 0x2424)
|
||||
&&(jt_data_back_package.pre2 == 0x2424)
|
||||
&&(jt_data_back_package.pre3 == 0x2424))){
|
||||
return -2;
|
||||
}
|
||||
}else{
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
|
||||
// 解析数据
|
||||
*parmAck=ntohs(jt_data_back_package.pre4);
|
||||
*tagCodeHead=ntohs(jt_data_back_package.tagHead);
|
||||
*tagCode=ntohl(jt_data_back_package.tag);
|
||||
*tagSignal=jt_data_back_package.signal;
|
||||
*totalLen=jt_data_back_package.len;
|
||||
*tagFeature=jt_data_back_package.featureCode;
|
||||
*count=jt_data_back_package.count;
|
||||
*batteryV=jt_data_back_package.battery;
|
||||
*version=ntohs(jt_data_back_package.version);
|
||||
*ledCtrl=jt_data_back_package.ledCtrl;
|
||||
*signCode=ntohs(jt_data_back_package.signCode);
|
||||
*reserve=ntohs(jt_data_back_package.reserve);
|
||||
*lableParm=ntohl(jt_data_back_package.lableParm);
|
||||
return pkg_size;
|
||||
}
|
||||
|
||||
@ -1414,52 +1414,51 @@ typedef struct
|
||||
|
||||
typedef struct
|
||||
{
|
||||
uint8_t len;
|
||||
uint16_t customCode;
|
||||
uint8_t type;
|
||||
uint8_t bindStart;
|
||||
uint8_t bindEnd;
|
||||
uint32_t idStart;
|
||||
uint32_t idEnd;
|
||||
jt_led_or_group_package_t ledCtrl;
|
||||
uint16_t reserve;
|
||||
uint32_t crc;
|
||||
uint8_t len;
|
||||
uint16_t customCode;
|
||||
uint8_t type;
|
||||
uint8_t bindStart;
|
||||
uint8_t bindEnd;
|
||||
uint32_t idStart;
|
||||
uint32_t idEnd;
|
||||
jt_led_or_group_package_t ledCtrl;
|
||||
uint16_t reserve;
|
||||
uint32_t crc;
|
||||
}__attribute__((packed)) jt_id_package_t;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
uint16_t pre1;//$$ 0x2424
|
||||
uint16_t pre2;//$$ 0x2424
|
||||
uint16_t pre3;//$$ 0x2424 CC 0x4343 DD 0x4444
|
||||
uint16_t ack;//## 0x2323 complete TT 0x5454 timeout RR 0x5252 AP-reset EE 0x4545 AP-crc-error FF 0x4646 AP broadcast end
|
||||
uint16_t pre1;//$$ 0x2424
|
||||
uint16_t pre2;//$$ 0x2424
|
||||
uint16_t pre3;//$$ 0x2424 CC 0x4343 DD 0x4444
|
||||
uint16_t ack;//## 0x2323 complete TT 0x5454 timeout RR 0x5252 AP-reset EE 0x4545 AP-crc-error FF 0x4646 AP broadcast end
|
||||
}__attribute__((packed)) jt_receive_package_t;
|
||||
typedef struct
|
||||
{
|
||||
uint16_t v1;
|
||||
uint16_t v2;
|
||||
uint16_t v1;
|
||||
uint16_t v2;
|
||||
}__attribute__((packed)) jt_receive_version_package_t;
|
||||
typedef struct
|
||||
{
|
||||
uint16_t pre1;
|
||||
uint16_t pre2;
|
||||
uint16_t pre3;
|
||||
uint16_t pre4;
|
||||
uint16_t tagHead;
|
||||
uint32_t tag;
|
||||
uint8_t signal;
|
||||
uint8_t len;
|
||||
uint8_t featureCode;
|
||||
uint8_t count;
|
||||
uint8_t battery;
|
||||
uint16_t version;
|
||||
uint8_t ledCtrl;
|
||||
uint16_t signCode;
|
||||
uint16_t reserve;
|
||||
uint32_t lableParm;
|
||||
uint16_t crc;
|
||||
uint16_t pre1;
|
||||
uint16_t pre2;
|
||||
uint16_t pre3;
|
||||
uint16_t pre4;
|
||||
uint16_t tagHead;
|
||||
uint32_t tag;
|
||||
uint8_t signal;
|
||||
uint8_t len;
|
||||
uint8_t featureCode;
|
||||
uint8_t count;
|
||||
uint8_t battery;
|
||||
uint16_t version;
|
||||
uint8_t ledCtrl;
|
||||
uint16_t signCode;
|
||||
uint16_t reserve;
|
||||
uint32_t lableParm;
|
||||
uint16_t crc;
|
||||
}__attribute__((packed)) jt_data_back_package_t;
|
||||
|
||||
|
||||
uint32_t CRC32_DIRECT(uint8_t *data, uint32_t length);
|
||||
uint16_t CRC16_XMODEM(uint8_t *puchMsg, uint32_t usDataLen);
|
||||
|
||||
@ -1468,10 +1467,10 @@ int uart_data_send_head(uart_utils_t *uart,uint8_t func,uint8_t wakeup_time,uint
|
||||
int uart_data_send_lighton_or_group(uart_utils_t *uart,jt_tag_package_t tags[],uint8_t tag_num);
|
||||
int uart_data_send_lighton_by_group(uart_utils_t *uart,jt_group_package_t groups[],uint8_t tag_num);
|
||||
int uart_data_send_lighton_by_id(uart_utils_t *uart,uint8_t bind_start,uint8_t bind_end,
|
||||
uint32_t id_start,uint32_t id_end,jt_led_or_group_package_t led_ctrl,uint16_t reserve);
|
||||
uint32_t id_start,uint32_t id_end,jt_led_or_group_package_t led_ctrl,uint16_t reserve);
|
||||
int uart_data_receive_data_back(uart_utils_t *uart,uint16_t *parmAck,uint16_t *tagCodeHead,uint32_t *tagCode,uint8_t *tagSignal,
|
||||
uint8_t *totalLen,uint8_t *tagFeature,uint8_t *count,uint8_t *batteryV,uint16_t *version,uint8_t *ledCtrl,
|
||||
uint16_t *signCode,uint16_t *reserve,uint32_t *lableParm);
|
||||
uint8_t *totalLen,uint8_t *tagFeature,uint8_t *count,uint8_t *batteryV,uint16_t *version,uint8_t *ledCtrl,
|
||||
uint16_t *signCode,uint16_t *reserve,uint32_t *lableParm);
|
||||
int uart_data_receive_ack(uart_utils_t *uart,uint16_t *parm_ack);
|
||||
int uart_data_receive_version(uart_utils_t *uart);
|
||||
#endif
|
||||
|
||||
Loading…
Reference in New Issue
Block a user