Files
haha/gui_app.py
2026-01-20 15:24:08 +08:00

1452 lines
69 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import json
import time
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Optional
import pandas as pd
from PyQt5.QtWidgets import (
QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QPushButton, QLabel, QLineEdit, QTextEdit, QFileDialog,
QTableWidget, QTableWidgetItem, QMessageBox, QDateTimeEdit,
QGroupBox, QCheckBox, QProgressBar, QGridLayout
)
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QDateTime
from PyQt5.QtGui import QFont
from main import Pdd
from loguru import logger
def get_default_folder_path():
"""获取默认文件夹路径(桌面/多多自动化发文)"""
desktop = os.path.join(os.path.expanduser("~"), "Desktop")
default_path = os.path.join(desktop, "多多自动化发文")
return default_path
class WorkerThread(QThread):
"""工作线程,用于执行自动化任务"""
finished = pyqtSignal(bool, str)
log_message = pyqtSignal(str)
progress = pyqtSignal(int)
def __init__(self, config_data, is_batch_mode, prepared_files=None, parent=None):
super().__init__(parent)
self.config_data = config_data
self.is_batch_mode = is_batch_mode
self.prepared_files = prepared_files # 预查找的文件列表
self.is_running = True
def run(self):
try:
# 配置日志输出到GUI
logger.remove()
logger.add(lambda msg: self.log_message.emit(str(msg).strip()))
if self.is_batch_mode:
self.run_batch_mode()
else:
self.run_single_mode()
except Exception as e:
error_msg = f"执行失败: {str(e)}"
self.finished.emit(False, error_msg)
self.log_message.emit(error_msg)
logger.error(f"执行失败: {e}")
import traceback
traceback_str = traceback.format_exc()
logger.error(traceback_str)
self.log_message.emit(f"错误详情: {traceback_str}")
def run_single_mode(self):
"""执行单个上传模式action方法"""
try:
config = self.config_data
self.log_message.emit(f"开始处理单个任务(逐个上传模式)...")
pdd = Pdd(
url=config.get('达人链接', ''),
user_id=config.get('多多id', ''),
time_start=config.get('定时发布', '') if config.get('定时发布') else None,
ht=config.get('话题', ''),
index=config.get('序号', ''),
title=config.get('标题', None)
)
# 如果已经预查找了文件,直接使用
if self.prepared_files:
self.log_message.emit(f"使用预查找的文件列表,共 {len(self.prepared_files)} 个文件")
self.progress.emit(30)
# 判断是否为视频文件
video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm']
video_files = [f for f in self.prepared_files if f['path'].is_file() and any(f['path'].suffix.lower() == ext for ext in video_extensions)]
# 如果勾选了批量上传且是视频调用action1方法
if self.is_batch_mode and video_files:
# 批量上传视频调用action1方法
self.log_message.emit(f"批量上传模式:找到 {len(video_files)} 个视频文件调用action1方法")
try:
pdd.action1(folder_path=video_files)
self.progress.emit(100)
self.finished.emit(True, f"批量上传完成,共处理 {len(video_files)} 个视频")
except Exception as e:
error_msg = f"执行action1方法失败: {str(e)}"
self.log_message.emit(error_msg)
logger.error(error_msg)
import traceback
traceback_str = traceback.format_exc()
logger.error(traceback_str)
self.log_message.emit(f"错误详情: {traceback_str}")
self.finished.emit(False, error_msg)
return
else:
# 单个上传模式使用action方法
# 从预查找的文件中提取文件夹路径(取第一个文件的父目录的父目录,回到最外层文件夹)
if self.prepared_files:
first_file = self.prepared_files[0]
if first_file['path'].is_file():
# 文件路径:最外层文件夹/多多ID文件夹/文件名
# 需要回到最外层文件夹
folder_path = str(first_file['path'].parent.parent)
else:
# 文件夹路径:最外层文件夹/多多ID文件夹/文件夹名
folder_path = str(first_file['path'].parent.parent)
self.log_message.emit(f"使用文件夹路径: {folder_path}")
try:
pdd.action(folder_path=folder_path, collect_all_videos=False)
self.progress.emit(100)
self.finished.emit(True, "单个任务执行完成")
except Exception as e:
error_msg = f"执行action方法失败: {str(e)}"
self.log_message.emit(error_msg)
logger.error(error_msg)
import traceback
traceback_str = traceback.format_exc()
logger.error(traceback_str)
self.log_message.emit(f"错误详情: {traceback_str}")
self.finished.emit(False, error_msg)
return
else:
# 未预查找文件,使用原来的逻辑
folder_path = config.get('文件夹路径', '').strip()
if not folder_path:
folder_path = get_default_folder_path()
if not os.path.exists(folder_path):
error_msg = f"文件夹路径不存在: {folder_path}"
self.finished.emit(False, error_msg)
self.log_message.emit(error_msg)
return
self.log_message.emit(f"使用文件夹路径: {folder_path}")
self.progress.emit(30)
# 检查是否勾选了批量上传
is_batch_mode = self.is_batch_mode
# 调用action方法
try:
pdd.action(folder_path=folder_path, collect_all_videos=is_batch_mode)
except Exception as e:
error_msg = f"执行action方法失败: {str(e)}"
self.log_message.emit(error_msg)
logger.error(error_msg)
import traceback
traceback_str = traceback.format_exc()
logger.error(traceback_str)
self.log_message.emit(f"错误详情: {traceback_str}")
self.finished.emit(False, error_msg)
return
self.progress.emit(100)
self.finished.emit(True, "单个任务执行完成")
except Exception as e:
error_msg = f"执行失败: {str(e)}"
self.finished.emit(False, error_msg)
self.log_message.emit(error_msg)
logger.error(f"执行失败: {e}")
import traceback
traceback_str = traceback.format_exc()
logger.error(traceback_str)
self.log_message.emit(f"错误详情: {traceback_str}")
def run_batch_mode(self):
"""执行批量上传模式:先批量上传所有视频,然后逐个上传图片"""
try:
config = self.config_data
self.log_message.emit(f"开始处理批量上传任务...")
index = config.get('序号', '')
# 创建Pdd实例
pdd = Pdd(
url=config.get('达人链接', ''),
user_id=config.get('多多id', ''),
time_start=config.get('定时发布', '') if config.get('定时发布') else None,
ht=config.get('话题', ''),
index=index,
title=config.get('标题', None)
)
# 如果已经预查找了文件,直接使用
if self.prepared_files:
self.log_message.emit(f"使用预查找的文件列表,共 {len(self.prepared_files)} 个文件")
self.progress.emit(30)
# 分离视频文件和图片文件夹
video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm']
video_file_paths = [f for f in self.prepared_files if f['path'].is_file() and any(f['path'].suffix.lower() == ext for ext in video_extensions)]
image_folders = [f for f in self.prepared_files if f['path'].is_dir()]
# 如果有视频,批量上传所有视频
if video_file_paths:
self.log_message.emit(f"找到 {len(video_file_paths)} 个视频文件,开始批量上传...")
self.progress.emit(30)
pdd.action1(folder_path=video_file_paths)
self.log_message.emit(f"批量上传视频完成,共处理 {len(video_file_paths)} 个视频")
else:
self.log_message.emit("未找到视频文件,跳过视频上传")
# 如果有图片文件夹,逐个上传图片
if image_folders:
self.log_message.emit(f"找到 {len(image_folders)} 个图片文件夹,开始逐个上传...")
self.progress.emit(70)
for idx, image_folder_info in enumerate(image_folders):
if not self.is_running:
break
image_folder = str(image_folder_info['path'])
self.log_message.emit(f"处理第 {idx + 1}/{len(image_folders)} 个图片文件夹: {image_folder}")
# 使用action方法处理图片文件夹
pdd.action(folder_path=image_folder)
self.log_message.emit(f"图片文件夹 {idx + 1} 上传完成")
self.log_message.emit(f"所有图片文件夹上传完成,共处理 {len(image_folders)} 个文件夹")
else:
self.log_message.emit("未找到图片文件夹,跳过图片上传")
self.progress.emit(100)
total_count = len(video_file_paths) + len(image_folders)
self.finished.emit(True, f"批量任务执行完成,共处理 {len(video_file_paths)} 个视频和 {len(image_folders)} 个图片文件夹")
else:
# 未预查找文件,使用原来的逻辑
folder_path = config.get('文件夹路径', '').strip()
if not folder_path:
folder_path = get_default_folder_path()
if not os.path.exists(folder_path):
self.finished.emit(False, f"文件夹路径不存在: {folder_path}")
return
# 第一步:收集所有视频文件
self.log_message.emit("第一步:收集所有视频文件...")
video_file_paths = self.prepare_batch_files(folder_path, config)
# 第二步:收集所有图片文件夹
self.log_message.emit("第二步:收集所有图片文件夹...")
image_folders = self.prepare_image_folders(folder_path, index)
# 第三步:如果有视频,批量上传所有视频
if video_file_paths:
self.log_message.emit(f"找到 {len(video_file_paths)} 个视频文件,开始批量上传...")
self.progress.emit(30)
pdd.action1(folder_path=video_file_paths)
self.log_message.emit(f"批量上传视频完成,共处理 {len(video_file_paths)} 个视频")
else:
self.log_message.emit("未找到视频文件,跳过视频上传")
# 第四步:如果有图片文件夹,逐个上传图片
if image_folders:
self.log_message.emit(f"找到 {len(image_folders)} 个图片文件夹,开始逐个上传...")
self.progress.emit(70)
for idx, image_folder in enumerate(image_folders):
if not self.is_running:
break
self.log_message.emit(f"处理第 {idx + 1}/{len(image_folders)} 个图片文件夹: {image_folder}")
# 使用action方法处理图片文件夹
pdd.action(folder_path=image_folder)
self.log_message.emit(f"图片文件夹 {idx + 1} 上传完成")
self.log_message.emit(f"所有图片文件夹上传完成,共处理 {len(image_folders)} 个文件夹")
else:
self.log_message.emit("未找到图片文件夹,跳过图片上传")
self.progress.emit(100)
total_count = len(video_file_paths) + len(image_folders)
self.finished.emit(True, f"批量任务执行完成,共处理 {len(video_file_paths)} 个视频和 {len(image_folders)} 个图片文件夹")
except Exception as e:
self.finished.emit(False, f"批量执行失败: {str(e)}")
logger.error(f"批量执行失败: {e}")
def prepare_image_folders(self, folder_path, index):
"""收集所有匹配序号的图片文件夹与main.py中action方法的逻辑一致"""
image_folders = []
try:
# 遍历最外层文件夹下的所有子文件夹与main.py逻辑一致
for file in os.listdir(folder_path): # 获取文件夹下所有的文件夹
file_path = os.path.join(folder_path, file) # 拼接文件夹
# 检查是否为目录,跳过文件(如.lnk快捷方式
if not os.path.isdir(file_path):
continue
files = os.listdir(file_path) # 获取用户id下的文件
for file_name in files:
# 用"-"分割文件名,检查第一部分是否等于序号
file_names = file_name.split("-")
if len(file_names) > 0 and file_names[0] == str(index):
path = Path(os.path.join(file_path, file_name))
# 判断是否为文件夹(图片文件夹)
if not path.is_file():
# 这是一个图片文件夹
image_folders.append(str(path))
except Exception as e:
logger.error(f"收集图片文件夹失败: {e}")
return image_folders
def count_videos_in_folder(self, folder_path, index):
"""统计文件夹中匹配序号的视频文件数量与main.py逻辑一致"""
count = 0
try:
# 遍历最外层文件夹下的所有子文件夹与main.py逻辑一致
for file in os.listdir(folder_path): # 获取文件夹下所有的文件夹
file_path = os.path.join(folder_path, file) # 拼接文件夹
# 检查是否为目录,跳过文件(如.lnk快捷方式
if not os.path.isdir(file_path):
continue
files = os.listdir(file_path) # 获取用户id下的文件
for file_name in files:
# 检查是否是视频文件(.mp4
if ".mp4" in file_name:
# 用"-"分割文件名,检查第一部分是否等于序号
# 文件名格式4-茶叶蛋大冒险-.mp4 -> ['4', '茶叶蛋大冒险', '', 'mp4']
file_names = file_name.split("-")
if len(file_names) > 0 and file_names[0] == str(index):
path = Path(os.path.join(file_path, file_name))
# 判断是否为文件
if path.is_file():
count += 1
else:
# 如果是文件夹,统计其中的文件
for sub_file in os.listdir(path):
sub_path = Path(os.path.join(path, sub_file))
if sub_path.is_file():
count += 1
except Exception as e:
logger.error(f"统计视频文件失败: {e}")
return count
def prepare_batch_files(self, folder_path, config):
"""准备批量上传的文件列表与main.py中的action1方法逻辑一致"""
file_paths = []
index = config.get('序号', '')
logger.info("=" * 50)
logger.info("开始准备批量上传文件列表...")
logger.info(f"文件夹路径: {folder_path}")
logger.info(f"查找序号: {index}")
try:
# 遍历最外层文件夹下的所有子文件夹与main.py逻辑一致
subdirs = [f for f in os.listdir(folder_path) if os.path.isdir(os.path.join(folder_path, f))]
logger.info(f"在最外层文件夹下找到 {len(subdirs)} 个子文件夹多多ID文件夹")
for subdir_name in subdirs:
file_path = os.path.join(folder_path, subdir_name) # 拼接文件夹
logger.info(f" 正在扫描子文件夹: {subdir_name}")
# 检查是否为目录,跳过文件(如.lnk快捷方式
if not os.path.isdir(file_path):
logger.info(f" 跳过(不是文件夹)")
continue
files = os.listdir(file_path) # 获取用户id下的文件
logger.info(f" 该文件夹下有 {len(files)} 个项目")
for file_name in files:
logger.info(f" 检查项目: {file_name}")
# 检查是否是视频文件(.mp4
if ".mp4" in file_name:
logger.info(f" ✓ 是视频文件(包含.mp4")
# 用"-"分割文件名,检查第一部分是否等于序号
# 文件名格式4-茶叶蛋大冒险-.mp4 -> ['4', '茶叶蛋大冒险', '', 'mp4']
file_names = file_name.split("-")
logger.info(f" 文件名分割结果: {file_names}")
if len(file_names) > 0 and file_names[0] == str(index):
logger.info(f" ✓ 序号匹配!序号: {file_names[0]}, 目标序号: {index}")
path = Path(os.path.join(file_path, file_name))
# 判断是否为文件
if path.is_file():
logger.info(f" ✓ 是文件,添加到列表: {path.name}")
file_paths.append({
"url": config.get('达人链接', ''),
"user_id": config.get('多多id', ''),
"time_start": config.get('定时发布', '') if config.get('定时发布') else None,
"ht": config.get('话题', ''),
"index": str(index),
"path": path
})
logger.info(f" 当前视频总数: {len(file_paths)}")
# 注意这里不break因为可能有多个匹配的文件
else:
logger.info(f" 是文件夹,扫描其中的文件...")
# 如果是文件夹,添加其中的所有文件
try:
sub_files = os.listdir(path)
logger.info(f" 文件夹中有 {len(sub_files)} 个文件")
for sub_file in sub_files:
sub_path = Path(os.path.join(path, sub_file))
if sub_path.is_file():
logger.info(f" ✓ 添加文件: {sub_file}")
file_paths.append({
"url": config.get('达人链接', ''),
"user_id": config.get('多多id', ''),
"time_start": config.get('定时发布', '') if config.get('定时发布') else None,
"ht": config.get('话题', ''),
"index": str(index),
"path": sub_path
})
logger.info(f" 当前视频总数: {len(file_paths)}")
except Exception as e:
logger.error(f" 扫描子文件夹失败: {e}")
else:
if len(file_names) > 0:
logger.info(f" ✗ 序号不匹配: {file_names[0]} != {index}")
else:
logger.info(f" ✗ 文件名格式不正确,无法提取序号")
else:
logger.info(f" ✗ 不是视频文件(不包含.mp4跳过")
except Exception as e:
logger.error(f"准备批量文件失败: {e}")
import traceback
traceback.print_exc()
logger.info("=" * 50)
logger.info(f"文件收集完成!共找到 {len(file_paths)} 个视频文件")
if file_paths:
logger.info("视频文件列表:")
for idx, video_info in enumerate(file_paths, 1):
logger.info(f" {idx}. {video_info['path'].name} ({video_info['path']})")
return file_paths
def stop(self):
"""停止执行"""
self.is_running = False
class MainWindow(QMainWindow):
"""主窗口"""
def __init__(self):
super().__init__()
self.worker_thread = None
self.configs = [] # 存储从Excel导入的配置
self.prepared_files = None # 存储通过"更新数据"找到的文件列表
self.running_total = 0
self.running_done = 0
self.init_ui()
def init_ui(self):
self.setWindowTitle("拼多多自动化发布工具")
self.setGeometry(100, 100, 1000, 800)
self.apply_styles()
# 创建中央部件
central_widget = QWidget()
self.setCentralWidget(central_widget)
# 主布局
main_layout = QVBoxLayout()
main_layout.setContentsMargins(20, 20, 20, 20)
main_layout.setSpacing(14)
central_widget.setLayout(main_layout)
# 顶部标题区
header_layout = QHBoxLayout()
title_box = QVBoxLayout()
title_label = QLabel("拼多多自动化发布工具")
title_label.setObjectName("titleLabel")
subtitle_label = QLabel("配置导入 • 文件查找 • 批量上传")
subtitle_label.setObjectName("subtitleLabel")
title_box.addWidget(title_label)
title_box.addWidget(subtitle_label)
header_layout.addLayout(title_box)
header_layout.addStretch()
main_layout.addLayout(header_layout)
# 状态卡片区
status_layout = QHBoxLayout()
self.status_update_value = QLabel("未更新")
self.status_pending_value = QLabel("0")
self.status_running_value = QLabel("0")
update_card = self._build_status_card("更新状态", self.status_update_value)
pending_card = self._build_status_card("待执行", self.status_pending_value)
running_card = self._build_status_card("执行中", self.status_running_value)
status_layout.addWidget(update_card)
status_layout.addWidget(pending_card)
status_layout.addWidget(running_card)
main_layout.addLayout(status_layout)
# Excel导入区域
import_group = QGroupBox("Excel配置导入可选")
import_layout = QVBoxLayout()
h1 = QHBoxLayout()
h1.addWidget(QLabel("Excel文件:"))
self.excel_path_input = QLineEdit()
h1.addWidget(self.excel_path_input)
self.excel_browse_btn = QPushButton("浏览")
self.excel_browse_btn.clicked.connect(self.browse_excel)
h1.addWidget(self.excel_browse_btn)
self.import_btn = QPushButton("导入")
self.import_btn.clicked.connect(self.import_excel)
h1.addWidget(self.import_btn)
import_layout.addLayout(h1)
import_group.setLayout(import_layout)
main_layout.addWidget(import_group)
# 配置输入区域
config_group = QGroupBox("配置信息")
config_layout = QVBoxLayout()
grid = QGridLayout()
grid.setHorizontalSpacing(12)
grid.setVerticalSpacing(10)
self.user_id_input = QLineEdit()
self.index_input = QLineEdit()
self.topic_input = QLineEdit()
self.schedule_datetime = QDateTimeEdit()
self.schedule_datetime.setCalendarPopup(True)
self.schedule_datetime.setDateTime(QDateTime.currentDateTime())
self.url_input = QLineEdit()
self.executor_input = QLineEdit()
grid.addWidget(QLabel("多多ID:"), 0, 0)
grid.addWidget(self.user_id_input, 0, 1)
grid.addWidget(QLabel("序号:"), 0, 2)
grid.addWidget(self.index_input, 0, 3)
grid.addWidget(QLabel("话题:"), 1, 0)
grid.addWidget(self.topic_input, 1, 1)
grid.addWidget(QLabel("定时发布:"), 1, 2)
grid.addWidget(self.schedule_datetime, 1, 3)
grid.addWidget(QLabel("达人链接:"), 2, 0)
grid.addWidget(self.url_input, 2, 1)
grid.addWidget(QLabel("执行人:"), 2, 2)
grid.addWidget(self.executor_input, 2, 3)
# 文件夹路径(最外层文件夹)
grid.addWidget(QLabel("资料文件夹路径:"), 3, 0)
self.folder_path_input = QLineEdit()
default_path = get_default_folder_path()
self.folder_path_input.setPlaceholderText(f"留空则使用默认路径: {default_path}")
grid.addWidget(self.folder_path_input, 3, 1, 1, 2)
self.folder_browse_btn = QPushButton("浏览")
self.folder_browse_btn.setProperty("variant", "secondary")
self.folder_browse_btn.clicked.connect(self.browse_folder)
grid.addWidget(self.folder_browse_btn, 3, 3)
tip_label = QLabel("提示:只需填写最外层文件夹路径,程序会自动查找子文件夹中的文件")
tip_label.setStyleSheet("color: #666; font-size: 10px;")
grid.addWidget(tip_label, 4, 0, 1, 4)
# 更新数据按钮
update_row = QHBoxLayout()
self.update_data_btn = QPushButton("更新数据")
self.update_data_btn.setProperty("variant", "primary")
self.update_data_btn.clicked.connect(self.update_data)
update_row.addWidget(self.update_data_btn)
self.update_status_label = QLabel("未更新")
self.update_status_label.setStyleSheet("color: #666; font-size: 10px;")
update_row.addWidget(self.update_status_label)
update_row.addStretch()
update_row_widget = QWidget()
update_row_widget.setLayout(update_row)
grid.addWidget(update_row_widget, 6, 0, 1, 4)
# 批量上传勾选框
self.batch_upload_checkbox = QCheckBox("批量上传(如果文件夹中有多个视频,将使用批量上传模式)")
self.batch_upload_checkbox.setChecked(False)
grid.addWidget(self.batch_upload_checkbox, 7, 0, 1, 4)
config_layout.addLayout(grid)
config_group.setLayout(config_layout)
main_layout.addWidget(config_group)
# 配置列表表格如果从Excel导入
self.table_group = QGroupBox("配置列表从Excel导入后显示可直接在表格中编辑")
table_layout = QVBoxLayout()
self.config_table = QTableWidget()
self.config_table.setColumnCount(9)
self.config_table.setHorizontalHeaderLabels([
'多多ID', '序号', '话题', '定时发布', '间隔时间(秒)', '达人链接', '执行人', '情况', '文件路径'
])
self.config_table.horizontalHeader().setStretchLastSection(True)
# 设置表格可编辑
self.config_table.setEditTriggers(QTableWidget.AllEditTriggers)
table_layout.addWidget(self.config_table)
self.table_group.setLayout(table_layout)
self.table_group.setVisible(False) # 默认隐藏
main_layout.addWidget(self.table_group)
# 执行按钮
self.execute_btn = QPushButton("开始上传")
self.execute_btn.setProperty("variant", "accent")
self.execute_btn.clicked.connect(self.execute_task)
main_layout.addWidget(self.execute_btn)
# 日志显示区域
log_group = QGroupBox("执行日志")
log_layout = QVBoxLayout()
self.log_text = QTextEdit()
self.log_text.setReadOnly(True)
self.log_text.setFont(QFont("Consolas", 10))
log_layout.addWidget(self.log_text)
log_group.setLayout(log_layout)
main_layout.addWidget(log_group)
# 进度条
self.progress_bar = QProgressBar()
self.progress_bar.setVisible(False)
main_layout.addWidget(self.progress_bar)
# 配置日志输出保留控制台输出GUI通过信号接收
logger.remove()
logger.add(lambda msg: None) # 禁用默认输出通过信号在GUI中显示
def _build_status_card(self, title, value_label):
"""创建状态卡片"""
card = QWidget()
card.setObjectName("statusCard")
layout = QVBoxLayout(card)
layout.setContentsMargins(12, 10, 12, 10)
title_label = QLabel(title)
title_label.setObjectName("statusTitle")
value_label.setObjectName("statusValue")
layout.addWidget(title_label)
layout.addWidget(value_label)
return card
def set_status_cards(self, update_text=None, pending=None, running=None):
"""更新状态卡片显示"""
if update_text is not None:
self.status_update_value.setText(update_text)
if pending is not None:
self.status_pending_value.setText(str(pending))
if running is not None:
self.status_running_value.setText(str(running))
def set_running_progress(self, done, total):
"""更新执行中统计"""
self.running_done = done
self.running_total = total
if total > 0:
self.set_status_cards(running=f"{done}/{total}")
else:
self.set_status_cards(running="0")
def apply_styles(self):
"""应用统一的界面样式"""
self.setStyleSheet("""
QWidget {
font-family: "Microsoft YaHei";
font-size: 12px;
color: #1f1f1f;
background-color: #f6f7fb;
}
QGroupBox {
background-color: #ffffff;
border: 1px solid #e5e7eb;
border-radius: 10px;
margin-top: 14px;
}
QGroupBox::title {
subcontrol-origin: margin;
left: 12px;
padding: 0 6px;
color: #4b5563;
font-weight: 600;
}
QLabel {
color: #374151;
}
QLabel#titleLabel {
font-size: 20px;
font-weight: 700;
color: #111827;
background: transparent;
}
QLabel#subtitleLabel {
font-size: 12px;
color: #6b7280;
background: transparent;
}
QWidget#statusCard {
background-color: #ffffff;
border: 1px solid #e5e7eb;
border-radius: 10px;
}
QLabel#statusTitle {
font-size: 11px;
color: #6b7280;
}
QLabel#statusValue {
font-size: 16px;
font-weight: 700;
color: #111827;
}
QLineEdit, QDateTimeEdit, QTextEdit {
background-color: #ffffff;
border: 1px solid #d1d5db;
border-radius: 6px;
padding: 6px 8px;
}
QLineEdit:focus, QDateTimeEdit:focus, QTextEdit:focus {
border: 1px solid #3b82f6;
}
QPushButton {
border-radius: 6px;
padding: 8px 12px;
background-color: #e5e7eb;
color: #111827;
}
QPushButton[variant="secondary"] {
background-color: #f3f4f6;
color: #111827;
}
QPushButton[variant="primary"] {
background-color: #3b82f6;
color: #ffffff;
}
QPushButton[variant="accent"] {
background-color: #10b981;
color: #ffffff;
font-size: 14px;
padding: 10px 14px;
}
QPushButton:hover {
background-color: #d1d5db;
}
QPushButton[variant="primary"]:hover {
background-color: #2563eb;
}
QPushButton[variant="accent"]:hover {
background-color: #059669;
}
QTableWidget {
background-color: #ffffff;
border: 1px solid #e5e7eb;
border-radius: 8px;
gridline-color: #eef2f7;
}
QHeaderView::section {
background-color: #f3f4f6;
padding: 6px;
border: none;
font-weight: 600;
color: #374151;
}
QProgressBar {
border: 1px solid #e5e7eb;
border-radius: 6px;
background-color: #ffffff;
text-align: center;
}
QProgressBar::chunk {
background-color: #3b82f6;
border-radius: 6px;
}
""")
def browse_excel(self):
"""浏览Excel文件"""
file_path, _ = QFileDialog.getOpenFileName(
self, "选择Excel文件", "", "Excel文件 (*.xlsx *.xls)"
)
if file_path:
self.excel_path_input.setText(file_path)
def browse_folder(self):
"""浏览文件夹"""
folder_path = QFileDialog.getExistingDirectory(self, "选择文件夹")
if folder_path:
self.folder_path_input.setText(folder_path)
def import_excel(self):
"""导入Excel配置文件"""
excel_path = self.excel_path_input.text()
if not excel_path or not os.path.exists(excel_path):
QMessageBox.warning(self, "警告", "请先选择有效的Excel文件")
return
try:
# 读取Excel文件
df = pd.read_excel(excel_path)
# 检查必需的列
required_columns = ['多多id', '序号', '话题', '定时发布', '间隔时间', '达人链接', '执行人', '情况']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
QMessageBox.warning(
self, "警告",
f"Excel文件缺少以下列: {', '.join(missing_columns)}\n"
f"请确保Excel文件包含所有必需的列。"
)
return
# 转换为配置列表
self.configs = []
for _, row in df.iterrows():
config = {
'多多id': str(row.get('多多id', '')),
'序号': str(row.get('序号', '')),
'话题': str(row.get('话题', '')),
'定时发布': str(row.get('定时发布', '')) if pd.notna(row.get('定时发布')) else '',
'间隔时间': int(row.get('间隔时间', 0)) if pd.notna(row.get('间隔时间')) else 0,
'达人链接': str(row.get('达人链接', '')),
'执行人': str(row.get('执行人', '')),
'情况': str(row.get('情况', '待执行')),
'文件路径': '' # 文件路径字段初始为空,通过更新数据按钮填充
}
self.configs.append(config)
# 更新表格显示
self.update_table()
# 显示表格
self.table_group.setVisible(True)
self.set_status_cards(update_text="未更新", pending=len(self.configs))
QMessageBox.information(self, "成功", f"成功导入 {len(self.configs)} 条配置")
except Exception as e:
QMessageBox.critical(self, "错误", f"导入Excel文件失败: {str(e)}")
logger.error(f"导入Excel失败: {e}")
def update_table(self):
"""更新配置表格"""
self.config_table.setRowCount(len(self.configs))
for row_idx, config in enumerate(self.configs):
self.config_table.setItem(row_idx, 0, QTableWidgetItem(str(config.get('多多id', ''))))
self.config_table.setItem(row_idx, 1, QTableWidgetItem(str(config.get('序号', ''))))
self.config_table.setItem(row_idx, 2, QTableWidgetItem(str(config.get('话题', ''))))
self.config_table.setItem(row_idx, 3, QTableWidgetItem(str(config.get('定时发布', ''))))
self.config_table.setItem(row_idx, 4, QTableWidgetItem(str(config.get('间隔时间', 0))))
self.config_table.setItem(row_idx, 5, QTableWidgetItem(str(config.get('达人链接', ''))))
self.config_table.setItem(row_idx, 6, QTableWidgetItem(str(config.get('执行人', ''))))
self.config_table.setItem(row_idx, 7, QTableWidgetItem(str(config.get('情况', '待执行'))))
# 文件路径列第8列索引为8如果配置中没有则显示空
file_path = config.get('文件路径', '')
self.config_table.setItem(row_idx, 8, QTableWidgetItem(str(file_path)))
self.config_table.resizeColumnsToContents()
# 未更新前,用配置行数作为待执行提示
self.set_status_cards(pending=self.config_table.rowCount())
def get_config_from_table(self, row_index=0):
"""从表格中获取指定行的配置数据(使用表格中修改后的值)"""
if row_index >= self.config_table.rowCount():
return None
def get_cell_text(row, col):
"""安全获取单元格文本"""
item = self.config_table.item(row, col)
return item.text().strip() if item else ''
def get_cell_int(row, col, default=0):
"""安全获取单元格整数"""
item = self.config_table.item(row, col)
if item and item.text().strip():
try:
return int(item.text().strip())
except ValueError:
return default
return default
config = {
'多多id': get_cell_text(row_index, 0),
'序号': get_cell_text(row_index, 1),
'话题': get_cell_text(row_index, 2),
'定时发布': get_cell_text(row_index, 3),
'间隔时间': get_cell_int(row_index, 4, 0),
'达人链接': get_cell_text(row_index, 5),
'执行人': get_cell_text(row_index, 6),
'情况': get_cell_text(row_index, 7) or '待执行',
'文件路径': get_cell_text(row_index, 8) # 第8列是文件路径
}
return config
def get_config(self):
"""获取当前配置数据"""
schedule_time = self.schedule_datetime.dateTime().toString("yyyy-MM-dd HH:mm:ss")
# 获取文件夹路径,如果为空则使用默认路径
folder_path = self.folder_path_input.text().strip()
if not folder_path:
folder_path = get_default_folder_path()
return {
'多多id': self.user_id_input.text(),
'序号': self.index_input.text(),
'话题': self.topic_input.text(),
'定时发布': schedule_time if self.schedule_datetime.dateTime() > QDateTime.currentDateTime() else '',
'达人链接': self.url_input.text(),
'执行人': self.executor_input.text(),
'文件夹路径': folder_path,
'情况': '待执行'
}
def update_data(self):
"""更新数据:找出文件并保存到表格的文件路径列(更新所有行)"""
try:
# 检查是否有Excel导入的配置表格
if self.configs and self.config_table.rowCount() > 0:
# 获取文件夹路径
folder_path = self.folder_path_input.text().strip()
if not folder_path:
folder_path = get_default_folder_path()
if not os.path.exists(folder_path):
QMessageBox.warning(self, "警告", f"文件夹路径不存在: {folder_path}")
return
self.log_text.append("=" * 50)
self.log_text.append("开始批量更新所有行的文件路径...")
self.log_text.append(f"共有 {self.config_table.rowCount()} 行需要更新")
# 遍历所有行,更新每行的文件路径
total_found = 0
for row_idx in range(self.config_table.rowCount()):
config = self.get_config_from_table(row_idx)
if not config:
self.log_text.append(f"{row_idx + 1} 行:无法获取配置数据,跳过")
continue
# 验证必填字段
if not config.get('多多id') or not config.get('序号'):
self.log_text.append(f"{row_idx + 1}多多ID或序号为空跳过")
continue
self.log_text.append(f"正在更新第 {row_idx + 1} 行的文件路径...")
self.log_text.append(f" 多多ID: {config.get('多多id')}, 序号: {config.get('序号')}")
# 查找该行对应的文件
found_files = self._find_files_for_config(config, folder_path)
if found_files:
# 将找到的文件路径拼接成字符串(多个文件用分号分隔)
file_paths_str = "; ".join([str(f['path']) for f in found_files])
self.config_table.setItem(row_idx, 8, QTableWidgetItem(file_paths_str))
# 同时更新self.configs中对应的配置
if row_idx < len(self.configs):
self.configs[row_idx]['文件路径'] = file_paths_str
video_count = sum(1 for f in found_files if f['path'].is_file() and any(f['path'].suffix.lower() in ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm'] for ext in ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm']))
self.log_text.append(f" ✓ 找到 {len(found_files)} 个文件({video_count} 个视频)")
total_found += len(found_files)
else:
# 清空文件路径列
self.config_table.setItem(row_idx, 8, QTableWidgetItem(""))
if row_idx < len(self.configs):
self.configs[row_idx]['文件路径'] = ""
self.log_text.append(f" ✗ 未找到匹配的文件")
self.log_text.append("=" * 50)
self.log_text.append(f"批量更新完成!共更新 {self.config_table.rowCount()} 行,找到 {total_found} 个文件")
self.update_status_label.setText(f"已更新: {self.config_table.rowCount()}行,{total_found}个文件")
self.update_status_label.setStyleSheet("color: #4CAF50; font-size: 10px;")
self.set_status_cards(update_text=f"已更新: {self.config_table.rowCount()}", pending=total_found)
QMessageBox.information(self, "成功", f"已更新所有行的文件路径!\n{self.config_table.rowCount()} 行,找到 {total_found} 个文件")
return
else:
# 没有表格,使用原来的逻辑(单个配置)
config = self.get_config()
current_row = -1 # 表示没有表格使用prepared_files
# 验证必填字段
if not config.get('多多id') or not config.get('序号'):
QMessageBox.warning(self, "警告", "请先填写多多ID和序号")
return
# 获取文件夹路径
folder_path = config.get('文件夹路径', '')
if not folder_path:
folder_path = get_default_folder_path()
if not os.path.exists(folder_path):
QMessageBox.warning(self, "警告", f"文件夹路径不存在: {folder_path}")
return
self.log_text.append("=" * 50)
self.log_text.append("开始更新数据,查找文件...")
# 在文件夹中查找文件
found_files = self._find_files_for_config(config, folder_path)
if found_files:
self.prepared_files = found_files
video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm']
video_count = sum(1 for f in found_files if f['path'].is_file() and any(f['path'].suffix.lower() == ext for ext in video_extensions))
folder_count = len(found_files) - video_count
self.log_text.append(f"更新完成!找到 {len(found_files)} 个文件/文件夹({video_count} 个视频,{folder_count} 个文件夹)")
self.update_status_label.setText(f"已更新: {len(found_files)}个文件")
self.update_status_label.setStyleSheet("color: #4CAF50; font-size: 10px;")
self.set_status_cards(update_text=f"已更新: {len(found_files)}个文件", pending=len(found_files))
QMessageBox.information(self, "成功", f"已找到 {len(found_files)} 个文件/文件夹")
else:
self.prepared_files = None
self.log_text.append("未找到匹配的文件")
self.update_status_label.setText("未找到文件")
self.update_status_label.setStyleSheet("color: #f44336; font-size: 10px;")
self.set_status_cards(update_text="未找到文件", pending=0)
QMessageBox.warning(self, "警告", "未找到匹配的文件")
except Exception as e:
error_msg = f"更新数据失败: {str(e)}"
self.log_text.append(error_msg)
QMessageBox.critical(self, "错误", error_msg)
logger.error(f"更新数据失败: {e}")
import traceback
traceback.print_exc()
def _find_files_for_config(self, config, folder_path):
"""根据配置查找文件(辅助方法)"""
found_files = []
index = config.get('序号', '')
video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm']
try:
# 遍历最外层文件夹下的所有子文件夹
subdirs = [f for f in os.listdir(folder_path) if os.path.isdir(os.path.join(folder_path, f))]
# 找到匹配当前多多ID的文件夹
target_subdir = None
for subdir_name in subdirs:
if subdir_name == str(config.get('多多id')):
target_subdir = os.path.join(folder_path, subdir_name)
break
if not target_subdir:
return found_files
# 扫描该文件夹下的文件
items = os.listdir(target_subdir)
for item_name in items:
item_path = os.path.join(target_subdir, item_name)
name_parts = item_name.split("-")
# 检查序号是否匹配
if len(name_parts) > 0 and name_parts[0] == str(index):
path_obj = Path(item_path)
if path_obj.is_file():
# 检查是否为视频文件
if any(path_obj.suffix.lower() == ext for ext in video_extensions):
found_files.append({
"url": config.get('达人链接', ''),
"user_id": config.get('多多id', ''),
"time_start": config.get('定时发布', '') if config.get('定时发布') else None,
"ht": config.get('话题', ''),
"index": str(index),
"path": path_obj
})
else:
# 如果是文件夹,可能是图片文件夹
found_files.append({
"url": config.get('达人链接', ''),
"user_id": config.get('多多id', ''),
"time_start": config.get('定时发布', '') if config.get('定时发布') else None,
"ht": config.get('话题', ''),
"index": str(index),
"path": path_obj
})
except Exception as e:
logger.error(f"查找文件失败: {e}")
return found_files
def execute_task(self):
"""执行任务"""
# 检查是否有Excel导入的配置
if self.configs:
# 如果有Excel配置批量处理
self.execute_batch_from_excel()
else:
# 否则使用当前输入的配置
config = self.get_config()
# 验证必填字段
if not config.get('多多id') or not config.get('序号'):
QMessageBox.warning(self, "警告", "请填写所有必填字段多多ID、序号")
return
folder_path = config.get('文件夹路径', '')
if not folder_path:
folder_path = get_default_folder_path()
config['文件夹路径'] = folder_path
if not os.path.exists(folder_path):
QMessageBox.warning(self, "警告", f"文件夹路径不存在: {folder_path}\n请检查路径或填写正确的文件夹路径")
return
# 显示使用的文件夹路径
self.log_text.append(f"使用文件夹路径: {folder_path}")
# 检查是否勾选了批量上传
is_batch_mode = self.batch_upload_checkbox.isChecked()
# 如果已经更新了数据,根据预查找的文件判断是否为批量上传
if self.prepared_files:
video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm']
video_files = [f for f in self.prepared_files if f['path'].is_file() and any(f['path'].suffix.lower() == ext for ext in video_extensions)]
if is_batch_mode and len(video_files) > 1:
self.log_text.append(f"检测到 {len(video_files)} 个视频文件,使用批量上传模式...")
elif is_batch_mode and len(video_files) <= 1:
self.log_text.append(f"只找到 {len(video_files)} 个视频文件,将使用单个上传模式...")
is_batch_mode = False
elif not is_batch_mode:
self.log_text.append(f"使用单个上传模式...")
# 禁用按钮
self.execute_btn.setEnabled(False)
self.progress_bar.setVisible(True)
self.progress_bar.setValue(0)
# 创建工作线程,传递预查找的文件列表
self.worker_thread = WorkerThread(config, is_batch_mode, self.prepared_files, self)
# 连接finished信号用于单个任务完成后显示弹窗
self.worker_thread.finished.connect(self.on_task_finished)
self.worker_thread.log_message.connect(self.log_text.append)
self.worker_thread.progress.connect(self.progress_bar.setValue)
self.worker_thread.start()
self.set_status_cards(pending=1)
self.set_running_progress(0, 1)
self.log_text.append("=" * 50)
mode_text = "批量上传" if is_batch_mode else "逐个上传"
if self.prepared_files:
self.log_text.append(f"开始执行任务({mode_text}模式,使用预查找的文件列表)...")
else:
self.log_text.append(f"开始执行任务({mode_text}模式)...")
def execute_batch_from_excel(self):
"""从Excel配置批量执行自动判断相同多多ID的mp4文件批量上传"""
# 获取文件夹路径,如果为空则使用默认路径
folder_path = self.folder_path_input.text().strip()
if not folder_path:
folder_path = get_default_folder_path()
if not os.path.exists(folder_path):
QMessageBox.warning(self, "警告", f"文件夹路径不存在: {folder_path}")
return
# 从表格中获取所有配置(使用用户修改后的值)
if self.config_table.rowCount() == 0:
QMessageBox.warning(self, "警告", "配置列表为空请先导入Excel配置")
return
self.log_text.append("=" * 50)
self.log_text.append("开始分析配置,准备批量上传...")
# 收集所有配置及其对应的文件
all_configs_with_files = []
video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm']
for row_idx in range(self.config_table.rowCount()):
config = self.get_config_from_table(row_idx)
if not config:
continue
# 验证必填字段
if not config.get('多多id') or not config.get('序号'):
self.log_text.append(f"{row_idx + 1}多多ID或序号为空跳过")
continue
# 添加文件夹路径
config['文件夹路径'] = folder_path
# 从文件路径列读取文件路径
file_path_str = config.get('文件路径', '').strip()
if file_path_str:
# 文件路径可能用分号分隔(多个文件)
file_paths = [p.strip() for p in file_path_str.split(';') if p.strip()]
files = []
for fp in file_paths:
if os.path.exists(fp):
path_obj = Path(fp)
files.append({
"url": config.get('达人链接', ''),
"user_id": config.get('多多id', ''),
"time_start": config.get('定时发布', '') if config.get('定时发布') else None,
"ht": config.get('话题', ''),
"index": config.get('序号', ''),
"path": path_obj
})
if files:
all_configs_with_files.append({
'config': config,
'files': files,
'row_idx': row_idx
})
self.log_text.append(f"{row_idx + 1} 行:从文件路径列读取到 {len(files)} 个文件")
else:
# 如果文件路径列为空,尝试查找文件
self.log_text.append(f"{row_idx + 1} 行:文件路径列为空,尝试查找文件...")
found_files = self._find_files_for_config(config, folder_path)
if found_files:
all_configs_with_files.append({
'config': config,
'files': found_files,
'row_idx': row_idx
})
self.log_text.append(f"{row_idx + 1} 行:找到 {len(found_files)} 个文件")
if not all_configs_with_files:
QMessageBox.warning(self, "警告", "未找到任何文件,请先点击'更新数据'按钮")
return
total_tasks = sum(len(item['files']) for item in all_configs_with_files)
pending_tasks = total_tasks
self.set_status_cards(pending=pending_tasks)
self.set_running_progress(0, total_tasks)
# 按多多ID分组
from collections import defaultdict
grouped_by_user_id = defaultdict(list)
for item in all_configs_with_files:
user_id = item['config'].get('多多id', '')
grouped_by_user_id[user_id].append(item)
self.log_text.append("=" * 50)
self.log_text.append(f"按多多ID分组{len(grouped_by_user_id)} 个不同的多多ID")
# 禁用按钮
self.execute_btn.setEnabled(False)
self.progress_bar.setVisible(True)
self.progress_bar.setValue(0)
self.set_status_cards(running=1)
# 处理每个多多ID组
total_processed = 0
for user_id, items in grouped_by_user_id.items():
self.log_text.append(f"\n处理多多ID: {user_id},共 {len(items)} 个配置")
# 收集该多多ID下的所有文件
all_files = []
for item in items:
all_files.extend(item['files'])
# 分离视频文件和图片文件夹
video_files = [f for f in all_files if f['path'].is_file() and any(f['path'].suffix.lower() == ext for ext in video_extensions)]
image_folders = [f for f in all_files if f['path'].is_dir()]
self.log_text.append(f" 视频文件: {len(video_files)}")
self.log_text.append(f" 图片文件夹: {len(image_folders)}")
# 使用第一个配置创建Pdd实例因为同一个多多ID配置应该相同
first_config = items[0]['config']
pdd = Pdd(
url=first_config.get('达人链接', ''),
user_id=user_id,
time_start=first_config.get('定时发布', '') if first_config.get('定时发布') else None,
ht=first_config.get('话题', ''),
index=first_config.get('序号', ''),
title=first_config.get('标题', None)
)
# 第一步:如果有多个视频文件(>1批量上传所有视频
if len(video_files) > 1:
# 检查是否都是mp4文件
all_mp4 = all(f['path'].suffix.lower() == '.mp4' for f in video_files)
if all_mp4:
self.log_text.append(f" ✓ 检测到 {len(video_files)} 个mp4文件批量上传所有视频action1方法")
else:
self.log_text.append(f" ✓ 检测到 {len(video_files)} 个视频文件批量上传所有视频action1方法")
# 批量上传所有视频
self.worker_thread = WorkerThread(first_config, True, video_files, self)
# 不连接finished信号避免每个任务完成就弹窗
self.worker_thread.log_message.connect(self.log_text.append)
self.worker_thread.progress.connect(self.progress_bar.setValue)
self.worker_thread.start()
# 等待完成
self.worker_thread.wait()
total_processed += len(video_files)
pending_tasks = max(pending_tasks - len(video_files), 0)
self.set_status_cards(pending=pending_tasks)
self.set_running_progress(total_processed, total_tasks)
self.log_text.append(f" ✓ 批量上传 {len(video_files)} 个视频完成")
elif len(video_files) == 1:
# 只有1个视频单个上传
self.log_text.append(f" → 只有1个视频文件单个上传")
self.worker_thread = WorkerThread(first_config, False, video_files, self)
# 不连接finished信号避免每个任务完成就弹窗
self.worker_thread.log_message.connect(self.log_text.append)
self.worker_thread.progress.connect(self.progress_bar.setValue)
self.worker_thread.start()
self.worker_thread.wait()
total_processed += len(video_files)
pending_tasks = max(pending_tasks - len(video_files), 0)
self.set_status_cards(pending=pending_tasks)
self.set_running_progress(total_processed, total_tasks)
self.log_text.append(f" ✓ 单个视频上传完成")
# 第二步:如果有图片文件夹,逐个上传图片
if image_folders:
self.log_text.append(f" → 开始逐个上传 {len(image_folders)} 个图片文件夹")
for idx, img_folder in enumerate(image_folders, 1):
# 找到该图片文件夹对应的配置(通过序号匹配)
folder_index = img_folder.get('index', '')
matching_config = None
for item in items:
if item['config'].get('序号', '') == folder_index:
matching_config = item['config']
break
# 如果找不到匹配的配置,使用第一个配置
if not matching_config:
matching_config = first_config
self.log_text.append(f" 上传第 {idx}/{len(image_folders)} 个图片文件夹(序号: {folder_index}")
pdd_img = Pdd(
url=matching_config.get('达人链接', ''),
user_id=user_id,
time_start=matching_config.get('定时发布', '') if matching_config.get('定时发布') else None,
ht=matching_config.get('话题', ''),
index=folder_index,
title=matching_config.get('标题', None)
)
self.worker_thread = WorkerThread(matching_config, False, [img_folder], self)
# 不连接finished信号避免每个任务完成就弹窗
self.worker_thread.log_message.connect(self.log_text.append)
self.worker_thread.progress.connect(self.progress_bar.setValue)
self.worker_thread.start()
self.worker_thread.wait()
total_processed += 1
pending_tasks = max(pending_tasks - 1, 0)
self.set_status_cards(pending=pending_tasks)
self.set_running_progress(total_processed, total_tasks)
self.log_text.append(f" ✓ 图片文件夹 {idx} 上传完成")
self.log_text.append(f" ✓ 所有图片文件夹上传完成")
self.progress_bar.setValue(100)
self.log_text.append("=" * 50)
self.log_text.append(f"所有任务执行完成!共处理 {total_processed} 个文件/文件夹")
self.execute_btn.setEnabled(True)
self.set_running_progress(0, 0)
self.set_status_cards(pending=0)
# 所有任务完成后,显示完成弹窗
QMessageBox.information(self, "任务完成", f"所有任务执行完成!\n共处理 {total_processed} 个文件/文件夹")
def count_videos_in_folder(self, folder_path, index):
"""统计文件夹中匹配序号的视频文件数量与main.py逻辑一致"""
count = 0
try:
# 遍历最外层文件夹下的所有子文件夹与main.py逻辑一致
for file in os.listdir(folder_path): # 获取文件夹下所有的文件夹
file_path = os.path.join(folder_path, file) # 拼接文件夹
# 检查是否为目录,跳过文件(如.lnk快捷方式
if not os.path.isdir(file_path):
continue
files = os.listdir(file_path) # 获取用户id下的文件
for file_name in files:
# 检查是否是视频文件(.mp4
if ".mp4" in file_name:
# 用"-"分割文件名,检查第一部分是否等于序号
# 文件名格式4-茶叶蛋大冒险-.mp4 -> ['4', '茶叶蛋大冒险', '', 'mp4']
file_names = file_name.split("-")
if len(file_names) > 0 and file_names[0] == str(index):
path = Path(os.path.join(file_path, file_name))
# 判断是否为文件
if path.is_file():
count += 1
else:
# 如果是文件夹,统计其中的文件
for sub_file in os.listdir(path):
sub_path = Path(os.path.join(path, sub_file))
if sub_path.is_file():
count += 1
except Exception as e:
logger.error(f"统计视频文件失败: {e}")
return count
def on_task_finished(self, success, message):
"""任务完成回调"""
self.progress_bar.setValue(100)
self.set_running_progress(0, 0)
self.set_status_cards(pending=0)
if success:
QMessageBox.information(self, "成功", message)
else:
QMessageBox.critical(self, "失败", message)
# 恢复按钮
self.execute_btn.setEnabled(True)
self.log_text.append(f"任务完成: {message}")
self.log_text.append("=" * 50)
def closeEvent(self, event):
"""关闭事件"""
if self.worker_thread and self.worker_thread.isRunning():
reply = QMessageBox.question(
self, "确认", "任务正在执行中,确定要退出吗?",
QMessageBox.Yes | QMessageBox.No
)
if reply == QMessageBox.Yes:
if self.worker_thread:
self.worker_thread.stop()
self.worker_thread.wait()
event.accept()
else:
event.ignore()
else:
event.accept()
def main():
app = QApplication(sys.argv)
window = MainWindow()
window.show()
sys.exit(app.exec_())
if __name__ == '__main__':
main()
# docker run honeygain/honeygain -tou-accept -email ddrwode1@gmail.com -pass 040828cjj -device DEVICE_NAME