#define _GNU_SOURCE #include "MQTTAsync.h" #include "MQTTClientPersistence.h" #include #include #include #if defined(WIN32) #include #define sleep Sleep #else #include #include #include #endif #if defined(_WRS_KERNEL) #include #endif #include #include #include #include #include #include #include #include #include "mqtt_utils.h" #include "json_utils.h" #include "main.h" #define PRINT_TIME_TAG #define DBG_TAG "mqtt_utils" #define DBG_LVL DBG_INFO #include "debug_print.h" #define MQTT_ARRY_ARRY_SIZE(x) (sizeof(x)/sizeof(x[0])) mqtt_utils_t *mqtt_conf; mqtt_parm_download_t mqtt_parm_download={0}; int connect_failure_times = 0; char nativeInvokTopicName[1024] = ""; char nativeUpgradeTopicName[1024] = ""; extern char softwareVersion[16]; typedef struct{ enum MQTTASYNC_TRACE_LEVELS level; char name[64]; }trace_level_name_t; trace_level_name_t trace_level_name_list[] = { {MQTTASYNC_TRACE_MAXIMUM, "MQTTASYNC_TRACE_MAXIMUM"}, {MQTTASYNC_TRACE_MEDIUM, "MQTTASYNC_TRACE_MEDIUM"}, {MQTTASYNC_TRACE_MINIMUM, "MQTTASYNC_TRACE_MINIMUM"}, {MQTTASYNC_TRACE_PROTOCOL, "MQTTASYNC_TRACE_PROTOCOL"}, {MQTTASYNC_TRACE_ERROR, "MQTTASYNC_TRACE_ERROR"}, {MQTTASYNC_TRACE_SEVERE, "MQTTASYNC_TRACE_SEVERE"}, {MQTTASYNC_TRACE_FATAL, "MQTTASYNC_TRACE_FATAL"}, }; void mqtt_server_light_status_report(char *sn,char *msg_id,char *scene,json_object *lights){ char topic[128] = ""; const char *payload = NULL; char time_buffer[128] = ""; json_object *root = NULL; json_object *data = NULL; json_object *msg = NULL; struct timeval tv; gettimeofday(&tv, NULL); snprintf(time_buffer,sizeof(time_buffer),"%ld",tv.tv_sec*1000+tv.tv_usec); //LOG_I("%s\n",__func__); // 打印去重后的sn if (json_object_is_type(lights, json_type_array)) { int arr_len = json_object_array_length(lights); for (int i = 0; i < arr_len; ++i) { json_object *item = json_object_array_get_idx(lights, i); if (item && json_object_is_type(item, json_type_object)) { json_object *sn_obj = NULL; if (json_object_object_get_ex(item, "sn", &sn_obj)) { const char *sn_str = json_object_get_string(sn_obj); if (sn_str && strlen(sn_str) == 8) { //uint32_t sn = (uint32_t)strtoul(sn_str, NULL, 16); DEBUG_TX("%s\n", sn_str); } } } } } root = json_object_new_object(); if(root == NULL){ LOG_I("json_object_new_object error\n"); goto json_error; } msg = json_object_new_object(); if(msg == NULL){ LOG_I("msg: NULL\n"); goto json_error; } data = json_object_new_object(); if(msg == NULL){ LOG_I("msg: NULL\n"); goto json_error; } json_object_object_add(root, "deviceId", json_object_new_string(sn)); json_object_object_add(root, "messageId", json_object_new_string(msg_id)); json_object_object_add(root, "productId", json_object_new_string(mqtt_conf->productcode)); json_object_object_add(root, "timestamp", json_object_new_string(time_buffer)); json_object_object_add(data, "scene", json_object_new_string(scene)); json_object_object_add(data, "lights", lights); json_object_object_add(msg, "data", data); json_object_object_add(msg, "msgType", json_object_new_string("9007")); json_object_object_add(root, "msg", msg); snprintf(topic, sizeof(topic), "iot/%s/message/report",mqtt_conf->productcode); //LOG_I("publish topic:[%s]\n", topic); payload = json_object_to_json_string(root); //LOG_I("payload[%d][%s]\n", strlen(payload), payload); mqtt_utils_publish(mqtt_conf, topic, 2, payload, strlen(payload)); json_error: json_object_put(root); } void mqtt_server_light_status_report_2(char *sn,char *msg_id,char *scene,json_object *lights){ char topic[128] = ""; const char *payload = NULL; char time_buffer[128] = ""; json_object *root = NULL; json_object *data = NULL; json_object *msg = NULL; struct timeval tv; gettimeofday(&tv, NULL); snprintf(time_buffer,sizeof(time_buffer),"%ld",tv.tv_sec*1000+tv.tv_usec); LOG_I("%s\n",__func__); root = json_object_new_object(); if(root == NULL){ LOG_I("json_object_new_object error\n"); goto json_error; } msg = json_object_new_object(); if(msg == NULL){ LOG_I("msg: NULL\n"); goto json_error; } data = json_object_new_object(); if(msg == NULL){ LOG_I("msg: NULL\n"); goto json_error; } json_object_object_add(root, "deviceId", json_object_new_string(sn)); json_object_object_add(root, "messageId", json_object_new_string(msg_id)); json_object_object_add(root, "productId", json_object_new_string(mqtt_conf->productcode)); json_object_object_add(root, "timestamp", json_object_new_string(time_buffer)); json_object_object_add(data, "scene", json_object_new_string(scene)); json_object_object_add(data, "lights", lights); json_object_object_add(msg, "data", data); json_object_object_add(msg, "msgType", json_object_new_string("9007")); json_object_object_add(root, "msg", msg); snprintf(topic, sizeof(topic), "iot/%s/%s/message/report",mqtt_conf->productcode,mqtt_conf->username); //LOG_I("publish topic:[%s]\n", topic); payload = json_object_to_json_string(root); //LOG_I("payload[%d][%s]\n", strlen(payload), payload); mqtt_utils_publish(mqtt_conf, topic, 2, payload, strlen(payload)); json_error: json_object_put(root); } void mqtt_server_station_status_report(char *msg_id,char *product_id,char *sn,char *local_ip, char *base_version, char *status, char *duration){ char topic[128] = ""; const char *payload = NULL; char time_buffer[128] = ""; json_object *root = NULL; json_object *msg = NULL; json_object *data = NULL; struct timeval tv; gettimeofday(&tv, NULL); snprintf(time_buffer,sizeof(time_buffer),"%ld",tv.tv_sec*1000+tv.tv_usec); LOG_I("%s\n",__func__); root = json_object_new_object(); if(root == NULL){ LOG_I("json_object_new_object error\n"); goto json_error; } msg = json_object_new_object(); if(msg == NULL){ LOG_I("msg: NULL\n"); goto json_error; } data = json_object_new_object(); if(msg == NULL){ LOG_I("msg: NULL\n"); goto json_error; } json_object_object_add(root, "deviceId", json_object_new_string(sn)); json_object_object_add(root, "messageId", json_object_new_string(msg_id)); json_object_object_add(root, "productId", json_object_new_string(mqtt_conf->productcode)); json_object_object_add(root, "timestamp", json_object_new_string(time_buffer)); json_object_object_add(data, "ip", json_object_new_string(local_ip)); json_object_object_add(data, "baseVersion", json_object_new_string(base_version)); json_object_object_add(data, "status", json_object_new_string(status)); json_object_object_add(data, "duration", json_object_new_string(duration)); json_object_object_add(msg, "data", data); json_object_object_add(msg, "msgType", json_object_new_string("9008")); json_object_object_add(root, "msg", msg); snprintf(topic, sizeof(topic), "iot/%s/message/report",mqtt_conf->productcode); //LOG_I("publish topic:[%s]\n", topic); payload = json_object_to_json_string(root); //LOG_I("payload[%d][%s]\n", strlen(payload), payload); mqtt_utils_publish(mqtt_conf, topic, 2, payload, strlen(payload)); json_error: json_object_put(root); } void mqtt_server_station_status_report_test(char *msg_id,char *product_id,char *sn,char *local_ip, char *base_version, char *status, char *duration){ char topic[128] = ""; const char *payload = NULL; char time_buffer[128] = ""; json_object *root = NULL; json_object *msg = NULL; json_object *data = NULL; struct timeval tv; gettimeofday(&tv, NULL); snprintf(time_buffer,sizeof(time_buffer),"%ld",tv.tv_sec*1000+tv.tv_usec); LOG_I("%s\n",__func__); root = json_object_new_object(); if(root == NULL){ LOG_I("json_object_new_object error\n"); goto json_error; } msg = json_object_new_object(); if(msg == NULL){ LOG_I("msg: NULL\n"); goto json_error; } data = json_object_new_object(); if(msg == NULL){ LOG_I("msg: NULL\n"); goto json_error; } json_object_object_add(root, "deviceId", json_object_new_string(sn)); json_object_object_add(root, "messageId", json_object_new_string(msg_id)); json_object_object_add(root, "productId", json_object_new_string(mqtt_conf->productcode)); json_object_object_add(root, "timestamp", json_object_new_string(time_buffer)); json_object_object_add(data, "ip", json_object_new_string(local_ip)); json_object_object_add(data, "baseVersion", json_object_new_string(base_version)); json_object_object_add(data, "status", json_object_new_string(status)); json_object_object_add(data, "duration", json_object_new_string(duration)); json_object_object_add(msg, "data", data); json_object_object_add(msg, "msgType", json_object_new_string("9008")); json_object_object_add(root, "msg", msg); snprintf(topic, sizeof(topic), "iot/%s/%s/station/report", mqtt_conf->productcode, mqtt_conf->username); //LOG_I("publish topic:[%s]\n", topic); payload = json_object_to_json_string(root); //LOG_I("payload[%d][%s]\n", strlen(payload), payload); mqtt_utils_publish(mqtt_conf, topic, 2, payload, strlen(payload)); json_error: json_object_put(root); } void mqtt_service_reply(char *sn,char *msg_id,char *msg,int success,char *product_id) { char topic[128] = ""; const char *payload = NULL; json_object *root = NULL; LOG_I("%s\n", __func__); root = json_object_new_object(); if(root == NULL){ LOG_I("json_object_new_object error\n"); goto json_error; } json_object_object_add(root, "deviceId", json_object_new_string(sn)); json_object_object_add(root, "messageId", json_object_new_string(msg_id)); json_object_object_add(root, "msg", json_object_new_string(msg)); json_object_object_add(root, "success", json_object_new_boolean(success)); json_object_object_add(root, "productId", json_object_new_string(mqtt_conf->productcode)); snprintf(topic, sizeof(topic), "iot/%s/adviceDevice/reply",mqtt_conf->productcode); //LOG_I("publish topic:[%s]\n", topic); payload = json_object_to_json_string(root); //LOG_I("payload[%d][%s]\n", strlen(payload), payload); mqtt_utils_publish(mqtt_conf, topic, 2, payload, strlen(payload)); json_error: json_object_put(root); } void mqtt_resource_reply(char *sn,char *msg_id,char *msg,int success,char *product_id, char *resource) { char topic[128] = ""; const char *payload = NULL; json_object *root = NULL; LOG_I("%s\n", __func__); root = json_object_new_object(); if(root == NULL){ LOG_I("json_object_new_object error\n"); goto json_error; } json_object_object_add(root, "deviceId", json_object_new_string(sn)); json_object_object_add(root, "messageId", json_object_new_string(msg_id)); json_object_object_add(root, "msg", json_object_new_string(msg)); json_object_object_add(root, "success", json_object_new_boolean(success)); json_object_object_add(root, "productId", json_object_new_string(mqtt_conf->productcode)); json_object_object_add(root, "resource", json_object_new_string(resource)); snprintf(topic, sizeof(topic), "iot/%s/%s/resource/report", mqtt_config.productcode, mqtt_config.username); //LOG_I("publish topic:[%s]\n", topic); payload = json_object_to_json_string(root); //LOG_I("payload[%d][%s]\n", strlen(payload), payload); mqtt_utils_publish(mqtt_conf, topic, 2, payload, strlen(payload)); json_error: json_object_put(root); } void mqtt_net_failure(int *failure_times){ int fd = open("/sys/class/gpio/gpio113/value", O_WRONLY); if (fd < 0) { LOG_I("Failed to open gpio113 value: %s\n", strerror(errno)); } else { write(fd, "0", 1); close(fd); } if(failure_times != NULL){ (*failure_times)++; LOG_I("mqtt net failure_times = %d\n", *failure_times); if(*failure_times >= 5){ system("sync"); system("reboot"); } } } int printVersionInfo(MQTTAsync_nameValue * info){ int rc = 0; //LOG_I("MQTT library information:\n"); while (info->name){ //LOG_I("%s: %s\n", info->name, info->value); info++; rc = 1; /* at least one value printed */ } if (rc == 1) //LOG_I("\n"); return rc; } void mqtt_utils_trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char *message){ int i; int size = 0; size = MQTT_ARRY_ARRY_SIZE(trace_level_name_list); for (i=0;ipayload); } MQTTAsync_freeMessage(&message); MQTTAsync_free(topicName); return 1; } void mqtt_utils_delivery_complete(void *context, MQTTAsync_token token){ //LOG_I("%s\n",__func__); } static int led_running = 1; static pthread_t led_thread; void* led_blink_thread(void* arg) { prctl(PR_SET_NAME, "led_blink"); int fd = open("/sys/class/gpio/gpio113/value", O_WRONLY); if (fd < 0) { LOG_I("Failed to open GPIO\n"); return NULL; } while(led_running) { write(fd, "1", 1); usleep(500000); write(fd, "0", 1); usleep(500000); } close(fd); return NULL; } void mqtt_utils_connected(void *context, char *cause){ if (cause != NULL) { LOG_I("%s cause:%s\n",__func__, cause); } else { LOG_I("%s\n",__func__); } connect_failure_times=0; mqtt_utils_subscribe(mqtt_conf,nativeInvokTopicName,2); //system("echo 1 > /sys/class/gpio/gpio113/value");//yellow ok led_running = 1; pthread_create(&led_thread, NULL, led_blink_thread, NULL); char myid[32]={0}; myrand(myid,19); updateStationInfo(myid); } void mqtt_utils_disconnected(void *context, MQTTProperties* props, enum MQTTReasonCodes rc){ //LOG_I("%s reason code %s",__func__ ,MQTTReasonCode_toString(rc)); //mqtt_led_net_status_judge(); } void mqtt_utils_on_connect_success(void *context, MQTTAsync_successData *response){ //LOG_I("%s\n",__func__); } void mqtt_utils_on_connect_failure(void *context, MQTTAsync_failureData *response){ //LOG_I("%s connect failed, rc %s\n", __func__, response ? MQTTAsync_strerror(response->code) : "none"); // mqtt_led_net_status_judge(); mqtt_net_failure(&connect_failure_times); } static int mqtt_utils_on_ssl_error(const char *str, size_t len, void *context){ MQTTAsync client = (MQTTAsync)context; LOG_I("%s ssl error: %s\n",__func__, str); return 0; } void mqtt_utils_on_publish_success(void *context, MQTTAsync_successData *response){ //LOG_I("%s\n",__func__); } void mqtt_utils_on_publish_failure(void *context, MQTTAsync_failureData *response){ LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code)); } void mqtt_utils_on_subscribe_success(void *context, MQTTAsync_successData *response){ //LOG_I("%s\n",__func__); //sem_post(&rtu_server_ptr->mqtt_utils.sem_subscribe); } void mqtt_utils_on_subscribe_failure(void *context, MQTTAsync_failureData *response){ LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code)); } void mqtt_utils_on_disconnect_success(void *context, MQTTAsync_successData *response){ LOG_I("%s\n",__func__); //mqtt_led_net_status_judge(); } int mqtt_utils_subscribe(mqtt_utils_t *mqtt_utils, char *topic, int qos){ int rc = 0; rc = MQTTAsync_subscribe(mqtt_utils->client, topic, qos, &mqtt_utils->sub_opts); if (rc != MQTTASYNC_SUCCESS){ LOG_I("%s return code %s\n",__func__ ,MQTTAsync_strerror(rc)); } return rc; } int mqtt_utils_publish(mqtt_utils_t *mqtt_utils, char *topic, int qos, const char *data, int datalen){ int rc; //LOG_I("%s data of length %d\n",__func__, datalen); rc = MQTTAsync_send(mqtt_utils->client, topic, datalen, data, qos, mqtt_utils->retain, &mqtt_utils->pub_opts); if (rc != MQTTASYNC_SUCCESS){ LOG_I("mqtt: failed to start publish, return code %s\n", MQTTAsync_strerror(rc)); } return rc; } int mqtt_utils_init(mqtt_utils_t *mqtt_config) { mqtt_conf=mqtt_config; MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer; MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer; MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer; //used by MQTTAsync_subscribe MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer; //used by MQTTAsync_send MQTTAsync_nameValue *infos = NULL; char url[256] = ""; int ret = -1; int rc = 0; infos = MQTTAsync_getVersionInfo(); printVersionInfo(infos); MQTTAsync_setTraceCallback(mqtt_utils_trace_callback); MQTTAsync_setTraceLevel(mqtt_conf->tracelevel); //snprintf(url, sizeof(url), "%s://%s:%d", mqtt_conf->protocol, mqtt_conf->host, mqtt_conf->port); snprintf(url, sizeof(url), "tcp://%s:%d", mqtt_conf->host, mqtt_conf->port); LOG_I("mqtt url=%s\n", url); create_opts.sendWhileDisconnected = 0; rc = MQTTAsync_createWithOptions(&mqtt_conf->client, url, mqtt_conf->clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts); if (rc != MQTTASYNC_SUCCESS){ LOG_I("mqtt: failed to create client, return code: %s\n", MQTTAsync_strerror(rc)); ret = -1; goto error; } rc = MQTTAsync_setCallbacks(mqtt_conf->client, NULL, mqtt_utils_connection_lost, mqtt_utils_message_arrived, mqtt_utils_delivery_complete); if (rc != MQTTASYNC_SUCCESS){ LOG_I("mqtt: failed to set callbacks, return code: %s\n", MQTTAsync_strerror(rc)); ret = -2; goto error; } rc = MQTTAsync_setConnected(mqtt_conf->client, NULL, mqtt_utils_connected); if (rc != MQTTASYNC_SUCCESS){ LOG_I("mqtt: failed to set callback connect, return code: %s\n", MQTTAsync_strerror(rc)); ret = -2; goto error; } rc = MQTTAsync_setDisconnected(mqtt_conf->client, NULL, mqtt_utils_disconnected); if (rc != MQTTASYNC_SUCCESS){ LOG_I("mqtt: failed to set callbacks disconnectcd, return code: %s\n", MQTTAsync_strerror(rc)); ret = -2; goto error; } /* connect option */ conn_opts.onSuccess = mqtt_utils_on_connect_success; conn_opts.onFailure = mqtt_utils_on_connect_failure; conn_opts.context = NULL; /* 不管clean session的值是什么,当终端设备离线时,QoS=0,1,2的消息一律接收不到。 当clean session的值为1,当终端设备离线再上线时,离线期间发来QoS=0,1,2的消息一律接收不到。 当clean session的值为0,当终端设备离线再上线时,离线期间发来QoS=0,1,2的消息仍然可以接收到。如果同个主题发了多条就接收多条,一条不差,照单全收。 */ conn_opts.cleansession = 1; conn_opts.automaticReconnect = 1; conn_opts.keepAliveInterval = mqtt_conf->keepalive; conn_opts.username = mqtt_conf->username; conn_opts.password = mqtt_conf->password; conn_opts.MQTTVersion = mqtt_conf->MQTTVersion; if (mqtt_conf->will_topic) /* will options */{ will_opts.message = mqtt_conf->will_payload; will_opts.topicName = mqtt_conf->will_topic; will_opts.qos = mqtt_conf->will_qos; //LOG_I("qos:%d\n",will_opts.qos); will_opts.retained = mqtt_conf->will_retain; conn_opts.will = &will_opts; } if (mqtt_conf->insecure){ ssl_opts.verify = 0; ssl_opts.CApath = mqtt_conf->capath; ssl_opts.keyStore = mqtt_conf->cert; ssl_opts.trustStore = mqtt_conf->cafile; ssl_opts.privateKey = mqtt_conf->key; ssl_opts.privateKeyPassword = mqtt_conf->keypass; ssl_opts.enabledCipherSuites = mqtt_conf->ciphers; ssl_opts.ssl_error_cb = mqtt_utils_on_ssl_error; ssl_opts.ssl_error_context = NULL; conn_opts.ssl = &ssl_opts; } if ((rc = MQTTAsync_connect(mqtt_conf->client, &conn_opts)) != MQTTASYNC_SUCCESS){ LOG_I("mqtt: failed to start connect, return code %s\n", MQTTAsync_strerror(rc)); ret = -1; goto error; } mqtt_conf->sub_opts = sub_opts; mqtt_conf->pub_opts = pub_opts; mqtt_conf->pub_opts.onSuccess = mqtt_utils_on_publish_success; mqtt_conf->pub_opts.onFailure = mqtt_utils_on_publish_failure; mqtt_conf->pub_opts.context = NULL; mqtt_conf->sub_opts.onSuccess = mqtt_utils_on_subscribe_success; mqtt_conf->sub_opts.onFailure = mqtt_utils_on_subscribe_failure; mqtt_conf->sub_opts.context = NULL; //sprintf(nativeInvokTopicName,"iot/10025/%s/message/adviceDevice",mqtt_conf->clientid); sprintf(nativeInvokTopicName,"iot/%s/%s/message/adviceDevice",mqtt_conf->productcode,mqtt_conf->username); LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName); ret = 0; error: return ret; } int mqtt_utils_uninit(mqtt_utils_t *mqtt_config){ LOG_I("%s\n",__func__); MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; int rc = 0; int ret = -1; disc_opts.onSuccess = mqtt_utils_on_disconnect_success; if ((rc = MQTTAsync_disconnect(mqtt_config->client, &disc_opts)) != MQTTASYNC_SUCCESS){ LOG_I("mqtt: failed to start disconnect, return code: %s\n", MQTTAsync_strerror(rc)); ret = -1; goto error; } MQTTAsync_destroy(&mqtt_config->client); sem_destroy(&mqtt_config->sem_connect); //sem_destroy(&mqtt_config->sem_subscribe); ret = 0; error: return ret; }