Files
jyls_django/User/utils.py
2026-02-04 14:13:17 +08:00

1151 lines
52 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
审批相关的工具函数
"""
import json
from datetime import datetime, date
from .models import OperationLog, User, Team, Approval, Department
def is_department_id(value):
"""
判断personincharge字段的值是部门ID还是审批员用户名
统一规则:
- 如果是纯数字字符串(如 "1", "2", "123"表示部门ID
- 如果包含非数字字符(如 "张三", "李四"),表示审批员用户名
Args:
value: personincharge字段的值字符串
Returns:
bool: True表示是部门IDFalse表示是审批员用户名
示例:
>>> is_department_id("1")
True
>>> is_department_id("123")
True
>>> is_department_id("张三")
False
>>> is_department_id("dept:1")
False
"""
if not value:
return False
# 判断是否为纯数字字符串(去除首尾空格)
return str(value).strip().isdigit()
def format_personincharge(value, is_department=False):
"""
格式化personincharge字段的值
统一规则:
- 如果是部门ID确保是纯数字字符串
- 如果是审批员用户名,保持原样
Args:
value: 部门ID整数或字符串或审批员用户名字符串
is_department: 是否为部门ID默认False审批员用户名
Returns:
str: 格式化后的personincharge值
"""
if not value:
return ''
if is_department:
# 部门ID转换为字符串确保是纯数字
try:
return str(int(value))
except (ValueError, TypeError):
raise ValueError(f"部门ID必须是数字当前值: {value}")
else:
# 审批员用户名:保持原样,但确保是字符串
return str(value).strip()
def get_finance_personincharge_candidates():
"""
获取财务部抄送的负责人标识列表优先使用部门ID其次回退字符串“财务”。
返回按优先级去重的字符串列表,便于 personincharge 匹配和查询。
返回值优先级:
1. 财务部门ID如果数据库中存在财务部门
2. "财务" 字符串(兼容历史数据)
3. "财务部" 字符串(兼容历史数据)
"""
import logging
logger = logging.getLogger(__name__)
candidates = []
try:
# 查询数据库获取财务部门ID部门名称包含"财务"的部门)
finance_depts = Department.objects.filter(
is_deleted=False,
username__icontains="财务"
).values_list("id", "username")
for dept_id, dept_name in finance_depts:
candidates.append(str(dept_id))
logger.debug(f"get_finance_personincharge_candidates: 找到财务部门 ID={dept_id}, 名称={dept_name}")
if not candidates:
logger.warning("get_finance_personincharge_candidates: 数据库中未找到财务部门,将使用回退字符串")
except Exception as e:
logger.error(f"get_finance_personincharge_candidates: 查询财务部门失败: {e}")
# 回退保留原有字符串标识,兼容历史数据(包括"财务"和"财务部"两种格式)
candidates.append("财务")
candidates.append("财务部")
# 去重保持顺序
uniq = []
seen = set()
for item in candidates:
if item is None:
continue
item_str = str(item)
if item_str not in seen:
uniq.append(item_str)
seen.add(item_str)
logger.debug(f"get_finance_personincharge_candidates: 返回候选列表={uniq}")
return uniq
def get_finance_personincharge_value():
"""
获取优先的财务抄送标识(用于写入 personincharge 字段)。
返回值:
- 优先返回财务部门ID字符串形式
- 如果数据库中没有财务部门,回退到 "财务" 字符串
"""
import logging
logger = logging.getLogger(__name__)
candidates = get_finance_personincharge_candidates()
result = candidates[0] if candidates else "财务"
# 判断返回值是否为部门ID纯数字字符串
is_dept_id = result.isdigit()
logger.info(f"get_finance_personincharge_value: 返回值={result}, 是否为部门ID={is_dept_id}")
return result
def get_law_firm_leader(team_name=None):
"""
获取律所负责人
优先从团队负责人获取,如果没有则从管委会角色获取
Args:
team_name: 团队名称(可选)
Returns:
str: 律所负责人用户名如果找不到则返回None
"""
try:
# 如果提供了团队名称,尝试从团队获取负责人
if team_name:
from .models import Team
try:
team = Team.objects.get(name=team_name, is_deleted=False)
# 获取团队的第一个审核人作为负责人
approvers = team.approvers.filter(is_deleted=False)
if approvers.exists():
return approvers.first().username
except Team.DoesNotExist:
pass
# 如果没有团队负责人,尝试从管委会角色获取
from .models import User
from business.models import role
try:
# 查找有"管委会"角色的用户
management_users = User.objects.filter(
role__RoleName__icontains="管委会",
is_deleted=False
).distinct()
if management_users.exists():
return management_users.first().username
except:
pass
return None
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.error(f"获取律所负责人失败: {str(e)}")
return None
def is_finance_personincharge(value):
"""判断 personincharge 是否表示财务部门抄送部门ID或“财务”字符串"""
if value is None:
return False
return str(value) in get_finance_personincharge_candidates()
def log_operation(request, operation_type, module, action, target_type, target_id=None,
target_name=None, old_data=None, new_data=None, remark=None):
"""
记录操作日志
Args:
request: Django request对象
operation_type: 操作类型DELETE, CREATE, UPDATE, APPROVE等
module: 模块名称User, Business, Finance等
action: 操作描述(如"删除用户""创建立项"等)
target_type: 目标类型如User, ProjectRegistration等
target_id: 目标ID
target_name: 目标名称(如用户名、项目名等)
old_data: 操作前的数据字典会自动转换为JSON
new_data: 操作后的数据字典会自动转换为JSON
remark: 备注信息
Returns:
OperationLog对象
"""
try:
# 获取操作人信息
token = request.META.get('token') or request.META.get('HTTP_AUTHORIZATION', '').replace('Bearer ', '')
operator = '未知用户'
operator_id = None
if token:
user = User.objects.filter(token=token, is_deleted=False).first()
if user:
operator = user.username
operator_id = user.id
# 获取IP地址
ip_address = request.META.get('HTTP_X_FORWARDED_FOR', '').split(',')[0].strip()
if not ip_address:
ip_address = request.META.get('REMOTE_ADDR', '')
# 获取用户代理
user_agent = request.META.get('HTTP_USER_AGENT', '')
# 转换数据为JSON字符串
old_data_str = json.dumps(old_data, ensure_ascii=False) if old_data else None
new_data_str = json.dumps(new_data, ensure_ascii=False) if new_data else None
# 创建日志记录
log = OperationLog.objects.create(
operator=operator,
operator_id=operator_id,
operation_type=operation_type,
module=module,
action=action,
target_type=target_type,
target_id=str(target_id) if target_id else None,
target_name=target_name,
old_data=old_data_str,
new_data=new_data_str,
ip_address=ip_address,
user_agent=user_agent,
request_path=request.path,
remark=remark
)
return log
except Exception as e:
# 日志记录失败不应该影响主业务流程
import logging
logger = logging.getLogger(__name__)
logger.error(f"记录操作日志失败: {str(e)}")
return None
def create_team_approval(team_name, title, content, approval_type, user_id, request=None):
"""
根据团队类型创建审批流程
规则:
- 个人团队personal不需要审核人直接抄送财务personincharge设为财务部ID
- 团队team需要指定审核人可以多个审核通过后才抄送财务
注意personincharge字段统一使用财务部ID优先或回退到"财务"字符串
Args:
team_name: 团队名称
title: 审批标题
content: 审批内容
approval_type: 审批类型(如"立项登记""付款申请"等)
user_id: 关联的业务ID字符串
request: Django request对象可选用于记录日志
Returns:
Approval对象或None
"""
try:
# 查找团队
team = Team.objects.prefetch_related('approvers').filter(name=team_name, is_deleted=False).first()
if not team:
# 如果找不到团队,使用默认流程(需要审核人)
return None
today = datetime.now().strftime("%Y-%m-%d")
if team.team_type == 'personal':
# 个人团队:直接抄送财务,不需要审核人
# 使用财务部ID优先或回退到"财务"字符串
finance_personincharge = get_finance_personincharge_value()
approval = Approval.objects.create(
title=title,
content=content,
times=today,
personincharge=finance_personincharge, # 使用财务部ID
state='已抄送财务', # 直接标记为已抄送财务
type=approval_type,
user_id=str(user_id)
)
return approval
elif team.team_type == 'team':
# 团队:需要审核人审核
approvers = team.approvers.filter(is_deleted=False)
if not approvers.exists():
# 如果没有审核人返回None应该在前端验证
return None
# 将审核人用户名用逗号连接(多个审核人)
approver_names = ','.join([approver.username for approver in approvers])
approval = Approval.objects.create(
title=title,
content=content,
times=today,
personincharge=approver_names, # 多个审核人用逗号分隔
state='审核中',
type=approval_type,
user_id=str(user_id)
)
return approval
return None
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.error(f"创建团队审批失败: {str(e)}")
return None
def get_team_approval_info(team_name):
"""
获取团队的审批信息
Args:
team_name: 团队名称
Returns:
dict: {
'team_type': 'personal''team',
'needs_approval': True/False, # 是否需要审核
'approvers': [{'id': 1, 'username': '张三'}, ...], # 审核人列表
'direct_to_finance': True/False # 是否直接抄送财务
}
"""
try:
team = Team.objects.prefetch_related('approvers').filter(name=team_name, is_deleted=False).first()
if not team:
return {
'team_type': None,
'needs_approval': True, # 默认需要审核
'approvers': [],
'direct_to_finance': False
}
approvers = list(team.approvers.filter(is_deleted=False).values('id', 'username'))
return {
'team_type': team.team_type,
'needs_approval': team.team_type == 'team',
'approvers': approvers,
'direct_to_finance': team.team_type == 'personal'
}
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.error(f"获取团队审批信息失败: {str(e)}")
return {
'team_type': None,
'needs_approval': True,
'approvers': [],
'direct_to_finance': False
}
def normalize_approvers_param(approvers, personincharge):
"""
兼容旧接口:如果未传 approvers则使用 personincharge。
personincharge 支持单个值、逗号分隔字符串或列表格式。
"""
if approvers:
return approvers
if personincharge:
return personincharge
return approvers
def build_missing_approvers_message(team_name, approvers):
"""
统一生成“缺少审核人”的详细错误文案
适用于团队类型需要审批但未能解析到有效审核人的情况
"""
try:
approvers_missing = approvers is None
approvers_empty = approvers == "" or approvers == []
approvers_provided = not approvers_missing and not approvers_empty
team = None
if team_name:
team = Team.objects.prefetch_related('approvers').filter(
name=team_name, is_deleted=False
).first()
if not team_name:
return "当前用户未绑定团队,无法自动获取审核人,请传 approvers 参数或先配置团队。"
if not team:
return f'团队"{team_name}"不存在或已被删除,请检查团队配置或传 approvers 参数。'
if team.team_type != "team":
return f'团队"{team_name}"为个人团队,无需审核人。'
if approvers_empty:
return "已传 approvers 但为空请传入有效的审核人列表推荐ID数组如[1,2,3])。"
if approvers_provided:
approvers_list = parse_approvers(approvers)
if not approvers_list:
return "已传 approvers 但解析失败请按推荐格式传入用户ID数组如 [1,2,3]。"
invalid = [
name for name in approvers_list
if not User.objects.filter(username=name, is_deleted=False).exists()
]
if invalid and len(invalid) == len(approvers_list):
return f'已传 approvers但全部审核人无效/不存在:{", ".join(invalid)}。请更换有效审核人或调整团队配置。'
if invalid:
return f'已传 approvers但部分审核人无效{", ".join(invalid)}。请移除无效人员或补充有效审核人。'
return "审核人参数已传入但审批创建失败,请检查团队配置或权限。"
# 未传 approvers尝试使用团队配置
team_approvers = list(
team.approvers.filter(is_deleted=False).values_list("username", flat=True)
)
if not team_approvers:
return f'未传 approvers且团队"{team_name}"未配置审核人,请先配置团队审核人或传 approvers 参数。'
valid_team_approvers = [
name for name in team_approvers
if User.objects.filter(username=name, is_deleted=False).exists()
]
if not valid_team_approvers:
return f'未传 approvers且团队"{team_name}"配置的审核人无效或已被删除,请重新配置或传 approvers 参数。'
return (
f'未传 approvers。团队"{team_name}"为团队类型,需要审核人。团队当前审核人:'
f'{", ".join(valid_team_approvers)}'
'请传 approvers 参数推荐ID数组如[1,2,3]),或调整团队审核人配置。'
)
except Exception:
return "团队类型需要指定审核人,请提供 approvers 参数并检查团队配置。"
def parse_approvers(approvers):
"""
解析审核人列表,统一处理数组和字符串格式
支持用户ID列表推荐和用户名列表兼容旧接口
Args:
approvers: 审核人列表,可以是:
- 数组格式ID列表推荐[1, 2, 3] 或 ["1", "2", "3"]
- 数组格式(用户名列表,兼容):["张三", "李四", "王五"]
- JSON字符串格式ID列表"[1,2,3]""[5]"'"[1,2,3]"'
- JSON字符串格式带空格"[ 5 ]""[1, 2, 3]"
- JSON字符串格式用户名列表兼容'["张三","李四","王五"]'
- 字符串格式逗号分隔的ID"1,2,3""5"
- 字符串格式(逗号分隔的用户名,兼容):"张三,李四,王五"
- None: 返回空列表
Returns:
list: 审核人用户名列表,如 ["张三", "李四", "王五"]
"""
import json
if not approvers:
return []
approvers_list = []
if isinstance(approvers, str):
# 去除首尾空格
approvers = approvers.strip()
# 如果字符串为空,返回空列表
if not approvers:
return []
# 尝试解析 JSON 字符串格式(如 "[1,2,3]" 或 "[5]" 或 '"[1,2,3]"'
try:
# 先尝试直接解析 JSON
parsed = json.loads(approvers)
if isinstance(parsed, list):
approvers_list = [str(a).strip() for a in parsed if a is not None and str(a).strip()]
elif isinstance(parsed, str):
# 如果是字符串可能是被双重编码的JSON字符串再解析一次
# 例如:'"[5]"' -> "[5]" -> [5]
try:
parsed2 = json.loads(parsed)
if isinstance(parsed2, list):
approvers_list = [str(a).strip() for a in parsed2 if a is not None and str(a).strip()]
elif isinstance(parsed2, (int, str)):
# 单个值,转换为列表
approvers_list = [str(parsed2).strip()]
except (json.JSONDecodeError, ValueError, TypeError):
# 双重解析失败,可能是普通字符串,按逗号分隔处理
approvers_list = [parsed.strip()] if parsed.strip() else []
elif isinstance(parsed, (int, str)):
# 单个值(数字或字符串),转换为列表
approvers_list = [str(parsed).strip()]
except (json.JSONDecodeError, ValueError, TypeError):
# JSON 解析失败,尝试按逗号分隔处理
# 例如:"1,2,3" 或 "5" 或 "张三,李四"
approvers_list = [a.strip() for a in approvers.split(',') if a.strip()]
elif isinstance(approvers, list):
# 数组格式(推荐)
approvers_list = [str(a).strip() for a in approvers if a is not None and str(a).strip()]
elif isinstance(approvers, (int, float)):
# 单个数字,转换为列表
approvers_list = [str(int(approvers))]
else:
return []
if not approvers_list:
return []
# 判断是ID还是用户名如果第一个元素是纯数字则认为是ID列表
# 否则认为是用户名列表(兼容旧接口)
first_item = approvers_list[0]
is_id_list = str(first_item).strip().isdigit()
if is_id_list:
# ID列表转换为用户名列表
try:
user_ids = [int(uid) for uid in approvers_list]
users = User.objects.filter(id__in=user_ids, is_deleted=False)
# 保持原有顺序
user_dict = {user.id: user.username for user in users}
username_list = [user_dict.get(uid, None) for uid in user_ids]
# 过滤掉不存在的用户
username_list = [name for name in username_list if name is not None]
return username_list
except (ValueError, TypeError):
# 如果转换失败,返回空列表
return []
else:
# 用户名列表(兼容旧接口):直接返回
return approvers_list
def get_approvers_from_record(business_record, approval=None):
"""
从业务记录中获取审核人列表(统一方法)
优先从 approvers_order 字段读取JSON格式
如果没有则从 Approval.content 字段解析(兼容旧数据)
Args:
business_record: 业务记录对象(如 Schedule, Reimbursement, Income 等)
approval: Approval对象可选用于从content字段解析审核人列表
Returns:
list: 审核人列表,如 ["张三", "李四", "王五"],如果没有则返回空列表
"""
import json
import logging
logger = logging.getLogger(__name__)
# 优先从 approvers_order 字段读取
if hasattr(business_record, 'approvers_order') and business_record.approvers_order:
try:
approvers_list = json.loads(business_record.approvers_order)
if isinstance(approvers_list, list):
logger.info(f"get_approvers_from_record: 从 business_record.approvers_order 获取审核人列表: {approvers_list}")
return approvers_list
except (json.JSONDecodeError, TypeError) as e:
logger.warning(f"get_approvers_from_record: 解析 approvers_order 失败: {e}")
# 如果 business_record 没有 approvers_order 字段,尝试从 Approval.content 字段解析
if approval and approval.content:
try:
# 格式:审批流程:张三 → 李四 → 王五 → 财务部(按顺序审批),当前审批人:张三
if "审批流程:" in approval.content:
# 提取审批流程部分
flow_part = approval.content.split("审批流程:")[1]
# 查找"(按顺序审批)"的位置
if "(按顺序审批)" in flow_part:
flow_part = flow_part.split("(按顺序审批)")[0]
# 分割审核人(使用 → 分隔)
approvers_list = [a.strip() for a in flow_part.split("") if a.strip()]
# 移除最后的"财务"或"财务部"(兼容两种格式),以及"申请人(待查看)"(投标/立项最后一步)
if approvers_list:
last = approvers_list[-1].strip()
if last in ("财务", "财务部") or last == "申请人(待查看)":
approvers_list = approvers_list[:-1]
if approvers_list:
logger.info(f"get_approvers_from_record: 从 Approval.content 解析审核人列表: {approvers_list}")
return approvers_list
except Exception as e:
logger.warning(f"get_approvers_from_record: 从 Approval.content 解析失败: {e}", exc_info=True)
logger.info(f"get_approvers_from_record: 未找到审核人列表")
return []
def ensure_case_for_approved_project(project, approval=None):
"""
立项审批进入「待查看」或「已通过」时,若该立项尚未在案件管理中有对应案件,则自动创建一条案件记录。
立案时间 = 最后审批人审批通过的时间(进入待查看时写入 approval.completeTiem不再是申请人查看时间。
- 待查看:最后一个审核人通过后即建案,立案时间 = 当日completeTiem 在进入待查看时已写入)。
- 已通过:申请人查看后若尚未建案则补建,立案时间仍用 approval.completeTiem最后审批人通过日
Args:
project: ProjectRegistration 实例
approval: 可选,立项登记的 Approval 记录;不传则按 user_id 查询(仅查已通过)
"""
from business.models import Case, ProjectRegistration
from .models import Approval as ApprovalModel
if not project:
return
# 允许建案:立项已通过,或审批已进入待查看(审核人全部通过)
project_approved = getattr(project, 'state', None) == "已通过"
approval_pending_view = approval and getattr(approval, 'state', None) == "待查看"
if not project_approved and not approval_pending_view:
return
if Case.objects.filter(project_id=project.id, is_deleted=False).exists():
return
if approval is None:
approval = ApprovalModel.objects.filter(
type='立项登记', user_id=str(project.id), state='已通过', is_deleted=False
).order_by('-id').first()
if approval and getattr(approval, 'completeTiem', None):
filing_time = approval.completeTiem.strftime('%Y-%m-%d')
else:
filing_time = date.today().strftime('%Y-%m-%d')
Case.objects.create(
project_id=project.id,
contract_no=getattr(project, 'ContractNo', None) or '',
project_type=getattr(project, 'type', None) or '',
client_name=getattr(project, 'client_info', None) or '',
party_name=getattr(project, 'party_info', None) or '',
project_description=getattr(project, 'description', None) or '',
responsiblefor=getattr(project, 'responsiblefor', None) or '',
charge=getattr(project, 'charge', None) or '',
times=filing_time,
AgencyContract='[]',
Contractreturn='[]',
Closingapplication='[]',
ChangeRequest='',
state='已通过',
invoice_status='0',
paymentcollection='0',
)
def process_approval_flow(approval, business_record, current_approver, state,
approval_type, final_state_map=None, rejection_reason=None):
"""
统一的审核流程处理函数
规则:
- 如果 business_record 有 approvers_order 字段,从该字段读取审核人列表
- 如果没有审核人列表(个人团队),直接抄送财务
- 如果有审核人列表(多人团队),按顺序流转
- 最后一个审核人通过后,抄送财务
- 财务部只需要查看,不需要审批,查看后直接完成(审批记录和业务记录都更新为"已通过"
Args:
approval: Approval对象
business_record: 业务记录对象(如 Schedule, Reimbursement, Income 等)
current_approver: 当前审核人(从 approval.personincharge 获取)
state: 审核状态("已通过""未通过"
approval_type: 审批类型(如"待办""报销申请"等)
final_state_map: 状态映射字典,格式:{"已通过": "已完成", "未通过": "未通过"}
rejection_reason: 不通过原因(审核不通过时填写)
Returns:
tuple: (是否完成, 错误信息)
"""
import json
from .models import Approval
if final_state_map is None:
final_state_map = {"已通过": "已完成", "未通过": "未通过"}
def _sync_case_state_if_change_request(approval_type, business_record):
"""案件变更申请状态变更时,同步关联案件的状态"""
if approval_type == "案件变更" and business_record and getattr(business_record, 'case', None) and business_record.case_id:
business_record.case.state = business_record.state
business_record.case.save(update_fields=['state'])
# 如果审核不通过,直接结束
if state == "未通过":
approval.state = "未通过"
# 保存不通过原因
if rejection_reason:
approval.rejection_reason = rejection_reason
approval.save(update_fields=['state', 'rejection_reason'])
if business_record and hasattr(business_record, 'state'):
business_record.state = final_state_map.get("未通过", "未通过")
business_record.save(update_fields=['state'])
_sync_case_state_if_change_request(approval_type, business_record)
return True, None
# 申请人待查看阶段:当前步骤是申请人待查看,申请人查看后完成(投标/立项/案件变更等)
applicant = getattr(approval, 'applicant', None)
if approval.state == "待查看" and applicant and approval.personincharge == applicant:
# 已通过、已查看、未传 state 或前端传 undefined/null 等均视为申请人已查看完成
state_str = (state or "").strip().lower() if isinstance(state, str) else ""
if state == "已通过" or state == "已查看" or state is None or state == "" or state_str in ("undefined", "null"):
approval.state = "已通过"
# 立项登记在进入待查看时已写入 completeTiem最后审批人通过时间此处不改写其他类型或未写时补写
if not approval.completeTiem:
approval.completeTiem = date.today()
if "已通过" not in approval.content:
approval.content = (approval.content or "") + ",申请人已查看"
approval.save(update_fields=['state', 'content', 'completeTiem'])
if business_record and hasattr(business_record, 'state'):
business_record.state = final_state_map.get("已通过", "已通过")
business_record.save(update_fields=['state'])
_sync_case_state_if_change_request(approval_type, business_record)
# 立项:若待查看时已建案此处不会重复创建;若未建案则补建。立案时间不再按申请人查看日更新
if approval_type == "立项登记" and business_record:
ensure_case_for_approved_project(business_record, approval=approval)
return True, None
if state == "未通过":
approval.state = "未通过"
if rejection_reason:
approval.rejection_reason = rejection_reason
approval.save(update_fields=['state', 'rejection_reason'])
if business_record and hasattr(business_record, 'state'):
business_record.state = final_state_map.get("未通过", "未通过")
business_record.save(update_fields=['state'])
_sync_case_state_if_change_request(approval_type, business_record)
return True, None
return False, None
# 检查当前是否已经是财务审核部门ID或“财务”字符串均视为财务阶段
if is_finance_personincharge(approval.personincharge) and approval.state == "已抄送财务":
# 财务部只需要查看,不需要审批,查看后直接完成
# 如果不传state或state为"已通过",则完成审批
if not state or state == "已通过":
approval.state = "已通过"
if not approval.completeTiem:
approval.completeTiem = date.today()
approval.save(update_fields=['state', 'completeTiem'])
if business_record and hasattr(business_record, 'state'):
business_record.state = final_state_map.get("已通过", "已通过")
business_record.save(update_fields=['state'])
_sync_case_state_if_change_request(approval_type, business_record)
elif state == "未通过":
approval.state = "未通过"
# 保存不通过原因
if rejection_reason:
approval.rejection_reason = rejection_reason
approval.save(update_fields=['state', 'rejection_reason'])
if business_record and hasattr(business_record, 'state'):
business_record.state = final_state_map.get("未通过", "未通过")
business_record.save(update_fields=['state'])
_sync_case_state_if_change_request(approval_type, business_record)
return True, None
# 获取审核人列表
approvers_list = get_approvers_from_record(business_record, approval=approval)
import logging
logger = logging.getLogger(__name__)
logger.info(f"process_approval_flow: 审批类型={approval_type}, 当前审核人={current_approver}, 审核状态={state}, 审核人列表={approvers_list}")
if not approvers_list:
# 没有审核人列表(个人团队或直接到财务)
# 如果当前不是财务,则抄送财务
if not is_finance_personincharge(approval.personincharge):
finance_personincharge = get_finance_personincharge_value()
approval.personincharge = finance_personincharge
approval.state = "已抄送财务"
if "已抄送财务" not in approval.content and "已抄送财务部" not in approval.content:
approval.content = approval.content + ",已抄送财务部"
approval.save(update_fields=['state', 'personincharge', 'content'])
if business_record and hasattr(business_record, 'state'):
# 抄送财务时就已经审核通过,财务只是查看
# 使用 final_state_map 确保状态映射正确(如待办类型:已通过 -> 已完成)
business_record.state = final_state_map.get("已通过", "已通过")
business_record.save(update_fields=['state'])
_sync_case_state_if_change_request(approval_type, business_record)
return False, None
# 有审核人列表(多人团队),按顺序流转
try:
current_index = approvers_list.index(current_approver)
logger.info(f"process_approval_flow: 当前审核人在列表中的位置: {current_index}/{len(approvers_list)-1}")
except ValueError:
# 当前审核人不在列表中,可能是旧数据,直接抄送财务
logger.warning(f"process_approval_flow: 当前审核人 {current_approver} 不在审核人列表中 {approvers_list},直接抄送财务")
finance_personincharge = get_finance_personincharge_value()
approval.personincharge = finance_personincharge
approval.state = "已抄送财务"
approval.save(update_fields=['state', 'personincharge'])
return False, None
# 检查是否还有下一个审核人
if current_index < len(approvers_list) - 1:
# 不是最后一个审核人,流转到下一个
next_approver = approvers_list[current_index + 1]
logger.info(f"process_approval_flow: 流转到下一个审核人: {next_approver} (位置: {current_index + 1})")
approval.personincharge = next_approver
approval.state = "审核中"
# 更新审批内容,显示当前审批人
if "当前审批人:" in approval.content:
approval.content = approval.content.replace(
f"当前审批人:{current_approver}",
f"当前审批人:{next_approver}"
)
approval.save(update_fields=['state', 'personincharge', 'content'])
logger.info(f"process_approval_flow: 已更新审批记录personincharge={next_approver}, state=审核中")
return False, None
else:
# 最后一个审核人已通过:投标/立项/案件变更/结案申请转申请人待查看,其他类型抄送财务
applicant = getattr(approval, 'applicant', None)
if approval_type in ("投标登记", "立项登记", "案件变更", "结案申请") and applicant:
logger.info(f"process_approval_flow: 最后一个审核人已审核,流转到申请人待查看: {applicant}")
approval.personincharge = applicant
approval.state = "待查看"
# 立项登记:立案时间 = 最后审批人通过时间,在此刻写入 completeTiem
if approval_type == "立项登记":
approval.completeTiem = date.today()
if "待申请人查看" not in (approval.content or ""):
approval.content = (approval.content or "") + ",待申请人查看"
approval.save(update_fields=['state', 'personincharge', 'content', 'completeTiem'] if approval_type == "立项登记" else ['state', 'personincharge', 'content'])
# 立项登记:最后审批人通过后,立项即为「已通过」,待查看只是通知不阻塞
if approval_type == "立项登记" and business_record:
if hasattr(business_record, 'state'):
business_record.state = "已通过"
business_record.save(update_fields=['state'])
# 立项审批通过后直接创建案件,不等待申请人查看
ensure_case_for_approved_project(business_record, approval=approval)
# 投标/案件变更等:不更新业务记录状态,等申请人查看后再设为已通过
return False, None
# 其他类型:抄送财务
logger.info(f"process_approval_flow: 最后一个审核人已审核,流转到财务部")
finance_personincharge = get_finance_personincharge_value()
approval.personincharge = finance_personincharge
approval.state = "已抄送财务"
if "已抄送财务" not in (approval.content or "") and "已抄送财务部" not in (approval.content or ""):
approval.content = (approval.content or "") + ",已抄送财务部"
approval.save(update_fields=['state', 'personincharge', 'content'])
if business_record and hasattr(business_record, 'state'):
final_state = final_state_map.get("已通过", "已通过")
business_record.state = final_state
business_record.save(update_fields=['state'])
_sync_case_state_if_change_request(approval_type, business_record)
logger.info(f"process_approval_flow: 已抄送财务部personincharge=%s, state=已抄送财务,业务记录状态=%s", approval.personincharge, final_state_map.get("已通过", "已通过"))
return False, None
def create_approval_with_team_logic(team_name, approvers, title, content, approval_type, user_id,
business_record=None, today=None, applicant=None, force_approval=False):
"""
根据团队类型创建审批记录(统一逻辑)
规则:
- 个人团队personal直接抄送财务personincharge=财务部IDstate="已抄送财务"
- 投标登记/立项登记:最后一步给申请人,生成「待查看」待办,申请人查看后完成(不再给财务部)
- 其他类型团队team需要审核人按顺序审核最后抄送财务
- 无团队:直接抄送财务
- 强制审批模式force_approval=True即使是个人团队也需要审批用于付款申请、报销、工资/奖金变更等
注意personincharge字段统一使用财务部ID优先或回退到"财务"字符串
Args:
team_name: 团队名称
approvers: 审核人列表(可以是数组或字符串,多人团队时需要)
title: 审批标题
content: 审批内容
approval_type: 审批类型(如"立项登记""投标登记""付款申请"等)
user_id: 关联的业务ID字符串
business_record: 业务记录对象(可选)
today: 日期字符串可选格式YYYY-MM-DD
applicant: 申请人用户名(可选,投标/立项时填,最后一步生成待查看待办给申请人)
force_approval: 是否强制审批默认False。设为True时即使是个人团队也需要审批如付款申请、报销、工资/奖金变更)
Returns:
tuple: (approval对象, approvers_order_json, 是否需要审核)
"""
import json
from datetime import datetime
from .models import Team, Approval, User
if today is None:
today = datetime.now().strftime("%Y-%m-%d")
# 查找团队
team = None
if team_name:
try:
team = Team.objects.prefetch_related('approvers').filter(name=team_name, is_deleted=False).first()
except:
team = None
# 优先检查是否传入了审核人(无论团队类型如何,如果传入了审核人,都应该使用)
approvers_list = parse_approvers(approvers)
# 如果传入了审核人使用传入的审核人即使团队类型是personal或无团队
if approvers_list:
# 验证审核人是否存在,过滤掉无效的审核人
valid_approvers_list = []
invalid_approvers_list = []
for approver_name in approvers_list:
if User.objects.filter(username=approver_name, is_deleted=False).exists():
valid_approvers_list.append(approver_name)
else:
invalid_approvers_list.append(approver_name)
# 如果所有审核人都无效,返回 None
if not valid_approvers_list:
return None, None, True # 审核人不存在
# 使用有效的审核人列表
approvers_list = valid_approvers_list
# 将审核人顺序存储为JSON格式
approvers_order_json = json.dumps(approvers_list, ensure_ascii=False)
# 存储到业务记录
if business_record and hasattr(business_record, 'approvers_order'):
business_record.approvers_order = approvers_order_json
business_record.state = "审核中"
business_record.save(update_fields=['approvers_order', 'state'])
# 创建审批记录,第一个审核人
first_approver = approvers_list[0]
approvers_str = ''.join(approvers_list) # 使用箭头表示顺序
# 投标登记/立项登记/案件变更/结案申请:最后一步给申请人(待查看),不再给财务部
if approval_type in ("投标登记", "立项登记", "案件变更", "结案申请") and applicant:
flow_suffix = " → 申请人(待查看)"
else:
flow_suffix = " → 财务部(按顺序审批)"
content_with_flow = f"{content},审批流程:{approvers_str}{flow_suffix},当前审批人:{first_approver}"
import logging
logger = logging.getLogger(__name__)
logger.info(f"create_approval_with_team_logic: 创建审批记录(使用传入的审核人)- 审批类型={approval_type}, 审核人列表={approvers_list}, 第一个审核人={first_approver}, approvers_order_json={approvers_order_json}")
approval = Approval.objects.create(
title=title,
content=content_with_flow,
times=today,
personincharge=first_approver,
state="审核中",
type=approval_type,
user_id=str(user_id),
applicant=applicant
)
logger.info(f"create_approval_with_team_logic: 审批记录已创建 - ID={approval.id}, personincharge={approval.personincharge}, state={approval.state}")
return approval, approvers_order_json, True
# 如果没有传入审核人,则根据团队类型判断
# 判断团队类型
if not team_name or not team or (team and team.team_type == 'personal'):
# 强制审批模式(付款申请、报销、工资/奖金变更等):即使是个人团队也需要审批
if force_approval:
import logging
logger = logging.getLogger(__name__)
logger.info(f"create_approval_with_team_logic: 强制审批模式 - 审批类型={approval_type}, 团队={team_name}")
# 尝试获取默认审核人:优先律所负责人,然后管委会成员
default_approver = get_law_firm_leader(team_name)
if not default_approver:
# 如果找不到默认审核人,返回错误
logger.warning(f"create_approval_with_team_logic: 强制审批模式下找不到默认审核人")
return None, None, True # needs_approval = True表示需要审批但缺少审核人
# 使用默认审核人创建审批
approvers_list = [default_approver]
approvers_order_json = json.dumps(approvers_list, ensure_ascii=False)
# 存储到业务记录
if business_record and hasattr(business_record, 'approvers_order'):
business_record.approvers_order = approvers_order_json
business_record.state = "审核中"
business_record.save(update_fields=['approvers_order', 'state'])
# 创建审批流程内容
content_with_flow = f"{content},审批流程:{default_approver} → 财务部(按顺序审批),当前审批人:{default_approver}"
logger.info(f"create_approval_with_team_logic: 强制审批 - 使用默认审核人 {default_approver}")
approval = Approval.objects.create(
title=title,
content=content_with_flow,
times=today,
personincharge=default_approver,
state="审核中",
type=approval_type,
user_id=str(user_id),
applicant=applicant
)
return approval, approvers_order_json, True
# 投标登记/立项登记/案件变更/结案申请且传入了申请人:最后一步给申请人,生成待查看待办(不再给财务部)
if approval_type in ("投标登记", "立项登记", "案件变更", "结案申请") and applicant:
content_to_save = content + ",待申请人查看"
approval = Approval.objects.create(
title=title,
content=content_to_save,
times=today,
personincharge=applicant,
state="待查看",
type=approval_type,
user_id=str(user_id),
applicant=applicant
)
# 立项登记:个人团队无审核人,直接进入待查看,立项即为「已通过」,待查看只是通知
if approval_type == "立项登记" and business_record:
if hasattr(business_record, 'state'):
business_record.state = "已通过"
business_record.save(update_fields=['state'])
# 立项审批通过后直接创建案件,不等待申请人查看
ensure_case_for_approved_project(business_record, approval=approval)
# 投标/案件变更等:不更新业务记录状态,等申请人查看后再设为已通过
return approval, None, False
# 其他类型:个人团队或无团队,直接到财务团队审核
finance_personincharge = get_finance_personincharge_value()
content_to_save = content
if "已抄送财务" not in content and "已抄送财务部" not in content:
content_to_save = content + ",已抄送财务部"
approval = Approval.objects.create(
title=title,
content=content_to_save,
times=today,
personincharge=finance_personincharge,
state="已抄送财务",
type=approval_type,
user_id=str(user_id),
applicant=applicant
)
# 更新业务记录状态:抄送财务时就已经审核通过,财务只是查看
if business_record:
if approval_type in ["待办", "结案申请"]:
business_record.state = "已完成"
else:
business_record.state = "已通过"
business_record.save(update_fields=['state'])
return approval, None, False
elif team and team.team_type == 'team':
# 团队类型:需要审核人审核(按顺序)
# 如果没有传入审核人,使用团队的审核人
team_approvers = team.approvers.filter(is_deleted=False).order_by('id')
approvers_list = [approver.username for approver in team_approvers]
if not approvers_list:
return None, None, True # 需要审核但没有审核人
# 验证审核人是否存在,过滤掉无效的审核人
valid_approvers_list = []
invalid_approvers_list = []
for approver_name in approvers_list:
if User.objects.filter(username=approver_name, is_deleted=False).exists():
valid_approvers_list.append(approver_name)
else:
invalid_approvers_list.append(approver_name)
# 如果所有审核人都无效,返回 None
if not valid_approvers_list:
return None, None, True # 审核人不存在
# 使用有效的审核人列表
approvers_list = valid_approvers_list
# 将审核人顺序存储为JSON格式
approvers_order_json = json.dumps(approvers_list, ensure_ascii=False)
# 存储到业务记录
if business_record and hasattr(business_record, 'approvers_order'):
business_record.approvers_order = approvers_order_json
business_record.state = "审核中"
business_record.save(update_fields=['approvers_order', 'state'])
# 创建审批记录,第一个审核人
first_approver = approvers_list[0]
approvers_str = ''.join(approvers_list)
if approval_type in ("投标登记", "立项登记", "案件变更", "结案申请") and applicant:
flow_suffix = " → 申请人(待查看)"
else:
flow_suffix = " → 财务部(按顺序审批)"
content_with_flow = f"{content},审批流程:{approvers_str}{flow_suffix},当前审批人:{first_approver}"
import logging
logger = logging.getLogger(__name__)
logger.info(f"create_approval_with_team_logic: 创建审批记录(使用团队配置的审核人)- 审批类型={approval_type}, 审核人列表={approvers_list}, 第一个审核人={first_approver}, approvers_order_json={approvers_order_json}")
approval = Approval.objects.create(
title=title,
content=content_with_flow,
times=today,
personincharge=first_approver,
state="审核中",
type=approval_type,
user_id=str(user_id),
applicant=applicant
)
logger.info(f"create_approval_with_team_logic: 审批记录已创建 - ID={approval.id}, personincharge={approval.personincharge}, state={approval.state}")
return approval, approvers_order_json, True
else:
# 找不到团队或团队类型未知,直接抄送财务
finance_personincharge = get_finance_personincharge_value()
content_to_save = content
if "已抄送财务" not in content and "已抄送财务部" not in content:
content_to_save = content + ",已抄送财务部"
approval = Approval.objects.create(
title=title,
content=content_to_save,
times=today,
personincharge=finance_personincharge,
state="已抄送财务",
type=approval_type,
user_id=str(user_id),
applicant=applicant
)
# 更新业务记录状态:抄送财务时就已经审核通过,财务只是查看
if business_record:
if approval_type in ["待办", "结案申请"]:
business_record.state = "已完成"
else:
business_record.state = "已通过"
business_record.save(update_fields=['state'])
return approval, None, False