Files
haha/gui_worker.py

478 lines
24 KiB
Python
Raw Normal View History

2026-01-31 10:42:28 +08:00
# -*- coding: utf-8 -*-
"""后台工作线程:执行单个/批量上传任务,与 main.Pdd 交互。"""
import os
import time
from pathlib import Path
from PyQt5.QtCore import QThread, pyqtSignal
from main import Pdd
from loguru import logger
from gui_constants import get_default_folder_path, VIDEO_EXTENSIONS
class WorkerThread(QThread):
"""工作线程,用于执行自动化任务"""
finished = pyqtSignal(bool, str)
log_message = pyqtSignal(str)
progress = pyqtSignal(int)
item_result = pyqtSignal(object) # 单条发布结果dict
def __init__(self, config_data, is_batch_mode, prepared_files=None, parent=None):
super().__init__(parent)
self.config_data = config_data
self.is_batch_mode = is_batch_mode
self.prepared_files = prepared_files # 预查找的文件列表
self.is_running = True
def run(self):
sink_id = None
start_ts = time.time()
try:
sink_id = logger.add(lambda msg: self.log_message.emit(str(msg).strip()))
if self.is_batch_mode:
self.run_batch_mode()
else:
self.run_single_mode()
except Exception as e:
error_msg = f"执行失败: {str(e)}"
self.finished.emit(False, error_msg)
self.log_message.emit(error_msg)
logger.error(f"执行失败: {e}")
import traceback
traceback_str = traceback.format_exc()
logger.error(traceback_str)
self.log_message.emit(f"错误详情: {traceback_str}")
finally:
try:
cost = time.time() - start_ts
self.log_message.emit(f"[线程] 任务结束,耗时 {cost:.2f}s")
except Exception:
pass
if sink_id is not None:
try:
logger.remove(sink_id)
except Exception:
pass
def run_single_mode(self):
"""执行单个上传模式action方法"""
try:
config = self.config_data
self.log_message.emit(f"开始处理单个任务(逐个上传模式)...")
pdd = Pdd(
url=config.get('达人链接', ''),
user_id=config.get('多多id', ''),
time_start=config.get('定时发布', '') if config.get('定时发布') else None,
ht=config.get('话题', ''),
index=config.get('序号', ''),
title=config.get('标题', None)
)
if self.prepared_files:
self.log_message.emit(f"使用预查找的文件列表,共 {len(self.prepared_files)} 个文件")
self.progress.emit(30)
2026-02-07 21:23:42 +08:00
# 将预查找的文件按类型分类,直接传递给 action(),跳过文件夹扫描
prepared_video_files = [f['path'] for f in self.prepared_files
if f['path'].is_file() and any(
f['path'].suffix.lower() == ext for ext in VIDEO_EXTENSIONS)]
prepared_image_folders = [f['path'] for f in self.prepared_files
if f['path'].is_dir()]
self.log_message.emit(f"预查找文件: {len(prepared_video_files)} 个视频, {len(prepared_image_folders)} 个图片文件夹")
2026-01-31 10:42:28 +08:00
try:
2026-02-07 21:23:42 +08:00
result = pdd.action(
prepared_video_files=prepared_video_files,
prepared_image_folders=prepared_image_folders,
collect_all_videos=False
)
2026-01-31 10:42:28 +08:00
ok = bool(result.get("ok")) if isinstance(result, dict) else True
if not ok:
2026-02-01 01:21:41 +08:00
self.log_message.emit(f"失败原因: {result.get('reason') if isinstance(result, dict) else ''}")
2026-01-31 10:42:28 +08:00
if isinstance(result, dict):
self.item_result.emit({
"user_id": config.get("多多id", ""),
"index": config.get("序号", ""),
"name": config.get("标题", "") or "",
"ok": ok,
"reason": result.get("reason", ""),
})
self.progress.emit(100)
self.finished.emit(ok, f"单个任务执行完成ok={ok}")
except Exception as e:
error_msg = f"执行action方法失败: {str(e)}"
self.log_message.emit(error_msg)
logger.error(error_msg)
import traceback
traceback_str = traceback.format_exc()
logger.error(traceback_str)
self.log_message.emit(f"错误详情: {traceback_str}")
self.finished.emit(False, error_msg)
return
else:
folder_path = config.get('文件夹路径', '').strip()
if not folder_path:
folder_path = get_default_folder_path()
if not os.path.exists(folder_path):
error_msg = f"文件夹路径不存在: {folder_path}"
self.finished.emit(False, error_msg)
self.log_message.emit(error_msg)
return
self.log_message.emit(f"使用文件夹路径: {folder_path}")
self.progress.emit(30)
is_batch_mode = self.is_batch_mode
try:
result = pdd.action(folder_path=folder_path, collect_all_videos=is_batch_mode)
ok = bool(result.get("ok")) if isinstance(result, dict) else True
if not ok:
2026-02-01 01:21:41 +08:00
self.log_message.emit(f"失败原因: {result.get('reason') if isinstance(result, dict) else ''}")
2026-01-31 10:42:28 +08:00
if isinstance(result, dict):
self.item_result.emit({
"user_id": config.get("多多id", ""),
"index": config.get("序号", ""),
"name": config.get("标题", "") or "",
"ok": ok,
"reason": result.get("reason", ""),
})
except Exception as e:
error_msg = f"执行action方法失败: {str(e)}"
self.log_message.emit(error_msg)
logger.error(error_msg)
import traceback
traceback_str = traceback.format_exc()
logger.error(traceback_str)
self.log_message.emit(f"错误详情: {traceback_str}")
self.finished.emit(False, error_msg)
return
self.progress.emit(100)
self.finished.emit(ok, f"单个任务执行完成ok={ok}")
except Exception as e:
error_msg = f"执行失败: {str(e)}"
self.finished.emit(False, error_msg)
self.log_message.emit(error_msg)
logger.error(f"执行失败: {e}")
import traceback
traceback_str = traceback.format_exc()
logger.error(traceback_str)
self.log_message.emit(f"错误详情: {traceback_str}")
def run_batch_mode(self):
"""执行批量上传模式:先批量上传所有视频,然后逐个上传图片"""
try:
config = self.config_data
self.log_message.emit(f"开始处理批量上传任务...")
index = config.get('序号', '')
pdd = Pdd(
url=config.get('达人链接', ''),
user_id=config.get('多多id', ''),
time_start=config.get('定时发布', '') if config.get('定时发布') else None,
ht=config.get('话题', ''),
index=index,
title=config.get('标题', None)
)
if self.prepared_files:
self.log_message.emit(f"使用预查找的文件列表,共 {len(self.prepared_files)} 个文件")
self.progress.emit(30)
video_file_paths = [f for f in self.prepared_files if f['path'].is_file() and any(
f['path'].suffix.lower() == ext for ext in VIDEO_EXTENSIONS)]
image_folders = [f for f in self.prepared_files if f['path'].is_dir()]
if video_file_paths:
self.log_message.emit(f"找到 {len(video_file_paths)} 个视频文件,开始批量上传...")
self.progress.emit(30)
def on_video_done(result_item):
try:
payload = dict(result_item)
payload["user_id"] = config.get("多多id", "")
if "index" not in payload or not payload.get("index"):
for vid_file in video_file_paths:
if str(vid_file.get('path')) == str(payload.get('path')):
payload["index"] = vid_file.get("index", config.get("序号", ""))
break
if "index" not in payload or not payload.get("index"):
payload["index"] = config.get("序号", "")
self.item_result.emit(payload)
self.log_message.emit(f" {'' if payload.get('ok') else ''} {payload.get('name', '')} 已更新状态")
except Exception as e:
self.log_message.emit(f" 回调处理异常: {e}")
result = pdd.action1(folder_path=video_file_paths, on_item_done=on_video_done)
ok = bool(result.get("ok")) if isinstance(result, dict) else True
if not ok:
fails = [r for r in (result.get("results") or []) if not r.get("ok")]
self.log_message.emit(f"发布校验失败条数: {len(fails)}")
for r in fails[:20]:
self.log_message.emit(f"{r.get('name')} | {r.get('reason')}")
self.log_message.emit(f"批量上传视频完成,共处理 {len(video_file_paths)} 个视频")
else:
self.log_message.emit("未找到视频文件,跳过视频上传")
ok = True
if image_folders:
self.log_message.emit(f"找到 {len(image_folders)} 个图片文件夹,开始逐个上传...")
self.progress.emit(70)
for idx, image_folder_info in enumerate(image_folders):
if not self.is_running:
break
2026-02-07 21:23:42 +08:00
image_folder_path = image_folder_info['path']
self.log_message.emit(f"处理第 {idx + 1}/{len(image_folders)} 个图片文件夹: {image_folder_path}")
# 直接传递图片文件夹路径,跳过文件夹扫描
pdd.action(prepared_image_folders=[image_folder_path])
2026-01-31 10:42:28 +08:00
self.log_message.emit(f"图片文件夹 {idx + 1} 上传完成")
self.log_message.emit(f"所有图片文件夹上传完成,共处理 {len(image_folders)} 个文件夹")
else:
self.log_message.emit("未找到图片文件夹,跳过图片上传")
self.progress.emit(100)
self.finished.emit(ok,
f"批量任务执行完成,共处理 {len(video_file_paths)} 个视频和 {len(image_folders)} 个图片文件夹ok={ok}")
else:
folder_path = config.get('文件夹路径', '').strip()
if not folder_path:
folder_path = get_default_folder_path()
if not os.path.exists(folder_path):
self.finished.emit(False, f"文件夹路径不存在: {folder_path}")
return
self.log_message.emit("第一步:收集所有视频文件...")
video_file_paths = self.prepare_batch_files(folder_path, config)
self.log_message.emit("第二步:收集所有图片文件夹...")
image_folders = self.prepare_image_folders(folder_path, index)
if video_file_paths:
self.log_message.emit(f"找到 {len(video_file_paths)} 个视频文件,开始批量上传...")
self.progress.emit(30)
pdd.action1(folder_path=video_file_paths)
self.log_message.emit(f"批量上传视频完成,共处理 {len(video_file_paths)} 个视频")
else:
self.log_message.emit("未找到视频文件,跳过视频上传")
if image_folders:
self.log_message.emit(f"找到 {len(image_folders)} 个图片文件夹,开始逐个上传...")
self.progress.emit(70)
for idx, image_folder in enumerate(image_folders):
if not self.is_running:
break
self.log_message.emit(f"处理第 {idx + 1}/{len(image_folders)} 个图片文件夹: {image_folder}")
2026-02-07 21:23:42 +08:00
# 直接传递图片文件夹路径,跳过文件夹扫描
pdd.action(prepared_image_folders=[Path(image_folder)])
2026-01-31 10:42:28 +08:00
self.log_message.emit(f"图片文件夹 {idx + 1} 上传完成")
self.log_message.emit(f"所有图片文件夹上传完成,共处理 {len(image_folders)} 个文件夹")
else:
self.log_message.emit("未找到图片文件夹,跳过图片上传")
self.progress.emit(100)
total_count = len(video_file_paths) + len(image_folders)
self.finished.emit(True,
f"批量任务执行完成,共处理 {len(video_file_paths)} 个视频和 {len(image_folders)} 个图片文件夹")
except Exception as e:
self.finished.emit(False, f"批量执行失败: {str(e)}")
logger.error(f"批量执行失败: {e}")
def prepare_image_folders(self, folder_path, index):
"""收集所有匹配序号的图片文件夹与main.py中action方法的逻辑一致"""
image_folders = []
try:
if not folder_path or not os.path.exists(folder_path):
logger.warning(f"文件夹路径无效或不存在: {folder_path}")
return image_folders
if not os.access(folder_path, os.R_OK):
logger.error(f"没有权限读取文件夹: {folder_path}")
return image_folders
try:
subdirs = os.listdir(folder_path)
except PermissionError:
logger.error(f"没有权限访问文件夹: {folder_path}")
return image_folders
except Exception as e:
logger.error(f"读取文件夹失败: {e}")
return image_folders
for file in subdirs:
try:
file_path = os.path.join(folder_path, file)
if not os.path.isdir(file_path):
continue
try:
files = os.listdir(file_path)
except (PermissionError, Exception) as e:
logger.warning(f"读取子文件夹失败: {file_path}, 错误: {e}")
continue
for file_name in files:
try:
if file_name.startswith('.'):
continue
file_names = file_name.split("-")
if len(file_names) > 0 and file_names[0] == str(index):
path = Path(os.path.join(file_path, file_name))
if path.exists() and path.is_dir():
image_folders.append(str(path))
except Exception as e:
logger.warning(f"处理文件 {file_name} 时出错: {e}")
continue
except Exception as e:
logger.warning(f"处理子文件夹 {file} 时出错: {e}")
continue
except Exception as e:
logger.error(f"收集图片文件夹失败: {e}", exc_info=True)
return image_folders
def count_videos_in_folder(self, folder_path, index):
"""统计文件夹中匹配序号的视频文件数量与main.py逻辑一致"""
count = 0
try:
for file in os.listdir(folder_path):
file_path = os.path.join(folder_path, file)
if not os.path.isdir(file_path):
continue
files = os.listdir(file_path)
for file_name in files:
if ".mp4" in file_name:
file_names = file_name.split("-")
if len(file_names) > 0 and file_names[0] == str(index):
path = Path(os.path.join(file_path, file_name))
if path.is_file():
count += 1
else:
for sub_file in os.listdir(path):
sub_path = Path(os.path.join(path, sub_file))
if sub_path.is_file():
count += 1
except Exception as e:
logger.error(f"统计视频文件失败: {e}")
return count
def prepare_batch_files(self, folder_path, config):
"""准备批量上传的文件列表与main.py中的action1方法逻辑一致"""
file_paths = []
if not config:
logger.error("配置为空,无法准备批量上传文件")
return file_paths
index = str(config.get('序号', '')).strip()
if not index:
logger.warning("序号为空,无法准备批量上传文件")
return file_paths
if not folder_path or not os.path.exists(folder_path):
logger.error(f"文件夹路径无效或不存在: {folder_path}")
return file_paths
if not os.access(folder_path, os.R_OK):
logger.error(f"没有权限读取文件夹: {folder_path}")
return file_paths
logger.info("=" * 50)
logger.info("开始准备批量上传文件列表...")
logger.info(f"文件夹路径: {folder_path}")
logger.info(f"查找序号: {index}")
try:
try:
all_items = os.listdir(folder_path)
subdirs = [f for f in all_items if os.path.isdir(os.path.join(folder_path, f))]
except PermissionError:
logger.error(f"没有权限访问文件夹: {folder_path}")
return file_paths
except Exception as e:
logger.error(f"读取文件夹失败: {e}")
return file_paths
logger.info(f"在最外层文件夹下找到 {len(subdirs)} 个子文件夹多多ID文件夹")
for subdir_name in subdirs:
file_path = os.path.join(folder_path, subdir_name)
logger.info(f" 正在扫描子文件夹: {subdir_name}")
if not os.path.isdir(file_path):
logger.info(f" 跳过(不是文件夹)")
continue
files = os.listdir(file_path)
logger.info(f" 该文件夹下有 {len(files)} 个项目")
for file_name in files:
logger.info(f" 检查项目: {file_name}")
if ".mp4" in file_name:
logger.info(f" ✓ 是视频文件(包含.mp4")
file_names = file_name.split("-")
logger.info(f" 文件名分割结果: {file_names}")
if len(file_names) > 0 and file_names[0] == str(index):
logger.info(f" ✓ 序号匹配!序号: {file_names[0]}, 目标序号: {index}")
path = Path(os.path.join(file_path, file_name))
if path.is_file():
logger.info(f" ✓ 是文件,添加到列表: {path.name}")
file_paths.append({
"url": config.get('达人链接', ''),
"user_id": config.get('多多id', ''),
"time_start": config.get('定时发布', '') if config.get('定时发布') else None,
"ht": config.get('话题', ''),
"index": str(index),
"path": path
})
logger.info(f" 当前视频总数: {len(file_paths)}")
else:
logger.info(f" 是文件夹,扫描其中的文件...")
try:
sub_files = os.listdir(path)
logger.info(f" 文件夹中有 {len(sub_files)} 个文件")
for sub_file in sub_files:
sub_path = Path(os.path.join(path, sub_file))
if sub_path.is_file():
logger.info(f" ✓ 添加文件: {sub_file}")
file_paths.append({
"url": config.get('达人链接', ''),
"user_id": config.get('多多id', ''),
"time_start": config.get('定时发布', '') if config.get('定时发布') else None,
"ht": config.get('话题', ''),
"index": str(index),
"path": sub_path
})
logger.info(f" 当前视频总数: {len(file_paths)}")
except Exception as e:
logger.error(f" 扫描子文件夹失败: {e}")
else:
if len(file_names) > 0:
logger.info(f" ✗ 序号不匹配: {file_names[0]} != {index}")
else:
logger.info(f" ✗ 文件名格式不正确,无法提取序号")
else:
logger.info(f" ✗ 不是视频文件(不包含.mp4跳过")
except Exception as e:
logger.error(f"准备批量文件失败: {e}")
import traceback
traceback.print_exc()
logger.info("=" * 50)
logger.info(f"文件收集完成!共找到 {len(file_paths)} 个视频文件")
if file_paths:
logger.info("视频文件列表:")
for idx, video_info in enumerate(file_paths, 1):
logger.info(f" {idx}. {video_info['path'].name} ({video_info['path']})")
return file_paths
def stop(self):
"""停止执行"""
self.is_running = False
def requestInterruption(self):
"""请求中断线程重写QThread方法"""
super().requestInterruption()
self.is_running = False