diff --git a/worker/tasks/boss_recruit.py b/worker/tasks/boss_recruit.py index a1c2719..8255c6b 100644 --- a/worker/tasks/boss_recruit.py +++ b/worker/tasks/boss_recruit.py @@ -1,11 +1,15 @@ # -*- coding: utf-8 -*- """ -BOSS recruit task handler (new greeting flow). -This flow is aligned with `1.py -> main`: -1) open recommend page -2) iterate positions -3) apply selected filters -4) greet candidates until target count is reached +BOSS recruit task handler — aligned with `1.py -> main`. + +Flow: +1) Connect to BitBrowser using browser_name (= account_name, injected by server) +2) Open recommend page, start listening for geek/list API +3) Iterate positions (by name, by index, or current) +4) Apply selected filters (optional — skip if empty) +5) Capture geek list from intercepted API response +6) Greet each candidate (scroll, click, greet, favourite, send message, close) +7) Repeat rounds until target count reached or no new additions """ from __future__ import annotations @@ -18,10 +22,8 @@ from typing import Any, Callable, Coroutine, Dict, List, Optional, Tuple from common.protocol import TaskType from worker.bit_browser import BitBrowserAPI -from worker.browser_control import connect_browser, human_delay, safe_click from worker.tasks.base import BaseTaskHandler, TaskCancelledError - RECOMMEND_URL = "https://www.zhipin.com/web/chat/recommend" GEEK_LIST_API = "wapi/zpjob/rec/geek/list" FAST_REPLY_TEXT = "您好,我们目前有相关岗位机会,方便了解一下吗?" @@ -50,15 +52,14 @@ class BossRecruitHandler(BaseTaskHandler): progress_cb: Callable[[str, str], Coroutine], ) -> Any: job_title = str(params.get("job_title", "") or "").strip() or "相关岗位" + # account_name 实际就是 browser_name(由服务端从 BossAccount.browser_name 注入) account_name = str(params.get("account_name", "") or "").strip() - account_id = str(params.get("account_id", "") or "").strip() bit_api_base = str(params.get("bit_api_base", "http://127.0.0.1:54345") or "").strip() cancel_event = params.get("_cancel_event") worker_id = str(params.get("worker_id", "") or "").strip() + # 筛选条件可选——为空则跳过筛选步骤(与 1.py 一致) selected_filters = self._normalize_selected_filters(params) - if not selected_filters: - raise ValueError("招聘前必须先选择至少一个筛选条件") greet_target = self._parse_positive_int(params.get("greet_target"), default=20) position_names = self._normalize_string_list(params.get("position_names")) @@ -72,7 +73,6 @@ class BossRecruitHandler(BaseTaskHandler): self._run_sync, job_title, account_name, - account_id, bit_api_base, selected_filters, position_names, @@ -82,11 +82,12 @@ class BossRecruitHandler(BaseTaskHandler): ) return result + # ────────────────────── 同步主流程 ────────────────────── + def _run_sync( self, job_title: str, account_name: str, - account_id: str, bit_api_base: str, selected_filters: List[str], position_names: List[str], @@ -96,15 +97,10 @@ class BossRecruitHandler(BaseTaskHandler): ) -> Dict[str, Any]: self.ensure_not_cancelled(cancel_event) - bit_api = BitBrowserAPI(bit_api_base) - _, port = bit_api.get_browser_for_drission( - browser_id=account_id or None, - name=account_name or None, - ) - browser = connect_browser(port=port) - page = browser.latest_tab + # 与 1.py 一致:用 ChromiumPage + set_local_port 直接连接 + page = self._connect_browser(bit_api_base, account_name) - flow = self._run_main_like_flow( + flow = self._run_main_flow( page=page, job_title=job_title, selected_filters=selected_filters, @@ -129,7 +125,22 @@ class BossRecruitHandler(BaseTaskHandler): "success": len(errors) == 0, } - def _run_main_like_flow( + @staticmethod + def _connect_browser(bit_api_base: str, browser_name: str): + """ + 与 1.py _connect_bit_browser 一致: + 通过 BitBrowserAPI 打开浏览器,用 ChromiumPage + set_local_port 连接。 + """ + from DrissionPage import ChromiumPage, ChromiumOptions + + bit_api = BitBrowserAPI(bit_api_base) + cdp_addr, port, browser_id = bit_api.open_browser(name=browser_name) + co = ChromiumOptions().set_local_port(port=port) + return ChromiumPage(addr_or_opts=co) + + # ────────────────────── 主招聘循环(对齐 1.py main) ────────────────────── + + def _run_main_flow( self, *, page, @@ -145,9 +156,11 @@ class BossRecruitHandler(BaseTaskHandler): errors: List[str] = [] self.ensure_not_cancelled(cancel_event) + + # 1) 开始监听 geek/list API + 打开推荐页(与 1.py 一致) page.listen.start(GEEK_LIST_API) page.get(RECOMMEND_URL) - human_delay(1.2, 2.4) + time.sleep(2) container = self._get_container(page) positions = self._build_positions(container, position_names) @@ -177,18 +190,21 @@ class BossRecruitHandler(BaseTaskHandler): break container = self._get_container(page) - label = self._activate_position(page, container, pos_type, pos_value) + label = self._activate_position(container, pos_type, pos_value) if not label: continue - human_delay(1.0, 2.1) + time.sleep(2) container = self._get_container(page) - packet = self._load_geek_list_packet(page, container, selected_filters) - geek_list = self._extract_geek_list(packet) + # 2) 捕获 geek/list 包 + geek_list = self._load_and_extract_geek_list( + page, container, selected_filters + ) if not geek_list: continue + # 3) 逐个打招呼 added, new_records = self._greet_geek_list( page=page, container=container, @@ -210,11 +226,12 @@ class BossRecruitHandler(BaseTaskHandler): account_name=account_name, ) + # 退出条件(与 1.py 一致) if total_greeted >= greet_target: break if round_added == 0: break - human_delay(0.8, 1.6) + time.sleep(1) return { "actual_greeted": total_greeted, @@ -225,21 +242,256 @@ class BossRecruitHandler(BaseTaskHandler): "errors": errors, } + # ────────────────────── 容器 & 岗位 ────────────────────── + @staticmethod - def _packet_body(packet) -> dict: + def _get_container(page): + """推荐牛人内容在 iframe recommendFrame 内。""" + try: + return page.get_frame("recommendFrame") + except Exception: + return page + + def _get_all_position_elements(self, container) -> List[Any]: + for selector in JOB_LIST_SELECTORS: + try: + eles = container.eles(selector, timeout=2) + if eles and 1 <= len(eles) <= 100: + return eles + except Exception: + continue + return [] + + def _build_positions(self, container, position_names: List[str]) -> List[Tuple[str, Any]]: + if position_names: + return [("name", name) for name in position_names] + job_eles = self._get_all_position_elements(container) + if not job_eles: + return [("current", None)] + return [("index", i) for i in range(len(job_eles))] + + @staticmethod + def _position_label(pos_type: str, pos_value: Any) -> str: + if pos_type == "name": + return str(pos_value) + if pos_type == "index": + return f"岗位{int(pos_value) + 1}" + return "当前岗位" + + def _activate_position(self, container, pos_type: str, pos_value: Any) -> str: + """ + 切换到指定岗位。与 1.py 一致,使用 click(by_js=True)。 + """ + label = self._position_label(pos_type, pos_value) + try: + if pos_type == "name": + ele = container.ele(f'x://*[contains(text(),"{pos_value}")]', timeout=5) + if not ele: + return "" + ele.click(by_js=True) + elif pos_type == "index": + job_eles = self._get_all_position_elements(container) + if int(pos_value) >= len(job_eles): + return "" + job_eles[int(pos_value)].click(by_js=True) + return label + except Exception: + return "" + + # ────────────────────── 筛选 & 抓包 ────────────────────── + + def _apply_filter_and_confirm(self, container, selected_filters: List[str]) -> None: + """与 1.py _apply_filter_and_confirm 一致。""" + container.ele("x://*[contains(text(),'筛选')]").click() + time.sleep(2) + for item in selected_filters: + container.ele(f"x://*[contains(text(),'{item}')]").click() + time.sleep(random.random() * 1.5) + container.ele("x://*[contains(text(),'确定')]").click() + + def _load_and_extract_geek_list( + self, page, container, selected_filters: List[str] + ) -> List[dict]: + """ + 与 1.py 一致的抓包逻辑: + - 有筛选条件 → 点筛选 → wait(count=2) → 取最后一个包 + - 无筛选条件 → wait() → 取第一个包 + """ + if selected_filters: + self._apply_filter_and_confirm(container, selected_filters) + packets = page.listen.wait(count=2, timeout=30) + res = packets[-1] if packets else None + else: + res = page.listen.wait(timeout=30) + + if not res: + return [] + return self._extract_geek_list_from_packet(res) + + @staticmethod + def _extract_geek_list_from_packet(packet) -> List[dict]: + """从监听到的包中提取 geekList。""" if not packet: - return {} - response = getattr(packet, "response", None) - body = getattr(response, "body", None) if response is not None else None - if isinstance(body, dict): - return body + return [] + body = getattr(getattr(packet, "response", None), "body", None) if isinstance(body, str): try: - parsed = json.loads(body) - return parsed if isinstance(parsed, dict) else {} + body = json.loads(body) except Exception: - return {} - return {} + return [] + if not isinstance(body, dict): + return [] + geek_list = body.get("zpData", {}).get("geekList", []) + return geek_list if isinstance(geek_list, list) else [] + + # ────────────────────── 打招呼(对齐 1.py) ────────────────────── + + @staticmethod + def _geek_key(item: dict) -> str: + """牛人去重键(与 1.py _geek_key 一致)。""" + card = item.get("geekCard") or {} + return str( + card.get("encryptGeekId") + or card.get("geekId") + or card.get("geekName") + or "" + ).strip() + + def _greet_geek_list( + self, + *, + page, + container, + geek_list: List[dict], + greeted_keys: set[str], + position_label: str, + default_job: str, + cancel_event, + ) -> Tuple[int, List[dict]]: + """与 1.py _greet_geek_list_skip_greeted 一致的去重逻辑。""" + added = 0 + records: List[dict] = [] + for item in geek_list or []: + self.ensure_not_cancelled(cancel_event) + geek_key = self._geek_key(item) + if not geek_key or geek_key in greeted_keys: + continue + if not self._greet_one_geek(page, container, item): + continue + + greeted_keys.add(geek_key) + added += 1 + card = item.get("geekCard") or {} + name = str(card.get("geekName", "")).strip() or geek_key + position = str(card.get("expectPositionName", "")).strip() or position_label or default_job + records.append( + { + "name": name, + "position": position, + "geek_key": geek_key, + "source": "new_greet", + "greeted_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + } + ) + return added, records + + def _greet_one_geek(self, page, container, item: dict) -> bool: + """ + 对单个牛人打招呼,完全对齐 1.py _greet_one_geek 逻辑: + 1. 找姓名 → scrollIntoView → 点击姓名 + 2. 获取 frame(1)(右侧面板) + 3. 点击"打招呼" + 4. 点击"收藏" + 5. 点击 3 次 btn-outline 按钮 + 6. 输入快速回复 → 发送 + 7. 关闭侧边栏 + 关闭弹窗 + """ + geek_name = str((item.get("geekCard") or {}).get("geekName", "")).strip() + if not geek_name: + return False + + try: + # 查找姓名元素(与 1.py 一致的双重查找) + name_ele = container.ele(f'x://span[contains(text(),"{geek_name}")]', timeout=5) + if not name_ele: + name_ele = container.ele(f'x://span[text()="{geek_name}"]', timeout=2) + if not name_ele: + return False + + # scrollIntoView + 点击(与 1.py 一致) + name_ele.run_js("this.scrollIntoView()") + name_ele.click() + time.sleep(3) + + # 获取右侧面板(与 1.py 一致:page.get_frame(1)) + panel = page.get_frame(1) + time.sleep(random.uniform(0.5, 1.2)) + + # 打招呼(与 1.py 一致:by_js=True) + panel.ele('x://*[contains(text(),"打招呼")]', timeout=2).click(by_js=True) + time.sleep(random.uniform(0.5, 1.2)) + + # 收藏(与 1.py 一致) + panel.ele('x://*[contains(text(),"收藏")]', timeout=2).click(by_js=True) + time.sleep(random.uniform(0.5, 1.2)) + + # 点击 3 次 btn-outline 按钮(与 1.py 一致) + for _ in range(3): + panel.ele('x://*[@class="btn-v2 btn-outline-v2"]', timeout=2).click(by_js=True) + time.sleep(random.uniform(0.5, 1.2)) + + # 快速回复(与 1.py 一致) + page.ele('x://*[@data-placeholder="快速回复"]', timeout=2).input(FAST_REPLY_TEXT) + page.ele('x://*[contains(text(),"发送")]', timeout=2).click() + time.sleep(random.uniform(0.5, 1.2)) + + # 关闭侧边栏(与 1.py 一致) + page.ele('x://*[@class="iboss iboss-close"]').click() + time.sleep(random.uniform(0.5, 1.2)) + + # 关闭弹窗(与 1.py 一致) + panel.ele('x://*[@class="boss-popup__close"]').click() + time.sleep(random.uniform(0.5, 1.2)) + + return True + except Exception: + return False + + # ────────────────────── 联系记录入库 ────────────────────── + + def _save_new_greet_contacts(self, *, records: List[dict], worker_id: str, account_name: str) -> int: + if not records: + return 0 + from django.utils import timezone + from server.models import ContactRecord + + created = 0 + for rec in records: + geek_key = str(rec.get("geek_key", "")).strip() + if not geek_key: + continue + contact_key = f"greet:{geek_key}" + defaults = { + "name": str(rec.get("name", "")).strip() or contact_key, + "position": str(rec.get("position", "")).strip(), + "reply_status": "新打招呼", + "wechat_exchanged": False, + "worker_id": worker_id, + "contacted_at": timezone.now(), + "notes": f"新打招呼记录; account={account_name}", + } + obj, was_created = ContactRecord.objects.update_or_create( + contact=contact_key, + defaults=defaults, + ) + if was_created: + created += 1 + elif not obj.contacted_at: + obj.contacted_at = timezone.now() + obj.save(update_fields=["contacted_at"]) + return created + + # ────────────────────── 工具方法 ────────────────────── @staticmethod def _normalize_string_list(value: Any) -> List[str]: @@ -274,232 +526,3 @@ class BossRecruitHandler(BaseTaskHandler): return parsed if parsed > 0 else default except Exception: return default - - @staticmethod - def _get_container(page): - try: - return page.get_frame("recommendFrame") - except Exception: - return page - - def _get_all_position_elements(self, container) -> List[Any]: - for selector in JOB_LIST_SELECTORS: - try: - eles = container.eles(selector, timeout=2) - if eles and 1 <= len(eles) <= 100: - return eles - except Exception: - continue - return [] - - def _build_positions(self, container, position_names: List[str]) -> List[Tuple[str, Any]]: - if position_names: - return [("name", name) for name in position_names] - job_eles = self._get_all_position_elements(container) - if not job_eles: - return [("current", None)] - return [("index", i) for i in range(len(job_eles))] - - def _position_label(self, pos_type: str, pos_value: Any) -> str: - if pos_type == "name": - return str(pos_value) - if pos_type == "index": - return f"岗位{int(pos_value) + 1}" - return "当前岗位" - - def _activate_position(self, page, container, pos_type: str, pos_value: Any) -> str: - label = self._position_label(pos_type, pos_value) - try: - if pos_type == "name": - ele = container.ele(f'x://*[contains(text(),"{pos_value}")]', timeout=5) - if not ele: - return "" - if not safe_click(page, ele): - return "" - elif pos_type == "index": - job_eles = self._get_all_position_elements(container) - if int(pos_value) >= len(job_eles): - return "" - if not safe_click(page, job_eles[int(pos_value)]): - return "" - return label - except Exception: - return "" - - def _apply_filter_and_confirm(self, page, container, selected_filters: List[str]) -> None: - trigger = container.ele("x://*[contains(text(),'筛选')]", timeout=3) - if not trigger: - raise RuntimeError("未找到筛选按钮") - if not safe_click(page, trigger): - raise RuntimeError("点击筛选按钮失败") - human_delay(1.3, 2.0) - - for item in selected_filters: - option = container.ele(f"x://*[contains(text(),'{item}')]", timeout=2) - if not option: - self.logger.warning("筛选项未命中: %s", item) - continue - safe_click(page, option) - human_delay(0.35, 0.9) - - confirm = container.ele("x://*[contains(text(),'确定')]", timeout=3) - if not confirm: - raise RuntimeError("未找到筛选确认按钮") - if not safe_click(page, confirm): - raise RuntimeError("点击筛选确认失败") - - def _load_geek_list_packet(self, page, container, selected_filters: List[str]): - if selected_filters: - self._apply_filter_and_confirm(page, container, selected_filters) - packets = page.listen.wait(count=2, timeout=30) - if isinstance(packets, list) and packets: - return packets[-1] - return packets - return page.listen.wait(timeout=30) - - def _extract_geek_list(self, packet) -> List[dict]: - body = self._packet_body(packet) - zp_data = body.get("zpData", {}) if isinstance(body, dict) else {} - geek_list = zp_data.get("geekList", []) if isinstance(zp_data, dict) else [] - return geek_list if isinstance(geek_list, list) else [] - - @staticmethod - def _geek_key(item: dict) -> str: - card = item.get("geekCard") or {} - return str( - card.get("encryptGeekId") - or card.get("geekId") - or card.get("geekName") - or "" - ).strip() - - def _greet_geek_list( - self, - *, - page, - container, - geek_list: List[dict], - greeted_keys: set[str], - position_label: str, - default_job: str, - cancel_event, - ) -> Tuple[int, List[dict]]: - added = 0 - records: List[dict] = [] - for item in geek_list or []: - self.ensure_not_cancelled(cancel_event) - geek_key = self._geek_key(item) - if not geek_key or geek_key in greeted_keys: - continue - if not self._greet_one_geek(page, container, item): - continue - - greeted_keys.add(geek_key) - added += 1 - card = item.get("geekCard") or {} - name = str(card.get("geekName", "")).strip() or geek_key - position = str(card.get("expectPositionName", "")).strip() or position_label or default_job - records.append( - { - "name": name, - "position": position, - "geek_key": geek_key, - "source": "new_greet", - "greeted_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - } - ) - return added, records - - def _greet_one_geek(self, page, container, item: dict) -> bool: - geek_name = str((item.get("geekCard") or {}).get("geekName", "")).strip() - if not geek_name: - return False - - try: - name_ele = container.ele(f'x://span[contains(text(),"{geek_name}")]', timeout=5) - if not name_ele: - name_ele = container.ele(f'x://span[text()="{geek_name}"]', timeout=2) - if not name_ele: - return False - - try: - name_ele.run_js("this.scrollIntoView()") - except Exception: - pass - if not safe_click(page, name_ele): - return False - - human_delay(2.2, 3.6) - panel = page.get_frame(1) - human_delay(0.4, 1.0) - - greet_btn = panel.ele('x://*[contains(text(),"打招呼")]', timeout=3) - if not greet_btn or not safe_click(panel, greet_btn): - return False - human_delay(0.4, 1.0) - - collect_btn = panel.ele('x://*[contains(text(),"收藏")]', timeout=2) - if collect_btn: - safe_click(panel, collect_btn) - human_delay(0.3, 0.8) - - for _ in range(3): - extra_btn = panel.ele('x://*[@class="btn-v2 btn-outline-v2"]', timeout=2) - if extra_btn: - safe_click(panel, extra_btn) - human_delay(0.3, 0.8) - - input_box = page.ele('x://*[@data-placeholder="快速回复"]', timeout=2) - if input_box: - input_box.input(FAST_REPLY_TEXT) - send_btn = page.ele('x://*[contains(text(),"发送")]', timeout=2) - if send_btn: - safe_click(page, send_btn) - human_delay(0.3, 0.8) - - close_side = page.ele('x://*[@class="iboss iboss-close"]', timeout=1) - if close_side: - safe_click(page, close_side) - human_delay(0.2, 0.6) - - close_popup = panel.ele('x://*[@class="boss-popup__close"]', timeout=1) - if close_popup: - safe_click(panel, close_popup) - human_delay(0.2, 0.6) - - return True - except Exception: - return False - - def _save_new_greet_contacts(self, *, records: List[dict], worker_id: str, account_name: str) -> int: - if not records: - return 0 - from django.utils import timezone - from server.models import ContactRecord - - created = 0 - for rec in records: - geek_key = str(rec.get("geek_key", "")).strip() - if not geek_key: - continue - contact_key = f"greet:{geek_key}" - defaults = { - "name": str(rec.get("name", "")).strip() or contact_key, - "position": str(rec.get("position", "")).strip(), - "reply_status": "新打招呼", - "wechat_exchanged": False, - "worker_id": worker_id, - "contacted_at": timezone.now(), - "notes": f"新打招呼记录; account={account_name}", - } - obj, was_created = ContactRecord.objects.update_or_create( - contact=contact_key, - defaults=defaults, - ) - if was_created: - created += 1 - elif not obj.contacted_at: - obj.contacted_at = timezone.now() - obj.save(update_fields=["contacted_at"]) - return created -