- 飞书消息接收与处理(文字、图片、Word 文档) - WordPress REST API 文章发布 - 图片自动上传到媒体库 - Word 文档解析与发布 - HTML 格式化与分类自动匹配 - Python CLI 工具(避免 shell 引号冲突) - Webhook 服务器(8080 端口) - 完整日志系统
199 lines
6.3 KiB
Python
199 lines
6.3 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
飞书机器人 Webhook 服务器
|
|
接收飞书消息推送,处理并回复
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import logging
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
|
from datetime import datetime
|
|
|
|
# 添加项目根目录到 Python 路径
|
|
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
sys.path.insert(0, BASE_DIR)
|
|
|
|
from feishu_bot import handle_message
|
|
|
|
# 配置日志
|
|
LOG_DIR = os.path.join(BASE_DIR, 'logs')
|
|
os.makedirs(LOG_DIR, exist_ok=True)
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.FileHandler(os.path.join(LOG_DIR, 'webhook_server.log'), encoding='utf-8'),
|
|
logging.StreamHandler()
|
|
]
|
|
)
|
|
logger = logging.getLogger('webhook_server')
|
|
|
|
|
|
class FeishuWebhookHandler(BaseHTTPRequestHandler):
|
|
"""飞书 Webhook 处理器"""
|
|
|
|
def do_POST(self):
|
|
"""处理 POST 请求"""
|
|
try:
|
|
# 读取请求体
|
|
content_length = int(self.headers.get('Content-Length', 0))
|
|
if content_length == 0:
|
|
logger.warning("收到空请求")
|
|
self._send_response(400, {'error': 'Empty request body'})
|
|
return
|
|
|
|
post_data = self.rfile.read(content_length)
|
|
|
|
# 解析 JSON
|
|
try:
|
|
message_data = json.loads(post_data.decode('utf-8'))
|
|
except json.JSONDecodeError as e:
|
|
logger.error(f"JSON 解析失败:{str(e)}")
|
|
logger.error(f"原始数据:{post_data[:200]}")
|
|
self._send_response(400, {'error': 'Invalid JSON'})
|
|
return
|
|
|
|
logger.info(f"📨 收到飞书消息")
|
|
logger.debug(f"请求数据:{json.dumps(message_data, ensure_ascii=False)}")
|
|
|
|
# 处理事件
|
|
if message_data.get('header', {}).get('event_type') == 'url_verification':
|
|
# URL 验证
|
|
self._handle_url_verification(message_data)
|
|
elif message_data.get('header', {}).get('event_type') == 'im.message.receive_v1':
|
|
# 消息接收事件
|
|
self._handle_message_event(message_data)
|
|
else:
|
|
logger.warning(f"未知事件类型:{message_data.get('header', {}).get('event_type')}")
|
|
self._send_response(200, {'error': 'Unknown event type'})
|
|
|
|
except Exception as e:
|
|
logger.error(f"处理请求失败:{str(e)}", exc_info=True)
|
|
self._send_response(500, {'error': str(e)})
|
|
|
|
def _handle_url_verification(self, message_data):
|
|
"""
|
|
处理 URL 验证
|
|
|
|
Args:
|
|
message_data: 验证数据
|
|
"""
|
|
challenge = message_data.get('challenge', '')
|
|
logger.info(f"🔐 URL 验证:{challenge}")
|
|
self._send_response(200, {'challenge': challenge})
|
|
|
|
def _handle_message_event(self, message_data):
|
|
"""
|
|
处理消息事件
|
|
|
|
Args:
|
|
message_data: 消息数据
|
|
"""
|
|
try:
|
|
# 提取消息内容
|
|
event = message_data.get('event', {})
|
|
message = event.get('message', {})
|
|
sender = event.get('sender', {})
|
|
|
|
# 构建消息对象
|
|
msg = {
|
|
'msg_type': message.get('message_type', ''),
|
|
'content': message.get('content', ''),
|
|
'sender': {
|
|
'sender_id': sender.get('sender_id', ''),
|
|
'sender_type': sender.get('sender_type', '')
|
|
},
|
|
'message_id': message.get('message_id', ''),
|
|
'chat_id': message.get('chat_id', '')
|
|
}
|
|
|
|
# 处理消息
|
|
reply = handle_message(msg)
|
|
|
|
# 发送回复
|
|
self._send_reply(msg.get('chat_id', ''), msg.get('message_id', ''), reply)
|
|
|
|
logger.info(f"✅ 消息处理完成")
|
|
|
|
except Exception as e:
|
|
logger.error(f"处理消息事件失败:{str(e)}", exc_info=True)
|
|
|
|
def _send_reply(self, chat_id, message_id, reply_text):
|
|
"""
|
|
发送回复
|
|
|
|
Args:
|
|
chat_id: 聊天 ID
|
|
message_id: 消息 ID
|
|
reply_text: 回复内容
|
|
"""
|
|
# TODO: 实现回复发送逻辑
|
|
logger.info(f"📤 准备回复 - 聊天:{chat_id}, 消息:{message_id}")
|
|
logger.info(f"回复内容:{reply_text}")
|
|
|
|
def _send_response(self, status_code, data):
|
|
"""
|
|
发送响应
|
|
|
|
Args:
|
|
status_code: HTTP 状态码
|
|
data: 响应数据
|
|
"""
|
|
self.send_response(status_code)
|
|
self.send_header('Content-Type', 'application/json')
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps(data).encode('utf-8'))
|
|
|
|
def do_GET(self):
|
|
"""处理 GET 请求"""
|
|
self.send_response(200)
|
|
self.send_header('Content-Type', 'text/html; charset=utf-8')
|
|
self.end_headers()
|
|
|
|
html = """
|
|
<html>
|
|
<head><title>飞书机器人 Webhook 服务</title></head>
|
|
<body>
|
|
<h1>🤖 飞书机器人 Webhook 服务</h1>
|
|
<p>服务运行正常 ✅</p>
|
|
<p>时间:{time}</p>
|
|
</body>
|
|
</html>
|
|
""".format(time=datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
|
|
|
|
self.wfile.write(html.encode('utf-8'))
|
|
|
|
|
|
def start_server(host='0.0.0.0', port=8080):
|
|
"""
|
|
启动 Webhook 服务器
|
|
|
|
Args:
|
|
host: 监听地址
|
|
port: 监听端口
|
|
"""
|
|
server = HTTPServer((host, port), FeishuWebhookHandler)
|
|
logger.info(f"🌐 Webhook 服务器启动 - http://{host}:{port}")
|
|
|
|
try:
|
|
server.serve_forever()
|
|
except KeyboardInterrupt:
|
|
logger.info("🛑 服务器关闭")
|
|
server.shutdown()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(description='飞书机器人 Webhook 服务器')
|
|
parser.add_argument('--host', default='0.0.0.0', help='监听地址')
|
|
parser.add_argument('--port', type=int, default=8080, help='监听端口')
|
|
|
|
args = parser.parse_args()
|
|
|
|
start_server(args.host, args.port)
|