#include "mqttclient.h"

#include <QDateTime>
#include <QDebug>
#include <QTcpSocket>
#include <QTimer>

namespace {

// MQTT control packet types (upper nibble of the first fixed-header byte).
constexpr quint8 kPacketTypeMask = 0xF0;
constexpr quint8 kPacketConnAck = 0x20;
constexpr quint8 kPacketPublish = 0x30;
constexpr quint8 kPacketPingResp = 0xD0;

// The "remaining length" field of the fixed header uses at most 4 bytes,
// each carrying 7 bits of payload, hence the 268,435,455 limit.
constexpr int kMaxLengthBytes = 4;
constexpr int kMaxRemainingLength = 268435455;

// MQTT strings carry a 16-bit big-endian byte-length prefix.
constexpr int kMaxStringBytes = 0xFFFF;

constexpr quint16 kKeepAliveSeconds = 60;
constexpr int kHeartbeatIntervalMs = 30000;

/// Encodes @p length with the MQTT variable-length scheme (7 data bits per
/// byte, bit 7 set on every byte except the last).
QByteArray encodeRemainingLength(int length)
{
    QByteArray encoded;
    do {
        quint8 digit = static_cast<quint8>(length % 128);
        length /= 128;
        if (length > 0) {
            digit |= 0x80;
        }
        encoded.append(static_cast<char>(digit));
    } while (length > 0);
    return encoded;
}

/// Appends @p utf8 to @p out preceded by its 16-bit big-endian byte length.
/// The caller is responsible for ensuring the data fits in 65535 bytes.
void appendLengthPrefixed(QByteArray &out, const QByteArray &utf8)
{
    out.append(static_cast<char>((utf8.size() >> 8) & 0xFF));
    out.append(static_cast<char>(utf8.size() & 0xFF));
    out.append(utf8);
}

/// Returns the size of the fixed header (type byte plus 1-4 length bytes)
/// at the start of @p packet.
int fixedHeaderLength(const QByteArray &packet)
{
    int length = 1;
    while (length < packet.size() && length <= kMaxLengthBytes) {
        const bool more = (static_cast<quint8>(packet.at(length)) & 0x80) != 0;
        ++length;
        if (!more) {
            break;
        }
    }
    return length;
}

} // namespace

/// Creates the client, its TCP socket and the heartbeat timer, and wires the
/// socket/timer signals to the private slots of this class.
MqttClient::MqttClient(QObject *parent)
    : QObject(parent)
    , m_socket(new QTcpSocket(this))
    , m_isConnected(false)
    , m_username("")
    , m_password("")
{
    connect(m_socket, &QTcpSocket::connected, this, &MqttClient::onConnected);
    connect(m_socket, &QTcpSocket::disconnected, this, &MqttClient::onDisconnected);
    connect(m_socket, &QTcpSocket::readyRead, this, &MqttClient::onReadyRead);
    connect(m_socket, QOverload<QAbstractSocket::SocketError>::of(&QAbstractSocket::errorOccurred),
            this, &MqttClient::onError);

    // Heartbeat timer; started once the broker accepts the connection.
    m_heartbeatTimer = new QTimer(this);
    connect(m_heartbeatTimer, &QTimer::timeout, this, &MqttClient::sendHeartbeat);
}

/// Closes the TCP connection if it is still open.
MqttClient::~MqttClient()
{
    if (m_socket->state() == QAbstractSocket::ConnectedState) {
        m_socket->disconnectFromHost();
    }
}

/// Starts an asynchronous TCP connection to the broker. The MQTT CONNECT
/// packet is sent from onConnected() once the socket is up.
///
/// The call is ignored unless the socket is idle: calling
/// QAbstractSocket::connectToHost() while a connection is in progress or
/// established is an error in Qt.
void MqttClient::connectToHost(const QString &host, quint16 port)
{
    if (m_socket->state() != QAbstractSocket::UnconnectedState) {
        return;
    }

    m_host = host;
    m_port = port;

    qDebug() << "连接到MQTT服务器: " << host << ":" << port;

    m_socket->connectToHost(host, port);
}

/// Builds and sends an MQTT 3.1 ("MQIsdp", protocol level 3) CONNECT packet
/// with Clean Session set, a 60 s keep-alive, a timestamp-based client ID and
/// the configured credentials.
///
/// A password is only sent together with a user name, because MQTT 3.1 does
/// not allow the Password flag without the User Name flag.
void MqttClient::sendConnectPacket()
{
    const QString clientId = QString("QtClient_%1").arg(QDateTime::currentMSecsSinceEpoch());
    const QByteArray clientIdUtf8 = clientId.toUtf8();
    const QByteArray usernameUtf8 = m_username.toUtf8();
    const QByteArray passwordUtf8 = m_password.toUtf8();

    if (usernameUtf8.size() > kMaxStringBytes || passwordUtf8.size() > kMaxStringBytes) {
        emit errorOccurred("用户名或密码过长");
        m_socket->disconnectFromHost();
        return;
    }

    const bool hasUsername = !usernameUtf8.isEmpty();
    const bool hasPassword = hasUsername && !passwordUtf8.isEmpty();
    if (!hasUsername && !passwordUtf8.isEmpty()) {
        qWarning() << "未设置用户名,密码将被忽略";
    }

    // Variable header: protocol name, protocol level, flags, keep-alive.
    QByteArray variableHeader;
    appendLengthPrefixed(variableHeader, QByteArray("MQIsdp"));
    variableHeader.append(static_cast<char>(0x03));

    quint8 connectFlags = 0x02; // Clean Session
    if (hasUsername) {
        connectFlags |= 0x80;   // User Name flag
    }
    if (hasPassword) {
        connectFlags |= 0x40;   // Password flag
    }
    variableHeader.append(static_cast<char>(connectFlags));

    variableHeader.append(static_cast<char>(kKeepAliveSeconds >> 8));
    variableHeader.append(static_cast<char>(kKeepAliveSeconds & 0xFF));

    // Payload: client ID, then the optional user name and password. Length
    // prefixes count UTF-8 bytes, not UTF-16 code units.
    QByteArray payload;
    appendLengthPrefixed(payload, clientIdUtf8);
    if (hasUsername) {
        appendLengthPrefixed(payload, usernameUtf8);
    }
    if (hasPassword) {
        appendLengthPrefixed(payload, passwordUtf8);
    }

    QByteArray packet;
    packet.append(static_cast<char>(0x10)); // CONNECT
    packet.append(encodeRemainingLength(static_cast<int>(variableHeader.size() + payload.size())));
    packet.append(variableHeader);
    packet.append(payload);

    qDebug() << "发送MQTT 3.1 CONNECT包,长度:" << packet.length();
    qDebug() << "协议: MQIsdp v3.1";
    qDebug() << "用户名:" << m_username;
    qDebug() << "客户端ID:" << clientId;

    m_socket->write(packet);
    m_socket->flush();
}

/// Sets both credentials used by the next CONNECT packet.
void MqttClient::setCredentials(const QString &username, const QString &password)
{
    m_username = username;
    m_password = password;
}

/// Sets the user name used by the next CONNECT packet.
void MqttClient::setUsername(const QString &username)
{
    m_username = username;
}

/// Sets the password used by the next CONNECT packet.
void MqttClient::setPassword(const QString &password)
{
    m_password = password;
}

/// Stops the heartbeat and closes the connection. On an established TCP
/// connection an MQTT DISCONNECT packet is sent first; a connection attempt
/// that is still in progress is aborted so that it cannot complete later.
void MqttClient::disconnectFromHost()
{
    m_heartbeatTimer->stop();

    const QAbstractSocket::SocketState state = m_socket->state();
    if (state == QAbstractSocket::ConnectedState) {
        QByteArray disconnectPacket;
        disconnectPacket.append(static_cast<char>(0xE0)); // DISCONNECT
        disconnectPacket.append(static_cast<char>(0x00)); // remaining length 0
        m_socket->write(disconnectPacket);
        m_socket->disconnectFromHost();
    } else if (state != QAbstractSocket::UnconnectedState) {
        m_socket->abort();
    }
}

/// Returns true once the broker has accepted the CONNECT packet (CONNACK
/// with return code 0) and until the connection is lost or closed.
bool MqttClient::isConnected() const
{
    return m_isConnected;
}

/// Publishes @p message on @p topic with QoS 0 (no DUP, no RETAIN).
///
/// @return true if the packet was handed to the socket; false (after
///         emitting errorOccurred) if the client is not connected or the
///         topic/message cannot be encoded.
bool MqttClient::publish(const QString &topic, const QString &message)
{
    if (!m_isConnected) {
        emit errorOccurred("未连接到MQTT服务器。。。");
        return false;
    }

    const QByteArray topicUtf8 = topic.toUtf8();
    if (topicUtf8.isEmpty() || topicUtf8.size() > kMaxStringBytes) {
        emit errorOccurred("主题无效:长度必须为1到65535字节");
        return false;
    }

    // Variable header (topic name) followed by the application payload.
    QByteArray body;
    appendLengthPrefixed(body, topicUtf8);
    body.append(message.toUtf8());

    if (body.size() > kMaxRemainingLength) {
        emit errorOccurred("消息过长,超出MQTT报文长度上限");
        return false;
    }

    QByteArray packet;
    packet.append(static_cast<char>(0x30)); // PUBLISH, QoS 0
    packet.append(encodeRemainingLength(static_cast<int>(body.size())));
    packet.append(body);

    qDebug() << "发送PUBLISH消息,主题:" << topic
             << ",消息长度:" << message.length()
             << ",总包长度:" << packet.length();

    const qint64 written = m_socket->write(packet);
    return written > 0;
}

/// Subscribes to @p topic with requested QoS 0. Emits errorOccurred if the
/// client is not connected or the topic cannot be encoded.
///
/// The packet identifier is always 1; the SUBACK reply is not evaluated.
void MqttClient::subscribe(const QString &topic)
{
    if (!m_isConnected) {
        emit errorOccurred("未连接到MQTT服务器");
        return;
    }

    const QByteArray topicUtf8 = topic.toUtf8();
    if (topicUtf8.isEmpty() || topicUtf8.size() > kMaxStringBytes) {
        emit errorOccurred("主题无效:长度必须为1到65535字节");
        return;
    }

    QByteArray body;
    body.append(static_cast<char>(0x00)); // packet identifier, high byte
    body.append(static_cast<char>(0x01)); // packet identifier, low byte
    appendLengthPrefixed(body, topicUtf8);
    body.append(static_cast<char>(0x00)); // requested QoS

    QByteArray packet;
    packet.append(static_cast<char>(0x82)); // SUBSCRIBE
    // The remaining length needs more than one byte as soon as the body
    // exceeds 127 bytes, so it must go through the variable-length encoder.
    packet.append(encodeRemainingLength(static_cast<int>(body.size())));
    packet.append(body);

    m_socket->write(packet);
}

/// Socket slot: the TCP connection is up, so start the MQTT handshake.
void MqttClient::onConnected()
{
    qDebug() << "TCP连接已建立,发送MQTT CONNECT包";

    // Bytes left over from a previous session must not be parsed as part of
    // this one.
    m_receiveBuffer.clear();

    sendConnectPacket();
}

/// Socket slot: the TCP connection was closed.
void MqttClient::onDisconnected()
{
    m_isConnected = false;
    m_heartbeatTimer->stop();
    m_receiveBuffer.clear();
    emit disconnected();
}

/// Socket slot: appends the newly received bytes to the receive buffer and
/// dispatches every complete MQTT packet found in it. Incomplete packets
/// stay in the buffer until more data arrives.
void MqttClient::onReadyRead()
{
    m_receiveBuffer.append(m_socket->readAll());

    while (m_receiveBuffer.length() >= 2) {
        // Decode the variable-length "remaining length" field.
        int remainingLength = 0;
        int multiplier = 1;
        int pos = 1;
        bool lengthComplete = false;

        while (pos < m_receiveBuffer.length() && pos <= kMaxLengthBytes) {
            const quint8 byte = static_cast<quint8>(m_receiveBuffer.at(pos));
            remainingLength += (byte & 0x7F) * multiplier;
            multiplier *= 128;
            ++pos;
            if ((byte & 0x80) == 0) {
                lengthComplete = true;
                break;
            }
        }

        if (!lengthComplete) {
            if (pos > kMaxLengthBytes) {
                // Four length bytes all had the continuation bit set: the
                // stream is corrupt and cannot be resynchronised.
                m_receiveBuffer.clear();
                emit errorOccurred("收到格式错误的MQTT报文:剩余长度字段无效");
                m_socket->abort();
            }
            // Otherwise the length field is simply incomplete; wait for more.
            return;
        }

        const int totalMessageLength = pos + remainingLength;
        if (m_receiveBuffer.length() < totalMessageLength) {
            return; // packet body not fully received yet
        }

        const QByteArray completeMessage = m_receiveBuffer.left(totalMessageLength);
        m_receiveBuffer.remove(0, totalMessageLength);

        processCompleteMessage(completeMessage);
    }
}

/// Handles one complete MQTT packet (fixed header included).
///
/// - CONNACK: on return code 0 marks the client connected, starts the 30 s
///   heartbeat and emits connected(); otherwise emits errorOccurred().
/// - PUBLISH: emits messageReceived(topic, payload). Malformed packets are
///   logged and dropped.
/// - PINGRESP and every other packet type are ignored.
void MqttClient::processCompleteMessage(const QByteArray &data)
{
    if (data.length() < 2) {
        return;
    }

    const quint8 firstByte = static_cast<quint8>(data.at(0));
    const quint8 messageType = firstByte & kPacketTypeMask;

    switch (messageType) {
    case kPacketConnAck: {
        if (data.length() < 4) {
            break;
        }
        // Byte 2 is reserved / session flags, byte 3 is the return code.
        const quint8 returnCode = static_cast<quint8>(data.at(3));
        if (returnCode == 0x00) {
            m_isConnected = true;
            m_heartbeatTimer->start(kHeartbeatIntervalMs);
            emit connected();
            break;
        }

        QString errorMsg;
        switch (returnCode) {
        case 0x01:
            errorMsg = "连接被拒绝:协议版本不支持";
            break;
        case 0x02:
            errorMsg = "连接被拒绝:客户端ID不合法";
            break;
        case 0x03:
            errorMsg = "连接被拒绝:服务器不可用";
            break;
        case 0x04:
            errorMsg = "连接被拒绝:用户名或密码错误";
            break;
        case 0x05:
            errorMsg = "连接被拒绝:未授权";
            break;
        default:
            errorMsg = QString("连接被拒绝:未知错误码 0x%1")
                           .arg(static_cast<int>(returnCode), 2, 16, QChar('0'));
            break;
        }
        emit errorOccurred(errorMsg);
        break;
    }

    case kPacketPublish: {
        // The fixed header is 2-5 bytes long depending on the packet size,
        // so the topic does not necessarily start at offset 2.
        int pos = fixedHeaderLength(data);
        const int total = static_cast<int>(data.length());

        if (pos + 2 > total) {
            qWarning() << "Dropping malformed PUBLISH packet: missing topic length";
            break;
        }
        const int topicLength = (static_cast<quint8>(data.at(pos)) << 8)
                              | static_cast<quint8>(data.at(pos + 1));
        pos += 2;

        if (topicLength == 0 || pos + topicLength > total) {
            qWarning() << "Dropping malformed PUBLISH packet: invalid topic length" << topicLength;
            break;
        }
        const QString topic = QString::fromUtf8(data.mid(pos, topicLength));
        pos += topicLength;

        // QoS 1/2 messages carry a 2-byte packet identifier before the payload.
        const quint8 qos = (firstByte >> 1) & 0x03;
        if (qos > 0) {
            if (pos + 2 > total) {
                qWarning() << "Dropping malformed PUBLISH packet: missing packet identifier";
                break;
            }
            pos += 2;
        }

        const QString message = QString::fromUtf8(data.mid(pos));
        emit messageReceived(topic, message);
        break;
    }

    case kPacketPingResp:
        // Heartbeat reply; nothing to do.
        break;

    default:
        break;
    }
}

/// Socket slot: translates the socket error into a message, marks the
/// client disconnected, stops the heartbeat and emits errorOccurred().
void MqttClient::onError(QAbstractSocket::SocketError error)
{
    QString errorString;

    switch (error) {
    case QAbstractSocket::ConnectionRefusedError:
        errorString = "连接被拒绝";
        break;
    case QAbstractSocket::RemoteHostClosedError:
        errorString = "远程主机关闭连接";
        break;
    case QAbstractSocket::HostNotFoundError:
        errorString = "主机未找到";
        break;
    case QAbstractSocket::SocketTimeoutError:
        errorString = "连接超时";
        break;
    default:
        errorString = QString("网络错误: %1").arg(m_socket->errorString());
        break;
    }

    m_isConnected = false;
    m_heartbeatTimer->stop();
    emit errorOccurred(errorString);
}

/// Timer slot: sends an MQTT PINGREQ while the session is established.
void MqttClient::sendHeartbeat()
{
    if (m_isConnected && m_socket->state() == QAbstractSocket::ConnectedState) {
        QByteArray pingPacket;
        pingPacket.append(static_cast<char>(0xC0)); // PINGREQ
        pingPacket.append(static_cast<char>(0x00)); // remaining length 0
        m_socket->write(pingPacket);
    }
}