Files
haha/自动化.py

377 lines
14 KiB
Python
Raw Normal View History

2026-01-14 18:30:41 +08:00
import os
import re
import json
2026-01-15 00:13:39 +08:00
import time
2026-01-14 18:30:41 +08:00
from urllib.parse import urlparse
from loguru import logger
from bs4 import BeautifulSoup
from curl_cffi import requests
from DrissionPage import ChromiumPage, ChromiumOptions, SessionPage
class Pdd:
2026-01-15 00:13:39 +08:00
def __init__(self, url, user_id, time_start):
2026-01-14 18:30:41 +08:00
self.url = url
2026-01-15 00:13:39 +08:00
self.user_id = user_id
self.time_start = time_start
2026-01-14 18:30:41 +08:00
self.session = requests.Session()
# 浏览器和URL模板
self.page = None
self.user_url_template = None # 用户视频列表URL模板
self.user_profile_url_template = None # 用户信息URL模板
def create_page(self):
co = ChromiumOptions()
co.set_tmp_path("user/tmp")
co.set_user_data_path("user/user_data")
# 以该配置创建页面对象
self.page = ChromiumPage(addr_or_opts=co)
def extract_note_data(self, initial_state):
"""
从初始状态中提取笔记数据只提取标题描述图片列表视频列表和话题
Args:
initial_state: window.__INITIAL_STATE__ 解析后的字典
Returns:
dict: 提取的笔记数据
"""
try:
# 获取笔记详情
note_store = initial_state.get('note', {})
note_detail_map = note_store.get('noteDetailMap', {})
# 获取第一个笔记ID
first_note_id = note_store.get('firstNoteId')
if not first_note_id:
# 如果没有firstNoteId尝试获取noteDetailMap中的第一个key
if note_detail_map:
first_note_id = list(note_detail_map.keys())[0]
else:
print("未找到笔记ID")
return None
# 获取笔记详情
note_detail = note_detail_map.get(first_note_id, {})
note_info = note_detail.get('note', {})
if not note_info:
print("未找到笔记信息")
return None
# 只提取需要的字段
extracted_data = {
'title': note_info.get('title'),
'desc': note_info.get('desc'),
'images': [],
'videos': [],
'topics': []
}
# 提取图片信息
image_list = note_info.get('imageList', [])
for img in image_list:
image_data = {
'url': img.get('urlDefault') or img.get('url'),
'urlPre': img.get('urlPre'),
'width': img.get('width'),
'height': img.get('height'),
}
extracted_data['images'].append(image_data)
# 提取视频信息(如果存在)
video_info = note_info.get('video', {})
if video_info:
video_data = {}
# 尝试提取视频URL
media = video_info.get('media', {})
if media:
stream = media.get('stream', {})
if stream:
hls = stream.get('hls', {})
if hls:
video_data['url'] = hls.get('masterUrl') or hls.get('url')
# 如果没有hls尝试其他字段
if not video_data.get('url'):
video_data['url'] = media.get('url') or media.get('videoUrl')
# 提取视频封面
if video_info.get('cover'):
video_data['cover'] = video_info.get('cover')
# 提取视频时长
if video_info.get('time'):
video_data['time'] = video_info.get('time')
if video_data.get('url'):
extracted_data['videos'].append(video_data)
# 提取话题信息
# 话题可能在多个位置,尝试不同的字段名
topic_list = note_info.get('topicList', []) or note_info.get('tagList', []) or note_info.get('hashtagList',
[])
if topic_list:
for topic in topic_list:
topic_data = {
'name': topic.get('name') or topic.get('title') or topic.get('tagName'),
'id': topic.get('id') or topic.get('topicId') or topic.get('tagId'),
}
if topic_data.get('name'):
extracted_data['topics'].append(topic_data)
# 如果描述中包含话题(#话题#格式),也提取出来
desc = note_info.get('desc', '')
if desc:
# 使用正则表达式提取 #话题# 格式
topic_pattern = r'#([^#]+)#'
matches = re.findall(topic_pattern, desc)
for match in matches:
# 避免重复添加
if not any(t.get('name') == match for t in extracted_data['topics']):
extracted_data['topics'].append({'name': match})
return extracted_data
except Exception as e:
print(f"提取笔记数据时出错:{e}")
import traceback
traceback.print_exc()
return None
def extract_video_from_meta(self, html_content):
"""
从HTML的meta标签中提取视频信息
Args:
html_content: HTML内容字符串
Returns:
dict: 视频信息字典如果没有找到则返回None
"""
try:
soup = BeautifulSoup(html_content, 'html.parser')
video_info = {}
# 提取og:video标签
og_video = soup.find('meta', {'name': 'og:video'})
if og_video and og_video.get('content'):
video_info['url'] = og_video.get('content')
# 提取视频时长
og_videotime = soup.find('meta', {'name': 'og:videotime'})
if og_videotime and og_videotime.get('content'):
video_info['time'] = og_videotime.get('content')
# 提取视频质量
og_videoquality = soup.find('meta', {'name': 'og:videoquality'})
if og_videoquality and og_videoquality.get('content'):
video_info['quality'] = og_videoquality.get('content')
# 如果找到了视频URL返回视频信息
if video_info.get('url'):
return video_info
return None
except Exception as e:
print(f"从meta标签提取视频信息时出错{e}")
return None
def get_page_datas(self):
tab = self.page.new_tab()
tab.listen.start(self.url)
tab.get(url=self.url)
res = tab.listen.wait(timeout=3)
if res:
print(res.response.body)
# 提取meta标签中的视频信息
video_info = self.extract_video_from_meta(res.response.body)
# 使用正则表达式提取window.__INITIAL_STATE__的内容
pattern = r'<script>window\.__INITIAL_STATE__\s*=\s*({.*?});?\s*</script>'
match = re.search(pattern, res.response.body, re.DOTALL)
if not match:
print("未找到 window.__INITIAL_STATE__ 数据")
# 如果只有视频信息,返回视频信息
if video_info:
return {'videos': [video_info]}
return None
# 提取JSON字符串
json_str = match.group(1)
# 处理JavaScript中的undefined值Python JSON不支持undefined
json_str = re.sub(r'\bundefined\b', 'null', json_str)
# 解析JSON
initial_state = json.loads(json_str)
# 提取笔记数据
note_data = self.extract_note_data(initial_state)
# 如果提取到视频信息,添加到笔记数据中
if video_info and note_data:
if 'videos' not in note_data or not note_data['videos']:
note_data['videos'] = []
note_data['videos'].append(video_info)
2026-01-15 00:17:21 +08:00
tab.close()
2026-01-14 18:30:41 +08:00
return note_data
def download_video(self, url):
page = SessionPage()
page.download('https://sns-video-hw.xhscdn.com/stream/110/258/01e6cd08be6e36ad010370019190eceaac_258.mp4')
def download_image(self, url, name):
"""
下载图片文件
Args:
url: 图片URL
save_path: 保存路径如果为None则使用URL中的文件名
"""
# 设置请求头
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Cache-Control': 'no-cache',
'DNT': '1',
'Pragma': 'no-cache',
'Proxy-Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0'
}
try:
# 发送请求verify=False 相当于 curl 的 --insecure
response = requests.get(url, headers=headers, verify=False, timeout=30)
response.raise_for_status() # 检查HTTP错误
# 保存文件
with open(f"{name}.webp", 'wb') as f:
f.write(response.content)
return True
except requests.exceptions.RequestException as e:
print(f"下载失败: {e}")
return None
def action(self):
self.create_page()
datas = self.get_page_datas()
self.page.get(url="https://mcn.pinduoduo.com/register")
for i in range(5):
if self.page.ele("x://*[text()='登录']", timeout=5):
logger.warning("请登录》》》")
else:
break
else:
logger.error("未登录!!!")
return
self.page.ele("x://*[text()='主播/作者管理']").click()
2026-01-15 00:13:39 +08:00
time.sleep(1)
2026-01-14 18:30:41 +08:00
self.page.ele("x://*[text()='签约主播/作者']").click()
ele = self.page.ele("x://*[text()='我知道了']", timeout=3)
if ele:
ele.click()
2026-01-15 00:13:39 +08:00
time.sleep(1)
self.page.ele('x://*[@placeholder="输入主播/作者ID搜索"]').input(vals=self.user_id, clear=True)
time.sleep(1)
2026-01-14 18:30:41 +08:00
self.page.ele("x://*[text()='提交']").click()
2026-01-15 00:13:39 +08:00
time.sleep(1)
2026-01-14 18:30:41 +08:00
self.page.actions.move_to(ele_or_loc="x://*[text()='内容管理']")
2026-01-15 00:13:39 +08:00
time.sleep(1)
2026-01-14 18:30:41 +08:00
self.page.ele("x://*[text()='内容管理']").click()
2026-01-15 00:13:39 +08:00
time.sleep(3)
2026-01-14 18:30:41 +08:00
creator_tab = self.page.get_tab(url="home/creator/manage")
creator_tab.ele("x://*[text()='发布视频']").click()
# 下载文件
path_datas = []
if datas.get("videos"):
for i in datas.get("videos"):
self.download_video(url=i["url"])
# 解析URL
parsed_url = urlparse(i["url"])
# 获取路径部分
path = parsed_url.path
# 从路径中提取文件名
filename = os.path.basename(path)
path_datas.append(filename)
creator_tab.ele("x://*[text()='发布视频']").click.to_upload(
path_datas)
else:
for _, i in enumerate(datas.get("images")):
self.download_image(url=i["url"], name=_)
path_datas.append(f"{_}.webp")
creator_tab.ele('x://*[text()="添加图片"]').click.to_upload(
path_datas
)
2026-01-15 09:38:54 +08:00
time.sleep(3)
2026-01-14 18:30:41 +08:00
creator_tab.ele('x://*[@placeholder="添加标题"]').input(vals=datas["title"], clear=True)
2026-01-15 09:38:54 +08:00
time.sleep(3)
2026-01-14 18:30:41 +08:00
2026-01-15 00:13:39 +08:00
xpath_path = creator_tab.ele('x://*[text()="添加视频描述"]').xpath
# 方法2使用正则表达式替换最后一个div[1]
new_path = re.sub(r'div\[1\]$', 'div[2]', xpath_path)
2026-01-15 10:57:58 +08:00
new_path += "/div/div[3]/div/div/div"
2026-01-15 00:13:39 +08:00
creator_tab.ele(f'x:{new_path}').input(vals=datas["desc"].replace("[话题]", "")[:450], clear=True)
# 定时
if self.time_start:
creator_tab.ele(
'x://*[@id="root"]/section/section/main/div/div/div/div[2]/div[2]/div/div[1]/div/div[2]/div/div[3]/div/div/div/label[2]').click()
time.sleep(1)
creator_tab.ele('x://*[@placeholder="选择日期"]').run_js(
f"document.querySelector('[data-testid=\"beast-core-datePicker-htmlInput\"]').value = '{self.time_start}';")
# 绑定任务
ele = creator_tab.ele('x://*[text()="点击绑定任务"]', timeout=3)
if ele:
ele.click()
creator_tab.ele('x://*[@placeholder="请输入个人主页链接"]').input(self.url)
time.sleep(1)
creator_tab.ele('x://*[text()="确认"]').click()
time.sleep(1)
2026-01-14 18:30:41 +08:00
2026-01-15 00:13:39 +08:00
ele = creator_tab.ele('x://*[text()="我已阅读并同意"]', timeout=3)
if ele:
ele.click()
time.sleep(1)
2026-01-14 18:30:41 +08:00
creator_tab.ele('x://*[text()="一键发布"]').click()
2026-01-15 10:57:58 +08:00
time.sleep(5)
creator_tab.close()
2026-01-15 00:13:39 +08:00
2026-01-14 18:30:41 +08:00
if __name__ == '__main__':
url = "https://www.xiaohongshu.com/explore/623d36d70000000001026733?xsec_token=ABhhM2ncuuuXOXUkG3YWI5ygMg2uLj9K1IYSxXyKARs3E=&xsec_source=pc_user"
2026-01-15 00:13:39 +08:00
pdd = Pdd(
url=url,
user_id="1050100241",
time_start="2026-01-15 09:30:00",
)
2026-01-14 18:30:41 +08:00
pdd.action()