feishu_fabu/scripts/wp_cli.py
wp-publish-bot 1fb93e34c6 feat: 初始化 WordPress 自动发布系统(飞书机器人集成)
- 飞书消息接收与处理(文字、图片、Word 文档)
- WordPress REST API 文章发布
- 图片自动上传到媒体库
- Word 文档解析与发布
- HTML 格式化与分类自动匹配
- Python CLI 工具(避免 shell 引号冲突)
- Webhook 服务器(8080 端口)
- 完整日志系统
2026-05-12 15:09:30 +08:00

293 lines
8.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
WordPress CLI 工具 - 供 OpenClaw 调用
通过 stdin 接收 JSON 数据,避免 shell 引号冲突问题
"""
import sys
import json
import requests
import argparse
import os
def main():
    """Parse CLI arguments, run one WordPress REST action, and print JSON.

    Input handling:
        * For every action except ``upload``, ``--data-file`` (when given) is
          read as a UTF-8 JSON document and used as the request payload.
        * For ``upload``, ``--data-file`` names the media file itself and is
          never parsed as JSON.
        * When ``--data-file`` is absent and stdin is not a terminal, the
          payload is read from stdin as JSON (empty stdin means no payload).

    Every outcome is printed to stdout as a JSON object. The process exits
    with status 1 on missing/invalid input and on exceptions raised while
    performing the action. A non-success HTTP response is only reported in
    the printed JSON (``"success": false``); the exit status is unchanged.
    """
    parser = argparse.ArgumentParser(description='WordPress CLI Tool')
    parser.add_argument('action', choices=['create', 'update', 'get', 'list', 'delete', 'upload'],
                        help='操作类型')
    parser.add_argument('--url', required=True, help='WordPress URL')
    parser.add_argument('--user', required=True, help='WordPress 用户名')
    parser.add_argument('--password', required=True, help='WordPress 应用密码')
    parser.add_argument('--type', choices=['posts', 'pages', 'categories', 'tags', 'media'],
                        default='posts', help='资源类型')
    parser.add_argument('--id', type=int, help='资源 ID用于 update/get/delete')
    parser.add_argument('--data-file', help='JSON 数据文件路径(推荐,避免 shell 引号问题)')
    parser.add_argument('--search', help='搜索关键词')
    parser.add_argument('--per-page', type=int, default=10, help='每页数量')
    parser.add_argument('--page', type=int, default=1, help='页码')
    args = parser.parse_args()

    # Build the REST collection URL. Uploads always target the media
    # collection: posting a raw file to any other collection cannot succeed,
    # and --type defaults to 'posts'.
    base_url = args.url.rstrip('/')
    resource_type = 'media' if args.action == 'upload' else args.type
    api_base = f'{base_url}/wp-json/wp/v2/{resource_type}'

    # Load the JSON payload (file first, stdin as the fallback).
    data = {}
    if args.data_file:
        if not os.path.exists(args.data_file):
            print(json.dumps({'success': False, 'error': f'数据文件不存在:{args.data_file}'}))
            sys.exit(1)
        # For uploads the file is the media itself (often binary), so it must
        # not be fed to the JSON parser.
        if args.action != 'upload':
            try:
                with open(args.data_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                # Report malformed input as JSON instead of a raw traceback.
                print(json.dumps({'success': False, 'error': f'数据文件 JSON 解析失败:{str(e)}'}))
                sys.exit(1)
    elif not sys.stdin.isatty():
        # Read the payload from stdin.
        try:
            stdin_data = sys.stdin.read()
            if stdin_data.strip():
                data = json.loads(stdin_data)
        except json.JSONDecodeError as e:
            print(json.dumps({'success': False, 'error': f'Stdin JSON 解析失败:{str(e)}'}))
            sys.exit(1)

    try:
        if args.action == 'create':
            result = create_resource(api_base, args.user, args.password, data)
        elif args.action == 'update':
            if not args.id:
                print(json.dumps({'success': False, 'error': '更新操作需要 --id 参数'}))
                sys.exit(1)
            result = update_resource(f'{api_base}/{args.id}', args.user, args.password, data)
        elif args.action == 'get':
            if not args.id:
                print(json.dumps({'success': False, 'error': '获取操作需要 --id 参数'}))
                sys.exit(1)
            result = get_resource(f'{api_base}/{args.id}', args.user, args.password)
        elif args.action == 'list':
            result = list_resources(api_base, args.user, args.password,
                                    search=args.search, per_page=args.per_page, page=args.page)
        elif args.action == 'delete':
            if not args.id:
                print(json.dumps({'success': False, 'error': '删除操作需要 --id 参数'}))
                sys.exit(1)
            result = delete_resource(f'{api_base}/{args.id}', args.user, args.password)
        elif args.action == 'upload':
            if not args.data_file:
                print(json.dumps({'success': False, 'error': '上传操作需要 --data-file 指定媒体文件'}))
                sys.exit(1)
            result = upload_media(api_base, args.user, args.password, args.data_file, data)
        # Emit the result. sys.exit() above raises SystemExit, which is not an
        # Exception subclass, so it is not swallowed by the handler below.
        print(json.dumps(result, ensure_ascii=False, indent=2))
    except Exception as e:
        print(json.dumps({'success': False, 'error': str(e)}, ensure_ascii=False))
        sys.exit(1)
def create_resource(api_url, user, password, data):
    """Create a resource by POSTing ``data`` as JSON to ``api_url``.

    Uses HTTP basic auth with ``user``/``password`` and a 30 second timeout.
    NOTE(review): the request is sent with ``verify=False``, i.e. without
    TLS certificate verification.

    Returns:
        On HTTP 201, a dict with ``success`` True, the new ``id``, its
        ``url`` (the ``link`` field), its ``title`` and the full decoded
        response under ``data``. On any other status, a dict with
        ``success`` False, the raw response text as ``error`` and the
        ``status_code``.
    """
    reply = requests.post(
        api_url,
        auth=(user, password),
        json=data,
        verify=False,
        timeout=30,
    )
    if reply.status_code != 201:
        return {
            'success': False,
            'error': reply.text,
            'status_code': reply.status_code,
        }

    created = reply.json()
    # The title may be an object carrying a 'rendered' field or a plain value.
    title = created.get('title', '')
    if isinstance(title, dict):
        title = title.get('rendered', '')
    return {
        'success': True,
        'id': created.get('id'),
        'url': created.get('link', ''),
        'title': title,
        'data': created,
    }
def update_resource(api_url, user, password, data):
    """Update the resource at ``api_url`` by POSTing ``data`` as JSON.

    Uses HTTP basic auth with ``user``/``password`` and a 30 second timeout.
    NOTE(review): the request is sent with ``verify=False``, i.e. without
    TLS certificate verification.

    Returns:
        On HTTP 200, a dict with ``success`` True, the resource ``id`` and
        the decoded response under ``data``. Otherwise a dict with
        ``success`` False, the raw response text as ``error`` and the
        ``status_code``.
    """
    reply = requests.post(
        api_url,
        auth=(user, password),
        json=data,
        verify=False,
        timeout=30,
    )
    if reply.status_code != 200:
        return {
            'success': False,
            'error': reply.text,
            'status_code': reply.status_code,
        }

    updated = reply.json()
    return {
        'success': True,
        'id': updated.get('id'),
        'data': updated,
    }
def get_resource(api_url, user, password):
    """Fetch the single resource at ``api_url`` with a GET request.

    Uses HTTP basic auth with ``user``/``password`` and a 30 second timeout.
    NOTE(review): the request is sent with ``verify=False``, i.e. without
    TLS certificate verification.

    Returns:
        On HTTP 200, ``{'success': True, 'data': <decoded JSON>}``.
        Otherwise a dict with ``success`` False, the raw response text as
        ``error`` and the ``status_code``.
    """
    reply = requests.get(
        api_url,
        auth=(user, password),
        verify=False,
        timeout=30,
    )
    succeeded = reply.status_code == 200
    if succeeded:
        outcome = {'success': True, 'data': reply.json()}
    else:
        outcome = {
            'success': False,
            'error': reply.text,
            'status_code': reply.status_code,
        }
    return outcome
def list_resources(api_url, user, password, search=None, per_page=10, page=1):
    """List resources from the collection at ``api_url``.

    Args:
        api_url: Collection URL to query.
        user: Username for HTTP basic auth.
        password: Password for HTTP basic auth.
        search: Optional search keyword; sent as the ``search`` query
            parameter only when truthy.
        per_page: Page size, sent as ``per_page``.
        page: Page number, sent as ``page``.

    Returns:
        On HTTP 200, a dict with ``success`` True, the decoded response
        under ``data``, and ``total`` / ``total_pages`` taken from the
        ``X-WP-Total`` / ``X-WP-TotalPages`` response headers as integers
        (0 when a header is missing or not numeric). Otherwise a dict with
        ``success`` False, the raw response text as ``error`` and the
        ``status_code``.

    NOTE(review): the request is sent with ``verify=False``, i.e. without
    TLS certificate verification.
    """
    params = {
        'per_page': per_page,
        'page': page,
    }
    if search:
        params['search'] = search

    response = requests.get(
        api_url,
        auth=(user, password),
        params=params,
        verify=False,
        timeout=30,
    )
    if response.status_code != 200:
        return {
            'success': False,
            'error': response.text,
            'status_code': response.status_code,
        }

    def header_count(name):
        # Header values arrive as strings; convert them so the result has a
        # single type whether or not the header is present.
        try:
            return int(response.headers.get(name, 0))
        except (TypeError, ValueError):
            return 0

    return {
        'success': True,
        'data': response.json(),
        'total': header_count('X-WP-Total'),
        'total_pages': header_count('X-WP-TotalPages'),
    }
def delete_resource(api_url, user, password):
    """Delete the resource at ``api_url`` with a DELETE request.

    The request carries the query parameter ``force`` set to True, uses HTTP
    basic auth with ``user``/``password`` and a 30 second timeout.
    NOTE(review): the request is sent with ``verify=False``, i.e. without
    TLS certificate verification.

    Returns:
        On HTTP 200, ``{'success': True, 'data': <decoded JSON>}``.
        Otherwise a dict with ``success`` False, the raw response text as
        ``error`` and the ``status_code``.
    """
    query = {'force': True}
    reply = requests.delete(
        api_url,
        auth=(user, password),
        params=query,
        verify=False,
        timeout=30,
    )
    if reply.status_code != 200:
        return {
            'success': False,
            'error': reply.text,
            'status_code': reply.status_code,
        }
    return {'success': True, 'data': reply.json()}
def upload_media(api_url, user, password, file_path, metadata=None):
    """Upload a local file as a media item, then optionally set its metadata.

    Args:
        api_url: URL the raw file body is POSTed to.
        user: Username for HTTP basic auth.
        password: Password for HTTP basic auth.
        file_path: Path of the local file to upload.
        metadata: Optional dict POSTed as JSON to ``{api_url}/{id}`` after a
            successful upload.

    Returns:
        * ``{'success': False, 'error': ...}`` when ``file_path`` does not
          exist.
        * On HTTP 201, a dict with ``success`` True, the media ``id``, its
          ``url`` (the ``source_url`` field), the uploaded ``filename`` and
          the decoded response under ``data``. When the metadata request
          succeeds, ``data``/``url`` come from its response; when it fails,
          the upload is still reported as successful and the failure is
          described under the extra key ``metadata_error``.
        * On any other status, a dict with ``success`` False, the raw
          response text as ``error`` and the ``status_code``.

    NOTE(review): requests are sent with ``verify=False``, i.e. without TLS
    certificate verification.
    """
    if not os.path.exists(file_path):
        return {'success': False, 'error': f'文件不存在:{file_path}'}

    filename = os.path.basename(file_path)
    content_type = get_content_type(file_path)
    headers = {
        # Header values given as str are encoded as latin-1 by the HTTP
        # layer, which raises for e.g. Chinese filenames. Sending UTF-8
        # bytes avoids that and is byte-identical for ASCII names.
        'Content-Disposition': f'attachment; filename="{filename}"'.encode('utf-8'),
        'Content-Type': content_type
    }
    with open(file_path, 'rb') as f:
        file_data = f.read()

    response = requests.post(
        api_url,
        auth=(user, password),
        headers=headers,
        data=file_data,
        verify=False,
        timeout=60
    )
    if response.status_code != 201:
        return {
            'success': False,
            'error': response.text,
            'status_code': response.status_code
        }

    result = response.json()
    media_id = result.get('id')

    # Apply the metadata. The file is already stored at this point, so a
    # failure here is reported alongside the successful upload rather than
    # turning the whole call into an error.
    metadata_error = None
    if metadata and media_id:
        update_url = f'{api_url}/{media_id}'
        try:
            meta_response = requests.post(
                update_url,
                auth=(user, password),
                json=metadata,
                verify=False,
                timeout=10
            )
            if meta_response.status_code == 200:
                updated = meta_response.json()
                if isinstance(updated, dict):
                    result = updated
            else:
                metadata_error = meta_response.text
        except (requests.RequestException, ValueError) as exc:
            metadata_error = str(exc)

    outcome = {
        'success': True,
        'id': media_id,
        'url': result.get('source_url', ''),
        'filename': filename,
        'data': result
    }
    if metadata_error is not None:
        outcome['metadata_error'] = metadata_error
    return outcome
def get_content_type(file_path):
    """Return the MIME type for ``file_path`` based on its extension.

    The extension is compared case-insensitively against a fixed table;
    anything not in the table yields ``application/octet-stream``.
    """
    _, suffix = os.path.splitext(file_path)
    known_types = dict([
        ('.jpg', 'image/jpeg'),
        ('.jpeg', 'image/jpeg'),
        ('.png', 'image/png'),
        ('.gif', 'image/gif'),
        ('.webp', 'image/webp'),
        ('.svg', 'image/svg+xml'),
        ('.pdf', 'application/pdf'),
        ('.doc', 'application/msword'),
        ('.docx', 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'),
    ])
    try:
        return known_types[suffix.lower()]
    except KeyError:
        return 'application/octet-stream'
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()