Files
haha/config_gui.py
2026-01-16 12:46:29 +08:00

1026 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import os
import time
import json
import traceback
from datetime import datetime, timedelta
from pathlib import Path
import pandas as pd
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QHBoxLayout, QLabel, QLineEdit, QPushButton,
QTextEdit, QFileDialog, QMessageBox, QTableWidget,
QTableWidgetItem, QCheckBox, QSpinBox, QDateTimeEdit,
QGroupBox, QProgressBar, QComboBox, QHeaderView,
QAbstractItemView, QMenu, QAction, QDialog, QFormLayout,
QDialogButtonBox, QSpinBox, QFileSystemModel, QTreeView)
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QDateTime, QTimer
from PyQt5.QtGui import QFont, QColor, QIcon
from main import Pdd
class PublishThread(QThread):
    """Worker thread that runs the batch publishing job off the GUI thread.

    ``configs`` is a list of dicts. ``run`` reads the keys '用户ID', '文件路径',
    '话题', '定时发布', '间隔时间', '数量' and '达人链接' from each of them. For
    every config the first N sub-folders (sorted by path, N = '数量') of
    '文件路径' are handed one at a time to ``Pdd(...).action(folder_path=...)``.

    Signals:
        progress(str): one log line for the GUI.
        finished(bool, str): emitted once when ``run`` ends; the flag is True
            only when nothing failed, the string is a summary message.
        update_progress(int, int): processed folder count, expected total.
    """

    progress = pyqtSignal(str)
    # NOTE(review): this name appears to shadow QThread's own no-argument
    # ``finished`` signal. It is kept because the GUI connects to it by name.
    finished = pyqtSignal(bool, str)
    update_progress = pyqtSignal(int, int)  # current progress, total

    # Topic separators, tried in priority order: em dash (U+2014),
    # fullwidth hyphen-minus (U+FF0D), ASCII hyphen. Written as escapes so the
    # characters cannot be lost or confused with look-alikes.
    _TOPIC_SEPARATORS = ('\u2014', '\uff0d', '-')
    _TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, configs):
        """Store the list of config dicts to process."""
        super().__init__()
        self.configs = configs
        self.is_running = True

    def stop(self):
        """Ask ``run`` to stop; the flag is polled between folders and waits."""
        self.is_running = False

    @staticmethod
    def _list_subfolders(folder_path):
        """Return full paths of the immediate sub-directories of *folder_path*."""
        return [
            os.path.join(folder_path, name)
            for name in os.listdir(folder_path)
            if os.path.isdir(os.path.join(folder_path, name))
        ]

    @classmethod
    def _format_topics(cls, topics):
        """Turn a separator-delimited topic string into '#a #b\\n'.

        Empty input (or the text 'nan') yields just a newline.
        """
        text = '' if topics is None else str(topics).strip()
        if not text or text == 'nan':
            return "\n"
        for separator in cls._TOPIC_SEPARATORS:
            if separator in text:
                parts = [part.strip() for part in text.split(separator)]
                break
        else:
            parts = [text]
        # '#' before every topic, space separated, trailing newline.
        return ' '.join(f'#{part}' for part in parts if part) + "\n"

    def _count_total_folders(self):
        """Estimate how many folders the whole job will process (best effort)."""
        total_folders = 0
        for config in self.configs:
            try:
                folder_path = str(config['文件路径']).strip()
                if os.path.exists(folder_path):
                    quantity = max(int(config.get('数量', 1)), 0)
                    total_folders += min(len(self._list_subfolders(folder_path)), quantity)
            except (KeyError, TypeError, ValueError, OSError):
                # Only used for the progress bar; bad configs are reported
                # properly when they are processed in ``run``.
                continue
        return total_folders

    def _publish_time_for(self, time_start, interval, folder_idx):
        """Return the scheduled time string for the folder at *folder_idx*.

        The first folder uses *time_start* as given; later ones are offset by
        ``interval * folder_idx`` minutes. Raises ValueError when an offset is
        needed and *time_start* does not match ``_TIME_FORMAT``.
        """
        if folder_idx == 0:
            return time_start
        base_time = datetime.strptime(time_start, self._TIME_FORMAT)
        shifted = base_time + timedelta(minutes=interval * folder_idx)
        return shifted.strftime(self._TIME_FORMAT)

    def _sleep_interruptibly(self, seconds):
        """Sleep up to *seconds* in one-second steps, returning early on stop."""
        for _ in range(seconds):
            if not self.is_running:
                break
            time.sleep(1)

    def run(self):
        """Process every config in order and emit ``finished`` with a summary."""
        try:
            total = len(self.configs)
            success_count = 0
            fail_count = 0
            processed_folders = 0
            total_folders = self._count_total_folders()

            for idx, config in enumerate(self.configs):
                if not self.is_running:
                    self.progress.emit("任务已停止")
                    break
                try:
                    # Validate / normalise the config values.
                    user_id = str(config['用户ID']).strip()
                    if not user_id:
                        self.progress.emit(f"✗ 配置 {idx+1}: 用户ID为空")
                        fail_count += 1
                        continue
                    raw_time = config.get('定时发布', '')
                    time_start = '' if raw_time is None else str(raw_time).strip()
                    interval = int(config.get('间隔时间', 30))  # minutes
                    folder_path = str(config['文件路径']).strip()
                    # A negative count would otherwise slice from the end.
                    quantity = max(int(config.get('数量', 1)), 0)
                    url = str(config.get('达人链接', '')).strip()
                    if not folder_path:
                        self.progress.emit(f"✗ 配置 {idx+1}: 文件路径为空")
                        fail_count += 1
                        continue

                    ht = self._format_topics(config.get('话题', ''))

                    # Collect the sub-folders to publish.
                    if not os.path.exists(folder_path):
                        self.progress.emit(f"✗ 配置 {idx+1}: 文件路径不存在: {folder_path}")
                        fail_count += 1
                        continue
                    try:
                        folders = self._list_subfolders(folder_path)
                    except PermissionError:
                        self.progress.emit(f"✗ 配置 {idx+1}: 无权限访问文件夹: {folder_path}")
                        fail_count += 1
                        continue
                    except Exception as e:
                        self.progress.emit(f"✗ 配置 {idx+1}: 读取文件夹失败: {e}")
                        fail_count += 1
                        continue
                    folders = sorted(folders)[:quantity]
                    if not folders:
                        self.progress.emit(f"✗ 配置 {idx+1}: 未找到文件夹")
                        fail_count += 1
                        continue

                    for folder_idx, folder_path_item in enumerate(folders):
                        if not self.is_running:
                            break
                        folder_name = os.path.basename(folder_path_item)
                        title = folder_name

                        # Work out the scheduled publish time, if any.
                        publish_time = None
                        if time_start and time_start != 'nan':
                            try:
                                publish_time = self._publish_time_for(time_start, interval, folder_idx)
                            except ValueError as e:
                                self.progress.emit(f"⚠ 配置 {idx+1}: 时间格式错误,将不设置定时发布: {e}")
                                publish_time = None

                        self.progress.emit(
                            f"配置 {idx+1}/{total} - 文件夹 {folder_idx+1}/{len(folders)}: {folder_name}"
                        )
                        try:
                            pdd = Pdd(
                                url=url,  # influencer link, used to bind the task
                                user_id=user_id,
                                time_start=publish_time,
                                title=title,
                                ht=ht
                            )
                            pdd.action(folder_path=folder_path_item)
                            self.progress.emit(f"✓ 配置 {idx+1} - {folder_name}: 发布成功")
                            success_count += 1
                        except Exception as e:
                            self.progress.emit(f"✗ 配置 {idx+1} - {folder_name}: 发布失败 - {e}")
                            fail_count += 1
                        processed_folders += 1
                        self.update_progress.emit(processed_folders, total_folders)

                        # Wait between folders, but not after the last one.
                        if folder_idx < len(folders) - 1 and self.is_running:
                            self.progress.emit(f"等待 {interval} 分钟...")
                            self._sleep_interruptibly(interval * 60)

                    # Short pause between configs (5 seconds, stop-aware).
                    if idx < total - 1 and self.is_running:
                        self._sleep_interruptibly(5)
                except Exception as e:
                    error_detail = traceback.format_exc()
                    self.progress.emit(f"✗ 配置 {idx+1} 处理出错: {e}")
                    self.progress.emit(f"错误详情: {error_detail}")
                    fail_count += 1

            result_msg = f"完成! 成功: {success_count}, 失败: {fail_count}, 总计: {total}"
            self.finished.emit(fail_count == 0, result_msg)
        except Exception as e:
            error_detail = traceback.format_exc()
            self.progress.emit(f"执行出错: {e}")
            self.progress.emit(f"错误详情: {error_detail}")
            self.finished.emit(False, f"执行出错: {e}")
class ConfigGUI(QMainWindow):
    """Main window: edit a table of publish configs and run them in a thread.

    Table columns: 0 用户ID, 1 文件路径, 2 话题, 3 定时发布 (a QDateTimeEdit
    cell widget), 4 间隔时间, 5 达人链接, 6 数量, 7 情况, 8 状态 (hidden).
    The table content is periodically written to ``self.config_file`` as JSON
    and offered for restore on the next start.
    """

    _QT_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss"
    _TOPIC_HEADER = '话题(以中文"-"分隔)'

    def __init__(self):
        """Build the UI, then offer to restore the auto-saved configs."""
        super().__init__()
        self.configs = []  # config list
        self.publish_thread = None
        self.config_file = "config_backup.json"  # auto-save file
        self.init_ui()
        self.load_auto_save()

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------
    def _make_button(self, text, slot, style, tooltip):
        """Create a styled push button whose ``clicked`` is wired to *slot*."""
        button = QPushButton(text)
        button.clicked.connect(slot)
        button.setStyleSheet(style)
        button.setToolTip(tooltip)
        return button

    def init_ui(self):
        """Create the toolbar, config table, progress bar, log and save timer."""
        self.setWindowTitle('发布配置工具 - 批量版 v2.0')
        self.setGeometry(100, 100, 1500, 950)

        main_widget = QWidget()
        self.setCentralWidget(main_widget)
        main_layout = QVBoxLayout(main_widget)

        # Title
        title_label = QLabel('发布配置工具 - 批量版 v2.0')
        title_label.setFont(QFont('Arial', 16, QFont.Bold))
        title_label.setAlignment(Qt.AlignCenter)
        main_layout.addWidget(title_label)

        # Toolbar
        toolbar_layout = QHBoxLayout()
        left_buttons = [
            ('导入Excel配置', self.import_config,
             "QPushButton { background-color: #4CAF50; color: white; padding: 8px; }",
             '从Excel文件导入配置'),
            ('添加配置', self.add_config,
             "QPushButton { background-color: #2196F3; color: white; padding: 8px; }",
             '添加一条新配置'),
            ('删除选中', self.delete_selected,
             "QPushButton { background-color: #f44336; color: white; padding: 8px; }",
             '删除选中的配置行'),
            ('复制选中', self.duplicate_selected,
             "QPushButton { background-color: #FF9800; color: white; padding: 8px; }",
             '复制选中的配置行'),
            ('导出Excel', self.export_config,
             "QPushButton { background-color: #FF9800; color: white; padding: 8px; }",
             '导出配置到Excel文件'),
            ('清空配置', self.clear_all,
             "QPushButton { background-color: #9E9E9E; color: white; padding: 8px; }",
             '清空所有配置'),
        ]
        for text, slot, style, tooltip in left_buttons:
            toolbar_layout.addWidget(self._make_button(text, slot, style, tooltip))
        toolbar_layout.addStretch()
        toolbar_layout.addWidget(self._make_button(
            '验证配置', self.validate_configs,
            "QPushButton { background-color: #00BCD4; color: white; padding: 8px; }",
            '验证所有配置的有效性'))
        toolbar_layout.addWidget(self._make_button(
            '开始批量发布', self.start_publish,
            "QPushButton { background-color: #9C27B0; color: white; padding: 10px; font-weight: bold; }",
            '开始执行批量发布任务'))
        self.stop_btn = self._make_button(
            '停止', self.stop_publish,
            "QPushButton { background-color: #757575; color: white; padding: 8px; }",
            '停止当前执行的任务')
        self.stop_btn.setEnabled(False)  # only enabled while a job runs
        toolbar_layout.addWidget(self.stop_btn)
        main_layout.addLayout(toolbar_layout)

        # Config table
        table_group = QGroupBox('配置列表')
        table_layout = QVBoxLayout()
        stats_layout = QHBoxLayout()
        self.stats_label = QLabel('总计: 0 条配置')
        self.stats_label.setStyleSheet("QLabel { color: #666; font-weight: bold; }")
        stats_layout.addWidget(self.stats_label)
        stats_layout.addStretch()
        table_layout.addLayout(stats_layout)

        self.table = QTableWidget()
        self.table.setColumnCount(9)  # 8 visible columns + 1 hidden status column
        self.table.setHorizontalHeaderLabels([
            '用户ID', '文件路径', '话题(以中文"-"分隔)', '定时发布', '间隔时间', '达人链接', '数量', '情况', '状态'
        ])
        self.table.setColumnHidden(8, True)
        self.table.setSelectionBehavior(QAbstractItemView.SelectRows)
        self.table.setSelectionMode(QAbstractItemView.ExtendedSelection)
        self.table.setEditTriggers(QAbstractItemView.DoubleClicked | QAbstractItemView.SelectedClicked)
        self.table.horizontalHeader().setStretchLastSection(True)
        self.table.horizontalHeader().setSectionResizeMode(QHeaderView.Interactive)
        self.table.setAlternatingRowColors(True)
        # Double click: folder picker (column 1) / date-time editor (column 3).
        self.table.cellDoubleClicked.connect(self.on_cell_double_clicked)
        self.table.itemChanged.connect(self.on_item_changed)
        # Column widths, in column order 0..7.
        for column, width in enumerate((100, 300, 250, 150, 100, 350, 60, 100)):
            self.table.setColumnWidth(column, width)
        self.table.setContextMenuPolicy(Qt.CustomContextMenu)
        self.table.customContextMenuRequested.connect(self.show_context_menu)
        table_layout.addWidget(self.table)
        table_group.setLayout(table_layout)
        main_layout.addWidget(table_group)

        # Progress bar
        progress_layout = QHBoxLayout()
        progress_layout.addWidget(QLabel('进度:'))
        self.progress_bar = QProgressBar()
        self.progress_bar.setVisible(False)
        self.progress_bar.setFormat("%p% (%v/%m)")
        progress_layout.addWidget(self.progress_bar)
        main_layout.addLayout(progress_layout)

        # Log output
        log_group = QGroupBox('执行日志')
        log_layout = QVBoxLayout()
        log_toolbar = QHBoxLayout()
        clear_log_btn = QPushButton('清空日志')
        clear_log_btn.clicked.connect(self.clear_log)
        clear_log_btn.setStyleSheet("QPushButton { padding: 4px; }")
        log_toolbar.addWidget(clear_log_btn)
        log_toolbar.addStretch()
        log_layout.addLayout(log_toolbar)
        self.log_output = QTextEdit()
        self.log_output.setReadOnly(True)
        self.log_output.setMaximumHeight(200)
        log_layout.addWidget(self.log_output)
        log_group.setLayout(log_layout)
        main_layout.addWidget(log_group)

        # Auto-save every 30 seconds.
        self.auto_save_timer = QTimer(self)
        self.auto_save_timer.timeout.connect(self.auto_save)
        self.auto_save_timer.start(30000)

    def show_context_menu(self, position):
        """Show the table's right-click menu at *position* (viewport coords)."""
        menu = QMenu(self)
        edit_action = QAction('编辑文件路径', self)
        edit_action.triggered.connect(self.edit_selected)
        menu.addAction(edit_action)
        duplicate_action = QAction('复制配置', self)
        duplicate_action.triggered.connect(self.duplicate_selected)
        menu.addAction(duplicate_action)
        menu.addSeparator()
        delete_action = QAction('删除', self)
        delete_action.triggered.connect(self.delete_selected)
        menu.addAction(delete_action)
        menu.exec_(self.table.viewport().mapToGlobal(position))

    # ------------------------------------------------------------------
    # Row helpers
    # ------------------------------------------------------------------
    def create_datetime_editor(self, datetime_str=None):
        """Create a date-time editor for the 定时发布 column.

        The editor defaults to "now + 1 day" and does not allow times before
        now. *datetime_str* ("yyyy-MM-dd HH:mm:ss") is applied only when it
        parses and is not in the past; otherwise the default is kept.
        """
        editor = QDateTimeEdit()
        editor.setCalendarPopup(True)
        editor.setDisplayFormat(self._QT_TIME_FORMAT)
        now = QDateTime.currentDateTime()
        editor.setDateTime(now.addDays(1))
        editor.setMinimumDateTime(now)
        text = '' if datetime_str is None else str(datetime_str).strip()
        if text:
            parsed = QDateTime.fromString(text, self._QT_TIME_FORMAT)
            if parsed.isValid() and parsed >= now:
                editor.setDateTime(parsed)
        return editor

    def _selected_rows(self):
        """Return the set of row indexes that have at least one selected item."""
        return {item.row() for item in self.table.selectedItems()}

    def _item_text(self, row, column, default=''):
        """Return the text of the item at (row, column), or *default* if absent."""
        item = self.table.item(row, column)
        return item.text() if item else default

    def _time_text(self, row):
        """Return the 定时发布 value of *row* as text.

        Reads the QDateTimeEdit cell widget when present, otherwise falls back
        to a plain text item (older data), otherwise ''.
        """
        widget = self.table.cellWidget(row, 3)
        if widget and isinstance(widget, QDateTimeEdit):
            return widget.dateTime().toString(self._QT_TIME_FORMAT)
        return self._item_text(row, 3)

    def _row_values(self, row):
        """Return the eight user-visible fields of *row* as an ordered dict."""
        return {
            '用户ID': self._item_text(row, 0),
            '文件路径': self._item_text(row, 1),
            '话题': self._item_text(row, 2),
            '定时发布': self._time_text(row),
            '间隔时间': self._item_text(row, 4, '30'),
            '达人链接': self._item_text(row, 5),
            '数量': self._item_text(row, 6, '1'),
            '情况': self._item_text(row, 7),
        }

    def _append_row(self, user_id='', file_path='', topics='', time_text='',
                    interval='30', url='', quantity='1', note=''):
        """Append one fully populated row and return its index.

        Values are stringified first: passing an int straight to
        QTableWidgetItem would select its item-type constructor and produce an
        empty cell.
        """
        row = self.table.rowCount()
        self.table.insertRow(row)
        cells = {0: user_id, 1: file_path, 2: topics, 4: interval,
                 5: url, 6: quantity, 7: note, 8: '待执行'}
        for column, value in cells.items():
            self.table.setItem(row, column, QTableWidgetItem(str(value)))
        self.table.setCellWidget(row, 3, self.create_datetime_editor(time_text))
        return row

    # ------------------------------------------------------------------
    # Table editing actions
    # ------------------------------------------------------------------
    def add_config(self):
        """Append an empty config row with default values and select it."""
        try:
            row = self._append_row()
            self.table.selectRow(row)
            self.update_stats()
        except Exception as e:
            self.log(f"✗ 添加配置失败: {e}")
            QMessageBox.critical(self, '错误', f'添加配置失败: {e}')

    def duplicate_selected(self):
        """Append a copy of every selected row (status reset to 待执行)."""
        selected_rows = self._selected_rows()
        if not selected_rows:
            QMessageBox.warning(self, '警告', '请先选择要复制的行')
            return
        try:
            for row in sorted(selected_rows):
                texts = [self._item_text(row, col) for col in range(8)]
                self._append_row(texts[0], texts[1], texts[2], self._time_text(row),
                                 texts[4], texts[5], texts[6], texts[7])
            self.log(f"✓ 已复制 {len(selected_rows)} 行配置")
            self.update_stats()
        except Exception as e:
            self.log(f"✗ 复制配置失败: {e}")
            QMessageBox.critical(self, '错误', f'复制配置失败: {e}')

    def clear_all(self):
        """Remove every row after confirmation."""
        if self.table.rowCount() == 0:
            return
        reply = QMessageBox.question(
            self, '确认清空',
            f'确定要清空所有 {self.table.rowCount()} 条配置吗?',
            QMessageBox.Yes | QMessageBox.No
        )
        if reply == QMessageBox.Yes:
            self.table.setRowCount(0)
            self.log("已清空所有配置")
            self.update_stats()

    def validate_configs(self):
        """Check every row and show the collected errors and warnings."""
        errors = []
        warnings = []
        for row in range(self.table.rowCount()):
            if not self._item_text(row, 0).strip():
                errors.append(f"{row+1} 行: 用户ID为空")

            # Parse the quantity once; it is needed for the folder check too.
            try:
                quantity_int = int(self._item_text(row, 6, '1').strip())
            except ValueError:
                quantity_int = None

            file_path = self._item_text(row, 1).strip()
            if not file_path:
                errors.append(f"{row+1} 行: 文件路径为空")
            elif not os.path.exists(file_path):
                errors.append(f"{row+1} 行: 文件路径不存在: {file_path}")
            else:
                try:
                    folders = [f for f in os.listdir(file_path)
                               if os.path.isdir(os.path.join(file_path, f))]
                except OSError as e:
                    warnings.append(f"{row+1} 行: 无法读取文件夹: {e}")
                else:
                    if quantity_int is not None and quantity_int > len(folders):
                        warnings.append(
                            f"{row+1} 行: 数量({quantity_int})大于可用文件夹数({len(folders)})")

            try:
                if int(self._item_text(row, 4, '30').strip()) < 1:
                    errors.append(f"{row+1} 行: 间隔时间必须大于0")
            except ValueError:
                errors.append(f"{row+1} 行: 间隔时间格式错误")

            if quantity_int is None:
                errors.append(f"{row+1} 行: 数量格式错误")
            elif quantity_int < 1:
                errors.append(f"{row+1} 行: 数量必须大于0")

        if errors or warnings:
            msg = ""
            if errors:
                msg += "错误:\n" + "\n".join(errors) + "\n\n"
            if warnings:
                msg += "警告:\n" + "\n".join(warnings)
            QMessageBox.warning(self, '配置验证结果', msg)
        else:
            QMessageBox.information(self, '配置验证', '所有配置验证通过!')

    def delete_selected(self):
        """Delete the selected rows after confirmation."""
        selected_rows = self._selected_rows()
        if not selected_rows:
            QMessageBox.warning(self, '警告', '请先选择要删除的行')
            return
        reply = QMessageBox.question(
            self, '确认删除',
            f'确定要删除 {len(selected_rows)} 行配置吗?',
            QMessageBox.Yes | QMessageBox.No
        )
        if reply != QMessageBox.Yes:
            return
        try:
            # Remove bottom-up so earlier removals do not shift later indexes.
            for row in sorted(selected_rows, reverse=True):
                self.table.removeRow(row)
            self.log(f"已删除 {len(selected_rows)} 行配置")
            self.update_stats()
        except Exception as e:
            self.log(f"✗ 删除失败: {e}")
            QMessageBox.critical(self, '错误', f'删除失败: {e}')

    def edit_selected(self):
        """Pick a new folder for the single selected row."""
        selected_rows = self._selected_rows()
        if len(selected_rows) != 1:
            QMessageBox.warning(self, '警告', '请选择一行进行编辑')
            return
        row = next(iter(selected_rows))
        folder = QFileDialog.getExistingDirectory(self, '选择文件夹')
        if folder:
            self.table.setItem(row, 1, QTableWidgetItem(folder))
            self.log(f"已更新第 {row+1} 行的文件路径")

    def on_cell_double_clicked(self, row, column):
        """Column 1: choose a folder. Column 3: open the date-time editor."""
        if column == 1:
            folder = QFileDialog.getExistingDirectory(self, '选择文件夹')
            if folder:
                self.table.setItem(row, 1, QTableWidgetItem(folder))
        elif column == 3:
            editor = self.edit_datetime(row)
            # NOTE(review): showPopup() is not a documented QDateTimeEdit
            # method; call it only if the binding provides it, otherwise just
            # focus the editor instead of raising inside the slot.
            show_popup = getattr(editor, 'showPopup', None)
            if callable(show_popup):
                show_popup()
            else:
                editor.setFocus()

    def on_item_changed(self, item):
        """Refresh the row counter whenever a cell item changes."""
        self.update_stats()

    def update_stats(self):
        """Update the "total configs" label from the current row count."""
        count = self.table.rowCount()
        self.stats_label.setText(f'总计: {count} 条配置')

    def edit_datetime(self, row):
        """Return the date-time editor of *row*, creating it if missing."""
        datetime_widget = self.table.cellWidget(row, 3)
        if datetime_widget and isinstance(datetime_widget, QDateTimeEdit):
            return datetime_widget
        datetime_editor = self.create_datetime_editor()
        self.table.setCellWidget(row, 3, datetime_editor)
        return datetime_editor

    # ------------------------------------------------------------------
    # Excel import / export
    # ------------------------------------------------------------------
    @staticmethod
    def _cell_text(value, default=''):
        """Convert one Excel cell to table text.

        Missing values (None/NaN) become *default*; integral floats such as
        12345.0 (how pandas reads an integer column containing blanks) lose
        the trailing '.0'.
        """
        if value is None or pd.isna(value):
            return default
        if isinstance(value, float) and value.is_integer():
            return str(int(value))
        return str(value)

    @staticmethod
    def _int_text(value, default):
        """Return ``str(int(value))``, or *default* for a missing cell.

        Raises ValueError/TypeError when the cell is present but not numeric.
        """
        if value is None or pd.isna(value):
            return default
        return str(int(value))

    @staticmethod
    def _excel_time_text(value):
        """Return a 定时发布 cell as 'YYYY-MM-DD HH:MM:SS' text ('' if missing)."""
        if value is None or pd.isna(value):
            return ''
        if isinstance(value, pd.Timestamp):
            return value.strftime("%Y-%m-%d %H:%M:%S")
        # Drop anything after the seconds (e.g. fractional seconds).
        return str(value)[:19]

    def import_config(self):
        """Load configs from an Excel file, replacing or extending the table."""
        file_path, _selected_filter = QFileDialog.getOpenFileName(
            self, '选择Excel配置文件', '', 'Excel Files (*.xlsx *.xls)'
        )
        if not file_path:
            return
        try:
            df = pd.read_excel(file_path)
            required_columns = ['用户ID', '文件路径', '定时发布', '间隔时间', '达人链接', '数量', '情况']
            # The topic column may use the full header or the short name.
            topic_col = next(
                (col for col in (self._TOPIC_HEADER, '话题') if col in df.columns), None)
            missing_columns = [col for col in required_columns if col not in df.columns]
            if topic_col is None:
                missing_columns.append('话题(以中文"-"分隔)或话题')
            if missing_columns:
                QMessageBox.warning(self, '错误', f'Excel文件缺少以下列: {", ".join(missing_columns)}')
                return

            if self.table.rowCount() > 0:
                reply = QMessageBox.question(
                    self, '确认',
                    '是否清空现有配置?\n选择"Yes"将清空现有配置,选择"No"将追加到现有配置后。',
                    QMessageBox.Yes | QMessageBox.No | QMessageBox.Cancel
                )
                if reply == QMessageBox.Cancel:
                    return
                if reply == QMessageBox.Yes:
                    self.table.setRowCount(0)

            imported_count = 0
            for position, (_index, row) in enumerate(df.iterrows(), start=1):
                try:
                    # Convert everything first so a bad cell cannot leave a
                    # half-filled row behind in the table.
                    values = (
                        self._cell_text(row['用户ID']),
                        self._cell_text(row['文件路径']),
                        self._cell_text(row[topic_col]),
                        self._excel_time_text(row['定时发布']),
                        self._int_text(row['间隔时间'], '30'),
                        self._cell_text(row['达人链接']),
                        self._int_text(row['数量'], '1'),
                        self._cell_text(row['情况']),
                    )
                    self._append_row(*values)
                    imported_count += 1
                except Exception as e:
                    self.log(f"✗ 导入第 {position} 行失败: {e}")
            self.log(f"✓ 成功导入 {imported_count}/{len(df)} 条配置")
            self.update_stats()
            QMessageBox.information(self, '成功', f'成功导入 {imported_count}/{len(df)} 条配置!')
        except Exception as e:
            error_detail = traceback.format_exc()
            self.log(f"✗ 导入配置失败: {e}")
            self.log(f"错误详情: {error_detail}")
            QMessageBox.critical(self, '错误', f'导入配置失败: {e}')

    def export_config(self):
        """Write all rows to an Excel file chosen by the user."""
        if self.table.rowCount() == 0:
            QMessageBox.warning(self, '警告', '没有配置可导出')
            return
        file_path, _selected_filter = QFileDialog.getSaveFileName(
            self, '保存配置', '', 'Excel Files (*.xlsx)'
        )
        if not file_path:
            return
        try:
            # Same fields as the auto-save, but with the full topic header.
            data = [
                {(self._TOPIC_HEADER if key == '话题' else key): text
                 for key, text in self._row_values(row).items()}
                for row in range(self.table.rowCount())
            ]
            pd.DataFrame(data).to_excel(file_path, index=False)
            self.log("✓ 配置导出成功")
            QMessageBox.information(self, '成功', '配置导出成功!')
        except Exception as e:
            error_detail = traceback.format_exc()
            self.log(f"✗ 导出配置失败: {e}")
            self.log(f"错误详情: {error_detail}")
            QMessageBox.critical(self, '错误', f'导出配置失败: {e}')

    # ------------------------------------------------------------------
    # Publishing
    # ------------------------------------------------------------------
    def get_configs_from_table(self):
        """Return one dict per row, skipping rows without user ID or path."""
        all_rows = (self._row_values(row) for row in range(self.table.rowCount()))
        return [values for values in all_rows if values['用户ID'] and values['文件路径']]

    def start_publish(self):
        """Validate the configs, confirm with the user and start the worker."""
        # Never run two workers at once; the second would replace the handle
        # to the first while it is still publishing.
        if self.publish_thread and self.publish_thread.isRunning():
            QMessageBox.warning(self, '警告', '已有任务正在运行,请等待完成或先停止当前任务')
            return
        configs = self.get_configs_from_table()
        if not configs:
            QMessageBox.warning(self, '警告', '没有有效的配置,请先添加或导入配置')
            return

        invalid_configs = []
        total_folders = 0
        for idx, config in enumerate(configs):
            path = config['文件路径']
            if not os.path.exists(path):
                invalid_configs.append(f"配置 {idx+1}: 文件路径不存在")
                continue
            # Report bad values here instead of raising inside the Qt slot.
            try:
                quantity = int(config.get('数量', 1))
            except ValueError:
                invalid_configs.append(f"配置 {idx+1}: 数量格式错误")
                continue
            try:
                subfolders = [f for f in os.listdir(path)
                              if os.path.isdir(os.path.join(path, f))]
            except OSError as e:
                invalid_configs.append(f"配置 {idx+1}: 无法读取文件夹: {e}")
                continue
            total_folders += min(len(subfolders), quantity)
        if invalid_configs:
            QMessageBox.warning(self, '警告', '以下配置有问题:\n' + '\n'.join(invalid_configs))
            return

        reply = QMessageBox.question(
            self, '确认发布',
            f'将处理 {len(configs)} 条配置,共 {total_folders} 个文件夹,是否开始?',
            QMessageBox.Yes | QMessageBox.No
        )
        if reply == QMessageBox.No:
            return

        # Mark every row as running in the hidden status column.
        for row in range(self.table.rowCount()):
            status_item = self.table.item(row, 8)
            if status_item:
                status_item.setText('执行中')
                status_item.setForeground(QColor(255, 165, 0))

        self.publish_thread = PublishThread(configs)
        self.publish_thread.progress.connect(self.log)
        self.publish_thread.finished.connect(self.on_publish_finished)
        self.publish_thread.update_progress.connect(self.update_progress_bar)
        self.progress_bar.setVisible(True)
        self.progress_bar.setRange(0, 0)  # busy indicator until the first update
        self.stop_btn.setEnabled(True)
        self.publish_thread.start()
        self.log("=" * 50)
        self.log(f"开始批量发布任务,共 {len(configs)} 条配置...")

    def update_progress_bar(self, current, total):
        """Show *current* of *total* folders on the progress bar."""
        if total > 0:
            self.progress_bar.setRange(0, total)
            self.progress_bar.setValue(current)

    def stop_publish(self):
        """Ask the running worker to stop, after confirmation."""
        if self.publish_thread and self.publish_thread.isRunning():
            reply = QMessageBox.question(
                self, '确认停止',
                '确定要停止当前任务吗?',
                QMessageBox.Yes | QMessageBox.No
            )
            if reply == QMessageBox.Yes:
                self.publish_thread.stop()
                self.log("正在停止任务...")
                self.stop_btn.setEnabled(False)

    def on_publish_finished(self, success, message):
        """Reset the UI and mark running rows as done or failed."""
        self.progress_bar.setVisible(False)
        self.stop_btn.setEnabled(False)
        self.log(message)
        if success:
            final_text, final_color = '完成', QColor(0, 128, 0)
        else:
            final_text, final_color = '失败', QColor(255, 0, 0)
        for row in range(self.table.rowCount()):
            status_item = self.table.item(row, 8)  # hidden status column
            if status_item and status_item.text() == '执行中':
                status_item.setText(final_text)
                status_item.setForeground(final_color)
        if success:
            QMessageBox.information(self, '完成', message)
        else:
            QMessageBox.warning(self, '完成', message)

    # ------------------------------------------------------------------
    # Logging
    # ------------------------------------------------------------------
    def clear_log(self):
        """Empty the log panel and note that it was cleared."""
        self.log_output.clear()
        self.log("日志已清空")

    def log(self, message):
        """Append a timestamped line to the log panel and scroll to the end."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.log_output.append(f"[{timestamp}] {message}")
        scrollbar = self.log_output.verticalScrollBar()
        scrollbar.setValue(scrollbar.maximum())

    # ------------------------------------------------------------------
    # Auto-save
    # ------------------------------------------------------------------
    def auto_save(self):
        """Write all rows to ``self.config_file`` as JSON (best effort)."""
        try:
            data = [self._row_values(row) for row in range(self.table.rowCount())]
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        except (OSError, TypeError, ValueError) as e:
            # Never interrupt the user for a failed background save, but leave
            # a trace in the log panel instead of hiding the problem.
            self.log(f"⚠ 自动保存失败: {e}")

    def load_auto_save(self):
        """Offer to restore the rows stored in ``self.config_file``, if any."""
        if not os.path.exists(self.config_file):
            return
        try:
            with open(self.config_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            self.log(f"⚠ 读取自动保存的配置失败: {e}")
            return
        # Ignore anything that is not a list of row dicts.
        entries = [entry for entry in data if isinstance(entry, dict)] if isinstance(data, list) else []
        if not entries:
            return
        reply = QMessageBox.question(
            self, '发现自动保存的配置',
            f'发现 {len(entries)} 条自动保存的配置,是否加载?',
            QMessageBox.Yes | QMessageBox.No
        )
        if reply != QMessageBox.Yes:
            return
        try:
            for row_data in entries:
                self._append_row(
                    row_data.get('用户ID', ''),
                    row_data.get('文件路径', ''),
                    row_data.get('话题', ''),
                    row_data.get('定时发布', ''),
                    row_data.get('间隔时间', '30'),
                    row_data.get('达人链接', ''),
                    row_data.get('数量', '1'),
                    row_data.get('情况', ''),
                )
            self.update_stats()
            self.log(f"✓ 已加载 {len(entries)} 条自动保存的配置")
        except Exception as e:
            self.log(f"⚠ 加载自动保存的配置失败: {e}")

    def closeEvent(self, event):
        """Confirm exit while a job runs, stop the worker, save, then close."""
        if self.publish_thread and self.publish_thread.isRunning():
            reply = QMessageBox.question(
                self, '确认退出',
                '有任务正在运行,确定要退出吗?',
                QMessageBox.Yes | QMessageBox.No
            )
            if reply == QMessageBox.No:
                event.ignore()
                return
            # Ask the worker to stop and give it a few seconds, so the thread
            # object is not torn down while it is still running.
            self.publish_thread.stop()
            self.publish_thread.wait(5000)
        self.auto_save()
        event.accept()
if __name__ == '__main__':
    # Script entry point: create the Qt application, show the main window and
    # run the event loop. Any exception raised while starting up or running is
    # reported in a message box and the process exits with status 1.
    app = QApplication(sys.argv)
    try:
        main_window = ConfigGUI()
        main_window.show()
        exit_code = app.exec_()
    except Exception as exc:
        QMessageBox.critical(None, '启动错误', f'程序启动失败: {exc}\n{traceback.format_exc()}')
        sys.exit(1)
    else:
        # Propagate the event loop's return value as the process exit status.
        sys.exit(exit_code)