Files
vps_web/app.py
ddrwode 036a19f28c 哈哈
2026-02-10 13:48:58 +08:00

3448 lines
120 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""云服务器价格对比 - Flask 应用"""
import io
from time import monotonic
from datetime import datetime, timezone
from urllib.parse import urlencode
from flask import Flask, render_template, jsonify, request, redirect, url_for, session, send_file
from werkzeug.middleware.proxy_fix import ProxyFix
from sqlalchemy import text, func, or_
from sqlalchemy.orm import joinedload
from markupsafe import Markup, escape
try:
import markdown as py_markdown
except Exception:
py_markdown = None
try:
import bleach
except Exception:
bleach = None
from config import Config
from extensions import db
from openpyxl import Workbook
from openpyxl import load_workbook
app = Flask(__name__)
app.config.from_object(Config)
# 部署在 Nginx 等反向代理后时,信任 X-Forwarded-* 头HTTPS、真实 IP
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)
db.init_app(app)
from models import (
VPSPlan,
Provider,
PriceHistory,
User,
ForumPost,
ForumComment,
ForumCategory,
ForumReport,
ForumNotification,
ForumPostLike,
ForumPostBookmark,
) # noqa: E402
def _ensure_mysql_columns():
"""为已有 MySQL 表添加缺失列,避免 1054 Unknown column。"""
try:
engine = db.engine
if engine.dialect.name != "mysql":
return
with engine.connect() as conn:
for col, spec in [
("traffic", "VARCHAR(64) NULL"),
("countries", "VARCHAR(255) NULL"),
("provider_id", "INT NULL"),
]:
try:
conn.execute(text("ALTER TABLE vps_plans ADD COLUMN {} {}".format(col, spec)))
conn.commit()
except Exception:
conn.rollback()
for col, spec in [
("name", "VARCHAR(128) NULL"),
("region", "VARCHAR(128) NULL"),
("price_cny", "DOUBLE NULL"),
("price_usd", "DOUBLE NULL"),
]:
try:
conn.execute(text("ALTER TABLE vps_plans MODIFY COLUMN {} {}".format(col, spec)))
conn.commit()
except Exception:
conn.rollback()
except Exception:
pass # 表不存在或非 MySQL 时忽略
def _ensure_forum_columns():
"""为已有论坛表补齐后续新增字段。"""
try:
engine = db.engine
dialect = engine.dialect.name
with engine.connect() as conn:
if dialect == "mysql":
alters = [
"ALTER TABLE forum_posts ADD COLUMN category VARCHAR(32) NOT NULL DEFAULT '综合讨论'",
"ALTER TABLE forum_posts ADD COLUMN view_count INT NOT NULL DEFAULT 0",
]
else:
alters = [
"ALTER TABLE forum_posts ADD COLUMN category TEXT DEFAULT '综合讨论'",
"ALTER TABLE forum_posts ADD COLUMN view_count INTEGER DEFAULT 0",
]
for sql in alters:
try:
conn.execute(text(sql))
conn.commit()
except Exception:
conn.rollback()
except Exception:
pass
def _ensure_forum_manage_columns():
"""为用户与论坛帖子补齐管理字段(封禁/置顶/精华/锁帖)。"""
try:
engine = db.engine
dialect = engine.dialect.name
with engine.connect() as conn:
if dialect == "mysql":
alters = [
"ALTER TABLE users ADD COLUMN is_banned TINYINT(1) NOT NULL DEFAULT 0",
"ALTER TABLE users ADD COLUMN banned_at DATETIME NULL",
"ALTER TABLE users ADD COLUMN banned_reason VARCHAR(255) NULL",
"ALTER TABLE forum_posts ADD COLUMN is_pinned TINYINT(1) NOT NULL DEFAULT 0",
"ALTER TABLE forum_posts ADD COLUMN is_featured TINYINT(1) NOT NULL DEFAULT 0",
"ALTER TABLE forum_posts ADD COLUMN is_locked TINYINT(1) NOT NULL DEFAULT 0",
]
else:
alters = [
"ALTER TABLE users ADD COLUMN is_banned INTEGER DEFAULT 0",
"ALTER TABLE users ADD COLUMN banned_at DATETIME",
"ALTER TABLE users ADD COLUMN banned_reason TEXT",
"ALTER TABLE forum_posts ADD COLUMN is_pinned INTEGER DEFAULT 0",
"ALTER TABLE forum_posts ADD COLUMN is_featured INTEGER DEFAULT 0",
"ALTER TABLE forum_posts ADD COLUMN is_locked INTEGER DEFAULT 0",
]
for sql in alters:
try:
conn.execute(text(sql))
conn.commit()
except Exception:
conn.rollback()
except Exception:
pass
DEFAULT_FORUM_CATEGORIES = [
"综合讨论",
"VPS 评测",
"优惠活动",
"运维经验",
"新手提问",
]
def _ensure_forum_categories_seed():
"""初始化论坛默认分类。"""
try:
if ForumCategory.query.count() > 0:
return
for idx, name in enumerate(DEFAULT_FORUM_CATEGORIES, start=1):
db.session.add(ForumCategory(
name=name,
sort_order=idx * 10,
is_active=True,
))
db.session.commit()
except Exception:
db.session.rollback()
def _ensure_price_history_baseline():
"""为历史数据补首条价格快照,便于后续计算涨跌。"""
try:
missing = (
db.session.query(VPSPlan)
.outerjoin(PriceHistory, PriceHistory.plan_id == VPSPlan.id)
.filter(PriceHistory.id.is_(None))
.all()
)
if not missing:
return
for p in missing:
if p.price_cny is None and p.price_usd is None:
continue
db.session.add(PriceHistory(
plan_id=p.id,
price_cny=p.price_cny,
price_usd=p.price_usd,
currency=(p.currency or "CNY"),
source="bootstrap",
))
db.session.commit()
except Exception:
db.session.rollback()
# 启动时自动创建表(若不存在),并为已有表补列
with app.app_context():
db.create_all()
_ensure_mysql_columns()
_ensure_forum_columns()
_ensure_forum_manage_columns()
_ensure_forum_categories_seed()
_ensure_price_history_baseline()
ADMIN_PASSWORD = app.config["ADMIN_PASSWORD"]
SITE_URL = app.config["SITE_URL"]
SITE_NAME = app.config["SITE_NAME"]
# 国家/区域标签,供后台表单选择
COUNTRY_TAGS = [
"中国大陆", "中国香港", "中国台湾", "美国", "日本", "新加坡", "韩国",
"德国", "英国", "法国", "荷兰", "加拿大", "澳大利亚", "印度", "其他",
]
PRICE_SOURCE_LABELS = {
"manual": "手工编辑",
"import": "Excel 导入",
"bootstrap": "基线",
}
FORUM_REPORT_REASONS = [
"垃圾广告",
"辱骂攻击",
"违法违规",
"虚假信息",
"其他",
]
FORUM_REPORT_STATUS_LABELS = {
"pending": "待处理",
"processed": "已处理",
"rejected": "已驳回",
}
FORUM_NOTIFICATION_TYPE_LABELS = {
"post_commented": "帖子新评论",
"thread_replied": "主题新回复",
"report_processed": "举报处理结果",
"content_removed": "内容处理通知",
}
# 论坛高频数据短时缓存(进程内)
_FORUM_CACHE_TTL_CATEGORIES = 20.0
_FORUM_CACHE_TTL_SIDEBAR = 15.0
_FORUM_CATEGORY_CACHE = {}
_FORUM_SIDEBAR_CACHE = {"expires_at": 0.0, "data": None}
_MARKDOWN_ALLOWED_TAGS = [
"p", "br", "hr",
"h1", "h2", "h3", "h4",
"strong", "em", "del",
"ul", "ol", "li",
"blockquote",
"pre", "code",
"a",
"table", "thead", "tbody", "tr", "th", "td",
]
_MARKDOWN_ALLOWED_ATTRS = {
"a": ["href", "title", "target", "rel"],
"code": ["class"],
"pre": ["class"],
}
_MARKDOWN_EXTENSIONS = [
"fenced_code",
"tables",
"sane_lists",
"nl2br",
]
FORUM_NOTIFICATION_TYPE_LABELS_EN = {
"post_commented": "New comment",
"thread_replied": "New reply",
"report_processed": "Report update",
"content_removed": "Content moderation",
}
def _get_lang():
lang = (
request.args.get("lang")
or request.form.get("lang")
or session.get("lang")
or "zh"
)
lang = (lang or "zh").strip().lower()
if lang not in ("zh", "en"):
lang = "zh"
session["lang"] = lang
return lang
def _pick_lang(zh_text, en_text, lang=None):
active_lang = lang or _get_lang()
return en_text if active_lang == "en" else zh_text
def _lang_url(lang_code):
target_lang = (lang_code or "").strip().lower()
if target_lang not in ("zh", "en"):
target_lang = "zh"
params = {}
if request.view_args:
params.update(request.view_args)
params.update(request.args.to_dict(flat=True))
params["lang"] = target_lang
try:
if request.endpoint:
return url_for(request.endpoint, **params)
except Exception:
pass
return "{}?{}".format(request.path, urlencode(params))
def _notification_type_label(notif_type, lang=None):
active_lang = lang or _get_lang()
if active_lang == "en":
return FORUM_NOTIFICATION_TYPE_LABELS_EN.get(notif_type, notif_type or "Notification")
return FORUM_NOTIFICATION_TYPE_LABELS.get(notif_type, notif_type or "通知")
@app.template_global("l")
def _template_pick_lang(zh_text, en_text):
active_lang = session.get("lang", "zh")
if active_lang not in ("zh", "en"):
active_lang = "zh"
return en_text if active_lang == "en" else zh_text
@app.template_global("lang_url")
def _template_lang_url(lang_code):
return _lang_url(lang_code)
def _render_markdown_html(text):
raw = (text or "").strip()
if not raw:
return Markup("")
if py_markdown is None or bleach is None:
# 依赖缺失时回退为安全纯文本显示,避免服务启动失败。
return Markup("<p>{}</p>".format(str(escape(raw)).replace("\n", "<br>")))
html = py_markdown.markdown(raw, extensions=_MARKDOWN_EXTENSIONS)
clean_html = bleach.clean(
html,
tags=_MARKDOWN_ALLOWED_TAGS,
attributes=_MARKDOWN_ALLOWED_ATTRS,
protocols=["http", "https", "mailto"],
strip=True,
)
return Markup(clean_html)
@app.template_filter("markdown_html")
def markdown_html_filter(text):
return _render_markdown_html(text)
def _get_current_user():
user_id = session.get("user_id")
if not user_id:
return None
user = db.session.get(User, user_id)
if not user:
session.pop("user_id", None)
return user
def _is_banned_user(user):
return bool(user and bool(user.is_banned))
def _user_ban_message(user):
if not user:
return "账号状态异常"
reason = (user.banned_reason or "").strip()
if reason:
return "账号已被封禁:{}".format(reason)
return "账号已被封禁"
def _is_valid_username(username):
if not username:
return False
if len(username) < 3 or len(username) > 20:
return False
return all(ch.isalnum() or ch == "_" for ch in username)
def _safe_next_url(default_endpoint):
nxt = (request.values.get("next") or "").strip()
if nxt.startswith("/") and not nxt.startswith("//"):
return nxt
return url_for(default_endpoint)
def _safe_form_next_url(default_url):
nxt = (request.form.get("next") or request.args.get("next") or "").strip()
if nxt.startswith("/") and not nxt.startswith("//"):
return nxt
return default_url
def _create_notification(
user_id,
notif_type,
message,
actor_id=None,
post_id=None,
comment_id=None,
report_id=None,
):
"""创建站内通知(由调用方控制事务提交)。"""
if not user_id or not message:
return
db.session.add(ForumNotification(
user_id=user_id,
actor_id=actor_id,
notif_type=notif_type,
post_id=post_id,
comment_id=comment_id,
report_id=report_id,
message=message[:255],
is_read=False,
))
def _notification_target_url(notification):
# 避免通知列表页按条检查帖子存在性导致 N+1 查询。
if notification.post_id:
return url_for("forum_post_detail", post_id=notification.post_id)
return url_for("user_notifications")
def _load_forum_categories(active_only=True):
"""读取论坛分类(默认只读启用项)。"""
try:
q = ForumCategory.query
if active_only:
q = q.filter_by(is_active=True)
return q.order_by(ForumCategory.sort_order.asc(), ForumCategory.id.asc()).all()
except Exception:
return []
def _get_forum_category_names(active_only=True):
cache_key = "active" if active_only else "all"
now_ts = monotonic()
cached = _FORUM_CATEGORY_CACHE.get(cache_key)
if cached and cached[0] > now_ts:
return list(cached[1])
rows = _load_forum_categories(active_only=active_only)
names = [x.name for x in rows if x.name]
if names:
_FORUM_CATEGORY_CACHE[cache_key] = (now_ts + _FORUM_CACHE_TTL_CATEGORIES, tuple(names))
return names
# 若全部被停用,前台仍回退到已存在分类,避免下拉为空。
if active_only:
rows = _load_forum_categories(active_only=False)
names = [x.name for x in rows if x.name]
if names:
_FORUM_CATEGORY_CACHE[cache_key] = (now_ts + _FORUM_CACHE_TTL_CATEGORIES, tuple(names))
return names
fallback = list(DEFAULT_FORUM_CATEGORIES)
_FORUM_CATEGORY_CACHE[cache_key] = (now_ts + _FORUM_CACHE_TTL_CATEGORIES, tuple(fallback))
return fallback
@app.context_processor
def inject_global_user():
lang = _get_lang()
current_user = _get_current_user()
notifications_unread_count = 0
if current_user:
notifications_unread_count = ForumNotification.query.filter_by(
user_id=current_user.id,
is_read=False,
).count()
return {
"current_user": current_user,
"admin_logged_in": bool(session.get("admin_logged_in")),
"forum_categories": _get_forum_category_names(active_only=True),
"forum_report_reasons": FORUM_REPORT_REASONS,
"notifications_unread_count": notifications_unread_count,
"lang": lang,
}
def _humanize_time(dt, lang=None):
if not dt:
return ""
active_lang = lang or session.get("lang", "zh")
if dt.tzinfo is None:
now = datetime.utcnow()
else:
now = datetime.now(dt.tzinfo)
delta = now - dt
seconds = int(delta.total_seconds())
if seconds < 0:
return dt.strftime("%Y-%m-%d")
if seconds < 60:
return "just now" if active_lang == "en" else "刚刚"
if seconds < 3600:
mins = seconds // 60
return "{}m ago".format(mins) if active_lang == "en" else "{} 分钟前".format(mins)
if seconds < 86400:
hours = seconds // 3600
return "{}h ago".format(hours) if active_lang == "en" else "{} 小时前".format(hours)
if seconds < 86400 * 14:
days = seconds // 86400
return "{}d ago".format(days) if active_lang == "en" else "{} 天前".format(days)
return dt.strftime("%Y-%m-%d")
def _build_forum_post_cards(rows, lang=None):
"""将论坛查询结果行转换为列表卡片数据。"""
active_lang = lang or _get_lang()
cards = []
for post, reply_count, latest_activity, author_name, like_count, bookmark_count in rows:
latest_activity = latest_activity or post.created_at
username = author_name or _pick_lang("用户", "User", active_lang)
cards.append({
"post": post,
"reply_count": int(reply_count or 0),
"view_count": int(post.view_count or 0),
"like_count": int(like_count or 0),
"bookmark_count": int(bookmark_count or 0),
"latest_activity": latest_activity,
"latest_activity_text": _humanize_time(latest_activity, lang=active_lang),
"author_name": username,
"author_initial": (username[0] if username else "?").upper(),
})
return cards
def _build_forum_url(tab="latest", category=None, q=None, page=1, per_page=20):
"""构建论坛列表页链接,并尽量保持 URL 简洁。"""
params = {}
if (tab or "latest") != "latest":
params["tab"] = tab
if category:
params["category"] = category
if q:
params["q"] = q
if page and int(page) > 1:
params["page"] = int(page)
if per_page:
size = int(per_page)
if size != 20:
params["per_page"] = size
return url_for("forum_index", **params)
def _query_forum_post_rows(active_tab="latest", selected_category=None, search_query=None, author_id=None):
"""论坛列表查询:支持最新/新帖/热门 + 分类过滤 + 关键词搜索。"""
comment_stats_subq = (
db.session.query(
ForumComment.post_id.label("post_id"),
func.count(ForumComment.id).label("comment_count"),
func.max(ForumComment.created_at).label("latest_comment_at"),
)
.group_by(ForumComment.post_id)
.subquery()
)
comment_count_expr = func.coalesce(comment_stats_subq.c.comment_count, 0)
latest_activity_expr = func.coalesce(comment_stats_subq.c.latest_comment_at, ForumPost.created_at)
like_stats_subq = (
db.session.query(
ForumPostLike.post_id.label("post_id"),
func.count(ForumPostLike.id).label("like_count"),
)
.group_by(ForumPostLike.post_id)
.subquery()
)
bookmark_stats_subq = (
db.session.query(
ForumPostBookmark.post_id.label("post_id"),
func.count(ForumPostBookmark.id).label("bookmark_count"),
)
.group_by(ForumPostBookmark.post_id)
.subquery()
)
like_count_expr = func.coalesce(like_stats_subq.c.like_count, 0)
bookmark_count_expr = func.coalesce(bookmark_stats_subq.c.bookmark_count, 0)
q = (
db.session.query(
ForumPost,
comment_count_expr.label("comment_count"),
latest_activity_expr.label("latest_activity"),
User.username.label("author_name"),
like_count_expr.label("like_count"),
bookmark_count_expr.label("bookmark_count"),
)
.outerjoin(comment_stats_subq, comment_stats_subq.c.post_id == ForumPost.id)
.outerjoin(like_stats_subq, like_stats_subq.c.post_id == ForumPost.id)
.outerjoin(bookmark_stats_subq, bookmark_stats_subq.c.post_id == ForumPost.id)
.outerjoin(User, User.id == ForumPost.user_id)
)
if selected_category:
q = q.filter(ForumPost.category == selected_category)
if author_id is not None:
q = q.filter(ForumPost.user_id == author_id)
if search_query:
pattern = "%{}%".format(search_query)
q = q.filter(
or_(
ForumPost.title.ilike(pattern),
ForumPost.content.ilike(pattern),
User.username.ilike(pattern),
)
)
if active_tab == "hot":
q = q.order_by(
ForumPost.is_pinned.desc(),
comment_count_expr.desc(),
like_count_expr.desc(),
ForumPost.view_count.desc(),
latest_activity_expr.desc(),
ForumPost.id.desc(),
)
elif active_tab == "new":
q = q.order_by(ForumPost.is_pinned.desc(), ForumPost.created_at.desc(), ForumPost.id.desc())
else:
q = q.order_by(ForumPost.is_pinned.desc(), latest_activity_expr.desc(), ForumPost.id.desc())
return q
def _forum_sidebar_data():
now_ts = monotonic()
cached = _FORUM_SIDEBAR_CACHE.get("data")
if cached is not None and _FORUM_SIDEBAR_CACHE.get("expires_at", 0.0) > now_ts:
return dict(cached)
category_counts = (
db.session.query(ForumPost.category, func.count(ForumPost.id))
.group_by(ForumPost.category)
.order_by(func.count(ForumPost.id).desc())
.all()
)
active_users = (
db.session.query(User.username, func.count(ForumPost.id).label("post_count"))
.outerjoin(ForumPost, ForumPost.user_id == User.id)
.group_by(User.id)
.order_by(func.count(ForumPost.id).desc(), User.created_at.asc())
.limit(6)
.all()
)
data = {
"total_users": User.query.count(),
"total_posts": ForumPost.query.count(),
"total_comments": ForumComment.query.count(),
"category_counts": list(category_counts),
"active_users": list(active_users),
}
_FORUM_SIDEBAR_CACHE["data"] = data
_FORUM_SIDEBAR_CACHE["expires_at"] = now_ts + _FORUM_CACHE_TTL_SIDEBAR
return dict(data)
def _count_forum_posts(selected_category=None, search_query=None, author_id=None):
"""论坛列表总数查询:避免对重查询语句直接 count 导致慢查询。"""
q = (
db.session.query(func.count(ForumPost.id))
.select_from(ForumPost)
.outerjoin(User, User.id == ForumPost.user_id)
)
if selected_category:
q = q.filter(ForumPost.category == selected_category)
if author_id is not None:
q = q.filter(ForumPost.user_id == author_id)
if search_query:
pattern = "%{}%".format(search_query)
q = q.filter(
or_(
ForumPost.title.ilike(pattern),
ForumPost.content.ilike(pattern),
User.username.ilike(pattern),
)
)
return int(q.scalar() or 0)
def _currency_symbol(currency):
return "¥" if (currency or "CNY").upper() == "CNY" else "$"
def _format_money(currency, value):
return "{}{:.2f}".format(_currency_symbol(currency), float(value))
def _format_history_time(dt):
return dt.strftime("%Y-%m-%d %H:%M") if dt else ""
def _pick_price_pair(latest, previous=None):
if previous is None:
if latest.price_cny is not None:
return "CNY", float(latest.price_cny), None
if latest.price_usd is not None:
return "USD", float(latest.price_usd), None
return None, None, None
if latest.price_cny is not None and previous.price_cny is not None:
return "CNY", float(latest.price_cny), float(previous.price_cny)
if latest.price_usd is not None and previous.price_usd is not None:
return "USD", float(latest.price_usd), float(previous.price_usd)
return None, None, None
def _build_price_trend(latest, previous=None):
currency, current_value, previous_value = _pick_price_pair(latest, previous)
if currency is None or current_value is None:
return None
source = PRICE_SOURCE_LABELS.get(latest.source or "", latest.source or "未知来源")
meta = "当前 {} · {} · {}".format(
_format_money(currency, current_value),
_format_history_time(latest.captured_at),
source,
)
if previous_value is None:
return {
"direction": "new",
"delta_text": "首次记录",
"meta_text": meta,
}
diff = current_value - previous_value
if abs(diff) < 1e-9:
return {
"direction": "flat",
"delta_text": "→ 持平",
"meta_text": meta,
}
direction = "up" if diff > 0 else "down"
arrow = "" if diff > 0 else ""
sign = "+" if diff > 0 else "-"
delta_text = "{} {}{}{:,.2f}".format(arrow, sign, _currency_symbol(currency), abs(diff))
if abs(previous_value) > 1e-9:
pct = diff / previous_value * 100
delta_text += " ({:+.2f}%)".format(pct)
return {
"direction": direction,
"delta_text": delta_text,
"meta_text": meta,
}
def _build_plan_trend_map(plans):
plan_ids = [p.id for p in plans if p.id is not None]
if not plan_ids:
return {}
rows = (
PriceHistory.query
.filter(PriceHistory.plan_id.in_(plan_ids))
.order_by(PriceHistory.plan_id.asc(), PriceHistory.captured_at.desc(), PriceHistory.id.desc())
.all()
)
grouped = {}
for row in rows:
bucket = grouped.setdefault(row.plan_id, [])
if len(bucket) < 2:
bucket.append(row)
result = {}
for plan_id, bucket in grouped.items():
latest = bucket[0] if bucket else None
previous = bucket[1] if len(bucket) > 1 else None
trend = _build_price_trend(latest, previous) if latest else None
if trend:
result[plan_id] = trend
return result
def admin_required(f):
from functools import wraps
@wraps(f)
def wrapped(*args, **kwargs):
if not session.get("admin_logged_in"):
return redirect(url_for("admin_login"))
return f(*args, **kwargs)
return wrapped
def user_login_required(f):
from functools import wraps
@wraps(f)
def wrapped(*args, **kwargs):
user = _get_current_user()
if not user:
return redirect(url_for("user_login", next=request.path))
if _is_banned_user(user):
session.pop("user_id", None)
return redirect(url_for("user_login", next=request.path, error=_user_ban_message(user)))
return f(*args, **kwargs)
return wrapped
def _ensure_forum_interaction_user(user, post_id=None):
"""校验当前登录用户是否可进行论坛互动动作。"""
if not _is_banned_user(user):
return None
text = _user_ban_message(user)
if post_id:
return _forum_redirect_with_error(post_id, text)
return redirect(url_for("forum_index", error=text))
def _can_edit_post(user, post):
if not user or not post:
return False
return post.user_id == user.id
def _can_edit_comment(user, comment):
if not user or not comment:
return False
return comment.user_id == user.id
def _forum_redirect_with_error(post_id, text_msg):
return redirect(url_for("forum_post_detail", post_id=post_id, error=text_msg))
def _forum_redirect_with_msg(post_id, text_msg):
return redirect(url_for("forum_post_detail", post_id=post_id, msg=text_msg))
# 首页多语言文案(中文 / English
I18N = {
"zh": {
"tagline": "云服务器价格一目了然",
"filter_provider": "厂商",
"filter_region": "区域",
"filter_memory": "内存 ≥",
"filter_price": "价格区间",
"filter_currency": "货币",
"search_placeholder": "搜索厂商、配置...",
"all": "全部",
"unlimited": "不限",
"btn_reset": "重置筛选",
"th_provider": "厂商",
"th_country": "国家",
"th_config": "配置",
"th_vcpu": "vCPU",
"th_memory": "内存",
"th_storage": "存储",
"th_bandwidth": "带宽",
"th_traffic": "流量",
"th_price": "月付价格",
"th_action": "操作",
"disclaimer": "* 价格仅供参考,以各厂商官网为准。部分为按量/包年折算月价。",
"footer_note": "数据仅供参考 · 请以云厂商官网实时报价为准",
"contact_label": "联系我们",
"empty_state": "未找到匹配的方案",
"load_error": "数据加载失败,请刷新页面重试",
"search_label": "搜索",
"price_under50": "< ¥50",
"price_50_100": "¥50-100",
"price_100_300": "¥100-300",
"price_300_500": "¥300-500",
"price_over500": "> ¥500",
"cny": "人民币 (¥)",
"usd": "美元 ($)",
},
"en": {
"tagline": "VPS & cloud server prices at a glance",
"filter_provider": "Provider",
"filter_region": "Region",
"filter_memory": "Memory ≥",
"filter_price": "Price range",
"filter_currency": "Currency",
"search_placeholder": "Search provider, config...",
"all": "All",
"unlimited": "Any",
"btn_reset": "Reset",
"th_provider": "Provider",
"th_country": "Country",
"th_config": "Config",
"th_vcpu": "vCPU",
"th_memory": "Memory",
"th_storage": "Storage",
"th_bandwidth": "Bandwidth",
"th_traffic": "Traffic",
"th_price": "Monthly",
"th_action": "Action",
"disclaimer": "* Prices are indicative. See provider sites for current rates.",
"footer_note": "Data for reference only. Check provider sites for latest pricing.",
"contact_label": "Contact",
"empty_state": "No matching plans found",
"load_error": "Failed to load data. Please refresh.",
"search_label": "Search",
"price_under50": "< 50",
"price_50_100": "50-100",
"price_100_300": "100-300",
"price_300_500": "300-500",
"price_over500": "> 500",
"cny": "CNY (¥)",
"usd": "USD ($)",
},
}
@app.route("/")
def index():
lang = _get_lang()
t = I18N[lang]
plans = VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny).all()
return render_template(
"index.html",
site_url=SITE_URL,
site_name=SITE_NAME,
plans_json_ld=[p.to_dict() for p in plans],
lang=lang,
t=t,
)
@app.route("/api/plans")
def api_plans():
plans = VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny).all()
return jsonify([p.to_dict() for p in plans])
# ---------- 前台用户与论坛 ----------
@app.route("/register", methods=["GET", "POST"])
def user_register():
lang = _get_lang()
current = _get_current_user()
if current:
if _is_banned_user(current):
session.pop("user_id", None)
else:
return redirect(url_for("forum_index"))
error = None
if request.method == "POST":
username = (request.form.get("username") or "").strip()
password = request.form.get("password") or ""
confirm_password = request.form.get("confirm_password") or ""
if not _is_valid_username(username):
error = _pick_lang(
"用户名需为 3-20 位,仅支持字母、数字、下划线",
"Username must be 3-20 chars (letters, numbers, underscore).",
lang,
)
elif len(password) < 6:
error = _pick_lang("密码至少 6 位", "Password must be at least 6 characters.", lang)
elif password != confirm_password:
error = _pick_lang("两次输入的密码不一致", "Passwords do not match.", lang)
elif User.query.filter(func.lower(User.username) == username.lower()).first():
error = _pick_lang("用户名已存在", "Username already exists.", lang)
else:
user = User(username=username)
user.set_password(password)
user.last_login_at = datetime.now(timezone.utc)
db.session.add(user)
db.session.commit()
session["user_id"] = user.id
return redirect(_safe_next_url("forum_index"))
return render_template("auth/register.html", error=error)
@app.route("/login", methods=["GET", "POST"])
def user_login():
lang = _get_lang()
current = _get_current_user()
if current:
if _is_banned_user(current):
session.pop("user_id", None)
else:
return redirect(url_for("forum_index"))
error = (request.args.get("error") or "").strip() or None
if request.method == "POST":
username = (request.form.get("username") or "").strip()
password = request.form.get("password") or ""
user = User.query.filter(func.lower(User.username) == username.lower()).first()
if not user or not user.check_password(password):
error = _pick_lang("用户名或密码错误", "Invalid username or password.", lang)
elif _is_banned_user(user):
error = _user_ban_message(user)
else:
user.last_login_at = datetime.now(timezone.utc)
db.session.commit()
session["user_id"] = user.id
return redirect(_safe_next_url("forum_index"))
return render_template("auth/login.html", error=error)
@app.route("/logout")
def user_logout():
session.pop("user_id", None)
return redirect(url_for("forum_index"))
@app.route("/profile")
def user_profile_redirect():
return redirect(url_for("user_profile"))
@app.route("/me", methods=["GET", "POST"])
@user_login_required
def user_profile():
lang = _get_lang()
user = _get_current_user()
tab = (request.args.get("tab") or "posts").strip().lower()
if tab not in {"posts", "comments", "likes", "bookmarks", "settings"}:
tab = "posts"
if request.method == "POST":
action = (request.form.get("action") or "").strip().lower()
if action == "profile":
username = (request.form.get("username") or "").strip()
if username == user.username:
return redirect(url_for("user_profile", tab="settings", msg=_pick_lang("资料未变更", "No changes detected.", lang)))
if not _is_valid_username(username):
return redirect(url_for(
"user_profile",
tab="settings",
error=_pick_lang(
"用户名需为 3-20 位,仅支持字母、数字、下划线",
"Username must be 3-20 chars (letters, numbers, underscore).",
lang,
),
))
exists = (
User.query
.filter(func.lower(User.username) == username.lower(), User.id != user.id)
.first()
)
if exists:
return redirect(url_for("user_profile", tab="settings", error=_pick_lang("用户名已存在", "Username already exists.", lang)))
user.username = username
db.session.commit()
return redirect(url_for("user_profile", tab="settings", msg=_pick_lang("用户名已更新", "Username updated.", lang)))
if action == "password":
current_password = request.form.get("current_password") or ""
new_password = request.form.get("new_password") or ""
confirm_password = request.form.get("confirm_password") or ""
if not user.check_password(current_password):
return redirect(url_for("user_profile", tab="settings", error=_pick_lang("当前密码错误", "Current password is incorrect.", lang)))
if len(new_password) < 6:
return redirect(url_for("user_profile", tab="settings", error=_pick_lang("新密码至少 6 位", "New password must be at least 6 characters.", lang)))
if new_password != confirm_password:
return redirect(url_for("user_profile", tab="settings", error=_pick_lang("两次新密码输入不一致", "New passwords do not match.", lang)))
user.set_password(new_password)
db.session.commit()
return redirect(url_for("user_profile", tab="settings", msg=_pick_lang("密码已更新", "Password updated.", lang)))
return redirect(url_for("user_profile", tab="settings", error=_pick_lang("未知操作", "Unknown action.", lang)))
my_post_rows = (
_query_forum_post_rows(active_tab="latest", author_id=user.id)
.limit(60)
.all()
)
my_post_cards = _build_forum_post_cards(my_post_rows, lang=lang)
my_comment_rows = (
db.session.query(
ForumComment,
ForumPost.id.label("post_id"),
ForumPost.title.label("post_title"),
)
.join(ForumPost, ForumComment.post_id == ForumPost.id)
.filter(ForumComment.user_id == user.id)
.order_by(ForumComment.created_at.desc(), ForumComment.id.desc())
.limit(120)
.all()
)
my_comment_items = [
{
"comment": c,
"post_id": post_id,
"post_title": post_title,
}
for c, post_id, post_title in my_comment_rows
]
my_like_rows = (
db.session.query(
ForumPostLike,
ForumPost.id.label("post_id"),
ForumPost.title.label("post_title"),
ForumPost.category.label("post_category"),
ForumPost.created_at.label("post_created_at"),
)
.join(ForumPost, ForumPostLike.post_id == ForumPost.id)
.filter(ForumPostLike.user_id == user.id)
.order_by(ForumPostLike.created_at.desc(), ForumPostLike.id.desc())
.limit(120)
.all()
)
my_like_items = [
{
"like": like_row,
"post_id": post_id,
"post_title": post_title,
"post_category": post_category,
"post_created_at": post_created_at,
}
for like_row, post_id, post_title, post_category, post_created_at in my_like_rows
]
my_bookmark_rows = (
db.session.query(
ForumPostBookmark,
ForumPost.id.label("post_id"),
ForumPost.title.label("post_title"),
ForumPost.category.label("post_category"),
ForumPost.created_at.label("post_created_at"),
)
.join(ForumPost, ForumPostBookmark.post_id == ForumPost.id)
.filter(ForumPostBookmark.user_id == user.id)
.order_by(ForumPostBookmark.created_at.desc(), ForumPostBookmark.id.desc())
.limit(120)
.all()
)
my_bookmark_items = [
{
"bookmark": bookmark_row,
"post_id": post_id,
"post_title": post_title,
"post_category": post_category,
"post_created_at": post_created_at,
}
for bookmark_row, post_id, post_title, post_category, post_created_at in my_bookmark_rows
]
stats = {
"post_count": ForumPost.query.filter_by(user_id=user.id).count(),
"comment_count": ForumComment.query.filter_by(user_id=user.id).count(),
"like_count": ForumPostLike.query.filter_by(user_id=user.id).count(),
"bookmark_count": ForumPostBookmark.query.filter_by(user_id=user.id).count(),
"report_count": ForumReport.query.filter_by(reporter_id=user.id).count(),
"pending_report_count": ForumReport.query.filter_by(reporter_id=user.id, status="pending").count(),
"notification_count": ForumNotification.query.filter_by(user_id=user.id).count(),
"unread_notification_count": ForumNotification.query.filter_by(user_id=user.id, is_read=False).count(),
}
return render_template(
"forum/profile.html",
profile_user=user,
active_tab=tab,
my_post_cards=my_post_cards,
my_comment_items=my_comment_items,
my_like_items=my_like_items,
my_bookmark_items=my_bookmark_items,
stats=stats,
message=request.args.get("msg") or "",
error=request.args.get("error") or "",
)
@app.route("/notifications")
@user_login_required
def user_notifications():
lang = _get_lang()
user = _get_current_user()
status = (request.args.get("status") or "all").strip().lower()
if status not in {"all", "unread", "read"}:
status = "all"
q = (
ForumNotification.query
.filter_by(user_id=user.id)
.options(joinedload(ForumNotification.actor_rel))
)
if status == "unread":
q = q.filter_by(is_read=False)
elif status == "read":
q = q.filter_by(is_read=True)
rows = q.order_by(ForumNotification.created_at.desc(), ForumNotification.id.desc()).limit(300).all()
items = []
for n in rows:
items.append({
"notification": n,
"type_label": _notification_type_label(n.notif_type, lang=lang),
"actor_name": n.actor_rel.username if n.actor_rel else "",
"target_url": _notification_target_url(n),
"time_text": _humanize_time(n.created_at, lang=lang),
})
status_rows = (
db.session.query(ForumNotification.is_read, func.count(ForumNotification.id))
.filter_by(user_id=user.id)
.group_by(ForumNotification.is_read)
.all()
)
read_count = 0
unread_count = 0
for is_read, count_val in status_rows:
if bool(is_read):
read_count = int(count_val or 0)
else:
unread_count = int(count_val or 0)
return render_template(
"forum/notifications.html",
active_status=status,
notification_items=items,
unread_count=unread_count,
read_count=read_count,
total_count=unread_count + read_count,
message=request.args.get("msg") or "",
error=request.args.get("error") or "",
)
@app.route("/notification/<int:notification_id>/go")
@user_login_required
def user_notification_go(notification_id):
lang = _get_lang()
user = _get_current_user()
n = ForumNotification.query.get_or_404(notification_id)
if n.user_id != user.id:
return redirect(url_for("user_notifications", error=_pick_lang("无权访问该通知", "Permission denied for this notification.", lang)))
if not n.is_read:
n.is_read = True
db.session.commit()
return redirect(_notification_target_url(n))
@app.route("/notification/<int:notification_id>/read", methods=["POST"])
@user_login_required
def user_notification_read(notification_id):
lang = _get_lang()
user = _get_current_user()
n = ForumNotification.query.get_or_404(notification_id)
if n.user_id != user.id:
return redirect(url_for("user_notifications", error=_pick_lang("无权操作该通知", "Permission denied for this notification.", lang)))
if not n.is_read:
n.is_read = True
db.session.commit()
next_url = (request.form.get("next") or "").strip()
if next_url.startswith("/") and not next_url.startswith("//"):
return redirect(next_url)
return redirect(url_for("user_notifications", msg=_pick_lang("已标记为已读", "Marked as read.", lang)))
@app.route("/notifications/read-all", methods=["POST"])
@user_login_required
def user_notifications_read_all():
lang = _get_lang()
user = _get_current_user()
unread = ForumNotification.query.filter_by(user_id=user.id, is_read=False)
updated = unread.update({"is_read": True}, synchronize_session=False)
db.session.commit()
msg = _pick_lang("已全部标记为已读", "All notifications marked as read.", lang) if updated else _pick_lang("没有未读通知", "No unread notifications.", lang)
return redirect(url_for("user_notifications", msg=msg))
@app.route("/forum")
def forum_index():
lang = _get_lang()
per_page_options = [10, 20, 30, 50]
active_tab = (request.args.get("tab") or "latest").strip().lower()
if active_tab not in {"latest", "new", "hot"}:
active_tab = "latest"
selected_category = (request.args.get("category") or "").strip() or None
if selected_category and len(selected_category) > 32:
selected_category = selected_category[:32]
search_query = (request.args.get("q") or "").strip()
if len(search_query) > 80:
search_query = search_query[:80]
page = request.args.get("page", type=int) or 1
if page < 1:
page = 1
per_page = request.args.get("per_page", type=int) or 20
if per_page not in per_page_options:
per_page = 20
rows_query = _query_forum_post_rows(
active_tab=active_tab,
selected_category=selected_category,
search_query=search_query or None,
)
total_posts = _count_forum_posts(
selected_category=selected_category,
search_query=search_query or None,
)
total_pages = max((total_posts + per_page - 1) // per_page, 1)
if page > total_pages:
page = total_pages
rows = rows_query.offset((page - 1) * per_page).limit(per_page).all()
post_cards = _build_forum_post_cards(rows, lang=lang)
sidebar = _forum_sidebar_data()
category_count_map = {name: int(count or 0) for name, count in (sidebar.get("category_counts") or [])}
category_names = list(_get_forum_category_names(active_only=True))
for name in category_count_map.keys():
if name and name not in category_names:
category_names.append(name)
if selected_category and selected_category not in category_names:
category_names.insert(0, selected_category)
tab_defs = [
("latest", _pick_lang("最新", "Latest", lang)),
("new", _pick_lang("新帖", "New", lang)),
("hot", _pick_lang("热门", "Top", lang)),
]
tab_links = [
{
"key": key,
"label": label,
"url": _build_forum_url(
tab=key,
category=selected_category,
q=search_query or None,
page=1,
per_page=per_page,
),
"active": active_tab == key,
}
for key, label in tab_defs
]
category_links = [
{
"name": _pick_lang("全部", "All", lang),
"count": None,
"url": _build_forum_url(
tab=active_tab,
category=None,
q=search_query or None,
page=1,
per_page=per_page,
),
"active": selected_category is None,
}
]
for name in category_names:
category_links.append({
"name": name,
"count": category_count_map.get(name, 0),
"url": _build_forum_url(
tab=active_tab,
category=name,
q=search_query or None,
page=1,
per_page=per_page,
),
"active": selected_category == name,
})
category_nav_url = _build_forum_url(
tab=active_tab,
category=selected_category or (category_names[0] if category_names else None),
q=search_query or None,
page=1,
per_page=per_page,
)
window_start = max(1, page - 2)
window_end = min(total_pages, page + 2)
page_links = [
{
"num": num,
"url": _build_forum_url(
tab=active_tab,
category=selected_category,
q=search_query or None,
page=num,
per_page=per_page,
),
"active": num == page,
}
for num in range(window_start, window_end + 1)
]
has_filters = bool(selected_category or search_query or active_tab != "latest")
if search_query and selected_category:
empty_hint = _pick_lang("当前分类下没有匹配关键词的帖子。", "No posts match your keywords in this category.", lang)
elif search_query:
empty_hint = _pick_lang("没有匹配关键词的帖子。", "No posts match your keywords.", lang)
elif selected_category:
empty_hint = _pick_lang("该分类暂时没有帖子。", "No posts in this category yet.", lang)
else:
empty_hint = _pick_lang("当前没有帖子,点击右上角按钮发布第一条内容。", "No posts yet. Create the first topic from the top-right button.", lang)
result_start = ((page - 1) * per_page + 1) if total_posts else 0
result_end = min(page * per_page, total_posts) if total_posts else 0
return render_template(
"forum/index.html",
post_cards=post_cards,
sidebar=sidebar,
active_tab=active_tab,
selected_category=selected_category,
search_query=search_query,
tab_links=tab_links,
category_links=category_links,
category_nav_url=category_nav_url,
total_posts=total_posts,
total_pages=total_pages,
current_page=page,
page_links=page_links,
has_prev=(page > 1),
has_next=(page < total_pages),
prev_page_url=_build_forum_url(
tab=active_tab,
category=selected_category,
q=search_query or None,
page=page - 1,
per_page=per_page,
),
next_page_url=_build_forum_url(
tab=active_tab,
category=selected_category,
q=search_query or None,
page=page + 1,
per_page=per_page,
),
clear_search_url=_build_forum_url(
tab=active_tab,
category=selected_category,
q=None,
page=1,
per_page=per_page,
),
clear_all_url=_build_forum_url(tab="latest", category=None, q=None, page=1, per_page=per_page),
has_filters=has_filters,
empty_hint=empty_hint,
result_start=result_start,
result_end=result_end,
per_page=per_page,
per_page_options=per_page_options,
message=request.args.get("msg") or "",
error=request.args.get("error") or "",
)
@app.route("/forum/post/new", methods=["GET", "POST"])
@user_login_required
def forum_post_new():
lang = _get_lang()
user = _get_current_user()
blocked_resp = _ensure_forum_interaction_user(user)
if blocked_resp:
return blocked_resp
error = None
title = ""
content = ""
available_categories = _get_forum_category_names(active_only=True)
category = available_categories[0] if available_categories else "综合讨论"
if request.method == "POST":
title = (request.form.get("title") or "").strip()
content = (request.form.get("content") or "").strip()
category = (request.form.get("category") or "").strip() or category
if category not in available_categories:
category = available_categories[0] if available_categories else "综合讨论"
if len(title) < 5:
error = _pick_lang("标题至少 5 个字符", "Title must be at least 5 characters.", lang)
elif len(title) > 160:
error = _pick_lang("标题不能超过 160 个字符", "Title must be 160 characters or fewer.", lang)
elif len(content) < 10:
error = _pick_lang("内容至少 10 个字符", "Content must be at least 10 characters.", lang)
else:
post = ForumPost(
user_id=user.id,
category=category,
title=title,
content=content,
)
db.session.add(post)
db.session.commit()
return redirect(url_for("forum_post_detail", post_id=post.id))
return render_template(
"forum/post_form.html",
error=error,
title_val=title,
content_val=content,
category_val=category,
categories=available_categories,
page_title=_pick_lang("创建新主题", "Create Topic", lang),
submit_text=_pick_lang("发布主题", "Publish", lang),
action_url=url_for("forum_post_new"),
cancel_url=url_for("forum_index"),
form_mode="create",
)
@app.route("/forum/post/<int:post_id>/edit", methods=["GET", "POST"])
@user_login_required
def forum_post_edit(post_id):
lang = _get_lang()
post = ForumPost.query.get_or_404(post_id)
user = _get_current_user()
blocked_resp = _ensure_forum_interaction_user(user, post_id=post.id)
if blocked_resp:
return blocked_resp
if not _can_edit_post(user, post):
return _forum_redirect_with_error(post.id, "你没有权限编辑该帖子")
error = None
title = post.title or ""
content = post.content or ""
available_categories = _get_forum_category_names(active_only=True)
if post.category and post.category not in available_categories:
available_categories.insert(0, post.category)
category = post.category or (available_categories[0] if available_categories else "综合讨论")
if request.method == "POST":
title = (request.form.get("title") or "").strip()
content = (request.form.get("content") or "").strip()
category = (request.form.get("category") or "").strip() or category
if category not in available_categories:
category = available_categories[0] if available_categories else "综合讨论"
if len(title) < 5:
error = _pick_lang("标题至少 5 个字符", "Title must be at least 5 characters.", lang)
elif len(title) > 160:
error = _pick_lang("标题不能超过 160 个字符", "Title must be 160 characters or fewer.", lang)
elif len(content) < 10:
error = _pick_lang("内容至少 10 个字符", "Content must be at least 10 characters.", lang)
else:
post.title = title
post.content = content
post.category = category
db.session.commit()
return _forum_redirect_with_msg(post.id, "帖子已更新")
return render_template(
"forum/post_form.html",
error=error,
title_val=title,
content_val=content,
category_val=category,
categories=available_categories,
page_title=_pick_lang("编辑主题", "Edit Topic", lang),
submit_text=_pick_lang("保存修改", "Save Changes", lang),
action_url=url_for("forum_post_edit", post_id=post.id),
cancel_url=url_for("forum_post_detail", post_id=post.id),
form_mode="edit",
)
@app.route("/forum/post/<int:post_id>/delete", methods=["POST"])
@user_login_required
def forum_post_delete(post_id):
post = ForumPost.query.get_or_404(post_id)
user = _get_current_user()
blocked_resp = _ensure_forum_interaction_user(user, post_id=post.id)
if blocked_resp:
return blocked_resp
if not _can_edit_post(user, post):
return _forum_redirect_with_error(post.id, "你没有权限删除该帖子")
db.session.delete(post)
db.session.commit()
return redirect(url_for("forum_index"))
@app.route("/forum/post/<int:post_id>")
def forum_post_detail(post_id):
post = ForumPost.query.get_or_404(post_id)
current_user = _get_current_user()
viewed_posts = session.get("viewed_posts") or []
if post.id not in viewed_posts:
post.view_count = int(post.view_count or 0) + 1
viewed_posts.append(post.id)
session["viewed_posts"] = viewed_posts[-200:]
db.session.commit()
comments = (
ForumComment.query
.options(joinedload(ForumComment.author_rel))
.filter_by(post_id=post.id)
.order_by(ForumComment.created_at.asc(), ForumComment.id.asc())
.all()
)
like_count = ForumPostLike.query.filter_by(post_id=post.id).count()
bookmark_count = ForumPostBookmark.query.filter_by(post_id=post.id).count()
liked_by_me = False
bookmarked_by_me = False
can_interact = bool(current_user and not _is_banned_user(current_user))
if current_user:
liked_by_me = (
ForumPostLike.query
.filter_by(post_id=post.id, user_id=current_user.id)
.first() is not None
)
bookmarked_by_me = (
ForumPostBookmark.query
.filter_by(post_id=post.id, user_id=current_user.id)
.first() is not None
)
sidebar = _forum_sidebar_data()
return render_template(
"forum/post_detail.html",
post=post,
comments=comments,
like_count=like_count,
bookmark_count=bookmark_count,
liked_by_me=liked_by_me,
bookmarked_by_me=bookmarked_by_me,
can_interact=can_interact,
sidebar=sidebar,
message=request.args.get("msg") or "",
error=request.args.get("error") or "",
)
@app.route("/forum/post/<int:post_id>/like", methods=["POST"])
@user_login_required
def forum_post_like_toggle(post_id):
post = ForumPost.query.get_or_404(post_id)
user = _get_current_user()
blocked_resp = _ensure_forum_interaction_user(user, post_id=post.id)
if blocked_resp:
return blocked_resp
exists = ForumPostLike.query.filter_by(post_id=post.id, user_id=user.id).first()
if exists:
db.session.delete(exists)
db.session.commit()
return redirect(_safe_form_next_url(url_for("forum_post_detail", post_id=post.id, msg="已取消点赞")))
db.session.add(ForumPostLike(post_id=post.id, user_id=user.id))
db.session.commit()
return redirect(_safe_form_next_url(url_for("forum_post_detail", post_id=post.id, msg="已点赞该帖子")))
@app.route("/forum/post/<int:post_id>/bookmark", methods=["POST"])
@user_login_required
def forum_post_bookmark_toggle(post_id):
post = ForumPost.query.get_or_404(post_id)
user = _get_current_user()
blocked_resp = _ensure_forum_interaction_user(user, post_id=post.id)
if blocked_resp:
return blocked_resp
exists = ForumPostBookmark.query.filter_by(post_id=post.id, user_id=user.id).first()
if exists:
db.session.delete(exists)
db.session.commit()
return redirect(_safe_form_next_url(url_for("forum_post_detail", post_id=post.id, msg="已取消收藏")))
db.session.add(ForumPostBookmark(post_id=post.id, user_id=user.id))
db.session.commit()
return redirect(_safe_form_next_url(url_for("forum_post_detail", post_id=post.id, msg="已收藏该帖子")))
@app.route("/forum/post/<int:post_id>/comment", methods=["POST"])
@user_login_required
def forum_post_comment(post_id):
post = ForumPost.query.get_or_404(post_id)
user = _get_current_user()
blocked_resp = _ensure_forum_interaction_user(user, post_id=post.id)
if blocked_resp:
return blocked_resp
if post.is_locked:
return _forum_redirect_with_error(post.id, "该帖子已锁定,暂不允许新增评论")
content = (request.form.get("content") or "").strip()
if len(content) < 2:
return redirect(url_for("forum_post_detail", post_id=post.id, error="评论至少 2 个字符"))
comment = ForumComment(
post_id=post.id,
user_id=user.id,
content=content,
)
db.session.add(comment)
db.session.flush()
actor_name = user.username or "用户"
post_title = post.title or "主题"
if post.user_id and post.user_id != user.id:
_create_notification(
user_id=post.user_id,
notif_type="post_commented",
message="{} 评论了你的帖子《{}".format(actor_name, post_title),
actor_id=user.id,
post_id=post.id,
comment_id=comment.id,
)
participant_rows = (
db.session.query(ForumComment.user_id)
.filter(
ForumComment.post_id == post.id,
ForumComment.user_id.isnot(None),
ForumComment.user_id != user.id,
)
.distinct()
.limit(50)
.all()
)
for (uid,) in participant_rows:
if not uid:
continue
if uid == post.user_id or uid == user.id:
continue
_create_notification(
user_id=uid,
notif_type="thread_replied",
message="{} 在你参与的主题《{}》有新回复".format(actor_name, post_title),
actor_id=user.id,
post_id=post.id,
comment_id=comment.id,
)
db.session.commit()
return redirect(url_for("forum_post_detail", post_id=post.id, msg="评论发布成功"))
@app.route("/forum/comment/<int:comment_id>/edit", methods=["GET", "POST"])
@user_login_required
def forum_comment_edit(comment_id):
comment = ForumComment.query.get_or_404(comment_id)
user = _get_current_user()
blocked_resp = _ensure_forum_interaction_user(user, post_id=comment.post_id)
if blocked_resp:
return blocked_resp
if not _can_edit_comment(user, comment):
return _forum_redirect_with_error(comment.post_id, "你没有权限编辑该评论")
error = None
content = comment.content or ""
if request.method == "POST":
content = (request.form.get("content") or "").strip()
if len(content) < 2:
error = "评论至少 2 个字符"
else:
comment.content = content
db.session.commit()
return _forum_redirect_with_msg(comment.post_id, "评论已更新")
return render_template(
"forum/comment_form.html",
error=error,
comment=comment,
content_val=content,
action_url=url_for("forum_comment_edit", comment_id=comment.id),
cancel_url=url_for("forum_post_detail", post_id=comment.post_id),
)
@app.route("/forum/comment/<int:comment_id>/delete", methods=["POST"])
@user_login_required
def forum_comment_delete(comment_id):
comment = ForumComment.query.get_or_404(comment_id)
user = _get_current_user()
blocked_resp = _ensure_forum_interaction_user(user, post_id=comment.post_id)
if blocked_resp:
return blocked_resp
if not _can_edit_comment(user, comment):
return _forum_redirect_with_error(comment.post_id, "你没有权限删除该评论")
post_id = comment.post_id
db.session.delete(comment)
db.session.commit()
return _forum_redirect_with_msg(post_id, "评论已删除")
@app.route("/forum/report", methods=["POST"])
@user_login_required
def forum_report_create():
user = _get_current_user()
blocked_resp = _ensure_forum_interaction_user(user)
if blocked_resp:
return blocked_resp
target_type = (request.form.get("target_type") or "").strip().lower()
target_id = request.form.get("target_id", type=int) or 0
reason = (request.form.get("reason") or "其他").strip()
detail = (request.form.get("detail") or "").strip()
if len(detail) > 500:
detail = detail[:500]
if reason not in FORUM_REPORT_REASONS:
reason = "其他"
report_post_id = None
target_owner_id = None
snapshot_title = None
snapshot_content = None
if target_type == "post":
target_post = db.session.get(ForumPost, target_id)
if target_post is None:
return redirect(url_for("forum_index"))
report_post_id = target_post.id
target_owner_id = target_post.user_id
snapshot_title = target_post.title
snapshot_content = target_post.content
elif target_type == "comment":
target_comment = db.session.get(ForumComment, target_id)
if target_comment is None:
return redirect(url_for("forum_index"))
report_post_id = target_comment.post_id
target_owner_id = target_comment.user_id
snapshot_title = target_comment.post_rel.title if target_comment.post_rel else None
snapshot_content = target_comment.content
else:
return redirect(url_for("forum_index"))
if target_owner_id == user.id:
return _forum_redirect_with_error(report_post_id, "不能举报自己的内容")
exists = ForumReport.query.filter_by(
reporter_id=user.id,
target_type=target_type,
target_id=target_id,
status="pending",
).first()
if exists:
return _forum_redirect_with_msg(report_post_id, "你已举报该内容,请等待处理")
db.session.add(ForumReport(
reporter_id=user.id,
target_type=target_type,
target_id=target_id,
reason=reason,
detail=detail or None,
snapshot_title=snapshot_title,
snapshot_content=snapshot_content,
status="pending",
))
db.session.commit()
return _forum_redirect_with_msg(report_post_id, "举报已提交,感谢反馈")
# ---------- SEO ----------
@app.route("/sitemap.xml")
def sitemap():
from flask import make_response
url = SITE_URL.rstrip("/")
xml = f'''<?xml version="1.0" encoding="UTF-8"?>
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
<url>
<loc>{url}/</loc>
<changefreq>weekly</changefreq>
<priority>1.0</priority>
</url>
<url>
<loc>{url}/forum</loc>
<changefreq>daily</changefreq>
<priority>0.9</priority>
</url>
</urlset>'''
resp = make_response(xml)
resp.mimetype = "application/xml"
return resp
@app.route("/robots.txt")
def robots():
from flask import make_response
url = SITE_URL.rstrip("/")
txt = f"""User-agent: *
Allow: /
Sitemap: {url}/sitemap.xml
"""
resp = make_response(txt)
resp.mimetype = "text/plain"
return resp
@app.route("/favicon.ico")
def favicon():
return redirect(url_for("static", filename="img/site-logo-mark.svg"))
# ---------- 后台 ----------
@app.route("/admin/login", methods=["GET", "POST"])
def admin_login():
if request.method == "POST":
password = request.form.get("password", "")
if password == ADMIN_PASSWORD:
session["admin_logged_in"] = True
return redirect(url_for("admin_dashboard"))
return render_template("admin/login.html", error="密码错误")
return render_template("admin/login.html")
@app.route("/admin/logout")
def admin_logout():
session.pop("admin_logged_in", None)
return redirect(url_for("index"))
@app.route("/admin/api/plan/<int:plan_id>")
@admin_required
def admin_api_plan(plan_id):
plan = VPSPlan.query.get_or_404(plan_id)
return jsonify({
"id": plan.id,
"provider_id": plan.provider_id,
"countries": plan.countries or "",
"vcpu": plan.vcpu,
"memory_gb": plan.memory_gb,
"storage_gb": plan.storage_gb,
"bandwidth_mbps": plan.bandwidth_mbps,
"traffic": plan.traffic or "",
"price_cny": float(plan.price_cny) if plan.price_cny is not None else None,
"price_usd": float(plan.price_usd) if plan.price_usd is not None else None,
"currency": plan.currency or "CNY",
"official_url": plan.official_url or "",
})
@app.route("/admin/api/plan/<int:plan_id>/price-history")
@admin_required
def admin_api_plan_price_history(plan_id):
plan = VPSPlan.query.get_or_404(plan_id)
rows = (
PriceHistory.query
.filter_by(plan_id=plan.id)
.order_by(PriceHistory.captured_at.desc(), PriceHistory.id.desc())
.limit(30)
.all()
)
return jsonify([
{
"id": r.id,
"price_cny": float(r.price_cny) if r.price_cny is not None else None,
"price_usd": float(r.price_usd) if r.price_usd is not None else None,
"currency": r.currency or "CNY",
"source": r.source or "",
"captured_at": r.captured_at.isoformat() if r.captured_at else "",
}
for r in rows
])
@app.route("/admin")
@admin_required
def admin_dashboard():
providers = Provider.query.order_by(Provider.name).all()
plans = VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny).all()
plan_trends = _build_plan_trend_map(plans)
return render_template(
"admin/dashboard.html",
providers=providers,
plans=plans,
plan_trends=plan_trends,
country_tags=COUNTRY_TAGS,
)
# ---------- 厂商管理 ----------
@app.route("/admin/providers")
@admin_required
def admin_providers():
providers = Provider.query.order_by(Provider.name).all()
return render_template("admin/providers.html", providers=providers)
@app.route("/admin/provider/new", methods=["GET", "POST"])
@admin_required
def admin_provider_new():
if request.method == "POST":
name = request.form.get("name", "").strip()
official_url = request.form.get("official_url", "").strip() or None
if not name:
return render_template("admin/provider_form.html", provider=None, error="请填写厂商名称")
if Provider.query.filter_by(name=name).first():
return render_template("admin/provider_form.html", provider=None, error="该厂商名称已存在")
p = Provider(name=name, official_url=official_url)
db.session.add(p)
db.session.commit()
return redirect(url_for("admin_provider_detail", provider_id=p.id))
return render_template("admin/provider_form.html", provider=None)
@app.route("/admin/provider/<int:provider_id>")
@admin_required
def admin_provider_detail(provider_id):
provider = Provider.query.get_or_404(provider_id)
plans = VPSPlan.query.filter(
(VPSPlan.provider_id == provider_id) | (VPSPlan.provider == provider.name)
).order_by(VPSPlan.price_cny.asc(), VPSPlan.name).all()
providers = Provider.query.order_by(Provider.name).all()
plan_trends = _build_plan_trend_map(plans)
return render_template(
"admin/provider_detail.html",
provider=provider,
plans=plans,
plan_trends=plan_trends,
providers=providers,
country_tags=COUNTRY_TAGS,
)
@app.route("/admin/provider/<int:provider_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_provider_edit(provider_id):
provider = Provider.query.get_or_404(provider_id)
if request.method == "POST":
provider.name = request.form.get("name", "").strip()
provider.official_url = request.form.get("official_url", "").strip() or None
if not provider.name:
return render_template("admin/provider_form.html", provider=provider, error="请填写厂商名称")
db.session.commit()
return redirect(url_for("admin_provider_detail", provider_id=provider.id))
return render_template("admin/provider_form.html", provider=provider)
@app.route("/admin/provider/<int:provider_id>/delete", methods=["POST"])
@admin_required
def admin_provider_delete(provider_id):
provider = Provider.query.get_or_404(provider_id)
# 将该厂商下的配置改为无厂商关联(保留配置,仅清空 provider_id
VPSPlan.query.filter_by(provider_id=provider_id).update({"provider_id": None})
db.session.delete(provider)
db.session.commit()
return redirect(url_for("admin_providers"))
def _parse_sort_order(raw, default=100):
s = (raw or "").strip()
if not s:
return default
try:
return int(s)
except ValueError:
return default
def _admin_user_counts(user_ids):
"""批量统计用户维度数据,减少列表页 N+1 查询。"""
if not user_ids:
return {
"posts": {},
"comments": {},
"reports": {},
"unread_notifications": {},
}
post_counts = {
uid: int(cnt or 0)
for uid, cnt in (
db.session.query(ForumPost.user_id, func.count(ForumPost.id))
.filter(ForumPost.user_id.in_(user_ids))
.group_by(ForumPost.user_id)
.all()
)
}
comment_counts = {
uid: int(cnt or 0)
for uid, cnt in (
db.session.query(ForumComment.user_id, func.count(ForumComment.id))
.filter(ForumComment.user_id.in_(user_ids))
.group_by(ForumComment.user_id)
.all()
)
}
report_counts = {
uid: int(cnt or 0)
for uid, cnt in (
db.session.query(ForumReport.reporter_id, func.count(ForumReport.id))
.filter(ForumReport.reporter_id.in_(user_ids))
.group_by(ForumReport.reporter_id)
.all()
)
}
unread_notification_counts = {
uid: int(cnt or 0)
for uid, cnt in (
db.session.query(ForumNotification.user_id, func.count(ForumNotification.id))
.filter(ForumNotification.user_id.in_(user_ids), ForumNotification.is_read.is_(False))
.group_by(ForumNotification.user_id)
.all()
)
}
return {
"posts": post_counts,
"comments": comment_counts,
"reports": report_counts,
"unread_notifications": unread_notification_counts,
}
def _admin_load_user_options(limit=400):
return (
User.query
.order_by(User.username.asc(), User.id.asc())
.limit(limit)
.all()
)
def _admin_load_post_options(limit=400):
return (
ForumPost.query
.order_by(ForumPost.created_at.desc(), ForumPost.id.desc())
.limit(limit)
.all()
)
def _admin_fill_post_and_user_options(post_options, selected_post_id, user_options, selected_user_id):
"""确保编辑场景中的当前值始终出现在下拉框中。"""
if selected_post_id and all(p.id != selected_post_id for p in post_options):
selected_post = db.session.get(ForumPost, selected_post_id)
if selected_post:
post_options = [selected_post] + post_options
if selected_user_id and all(u.id != selected_user_id for u in user_options):
selected_user = db.session.get(User, selected_user_id)
if selected_user:
user_options = [selected_user] + user_options
return post_options, user_options
# ---------- 论坛分类管理 ----------
@app.route("/admin/forum/categories")
@admin_required
def admin_forum_categories():
categories = _load_forum_categories(active_only=False)
posts_by_category = {
name: count
for name, count in (
db.session.query(ForumPost.category, func.count(ForumPost.id))
.group_by(ForumPost.category)
.all()
)
}
return render_template(
"admin/forum_categories.html",
categories=categories,
posts_by_category=posts_by_category,
msg=request.args.get("msg", ""),
error=request.args.get("error", ""),
)
@app.route("/admin/forum/category/new", methods=["GET", "POST"])
@admin_required
def admin_forum_category_new():
error = ""
name_val = ""
sort_order_val = 100
is_active_val = True
if request.method == "POST":
name_val = (request.form.get("name") or "").strip()
sort_order_val = _parse_sort_order(request.form.get("sort_order"), 100)
is_active_val = bool(request.form.get("is_active"))
if not name_val:
error = "请填写分类名称"
elif len(name_val) > 32:
error = "分类名称最多 32 个字符"
elif ForumCategory.query.filter(func.lower(ForumCategory.name) == name_val.lower()).first():
error = "分类名称已存在"
else:
db.session.add(ForumCategory(
name=name_val,
sort_order=sort_order_val,
is_active=is_active_val,
))
db.session.commit()
return redirect(url_for("admin_forum_categories", msg="已新增分类:{}".format(name_val)))
return render_template(
"admin/forum_category_form.html",
page_title="新增论坛分类",
submit_text="创建分类",
action_url=url_for("admin_forum_category_new"),
error=error,
name_val=name_val,
sort_order_val=sort_order_val,
is_active_val=is_active_val,
category_id=None,
)
@app.route("/admin/forum/category/<int:category_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_forum_category_edit(category_id):
category = ForumCategory.query.get_or_404(category_id)
error = ""
name_val = category.name
sort_order_val = category.sort_order
is_active_val = bool(category.is_active)
if request.method == "POST":
name_val = (request.form.get("name") or "").strip()
sort_order_val = _parse_sort_order(request.form.get("sort_order"), category.sort_order)
is_active_val = bool(request.form.get("is_active"))
if not name_val:
error = "请填写分类名称"
elif len(name_val) > 32:
error = "分类名称最多 32 个字符"
elif category.is_active and not is_active_val and ForumCategory.query.filter_by(is_active=True).count() <= 1:
error = "至少保留一个启用分类"
else:
exists = (
ForumCategory.query
.filter(func.lower(ForumCategory.name) == name_val.lower(), ForumCategory.id != category.id)
.first()
)
if exists:
error = "分类名称已存在"
else:
old_name = category.name
category.name = name_val
category.sort_order = sort_order_val
category.is_active = is_active_val
if old_name != name_val:
ForumPost.query.filter_by(category=old_name).update({"category": name_val})
db.session.commit()
return redirect(url_for("admin_forum_categories", msg="已更新分类:{}".format(name_val)))
return render_template(
"admin/forum_category_form.html",
page_title="编辑论坛分类",
submit_text="保存修改",
action_url=url_for("admin_forum_category_edit", category_id=category.id),
error=error,
name_val=name_val,
sort_order_val=sort_order_val,
is_active_val=is_active_val,
category_id=category.id,
)
@app.route("/admin/forum/category/<int:category_id>/delete", methods=["POST"])
@admin_required
def admin_forum_category_delete(category_id):
category = ForumCategory.query.get_or_404(category_id)
total = ForumCategory.query.count()
if total <= 1:
return redirect(url_for("admin_forum_categories", error="至少保留一个分类,无法删除最后一个"))
if category.is_active and ForumCategory.query.filter_by(is_active=True).count() <= 1:
return redirect(url_for("admin_forum_categories", error="至少保留一个启用分类,无法删除最后一个启用项"))
replacement = (
ForumCategory.query
.filter(ForumCategory.id != category.id)
.order_by(ForumCategory.is_active.desc(), ForumCategory.sort_order.asc(), ForumCategory.id.asc())
.first()
)
if replacement is None:
return redirect(url_for("admin_forum_categories", error="未找到可替代分类"))
ForumPost.query.filter_by(category=category.name).update({"category": replacement.name})
db.session.delete(category)
db.session.commit()
return redirect(url_for("admin_forum_categories", msg="已删除分类,帖子迁移到:{}".format(replacement.name)))
def _get_report_target_info(report):
"""返回举报目标的展示信息。"""
info = {
"exists": False,
"post_id": None,
"title": report.snapshot_title or "",
"content": report.snapshot_content or "",
"author_name": "",
}
if report.target_type == "post":
post = db.session.get(ForumPost, report.target_id)
if post:
info.update({
"exists": True,
"post_id": post.id,
"title": post.title or info["title"],
"content": post.content or info["content"],
"author_name": post.author_rel.username if post.author_rel else "",
})
elif report.target_type == "comment":
comment = db.session.get(ForumComment, report.target_id)
if comment:
info.update({
"exists": True,
"post_id": comment.post_id,
"title": comment.post_rel.title if comment.post_rel else (info["title"] or ""),
"content": comment.content or info["content"],
"author_name": comment.author_rel.username if comment.author_rel else "",
})
if info["content"] and len(info["content"]) > 140:
info["content"] = info["content"][:140] + "..."
return info
@app.route("/admin/forum/reports")
@admin_required
def admin_forum_reports():
status = (request.args.get("status") or "pending").strip().lower()
if status not in {"pending", "processed", "rejected", "all"}:
status = "pending"
q = ForumReport.query.order_by(ForumReport.created_at.desc(), ForumReport.id.desc())
if status != "all":
q = q.filter_by(status=status)
reports = q.limit(300).all()
report_items = []
for r in reports:
report_items.append({
"report": r,
"target": _get_report_target_info(r),
"reporter_name": r.reporter_rel.username if r.reporter_rel else "用户",
})
grouped = (
db.session.query(ForumReport.status, func.count(ForumReport.id))
.group_by(ForumReport.status)
.all()
)
count_map = {k: int(v or 0) for k, v in grouped}
return render_template(
"admin/forum_reports.html",
status=status,
report_items=report_items,
status_count_map=count_map,
status_labels=FORUM_REPORT_STATUS_LABELS,
msg=request.args.get("msg", ""),
error=request.args.get("error", ""),
)
@app.route("/admin/forum/report/<int:report_id>/process", methods=["POST"])
@admin_required
def admin_forum_report_process(report_id):
report = ForumReport.query.get_or_404(report_id)
action = (request.form.get("action") or "").strip().lower()
review_note = (request.form.get("review_note") or "").strip()
if len(review_note) > 500:
review_note = review_note[:500]
if report.status != "pending":
return redirect(url_for("admin_forum_reports", error="该举报已处理"))
outcome = ""
target_owner_id = None
target_post_id = None
target_kind_label = "内容"
if action == "delete_target":
deleted = False
if report.target_type == "post":
target = db.session.get(ForumPost, report.target_id)
if target:
target_owner_id = target.user_id
target_post_id = target.id
target_kind_label = "帖子"
db.session.delete(target)
deleted = True
outcome = "已删除被举报帖子" if deleted else "目标帖子已不存在"
elif report.target_type == "comment":
target = db.session.get(ForumComment, report.target_id)
if target:
target_owner_id = target.user_id
target_post_id = target.post_id
target_kind_label = "评论"
db.session.delete(target)
deleted = True
outcome = "已删除被举报评论" if deleted else "目标评论已不存在"
else:
return redirect(url_for("admin_forum_reports", error="未知举报目标类型"))
report.status = "processed"
report.review_note = review_note or outcome
elif action == "keep":
report.status = "processed"
report.review_note = review_note or "审核后保留内容"
outcome = "已标记为保留"
elif action == "reject":
report.status = "rejected"
report.review_note = review_note or "举报不成立"
outcome = "已驳回举报"
else:
return redirect(url_for("admin_forum_reports", error="未知处理动作"))
report.reviewed_by = "admin"
report.reviewed_at = datetime.now(timezone.utc)
_create_notification(
user_id=report.reporter_id,
notif_type="report_processed",
message="你提交的举报(#{})处理结果:{}".format(report.id, outcome),
report_id=report.id,
post_id=target_post_id,
)
if action == "delete_target" and target_owner_id and target_owner_id != report.reporter_id:
_create_notification(
user_id=target_owner_id,
notif_type="content_removed",
message="你的{}因举报处理已被删除".format(target_kind_label),
report_id=report.id,
post_id=target_post_id,
)
db.session.commit()
return redirect(url_for("admin_forum_reports", msg=outcome))
@app.route("/admin/users")
@admin_required
def admin_users():
keyword = (request.args.get("q") or "").strip()
if len(keyword) > 50:
keyword = keyword[:50]
q = User.query
if keyword:
pattern = "%{}%".format(keyword)
q = q.filter(User.username.ilike(pattern))
users = q.order_by(User.created_at.desc(), User.id.desc()).limit(300).all()
user_ids = [u.id for u in users]
count_maps = _admin_user_counts(user_ids)
return render_template(
"admin/users.html",
users=users,
keyword=keyword,
post_count_map=count_maps["posts"],
comment_count_map=count_maps["comments"],
report_count_map=count_maps["reports"],
unread_notification_count_map=count_maps["unread_notifications"],
msg=request.args.get("msg", ""),
error=request.args.get("error", ""),
)
@app.route("/admin/user/new", methods=["GET", "POST"])
@admin_required
def admin_user_new():
error = ""
username_val = ""
if request.method == "POST":
username_val = (request.form.get("username") or "").strip()
password = request.form.get("password") or ""
confirm_password = request.form.get("confirm_password") or ""
if not _is_valid_username(username_val):
error = "用户名需为 3-20 位,仅支持字母、数字、下划线"
elif len(password) < 6:
error = "密码至少 6 位"
elif password != confirm_password:
error = "两次输入的密码不一致"
elif User.query.filter(func.lower(User.username) == username_val.lower()).first():
error = "用户名已存在"
else:
user = User(username=username_val)
user.set_password(password)
db.session.add(user)
db.session.commit()
return redirect(url_for("admin_users", msg="已新增用户:{}".format(username_val)))
return render_template(
"admin/user_form.html",
page_title="新增用户",
submit_text="创建用户",
action_url=url_for("admin_user_new"),
error=error,
username_val=username_val,
user_id=None,
)
@app.route("/admin/user/<int:user_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_user_edit(user_id):
user = User.query.get_or_404(user_id)
error = ""
username_val = user.username or ""
if request.method == "POST":
username_val = (request.form.get("username") or "").strip()
new_password = request.form.get("new_password") or ""
confirm_password = request.form.get("confirm_password") or ""
if not _is_valid_username(username_val):
error = "用户名需为 3-20 位,仅支持字母、数字、下划线"
elif (
User.query
.filter(func.lower(User.username) == username_val.lower(), User.id != user.id)
.first()
):
error = "用户名已存在"
elif new_password and len(new_password) < 6:
error = "新密码至少 6 位"
elif new_password and new_password != confirm_password:
error = "两次新密码输入不一致"
else:
old_username = user.username
user.username = username_val
changed = False
if old_username != username_val:
changed = True
if new_password:
user.set_password(new_password)
changed = True
if changed:
db.session.commit()
return redirect(url_for("admin_users", msg="已更新用户:{}".format(username_val)))
return redirect(url_for("admin_users", msg="未检测到变更"))
return render_template(
"admin/user_form.html",
page_title="编辑用户",
submit_text="保存修改",
action_url=url_for("admin_user_edit", user_id=user.id),
error=error,
username_val=username_val,
user_id=user.id,
)
@app.route("/admin/user/<int:user_id>/delete", methods=["POST"])
@admin_required
def admin_user_delete(user_id):
user = User.query.get_or_404(user_id)
if User.query.count() <= 1:
return redirect(url_for("admin_users", error="至少保留一个用户,无法删除最后一个"))
username = user.username or "用户"
try:
# 其他用户已收到的通知可能引用该用户为 actor删除前置空避免外键冲突。
ForumNotification.query.filter_by(actor_id=user.id).update({"actor_id": None}, synchronize_session=False)
db.session.delete(user)
db.session.commit()
return redirect(url_for("admin_users", msg="已删除用户:{}".format(username)))
except Exception:
db.session.rollback()
return redirect(url_for("admin_users", error="删除失败,请稍后重试"))
@app.route("/admin/user/<int:user_id>/ban", methods=["POST"])
@admin_required
def admin_user_ban(user_id):
user = User.query.get_or_404(user_id)
reason = (request.form.get("reason") or "").strip()
if len(reason) > 255:
reason = reason[:255]
if user.is_banned:
return redirect(url_for("admin_users", msg="用户已处于封禁状态:{}".format(user.username)))
user.is_banned = True
user.banned_at = datetime.now(timezone.utc)
user.banned_reason = reason or "管理员封禁"
db.session.commit()
return redirect(url_for("admin_users", msg="已封禁用户:{}".format(user.username)))
@app.route("/admin/user/<int:user_id>/unban", methods=["POST"])
@admin_required
def admin_user_unban(user_id):
user = User.query.get_or_404(user_id)
if not user.is_banned:
return redirect(url_for("admin_users", msg="用户未被封禁:{}".format(user.username)))
user.is_banned = False
user.banned_at = None
user.banned_reason = None
db.session.commit()
return redirect(url_for("admin_users", msg="已解封用户:{}".format(user.username)))
@app.route("/admin/forum/posts")
@admin_required
def admin_forum_posts():
keyword = (request.args.get("q") or "").strip()
if len(keyword) > 80:
keyword = keyword[:80]
selected_category = (request.args.get("category") or "").strip() or None
selected_author_id = request.args.get("author_id", type=int)
comment_stats_subq = (
db.session.query(
ForumComment.post_id.label("post_id"),
func.count(ForumComment.id).label("comment_count"),
)
.group_by(ForumComment.post_id)
.subquery()
)
like_stats_subq = (
db.session.query(
ForumPostLike.post_id.label("post_id"),
func.count(ForumPostLike.id).label("like_count"),
)
.group_by(ForumPostLike.post_id)
.subquery()
)
bookmark_stats_subq = (
db.session.query(
ForumPostBookmark.post_id.label("post_id"),
func.count(ForumPostBookmark.id).label("bookmark_count"),
)
.group_by(ForumPostBookmark.post_id)
.subquery()
)
q = (
db.session.query(
ForumPost,
func.coalesce(comment_stats_subq.c.comment_count, 0).label("comment_count"),
User.username.label("author_name"),
func.coalesce(like_stats_subq.c.like_count, 0).label("like_count"),
func.coalesce(bookmark_stats_subq.c.bookmark_count, 0).label("bookmark_count"),
)
.outerjoin(comment_stats_subq, comment_stats_subq.c.post_id == ForumPost.id)
.outerjoin(like_stats_subq, like_stats_subq.c.post_id == ForumPost.id)
.outerjoin(bookmark_stats_subq, bookmark_stats_subq.c.post_id == ForumPost.id)
.outerjoin(User, User.id == ForumPost.user_id)
)
if selected_category:
q = q.filter(ForumPost.category == selected_category)
if selected_author_id:
q = q.filter(ForumPost.user_id == selected_author_id)
if keyword:
pattern = "%{}%".format(keyword)
q = q.filter(
or_(
ForumPost.title.ilike(pattern),
ForumPost.content.ilike(pattern),
User.username.ilike(pattern),
)
)
rows = q.order_by(ForumPost.is_pinned.desc(), ForumPost.created_at.desc(), ForumPost.id.desc()).limit(400).all()
category_names = list(_get_forum_category_names(active_only=False))
for (name,) in db.session.query(ForumPost.category).distinct().all():
if name and name not in category_names:
category_names.append(name)
if selected_category and selected_category not in category_names:
category_names.insert(0, selected_category)
author_rows = (
db.session.query(
User.id,
User.username,
func.count(ForumPost.id).label("post_count"),
)
.outerjoin(ForumPost, ForumPost.user_id == User.id)
.group_by(User.id)
.order_by(func.count(ForumPost.id).desc(), User.username.asc())
.limit(300)
.all()
)
return render_template(
"admin/forum_posts.html",
rows=rows,
category_names=category_names,
author_rows=author_rows,
keyword=keyword,
selected_category=selected_category,
selected_author_id=selected_author_id,
msg=request.args.get("msg", ""),
error=request.args.get("error", ""),
)
@app.route("/admin/forum/post/<int:post_id>/moderate", methods=["POST"])
@admin_required
def admin_forum_post_moderate(post_id):
post = ForumPost.query.get_or_404(post_id)
action = (request.form.get("action") or "").strip().lower()
if action == "pin":
post.is_pinned = True
elif action == "unpin":
post.is_pinned = False
elif action == "feature":
post.is_featured = True
elif action == "unfeature":
post.is_featured = False
elif action == "lock":
post.is_locked = True
elif action == "unlock":
post.is_locked = False
else:
return redirect(url_for("admin_forum_posts", error="未知帖子管理动作"))
db.session.commit()
return redirect(url_for("admin_forum_posts", msg="已更新帖子 #{} 状态".format(post.id)))
@app.route("/admin/forum/post/new", methods=["GET", "POST"])
@admin_required
def admin_forum_post_new():
error = ""
users = _admin_load_user_options(limit=400)
categories = _get_forum_category_names(active_only=False)
if not categories:
categories = list(DEFAULT_FORUM_CATEGORIES)
selected_author_id = request.args.get("author_id", type=int) or (users[0].id if users else None)
selected_category = request.args.get("category") or (categories[0] if categories else "综合讨论")
is_pinned_val = False
is_featured_val = False
is_locked_val = False
title_val = ""
content_val = ""
if request.method == "POST":
selected_author_id = request.form.get("author_id", type=int)
selected_category = (request.form.get("category") or "").strip() or selected_category
is_pinned_val = bool(request.form.get("is_pinned"))
is_featured_val = bool(request.form.get("is_featured"))
is_locked_val = bool(request.form.get("is_locked"))
title_val = (request.form.get("title") or "").strip()
content_val = (request.form.get("content") or "").strip()
author = db.session.get(User, selected_author_id or 0)
if not author:
error = "请选择有效作者"
elif selected_category not in categories:
error = "请选择有效分类"
elif len(title_val) < 5:
error = "标题至少 5 个字符"
elif len(title_val) > 160:
error = "标题不能超过 160 个字符"
elif len(content_val) < 10:
error = "内容至少 10 个字符"
else:
db.session.add(ForumPost(
user_id=author.id,
category=selected_category,
title=title_val,
content=content_val,
is_pinned=is_pinned_val,
is_featured=is_featured_val,
is_locked=is_locked_val,
))
db.session.commit()
return redirect(url_for("admin_forum_posts", msg="已新增帖子"))
if not users:
error = error or "当前没有可用用户,请先在用户管理中新增用户"
return render_template(
"admin/forum_post_form.html",
page_title="后台新增帖子",
submit_text="创建帖子",
action_url=url_for("admin_forum_post_new"),
cancel_url=url_for("admin_forum_posts"),
error=error,
users=users,
categories=categories,
selected_author_id=selected_author_id,
selected_category=selected_category,
is_pinned_val=is_pinned_val,
is_featured_val=is_featured_val,
is_locked_val=is_locked_val,
title_val=title_val,
content_val=content_val,
post_id=None,
)
@app.route("/admin/forum/post/<int:post_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_forum_post_edit(post_id):
post = ForumPost.query.get_or_404(post_id)
error = ""
users = _admin_load_user_options(limit=400)
categories = _get_forum_category_names(active_only=False)
if post.category and post.category not in categories:
categories.insert(0, post.category)
selected_author_id = post.user_id
selected_category = post.category or (categories[0] if categories else "综合讨论")
is_pinned_val = bool(post.is_pinned)
is_featured_val = bool(post.is_featured)
is_locked_val = bool(post.is_locked)
title_val = post.title or ""
content_val = post.content or ""
if request.method == "POST":
selected_author_id = request.form.get("author_id", type=int)
selected_category = (request.form.get("category") or "").strip() or selected_category
is_pinned_val = bool(request.form.get("is_pinned"))
is_featured_val = bool(request.form.get("is_featured"))
is_locked_val = bool(request.form.get("is_locked"))
title_val = (request.form.get("title") or "").strip()
content_val = (request.form.get("content") or "").strip()
author = db.session.get(User, selected_author_id or 0)
if not author:
error = "请选择有效作者"
elif selected_category not in categories:
error = "请选择有效分类"
elif len(title_val) < 5:
error = "标题至少 5 个字符"
elif len(title_val) > 160:
error = "标题不能超过 160 个字符"
elif len(content_val) < 10:
error = "内容至少 10 个字符"
else:
post.user_id = author.id
post.category = selected_category
post.is_pinned = is_pinned_val
post.is_featured = is_featured_val
post.is_locked = is_locked_val
post.title = title_val
post.content = content_val
db.session.commit()
return redirect(url_for("admin_forum_posts", msg="已更新帖子 #{}".format(post.id)))
if not users:
error = error or "当前没有可用用户,请先在用户管理中新增用户"
return render_template(
"admin/forum_post_form.html",
page_title="后台编辑帖子",
submit_text="保存修改",
action_url=url_for("admin_forum_post_edit", post_id=post.id),
cancel_url=url_for("admin_forum_posts"),
error=error,
users=users,
categories=categories,
selected_author_id=selected_author_id,
selected_category=selected_category,
is_pinned_val=is_pinned_val,
is_featured_val=is_featured_val,
is_locked_val=is_locked_val,
title_val=title_val,
content_val=content_val,
post_id=post.id,
)
@app.route("/admin/forum/post/<int:post_id>/delete", methods=["POST"])
@admin_required
def admin_forum_post_delete(post_id):
post = ForumPost.query.get_or_404(post_id)
db.session.delete(post)
db.session.commit()
return redirect(url_for("admin_forum_posts", msg="已删除帖子 #{}".format(post_id)))
@app.route("/admin/forum/comments")
@admin_required
def admin_forum_comments():
keyword = (request.args.get("q") or "").strip()
if len(keyword) > 80:
keyword = keyword[:80]
selected_author_id = request.args.get("author_id", type=int)
selected_post_id = request.args.get("post_id", type=int)
q = (
db.session.query(
ForumComment,
ForumPost.title.label("post_title"),
User.username.label("author_name"),
)
.join(ForumPost, ForumComment.post_id == ForumPost.id)
.outerjoin(User, User.id == ForumComment.user_id)
)
if selected_post_id:
q = q.filter(ForumComment.post_id == selected_post_id)
if selected_author_id:
q = q.filter(ForumComment.user_id == selected_author_id)
if keyword:
pattern = "%{}%".format(keyword)
q = q.filter(
or_(
ForumComment.content.ilike(pattern),
ForumPost.title.ilike(pattern),
User.username.ilike(pattern),
)
)
rows = q.order_by(ForumComment.created_at.desc(), ForumComment.id.desc()).limit(500).all()
author_rows = (
db.session.query(
User.id,
User.username,
func.count(ForumComment.id).label("comment_count"),
)
.outerjoin(ForumComment, ForumComment.user_id == User.id)
.group_by(User.id)
.order_by(func.count(ForumComment.id).desc(), User.username.asc())
.limit(300)
.all()
)
post_rows = (
db.session.query(
ForumPost.id,
ForumPost.title,
)
.order_by(ForumPost.created_at.desc(), ForumPost.id.desc())
.limit(300)
.all()
)
if selected_post_id and all(pid != selected_post_id for pid, _ in post_rows):
selected_post = db.session.get(ForumPost, selected_post_id)
if selected_post:
post_rows = [(selected_post.id, selected_post.title)] + post_rows
return render_template(
"admin/forum_comments.html",
rows=rows,
author_rows=author_rows,
post_rows=post_rows,
keyword=keyword,
selected_author_id=selected_author_id,
selected_post_id=selected_post_id,
msg=request.args.get("msg", ""),
error=request.args.get("error", ""),
)
@app.route("/admin/forum/comment/new", methods=["GET", "POST"])
@admin_required
def admin_forum_comment_new():
error = ""
post_options = _admin_load_post_options(limit=400)
user_options = _admin_load_user_options(limit=400)
selected_post_id = request.args.get("post_id", type=int) or (post_options[0].id if post_options else None)
selected_user_id = request.args.get("user_id", type=int) or (user_options[0].id if user_options else None)
post_options, user_options = _admin_fill_post_and_user_options(
post_options,
selected_post_id,
user_options,
selected_user_id,
)
content_val = ""
if request.method == "POST":
selected_post_id = request.form.get("post_id", type=int)
selected_user_id = request.form.get("user_id", type=int)
content_val = (request.form.get("content") or "").strip()
post_options, user_options = _admin_fill_post_and_user_options(
post_options,
selected_post_id,
user_options,
selected_user_id,
)
target_post = db.session.get(ForumPost, selected_post_id or 0)
target_user = db.session.get(User, selected_user_id or 0)
if not target_post:
error = "请选择有效帖子"
elif not target_user:
error = "请选择有效用户"
elif len(content_val) < 2:
error = "评论至少 2 个字符"
else:
db.session.add(ForumComment(
post_id=target_post.id,
user_id=target_user.id,
content=content_val,
))
db.session.commit()
return redirect(url_for("admin_forum_comments", post_id=target_post.id, msg="已新增评论"))
if not post_options:
error = error or "暂无可评论的帖子,请先新增帖子"
elif not user_options:
error = error or "当前没有可用用户,请先在用户管理中新增用户"
return render_template(
"admin/forum_comment_form.html",
page_title="后台新增评论",
submit_text="创建评论",
action_url=url_for("admin_forum_comment_new"),
cancel_url=url_for("admin_forum_comments"),
error=error,
post_options=post_options,
user_options=user_options,
selected_post_id=selected_post_id,
selected_user_id=selected_user_id,
content_val=content_val,
comment_id=None,
)
@app.route("/admin/forum/comment/<int:comment_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_forum_comment_edit(comment_id):
comment = ForumComment.query.get_or_404(comment_id)
error = ""
post_options = _admin_load_post_options(limit=400)
user_options = _admin_load_user_options(limit=400)
selected_post_id = comment.post_id
selected_user_id = comment.user_id
post_options, user_options = _admin_fill_post_and_user_options(
post_options,
selected_post_id,
user_options,
selected_user_id,
)
content_val = comment.content or ""
if request.method == "POST":
selected_post_id = request.form.get("post_id", type=int)
selected_user_id = request.form.get("user_id", type=int)
content_val = (request.form.get("content") or "").strip()
post_options, user_options = _admin_fill_post_and_user_options(
post_options,
selected_post_id,
user_options,
selected_user_id,
)
target_post = db.session.get(ForumPost, selected_post_id or 0)
target_user = db.session.get(User, selected_user_id or 0)
if not target_post:
error = "请选择有效帖子"
elif not target_user:
error = "请选择有效用户"
elif len(content_val) < 2:
error = "评论至少 2 个字符"
else:
comment.post_id = target_post.id
comment.user_id = target_user.id
comment.content = content_val
db.session.commit()
return redirect(url_for("admin_forum_comments", post_id=target_post.id, msg="已更新评论 #{}".format(comment.id)))
if not post_options:
error = error or "暂无可评论的帖子,请先新增帖子"
elif not user_options:
error = error or "当前没有可用用户,请先在用户管理中新增用户"
return render_template(
"admin/forum_comment_form.html",
page_title="后台编辑评论",
submit_text="保存修改",
action_url=url_for("admin_forum_comment_edit", comment_id=comment.id),
cancel_url=url_for("admin_forum_comments", post_id=selected_post_id),
error=error,
post_options=post_options,
user_options=user_options,
selected_post_id=selected_post_id,
selected_user_id=selected_user_id,
content_val=content_val,
comment_id=comment.id,
)
@app.route("/admin/forum/comment/<int:comment_id>/delete", methods=["POST"])
@admin_required
def admin_forum_comment_delete(comment_id):
comment = ForumComment.query.get_or_404(comment_id)
post_id = comment.post_id
db.session.delete(comment)
db.session.commit()
return redirect(url_for("admin_forum_comments", post_id=post_id, msg="已删除评论 #{}".format(comment_id)))
@app.route("/admin/plan/new", methods=["GET", "POST"])
@admin_required
def admin_plan_new():
provider_id = request.args.get("provider_id", type=int)
if request.method == "POST":
return _save_plan(None)
providers = Provider.query.order_by(Provider.name).all()
return render_template(
"admin/plan_form.html",
plan=None,
country_tags=COUNTRY_TAGS,
providers=providers,
preselected_provider_id=provider_id,
)
@app.route("/admin/plan/<int:plan_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_plan_edit(plan_id):
plan = VPSPlan.query.get_or_404(plan_id)
if request.method == "POST":
return _save_plan(plan)
return redirect(url_for("admin_dashboard"))
def _parse_optional_int(s):
s = (s or "").strip()
if not s:
return None
try:
return int(s)
except ValueError:
return None
def _parse_optional_float(s):
s = (s or "").strip()
if not s:
return None
try:
return float(s)
except ValueError:
return None
def _save_plan(plan):
provider_id = request.form.get("provider_id", type=int)
countries = request.form.get("countries", "").strip() or None
vcpu = _parse_optional_int(request.form.get("vcpu"))
memory_gb = _parse_optional_int(request.form.get("memory_gb"))
storage_gb = _parse_optional_int(request.form.get("storage_gb"))
bandwidth_mbps = _parse_optional_int(request.form.get("bandwidth_mbps"))
traffic = request.form.get("traffic", "").strip() or None
price_cny = _parse_optional_float(request.form.get("price_cny"))
price_usd = _parse_optional_float(request.form.get("price_usd"))
currency = request.form.get("currency", "CNY").strip() or "CNY"
official_url = request.form.get("official_url", "").strip() or None
provider = None
if provider_id:
provider = db.session.get(Provider, provider_id)
if not provider:
providers = Provider.query.order_by(Provider.name).all()
return render_template(
"admin/plan_form.html",
plan=plan,
country_tags=COUNTRY_TAGS,
providers=providers,
preselected_provider_id=provider_id,
error="请选择厂商",
)
if plan is None:
plan = VPSPlan(
provider_id=provider.id,
provider=provider.name,
region=None,
name=None,
vcpu=vcpu,
memory_gb=memory_gb,
storage_gb=storage_gb,
bandwidth_mbps=bandwidth_mbps,
traffic=traffic,
price_cny=price_cny,
price_usd=price_usd,
currency=currency,
official_url=official_url,
countries=countries,
)
db.session.add(plan)
else:
plan.provider_id = provider.id
plan.provider = provider.name
plan.region = None
plan.name = None
plan.vcpu = vcpu
plan.memory_gb = memory_gb
plan.storage_gb = storage_gb
plan.bandwidth_mbps = bandwidth_mbps
plan.traffic = traffic
plan.price_cny = price_cny
plan.price_usd = price_usd
plan.currency = currency
plan.official_url = official_url
plan.countries = countries
db.session.flush()
_record_price_history(plan, source="manual")
db.session.commit()
# 若从厂商详情页进入添加,保存后返回该厂商详情
from_provider_id = request.form.get("from_provider_id", type=int)
if from_provider_id:
return redirect(url_for("admin_provider_detail", provider_id=from_provider_id))
return redirect(url_for("admin_dashboard"))
@app.route("/admin/plan/<int:plan_id>/delete", methods=["POST"])
@admin_required
def admin_plan_delete(plan_id):
plan = VPSPlan.query.get_or_404(plan_id)
PriceHistory.query.filter_by(plan_id=plan_id).delete()
db.session.delete(plan)
db.session.commit()
return redirect(url_for("admin_dashboard"))
# ---------- Excel 导出 / 导入 ----------
EXCEL_HEADERS = [
"厂商", "厂商官网", "国家", "vCPU", "内存GB", "存储GB", "带宽Mbps", "流量",
"月付人民币", "月付美元", "货币", "配置官网",
]
@app.route("/admin/export/excel")
@admin_required
def admin_export_excel():
wb = Workbook()
ws = wb.active
ws.title = "配置"
ws.append(EXCEL_HEADERS)
plans = VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny).all()
for p in plans:
provider_url = (p.provider_rel.official_url if p.provider_rel else "") or ""
ws.append([
p.provider_name,
provider_url or "",
p.countries or "",
p.vcpu if p.vcpu is not None else "",
p.memory_gb if p.memory_gb is not None else "",
p.storage_gb if p.storage_gb is not None else "",
p.bandwidth_mbps if p.bandwidth_mbps is not None else "",
p.traffic or "",
p.price_cny if p.price_cny is not None else "",
p.price_usd if p.price_usd is not None else "",
p.currency or "CNY",
p.official_url or "",
])
buf = io.BytesIO()
wb.save(buf)
buf.seek(0)
return send_file(
buf,
mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
as_attachment=True,
download_name="vps_配置_导出.xlsx",
)
def _num(v):
if v is None or v == "":
return None
try:
return int(float(v))
except (ValueError, TypeError):
return None
def _float(v):
if v is None or v == "":
return None
try:
return float(v)
except (ValueError, TypeError):
return None
def _opt_text(v):
if v is None:
return None
s = str(v).strip()
return s or None
def _safe_str(v):
if v is None:
return ""
return str(v).strip()
def _eq_optional(a, b):
if a is None and b is None:
return True
if a is None or b is None:
return False
if isinstance(a, float) or isinstance(b, float):
return abs(float(a) - float(b)) < 1e-9
return a == b
def _record_price_history(plan, source):
if plan is None:
return
if plan.price_cny is None and plan.price_usd is None:
return
if plan.id is None:
db.session.flush()
latest = (
PriceHistory.query
.filter_by(plan_id=plan.id)
.order_by(PriceHistory.captured_at.desc(), PriceHistory.id.desc())
.first()
)
currency = _opt_text(plan.currency) or "CNY"
if latest:
same_currency = _safe_str(latest.currency).upper() == _safe_str(currency).upper()
if same_currency and _eq_optional(latest.price_cny, plan.price_cny) and _eq_optional(latest.price_usd, plan.price_usd):
return
db.session.add(PriceHistory(
plan_id=plan.id,
price_cny=plan.price_cny,
price_usd=plan.price_usd,
currency=currency,
source=source,
))
def _display_val(v):
if v is None or v == "":
return ""
if isinstance(v, float):
s = "{:.2f}".format(v).rstrip("0").rstrip(".")
return s if s else "0"
return str(v)
def _row_identity_key(row):
return (
_safe_str(row.get("厂商")),
_num(row.get("vCPU")),
_num(row.get("内存GB")),
_num(row.get("存储GB")),
_num(row.get("带宽Mbps")),
_safe_str(row.get("国家")),
_safe_str(row.get("流量")),
)
def _plan_identity_key(plan):
return (
_safe_str(plan.provider_name),
plan.vcpu,
plan.memory_gb,
plan.storage_gb,
plan.bandwidth_mbps,
_safe_str(plan.countries),
_safe_str(plan.traffic),
)
def _plan_diff(plan, row):
"""返回导入行相对于现有 plan 的差异列表。"""
fields = [
("国家", "countries", _opt_text(row.get("国家"))),
("vCPU", "vcpu", _num(row.get("vCPU"))),
("内存GB", "memory_gb", _num(row.get("内存GB"))),
("存储GB", "storage_gb", _num(row.get("存储GB"))),
("带宽Mbps", "bandwidth_mbps", _num(row.get("带宽Mbps"))),
("流量", "traffic", _opt_text(row.get("流量"))),
("月付人民币", "price_cny", _float(row.get("月付人民币"))),
("月付美元", "price_usd", _float(row.get("月付美元"))),
("货币", "currency", _opt_text(row.get("货币")) or "CNY"),
("配置官网", "official_url", _opt_text(row.get("配置官网"))),
]
diffs = []
for label, attr, new_value in fields:
old_value = getattr(plan, attr)
if not _eq_optional(old_value, new_value):
diffs.append({
"label": label,
"old": old_value,
"new": new_value,
"old_display": _display_val(old_value),
"new_display": _display_val(new_value),
})
return diffs
def _upsert_provider_from_row(row):
provider_name = _safe_str(row.get("厂商"))
if not provider_name:
return None
imported_provider_url = _opt_text(row.get("厂商官网"))
provider = Provider.query.filter_by(name=provider_name).first()
if not provider:
provider = Provider(name=provider_name, official_url=imported_provider_url)
db.session.add(provider)
db.session.flush()
elif imported_provider_url and provider.official_url != imported_provider_url:
provider.official_url = imported_provider_url
return provider
def _fill_plan_from_row(plan, row, provider):
plan.provider_id = provider.id
plan.provider = provider.name
plan.region = None
plan.name = None
plan.vcpu = _num(row.get("vCPU"))
plan.memory_gb = _num(row.get("内存GB"))
plan.storage_gb = _num(row.get("存储GB"))
plan.bandwidth_mbps = _num(row.get("带宽Mbps"))
plan.traffic = _opt_text(row.get("流量"))
plan.price_cny = _float(row.get("月付人民币"))
plan.price_usd = _float(row.get("月付美元"))
plan.currency = _opt_text(row.get("货币")) or "CNY"
plan.official_url = _opt_text(row.get("配置官网"))
plan.countries = _opt_text(row.get("国家"))
@app.route("/admin/import", methods=["GET", "POST"])
@admin_required
def admin_import():
if request.method == "GET":
return render_template("admin/import.html")
f = request.files.get("file")
if not f or not f.filename:
return render_template("admin/import.html", error="请选择 Excel 文件")
if not f.filename.lower().endswith(".xlsx"):
return render_template("admin/import.html", error="请上传 .xlsx 文件")
try:
wb = load_workbook(io.BytesIO(f.read()), read_only=True, data_only=True)
ws = wb.active
rows = list(ws.iter_rows(min_row=2, values_only=True))
except Exception as e:
return render_template("admin/import.html", error="解析失败: {}".format(str(e)))
headers = EXCEL_HEADERS
parsed = []
for row in rows:
if not any(cell is not None and str(cell).strip() for cell in row):
continue
d = {}
for i, h in enumerate(headers):
if i < len(row):
v = row[i]
if v is not None and hasattr(v, "strip"):
v = v.strip()
d[h] = v
else:
d[h] = None
parsed.append(d)
if not parsed:
return render_template("admin/import.html", error="文件中没有有效数据行")
plans = VPSPlan.query.all()
plan_index = {}
for p in plans:
key = _plan_identity_key(p)
if key not in plan_index:
plan_index[key] = p
seen_row_keys = set()
preview_items = []
for row in parsed:
key = _row_identity_key(row)
provider_name = key[0]
if not provider_name:
continue
if key in seen_row_keys:
continue
seen_row_keys.add(key)
matched = plan_index.get(key)
if not matched:
preview_items.append({
"action": "add",
"row": row,
"changes": [],
"provider_url_changed": False,
})
continue
changes = _plan_diff(matched, row)
imported_provider_url = _opt_text(row.get("厂商官网"))
old_provider_url = _opt_text(matched.provider_rel.official_url if matched.provider_rel else None)
provider_url_changed = bool(imported_provider_url and imported_provider_url != old_provider_url)
if changes or provider_url_changed:
preview_items.append({
"action": "update",
"plan_id": matched.id,
"row": row,
"changes": changes,
"provider_url_changed": provider_url_changed,
"provider_url_old": old_provider_url,
"provider_url_new": imported_provider_url,
})
session["import_preview"] = preview_items
return redirect(url_for("admin_import_preview"))
@app.route("/admin/import/preview", methods=["GET", "POST"])
@admin_required
def admin_import_preview():
preview_items = session.get("import_preview") or []
add_count = sum(1 for x in preview_items if x.get("action") == "add")
update_count = sum(1 for x in preview_items if x.get("action") == "update")
if request.method == "GET":
return render_template(
"admin/import_preview.html",
rows=list(enumerate(preview_items)),
add_count=add_count,
update_count=update_count,
)
selected = request.form.getlist("row_index")
if not selected:
return render_template(
"admin/import_preview.html",
rows=list(enumerate(preview_items)),
add_count=add_count,
update_count=update_count,
error="请至少勾选一行",
)
indices = sorted(set(int(x) for x in selected if x.isdigit()))
add_applied = 0
update_applied = 0
for i in indices:
if i < 0 or i >= len(preview_items):
continue
item = preview_items[i]
row = item.get("row") or {}
provider = _upsert_provider_from_row(row)
if not provider:
continue
action = item.get("action")
if action == "update":
plan = db.session.get(VPSPlan, item.get("plan_id"))
if not plan:
plan = VPSPlan()
db.session.add(plan)
add_applied += 1
else:
update_applied += 1
_fill_plan_from_row(plan, row, provider)
else:
plan = VPSPlan()
_fill_plan_from_row(plan, row, provider)
db.session.add(plan)
add_applied += 1
_record_price_history(plan, source="import")
db.session.commit()
session.pop("import_preview", None)
msg = "已新增 {} 条,更新 {} 条配置".format(add_applied, update_applied)
return redirect(url_for("admin_dashboard") + "?" + urlencode({"msg": msg}))
if __name__ == "__main__":
app.run(debug=True, port=5001)