减少mqtt断联
This commit is contained in:
parent
f910a4d701
commit
fc95e3561a
2
main.c
2
main.c
@ -1842,7 +1842,7 @@ void mqtt_init(){
|
|||||||
mqtt_config.tracelevel=MQTTASYNC_TRACE_PROTOCOL;
|
mqtt_config.tracelevel=MQTTASYNC_TRACE_PROTOCOL;
|
||||||
mqtt_config.retain=0;
|
mqtt_config.retain=0;
|
||||||
mqtt_config.port=mqtt_port;
|
mqtt_config.port=mqtt_port;
|
||||||
mqtt_config.keepalive=300;
|
mqtt_config.keepalive=180;
|
||||||
|
|
||||||
memset(mqtt_config.clientid,0,sizeof(mqtt_config.clientid));
|
memset(mqtt_config.clientid,0,sizeof(mqtt_config.clientid));
|
||||||
memcpy(mqtt_config.clientid, clientid, strlen(clientid)+1);
|
memcpy(mqtt_config.clientid, clientid, strlen(clientid)+1);
|
||||||
|
|||||||
@ -2722,8 +2722,8 @@ static void setRetryLoopInterval(int keepalive)
|
|||||||
|
|
||||||
if (proposed < 1)
|
if (proposed < 1)
|
||||||
proposed = 1;
|
proposed = 1;
|
||||||
else if (proposed > 5)
|
else if (proposed > 60)
|
||||||
proposed = 5;
|
proposed = 60; // 提高上限到60秒,适应更长的keepalive
|
||||||
if (proposed < retryLoopInterval)
|
if (proposed < retryLoopInterval)
|
||||||
retryLoopInterval = proposed;
|
retryLoopInterval = proposed;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1323,8 +1323,8 @@ static void setRetryLoopInterval(int keepalive)
|
|||||||
|
|
||||||
if (proposed < 1)
|
if (proposed < 1)
|
||||||
proposed = 1;
|
proposed = 1;
|
||||||
else if (proposed > 5)
|
else if (proposed > 60)
|
||||||
proposed = 5;
|
proposed = 60; // 提高上限到60秒,适应更长的keepalive
|
||||||
if (proposed < retryLoopInterval)
|
if (proposed < retryLoopInterval)
|
||||||
retryLoopInterval = proposed;
|
retryLoopInterval = proposed;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -633,7 +633,7 @@ void MQTTProtocol_keepalive(time_t now)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
//LOG_I("PINGREQ send success\n");
|
LOG_I("PINGREQ send success at %ld\n", (long)now);
|
||||||
client->net.lastSent = now;
|
client->net.lastSent = now;
|
||||||
client->ping_outstanding = 1;
|
client->ping_outstanding = 1;
|
||||||
}
|
}
|
||||||
@ -641,9 +641,25 @@ void MQTTProtocol_keepalive(time_t now)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
Log(TRACE_PROTOCOL, -1, "PINGRESP not received in keepalive interval for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
|
// 增加容错:允许ping_outstanding为2时再等待一个周期
|
||||||
LOG_I("%s:PINGRESP not received\n",__func__);
|
if (client->ping_outstanding >= 2) {
|
||||||
MQTTProtocol_closeSession(client, 1);
|
Log(TRACE_PROTOCOL, -1, "PINGRESP not received after 2 keepalive intervals for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
|
||||||
|
LOG_I("%s:PINGRESP not received after 2 attempts\n",__func__);
|
||||||
|
MQTTProtocol_closeSession(client, 1);
|
||||||
|
} else {
|
||||||
|
// 第二次发送PINGREQ
|
||||||
|
if (Socket_noPendingWrites(client->net.socket)) {
|
||||||
|
if (MQTTPacket_send_pingreq(&client->net, client->clientID) != TCPSOCKET_COMPLETE) {
|
||||||
|
Log(TRACE_PROTOCOL, -1, "Error sending second PINGREQ for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
|
||||||
|
LOG_I("%s:Error sending second PINGREQ for client %s on socket %d, disconnecting\n",__func__,client->clientID, client->net.socket);
|
||||||
|
MQTTProtocol_closeSession(client, 1);
|
||||||
|
} else {
|
||||||
|
LOG_I("Second PINGREQ send success at %ld\n", (long)now);
|
||||||
|
client->net.lastSent = now;
|
||||||
|
client->ping_outstanding++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -178,7 +178,7 @@ int MQTTProtocol_handlePingresps(void* pack, int sock)
|
|||||||
client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
|
client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
|
||||||
Log(LOG_PROTOCOL, 21, NULL, sock, client->clientID);
|
Log(LOG_PROTOCOL, 21, NULL, sock, client->clientID);
|
||||||
client->ping_outstanding = 0;
|
client->ping_outstanding = 0;
|
||||||
//LOG_I("PINGRESP receive success\n");
|
LOG_I("PINGRESP received successfully, ping_outstanding cleared\n");
|
||||||
FUNC_EXIT_RC(rc);
|
FUNC_EXIT_RC(rc);
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -419,6 +419,8 @@ void mqtt_utils_connected(void *context, char *cause){
|
|||||||
if (lightbarHeartbeat[i].isOnline) {
|
if (lightbarHeartbeat[i].isOnline) {
|
||||||
LOG_I("Report again lightbar %08X login\n", lightbarHeartbeat[i].tagCode);
|
LOG_I("Report again lightbar %08X login\n", lightbarHeartbeat[i].tagCode);
|
||||||
report_lightbar_login(lightbarHeartbeat[i].tagCode);
|
report_lightbar_login(lightbarHeartbeat[i].tagCode);
|
||||||
|
// 添加短暂延迟,避免MQTT缓冲区溢出
|
||||||
|
usleep(100 * 1000); // 100ms延迟
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&heartbeatMutex);
|
pthread_mutex_unlock(&heartbeatMutex);
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user