import os import re import json import time import threading from datetime import datetime from pathlib import Path from loguru import logger from bs4 import BeautifulSoup from curl_cffi import requests from DrissionPage import ChromiumPage, ChromiumOptions, SessionPage class ThreadSafeDict: """线程安全的字典包装类""" def __init__(self): self._dict = {} self._lock = threading.Lock() def get(self, key, default=None): with self._lock: return self._dict.get(key, default) def __setitem__(self, key, value): with self._lock: self._dict[key] = value def __getitem__(self, key): with self._lock: return self._dict[key] def __contains__(self, key): with self._lock: return key in self._dict class Pdd: def __init__(self, url, user_id, time_start, ht, index, title=None): self.url = url self.user_id = user_id self.time_start = time_start self.session = requests.Session() # 浏览器和URL模板 self.page = None self.user_url_template = None # 用户视频列表URL模板 self.user_profile_url_template = None # 用户信息URL模板 self.title = title self.ht = self._format_topic(ht) # 格式化话题 self.index = index def _parse_schedule_datetime(self, time_text): """Parse schedule time text into datetime. Return None if unparseable.""" if not time_text: return None if isinstance(time_text, datetime): return time_text raw = str(time_text).strip() if not raw: return None fmts = ( "%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y/%m/%d %H:%M:%S", "%Y/%m/%d %H:%M", ) for fmt in fmts: try: return datetime.strptime(raw, fmt) except ValueError: continue try: iso_text = raw.replace("Z", "+00:00") return datetime.fromisoformat(iso_text) except ValueError: return None def _is_schedule_time_expired(self, time_text): """Return True when schedule time is earlier than current time.""" schedule_dt = self._parse_schedule_datetime(time_text) if not schedule_dt: return False now = datetime.now(schedule_dt.tzinfo) if schedule_dt.tzinfo else datetime.now() return schedule_dt < now def _format_topic(self, topic_str): """ 格式化话题:将"拍出氛围感-好看照片分享"转换为"#拍出氛围感 #好看照片分享" Args: topic_str: 原始话题字符串,可能包含"-"分隔符 Returns: 格式化后的话题字符串,每个部分前加"#" """ if not topic_str: return "" # 按"-"分割话题 parts = [part.strip() for part in topic_str.split("-") if part.strip()] # 为每个部分添加"#" formatted_parts = [] for part in parts: # 如果部分已经以"#"开头,不再添加 if part.startswith("#"): formatted_parts.append(part) else: formatted_parts.append(f"#{part}") # 用空格连接 return " ".join(formatted_parts) def create_page(self): co = ChromiumOptions() co.set_tmp_path("user/tmp") co.set_user_data_path("user/user_data") # 以该配置创建页面对象 self.page = ChromiumPage(addr_or_opts=co) def extract_note_data(self, initial_state): """ 从初始状态中提取笔记数据(只提取标题、描述、图片列表、视频列表和话题) Args: initial_state: window.__INITIAL_STATE__ 解析后的字典 Returns: dict: 提取的笔记数据 """ try: # 获取笔记详情 note_store = initial_state.get('note', {}) note_detail_map = note_store.get('noteDetailMap', {}) # 获取第一个笔记ID first_note_id = note_store.get('firstNoteId') if not first_note_id: # 如果没有firstNoteId,尝试获取noteDetailMap中的第一个key if note_detail_map: first_note_id = list(note_detail_map.keys())[0] else: print("未找到笔记ID") return None # 获取笔记详情 note_detail = note_detail_map.get(first_note_id, {}) note_info = note_detail.get('note', {}) if not note_info: print("未找到笔记信息") return None # 只提取需要的字段 extracted_data = { 'title': note_info.get('title'), 'desc': note_info.get('desc'), 'images': [], 'videos': [], 'topics': [] } # 提取图片信息 image_list = note_info.get('imageList', []) for img in image_list: image_data = { 'url': img.get('urlDefault') or img.get('url'), 'urlPre': img.get('urlPre'), 'width': img.get('width'), 'height': img.get('height'), } extracted_data['images'].append(image_data) # 提取视频信息(如果存在) video_info = note_info.get('video', {}) if video_info: video_data = {} # 尝试提取视频URL media = video_info.get('media', {}) if media: stream = media.get('stream', {}) if stream: hls = stream.get('hls', {}) if hls: video_data['url'] = hls.get('masterUrl') or hls.get('url') # 如果没有hls,尝试其他字段 if not video_data.get('url'): video_data['url'] = media.get('url') or media.get('videoUrl') # 提取视频封面 if video_info.get('cover'): video_data['cover'] = video_info.get('cover') # 提取视频时长 if video_info.get('time'): video_data['time'] = video_info.get('time') if video_data.get('url'): extracted_data['videos'].append(video_data) # 提取话题信息 # 话题可能在多个位置,尝试不同的字段名 topic_list = note_info.get('topicList', []) or note_info.get('tagList', []) or note_info.get('hashtagList', []) if topic_list: for topic in topic_list: topic_data = { 'name': topic.get('name') or topic.get('title') or topic.get('tagName'), 'id': topic.get('id') or topic.get('topicId') or topic.get('tagId'), } if topic_data.get('name'): extracted_data['topics'].append(topic_data) # 如果描述中包含话题(#话题#格式),也提取出来 desc = note_info.get('desc', '') if desc: # 使用正则表达式提取 #话题# 格式 topic_pattern = r'#([^#]+)#' matches = re.findall(topic_pattern, desc) for match in matches: # 避免重复添加 if not any(t.get('name') == match for t in extracted_data['topics']): extracted_data['topics'].append({'name': match}) return extracted_data except Exception as e: print(f"提取笔记数据时出错:{e}") import traceback traceback.print_exc() return None def extract_video_from_meta(self, html_content): """ 从HTML的meta标签中提取视频信息 Args: html_content: HTML内容字符串 Returns: dict: 视频信息字典,如果没有找到则返回None """ try: soup = BeautifulSoup(html_content, 'html.parser') video_info = {} # 提取og:video标签 og_video = soup.find('meta', {'name': 'og:video'}) if og_video and og_video.get('content'): video_info['url'] = og_video.get('content') # 提取视频时长 og_videotime = soup.find('meta', {'name': 'og:videotime'}) if og_videotime and og_videotime.get('content'): video_info['time'] = og_videotime.get('content') # 提取视频质量 og_videoquality = soup.find('meta', {'name': 'og:videoquality'}) if og_videoquality and og_videoquality.get('content'): video_info['quality'] = og_videoquality.get('content') # 如果找到了视频URL,返回视频信息 if video_info.get('url'): return video_info return None except Exception as e: print(f"从meta标签提取视频信息时出错:{e}") return None def get_page_datas(self): tab = self.page.new_tab() tab.listen.start(self.url) tab.get(url=self.url) res = tab.listen.wait(timeout=3) if res: print(res.response.body) # 提取meta标签中的视频信息 video_info = self.extract_video_from_meta(res.response.body) # 使用正则表达式提取window.__INITIAL_STATE__的内容 pattern = r'' match = re.search(pattern, res.response.body, re.DOTALL) if not match: print("未找到 window.__INITIAL_STATE__ 数据") # 如果只有视频信息,返回视频信息 if video_info: return {'videos': [video_info]} return None # 提取JSON字符串 json_str = match.group(1) # 处理JavaScript中的undefined值(Python JSON不支持undefined) json_str = re.sub(r'\bundefined\b', 'null', json_str) # 解析JSON initial_state = json.loads(json_str) # 提取笔记数据 note_data = self.extract_note_data(initial_state) # 如果提取到视频信息,添加到笔记数据中 if video_info and note_data: if 'videos' not in note_data or not note_data['videos']: note_data['videos'] = [] note_data['videos'].append(video_info) tab.close() return note_data def download_video(self, url): page = SessionPage() page.download(url) def download_image(self, url, name): """ 下载图片文件 Args: url: 图片URL save_path: 保存路径,如果为None则使用URL中的文件名 """ # 设置请求头 headers = { 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6', 'Cache-Control': 'no-cache', 'DNT': '1', 'Pragma': 'no-cache', 'Proxy-Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0' } try: # 发送请求,verify=False 相当于 curl 的 --insecure response = requests.get(url, headers=headers, verify=False, timeout=30) response.raise_for_status() # 检查HTTP错误 # 保存文件 with open(f"{name}.webp", 'wb') as f: f.write(response.content) return True except requests.exceptions.RequestException as e: print(f"下载失败: {e}") return None def action(self, folder_path=None, collect_all_videos=False, prepared_video_files=None, prepared_image_folders=None): logger.info("=" * 50) logger.info("开始执行 action 方法(单个/批量上传模式)") logger.info(f"多多ID: {self.user_id}, 序号: {self.index}") logger.info(f"文件夹路径: {folder_path}") logger.info(f"批量上传模式: {collect_all_videos}") logger.info( f"使用预查找文件: 视频={len(prepared_video_files) if prepared_video_files else 0}, 图片文件夹={len(prepared_image_folders) if prepared_image_folders else 0}") logger.info("=" * 50) logger.info("步骤1: 创建浏览器页面...") self.create_page() logger.info(" ✓ 浏览器页面创建完成") if datas.get(self.user_id): logger.info("=" * 50) logger.info("步骤2: 使用已保存的标签页...") creator_tab = self.page.new_tab(datas.get(self.user_id)) logger.info(f" ✓ 已打开已保存的标签页: {datas.get(self.user_id)}") else: logger.info("=" * 50) logger.info("步骤2: 登录并导航到发布页面...") self.page.get(url="https://mcn.pinduoduo.com/register") logger.info(" ✓ 已打开登录页面") for i in range(100): if self.page.ele("x://*[text()='主播/作者管理']", timeout=5): logger.info(" ✓ 检测到已登录") break else: time.sleep(2 * 60) else: logger.error(" ✗ 未登录!!!") return {"ok": False, "reason": "未登录"} logger.info(" 正在导航到发布页面...") self.page.ele("x://*[text()='主播/作者管理']").click() logger.info(" ✓ 已点击'主播/作者管理'") time.sleep(1) self.page.ele("x://*[text()='签约主播/作者']").click() logger.info(" ✓ 已点击'签约主播/作者'") ele = self.page.ele("x://*[text()='稍后再说']", timeout=3) if ele: ele.click() logger.info(" ✓ 已关闭提示框") time.sleep(1) ele = self.page.ele("x://*[text()='我知道了']", timeout=3) if ele: ele.click() logger.info(" ✓ 已关闭提示框") time.sleep(1) logger.info(f" 正在搜索多多ID: {self.user_id}") self.page.ele('x://*[@placeholder="输入主播/作者ID搜索"]').input(vals=self.user_id, clear=True) time.sleep(1) self.page.ele("x://*[text()='提交']").click() logger.info(" ✓ 已提交搜索") time.sleep(1) self.page.actions.move_to(ele_or_loc="x://*[text()='内容管理']") time.sleep(1) self.page.ele("x://*[text()='内容管理']").click() logger.info(" ✓ 已点击'内容管理'") time.sleep(3) creator_tab = self.page.get_tab(url="home/creator/manage") creator_tab.ele("x://*[text()='发布视频']").click() logger.info(" ✓ 已点击'发布视频',进入发布页面") datas[self.user_id] = creator_tab.url # 从文件夹读取文件 video_file_paths = [] # 视频文件列表 image_folder_paths = [] # 图片文件夹列表(用于存储图片文件夹路径) # 支持的视频格式 video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm'] # 支持的图片格式 image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp'] if prepared_video_files is not None or prepared_image_folders is not None: # 使用 GUI "更新数据"按钮预查找的文件,跳过文件夹扫描 video_file_paths = list(prepared_video_files) if prepared_video_files else [] image_folder_paths = list(prepared_image_folders) if prepared_image_folders else [] logger.info("=" * 50) logger.info("使用预查找的文件列表,跳过文件夹扫描") logger.info(f" 视频文件数量: {len(video_file_paths)}") logger.info(f" 图片文件夹数量: {len(image_folder_paths)}") if video_file_paths: logger.info("视频文件列表:") for idx, video_path in enumerate(video_file_paths, 1): logger.info(f" {idx}. {video_path}") elif folder_path and os.path.exists(folder_path): logger.info(f"开始读取文件夹: {folder_path}") logger.info(f"查找序号为 '{self.index}' 的文件") # 第一步:先收集所有匹配的视频文件和图片文件夹 logger.info("=" * 50) if collect_all_videos: logger.info("第一步:扫描文件夹,收集所有视频文件(批量上传模式,不限制序号)...") else: logger.info(f"第一步:扫描文件夹,收集序号为 '{self.index}' 的文件...") # 获取最外层文件夹下的所有子文件夹(多多ID文件夹) subdirs = [f for f in os.listdir(folder_path) if os.path.isdir(os.path.join(folder_path, f))] logger.info(f"在最外层文件夹下找到 {len(subdirs)} 个子文件夹(多多ID文件夹)") # 找到匹配当前多多ID的文件夹 target_subdir = None for subdir_name in subdirs: if subdir_name == str(self.user_id): target_subdir = os.path.join(folder_path, subdir_name) logger.info(f"✓ 找到匹配的多多ID文件夹: {subdir_name}") break if not target_subdir: logger.warning(f"未找到多多ID为 {self.user_id} 的文件夹") logger.info(f"可用的文件夹: {subdirs}") logger.error("无法继续执行,因为未找到匹配的多多ID文件夹") creator_tab.close() return {"ok": False, "reason": f"未找到多多ID为 {self.user_id} 的文件夹"} else: # 只扫描匹配的多多ID文件夹 logger.info(f" 正在扫描子文件夹: {os.path.basename(target_subdir)}") # 获取该子文件夹下的所有文件和文件夹 try: items = os.listdir(target_subdir) logger.info(f" 该文件夹下有 {len(items)} 个项目") for item_name in items: item_path = os.path.join(target_subdir, item_name) logger.info(f" 检查项目: {item_name}") # 分割文件名,检查第一部分是否匹配序号 name_parts = item_name.split("-") logger.info(f" 文件名分割结果: {name_parts}") # 如果是批量上传模式,收集所有视频文件;否则只收集匹配序号的文件 should_collect = False if collect_all_videos: # 批量上传模式:只要文件名格式正确(有序号),就收集视频文件 if len(name_parts) > 0 and name_parts[0].isdigit(): should_collect = True logger.info(f" 批量上传模式:序号 {name_parts[0]} 将被收集") else: # 单个上传模式:只收集匹配当前序号的文件 if len(name_parts) > 0 and name_parts[0] == str(self.index): should_collect = True logger.info(f" ✓ 序号匹配!序号: {name_parts[0]}, 目标序号: {self.index}") if should_collect: path = Path(item_path) # 判断是否为文件(视频文件) if path.is_file(): # 检查是否为视频文件 file_ext = path.suffix.lower() logger.info(f" 这是一个文件,扩展名: {file_ext}") if any(file_ext == ext for ext in video_extensions): video_file_paths.append(path) logger.info(f" ✓ 添加到视频列表: {path.name}") logger.info(f" 当前视频总数: {len(video_file_paths)}") else: logger.info(f" ✗ 不是视频文件,跳过") else: # 这是一个图片文件夹(只在单个上传模式下处理) if not collect_all_videos: image_folder_paths.append(path) logger.info(f" ✓ 添加到图片文件夹列表: {path.name}") logger.info(f" 当前图片文件夹总数: {len(image_folder_paths)}") else: if len(name_parts) > 0: if collect_all_videos: logger.info(f" ✗ 文件名格式不正确(序号不是数字),跳过") else: logger.info(f" ✗ 序号不匹配: {name_parts[0]} != {self.index}") else: logger.info(f" ✗ 文件名格式不正确,无法提取序号") except Exception as e: logger.error(f" 扫描子文件夹 {target_subdir} 时出错: {e}") import traceback traceback.print_exc() logger.info("=" * 50) logger.info(f"文件收集完成!") logger.info(f" 视频文件数量: {len(video_file_paths)}") logger.info(f" 图片文件夹数量: {len(image_folder_paths)}") if video_file_paths: logger.info("视频文件列表:") for idx, video_path in enumerate(video_file_paths, 1): logger.info(f" {idx}. {video_path}") # 第二步:如果有视频文件,批量上传所有视频(此段代码在 if/elif 外面,对预查找和扫描两种方式都生效) if video_file_paths: logger.info("=" * 50) logger.info(f"第二步:开始批量上传 {len(video_file_paths)} 个视频文件...") # 检查是否有"支持批量上传"按钮,如果有则使用批量上传 logger.info("查找批量上传按钮...") batch_upload_btn = creator_tab.ele("x://*[text()='支持批量上传']", timeout=3) if batch_upload_btn: logger.info("✓ 找到'支持批量上传'按钮") if len(video_file_paths) > 1: logger.info(f"使用批量上传模式,准备上传 {len(video_file_paths)} 个视频") logger.info(f"上传文件列表: {[str(p) for p in video_file_paths]}") batch_upload_btn.click.to_upload(video_file_paths) logger.info(f"✓ 已触发批量上传,上传 {len(video_file_paths)} 个视频") else: logger.info(f"只有1个视频,使用批量上传按钮上传") batch_upload_btn.click.to_upload(video_file_paths) logger.info(f"✓ 已触发上传") else: logger.warning("✗ 未找到'支持批量上传'按钮,尝试使用'添加视频'按钮") # 使用单个上传(兼容旧版本或只有一个视频的情况) upload_btn = creator_tab.ele("x://*[text()='添加视频']", timeout=3) if upload_btn: logger.info("✓ 找到'添加视频'按钮") logger.info(f"使用添加视频按钮,准备上传 {len(video_file_paths)} 个视频") logger.info(f"上传文件列表: {[str(p) for p in video_file_paths]}") upload_btn.click.to_upload(video_file_paths) logger.info(f"✓ 已触发上传,上传 {len(video_file_paths)} 个视频") else: logger.error("✗ 未找到任何上传按钮!") logger.info("等待上传完成...") while True: if creator_tab.ele('x://*[contains(., "视频上传成功")]', timeout=3): break if creator_tab.ele('x://*[contains(., "视频上传失败")]', timeout=3): logger.warning("检测到页面提示「视频上传失败」,退出并标记为失败") return {"ok": False, "reason": "视频上传失败"} time.sleep(5) # 输入视频描述(只输入第一个视频的描述,因为批量上传后可能需要单独处理每个视频) if video_file_paths: first_video_name = video_file_paths[0].name file_names = first_video_name.split("-") if len(file_names) > 0: desc_text = file_names[-1].split(".")[0] + self.ht logger.info(f"准备输入视频描述: {desc_text[:50]}...") desc_input = creator_tab.ele(f'x://*[@id="magicdomid1"]', timeout=3) if desc_input: desc_input.input(vals=desc_text, clear=True) logger.info(f"✓ 已输入视频描述") else: logger.warning("✗ 未找到描述输入框") else: logger.warning("未找到任何视频文件,跳过视频上传") # 第三步:如果有图片文件夹,逐个上传图片 if image_folder_paths: logger.info(f"找到 {len(image_folder_paths)} 个图片文件夹,开始逐个上传...") for image_folder_path in image_folder_paths: image_files = [] # 收集图片文件夹中的所有图片文件 if image_folder_path.is_dir(): for img_file in os.listdir(image_folder_path): img_path = Path(os.path.join(image_folder_path, img_file)) if img_path.is_file() and any(img_path.suffix.lower() == ext for ext in image_extensions): image_files.append(str(img_path)) if image_files: creator_tab.ele('x://*[text()="添加图片"]').click.to_upload(image_files) time.sleep(3) # 提取文件夹名称用于标题和描述 folder_name = image_folder_path.name file_names = folder_name.split("-") if len(file_names) > 1: creator_tab.ele('x://*[@placeholder="添加标题"]').input(vals=file_names[1], clear=True) xpath_path = creator_tab.ele('x://*[text()="添加视频描述"]').xpath # 方法2:使用正则表达式替换最后一个div[1] new_path = re.sub(r'div\[1\]$', 'div[2]', xpath_path) new_path += "/div/div[3]/div/div/div" if len(file_names) > 2: creator_tab.ele(f'x:{new_path}').input(vals=file_names[2] + " " + self.ht, clear=True) logger.info(f"已上传图片文件夹: {folder_name}") time.sleep(1) if self.time_start and self._is_schedule_time_expired(self.time_start): logger.warning(f"定时发布时间已过期,取消发布: {self.time_start}") creator_tab.close() return {"ok": False, "reason": f"定时发布时间已过期: {self.time_start}"} # 点击立即发布选项 logger.info("=" * 50) logger.info("步骤:选择发布方式...") immediate_publish = creator_tab.ele('x://*[contains(text(), "立即发布")]', timeout=3) if immediate_publish: immediate_publish.click() logger.info(" ✓ 已点击'立即发布'选项") time.sleep(3) else: logger.warning(" ✗ 未找到'立即发布'选项") # 定时 if self.time_start: logger.info("=" * 50) logger.info("步骤:设置定时发布...") # 点击"定时发布"选项 logger.info(f" 定时时间: {self.time_start}") schedule_btn = creator_tab.ele('x://*[contains(text(), "定时发布")]', timeout=3) if schedule_btn: schedule_btn.click() logger.info(" ✓ 已点击'定时发布'选项") time.sleep(1) else: logger.warning(" ✗ 未找到'定时发布'选项") creator_tab.close() return {"ok": False, "reason": "定时设置失败:未找到定时发布选项"} # 获取日期选择器元素 date_picker_ele = creator_tab.ele('x://*[@placeholder="选择日期"]', timeout=3) if date_picker_ele: # 解析时间字符串,格式:2026-01-15 09:30:00 try: from datetime import datetime dt = datetime.strptime(self.time_start, "%Y-%m-%d %H:%M:%S") date_str = dt.strftime("%Y-%m-%d") time_str = dt.strftime("%H:%M:%S") year = dt.year month = dt.month day = dt.day hour = dt.hour minute = dt.minute second = dt.second logger.info( f"开始设置定时时间: {self.time_start} (年={year}, 月={month}, 日={day}, 时={hour}, 分={minute}, 秒={second})") # 点击日期选择器打开面板 date_picker_ele.click() time.sleep(1.5) # 等待面板完全加载 # 方法:通过点击日期和时间选择器来设置 # 1. 如果需要,先切换年月 # 2. 点击日期单元格 # 3. 点击时间选择器中的小时、分钟、秒 # 4. 点击确认按钮 # 检查并切换年月(如果需要) # 获取当前显示的月份 try: month_text_ele = creator_tab.ele('x://span[@class="RPR_dateText_5-152-0"]', timeout=2) if month_text_ele: current_month = month_text_ele.text logger.info(f"当前显示的月份: {current_month}") # 如果需要切换月份 target_month_str = f"{month}月" if current_month != target_month_str: logger.info(f"需要切换到目标月份: {target_month_str}") # 计算月份差值(简化处理,只考虑同一年内) current_month_num = int(current_month.replace('月', '')) target_month_num = month # 限定在日期选择器内,用 class 定位(RPR_right 仅日历右箭头有,避免轮播图) date_root = 'x://div[@data-testid="beast-core-datePicker-dropdown-contentRoot"]' if target_month_num > current_month_num: arrow_selector = f'{date_root}//*[contains(@class,"RPR_iconPrevNext") and contains(@class,"RPR_right")]' clicks_needed = target_month_num - current_month_num else: arrow_selector = f'{date_root}//*[contains(@class,"RPR_iconPrevNext") and not(contains(@class,"RPR_right"))]' clicks_needed = current_month_num - target_month_num # 点击箭头切换月份 for _ in range(min(clicks_needed, 12)): arrow = creator_tab.ele(arrow_selector, timeout=1) if arrow: try: arrow.click() except Exception: pass time.sleep(0.5) # 验证是否切换成功 new_month_ele = creator_tab.ele('x://span[@class="RPR_dateText_5-152-0"]', timeout=1) if new_month_ele and new_month_ele.text == target_month_str: logger.info(f"成功切换到目标月份: {target_month_str}") break except Exception as e: logger.warning(f"切换月份时出错: {e},继续尝试选择日期") # 选择日期 - 点击对应的日期单元格 date_cell = creator_tab.ele( f'x://td[@role="date-cell"]//div[@title="{day}" and not(contains(@class, "RPR_disabled")) and not(contains(@class, "RPR_outOfMonth"))]', timeout=3) if date_cell: date_cell.click() logger.info(f"已点击日期: {day}") time.sleep(0.5) else: logger.warning(f"未找到日期单元格: {day}") # 先点击时间输入框打开时间选择器 time_input = creator_tab.ele('x://input[@data-testid="beast-core-timePicker-html-input"]', timeout=3) if time_input: time_input.click() logger.info("已点击时间输入框,打开时间选择器") time.sleep(0.8) # 等待时间选择器面板打开 else: logger.warning("未找到时间输入框,尝试使用XPath") # 备用方案:使用用户提供的XPath try: time_input_xpath = '/html/body/div[2]/div/div/div/div/div/footer/div/div/div/div/div/div/div/div[1]/input' time_input = creator_tab.ele(f'x:{time_input_xpath}', timeout=2) if time_input: time_input.click() logger.info("通过XPath点击了时间输入框") time.sleep(0.8) except Exception as e: logger.warning(f"通过XPath也未能找到时间输入框: {e}") # 选择时间 - 点击时间选择器中的小时、分钟、秒 # 小时 hour_str = f"{hour:02d}" hour_item = creator_tab.ele( f'x://ul[@data-testid="beast-core-timePicker-list-hh"]//li[text()="{hour_str}"]', timeout=3) if hour_item: hour_item.scroll.to_see() time.sleep(0.2) hour_item.click() logger.info(f"已选择小时: {hour_str}") time.sleep(0.3) else: logger.warning(f"未找到小时选项: {hour_str}") # 分钟 minute_str = f"{minute:02d}" minute_item = creator_tab.ele( f'x://ul[@data-testid="beast-core-timePicker-list-mm"]//li[text()="{minute_str}"]', timeout=3) if minute_item: minute_item.scroll.to_see() time.sleep(0.2) minute_item.click() logger.info(f"已选择分钟: {minute_str}") time.sleep(0.3) else: logger.warning(f"未找到分钟选项: {minute_str}") # 秒 second_str = f"{second:02d}" second_item = creator_tab.ele( f'x://ul[@data-testid="beast-core-timePicker-list-ss"]//li[text()="{second_str}"]', timeout=3) if second_item: second_item.scroll.to_see() time.sleep(0.2) second_item.click() logger.info(f"已选择秒: {second_str}") time.sleep(0.3) else: logger.warning(f"未找到秒选项: {second_str}") # 点击确认按钮 try: # 查找确认按钮 confirm_btn = creator_tab.ele( 'x://button[@data-testid="beast-core-button"]//span[text()="确认"]', timeout=3) if confirm_btn: confirm_btn.click() logger.info("已点击确认按钮") time.sleep(0.5) else: # 尝试通过JavaScript点击确认按钮 confirm_js = """ (function() { const buttons = document.querySelectorAll('button[data-testid="beast-core-button"]'); for (let btn of buttons) { const span = btn.querySelector('span'); if (span && span.textContent.includes('确认')) { btn.click(); return true; } } return false; })(); """ result = creator_tab.run_js(confirm_js) if result: logger.info("通过JavaScript点击了确认按钮") else: logger.warning("未找到确认按钮") time.sleep(0.5) except Exception as e: logger.warning(f"点击确认按钮失败: {e}") # 验证设置是否成功(同时校验日期+时间) time.sleep(0.5) ok, reason, actual_date, actual_time = self._verify_schedule_value( creator_tab, dt, video_container=None ) if ok: logger.info(f"成功设置定时时间: {date_str} {time_str}") else: logger.warning( f"设置的时间可能不准确,日期={actual_date} 时间={actual_time}, 期望={date_str} {time_str}" ) creator_tab.close() return {"ok": False, "reason": reason} except ValueError as e: logger.error(f"时间格式错误: {self.time_start}, 正确格式应为: YYYY-MM-DD HH:MM:SS, 错误: {e}") creator_tab.close() return {"ok": False, "reason": "定时设置失败:时间格式错误"} except Exception as e: logger.error(f"设置定时时间失败: {e}") import traceback traceback.print_exc() creator_tab.close() return {"ok": False, "reason": f"定时设置失败:{e}"} else: logger.error("未找到日期选择器,可能设置失败") creator_tab.close() return {"ok": False, "reason": "定时设置失败:未找到日期选择器"} # 绑定任务(仅在有达人链接时才点击) if self.url and str(self.url).strip(): ele = creator_tab.ele('x://*[text()="点击绑定任务"]', timeout=3) if ele: ele.click() creator_tab.ele('x://*[@placeholder="请输入个人主页链接"]').input(self.url) time.sleep(1) creator_tab.ele('x://*[text()="确认"]').click() time.sleep(1) else: logger.warning("未找到绑定任务按钮") else: logger.info("未设置达人链接,跳过绑定任务") ele = creator_tab.ele('x://*[text()="我已阅读并同意"]', timeout=3) if ele: ele.click() time.sleep(1) creator_tab.ele('x://*[text()="一键发布"]').click() # 尝试验证是否发布成功(避免“点了就算成功”的误判) publish_ok, publish_reason = self._verify_publish_success(creator_tab=creator_tab, video_container=None) if publish_ok: logger.info(f"✓ 发布校验通过:{publish_reason}") else: logger.warning(f"✗ 发布校验失败:{publish_reason}") time.sleep(2) creator_tab.close() return {"ok": publish_ok, "reason": publish_reason} def action1(self, folder_path=None, input_delay=0, on_item_done=None): """ 批量上传视频,针对每个视频单独处理详情、定时任务和绑定任务 Args: folder_path: 视频文件列表 input_delay: 输入延迟时间 on_item_done: 回调函数,每处理完一个视频后调用,参数为 dict: {index, path, name, ok, reason} """ logger.info("=" * 50) logger.info("开始执行 action1 方法(批量上传模式)") logger.info(f"多多ID: {self.user_id}, 序号: {self.index}") logger.info(f"需要处理的视频数量: {len(folder_path) if folder_path else 0}") logger.info("=" * 50) logger.info("步骤1: 创建浏览器页面...") self.create_page() logger.info(" ✓ 浏览器页面创建完成") logger.info("=" * 50) logger.info("步骤2: 登录并导航到发布页面...") if datas.get(self.user_id): logger.info(" 使用已保存的标签页...") creator_tab = self.page.new_tab(datas.get(self.user_id)) logger.info(f" ✓ 已打开已保存的标签页: {datas.get(self.user_id)}") else: logger.info("=" * 50) logger.info("步骤2: 登录并导航到发布页面...") self.page.get(url="https://mcn.pinduoduo.com/register") logger.info(" ✓ 已打开登录页面") for i in range(100): if self.page.ele("x://*[text()='主播/作者管理']", timeout=5): logger.info(" ✓ 检测到已登录") break else: time.sleep(5 * 60) else: logger.error(" ✗ 未登录!!!") return {"ok": False, "results": [], "reason": "未登录"} logger.info(" 正在导航到发布页面...") self.page.ele("x://*[text()='主播/作者管理']").click() logger.info(" ✓ 已点击'主播/作者管理'") time.sleep(1) self.page.ele("x://*[text()='签约主播/作者']").click() logger.info(" ✓ 已点击'签约主播/作者'") ele = self.page.ele("x://*[text()='我知道了']", timeout=3) if ele: ele.click() logger.info(" ✓ 已关闭提示框") time.sleep(1) logger.info(f" 正在搜索多多ID: {self.user_id}") self.page.ele('x://*[@placeholder="输入主播/作者ID搜索"]').input(vals=self.user_id, clear=True) time.sleep(1) self.page.ele("x://*[text()='提交']").click() logger.info(" ✓ 已提交搜索") time.sleep(1) self.page.actions.move_to(ele_or_loc="x://*[text()='内容管理']") time.sleep(1) self.page.ele("x://*[text()='内容管理']").click() logger.info(" ✓ 已点击'内容管理'") time.sleep(3) creator_tab = self.page.get_tab(url="home/creator/manage") creator_tab.ele("x://*[text()='发布视频']").click() logger.info(" ✓ 已点击'发布视频',进入发布页面") datas[self.user_id] = creator_tab.url # 批量上传视频 logger.info("=" * 50) logger.info("步骤3: 准备批量上传视频...") videos = [] for idx, video_info in enumerate(folder_path, 1): video_path = video_info["path"] videos.append(video_path) logger.info(f" {idx}. {video_path.name} ({video_path})") logger.info(f"=" * 50) logger.info(f"准备批量上传 {len(videos)} 个视频") logger.info(f"视频文件列表: {[str(v) for v in videos]}") # 优先使用"支持批量上传"按钮,如果没有则使用"添加视频"按钮 logger.info("查找上传按钮...") batch_upload_btn = creator_tab.ele("x://*[text()='支持批量上传']", timeout=3) if batch_upload_btn: logger.info("✓ 找到'支持批量上传'按钮") logger.info(f"使用批量上传按钮,准备上传 {len(videos)} 个视频") batch_upload_btn.click.to_upload(videos) logger.info(f"✓ 已触发批量上传,上传 {len(videos)} 个视频") else: logger.warning("✗ 未找到'支持批量上传'按钮,尝试使用'添加视频'按钮") # 备用方案:使用"添加视频"按钮(可能也支持批量上传) add_video_btn = creator_tab.ele("x://*[text()='添加视频']", timeout=3) if add_video_btn: logger.info("✓ 找到'添加视频'按钮") logger.info(f"使用添加视频按钮,准备上传 {len(videos)} 个视频") add_video_btn.click.to_upload(videos) logger.info(f"✓ 已触发上传,上传 {len(videos)} 个视频") else: logger.error("✗ 未找到任何视频上传按钮!") creator_tab.close() return {"ok": False, "results": [], "reason": "未找到任何视频上传按钮"} # 等待一段时间让视频开始上传和页面渲染 logger.info("=" * 50) logger.info("步骤4: 等待视频上传和页面渲染...") logger.info(f" 已触发上传 {len(videos)} 个视频,等待 {5} 秒让页面渲染...") time.sleep(5) logger.info(" ✓ 等待完成,开始处理视频详情") # 不等待所有视频上传完成,而是检测每个视频的状态 # 只处理已上传完成的视频,跳过还在上传中的视频 logger.info("=" * 50) logger.info("步骤5: 开始处理每个视频的详细信息...") logger.info(f" 需要处理的视频总数: {len(folder_path)}") logger.info(" 将逐个处理:输入描述 -> 设置定时 -> 绑定任务 -> 点击发布 -> 验证发布结果") results = [] for idx, video_info in enumerate(folder_path): try: video_path = video_info["path"] video_name = video_path.name # 格式化话题(如果还没有格式化) raw_ht = video_info.get("ht", self.ht) video_ht = self._format_topic(raw_ht) if raw_ht else self.ht video_time_start = video_info.get("time_start", self.time_start) video_url = video_info.get("url", self.url) if video_time_start and self._is_schedule_time_expired(video_time_start): reason = f"定时发布时间已过期: {video_time_start}" logger.warning(f" ✗ {reason},跳过视频 {video_name}") result_item = { "index": str(video_info.get("index", "")), "path": str(video_path), "name": video_name, "ok": False, "reason": reason, } results.append(result_item) if on_item_done and callable(on_item_done): try: on_item_done(result_item) except Exception: pass continue logger.info("=" * 50) logger.info(f"处理第 {idx + 1}/{len(folder_path)} 个视频") logger.info(f" 视频名称: {video_name}") logger.info(f" 视频路径: {video_path}") logger.info(f" 话题: {video_ht}") logger.info(f" 定时时间: {video_time_start}") logger.info(f" 绑定URL: {video_url[:50] if video_url else 'None'}...") # 定位视频容器:优先按文件名匹配,其次按索引 logger.info(f" 正在定位视频容器...") video_container = None video_name_without_ext = video_name.rsplit(".", 1)[0] logger.info(f" 完整文件名: {video_name}") logger.info(f" 不含扩展名: {video_name_without_ext}") container_xpath = ( 'x://p[contains(text(), "文件名:") and contains(text(), "{name}")]' '/ancestor::div[contains(@class, "y0VjbyIp")][1]' ) logger.info(f" 方法1: 按完整文件名匹配...") video_container = creator_tab.ele( container_xpath.format(name=video_name), timeout=5 ) if video_container: logger.info(f" ✓ 通过完整文件名找到视频容器") else: logger.info(f" ✗ 未找到,尝试不含扩展名...") video_container = creator_tab.ele( container_xpath.format(name=video_name_without_ext), timeout=3 ) if video_container: logger.info(f" ✓ 通过不含扩展名找到视频容器") if not video_container: logger.info(f" ✗ 按文件名未找到,尝试按索引匹配...") containers = creator_tab.eles( 'x://p[contains(text(), "文件名:")]' '/ancestor::div[contains(@class, "y0VjbyIp")][1]' ) logger.info(f" 页面上共有 {len(containers)} 个视频容器") if idx < len(containers): video_container = containers[idx] logger.warning( f" ✓ 通过索引方式定位到视频容器: {idx + 1}/{len(containers)}" ) else: logger.error(f" ✗ 索引超出范围: {idx} >= {len(containers)}") if not video_container: logger.error(f" ✗ 未找到视频 {video_name} 的容器,跳过该视频") continue else: logger.info(f" ✓ 成功定位到视频容器") # 检测上传状态(在当前视频容器内) logger.info(f" 检测视频上传状态...") # 优先判断是否仍在上传,其次判断发布按钮是否可用 while True: if video_container.ele( 'x://*[contains(., "视频上传成功")]', timeout=0.5 ): break if video_container.ele('x://*[contains(., "视频上传失败")]', timeout=3): logger.warning(f"检测到视频 {video_name} 上传失败,退出并标记该条为失败") result_item = { "index": str(video_info.get("index", "")), "path": str(video_path), "name": video_name, "ok": False, "reason": "视频上传失败", } results.append(result_item) if on_item_done and callable(on_item_done): try: on_item_done(result_item) except Exception: pass return {"ok": False, "results": results, "reason": "视频上传失败"} logger.warning(f" ✗ 视频 {video_name} 还在上传中,跳过处理") time.sleep(5) success_text = video_container.ele( 'x://*[contains(., "视频上传成功")]', timeout=0.5 ) if success_text: logger.info(f" ✓ 检测到视频 {video_name} 上传成功标识") else: logger.info(f" 未找到'上传成功'标识,检查发布按钮状态...") # 备用判断:发布按钮未禁用即可认为已完成 disabled_publish = video_container.ele( 'x://button[@data-testid="beast-core-button" and (@disabled or contains(@class, "BTN_disabled"))]//span[text()="发布"]', timeout=0.5 ) if disabled_publish: logger.warning(f" ✗ 视频 {video_name} 发布按钮仍禁用,跳过") continue logger.info(f" ✓ 未检测到成功标识,但发布按钮可用,继续处理 {video_name}") # 1. 输入视频描述 logger.info(f" 步骤1: 输入视频描述...") try: # 格式化话题(如果还没有格式化) formatted_ht = self._format_topic(video_ht) if video_ht else "" desc_text = video_name.split(".")[0].split("-")[-1] + formatted_ht logger.info(f" 描述文本: {desc_text[:50]}...") desc_inputs = video_container.eles('x://*[starts-with(@id, "magicdomid")]') if desc_inputs: logger.info(f" 找到 {len(desc_inputs)} 个描述输入框,使用第一个") desc_inputs[0].input(vals=desc_text, clear=True) logger.info(f" ✓ 已输入视频描述") else: logger.warning(f" ✗ 在视频容器中未找到描述输入框") except Exception as e: logger.warning(f" ✗ 输入视频描述失败: {e}") import traceback traceback.print_exc() time.sleep(1) # 给用户留出填写信息的时间 if input_delay and input_delay > 0: logger.info(f"等待用户填写视频信息: {input_delay}s") time.sleep(input_delay) # 2. 设置定时任务(如果该视频有定时时间) if video_time_start: try: # 定位并确认勾选“定时发布” if not self._ensure_schedule_selected(video_container): reason = "定时设置失败:未能切换到定时发布" logger.warning(f" ✗ {reason}") result_item = { "index": str(video_info.get("index", "")), "path": str(video_path), "name": video_name, "ok": False, "reason": reason, } results.append(result_item) if on_item_done and callable(on_item_done): try: on_item_done(result_item) except Exception: pass continue # 设置定时时间 schedule_ok, schedule_reason = self._set_schedule_time( creator_tab, video_time_start, video_container, idx ) if not schedule_ok: reason = schedule_reason or "定时设置失败:未知原因" logger.warning(f" ✗ {reason}") result_item = { "index": str(video_info.get("index", "")), "path": str(video_path), "name": video_name, "ok": False, "reason": reason, } results.append(result_item) if on_item_done and callable(on_item_done): try: on_item_done(result_item) except Exception: pass continue except Exception as e: logger.warning(f" ✗ 设置定时任务失败: {e}") import traceback traceback.print_exc() result_item = { "index": str(video_info.get("index", "")), "path": str(video_path), "name": video_name, "ok": False, "reason": f"定时设置异常: {e}", } results.append(result_item) if on_item_done and callable(on_item_done): try: on_item_done(result_item) except Exception: pass continue else: logger.info(f" 步骤2: 跳过定时任务(未设置定时时间)") # 3. 绑定任务(仅在有达人链接时才点击) if video_url and str(video_url).strip(): logger.info(f" 步骤3: 绑定任务...") logger.info(f" 绑定URL: {video_url[:50]}...") try: bind_btn = video_container.ele('x://*[text()="点击绑定任务"]', timeout=3) if bind_btn: logger.info(f" ✓ 找到绑定任务按钮") bind_btn.click() time.sleep(1) # 输入URL(这个输入框可能在弹窗中,使用全局查找) url_input = creator_tab.ele('x://*[@placeholder="请输入个人主页链接"]', timeout=3) if url_input: logger.info(f" ✓ 找到URL输入框") url_input.input(video_url, clear=True) time.sleep(1) confirm_btn = creator_tab.ele('x://*[text()="确认"]', timeout=3) if confirm_btn: confirm_btn.click() logger.info(f" ✓ 已绑定任务") time.sleep(1) else: logger.warning(f" ✗ 未找到确认按钮") else: logger.warning(f" ✗ 未找到URL输入框") else: logger.warning(f" ✗ 未找到绑定任务按钮") except Exception as e: logger.warning(f" ✗ 绑定任务失败: {e}") import traceback traceback.print_exc() else: logger.info(f" 步骤3: 跳过绑定任务(未设置URL)") # 4. 点击该视频的发布按钮(注意:这里应该是单个视频的"发布"按钮,不是"立即发布") logger.info(f" 步骤4: 点击发布按钮...") try: publish_btn = video_container.ele( 'x://button[@data-testid="beast-core-button"]//span[text()="发布"]', timeout=3 ) if publish_btn: publish_btn.click() logger.info(f" ✓ 已点击发布按钮") ok, reason = self._verify_publish_success(creator_tab=creator_tab, video_container=video_container) if ok: logger.info(f" ✓ 发布校验通过:{reason}") else: logger.warning(f" ✗ 发布校验失败:{reason}") else: ok, reason = False, "在视频容器中未找到发布按钮" logger.warning(f" ✗ {reason}") except Exception as e: ok, reason = False, f"点击发布按钮异常: {e}" logger.warning(f" ✗ {reason}") import traceback traceback.print_exc() result_item = { "index": str(video_info.get("index", "")), "path": str(video_path), "name": video_name, "ok": bool(ok), "reason": str(reason), } results.append(result_item) logger.info(f" ✓ 视频 {video_name} 处理完成(ok={ok})") # 实时回调通知 GUI 更新状态(每处理完一个视频就通知) if on_item_done and callable(on_item_done): try: on_item_done(result_item) except Exception as e: logger.warning(f"回调通知失败: {e}") time.sleep(2) # 每个视频处理间隔 except Exception as e: logger.error( f"处理视频 {video_info.get('path', {}).name if hasattr(video_info.get('path', ''), 'name') else 'unknown'} 时出错: {e}") import traceback traceback.print_exc() try: error_result = { "index": str(video_info.get("index", "")), "path": str(video_info.get("path", "")), "name": getattr(video_info.get("path", None), "name", "unknown"), "ok": False, "reason": f"处理异常: {e}", } results.append(error_result) # 实时回调通知 GUI 更新状态(失败情况) if on_item_done and callable(on_item_done): try: on_item_done(error_result) except Exception: pass except Exception: pass continue # 最后点击一键发布(如果有) logger.info("=" * 50) logger.info("步骤:点击一键发布(最终确认)...") try: ele = creator_tab.ele('x://*[text()="我已阅读并同意"]', timeout=3) if ele: ele.click() logger.info(" ✓ 已勾选'我已阅读并同意'") time.sleep(0.5) else: logger.info(" - 未找到'我已阅读并同意'复选框,可能已默认勾选") # one_key_publish = creator_tab.ele('x://*[text()="一键发布"]', timeout=3) # if one_key_publish: # logger.info(" ✓ 找到'一键发布'按钮") # one_key_publish.click() # logger.info(" ✓ 已点击一键发布按钮") # # logger.info(" 正在验证一键发布是否成功...") # ok, reason = self._verify_publish_success(creator_tab=creator_tab, video_container=None) # if ok: # logger.info(f" ✓ 一键发布校验通过:{reason}") # else: # logger.warning(f" ✗ 一键发布校验失败:{reason}") # time.sleep(2) # else: # logger.warning(" ✗ 未找到'一键发布'按钮") except Exception as e: logger.warning(f" ✗ 一键发布失败: {e}") import traceback traceback.print_exc() time.sleep(2) logger.info("=" * 50) success_count = sum(1 for r in results if r.get("ok")) if results else 0 total_count = len(results) if results else 0 logger.info(f"批量上传任务完成,成功: {success_count}/{total_count}") creator_tab.close() return {"ok": all(r.get("ok") for r in results) if results else True, "results": results} def _verify_publish_success(self, creator_tab, video_container=None, timeout_seconds: float = 8.0): """ 尝试验证“是否真的发布成功”。 说明:不同版本页面反馈可能不同,这里采用多策略: - toast/提示文本包含:发布成功/提交成功/已发布/已提交/成功 - 发布按钮变为禁用/消失/文案变化 """ import time as _t start = _t.time() last_hint = "未检测到成功反馈" success_keywords = ["发布成功", "提交成功", "已发布", "已提交", "成功"] fail_keywords = ["失败", "错误", "异常", "请重试"] def _has_text_anywhere(keywords): for kw in keywords: # 全局找,兼容 toast if creator_tab.ele(f'x://*[contains(text(), "{kw}")]', timeout=0.2): return kw return None while _t.time() - start < timeout_seconds: hit_fail = _has_text_anywhere(fail_keywords) if hit_fail: return False, f"检测到失败提示: {hit_fail}" hit_ok = _has_text_anywhere(success_keywords) if hit_ok: return True, f"检测到成功提示: {hit_ok}" # 尝试按钮状态变化(优先容器内) try: scope = video_container if video_container else creator_tab disabled = scope.ele( 'x://button[@data-testid="beast-core-button" and (@disabled or contains(@class, "BTN_disabled"))]//span[text()="发布"]', timeout=0.2 ) if disabled: return True, "发布按钮变为禁用" except Exception: pass last_hint = "等待页面反馈..." _t.sleep(0.3) return False, last_hint def _set_schedule_time(self, creator_tab, time_start, video_container=None, video_index=None): """ 设置定时发布时间的辅助方法 Args: creator_tab: 浏览器标签页对象 time_start: 时间字符串,格式:2026-01-15 09:30:00 video_container: 视频容器元素(可选) video_index: 视频索引(当video_container为None时使用) """ try: from datetime import datetime dt = datetime.strptime(time_start, "%Y-%m-%d %H:%M:%S") date_str = dt.strftime("%Y-%m-%d") year = dt.year month = dt.month day = dt.day hour = dt.hour minute = dt.minute second = dt.second logger.info( f"开始设置定时时间: {time_start} (年={year}, 月={month}, 日={day}, 时={hour}, 分={minute}, 秒={second})") # 定位日期选择器(优先容器内,且优先 active) date_picker_ele = None if video_container and video_container != creator_tab: date_picker_ele = video_container.ele( 'x://div[@data-testid="beast-core-datePicker-input" and @data-status="active"]' '//input[@data-testid="beast-core-datePicker-htmlInput"]', timeout=2 ) if not date_picker_ele: date_picker_ele = video_container.ele( 'x://input[@data-testid="beast-core-datePicker-htmlInput"]', timeout=2 ) if not date_picker_ele: date_picker_ele = video_container.ele( 'x://*[@placeholder="选择日期"]', timeout=2 ) if not date_picker_ele: # 兜底:全局查找(优先 active) date_picker_ele = creator_tab.ele( 'x://div[@data-testid="beast-core-datePicker-input" and @data-status="active"]' '//input[@data-testid="beast-core-datePicker-htmlInput"]', timeout=2 ) if not date_picker_ele: date_picker_ele = creator_tab.ele( 'x://input[@data-testid="beast-core-datePicker-htmlInput"]', timeout=2 ) if not date_picker_ele: date_picker_ele = creator_tab.ele('x://*[@placeholder="选择日期"]', timeout=2) if not date_picker_ele: logger.warning("未找到日期选择器") return False, "定时设置失败:未找到日期选择器" # 点击日期选择器打开面板 date_picker_ele.click() time.sleep(1.5) # 检查并切换年月(如果需要) try: month_text_ele = creator_tab.ele('x://span[@class="RPR_dateText_5-152-0"]', timeout=2) if month_text_ele: current_month = month_text_ele.text logger.info(f"当前显示的月份: {current_month}") target_month_str = f"{month}月" if current_month != target_month_str: logger.info(f"需要切换到目标月份: {target_month_str}") current_month_num = int(current_month.replace('月', '')) target_month_num = month date_root = 'x://div[@data-testid="beast-core-datePicker-dropdown-contentRoot"]' if target_month_num > current_month_num: arrow_selector = f'{date_root}//*[contains(@class,"RPR_iconPrevNext") and contains(@class,"RPR_right")]' clicks_needed = target_month_num - current_month_num else: arrow_selector = f'{date_root}//*[contains(@class,"RPR_iconPrevNext") and not(contains(@class,"RPR_right"))]' clicks_needed = current_month_num - target_month_num for _ in range(min(clicks_needed, 12)): arrow = creator_tab.ele(arrow_selector, timeout=1) if arrow: try: arrow.click() except Exception: pass time.sleep(0.5) new_month_ele = creator_tab.ele('x://span[@class="RPR_dateText_5-152-0"]', timeout=1) if new_month_ele and new_month_ele.text == target_month_str: logger.info(f"成功切换到目标月份: {target_month_str}") break except Exception as e: logger.warning(f"切换月份时出错: {e},继续尝试选择日期") # 选择日期 date_cell = creator_tab.ele( f'x://td[@role="date-cell"]//div[@title="{day}" and not(contains(@class, "RPR_disabled")) and not(contains(@class, "RPR_outOfMonth"))]', timeout=3) if date_cell: date_cell.click() logger.info(f"已点击日期: {day}") time.sleep(0.5) else: logger.warning(f"未找到日期单元格: {day}") return False, "定时设置失败:未找到日期单元格" # 点击时间输入框打开时间选择器(优先容器内) time_input = None if video_container and video_container != creator_tab: time_input = video_container.ele( 'x://input[@data-testid="beast-core-timePicker-html-input"]', timeout=3 ) if not time_input: time_input = creator_tab.ele( 'x://input[@data-testid="beast-core-timePicker-html-input"]', timeout=3 ) if time_input: time_input.click() logger.info("已点击时间输入框,打开时间选择器") time.sleep(0.8) else: logger.warning("未找到时间输入框,尝试使用XPath") try: time_input_xpath = '/html/body/div[2]/div/div/div/div/div/footer/div/div/div/div/div/div/div/div[1]/input' time_input = creator_tab.ele(f'x:{time_input_xpath}', timeout=2) if time_input: time_input.click() logger.info("通过XPath点击了时间输入框") time.sleep(0.8) except Exception as e: logger.warning(f"通过XPath也未能找到时间输入框: {e}") if not time_input: return False, "定时设置失败:未找到时间输入框" # 选择时间 hour_str = f"{hour:02d}" hour_item = creator_tab.ele( f'x://ul[@data-testid="beast-core-timePicker-list-hh"]//li[text()="{hour_str}"]', timeout=3) if hour_item: hour_item.scroll.to_see() time.sleep(0.2) hour_item.click() logger.info(f"已选择小时: {hour_str}") time.sleep(0.3) else: logger.warning(f"未找到小时选项: {hour_str}") minute_str = f"{minute:02d}" minute_item = creator_tab.ele( f'x://ul[@data-testid="beast-core-timePicker-list-mm"]//li[text()="{minute_str}"]', timeout=3) if minute_item: minute_item.scroll.to_see() time.sleep(0.2) minute_item.click() logger.info(f"已选择分钟: {minute_str}") time.sleep(0.3) else: logger.warning(f"未找到分钟选项: {minute_str}") second_str = f"{second:02d}" second_item = creator_tab.ele( f'x://ul[@data-testid="beast-core-timePicker-list-ss"]//li[text()="{second_str}"]', timeout=3) if second_item: second_item.scroll.to_see() time.sleep(0.2) second_item.click() logger.info(f"已选择秒: {second_str}") time.sleep(0.3) else: logger.warning(f"未找到秒选项: {second_str}") # 点击确认按钮 try: confirm_btn = creator_tab.ele( 'x://button[@data-testid="beast-core-button"]//span[text()="确认"]', timeout=3) if confirm_btn: confirm_btn.click() logger.info("已点击确认按钮") time.sleep(0.5) else: confirm_js = """ (function() { const buttons = document.querySelectorAll('button[data-testid="beast-core-button"]'); for (let btn of buttons) { const span = btn.querySelector('span'); if (span && span.textContent.includes('确认')) { btn.click(); return true; } } return false; })(); """ result = creator_tab.run_js(confirm_js) if result: logger.info("通过JavaScript点击了确认按钮") else: logger.warning("未找到确认按钮") time.sleep(0.5) except Exception as e: logger.warning(f"点击确认按钮失败: {e}") # 验证设置是否成功(优先从当前容器读取输入框值) time.sleep(0.5) expected_str = dt.strftime("%Y-%m-%d %H:%M:%S") final_value = None try: value_ele = None if video_container and video_container != creator_tab: value_ele = video_container.ele( 'x://input[@data-testid="beast-core-datePicker-htmlInput"]', timeout=1 ) if not value_ele: value_ele = creator_tab.ele( 'x://input[@data-testid="beast-core-datePicker-htmlInput"]', timeout=1 ) if value_ele: final_value = value_ele.attr("value") except Exception as e: logger.warning(f"读取日期选择器值失败: {e}") if final_value and str(final_value).strip(): final_value_str = str(final_value).strip() logger.info(f"日期选择器当前值: {final_value_str}, 期望值: {expected_str}") # 完整比较日期时间字符串 if final_value_str == expected_str: logger.info(f"成功设置定时时间: {final_value_str}") return True, "" else: logger.warning(f"设置的时间可能不准确,当前值: {final_value_str}, 期望值: {expected_str}") return False, f"定时设置失败:期望 {expected_str},实际 {final_value_str}" else: logger.error("无法获取日期选择器的值,可能设置失败") return False, "定时设置失败:无法读取日期选择器值" except ValueError as e: logger.error(f"时间格式错误: {time_start}, 正确格式应为: YYYY-MM-DD HH:MM:SS, 错误: {e}") return False, "定时设置失败:时间格式错误" except Exception as e: logger.error(f"设置定时时间失败: {e}") import traceback traceback.print_exc() return False, f"定时设置失败:{e}" return False, "定时设置失败:未知原因" def _verify_schedule_value(self, creator_tab, expected_dt, video_container=None): """ 验证定时时间是否设置正确 Args: creator_tab: 浏览器标签页对象 expected_dt: 期望的 datetime 对象 video_container: 视频容器元素(可选) Returns: (ok, reason, actual_date, actual_time): 验证结果 """ try: expected_str = expected_dt.strftime("%Y-%m-%d %H:%M:%S") expected_date = expected_dt.strftime("%Y-%m-%d") expected_time = expected_dt.strftime("%H:%M:%S") # 读取日期时间输入框的值 final_value = None try: value_ele = None if video_container and video_container != creator_tab: value_ele = video_container.ele( 'x://input[@data-testid="beast-core-datePicker-htmlInput"]', timeout=1 ) if not value_ele: value_ele = creator_tab.ele( 'x://input[@data-testid="beast-core-datePicker-htmlInput"]', timeout=1 ) if value_ele: final_value = value_ele.attr("value") except Exception as e: logger.warning(f"读取日期选择器值失败: {e}") if not final_value or not str(final_value).strip(): return False, "定时设置失败:无法读取日期选择器值", "", "" final_value_str = str(final_value).strip() logger.info(f"日期选择器当前值: {final_value_str}, 期望值: {expected_str}") # 比较完整的日期时间字符串 if final_value_str == expected_str: return True, "", final_value_str[:10], final_value_str[11:] if len(final_value_str) > 10 else "" # 如果完整比较不匹配,尝试分别比较日期和时间 actual_date = final_value_str[:10] if len(final_value_str) >= 10 else final_value_str actual_time = final_value_str[11:] if len(final_value_str) > 11 else "" if actual_date == expected_date and actual_time == expected_time: return True, "", actual_date, actual_time # 不匹配 reason = f"定时设置失败:期望 {expected_str},实际 {final_value_str}" return False, reason, actual_date, actual_time except Exception as e: logger.error(f"验证定时时间失败: {e}") return False, f"定时设置失败:验证异常 {e}", "", "" def _ensure_schedule_selected(self, video_container): """确保切换到定时发布选项。""" try: schedule_label = video_container.ele( 'x://label[@data-testid="beast-core-radio"][.//*[contains(text(), "立即发布")]]', timeout=2 ) if schedule_label: schedule_label.click() time.sleep(1) schedule_label = video_container.ele( 'x://label[@data-testid="beast-core-radio"][.//*[contains(text(), "定时发布")]]', timeout=2 ) if schedule_label: schedule_label.click() time.sleep(0.5) checked = schedule_label.attr("data-checked") if checked == "true": logger.info("定时发布已选中") return True # 备用:点击内部 radio input radio_input = schedule_label.ele('x://input[@type="radio"]', timeout=1) if radio_input: radio_input.click() time.sleep(0.5) checked = schedule_label.attr("data-checked") if checked == "true": logger.info("定时发布已选中") return True # 最后兜底:点击文本 schedule_text = video_container.ele('x://*[contains(text(), "定时发布")]', timeout=1) if schedule_text: schedule_text.click() time.sleep(0.5) return True except Exception as e: logger.warning(f"切换定时发布失败: {e}") return False datas = ThreadSafeDict() # if __name__ == '__main__': # url = "18 【运动男孩都爱这么穿吗?🏃 - Liu_烫烫 | 小红书 - 你的生活兴趣社区】 😆 D13BaPl6xyUAuQO 😆 https://www.xiaohongshu.com/discovery/item/678ceeef000000001602fb54?source=webshare&xhsshare=pc_web&xsec_token=ABe9oWR9CYCsHBkWUPuoS1Fz3_Uz4WGFMdfCGwSbl0Dfs=&xsec_source=pc_share" # pdd = Pdd( # url=url, # user_id="1050100241", # time_start="2026-01-28 09:30:00", # ht="#python #haha", # index="1", # ) # # pdd.action(folder_path=r"C:\Users\27942\Desktop\多多自动化发文") # # folder_path = r"C:\Users\27942\Desktop\多多自动化发文" # file_paths = [] # for file in os.listdir(folder_path): # 获取文件夹下所有的文件夹 # file_path = os.path.join(folder_path, file) # 拼接文件夹 # # 检查是否为目录,跳过文件(如.lnk快捷方式) # if not os.path.isdir(file_path): # continue # files = os.listdir(file_path) # 获取用户id下的文件 # for file in files: # if ".mp4" in file: # file_names = file.split("-") # # path = Path(os.path.join(file_path, file)) # # 判断是否为文件 # # file_paths.append( # { # "url": url, # "user_id": "1050100241", # "time_start": "2026-01-28 09:30:00", # "ht": "#python #haha", # "index": "1", # "path": path # } # ) # # pdd.action1(folder_path=file_paths)