Files
haha/自动化.py
2026-02-05 16:06:06 +08:00

584 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import json
import time
from urllib.parse import urlparse
from loguru import logger
from bs4 import BeautifulSoup
from curl_cffi import requests
from DrissionPage import ChromiumPage, ChromiumOptions, SessionPage
class Pdd:
    """Republish a scraped note (title, description, media) via the Pinduoduo MCN console.

    Workflow (see ``action``): open the note URL in a Chromium tab, read the
    note from the page's ``window.__INITIAL_STATE__`` JSON and ``og:video``
    meta tags, download its media, then drive the MCN web UI to upload the
    media, fill in title/description, optionally schedule, and publish.

    Args:
        url: Address of the note page to copy.
        user_id: Anchor/author ID typed into the MCN search box.
        time_start: Scheduled publish time as ``"YYYY-MM-DD HH:MM:SS"``; a
            falsy value skips the scheduling step.
    """

    # Pause between two login checks. Looking up an element that is already
    # present returns at once, so without a pause all attempts would be used
    # up before the user has a chance to log in.
    LOGIN_POLL_SECONDS = 5
    LOGIN_ATTEMPTS = 5

    # Calendar header label inside the date-picker dropdown.
    _MONTH_LABEL_XPATH = 'x://span[@class="RPR_dateText_5-152-0"]'
    # Radio label that switches the form to "scheduled publish".
    _SCHEDULE_OPTION_XPATH = (
        'x://*[@id="root"]/section/section/main/div/div/div/div[2]/div[2]/div/div[1]/div/div[2]/div/div[3]/div/div/div/label[2]'
    )

    def __init__(self, url, user_id, time_start):
        self.url = url
        self.user_id = user_id
        self.time_start = time_start
        self.session = requests.Session()
        # Browser handle and URL templates
        self.page = None
        self.user_url_template = None  # URL template for a user's video list
        self.user_profile_url_template = None  # URL template for a user's profile info

    def create_page(self):
        """Create the Chromium page object with project-local temp and user-data folders."""
        options = ChromiumOptions()
        options.set_tmp_path("user/tmp")
        options.set_user_data_path("user/user_data")
        # Build the page object from this configuration.
        self.page = ChromiumPage(addr_or_opts=options)

    def extract_note_data(self, initial_state):
        """Extract title, description, images, videos and topics from the page state.

        JSON ``null`` values (including ones produced from JavaScript
        ``undefined``) are treated like missing keys, so a null ``imageList``
        or ``video`` yields an empty list instead of discarding the note.

        Args:
            initial_state: Parsed ``window.__INITIAL_STATE__`` dictionary.

        Returns:
            dict with keys ``title``, ``desc``, ``images``, ``videos`` and
            ``topics``, or None when no note is found or extraction fails.
        """
        try:
            note_store = initial_state.get('note') or {}
            note_detail_map = note_store.get('noteDetailMap') or {}

            # Prefer the declared first note; otherwise fall back to the first map key.
            first_note_id = note_store.get('firstNoteId')
            if not first_note_id:
                if not note_detail_map:
                    print("未找到笔记ID")
                    return None
                first_note_id = next(iter(note_detail_map))

            note_detail = note_detail_map.get(first_note_id) or {}
            note_info = note_detail.get('note') or {}
            if not note_info:
                print("未找到笔记信息")
                return None

            # Only the fields needed for republishing are kept.
            extracted_data = {
                'title': note_info.get('title'),
                'desc': note_info.get('desc'),
                'images': [],
                'videos': [],
                'topics': [],
            }

            # Images
            for img in note_info.get('imageList') or []:
                extracted_data['images'].append({
                    'url': img.get('urlDefault') or img.get('url'),
                    'urlPre': img.get('urlPre'),
                    'width': img.get('width'),
                    'height': img.get('height'),
                })

            # Video (optional)
            video_info = note_info.get('video') or {}
            if video_info:
                video_data = {}
                media = video_info.get('media') or {}
                hls = (media.get('stream') or {}).get('hls') or {}
                if hls:
                    video_data['url'] = hls.get('masterUrl') or hls.get('url')
                # No HLS address: try the direct fields on ``media``.
                if not video_data.get('url'):
                    video_data['url'] = media.get('url') or media.get('videoUrl')
                if video_info.get('cover'):
                    video_data['cover'] = video_info.get('cover')
                if video_info.get('time'):
                    video_data['time'] = video_info.get('time')
                # A video without any URL is useless downstream, so drop it.
                if video_data.get('url'):
                    extracted_data['videos'].append(video_data)

            # Topics may live under several field names; use the first non-empty one.
            topic_list = (
                note_info.get('topicList')
                or note_info.get('tagList')
                or note_info.get('hashtagList')
                or []
            )
            for topic in topic_list:
                topic_name = topic.get('name') or topic.get('title') or topic.get('tagName')
                if topic_name:
                    extracted_data['topics'].append({
                        'name': topic_name,
                        'id': topic.get('id') or topic.get('topicId') or topic.get('tagId'),
                    })

            # Topics written inline in the description as "#topic#" are added too,
            # skipping names that are already listed.
            desc = note_info.get('desc') or ''
            for inline_topic in re.findall(r'#([^#]+)#', desc):
                if not any(t.get('name') == inline_topic for t in extracted_data['topics']):
                    extracted_data['topics'].append({'name': inline_topic})

            return extracted_data
        except Exception as e:
            print(f"提取笔记数据时出错:{e}")
            import traceback
            traceback.print_exc()
            return None

    def extract_video_from_meta(self, html_content):
        """Read video information from ``<meta name="og:video...">`` tags.

        Args:
            html_content: HTML document as a string.

        Returns:
            dict with ``url`` and, when present, ``time`` and ``quality``;
            None when there is no ``og:video`` address or parsing fails.
        """
        meta_fields = (
            ('url', 'og:video'),
            ('time', 'og:videotime'),
            ('quality', 'og:videoquality'),
        )
        try:
            soup = BeautifulSoup(html_content, 'html.parser')
            video_info = {}
            for key, meta_name in meta_fields:
                tag = soup.find('meta', {'name': meta_name})
                if tag and tag.get('content'):
                    video_info[key] = tag.get('content')
            # Duration/quality alone are not useful without the address.
            return video_info if video_info.get('url') else None
        except Exception as e:
            print(f"从meta标签提取视频信息时出错{e}")
            return None

    def get_page_datas(self):
        """Open ``self.url`` in a new tab and extract the note data from its HTML.

        The tab is always closed before returning, including on early exits
        and errors.

        Returns:
            The dict produced by ``extract_note_data`` (with the meta-tag video
            merged into ``videos``); ``{'videos': [video]}`` when the state JSON
            is missing or unparsable but a meta-tag video exists; otherwise None.
        """
        tab = self.page.new_tab()
        try:
            tab.listen.start(self.url)
            tab.get(url=self.url)
            res = tab.listen.wait(timeout=3)
            if not res:
                return None

            body = res.response.body
            print(body)
            video_info = self.extract_video_from_meta(body)
            # What can still be returned if the state JSON is unusable.
            video_only = {'videos': [video_info]} if video_info else None

            pattern = r'<script>window\.__INITIAL_STATE__\s*=\s*({.*?});?\s*</script>'
            match = re.search(pattern, body, re.DOTALL)
            if not match:
                print("未找到 window.__INITIAL_STATE__ 数据")
                return video_only

            # JSON has no ``undefined``; map the JavaScript literal to null.
            json_str = re.sub(r'\bundefined\b', 'null', match.group(1))
            try:
                initial_state = json.loads(json_str)
            except json.JSONDecodeError as e:
                logger.error(f"解析 window.__INITIAL_STATE__ 失败: {e}")
                return video_only

            note_data = self.extract_note_data(initial_state)
            if video_info and note_data:
                videos = note_data.get('videos') or []
                # NOTE(review): the original nesting of the append was lost with the
                # file's indentation; the meta video is added unless the same URL is
                # already listed, so it is never dropped and never duplicated.
                if all(v.get('url') != video_info['url'] for v in videos):
                    videos.append(video_info)
                note_data['videos'] = videos
            return note_data
        finally:
            tab.close()

    def download_video(self, url):
        """Download the video at ``url`` with a ``SessionPage``.

        Args:
            url: Address of the video file to fetch.
        """
        page = SessionPage()
        page.download(url)

    def download_image(self, url, name):
        """Download an image and save it as ``{name}.webp`` in the working directory.

        Args:
            url: Image address.
            name: File name without extension (``action`` passes the image index).

        Returns:
            True on success, None when the request fails.
        """
        # Request headers
        headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
            'Cache-Control': 'no-cache',
            'DNT': '1',
            'Pragma': 'no-cache',
            'Proxy-Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0'
        }
        try:
            # SECURITY NOTE(review): verify=False disables TLS certificate checks
            # (same as curl --insecure); kept as in the original, confirm it is intended.
            response = requests.get(url, headers=headers, verify=False, timeout=30)
            response.raise_for_status()  # raise on HTTP error status
            with open(f"{name}.webp", 'wb') as f:
                f.write(response.content)
            return True
        except requests.exceptions.RequestException as e:
            print(f"下载失败: {e}")
            return None

    def action(self):
        """Run the whole flow: scrape the note, log in to the MCN console, upload and publish."""
        self.create_page()
        datas = self.get_page_datas()
        if not datas:
            logger.error("未获取到页面数据,无法继续")
            return

        self.page.get(url="https://mcn.pinduoduo.com/register")
        if not self._wait_for_login():
            logger.error("未登录!!!")
            return

        creator_tab = self._open_publish_tab()
        self._upload_media(creator_tab, datas)
        time.sleep(3)
        self._fill_title_and_description(creator_tab, datas)
        if self.time_start:
            self._schedule_publish(creator_tab)
        self._bind_task_and_publish(creator_tab)

    def _wait_for_login(self):
        """Poll until the login element disappears; return False if it never does."""
        for attempt in range(self.LOGIN_ATTEMPTS):
            if attempt:
                time.sleep(self.LOGIN_POLL_SECONDS)
            if not self.page.ele("x://*[text()='登录']", timeout=5):
                return True
            logger.warning("请登录》》》")
        return False

    def _open_publish_tab(self):
        """Navigate the MCN console to the creator's publish page and return that tab."""
        self.page.ele("x://*[text()='主播/作者管理']").click()
        time.sleep(1)
        self.page.ele("x://*[text()='签约主播/作者']").click()
        # Dismiss the notice dialog when it shows up.
        notice = self.page.ele("x://*[text()='我知道了']", timeout=3)
        if notice:
            notice.click()
        time.sleep(1)
        self.page.ele('x://*[@placeholder="输入主播/作者ID搜索"]').input(vals=self.user_id, clear=True)
        time.sleep(1)
        self.page.ele("x://*[text()='提交']").click()
        time.sleep(1)
        # Hover before clicking, as the original flow does.
        self.page.actions.move_to(ele_or_loc="x://*[text()='内容管理']")
        time.sleep(1)
        self.page.ele("x://*[text()='内容管理']").click()
        time.sleep(3)
        creator_tab = self.page.get_tab(url="home/creator/manage")
        creator_tab.ele("x://*[text()='发布视频']").click()
        return creator_tab

    def _upload_media(self, creator_tab, datas):
        """Download the note's videos (or, without videos, its images) and upload them."""
        upload_paths = []
        videos = datas.get("videos")
        if videos:
            for video in videos:
                self.download_video(url=video["url"])
                # The upload path is the file name taken from the URL path.
                upload_paths.append(os.path.basename(urlparse(video["url"]).path))
            creator_tab.ele("x://*[text()='发布视频']").click.to_upload(upload_paths)
            return

        for index, image in enumerate(datas.get("images") or []):
            # Queue only files that were actually written to disk.
            if image.get("url") and self.download_image(url=image["url"], name=index):
                upload_paths.append(f"{index}.webp")
            else:
                logger.warning(f"图片下载失败,已跳过: {image.get('url')}")
        creator_tab.ele('x://*[text()="添加图片"]').click.to_upload(upload_paths)

    def _fill_title_and_description(self, creator_tab, datas):
        """Type the note title and the (trimmed) description into the publish form."""
        creator_tab.ele('x://*[@placeholder="添加标题"]').input(vals=datas.get("title") or "", clear=True)
        time.sleep(3)
        # The editable description node is derived from the placeholder's XPath:
        # swap the trailing div[1] for div[2], then descend to the input node.
        placeholder_xpath = creator_tab.ele('x://*[text()="添加视频描述"]').xpath
        editor_xpath = re.sub(r'div\[1\]$', 'div[2]', placeholder_xpath) + "/div/div[3]/div/div/div"
        desc_text = (datas.get("desc") or "").replace("[话题]", "")[:450]
        creator_tab.ele(f'x:{editor_xpath}').input(vals=desc_text, clear=True)

    @staticmethod
    def _month_number(text):
        """Return the month (1-12) given by the last run of digits in ``text``.

        Raises:
            ValueError: if ``text`` has no digits or the number is not a month.
        """
        numbers = re.findall(r'\d+', text or '')
        if not numbers:
            raise ValueError(f"无法解析月份: {text!r}")
        month = int(numbers[-1])
        if not 1 <= month <= 12:
            raise ValueError(f"无法解析月份: {text!r}")
        return month

    def _switch_calendar_month(self, creator_tab, target_month):
        """Click the calendar arrows until the header shows ``target_month``.

        Simplified like the original: only the month is compared, so the target
        is assumed to lie in the year the calendar currently shows. Failures are
        logged and the caller goes on to pick the day.
        """
        try:
            month_label = creator_tab.ele(self._MONTH_LABEL_XPATH, timeout=2)
            if not month_label:
                return
            current_text = month_label.text
            logger.info(f"当前显示的月份: {current_text}")
            # Compare numbers, not raw label text, so a unit suffix cannot break it.
            current_month = self._month_number(current_text)
            if current_month == target_month:
                return
            logger.info(f"需要切换到目标月份: {target_month}")

            # Restrict the search to the date-picker dropdown; only the
            # "next" arrow carries the RPR_right class.
            date_root = 'x://div[@data-testid="beast-core-datePicker-dropdown-contentRoot"]'
            if target_month > current_month:
                arrow_selector = f'{date_root}//*[contains(@class,"RPR_iconPrevNext") and contains(@class,"RPR_right")]'
            else:
                arrow_selector = f'{date_root}//*[contains(@class,"RPR_iconPrevNext") and not(contains(@class,"RPR_right"))]'
            clicks_needed = abs(target_month - current_month)

            for _ in range(min(clicks_needed, 12)):
                arrow = creator_tab.ele(arrow_selector, timeout=1)
                if arrow:
                    try:
                        arrow.click()
                    except Exception as e:
                        # Best effort: keep trying, but leave a trace.
                        logger.warning(f"点击月份切换箭头失败: {e}")
                    time.sleep(0.5)
                # Stop as soon as the header shows the target month.
                refreshed = creator_tab.ele(self._MONTH_LABEL_XPATH, timeout=1)
                if refreshed and self._month_number(refreshed.text) == target_month:
                    logger.info(f"成功切换到目标月份: {target_month}")
                    break
        except Exception as e:
            logger.warning(f"切换月份时出错: {e},继续尝试选择日期")

    def _open_time_picker(self, creator_tab):
        """Click the time input to open the time picker, with an absolute-XPath fallback."""
        time_input = creator_tab.ele('x://input[@data-testid="beast-core-timePicker-html-input"]', timeout=3)
        if time_input:
            time_input.click()
            logger.info("已点击时间输入框,打开时间选择器")
            time.sleep(0.8)  # let the picker panel open
            return
        logger.warning("未找到时间输入框尝试使用XPath")
        try:
            time_input_xpath = '/html/body/div[2]/div/div/div/div/div/footer/div/div/div/div/div/div/div/div[1]/input'
            time_input = creator_tab.ele(f'x:{time_input_xpath}', timeout=2)
            if time_input:
                time_input.click()
                logger.info("通过XPath点击了时间输入框")
                time.sleep(0.8)
        except Exception as e:
            logger.warning(f"通过XPath也未能找到时间输入框: {e}")

    def _select_time_unit(self, creator_tab, list_id, value, label):
        """Pick ``value`` (zero-padded to two digits) in the ``hh``/``mm``/``ss`` list."""
        value_str = f"{value:02d}"
        item = creator_tab.ele(
            f'x://ul[@data-testid="beast-core-timePicker-list-{list_id}"]//li[text()="{value_str}"]', timeout=3)
        if not item:
            logger.warning(f"未找到{label}选项: {value_str}")
            return
        item.scroll.to_see()
        time.sleep(0.2)
        item.click()
        logger.info(f"已选择{label}: {value_str}")
        time.sleep(0.3)

    def _confirm_schedule(self, creator_tab):
        """Press the picker's confirm button, falling back to a JavaScript click."""
        try:
            confirm_btn = creator_tab.ele(
                'x://button[@data-testid="beast-core-button"]//span[text()="确认"]', timeout=3)
            if confirm_btn:
                confirm_btn.click()
                logger.info("已点击确认按钮")
                time.sleep(0.5)
                return
            # The leading ``return`` hands the IIFE's result back to Python.
            confirm_js = """
            return (function() {
                const buttons = document.querySelectorAll('button[data-testid="beast-core-button"]');
                for (let btn of buttons) {
                    const span = btn.querySelector('span');
                    if (span && span.textContent.includes('确认')) {
                        btn.click();
                        return true;
                    }
                }
                return false;
            })();
            """
            if creator_tab.run_js(confirm_js):
                logger.info("通过JavaScript点击了确认按钮")
            else:
                logger.warning("未找到确认按钮")
            time.sleep(0.5)
        except Exception as e:
            logger.warning(f"点击确认按钮失败: {e}")

    def _verify_schedule(self, creator_tab, date_str):
        """Log whether the date input now starts with the expected ``date_str``."""
        time.sleep(0.5)
        # The leading ``return`` hands the IIFE's result back to Python.
        check_js = """
        return (function() {
            const dateInput = document.querySelector('[data-testid="beast-core-datePicker-htmlInput"]');
            return dateInput ? dateInput.value : null;
        })();
        """
        final_value = creator_tab.run_js(check_js)
        if not (final_value and final_value.strip()):
            logger.error("无法获取日期选择器的值,可能设置失败")
            return
        logger.info(f"日期选择器当前值: {final_value}")
        # Only the date part is compared; the time part may be formatted differently.
        if final_value.strip().startswith(date_str):
            logger.info(f"成功设置定时时间: {final_value}")
        else:
            logger.warning(f"设置的时间可能不准确,当前值: {final_value}, 期望日期: {date_str}")

    def _schedule_publish(self, creator_tab):
        """Switch the form to scheduled publishing and enter ``self.time_start``.

        Errors are logged instead of raised so publishing can continue.
        """
        from datetime import datetime

        creator_tab.ele(self._SCHEDULE_OPTION_XPATH).click()
        time.sleep(1)
        date_picker_ele = creator_tab.ele('x://*[@placeholder="选择日期"]', timeout=3)
        if not date_picker_ele:
            logger.warning("未找到日期选择器")
            return

        # Expected format, e.g. 2026-01-15 09:30:00
        try:
            dt = datetime.strptime(self.time_start, "%Y-%m-%d %H:%M:%S")
        except ValueError as e:
            logger.error(f"时间格式错误: {self.time_start}, 正确格式应为: YYYY-MM-DD HH:MM:SS, 错误: {e}")
            return

        try:
            date_str = dt.strftime("%Y-%m-%d")
            logger.info(
                f"开始设置定时时间: {self.time_start} (年={dt.year}, 月={dt.month}, 日={dt.day}, 时={dt.hour}, 分={dt.minute}, 秒={dt.second})")

            # Open the picker panel and give it time to render.
            date_picker_ele.click()
            time.sleep(1.5)

            # Steps: switch month if needed, click the day cell, pick
            # hour/minute/second, confirm, then verify the input value.
            self._switch_calendar_month(creator_tab, dt.month)

            date_cell = creator_tab.ele(
                f'x://td[@role="date-cell"]//div[@title="{dt.day}" and not(contains(@class, "RPR_disabled")) and not(contains(@class, "RPR_outOfMonth"))]',
                timeout=3)
            if date_cell:
                date_cell.click()
                logger.info(f"已点击日期: {dt.day}")
                time.sleep(0.5)
            else:
                logger.warning(f"未找到日期单元格: {dt.day}")

            self._open_time_picker(creator_tab)
            self._select_time_unit(creator_tab, "hh", dt.hour, "小时")
            self._select_time_unit(creator_tab, "mm", dt.minute, "分钟")
            self._select_time_unit(creator_tab, "ss", dt.second, "秒")
            self._confirm_schedule(creator_tab)
            self._verify_schedule(creator_tab, date_str)
        except Exception as e:
            logger.error(f"设置定时时间失败: {e}")
            import traceback
            traceback.print_exc()

    def _bind_task_and_publish(self, creator_tab):
        """Bind the task when offered, accept the agreement, publish and close the tab."""
        bind_entry = creator_tab.ele('x://*[text()="点击绑定任务"]', timeout=3)
        if bind_entry:
            bind_entry.click()
            creator_tab.ele('x://*[@placeholder="请输入个人主页链接"]').input(self.url)
            time.sleep(1)
            creator_tab.ele('x://*[text()="确认"]').click()
            time.sleep(1)
        agreement = creator_tab.ele('x://*[text()="我已阅读并同意"]', timeout=3)
        if agreement:
            agreement.click()
        time.sleep(1)
        creator_tab.ele('x://*[text()="一键发布"]').click()
        time.sleep(5)
        creator_tab.close()
if __name__ == '__main__':
    # Script entry point: copy one note and publish it on a schedule.
    note_url = "https://www.xiaohongshu.com/explore/623d36d70000000001026733?xsec_token=ABhhM2ncuuuXOXUkG3YWI5ygMg2uLj9K1IYSxXyKARs3E=&xsec_source=pc_user"
    publisher = Pdd(url=note_url, user_id="1050100241", time_start="2026-01-28 09:30:00")
    publisher.action()