豌豆Ai站群搜索引擎系统 V.25.10.25 网址:www.wd.chat

🗣 Wd提问: mqtt聊天服务器

🤖 Ai回答:
我来帮你创建一个基于MQTT的聊天服务器。这里提供一个完整的实现方案:

1、服务端实现

python
mqtt_chat_server.py
import paho.mqtt.client as mqtt
import json
import time
from datetime import datetime
import threading

class MQTTChatServer:
def __init__(self, broker_host=localhost, broker_port=1883):
self.broker_host = broker_host
self.broker_port = broker_port
self.client = mqtt.Client()
self.connected_users = {}
self.chat_rooms = {}

设置回调函数
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message

配置用户名密码(如果需要)
self.client.username_pw_set(username, password)

def on_connect(self, client, userdata, flags, rc):
if rc == 0:
print(f聊天服务器连接成功)
订阅系统主题
self.client.subscribe(chat/system/+) 系统消息
self.client.subscribe(chat/user/+/status) 用户状态
self.client.subscribe(chat/room/+/message) 房间消息
self.client.subscribe(chat/private/+/+) 私聊消息
else:
print(f连接失败,错误代码: {rc})

def on_disconnect(self, client, userdata, rc):
print(服务器断开连接)

def on_message(self, client, userdata, msg):
try:
topic = msg.topic
payload = msg.payload.decode('utf-8')

print(f收到消息 主题: {topic}, 内容: {payload})

处理不同类型的订阅
if topic.startswith(chat/user/):
self.handle_user_status(topic, payload)
elif topic.startswith(chat/room/):
self.handle_room_message(topic, payload)
elif topic.startswith(chat/private/):
self.handle_private_message(topic, payload)

except Exception as e:
print(f处理消息时出错: {e})

def handle_user_status(self, topic, payload):
处理用户状态变更
try:
data = json.loads(payload)
user_id = data.get('user_id')
status = data.get('status')

if status == 'online':
self.connected_users[user_id] = {
'timestamp': time.time(),
'client_id': data.get('client_id', '')
}
print(f用户 {user_id} 上线)
elif status == 'offline':
if user_id in self.connected_users:
del self.connected_users[user_id]
print(f用户 {user_id} 下线)

except Exception as e:
print(f处理用户状态时出错: {e})

def handle_room_message(self, topic, payload):
处理房间消息
try:
转发房间消息到其他相关主题
room_topic = topic.replace('/message', '/messages')
self.client.publish(room_topic, payload, qos=1)

except Exception as e:
print(f处理房间消息时出错: {e})

def handle_private_message(self, topic, payload):
处理私聊消息
try:
私聊消息直接转发即可
pass

except Exception as e:
print(f处理私聊消息时出错: {e})

def start_server(self):
启动服务器
try:
self.client.connect(self.broker_host, self.broker_port, 60)
self.client.loop_start()
print(fMQTT聊天服务器已启动 {self.broker_host}:{self.broker_port})
return True
except Exception as e:
print(f启动服务器失败: {e})
return False

def stop_server(self):
停止服务器
self.client.loop_stop()
self.client.disconnect()
print(聊天服务器已停止)

def get_online_users(self):
获取在线用户列表
return list(self.connected_users.keys())

def broadcast_system_message(self, message):
广播系统消息
system_msg = {
'type': 'system',
'content': message,
'timestamp': datetime.now().isoformat()
}
self.client.publish(chat/system/notification,
json.dumps(system_msg), qos=1)

启动服务器
if __name__ == __main__:
server = MQTTChatServer()
if server.start_server():
try:
广播服务器启动消息
server.broadcast_system_message(聊天服务器已启动)

保持服务器运行
while True:
time.sleep(1)

except KeyboardInterrupt:
print(\n正在关闭服务器...)
server.broadcast_system_message(聊天服务器即将关闭)
time.sleep(1)
server.stop_server()

2、客户端实现

python
mqtt_chat_client.py
import paho.mqtt.client as mqtt
import json
import threading
import time
from datetime import datetime

class MQTTChatClient:
def __init__(self, user_id, broker_host=localhost, broker_port=1883):
self.user_id = user_id
self.broker_host = broker_host
self.broker_port = broker_port
self.client = mqtt.Client()
self.is_connected = False
self.current_room = None

设置回调函数
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message

订阅的主题
self.subscribed_topics = set()

def on_connect(self, client, userdata, flags, rc):
if rc == 0:
print(f{self.user_id} 连接成功)
self.is_connected = True
self.send_status('online')
self.subscribe_default_topics()
else:
print(f连接失败,错误代码: {rc})

def on_disconnect(self, client, userdata, rc):
print(f{self.user_id} 断开连接)
self.is_connected = False

def on_message(self, client, userdata, msg):
try:
topic = msg.topic
payload = msg.payload.decode('utf-8')
data = json.loads(payload)

根据主题类型处理消息
if topic.startswith(chat/system/):
self.handle_system_message(data)
elif topic.startswith(chat/room/messages):
self.handle_room_message(data)
elif topic.startswith(chat/private/):
self.handle_private_message(data)
elif topic.endswith(/status):
self.handle_user_status(data)

except Exception as e:
print(f处理消息时出错: {e})

def subscribe_default_topics(self):
订阅默认主题
topics = [
fchat/system/,
fchat/user/{self.user_id}/, 个人相关消息
fchat/private/{self.user_id}/+, 收到的私聊消息
]

for topic in topics:
if topic not in self.subscribed_topics:
self.client.subscribe(topic)
self.subscribed_topics.add(topic)
print(f已订阅: {topic})

def connect_to_server(self):
连接到服务器
try:
self.client.connect(self.broker_host, self.broker_port, 60)
self.client.loop_start()
return True
except Exception as e:
print(f连接服务器失败: {e})
return False

def disconnect_from_server(self):
断开服务器连接
self.send_status('offline')
time.sleep(0.5) 等待离线消息发送
self.client.loop_stop()
self.client.disconnect()

def send_status(self, status):
发送用户状态
status_data = {
'user_id': self.user_id,
'status': status,
'timestamp': datetime.now().isoformat()
}
self.client.publish(fchat/user/{self.user_id}/status,
json.dumps(status_data))

def join_room(self, room_name):
加入聊天室
if self.current_room:
self.leave_room()

self.current_room = room_name
room_topic = fchat/room/{room_name}

订阅房间消息
self.client.subscribe(f{room_topic}/messages)
self.subscribed_topics.add(f{room_topic}/messages)

发送加入房间的消息
join_msg = {
'type': 'join',
'user_id': self.user_id,
'room': room_name,
'timestamp': datetime.now().isoformat()
}
self.client.publish(f{room_topic}/message, json.dumps(join_msg))
print(f已加入房间: {room_name})

def leave_room(self):
离开聊天室
if self.current_room:
发送离开房间的消息
leave_msg = {
'type': 'leave',
'user_id': self.user_id,
'room': self.current_room,
'timestamp': datetime.now().isoformat()
}
self.client.publish(fchat/room/{self.current_room}/message,
json.dumps(leave_msg))
self.current_room = None

def send_room_message(self, content):
发送房间消息
if not self.current_room:
print(请

Ai作答

📣 商家广告


vps

算命

广告招商

广告招商

姓名


0

IP地址: 224.132.242.48

搜索次数: 0

提问时间: 2025-11-04 09:42:22

🛒 域名购买
6.tj.cn
62.gs
yxxg.cn
62.chat
2.bj.cn
279.net.cn
46.gold
08.gold
54.gold
875.net.cn

❓️ 热门提问
香港买域名
内网dns服务
香港粤语版电视剧
tn.us域名
ip paris
路由器搭建vps
企业机房排名
智能云计算平台
云主机与云主机
ip变
豌豆Ai站群搜索引擎系统

🌐 域名评估
vivo.he.cn
kimi.cool
si.sx.cn
vzbw.cn
si.gz.cn
dwtop.com
f.xj.cn
ev.mo.cn
d63.net
sun.hb.cn

⛏ 最新挖掘
凯里酸汤鱼
贵阳烤脑花
都匀毛尖茶
花溪牛肉粉
贵阳烤生蚝
贵阳烤豆腐
贵阳纸包鱼
贵阳特色菜

🖌 热门作画

🤝 关于我们:
豌豆Ai 域名 建站 站群 留痕 推广 评估 源码
开发Ai 工具 日记 价格 加盟 广告 流量 留言 联系

🗨 加入群聊
群

🔗 友情链接
搜索引擎导航  广州月饼生产厂家  ai提问

🧰 站长工具
Ai工具  whois查询  搜索

📢 温馨提示:本站所有问答由Ai自动创作,内容仅供参考,若有误差请用“联系”里面信息通知我们人工修改或删除。

👉 技术支持:本站由豌豆Ai提供技术支持,使用的最新版:《豌豆Ai站群搜索引擎系统 V.25.10.25》搭建本站。

上一篇 58809 58810 58811 下一篇