478 lines
24 KiB
Python
478 lines
24 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""后台工作线程:执行单个/批量上传任务,与 main.Pdd 交互。"""
|
||
|
||
import os
|
||
import time
|
||
from pathlib import Path
|
||
|
||
from PyQt5.QtCore import QThread, pyqtSignal
|
||
|
||
from main import Pdd
|
||
from loguru import logger
|
||
|
||
from gui_constants import get_default_folder_path, VIDEO_EXTENSIONS
|
||
|
||
|
||
class WorkerThread(QThread):
|
||
"""工作线程,用于执行自动化任务"""
|
||
finished = pyqtSignal(bool, str)
|
||
log_message = pyqtSignal(str)
|
||
progress = pyqtSignal(int)
|
||
item_result = pyqtSignal(object) # 单条发布结果(dict)
|
||
|
||
def __init__(self, config_data, is_batch_mode, prepared_files=None, parent=None):
|
||
super().__init__(parent)
|
||
self.config_data = config_data
|
||
self.is_batch_mode = is_batch_mode
|
||
self.prepared_files = prepared_files # 预查找的文件列表
|
||
self.is_running = True
|
||
|
||
def run(self):
|
||
sink_id = None
|
||
start_ts = time.time()
|
||
try:
|
||
sink_id = logger.add(lambda msg: self.log_message.emit(str(msg).strip()))
|
||
if self.is_batch_mode:
|
||
self.run_batch_mode()
|
||
else:
|
||
self.run_single_mode()
|
||
except Exception as e:
|
||
error_msg = f"执行失败: {str(e)}"
|
||
self.finished.emit(False, error_msg)
|
||
self.log_message.emit(error_msg)
|
||
logger.error(f"执行失败: {e}")
|
||
import traceback
|
||
traceback_str = traceback.format_exc()
|
||
logger.error(traceback_str)
|
||
self.log_message.emit(f"错误详情: {traceback_str}")
|
||
finally:
|
||
try:
|
||
cost = time.time() - start_ts
|
||
self.log_message.emit(f"[线程] 任务结束,耗时 {cost:.2f}s")
|
||
except Exception:
|
||
pass
|
||
if sink_id is not None:
|
||
try:
|
||
logger.remove(sink_id)
|
||
except Exception:
|
||
pass
|
||
|
||
def run_single_mode(self):
|
||
"""执行单个上传模式(action方法)"""
|
||
try:
|
||
config = self.config_data
|
||
self.log_message.emit(f"开始处理单个任务(逐个上传模式)...")
|
||
|
||
pdd = Pdd(
|
||
url=config.get('达人链接', ''),
|
||
user_id=config.get('多多id', ''),
|
||
time_start=config.get('定时发布', '') if config.get('定时发布') else None,
|
||
ht=config.get('话题', ''),
|
||
index=config.get('序号', ''),
|
||
title=config.get('标题', None)
|
||
)
|
||
|
||
if self.prepared_files:
|
||
self.log_message.emit(f"使用预查找的文件列表,共 {len(self.prepared_files)} 个文件")
|
||
self.progress.emit(30)
|
||
|
||
# 将预查找的文件按类型分类,直接传递给 action(),跳过文件夹扫描
|
||
prepared_video_files = [f['path'] for f in self.prepared_files
|
||
if f['path'].is_file() and any(
|
||
f['path'].suffix.lower() == ext for ext in VIDEO_EXTENSIONS)]
|
||
prepared_image_folders = [f['path'] for f in self.prepared_files
|
||
if f['path'].is_dir()]
|
||
|
||
self.log_message.emit(f"预查找文件: {len(prepared_video_files)} 个视频, {len(prepared_image_folders)} 个图片文件夹")
|
||
try:
|
||
result = pdd.action(
|
||
prepared_video_files=prepared_video_files,
|
||
prepared_image_folders=prepared_image_folders,
|
||
collect_all_videos=False
|
||
)
|
||
ok = bool(result.get("ok")) if isinstance(result, dict) else True
|
||
if not ok:
|
||
self.log_message.emit(f"失败原因: {result.get('reason') if isinstance(result, dict) else ''}")
|
||
if isinstance(result, dict):
|
||
self.item_result.emit({
|
||
"user_id": config.get("多多id", ""),
|
||
"index": config.get("序号", ""),
|
||
"name": config.get("标题", "") or "",
|
||
"ok": ok,
|
||
"reason": result.get("reason", ""),
|
||
})
|
||
self.progress.emit(100)
|
||
self.finished.emit(ok, f"单个任务执行完成(ok={ok})")
|
||
except Exception as e:
|
||
error_msg = f"执行action方法失败: {str(e)}"
|
||
self.log_message.emit(error_msg)
|
||
logger.error(error_msg)
|
||
import traceback
|
||
traceback_str = traceback.format_exc()
|
||
logger.error(traceback_str)
|
||
self.log_message.emit(f"错误详情: {traceback_str}")
|
||
self.finished.emit(False, error_msg)
|
||
return
|
||
else:
|
||
folder_path = config.get('文件夹路径', '').strip()
|
||
if not folder_path:
|
||
folder_path = get_default_folder_path()
|
||
|
||
if not os.path.exists(folder_path):
|
||
error_msg = f"文件夹路径不存在: {folder_path}"
|
||
self.finished.emit(False, error_msg)
|
||
self.log_message.emit(error_msg)
|
||
return
|
||
|
||
self.log_message.emit(f"使用文件夹路径: {folder_path}")
|
||
self.progress.emit(30)
|
||
is_batch_mode = self.is_batch_mode
|
||
|
||
try:
|
||
result = pdd.action(folder_path=folder_path, collect_all_videos=is_batch_mode)
|
||
ok = bool(result.get("ok")) if isinstance(result, dict) else True
|
||
if not ok:
|
||
self.log_message.emit(f"失败原因: {result.get('reason') if isinstance(result, dict) else ''}")
|
||
if isinstance(result, dict):
|
||
self.item_result.emit({
|
||
"user_id": config.get("多多id", ""),
|
||
"index": config.get("序号", ""),
|
||
"name": config.get("标题", "") or "",
|
||
"ok": ok,
|
||
"reason": result.get("reason", ""),
|
||
})
|
||
except Exception as e:
|
||
error_msg = f"执行action方法失败: {str(e)}"
|
||
self.log_message.emit(error_msg)
|
||
logger.error(error_msg)
|
||
import traceback
|
||
traceback_str = traceback.format_exc()
|
||
logger.error(traceback_str)
|
||
self.log_message.emit(f"错误详情: {traceback_str}")
|
||
self.finished.emit(False, error_msg)
|
||
return
|
||
|
||
self.progress.emit(100)
|
||
self.finished.emit(ok, f"单个任务执行完成(ok={ok})")
|
||
except Exception as e:
|
||
error_msg = f"执行失败: {str(e)}"
|
||
self.finished.emit(False, error_msg)
|
||
self.log_message.emit(error_msg)
|
||
logger.error(f"执行失败: {e}")
|
||
import traceback
|
||
traceback_str = traceback.format_exc()
|
||
logger.error(traceback_str)
|
||
self.log_message.emit(f"错误详情: {traceback_str}")
|
||
|
||
def run_batch_mode(self):
|
||
"""执行批量上传模式:先批量上传所有视频,然后逐个上传图片"""
|
||
try:
|
||
config = self.config_data
|
||
self.log_message.emit(f"开始处理批量上传任务...")
|
||
index = config.get('序号', '')
|
||
|
||
pdd = Pdd(
|
||
url=config.get('达人链接', ''),
|
||
user_id=config.get('多多id', ''),
|
||
time_start=config.get('定时发布', '') if config.get('定时发布') else None,
|
||
ht=config.get('话题', ''),
|
||
index=index,
|
||
title=config.get('标题', None)
|
||
)
|
||
|
||
if self.prepared_files:
|
||
self.log_message.emit(f"使用预查找的文件列表,共 {len(self.prepared_files)} 个文件")
|
||
self.progress.emit(30)
|
||
|
||
video_file_paths = [f for f in self.prepared_files if f['path'].is_file() and any(
|
||
f['path'].suffix.lower() == ext for ext in VIDEO_EXTENSIONS)]
|
||
image_folders = [f for f in self.prepared_files if f['path'].is_dir()]
|
||
|
||
if video_file_paths:
|
||
self.log_message.emit(f"找到 {len(video_file_paths)} 个视频文件,开始批量上传...")
|
||
self.progress.emit(30)
|
||
|
||
def on_video_done(result_item):
|
||
try:
|
||
payload = dict(result_item)
|
||
payload["user_id"] = config.get("多多id", "")
|
||
if "index" not in payload or not payload.get("index"):
|
||
for vid_file in video_file_paths:
|
||
if str(vid_file.get('path')) == str(payload.get('path')):
|
||
payload["index"] = vid_file.get("index", config.get("序号", ""))
|
||
break
|
||
if "index" not in payload or not payload.get("index"):
|
||
payload["index"] = config.get("序号", "")
|
||
self.item_result.emit(payload)
|
||
self.log_message.emit(f" {'✓' if payload.get('ok') else '✗'} {payload.get('name', '')} 已更新状态")
|
||
except Exception as e:
|
||
self.log_message.emit(f" 回调处理异常: {e}")
|
||
|
||
result = pdd.action1(folder_path=video_file_paths, on_item_done=on_video_done)
|
||
ok = bool(result.get("ok")) if isinstance(result, dict) else True
|
||
if not ok:
|
||
fails = [r for r in (result.get("results") or []) if not r.get("ok")]
|
||
self.log_message.emit(f"发布校验失败条数: {len(fails)}")
|
||
for r in fails[:20]:
|
||
self.log_message.emit(f" ✗ {r.get('name')} | {r.get('reason')}")
|
||
self.log_message.emit(f"批量上传视频完成,共处理 {len(video_file_paths)} 个视频")
|
||
else:
|
||
self.log_message.emit("未找到视频文件,跳过视频上传")
|
||
ok = True
|
||
|
||
if image_folders:
|
||
self.log_message.emit(f"找到 {len(image_folders)} 个图片文件夹,开始逐个上传...")
|
||
self.progress.emit(70)
|
||
|
||
for idx, image_folder_info in enumerate(image_folders):
|
||
if not self.is_running:
|
||
break
|
||
image_folder_path = image_folder_info['path']
|
||
self.log_message.emit(f"处理第 {idx + 1}/{len(image_folders)} 个图片文件夹: {image_folder_path}")
|
||
# 直接传递图片文件夹路径,跳过文件夹扫描
|
||
pdd.action(prepared_image_folders=[image_folder_path])
|
||
self.log_message.emit(f"图片文件夹 {idx + 1} 上传完成")
|
||
|
||
self.log_message.emit(f"所有图片文件夹上传完成,共处理 {len(image_folders)} 个文件夹")
|
||
else:
|
||
self.log_message.emit("未找到图片文件夹,跳过图片上传")
|
||
|
||
self.progress.emit(100)
|
||
self.finished.emit(ok,
|
||
f"批量任务执行完成,共处理 {len(video_file_paths)} 个视频和 {len(image_folders)} 个图片文件夹(ok={ok})")
|
||
else:
|
||
folder_path = config.get('文件夹路径', '').strip()
|
||
if not folder_path:
|
||
folder_path = get_default_folder_path()
|
||
|
||
if not os.path.exists(folder_path):
|
||
self.finished.emit(False, f"文件夹路径不存在: {folder_path}")
|
||
return
|
||
|
||
self.log_message.emit("第一步:收集所有视频文件...")
|
||
video_file_paths = self.prepare_batch_files(folder_path, config)
|
||
self.log_message.emit("第二步:收集所有图片文件夹...")
|
||
image_folders = self.prepare_image_folders(folder_path, index)
|
||
|
||
if video_file_paths:
|
||
self.log_message.emit(f"找到 {len(video_file_paths)} 个视频文件,开始批量上传...")
|
||
self.progress.emit(30)
|
||
pdd.action1(folder_path=video_file_paths)
|
||
self.log_message.emit(f"批量上传视频完成,共处理 {len(video_file_paths)} 个视频")
|
||
else:
|
||
self.log_message.emit("未找到视频文件,跳过视频上传")
|
||
|
||
if image_folders:
|
||
self.log_message.emit(f"找到 {len(image_folders)} 个图片文件夹,开始逐个上传...")
|
||
self.progress.emit(70)
|
||
|
||
for idx, image_folder in enumerate(image_folders):
|
||
if not self.is_running:
|
||
break
|
||
self.log_message.emit(f"处理第 {idx + 1}/{len(image_folders)} 个图片文件夹: {image_folder}")
|
||
# 直接传递图片文件夹路径,跳过文件夹扫描
|
||
pdd.action(prepared_image_folders=[Path(image_folder)])
|
||
self.log_message.emit(f"图片文件夹 {idx + 1} 上传完成")
|
||
|
||
self.log_message.emit(f"所有图片文件夹上传完成,共处理 {len(image_folders)} 个文件夹")
|
||
else:
|
||
self.log_message.emit("未找到图片文件夹,跳过图片上传")
|
||
|
||
self.progress.emit(100)
|
||
total_count = len(video_file_paths) + len(image_folders)
|
||
self.finished.emit(True,
|
||
f"批量任务执行完成,共处理 {len(video_file_paths)} 个视频和 {len(image_folders)} 个图片文件夹")
|
||
|
||
except Exception as e:
|
||
self.finished.emit(False, f"批量执行失败: {str(e)}")
|
||
logger.error(f"批量执行失败: {e}")
|
||
|
||
def prepare_image_folders(self, folder_path, index):
|
||
"""收集所有匹配序号的图片文件夹(与main.py中action方法的逻辑一致)"""
|
||
image_folders = []
|
||
try:
|
||
if not folder_path or not os.path.exists(folder_path):
|
||
logger.warning(f"文件夹路径无效或不存在: {folder_path}")
|
||
return image_folders
|
||
if not os.access(folder_path, os.R_OK):
|
||
logger.error(f"没有权限读取文件夹: {folder_path}")
|
||
return image_folders
|
||
try:
|
||
subdirs = os.listdir(folder_path)
|
||
except PermissionError:
|
||
logger.error(f"没有权限访问文件夹: {folder_path}")
|
||
return image_folders
|
||
except Exception as e:
|
||
logger.error(f"读取文件夹失败: {e}")
|
||
return image_folders
|
||
|
||
for file in subdirs:
|
||
try:
|
||
file_path = os.path.join(folder_path, file)
|
||
if not os.path.isdir(file_path):
|
||
continue
|
||
try:
|
||
files = os.listdir(file_path)
|
||
except (PermissionError, Exception) as e:
|
||
logger.warning(f"读取子文件夹失败: {file_path}, 错误: {e}")
|
||
continue
|
||
|
||
for file_name in files:
|
||
try:
|
||
if file_name.startswith('.'):
|
||
continue
|
||
file_names = file_name.split("-")
|
||
if len(file_names) > 0 and file_names[0] == str(index):
|
||
path = Path(os.path.join(file_path, file_name))
|
||
if path.exists() and path.is_dir():
|
||
image_folders.append(str(path))
|
||
except Exception as e:
|
||
logger.warning(f"处理文件 {file_name} 时出错: {e}")
|
||
continue
|
||
except Exception as e:
|
||
logger.warning(f"处理子文件夹 {file} 时出错: {e}")
|
||
continue
|
||
except Exception as e:
|
||
logger.error(f"收集图片文件夹失败: {e}", exc_info=True)
|
||
return image_folders
|
||
|
||
def count_videos_in_folder(self, folder_path, index):
|
||
"""统计文件夹中匹配序号的视频文件数量(与main.py逻辑一致)"""
|
||
count = 0
|
||
try:
|
||
for file in os.listdir(folder_path):
|
||
file_path = os.path.join(folder_path, file)
|
||
if not os.path.isdir(file_path):
|
||
continue
|
||
files = os.listdir(file_path)
|
||
for file_name in files:
|
||
if ".mp4" in file_name:
|
||
file_names = file_name.split("-")
|
||
if len(file_names) > 0 and file_names[0] == str(index):
|
||
path = Path(os.path.join(file_path, file_name))
|
||
if path.is_file():
|
||
count += 1
|
||
else:
|
||
for sub_file in os.listdir(path):
|
||
sub_path = Path(os.path.join(path, sub_file))
|
||
if sub_path.is_file():
|
||
count += 1
|
||
except Exception as e:
|
||
logger.error(f"统计视频文件失败: {e}")
|
||
return count
|
||
|
||
def prepare_batch_files(self, folder_path, config):
|
||
"""准备批量上传的文件列表(与main.py中的action1方法逻辑一致)"""
|
||
file_paths = []
|
||
if not config:
|
||
logger.error("配置为空,无法准备批量上传文件")
|
||
return file_paths
|
||
index = str(config.get('序号', '')).strip()
|
||
if not index:
|
||
logger.warning("序号为空,无法准备批量上传文件")
|
||
return file_paths
|
||
if not folder_path or not os.path.exists(folder_path):
|
||
logger.error(f"文件夹路径无效或不存在: {folder_path}")
|
||
return file_paths
|
||
if not os.access(folder_path, os.R_OK):
|
||
logger.error(f"没有权限读取文件夹: {folder_path}")
|
||
return file_paths
|
||
|
||
logger.info("=" * 50)
|
||
logger.info("开始准备批量上传文件列表...")
|
||
logger.info(f"文件夹路径: {folder_path}")
|
||
logger.info(f"查找序号: {index}")
|
||
|
||
try:
|
||
try:
|
||
all_items = os.listdir(folder_path)
|
||
subdirs = [f for f in all_items if os.path.isdir(os.path.join(folder_path, f))]
|
||
except PermissionError:
|
||
logger.error(f"没有权限访问文件夹: {folder_path}")
|
||
return file_paths
|
||
except Exception as e:
|
||
logger.error(f"读取文件夹失败: {e}")
|
||
return file_paths
|
||
|
||
logger.info(f"在最外层文件夹下找到 {len(subdirs)} 个子文件夹(多多ID文件夹)")
|
||
|
||
for subdir_name in subdirs:
|
||
file_path = os.path.join(folder_path, subdir_name)
|
||
logger.info(f" 正在扫描子文件夹: {subdir_name}")
|
||
if not os.path.isdir(file_path):
|
||
logger.info(f" 跳过(不是文件夹)")
|
||
continue
|
||
files = os.listdir(file_path)
|
||
logger.info(f" 该文件夹下有 {len(files)} 个项目")
|
||
|
||
for file_name in files:
|
||
logger.info(f" 检查项目: {file_name}")
|
||
if ".mp4" in file_name:
|
||
logger.info(f" ✓ 是视频文件(包含.mp4)")
|
||
file_names = file_name.split("-")
|
||
logger.info(f" 文件名分割结果: {file_names}")
|
||
|
||
if len(file_names) > 0 and file_names[0] == str(index):
|
||
logger.info(f" ✓ 序号匹配!序号: {file_names[0]}, 目标序号: {index}")
|
||
path = Path(os.path.join(file_path, file_name))
|
||
|
||
if path.is_file():
|
||
logger.info(f" ✓ 是文件,添加到列表: {path.name}")
|
||
file_paths.append({
|
||
"url": config.get('达人链接', ''),
|
||
"user_id": config.get('多多id', ''),
|
||
"time_start": config.get('定时发布', '') if config.get('定时发布') else None,
|
||
"ht": config.get('话题', ''),
|
||
"index": str(index),
|
||
"path": path
|
||
})
|
||
logger.info(f" 当前视频总数: {len(file_paths)}")
|
||
else:
|
||
logger.info(f" 是文件夹,扫描其中的文件...")
|
||
try:
|
||
sub_files = os.listdir(path)
|
||
logger.info(f" 文件夹中有 {len(sub_files)} 个文件")
|
||
for sub_file in sub_files:
|
||
sub_path = Path(os.path.join(path, sub_file))
|
||
if sub_path.is_file():
|
||
logger.info(f" ✓ 添加文件: {sub_file}")
|
||
file_paths.append({
|
||
"url": config.get('达人链接', ''),
|
||
"user_id": config.get('多多id', ''),
|
||
"time_start": config.get('定时发布', '') if config.get('定时发布') else None,
|
||
"ht": config.get('话题', ''),
|
||
"index": str(index),
|
||
"path": sub_path
|
||
})
|
||
logger.info(f" 当前视频总数: {len(file_paths)}")
|
||
except Exception as e:
|
||
logger.error(f" 扫描子文件夹失败: {e}")
|
||
else:
|
||
if len(file_names) > 0:
|
||
logger.info(f" ✗ 序号不匹配: {file_names[0]} != {index}")
|
||
else:
|
||
logger.info(f" ✗ 文件名格式不正确,无法提取序号")
|
||
else:
|
||
logger.info(f" ✗ 不是视频文件(不包含.mp4),跳过")
|
||
except Exception as e:
|
||
logger.error(f"准备批量文件失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
logger.info("=" * 50)
|
||
logger.info(f"文件收集完成!共找到 {len(file_paths)} 个视频文件")
|
||
if file_paths:
|
||
logger.info("视频文件列表:")
|
||
for idx, video_info in enumerate(file_paths, 1):
|
||
logger.info(f" {idx}. {video_info['path'].name} ({video_info['path']})")
|
||
return file_paths
|
||
|
||
def stop(self):
|
||
"""停止执行"""
|
||
self.is_running = False
|
||
|
||
def requestInterruption(self):
|
||
"""请求中断线程(重写QThread方法)"""
|
||
super().requestInterruption()
|
||
self.is_running = False
|