#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ WordPress CLI 工具 - 供 OpenClaw 调用 通过 stdin 接收 JSON 数据,避免 shell 引号冲突问题 """ import sys import json import requests import argparse import os def main(): parser = argparse.ArgumentParser(description='WordPress CLI Tool') parser.add_argument('action', choices=['create', 'update', 'get', 'list', 'delete', 'upload'], help='操作类型') parser.add_argument('--url', required=True, help='WordPress URL') parser.add_argument('--user', required=True, help='WordPress 用户名') parser.add_argument('--password', required=True, help='WordPress 应用密码') parser.add_argument('--type', choices=['posts', 'pages', 'categories', 'tags', 'media'], default='posts', help='资源类型') parser.add_argument('--id', type=int, help='资源 ID(用于 update/get/delete)') parser.add_argument('--data-file', help='JSON 数据文件路径(推荐,避免 shell 引号问题)') parser.add_argument('--search', help='搜索关键词') parser.add_argument('--per-page', type=int, default=10, help='每页数量') parser.add_argument('--page', type=int, default=1, help='页码') args = parser.parse_args() # 构建基础 URL base_url = args.url.rstrip('/') api_base = f'{base_url}/wp-json/wp/v2/{args.type}' # 读取 JSON 数据(优先从文件读取,避免 shell 引号问题) data = {} if args.data_file: if os.path.exists(args.data_file): with open(args.data_file, 'r', encoding='utf-8') as f: data = json.load(f) else: print(json.dumps({'success': False, 'error': f'数据文件不存在:{args.data_file}'})) sys.exit(1) elif not sys.stdin.isatty(): # 从 stdin 读取 try: stdin_data = sys.stdin.read() if stdin_data.strip(): data = json.loads(stdin_data) except json.JSONDecodeError as e: print(json.dumps({'success': False, 'error': f'Stdin JSON 解析失败:{str(e)}'})) sys.exit(1) try: if args.action == 'create': result = create_resource(api_base, args.user, args.password, data) elif args.action == 'update': if not args.id: print(json.dumps({'success': False, 'error': '更新操作需要 --id 参数'})) sys.exit(1) result = update_resource(f'{api_base}/{args.id}', args.user, args.password, data) elif args.action == 'get': if not args.id: print(json.dumps({'success': False, 'error': '获取操作需要 --id 参数'})) sys.exit(1) result = get_resource(f'{api_base}/{args.id}', args.user, args.password) elif args.action == 'list': result = list_resources(api_base, args.user, args.password, search=args.search, per_page=args.per_page, page=args.page) elif args.action == 'delete': if not args.id: print(json.dumps({'success': False, 'error': '删除操作需要 --id 参数'})) sys.exit(1) result = delete_resource(f'{api_base}/{args.id}', args.user, args.password) elif args.action == 'upload': if not args.data_file: print(json.dumps({'success': False, 'error': '上传操作需要 --data-file 指定媒体文件'})) sys.exit(1) result = upload_media(api_base, args.user, args.password, args.data_file, data) # 输出结果 print(json.dumps(result, ensure_ascii=False, indent=2)) except Exception as e: print(json.dumps({'success': False, 'error': str(e)}, ensure_ascii=False)) sys.exit(1) def create_resource(api_url, user, password, data): """创建资源""" response = requests.post( api_url, auth=(user, password), json=data, verify=False, timeout=30 ) if response.status_code == 201: result = response.json() return { 'success': True, 'id': result.get('id'), 'url': result.get('link', ''), 'title': result.get('title', {}).get('rendered', '') if isinstance(result.get('title'), dict) else result.get('title', ''), 'data': result } else: return { 'success': False, 'error': response.text, 'status_code': response.status_code } def update_resource(api_url, user, password, data): """更新资源""" response = requests.post( api_url, auth=(user, password), json=data, verify=False, timeout=30 ) if response.status_code == 200: result = response.json() return { 'success': True, 'id': result.get('id'), 'data': result } else: return { 'success': False, 'error': response.text, 'status_code': response.status_code } def get_resource(api_url, user, password): """获取资源""" response = requests.get( api_url, auth=(user, password), verify=False, timeout=30 ) if response.status_code == 200: return { 'success': True, 'data': response.json() } else: return { 'success': False, 'error': response.text, 'status_code': response.status_code } def list_resources(api_url, user, password, search=None, per_page=10, page=1): """列出资源""" params = { 'per_page': per_page, 'page': page } if search: params['search'] = search response = requests.get( api_url, auth=(user, password), params=params, verify=False, timeout=30 ) if response.status_code == 200: return { 'success': True, 'data': response.json(), 'total': response.headers.get('X-WP-Total', 0), 'total_pages': response.headers.get('X-WP-TotalPages', 0) } else: return { 'success': False, 'error': response.text, 'status_code': response.status_code } def delete_resource(api_url, user, password): """删除资源""" response = requests.delete( api_url, auth=(user, password), params={'force': True}, verify=False, timeout=30 ) if response.status_code == 200: return { 'success': True, 'data': response.json() } else: return { 'success': False, 'error': response.text, 'status_code': response.status_code } def upload_media(api_url, user, password, file_path, metadata=None): """上传媒体文件""" if not os.path.exists(file_path): return {'success': False, 'error': f'文件不存在:{file_path}'} filename = os.path.basename(file_path) content_type = get_content_type(file_path) headers = { 'Content-Disposition': f'attachment; filename="{filename}"', 'Content-Type': content_type } with open(file_path, 'rb') as f: file_data = f.read() response = requests.post( api_url, auth=(user, password), headers=headers, data=file_data, verify=False, timeout=60 ) if response.status_code == 201: result = response.json() media_id = result.get('id') # 更新元数据 if metadata and media_id: update_url = f'{api_url}/{media_id}' requests.post( update_url, auth=(user, password), json=metadata, verify=False, timeout=10 ) return { 'success': True, 'id': media_id, 'url': result.get('source_url', ''), 'filename': filename, 'data': result } else: return { 'success': False, 'error': response.text, 'status_code': response.status_code } def get_content_type(file_path): """获取文件 MIME 类型""" ext = os.path.splitext(file_path)[1].lower() content_types = { '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', '.gif': 'image/gif', '.webp': 'image/webp', '.svg': 'image/svg+xml', '.pdf': 'application/pdf', '.doc': 'application/msword', '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' } return content_types.get(ext, 'application/octet-stream') if __name__ == '__main__': main()