Files
haha/gui_app.py
2026-02-09 00:22:28 +08:00

4347 lines
195 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import re
from pathlib import Path
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
import pandas as pd
from PyQt5.QtWidgets import (
QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QLabel, QFileDialog, QTableWidgetItem, QMessageBox,
QGridLayout, QStackedWidget, QButtonGroup,
QStyle, QComboBox, QFrame, QShortcut, QMenu,
QAbstractItemView, QTableView, QHeaderView,
QTabWidget, QSplitter, QSizePolicy, QCheckBox
)
from PyQt5.QtCore import Qt, QSortFilterProxyModel, QRegularExpression, QSettings, QTimer, QEvent
from PyQt5.QtGui import QFont, QTextCursor, QKeySequence, QColor
from qfluentwidgets import (
setTheme, Theme,
PushButton, PrimaryPushButton, LineEdit, TextEdit,
TableWidget, CheckBox, ProgressBar, CardWidget,
InfoBar, InfoBarPosition
)
from loguru import logger
from gui_constants import get_default_folder_path, TABLE_HEADERS, MODEL_VIEW_HEADERS
from gui_worker import WorkerThread
from gui_models import ConfigTableModel, TableActionDelegate
# ---------------------------------------------------------------------------
# 主窗口
# ---------------------------------------------------------------------------
class MainWindow(QMainWindow):
"""主窗口"""
def __init__(self):
super().__init__()
self.worker_thread = None
self.configs: List[Dict[str, Any]] = [] # 存储从Excel导入的配置
self.prepared_files = None # 存储通过"更新数据"找到的文件列表
self.running_total = 0
self.running_done = 0
self.nav_compact = False
self.table_match_rows = []
self.table_match_index = -1
self.table_sort_keys = []
self.page_size = 20
self.current_page = 1
self.page_row_indices = []
self.filtered_config_indices = None # 文本/状态筛选后的配置索引None 表示未筛选(显示全部)
self.use_model_view = False
# 批量任务队列相关
self.batch_task_queue = [] # 任务队列
self.current_batch_task_index = 0 # 当前任务索引
self.batch_total_tasks = 0 # 总任务数
self.batch_processed = 0 # 已处理任务数
self.batch_pending_tasks = 0 # 待处理任务数
self.batch_success_count = 0 # 成功数量
self.batch_failed_count = 0 # 失败数量
self._is_paused = False # 任务是否暂停
self._is_terminated = False # 任务是否被终止
self.table_model = None
self.table_proxy = None
self.log_match_positions = []
self.log_match_index = -1
self.is_updating_table = False
self._is_closing = False # 标记是否正在关闭窗口
self._column_width_ratios = {} # 存储列宽比例,用于自动调整
self._is_manual_resize = False # 标记是否正在手动调整列宽
# 任务执行时用于“多多ID+序号 -> 行号”的映射(用于精确回写状态)
self._row_map_by_user_index = {}
# 以下属性在 init_ui() 中初始化,提前声明以满足 PyCharm 代码审查
self._table_view_column_width_ratios = {}
self._is_table_view_manual_resize = False
self._resize_timer = None
self._table_view_resize_timer = None
self._auto_resize_timer = None
self._auto_resize_table_view_timer = None
self.execute_btn = None
self.pause_btn = None
self.resume_btn = None
self.terminate_btn = None
self.status_update_value = None
self.status_pending_value = None
self.status_running_value = None
self.status_success_value = None
self.status_failed_value = None
self.nav_card = None
self.nav_title = None
self.nav_divider = None
self.nav_group = None
self.nav_main = None
self.nav_log = None
self.nav_buttons = []
self.nav_footer = None
self.nav_toggle_btn = None
self.page_stack = None
self.excel_path_input = None
self.import_btn = None
self.download_template_btn = None
self.add_user_id_input = None
self.add_index_input = None
self.add_topic_input = None
self.add_schedule_input = None
self.add_interval_input = None
self.add_expert_link_input = None
self.add_executor_input = None
self.add_config_btn = None
self.clear_add_inputs_btn = None
self.folder_path_input = None
self.folder_browse_btn = None
self.update_data_btn = None
self.update_status_label = None
self.batch_upload_checkbox = None
self.batch_limit_input = None
self.table_group = None
self.table_edit_hint = None
self.table_select_all_checkbox = None
self.table_search_input = None
self.table_column_filter = None
self.table_highlight = None
self.table_only_match = None
self.table_prev_match_btn = None
self.table_next_match_btn = None
self.table_clear_btn = None
self.table_export_all_btn = None
self.table_select_count = None
self.config_table = None
self._filter_table_columns = []
self._filter_model_columns = []
self.table_view = None
self.table_empty_label = None
self.page_size_combo = None
self.page_info_label = None
self.page_first_btn = None
self.page_prev_btn = None
self.page_next_btn = None
self.page_last_btn = None
self.config_splitter = None
self.workbench_tabs = None
self.progress_bar = None
self.log_search_input = None
self.log_highlight_check = None
self.log_whole_word = None
self.log_prev_btn = None
self.log_next_btn = None
self.log_match_status = None
self.log_export_btn = None
self.clear_log_btn = None
self.log_text = None
self.shortcut_log_next = None
self.shortcut_log_prev = None
self._suppress_filter_clear_once = False
self._current_status_filter = None
self._update_data_running = False
self.init_ui()
def init_ui(self):
# 初始化列宽调整相关的变量和定时器
if not hasattr(self, '_table_view_column_width_ratios'):
self._table_view_column_width_ratios = {}
if not hasattr(self, '_is_table_view_manual_resize'):
self._is_table_view_manual_resize = False
if not hasattr(self, '_resize_timer'):
self._resize_timer = QTimer()
self._resize_timer.setSingleShot(True)
self._resize_timer.timeout.connect(self._delayed_update_ratios)
if not hasattr(self, '_table_view_resize_timer'):
self._table_view_resize_timer = QTimer()
self._table_view_resize_timer.setSingleShot(True)
self._table_view_resize_timer.timeout.connect(self._delayed_update_table_view_ratios)
if not hasattr(self, '_auto_resize_timer'):
self._auto_resize_timer = QTimer()
self._auto_resize_timer.setSingleShot(True)
self._auto_resize_timer.timeout.connect(self._auto_resize_table_columns)
if not hasattr(self, '_auto_resize_table_view_timer'):
self._auto_resize_table_view_timer = QTimer()
self._auto_resize_table_view_timer.setSingleShot(True)
self._auto_resize_table_view_timer.timeout.connect(self._auto_resize_table_view_columns)
self.setWindowTitle("拼多多自动化发布工具")
self.setGeometry(100, 100, 1000, 800)
# 创建中央部件
central_widget = QWidget()
self.setCentralWidget(central_widget)
# 主布局
main_layout = QVBoxLayout()
main_layout.setContentsMargins(20, 20, 20, 20)
main_layout.setSpacing(14)
central_widget.setLayout(main_layout)
# 顶部标题区
header_layout = QHBoxLayout()
title_box = QVBoxLayout()
title_label = QLabel("拼多多自动化发布工具", None)
title_label.setFont(QFont("Microsoft YaHei", 16, QFont.Bold))
subtitle_label = QLabel("配置导入 • 文件查找 • 批量上传", None)
subtitle_label.setFont(QFont("Microsoft YaHei", 10))
title_box.addWidget(title_label)
title_box.addWidget(subtitle_label)
header_layout.addLayout(title_box)
header_layout.addStretch()
header_actions = QHBoxLayout()
self.execute_btn = PrimaryPushButton("开始上传", None)
self.execute_btn.clicked.connect(self.execute_task)
header_actions.addWidget(self.execute_btn)
self.pause_btn = PushButton("暂停", None)
self.pause_btn.setVisible(False)
self.pause_btn.clicked.connect(self.pause_task)
self.pause_btn.setStyleSheet("QPushButton { background-color: #fff3e0; color: #e65100; }")
header_actions.addWidget(self.pause_btn)
self.resume_btn = PushButton("继续", None)
self.resume_btn.setVisible(False)
self.resume_btn.clicked.connect(self.resume_task)
self.resume_btn.setStyleSheet("QPushButton { background-color: #e8f5e9; color: #2e7d32; }")
header_actions.addWidget(self.resume_btn)
self.terminate_btn = PushButton("终止", None)
self.terminate_btn.setVisible(False)
self.terminate_btn.clicked.connect(self.terminate_task)
self.terminate_btn.setStyleSheet("QPushButton { background-color: #ffebee; color: #c62828; }")
header_actions.addWidget(self.terminate_btn)
header_layout.addLayout(header_actions)
main_layout.addLayout(header_layout)
# 状态卡片区
status_layout = QHBoxLayout()
self.status_update_value = QLabel("未更新", None)
self.status_pending_value = QLabel("0", None)
self.status_running_value = QLabel("0", None)
self.status_success_value = QLabel("0", None)
self.status_failed_value = QLabel("0", None)
update_card = self._build_status_card(
"更新状态",
self.status_update_value,
self.style().standardIcon(QStyle.StandardPixmap.SP_BrowserReload),
"#e6f4ff",
"文件路径扫描"
)
pending_card = self._build_status_card(
"待执行",
self.status_pending_value,
self.style().standardIcon(QStyle.StandardPixmap.SP_FileDialogInfoView),
"#fff7ed",
"等待处理任务"
)
running_card = self._build_status_card(
"执行中",
self.status_running_value,
self.style().standardIcon(QStyle.StandardPixmap.SP_MediaPlay),
"#ecfdf3",
"当前执行进度",
with_progress=True
)
success_card = self._build_status_card(
"成功",
self.status_success_value,
self.style().standardIcon(QStyle.StandardPixmap.SP_DialogApplyButton),
"#dcfce7",
"成功任务数量",
clickable=False
)
failed_card = self._build_status_card(
"失败",
self.status_failed_value,
self.style().standardIcon(QStyle.StandardPixmap.SP_MessageBoxCritical),
"#fee2e2",
"失败任务数量",
clickable=False
)
status_layout.addWidget(update_card)
status_layout.addWidget(pending_card)
status_layout.addWidget(running_card)
status_layout.addWidget(success_card)
status_layout.addWidget(failed_card)
main_layout.addLayout(status_layout)
# 中间内容区(侧边导航 + 页面)
content_layout = QHBoxLayout()
content_layout.setSpacing(12)
main_layout.addLayout(content_layout)
nav_card = CardWidget()
self.nav_card = nav_card
nav_layout = QVBoxLayout(nav_card)
nav_layout.setContentsMargins(10, 10, 10, 10)
nav_layout.setSpacing(8)
nav_card.setFixedWidth(150)
nav_card.setStyleSheet("""
QPushButton {
text-align: left;
padding: 8px 10px;
border-radius: 6px;
border-left: 3px solid transparent;
}
QPushButton:hover {
background-color: rgba(0, 120, 212, 0.08);
}
QPushButton:checked {
background-color: rgba(0, 120, 212, 0.15);
font-weight: 600;
border-left: 3px solid #0078D4;
}
""")
self.nav_title = QLabel("导航", None)
self.nav_title.setFont(QFont("Microsoft YaHei", 10, QFont.Bold))
nav_layout.addWidget(self.nav_title)
self.nav_divider = QFrame()
self.nav_divider.setFrameShape(QFrame.HLine)
self.nav_divider.setStyleSheet("color: rgba(0, 0, 0, 0.12);")
nav_layout.addWidget(self.nav_divider)
self.nav_group = QButtonGroup(self)
self.nav_group.setExclusive(True)
self.nav_main = PushButton("工作台", None)
self.nav_log = PushButton("日志", None)
nav_items = [
(self.nav_main, QStyle.StandardPixmap.SP_DesktopIcon),
(self.nav_log, QStyle.StandardPixmap.SP_FileDialogContentsView),
]
self.nav_buttons = []
for idx, (btn, icon_type) in enumerate(nav_items):
btn.setCheckable(True)
# 去掉左侧图标,避免在某些环境下图标位置异常遮挡中文文字
# 如果后续需要图标,可以在这里重新启用并配合样式表单独调整 padding
# btn.setIcon(self.style().standardIcon(icon_type))
# btn.setIconSize(QSize(16, 16))
self.nav_group.addButton(btn, idx)
nav_layout.addWidget(btn)
self.nav_buttons.append(btn)
nav_layout.addStretch()
nav_footer_row = QHBoxLayout()
self.nav_footer = QLabel("v1.0", None)
self.nav_footer.setStyleSheet("color: #999; font-size: 10px;")
nav_footer_row.addWidget(self.nav_footer)
nav_footer_row.addStretch()
self.nav_toggle_btn = PushButton("收起", None)
self.nav_toggle_btn.clicked.connect(self.toggle_nav_compact)
nav_footer_row.addWidget(self.nav_toggle_btn)
nav_layout.addLayout(nav_footer_row)
content_layout.addWidget(nav_card)
self.page_stack = QStackedWidget()
content_layout.addWidget(self.page_stack)
content_layout.setStretch(1, 1)
# 配置输入区域
config_group = CardWidget()
config_layout = QVBoxLayout(config_group)
config_layout.setContentsMargins(12, 12, 12, 12)
config_title = QLabel("配置信息", None)
config_title.setFont(QFont("Microsoft YaHei", 11, QFont.Bold))
config_layout.addWidget(config_title)
config_group.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Preferred)
config_group.setMaximumHeight(450) # 增加高度以容纳单条配置输入区域
# Excel导入合并到配置
import_row = QHBoxLayout()
import_row.addWidget(QLabel("Excel文件:", None))
self.excel_path_input = LineEdit()
self.excel_path_input.setReadOnly(True)
self.excel_path_input.setPlaceholderText("点击导入按钮选择Excel配置文件")
import_row.addWidget(self.excel_path_input)
self.import_btn = PrimaryPushButton("导入配置", None)
self.import_btn.setToolTip("选择Excel文件并导入配置")
self.import_btn.clicked.connect(self.import_excel)
import_row.addWidget(self.import_btn)
self.download_template_btn = PushButton("下载模板", None)
self.download_template_btn.setToolTip("下载Excel配置模板文件")
self.download_template_btn.clicked.connect(self.download_excel_template)
import_row.addWidget(self.download_template_btn)
config_layout.addLayout(import_row)
# 添加单条配置输入区域
add_config_separator = QLabel("或 手动添加单条配置", None)
add_config_separator.setFont(QFont("Microsoft YaHei", 9, QFont.Bold))
add_config_separator.setStyleSheet("color: #666; margin-top: 8px;")
config_layout.addWidget(add_config_separator)
# 单条配置输入表单
add_config_grid = QGridLayout()
add_config_grid.setHorizontalSpacing(12)
add_config_grid.setVerticalSpacing(8)
# 第一行多多ID、序号、话题
add_config_grid.addWidget(QLabel("多多ID:", None), 0, 0)
self.add_user_id_input = LineEdit()
self.add_user_id_input.setPlaceholderText("输入多多ID")
add_config_grid.addWidget(self.add_user_id_input, 0, 1)
add_config_grid.addWidget(QLabel("序号:", None), 0, 2)
self.add_index_input = LineEdit()
self.add_index_input.setPlaceholderText("输入序号")
self.add_index_input.setFixedWidth(80)
add_config_grid.addWidget(self.add_index_input, 0, 3)
add_config_grid.addWidget(QLabel("话题:", None), 0, 4)
self.add_topic_input = LineEdit()
self.add_topic_input.setPlaceholderText("输入话题(如 #话题)")
add_config_grid.addWidget(self.add_topic_input, 0, 5)
# 第二行:定时发布、间隔时间、达人链接
add_config_grid.addWidget(QLabel("定时发布:", None), 1, 0)
self.add_schedule_input = LineEdit()
self.add_schedule_input.setPlaceholderText("yyyy-MM-dd HH:mm可选")
add_config_grid.addWidget(self.add_schedule_input, 1, 1)
add_config_grid.addWidget(QLabel("间隔(分):", None), 1, 2)
self.add_interval_input = LineEdit()
self.add_interval_input.setPlaceholderText("延迟分钟")
self.add_interval_input.setFixedWidth(80)
add_config_grid.addWidget(self.add_interval_input, 1, 3)
add_config_grid.addWidget(QLabel("达人链接:", None), 1, 4)
self.add_expert_link_input = LineEdit()
self.add_expert_link_input.setPlaceholderText("输入达人链接(可选)")
add_config_grid.addWidget(self.add_expert_link_input, 1, 5)
# 第三行:执行人和添加按钮
add_config_grid.addWidget(QLabel("执行人:", None), 2, 0)
self.add_executor_input = LineEdit()
self.add_executor_input.setPlaceholderText("输入执行人(可选)")
add_config_grid.addWidget(self.add_executor_input, 2, 1)
add_config_btn_layout = QHBoxLayout()
add_config_btn_layout.addStretch()
self.add_config_btn = PrimaryPushButton("添加到配置列表", None)
self.add_config_btn.setToolTip("将当前输入的配置添加到配置列表中")
self.add_config_btn.clicked.connect(self.add_single_config)
add_config_btn_layout.addWidget(self.add_config_btn)
self.clear_add_inputs_btn = PushButton("清空输入", None)
self.clear_add_inputs_btn.clicked.connect(self.clear_add_config_inputs)
add_config_btn_layout.addWidget(self.clear_add_inputs_btn)
add_config_grid.addLayout(add_config_btn_layout, 2, 2, 1, 4)
config_layout.addLayout(add_config_grid)
# 分隔线
divider = QFrame()
divider.setFrameShape(QFrame.HLine)
divider.setStyleSheet("color: rgba(0, 0, 0, 0.12); margin: 8px 0;")
config_layout.addWidget(divider)
grid = QGridLayout()
grid.setHorizontalSpacing(12)
grid.setVerticalSpacing(10)
# 文件夹路径(最外层文件夹)
grid.addWidget(QLabel("资料文件夹路径:", None), 0, 0)
self.folder_path_input = LineEdit()
default_path = get_default_folder_path()
self.folder_path_input.setPlaceholderText(f"留空则使用默认路径: {default_path}")
self.folder_path_input.setClearButtonEnabled(True)
grid.addWidget(self.folder_path_input, 0, 1, 1, 2)
self.folder_browse_btn = PushButton("浏览", None)
self.folder_browse_btn.clicked.connect(self.browse_folder)
grid.addWidget(self.folder_browse_btn, 0, 3)
tip_label = QLabel("提示:只需填写最外层文件夹路径,程序会自动查找子文件夹中的文件", None)
tip_label.setStyleSheet("color: #666; font-size: 10px;")
grid.addWidget(tip_label, 1, 0, 1, 4)
# 更新数据按钮 + 批量上传(同一行)
update_row = QHBoxLayout()
self.update_data_btn = PrimaryPushButton("更新数据", None)
self.update_data_btn.clicked.connect(self.update_data)
update_row.addWidget(self.update_data_btn)
self.update_status_label = QLabel("未更新", None)
self.update_status_label.setStyleSheet("color: #666; font-size: 10px;")
update_row.addWidget(self.update_status_label)
update_row.addStretch()
self.batch_upload_checkbox = CheckBox("批量上传(如果文件夹中有多个视频,将使用批量上传模式)", None)
self.batch_upload_checkbox.setChecked(False)
update_row.addWidget(self.batch_upload_checkbox)
# 添加间隔
update_row.addSpacing(20)
# 单次上限数输入框
batch_limit_label = QLabel("单次上限数:", None)
update_row.addWidget(batch_limit_label)
self.batch_limit_input = LineEdit()
self.batch_limit_input.setPlaceholderText("5")
self.batch_limit_input.setText("5")
self.batch_limit_input.setFixedWidth(50)
self.batch_limit_input.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.batch_limit_input.setToolTip("每次批量上传的最大视频数量,超过则分批上传")
update_row.addWidget(self.batch_limit_input)
update_row_widget = QWidget()
update_row_widget.setLayout(update_row)
grid.addWidget(update_row_widget, 2, 0, 1, 4)
config_layout.addLayout(grid)
config_layout.addStretch()
config_page = QWidget()
config_page_layout = QVBoxLayout(config_page)
config_page_layout.setContentsMargins(0, 0, 0, 0)
config_page_layout.setSpacing(12)
config_page_layout.addWidget(config_group)
# 配置列表表格如果从Excel导入
self.table_group = CardWidget()
table_layout = QVBoxLayout(self.table_group)
table_layout.setContentsMargins(12, 12, 12, 12)
table_title = QLabel("配置列表从Excel导入后显示可直接在表格中编辑", None)
table_title.setFont(QFont("Microsoft YaHei", 11, QFont.Bold))
table_layout.addWidget(table_title)
self.table_edit_hint = QLabel("编辑模式:当前行已高亮,其它行已锁定。修改后点击“确认”保存,点击“退出”还原。", None)
self.table_edit_hint.setVisible(False)
self.table_edit_hint.setStyleSheet(
"background-color: #fff7ed; color: #9a3412; "
"border: 1px solid #fed7aa; border-radius: 6px; padding: 6px 8px;"
)
table_layout.addWidget(self.table_edit_hint)
search_row = QHBoxLayout()
self.table_select_all_checkbox = CheckBox("全选/取消", None)
# noinspection PyUnresolvedReferences
self.table_select_all_checkbox.stateChanged.connect(self.toggle_all_checkboxes)
search_row.addWidget(self.table_select_all_checkbox)
self.table_search_input = LineEdit()
self.table_search_input.setPlaceholderText("搜索表格(支持空格多关键词)")
self.table_search_input.setClearButtonEnabled(True)
self.table_search_input.setFixedWidth(250)
self.table_search_input.textChanged.connect(self.filter_table)
search_row.addWidget(self.table_search_input)
self.table_column_filter = QComboBox()
self.table_column_filter.currentIndexChanged.connect(lambda: self.filter_table(self.table_search_input.text()))
search_row.addWidget(self.table_column_filter)
self.table_highlight = CheckBox("高亮匹配", None)
self.table_highlight.setChecked(True)
# noinspection PyUnresolvedReferences
self.table_highlight.stateChanged.connect(lambda: self.filter_table(self.table_search_input.text()))
search_row.addWidget(self.table_highlight)
self.table_only_match = CheckBox("仅显示匹配项", None)
self.table_only_match.setChecked(True)
# noinspection PyUnresolvedReferences
self.table_only_match.stateChanged.connect(lambda: self.filter_table(self.table_search_input.text()))
search_row.addWidget(self.table_only_match)
self.table_prev_match_btn = PushButton("上一条", None)
self.table_prev_match_btn.clicked.connect(self.prev_table_match)
search_row.addWidget(self.table_prev_match_btn)
self.table_next_match_btn = PushButton("下一条", None)
self.table_next_match_btn.clicked.connect(self.next_table_match)
search_row.addWidget(self.table_next_match_btn)
self.table_clear_btn = PushButton("清空筛选", None)
self.table_clear_btn.clicked.connect(self._clear_filter_and_selection)
search_row.addWidget(self.table_clear_btn)
update_row.addSpacing(20)
self.table_export_all_btn = PushButton("导出全部", None)
self.table_export_all_btn.clicked.connect(self.export_all_rows)
search_row.addWidget(self.table_export_all_btn)
self.table_select_count = QLabel("已选: 0", None)
self.table_select_count.setStyleSheet("color: #666; font-weight: bold;")
search_row.addWidget(self.table_select_count)
table_layout.addLayout(search_row)
self.config_table = TableWidget()
self.config_table.setStyleSheet("""
QTableView::item:selected {
background-color: rgba(0, 120, 212, 0.18);
}
QTableView::item:hover {
background-color: rgba(0, 120, 212, 0.08);
}
QHeaderView::section {
background-color: #1f2937;
color: #ffffff;
padding: 8px;
border: none;
}
QHeaderView::section:hover {
background-color: #374151;
}
""")
self.config_table.setColumnCount(12)
self.config_table.setHorizontalHeaderLabels(TABLE_HEADERS)
self.table_column_filter.addItem("全部列")
# 第0列为勾选框记录下拉项对应的表格列索引及 Model 列索引
self._filter_table_columns = []
self._filter_model_columns = [] # Model/View 无勾选列
for col in range(1, 10):
header = self.config_table.horizontalHeaderItem(col)
if header:
self.table_column_filter.addItem(header.text())
self._filter_table_columns.append(col)
# Model/View 列映射1..8→0..7, 9→8
self._filter_model_columns.append(8 if col == 9 else col - 1)
# 默认按多多ID筛选多多ID为下拉第2项index=1
if self._filter_table_columns and self._filter_table_columns[0] == 1:
self.table_column_filter.setCurrentIndex(1)
header = self.config_table.horizontalHeader()
header.setStretchLastSection(True)
# 设置所有列为Interactive模式允许用户拖拽调整宽度
header.setSectionResizeMode(QHeaderView.Interactive)
# 设置最小列宽,确保内容可见
header.setMinimumSectionSize(50)
# 设置文本省略模式(超长文本显示省略号)
self.config_table.setTextElideMode(Qt.TextElideMode.ElideRight)
self.config_table.setAlternatingRowColors(True)
self.config_table.setSortingEnabled(True)
self.config_table.horizontalHeader().setSortIndicatorShown(True)
self.config_table.verticalHeader().setVisible(False)
self.config_table.verticalHeader().setDefaultSectionSize(42)
# 禁用直接编辑,只能通过编辑按钮进入编辑模式
self.config_table.setEditTriggers(TableWidget.NoEditTriggers)
self.config_table.setSelectionBehavior(QAbstractItemView.SelectRows)
self.config_table.setSelectionMode(QAbstractItemView.ExtendedSelection)
self.config_table.setContextMenuPolicy(Qt.ContextMenuPolicy.CustomContextMenu)
self.config_table.customContextMenuRequested.connect(self.show_table_context_menu)
self.config_table.itemChanged.connect(self.on_table_item_changed)
self.config_table.horizontalHeader().sectionClicked.connect(self.on_header_clicked)
# 连接列宽调整信号,用于跟踪手动调整和自动调整
self.config_table.horizontalHeader().sectionResized.connect(self.on_column_resized)
# 连接表格resize事件用于自动按比例调整列宽
self.config_table.horizontalHeader().geometriesChanged.connect(self.on_table_geometry_changed)
# 点击空白区域或按Esc键时退出编辑状态
self.config_table.viewport().installEventFilter(self)
self.config_table.installEventFilter(self)
table_layout.addWidget(self.config_table, 1) # stretch=1占据剩余空间
# 大数据模式表格Model/View
self.table_view = QTableView()
self.table_view.setAlternatingRowColors(True)
self.table_view.setSortingEnabled(True)
self.table_view.verticalHeader().setVisible(False)
self.table_view.setSelectionBehavior(QAbstractItemView.SelectRows)
self.table_view.setSelectionMode(QAbstractItemView.ExtendedSelection)
# 设置文本省略模式(超长文本显示省略号)
self.table_view.setTextElideMode(Qt.TextElideMode.ElideRight)
self.table_view.setContextMenuPolicy(Qt.ContextMenuPolicy.CustomContextMenu)
self.table_view.customContextMenuRequested.connect(self.show_table_context_menu)
self.table_view.setStyleSheet("""
QTableView::item:selected {
background-color: rgba(0, 120, 212, 0.18);
}
QTableView::item:hover {
background-color: rgba(0, 120, 212, 0.08);
}
QHeaderView::section {
background-color: #1f2937;
color: #ffffff;
padding: 8px;
border: none;
}
QHeaderView::section:hover {
background-color: #374151;
}
""")
self.table_view.setVisible(False)
table_layout.addWidget(self.table_view, 1) # stretch=1占据剩余空间
self.table_empty_label = QLabel("暂无数据请先导入Excel配置", None)
self.table_empty_label.setStyleSheet("color: #999; font-size: 12px;")
self.table_empty_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.table_empty_label.setVisible(True)
table_layout.addWidget(self.table_empty_label)
# 分页控件行(不设置 stretch固定在底部
pagination_row = QHBoxLayout()
pagination_row.setContentsMargins(0, 8, 0, 0)
pagination_row.addStretch()
self.page_size_combo = QComboBox()
self.page_size_combo.addItems(["10", "20", "50", "100"])
self.page_size_combo.setCurrentText(str(self.page_size))
self.page_size_combo.currentTextChanged.connect(self.change_page_size)
pagination_row.addWidget(QLabel("每页", None))
pagination_row.addWidget(self.page_size_combo)
self.page_info_label = QLabel("第 1/1 页", None)
pagination_row.addWidget(self.page_info_label)
self.page_first_btn = PushButton("首页", None)
self.page_prev_btn = PushButton("上一页", None)
self.page_next_btn = PushButton("下一页", None)
self.page_last_btn = PushButton("末页", None)
self.page_first_btn.clicked.connect(self.go_first_page)
self.page_prev_btn.clicked.connect(self.go_prev_page)
self.page_next_btn.clicked.connect(self.go_next_page)
self.page_last_btn.clicked.connect(self.go_last_page)
pagination_row.addWidget(self.page_first_btn)
pagination_row.addWidget(self.page_prev_btn)
pagination_row.addWidget(self.page_next_btn)
pagination_row.addWidget(self.page_last_btn)
table_layout.addLayout(pagination_row, 0) # stretch=0固定大小
self.table_group.setVisible(True)
self.table_group.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
# 配置 + 列表 分割布局
self.config_splitter = QSplitter(Qt.Orientation.Vertical)
self.config_splitter.setChildrenCollapsible(True)
self.config_splitter.addWidget(config_group)
self.config_splitter.addWidget(self.table_group)
self.config_splitter.setStretchFactor(0, 2)
self.config_splitter.setStretchFactor(1, 3)
self.config_splitter.splitterMoved.connect(self._save_splitter_sizes)
config_page_layout.addWidget(self.config_splitter)
self.page_stack.addWidget(config_page)
# 工作台标签页
self.workbench_tabs = QTabWidget()
self.workbench_tabs.addTab(config_page, "配置")
workbench_page = QWidget()
workbench_layout = QVBoxLayout(workbench_page)
workbench_layout.setContentsMargins(0, 0, 0, 0)
workbench_layout.addWidget(self.workbench_tabs)
self.page_stack.addWidget(workbench_page)
# 进度条(全局)
self.progress_bar = ProgressBar()
self.progress_bar.setVisible(False)
main_layout.addWidget(self.progress_bar)
# 日志显示区域
log_group = CardWidget()
log_layout = QVBoxLayout(log_group)
log_layout.setContentsMargins(12, 12, 12, 12)
log_header = QHBoxLayout()
log_title = QLabel("执行日志", None)
log_title.setFont(QFont("Microsoft YaHei", 11, QFont.Bold))
log_header.addWidget(log_title)
log_header.addStretch()
self.log_search_input = LineEdit()
self.log_search_input.setPlaceholderText("搜索日志")
self.log_search_input.setClearButtonEnabled(True)
self.log_search_input.textChanged.connect(self.filter_log)
log_header.addWidget(self.log_search_input)
self.log_highlight_check = CheckBox("高亮所有", None)
self.log_highlight_check.setChecked(True)
# noinspection PyUnresolvedReferences
self.log_highlight_check.stateChanged.connect(lambda: self.filter_log(self.log_search_input.text()))
log_header.addWidget(self.log_highlight_check)
self.log_whole_word = CheckBox("整词匹配", None)
# noinspection PyUnresolvedReferences
self.log_whole_word.stateChanged.connect(lambda: self.filter_log(self.log_search_input.text()))
log_header.addWidget(self.log_whole_word)
self.log_prev_btn = PushButton("上一个", None)
self.log_prev_btn.clicked.connect(lambda: self.find_log(backward=True))
log_header.addWidget(self.log_prev_btn)
self.log_next_btn = PushButton("下一个", None)
self.log_next_btn.clicked.connect(lambda: self.find_log(backward=False))
log_header.addWidget(self.log_next_btn)
self.log_match_status = QLabel("匹配: 0", None)
self.log_match_status.setStyleSheet("color: #666; font-size: 10px;")
log_header.addWidget(self.log_match_status)
# 初始化表格列宽比例
QTimer.singleShot(100, self._apply_table_column_widths)
self.log_export_btn = PushButton("导出日志", None)
self.log_export_btn.clicked.connect(self.export_log)
log_header.addWidget(self.log_export_btn)
self.clear_log_btn = PushButton("清空日志", None)
self.clear_log_btn.clicked.connect(self.clear_log)
log_header.addWidget(self.clear_log_btn)
log_layout.addLayout(log_header)
self.log_text = TextEdit()
self.log_text.setReadOnly(True)
self.log_text.setFont(QFont("Consolas", 10))
log_layout.addWidget(self.log_text)
log_page = QWidget()
log_page_layout = QVBoxLayout(log_page)
log_page_layout.setContentsMargins(0, 0, 0, 0)
log_page_layout.addWidget(log_group)
self.page_stack.addWidget(log_page)
# 配置日志输出保留控制台输出GUI通过信号接收
logger.remove()
logger.add(lambda msg: None) # 禁用默认输出通过信号在GUI中显示
# 默认选中工作台
self.nav_main.setChecked(True)
self.page_stack.setCurrentIndex(0)
self.nav_group.buttonClicked[int].connect(self.switch_page)
self._restore_splitter_sizes()
# 快捷键
self.shortcut_log_next = QShortcut(QKeySequence("F3"), self)
self.shortcut_log_next.activated.connect(lambda: self.find_log(backward=False))
self.shortcut_log_prev = QShortcut(QKeySequence("Shift+F3"), self)
self.shortcut_log_prev.activated.connect(lambda: self.find_log(backward=True))
# 程序启动时重置状态(不累计历史数据)
self.set_status_cards(update_text="未更新", pending=0, running=0, success=0, failed=0)
def _build_status_card(self, title, value_label, icon, bg_color, subtitle, with_progress=False, clickable=False):
"""创建状态卡片"""
card = CardWidget()
if clickable:
card.setCursor(Qt.CursorShape.PointingHandCursor)
card.setToolTip(subtitle)
layout = QVBoxLayout(card)
layout.setContentsMargins(12, 10, 12, 10)
card.setStyleSheet(f"background-color: {bg_color};")
icon_label = QLabel()
icon_label.setPixmap(icon.pixmap(16, 16))
title_row = QHBoxLayout()
title_label = QLabel(title)
title_label.setFont(QFont("Microsoft YaHei", 9))
title_row.addWidget(icon_label)
title_row.addWidget(title_label)
title_row.addStretch()
subtitle_label = QLabel(subtitle)
subtitle_label.setStyleSheet("color: #666; font-size: 10px;")
value_label.setFont(QFont("Microsoft YaHei", 12, QFont.Bold))
layout.addLayout(title_row)
layout.addWidget(value_label)
layout.addWidget(subtitle_label)
if with_progress:
self.status_running_progress = ProgressBar()
self.status_running_progress.setRange(0, 100)
self.status_running_progress.setValue(0)
self.status_running_progress.setTextVisible(False)
self.status_running_progress.setFixedHeight(6)
layout.addWidget(self.status_running_progress)
return card
def switch_page(self, page_index):
"""切换侧边导航页面"""
self.page_stack.setCurrentIndex(page_index)
def eventFilter(self, obj, event):
"""事件过滤器处理点击空白区域和按Esc键退出编辑状态"""
# 处理表格viewport的鼠标点击事件
if obj == self.config_table.viewport() and event.type() == QEvent.Type.MouseButtonPress:
# 获取点击位置对应的单元格
index = self.config_table.indexAt(event.pos())
if not index.isValid():
# 点击了空白区域,退出编辑状态
self._exit_table_edit_mode()
return False # 继续传递事件
# 处理表格的键盘事件Esc键退出编辑
if obj == self.config_table and event.type() == QEvent.Type.KeyPress:
if event.key() == Qt.Key.Key_Escape:
self._exit_table_edit_mode()
return True # 阻止事件继续传递
return super().eventFilter(obj, event)
def _exit_table_edit_mode(self):
"""退出表格编辑状态"""
self.config_table.clearSelection()
self.config_table.clearFocus()
# 关闭当前编辑器
current_item = self.config_table.currentItem()
if current_item:
self.config_table.closePersistentEditor(current_item)
self.config_table.setCurrentItem(None)
# 恢复为只读模式
self.config_table.setEditTriggers(TableWidget.NoEditTriggers)
self._cleanup_edit_mode_state()
def _set_row_highlight(self, row, enabled):
"""高亮/取消高亮某一行"""
highlight_color = QColor(255, 247, 216)
for col in range(1, 10): # 数据列 1-9
item = self.config_table.item(row, col)
if not item:
continue
if enabled:
item.setBackground(highlight_color)
else:
item.setData(Qt.ItemDataRole.BackgroundRole, None)
def _set_other_rows_locked(self, edit_row, locked):
"""锁定/解锁编辑行以外的行"""
for r in range(self.config_table.rowCount()):
if r == edit_row:
# 编辑行保持可用
continue
# 操作列按钮禁用/启用
action_widget = self.config_table.cellWidget(r, 11)
if action_widget:
action_widget.setEnabled(not locked)
# 数据列禁用/启用
for col in range(1, 10): # 数据列 1-9
item = self.config_table.item(r, col)
if not item:
continue
if locked:
item.setFlags(item.flags() & ~Qt.ItemFlag.ItemIsEnabled)
else:
item.setFlags((item.flags() | Qt.ItemFlag.ItemIsEnabled) & ~Qt.ItemFlag.ItemIsEditable)
def _cleanup_edit_mode_state(self):
"""清理编辑态UI状态"""
edit_row = getattr(self, '_editing_row', None)
if edit_row is None:
return
self._set_row_highlight(edit_row, False)
self._set_other_rows_locked(edit_row, False)
self._editing_row = None
if getattr(self, "table_edit_hint", None):
self.table_edit_hint.setVisible(False)
if hasattr(self, '_edit_selection_mode_backup'):
self.config_table.setSelectionMode(self._edit_selection_mode_backup)
def on_table_item_changed(self, item):
"""表格内容变更回调"""
try:
if not item:
return
# 防止递归调用:如果正在更新中,跳过
if self.is_updating_table:
return
# 检查 item 是否仍然有效(避免访问已删除的对象)
try:
row = item.row()
col = item.column()
text = item.text()
except RuntimeError:
# QTableWidgetItem 已被删除
return
# 第8列情况列联动进度列的显示
if col == 8:
# 设置标志,防止递归
self.is_updating_table = True
try:
self._set_status_item(row, text)
self._set_progress_item(row, text)
finally:
self.is_updating_table = False
# 其它可编辑列:同步当前行到 configs
self._sync_config_from_row(row)
except Exception as e:
logger.warning(f"表格项改变回调出错: {e}")
# 确保标志被重置
self.is_updating_table = False
def _set_checkbox_item(self, row, config_index, row_height=None):
"""设置勾选框列第0列。row_height 传入时使用固定高度+居中对齐,避免第二行起勾选框往下错位。"""
checkbox = QCheckBox()
checkbox.blockSignals(True)
checkbox.setChecked(self.configs[config_index].get('勾选', False)) # 默认不勾选
checkbox.blockSignals(False)
# noinspection PyUnresolvedReferences
checkbox.stateChanged.connect(lambda state, idx=config_index: self._on_checkbox_changed(idx, state))
checkbox.setStyleSheet(
"QCheckBox { margin: 0px; padding: 0px; }"
"QCheckBox::indicator { width: 20px; height: 20px; }"
)
wrapper = QWidget()
h = row_height if (row_height is not None and row_height > 0) else 42
wrapper.setFixedHeight(h)
wrapper.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
layout = QVBoxLayout(wrapper)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(0)
layout.setAlignment(Qt.AlignmentFlag.AlignCenter)
layout.addWidget(checkbox)
self.config_table.setCellWidget(row, 0, wrapper)
def _sync_checked_from_table(self):
"""从表格当前页的勾选框同步勾选状态到 configs仅非 model_view 且当前页)"""
if self.use_model_view or not self.page_row_indices:
return
for table_row, config_index in enumerate(self.page_row_indices):
if config_index >= len(self.configs):
continue
w = self.config_table.cellWidget(table_row, 0)
if not w:
continue
checkbox = w.findChild(QCheckBox) if hasattr(w, 'findChild') else None
if checkbox is not None:
self.configs[config_index]['勾选'] = checkbox.isChecked()
def _on_checkbox_changed(self, config_index, state):
"""勾选框状态改变回调"""
if config_index < len(self.configs):
self.configs[config_index]['勾选'] = (state == Qt.CheckState.Checked)
# 更新已选数量显示
self._update_checked_count()
# 更新状态统计
self._update_status_statistics()
def _update_checked_count(self):
"""更新已勾选的数量显示"""
if not hasattr(self, "table_select_count"):
return
checked_count = sum(1 for config in self.configs if config.get('勾选', False))
self.table_select_count.setText(f"已选: {checked_count}")
def _clear_filter_and_selection(self):
"""清空筛选并取消所有勾选"""
for config in self.configs:
config['勾选'] = False
if hasattr(self, 'table_select_all_checkbox'):
self.table_select_all_checkbox.setChecked(False)
self._current_status_filter = None
self.filtered_config_indices = None
# 清空搜索框会触发 filter_table(""),从而刷新为全量显示
self.table_search_input.setText("")
self._update_checked_count()
self._show_infobar("info", "已清空", "筛选条件和勾选已清空")
def toggle_all_checkboxes(self):
"""全选/取消全选 - 跨页操作所有数据(考虑筛选条件)"""
is_checked = self.table_select_all_checkbox.isChecked()
visible_count = 0
# 有筛选时只操作筛选结果filtered_config_indices否则操作全部
indices = getattr(self, 'filtered_config_indices', None)
if indices is not None:
for config_index in indices:
if 0 <= config_index < len(self.configs):
self.configs[config_index]['勾选'] = is_checked
visible_count += 1
else:
for config_index in range(len(self.configs)):
self.configs[config_index]['勾选'] = is_checked
visible_count += 1
self.update_table()
self._update_checked_count()
self._update_status_statistics()
if hasattr(self, 'table_search_input') and self.table_search_input.text().strip():
self._suppress_filter_clear_once = True
self.filter_table(self.table_search_input.text())
if is_checked:
self._show_infobar("success", "提示", f"已勾选 {visible_count} 行(跨页操作)")
else:
self._show_infobar("success", "提示", f"已取消 {visible_count} 行勾选(跨页操作)")
@staticmethod
def _create_centered_item(text):
"""创建居中对齐的表格单元格"""
item = QTableWidgetItem(str(text))
item.setTextAlignment(Qt.AlignmentFlag.AlignCenter)
# 默认不可编辑,必须通过编辑按钮进入编辑模式
item.setFlags(item.flags() & ~Qt.ItemFlag.ItemIsEditable)
return item
def _set_status_item(self, row, text):
"""设置状态列文本"""
try:
# 使用 blockSignals 临时阻止信号,防止递归
self.config_table.blockSignals(True)
try:
item = QTableWidgetItem(text)
item.setTextAlignment(Qt.AlignmentFlag.AlignCenter) # 居中对齐
item.setFlags(item.flags() & ~Qt.ItemFlag.ItemIsEditable)
# 根据状态设置文字颜色
if "完成" in text or "成功" in text:
item.setForeground(QColor("#155724")) # 绿色
elif "失败" in text or "错误" in text:
item.setForeground(QColor("#721C24")) # 红色
elif "执行中" in text or "进行" in text:
item.setForeground(QColor("#0C5460")) # 蓝色
self._apply_current_highlight_to_item(row, 8, item, text)
self.config_table.setItem(row, 8, item)
finally:
# 恢复信号
self.config_table.blockSignals(False)
except Exception as e:
logger.warning(f"设置状态项失败: {e}")
# 确保信号恢复
self.config_table.blockSignals(False)
def _set_progress_item(self, row, status_text):
"""设置进度列"""
progress = ProgressBar()
progress.setTextVisible(False)
progress.setFixedHeight(8)
progress.setRange(0, 100)
value = 0
if "完成" in status_text or "成功" in status_text:
value = 100
elif "执行中" in status_text or "进行" in status_text:
value = 60
elif "" in status_text:
value = 10
progress.setValue(value)
self.config_table.setCellWidget(row, 10, progress)
def _set_action_buttons(self, row, config_index):
"""设置操作列按钮(编辑和删除)"""
wrapper = QWidget()
layout = QHBoxLayout(wrapper)
layout.setContentsMargins(2, 0, 2, 0)
layout.setSpacing(2)
edit_btn = PushButton("编辑", None)
edit_btn.setFixedWidth(52)
edit_btn.setFixedHeight(32)
edit_btn.setFont(QFont("Microsoft YaHei", 10))
delete_btn = PushButton("删除", None)
delete_btn.setFixedWidth(52)
delete_btn.setFixedHeight(32)
delete_btn.setFont(QFont("Microsoft YaHei", 10))
# 使用默认参数捕获当前值,避免闭包问题
edit_btn.clicked.connect(lambda checked, r=row, idx=config_index: self._enter_edit_mode(r, idx))
delete_btn.clicked.connect(lambda checked, idx=config_index: self.delete_row_by_index(idx))
layout.addWidget(edit_btn)
layout.addWidget(delete_btn)
self.config_table.setCellWidget(row, 11, wrapper)
def _set_edit_mode_buttons(self, row, config_index):
"""设置编辑模式按钮(确认和退出)"""
wrapper = QWidget()
layout = QHBoxLayout(wrapper)
layout.setContentsMargins(2, 0, 2, 0)
layout.setSpacing(2)
confirm_btn = PushButton("确认", None)
confirm_btn.setFixedWidth(52)
confirm_btn.setFixedHeight(32)
confirm_btn.setFont(QFont("Microsoft YaHei", 10))
cancel_btn = PushButton("退出", None)
cancel_btn.setFixedWidth(52)
cancel_btn.setFixedHeight(32)
cancel_btn.setFont(QFont("Microsoft YaHei", 10))
# 使用默认参数捕获当前值
confirm_btn.clicked.connect(lambda checked, r=row, idx=config_index: self._confirm_edit(r, idx))
cancel_btn.clicked.connect(lambda checked, r=row, idx=config_index: self._cancel_edit(r, idx))
layout.addWidget(confirm_btn)
layout.addWidget(cancel_btn)
self.config_table.setCellWidget(row, 11, wrapper)
def _enter_edit_mode(self, row, config_index):
"""进入编辑模式"""
if row < 0 or row >= self.config_table.rowCount():
return
# 允许当前行进入编辑时的交互触发
if not hasattr(self, '_edit_triggers_backup'):
self._edit_triggers_backup = self.config_table.editTriggers()
self.config_table.setEditTriggers(QAbstractItemView.DoubleClicked | QAbstractItemView.SelectedClicked)
if not hasattr(self, '_edit_selection_mode_backup'):
self._edit_selection_mode_backup = self.config_table.selectionMode()
self.config_table.setSelectionMode(QAbstractItemView.SingleSelection)
self._editing_row = row
# 保存原始数据用于还原
if not hasattr(self, '_edit_backup'):
self._edit_backup = {}
original_data = {}
for col in range(1, 10): # 1-9列是数据列
item = self.config_table.item(row, col)
original_data[col] = item.text() if item else ""
self._edit_backup[row] = original_data
# 启用该行的编辑
for col in range(1, 10):
item = self.config_table.item(row, col)
if item:
item.setFlags(item.flags() | Qt.ItemFlag.ItemIsEditable)
# 高亮编辑行并锁定其他行
self._set_row_highlight(row, True)
self._set_other_rows_locked(row, True)
# 切换到编辑模式按钮
self._set_edit_mode_buttons(row, config_index)
# 自动聚焦第一个可编辑单元格
first_item = None
for col in range(1, 10):
item = self.config_table.item(row, col)
if item and (item.flags() & Qt.ItemFlag.ItemIsEditable):
first_item = item
self.config_table.setCurrentCell(row, col)
break
if first_item:
self.config_table.setFocus(Qt.FocusReason.TabFocusReason)
self.config_table.scrollToItem(first_item, QAbstractItemView.PositionAtCenter)
self.config_table.editItem(first_item)
if self.table_edit_hint:
self.table_edit_hint.setVisible(True)
self._show_infobar("info", "编辑模式", f"正在编辑第 {row + 1} 行,修改后点击确认保存或退出还原")
def _confirm_edit(self, row, config_index):
"""确认编辑并保存"""
if row < 0 or row >= self.config_table.rowCount():
return
# 同步表格数据到configs
self._sync_config_from_row(row)
# 禁用该行的编辑
for col in range(1, 10):
item = self.config_table.item(row, col)
if item:
item.setFlags(item.flags() & ~Qt.ItemFlag.ItemIsEditable)
# 清除备份
if hasattr(self, '_edit_backup') and row in self._edit_backup:
del self._edit_backup[row]
# 恢复操作按钮
self._set_action_buttons(row, config_index)
self._show_infobar("success", "保存成功", f"{row + 1} 行数据已保存")
# 退出编辑模式后恢复为只读
self.config_table.setEditTriggers(TableWidget.NoEditTriggers)
self._cleanup_edit_mode_state()
def _cancel_edit(self, row, config_index):
"""取消编辑并还原数据"""
if row < 0 or row >= self.config_table.rowCount():
return
# 还原原始数据
if hasattr(self, '_edit_backup') and row in self._edit_backup:
original_data = self._edit_backup[row]
# 临时断开信号防止触发同步
self.is_updating_table = True
try:
for col, value in original_data.items():
if col == 8:
self._set_status_item(row, value)
self._set_progress_item(row, value)
else:
item = self.config_table.item(row, col)
if item:
item.setText(value)
# 确保清除编辑标志
item = self.config_table.item(row, col)
if item:
item.setFlags(item.flags() & ~Qt.ItemFlag.ItemIsEditable)
finally:
self.is_updating_table = False
del self._edit_backup[row]
# 恢复操作按钮
self._set_action_buttons(row, config_index)
self._show_infobar("info", "已退出", f"{row + 1} 行数据已还原")
# 退出编辑模式后恢复为只读
self.config_table.setEditTriggers(TableWidget.NoEditTriggers)
self._cleanup_edit_mode_state()
def delete_row_by_index(self, row):
"""删除指定行"""
if row < 0 or row >= len(self.configs):
return
self.configs.pop(row)
self.update_table()
def _sync_config_from_row(self, row):
"""同步表格行到configs"""
if row < 0 or row >= self.config_table.rowCount():
return
if not self.page_row_indices:
return
if row >= len(self.page_row_indices):
return
config_index = self.page_row_indices[row]
if config_index >= len(self.configs):
return
# 第0列是勾选框数据列从第1列开始
def cell(col):
item = self.config_table.item(row, col + 1)
return item.text().strip() if item else ""
self.configs[config_index].update({
"多多id": cell(0),
"序号": cell(1),
"话题": cell(2),
"定时发布": cell(3),
"间隔时间": cell(4),
"达人链接": cell(5),
"执行人": cell(6),
"情况": cell(7) or "待执行",
"文件路径": cell(8),
})
def _sync_configs_from_table(self):
"""从表格同步当前页配置(跳过空行)"""
if not self.page_row_indices:
return
for row, config_index in enumerate(self.page_row_indices):
if config_index >= len(self.configs):
continue
# 第0列是勾选框数据列从第1列开始
def cell(col):
item = self.config_table.item(row, col + 1)
return item.text().strip() if item else ""
# 判断是否为空行(避免把空白占位行写回)
values = [cell(i) for i in range(1, 10)]
if not any(values):
continue
self.configs[config_index].update({
"多多id": cell(0),
"序号": cell(1),
"话题": cell(2),
"定时发布": cell(3),
"间隔时间": cell(4),
"达人链接": cell(5),
"执行人": cell(6),
"情况": cell(7) or "待执行",
"文件路径": cell(8),
})
def _save_splitter_sizes(self):
"""保存分割器尺寸"""
if not hasattr(self, "config_splitter"):
return
settings = QSettings("haha", "gui_app")
settings.setValue("config_splitter_sizes", self.config_splitter.sizes())
def _restore_splitter_sizes(self):
"""恢复分割器尺寸"""
if not hasattr(self, "config_splitter"):
return
settings = QSettings("haha", "gui_app")
sizes = settings.value("config_splitter_sizes")
if sizes:
self.config_splitter.setSizes([int(s) for s in sizes])
else:
# 首次默认:配置区偏大
self.config_splitter.setSizes([450, 550])
def change_page_size(self, value):
"""修改分页大小"""
try:
self.page_size = int(value)
except ValueError:
self.page_size = 20
self.current_page = 1
self.update_table()
def go_first_page(self):
"""首页"""
self.current_page = 1
self.update_table()
def go_prev_page(self):
"""上一页"""
if self.current_page > 1:
self.current_page -= 1
self.update_table()
def go_next_page(self):
"""下一页"""
total_rows = len(self.configs)
total_pages = max(1, (total_rows + self.page_size - 1) // self.page_size)
if self.current_page < total_pages:
self.current_page += 1
self.update_table()
def go_last_page(self):
"""末页"""
total_rows = len(self.configs)
total_pages = max(1, (total_rows + self.page_size - 1) // self.page_size)
self.current_page = total_pages
self.update_table()
def on_header_clicked(self, logical_index):
"""处理多列排序Ctrl 多选)"""
# 0列为勾选框10/11列为进度/操作,禁止排序
if logical_index == 0 or logical_index >= 10:
return
modifiers = QApplication.keyboardModifiers()
is_multi = modifiers & Qt.KeyboardModifier.ControlModifier
existing: Optional[int] = next((i for i, (col, _) in enumerate(self.table_sort_keys) if col == logical_index), None)
if existing is not None:
col, order = self.table_sort_keys[int(existing)]
new_order = Qt.SortOrder.DescendingOrder if order == Qt.SortOrder.AscendingOrder else Qt.SortOrder.AscendingOrder
self.table_sort_keys[int(existing)] = (col, new_order)
else:
if not is_multi:
self.table_sort_keys = []
self.table_sort_keys.append((logical_index, Qt.SortOrder.AscendingOrder))
self._sort_table_by_keys()
def _sort_table_by_keys(self):
"""按多列排序表格"""
if not self.table_sort_keys:
return
self._sync_configs_from_table()
key_map = {
1: "多多id",
2: "序号",
3: "话题",
4: "定时发布",
5: "间隔时间",
6: "达人链接",
7: "执行人",
8: "情况",
9: "文件路径",
}
# 数字类型的列(需要按数字排序)
numeric_fields = {"序号", "间隔时间"}
def get_sort_key(cfg, field):
"""获取排序键值,数字列按数字排序"""
value = cfg.get(field, "")
if field in numeric_fields:
try:
# 尝试转换为数字进行排序
return (0, float(str(value)) if value else 0)
except (ValueError, TypeError):
# 转换失败则作为字符串排序,排在数字后面
return (1, str(value))
return (0, str(value))
for col, order in reversed(self.table_sort_keys):
sort_field = key_map.get(col, "")
self.configs.sort(
key=lambda cfg, f=sort_field: get_sort_key(cfg, f),
reverse=(order == Qt.SortOrder.DescendingOrder)
)
self.update_table()
def show_table_context_menu(self, pos):
"""表格右键菜单"""
menu = QMenu(self)
copy_action = menu.addAction("复制选中行")
export_csv_action = menu.addAction("导出选中行CSV")
export_excel_action = menu.addAction("导出选中行Excel")
delete_action = menu.addAction("删除选中行")
view = self.table_view if self.use_model_view else self.config_table
action = menu.exec_(view.viewport().mapToGlobal(pos))
if action == copy_action:
self.copy_selected_rows()
elif action == delete_action:
self.delete_selected_rows()
elif action == export_csv_action:
self.export_selected_rows("csv")
elif action == export_excel_action:
self.export_selected_rows("excel")
def copy_selected_rows(self):
"""复制选中行到剪贴板"""
if self.use_model_view:
if not self.table_view.selectionModel():
return
rows = [idx.row() for idx in self.table_view.selectionModel().selectedRows()]
if not rows:
return
lines = []
for row in sorted(set(rows)):
values = []
for col in range(9):
idx = self.table_proxy.index(row, col)
values.append(str(self.table_proxy.data(idx)))
lines.append("\t".join(values))
QApplication.clipboard().setText("\n".join(lines))
return
ranges = self.config_table.selectedRanges()
if not ranges:
return
rows = set()
for r in ranges:
rows.update(range(r.topRow(), r.bottomRow() + 1))
lines = []
for row in sorted(rows):
values = []
for col in range(self.config_table.columnCount()):
item = self.config_table.item(row, col)
values.append(item.text() if item else "")
lines.append("\t".join(values))
QApplication.clipboard().setText("\n".join(lines))
def delete_selected_rows(self):
"""删除选中行"""
if self.use_model_view and self.table_view.selectionModel():
rows = sorted({idx.row() for idx in self.table_view.selectionModel().selectedRows()}, reverse=True)
if not rows:
return
for row in rows:
src_index = self.table_proxy.mapToSource(self.table_proxy.index(row, 0))
if src_index.isValid():
self.table_model.removeRows(src_index.row(), 1)
self._update_checked_count()
return
ranges = self.config_table.selectedRanges()
if not ranges:
return
rows = set()
for r in ranges:
rows.update(range(r.topRow(), r.bottomRow() + 1))
for row in sorted(rows, reverse=True):
self.config_table.removeRow(row)
if row < len(self.configs):
self.configs.pop(row)
self.update_table()
def export_selected_rows(self, fmt):
"""导出选中行"""
if self.use_model_view:
if not self.table_view.selectionModel():
self._show_infobar("warning", "提示", "未选择任何行")
return
rows = [idx.row() for idx in self.table_view.selectionModel().selectedRows()]
if not rows:
self._show_infobar("warning", "提示", "未选择任何行")
return
data = []
headers = [self.table_proxy.headerData(i, Qt.Orientation.Horizontal) for i in range(9)]
for row in sorted(set(rows)):
row_data = []
for col in range(9):
idx = self.table_proxy.index(row, col)
row_data.append(str(self.table_proxy.data(idx)))
data.append(row_data)
df = pd.DataFrame(data, columns=headers)
if fmt == "csv":
file_path, _ = QFileDialog.getSaveFileName(self, "导出CSV", "selected.csv", "CSV (*.csv)")
if not file_path:
return
df.to_csv(file_path, index=False, encoding="utf-8-sig")
self._show_infobar("success", "成功", f"已导出CSV: {file_path}")
else:
file_path, _ = QFileDialog.getSaveFileName(self, "导出Excel", "selected.xlsx", "Excel (*.xlsx)")
if not file_path:
return
try:
df.to_excel(file_path, index=False)
self._show_infobar("success", "成功", f"已导出Excel: {file_path}")
except Exception as e:
self._show_infobar("error", "错误", f"导出失败: {str(e)}")
return
ranges = self.config_table.selectedRanges()
if not ranges:
self._show_infobar("warning", "提示", "未选择任何行")
return
rows = set()
for r in ranges:
rows.update(range(r.topRow(), r.bottomRow() + 1))
data = []
headers = [self.config_table.horizontalHeaderItem(i).text() for i in range(1, 10)]
for row in sorted(rows):
if not self.page_row_indices or row >= len(self.page_row_indices):
continue
config_index = self.page_row_indices[row]
row_data = []
for col in range(1, 10):
item = self.config_table.item(row, col)
row_data.append(item.text() if item else "")
data.append(row_data)
df = pd.DataFrame(data, columns=headers)
if fmt == "csv":
file_path, _ = QFileDialog.getSaveFileName(self, "导出CSV", "selected.csv", "CSV (*.csv)")
if not file_path:
return
df.to_csv(file_path, index=False, encoding="utf-8-sig")
self._show_infobar("success", "成功", f"已导出CSV: {file_path}")
else:
file_path, _ = QFileDialog.getSaveFileName(self, "导出Excel", "selected.xlsx", "Excel (*.xlsx)")
if not file_path:
return
try:
df.to_excel(file_path, index=False)
self._show_infobar("success", "成功", f"已导出Excel: {file_path}")
except Exception as e:
self._show_infobar("error", "错误", f"导出失败: {str(e)}")
def export_all_rows(self):
"""导出全部数据"""
if self.config_table.rowCount() == 0:
self._show_infobar("warning", "提示", "没有可导出的数据")
return
if self.use_model_view and self.table_proxy:
total_rows = self.table_proxy.rowCount()
headers = [self.table_proxy.headerData(i, Qt.Orientation.Horizontal) for i in range(9)]
data = []
for row in range(total_rows):
row_data = []
for col in range(9):
idx = self.table_proxy.index(row, col)
row_data.append(str(self.table_proxy.data(idx)))
data.append(row_data)
df = pd.DataFrame(data, columns=headers)
file_path, selected_filter = QFileDialog.getSaveFileName(
self, "导出数据", "all.csv", "CSV (*.csv);;Excel (*.xlsx)"
)
if not file_path:
return
try:
if file_path.lower().endswith(".xlsx") or "Excel" in selected_filter:
df.to_excel(file_path, index=False)
else:
df.to_csv(file_path, index=False, encoding="utf-8-sig")
self._show_infobar("success", "成功", f"已导出: {file_path}")
except Exception as e:
self._show_infobar("error", "错误", f"导出失败: {str(e)}")
return
headers = [self.config_table.horizontalHeaderItem(i).text() for i in range(1, 10)]
data = []
for row in range(self.config_table.rowCount()):
row_data = []
for col in range(1, 10):
item = self.config_table.item(row, col)
row_data.append(item.text() if item else "")
data.append(row_data)
df = pd.DataFrame(data, columns=headers)
file_path, selected_filter = QFileDialog.getSaveFileName(
self, "导出数据", "all.csv", "CSV (*.csv);;Excel (*.xlsx)"
)
if not file_path:
return
try:
if file_path.lower().endswith(".xlsx") or "Excel" in selected_filter:
df.to_excel(file_path, index=False)
else:
df.to_csv(file_path, index=False, encoding="utf-8-sig")
self._show_infobar("success", "成功", f"已导出: {file_path}")
except Exception as e:
self._show_infobar("error", "错误", f"导出失败: {str(e)}")
def toggle_nav_compact(self):
"""切换侧边导航收起/展开"""
self.nav_compact = not self.nav_compact
if self.nav_compact:
self.nav_card.setFixedWidth(60)
for btn in self.nav_buttons:
btn.setToolTip(btn.text())
btn.setText("")
self.nav_toggle_btn.setText("展开")
self.nav_title.setVisible(False)
self.nav_divider.setVisible(False)
self.nav_footer.setVisible(False)
else:
self.nav_card.setFixedWidth(150)
labels = ["工作台", "日志"]
for btn, label in zip(self.nav_buttons, labels):
btn.setText(label)
btn.setToolTip("")
self.nav_toggle_btn.setText("收起")
self.nav_title.setVisible(True)
self.nav_divider.setVisible(True)
self.nav_footer.setVisible(True)
def filter_table(self, text):
"""筛选表格行并高亮关键词"""
keyword_raw = text.strip()
suppress_clear = getattr(self, "_suppress_filter_clear_once", False)
if suppress_clear:
self._suppress_filter_clear_once = False
# 清除状态筛选,确保文本筛选基于全量数据
if hasattr(self, '_current_status_filter') and self._current_status_filter:
self._current_status_filter = None
self.filtered_config_indices = None
if not self.use_model_view and hasattr(self, 'config_table'):
for row in range(self.config_table.rowCount()):
self.config_table.setRowHidden(row, False)
# 如果先全选再进行筛选,先清空勾选,避免隐藏行仍保持勾选
if (not suppress_clear and keyword_raw and hasattr(self, "table_select_all_checkbox")
and self.table_select_all_checkbox.isChecked()):
for config in self.configs:
config['勾选'] = False
self.table_select_all_checkbox.blockSignals(True)
self.table_select_all_checkbox.setChecked(False)
self.table_select_all_checkbox.blockSignals(False)
self._update_checked_count()
self._update_status_statistics()
# 同步当前页勾选框显示
if not self.use_model_view and hasattr(self, "config_table"):
for row in range(self.config_table.rowCount()):
wrapper = self.config_table.cellWidget(row, 0)
if wrapper:
checkbox = wrapper.findChild(QCheckBox)
if checkbox:
checkbox.blockSignals(True)
checkbox.setChecked(False)
checkbox.blockSignals(False)
if self.use_model_view:
if not self.table_proxy:
return
if not keyword_raw:
self.table_proxy.setFilterRegularExpression(QRegularExpression())
return
terms = [re.escape(t) for t in keyword_raw.split() if t]
if not terms:
pattern = ""
else:
pattern = "".join([f"(?=.*{t})" for t in terms]) + ".*"
regex = QRegularExpression(pattern, QRegularExpression.CaseInsensitiveOption)
column_index = self._filter_model_column_index()
self.table_proxy.setFilterKeyColumn(column_index)
self.table_proxy.setFilterRegularExpression(regex)
return
if not self.config_table:
return
# 表格列与 config 键的对应(列 1~9
_table_config_keys = ('多多id', '序号', '话题', '定时发布', '间隔时间', '达人链接', '执行人', '情况', '文件路径')
if not keyword_raw:
# 清空筛选:在全量数据上显示,并清除高亮
self.filtered_config_indices = None
self.current_page = 1
self.update_table()
for row in range(self.config_table.rowCount()):
for col in range(self.config_table.columnCount()):
item = self.config_table.item(row, col)
if item:
item.setBackground(self._default_color())
self.config_table.setRowHidden(row, False)
self.table_match_rows = []
self.table_match_index = -1
return
# 在全量 configs 上计算匹配的配置索引(不限于当前页)
terms_raw = [t for t in keyword_raw.split() if t]
terms_lower = [t.lower() for t in terms_raw]
column_index = self._filter_column_index()
matching_indices = []
for i, config in enumerate(self.configs):
if column_index >= 1 and column_index <= 9:
key = _table_config_keys[column_index - 1]
cell_text = str(config.get(key, ''))
else:
cell_text = ' '.join(str(config.get(k, '')) for k in _table_config_keys)
cell_compare = cell_text.lower()
if all(term in cell_compare for term in terms_lower):
matching_indices.append(i)
only_match = self.table_only_match.isChecked() if hasattr(self, 'table_only_match') and self.table_only_match else False
if only_match:
# 仅显示匹配项:只显示匹配的数据,表格行数=当前页数据行,不出现空行或其它数据
self.filtered_config_indices = matching_indices
else:
# 显示全部,只做高亮
self.filtered_config_indices = None
self.current_page = 1
self.update_table()
# 高亮当前页中的匹配单元格(仅显示匹配项时当前页全是匹配行;显示全部时只高亮匹配的格)
for row in range(self.config_table.rowCount()):
for col in range(self.config_table.columnCount()):
if column_index >= 0 and col != column_index:
try:
item = self.config_table.item(row, col)
if item:
item.setBackground(self._default_color())
except RuntimeError:
pass
continue
try:
item = self.config_table.item(row, col)
if item:
cell_text = item.text()
cell_compare = cell_text.lower()
if all(term in cell_compare for term in terms_lower) and self.table_highlight.isChecked():
item.setBackground(self._highlight_color())
else:
item.setBackground(self._default_color())
except RuntimeError:
continue
self.table_match_rows = list(range(self.config_table.rowCount()))
self.table_match_index = -1
def next_table_match(self):
"""跳转到下一条匹配"""
if not self.table_match_rows:
return
if self.table_match_index < 0:
self.table_match_index = 0
else:
self.table_match_index = (self.table_match_index + 1) % len(self.table_match_rows)
row = self.table_match_rows[self.table_match_index]
self.config_table.selectRow(row)
item = self.config_table.item(row, 0) or self.config_table.item(row, 1)
if item:
self.config_table.scrollToItem(item)
def prev_table_match(self):
"""跳转到上一条匹配"""
if not self.table_match_rows:
return
if self.table_match_index < 0:
self.table_match_index = len(self.table_match_rows) - 1
else:
self.table_match_index = (self.table_match_index - 1) % len(self.table_match_rows)
row = self.table_match_rows[self.table_match_index]
self.config_table.selectRow(row)
item = self.config_table.item(row, 0) or self.config_table.item(row, 1)
if item:
self.config_table.scrollToItem(item)
def _apply_schedule_intervals(self, configs_with_indices):
"""按多多ID应用定时发布+间隔时间规则
configs_with_indices: [{"config_index": config_index, "config": config}, ...]
"""
from collections import defaultdict
grouped = defaultdict(list)
for item in configs_with_indices:
config = item["config"]
user_id = config.get("多多id", "")
if not user_id:
continue
grouped[user_id].append(item)
updated_count = 0
for user_id, items in grouped.items():
# 按原始配置列表中的顺序处理
items.sort(key=lambda x: x["config_index"])
base_time = None # 上一条的发布时间(基准时间)
for entry in items:
config = entry["config"]
config_index = entry["config_index"]
schedule_text = (config.get("定时发布") or "").strip()
interval_value = config.get("间隔时间", 0)
# 解析间隔时间(分钟转秒)
current_interval = self._parse_interval_seconds(interval_value)
# 解析定时时间
parsed_time = self._parse_schedule_time(schedule_text) if schedule_text else None
# 情况1当前条目有定时时间 -> 使用该定时时间,并记录为基准时间
if parsed_time:
base_time = parsed_time
# 情况2当前条目没有定时时间但有间隔时间
elif not parsed_time and current_interval > 0:
if base_time:
# 有基准时间 -> 新时间 = 基准时间 + 间隔时间
base_time = base_time + timedelta(seconds=current_interval)
new_text = self._format_schedule_time(base_time)
# 更新配置和表格单元格(如果可见)
self._update_table_cell(config_index, 4, new_text, highlight=True, is_config_index=True)
updated_count += 1
return updated_count
@staticmethod
def _parse_schedule_time(text):
"""解析定时发布时间字符串"""
if not text:
return None
try:
dt = pd.to_datetime(text, errors="coerce")
if pd.isna(dt):
return None
return dt.to_pydatetime()
except (ValueError, TypeError, AttributeError):
return None
def _is_schedule_time_expired(self, time_start):
"""检查定时发布时间是否已过期(早于当前时间)
Args:
time_start: 定时发布时间字符串,如 "2026-01-15 09:30:00"
Returns:
bool: 如果时间已过期返回True否则返回False。如果时间为空或无法解析返回False不跳过
"""
if not time_start:
return False
try:
# 解析时间字符串
schedule_dt = self._parse_schedule_time(time_start)
if not schedule_dt:
return False
# 与当前时间比较
now = datetime.now()
if schedule_dt < now:
return True
return False
except (ValueError, TypeError, AttributeError) as e:
logger.warning(f"检查定时发布时间是否过期时出错: {e}")
return False
@staticmethod
def _parse_interval_seconds(interval_value):
"""解析间隔时间(默认按分钟),支持秒/分钟/小时(如: 30, 10m, 2h, 10分钟, 2小时
注意纯数字默认按分钟空字符串或None返回0表示没有间隔时间
"""
if interval_value is None or interval_value == '':
return 0
value_str = str(interval_value).strip().lower()
if not value_str:
return 0
# 中文单位映射
cn_map = {
"": "s",
"分钟": "m",
"": "m",
"小时": "h",
"": "h"
}
for cn, en in cn_map.items():
if value_str.endswith(cn):
value_str = value_str[:-len(cn)] + en
break
try:
# 纯数字,默认分钟
if value_str.isdigit():
return int(value_str) * 60
# 支持 10m / 2h / 30s
unit = value_str[-1]
num_part = value_str[:-1].strip()
if not num_part:
return 0
num = float(num_part)
if unit == "m":
return int(num * 60)
if unit == "h":
return int(num * 3600)
if unit == "s":
return int(num)
except (ValueError, TypeError):
return 0
return 0
@staticmethod
def _format_schedule_time(dt):
"""格式化定时发布时间字符串"""
if not dt:
return ""
return dt.strftime("%Y-%m-%d %H:%M:%S")
def _update_table_cell(self, index, col, value, highlight=False, is_config_index=False):
"""更新配置数据及其对应的表格单元格(如果当前可见)
index: 行号 (is_config_index=False) 或 配置列表索引 (is_config_index=True)
"""
try:
# 1. 确定配置索引和表格行号
if is_config_index:
config_index = index
row_idx = self.page_row_indices.index(config_index) if hasattr(self, 'page_row_indices') and config_index in self.page_row_indices else -1
else:
row_idx = index
config_index = self.page_row_indices[row_idx] if hasattr(self, 'page_row_indices') and row_idx < len(self.page_row_indices) else row_idx
# 2. 更新数据源 self.configs
if 0 <= config_index < len(self.configs):
col_to_field = {
4: "定时发布",
8: "情况",
9: "文件路径",
}
field = col_to_field.get(col)
if field:
self.configs[config_index][field] = str(value)
# 3. 如果行可见,更新 UI
if 0 <= row_idx < self.config_table.rowCount():
# 使用 blockSignals 临时阻止信号,防止递归
self.config_table.blockSignals(True)
try:
item = QTableWidgetItem(str(value))
item.setTextAlignment(Qt.AlignmentFlag.AlignCenter)
item.setFlags(item.flags() & ~Qt.ItemFlag.ItemIsEditable)
if highlight:
item.setBackground(QColor("#E6F4FF"))
self.config_table.setItem(row_idx, col, item)
finally:
self.config_table.blockSignals(False)
except Exception as e:
logger.warning(f"更新单元格失败: {e}")
self.config_table.blockSignals(False)
def _update_table_status(self, index, status, is_config_index=False):
"""更新状态列及其对应的表格单元格
index: 行号 (is_config_index=False) 或 配置列表索引 (is_config_index=True)
"""
try:
# 1. 确定配置索引和表格行号
if is_config_index:
config_index = index
row_idx = self.page_row_indices.index(config_index) if hasattr(self, 'page_row_indices') and config_index in self.page_row_indices else -1
else:
row_idx = index
config_index = self.page_row_indices[row_idx] if hasattr(self, 'page_row_indices') and row_idx < len(self.page_row_indices) else row_idx
# 2. 更新数据源
if 0 <= config_index < len(self.configs):
self.configs[config_index]["情况"] = status
# 3. 如果可见,更新 UI
if 0 <= row_idx < self.config_table.rowCount():
# 使用 blockSignals 临时阻止信号,防止递归
self.config_table.blockSignals(True)
try:
# 第8列是"情况"列,只设置文字颜色,不设置背景色和图标
status_item = QTableWidgetItem(status)
status_item.setTextAlignment(Qt.AlignmentFlag.AlignCenter)
status_item.setFlags(status_item.flags() & ~Qt.ItemFlag.ItemIsEditable)
if status == "已完成":
status_item.setForeground(QColor("#155724")) # 绿色
elif status == "失败":
status_item.setForeground(QColor("#721C24")) # 红色
elif status == "执行中":
status_item.setForeground(QColor("#0C5460")) # 蓝色
self._apply_current_highlight_to_item(row_idx, 8, status_item, status)
self.config_table.setItem(row_idx, 8, status_item)
finally:
self.config_table.blockSignals(False)
except Exception as e:
logger.error(f"更新状态失败: {e}")
self.config_table.blockSignals(False)
@staticmethod
def _highlight_color():
"""高亮颜色 - 淡蓝色背景"""
return QColor(173, 216, 230) # 淡蓝色 (Light Blue)
def _default_color(self):
"""默认背景色"""
return self.config_table.palette().color(self.config_table.palette().Base)
def _apply_current_highlight_to_item(self, _row, col, item, text=None):
"""根据当前筛选条件对单元格应用高亮"""
if not item or not hasattr(self, "table_search_input"):
return
if not self.table_highlight.isChecked():
return
keyword_raw = self.table_search_input.text().strip()
if not keyword_raw:
return
column_index = self._filter_column_index()
if column_index >= 0 and col != column_index:
return
terms = [t.lower() for t in keyword_raw.split() if t]
if not terms:
return
cell_text = text if text is not None else item.text()
cell_compare = (cell_text or "").lower()
if all(term in cell_compare for term in terms):
item.setBackground(self._highlight_color())
else:
item.setBackground(self._default_color())
def _filter_column_index(self):
"""筛选项下拉对应的表格列索引;-1 表示全部列。排除情况列以多多ID等为筛选项。"""
i = self.table_column_filter.currentIndex()
if i <= 0 or not getattr(self, "_filter_table_columns", None):
return -1
k = i - 1
if k >= len(self._filter_table_columns):
return -1
return self._filter_table_columns[k]
def _filter_model_column_index(self):
"""Model/View 模式下筛选项对应的列索引;-1 表示全部列。"""
i = self.table_column_filter.currentIndex()
if i <= 0 or not getattr(self, "_filter_model_columns", None):
return -1
k = i - 1
if k >= len(self._filter_model_columns):
return -1
return self._filter_model_columns[k]
def _show_all_rows(self):
"""显示全部行"""
for row in range(self.config_table.rowCount()):
self.config_table.setRowHidden(row, False)
def _filter_by_status(self, status):
"""按状态筛选表格行(在全量数据上计算,分页显示筛选结果)"""
if not getattr(self, 'configs', None) or not self.configs:
self._show_infobar("warning", "提示", "暂无数据")
return
# 清除文本筛选,确保每次筛选都基于全量数据
if hasattr(self, 'table_search_input') and self.table_search_input.text().strip():
self.table_search_input.blockSignals(True)
self.table_search_input.clear()
self.table_search_input.blockSignals(False)
self.filtered_config_indices = None
self.table_match_rows = []
self.table_match_index = -1
# 切换筛选状态
current_filter = getattr(self, '_current_status_filter', None)
if current_filter == status:
self._current_status_filter = None
self.filtered_config_indices = None
self.current_page = 1
self.update_table()
self._show_infobar("success", "提示", "已显示全部记录")
return
self._current_status_filter = status
# 在全量 configs 上计算匹配的配置索引
matching_indices = []
for i, config in enumerate(self.configs):
config_status = str(config.get('情况', ''))
if status == "成功":
match = "完成" in config_status or "成功" in config_status
elif status == "失败":
match = "失败" in config_status or "错误" in config_status
else:
match = status in config_status
if match:
matching_indices.append(i)
self.filtered_config_indices = matching_indices
self.current_page = 1
self.update_table()
if not matching_indices:
self._show_infobar("warning", "提示", f"没有{status}的记录")
else:
self._show_infobar("success", "筛选", f"已筛选出 {len(matching_indices)}{status}记录,再次点击取消筛选")
def _update_status_statistics(self):
"""更新状态统计(成功/失败/待执行数量)- 统计所有配置的真实状态
待执行:勾选的且状态不是"已完成"的数据,表示用户选择要处理的任务数
执行中只有在任务运行时才有值任务完成后为0
成功:状态为"已完成"的数据数量
失败:状态为"失败/跳过/过期"的数据数量
"""
if not hasattr(self, 'configs') or not self.configs:
self.set_status_cards(pending=0, running=0, success=0, failed=0)
return
pending_count = 0
success_count = 0
failed_count = 0
# 统计所有配置的状态(根据数据总量来统计)
for config in self.configs:
status = config.get('情况', '待执行')
is_checked = config.get('勾选', False)
if "完成" in status or "成功" in status:
success_count += 1
elif "失败" in status or "错误" in status or "过期" in status or "跳过" in status:
failed_count += 1
# 只有勾选的失败项才算待执行
if is_checked:
pending_count += 1
elif "" in status or not status:
# 只有勾选的待执行项才算待执行
if is_checked:
pending_count += 1
# 执行中始终为0只有任务运行时才会被单独更新
self.set_status_cards(pending=pending_count, running=0, success=success_count, failed=failed_count)
def retry_failed_items(self):
"""重新发布所有失败的项目"""
if not hasattr(self, 'configs') or not self.configs:
self._show_infobar("warning", "提示", "暂无数据")
return
# 收集失败的项目索引
failed_indices = []
for idx, config in enumerate(self.configs):
status = config.get('情况', '')
if "失败" in status or "错误" in status or "过期" in status or "跳过" in status:
failed_indices.append(idx)
if not failed_indices:
self._show_infobar("warning", "提示", "没有失败的项目需要重新发布")
return
# 确认对话框
reply = QMessageBox.question(
self,
"确认重新发布",
f"发现 {len(failed_indices)} 条失败记录,是否重新发布?\n\n"
"重新发布将把这些记录的状态重置为【待执行】,然后执行发布任务。",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if reply != QMessageBox.Yes:
return
# 重置失败项的状态为"待执行"
for idx in failed_indices:
self.configs[idx]['情况'] = '待执行'
# 更新表格显示
self.update_table()
self._update_status_statistics()
# 筛选显示这些待重新发布的项
self._filter_by_status("待执行")
self._show_infobar("success", "已重置", f"已将 {len(failed_indices)} 条失败记录重置为待执行状态,请点击【开始上传】按钮开始发布")
def get_failed_configs(self):
"""获取所有失败的配置项"""
if not hasattr(self, 'configs') or not self.configs:
return []
failed_configs = []
for config in self.configs:
status = config.get('情况', '')
if "失败" in status or "错误" in status or "过期" in status or "跳过" in status:
failed_configs.append(config)
return failed_configs
def set_status_cards(self, update_text=None, pending=None, running=None, success=None, failed=None):
"""更新状态卡片显示"""
if update_text is not None:
self.status_update_value.setText(update_text)
if "未找到" in update_text:
self.status_update_value.setStyleSheet("color: #b42318;")
elif "未更新" in update_text:
self.status_update_value.setStyleSheet("color: #6b7280;")
else:
self.status_update_value.setStyleSheet("color: #1d4ed8;")
if pending is not None:
self.status_pending_value.setText(str(pending))
try:
pending_num = int(str(pending))
except ValueError:
pending_num = 0
if pending_num > 0:
self.status_pending_value.setStyleSheet("color: #b45309;")
else:
self.status_pending_value.setStyleSheet("color: #15803d;")
if running is not None:
self.status_running_value.setText(str(running))
running_text = str(running)
if running_text in ["0", "0/0"]:
self.status_running_value.setStyleSheet("color: #6b7280;")
else:
self.status_running_value.setStyleSheet("color: #1d4ed8;")
if success is not None:
self.status_success_value.setText(str(success))
try:
success_num = int(str(success))
except ValueError:
success_num = 0
if success_num > 0:
self.status_success_value.setStyleSheet("color: #15803d; font-weight: bold;")
else:
self.status_success_value.setStyleSheet("color: #6b7280;")
if failed is not None:
self.status_failed_value.setText(str(failed))
try:
failed_num = int(str(failed))
except ValueError:
failed_num = 0
if failed_num > 0:
self.status_failed_value.setStyleSheet("color: #dc2626; font-weight: bold;")
else:
self.status_failed_value.setStyleSheet("color: #6b7280;")
def set_running_progress(self, done, total):
"""更新执行中统计"""
self.running_done = done
self.running_total = total
if total > 0:
self.set_status_cards(running=f"{done}/{total}")
if hasattr(self, "status_running_progress"):
percent = int((done / total) * 100)
self.status_running_progress.setValue(percent)
else:
self.set_status_cards(running="0")
if hasattr(self, "status_running_progress"):
self.status_running_progress.setValue(0)
def clear_log(self):
"""清空日志显示"""
self.log_text.clear()
def filter_log(self, text):
"""过滤日志内容"""
keyword = text.strip()
self._clear_log_highlight()
self._update_log_match_status(0)
if not keyword:
return
self._update_log_matches(keyword)
if self.log_highlight_check.isChecked():
self._highlight_log_matches()
self.find_log(backward=False, reset=True)
def export_log(self):
"""导出日志到文件"""
log_content = self.log_text.toPlainText()
if not log_content.strip():
self._show_infobar("warning", "提示", "日志为空,无法导出")
return
file_path, _ = QFileDialog.getSaveFileName(
self, "保存日志", "log.txt", "文本文件 (*.txt)"
)
if not file_path:
return
try:
with open(file_path, "w", encoding="utf-8") as f:
f.write(log_content)
self._show_infobar("success", "成功", f"日志已导出: {file_path}")
except Exception as e:
self._show_infobar("error", "错误", f"导出失败: {str(e)}")
def find_log(self, backward=False, reset=False):
"""查找日志"""
keyword = self.log_search_input.text().strip()
if not keyword:
return
if not self.log_match_positions:
self._update_log_matches(keyword)
if not self.log_match_positions:
return
if reset:
self.log_match_index = 0 if not backward else len(self.log_match_positions) - 1
else:
step = -1 if backward else 1
self.log_match_index = (self.log_match_index + step) % len(self.log_match_positions)
start, end = self.log_match_positions[self.log_match_index]
cursor = self.log_text.textCursor()
cursor.setPosition(start)
cursor.setPosition(end, QTextCursor.KeepAnchor)
self.log_text.setTextCursor(cursor)
self.log_text.ensureCursorVisible()
def _clear_log_highlight(self):
"""清除日志高亮"""
cursor = self.log_text.textCursor()
cursor.select(cursor.Document)
cursor.setCharFormat(self.log_text.currentCharFormat())
cursor.clearSelection()
def _highlight_log_matches(self):
"""高亮日志所有匹配项"""
if not self.log_match_positions:
return
fmt = self.log_text.currentCharFormat()
fmt.setBackground(self.log_text.palette().color(self.log_text.palette().Highlight).lighter(160))
for start, end in self.log_match_positions:
cursor = self.log_text.textCursor()
cursor.setPosition(start)
cursor.setPosition(end, QTextCursor.KeepAnchor)
cursor.mergeCharFormat(fmt)
def _update_log_matches(self, keyword):
"""更新日志匹配位置"""
self.log_match_positions = []
self.log_match_index = -1
if not keyword:
self._update_log_match_status(0)
return
content = self.log_text.toPlainText()
whole_word = self.log_whole_word.isChecked()
pattern_text = re.escape(keyword)
if whole_word:
pattern_text = rf"\b{pattern_text}\b"
pattern = re.compile(pattern_text, re.IGNORECASE)
for match in pattern.finditer(content):
self.log_match_positions.append((match.start(), match.end()))
self._update_log_match_status(len(self.log_match_positions))
def _update_log_match_status(self, count):
"""更新日志匹配统计"""
if hasattr(self, "log_match_status"):
self.log_match_status.setText(f"匹配: {count}")
def _apply_table_column_widths(self):
"""应用配置表列宽(仅按窗口宽度自适应,不根据内容变化)"""
header = self.config_table.horizontalHeader()
header.setStretchLastSection(True)
# 禁用根据内容自适应,防止导入或更新数据时列宽剧烈跳动
# 设置最小列宽,确保内容可见
min_widths = {
0: 50, # 勾选框
1: 80, # 多多ID
2: 60, # 序号
3: 100, # 话题
4: 100, # 定时发布
5: 80, # 间隔时间
6: 120, # 达人链接
7: 80, # 执行人
8: 80, # 情况
9: 150, # 文件路径
10: 70, # 进度
11: 100 # 操作
}
# 切换到Interactive模式允许手动拖拽调整
header.setSectionResizeMode(0, QHeaderView.Fixed) # 勾选框固定
for col in range(1, self.config_table.columnCount()):
header.setSectionResizeMode(col, QHeaderView.Interactive)
if col in min_widths:
# 仅在第一次或比例未建立时设置初始宽度
if not self._column_width_ratios:
self.config_table.setColumnWidth(col, min_widths[col])
# 如果已经有保存的比例,直接按比例缩放
if self._column_width_ratios:
self._auto_resize_table_columns()
else:
# 第一次加载,根据当前设置的初始宽度计算比例
self._update_column_width_ratios()
self._auto_resize_table_columns()
def _update_column_width_ratios(self):
"""更新列宽比例"""
if self.config_table.columnCount() == 0:
return
total_width = 0
widths = {}
# 计算所有列的总宽度(不包括勾选框)
for col in range(1, self.config_table.columnCount()):
width = self.config_table.columnWidth(col)
widths[col] = width
total_width += width
# 计算比例
if total_width > 0:
self._column_width_ratios = {}
for col, width in widths.items():
self._column_width_ratios[col] = width / total_width
def _auto_resize_table_columns(self):
"""根据比例自动调整表格列宽"""
if not self._column_width_ratios or self._is_manual_resize:
return
header = self.config_table.horizontalHeader()
if header.sectionResizeMode(1) != QHeaderView.Interactive:
return
# 获取表格可用宽度
table_width = self.config_table.viewport().width()
checkbox_width = self.config_table.columnWidth(0) if self.config_table.columnCount() > 0 else 0
available_width = table_width - checkbox_width
if available_width <= 0:
return
# 按比例分配宽度
self._is_manual_resize = True # 临时标记,避免触发信号
columns = sorted(self._column_width_ratios.keys())
assigned_width = 0
for i, col in enumerate(columns):
if col < self.config_table.columnCount():
ratio = self._column_width_ratios[col]
# 最后一列取剩余所有宽度,确保填满
if i == len(columns) - 1:
new_width = max(60, available_width - assigned_width)
else:
new_width = int(available_width * ratio)
min_width = 60
if new_width < min_width:
new_width = min_width
self.config_table.setColumnWidth(col, new_width)
assigned_width += new_width
self._is_manual_resize = False
def on_column_resized(self, logical_index, _old_size, _new_size):
"""列宽调整时的回调(手动或自动)"""
if not self._is_manual_resize and logical_index > 0 and self._resize_timer is not None:
# 用户手动调整了列宽,延迟更新比例(避免拖拽时频繁更新)
self._resize_timer.stop()
self._resize_timer.start(300) # 300ms后更新比例
def _delayed_update_ratios(self):
"""延迟更新列宽比例"""
if not self._is_manual_resize:
self._update_column_width_ratios()
def on_table_geometry_changed(self):
"""表格几何形状改变时的回调(窗口大小改变)"""
if not self._is_manual_resize and self._auto_resize_timer is not None:
# 使用独立的定时器进行防抖,避免频繁触发导致卡顿
self._auto_resize_timer.stop()
self._auto_resize_timer.start(50)
def _apply_table_view_column_widths(self):
"""应用 Model/View 列宽(仅按窗口宽度自适应,不根据内容变化)"""
if not self.table_view.model():
return
header = self.table_view.horizontalHeader()
header.setStretchLastSection(True)
# 禁用根据内容自适应
min_widths = {
0: 80, # 多多ID
1: 60, # 序号
2: 100, # 话题
3: 100, # 定时发布
4: 80, # 间隔时间
5: 120, # 达人链接
6: 80, # 执行人
7: 80, # 情况
8: 150, # 文件路径
9: 70, # 进度
10: 100 # 操作
}
# 切换到Interactive模式
for col in range(self.table_view.model().columnCount()):
header.setSectionResizeMode(col, QHeaderView.Interactive)
if col in min_widths and not self._table_view_column_width_ratios:
self.table_view.setColumnWidth(col, min_widths[col])
# 按比例分配或初始化比例
if self._table_view_column_width_ratios:
self._auto_resize_table_view_columns()
else:
self._update_table_view_column_width_ratios()
self._auto_resize_table_view_columns()
def _update_table_view_column_width_ratios(self):
"""更新Model/View模式的列宽比例"""
if not self.table_view.model() or self.table_view.model().columnCount() == 0:
return
total_width = 0
widths = {}
# 计算所有列的总宽度
for col in range(self.table_view.model().columnCount()):
width = self.table_view.columnWidth(col)
widths[col] = width
total_width += width
# 计算比例
if total_width > 0:
self._table_view_column_width_ratios = {}
for col, width in widths.items():
self._table_view_column_width_ratios[col] = width / total_width
def _auto_resize_table_view_columns(self):
"""根据比例自动调整Model/View表格列宽"""
if not self._table_view_column_width_ratios or self._is_table_view_manual_resize:
return
header = self.table_view.horizontalHeader()
if not header or header.sectionResizeMode(0) != QHeaderView.Interactive:
return
# 获取表格可用宽度
table_width = self.table_view.viewport().width()
if table_width <= 0:
return
# 按比例分配宽度
self._is_table_view_manual_resize = True # 临时标记,避免触发信号
columns = sorted(self._table_view_column_width_ratios.keys())
assigned_width = 0
for i, col in enumerate(columns):
if col < self.table_view.model().columnCount():
ratio = self._table_view_column_width_ratios[col]
# 最后一列取剩余所有宽度,确保填满
if i == len(columns) - 1:
new_width = max(60, table_width - assigned_width)
else:
new_width = int(table_width * ratio)
min_width = 60
if new_width < min_width:
new_width = min_width
self.table_view.setColumnWidth(col, new_width)
assigned_width += new_width
self._is_table_view_manual_resize = False
def on_table_view_column_resized(self, _logical_index, _old_size, _new_size):
"""Model/View模式列宽调整时的回调手动或自动"""
if not self._is_table_view_manual_resize and self._table_view_resize_timer is not None:
# 用户手动调整了列宽,延迟更新比例(避免拖拽时频繁更新)
self._table_view_resize_timer.stop()
self._table_view_resize_timer.start(300) # 300ms后更新比例
def _delayed_update_table_view_ratios(self):
"""延迟更新Model/View模式的列宽比例"""
if not self._is_table_view_manual_resize:
self._update_table_view_column_width_ratios()
def on_table_view_geometry_changed(self):
"""Model/View模式表格几何形状改变时的回调窗口大小改变"""
if not self._is_table_view_manual_resize and self._auto_resize_table_view_timer is not None:
# 使用独立的定时器进行防抖,避免频繁触发导致卡顿
self._auto_resize_table_view_timer.stop()
self._auto_resize_table_view_timer.start(50)
# noinspection SpellCheckingInspection
def _show_infobar(self, level, title, content):
"""显示提示条"""
# noinspection PyArgumentList
if level == "success":
InfoBar.success(title=title, content=content, parent=self, position=InfoBarPosition.TOP_RIGHT)
elif level == "warning":
# noinspection PyArgumentList
InfoBar.warning(title=title, content=content, parent=self, position=InfoBarPosition.TOP_RIGHT)
elif level == "info":
# noinspection PyArgumentList
InfoBar.info(title=title, content=content, parent=self, position=InfoBarPosition.TOP_RIGHT)
else:
# noinspection PyArgumentList
InfoBar.error(title=title, content=content, parent=self, position=InfoBarPosition.TOP_RIGHT)
def browse_folder(self):
"""浏览文件夹"""
folder_path = QFileDialog.getExistingDirectory(self, "选择文件夹")
if folder_path:
self.folder_path_input.setText(folder_path)
def download_excel_template(self):
"""下载Excel配置模板"""
try:
# 选择保存路径
file_path, _ = QFileDialog.getSaveFileName(
self,
"保存配置模板",
"配置模板.xlsx",
"Excel文件 (*.xlsx)"
)
if not file_path:
return
# 创建模板数据
template_data = {
'多多id': ['示例ID_001', '示例ID_001', '示例ID_002'],
'序号': ['1', '2', '1'],
'话题': ['#话题1', '#话题2', '#话题1'],
'定时发布': ['2024-01-01 10:00', '', ''],
'间隔时间': [5, 5, 0],
'达人链接': ['https://example.com/user1', '', 'https://example.com/user2'],
'执行人': ['张三', '张三', '李四'],
'情况': ['待执行', '待执行', '待执行']
}
df = pd.DataFrame(template_data)
# 保存Excel文件
df.to_excel(file_path, index=False, engine='openpyxl')
self._show_infobar("success", "成功", f"模板已保存: {file_path}")
self.log_text.append(f"配置模板已下载: {file_path}")
self.log_text.append("模板说明:")
self.log_text.append(" - 多多id: 用户ID相同ID的数据会按序号顺序处理")
self.log_text.append(" - 序号: 文件序号用于匹配文件夹中的文件1-视频名称.mp4")
self.log_text.append(" - 话题: 发布时的话题标签")
self.log_text.append(" - 定时发布: 定时发布时间格式yyyy-MM-dd HH:mm")
self.log_text.append(" - 间隔时间: 在上一条基础上延迟的分钟数(无定时时间时生效)")
self.log_text.append(" - 达人链接: 达人主页链接")
self.log_text.append(" - 执行人: 负责人")
self.log_text.append(" - 情况: 执行状态(待执行/执行中/已完成/失败)")
except Exception as e:
self._show_infobar("error", "错误", f"保存模板失败: {str(e)}")
logger.error(f"保存模板失败: {e}")
def import_excel(self):
"""导入Excel配置文件直接弹出文件选择对话框"""
# 弹出文件选择对话框
excel_path, _ = QFileDialog.getOpenFileName(
self, "选择Excel配置文件", "", "Excel文件 (*.xlsx *.xls)"
)
if not excel_path:
return # 用户取消选择
# 显示选择的文件路径
self.excel_path_input.setText(excel_path)
try:
# 读取Excel文件添加更多异常处理
try:
df = pd.read_excel(excel_path)
except FileNotFoundError:
self._show_infobar("error", "错误", f"找不到文件: {excel_path}")
logger.error(f"文件不存在: {excel_path}")
return
except PermissionError:
self._show_infobar("error", "错误", f"文件被占用请关闭Excel文件后重试")
logger.error(f"文件被占用: {excel_path}")
return
except Exception as e:
self._show_infobar("error", "错误", f"读取Excel文件失败: {str(e)}")
logger.error(f"读取Excel失败: {e}")
return
if df.empty:
self._show_infobar("warning", "警告", "Excel文件为空")
return
# 检查必需的列
required_columns = ['多多id', '序号', '话题', '定时发布', '间隔时间', '达人链接', '执行人', '情况']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
self._show_infobar(
"warning",
"警告",
f"Excel缺少列: {', '.join(missing_columns)}"
)
return
# 清空所有状态根据新的Excel配置表格为准完全重置不保留任何上一次的数据
# 1. 清空配置列表
self.configs = []
# 2. 重置运行状态变量
self.running_total = 0
self.running_done = 0
# 3. 清空批量任务队列
self.batch_task_queue = []
self.current_batch_task_index = 0
# 4. 清空行映射(用于状态回写)
self._row_map_by_user_index = {}
# 5. 重置所有计数变量确保导入新Excel时完全清空上一次的统计
self.batch_success_count = 0
self.batch_failed_count = 0
self.batch_total_tasks = 0
self.batch_processed = 0
self.batch_pending_tasks = 0
# 6. 重置所有状态卡片显示(全部归零)
self.set_status_cards(update_text="未更新", pending=0, running=0, success=0, failed=0)
# 7. 重置进度条
if hasattr(self, "status_running_progress"):
self.status_running_progress.setValue(0)
if hasattr(self, "progress_bar"):
self.progress_bar.setValue(0)
self.progress_bar.setVisible(False)
# 8. 重置“更新数据”提示文本
if hasattr(self, "update_status_label"):
self.update_status_label.setText("未更新")
self.update_status_label.setStyleSheet("color: #666; font-size: 10px;")
# 转换为配置列表,添加异常处理
for idx, row in df.iterrows():
try:
# 处理间隔时间:保持原始状态,空值保持为空字符串
interval_raw = row.get('间隔时间')
if pd.notna(interval_raw) and str(interval_raw).strip() != '':
try:
interval_value = int(float(interval_raw)) # 先转float再转int处理小数情况
except (ValueError, TypeError):
interval_value = '' # 无效值保持为空
else:
interval_value = '' # 空值保持为空
# 辅助函数:将可能带.0的数字字符串转为整数型字符串
def clean_str(val):
if pd.isna(val): return ''
s = str(val).strip()
if s.endswith('.0'):
return s[:-2]
return s
# 导入新Excel时强制将所有状态重置为"待执行"根据新的Excel配置表格为准
# 忽略Excel中"情况"列的旧状态值,统一重置为"待执行"
status_value = '待执行'
config = {
'勾选': False, # 导入后默认不勾选,交由用户选择
'多多id': clean_str(row.get('多多id', '')),
'序号': clean_str(row.get('序号', '')),
'话题': str(row.get('话题', '')).strip() if pd.notna(row.get('话题')) else '',
'定时发布': str(row.get('定时发布', '')).strip() if pd.notna(row.get('定时发布')) else '',
'间隔时间': interval_value,
'达人链接': str(row.get('达人链接', '')).strip() if pd.notna(row.get('达人链接')) else '',
'执行人': str(row.get('执行人', '')).strip() if pd.notna(row.get('执行人')) else '',
'情况': status_value, # 强制重置为"待执行"根据新的Excel配置表格为准
'文件路径': '' # 文件路径字段初始为空,通过更新数据按钮填充
}
self.configs.append(config)
except Exception as e:
logger.warning(f"处理第 {idx + 1} 行数据时出错: {e}")
continue # 跳过有问题的行
if not self.configs:
self._show_infobar("warning", "警告", "未能解析出任何有效配置")
return
# 清除排序状态保持Excel原始顺序
self.table_sort_keys = []
self.config_table.horizontalHeader().setSortIndicator(-1, Qt.SortOrder.AscendingOrder)
# 更新表格显示(跳过表格->configs同步避免旧数据回写
try:
self.update_table(skip_sync=True)
except Exception as e:
self._show_infobar("error", "错误", f"更新表格失败: {str(e)}")
logger.error(f"更新表格失败: {e}")
return
# 导入后默认不勾选,确保全选状态为未选中
if hasattr(self, "table_select_all_checkbox"):
self.table_select_all_checkbox.blockSignals(True)
self.table_select_all_checkbox.setChecked(False)
self.table_select_all_checkbox.blockSignals(False)
# 显示表格
self.table_group.setVisible(True)
# 更新状态统计(基于新导入的配置重新计算)
self._update_status_statistics()
# 确保更新状态显示为"未更新"(文件路径还未更新)
self.set_status_cards(update_text="未更新")
self._show_infobar("success", "成功", f"成功导入 {len(self.configs)} 条配置,所有状态已清空重置")
except Exception as e:
error_msg = f"导入Excel文件失败: {str(e)}"
self._show_infobar("error", "错误", error_msg)
logger.error(f"导入Excel失败: {e}", exc_info=True)
def add_single_config(self):
"""添加单条配置到配置列表"""
try:
# 获取输入框的值
user_id = self.add_user_id_input.text().strip()
index = self.add_index_input.text().strip()
topic = self.add_topic_input.text().strip()
schedule = self.add_schedule_input.text().strip()
interval_text = self.add_interval_input.text().strip()
expert_link = self.add_expert_link_input.text().strip()
executor = self.add_executor_input.text().strip()
# 验证必填字段
if not user_id:
self._show_infobar("warning", "警告", "请输入多多ID")
self.add_user_id_input.setFocus()
return
if not index:
self._show_infobar("warning", "警告", "请输入序号")
self.add_index_input.setFocus()
return
# 处理间隔时间
interval_value = ''
if interval_text:
try:
interval_value = int(float(interval_text))
except ValueError:
self._show_infobar("warning", "警告", "间隔时间必须是数字")
self.add_interval_input.setFocus()
return
# 创建配置对象
config = {
'勾选': False,
'多多id': user_id,
'序号': index,
'话题': topic,
'定时发布': schedule,
'间隔时间': interval_value,
'达人链接': expert_link,
'执行人': executor,
'情况': '待执行',
'文件路径': ''
}
# 添加到配置列表
self.configs.append(config)
# 更新表格显示
self.update_table()
# 显示表格
self.table_group.setVisible(True)
# 更新状态统计
self._update_status_statistics()
self.set_status_cards(update_text="未更新")
# 清空输入框
self.clear_add_config_inputs()
self._show_infobar("success", "成功", f"已添加配置: {user_id} - {index}")
except Exception as e:
error_msg = f"添加配置失败: {str(e)}"
self._show_infobar("error", "错误", error_msg)
logger.error(f"添加配置失败: {e}", exc_info=True)
def clear_add_config_inputs(self):
"""清空单条配置输入框"""
self.add_user_id_input.clear()
self.add_index_input.clear()
self.add_topic_input.clear()
self.add_schedule_input.clear()
self.add_interval_input.clear()
self.add_expert_link_input.clear()
self.add_executor_input.clear()
# 设置焦点到第一个输入框
self.add_user_id_input.setFocus()
def update_table(self, skip_sync=False):
"""更新配置表格。skip_sync=True 时跳过 表格→configs 同步(如刚由更新数据写入 configs 后刷新表格)。"""
if getattr(self, "config_table", None) is None:
return
if not self.use_model_view and not skip_sync:
self._sync_configs_from_table()
self.is_updating_table = True
# 使用筛选后的索引或全部索引进行分页(筛选在全量数据上计算,分页只切当前页)
effective_indices = self.filtered_config_indices if getattr(self, 'filtered_config_indices', None) is not None else list(range(len(self.configs)))
total_rows = len(effective_indices)
# 设置最小显示行数,即使没有数据也显示空行
min_display_rows = 15 # 最少显示15行
if len(self.configs) > 1000:
self._setup_model_view()
self.is_updating_table = False
return
self.use_model_view = False
if getattr(self, "table_view", None) is not None:
self.table_view.setVisible(False)
self.config_table.setVisible(True)
for btn in [getattr(self, "page_first_btn", None), getattr(self, "page_prev_btn", None),
getattr(self, "page_next_btn", None), getattr(self, "page_last_btn", None),
getattr(self, "page_size_combo", None)]:
if btn is not None:
btn.setEnabled(True)
total_pages = max(1, (total_rows + self.page_size - 1) // self.page_size)
if self.current_page > total_pages:
self.current_page = total_pages
start = (self.current_page - 1) * self.page_size
end = min(start + self.page_size, total_rows)
self.page_row_indices = list(effective_indices[start:end])
# 临时禁用排序,防止填充数据时自动排序打乱顺序
self.config_table.setSortingEnabled(False)
# 筛选时只显示当前页数据行,不预留空行,避免“仅显示匹配项”时仍看到多余行
is_filtered = getattr(self, 'filtered_config_indices', None) is not None
display_rows = len(self.page_row_indices) if is_filtered else max(len(self.page_row_indices), min_display_rows)
self.config_table.setRowCount(display_rows)
# 先统一设置行高,再填数据,避免后设行高导致勾选框列布局重算、出现“除第一行外往下移”
default_row_height = self.config_table.verticalHeader().defaultSectionSize()
for row in range(display_rows):
self.config_table.setRowHeight(row, default_row_height)
# 先清除第 0 列所有单元格控件,避免连续刷新时旧勾选框残留导致错位
for row in range(display_rows):
self.config_table.removeCellWidget(row, 0)
# 填充期间禁止表格重绘,避免中间布局导致勾选框错位
self.config_table.setUpdatesEnabled(False)
try:
# 填充数据行
for table_row, config_index in enumerate(self.page_row_indices):
config = self.configs[config_index]
# 第0列勾选框传入行高固定高度+居中对齐,避免第一页第二行等错位)
self._set_checkbox_item(table_row, config_index, default_row_height)
# 其他列的索引均+1
self.config_table.setItem(table_row, 1, self._create_centered_item(str(config.get('多多id', ''))))
self.config_table.setItem(table_row, 2, self._create_centered_item(str(config.get('序号', ''))))
self.config_table.setItem(table_row, 3, self._create_centered_item(str(config.get('话题', ''))))
self.config_table.setItem(table_row, 4, self._create_centered_item(str(config.get('定时发布', ''))))
# 间隔时间:保持原始状态,空值显示为空
interval_val = config.get('间隔时间', '')
if interval_val == '' or interval_val is None:
interval_display = ''
else:
interval_display = str(interval_val)
self.config_table.setItem(table_row, 5, self._create_centered_item(interval_display))
url_text = str(config.get('达人链接', ''))
url_item = self._create_centered_item(url_text)
url_item.setToolTip(url_text)
self.config_table.setItem(table_row, 6, url_item)
self.config_table.setItem(table_row, 7, self._create_centered_item(str(config.get('执行人', ''))))
self._set_status_item(table_row, str(config.get('情况', '待执行')))
file_path = str(config.get('文件路径', ''))
file_path_item = self._create_centered_item(file_path)
file_path_item.setToolTip(file_path)
self.config_table.setItem(table_row, 9, file_path_item)
self._set_progress_item(table_row, str(config.get('情况', '待执行')))
self._set_action_buttons(table_row, config_index)
finally:
self.config_table.setUpdatesEnabled(True)
# 强制表格立即完成布局,避免第一页第二行等勾选框错位
self.config_table.doItemsLayout()
# 重新启用排序功能(但不会自动排序已填充的数据)
self.config_table.setSortingEnabled(True)
# 固定列宽(不随内容自适应)
self._apply_table_column_widths()
# 更新状态卡片统计(只统计真实数据)
self._update_status_statistics()
if getattr(self, "table_empty_label", None) is not None:
self.table_empty_label.setVisible(False)
if getattr(self, "page_info_label", None) is not None:
self.page_info_label.setText(f"{self.current_page}/{total_pages}")
self.is_updating_table = False
self._update_checked_count()
def _setup_model_view(self):
"""切换到大数据量 Model/View 模式"""
headers = MODEL_VIEW_HEADERS
if self.table_model is None:
self.table_model = ConfigTableModel(self.configs, headers, self)
self.table_proxy = QSortFilterProxyModel(self)
self.table_proxy.setSourceModel(self.table_model)
self.table_proxy.setFilterKeyColumn(-1)
self.table_view.setModel(self.table_proxy)
self.table_delegate = TableActionDelegate(self.table_view, self._edit_row_from_view,
self._delete_row_from_view)
self.table_view.setItemDelegate(self.table_delegate)
header = self.table_view.horizontalHeader()
header.setStretchLastSection(True)
header.setSectionResizeMode(QHeaderView.Interactive)
# 连接列宽调整信号,用于跟踪手动调整和自动调整
header.sectionResized.connect(self.on_table_view_column_resized)
header.geometriesChanged.connect(self.on_table_view_geometry_changed)
self._apply_table_view_column_widths()
else:
self.table_model.update_data(self.configs)
self.use_model_view = True
self.table_view.setVisible(True)
self.config_table.setVisible(False)
if hasattr(self, "page_first_btn"):
for btn in [self.page_first_btn, self.page_prev_btn, self.page_next_btn, self.page_last_btn,
self.page_size_combo]:
btn.setEnabled(False)
if hasattr(self, "table_empty_label"):
self.table_empty_label.setVisible(len(self.configs) == 0)
if hasattr(self, "page_info_label"):
total_rows = len(self.configs)
self.page_info_label.setText(f"大数据模式:{total_rows}")
self._update_checked_count()
self._show_infobar("warning", "提示", "数据量较大已切换到Model/View模式部分功能受限")
def _edit_row_from_view(self, index):
"""Model/View 编辑行"""
if not index.isValid():
return
edit_index = index.sibling(index.row(), 0)
self.table_view.setCurrentIndex(edit_index)
self.table_view.edit(edit_index)
def _delete_row_from_view(self, index):
"""Model/View 删除行"""
if not index.isValid():
return
src_index = self.table_proxy.mapToSource(index)
if src_index.isValid():
self.table_model.removeRows(src_index.row(), 1)
def get_config_from_table(self, row_index=0):
"""从表格中获取指定行的配置数据(使用表格中修改后的值)"""
if row_index >= self.config_table.rowCount():
return None
def get_cell_text(row, col):
"""安全获取单元格文本"""
item = self.config_table.item(row, col)
return item.text().strip() if item else ''
def get_cell_int_or_empty(row, col):
"""安全获取单元格整数或空字符串"""
item = self.config_table.item(row, col)
if item and item.text().strip():
try:
return int(item.text().strip())
except ValueError:
return '' # 无效值返回空字符串
return '' # 空值返回空字符串
config = {
'多多id': get_cell_text(row_index, 1), # 第1列
'序号': get_cell_text(row_index, 2), # 第2列
'话题': get_cell_text(row_index, 3), # 第3列
'定时发布': get_cell_text(row_index, 4), # 第4列
'间隔时间': get_cell_int_or_empty(row_index, 5), # 第5列空值保持为空
'达人链接': get_cell_text(row_index, 6), # 第6列
'执行人': get_cell_text(row_index, 7), # 第7列
'情况': get_cell_text(row_index, 8) or '待执行', # 第8列
'文件路径': get_cell_text(row_index, 9) # 第9列是文件路径
}
return config
@staticmethod
def get_config():
"""获取当前配置数据已废弃配置从Excel导入"""
# 配置现在从Excel导入此方法返回None表示需要先导入Excel
return None
def update_data(self):
"""更新数据:找出文件并保存到各行的文件路径列"""
# 防止连续点击导致重入,避免勾选框错位
if getattr(self, '_update_data_running', False):
return
try:
self._update_data_running = True
if hasattr(self, 'update_data_btn'):
self.update_data_btn.setEnabled(False)
if not self.configs:
self._show_infobar("warning", "提示", "请先导入Excel配置文件")
return
# 获取文件夹路径
folder_path = self.folder_path_input.text().strip()
if not folder_path:
folder_path = get_default_folder_path()
if not os.path.exists(folder_path):
self._show_infobar("warning", "警告", f"文件夹路径不存在: {folder_path}")
return
self.log_text.append("=" * 50)
self.log_text.append("开始批量更新文件路径...")
self.log_text.append(f"共有 {len(self.configs)} 行配置需要处理")
self.log_text.append(f"搜索根目录: {folder_path} (结构: 根目录/多多ID/序号(-xxx).mp4 或 序号(-xxx)/")
total_found_files = 0
rows_with_files = 0
skip_no_id = 0
no_match_log = []
# 标记正在更新,防止触发信号
self.is_updating_table = True
# 遍历所有配置项进行更新
for config_idx, config in enumerate(self.configs):
user_id = str(config.get('多多id', '')).strip()
index = str(config.get('序号', '')).strip()
if not user_id or not index:
skip_no_id += 1
continue
found_files = self._find_files_for_config(config, folder_path)
if found_files:
file_paths_str = "; ".join([str(f['path']) for f in found_files])
config['文件路径'] = file_paths_str
rows_with_files += 1
total_found_files += len(found_files)
else:
config['文件路径'] = ""
no_match_log.append(f"多多ID={user_id} 序号={index}")
# 更新 UI 表格显示(跳过 syncconfigs 已由更新数据写入,勿用表格回写覆盖)
if self.use_model_view:
if self.table_model:
self.table_model.layoutChanged.emit()
else:
self.update_table(skip_sync=True)
# 结束更新
self.is_updating_table = False
if skip_no_id:
self.log_text.append(f"跳过 {skip_no_id}多多ID或序号为空")
if no_match_log:
self.log_text.append(f"未匹配到文件 {len(no_match_log)} 行:")
for s in no_match_log[:20]:
self.log_text.append(f" · {s}")
if len(no_match_log) > 20:
self.log_text.append(f" ... 还有 {len(no_match_log) - 20}")
total_rows = len(self.configs)
self.log_text.append("=" * 50)
self.log_text.append(f"更新完成!数据 {total_rows} 条,文件 {total_found_files} 个,匹配 {rows_with_files}")
status_msg = f"数据 {total_rows} 条,文件 {total_found_files} 个,匹配 {rows_with_files}"
self.update_status_label.setText(status_msg)
self.update_status_label.setStyleSheet("color: #4CAF50; font-size: 10px;")
self.set_status_cards(update_text=status_msg)
self._update_status_statistics()
# 更新映射
self._rebuild_row_map()
# 计算并应用间隔时间规则
self.log_text.append("=" * 50)
self.log_text.append("开始计算间隔时间并更新定时发布时间...")
configs_with_rows = []
for i, cfg in enumerate(self.configs):
configs_with_rows.append({"config_index": i, "config": cfg})
updated_count = self._apply_schedule_intervals(configs_with_rows)
if updated_count > 0:
self.log_text.append(f"✓ 已自动计算并更新 {updated_count} 行的定时发布时间")
if not self.use_model_view:
self.update_table(skip_sync=True)
elif self.table_model:
self.table_model.layoutChanged.emit()
self.log_text.append("=" * 50)
self._show_infobar("success", "成功",
f"数据 {total_rows} 条,文件 {total_found_files} 个,匹配 {rows_with_files}")
return
except Exception as e:
error_msg = f"更新数据失败: {str(e)}"
self.log_text.append(error_msg)
self._show_infobar("error", "错误", error_msg)
logger.error(f"更新数据失败: {e}")
import traceback
traceback.print_exc()
finally:
self._update_data_running = False
if hasattr(self, 'update_data_btn'):
self.update_data_btn.setEnabled(True)
@staticmethod
def _index_matches_name(index, name_or_stem):
"""序号与文件名/文件夹名是否匹配。支持:序号、序号.mp4、序号-xxx、01 与 1 数字等价。"""
if not name_or_stem or not index:
return False
s = str(name_or_stem).strip()
if s == index:
return True
if s.startswith(index + "-"):
return True
parts = s.split("-")
part0 = parts[0].strip() if parts else ""
if part0 == index:
return True
try:
if int(s) == int(index):
return True
if part0 and int(part0) == int(index):
return True
except (ValueError, TypeError):
pass
return False
def _find_files_for_config(self, config, folder_path):
"""根据配置查找文件(辅助方法)。目录结构:文件夹路径/多多ID/[(子目录)/] 序号(-xxx).mp4 或 序号(-xxx)/"""
found_files = []
if not config:
logger.warning("配置为空,无法查找文件")
return found_files
if not folder_path or not os.path.exists(folder_path):
logger.warning(f"文件夹路径无效或不存在: {folder_path}")
return found_files
index = str(config.get('序号', '')).strip()
if not index:
logger.warning("序号为空,无法查找文件")
return found_files
user_id = str(config.get('多多id', '')).strip()
if not user_id:
logger.warning("多多ID为空无法查找文件")
return found_files
video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm']
try:
if not os.access(folder_path, os.R_OK):
logger.error(f"没有权限读取文件夹: {folder_path}")
return found_files
try:
sub_dirs = [entry for entry in os.listdir(folder_path)
if os.path.isdir(os.path.join(folder_path, entry))]
except (PermissionError, OSError) as e:
logger.error(f"读取文件夹失败: {e}")
return found_files
target_subdir = None
for subdir_name in sub_dirs:
if subdir_name == user_id:
target_subdir = os.path.join(folder_path, subdir_name)
break
if not target_subdir or not os.path.exists(target_subdir):
return found_files
def append_file(fpath):
fpath_obj = Path(fpath)
if not fpath_obj.exists():
return
if fpath_obj.is_file():
if any(fpath_obj.suffix.lower() == ext for ext in video_extensions):
found_files.append({
"url": config.get('达人链接', ''),
"user_id": user_id,
"time_start": config.get('定时发布', '') if config.get('定时发布') else None,
"ht": config.get('话题', ''),
"index": index,
"path": fpath_obj
})
elif fpath_obj.is_dir():
found_files.append({
"url": config.get('达人链接', ''),
"user_id": user_id,
"time_start": config.get('定时发布', '') if config.get('定时发布') else None,
"ht": config.get('话题', ''),
"index": index,
"path": fpath_obj
})
search_dirs = [target_subdir]
try:
for sub in os.listdir(target_subdir):
p = os.path.join(target_subdir, sub)
if os.path.isdir(p):
search_dirs.append(p)
except (PermissionError, OSError):
pass
for search_dir in search_dirs:
try:
dir_items = os.listdir(search_dir)
except (PermissionError, OSError):
continue
for item_name in dir_items:
if item_name.startswith('.'):
continue
item_path = os.path.join(search_dir, item_name)
try:
path_obj = Path(item_path)
if path_obj.is_file():
base = path_obj.stem
else:
base = item_name
if not self._index_matches_name(index, base):
continue
append_file(item_path)
except (OSError, ValueError) as e:
logger.warning(f"处理 {item_name} 时出错: {e}")
continue
except (OSError, ValueError) as e:
logger.error(f"查找文件失败: {e}", exc_info=True)
return found_files
def execute_task(self):
"""执行任务"""
try:
# 检查是否有Excel导入的配置
if self.configs:
# 检查用户是否已经勾选了任何项
has_user_selection = any(cfg.get('勾选', False) for cfg in self.configs)
# 只有当用户没有勾选任何项时,才自动勾选失败项
if not has_user_selection:
failed_indices = []
for idx, cfg in enumerate(self.configs):
status = cfg.get('情况', '')
if "失败" in status or "错误" in status or "过期" in status or "跳过" in status:
failed_indices.append(idx)
if failed_indices:
for idx, cfg in enumerate(self.configs):
cfg['勾选'] = idx in failed_indices
self.update_table()
self._show_infobar("info", "提示", f"检测到 {len(failed_indices)} 条失败记录,已自动仅选择失败项")
# 如果有Excel配置批量处理
self.execute_batch_from_excel()
return # 批量处理完成后直接返回
# 没有Excel导入的配置提示用户先导入
self._show_infobar("warning", "提示", "请先导入Excel配置文件")
return
except Exception as e:
import traceback
error_detail = traceback.format_exc()
logger.error(f"执行任务失败: {e}\n{error_detail}")
try:
self._show_infobar("error", "错误", f"执行任务失败: {str(e)}")
self.log_text.append(f"执行任务失败: {str(e)}")
self.log_text.append(f"错误详情: {error_detail}")
except:
pass
self._set_task_buttons_state('idle')
self.progress_bar.setVisible(False)
def execute_batch_from_excel(self):
"""从 Excel配置批量执行自动判断相同多多ID的mp4文件批量上传"""
# 执行前强制同步当前页数据,确保手动修改的内容被包含
if not self.use_model_view:
self._sync_configs_from_table()
# 从表格当前页勾选框同步勾选状态到 configs确保执行的是用户当前勾选的行
self._sync_checked_from_table()
# 开始新任务时,从当前数据中统计已有的成功/失败/待执行数量(累计历史数据)
existing_success = 0
existing_failed = 0
existing_pending = 0
for config in self.configs:
status = config.get('情况', '待执行')
if "完成" in status or "成功" in status:
existing_success += 1
elif "失败" in status or "错误" in status or "过期" in status or "跳过" in status:
existing_failed += 1
existing_pending += 1 # 失败的也算待执行
elif "" in status or not status:
existing_pending += 1
self.batch_success_count = existing_success
self.batch_failed_count = existing_failed
self.set_status_cards(pending=existing_pending, running=0, success=existing_success, failed=existing_failed)
# 获取文件夹路径
folder_path = self.folder_path_input.text().strip()
if not folder_path:
folder_path = get_default_folder_path()
if not os.path.exists(folder_path):
self._show_infobar("warning", "警告", f"文件夹路径不存在: {folder_path}")
return
# 筛选勾选的待处理配置(直接从 self.configs 获取最新数据)
checked_indices = [i for i, cfg in enumerate(self.configs) if cfg.get('勾选', False)]
if not checked_indices:
self._show_infobar("warning", "提示", "请先勾选需要上传的数据行")
return
# 分析勾选项
all_configs_with_files = []
configs_to_process = [] # 用于计算间隔时间
for idx in checked_indices:
config = self.configs[idx]
# 跳过状态为「已完成」的任务(仅上传勾选且未完成的项)
status = (config.get('情况') or '').strip()
if '已完成' in status:
continue
# 验证必填
if not config.get('多多id') or not config.get('序号'):
continue
configs_to_process.append({"config_index": idx, "config": config})
if not configs_to_process:
self._show_infobar("warning", "提示", "所选项已完成或数据不完整")
return
# 重置所有要处理的配置状态为"待执行"(确保第二次运行时清除旧状态)
for item in configs_to_process:
idx = item["config_index"]
self.configs[idx]['情况'] = '待执行'
self._update_table_status(idx, "待执行", is_config_index=True)
# 重置任务状态后,重新统计成功/失败/待执行数量以保持一致性
existing_success = 0
existing_failed = 0
existing_pending = 0
for config in self.configs:
status = config.get('情况', '待执行')
if "完成" in status or "成功" in status:
existing_success += 1
elif "失败" in status or "错误" in status or "过期" in status or "跳过" in status:
existing_failed += 1
existing_pending += 1 # 失败的也算待执行
elif "" in status or not status:
existing_pending += 1
self.batch_success_count = existing_success
self.batch_failed_count = existing_failed
self.set_status_cards(pending=existing_pending, success=existing_success, failed=existing_failed)
# 注意:不再在此处调用 _apply_schedule_intervals
# 间隔时间计算已在"更新数据"按钮中完成,用户可能在之后手动修改了时间,应保留用户修改
self.log_text.append("=" * 50)
self.log_text.append(f"开始分析配置,准备批量上传...")
for item in configs_to_process:
idx = item["config_index"]
config = item["config"]
# 从已保存的文件路径中获取文件(由"更新数据"按钮写入)
file_path_str = config.get('文件路径', '').strip()
found_files = []
if file_path_str:
file_paths = [p.strip() for p in file_path_str.split(';') if p.strip()]
for fp in file_paths:
if os.path.exists(fp):
found_files.append({
"url": config.get('达人链接', ''),
"user_id": config.get('多多id', ''),
"time_start": config.get('定时发布', '') if config.get('定时发布') else None,
"ht": config.get('话题', ''),
"index": config.get('序号', ''),
"path": Path(fp)
})
else:
self.log_text.append(f" ⚠ 文件路径不存在,已跳过: {fp}")
if not found_files and not file_path_str:
self.log_text.append(f" 索引 {idx+1} ({config.get('多多id')}): 未找到文件路径,请先点击'更新数据'按钮")
if found_files:
all_configs_with_files.append({
'config': config,
'files': found_files,
'config_index': idx
})
self.log_text.append(f"索引 {idx+1} ({config.get('多多id')}): 准备上传 {len(found_files)} 个项目")
if not all_configs_with_files:
self._show_infobar("warning", "警告", "未找到任何匹配文件,请先点击‘更新数据’按钮")
return
total_tasks = sum(len(item['files']) for item in all_configs_with_files)
self.set_status_cards(pending=total_tasks)
self.set_running_progress(0, total_tasks)
# 按多多ID分组任务
from collections import defaultdict
grouped_by_user_id = defaultdict(list)
for item in all_configs_with_files:
user_id = item['config'].get('多多id', '')
grouped_by_user_id[user_id].append(item)
# 检查线程
if self.worker_thread and self.worker_thread.isRunning():
self._show_infobar("warning", "警告", "已有任务正在执行,请等待完成")
return
# 重置暂停/终止状态
self._is_paused = False
self._is_terminated = False
self._set_task_buttons_state('running')
self.progress_bar.setVisible(True)
self.progress_bar.setValue(0)
# 构建任务队列
self.batch_task_queue = []
self.current_batch_task_index = 0
self.batch_total_tasks = total_tasks
self.batch_processed = 0
self.batch_pending_tasks = total_tasks
video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm']
# 获取用户是否勾选了"批量上传"复选框
use_batch_upload = self.batch_upload_checkbox.isChecked()
self.log_text.append(f"批量上传模式: {'已勾选' if use_batch_upload else '未勾选'}")
# 获取最大批量数(单次上限数)
batch_limit = 5 # 默认值
try:
batch_limit_text = self.batch_limit_input.text().strip()
if batch_limit_text:
batch_limit = int(batch_limit_text)
if batch_limit < 1:
batch_limit = 1
except (ValueError, AttributeError):
batch_limit = 5
self.log_text.append(f"单次上限数: {batch_limit}")
def _skip_expired_and_append(file_info, file_index, group_items, _cfg_indices, into_videos, into_non_videos, file_is_video):
"""若定时过期则更新状态并返回 True失败(定时过期)),否则加入列表并返回 False"""
f_time_start = file_info.get('time_start', '')
if self._is_schedule_time_expired(f_time_start):
self.log_text.append(f" 跳过序号 {file_index}:定时发布时间已过期 ({f_time_start})")
expire_idx = next((it['config_index'] for it in group_items if it['config'].get('序号') == file_index), None)
if expire_idx is not None:
self._update_table_status(expire_idx, "失败(定时过期)", is_config_index=True)
self.configs[expire_idx]['情况'] = '失败(定时过期)'
self.batch_failed_count = getattr(self, "batch_failed_count", 0) + 1
self.set_status_cards(failed=self.batch_failed_count)
return True
if file_is_video:
into_videos.append(file_info)
else:
into_non_videos.append(file_info)
return False
# 按 config 顺序确定多多ID 的迭代顺序先出现的多多ID 先处理)
def _first_config_index(pair):
return min(it['config_index'] for it in pair[1])
sorted_groups = sorted(grouped_by_user_id.items(), key=_first_config_index)
for user_id, items in sorted_groups:
all_files = []
related_config_indices = []
for item in items:
all_files.extend(item['files'])
if item['config_index'] not in related_config_indices:
related_config_indices.append(item['config_index'])
def get_sort_key(file_entry):
try:
return int(file_entry.get('index', '0'))
except (ValueError, TypeError):
return 0
all_files_sorted = sorted(all_files, key=get_sort_key)
first_config = items[0].get('config', {})
if not use_batch_upload:
# 未勾选批量上传按序号一条一条上传先完成当前多多ID的 1,2,3,4 再处理下一个多多ID
i = 0
while i < len(all_files_sorted):
current_file = all_files_sorted[i]
is_video = current_file['path'].is_file() and any(
current_file['path'].suffix.lower() == ext for ext in video_extensions)
f_index = current_file.get('index', '')
if _skip_expired_and_append(current_file, f_index, items, related_config_indices, [], [], is_video):
i += 1
continue
matching_idx = next(
(it['config_index'] for it in items if it['config'].get('序号') == f_index),
related_config_indices[0] if related_config_indices else 0)
matching_config = next(
(it['config'] for it in items if it['config'].get('序号') == f_index),
first_config)
if is_video:
self.batch_task_queue.append({
'type': 'single_video',
'config': matching_config,
'files': [current_file],
'user_id': user_id,
'count': 1,
'config_indices': [matching_idx]
})
else:
self.batch_task_queue.append({
'type': 'image_folder',
'config': matching_config,
'files': [current_file],
'user_id': user_id,
'count': 1,
'config_indices': [matching_idx]
})
i += 1
else:
# 勾选批量上传严格按序号顺序处理。遇到图文立即上传第一次遇到视频时收集该多多ID下全部视频按上限分批上传再继续后续序号图文照常上传
ordered_list = [] # (f, is_video) 按序号顺序,已排除定时过期
for f in all_files_sorted:
is_video = f['path'].is_file() and any(
f['path'].suffix.lower() == ext for ext in video_extensions)
f_index = f.get('index', '')
if _skip_expired_and_append(f, f_index, items, related_config_indices, [], [], is_video):
continue
ordered_list.append((f, is_video))
videos_in_order = [entry for (entry, iv) in ordered_list if iv]
videos_emitted = False
for (f, is_video) in ordered_list:
f_index = f.get('index', '')
if not is_video:
matching_idx = next(
(it['config_index'] for it in items if it['config'].get('序号') == f_index),
related_config_indices[0])
matching_config = next(
(it['config'] for it in items if it['config'].get('序号') == f_index),
first_config)
self.batch_task_queue.append({
'type': 'image_folder',
'config': matching_config,
'files': [f],
'user_id': user_id,
'count': 1,
'config_indices': [matching_idx]
})
else:
if not videos_emitted:
for start in range(0, len(videos_in_order), batch_limit):
chunk = videos_in_order[start:start + batch_limit]
batch_config_indices = []
for fc in chunk:
fi = fc.get('index', '')
match_idx = next((it['config_index'] for it in items if it['config'].get('序号') == fi), None)
if match_idx is not None and match_idx not in batch_config_indices:
batch_config_indices.append(match_idx)
if len(chunk) > 1:
self.batch_task_queue.append({
'type': 'batch_video',
'config': first_config,
'files': chunk,
'user_id': user_id,
'count': len(chunk),
'config_indices': batch_config_indices
})
else:
vid_index = chunk[0].get('index', '')
matching_config = next(
(it['config'] for it in items if it['config'].get('序号') == vid_index),
first_config)
self.batch_task_queue.append({
'type': 'single_video',
'config': matching_config,
'files': chunk,
'user_id': user_id,
'count': 1,
'config_indices': batch_config_indices
})
videos_emitted = True
# 已在该序号前统一发过视频批次,此处跳过
if self.batch_task_queue:
self._process_next_batch_task()
else:
self._show_infobar("warning", "警告", "任务分析失败,未构建有效队列")
self._set_task_buttons_state('idle')
self.progress_bar.setVisible(False)
def _cleanup_worker_thread(self):
"""安全清理工作线程"""
if not self.worker_thread:
return
try:
# 如果线程正在运行,请求停止
if self.worker_thread.isRunning():
# 请求停止(优雅方式)
self.worker_thread.stop()
self.worker_thread.requestInterruption()
# 等待线程结束,设置合理超时
if not self.worker_thread.wait(2000):
logger.warning("工作线程未能在2秒内停止")
# 不使用 terminate(),继续处理
# 安全断开所有信号连接
try:
self.worker_thread.finished.disconnect()
except (TypeError, RuntimeError):
pass # 信号未连接或已断开
try:
self.worker_thread.log_message.disconnect()
except (TypeError, RuntimeError):
pass
try:
self.worker_thread.progress.disconnect()
except (TypeError, RuntimeError):
pass
try:
self.worker_thread.item_result.disconnect()
except (TypeError, RuntimeError):
pass
except Exception as e:
logger.warning(f"清理工作线程时出错: {e}")
finally:
self.worker_thread = None
def _set_task_buttons_state(self, state):
"""设置任务控制按钮的显示状态
state: 'idle' | 'running' | 'paused'
"""
if state == 'idle':
self.execute_btn.setEnabled(True)
self.pause_btn.setVisible(False)
self.resume_btn.setVisible(False)
self.terminate_btn.setVisible(False)
elif state == 'running':
self.execute_btn.setEnabled(False)
self.pause_btn.setVisible(True)
self.resume_btn.setVisible(False)
self.terminate_btn.setVisible(True)
elif state == 'paused':
self.execute_btn.setEnabled(False)
self.pause_btn.setVisible(False)
self.resume_btn.setVisible(True)
self.terminate_btn.setVisible(True)
def pause_task(self):
"""暂停任务:当前正在执行的任务会完成,但不再启动下一个"""
self._is_paused = True
self._set_task_buttons_state('paused')
self.log_text.append("=" * 50)
self.log_text.append("⏸ 任务已暂停,当前正在执行的任务完成后将停止。点击「继续」恢复执行。")
self._show_infobar("warning", "已暂停", "当前任务完成后将暂停,点击「继续」恢复")
def resume_task(self):
"""继续执行暂停的任务"""
self._is_paused = False
self._set_task_buttons_state('running')
self.log_text.append("=" * 50)
self.log_text.append("▶ 任务已恢复,继续执行剩余任务...")
self._show_infobar("success", "已恢复", "继续执行剩余任务")
# 继续处理下一个任务
self._process_next_batch_task()
def terminate_task(self):
"""终止所有任务"""
reply = QMessageBox.question(
self, "确认终止", "确定要终止所有任务吗?当前正在执行的任务将被中断。",
QMessageBox.Yes | QMessageBox.No
)
if reply != QMessageBox.Yes:
return
self._is_terminated = True
self._is_paused = False
self.log_text.append("=" * 50)
self.log_text.append("⏹ 正在终止任务...")
# 停止当前工作线程
self._cleanup_worker_thread()
# 将队列中所有未执行的任务标记为"已终止"
remaining = 0
for i in range(self.current_batch_task_index, len(self.batch_task_queue)):
task = self.batch_task_queue[i]
config_indices = task.get('config_indices', [])
for cfg_idx in config_indices:
try:
if self.configs[cfg_idx].get('情况', '') in ('待执行', '执行中'):
self.configs[cfg_idx]['情况'] = '已终止'
self._update_table_status(cfg_idx, "已终止", is_config_index=True)
remaining += 1
except (IndexError, KeyError):
pass
# 清空任务队列
self.batch_task_queue = []
self.current_batch_task_index = 0
# 重置 UI 状态
self._set_task_buttons_state('idle')
self.progress_bar.setVisible(False)
self._is_terminated = False
self.log_text.append(f"⏹ 任务已终止。{remaining} 个待执行任务被取消。")
self.log_text.append(f" 已完成: {self.batch_success_count} 成功, {self.batch_failed_count} 失败")
self._update_status_statistics()
self._show_infobar("warning", "已终止", f"任务已终止,{remaining} 个任务被取消")
def _process_next_batch_task(self):
"""处理任务队列中的下一个任务异步不阻塞GUI"""
# 检查是否还有任务
if self.current_batch_task_index >= len(self.batch_task_queue):
# 所有任务完成
self.progress_bar.setValue(100)
self.log_text.append("=" * 50)
self.log_text.append(f"所有任务执行完成!共处理 {self.batch_processed} 个文件/文件夹")
self._set_task_buttons_state('idle')
self.progress_bar.setVisible(False)
self.set_running_progress(0, 0)
# 更新状态统计(成功/失败/待执行数量)
self._update_status_statistics()
self._show_infobar("success", "任务完成", f"共处理 {self.batch_processed} 个文件/文件夹")
# 重置任务队列
self.batch_task_queue = []
self.current_batch_task_index = 0
return
# 获取当前任务
task = self.batch_task_queue[self.current_batch_task_index]
task_type = task['type']
config = task['config']
files = task['files']
_ = task['user_id'] # 仅提取以验证存在性
# 根据任务类型处理
try:
# 清理旧线程(如果存在)
self._cleanup_worker_thread()
# 验证 files 参数
if not files or not isinstance(files, list):
raise ValueError(f"无效的 files 参数: {files}")
# 确定是否为批量模式
is_batch = (task_type == 'batch_video' and len(files) > 1)
# 创建并启动工作线程
self.worker_thread = WorkerThread(config, is_batch, files, self)
# 连接信号
self.worker_thread.finished.connect(self._on_batch_task_finished)
self.worker_thread.log_message.connect(self.log_text.append)
self.worker_thread.progress.connect(self.progress_bar.setValue)
self.worker_thread.item_result.connect(self._on_worker_item_result)
# 记录任务信息
if task_type == 'batch_video':
self.log_text.append(f"\n开始批量上传 {len(files)} 个视频文件...")
elif task_type == 'single_video':
self.log_text.append(f"\n开始上传单个视频文件...")
elif task_type == 'image_folder':
idx = task.get('index', 0)
total = task.get('total', 0)
folder_index = task.get('folder_index', '')
self.log_text.append(f"\n开始上传第 {idx}/{total} 个图片文件夹(序号: {folder_index}...")
# 更新表格状态为"执行中"
try:
config_indices = task.get('config_indices', [])
if config_indices:
for cfg_idx in config_indices:
try:
self._update_table_status(cfg_idx, "执行中", is_config_index=True)
except Exception as e:
logger.warning(f"更新状态失败(索引{cfg_idx}: {e}")
except Exception as e:
logger.warning(f"更新执行中状态失败: {e}")
# 启动线程(不阻塞)
try:
self.worker_thread.start()
except Exception as e:
error_msg = f"启动线程失败: {str(e)}"
logger.error(error_msg)
self.log_text.append(f"{error_msg}")
raise
except Exception as e:
error_msg = f"启动任务失败: {str(e)}"
self.log_text.append(f"{error_msg}")
logger.error(error_msg, exc_info=True)
# 即使失败也继续下一个任务,立即更新表格为失败并增加失败条数
count = task.get('count', 1)
config_indices = task.get('config_indices', [])
for cfg_idx in config_indices:
try:
self._update_table_status(cfg_idx, "失败", is_config_index=True)
except Exception:
pass
self.batch_failed_count += len(config_indices) if config_indices else 1
self.batch_processed += count
self.batch_pending_tasks = max(self.batch_pending_tasks - count, 0)
self.set_status_cards(
pending=self.batch_pending_tasks,
success=getattr(self, 'batch_success_count', 0),
failed=getattr(self, 'batch_failed_count', 0)
)
self.set_running_progress(self.batch_processed, self.batch_total_tasks)
self.current_batch_task_index += 1
if not self._is_terminated and not self._is_paused:
QApplication.processEvents()
self._process_next_batch_task()
def _on_batch_task_finished(self, success, message):
"""批量任务完成回调不阻塞GUI"""
try:
# 获取当前任务(注意:此时任务已完成,但索引还未更新)
if self.current_batch_task_index < len(self.batch_task_queue):
task = self.batch_task_queue[self.current_batch_task_index]
task_type = task['type']
count = task.get('count', 1)
config_indices = task.get('config_indices', [])
# 更新进度日志
if success:
if task_type == 'batch_video':
self.log_text.append(f" ✓ 批量上传 {count} 个视频完成")
elif task_type == 'single_video':
self.log_text.append(f" ✓ 单个视频上传完成")
elif task_type == 'image_folder':
self.log_text.append(f" ✓ 图片文件夹上传完成")
else:
reason_text = f"失败原因: {message}" if message else "失败原因: 未知"
self.log_text.append(f" ✗ 任务失败: {reason_text}")
# 对于 batch_video 类型,状态和计数已经在 _on_worker_item_result 中实时更新了
# 所以这里不再重复更新,只需要处理 single_video 和 image_folder 类型
if task_type != 'batch_video':
if success:
# 更新表格为"已完成",并增加成功条数
for cfg_idx in config_indices:
self._update_table_status(cfg_idx, "已完成", is_config_index=True)
self.batch_success_count += len(config_indices)
else:
# 更新表格为"失败",并增加失败条数
for cfg_idx in config_indices:
self._update_table_status(cfg_idx, "失败", is_config_index=True)
self.batch_failed_count += len(config_indices)
# 更新统计single_video 和 image_folder 需要在这里更新)
self.batch_processed += count
self.batch_pending_tasks = max(self.batch_pending_tasks - count, 0)
self.set_status_cards(
pending=self.batch_pending_tasks,
success=self.batch_success_count,
failed=self.batch_failed_count
)
self.set_running_progress(self.batch_processed, self.batch_total_tasks)
# 移动到下一个任务
self.current_batch_task_index += 1
# 获取发布间隔时间(毫秒)
interval_ms = self._get_publish_interval_ms()
if interval_ms > 0 and self.current_batch_task_index < len(self.batch_task_queue):
self.log_text.append(f" 等待 {interval_ms // 1000} 秒后执行下一个任务...")
# 检查是否被终止
if self._is_terminated:
return
# 检查是否被暂停
if self._is_paused:
self.log_text.append("⏸ 当前任务已完成,等待用户点击「继续」...")
self._set_task_buttons_state('paused')
return
# 处理下一个任务使用QTimer延迟包含发布间隔时间
delay_ms = max(100, interval_ms) # 至少100ms确保GUI更新
QTimer.singleShot(delay_ms, self._process_next_batch_task)
except Exception as e:
logger.error(f"批量任务完成回调失败: {e}")
import traceback
traceback.print_exc()
# 即使出错也继续处理下一个任务(除非已暂停或终止)
if not self._is_terminated and not self._is_paused and self.current_batch_task_index < len(self.batch_task_queue):
self.current_batch_task_index += 1
QTimer.singleShot(100, self._process_next_batch_task)
def _get_publish_interval_ms(self):
"""获取发布间隔时间(毫秒)"""
try:
if not hasattr(self, 'publish_interval_input'):
return 0
value_str = self.publish_interval_input.text().strip().lower()
if not value_str or value_str == '0':
return 0
# 解析时间格式:数字(秒)、30s(秒)、2m(分钟)
if value_str.endswith('m') or value_str.endswith('分钟'):
# 分钟
num_str = value_str.replace('m', '').replace('分钟', '').strip()
return int(float(num_str) * 60 * 1000)
elif value_str.endswith('s') or value_str.endswith(''):
# 秒
num_str = value_str.replace('s', '').replace('', '').strip()
return int(float(num_str) * 1000)
else:
# 默认为秒
return int(float(value_str) * 1000)
except (ValueError, TypeError):
return 0
def _rebuild_row_map(self):
"""重建 多多ID+序号 -> 表格行号 映射(仅针对当前页可见行)"""
m = {}
try:
if not hasattr(self, "config_table") or self.use_model_view:
self._row_map_by_user_index = {}
return
for r in range(self.config_table.rowCount()):
# 多多ID在第1列序号在第2列第0列是勾选框
user_id = self.get_cell_text(r, 1)
idx = self.get_cell_text(r, 2)
if user_id and idx:
m[(user_id, idx)] = r
except Exception:
pass
self._row_map_by_user_index = m
def _on_worker_item_result(self, payload):
"""接收自动化侧逐条结果 - 仅用于 batch_video 类型任务的实时状态更新"""
try:
if not isinstance(payload, dict):
return
user_id = str(payload.get("user_id", "")).strip()
idx = str(payload.get("index", "")).strip()
ok = bool(payload.get("ok"))
reason = str(payload.get("reason", "")).strip()
name = str(payload.get("name", "")).strip()
if not user_id:
return
# 检查当前任务类型,只有 batch_video 类型才在这里更新计数
# single_video 和 image_folder 类型在 _on_batch_task_finished 中更新
is_batch_video_task = False
if hasattr(self, 'current_batch_task_index') and self.current_batch_task_index < len(self.batch_task_queue):
current_task = self.batch_task_queue[self.current_batch_task_index]
is_batch_video_task = current_task.get('type') == 'batch_video'
# 全局搜索匹配的多多ID和序号
found_config_idx = -1
updated_count = 0 # 记录更新了多少条配置
if idx: # 如果有序号优先通过多多ID和序号匹配
for i, cfg in enumerate(self.configs):
if str(cfg.get("多多id")) == user_id and str(cfg.get("序号")) == idx:
found_config_idx = i
break
# 如果通过序号没找到尝试通过多多ID匹配
if found_config_idx == -1:
# 查找所有匹配多多ID的配置
matching_indices = []
for i, cfg in enumerate(self.configs):
if str(cfg.get("多多id")) == user_id:
matching_indices.append(i)
# 如果只找到一个匹配的配置,直接使用它
if len(matching_indices) == 1:
found_config_idx = matching_indices[0]
# 如果有多个匹配的配置,且是批量视频任务,更新相关配置的状态
elif len(matching_indices) > 1 and is_batch_video_task:
current_task = self.batch_task_queue[self.current_batch_task_index]
task_config_indices = current_task.get('config_indices', [])
# 只更新当前任务相关的配置状态
for cfg_idx in task_config_indices:
if cfg_idx in matching_indices:
self._update_table_status(cfg_idx, "已完成" if ok else "失败", is_config_index=True)
updated_count += 1
found_config_idx = -2 # 标记为已处理(多个配置)
if found_config_idx >= 0:
# 单个配置匹配成功,更新表格状态
self._update_table_status(found_config_idx, "已完成" if ok else "失败", is_config_index=True)
updated_count = 1
label = name if name else payload.get("path", "") or ""
reason_text = f"失败原因: {reason}" if (not ok and reason) else reason
self.log_text.append(f"[结果] {user_id}-{idx}: {'' if ok else ''} {label} {reason_text}")
elif found_config_idx == -2:
# 多个配置已处理,只记录日志
label = name if name else payload.get("path", "") or ""
reason_text = f"失败原因: {reason}" if (not ok and reason) else reason
self.log_text.append(f"[结果] {user_id}-{idx}: {'' if ok else ''} {label} {reason_text}")
else:
reason_text = f"失败原因: {reason}" if (not ok and reason) else reason
self.log_text.append(f"[结果] {user_id}-{idx}: ok={ok} {reason_text} (未在列表中找到匹配项)")
# 只有 batch_video 类型任务才在这里更新计数(实时反馈)
# single_video 和 image_folder 在 _on_batch_task_finished 中更新,避免重复计数
if is_batch_video_task and updated_count > 0:
if ok:
self.batch_success_count += updated_count
else:
self.batch_failed_count += updated_count
# 更新待处理数量
self.batch_pending_tasks = max(self.batch_pending_tasks - updated_count, 0)
# 立即刷新状态卡片显示
self.set_status_cards(
pending=self.batch_pending_tasks,
success=self.batch_success_count,
failed=self.batch_failed_count
)
# 更新进度
self.batch_processed += updated_count
self.set_running_progress(self.batch_processed, self.batch_total_tasks)
except Exception as e:
logger.warning(f"处理单条结果失败: {e}")
@staticmethod
def count_videos_in_folder(folder_path, index):
"""统计文件夹中匹配序号的视频文件数量与main.py逻辑一致"""
count = 0
try:
# 遍历最外层文件夹下的所有子文件夹与main.py逻辑一致
for file in os.listdir(folder_path): # 获取文件夹下所有的文件夹
file_path = os.path.join(folder_path, file) # 拼接文件夹
# 检查是否为目录,跳过文件(如.lnk快捷方式
if not os.path.isdir(file_path):
continue
files = os.listdir(file_path) # 获取用户id下的文件
for file_name in files:
# 检查是否是视频文件(.mp4
if ".mp4" in file_name:
# 用"-"分割文件名,检查第一部分是否等于序号
# 文件名格式4-茶叶蛋大冒险-.mp4 -> ['4', '茶叶蛋大冒险', '', 'mp4']
file_names = file_name.split("-")
if len(file_names) > 0 and file_names[0] == str(index):
path = Path(os.path.join(file_path, file_name))
# 判断是否为文件
if path.is_file():
count += 1
else:
# 如果是文件夹,统计其中的文件
for sub_file in os.listdir(path):
sub_path = Path(os.path.join(path, sub_file))
if sub_path.is_file():
count += 1
except (OSError, PermissionError) as e:
logger.error(f"统计视频文件失败: {e}")
return count
def on_task_finished(self, success, message):
"""任务完成回调(单个任务)"""
self.progress_bar.setValue(100)
self.set_running_progress(0, 0)
if success:
self._show_infobar("success", "成功", message)
# 更新表格状态为"已完成"
# 查找对应的行(通过配置信息)
self._update_single_task_status(success)
else:
self._show_infobar("error", "失败", message)
# 更新表格状态为"失败"
self._update_single_task_status(success)
# 更新状态统计(成功/失败/待执行数量)
self._update_status_statistics()
# 恢复按钮
self._set_task_buttons_state('idle')
self.log_text.append(f"任务完成: {message}")
self.log_text.append("=" * 50)
def _update_single_task_status_start(self, config):
"""更新单个任务的状态为"执行中"(根据配置查找对应的表格行)"""
try:
# 检查是否有表格
if not hasattr(self, 'config_table') or self.config_table.rowCount() == 0:
return
user_id = config.get('多多id', '')
index = config.get('序号', '')
if not user_id or not index:
return
# 在表格中查找匹配的行
for row_idx in range(self.config_table.rowCount()):
try:
row_user_id = self.get_cell_text(row_idx, 0)
row_index = self.get_cell_text(row_idx, 1)
if row_user_id == user_id and row_index == index:
self._update_table_status(row_idx, "执行中")
break
except Exception as e:
logger.warning(f"更新状态时出错(行{row_idx}: {e}")
continue
except Exception as e:
logger.error(f"更新任务状态失败: {e}")
import traceback
traceback.print_exc()
def _update_single_task_status(self, success):
"""更新单个任务的状态(根据当前配置查找对应的表格行)"""
try:
# 检查是否有表格
if not hasattr(self, 'config_table') or self.config_table.rowCount() == 0:
return
# 获取当前配置从worker_thread中获取
if not self.worker_thread:
return
config = self.worker_thread.config_data
user_id = config.get('多多id', '')
index = config.get('序号', '')
if not user_id or not index:
return
# 在表格中查找匹配的行
for row_idx in range(self.config_table.rowCount()):
try:
row_user_id = self.get_cell_text(row_idx, 0)
row_index = self.get_cell_text(row_idx, 1)
if row_user_id == user_id and row_index == index:
status = "已完成" if success else "失败"
self._update_table_status(row_idx, status)
break
except Exception as e:
logger.warning(f"更新状态时出错(行{row_idx}: {e}")
continue
except Exception as e:
logger.error(f"更新任务状态失败: {e}")
import traceback
traceback.print_exc()
def get_cell_text(self, row, col):
"""获取表格单元格文本"""
try:
if not hasattr(self, 'config_table'):
return ""
if row < 0 or row >= self.config_table.rowCount():
return ""
item = self.config_table.item(row, col)
return item.text().strip() if item else ""
except Exception as e:
logger.warning(f"获取单元格文本失败(行{row},列{col}: {e}")
return ""
def closeEvent(self, event):
"""关闭事件"""
if self._is_closing:
event.accept()
return
if self.worker_thread and self.worker_thread.isRunning():
reply = QMessageBox.question(
self, "确认", "任务正在执行中,确定要退出吗?",
QMessageBox.Yes | QMessageBox.No
)
if reply == QMessageBox.Yes:
self._is_closing = True
# 使用统一的线程清理方法
self._cleanup_worker_thread()
event.accept()
else:
event.ignore()
else:
event.accept()
def exception_handler(exc_type, exc_value, exc_traceback):
"""全局异常处理器"""
if issubclass(exc_type, KeyboardInterrupt):
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
# 防止递归:如果异常处理器本身出错,使用默认处理器
if exc_type is RecursionError:
# 对于递归错误,直接打印到控制台,避免再次触发递归
print(f"递归错误: {exc_value}")
print("这通常是由于无限递归调用导致的")
return
try:
import traceback
# 限制堆栈深度,避免递归错误
error_msg = "".join(traceback.format_exception(exc_type, exc_value, exc_traceback, limit=50))
logger.critical(f"未捕获的异常:\n{error_msg}")
# 尝试显示错误对话框
try:
from PyQt5.QtWidgets import QMessageBox
msg = QMessageBox()
msg.setIcon(QMessageBox.Critical)
msg.setWindowTitle("程序错误")
msg.setText("程序发生未处理的错误")
# 限制详细文本长度
detailed_text = error_msg[:2000] if len(error_msg) > 2000 else error_msg
msg.setDetailedText(detailed_text)
msg.exec_()
except:
# 如果无法显示对话框,至少打印到控制台
print(f"未捕获的异常:\n{error_msg[:1000]}") # 限制输出长度
except Exception:
# 如果异常处理器本身出错,使用默认处理器
sys.__excepthook__(exc_type, exc_value, exc_traceback)
def main():
# 设置全局异常处理器
sys.excepthook = exception_handler
app = QApplication(sys.argv)
# noinspection SpellCheckingInspection
# 解决 macOS 上缺失 Segoe UI 字体导致的警告
if sys.platform == "darwin":
font = QFont(".AppleSystemUIFont", 10)
font.setFamilies([".AppleSystemUIFont", "PingFang SC", "Helvetica Neue", "Arial"])
app.setFont(font)
setTheme(Theme.LIGHT)
try:
window = MainWindow()
window.show()
sys.exit(app.exec_())
except Exception as e:
import traceback
error_detail = traceback.format_exc()
logger.critical(f"程序启动失败: {e}\n{error_detail}")
print(f"程序启动失败: {e}\n{error_detail}")
sys.exit(1)
if __name__ == '__main__':
main()