# -*- coding: utf-8 -*- """后台工作线程:执行单个/批量上传任务,与 main.Pdd 交互。""" import os import time from pathlib import Path from PyQt5.QtCore import QThread, pyqtSignal from main import Pdd from loguru import logger from gui_constants import get_default_folder_path, VIDEO_EXTENSIONS class WorkerThread(QThread): """工作线程,用于执行自动化任务""" finished = pyqtSignal(bool, str) log_message = pyqtSignal(str) progress = pyqtSignal(int) item_result = pyqtSignal(object) # 单条发布结果(dict) def __init__(self, config_data, is_batch_mode, prepared_files=None, parent=None): super().__init__(parent) self.config_data = config_data self.is_batch_mode = is_batch_mode self.prepared_files = prepared_files # 预查找的文件列表 self.is_running = True def run(self): sink_id = None start_ts = time.time() try: sink_id = logger.add(lambda msg: self.log_message.emit(str(msg).strip())) if self.is_batch_mode: self.run_batch_mode() else: self.run_single_mode() except Exception as e: error_msg = f"执行失败: {str(e)}" self.finished.emit(False, error_msg) self.log_message.emit(error_msg) logger.error(f"执行失败: {e}") import traceback traceback_str = traceback.format_exc() logger.error(traceback_str) self.log_message.emit(f"错误详情: {traceback_str}") finally: try: cost = time.time() - start_ts self.log_message.emit(f"[线程] 任务结束,耗时 {cost:.2f}s") except Exception: pass if sink_id is not None: try: logger.remove(sink_id) except Exception: pass def run_single_mode(self): """执行单个上传模式(action方法)""" try: config = self.config_data self.log_message.emit(f"开始处理单个任务(逐个上传模式)...") pdd = Pdd( url=config.get('达人链接', ''), user_id=config.get('多多id', ''), time_start=config.get('定时发布', '') if config.get('定时发布') else None, ht=config.get('话题', ''), index=config.get('序号', ''), title=config.get('标题', None) ) if self.prepared_files: self.log_message.emit(f"使用预查找的文件列表,共 {len(self.prepared_files)} 个文件") self.progress.emit(30) # 将预查找的文件按类型分类,直接传递给 action(),跳过文件夹扫描 prepared_video_files = [f['path'] for f in self.prepared_files if f['path'].is_file() and any( f['path'].suffix.lower() == ext for ext in VIDEO_EXTENSIONS)] prepared_image_folders = [f['path'] for f in self.prepared_files if f['path'].is_dir()] self.log_message.emit(f"预查找文件: {len(prepared_video_files)} 个视频, {len(prepared_image_folders)} 个图片文件夹") try: result = pdd.action( prepared_video_files=prepared_video_files, prepared_image_folders=prepared_image_folders, collect_all_videos=False ) ok = bool(result.get("ok")) if isinstance(result, dict) else False reason = result.get("reason", "未知原因") if isinstance(result, dict) else "未返回结果" if not ok: self.log_message.emit(f"失败原因: {reason}") self.item_result.emit({ "user_id": config.get("多多id", ""), "index": config.get("序号", ""), "name": config.get("标题", "") or "", "ok": ok, "reason": reason, }) self.progress.emit(100) self.finished.emit(ok, f"单个任务执行完成(ok={ok})") except Exception as e: error_msg = f"执行action方法失败: {str(e)}" self.log_message.emit(error_msg) logger.error(error_msg) import traceback traceback_str = traceback.format_exc() logger.error(traceback_str) self.log_message.emit(f"错误详情: {traceback_str}") self.finished.emit(False, error_msg) return else: folder_path = config.get('文件夹路径', '').strip() if not folder_path: folder_path = get_default_folder_path() if not os.path.exists(folder_path): error_msg = f"文件夹路径不存在: {folder_path}" self.finished.emit(False, error_msg) self.log_message.emit(error_msg) return self.log_message.emit(f"使用文件夹路径: {folder_path}") self.progress.emit(30) is_batch_mode = self.is_batch_mode try: result = pdd.action(folder_path=folder_path, collect_all_videos=is_batch_mode) ok = bool(result.get("ok")) if isinstance(result, dict) else False reason = result.get("reason", "未知原因") if isinstance(result, dict) else "未返回结果" if not ok: self.log_message.emit(f"失败原因: {reason}") self.item_result.emit({ "user_id": config.get("多多id", ""), "index": config.get("序号", ""), "name": config.get("标题", "") or "", "ok": ok, "reason": reason, }) except Exception as e: error_msg = f"执行action方法失败: {str(e)}" self.log_message.emit(error_msg) logger.error(error_msg) import traceback traceback_str = traceback.format_exc() logger.error(traceback_str) self.log_message.emit(f"错误详情: {traceback_str}") self.finished.emit(False, error_msg) return self.progress.emit(100) self.finished.emit(ok, f"单个任务执行完成(ok={ok})") except Exception as e: error_msg = f"执行失败: {str(e)}" self.finished.emit(False, error_msg) self.log_message.emit(error_msg) logger.error(f"执行失败: {e}") import traceback traceback_str = traceback.format_exc() logger.error(traceback_str) self.log_message.emit(f"错误详情: {traceback_str}") def run_batch_mode(self): """执行批量上传模式:先批量上传所有视频,然后逐个上传图片""" try: config = self.config_data self.log_message.emit(f"开始处理批量上传任务...") index = config.get('序号', '') pdd = Pdd( url=config.get('达人链接', ''), user_id=config.get('多多id', ''), time_start=config.get('定时发布', '') if config.get('定时发布') else None, ht=config.get('话题', ''), index=index, title=config.get('标题', None) ) if self.prepared_files: self.log_message.emit(f"使用预查找的文件列表,共 {len(self.prepared_files)} 个文件") self.progress.emit(30) video_file_paths = [f for f in self.prepared_files if f['path'].is_file() and any( f['path'].suffix.lower() == ext for ext in VIDEO_EXTENSIONS)] image_folders = [f for f in self.prepared_files if f['path'].is_dir()] if video_file_paths: self.log_message.emit(f"找到 {len(video_file_paths)} 个视频文件,开始批量上传...") self.progress.emit(30) def on_video_done(result_item): try: payload = dict(result_item) payload["user_id"] = config.get("多多id", "") if "index" not in payload or not payload.get("index"): for vid_file in video_file_paths: if str(vid_file.get('path')) == str(payload.get('path')): payload["index"] = vid_file.get("index", config.get("序号", "")) break if "index" not in payload or not payload.get("index"): payload["index"] = config.get("序号", "") self.item_result.emit(payload) self.log_message.emit(f" {'✓' if payload.get('ok') else '✗'} {payload.get('name', '')} 已更新状态") except Exception as e: self.log_message.emit(f" 回调处理异常: {e}") result = pdd.action1(folder_path=video_file_paths, on_item_done=on_video_done) ok = bool(result.get("ok")) if isinstance(result, dict) else False if not ok: fails = [r for r in (result.get("results") or []) if not r.get("ok")] self.log_message.emit(f"发布校验失败条数: {len(fails)}") for r in fails[:20]: self.log_message.emit(f" ✗ {r.get('name')} | {r.get('reason')}") self.log_message.emit(f"批量上传视频完成,共处理 {len(video_file_paths)} 个视频") else: self.log_message.emit("未找到视频文件,跳过视频上传") ok = True if image_folders: self.log_message.emit(f"找到 {len(image_folders)} 个图片文件夹,开始逐个上传...") self.progress.emit(70) for idx, image_folder_info in enumerate(image_folders): if not self.is_running: break image_folder_path = image_folder_info['path'] self.log_message.emit(f"处理第 {idx + 1}/{len(image_folders)} 个图片文件夹: {image_folder_path}") # 直接传递图片文件夹路径,跳过文件夹扫描 pdd.action(prepared_image_folders=[image_folder_path]) self.log_message.emit(f"图片文件夹 {idx + 1} 上传完成") self.log_message.emit(f"所有图片文件夹上传完成,共处理 {len(image_folders)} 个文件夹") else: self.log_message.emit("未找到图片文件夹,跳过图片上传") self.progress.emit(100) self.finished.emit(ok, f"批量任务执行完成,共处理 {len(video_file_paths)} 个视频和 {len(image_folders)} 个图片文件夹(ok={ok})") else: folder_path = config.get('文件夹路径', '').strip() if not folder_path: folder_path = get_default_folder_path() if not os.path.exists(folder_path): self.finished.emit(False, f"文件夹路径不存在: {folder_path}") return self.log_message.emit("第一步:收集所有视频文件...") video_file_paths = self.prepare_batch_files(folder_path, config) self.log_message.emit("第二步:收集所有图片文件夹...") image_folders = self.prepare_image_folders(folder_path, index) if video_file_paths: self.log_message.emit(f"找到 {len(video_file_paths)} 个视频文件,开始批量上传...") self.progress.emit(30) pdd.action1(folder_path=video_file_paths) self.log_message.emit(f"批量上传视频完成,共处理 {len(video_file_paths)} 个视频") else: self.log_message.emit("未找到视频文件,跳过视频上传") if image_folders: self.log_message.emit(f"找到 {len(image_folders)} 个图片文件夹,开始逐个上传...") self.progress.emit(70) for idx, image_folder in enumerate(image_folders): if not self.is_running: break self.log_message.emit(f"处理第 {idx + 1}/{len(image_folders)} 个图片文件夹: {image_folder}") # 直接传递图片文件夹路径,跳过文件夹扫描 pdd.action(prepared_image_folders=[Path(image_folder)]) self.log_message.emit(f"图片文件夹 {idx + 1} 上传完成") self.log_message.emit(f"所有图片文件夹上传完成,共处理 {len(image_folders)} 个文件夹") else: self.log_message.emit("未找到图片文件夹,跳过图片上传") self.progress.emit(100) total_count = len(video_file_paths) + len(image_folders) self.finished.emit(True, f"批量任务执行完成,共处理 {len(video_file_paths)} 个视频和 {len(image_folders)} 个图片文件夹") except Exception as e: self.finished.emit(False, f"批量执行失败: {str(e)}") logger.error(f"批量执行失败: {e}") def prepare_image_folders(self, folder_path, index): """收集所有匹配序号的图片文件夹(与main.py中action方法的逻辑一致)""" image_folders = [] try: if not folder_path or not os.path.exists(folder_path): logger.warning(f"文件夹路径无效或不存在: {folder_path}") return image_folders if not os.access(folder_path, os.R_OK): logger.error(f"没有权限读取文件夹: {folder_path}") return image_folders try: subdirs = os.listdir(folder_path) except PermissionError: logger.error(f"没有权限访问文件夹: {folder_path}") return image_folders except Exception as e: logger.error(f"读取文件夹失败: {e}") return image_folders for file in subdirs: try: file_path = os.path.join(folder_path, file) if not os.path.isdir(file_path): continue try: files = os.listdir(file_path) except (PermissionError, Exception) as e: logger.warning(f"读取子文件夹失败: {file_path}, 错误: {e}") continue for file_name in files: try: if file_name.startswith('.'): continue file_names = file_name.split("-") if len(file_names) > 0 and file_names[0] == str(index): path = Path(os.path.join(file_path, file_name)) if path.exists() and path.is_dir(): image_folders.append(str(path)) except Exception as e: logger.warning(f"处理文件 {file_name} 时出错: {e}") continue except Exception as e: logger.warning(f"处理子文件夹 {file} 时出错: {e}") continue except Exception as e: logger.error(f"收集图片文件夹失败: {e}", exc_info=True) return image_folders def count_videos_in_folder(self, folder_path, index): """统计文件夹中匹配序号的视频文件数量(与main.py逻辑一致)""" count = 0 try: for file in os.listdir(folder_path): file_path = os.path.join(folder_path, file) if not os.path.isdir(file_path): continue files = os.listdir(file_path) for file_name in files: if ".mp4" in file_name: file_names = file_name.split("-") if len(file_names) > 0 and file_names[0] == str(index): path = Path(os.path.join(file_path, file_name)) if path.is_file(): count += 1 else: for sub_file in os.listdir(path): sub_path = Path(os.path.join(path, sub_file)) if sub_path.is_file(): count += 1 except Exception as e: logger.error(f"统计视频文件失败: {e}") return count def prepare_batch_files(self, folder_path, config): """准备批量上传的文件列表(与main.py中的action1方法逻辑一致)""" file_paths = [] if not config: logger.error("配置为空,无法准备批量上传文件") return file_paths index = str(config.get('序号', '')).strip() if not index: logger.warning("序号为空,无法准备批量上传文件") return file_paths if not folder_path or not os.path.exists(folder_path): logger.error(f"文件夹路径无效或不存在: {folder_path}") return file_paths if not os.access(folder_path, os.R_OK): logger.error(f"没有权限读取文件夹: {folder_path}") return file_paths logger.info("=" * 50) logger.info("开始准备批量上传文件列表...") logger.info(f"文件夹路径: {folder_path}") logger.info(f"查找序号: {index}") try: try: all_items = os.listdir(folder_path) subdirs = [f for f in all_items if os.path.isdir(os.path.join(folder_path, f))] except PermissionError: logger.error(f"没有权限访问文件夹: {folder_path}") return file_paths except Exception as e: logger.error(f"读取文件夹失败: {e}") return file_paths logger.info(f"在最外层文件夹下找到 {len(subdirs)} 个子文件夹(多多ID文件夹)") for subdir_name in subdirs: file_path = os.path.join(folder_path, subdir_name) logger.info(f" 正在扫描子文件夹: {subdir_name}") if not os.path.isdir(file_path): logger.info(f" 跳过(不是文件夹)") continue files = os.listdir(file_path) logger.info(f" 该文件夹下有 {len(files)} 个项目") for file_name in files: logger.info(f" 检查项目: {file_name}") if ".mp4" in file_name: logger.info(f" ✓ 是视频文件(包含.mp4)") file_names = file_name.split("-") logger.info(f" 文件名分割结果: {file_names}") if len(file_names) > 0 and file_names[0] == str(index): logger.info(f" ✓ 序号匹配!序号: {file_names[0]}, 目标序号: {index}") path = Path(os.path.join(file_path, file_name)) if path.is_file(): logger.info(f" ✓ 是文件,添加到列表: {path.name}") file_paths.append({ "url": config.get('达人链接', ''), "user_id": config.get('多多id', ''), "time_start": config.get('定时发布', '') if config.get('定时发布') else None, "ht": config.get('话题', ''), "index": str(index), "path": path }) logger.info(f" 当前视频总数: {len(file_paths)}") else: logger.info(f" 是文件夹,扫描其中的文件...") try: sub_files = os.listdir(path) logger.info(f" 文件夹中有 {len(sub_files)} 个文件") for sub_file in sub_files: sub_path = Path(os.path.join(path, sub_file)) if sub_path.is_file(): logger.info(f" ✓ 添加文件: {sub_file}") file_paths.append({ "url": config.get('达人链接', ''), "user_id": config.get('多多id', ''), "time_start": config.get('定时发布', '') if config.get('定时发布') else None, "ht": config.get('话题', ''), "index": str(index), "path": sub_path }) logger.info(f" 当前视频总数: {len(file_paths)}") except Exception as e: logger.error(f" 扫描子文件夹失败: {e}") else: if len(file_names) > 0: logger.info(f" ✗ 序号不匹配: {file_names[0]} != {index}") else: logger.info(f" ✗ 文件名格式不正确,无法提取序号") else: logger.info(f" ✗ 不是视频文件(不包含.mp4),跳过") except Exception as e: logger.error(f"准备批量文件失败: {e}") import traceback traceback.print_exc() logger.info("=" * 50) logger.info(f"文件收集完成!共找到 {len(file_paths)} 个视频文件") if file_paths: logger.info("视频文件列表:") for idx, video_info in enumerate(file_paths, 1): logger.info(f" {idx}. {video_info['path'].name} ({video_info['path']})") return file_paths def stop(self): """停止执行""" self.is_running = False def requestInterruption(self): """请求中断线程(重写QThread方法)""" super().requestInterruption() self.is_running = False