diff --git a/main.c b/main.c index cdb9cb8..5222739 100644 --- a/main.c +++ b/main.c @@ -1842,7 +1842,7 @@ void mqtt_init(){ mqtt_config.tracelevel=MQTTASYNC_TRACE_PROTOCOL; mqtt_config.retain=0; mqtt_config.port=mqtt_port; - mqtt_config.keepalive=300; + mqtt_config.keepalive=180; memset(mqtt_config.clientid,0,sizeof(mqtt_config.clientid)); memcpy(mqtt_config.clientid, clientid, strlen(clientid)+1); diff --git a/mqtt/MQTTAsync.c b/mqtt/MQTTAsync.c index a5813b4..dfdac17 100644 --- a/mqtt/MQTTAsync.c +++ b/mqtt/MQTTAsync.c @@ -2722,8 +2722,8 @@ static void setRetryLoopInterval(int keepalive) if (proposed < 1) proposed = 1; - else if (proposed > 5) - proposed = 5; + else if (proposed > 60) + proposed = 60; // 提高上限到60秒,适应更长的keepalive if (proposed < retryLoopInterval) retryLoopInterval = proposed; } diff --git a/mqtt/MQTTClient.c b/mqtt/MQTTClient.c index 2069ff9..a058143 100644 --- a/mqtt/MQTTClient.c +++ b/mqtt/MQTTClient.c @@ -1323,8 +1323,8 @@ static void setRetryLoopInterval(int keepalive) if (proposed < 1) proposed = 1; - else if (proposed > 5) - proposed = 5; + else if (proposed > 60) + proposed = 60; // 提高上限到60秒,适应更长的keepalive if (proposed < retryLoopInterval) retryLoopInterval = proposed; } diff --git a/mqtt/MQTTProtocolClient.c b/mqtt/MQTTProtocolClient.c index 051d494..b83ae10 100644 --- a/mqtt/MQTTProtocolClient.c +++ b/mqtt/MQTTProtocolClient.c @@ -633,7 +633,7 @@ void MQTTProtocol_keepalive(time_t now) } else { - //LOG_I("PINGREQ send success\n"); + LOG_I("PINGREQ send success at %ld\n", (long)now); client->net.lastSent = now; client->ping_outstanding = 1; } @@ -641,9 +641,25 @@ void MQTTProtocol_keepalive(time_t now) } else { - Log(TRACE_PROTOCOL, -1, "PINGRESP not received in keepalive interval for client %s on socket %d, disconnecting", client->clientID, client->net.socket); - LOG_I("%s:PINGRESP not received\n",__func__); - MQTTProtocol_closeSession(client, 1); + // 增加容错:允许ping_outstanding为2时再等待一个周期 + if (client->ping_outstanding >= 2) { + Log(TRACE_PROTOCOL, -1, "PINGRESP not received after 2 keepalive intervals for client %s on socket %d, disconnecting", client->clientID, client->net.socket); + LOG_I("%s:PINGRESP not received after 2 attempts\n",__func__); + MQTTProtocol_closeSession(client, 1); + } else { + // 第二次发送PINGREQ + if (Socket_noPendingWrites(client->net.socket)) { + if (MQTTPacket_send_pingreq(&client->net, client->clientID) != TCPSOCKET_COMPLETE) { + Log(TRACE_PROTOCOL, -1, "Error sending second PINGREQ for client %s on socket %d, disconnecting", client->clientID, client->net.socket); + LOG_I("%s:Error sending second PINGREQ for client %s on socket %d, disconnecting\n",__func__,client->clientID, client->net.socket); + MQTTProtocol_closeSession(client, 1); + } else { + LOG_I("Second PINGREQ send success at %ld\n", (long)now); + client->net.lastSent = now; + client->ping_outstanding++; + } + } + } } } } diff --git a/mqtt/MQTTProtocolOut.c b/mqtt/MQTTProtocolOut.c index 16d678a..dee6fb5 100644 --- a/mqtt/MQTTProtocolOut.c +++ b/mqtt/MQTTProtocolOut.c @@ -178,7 +178,7 @@ int MQTTProtocol_handlePingresps(void* pack, int sock) client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content); Log(LOG_PROTOCOL, 21, NULL, sock, client->clientID); client->ping_outstanding = 0; - //LOG_I("PINGRESP receive success\n"); + LOG_I("PINGRESP received successfully, ping_outstanding cleared\n"); FUNC_EXIT_RC(rc); return rc; } diff --git a/mqtt_utils/mqtt_utils.c b/mqtt_utils/mqtt_utils.c index 15e18ba..6f0baee 100644 --- a/mqtt_utils/mqtt_utils.c +++ b/mqtt_utils/mqtt_utils.c @@ -419,6 +419,8 @@ void mqtt_utils_connected(void *context, char *cause){ if (lightbarHeartbeat[i].isOnline) { LOG_I("Report again lightbar %08X login\n", lightbarHeartbeat[i].tagCode); report_lightbar_login(lightbarHeartbeat[i].tagCode); + // 添加短暂延迟,避免MQTT缓冲区溢出 + usleep(100 * 1000); // 100ms延迟 } } pthread_mutex_unlock(&heartbeatMutex);