#define _GNU_SOURCE #include "MQTTAsync.h" #include "MQTTClientPersistence.h" #include #include #include #if defined(WIN32) #include #define sleep Sleep #else #include #include #include #endif #if defined(_WRS_KERNEL) #include #endif #include #include #include #include #include #include #include "mqtt_utils.h" #include "json_utils.h" #include "main.h" #define PRINT_TIME_TAG #define DBG_TAG "mqtt_utils" #define DBG_LVL DBG_INFO #include "debug_print.h" #define MQTT_ARRY_ARRY_SIZE(x) (sizeof(x)/sizeof(x[0])) mqtt_utils_t *mqtt_conf; mqtt_parm_download_t mqtt_parm_download={0}; int connect_failure_times = 0; char nativeInvokTopicName[1024] = ""; char nativeUpgradeTopicName[1024] = ""; extern char softwareVersion[16]; typedef struct{ enum MQTTASYNC_TRACE_LEVELS level; char name[64]; }trace_level_name_t; trace_level_name_t trace_level_name_list[] = { {MQTTASYNC_TRACE_MAXIMUM, "MQTTASYNC_TRACE_MAXIMUM"}, {MQTTASYNC_TRACE_MEDIUM, "MQTTASYNC_TRACE_MEDIUM"}, {MQTTASYNC_TRACE_MINIMUM, "MQTTASYNC_TRACE_MINIMUM"}, {MQTTASYNC_TRACE_PROTOCOL, "MQTTASYNC_TRACE_PROTOCOL"}, {MQTTASYNC_TRACE_ERROR, "MQTTASYNC_TRACE_ERROR"}, {MQTTASYNC_TRACE_SEVERE, "MQTTASYNC_TRACE_SEVERE"}, {MQTTASYNC_TRACE_FATAL, "MQTTASYNC_TRACE_FATAL"}, }; void mqtt_server_events_report(char *sn,char *msg_id,json_object *events){ char topic[128] = ""; const char *payload = NULL; char time_buffer[16] = ""; json_object *root = NULL; struct timeval tv; gettimeofday(&tv, NULL); snprintf(time_buffer,sizeof(time_buffer),"%ld",tv.tv_sec*1000+tv.tv_usec); root = json_object_new_object(); if(root == NULL){ LOG_I("json_object_new_object error\n"); goto json_error; } json_object_object_add(root, "deviceId", json_object_new_string(sn)); json_object_object_add(root, "messageId", json_object_new_string(msg_id)); json_object_object_add(root, "timestamp", json_object_new_string(time_buffer)); json_object_object_add(root, "events", events); snprintf(topic, sizeof(topic), "$iot/v1/device/%s/events/post",sn); LOG_I("publish topic:[%s]\n", topic); payload = json_object_to_json_string(root); LOG_I("payload[%d][%s]\n", strlen(payload), payload); mqtt_utils_publish(mqtt_conf, topic, 2, payload, strlen(payload)); json_error: json_object_put(root); } void mqtt_server_reply(char *sn,char *msg_id,json_object *functions) { char topic[128] = ""; const char *payload = NULL; json_object *root = NULL; struct timeval tv; char time_buffer[16] = ""; gettimeofday(&tv, NULL); snprintf(time_buffer,sizeof(time_buffer),"%ld",tv.tv_sec*1000+tv.tv_usec); //LOG_I("%s\n", __func__); root = json_object_new_object(); if(root == NULL){ LOG_I("json_object_new_object error\n"); goto json_error; } json_object_object_add(root, "deviceId", json_object_new_string(sn)); json_object_object_add(root, "messageId", json_object_new_string(msg_id)); json_object_object_add(root, "timestamp", json_object_new_string(time_buffer)); json_object_object_add(root, "code", json_object_new_int(200)); json_object_object_add(root, "message", json_object_new_string("success")); json_object_object_add(root, "payload", functions); snprintf(topic, sizeof(topic), "$iot/v1/device/%s/functions/call/response",sn); LOG_I("publish topic:[%s]\n", topic); payload = json_object_to_json_string(root); LOG_I("payload[%d][%s]\n", strlen(payload), payload); mqtt_utils_publish(mqtt_conf, topic, 2, payload, strlen(payload)); json_error: json_object_put(root); } void mqtt_net_failure(int *failure_times){ system("echo 0 > /sys/class/gpio/gpio113/value"); if(failure_times != NULL){ (*failure_times)++; LOG_I("mqtt net failure_times = %d\n", *failure_times); if(*failure_times >= 5){ system("reboot"); } } } int printVersionInfo(MQTTAsync_nameValue * info){ int rc = 0; //LOG_I("MQTT library information:\n"); while (info->name){ LOG_I("%s: %s\n", info->name, info->value); info++; rc = 1; /* at least one value printed */ } if (rc == 1) //LOG_I("\n"); return rc; } void mqtt_utils_trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char *message){ int i; int size = 0; size = MQTT_ARRY_ARRY_SIZE(trace_level_name_list); for (i=0;ipayload); } MQTTAsync_freeMessage(&message); MQTTAsync_free(topicName); return 1; } void mqtt_utils_delivery_complete(void *context, MQTTAsync_token token){ //LOG_I("%s\n",__func__); } void mqtt_utils_connected(void *context, char *cause){ if (cause != NULL) { LOG_I("%s cause:%s\n",__func__, cause); } else { LOG_I("%s\n",__func__); } connect_failure_times=0; mqtt_utils_subscribe(mqtt_conf,nativeInvokTopicName,2); system("echo 1 > /sys/class/gpio/gpio113/value");//yellow ok station_status_report(); } void mqtt_utils_disconnected(void *context, MQTTProperties* props, enum MQTTReasonCodes rc){ //LOG_I("%s reason code %s",__func__ ,MQTTReasonCode_toString(rc)); //mqtt_led_net_status_judge(); } void mqtt_utils_on_connect_success(void *context, MQTTAsync_successData *response){ //LOG_I("%s\n",__func__); } void mqtt_utils_on_connect_failure(void *context, MQTTAsync_failureData *response){ //LOG_I("%s connect failed, rc %s\n", __func__, response ? MQTTAsync_strerror(response->code) : "none"); // mqtt_led_net_status_judge(); mqtt_net_failure(&connect_failure_times); } static int mqtt_utils_on_ssl_error(const char *str, size_t len, void *context){ MQTTAsync client = (MQTTAsync)context; LOG_I("%s ssl error: %s\n",__func__, str); return 0; } void mqtt_utils_on_publish_success(void *context, MQTTAsync_successData *response){ //LOG_I("%s\n",__func__); } void mqtt_utils_on_publish_failure(void *context, MQTTAsync_failureData *response){ LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code)); } void mqtt_utils_on_subscribe_success(void *context, MQTTAsync_successData *response){ //LOG_I("%s\n",__func__); //sem_post(&rtu_server_ptr->mqtt_utils.sem_subscribe); } void mqtt_utils_on_subscribe_failure(void *context, MQTTAsync_failureData *response){ LOG_I("%s rc %s\n",__func__, MQTTAsync_strerror(response->code)); } void mqtt_utils_on_disconnect_success(void *context, MQTTAsync_successData *response){ LOG_I("%s\n",__func__); //mqtt_led_net_status_judge(); } int mqtt_utils_subscribe(mqtt_utils_t *mqtt_utils, char *topic, int qos){ int rc = 0; rc = MQTTAsync_subscribe(mqtt_utils->client, topic, qos, &mqtt_utils->sub_opts); if (rc != MQTTASYNC_SUCCESS){ LOG_I("%s return code %s\n",__func__ ,MQTTAsync_strerror(rc)); } return rc; } int mqtt_utils_publish(mqtt_utils_t *mqtt_utils, char *topic, int qos, const char *data, int datalen){ int rc; //LOG_I("%s data of length %d\n",__func__, datalen); rc = MQTTAsync_send(mqtt_utils->client, topic, datalen, data, qos, mqtt_utils->retain, &mqtt_utils->pub_opts); if (rc != MQTTASYNC_SUCCESS){ LOG_I("mqtt: failed to start publish, return code %s\n", MQTTAsync_strerror(rc)); } return rc; } int mqtt_utils_init(mqtt_utils_t *mqtt_config) { mqtt_conf=mqtt_config; MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer; MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer; MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer; //used by MQTTAsync_subscribe MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer; //used by MQTTAsync_send MQTTAsync_nameValue *infos = NULL; char url[256] = ""; int ret = -1; int rc = 0; infos = MQTTAsync_getVersionInfo(); printVersionInfo(infos); MQTTAsync_setTraceCallback(mqtt_utils_trace_callback); MQTTAsync_setTraceLevel(mqtt_conf->tracelevel); //snprintf(url, sizeof(url), "%s://%s:%d", mqtt_conf->protocol, mqtt_conf->host, mqtt_conf->port); snprintf(url, sizeof(url), "ssl://%s:%d", mqtt_conf->host, mqtt_conf->port); LOG_I("mqtt url=%s\n", url); create_opts.sendWhileDisconnected = 0; rc = MQTTAsync_createWithOptions(&mqtt_conf->client, url, mqtt_conf->clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts); if (rc != MQTTASYNC_SUCCESS){ LOG_I("mqtt: failed to create client, return code: %s\n", MQTTAsync_strerror(rc)); ret = -1; goto error; } rc = MQTTAsync_setCallbacks(mqtt_conf->client, NULL, mqtt_utils_connection_lost, mqtt_utils_message_arrived, mqtt_utils_delivery_complete); if (rc != MQTTASYNC_SUCCESS){ LOG_I("mqtt: failed to set callbacks, return code: %s\n", MQTTAsync_strerror(rc)); ret = -2; goto error; } rc = MQTTAsync_setConnected(mqtt_conf->client, NULL, mqtt_utils_connected); if (rc != MQTTASYNC_SUCCESS){ LOG_I("mqtt: failed to set callback connect, return code: %s\n", MQTTAsync_strerror(rc)); ret = -2; goto error; } rc = MQTTAsync_setDisconnected(mqtt_conf->client, NULL, mqtt_utils_disconnected); if (rc != MQTTASYNC_SUCCESS){ LOG_I("mqtt: failed to set callbacks disconnectcd, return code: %s\n", MQTTAsync_strerror(rc)); ret = -2; goto error; } /* connect option */ conn_opts.onSuccess = mqtt_utils_on_connect_success; conn_opts.onFailure = mqtt_utils_on_connect_failure; conn_opts.context = NULL; /* 不管clean session的值是什么,当终端设备离线时,QoS=0,1,2的消息一律接收不到。 当clean session的值为1,当终端设备离线再上线时,离线期间发来QoS=0,1,2的消息一律接收不到。 当clean session的值为0,当终端设备离线再上线时,离线期间发来QoS=0,1,2的消息仍然可以接收到。如果同个主题发了多条就接收多条,一条不差,照单全收。 */ conn_opts.cleansession = 1; conn_opts.automaticReconnect = 1; conn_opts.keepAliveInterval = mqtt_conf->keepalive; conn_opts.username = mqtt_conf->username; conn_opts.password = mqtt_conf->password; conn_opts.MQTTVersion = mqtt_conf->MQTTVersion; if (mqtt_conf->will_topic) /* will options */{ will_opts.message = mqtt_conf->will_payload; will_opts.topicName = mqtt_conf->will_topic; will_opts.qos = mqtt_conf->will_qos; //LOG_I("qos:%d\n",will_opts.qos); will_opts.retained = mqtt_conf->will_retain; conn_opts.will = &will_opts; } //ssl_opts.keyStore = clientCrt; //客户端证书路径 //ssl_opts.trustStore = CAFile; //ca证书路径 //ssl_opts.privateKey = clientKey; //客户端密钥路径 //ssl_opts.privateKeyPassword = mqtt_tls_port->clientKeyPasswd; //密钥,如果有客户端密钥路径该项可为空 //ssl_opts.sslVersion = MQTT_SSL_VERSION_DEFAULT; //conn_opts.ssl = &ssl_opts; // 证书和密钥路径(根据实际位置修改) //const char* clientCrt = "/path/to/client.crt"; // 客户端证书 //const char* CAFile = "/path/to/ca.crt"; // CA证书 //const char* clientKey = "/path/to/client.key"; // 客户端私钥 // 配置SSL选项 //MQTTClient_SSLOptions ssl_opts = MQTTClient_SSLOptions_initializer; //ssl_opts.keyStore = clientCrt; //ssl_opts.trustStore = CAFile; //ssl_opts.privateKey = clientKey; //ssl_opts.privateKeyPassword = "key_password"; // 若私钥未加密则设为NULL //ssl_opts.sslVersion = MQTT_SSL_VERSION_TLS_1_2; // 显式指定TLS 1.2 // 关联到连接选项 //MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; //conn_opts.ssl = &ssl_opts; if (mqtt_conf->insecure){ LOG_I("use key file\n"); ssl_opts.enableServerCertAuth = 1; //ssl_opts.CApath = mqtt_conf->capath; ssl_opts.keyStore = mqtt_conf->cert; ssl_opts.trustStore = mqtt_conf->cafile; ssl_opts.privateKey = mqtt_conf->key; //ssl_opts.privateKeyPassword = mqtt_conf->keypass; //ssl_opts.enabledCipherSuites = mqtt_conf->ciphers; ssl_opts.ssl_error_cb = mqtt_utils_on_ssl_error; ssl_opts.ssl_error_context = NULL; conn_opts.ssl = &ssl_opts; } if ((rc = MQTTAsync_connect(mqtt_conf->client, &conn_opts)) != MQTTASYNC_SUCCESS){ LOG_I("mqtt: failed to start connect, return code %s\n", MQTTAsync_strerror(rc)); ret = -1; goto error; } mqtt_conf->sub_opts = sub_opts; mqtt_conf->pub_opts = pub_opts; mqtt_conf->pub_opts.onSuccess = mqtt_utils_on_publish_success; mqtt_conf->pub_opts.onFailure = mqtt_utils_on_publish_failure; mqtt_conf->pub_opts.context = NULL; mqtt_conf->sub_opts.onSuccess = mqtt_utils_on_subscribe_success; mqtt_conf->sub_opts.onFailure = mqtt_utils_on_subscribe_failure; mqtt_conf->sub_opts.context = NULL; sprintf(nativeInvokTopicName,"$iot/v1/device/%s/functions/call",mqtt_conf->username); LOG_I("nativeInvokTopicName:%s\n",nativeInvokTopicName); ret = 0; error: return ret; } int mqtt_utils_uninit(mqtt_utils_t *mqtt_config){ LOG_I("%s\n",__func__); MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; int rc = 0; int ret = -1; disc_opts.onSuccess = mqtt_utils_on_disconnect_success; if ((rc = MQTTAsync_disconnect(mqtt_config->client, &disc_opts)) != MQTTASYNC_SUCCESS){ LOG_I("mqtt: failed to start disconnect, return code: %s\n", MQTTAsync_strerror(rc)); ret = -1; goto error; } MQTTAsync_destroy(&mqtt_config->client); sem_destroy(&mqtt_config->sem_connect); //sem_destroy(&mqtt_config->sem_subscribe); ret = 0; error: return ret; }