# -*- coding: utf-8 -*-
"""
BOSS recruit task handler — aligned with `1.py -> main`.

Flow:
  1) Connect to BitBrowser using browser_name (= account_name, injected by server)
  2) Open recommend page, start listening for geek/list API
  3) Iterate positions (by name, by index, or current)
  4) Apply selected filters (optional — skip if empty)
  5) Capture geek list from intercepted API response
  6) Greet each candidate (scroll, click, greet, favourite, send message, close)
  7) Repeat rounds until target count reached or no new additions
"""

from __future__ import annotations

import asyncio
import json
import random
import re
import time
from datetime import datetime
from typing import Any, Callable, Coroutine, Dict, List, Optional, Tuple

from common.protocol import TaskType
from worker.bit_browser import BitBrowserAPI
from worker.tasks.base import BaseTaskHandler, TaskCancelledError

RECOMMEND_URL = "https://www.zhipin.com/web/chat/recommend"
GEEK_LIST_API = "wapi/zpjob/rec/geek/list"
FAST_REPLY_TEXT = "您好,我们目前有相关岗位机会,方便了解一下吗?"

# Keep selectors consistent with 1.py logic.
# Candidate locators for the job/position list, tried in order; the first one
# that yields a plausible number of elements wins.
JOB_LIST_SELECTORS = [
    "x://*[contains(@class,'job-item') or contains(@class,'position-item')]",
    "x://li[contains(@class,'job')]",
    "x://div[contains(@class,'job-list')]/div",
    "x://ul[contains(@class,'job')]/li",
    "x://*[contains(@class,'recommend-job')]//*[contains(@class,'item')]",
    "x://*[contains(@class,'job-list')]/*",
    "x://a[contains(@href,'job')]",
]

# Fallback BitBrowser local API endpoint used when the task does not supply one.
DEFAULT_BIT_API_BASE = "http://127.0.0.1:54345"

# First run of digits inside an age description such as "25岁".
_AGE_DIGITS_RE = re.compile(r"(\d+)")

# (locator, lookup timeout in seconds, pause after click in seconds) for every
# close button that may be left on screen after handling a candidate.
_PANEL_CLOSE_STEPS = (
    ('x://*[@class="iboss iboss-close"]', 1, 0.3),
    ('x://*[@class="boss-popup__close"]', 1, 0.3),
    ('x://*[contains(@class,"close") and contains(@class,"popup")]', 0.5, 0.2),
    ('x://*[contains(@class,"dialog")]//button[contains(@class,"close")]', 0.5, 0.2),
)


class BossRecruitHandler(BaseTaskHandler):
    """Recruit by greeting candidates from recommend list."""

    task_type = TaskType.BOSS_RECRUIT.value

    async def execute(
        self,
        task_id: str,
        params: Dict[str, Any],
        progress_cb: Callable[[str, str], Coroutine],
    ) -> Any:
        """Parse task parameters and run the blocking recruit flow in an executor.

        Args:
            task_id: Identifier passed through to ``progress_cb``.
            params: Task parameters. Keys read here: ``job_title``,
                ``account_name``, ``bit_api_base``, ``_cancel_event``,
                ``worker_id``, ``selected_filters``/``filters``,
                ``greet_target``, ``position_names``, ``age_min``, ``age_max``.
            progress_cb: Awaitable callback receiving ``(task_id, message)``.

        Returns:
            The summary dict produced by ``_run_sync``.
        """
        job_title = str(params.get("job_title", "") or "").strip() or "相关岗位"
        # account_name doubles as the BitBrowser profile name.
        account_name = str(params.get("account_name", "") or "").strip()
        # An explicitly empty/None value falls back to the default endpoint
        # instead of silently becoming an empty base URL.
        bit_api_base = (
            str(params.get("bit_api_base") or "").strip() or DEFAULT_BIT_API_BASE
        )
        cancel_event = params.get("_cancel_event")
        worker_id = str(params.get("worker_id", "") or "").strip()

        # Filters are optional; an empty list skips the filter step entirely.
        selected_filters = self._normalize_selected_filters(params)
        greet_target = self._parse_positive_int(params.get("greet_target"), default=20)
        position_names = self._normalize_string_list(params.get("position_names"))

        # The age range is enforced in code, not through the page's filter UI.
        age_min = self._parse_optional_int(params.get("age_min"))
        age_max = self._parse_optional_int(params.get("age_max"))

        self.ensure_not_cancelled(cancel_event)

        age_desc = ""
        if age_min is not None and age_max is not None:
            age_desc = f",年龄范围: {age_min}-{age_max}岁"
        elif age_min is not None:
            age_desc = f",最小年龄: {age_min}岁"
        elif age_max is not None:
            age_desc = f",最大年龄: {age_max}岁"

        await progress_cb(
            task_id, f"正在启动招聘流程,目标打招呼人数: {greet_target}{age_desc}"
        )

        # We are inside a coroutine, so the running loop is the one to use.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            self._run_sync,
            job_title,
            account_name,
            bit_api_base,
            selected_filters,
            position_names,
            greet_target,
            worker_id,
            cancel_event,
            age_min,
            age_max,
        )

    # ────────────────────── synchronous main flow ──────────────────────

    def _run_sync(
        self,
        job_title: str,
        account_name: str,
        bit_api_base: str,
        selected_filters: List[str],
        position_names: List[str],
        greet_target: int,
        worker_id: str,
        cancel_event,
        age_min: Optional[int] = None,
        age_max: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Connect to the browser, run the recruit loop and build the task result.

        Returns:
            A summary dict; ``errors`` is truncated to the first 20 entries
            while ``error_count`` reports the full number.
        """
        self.ensure_not_cancelled(cancel_event)

        page = self._connect_browser(bit_api_base, account_name)

        flow = self._run_main_flow(
            page=page,
            job_title=job_title,
            selected_filters=selected_filters,
            position_names=position_names,
            greet_target=greet_target,
            worker_id=worker_id,
            account_name=account_name,
            cancel_event=cancel_event,
            age_min=age_min,
            age_max=age_max,
        )
        errors = flow.get("errors", [])
        return {
            "job_title": job_title,
            "target_greet_count": greet_target,
            "actual_greeted": flow.get("actual_greeted", 0),
            "rounds": flow.get("rounds", 0),
            "selected_filters": selected_filters,
            "positions": flow.get("positions", []),
            "details": flow.get("details", []),
            "contact_records_created": flow.get("contact_records_created", 0),
            "error_count": len(errors),
            "errors": errors[:20],
            "success": len(errors) == 0,
        }

    @staticmethod
    def _connect_browser(bit_api_base: str, browser_name: str):
        """Open the named BitBrowser profile and attach a ChromiumPage to its port."""
        # Imported lazily so the module can be loaded without DrissionPage.
        from DrissionPage import ChromiumOptions, ChromiumPage

        bit_api = BitBrowserAPI(bit_api_base)
        # open_browser yields three values; only the debugging port is needed.
        _cdp_addr, port, _browser_id = bit_api.open_browser(name=browser_name)

        options = ChromiumOptions().set_local_port(port=port)
        return ChromiumPage(addr_or_opts=options)

    # ────────────────────── main recruit loop ──────────────────────

    def _run_main_flow(
        self,
        *,
        page,
        job_title: str,
        selected_filters: List[str],
        position_names: List[str],
        greet_target: int,
        worker_id: str,
        account_name: str,
        cancel_event,
        age_min: Optional[int] = None,
        age_max: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Iterate positions in rounds, greeting candidates until the target is met.

        A round visits every position once. The loop ends when the target is
        reached or when a full round greets nobody new. Failures while loading
        a candidate list or persisting contact records are recorded in the
        returned ``errors`` list instead of aborting the whole task.

        Returns:
            Dict with ``actual_greeted``, ``rounds``, ``positions``,
            ``details``, ``contact_records_created`` and ``errors``.
        """
        details: List[dict] = []
        errors: List[str] = []

        self.ensure_not_cancelled(cancel_event)

        # Start listening before navigation so the first geek/list response
        # triggered by the page load is captured.
        page.listen.start(GEEK_LIST_API)
        try:
            page.get(RECOMMEND_URL)
            time.sleep(2)

            container = self._get_container(page)
            positions = self._build_positions(container, position_names)
            if not positions:
                return {
                    "actual_greeted": 0,
                    "rounds": 0,
                    "positions": [],
                    "details": [],
                    "contact_records_created": 0,
                    "errors": ["未识别到岗位列表,无法开始招聘"],
                }

            greeted_keys: set[str] = set()
            total_greeted = 0
            round_num = 0
            contact_records_created = 0

            while True:
                self.ensure_not_cancelled(cancel_event)
                round_num += 1
                round_added = 0

                for pos_type, pos_value in positions:
                    self.ensure_not_cancelled(cancel_event)
                    remaining = greet_target - total_greeted
                    if remaining <= 0:
                        break

                    container = self._get_container(page)
                    label = self._activate_position(container, pos_type, pos_value)
                    if not label:
                        continue
                    time.sleep(2)
                    # Re-resolve the frame: switching position may reload it.
                    container = self._get_container(page)

                    try:
                        geek_list = self._load_and_extract_geek_list(
                            page, container, selected_filters
                        )
                    except Exception as exc:
                        # One broken position must not discard the progress
                        # already made on the others.
                        errors.append(f"岗位「{label}」获取牛人列表失败: {exc}")
                        continue
                    if not geek_list:
                        continue

                    added, new_records = self._greet_geek_list(
                        page=page,
                        container=container,
                        geek_list=geek_list,
                        greeted_keys=greeted_keys,
                        position_label=label,
                        default_job=job_title,
                        cancel_event=cancel_event,
                        age_min=age_min,
                        age_max=age_max,
                        # Never greet more people than the target still allows.
                        limit=remaining,
                    )
                    if not added:
                        continue

                    total_greeted += added
                    round_added += added
                    details.extend(new_records)
                    try:
                        contact_records_created += self._save_new_greet_contacts(
                            records=new_records,
                            worker_id=worker_id,
                            account_name=account_name,
                        )
                    except Exception as exc:
                        # The greetings were already sent; report the
                        # persistence failure rather than losing the result.
                        errors.append(f"岗位「{label}」保存联系记录失败: {exc}")

                if total_greeted >= greet_target:
                    break
                if round_added == 0:
                    break
                time.sleep(1)

            return {
                "actual_greeted": total_greeted,
                "rounds": round_num,
                "positions": [self._position_label(t, v) for t, v in positions],
                "details": details,
                "contact_records_created": contact_records_created,
                "errors": errors,
            }
        finally:
            # Best-effort cleanup so the listener does not outlive the task.
            try:
                page.listen.stop()
            except Exception:
                pass

    # ────────────────────── container & positions ──────────────────────

    @staticmethod
    def _get_container(page):
        """Return the ``recommendFrame`` frame, or the page itself if lookup fails."""
        try:
            return page.get_frame("recommendFrame")
        except Exception:
            return page

    def _get_all_position_elements(self, container) -> List[Any]:
        """Return position elements from the first selector that finds 1-100 of them."""
        for selector in JOB_LIST_SELECTORS:
            try:
                found = container.eles(selector, timeout=2)
            except Exception:
                continue
            # An overly large match is treated as a selector that hit
            # something other than the job list.
            if found and len(found) <= 100:
                return found
        return []

    def _build_positions(
        self, container, position_names: List[str]
    ) -> List[Tuple[str, Any]]:
        """Build the ``(kind, value)`` list of positions to visit.

        Explicit names take precedence; otherwise positions are addressed by
        index in the detected job list, falling back to the currently
        selected position when no list is detected.
        """
        if position_names:
            return [("name", name) for name in position_names]

        job_eles = self._get_all_position_elements(container)
        if not job_eles:
            return [("current", None)]
        return [("index", idx) for idx, _ in enumerate(job_eles)]

    @staticmethod
    def _position_label(pos_type: str, pos_value: Any) -> str:
        """Return the human-readable label for a ``(kind, value)`` position."""
        if pos_type == "name":
            return str(pos_value)
        if pos_type == "index":
            return f"岗位{int(pos_value) + 1}"
        return "当前岗位"

    @staticmethod
    def _xpath_literal(text: Any) -> str:
        """Quote ``text`` as an XPath string literal.

        XPath 1.0 has no escape character, so a value containing both quote
        kinds is expressed with ``concat()``.
        """
        value = str(text)
        if '"' not in value:
            return f'"{value}"'
        if "'" not in value:
            return f"'{value}'"
        pieces = ", '\"', ".join(f'"{part}"' for part in value.split('"'))
        return f"concat({pieces})"

    def _activate_position(self, container, pos_type: str, pos_value: Any) -> str:
        """Switch to the given position and return its label.

        Returns an empty string when the position cannot be found or the
        click fails. A position kind other than ``name``/``index`` needs no
        click and just returns its label.
        """
        label = self._position_label(pos_type, pos_value)
        try:
            if pos_type == "name":
                literal = self._xpath_literal(pos_value)
                target = container.ele(
                    f"x://*[contains(text(),{literal})]", timeout=5
                )
                if not target:
                    return ""
                target.click(by_js=True)
            elif pos_type == "index":
                index = int(pos_value)
                job_eles = self._get_all_position_elements(container)
                if index >= len(job_eles):
                    return ""
                job_eles[index].click(by_js=True)
            return label
        except Exception:
            return ""

    # ────────────────────── filters & packet capture ──────────────────────

    def _apply_filter_and_confirm(self, container, selected_filters: List[str]) -> None:
        """Open the filter panel, click every requested option, then confirm.

        Any lookup or click failure propagates to the caller.
        """
        container.ele("x://*[contains(text(),'筛选')]").click()
        time.sleep(2)
        for option in selected_filters:
            literal = self._xpath_literal(option)
            container.ele(f"x://*[contains(text(),{literal})]").click()
            time.sleep(random.random() * 1.5)
        container.ele("x://*[contains(text(),'确定')]").click()

    def _load_and_extract_geek_list(
        self, page, container, selected_filters: List[str]
    ) -> List[dict]:
        """Wait for the geek/list response and return its candidate list.

        With filters: apply them, wait for two packets and use the last one.
        Without filters: wait for a single packet.
        Returns an empty list when nothing usable arrives within 30 seconds.
        """
        if selected_filters:
            self._apply_filter_and_confirm(container, selected_filters)
            packets = page.listen.wait(count=2, timeout=30)
            packet = packets[-1] if packets else None
        else:
            packet = page.listen.wait(timeout=30)

        if not packet:
            return []
        return self._extract_geek_list_from_packet(packet)

    @staticmethod
    def _extract_geek_list_from_packet(packet) -> List[dict]:
        """Extract ``zpData.geekList`` from a captured packet.

        Returns an empty list for a missing packet, an unparsable body, or a
        payload whose ``zpData``/``geekList`` is absent, null or mistyped.
        """
        if not packet:
            return []
        body = getattr(getattr(packet, "response", None), "body", None)
        if isinstance(body, str):
            try:
                body = json.loads(body)
            except ValueError:
                return []
        if not isinstance(body, dict):
            return []
        # "zpData" may be present but null, so a .get default is not enough.
        zp_data = body.get("zpData")
        if not isinstance(zp_data, dict):
            return []
        geek_list = zp_data.get("geekList")
        return geek_list if isinstance(geek_list, list) else []

    # ────────────────────── greeting ──────────────────────

    @staticmethod
    def _geek_key(item: dict) -> str:
        """Return the de-duplication key: encrypted id, else id, else name."""
        card = item.get("geekCard") or {}
        return str(
            card.get("encryptGeekId")
            or card.get("geekId")
            or card.get("geekName")
            or ""
        ).strip()

    def _greet_geek_list(
        self,
        *,
        page,
        container,
        geek_list: List[dict],
        greeted_keys: set[str],
        position_label: str,
        default_job: str,
        cancel_event,
        age_min: Optional[int] = None,
        age_max: Optional[int] = None,
        limit: Optional[int] = None,
    ) -> Tuple[int, List[dict]]:
        """Greet every not-yet-greeted candidate that passes the age filter.

        Args:
            greeted_keys: Keys already greeted; updated in place.
            age_min / age_max: Inclusive bounds. Candidates whose age cannot
                be determined are not filtered out.
            limit: Maximum number of candidates to greet in this call;
                ``None`` means no cap.

        Returns:
            ``(number greeted, list of record dicts for those greeted)``.
        """
        added = 0
        records: List[dict] = []

        for item in geek_list or []:
            if limit is not None and added >= limit:
                break
            self.ensure_not_cancelled(cancel_event)
            if not isinstance(item, dict):
                continue

            geek_key = self._geek_key(item)
            if not geek_key or geek_key in greeted_keys:
                continue

            if age_min is not None or age_max is not None:
                age = self._parse_age(item)
                if age is not None:
                    if age_min is not None and age < age_min:
                        continue
                    if age_max is not None and age > age_max:
                        continue

            if not self._greet_one_geek(page, container, item):
                continue

            greeted_keys.add(geek_key)
            added += 1
            card = item.get("geekCard") or {}
            # `or ""` keeps a JSON null from turning into the text "None".
            name = str(card.get("geekName") or "").strip() or geek_key
            position = (
                str(card.get("expectPositionName") or "").strip()
                or position_label
                or default_job
            )
            records.append(
                {
                    "name": name,
                    "position": position,
                    "geek_key": geek_key,
                    "source": "new_greet",
                    "greeted_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                }
            )

        return added, records

    @staticmethod
    def _close_all_panels(page):
        """Click every known close button to bring the UI back to a clean state.

        Each step is independent: a missing button or a failed click never
        prevents the remaining steps from running.
        """
        for selector, lookup_timeout, pause in _PANEL_CLOSE_STEPS:
            try:
                button = page.ele(selector, timeout=lookup_timeout)
                if button:
                    button.click(by_js=True)
                    time.sleep(pause)
            except Exception:
                pass

    def _greet_one_geek(self, page, container, item: dict) -> bool:
        """Open one candidate, greet them and run the optional follow-up steps.

        Returns ``True`` only when the greet button was found and clicked.
        Favouriting, the outline button and the quick reply are best-effort.
        Panels are always closed once the candidate has been clicked, so a
        failure cannot leave UI behind that blocks the next candidate.
        """
        card = item.get("geekCard") or {}
        geek_name = str(card.get("geekName") or "").strip()
        if not geek_name:
            return False

        name_literal = self._xpath_literal(geek_name)
        panel_opened = False
        try:
            # Prefer the exact match so a name that is a substring of another
            # candidate's name does not select the wrong card.
            name_ele = container.ele(f"x://span[text()={name_literal}]", timeout=5)
            if not name_ele:
                name_ele = container.ele(
                    f"x://span[contains(text(),{name_literal})]", timeout=2
                )
            if not name_ele:
                return False

            name_ele.run_js("this.scrollIntoView()")
            name_ele.click()
            panel_opened = True
            time.sleep(3)

            # NOTE(review): the detail panel is taken to be the frame at
            # index 1 — confirm against the live page structure.
            panel = page.get_frame(1)
            time.sleep(random.uniform(0.5, 1.2))

            greet_btn = panel.ele('x://*[contains(text(),"打招呼")]', timeout=2)
            if not greet_btn:
                return False
            greet_btn.click(by_js=True)
            time.sleep(random.uniform(0.5, 1.2))

            # Favourite (optional).
            try:
                collect_btn = panel.ele('x://*[contains(text(),"收藏")]', timeout=2)
                if collect_btn:
                    collect_btn.click(by_js=True)
                    time.sleep(random.uniform(0.5, 1.2))
            except Exception:
                pass

            # Outline button (optional), attempted up to three times.
            for _ in range(3):
                try:
                    extra_btn = panel.ele(
                        'x://*[@class="btn-v2 btn-outline-v2"]', timeout=1
                    )
                    if extra_btn:
                        extra_btn.click(by_js=True)
                        time.sleep(random.uniform(0.5, 1.2))
                except Exception:
                    break

            # Quick reply (optional): only send when the text was typed.
            try:
                input_box = page.ele('x://*[@data-placeholder="快速回复"]', timeout=2)
                if input_box:
                    input_box.input(FAST_REPLY_TEXT)
                    send_btn = page.ele('x://*[contains(text(),"发送")]', timeout=2)
                    if send_btn:
                        send_btn.click()
                        time.sleep(random.uniform(0.5, 1.2))
            except Exception:
                pass

            self._close_all_panels(page)
            panel_opened = False
            return True

        except Exception:
            return False
        finally:
            # Still open here means an early return or an exception happened.
            if panel_opened:
                self._close_all_panels(page)
            time.sleep(0.5)

    # ────────────────────── contact record persistence ──────────────────────

    def _save_new_greet_contacts(
        self, *, records: List[dict], worker_id: str, account_name: str
    ) -> int:
        """Upsert one ``ContactRecord`` per greeted candidate.

        Records without a ``geek_key`` are skipped.

        Returns:
            The number of rows that were newly created (updates not counted).
        """
        if not records:
            return 0

        # Imported lazily, as Django is only needed when there is data to save.
        from django.utils import timezone
        from server.models import ContactRecord

        created = 0
        for rec in records:
            geek_key = str(rec.get("geek_key", "")).strip()
            if not geek_key:
                continue

            contact_key = f"greet:{geek_key}"
            defaults = {
                "name": str(rec.get("name", "")).strip() or contact_key,
                "position": str(rec.get("position", "")).strip(),
                "reply_status": "新打招呼",
                "wechat_exchanged": False,
                "worker_id": worker_id,
                "contacted_at": timezone.now(),
                "notes": f"新打招呼记录; account={account_name}",
            }
            # NOTE(review): update_or_create overwrites reply_status and
            # wechat_exchanged on an existing row — confirm that resetting a
            # previously contacted candidate is intended.
            _obj, was_created = ContactRecord.objects.update_or_create(
                contact=contact_key,
                defaults=defaults,
            )
            if was_created:
                created += 1

        return created

    # ────────────────────── utilities ──────────────────────

    @staticmethod
    def _normalize_string_list(value: Any) -> List[str]:
        """Coerce ``value`` into a list of non-empty, stripped strings.

        Accepts a list/tuple, a JSON-array string, or a single plain string
        (returned as a one-element list). ``None`` items are dropped. Any
        other input yields an empty list.
        """

        def clean(items) -> List[str]:
            stripped = (str(entry).strip() for entry in items if entry is not None)
            return [text for text in stripped if text]

        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            return clean(value)
        if not isinstance(value, str):
            return []

        raw = value.strip()
        if not raw:
            return []
        if raw.startswith("[") and raw.endswith("]"):
            try:
                parsed = json.loads(raw)
            except ValueError:
                parsed = None
            if isinstance(parsed, list):
                return clean(parsed)
        # Not a JSON array: treat the whole string as one entry.
        return [raw]

    def _normalize_selected_filters(self, params: Dict[str, Any]) -> List[str]:
        """Read filters from ``selected_filters``, falling back to ``filters``."""
        filters = params.get("selected_filters")
        if filters is None:
            filters = params.get("filters")
        return self._normalize_string_list(filters)

    @staticmethod
    def _parse_positive_int(value: Any, default: int) -> int:
        """Return ``int(value)`` when it is positive, otherwise ``default``."""
        try:
            parsed = int(value)
        except (TypeError, ValueError, OverflowError):
            return default
        return parsed if parsed > 0 else default

    @staticmethod
    def _parse_optional_int(value: Any) -> Optional[int]:
        """Return ``int(value)`` when it is positive, otherwise ``None``.

        ``None``, empty strings, unparsable values, infinities and
        non-positive numbers all yield ``None``.
        """
        if value is None:
            return None
        try:
            parsed = int(value)
        except (TypeError, ValueError, OverflowError):
            return None
        return parsed if parsed > 0 else None

    @staticmethod
    def _parse_age(item: dict) -> Optional[int]:
        """Extract a candidate's age from ``geekCard``.

        Tries, in order: the first number in ``ageDesc`` (e.g. "25岁"), then
        the ``age`` field, then ``geekAge``. Returns ``None`` when none of
        them can be read as an integer.
        """
        card = item.get("geekCard") or {}

        age_desc = str(card.get("ageDesc", "") or "").strip()
        if age_desc:
            match = _AGE_DIGITS_RE.search(age_desc)
            if match:
                return int(match.group(1))

        for field in ("age", "geekAge"):
            raw = card.get(field)
            if raw is None:
                continue
            try:
                return int(raw)
            except (TypeError, ValueError):
                continue

        return None