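/*
 * AP05/mqtt_utils/mqtt_utils.c (420 lines, 15 KiB, C)
 *
 * This listing was captured from a repository web view: the
 * "Raw / Normal View / History" navigation text and the bare
 * "YYYY-MM-DD hh:mm:ss +00:00" blame timestamps that appear between
 * source lines are page residue, not part of the C source.
 */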
#define _GNU_SOURCE
#include "MQTTAsync.h"
#include "MQTTClientPersistence.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#if defined(WIN32)
#include <windows.h>
#define sleep Sleep
#else
#include <unistd.h>
#include <sys/time.h>
#include <unistd.h>
#endif
#if defined(_WRS_KERNEL)
#include <OsWrapper.h>
#endif
#include <sys/time.h>
#include <time.h>
#include <semaphore.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include "mqtt_utils.h"
#include "json_utils.h"
#include "main.h"
#define PRINT_TIME_TAG
#define DBG_TAG "mqtt_utils"
#define DBG_LVL DBG_INFO
#include "debug_print.h"
/* Element count of a true array object (meaningless for a pointer). */
#define MQTT_ARRY_ARRY_SIZE(x) (sizeof(x) / sizeof((x)[0]))

/* Configuration/handle used by the callbacks and report helpers in this
 * file; assigned by mqtt_utils_init(). */
mqtt_utils_t *mqtt_conf;
mqtt_parm_download_t mqtt_parm_download = {0};
/* Counter passed to mqtt_net_failure(); reset to 0 in mqtt_utils_connected(). */
int connect_failure_times = 0;
/* Topic subscribed on connect and matched in mqtt_utils_message_arrived(). */
char nativeInvokTopicName[1024] = "";
char nativeUpgradeTopicName[1024] = "";
extern char softwareVersion[16];
/* Associates a Paho trace level constant with its printable name. */
typedef struct {
    enum MQTTASYNC_TRACE_LEVELS level;
    char name[64];
} trace_level_name_t;

/* Lookup table scanned by mqtt_utils_trace_callback(). */
trace_level_name_t trace_level_name_list[] = {
    { .level = MQTTASYNC_TRACE_MAXIMUM,  .name = "MQTTASYNC_TRACE_MAXIMUM"  },
    { .level = MQTTASYNC_TRACE_MEDIUM,   .name = "MQTTASYNC_TRACE_MEDIUM"   },
    { .level = MQTTASYNC_TRACE_MINIMUM,  .name = "MQTTASYNC_TRACE_MINIMUM"  },
    { .level = MQTTASYNC_TRACE_PROTOCOL, .name = "MQTTASYNC_TRACE_PROTOCOL" },
    { .level = MQTTASYNC_TRACE_ERROR,    .name = "MQTTASYNC_TRACE_ERROR"    },
    { .level = MQTTASYNC_TRACE_SEVERE,   .name = "MQTTASYNC_TRACE_SEVERE"   },
    { .level = MQTTASYNC_TRACE_FATAL,    .name = "MQTTASYNC_TRACE_FATAL"    },
};
2025-04-20 13:43:28 +00:00
/**
 * Publish an events report on "$iot/v1/device/<sn>/events/post" at QoS 2.
 *
 * The JSON document contains "deviceId", "messageId", "timestamp"
 * (milliseconds since the epoch, formatted as a decimal string) and
 * "events".
 *
 * @param sn      Device identifier; used for "deviceId" and inside the topic.
 * @param msg_id  Value stored under "messageId".
 * @param events  json-c object stored under "events". It is attached to the
 *                document with json_object_object_add(), so its reference is
 *                consumed here; it is also released on the early-failure
 *                paths because the caller has no way to learn of the failure.
 */
void mqtt_server_events_report(char *sn,char *msg_id,json_object *events){
    char topic[128] = "";
    char time_buffer[24] = "";
    const char *payload = NULL;
    json_object *root = NULL;
    struct timeval tv;
    long long now_ms;
    size_t payload_len;

    if (sn == NULL || msg_id == NULL) {
        LOG_I("%s: NULL sn or msg_id\n", __func__);
        json_object_put(events);
        return;
    }

    gettimeofday(&tv, NULL);
    /* Seconds and microseconds are both converted to milliseconds; 64-bit
     * arithmetic keeps the product from overflowing a 32-bit long. */
    now_ms = (long long)tv.tv_sec * 1000LL + (long long)tv.tv_usec / 1000LL;
    snprintf(time_buffer, sizeof(time_buffer), "%lld", now_ms);

    root = json_object_new_object();
    if (root == NULL) {
        LOG_I("json_object_new_object error\n");
        json_object_put(events);
        return;
    }

    json_object_object_add(root, "deviceId", json_object_new_string(sn));
    json_object_object_add(root, "messageId", json_object_new_string(msg_id));
    json_object_object_add(root, "timestamp", json_object_new_string(time_buffer));
    json_object_object_add(root, "events", events);

    snprintf(topic, sizeof(topic), "$iot/v1/device/%s/events/post", sn);
    LOG_I("publish topic:[%s]\n", topic);

    /* The returned string is owned by root and stays valid until root is put. */
    payload = json_object_to_json_string(root);
    if (payload == NULL) {
        LOG_I("json_object_to_json_string error\n");
        goto json_error;
    }
    payload_len = strlen(payload);
    LOG_I("payload[%d][%s]\n", (int)payload_len, payload);

    if (mqtt_conf == NULL) {
        LOG_I("%s: mqtt not initialised\n", __func__);
        goto json_error;
    }
    mqtt_utils_publish(mqtt_conf, topic, 2, payload, (int)payload_len);

json_error:
    json_object_put(root);
}
2025-04-20 13:43:28 +00:00
/**
 * Publish a function-call response on
 * "$iot/v1/device/<sn>/functions/call/response" at QoS 2.
 *
 * The JSON document contains "deviceId", "messageId", "timestamp"
 * (milliseconds since the epoch, formatted as a decimal string),
 * "code" (always 200), "message" (always "success") and "payload".
 *
 * @param sn         Device identifier; used for "deviceId" and inside the topic.
 * @param msg_id     Value stored under "messageId".
 * @param functions  json-c object stored under "payload". It is attached to
 *                   the document with json_object_object_add(), so its
 *                   reference is consumed here; it is also released on the
 *                   early-failure paths because the caller has no way to
 *                   learn of the failure.
 */
void mqtt_server_reply(char *sn,char *msg_id,json_object *functions)
{
    char topic[128] = "";
    char time_buffer[24] = "";
    const char *payload = NULL;
    json_object *root = NULL;
    struct timeval tv;
    long long now_ms;
    size_t payload_len;

    if (sn == NULL || msg_id == NULL) {
        LOG_I("%s: NULL sn or msg_id\n", __func__);
        json_object_put(functions);
        return;
    }

    gettimeofday(&tv, NULL);
    /* Seconds and microseconds are both converted to milliseconds; 64-bit
     * arithmetic keeps the product from overflowing a 32-bit long. */
    now_ms = (long long)tv.tv_sec * 1000LL + (long long)tv.tv_usec / 1000LL;
    snprintf(time_buffer, sizeof(time_buffer), "%lld", now_ms);

    root = json_object_new_object();
    if (root == NULL) {
        LOG_I("json_object_new_object error\n");
        json_object_put(functions);
        return;
    }

    json_object_object_add(root, "deviceId", json_object_new_string(sn));
    json_object_object_add(root, "messageId", json_object_new_string(msg_id));
    json_object_object_add(root, "timestamp", json_object_new_string(time_buffer));
    json_object_object_add(root, "code", json_object_new_int(200));
    json_object_object_add(root, "message", json_object_new_string("success"));
    json_object_object_add(root, "payload", functions);

    snprintf(topic, sizeof(topic), "$iot/v1/device/%s/functions/call/response", sn);
    LOG_I("publish topic:[%s]\n", topic);

    /* The returned string is owned by root and stays valid until root is put. */
    payload = json_object_to_json_string(root);
    if (payload == NULL) {
        LOG_I("json_object_to_json_string error\n");
        goto json_error;
    }
    payload_len = strlen(payload);
    LOG_I("payload[%d][%s]\n", (int)payload_len, payload);

    if (mqtt_conf == NULL) {
        LOG_I("%s: mqtt not initialised\n", __func__);
        goto json_error;
    }
    mqtt_utils_publish(mqtt_conf, topic, 2, payload, (int)payload_len);

json_error:
    json_object_put(root);
}
/**
 * Record one network failure.
 *
 * Always writes 0 to /sys/class/gpio/gpio113/value. When a counter is
 * supplied it is incremented and logged, and once it reaches 5 the
 * "reboot" command is run.
 *
 * @param failure_times  Failure counter to bump; may be NULL, in which case
 *                       only the GPIO write happens.
 */
void mqtt_net_failure(int *failure_times){
    system("echo 0 > /sys/class/gpio/gpio113/value");

    if (failure_times == NULL) {
        return;
    }

    *failure_times += 1;
    LOG_I("mqtt net failure_times = %d\n", *failure_times);

    if (*failure_times < 5) {
        return;
    }
    system("reboot");
}
/**
 * Log every name/value pair of a Paho version-info array.
 *
 * The array is walked until an entry whose name is NULL.
 *
 * @param info  First entry of the array; NULL is treated as an empty list.
 * @return 1 if at least one pair was logged, 0 otherwise.
 */
int printVersionInfo(MQTTAsync_nameValue * info){
    int rc = 0;

    if (info == NULL) {
        return 0;
    }

    for (; info->name != NULL; info++) {
        LOG_I("%s: %s\n", info->name, info->value);
        rc = 1; /* at least one value printed */
    }

    /* Unconditional return: the earlier form only returned when rc == 1
     * and otherwise fell off the end of a non-void function. */
    return rc;
}
/**
 * Trace callback registered with MQTTAsync_setTraceCallback().
 *
 * Scans trace_level_name_list for entries matching the level; the log
 * statement for a match is currently disabled, so nothing is emitted.
 *
 * @param level    Trace level of the message.
 * @param message  Trace text (unused while logging is disabled).
 */
void mqtt_utils_trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char *message){
    int count = 0;
    int idx = 0;

    count = MQTT_ARRY_ARRY_SIZE(trace_level_name_list);
    while (idx < count) {
        if (level == trace_level_name_list[idx].level) {
            //LOG_I("%s, %s\n", trace_level_name_list[idx].name, message);
        }
        idx++;
    }
}
/**
 * Connection-lost callback: counts the event as a network failure via
 * mqtt_net_failure() on the shared connect_failure_times counter.
 *
 * @param context  Unused.
 * @param cause    Unused.
 */
void mqtt_utils_connection_lost(void *context, char *cause){
    (void)context;
    (void)cause;

    mqtt_net_failure(&connect_failure_times);
}
/**
 * Message-arrived callback.
 *
 * When the topic equals nativeInvokTopicName, the payload is forwarded to
 * PutDataIntoMQueue(). The message and topic are then released with
 * MQTTAsync_freeMessage() / MQTTAsync_free().
 *
 * The payload is described by message->payloadlen and is not guaranteed to
 * carry a terminating NUL, so a NUL-terminated copy is forwarded instead of
 * the raw buffer.
 * NOTE(review): this assumes PutDataIntoMQueue() consumes its argument
 * before returning. The previous code already depended on that, since it
 * freed the message immediately after the call. Confirm against its
 * definition.
 *
 * @param context   Unused.
 * @param topicName Topic of the message; released here.
 * @param topicLen  Unused.
 * @param message   Received message; released here.
 * @return Always 1.
 */
int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message){
    (void)context;
    (void)topicLen;

    if (topicName != NULL && message != NULL
        && strcmp(topicName, nativeInvokTopicName) == 0) {
        size_t len = 0;
        void *copy = NULL;

        if (message->payload != NULL && message->payloadlen > 0) {
            len = (size_t)message->payloadlen;
        }

        copy = malloc(len + 1);
        if (copy == NULL) {
            LOG_I("%s: out of memory, message dropped\n", __func__);
        } else {
            if (len > 0) {
                memcpy(copy, message->payload, len);
            }
            ((char *)copy)[len] = '\0';
            PutDataIntoMQueue(copy);
            free(copy);
        }
    }

    if (message != NULL) {
        MQTTAsync_freeMessage(&message);
    }
    if (topicName != NULL) {
        MQTTAsync_free(topicName);
    }
    return 1;
}
/**
 * Delivery-complete callback; intentionally does nothing.
 *
 * @param context  Unused.
 * @param token    Unused.
 */
void mqtt_utils_delivery_complete(void *context, MQTTAsync_token token){
    (void)context;
    (void)token;
}
/**
 * Connected callback.
 *
 * Logs the event, clears connect_failure_times, subscribes to
 * nativeInvokTopicName at QoS 2, writes 1 to
 * /sys/class/gpio/gpio113/value and calls station_status_report().
 *
 * @param context  Unused.
 * @param cause    Optional text included in the log line; may be NULL.
 */
void mqtt_utils_connected(void *context, char *cause){
    (void)context;

    if (cause == NULL) {
        LOG_I("%s\n",__func__);
    } else {
        LOG_I("%s cause:%s\n",__func__, cause);
    }

    connect_failure_times = 0;
    mqtt_utils_subscribe(mqtt_conf, nativeInvokTopicName, 2);
    system("echo 1 > /sys/class/gpio/gpio113/value"); /* yellow ok */
    station_status_report();
}
/**
 * Disconnected callback; intentionally does nothing.
 *
 * @param context  Unused.
 * @param props    Unused.
 * @param rc       Unused.
 */
void mqtt_utils_disconnected(void *context, MQTTProperties* props, enum MQTTReasonCodes rc){
    (void)context;
    (void)props;
    (void)rc;
}
/**
 * Success callback of the connect request; intentionally does nothing.
 *
 * @param context   Unused.
 * @param response  Unused.
 */
void mqtt_utils_on_connect_success(void *context, MQTTAsync_successData *response){
    (void)context;
    (void)response;
}
/**
 * Failure callback of the connect request: counts the event as a network
 * failure via mqtt_net_failure() on the shared connect_failure_times counter.
 *
 * @param context   Unused.
 * @param response  Unused.
 */
void mqtt_utils_on_connect_failure(void *context, MQTTAsync_failureData *response){
    (void)context;
    (void)response;

    mqtt_net_failure(&connect_failure_times);
}
/**
 * SSL error callback installed through MQTTAsync_SSLOptions.ssl_error_cb.
 *
 * Logs one error line, bounded by the supplied length.
 *
 * @param str      Error text.
 * @param len      Number of bytes of str to log.
 * @param context  Unused.
 * @return 1. OpenSSL's ERR_print_errors_cb() stops walking the error queue
 *         when the callback returns a value <= 0, so a positive value is
 *         returned to let every queued error be reported.
 */
static int mqtt_utils_on_ssl_error(const char *str, size_t len, void *context){
    (void)context;

    if (str != NULL) {
        LOG_I("%s ssl error: %.*s\n", __func__, (int)len, str);
    }
    return 1;
}
/**
 * Success callback of a publish request; intentionally does nothing.
 *
 * @param context   Unused.
 * @param response  Unused.
 */
void mqtt_utils_on_publish_success(void *context, MQTTAsync_successData *response){
    (void)context;
    (void)response;
}
/**
 * Failure callback of a publish request: logs the failure reason.
 *
 * @param context   Unused.
 * @param response  Failure details; may be NULL, in which case "none" is
 *                  logged instead of an error string.
 */
void mqtt_utils_on_publish_failure(void *context, MQTTAsync_failureData *response){
    (void)context;

    LOG_I("%s rc %s\n",__func__, response ? MQTTAsync_strerror(response->code) : "none");
}
/**
 * Success callback of a subscribe request; intentionally does nothing.
 *
 * @param context   Unused.
 * @param response  Unused.
 */
void mqtt_utils_on_subscribe_success(void *context, MQTTAsync_successData *response){
    (void)context;
    (void)response;
}
/**
 * Failure callback of a subscribe request: logs the failure reason.
 *
 * @param context   Unused.
 * @param response  Failure details; may be NULL, in which case "none" is
 *                  logged instead of an error string.
 */
void mqtt_utils_on_subscribe_failure(void *context, MQTTAsync_failureData *response){
    (void)context;

    LOG_I("%s rc %s\n",__func__, response ? MQTTAsync_strerror(response->code) : "none");
}
/**
 * Success callback of the disconnect request: logs the function name.
 *
 * @param context   Unused.
 * @param response  Unused.
 */
void mqtt_utils_on_disconnect_success(void *context, MQTTAsync_successData *response){
    (void)context;
    (void)response;

    LOG_I("%s\n",__func__);
}
/**
 * Start a subscription using the client and sub_opts stored in mqtt_utils.
 *
 * @param mqtt_utils  Holder of the client handle and subscribe options.
 * @param topic       Topic filter to subscribe to.
 * @param qos         Requested QoS.
 * @return The MQTTAsync_subscribe() return code; a value other than
 *         MQTTASYNC_SUCCESS is also logged.
 */
int mqtt_utils_subscribe(mqtt_utils_t *mqtt_utils, char *topic, int qos){
    const int status = MQTTAsync_subscribe(mqtt_utils->client, topic, qos, &mqtt_utils->sub_opts);

    if (status == MQTTASYNC_SUCCESS) {
        return status;
    }

    LOG_I("%s return code %s\n",__func__ ,MQTTAsync_strerror(status));
    return status;
}
/**
 * Start publishing a message using the client, retain flag and pub_opts
 * stored in mqtt_utils.
 *
 * @param mqtt_utils  Holder of the client handle, retain flag and publish options.
 * @param topic       Topic to publish on.
 * @param qos         QoS of the message.
 * @param data        Payload bytes.
 * @param datalen     Payload length in bytes.
 * @return The MQTTAsync_send() return code; a value other than
 *         MQTTASYNC_SUCCESS is also logged.
 */
int mqtt_utils_publish(mqtt_utils_t *mqtt_utils, char *topic, int qos, const char *data, int datalen){
    const int status = MQTTAsync_send(mqtt_utils->client, topic, datalen, data, qos,
                                      mqtt_utils->retain, &mqtt_utils->pub_opts);

    if (status == MQTTASYNC_SUCCESS) {
        return status;
    }

    LOG_I("mqtt: failed to start publish, return code %s\n", MQTTAsync_strerror(status));
    return status;
}
/**
 * Create the MQTT client described by mqtt_config and start connecting.
 *
 * Steps, in order:
 *  - remember mqtt_config in the file-scope mqtt_conf pointer;
 *  - log library version info and install the trace callback/level;
 *  - build nativeInvokTopicName from the configured username and store the
 *    publish/subscribe response options in the configuration;
 *  - create the client for "ssl://<host>:<port>" and register callbacks;
 *  - fill in connect, will and SSL options and call MQTTAsync_connect().
 *
 * The topic name and response options are prepared BEFORE
 * MQTTAsync_connect() because mqtt_utils_connected() subscribes with them
 * and may run as soon as the connection completes.
 *
 * @param mqtt_config  Configuration and client holder; must not be NULL.
 * @return 0 when the connect request was started;
 *         -1 on NULL input, an over-long topic name, client creation
 *            failure or connect start failure;
 *         -2 when registering a callback failed.
 *         On failure after creation the client handle is destroyed.
 */
int mqtt_utils_init(mqtt_utils_t *mqtt_config)
{
    MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
    MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
    MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer; /* used by MQTTAsync_subscribe */
    MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer; /* used by MQTTAsync_send */
    char url[256] = "";
    int client_created = 0;
    int topic_len = 0;
    int ret = -1;
    int rc = 0;

    if (mqtt_config == NULL) {
        LOG_I("%s: NULL configuration\n", __func__);
        return -1;
    }
    mqtt_conf = mqtt_config;

    printVersionInfo(MQTTAsync_getVersionInfo());
    MQTTAsync_setTraceCallback(mqtt_utils_trace_callback);
    MQTTAsync_setTraceLevel(mqtt_conf->tracelevel);

    /* State read by the callbacks must be complete before connecting. */
    topic_len = snprintf(nativeInvokTopicName, sizeof(nativeInvokTopicName),
                         "$iot/v1/device/%s/functions/call", mqtt_conf->username);
    if (topic_len < 0 || (size_t)topic_len >= sizeof(nativeInvokTopicName)) {
        LOG_I("mqtt: invoke topic name does not fit its buffer\n");
        ret = -1;
        goto error;
    }
    LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);

    mqtt_conf->sub_opts = sub_opts;
    mqtt_conf->pub_opts = pub_opts;
    mqtt_conf->pub_opts.onSuccess = mqtt_utils_on_publish_success;
    mqtt_conf->pub_opts.onFailure = mqtt_utils_on_publish_failure;
    mqtt_conf->pub_opts.context = NULL;
    mqtt_conf->sub_opts.onSuccess = mqtt_utils_on_subscribe_success;
    mqtt_conf->sub_opts.onFailure = mqtt_utils_on_subscribe_failure;
    mqtt_conf->sub_opts.context = NULL;

    /* The scheme is fixed to ssl://; the configured protocol is not used. */
    snprintf(url, sizeof(url), "ssl://%s:%d", mqtt_conf->host, mqtt_conf->port);
    LOG_I("mqtt url=%s\n", url);

    create_opts.sendWhileDisconnected = 0;
    rc = MQTTAsync_createWithOptions(&mqtt_conf->client,
                                     url,
                                     mqtt_conf->clientid,
                                     MQTTCLIENT_PERSISTENCE_NONE,
                                     NULL,
                                     &create_opts);
    if (rc != MQTTASYNC_SUCCESS){
        LOG_I("mqtt: failed to create client, return code: %s\n", MQTTAsync_strerror(rc));
        ret = -1;
        goto error;
    }
    client_created = 1;

    rc = MQTTAsync_setCallbacks(mqtt_conf->client,
                                NULL,
                                mqtt_utils_connection_lost,
                                mqtt_utils_message_arrived,
                                mqtt_utils_delivery_complete);
    if (rc != MQTTASYNC_SUCCESS){
        LOG_I("mqtt: failed to set callbacks, return code: %s\n", MQTTAsync_strerror(rc));
        ret = -2;
        goto error;
    }

    rc = MQTTAsync_setConnected(mqtt_conf->client, NULL, mqtt_utils_connected);
    if (rc != MQTTASYNC_SUCCESS){
        LOG_I("mqtt: failed to set callback connect, return code: %s\n", MQTTAsync_strerror(rc));
        ret = -2;
        goto error;
    }

    rc = MQTTAsync_setDisconnected(mqtt_conf->client, NULL, mqtt_utils_disconnected);
    if (rc != MQTTASYNC_SUCCESS){
        LOG_I("mqtt: failed to set callbacks disconnectcd, return code: %s\n", MQTTAsync_strerror(rc));
        ret = -2;
        goto error;
    }

    /* Connect options. */
    conn_opts.onSuccess = mqtt_utils_on_connect_success;
    conn_opts.onFailure = mqtt_utils_on_connect_failure;
    conn_opts.context = NULL;
    /* A clean session is requested and automatic reconnect is enabled;
     * mqtt_utils_connected() subscribes again on every (re)connect. */
    conn_opts.cleansession = 1;
    conn_opts.automaticReconnect = 1;
    conn_opts.keepAliveInterval = mqtt_conf->keepalive;
    conn_opts.username = mqtt_conf->username;
    conn_opts.password = mqtt_conf->password;
    conn_opts.MQTTVersion = mqtt_conf->MQTTVersion;

    /* Last-will options, only when a will topic is configured. */
    if (mqtt_conf->will_topic) {
        will_opts.message = mqtt_conf->will_payload;
        will_opts.topicName = mqtt_conf->will_topic;
        will_opts.qos = mqtt_conf->will_qos;
        will_opts.retained = mqtt_conf->will_retain;
        conn_opts.will = &will_opts;
    }

    /* TLS options: client certificate, CA file and private key come from
     * the configuration; the key password, CA path and cipher list are
     * left at their defaults.
     * NOTE(review): SSL options are attached only when `insecure` is set,
     * although the URL always uses ssl:// and server certificate
     * verification is enabled here - confirm the intended meaning of the
     * flag and what should happen when it is clear. */
    if (mqtt_conf->insecure){
        LOG_I("use key file\n");
        ssl_opts.enableServerCertAuth = 1;
        ssl_opts.keyStore = mqtt_conf->cert;
        ssl_opts.trustStore = mqtt_conf->cafile;
        ssl_opts.privateKey = mqtt_conf->key;
        ssl_opts.ssl_error_cb = mqtt_utils_on_ssl_error;
        ssl_opts.ssl_error_context = NULL;
        conn_opts.ssl = &ssl_opts;
    }

    rc = MQTTAsync_connect(mqtt_conf->client, &conn_opts);
    if (rc != MQTTASYNC_SUCCESS){
        LOG_I("mqtt: failed to start connect, return code %s\n", MQTTAsync_strerror(rc));
        ret = -1;
        goto error;
    }

    ret = 0;

error:
    /* Do not leave a half-initialised client behind on failure. */
    if (ret != 0 && client_created) {
        MQTTAsync_destroy(&mqtt_conf->client);
    }
    return ret;
}
/**
 * Disconnect and destroy the MQTT client held by mqtt_config, then destroy
 * its sem_connect semaphore.
 *
 * If the client is already disconnected (MQTTAsync_disconnect() reports
 * MQTTASYNC_DISCONNECTED) there is nothing to disconnect, so teardown still
 * proceeds; previously that case returned early and left the client and
 * semaphore allocated. Any other disconnect failure keeps the original
 * behaviour: -1 is returned and nothing is released.
 *
 * @param mqtt_config  Holder of the client handle and semaphore; must not be NULL.
 * @return 0 when the resources were released, -1 otherwise.
 */
int mqtt_utils_uninit(mqtt_utils_t *mqtt_config){
    MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
    int rc = 0;

    LOG_I("%s\n",__func__);

    if (mqtt_config == NULL) {
        return -1;
    }

    disc_opts.onSuccess = mqtt_utils_on_disconnect_success;
    rc = MQTTAsync_disconnect(mqtt_config->client, &disc_opts);
    if (rc != MQTTASYNC_SUCCESS && rc != MQTTASYNC_DISCONNECTED){
        LOG_I("mqtt: failed to start disconnect, return code: %s\n", MQTTAsync_strerror(rc));
        return -1;
    }

    MQTTAsync_destroy(&mqtt_config->client);
    sem_destroy(&mqtt_config->sem_connect);
    return 0;
}