Files
boss_dp/worker/tasks/boss_recruit.py
ddrwode 6c31e334a3 haha
2026-03-06 13:37:58 +08:00

529 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
BOSS recruit task handler — aligned with `1.py -> main`.

Flow:
1) Connect to BitBrowser using browser_name (= account_name, injected by server)
2) Open the recommend page and start listening for the geek/list API
3) Iterate positions (by name, by index, or the currently selected one)
4) Apply the selected filters (optional — skipped when empty)
5) Capture the geek list from the intercepted API response
6) Greet each candidate (scroll, click, greet, favourite, send message, close)
7) Repeat rounds until the greet target is reached or a full round greets nobody new
"""
from __future__ import annotations
import asyncio
import json
import random
import time
from datetime import datetime
from typing import Any, Callable, Coroutine, Dict, List, Optional, Tuple
from common.protocol import TaskType
from worker.bit_browser import BitBrowserAPI
from worker.tasks.base import BaseTaskHandler, TaskCancelledError
# Page hosting the recommended-candidate list.
RECOMMEND_URL: str = "https://www.zhipin.com/web/chat/recommend"

# URL fragment of the API whose response carries the candidate list; used as
# the network-listener target.
GEEK_LIST_API: str = "wapi/zpjob/rec/geek/list"

# Message typed into the quick-reply box after greeting a candidate.
FAST_REPLY_TEXT: str = "您好,我们目前有相关岗位机会,方便了解一下吗?"

# Candidate XPath selectors for the job/position list, tried in order by the
# handler. Keep these consistent with the 1.py logic.
JOB_LIST_SELECTORS: List[str] = [
    "x://*[contains(@class,'job-item') or contains(@class,'position-item')]",
    "x://li[contains(@class,'job')]",
    "x://div[contains(@class,'job-list')]/div",
    "x://ul[contains(@class,'job')]/li",
    "x://*[contains(@class,'recommend-job')]//*[contains(@class,'item')]",
    "x://*[contains(@class,'job-list')]/*",
    "x://a[contains(@href,'job')]",
]
class BossRecruitHandler(BaseTaskHandler):
    """Recruit by greeting candidates from the BOSS recommend list.

    The blocking browser automation runs in the default thread-pool executor
    (see ``execute``); cancellation is polled with ``ensure_not_cancelled``
    between steps.
    """

    task_type = TaskType.BOSS_RECRUIT.value

    # Fallback BitBrowser local API endpoint when the task gives none (or a blank one).
    _DEFAULT_BIT_API_BASE = "http://127.0.0.1:54345"

    async def execute(
        self,
        task_id: str,
        params: Dict[str, Any],
        progress_cb: Callable[[str, str], Coroutine],
    ) -> Any:
        """Parse task params, report start, and run the sync flow in an executor.

        Args:
            task_id: Identifier passed back to ``progress_cb``.
            params: Task parameters. Reads ``job_title``, ``account_name``,
                ``bit_api_base``, ``_cancel_event``, ``worker_id``,
                ``selected_filters``/``filters``, ``greet_target`` and
                ``position_names``.
            progress_cb: Awaitable progress reporter ``(task_id, message)``.

        Returns:
            The result dict built by ``_run_sync``.
        """
        job_title = str(params.get("job_title", "") or "").strip() or "相关岗位"
        # account_name is actually the BitBrowser profile name (the server
        # injects it from BossAccount.browser_name).
        account_name = str(params.get("account_name", "") or "").strip()
        # A missing or blank endpoint falls back to the local default rather than "".
        bit_api_base = (
            str(params.get("bit_api_base") or "").strip() or self._DEFAULT_BIT_API_BASE
        )
        cancel_event = params.get("_cancel_event")
        worker_id = str(params.get("worker_id", "") or "").strip()
        # Filters are optional; an empty list skips the filter step (as in 1.py).
        selected_filters = self._normalize_selected_filters(params)
        greet_target = self._parse_positive_int(params.get("greet_target"), default=20)
        position_names = self._normalize_string_list(params.get("position_names"))

        self.ensure_not_cancelled(cancel_event)
        await progress_cb(task_id, f"正在启动招聘流程,目标打招呼人数: {greet_target}")

        # get_running_loop() is the correct call from inside a coroutine.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            self._run_sync,
            job_title,
            account_name,
            bit_api_base,
            selected_filters,
            position_names,
            greet_target,
            worker_id,
            cancel_event,
        )

    # ────────────────────── synchronous main flow ──────────────────────

    def _run_sync(
        self,
        job_title: str,
        account_name: str,
        bit_api_base: str,
        selected_filters: List[str],
        position_names: List[str],
        greet_target: int,
        worker_id: str,
        cancel_event,
    ) -> Dict[str, Any]:
        """Connect to the browser, run the recruit loop, and build the result dict.

        ``success`` is true only when no errors were recorded; at most the
        first 20 error messages are returned.
        """
        self.ensure_not_cancelled(cancel_event)
        # Same as 1.py: connect directly with ChromiumPage + set_local_port.
        page = self._connect_browser(bit_api_base, account_name)
        try:
            flow = self._run_main_flow(
                page=page,
                job_title=job_title,
                selected_filters=selected_filters,
                position_names=position_names,
                greet_target=greet_target,
                worker_id=worker_id,
                account_name=account_name,
                cancel_event=cancel_event,
            )
        finally:
            # The listener is started inside _run_main_flow; release it on
            # every exit path (including cancellation).
            self._stop_listening(page)

        errors = flow.get("errors", [])
        return {
            "job_title": job_title,
            "target_greet_count": greet_target,
            "actual_greeted": flow.get("actual_greeted", 0),
            "rounds": flow.get("rounds", 0),
            "selected_filters": selected_filters,
            "positions": flow.get("positions", []),
            "details": flow.get("details", []),
            "contact_records_created": flow.get("contact_records_created", 0),
            "error_count": len(errors),
            "errors": errors[:20],
            "success": len(errors) == 0,
        }

    @staticmethod
    def _stop_listening(page) -> None:
        """Best-effort stop of the page's network listener."""
        try:
            page.listen.stop()
        except Exception:
            # Cleanup must never mask the flow's own result or exception.
            pass

    @staticmethod
    def _connect_browser(bit_api_base: str, browser_name: str):
        """Open the BitBrowser profile and attach a ChromiumPage to its debug port.

        Mirrors ``_connect_bit_browser`` in 1.py: open the browser through
        BitBrowserAPI, then connect with ChromiumPage + set_local_port.
        """
        from DrissionPage import ChromiumOptions, ChromiumPage

        bit_api = BitBrowserAPI(bit_api_base)
        _cdp_addr, port, _browser_id = bit_api.open_browser(name=browser_name)
        co = ChromiumOptions().set_local_port(port=port)
        return ChromiumPage(addr_or_opts=co)

    # ────────────────────── main recruit loop (aligned with 1.py main) ──────────────────────

    def _run_main_flow(
        self,
        *,
        page,
        job_title: str,
        selected_filters: List[str],
        position_names: List[str],
        greet_target: int,
        worker_id: str,
        account_name: str,
        cancel_event,
    ) -> Dict[str, Any]:
        """Greet candidates position by position, in rounds, until done.

        A round visits every position once. The loop stops when
        ``greet_target`` is reached or a full round greets nobody new.
        Per-position failures are recorded in ``errors`` instead of aborting
        the whole task; cancellation always propagates.
        """
        details: List[dict] = []
        errors: List[str] = []

        def record_error(message: str) -> None:
            # Later rounds revisit the same failing position; keep one copy.
            if message not in errors:
                errors.append(message)

        self.ensure_not_cancelled(cancel_event)

        # 1) Start listening for the geek/list API and open the recommend page (as in 1.py).
        page.listen.start(GEEK_LIST_API)
        page.get(RECOMMEND_URL)
        time.sleep(2)

        container = self._get_container(page)
        positions = self._build_positions(container, position_names)
        if not positions:
            return {
                "actual_greeted": 0,
                "rounds": 0,
                "positions": [],
                "details": [],
                "contact_records_created": 0,
                "errors": ["未识别到岗位列表,无法开始招聘"],
            }

        greeted_keys: set[str] = set()
        total_greeted = 0
        round_num = 0
        contact_records_created = 0

        while True:
            self.ensure_not_cancelled(cancel_event)
            round_num += 1
            round_added = 0

            for pos_type, pos_value in positions:
                self.ensure_not_cancelled(cancel_event)
                remaining = greet_target - total_greeted
                if remaining <= 0:
                    break

                container = self._get_container(page)
                label = self._activate_position(container, pos_type, pos_value)
                if not label:
                    record_error(f"切换岗位失败: {self._position_label(pos_type, pos_value)}")
                    continue
                time.sleep(2)
                container = self._get_container(page)

                # 2) Capture the geek/list packet. A filter/capture failure on one
                # position should not abort the remaining positions.
                try:
                    geek_list = self._load_and_extract_geek_list(
                        page, container, selected_filters
                    )
                except TaskCancelledError:
                    raise
                except Exception as exc:
                    record_error(f"{label}: 获取牛人列表失败: {exc}")
                    continue
                if not geek_list:
                    continue

                # 3) Greet one by one, never past the remaining quota.
                added, new_records = self._greet_geek_list(
                    page=page,
                    container=container,
                    geek_list=geek_list,
                    greeted_keys=greeted_keys,
                    position_label=label,
                    default_job=job_title,
                    cancel_event=cancel_event,
                    limit=remaining,
                )
                if not added:
                    continue
                total_greeted += added
                round_added += added
                details.extend(new_records)

                # Greetings were already sent; a DB failure is reported, not fatal.
                try:
                    contact_records_created += self._save_new_greet_contacts(
                        records=new_records,
                        worker_id=worker_id,
                        account_name=account_name,
                    )
                except Exception as exc:
                    record_error(f"{label}: 保存联系记录失败: {exc}")

            # Exit conditions (same as 1.py).
            if total_greeted >= greet_target:
                break
            if round_added == 0:
                break
            time.sleep(1)

        return {
            "actual_greeted": total_greeted,
            "rounds": round_num,
            "positions": [self._position_label(t, v) for t, v in positions],
            "details": details,
            "contact_records_created": contact_records_created,
            "errors": errors,
        }

    # ────────────────────── container & positions ──────────────────────

    @staticmethod
    def _get_container(page):
        """Return the ``recommendFrame`` iframe holding the list, or ``page`` if lookup raises."""
        try:
            return page.get_frame("recommendFrame")
        except Exception:
            return page

    def _get_all_position_elements(self, container) -> List[Any]:
        """Return elements of the first selector that matches 1-100 nodes, else []."""
        for selector in JOB_LIST_SELECTORS:
            try:
                eles = container.eles(selector, timeout=2)
                if eles and 1 <= len(eles) <= 100:
                    return eles
            except Exception:
                continue
        return []

    def _build_positions(self, container, position_names: List[str]) -> List[Tuple[str, Any]]:
        """Build ``(type, value)`` position descriptors.

        Explicit names become ``("name", name)``; otherwise each detected job
        element becomes ``("index", i)``; with none detected, fall back to
        ``("current", None)``.
        """
        if position_names:
            return [("name", name) for name in position_names]
        job_eles = self._get_all_position_elements(container)
        if not job_eles:
            return [("current", None)]
        return [("index", i) for i in range(len(job_eles))]

    @staticmethod
    def _position_label(pos_type: str, pos_value: Any) -> str:
        """Human-readable label for a position descriptor."""
        if pos_type == "name":
            return str(pos_value)
        if pos_type == "index":
            return f"岗位{int(pos_value) + 1}"
        return "当前岗位"

    @staticmethod
    def _xpath_literal(value: Any, quote: str = '"') -> str:
        """Quote ``value`` as an XPath 1.0 string literal.

        Uses ``quote`` when possible so ordinary values produce exactly the
        same selector as plain interpolation; falls back to the other quote,
        and to ``concat()`` when the value contains both kinds of quotes.
        """
        text = str(value)
        other = "'" if quote == '"' else '"'
        for q in (quote, other):
            if q not in text:
                return f"{q}{text}{q}"
        # Both quote kinds present: split on '"' and rejoin with a quoted '"'.
        pieces = text.split('"')
        return "concat(" + ", '\"', ".join(f'"{p}"' for p in pieces) + ")"

    def _activate_position(self, container, pos_type: str, pos_value: Any) -> str:
        """Switch to the given position; return its label, or "" on failure.

        Uses click(by_js=True) as in 1.py. ``("current", None)`` performs no
        click and always succeeds.
        """
        label = self._position_label(pos_type, pos_value)
        try:
            if pos_type == "name":
                literal = self._xpath_literal(pos_value)
                ele = container.ele(f"x://*[contains(text(),{literal})]", timeout=5)
                if not ele:
                    return ""
                ele.click(by_js=True)
            elif pos_type == "index":
                job_eles = self._get_all_position_elements(container)
                if int(pos_value) >= len(job_eles):
                    return ""
                job_eles[int(pos_value)].click(by_js=True)
            return label
        except Exception:
            return ""

    # ────────────────────── filters & packet capture ──────────────────────

    def _apply_filter_and_confirm(self, container, selected_filters: List[str]) -> None:
        """Open the filter panel, click each filter option, then confirm (as in 1.py).

        Raises whatever the element lookups/clicks raise when an option is missing.
        """
        container.ele("x://*[contains(text(),'筛选')]").click()
        time.sleep(2)
        for item in selected_filters:
            literal = self._xpath_literal(item, quote="'")
            container.ele(f"x://*[contains(text(),{literal})]").click()
            time.sleep(random.random() * 1.5)
        container.ele("x://*[contains(text(),'确定')]").click()

    def _load_and_extract_geek_list(
        self, page, container, selected_filters: List[str]
    ) -> List[dict]:
        """Wait for the geek/list packet and return its candidates.

        Same capture logic as 1.py:
        - with filters: apply them, ``wait(count=2)``, use the last packet;
        - without filters: ``wait()`` and use the first packet.
        Returns [] on timeout.
        """
        # NOTE(review): listen.wait() pops queued packets in arrival order, so a
        # packet left over from an earlier load/switch may be returned here
        # instead of the one for the position just activated — confirm
        # against 1.py before relying on strict per-position alignment.
        if selected_filters:
            self._apply_filter_and_confirm(container, selected_filters)
            packets = page.listen.wait(count=2, timeout=30)
            res = packets[-1] if packets else None
        else:
            res = page.listen.wait(timeout=30)
        if not res:
            return []
        return self._extract_geek_list_from_packet(res)

    @staticmethod
    def _extract_geek_list_from_packet(packet) -> List[dict]:
        """Extract ``zpData.geekList`` from a captured packet.

        Accepts a dict body or a JSON str/bytes body. Returns [] for missing,
        unparsable, or malformed payloads (including ``"zpData": null``), and
        drops non-dict list entries so downstream ``.get`` calls are safe.
        """
        if not packet:
            return []
        body = getattr(getattr(packet, "response", None), "body", None)
        if isinstance(body, (str, bytes, bytearray)):
            try:
                body = json.loads(body)
            except ValueError:
                return []
        if not isinstance(body, dict):
            return []
        zp_data = body.get("zpData")
        if not isinstance(zp_data, dict):
            return []
        geek_list = zp_data.get("geekList")
        if not isinstance(geek_list, list):
            return []
        return [item for item in geek_list if isinstance(item, dict)]

    # ────────────────────── greeting (aligned with 1.py) ──────────────────────

    @staticmethod
    def _geek_key(item: dict) -> str:
        """De-duplication key for a candidate (same as 1.py ``_geek_key``).

        Prefers ``encryptGeekId``, then ``geekId``, then ``geekName``; "" if none.
        """
        card = item.get("geekCard") or {}
        return str(
            card.get("encryptGeekId")
            or card.get("geekId")
            or card.get("geekName")
            or ""
        ).strip()

    def _greet_geek_list(
        self,
        *,
        page,
        container,
        geek_list: List[dict],
        greeted_keys: set[str],
        position_label: str,
        default_job: str,
        cancel_event,
        limit: Optional[int] = None,
    ) -> Tuple[int, List[dict]]:
        """Greet candidates not yet in ``greeted_keys`` (as 1.py ``_greet_geek_list_skip_greeted``).

        Args:
            limit: Stop after this many successful greetings; ``None`` means
                greet the whole list.

        Returns:
            ``(added, records)`` — count of successful greetings and one
            detail dict per greeted candidate. ``greeted_keys`` is updated in place.
        """
        added = 0
        records: List[dict] = []
        for item in geek_list or []:
            self.ensure_not_cancelled(cancel_event)
            if limit is not None and added >= limit:
                break
            geek_key = self._geek_key(item)
            if not geek_key or geek_key in greeted_keys:
                continue
            if not self._greet_one_geek(page, container, item):
                continue
            greeted_keys.add(geek_key)
            added += 1
            card = item.get("geekCard") or {}
            name = str(card.get("geekName", "")).strip() or geek_key
            position = str(card.get("expectPositionName", "")).strip() or position_label or default_job
            records.append(
                {
                    "name": name,
                    "position": position,
                    "geek_key": geek_key,
                    "source": "new_greet",
                    "greeted_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                }
            )
        return added, records

    def _greet_one_geek(self, page, container, item: dict) -> bool:
        """Greet a single candidate; return True only if every step succeeded.

        Mirrors 1.py ``_greet_one_geek``:
        1. find the name → scrollIntoView → click it
        2. get frame(1) (the right-hand panel)
        3. click "打招呼" (greet)
        4. click "收藏" (favourite)
        5. click the btn-outline button 3 times
        6. type the quick reply → send
        7. close the side bar and the popup
        """
        geek_name = str((item.get("geekCard") or {}).get("geekName", "")).strip()
        if not geek_name:
            return False
        name_literal = self._xpath_literal(geek_name)
        try:
            # Two-step lookup as in 1.py: substring match first, then exact text.
            name_ele = container.ele(f"x://span[contains(text(),{name_literal})]", timeout=5)
            if not name_ele:
                name_ele = container.ele(f"x://span[text()={name_literal}]", timeout=2)
            if not name_ele:
                return False
            name_ele.run_js("this.scrollIntoView()")
            name_ele.click()
            time.sleep(3)

            panel = page.get_frame(1)
            time.sleep(random.uniform(0.5, 1.2))

            panel.ele('x://*[contains(text(),"打招呼")]', timeout=2).click(by_js=True)
            time.sleep(random.uniform(0.5, 1.2))

            panel.ele('x://*[contains(text(),"收藏")]', timeout=2).click(by_js=True)
            time.sleep(random.uniform(0.5, 1.2))

            for _ in range(3):
                panel.ele('x://*[@class="btn-v2 btn-outline-v2"]', timeout=2).click(by_js=True)
                time.sleep(random.uniform(0.5, 1.2))

            page.ele('x://*[@data-placeholder="快速回复"]', timeout=2).input(FAST_REPLY_TEXT)
            page.ele('x://*[contains(text(),"发送")]', timeout=2).click()
            time.sleep(random.uniform(0.5, 1.2))

            page.ele('x://*[@class="iboss iboss-close"]').click()
            time.sleep(random.uniform(0.5, 1.2))

            panel.ele('x://*[@class="boss-popup__close"]').click()
            time.sleep(random.uniform(0.5, 1.2))
            return True
        except Exception:
            # NOTE(review): a failure after step 3 may leave the panel open and
            # the candidate greeted but not recorded; confirm whether a
            # best-effort close is wanted here.
            return False

    # ────────────────────── persist contact records ──────────────────────

    def _save_new_greet_contacts(self, *, records: List[dict], worker_id: str, account_name: str) -> int:
        """Create ContactRecord rows for newly greeted candidates.

        Existing records (keyed by ``greet:<geek_key>``) are NOT overwritten,
        so a later reply status / WeChat flag survives a re-greet; only a
        missing ``contacted_at`` is back-filled.

        Returns:
            Number of rows newly created.
        """
        if not records:
            return 0
        from django.utils import timezone
        from server.models import ContactRecord

        created = 0
        for rec in records:
            geek_key = str(rec.get("geek_key", "")).strip()
            if not geek_key:
                continue
            contact_key = f"greet:{geek_key}"
            now = timezone.now()
            defaults = {
                "name": str(rec.get("name", "")).strip() or contact_key,
                "position": str(rec.get("position", "")).strip(),
                "reply_status": "新打招呼",
                "wechat_exchanged": False,
                "worker_id": worker_id,
                "contacted_at": now,
                "notes": f"新打招呼记录; account={account_name}",
            }
            # get_or_create (not update_or_create): defaults apply only on insert.
            obj, was_created = ContactRecord.objects.get_or_create(
                contact=contact_key,
                defaults=defaults,
            )
            if was_created:
                created += 1
            elif not obj.contacted_at:
                obj.contacted_at = now
                obj.save(update_fields=["contacted_at"])
        return created

    # ────────────────────── utilities ──────────────────────

    @staticmethod
    def _normalize_string_list(value: Any) -> List[str]:
        """Coerce a list, a JSON-array string, or a plain string into a list of non-empty stripped strings."""
        if value is None:
            return []
        if isinstance(value, list):
            return [str(v).strip() for v in value if str(v).strip()]
        if isinstance(value, str):
            raw = value.strip()
            if not raw:
                return []
            if raw.startswith("[") and raw.endswith("]"):
                try:
                    parsed = json.loads(raw)
                    if isinstance(parsed, list):
                        return [str(v).strip() for v in parsed if str(v).strip()]
                except Exception:
                    pass
            return [raw]
        return []

    def _normalize_selected_filters(self, params: Dict[str, Any]) -> List[str]:
        """Read ``selected_filters`` (falling back to ``filters``) as a string list."""
        filters = params.get("selected_filters")
        if filters is None:
            filters = params.get("filters")
        return self._normalize_string_list(filters)

    @staticmethod
    def _parse_positive_int(value: Any, default: int) -> int:
        """Parse ``value`` as a positive int, returning ``default`` otherwise."""
        try:
            parsed = int(value)
            return parsed if parsed > 0 else default
        except Exception:
            return default