Files
haha/gui_app.py
27942 8616ca6b4b gui
第一版
2026-01-18 06:11:21 +08:00

868 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import os
import re
import json
from datetime import datetime, timedelta
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QHBoxLayout, QPushButton, QTableWidget, QTableWidgetItem,
QFileDialog, QMessageBox, QHeaderView, QLabel, QMenu,
QInputDialog, QDialog, QDialogButtonBox, QFormLayout,
QLineEdit, QSpinBox, QMessageBox, QToolBar, QAction,
QProgressBar)
from PyQt5.QtCore import Qt, QThread, pyqtSignal
from PyQt5.QtGui import QKeySequence
import pandas as pd
class TaskWorker(QThread):
progress = pyqtSignal(int, int, str)
finished = pyqtSignal(int, int, list)
error = pyqtSignal(str)
def __init__(self, tasks):
super().__init__()
self.tasks = tasks
def run(self):
try:
from main import Pdd
from loguru import logger
success_count = 0
fail_count = 0
error_messages = []
total = len(self.tasks)
for idx, data in enumerate(self.tasks, 1):
user_id = data.get('用户ID', '')
file_path = data.get('文件路径', '')
topics = data.get('话题', '')
time_start = data.get('计算后的发布时间', '')
url = data.get('达人链接', '')
index = data.get('索引', '')
self.progress.emit(idx, total, f"正在执行任务 {idx}/{total} - 用户ID: {user_id}")
if not url or not user_id:
error_msg = f"任务 {idx}: 缺少必需参数 - 用户ID={user_id}, 达人链接={url}"
logger.warning(error_msg)
error_messages.append(error_msg)
fail_count += 1
continue
# 处理话题格式
if topics:
topic_list = []
for sep in ['', '', '-']:
if sep in topics:
topic_list = [t.strip() for t in topics.split(sep) if t.strip()]
break
if not topic_list:
topic_list = [topics.strip()] if topics.strip() else []
ht = ' '.join([f"#{t}#" for t in topic_list if t])
else:
ht = ""
if not index and file_path:
filename = os.path.basename(file_path)
parts = filename.split('-')
if parts:
index = parts[0]
try:
logger.info(
f"开始执行任务 {idx} - 用户ID: {user_id}, 达人链接: {url}, 文件路径: {file_path}, 索引: {index}"
)
pdd = Pdd(
url=url,
user_id=user_id,
time_start=time_start if time_start else None,
ht=ht,
index=index,
)
folder_path = None
if file_path:
if os.path.exists(file_path):
if os.path.isdir(file_path):
folder_path = file_path
else:
folder_path = os.path.dirname(file_path)
else:
folder_path = os.path.dirname(file_path) if file_path else None
if folder_path and not os.path.exists(folder_path):
folder_path = None
logger.info(f"调用 pdd.action(folder_path={folder_path})")
pdd.action(folder_path=folder_path)
logger.info(f"任务 {idx} 执行成功")
success_count += 1
except Exception as e:
error_msg = f"任务 {idx} 执行失败 - 用户ID: {user_id}, 错误: {str(e)}"
logger.error(error_msg)
logger.exception("详细错误信息:")
error_messages.append(error_msg)
fail_count += 1
continue
self.finished.emit(success_count, fail_count, error_messages)
except Exception as e:
self.error.emit(str(e))
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("拼多多MCN发布管理工具")
self.setGeometry(100, 100, 1200, 800)
# 存储原始数据和处理后的数据
self.raw_data = []
self.processed_data = []
self.worker = None
# 创建主窗口部件
main_widget = QWidget()
self.setCentralWidget(main_widget)
# 创建布局
layout = QVBoxLayout()
main_widget.setLayout(layout)
# 创建工具栏
toolbar = QToolBar("主工具栏")
self.addToolBar(toolbar)
# 导入Excel
self.import_action = QAction("导入Excel", self)
self.import_action.setShortcut(QKeySequence("Ctrl+O"))
self.import_action.triggered.connect(self.import_excel)
toolbar.addAction(self.import_action)
# 添加行
self.add_row_action = QAction("添加行", self)
self.add_row_action.setShortcut(QKeySequence("Ctrl+N"))
self.add_row_action.triggered.connect(self.add_row)
toolbar.addAction(self.add_row_action)
# 批量添加行
self.batch_add_action = QAction("批量添加", self)
self.batch_add_action.triggered.connect(self.batch_add_rows)
toolbar.addAction(self.batch_add_action)
# 删除行
self.delete_row_action = QAction("删除选中行", self)
self.delete_row_action.setShortcut(QKeySequence("Delete"))
self.delete_row_action.triggered.connect(self.delete_selected_rows)
toolbar.addAction(self.delete_row_action)
toolbar.addSeparator()
# 保存配置
self.save_action = QAction("保存配置", self)
self.save_action.setShortcut(QKeySequence("Ctrl+S"))
self.save_action.triggered.connect(self.save_config)
toolbar.addAction(self.save_action)
# 加载配置
self.load_action = QAction("加载配置", self)
self.load_action.setShortcut(QKeySequence("Ctrl+L"))
self.load_action.triggered.connect(self.load_config)
toolbar.addAction(self.load_action)
toolbar.addSeparator()
# 执行任务
execute_action = QAction("执行任务", self)
execute_action.setShortcut(QKeySequence("F5"))
execute_action.triggered.connect(self.execute_tasks)
execute_action.setEnabled(False)
self.execute_btn = execute_action
toolbar.addAction(execute_action)
# 导出Excel
export_action = QAction("导出Excel", self)
export_action.triggered.connect(self.export_excel)
export_action.setEnabled(False)
self.export_btn = export_action
toolbar.addAction(export_action)
# 导出Excel模板
self.export_template_action = QAction("导出Excel模板", self)
self.export_template_action.triggered.connect(self.export_template)
toolbar.addAction(self.export_template_action)
# 创建表格
self.table = QTableWidget()
self.table.setColumnCount(9)
self.table.setHorizontalHeaderLabels([
"用户ID", "索引", "文件路径", "话题", "定时发布", "间隔时间(分钟)",
"达人链接", "情况", "计算后的发布时间"
])
# 设置表格列宽自适应
header = self.table.horizontalHeader()
header.setSectionResizeMode(QHeaderView.Stretch)
# 允许编辑
self.table.setEditTriggers(QTableWidget.DoubleClicked | QTableWidget.SelectedClicked | QTableWidget.EditKeyPressed)
# 允许选择多行
self.table.setSelectionBehavior(QTableWidget.SelectRows)
self.table.setSelectionMode(QTableWidget.ExtendedSelection)
# UI优化
self.table.setAlternatingRowColors(True)
self.table.setWordWrap(False)
# 连接单元格变化信号,实时更新处理后的数据
self.table.cellChanged.connect(self.on_cell_changed)
# 设置右键菜单
self.table.setContextMenuPolicy(Qt.CustomContextMenu)
self.table.customContextMenuRequested.connect(self.show_context_menu)
layout.addWidget(self.table)
# 进度条
self.progress_bar = QProgressBar()
self.progress_bar.setRange(0, 100)
self.progress_bar.setValue(0)
layout.addWidget(self.progress_bar)
# 状态标签
self.status_label = QLabel("就绪")
layout.addWidget(self.status_label)
def import_excel(self):
"""导入Excel文件"""
file_path, _ = QFileDialog.getOpenFileName(
self, "选择Excel文件", "", "Excel Files (*.xlsx *.xls)"
)
if not file_path:
return
try:
# 读取Excel文件
df = pd.read_excel(file_path)
# 检查必需的列
required_columns = ['用户ID', '文件路径', '话题(以中文"-"分隔)',
'定时发布', '间隔时间', '达人链接', '情况']
optional_columns = ['索引'] # 可选字段
# 检查列名是否存在(允许部分匹配)
missing_columns = []
column_mapping = {}
# 检查必需字段
for req_col in required_columns:
found = False
for col in df.columns:
if req_col in str(col) or str(col) in req_col:
column_mapping[req_col] = col
found = True
break
if not found:
missing_columns.append(req_col)
# 检查可选字段
for opt_col in optional_columns:
found = False
for col in df.columns:
if opt_col in str(col) or str(col) in opt_col:
column_mapping[opt_col] = col
found = True
break
if missing_columns:
QMessageBox.warning(
self, "错误",
f"Excel文件缺少以下必需的列\n{', '.join(missing_columns)}"
)
return
# 读取数据
self.raw_data = []
for index, row in df.iterrows():
# 获取索引字段(如果存在)
index_value = ''
if '索引' in column_mapping:
index_value = str(row[column_mapping['索引']]) if pd.notna(row[column_mapping['索引']]) else ''
data = {
'用户ID': str(row[column_mapping['用户ID']]) if pd.notna(row[column_mapping['用户ID']]) else '',
'索引': index_value,
'文件路径': str(row[column_mapping['文件路径']]) if pd.notna(row[column_mapping['文件路径']]) else '',
'话题': str(row[column_mapping['话题(以中文"-"分隔)']]) if pd.notna(row[column_mapping['话题(以中文"-"分隔)']]) else '',
'定时发布': str(row[column_mapping['定时发布']]) if pd.notna(row[column_mapping['定时发布']]) else '',
'间隔时间': str(row[column_mapping['间隔时间']]) if pd.notna(row[column_mapping['间隔时间']]) else '',
'达人链接': str(row[column_mapping['达人链接']]) if pd.notna(row[column_mapping['达人链接']]) else '',
'情况': str(row[column_mapping['情况']]) if pd.notna(row[column_mapping['情况']]) else '',
}
self.raw_data.append(data)
# 处理数据:计算间隔时间
self.process_data()
# 显示数据
self.display_data()
self.status_label.setText(f"成功导入 {len(self.processed_data)} 条数据")
self.enable_action_buttons()
self.enable_action_buttons()
except Exception as e:
QMessageBox.critical(self, "错误", f"导入Excel文件时出错\n{str(e)}")
import traceback
traceback.print_exc()
def process_data(self):
"""处理数据计算相同用户ID的间隔时间"""
self.processed_data = []
# 按用户ID分组
user_groups = {}
for data in self.raw_data:
user_id = data['用户ID']
if user_id not in user_groups:
user_groups[user_id] = []
user_groups[user_id].append(data)
# 处理每个用户组
for user_id, user_data_list in user_groups.items():
# 获取第一条数据的定时发布时间
first_time_str = user_data_list[0].get('定时发布', '')
first_time = None
if first_time_str:
try:
# 尝试解析时间格式YYYY-MM-DD HH:MM:SS 或 YYYY/MM/DD HH:MM:SS
for fmt in ["%Y-%m-%d %H:%M:%S", "%Y/%m/%d %H:%M:%S",
"%Y-%m-%d %H:%M", "%Y/%m/%d %H:%M"]:
try:
first_time = datetime.strptime(first_time_str, fmt)
break
except ValueError:
continue
except Exception as e:
print(f"解析时间失败:{first_time_str}, 错误:{e}")
# 处理每条数据
for index, data in enumerate(user_data_list):
processed_item = data.copy()
if index == 0:
# 第一条数据使用原始的定时发布时间
processed_item['计算后的发布时间'] = first_time_str if first_time else ''
else:
# 后续数据根据间隔时间计算
if first_time:
try:
# 获取间隔时间(分钟)- 从第一条数据获取
interval_str = user_data_list[0].get('间隔时间', '0')
# 尝试转换为数字(可能是字符串格式,如"30分钟"、"30"等)
interval_minutes = 0
# 提取数字
numbers = re.findall(r'\d+', str(interval_str))
if numbers:
interval_minutes = int(numbers[0])
# 计算发布时间:第一条时间 + (索引 * 间隔时间)
calculated_time = first_time + timedelta(minutes=interval_minutes * index)
processed_item['计算后的发布时间'] = calculated_time.strftime("%Y-%m-%d %H:%M:%S")
except Exception as e:
print(f"计算发布时间失败:{e}")
processed_item['计算后的发布时间'] = ''
else:
processed_item['计算后的发布时间'] = ''
self.processed_data.append(processed_item)
def display_data(self):
"""在表格中显示数据"""
self.table.blockSignals(True)
self.table.setRowCount(len(self.processed_data))
for row, data in enumerate(self.processed_data):
# 用户ID
self.table.setItem(row, 0, QTableWidgetItem(str(data.get('用户ID', ''))))
# 索引
self.table.setItem(row, 1, QTableWidgetItem(str(data.get('索引', ''))))
# 文件路径
self.table.setItem(row, 2, QTableWidgetItem(str(data.get('文件路径', ''))))
# 话题
self.table.setItem(row, 3, QTableWidgetItem(str(data.get('话题', ''))))
# 定时发布
self.table.setItem(row, 4, QTableWidgetItem(str(data.get('定时发布', ''))))
# 间隔时间
self.table.setItem(row, 5, QTableWidgetItem(str(data.get('间隔时间', ''))))
# 达人链接
self.table.setItem(row, 6, QTableWidgetItem(str(data.get('达人链接', ''))))
# 情况
self.table.setItem(row, 7, QTableWidgetItem(str(data.get('情况', ''))))
# 计算后的发布时间
calc_item = QTableWidgetItem(str(data.get('计算后的发布时间', '')))
calc_item.setFlags(calc_item.flags() & ~Qt.ItemIsEditable)
self.table.setItem(row, 8, calc_item)
self.table.blockSignals(False)
def on_cell_changed(self, row, column):
"""当单元格内容改变时,重新处理数据"""
if column == 8: # 计算后的发布时间列不允许直接编辑
return
# 从表格中读取数据
self.sync_table_to_raw_data()
# 重新处理数据
self.process_data()
# 更新显示(跳过计算后的发布时间列的更新,避免循环)
self.update_table_from_processed_data()
def sync_table_to_raw_data(self):
"""从表格同步数据到raw_data"""
self.raw_data = []
for row in range(self.table.rowCount()):
data = {
'用户ID': self.table.item(row, 0).text() if self.table.item(row, 0) else '',
'索引': self.table.item(row, 1).text() if self.table.item(row, 1) else '',
'文件路径': self.table.item(row, 2).text() if self.table.item(row, 2) else '',
'话题': self.table.item(row, 3).text() if self.table.item(row, 3) else '',
'定时发布': self.table.item(row, 4).text() if self.table.item(row, 4) else '',
'间隔时间': self.table.item(row, 5).text() if self.table.item(row, 5) else '',
'达人链接': self.table.item(row, 6).text() if self.table.item(row, 6) else '',
'情况': self.table.item(row, 7).text() if self.table.item(row, 7) else '',
}
self.raw_data.append(data)
def update_table_from_processed_data(self):
"""从processed_data更新表格只更新计算后的发布时间列"""
# 临时断开信号避免触发cellChanged
self.table.blockSignals(True)
for row in range(min(len(self.processed_data), self.table.rowCount())):
calculated_time = self.processed_data[row].get('计算后的发布时间', '')
if self.table.item(row, 8):
self.table.item(row, 8).setText(str(calculated_time))
else:
item = QTableWidgetItem(str(calculated_time))
item.setFlags(item.flags() & ~Qt.ItemIsEditable)
self.table.setItem(row, 8, item)
self.table.blockSignals(False)
def show_context_menu(self, position):
"""显示右键菜单"""
menu = QMenu(self)
add_action = menu.addAction("添加行")
add_action.triggered.connect(self.add_row)
batch_add_action = menu.addAction("批量添加行")
batch_add_action.triggered.connect(self.batch_add_rows)
menu.addSeparator()
delete_action = menu.addAction("删除选中行")
delete_action.triggered.connect(self.delete_selected_rows)
menu.addSeparator()
copy_action = menu.addAction("复制行")
copy_action.triggered.connect(self.copy_selected_rows)
paste_action = menu.addAction("粘贴行")
paste_action.triggered.connect(self.paste_rows)
menu.exec_(self.table.viewport().mapToGlobal(position))
def add_row(self):
"""添加一行空数据"""
row_count = self.table.rowCount()
self.table.insertRow(row_count)
# 初始化空单元格
for col in range(9):
item = QTableWidgetItem("")
if col == 8:
item.setFlags(item.flags() & ~Qt.ItemIsEditable)
self.table.setItem(row_count, col, item)
# 更新数据
self.sync_table_to_raw_data()
self.process_data()
self.display_data()
# 启用按钮
self.enable_action_buttons()
def batch_add_rows(self):
"""批量添加行"""
count, ok = QInputDialog.getInt(self, "批量添加", "请输入要添加的行数:", 1, 1, 1000, 1)
if ok and count > 0:
row_count = self.table.rowCount()
self.table.setRowCount(row_count + count)
# 初始化新行
for i in range(count):
for col in range(9):
item = QTableWidgetItem("")
if col == 8:
item.setFlags(item.flags() & ~Qt.ItemIsEditable)
self.table.setItem(row_count + i, col, item)
# 更新数据
self.sync_table_to_raw_data()
self.process_data()
self.display_data()
# 启用按钮
self.enable_action_buttons()
QMessageBox.information(self, "成功", f"已添加 {count}")
def delete_selected_rows(self):
"""删除选中的行"""
selected_rows = set()
for item in self.table.selectedItems():
selected_rows.add(item.row())
if not selected_rows:
QMessageBox.warning(self, "警告", "请先选择要删除的行")
return
# 按行号降序排列,从后往前删除
rows_to_delete = sorted(selected_rows, reverse=True)
reply = QMessageBox.question(
self, "确认",
f"确定要删除 {len(rows_to_delete)} 行吗?",
QMessageBox.Yes | QMessageBox.No
)
if reply == QMessageBox.Yes:
for row in rows_to_delete:
self.table.removeRow(row)
# 更新数据
self.sync_table_to_raw_data()
self.process_data()
self.display_data()
# 更新按钮状态
self.enable_action_buttons()
def copy_selected_rows(self):
"""复制选中的行"""
selected_rows = set()
for item in self.table.selectedItems():
selected_rows.add(item.row())
if not selected_rows:
QMessageBox.warning(self, "警告", "请先选择要复制的行")
return
# 将数据序列化为JSON字符串存储到剪贴板
rows_data = []
for row in sorted(selected_rows):
row_data = {}
for col in range(9):
item = self.table.item(row, col)
header = self.table.horizontalHeaderItem(col).text()
row_data[header] = item.text() if item else ""
rows_data.append(row_data)
clipboard = QApplication.clipboard()
clipboard.setText(json.dumps(rows_data, ensure_ascii=False, indent=2))
QMessageBox.information(self, "成功", f"已复制 {len(rows_data)} 行到剪贴板")
def paste_rows(self):
"""粘贴行"""
clipboard = QApplication.clipboard()
text = clipboard.text()
try:
rows_data = json.loads(text)
if not isinstance(rows_data, list):
QMessageBox.warning(self, "错误", "剪贴板中没有有效的行数据")
return
# 列名映射
column_mapping = {
"用户ID": 0, "索引": 1, "文件路径": 2, "话题": 3,
"定时发布": 4, "间隔时间(分钟)": 5, "达人链接": 6,
"情况": 7, "计算后的发布时间": 8
}
# 在末尾插入新行
current_row = self.table.rowCount()
self.table.setRowCount(current_row + len(rows_data))
for i, row_data in enumerate(rows_data):
if isinstance(row_data, dict):
for key, value in row_data.items():
col = column_mapping.get(key)
if col is not None:
item = QTableWidgetItem(str(value))
if col == 8:
item.setFlags(item.flags() & ~Qt.ItemIsEditable)
self.table.setItem(current_row + i, col, item)
# 更新数据
self.sync_table_to_raw_data()
self.process_data()
self.display_data()
# 启用按钮
self.enable_action_buttons()
QMessageBox.information(self, "成功", f"已粘贴 {len(rows_data)}")
except json.JSONDecodeError:
QMessageBox.warning(self, "错误", "剪贴板中的数据格式不正确")
except Exception as e:
QMessageBox.critical(self, "错误", f"粘贴时出错:{str(e)}")
def save_config(self):
"""保存配置到JSON文件"""
if not self.raw_data:
QMessageBox.warning(self, "警告", "没有可保存的数据")
return
file_path, _ = QFileDialog.getSaveFileName(
self, "保存配置", "配置.json", "JSON Files (*.json)"
)
if not file_path:
return
try:
# 从表格同步数据
self.sync_table_to_raw_data()
# 保存到JSON
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(self.raw_data, f, ensure_ascii=False, indent=2)
QMessageBox.information(self, "成功", f"配置已保存到:\n{file_path}")
self.status_label.setText(f"配置已保存:{file_path}")
except Exception as e:
QMessageBox.critical(self, "错误", f"保存配置时出错:\n{str(e)}")
def load_config(self):
"""从JSON文件加载配置"""
file_path, _ = QFileDialog.getOpenFileName(
self, "加载配置", "", "JSON Files (*.json)"
)
if not file_path:
return
try:
# 读取JSON
with open(file_path, 'r', encoding='utf-8') as f:
self.raw_data = json.load(f)
# 处理数据
self.process_data()
# 显示数据
self.display_data()
# 启用按钮
self.enable_action_buttons()
QMessageBox.information(self, "成功", f"已加载 {len(self.raw_data)} 条配置")
self.status_label.setText(f"已加载配置:{file_path}")
except Exception as e:
QMessageBox.critical(self, "错误", f"加载配置时出错:\n{str(e)}")
def enable_action_buttons(self):
"""根据数据状态启用/禁用按钮"""
has_data = len(self.raw_data) > 0
self.execute_btn.setEnabled(has_data)
self.export_btn.setEnabled(has_data)
def set_busy(self, busy):
"""执行任务时禁用交互,防止卡死"""
self.import_action.setEnabled(not busy)
self.add_row_action.setEnabled(not busy)
self.batch_add_action.setEnabled(not busy)
self.delete_row_action.setEnabled(not busy)
self.save_action.setEnabled(not busy)
self.load_action.setEnabled(not busy)
self.export_template_action.setEnabled(not busy)
self.execute_btn.setEnabled(not busy and len(self.raw_data) > 0)
self.export_btn.setEnabled(not busy and len(self.raw_data) > 0)
self.table.setEnabled(not busy)
def on_worker_progress(self, current, total, message):
"""后台任务进度更新"""
if total > 0:
percent = int((current / total) * 100)
self.progress_bar.setValue(percent)
self.status_label.setText(message)
def on_worker_finished(self, success_count, fail_count, error_messages):
"""后台任务完成"""
result_msg = f"任务执行完成!\n成功: {success_count}\n失败: {fail_count}"
if error_messages:
result_msg += f"\n\n错误详情:\n" + "\n".join(error_messages[:5])
if len(error_messages) > 5:
result_msg += f"\n... 还有 {len(error_messages) - 5} 个错误(请查看日志)"
self.progress_bar.setValue(100)
self.status_label.setText(f"执行完成 - 成功: {success_count}, 失败: {fail_count}")
self.set_busy(False)
self.worker = None
QMessageBox.information(self, "执行结果", result_msg)
def on_worker_error(self, error_message):
"""后台任务异常"""
self.progress_bar.setValue(0)
self.status_label.setText("执行失败")
self.set_busy(False)
self.worker = None
QMessageBox.critical(self, "错误", f"执行任务时出错:\n{error_message}")
def execute_tasks(self):
"""执行任务调用main.py中的Pdd类"""
# 同步最新编辑内容
self.sync_table_to_raw_data()
self.process_data()
self.update_table_from_processed_data()
if not self.processed_data:
QMessageBox.warning(self, "警告", "没有可执行的数据")
return
reply = QMessageBox.question(
self, "确认",
f"确定要执行 {len(self.processed_data)} 个任务吗?",
QMessageBox.Yes | QMessageBox.No
)
if reply == QMessageBox.Yes:
if self.worker is not None:
QMessageBox.warning(self, "提示", "任务正在执行中,请稍后")
return
self.progress_bar.setValue(0)
self.status_label.setText("开始执行任务...")
self.set_busy(True)
tasks = [dict(item) for item in self.processed_data]
self.worker = TaskWorker(tasks)
self.worker.progress.connect(self.on_worker_progress)
self.worker.finished.connect(self.on_worker_finished)
self.worker.error.connect(self.on_worker_error)
self.worker.start()
def export_template(self):
"""导出Excel模板"""
file_path, _ = QFileDialog.getSaveFileName(
self, "保存Excel模板", "配置模板.xlsx", "Excel Files (*.xlsx)"
)
if not file_path:
return
try:
# 创建模板数据(空行)
template_data = {
'用户ID': ['示例1050100241'],
'索引': ['示例1'],
'文件路径': ['示例C:\\Users\\user\\data\\1-title-desc.mp4'],
'话题(以中文"-"分隔)': ['示例python-自动化-技术'],
'定时发布': ['示例2026-01-28 09:30:00'],
'间隔时间': ['示例30单位分钟'],
'达人链接': ['示例https://www.xiaohongshu.com/explore/xxx'],
'情况': ['备注信息']
}
# 创建DataFrame并导出
df = pd.DataFrame(template_data)
df.to_excel(file_path, index=False, engine='openpyxl')
QMessageBox.information(
self, "成功",
f"Excel模板已导出到\n{file_path}\n\n"
"提示:\n"
"1. 索引字段用于标识文件序号\n"
"2. 话题使用中文破折号\"\"或短横线\"\"分隔\n"
"3. 相同用户ID的第一条数据使用定时发布时间后续根据间隔时间自动计算\n"
"4. 间隔时间单位为分钟"
)
self.status_label.setText(f"Excel模板已导出到{file_path}")
except Exception as e:
QMessageBox.critical(self, "错误", f"导出Excel模板时出错\n{str(e)}")
import traceback
traceback.print_exc()
def export_excel(self):
"""导出处理后的数据到Excel"""
if not self.processed_data:
QMessageBox.warning(self, "警告", "没有可导出的数据")
return
file_path, _ = QFileDialog.getSaveFileName(
self, "保存Excel文件", "", "Excel Files (*.xlsx)"
)
if not file_path:
return
try:
# 准备导出数据
export_data = []
for data in self.processed_data:
export_data.append({
'用户ID': data.get('用户ID', ''),
'索引': data.get('索引', ''),
'文件路径': data.get('文件路径', ''),
'话题(以中文"-"分隔)': data.get('话题', ''),
'定时发布': data.get('定时发布', ''),
'间隔时间': data.get('间隔时间', ''),
'达人链接': data.get('达人链接', ''),
'情况': data.get('情况', ''),
'计算后的发布时间': data.get('计算后的发布时间', ''),
})
# 创建DataFrame并导出
df = pd.DataFrame(export_data)
df.to_excel(file_path, index=False, engine='openpyxl')
QMessageBox.information(self, "成功", f"数据已导出到:\n{file_path}")
self.status_label.setText(f"数据已导出到:{file_path}")
except Exception as e:
QMessageBox.critical(self, "错误", f"导出Excel文件时出错\n{str(e)}")
import traceback
traceback.print_exc()
def main():
app = QApplication(sys.argv)
window = MainWindow()
window.show()
sys.exit(app.exec_())
if __name__ == '__main__':
main()