Files
haha/gui_worker.py
2026-02-09 00:03:22 +08:00

478 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""后台工作线程:执行单个/批量上传任务,与 main.Pdd 交互。"""
import os
import time
from pathlib import Path
from PyQt5.QtCore import QThread, pyqtSignal
from main import Pdd
from loguru import logger
from gui_constants import get_default_folder_path, VIDEO_EXTENSIONS
class WorkerThread(QThread):
"""工作线程,用于执行自动化任务"""
finished = pyqtSignal(bool, str)
log_message = pyqtSignal(str)
progress = pyqtSignal(int)
item_result = pyqtSignal(object) # 单条发布结果dict
def __init__(self, config_data, is_batch_mode, prepared_files=None, parent=None):
super().__init__(parent)
self.config_data = config_data
self.is_batch_mode = is_batch_mode
self.prepared_files = prepared_files # 预查找的文件列表
self.is_running = True
def run(self):
sink_id = None
start_ts = time.time()
try:
sink_id = logger.add(lambda msg: self.log_message.emit(str(msg).strip()))
if self.is_batch_mode:
self.run_batch_mode()
else:
self.run_single_mode()
except Exception as e:
error_msg = f"执行失败: {str(e)}"
self.finished.emit(False, error_msg)
self.log_message.emit(error_msg)
logger.error(f"执行失败: {e}")
import traceback
traceback_str = traceback.format_exc()
logger.error(traceback_str)
self.log_message.emit(f"错误详情: {traceback_str}")
finally:
try:
cost = time.time() - start_ts
self.log_message.emit(f"[线程] 任务结束,耗时 {cost:.2f}s")
except Exception:
pass
if sink_id is not None:
try:
logger.remove(sink_id)
except Exception:
pass
def run_single_mode(self):
"""执行单个上传模式action方法"""
try:
config = self.config_data
self.log_message.emit(f"开始处理单个任务(逐个上传模式)...")
pdd = Pdd(
url=config.get('达人链接', ''),
user_id=config.get('多多id', ''),
time_start=config.get('定时发布', '') if config.get('定时发布') else None,
ht=config.get('话题', ''),
index=config.get('序号', ''),
title=config.get('标题', None)
)
if self.prepared_files:
self.log_message.emit(f"使用预查找的文件列表,共 {len(self.prepared_files)} 个文件")
self.progress.emit(30)
# 将预查找的文件按类型分类,直接传递给 action(),跳过文件夹扫描
prepared_video_files = [f['path'] for f in self.prepared_files
if f['path'].is_file() and any(
f['path'].suffix.lower() == ext for ext in VIDEO_EXTENSIONS)]
prepared_image_folders = [f['path'] for f in self.prepared_files
if f['path'].is_dir()]
self.log_message.emit(f"预查找文件: {len(prepared_video_files)} 个视频, {len(prepared_image_folders)} 个图片文件夹")
try:
result = pdd.action(
prepared_video_files=prepared_video_files,
prepared_image_folders=prepared_image_folders,
collect_all_videos=False
)
ok = bool(result.get("ok")) if isinstance(result, dict) else False
reason = result.get("reason", "未知原因") if isinstance(result, dict) else "未返回结果"
if not ok:
self.log_message.emit(f"失败原因: {reason}")
self.item_result.emit({
"user_id": config.get("多多id", ""),
"index": config.get("序号", ""),
"name": config.get("标题", "") or "",
"ok": ok,
"reason": reason,
})
self.progress.emit(100)
self.finished.emit(ok, f"单个任务执行完成ok={ok}")
except Exception as e:
error_msg = f"执行action方法失败: {str(e)}"
self.log_message.emit(error_msg)
logger.error(error_msg)
import traceback
traceback_str = traceback.format_exc()
logger.error(traceback_str)
self.log_message.emit(f"错误详情: {traceback_str}")
self.finished.emit(False, error_msg)
return
else:
folder_path = config.get('文件夹路径', '').strip()
if not folder_path:
folder_path = get_default_folder_path()
if not os.path.exists(folder_path):
error_msg = f"文件夹路径不存在: {folder_path}"
self.finished.emit(False, error_msg)
self.log_message.emit(error_msg)
return
self.log_message.emit(f"使用文件夹路径: {folder_path}")
self.progress.emit(30)
is_batch_mode = self.is_batch_mode
try:
result = pdd.action(folder_path=folder_path, collect_all_videos=is_batch_mode)
ok = bool(result.get("ok")) if isinstance(result, dict) else False
reason = result.get("reason", "未知原因") if isinstance(result, dict) else "未返回结果"
if not ok:
self.log_message.emit(f"失败原因: {reason}")
self.item_result.emit({
"user_id": config.get("多多id", ""),
"index": config.get("序号", ""),
"name": config.get("标题", "") or "",
"ok": ok,
"reason": reason,
})
except Exception as e:
error_msg = f"执行action方法失败: {str(e)}"
self.log_message.emit(error_msg)
logger.error(error_msg)
import traceback
traceback_str = traceback.format_exc()
logger.error(traceback_str)
self.log_message.emit(f"错误详情: {traceback_str}")
self.finished.emit(False, error_msg)
return
self.progress.emit(100)
self.finished.emit(ok, f"单个任务执行完成ok={ok}")
except Exception as e:
error_msg = f"执行失败: {str(e)}"
self.finished.emit(False, error_msg)
self.log_message.emit(error_msg)
logger.error(f"执行失败: {e}")
import traceback
traceback_str = traceback.format_exc()
logger.error(traceback_str)
self.log_message.emit(f"错误详情: {traceback_str}")
def run_batch_mode(self):
"""执行批量上传模式:先批量上传所有视频,然后逐个上传图片"""
try:
config = self.config_data
self.log_message.emit(f"开始处理批量上传任务...")
index = config.get('序号', '')
pdd = Pdd(
url=config.get('达人链接', ''),
user_id=config.get('多多id', ''),
time_start=config.get('定时发布', '') if config.get('定时发布') else None,
ht=config.get('话题', ''),
index=index,
title=config.get('标题', None)
)
if self.prepared_files:
self.log_message.emit(f"使用预查找的文件列表,共 {len(self.prepared_files)} 个文件")
self.progress.emit(30)
video_file_paths = [f for f in self.prepared_files if f['path'].is_file() and any(
f['path'].suffix.lower() == ext for ext in VIDEO_EXTENSIONS)]
image_folders = [f for f in self.prepared_files if f['path'].is_dir()]
if video_file_paths:
self.log_message.emit(f"找到 {len(video_file_paths)} 个视频文件,开始批量上传...")
self.progress.emit(30)
def on_video_done(result_item):
try:
payload = dict(result_item)
payload["user_id"] = config.get("多多id", "")
if "index" not in payload or not payload.get("index"):
for vid_file in video_file_paths:
if str(vid_file.get('path')) == str(payload.get('path')):
payload["index"] = vid_file.get("index", config.get("序号", ""))
break
if "index" not in payload or not payload.get("index"):
payload["index"] = config.get("序号", "")
self.item_result.emit(payload)
self.log_message.emit(f" {'' if payload.get('ok') else ''} {payload.get('name', '')} 已更新状态")
except Exception as e:
self.log_message.emit(f" 回调处理异常: {e}")
result = pdd.action1(folder_path=video_file_paths, on_item_done=on_video_done)
ok = bool(result.get("ok")) if isinstance(result, dict) else False
if not ok:
fails = [r for r in (result.get("results") or []) if not r.get("ok")]
self.log_message.emit(f"发布校验失败条数: {len(fails)}")
for r in fails[:20]:
self.log_message.emit(f"{r.get('name')} | {r.get('reason')}")
self.log_message.emit(f"批量上传视频完成,共处理 {len(video_file_paths)} 个视频")
else:
self.log_message.emit("未找到视频文件,跳过视频上传")
ok = True
if image_folders:
self.log_message.emit(f"找到 {len(image_folders)} 个图片文件夹,开始逐个上传...")
self.progress.emit(70)
for idx, image_folder_info in enumerate(image_folders):
if not self.is_running:
break
image_folder_path = image_folder_info['path']
self.log_message.emit(f"处理第 {idx + 1}/{len(image_folders)} 个图片文件夹: {image_folder_path}")
# 直接传递图片文件夹路径,跳过文件夹扫描
pdd.action(prepared_image_folders=[image_folder_path])
self.log_message.emit(f"图片文件夹 {idx + 1} 上传完成")
self.log_message.emit(f"所有图片文件夹上传完成,共处理 {len(image_folders)} 个文件夹")
else:
self.log_message.emit("未找到图片文件夹,跳过图片上传")
self.progress.emit(100)
self.finished.emit(ok,
f"批量任务执行完成,共处理 {len(video_file_paths)} 个视频和 {len(image_folders)} 个图片文件夹ok={ok}")
else:
folder_path = config.get('文件夹路径', '').strip()
if not folder_path:
folder_path = get_default_folder_path()
if not os.path.exists(folder_path):
self.finished.emit(False, f"文件夹路径不存在: {folder_path}")
return
self.log_message.emit("第一步:收集所有视频文件...")
video_file_paths = self.prepare_batch_files(folder_path, config)
self.log_message.emit("第二步:收集所有图片文件夹...")
image_folders = self.prepare_image_folders(folder_path, index)
if video_file_paths:
self.log_message.emit(f"找到 {len(video_file_paths)} 个视频文件,开始批量上传...")
self.progress.emit(30)
pdd.action1(folder_path=video_file_paths)
self.log_message.emit(f"批量上传视频完成,共处理 {len(video_file_paths)} 个视频")
else:
self.log_message.emit("未找到视频文件,跳过视频上传")
if image_folders:
self.log_message.emit(f"找到 {len(image_folders)} 个图片文件夹,开始逐个上传...")
self.progress.emit(70)
for idx, image_folder in enumerate(image_folders):
if not self.is_running:
break
self.log_message.emit(f"处理第 {idx + 1}/{len(image_folders)} 个图片文件夹: {image_folder}")
# 直接传递图片文件夹路径,跳过文件夹扫描
pdd.action(prepared_image_folders=[Path(image_folder)])
self.log_message.emit(f"图片文件夹 {idx + 1} 上传完成")
self.log_message.emit(f"所有图片文件夹上传完成,共处理 {len(image_folders)} 个文件夹")
else:
self.log_message.emit("未找到图片文件夹,跳过图片上传")
self.progress.emit(100)
total_count = len(video_file_paths) + len(image_folders)
self.finished.emit(True,
f"批量任务执行完成,共处理 {len(video_file_paths)} 个视频和 {len(image_folders)} 个图片文件夹")
except Exception as e:
self.finished.emit(False, f"批量执行失败: {str(e)}")
logger.error(f"批量执行失败: {e}")
def prepare_image_folders(self, folder_path, index):
"""收集所有匹配序号的图片文件夹与main.py中action方法的逻辑一致"""
image_folders = []
try:
if not folder_path or not os.path.exists(folder_path):
logger.warning(f"文件夹路径无效或不存在: {folder_path}")
return image_folders
if not os.access(folder_path, os.R_OK):
logger.error(f"没有权限读取文件夹: {folder_path}")
return image_folders
try:
subdirs = os.listdir(folder_path)
except PermissionError:
logger.error(f"没有权限访问文件夹: {folder_path}")
return image_folders
except Exception as e:
logger.error(f"读取文件夹失败: {e}")
return image_folders
for file in subdirs:
try:
file_path = os.path.join(folder_path, file)
if not os.path.isdir(file_path):
continue
try:
files = os.listdir(file_path)
except (PermissionError, Exception) as e:
logger.warning(f"读取子文件夹失败: {file_path}, 错误: {e}")
continue
for file_name in files:
try:
if file_name.startswith('.'):
continue
file_names = file_name.split("-")
if len(file_names) > 0 and file_names[0] == str(index):
path = Path(os.path.join(file_path, file_name))
if path.exists() and path.is_dir():
image_folders.append(str(path))
except Exception as e:
logger.warning(f"处理文件 {file_name} 时出错: {e}")
continue
except Exception as e:
logger.warning(f"处理子文件夹 {file} 时出错: {e}")
continue
except Exception as e:
logger.error(f"收集图片文件夹失败: {e}", exc_info=True)
return image_folders
def count_videos_in_folder(self, folder_path, index):
"""统计文件夹中匹配序号的视频文件数量与main.py逻辑一致"""
count = 0
try:
for file in os.listdir(folder_path):
file_path = os.path.join(folder_path, file)
if not os.path.isdir(file_path):
continue
files = os.listdir(file_path)
for file_name in files:
if ".mp4" in file_name:
file_names = file_name.split("-")
if len(file_names) > 0 and file_names[0] == str(index):
path = Path(os.path.join(file_path, file_name))
if path.is_file():
count += 1
else:
for sub_file in os.listdir(path):
sub_path = Path(os.path.join(path, sub_file))
if sub_path.is_file():
count += 1
except Exception as e:
logger.error(f"统计视频文件失败: {e}")
return count
def prepare_batch_files(self, folder_path, config):
"""准备批量上传的文件列表与main.py中的action1方法逻辑一致"""
file_paths = []
if not config:
logger.error("配置为空,无法准备批量上传文件")
return file_paths
index = str(config.get('序号', '')).strip()
if not index:
logger.warning("序号为空,无法准备批量上传文件")
return file_paths
if not folder_path or not os.path.exists(folder_path):
logger.error(f"文件夹路径无效或不存在: {folder_path}")
return file_paths
if not os.access(folder_path, os.R_OK):
logger.error(f"没有权限读取文件夹: {folder_path}")
return file_paths
logger.info("=" * 50)
logger.info("开始准备批量上传文件列表...")
logger.info(f"文件夹路径: {folder_path}")
logger.info(f"查找序号: {index}")
try:
try:
all_items = os.listdir(folder_path)
subdirs = [f for f in all_items if os.path.isdir(os.path.join(folder_path, f))]
except PermissionError:
logger.error(f"没有权限访问文件夹: {folder_path}")
return file_paths
except Exception as e:
logger.error(f"读取文件夹失败: {e}")
return file_paths
logger.info(f"在最外层文件夹下找到 {len(subdirs)} 个子文件夹多多ID文件夹")
for subdir_name in subdirs:
file_path = os.path.join(folder_path, subdir_name)
logger.info(f" 正在扫描子文件夹: {subdir_name}")
if not os.path.isdir(file_path):
logger.info(f" 跳过(不是文件夹)")
continue
files = os.listdir(file_path)
logger.info(f" 该文件夹下有 {len(files)} 个项目")
for file_name in files:
logger.info(f" 检查项目: {file_name}")
if ".mp4" in file_name:
logger.info(f" ✓ 是视频文件(包含.mp4")
file_names = file_name.split("-")
logger.info(f" 文件名分割结果: {file_names}")
if len(file_names) > 0 and file_names[0] == str(index):
logger.info(f" ✓ 序号匹配!序号: {file_names[0]}, 目标序号: {index}")
path = Path(os.path.join(file_path, file_name))
if path.is_file():
logger.info(f" ✓ 是文件,添加到列表: {path.name}")
file_paths.append({
"url": config.get('达人链接', ''),
"user_id": config.get('多多id', ''),
"time_start": config.get('定时发布', '') if config.get('定时发布') else None,
"ht": config.get('话题', ''),
"index": str(index),
"path": path
})
logger.info(f" 当前视频总数: {len(file_paths)}")
else:
logger.info(f" 是文件夹,扫描其中的文件...")
try:
sub_files = os.listdir(path)
logger.info(f" 文件夹中有 {len(sub_files)} 个文件")
for sub_file in sub_files:
sub_path = Path(os.path.join(path, sub_file))
if sub_path.is_file():
logger.info(f" ✓ 添加文件: {sub_file}")
file_paths.append({
"url": config.get('达人链接', ''),
"user_id": config.get('多多id', ''),
"time_start": config.get('定时发布', '') if config.get('定时发布') else None,
"ht": config.get('话题', ''),
"index": str(index),
"path": sub_path
})
logger.info(f" 当前视频总数: {len(file_paths)}")
except Exception as e:
logger.error(f" 扫描子文件夹失败: {e}")
else:
if len(file_names) > 0:
logger.info(f" ✗ 序号不匹配: {file_names[0]} != {index}")
else:
logger.info(f" ✗ 文件名格式不正确,无法提取序号")
else:
logger.info(f" ✗ 不是视频文件(不包含.mp4跳过")
except Exception as e:
logger.error(f"准备批量文件失败: {e}")
import traceback
traceback.print_exc()
logger.info("=" * 50)
logger.info(f"文件收集完成!共找到 {len(file_paths)} 个视频文件")
if file_paths:
logger.info("视频文件列表:")
for idx, video_info in enumerate(file_paths, 1):
logger.info(f" {idx}. {video_info['path'].name} ({video_info['path']})")
return file_paths
def stop(self):
"""停止执行"""
self.is_running = False
def requestInterruption(self):
"""请求中断线程重写QThread方法"""
super().requestInterruption()
self.is_running = False