diff --git a/server/api/stats.py b/server/api/stats.py index d59a2cf..e450030 100644 --- a/server/api/stats.py +++ b/server/api/stats.py @@ -14,6 +14,8 @@ from server.core.response import api_success from server.models import ContactRecord, BossAccount, TaskLog from server.core.worker_manager import worker_manager +GREETED_STATUS = "新打招呼" + @api_view(["GET"]) def stats_overview(request): @@ -32,12 +34,14 @@ def stats_overview(request): contacts_qs = ContactRecord.objects.filter(date_filter) total_contacts = contacts_qs.count() + total_greeted = contacts_qs.filter(reply_status=GREETED_STATUS).count() total_replied = contacts_qs.filter(reply_status="已回复").count() total_wechat = contacts_qs.filter(wechat_exchanged=True).count() # 今日数据 today_filter = Q(contacted_at__date=now.date()) today_contacts = ContactRecord.objects.filter(today_filter).count() + today_greeted = ContactRecord.objects.filter(today_filter, reply_status=GREETED_STATUS).count() today_replied = ContactRecord.objects.filter(today_filter, reply_status="已回复").count() today_wechat = ContactRecord.objects.filter(today_filter, wechat_exchanged=True).count() @@ -60,10 +64,16 @@ def stats_overview(request): "contacts": { "total": total_contacts, "today": today_contacts, + "greeted": total_greeted, + "today_greeted": today_greeted, "replied": total_replied, "today_replied": today_replied, "reply_rate": reply_rate, }, + "greetings": { + "total": total_greeted, + "today": today_greeted, + }, "wechat": { "total": total_wechat, "today": today_wechat, @@ -86,7 +96,11 @@ def stats_overview(request): @api_view(["GET"]) def stats_daily(request): """按日统计最近 N 天数据。""" - days = int(request.query_params.get("days", 7)) + try: + days = int(request.query_params.get("days", 7)) + except (TypeError, ValueError): + days = 7 + days = max(1, min(days, 180)) now = timezone.now() daily_data = [] @@ -95,11 +109,13 @@ def stats_daily(request): day_filter = Q(contacted_at__date=day) qs = ContactRecord.objects.filter(day_filter) total = qs.count() + greeted = qs.filter(reply_status=GREETED_STATUS).count() replied = qs.filter(reply_status="已回复").count() wechat = qs.filter(wechat_exchanged=True).count() daily_data.append({ "date": f"{day.isoformat()}T00:00:00", "contacts": total, + "greeted": greeted, "replied": replied, "wechat": wechat, "reply_rate": round(replied / max(total, 1) * 100, 1), diff --git a/worker/tasks/boss_recruit.py b/worker/tasks/boss_recruit.py index c0ee749..4d6223f 100644 --- a/worker/tasks/boss_recruit.py +++ b/worker/tasks/boss_recruit.py @@ -26,7 +26,7 @@ from worker.tasks.base import BaseTaskHandler, TaskCancelledError RECOMMEND_URL = "https://www.zhipin.com/web/chat/recommend" GEEK_LIST_API = "wapi/zpjob/rec/geek/list" -FAST_REPLY_TEXT = "您好,我们目前有相关岗位机会,方便了解一下吗?" +DEFAULT_GREETING_TEXT = "当前岗位还在招,加个微信了解一下吗?" # Keep selectors consistent with 1.py logic. JOB_LIST_SELECTORS = [ @@ -63,6 +63,7 @@ class BossRecruitHandler(BaseTaskHandler): greet_target = self._parse_positive_int(params.get("greet_target"), default=20) position_names = self._normalize_string_list(params.get("position_names")) + greeting_messages = self._parse_greeting_messages(params) # 年龄范围(程序内过滤,不在浏览器筛选中点击) age_min = self._parse_optional_int(params.get("age_min")) @@ -76,7 +77,10 @@ class BossRecruitHandler(BaseTaskHandler): age_desc = f",最小年龄: {age_min}岁" elif age_max is not None: age_desc = f",最大年龄: {age_max}岁" - await progress_cb(task_id, f"正在启动招聘流程,目标打招呼人数: {greet_target}{age_desc}") + await progress_cb( + task_id, + f"正在启动招聘流程,目标打招呼人数: {greet_target}{age_desc},启用问候语: {len(greeting_messages)} 条", + ) loop = asyncio.get_event_loop() result = await loop.run_in_executor( @@ -87,6 +91,7 @@ class BossRecruitHandler(BaseTaskHandler): bit_api_base, selected_filters, position_names, + greeting_messages, greet_target, worker_id, cancel_event, @@ -104,6 +109,7 @@ class BossRecruitHandler(BaseTaskHandler): bit_api_base: str, selected_filters: List[str], position_names: List[str], + greeting_messages: List[str], greet_target: int, worker_id: str, cancel_event, @@ -120,6 +126,7 @@ class BossRecruitHandler(BaseTaskHandler): job_title=job_title, selected_filters=selected_filters, position_names=position_names, + greeting_messages=greeting_messages, greet_target=greet_target, worker_id=worker_id, account_name=account_name, @@ -134,6 +141,7 @@ class BossRecruitHandler(BaseTaskHandler): "actual_greeted": flow.get("actual_greeted", 0), "rounds": flow.get("rounds", 0), "selected_filters": selected_filters, + "active_greeting_messages": greeting_messages, "positions": flow.get("positions", []), "details": flow.get("details", []), "contact_records_created": flow.get("contact_records_created", 0), @@ -164,6 +172,7 @@ class BossRecruitHandler(BaseTaskHandler): job_title: str, selected_filters: List[str], position_names: List[str], + greeting_messages: List[str], greet_target: int, worker_id: str, account_name: str, @@ -231,6 +240,7 @@ class BossRecruitHandler(BaseTaskHandler): greeted_keys=greeted_keys, position_label=label, default_job=job_title, + greeting_messages=greeting_messages, cancel_event=cancel_event, age_min=age_min, age_max=age_max, @@ -259,6 +269,7 @@ class BossRecruitHandler(BaseTaskHandler): "rounds": round_num, "positions": [self._position_label(t, v) for t, v in positions], "details": details, + "active_greeting_messages": greeting_messages, "contact_records_created": contact_records_created, "errors": errors, } @@ -387,6 +398,7 @@ class BossRecruitHandler(BaseTaskHandler): greeted_keys: set[str], position_label: str, default_job: str, + greeting_messages: List[str], cancel_event, age_min: Optional[int] = None, age_max: Optional[int] = None, @@ -409,7 +421,8 @@ class BossRecruitHandler(BaseTaskHandler): if age_max is not None and age > age_max: continue - if not self._greet_one_geek(page, container, item): + greeting_text = self._pick_greeting_message(greeting_messages) + if not self._greet_one_geek(page, container, item, greeting_text): continue greeted_keys.add(geek_key) @@ -423,6 +436,7 @@ class BossRecruitHandler(BaseTaskHandler): "position": position, "geek_key": geek_key, "source": "new_greet", + "greeting_text": greeting_text, "greeted_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } ) @@ -465,7 +479,7 @@ class BossRecruitHandler(BaseTaskHandler): except Exception: pass - def _greet_one_geek(self, page, container, item: dict) -> bool: + def _greet_one_geek(self, page, container, item: dict, greeting_text: str) -> bool: """ 对单个牛人打招呼 —— 与 1.py _greet_one_geek 完全一致的逻辑。 使用 page.get_frame(1) 获取详情面板,所有元素查找语法与 1.py 相同。 @@ -516,7 +530,8 @@ class BossRecruitHandler(BaseTaskHandler): time.sleep(random.uniform(0.5, 5)) # 与 1.py 一致:快速回复 + 发送 - page.ele(f'x://*[@data-placeholder="快速回复"]', timeout=2).input(FAST_REPLY_TEXT) + reply_text = str(greeting_text or "").strip() or DEFAULT_GREETING_TEXT + page.ele(f'x://*[@data-placeholder="快速回复"]', timeout=2).input(reply_text) page.ele(f'x://*[contains(text(),"发送")]', timeout=2).click() time.sleep(random.uniform(0.5, 5)) @@ -550,6 +565,7 @@ class BossRecruitHandler(BaseTaskHandler): if not geek_key: continue contact_key = f"greet:{geek_key}" + greeting_text = str(rec.get("greeting_text", "")).strip() defaults = { "name": str(rec.get("name", "")).strip() or contact_key, "position": str(rec.get("position", "")).strip(), @@ -557,7 +573,7 @@ class BossRecruitHandler(BaseTaskHandler): "wechat_exchanged": False, "worker_id": worker_id, "contacted_at": timezone.now(), - "notes": f"新打招呼记录; account={account_name}", + "notes": f"新打招呼记录; account={account_name}; greeting={greeting_text or DEFAULT_GREETING_TEXT}", } obj, was_created = ContactRecord.objects.update_or_create( contact=contact_key, @@ -572,6 +588,62 @@ class BossRecruitHandler(BaseTaskHandler): # ────────────────────── 工具方法 ────────────────────── + def _parse_greeting_messages(self, params: Dict[str, Any]) -> List[str]: + """ + 解析并裁剪问候语配置: + - greeting_messages: list / JSON 字符串 / 换行分隔字符串 + - greeting_count: 启用前 N 条问候语;N>1 时运行期随机发送 + """ + raw_messages = params.get("greeting_messages") + if raw_messages is None: + raw_messages = params.get("greetings") + + messages: List[str] = [] + if isinstance(raw_messages, list): + for item in raw_messages: + text = str(item or "").strip() + if text: + messages.append(text) + elif isinstance(raw_messages, str): + raw = raw_messages.strip() + if raw: + if raw.startswith("[") and raw.endswith("]"): + try: + parsed = json.loads(raw) + if isinstance(parsed, list): + for item in parsed: + text = str(item or "").strip() + if text: + messages.append(text) + except Exception: + pass + if not messages: + for line in raw.replace("\r", "").split("\n"): + text = line.strip() + if text: + messages.append(text) + + # 去重并确保至少有系统内置话术 + deduped = list(dict.fromkeys(messages)) + if not deduped: + deduped = [DEFAULT_GREETING_TEXT] + + greeting_count = self._parse_positive_int( + params.get("greeting_count"), + default=min(1, len(deduped)) if deduped else 1, + ) + greeting_count = max(1, min(greeting_count, len(deduped))) + return deduped[:greeting_count] + + @staticmethod + def _pick_greeting_message(greeting_messages: List[str]) -> str: + pool = [str(item or "").strip() for item in (greeting_messages or []) if str(item or "").strip()] + if not pool: + return DEFAULT_GREETING_TEXT + if len(pool) == 1: + return pool[0] + return random.choice(pool) + @staticmethod def _normalize_string_list(value: Any) -> List[str]: if value is None: diff --git a/worker/tasks/boss_reply.py b/worker/tasks/boss_reply.py index 388f44a..f1a94ef 100644 --- a/worker/tasks/boss_reply.py +++ b/worker/tasks/boss_reply.py @@ -161,40 +161,53 @@ class BossReplyHandler(BaseTaskHandler): messages = self._wait_history_messages(tab) self.ensure_not_cancelled(cancel_event) - # 过滤掉自己发送的消息 + # 过滤掉自己发送的消息,只保留候选人消息 filtered_messages = self._filter_my_messages(messages) + has_history_message = bool(filtered_messages) has_contact_keyword = self._has_contact_keyword(filtered_messages) contacts = self._extract_contacts(filtered_messages) contact_written = bool(contacts.get("wechat") or contacts.get("phone")) + contact_fallback = self._build_friend_contact_fallback(friend, name, friend_job_id) action_state = { "asked_wechat": False, "send_success": False, "exchange_clicked": False, "exchange_confirmed": False, + "follow_up_attempted": False, + "follow_up_sent": False, + "follow_up_day": 0, + "follow_up_script_id": 0, + "follow_up_content": "", + "got_reply": False, + "extracted_contact_from_reply": False, } - if not has_contact_keyword: + + # 先确保存在联系人记录(即使暂时没有微信/手机号,也要入库) + contact_id = self._save_contact_record( + name=name, + job_name=friend_job_name, + contacts=contacts, + action_state=action_state, + contact_fallback=contact_fallback, + ) + + # 如果候选人此前发过消息,优先走复聊配置(第1天/第2天/...) + if has_history_message and contact_id: self.ensure_not_cancelled(cancel_event) - action_state = self._ask_and_exchange_wechat_like_script(tab) - - # 先保存联系人记录(如果有的话) - temp_contact_id = None - if contacts.get("wechat") or contacts.get("phone"): - temp_contact_id = self._save_contact_record(name, friend_job_name, contacts, action_state) - - # 发送后等待对方回复,进行复聊管理 - if action_state["send_success"]: - self.ensure_not_cancelled(cancel_event) - reply_result = self._handle_follow_up_chat(tab, name, friend_job_name, temp_contact_id) - action_state.update(reply_result) - - # 如果复聊中提取到了新的联系方式,更新联系人记录 - if reply_result.get("extracted_contact_from_reply"): - panel_texts = self._collect_chat_panel_texts(tab) - new_contacts = self._extract_contacts(filtered_messages, extra_texts=panel_texts) - if new_contacts.get("wechat") or new_contacts.get("phone"): - contacts.update(new_contacts) - contact_written = True + follow_state = self._handle_follow_up_chat(tab, name, friend_job_name, contact_id) + action_state.update(follow_state) + + # 没有联系方式关键词且未成功发送复聊时,走默认索要微信逻辑 + if not has_contact_keyword and not action_state.get("follow_up_sent", False): + self.ensure_not_cancelled(cancel_event) + ask_state = self._ask_and_exchange_wechat_like_script(tab) + action_state["asked_wechat"] = action_state["asked_wechat"] or ask_state.get("asked_wechat", False) + action_state["send_success"] = action_state["send_success"] or ask_state.get("send_success", False) + action_state["exchange_clicked"] = action_state["exchange_clicked"] or ask_state.get("exchange_clicked", False) + action_state["exchange_confirmed"] = ( + action_state["exchange_confirmed"] or ask_state.get("exchange_confirmed", False) + ) panel_texts = self._collect_chat_panel_texts(tab) contacts = self._extract_contacts(filtered_messages, extra_texts=panel_texts) @@ -204,12 +217,16 @@ class BossReplyHandler(BaseTaskHandler): "[%s] 历史消息含联系方式关键词,但未提取到有效联系方式,疑似识别失败", name, ) - - # 保存联系人记录到数据库,获取contact_id用于复聊 - contact_id = None - if contact_written: - contact_id = self._save_contact_record(name, friend_job_name, contacts, action_state) - + + # 二次更新联系人记录(补充回复状态/是否交换微信/提取到的新联系方式) + contact_id = self._save_contact_record( + name=name, + job_name=friend_job_name, + contacts=contacts, + action_state=action_state, + contact_fallback=contact_fallback, + ) + collected.append( { "name": name, @@ -218,6 +235,8 @@ class BossReplyHandler(BaseTaskHandler): "wechat": contacts["wechat"], "phone": contacts["phone"], "contact_written": contact_written, + "contact_recorded": bool(contact_id), + "has_history_message": has_history_message, "has_contact_keyword": has_contact_keyword, **action_state, } @@ -248,6 +267,26 @@ class BossReplyHandler(BaseTaskHandler): return friend_list return [] + @staticmethod + def _build_friend_contact_fallback(friend: dict, name: str, job_id: str) -> str: + """为未提取到微信/手机号的联系人生成稳定占位 contact。""" + candidates = [ + friend.get("encryptGeekId"), + friend.get("geekId"), + friend.get("friendId"), + friend.get("uid"), + friend.get("encryptUid"), + friend.get("securityId"), + ] + for value in candidates: + normalized = str(value or "").strip() + if normalized: + return f"boss_friend:{normalized}" + + base = f"{name}:{job_id}".strip(":") + normalized_base = re.sub(r"\s+", "", base) or name or "unknown" + return f"boss_friend:{normalized_base}" + @staticmethod def _xpath_literal(value: str) -> str: if "'" not in value: @@ -634,154 +673,208 @@ class BossReplyHandler(BaseTaskHandler): return filtered def _handle_follow_up_chat(self, tab, name: str, job_name: str, contact_id: int = None) -> dict: - """处理复聊管理,根据配置发送多轮话术。""" + """处理复聊管理:命中第N天话术后发送,并尝试交换微信。""" result = { + "send_success": False, + "exchange_clicked": False, + "exchange_confirmed": False, "follow_up_attempted": False, + "follow_up_sent": False, + "follow_up_day": 0, + "follow_up_script_id": 0, + "follow_up_content": "", "got_reply": False, "extracted_contact_from_reply": False, } - + if not contact_id: return result - + try: from server.models import FollowUpConfig, FollowUpScript, FollowUpRecord from django.utils import timezone - - # 获取该岗位的复聊配置 + config = FollowUpConfig.objects.filter( position=job_name, - is_active=True + is_active=True, ).first() - if not config: - # 尝试获取通用配置 config = FollowUpConfig.objects.filter( position="通用", - is_active=True + is_active=True, ).first() - + if not config: self.logger.info("[%s] 未找到复聊配置,跳过复聊", name) return result - - # 获取该联系人的复聊记录 + last_record = FollowUpRecord.objects.filter( contact_id=contact_id, - config_id=config.id - ).order_by('-sent_at').first() - - # 确定当前是第几天 + config_id=config.id, + ).order_by("-sent_at").first() + if not last_record: - # 第一次复聊 day_number = 1 else: - # 计算距离上次发送的时间 hours_since_last = (timezone.now() - last_record.sent_at).total_seconds() / 3600 - - # 获取上次使用的话术的间隔时间 last_script = FollowUpScript.objects.filter(id=last_record.script_id).first() if last_script and hours_since_last < last_script.interval_hours: self.logger.info("[%s] 距离上次复聊不足 %d 小时,跳过", name, last_script.interval_hours) return result - - # 下一天 day_number = last_record.day_number + 1 - - # 获取该天的话术 + script = FollowUpScript.objects.filter( config_id=config.id, day_number=day_number, - is_active=True - ).order_by('order').first() - - # 如果没有该天的话术,尝试获取"往后一直"的话术(day_number=0) + is_active=True, + ).order_by("order").first() if not script: script = FollowUpScript.objects.filter( config_id=config.id, day_number=0, - is_active=True - ).order_by('order').first() - + is_active=True, + ).order_by("order").first() + if not script: self.logger.info("[%s] 未找到第 %d 天的复聊话术", name, day_number) return result - - # 发送话术 + result["follow_up_attempted"] = True + result["follow_up_day"] = day_number + result["follow_up_script_id"] = int(script.id) + result["follow_up_content"] = script.content + send_success = self._send_message(tab, script.content) - - if send_success: - # 记录发送 - record = FollowUpRecord.objects.create( - contact_id=contact_id, - config_id=config.id, - script_id=script.id, - day_number=day_number, - content=script.content, - ) - - # 等待回复 - reply_info = self._wait_for_reply(tab, script.content) - if reply_info["got_reply"]: - result["got_reply"] = True - result["extracted_contact_from_reply"] = reply_info["has_contact"] - - # 更新记录 - record.got_reply = True - record.reply_content = reply_info["reply_text"] - record.replied_at = timezone.now() - record.save() - - self.logger.info("[%s] 第 %d 天复聊得到回复", name, day_number) - + if not send_success: + return result + + result["follow_up_sent"] = True + result["send_success"] = True + + record = FollowUpRecord.objects.create( + contact_id=contact_id, + config_id=config.id, + script_id=script.id, + day_number=day_number, + content=script.content, + ) + + time.sleep(random.uniform(0.8, 1.5)) + result["exchange_clicked"] = self._click_change_wechat_like_script(tab) + if result["exchange_clicked"]: + time.sleep(random.uniform(0.8, 1.5)) + result["exchange_confirmed"] = self._click_exchange_confirm_like_script(tab) + + reply_info = self._wait_for_reply(tab, script.content) + if reply_info["got_reply"]: + result["got_reply"] = True + result["extracted_contact_from_reply"] = reply_info["has_contact"] + + record.got_reply = True + record.reply_content = reply_info["reply_text"] + record.replied_at = timezone.now() + record.save() + + self.logger.info("[%s] 第 %d 天复聊得到回复", name, day_number) + except Exception as e: self.logger.error("复聊管理失败: %s", e) - + return result + @staticmethod + def _resolve_reply_status(action_state: dict) -> str: + if action_state.get("got_reply"): + return "已回复" + if action_state.get("follow_up_attempted"): + return "复聊中" + if action_state.get("asked_wechat") or action_state.get("send_success"): + return "待回复" + return "未回复" - - def _save_contact_record(self, name: str, job_name: str, contacts: dict, action_state: dict) -> int: - """保存联系人记录到数据库,返回contact_id。""" + def _save_contact_record( + self, + name: str, + job_name: str, + contacts: dict, + action_state: dict, + contact_fallback: str = "", + ) -> Optional[int]: + """保存或更新联系人记录(无联系方式时使用占位 contact)。""" try: from server.models import ContactRecord from django.utils import timezone - - contact_value = contacts.get("wechat") or contacts.get("phone") or "" + + actual_contact = str(contacts.get("wechat") or contacts.get("phone") or "").strip() + fallback_contact = str(contact_fallback or "").strip() + contact_value = actual_contact or fallback_contact if not contact_value: return None - - # 检查是否已存在 - existing = ContactRecord.objects.filter( - name=name, - contact=contact_value - ).first() - + + existing = None + if fallback_contact: + existing = ContactRecord.objects.filter(contact=fallback_contact).first() + if not existing and actual_contact: + existing = ContactRecord.objects.filter(contact=actual_contact).first() + + reply_status = self._resolve_reply_status(action_state) + now = timezone.now() + note_parts = [ + "自动回复任务", + f"微信:{contacts.get('wechat', '')}", + f"手机:{contacts.get('phone', '')}", + f"follow_day:{action_state.get('follow_up_day', 0)}", + ] + notes = "; ".join(note_parts) + if existing: - # 更新现有记录 - existing.wechat_exchanged = action_state.get("exchange_confirmed", False) - existing.reply_status = "已回复" if action_state.get("got_reply", False) else "未回复" - existing.save() - self.logger.info("更新联系人记录: %s - %s", name, contact_value) - return existing.id - else: - # 创建新记录 - record = ContactRecord.objects.create( - name=name, - position=job_name, - contact=contact_value, - reply_status="已回复" if action_state.get("got_reply", False) else "未回复", - wechat_exchanged=action_state.get("exchange_confirmed", False), - contacted_at=timezone.now(), - notes=f"自动招聘获取 - 微信: {contacts.get('wechat', '')}, 手机: {contacts.get('phone', '')}" + # 已有占位 contact,拿到真实联系方式后升级 contact 字段 + if ( + actual_contact + and existing.contact != actual_contact + and not ContactRecord.objects.filter(contact=actual_contact).exclude(pk=existing.pk).exists() + ): + existing.contact = actual_contact + elif not existing.contact: + existing.contact = contact_value + + existing.name = name or existing.name + if job_name: + existing.position = job_name + existing.reply_status = reply_status + existing.wechat_exchanged = existing.wechat_exchanged or bool(action_state.get("exchange_confirmed")) + existing.contacted_at = now + existing.notes = notes + existing.save( + update_fields=[ + "contact", + "name", + "position", + "reply_status", + "wechat_exchanged", + "contacted_at", + "notes", + ] ) - self.logger.info("保存新联系人记录: %s - %s", name, contact_value) - return record.id - + self.logger.info("更新联系人记录: %s - %s", name, existing.contact) + return existing.id + + record = ContactRecord.objects.create( + name=name, + position=job_name, + contact=contact_value, + reply_status=reply_status, + wechat_exchanged=bool(action_state.get("exchange_confirmed")), + contacted_at=now, + notes=notes, + ) + self.logger.info("保存新联系人记录: %s - %s", name, contact_value) + return record.id + except Exception as e: self.logger.error("保存联系人记录失败: %s", e) return None + def _send_message(self, tab, message: str) -> bool: """发送消息的通用方法。""" try: