Files
boss_dp/worker/tasks/boss_recruit.py
ddrwode efb05ae172 haha
2026-03-03 10:20:02 +08:00

399 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
BOSS 直聘招聘任务处理器。
招聘流程与本地脚本 boss_dp/自动化.py 的 main 保持一致:
1) 监听并获取 friendList
2) 逐个点开会话并监听历史消息
3) 若历史消息中未出现“手机号/微信号”,则发送询问并执行“换微信”
"""
from __future__ import annotations
import asyncio
import json
import random
import re
import time
from typing import Any, Callable, Coroutine, Dict, List
from common.protocol import TaskType
from worker.bit_browser import BitBrowserAPI
from worker.browser_control import connect_browser
from worker.tasks.base import BaseTaskHandler
# Chat page that is loaded while listening for the friend-list response.
CHAT_INDEX_URL = "https://www.zhipin.com/web/chat/index"
# URL fragments handed to tab.listen.start() to select which responses are captured.
FRIEND_LIST_API = "wapi/zprelation/friend/getBossFriendListV2"
HISTORY_API = "wapi/zpchat/boss/historyMsg"
# Message typed into the chat editor when asking a candidate for a WeChat ID.
ASK_WECHAT_TEXT = "后续沟通会更及时,您方便留一下您的微信号吗?我这边加您。"
# Substrings looked for in history texts; when any is present the ask/exchange step is skipped.
CONTACT_KEYWORDS = ("手机号", "微信号")
# Locator for the primary button inside the "exchange WeChat" confirmation dialog.
EXCHANGE_CONFIRM_XPATH = (
"x://span[contains(text(),'确定与对方交换微信吗?')]/../div[@class='btn-box']/"
"span[contains(@class,'boss-btn-primary')]"
)
# Fallback locator tried when EXCHANGE_CONFIRM_XPATH finds nothing.
# NOTE(review): as seen here this is byte-identical to EXCHANGE_CONFIRM_XPATH;
# presumably one of the two originally used a full-width question mark — confirm
# against the repository before relying on the fallback.
EXCHANGE_CONFIRM_XPATH_ASCII = (
"x://span[contains(text(),'确定与对方交换微信吗?')]/../div[@class='btn-box']/"
"span[contains(@class,'boss-btn-primary')]"
)
class BossRecruitHandler(BaseTaskHandler):
    """Task handler that automates recruiting conversations on BOSS Zhipin."""
    # Task-type identifier for this handler, taken from TaskType.BOSS_RECRUIT.
    task_type = TaskType.BOSS_RECRUIT.value
async def execute(
self,
task_id: str,
params: Dict[str, Any],
progress_cb: Callable[[str, str], Coroutine],
) -> Any:
"""
执行 BOSS 招聘流程。
params:
- job_title: str 招聘岗位名称(用于结果展示)
- max_greet: int 最大处理会话数(默认 5
- account_name: str 比特浏览器窗口名(用于打开浏览器)
- account_id: str 比特浏览器窗口 ID可选优先级高于 name
- bit_api_base: str 比特浏览器 API 地址(可选)
"""
job_title = params.get("job_title", "相关岗位")
max_greet = params.get("max_greet", 5)
account_name = params.get("account_name", "")
account_id = params.get("account_id", "")
bit_api_base = params.get("bit_api_base", "http://127.0.0.1:54345")
await progress_cb(task_id, "正在打开比特浏览器...")
result = await asyncio.get_event_loop().run_in_executor(
None,
self._run_sync,
task_id,
job_title,
max_greet,
account_name,
account_id,
bit_api_base,
progress_cb,
)
return result
def _run_sync(
    self,
    task_id: str,
    job_title: str,
    max_greet: int,
    account_name: str,
    account_id: str,
    bit_api_base: str,
    progress_cb: Callable,
) -> dict:
    """Drive the browser automation synchronously (runs in a worker thread).

    Opens the browser window through the BitBrowser API, attaches to it,
    runs the recruiting flow on the latest tab and folds the outcome into
    a summary dict. ``task_id`` and ``progress_cb`` are accepted but not
    used here.
    """
    del task_id, progress_cb  # intentionally unused

    api = BitBrowserAPI(bit_api_base)
    addr, port = api.get_browser_for_drission(
        browser_id=account_id or None,
        name=account_name or None,
    )
    self.logger.info("已打开浏览器, CDP: %s (port=%d)", addr, port)

    tab = connect_browser(port=port).latest_tab
    outcome = self._recruit_flow_like_script(tab, job_title, max_greet)
    details = outcome["details"]
    errors = outcome["errors"]

    def distinct(field: str) -> set:
        # Unique, non-blank values of one field across all processed records.
        stripped = (str(entry.get(field, "")).strip() for entry in details)
        return {value for value in stripped if value}

    summary = {
        "job_title": job_title,
        "total_processed": len(details),
        "wechat_collected": len(distinct("wechat")),
        "phone_collected": len(distinct("phone")),
        "details": details,
        "error_count": len(errors),
        "errors": errors[:20],  # only the first 20 errors are reported
        "success": not errors,
    }
    if errors:
        summary["error"] = f"招聘流程出现 {len(errors)} 处错误"
    return summary
def _recruit_flow_like_script(self, tab, job_title: str, max_greet: int) -> dict:
"""按 boss_dp/自动化.py 的 main 流程执行。"""
collected: List[dict] = []
errors: List[str] = []
try:
friend_list = self._open_chat_and_fetch_friend_list(tab)
except Exception as e:
err = f"获取 friendList 失败: {e}"
self.logger.error(err)
return {"details": collected, "errors": [err]}
if not friend_list:
return {"details": collected, "errors": ["未拿到 friendList"]}
total = len(friend_list)
if isinstance(max_greet, int) and max_greet > 0:
total = min(total, max_greet)
self.logger.info("friendList 总数=%d,本次处理=%d", len(friend_list), total)
for i, friend in enumerate(friend_list[:total], start=1):
try:
name = str(friend.get("name", "")).strip() or f"候选人{i}"
friend_job_name = str(friend.get("jobName", "")).strip()
friend_job_id = str(friend.get("jobId", "")).strip()
tab.listen.start(HISTORY_API)
if not self._open_friend_chat_like_script(tab, name):
errors.append(f"[{name}] 未找到联系人或点击失败")
continue
messages = self._wait_history_messages(tab)
has_contact_keyword = self._has_contact_keyword(messages)
action_state = {
"asked_wechat": False,
"send_success": False,
"exchange_clicked": False,
"exchange_confirmed": False,
}
if not has_contact_keyword:
action_state = self._ask_and_exchange_wechat_like_script(tab)
contacts = self._extract_contacts(messages)
collected.append(
{
"name": name,
"job": friend_job_name or job_title,
"job_id": friend_job_id,
"wechat": contacts["wechat"],
"phone": contacts["phone"],
"has_contact_keyword": has_contact_keyword,
**action_state,
}
)
except Exception as e:
err_msg = f"处理第 {i} 个会话出错: {e}"
self.logger.error(err_msg)
errors.append(err_msg)
return {"details": collected, "errors": errors}
def _open_chat_and_fetch_friend_list(self, tab, timeout: float = 30.0) -> list:
    """Open the chat page and return the friend list from the captured response.

    Args:
        tab: Browser tab object exposing ``listen`` and ``get``.
        timeout: Maximum seconds to wait for the friend-list response.
            Without a bound, a missing response would block the worker
            thread indefinitely.

    Returns:
        The ``zpData.friendList`` list from the response body, or ``[]``
        when no packet arrived in time or the payload has another shape.
    """
    tab.listen.start(FRIEND_LIST_API)
    tab.get(CHAT_INDEX_URL)
    # A timed-out wait yields a falsy value, which _packet_body maps to {}.
    packet = tab.listen.wait(timeout=timeout)
    body = self._packet_body(packet)
    zp_data = body.get("zpData", {}) if isinstance(body, dict) else {}
    friend_list = zp_data.get("friendList", []) if isinstance(zp_data, dict) else []
    return friend_list if isinstance(friend_list, list) else []
@staticmethod
def _xpath_literal(value: str) -> str:
if "'" not in value:
return f"'{value}'"
if '"' not in value:
return f'"{value}"'
parts = value.split("'")
return "concat(" + ", \"'\", ".join(f"'{part}'" for part in parts) + ")"
def _open_friend_chat_like_script(self, tab, name: str) -> bool:
name_selector = f"x://span[text()={self._xpath_literal(name)}]"
ele = tab.ele(name_selector, timeout=2)
if not ele:
return False
try:
ele.run_js("this.scrollIntoView({block: 'center', behavior: 'auto'})")
except Exception:
pass
time.sleep(0.8)
try:
clickable = tab.ele(name_selector, timeout=2)
if not clickable:
return False
clickable.click(by_js=True)
return True
except Exception:
return False
def _wait_history_messages(self, tab) -> list:
packet = tab.listen.wait()
body = self._packet_body(packet)
zp_data = body.get("zpData", {}) if isinstance(body, dict) else {}
messages = zp_data.get("messages", []) if isinstance(zp_data, dict) else []
if isinstance(messages, list):
return messages
return []
def _ask_and_exchange_wechat_like_script(self, tab) -> dict:
state = {
"asked_wechat": False,
"send_success": False,
"exchange_clicked": False,
"exchange_confirmed": False,
}
input_box = tab.ele('x://*[@id="boss-chat-editor-input"]', timeout=2)
if not input_box:
return state
input_box.input(ASK_WECHAT_TEXT)
state["asked_wechat"] = True
time.sleep(random.randint(1, 3) + random.random())
state["send_success"] = self._click_send_like_script(tab)
time.sleep(random.randint(1, 5) + random.random())
state["exchange_clicked"] = self._click_change_wechat_like_script(tab)
if state["exchange_clicked"]:
time.sleep(random.randint(1, 2) + random.random())
state["exchange_confirmed"] = self._click_exchange_confirm_like_script(tab)
return state
def _click_send_like_script(self, tab) -> bool:
for selector in ('x://div[text()="发送"]', 'x://span[text()="发送"]'):
try:
btn = tab.ele(selector, timeout=1)
if btn:
btn.click()
return True
except Exception:
continue
return False
def _click_change_wechat_like_script(self, tab) -> bool:
try:
btn = tab.ele('x://span[text()="换微信"]', timeout=1)
if not btn:
return False
btn.click()
return True
except Exception:
return False
def _click_exchange_confirm_like_script(self, tab) -> bool:
    """Click the primary button of the exchange-WeChat confirmation dialog.

    The main locator is tried first and the alternate one only when the
    first finds nothing. Returns True when a button was clicked, False
    when none was found or the lookup/click raised.
    """
    try:
        button = tab.ele(EXCHANGE_CONFIRM_XPATH, timeout=2) or tab.ele(
            EXCHANGE_CONFIRM_XPATH_ASCII, timeout=2
        )
        if button:
            button.click(by_js=True)
            return True
    except Exception:
        pass
    return False
@staticmethod
def _packet_body(packet) -> dict:
if not packet:
return {}
response = getattr(packet, "response", None)
body = getattr(response, "body", None) if response is not None else None
if isinstance(body, dict):
return body
if isinstance(body, str):
try:
parsed = json.loads(body)
if isinstance(parsed, dict):
return parsed
except Exception:
return {}
return {}
def _history_texts(self, messages: list) -> list[str]:
texts: list[str] = []
for msg in messages:
if not isinstance(msg, dict):
continue
body = msg.get("body")
text = ""
if isinstance(body, dict):
maybe_text = body.get("text")
if isinstance(maybe_text, str):
text = maybe_text
elif isinstance(msg.get("text"), str):
text = str(msg.get("text"))
text = text.strip()
if text:
texts.append(text)
return texts
def _has_contact_keyword(self, messages: list) -> bool:
    """Return True when any history text contains one of CONTACT_KEYWORDS."""
    return any(
        keyword in text
        for text in self._history_texts(messages)
        for keyword in CONTACT_KEYWORDS
    )
def _extract_contacts(self, messages: list) -> dict:
wechats: list[str] = []
phones: list[str] = []
for text in self._history_texts(messages):
wechats.extend(self._extract_wechat(text))
phones.extend(self._extract_phone(text))
wechats = list(dict.fromkeys(wechats))
phones = list(dict.fromkeys(phones))
return {
"wechat": wechats[0] if wechats else "",
"phone": phones[0] if phones else "",
}
@staticmethod
def _extract_wechat(text: str) -> list:
if not text or not text.strip():
return []
found = []
patterns = [
r"微信号[:\s]*([a-zA-Z0-9_\-]{6,20})",
r"微信[:\s]*([a-zA-Z0-9_\-]{6,20})",
r"wx[:\s]*([a-zA-Z0-9_\-]{6,20})",
r"wechat[:\s]*([a-zA-Z0-9_\-]{6,20})",
r"([a-zA-Z][a-zA-Z0-9_\-]{5,19})",
]
for pattern in patterns:
for match in re.finditer(pattern, text, re.IGNORECASE):
value = match.group(1).strip() if match.lastindex else match.group(0).strip()
if value and value not in found and len(value) >= 6:
found.append(value)
return found[:3]
@staticmethod
def _extract_phone(text: str) -> list:
if not text or not text.strip():
return []
found = []
raw_candidates = re.findall(r"1[3-9][\d\-\s]{9,15}", text)
for raw in raw_candidates:
digits = re.sub(r"\D", "", raw)
if len(digits) == 11 and digits.startswith("1") and digits not in found:
found.append(digits)
return found[:3]