# -*- coding: utf-8 -*- """ BOSS 直聘招聘任务处理器。 招聘流程与本地脚本 boss_dp/自动化.py 的 main 保持一致: 1) 监听并获取 friendList 2) 逐个点开会话并监听历史消息 3) 若历史消息中未出现“手机号/微信号”,则发送询问并执行“换微信” """ from __future__ import annotations import asyncio import json import random import re import time from typing import Any, Callable, Coroutine, Dict, List, Optional from common.protocol import TaskType from worker.bit_browser import BitBrowserAPI from worker.browser_control import connect_browser from worker.tasks.base import BaseTaskHandler, TaskCancelledError CHAT_INDEX_URL = "https://www.zhipin.com/web/chat/index" FRIEND_LIST_API = "wapi/zprelation/friend/getBossFriendListV2" HISTORY_API = "wapi/zpchat/boss/historyMsg" ASK_WECHAT_TEXT = "后续沟通会更及时,您方便留一下您的微信号吗?我这边加您。" CONTACT_KEYWORDS = ("手机号", "微信号") EXCHANGE_CONFIRM_XPATH = ( "x://span[contains(text(),'确定与对方交换微信吗?')]/../div[@class='btn-box']/" "span[contains(@class,'boss-btn-primary')]" ) EXCHANGE_CONFIRM_XPATH_ASCII = ( "x://span[contains(text(),'确定与对方交换微信吗?')]/../div[@class='btn-box']/" "span[contains(@class,'boss-btn-primary')]" ) class BossReplyHandler(BaseTaskHandler): """BOSS 直聘招聘自动化任务。""" task_type = TaskType.BOSS_REPLY.value async def execute( self, task_id: str, params: Dict[str, Any], progress_cb: Callable[[str, str], Coroutine], ) -> Any: """ 执行 BOSS 招聘流程。 params: - job_title: str 招聘岗位名称(用于结果展示) - account_name: str 比特浏览器窗口名(用于打开浏览器) - account_id: str 比特浏览器窗口 ID(可选,优先级高于 name) - bit_api_base: str 比特浏览器 API 地址(可选) """ job_title = params.get("job_title", "相关岗位") account_name = params.get("account_name", "") account_id = params.get("account_id", "") bit_api_base = params.get("bit_api_base", "http://127.0.0.1:54345") cancel_event = params.get("_cancel_event") self.ensure_not_cancelled(cancel_event) await progress_cb(task_id, "正在打开比特浏览器...") result = await asyncio.get_event_loop().run_in_executor( None, self._run_sync, task_id, job_title, account_name, account_id, bit_api_base, progress_cb, cancel_event, ) return result def _run_sync( self, task_id: str, job_title: str, account_name: str, account_id: str, bit_api_base: str, progress_cb: Callable, cancel_event, ) -> dict: """同步执行浏览器自动化(在线程池中运行)。""" _ = (task_id, progress_cb) self.ensure_not_cancelled(cancel_event) bit_api = BitBrowserAPI(bit_api_base) addr, port = bit_api.get_browser_for_drission( browser_id=account_id or None, name=account_name or None, ) self.logger.info("已打开浏览器, CDP: %s (port=%d)", addr, port) browser = connect_browser(port=port) tab = browser.latest_tab flow_result = self._recruit_flow_like_script(tab, job_title, cancel_event) collected = flow_result["details"] errors = flow_result["errors"] wechat_set = {str(c.get("wechat", "")).strip() for c in collected if str(c.get("wechat", "")).strip()} phone_set = {str(c.get("phone", "")).strip() for c in collected if str(c.get("phone", "")).strip()} has_errors = bool(errors) result = { "job_title": job_title, "total_processed": len(collected), "wechat_collected": len(wechat_set), "phone_collected": len(phone_set), "details": collected, "error_count": len(errors), "errors": errors[:20], "success": not has_errors, } if has_errors: result["error"] = f"招聘流程出现 {len(errors)} 处错误" return result def _recruit_flow_like_script(self, tab, job_title: str, cancel_event) -> dict: """按 boss_dp/自动化.py 的 main 流程执行。""" collected: List[dict] = [] errors: List[str] = [] self.ensure_not_cancelled(cancel_event) try: friend_list = self._open_chat_and_fetch_friend_list(tab) except Exception as e: err = f"获取 friendList 失败: {e}" self.logger.error(err) return {"details": collected, "errors": [err]} if not friend_list: return {"details": collected, "errors": ["未拿到 friendList"]} # 应用筛选条件 friend_list = self._apply_filters(friend_list) total = len(friend_list) self.logger.info("friendList 筛选后=%d,本次处理=%d", len(friend_list), total) for i, friend in enumerate(friend_list[:total], start=1): try: self.ensure_not_cancelled(cancel_event) name = str(friend.get("name", "")).strip() or f"候选人{i}" friend_job_name = str(friend.get("jobName", "")).strip() friend_job_id = str(friend.get("jobId", "")).strip() tab.listen.start(HISTORY_API) if not self._open_friend_chat_like_script(tab, name): errors.append(f"[{name}] 未找到联系人或点击失败") continue messages = self._wait_history_messages(tab) self.ensure_not_cancelled(cancel_event) # 过滤掉自己发送的消息,只保留候选人消息 filtered_messages = self._filter_my_messages(messages) has_history_message = bool(filtered_messages) has_contact_keyword = self._has_contact_keyword(filtered_messages) contacts = self._extract_contacts(filtered_messages) contact_written = bool(contacts.get("wechat") or contacts.get("phone")) contact_fallback = self._build_friend_contact_fallback(friend, name, friend_job_id) action_state = { "asked_wechat": False, "send_success": False, "exchange_clicked": False, "exchange_confirmed": False, "follow_up_attempted": False, "follow_up_sent": False, "follow_up_day": 0, "follow_up_script_id": 0, "follow_up_content": "", "got_reply": False, "extracted_contact_from_reply": False, } # 先确保存在联系人记录(即使暂时没有微信/手机号,也要入库) contact_id = self._save_contact_record( name=name, job_name=friend_job_name, contacts=contacts, action_state=action_state, contact_fallback=contact_fallback, ) # 如果候选人此前发过消息,优先走复聊配置(第1天/第2天/...) if has_history_message and contact_id: self.ensure_not_cancelled(cancel_event) follow_state = self._handle_follow_up_chat(tab, name, friend_job_name, contact_id) action_state.update(follow_state) # 没有联系方式关键词且未成功发送复聊时,走默认索要微信逻辑 if not has_contact_keyword and not action_state.get("follow_up_sent", False): self.ensure_not_cancelled(cancel_event) ask_state = self._ask_and_exchange_wechat_like_script(tab) action_state["asked_wechat"] = action_state["asked_wechat"] or ask_state.get("asked_wechat", False) action_state["send_success"] = action_state["send_success"] or ask_state.get("send_success", False) action_state["exchange_clicked"] = action_state["exchange_clicked"] or ask_state.get("exchange_clicked", False) action_state["exchange_confirmed"] = ( action_state["exchange_confirmed"] or ask_state.get("exchange_confirmed", False) ) panel_texts = self._collect_chat_panel_texts(tab) contacts = self._extract_contacts(filtered_messages, extra_texts=panel_texts) contact_written = bool(contacts["wechat"] or contacts["phone"]) if has_contact_keyword and not contact_written: self.logger.warning( "[%s] 历史消息含联系方式关键词,但未提取到有效联系方式,疑似识别失败", name, ) # 二次更新联系人记录(补充回复状态/是否交换微信/提取到的新联系方式) contact_id = self._save_contact_record( name=name, job_name=friend_job_name, contacts=contacts, action_state=action_state, contact_fallback=contact_fallback, ) collected.append( { "name": name, "job": friend_job_name or job_title, "job_id": friend_job_id, "wechat": contacts["wechat"], "phone": contacts["phone"], "contact_written": contact_written, "contact_recorded": bool(contact_id), "has_history_message": has_history_message, "has_contact_keyword": has_contact_keyword, **action_state, } ) except TaskCancelledError: self.logger.info("任务执行中收到取消信号,提前结束") break except Exception as e: err_msg = f"处理第 {i} 个会话出错: {e}" self.logger.error(err_msg) errors.append(err_msg) finally: if i < total: self.ensure_not_cancelled(cancel_event) self._sleep_between_sessions() self.ensure_not_cancelled(cancel_event) return {"details": collected, "errors": errors} def _open_chat_and_fetch_friend_list(self, tab) -> list: tab.listen.start(FRIEND_LIST_API) tab.get(CHAT_INDEX_URL) packet = tab.listen.wait() body = self._packet_body(packet) zp_data = body.get("zpData", {}) if isinstance(body, dict) else {} friend_list = zp_data.get("friendList", []) if isinstance(zp_data, dict) else [] if isinstance(friend_list, list): return friend_list return [] @staticmethod def _build_friend_contact_fallback(friend: dict, name: str, job_id: str) -> str: """为未提取到微信/手机号的联系人生成稳定占位 contact。""" candidates = [ friend.get("encryptGeekId"), friend.get("geekId"), friend.get("friendId"), friend.get("uid"), friend.get("encryptUid"), friend.get("securityId"), ] for value in candidates: normalized = str(value or "").strip() if normalized: return f"boss_friend:{normalized}" base = f"{name}:{job_id}".strip(":") normalized_base = re.sub(r"\s+", "", base) or name or "unknown" return f"boss_friend:{normalized_base}" @staticmethod def _xpath_literal(value: str) -> str: if "'" not in value: return f"'{value}'" if '"' not in value: return f'"{value}"' parts = value.split("'") return "concat(" + ", \"'\", ".join(f"'{part}'" for part in parts) + ")" def _open_friend_chat_like_script(self, tab, name: str) -> bool: name_selector = f"x://span[text()={self._xpath_literal(name)}]" ele = tab.ele(name_selector, timeout=2) if not ele: return False try: ele.run_js("this.scrollIntoView({block: 'center', behavior: 'auto'})") except Exception: pass time.sleep(0.8) try: clickable = tab.ele(name_selector, timeout=2) if not clickable: return False clickable.click(by_js=True) return True except Exception: return False def _wait_history_messages(self, tab) -> list: packet = tab.listen.wait() body = self._packet_body(packet) zp_data = body.get("zpData", {}) if isinstance(body, dict) else {} messages = zp_data.get("messages", []) if isinstance(zp_data, dict) else [] if isinstance(messages, list): return messages return [] @staticmethod def _sleep_between_sessions() -> None: """会话间随机停顿,降低频繁切换带来的风控风险。""" time.sleep(random.uniform(1.8, 4.2)) def _ask_and_exchange_wechat_like_script(self, tab) -> dict: state = { "asked_wechat": False, "send_success": False, "exchange_clicked": False, "exchange_confirmed": False, } input_box = tab.ele('x://*[@id="boss-chat-editor-input"]', timeout=2) if not input_box: return state try: input_box.click(by_js=True) except Exception: pass try: input_box.clear() except Exception: pass input_box.input(ASK_WECHAT_TEXT) state["asked_wechat"] = True time.sleep(random.randint(1, 3) + random.random()) state["send_success"] = self._send_with_confirm( tab, input_box=input_box, message=ASK_WECHAT_TEXT, ) time.sleep(random.randint(1, 5) + random.random()) state["exchange_clicked"] = self._click_change_wechat_like_script(tab) if state["exchange_clicked"]: time.sleep(random.randint(1, 2) + random.random()) state["exchange_confirmed"] = self._click_exchange_confirm_like_script(tab) return state def _click_send_like_script(self, tab, input_box=None) -> bool: selectors = ( 'x://div[text()="发送"]', 'x://span[text()="发送"]', 'x://button[normalize-space(.)="发送"]', 'x://*[@role="button" and normalize-space(.)="发送"]', ) for _ in range(3): for selector in selectors: try: btn = tab.ele(selector, timeout=1) if not btn: continue try: btn.click(by_js=True) except Exception: btn.click() time.sleep(0.25) return True except Exception: continue time.sleep(0.25) if input_box: try: input_box.input("\n") time.sleep(0.25) return True except Exception: pass return False def _send_with_confirm(self, tab, input_box, message: str, max_attempts: int = 2) -> bool: """发送后检查末条消息,避免因确认误判导致重复补发。""" msg = (message or "").strip() if not msg: return False for _ in range(max_attempts): clicked = self._click_send_like_script(tab, input_box=input_box) if not clicked: continue if self._confirm_last_sent_message(tab, msg): return True # 已点击发送但未在列表中确认时,不直接补发,先判断输入框是否已清空。 # 若已清空,通常表示消息已发出,只是 UI 刷新慢或识别未命中,避免重复发送。 if not self._editor_has_text(input_box, msg): return True time.sleep(0.35) sent = self._confirm_last_sent_message(tab, msg) if sent: return True self._clear_editor(input_box) return False @staticmethod def _normalize_text(text: str) -> str: return re.sub(r"\s+", "", text or "") def _editor_has_text(self, input_box, expected: str = "") -> bool: """判断输入框是否仍残留指定文本。""" expected_norm = self._normalize_text(expected) current = "" try: current = input_box.run_js("return (this.innerText || this.textContent || this.value || '').trim();") except Exception: try: current = input_box.text except Exception: current = "" current_norm = self._normalize_text(str(current)) if not current_norm: return False if not expected_norm: return True return expected_norm in current_norm def _clear_editor(self, input_box) -> None: """清空聊天输入框,避免残留预输入内容。""" try: input_box.click(by_js=True) except Exception: pass try: input_box.clear() except Exception: pass try: input_box.run_js( "if (this.isContentEditable) { this.innerHTML=''; this.textContent=''; }" ) except Exception: pass def _confirm_last_sent_message(self, tab, message: str) -> bool: """确认当前聊天窗口末条是否为刚发送内容。""" msg = (message or "").strip() if not msg: return False target = re.sub(r"\s+", "", msg) try: for _ in range(6): items = tab.eles("css:.message-item", timeout=1) if not items: time.sleep(0.2) continue for e in reversed(items[-6:]): text = (e.text or "").strip() if not text: continue normalized = re.sub(r"\s+", "", text) is_boss = False try: if e.ele("css:.item-boss", timeout=0): is_boss = True except Exception: pass if is_boss and target in normalized: return True time.sleep(0.2) except Exception: return False return False def _click_change_wechat_like_script(self, tab) -> bool: try: btn = tab.ele('x://span[text()="换微信"]', timeout=1) if not btn: return False btn.click() return True except Exception: return False def _click_exchange_confirm_like_script(self, tab) -> bool: try: confirm = tab.ele(EXCHANGE_CONFIRM_XPATH, timeout=2) if not confirm: confirm = tab.ele(EXCHANGE_CONFIRM_XPATH_ASCII, timeout=2) if not confirm: return False confirm.click(by_js=True) return True except Exception: return False @staticmethod def _packet_body(packet) -> dict: if not packet: return {} response = getattr(packet, "response", None) body = getattr(response, "body", None) if response is not None else None if isinstance(body, dict): return body if isinstance(body, str): try: parsed = json.loads(body) if isinstance(parsed, dict): return parsed except Exception: return {} return {} def _history_texts(self, messages: list) -> list[str]: texts: list[str] = [] for msg in messages: if not isinstance(msg, dict): continue body = msg.get("body") text = "" if isinstance(body, dict): maybe_text = body.get("text") if isinstance(maybe_text, str): text = maybe_text elif isinstance(msg.get("text"), str): text = str(msg.get("text")) text = text.strip() if text: texts.append(text) return texts def _collect_chat_panel_texts(self, tab, max_items: int = 80) -> list[str]: """从当前聊天面板读取可见消息文本,补充接口历史消息。""" texts: list[str] = [] try: items = tab.eles("css:.message-item", timeout=1) except Exception: return texts if not items: return texts for e in items[-max_items:]: try: text = (e.text or "").strip() except Exception: text = "" if text: texts.append(text) return texts def _has_contact_keyword(self, messages: list) -> bool: for text in self._history_texts(messages): if any(k in text for k in CONTACT_KEYWORDS): return True return False def _extract_contacts(self, messages: list, extra_texts: Optional[list[str]] = None) -> dict: wechats: list[str] = [] phones: list[str] = [] all_texts = self._history_texts(messages) if extra_texts: all_texts.extend([str(t).strip() for t in extra_texts if str(t).strip()]) for text in all_texts: wechats.extend(self._extract_wechat(text)) phones.extend(self._extract_phone(text)) wechats = list(dict.fromkeys(wechats)) phones = list(dict.fromkeys(phones)) return { "wechat": wechats[0] if wechats else "", "phone": phones[0] if phones else "", } @staticmethod def _extract_wechat(text: str) -> list: if not text or not text.strip(): return [] found = [] patterns = [ r"微信号[::\s]*([a-zA-Z0-9_\-]{6,20})", r"微信[::\s]*([a-zA-Z0-9_\-]{6,20})", r"wx[::\s]*([a-zA-Z0-9_\-]{6,20})", r"wechat[::\s]*([a-zA-Z0-9_\-]{6,20})", ] for pattern in patterns: for match in re.finditer(pattern, text, re.IGNORECASE): value = match.group(1).strip() if match.lastindex else match.group(0).strip() if value and value not in found and len(value) >= 6: found.append(value) return found[:3] @staticmethod def _extract_phone(text: str) -> list: if not text or not text.strip(): return [] found = [] raw_candidates = re.findall(r"1[3-9][\d\-\s]{9,15}", text) for raw in raw_candidates: digits = re.sub(r"\D", "", raw) if len(digits) == 11 and digits.startswith("1") and digits not in found: found.append(digits) return found[:3] def _apply_filters(self, friend_list: list) -> list: """旧筛选配置已下线,当前直接返回原始候选人列表。""" self.logger.info("旧版筛选配置已移除,reply 流程不再额外过滤候选人") return friend_list def _filter_my_messages(self, messages: list) -> list: """过滤掉自己发送的消息,只保留对方的消息。""" filtered = [] for msg in messages: if not isinstance(msg, dict): continue # from_id 为 0 表示是对方发送的消息 from_id = msg.get("fromId", 0) if from_id == 0: filtered.append(msg) return filtered def _handle_follow_up_chat(self, tab, name: str, job_name: str, contact_id: int = None) -> dict: """处理复聊管理:命中第N天话术后发送,并尝试交换微信。""" result = { "send_success": False, "exchange_clicked": False, "exchange_confirmed": False, "follow_up_attempted": False, "follow_up_sent": False, "follow_up_day": 0, "follow_up_script_id": 0, "follow_up_content": "", "got_reply": False, "extracted_contact_from_reply": False, } if not contact_id: return result try: from server.models import FollowUpConfig, FollowUpScript, FollowUpRecord from django.utils import timezone config = FollowUpConfig.objects.filter( position=job_name, is_active=True, ).first() if not config: config = FollowUpConfig.objects.filter( position="通用", is_active=True, ).first() if not config: self.logger.info("[%s] 未找到复聊配置,跳过复聊", name) return result last_record = FollowUpRecord.objects.filter( contact_id=contact_id, config_id=config.id, ).order_by("-sent_at").first() if not last_record: day_number = 1 else: hours_since_last = (timezone.now() - last_record.sent_at).total_seconds() / 3600 last_script = FollowUpScript.objects.filter(id=last_record.script_id).first() if last_script and hours_since_last < last_script.interval_hours: self.logger.info("[%s] 距离上次复聊不足 %d 小时,跳过", name, last_script.interval_hours) return result day_number = last_record.day_number + 1 script = FollowUpScript.objects.filter( config_id=config.id, day_number=day_number, is_active=True, ).order_by("order").first() if not script: script = FollowUpScript.objects.filter( config_id=config.id, day_number=0, is_active=True, ).order_by("order").first() if not script: self.logger.info("[%s] 未找到第 %d 天的复聊话术", name, day_number) return result result["follow_up_attempted"] = True result["follow_up_day"] = day_number result["follow_up_script_id"] = int(script.id) result["follow_up_content"] = script.content send_success = self._send_message(tab, script.content) if not send_success: return result result["follow_up_sent"] = True result["send_success"] = True record = FollowUpRecord.objects.create( contact_id=contact_id, config_id=config.id, script_id=script.id, day_number=day_number, content=script.content, ) time.sleep(random.uniform(0.8, 1.5)) result["exchange_clicked"] = self._click_change_wechat_like_script(tab) if result["exchange_clicked"]: time.sleep(random.uniform(0.8, 1.5)) result["exchange_confirmed"] = self._click_exchange_confirm_like_script(tab) reply_info = self._wait_for_reply(tab, script.content) if reply_info["got_reply"]: result["got_reply"] = True result["extracted_contact_from_reply"] = reply_info["has_contact"] record.got_reply = True record.reply_content = reply_info["reply_text"] record.replied_at = timezone.now() record.save() self.logger.info("[%s] 第 %d 天复聊得到回复", name, day_number) except Exception as e: self.logger.error("复聊管理失败: %s", e) return result @staticmethod def _resolve_reply_status(action_state: dict) -> str: if action_state.get("got_reply"): return "已回复" if action_state.get("follow_up_attempted"): return "复聊中" if action_state.get("asked_wechat") or action_state.get("send_success"): return "待回复" return "未回复" def _save_contact_record( self, name: str, job_name: str, contacts: dict, action_state: dict, contact_fallback: str = "", ) -> Optional[int]: """保存或更新联系人记录(无联系方式时使用占位 contact)。""" try: from server.models import ContactRecord from django.utils import timezone actual_contact = str(contacts.get("wechat") or contacts.get("phone") or "").strip() fallback_contact = str(contact_fallback or "").strip() contact_value = actual_contact or fallback_contact if not contact_value: return None existing = None if fallback_contact: existing = ContactRecord.objects.filter(contact=fallback_contact).first() if not existing and actual_contact: existing = ContactRecord.objects.filter(contact=actual_contact).first() reply_status = self._resolve_reply_status(action_state) now = timezone.now() note_parts = [ "自动回复任务", f"微信:{contacts.get('wechat', '')}", f"手机:{contacts.get('phone', '')}", f"follow_day:{action_state.get('follow_up_day', 0)}", ] notes = "; ".join(note_parts) if existing: # 已有占位 contact,拿到真实联系方式后升级 contact 字段 if ( actual_contact and existing.contact != actual_contact and not ContactRecord.objects.filter(contact=actual_contact).exclude(pk=existing.pk).exists() ): existing.contact = actual_contact elif not existing.contact: existing.contact = contact_value existing.name = name or existing.name if job_name: existing.position = job_name existing.reply_status = reply_status existing.wechat_exchanged = existing.wechat_exchanged or bool(action_state.get("exchange_confirmed")) existing.contacted_at = now existing.notes = notes existing.save( update_fields=[ "contact", "name", "position", "reply_status", "wechat_exchanged", "contacted_at", "notes", ] ) self.logger.info("更新联系人记录: %s - %s", name, existing.contact) return existing.id record = ContactRecord.objects.create( name=name, position=job_name, contact=contact_value, reply_status=reply_status, wechat_exchanged=bool(action_state.get("exchange_confirmed")), contacted_at=now, notes=notes, ) self.logger.info("保存新联系人记录: %s - %s", name, contact_value) return record.id except Exception as e: self.logger.error("保存联系人记录失败: %s", e) return None def _send_message(self, tab, message: str) -> bool: """发送消息的通用方法。""" try: input_box = tab.ele('x://*[@id="boss-chat-editor-input"]', timeout=2) if not input_box: return False try: input_box.click(by_js=True) input_box.clear() except Exception: pass input_box.input(message) time.sleep(random.uniform(1, 2)) return self._send_with_confirm(tab, input_box=input_box, message=message) except Exception as e: self.logger.error("发送消息失败: %s", e) return False def _wait_for_reply(self, tab, sent_message: str, max_wait: int = 30) -> dict: """等待对方回复并提取信息。""" result = { "got_reply": False, "has_contact": False, "reply_text": "", } try: check_interval = 3 # 每3秒检查一次 for _ in range(max_wait // check_interval): time.sleep(check_interval) # 重新获取聊天面板的消息 panel_texts = self._collect_chat_panel_texts(tab, max_items=10) # 检查最后几条消息 for text in panel_texts[-5:]: # 过滤掉我们发送的消息(包含发送的话术内容) if sent_message in text: continue # 过滤掉包含"微信号"关键词但不是真实微信号的消息 if "微信号" in text and not self._extract_wechat(text): continue # 尝试提取联系方式 wechats = self._extract_wechat(text) phones = self._extract_phone(text) if wechats or phones: result["got_reply"] = True result["has_contact"] = True result["reply_text"] = text return result # 即使没有联系方式,只要有新消息也算回复 if text and text not in [sent_message, ASK_WECHAT_TEXT]: result["got_reply"] = True result["reply_text"] = text return result except Exception as e: self.logger.error("等待回复失败: %s", e) return result