"""Copy a Xiaohongshu note (title, description, media) into the Pinduoduo MCN
creator console by driving a Chromium browser with DrissionPage."""
import json
import os
import re
import time
from datetime import datetime
from typing import Optional
from urllib.parse import urlparse

from bs4 import BeautifulSoup
from curl_cffi import requests
from DrissionPage import ChromiumPage, ChromiumOptions, SessionPage
from loguru import logger

# Inline script that assigns the page state object. The object literal is
# captured lazily up to the closing </script> tag.
# NOTE(review): reconstructed pattern -- confirm against a live note page.
INITIAL_STATE_PATTERN = re.compile(
    r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\})\s*</script>', re.DOTALL
)

# Absolute XPath of the "scheduled publish" radio button on the publish form.
SCHEDULE_RADIO_XPATH = (
    'x://*[@id="root"]/section/section/main/div/div/div/div[2]/div[2]/div/div[1]'
    '/div/div[2]/div/div[3]/div/div/div/label[2]'
)

# Header span of the date picker that shows the currently displayed month.
MONTH_LABEL_XPATH = 'x://span[@class="RPR_dateText_5-152-0"]'


class Pdd:
    """Scrape one note page and republish it through the MCN web console."""

    # How often, and how far apart, the login page is re-checked while the
    # operator logs in manually.
    LOGIN_CHECK_ATTEMPTS = 5
    LOGIN_POLL_SECONDS = 5

    def __init__(self, url, user_id, time_start):
        """
        Args:
            url: URL of the note page to scrape.
            user_id: Creator/anchor ID typed into the MCN search box.
            time_start: Scheduled publish time as "YYYY-MM-DD HH:MM:SS";
                a falsy value skips the scheduling step.
        """
        self.url = url
        self.user_id = user_id
        self.time_start = time_start
        self.session = requests.Session()
        # Browser page and URL templates (the templates are not used in this file).
        self.page = None
        self.user_url_template = None  # user video-list URL template
        self.user_profile_url_template = None  # user profile URL template

    def create_page(self):
        """Create the Chromium page, keeping temp and profile data under ``user/``."""
        co = ChromiumOptions()
        co.set_tmp_path("user/tmp")
        co.set_user_data_path("user/user_data")
        self.page = ChromiumPage(addr_or_opts=co)

    # ------------------------------------------------------------------
    # Scraping
    # ------------------------------------------------------------------
    def extract_note_data(self, initial_state):
        """Extract title, description, images, videos and topics of a note.

        Args:
            initial_state: The parsed ``window.__INITIAL_STATE__`` dict.

        Returns:
            dict with keys ``title``, ``desc``, ``images``, ``videos`` and
            ``topics``, or None when the note cannot be located or an error
            occurs.
        """
        try:
            # ``undefined`` is rewritten to null before parsing, so any level
            # may be None; ``or {}`` keeps the lookups chainable.
            note_store = initial_state.get('note') or {}
            note_detail_map = note_store.get('noteDetailMap') or {}

            first_note_id = note_store.get('firstNoteId')
            if not first_note_id:
                # Fall back to the first key of the detail map.
                if not note_detail_map:
                    logger.warning("未找到笔记ID")
                    return None
                first_note_id = next(iter(note_detail_map))

            note_detail = note_detail_map.get(first_note_id) or {}
            note_info = note_detail.get('note') or {}
            if not note_info:
                logger.warning("未找到笔记信息")
                return None

            extracted_data = {
                'title': note_info.get('title'),
                'desc': note_info.get('desc'),
                'images': [
                    {
                        'url': img.get('urlDefault') or img.get('url'),
                        'urlPre': img.get('urlPre'),
                        'width': img.get('width'),
                        'height': img.get('height'),
                    }
                    for img in (note_info.get('imageList') or [])
                ],
                'videos': [],
                'topics': [],
            }

            video_data = self._extract_video(note_info.get('video') or {})
            if video_data:
                extracted_data['videos'].append(video_data)

            # Topics may live under several field names.
            topic_list = (
                note_info.get('topicList')
                or note_info.get('tagList')
                or note_info.get('hashtagList')
                or []
            )
            for topic in topic_list:
                name = topic.get('name') or topic.get('title') or topic.get('tagName')
                if name:
                    extracted_data['topics'].append({
                        'name': name,
                        'id': topic.get('id') or topic.get('topicId') or topic.get('tagId'),
                    })

            # Also collect "#topic#" markers embedded in the description,
            # skipping names that are already present.
            known_names = {t.get('name') for t in extracted_data['topics']}
            for match in re.findall(r'#([^#]+)#', note_info.get('desc') or ''):
                if match not in known_names:
                    known_names.add(match)
                    extracted_data['topics'].append({'name': match})

            return extracted_data
        except Exception as e:
            # Best effort: a malformed state must not abort the whole run.
            logger.exception(f"提取笔记数据时出错:{e}")
            return None

    @staticmethod
    def _extract_video(video_info):
        """Return ``{'url', 'cover'?, 'time'?}`` for a note video, or None without a URL."""
        if not video_info:
            return None
        video_data = {}
        media = video_info.get('media') or {}
        hls = (media.get('stream') or {}).get('hls') or {}
        if hls:
            video_data['url'] = hls.get('masterUrl') or hls.get('url')
        # No HLS stream: try the plain media fields.
        if not video_data.get('url'):
            video_data['url'] = media.get('url') or media.get('videoUrl')
        if video_info.get('cover'):
            video_data['cover'] = video_info.get('cover')
        if video_info.get('time'):
            video_data['time'] = video_info.get('time')
        return video_data if video_data.get('url') else None

    def extract_video_from_meta(self, html_content):
        """Read video information from the page's ``og:*`` meta tags.

        Args:
            html_content: HTML document as a string.

        Returns:
            dict with ``url`` and, when present, ``time`` and ``quality``;
            None when no video URL is found or parsing fails.
        """
        meta_fields = (
            ('url', 'og:video'),
            ('time', 'og:videotime'),
            ('quality', 'og:videoquality'),
        )
        try:
            soup = BeautifulSoup(html_content, 'html.parser')
            video_info = {}
            for key, meta_name in meta_fields:
                tag = soup.find('meta', {'name': meta_name})
                if tag and tag.get('content'):
                    video_info[key] = tag.get('content')
            return video_info if video_info.get('url') else None
        except Exception as e:
            logger.warning(f"从meta标签提取视频信息时出错:{e}")
            return None

    def get_page_datas(self):
        """Open the note URL in a new tab and scrape its data.

        Returns:
            The dict produced by ``extract_note_data`` (with the meta-tag video
            merged into ``videos``), ``{'videos': [...]}`` when only the
            meta-tag video could be read, or None when nothing was found.
        """
        tab = self.page.new_tab()
        try:
            tab.listen.start(self.url)
            tab.get(url=self.url)
            packet = tab.listen.wait(timeout=3)
            if not packet:
                logger.error("未捕获到页面响应")
                return None

            body = packet.response.body
            if isinstance(body, bytes):
                body = body.decode('utf-8', errors='replace')
            if not isinstance(body, str):
                logger.error("页面响应不是文本内容")
                return None
            logger.debug(f"页面响应长度: {len(body)}")

            video_info = self.extract_video_from_meta(body)
            # Used whenever the embedded state is missing or unusable.
            video_only = {'videos': [video_info]} if video_info else None

            match = INITIAL_STATE_PATTERN.search(body)
            if not match:
                logger.warning("未找到 window.__INITIAL_STATE__ 数据")
                return video_only

            # JSON has no ``undefined``; map it to null before parsing.
            json_str = re.sub(r'\bundefined\b', 'null', match.group(1))
            try:
                initial_state = json.loads(json_str)
            except json.JSONDecodeError as e:
                logger.warning(f"解析 window.__INITIAL_STATE__ 失败: {e}")
                return video_only

            note_data = self.extract_note_data(initial_state)
            if note_data is None:
                return video_only

            if video_info:
                videos = note_data.get('videos') or []
                # Do not queue the same file twice.
                if all(v.get('url') != video_info['url'] for v in videos):
                    videos.append(video_info)
                note_data['videos'] = videos
            return note_data
        finally:
            # Close the scraping tab on every exit path.
            tab.close()

    # ------------------------------------------------------------------
    # Downloads
    # ------------------------------------------------------------------
    def download_video(self, url):
        """Download the video at ``url`` with a DrissionPage ``SessionPage``."""
        page = SessionPage()
        page.download(url)

    def download_image(self, url, name):
        """Download an image and save it as ``{name}.webp`` in the working directory.

        Args:
            url: Image URL.
            name: File name without extension.

        Returns:
            True on success, None when the request fails.
        """
        headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
            'Cache-Control': 'no-cache',
            'DNT': '1',
            'Pragma': 'no-cache',
            'Proxy-Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0'
        }
        try:
            # NOTE(review): verify=False disables TLS certificate checks
            # (like ``curl --insecure``); kept from the original.
            response = requests.get(url, headers=headers, verify=False, timeout=30)
            response.raise_for_status()
            with open(f"{name}.webp", 'wb') as f:
                f.write(response.content)
            return True
        except requests.exceptions.RequestException as e:
            logger.error(f"下载失败: {e}")
            return None

    # ------------------------------------------------------------------
    # Publishing workflow
    # ------------------------------------------------------------------
    def action(self):
        """Run the whole flow: scrape, log in, upload, fill the form, publish."""
        self.create_page()
        datas = self.get_page_datas()
        if not datas:
            logger.error("未获取到页面数据,无法继续")
            return

        self.page.get(url="https://mcn.pinduoduo.com/register")
        if not self._wait_for_login():
            return

        creator_tab = self._open_creator_tab()
        self._upload_media(creator_tab, datas)
        self._fill_title_and_desc(creator_tab, datas)
        if self.time_start:
            self._schedule_publish(creator_tab)
        self._bind_task_and_publish(creator_tab)

    def _wait_for_login(self):
        """Poll until the login prompt disappears; return False if it never does."""
        for _ in range(self.LOGIN_CHECK_ATTEMPTS):
            if not self.page.ele("x://*[text()='登录']", timeout=5):
                return True
            logger.warning("请登录》》》")
            # ele() returns at once while the prompt is visible, so pause
            # here to give the operator time to log in.
            time.sleep(self.LOGIN_POLL_SECONDS)
        logger.error("未登录!!!")
        return False

    def _open_creator_tab(self):
        """Navigate to the creator's content management and return its publish tab."""
        self.page.ele("x://*[text()='主播/作者管理']").click()
        time.sleep(1)
        self.page.ele("x://*[text()='签约主播/作者']").click()
        notice = self.page.ele("x://*[text()='我知道了']", timeout=3)
        if notice:
            notice.click()
        time.sleep(1)
        self.page.ele('x://*[@placeholder="输入主播/作者ID搜索"]').input(vals=self.user_id, clear=True)
        time.sleep(1)
        self.page.ele("x://*[text()='提交']").click()
        time.sleep(1)
        self.page.actions.move_to(ele_or_loc="x://*[text()='内容管理']")
        time.sleep(1)
        self.page.ele("x://*[text()='内容管理']").click()
        time.sleep(3)
        creator_tab = self.page.get_tab(url="home/creator/manage")
        creator_tab.ele("x://*[text()='发布视频']").click()
        return creator_tab

    def _upload_media(self, creator_tab, datas):
        """Download the note's videos (or, without videos, its images) and upload them."""
        path_datas = []
        videos = datas.get("videos")
        if videos:
            for video in videos:
                self.download_video(url=video["url"])
                # The upload path is the file name taken from the URL path.
                path_datas.append(os.path.basename(urlparse(video["url"]).path))
            creator_tab.ele("x://*[text()='发布视频']").click.to_upload(path_datas)
            return

        for index, image in enumerate(datas.get("images") or []):
            if self.download_image(url=image["url"], name=index):
                path_datas.append(f"{index}.webp")
            else:
                # A file that was never written cannot be uploaded.
                logger.warning(f"图片下载失败,已跳过: {image.get('url')}")
        creator_tab.ele('x://*[text()="添加图片"]').click.to_upload(path_datas)

    def _fill_title_and_desc(self, creator_tab, datas):
        """Type the note title and the (truncated) description into the form."""
        time.sleep(3)
        creator_tab.ele('x://*[@placeholder="添加标题"]').input(vals=datas.get("title", ""), clear=True)
        time.sleep(3)
        # The editable description node is reached relative to its label:
        # swap the trailing div[1] for div[2], then descend.
        label_xpath = creator_tab.ele('x://*[text()="添加视频描述"]').xpath
        editor_xpath = re.sub(r'div\[1\]$', 'div[2]', label_xpath) + "/div/div[3]/div/div/div"
        desc_text = (datas.get("desc") or "").replace("[话题]", "")[:450]
        creator_tab.ele(f'x:{editor_xpath}').input(vals=desc_text, clear=True)

    def _schedule_publish(self, creator_tab):
        """Select "scheduled publish" and set the picker to ``self.time_start``."""
        creator_tab.ele(SCHEDULE_RADIO_XPATH).click()
        time.sleep(1)
        date_picker_ele = creator_tab.ele('x://*[@placeholder="选择日期"]', timeout=3)
        if not date_picker_ele:
            return
        try:
            # Expected format: 2026-01-15 09:30:00
            dt = datetime.strptime(self.time_start, "%Y-%m-%d %H:%M:%S")
            logger.info(
                f"开始设置定时时间: {self.time_start} (年={dt.year}, 月={dt.month}, 日={dt.day}, "
                f"时={dt.hour}, 分={dt.minute}, 秒={dt.second})")

            date_picker_ele.click()
            time.sleep(1.5)  # wait for the panel to render

            self._switch_to_month(creator_tab, dt.month)
            self._click_day(creator_tab, dt.day)
            self._open_time_picker(creator_tab)
            self._select_time_unit(creator_tab, "hh", dt.hour, "小时")
            self._select_time_unit(creator_tab, "mm", dt.minute, "分钟")
            self._select_time_unit(creator_tab, "ss", dt.second, "秒")
            self._confirm_picker(creator_tab)
            self._verify_picker_value(creator_tab, dt.strftime("%Y-%m-%d"))
        except ValueError as e:
            logger.error(f"时间格式错误: {self.time_start}, 正确格式应为: YYYY-MM-DD HH:MM:SS, 错误: {e}")
        except Exception as e:
            # Scheduling is best effort; publishing continues afterwards.
            logger.exception(f"设置定时时间失败: {e}")

    def _switch_to_month(self, creator_tab, month):
        """Click the calendar arrows until the header shows ``month``.

        NOTE: only the month is compared, so a target in another year is not
        handled (simplification kept from the original).
        """
        try:
            month_text_ele = creator_tab.ele(MONTH_LABEL_XPATH, timeout=2)
            if not month_text_ele:
                return
            current_month = month_text_ele.text
            logger.info(f"当前显示的月份: {current_month}")

            target_month_str = f"{month}月"
            if current_month == target_month_str:
                return
            logger.info(f"需要切换到目标月份: {target_month_str}")

            current_month_num = int(current_month.replace('月', ''))
            # Scope the arrows to the picker dropdown; the "next" arrow is
            # told apart by its RPR_right class.
            date_root = 'x://div[@data-testid="beast-core-datePicker-dropdown-contentRoot"]'
            if month > current_month_num:
                arrow_selector = f'{date_root}//*[contains(@class,"RPR_iconPrevNext") and contains(@class,"RPR_right")]'
            else:
                arrow_selector = f'{date_root}//*[contains(@class,"RPR_iconPrevNext") and not(contains(@class,"RPR_right"))]'
            clicks_needed = abs(month - current_month_num)

            for _ in range(min(clicks_needed, 12)):
                arrow = creator_tab.ele(arrow_selector, timeout=1)
                if arrow:
                    try:
                        arrow.click()
                    except Exception as e:
                        logger.warning(f"点击月份箭头失败: {e}")
                    time.sleep(0.5)
                new_month_ele = creator_tab.ele(MONTH_LABEL_XPATH, timeout=1)
                if new_month_ele and new_month_ele.text == target_month_str:
                    logger.info(f"成功切换到目标月份: {target_month_str}")
                    break
        except Exception as e:
            logger.warning(f"切换月份时出错: {e},继续尝试选择日期")

    @staticmethod
    def _click_day(creator_tab, day):
        """Click the enabled, in-month calendar cell for ``day``."""
        date_cell = creator_tab.ele(
            f'x://td[@role="date-cell"]//div[@title="{day}" and not(contains(@class, "RPR_disabled")) and not(contains(@class, "RPR_outOfMonth"))]',
            timeout=3)
        if date_cell:
            date_cell.click()
            logger.info(f"已点击日期: {day}")
            time.sleep(0.5)
        else:
            logger.warning(f"未找到日期单元格: {day}")

    @staticmethod
    def _open_time_picker(creator_tab):
        """Click the time input to open the hour/minute/second lists."""
        time_input = creator_tab.ele('x://input[@data-testid="beast-core-timePicker-html-input"]', timeout=3)
        if time_input:
            time_input.click()
            logger.info("已点击时间输入框,打开时间选择器")
            time.sleep(0.8)  # wait for the time panel to open
            return

        logger.warning("未找到时间输入框,尝试使用XPath")
        # Fallback: absolute XPath of the input.
        try:
            time_input_xpath = '/html/body/div[2]/div/div/div/div/div/footer/div/div/div/div/div/div/div/div[1]/input'
            time_input = creator_tab.ele(f'x:{time_input_xpath}', timeout=2)
            if time_input:
                time_input.click()
                logger.info("通过XPath点击了时间输入框")
                time.sleep(0.8)
        except Exception as e:
            logger.warning(f"通过XPath也未能找到时间输入框: {e}")

    @staticmethod
    def _select_time_unit(creator_tab, list_suffix, value, label):
        """Pick the zero-padded ``value`` in the ``beast-core-timePicker-list-<suffix>`` list."""
        value_str = f"{value:02d}"
        item = creator_tab.ele(
            f'x://ul[@data-testid="beast-core-timePicker-list-{list_suffix}"]//li[text()="{value_str}"]',
            timeout=3)
        if item:
            item.scroll.to_see()
            time.sleep(0.2)
            item.click()
            logger.info(f"已选择{label}: {value_str}")
            time.sleep(0.3)
        else:
            logger.warning(f"未找到{label}选项: {value_str}")

    @staticmethod
    def _confirm_picker(creator_tab):
        """Click the picker's confirm button, falling back to a JavaScript click."""
        try:
            confirm_btn = creator_tab.ele(
                'x://button[@data-testid="beast-core-button"]//span[text()="确认"]', timeout=3)
            if confirm_btn:
                confirm_btn.click()
                logger.info("已点击确认按钮")
                time.sleep(0.5)
                return

            confirm_js = """
            (function() {
                const buttons = document.querySelectorAll('button[data-testid="beast-core-button"]');
                for (let btn of buttons) {
                    const span = btn.querySelector('span');
                    if (span && span.textContent.includes('确认')) {
                        btn.click();
                        return true;
                    }
                }
                return false;
            })();
            """
            if creator_tab.run_js(confirm_js):
                logger.info("通过JavaScript点击了确认按钮")
            else:
                logger.warning("未找到确认按钮")
            time.sleep(0.5)
        except Exception as e:
            logger.warning(f"点击确认按钮失败: {e}")

    @staticmethod
    def _verify_picker_value(creator_tab, date_str):
        """Log whether the date input's value now starts with ``date_str``."""
        time.sleep(0.5)
        check_js = """
        (function() {
            const dateInput = document.querySelector('[data-testid="beast-core-datePicker-htmlInput"]');
            return dateInput ? dateInput.value : null;
        })();
        """
        final_value = creator_tab.run_js(check_js)
        if not (final_value and final_value.strip()):
            logger.error("无法获取日期选择器的值,可能设置失败")
        elif final_value.strip().startswith(date_str):
            # Only the date part is compared.
            logger.info(f"日期选择器当前值: {final_value}")
            logger.info(f"成功设置定时时间: {final_value}")
        else:
            logger.info(f"日期选择器当前值: {final_value}")
            logger.warning(f"设置的时间可能不准确,当前值: {final_value}, 期望日期: {date_str}")

    def _bind_task_and_publish(self, creator_tab):
        """Bind the task (when offered), accept the agreement, publish, close the tab."""
        bind_entry = creator_tab.ele('x://*[text()="点击绑定任务"]', timeout=3)
        if bind_entry:
            bind_entry.click()
            creator_tab.ele('x://*[@placeholder="请输入个人主页链接"]').input(self.url)
            time.sleep(1)
            creator_tab.ele('x://*[text()="确认"]').click()
            time.sleep(1)
        agreement = creator_tab.ele('x://*[text()="我已阅读并同意"]', timeout=3)
        if agreement:
            agreement.click()
        time.sleep(1)
        creator_tab.ele('x://*[text()="一键发布"]').click()
        time.sleep(5)
        creator_tab.close()


if __name__ == '__main__':
    url = "https://www.xiaohongshu.com/explore/623d36d70000000001026733?xsec_token=ABhhM2ncuuuXOXUkG3YWI5ygMg2uLj9K1IYSxXyKARs3E=&xsec_source=pc_user"
    pdd = Pdd(
        url=url,
        user_id="1050100241",
        time_start="2026-01-28 09:30:00",
    )
    pdd.action()