🗣 Wd提问: mqtt聊天服务器
🤖 Ai回答: 我来帮你创建一个基于MQTT的聊天服务器。这里提供一个完整的实现方案:
1、服务端实现
python
mqtt_chat_server.py
import paho.mqtt.client as mqtt
import json
import time
from datetime import datetime
import threading
class MQTTChatServer:
def __init__(self, broker_host=localhost, broker_port=1883):
self.broker_host = broker_host
self.broker_port = broker_port
self.client = mqtt.Client()
self.connected_users = {}
self.chat_rooms = {}
设置回调函数
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
配置用户名密码(如果需要)
self.client.username_pw_set(username, password)
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
print(f聊天服务器连接成功)
订阅系统主题
self.client.subscribe(chat/system/+) 系统消息
self.client.subscribe(chat/user/+/status) 用户状态
self.client.subscribe(chat/room/+/message) 房间消息
self.client.subscribe(chat/private/+/+) 私聊消息
else:
print(f连接失败,错误代码: {rc})
def on_disconnect(self, client, userdata, rc):
print(服务器断开连接)
def on_message(self, client, userdata, msg):
try:
topic = msg.topic
payload = msg.payload.decode('utf-8')
print(f收到消息 主题: {topic}, 内容: {payload})
处理不同类型的订阅
if topic.startswith(chat/user/):
self.handle_user_status(topic, payload)
elif topic.startswith(chat/room/):
self.handle_room_message(topic, payload)
elif topic.startswith(chat/private/):
self.handle_private_message(topic, payload)
except Exception as e:
print(f处理消息时出错: {e})
def handle_user_status(self, topic, payload):
处理用户状态变更
try:
data = json.loads(payload)
user_id = data.get('user_id')
status = data.get('status')
if status == 'online':
self.connected_users[user_id] = {
'timestamp': time.time(),
'client_id': data.get('client_id', '')
}
print(f用户 {user_id} 上线)
elif status == 'offline':
if user_id in self.connected_users:
del self.connected_users[user_id]
print(f用户 {user_id} 下线)
except Exception as e:
print(f处理用户状态时出错: {e})
def handle_room_message(self, topic, payload):
处理房间消息
try:
转发房间消息到其他相关主题
room_topic = topic.replace('/message', '/messages')
self.client.publish(room_topic, payload, qos=1)
except Exception as e:
print(f处理房间消息时出错: {e})
def handle_private_message(self, topic, payload):
处理私聊消息
try:
私聊消息直接转发即可
pass
except Exception as e:
print(f处理私聊消息时出错: {e})
def start_server(self):
启动服务器
try:
self.client.connect(self.broker_host, self.broker_port, 60)
self.client.loop_start()
print(fMQTT聊天服务器已启动 {self.broker_host}:{self.broker_port})
return True
except Exception as e:
print(f启动服务器失败: {e})
return False
def stop_server(self):
停止服务器
self.client.loop_stop()
self.client.disconnect()
print(聊天服务器已停止)
def get_online_users(self):
获取在线用户列表
return list(self.connected_users.keys())
def broadcast_system_message(self, message):
广播系统消息
system_msg = {
'type': 'system',
'content': message,
'timestamp': datetime.now().isoformat()
}
self.client.publish(chat/system/notification,
json.dumps(system_msg), qos=1)
启动服务器
if __name__ == __main__:
server = MQTTChatServer()
if server.start_server():
try:
广播服务器启动消息
server.broadcast_system_message(聊天服务器已启动)
保持服务器运行
while True:
time.sleep(1)
except KeyboardInterrupt:
print(\n正在关闭服务器...)
server.broadcast_system_message(聊天服务器即将关闭)
time.sleep(1)
server.stop_server()
2、客户端实现
python
mqtt_chat_client.py
import paho.mqtt.client as mqtt
import json
import threading
import time
from datetime import datetime
class MQTTChatClient:
def __init__(self, user_id, broker_host=localhost, broker_port=1883):
self.user_id = user_id
self.broker_host = broker_host
self.broker_port = broker_port
self.client = mqtt.Client()
self.is_connected = False
self.current_room = None
设置回调函数
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
订阅的主题
self.subscribed_topics = set()
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
print(f{self.user_id} 连接成功)
self.is_connected = True
self.send_status('online')
self.subscribe_default_topics()
else:
print(f连接失败,错误代码: {rc})
def on_disconnect(self, client, userdata, rc):
print(f{self.user_id} 断开连接)
self.is_connected = False
def on_message(self, client, userdata, msg):
try:
topic = msg.topic
payload = msg.payload.decode('utf-8')
data = json.loads(payload)
根据主题类型处理消息
if topic.startswith(chat/system/):
self.handle_system_message(data)
elif topic.startswith(chat/room/messages):
self.handle_room_message(data)
elif topic.startswith(chat/private/):
self.handle_private_message(data)
elif topic.endswith(/status):
self.handle_user_status(data)
except Exception as e:
print(f处理消息时出错: {e})
def subscribe_default_topics(self):
订阅默认主题
topics = [
fchat/system/,
fchat/user/{self.user_id}/, 个人相关消息
fchat/private/{self.user_id}/+, 收到的私聊消息
]
for topic in topics:
if topic not in self.subscribed_topics:
self.client.subscribe(topic)
self.subscribed_topics.add(topic)
print(f已订阅: {topic})
def connect_to_server(self):
连接到服务器
try:
self.client.connect(self.broker_host, self.broker_port, 60)
self.client.loop_start()
return True
except Exception as e:
print(f连接服务器失败: {e})
return False
def disconnect_from_server(self):
断开服务器连接
self.send_status('offline')
time.sleep(0.5) 等待离线消息发送
self.client.loop_stop()
self.client.disconnect()
def send_status(self, status):
发送用户状态
status_data = {
'user_id': self.user_id,
'status': status,
'timestamp': datetime.now().isoformat()
}
self.client.publish(fchat/user/{self.user_id}/status,
json.dumps(status_data))
def join_room(self, room_name):
加入聊天室
if self.current_room:
self.leave_room()
self.current_room = room_name
room_topic = fchat/room/{room_name}
订阅房间消息
self.client.subscribe(f{room_topic}/messages)
self.subscribed_topics.add(f{room_topic}/messages)
发送加入房间的消息
join_msg = {
'type': 'join',
'user_id': self.user_id,
'room': room_name,
'timestamp': datetime.now().isoformat()
}
self.client.publish(f{room_topic}/message, json.dumps(join_msg))
print(f已加入房间: {room_name})
def leave_room(self):
离开聊天室
if self.current_room:
发送离开房间的消息
leave_msg = {
'type': 'leave',
'user_id': self.user_id,
'room': self.current_room,
'timestamp': datetime.now().isoformat()
}
self.client.publish(fchat/room/{self.current_room}/message,
json.dumps(leave_msg))
self.current_room = None
def send_room_message(self, content):
发送房间消息
if not self.current_room:
print(请
📣 商家广告 :
👍
0
👎
IP地址: 224.132.242.48
搜索次数: 0
提问时间: 2025-11-04 09:42:22
🛒 域名购买 :
6.tj.cn 62.gs yxxg.cn 62.chat 2.bj.cn 279.net.cn 46.gold 08.gold 54.gold 875.net.cn
❓️ 热门提问 :
香港买域名 内网dns服务 香港粤语版电视剧 tn.us域名 ip paris 路由器搭建vps 企业机房排名 智能云计算平台 云主机与云主机 ip变 豌豆Ai站群搜索引擎系统
🌐 域名评估 :
vivo.he.cn kimi.cool si.sx.cn vzbw.cn si.gz.cn dwtop.com f.xj.cn ev.mo.cn d63.net sun.hb.cn
⛏ 最新挖掘 :
凯里酸汤鱼 贵阳烤脑花 都匀毛尖茶 花溪牛肉粉 贵阳烤生蚝 贵阳烤豆腐 贵阳纸包鱼 贵阳特色菜
🖌 热门作画 :
🤝 关于我们:
豌豆Ai
域名
建站
站群
留痕
推广
评估
源码
开发Ai
工具
日记
价格
加盟
广告
流量
留言
联系
🗨 加入群聊 :
🔗 友情链接 :
搜索引擎导航
广州月饼生产厂家
ai提问
🧰 站长工具 :Ai工具 whois查询 搜索
📢 温馨提示 :本站所有问答由Ai自动创作,内容仅供参考,若有误差请用“联系”里面信息通知我们人工修改或删除。
👉 技术支持 :本站由豌豆Ai 提供技术支持,使用的最新版:《豌豆Ai站群搜索引擎系统 V.25.10.25》搭建本站。