AP05/mqtt_utils/mqtt_utils.c

497 lines
17 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#define _GNU_SOURCE
#include "MQTTAsync.h"
#include "MQTTClientPersistence.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#if defined(WIN32)
#include <windows.h>
#define sleep Sleep
#else
#include <unistd.h>
#include <sys/time.h>
#include <unistd.h>
#endif
#if defined(_WRS_KERNEL)
#include <OsWrapper.h>
#endif
#include <pthread.h>
#include <sys/prctl.h>
#include <sys/time.h>
#include <time.h>
#include <semaphore.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include "mqtt_utils.h"
#include "json_utils.h"
#include "main.h"
#define PRINT_TIME_TAG
#define DBG_TAG "mqtt_utils"
#define DBG_LVL DBG_INFO
#include "debug_print.h"
#define MQTT_ARRY_ARRY_SIZE(x) (sizeof(x)/sizeof(x[0]))
mqtt_utils_t *mqtt_conf;
mqtt_parm_download_t mqtt_parm_download={0};
int connect_failure_times = 0;
char nativeInvokTopicName[1024] = "";
char nativeUpgradeTopicName[1024] = "";
extern char softwareVersion[16];
typedef struct{
enum MQTTASYNC_TRACE_LEVELS level;
char name[64];
}trace_level_name_t;
trace_level_name_t trace_level_name_list[] = {
{MQTTASYNC_TRACE_MAXIMUM, "MQTTASYNC_TRACE_MAXIMUM"},
{MQTTASYNC_TRACE_MEDIUM, "MQTTASYNC_TRACE_MEDIUM"},
{MQTTASYNC_TRACE_MINIMUM, "MQTTASYNC_TRACE_MINIMUM"},
{MQTTASYNC_TRACE_PROTOCOL, "MQTTASYNC_TRACE_PROTOCOL"},
{MQTTASYNC_TRACE_ERROR, "MQTTASYNC_TRACE_ERROR"},
{MQTTASYNC_TRACE_SEVERE, "MQTTASYNC_TRACE_SEVERE"},
{MQTTASYNC_TRACE_FATAL, "MQTTASYNC_TRACE_FATAL"},
};
void mqtt_server_light_status_report(char *sn,char *msg_id,char *scene,json_object *lights){
char topic[128] = "";
const char *payload = NULL;
char time_buffer[128] = "";
json_object *root = NULL;
json_object *data = NULL;
json_object *msg = NULL;
struct timeval tv;
gettimeofday(&tv, NULL);
snprintf(time_buffer,sizeof(time_buffer),"%ld",tv.tv_sec*1000+tv.tv_usec);
LOG_I("%s\n",__func__);
root = json_object_new_object();
if(root == NULL){
LOG_I("json_object_new_object error\n");
goto json_error;
}
msg = json_object_new_object();
if(msg == NULL){
LOG_I("msg: NULL\n");
goto json_error;
}
data = json_object_new_object();
if(msg == NULL){
LOG_I("msg: NULL\n");
goto json_error;
}
json_object_object_add(root, "deviceId", json_object_new_string(sn));
json_object_object_add(root, "messageId", json_object_new_string(msg_id));
json_object_object_add(root, "productId", json_object_new_string(mqtt_conf->productcode));
json_object_object_add(root, "timestamp", json_object_new_string(time_buffer));
json_object_object_add(data, "scene", json_object_new_string(scene));
json_object_object_add(data, "lights", lights);
json_object_object_add(msg, "data", data);
json_object_object_add(msg, "msgType", json_object_new_string("9007"));
json_object_object_add(root, "msg", msg);
snprintf(topic, sizeof(topic), "iot/%s/message/report",mqtt_conf->productcode);
//LOG_I("publish topic:[%s]\n", topic);
payload = json_object_to_json_string(root);
//LOG_I("payload[%d][%s]\n", strlen(payload), payload);
mqtt_utils_publish(mqtt_conf, topic, 2, payload, strlen(payload));
json_error:
json_object_put(root);
}
void mqtt_server_station_status_report(char *msg_id,char *product_id,char *sn,char *local_ip, char *base_version,
char *status, char *duration){
char topic[128] = "";
const char *payload = NULL;
char time_buffer[128] = "";
json_object *root = NULL;
json_object *msg = NULL;
json_object *data = NULL;
struct timeval tv;
gettimeofday(&tv, NULL);
snprintf(time_buffer,sizeof(time_buffer),"%ld",tv.tv_sec*1000+tv.tv_usec);
LOG_I("%s\n",__func__);
root = json_object_new_object();
if(root == NULL){
LOG_I("json_object_new_object error\n");
goto json_error;
}
msg = json_object_new_object();
if(msg == NULL){
LOG_I("msg: NULL\n");
goto json_error;
}
data = json_object_new_object();
if(msg == NULL){
LOG_I("msg: NULL\n");
goto json_error;
}
json_object_object_add(root, "deviceId", json_object_new_string(sn));
json_object_object_add(root, "messageId", json_object_new_string(msg_id));
json_object_object_add(root, "productId", json_object_new_string(mqtt_conf->productcode));
json_object_object_add(root, "timestamp", json_object_new_string(time_buffer));
json_object_object_add(data, "ip", json_object_new_string(local_ip));
json_object_object_add(data, "baseVersion", json_object_new_string(base_version));
json_object_object_add(data, "status", json_object_new_string(status));
json_object_object_add(data, "duration", json_object_new_string(duration));
json_object_object_add(msg, "data", data);
json_object_object_add(msg, "msgType", json_object_new_string("9008"));
json_object_object_add(root, "msg", msg);
snprintf(topic, sizeof(topic), "iot/%s/message/report",mqtt_conf->productcode);
//LOG_I("publish topic:[%s]\n", topic);
payload = json_object_to_json_string(root);
//LOG_I("payload[%d][%s]\n", strlen(payload), payload);
mqtt_utils_publish(mqtt_conf, topic, 2, payload, strlen(payload));
json_error:
json_object_put(root);
}
void mqtt_service_reply(char *sn,char *msg_id,char *msg,int success,char *product_id)
{
char topic[128] = "";
const char *payload = NULL;
json_object *root = NULL;
LOG_I("%s\n", __func__);
root = json_object_new_object();
if(root == NULL){
LOG_I("json_object_new_object error\n");
goto json_error;
}
json_object_object_add(root, "deviceId", json_object_new_string(sn));
json_object_object_add(root, "messageId", json_object_new_string(msg_id));
json_object_object_add(root, "msg", json_object_new_string(msg));
json_object_object_add(root, "success", json_object_new_boolean(success));
json_object_object_add(root, "productId", json_object_new_string(mqtt_conf->productcode));
snprintf(topic, sizeof(topic), "iot/%s/adviceDevice/reply",mqtt_conf->productcode);
//LOG_I("publish topic:[%s]\n", topic);
payload = json_object_to_json_string(root);
//LOG_I("payload[%d][%s]\n", strlen(payload), payload);
mqtt_utils_publish(mqtt_conf, topic, 2, payload, strlen(payload));
json_error:
json_object_put(root);
}
void mqtt_net_failure(int *failure_times){
int fd = open("/sys/class/gpio/gpio113/value", O_WRONLY);
if (fd < 0) {
LOG_I("Failed to open gpio113 value: %s\n", strerror(errno));
} else {
write(fd, "0", 1);
close(fd);
}
if(failure_times != NULL){
(*failure_times)++;
LOG_I("mqtt net failure_times = %d\n", *failure_times);
if(*failure_times >= 5){
system("reboot");
}
}
}
int printVersionInfo(MQTTAsync_nameValue * info){
int rc = 0;
//LOG_I("MQTT library information:\n");
while (info->name){
//LOG_I("%s: %s\n", info->name, info->value);
info++;
rc = 1; /* at least one value printed */
}
if (rc == 1)
//LOG_I("\n");
return rc;
}
void mqtt_utils_trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char *message){
int i;
int size = 0;
size = MQTT_ARRY_ARRY_SIZE(trace_level_name_list);
for (i=0;i<size;i++) {
if (trace_level_name_list[i].level == level) {
//LOG_I("%s, %s\n", trace_level_name_list[i].name, message);
}
}
}
void mqtt_utils_connection_lost(void *context, char *cause){
//if (cause != NULL) {
// LOG_I("%s cause:%s\n",__func__, cause);
// } else {
// LOG_I("%s\n",__func__);
// }
//mqtt_led_net_status_judge();
mqtt_net_failure(&connect_failure_times);
}
int mqtt_utils_message_arrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message){
//LOG_I("%s\n",__func__);
if(strcmp(topicName,nativeInvokTopicName)==0){
PutDataIntoMQueue(message->payload);
}
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
void mqtt_utils_delivery_complete(void *context, MQTTAsync_token token){
//LOG_I("%s\n",__func__);
}
static int led_running = 1;
static pthread_t led_thread;
void* led_blink_thread(void* arg) {
prctl(PR_SET_NAME, "led_blink");
int fd = open("/sys/class/gpio/gpio113/value", O_WRONLY);
if (fd < 0) {
LOG_I("Failed to open GPIO\n");
return NULL;
}
while(led_running) {
write(fd, "1", 1);
usleep(500000);
write(fd, "0", 1);
usleep(500000);
}
close(fd);
return NULL;
}
void mqtt_utils_connected(void *context, char *cause){
if (cause != NULL) {
LOG_I("%s cause:%s\n",__func__, cause);
} else {
LOG_I("%s\n",__func__);
}
connect_failure_times=0;
mqtt_utils_subscribe(mqtt_conf,nativeInvokTopicName,2);
//system("echo 1 > /sys/class/gpio/gpio113/value");//yellow ok
led_running = 1;
pthread_create(&led_thread, NULL, led_blink_thread, NULL);
char myid[32]={0};
myrand(myid,19);
updateStationInfo(myid);
}
void mqtt_utils_disconnected(void *context, MQTTProperties* props, enum MQTTReasonCodes rc){
//LOG_I("%s reason code %s",__func__ ,MQTTReasonCode_toString(rc));
//mqtt_led_net_status_judge();
}
void mqtt_utils_on_connect_success(void *context, MQTTAsync_successData *response){
//LOG_I("%s\n",__func__);
}
void mqtt_utils_on_connect_failure(void *context, MQTTAsync_failureData *response){
//LOG_I("%s connect failed, rc %s\n", __func__, response ? MQTTAsync_strerror(response->code) : "none");
// mqtt_led_net_status_judge();
mqtt_net_failure(&connect_failure_times);
}
static int mqtt_utils_on_ssl_error(const char *str, size_t len, void *context){
MQTTAsync client = (MQTTAsync)context;
LOG_I("%s ssl error: %s\n",__func__, str);
return 0;
}
void mqtt_utils_on_publish_success(void *context, MQTTAsync_successData *response){
//LOG_I("%s\n",__func__);
}
void mqtt_utils_on_publish_failure(void *context, MQTTAsync_failureData *response){
LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
}
void mqtt_utils_on_subscribe_success(void *context, MQTTAsync_successData *response){
//LOG_I("%s\n",__func__);
//sem_post(&rtu_server_ptr->mqtt_utils.sem_subscribe);
}
void mqtt_utils_on_subscribe_failure(void *context, MQTTAsync_failureData *response){
LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code));
}
void mqtt_utils_on_disconnect_success(void *context, MQTTAsync_successData *response){
LOG_I("%s\n",__func__);
//mqtt_led_net_status_judge();
}
int mqtt_utils_subscribe(mqtt_utils_t *mqtt_utils, char *topic, int qos){
int rc = 0;
rc = MQTTAsync_subscribe(mqtt_utils->client, topic, qos, &mqtt_utils->sub_opts);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("%s return code %s\n",__func__ ,MQTTAsync_strerror(rc));
}
return rc;
}
int mqtt_utils_publish(mqtt_utils_t *mqtt_utils, char *topic, int qos, const char *data, int datalen){
int rc;
//LOG_I("%s data of length %d\n",__func__, datalen);
rc = MQTTAsync_send(mqtt_utils->client, topic, datalen, data, qos, mqtt_utils->retain, &mqtt_utils->pub_opts);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("mqtt: failed to start publish, return code %s\n", MQTTAsync_strerror(rc));
}
return rc;
}
int mqtt_utils_init(mqtt_utils_t *mqtt_config)
{
mqtt_conf=mqtt_config;
MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer; //used by MQTTAsync_subscribe
MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer; //used by MQTTAsync_send
MQTTAsync_nameValue *infos = NULL;
char url[256] = "";
int ret = -1;
int rc = 0;
infos = MQTTAsync_getVersionInfo();
printVersionInfo(infos);
MQTTAsync_setTraceCallback(mqtt_utils_trace_callback);
MQTTAsync_setTraceLevel(mqtt_conf->tracelevel);
//snprintf(url, sizeof(url), "%s://%s:%d", mqtt_conf->protocol, mqtt_conf->host, mqtt_conf->port);
snprintf(url, sizeof(url), "tcp://%s:%d", mqtt_conf->host, mqtt_conf->port);
LOG_I("mqtt url=%s\n", url);
create_opts.sendWhileDisconnected = 0;
rc = MQTTAsync_createWithOptions(&mqtt_conf->client,
url,
mqtt_conf->clientid,
MQTTCLIENT_PERSISTENCE_NONE,
NULL,
&create_opts);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("mqtt: failed to create client, return code: %s\n", MQTTAsync_strerror(rc));
ret = -1;
goto error;
}
rc = MQTTAsync_setCallbacks(mqtt_conf->client,
NULL,
mqtt_utils_connection_lost,
mqtt_utils_message_arrived,
mqtt_utils_delivery_complete);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("mqtt: failed to set callbacks, return code: %s\n", MQTTAsync_strerror(rc));
ret = -2;
goto error;
}
rc = MQTTAsync_setConnected(mqtt_conf->client, NULL, mqtt_utils_connected);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("mqtt: failed to set callback connect, return code: %s\n", MQTTAsync_strerror(rc));
ret = -2;
goto error;
}
rc = MQTTAsync_setDisconnected(mqtt_conf->client, NULL, mqtt_utils_disconnected);
if (rc != MQTTASYNC_SUCCESS){
LOG_I("mqtt: failed to set callbacks disconnectcd, return code: %s\n", MQTTAsync_strerror(rc));
ret = -2;
goto error;
}
/* connect option */
conn_opts.onSuccess = mqtt_utils_on_connect_success;
conn_opts.onFailure = mqtt_utils_on_connect_failure;
conn_opts.context = NULL;
/*
不管clean session的值是什么当终端设备离线时QoS=0,1,2的消息一律接收不到。
当clean session的值为1当终端设备离线再上线时离线期间发来QoS=0,1,2的消息一律接收不到。
当clean session的值为0当终端设备离线再上线时离线期间发来QoS=0,1,2的消息仍然可以接收到。如果同个主题发了多条就接收多条一条不差照单全收。
*/
conn_opts.cleansession = 1;
conn_opts.automaticReconnect = 1;
conn_opts.keepAliveInterval = mqtt_conf->keepalive;
conn_opts.username = mqtt_conf->username;
conn_opts.password = mqtt_conf->password;
conn_opts.MQTTVersion = mqtt_conf->MQTTVersion;
if (mqtt_conf->will_topic) /* will options */{
will_opts.message = mqtt_conf->will_payload;
will_opts.topicName = mqtt_conf->will_topic;
will_opts.qos = mqtt_conf->will_qos;
//LOG_I("qos:%d\n",will_opts.qos);
will_opts.retained = mqtt_conf->will_retain;
conn_opts.will = &will_opts;
}
if (mqtt_conf->insecure){
ssl_opts.verify = 0;
ssl_opts.CApath = mqtt_conf->capath;
ssl_opts.keyStore = mqtt_conf->cert;
ssl_opts.trustStore = mqtt_conf->cafile;
ssl_opts.privateKey = mqtt_conf->key;
ssl_opts.privateKeyPassword = mqtt_conf->keypass;
ssl_opts.enabledCipherSuites = mqtt_conf->ciphers;
ssl_opts.ssl_error_cb = mqtt_utils_on_ssl_error;
ssl_opts.ssl_error_context = NULL;
conn_opts.ssl = &ssl_opts;
}
if ((rc = MQTTAsync_connect(mqtt_conf->client, &conn_opts)) != MQTTASYNC_SUCCESS){
LOG_I("mqtt: failed to start connect, return code %s\n", MQTTAsync_strerror(rc));
ret = -1;
goto error;
}
mqtt_conf->sub_opts = sub_opts;
mqtt_conf->pub_opts = pub_opts;
mqtt_conf->pub_opts.onSuccess = mqtt_utils_on_publish_success;
mqtt_conf->pub_opts.onFailure = mqtt_utils_on_publish_failure;
mqtt_conf->pub_opts.context = NULL;
mqtt_conf->sub_opts.onSuccess = mqtt_utils_on_subscribe_success;
mqtt_conf->sub_opts.onFailure = mqtt_utils_on_subscribe_failure;
mqtt_conf->sub_opts.context = NULL;
//sprintf(nativeInvokTopicName,"iot/10025/%s/message/adviceDevice",mqtt_conf->clientid);
sprintf(nativeInvokTopicName,"iot/%s/%s/message/adviceDevice",mqtt_conf->productcode,mqtt_conf->username);
LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName);
ret = 0;
error:
return ret;
}
int mqtt_utils_uninit(mqtt_utils_t *mqtt_config){
LOG_I("%s\n",__func__);
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
int rc = 0;
int ret = -1;
disc_opts.onSuccess = mqtt_utils_on_disconnect_success;
if ((rc = MQTTAsync_disconnect(mqtt_config->client, &disc_opts)) != MQTTASYNC_SUCCESS){
LOG_I("mqtt: failed to start disconnect, return code: %s\n", MQTTAsync_strerror(rc));
ret = -1;
goto error;
}
MQTTAsync_destroy(&mqtt_config->client);
sem_destroy(&mqtt_config->sem_connect);
//sem_destroy(&mqtt_config->sem_subscribe);
ret = 0;
error:
return ret;
}