diff --git a/common/protocol.py b/common/protocol.py index 4b5ed93..20e9c84 100644 --- a/common/protocol.py +++ b/common/protocol.py @@ -1,57 +1,52 @@ # -*- coding: utf-8 -*- """ -共享消息协议定义。 -服务器与 Worker 之间通过 WebSocket 传递 JSON 消息,每条消息包含 type 字段标识消息类型。 +Shared message protocol definitions for Server <-> Worker communication. """ from enum import Enum -# ────────────────────────── 消息类型 ────────────────────────── - class MsgType(str, Enum): - """WebSocket 消息类型枚举(str 混入方便 JSON 序列化)。""" + """WebSocket message types.""" - # Worker → Server - REGISTER = "register" # 注册:上报 worker 信息与浏览器列表 - HEARTBEAT = "heartbeat" # 心跳 - BROWSER_LIST_UPDATE = "browser_list_update" # 浏览器列表变更 - TASK_PROGRESS = "task_progress" # 任务进度上报 - TASK_RESULT = "task_result" # 任务最终结果 - TASK_STATUS_REPORT = "task_status_report" # 任务执行状态回报(响应服务端查询) + # Worker -> Server + REGISTER = "register" + HEARTBEAT = "heartbeat" + BROWSER_LIST_UPDATE = "browser_list_update" + TASK_PROGRESS = "task_progress" + TASK_RESULT = "task_result" + TASK_STATUS_REPORT = "task_status_report" - # Server → Worker - REGISTER_ACK = "register_ack" # 注册确认 - HEARTBEAT_ACK = "heartbeat_ack" # 心跳确认 - TASK_ASSIGN = "task_assign" # 派发任务 - TASK_CANCEL = "task_cancel" # 取消任务 - TASK_STATUS_QUERY = "task_status_query" # 查询任务执行状态 + # Server -> Worker + REGISTER_ACK = "register_ack" + HEARTBEAT_ACK = "heartbeat_ack" + TASK_ASSIGN = "task_assign" + TASK_CANCEL = "task_cancel" + TASK_STATUS_QUERY = "task_status_query" - # 双向 - ERROR = "error" # 错误消息 + # Bidirectional + ERROR = "error" -# ────────────────────────── 任务状态 ────────────────────────── - class TaskStatus(str, Enum): - """任务生命周期状态。""" - PENDING = "pending" # 已创建,等待派发 - DISPATCHED = "dispatched" # 已派发给 Worker - RUNNING = "running" # Worker 正在执行 - SUCCESS = "success" # 执行成功 - FAILED = "failed" # 执行失败 - CANCELLED = "cancelled" # 已取消 + """Task lifecycle states.""" + PENDING = "pending" + DISPATCHED = "dispatched" + RUNNING = "running" + SUCCESS = "success" + FAILED = "failed" + CANCELLED = "cancelled" -# ────────────────────────── 任务类型 ────────────────────────── class TaskType(str, Enum): - """可扩展的任务类型。新增任务在此追加即可。""" - BOSS_RECRUIT = "boss_recruit" # BOSS 直聘招聘流程 - CHECK_LOGIN = "check_login" # 检测 BOSS 账号是否已登录 + """Supported task types.""" + BOSS_RECRUIT = "boss_recruit" # New greeting flow (from 1.py main) + BOSS_REPLY = "boss_reply" # Old recruit flow, now reply flow + CHECK_LOGIN = "check_login" -# ────────────────────────── 辅助函数 ────────────────────────── def make_msg(msg_type: MsgType, **payload) -> dict: - """构造一条标准 WebSocket JSON 消息。""" + """Build a standard WebSocket JSON message.""" return {"type": msg_type.value, **payload} + diff --git a/server/api/filters.py b/server/api/filters.py index 5dfc651..9315845 100644 --- a/server/api/filters.py +++ b/server/api/filters.py @@ -1,18 +1,18 @@ # -*- coding: utf-8 -*- """ -筛选配置 API(需要登录): -- GET /api/filters -> 查询所有筛选配置 -- POST /api/filters -> 创建筛选配置 -- GET /api/filters/{id} -> 查询单个配置 -- PUT /api/filters/{id} -> 更新配置 -- DELETE /api/filters/{id} -> 删除配置 +Filter APIs: +- CRUD for FilterConfig (legacy/manual configs) +- Recruit filter options (synced from site at worker startup) """ from rest_framework import status from rest_framework.decorators import api_view -from server.core.response import api_success, api_error -from server.models import FilterConfig -from server.serializers import FilterConfigSerializer +from server.core.response import api_error, api_success +from server.models import BossAccount, FilterConfig, RecruitFilterSnapshot +from server.serializers import ( + FilterConfigSerializer, + RecruitFilterSnapshotSerializer, +) @api_view(["GET", "POST"]) @@ -45,3 +45,61 @@ def filter_detail(request, pk): obj.delete() return api_success(msg="筛选配置已删除") + + +def _snapshot_payload(snapshot: RecruitFilterSnapshot) -> dict: + data = RecruitFilterSnapshotSerializer(snapshot).data + return { + "worker_id": data.get("worker_id", ""), + "account_name": data.get("account_name", ""), + "browser_id": data.get("browser_id", ""), + "groups": data.get("groups", []), + "flat_options": data.get("flat_options", []), + "synced_at": data.get("synced_at"), + } + + +@api_view(["GET"]) +def recruit_filter_options(request): + """ + Read recruit filter options for dropdown. + Query by: + - account_id (preferred) + - or worker_id + account_name + """ + account_id = request.query_params.get("account_id") + worker_id = (request.query_params.get("worker_id") or "").strip() + account_name = (request.query_params.get("account_name") or "").strip() + + snapshot = None + if account_id: + try: + account = BossAccount.objects.get(pk=int(account_id)) + except (ValueError, BossAccount.DoesNotExist): + return api_error(status.HTTP_404_NOT_FOUND, "账号不存在") + snapshot = ( + RecruitFilterSnapshot.objects + .filter(worker_id=account.worker_id, account_name=account.browser_name) + .first() + ) + elif worker_id and account_name: + snapshot = ( + RecruitFilterSnapshot.objects + .filter(worker_id=worker_id, account_name=account_name) + .first() + ) + else: + snapshot = RecruitFilterSnapshot.objects.order_by("-synced_at").first() + + if not snapshot: + return api_success( + { + "worker_id": worker_id, + "account_name": account_name, + "groups": [], + "flat_options": [], + "synced_at": None, + } + ) + return api_success(_snapshot_payload(snapshot)) + diff --git a/server/migrations/0007_recruitfiltersnapshot.py b/server/migrations/0007_recruitfiltersnapshot.py new file mode 100644 index 0000000..7bc33aa --- /dev/null +++ b/server/migrations/0007_recruitfiltersnapshot.py @@ -0,0 +1,34 @@ +# Generated by Codex on 2026-03-06 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("server", "0006_filterconfig_position_keywords_city_salary_experience"), + ] + + operations = [ + migrations.CreateModel( + name="RecruitFilterSnapshot", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("worker_id", models.CharField(db_index=True, default="", max_length=64, verbose_name="Worker ID")), + ("account_name", models.CharField(db_index=True, default="", max_length=128, verbose_name="环境名称")), + ("browser_id", models.CharField(blank=True, default="", max_length=128, verbose_name="浏览器 ID")), + ("groups", models.JSONField(blank=True, default=list, verbose_name="筛选分组")), + ("flat_options", models.JSONField(blank=True, default=list, verbose_name="扁平筛选项")), + ("raw_payload", models.JSONField(blank=True, default=dict, verbose_name="原始筛选数据")), + ("synced_at", models.DateTimeField(auto_now=True, verbose_name="同步时间")), + ("created_at", models.DateTimeField(auto_now_add=True, verbose_name="创建时间")), + ], + options={ + "verbose_name": "招聘筛选快照", + "verbose_name_plural": "招聘筛选快照", + "db_table": "recruit_filter_snapshot", + "unique_together": {("worker_id", "account_name")}, + }, + ), + ] + diff --git a/server/models.py b/server/models.py index 21e8408..5f0015e 100644 --- a/server/models.py +++ b/server/models.py @@ -141,6 +141,28 @@ class FilterConfig(models.Model): return self.name +class RecruitFilterSnapshot(models.Model): + """Per-account site filter snapshot fetched from zhipin recommend page.""" + + worker_id = models.CharField(max_length=64, default="", db_index=True, verbose_name="Worker ID") + account_name = models.CharField(max_length=128, default="", db_index=True, verbose_name="环境名称") + browser_id = models.CharField(max_length=128, default="", blank=True, verbose_name="浏览器 ID") + groups = models.JSONField(default=list, blank=True, verbose_name="筛选分组") + flat_options = models.JSONField(default=list, blank=True, verbose_name="扁平筛选项") + raw_payload = models.JSONField(default=dict, blank=True, verbose_name="原始筛选数据") + synced_at = models.DateTimeField(auto_now=True, verbose_name="同步时间") + created_at = models.DateTimeField(auto_now_add=True, verbose_name="创建时间") + + class Meta: + db_table = "recruit_filter_snapshot" + unique_together = [("worker_id", "account_name")] + verbose_name = "招聘筛选快照" + verbose_name_plural = verbose_name + + def __str__(self): + return f"{self.account_name}@{self.worker_id}" + + class ChatScript(models.Model): """复聊话术表。""" SCRIPT_TYPE_CHOICES = [ diff --git a/server/serializers.py b/server/serializers.py index a74ae84..5496327 100644 --- a/server/serializers.py +++ b/server/serializers.py @@ -5,7 +5,7 @@ DRF 序列化器。 from rest_framework import serializers from server.models import ( - BossAccount, TaskLog, FilterConfig, ChatScript, ContactRecord, SystemConfig, + BossAccount, TaskLog, FilterConfig, RecruitFilterSnapshot, ChatScript, ContactRecord, SystemConfig, FollowUpConfig, FollowUpScript, FollowUpRecord ) @@ -134,6 +134,13 @@ class FilterConfigSerializer(serializers.ModelSerializer): # ────────────────────────── 话术 ────────────────────────── +class RecruitFilterSnapshotSerializer(serializers.ModelSerializer): + class Meta: + model = RecruitFilterSnapshot + fields = "__all__" + read_only_fields = ["id", "synced_at", "created_at"] + + class ChatScriptSerializer(serializers.ModelSerializer): script_type_display = serializers.CharField(source="get_script_type_display", read_only=True) diff --git a/server/urls.py b/server/urls.py index 9218c16..8c2cdbe 100644 --- a/server/urls.py +++ b/server/urls.py @@ -37,6 +37,7 @@ urlpatterns = [ # ─── 筛选配置 ─── path("api/filters", filters.filter_list), + path("api/filters/options", filters.recruit_filter_options), path("api/filters/", filters.filter_detail), # ─── 话术管理 ─── diff --git a/worker/main.py b/worker/main.py index 1fe0181..dde542a 100644 --- a/worker/main.py +++ b/worker/main.py @@ -1,7 +1,8 @@ # -*- coding: utf-8 -*- """ -Worker 启动入口。 -启动方式: python -m worker.main [--server ws://IP:8000/ws] [--worker-id pc-a] [--worker-name 电脑A] +Worker startup entrypoint. +Usage: + python -m worker.main [--server ws://IP:8000/ws] [--worker-id pc-a] [--worker-name name] """ from __future__ import annotations @@ -9,7 +10,6 @@ import argparse import asyncio import logging import os -import sys os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings") @@ -17,10 +17,11 @@ import django # noqa: E402 django.setup() # noqa: E402 +from tunnel.client import TunnelClient from worker import config +from worker.recruit_filter_sync import bootstrap_recruit_filter_snapshot from worker.tasks.registry import register_all_handlers from worker.ws_client import WorkerWSClient -from tunnel.client import TunnelClient logging.basicConfig( level=logging.INFO, @@ -33,38 +34,18 @@ logger = logging.getLogger("worker.main") def parse_args(): parser = argparse.ArgumentParser(description="Browser Control Worker Agent") parser.add_argument("--worker", action="store_true", help=argparse.SUPPRESS) - parser.add_argument( - "--server", - default=config.SERVER_WS_URL, - help=f"中央服务器 WebSocket 地址 (默认: {config.SERVER_WS_URL})", - ) - parser.add_argument( - "--worker-id", - default=config.WORKER_ID, - help=f"Worker ID (默认: {config.WORKER_ID})", - ) - parser.add_argument( - "--worker-name", - default=config.WORKER_NAME, - help=f"Worker 名称 (默认: {config.WORKER_NAME})", - ) - parser.add_argument( - "--bit-api", - default=config.BIT_API_BASE, - help=f"比特浏览器本地 API 地址 (默认: {config.BIT_API_BASE})", - ) - parser.add_argument( - "--no-tunnel", - action="store_true", - help="禁用内网穿透隧道(不与隧道服务端连接)", - ) + parser.add_argument("--server", default=config.SERVER_WS_URL, help=f"WebSocket server URL (default: {config.SERVER_WS_URL})") + parser.add_argument("--worker-id", default=config.WORKER_ID, help=f"Worker ID (default: {config.WORKER_ID})") + parser.add_argument("--worker-name", default=config.WORKER_NAME, help=f"Worker name (default: {config.WORKER_NAME})") + parser.add_argument("--bit-api", default=config.BIT_API_BASE, help=f"BitBrowser local API URL (default: {config.BIT_API_BASE})") + parser.add_argument("--no-tunnel", action="store_true", help="Disable tunnel client") return parser.parse_args() def _local_port_from_bit_api(bit_api_base: str) -> int: - """从 BIT_API_BASE (e.g. http://127.0.0.1:54345) 解析端口。""" try: from urllib.parse import urlparse + p = urlparse(bit_api_base if "://" in bit_api_base else "http://" + bit_api_base) return p.port or 54345 except Exception: @@ -72,9 +53,9 @@ def _local_port_from_bit_api(bit_api_base: str) -> int: def _extract_host_from_ws_url(ws_url: str) -> str: - """从 WebSocket URL (如 ws://8.137.99.82:9000/ws) 中提取 host。""" try: from urllib.parse import urlparse + p = urlparse(ws_url) return p.hostname or "127.0.0.1" except Exception: @@ -82,9 +63,25 @@ def _extract_host_from_ws_url(ws_url: str) -> str: async def run(args): - # 注册所有任务处理器 register_all_handlers() - logger.info("已注册任务处理器") + logger.info("Task handlers registered") + + # Startup bootstrap: fetch site filters (main1 logic) and persist for frontend dropdown. + try: + bootstrap_result = bootstrap_recruit_filter_snapshot( + worker_id=args.worker_id, + bit_api_base=args.bit_api, + logger=logger, + ) + if bootstrap_result: + logger.info( + "Recruit filters synced on startup: account=%s groups=%s options=%s", + bootstrap_result.get("account_name", ""), + bootstrap_result.get("groups", 0), + bootstrap_result.get("flat_options", 0), + ) + except Exception as e: + logger.warning("Recruit filter startup sync failed: %s", e) client = WorkerWSClient( server_url=args.server, @@ -96,7 +93,6 @@ async def run(args): tunnel_enabled = config.TUNNEL_ENABLED and not args.no_tunnel tunnel_client = None if tunnel_enabled: - # 从命令行 --server 参数中提取云服务器 host(而非配置文件默认值) tunnel_host = _extract_host_from_ws_url(args.server) tunnel_client = TunnelClient( server_host=tunnel_host, @@ -106,13 +102,17 @@ async def run(args): local_port=_local_port_from_bit_api(args.bit_api), ) logger.info( - "隧道已启用: 暴露本地 %s -> %s (worker_id=%s)", - _local_port_from_bit_api(args.bit_api), tunnel_host, args.worker_id, + "Tunnel enabled: expose local %s -> %s (worker_id=%s)", + _local_port_from_bit_api(args.bit_api), + tunnel_host, + args.worker_id, ) logger.info( - "Worker 启动: id=%s, name=%s, server=%s", - args.worker_id, args.worker_name, args.server, + "Worker startup: id=%s, name=%s, server=%s", + args.worker_id, + args.worker_name, + args.server, ) async def run_worker(): @@ -131,7 +131,7 @@ async def run(args): else: await worker_task except KeyboardInterrupt: - logger.info("收到中断信号,正在退出...") + logger.info("Keyboard interrupt received, stopping...") worker_task.cancel() if tunnel_task is not None: tunnel_task.cancel() @@ -153,8 +153,9 @@ def main(): try: asyncio.run(run(args)) except KeyboardInterrupt: - logger.info("Worker 已退出") + logger.info("Worker exited") if __name__ == "__main__": main() + diff --git a/worker/recruit_filter_sync.py b/worker/recruit_filter_sync.py new file mode 100644 index 0000000..eef5226 --- /dev/null +++ b/worker/recruit_filter_sync.py @@ -0,0 +1,239 @@ +# -*- coding: utf-8 -*- +""" +Recruit filter snapshot sync: +1) Open an existing browser profile from DB. +2) Fetch recommend filter options from zhipin. +3) Persist to DB for frontend dropdown usage. +""" +from __future__ import annotations + +import json +import logging +from typing import Any, Dict, List, Optional + +from worker.bit_browser import BitBrowserAPI +from worker.browser_control import connect_browser + + +FILTER_API = "wapi/zpblock/recommend/filters" +RECOMMEND_URL = "https://www.zhipin.com/web/chat/recommend" + + +def _packet_body(packet) -> dict: + if not packet: + return {} + response = getattr(packet, "response", None) + body = getattr(response, "body", None) if response is not None else None + if isinstance(body, dict): + return body + if isinstance(body, str): + try: + parsed = json.loads(body) + return parsed if isinstance(parsed, dict) else {} + except Exception: + return {} + return {} + + +def _dedupe(items: List[str]) -> List[str]: + seen = set() + result: List[str] = [] + for item in items: + value = str(item or "").strip() + if not value or value in seen: + continue + seen.add(value) + result.append(value) + return result + + +def _parse_groups(raw_filters: List[dict]) -> tuple[list[dict], list[str]]: + groups: List[dict] = [] + flat_options: List[str] = [] + + for idx, item in enumerate(raw_filters or []): + if not isinstance(item, dict): + continue + name = str(item.get("name", "")).strip() + if not name: + continue + + options: List[str] = [] + raw_options = item.get("options") + if isinstance(raw_options, list): + for opt in raw_options: + if not isinstance(opt, dict): + continue + opt_name = str(opt.get("name", "")).strip() + if opt_name: + options.append(opt_name) + options = _dedupe(options) + + group: Dict[str, Any] = { + "name": name, + "order": idx, + "options": options, + } + + start = item.get("start") + end = item.get("end") + try: + if start is not None and end is not None: + group["range"] = {"start": int(start), "end": int(end)} + except Exception: + pass + + groups.append(group) + flat_options.extend(options) + + return groups, _dedupe(flat_options) + + +def fetch_recruit_filters_from_site( + *, + bit_api_base: str, + account_name: str, + browser_id: str = "", + timeout_sec: int = 30, + logger: Optional[logging.Logger] = None, +) -> Dict[str, Any]: + """Fetch recruit filters from target site using one BitBrowser profile.""" + log = logger or logging.getLogger("worker.recruit_filter_sync") + bit_api = BitBrowserAPI(bit_api_base) + + resolved_browser_id = (browser_id or "").strip() + if resolved_browser_id.startswith("name:"): + resolved_browser_id = "" + + _, port = bit_api.get_browser_for_drission( + browser_id=resolved_browser_id or None, + name=(account_name or "").strip() or None, + ) + browser = connect_browser(port=port) + tab = browser.latest_tab + + tab.listen.start(FILTER_API) + tab.get(RECOMMEND_URL) + packet = tab.listen.wait(timeout=timeout_sec) + body = _packet_body(packet) + zp_data = body.get("zpData", {}) if isinstance(body, dict) else {} + vip_filter = zp_data.get("vipFilter", {}) if isinstance(zp_data, dict) else {} + raw_filters = vip_filter.get("filters", []) if isinstance(vip_filter, dict) else [] + if not isinstance(raw_filters, list): + raw_filters = [] + + groups, flat_options = _parse_groups(raw_filters) + log.info( + "Fetched recruit filters: account=%s groups=%d options=%d", + account_name, + len(groups), + len(flat_options), + ) + return { + "groups": groups, + "flat_options": flat_options, + "raw_payload": {"filters": raw_filters}, + } + + +def save_recruit_filter_snapshot( + *, + worker_id: str, + account_name: str, + browser_id: str = "", + groups: List[dict], + flat_options: List[str], + raw_payload: Dict[str, Any], +) -> None: + from server.models import RecruitFilterSnapshot + + RecruitFilterSnapshot.objects.update_or_create( + worker_id=(worker_id or "").strip(), + account_name=(account_name or "").strip(), + defaults={ + "browser_id": (browser_id or "").strip(), + "groups": groups or [], + "flat_options": flat_options or [], + "raw_payload": raw_payload or {}, + }, + ) + + +def sync_recruit_filters_for_account( + *, + worker_id: str, + bit_api_base: str, + account_name: str, + browser_id: str = "", + logger: Optional[logging.Logger] = None, +) -> Dict[str, Any]: + """Fetch and persist recruit filters for one account.""" + payload = fetch_recruit_filters_from_site( + bit_api_base=bit_api_base, + account_name=account_name, + browser_id=browser_id, + logger=logger, + ) + save_recruit_filter_snapshot( + worker_id=worker_id, + account_name=account_name, + browser_id=browser_id, + groups=payload["groups"], + flat_options=payload["flat_options"], + raw_payload=payload["raw_payload"], + ) + return payload + + +def bootstrap_recruit_filter_snapshot( + *, + worker_id: str, + bit_api_base: str, + logger: Optional[logging.Logger] = None, +) -> Optional[Dict[str, Any]]: + """ + On worker startup, pick one existing DB account for this worker + and synchronize recruit filters. + """ + log = logger or logging.getLogger("worker.recruit_filter_sync") + from server.models import BossAccount + + accounts = ( + BossAccount.objects + .filter(worker_id=worker_id) + .exclude(browser_name="") + .order_by("-is_logged_in", "-updated_at") + ) + if not accounts.exists(): + log.info("Skip recruit filter bootstrap: no account found for worker=%s", worker_id) + return None + + last_error = None + for account in accounts: + try: + payload = sync_recruit_filters_for_account( + worker_id=worker_id, + bit_api_base=bit_api_base, + account_name=account.browser_name, + browser_id=account.browser_id or "", + logger=log, + ) + return { + "worker_id": worker_id, + "account_name": account.browser_name, + "groups": len(payload.get("groups", [])), + "flat_options": len(payload.get("flat_options", [])), + } + except Exception as e: + last_error = e + log.warning( + "Recruit filter bootstrap failed for account=%s worker=%s: %s", + account.browser_name, + worker_id, + e, + ) + + if last_error: + log.warning("Recruit filter bootstrap skipped: all accounts failed for worker=%s", worker_id) + return None + diff --git a/worker/tasks/boss_recruit.py b/worker/tasks/boss_recruit.py index c993fee..a1c2719 100644 --- a/worker/tasks/boss_recruit.py +++ b/worker/tasks/boss_recruit.py @@ -1,43 +1,45 @@ # -*- coding: utf-8 -*- """ -BOSS 直聘招聘任务处理器。 -招聘流程与本地脚本 boss_dp/自动化.py 的 main 保持一致: -1) 监听并获取 friendList -2) 逐个点开会话并监听历史消息 -3) 若历史消息中未出现“手机号/微信号”,则发送询问并执行“换微信” +BOSS recruit task handler (new greeting flow). +This flow is aligned with `1.py -> main`: +1) open recommend page +2) iterate positions +3) apply selected filters +4) greet candidates until target count is reached """ from __future__ import annotations import asyncio import json import random -import re import time -from datetime import datetime, timedelta -from typing import Any, Callable, Coroutine, Dict, List, Optional +from datetime import datetime +from typing import Any, Callable, Coroutine, Dict, List, Optional, Tuple from common.protocol import TaskType from worker.bit_browser import BitBrowserAPI -from worker.browser_control import connect_browser +from worker.browser_control import connect_browser, human_delay, safe_click from worker.tasks.base import BaseTaskHandler, TaskCancelledError -CHAT_INDEX_URL = "https://www.zhipin.com/web/chat/index" -FRIEND_LIST_API = "wapi/zprelation/friend/getBossFriendListV2" -HISTORY_API = "wapi/zpchat/boss/historyMsg" -ASK_WECHAT_TEXT = "后续沟通会更及时,您方便留一下您的微信号吗?我这边加您。" -CONTACT_KEYWORDS = ("手机号", "微信号") -EXCHANGE_CONFIRM_XPATH = ( - "x://span[contains(text(),'确定与对方交换微信吗?')]/../div[@class='btn-box']/" - "span[contains(@class,'boss-btn-primary')]" -) -EXCHANGE_CONFIRM_XPATH_ASCII = ( - "x://span[contains(text(),'确定与对方交换微信吗?')]/../div[@class='btn-box']/" - "span[contains(@class,'boss-btn-primary')]" -) + +RECOMMEND_URL = "https://www.zhipin.com/web/chat/recommend" +GEEK_LIST_API = "wapi/zpjob/rec/geek/list" +FAST_REPLY_TEXT = "您好,我们目前有相关岗位机会,方便了解一下吗?" + +# Keep selectors consistent with 1.py logic. +JOB_LIST_SELECTORS = [ + "x://*[contains(@class,'job-item') or contains(@class,'position-item')]", + "x://li[contains(@class,'job')]", + "x://div[contains(@class,'job-list')]/div", + "x://ul[contains(@class,'job')]/li", + "x://*[contains(@class,'recommend-job')]//*[contains(@class,'item')]", + "x://*[contains(@class,'job-list')]/*", + "x://a[contains(@href,'job')]", +] class BossRecruitHandler(BaseTaskHandler): - """BOSS 直聘招聘自动化任务。""" + """Recruit by greeting candidates from recommend list.""" task_type = TaskType.BOSS_RECRUIT.value @@ -47,913 +49,457 @@ class BossRecruitHandler(BaseTaskHandler): params: Dict[str, Any], progress_cb: Callable[[str, str], Coroutine], ) -> Any: - """ - 执行 BOSS 招聘流程。 - - params: - - job_title: str 招聘岗位名称(用于结果展示) - - account_name: str 比特浏览器窗口名(用于打开浏览器) - - account_id: str 比特浏览器窗口 ID(可选,优先级高于 name) - - bit_api_base: str 比特浏览器 API 地址(可选) - """ - job_title = params.get("job_title", "相关岗位") - account_name = params.get("account_name", "") - account_id = params.get("account_id", "") - bit_api_base = params.get("bit_api_base", "http://127.0.0.1:54345") + job_title = str(params.get("job_title", "") or "").strip() or "相关岗位" + account_name = str(params.get("account_name", "") or "").strip() + account_id = str(params.get("account_id", "") or "").strip() + bit_api_base = str(params.get("bit_api_base", "http://127.0.0.1:54345") or "").strip() cancel_event = params.get("_cancel_event") + worker_id = str(params.get("worker_id", "") or "").strip() + + selected_filters = self._normalize_selected_filters(params) + if not selected_filters: + raise ValueError("招聘前必须先选择至少一个筛选条件") + + greet_target = self._parse_positive_int(params.get("greet_target"), default=20) + position_names = self._normalize_string_list(params.get("position_names")) self.ensure_not_cancelled(cancel_event) - await progress_cb(task_id, "正在打开比特浏览器...") + await progress_cb(task_id, f"正在启动招聘流程,目标打招呼人数: {greet_target}") - result = await asyncio.get_event_loop().run_in_executor( + loop = asyncio.get_event_loop() + result = await loop.run_in_executor( None, self._run_sync, - task_id, job_title, account_name, account_id, bit_api_base, - progress_cb, + selected_filters, + position_names, + greet_target, + worker_id, cancel_event, ) return result def _run_sync( self, - task_id: str, job_title: str, account_name: str, account_id: str, bit_api_base: str, - progress_cb: Callable, + selected_filters: List[str], + position_names: List[str], + greet_target: int, + worker_id: str, cancel_event, - ) -> dict: - """同步执行浏览器自动化(在线程池中运行)。""" - _ = (task_id, progress_cb) - + ) -> Dict[str, Any]: self.ensure_not_cancelled(cancel_event) + bit_api = BitBrowserAPI(bit_api_base) - addr, port = bit_api.get_browser_for_drission( + _, port = bit_api.get_browser_for_drission( browser_id=account_id or None, name=account_name or None, ) - self.logger.info("已打开浏览器, CDP: %s (port=%d)", addr, port) - browser = connect_browser(port=port) - tab = browser.latest_tab + page = browser.latest_tab - flow_result = self._recruit_flow_like_script(tab, job_title, cancel_event) - collected = flow_result["details"] - errors = flow_result["errors"] - - wechat_set = {str(c.get("wechat", "")).strip() for c in collected if str(c.get("wechat", "")).strip()} - phone_set = {str(c.get("phone", "")).strip() for c in collected if str(c.get("phone", "")).strip()} - - has_errors = bool(errors) - result = { + flow = self._run_main_like_flow( + page=page, + job_title=job_title, + selected_filters=selected_filters, + position_names=position_names, + greet_target=greet_target, + worker_id=worker_id, + account_name=account_name, + cancel_event=cancel_event, + ) + errors = flow.get("errors", []) + return { "job_title": job_title, - "total_processed": len(collected), - "wechat_collected": len(wechat_set), - "phone_collected": len(phone_set), - "details": collected, + "target_greet_count": greet_target, + "actual_greeted": flow.get("actual_greeted", 0), + "rounds": flow.get("rounds", 0), + "selected_filters": selected_filters, + "positions": flow.get("positions", []), + "details": flow.get("details", []), + "contact_records_created": flow.get("contact_records_created", 0), "error_count": len(errors), "errors": errors[:20], - "success": not has_errors, + "success": len(errors) == 0, } - if has_errors: - result["error"] = f"招聘流程出现 {len(errors)} 处错误" - return result - - def _recruit_flow_like_script(self, tab, job_title: str, cancel_event) -> dict: - """按 boss_dp/自动化.py 的 main 流程执行。""" - collected: List[dict] = [] + def _run_main_like_flow( + self, + *, + page, + job_title: str, + selected_filters: List[str], + position_names: List[str], + greet_target: int, + worker_id: str, + account_name: str, + cancel_event, + ) -> Dict[str, Any]: + details: List[dict] = [] errors: List[str] = [] self.ensure_not_cancelled(cancel_event) - try: - friend_list = self._open_chat_and_fetch_friend_list(tab) - except Exception as e: - err = f"获取 friendList 失败: {e}" - self.logger.error(err) - return {"details": collected, "errors": [err]} + page.listen.start(GEEK_LIST_API) + page.get(RECOMMEND_URL) + human_delay(1.2, 2.4) - if not friend_list: - return {"details": collected, "errors": ["未拿到 friendList"]} + container = self._get_container(page) + positions = self._build_positions(container, position_names) + if not positions: + return { + "actual_greeted": 0, + "rounds": 0, + "positions": [], + "details": [], + "contact_records_created": 0, + "errors": ["未识别到岗位列表,无法开始招聘"], + } - # 应用筛选条件 - friend_list = self._apply_filters(friend_list) - total = len(friend_list) + greeted_keys: set[str] = set() + total_greeted = 0 + round_num = 0 + contact_records_created = 0 - self.logger.info("friendList 筛选后=%d,本次处理=%d", len(friend_list), total) + while True: + self.ensure_not_cancelled(cancel_event) + round_num += 1 + round_added = 0 - for i, friend in enumerate(friend_list[:total], start=1): - try: + for pos_type, pos_value in positions: self.ensure_not_cancelled(cancel_event) - name = str(friend.get("name", "")).strip() or f"候选人{i}" - friend_job_name = str(friend.get("jobName", "")).strip() - friend_job_id = str(friend.get("jobId", "")).strip() + if total_greeted >= greet_target: + break - tab.listen.start(HISTORY_API) - if not self._open_friend_chat_like_script(tab, name): - errors.append(f"[{name}] 未找到联系人或点击失败") + container = self._get_container(page) + label = self._activate_position(page, container, pos_type, pos_value) + if not label: continue - messages = self._wait_history_messages(tab) - self.ensure_not_cancelled(cancel_event) - - # 过滤掉自己发送的消息 - filtered_messages = self._filter_my_messages(messages) - has_contact_keyword = self._has_contact_keyword(filtered_messages) + human_delay(1.0, 2.1) + container = self._get_container(page) - action_state = { - "asked_wechat": False, - "send_success": False, - "exchange_clicked": False, - "exchange_confirmed": False, - } - if not has_contact_keyword: - self.ensure_not_cancelled(cancel_event) - action_state = self._ask_and_exchange_wechat_like_script(tab) - - # 先保存联系人记录(如果有的话) - temp_contact_id = None - if contacts.get("wechat") or contacts.get("phone"): - temp_contact_id = self._save_contact_record(name, friend_job_name, contacts, action_state) - - # 发送后等待对方回复,进行复聊管理 - if action_state["send_success"]: - self.ensure_not_cancelled(cancel_event) - reply_result = self._handle_follow_up_chat(tab, name, friend_job_name, temp_contact_id) - action_state.update(reply_result) - - # 如果复聊中提取到了新的联系方式,更新联系人记录 - if reply_result.get("extracted_contact_from_reply"): - panel_texts = self._collect_chat_panel_texts(tab) - new_contacts = self._extract_contacts(filtered_messages, extra_texts=panel_texts) - if new_contacts.get("wechat") or new_contacts.get("phone"): - contacts.update(new_contacts) - contact_written = True + packet = self._load_geek_list_packet(page, container, selected_filters) + geek_list = self._extract_geek_list(packet) + if not geek_list: + continue - panel_texts = self._collect_chat_panel_texts(tab) - contacts = self._extract_contacts(filtered_messages, extra_texts=panel_texts) - contact_written = bool(contacts["wechat"] or contacts["phone"]) - if has_contact_keyword and not contact_written: - self.logger.warning( - "[%s] 历史消息含联系方式关键词,但未提取到有效联系方式,疑似识别失败", - name, - ) - - # 保存联系人记录到数据库,获取contact_id用于复聊 - contact_id = None - if contact_written: - contact_id = self._save_contact_record(name, friend_job_name, contacts, action_state) - - collected.append( - { - "name": name, - "job": friend_job_name or job_title, - "job_id": friend_job_id, - "wechat": contacts["wechat"], - "phone": contacts["phone"], - "contact_written": contact_written, - "has_contact_keyword": has_contact_keyword, - **action_state, - } + added, new_records = self._greet_geek_list( + page=page, + container=container, + geek_list=geek_list, + greeted_keys=greeted_keys, + position_label=label, + default_job=job_title, + cancel_event=cancel_event, ) - except TaskCancelledError: - self.logger.info("任务执行中收到取消信号,提前结束") + if not added: + continue + + total_greeted += added + round_added += added + details.extend(new_records) + contact_records_created += self._save_new_greet_contacts( + records=new_records, + worker_id=worker_id, + account_name=account_name, + ) + + if total_greeted >= greet_target: break - except Exception as e: - err_msg = f"处理第 {i} 个会话出错: {e}" - self.logger.error(err_msg) - errors.append(err_msg) - finally: - if i < total: - self.ensure_not_cancelled(cancel_event) - self._sleep_between_sessions() + if round_added == 0: + break + human_delay(0.8, 1.6) - self.ensure_not_cancelled(cancel_event) - return {"details": collected, "errors": errors} - - def _open_chat_and_fetch_friend_list(self, tab) -> list: - tab.listen.start(FRIEND_LIST_API) - tab.get(CHAT_INDEX_URL) - packet = tab.listen.wait() - body = self._packet_body(packet) - zp_data = body.get("zpData", {}) if isinstance(body, dict) else {} - friend_list = zp_data.get("friendList", []) if isinstance(zp_data, dict) else [] - if isinstance(friend_list, list): - return friend_list - return [] - - @staticmethod - def _xpath_literal(value: str) -> str: - if "'" not in value: - return f"'{value}'" - if '"' not in value: - return f'"{value}"' - parts = value.split("'") - return "concat(" + ", \"'\", ".join(f"'{part}'" for part in parts) + ")" - - def _open_friend_chat_like_script(self, tab, name: str) -> bool: - name_selector = f"x://span[text()={self._xpath_literal(name)}]" - ele = tab.ele(name_selector, timeout=2) - if not ele: - return False - - try: - ele.run_js("this.scrollIntoView({block: 'center', behavior: 'auto'})") - except Exception: - pass - - time.sleep(0.8) - - try: - clickable = tab.ele(name_selector, timeout=2) - if not clickable: - return False - clickable.click(by_js=True) - return True - except Exception: - return False - - def _wait_history_messages(self, tab) -> list: - packet = tab.listen.wait() - body = self._packet_body(packet) - zp_data = body.get("zpData", {}) if isinstance(body, dict) else {} - messages = zp_data.get("messages", []) if isinstance(zp_data, dict) else [] - if isinstance(messages, list): - return messages - return [] - - @staticmethod - def _sleep_between_sessions() -> None: - """会话间随机停顿,降低频繁切换带来的风控风险。""" - time.sleep(random.uniform(1.8, 4.2)) - - def _ask_and_exchange_wechat_like_script(self, tab) -> dict: - state = { - "asked_wechat": False, - "send_success": False, - "exchange_clicked": False, - "exchange_confirmed": False, + return { + "actual_greeted": total_greeted, + "rounds": round_num, + "positions": [self._position_label(t, v) for t, v in positions], + "details": details, + "contact_records_created": contact_records_created, + "errors": errors, } - input_box = tab.ele('x://*[@id="boss-chat-editor-input"]', timeout=2) - if not input_box: - return state - - try: - input_box.click(by_js=True) - except Exception: - pass - - try: - input_box.clear() - except Exception: - pass - - input_box.input(ASK_WECHAT_TEXT) - state["asked_wechat"] = True - - time.sleep(random.randint(1, 3) + random.random()) - state["send_success"] = self._send_with_confirm( - tab, - input_box=input_box, - message=ASK_WECHAT_TEXT, - ) - - time.sleep(random.randint(1, 5) + random.random()) - state["exchange_clicked"] = self._click_change_wechat_like_script(tab) - - if state["exchange_clicked"]: - time.sleep(random.randint(1, 2) + random.random()) - state["exchange_confirmed"] = self._click_exchange_confirm_like_script(tab) - - return state - - def _click_send_like_script(self, tab, input_box=None) -> bool: - selectors = ( - 'x://div[text()="发送"]', - 'x://span[text()="发送"]', - 'x://button[normalize-space(.)="发送"]', - 'x://*[@role="button" and normalize-space(.)="发送"]', - ) - - for _ in range(3): - for selector in selectors: - try: - btn = tab.ele(selector, timeout=1) - if not btn: - continue - try: - btn.click(by_js=True) - except Exception: - btn.click() - time.sleep(0.25) - return True - except Exception: - continue - time.sleep(0.25) - - if input_box: - try: - input_box.input("\n") - time.sleep(0.25) - return True - except Exception: - pass - - return False - - def _send_with_confirm(self, tab, input_box, message: str, max_attempts: int = 2) -> bool: - """发送后检查末条消息,避免因确认误判导致重复补发。""" - msg = (message or "").strip() - if not msg: - return False - - for _ in range(max_attempts): - clicked = self._click_send_like_script(tab, input_box=input_box) - if not clicked: - continue - if self._confirm_last_sent_message(tab, msg): - return True - - # 已点击发送但未在列表中确认时,不直接补发,先判断输入框是否已清空。 - # 若已清空,通常表示消息已发出,只是 UI 刷新慢或识别未命中,避免重复发送。 - if not self._editor_has_text(input_box, msg): - return True - time.sleep(0.35) - - sent = self._confirm_last_sent_message(tab, msg) - if sent: - return True - - self._clear_editor(input_box) - return False - - @staticmethod - def _normalize_text(text: str) -> str: - return re.sub(r"\s+", "", text or "") - - def _editor_has_text(self, input_box, expected: str = "") -> bool: - """判断输入框是否仍残留指定文本。""" - expected_norm = self._normalize_text(expected) - current = "" - try: - current = input_box.run_js("return (this.innerText || this.textContent || this.value || '').trim();") - except Exception: - try: - current = input_box.text - except Exception: - current = "" - current_norm = self._normalize_text(str(current)) - if not current_norm: - return False - if not expected_norm: - return True - return expected_norm in current_norm - - def _clear_editor(self, input_box) -> None: - """清空聊天输入框,避免残留预输入内容。""" - try: - input_box.click(by_js=True) - except Exception: - pass - try: - input_box.clear() - except Exception: - pass - try: - input_box.run_js( - "if (this.isContentEditable) { this.innerHTML=''; this.textContent=''; }" - ) - except Exception: - pass - - def _confirm_last_sent_message(self, tab, message: str) -> bool: - """确认当前聊天窗口末条是否为刚发送内容。""" - msg = (message or "").strip() - if not msg: - return False - - target = re.sub(r"\s+", "", msg) - - try: - for _ in range(6): - items = tab.eles("css:.message-item", timeout=1) - if not items: - time.sleep(0.2) - continue - - for e in reversed(items[-6:]): - text = (e.text or "").strip() - if not text: - continue - - normalized = re.sub(r"\s+", "", text) - is_boss = False - try: - if e.ele("css:.item-boss", timeout=0): - is_boss = True - except Exception: - pass - - if is_boss and target in normalized: - return True - - time.sleep(0.2) - except Exception: - return False - - return False - - def _click_change_wechat_like_script(self, tab) -> bool: - try: - btn = tab.ele('x://span[text()="换微信"]', timeout=1) - if not btn: - return False - btn.click() - return True - except Exception: - return False - - def _click_exchange_confirm_like_script(self, tab) -> bool: - try: - confirm = tab.ele(EXCHANGE_CONFIRM_XPATH, timeout=2) - if not confirm: - confirm = tab.ele(EXCHANGE_CONFIRM_XPATH_ASCII, timeout=2) - if not confirm: - return False - confirm.click(by_js=True) - return True - except Exception: - return False - @staticmethod def _packet_body(packet) -> dict: if not packet: return {} - response = getattr(packet, "response", None) body = getattr(response, "body", None) if response is not None else None - if isinstance(body, dict): return body - if isinstance(body, str): try: parsed = json.loads(body) - if isinstance(parsed, dict): - return parsed + return parsed if isinstance(parsed, dict) else {} except Exception: return {} - return {} - def _history_texts(self, messages: list) -> list[str]: - texts: list[str] = [] - for msg in messages: - if not isinstance(msg, dict): + @staticmethod + def _normalize_string_list(value: Any) -> List[str]: + if value is None: + return [] + if isinstance(value, list): + return [str(v).strip() for v in value if str(v).strip()] + if isinstance(value, str): + raw = value.strip() + if not raw: + return [] + if raw.startswith("[") and raw.endswith("]"): + try: + parsed = json.loads(raw) + if isinstance(parsed, list): + return [str(v).strip() for v in parsed if str(v).strip()] + except Exception: + pass + return [raw] + return [] + + def _normalize_selected_filters(self, params: Dict[str, Any]) -> List[str]: + filters = params.get("selected_filters") + if filters is None: + filters = params.get("filters") + return self._normalize_string_list(filters) + + @staticmethod + def _parse_positive_int(value: Any, default: int) -> int: + try: + parsed = int(value) + return parsed if parsed > 0 else default + except Exception: + return default + + @staticmethod + def _get_container(page): + try: + return page.get_frame("recommendFrame") + except Exception: + return page + + def _get_all_position_elements(self, container) -> List[Any]: + for selector in JOB_LIST_SELECTORS: + try: + eles = container.eles(selector, timeout=2) + if eles and 1 <= len(eles) <= 100: + return eles + except Exception: + continue + return [] + + def _build_positions(self, container, position_names: List[str]) -> List[Tuple[str, Any]]: + if position_names: + return [("name", name) for name in position_names] + job_eles = self._get_all_position_elements(container) + if not job_eles: + return [("current", None)] + return [("index", i) for i in range(len(job_eles))] + + def _position_label(self, pos_type: str, pos_value: Any) -> str: + if pos_type == "name": + return str(pos_value) + if pos_type == "index": + return f"岗位{int(pos_value) + 1}" + return "当前岗位" + + def _activate_position(self, page, container, pos_type: str, pos_value: Any) -> str: + label = self._position_label(pos_type, pos_value) + try: + if pos_type == "name": + ele = container.ele(f'x://*[contains(text(),"{pos_value}")]', timeout=5) + if not ele: + return "" + if not safe_click(page, ele): + return "" + elif pos_type == "index": + job_eles = self._get_all_position_elements(container) + if int(pos_value) >= len(job_eles): + return "" + if not safe_click(page, job_eles[int(pos_value)]): + return "" + return label + except Exception: + return "" + + def _apply_filter_and_confirm(self, page, container, selected_filters: List[str]) -> None: + trigger = container.ele("x://*[contains(text(),'筛选')]", timeout=3) + if not trigger: + raise RuntimeError("未找到筛选按钮") + if not safe_click(page, trigger): + raise RuntimeError("点击筛选按钮失败") + human_delay(1.3, 2.0) + + for item in selected_filters: + option = container.ele(f"x://*[contains(text(),'{item}')]", timeout=2) + if not option: + self.logger.warning("筛选项未命中: %s", item) + continue + safe_click(page, option) + human_delay(0.35, 0.9) + + confirm = container.ele("x://*[contains(text(),'确定')]", timeout=3) + if not confirm: + raise RuntimeError("未找到筛选确认按钮") + if not safe_click(page, confirm): + raise RuntimeError("点击筛选确认失败") + + def _load_geek_list_packet(self, page, container, selected_filters: List[str]): + if selected_filters: + self._apply_filter_and_confirm(page, container, selected_filters) + packets = page.listen.wait(count=2, timeout=30) + if isinstance(packets, list) and packets: + return packets[-1] + return packets + return page.listen.wait(timeout=30) + + def _extract_geek_list(self, packet) -> List[dict]: + body = self._packet_body(packet) + zp_data = body.get("zpData", {}) if isinstance(body, dict) else {} + geek_list = zp_data.get("geekList", []) if isinstance(zp_data, dict) else [] + return geek_list if isinstance(geek_list, list) else [] + + @staticmethod + def _geek_key(item: dict) -> str: + card = item.get("geekCard") or {} + return str( + card.get("encryptGeekId") + or card.get("geekId") + or card.get("geekName") + or "" + ).strip() + + def _greet_geek_list( + self, + *, + page, + container, + geek_list: List[dict], + greeted_keys: set[str], + position_label: str, + default_job: str, + cancel_event, + ) -> Tuple[int, List[dict]]: + added = 0 + records: List[dict] = [] + for item in geek_list or []: + self.ensure_not_cancelled(cancel_event) + geek_key = self._geek_key(item) + if not geek_key or geek_key in greeted_keys: + continue + if not self._greet_one_geek(page, container, item): continue - body = msg.get("body") - text = "" - if isinstance(body, dict): - maybe_text = body.get("text") - if isinstance(maybe_text, str): - text = maybe_text - elif isinstance(msg.get("text"), str): - text = str(msg.get("text")) + greeted_keys.add(geek_key) + added += 1 + card = item.get("geekCard") or {} + name = str(card.get("geekName", "")).strip() or geek_key + position = str(card.get("expectPositionName", "")).strip() or position_label or default_job + records.append( + { + "name": name, + "position": position, + "geek_key": geek_key, + "source": "new_greet", + "greeted_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + } + ) + return added, records - text = text.strip() - if text: - texts.append(text) - - return texts - - def _collect_chat_panel_texts(self, tab, max_items: int = 80) -> list[str]: - """从当前聊天面板读取可见消息文本,补充接口历史消息。""" - texts: list[str] = [] - try: - items = tab.eles("css:.message-item", timeout=1) - except Exception: - return texts - - if not items: - return texts - - for e in items[-max_items:]: - try: - text = (e.text or "").strip() - except Exception: - text = "" - if text: - texts.append(text) - return texts - - def _has_contact_keyword(self, messages: list) -> bool: - for text in self._history_texts(messages): - if any(k in text for k in CONTACT_KEYWORDS): - return True - return False - - def _extract_contacts(self, messages: list, extra_texts: Optional[list[str]] = None) -> dict: - wechats: list[str] = [] - phones: list[str] = [] - all_texts = self._history_texts(messages) - if extra_texts: - all_texts.extend([str(t).strip() for t in extra_texts if str(t).strip()]) - - for text in all_texts: - wechats.extend(self._extract_wechat(text)) - phones.extend(self._extract_phone(text)) - - wechats = list(dict.fromkeys(wechats)) - phones = list(dict.fromkeys(phones)) - - return { - "wechat": wechats[0] if wechats else "", - "phone": phones[0] if phones else "", - } - - @staticmethod - def _extract_wechat(text: str) -> list: - if not text or not text.strip(): - return [] - - found = [] - patterns = [ - r"微信号[::\s]*([a-zA-Z0-9_\-]{6,20})", - r"微信[::\s]*([a-zA-Z0-9_\-]{6,20})", - r"wx[::\s]*([a-zA-Z0-9_\-]{6,20})", - r"wechat[::\s]*([a-zA-Z0-9_\-]{6,20})", - ] - - for pattern in patterns: - for match in re.finditer(pattern, text, re.IGNORECASE): - value = match.group(1).strip() if match.lastindex else match.group(0).strip() - if value and value not in found and len(value) >= 6: - found.append(value) - - return found[:3] - - @staticmethod - def _extract_phone(text: str) -> list: - if not text or not text.strip(): - return [] - - found = [] - raw_candidates = re.findall(r"1[3-9][\d\-\s]{9,15}", text) - for raw in raw_candidates: - digits = re.sub(r"\D", "", raw) - if len(digits) == 11 and digits.startswith("1") and digits not in found: - found.append(digits) - - return found[:3] - - def _apply_filters(self, friend_list: list) -> list: - """应用筛选条件过滤候选人列表。""" - try: - from server.models import FilterConfig - - # 获取启用的筛选配置 - filter_config = FilterConfig.objects.filter(is_active=True).first() - if not filter_config: - self.logger.info("未找到启用的筛选配置,跳过筛选") - return friend_list - - filtered = [] - for friend in friend_list: - # 筛选活跃度(最后上线时间) - last_time = friend.get("lastTime", "") - if not self._check_activity(last_time, filter_config.activity): - continue - - # 从简历信息中获取年龄、学历、期望职位 - resume = friend.get("resume", {}) or {} - - # 筛选年龄 - age = resume.get("age") - if age and not (filter_config.age_min <= int(age) <= filter_config.age_max): - continue - - # 筛选学历 - education = resume.get("education", "") - if filter_config.education != "不限" and education: - if not self._check_education(education, filter_config.education): - continue - - # 筛选期望职位 - job_name = friend.get("jobName", "") - if filter_config.positions and job_name: - if not any(pos in job_name for pos in filter_config.positions): - continue - - filtered.append(friend) - - self.logger.info("筛选前: %d 人,筛选后: %d 人", len(friend_list), len(filtered)) - return filtered - - except Exception as e: - self.logger.error("应用筛选条件失败: %s,返回原列表", e) - return friend_list - - def _check_activity(self, last_time: str, activity_filter: str) -> bool: - """检查活跃度是否符合要求。""" - if activity_filter == "不限": - return True - - try: - # 解析时间字符串 - now = datetime.now() - - if "昨天" in last_time: - last_active = now - timedelta(days=1) - elif "今天" in last_time or "刚刚" in last_time: - last_active = now - elif "月" in last_time and "日" in last_time: - # 格式如 "03月03日" - match = re.search(r"(\d+)月(\d+)日", last_time) - if match: - month = int(match.group(1)) - day = int(match.group(2)) - year = now.year - # 如果月份大于当前月份,说明是去年的 - if month > now.month: - year -= 1 - last_active = datetime(year, month, day) - else: - return True - else: - return True - - # 计算天数差 - days_diff = (now - last_active).days - - # 根据筛选条件判断 - if activity_filter == "今天活跃": - return days_diff == 0 - elif activity_filter == "3天内活跃": - return days_diff <= 3 - elif activity_filter == "本周活跃": - return days_diff <= 7 - elif activity_filter == "本月活跃": - return days_diff <= 30 - - return True - - except Exception as e: - self.logger.warning("解析活跃度时间失败: %s, last_time=%s", e, last_time) - return True - - @staticmethod - def _check_education(candidate_edu: str, required_edu: str) -> bool: - """检查学历是否符合要求。""" - edu_levels = ["初中", "高中", "中专", "大专", "本科", "硕士", "博士"] - - try: - candidate_level = next((i for i, edu in enumerate(edu_levels) if edu in candidate_edu), -1) - required_level = next((i for i, edu in enumerate(edu_levels) if edu in required_edu), -1) - - if candidate_level == -1 or required_level == -1: - return True - - return candidate_level >= required_level - except Exception: - return True - - def _filter_my_messages(self, messages: list) -> list: - """过滤掉自己发送的消息,只保留对方的消息。""" - filtered = [] - for msg in messages: - if not isinstance(msg, dict): - continue - - # from_id 为 0 表示是对方发送的消息 - from_id = msg.get("fromId", 0) - if from_id == 0: - filtered.append(msg) - - return filtered - - def _handle_follow_up_chat(self, tab, name: str, job_name: str, contact_id: int = None) -> dict: - """处理复聊管理,根据配置发送多轮话术。""" - result = { - "follow_up_attempted": False, - "got_reply": False, - "extracted_contact_from_reply": False, - } - - if not contact_id: - return result - - try: - from server.models import FollowUpConfig, FollowUpScript, FollowUpRecord - from django.utils import timezone - - # 获取该岗位的复聊配置 - config = FollowUpConfig.objects.filter( - position=job_name, - is_active=True - ).first() - - if not config: - # 尝试获取通用配置 - config = FollowUpConfig.objects.filter( - position="通用", - is_active=True - ).first() - - if not config: - self.logger.info("[%s] 未找到复聊配置,跳过复聊", name) - return result - - # 获取该联系人的复聊记录 - last_record = FollowUpRecord.objects.filter( - contact_id=contact_id, - config_id=config.id - ).order_by('-sent_at').first() - - # 确定当前是第几天 - if not last_record: - # 第一次复聊 - day_number = 1 - else: - # 计算距离上次发送的时间 - hours_since_last = (timezone.now() - last_record.sent_at).total_seconds() / 3600 - - # 获取上次使用的话术的间隔时间 - last_script = FollowUpScript.objects.filter(id=last_record.script_id).first() - if last_script and hours_since_last < last_script.interval_hours: - self.logger.info("[%s] 距离上次复聊不足 %d 小时,跳过", name, last_script.interval_hours) - return result - - # 下一天 - day_number = last_record.day_number + 1 - - # 获取该天的话术 - script = FollowUpScript.objects.filter( - config_id=config.id, - day_number=day_number, - is_active=True - ).order_by('order').first() - - # 如果没有该天的话术,尝试获取"往后一直"的话术(day_number=0) - if not script: - script = FollowUpScript.objects.filter( - config_id=config.id, - day_number=0, - is_active=True - ).order_by('order').first() - - if not script: - self.logger.info("[%s] 未找到第 %d 天的复聊话术", name, day_number) - return result - - # 发送话术 - result["follow_up_attempted"] = True - send_success = self._send_message(tab, script.content) - - if send_success: - # 记录发送 - record = FollowUpRecord.objects.create( - contact_id=contact_id, - config_id=config.id, - script_id=script.id, - day_number=day_number, - content=script.content, - ) - - # 等待回复 - reply_info = self._wait_for_reply(tab, script.content) - if reply_info["got_reply"]: - result["got_reply"] = True - result["extracted_contact_from_reply"] = reply_info["has_contact"] - - # 更新记录 - record.got_reply = True - record.reply_content = reply_info["reply_text"] - record.replied_at = timezone.now() - record.save() - - self.logger.info("[%s] 第 %d 天复聊得到回复", name, day_number) - - except Exception as e: - self.logger.error("复聊管理失败: %s", e) - - return result - - - - def _save_contact_record(self, name: str, job_name: str, contacts: dict, action_state: dict) -> int: - """保存联系人记录到数据库,返回contact_id。""" - try: - from server.models import ContactRecord - from django.utils import timezone - - contact_value = contacts.get("wechat") or contacts.get("phone") or "" - if not contact_value: - return None - - # 检查是否已存在 - existing = ContactRecord.objects.filter( - name=name, - contact=contact_value - ).first() - - if existing: - # 更新现有记录 - existing.wechat_exchanged = action_state.get("exchange_confirmed", False) - existing.reply_status = "已回复" if action_state.get("got_reply", False) else "未回复" - existing.save() - self.logger.info("更新联系人记录: %s - %s", name, contact_value) - return existing.id - else: - # 创建新记录 - record = ContactRecord.objects.create( - name=name, - position=job_name, - contact=contact_value, - reply_status="已回复" if action_state.get("got_reply", False) else "未回复", - wechat_exchanged=action_state.get("exchange_confirmed", False), - contacted_at=timezone.now(), - notes=f"自动招聘获取 - 微信: {contacts.get('wechat', '')}, 手机: {contacts.get('phone', '')}" - ) - self.logger.info("保存新联系人记录: %s - %s", name, contact_value) - return record.id - - except Exception as e: - self.logger.error("保存联系人记录失败: %s", e) - return None - def _send_message(self, tab, message: str) -> bool: - """发送消息的通用方法。""" - try: - input_box = tab.ele('x://*[@id="boss-chat-editor-input"]', timeout=2) - if not input_box: - return False - - try: - input_box.click(by_js=True) - input_box.clear() - except Exception: - pass - - input_box.input(message) - time.sleep(random.uniform(1, 2)) - - return self._send_with_confirm(tab, input_box=input_box, message=message) - - except Exception as e: - self.logger.error("发送消息失败: %s", e) + def _greet_one_geek(self, page, container, item: dict) -> bool: + geek_name = str((item.get("geekCard") or {}).get("geekName", "")).strip() + if not geek_name: return False - def _wait_for_reply(self, tab, sent_message: str, max_wait: int = 30) -> dict: - """等待对方回复并提取信息。""" - result = { - "got_reply": False, - "has_contact": False, - "reply_text": "", - } - try: - check_interval = 3 # 每3秒检查一次 - - for _ in range(max_wait // check_interval): - time.sleep(check_interval) - - # 重新获取聊天面板的消息 - panel_texts = self._collect_chat_panel_texts(tab, max_items=10) - - # 检查最后几条消息 - for text in panel_texts[-5:]: - # 过滤掉我们发送的消息(包含发送的话术内容) - if sent_message in text: - continue - - # 过滤掉包含"微信号"关键词但不是真实微信号的消息 - if "微信号" in text and not self._extract_wechat(text): - continue - - # 尝试提取联系方式 - wechats = self._extract_wechat(text) - phones = self._extract_phone(text) - - if wechats or phones: - result["got_reply"] = True - result["has_contact"] = True - result["reply_text"] = text - return result - - # 即使没有联系方式,只要有新消息也算回复 - if text and text not in [sent_message, ASK_WECHAT_TEXT]: - result["got_reply"] = True - result["reply_text"] = text - return result - - except Exception as e: - self.logger.error("等待回复失败: %s", e) - - return result + name_ele = container.ele(f'x://span[contains(text(),"{geek_name}")]', timeout=5) + if not name_ele: + name_ele = container.ele(f'x://span[text()="{geek_name}"]', timeout=2) + if not name_ele: + return False + + try: + name_ele.run_js("this.scrollIntoView()") + except Exception: + pass + if not safe_click(page, name_ele): + return False + + human_delay(2.2, 3.6) + panel = page.get_frame(1) + human_delay(0.4, 1.0) + + greet_btn = panel.ele('x://*[contains(text(),"打招呼")]', timeout=3) + if not greet_btn or not safe_click(panel, greet_btn): + return False + human_delay(0.4, 1.0) + + collect_btn = panel.ele('x://*[contains(text(),"收藏")]', timeout=2) + if collect_btn: + safe_click(panel, collect_btn) + human_delay(0.3, 0.8) + + for _ in range(3): + extra_btn = panel.ele('x://*[@class="btn-v2 btn-outline-v2"]', timeout=2) + if extra_btn: + safe_click(panel, extra_btn) + human_delay(0.3, 0.8) + + input_box = page.ele('x://*[@data-placeholder="快速回复"]', timeout=2) + if input_box: + input_box.input(FAST_REPLY_TEXT) + send_btn = page.ele('x://*[contains(text(),"发送")]', timeout=2) + if send_btn: + safe_click(page, send_btn) + human_delay(0.3, 0.8) + + close_side = page.ele('x://*[@class="iboss iboss-close"]', timeout=1) + if close_side: + safe_click(page, close_side) + human_delay(0.2, 0.6) + + close_popup = panel.ele('x://*[@class="boss-popup__close"]', timeout=1) + if close_popup: + safe_click(panel, close_popup) + human_delay(0.2, 0.6) + + return True + except Exception: + return False + + def _save_new_greet_contacts(self, *, records: List[dict], worker_id: str, account_name: str) -> int: + if not records: + return 0 + from django.utils import timezone + from server.models import ContactRecord + + created = 0 + for rec in records: + geek_key = str(rec.get("geek_key", "")).strip() + if not geek_key: + continue + contact_key = f"greet:{geek_key}" + defaults = { + "name": str(rec.get("name", "")).strip() or contact_key, + "position": str(rec.get("position", "")).strip(), + "reply_status": "新打招呼", + "wechat_exchanged": False, + "worker_id": worker_id, + "contacted_at": timezone.now(), + "notes": f"新打招呼记录; account={account_name}", + } + obj, was_created = ContactRecord.objects.update_or_create( + contact=contact_key, + defaults=defaults, + ) + if was_created: + created += 1 + elif not obj.contacted_at: + obj.contacted_at = timezone.now() + obj.save(update_fields=["contacted_at"]) + return created diff --git a/worker/tasks/boss_reply.py b/worker/tasks/boss_reply.py new file mode 100644 index 0000000..d7882c9 --- /dev/null +++ b/worker/tasks/boss_reply.py @@ -0,0 +1,961 @@ +# -*- coding: utf-8 -*- +""" +BOSS 直聘招聘任务处理器。 +招聘流程与本地脚本 boss_dp/自动化.py 的 main 保持一致: +1) 监听并获取 friendList +2) 逐个点开会话并监听历史消息 +3) 若历史消息中未出现“手机号/微信号”,则发送询问并执行“换微信” +""" +from __future__ import annotations + +import asyncio +import json +import random +import re +import time +from datetime import datetime, timedelta +from typing import Any, Callable, Coroutine, Dict, List, Optional + +from common.protocol import TaskType +from worker.bit_browser import BitBrowserAPI +from worker.browser_control import connect_browser +from worker.tasks.base import BaseTaskHandler, TaskCancelledError + +CHAT_INDEX_URL = "https://www.zhipin.com/web/chat/index" +FRIEND_LIST_API = "wapi/zprelation/friend/getBossFriendListV2" +HISTORY_API = "wapi/zpchat/boss/historyMsg" +ASK_WECHAT_TEXT = "后续沟通会更及时,您方便留一下您的微信号吗?我这边加您。" +CONTACT_KEYWORDS = ("手机号", "微信号") +EXCHANGE_CONFIRM_XPATH = ( + "x://span[contains(text(),'确定与对方交换微信吗?')]/../div[@class='btn-box']/" + "span[contains(@class,'boss-btn-primary')]" +) +EXCHANGE_CONFIRM_XPATH_ASCII = ( + "x://span[contains(text(),'确定与对方交换微信吗?')]/../div[@class='btn-box']/" + "span[contains(@class,'boss-btn-primary')]" +) + + +class BossReplyHandler(BaseTaskHandler): + """BOSS 直聘招聘自动化任务。""" + + task_type = TaskType.BOSS_REPLY.value + + async def execute( + self, + task_id: str, + params: Dict[str, Any], + progress_cb: Callable[[str, str], Coroutine], + ) -> Any: + """ + 执行 BOSS 招聘流程。 + + params: + - job_title: str 招聘岗位名称(用于结果展示) + - account_name: str 比特浏览器窗口名(用于打开浏览器) + - account_id: str 比特浏览器窗口 ID(可选,优先级高于 name) + - bit_api_base: str 比特浏览器 API 地址(可选) + """ + job_title = params.get("job_title", "相关岗位") + account_name = params.get("account_name", "") + account_id = params.get("account_id", "") + bit_api_base = params.get("bit_api_base", "http://127.0.0.1:54345") + cancel_event = params.get("_cancel_event") + + self.ensure_not_cancelled(cancel_event) + await progress_cb(task_id, "正在打开比特浏览器...") + + result = await asyncio.get_event_loop().run_in_executor( + None, + self._run_sync, + task_id, + job_title, + account_name, + account_id, + bit_api_base, + progress_cb, + cancel_event, + ) + return result + + def _run_sync( + self, + task_id: str, + job_title: str, + account_name: str, + account_id: str, + bit_api_base: str, + progress_cb: Callable, + cancel_event, + ) -> dict: + """同步执行浏览器自动化(在线程池中运行)。""" + _ = (task_id, progress_cb) + + self.ensure_not_cancelled(cancel_event) + bit_api = BitBrowserAPI(bit_api_base) + addr, port = bit_api.get_browser_for_drission( + browser_id=account_id or None, + name=account_name or None, + ) + self.logger.info("已打开浏览器, CDP: %s (port=%d)", addr, port) + + browser = connect_browser(port=port) + tab = browser.latest_tab + + flow_result = self._recruit_flow_like_script(tab, job_title, cancel_event) + collected = flow_result["details"] + errors = flow_result["errors"] + + wechat_set = {str(c.get("wechat", "")).strip() for c in collected if str(c.get("wechat", "")).strip()} + phone_set = {str(c.get("phone", "")).strip() for c in collected if str(c.get("phone", "")).strip()} + + has_errors = bool(errors) + result = { + "job_title": job_title, + "total_processed": len(collected), + "wechat_collected": len(wechat_set), + "phone_collected": len(phone_set), + "details": collected, + "error_count": len(errors), + "errors": errors[:20], + "success": not has_errors, + } + if has_errors: + result["error"] = f"招聘流程出现 {len(errors)} 处错误" + + return result + + def _recruit_flow_like_script(self, tab, job_title: str, cancel_event) -> dict: + """按 boss_dp/自动化.py 的 main 流程执行。""" + collected: List[dict] = [] + errors: List[str] = [] + + self.ensure_not_cancelled(cancel_event) + try: + friend_list = self._open_chat_and_fetch_friend_list(tab) + except Exception as e: + err = f"获取 friendList 失败: {e}" + self.logger.error(err) + return {"details": collected, "errors": [err]} + + if not friend_list: + return {"details": collected, "errors": ["未拿到 friendList"]} + + # 应用筛选条件 + friend_list = self._apply_filters(friend_list) + total = len(friend_list) + + self.logger.info("friendList 筛选后=%d,本次处理=%d", len(friend_list), total) + + for i, friend in enumerate(friend_list[:total], start=1): + try: + self.ensure_not_cancelled(cancel_event) + name = str(friend.get("name", "")).strip() or f"候选人{i}" + friend_job_name = str(friend.get("jobName", "")).strip() + friend_job_id = str(friend.get("jobId", "")).strip() + + tab.listen.start(HISTORY_API) + if not self._open_friend_chat_like_script(tab, name): + errors.append(f"[{name}] 未找到联系人或点击失败") + continue + + messages = self._wait_history_messages(tab) + self.ensure_not_cancelled(cancel_event) + + # 过滤掉自己发送的消息 + filtered_messages = self._filter_my_messages(messages) + has_contact_keyword = self._has_contact_keyword(filtered_messages) + contacts = self._extract_contacts(filtered_messages) + contact_written = bool(contacts.get("wechat") or contacts.get("phone")) + + action_state = { + "asked_wechat": False, + "send_success": False, + "exchange_clicked": False, + "exchange_confirmed": False, + } + if not has_contact_keyword: + self.ensure_not_cancelled(cancel_event) + action_state = self._ask_and_exchange_wechat_like_script(tab) + + # 先保存联系人记录(如果有的话) + temp_contact_id = None + if contacts.get("wechat") or contacts.get("phone"): + temp_contact_id = self._save_contact_record(name, friend_job_name, contacts, action_state) + + # 发送后等待对方回复,进行复聊管理 + if action_state["send_success"]: + self.ensure_not_cancelled(cancel_event) + reply_result = self._handle_follow_up_chat(tab, name, friend_job_name, temp_contact_id) + action_state.update(reply_result) + + # 如果复聊中提取到了新的联系方式,更新联系人记录 + if reply_result.get("extracted_contact_from_reply"): + panel_texts = self._collect_chat_panel_texts(tab) + new_contacts = self._extract_contacts(filtered_messages, extra_texts=panel_texts) + if new_contacts.get("wechat") or new_contacts.get("phone"): + contacts.update(new_contacts) + contact_written = True + + panel_texts = self._collect_chat_panel_texts(tab) + contacts = self._extract_contacts(filtered_messages, extra_texts=panel_texts) + contact_written = bool(contacts["wechat"] or contacts["phone"]) + if has_contact_keyword and not contact_written: + self.logger.warning( + "[%s] 历史消息含联系方式关键词,但未提取到有效联系方式,疑似识别失败", + name, + ) + + # 保存联系人记录到数据库,获取contact_id用于复聊 + contact_id = None + if contact_written: + contact_id = self._save_contact_record(name, friend_job_name, contacts, action_state) + + collected.append( + { + "name": name, + "job": friend_job_name or job_title, + "job_id": friend_job_id, + "wechat": contacts["wechat"], + "phone": contacts["phone"], + "contact_written": contact_written, + "has_contact_keyword": has_contact_keyword, + **action_state, + } + ) + except TaskCancelledError: + self.logger.info("任务执行中收到取消信号,提前结束") + break + except Exception as e: + err_msg = f"处理第 {i} 个会话出错: {e}" + self.logger.error(err_msg) + errors.append(err_msg) + finally: + if i < total: + self.ensure_not_cancelled(cancel_event) + self._sleep_between_sessions() + + self.ensure_not_cancelled(cancel_event) + return {"details": collected, "errors": errors} + + def _open_chat_and_fetch_friend_list(self, tab) -> list: + tab.listen.start(FRIEND_LIST_API) + tab.get(CHAT_INDEX_URL) + packet = tab.listen.wait() + body = self._packet_body(packet) + zp_data = body.get("zpData", {}) if isinstance(body, dict) else {} + friend_list = zp_data.get("friendList", []) if isinstance(zp_data, dict) else [] + if isinstance(friend_list, list): + return friend_list + return [] + + @staticmethod + def _xpath_literal(value: str) -> str: + if "'" not in value: + return f"'{value}'" + if '"' not in value: + return f'"{value}"' + parts = value.split("'") + return "concat(" + ", \"'\", ".join(f"'{part}'" for part in parts) + ")" + + def _open_friend_chat_like_script(self, tab, name: str) -> bool: + name_selector = f"x://span[text()={self._xpath_literal(name)}]" + ele = tab.ele(name_selector, timeout=2) + if not ele: + return False + + try: + ele.run_js("this.scrollIntoView({block: 'center', behavior: 'auto'})") + except Exception: + pass + + time.sleep(0.8) + + try: + clickable = tab.ele(name_selector, timeout=2) + if not clickable: + return False + clickable.click(by_js=True) + return True + except Exception: + return False + + def _wait_history_messages(self, tab) -> list: + packet = tab.listen.wait() + body = self._packet_body(packet) + zp_data = body.get("zpData", {}) if isinstance(body, dict) else {} + messages = zp_data.get("messages", []) if isinstance(zp_data, dict) else [] + if isinstance(messages, list): + return messages + return [] + + @staticmethod + def _sleep_between_sessions() -> None: + """会话间随机停顿,降低频繁切换带来的风控风险。""" + time.sleep(random.uniform(1.8, 4.2)) + + def _ask_and_exchange_wechat_like_script(self, tab) -> dict: + state = { + "asked_wechat": False, + "send_success": False, + "exchange_clicked": False, + "exchange_confirmed": False, + } + + input_box = tab.ele('x://*[@id="boss-chat-editor-input"]', timeout=2) + if not input_box: + return state + + try: + input_box.click(by_js=True) + except Exception: + pass + + try: + input_box.clear() + except Exception: + pass + + input_box.input(ASK_WECHAT_TEXT) + state["asked_wechat"] = True + + time.sleep(random.randint(1, 3) + random.random()) + state["send_success"] = self._send_with_confirm( + tab, + input_box=input_box, + message=ASK_WECHAT_TEXT, + ) + + time.sleep(random.randint(1, 5) + random.random()) + state["exchange_clicked"] = self._click_change_wechat_like_script(tab) + + if state["exchange_clicked"]: + time.sleep(random.randint(1, 2) + random.random()) + state["exchange_confirmed"] = self._click_exchange_confirm_like_script(tab) + + return state + + def _click_send_like_script(self, tab, input_box=None) -> bool: + selectors = ( + 'x://div[text()="发送"]', + 'x://span[text()="发送"]', + 'x://button[normalize-space(.)="发送"]', + 'x://*[@role="button" and normalize-space(.)="发送"]', + ) + + for _ in range(3): + for selector in selectors: + try: + btn = tab.ele(selector, timeout=1) + if not btn: + continue + try: + btn.click(by_js=True) + except Exception: + btn.click() + time.sleep(0.25) + return True + except Exception: + continue + time.sleep(0.25) + + if input_box: + try: + input_box.input("\n") + time.sleep(0.25) + return True + except Exception: + pass + + return False + + def _send_with_confirm(self, tab, input_box, message: str, max_attempts: int = 2) -> bool: + """发送后检查末条消息,避免因确认误判导致重复补发。""" + msg = (message or "").strip() + if not msg: + return False + + for _ in range(max_attempts): + clicked = self._click_send_like_script(tab, input_box=input_box) + if not clicked: + continue + if self._confirm_last_sent_message(tab, msg): + return True + + # 已点击发送但未在列表中确认时,不直接补发,先判断输入框是否已清空。 + # 若已清空,通常表示消息已发出,只是 UI 刷新慢或识别未命中,避免重复发送。 + if not self._editor_has_text(input_box, msg): + return True + time.sleep(0.35) + + sent = self._confirm_last_sent_message(tab, msg) + if sent: + return True + + self._clear_editor(input_box) + return False + + @staticmethod + def _normalize_text(text: str) -> str: + return re.sub(r"\s+", "", text or "") + + def _editor_has_text(self, input_box, expected: str = "") -> bool: + """判断输入框是否仍残留指定文本。""" + expected_norm = self._normalize_text(expected) + current = "" + try: + current = input_box.run_js("return (this.innerText || this.textContent || this.value || '').trim();") + except Exception: + try: + current = input_box.text + except Exception: + current = "" + current_norm = self._normalize_text(str(current)) + if not current_norm: + return False + if not expected_norm: + return True + return expected_norm in current_norm + + def _clear_editor(self, input_box) -> None: + """清空聊天输入框,避免残留预输入内容。""" + try: + input_box.click(by_js=True) + except Exception: + pass + try: + input_box.clear() + except Exception: + pass + try: + input_box.run_js( + "if (this.isContentEditable) { this.innerHTML=''; this.textContent=''; }" + ) + except Exception: + pass + + def _confirm_last_sent_message(self, tab, message: str) -> bool: + """确认当前聊天窗口末条是否为刚发送内容。""" + msg = (message or "").strip() + if not msg: + return False + + target = re.sub(r"\s+", "", msg) + + try: + for _ in range(6): + items = tab.eles("css:.message-item", timeout=1) + if not items: + time.sleep(0.2) + continue + + for e in reversed(items[-6:]): + text = (e.text or "").strip() + if not text: + continue + + normalized = re.sub(r"\s+", "", text) + is_boss = False + try: + if e.ele("css:.item-boss", timeout=0): + is_boss = True + except Exception: + pass + + if is_boss and target in normalized: + return True + + time.sleep(0.2) + except Exception: + return False + + return False + + def _click_change_wechat_like_script(self, tab) -> bool: + try: + btn = tab.ele('x://span[text()="换微信"]', timeout=1) + if not btn: + return False + btn.click() + return True + except Exception: + return False + + def _click_exchange_confirm_like_script(self, tab) -> bool: + try: + confirm = tab.ele(EXCHANGE_CONFIRM_XPATH, timeout=2) + if not confirm: + confirm = tab.ele(EXCHANGE_CONFIRM_XPATH_ASCII, timeout=2) + if not confirm: + return False + confirm.click(by_js=True) + return True + except Exception: + return False + + @staticmethod + def _packet_body(packet) -> dict: + if not packet: + return {} + + response = getattr(packet, "response", None) + body = getattr(response, "body", None) if response is not None else None + + if isinstance(body, dict): + return body + + if isinstance(body, str): + try: + parsed = json.loads(body) + if isinstance(parsed, dict): + return parsed + except Exception: + return {} + + return {} + + def _history_texts(self, messages: list) -> list[str]: + texts: list[str] = [] + for msg in messages: + if not isinstance(msg, dict): + continue + + body = msg.get("body") + text = "" + if isinstance(body, dict): + maybe_text = body.get("text") + if isinstance(maybe_text, str): + text = maybe_text + elif isinstance(msg.get("text"), str): + text = str(msg.get("text")) + + text = text.strip() + if text: + texts.append(text) + + return texts + + def _collect_chat_panel_texts(self, tab, max_items: int = 80) -> list[str]: + """从当前聊天面板读取可见消息文本,补充接口历史消息。""" + texts: list[str] = [] + try: + items = tab.eles("css:.message-item", timeout=1) + except Exception: + return texts + + if not items: + return texts + + for e in items[-max_items:]: + try: + text = (e.text or "").strip() + except Exception: + text = "" + if text: + texts.append(text) + return texts + + def _has_contact_keyword(self, messages: list) -> bool: + for text in self._history_texts(messages): + if any(k in text for k in CONTACT_KEYWORDS): + return True + return False + + def _extract_contacts(self, messages: list, extra_texts: Optional[list[str]] = None) -> dict: + wechats: list[str] = [] + phones: list[str] = [] + all_texts = self._history_texts(messages) + if extra_texts: + all_texts.extend([str(t).strip() for t in extra_texts if str(t).strip()]) + + for text in all_texts: + wechats.extend(self._extract_wechat(text)) + phones.extend(self._extract_phone(text)) + + wechats = list(dict.fromkeys(wechats)) + phones = list(dict.fromkeys(phones)) + + return { + "wechat": wechats[0] if wechats else "", + "phone": phones[0] if phones else "", + } + + @staticmethod + def _extract_wechat(text: str) -> list: + if not text or not text.strip(): + return [] + + found = [] + patterns = [ + r"微信号[::\s]*([a-zA-Z0-9_\-]{6,20})", + r"微信[::\s]*([a-zA-Z0-9_\-]{6,20})", + r"wx[::\s]*([a-zA-Z0-9_\-]{6,20})", + r"wechat[::\s]*([a-zA-Z0-9_\-]{6,20})", + ] + + for pattern in patterns: + for match in re.finditer(pattern, text, re.IGNORECASE): + value = match.group(1).strip() if match.lastindex else match.group(0).strip() + if value and value not in found and len(value) >= 6: + found.append(value) + + return found[:3] + + @staticmethod + def _extract_phone(text: str) -> list: + if not text or not text.strip(): + return [] + + found = [] + raw_candidates = re.findall(r"1[3-9][\d\-\s]{9,15}", text) + for raw in raw_candidates: + digits = re.sub(r"\D", "", raw) + if len(digits) == 11 and digits.startswith("1") and digits not in found: + found.append(digits) + + return found[:3] + + def _apply_filters(self, friend_list: list) -> list: + """应用筛选条件过滤候选人列表。""" + try: + from server.models import FilterConfig + + # 获取启用的筛选配置 + filter_config = FilterConfig.objects.filter(is_active=True).first() + if not filter_config: + self.logger.info("未找到启用的筛选配置,跳过筛选") + return friend_list + + filtered = [] + for friend in friend_list: + # 筛选活跃度(最后上线时间) + last_time = friend.get("lastTime", "") + if not self._check_activity(last_time, filter_config.activity): + continue + + # 从简历信息中获取年龄、学历、期望职位 + resume = friend.get("resume", {}) or {} + + # 筛选年龄 + age = resume.get("age") + if age and not (filter_config.age_min <= int(age) <= filter_config.age_max): + continue + + # 筛选学历 + education = resume.get("education", "") + if filter_config.education != "不限" and education: + if not self._check_education(education, filter_config.education): + continue + + # 筛选期望职位 + job_name = friend.get("jobName", "") + if filter_config.positions and job_name: + if not any(pos in job_name for pos in filter_config.positions): + continue + + filtered.append(friend) + + self.logger.info("筛选前: %d 人,筛选后: %d 人", len(friend_list), len(filtered)) + return filtered + + except Exception as e: + self.logger.error("应用筛选条件失败: %s,返回原列表", e) + return friend_list + + def _check_activity(self, last_time: str, activity_filter: str) -> bool: + """检查活跃度是否符合要求。""" + if activity_filter == "不限": + return True + + try: + # 解析时间字符串 + now = datetime.now() + + if "昨天" in last_time: + last_active = now - timedelta(days=1) + elif "今天" in last_time or "刚刚" in last_time: + last_active = now + elif "月" in last_time and "日" in last_time: + # 格式如 "03月03日" + match = re.search(r"(\d+)月(\d+)日", last_time) + if match: + month = int(match.group(1)) + day = int(match.group(2)) + year = now.year + # 如果月份大于当前月份,说明是去年的 + if month > now.month: + year -= 1 + last_active = datetime(year, month, day) + else: + return True + else: + return True + + # 计算天数差 + days_diff = (now - last_active).days + + # 根据筛选条件判断 + if activity_filter == "今天活跃": + return days_diff == 0 + elif activity_filter == "3天内活跃": + return days_diff <= 3 + elif activity_filter == "本周活跃": + return days_diff <= 7 + elif activity_filter == "本月活跃": + return days_diff <= 30 + + return True + + except Exception as e: + self.logger.warning("解析活跃度时间失败: %s, last_time=%s", e, last_time) + return True + + @staticmethod + def _check_education(candidate_edu: str, required_edu: str) -> bool: + """检查学历是否符合要求。""" + edu_levels = ["初中", "高中", "中专", "大专", "本科", "硕士", "博士"] + + try: + candidate_level = next((i for i, edu in enumerate(edu_levels) if edu in candidate_edu), -1) + required_level = next((i for i, edu in enumerate(edu_levels) if edu in required_edu), -1) + + if candidate_level == -1 or required_level == -1: + return True + + return candidate_level >= required_level + except Exception: + return True + + def _filter_my_messages(self, messages: list) -> list: + """过滤掉自己发送的消息,只保留对方的消息。""" + filtered = [] + for msg in messages: + if not isinstance(msg, dict): + continue + + # from_id 为 0 表示是对方发送的消息 + from_id = msg.get("fromId", 0) + if from_id == 0: + filtered.append(msg) + + return filtered + + def _handle_follow_up_chat(self, tab, name: str, job_name: str, contact_id: int = None) -> dict: + """处理复聊管理,根据配置发送多轮话术。""" + result = { + "follow_up_attempted": False, + "got_reply": False, + "extracted_contact_from_reply": False, + } + + if not contact_id: + return result + + try: + from server.models import FollowUpConfig, FollowUpScript, FollowUpRecord + from django.utils import timezone + + # 获取该岗位的复聊配置 + config = FollowUpConfig.objects.filter( + position=job_name, + is_active=True + ).first() + + if not config: + # 尝试获取通用配置 + config = FollowUpConfig.objects.filter( + position="通用", + is_active=True + ).first() + + if not config: + self.logger.info("[%s] 未找到复聊配置,跳过复聊", name) + return result + + # 获取该联系人的复聊记录 + last_record = FollowUpRecord.objects.filter( + contact_id=contact_id, + config_id=config.id + ).order_by('-sent_at').first() + + # 确定当前是第几天 + if not last_record: + # 第一次复聊 + day_number = 1 + else: + # 计算距离上次发送的时间 + hours_since_last = (timezone.now() - last_record.sent_at).total_seconds() / 3600 + + # 获取上次使用的话术的间隔时间 + last_script = FollowUpScript.objects.filter(id=last_record.script_id).first() + if last_script and hours_since_last < last_script.interval_hours: + self.logger.info("[%s] 距离上次复聊不足 %d 小时,跳过", name, last_script.interval_hours) + return result + + # 下一天 + day_number = last_record.day_number + 1 + + # 获取该天的话术 + script = FollowUpScript.objects.filter( + config_id=config.id, + day_number=day_number, + is_active=True + ).order_by('order').first() + + # 如果没有该天的话术,尝试获取"往后一直"的话术(day_number=0) + if not script: + script = FollowUpScript.objects.filter( + config_id=config.id, + day_number=0, + is_active=True + ).order_by('order').first() + + if not script: + self.logger.info("[%s] 未找到第 %d 天的复聊话术", name, day_number) + return result + + # 发送话术 + result["follow_up_attempted"] = True + send_success = self._send_message(tab, script.content) + + if send_success: + # 记录发送 + record = FollowUpRecord.objects.create( + contact_id=contact_id, + config_id=config.id, + script_id=script.id, + day_number=day_number, + content=script.content, + ) + + # 等待回复 + reply_info = self._wait_for_reply(tab, script.content) + if reply_info["got_reply"]: + result["got_reply"] = True + result["extracted_contact_from_reply"] = reply_info["has_contact"] + + # 更新记录 + record.got_reply = True + record.reply_content = reply_info["reply_text"] + record.replied_at = timezone.now() + record.save() + + self.logger.info("[%s] 第 %d 天复聊得到回复", name, day_number) + + except Exception as e: + self.logger.error("复聊管理失败: %s", e) + + return result + + + + def _save_contact_record(self, name: str, job_name: str, contacts: dict, action_state: dict) -> int: + """保存联系人记录到数据库,返回contact_id。""" + try: + from server.models import ContactRecord + from django.utils import timezone + + contact_value = contacts.get("wechat") or contacts.get("phone") or "" + if not contact_value: + return None + + # 检查是否已存在 + existing = ContactRecord.objects.filter( + name=name, + contact=contact_value + ).first() + + if existing: + # 更新现有记录 + existing.wechat_exchanged = action_state.get("exchange_confirmed", False) + existing.reply_status = "已回复" if action_state.get("got_reply", False) else "未回复" + existing.save() + self.logger.info("更新联系人记录: %s - %s", name, contact_value) + return existing.id + else: + # 创建新记录 + record = ContactRecord.objects.create( + name=name, + position=job_name, + contact=contact_value, + reply_status="已回复" if action_state.get("got_reply", False) else "未回复", + wechat_exchanged=action_state.get("exchange_confirmed", False), + contacted_at=timezone.now(), + notes=f"自动招聘获取 - 微信: {contacts.get('wechat', '')}, 手机: {contacts.get('phone', '')}" + ) + self.logger.info("保存新联系人记录: %s - %s", name, contact_value) + return record.id + + except Exception as e: + self.logger.error("保存联系人记录失败: %s", e) + return None + def _send_message(self, tab, message: str) -> bool: + """发送消息的通用方法。""" + try: + input_box = tab.ele('x://*[@id="boss-chat-editor-input"]', timeout=2) + if not input_box: + return False + + try: + input_box.click(by_js=True) + input_box.clear() + except Exception: + pass + + input_box.input(message) + time.sleep(random.uniform(1, 2)) + + return self._send_with_confirm(tab, input_box=input_box, message=message) + + except Exception as e: + self.logger.error("发送消息失败: %s", e) + return False + + def _wait_for_reply(self, tab, sent_message: str, max_wait: int = 30) -> dict: + """等待对方回复并提取信息。""" + result = { + "got_reply": False, + "has_contact": False, + "reply_text": "", + } + + try: + check_interval = 3 # 每3秒检查一次 + + for _ in range(max_wait // check_interval): + time.sleep(check_interval) + + # 重新获取聊天面板的消息 + panel_texts = self._collect_chat_panel_texts(tab, max_items=10) + + # 检查最后几条消息 + for text in panel_texts[-5:]: + # 过滤掉我们发送的消息(包含发送的话术内容) + if sent_message in text: + continue + + # 过滤掉包含"微信号"关键词但不是真实微信号的消息 + if "微信号" in text and not self._extract_wechat(text): + continue + + # 尝试提取联系方式 + wechats = self._extract_wechat(text) + phones = self._extract_phone(text) + + if wechats or phones: + result["got_reply"] = True + result["has_contact"] = True + result["reply_text"] = text + return result + + # 即使没有联系方式,只要有新消息也算回复 + if text and text not in [sent_message, ASK_WECHAT_TEXT]: + result["got_reply"] = True + result["reply_text"] = text + return result + + except Exception as e: + self.logger.error("等待回复失败: %s", e) + + return result + diff --git a/worker/tasks/registry.py b/worker/tasks/registry.py index dfa7ad4..97a7b26 100644 --- a/worker/tasks/registry.py +++ b/worker/tasks/registry.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- """ -任务处理器注册表。 -Worker 启动时注册所有可用的 handler,收到任务时按 task_type 查找对应 handler。 +Task handler registry for worker runtime. """ from __future__ import annotations @@ -12,35 +11,31 @@ from worker.tasks.base import BaseTaskHandler logger = logging.getLogger("worker.tasks.registry") -# task_type → handler 实例 _registry: Dict[str, BaseTaskHandler] = {} def register_handler(handler_cls: Type[BaseTaskHandler]) -> None: - """注册一个任务处理器类(自动实例化)。""" instance = handler_cls() if not instance.task_type: - raise ValueError(f"{handler_cls.__name__} 未设置 task_type") + raise ValueError(f"{handler_cls.__name__} missing task_type") _registry[instance.task_type] = instance - logger.info("注册任务处理器: %s → %s", instance.task_type, handler_cls.__name__) + logger.info("Registered task handler: %s -> %s", instance.task_type, handler_cls.__name__) def get_handler(task_type: str) -> Optional[BaseTaskHandler]: - """根据 task_type 获取对应的处理器实例。""" return _registry.get(task_type) def list_handlers() -> list[str]: - """列出所有已注册的 task_type。""" return list(_registry.keys()) def register_all_handlers() -> None: - """注册所有内置任务处理器。在此函数中 import 并注册。""" from worker.tasks.boss_recruit import BossRecruitHandler + from worker.tasks.boss_reply import BossReplyHandler from worker.tasks.check_login import CheckLoginHandler + register_handler(BossRecruitHandler) + register_handler(BossReplyHandler) register_handler(CheckLoginHandler) - # 未来扩展:在此处添加新的 handler - # from worker.tasks.xxx import XxxHandler - # register_handler(XxxHandler) + diff --git a/worker/ws_client.py b/worker/ws_client.py index 440eb5a..c79e50a 100644 --- a/worker/ws_client.py +++ b/worker/ws_client.py @@ -215,6 +215,7 @@ class WorkerWSClient: # 将 account_name 注入 params(供 handler 使用) if account_name: params.setdefault("account_name", account_name) + params.setdefault("worker_id", self.worker_id) params.setdefault("bit_api_base", self.bit_api.base_url) params["_cancel_event"] = cancel_event