import json
import os
import re
import time
import traceback
from urllib.parse import urlparse

from bs4 import BeautifulSoup
from curl_cffi import requests
from DrissionPage import ChromiumOptions, ChromiumPage, SessionPage
from loguru import logger
class Pdd:
    """Repost a Xiaohongshu (RED) note through Pinduoduo's MCN creator console.

    Workflow: scrape the note page (title, description, images, videos,
    topics), download the media files locally, then drive a Chromium browser
    through ``mcn.pinduoduo.com`` to fill in and publish the content.
    """

    def __init__(self, url, user_id, time_start):
        """
        Args:
            url: Xiaohongshu note URL to repost.
            user_id: anchor/author id searched for in the MCN console.
            time_start: scheduled publish time ("YYYY-MM-DD HH:MM:SS"),
                or a falsy value to publish immediately.
        """
        self.url = url
        self.user_id = user_id
        self.time_start = time_start
        self.session = requests.Session()

        # Browser page object and URL templates.
        self.page = None
        self.user_url_template = None  # user video-list URL template
        self.user_profile_url_template = None  # user profile URL template

    def create_page(self):
        """Create the ChromiumPage used for all browser automation."""
        co = ChromiumOptions()
        # Keep temp files and the browser profile inside the project folder
        # so the MCN login session persists across runs.
        co.set_tmp_path("user/tmp")
        co.set_user_data_path("user/user_data")
        # Create the page object with this configuration.
        self.page = ChromiumPage(addr_or_opts=co)

    def extract_note_data(self, initial_state):
        """Extract note data from the parsed ``window.__INITIAL_STATE__``.

        Only title, description, image list, video list and topics are kept.

        Args:
            initial_state: dict parsed from ``window.__INITIAL_STATE__``.

        Returns:
            dict: keys ``title``/``desc``/``images``/``videos``/``topics``,
            or None when the note cannot be located or parsing fails.
        """
        try:
            note_store = initial_state.get('note', {})
            note_detail_map = note_store.get('noteDetailMap', {})

            # Work out which note to read: prefer firstNoteId, otherwise
            # fall back to the first key in noteDetailMap.
            first_note_id = note_store.get('firstNoteId')
            if not first_note_id:
                if note_detail_map:
                    first_note_id = next(iter(note_detail_map))
                else:
                    logger.error("未找到笔记ID")
                    return None

            note_detail = note_detail_map.get(first_note_id, {})
            note_info = note_detail.get('note', {})
            if not note_info:
                logger.error("未找到笔记信息")
                return None

            # Only the fields the publish flow needs.
            extracted_data = {
                'title': note_info.get('title'),
                'desc': note_info.get('desc'),
                'images': [],
                'videos': [],
                'topics': [],
            }

            # Image list.
            for img in note_info.get('imageList', []):
                extracted_data['images'].append({
                    'url': img.get('urlDefault') or img.get('url'),
                    'urlPre': img.get('urlPre'),
                    'width': img.get('width'),
                    'height': img.get('height'),
                })

            # Video (if present). The playable URL may live in several
            # places depending on the page version, so probe them in order.
            video_info = note_info.get('video', {})
            if video_info:
                video_data = {}

                media = video_info.get('media', {})
                if media:
                    stream = media.get('stream', {})
                    if stream:
                        hls = stream.get('hls', {})
                        if hls:
                            video_data['url'] = hls.get('masterUrl') or hls.get('url')
                    # No HLS stream — try the flat media fields.
                    if not video_data.get('url'):
                        video_data['url'] = media.get('url') or media.get('videoUrl')

                if video_info.get('cover'):
                    video_data['cover'] = video_info.get('cover')
                if video_info.get('time'):
                    video_data['time'] = video_info.get('time')

                # A video entry without a playable URL is useless.
                if video_data.get('url'):
                    extracted_data['videos'].append(video_data)

            # Topics: the tag-list field name varies between page versions.
            topic_list = (note_info.get('topicList', [])
                          or note_info.get('tagList', [])
                          or note_info.get('hashtagList', []))
            for topic in topic_list:
                topic_data = {
                    'name': topic.get('name') or topic.get('title') or topic.get('tagName'),
                    'id': topic.get('id') or topic.get('topicId') or topic.get('tagId'),
                }
                if topic_data.get('name'):
                    extracted_data['topics'].append(topic_data)

            # Also pull "#topic#"-style hashtags embedded in the description,
            # skipping names already collected from the tag list.
            desc = note_info.get('desc', '')
            if desc:
                for found in re.findall(r'#([^#]+)#', desc):
                    if not any(t.get('name') == found for t in extracted_data['topics']):
                        extracted_data['topics'].append({'name': found})

            return extracted_data

        except Exception as e:
            logger.error(f"提取笔记数据时出错:{e}")
            traceback.print_exc()
            return None

    def extract_video_from_meta(self, html_content):
        """Extract video info from the page's ``og:video`` meta tags.

        NOTE(review): the tags are looked up via the ``name`` attribute
        rather than the standard OpenGraph ``property`` — this mirrors how
        the target page serves them; confirm if the markup changes.

        Args:
            html_content: raw HTML string.

        Returns:
            dict with ``url`` (plus optional ``time``/``quality``), or None
            when no video URL is present or parsing fails.
        """
        try:
            soup = BeautifulSoup(html_content, 'html.parser')
            video_info = {}

            og_video = soup.find('meta', {'name': 'og:video'})
            if og_video and og_video.get('content'):
                video_info['url'] = og_video.get('content')

            og_videotime = soup.find('meta', {'name': 'og:videotime'})
            if og_videotime and og_videotime.get('content'):
                video_info['time'] = og_videotime.get('content')

            og_videoquality = soup.find('meta', {'name': 'og:videoquality'})
            if og_videoquality and og_videoquality.get('content'):
                video_info['quality'] = og_videoquality.get('content')

            # A result without a URL is treated as "not found".
            return video_info if video_info.get('url') else None

        except Exception as e:
            logger.error(f"从meta标签提取视频信息时出错:{e}")
            return None

    def get_page_datas(self):
        """Load the note page, capture its HTML and extract the note data.

        Returns:
            dict: note data from ``extract_note_data`` (merged with the
            meta-tag video info when present), ``{'videos': [...]}`` when
            only meta-tag video info was found, or None on failure.
        """
        tab = self.page.new_tab()
        note_data = None
        try:
            # Capture the document response for self.url.
            tab.listen.start(self.url)
            tab.get(url=self.url)
            res = tab.listen.wait(timeout=3)

            # BUG FIX: previously a missed response fell through to
            # `return note_data` with the name unbound (NameError), and the
            # early returns below leaked the tab (no close).
            if not res:
                logger.warning("未捕获到页面响应")
                return None
            body = res.response.body

            # Video URL from the <meta> tags — works even when the JSON
            # state blob is missing or truncated.
            video_info = self.extract_video_from_meta(body)

            # Pull the embedded window.__INITIAL_STATE__ JSON blob.
            match = re.search(
                r'<script>window\.__INITIAL_STATE__\s*=\s*({.*?});?\s*</script>',
                body, re.DOTALL)
            if not match:
                logger.warning("未找到 window.__INITIAL_STATE__ 数据")
                # Fall back to the meta-tag video alone, if any.
                return {'videos': [video_info]} if video_info else None

            # JavaScript `undefined` is not valid JSON; map it to null.
            # NOTE(review): a literal "undefined" inside a string value would
            # also be replaced — assumed acceptable for this page's payload.
            json_str = re.sub(r'\bundefined\b', 'null', match.group(1))
            initial_state = json.loads(json_str)

            note_data = self.extract_note_data(initial_state)

            # Merge the meta-tag video into the note data.
            if video_info and note_data:
                if not note_data.get('videos'):
                    note_data['videos'] = []
                note_data['videos'].append(video_info)
            return note_data
        finally:
            # Always release the tab, whatever path we took above.
            tab.close()

    def download_video(self, url):
        """Download the video at *url*.

        The saved filename is derived from the URL's last path segment,
        which is what ``action`` later passes to the upload control.
        NOTE(review): relies on SessionPage.download's default save
        location (the working directory) — confirm against DrissionPage docs.
        """
        page = SessionPage()
        # BUG FIX: the `url` argument was ignored and a hard-coded sample
        # video was downloaded instead.
        page.download(url)

    def download_image(self, url, name):
        """Download an image and save it as ``{name}.webp`` in the cwd.

        Args:
            url: image URL.
            name: basename (without extension) for the saved file.

        Returns:
            True on success, None on failure (best-effort download).
        """
        # Browser-like request headers.
        headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
            'Cache-Control': 'no-cache',
            'DNT': '1',
            'Pragma': 'no-cache',
            'Proxy-Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0'
        }

        try:
            # verify=False is the equivalent of curl's --insecure.
            response = requests.get(url, headers=headers, verify=False, timeout=30)
            response.raise_for_status()  # raise on HTTP error status

            with open(f"{name}.webp", 'wb') as f:
                f.write(response.content)
            return True

        except requests.exceptions.RequestException as e:
            logger.error(f"下载失败: {e}")
            return None

    def _wait_for_login(self):
        """Return True once the MCN console is logged in.

        Polls for the '登录' button (up to 5 checks of ~5s each), warning
        the operator so they can log in manually in the opened browser.
        """
        for _ in range(5):
            if self.page.ele("x://*[text()='登录']", timeout=5):
                logger.warning("请登录》》》")
            else:
                return True
        logger.error("未登录!!!")
        return False

    def action(self):
        """End-to-end flow: scrape the note, then publish it via the MCN
        console (optionally scheduled and bound to a task)."""
        self.create_page()

        datas = self.get_page_datas()
        # BUG FIX: get_page_datas may return None; everything below assumed
        # a dict and would crash on `datas.get(...)`.
        if not datas:
            logger.error("未获取到笔记数据,终止发布")
            return

        self.page.get(url="https://mcn.pinduoduo.com/register")
        if not self._wait_for_login():
            return

        # Navigate: 主播/作者管理 -> 签约主播/作者, then search the author id.
        self.page.ele("x://*[text()='主播/作者管理']").click()
        time.sleep(1)
        self.page.ele("x://*[text()='签约主播/作者']").click()
        ele = self.page.ele("x://*[text()='我知道了']", timeout=3)
        if ele:
            # Dismiss the first-visit hint dialog when it appears.
            ele.click()
        time.sleep(1)
        self.page.ele('x://*[@placeholder="输入主播/作者ID搜索"]').input(vals=self.user_id, clear=True)
        time.sleep(1)
        self.page.ele("x://*[text()='提交']").click()
        time.sleep(1)
        self.page.actions.move_to(ele_or_loc="x://*[text()='内容管理']")
        time.sleep(1)
        self.page.ele("x://*[text()='内容管理']").click()
        time.sleep(3)

        # The creator console opens in a separate tab.
        creator_tab = self.page.get_tab(url="home/creator/manage")
        creator_tab.ele("x://*[text()='发布视频']").click()

        # Download the media locally, then hand the file paths to the upload
        # control. Videos take priority over images.
        path_datas = []
        if datas.get("videos"):
            for video in datas.get("videos"):
                self.download_video(url=video["url"])
                # The saved filename is the URL's last path segment.
                filename = os.path.basename(urlparse(video["url"]).path)
                path_datas.append(filename)
            creator_tab.ele("x://*[text()='发布视频']").click.to_upload(path_datas)
        else:
            for idx, img in enumerate(datas.get("images")):
                self.download_image(url=img["url"], name=idx)
                path_datas.append(f"{idx}.webp")
            creator_tab.ele('x://*[text()="添加图片"]').click.to_upload(path_datas)

        time.sleep(3)
        # BUG FIX: title/desc may be None (or absent on the meta-only path);
        # coerce to "" instead of crashing.
        creator_tab.ele('x://*[@placeholder="添加标题"]').input(vals=datas.get("title") or "", clear=True)
        time.sleep(3)

        # The description editor has no stable locator: take the xpath of its
        # placeholder, hop to the sibling container (div[1] -> div[2]), then
        # descend to the editable node.
        xpath_path = creator_tab.ele('x://*[text()="添加视频描述"]').xpath
        new_path = re.sub(r'div\[1\]$', 'div[2]', xpath_path) + "/div/div[3]"
        # Strip "[话题]" placeholder markers and truncate to 450 chars
        # (presumably the form's length limit — TODO confirm).
        desc_text = (datas.get("desc") or "").replace("[话题]", "")[:450]
        creator_tab.ele(f'x:{new_path}').input(vals=desc_text, clear=True)

        # Optional scheduled publish.
        if self.time_start:
            creator_tab.ele(
                'x://*[@id="root"]/section/section/main/div/div/div/div[2]/div[2]/div/div[1]/div/div[2]/div/div[3]/div/div/div/label[2]').click()
            time.sleep(1)
            # Set the date-picker value directly via JS (presumably the
            # widget ignores synthetic .input() — confirm).
            creator_tab.ele('x://*[@placeholder="选择日期"]').run_js(
                f"document.querySelector('[data-testid=\"beast-core-datePicker-htmlInput\"]').value = '{self.time_start}';")

        # Optional task-binding dialog (timeout=3: may not be present).
        ele = creator_tab.ele('x://*[text()="点击绑定任务"]', timeout=3)
        if ele:
            ele.click()
            creator_tab.ele('x://*[@placeholder="请输入个人主页链接"]').input(self.url)
            time.sleep(1)
            creator_tab.ele('x://*[text()="确认"]').click()
            time.sleep(1)

        # Agreement checkbox (timeout=3: may not be present).
        ele = creator_tab.ele('x://*[text()="我已阅读并同意"]', timeout=3)
        if ele:
            ele.click()
            time.sleep(1)

        creator_tab.ele('x://*[text()="一键发布"]').click()
if __name__ == '__main__':
    # Sample run: repost one Xiaohongshu note through the PDD MCN console.
    note_url = (
        "https://www.xiaohongshu.com/explore/623d36d70000000001026733"
        "?xsec_token=ABhhM2ncuuuXOXUkG3YWI5ygMg2uLj9K1IYSxXyKARs3E=&xsec_source=pc_user"
    )
    Pdd(
        url=note_url,
        user_id="1050100241",
        time_start="2026-01-15 09:30:00",
    ).action()