AP05/mqtt_utils/mqtt_utils.c

464 lines
16 KiB
C
Raw Normal View History

2025-04-06 06:41:47 +00:00
#define _GNU_SOURCE
#include "MQTTAsync.h"
#include "MQTTClientPersistence.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#if defined(WIN32)
#include <windows.h>
#define sleep Sleep
#else
#include <unistd.h>
#include <sys/time.h>
#include <unistd.h>
#endif
#if defined(_WRS_KERNEL)
#include <OsWrapper.h>
#endif
#include <sys/time.h>
#include <time.h>
#include <semaphore.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include "mqtt_utils.h"
#include "json_utils.h"
#include "main.h"
#define PRINT_TIME_TAG
#define DBG_TAG "mqtt_utils"
#define DBG_LVL DBG_INFO
#include "debug_print.h"
#define MQTT_ARRY_ARRY_SIZE(x) (sizeof(x)/sizeof(x[0]))
mqtt_utils_t *mqtt_conf;
mqtt_parm_download_t mqtt_parm_download={0};
int connect_failure_times = 0;
char nativeInvokTopicName[1024] = "";
char nativeUpgradeTopicName[1024] = "";
extern char softwareVersion[16];
typedef struct{
enum MQTTASYNC_TRACE_LEVELS level;
char name[64];
}trace_level_name_t;
trace_level_name_t trace_level_name_list[] = {
{MQTTASYNC_TRACE_MAXIMUM, "MQTTASYNC_TRACE_MAXIMUM"},
{MQTTASYNC_TRACE_MEDIUM, "MQTTASYNC_TRACE_MEDIUM"},
{MQTTASYNC_TRACE_MINIMUM, "MQTTASYNC_TRACE_MINIMUM"},
{MQTTASYNC_TRACE_PROTOCOL, "MQTTASYNC_TRACE_PROTOCOL"},
{MQTTASYNC_TRACE_ERROR, "MQTTASYNC_TRACE_ERROR"},
{MQTTASYNC_TRACE_SEVERE, "MQTTASYNC_TRACE_SEVERE"},
{MQTTASYNC_TRACE_FATAL, "MQTTASYNC_TRACE_FATAL"},
};
void mqtt_server_light_status_report(char *sn,char *msg_id,char *scene,json_object *lights){
char topic[128] = "";
const char *payload = NULL;
char time_buffer[128] = "";
json_object *root = NULL;
json_object *data = NULL;
json_object *msg = NULL;
struct timeval tv;
gettimeofday(&tv, NULL);
snprintf(time_buffer,sizeof(time_buffer),"%ld",tv.tv_sec*1000+tv.tv_usec);
root = json_object_new_object();
if(root == NULL){
LOG_I("json_object_new_object error\n");
goto json_error;
}
msg = json_object_new_object();
if(msg == NULL){
LOG_I("msg: NULL\n");
goto json_error;
}
data = json_object_new_object();
if(msg == NULL){
LOG_I("msg: NULL\n");
goto json_error;
}
json_object_object_add(root, "deviceId", json_object_new_string(sn));
json_object_object_add(root, "messageId", json_object_new_string(msg_id));
json_object_object_add(root, "productId", json_object_new_string(mqtt_conf->productcode));
json_object_object_add(root, "timestamp", json_object_new_string(time_buffer));
json_object_object_add(data, "scene", json_object_new_string(scene));
json_object_object_add(data, "lights", lights);
json_object_object_add(msg, "data", data);
json_object_object_add(msg, "msgType", json_object_new_string("9007"));
json_object_object_add(root, "msg", msg);
snprintf(topic, sizeof(topic), "iot/%s/message/report",mqtt_conf->productcode);
LOG_I("publish topic:[%s]\n", topic);
payload = json_object_to_json_string(root);
LOG_I("payload[%d][%s]\n", strlen(payload), payload);
mqtt_utils_publish(mqtt_conf, topic, 2, payload, strlen(payload));
json_error:
json_object_put(root);
}
void mqtt_server_station_status_report(char *msg_id,char *product_id,char *sn,char *local_ip, char *base_version,
char *status, char *duration){
char topic[128] = "";
const char *payload = NULL;
char time_buffer[128] = "";
json_object *root = NULL;
json_object *msg = NULL;
json_object *data = NULL;
struct timeval tv;
gettimeofday(&tv, NULL);
snprintf(time_buffer,sizeof(time_buffer),"%ld",tv.tv_sec*1000+tv.tv_usec);
//LOG_I("%s\n", __func__);
root = json_object_new_object();
if(root == NULL){
LOG_I("json_object_new_object error\n");
goto json_error;
}
msg = json_object_new_object();
if(msg == NULL){
LOG_I("msg: NULL\n");
goto json_error;
}
data = json_object_new_object();
if(msg == NULL){
LOG_I("msg: NULL\n");
goto json_error;
}
json_object_object_add(root, "deviceId", json_object_new_string(sn));
json_object_object_add(root, "messageId", json_object_new_string(msg_id));
json_object_object_add(root, "productId", json_object_new_string(mqtt_conf->productcode));
json_object_object_add(root, "timestamp", json_object_new_string(time_buffer));
json_object_object_add(data, "ip", json_object_new_string(local_ip));
json_object_object_add(data, "baseVersion", json_object_new_string(base_version));
json_object_object_add(data, "status", json_object_new_string(status));
json_object_object_add(data, "duration", json_object_new_string(duration));
json_object_object_add(msg, "data", data);
json_object_object_add(msg, "msgType", json_object_new_string("9008"));
json_object_object_add(root, "msg", msg);
snprintf(topic, sizeof(topic), "iot/%s/message/report",mqtt_conf->productcode);
LOG_I("publish topic:[%s]\n", topic);
payload = json_object_to_json_string(root);
LOG_I("payload[%d][%s]\n", strlen(payload), payload);
mqtt_utils_publish(mqtt_conf, topic, 2, payload, strlen(payload));
json_error:
json_object_put(root);
}
void mqtt_service_reply(char *sn,char *msg_id,char *msg,int success,char *product_id)
{
char topic[128] = "";
const char *payload = NULL;
json_object *root = NULL;
//LOG_I("%s\n", __func__);
root = json_object_new_object();
if(root == NULL){
LOG_I("json_object_new_object error\n");
goto json_error;
}
json_object_object_add(root, "deviceId", json_object_new_string(sn));
json_object_object_add(root, "messageId", json_object_new_string(msg_id));
json_object_object_add(root, "msg", json_object_new_string(msg));
json_object_object_add(root, "success", json_object_new_boolean(success));
json_object_object_add(root, "productId", json_object_new_string(mqtt_conf->productcode));
snprintf(topic, sizeof(topic), "iot/%s/adviceDevice/reply",mqtt_conf->productcode);
LOG_I("publish topic:[%s]\n", topic);
payload = json_object_to_json_string(root);
LOG_I("payload[%d][%s]\n", strlen(payload), payload);
mqtt_utils_publish(mqtt_conf, topic, 2, payload, strlen(payload));
json_error:
json_object_put(root);
}
void mqtt_net_failure(int *failure_times){
system("echo 0 > /sys/class/gpio/gpio113/value");
if(failure_times != NULL){
(*failure_times)++;
LOG_I("mqtt net failure_times = %d\n", *failure_times);
if(*failure_times >= 5){
system("reboot");
}
}
}
int printVersionInfo(MQTTAsync_nameValue * info){
int rc = 0;
//LOG_I("MQTT library information:\n");
while (info->name){
//LOG_I("%s: %s\n", info->name, info->value);
info++;
rc = 1; /* at least one value printed */
}
if (rc == 1)
//LOG_I("\n");
return rc;
}
void mqtt_utils_trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char *message){
int i;
int size = 0;
size = MQTT_ARRY_ARRY_SIZE(trace_level_name_list);
for (i=0;i<size;i++) {
if (trace_level_name_list[i].level == level) {
//LOG_I("%s, %s\n", trace_level_name_list[i].name, message);
}
}
}
void mqtt_utils_connection_lost(void *context, char *cause){
//if (cause != NULL) {
// LOG_I("%s cause:%s\n",__func__, cause);
// } else {
// LOG_I("%s\n",__func__);
// }
//mqtt_led_net_status_judge();
mqtt_net_failure(&connect_failure_times);
}
int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message){
//LOG_I("%s\n",__func__);
if(strcmp(topicName,nativeInvokTopicName)==0){
PutDataIntoMQueue(message->payload);
}
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
void mqtt_utils_delivery_complete(void *context, MQTTAsync_token token){
//LOG_I("%s\n",__func__);
}
void mqtt_utils_connected(void *context, char *cause){
if (cause != NULL) {
LOG_I("%s cause:%s\n",__func__, cause);
} else {
LOG_I("%s\n",__func__);
}
connect_failure_times=0;
mqtt_utils_subscribe(mqtt_conf,nativeInvokTopicName,2);
system("echo 1 > /sys/class/gpio/gpio113/value");//yellow ok
char myid[32]={0};
myrand(myid,19);
updateStationInfo(myid);
}
void mqtt_utils_disconnected(void *context, MQTTProperties* props, enum MQTTReasonCodes rc){
//LOG_I("%s reason code %s",__func__ ,MQTTReasonCode_toString(rc));
//mqtt_led_net_status_judge();
}
void mqtt_utils_on_connect_success(void *context, MQTTAsync_successData *response){
//LOG_I("%s\n",__func__);
}
void mqtt_utils_on_connect_failure(void *context, MQTTAsync_failureData *response){
//LOG_I("%s connect failed, rc %s\n", __func__, response ? MQTTAsync_strerror(response->code) : "none");
// mqtt_led_net_status_judge();
mqtt_net_failure(&connect_failure_times);
}
static int mqtt_utils_on_ssl_error(const char *str, size_t len, void *context){
MQTTAsync client = (MQTTAsync)context;
LOG_I("%s ssl error: %s\n",__func__, str);
return 0;
}
void mqtt_utils_on_publish_success(void *context, MQTTAsync_successData *response){
//LOG_I("%s\n",__func__);
}
void mqtt_utils_on_publish_failure(void *context, MQTTAsync_failureData *response){
LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
}
void mqtt_utils_on_subscribe_success(void *context, MQTTAsync_successData *response){
//LOG_I("%s\n",__func__);
//sem_post(&rtu_server_ptr->mqtt_utils.sem_subscribe);
}
void mqtt_utils_on_subscribe_failure(void *context, MQTTAsync_failureData *response){
LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
}
void mqtt_utils_on_disconnect_success(void *context, MQTTAsync_successData *response){
LOG_I("%s\n",__func__);
//mqtt_led_net_status_judge();
}
int mqtt_utils_subscribe(mqtt_utils_t *mqtt_utils, char *topic, int qos){
int rc = 0;
rc = MQTTAsync_subscribe(mqtt_utils->client, topic, qos, &mqtt_utils->sub_opts);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("%s return code %s\n",__func__ ,MQTTAsync_strerror(rc));
}
return rc;
}
int mqtt_utils_publish(mqtt_utils_t *mqtt_utils, char *topic, int qos, const char *data, int datalen){
int rc;
//LOG_I("%s data of length %d\n",__func__, datalen);
rc = MQTTAsync_send(mqtt_utils->client, topic, datalen, data, qos, mqtt_utils->retain, &mqtt_utils->pub_opts);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("mqtt: failed to start publish, return code %s\n", MQTTAsync_strerror(rc));
}
return rc;
}
int mqtt_utils_init(mqtt_utils_t *mqtt_config)
{
mqtt_conf=mqtt_config;
MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer; //used by MQTTAsync_subscribe
MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer; //used by MQTTAsync_send
MQTTAsync_nameValue *infos = NULL;
char url[256] = "";
int ret = -1;
int rc = 0;
infos = MQTTAsync_getVersionInfo();
printVersionInfo(infos);
MQTTAsync_setTraceCallback(mqtt_utils_trace_callback);
MQTTAsync_setTraceLevel(mqtt_conf->tracelevel);
//snprintf(url, sizeof(url), "%s://%s:%d", mqtt_conf->protocol, mqtt_conf->host, mqtt_conf->port);
snprintf(url, sizeof(url), "tcp://%s:%d", mqtt_conf->host, mqtt_conf->port);
LOG_I("mqtt url=%s\n", url);
create_opts.sendWhileDisconnected = 0;
rc = MQTTAsync_createWithOptions(&mqtt_conf->client,
url,
mqtt_conf->clientid,
MQTTCLIENT_PERSISTENCE_NONE,
NULL,
&create_opts);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("mqtt: failed to create client, return code: %s\n", MQTTAsync_strerror(rc));
ret = -1;
goto error;
}
rc = MQTTAsync_setCallbacks(mqtt_conf->client,
NULL,
mqtt_utils_connection_lost,
mqtt_utils_message_arrived,
mqtt_utils_delivery_complete);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("mqtt: failed to set callbacks, return code: %s\n", MQTTAsync_strerror(rc));
ret = -2;
goto error;
}
rc = MQTTAsync_setConnected(mqtt_conf->client, NULL, mqtt_utils_connected);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("mqtt: failed to set callback connect, return code: %s\n", MQTTAsync_strerror(rc));
ret = -2;
goto error;
}
rc = MQTTAsync_setDisconnected(mqtt_conf->client, NULL, mqtt_utils_disconnected);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("mqtt: failed to set callbacks disconnectcd, return code: %s\n", MQTTAsync_strerror(rc));
ret = -2;
goto error;
}
/* connect option */
conn_opts.onSuccess = mqtt_utils_on_connect_success;
conn_opts.onFailure = mqtt_utils_on_connect_failure;
conn_opts.context = NULL;
/*
clean session的值是什么线QoS=0,1,2
clean session的值为1线线线QoS=0,1,2
clean session的值为0线线线QoS=0,1,2
*/
conn_opts.cleansession = 1;
conn_opts.automaticReconnect = 1;
conn_opts.keepAliveInterval = mqtt_conf->keepalive;
conn_opts.username = mqtt_conf->username;
conn_opts.password = mqtt_conf->password;
conn_opts.MQTTVersion = mqtt_conf->MQTTVersion;
if (mqtt_conf->will_topic) /* will options */{
will_opts.message = mqtt_conf->will_payload;
will_opts.topicName = mqtt_conf->will_topic;
will_opts.qos = mqtt_conf->will_qos;
//LOG_I("qos:%d\n",will_opts.qos);
will_opts.retained = mqtt_conf->will_retain;
conn_opts.will = &will_opts;
}
if (mqtt_conf->insecure){
ssl_opts.verify = 0;
ssl_opts.CApath = mqtt_conf->capath;
ssl_opts.keyStore = mqtt_conf->cert;
ssl_opts.trustStore = mqtt_conf->cafile;
ssl_opts.privateKey = mqtt_conf->key;
ssl_opts.privateKeyPassword = mqtt_conf->keypass;
ssl_opts.enabledCipherSuites = mqtt_conf->ciphers;
ssl_opts.ssl_error_cb = mqtt_utils_on_ssl_error;
ssl_opts.ssl_error_context = NULL;
conn_opts.ssl = &ssl_opts;
}
if ((rc = MQTTAsync_connect(mqtt_conf->client, &conn_opts)) != MQTTASYNC_SUCCESS){
LOG_I("mqtt: failed to start connect, return code %s\n", MQTTAsync_strerror(rc));
ret = -1;
goto error;
}
mqtt_conf->sub_opts = sub_opts;
mqtt_conf->pub_opts = pub_opts;
mqtt_conf->pub_opts.onSuccess = mqtt_utils_on_publish_success;
mqtt_conf->pub_opts.onFailure = mqtt_utils_on_publish_failure;
mqtt_conf->pub_opts.context = NULL;
mqtt_conf->sub_opts.onSuccess = mqtt_utils_on_subscribe_success;
mqtt_conf->sub_opts.onFailure = mqtt_utils_on_subscribe_failure;
mqtt_conf->sub_opts.context = NULL;
//sprintf(nativeInvokTopicName,"iot/10025/%s/message/adviceDevice",mqtt_conf->clientid);
sprintf(nativeInvokTopicName,"iot/%s/%s/message/adviceDevice",mqtt_conf->productcode,mqtt_conf->username);
LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);
ret = 0;
error:
return ret;
}
int mqtt_utils_uninit(mqtt_utils_t *mqtt_config){
LOG_I("%s\n",__func__);
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
int rc = 0;
int ret = -1;
disc_opts.onSuccess = mqtt_utils_on_disconnect_success;
if ((rc = MQTTAsync_disconnect(mqtt_config->client, &disc_opts)) != MQTTASYNC_SUCCESS){
LOG_I("mqtt: failed to start disconnect, return code: %s\n", MQTTAsync_strerror(rc));
ret = -1;
goto error;
}
MQTTAsync_destroy(&mqtt_config->client);
sem_destroy(&mqtt_config->sem_connect);
//sem_destroy(&mqtt_config->sem_subscribe);
ret = 0;
error:
return ret;
}