#include "MQTTAsync.h" #include #include #include #include #include #include #include #include // MQTT协议常量(与JD项目保持一致) #define MQTT_ALGORITHM "AUK1_MQTT_HMAC_SHA1" #define MQTT_SECURE_MODE "TCP" #define MQTT_AUTH_TYPE_REGISTER "REGISTER" #define MQTT_AUTH_TYPE_AUTH "AUTH" // 全局变量(与JD项目保持一致) char stationsn[32] = "90A9F73002CD"; char productid[16] = "WcLightStrip"; char appKey[32] = "fdhQmhqhvbL1cf1K9mUqt"; char appSecret[64] = "RxU8NZjfZaxsKg2B3Dr6sx"; char mqttRawPassword[64] = ""; char *hostDomain = "auk-iot.test.zservey.com"; int mqtt_port = 1883; static int message_arrived_count = 0; // 前向声明 void onSubscribe(void *context, MQTTAsync_successData *response); void onSubscribeFailure(void *context, MQTTAsync_failureData *response); // HMAC-SHA1函数(与JD项目保持一致) void hmacsha1_hex(char *key, char* data, char *signhex, int signhex_len) { unsigned char hash[SHA_DIGEST_LENGTH]; unsigned int hash_len; HMAC(EVP_sha1(), key, strlen(key), (unsigned char*)data, strlen(data), hash, &hash_len); for (unsigned int i = 0; i < hash_len && i * 2 + 1 < signhex_len; i++) { snprintf(signhex + i * 2, signhex_len - i * 2, "%02x", hash[i]); } } // 获取IP地址(简化实现) void get_ip_by_domain(const char* domain, char* ip, int ip_len) { strcpy(ip, "103.37.154.120"); } void messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message) { printf("=== 收到MQTT消息 ===\n"); printf("Topic: %s\n", topicName); printf("消息长度: %d\n", message->payloadlen); printf("消息内容: %.*s\n", message->payloadlen, (char*)message->payload); printf("QoS: %d\n", message->qos); printf("Retained: %d\n", message->retained); printf("=== 消息处理完成 ===\n"); message_arrived_count++; MQTTAsync_freeMessage(&message); MQTTAsync_free(topicName); } void onConnect(void *context, MQTTAsync_successData *response) { printf("=== MQTT连接成功 ===\n"); printf("连接响应: %p\n", response); if (response) { printf("连接成功数据: %p\n", response->alt.connect); } MQTTAsync client = (MQTTAsync)context; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; // 设置订阅回调 opts.onSuccess = onSubscribe; opts.onFailure = onSubscribeFailure; // 订阅相同的topic char topic[] = "/iot/WcLightStrip/90A9F73002CD/thing/ota/upgrade"; printf("订阅topic: %s\n", topic); int rc = MQTTAsync_subscribe(client, topic, 1, &opts); if (rc != MQTTASYNC_SUCCESS) { printf("订阅失败: %d - %s\n", rc, MQTTAsync_strerror(rc)); } else { printf("订阅请求已发送,等待服务器确认...\n"); } // 订阅通配符 printf("订阅通配符: #\n"); rc = MQTTAsync_subscribe(client, "#", 1, &opts); if (rc != MQTTASYNC_SUCCESS) { printf("通配符订阅失败: %d - %s\n", rc, MQTTAsync_strerror(rc)); } else { printf("通配符订阅请求已发送,等待服务器确认...\n"); } // 订阅测试topic printf("订阅测试topic: /test/public/topic\n"); rc = MQTTAsync_subscribe(client, "/test/public/topic", 0, &opts); if (rc != MQTTASYNC_SUCCESS) { printf("测试topic订阅失败: %d - %s\n", rc, MQTTAsync_strerror(rc)); } else { printf("测试topic订阅请求已发送,等待服务器确认...\n"); } // 发送测试消息 printf("发送测试消息到 /test/public/topic\n"); char test_payload[] = "{\"test\":\"from_test_program\",\"timestamp\":\"test\"}"; rc = MQTTAsync_send(client, "/test/public/topic", strlen(test_payload), test_payload, 0, 0, NULL); if (rc != MQTTASYNC_SUCCESS) { printf("发送测试消息失败: %d - %s\n", rc, MQTTAsync_strerror(rc)); } else { printf("测试消息发送成功\n"); } printf("=== 订阅完成 ===\n"); } void onConnectFailure(void *context, MQTTAsync_failureData *response) { printf("MQTT连接失败\n"); if (response) { printf("错误代码: %d\n", response->code); if (response->message) { printf("错误信息: %s\n", response->message); } } } void onSubscribe(void *context, MQTTAsync_successData *response) { printf("订阅成功\n"); } void onSubscribeFailure(void *context, MQTTAsync_failureData *response) { printf("订阅失败\n"); if (response) { printf("错误代码: %d\n", response->code); } } void onDisconnect(void *context, MQTTAsync_successData *response) { printf("MQTT断开连接\n"); } void connectionLost(void *context, char *cause) { printf("连接丢失: %s\n", cause ? cause : "未知原因"); } int main() { MQTTAsync client; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; int rc; printf("=== MQTT测试程序 ===\n"); printf("服务器: %s:%d\n", hostDomain, mqtt_port); printf("设备ID: %s\n", stationsn); printf("产品ID: %s\n", productid); // 动态生成MQTT配置(与JD项目保持一致) struct timeval tv; gettimeofday(&tv, NULL); long timestamp = tv.tv_sec * 1000 + tv.tv_usec / 1000; printf("时间戳: %ld\n", timestamp); // 使用白名单设备ID char unique_device_id[32]; strcpy(unique_device_id, stationsn); printf("设备ID: %s\n", unique_device_id); // 使用AUTH模式 const char* authType = MQTT_AUTH_TYPE_AUTH; char* secretKey = appSecret; // 使用ProductSecret printf("认证模式: %s\n", authType); // 构建ClientId char clientid[256]; snprintf(clientid, sizeof(clientid), "%s&/%s/%s/%s/%ld/%s", productid, unique_device_id, MQTT_SECURE_MODE, MQTT_ALGORITHM, timestamp, authType); printf("ClientId: %s\n", clientid); // 构建Username char username[64]; snprintf(username, sizeof(username), "/%s/%s", productid, unique_device_id); printf("Username: %s\n", username); // 构建StringToSign char stringToSign[512]; char timestamp_str[32]; snprintf(timestamp_str, sizeof(timestamp_str), "%ld", timestamp); snprintf(stringToSign, sizeof(stringToSign), "%s%s%s%s%s", unique_device_id, unique_device_id, productid, MQTT_ALGORITHM, timestamp_str); printf("StringToSign: %s\n", stringToSign); // 生成Password char password[64]; hmacsha1_hex(secretKey, stringToSign, password, sizeof(password)); printf("Password: %s\n", password); // 解析服务器地址 char hostip[16]; get_ip_by_domain(hostDomain, hostip, sizeof(hostip)); printf("服务器IP: %s\n", hostip); // 创建客户端 char url[256]; snprintf(url, sizeof(url), "tcp://%s:%d", hostip, mqtt_port); rc = MQTTAsync_create(&client, url, clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL); if (rc != MQTTASYNC_SUCCESS) { printf("创建客户端失败: %d\n", rc); return -1; } // 设置回调函数 rc = MQTTAsync_setConnected(client, client, onConnect); if (rc != MQTTASYNC_SUCCESS) { printf("设置连接回调失败: %d\n", rc); return -1; } rc = MQTTAsync_setDisconnected(client, client, onDisconnect); if (rc != MQTTASYNC_SUCCESS) { printf("设置断开回调失败: %d\n", rc); return -1; } rc = MQTTAsync_setMessageArrivedCallback(client, client, messageArrived); if (rc != MQTTASYNC_SUCCESS) { printf("设置消息到达回调失败: %d\n", rc); return -1; } rc = MQTTAsync_setConnectionLostCallback(client, client, connectionLost); if (rc != MQTTASYNC_SUCCESS) { printf("设置连接丢失回调失败: %d\n", rc); return -1; } // 设置连接选项 conn_opts.onSuccess = onConnect; conn_opts.onFailure = onConnectFailure; conn_opts.context = client; conn_opts.cleansession = 1; conn_opts.username = username; conn_opts.password = password; conn_opts.keepAliveInterval = 60; conn_opts.connectTimeout = 30; conn_opts.MQTTVersion = 4; printf("正在连接MQTT服务器...\n"); rc = MQTTAsync_connect(client, &conn_opts); if (rc != MQTTASYNC_SUCCESS) { printf("启动连接失败: %d\n", rc); return -1; } printf("连接请求已发送,等待连接结果...\n"); // 等待连接完成 int connect_timeout = 10; while (connect_timeout > 0) { sleep(1); connect_timeout--; printf("等待连接... %d秒\n", connect_timeout); } printf("等待消息...\n"); printf("按Ctrl+C退出\n"); // 等待消息 while (1) { sleep(1); if (message_arrived_count > 0) { printf("已收到 %d 条消息\n", message_arrived_count); } } return 0; }