Files
boss_dp/server/api/contacts.py
ddrwode 8ac650e27c haha
2026-03-04 17:18:31 +08:00

229 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
联系记录 API需要登录
- GET /api/contacts -> 查询联系记录(支持分页、搜索)
- GET /api/contacts/query -> 按电话号码或微信号查询(数据展示)
- GET /api/contacts/export -> 导出联系记录为 Excel 文件(返回下载链接)
- POST /api/contacts -> 创建联系记录
- GET /api/contacts/{id} -> 查询单条记录
- PUT /api/contacts/{id} -> 更新记录
- DELETE /api/contacts/{id} -> 删除记录
"""
import os
import uuid
from datetime import datetime
from django.db.models import Q
from django.conf import settings
from rest_framework import status
from rest_framework.decorators import api_view
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill
from server.core.response import api_success, api_error
from server.models import ContactRecord
from server.serializers import ContactRecordSerializer
def _normalize_media_url(media_url: str) -> str:
"""Normalize MEDIA_URL so it always ends with '/' and is safe for URL拼接."""
media_url = (media_url or "/media/").strip()
if media_url.startswith(("http://", "https://")):
return f"{media_url.rstrip('/')}/"
return f"/{media_url.strip('/')}/"
@api_view(["GET", "POST"])
def contact_list(request):
    """List contact records (GET) or create a new one (POST).

    GET query parameters:
        search: case-insensitive substring matched against ``name``,
            ``position`` and ``contact``.
        page: 1-based page number (default 1); must be an integer >= 1.
        page_size: page length (default 10); must be an integer >= 0.
        reply_status: exact-match filter on ``reply_status``.
        wechat_exchanged: when supplied, ``"true"`` / ``"1"``
            (case-insensitive) selects records with the flag set and any
            other value selects records without it.
        start_date / end_date: inclusive bounds on ``created_at``;
            ``end_date`` is extended to 23:59:59 of that day.

    Returns:
        GET: a success payload with ``total``, ``page``, ``page_size`` and the
        serialized ``results``; a 400 error when ``page`` / ``page_size`` are
        not usable.
        POST: the serialized new record with HTTP 201.  Invalid input raises
        through ``is_valid(raise_exception=True)``.
    """
    if request.method == "GET":
        params = request.query_params

        # Validate pagination up front: a non-numeric value would otherwise
        # raise inside int(), and a negative slice bound is rejected by the
        # queryset -- both would surface as a server error.
        try:
            page = int(params.get("page", 1))
            page_size = int(params.get("page_size", 10))
        except (TypeError, ValueError):
            return api_error(
                status.HTTP_400_BAD_REQUEST,
                "page 和 page_size 必须为整数",
            )
        if page < 1 or page_size < 0:
            return api_error(
                status.HTTP_400_BAD_REQUEST,
                "page 必须大于等于 1 且 page_size 不能为负数",
            )

        search = params.get("search", "")
        reply_status = params.get("reply_status")
        wechat = params.get("wechat_exchanged")
        start_date = params.get("start_date")
        end_date = params.get("end_date")

        qs = ContactRecord.objects.all()
        if search:
            qs = qs.filter(
                Q(name__icontains=search)
                | Q(position__icontains=search)
                | Q(contact__icontains=search)
            )
        if reply_status:
            qs = qs.filter(reply_status=reply_status)
        if wechat is not None:
            # Any supplied value other than "true"/"1" filters for False.
            qs = qs.filter(wechat_exchanged=wechat.lower() in ("true", "1"))
        if start_date:
            qs = qs.filter(created_at__gte=start_date)
        if end_date:
            # Include the whole end day, not just its first instant.
            qs = qs.filter(created_at__lte=f"{end_date} 23:59:59")

        total = qs.count()
        start = (page - 1) * page_size
        records = qs[start:start + page_size]
        return api_success({
            "total": total,
            "page": page,
            "page_size": page_size,
            "results": ContactRecordSerializer(records, many=True).data,
        })

    # POST: validate and persist a new record.
    ser = ContactRecordSerializer(data=request.data)
    ser.is_valid(raise_exception=True)
    ser.save()
    return api_success(ser.data, http_status=status.HTTP_201_CREATED)
@api_view(["GET"])
def contact_query(request):
    """Look up contact records by phone number or WeChat ID (data display).

    Query parameters:
        contact: required keyword; matched as a case-insensitive substring of
            the ``contact`` field.  Surrounding whitespace is ignored.
        page: 1-based page number (default 1); must be an integer >= 1.
        page_size: page length (default 20); must be an integer >= 0.

    Returns:
        A success payload with ``total``, ``page``, ``page_size``,
        ``contact_keyword`` and the serialized ``results`` ordered by
        ``created_at`` descending; a 400 error when ``contact`` is missing or
        the pagination values are not usable.
    """
    params = request.query_params
    contact = (params.get("contact") or "").strip()
    if not contact:
        return api_error(status.HTTP_400_BAD_REQUEST, "请提供 contact 参数(电话号码或微信号)")

    # Validate pagination up front: a non-numeric value would otherwise raise
    # inside int(), and a negative slice bound is rejected by the queryset --
    # both would surface as a server error.
    try:
        page = int(params.get("page", 1))
        page_size = int(params.get("page_size", 20))
    except (TypeError, ValueError):
        return api_error(
            status.HTTP_400_BAD_REQUEST,
            "page 和 page_size 必须为整数",
        )
    if page < 1 or page_size < 0:
        return api_error(
            status.HTTP_400_BAD_REQUEST,
            "page 必须大于等于 1 且 page_size 不能为负数",
        )

    qs = ContactRecord.objects.filter(contact__icontains=contact).order_by("-created_at")
    total = qs.count()
    start = (page - 1) * page_size
    records = qs[start:start + page_size]
    return api_success({
        "total": total,
        "page": page,
        "page_size": page_size,
        "contact_keyword": contact,
        "results": ContactRecordSerializer(records, many=True).data,
    })
@api_view(["GET"])
def contact_export(request):
    """Export contact records to an Excel file and return a download link.

    Accepts the same filter parameters as the list endpoint (``search``,
    ``reply_status``, ``wechat_exchanged``, ``start_date``, ``end_date``).
    The workbook is written under ``settings.EXPORT_ROOT`` and the response
    carries a URL to it rather than streaming the file.

    Returns:
        A success payload with ``download_url`` (absolute URL), ``filename``,
        ``total_records`` (number of rows written to the sheet) and
        ``generated_at``.
    """
    params = request.query_params
    search = params.get("search", "")
    reply_status = params.get("reply_status")
    wechat = params.get("wechat_exchanged")
    start_date = params.get("start_date")
    end_date = params.get("end_date")

    # Build the query.
    qs = ContactRecord.objects.all()
    if search:
        qs = qs.filter(
            Q(name__icontains=search)
            | Q(position__icontains=search)
            | Q(contact__icontains=search)
        )
    if reply_status:
        qs = qs.filter(reply_status=reply_status)
    if wechat is not None:
        # Any supplied value other than "true"/"1" filters for False.
        qs = qs.filter(wechat_exchanged=wechat.lower() in ("true", "1"))
    if start_date:
        qs = qs.filter(created_at__gte=start_date)
    if end_date:
        # Include the whole end day, not just its first instant.
        qs = qs.filter(created_at__lte=f"{end_date} 23:59:59")

    # Create the workbook.
    wb = Workbook()
    ws = wb.active
    ws.title = "联系记录"

    # Header row and its styling.
    header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
    header_font = Font(bold=True, color="FFFFFF", size=11)
    header_alignment = Alignment(horizontal="center", vertical="center")
    headers = ["ID", "姓名", "岗位", "联系方式", "回复状态", "是否交换微信", "Worker ID", "备注", "联系时间", "创建时间"]
    ws.append(headers)
    for cell in ws[1]:
        cell.fill = header_fill
        cell.font = header_font
        cell.alignment = header_alignment

    # Write the data rows, counting them as we go so the reported total is
    # exactly what is in the file (and no second COUNT query is needed).
    total_records = 0
    for record in qs.order_by("-created_at"):
        ws.append([
            record.id,
            record.name,
            record.position,
            record.contact,
            record.reply_status,
            # Render the flag as yes/no; an empty string for both states
            # would leave the column blank and unreadable.
            "是" if record.wechat_exchanged else "否",
            record.worker_id,
            record.notes,
            record.contacted_at.strftime("%Y-%m-%d %H:%M:%S") if record.contacted_at else "",
            record.created_at.strftime("%Y-%m-%d %H:%M:%S"),
        ])
        total_records += 1

    # Column widths; chr(64 + i) maps 1..26 to "A".."Z", which covers the
    # ten columns used here.
    column_widths = [8, 12, 15, 18, 12, 15, 15, 30, 20, 20]
    for i, width in enumerate(column_widths, start=1):
        ws.column_dimensions[chr(64 + i)].width = width

    # Vertically center every data cell; one shared style object suffices.
    data_alignment = Alignment(vertical="center")
    for row in ws.iter_rows(min_row=2, max_row=ws.max_row):
        for cell in row:
            cell.alignment = data_alignment

    # Make sure the export directory exists.
    export_dir = settings.EXPORT_ROOT
    os.makedirs(export_dir, exist_ok=True)

    # Unique file name: timestamp plus a short random suffix.  A single
    # clock reading keeps the file name and generated_at in agreement.
    now = datetime.now()
    timestamp = now.strftime("%Y%m%d_%H%M%S")
    unique_id = uuid.uuid4().hex[:8]
    filename = f"contacts_export_{timestamp}_{unique_id}.xlsx"
    filepath = os.path.join(export_dir, filename)
    wb.save(filepath)

    # Build a fully-qualified download link.
    # NOTE(review): this presumes EXPORT_ROOT is served at MEDIA_URL + "exports/"
    # -- confirm against the project settings / URL configuration.
    media_url = _normalize_media_url(getattr(settings, "MEDIA_URL", "/media/"))
    download_path = f"{media_url}exports/{filename}"
    download_url = request.build_absolute_uri(download_path)

    return api_success({
        "download_url": download_url,
        "filename": filename,
        "total_records": total_records,
        "generated_at": now.strftime("%Y-%m-%d %H:%M:%S")
    })
@api_view(["GET", "PUT", "DELETE"])
def contact_detail(request, pk):
    """Retrieve, update or delete the contact record with primary key ``pk``.

    GET returns the serialized record, PUT applies a partial update from the
    request body and returns the updated record, DELETE removes the record.
    A 404 error is returned when no record has that primary key; invalid PUT
    data raises through ``is_valid(raise_exception=True)``.
    """
    try:
        record = ContactRecord.objects.get(pk=pk)
    except ContactRecord.DoesNotExist:
        return api_error(status.HTTP_404_NOT_FOUND, "联系记录不存在")

    method = request.method
    if method == "GET":
        return api_success(ContactRecordSerializer(record).data)

    if method != "PUT":
        # The decorator only admits GET, PUT and DELETE, so this is DELETE.
        record.delete()
        return api_success(msg="联系记录已删除")

    # PUT: only the supplied fields are validated and written (partial=True).
    serializer = ContactRecordSerializer(record, data=request.data, partial=True)
    serializer.is_valid(raise_exception=True)
    serializer.save()
    return api_success(serializer.data)