328 lines
11 KiB
Python
328 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
WordPress 发布系统 - 图片处理模块
|
||
处理图片保存、上传到 WordPress 媒体库
|
||
"""
|
||
|
||
import os
|
||
import base64
|
||
import requests
|
||
from io import BytesIO
|
||
from PIL import Image
|
||
|
||
from modules.wp_logger import get_publish_logger, get_debug_logger
|
||
|
||
# 基础目录
|
||
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||
TEMP_DIR = os.path.join(BASE_DIR, 'temp')
|
||
os.makedirs(TEMP_DIR, exist_ok=True)
|
||
|
||
|
||
class ImageHandler:
|
||
"""图片处理器"""
|
||
|
||
def __init__(self, wp_url, wp_user, wp_password):
|
||
"""
|
||
初始化图片处理器
|
||
|
||
Args:
|
||
wp_url: WordPress 站点 URL
|
||
wp_user: WordPress 用户名
|
||
wp_password: WordPress 应用密码
|
||
"""
|
||
self.wp_url = wp_url.rstrip('/')
|
||
self.wp_user = wp_user
|
||
self.wp_password = wp_password
|
||
self.uploaded_images = {} # 记录已上传的图片
|
||
|
||
self.pl = get_publish_logger()
|
||
self.dl = get_debug_logger()
|
||
|
||
def save_temp_image(self, image_data, filename):
|
||
"""
|
||
保存图片到临时目录
|
||
|
||
Args:
|
||
image_data: 图片二进制数据
|
||
filename: 文件名
|
||
|
||
Returns:
|
||
str: 临时文件路径
|
||
"""
|
||
temp_path = os.path.join(TEMP_DIR, filename)
|
||
|
||
try:
|
||
with open(temp_path, 'wb') as f:
|
||
f.write(image_data)
|
||
|
||
self.dl.debug(f"图片已保存:{temp_path}")
|
||
return temp_path
|
||
|
||
except Exception as e:
|
||
self.dl.error(f"保存图片失败:{str(e)}")
|
||
raise
|
||
|
||
def upload_image(self, image_path, title=None, alt_text=None):
|
||
"""
|
||
上传图片到 WordPress 媒体库
|
||
|
||
Args:
|
||
image_path: 图片文件路径
|
||
title: 图片标题
|
||
alt_text: 图片 alt 文本
|
||
|
||
Returns:
|
||
dict: 上传结果,包含 url, id, filename
|
||
"""
|
||
if not os.path.exists(image_path):
|
||
self.pl.error(f"图片文件不存在:{image_path}")
|
||
raise FileNotFoundError(f"图片文件不存在:{image_path}")
|
||
|
||
filename = os.path.basename(image_path)
|
||
if not title:
|
||
title = os.path.splitext(filename)[0]
|
||
if not alt_text:
|
||
alt_text = title
|
||
|
||
self.pl.info(f"📤 上传图片:{filename}")
|
||
self.dl.log_step("上传图片", f"文件:{filename}")
|
||
|
||
try:
|
||
# 读取图片文件
|
||
with open(image_path, 'rb') as f:
|
||
image_content = f.read()
|
||
|
||
self.dl.debug(f"图片文件大小:{len(image_content)} 字节")
|
||
|
||
# 获取 content_type
|
||
content_type = self._get_content_type(image_path)
|
||
|
||
# 构建请求头 - 使用正确的 WordPress 媒体上传格式
|
||
headers = {
|
||
'Content-Disposition': f'attachment; filename="{filename}"',
|
||
'Content-Type': content_type,
|
||
'Content-Transfer-Encoding': 'binary'
|
||
}
|
||
|
||
self.dl.debug(f"请求头:Content-Type={content_type}")
|
||
self.dl.debug(f"请求头:Content-Disposition={headers['Content-Disposition']}")
|
||
|
||
# 上传图片
|
||
response = requests.post(
|
||
f'{self.wp_url}/wp-json/wp/v2/media',
|
||
auth=(self.wp_user, self.wp_password),
|
||
headers=headers,
|
||
data=image_content,
|
||
verify=False, # 跳过 SSL 验证
|
||
timeout=30
|
||
)
|
||
|
||
self.dl.debug(f"响应状态码:{response.status_code}")
|
||
self.dl.debug(f"响应内容:{response.text[:200]}")
|
||
|
||
if response.status_code == 201:
|
||
result = response.json()
|
||
image_url = result.get('source_url', '')
|
||
image_id = result.get('id', 0)
|
||
|
||
# 更新图片标题和 alt
|
||
self._update_image_meta(image_id, title, alt_text)
|
||
|
||
self.pl.success(f"图片上传成功 - ID: {image_id}, URL: {image_url}")
|
||
self.dl.log_result("上传结果", {
|
||
'id': image_id,
|
||
'url': image_url,
|
||
'filename': filename
|
||
})
|
||
|
||
# 记录已上传的图片
|
||
self.uploaded_images[filename] = {
|
||
'id': image_id,
|
||
'url': image_url,
|
||
'title': title
|
||
}
|
||
|
||
return {
|
||
'id': image_id,
|
||
'url': image_url,
|
||
'filename': filename,
|
||
'title': title
|
||
}
|
||
else:
|
||
error_msg = response.text
|
||
self.pl.error(f"图片上传失败 - 状态码:{response.status_code}")
|
||
self.pl.error(f"错误详情:{error_msg}")
|
||
self.dl.error(f"上传失败:{error_msg}")
|
||
raise Exception(f"图片上传失败:{error_msg}")
|
||
|
||
except requests.exceptions.Timeout:
|
||
self.pl.error("图片上传请求超时")
|
||
self.dl.error("图片上传请求超时")
|
||
raise Exception("图片上传请求超时")
|
||
except requests.exceptions.ConnectionError as e:
|
||
self.pl.error(f"图片上传连接错误:{str(e)}")
|
||
self.dl.error(f"图片上传连接错误:{str(e)}")
|
||
raise Exception(f"图片上传连接错误:{str(e)}")
|
||
except Exception as e:
|
||
self.pl.error(f"图片上传异常:{str(e)}")
|
||
self.dl.error(f"上传异常:{str(e)}", exc_info=True)
|
||
raise
|
||
|
||
def upload_images_batch(self, images):
|
||
"""
|
||
批量上传图片
|
||
|
||
Args:
|
||
images: 图片列表,每个图片包含 data, filename
|
||
|
||
Returns:
|
||
list: 上传结果列表
|
||
"""
|
||
results = []
|
||
|
||
for i, image in enumerate(images):
|
||
try:
|
||
# 保存图片到临时目录
|
||
temp_path = self.save_temp_image(image['data'], image['filename'])
|
||
|
||
# 上传图片
|
||
result = self.upload_image(
|
||
temp_path,
|
||
title=f"图片 {i+1}",
|
||
alt_text=f"文章配图 {i+1}"
|
||
)
|
||
|
||
results.append(result)
|
||
|
||
# 清理临时文件
|
||
if os.path.exists(temp_path):
|
||
os.remove(temp_path)
|
||
|
||
except Exception as e:
|
||
self.pl.error(f"图片 {i+1} 上传失败:{str(e)}")
|
||
results.append({'error': str(e), 'filename': image['filename']})
|
||
|
||
return results
|
||
|
||
def generate_image_html(self, image_url, alt_text="", width=None):
|
||
"""
|
||
生成图片 HTML 标签
|
||
|
||
Args:
|
||
image_url: 图片 URL
|
||
alt_text: alt 文本
|
||
width: 图片宽度(可选)
|
||
|
||
Returns:
|
||
str: HTML img 标签
|
||
"""
|
||
style = "max-width: 100%; height: auto; display: block; margin: 16px auto;"
|
||
if width:
|
||
style += f" max-width: {width}px;"
|
||
|
||
html = f'<img src="{image_url}" alt="{alt_text}" style="{style}" loading="lazy">'
|
||
return f'<figure style="text-align: center;">{html}</figure>'
|
||
|
||
def generate_featured_image_shortcode(self, image_url, alt_text=""):
|
||
"""
|
||
生成特色图片短代码
|
||
|
||
Args:
|
||
image_url: 图片 URL
|
||
alt_text: alt 文本
|
||
|
||
Returns:
|
||
str: 特色图片 HTML
|
||
"""
|
||
return self.generate_image_html(image_url, alt_text)
|
||
|
||
def _update_image_meta(self, image_id, title, alt_text):
|
||
"""更新图片元数据"""
|
||
try:
|
||
response = requests.post(
|
||
f'{self.wp_url}/wp-json/wp/v2/media/{image_id}',
|
||
auth=(self.wp_user, self.wp_password),
|
||
json={
|
||
'title': title,
|
||
'alt_text': alt_text
|
||
},
|
||
verify=False,
|
||
timeout=10
|
||
)
|
||
if response.status_code == 200:
|
||
self.dl.debug(f"图片元数据更新成功:ID {image_id}")
|
||
except Exception as e:
|
||
self.dl.warning(f"更新图片元数据失败:{str(e)}")
|
||
|
||
def _get_content_type(self, file_path):
|
||
"""获取图片 content_type"""
|
||
ext = os.path.splitext(file_path)[1].lower()
|
||
content_type_map = {
|
||
'.jpg': 'image/jpeg',
|
||
'.jpeg': 'image/jpeg',
|
||
'.png': 'image/png',
|
||
'.gif': 'image/gif',
|
||
'.bmp': 'image/bmp',
|
||
'.webp': 'image/webp',
|
||
'.svg': 'image/svg+xml'
|
||
}
|
||
return content_type_map.get(ext, 'image/jpeg')
|
||
|
||
def optimize_image(self, image_path, max_width=1200, quality=85):
|
||
"""
|
||
优化图片大小
|
||
|
||
Args:
|
||
image_path: 图片路径
|
||
max_width: 最大宽度
|
||
quality: 质量 (1-100)
|
||
|
||
Returns:
|
||
str: 优化后的图片路径
|
||
"""
|
||
try:
|
||
img = Image.open(image_path)
|
||
|
||
# 获取原始尺寸
|
||
width, height = img.size
|
||
|
||
# 如果宽度超过限制,等比例缩放
|
||
if width > max_width:
|
||
ratio = max_width / width
|
||
new_height = int(height * ratio)
|
||
img = img.resize((max_width, new_height), Image.LANCZOS)
|
||
self.dl.debug(f"图片已缩放:{width}x{height} -> {max_width}x{new_height}")
|
||
|
||
# 保存优化后的图片
|
||
optimized_path = image_path.replace('.', '_optimized.')
|
||
img.save(optimized_path, quality=quality, optimize=True)
|
||
|
||
return optimized_path
|
||
|
||
except Exception as e:
|
||
self.dl.warning(f"图片优化失败:{str(e)}")
|
||
return image_path
|
||
|
||
|
||
def create_image_handler(wp_url, wp_user, wp_password):
|
||
"""创建图片处理器实例"""
|
||
return ImageHandler(wp_url, wp_user, wp_password)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
import sys
|
||
|
||
if len(sys.argv) < 5:
|
||
print("用法:python wp_image_handler.py <wp_url> <wp_user> <wp_password> <image_path>")
|
||
sys.exit(1)
|
||
|
||
wp_url = sys.argv[1]
|
||
wp_user = sys.argv[2]
|
||
wp_password = sys.argv[3]
|
||
image_path = sys.argv[4]
|
||
|
||
handler = create_image_handler(wp_url, wp_user, wp_password)
|
||
result = handler.upload_image(image_path)
|
||
print(f"上传结果:{result}")
|