AP05/mqtt_test.c
2025-12-02 13:06:35 +08:00

278 lines
8.9 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "MQTTAsync.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <sys/time.h>
#include <openssl/hmac.h>
#include <openssl/sha.h>
// MQTT protocol constants, kept consistent with the JD project.
// Algorithm tag embedded in the client id and in the string that gets signed.
#define MQTT_ALGORITHM "AUK1_MQTT_HMAC_SHA1"
// Secure-mode field embedded in the client id.
#define MQTT_SECURE_MODE "TCP"
// Auth-type field values for the client id; main() uses the AUTH variant.
// MQTT_AUTH_TYPE_REGISTER is not referenced in the visible part of this file.
#define MQTT_AUTH_TYPE_REGISTER "REGISTER"
#define MQTT_AUTH_TYPE_AUTH "AUTH"
// Global variables, kept consistent with the JD project.
// Device serial number; used in the client id, the username and the signed string.
char stationsn[32] = "90A9F73002CD";
// Product identifier; used in the client id, the username and the signed string.
char productid[16] = "WcLightStrip";
// Not referenced in the visible part of this file.
char appKey[32] = "fdhQmhqhvbL1cf1K9mUqt";
// Key passed to hmacsha1_hex() when main() derives the MQTT password.
// NOTE(review): credentials are embedded in source - confirm this is acceptable for a test tool.
char appSecret[64] = "RxU8NZjfZaxsKg2B3Dr6sx";
// Not referenced in the visible part of this file.
char mqttRawPassword[64] = "";
// Broker host name; printed by main() and handed to get_ip_by_domain().
char *hostDomain = "auk-iot.test.zservey.com";
// Broker TCP port used when building the connection URL.
int mqtt_port = 1883;
// Incremented by messageArrived() and polled by the loop at the end of main().
// NOTE(review): presumably updated from a library callback thread without synchronization - confirm.
static int message_arrived_count = 0;
// Forward declarations (these callbacks are referenced by onConnect before their definitions).
void onSubscribe(void *context, MQTTAsync_successData *response);
void onSubscribeFailure(void *context, MQTTAsync_failureData *response);
/*
 * HMAC-SHA1 helper, kept consistent with the JD project.
 *
 * Computes HMAC-SHA1 of the NUL-terminated string `data` keyed with the
 * NUL-terminated string `key` and writes the digest to `signhex` as
 * lowercase hexadecimal text.
 *
 * key         - HMAC key (C string).
 * data        - message to sign (C string).
 * signhex     - output buffer for the hex text.
 * signhex_len - capacity of `signhex` in bytes, including the terminator.
 *
 * The output is always NUL-terminated when signhex_len > 0. If the buffer is
 * too small the text is truncated on a whole-byte boundary (never half a
 * byte). On invalid arguments or HMAC failure the output is the empty string.
 */
void hmacsha1_hex(char *key, char *data, char *signhex, int signhex_len) {
    static const char hex_digits[] = "0123456789abcdef";
    unsigned char hash[SHA_DIGEST_LENGTH];
    unsigned int hash_len = 0;

    if (signhex == NULL || signhex_len <= 0) {
        return;
    }
    /* Guarantee a valid (empty) string on every early exit below. */
    signhex[0] = '\0';
    if (key == NULL || data == NULL) {
        return;
    }

    /* HMAC() returns NULL on failure, in which case hash_len is not meaningful. */
    if (HMAC(EVP_sha1(), key, (int)strlen(key), (const unsigned char *)data,
             strlen(data), hash, &hash_len) == NULL) {
        return;
    }

    size_t capacity = (size_t)signhex_len;
    size_t out = 0;
    /* Each digest byte needs two characters plus room for the final NUL. */
    for (unsigned int i = 0; i < hash_len && out + 2 < capacity; i++) {
        signhex[out++] = hex_digits[hash[i] >> 4];
        signhex[out++] = hex_digits[hash[i] & 0x0F];
    }
    signhex[out] = '\0';
}
/*
 * Resolve a domain to an IP address (simplified implementation).
 *
 * No lookup is performed: the function always writes the fixed address
 * "103.37.154.120", and `domain` is ignored.
 *
 * domain - host name (unused).
 * ip     - output buffer for the dotted-quad string.
 * ip_len - capacity of `ip` in bytes, including the terminator.
 *
 * The copy is bounded by ip_len and always NUL-terminated; a buffer that is
 * too small receives a truncated string. Nothing is written when `ip` is
 * NULL or ip_len is not positive.
 */
void get_ip_by_domain(const char *domain, char *ip, int ip_len) {
    (void)domain;
    if (ip == NULL || ip_len <= 0) {
        return;
    }
    snprintf(ip, (size_t)ip_len, "%s", "103.37.154.120");
}
/*
 * Message-arrived callback: dumps the incoming message to stdout, bumps
 * message_arrived_count and releases the message and topic storage.
 *
 * context   - user pointer supplied at registration (unused).
 * topicName - topic the message was published on; freed here.
 * topicLen  - topic length reported by the library (unused; the topic is
 *             printed as a C string).
 * message   - received message; freed here.
 *
 * Returns 1 (message handled). The Paho callback type returns int, and a
 * zero/indeterminate result asks the library to deliver the message again,
 * which must not happen after the message has been freed.
 */
int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message) {
    (void)context;
    (void)topicLen;

    printf("=== 收到MQTT消息 ===\n");
    printf("Topic: %s\n", topicName);
    printf("消息长度: %d\n", message->payloadlen);
    /* The payload is not NUL-terminated, so bound the print by its length. */
    printf("消息内容: %.*s\n", message->payloadlen, (char *)message->payload);
    printf("QoS: %d\n", message->qos);
    printf("Retained: %d\n", message->retained);
    printf("=== 消息处理完成 ===\n");

    message_arrived_count++;

    MQTTAsync_freeMessage(&message);
    MQTTAsync_free(topicName);
    return 1;
}
void onConnect(void *context, MQTTAsync_successData *response) {
printf("=== MQTT连接成功 ===\n");
printf("连接响应: %p\n", response);
if (response) {
printf("连接成功数据: %p\n", response->alt.connect);
}
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
// 设置订阅回调
opts.onSuccess = onSubscribe;
opts.onFailure = onSubscribeFailure;
// 订阅相同的topic
char topic[] = "/iot/WcLightStrip/90A9F73002CD/thing/ota/upgrade";
printf("订阅topic: %s\n", topic);
int rc = MQTTAsync_subscribe(client, topic, 1, &opts);
if (rc != MQTTASYNC_SUCCESS) {
printf("订阅失败: %d - %s\n", rc, MQTTAsync_strerror(rc));
} else {
printf("订阅请求已发送,等待服务器确认...\n");
}
// 订阅通配符
printf("订阅通配符: #\n");
rc = MQTTAsync_subscribe(client, "#", 1, &opts);
if (rc != MQTTASYNC_SUCCESS) {
printf("通配符订阅失败: %d - %s\n", rc, MQTTAsync_strerror(rc));
} else {
printf("通配符订阅请求已发送,等待服务器确认...\n");
}
// 订阅测试topic
printf("订阅测试topic: /test/public/topic\n");
rc = MQTTAsync_subscribe(client, "/test/public/topic", 0, &opts);
if (rc != MQTTASYNC_SUCCESS) {
printf("测试topic订阅失败: %d - %s\n", rc, MQTTAsync_strerror(rc));
} else {
printf("测试topic订阅请求已发送等待服务器确认...\n");
}
// 发送测试消息
printf("发送测试消息到 /test/public/topic\n");
char test_payload[] = "{\"test\":\"from_test_program\",\"timestamp\":\"test\"}";
rc = MQTTAsync_send(client, "/test/public/topic", strlen(test_payload), test_payload, 0, 0, NULL);
if (rc != MQTTASYNC_SUCCESS) {
printf("发送测试消息失败: %d - %s\n", rc, MQTTAsync_strerror(rc));
} else {
printf("测试消息发送成功\n");
}
printf("=== 订阅完成 ===\n");
}
/*
 * Connect-failure callback: reports the failure and, when failure data is
 * available, its error code and optional error message.
 *
 * context  - user pointer (unused).
 * response - failure data; may be NULL.
 */
void onConnectFailure(void *context, MQTTAsync_failureData *response) {
    (void)context;

    fputs("MQTT连接失败\n", stdout);
    if (response == NULL) {
        return;
    }

    printf("错误代码: %d\n", response->code);

    const char *detail = response->message;
    if (detail != NULL) {
        printf("错误信息: %s\n", detail);
    }
}
/*
 * Subscribe-success callback: logs that a subscription succeeded.
 * Both parameters are unused.
 */
void onSubscribe(void *context, MQTTAsync_successData *response) {
    (void)context;
    (void)response;
    puts("订阅成功");
}
/*
 * Subscribe-failure callback: logs the failure and, when failure data is
 * available, its error code.
 *
 * context  - user pointer (unused).
 * response - failure data; may be NULL.
 */
void onSubscribeFailure(void *context, MQTTAsync_failureData *response) {
    (void)context;

    puts("订阅失败");
    if (response == NULL) {
        return;
    }
    printf("错误代码: %d\n", response->code);
}
/*
 * Disconnect callback: logs that the client disconnected.
 * Both parameters are unused.
 */
void onDisconnect(void *context, MQTTAsync_successData *response) {
    (void)context;
    (void)response;
    puts("MQTT断开连接");
}
/*
 * Connection-lost callback: logs the reason for the loss, substituting a
 * placeholder text when no cause string is supplied.
 *
 * context - user pointer (unused).
 * cause   - reason text; may be NULL.
 */
void connectionLost(void *context, char *cause) {
    (void)context;

    const char *reason = "未知原因";
    if (cause != NULL) {
        reason = cause;
    }
    printf("连接丢失: %s\n", reason);
}
/*
 * Entry point of the MQTT test program.
 *
 * Builds the client id, username and HMAC-SHA1 password from the global
 * device settings, creates an asynchronous MQTT client, connects to the
 * broker and then loops forever reporting how many messages have arrived.
 *
 * Returns -1 if the client cannot be created, a callback cannot be set or
 * the connect request cannot be started; otherwise it does not return.
 */
int main(void) {
    MQTTAsync client = NULL;
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    int rc;

    printf("=== MQTT测试程序 ===\n");
    printf("服务器: %s:%d\n", hostDomain, mqtt_port);
    printf("设备ID: %s\n", stationsn);
    printf("产品ID: %s\n", productid);

    /* Dynamically generate the MQTT credentials (kept consistent with the JD project). */
    struct timeval tv = {0};
    gettimeofday(&tv, NULL);
    /* Milliseconds since the epoch do not fit in a 32-bit long, so compute in long long. */
    long long timestamp = (long long)tv.tv_sec * 1000LL + (long long)tv.tv_usec / 1000LL;
    printf("时间戳: %lld\n", timestamp);

    /* Use the whitelisted device id. */
    char unique_device_id[32];
    snprintf(unique_device_id, sizeof(unique_device_id), "%s", stationsn);
    printf("设备ID: %s\n", unique_device_id);

    /* Use AUTH mode; the product secret is the signing key. */
    const char *authType = MQTT_AUTH_TYPE_AUTH;
    char *secretKey = appSecret;
    printf("认证模式: %s\n", authType);

    /* Build the client id. */
    char clientid[256];
    snprintf(clientid, sizeof(clientid), "%s&/%s/%s/%s/%lld/%s",
             productid, unique_device_id, MQTT_SECURE_MODE, MQTT_ALGORITHM, timestamp, authType);
    printf("ClientId: %s\n", clientid);

    /* Build the username. */
    char username[64];
    snprintf(username, sizeof(username), "/%s/%s", productid, unique_device_id);
    printf("Username: %s\n", username);

    /* Build the string to sign. */
    char stringToSign[512];
    char timestamp_str[32];
    snprintf(timestamp_str, sizeof(timestamp_str), "%lld", timestamp);
    snprintf(stringToSign, sizeof(stringToSign), "%s%s%s%s%s",
             unique_device_id, unique_device_id, productid, MQTT_ALGORITHM, timestamp_str);
    printf("StringToSign: %s\n", stringToSign);

    /* Derive the password. */
    char password[64];
    hmacsha1_hex(secretKey, stringToSign, password, (int)sizeof(password));
    printf("Password: %s\n", password);

    /* Resolve the server address. */
    char hostip[16];
    get_ip_by_domain(hostDomain, hostip, (int)sizeof(hostip));
    printf("服务器IP: %s\n", hostip);

    /* Create the client. */
    char url[256];
    snprintf(url, sizeof(url), "tcp://%s:%d", hostip, mqtt_port);
    rc = MQTTAsync_create(&client, url, clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL);
    if (rc != MQTTASYNC_SUCCESS) {
        printf("创建客户端失败: %d\n", rc);
        return -1;
    }

    /*
     * Register callbacks. onConnect and onDisconnect are intentionally NOT
     * passed to MQTTAsync_setConnected / MQTTAsync_setDisconnected: those
     * setters expect different callback signatures, and onConnect is already
     * invoked through conn_opts.onSuccess below, so registering it twice
     * would also repeat every subscription.
     */
    rc = MQTTAsync_setMessageArrivedCallback(client, client, messageArrived);
    if (rc != MQTTASYNC_SUCCESS) {
        printf("设置消息到达回调失败: %d\n", rc);
        goto fail;
    }
    rc = MQTTAsync_setConnectionLostCallback(client, client, connectionLost);
    if (rc != MQTTASYNC_SUCCESS) {
        printf("设置连接丢失回调失败: %d\n", rc);
        goto fail;
    }

    /* Connection options. */
    conn_opts.onSuccess = onConnect;
    conn_opts.onFailure = onConnectFailure;
    conn_opts.context = client;
    conn_opts.cleansession = 1;
    conn_opts.username = username;
    conn_opts.password = password;
    conn_opts.keepAliveInterval = 60;
    conn_opts.connectTimeout = 30;
    conn_opts.MQTTVersion = 4;

    printf("正在连接MQTT服务器...\n");
    rc = MQTTAsync_connect(client, &conn_opts);
    if (rc != MQTTASYNC_SUCCESS) {
        printf("启动连接失败: %d\n", rc);
        goto fail;
    }
    printf("连接请求已发送,等待连接结果...\n");

    /* Give the asynchronous connect ten seconds to complete. */
    for (int remaining = 10; remaining > 0; ) {
        sleep(1);
        remaining--;
        printf("等待连接... %d秒\n", remaining);
    }

    printf("等待消息...\n");
    printf("按Ctrl+C退出\n");

    /* Poll the message counter once per second until the process is interrupted. */
    while (1) {
        sleep(1);
        if (message_arrived_count > 0) {
            printf("已收到 %d 条消息\n", message_arrived_count);
        }
    }
    return 0;

fail:
    /* Release the client handle on every error path after creation. */
    MQTTAsync_destroy(&client);
    return -1;
}