Files
boss_dp/1.py
ddrwode 7b351039f8 haha
2026-03-06 10:47:46 +08:00

375 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
本地脚本:由 DrissionPage (DP) 控制浏览器(本地谷歌 Chrome 或比特浏览器)。
- 使用本地谷歌USE_LOCAL_CHROME = True会启动/连接本机 Chrome。
- 使用比特浏览器USE_LOCAL_CHROME = False需先启动比特浏览器客户端API 端口 54345
"""
from __future__ import annotations
import random
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
# 保证从项目根目录运行时可导入 worker 包
_ROOT = Path(__file__).resolve().parent
if str(_ROOT) not in sys.path:
sys.path.insert(0, str(_ROOT))
# ---------- 选择控制对象 ----------
USE_LOCAL_CHROME = True # True=本地谷歌 ChromeFalse=比特浏览器
# 本地谷歌 Chrome 配置(仅当 USE_LOCAL_CHROME=True 时生效)
CHROME_DEBUG_PORT = 9222 # 调试端口;若为 None 则由脚本自动启动 Chrome
CHROME_PATH = None # 例如 r"C:\Program Files\Google\Chrome\Application\chrome.exe"None 用系统默认
# 比特浏览器配置(仅当 USE_LOCAL_CHROME=False 时生效)
BIT_API_BASE = "http://127.0.0.1:54345"
BROWSER_NAME = "测试2"
BROWSER_ID = None
def _connect_local_chrome():
"""连接或启动本地谷歌 Chrome返回 ChromiumPage。"""
from DrissionPage import ChromiumPage, ChromiumOptions
co = ChromiumOptions()
if CHROME_PATH:
co.set_browser_path(CHROME_PATH)
if CHROME_DEBUG_PORT is not None:
# 连接已开启调试端口的 Chrome需先手动启动chrome --remote-debugging-port=9222
co.set_local_port(CHROME_DEBUG_PORT)
print(f"正在连接本机 Chrome调试端口 {CHROME_DEBUG_PORT}...")
page = ChromiumPage(addr_or_opts=co)
else:
# 由 DrissionPage 自动启动 Chrome
print("正在启动本地谷歌 Chrome...")
page = ChromiumPage(addr_or_opts=co)
print("已连接本地 Chrome。")
return page
def _connect_bit_browser(
*,
bit_api_base: Optional[str] = None,
browser_name: Optional[str] = None,
browser_id: Optional[str] = None,
):
"""通过比特浏览器 API 打开并连接,返回 ChromiumPage。"""
from worker.bit_browser import BitBrowserAPI
from DrissionPage import ChromiumPage, ChromiumOptions
print("正在连接比特浏览器 API...")
bit_api = BitBrowserAPI(bit_api_base or BIT_API_BASE)
print("正在打开比特浏览器...")
cdp_addr, port, browser_id = bit_api.open_browser(
browser_id=browser_id if browser_id is not None else BROWSER_ID,
name=browser_name if browser_name is not None else BROWSER_NAME,
remark=None
)
print(f"已打开浏览器 ID={browser_id}, CDP 端口={port}")
co = ChromiumOptions().set_local_port(port=port)
return ChromiumPage(addr_or_opts=co)
# 自动获取「所有岗位」时使用的选择器(按顺序尝试,取第一个能匹配到 1~100 个元素的结果)
# 若页面结构变化导致拿不到岗位列表,可在此追加或调整选择器
JOB_LIST_SELECTORS = [
"x://*[contains(@class,'job-item') or contains(@class,'position-item')]",
"x://li[contains(@class,'job')]",
"x://div[contains(@class,'job-list')]/div",
"x://ul[contains(@class,'job')]/li",
"x://*[contains(@class,'recommend-job')]//*[contains(@class,'item')]",
"x://*[contains(@class,'job-list')]/*",
"x://a[contains(@href,'job')]",
]
def _get_container(page):
"""推荐牛人内容在 iframe recommendFrame 内,统一在此取容器。"""
try:
return page.get_frame("recommendFrame")
except Exception:
return page
def _get_all_position_elements(container):
"""在推荐页 iframe 内获取左侧「所有岗位」可点击元素列表;找不到则返回空列表。"""
for sel in JOB_LIST_SELECTORS:
try:
eles = container.eles(sel, timeout=2)
if eles and 1 <= len(eles) <= 100:
return eles
except Exception:
continue
return []
def _apply_filter_and_confirm(container, filters):
"""在容器内点「筛选」、选条件、点「确定」。"""
container.ele("x://*[contains(text(),'筛选')]").click()
time.sleep(2)
for item in filters:
container.ele(f"x://*[contains(text(),'{item}')]").click()
time.sleep(random.random() * 1.5)
container.ele("x://*[contains(text(),'确定')]").click()
def _geek_key(item):
"""牛人去重键:优先 encryptGeekId / geekId否则用姓名。"""
card = item.get("geekCard") or {}
return card.get("encryptGeekId") or card.get("geekId") or card.get("geekName") or ""
def _greet_one_geek(page, container, item):
"""对单个牛人:找姓名 → 滚动到视图 → 点打招呼。返回是否成功。"""
geekName = (item.get("geekCard") or {}).get("geekName", "")
if not geekName:
return False
name_ele = container.ele(f'x://span[contains(text(),"{geekName}")]', timeout=5)
if not name_ele:
name_ele = container.ele(f'x://span[text()="{geekName}"]', timeout=2)
if not name_ele:
print(f" 跳过未找到:{geekName}")
return False
name_ele.run_js("this.scrollIntoView()")
name_ele.click()
time.sleep(3)
a = page.get_frame(1)
time.sleep(random.uniform(0.5, 1.2))
a.ele(f'x://*[contains(text(),"打招呼")]', timeout=2).click(by_js=True)
time.sleep(random.uniform(0.5, 1.2))
a.ele(f'x://*[contains(text(),"收藏")]', timeout=2).click(by_js=True)
time.sleep(random.uniform(0.5, 1.2))
for i in range(3):
a.ele(f'x://*[@class="btn-v2 btn-outline-v2"]', timeout=2).click(by_js=True)
time.sleep(random.uniform(0.5, 1.2))
page.ele(f'x://*[@data-placeholder="快速回复"]', timeout=2).input("我司正在招聘爬虫工程师,有兴趣了解一下吗")
page.ele(f'x://*[contains(text(),"发送")]', timeout=2).click()
time.sleep(random.uniform(0.5, 1.2))
page.ele('x://*[@class="iboss iboss-close"]').click()
time.sleep(random.uniform(0.5, 1.2))
a.ele('x://*[@class="boss-popup__close"]').click()
time.sleep(random.uniform(0.5, 1.2))
return True
def _greet_geek_list_skip_greeted(page, container, geek_list, greeted_keys):
"""
对当前包里的牛人列表逐个打招呼,只打尚未在 greeted_keys 里的人,并写入 greeted_keys。
返回本次新打招呼的人数。
"""
n = 0
for item in geek_list or []:
k = _geek_key(item)
if not k or k in greeted_keys:
continue
if _greet_one_geek(page, container, item):
greeted_keys.add(k)
n += 1
return n
def _dedupe_filter_values(items: List[str]) -> List[str]:
seen = set()
result: List[str] = []
for item in items:
value = str(item or "").strip()
if not value or value in seen:
continue
seen.add(value)
result.append(value)
return result
def _parse_recommend_filters(raw_filters: List[dict]) -> Dict[str, Any]:
groups: List[dict] = []
flat_options: List[str] = []
display_filters: Dict[str, Any] = {}
for index, item in enumerate(raw_filters or []):
if not isinstance(item, dict):
continue
name = str(item.get("name", "")).strip()
if not name:
continue
options: List[str] = []
for option in item.get("options") or []:
if not isinstance(option, dict):
continue
option_name = str(option.get("name", "")).strip()
if option_name:
options.append(option_name)
options = _dedupe_filter_values(options)
group: Dict[str, Any] = {
"name": name,
"order": index,
"options": options,
}
start = item.get("start")
end = item.get("end")
try:
if start is not None and end is not None:
range_payload = {"start": int(start), "end": int(end)}
group["range"] = range_payload
display_filters[name] = range_payload
else:
display_filters[name] = options
except Exception:
display_filters[name] = options
groups.append(group)
flat_options.extend(options)
return {
"groups": groups,
"flat_options": _dedupe_filter_values(flat_options),
"display_filters": display_filters,
"raw_payload": {"filters": raw_filters or []},
}
def fetch_recommend_filters(page, timeout_sec: int = 30) -> Dict[str, Any]:
"""抓取推荐页筛选项并返回结构化数据。"""
page.listen.start('wapi/zpblock/recommend/filters')
page.get("https://www.zhipin.com/web/chat/recommend")
res = page.listen.wait(timeout=timeout_sec)
body = getattr(getattr(res, "response", None), "body", None) or {}
zp_data = body.get("zpData", {}) if isinstance(body, dict) else {}
vip_filter = zp_data.get("vipFilter", {}) if isinstance(zp_data, dict) else {}
raw_filters = vip_filter.get("filters", []) if isinstance(vip_filter, dict) else []
if not isinstance(raw_filters, list):
raw_filters = []
return _parse_recommend_filters(raw_filters)
def main(filters, position_names=None, greet_target=None):
"""
推荐牛人流程:多岗位循环,所有岗位合计达到目标人数即停;不够则从第一个岗位再跑一轮直到够了。
- filters: 筛选条件列表,如 ["初中及以下", "离职-随时到岗"];为空则不做筛选。
- position_names: 若传入列表则按名称点击这些岗位;若为 None 则自动获取页面所有岗位并全部循环一遍。
- greet_target: 所有岗位合计目标打招呼人数,如 50。每岗位只抓当前包不滚动一轮跑完若未达目标则从第一个岗位再跑一轮直到总人数够了或一轮无新增为止。
"""
if USE_LOCAL_CHROME:
page = _connect_local_chrome()
else:
page = _connect_bit_browser()
page.listen.start('wapi/zpjob/rec/geek/list')
page.get("https://www.zhipin.com/web/chat/recommend")
time.sleep(2)
container = _get_container(page)
if position_names:
positions = [("name", name) for name in position_names]
else:
job_eles = _get_all_position_elements(container)
if job_eles:
n = len(job_eles)
print(f"自动识别到 {n} 个岗位,将依次处理")
positions = [("index", i) for i in range(n)]
else:
positions = [("current", None)]
greeted_keys = set()
total_greeted = 0
round_num = 0
while True:
round_num += 1
if greet_target is not None:
print(f"--- 第 {round_num} 轮,当前已打招呼 {total_greeted}/{greet_target} ---")
round_added = 0
for pos_type, pos_value in positions:
if greet_target is not None and total_greeted >= greet_target:
break
container = _get_container(page)
label = "当前"
if pos_type == "name":
job_ele = container.ele(f'x://*[contains(text(),"{pos_value}")]', timeout=5)
if not job_ele:
print(f"未找到岗位:{pos_value},跳过")
continue
job_ele.click(by_js=True)
label = pos_value
elif pos_type == "index":
job_eles = _get_all_position_elements(container)
if pos_value >= len(job_eles):
continue
job_eles[pos_value].click(by_js=True)
label = f"{pos_value + 1}个岗位"
time.sleep(2)
container = _get_container(page)
if filters:
_apply_filter_and_confirm(container, filters)
packets = page.listen.wait(count=2, timeout=30)
res = packets[-1] if packets else None
else:
res = page.listen.wait(timeout=30)
if not res:
print(f"岗位「{label}」未捕获到 geek/list跳过")
continue
geek_list = res.response.body.get("zpData", {}).get("geekList") or []
if not geek_list:
continue
n = _greet_geek_list_skip_greeted(page, container, geek_list, greeted_keys)
total_greeted += n
round_added += n
if n:
print(f"岗位「{label}」本包 {len(geek_list)} 人,新打招呼 {n} 人,累计 {total_greeted}" + (
f"/{greet_target}" if greet_target else ""))
if greet_target is None:
break
if total_greeted >= greet_target:
print(f"已达目标,共打招呼 {total_greeted}")
break
if round_added == 0:
print(f"本轮无新增,已打招呼 {total_greeted} 人,结束")
break
time.sleep(1)
def main1(
*,
use_local_chrome: Optional[bool] = None,
bit_api_base: Optional[str] = None,
browser_name: Optional[str] = None,
browser_id: Optional[str] = None,
timeout_sec: int = 30,
echo: bool = True,
):
use_local = USE_LOCAL_CHROME if use_local_chrome is None else use_local_chrome
if use_local:
page = _connect_local_chrome()
else:
page = _connect_bit_browser(
bit_api_base=bit_api_base,
browser_name=browser_name,
browser_id=browser_id,
)
payload = fetch_recommend_filters(page, timeout_sec=timeout_sec)
if echo:
print(payload["display_filters"])
return payload
if __name__ == "__main__":
# greet_target=50所有岗位合计打招呼 50 人;一轮不够则从第一个岗位再跑,直到够了或一轮无新增
main(filters=["初中及以下", "离职-随时到岗"], greet_target=50)
# main1()
# main2()