feishu_fabu/webhook_server.py
wp-publish-bot 1fb93e34c6 feat: 初始化 WordPress 自动发布系统(飞书机器人集成)
- 飞书消息接收与处理(文字、图片、Word 文档)
- WordPress REST API 文章发布
- 图片自动上传到媒体库
- Word 文档解析与发布
- HTML 格式化与分类自动匹配
- Python CLI 工具(避免 shell 引号冲突)
- Webhook 服务器(8080 端口)
- 完整日志系统
2026-05-12 15:09:30 +08:00

199 lines
6.3 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
飞书机器人 Webhook 服务器
接收飞书消息推送,处理并回复
"""
import os
import sys
import json
import logging
from http.server import HTTPServer, BaseHTTPRequestHandler
from datetime import datetime
# 添加项目根目录到 Python 路径
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
from feishu_bot import handle_message
# 配置日志
LOG_DIR = os.path.join(BASE_DIR, 'logs')
os.makedirs(LOG_DIR, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(os.path.join(LOG_DIR, 'webhook_server.log'), encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger('webhook_server')
class FeishuWebhookHandler(BaseHTTPRequestHandler):
"""飞书 Webhook 处理器"""
def do_POST(self):
"""处理 POST 请求"""
try:
# 读取请求体
content_length = int(self.headers.get('Content-Length', 0))
if content_length == 0:
logger.warning("收到空请求")
self._send_response(400, {'error': 'Empty request body'})
return
post_data = self.rfile.read(content_length)
# 解析 JSON
try:
message_data = json.loads(post_data.decode('utf-8'))
except json.JSONDecodeError as e:
logger.error(f"JSON 解析失败:{str(e)}")
logger.error(f"原始数据:{post_data[:200]}")
self._send_response(400, {'error': 'Invalid JSON'})
return
logger.info(f"📨 收到飞书消息")
logger.debug(f"请求数据:{json.dumps(message_data, ensure_ascii=False)}")
# 处理事件
if message_data.get('header', {}).get('event_type') == 'url_verification':
# URL 验证
self._handle_url_verification(message_data)
elif message_data.get('header', {}).get('event_type') == 'im.message.receive_v1':
# 消息接收事件
self._handle_message_event(message_data)
else:
logger.warning(f"未知事件类型:{message_data.get('header', {}).get('event_type')}")
self._send_response(200, {'error': 'Unknown event type'})
except Exception as e:
logger.error(f"处理请求失败:{str(e)}", exc_info=True)
self._send_response(500, {'error': str(e)})
def _handle_url_verification(self, message_data):
"""
处理 URL 验证
Args:
message_data: 验证数据
"""
challenge = message_data.get('challenge', '')
logger.info(f"🔐 URL 验证:{challenge}")
self._send_response(200, {'challenge': challenge})
def _handle_message_event(self, message_data):
"""
处理消息事件
Args:
message_data: 消息数据
"""
try:
# 提取消息内容
event = message_data.get('event', {})
message = event.get('message', {})
sender = event.get('sender', {})
# 构建消息对象
msg = {
'msg_type': message.get('message_type', ''),
'content': message.get('content', ''),
'sender': {
'sender_id': sender.get('sender_id', ''),
'sender_type': sender.get('sender_type', '')
},
'message_id': message.get('message_id', ''),
'chat_id': message.get('chat_id', '')
}
# 处理消息
reply = handle_message(msg)
# 发送回复
self._send_reply(msg.get('chat_id', ''), msg.get('message_id', ''), reply)
logger.info(f"✅ 消息处理完成")
except Exception as e:
logger.error(f"处理消息事件失败:{str(e)}", exc_info=True)
def _send_reply(self, chat_id, message_id, reply_text):
"""
发送回复
Args:
chat_id: 聊天 ID
message_id: 消息 ID
reply_text: 回复内容
"""
# TODO: 实现回复发送逻辑
logger.info(f"📤 准备回复 - 聊天:{chat_id}, 消息:{message_id}")
logger.info(f"回复内容:{reply_text}")
def _send_response(self, status_code, data):
"""
发送响应
Args:
status_code: HTTP 状态码
data: 响应数据
"""
self.send_response(status_code)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(data).encode('utf-8'))
def do_GET(self):
"""处理 GET 请求"""
self.send_response(200)
self.send_header('Content-Type', 'text/html; charset=utf-8')
self.end_headers()
html = """
<html>
<head><title>飞书机器人 Webhook 服务</title></head>
<body>
<h1>🤖 飞书机器人 Webhook 服务</h1>
<p>服务运行正常 ✅</p>
<p>时间:{time}</p>
</body>
</html>
""".format(time=datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
self.wfile.write(html.encode('utf-8'))
def start_server(host='0.0.0.0', port=8080):
"""
启动 Webhook 服务器
Args:
host: 监听地址
port: 监听端口
"""
server = HTTPServer((host, port), FeishuWebhookHandler)
logger.info(f"🌐 Webhook 服务器启动 - http://{host}:{port}")
try:
server.serve_forever()
except KeyboardInterrupt:
logger.info("🛑 服务器关闭")
server.shutdown()
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='飞书机器人 Webhook 服务器')
parser.add_argument('--host', default='0.0.0.0', help='监听地址')
parser.add_argument('--port', type=int, default=8080, help='监听端口')
args = parser.parse_args()
start_server(args.host, args.port)