# -*- coding: utf-8 -*-
"""
Local script: drive a browser through DrissionPage (DP), using either the
local Google Chrome or BitBrowser.

- Local Chrome: set USE_LOCAL_CHROME = True; the script starts / attaches to
  the Chrome installed on this machine.
- BitBrowser: set USE_LOCAL_CHROME = False; the BitBrowser client must already
  be running (API port 54345).
"""
from __future__ import annotations

import random
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

# Make the ``worker`` package importable when the script is run from the project root.
_ROOT = Path(__file__).resolve().parent
if str(_ROOT) not in sys.path:
    sys.path.insert(0, str(_ROOT))

# ---------- Which browser to control ----------
USE_LOCAL_CHROME = False  # True = local Google Chrome, False = BitBrowser

# Local Chrome settings (only used when USE_LOCAL_CHROME is True)
CHROME_DEBUG_PORT = 9222  # debug port; None lets the script start Chrome itself
CHROME_PATH = None  # e.g. r"C:\Program Files\Google\Chrome\Application\chrome.exe"; None = system default

# BitBrowser settings (only used when USE_LOCAL_CHROME is False)
BIT_API_BASE = "http://127.0.0.1:54345"
BROWSER_NAME = "测试1"
BROWSER_ID = None

# Message typed into the quick-reply box after greeting a candidate.
GREET_MESSAGE = "我司正在招聘爬虫工程师,有兴趣了解一下吗"


def _connect_local_chrome():
    """Attach to (or start) the local Google Chrome and return a ChromiumPage."""
    from DrissionPage import ChromiumPage, ChromiumOptions

    co = ChromiumOptions()
    if CHROME_PATH:
        co.set_browser_path(CHROME_PATH)
    if CHROME_DEBUG_PORT is not None:
        # Attach to a Chrome that already exposes the debug port
        # (start it manually first: chrome --remote-debugging-port=9222).
        co.set_local_port(CHROME_DEBUG_PORT)
        print(f"正在连接本机 Chrome(调试端口 {CHROME_DEBUG_PORT})...")
    else:
        # Let DrissionPage start Chrome on its own.
        print("正在启动本地谷歌 Chrome...")
    page = ChromiumPage(addr_or_opts=co)
    print("已连接本地 Chrome。")
    return page


def _connect_bit_browser(
    *,
    bit_api_base: Optional[str] = None,
    browser_name: Optional[str] = None,
    browser_id: Optional[str] = None,
):
    """Open a BitBrowser profile through its API and return a ChromiumPage.

    Each argument falls back to the matching module constant
    (BIT_API_BASE / BROWSER_NAME / BROWSER_ID) when left as None.
    """
    from worker.bit_browser import BitBrowserAPI
    from DrissionPage import ChromiumPage, ChromiumOptions

    print("正在连接比特浏览器 API...")
    bit_api = BitBrowserAPI(bit_api_base or BIT_API_BASE)
    print("正在打开比特浏览器...")
    # The first returned value (CDP address) is not needed; the port is enough to attach.
    _cdp_addr, port, opened_id = bit_api.open_browser(
        browser_id=browser_id if browser_id is not None else BROWSER_ID,
        name=browser_name if browser_name is not None else BROWSER_NAME,
        remark=None,
    )
    print(f"已打开浏览器 ID={opened_id}, CDP 端口={port}")
    co = ChromiumOptions().set_local_port(port=port)
    return ChromiumPage(addr_or_opts=co)


# Selectors tried, in order, when auto-detecting "all positions"; the first one
# that matches between 1 and 100 elements wins. If the page structure changes
# and the position list can no longer be found, add or adjust selectors here.
JOB_LIST_SELECTORS = [
    "x://*[contains(@class,'job-item') or contains(@class,'position-item')]",
    "x://li[contains(@class,'job')]",
    "x://div[contains(@class,'job-list')]/div",
    "x://ul[contains(@class,'job')]/li",
    "x://*[contains(@class,'recommend-job')]//*[contains(@class,'item')]",
    "x://*[contains(@class,'job-list')]/*",
    "x://a[contains(@href,'job')]",
]


def _xpath_literal(text: Any) -> str:
    """Return *text* as a quoted XPath 1.0 string literal.

    XPath 1.0 has no escape character, so text containing both quote kinds is
    assembled with ``concat()``. This keeps names that contain quotes from
    breaking (or altering) the expression they are interpolated into.
    """
    text = str(text)
    if "'" not in text:
        return f"'{text}'"
    if '"' not in text:
        return f'"{text}"'
    pieces = ", \"'\", ".join(f"'{part}'" for part in text.split("'"))
    return f"concat({pieces})"


def _get_container(page):
    """Return the ``recommendFrame`` iframe, or *page* itself when it cannot be obtained."""
    try:
        return page.get_frame("recommendFrame")
    except Exception:
        # Deliberate best-effort fallback: operate on the top-level page.
        return page


def _get_all_position_elements(container):
    """Return the clickable position elements found in *container*.

    Tries each selector of JOB_LIST_SELECTORS and returns the first result
    holding 1..100 elements; returns an empty list when none qualifies.
    """
    for sel in JOB_LIST_SELECTORS:
        try:
            eles = container.eles(sel, timeout=2)
            if eles and 1 <= len(eles) <= 100:
                return eles
        except Exception:
            # A failing selector is simply skipped; the next one is tried.
            continue
    return []


def _apply_filter_and_confirm(container, filters):
    """Click "筛选", click every condition in *filters*, then click "确定"."""
    container.ele("x://*[contains(text(),'筛选')]").click()
    time.sleep(2)
    for item in filters:
        container.ele(f"x://*[contains(text(),{_xpath_literal(item)})]").click()
        time.sleep(random.random() * 1.5)
    container.ele("x://*[contains(text(),'确定')]").click()


def _geek_key(item):
    """Return the de-duplication key of a candidate.

    Prefers ``encryptGeekId``, then ``geekId``, then ``geekName`` from the
    item's ``geekCard``; returns "" when none is present.
    """
    card = item.get("geekCard") or {}
    return card.get("encryptGeekId") or card.get("geekId") or card.get("geekName") or ""


def _greet_one_geek(page, container, item):
    """Greet a single candidate and return whether the sequence completed.

    Looks the candidate up by name inside *container*, scrolls it into view,
    clicks it, then clicks "打招呼" and "收藏", sends GREET_MESSAGE through the
    quick-reply box and closes the dialogs.

    Returns False when the item has no name or the name is not found on the
    page. Lookups after that point are not guarded here.
    """
    geek_name = (item.get("geekCard") or {}).get("geekName", "")
    if not geek_name:
        return False
    name_literal = _xpath_literal(geek_name)
    name_ele = container.ele(f"x://span[contains(text(),{name_literal})]", timeout=5)
    if not name_ele:
        name_ele = container.ele(f"x://span[text()={name_literal}]", timeout=2)
    if not name_ele:
        print(f" 跳过未找到:{geek_name}")
        return False
    name_ele.run_js("this.scrollIntoView()")
    name_ele.click()
    time.sleep(3)
    # presumably the candidate detail frame opened by the click above — TODO confirm
    detail_frame = page.get_frame(1)
    time.sleep(random.uniform(0.5, 5))
    detail_frame.ele('x://*[contains(text(),"打招呼")]', timeout=2).click(by_js=True)
    time.sleep(random.uniform(0.5, 5))
    detail_frame.ele('x://*[contains(text(),"收藏")]', timeout=2).click(by_js=True)
    time.sleep(random.uniform(0.5, 5))
    # NOTE(review): the original indentation of this loop was lost; it is
    # reconstructed as "click + pause" repeated three times — confirm.
    for _ in range(3):
        detail_frame.ele('x://*[@class="btn-v2 btn-outline-v2"]').click(by_js=True)
        # detail_frame.ele('x://*[contains(text(),"继续沟通")]', timeout=2).click(by_js=True)
        time.sleep(random.uniform(0.5, 5))
    page.ele('x://*[@data-placeholder="快速回复"]', timeout=2).input(GREET_MESSAGE)
    page.ele('x://*[contains(text(),"发送")]', timeout=2).click()
    time.sleep(random.uniform(0.5, 5))
    page.ele('x://*[@class="iboss iboss-close"]').click()
    time.sleep(random.uniform(0.5, 5))
    detail_frame.ele('x://*[@class="boss-popup__close"]').click()
    time.sleep(random.uniform(0.5, 5))
    return True


def _greet_geek_list_skip_greeted(page, container, geek_list, greeted_keys, limit=None):
    """Greet the candidates of one packet that are not yet in *greeted_keys*.

    Every successfully greeted candidate's key is added to *greeted_keys*.

    Args:
        page: top-level page object.
        container: frame/page in which candidates are looked up.
        geek_list: candidate dicts of the current packet (None is treated as empty).
        greeted_keys: set of keys already greeted; updated in place.
        limit: maximum number of new greetings for this call; None = no limit.

    Returns:
        The number of candidates newly greeted by this call.
    """
    greeted = 0
    for item in geek_list or []:
        if limit is not None and greeted >= limit:
            break
        key = _geek_key(item)
        if not key or key in greeted_keys:
            continue
        if _greet_one_geek(page, container, item):
            greeted_keys.add(key)
            greeted += 1
    return greeted


def _dedupe_filter_values(items: List[str]) -> List[str]:
    """Return the stripped, non-empty values of *items* without duplicates, keeping order."""
    seen = set()
    result: List[str] = []
    for item in items:
        value = str(item or "").strip()
        if not value or value in seen:
            continue
        seen.add(value)
        result.append(value)
    return result


def _clean_text(value: Any) -> str:
    """Return *value* as a stripped string; None becomes "" instead of "None"."""
    return "" if value is None else str(value).strip()


def _parse_range(start: Any, end: Any) -> Optional[Dict[str, int]]:
    """Return ``{"start": int, "end": int}``, or None when either bound is missing or not numeric."""
    if start is None or end is None:
        return None
    try:
        return {"start": int(start), "end": int(end)}
    except (TypeError, ValueError, OverflowError):
        return None


def _parse_recommend_filters(raw_filters: List[dict]) -> Dict[str, Any]:
    """Turn the raw filter list into a structured description.

    Entries that are not dicts or have no name are skipped; ``order`` keeps the
    entry's index in the raw list.

    Returns a dict with:
        groups: one ``{"name", "order", "options"[, "range"]}`` dict per filter.
        flat_options: every option name across all groups, de-duplicated.
        display_filters: name -> range dict when start/end are usable, else the option list.
        raw_payload: ``{"filters": raw_filters}`` (an empty list when falsy).
    """
    groups: List[dict] = []
    flat_options: List[str] = []
    display_filters: Dict[str, Any] = {}

    for index, item in enumerate(raw_filters or []):
        if not isinstance(item, dict):
            continue
        name = _clean_text(item.get("name"))
        if not name:
            continue

        raw_options = item.get("options")
        if not isinstance(raw_options, (list, tuple)):
            raw_options = []
        option_names = [
            _clean_text(option.get("name"))
            for option in raw_options
            if isinstance(option, dict)
        ]
        options = _dedupe_filter_values(option_names)

        group: Dict[str, Any] = {"name": name, "order": index, "options": options}
        range_payload = _parse_range(item.get("start"), item.get("end"))
        if range_payload is not None:
            group["range"] = range_payload
            display_filters[name] = range_payload
        else:
            display_filters[name] = options

        groups.append(group)
        flat_options.extend(options)

    return {
        "groups": groups,
        "flat_options": _dedupe_filter_values(flat_options),
        "display_filters": display_filters,
        "raw_payload": {"filters": raw_filters or []},
    }


def _packet_body(packet: Any) -> Dict[str, Any]:
    """Return ``packet.response.body`` when it is a dict, otherwise an empty dict."""
    body = getattr(getattr(packet, "response", None), "body", None)
    return body if isinstance(body, dict) else {}


def _extract_geek_list(packet: Any) -> List[dict]:
    """Return the dict entries of ``zpData.geekList`` from a captured packet, or []."""
    zp_data = _packet_body(packet).get("zpData")
    geek_list = zp_data.get("geekList") if isinstance(zp_data, dict) else None
    if not isinstance(geek_list, list):
        return []
    return [geek for geek in geek_list if isinstance(geek, dict)]


def fetch_recommend_filters(page, timeout_sec: int = 30) -> Dict[str, Any]:
    """Capture the recommend-page filter options and return them structured.

    Listens for ``wapi/zpblock/recommend/filters``, opens the recommend page
    and parses ``zpData.vipFilter.filters`` of the captured response. Any
    missing or malformed level yields an empty filter list.
    """
    page.listen.start('wapi/zpblock/recommend/filters')
    page.get("https://www.zhipin.com/web/chat/recommend")
    res = page.listen.wait(timeout=timeout_sec)

    zp_data = _packet_body(res).get("zpData", {})
    vip_filter = zp_data.get("vipFilter", {}) if isinstance(zp_data, dict) else {}
    raw_filters = vip_filter.get("filters", []) if isinstance(vip_filter, dict) else []
    if not isinstance(raw_filters, list):
        raw_filters = []
    return _parse_recommend_filters(raw_filters)


def main(filters, position_names=None, greet_target=None):
    """Run the recommend-page greeting flow across positions.

    Positions are processed in rounds; the run stops once the total number of
    greeted candidates reaches *greet_target*, or when a whole round adds
    nobody new.

    Args:
        filters: filter conditions to click, e.g. ["初中及以下", "离职-随时到岗"];
            empty means no filtering.
        position_names: position names to click; when None/empty, all
            positions found on the page are processed in order.
        greet_target: total number of candidates to greet across all
            positions. Only the currently captured packet is used for each
            position (no scrolling). None means a single round with no cap.
    """
    if USE_LOCAL_CHROME:
        page = _connect_local_chrome()
    else:
        page = _connect_bit_browser()

    page.listen.start('wapi/zpjob/rec/geek/list')
    page.get("https://www.zhipin.com/web/chat/recommend")
    time.sleep(2)
    container = _get_container(page)

    if position_names:
        positions = [("name", name) for name in position_names]
    else:
        job_eles = _get_all_position_elements(container)
        if job_eles:
            job_count = len(job_eles)
            print(f"自动识别到 {job_count} 个岗位,将依次处理")
            positions = [("index", i) for i in range(job_count)]
        else:
            positions = [("current", None)]

    greeted_keys = set()
    total_greeted = 0
    round_num = 0
    while True:
        round_num += 1
        if greet_target is not None:
            print(f"--- 第 {round_num} 轮,当前已打招呼 {total_greeted}/{greet_target} ---")
        round_added = 0

        for pos_type, pos_value in positions:
            if greet_target is not None and total_greeted >= greet_target:
                break

            # Re-fetch the frame on every step: it may have been reloaded.
            container = _get_container(page)
            label = "当前"
            if pos_type == "name":
                job_ele = container.ele(
                    f"x://*[contains(text(),{_xpath_literal(pos_value)})]", timeout=5
                )
                if not job_ele:
                    print(f"未找到岗位:{pos_value},跳过")
                    continue
                job_ele.click(by_js=True)
                label = pos_value
            elif pos_type == "index":
                job_eles = _get_all_position_elements(container)
                if pos_value >= len(job_eles):
                    continue
                job_eles[pos_value].click(by_js=True)
                label = f"第{pos_value + 1}个岗位"
            time.sleep(2)

            container = _get_container(page)
            if filters:
                _apply_filter_and_confirm(container, filters)
                # Two packets are awaited and the last one is used.
                packets = page.listen.wait(count=2, timeout=30)
                res = packets[-1] if packets else None
            else:
                res = page.listen.wait(timeout=30)
            if not res:
                print(f"岗位「{label}」未捕获到 geek/list,跳过")
                continue

            geek_list = _extract_geek_list(res)
            if not geek_list:
                continue

            # Cap this packet so the overall total never exceeds the target.
            remaining = None if greet_target is None else greet_target - total_greeted
            greeted_now = _greet_geek_list_skip_greeted(
                page, container, geek_list, greeted_keys, limit=remaining
            )
            total_greeted += greeted_now
            round_added += greeted_now
            if greeted_now:
                suffix = f"/{greet_target}" if greet_target is not None else ""
                print(
                    f"岗位「{label}」本包 {len(geek_list)} 人,新打招呼 {greeted_now} 人,"
                    f"累计 {total_greeted}" + suffix
                )

        if greet_target is None:
            break
        if total_greeted >= greet_target:
            print(f"已达目标,共打招呼 {total_greeted} 人")
            break
        if round_added == 0:
            print(f"本轮无新增,已打招呼 {total_greeted} 人,结束")
            break
        time.sleep(1)


def main1(
    *,
    use_local_chrome: Optional[bool] = None,
    bit_api_base: Optional[str] = None,
    browser_name: Optional[str] = None,
    browser_id: Optional[str] = None,
    timeout_sec: int = 30,
    echo: bool = True,
):
    """Connect to a browser, fetch the recommend-page filters and return them.

    Args:
        use_local_chrome: overrides USE_LOCAL_CHROME when not None.
        bit_api_base, browser_name, browser_id: BitBrowser overrides, passed
            to ``_connect_bit_browser``.
        timeout_sec: how long to wait for the filters response.
        echo: when True, print the ``display_filters`` part of the result.

    Returns:
        The dict produced by ``fetch_recommend_filters``.
    """
    use_local = USE_LOCAL_CHROME if use_local_chrome is None else use_local_chrome
    if use_local:
        page = _connect_local_chrome()
    else:
        page = _connect_bit_browser(
            bit_api_base=bit_api_base,
            browser_name=browser_name,
            browser_id=browser_id,
        )
    payload = fetch_recommend_filters(page, timeout_sec=timeout_sec)
    if echo:
        print(payload["display_filters"])
    return payload


if __name__ == "__main__":
    # greet_target=35: greet 35 candidates in total across all positions; if one
    # round is not enough, start again from the first position until the target
    # is reached or a round adds nobody new.
    main(filters=["初中及以下", "离职-随时到岗"], greet_target=35)
    # main1()
    # main2()