Files
ai_web/backend/apps/products/api.py
2026-02-04 15:25:04 +08:00

941 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Products API routes for categories, websites, products and prices.
"""
from typing import List, Optional
import re
import time
import os
import uuid
import mimetypes
from decimal import Decimal, InvalidOperation
import csv
import io
from ninja import Router, Query, File
from ninja.errors import HttpError
from ninja.errors import HttpError
from ninja.files import UploadedFile
from ninja_jwt.authentication import JWTAuth
from ninja.pagination import paginate, PageNumberPagination
from django.conf import settings
from django.db.models import Count, Min, Max, Q, Prefetch, F
from django.db import transaction, IntegrityError
from django.shortcuts import get_object_or_404
from django.core.exceptions import ObjectDoesNotExist
from django.core.files.storage import default_storage
from .models import Category, Website, Product, ProductPrice, ProductPriceHistory, ComparisonTag, ComparisonTagItem
from .schemas import (
CategoryOut, CategoryIn,
WebsiteOut, WebsiteIn, WebsiteFilter,
ProductOut, ProductIn, ProductWithPricesOut, ProductPriceOut, ProductPriceIn,
ProductFilter,
ProductSearchFilter,
ProductPriceHistoryOut,
PriceHistoryStatsOut,
EnhancedProductPriceOut,
ComparisonTagOut,
ComparisonTagDetailOut,
ComparisonTagItemOut,
ImportResultOut,
MyProductOut,
UploadImageOut,
)
from apps.favorites.models import Favorite
router = Router()
category_router = Router()
website_router = Router()
compare_tag_router = Router()
MAX_PRODUCT_IMAGES = 6
def record_product_price_history(price_record: ProductPrice):
"""Record price history for a product price record."""
ProductPriceHistory.objects.create(
product_id=price_record.product_id,
website_id=price_record.website_id,
price=price_record.price,
)
# ==================== Category Routes ====================
@category_router.get("/", response=List[CategoryOut])
def list_categories(request):
"""Get all categories."""
return Category.objects.all()
@category_router.get("/{slug}", response=CategoryOut)
def get_category_by_slug(request, slug: str):
"""Get category by slug."""
return get_object_or_404(Category, slug=slug)
def require_admin(user):
"""Check if user is admin."""
from ninja.errors import HttpError
if not user or user.role != 'admin' or not user.is_active:
raise HttpError(403, "仅管理员可执行此操作")
def normalize_category_slug(name: str, slug: str) -> str:
"""Normalize category slug and ensure it's not empty."""
raw_slug = (slug or "").strip()
if raw_slug:
return raw_slug
base = re.sub(r"\s+", "-", name.strip().lower())
base = re.sub(r"[^a-z0-9-]", "", base)
if not base:
base = f"category-{int(time.time())}"
if Category.objects.filter(slug=base).exists():
suffix = 1
while Category.objects.filter(slug=f"{base}-{suffix}").exists():
suffix += 1
base = f"{base}-{suffix}"
return base
@category_router.post("/", response=CategoryOut, auth=JWTAuth())
def create_category(request, data: CategoryIn):
"""Create a new category."""
name = (data.name or "").strip()
if not name:
raise HttpError(400, "分类名称不能为空")
slug = normalize_category_slug(name, data.slug)
if len(slug) > 100:
raise HttpError(400, "分类标识过长")
try:
category = Category.objects.create(
name=name,
slug=slug,
description=data.description,
icon=data.icon,
parent_id=data.parent_id,
sort_order=data.sort_order or 0,
)
except IntegrityError:
raise HttpError(400, "分类标识已存在")
return category
# ==================== Website Routes ====================
@website_router.get("/", response=List[WebsiteOut])
@paginate(PageNumberPagination, page_size=20)
def list_websites(request, filters: WebsiteFilter = Query(...)):
"""Get all websites with optional filters."""
queryset = Website.objects.all()
if filters.category_id:
queryset = queryset.filter(category_id=filters.category_id)
if filters.is_verified is not None:
queryset = queryset.filter(is_verified=filters.is_verified)
return queryset
@website_router.get("/{website_id}", response=WebsiteOut)
def get_website(request, website_id: int):
"""Get website by ID."""
return get_object_or_404(Website, id=website_id)
@website_router.post("/", response=WebsiteOut, auth=JWTAuth())
def create_website(request, data: WebsiteIn):
"""Create a new website. Any authenticated user can create."""
website = Website.objects.create(**data.dict())
return website
# ==================== Comparison Tag Routes ====================
@compare_tag_router.get("/", response=List[ComparisonTagOut])
def list_compare_tags(request):
"""List active comparison tags."""
tags = (
ComparisonTag.objects.filter(is_active=True)
.annotate(product_count=Count("items", filter=Q(items__product__status="approved"), distinct=True))
.order_by("sort_order", "id")
)
return [
ComparisonTagOut(
id=tag.id,
name=tag.name,
slug=tag.slug,
description=tag.description,
cover_image=tag.cover_image,
icon=tag.icon,
sort_order=tag.sort_order,
is_active=tag.is_active,
product_count=tag.product_count or 0,
created_at=tag.created_at,
updated_at=tag.updated_at,
)
for tag in tags
]
@compare_tag_router.get("/{slug}", response=ComparisonTagDetailOut)
def get_compare_tag_detail(request, slug: str, only_discounted: bool = False, min_discount_percent: Optional[int] = None, only_historical_lowest: bool = False):
"""Get comparison tag detail with product prices and enhanced stats."""
tag = get_object_or_404(ComparisonTag, slug=slug, is_active=True)
prices_prefetch = Prefetch(
"product__prices",
queryset=ProductPrice.objects.select_related("website"),
)
items = (
ComparisonTagItem.objects.filter(tag=tag, product__status="approved")
.select_related("product", "product__category")
.prefetch_related(prices_prefetch)
.order_by("-is_pinned", "sort_order", "id")
)
# 获取所有商品ID批量查询历史最低价
product_ids = [item.product_id for item in items]
historical_lowest_map = {}
if product_ids:
from django.db.models.functions import Coalesce
from django.db.models import Subquery, OuterRef
# 查询每个商品的历史最低价
for pid in product_ids:
history = ProductPriceHistory.objects.filter(product_id=pid).order_by("price").first()
if history:
historical_lowest_map[pid] = {
"price": history.price,
"date": history.recorded_at,
}
item_out: List[ComparisonTagItemOut] = []
for item in items:
product = item.product
prices = list(product.prices.all())
# 获取历史最低价信息
hist_info = historical_lowest_map.get(product.id, {})
historical_lowest = hist_info.get("price")
historical_lowest_date = hist_info.get("date")
# 当前最低价
current_lowest = min((pp.price for pp in prices), default=None)
current_highest = max((pp.price for pp in prices), default=None)
# 判断是否处于历史最低
is_at_historical_lowest = False
if historical_lowest and current_lowest:
is_at_historical_lowest = current_lowest <= historical_lowest
# 计算距最高价降幅百分比
discount_from_highest_percent = None
if current_lowest and current_highest and current_highest > 0:
discount_from_highest_percent = round((1 - current_lowest / current_highest) * 100, 1)
# 推荐逻辑
is_recommended = False
recommendation_reason = None
if is_at_historical_lowest:
is_recommended = True
recommendation_reason = "历史最低价"
elif discount_from_highest_percent and discount_from_highest_percent >= 30:
is_recommended = True
recommendation_reason = f"降幅{discount_from_highest_percent}%"
# 筛选:只看降价商品
if only_discounted:
has_discount = any(pp.original_price and pp.original_price > pp.price for pp in prices)
if not has_discount:
continue
# 筛选:最小降价幅度
if min_discount_percent:
max_discount = 0
for pp in prices:
if pp.original_price and pp.original_price > 0:
discount = (1 - pp.price / pp.original_price) * 100
max_discount = max(max_discount, discount)
if max_discount < min_discount_percent:
continue
# 筛选:只看历史最低
if only_historical_lowest and not is_at_historical_lowest:
continue
price_outs: List[EnhancedProductPriceOut] = []
for pp in prices:
website_name = None
website_logo = None
if pp.website_id:
website_name = pp.website.name
website_logo = pp.website.logo
# 计算单个价格的降价信息
discount_percent = None
discount_amount = None
if pp.original_price and pp.original_price > pp.price:
discount_amount = pp.original_price - pp.price
discount_percent = round((discount_amount / pp.original_price) * 100, 1)
# 判断是否处于该商品的历史最低
price_is_at_lowest = False
if historical_lowest:
price_is_at_lowest = pp.price <= historical_lowest
price_outs.append(EnhancedProductPriceOut(
id=pp.id,
product_id=pp.product_id,
website_id=pp.website_id,
website_name=website_name,
website_logo=website_logo,
price=pp.price,
original_price=pp.original_price,
currency=pp.currency,
url=pp.url,
in_stock=pp.in_stock,
last_checked=pp.last_checked,
historical_lowest=historical_lowest,
is_at_historical_lowest=price_is_at_lowest,
discount_percent=discount_percent,
discount_amount=discount_amount,
))
item_out.append(ComparisonTagItemOut(
product=ProductOut(
id=product.id,
name=product.name,
description=product.description,
image=product.image,
images=list(product.images or []),
category_id=product.category_id,
status=product.status,
submitted_by_id=product.submitted_by_id,
reject_reason=product.reject_reason,
reviewed_at=product.reviewed_at,
created_at=product.created_at,
updated_at=product.updated_at,
),
prices=price_outs,
lowest_price=current_lowest,
highest_price=current_highest,
platform_count=len(prices),
historical_lowest=historical_lowest,
historical_lowest_date=historical_lowest_date,
is_at_historical_lowest=is_at_historical_lowest,
discount_from_highest_percent=discount_from_highest_percent,
is_recommended=is_recommended,
recommendation_reason=recommendation_reason,
))
tag_out = ComparisonTagOut(
id=tag.id,
name=tag.name,
slug=tag.slug,
description=tag.description,
cover_image=tag.cover_image,
icon=tag.icon,
sort_order=tag.sort_order,
is_active=tag.is_active,
product_count=len(item_out),
created_at=tag.created_at,
updated_at=tag.updated_at,
)
return ComparisonTagDetailOut(tag=tag_out, items=item_out)
# ==================== Product Routes ====================
@router.post("/import/", response=ImportResultOut, auth=JWTAuth())
def import_products_csv(request, file: UploadedFile = File(...)):
"""Import products/websites/prices from CSV."""
content = file.read()
try:
text = content.decode("utf-8")
except UnicodeDecodeError:
text = content.decode("utf-8-sig", errors="ignore")
reader = csv.DictReader(io.StringIO(text))
result = ImportResultOut()
def parse_bool(value: Optional[str], default=False) -> bool:
if value is None:
return default
val = str(value).strip().lower()
if val in ("1", "true", "yes", "y", "on"):
return True
if val in ("0", "false", "no", "n", "off"):
return False
return default
def parse_decimal(value: Optional[str], field: str, line_no: int) -> Optional[Decimal]:
if value in (None, ""):
return None
try:
return Decimal(str(value).strip())
except (InvalidOperation, ValueError):
result.errors.append(f"{line_no}行字段{field}格式错误")
return None
for idx, row in enumerate(reader, start=2):
if not row:
continue
product_name = (row.get("product_name") or "").strip()
category_slug = (row.get("category_slug") or "").strip()
category_name = (row.get("category_name") or "").strip()
website_name = (row.get("website_name") or "").strip()
website_url = (row.get("website_url") or "").strip()
price_value = (row.get("price") or "").strip()
product_url = (row.get("url") or "").strip()
if not product_name or not (category_slug or category_name) or not website_name or not website_url or not price_value or not product_url:
result.errors.append(f"{idx}行缺少必填字段")
continue
try:
with transaction.atomic():
category = None
if category_slug:
category, created = Category.objects.get_or_create(
slug=category_slug,
defaults={
"name": category_name or category_slug,
"description": row.get("category_desc") or None,
},
)
if created:
result.created_categories += 1
if not category:
category, created = Category.objects.get_or_create(
name=category_name,
defaults={
"slug": category_slug or category_name,
"description": row.get("category_desc") or None,
},
)
if created:
result.created_categories += 1
website, created = Website.objects.get_or_create(
name=website_name,
defaults={
"url": website_url,
"logo": row.get("website_logo") or None,
"description": row.get("website_desc") or None,
"category": category,
"is_verified": parse_bool(row.get("is_verified"), False),
},
)
if created:
result.created_websites += 1
else:
if website.url != website_url:
website.url = website_url
if row.get("website_logo"):
website.logo = row.get("website_logo")
if row.get("website_desc"):
website.description = row.get("website_desc")
if website.category_id != category.id:
website.category = category
website.save()
product, created = Product.objects.get_or_create(
name=product_name,
category=category,
defaults={
"description": row.get("product_desc") or None,
"image": row.get("product_image") or None,
},
)
if created:
result.created_products += 1
else:
updated = False
if row.get("product_desc") and product.description != row.get("product_desc"):
product.description = row.get("product_desc")
updated = True
if row.get("product_image") and product.image != row.get("product_image"):
product.image = row.get("product_image")
updated = True
if updated:
product.save()
price = parse_decimal(row.get("price"), "price", idx)
if price is None:
continue
original_price = parse_decimal(row.get("original_price"), "original_price", idx)
currency = (row.get("currency") or "CNY").strip() or "CNY"
in_stock = parse_bool(row.get("in_stock"), True)
existing_price = ProductPrice.objects.filter(
product=product,
website=website,
).first()
price_record, created = ProductPrice.objects.update_or_create(
product=product,
website=website,
defaults={
"price": price,
"original_price": original_price,
"currency": currency,
"url": product_url,
"in_stock": in_stock,
},
)
if created:
result.created_prices += 1
else:
result.updated_prices += 1
if created or (existing_price and existing_price.price != price_record.price):
record_product_price_history(price_record)
except Exception:
result.errors.append(f"{idx}行处理失败")
continue
return result
@router.get("/recommendations/", response=List[ProductOut])
def recommend_products(request, limit: int = 12):
"""Get recommended products based on favorites or popularity."""
# 限制 limit 最大值
if limit < 1:
limit = 1
if limit > 100:
limit = 100
user = getattr(request, "auth", None)
# 只显示已审核通过的商品
base_queryset = Product.objects.select_related("category").filter(status='approved')
if user:
favorite_product_ids = list(
Favorite.objects.filter(user=user).values_list("product_id", flat=True)
)
category_ids = list(
Product.objects.filter(id__in=favorite_product_ids, status='approved')
.values_list("category_id", flat=True)
.distinct()
)
if category_ids:
base_queryset = base_queryset.filter(category_id__in=category_ids).exclude(
id__in=favorite_product_ids
)
queryset = (
base_queryset.annotate(favorites_count=Count("favorites", distinct=True))
.order_by("-favorites_count", "-created_at")[:limit]
)
return list(queryset)
@router.get("/", response=List[ProductOut])
@paginate(PageNumberPagination, page_size=20)
def list_products(request, filters: ProductFilter = Query(...)):
"""Get all approved products with optional filters."""
# 只显示已审核通过的商品
queryset = Product.objects.select_related("category").filter(status='approved')
if filters.category_id:
queryset = queryset.filter(category_id=filters.category_id)
if filters.search:
queryset = queryset.filter(
Q(name__icontains=filters.search) |
Q(description__icontains=filters.search)
)
needs_price_stats = (
filters.min_price is not None
or filters.max_price is not None
or (filters.sort_by or "").lower() in ("price_asc", "price_desc")
)
if needs_price_stats:
queryset = queryset.annotate(lowest_price=Min("prices__price"))
if filters.min_price is not None:
queryset = queryset.filter(lowest_price__gte=filters.min_price)
if filters.max_price is not None:
queryset = queryset.filter(lowest_price__lte=filters.max_price)
sort_by = (filters.sort_by or "newest").lower()
if sort_by == "oldest":
queryset = queryset.order_by("created_at")
elif sort_by == "price_asc":
queryset = queryset.order_by(F("lowest_price").asc(nulls_last=True), "-created_at")
elif sort_by == "price_desc":
queryset = queryset.order_by(F("lowest_price").desc(nulls_last=True), "-created_at")
else:
queryset = queryset.order_by("-created_at")
return queryset
@router.get("/{product_id}", response=ProductOut)
def get_product(request, product_id: int):
"""Get product by ID."""
# 只返回已审核通过的商品
return get_object_or_404(Product, id=product_id, status='approved')
@router.get("/{product_id}/with-prices", response=ProductWithPricesOut)
def get_product_with_prices(request, product_id: int):
"""Get product with all prices from different websites."""
try:
product = get_object_or_404(Product, id=product_id)
prices = ProductPrice.objects.filter(product=product).select_related('website')
price_list = []
for pp in prices:
website_name = None
website_logo = None
try:
if pp.website_id:
website_name = pp.website.name
website_logo = pp.website.logo
except ObjectDoesNotExist:
# 数据不一致时避免直接报错
website_name = None
website_logo = None
price_list.append(ProductPriceOut(
id=pp.id,
product_id=pp.product_id,
website_id=pp.website_id,
website_name=website_name,
website_logo=website_logo,
price=pp.price,
original_price=pp.original_price,
currency=pp.currency,
url=pp.url,
in_stock=pp.in_stock,
last_checked=pp.last_checked,
))
# Calculate price range
price_stats = prices.aggregate(
lowest=Min('price'),
highest=Max('price')
)
return ProductWithPricesOut(
id=product.id,
name=product.name,
description=product.description,
image=product.image,
images=list(product.images or []),
category_id=product.category_id,
created_at=product.created_at,
updated_at=product.updated_at,
prices=price_list,
lowest_price=price_stats['lowest'],
highest_price=price_stats['highest'],
)
except Exception as exc:
# 方便定位500具体原因开发环境
raise HttpError(500, f"with-prices failed: {exc}")
@router.get("/search/", response=List[ProductWithPricesOut])
@paginate(PageNumberPagination, page_size=20)
def search_products(request, q: str, filters: ProductSearchFilter = Query(...)):
"""Search approved products by name, description, or by user_id (submitter)."""
from apps.users.models import User
prices_prefetch = Prefetch(
"prices",
queryset=ProductPrice.objects.select_related("website"),
)
# 只搜索已审核通过的商品
products = (
Product.objects.select_related("category")
.filter(Q(name__icontains=q) | Q(description__icontains=q), status='approved')
)
if filters.category_id:
products = products.filter(category_id=filters.category_id)
if filters.user_id:
try:
submitter = User.objects.get(user_id=filters.user_id)
products = products.filter(submitted_by=submitter)
except User.DoesNotExist:
products = products.none()
needs_price_stats = (
filters.min_price is not None
or filters.max_price is not None
or (filters.sort_by or "").lower() in ("price_asc", "price_desc")
)
if needs_price_stats:
products = products.annotate(lowest_price=Min("prices__price"))
if filters.min_price is not None:
products = products.filter(lowest_price__gte=filters.min_price)
if filters.max_price is not None:
products = products.filter(lowest_price__lte=filters.max_price)
sort_by = (filters.sort_by or "newest").lower()
if sort_by == "oldest":
products = products.order_by("created_at")
elif sort_by == "price_asc":
products = products.order_by(F("lowest_price").asc(nulls_last=True), "-created_at")
elif sort_by == "price_desc":
products = products.order_by(F("lowest_price").desc(nulls_last=True), "-created_at")
else:
products = products.order_by("-created_at")
products = products.prefetch_related(prices_prefetch)
result = []
for product in products:
prices = list(product.prices.all())
price_list = []
for pp in prices:
website_name = None
website_logo = None
try:
if pp.website_id:
website_name = pp.website.name
website_logo = pp.website.logo
except ObjectDoesNotExist:
website_name = None
website_logo = None
price_list.append(ProductPriceOut(
id=pp.id,
product_id=pp.product_id,
website_id=pp.website_id,
website_name=website_name,
website_logo=website_logo,
price=pp.price,
original_price=pp.original_price,
currency=pp.currency,
url=pp.url,
in_stock=pp.in_stock,
last_checked=pp.last_checked,
))
lowest_price = min((pp.price for pp in prices), default=None)
highest_price = max((pp.price for pp in prices), default=None)
result.append(ProductWithPricesOut(
id=product.id,
name=product.name,
description=product.description,
image=product.image,
images=list(product.images or []),
category_id=product.category_id,
created_at=product.created_at,
updated_at=product.updated_at,
prices=price_list,
lowest_price=lowest_price,
highest_price=highest_price,
))
return result
@router.post("/", response=ProductOut, auth=JWTAuth())
def create_product(request, data: ProductIn):
"""Create a new product. Admin creates approved, others create pending."""
user = request.auth
is_admin = user and user.role == 'admin' and user.is_active
images = [url.strip() for url in (data.images or []) if url and url.strip()]
image = (data.image or "").strip() or (images[0] if images else None)
if image and image not in images:
images.insert(0, image)
if len(images) > MAX_PRODUCT_IMAGES:
from ninja.errors import HttpError
raise HttpError(400, f"最多上传{MAX_PRODUCT_IMAGES}张图片")
product = Product.objects.create(
name=data.name,
description=data.description,
image=image or None,
images=images,
category_id=data.category_id,
status='approved' if is_admin else 'pending',
submitted_by=user,
)
return product
@router.post("/upload-image/", response=UploadImageOut, auth=JWTAuth())
def upload_product_image(request, file: UploadedFile = File(...)):
"""Upload product image and return URL."""
if not file:
raise HttpError(400, "请上传图片文件")
allowed_types = {"image/jpeg", "image/png", "image/webp"}
if (file.content_type or "").lower() not in allowed_types:
raise HttpError(400, "仅支持 JPG/PNG/WEBP 图片")
max_size = 5 * 1024 * 1024
if file.size and file.size > max_size:
raise HttpError(400, "图片大小不能超过5MB")
ext = os.path.splitext(file.name or "")[1].lower()
if not ext:
ext = mimetypes.guess_extension(file.content_type or "") or ".jpg"
filename = f"products/{uuid.uuid4().hex}{ext}"
saved_path = default_storage.save(filename, file)
url = f"{settings.MEDIA_URL}{saved_path}".replace("\\", "/")
return UploadImageOut(url=url)
@router.get("/my/", response=List[MyProductOut], auth=JWTAuth())
@paginate(PageNumberPagination, page_size=20)
def my_products(request, status: Optional[str] = None):
"""Get current user's submitted products."""
user = request.auth
queryset = Product.objects.filter(submitted_by=user).order_by('-created_at')
if status:
queryset = queryset.filter(status=status)
return queryset
@router.post("/prices/", response=ProductPriceOut, auth=JWTAuth())
def add_product_price(request, data: ProductPriceIn):
"""Add a price for a product. Admin or product owner can add."""
user = request.auth
is_admin = user and user.role == 'admin' and user.is_active
# 检查商品是否存在并验证权限
product = get_object_or_404(Product, id=data.product_id)
if not is_admin and product.submitted_by_id != user.id:
from ninja.errors import HttpError
raise HttpError(403, "只能为自己提交的商品添加价格")
price = ProductPrice.objects.create(**data.dict())
record_product_price_history(price)
website = price.website
return ProductPriceOut(
id=price.id,
product_id=price.product_id,
website_id=price.website_id,
website_name=website.name,
website_logo=website.logo,
price=price.price,
original_price=price.original_price,
currency=price.currency,
url=price.url,
in_stock=price.in_stock,
last_checked=price.last_checked,
)
@router.get("/{product_id}/price-history/", response=List[ProductPriceHistoryOut])
def get_product_price_history(request, product_id: int, website_id: Optional[int] = None, limit: int = 60, days: Optional[int] = None):
"""Get product price history."""
from django.utils import timezone
from datetime import timedelta
queryset = ProductPriceHistory.objects.filter(product_id=product_id)
if website_id:
queryset = queryset.filter(website_id=website_id)
if days:
cutoff = timezone.now() - timedelta(days=days)
queryset = queryset.filter(recorded_at__gte=cutoff)
limit = max(1, min(limit, 500))
histories = list(queryset.order_by("-recorded_at")[:limit])
histories.reverse()
return [
ProductPriceHistoryOut(
id=h.id,
product_id=h.product_id,
website_id=h.website_id,
price=h.price,
recorded_at=h.recorded_at,
)
for h in histories
]
@router.get("/{product_id}/price-stats/", response=PriceHistoryStatsOut)
def get_product_price_stats(request, product_id: int, days: Optional[int] = None):
"""Get product price statistics including historical lowest/highest."""
from django.utils import timezone
from django.db.models import Avg
from datetime import timedelta
from decimal import Decimal
queryset = ProductPriceHistory.objects.filter(product_id=product_id)
if days:
cutoff = timezone.now() - timedelta(days=days)
queryset = queryset.filter(recorded_at__gte=cutoff)
# 历史最低
lowest_record = queryset.order_by("price").first()
historical_lowest = lowest_record.price if lowest_record else None
historical_lowest_date = lowest_record.recorded_at if lowest_record else None
# 历史最高
highest_record = queryset.order_by("-price").first()
historical_highest = highest_record.price if highest_record else None
historical_highest_date = highest_record.recorded_at if highest_record else None
# 平均价
avg_result = queryset.aggregate(avg_price=Avg("price"))
average_price = avg_result["avg_price"]
if average_price:
average_price = round(Decimal(str(average_price)), 2)
# 当前最低价
current_prices = ProductPrice.objects.filter(product_id=product_id)
current_lowest_record = current_prices.order_by("price").first()
current_lowest = current_lowest_record.price if current_lowest_record else None
# 是否处于历史最低
is_historical_lowest = False
if current_lowest and historical_lowest:
is_historical_lowest = current_lowest <= historical_lowest
# 距最高价降幅
discount_from_highest = None
if current_lowest and historical_highest and historical_highest > 0:
discount_from_highest = round((1 - current_lowest / historical_highest) * 100, 1)
# 距历史最低差额
distance_to_lowest = None
if current_lowest and historical_lowest:
distance_to_lowest = current_lowest - historical_lowest
# 价格趋势最近7条记录
recent = list(queryset.order_by("-recorded_at")[:7])
price_trend = "stable"
if len(recent) >= 2:
first_price = recent[-1].price
last_price = recent[0].price
if last_price < first_price * Decimal("0.95"):
price_trend = "down"
elif last_price > first_price * Decimal("1.05"):
price_trend = "up"
# 历史数据最近60条
histories = list(queryset.order_by("-recorded_at")[:60])
histories.reverse()
return PriceHistoryStatsOut(
product_id=product_id,
historical_lowest=historical_lowest,
historical_lowest_date=historical_lowest_date,
historical_highest=historical_highest,
historical_highest_date=historical_highest_date,
average_price=average_price,
current_lowest=current_lowest,
is_historical_lowest=is_historical_lowest,
discount_from_highest=discount_from_highest,
distance_to_lowest=distance_to_lowest,
price_trend=price_trend,
history=[
ProductPriceHistoryOut(
id=h.id,
product_id=h.product_id,
website_id=h.website_id,
price=h.price,
recorded_at=h.recorded_at,
)
for h in histories
],
)