229 lines
7.8 KiB
Python
229 lines
7.8 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
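Contact record API (login required):
- GET /api/contacts -> list contact records (supports pagination and search)
- GET /api/contacts/query -> look up records by phone number or WeChat ID (data display)
- GET /api/contacts/export -> export contact records to an Excel file (returns a download link)
- POST /api/contacts -> create a contact record
- GET /api/contacts/{id} -> retrieve a single record
- PUT /api/contacts/{id} -> update a record
- DELETE /api/contacts/{id} -> delete a record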
|
||
"""
|
||
import os
|
||
import uuid
|
||
from datetime import datetime
|
||
|
||
from django.db.models import Q
|
||
from django.conf import settings
|
||
|
||
from rest_framework import status
|
||
from rest_framework.decorators import api_view
|
||
|
||
from openpyxl import Workbook
|
||
from openpyxl.styles import Font, Alignment, PatternFill
|
||
|
||
from server.core.response import api_success, api_error
|
||
from server.models import ContactRecord
|
||
from server.serializers import ContactRecordSerializer
|
||
|
||
|
||
def _normalize_media_url(media_url: str) -> str:
|
||
"""Normalize MEDIA_URL so it always ends with '/' and is safe for URL拼接."""
|
||
media_url = (media_url or "/media/").strip()
|
||
if media_url.startswith(("http://", "https://")):
|
||
return f"{media_url.rstrip('/')}/"
|
||
return f"/{media_url.strip('/')}/"
|
||
|
||
|
||
@api_view(["GET", "POST"])
def contact_list(request):
    """List contact records (GET) or create a new one (POST).

    GET query parameters:
        search: Case-insensitive substring matched against ``name``,
            ``position`` and ``contact``.
        page: 1-based page number (default 1).
        page_size: Number of records per page (default 10).
        reply_status: Exact-match filter on ``reply_status``.
        wechat_exchanged: ``"true"`` / ``"1"`` (any case) selects records with
            ``wechat_exchanged=True``; any other supplied value selects False.
        start_date: Lower bound (inclusive) on ``created_at``.
        end_date: Upper bound on ``created_at``; ``" 23:59:59"`` is appended.

    Returns:
        GET: ``total``, ``page``, ``page_size`` and the serialized ``results``,
        or a 400 error when ``page`` / ``page_size`` are not integers or are
        out of range.
        POST: the serialized new record with HTTP 201. Validation errors are
        raised by the serializer (``raise_exception=True``).
    """
    if request.method == "GET":
        params = request.query_params

        # Reject malformed pagination up front: int() on arbitrary input raises
        # ValueError, and negative offsets make the queryset slice fail.
        try:
            page = int(params.get("page", 1))
            page_size = int(params.get("page_size", 10))
        except (TypeError, ValueError):
            return api_error(status.HTTP_400_BAD_REQUEST, "page 和 page_size 必须为整数")
        if page < 1 or page_size < 0:
            return api_error(
                status.HTTP_400_BAD_REQUEST,
                "page 必须大于等于 1,page_size 不能为负数",
            )

        search = params.get("search", "")
        reply_status = params.get("reply_status")
        wechat = params.get("wechat_exchanged")
        start_date = params.get("start_date")
        end_date = params.get("end_date")

        qs = ContactRecord.objects.all()
        if search:
            qs = qs.filter(
                Q(name__icontains=search)
                | Q(position__icontains=search)
                | Q(contact__icontains=search)
            )
        if reply_status:
            qs = qs.filter(reply_status=reply_status)
        if wechat is not None:
            qs = qs.filter(wechat_exchanged=wechat.lower() in ("true", "1"))
        if start_date:
            qs = qs.filter(created_at__gte=start_date)
        if end_date:
            qs = qs.filter(created_at__lte=f"{end_date} 23:59:59")

        # Slicing an unordered queryset gives no stable page boundaries. When
        # the model declares no default ordering, use newest-first, matching
        # the query and export endpoints.
        if not qs.ordered:
            qs = qs.order_by("-created_at")

        total = qs.count()
        start = (page - 1) * page_size
        records = qs[start:start + page_size]

        return api_success({
            "total": total,
            "page": page,
            "page_size": page_size,
            "results": ContactRecordSerializer(records, many=True).data,
        })

    ser = ContactRecordSerializer(data=request.data)
    ser.is_valid(raise_exception=True)
    ser.save()
    return api_success(ser.data, http_status=status.HTTP_201_CREATED)
|
||
|
||
|
||
@api_view(["GET"])
def contact_query(request):
    """Look up contact records by phone number or WeChat ID (data display).

    Query parameters:
        contact: Required. Matched as a case-insensitive substring of the
            ``contact`` field; surrounding whitespace is stripped.
        page: 1-based page number (default 1).
        page_size: Number of records per page (default 20).

    Returns:
        ``total``, ``page``, ``page_size``, the echoed ``contact_keyword`` and
        the serialized ``results`` ordered newest-first by ``created_at``.
        A 400 error is returned when ``contact`` is missing/blank or when
        ``page`` / ``page_size`` are not integers or are out of range.
    """
    contact = (request.query_params.get("contact") or "").strip()
    if not contact:
        return api_error(status.HTTP_400_BAD_REQUEST, "请提供 contact 参数(电话号码或微信号)")

    # Reject malformed pagination up front: int() on arbitrary input raises
    # ValueError, and negative offsets make the queryset slice fail.
    try:
        page = int(request.query_params.get("page", 1))
        page_size = int(request.query_params.get("page_size", 20))
    except (TypeError, ValueError):
        return api_error(status.HTTP_400_BAD_REQUEST, "page 和 page_size 必须为整数")
    if page < 1 or page_size < 0:
        return api_error(
            status.HTTP_400_BAD_REQUEST,
            "page 必须大于等于 1,page_size 不能为负数",
        )

    qs = ContactRecord.objects.filter(contact__icontains=contact).order_by("-created_at")
    total = qs.count()
    start = (page - 1) * page_size
    records = qs[start:start + page_size]

    return api_success({
        "total": total,
        "page": page,
        "page_size": page_size,
        "contact_keyword": contact,
        "results": ContactRecordSerializer(records, many=True).data,
    })
|
||
|
||
|
||
@api_view(["GET"])
def contact_export(request):
    """Export the filtered contact records to an Excel file.

    Accepts the same filter parameters as the list endpoint: ``search``,
    ``reply_status``, ``wechat_exchanged``, ``start_date`` and ``end_date``.
    The workbook is saved under ``settings.EXPORT_ROOT`` and the response
    carries a download link rather than the file stream.

    Returns:
        ``download_url`` (absolute URL), ``filename``, ``total_records`` (the
        number of rows actually written) and ``generated_at``.
    """
    # Local import so this block brings its own dependency into scope.
    from openpyxl.utils import get_column_letter

    params = request.query_params
    search = params.get("search", "")
    reply_status = params.get("reply_status")
    wechat = params.get("wechat_exchanged")
    start_date = params.get("start_date")
    end_date = params.get("end_date")

    # Build the query.
    qs = ContactRecord.objects.all()
    if search:
        qs = qs.filter(
            Q(name__icontains=search)
            | Q(position__icontains=search)
            | Q(contact__icontains=search)
        )
    if reply_status:
        qs = qs.filter(reply_status=reply_status)
    if wechat is not None:
        qs = qs.filter(wechat_exchanged=wechat.lower() in ("true", "1"))
    if start_date:
        qs = qs.filter(created_at__gte=start_date)
    if end_date:
        qs = qs.filter(created_at__lte=f"{end_date} 23:59:59")

    # Create the workbook.
    wb = Workbook()
    ws = wb.active
    ws.title = "联系记录"

    # Header row and its styling.
    header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
    header_font = Font(bold=True, color="FFFFFF", size=11)
    header_alignment = Alignment(horizontal="center", vertical="center")

    headers = ["ID", "姓名", "岗位", "联系方式", "回复状态", "是否交换微信", "Worker ID", "备注", "联系时间", "创建时间"]
    ws.append(headers)
    for cell in ws[1]:
        cell.fill = header_fill
        cell.font = header_font
        cell.alignment = header_alignment

    # Write the data rows, counting them as we go so the reported total always
    # matches the file contents and no second COUNT query is needed.
    # NOTE(review): strftime() formats the datetimes as stored; if the project
    # uses timezone-aware datetimes these may be UTC rather than local time.
    total_records = 0
    for record in qs.order_by("-created_at"):
        ws.append([
            record.id,
            record.name,
            record.position,
            record.contact,
            record.reply_status,
            "是" if record.wechat_exchanged else "否",
            record.worker_id,
            record.notes,
            record.contacted_at.strftime("%Y-%m-%d %H:%M:%S") if record.contacted_at else "",
            record.created_at.strftime("%Y-%m-%d %H:%M:%S"),
        ])
        total_records += 1

    # Column widths; get_column_letter also works past column Z.
    column_widths = [8, 12, 15, 18, 12, 15, 15, 30, 20, 20]
    for index, width in enumerate(column_widths, start=1):
        ws.column_dimensions[get_column_letter(index)].width = width

    # Style the data rows with one shared Alignment instance.
    data_alignment = Alignment(vertical="center")
    for row in ws.iter_rows(min_row=2, max_row=ws.max_row):
        for cell in row:
            cell.alignment = data_alignment
            # openpyxl stores any string starting with "=" as a formula.
            # Record fields are user-supplied, so force them back to plain
            # text to prevent spreadsheet formula injection.
            if cell.data_type == "f":
                cell.data_type = "s"

    # Make sure the export directory exists.
    export_dir = settings.EXPORT_ROOT
    os.makedirs(export_dir, exist_ok=True)

    # Unique file name; one clock reading keeps the file name and the reported
    # generation time consistent.
    now = datetime.now()
    timestamp = now.strftime("%Y%m%d_%H%M%S")
    unique_id = uuid.uuid4().hex[:8]
    filename = f"contacts_export_{timestamp}_{unique_id}.xlsx"
    filepath = os.path.join(export_dir, filename)

    wb.save(filepath)

    # Build the download link as a full, directly usable URL.
    # NOTE(review): the URL is MEDIA_URL + "exports/" while the file is written
    # to settings.EXPORT_ROOT - presumably EXPORT_ROOT is MEDIA_ROOT/exports;
    # confirm against the settings module.
    media_url = _normalize_media_url(getattr(settings, "MEDIA_URL", "/media/"))
    download_path = f"{media_url}exports/{filename}"
    download_url = request.build_absolute_uri(download_path)

    return api_success({
        "download_url": download_url,
        "filename": filename,
        "total_records": total_records,
        "generated_at": now.strftime("%Y-%m-%d %H:%M:%S"),
    })
|
||
|
||
|
||
@api_view(["GET", "PUT", "DELETE"])
def contact_detail(request, pk):
    """Retrieve, update or delete the contact record identified by ``pk``.

    GET returns the serialized record. PUT applies the request body as a
    partial update (``partial=True``) and returns the updated record. Any
    other allowed method (DELETE) removes the record. A 404 error response is
    returned when no record with this primary key exists.
    """
    try:
        record = ContactRecord.objects.get(pk=pk)
    except ContactRecord.DoesNotExist:
        return api_error(status.HTTP_404_NOT_FOUND, "联系记录不存在")

    method = request.method

    if method == "PUT":
        serializer = ContactRecordSerializer(record, data=request.data, partial=True)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        return api_success(serializer.data)

    if method == "GET":
        payload = ContactRecordSerializer(record).data
        return api_success(payload)

    # Fall-through: the only remaining allowed method is DELETE.
    record.delete()
    return api_success(msg="联系记录已删除")
|