diff --git a/main.c b/main.c index c1d97af..b2dd435 100644 --- a/main.c +++ b/main.c @@ -198,6 +198,22 @@ char lightsn19[7]={0}; char lightsn20[7]={0}; char lightsn21[7]={0}; char lightsn22[7]={0}; + +// 【修复】添加全局变量保存当前设备的tagCode,避免竞态条件 +uint32_t g_current_tagCode = 0; +uint16_t g_current_tagCodeHead = 0; + +// 【增强】添加设备特定的tagCode映射,解决多设备并发问题 +typedef struct { + char deviceName[16]; + uint32_t tagCode; + uint16_t tagCodeHead; + uint64_t timestamp; // 时间戳,用于清理过期映射 +} device_tag_mapping_t; + +#define MAX_DEVICE_MAPPINGS 16 +device_tag_mapping_t g_device_mappings[MAX_DEVICE_MAPPINGS]; +pthread_mutex_t deviceMappingMutex = PTHREAD_MUTEX_INITIALIZER; char lightsn23[7]={0}; char lightsn24[7]={0}; char lightsn25[7]={0}; @@ -230,6 +246,11 @@ int add_light_command(const char *deviceName, const char *taskId, uint16_t tagCo int get_light_command(light_command_t *cmd); void clear_light_command_queue(void); +// 【新增】设备映射管理函数 +void add_device_mapping(const char *deviceName, uint16_t tagCodeHead, uint32_t tagCode); +int get_device_tag_mapping(const char *deviceName, uint32_t *tagCode, uint16_t *tagCodeHead); +void cleanup_expired_mappings(void); + /*================================================================================*/ // 上报灯条子设备登录 void report_lightbar_login(uint16_t tagCodeHead, uint32_t tagCode) { @@ -265,8 +286,8 @@ void report_lightbar_login(uint16_t tagCodeHead, uint32_t tagCode) { "{\"password\":\"%s\",\"timestamp\":%ld,\"signingAlgorithm\":\"%s\"}", password, timestamp, MQTT_ALGORITHM); - LOG_I("Lightbar login report - topic: %s\n", topic); - LOG_I("Lightbar login report - payload: %s\n", payload); + //LOG_I("Lightbar login report - topic: %s\n", topic); + //LOG_I("Lightbar login report - payload: %s\n", payload); // 发送MQTT消息 mqtt_utils_publish(&mqtt_config, topic, 0, payload, strlen(payload)); @@ -520,6 +541,77 @@ void clear_light_command_queue(void) { pthread_mutex_unlock(&lightCommandMutex); } +// 【新增】设备映射管理函数实现 +void add_device_mapping(const char *deviceName, uint16_t tagCodeHead, uint32_t tagCode) { + pthread_mutex_lock(&deviceMappingMutex); + + // 查找是否已存在该设备的映射 + int existing_index = -1; + for (int i = 0; i < MAX_DEVICE_MAPPINGS; i++) { + if (strcmp(g_device_mappings[i].deviceName, deviceName) == 0) { + existing_index = i; + break; + } + } + + // 如果不存在,找一个空位 + if (existing_index == -1) { + for (int i = 0; i < MAX_DEVICE_MAPPINGS; i++) { + if (strlen(g_device_mappings[i].deviceName) == 0) { + existing_index = i; + break; + } + } + } + + // 如果找到位置,更新映射 + if (existing_index != -1) { + strncpy(g_device_mappings[existing_index].deviceName, deviceName, sizeof(g_device_mappings[existing_index].deviceName) - 1); + g_device_mappings[existing_index].tagCodeHead = tagCodeHead; + g_device_mappings[existing_index].tagCode = tagCode; + g_device_mappings[existing_index].timestamp = time(NULL) * 1000; // 毫秒时间戳 + LOG_I("Added/Updated device mapping: %s -> %08X\n", deviceName, tagCode); + } else { + LOG_I("Device mapping table full!\n"); + } + + pthread_mutex_unlock(&deviceMappingMutex); +} + +int get_device_tag_mapping(const char *deviceName, uint32_t *tagCode, uint16_t *tagCodeHead) { + pthread_mutex_lock(&deviceMappingMutex); + + for (int i = 0; i < MAX_DEVICE_MAPPINGS; i++) { + if (strcmp(g_device_mappings[i].deviceName, deviceName) == 0) { + *tagCode = g_device_mappings[i].tagCode; + *tagCodeHead = g_device_mappings[i].tagCodeHead; + pthread_mutex_unlock(&deviceMappingMutex); + return 0; // 找到 + } + } + + pthread_mutex_unlock(&deviceMappingMutex); + return -1; // 未找到 +} + +void cleanup_expired_mappings(void) { + pthread_mutex_lock(&deviceMappingMutex); + + uint64_t current_time = time(NULL) * 1000; + uint64_t expire_time = 30 * 1000; // 30秒过期 + + for (int i = 0; i < MAX_DEVICE_MAPPINGS; i++) { + if (strlen(g_device_mappings[i].deviceName) > 0) { + if (current_time - g_device_mappings[i].timestamp > expire_time) { + LOG_I("Cleaning up expired mapping: %s\n", g_device_mappings[i].deviceName); + memset(&g_device_mappings[i], 0, sizeof(device_tag_mapping_t)); + } + } + } + + pthread_mutex_unlock(&deviceMappingMutex); +} + /*================================================================================*/ // 上报灯条子设备登出 void report_lightbar_logout(uint16_t tagCodeHead, uint32_t tagCode) { @@ -600,7 +692,7 @@ void update_lightbar_heartbeat(uint16_t tagCodeHead, uint32_t tagCode) { lightbarHeartbeat[lightbarHeartbeatCount].lastHeartbeat = now; lightbarHeartbeat[lightbarHeartbeatCount].isOnline = true; lightbarHeartbeatCount++; - LOG_I("New lightbar %04X%08X registered, total: %d\n", tagCodeHead, tagCode, lightbarHeartbeatCount); + //LOG_I("New lightbar %04X%08X registered, total: %d\n", tagCodeHead, tagCode, lightbarHeartbeatCount); // 只有MQTT连接后才上报登录 if (isMqttConnected) { report_lightbar_login(tagCodeHead, tagCode); @@ -2069,7 +2161,39 @@ void *thread_uart_recv_ack(void *arg){ cmd.color = changecolor; cmd.sound = changesound; cmd.flash = changeflash; - cmd.tagCode = strtol(lightsn1, NULL, 16); + // 【修复】明确设置RGB值,确保熄灭命令检测正确工作 + if (changecolor == 0) { + cmd.light_r = 0; + cmd.light_g = 0; + cmd.light_b = 0; + } + + // 【增强】优先使用设备映射,解决多设备并发问题 + uint32_t mapped_tagCode; + uint16_t mapped_tagCodeHead; + // 【修复】优先使用命令中保存的deviceName,而不是可能被覆盖的全局变量 + const char *target_deviceName = cmd.deviceName; + if (strlen(target_deviceName) == 0) { + // 如果命令中没有deviceName,才使用全局变量 + target_deviceName = g_mqtt_deviceName; + } + + // 【新增】检查是否是熄灭命令,如果是,避免在多设备场景下处理 + if (cmd.light_r == 0 && cmd.light_g == 0 && cmd.light_b == 0) { + LOG_I("Fallback detected OFF command for %s (RGB=0,0,0), skipping to avoid multi-device conflicts\n", target_deviceName); + continue; // 直接跳过所有熄灭回退命令,避免干扰点亮命令 + } + + if (get_device_tag_mapping(target_deviceName, &mapped_tagCode, &mapped_tagCodeHead) == 0) { + cmd.tagCode = mapped_tagCode; + cmd.tagCodeHead = mapped_tagCodeHead; + LOG_I("Fallback: using device mapping for %s -> %08X\n", target_deviceName, cmd.tagCode); + } else { + // 回退到全局变量 + cmd.tagCode = g_current_tagCode; + cmd.tagCodeHead = g_current_tagCodeHead; + LOG_I("Fallback: using global variables for %s -> %08X\n", target_deviceName, cmd.tagCode); + } } // 使用队列中的命令参数构建控制包 @@ -2136,37 +2260,15 @@ void *thread_uart_recv_ack(void *arg){ uint32_t tag1=cmd.tagCode; LOG_I("Using command from queue - device: %s, seq: %llu, tag1=%08X, RGB: %d,%d,%d\n", cmd.deviceName, cmd.sequence, tag1, cmd.light_r, cmd.light_g, cmd.light_b); - uint32_t tag2=strtol(lightsn2,NULL,16); - uint32_t tag3=strtol(lightsn3,NULL,16); - uint32_t tag4=strtol(lightsn4,NULL,16); - uint32_t tag5=strtol(lightsn5,NULL,16); - uint32_t tag6=strtol(lightsn6,NULL,16); - uint32_t tag7=strtol(lightsn7,NULL,16); - uint32_t tag8=strtol(lightsn8,NULL,16); - uint32_t tag9=strtol(lightsn9,NULL,16); - uint32_t tag10=strtol(lightsn10,NULL,16); #endif - uint32_t tag11=strtol(lightsn11,NULL,16); - uint32_t tag12=strtol(lightsn12,NULL,16); - uint32_t tag13=strtol(lightsn13,NULL,16); - uint32_t tag14=strtol(lightsn14,NULL,16); - uint32_t tag15=strtol(lightsn15,NULL,16); - uint32_t tag16=strtol(lightsn16,NULL,16); - uint32_t tag17=strtol(lightsn17,NULL,16); - uint32_t tag18=strtol(lightsn18,NULL,16); - uint32_t tag19=strtol(lightsn19,NULL,16); - uint32_t tag20=strtol(lightsn20,NULL,16); - uint32_t tag21=strtol(lightsn21,NULL,16); - uint32_t tag22=strtol(lightsn22,NULL,16); - uint32_t tag23=strtol(lightsn23,NULL,16); - uint32_t tag24=strtol(lightsn24,NULL,16); - uint32_t tag25=strtol(lightsn25,NULL,16); - uint32_t tag26=strtol(lightsn26,NULL,16); - uint32_t tag27=strtol(lightsn27,NULL,16); - uint32_t tag28=strtol(lightsn28,NULL,16); - uint32_t tag29=strtol(lightsn29,NULL,16); - uint32_t tag30=strtol(lightsn30,NULL,16); + + // 【修复】在多设备场景下,只发送给目标设备,其他设备设为0 + uint32_t tag2=0, tag3=0, tag4=0, tag5=0, tag6=0, tag7=0, tag8=0, tag9=0, tag10=0; + uint32_t tag11=0, tag12=0, tag13=0, tag14=0, tag15=0; + uint32_t tag16=0, tag17=0, tag18=0, tag19=0, tag20=0; + uint32_t tag21=0, tag22=0, tag23=0, tag24=0, tag25=0; + uint32_t tag26=0, tag27=0, tag28=0, tag29=0, tag30=0; #endif #if 0 @@ -2686,6 +2788,7 @@ void *thread_remove_duplicate_tag(void *arg){ void *thread_mqtt_recv(void *arg){ char payload[1024]={0}; + char deviceName[16]={0}; // 【新增】从队列获取的设备名称 char msg_items[1024] ={0}; char msg_item_value[1024] ={0}; char msg_colors[512] ={0}; @@ -2696,7 +2799,20 @@ void *thread_mqtt_recv(void *arg){ int task_id = 0; while(1){ if(isSendComEnd){ - if(GetDataFromMQueue(payload)==0){ + // 【修复】优先使用带设备名称的版本 + if(GetDataFromMQueueWithDevice(payload, deviceName)==0){ + // 如果获取到了设备名称,使用它;否则使用全局变量 + if (strlen(deviceName) > 0) { + strncpy(g_mqtt_deviceName, deviceName, sizeof(g_mqtt_deviceName) - 1); + LOG_I("Using device from queue: %s\n", g_mqtt_deviceName); + } + } else if(GetDataFromMQueue(payload)==0) { + // 回退到原来的版本(兼容性) + LOG_I("Using legacy queue without device name\n"); + } else { + Sleep(100); + continue; + } // 调试用固定payload //strcpy(payload, "{ \"Items\": [ { \"Beep\": true, \"Colors\": [ { \"B\": false, \"G\": false, \"R\": true } ], \"Flashing\": true, \"TagID\": \"AD1000014C17\" } ], \"Time\": 120 }"); @@ -2890,18 +3006,25 @@ void *thread_mqtt_recv(void *arg){ LOG_I("Task ID: %s\n", taskId); // 从topic解析出的deviceName获取设备ID - if(strlen(g_mqtt_deviceName) > 0) { - LOG_I("Device ID from topic: %s\n", g_mqtt_deviceName); + LOG_I("Processing lightbar command, g_mqtt_deviceName: %s\n", g_mqtt_deviceName); + // 【关键修复】使用从队列获取的设备名称,而不是可能被覆盖的全局变量 + if (strlen(deviceName) > 0) { + // 【关键修复】立即保存deviceName到局部变量,避免被其他并发MQTT消息覆盖 + char current_deviceName[16] = {0}; + strncpy(current_deviceName, deviceName, sizeof(current_deviceName) - 1); + + LOG_I("Device ID from queue: %s\n", current_deviceName); // 将deviceName设置为灯条ID memset(mqtt_parm.msg_sn, 0, sizeof(mqtt_parm.msg_sn)); - strncpy(mqtt_parm.msg_sn, g_mqtt_deviceName, sizeof(mqtt_parm.msg_sn) - 1); + strncpy(mqtt_parm.msg_sn, current_deviceName, sizeof(mqtt_parm.msg_sn) - 1); LOG_I("Set light bar ID: %s\n", mqtt_parm.msg_sn); // 获取完整的12位设备ID - int len = strlen(g_mqtt_deviceName); + int len = strlen(current_deviceName); + LOG_I("Device name length: %d\n", len); if(len == 12) { char fullTagId[13] = {0}; - strncpy(fullTagId, g_mqtt_deviceName, 12); + strncpy(fullTagId, current_deviceName, 12); // 解析前4位作为tagCodeHead和后8位作为tagCode uint16_t tagCodeHead = 0; @@ -2914,6 +3037,13 @@ void *thread_mqtt_recv(void *arg){ sscanf(head, "%hx", &tagCodeHead); sscanf(tail, "%x", &tagCode); + // 【修复】更新全局变量,保存当前设备的tagCode + g_current_tagCodeHead = tagCodeHead; + g_current_tagCode = tagCode; + + // 【增强】添加设备映射,解决多设备并发问题 + add_device_mapping(current_deviceName, tagCodeHead, tagCode); + // 获取后6位设置到lightsn1(保留兼容性) char *last6 = fullTagId + 6; memset(lightsn1, 0, sizeof(lightsn1)); @@ -2929,14 +3059,23 @@ void *thread_mqtt_recv(void *arg){ // 【关键修改】将命令添加到队列,而不是设置全局变量 // 这样可以避免快速连续命令时的竞态条件 - if(add_light_command(g_mqtt_deviceName, taskId, tagCodeHead, tagCode, + if(add_light_command(current_deviceName, taskId, tagCodeHead, tagCode, changecolor, changesound, changeflash, mqtt_parm.msg_duration, changesound, flash_value, r_val, g_val, b_val) == 0) { LOG_I("Light command added to queue successfully\n"); } else { LOG_I("Failed to add light command to queue - queue full!\n"); } + } else { + LOG_I("Device name length is not 12 characters: %s (len=%d)\n", current_deviceName, len); } + + // 【新增】立即处理队列中的命令,确保不被其他消息干扰 + // 这样可以确保每个MQTT消息都能立即触发UART处理 + LOG_I("Immediately processing command queue for %s\n", current_deviceName); + + } else { + LOG_I("g_mqtt_deviceName is empty or null\n"); } } else { // OTA任务或其他任务 @@ -3079,7 +3218,7 @@ void *thread_mqtt_recv(void *arg){ } } usleep(getPayloadTime); - } + } /*================================================================================*/ @@ -3177,6 +3316,10 @@ int main(int argc, char *argv[]) command_sequence = 0; LOG_I("Light command queue initialized\n"); + // 【新增】初始化设备映射 + memset(g_device_mappings, 0, sizeof(g_device_mappings)); + LOG_I("Device mapping table initialized\n"); + system("insmod /system/lib/modules/wk2xxx_spi.ko"); system("timedatectl set-timezone Asia/Shanghai"); uart_open(&uartSend,"/dev/ttyS0");//U12 ttyS0,U14 ttyS4,U21 ttysWK0 U13 ttysWK1 U15 ttysWK2 U22 ttysWK3 U20 ttyS1 diff --git a/mqtt_utils/mqtt_utils.c b/mqtt_utils/mqtt_utils.c index 6745e10..18804da 100644 --- a/mqtt_utils/mqtt_utils.c +++ b/mqtt_utils/mqtt_utils.c @@ -326,7 +326,17 @@ int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQT } memcpy(payload_buf, message->payload, copy_len); payload_buf[copy_len] = '\0'; - PutDataIntoMQueue(payload_buf); + + // 【关键修复】对于lightOperate/invoke消息,立即处理而不是放入队列 + // 这样可以避免g_mqtt_deviceName被其他消息覆盖 + if (strcmp(subscribeTopics[i], "/sys/+/+/thing/service/lightOperate/invoke") == 0) { + LOG_I("Immediately processing lightOperate payload for %s\n", g_mqtt_deviceName); + // 使用带设备名称的版本,确保deviceName不被覆盖 + PutDataIntoMQueueWithDevice(payload_buf, g_mqtt_deviceName); + } else { + // 其他消息仍然使用队列 + PutDataIntoMQueue(payload_buf); + } } else { // LOG_I("消息payload为空,跳过入队\n"); } diff --git a/queue/queue.c b/queue/queue.c index cb70d2c..dd65da4 100644 --- a/queue/queue.c +++ b/queue/queue.c @@ -72,6 +72,25 @@ void PutDataIntoMQueue(char *payload){ // 打印加入的数据 //printf("PutDataIntoMQueue: payload=%s\n", mQueueData.payload); } + +// 【新增】带设备名称的版本 +void PutDataIntoMQueueWithDevice(char *payload, const char *deviceName){ + M_StructInfo mQueueData = {0}; + + // 对结构体的变量进行赋值 + memcpy(mQueueData.payload, payload, 1024); + if (deviceName != NULL) { + strncpy(mQueueData.deviceName, deviceName, sizeof(mQueueData.deviceName) - 1); + } + + // 将数据加入队列(一直等到加入成功之后才退出) + while (EnMQueue(mQueueData) == -1){ + Sleep(1000); // 加入失败,1秒后重试 + } + + // 打印加入的数据 + //printf("PutDataIntoMQueueWithDevice: payload=%s, device=%s\n", mQueueData.payload, mQueueData.deviceName); +} /**************************************************************** * 功能描述: 将数据取出队列中 * 输入参数: 无 @@ -105,6 +124,22 @@ int GetDataFromMQueue(char *payload){ //printf("GetDataFromMQueue: payload=%s\n", mQueueData.payload); return 0; } + +// 【新增】带设备名称的版本 +int GetDataFromMQueueWithDevice(char *payload, char *deviceName){ + M_StructInfo mQueueData = {0}; + if (DeMQueue(&mQueueData) == -1){ + return -1; + } + memcpy(payload, mQueueData.payload, 1024); + if (deviceName != NULL) { + strncpy(deviceName, mQueueData.deviceName, 16); + deviceName[15] = '\0'; // 确保终止符 + } + // 打印取出的数据 + //printf("GetDataFromMQueueWithDevice: payload=%s, device=%s\n", mQueueData.payload, mQueueData.deviceName); + return 0; +} /**************************************************************** * 功能描述: 数据入队列 * 输入参数: tQueueData-队列数据 diff --git a/queue/queue.h b/queue/queue.h index e0c60c9..7193011 100644 --- a/queue/queue.h +++ b/queue/queue.h @@ -15,7 +15,8 @@ typedef struct typedef struct { - char payload[1024]; + char payload[1024]; + char deviceName[16]; // 【新增】保存设备名称,避免竞态条件 } M_StructInfo; void PutDataIntoQueue(uint32_t tagname,uint16_t battery,uint16_t reserve); @@ -24,7 +25,9 @@ INT32 EnQueue(T_StructInfo tQueueData); INT32 DeQueue(T_StructInfo *tDqueueData); void PutDataIntoMQueue(char *payload); +void PutDataIntoMQueueWithDevice(char *payload, const char *deviceName); // 【新增】带设备名称的版本 int GetDataFromMQueue(char *payload); +int GetDataFromMQueueWithDevice(char *payload, char *deviceName); // 【新增】带设备名称的版本 INT32 EnMQueue(M_StructInfo mQueueData); INT32 DeMQueue(M_StructInfo *mDqueuetData);