1778 lines
83 KiB
Python
1778 lines
83 KiB
Python
import os
|
||
import re
|
||
import json
|
||
import time
|
||
import threading
|
||
|
||
from pathlib import Path
|
||
from loguru import logger
|
||
from bs4 import BeautifulSoup
|
||
from curl_cffi import requests
|
||
from DrissionPage import ChromiumPage, ChromiumOptions, SessionPage
|
||
|
||
|
||
class ThreadSafeDict:
|
||
"""线程安全的字典包装类"""
|
||
|
||
def __init__(self):
|
||
self._dict = {}
|
||
self._lock = threading.Lock()
|
||
|
||
def get(self, key, default=None):
|
||
with self._lock:
|
||
return self._dict.get(key, default)
|
||
|
||
def __setitem__(self, key, value):
|
||
with self._lock:
|
||
self._dict[key] = value
|
||
|
||
def __getitem__(self, key):
|
||
with self._lock:
|
||
return self._dict[key]
|
||
|
||
def __contains__(self, key):
|
||
with self._lock:
|
||
return key in self._dict
|
||
|
||
|
||
class Pdd:
|
||
def __init__(self, url, user_id, time_start, ht, index, title=None):
|
||
self.url = url
|
||
self.user_id = user_id
|
||
self.time_start = time_start
|
||
|
||
self.session = requests.Session()
|
||
|
||
# 浏览器和URL模板
|
||
self.page = None
|
||
self.user_url_template = None # 用户视频列表URL模板
|
||
self.user_profile_url_template = None # 用户信息URL模板
|
||
|
||
self.title = title
|
||
self.ht = self._format_topic(ht) # 格式化话题
|
||
self.index = index
|
||
|
||
def _format_topic(self, topic_str):
|
||
"""
|
||
格式化话题:将"拍出氛围感-好看照片分享"转换为"#拍出氛围感 #好看照片分享"
|
||
|
||
Args:
|
||
topic_str: 原始话题字符串,可能包含"-"分隔符
|
||
|
||
Returns:
|
||
格式化后的话题字符串,每个部分前加"#"
|
||
"""
|
||
if not topic_str:
|
||
return ""
|
||
|
||
# 按"-"分割话题
|
||
parts = [part.strip() for part in topic_str.split("-") if part.strip()]
|
||
|
||
# 为每个部分添加"#"
|
||
formatted_parts = []
|
||
for part in parts:
|
||
# 如果部分已经以"#"开头,不再添加
|
||
if part.startswith("#"):
|
||
formatted_parts.append(part)
|
||
else:
|
||
formatted_parts.append(f"#{part}")
|
||
|
||
# 用空格连接
|
||
return " ".join(formatted_parts)
|
||
|
||
def create_page(self):
|
||
co = ChromiumOptions()
|
||
|
||
co.set_tmp_path("user/tmp")
|
||
co.set_user_data_path("user/user_data")
|
||
|
||
# 以该配置创建页面对象
|
||
self.page = ChromiumPage(addr_or_opts=co)
|
||
|
||
def extract_note_data(self, initial_state):
|
||
"""
|
||
从初始状态中提取笔记数据(只提取标题、描述、图片列表、视频列表和话题)
|
||
|
||
Args:
|
||
initial_state: window.__INITIAL_STATE__ 解析后的字典
|
||
|
||
Returns:
|
||
dict: 提取的笔记数据
|
||
"""
|
||
try:
|
||
# 获取笔记详情
|
||
note_store = initial_state.get('note', {})
|
||
note_detail_map = note_store.get('noteDetailMap', {})
|
||
|
||
# 获取第一个笔记ID
|
||
first_note_id = note_store.get('firstNoteId')
|
||
if not first_note_id:
|
||
# 如果没有firstNoteId,尝试获取noteDetailMap中的第一个key
|
||
if note_detail_map:
|
||
first_note_id = list(note_detail_map.keys())[0]
|
||
else:
|
||
print("未找到笔记ID")
|
||
return None
|
||
|
||
# 获取笔记详情
|
||
note_detail = note_detail_map.get(first_note_id, {})
|
||
note_info = note_detail.get('note', {})
|
||
|
||
if not note_info:
|
||
print("未找到笔记信息")
|
||
return None
|
||
|
||
# 只提取需要的字段
|
||
extracted_data = {
|
||
'title': note_info.get('title'),
|
||
'desc': note_info.get('desc'),
|
||
'images': [],
|
||
'videos': [],
|
||
'topics': []
|
||
}
|
||
|
||
# 提取图片信息
|
||
image_list = note_info.get('imageList', [])
|
||
for img in image_list:
|
||
image_data = {
|
||
'url': img.get('urlDefault') or img.get('url'),
|
||
'urlPre': img.get('urlPre'),
|
||
'width': img.get('width'),
|
||
'height': img.get('height'),
|
||
}
|
||
extracted_data['images'].append(image_data)
|
||
|
||
# 提取视频信息(如果存在)
|
||
video_info = note_info.get('video', {})
|
||
if video_info:
|
||
video_data = {}
|
||
|
||
# 尝试提取视频URL
|
||
media = video_info.get('media', {})
|
||
if media:
|
||
stream = media.get('stream', {})
|
||
if stream:
|
||
hls = stream.get('hls', {})
|
||
if hls:
|
||
video_data['url'] = hls.get('masterUrl') or hls.get('url')
|
||
# 如果没有hls,尝试其他字段
|
||
if not video_data.get('url'):
|
||
video_data['url'] = media.get('url') or media.get('videoUrl')
|
||
|
||
# 提取视频封面
|
||
if video_info.get('cover'):
|
||
video_data['cover'] = video_info.get('cover')
|
||
|
||
# 提取视频时长
|
||
if video_info.get('time'):
|
||
video_data['time'] = video_info.get('time')
|
||
|
||
if video_data.get('url'):
|
||
extracted_data['videos'].append(video_data)
|
||
|
||
# 提取话题信息
|
||
# 话题可能在多个位置,尝试不同的字段名
|
||
topic_list = note_info.get('topicList', []) or note_info.get('tagList', []) or note_info.get('hashtagList',
|
||
[])
|
||
if topic_list:
|
||
for topic in topic_list:
|
||
topic_data = {
|
||
'name': topic.get('name') or topic.get('title') or topic.get('tagName'),
|
||
'id': topic.get('id') or topic.get('topicId') or topic.get('tagId'),
|
||
}
|
||
if topic_data.get('name'):
|
||
extracted_data['topics'].append(topic_data)
|
||
|
||
# 如果描述中包含话题(#话题#格式),也提取出来
|
||
desc = note_info.get('desc', '')
|
||
if desc:
|
||
# 使用正则表达式提取 #话题# 格式
|
||
topic_pattern = r'#([^#]+)#'
|
||
matches = re.findall(topic_pattern, desc)
|
||
for match in matches:
|
||
# 避免重复添加
|
||
if not any(t.get('name') == match for t in extracted_data['topics']):
|
||
extracted_data['topics'].append({'name': match})
|
||
|
||
return extracted_data
|
||
|
||
except Exception as e:
|
||
print(f"提取笔记数据时出错:{e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return None
|
||
|
||
def extract_video_from_meta(self, html_content):
|
||
"""
|
||
从HTML的meta标签中提取视频信息
|
||
|
||
Args:
|
||
html_content: HTML内容字符串
|
||
|
||
Returns:
|
||
dict: 视频信息字典,如果没有找到则返回None
|
||
"""
|
||
try:
|
||
soup = BeautifulSoup(html_content, 'html.parser')
|
||
video_info = {}
|
||
|
||
# 提取og:video标签
|
||
og_video = soup.find('meta', {'name': 'og:video'})
|
||
if og_video and og_video.get('content'):
|
||
video_info['url'] = og_video.get('content')
|
||
|
||
# 提取视频时长
|
||
og_videotime = soup.find('meta', {'name': 'og:videotime'})
|
||
if og_videotime and og_videotime.get('content'):
|
||
video_info['time'] = og_videotime.get('content')
|
||
|
||
# 提取视频质量
|
||
og_videoquality = soup.find('meta', {'name': 'og:videoquality'})
|
||
if og_videoquality and og_videoquality.get('content'):
|
||
video_info['quality'] = og_videoquality.get('content')
|
||
|
||
# 如果找到了视频URL,返回视频信息
|
||
if video_info.get('url'):
|
||
return video_info
|
||
|
||
return None
|
||
except Exception as e:
|
||
print(f"从meta标签提取视频信息时出错:{e}")
|
||
return None
|
||
|
||
def get_page_datas(self):
|
||
tab = self.page.new_tab()
|
||
tab.listen.start(self.url)
|
||
|
||
tab.get(url=self.url)
|
||
|
||
res = tab.listen.wait(timeout=3)
|
||
if res:
|
||
print(res.response.body)
|
||
|
||
# 提取meta标签中的视频信息
|
||
video_info = self.extract_video_from_meta(res.response.body)
|
||
|
||
# 使用正则表达式提取window.__INITIAL_STATE__的内容
|
||
pattern = r'<script>window\.__INITIAL_STATE__\s*=\s*({.*?});?\s*</script>'
|
||
match = re.search(pattern, res.response.body, re.DOTALL)
|
||
|
||
if not match:
|
||
print("未找到 window.__INITIAL_STATE__ 数据")
|
||
# 如果只有视频信息,返回视频信息
|
||
if video_info:
|
||
return {'videos': [video_info]}
|
||
return None
|
||
|
||
# 提取JSON字符串
|
||
json_str = match.group(1)
|
||
|
||
# 处理JavaScript中的undefined值(Python JSON不支持undefined)
|
||
json_str = re.sub(r'\bundefined\b', 'null', json_str)
|
||
|
||
# 解析JSON
|
||
initial_state = json.loads(json_str)
|
||
|
||
# 提取笔记数据
|
||
note_data = self.extract_note_data(initial_state)
|
||
|
||
# 如果提取到视频信息,添加到笔记数据中
|
||
if video_info and note_data:
|
||
if 'videos' not in note_data or not note_data['videos']:
|
||
note_data['videos'] = []
|
||
note_data['videos'].append(video_info)
|
||
|
||
tab.close()
|
||
|
||
return note_data
|
||
|
||
def download_video(self, url):
|
||
page = SessionPage()
|
||
page.download(url)
|
||
|
||
def download_image(self, url, name):
|
||
"""
|
||
下载图片文件
|
||
|
||
Args:
|
||
url: 图片URL
|
||
save_path: 保存路径,如果为None则使用URL中的文件名
|
||
"""
|
||
# 设置请求头
|
||
headers = {
|
||
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
|
||
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
|
||
'Cache-Control': 'no-cache',
|
||
'DNT': '1',
|
||
'Pragma': 'no-cache',
|
||
'Proxy-Connection': 'keep-alive',
|
||
'Upgrade-Insecure-Requests': '1',
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0'
|
||
}
|
||
|
||
try:
|
||
# 发送请求,verify=False 相当于 curl 的 --insecure
|
||
response = requests.get(url, headers=headers, verify=False, timeout=30)
|
||
response.raise_for_status() # 检查HTTP错误
|
||
|
||
# 保存文件
|
||
with open(f"{name}.webp", 'wb') as f:
|
||
f.write(response.content)
|
||
|
||
return True
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"下载失败: {e}")
|
||
return None
|
||
|
||
def action(self, folder_path=None, collect_all_videos=False):
|
||
logger.info("=" * 50)
|
||
logger.info("开始执行 action 方法(单个/批量上传模式)")
|
||
logger.info(f"多多ID: {self.user_id}, 序号: {self.index}")
|
||
logger.info(f"文件夹路径: {folder_path}")
|
||
logger.info(f"批量上传模式: {collect_all_videos}")
|
||
|
||
logger.info("=" * 50)
|
||
logger.info("步骤1: 创建浏览器页面...")
|
||
self.create_page()
|
||
logger.info(" ✓ 浏览器页面创建完成")
|
||
|
||
if datas.get(self.user_id):
|
||
logger.info("=" * 50)
|
||
logger.info("步骤2: 使用已保存的标签页...")
|
||
creator_tab = self.page.new_tab(datas.get(self.user_id))
|
||
logger.info(f" ✓ 已打开已保存的标签页: {datas.get(self.user_id)}")
|
||
else:
|
||
logger.info("=" * 50)
|
||
logger.info("步骤2: 登录并导航到发布页面...")
|
||
self.page.get(url="https://mcn.pinduoduo.com/register")
|
||
logger.info(" ✓ 已打开登录页面")
|
||
|
||
for i in range(100):
|
||
if self.page.ele("x://*[text()='登录']", timeout=5):
|
||
logger.warning(" 请登录》》》")
|
||
elif self.page.ele("x://*[text()='主播/作者管理']", timeout=5):
|
||
logger.info(" ✓ 检测到已登录")
|
||
break
|
||
|
||
if i in [5,10,15,20,35,30]:
|
||
self.page.get(url="https://mcn.pinduoduo.com/register")
|
||
else:
|
||
logger.error(" ✗ 未登录!!!")
|
||
return {"ok": False, "reason": "未登录"}
|
||
|
||
logger.info(" 正在导航到发布页面...")
|
||
self.page.ele("x://*[text()='主播/作者管理']").click()
|
||
logger.info(" ✓ 已点击'主播/作者管理'")
|
||
time.sleep(1)
|
||
|
||
self.page.ele("x://*[text()='签约主播/作者']").click()
|
||
logger.info(" ✓ 已点击'签约主播/作者'")
|
||
|
||
ele = self.page.ele("x://*[text()='稍后再说']", timeout=3)
|
||
if ele:
|
||
ele.click()
|
||
logger.info(" ✓ 已关闭提示框")
|
||
time.sleep(1)
|
||
|
||
ele = self.page.ele("x://*[text()='我知道了']", timeout=3)
|
||
if ele:
|
||
ele.click()
|
||
logger.info(" ✓ 已关闭提示框")
|
||
time.sleep(1)
|
||
|
||
logger.info(f" 正在搜索多多ID: {self.user_id}")
|
||
self.page.ele('x://*[@placeholder="输入主播/作者ID搜索"]').input(vals=self.user_id, clear=True)
|
||
time.sleep(1)
|
||
self.page.ele("x://*[text()='提交']").click()
|
||
logger.info(" ✓ 已提交搜索")
|
||
time.sleep(1)
|
||
|
||
self.page.actions.move_to(ele_or_loc="x://*[text()='内容管理']")
|
||
time.sleep(1)
|
||
self.page.ele("x://*[text()='内容管理']").click()
|
||
logger.info(" ✓ 已点击'内容管理'")
|
||
time.sleep(3)
|
||
|
||
creator_tab = self.page.get_tab(url="home/creator/manage")
|
||
creator_tab.ele("x://*[text()='发布视频']").click()
|
||
logger.info(" ✓ 已点击'发布视频',进入发布页面")
|
||
datas[self.user_id] = creator_tab.url
|
||
|
||
# 从文件夹读取文件
|
||
video_file_paths = [] # 视频文件列表
|
||
image_folder_paths = [] # 图片文件夹列表(用于存储图片文件夹路径)
|
||
if folder_path and os.path.exists(folder_path):
|
||
logger.info(f"开始读取文件夹: {folder_path}")
|
||
logger.info(f"查找序号为 '{self.index}' 的文件")
|
||
|
||
# 支持的视频格式
|
||
video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm']
|
||
# 支持的图片格式
|
||
image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp']
|
||
|
||
# 第一步:先收集所有匹配的视频文件和图片文件夹
|
||
logger.info("=" * 50)
|
||
if collect_all_videos:
|
||
logger.info("第一步:扫描文件夹,收集所有视频文件(批量上传模式,不限制序号)...")
|
||
else:
|
||
logger.info(f"第一步:扫描文件夹,收集序号为 '{self.index}' 的文件...")
|
||
|
||
# 获取最外层文件夹下的所有子文件夹(多多ID文件夹)
|
||
subdirs = [f for f in os.listdir(folder_path) if os.path.isdir(os.path.join(folder_path, f))]
|
||
logger.info(f"在最外层文件夹下找到 {len(subdirs)} 个子文件夹(多多ID文件夹)")
|
||
|
||
# 找到匹配当前多多ID的文件夹
|
||
target_subdir = None
|
||
for subdir_name in subdirs:
|
||
if subdir_name == str(self.user_id):
|
||
target_subdir = os.path.join(folder_path, subdir_name)
|
||
logger.info(f"✓ 找到匹配的多多ID文件夹: {subdir_name}")
|
||
break
|
||
|
||
if not target_subdir:
|
||
logger.warning(f"未找到多多ID为 {self.user_id} 的文件夹")
|
||
logger.info(f"可用的文件夹: {subdirs}")
|
||
logger.error("无法继续执行,因为未找到匹配的多多ID文件夹")
|
||
creator_tab.close()
|
||
return {"ok": False, "reason": f"未找到多多ID为 {self.user_id} 的文件夹"}
|
||
else:
|
||
# 只扫描匹配的多多ID文件夹
|
||
logger.info(f" 正在扫描子文件夹: {os.path.basename(target_subdir)}")
|
||
|
||
# 获取该子文件夹下的所有文件和文件夹
|
||
try:
|
||
items = os.listdir(target_subdir)
|
||
logger.info(f" 该文件夹下有 {len(items)} 个项目")
|
||
|
||
for item_name in items:
|
||
item_path = os.path.join(target_subdir, item_name)
|
||
logger.info(f" 检查项目: {item_name}")
|
||
|
||
# 分割文件名,检查第一部分是否匹配序号
|
||
name_parts = item_name.split("-")
|
||
logger.info(f" 文件名分割结果: {name_parts}")
|
||
|
||
# 如果是批量上传模式,收集所有视频文件;否则只收集匹配序号的文件
|
||
should_collect = False
|
||
if collect_all_videos:
|
||
# 批量上传模式:只要文件名格式正确(有序号),就收集视频文件
|
||
if len(name_parts) > 0 and name_parts[0].isdigit():
|
||
should_collect = True
|
||
logger.info(f" 批量上传模式:序号 {name_parts[0]} 将被收集")
|
||
else:
|
||
# 单个上传模式:只收集匹配当前序号的文件
|
||
if len(name_parts) > 0 and name_parts[0] == str(self.index):
|
||
should_collect = True
|
||
logger.info(f" ✓ 序号匹配!序号: {name_parts[0]}, 目标序号: {self.index}")
|
||
|
||
if should_collect:
|
||
path = Path(item_path)
|
||
|
||
# 判断是否为文件(视频文件)
|
||
if path.is_file():
|
||
# 检查是否为视频文件
|
||
file_ext = path.suffix.lower()
|
||
logger.info(f" 这是一个文件,扩展名: {file_ext}")
|
||
|
||
if any(file_ext == ext for ext in video_extensions):
|
||
video_file_paths.append(path)
|
||
logger.info(f" ✓ 添加到视频列表: {path.name}")
|
||
logger.info(f" 当前视频总数: {len(video_file_paths)}")
|
||
else:
|
||
logger.info(f" ✗ 不是视频文件,跳过")
|
||
else:
|
||
# 这是一个图片文件夹(只在单个上传模式下处理)
|
||
if not collect_all_videos:
|
||
image_folder_paths.append(path)
|
||
logger.info(f" ✓ 添加到图片文件夹列表: {path.name}")
|
||
logger.info(f" 当前图片文件夹总数: {len(image_folder_paths)}")
|
||
else:
|
||
if len(name_parts) > 0:
|
||
if collect_all_videos:
|
||
logger.info(f" ✗ 文件名格式不正确(序号不是数字),跳过")
|
||
else:
|
||
logger.info(f" ✗ 序号不匹配: {name_parts[0]} != {self.index}")
|
||
else:
|
||
logger.info(f" ✗ 文件名格式不正确,无法提取序号")
|
||
except Exception as e:
|
||
logger.error(f" 扫描子文件夹 {target_subdir} 时出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
logger.info("=" * 50)
|
||
logger.info(f"文件收集完成!")
|
||
logger.info(f" 视频文件数量: {len(video_file_paths)}")
|
||
logger.info(f" 图片文件夹数量: {len(image_folder_paths)}")
|
||
|
||
if video_file_paths:
|
||
logger.info("视频文件列表:")
|
||
for idx, video_path in enumerate(video_file_paths, 1):
|
||
logger.info(f" {idx}. {video_path}")
|
||
|
||
# 第二步:如果有视频文件,批量上传所有视频
|
||
if video_file_paths:
|
||
logger.info("=" * 50)
|
||
logger.info(f"第二步:开始批量上传 {len(video_file_paths)} 个视频文件...")
|
||
|
||
# 检查是否有"支持批量上传"按钮,如果有则使用批量上传
|
||
logger.info("查找批量上传按钮...")
|
||
batch_upload_btn = creator_tab.ele("x://*[text()='支持批量上传']", timeout=3)
|
||
|
||
if batch_upload_btn:
|
||
logger.info("✓ 找到'支持批量上传'按钮")
|
||
if len(video_file_paths) > 1:
|
||
logger.info(f"使用批量上传模式,准备上传 {len(video_file_paths)} 个视频")
|
||
logger.info(f"上传文件列表: {[str(p) for p in video_file_paths]}")
|
||
batch_upload_btn.click.to_upload(video_file_paths)
|
||
logger.info(f"✓ 已触发批量上传,上传 {len(video_file_paths)} 个视频")
|
||
else:
|
||
logger.info(f"只有1个视频,使用批量上传按钮上传")
|
||
batch_upload_btn.click.to_upload(video_file_paths)
|
||
logger.info(f"✓ 已触发上传")
|
||
else:
|
||
logger.warning("✗ 未找到'支持批量上传'按钮,尝试使用'添加视频'按钮")
|
||
# 使用单个上传(兼容旧版本或只有一个视频的情况)
|
||
upload_btn = creator_tab.ele("x://*[text()='添加视频']", timeout=3)
|
||
if upload_btn:
|
||
logger.info("✓ 找到'添加视频'按钮")
|
||
logger.info(f"使用添加视频按钮,准备上传 {len(video_file_paths)} 个视频")
|
||
logger.info(f"上传文件列表: {[str(p) for p in video_file_paths]}")
|
||
upload_btn.click.to_upload(video_file_paths)
|
||
logger.info(f"✓ 已触发上传,上传 {len(video_file_paths)} 个视频")
|
||
else:
|
||
logger.error("✗ 未找到任何上传按钮!")
|
||
|
||
logger.info("等待上传完成...")
|
||
while True:
|
||
# if creator_tab.ele("x://div[contains(text(), '视频上传成功')]", timeout=3):
|
||
if creator_tab.ele('x://*[contains(., "视频上传成功")]', timeout=3):
|
||
print(1)
|
||
break
|
||
time.sleep(5)
|
||
|
||
# 输入视频描述(只输入第一个视频的描述,因为批量上传后可能需要单独处理每个视频)
|
||
if video_file_paths:
|
||
first_video_name = video_file_paths[0].name
|
||
file_names = first_video_name.split("-")
|
||
if len(file_names) > 0:
|
||
desc_text = file_names[-1].split(".")[0] + self.ht
|
||
logger.info(f"准备输入视频描述: {desc_text[:50]}...")
|
||
desc_input = creator_tab.ele(f'x://*[@id="magicdomid1"]', timeout=3)
|
||
if desc_input:
|
||
desc_input.input(vals=desc_text, clear=True)
|
||
logger.info(f"✓ 已输入视频描述")
|
||
else:
|
||
logger.warning("✗ 未找到描述输入框")
|
||
else:
|
||
logger.warning("未找到任何视频文件,跳过视频上传")
|
||
|
||
# 第三步:如果有图片文件夹,逐个上传图片
|
||
if image_folder_paths:
|
||
logger.info(f"找到 {len(image_folder_paths)} 个图片文件夹,开始逐个上传...")
|
||
for image_folder_path in image_folder_paths:
|
||
image_files = []
|
||
# 收集图片文件夹中的所有图片文件
|
||
if image_folder_path.is_dir():
|
||
for img_file in os.listdir(image_folder_path):
|
||
img_path = Path(os.path.join(image_folder_path, img_file))
|
||
if img_path.is_file() and any(img_path.suffix.lower() == ext for ext in image_extensions):
|
||
image_files.append(str(img_path))
|
||
|
||
if image_files:
|
||
creator_tab.ele('x://*[text()="添加图片"]').click.to_upload(image_files)
|
||
time.sleep(3)
|
||
|
||
# 提取文件夹名称用于标题和描述
|
||
folder_name = image_folder_path.name
|
||
file_names = folder_name.split("-")
|
||
if len(file_names) > 1:
|
||
creator_tab.ele('x://*[@placeholder="添加标题"]').input(vals=file_names[1], clear=True)
|
||
|
||
xpath_path = creator_tab.ele('x://*[text()="添加视频描述"]').xpath
|
||
# 方法2:使用正则表达式替换最后一个div[1]
|
||
new_path = re.sub(r'div\[1\]$', 'div[2]', xpath_path)
|
||
new_path += "/div/div[3]/div/div/div"
|
||
if len(file_names) > 2:
|
||
creator_tab.ele(f'x:{new_path}').input(vals=file_names[2] + " " + self.ht, clear=True)
|
||
logger.info(f"已上传图片文件夹: {folder_name}")
|
||
|
||
# if ".mp4" in file_path:
|
||
# # 上传视频
|
||
# creator_tab.ele("x://*[text()='添加视频']").click.to_upload(file_path)
|
||
# else:
|
||
# # 上传图片
|
||
# image_files = [f for f in files if os.path.splitext(f)[1].lower() in image_extensions]
|
||
# path_datas = image_files
|
||
# if path_datas:
|
||
# creator_tab.ele('x://*[text()="添加图片"]').click.to_upload(path_datas)
|
||
|
||
time.sleep(1)
|
||
|
||
# 点击立即发布选项
|
||
logger.info("=" * 50)
|
||
logger.info("步骤:选择发布方式...")
|
||
immediate_publish = creator_tab.ele('x://*[contains(text(), "立即发布")]', timeout=3)
|
||
if immediate_publish:
|
||
immediate_publish.click()
|
||
logger.info(" ✓ 已点击'立即发布'选项")
|
||
time.sleep(3)
|
||
else:
|
||
logger.warning(" ✗ 未找到'立即发布'选项")
|
||
|
||
# 定时
|
||
if self.time_start:
|
||
logger.info("=" * 50)
|
||
logger.info("步骤:设置定时发布...")
|
||
# 点击"定时发布"选项
|
||
logger.info(f" 定时时间: {self.time_start}")
|
||
schedule_btn = creator_tab.ele('x://*[contains(text(), "定时发布")]', timeout=3)
|
||
if schedule_btn:
|
||
schedule_btn.click()
|
||
logger.info(" ✓ 已点击'定时发布'选项")
|
||
time.sleep(1)
|
||
else:
|
||
logger.warning(" ✗ 未找到'定时发布'选项")
|
||
creator_tab.close()
|
||
return {"ok": False, "reason": "定时设置失败:未找到定时发布选项"}
|
||
|
||
# 获取日期选择器元素
|
||
date_picker_ele = creator_tab.ele('x://*[@placeholder="选择日期"]', timeout=3)
|
||
if date_picker_ele:
|
||
# 解析时间字符串,格式:2026-01-15 09:30:00
|
||
try:
|
||
from datetime import datetime
|
||
dt = datetime.strptime(self.time_start, "%Y-%m-%d %H:%M:%S")
|
||
date_str = dt.strftime("%Y-%m-%d")
|
||
time_str = dt.strftime("%H:%M:%S")
|
||
year = dt.year
|
||
month = dt.month
|
||
day = dt.day
|
||
hour = dt.hour
|
||
minute = dt.minute
|
||
second = dt.second
|
||
|
||
logger.info(
|
||
f"开始设置定时时间: {self.time_start} (年={year}, 月={month}, 日={day}, 时={hour}, 分={minute}, 秒={second})")
|
||
|
||
# 点击日期选择器打开面板
|
||
date_picker_ele.click()
|
||
time.sleep(1.5) # 等待面板完全加载
|
||
|
||
# 方法:通过点击日期和时间选择器来设置
|
||
# 1. 如果需要,先切换年月
|
||
# 2. 点击日期单元格
|
||
# 3. 点击时间选择器中的小时、分钟、秒
|
||
# 4. 点击确认按钮
|
||
|
||
# 检查并切换年月(如果需要)
|
||
# 获取当前显示的月份
|
||
try:
|
||
month_text_ele = creator_tab.ele('x://span[@class="RPR_dateText_5-152-0"]', timeout=2)
|
||
if month_text_ele:
|
||
current_month = month_text_ele.text
|
||
logger.info(f"当前显示的月份: {current_month}")
|
||
|
||
# 如果需要切换月份
|
||
target_month_str = f"{month}月"
|
||
if current_month != target_month_str:
|
||
logger.info(f"需要切换到目标月份: {target_month_str}")
|
||
# 计算月份差值(简化处理,只考虑同一年内)
|
||
current_month_num = int(current_month.replace('月', ''))
|
||
target_month_num = month
|
||
|
||
# 限定在日期选择器内,用 class 定位(RPR_right 仅日历右箭头有,避免轮播图)
|
||
date_root = 'x://div[@data-testid="beast-core-datePicker-dropdown-contentRoot"]'
|
||
if target_month_num > current_month_num:
|
||
arrow_selector = f'{date_root}//*[contains(@class,"RPR_iconPrevNext") and contains(@class,"RPR_right")]'
|
||
clicks_needed = target_month_num - current_month_num
|
||
else:
|
||
arrow_selector = f'{date_root}//*[contains(@class,"RPR_iconPrevNext") and not(contains(@class,"RPR_right"))]'
|
||
clicks_needed = current_month_num - target_month_num
|
||
|
||
# 点击箭头切换月份
|
||
for _ in range(min(clicks_needed, 12)):
|
||
arrow = creator_tab.ele(arrow_selector, timeout=1)
|
||
if arrow:
|
||
try:
|
||
arrow.click()
|
||
except Exception:
|
||
pass
|
||
time.sleep(0.5)
|
||
# 验证是否切换成功
|
||
new_month_ele = creator_tab.ele('x://span[@class="RPR_dateText_5-152-0"]',
|
||
timeout=1)
|
||
if new_month_ele and new_month_ele.text == target_month_str:
|
||
logger.info(f"成功切换到目标月份: {target_month_str}")
|
||
break
|
||
except Exception as e:
|
||
logger.warning(f"切换月份时出错: {e},继续尝试选择日期")
|
||
|
||
# 选择日期 - 点击对应的日期单元格
|
||
date_cell = creator_tab.ele(
|
||
f'x://td[@role="date-cell"]//div[@title="{day}" and not(contains(@class, "RPR_disabled")) and not(contains(@class, "RPR_outOfMonth"))]',
|
||
timeout=3)
|
||
if date_cell:
|
||
date_cell.click()
|
||
logger.info(f"已点击日期: {day}")
|
||
time.sleep(0.5)
|
||
else:
|
||
logger.warning(f"未找到日期单元格: {day}")
|
||
|
||
# 先点击时间输入框打开时间选择器
|
||
time_input = creator_tab.ele('x://input[@data-testid="beast-core-timePicker-html-input"]',
|
||
timeout=3)
|
||
if time_input:
|
||
time_input.click()
|
||
logger.info("已点击时间输入框,打开时间选择器")
|
||
time.sleep(0.8) # 等待时间选择器面板打开
|
||
else:
|
||
logger.warning("未找到时间输入框,尝试使用XPath")
|
||
# 备用方案:使用用户提供的XPath
|
||
try:
|
||
time_input_xpath = '/html/body/div[2]/div/div/div/div/div/footer/div/div/div/div/div/div/div/div[1]/input'
|
||
time_input = creator_tab.ele(f'x:{time_input_xpath}', timeout=2)
|
||
if time_input:
|
||
time_input.click()
|
||
logger.info("通过XPath点击了时间输入框")
|
||
time.sleep(0.8)
|
||
except Exception as e:
|
||
logger.warning(f"通过XPath也未能找到时间输入框: {e}")
|
||
|
||
# 选择时间 - 点击时间选择器中的小时、分钟、秒
|
||
# 小时
|
||
hour_str = f"{hour:02d}"
|
||
hour_item = creator_tab.ele(
|
||
f'x://ul[@data-testid="beast-core-timePicker-list-hh"]//li[text()="{hour_str}"]', timeout=3)
|
||
if hour_item:
|
||
hour_item.scroll.to_see()
|
||
time.sleep(0.2)
|
||
hour_item.click()
|
||
logger.info(f"已选择小时: {hour_str}")
|
||
time.sleep(0.3)
|
||
else:
|
||
logger.warning(f"未找到小时选项: {hour_str}")
|
||
|
||
# 分钟
|
||
minute_str = f"{minute:02d}"
|
||
minute_item = creator_tab.ele(
|
||
f'x://ul[@data-testid="beast-core-timePicker-list-mm"]//li[text()="{minute_str}"]', timeout=3)
|
||
if minute_item:
|
||
minute_item.scroll.to_see()
|
||
time.sleep(0.2)
|
||
minute_item.click()
|
||
logger.info(f"已选择分钟: {minute_str}")
|
||
time.sleep(0.3)
|
||
else:
|
||
logger.warning(f"未找到分钟选项: {minute_str}")
|
||
|
||
# 秒
|
||
second_str = f"{second:02d}"
|
||
second_item = creator_tab.ele(
|
||
f'x://ul[@data-testid="beast-core-timePicker-list-ss"]//li[text()="{second_str}"]', timeout=3)
|
||
if second_item:
|
||
second_item.scroll.to_see()
|
||
time.sleep(0.2)
|
||
second_item.click()
|
||
logger.info(f"已选择秒: {second_str}")
|
||
time.sleep(0.3)
|
||
else:
|
||
logger.warning(f"未找到秒选项: {second_str}")
|
||
|
||
# 点击确认按钮
|
||
try:
|
||
# 查找确认按钮
|
||
confirm_btn = creator_tab.ele(
|
||
'x://button[@data-testid="beast-core-button"]//span[text()="确认"]', timeout=3)
|
||
if confirm_btn:
|
||
confirm_btn.click()
|
||
logger.info("已点击确认按钮")
|
||
time.sleep(0.5)
|
||
else:
|
||
# 尝试通过JavaScript点击确认按钮
|
||
confirm_js = """
|
||
(function() {
|
||
const buttons = document.querySelectorAll('button[data-testid="beast-core-button"]');
|
||
for (let btn of buttons) {
|
||
const span = btn.querySelector('span');
|
||
if (span && span.textContent.includes('确认')) {
|
||
btn.click();
|
||
return true;
|
||
}
|
||
}
|
||
return false;
|
||
})();
|
||
"""
|
||
result = creator_tab.run_js(confirm_js)
|
||
if result:
|
||
logger.info("通过JavaScript点击了确认按钮")
|
||
else:
|
||
logger.warning("未找到确认按钮")
|
||
time.sleep(0.5)
|
||
except Exception as e:
|
||
logger.warning(f"点击确认按钮失败: {e}")
|
||
|
||
# 验证设置是否成功(同时校验日期+时间)
|
||
time.sleep(0.5)
|
||
ok, reason, actual_date, actual_time = self._verify_schedule_value(
|
||
creator_tab, dt, video_container=None
|
||
)
|
||
if ok:
|
||
logger.info(f"成功设置定时时间: {date_str} {time_str}")
|
||
else:
|
||
logger.warning(
|
||
f"设置的时间可能不准确,日期={actual_date} 时间={actual_time}, 期望={date_str} {time_str}"
|
||
)
|
||
creator_tab.close()
|
||
return {"ok": False, "reason": reason}
|
||
|
||
except ValueError as e:
|
||
logger.error(f"时间格式错误: {self.time_start}, 正确格式应为: YYYY-MM-DD HH:MM:SS, 错误: {e}")
|
||
creator_tab.close()
|
||
return {"ok": False, "reason": "定时设置失败:时间格式错误"}
|
||
except Exception as e:
|
||
logger.error(f"设置定时时间失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
creator_tab.close()
|
||
return {"ok": False, "reason": f"定时设置失败:{e}"}
|
||
else:
|
||
logger.error("未找到日期选择器,可能设置失败")
|
||
creator_tab.close()
|
||
return {"ok": False, "reason": "定时设置失败:未找到日期选择器"}
|
||
|
||
# 绑定任务
|
||
ele = creator_tab.ele('x://*[text()="点击绑定任务"]', timeout=3)
|
||
if ele:
|
||
ele.click()
|
||
creator_tab.ele('x://*[@placeholder="请输入个人主页链接"]').input(self.url)
|
||
time.sleep(1)
|
||
creator_tab.ele('x://*[text()="确认"]').click()
|
||
time.sleep(1)
|
||
|
||
ele = creator_tab.ele('x://*[text()="我已阅读并同意"]', timeout=3)
|
||
if ele:
|
||
ele.click()
|
||
time.sleep(1)
|
||
creator_tab.ele('x://*[text()="一键发布"]').click()
|
||
|
||
# 尝试验证是否发布成功(避免“点了就算成功”的误判)
|
||
publish_ok, publish_reason = self._verify_publish_success(creator_tab=creator_tab, video_container=None)
|
||
if publish_ok:
|
||
logger.info(f"✓ 发布校验通过:{publish_reason}")
|
||
else:
|
||
logger.warning(f"✗ 发布校验失败:{publish_reason}")
|
||
|
||
time.sleep(2)
|
||
|
||
creator_tab.close()
|
||
return {"ok": publish_ok, "reason": publish_reason}
|
||
|
||
def action1(self, folder_path=None, input_delay=0, on_item_done=None):
|
||
"""
|
||
批量上传视频,针对每个视频单独处理详情、定时任务和绑定任务
|
||
|
||
Args:
|
||
folder_path: 视频文件列表
|
||
input_delay: 输入延迟时间
|
||
on_item_done: 回调函数,每处理完一个视频后调用,参数为 dict: {index, path, name, ok, reason}
|
||
"""
|
||
logger.info("=" * 50)
|
||
logger.info("开始执行 action1 方法(批量上传模式)")
|
||
logger.info(f"多多ID: {self.user_id}, 序号: {self.index}")
|
||
logger.info(f"需要处理的视频数量: {len(folder_path) if folder_path else 0}")
|
||
|
||
logger.info("=" * 50)
|
||
logger.info("步骤1: 创建浏览器页面...")
|
||
self.create_page()
|
||
logger.info(" ✓ 浏览器页面创建完成")
|
||
|
||
logger.info("=" * 50)
|
||
logger.info("步骤2: 登录并导航到发布页面...")
|
||
if datas.get(self.user_id):
|
||
logger.info(" 使用已保存的标签页...")
|
||
creator_tab = self.page.new_tab(datas.get(self.user_id))
|
||
logger.info(f" ✓ 已打开已保存的标签页: {datas.get(self.user_id)}")
|
||
else:
|
||
|
||
logger.info("=" * 50)
|
||
logger.info("步骤2: 登录并导航到发布页面...")
|
||
self.page.get(url="https://mcn.pinduoduo.com/register")
|
||
logger.info(" ✓ 已打开登录页面")
|
||
|
||
for i in range(100):
|
||
if self.page.ele("x://*[text()='登录']", timeout=5):
|
||
logger.warning(" 请登录》》》")
|
||
elif self.page.ele("x://*[text()='主播/作者管理']", timeout=5):
|
||
logger.info(" ✓ 检测到已登录")
|
||
break
|
||
|
||
if i in [5, 10, 15, 20, 35, 30]:
|
||
self.page.get(url="https://mcn.pinduoduo.com/register")
|
||
else:
|
||
logger.error(" ✗ 未登录!!!")
|
||
return {"ok": False, "results": [], "reason": "未登录"}
|
||
|
||
logger.info(" 正在导航到发布页面...")
|
||
self.page.ele("x://*[text()='主播/作者管理']").click()
|
||
logger.info(" ✓ 已点击'主播/作者管理'")
|
||
time.sleep(1)
|
||
|
||
self.page.ele("x://*[text()='签约主播/作者']").click()
|
||
logger.info(" ✓ 已点击'签约主播/作者'")
|
||
|
||
ele = self.page.ele("x://*[text()='我知道了']", timeout=3)
|
||
if ele:
|
||
ele.click()
|
||
logger.info(" ✓ 已关闭提示框")
|
||
time.sleep(1)
|
||
|
||
logger.info(f" 正在搜索多多ID: {self.user_id}")
|
||
self.page.ele('x://*[@placeholder="输入主播/作者ID搜索"]').input(vals=self.user_id, clear=True)
|
||
time.sleep(1)
|
||
self.page.ele("x://*[text()='提交']").click()
|
||
logger.info(" ✓ 已提交搜索")
|
||
time.sleep(1)
|
||
|
||
self.page.actions.move_to(ele_or_loc="x://*[text()='内容管理']")
|
||
time.sleep(1)
|
||
self.page.ele("x://*[text()='内容管理']").click()
|
||
logger.info(" ✓ 已点击'内容管理'")
|
||
time.sleep(3)
|
||
|
||
creator_tab = self.page.get_tab(url="home/creator/manage")
|
||
creator_tab.ele("x://*[text()='发布视频']").click()
|
||
logger.info(" ✓ 已点击'发布视频',进入发布页面")
|
||
datas[self.user_id] = creator_tab.url
|
||
|
||
# 批量上传视频
|
||
logger.info("=" * 50)
|
||
logger.info("步骤3: 准备批量上传视频...")
|
||
videos = []
|
||
for idx, video_info in enumerate(folder_path, 1):
|
||
video_path = video_info["path"]
|
||
videos.append(video_path)
|
||
logger.info(f" {idx}. {video_path.name} ({video_path})")
|
||
|
||
logger.info(f"=" * 50)
|
||
logger.info(f"准备批量上传 {len(videos)} 个视频")
|
||
logger.info(f"视频文件列表: {[str(v) for v in videos]}")
|
||
|
||
# 优先使用"支持批量上传"按钮,如果没有则使用"添加视频"按钮
|
||
logger.info("查找上传按钮...")
|
||
batch_upload_btn = creator_tab.ele("x://*[text()='支持批量上传']", timeout=3)
|
||
if batch_upload_btn:
|
||
logger.info("✓ 找到'支持批量上传'按钮")
|
||
logger.info(f"使用批量上传按钮,准备上传 {len(videos)} 个视频")
|
||
batch_upload_btn.click.to_upload(videos)
|
||
logger.info(f"✓ 已触发批量上传,上传 {len(videos)} 个视频")
|
||
else:
|
||
logger.warning("✗ 未找到'支持批量上传'按钮,尝试使用'添加视频'按钮")
|
||
# 备用方案:使用"添加视频"按钮(可能也支持批量上传)
|
||
add_video_btn = creator_tab.ele("x://*[text()='添加视频']", timeout=3)
|
||
if add_video_btn:
|
||
logger.info("✓ 找到'添加视频'按钮")
|
||
logger.info(f"使用添加视频按钮,准备上传 {len(videos)} 个视频")
|
||
add_video_btn.click.to_upload(videos)
|
||
logger.info(f"✓ 已触发上传,上传 {len(videos)} 个视频")
|
||
else:
|
||
logger.error("✗ 未找到任何视频上传按钮!")
|
||
creator_tab.close()
|
||
return {"ok": False, "results": [], "reason": "未找到任何视频上传按钮"}
|
||
|
||
# 等待一段时间让视频开始上传和页面渲染
|
||
logger.info("=" * 50)
|
||
logger.info("步骤4: 等待视频上传和页面渲染...")
|
||
logger.info(f" 已触发上传 {len(videos)} 个视频,等待 {5} 秒让页面渲染...")
|
||
time.sleep(5)
|
||
logger.info(" ✓ 等待完成,开始处理视频详情")
|
||
|
||
# 不等待所有视频上传完成,而是检测每个视频的状态
|
||
# 只处理已上传完成的视频,跳过还在上传中的视频
|
||
logger.info("=" * 50)
|
||
logger.info("步骤5: 开始处理每个视频的详细信息...")
|
||
logger.info(f" 需要处理的视频总数: {len(folder_path)}")
|
||
logger.info(" 将逐个处理:输入描述 -> 设置定时 -> 绑定任务 -> 点击发布 -> 验证发布结果")
|
||
|
||
results = []
|
||
for idx, video_info in enumerate(folder_path):
|
||
try:
|
||
video_path = video_info["path"]
|
||
video_name = video_path.name
|
||
# 格式化话题(如果还没有格式化)
|
||
raw_ht = video_info.get("ht", self.ht)
|
||
video_ht = self._format_topic(raw_ht) if raw_ht else self.ht
|
||
video_time_start = video_info.get("time_start", self.time_start)
|
||
video_url = video_info.get("url", self.url)
|
||
|
||
logger.info("=" * 50)
|
||
logger.info(f"处理第 {idx + 1}/{len(folder_path)} 个视频")
|
||
logger.info(f" 视频名称: {video_name}")
|
||
logger.info(f" 视频路径: {video_path}")
|
||
logger.info(f" 话题: {video_ht}")
|
||
logger.info(f" 定时时间: {video_time_start}")
|
||
logger.info(f" 绑定URL: {video_url[:50] if video_url else 'None'}...")
|
||
|
||
# 定位视频容器:优先按文件名匹配,其次按索引
|
||
logger.info(f" 正在定位视频容器...")
|
||
video_container = None
|
||
video_name_without_ext = video_name.rsplit(".", 1)[0]
|
||
logger.info(f" 完整文件名: {video_name}")
|
||
logger.info(f" 不含扩展名: {video_name_without_ext}")
|
||
|
||
container_xpath = (
|
||
'x://p[contains(text(), "文件名:") and contains(text(), "{name}")]'
|
||
'/ancestor::div[contains(@class, "y0VjbyIp")][1]'
|
||
)
|
||
|
||
logger.info(f" 方法1: 按完整文件名匹配...")
|
||
video_container = creator_tab.ele(
|
||
container_xpath.format(name=video_name), timeout=5
|
||
)
|
||
if video_container:
|
||
logger.info(f" ✓ 通过完整文件名找到视频容器")
|
||
else:
|
||
logger.info(f" ✗ 未找到,尝试不含扩展名...")
|
||
video_container = creator_tab.ele(
|
||
container_xpath.format(name=video_name_without_ext), timeout=3
|
||
)
|
||
if video_container:
|
||
logger.info(f" ✓ 通过不含扩展名找到视频容器")
|
||
|
||
if not video_container:
|
||
logger.info(f" ✗ 按文件名未找到,尝试按索引匹配...")
|
||
containers = creator_tab.eles(
|
||
'x://p[contains(text(), "文件名:")]'
|
||
'/ancestor::div[contains(@class, "y0VjbyIp")][1]'
|
||
)
|
||
logger.info(f" 页面上共有 {len(containers)} 个视频容器")
|
||
if idx < len(containers):
|
||
video_container = containers[idx]
|
||
logger.warning(
|
||
f" ✓ 通过索引方式定位到视频容器: {idx + 1}/{len(containers)}"
|
||
)
|
||
else:
|
||
logger.error(f" ✗ 索引超出范围: {idx} >= {len(containers)}")
|
||
|
||
if not video_container:
|
||
logger.error(f" ✗ 未找到视频 {video_name} 的容器,跳过该视频")
|
||
continue
|
||
else:
|
||
logger.info(f" ✓ 成功定位到视频容器")
|
||
|
||
# 检测上传状态(在当前视频容器内)
|
||
logger.info(f" 检测视频上传状态...")
|
||
# 优先判断是否仍在上传,其次判断发布按钮是否可用
|
||
|
||
while True:
|
||
if video_container.ele(
|
||
'x://*[contains(., "视频上传成功")]', timeout=0.5
|
||
):
|
||
break
|
||
|
||
logger.warning(f" ✗ 视频 {video_name} 还在上传中,跳过处理")
|
||
time.sleep(5)
|
||
|
||
success_text = video_container.ele(
|
||
'x://*[contains(., "视频上传成功")]', timeout=0.5
|
||
)
|
||
if success_text:
|
||
logger.info(f" ✓ 检测到视频 {video_name} 上传成功标识")
|
||
else:
|
||
logger.info(f" 未找到'上传成功'标识,检查发布按钮状态...")
|
||
# 备用判断:发布按钮未禁用即可认为已完成
|
||
disabled_publish = video_container.ele(
|
||
'x://button[@data-testid="beast-core-button" and (@disabled or contains(@class, "BTN_disabled"))]//span[text()="发布"]',
|
||
timeout=0.5
|
||
)
|
||
if disabled_publish:
|
||
logger.warning(f" ✗ 视频 {video_name} 发布按钮仍禁用,跳过")
|
||
continue
|
||
logger.info(f" ✓ 未检测到成功标识,但发布按钮可用,继续处理 {video_name}")
|
||
|
||
# 1. 输入视频描述
|
||
logger.info(f" 步骤1: 输入视频描述...")
|
||
try:
|
||
# 格式化话题(如果还没有格式化)
|
||
formatted_ht = self._format_topic(video_ht) if video_ht else ""
|
||
desc_text = video_name.split(".")[0].split("-")[-1] + formatted_ht
|
||
logger.info(f" 描述文本: {desc_text[:50]}...")
|
||
desc_inputs = video_container.eles('x://*[starts-with(@id, "magicdomid")]')
|
||
if desc_inputs:
|
||
logger.info(f" 找到 {len(desc_inputs)} 个描述输入框,使用第一个")
|
||
desc_inputs[0].input(vals=desc_text, clear=True)
|
||
logger.info(f" ✓ 已输入视频描述")
|
||
else:
|
||
logger.warning(f" ✗ 在视频容器中未找到描述输入框")
|
||
except Exception as e:
|
||
logger.warning(f" ✗ 输入视频描述失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
time.sleep(1)
|
||
|
||
# 给用户留出填写信息的时间
|
||
if input_delay and input_delay > 0:
|
||
logger.info(f"等待用户填写视频信息: {input_delay}s")
|
||
time.sleep(input_delay)
|
||
|
||
# 2. 设置定时任务(如果该视频有定时时间)
|
||
if video_time_start:
|
||
try:
|
||
# 定位并确认勾选“定时发布”
|
||
if not self._ensure_schedule_selected(video_container):
|
||
reason = "定时设置失败:未能切换到定时发布"
|
||
logger.warning(f" ✗ {reason}")
|
||
result_item = {
|
||
"index": str(video_info.get("index", "")),
|
||
"path": str(video_path),
|
||
"name": video_name,
|
||
"ok": False,
|
||
"reason": reason,
|
||
}
|
||
results.append(result_item)
|
||
if on_item_done and callable(on_item_done):
|
||
try:
|
||
on_item_done(result_item)
|
||
except Exception:
|
||
pass
|
||
continue
|
||
|
||
# 设置定时时间
|
||
schedule_ok, schedule_reason = self._set_schedule_time(
|
||
creator_tab, video_time_start, video_container, idx
|
||
)
|
||
if not schedule_ok:
|
||
reason = schedule_reason or "定时设置失败:未知原因"
|
||
logger.warning(f" ✗ {reason}")
|
||
result_item = {
|
||
"index": str(video_info.get("index", "")),
|
||
"path": str(video_path),
|
||
"name": video_name,
|
||
"ok": False,
|
||
"reason": reason,
|
||
}
|
||
results.append(result_item)
|
||
if on_item_done and callable(on_item_done):
|
||
try:
|
||
on_item_done(result_item)
|
||
except Exception:
|
||
pass
|
||
continue
|
||
except Exception as e:
|
||
logger.warning(f" ✗ 设置定时任务失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
result_item = {
|
||
"index": str(video_info.get("index", "")),
|
||
"path": str(video_path),
|
||
"name": video_name,
|
||
"ok": False,
|
||
"reason": f"定时设置异常: {e}",
|
||
}
|
||
results.append(result_item)
|
||
if on_item_done and callable(on_item_done):
|
||
try:
|
||
on_item_done(result_item)
|
||
except Exception:
|
||
pass
|
||
continue
|
||
else:
|
||
logger.info(f" 步骤2: 跳过定时任务(未设置定时时间)")
|
||
|
||
# 3. 绑定任务(如果该视频有URL)
|
||
if video_url:
|
||
logger.info(f" 步骤3: 绑定任务...")
|
||
logger.info(f" 绑定URL: {video_url[:50]}...")
|
||
try:
|
||
bind_btn = video_container.ele('x://*[text()="点击绑定任务"]', timeout=3)
|
||
if bind_btn:
|
||
logger.info(f" ✓ 找到绑定任务按钮")
|
||
bind_btn.click()
|
||
time.sleep(1)
|
||
# 输入URL(这个输入框可能在弹窗中,使用全局查找)
|
||
url_input = creator_tab.ele('x://*[@placeholder="请输入个人主页链接"]', timeout=3)
|
||
if url_input:
|
||
logger.info(f" ✓ 找到URL输入框")
|
||
url_input.input(video_url, clear=True)
|
||
time.sleep(1)
|
||
confirm_btn = creator_tab.ele('x://*[text()="确认"]', timeout=3)
|
||
if confirm_btn:
|
||
confirm_btn.click()
|
||
logger.info(f" ✓ 已绑定任务")
|
||
time.sleep(1)
|
||
else:
|
||
logger.warning(f" ✗ 未找到确认按钮")
|
||
else:
|
||
logger.warning(f" ✗ 未找到URL输入框")
|
||
else:
|
||
logger.warning(f" ✗ 未找到绑定任务按钮")
|
||
except Exception as e:
|
||
logger.warning(f" ✗ 绑定任务失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
else:
|
||
logger.info(f" 步骤3: 跳过绑定任务(未设置URL)")
|
||
|
||
# 4. 点击该视频的发布按钮(注意:这里应该是单个视频的"发布"按钮,不是"立即发布")
|
||
logger.info(f" 步骤4: 点击发布按钮...")
|
||
try:
|
||
publish_btn = video_container.ele(
|
||
'x://button[@data-testid="beast-core-button"]//span[text()="发布"]', timeout=3
|
||
)
|
||
if publish_btn:
|
||
publish_btn.click()
|
||
logger.info(f" ✓ 已点击发布按钮")
|
||
ok, reason = self._verify_publish_success(creator_tab=creator_tab,
|
||
video_container=video_container)
|
||
if ok:
|
||
logger.info(f" ✓ 发布校验通过:{reason}")
|
||
else:
|
||
logger.warning(f" ✗ 发布校验失败:{reason}")
|
||
else:
|
||
ok, reason = False, "在视频容器中未找到发布按钮"
|
||
logger.warning(f" ✗ {reason}")
|
||
except Exception as e:
|
||
ok, reason = False, f"点击发布按钮异常: {e}"
|
||
logger.warning(f" ✗ {reason}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
result_item = {
|
||
"index": str(video_info.get("index", "")),
|
||
"path": str(video_path),
|
||
"name": video_name,
|
||
"ok": bool(ok),
|
||
"reason": str(reason),
|
||
}
|
||
results.append(result_item)
|
||
logger.info(f" ✓ 视频 {video_name} 处理完成(ok={ok})")
|
||
|
||
# 实时回调通知 GUI 更新状态(每处理完一个视频就通知)
|
||
if on_item_done and callable(on_item_done):
|
||
try:
|
||
on_item_done(result_item)
|
||
except Exception as e:
|
||
logger.warning(f"回调通知失败: {e}")
|
||
|
||
time.sleep(2) # 每个视频处理间隔
|
||
|
||
except Exception as e:
|
||
logger.error(
|
||
f"处理视频 {video_info.get('path', {}).name if hasattr(video_info.get('path', ''), 'name') else 'unknown'} 时出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
try:
|
||
error_result = {
|
||
"index": str(video_info.get("index", "")),
|
||
"path": str(video_info.get("path", "")),
|
||
"name": getattr(video_info.get("path", None), "name", "unknown"),
|
||
"ok": False,
|
||
"reason": f"处理异常: {e}",
|
||
}
|
||
results.append(error_result)
|
||
|
||
# 实时回调通知 GUI 更新状态(失败情况)
|
||
if on_item_done and callable(on_item_done):
|
||
try:
|
||
on_item_done(error_result)
|
||
except Exception:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
continue
|
||
|
||
# 最后点击一键发布(如果有)
|
||
logger.info("=" * 50)
|
||
logger.info("步骤:点击一键发布(最终确认)...")
|
||
try:
|
||
ele = creator_tab.ele('x://*[text()="我已阅读并同意"]', timeout=3)
|
||
if ele:
|
||
ele.click()
|
||
logger.info(" ✓ 已勾选'我已阅读并同意'")
|
||
time.sleep(0.5)
|
||
else:
|
||
logger.info(" - 未找到'我已阅读并同意'复选框,可能已默认勾选")
|
||
|
||
# one_key_publish = creator_tab.ele('x://*[text()="一键发布"]', timeout=3)
|
||
# if one_key_publish:
|
||
# logger.info(" ✓ 找到'一键发布'按钮")
|
||
# one_key_publish.click()
|
||
# logger.info(" ✓ 已点击一键发布按钮")
|
||
#
|
||
# logger.info(" 正在验证一键发布是否成功...")
|
||
# ok, reason = self._verify_publish_success(creator_tab=creator_tab, video_container=None)
|
||
# if ok:
|
||
# logger.info(f" ✓ 一键发布校验通过:{reason}")
|
||
# else:
|
||
# logger.warning(f" ✗ 一键发布校验失败:{reason}")
|
||
# time.sleep(2)
|
||
# else:
|
||
# logger.warning(" ✗ 未找到'一键发布'按钮")
|
||
except Exception as e:
|
||
logger.warning(f" ✗ 一键发布失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
time.sleep(2)
|
||
logger.info("=" * 50)
|
||
success_count = sum(1 for r in results if r.get("ok")) if results else 0
|
||
total_count = len(results) if results else 0
|
||
logger.info(f"批量上传任务完成,成功: {success_count}/{total_count}")
|
||
|
||
creator_tab.close()
|
||
return {"ok": all(r.get("ok") for r in results) if results else True, "results": results}
|
||
|
||
def _verify_publish_success(self, creator_tab, video_container=None, timeout_seconds: float = 8.0):
|
||
"""
|
||
尝试验证“是否真的发布成功”。
|
||
说明:不同版本页面反馈可能不同,这里采用多策略:
|
||
- toast/提示文本包含:发布成功/提交成功/已发布/已提交/成功
|
||
- 发布按钮变为禁用/消失/文案变化
|
||
"""
|
||
import time as _t
|
||
start = _t.time()
|
||
last_hint = "未检测到成功反馈"
|
||
|
||
success_keywords = ["发布成功", "提交成功", "已发布", "已提交", "成功"]
|
||
fail_keywords = ["失败", "错误", "异常", "请重试"]
|
||
|
||
def _has_text_anywhere(keywords):
|
||
for kw in keywords:
|
||
# 全局找,兼容 toast
|
||
if creator_tab.ele(f'x://*[contains(text(), "{kw}")]', timeout=0.2):
|
||
return kw
|
||
return None
|
||
|
||
while _t.time() - start < timeout_seconds:
|
||
hit_fail = _has_text_anywhere(fail_keywords)
|
||
if hit_fail:
|
||
return False, f"检测到失败提示: {hit_fail}"
|
||
|
||
hit_ok = _has_text_anywhere(success_keywords)
|
||
if hit_ok:
|
||
return True, f"检测到成功提示: {hit_ok}"
|
||
|
||
# 尝试按钮状态变化(优先容器内)
|
||
try:
|
||
scope = video_container if video_container else creator_tab
|
||
disabled = scope.ele(
|
||
'x://button[@data-testid="beast-core-button" and (@disabled or contains(@class, "BTN_disabled"))]//span[text()="发布"]',
|
||
timeout=0.2
|
||
)
|
||
if disabled:
|
||
return True, "发布按钮变为禁用"
|
||
except Exception:
|
||
pass
|
||
|
||
last_hint = "等待页面反馈..."
|
||
_t.sleep(0.3)
|
||
|
||
return False, last_hint
|
||
|
||
def _set_schedule_time(self, creator_tab, time_start, video_container=None, video_index=None):
|
||
"""
|
||
设置定时发布时间的辅助方法
|
||
|
||
Args:
|
||
creator_tab: 浏览器标签页对象
|
||
time_start: 时间字符串,格式:2026-01-15 09:30:00
|
||
video_container: 视频容器元素(可选)
|
||
video_index: 视频索引(当video_container为None时使用)
|
||
"""
|
||
try:
|
||
from datetime import datetime
|
||
dt = datetime.strptime(time_start, "%Y-%m-%d %H:%M:%S")
|
||
date_str = dt.strftime("%Y-%m-%d")
|
||
year = dt.year
|
||
month = dt.month
|
||
day = dt.day
|
||
hour = dt.hour
|
||
minute = dt.minute
|
||
second = dt.second
|
||
|
||
logger.info(
|
||
f"开始设置定时时间: {time_start} (年={year}, 月={month}, 日={day}, 时={hour}, 分={minute}, 秒={second})")
|
||
|
||
# 定位日期选择器(优先容器内,且优先 active)
|
||
date_picker_ele = None
|
||
if video_container and video_container != creator_tab:
|
||
date_picker_ele = video_container.ele(
|
||
'x://div[@data-testid="beast-core-datePicker-input" and @data-status="active"]'
|
||
'//input[@data-testid="beast-core-datePicker-htmlInput"]',
|
||
timeout=2
|
||
)
|
||
if not date_picker_ele:
|
||
date_picker_ele = video_container.ele(
|
||
'x://input[@data-testid="beast-core-datePicker-htmlInput"]',
|
||
timeout=2
|
||
)
|
||
if not date_picker_ele:
|
||
date_picker_ele = video_container.ele(
|
||
'x://*[@placeholder="选择日期"]',
|
||
timeout=2
|
||
)
|
||
|
||
if not date_picker_ele:
|
||
# 兜底:全局查找(优先 active)
|
||
date_picker_ele = creator_tab.ele(
|
||
'x://div[@data-testid="beast-core-datePicker-input" and @data-status="active"]'
|
||
'//input[@data-testid="beast-core-datePicker-htmlInput"]',
|
||
timeout=2
|
||
)
|
||
if not date_picker_ele:
|
||
date_picker_ele = creator_tab.ele(
|
||
'x://input[@data-testid="beast-core-datePicker-htmlInput"]',
|
||
timeout=2
|
||
)
|
||
if not date_picker_ele:
|
||
date_picker_ele = creator_tab.ele('x://*[@placeholder="选择日期"]', timeout=2)
|
||
|
||
if not date_picker_ele:
|
||
logger.warning("未找到日期选择器")
|
||
return False, "定时设置失败:未找到日期选择器"
|
||
|
||
# 点击日期选择器打开面板
|
||
date_picker_ele.click()
|
||
time.sleep(1.5)
|
||
|
||
# 检查并切换年月(如果需要)
|
||
try:
|
||
month_text_ele = creator_tab.ele('x://span[@class="RPR_dateText_5-152-0"]', timeout=2)
|
||
if month_text_ele:
|
||
current_month = month_text_ele.text
|
||
logger.info(f"当前显示的月份: {current_month}")
|
||
|
||
target_month_str = f"{month}月"
|
||
if current_month != target_month_str:
|
||
logger.info(f"需要切换到目标月份: {target_month_str}")
|
||
current_month_num = int(current_month.replace('月', ''))
|
||
target_month_num = month
|
||
|
||
date_root = 'x://div[@data-testid="beast-core-datePicker-dropdown-contentRoot"]'
|
||
if target_month_num > current_month_num:
|
||
arrow_selector = f'{date_root}//*[contains(@class,"RPR_iconPrevNext") and contains(@class,"RPR_right")]'
|
||
clicks_needed = target_month_num - current_month_num
|
||
else:
|
||
arrow_selector = f'{date_root}//*[contains(@class,"RPR_iconPrevNext") and not(contains(@class,"RPR_right"))]'
|
||
clicks_needed = current_month_num - target_month_num
|
||
|
||
for _ in range(min(clicks_needed, 12)):
|
||
arrow = creator_tab.ele(arrow_selector, timeout=1)
|
||
if arrow:
|
||
try:
|
||
arrow.click()
|
||
except Exception:
|
||
pass
|
||
time.sleep(0.5)
|
||
new_month_ele = creator_tab.ele('x://span[@class="RPR_dateText_5-152-0"]', timeout=1)
|
||
if new_month_ele and new_month_ele.text == target_month_str:
|
||
logger.info(f"成功切换到目标月份: {target_month_str}")
|
||
break
|
||
except Exception as e:
|
||
logger.warning(f"切换月份时出错: {e},继续尝试选择日期")
|
||
|
||
# 选择日期
|
||
date_cell = creator_tab.ele(
|
||
f'x://td[@role="date-cell"]//div[@title="{day}" and not(contains(@class, "RPR_disabled")) and not(contains(@class, "RPR_outOfMonth"))]',
|
||
timeout=3)
|
||
if date_cell:
|
||
date_cell.click()
|
||
logger.info(f"已点击日期: {day}")
|
||
time.sleep(0.5)
|
||
else:
|
||
logger.warning(f"未找到日期单元格: {day}")
|
||
return False, "定时设置失败:未找到日期单元格"
|
||
|
||
# 点击时间输入框打开时间选择器(优先容器内)
|
||
time_input = None
|
||
if video_container and video_container != creator_tab:
|
||
time_input = video_container.ele(
|
||
'x://input[@data-testid="beast-core-timePicker-html-input"]',
|
||
timeout=3
|
||
)
|
||
if not time_input:
|
||
time_input = creator_tab.ele(
|
||
'x://input[@data-testid="beast-core-timePicker-html-input"]',
|
||
timeout=3
|
||
)
|
||
if time_input:
|
||
time_input.click()
|
||
logger.info("已点击时间输入框,打开时间选择器")
|
||
time.sleep(0.8)
|
||
else:
|
||
logger.warning("未找到时间输入框,尝试使用XPath")
|
||
try:
|
||
time_input_xpath = '/html/body/div[2]/div/div/div/div/div/footer/div/div/div/div/div/div/div/div[1]/input'
|
||
time_input = creator_tab.ele(f'x:{time_input_xpath}', timeout=2)
|
||
if time_input:
|
||
time_input.click()
|
||
logger.info("通过XPath点击了时间输入框")
|
||
time.sleep(0.8)
|
||
except Exception as e:
|
||
logger.warning(f"通过XPath也未能找到时间输入框: {e}")
|
||
if not time_input:
|
||
return False, "定时设置失败:未找到时间输入框"
|
||
|
||
# 选择时间
|
||
hour_str = f"{hour:02d}"
|
||
hour_item = creator_tab.ele(
|
||
f'x://ul[@data-testid="beast-core-timePicker-list-hh"]//li[text()="{hour_str}"]', timeout=3)
|
||
if hour_item:
|
||
hour_item.scroll.to_see()
|
||
time.sleep(0.2)
|
||
hour_item.click()
|
||
logger.info(f"已选择小时: {hour_str}")
|
||
time.sleep(0.3)
|
||
else:
|
||
logger.warning(f"未找到小时选项: {hour_str}")
|
||
|
||
minute_str = f"{minute:02d}"
|
||
minute_item = creator_tab.ele(
|
||
f'x://ul[@data-testid="beast-core-timePicker-list-mm"]//li[text()="{minute_str}"]', timeout=3)
|
||
if minute_item:
|
||
minute_item.scroll.to_see()
|
||
time.sleep(0.2)
|
||
minute_item.click()
|
||
logger.info(f"已选择分钟: {minute_str}")
|
||
time.sleep(0.3)
|
||
else:
|
||
logger.warning(f"未找到分钟选项: {minute_str}")
|
||
|
||
second_str = f"{second:02d}"
|
||
second_item = creator_tab.ele(
|
||
f'x://ul[@data-testid="beast-core-timePicker-list-ss"]//li[text()="{second_str}"]', timeout=3)
|
||
if second_item:
|
||
second_item.scroll.to_see()
|
||
time.sleep(0.2)
|
||
second_item.click()
|
||
logger.info(f"已选择秒: {second_str}")
|
||
time.sleep(0.3)
|
||
else:
|
||
logger.warning(f"未找到秒选项: {second_str}")
|
||
|
||
# 点击确认按钮
|
||
try:
|
||
confirm_btn = creator_tab.ele(
|
||
'x://button[@data-testid="beast-core-button"]//span[text()="确认"]', timeout=3)
|
||
if confirm_btn:
|
||
confirm_btn.click()
|
||
logger.info("已点击确认按钮")
|
||
time.sleep(0.5)
|
||
else:
|
||
confirm_js = """
|
||
(function() {
|
||
const buttons = document.querySelectorAll('button[data-testid="beast-core-button"]');
|
||
for (let btn of buttons) {
|
||
const span = btn.querySelector('span');
|
||
if (span && span.textContent.includes('确认')) {
|
||
btn.click();
|
||
return true;
|
||
}
|
||
}
|
||
return false;
|
||
})();
|
||
"""
|
||
result = creator_tab.run_js(confirm_js)
|
||
if result:
|
||
logger.info("通过JavaScript点击了确认按钮")
|
||
else:
|
||
logger.warning("未找到确认按钮")
|
||
time.sleep(0.5)
|
||
except Exception as e:
|
||
logger.warning(f"点击确认按钮失败: {e}")
|
||
|
||
# 验证设置是否成功(优先从当前容器读取输入框值)
|
||
time.sleep(0.5)
|
||
expected_str = dt.strftime("%Y-%m-%d %H:%M:%S")
|
||
final_value = None
|
||
try:
|
||
value_ele = None
|
||
if video_container and video_container != creator_tab:
|
||
value_ele = video_container.ele(
|
||
'x://input[@data-testid="beast-core-datePicker-htmlInput"]',
|
||
timeout=1
|
||
)
|
||
if not value_ele:
|
||
value_ele = creator_tab.ele(
|
||
'x://input[@data-testid="beast-core-datePicker-htmlInput"]',
|
||
timeout=1
|
||
)
|
||
if value_ele:
|
||
final_value = value_ele.attr("value")
|
||
except Exception as e:
|
||
logger.warning(f"读取日期选择器值失败: {e}")
|
||
|
||
if final_value and str(final_value).strip():
|
||
final_value_str = str(final_value).strip()
|
||
logger.info(f"日期选择器当前值: {final_value_str}, 期望值: {expected_str}")
|
||
# 完整比较日期时间字符串
|
||
if final_value_str == expected_str:
|
||
logger.info(f"成功设置定时时间: {final_value_str}")
|
||
return True, ""
|
||
else:
|
||
logger.warning(f"设置的时间可能不准确,当前值: {final_value_str}, 期望值: {expected_str}")
|
||
return False, f"定时设置失败:期望 {expected_str},实际 {final_value_str}"
|
||
else:
|
||
logger.error("无法获取日期选择器的值,可能设置失败")
|
||
return False, "定时设置失败:无法读取日期选择器值"
|
||
|
||
except ValueError as e:
|
||
logger.error(f"时间格式错误: {time_start}, 正确格式应为: YYYY-MM-DD HH:MM:SS, 错误: {e}")
|
||
return False, "定时设置失败:时间格式错误"
|
||
except Exception as e:
|
||
logger.error(f"设置定时时间失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False, f"定时设置失败:{e}"
|
||
return False, "定时设置失败:未知原因"
|
||
|
||
def _verify_schedule_value(self, creator_tab, expected_dt, video_container=None):
|
||
"""
|
||
验证定时时间是否设置正确
|
||
|
||
Args:
|
||
creator_tab: 浏览器标签页对象
|
||
expected_dt: 期望的 datetime 对象
|
||
video_container: 视频容器元素(可选)
|
||
|
||
Returns:
|
||
(ok, reason, actual_date, actual_time): 验证结果
|
||
"""
|
||
try:
|
||
expected_str = expected_dt.strftime("%Y-%m-%d %H:%M:%S")
|
||
expected_date = expected_dt.strftime("%Y-%m-%d")
|
||
expected_time = expected_dt.strftime("%H:%M:%S")
|
||
|
||
# 读取日期时间输入框的值
|
||
final_value = None
|
||
try:
|
||
value_ele = None
|
||
if video_container and video_container != creator_tab:
|
||
value_ele = video_container.ele(
|
||
'x://input[@data-testid="beast-core-datePicker-htmlInput"]',
|
||
timeout=1
|
||
)
|
||
if not value_ele:
|
||
value_ele = creator_tab.ele(
|
||
'x://input[@data-testid="beast-core-datePicker-htmlInput"]',
|
||
timeout=1
|
||
)
|
||
if value_ele:
|
||
final_value = value_ele.attr("value")
|
||
except Exception as e:
|
||
logger.warning(f"读取日期选择器值失败: {e}")
|
||
|
||
if not final_value or not str(final_value).strip():
|
||
return False, "定时设置失败:无法读取日期选择器值", "", ""
|
||
|
||
final_value_str = str(final_value).strip()
|
||
logger.info(f"日期选择器当前值: {final_value_str}, 期望值: {expected_str}")
|
||
|
||
# 比较完整的日期时间字符串
|
||
if final_value_str == expected_str:
|
||
return True, "", final_value_str[:10], final_value_str[11:] if len(final_value_str) > 10 else ""
|
||
|
||
# 如果完整比较不匹配,尝试分别比较日期和时间
|
||
actual_date = final_value_str[:10] if len(final_value_str) >= 10 else final_value_str
|
||
actual_time = final_value_str[11:] if len(final_value_str) > 11 else ""
|
||
|
||
if actual_date == expected_date and actual_time == expected_time:
|
||
return True, "", actual_date, actual_time
|
||
|
||
# 不匹配
|
||
reason = f"定时设置失败:期望 {expected_str},实际 {final_value_str}"
|
||
return False, reason, actual_date, actual_time
|
||
|
||
except Exception as e:
|
||
logger.error(f"验证定时时间失败: {e}")
|
||
return False, f"定时设置失败:验证异常 {e}", "", ""
|
||
|
||
def _ensure_schedule_selected(self, video_container):
|
||
"""确保切换到定时发布选项。"""
|
||
try:
|
||
schedule_label = video_container.ele(
|
||
'x://label[@data-testid="beast-core-radio"][.//*[contains(text(), "立即发布")]]',
|
||
timeout=2
|
||
)
|
||
if schedule_label:
|
||
schedule_label.click()
|
||
time.sleep(1)
|
||
|
||
schedule_label = video_container.ele(
|
||
'x://label[@data-testid="beast-core-radio"][.//*[contains(text(), "定时发布")]]',
|
||
timeout=2
|
||
)
|
||
if schedule_label:
|
||
schedule_label.click()
|
||
time.sleep(0.5)
|
||
|
||
checked = schedule_label.attr("data-checked")
|
||
if checked == "true":
|
||
logger.info("定时发布已选中")
|
||
return True
|
||
|
||
# 备用:点击内部 radio input
|
||
radio_input = schedule_label.ele('x://input[@type="radio"]', timeout=1)
|
||
if radio_input:
|
||
radio_input.click()
|
||
time.sleep(0.5)
|
||
|
||
checked = schedule_label.attr("data-checked")
|
||
if checked == "true":
|
||
logger.info("定时发布已选中")
|
||
return True
|
||
|
||
# 最后兜底:点击文本
|
||
schedule_text = video_container.ele('x://*[contains(text(), "定时发布")]', timeout=1)
|
||
if schedule_text:
|
||
schedule_text.click()
|
||
time.sleep(0.5)
|
||
return True
|
||
except Exception as e:
|
||
logger.warning(f"切换定时发布失败: {e}")
|
||
|
||
return False
|
||
|
||
|
||
datas = ThreadSafeDict()
|
||
|
||
# if __name__ == '__main__':
|
||
# url = "18 【运动男孩都爱这么穿吗?🏃 - Liu_烫烫 | 小红书 - 你的生活兴趣社区】 😆 D13BaPl6xyUAuQO 😆 https://www.xiaohongshu.com/discovery/item/678ceeef000000001602fb54?source=webshare&xhsshare=pc_web&xsec_token=ABe9oWR9CYCsHBkWUPuoS1Fz3_Uz4WGFMdfCGwSbl0Dfs=&xsec_source=pc_share"
|
||
# pdd = Pdd(
|
||
# url=url,
|
||
# user_id="1050100241",
|
||
# time_start="2026-01-28 09:30:00",
|
||
# ht="#python #haha",
|
||
# index="1",
|
||
# )
|
||
# # pdd.action(folder_path=r"C:\Users\27942\Desktop\多多自动化发文")
|
||
#
|
||
# folder_path = r"C:\Users\27942\Desktop\多多自动化发文"
|
||
# file_paths = []
|
||
# for file in os.listdir(folder_path): # 获取文件夹下所有的文件夹
|
||
# file_path = os.path.join(folder_path, file) # 拼接文件夹
|
||
# # 检查是否为目录,跳过文件(如.lnk快捷方式)
|
||
# if not os.path.isdir(file_path):
|
||
# continue
|
||
# files = os.listdir(file_path) # 获取用户id下的文件
|
||
# for file in files:
|
||
# if ".mp4" in file:
|
||
# file_names = file.split("-")
|
||
#
|
||
# path = Path(os.path.join(file_path, file))
|
||
# # 判断是否为文件
|
||
#
|
||
# file_paths.append(
|
||
# {
|
||
# "url": url,
|
||
# "user_id": "1050100241",
|
||
# "time_start": "2026-01-28 09:30:00",
|
||
# "ht": "#python #haha",
|
||
# "index": "1",
|
||
# "path": path
|
||
# }
|
||
# )
|
||
#
|
||
# pdd.action1(folder_path=file_paths)
|