import os import sys import json import time import re from pathlib import Path from datetime import datetime from typing import List, Dict, Optional import pandas as pd from PyQt5.QtWidgets import ( QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QFileDialog, QTableWidgetItem, QMessageBox, QDateTimeEdit, QGridLayout, QStackedWidget, QButtonGroup, QStyle, QComboBox, QFrame, QShortcut, QMenu, QAbstractItemView, QTableView, QStyledItemDelegate, QStyleOptionProgressBar, QStyleOptionButton, QHeaderView, QTabWidget, QSplitter, QSizePolicy ) from PyQt5.QtCore import Qt, QThread, pyqtSignal, QDateTime, QSize, QAbstractTableModel, QModelIndex, \ QSortFilterProxyModel, QRegularExpression, QSettings from PyQt5.QtGui import QFont, QTextDocument, QTextCursor, QKeySequence, QColor, QPainter from qfluentwidgets import ( setTheme, Theme, PushButton, PrimaryPushButton, LineEdit, TextEdit, TableWidget, CheckBox, ProgressBar, CardWidget, InfoBar, InfoBarPosition ) from main import Pdd from loguru import logger def get_default_folder_path(): """获取默认文件夹路径(桌面/多多自动化发文)""" desktop = os.path.join(os.path.expanduser("~"), "Desktop") default_path = os.path.join(desktop, "多多自动化发文") return default_path class WorkerThread(QThread): """工作线程,用于执行自动化任务""" finished = pyqtSignal(bool, str) log_message = pyqtSignal(str) progress = pyqtSignal(int) def __init__(self, config_data, is_batch_mode, prepared_files=None, parent=None): super().__init__(parent) self.config_data = config_data self.is_batch_mode = is_batch_mode self.prepared_files = prepared_files # 预查找的文件列表 self.is_running = True def run(self): try: # 配置日志输出到GUI logger.remove() logger.add(lambda msg: self.log_message.emit(str(msg).strip())) if self.is_batch_mode: self.run_batch_mode() else: self.run_single_mode() except Exception as e: error_msg = f"执行失败: {str(e)}" self.finished.emit(False, error_msg) self.log_message.emit(error_msg) logger.error(f"执行失败: {e}") import traceback traceback_str = traceback.format_exc() logger.error(traceback_str) self.log_message.emit(f"错误详情: {traceback_str}") def run_single_mode(self): """执行单个上传模式(action方法)""" try: config = self.config_data self.log_message.emit(f"开始处理单个任务(逐个上传模式)...") pdd = Pdd( url=config.get('达人链接', ''), user_id=config.get('多多id', ''), time_start=config.get('定时发布', '') if config.get('定时发布') else None, ht=config.get('话题', ''), index=config.get('序号', ''), title=config.get('标题', None) ) # 如果已经预查找了文件,直接使用 if self.prepared_files: self.log_message.emit(f"使用预查找的文件列表,共 {len(self.prepared_files)} 个文件") self.progress.emit(30) # 判断是否为视频文件 video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm'] video_files = [f for f in self.prepared_files if f['path'].is_file() and any(f['path'].suffix.lower() == ext for ext in video_extensions)] # 如果勾选了批量上传且是视频,调用action1方法 if self.is_batch_mode and video_files: # 批量上传视频,调用action1方法 self.log_message.emit(f"批量上传模式:找到 {len(video_files)} 个视频文件,调用action1方法") try: pdd.action1(folder_path=video_files) self.progress.emit(100) self.finished.emit(True, f"批量上传完成,共处理 {len(video_files)} 个视频") except Exception as e: error_msg = f"执行action1方法失败: {str(e)}" self.log_message.emit(error_msg) logger.error(error_msg) import traceback traceback_str = traceback.format_exc() logger.error(traceback_str) self.log_message.emit(f"错误详情: {traceback_str}") self.finished.emit(False, error_msg) return else: # 单个上传模式,使用action方法 # 从预查找的文件中提取文件夹路径(取第一个文件的父目录的父目录,回到最外层文件夹) if self.prepared_files: first_file = self.prepared_files[0] if first_file['path'].is_file(): # 文件路径:最外层文件夹/多多ID文件夹/文件名 # 需要回到最外层文件夹 folder_path = str(first_file['path'].parent.parent) else: # 文件夹路径:最外层文件夹/多多ID文件夹/文件夹名 folder_path = str(first_file['path'].parent.parent) self.log_message.emit(f"使用文件夹路径: {folder_path}") try: pdd.action(folder_path=folder_path, collect_all_videos=False) self.progress.emit(100) self.finished.emit(True, "单个任务执行完成") except Exception as e: error_msg = f"执行action方法失败: {str(e)}" self.log_message.emit(error_msg) logger.error(error_msg) import traceback traceback_str = traceback.format_exc() logger.error(traceback_str) self.log_message.emit(f"错误详情: {traceback_str}") self.finished.emit(False, error_msg) return else: # 未预查找文件,使用原来的逻辑 folder_path = config.get('文件夹路径', '').strip() if not folder_path: folder_path = get_default_folder_path() if not os.path.exists(folder_path): error_msg = f"文件夹路径不存在: {folder_path}" self.finished.emit(False, error_msg) self.log_message.emit(error_msg) return self.log_message.emit(f"使用文件夹路径: {folder_path}") self.progress.emit(30) # 检查是否勾选了批量上传 is_batch_mode = self.is_batch_mode # 调用action方法 try: pdd.action(folder_path=folder_path, collect_all_videos=is_batch_mode) except Exception as e: error_msg = f"执行action方法失败: {str(e)}" self.log_message.emit(error_msg) logger.error(error_msg) import traceback traceback_str = traceback.format_exc() logger.error(traceback_str) self.log_message.emit(f"错误详情: {traceback_str}") self.finished.emit(False, error_msg) return self.progress.emit(100) self.finished.emit(True, "单个任务执行完成") except Exception as e: error_msg = f"执行失败: {str(e)}" self.finished.emit(False, error_msg) self.log_message.emit(error_msg) logger.error(f"执行失败: {e}") import traceback traceback_str = traceback.format_exc() logger.error(traceback_str) self.log_message.emit(f"错误详情: {traceback_str}") def run_batch_mode(self): """执行批量上传模式:先批量上传所有视频,然后逐个上传图片""" try: config = self.config_data self.log_message.emit(f"开始处理批量上传任务...") index = config.get('序号', '') # 创建Pdd实例 pdd = Pdd( url=config.get('达人链接', ''), user_id=config.get('多多id', ''), time_start=config.get('定时发布', '') if config.get('定时发布') else None, ht=config.get('话题', ''), index=index, title=config.get('标题', None) ) # 如果已经预查找了文件,直接使用 if self.prepared_files: self.log_message.emit(f"使用预查找的文件列表,共 {len(self.prepared_files)} 个文件") self.progress.emit(30) # 分离视频文件和图片文件夹 video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm'] video_file_paths = [f for f in self.prepared_files if f['path'].is_file() and any( f['path'].suffix.lower() == ext for ext in video_extensions)] image_folders = [f for f in self.prepared_files if f['path'].is_dir()] # 如果有视频,批量上传所有视频 if video_file_paths: self.log_message.emit(f"找到 {len(video_file_paths)} 个视频文件,开始批量上传...") self.progress.emit(30) pdd.action1(folder_path=video_file_paths) self.log_message.emit(f"批量上传视频完成,共处理 {len(video_file_paths)} 个视频") else: self.log_message.emit("未找到视频文件,跳过视频上传") # 如果有图片文件夹,逐个上传图片 if image_folders: self.log_message.emit(f"找到 {len(image_folders)} 个图片文件夹,开始逐个上传...") self.progress.emit(70) for idx, image_folder_info in enumerate(image_folders): if not self.is_running: break image_folder = str(image_folder_info['path']) self.log_message.emit(f"处理第 {idx + 1}/{len(image_folders)} 个图片文件夹: {image_folder}") # 使用action方法处理图片文件夹 pdd.action(folder_path=image_folder) self.log_message.emit(f"图片文件夹 {idx + 1} 上传完成") self.log_message.emit(f"所有图片文件夹上传完成,共处理 {len(image_folders)} 个文件夹") else: self.log_message.emit("未找到图片文件夹,跳过图片上传") self.progress.emit(100) total_count = len(video_file_paths) + len(image_folders) self.finished.emit(True, f"批量任务执行完成,共处理 {len(video_file_paths)} 个视频和 {len(image_folders)} 个图片文件夹") else: # 未预查找文件,使用原来的逻辑 folder_path = config.get('文件夹路径', '').strip() if not folder_path: folder_path = get_default_folder_path() if not os.path.exists(folder_path): self.finished.emit(False, f"文件夹路径不存在: {folder_path}") return # 第一步:收集所有视频文件 self.log_message.emit("第一步:收集所有视频文件...") video_file_paths = self.prepare_batch_files(folder_path, config) # 第二步:收集所有图片文件夹 self.log_message.emit("第二步:收集所有图片文件夹...") image_folders = self.prepare_image_folders(folder_path, index) # 第三步:如果有视频,批量上传所有视频 if video_file_paths: self.log_message.emit(f"找到 {len(video_file_paths)} 个视频文件,开始批量上传...") self.progress.emit(30) pdd.action1(folder_path=video_file_paths) self.log_message.emit(f"批量上传视频完成,共处理 {len(video_file_paths)} 个视频") else: self.log_message.emit("未找到视频文件,跳过视频上传") # 第四步:如果有图片文件夹,逐个上传图片 if image_folders: self.log_message.emit(f"找到 {len(image_folders)} 个图片文件夹,开始逐个上传...") self.progress.emit(70) for idx, image_folder in enumerate(image_folders): if not self.is_running: break self.log_message.emit(f"处理第 {idx + 1}/{len(image_folders)} 个图片文件夹: {image_folder}") # 使用action方法处理图片文件夹 pdd.action(folder_path=image_folder) self.log_message.emit(f"图片文件夹 {idx + 1} 上传完成") self.log_message.emit(f"所有图片文件夹上传完成,共处理 {len(image_folders)} 个文件夹") else: self.log_message.emit("未找到图片文件夹,跳过图片上传") self.progress.emit(100) total_count = len(video_file_paths) + len(image_folders) self.finished.emit(True, f"批量任务执行完成,共处理 {len(video_file_paths)} 个视频和 {len(image_folders)} 个图片文件夹") except Exception as e: self.finished.emit(False, f"批量执行失败: {str(e)}") logger.error(f"批量执行失败: {e}") def prepare_image_folders(self, folder_path, index): """收集所有匹配序号的图片文件夹(与main.py中action方法的逻辑一致)""" image_folders = [] try: # 参数验证 if not folder_path or not os.path.exists(folder_path): logger.warning(f"文件夹路径无效或不存在: {folder_path}") return image_folders if not os.access(folder_path, os.R_OK): logger.error(f"没有权限读取文件夹: {folder_path}") return image_folders # 遍历最外层文件夹下的所有子文件夹(与main.py逻辑一致) try: subdirs = os.listdir(folder_path) except PermissionError: logger.error(f"没有权限访问文件夹: {folder_path}") return image_folders except Exception as e: logger.error(f"读取文件夹失败: {e}") return image_folders for file in subdirs: try: file_path = os.path.join(folder_path, file) # 拼接文件夹 # 检查是否为目录,跳过文件(如.lnk快捷方式) if not os.path.isdir(file_path): continue # 获取用户id下的文件 try: files = os.listdir(file_path) except PermissionError: logger.warning(f"没有权限访问子文件夹: {file_path}") continue except Exception as e: logger.warning(f"读取子文件夹失败: {file_path}, 错误: {e}") continue for file_name in files: try: # 跳过隐藏文件 if file_name.startswith('.'): continue # 用"-"分割文件名,检查第一部分是否等于序号 file_names = file_name.split("-") if len(file_names) > 0 and file_names[0] == str(index): path = Path(os.path.join(file_path, file_name)) # 确保路径存在 if path.exists() and path.is_dir(): # 这是一个图片文件夹 image_folders.append(str(path)) except Exception as e: logger.warning(f"处理文件 {file_name} 时出错: {e}") continue except Exception as e: logger.warning(f"处理子文件夹 {file} 时出错: {e}") continue except Exception as e: logger.error(f"收集图片文件夹失败: {e}", exc_info=True) return image_folders def count_videos_in_folder(self, folder_path, index): """统计文件夹中匹配序号的视频文件数量(与main.py逻辑一致)""" count = 0 try: # 遍历最外层文件夹下的所有子文件夹(与main.py逻辑一致) for file in os.listdir(folder_path): # 获取文件夹下所有的文件夹 file_path = os.path.join(folder_path, file) # 拼接文件夹 # 检查是否为目录,跳过文件(如.lnk快捷方式) if not os.path.isdir(file_path): continue files = os.listdir(file_path) # 获取用户id下的文件 for file_name in files: # 检查是否是视频文件(.mp4) if ".mp4" in file_name: # 用"-"分割文件名,检查第一部分是否等于序号 # 文件名格式:4-茶叶蛋大冒险-.mp4 -> ['4', '茶叶蛋大冒险', '', 'mp4'] file_names = file_name.split("-") if len(file_names) > 0 and file_names[0] == str(index): path = Path(os.path.join(file_path, file_name)) # 判断是否为文件 if path.is_file(): count += 1 else: # 如果是文件夹,统计其中的文件 for sub_file in os.listdir(path): sub_path = Path(os.path.join(path, sub_file)) if sub_path.is_file(): count += 1 except Exception as e: logger.error(f"统计视频文件失败: {e}") return count def prepare_batch_files(self, folder_path, config): """准备批量上传的文件列表(与main.py中的action1方法逻辑一致)""" file_paths = [] # 参数验证 if not config: logger.error("配置为空,无法准备批量上传文件") return file_paths index = str(config.get('序号', '')).strip() if not index: logger.warning("序号为空,无法准备批量上传文件") return file_paths if not folder_path or not os.path.exists(folder_path): logger.error(f"文件夹路径无效或不存在: {folder_path}") return file_paths if not os.access(folder_path, os.R_OK): logger.error(f"没有权限读取文件夹: {folder_path}") return file_paths logger.info("=" * 50) logger.info("开始准备批量上传文件列表...") logger.info(f"文件夹路径: {folder_path}") logger.info(f"查找序号: {index}") try: # 遍历最外层文件夹下的所有子文件夹(与main.py逻辑一致) try: all_items = os.listdir(folder_path) subdirs = [f for f in all_items if os.path.isdir(os.path.join(folder_path, f))] except PermissionError: logger.error(f"没有权限访问文件夹: {folder_path}") return file_paths except Exception as e: logger.error(f"读取文件夹失败: {e}") return file_paths logger.info(f"在最外层文件夹下找到 {len(subdirs)} 个子文件夹(多多ID文件夹)") for subdir_name in subdirs: file_path = os.path.join(folder_path, subdir_name) # 拼接文件夹 logger.info(f" 正在扫描子文件夹: {subdir_name}") # 检查是否为目录,跳过文件(如.lnk快捷方式) if not os.path.isdir(file_path): logger.info(f" 跳过(不是文件夹)") continue files = os.listdir(file_path) # 获取用户id下的文件 logger.info(f" 该文件夹下有 {len(files)} 个项目") for file_name in files: logger.info(f" 检查项目: {file_name}") # 检查是否是视频文件(.mp4) if ".mp4" in file_name: logger.info(f" ✓ 是视频文件(包含.mp4)") # 用"-"分割文件名,检查第一部分是否等于序号 # 文件名格式:4-茶叶蛋大冒险-.mp4 -> ['4', '茶叶蛋大冒险', '', 'mp4'] file_names = file_name.split("-") logger.info(f" 文件名分割结果: {file_names}") if len(file_names) > 0 and file_names[0] == str(index): logger.info(f" ✓ 序号匹配!序号: {file_names[0]}, 目标序号: {index}") path = Path(os.path.join(file_path, file_name)) # 判断是否为文件 if path.is_file(): logger.info(f" ✓ 是文件,添加到列表: {path.name}") file_paths.append({ "url": config.get('达人链接', ''), "user_id": config.get('多多id', ''), "time_start": config.get('定时发布', '') if config.get('定时发布') else None, "ht": config.get('话题', ''), "index": str(index), "path": path }) logger.info(f" 当前视频总数: {len(file_paths)}") # 注意:这里不break,因为可能有多个匹配的文件 else: logger.info(f" 是文件夹,扫描其中的文件...") # 如果是文件夹,添加其中的所有文件 try: sub_files = os.listdir(path) logger.info(f" 文件夹中有 {len(sub_files)} 个文件") for sub_file in sub_files: sub_path = Path(os.path.join(path, sub_file)) if sub_path.is_file(): logger.info(f" ✓ 添加文件: {sub_file}") file_paths.append({ "url": config.get('达人链接', ''), "user_id": config.get('多多id', ''), "time_start": config.get('定时发布', '') if config.get( '定时发布') else None, "ht": config.get('话题', ''), "index": str(index), "path": sub_path }) logger.info(f" 当前视频总数: {len(file_paths)}") except Exception as e: logger.error(f" 扫描子文件夹失败: {e}") else: if len(file_names) > 0: logger.info(f" ✗ 序号不匹配: {file_names[0]} != {index}") else: logger.info(f" ✗ 文件名格式不正确,无法提取序号") else: logger.info(f" ✗ 不是视频文件(不包含.mp4),跳过") except Exception as e: logger.error(f"准备批量文件失败: {e}") import traceback traceback.print_exc() logger.info("=" * 50) logger.info(f"文件收集完成!共找到 {len(file_paths)} 个视频文件") if file_paths: logger.info("视频文件列表:") for idx, video_info in enumerate(file_paths, 1): logger.info(f" {idx}. {video_info['path'].name} ({video_info['path']})") return file_paths def stop(self): """停止执行""" self.is_running = False def requestInterruption(self): """请求中断线程(重写QThread方法)""" super().requestInterruption() self.is_running = False class ConfigTableModel(QAbstractTableModel): """大数据量表格模型""" def __init__(self, configs, headers, parent=None): super().__init__(parent) self._configs = configs self._headers = headers def rowCount(self, parent=QModelIndex()): return len(self._configs) def columnCount(self, parent=QModelIndex()): return len(self._headers) def data(self, index, role=Qt.DisplayRole): if not index.isValid() or role != Qt.DisplayRole: return None row = index.row() col = index.column() config = self._configs[row] mapping = { 0: "多多id", 1: "序号", 2: "话题", 3: "定时发布", 4: "间隔时间", 5: "达人链接", 6: "执行人", 7: "情况", 8: "文件路径", 9: "", 10: "", } key = mapping.get(col, "") return str(config.get(key, "")) if key else "" def headerData(self, section, orientation, role=Qt.DisplayRole): if role != Qt.DisplayRole: return None if orientation == Qt.Horizontal: return self._headers[section] return section + 1 def flags(self, index): if not index.isValid(): return Qt.NoItemFlags return Qt.ItemIsEnabled | Qt.ItemIsSelectable | Qt.ItemIsEditable def setData(self, index, value, role=Qt.EditRole): if role != Qt.EditRole or not index.isValid(): return False row = index.row() col = index.column() mapping = { 0: "多多id", 1: "序号", 2: "话题", 3: "定时发布", 4: "间隔时间", 5: "达人链接", 6: "执行人", 7: "情况", 8: "文件路径", } key = mapping.get(col, "") if not key: return False self._configs[row][key] = str(value) self.dataChanged.emit(index, index, [Qt.DisplayRole]) return True def removeRows(self, row, count, parent=QModelIndex()): if row < 0 or row + count > len(self._configs): return False self.beginRemoveRows(parent, row, row + count - 1) del self._configs[row:row + count] self.endRemoveRows() return True class TableActionDelegate(QStyledItemDelegate): """Model/View 操作列 + 进度列 delegate""" def __init__(self, parent, on_edit, on_delete): super().__init__(parent) self.on_edit = on_edit self.on_delete = on_delete def paint(self, painter, option, index): if index.column() == 9: status_text = index.sibling(index.row(), 7).data() or "" value = 0 if "完成" in status_text or "成功" in status_text: value = 100 elif "执行中" in status_text or "进行" in status_text: value = 60 elif "待" in status_text: value = 10 bar = QStyleOptionProgressBar() bar.rect = option.rect.adjusted(6, option.rect.height() // 3, -6, -option.rect.height() // 3) bar.minimum = 0 bar.maximum = 100 bar.progress = value bar.textVisible = False QApplication.style().drawControl(QStyle.CE_ProgressBar, bar, painter) return if index.column() == 10: rect = option.rect btn_w = (rect.width() - 12) // 2 edit_rect = rect.adjusted(6, 4, -rect.width() + btn_w + 6, -4) delete_rect = rect.adjusted(6 + btn_w + 6, 4, -6, -4) edit_btn = QStyleOptionButton() edit_btn.rect = edit_rect edit_btn.text = "编辑" delete_btn = QStyleOptionButton() delete_btn.rect = delete_rect delete_btn.text = "删除" QApplication.style().drawControl(QStyle.CE_PushButton, edit_btn, painter) QApplication.style().drawControl(QStyle.CE_PushButton, delete_btn, painter) return super().paint(painter, option, index) def editorEvent(self, event, model, option, index): if index.column() != 10: return super().editorEvent(event, model, option, index) if event.type() == event.MouseButtonRelease: rect = option.rect btn_w = (rect.width() - 12) // 2 edit_rect = rect.adjusted(6, 4, -rect.width() + btn_w + 6, -4) delete_rect = rect.adjusted(6 + btn_w + 6, 4, -6, -4) if edit_rect.contains(event.pos()): self.on_edit(index) return True if delete_rect.contains(event.pos()): self.on_delete(index) return True return False def update_data(self, configs): self.beginResetModel() self._configs = configs self.endResetModel() class MainWindow(QMainWindow): """主窗口""" def __init__(self): super().__init__() self.worker_thread = None self.configs = [] # 存储从Excel导入的配置 self.prepared_files = None # 存储通过"更新数据"找到的文件列表 self.running_total = 0 self.running_done = 0 self.nav_compact = False self.table_match_rows = [] self.table_match_index = -1 self.table_sort_keys = [] self.page_size = 20 self.current_page = 1 self.page_row_indices = [] self.use_model_view = False self.table_model = None self.table_proxy = None self.log_match_positions = [] self.log_match_index = -1 self.log_match_items = [] self.is_updating_table = False self._is_closing = False # 标记是否正在关闭窗口 self.init_ui() def init_ui(self): self.setWindowTitle("拼多多自动化发布工具") self.setGeometry(100, 100, 1000, 800) # 创建中央部件 central_widget = QWidget() self.setCentralWidget(central_widget) # 主布局 main_layout = QVBoxLayout() main_layout.setContentsMargins(20, 20, 20, 20) main_layout.setSpacing(14) central_widget.setLayout(main_layout) # 顶部标题区 header_layout = QHBoxLayout() title_box = QVBoxLayout() title_label = QLabel("拼多多自动化发布工具") title_label.setFont(QFont("Microsoft YaHei", 16, QFont.Bold)) subtitle_label = QLabel("配置导入 • 文件查找 • 批量上传") subtitle_label.setFont(QFont("Microsoft YaHei", 10)) title_box.addWidget(title_label) title_box.addWidget(subtitle_label) header_layout.addLayout(title_box) header_layout.addStretch() header_actions = QHBoxLayout() self.theme_toggle = CheckBox("深色模式") self.theme_toggle.stateChanged.connect(self.toggle_theme) header_actions.addWidget(self.theme_toggle) self.execute_btn = PrimaryPushButton("开始上传") self.execute_btn.clicked.connect(self.execute_task) header_actions.addWidget(self.execute_btn) header_layout.addLayout(header_actions) main_layout.addLayout(header_layout) # 状态卡片区 status_layout = QHBoxLayout() self.status_update_value = QLabel("未更新") self.status_pending_value = QLabel("0") self.status_running_value = QLabel("0") update_card = self._build_status_card( "更新状态", self.status_update_value, self.style().standardIcon(QStyle.SP_BrowserReload), "#e6f4ff", "文件路径扫描" ) pending_card = self._build_status_card( "待执行", self.status_pending_value, self.style().standardIcon(QStyle.SP_FileDialogInfoView), "#fff7ed", "等待处理任务" ) running_card = self._build_status_card( "执行中", self.status_running_value, self.style().standardIcon(QStyle.SP_MediaPlay), "#ecfdf3", "当前执行进度", with_progress=True ) status_layout.addWidget(update_card) status_layout.addWidget(pending_card) status_layout.addWidget(running_card) main_layout.addLayout(status_layout) # 中间内容区(侧边导航 + 页面) content_layout = QHBoxLayout() content_layout.setSpacing(12) main_layout.addLayout(content_layout) nav_card = CardWidget() self.nav_card = nav_card nav_layout = QVBoxLayout(nav_card) nav_layout.setContentsMargins(10, 10, 10, 10) nav_layout.setSpacing(8) nav_card.setFixedWidth(150) nav_card.setStyleSheet(""" QPushButton { text-align: left; padding: 8px 10px; border-radius: 6px; border-left: 3px solid transparent; } QPushButton:hover { background-color: rgba(0, 120, 212, 0.08); } QPushButton:checked { background-color: rgba(0, 120, 212, 0.15); font-weight: 600; border-left: 3px solid #0078D4; } """) self.nav_title = QLabel("导航") self.nav_title.setFont(QFont("Microsoft YaHei", 10, QFont.Bold)) nav_layout.addWidget(self.nav_title) self.nav_divider = QFrame() self.nav_divider.setFrameShape(QFrame.HLine) self.nav_divider.setStyleSheet("color: rgba(0, 0, 0, 0.12);") nav_layout.addWidget(self.nav_divider) self.nav_group = QButtonGroup(self) self.nav_group.setExclusive(True) self.nav_main = PushButton("工作台") self.nav_log = PushButton("日志") nav_items = [ (self.nav_main, QStyle.SP_DesktopIcon), (self.nav_log, QStyle.SP_FileDialogContentsView), ] self.nav_buttons = [] for idx, (btn, icon_type) in enumerate(nav_items): btn.setCheckable(True) # 去掉左侧图标,避免在某些环境下图标位置异常遮挡中文文字 # 如果后续需要图标,可以在这里重新启用并配合样式表单独调整 padding # btn.setIcon(self.style().standardIcon(icon_type)) # btn.setIconSize(QSize(16, 16)) self.nav_group.addButton(btn, idx) nav_layout.addWidget(btn) self.nav_buttons.append(btn) nav_layout.addStretch() nav_footer_row = QHBoxLayout() self.nav_footer = QLabel("v1.0") self.nav_footer.setStyleSheet("color: #999; font-size: 10px;") nav_footer_row.addWidget(self.nav_footer) nav_footer_row.addStretch() self.nav_toggle_btn = PushButton("收起") self.nav_toggle_btn.clicked.connect(self.toggle_nav_compact) nav_footer_row.addWidget(self.nav_toggle_btn) nav_layout.addLayout(nav_footer_row) content_layout.addWidget(nav_card) self.page_stack = QStackedWidget() content_layout.addWidget(self.page_stack) content_layout.setStretch(1, 1) # 配置输入区域 config_group = CardWidget() config_layout = QVBoxLayout(config_group) config_layout.setContentsMargins(12, 12, 12, 12) config_title = QLabel("配置信息") config_title.setFont(QFont("Microsoft YaHei", 11, QFont.Bold)) config_layout.addWidget(config_title) config_group.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Preferred) config_group.setMaximumHeight(420) # Excel导入(合并到配置) import_row = QHBoxLayout() import_row.addWidget(QLabel("Excel文件:")) self.excel_path_input = LineEdit() import_row.addWidget(self.excel_path_input) self.excel_browse_btn = PushButton("浏览") self.excel_browse_btn.clicked.connect(self.browse_excel) import_row.addWidget(self.excel_browse_btn) self.import_btn = PrimaryPushButton("导入") self.import_btn.clicked.connect(self.import_excel) import_row.addWidget(self.import_btn) config_layout.addLayout(import_row) grid = QGridLayout() grid.setHorizontalSpacing(12) grid.setVerticalSpacing(10) self.user_id_input = LineEdit() self.index_input = LineEdit() self.topic_input = LineEdit() self.schedule_datetime = QDateTimeEdit() self.schedule_datetime.setCalendarPopup(True) self.schedule_datetime.setDateTime(QDateTime.currentDateTime()) self.schedule_datetime.setDisplayFormat("yyyy-MM-dd HH:mm") self.url_input = LineEdit() self.executor_input = LineEdit() self.user_id_input.setPlaceholderText("请输入多多ID") self.index_input.setPlaceholderText("请输入序号") self.topic_input.setPlaceholderText("请输入话题") self.url_input.setPlaceholderText("请输入达人链接") self.executor_input.setPlaceholderText("请输入执行人") for input_field in [ self.user_id_input, self.index_input, self.topic_input, self.url_input, self.executor_input ]: input_field.setClearButtonEnabled(True) grid.addWidget(QLabel("多多ID:"), 0, 0) grid.addWidget(self.user_id_input, 0, 1) grid.addWidget(QLabel("序号:"), 0, 2) grid.addWidget(self.index_input, 0, 3) grid.addWidget(QLabel("话题:"), 1, 0) grid.addWidget(self.topic_input, 1, 1) grid.addWidget(QLabel("定时发布:"), 1, 2) grid.addWidget(self.schedule_datetime, 1, 3) grid.addWidget(QLabel("达人链接:"), 2, 0) grid.addWidget(self.url_input, 2, 1) grid.addWidget(QLabel("执行人:"), 2, 2) grid.addWidget(self.executor_input, 2, 3) # 文件夹路径(最外层文件夹) grid.addWidget(QLabel("资料文件夹路径:"), 3, 0) self.folder_path_input = LineEdit() default_path = get_default_folder_path() self.folder_path_input.setPlaceholderText(f"留空则使用默认路径: {default_path}") self.folder_path_input.setClearButtonEnabled(True) grid.addWidget(self.folder_path_input, 3, 1, 1, 2) self.folder_browse_btn = PushButton("浏览") self.folder_browse_btn.clicked.connect(self.browse_folder) grid.addWidget(self.folder_browse_btn, 3, 3) tip_label = QLabel("提示:只需填写最外层文件夹路径,程序会自动查找子文件夹中的文件") tip_label.setStyleSheet("color: #666; font-size: 10px;") grid.addWidget(tip_label, 4, 0, 1, 4) # 更新数据按钮 + 批量上传(同一行) update_row = QHBoxLayout() self.update_data_btn = PrimaryPushButton("更新数据") self.update_data_btn.clicked.connect(self.update_data) update_row.addWidget(self.update_data_btn) self.update_status_label = QLabel("未更新") self.update_status_label.setStyleSheet("color: #666; font-size: 10px;") update_row.addWidget(self.update_status_label) update_row.addStretch() self.batch_upload_checkbox = CheckBox("批量上传(如果文件夹中有多个视频,将使用批量上传模式)") self.batch_upload_checkbox.setChecked(False) update_row.addWidget(self.batch_upload_checkbox) update_row_widget = QWidget() update_row_widget.setLayout(update_row) grid.addWidget(update_row_widget, 6, 0, 1, 4) config_layout.addLayout(grid) config_layout.addStretch() config_page = QWidget() config_page_layout = QVBoxLayout(config_page) config_page_layout.setContentsMargins(0, 0, 0, 0) config_page_layout.setSpacing(12) config_page_layout.addWidget(config_group) # 配置列表表格(如果从Excel导入) self.table_group = CardWidget() table_layout = QVBoxLayout(self.table_group) table_layout.setContentsMargins(12, 12, 12, 12) table_title = QLabel("配置列表(从Excel导入后显示,可直接在表格中编辑)") table_title.setFont(QFont("Microsoft YaHei", 11, QFont.Bold)) table_layout.addWidget(table_title) search_row = QHBoxLayout() self.table_search_input = LineEdit() self.table_search_input.setPlaceholderText("搜索表格(支持空格多关键词)") self.table_search_input.setClearButtonEnabled(True) self.table_search_input.textChanged.connect(self.filter_table) search_row.addWidget(self.table_search_input) self.table_column_filter = QComboBox() self.table_column_filter.currentIndexChanged.connect(lambda: self.filter_table(self.table_search_input.text())) search_row.addWidget(self.table_column_filter) self.table_case_sensitive = CheckBox("区分大小写") self.table_case_sensitive.stateChanged.connect(lambda: self.filter_table(self.table_search_input.text())) search_row.addWidget(self.table_case_sensitive) self.table_regex = CheckBox("正则") self.table_regex.stateChanged.connect(lambda: self.filter_table(self.table_search_input.text())) search_row.addWidget(self.table_regex) self.table_any_term = CheckBox("任意词匹配") self.table_any_term.stateChanged.connect(lambda: self.filter_table(self.table_search_input.text())) search_row.addWidget(self.table_any_term) self.table_highlight = CheckBox("高亮匹配") self.table_highlight.setChecked(True) self.table_highlight.stateChanged.connect(lambda: self.filter_table(self.table_search_input.text())) search_row.addWidget(self.table_highlight) self.table_locate_btn = PushButton("定位") self.table_locate_btn.clicked.connect(self.locate_table) search_row.addWidget(self.table_locate_btn) self.table_only_match = CheckBox("仅显示匹配") self.table_only_match.setChecked(True) self.table_only_match.stateChanged.connect(lambda: self.filter_table(self.table_search_input.text())) search_row.addWidget(self.table_only_match) self.table_clear_btn = PushButton("清空筛选") self.table_clear_btn.clicked.connect(lambda: self.table_search_input.setText("")) search_row.addWidget(self.table_clear_btn) self.table_filter_status = QLabel("显示: 0/0") self.table_filter_status.setStyleSheet("color: #666; font-size: 10px;") search_row.addWidget(self.table_filter_status) self.table_match_selector = QComboBox() self.table_match_selector.setMinimumWidth(180) self.table_match_selector.currentIndexChanged.connect(self.jump_to_table_match) search_row.addWidget(self.table_match_selector) self.table_prev_match_btn = PushButton("上一条") self.table_prev_match_btn.clicked.connect(self.prev_table_match) search_row.addWidget(self.table_prev_match_btn) self.table_next_match_btn = PushButton("下一条") self.table_next_match_btn.clicked.connect(self.next_table_match) search_row.addWidget(self.table_next_match_btn) self.table_export_all_btn = PushButton("导出全部") self.table_export_all_btn.clicked.connect(self.export_all_rows) search_row.addWidget(self.table_export_all_btn) self.table_select_count = QLabel("已选: 0") self.table_select_count.setStyleSheet("color: #666; font-size: 10px;") search_row.addWidget(self.table_select_count) table_layout.addLayout(search_row) self.config_table = TableWidget() self.config_table.setStyleSheet(""" QTableView::item:selected { background-color: rgba(0, 120, 212, 0.18); } QTableView::item:hover { background-color: rgba(0, 120, 212, 0.08); } QHeaderView::section { background-color: #1f2937; color: #ffffff; padding: 8px; border: none; } QHeaderView::section:hover { background-color: #374151; } """) self.config_table.setColumnCount(11) self.config_table.setHorizontalHeaderLabels([ '多多ID', '序号', '话题', '定时发布', '间隔时间', '达人链接', '执行人', '情况', '文件路径', '进度', '操作' ]) self.table_column_filter.addItem("全部列") for col in range(9): header = self.config_table.horizontalHeaderItem(col) if header: self.table_column_filter.addItem(header.text()) header = self.config_table.horizontalHeader() header.setStretchLastSection(False) header.setSectionResizeMode(QHeaderView.Interactive) self.config_table.setAlternatingRowColors(True) self.config_table.setSortingEnabled(True) self.config_table.horizontalHeader().setSortIndicatorShown(True) self.config_table.verticalHeader().setVisible(False) self.config_table.verticalHeader().setDefaultSectionSize(42) # 设置表格可编辑 self.config_table.setEditTriggers(TableWidget.AllEditTriggers) self.config_table.setSelectionBehavior(QAbstractItemView.SelectRows) self.config_table.setSelectionMode(QAbstractItemView.ExtendedSelection) self.config_table.setContextMenuPolicy(Qt.CustomContextMenu) self.config_table.customContextMenuRequested.connect(self.show_table_context_menu) self.config_table.itemChanged.connect(self.on_table_item_changed) self.config_table.horizontalHeader().sectionClicked.connect(self.on_header_clicked) table_layout.addWidget(self.config_table) # 大数据模式表格(Model/View) self.table_view = QTableView() self.table_view.setAlternatingRowColors(True) self.table_view.setSortingEnabled(True) self.table_view.verticalHeader().setVisible(False) self.table_view.setSelectionBehavior(QAbstractItemView.SelectRows) self.table_view.setSelectionMode(QAbstractItemView.ExtendedSelection) self.table_view.setContextMenuPolicy(Qt.CustomContextMenu) self.table_view.customContextMenuRequested.connect(self.show_table_context_menu) self.table_view.setStyleSheet(""" QTableView::item:selected { background-color: rgba(0, 120, 212, 0.18); } QTableView::item:hover { background-color: rgba(0, 120, 212, 0.08); } QHeaderView::section { background-color: #1f2937; color: #ffffff; padding: 8px; border: none; } QHeaderView::section:hover { background-color: #374151; } """) self.table_view.setVisible(False) table_layout.addWidget(self.table_view) self.table_empty_label = QLabel("暂无数据,请先导入Excel配置") self.table_empty_label.setStyleSheet("color: #999; font-size: 12px;") self.table_empty_label.setAlignment(Qt.AlignCenter) self.table_empty_label.setVisible(True) table_layout.addWidget(self.table_empty_label) pagination_row = QHBoxLayout() self.table_select_all = CheckBox("全选当前页") self.table_select_all.stateChanged.connect(self.toggle_select_all_rows) pagination_row.addWidget(self.table_select_all) pagination_row.addStretch() self.page_size_combo = QComboBox() self.page_size_combo.addItems(["10", "20", "50", "100"]) self.page_size_combo.setCurrentText(str(self.page_size)) self.page_size_combo.currentTextChanged.connect(self.change_page_size) pagination_row.addWidget(QLabel("每页")) pagination_row.addWidget(self.page_size_combo) self.page_info_label = QLabel("第 1/1 页") pagination_row.addWidget(self.page_info_label) self.page_first_btn = PushButton("首页") self.page_prev_btn = PushButton("上一页") self.page_next_btn = PushButton("下一页") self.page_last_btn = PushButton("末页") self.page_first_btn.clicked.connect(self.go_first_page) self.page_prev_btn.clicked.connect(self.go_prev_page) self.page_next_btn.clicked.connect(self.go_next_page) self.page_last_btn.clicked.connect(self.go_last_page) pagination_row.addWidget(self.page_first_btn) pagination_row.addWidget(self.page_prev_btn) pagination_row.addWidget(self.page_next_btn) pagination_row.addWidget(self.page_last_btn) table_layout.addLayout(pagination_row) self.table_group.setVisible(True) self.table_group.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding) # 配置 + 列表 分割布局 self.config_splitter = QSplitter(Qt.Vertical) self.config_splitter.setChildrenCollapsible(True) self.config_splitter.addWidget(config_group) self.config_splitter.addWidget(self.table_group) self.config_splitter.setStretchFactor(0, 2) self.config_splitter.setStretchFactor(1, 3) self.config_splitter.splitterMoved.connect(self._save_splitter_sizes) config_page_layout.addWidget(self.config_splitter) self.page_stack.addWidget(config_page) # 工作台标签页 self.workbench_tabs = QTabWidget() self.workbench_tabs.addTab(config_page, "配置") workbench_page = QWidget() workbench_layout = QVBoxLayout(workbench_page) workbench_layout.setContentsMargins(0, 0, 0, 0) workbench_layout.addWidget(self.workbench_tabs) self.page_stack.addWidget(workbench_page) # 进度条(全局) self.progress_bar = ProgressBar() self.progress_bar.setVisible(False) main_layout.addWidget(self.progress_bar) # 日志显示区域 log_group = CardWidget() log_layout = QVBoxLayout(log_group) log_layout.setContentsMargins(12, 12, 12, 12) log_header = QHBoxLayout() log_title = QLabel("执行日志") log_title.setFont(QFont("Microsoft YaHei", 11, QFont.Bold)) log_header.addWidget(log_title) log_header.addStretch() self.log_search_input = LineEdit() self.log_search_input.setPlaceholderText("搜索日志") self.log_search_input.setClearButtonEnabled(True) self.log_search_input.textChanged.connect(self.filter_log) log_header.addWidget(self.log_search_input) self.log_highlight_check = CheckBox("高亮所有") self.log_highlight_check.setChecked(True) self.log_highlight_check.stateChanged.connect(lambda: self.filter_log(self.log_search_input.text())) log_header.addWidget(self.log_highlight_check) self.log_case_sensitive = CheckBox("区分大小写") self.log_case_sensitive.stateChanged.connect(lambda: self.filter_log(self.log_search_input.text())) log_header.addWidget(self.log_case_sensitive) self.log_whole_word = CheckBox("整词匹配") self.log_whole_word.stateChanged.connect(lambda: self.filter_log(self.log_search_input.text())) log_header.addWidget(self.log_whole_word) self.log_regex = CheckBox("正则") self.log_regex.stateChanged.connect(lambda: self.filter_log(self.log_search_input.text())) log_header.addWidget(self.log_regex) self.log_prev_btn = PushButton("上一个") self.log_prev_btn.clicked.connect(lambda: self.find_log(backward=True)) log_header.addWidget(self.log_prev_btn) self.log_next_btn = PushButton("下一个") self.log_next_btn.clicked.connect(lambda: self.find_log(backward=False)) log_header.addWidget(self.log_next_btn) self.log_match_status = QLabel("匹配: 0") self.log_match_status.setStyleSheet("color: #666; font-size: 10px;") log_header.addWidget(self.log_match_status) self.log_match_selector = QComboBox() self.log_match_selector.setMinimumWidth(160) self.log_match_selector.currentIndexChanged.connect(self.jump_to_log_match) log_header.addWidget(self.log_match_selector) self.log_export_btn = PushButton("导出日志") self.log_export_btn.clicked.connect(self.export_log) log_header.addWidget(self.log_export_btn) self.clear_log_btn = PushButton("清空日志") self.clear_log_btn.clicked.connect(self.clear_log) log_header.addWidget(self.clear_log_btn) log_layout.addLayout(log_header) self.log_text = TextEdit() self.log_text.setReadOnly(True) self.log_text.setFont(QFont("Consolas", 10)) log_layout.addWidget(self.log_text) log_page = QWidget() log_page_layout = QVBoxLayout(log_page) log_page_layout.setContentsMargins(0, 0, 0, 0) log_page_layout.addWidget(log_group) self.page_stack.addWidget(log_page) # 配置日志输出(保留控制台输出,GUI通过信号接收) logger.remove() logger.add(lambda msg: None) # 禁用默认输出,通过信号在GUI中显示 # 默认选中工作台 self.nav_main.setChecked(True) self.page_stack.setCurrentIndex(0) self.nav_group.buttonClicked[int].connect(self.switch_page) self._restore_splitter_sizes() # 快捷键 self.shortcut_log_next = QShortcut(QKeySequence("F3"), self) self.shortcut_log_next.activated.connect(lambda: self.find_log(backward=False)) self.shortcut_log_prev = QShortcut(QKeySequence("Shift+F3"), self) self.shortcut_log_prev.activated.connect(lambda: self.find_log(backward=True)) self.shortcut_table_next = QShortcut(QKeySequence("Ctrl+F3"), self) self.shortcut_table_next.activated.connect(self.next_table_match) self.shortcut_table_prev = QShortcut(QKeySequence("Ctrl+Shift+F3"), self) self.shortcut_table_prev.activated.connect(self.prev_table_match) def _build_status_card(self, title, value_label, icon, bg_color, subtitle, with_progress=False): """创建状态卡片""" card = CardWidget() layout = QVBoxLayout(card) layout.setContentsMargins(12, 10, 12, 10) card.setStyleSheet(f"background-color: {bg_color};") icon_label = QLabel() icon_label.setPixmap(icon.pixmap(16, 16)) title_row = QHBoxLayout() title_label = QLabel(title) title_label.setFont(QFont("Microsoft YaHei", 9)) title_row.addWidget(icon_label) title_row.addWidget(title_label) title_row.addStretch() subtitle_label = QLabel(subtitle) subtitle_label.setStyleSheet("color: #666; font-size: 10px;") value_label.setFont(QFont("Microsoft YaHei", 12, QFont.Bold)) layout.addLayout(title_row) layout.addWidget(value_label) layout.addWidget(subtitle_label) if with_progress: self.status_running_progress = ProgressBar() self.status_running_progress.setRange(0, 100) self.status_running_progress.setValue(0) self.status_running_progress.setTextVisible(False) self.status_running_progress.setFixedHeight(6) layout.addWidget(self.status_running_progress) return card def switch_page(self, page_index): """切换侧边导航页面""" self.page_stack.setCurrentIndex(page_index) def on_table_item_changed(self, item): """表格内容变更回调""" if self.is_updating_table: return if item.column() == 7: self._set_status_item(item.row(), item.text()) self._set_progress_item(item.row(), item.text()) self._sync_config_from_row(item.row()) def _set_status_item(self, row, text): """设置状态列图标与文本""" item = QTableWidgetItem(text) if "完成" in text or "成功" in text: item.setIcon(self.style().standardIcon(QStyle.SP_DialogApplyButton)) elif "失败" in text or "错误" in text: item.setIcon(self.style().standardIcon(QStyle.SP_MessageBoxCritical)) elif "执行中" in text or "进行" in text: item.setIcon(self.style().standardIcon(QStyle.SP_MediaPlay)) elif "待" in text: item.setIcon(self.style().standardIcon(QStyle.SP_MessageBoxInformation)) else: item.setIcon(self.style().standardIcon(QStyle.SP_FileDialogInfoView)) self.config_table.setItem(row, 7, item) def _set_progress_item(self, row, status_text): """设置进度列""" progress = ProgressBar() progress.setTextVisible(False) progress.setFixedHeight(8) progress.setRange(0, 100) value = 0 if "完成" in status_text or "成功" in status_text: value = 100 elif "执行中" in status_text or "进行" in status_text: value = 60 elif "待" in status_text: value = 10 progress.setValue(value) self.config_table.setCellWidget(row, 9, progress) def _set_action_buttons(self, row, config_index): """设置操作列按钮""" wrapper = QWidget() layout = QHBoxLayout(wrapper) layout.setContentsMargins(4, 0, 4, 0) edit_btn = PushButton("编辑") delete_btn = PushButton("删除") edit_btn.clicked.connect(lambda: self.config_table.setCurrentCell(row, 0)) delete_btn.clicked.connect(lambda: self.delete_row_by_index(config_index)) layout.addWidget(edit_btn) layout.addWidget(delete_btn) self.config_table.setCellWidget(row, 10, wrapper) def delete_row_by_index(self, row): """删除指定行""" if row < 0 or row >= len(self.configs): return self.configs.pop(row) self.update_table() def _sync_config_from_row(self, row): """同步表格行到configs""" if row < 0 or row >= self.config_table.rowCount(): return if not self.page_row_indices: return if row >= len(self.page_row_indices): return config_index = self.page_row_indices[row] if config_index >= len(self.configs): return def cell(col): item = self.config_table.item(row, col) return item.text().strip() if item else "" self.configs[config_index].update({ "多多id": cell(0), "序号": cell(1), "话题": cell(2), "定时发布": cell(3), "间隔时间": cell(4), "达人链接": cell(5), "执行人": cell(6), "情况": cell(7) or "待执行", "文件路径": cell(8), }) def _sync_configs_from_table(self): """从表格同步全部配置""" configs = [] for row in range(self.config_table.rowCount()): def cell(col): item = self.config_table.item(row, col) return item.text().strip() if item else "" configs.append({ "多多id": cell(0), "序号": cell(1), "话题": cell(2), "定时发布": cell(3), "间隔时间": cell(4), "达人链接": cell(5), "执行人": cell(6), "情况": cell(7) or "待执行", "文件路径": cell(8), }) self.configs = configs def update_table_selection_count(self): """更新已选行数量""" if not hasattr(self, "table_select_count"): return if self.use_model_view and self.table_view.selectionModel(): rows = set(idx.row() for idx in self.table_view.selectionModel().selectedRows()) self.table_select_count.setText(f"已选: {len(rows)}") return rows = set() for r in self.config_table.selectedRanges(): rows.update(range(r.topRow(), r.bottomRow() + 1)) self.table_select_count.setText(f"已选: {len(rows)}") def _save_splitter_sizes(self): """保存分割器尺寸""" if not hasattr(self, "config_splitter"): return settings = QSettings("haha", "gui_app") settings.setValue("config_splitter_sizes", self.config_splitter.sizes()) def _restore_splitter_sizes(self): """恢复分割器尺寸""" if not hasattr(self, "config_splitter"): return settings = QSettings("haha", "gui_app") sizes = settings.value("config_splitter_sizes") if sizes: self.config_splitter.setSizes([int(s) for s in sizes]) else: # 首次默认:配置区偏大 self.config_splitter.setSizes([450, 550]) def toggle_select_all_rows(self): """全选/取消全选当前页""" if self.table_select_all.isChecked(): self.config_table.selectAll() else: self.config_table.clearSelection() def change_page_size(self, value): """修改分页大小""" try: self.page_size = int(value) except ValueError: self.page_size = 20 self.current_page = 1 self.update_table() def go_first_page(self): """首页""" self.current_page = 1 self.update_table() def go_prev_page(self): """上一页""" if self.current_page > 1: self.current_page -= 1 self.update_table() def go_next_page(self): """下一页""" total_rows = len(self.configs) total_pages = max(1, (total_rows + self.page_size - 1) // self.page_size) if self.current_page < total_pages: self.current_page += 1 self.update_table() def go_last_page(self): """末页""" total_rows = len(self.configs) total_pages = max(1, (total_rows + self.page_size - 1) // self.page_size) self.current_page = total_pages self.update_table() def on_header_clicked(self, logical_index): """处理多列排序(Ctrl 多选)""" if logical_index >= 9: return modifiers = QApplication.keyboardModifiers() is_multi = modifiers & Qt.ControlModifier existing = next((i for i, (col, _) in enumerate(self.table_sort_keys) if col == logical_index), None) if existing is not None: col, order = self.table_sort_keys[existing] new_order = Qt.DescendingOrder if order == Qt.AscendingOrder else Qt.AscendingOrder self.table_sort_keys[existing] = (col, new_order) else: if not is_multi: self.table_sort_keys = [] self.table_sort_keys.append((logical_index, Qt.AscendingOrder)) self._sort_table_by_keys() def _sort_table_by_keys(self): """按多列排序表格""" if not self.table_sort_keys: return self._sync_configs_from_table() key_map = { 0: "多多id", 1: "序号", 2: "话题", 3: "定时发布", 4: "间隔时间", 5: "达人链接", 6: "执行人", 7: "情况", 8: "文件路径", } for col, order in reversed(self.table_sort_keys): field = key_map.get(col, "") self.configs.sort( key=lambda cfg: str(cfg.get(field, "")), reverse=(order == Qt.DescendingOrder) ) self.update_table() def show_table_context_menu(self, pos): """表格右键菜单""" menu = QMenu(self) copy_action = menu.addAction("复制选中行") export_csv_action = menu.addAction("导出选中行CSV") export_excel_action = menu.addAction("导出选中行Excel") delete_action = menu.addAction("删除选中行") view = self.table_view if self.use_model_view else self.config_table action = menu.exec_(view.viewport().mapToGlobal(pos)) if action == copy_action: self.copy_selected_rows() elif action == delete_action: self.delete_selected_rows() elif action == export_csv_action: self.export_selected_rows("csv") elif action == export_excel_action: self.export_selected_rows("excel") def copy_selected_rows(self): """复制选中行到剪贴板""" if self.use_model_view: if not self.table_view.selectionModel(): return rows = [idx.row() for idx in self.table_view.selectionModel().selectedRows()] if not rows: return lines = [] for row in sorted(set(rows)): values = [] for col in range(9): idx = self.table_proxy.index(row, col) values.append(str(self.table_proxy.data(idx))) lines.append("\t".join(values)) QApplication.clipboard().setText("\n".join(lines)) return ranges = self.config_table.selectedRanges() if not ranges: return rows = set() for r in ranges: rows.update(range(r.topRow(), r.bottomRow() + 1)) lines = [] for row in sorted(rows): values = [] for col in range(self.config_table.columnCount()): item = self.config_table.item(row, col) values.append(item.text() if item else "") lines.append("\t".join(values)) QApplication.clipboard().setText("\n".join(lines)) def delete_selected_rows(self): """删除选中行""" if self.use_model_view and self.table_view.selectionModel(): rows = sorted({idx.row() for idx in self.table_view.selectionModel().selectedRows()}, reverse=True) if not rows: return for row in rows: src_index = self.table_proxy.mapToSource(self.table_proxy.index(row, 0)) if src_index.isValid(): self.table_model.removeRows(src_index.row(), 1) self.update_table_selection_count() return ranges = self.config_table.selectedRanges() if not ranges: return rows = set() for r in ranges: rows.update(range(r.topRow(), r.bottomRow() + 1)) for row in sorted(rows, reverse=True): self.config_table.removeRow(row) if row < len(self.configs): self.configs.pop(row) self.update_table() def export_selected_rows(self, fmt): """导出选中行""" if self.use_model_view: if not self.table_view.selectionModel(): self._show_infobar("warning", "提示", "未选择任何行") return rows = [idx.row() for idx in self.table_view.selectionModel().selectedRows()] if not rows: self._show_infobar("warning", "提示", "未选择任何行") return data = [] headers = [self.table_proxy.headerData(i, Qt.Horizontal) for i in range(9)] for row in sorted(set(rows)): row_data = [] for col in range(9): idx = self.table_proxy.index(row, col) row_data.append(str(self.table_proxy.data(idx))) data.append(row_data) df = pd.DataFrame(data, columns=headers) if fmt == "csv": file_path, _ = QFileDialog.getSaveFileName(self, "导出CSV", "selected.csv", "CSV (*.csv)") if not file_path: return df.to_csv(file_path, index=False, encoding="utf-8-sig") self._show_infobar("success", "成功", f"已导出CSV: {file_path}") else: file_path, _ = QFileDialog.getSaveFileName(self, "导出Excel", "selected.xlsx", "Excel (*.xlsx)") if not file_path: return try: df.to_excel(file_path, index=False) self._show_infobar("success", "成功", f"已导出Excel: {file_path}") except Exception as e: self._show_infobar("error", "错误", f"导出失败: {str(e)}") return ranges = self.config_table.selectedRanges() if not ranges: self._show_infobar("warning", "提示", "未选择任何行") return rows = set() for r in ranges: rows.update(range(r.topRow(), r.bottomRow() + 1)) data = [] headers = [self.config_table.horizontalHeaderItem(i).text() for i in range(9)] for row in sorted(rows): if not self.page_row_indices or row >= len(self.page_row_indices): continue config_index = self.page_row_indices[row] row_data = [] for col in range(9): item = self.config_table.item(row, col) row_data.append(item.text() if item else "") data.append(row_data) df = pd.DataFrame(data, columns=headers) if fmt == "csv": file_path, _ = QFileDialog.getSaveFileName(self, "导出CSV", "selected.csv", "CSV (*.csv)") if not file_path: return df.to_csv(file_path, index=False, encoding="utf-8-sig") self._show_infobar("success", "成功", f"已导出CSV: {file_path}") else: file_path, _ = QFileDialog.getSaveFileName(self, "导出Excel", "selected.xlsx", "Excel (*.xlsx)") if not file_path: return try: df.to_excel(file_path, index=False) self._show_infobar("success", "成功", f"已导出Excel: {file_path}") except Exception as e: self._show_infobar("error", "错误", f"导出失败: {str(e)}") def export_all_rows(self): """导出全部数据""" if self.config_table.rowCount() == 0: self._show_infobar("warning", "提示", "没有可导出的数据") return if self.use_model_view and self.table_proxy: total_rows = self.table_proxy.rowCount() headers = [self.table_proxy.headerData(i, Qt.Horizontal) for i in range(9)] data = [] for row in range(total_rows): row_data = [] for col in range(9): idx = self.table_proxy.index(row, col) row_data.append(str(self.table_proxy.data(idx))) data.append(row_data) df = pd.DataFrame(data, columns=headers) file_path, selected_filter = QFileDialog.getSaveFileName( self, "导出数据", "all.csv", "CSV (*.csv);;Excel (*.xlsx)" ) if not file_path: return try: if file_path.lower().endswith(".xlsx") or "Excel" in selected_filter: df.to_excel(file_path, index=False) else: df.to_csv(file_path, index=False, encoding="utf-8-sig") self._show_infobar("success", "成功", f"已导出: {file_path}") except Exception as e: self._show_infobar("error", "错误", f"导出失败: {str(e)}") return headers = [self.config_table.horizontalHeaderItem(i).text() for i in range(9)] data = [] for row in range(self.config_table.rowCount()): row_data = [] for col in range(9): item = self.config_table.item(row, col) row_data.append(item.text() if item else "") data.append(row_data) df = pd.DataFrame(data, columns=headers) file_path, selected_filter = QFileDialog.getSaveFileName( self, "导出数据", "all.csv", "CSV (*.csv);;Excel (*.xlsx)" ) if not file_path: return try: if file_path.lower().endswith(".xlsx") or "Excel" in selected_filter: df.to_excel(file_path, index=False) else: df.to_csv(file_path, index=False, encoding="utf-8-sig") self._show_infobar("success", "成功", f"已导出: {file_path}") except Exception as e: self._show_infobar("error", "错误", f"导出失败: {str(e)}") def toggle_nav_compact(self): """切换侧边导航收起/展开""" self.nav_compact = not self.nav_compact if self.nav_compact: self.nav_card.setFixedWidth(60) for btn in self.nav_buttons: btn.setToolTip(btn.text()) btn.setText("") self.nav_toggle_btn.setText("展开") self.nav_title.setVisible(False) self.nav_divider.setVisible(False) self.nav_footer.setVisible(False) else: self.nav_card.setFixedWidth(150) labels = ["工作台", "日志"] for btn, label in zip(self.nav_buttons, labels): btn.setText(label) btn.setToolTip("") self.nav_toggle_btn.setText("收起") self.nav_title.setVisible(True) self.nav_divider.setVisible(True) self.nav_footer.setVisible(True) def filter_table(self, text): """筛选表格行并高亮关键词""" keyword_raw = text.strip() if self.use_model_view: if not self.table_proxy: return if not keyword_raw: self.table_proxy.setFilterRegularExpression(QRegularExpression()) if hasattr(self, "table_filter_status"): total_rows = self.table_proxy.rowCount() self.table_filter_status.setText(f"显示: {total_rows}/{total_rows} | 命中: 0") return regex_enabled = self.table_regex.isChecked() any_term = self.table_any_term.isChecked() column_index = self.table_column_filter.currentIndex() - 1 self.table_proxy.setFilterKeyColumn(column_index if column_index >= 0 else -1) if regex_enabled: pattern = keyword_raw else: terms = [re.escape(t) for t in keyword_raw.split() if t] if not terms: pattern = "" elif any_term: pattern = "|".join(terms) else: pattern = "".join([f"(?=.*{t})" for t in terms]) + ".*" regex = QRegularExpression(pattern) if not self.table_case_sensitive.isChecked(): regex.setPatternOptions(QRegularExpression.CaseInsensitiveOption) self.table_proxy.setFilterRegularExpression(regex) if hasattr(self, "table_filter_status"): self.table_filter_status.setText( f"显示: {self.table_proxy.rowCount()}/{self.table_model.rowCount()} | 命中: 0") return if not self.config_table or self.config_table.rowCount() == 0: if hasattr(self, "table_filter_status"): self.table_filter_status.setText("显示: 0/0") self._refresh_table_match_selector([]) return if not keyword_raw: # 清空筛选 for row in range(self.config_table.rowCount()): for col in range(self.config_table.columnCount()): item = self.config_table.item(row, col) if item: item.setBackground(self._default_color()) self.config_table.setRowHidden(row, False) if hasattr(self, "table_filter_status"): total_rows = self.config_table.rowCount() self.table_filter_status.setText(f"显示: {total_rows}/{total_rows} | 命中: 0") self._refresh_table_match_selector([]) return terms_raw = [t for t in keyword_raw.split() if t] keyword = keyword_raw if self.table_case_sensitive.isChecked() else keyword_raw.lower() column_index = self.table_column_filter.currentIndex() - 1 visible_count = 0 match_count = 0 matched_rows = [] regex_enabled = self.table_regex.isChecked() any_term = self.table_any_term.isChecked() pattern = None if keyword and regex_enabled: flags = 0 if self.table_case_sensitive.isChecked() else re.IGNORECASE try: pattern = re.compile(keyword_raw, flags) except re.error: self._show_infobar("warning", "提示", "正则表达式无效") return for row in range(self.config_table.rowCount()): match = False for col in range(self.config_table.columnCount()): if column_index >= 0 and col != column_index: item = self.config_table.item(row, col) if item: item.setBackground(self._default_color()) continue item = self.config_table.item(row, col) if item: cell_text = item.text() cell_compare = cell_text if self.table_case_sensitive.isChecked() else cell_text.lower() if keyword: if regex_enabled and pattern: if pattern.search(cell_text): match = True if self.table_highlight.isChecked(): item.setBackground(self._highlight_color()) match_count += 1 else: item.setBackground(self._default_color()) else: terms = terms_raw if self.table_case_sensitive.isChecked() else [t.lower() for t in terms_raw] term_hit = any(term in cell_compare for term in terms) if any_term else all( term in cell_compare for term in terms ) if term_hit: match = True if self.table_highlight.isChecked(): item.setBackground(self._highlight_color()) match_count += 1 else: item.setBackground(self._default_color()) else: item.setBackground(self._default_color()) else: continue only_match = self.table_only_match.isChecked() self.config_table.setRowHidden(row, (not match) if (keyword and only_match) else False) if keyword: if match: visible_count += 1 matched_rows.append(row) else: visible_count = self.config_table.rowCount() if hasattr(self, "table_filter_status"): self.table_filter_status.setText( f"显示: {visible_count}/{self.config_table.rowCount()} | 命中: {match_count}" ) self._refresh_table_match_selector(matched_rows) def locate_table(self): """快速定位匹配行""" keyword_raw = self.table_search_input.text().strip() if not keyword_raw: return terms_raw = [t for t in keyword_raw.split() if t] keyword = keyword_raw if self.table_case_sensitive.isChecked() else keyword_raw.lower() column_index = self.table_column_filter.currentIndex() - 1 regex_enabled = self.table_regex.isChecked() pattern = None if keyword and regex_enabled: flags = 0 if self.table_case_sensitive.isChecked() else re.IGNORECASE try: pattern = re.compile(keyword_raw, flags) except re.error: self._show_infobar("warning", "提示", "正则表达式无效") return for row in range(self.config_table.rowCount()): for col in range(self.config_table.columnCount()): if column_index >= 0 and col != column_index: continue item = self.config_table.item(row, col) if item: cell_text = item.text() cell_compare = cell_text if self.table_case_sensitive.isChecked() else cell_text.lower() if regex_enabled and pattern: if pattern.search(cell_text): self.config_table.setCurrentItem(item) self.config_table.scrollToItem(item) return else: terms = terms_raw if self.table_case_sensitive.isChecked() else [t.lower() for t in terms_raw] term_hit = any( term in cell_compare for term in terms) if self.table_any_term.isChecked() else all( term in cell_compare for term in terms ) if term_hit: self.config_table.setCurrentItem(item) self.config_table.scrollToItem(item) return self._show_infobar("warning", "提示", "未找到匹配内容") def _apply_schedule_intervals(self, configs_with_rows): """按多多ID应用定时发布+间隔时间规则""" from collections import defaultdict grouped = defaultdict(list) for item in configs_with_rows: config = item["config"] user_id = config.get("多多id", "") index = str(config.get("序号", "")).strip() if not user_id: continue grouped[(user_id, index)].append(item) for _, items in grouped.items(): items.sort(key=lambda x: x["row_idx"]) base_time = None interval_seconds = 0 for entry in items: config = entry["config"] row_idx = entry["row_idx"] schedule_text = (config.get("定时发布") or "").strip() interval_value = config.get("间隔时间", 0) if schedule_text: parsed_time = self._parse_schedule_time(schedule_text) interval_seconds = self._parse_interval_seconds(interval_value) if parsed_time and interval_seconds > 0: base_time = parsed_time else: base_time = None interval_seconds = 0 continue if base_time and interval_seconds > 0: base_time = base_time + pd.to_timedelta(interval_seconds, unit="s") new_text = self._format_schedule_time(base_time) config["定时发布"] = new_text self._update_table_cell(row_idx, 3, new_text, highlight=True) def _parse_schedule_time(self, text): """解析定时发布时间字符串""" if not text: return None try: dt = pd.to_datetime(text, errors="coerce") if pd.isna(dt): return None return dt.to_pydatetime() except Exception: return None def _parse_interval_seconds(self, interval_value): """解析间隔时间,支持秒/分钟/小时(如: 30, 10m, 2h, 10分钟, 2小时)""" if interval_value is None: return 0 value_str = str(interval_value).strip().lower() if not value_str: return 0 # 中文单位映射 cn_map = { "秒": "s", "分钟": "m", "分": "m", "小时": "h", "时": "h" } for cn, en in cn_map.items(): if value_str.endswith(cn): value_str = value_str[:-len(cn)] + en break try: # 纯数字,默认秒 if value_str.isdigit(): return int(value_str) # 支持 10m / 2h / 30s unit = value_str[-1] num_part = value_str[:-1].strip() if not num_part: return 0 num = float(num_part) if unit == "m": return int(num * 60) if unit == "h": return int(num * 3600) if unit == "s": return int(num) except Exception: return 0 return 0 def _format_schedule_time(self, dt): """格式化定时发布时间字符串""" if not dt: return "" return dt.strftime("%Y-%m-%d %H:%M:%S") def _update_table_cell(self, row, col, value, highlight=False): """更新表格单元格并同步configs""" item = QTableWidgetItem(str(value)) if highlight: item.setBackground(QColor("#E6F4FF")) self.config_table.setItem(row, col, item) if row < len(self.configs): self.configs[row]["定时发布"] = str(value) def _refresh_table_match_selector(self, rows): """更新表格匹配列表""" self.table_match_rows = rows self.table_match_index = -1 if not hasattr(self, "table_match_selector"): return self.table_match_selector.blockSignals(True) self.table_match_selector.clear() self.table_match_selector.addItem(f"匹配列表({len(rows)})") for row in rows[:200]: self.table_match_selector.addItem(self._build_table_match_label(row)) self.table_match_selector.setCurrentIndex(0) self.table_match_selector.blockSignals(False) def _build_table_match_label(self, row): """构建表格匹配项显示文本""" def cell_text(col): item = self.config_table.item(row, col) return item.text().strip() if item else "" user_id = cell_text(0) index = cell_text(1) topic = cell_text(2) label = " | ".join([v for v in [user_id, index, topic] if v]) if not label: for col in range(self.config_table.columnCount()): value = cell_text(col) if value: label = value break if len(label) > 60: label = label[:57] + "..." return f"R{row + 1}: {label}" def jump_to_table_match(self, index): """跳转到表格匹配行""" if index <= 0: return actual_index = index - 1 if actual_index >= len(self.table_match_rows): return row = self.table_match_rows[actual_index] self.table_match_index = actual_index self.config_table.selectRow(row) item = self.config_table.item(row, 0) or self.config_table.item(row, 1) if item: self.config_table.scrollToItem(item) def next_table_match(self): """跳转到下一条匹配""" if not self.table_match_rows: return if self.table_match_index < 0: self.table_match_index = 0 else: self.table_match_index = (self.table_match_index + 1) % len(self.table_match_rows) self.table_match_selector.setCurrentIndex(self.table_match_index + 1) def prev_table_match(self): """跳转到上一条匹配""" if not self.table_match_rows: return if self.table_match_index < 0: self.table_match_index = len(self.table_match_rows) - 1 else: self.table_match_index = (self.table_match_index - 1) % len(self.table_match_rows) self.table_match_selector.setCurrentIndex(self.table_match_index + 1) def _highlight_color(self): """高亮颜色""" return self.config_table.palette().color(self.config_table.palette().Highlight).lighter(160) def _default_color(self): """默认背景色""" return self.config_table.palette().color(self.config_table.palette().Base) def _show_all_rows(self): """显示全部行""" for row in range(self.config_table.rowCount()): self.config_table.setRowHidden(row, False) def set_status_cards(self, update_text=None, pending=None, running=None): """更新状态卡片显示""" if update_text is not None: self.status_update_value.setText(update_text) if "未找到" in update_text: self.status_update_value.setStyleSheet("color: #b42318;") elif "未更新" in update_text: self.status_update_value.setStyleSheet("color: #6b7280;") else: self.status_update_value.setStyleSheet("color: #1d4ed8;") if pending is not None: self.status_pending_value.setText(str(pending)) try: pending_num = int(str(pending)) except ValueError: pending_num = 0 if pending_num > 0: self.status_pending_value.setStyleSheet("color: #b45309;") else: self.status_pending_value.setStyleSheet("color: #15803d;") if running is not None: self.status_running_value.setText(str(running)) running_text = str(running) if running_text in ["0", "0/0"]: self.status_running_value.setStyleSheet("color: #6b7280;") else: self.status_running_value.setStyleSheet("color: #1d4ed8;") def set_running_progress(self, done, total): """更新执行中统计""" self.running_done = done self.running_total = total if total > 0: self.set_status_cards(running=f"{done}/{total}") if hasattr(self, "status_running_progress"): percent = int((done / total) * 100) self.status_running_progress.setValue(percent) else: self.set_status_cards(running="0") if hasattr(self, "status_running_progress"): self.status_running_progress.setValue(0) def toggle_theme(self): """切换浅色/深色主题""" if self.theme_toggle.isChecked(): setTheme(Theme.DARK) else: setTheme(Theme.LIGHT) def clear_log(self): """清空日志显示""" self.log_text.clear() def filter_log(self, text): """过滤日志内容""" keyword = text.strip() self._clear_log_highlight() self._update_log_match_status(0) if not keyword: self._refresh_log_match_selector([]) return self._update_log_matches(keyword) if self.log_highlight_check.isChecked(): self._highlight_log_matches() self.find_log(backward=False, reset=True) def export_log(self): """导出日志到文件""" log_content = self.log_text.toPlainText() if not log_content.strip(): self._show_infobar("warning", "提示", "日志为空,无法导出") return file_path, _ = QFileDialog.getSaveFileName( self, "保存日志", "log.txt", "文本文件 (*.txt)" ) if not file_path: return try: with open(file_path, "w", encoding="utf-8") as f: f.write(log_content) self._show_infobar("success", "成功", f"日志已导出: {file_path}") except Exception as e: self._show_infobar("error", "错误", f"导出失败: {str(e)}") def find_log(self, backward=False, reset=False): """查找日志""" keyword = self.log_search_input.text().strip() if not keyword: return if not self.log_match_positions: self._update_log_matches(keyword) if not self.log_match_positions: return if reset: self.log_match_index = 0 if not backward else len(self.log_match_positions) - 1 else: step = -1 if backward else 1 self.log_match_index = (self.log_match_index + step) % len(self.log_match_positions) start, end = self.log_match_positions[self.log_match_index] cursor = self.log_text.textCursor() cursor.setPosition(start) cursor.setPosition(end, QTextCursor.KeepAnchor) self.log_text.setTextCursor(cursor) self.log_text.ensureCursorVisible() def _clear_log_highlight(self): """清除日志高亮""" cursor = self.log_text.textCursor() cursor.select(cursor.Document) cursor.setCharFormat(self.log_text.currentCharFormat()) cursor.clearSelection() def _highlight_log_matches(self): """高亮日志所有匹配项""" if not self.log_match_positions: return fmt = self.log_text.currentCharFormat() fmt.setBackground(self.log_text.palette().color(self.log_text.palette().Highlight).lighter(160)) for start, end in self.log_match_positions: cursor = self.log_text.textCursor() cursor.setPosition(start) cursor.setPosition(end, QTextCursor.KeepAnchor) cursor.mergeCharFormat(fmt) def _update_log_matches(self, keyword): """更新日志匹配位置""" self.log_match_positions = [] self.log_match_index = -1 self.log_match_items = [] if not keyword: self._update_log_match_status(0) self._refresh_log_match_selector([]) return content = self.log_text.toPlainText() use_regex = self.log_regex.isChecked() case_sensitive = self.log_case_sensitive.isChecked() whole_word = self.log_whole_word.isChecked() flags = 0 if case_sensitive else re.IGNORECASE if use_regex: pattern_text = keyword if whole_word: pattern_text = rf"\b(?:{pattern_text})\b" try: pattern = re.compile(pattern_text, flags) except re.error: self._show_infobar("warning", "提示", "日志正则表达式无效") self._update_log_match_status(0) return else: pattern_text = re.escape(keyword) if whole_word: pattern_text = rf"\b{pattern_text}\b" pattern = re.compile(pattern_text, flags) for match in pattern.finditer(content): self.log_match_positions.append((match.start(), match.end())) self.log_match_items.append(self._build_log_match_label(content, match.start(), match.end())) self._update_log_match_status(len(self.log_match_positions)) self._refresh_log_match_selector(self.log_match_items) def _update_log_match_status(self, count): """更新日志匹配统计""" if hasattr(self, "log_match_status"): self.log_match_status.setText(f"匹配: {count}") def _apply_table_column_widths(self): """应用配置表列宽(不随内容变化)""" widths = { 0: 120, # 多多ID 1: 80, # 序号 2: 160, # 话题 3: 160, # 定时发布 4: 110, # 间隔时间 5: 160, # 达人链接 6: 120, # 执行人 7: 100, # 情况 8: 220, # 文件路径 9: 90, # 进度 10: 140 # 操作 } for col, width in widths.items(): if col < self.config_table.columnCount(): self.config_table.setColumnWidth(col, width) def _apply_table_view_column_widths(self): """应用 Model/View 列宽""" widths = { 0: 120, 1: 80, 2: 160, 3: 160, 4: 110, 5: 160, 6: 120, 7: 100, 8: 220, 9: 90, 10: 140 } for col, width in widths.items(): if self.table_view.model() and col < self.table_view.model().columnCount(): self.table_view.setColumnWidth(col, width) def _refresh_log_match_selector(self, items): """更新日志匹配下拉""" if not hasattr(self, "log_match_selector"): return self.log_match_selector.blockSignals(True) self.log_match_selector.clear() self.log_match_selector.addItem(f"匹配列表({len(items)})") for label in items[:200]: self.log_match_selector.addItem(label) self.log_match_selector.setCurrentIndex(0) self.log_match_selector.blockSignals(False) def _build_log_match_label(self, content, start, end): """构建匹配项显示文本""" line_start = content.rfind("\n", 0, start) + 1 line_end = content.find("\n", end) if line_end == -1: line_end = len(content) line_text = content[line_start:line_end].strip() line_no = content.count("\n", 0, start) + 1 if len(line_text) > 60: line_text = line_text[:57] + "..." return f"L{line_no}: {line_text}" def jump_to_log_match(self, index): """跳转到指定匹配""" if index <= 0: return actual_index = index - 1 if actual_index >= len(self.log_match_positions): return start, end = self.log_match_positions[actual_index] self.log_match_index = actual_index cursor = self.log_text.textCursor() cursor.setPosition(start) cursor.setPosition(end, QTextCursor.KeepAnchor) self.log_text.setTextCursor(cursor) self.log_text.ensureCursorVisible() def _show_infobar(self, level, title, content): """显示提示条""" if level == "success": InfoBar.success(title=title, content=content, parent=self, position=InfoBarPosition.TOP_RIGHT) elif level == "warning": InfoBar.warning(title=title, content=content, parent=self, position=InfoBarPosition.TOP_RIGHT) else: InfoBar.error(title=title, content=content, parent=self, position=InfoBarPosition.TOP_RIGHT) def browse_excel(self): """浏览Excel文件""" file_path, _ = QFileDialog.getOpenFileName( self, "选择Excel文件", "", "Excel文件 (*.xlsx *.xls)" ) if file_path: self.excel_path_input.setText(file_path) def browse_folder(self): """浏览文件夹""" folder_path = QFileDialog.getExistingDirectory(self, "选择文件夹") if folder_path: self.folder_path_input.setText(folder_path) def import_excel(self): """导入Excel配置文件""" excel_path = self.excel_path_input.text().strip() if not excel_path: self._show_infobar("warning", "警告", "请先选择Excel文件") return if not os.path.exists(excel_path): self._show_infobar("warning", "警告", f"Excel文件不存在: {excel_path}") return try: # 读取Excel文件,添加更多异常处理 try: df = pd.read_excel(excel_path) except FileNotFoundError: self._show_infobar("error", "错误", f"找不到文件: {excel_path}") logger.error(f"文件不存在: {excel_path}") return except PermissionError: self._show_infobar("error", "错误", f"文件被占用,请关闭Excel文件后重试") logger.error(f"文件被占用: {excel_path}") return except Exception as e: self._show_infobar("error", "错误", f"读取Excel文件失败: {str(e)}") logger.error(f"读取Excel失败: {e}") return if df.empty: self._show_infobar("warning", "警告", "Excel文件为空") return # 检查必需的列 required_columns = ['多多id', '序号', '话题', '定时发布', '间隔时间', '达人链接', '执行人', '情况'] missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: self._show_infobar( "warning", "警告", f"Excel缺少列: {', '.join(missing_columns)}" ) return # 转换为配置列表,添加异常处理 self.configs = [] for idx, row in df.iterrows(): try: config = { '多多id': str(row.get('多多id', '')).strip() if pd.notna(row.get('多多id')) else '', '序号': str(row.get('序号', '')).strip() if pd.notna(row.get('序号')) else '', '话题': str(row.get('话题', '')).strip() if pd.notna(row.get('话题')) else '', '定时发布': str(row.get('定时发布', '')).strip() if pd.notna(row.get('定时发布')) else '', '间隔时间': int(row.get('间隔时间', 0)) if pd.notna(row.get('间隔时间')) else 0, '达人链接': str(row.get('达人链接', '')).strip() if pd.notna(row.get('达人链接')) else '', '执行人': str(row.get('执行人', '')).strip() if pd.notna(row.get('执行人')) else '', '情况': str(row.get('情况', '待执行')).strip() if pd.notna(row.get('情况')) else '待执行', '文件路径': '' # 文件路径字段初始为空,通过更新数据按钮填充 } self.configs.append(config) except Exception as e: logger.warning(f"处理第 {idx + 1} 行数据时出错: {e}") continue # 跳过有问题的行 if not self.configs: self._show_infobar("warning", "警告", "未能解析出任何有效配置") return # 更新表格显示 try: self.update_table() except Exception as e: self._show_infobar("error", "错误", f"更新表格失败: {str(e)}") logger.error(f"更新表格失败: {e}") return # 显示表格 self.table_group.setVisible(True) self.set_status_cards(update_text="未更新", pending=len(self.configs)) self._show_infobar("success", "成功", f"成功导入 {len(self.configs)} 条配置") except Exception as e: error_msg = f"导入Excel文件失败: {str(e)}" self._show_infobar("error", "错误", error_msg) logger.error(f"导入Excel失败: {e}", exc_info=True) def update_table(self): """更新配置表格""" self.is_updating_table = True total_rows = len(self.configs) if total_rows > 1000: self._setup_model_view() self.is_updating_table = False return self.use_model_view = False if hasattr(self, "table_view"): self.table_view.setVisible(False) if hasattr(self, "config_table"): self.config_table.setVisible(True) if hasattr(self, "page_first_btn"): for btn in [self.page_first_btn, self.page_prev_btn, self.page_next_btn, self.page_last_btn, self.page_size_combo]: btn.setEnabled(True) if hasattr(self, "table_select_all"): self.table_select_all.setEnabled(True) total_pages = max(1, (total_rows + self.page_size - 1) // self.page_size) if self.current_page > total_pages: self.current_page = total_pages start = (self.current_page - 1) * self.page_size end = min(start + self.page_size, total_rows) self.page_row_indices = list(range(start, end)) self.config_table.setRowCount(len(self.page_row_indices)) for table_row, config_index in enumerate(self.page_row_indices): config = self.configs[config_index] self.config_table.setItem(table_row, 0, QTableWidgetItem(str(config.get('多多id', '')))) self.config_table.setItem(table_row, 1, QTableWidgetItem(str(config.get('序号', '')))) self.config_table.setItem(table_row, 2, QTableWidgetItem(str(config.get('话题', '')))) self.config_table.setItem(table_row, 3, QTableWidgetItem(str(config.get('定时发布', '')))) self.config_table.setItem(table_row, 4, QTableWidgetItem(str(config.get('间隔时间', 0)))) self.config_table.setItem(table_row, 5, QTableWidgetItem(str(config.get('达人链接', '')))) self.config_table.setItem(table_row, 6, QTableWidgetItem(str(config.get('执行人', '')))) self._set_status_item(table_row, str(config.get('情况', '待执行'))) # 文件路径列(第8列,索引为8),如果配置中没有则显示空 file_path = config.get('文件路径', '') self.config_table.setItem(table_row, 8, QTableWidgetItem(str(file_path))) self._set_progress_item(table_row, str(config.get('情况', '待执行'))) self._set_action_buttons(table_row, config_index) # 固定列宽(不随内容自适应) self._apply_table_column_widths() # 未更新前,用配置行数作为待执行提示 self.set_status_cards(pending=self.config_table.rowCount()) if hasattr(self, "table_filter_status"): self.table_filter_status.setText( f"显示: {self.config_table.rowCount()}/{total_rows} | 命中: 0" ) if hasattr(self, "table_empty_label"): self.table_empty_label.setVisible(total_rows == 0) if hasattr(self, "page_info_label"): self.page_info_label.setText(f"第 {self.current_page}/{total_pages} 页") self.is_updating_table = False self.update_table_selection_count() def _setup_model_view(self): """切换到大数据量 Model/View 模式""" headers = ['多多ID', '序号', '话题', '定时发布', '间隔时间', '达人链接', '执行人', '情况', '文件路径', '进度', '操作'] if self.table_model is None: self.table_model = ConfigTableModel(self.configs, headers, self) self.table_proxy = QSortFilterProxyModel(self) self.table_proxy.setSourceModel(self.table_model) self.table_proxy.setFilterKeyColumn(-1) self.table_view.setModel(self.table_proxy) if self.table_view.selectionModel(): self.table_view.selectionModel().selectionChanged.connect(self.update_table_selection_count) self.table_delegate = TableActionDelegate(self.table_view, self._edit_row_from_view, self._delete_row_from_view) self.table_view.setItemDelegate(self.table_delegate) header = self.table_view.horizontalHeader() header.setStretchLastSection(False) header.setSectionResizeMode(QHeaderView.Interactive) self._apply_table_view_column_widths() else: self.table_model.update_data(self.configs) self.use_model_view = True self.table_view.setVisible(True) self.config_table.setVisible(False) if hasattr(self, "page_first_btn"): for btn in [self.page_first_btn, self.page_prev_btn, self.page_next_btn, self.page_last_btn, self.page_size_combo]: btn.setEnabled(False) if hasattr(self, "table_select_all"): self.table_select_all.setEnabled(False) if hasattr(self, "table_empty_label"): self.table_empty_label.setVisible(len(self.configs) == 0) if hasattr(self, "page_info_label"): total_rows = len(self.configs) self.page_info_label.setText(f"大数据模式:{total_rows} 行") self._show_infobar("warning", "提示", "数据量较大,已切换到Model/View模式(部分功能受限)") def _edit_row_from_view(self, index): """Model/View 编辑行""" if not index.isValid(): return edit_index = index.sibling(index.row(), 0) self.table_view.setCurrentIndex(edit_index) self.table_view.edit(edit_index) def _delete_row_from_view(self, index): """Model/View 删除行""" if not index.isValid(): return src_index = self.table_proxy.mapToSource(index) if src_index.isValid(): self.table_model.removeRows(src_index.row(), 1) def get_config_from_table(self, row_index=0): """从表格中获取指定行的配置数据(使用表格中修改后的值)""" if row_index >= self.config_table.rowCount(): return None def get_cell_text(row, col): """安全获取单元格文本""" item = self.config_table.item(row, col) return item.text().strip() if item else '' def get_cell_int(row, col, default=0): """安全获取单元格整数""" item = self.config_table.item(row, col) if item and item.text().strip(): try: return int(item.text().strip()) except ValueError: return default return default config = { '多多id': get_cell_text(row_index, 0), '序号': get_cell_text(row_index, 1), '话题': get_cell_text(row_index, 2), '定时发布': get_cell_text(row_index, 3), '间隔时间': get_cell_int(row_index, 4, 0), '达人链接': get_cell_text(row_index, 5), '执行人': get_cell_text(row_index, 6), '情况': get_cell_text(row_index, 7) or '待执行', '文件路径': get_cell_text(row_index, 8) # 第8列是文件路径 } return config def get_config(self): """获取当前配置数据""" schedule_time = self.schedule_datetime.dateTime().toString("yyyy-MM-dd HH:mm:ss") # 获取文件夹路径,如果为空则使用默认路径 folder_path = self.folder_path_input.text().strip() if not folder_path: folder_path = get_default_folder_path() return { '多多id': self.user_id_input.text(), '序号': self.index_input.text(), '话题': self.topic_input.text(), '定时发布': schedule_time if self.schedule_datetime.dateTime() > QDateTime.currentDateTime() else '', '达人链接': self.url_input.text(), '执行人': self.executor_input.text(), '文件夹路径': folder_path, '情况': '待执行' } def update_data(self): """更新数据:找出文件并保存到表格的文件路径列(更新所有行)""" try: # 检查是否有Excel导入的配置表格 if self.configs and self.config_table.rowCount() > 0: # 获取文件夹路径 folder_path = self.folder_path_input.text().strip() if not folder_path: folder_path = get_default_folder_path() if not os.path.exists(folder_path): self._show_infobar("warning", "警告", f"文件夹路径不存在: {folder_path}") return self.log_text.append("=" * 50) self.log_text.append("开始批量更新所有行的文件路径...") self.log_text.append(f"共有 {self.config_table.rowCount()} 行需要更新") # 遍历所有行,更新每行的文件路径 total_found = 0 for row_idx in range(self.config_table.rowCount()): config = self.get_config_from_table(row_idx) if not config: self.log_text.append(f"第 {row_idx + 1} 行:无法获取配置数据,跳过") continue # 验证必填字段 if not config.get('多多id') or not config.get('序号'): self.log_text.append(f"第 {row_idx + 1} 行:多多ID或序号为空,跳过") continue self.log_text.append(f"正在更新第 {row_idx + 1} 行的文件路径...") self.log_text.append(f" 多多ID: {config.get('多多id')}, 序号: {config.get('序号')}") # 查找该行对应的文件 found_files = self._find_files_for_config(config, folder_path) if found_files: # 将找到的文件路径拼接成字符串(多个文件用分号分隔) file_paths_str = "; ".join([str(f['path']) for f in found_files]) self.config_table.setItem(row_idx, 8, QTableWidgetItem(file_paths_str)) # 同时更新self.configs中对应的配置 if row_idx < len(self.configs): self.configs[row_idx]['文件路径'] = file_paths_str video_count = sum(1 for f in found_files if f['path'].is_file() and any( f['path'].suffix.lower() in ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm'] for ext in ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm'])) self.log_text.append(f" ✓ 找到 {len(found_files)} 个文件({video_count} 个视频)") total_found += len(found_files) else: # 清空文件路径列 self.config_table.setItem(row_idx, 8, QTableWidgetItem("")) if row_idx < len(self.configs): self.configs[row_idx]['文件路径'] = "" self.log_text.append(f" ✗ 未找到匹配的文件") self.log_text.append("=" * 50) self.log_text.append(f"批量更新完成!共更新 {self.config_table.rowCount()} 行,找到 {total_found} 个文件") self.update_status_label.setText(f"已更新: {self.config_table.rowCount()}行,{total_found}个文件") self.update_status_label.setStyleSheet("color: #4CAF50; font-size: 10px;") self.set_status_cards(update_text=f"已更新: {self.config_table.rowCount()}行", pending=total_found) # 预览自动补齐时间(按多多ID+序号分组) configs_with_rows = [] for row_idx in range(self.config_table.rowCount()): config = self.get_config_from_table(row_idx) if config: configs_with_rows.append({"row_idx": row_idx, "config": config}) self._apply_schedule_intervals(configs_with_rows) self._show_infobar("success", "成功", f"已更新文件路径:{self.config_table.rowCount()}行,{total_found}个文件") return else: # 没有表格,使用原来的逻辑(单个配置) config = self.get_config() current_row = -1 # 表示没有表格,使用prepared_files # 验证必填字段 if not config.get('多多id') or not config.get('序号'): self._show_infobar("warning", "警告", "请先填写多多ID和序号") return # 获取文件夹路径 folder_path = config.get('文件夹路径', '') if not folder_path: folder_path = get_default_folder_path() if not os.path.exists(folder_path): self._show_infobar("warning", "警告", f"文件夹路径不存在: {folder_path}") return self.log_text.append("=" * 50) self.log_text.append("开始更新数据,查找文件...") # 在文件夹中查找文件 found_files = self._find_files_for_config(config, folder_path) if found_files: self.prepared_files = found_files video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm'] video_count = sum(1 for f in found_files if f['path'].is_file() and any( f['path'].suffix.lower() == ext for ext in video_extensions)) folder_count = len(found_files) - video_count self.log_text.append( f"更新完成!找到 {len(found_files)} 个文件/文件夹({video_count} 个视频,{folder_count} 个文件夹)") self.update_status_label.setText(f"已更新: {len(found_files)}个文件") self.update_status_label.setStyleSheet("color: #4CAF50; font-size: 10px;") self.set_status_cards(update_text=f"已更新: {len(found_files)}个文件", pending=len(found_files)) self._show_infobar("success", "成功", f"已找到 {len(found_files)} 个文件/文件夹") else: self.prepared_files = None self.log_text.append("未找到匹配的文件") self.update_status_label.setText("未找到文件") self.update_status_label.setStyleSheet("color: #f44336; font-size: 10px;") self.set_status_cards(update_text="未找到文件", pending=0) self._show_infobar("warning", "警告", "未找到匹配的文件") except Exception as e: error_msg = f"更新数据失败: {str(e)}" self.log_text.append(error_msg) self._show_infobar("error", "错误", error_msg) logger.error(f"更新数据失败: {e}") import traceback traceback.print_exc() def _find_files_for_config(self, config, folder_path): """根据配置查找文件(辅助方法)""" found_files = [] # 参数验证 if not config: logger.warning("配置为空,无法查找文件") return found_files if not folder_path or not os.path.exists(folder_path): logger.warning(f"文件夹路径无效或不存在: {folder_path}") return found_files index = str(config.get('序号', '')).strip() if not index: logger.warning("序号为空,无法查找文件") return found_files video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm'] try: # 检查文件夹是否可读 if not os.access(folder_path, os.R_OK): logger.error(f"没有权限读取文件夹: {folder_path}") return found_files # 遍历最外层文件夹下的所有子文件夹 try: subdirs = [f for f in os.listdir(folder_path) if os.path.isdir(os.path.join(folder_path, f))] except PermissionError: logger.error(f"没有权限访问文件夹: {folder_path}") return found_files except Exception as e: logger.error(f"读取文件夹失败: {e}") return found_files # 找到匹配当前多多ID的文件夹 target_subdir = None user_id = str(config.get('多多id', '')).strip() if not user_id: logger.warning("多多ID为空,无法查找文件") return found_files for subdir_name in subdirs: if subdir_name == user_id: target_subdir = os.path.join(folder_path, subdir_name) break if not target_subdir or not os.path.exists(target_subdir): return found_files # 扫描该文件夹下的文件 try: items = os.listdir(target_subdir) except PermissionError: logger.error(f"没有权限访问子文件夹: {target_subdir}") return found_files except Exception as e: logger.error(f"读取子文件夹失败: {e}") return found_files for item_name in items: try: item_path = os.path.join(target_subdir, item_name) # 跳过隐藏文件/文件夹 if item_name.startswith('.'): continue name_parts = item_name.split("-") # 检查序号是否匹配 if len(name_parts) > 0 and name_parts[0] == index: path_obj = Path(item_path) # 确保路径存在 if not path_obj.exists(): continue if path_obj.is_file(): # 检查是否为视频文件 if any(path_obj.suffix.lower() == ext for ext in video_extensions): found_files.append({ "url": config.get('达人链接', ''), "user_id": user_id, "time_start": config.get('定时发布', '') if config.get('定时发布') else None, "ht": config.get('话题', ''), "index": index, "path": path_obj }) elif path_obj.is_dir(): # 如果是文件夹,可能是图片文件夹 found_files.append({ "url": config.get('达人链接', ''), "user_id": user_id, "time_start": config.get('定时发布', '') if config.get('定时发布') else None, "ht": config.get('话题', ''), "index": index, "path": path_obj }) except Exception as e: logger.warning(f"处理文件 {item_name} 时出错: {e}") continue # 继续处理其他文件 except Exception as e: logger.error(f"查找文件失败: {e}", exc_info=True) return found_files def execute_task(self): """执行任务""" # 检查是否有Excel导入的配置 if self.configs: # 如果有Excel配置,批量处理 self.execute_batch_from_excel() else: # 否则使用当前输入的配置 config = self.get_config() # 验证必填字段 if not config.get('多多id') or not config.get('序号'): self._show_infobar("warning", "警告", "请填写所有必填字段(多多ID、序号)") return folder_path = config.get('文件夹路径', '') if not folder_path: folder_path = get_default_folder_path() config['文件夹路径'] = folder_path if not os.path.exists(folder_path): self._show_infobar("warning", "警告", f"文件夹路径不存在: {folder_path}") return # 显示使用的文件夹路径 self.log_text.append(f"使用文件夹路径: {folder_path}") # 检查是否勾选了批量上传 is_batch_mode = self.batch_upload_checkbox.isChecked() # 如果已经更新了数据,根据预查找的文件判断是否为批量上传 if self.prepared_files: video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm'] video_files = [f for f in self.prepared_files if f['path'].is_file() and any(f['path'].suffix.lower() == ext for ext in video_extensions)] if is_batch_mode and len(video_files) > 1: self.log_text.append(f"检测到 {len(video_files)} 个视频文件,使用批量上传模式...") elif is_batch_mode and len(video_files) <= 1: self.log_text.append(f"只找到 {len(video_files)} 个视频文件,将使用单个上传模式...") is_batch_mode = False elif not is_batch_mode: self.log_text.append(f"使用单个上传模式...") # 检查是否有正在运行的线程 if self.worker_thread and self.worker_thread.isRunning(): self._show_infobar("warning", "警告", "已有任务正在执行,请等待完成") return # 禁用按钮 self.execute_btn.setEnabled(False) self.progress_bar.setVisible(True) self.progress_bar.setValue(0) # 清理旧线程引用(如果存在) if self.worker_thread: try: # 断开旧信号的连接 self.worker_thread.finished.disconnect() self.worker_thread.log_message.disconnect() self.worker_thread.progress.disconnect() except Exception: pass # 如果信号未连接,忽略错误 self.worker_thread = None # 创建工作线程,传递预查找的文件列表 try: self.worker_thread = WorkerThread(config, is_batch_mode, self.prepared_files, self) # 连接finished信号,用于单个任务完成后显示弹窗 self.worker_thread.finished.connect(self.on_task_finished) self.worker_thread.log_message.connect(self.log_text.append) self.worker_thread.progress.connect(self.progress_bar.setValue) self.worker_thread.start() self.set_status_cards(pending=1) self.set_running_progress(0, 1) except Exception as e: error_msg = f"启动任务失败: {str(e)}" self._show_infobar("error", "错误", error_msg) self.log_text.append(error_msg) logger.error(f"启动任务失败: {e}") self.execute_btn.setEnabled(True) self.progress_bar.setVisible(False) self.log_text.append("=" * 50) mode_text = "批量上传" if is_batch_mode else "逐个上传" if self.prepared_files: self.log_text.append(f"开始执行任务({mode_text}模式,使用预查找的文件列表)...") else: self.log_text.append(f"开始执行任务({mode_text}模式)...") def execute_batch_from_excel(self): """从Excel配置批量执行(自动判断相同多多ID的mp4文件,批量上传)""" # 获取文件夹路径,如果为空则使用默认路径 folder_path = self.folder_path_input.text().strip() if not folder_path: folder_path = get_default_folder_path() if not os.path.exists(folder_path): self._show_infobar("warning", "警告", f"文件夹路径不存在: {folder_path}") return # 从表格中获取所有配置(使用用户修改后的值) if self.config_table.rowCount() == 0: self._show_infobar("warning", "警告", "配置列表为空,请先导入Excel配置") return self.log_text.append("=" * 50) self.log_text.append("开始分析配置,准备批量上传...") # 收集所有配置及其对应的文件 all_configs_with_files = [] configs_with_rows = [] video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm'] for row_idx in range(self.config_table.rowCount()): config = self.get_config_from_table(row_idx) if not config: continue # 验证必填字段 if not config.get('多多id') or not config.get('序号'): self.log_text.append(f"第 {row_idx + 1} 行:多多ID或序号为空,跳过") continue # 添加文件夹路径 config['文件夹路径'] = folder_path configs_with_rows.append({"row_idx": row_idx, "config": config}) # 应用定时发布 + 间隔时间逻辑(按多多ID分组) self._apply_schedule_intervals(configs_with_rows) for item in configs_with_rows: row_idx = item["row_idx"] config = item["config"] # 从文件路径列读取文件路径 file_path_str = config.get('文件路径', '').strip() if file_path_str: # 文件路径可能用分号分隔(多个文件) file_paths = [p.strip() for p in file_path_str.split(';') if p.strip()] files = [] for fp in file_paths: if os.path.exists(fp): path_obj = Path(fp) files.append({ "url": config.get('达人链接', ''), "user_id": config.get('多多id', ''), "time_start": config.get('定时发布', '') if config.get('定时发布') else None, "ht": config.get('话题', ''), "index": config.get('序号', ''), "path": path_obj }) if files: all_configs_with_files.append({ 'config': config, 'files': files, 'row_idx': row_idx }) self.log_text.append(f"第 {row_idx + 1} 行:从文件路径列读取到 {len(files)} 个文件") else: # 如果文件路径列为空,尝试查找文件 self.log_text.append(f"第 {row_idx + 1} 行:文件路径列为空,尝试查找文件...") found_files = self._find_files_for_config(config, folder_path) if found_files: all_configs_with_files.append({ 'config': config, 'files': found_files, 'row_idx': row_idx }) self.log_text.append(f"第 {row_idx + 1} 行:找到 {len(found_files)} 个文件") if not all_configs_with_files: self._show_infobar("warning", "警告", "未找到任何文件,请先点击'更新数据'按钮") return total_tasks = sum(len(item['files']) for item in all_configs_with_files) pending_tasks = total_tasks self.set_status_cards(pending=pending_tasks) self.set_running_progress(0, total_tasks) # 按多多ID分组 from collections import defaultdict grouped_by_user_id = defaultdict(list) for item in all_configs_with_files: user_id = item['config'].get('多多id', '') grouped_by_user_id[user_id].append(item) self.log_text.append("=" * 50) self.log_text.append(f"按多多ID分组:共 {len(grouped_by_user_id)} 个不同的多多ID") # 检查是否有正在运行的线程 if self.worker_thread and self.worker_thread.isRunning(): self._show_infobar("warning", "警告", "已有任务正在执行,请等待完成") return # 禁用按钮 self.execute_btn.setEnabled(False) self.progress_bar.setVisible(True) self.progress_bar.setValue(0) self.set_status_cards(running=1) # 处理每个多多ID组 total_processed = 0 for user_id, items in grouped_by_user_id.items(): self.log_text.append(f"\n处理多多ID: {user_id},共 {len(items)} 个配置") # 收集该多多ID下的所有文件 all_files = [] for item in items: all_files.extend(item['files']) # 分离视频文件和图片文件夹 video_files = [f for f in all_files if f['path'].is_file() and any(f['path'].suffix.lower() == ext for ext in video_extensions)] image_folders = [f for f in all_files if f['path'].is_dir()] self.log_text.append(f" 视频文件: {len(video_files)} 个") self.log_text.append(f" 图片文件夹: {len(image_folders)} 个") # 使用第一个配置创建Pdd实例(因为同一个多多ID,配置应该相同) first_config = items[0]['config'] pdd = Pdd( url=first_config.get('达人链接', ''), user_id=user_id, time_start=first_config.get('定时发布', '') if first_config.get('定时发布') else None, ht=first_config.get('话题', ''), index=first_config.get('序号', ''), title=first_config.get('标题', None) ) # 第一步:如果有多个视频文件(>1),批量上传所有视频 if len(video_files) > 1: # 检查是否都是mp4文件 all_mp4 = all(f['path'].suffix.lower() == '.mp4' for f in video_files) if all_mp4: self.log_text.append(f" ✓ 检测到 {len(video_files)} 个mp4文件,批量上传所有视频(action1方法)") else: self.log_text.append(f" ✓ 检测到 {len(video_files)} 个视频文件,批量上传所有视频(action1方法)") # 批量上传所有视频 try: # 清理旧线程(如果存在) if self.worker_thread: try: self.worker_thread.log_message.disconnect() self.worker_thread.progress.disconnect() except Exception: pass self.worker_thread = WorkerThread(first_config, True, video_files, self) # 不连接finished信号,避免每个任务完成就弹窗 self.worker_thread.log_message.connect(self.log_text.append) self.worker_thread.progress.connect(self.progress_bar.setValue) self.worker_thread.start() # 等待完成,使用超时避免无限阻塞 if not self.worker_thread.wait(300000): # 最多等待5分钟 logger.warning("批量上传视频任务超时") self.log_text.append(" ⚠ 批量上传视频任务超时") else: self.log_text.append(f" ✓ 批量上传 {len(video_files)} 个视频完成") total_processed += len(video_files) pending_tasks = max(pending_tasks - len(video_files), 0) self.set_status_cards(pending=pending_tasks) self.set_running_progress(total_processed, total_tasks) except Exception as e: error_msg = f"批量上传视频失败: {str(e)}" self.log_text.append(f" ✗ {error_msg}") logger.error(error_msg, exc_info=True) total_processed += len(video_files) # 即使失败也计入处理数,避免卡住 pending_tasks = max(pending_tasks - len(video_files), 0) self.set_status_cards(pending=pending_tasks) self.set_running_progress(total_processed, total_tasks) elif len(video_files) == 1: # 只有1个视频,单个上传 self.log_text.append(f" → 只有1个视频文件,单个上传") try: # 清理旧线程(如果存在) if self.worker_thread: try: self.worker_thread.log_message.disconnect() self.worker_thread.progress.disconnect() except Exception: pass self.worker_thread = WorkerThread(first_config, False, video_files, self) # 不连接finished信号,避免每个任务完成就弹窗 self.worker_thread.log_message.connect(self.log_text.append) self.worker_thread.progress.connect(self.progress_bar.setValue) self.worker_thread.start() # 使用超时等待,避免无限阻塞 if not self.worker_thread.wait(300000): # 最多等待5分钟 logger.warning("单个上传视频任务超时") except Exception as e: error_msg = f"单个上传视频失败: {str(e)}" self.log_text.append(error_msg) logger.error(error_msg, exc_info=True) continue # 继续处理下一个多多ID组 total_processed += len(video_files) pending_tasks = max(pending_tasks - len(video_files), 0) self.set_status_cards(pending=pending_tasks) self.set_running_progress(total_processed, total_tasks) self.log_text.append(f" ✓ 单个视频上传完成") # 第二步:如果有图片文件夹,逐个上传图片 if image_folders: self.log_text.append(f" → 开始逐个上传 {len(image_folders)} 个图片文件夹") for idx, img_folder in enumerate(image_folders, 1): # 找到该图片文件夹对应的配置(通过序号匹配) folder_index = img_folder.get('index', '') matching_config = None for item in items: if item['config'].get('序号', '') == folder_index: matching_config = item['config'] break # 如果找不到匹配的配置,使用第一个配置 if not matching_config: matching_config = first_config self.log_text.append(f" 上传第 {idx}/{len(image_folders)} 个图片文件夹(序号: {folder_index})") pdd_img = Pdd( url=matching_config.get('达人链接', ''), user_id=user_id, time_start=matching_config.get('定时发布', '') if matching_config.get('定时发布') else None, ht=matching_config.get('话题', ''), index=folder_index, title=matching_config.get('标题', None) ) try: # 清理旧线程(如果存在) if self.worker_thread: try: self.worker_thread.log_message.disconnect() self.worker_thread.progress.disconnect() except Exception: pass self.worker_thread = WorkerThread(matching_config, False, [img_folder], self) # 不连接finished信号,避免每个任务完成就弹窗 self.worker_thread.log_message.connect(self.log_text.append) self.worker_thread.progress.connect(self.progress_bar.setValue) self.worker_thread.start() # 使用超时等待,避免无限阻塞 if not self.worker_thread.wait(300000): # 最多等待5分钟 logger.warning(f"上传图片文件夹任务超时: {img_folder}") total_processed += 1 continue except Exception as e: error_msg = f"上传图片文件夹失败: {str(e)}" self.log_text.append(error_msg) logger.error(error_msg, exc_info=True) total_processed += 1 continue # 继续处理下一个图片文件夹 total_processed += 1 pending_tasks = max(pending_tasks - 1, 0) self.set_status_cards(pending=pending_tasks) self.set_running_progress(total_processed, total_tasks) self.log_text.append(f" ✓ 图片文件夹 {idx} 上传完成") self.log_text.append(f" ✓ 所有图片文件夹上传完成") self.progress_bar.setValue(100) self.log_text.append("=" * 50) self.log_text.append(f"所有任务执行完成!共处理 {total_processed} 个文件/文件夹") self.execute_btn.setEnabled(True) self.set_running_progress(0, 0) self.set_status_cards(pending=0) # 所有任务完成后,显示完成弹窗 self._show_infobar("success", "任务完成", f"共处理 {total_processed} 个文件/文件夹") def count_videos_in_folder(self, folder_path, index): """统计文件夹中匹配序号的视频文件数量(与main.py逻辑一致)""" count = 0 try: # 遍历最外层文件夹下的所有子文件夹(与main.py逻辑一致) for file in os.listdir(folder_path): # 获取文件夹下所有的文件夹 file_path = os.path.join(folder_path, file) # 拼接文件夹 # 检查是否为目录,跳过文件(如.lnk快捷方式) if not os.path.isdir(file_path): continue files = os.listdir(file_path) # 获取用户id下的文件 for file_name in files: # 检查是否是视频文件(.mp4) if ".mp4" in file_name: # 用"-"分割文件名,检查第一部分是否等于序号 # 文件名格式:4-茶叶蛋大冒险-.mp4 -> ['4', '茶叶蛋大冒险', '', 'mp4'] file_names = file_name.split("-") if len(file_names) > 0 and file_names[0] == str(index): path = Path(os.path.join(file_path, file_name)) # 判断是否为文件 if path.is_file(): count += 1 else: # 如果是文件夹,统计其中的文件 for sub_file in os.listdir(path): sub_path = Path(os.path.join(path, sub_file)) if sub_path.is_file(): count += 1 except Exception as e: logger.error(f"统计视频文件失败: {e}") return count def on_task_finished(self, success, message): """任务完成回调""" self.progress_bar.setValue(100) self.set_running_progress(0, 0) self.set_status_cards(pending=0) if success: self._show_infobar("success", "成功", message) else: self._show_infobar("error", "失败", message) # 恢复按钮 self.execute_btn.setEnabled(True) self.log_text.append(f"任务完成: {message}") self.log_text.append("=" * 50) def closeEvent(self, event): """关闭事件""" if self._is_closing: event.accept() return if self.worker_thread and self.worker_thread.isRunning(): reply = QMessageBox.question( self, "确认", "任务正在执行中,确定要退出吗?", QMessageBox.Yes | QMessageBox.No ) if reply == QMessageBox.Yes: self._is_closing = True try: if self.worker_thread: # 停止线程 self.worker_thread.stop() # 等待线程结束,设置超时避免无限等待 if not self.worker_thread.wait(3000): # 等待3秒 logger.warning("线程未能及时停止,强制终止") self.worker_thread.terminate() self.worker_thread.wait(1000) # 再等待1秒 # 断开信号连接 try: self.worker_thread.finished.disconnect() self.worker_thread.log_message.disconnect() self.worker_thread.progress.disconnect() except Exception: pass self.worker_thread = None except Exception as e: logger.error(f"关闭线程时出错: {e}") event.accept() else: event.ignore() else: event.accept() def main(): app = QApplication(sys.argv) setTheme(Theme.LIGHT) window = MainWindow() window.show() sys.exit(app.exec_()) if __name__ == '__main__': main() # docker run honeygain/honeygain -tou-accept -email ddrwode1@gmail.com -pass 040828cjj -device DEVICE_NAME