Files
haha/main.py
27942 c35b3d21d5 gui
第一版
2026-01-20 04:09:09 +08:00

1154 lines
51 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import json
import time
from pathlib import Path
from loguru import logger
from bs4 import BeautifulSoup
from curl_cffi import requests
from DrissionPage import ChromiumPage, ChromiumOptions, SessionPage
class Pdd:
def __init__(self, url, user_id, time_start, ht, index, title=None):
    """Store the publishing job parameters and create the HTTP session.

    Args:
        url: Link handled by this job; it is opened by ``get_page_datas`` and
            typed into the "bind task" dialog when publishing.
        user_id: Creator ID typed into the MCN search box; also the key used
            in the module-level ``datas`` cache.
        time_start: Scheduled publish time as ``YYYY-MM-DD HH:MM:SS``; a falsy
            value skips scheduling.
        ht: Hashtag text appended to the generated description.
        index: Value compared with the part of a file name before the first
            ``-`` when ``action`` looks for the material to upload.
        title: Optional title; stored, but not read by the methods shown here.
    """
    # Job parameters supplied by the caller.
    self.url, self.user_id, self.time_start = url, user_id, time_start
    self.title, self.ht, self.index = title, ht, index
    # HTTP session (curl_cffi).
    self.session = requests.Session()
    # Browser page; created lazily by create_page().
    self.page = None
    # URL templates: user video list / user profile.
    self.user_url_template = None
    self.user_profile_url_template = None
def create_page(self):
    """Create the ``ChromiumPage`` used by the automation and store it on ``self.page``.

    The browser is configured with the relative directories ``user/user_data``
    (user data) and ``user/tmp`` (temporary files).
    """
    options = ChromiumOptions()
    options.set_user_data_path("user/user_data")
    options.set_tmp_path("user/tmp")
    # Build the page object from this configuration.
    self.page = ChromiumPage(addr_or_opts=options)
def extract_note_data(self, initial_state):
    """Extract title, description, images, videos and topics of the first note.

    Args:
        initial_state: Dict parsed from the page's ``window.__INITIAL_STATE__``.

    Returns:
        dict | None: A dict with the keys ``title``, ``desc``, ``images``,
        ``videos`` and ``topics``; ``None`` when no note can be located or an
        unexpected error occurs (the error is printed with its traceback).
    """
    try:
        # ``or {}`` / ``or []`` is used instead of ``.get(key, default)``
        # throughout: the default only applies to *missing* keys, while the
        # page state frequently carries keys whose value is null.
        note_store = initial_state.get('note') or {}
        note_detail_map = note_store.get('noteDetailMap') or {}

        first_note_id = note_store.get('firstNoteId')
        if not first_note_id:
            # No explicit first note: fall back to the first key of the map.
            if not note_detail_map:
                print("未找到笔记ID")
                return None
            first_note_id = next(iter(note_detail_map))

        note_detail = note_detail_map.get(first_note_id) or {}
        note_info = note_detail.get('note') or {}
        if not note_info:
            print("未找到笔记信息")
            return None

        extracted_data = {
            'title': note_info.get('title'),
            'desc': note_info.get('desc'),
            'images': [],
            'videos': [],
            'topics': [],
        }

        # Images.
        for img in note_info.get('imageList') or []:
            if not isinstance(img, dict):
                continue
            extracted_data['images'].append({
                'url': img.get('urlDefault') or img.get('url'),
                'urlPre': img.get('urlPre'),
                'width': img.get('width'),
                'height': img.get('height'),
            })

        # Video (optional): prefer the HLS stream URL, then the plain media URL.
        video_info = note_info.get('video') or {}
        if video_info:
            media = video_info.get('media') or {}
            stream = media.get('stream') or {}
            hls = stream.get('hls') or {}
            video_url = (
                hls.get('masterUrl') or hls.get('url')
                or media.get('url') or media.get('videoUrl')
            )
            if video_url:
                video_data = {'url': video_url}
                if video_info.get('cover'):
                    video_data['cover'] = video_info.get('cover')
                if video_info.get('time'):
                    video_data['time'] = video_info.get('time')
                extracted_data['videos'].append(video_data)

        # Topics: the list may live under several field names.
        topic_list = (
            note_info.get('topicList')
            or note_info.get('tagList')
            or note_info.get('hashtagList')
            or []
        )
        for topic in topic_list:
            if not isinstance(topic, dict):
                continue
            topic_data = {
                'name': topic.get('name') or topic.get('title') or topic.get('tagName'),
                'id': topic.get('id') or topic.get('topicId') or topic.get('tagId'),
            }
            if topic_data.get('name'):
                extracted_data['topics'].append(topic_data)

        # Topics written inline in the description as ``#topic#``.
        desc = note_info.get('desc') or ''
        if desc:
            known_names = {t.get('name') for t in extracted_data['topics']}
            for name in re.findall(r'#([^#]+)#', desc):
                if name not in known_names:  # avoid duplicates
                    known_names.add(name)
                    extracted_data['topics'].append({'name': name})

        return extracted_data
    except Exception as e:
        print(f"提取笔记数据时出错:{e}")
        import traceback
        traceback.print_exc()
        return None
def extract_video_from_meta(self, html_content):
    """Read video information from the ``og:video*`` meta tags of an HTML page.

    Args:
        html_content: The HTML document as a string.

    Returns:
        dict | None: A dict holding ``url`` and, when present, ``time`` and
        ``quality``; ``None`` when no video URL is found or parsing fails.
    """
    # (result key, value of the meta tag's ``name`` attribute)
    meta_fields = (
        ('url', 'og:video'),
        ('time', 'og:videotime'),
        ('quality', 'og:videoquality'),
    )
    try:
        soup = BeautifulSoup(html_content, 'html.parser')
        video_info = {}
        for key, meta_name in meta_fields:
            tag = soup.find('meta', {'name': meta_name})
            if tag and tag.get('content'):
                video_info[key] = tag.get('content')
        # Only a result that carries a URL is considered usable.
        return video_info if video_info.get('url') else None
    except Exception as e:
        print(f"从meta标签提取视频信息时出错{e}")
        return None
def get_page_datas(self):
    """Open ``self.url`` in a new tab and extract the note data from its HTML.

    The response for ``self.url`` is captured through the tab's packet
    listener. Video information is read from the meta tags and the note is
    read from the embedded ``window.__INITIAL_STATE__`` JSON.

    Returns:
        dict | None: The dict produced by ``extract_note_data`` with the
        meta-tag video appended to ``videos``; ``{'videos': [...]}`` when only
        the meta-tag video is available; ``None`` when nothing was captured
        or extracted.

    Raises:
        json.JSONDecodeError: If the embedded state is not valid JSON.
    """
    tab = self.page.new_tab()
    try:
        tab.listen.start(self.url)
        tab.get(url=self.url)
        res = tab.listen.wait(timeout=3)
        if not res:
            logger.warning("未捕获到页面响应")
            return None

        html = res.response.body
        # Video information from the meta tags.
        video_info = self.extract_video_from_meta(html)

        # Pull the window.__INITIAL_STATE__ object literal out of the page.
        pattern = r'<script>window\.__INITIAL_STATE__\s*=\s*({.*?});?\s*</script>'
        match = re.search(pattern, html, re.DOTALL)
        if not match:
            print("未找到 window.__INITIAL_STATE__ 数据")
            # Fall back to the meta-tag video alone when it exists.
            return {'videos': [video_info]} if video_info else None

        # JavaScript ``undefined`` is not valid JSON; map it to null.
        json_str = re.sub(r'\bundefined\b', 'null', match.group(1))
        initial_state = json.loads(json_str)
        note_data = self.extract_note_data(initial_state)

        # Add the meta-tag video to the note data.
        if video_info and note_data:
            if not note_data.get('videos'):
                note_data['videos'] = []
            note_data['videos'].append(video_info)
        return note_data
    finally:
        # Close the tab on every path, including early returns and errors.
        tab.close()
def download_video(self, url):
    """Download the video at ``url`` using a DrissionPage ``SessionPage``.

    Args:
        url: Address of the video file to download.
    """
    page = SessionPage()
    # Use the caller's URL rather than a fixed sample address.
    page.download(url)
def download_image(self, url, name):
    """Download an image and save it as ``<name>.webp``.

    Args:
        url: Image URL.
        name: Target path without extension; ``.webp`` is appended.

    Returns:
        ``True`` on success, ``None`` when the request fails (the error is
        printed).
    """
    # Browser-like request headers.
    request_headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
        'Cache-Control': 'no-cache',
        'DNT': '1',
        'Pragma': 'no-cache',
        'Proxy-Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0'
    }
    target = Path(f"{name}.webp")
    try:
        # verify=False disables certificate checks (like curl --insecure).
        response = requests.get(url, headers=request_headers, verify=False, timeout=30)
        response.raise_for_status()  # surface HTTP error statuses
        target.write_bytes(response.content)
    except requests.exceptions.RequestException as e:
        print(f"下载失败: {e}")
        return None
    return True
def action(self, folder_path=None):
    """Publish the single video or image set whose name starts with ``self.index``.

    Steps: open (or reuse) the creator publish page for ``self.user_id``,
    upload the matching material found under ``folder_path``, optionally set
    the scheduled publish time, bind the task link and publish.

    Args:
        folder_path: Root directory. Each sub-directory is scanned for an
            entry named ``<index>-...``. A file is uploaded as a video and
            its last ``-`` separated part (without extension) becomes the
            description. A directory is uploaded as an image set and must be
            named ``<index>-<title>-<description>``.

    Returns:
        None. Returns early when the account is not logged in.
    """
    self.create_page()
    cached_url = datas.get(self.user_id)
    if cached_url:
        creator_tab = self.page.new_tab(cached_url)
    else:
        self.page.get(url="https://mcn.pinduoduo.com/register")
        for _ in range(5):
            if self.page.ele("x://*[text()='登录']", timeout=5):
                logger.warning("请登录》》》")
                # The login button is found immediately, so without a pause
                # all attempts would be used up before the user can log in.
                time.sleep(5)
            elif self.page.ele("x://*[text()='主播/作者管理']", timeout=5):
                break
        else:
            logger.error("未登录!!!")
            return
        self.page.ele("x://*[text()='主播/作者管理']").click()
        time.sleep(1)
        self.page.ele("x://*[text()='签约主播/作者']").click()
        notice = self.page.ele("x://*[text()='我知道了']", timeout=3)
        if notice:
            notice.click()
        time.sleep(1)
        self.page.ele('x://*[@placeholder="输入主播/作者ID搜索"]').input(vals=self.user_id, clear=True)
        time.sleep(1)
        self.page.ele("x://*[text()='提交']").click()
        time.sleep(1)
        self.page.actions.move_to(ele_or_loc="x://*[text()='内容管理']")
        time.sleep(1)
        self.page.ele("x://*[text()='内容管理']").click()
        time.sleep(3)
        creator_tab = self.page.get_tab(url="home/creator/manage")
        creator_tab.ele("x://*[text()='发布视频']").click()
        # Remember the publish page so later jobs can open it directly.
        datas[self.user_id] = creator_tab.url

    # Collect and upload the material that belongs to this job.
    file_paths = []
    if folder_path and os.path.exists(folder_path):
        for entry in os.listdir(folder_path):
            user_dir = os.path.join(folder_path, entry)
            # Only directories are scanned (skips e.g. .lnk shortcuts).
            if not os.path.isdir(user_dir):
                continue
            for item in os.listdir(user_dir):
                name_parts = item.split("-")
                if name_parts[0] != self.index:
                    continue
                path = Path(os.path.join(user_dir, item))
                if path.is_file():
                    # Single video file.
                    file_paths.append(path)
                    creator_tab.ele("x://*[text()='添加视频']").click.to_upload(file_paths)
                    time.sleep(3)
                    creator_tab.ele('x://*[@id="magicdomid1"]').input(
                        vals=name_parts[-1].split(".")[0] + self.ht, clear=True)
                else:
                    # Directory: every file inside is one image of the set.
                    for image_name in os.listdir(path):
                        file_paths.append(os.path.join(path, image_name))
                    creator_tab.ele('x://*[text()="添加图片"]').click.to_upload(file_paths)
                    time.sleep(3)
                    creator_tab.ele('x://*[@placeholder="添加标题"]').input(vals=name_parts[1], clear=True)
                    # The description editor is located relative to the
                    # "添加视频描述" label: swap the trailing div[1] for div[2]
                    # and descend to the editable node.
                    xpath_path = creator_tab.ele('x://*[text()="添加视频描述"]').xpath
                    new_path = re.sub(r'div\[1\]$', 'div[2]', xpath_path)
                    new_path += "/div/div[3]/div/div/div"
                    creator_tab.ele(f'x:{new_path}').input(vals=name_parts[2] + " " + self.ht, clear=True)
                # Only the first matching entry of a sub-directory is used.
                break

    time.sleep(3)
    creator_tab.ele('x://*[contains(text(), "立即发布")]').click()

    # Scheduled publishing.
    if self.time_start:
        creator_tab.ele('x://*[contains(text(), "定时发布")]').click()
        time.sleep(1)
        # Shared date/time picker logic (same steps as the batch flow).
        self._set_schedule_time(creator_tab, self.time_start)

    # Bind the task link.
    bind_btn = creator_tab.ele('x://*[text()="点击绑定任务"]', timeout=3)
    if bind_btn:
        bind_btn.click()
        creator_tab.ele('x://*[@placeholder="请输入个人主页链接"]').input(self.url)
        time.sleep(1)
        creator_tab.ele('x://*[text()="确认"]').click()
        time.sleep(1)

    agreement = creator_tab.ele('x://*[text()="我已阅读并同意"]', timeout=3)
    if agreement:
        agreement.click()
    time.sleep(1)
    creator_tab.ele('x://*[text()="一键发布"]').click()
    time.sleep(5)
    creator_tab.close()
def action1(self, folder_path=None, input_delay=0):
    """Batch-upload videos, then set description, schedule and task link per video.

    Args:
        folder_path: Despite the name, a list of dicts, one per video. Each
            dict needs ``"path"`` (the video file, ``Path`` or string).
            The optional keys ``"ht"``, ``"time_start"`` and ``"url"``
            override the instance-level values for that video.
        input_delay: Seconds to pause after the description has been typed,
            so the user can fill in further details by hand; ``0`` disables
            the pause.

    Returns:
        None. Returns early when there is nothing to upload or the account
        is not logged in.
    """
    import traceback

    # Nothing to do; also avoids iterating the ``None`` default.
    if not folder_path:
        logger.warning("没有需要上传的视频,已跳过")
        return

    self.create_page()
    cached_url = datas.get(self.user_id)
    if cached_url:
        creator_tab = self.page.new_tab(cached_url)
    else:
        self.page.get(url="https://mcn.pinduoduo.com/register")
        for _ in range(5):
            if self.page.ele("x://*[text()='主播/作者管理']", timeout=5):
                break
        else:
            logger.error("未登录!!!")
            return
        self.page.ele("x://*[text()='主播/作者管理']").click()
        time.sleep(1)
        self.page.ele("x://*[text()='签约主播/作者']").click()
        notice = self.page.ele("x://*[text()='我知道了']", timeout=3)
        if notice:
            notice.click()
        time.sleep(1)
        self.page.ele('x://*[@placeholder="输入主播/作者ID搜索"]').input(vals=self.user_id, clear=True)
        time.sleep(1)
        self.page.ele("x://*[text()='提交']").click()
        time.sleep(1)
        self.page.actions.move_to(ele_or_loc="x://*[text()='内容管理']")
        time.sleep(1)
        self.page.ele("x://*[text()='内容管理']").click()
        time.sleep(3)
        creator_tab = self.page.get_tab(url="home/creator/manage")
        creator_tab.ele("x://*[text()='发布视频']").click()
        # Remember the publish page so later jobs can open it directly.
        datas[self.user_id] = creator_tab.url

    # Upload every video in one go.
    videos = [info["path"] for info in folder_path]
    logger.info(f"开始批量上传 {len(videos)} 个视频")
    creator_tab.ele("x://*[text()='支持批量上传']").click.to_upload(videos)

    # Give the uploads time to start and the page time to render the cards.
    logger.info("等待视频开始上传和页面渲染...")
    time.sleep(5)

    # Uploads are not awaited: each card is inspected and videos that are
    # still uploading are skipped.
    logger.info("检测视频上传状态,只处理已上传完成的视频...")

    # A video card is the closest matching ancestor of its "文件名:" line.
    container_xpath = (
        'x://p[contains(text(), "文件名:") and contains(text(), "{name}")]'
        '/ancestor::div[contains(@class, "y0VjbyIp")][1]'
    )
    total = len(folder_path)

    for idx, video_info in enumerate(folder_path):
        try:
            video_name = Path(video_info["path"]).name
            video_ht = video_info.get("ht", self.ht)
            video_time_start = video_info.get("time_start", self.time_start)
            video_url = video_info.get("url", self.url)
            logger.info(f"处理第 {idx + 1}/{total} 个视频: {video_name}")

            # Locate the card: by full file name, by name without extension,
            # and finally by position.
            video_name_without_ext = video_name.rsplit(".", 1)[0]
            video_container = creator_tab.ele(
                container_xpath.format(name=video_name), timeout=5
            )
            if not video_container:
                video_container = creator_tab.ele(
                    container_xpath.format(name=video_name_without_ext), timeout=3
                )
            if not video_container:
                containers = creator_tab.eles(
                    'x://p[contains(text(), "文件名:")]'
                    '/ancestor::div[contains(@class, "y0VjbyIp")][1]'
                )
                if idx < len(containers):
                    video_container = containers[idx]
                    logger.warning(
                        f"通过索引方式定位到视频容器: {idx + 1}/{len(containers)}"
                    )
            if not video_container:
                logger.warning(f"未找到视频 {video_name} 的容器,跳过")
                continue

            # Upload state inside this card: still uploading -> skip;
            # otherwise require the success text or an enabled publish button.
            uploading_text = video_container.ele(
                'x://*[contains(., "视频上传中")]', timeout=0.5
            )
            if uploading_text:
                logger.warning(f"视频 {video_name} 还在上传中,跳过处理")
                continue
            success_text = video_container.ele(
                'x://*[contains(., "视频上传成功")]', timeout=0.5
            )
            if success_text:
                logger.info(f"检测到视频 {video_name} 上传成功标识")
            else:
                disabled_publish = video_container.ele(
                    'x://button[@data-testid="beast-core-button" and (@disabled or contains(@class, "BTN_disabled"))]//span[text()="发布"]',
                    timeout=0.5
                )
                if disabled_publish:
                    logger.warning(f"视频 {video_name} 发布按钮仍禁用,跳过")
                    continue
                logger.info(f"未检测到成功标识,但发布按钮可用,继续处理 {video_name}")

            # 1. Description: last "-" separated part of the file name + hashtags.
            try:
                desc_text = video_name.split(".")[0].split("-")[-1] + video_ht
                desc_inputs = video_container.eles('x://*[starts-with(@id, "magicdomid")]')
                if desc_inputs:
                    desc_inputs[0].input(vals=desc_text, clear=True)
                    logger.info(f"已输入视频描述: {desc_text[:50]}...")
                else:
                    logger.warning("在视频容器中未找到描述输入框")
            except Exception as e:
                logger.warning(f"输入视频描述失败: {e}")
                traceback.print_exc()
            time.sleep(1)

            # Leave time for manual input when requested.
            if input_delay and input_delay > 0:
                logger.info(f"等待用户填写视频信息: {input_delay}s")
                time.sleep(input_delay)

            # 2. Schedule (only when this video has a publish time).
            if video_time_start:
                try:
                    if not self._ensure_schedule_selected(video_container):
                        logger.warning("未能切换到定时发布,跳过定时设置")
                        # The video is not published without its schedule.
                        continue
                    self._set_schedule_time(creator_tab, video_time_start, video_container, idx)
                except Exception as e:
                    logger.warning(f"设置定时任务失败: {e}")
                    traceback.print_exc()

            # 3. Bind the task link (only when this video has a URL).
            if video_url:
                try:
                    bind_btn = video_container.ele('x://*[text()="点击绑定任务"]', timeout=3)
                    if bind_btn:
                        bind_btn.click()
                        time.sleep(1)
                        # The input may live in a dialog, so search the whole tab.
                        url_input = creator_tab.ele('x://*[@placeholder="请输入个人主页链接"]', timeout=3)
                        if url_input:
                            url_input.input(video_url, clear=True)
                            time.sleep(1)
                            confirm_btn = creator_tab.ele('x://*[text()="确认"]', timeout=3)
                            if confirm_btn:
                                confirm_btn.click()
                                logger.info(f"已绑定任务: {video_url[:50]}...")
                                time.sleep(1)
                        else:
                            logger.warning("未找到URL输入框")
                    else:
                        logger.warning("未找到绑定任务按钮")
                except Exception as e:
                    logger.warning(f"绑定任务失败: {e}")
                    traceback.print_exc()

            # 4. Click this card's own "发布" button (not the global "立即发布").
            try:
                publish_btn = video_container.ele(
                    'x://button[@data-testid="beast-core-button"]//span[text()="发布"]', timeout=3
                )
                if publish_btn:
                    publish_btn.click()
                    logger.info("已点击发布按钮")
                else:
                    logger.warning("在视频容器中未找到发布按钮")
            except Exception as e:
                logger.warning(f"点击发布按钮失败: {e}")
                traceback.print_exc()

            time.sleep(2)  # pause between videos
        except Exception as e:
            # Build the name defensively: the entry itself may be malformed.
            raw_path = video_info.get("path") if isinstance(video_info, dict) else None
            failed_name = getattr(raw_path, "name", "unknown")
            logger.error(f"处理视频 {failed_name} 时出错: {e}")
            traceback.print_exc()
            continue

    # Finally use the one-click publish button when it is present.
    try:
        agreement = creator_tab.ele('x://*[text()="我已阅读并同意"]', timeout=3)
        if agreement:
            agreement.click()
            time.sleep(1)
        one_key_publish = creator_tab.ele('x://*[text()="一键发布"]', timeout=3)
        if one_key_publish:
            one_key_publish.click()
            logger.info("已点击一键发布")
            time.sleep(5)
    except Exception as e:
        logger.warning(f"一键发布失败: {e}")
    time.sleep(5)
    creator_tab.close()
def _set_schedule_time(self, creator_tab, time_start, video_container=None, video_index=None):
    """Drive the date/time picker so that it shows ``time_start``.

    Args:
        creator_tab: Browser tab object.
        time_start: Time string in the form ``2026-01-15 09:30:00``.
        video_container: Optional video card element; pickers are searched
            inside it first and in the whole tab as a fallback.
        video_index: Video index; not read by this method.

    Every failure is logged; nothing is raised to the caller.
    """
    from datetime import datetime
    import traceback

    active_picker_xpath = (
        'x://div[@data-testid="beast-core-datePicker-input" and @data-status="active"]'
        '//input[@data-testid="beast-core-datePicker-htmlInput"]'
    )
    picker_input_xpath = 'x://input[@data-testid="beast-core-datePicker-htmlInput"]'
    # Date picker candidates, most specific first.
    picker_xpaths = (active_picker_xpath, picker_input_xpath, 'x://*[@placeholder="选择日期"]')
    month_label_xpath = 'x://span[@class="RPR_dateText_5-152-0"]'
    time_input_xpath = 'x://input[@data-testid="beast-core-timePicker-html-input"]'

    try:
        dt = datetime.strptime(time_start, "%Y-%m-%d %H:%M:%S")
        date_str = dt.strftime("%Y-%m-%d")
        logger.info(
            f"开始设置定时时间: {time_start} (年={dt.year}, 月={dt.month}, 日={dt.day}, "
            f"时={dt.hour}, 分={dt.minute}, 秒={dt.second})"
        )

        # Search inside the video card first (when one was given), then the tab.
        scoped = video_container and video_container != creator_tab
        search_roots = [video_container, creator_tab] if scoped else [creator_tab]

        date_picker_ele = None
        for root in search_roots:
            for picker_xpath in picker_xpaths:
                date_picker_ele = root.ele(picker_xpath, timeout=2)
                if date_picker_ele:
                    break
            if date_picker_ele:
                break
        if not date_picker_ele:
            logger.warning("未找到日期选择器")
            return

        # Open the picker panel.
        date_picker_ele.click()
        time.sleep(1.5)

        # Switch the displayed month when it differs from the target
        # (only the month number is compared; the year is not considered).
        try:
            month_label = creator_tab.ele(month_label_xpath, timeout=2)
            if month_label:
                current_month = month_label.text
                logger.info(f"当前显示的月份: {current_month}")
                target_month_str = f"{dt.month}"
                if current_month != target_month_str:
                    logger.info(f"需要切换到目标月份: {target_month_str}")
                    # NOTE(review): replace('', '') is a no-op as written; a
                    # suffix character may have been lost here - confirm.
                    current_month_num = int(current_month.replace('', ''))
                    if dt.month > current_month_num:
                        arrow_selector = 'x://svg[@data-testid="beast-core-icon-right"]'
                        clicks_needed = dt.month - current_month_num
                    else:
                        arrow_selector = 'x://svg[@data-testid="beast-core-icon-left"]'
                        clicks_needed = current_month_num - dt.month
                    for _ in range(min(clicks_needed, 12)):
                        arrow = creator_tab.ele(arrow_selector, timeout=1)
                        if arrow:
                            arrow.click()
                            time.sleep(0.4)
                            shown = creator_tab.ele(month_label_xpath, timeout=1)
                            if shown and shown.text == target_month_str:
                                logger.info(f"成功切换到目标月份: {target_month_str}")
                                break
        except Exception as e:
            logger.warning(f"切换月份时出错: {e},继续尝试选择日期")

        # Pick the day cell (enabled and belonging to the shown month).
        date_cell = creator_tab.ele(
            f'x://td[@role="date-cell"]//div[@title="{dt.day}" and not(contains(@class, "RPR_disabled")) and not(contains(@class, "RPR_outOfMonth"))]',
            timeout=3)
        if date_cell:
            date_cell.click()
            logger.info(f"已点击日期: {dt.day}")
            time.sleep(0.5)
        else:
            logger.warning(f"未找到日期单元格: {dt.day}")

        # Open the time picker (card first, then the whole tab).
        time_input = video_container.ele(time_input_xpath, timeout=3) if scoped else None
        if not time_input:
            time_input = creator_tab.ele(time_input_xpath, timeout=3)
        if time_input:
            time_input.click()
            logger.info("已点击时间输入框,打开时间选择器")
            time.sleep(0.8)
        else:
            logger.warning("未找到时间输入框尝试使用XPath")
            try:
                # Last resort: absolute path to the input.
                absolute_xpath = '/html/body/div[2]/div/div/div/div/div/footer/div/div/div/div/div/div/div/div[1]/input'
                time_input = creator_tab.ele(f'x:{absolute_xpath}', timeout=2)
                if time_input:
                    time_input.click()
                    logger.info("通过XPath点击了时间输入框")
                    time.sleep(0.8)
            except Exception as e:
                logger.warning(f"通过XPath也未能找到时间输入框: {e}")

        # Choose hour, minute and second in their respective columns.
        time_columns = (
            ("hh", "小时", dt.hour),
            ("mm", "分钟", dt.minute),
            ("ss", "秒", dt.second),
        )
        for list_suffix, unit_label, unit_value in time_columns:
            value_text = f"{unit_value:02d}"
            item = creator_tab.ele(
                f'x://ul[@data-testid="beast-core-timePicker-list-{list_suffix}"]//li[text()="{value_text}"]',
                timeout=3)
            if item:
                item.scroll.to_see()
                time.sleep(0.2)
                item.click()
                logger.info(f"已选择{unit_label}: {value_text}")
                time.sleep(0.3)
            else:
                logger.warning(f"未找到{unit_label}选项: {value_text}")

        # Confirm the selection; fall back to clicking through JavaScript.
        try:
            confirm_btn = creator_tab.ele(
                'x://button[@data-testid="beast-core-button"]//span[text()="确认"]', timeout=3)
            if confirm_btn:
                confirm_btn.click()
                logger.info("已点击确认按钮")
                time.sleep(0.5)
            else:
                confirm_js = """
(function() {
const buttons = document.querySelectorAll('button[data-testid="beast-core-button"]');
for (let btn of buttons) {
const span = btn.querySelector('span');
if (span && span.textContent.includes('确认')) {
btn.click();
return true;
}
}
return false;
})();
"""
                if creator_tab.run_js(confirm_js):
                    logger.info("通过JavaScript点击了确认按钮")
                else:
                    logger.warning("未找到确认按钮")
                time.sleep(0.5)
        except Exception as e:
            logger.warning(f"点击确认按钮失败: {e}")

        # Verify: read the picker input back (card first, then the whole tab).
        time.sleep(0.5)
        final_value = None
        try:
            value_ele = video_container.ele(picker_input_xpath, timeout=1) if scoped else None
            if not value_ele:
                value_ele = creator_tab.ele(picker_input_xpath, timeout=1)
            if value_ele:
                final_value = value_ele.attr("value")
        except Exception as e:
            logger.warning(f"读取日期选择器值失败: {e}")

        if not (final_value and str(final_value).strip()):
            logger.error("无法获取日期选择器的值,可能设置失败")
        else:
            final_value_str = str(final_value).strip()
            logger.info(f"日期选择器当前值: {final_value_str}")
            if final_value_str.startswith(date_str):
                logger.info(f"成功设置定时时间: {final_value_str}")
            else:
                logger.warning(f"设置的时间可能不准确,当前值: {final_value_str}, 期望日期: {date_str}")
    except ValueError as e:
        logger.error(f"时间格式错误: {time_start}, 正确格式应为: YYYY-MM-DD HH:MM:SS, 错误: {e}")
    except Exception as e:
        logger.error(f"设置定时时间失败: {e}")
        traceback.print_exc()
def _ensure_schedule_selected(self, video_container):
    """Make sure the scheduled-publish option ("定时发布") is selected in a video card.

    Args:
        video_container: Element of the video card that holds the radio group.

    Returns:
        bool: ``True`` when the option is reported as checked, or when the
        plain-text fallback was clicked; ``False`` otherwise or on error.
    """
    radio_xpath = 'x://label[@data-testid="beast-core-radio"][.//*[contains(text(), "{text}")]]'

    def confirmed(label):
        # The radio label exposes its state through the data-checked attribute.
        if label.attr("data-checked") == "true":
            logger.info("定时发布已选中")
            return True
        return False

    try:
        # Select "立即发布" first, then switch over to "定时发布".
        immediate_label = video_container.ele(radio_xpath.format(text="立即发布"), timeout=2)
        if immediate_label:
            immediate_label.click()
            time.sleep(1)

        scheduled_label = video_container.ele(radio_xpath.format(text="定时发布"), timeout=2)
        if scheduled_label:
            scheduled_label.click()
            time.sleep(0.5)
            if confirmed(scheduled_label):
                return True
            # Fallback: click the radio input inside the label.
            inner_radio = scheduled_label.ele('x://input[@type="radio"]', timeout=1)
            if inner_radio:
                inner_radio.click()
                time.sleep(0.5)
                if confirmed(scheduled_label):
                    return True

        # Last resort: click the text itself.
        text_node = video_container.ele('x://*[contains(text(), "定时发布")]', timeout=1)
        if text_node:
            text_node.click()
            time.sleep(0.5)
            return True
    except Exception as e:
        logger.warning(f"切换定时发布失败: {e}")
    return False
# Module-level cache: creator user_id -> URL of that creator's publish page.
# Filled by Pdd.action / Pdd.action1 so later jobs can open the page directly.
datas = {}

if __name__ == '__main__':
    # Job settings, defined once and shared by the Pdd instance and every
    # per-video entry below.
    url = "18 【运动男孩都爱这么穿吗?🏃 - Liu_烫烫 | 小红书 - 你的生活兴趣社区】 😆 D13BaPl6xyUAuQO 😆 https://www.xiaohongshu.com/discovery/item/678ceeef000000001602fb54?source=webshare&xhsshare=pc_web&xsec_token=ABe9oWR9CYCsHBkWUPuoS1Fz3_Uz4WGFMdfCGwSbl0Dfs=&xsec_source=pc_share"
    user_id = "1050100241"
    time_start = "2026-01-28 09:30:00"
    ht = "#python #haha"
    index = "1"

    pdd = Pdd(
        url=url,
        user_id=user_id,
        time_start=time_start,
        ht=ht,
        index=index,
    )
    # pdd.action(folder_path=r"C:\Users\27942\Desktop\多多自动化发文")
    folder_path = r"C:\Users\27942\Desktop\多多自动化发文"

    file_paths = []
    for entry in os.listdir(folder_path):  # one sub-directory per user
        user_dir = os.path.join(folder_path, entry)
        # Only directories are scanned (skips e.g. .lnk shortcuts).
        if not os.path.isdir(user_dir):
            continue
        for file_name in os.listdir(user_dir):
            path = Path(user_dir) / file_name
            # Real .mp4 files only: compare the suffix rather than searching
            # for ".mp4" anywhere in the name.
            if path.suffix.lower() != ".mp4" or not path.is_file():
                continue
            file_paths.append(
                {
                    "url": url,
                    "user_id": user_id,
                    "time_start": time_start,
                    "ht": ht,
                    "index": index,
                    "path": path,
                }
            )
    pdd.action1(folder_path=file_paths)