649 lines
23 KiB
Python
649 lines
23 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
BOSS recruit task handler — aligned with `1.py -> main`.
|
||
|
||
Flow:
|
||
1) Connect to BitBrowser using browser_name (= account_name, injected by server)
|
||
2) Open recommend page, start listening for geek/list API
|
||
3) Iterate positions (by name, by index, or current)
|
||
4) Apply selected filters (optional — skip if empty)
|
||
5) Capture geek list from intercepted API response
|
||
6) Greet each candidate (scroll, click, greet, favourite, send message, close)
|
||
7) Repeat rounds until target count reached or no new additions
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import random
|
||
import time
|
||
from datetime import datetime
|
||
from typing import Any, Callable, Coroutine, Dict, List, Optional, Tuple
|
||
|
||
from common.protocol import TaskType
|
||
from worker.bit_browser import BitBrowserAPI
|
||
from worker.tasks.base import BaseTaskHandler, TaskCancelledError
|
||
|
||
# Page opened by the main flow right after the network listener is started.
RECOMMEND_URL = "https://www.zhipin.com/web/chat/recommend"
# URL fragment handed to page.listen.start() to capture the candidate-list API.
GEEK_LIST_API = "wapi/zpjob/rec/geek/list"
# Text typed into the quick-reply box after a candidate has been greeted.
FAST_REPLY_TEXT = "您好,我们目前有相关岗位机会,方便了解一下吗?"

# Keep selectors consistent with 1.py logic.
# They are tried in order by _get_all_position_elements; the first selector
# that matches between 1 and 100 elements is used.
JOB_LIST_SELECTORS = [
    "x://*[contains(@class,'job-item') or contains(@class,'position-item')]",
    "x://li[contains(@class,'job')]",
    "x://div[contains(@class,'job-list')]/div",
    "x://ul[contains(@class,'job')]/li",
    "x://*[contains(@class,'recommend-job')]//*[contains(@class,'item')]",
    "x://*[contains(@class,'job-list')]/*",
    "x://a[contains(@href,'job')]",
]
|
||
|
||
|
||
class BossRecruitHandler(BaseTaskHandler):
|
||
"""Recruit by greeting candidates from recommend list."""
|
||
|
||
task_type = TaskType.BOSS_RECRUIT.value
|
||
|
||
async def execute(
    self,
    task_id: str,
    params: Dict[str, Any],
    progress_cb: Callable[[str, str], Coroutine],
) -> Any:
    """Parse the task parameters, announce start-up and run the recruit flow.

    The blocking browser automation (``_run_sync``) is executed in the event
    loop's default executor so the coroutine does not block the loop.

    Args:
        task_id: Identifier forwarded to ``progress_cb``.
        params: Raw task parameters. Keys read here: ``job_title``,
            ``account_name``, ``bit_api_base``, ``_cancel_event``,
            ``worker_id``, ``greet_target``, ``position_names``,
            ``age_min``, ``age_max`` plus the filter keys consumed by
            ``_normalize_selected_filters``.
        progress_cb: Awaitable callback invoked once with ``(task_id, message)``.

    Returns:
        Whatever ``_run_sync`` returns (a result summary dict).
    """
    job_title = str(params.get("job_title", "") or "").strip() or "相关岗位"
    # account_name is in fact the browser_name (injected by the server from
    # BossAccount.browser_name).
    account_name = str(params.get("account_name", "") or "").strip()
    # Use the local default when the key is missing *or* present but None/blank;
    # previously an explicit None/"" produced an empty API base URL.
    bit_api_base = (
        str(params.get("bit_api_base") or "").strip() or "http://127.0.0.1:54345"
    )
    cancel_event = params.get("_cancel_event")
    worker_id = str(params.get("worker_id", "") or "").strip()

    # Filters are optional — an empty list skips the filter step (same as 1.py).
    selected_filters = self._normalize_selected_filters(params)

    greet_target = self._parse_positive_int(params.get("greet_target"), default=20)
    position_names = self._normalize_string_list(params.get("position_names"))

    # Age range is enforced in code on the captured list, not clicked in the browser.
    age_min = self._parse_optional_int(params.get("age_min"))
    age_max = self._parse_optional_int(params.get("age_max"))

    self.ensure_not_cancelled(cancel_event)

    age_desc = ""
    if age_min is not None and age_max is not None:
        age_desc = f",年龄范围: {age_min}-{age_max}岁"
    elif age_min is not None:
        age_desc = f",最小年龄: {age_min}岁"
    elif age_max is not None:
        age_desc = f",最大年龄: {age_max}岁"
    await progress_cb(task_id, f"正在启动招聘流程,目标打招呼人数: {greet_target}{age_desc}")

    # We are inside a coroutine, so the running loop is the one to use.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        self._run_sync,
        job_title,
        account_name,
        bit_api_base,
        selected_filters,
        position_names,
        greet_target,
        worker_id,
        cancel_event,
        age_min,
        age_max,
    )
|
||
|
||
# ────────────────────── 同步主流程 ──────────────────────
|
||
|
||
def _run_sync(
|
||
self,
|
||
job_title: str,
|
||
account_name: str,
|
||
bit_api_base: str,
|
||
selected_filters: List[str],
|
||
position_names: List[str],
|
||
greet_target: int,
|
||
worker_id: str,
|
||
cancel_event,
|
||
age_min: Optional[int] = None,
|
||
age_max: Optional[int] = None,
|
||
) -> Dict[str, Any]:
|
||
self.ensure_not_cancelled(cancel_event)
|
||
|
||
# 与 1.py 一致:用 ChromiumPage + set_local_port 直接连接
|
||
page = self._connect_browser(bit_api_base, account_name)
|
||
|
||
flow = self._run_main_flow(
|
||
page=page,
|
||
job_title=job_title,
|
||
selected_filters=selected_filters,
|
||
position_names=position_names,
|
||
greet_target=greet_target,
|
||
worker_id=worker_id,
|
||
account_name=account_name,
|
||
cancel_event=cancel_event,
|
||
age_min=age_min,
|
||
age_max=age_max,
|
||
)
|
||
errors = flow.get("errors", [])
|
||
return {
|
||
"job_title": job_title,
|
||
"target_greet_count": greet_target,
|
||
"actual_greeted": flow.get("actual_greeted", 0),
|
||
"rounds": flow.get("rounds", 0),
|
||
"selected_filters": selected_filters,
|
||
"positions": flow.get("positions", []),
|
||
"details": flow.get("details", []),
|
||
"contact_records_created": flow.get("contact_records_created", 0),
|
||
"error_count": len(errors),
|
||
"errors": errors[:20],
|
||
"success": len(errors) == 0,
|
||
}
|
||
|
||
@staticmethod
def _connect_browser(bit_api_base: str, browser_name: str):
    """Open the named BitBrowser profile and attach a ChromiumPage to it.

    Mirrors ``_connect_bit_browser`` in 1.py: ``BitBrowserAPI.open_browser``
    is asked for the profile by name and its result is unpacked as
    ``(cdp_addr, port, browser_id)``; only the port is used, via
    ``ChromiumOptions.set_local_port``.
    """
    # Imported lazily so the module can be loaded without DrissionPage.
    from DrissionPage import ChromiumOptions, ChromiumPage

    opened = BitBrowserAPI(bit_api_base).open_browser(name=browser_name)
    _cdp_addr, debug_port, _browser_id = opened
    options = ChromiumOptions().set_local_port(port=debug_port)
    return ChromiumPage(addr_or_opts=options)
|
||
|
||
# ────────────────────── 主招聘循环(对齐 1.py main) ──────────────────────
|
||
|
||
def _run_main_flow(
    self,
    *,
    page,
    job_title: str,
    selected_filters: List[str],
    position_names: List[str],
    greet_target: int,
    worker_id: str,
    account_name: str,
    cancel_event,
    age_min: Optional[int] = None,
    age_max: Optional[int] = None,
) -> Dict[str, Any]:
    """Run greeting rounds over all positions until the target is met.

    Each round visits every position, captures the candidate list from the
    intercepted geek/list response, greets unseen candidates and stores the
    new contacts. The loop ends when ``greet_target`` is reached or a whole
    round adds nobody.

    Returns:
        Dict with ``actual_greeted``, ``rounds``, ``positions``, ``details``,
        ``contact_records_created`` and ``errors``.
    """
    greet_records: List[dict] = []
    failure_messages: List[str] = []

    self.ensure_not_cancelled(cancel_event)

    # 1) Start listening for the geek/list API, then open the recommend page
    #    (same order as 1.py).
    page.listen.start(GEEK_LIST_API)
    page.get(RECOMMEND_URL)
    time.sleep(2)

    frame = self._get_container(page)
    targets = self._build_positions(frame, position_names)
    if not targets:
        return {
            "actual_greeted": 0,
            "rounds": 0,
            "positions": [],
            "details": [],
            "contact_records_created": 0,
            "errors": ["未识别到岗位列表,无法开始招聘"],
        }

    seen_keys: set[str] = set()
    greeted_total = 0
    rounds_done = 0
    contacts_saved = 0

    while True:
        self.ensure_not_cancelled(cancel_event)
        rounds_done += 1
        greeted_this_round = 0

        for kind, value in targets:
            self.ensure_not_cancelled(cancel_event)
            if greeted_total >= greet_target:
                break

            frame = self._get_container(page)
            label = self._activate_position(frame, kind, value)
            if not label:
                continue

            time.sleep(2)
            # Re-resolve the frame after the position switch.
            frame = self._get_container(page)

            # 2) Capture the geek/list packet.
            candidates = self._load_and_extract_geek_list(
                page, frame, selected_filters
            )
            if not candidates:
                continue

            # 3) Greet candidates one by one.
            newly_greeted, fresh_records = self._greet_geek_list(
                page=page,
                container=frame,
                geek_list=candidates,
                greeted_keys=seen_keys,
                position_label=label,
                default_job=job_title,
                cancel_event=cancel_event,
                age_min=age_min,
                age_max=age_max,
            )
            if newly_greeted:
                greeted_total += newly_greeted
                greeted_this_round += newly_greeted
                greet_records.extend(fresh_records)
                contacts_saved += self._save_new_greet_contacts(
                    records=fresh_records,
                    worker_id=worker_id,
                    account_name=account_name,
                )

        # Exit conditions (same as 1.py): target reached, or a round added nobody.
        if greeted_total >= greet_target or greeted_this_round == 0:
            break
        time.sleep(1)

    return {
        "actual_greeted": greeted_total,
        "rounds": rounds_done,
        "positions": [self._position_label(kind, value) for kind, value in targets],
        "details": greet_records,
        "contact_records_created": contacts_saved,
        "errors": failure_messages,
    }
|
||
|
||
# ────────────────────── 容器 & 岗位 ──────────────────────
|
||
|
||
@staticmethod
|
||
def _get_container(page):
|
||
"""推荐牛人内容在 iframe recommendFrame 内。"""
|
||
try:
|
||
return page.get_frame("recommendFrame")
|
||
except Exception:
|
||
return page
|
||
|
||
def _get_all_position_elements(self, container) -> List[Any]:
    """Return the position elements found by the first usable selector.

    Selectors from ``JOB_LIST_SELECTORS`` are tried in order; a result is
    accepted when it holds between 1 and 100 elements. A selector that
    raises is skipped. Returns ``[]`` when nothing qualifies.
    """
    for xpath in JOB_LIST_SELECTORS:
        try:
            found = container.eles(xpath, timeout=2)
            count = len(found) if found else 0
            if 1 <= count <= 100:
                return found
        except Exception:
            # Best effort: move on to the next selector.
            pass
    return []
|
||
|
||
def _build_positions(self, container, position_names: List[str]) -> List[Tuple[str, Any]]:
|
||
if position_names:
|
||
return [("name", name) for name in position_names]
|
||
job_eles = self._get_all_position_elements(container)
|
||
if not job_eles:
|
||
return [("current", None)]
|
||
return [("index", i) for i in range(len(job_eles))]
|
||
|
||
@staticmethod
|
||
def _position_label(pos_type: str, pos_value: Any) -> str:
|
||
if pos_type == "name":
|
||
return str(pos_value)
|
||
if pos_type == "index":
|
||
return f"岗位{int(pos_value) + 1}"
|
||
return "当前岗位"
|
||
|
||
def _activate_position(self, container, pos_type: str, pos_value: Any) -> str:
|
||
"""
|
||
切换到指定岗位。与 1.py 一致,使用 click(by_js=True)。
|
||
"""
|
||
label = self._position_label(pos_type, pos_value)
|
||
try:
|
||
if pos_type == "name":
|
||
ele = container.ele(f'x://*[contains(text(),"{pos_value}")]', timeout=5)
|
||
if not ele:
|
||
return ""
|
||
ele.click(by_js=True)
|
||
elif pos_type == "index":
|
||
job_eles = self._get_all_position_elements(container)
|
||
if int(pos_value) >= len(job_eles):
|
||
return ""
|
||
job_eles[int(pos_value)].click(by_js=True)
|
||
return label
|
||
except Exception:
|
||
return ""
|
||
|
||
# ────────────────────── 筛选 & 抓包 ──────────────────────
|
||
|
||
def _apply_filter_and_confirm(self, container, selected_filters: List[str]) -> None:
    """Open the filter panel, click every requested option, then confirm.

    Same steps as ``_apply_filter_and_confirm`` in 1.py. Elements are located
    by their visible text; a random pause of up to 1.5 s follows each option.
    """
    open_button = container.ele("x://*[contains(text(),'筛选')]")
    open_button.click()
    time.sleep(2)

    for label in selected_filters:
        option = container.ele(f"x://*[contains(text(),'{label}')]")
        option.click()
        time.sleep(random.random() * 1.5)

    confirm_button = container.ele("x://*[contains(text(),'确定')]")
    confirm_button.click()
|
||
|
||
def _load_and_extract_geek_list(
|
||
self, page, container, selected_filters: List[str]
|
||
) -> List[dict]:
|
||
"""
|
||
与 1.py 一致的抓包逻辑:
|
||
- 有筛选条件 → 点筛选 → wait(count=2) → 取最后一个包
|
||
- 无筛选条件 → wait() → 取第一个包
|
||
"""
|
||
if selected_filters:
|
||
self._apply_filter_and_confirm(container, selected_filters)
|
||
packets = page.listen.wait(count=2, timeout=30)
|
||
res = packets[-1] if packets else None
|
||
else:
|
||
res = page.listen.wait(timeout=30)
|
||
|
||
if not res:
|
||
return []
|
||
return self._extract_geek_list_from_packet(res)
|
||
|
||
@staticmethod
|
||
def _extract_geek_list_from_packet(packet) -> List[dict]:
|
||
"""从监听到的包中提取 geekList。"""
|
||
if not packet:
|
||
return []
|
||
body = getattr(getattr(packet, "response", None), "body", None)
|
||
if isinstance(body, str):
|
||
try:
|
||
body = json.loads(body)
|
||
except Exception:
|
||
return []
|
||
if not isinstance(body, dict):
|
||
return []
|
||
geek_list = body.get("zpData", {}).get("geekList", [])
|
||
return geek_list if isinstance(geek_list, list) else []
|
||
|
||
# ────────────────────── 打招呼(对齐 1.py) ──────────────────────
|
||
|
||
@staticmethod
|
||
def _geek_key(item: dict) -> str:
|
||
"""牛人去重键(与 1.py _geek_key 一致)。"""
|
||
card = item.get("geekCard") or {}
|
||
return str(
|
||
card.get("encryptGeekId")
|
||
or card.get("geekId")
|
||
or card.get("geekName")
|
||
or ""
|
||
).strip()
|
||
|
||
def _greet_geek_list(
|
||
self,
|
||
*,
|
||
page,
|
||
container,
|
||
geek_list: List[dict],
|
||
greeted_keys: set[str],
|
||
position_label: str,
|
||
default_job: str,
|
||
cancel_event,
|
||
age_min: Optional[int] = None,
|
||
age_max: Optional[int] = None,
|
||
) -> Tuple[int, List[dict]]:
|
||
"""与 1.py _greet_geek_list_skip_greeted 一致的去重逻辑,增加年龄过滤。"""
|
||
added = 0
|
||
records: List[dict] = []
|
||
for item in geek_list or []:
|
||
self.ensure_not_cancelled(cancel_event)
|
||
geek_key = self._geek_key(item)
|
||
if not geek_key or geek_key in greeted_keys:
|
||
continue
|
||
|
||
# 年龄过滤:从候选人数据中提取年龄,不满足范围则跳过
|
||
if age_min is not None or age_max is not None:
|
||
age = self._parse_age(item)
|
||
if age is not None:
|
||
if age_min is not None and age < age_min:
|
||
continue
|
||
if age_max is not None and age > age_max:
|
||
continue
|
||
|
||
if not self._greet_one_geek(page, container, item):
|
||
continue
|
||
|
||
greeted_keys.add(geek_key)
|
||
added += 1
|
||
card = item.get("geekCard") or {}
|
||
name = str(card.get("geekName", "")).strip() or geek_key
|
||
position = str(card.get("expectPositionName", "")).strip() or position_label or default_job
|
||
records.append(
|
||
{
|
||
"name": name,
|
||
"position": position,
|
||
"geek_key": geek_key,
|
||
"source": "new_greet",
|
||
"greeted_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||
}
|
||
)
|
||
return added, records
|
||
|
||
@staticmethod
|
||
def _close_all_panels(page):
|
||
"""
|
||
强制关闭所有可能打开的面板和弹窗,确保 UI 恢复到干净状态。
|
||
每一步都独立 try/except,不管成功或失败都继续。
|
||
"""
|
||
# 关闭侧边聊天面板
|
||
try:
|
||
close_side = page.ele('x://*[@class="iboss iboss-close"]', timeout=1)
|
||
if close_side:
|
||
close_side.click(by_js=True)
|
||
time.sleep(0.3)
|
||
except Exception:
|
||
pass
|
||
|
||
# 关闭 boss-popup 弹窗
|
||
try:
|
||
close_popup = page.ele('x://*[@class="boss-popup__close"]', timeout=1)
|
||
if close_popup:
|
||
close_popup.click(by_js=True)
|
||
time.sleep(0.3)
|
||
except Exception:
|
||
pass
|
||
|
||
# 兜底:尝试关闭所有可能的弹窗关闭按钮
|
||
for selector in [
|
||
'x://*[contains(@class,"close") and contains(@class,"popup")]',
|
||
'x://*[contains(@class,"dialog")]//button[contains(@class,"close")]',
|
||
]:
|
||
try:
|
||
btn = page.ele(selector, timeout=0.5)
|
||
if btn:
|
||
btn.click(by_js=True)
|
||
time.sleep(0.2)
|
||
except Exception:
|
||
pass
|
||
|
||
def _greet_one_geek(self, page, container, item: dict) -> bool:
|
||
"""
|
||
对单个牛人打招呼 —— 与 1.py _greet_one_geek 完全一致的逻辑。
|
||
使用 page.get_frame(1) 获取详情面板,所有元素查找语法与 1.py 相同。
|
||
仅增加 try/finally 防止异常时 UI 卡死。
|
||
"""
|
||
geek_name = str((item.get("geekCard") or {}).get("geekName", "")).strip()
|
||
if not geek_name:
|
||
return False
|
||
|
||
panel_opened = False
|
||
|
||
try:
|
||
# 与 1.py 一致:查找姓名 → scrollIntoView → 点击
|
||
name_ele = container.ele(f'x://span[contains(text(),"{geek_name}")]', timeout=5)
|
||
if not name_ele:
|
||
name_ele = container.ele(f'x://span[text()="{geek_name}"]', timeout=2)
|
||
if not name_ele:
|
||
return False
|
||
|
||
name_ele.run_js("this.scrollIntoView()")
|
||
name_ele.click()
|
||
time.sleep(3)
|
||
panel_opened = True
|
||
|
||
# 与 1.py 一致:a = page.get_frame(1)
|
||
a = page.get_frame(1)
|
||
time.sleep(random.uniform(0.5, 1.2))
|
||
|
||
# 与 1.py 一致:打招呼
|
||
a.ele(f'x://*[contains(text(),"打招呼")]', timeout=2).click(by_js=True)
|
||
time.sleep(random.uniform(0.5, 1.2))
|
||
|
||
# 与 1.py 一致:收藏
|
||
a.ele(f'x://*[contains(text(),"收藏")]', timeout=2).click(by_js=True)
|
||
time.sleep(random.uniform(0.5, 1.2))
|
||
|
||
# 与 1.py 一致:btn-outline × 3
|
||
for i in range(3):
|
||
a.ele(f'x://*[@class="btn-v2 btn-outline-v2"]', timeout=2).click(by_js=True)
|
||
time.sleep(random.uniform(0.5, 1.2))
|
||
|
||
# 与 1.py 一致:快速回复 + 发送
|
||
page.ele(f'x://*[@data-placeholder="快速回复"]', timeout=2).input(FAST_REPLY_TEXT)
|
||
page.ele(f'x://*[contains(text(),"发送")]', timeout=2).click()
|
||
time.sleep(random.uniform(0.5, 1.2))
|
||
|
||
# 与 1.py 一致:关闭面板
|
||
page.ele('x://*[@class="iboss iboss-close"]').click()
|
||
time.sleep(random.uniform(0.5, 1.2))
|
||
a.ele('x://*[@class="boss-popup__close"]').click()
|
||
time.sleep(random.uniform(0.5, 1.2))
|
||
|
||
panel_opened = False
|
||
return True
|
||
|
||
except Exception:
|
||
return False
|
||
finally:
|
||
if panel_opened:
|
||
self._close_all_panels(page)
|
||
time.sleep(0.5)
|
||
|
||
# ────────────────────── 联系记录入库 ──────────────────────
|
||
|
||
def _save_new_greet_contacts(self, *, records: List[dict], worker_id: str, account_name: str) -> int:
|
||
if not records:
|
||
return 0
|
||
from django.utils import timezone
|
||
from server.models import ContactRecord
|
||
|
||
created = 0
|
||
for rec in records:
|
||
geek_key = str(rec.get("geek_key", "")).strip()
|
||
if not geek_key:
|
||
continue
|
||
contact_key = f"greet:{geek_key}"
|
||
defaults = {
|
||
"name": str(rec.get("name", "")).strip() or contact_key,
|
||
"position": str(rec.get("position", "")).strip(),
|
||
"reply_status": "新打招呼",
|
||
"wechat_exchanged": False,
|
||
"worker_id": worker_id,
|
||
"contacted_at": timezone.now(),
|
||
"notes": f"新打招呼记录; account={account_name}",
|
||
}
|
||
obj, was_created = ContactRecord.objects.update_or_create(
|
||
contact=contact_key,
|
||
defaults=defaults,
|
||
)
|
||
if was_created:
|
||
created += 1
|
||
elif not obj.contacted_at:
|
||
obj.contacted_at = timezone.now()
|
||
obj.save(update_fields=["contacted_at"])
|
||
return created
|
||
|
||
# ────────────────────── 工具方法 ──────────────────────
|
||
|
||
@staticmethod
|
||
def _normalize_string_list(value: Any) -> List[str]:
|
||
if value is None:
|
||
return []
|
||
if isinstance(value, list):
|
||
return [str(v).strip() for v in value if str(v).strip()]
|
||
if isinstance(value, str):
|
||
raw = value.strip()
|
||
if not raw:
|
||
return []
|
||
if raw.startswith("[") and raw.endswith("]"):
|
||
try:
|
||
parsed = json.loads(raw)
|
||
if isinstance(parsed, list):
|
||
return [str(v).strip() for v in parsed if str(v).strip()]
|
||
except Exception:
|
||
pass
|
||
return [raw]
|
||
return []
|
||
|
||
def _normalize_selected_filters(self, params: Dict[str, Any]) -> List[str]:
|
||
filters = params.get("selected_filters")
|
||
if filters is None:
|
||
filters = params.get("filters")
|
||
return self._normalize_string_list(filters)
|
||
|
||
@staticmethod
|
||
def _parse_positive_int(value: Any, default: int) -> int:
|
||
try:
|
||
parsed = int(value)
|
||
return parsed if parsed > 0 else default
|
||
except Exception:
|
||
return default
|
||
|
||
@staticmethod
|
||
def _parse_optional_int(value: Any) -> Optional[int]:
|
||
"""解析可选整数,None / 空串 / 无效值返回 None。"""
|
||
if value is None:
|
||
return None
|
||
try:
|
||
parsed = int(value)
|
||
return parsed if parsed > 0 else None
|
||
except (TypeError, ValueError):
|
||
return None
|
||
|
||
@staticmethod
|
||
def _parse_age(item: dict) -> Optional[int]:
|
||
"""
|
||
从 geekCard 中提取候选人年龄(数字)。
|
||
BOSS 直聘 API 的 geekCard 中可能包含:
|
||
- ageDesc: "25岁"
|
||
- age: 25
|
||
- geekAge: 25
|
||
返回整数年龄,无法解析则返回 None。
|
||
"""
|
||
import re
|
||
card = item.get("geekCard") or {}
|
||
|
||
# 优先使用 ageDesc(如 "25岁")
|
||
age_desc = str(card.get("ageDesc", "") or "").strip()
|
||
if age_desc:
|
||
match = re.search(r"(\d+)", age_desc)
|
||
if match:
|
||
return int(match.group(1))
|
||
|
||
# 尝试 age 字段
|
||
age_val = card.get("age")
|
||
if age_val is not None:
|
||
try:
|
||
return int(age_val)
|
||
except (TypeError, ValueError):
|
||
pass
|
||
|
||
# 尝试 geekAge 字段
|
||
geek_age = card.get("geekAge")
|
||
if geek_age is not None:
|
||
try:
|
||
return int(geek_age)
|
||
except (TypeError, ValueError):
|
||
pass
|
||
|
||
return None
|