Files
vps_web/app.py
ddrwode 4210e0d70a 哈哈
2026-02-10 17:54:22 +08:00

4547 lines
164 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""云服务器价格对比 - Flask 应用"""
import io
import os
from time import monotonic
from datetime import datetime, timezone
from email.utils import format_datetime
from urllib.parse import urlencode
from xml.sax.saxutils import escape as xml_escape
from flask import (
Flask,
abort,
jsonify,
make_response,
redirect,
render_template,
request,
send_file,
send_from_directory,
session,
url_for,
)
from werkzeug.middleware.proxy_fix import ProxyFix
from sqlalchemy import text, func, or_
from sqlalchemy.orm import joinedload
from markupsafe import Markup, escape
try:
import markdown as py_markdown
except Exception:
py_markdown = None
try:
import bleach
except Exception:
bleach = None
from config import Config
from extensions import db
from openpyxl import Workbook
from openpyxl import load_workbook
app = Flask(__name__)
app.config.from_object(Config)
# When deployed behind a reverse proxy such as Nginx, trust the X-Forwarded-* headers (HTTPS scheme, real client IP).
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)
db.init_app(app)
from models import (
VPSPlan,
Provider,
PriceHistory,
User,
ForumPost,
ForumComment,
ForumCategory,
ForumReport,
ForumNotification,
ForumPostLike,
ForumPostBookmark,
) # noqa: E402
def _ensure_mysql_columns():
    """Best-effort schema patch for an existing MySQL ``vps_plans`` table.

    Adds columns that may be missing (avoiding MySQL error 1054, unknown
    column) and then redefines a few columns as nullable. Returns without
    doing anything on non-MySQL dialects. Every failure is swallowed: a
    failing statement is rolled back and the next one is attempted.
    """
    columns_to_add = (
        ("traffic", "VARCHAR(64) NULL"),
        ("countries", "VARCHAR(255) NULL"),
        ("provider_id", "INT NULL"),
    )
    columns_to_relax = (
        ("name", "VARCHAR(128) NULL"),
        ("region", "VARCHAR(128) NULL"),
        ("price_cny", "DOUBLE NULL"),
        ("price_usd", "DOUBLE NULL"),
    )
    ddl_statements = [
        f"ALTER TABLE vps_plans ADD COLUMN {column} {definition}"
        for column, definition in columns_to_add
    ]
    ddl_statements += [
        f"ALTER TABLE vps_plans MODIFY COLUMN {column} {definition}"
        for column, definition in columns_to_relax
    ]
    try:
        engine = db.engine
        if engine.dialect.name != "mysql":
            return
        with engine.connect() as conn:
            for ddl in ddl_statements:
                try:
                    conn.execute(text(ddl))
                    conn.commit()
                except Exception:
                    # Statement failed (e.g. column already present): undo and move on.
                    conn.rollback()
    except Exception:
        pass  # Ignore a missing table or an unavailable engine.
def _ensure_forum_columns():
    """Best-effort: add later-introduced columns to an existing ``forum_posts`` table.

    Uses MySQL column definitions on MySQL and a generic variant on any other
    dialect. Each statement is committed on its own; a failing statement is
    rolled back and skipped, and any outer failure is silently ignored.
    """
    mysql_ddl = (
        "ALTER TABLE forum_posts ADD COLUMN category VARCHAR(32) NOT NULL DEFAULT '综合讨论'",
        "ALTER TABLE forum_posts ADD COLUMN view_count INT NOT NULL DEFAULT 0",
    )
    generic_ddl = (
        "ALTER TABLE forum_posts ADD COLUMN category TEXT DEFAULT '综合讨论'",
        "ALTER TABLE forum_posts ADD COLUMN view_count INTEGER DEFAULT 0",
    )
    try:
        engine = db.engine
        ddl_statements = mysql_ddl if engine.dialect.name == "mysql" else generic_ddl
        with engine.connect() as conn:
            for ddl in ddl_statements:
                try:
                    conn.execute(text(ddl))
                    conn.commit()
                except Exception:
                    conn.rollback()
    except Exception:
        pass
def _ensure_forum_manage_columns():
    """Best-effort: add moderation columns to the ``users`` and ``forum_posts`` tables.

    Covers the ban fields on users and the pinned/featured/locked flags on
    posts. Uses MySQL column definitions on MySQL and a generic variant on any
    other dialect. A failing statement is rolled back and skipped; any outer
    failure is silently ignored.
    """
    mysql_ddl = (
        "ALTER TABLE users ADD COLUMN is_banned TINYINT(1) NOT NULL DEFAULT 0",
        "ALTER TABLE users ADD COLUMN banned_at DATETIME NULL",
        "ALTER TABLE users ADD COLUMN banned_reason VARCHAR(255) NULL",
        "ALTER TABLE forum_posts ADD COLUMN is_pinned TINYINT(1) NOT NULL DEFAULT 0",
        "ALTER TABLE forum_posts ADD COLUMN is_featured TINYINT(1) NOT NULL DEFAULT 0",
        "ALTER TABLE forum_posts ADD COLUMN is_locked TINYINT(1) NOT NULL DEFAULT 0",
    )
    generic_ddl = (
        "ALTER TABLE users ADD COLUMN is_banned INTEGER DEFAULT 0",
        "ALTER TABLE users ADD COLUMN banned_at DATETIME",
        "ALTER TABLE users ADD COLUMN banned_reason TEXT",
        "ALTER TABLE forum_posts ADD COLUMN is_pinned INTEGER DEFAULT 0",
        "ALTER TABLE forum_posts ADD COLUMN is_featured INTEGER DEFAULT 0",
        "ALTER TABLE forum_posts ADD COLUMN is_locked INTEGER DEFAULT 0",
    )
    try:
        engine = db.engine
        ddl_statements = mysql_ddl if engine.dialect.name == "mysql" else generic_ddl
        with engine.connect() as conn:
            for ddl in ddl_statements:
                try:
                    conn.execute(text(ddl))
                    conn.commit()
                except Exception:
                    conn.rollback()
    except Exception:
        pass
DEFAULT_FORUM_CATEGORIES = [
"综合讨论",
"VPS 评测",
"优惠活动",
"运维经验",
"新手提问",
]
def _ensure_forum_categories_seed():
    """Seed the default forum categories when the category table is empty.

    Categories get ``sort_order`` values 10, 20, 30, ... following the order
    of ``DEFAULT_FORUM_CATEGORIES``. Any failure rolls the session back.
    """
    try:
        if ForumCategory.query.count() > 0:
            return
        db.session.add_all([
            ForumCategory(name=category_name, sort_order=position * 10, is_active=True)
            for position, category_name in enumerate(DEFAULT_FORUM_CATEGORIES, start=1)
        ])
        db.session.commit()
    except Exception:
        db.session.rollback()
def _ensure_price_history_baseline():
    """Create a first price snapshot for plans that have no price history yet.

    Plans with neither a CNY nor a USD price are skipped. Snapshots are tagged
    with source ``"bootstrap"`` so later price changes have a baseline to be
    compared against. Any failure rolls the session back.
    """
    try:
        plans_without_history = (
            db.session.query(VPSPlan)
            .outerjoin(PriceHistory, PriceHistory.plan_id == VPSPlan.id)
            .filter(PriceHistory.id.is_(None))
            .all()
        )
        if not plans_without_history:
            return
        baseline_rows = [
            PriceHistory(
                plan_id=plan.id,
                price_cny=plan.price_cny,
                price_usd=plan.price_usd,
                currency=(plan.currency or "CNY"),
                source="bootstrap",
            )
            for plan in plans_without_history
            if plan.price_cny is not None or plan.price_usd is not None
        ]
        db.session.add_all(baseline_rows)
        db.session.commit()
    except Exception:
        db.session.rollback()
# At import time: create any missing tables, then run the best-effort column
# patches and data seeding helpers for tables that already exist.
with app.app_context():
    db.create_all()
    _ensure_mysql_columns()
    _ensure_forum_columns()
    _ensure_forum_manage_columns()
    _ensure_forum_categories_seed()
    _ensure_price_history_baseline()
ADMIN_PASSWORD = app.config["ADMIN_PASSWORD"]
SITE_URL = app.config["SITE_URL"]
SITE_NAME = app.config["SITE_NAME"]
# 国家/区域标签,供后台表单选择
COUNTRY_TAGS = [
"中国大陆", "中国香港", "中国台湾", "美国", "日本", "新加坡", "韩国",
"德国", "英国", "法国", "荷兰", "加拿大", "澳大利亚", "印度", "其他",
]
PRICE_SOURCE_LABELS = {
"manual": "手工编辑",
"import": "Excel 导入",
"bootstrap": "基线",
}
FORUM_REPORT_REASONS = [
"垃圾广告",
"辱骂攻击",
"违法违规",
"虚假信息",
"其他",
]
FORUM_REPORT_STATUS_LABELS = {
"pending": "待处理",
"processed": "已处理",
"rejected": "已驳回",
}
FORUM_NOTIFICATION_TYPE_LABELS = {
"post_commented": "帖子新评论",
"thread_replied": "主题新回复",
"report_processed": "举报处理结果",
"content_removed": "内容处理通知",
}
# 论坛高频数据短时缓存(进程内)
_FORUM_CACHE_TTL_CATEGORIES = 20.0
_FORUM_CACHE_TTL_SIDEBAR = 15.0
_FORUM_CACHE_TTL_NOTIF_COUNT = 30.0
_FORUM_CATEGORY_CACHE = {}
_FORUM_SIDEBAR_CACHE = {"expires_at": 0.0, "data": None}
_NOTIF_COUNT_CACHE = {} # user_id -> (count, expires_at)
_MARKDOWN_ALLOWED_TAGS = [
"p", "br", "hr",
"h1", "h2", "h3", "h4",
"strong", "em", "del",
"ul", "ol", "li",
"blockquote",
"pre", "code",
"a",
"table", "thead", "tbody", "tr", "th", "td",
]
_MARKDOWN_ALLOWED_ATTRS = {
"a": ["href", "title", "target", "rel"],
"code": ["class"],
"pre": ["class"],
}
_MARKDOWN_EXTENSIONS = [
"fenced_code",
"tables",
"sane_lists",
"nl2br",
]
FORUM_NOTIFICATION_TYPE_LABELS_EN = {
"post_commented": "New comment",
"thread_replied": "New reply",
"report_processed": "Report update",
"content_removed": "Content moderation",
}
# Sitemap 单个文件最大帖子条数(按语言拆分后可稳定低于 50k URL 上限)
SITEMAP_POSTS_PER_FILE = 25000
FORUM_CATEGORY_SEO_COPY = {
"综合讨论": {
"zh": "围绕 VPS 选型、采购和实践经验的综合讨论区。",
"en": "General discussions about VPS planning, buying, and operations.",
},
"VPS 评测": {
"zh": "集中查看 VPS 评测、性能体验与线路反馈。",
"en": "Hands-on VPS reviews, benchmarks, and network feedback.",
},
"优惠活动": {
"zh": "跟踪厂商促销、折扣活动与限时优惠。",
"en": "Track provider promotions, discounts, and limited-time deals.",
},
"运维经验": {
"zh": "分享部署、监控、故障排查与稳定性实践。",
"en": "Operations playbooks for deployment, monitoring, and troubleshooting.",
},
"新手提问": {
"zh": "面向新手的配置建议与入门答疑。",
"en": "Beginner-friendly Q&A for VPS setup and decision making.",
},
}
def _get_lang():
    """Resolve the active UI language and remember it in the session.

    Lookup order: ``lang`` query argument, ``lang`` form field, the value
    stored in the session, then ``"zh"``. Anything other than ``"zh"`` or
    ``"en"`` (after trimming and lower-casing) falls back to ``"zh"``.
    """
    candidate = (
        request.args.get("lang")
        or request.form.get("lang")
        or session.get("lang")
        or "zh"
    )
    normalized = (candidate or "zh").strip().lower()
    resolved = normalized if normalized in ("zh", "en") else "zh"
    session["lang"] = resolved
    return resolved
def _pick_lang(zh_text, en_text, lang=None):
active_lang = lang or _get_lang()
return en_text if active_lang == "en" else zh_text
def _lang_url(lang_code):
    """Build a URL for the current page with the ``lang`` parameter switched.

    The current view arguments and query arguments are carried over. If the
    URL cannot be built for the current endpoint, fall back to the request
    path plus an encoded query string.
    """
    normalized = (lang_code or "").strip().lower()
    target_lang = normalized if normalized in ("zh", "en") else "zh"

    params = dict(request.view_args) if request.view_args else {}
    params.update(request.args.to_dict(flat=True))
    params["lang"] = target_lang

    try:
        if request.endpoint:
            return url_for(request.endpoint, **params)
    except Exception:
        pass
    return "{}?{}".format(request.path, urlencode(params))
def _site_root_url():
    """Return the configured site URL without trailing slashes ("" if unset)."""
    base = SITE_URL or ""
    return base.rstrip("/")
def _absolute_url_for(endpoint, **values):
    """Return ``url_for(endpoint, **values)`` prefixed with the site root URL."""
    root = _site_root_url()
    relative = url_for(endpoint, **values)
    return f"{root}{relative}"
def _public_url(endpoint, lang="zh", **params):
    """Build an absolute public URL for ``endpoint`` in the given language.

    ``None`` values and blank strings in ``params`` are dropped. English URLs
    carry ``lang=en``; for any other language the ``lang`` parameter is
    removed so the default-language URL stays clean.
    """
    values = {
        key: value
        for key, value in params.items()
        if value is not None and not (isinstance(value, str) and not value.strip())
    }
    wants_english = (lang or "zh").strip().lower() == "en"
    if wants_english:
        values["lang"] = "en"
    else:
        values.pop("lang", None)
    return _absolute_url_for(endpoint, **values)
def _alternate_lang_links(endpoint, **params):
    """Map hreflang codes to the public URL of each language variant.

    ``x-default`` points at the Chinese URL.
    """
    links = {}
    for hreflang, lang_code in (("zh-CN", "zh"), ("en-US", "en"), ("x-default", "zh")):
        links[hreflang] = _public_url(endpoint, lang=lang_code, **params)
    return links
def _iso8601_utc(dt):
if not dt:
return None
if dt.tzinfo is None:
aware = dt.replace(tzinfo=timezone.utc)
else:
aware = dt.astimezone(timezone.utc)
return aware.strftime("%Y-%m-%dT%H:%M:%SZ")
def _rfc2822_utc(dt):
if not dt:
return None
if dt.tzinfo is None:
aware = dt.replace(tzinfo=timezone.utc)
else:
aware = dt.astimezone(timezone.utc)
return format_datetime(aware, usegmt=True)
def _plain_excerpt(text, limit=160):
raw = " ".join((text or "").split())
if len(raw) <= limit:
return raw
return "{}".format(raw[:max(limit - 1, 0)].rstrip())
def _forum_category_description(category_name, lang):
    """Return the SEO description for a forum category in the given language.

    A blank category yields the generic forum description. Known categories
    use the preset copy from ``FORUM_CATEGORY_SEO_COPY``; unknown ones get a
    templated sentence that mentions the category name.
    """
    category = (category_name or "").strip()
    if not category:
        return _pick_lang(
            "聚合 VPS 评测、运维经验与采购讨论,帮助团队完成云资源选型。",
            "A VPS community for reviews, operations knowledge, and procurement discussions.",
            lang,
        )
    preset = FORUM_CATEGORY_SEO_COPY.get(category) or {}
    if lang == "en":
        copy_key = "en"
        template = "Community topics tagged '{}' for VPS reviews, operations, and buying decisions."
    else:
        copy_key = "zh"
        template = "浏览“{}”分类下的 VPS 讨论、评测与采购经验。"
    return preset.get(copy_key) or template.format(category)
def _forum_index_keywords(lang, active_tab="latest", selected_category=None):
if lang == "en":
keywords = [
"VPS forum",
"VPS community",
"cloud server reviews",
"VPS buying guide",
"VPS operations",
]
tab_map = {
"latest": "latest VPS topics",
"new": "new VPS posts",
"hot": "popular VPS discussions",
}
else:
keywords = [
"VPS论坛",
"VPS社区",
"云服务器评测",
"VPS采购建议",
"VPS运维经验",
]
tab_map = {
"latest": "最新帖子",
"new": "新帖",
"hot": "热门讨论",
}
tab_keyword = tab_map.get(active_tab)
if tab_keyword:
keywords.append(tab_keyword)
if selected_category:
keywords.append(selected_category)
return ", ".join(dict.fromkeys(keywords))
def _forum_breadcrumb_schema(lang, selected_category=None, post=None, post_url=None):
    """Build a ``BreadcrumbList`` structured-data dict for forum pages.

    The trail is Home -> Forum, optionally followed by the selected category
    and, when both ``post`` and ``post_url`` are given, the post itself.
    Positions are numbered from 1 in trail order.
    """
    trail = [
        (_pick_lang("首页", "Home", lang), _public_url("index", lang=lang)),
        (_pick_lang("论坛", "Forum", lang), _public_url("forum_index", lang=lang)),
    ]
    if selected_category:
        trail.append((
            selected_category,
            _public_url("forum_index", lang=lang, category=selected_category),
        ))
    if post and post_url:
        trail.append((post.title, post_url))

    return {
        "@type": "BreadcrumbList",
        "itemListElement": [
            {
                "@type": "ListItem",
                "position": position,
                "name": crumb_name,
                "item": crumb_url,
            }
            for position, (crumb_name, crumb_url) in enumerate(trail, start=1)
        ],
    }
def _sitemap_alternates(endpoint, **params):
    """Return the language alternates of a URL as ``{"hreflang", "href"}`` dicts."""
    alternates = []
    for hreflang, href in _alternate_lang_links(endpoint, **params).items():
        alternates.append({"hreflang": hreflang, "href": href})
    return alternates
def _build_sitemap_urlset_xml(url_items):
lines = [
'<?xml version="1.0" encoding="UTF-8"?>',
'<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9" '
'xmlns:xhtml="http://www.w3.org/1999/xhtml">',
]
for item in url_items:
lines.append(" <url>")
lines.append(" <loc>{}</loc>".format(xml_escape(item["loc"])))
if item.get("lastmod"):
lines.append(" <lastmod>{}</lastmod>".format(item["lastmod"]))
if item.get("changefreq"):
lines.append(" <changefreq>{}</changefreq>".format(item["changefreq"]))
if item.get("priority"):
lines.append(" <priority>{}</priority>".format(item["priority"]))
for alt in item.get("alternates") or []:
href = alt.get("href")
hreflang = alt.get("hreflang")
if not href or not hreflang:
continue
lines.append(
' <xhtml:link rel="alternate" hreflang="{}" href="{}" />'.format(
xml_escape(hreflang),
xml_escape(href),
)
)
lines.append(" </url>")
lines.append("</urlset>")
return "\n".join(lines)
def _build_sitemap_index_xml(entries):
lines = [
'<?xml version="1.0" encoding="UTF-8"?>',
'<sitemapindex xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">',
]
for item in entries:
lines.append(" <sitemap>")
lines.append(" <loc>{}</loc>".format(xml_escape(item["loc"])))
if item.get("lastmod"):
lines.append(" <lastmod>{}</lastmod>".format(item["lastmod"]))
lines.append(" </sitemap>")
lines.append("</sitemapindex>")
return "\n".join(lines)
def _latest_forum_content_datetime():
    """Return the newest post timestamp, preferring ``updated_at`` over ``created_at`` per post."""
    last_touched = func.coalesce(ForumPost.updated_at, ForumPost.created_at)
    newest = db.session.query(func.max(last_touched))
    return newest.scalar()
def _forum_sitemap_total_pages():
    """Return how many forum sitemap files are needed (always at least 1)."""
    total_posts = ForumPost.query.count()
    full_files, leftover = divmod(total_posts, SITEMAP_POSTS_PER_FILE)
    pages = full_files + (1 if leftover else 0)
    return max(pages, 1)
def _should_noindex_path(path):
target = path or ""
if target.startswith("/admin"):
return True
if target.startswith("/api/"):
return True
if target in {"/login", "/register", "/profile", "/me", "/notifications"}:
return True
if target.startswith("/notification/"):
return True
if target == "/forum/post/new":
return True
if target == "/forum/report":
return True
if target.startswith("/forum/post/") and target.endswith("/edit"):
return True
if target.startswith("/forum/comment/") and target.endswith("/edit"):
return True
return False
@app.after_request
def _append_response_headers(response):
    """Add default security headers and mark private paths as non-indexable.

    Headers that are already set on the response are left untouched, except
    ``X-Robots-Tag``, which is always overwritten for no-index paths.
    """
    default_headers = (
        ("Referrer-Policy", "strict-origin-when-cross-origin"),
        ("X-Content-Type-Options", "nosniff"),
        ("X-Frame-Options", "SAMEORIGIN"),
    )
    for header_name, header_value in default_headers:
        response.headers.setdefault(header_name, header_value)
    if _should_noindex_path(request.path):
        response.headers["X-Robots-Tag"] = "noindex, nofollow, noarchive"
    return response
def _notification_type_label(notif_type, lang=None):
    """Return the display label for a notification type in the active language.

    Unknown types fall back to the raw type string, or to a generic
    "Notification" / "通知" label when the type is empty.
    """
    active_lang = lang or _get_lang()
    if active_lang == "en":
        labels, generic_label = FORUM_NOTIFICATION_TYPE_LABELS_EN, "Notification"
    else:
        labels, generic_label = FORUM_NOTIFICATION_TYPE_LABELS, "通知"
    return labels.get(notif_type, notif_type or generic_label)
@app.template_global("l")
def _template_pick_lang(zh_text, en_text):
    """Template helper ``l(zh, en)``: pick the text matching the session language.

    Only a session language of exactly ``"en"`` selects ``en_text``; any
    other or missing value selects ``zh_text``.
    """
    if session.get("lang", "zh") == "en":
        return en_text
    return zh_text
@app.template_global("lang_url")
def _template_lang_url(lang_code):
    """Template helper ``lang_url(code)``: URL of the current page in another language."""
    switched_url = _lang_url(lang_code)
    return switched_url
def _render_markdown_html(text):
    """Render Markdown to sanitised HTML wrapped in ``Markup``.

    Blank input yields empty markup. When the ``markdown`` or ``bleach``
    package is unavailable, the text is HTML-escaped and shown as a single
    paragraph with newlines turned into ``<br>``, so the app still works
    without those dependencies.
    """
    source = (text or "").strip()
    if not source:
        return Markup("")

    dependencies_missing = py_markdown is None or bleach is None
    if dependencies_missing:
        escaped = str(escape(source)).replace("\n", "<br>")
        return Markup(f"<p>{escaped}</p>")

    rendered = py_markdown.markdown(source, extensions=_MARKDOWN_EXTENSIONS)
    # Restrict the output to the allow-listed tags, attributes and protocols.
    sanitized = bleach.clean(
        rendered,
        tags=_MARKDOWN_ALLOWED_TAGS,
        attributes=_MARKDOWN_ALLOWED_ATTRS,
        protocols=["http", "https", "mailto"],
        strip=True,
    )
    return Markup(sanitized)
@app.template_filter("markdown_html")
def markdown_html_filter(text):
    """Jinja filter ``markdown_html``: render Markdown text to sanitised HTML."""
    rendered = _render_markdown_html(text)
    return rendered
def _get_current_user():
    """Return the logged-in ``User`` for this session, or None.

    If the session refers to a user that can no longer be loaded, the stale
    ``user_id`` is removed from the session.
    """
    session_user_id = session.get("user_id")
    if not session_user_id:
        return None
    account = db.session.get(User, session_user_id)
    if not account:
        session.pop("user_id", None)
    return account
def _is_banned_user(user):
return bool(user and bool(user.is_banned))
def _user_ban_message(user):
if not user:
return "账号状态异常"
reason = (user.banned_reason or "").strip()
if reason:
return "账号已被封禁:{}".format(reason)
return "账号已被封禁"
def _is_valid_username(username):
if not username:
return False
if len(username) < 3 or len(username) > 20:
return False
return all(ch.isalnum() or ch == "_" for ch in username)
def _safe_next_url(default_endpoint):
    """Return a safe same-site redirect target taken from the ``next`` parameter.

    ``next`` is read from the query string or form data. It is accepted only
    when it is a local absolute path: it must start with a single ``/``, and
    it must not contain a backslash or an ASCII control character. Browsers
    treat ``\\`` like ``/`` and drop tab/newline characters while parsing
    URLs, so values such as ``/\\evil.example`` or ``/<TAB>/evil.example``
    would otherwise end up as ``//evil.example`` (an open redirect).

    Args:
        default_endpoint: Endpoint name used when ``next`` is missing or unsafe.

    Returns:
        The validated local path, or ``url_for(default_endpoint)``.
    """
    nxt = (request.values.get("next") or "").strip()
    is_local_path = nxt.startswith("/") and not nxt.startswith("//")
    has_unsafe_chars = "\\" in nxt or any(ord(ch) < 0x20 or ord(ch) == 0x7F for ch in nxt)
    if is_local_path and not has_unsafe_chars:
        return nxt
    return url_for(default_endpoint)
def _safe_form_next_url(default_url):
    """Return a safe same-site redirect target from the form or query ``next`` value.

    The form field takes precedence over the query argument. The value is
    accepted only when it is a local absolute path: it must start with a
    single ``/``, and it must not contain a backslash or an ASCII control
    character. Browsers treat ``\\`` like ``/`` and drop tab/newline
    characters while parsing URLs, so such values could otherwise redirect
    off-site.

    Args:
        default_url: URL returned when ``next`` is missing or unsafe.

    Returns:
        The validated local path, or ``default_url``.
    """
    nxt = (request.form.get("next") or request.args.get("next") or "").strip()
    is_local_path = nxt.startswith("/") and not nxt.startswith("//")
    has_unsafe_chars = "\\" in nxt or any(ord(ch) < 0x20 or ord(ch) == 0x7F for ch in nxt)
    if is_local_path and not has_unsafe_chars:
        return nxt
    return default_url
def _create_notification(
    user_id,
    notif_type,
    message,
    actor_id=None,
    post_id=None,
    comment_id=None,
    report_id=None,
):
    """Queue an unread in-site notification; the caller commits the transaction.

    Does nothing when ``user_id`` or ``message`` is empty. The message is
    truncated to 255 characters, and the recipient's cached unread count is
    dropped so the next read recomputes it.
    """
    if not (user_id and message):
        return
    notification = ForumNotification(
        user_id=user_id,
        actor_id=actor_id,
        notif_type=notif_type,
        post_id=post_id,
        comment_id=comment_id,
        report_id=report_id,
        message=message[:255],
        is_read=False,
    )
    db.session.add(notification)
    _NOTIF_COUNT_CACHE.pop(user_id, None)
def _notification_target_url(notification):
    """Return the URL a notification links to.

    Links to the post detail page when the notification has a ``post_id``,
    otherwise to the notification list. Whether the post still exists is
    deliberately not checked, to avoid one query per notification row.
    """
    linked_post_id = notification.post_id
    if not linked_post_id:
        return url_for("user_notifications")
    return url_for("forum_post_detail", post_id=linked_post_id)
def _load_forum_categories(active_only=True):
    """Load forum categories ordered by ``sort_order`` then ``id``.

    By default only active categories are returned. Any failure yields an
    empty list.
    """
    try:
        query = ForumCategory.query
        if active_only:
            query = query.filter_by(is_active=True)
        ordered = query.order_by(ForumCategory.sort_order.asc(), ForumCategory.id.asc())
        return ordered.all()
    except Exception:
        return []
def _get_forum_category_names(active_only=True):
    """Return forum category names, served from a short-lived in-process cache.

    Lookup order on a cache miss:
      1. categories matching ``active_only``;
      2. when ``active_only`` is set and nothing is active, all categories,
         so front-end dropdowns are never empty;
      3. the built-in ``DEFAULT_FORUM_CATEGORIES``.
    The first non-empty result is cached under the requested key.
    """
    cache_key = "active" if active_only else "all"
    now_ts = monotonic()
    entry = _FORUM_CATEGORY_CACHE.get(cache_key)
    if entry and entry[0] > now_ts:
        return list(entry[1])

    def _remember(names):
        # Cache an immutable copy; hand the list itself back to the caller.
        _FORUM_CATEGORY_CACHE[cache_key] = (
            now_ts + _FORUM_CACHE_TTL_CATEGORIES,
            tuple(names),
        )
        return names

    lookups = (True, False) if active_only else (False,)
    for only_active in lookups:
        found = [
            row.name
            for row in _load_forum_categories(active_only=only_active)
            if row.name
        ]
        if found:
            return _remember(found)
    return _remember(list(DEFAULT_FORUM_CATEGORIES))
def _get_notifications_unread_count(user_id):
    """Return the unread notification count for a user, cached briefly per user.

    The cache avoids one COUNT query per request. Returns 0 when ``user_id``
    is empty.
    """
    if not user_id:
        return 0
    now_ts = monotonic()
    cached = _NOTIF_COUNT_CACHE.get(user_id)
    if cached is not None:
        cached_count, expires_at = cached
        if expires_at > now_ts:
            return cached_count
    unread = ForumNotification.query.filter_by(user_id=user_id, is_read=False).count()
    _NOTIF_COUNT_CACHE[user_id] = (unread, now_ts + _FORUM_CACHE_TTL_NOTIF_COUNT)
    return unread
@app.context_processor
def inject_global_user():
    """Provide the variables shared by all templates.

    Supplies the current user, admin login flag, active forum category names,
    report reasons, unread notification count, and active language.
    """
    lang = _get_lang()
    current_user = _get_current_user()
    viewer_id = current_user.id if current_user else None
    unread_total = _get_notifications_unread_count(viewer_id)
    context = {
        "current_user": current_user,
        "admin_logged_in": bool(session.get("admin_logged_in")),
        "forum_categories": _get_forum_category_names(active_only=True),
        "forum_report_reasons": FORUM_REPORT_REASONS,
        "notifications_unread_count": unread_total,
        "lang": lang,
    }
    return context
def _humanize_time(dt, lang=None):
if not dt:
return ""
active_lang = lang or session.get("lang", "zh")
if dt.tzinfo is None:
# 兼容历史“无时区”时间:按 UTC 解释后与当前 UTC 进行比较,避免 utcnow 弃用告警
dt = dt.replace(tzinfo=timezone.utc)
now = datetime.now(timezone.utc)
else:
now = datetime.now(dt.tzinfo)
delta = now - dt
seconds = int(delta.total_seconds())
if seconds < 0:
return dt.strftime("%Y-%m-%d")
if seconds < 60:
return "just now" if active_lang == "en" else "刚刚"
if seconds < 3600:
mins = seconds // 60
return "{}m ago".format(mins) if active_lang == "en" else "{} 分钟前".format(mins)
if seconds < 86400:
hours = seconds // 3600
return "{}h ago".format(hours) if active_lang == "en" else "{} 小时前".format(hours)
if seconds < 86400 * 14:
days = seconds // 86400
return "{}d ago".format(days) if active_lang == "en" else "{} 天前".format(days)
return dt.strftime("%Y-%m-%d")
def _build_forum_post_cards(rows, lang=None):
    """Convert forum listing query rows into card dicts for the templates.

    Each row is a 6-tuple ``(post, reply_count, latest_activity, author_name,
    like_count, bookmark_count)``. Missing counts become 0, a missing activity
    time falls back to the post's creation time, and a missing author name is
    replaced by a generic localised label.
    """
    active_lang = lang or _get_lang()

    def _to_card(row):
        post, reply_count, latest_activity, author_name, like_count, bookmark_count = row
        last_active = latest_activity or post.created_at
        display_name = author_name or _pick_lang("用户", "User", active_lang)
        initial = display_name[0] if display_name else "?"
        return {
            "post": post,
            "reply_count": int(reply_count or 0),
            "view_count": int(post.view_count or 0),
            "like_count": int(like_count or 0),
            "bookmark_count": int(bookmark_count or 0),
            "latest_activity": last_active,
            "latest_activity_text": _humanize_time(last_active, lang=active_lang),
            "author_name": display_name,
            "author_initial": initial.upper(),
        }

    return [_to_card(row) for row in rows]
def _build_forum_url(tab="latest", category=None, q=None, page=1, per_page=20):
    """Build a forum index URL, leaving out parameters that hold default values.

    Omitted: ``tab`` when it is "latest" or empty, ``page`` when it is 1 or
    less, ``per_page`` when it is 20, and empty ``category`` / ``q``.
    """
    params = {}
    if tab and tab != "latest":
        params["tab"] = tab
    if category:
        params["category"] = category
    if q:
        params["q"] = q

    page_number = int(page) if page else 0
    if page_number > 1:
        params["page"] = page_number

    page_size = int(per_page) if per_page else 20
    if page_size != 20:
        params["per_page"] = page_size
    return url_for("forum_index", **params)
def _query_forum_post_rows(active_tab="latest", selected_category=None, search_query=None, author_id=None):
    """Build the forum listing query (not yet executed).

    Each result row is ``(ForumPost, comment_count, latest_activity,
    author_name, like_count, bookmark_count)``. Supports filtering by
    category, author, and a keyword matched against title, content, or author
    username. Pinned posts always sort first; the remaining order depends on
    ``active_tab``:
      - "hot": comments, likes, views, then latest activity;
      - "new": creation time;
      - anything else: latest activity.
    """
    # Per-post aggregates, each outer-joined so posts without rows still appear.
    comments_sq = (
        db.session.query(
            ForumComment.post_id.label("post_id"),
            func.count(ForumComment.id).label("comment_count"),
            func.max(ForumComment.created_at).label("latest_comment_at"),
        )
        .group_by(ForumComment.post_id)
        .subquery()
    )
    likes_sq = (
        db.session.query(
            ForumPostLike.post_id.label("post_id"),
            func.count(ForumPostLike.id).label("like_count"),
        )
        .group_by(ForumPostLike.post_id)
        .subquery()
    )
    bookmarks_sq = (
        db.session.query(
            ForumPostBookmark.post_id.label("post_id"),
            func.count(ForumPostBookmark.id).label("bookmark_count"),
        )
        .group_by(ForumPostBookmark.post_id)
        .subquery()
    )

    replies = func.coalesce(comments_sq.c.comment_count, 0)
    # A post without comments counts its creation time as its latest activity.
    last_active = func.coalesce(comments_sq.c.latest_comment_at, ForumPost.created_at)
    likes = func.coalesce(likes_sq.c.like_count, 0)
    bookmarks = func.coalesce(bookmarks_sq.c.bookmark_count, 0)

    query = db.session.query(
        ForumPost,
        replies.label("comment_count"),
        last_active.label("latest_activity"),
        User.username.label("author_name"),
        likes.label("like_count"),
        bookmarks.label("bookmark_count"),
    )
    query = query.outerjoin(comments_sq, comments_sq.c.post_id == ForumPost.id)
    query = query.outerjoin(likes_sq, likes_sq.c.post_id == ForumPost.id)
    query = query.outerjoin(bookmarks_sq, bookmarks_sq.c.post_id == ForumPost.id)
    query = query.outerjoin(User, User.id == ForumPost.user_id)

    if selected_category:
        query = query.filter(ForumPost.category == selected_category)
    if author_id is not None:
        query = query.filter(ForumPost.user_id == author_id)
    if search_query:
        pattern = f"%{search_query}%"
        query = query.filter(
            or_(
                ForumPost.title.ilike(pattern),
                ForumPost.content.ilike(pattern),
                User.username.ilike(pattern),
            )
        )

    if active_tab == "hot":
        ranking = (
            replies.desc(),
            likes.desc(),
            ForumPost.view_count.desc(),
            last_active.desc(),
        )
    elif active_tab == "new":
        ranking = (ForumPost.created_at.desc(),)
    else:
        ranking = (last_active.desc(),)
    # Pinned posts first; post id is the final tie-breaker.
    return query.order_by(ForumPost.is_pinned.desc(), *ranking, ForumPost.id.desc())
def _forum_sidebar_data():
    """Return forum sidebar statistics, cached in-process for a short time.

    The result holds total users/posts/comments, post counts per category
    (largest first), and up to six users ordered by post count. A shallow
    copy of the cached dict is returned on every call.
    """
    now_ts = monotonic()
    cached = _FORUM_SIDEBAR_CACHE.get("data")
    still_fresh = _FORUM_SIDEBAR_CACHE.get("expires_at", 0.0) > now_ts
    if cached is not None and still_fresh:
        return dict(cached)

    posts_per_category = func.count(ForumPost.id)
    category_counts = (
        db.session.query(ForumPost.category, posts_per_category)
        .group_by(ForumPost.category)
        .order_by(posts_per_category.desc())
        .all()
    )

    posts_per_user = func.count(ForumPost.id)
    active_users = (
        db.session.query(User.username, posts_per_user.label("post_count"))
        .outerjoin(ForumPost, ForumPost.user_id == User.id)
        .group_by(User.id)
        .order_by(posts_per_user.desc(), User.created_at.asc())
        .limit(6)
        .all()
    )

    data = {
        "total_users": User.query.count(),
        "total_posts": ForumPost.query.count(),
        "total_comments": ForumComment.query.count(),
        "category_counts": list(category_counts),
        "active_users": list(active_users),
    }
    _FORUM_SIDEBAR_CACHE["data"] = data
    _FORUM_SIDEBAR_CACHE["expires_at"] = now_ts + _FORUM_CACHE_TTL_SIDEBAR
    return dict(data)
def _count_forum_posts(selected_category=None, search_query=None, author_id=None):
    """Count forum posts matching the listing filters.

    Runs a dedicated lightweight COUNT instead of counting the heavy listing
    query, to avoid slow queries. Accepts the same category, author, and
    keyword filters as the listing query.
    """
    counter = (
        db.session.query(func.count(ForumPost.id))
        .select_from(ForumPost)
        .outerjoin(User, User.id == ForumPost.user_id)
    )
    conditions = []
    if selected_category:
        conditions.append(ForumPost.category == selected_category)
    if author_id is not None:
        conditions.append(ForumPost.user_id == author_id)
    if search_query:
        pattern = f"%{search_query}%"
        conditions.append(
            or_(
                ForumPost.title.ilike(pattern),
                ForumPost.content.ilike(pattern),
                User.username.ilike(pattern),
            )
        )
    if conditions:
        counter = counter.filter(*conditions)
    total = counter.scalar()
    return int(total or 0)
def _currency_symbol(currency):
return "¥" if (currency or "CNY").upper() == "CNY" else "$"
def _format_money(currency, value):
    """Format ``value`` with two decimals, prefixed by the currency symbol."""
    symbol = _currency_symbol(currency)
    amount = float(value)
    return f"{symbol}{amount:.2f}"
def _format_history_time(dt):
return dt.strftime("%Y-%m-%d %H:%M") if dt else ""
def _pick_price_pair(latest, previous=None):
if previous is None:
if latest.price_cny is not None:
return "CNY", float(latest.price_cny), None
if latest.price_usd is not None:
return "USD", float(latest.price_usd), None
return None, None, None
if latest.price_cny is not None and previous.price_cny is not None:
return "CNY", float(latest.price_cny), float(previous.price_cny)
if latest.price_usd is not None and previous.price_usd is not None:
return "USD", float(latest.price_usd), float(previous.price_usd)
return None, None, None
def _build_price_trend(latest, previous=None):
    """Describe how a plan's price moved between its two newest snapshots.

    Args:
        latest: Newest price snapshot (reads ``price_cny``, ``price_usd``,
            ``source``, ``captured_at``).
        previous: The snapshot before it, or ``None`` when there is only one.

    Returns:
        ``None`` when no comparable price exists; otherwise a dict with
        ``direction`` ("new", "flat", "up" or "down"), ``delta_text`` (the
        human-readable change) and ``meta_text`` (current price, capture time
        and source label).
    """
    currency, current_value, previous_value = _pick_price_pair(latest, previous)
    if currency is None or current_value is None:
        return None
    source = PRICE_SOURCE_LABELS.get(latest.source or "", latest.source or "未知来源")
    meta = "当前 {} · {} · {}".format(
        _format_money(currency, current_value),
        _format_history_time(latest.captured_at),
        source,
    )
    if previous_value is None:
        return {
            "direction": "new",
            "delta_text": "首次记录",
            "meta_text": meta,
        }
    diff = current_value - previous_value
    if abs(diff) < 1e-9:
        return {
            "direction": "flat",
            "delta_text": "→ 持平",
            "meta_text": meta,
        }
    rising = diff > 0
    direction = "up" if rising else "down"
    # Both branches used to be empty strings, which left a leading space and
    # no direction glyph. Use arrows that match the "→" of the flat case.
    arrow = "↑" if rising else "↓"
    sign = "+" if rising else "-"
    delta_text = "{} {}{}{:,.2f}".format(arrow, sign, _currency_symbol(currency), abs(diff))
    if abs(previous_value) > 1e-9:
        # A percentage only makes sense against a non-zero previous price.
        pct = diff / previous_value * 100
        delta_text += " ({:+.2f}%)".format(pct)
    return {
        "direction": direction,
        "delta_text": delta_text,
        "meta_text": meta,
    }
def _build_plan_trend_map(plans):
    """Return ``{plan_id: trend}`` for the given plans, loaded with one query.

    For each plan only the two newest price-history rows are kept (ordered by
    capture time, then id) and compared. Plans without a usable trend are
    left out of the result.
    """
    plan_ids = [plan.id for plan in plans if plan.id is not None]
    if not plan_ids:
        return {}

    history_rows = (
        PriceHistory.query
        .filter(PriceHistory.plan_id.in_(plan_ids))
        .order_by(PriceHistory.plan_id.asc(), PriceHistory.captured_at.desc(), PriceHistory.id.desc())
        .all()
    )

    # Rows arrive newest-first per plan, so the first two seen are the ones to compare.
    newest_two = {}
    for record in history_rows:
        pair = newest_two.setdefault(record.plan_id, [])
        if len(pair) < 2:
            pair.append(record)

    trends = {}
    for plan_id, pair in newest_two.items():
        if not pair:
            continue
        previous = pair[1] if len(pair) > 1 else None
        trend = _build_price_trend(pair[0], previous)
        if trend:
            trends[plan_id] = trend
    return trends
def admin_required(f):
    """Decorator: redirect to the admin login page unless an admin session is active."""
    from functools import wraps

    @wraps(f)
    def guarded(*args, **kwargs):
        if session.get("admin_logged_in"):
            return f(*args, **kwargs)
        return redirect(url_for("admin_login"))

    return guarded
def user_login_required(f):
    """Decorator: require a logged-in user who is not banned.

    Anonymous visitors are redirected to the login page with ``next`` set to
    the current path. Banned users are logged out and redirected there too,
    with the ban message passed as ``error``.
    """
    from functools import wraps

    @wraps(f)
    def guarded(*args, **kwargs):
        user = _get_current_user()
        if not user:
            return redirect(url_for("user_login", next=request.path))
        if not _is_banned_user(user):
            return f(*args, **kwargs)
        session.pop("user_id", None)
        ban_notice = _user_ban_message(user)
        return redirect(url_for("user_login", next=request.path, error=ban_notice))

    return guarded
def _ensure_forum_interaction_user(user, post_id=None):
    """Check whether the current user may interact with the forum.

    Returns None when the user is allowed. For a banned user, returns a
    redirect carrying the ban message: to the post detail page when
    ``post_id`` is given, otherwise to the forum index.
    """
    if not _is_banned_user(user):
        return None
    ban_notice = _user_ban_message(user)
    if not post_id:
        return redirect(url_for("forum_index", error=ban_notice))
    return _forum_redirect_with_error(post_id, ban_notice)
def _can_edit_post(user, post):
if not user or not post:
return False
return post.user_id == user.id
def _can_edit_comment(user, comment):
if not user or not comment:
return False
return comment.user_id == user.id
def _forum_redirect_with_error(post_id, text_msg):
    """Redirect to the post detail page, passing ``text_msg`` as the ``error`` parameter."""
    target = url_for("forum_post_detail", post_id=post_id, error=text_msg)
    return redirect(target)
def _forum_redirect_with_msg(post_id, text_msg):
    """Redirect to the post detail page, passing ``text_msg`` as the ``msg`` parameter."""
    target = url_for("forum_post_detail", post_id=post_id, msg=text_msg)
    return redirect(target)
# Home page i18n copy (Chinese / English).
I18N = {
"zh": {
"meta_title": "全球 VPS 价格与配置对比 | 云价眼",
"meta_description": "面向技术与采购团队的云服务器价格情报平台:统一对比主流厂商 VPS 月付价格、配置与区域,支持快速筛选并直达官方购买页。",
"meta_keywords": "VPS价格对比,云服务器采购,云主机报价,云厂商比价,企业云成本,阿里云腾讯云DigitalOceanVultr",
"og_title": "云价眼 | 全球 VPS 价格与配置决策台",
"og_description": "为团队采购与技术选型提供可比价的云服务器数据视图,快速定位成本与性能平衡点。",
"og_locale": "zh_CN",
"schema_webapp_description": "面向团队采购与技术选型的 VPS 价格与配置对比平台。",
"schema_table_about": "云价眼 - 全球 VPS 价格与配置决策台",
"schema_table_name": "VPS 价格与配置对比表",
"schema_table_description": "主流云厂商 VPS 方案的配置、区域与月付价格数据",
"tagline": "面向团队采购的云服务器价格情报",
"hero_kicker": "企业云资源采购情报",
"hero_title": "全球 VPS 价格与配置决策台",
"hero_lede": "聚合主流云厂商公开报价,统一月付口径与配置维度,帮助技术与采购团队更快完成方案筛选与预算评估。",
"hero_trust_1": "主流云厂商持续收录",
"hero_trust_2": "统一月付与配置口径",
"hero_trust_3": "直达官方购买与文档",
"metric_total_plans": "可比较方案",
"metric_providers": "覆盖厂商",
"metric_regions": "覆盖区域",
"metric_lowest": "筛选后最低月价",
"filters_title": "采购筛选控制台",
"filters_subtitle": "按厂商、区域、资源规格与预算快速收敛候选方案。",
"table_caption": "价格与配置根据筛选条件实时刷新,用于初步比选与预算评估。",
"filter_provider": "供应商",
"filter_region": "区域市场",
"filter_memory": "内存 ≥",
"filter_price": "价格区间",
"filter_currency": "计价货币",
"search_placeholder": "搜索供应商、方案或区域...",
"all": "全部",
"unlimited": "不限",
"btn_reset": "清空筛选",
"btn_visit": "查看官网",
"th_provider": "供应商",
"th_country": "区域",
"th_config": "实例规格",
"th_vcpu": "vCPU",
"th_memory": "内存",
"th_storage": "存储",
"th_bandwidth": "带宽",
"th_traffic": "流量",
"th_price": "月付参考价",
"th_action": "官方链接",
"disclaimer": "* 数据来自公开页面与规则换算,可能存在时差或促销偏差;下单前请以厂商官网实时价格与条款为准。",
"footer_note": "仅作采购调研参考 · 请以各云厂商官网实时价格为准",
"contact_label": "联系我们",
"empty_state": "未找到匹配的方案",
"load_error": "数据加载失败,请刷新页面重试",
"search_label": "关键词检索",
"result_count_pattern": "当前筛选:{visible} / {total} 个方案",
"price_under50": "< ¥50",
"price_50_100": "¥50-100",
"price_100_300": "¥100-300",
"price_300_500": "¥300-500",
"price_over500": "> ¥500",
"cny": "人民币 (¥)",
"usd": "美元 ($)",
"no_js_note": "已显示基础数据表;开启 JavaScript 后可使用实时筛选、排序和动态统计。",
"faq_title": "常见问题(采购前必看)",
"faq_intro": "以下信息用于预算与方案初筛,正式采购前请再次核对厂商官网。",
"faq_q1": "价格和配置数据多久更新一次?",
"faq_a1": "平台持续维护公开报价源,后台更新后会同步刷新展示与 API 缓存。",
"faq_q2": "表格价格能直接作为合同报价吗?",
"faq_a2": "不能。页面数据用于调研与比选,实际价格、账单周期与折扣条款请以厂商官网和销售合同为准。",
"faq_q3": "如何快速筛选适合企业业务的方案?",
"faq_a3": "建议先按区域和预算过滤,再结合 vCPU、内存、存储和带宽指标缩小候选范围最后进入厂商官网确认 SLA 与网络质量。",
"cta_title": "需要更深度的采购建议?",
"cta_lede": "在社区论坛提交需求场景,或直接联系站点维护者获取更新建议。",
"cta_primary": "进入社区论坛",
"cta_secondary": "联系维护者",
},
"en": {
"meta_title": "Global VPS Pricing & Configuration Comparison | VPS Price",
"meta_description": "Pricing intelligence for engineering and procurement teams: compare VPS monthly costs, specs, and regions across major providers with normalized criteria.",
"meta_keywords": "VPS pricing comparison,cloud server procurement,provider pricing benchmark,cloud cost planning,infrastructure buying",
"og_title": "VPS Price | Global VPS Pricing Decision Console",
"og_description": "A procurement-ready view of VPS pricing and specs across major providers for faster, more confident infrastructure decisions.",
"og_locale": "en_US",
"schema_webapp_description": "A pricing and configuration comparison platform for VPS procurement and technical planning.",
"schema_table_about": "VPS Price - Global VPS Pricing Decision Console",
"schema_table_name": "VPS Pricing and Configuration Table",
"schema_table_description": "Comparable monthly pricing, specs, and region data across mainstream VPS providers",
"tagline": "Cloud pricing intelligence for engineering and procurement teams",
"hero_kicker": "Enterprise Infrastructure Intelligence",
"hero_title": "Global VPS Pricing Decision Console",
"hero_lede": "Aggregate public VPS offers, normalize monthly pricing and specs, and help engineering and procurement teams shortlist options faster.",
"hero_trust_1": "Major providers continuously tracked",
"hero_trust_2": "Normalized monthly pricing and specs",
"hero_trust_3": "Direct links to official purchase pages",
"metric_total_plans": "Comparable Plans",
"metric_providers": "Providers Covered",
"metric_regions": "Regions Covered",
"metric_lowest": "Lowest Monthly Price",
"filters_title": "Procurement Filter Console",
"filters_subtitle": "Narrow candidates by provider, region, resource profile, and budget range.",
"table_caption": "Pricing and specs refresh in real time based on active filters for quicker shortlist decisions.",
"filter_provider": "Provider",
"filter_region": "Region",
"filter_memory": "Memory ≥",
"filter_price": "Price range",
"filter_currency": "Currency",
"search_placeholder": "Search provider, plan, or region...",
"all": "All",
"unlimited": "Any",
"btn_reset": "Clear filters",
"btn_visit": "Visit Site",
"th_provider": "Provider",
"th_country": "Region",
"th_config": "Plan Spec",
"th_vcpu": "vCPU",
"th_memory": "Memory",
"th_storage": "Storage",
"th_bandwidth": "Bandwidth",
"th_traffic": "Traffic",
"th_price": "Monthly Price",
"th_action": "Official Link",
"disclaimer": "* Data is compiled from public sources and normalization rules. Final billing terms and live pricing are determined by each provider.",
"footer_note": "For research and shortlisting only. Always verify latest pricing on official provider websites.",
"contact_label": "Contact",
"empty_state": "No matching plans found",
"load_error": "Failed to load data. Please refresh.",
"search_label": "Keyword Search",
"result_count_pattern": "Showing {visible} of {total} plans",
"price_under50": "< 50",
"price_50_100": "50-100",
"price_100_300": "100-300",
"price_300_500": "300-500",
"price_over500": "> 500",
"cny": "CNY (¥)",
"usd": "USD ($)",
"no_js_note": "Base table data is already visible. Enable JavaScript for live filters, sorting, and dynamic metrics.",
"faq_title": "FAQ for Procurement Teams",
"faq_intro": "Use these answers for shortlisting. Re-check vendor websites before placing orders.",
"faq_q1": "How often are pricing and spec records updated?",
"faq_a1": "The platform continuously maintains public pricing sources. Admin updates refresh both page rendering and API cache.",
"faq_q2": "Can listed prices be treated as final contract quotes?",
"faq_a2": "No. This site is for research and shortlisting. Final pricing, billing cycles, and discounts are defined by each provider and contract.",
"faq_q3": "How should we shortlist plans for business workloads?",
"faq_a3": "Start with region and budget filters, then narrow by vCPU, memory, storage, and bandwidth. Validate SLA and network quality on the provider site.",
"cta_title": "Need Deeper Buying Guidance?",
"cta_lede": "Post your workload requirements in the community forum or contact the site maintainer directly.",
"cta_primary": "Open Community Forum",
"cta_secondary": "Contact Maintainer",
},
}
def _query_plans_for_display():
    """Return every VPS plan, ordered by provider and then by CNY price.

    The ``provider_rel`` relationship is joined eagerly so that calling
    ``to_dict()`` on each plan afterwards does not cause N+1 queries.
    """
    eager_provider = joinedload(VPSPlan.provider_rel)
    ordered_query = VPSPlan.query.options(eager_provider).order_by(
        VPSPlan.provider,
        VPSPlan.price_cny,
    )
    return ordered_query.all()
# Short-lived in-process cache for /api/plans; the TTL is expressed in seconds.
_API_PLANS_CACHE_TTL = 60
# "data" holds the last serialized plan list (None until first filled);
# "expires_at" is compared against time.monotonic() by the /api/plans handler.
_API_PLANS_CACHE = {"data": None, "expires_at": 0.0}
def _invalidate_plans_cache():
    """Expire the /api/plans cache.

    Intended to be called after plans are created, updated or deleted in the
    admin area so the next API request rebuilds the payload.
    """
    _API_PLANS_CACHE.update(expires_at=0.0)
def _build_home_faq_items(t):
return [
{"question": t["faq_q1"], "answer": t["faq_a1"]},
{"question": t["faq_q2"], "answer": t["faq_a2"]},
{"question": t["faq_q3"], "answer": t["faq_a3"]},
]
def _build_home_schema(lang, t, canonical_url, plans_data, faq_items):
    """Build the schema.org JSON-LD document for the home page.

    Args:
        lang: Active language code; ``"en"`` maps to ``en-US``, anything else
            to ``zh-CN``.
        t: Translation table providing ``meta_title``, ``meta_description``,
            ``schema_table_name`` and ``schema_table_description``.
        canonical_url: Canonical URL of the home page.
        plans_data: List of plan dicts; only the first 30 are exposed.
        faq_items: List of ``{"question", "answer"}`` dicts.

    Returns:
        A dict with ``@context`` and an ``@graph`` holding Organization,
        WebSite, WebPage, ItemList and FAQPage nodes.
    """
    locale_tag = "en-US" if lang == "en" else "zh-CN"
    root_url = _site_root_url()
    logo_url = _absolute_url_for("static", filename="img/site-logo.svg")
    og_image_url = _absolute_url_for("static", filename="img/site-logo-mark.svg")

    list_elements = []
    for position, plan in enumerate(plans_data[:30], start=1):
        brand_name = (plan.get("provider") or "").strip()
        plan_title = (plan.get("name") or "").strip()
        display_name = "{} {}".format(brand_name, plan_title).strip()
        if not display_name:
            # Neither provider nor plan name is available: use a numbered placeholder.
            display_name = "VPS Plan {}".format(position)

        product = {
            "@type": "Product",
            "name": display_name,
            "brand": {"@type": "Brand", "name": brand_name or SITE_NAME},
        }

        region = (plan.get("countries") or "").strip()
        if region:
            product["category"] = region

        vendor_url = (plan.get("official_url") or "").strip()
        if vendor_url:
            product["url"] = vendor_url

        # CNY takes precedence; USD is used only when no CNY price exists.
        price_value = None
        currency = None
        if plan.get("price_cny") is not None:
            price_value = float(plan["price_cny"])
            currency = "CNY"
        elif plan.get("price_usd") is not None:
            price_value = float(plan["price_usd"])
            currency = "USD"

        # An Offer is attached only when a price is known.
        if price_value is not None:
            product["offers"] = {
                "@type": "Offer",
                "url": vendor_url or canonical_url,
                "price": price_value,
                "priceCurrency": currency,
            }

        list_elements.append({
            "@type": "ListItem",
            "position": position,
            "item": product,
        })

    faq_questions = []
    for entry in faq_items:
        faq_questions.append({
            "@type": "Question",
            "name": entry["question"],
            "acceptedAnswer": {"@type": "Answer", "text": entry["answer"]},
        })

    organization_node = {
        "@type": "Organization",
        "@id": "{}#org".format(root_url),
        "name": SITE_NAME,
        "url": root_url,
        "logo": logo_url,
    }
    website_node = {
        "@type": "WebSite",
        "@id": "{}#website".format(root_url),
        "url": root_url,
        "name": SITE_NAME,
        "inLanguage": locale_tag,
    }
    webpage_node = {
        "@type": "WebPage",
        "@id": "{}#home".format(canonical_url),
        "url": canonical_url,
        "name": t["meta_title"],
        "description": t["meta_description"],
        "inLanguage": locale_tag,
        "primaryImageOfPage": og_image_url,
    }
    item_list_node = {
        "@type": "ItemList",
        "name": t["schema_table_name"],
        "description": t["schema_table_description"],
        "itemListElement": list_elements,
    }
    faq_node = {
        "@type": "FAQPage",
        "mainEntity": faq_questions,
    }
    return {
        "@context": "https://schema.org",
        "@graph": [
            organization_node,
            website_node,
            webpage_node,
            item_list_node,
            faq_node,
        ],
    }
@app.route("/")
def index():
    """Render the home page with plan data, SEO metadata and JSON-LD schema."""
    lang = _get_lang()
    t = I18N[lang]
    plans_data = [plan.to_dict() for plan in _query_plans_for_display()]
    canonical_url = _public_url("index", lang=lang)
    alternate_links = _alternate_lang_links("index")
    faq_items = _build_home_faq_items(t)

    seo = {
        "title": t["meta_title"],
        "description": t["meta_description"],
        "keywords": t["meta_keywords"],
        "canonical_url": canonical_url,
        "robots": "index,follow,max-image-preview:large,max-snippet:-1,max-video-preview:-1",
        "og_type": "website",
        "og_url": canonical_url,
        "og_title": t["og_title"],
        "og_description": t["og_description"],
        "og_locale": t["og_locale"],
        "og_image": _absolute_url_for("static", filename="img/site-logo-mark.svg"),
        "twitter_card": "summary_large_image",
        "twitter_title": t["og_title"],
        "twitter_description": t["og_description"],
        "alternate_links": alternate_links,
    }

    template_context = {
        "site_url": _site_root_url(),
        "site_name": SITE_NAME,
        # The same serialized plan list feeds both the template and the schema.
        "initial_plans_json": plans_data,
        "faq_items": faq_items,
        "seo": seo,
        "seo_schema": _build_home_schema(
            lang=lang,
            t=t,
            canonical_url=canonical_url,
            plans_data=plans_data,
            faq_items=faq_items,
        ),
        "lang": lang,
        "t": t,
    }
    return render_template("index.html", **template_context)
@app.route("/assets/<path:filename>")
def legacy_assets(filename):
    """Serve legacy ``/assets/*`` links that appear in historical content.

    - If the requested file exists under ``static/assets`` it is returned.
    - Otherwise the request is redirected (302) to the site logo mark, which
      keeps the front end free of 404 noise.
    """
    base_dir = os.path.join(app.static_folder or "", "assets")
    base_abs = os.path.abspath(base_dir)
    target_abs = os.path.abspath(os.path.normpath(os.path.join(base_dir, filename)))

    # The resolved path must stay strictly inside the assets directory.
    stays_inside = target_abs.startswith(base_abs + os.sep)
    if not (stays_inside and os.path.isfile(target_abs)):
        return redirect(url_for("static", filename="img/site-logo-mark.svg"), code=302)

    return send_from_directory(base_abs, os.path.relpath(target_abs, base_abs))
@app.route("/api/plans")
def api_plans():
    """Return all plans as JSON, served from a short-lived in-process cache.

    The cached payload is reused while it has not expired; otherwise the plans
    are re-queried and the cache is refilled. Both paths send the same public
    ``Cache-Control`` header.
    """
    now_ts = monotonic()
    payload = _API_PLANS_CACHE.get("data")
    still_fresh = payload is not None and _API_PLANS_CACHE.get("expires_at", 0.0) > now_ts

    if not still_fresh:
        payload = [plan.to_dict() for plan in _query_plans_for_display()]
        _API_PLANS_CACHE["data"] = payload
        _API_PLANS_CACHE["expires_at"] = now_ts + _API_PLANS_CACHE_TTL

    resp = jsonify(payload)
    resp.headers["Cache-Control"] = "public, max-age=%d" % _API_PLANS_CACHE_TTL
    return resp
# ---------- 前台用户与论坛 ----------
@app.route("/register", methods=["GET", "POST"])
def user_register():
    """Register a front-end user account.

    A logged-in, non-banned user is sent straight to the forum; a banned one
    has ``user_id`` removed from the session first. On POST the form is
    validated (username format, password length, confirmation, uniqueness) and
    the new user is created and logged in.
    """
    lang = _get_lang()
    existing = _get_current_user()
    if existing:
        if not _is_banned_user(existing):
            return redirect(url_for("forum_index"))
        session.pop("user_id", None)

    error = None
    if request.method == "POST":
        form = request.form
        username = (form.get("username") or "").strip()
        password = form.get("password") or ""
        confirm_password = form.get("confirm_password") or ""

        if not _is_valid_username(username):
            error = _pick_lang(
                "用户名需为 3-20 位,仅支持字母、数字、下划线",
                "Username must be 3-20 chars (letters, numbers, underscore).",
                lang,
            )
        elif len(password) < 6:
            error = _pick_lang("密码至少 6 位", "Password must be at least 6 characters.", lang)
        elif password != confirm_password:
            error = _pick_lang("两次输入的密码不一致", "Passwords do not match.", lang)
        elif User.query.filter(func.lower(User.username) == username.lower()).first():
            # Uniqueness is checked case-insensitively.
            error = _pick_lang("用户名已存在", "Username already exists.", lang)
        else:
            new_user = User(username=username)
            new_user.set_password(password)
            new_user.last_login_at = datetime.now(timezone.utc)
            db.session.add(new_user)
            db.session.commit()
            # Log the new account in immediately.
            session["user_id"] = new_user.id
            return redirect(_safe_next_url("forum_index"))

    return render_template("auth/register.html", error=error)
@app.route("/login", methods=["GET", "POST"])
def user_login():
    """Log a front-end user in.

    A logged-in, non-banned user is redirected to the forum; a banned one has
    ``user_id`` removed from the session. The initial error text may come from
    the ``error`` query parameter. On POST the credentials are checked
    (username lookup is case-insensitive) and banned accounts are refused.
    """
    lang = _get_lang()
    existing = _get_current_user()
    if existing:
        if not _is_banned_user(existing):
            return redirect(url_for("forum_index"))
        session.pop("user_id", None)

    error = (request.args.get("error") or "").strip() or None
    if request.method == "POST":
        username = (request.form.get("username") or "").strip()
        password = request.form.get("password") or ""
        account = (
            User.query
            .filter(func.lower(User.username) == username.lower())
            .first()
        )
        if not (account and account.check_password(password)):
            # One message for both unknown user and wrong password.
            error = _pick_lang("用户名或密码错误", "Invalid username or password.", lang)
        elif _is_banned_user(account):
            error = _user_ban_message(account)
        else:
            account.last_login_at = datetime.now(timezone.utc)
            db.session.commit()
            session["user_id"] = account.id
            return redirect(_safe_next_url("forum_index"))

    return render_template("auth/login.html", error=error)
@app.route("/logout")
def user_logout():
    """Remove the logged-in user id from the session and return to the forum index."""
    session.pop("user_id", None)
    forum_home = url_for("forum_index")
    return redirect(forum_home)
@app.route("/profile")
def user_profile_redirect():
    """Alias route: forward ``/profile`` to the ``user_profile`` endpoint."""
    profile_url = url_for("user_profile")
    return redirect(profile_url)
@app.route("/me", methods=["GET", "POST"])
@user_login_required
def user_profile():
    """Personal center for the logged-in user.

    POST handles two settings actions selected by the ``action`` form field:
    ``profile`` (rename) and ``password`` (change password). Every POST ends
    in a redirect back to the settings tab with ``msg`` or ``error``.
    GET renders the user's posts, comments, likes, bookmarks and counters.
    """
    lang = _get_lang()
    user = _get_current_user()

    allowed_tabs = {"posts", "comments", "likes", "bookmarks", "settings"}
    requested_tab = (request.args.get("tab") or "posts").strip().lower()
    tab = requested_tab if requested_tab in allowed_tabs else "posts"

    def _back_to_settings(**flash):
        """Redirect to the settings tab carrying a ``msg`` or ``error`` text."""
        return redirect(url_for("user_profile", tab="settings", **flash))

    def _rename_account():
        """Handle the ``profile`` action: validate and store a new username."""
        new_name = (request.form.get("username") or "").strip()
        if new_name == user.username:
            return _back_to_settings(msg=_pick_lang("资料未变更", "No changes detected.", lang))
        if not _is_valid_username(new_name):
            return _back_to_settings(
                error=_pick_lang(
                    "用户名需为 3-20 位,仅支持字母、数字、下划线",
                    "Username must be 3-20 chars (letters, numbers, underscore).",
                    lang,
                ),
            )
        # Case-insensitive clash check against every other account.
        clash = (
            User.query
            .filter(func.lower(User.username) == new_name.lower(), User.id != user.id)
            .first()
        )
        if clash:
            return _back_to_settings(error=_pick_lang("用户名已存在", "Username already exists.", lang))
        user.username = new_name
        db.session.commit()
        return _back_to_settings(msg=_pick_lang("用户名已更新", "Username updated.", lang))

    def _change_password():
        """Handle the ``password`` action: verify the current password, set the new one."""
        current_password = request.form.get("current_password") or ""
        new_password = request.form.get("new_password") or ""
        confirm_password = request.form.get("confirm_password") or ""
        if not user.check_password(current_password):
            return _back_to_settings(error=_pick_lang("当前密码错误", "Current password is incorrect.", lang))
        if len(new_password) < 6:
            return _back_to_settings(error=_pick_lang("新密码至少 6 位", "New password must be at least 6 characters.", lang))
        if new_password != confirm_password:
            return _back_to_settings(error=_pick_lang("两次新密码输入不一致", "New passwords do not match.", lang))
        user.set_password(new_password)
        db.session.commit()
        return _back_to_settings(msg=_pick_lang("密码已更新", "Password updated.", lang))

    if request.method == "POST":
        action = (request.form.get("action") or "").strip().lower()
        handler = {"profile": _rename_account, "password": _change_password}.get(action)
        if handler is None:
            return _back_to_settings(error=_pick_lang("未知操作", "Unknown action.", lang))
        return handler()

    # --- GET: collect the data shown on the profile tabs ---
    post_rows = _query_forum_post_rows(active_tab="latest", author_id=user.id).limit(60).all()
    my_post_cards = _build_forum_post_cards(post_rows, lang=lang)

    comment_rows = (
        db.session.query(
            ForumComment,
            ForumPost.id.label("post_id"),
            ForumPost.title.label("post_title"),
        )
        .join(ForumPost, ForumComment.post_id == ForumPost.id)
        .filter(ForumComment.user_id == user.id)
        .order_by(ForumComment.created_at.desc(), ForumComment.id.desc())
        .limit(120)
        .all()
    )
    my_comment_items = []
    for comment, post_id, post_title in comment_rows:
        my_comment_items.append({
            "comment": comment,
            "post_id": post_id,
            "post_title": post_title,
        })

    def _marked_post_items(model, row_key):
        """Load up to 120 like/bookmark rows of ``model`` joined with their post."""
        rows = (
            db.session.query(
                model,
                ForumPost.id.label("post_id"),
                ForumPost.title.label("post_title"),
                ForumPost.category.label("post_category"),
                ForumPost.created_at.label("post_created_at"),
            )
            .join(ForumPost, model.post_id == ForumPost.id)
            .filter(model.user_id == user.id)
            .order_by(model.created_at.desc(), model.id.desc())
            .limit(120)
            .all()
        )
        return [
            {
                row_key: record,
                "post_id": post_id,
                "post_title": post_title,
                "post_category": post_category,
                "post_created_at": post_created_at,
            }
            for record, post_id, post_title, post_category, post_created_at in rows
        ]

    my_like_items = _marked_post_items(ForumPostLike, "like")
    my_bookmark_items = _marked_post_items(ForumPostBookmark, "bookmark")

    owned = {"user_id": user.id}
    reported = {"reporter_id": user.id}
    stats = {
        "post_count": ForumPost.query.filter_by(**owned).count(),
        "comment_count": ForumComment.query.filter_by(**owned).count(),
        "like_count": ForumPostLike.query.filter_by(**owned).count(),
        "bookmark_count": ForumPostBookmark.query.filter_by(**owned).count(),
        "report_count": ForumReport.query.filter_by(**reported).count(),
        "pending_report_count": ForumReport.query.filter_by(status="pending", **reported).count(),
        "notification_count": ForumNotification.query.filter_by(**owned).count(),
        "unread_notification_count": ForumNotification.query.filter_by(is_read=False, **owned).count(),
    }

    return render_template(
        "forum/profile.html",
        profile_user=user,
        active_tab=tab,
        my_post_cards=my_post_cards,
        my_comment_items=my_comment_items,
        my_like_items=my_like_items,
        my_bookmark_items=my_bookmark_items,
        stats=stats,
        message=request.args.get("msg") or "",
        error=request.args.get("error") or "",
    )
@app.route("/notifications")
@user_login_required
def user_notifications():
    """List the current user's notifications, optionally filtered by read state.

    The ``status`` query parameter accepts ``all`` (default), ``unread`` or
    ``read``. At most 300 notifications are shown, newest first, together with
    read/unread totals.
    """
    lang = _get_lang()
    user = _get_current_user()

    status = (request.args.get("status") or "all").strip().lower()
    if status not in {"all", "unread", "read"}:
        status = "all"

    notif_query = (
        ForumNotification.query
        .filter_by(user_id=user.id)
        .options(joinedload(ForumNotification.actor_rel))
    )
    read_flag_by_status = {"unread": False, "read": True}
    if status in read_flag_by_status:
        notif_query = notif_query.filter_by(is_read=read_flag_by_status[status])

    newest_first = notif_query.order_by(
        ForumNotification.created_at.desc(),
        ForumNotification.id.desc(),
    )
    items = [
        {
            "notification": notif,
            "type_label": _notification_type_label(notif.notif_type, lang=lang),
            "actor_name": notif.actor_rel.username if notif.actor_rel else "",
            "target_url": _notification_target_url(notif),
            "time_text": _humanize_time(notif.created_at, lang=lang),
        }
        for notif in newest_first.limit(300).all()
    ]

    # One grouped query yields the per-read-state totals.
    grouped_counts = (
        db.session.query(ForumNotification.is_read, func.count(ForumNotification.id))
        .filter_by(user_id=user.id)
        .group_by(ForumNotification.is_read)
        .all()
    )
    totals = {True: 0, False: 0}
    for read_flag, total in grouped_counts:
        totals[bool(read_flag)] = int(total or 0)
    read_count = totals[True]
    unread_count = totals[False]

    return render_template(
        "forum/notifications.html",
        active_status=status,
        notification_items=items,
        unread_count=unread_count,
        read_count=read_count,
        total_count=unread_count + read_count,
        message=request.args.get("msg") or "",
        error=request.args.get("error") or "",
    )
@app.route("/notification/<int:notification_id>/go")
@user_login_required
def user_notification_go(notification_id):
    """Open a notification's target, marking the notification as read first.

    Responds with 404 when the notification does not exist. A notification
    owned by another user redirects back to the list with an error text.
    """
    lang = _get_lang()
    user = _get_current_user()
    n = ForumNotification.query.get_or_404(notification_id)
    if n.user_id != user.id:
        return redirect(url_for("user_notifications", error=_pick_lang("无权访问该通知", "Permission denied for this notification.", lang)))
    if not n.is_read:
        n.is_read = True
        db.session.commit()
        # Evict the cached counter for this user, exactly as the other
        # mark-as-read routes do, so a stale unread count is not served.
        _NOTIF_COUNT_CACHE.pop(user.id, None)
    return redirect(_notification_target_url(n))
@app.route("/notification/<int:notification_id>/read", methods=["POST"])
@user_login_required
def user_notification_read(notification_id):
    """Mark one notification as read, then return to ``next`` or the list.

    Responds with 404 when the notification does not exist. A notification
    owned by another user redirects back to the list with an error text.
    The optional ``next`` form field is honoured only when it is a plain
    same-site path.
    """
    lang = _get_lang()
    user = _get_current_user()
    n = ForumNotification.query.get_or_404(notification_id)
    if n.user_id != user.id:
        return redirect(url_for("user_notifications", error=_pick_lang("无权操作该通知", "Permission denied for this notification.", lang)))
    if not n.is_read:
        n.is_read = True
        db.session.commit()
        _NOTIF_COUNT_CACHE.pop(user.id, None)
    next_url = (request.form.get("next") or "").strip()
    # Accept only local paths. Besides "//host", reject backslashes and ASCII
    # control characters: browsers may normalise "/\host" or "/<TAB>/host"
    # into a protocol-relative URL, which would allow an open redirect.
    is_local_path = (
        next_url.startswith("/")
        and not next_url.startswith("//")
        and "\\" not in next_url
        and not any(ord(ch) < 32 or ord(ch) == 127 for ch in next_url)
    )
    if is_local_path:
        return redirect(next_url)
    return redirect(url_for("user_notifications", msg=_pick_lang("已标记为已读", "Marked as read.", lang)))
@app.route("/notifications/read-all", methods=["POST"])
@user_login_required
def user_notifications_read_all():
    """Mark every unread notification of the current user as read (bulk UPDATE)."""
    lang = _get_lang()
    user = _get_current_user()
    changed = (
        ForumNotification.query
        .filter_by(user_id=user.id, is_read=False)
        .update({"is_read": True}, synchronize_session=False)
    )
    db.session.commit()
    if changed:
        # Evict the cached counter only when something was actually updated.
        _NOTIF_COUNT_CACHE.pop(user.id, None)
        msg = _pick_lang("已全部标记为已读", "All notifications marked as read.", lang)
    else:
        msg = _pick_lang("没有未读通知", "No unread notifications.", lang)
    return redirect(url_for("user_notifications", msg=msg))
@app.route("/forum")
def forum_index():
    """Render the forum listing page.

    Query parameters read from the request:
      - ``tab``: one of ``latest`` / ``new`` / ``hot`` (default ``latest``)
      - ``category``: optional category name, truncated to 32 characters
      - ``q``: optional search text, truncated to 80 characters
      - ``page`` / ``per_page``: pagination (``per_page`` must be one of the
        offered options, otherwise 20)
      - ``msg`` / ``error``: texts passed through to the template

    Besides the post cards, this view prepares tab/category/pagination links,
    SEO metadata (canonical, prev/next, robots) and a schema.org JSON-LD graph.
    """
    lang = _get_lang()
    per_page_options = [10, 20, 30, 50]
    # Unknown tab values fall back to "latest".
    active_tab = (request.args.get("tab") or "latest").strip().lower()
    if active_tab not in {"latest", "new", "hot"}:
        active_tab = "latest"
    # Empty category means "all categories"; overly long input is truncated.
    selected_category = (request.args.get("category") or "").strip() or None
    if selected_category and len(selected_category) > 32:
        selected_category = selected_category[:32]
    search_query = (request.args.get("q") or "").strip()
    if len(search_query) > 80:
        search_query = search_query[:80]
    page = request.args.get("page", type=int) or 1
    if page < 1:
        page = 1
    per_page = request.args.get("per_page", type=int) or 20
    if per_page not in per_page_options:
        per_page = 20
    rows_query = _query_forum_post_rows(
        active_tab=active_tab,
        selected_category=selected_category,
        search_query=search_query or None,
    )
    total_posts = _count_forum_posts(
        selected_category=selected_category,
        search_query=search_query or None,
    )
    # Ceiling division, with at least one page even when there are no posts.
    total_pages = max((total_posts + per_page - 1) // per_page, 1)
    # Clamp an out-of-range page number to the last page.
    if page > total_pages:
        page = total_pages
    rows = rows_query.offset((page - 1) * per_page).limit(per_page).all()
    post_cards = _build_forum_post_cards(rows, lang=lang)
    sidebar = _forum_sidebar_data()
    category_count_map = {name: int(count or 0) for name, count in (sidebar.get("category_counts") or [])}
    # Work on a copy so the appends/inserts below do not touch the helper's return value.
    category_names = list(_get_forum_category_names(active_only=True))
    # Also list categories that only appear in the sidebar counts.
    for name in category_count_map.keys():
        if name and name not in category_names:
            category_names.append(name)
    # Keep the currently selected category visible even if it is not otherwise listed.
    if selected_category and selected_category not in category_names:
        category_names.insert(0, selected_category)
    tab_defs = [
        ("latest", _pick_lang("最新", "Latest", lang)),
        ("new", _pick_lang("新帖", "New", lang)),
        ("hot", _pick_lang("热门", "Top", lang)),
    ]
    # Switching tab keeps category/search/per_page but resets to page 1.
    tab_links = [
        {
            "key": key,
            "label": label,
            "url": _build_forum_url(
                tab=key,
                category=selected_category,
                q=search_query or None,
                page=1,
                per_page=per_page,
            ),
            "active": active_tab == key,
        }
        for key, label in tab_defs
    ]
    # The first entry is the "All" pseudo-category (no count shown).
    category_links = [
        {
            "name": _pick_lang("全部", "All", lang),
            "count": None,
            "url": _build_forum_url(
                tab=active_tab,
                category=None,
                q=search_query or None,
                page=1,
                per_page=per_page,
            ),
            "active": selected_category is None,
        }
    ]
    for name in category_names:
        category_links.append({
            "name": name,
            "count": category_count_map.get(name, 0),
            "url": _build_forum_url(
                tab=active_tab,
                category=name,
                q=search_query or None,
                page=1,
                per_page=per_page,
            ),
            "active": selected_category == name,
        })
    # Navigation target: the selected category, else the first known category.
    category_nav_url = _build_forum_url(
        tab=active_tab,
        category=selected_category or (category_names[0] if category_names else None),
        q=search_query or None,
        page=1,
        per_page=per_page,
    )
    # Pagination window: up to two pages on each side of the current page.
    window_start = max(1, page - 2)
    window_end = min(total_pages, page + 2)
    page_links = [
        {
            "num": num,
            "url": _build_forum_url(
                tab=active_tab,
                category=selected_category,
                q=search_query or None,
                page=num,
                per_page=per_page,
            ),
            "active": num == page,
        }
        for num in range(window_start, window_end + 1)
    ]
    has_filters = bool(selected_category or search_query or active_tab != "latest")
    # Empty-state hint depends on which filters are active.
    if search_query and selected_category:
        empty_hint = _pick_lang("当前分类下没有匹配关键词的帖子。", "No posts match your keywords in this category.", lang)
    elif search_query:
        empty_hint = _pick_lang("没有匹配关键词的帖子。", "No posts match your keywords.", lang)
    elif selected_category:
        empty_hint = _pick_lang("该分类暂时没有帖子。", "No posts in this category yet.", lang)
    else:
        empty_hint = _pick_lang("当前没有帖子,点击右上角按钮发布第一条内容。", "No posts yet. Create the first topic from the top-right button.", lang)
    # 1-based range of results shown on this page; both are 0 when there are no posts.
    result_start = ((page - 1) * per_page + 1) if total_posts else 0
    result_end = min(page * per_page, total_posts) if total_posts else 0
    # Canonical URLs omit default values and never include q / per_page.
    canonical_params = {
        "tab": active_tab if active_tab != "latest" else None,
        "category": selected_category,
        "page": page if page > 1 else None,
    }
    canonical_url = _public_url("forum_index", lang=lang, **canonical_params)
    alternate_links = _alternate_lang_links("forum_index", **canonical_params)
    prev_canonical_url = None
    next_canonical_url = None
    if page > 1:
        prev_params = dict(canonical_params)
        prev_page = page - 1
        # Page 1 is represented by omitting the page parameter.
        prev_params["page"] = prev_page if prev_page > 1 else None
        prev_canonical_url = _public_url("forum_index", lang=lang, **prev_params)
    if page < total_pages:
        next_params = dict(canonical_params)
        next_params["page"] = page + 1
        next_canonical_url = _public_url("forum_index", lang=lang, **next_params)
    if selected_category:
        forum_title = _pick_lang(
            "{} 讨论区 | 云价眼论坛".format(selected_category),
            "{} Discussions | VPS Forum".format(selected_category),
            lang,
        )
        forum_heading = _pick_lang(
            "{} · 论坛分类".format(selected_category),
            "{} · Forum Category".format(selected_category),
            lang,
        )
    else:
        forum_title = _pick_lang("VPS 社区论坛 | 云价眼", "VPS Community Forum | VPS Price", lang)
        forum_heading = _pick_lang("VPS 社区论坛", "VPS Community Forum", lang)
    # Pages after the first get a page suffix appended to the title.
    if page > 1:
        forum_title = "{} - {}".format(
            forum_title,
            _pick_lang("{}".format(page), "Page {}".format(page), lang),
        )
    # Description/intro priority: search results, then category, then tab default.
    if search_query:
        forum_description = _pick_lang(
            "论坛搜索结果:{}。该页面主要用于站内检索。".format(search_query),
            "Forum search results for '{}'. This page is intended for on-site search.".format(search_query),
            lang,
        )
        forum_intro = _pick_lang(
            "搜索词:{}。建议进一步按分类或标签缩小结果范围。".format(search_query),
            "Search query: '{}'. Narrow down with categories or topic tags for better results.".format(search_query),
            lang,
        )
    elif selected_category:
        forum_description = _forum_category_description(selected_category, lang)
        forum_intro = forum_description
    else:
        forum_description = _forum_category_description(None, lang)
        tab_intro_map = {
            "latest": _pick_lang(
                "按最新活跃度浏览主题,快速跟进持续更新的讨论。",
                "Browse by latest activity to track ongoing discussions.",
                lang,
            ),
            "new": _pick_lang(
                "查看最近发布的新主题,及时参与新话题。",
                "See newly published topics and join early conversations.",
                lang,
            ),
            "hot": _pick_lang(
                "按热度排序,优先阅读高互动的热门讨论。",
                "Sorted by engagement to surface high-signal discussions.",
                lang,
            ),
        }
        forum_intro = tab_intro_map.get(active_tab) or forum_description
    # Search results and non-default page sizes are marked noindex.
    noindex_listing = bool(search_query or per_page != 20)
    forum_feed_url = _public_url("forum_feed", lang=lang)
    seo = {
        "title": forum_title,
        "description": forum_description,
        "keywords": _forum_index_keywords(lang, active_tab=active_tab, selected_category=selected_category),
        "canonical_url": canonical_url,
        "prev_canonical_url": prev_canonical_url,
        "next_canonical_url": next_canonical_url,
        "robots": "noindex,follow" if noindex_listing else "index,follow,max-image-preview:large,max-snippet:-1,max-video-preview:-1",
        "og_type": "website",
        "og_url": canonical_url,
        "og_title": forum_title,
        "og_description": forum_description,
        "og_image": _absolute_url_for("static", filename="img/site-logo-mark.svg"),
        "twitter_card": "summary_large_image",
        "twitter_title": forum_title,
        "twitter_description": forum_description,
        "alternate_links": alternate_links,
        "feed_url": forum_feed_url,
    }
    # Build the ItemList entries and track the most recent activity timestamp on this page.
    list_items = []
    latest_activity_at = None
    for idx, card in enumerate(post_cards, start=1):
        post_obj = card.get("post")
        if not post_obj:
            continue
        post_url = _public_url("forum_post_detail", lang=lang, post_id=post_obj.id)
        list_items.append({
            "@type": "ListItem",
            "position": idx,
            "name": post_obj.title,
            "url": post_url,
        })
        activity_at = card.get("latest_activity") or post_obj.updated_at or post_obj.created_at
        if activity_at and (latest_activity_at is None or activity_at > latest_activity_at):
            latest_activity_at = activity_at
    breadcrumb_schema = _forum_breadcrumb_schema(lang=lang, selected_category=selected_category)
    breadcrumb_schema["@id"] = "{}#breadcrumb".format(canonical_url)
    collection_schema = {
        "@type": "CollectionPage",
        "@id": "{}#collection".format(canonical_url),
        "name": forum_title,
        "description": forum_description,
        "url": canonical_url,
        "inLanguage": "en-US" if lang == "en" else "zh-CN",
        "breadcrumb": {"@id": breadcrumb_schema["@id"]},
        "isPartOf": {"@type": "WebSite", "name": SITE_NAME, "url": _site_root_url()},
    }
    if latest_activity_at:
        collection_schema["dateModified"] = _iso8601_utc(latest_activity_at)
    # The SearchAction is advertised only on non-search listings.
    if not search_query:
        collection_schema["potentialAction"] = {
            "@type": "SearchAction",
            # "{{q}}" renders as the literal "{q}" placeholder after str.format.
            "target": "{}?q={{q}}".format(_public_url("forum_index", lang=lang)),
            "query-input": "required name=q",
        }
    seo_graph = [collection_schema, breadcrumb_schema]
    # The ItemList node is added (and referenced as mainEntity) only when there are posts.
    if list_items:
        item_list_schema = {
            "@type": "ItemList",
            "@id": "{}#items".format(canonical_url),
            "itemListElement": list_items,
        }
        collection_schema["mainEntity"] = {"@id": item_list_schema["@id"]}
        seo_graph.append(item_list_schema)
    seo_schema = {
        "@context": "https://schema.org",
        "@graph": seo_graph,
    }
    return render_template(
        "forum/index.html",
        post_cards=post_cards,
        sidebar=sidebar,
        active_tab=active_tab,
        selected_category=selected_category,
        search_query=search_query,
        tab_links=tab_links,
        category_links=category_links,
        category_nav_url=category_nav_url,
        total_posts=total_posts,
        total_pages=total_pages,
        current_page=page,
        page_links=page_links,
        has_prev=(page > 1),
        has_next=(page < total_pages),
        # prev/next URLs are always built; has_prev / has_next say whether they apply.
        prev_page_url=_build_forum_url(
            tab=active_tab,
            category=selected_category,
            q=search_query or None,
            page=page - 1,
            per_page=per_page,
        ),
        next_page_url=_build_forum_url(
            tab=active_tab,
            category=selected_category,
            q=search_query or None,
            page=page + 1,
            per_page=per_page,
        ),
        clear_search_url=_build_forum_url(
            tab=active_tab,
            category=selected_category,
            q=None,
            page=1,
            per_page=per_page,
        ),
        clear_all_url=_build_forum_url(tab="latest", category=None, q=None, page=1, per_page=per_page),
        has_filters=has_filters,
        empty_hint=empty_hint,
        result_start=result_start,
        result_end=result_end,
        per_page=per_page,
        per_page_options=per_page_options,
        message=request.args.get("msg") or "",
        error=request.args.get("error") or "",
        forum_heading=forum_heading,
        forum_intro=forum_intro,
        forum_feed_url=forum_feed_url,
        seo=seo,
        seo_schema=seo_schema,
    )
@app.route("/forum/post/new", methods=["GET", "POST"])
@user_login_required
def forum_post_new():
    """Create a new forum topic.

    GET shows the empty form. POST validates title (5-160 characters) and
    content (at least 10 characters), replaces an unknown category with the
    default one, stores the post and redirects to its detail page. On a
    validation error the form is re-rendered with the submitted values.
    """
    lang = _get_lang()
    user = _get_current_user()
    denied = _ensure_forum_interaction_user(user)
    if denied:
        return denied

    available_categories = _get_forum_category_names(active_only=True)
    # Default category: first available one, else the built-in fallback name.
    fallback_category = available_categories[0] if available_categories else "综合讨论"

    error = None
    title = ""
    content = ""
    category = fallback_category

    if request.method == "POST":
        form = request.form
        title = (form.get("title") or "").strip()
        content = (form.get("content") or "").strip()
        category = (form.get("category") or "").strip() or category
        if category not in available_categories:
            category = fallback_category

        title_length = len(title)
        if title_length < 5:
            error = _pick_lang("标题至少 5 个字符", "Title must be at least 5 characters.", lang)
        elif title_length > 160:
            error = _pick_lang("标题不能超过 160 个字符", "Title must be 160 characters or fewer.", lang)
        elif len(content) < 10:
            error = _pick_lang("内容至少 10 个字符", "Content must be at least 10 characters.", lang)
        else:
            new_post = ForumPost(
                user_id=user.id,
                category=category,
                title=title,
                content=content,
            )
            db.session.add(new_post)
            db.session.commit()
            return redirect(url_for("forum_post_detail", post_id=new_post.id))

    return render_template(
        "forum/post_form.html",
        error=error,
        title_val=title,
        content_val=content,
        category_val=category,
        categories=available_categories,
        page_title=_pick_lang("创建新主题", "Create Topic", lang),
        submit_text=_pick_lang("发布主题", "Publish", lang),
        action_url=url_for("forum_post_new"),
        cancel_url=url_for("forum_index"),
        form_mode="create",
    )
@app.route("/forum/post/<int:post_id>/edit", methods=["GET", "POST"])
@user_login_required
def forum_post_edit(post_id):
    """Edit an existing forum topic.

    Responds with 404 when the post does not exist. Users who may not interact
    or may not edit this post are redirected away. GET shows the form
    pre-filled with the post; POST validates title (5-160 characters) and
    content (at least 10 characters) and saves the changes.
    """
    lang = _get_lang()
    post = ForumPost.query.get_or_404(post_id)
    user = _get_current_user()
    blocked_resp = _ensure_forum_interaction_user(user, post_id=post.id)
    if blocked_resp:
        return blocked_resp
    if not _can_edit_post(user, post):
        return _forum_redirect_with_error(post.id, "你没有权限编辑该帖子")
    error = None
    title = post.title or ""
    content = post.content or ""
    # Copy before inserting: the list returned by the helper must not be
    # mutated in place (the forum index view copies it for the same reason).
    available_categories = list(_get_forum_category_names(active_only=True))
    # Keep the post's current category selectable even if it is not in the active list.
    if post.category and post.category not in available_categories:
        available_categories.insert(0, post.category)
    category = post.category or (available_categories[0] if available_categories else "综合讨论")
    if request.method == "POST":
        title = (request.form.get("title") or "").strip()
        content = (request.form.get("content") or "").strip()
        category = (request.form.get("category") or "").strip() or category
        # An unknown category falls back to the first available one.
        if category not in available_categories:
            category = available_categories[0] if available_categories else "综合讨论"
        if len(title) < 5:
            error = _pick_lang("标题至少 5 个字符", "Title must be at least 5 characters.", lang)
        elif len(title) > 160:
            error = _pick_lang("标题不能超过 160 个字符", "Title must be 160 characters or fewer.", lang)
        elif len(content) < 10:
            error = _pick_lang("内容至少 10 个字符", "Content must be at least 10 characters.", lang)
        else:
            post.title = title
            post.content = content
            post.category = category
            db.session.commit()
            return _forum_redirect_with_msg(post.id, "帖子已更新")
    return render_template(
        "forum/post_form.html",
        error=error,
        title_val=title,
        content_val=content,
        category_val=category,
        categories=available_categories,
        page_title=_pick_lang("编辑主题", "Edit Topic", lang),
        submit_text=_pick_lang("保存修改", "Save Changes", lang),
        action_url=url_for("forum_post_edit", post_id=post.id),
        cancel_url=url_for("forum_post_detail", post_id=post.id),
        form_mode="edit",
    )
@app.route("/forum/post/<int:post_id>/delete", methods=["POST"])
@user_login_required
def forum_post_delete(post_id):
    """Delete a forum topic and return to the forum index.

    Responds with 404 when the post does not exist. Users who may not interact
    or may not edit this post are redirected away instead.
    """
    target_post = ForumPost.query.get_or_404(post_id)
    user = _get_current_user()

    denied = _ensure_forum_interaction_user(user, post_id=target_post.id)
    if denied:
        return denied

    # Deleting requires the same permission as editing.
    allowed = _can_edit_post(user, target_post)
    if not allowed:
        return _forum_redirect_with_error(target_post.id, "你没有权限删除该帖子")

    db.session.delete(target_post)
    db.session.commit()
    forum_home = url_for("forum_index")
    return redirect(forum_home)
@app.route("/forum/post/<int:post_id>")
def forum_post_detail(post_id):
    """Render the public detail page for one forum topic.

    Loads the post (404 when missing), bumps its view counter the first
    time the current session opens it, gathers the comments plus the
    like/bookmark state of the viewer, and assembles the SEO meta dictionary
    and schema.org JSON-LD graph passed to ``forum/post_detail.html``.
    """
    lang = _get_lang()
    post = ForumPost.query.get_or_404(post_id)
    viewer = _get_current_user()

    # Count a view only the first time this session opens the post; the
    # session keeps at most the 200 most recently recorded post ids.
    seen_ids = session.get("viewed_posts") or []
    if post.id not in seen_ids:
        post.view_count = int(post.view_count or 0) + 1
        seen_ids.append(post.id)
        session["viewed_posts"] = seen_ids[-200:]
        db.session.commit()

    comment_query = ForumComment.query.options(joinedload(ForumComment.author_rel))
    comment_query = comment_query.filter_by(post_id=post.id)
    comment_query = comment_query.order_by(ForumComment.created_at.asc(), ForumComment.id.asc())
    comments = comment_query.all()

    like_count = ForumPostLike.query.filter_by(post_id=post.id).count()
    bookmark_count = ForumPostBookmark.query.filter_by(post_id=post.id).count()
    can_interact = bool(viewer and not _is_banned_user(viewer))

    # A single statement reports whether the viewer liked and/or bookmarked
    # the post, instead of issuing one query per table.
    viewer_marks = set()
    if viewer:
        marks_sql = text(
            "(SELECT 'like' AS kind FROM forum_post_likes WHERE post_id=:pid AND user_id=:uid LIMIT 1) "
            "UNION ALL "
            "(SELECT 'bookmark' FROM forum_post_bookmarks WHERE post_id=:pid AND user_id=:uid LIMIT 1)"
        )
        mark_rows = db.session.execute(marks_sql, {"pid": post.id, "uid": viewer.id}).fetchall()
        viewer_marks = {row[0] for row in mark_rows}
    liked_by_me = "like" in viewer_marks
    bookmarked_by_me = "bookmark" in viewer_marks

    sidebar = _forum_sidebar_data()
    canonical_url = _public_url("forum_post_detail", lang=lang, post_id=post.id)

    # Fall back to a generic description when the post body yields no excerpt.
    excerpt = _plain_excerpt(post.content or "", limit=170) or _pick_lang(
        "论坛主题详情页。", "Discussion topic detail page.", lang
    )
    section = post.category or _pick_lang("综合讨论", "General", lang)
    keyword_candidates = [
        section,
        _pick_lang("VPS论坛", "VPS forum", lang),
        _pick_lang("VPS讨论", "VPS discussion", lang),
        _pick_lang("云服务器评测", "cloud server review", lang),
    ]
    # dict.fromkeys drops duplicates while keeping first-seen order.
    keywords = ", ".join(dict.fromkeys(keyword_candidates))

    published_time = _iso8601_utc(post.created_at)
    modified_time = _iso8601_utc(post.updated_at or post.created_at)
    comments_count = len(comments)
    forum_feed_url = _public_url("forum_feed", lang=lang)
    seo_title = _pick_lang(
        "{} - 论坛主题 | 云价眼".format(post.title),
        "{} - Forum Topic | VPS Price".format(post.title),
        lang,
    )

    seo = {
        "title": seo_title,
        "description": excerpt,
        "keywords": keywords,
        "canonical_url": canonical_url,
        "robots": "index,follow,max-image-preview:large,max-snippet:-1,max-video-preview:-1",
        "og_type": "article",
        "og_url": canonical_url,
        "og_title": seo_title,
        "og_description": excerpt,
        "og_image": _absolute_url_for("static", filename="img/site-logo-mark.svg"),
        "twitter_card": "summary_large_image",
        "twitter_title": seo_title,
        "twitter_description": excerpt,
        "article_published_time": published_time,
        "article_modified_time": modified_time,
        "article_section": section,
        "feed_url": forum_feed_url,
        "alternate_links": _alternate_lang_links("forum_post_detail", post_id=post.id),
    }

    if post.author_rel and post.author_rel.username:
        author_name = post.author_rel.username
    else:
        author_name = _pick_lang("已注销用户", "Deleted user", lang)

    post_schema = {
        "@type": "DiscussionForumPosting",
        "@id": "{}#topic".format(canonical_url),
        "headline": post.title,
        "description": excerpt,
        "articleSection": section,
        "keywords": keywords,
        "mainEntityOfPage": canonical_url,
        "url": canonical_url,
        "datePublished": published_time,
        "dateModified": modified_time,
        "author": {"@type": "Person", "name": author_name},
        "publisher": {
            "@type": "Organization",
            "name": SITE_NAME,
            "url": _site_root_url(),
            "logo": {
                "@type": "ImageObject",
                "url": _absolute_url_for("static", filename="img/site-logo.svg"),
            },
        },
        "commentCount": comments_count,
        "interactionStatistic": [
            {
                "@type": "InteractionCounter",
                "interactionType": action_url,
                "userInteractionCount": amount,
            }
            for action_url, amount in (
                ("https://schema.org/ViewAction", int(post.view_count or 0)),
                ("https://schema.org/CommentAction", comments_count),
                ("https://schema.org/LikeAction", int(like_count or 0)),
            )
        ],
        "inLanguage": "en-US" if lang == "en" else "zh-CN",
        "isPartOf": {"@type": "WebSite", "name": SITE_NAME, "url": _site_root_url()},
    }

    # Only the first 20 comments are embedded in the structured data, and
    # comments whose excerpt comes back empty are left out.
    comment_entities = []
    for item in comments[:20]:
        if item.author_rel and item.author_rel.username:
            commenter = item.author_rel.username
        else:
            commenter = _pick_lang("匿名用户", "Anonymous", lang)
        snippet = _plain_excerpt(item.content or "", limit=220)
        if snippet:
            entity = {
                "@type": "Comment",
                "text": snippet,
                "dateCreated": _iso8601_utc(item.created_at),
                "author": {"@type": "Person", "name": commenter},
            }
            if item.id:
                entity["url"] = "{}#comment-{}".format(canonical_url, item.id)
            comment_entities.append(entity)
    if comment_entities:
        post_schema["comment"] = comment_entities

    breadcrumb_schema = _forum_breadcrumb_schema(
        lang=lang,
        selected_category=post.category,
        post=post,
        post_url=canonical_url,
    )
    breadcrumb_id = "{}#breadcrumb".format(canonical_url)
    breadcrumb_schema["@id"] = breadcrumb_id
    post_schema["breadcrumb"] = {"@id": breadcrumb_id}

    seo_schema = {
        "@context": "https://schema.org",
        "@graph": [post_schema, breadcrumb_schema],
    }

    return render_template(
        "forum/post_detail.html",
        post=post,
        comments=comments,
        like_count=like_count,
        bookmark_count=bookmark_count,
        liked_by_me=liked_by_me,
        bookmarked_by_me=bookmarked_by_me,
        can_interact=can_interact,
        sidebar=sidebar,
        message=request.args.get("msg") or "",
        error=request.args.get("error") or "",
        seo=seo,
        seo_schema=seo_schema,
    )
@app.route("/forum/post/<int:post_id>/like", methods=["POST"])
@user_login_required
def forum_post_like_toggle(post_id):
    """Toggle the current user's like on a post.

    Removes the like row when one exists for this user and post, otherwise
    creates it, then redirects to the URL chosen by ``_safe_form_next_url``
    (the post detail page with a status message is passed as its argument).
    """
    post = ForumPost.query.get_or_404(post_id)
    actor = _get_current_user()

    denied = _ensure_forum_interaction_user(actor, post_id=post.id)
    if denied:
        return denied

    current_like = ForumPostLike.query.filter_by(post_id=post.id, user_id=actor.id).first()
    if not current_like:
        db.session.add(ForumPostLike(post_id=post.id, user_id=actor.id))
        notice = "已点赞该帖子"
    else:
        db.session.delete(current_like)
        notice = "已取消点赞"
    db.session.commit()

    detail_url = url_for("forum_post_detail", post_id=post.id, msg=notice)
    return redirect(_safe_form_next_url(detail_url))
@app.route("/forum/post/<int:post_id>/bookmark", methods=["POST"])
@user_login_required
def forum_post_bookmark_toggle(post_id):
    """Toggle the current user's bookmark on a post.

    Removes the bookmark row when one exists for this user and post,
    otherwise creates it, then redirects to the URL chosen by
    ``_safe_form_next_url`` (the post detail page with a status message is
    passed as its argument).
    """
    post = ForumPost.query.get_or_404(post_id)
    actor = _get_current_user()

    denied = _ensure_forum_interaction_user(actor, post_id=post.id)
    if denied:
        return denied

    current_mark = ForumPostBookmark.query.filter_by(post_id=post.id, user_id=actor.id).first()
    if not current_mark:
        db.session.add(ForumPostBookmark(post_id=post.id, user_id=actor.id))
        notice = "已收藏该帖子"
    else:
        db.session.delete(current_mark)
        notice = "已取消收藏"
    db.session.commit()

    detail_url = url_for("forum_post_detail", post_id=post.id, msg=notice)
    return redirect(_safe_form_next_url(detail_url))
@app.route("/forum/post/<int:post_id>/comment", methods=["POST"])
@user_login_required
def forum_post_comment(post_id):
    """Create a comment on a post and notify the interested users.

    Responds with 404 when the post does not exist. Locked posts and
    comments shorter than two characters are rejected with a redirect that
    carries an error message. On success the post owner receives a
    ``post_commented`` notification and up to 50 other distinct commenters
    of the thread receive a ``thread_replied`` notification; the commenting
    user is never notified about their own comment.
    """
    post = ForumPost.query.get_or_404(post_id)
    user = _get_current_user()
    blocked_resp = _ensure_forum_interaction_user(user, post_id=post.id)
    if blocked_resp:
        return blocked_resp
    if post.is_locked:
        return _forum_redirect_with_error(post.id, "该帖子已锁定,暂不允许新增评论")

    content = (request.form.get("content") or "").strip()
    if len(content) < 2:
        return redirect(url_for("forum_post_detail", post_id=post.id, error="评论至少 2 个字符"))

    comment = ForumComment(
        post_id=post.id,
        user_id=user.id,
        content=content,
    )
    db.session.add(comment)
    # Flush so comment.id is assigned before notifications reference it.
    db.session.flush()

    actor_name = user.username or "用户"
    post_title = post.title or "主题"

    if post.user_id and post.user_id != user.id:
        _create_notification(
            user_id=post.user_id,
            notif_type="post_commented",
            # Fixed: the closing book-title mark was missing after the title.
            message="{} 评论了你的帖子《{}》".format(actor_name, post_title),
            actor_id=user.id,
            post_id=post.id,
            comment_id=comment.id,
        )

    # The query already excludes NULL user ids and the commenting user.
    participant_rows = (
        db.session.query(ForumComment.user_id)
        .filter(
            ForumComment.post_id == post.id,
            ForumComment.user_id.isnot(None),
            ForumComment.user_id != user.id,
        )
        .distinct()
        .limit(50)
        .all()
    )
    for (uid,) in participant_rows:
        # The post owner is handled by the branch above; skip here so they
        # do not get a second, thread-level notification.
        if not uid or uid == post.user_id:
            continue
        _create_notification(
            user_id=uid,
            notif_type="thread_replied",
            message="{} 在你参与的主题《{}》有新回复".format(actor_name, post_title),
            actor_id=user.id,
            post_id=post.id,
            comment_id=comment.id,
        )

    db.session.commit()
    return redirect(url_for("forum_post_detail", post_id=post.id, msg="评论发布成功"))
@app.route("/forum/comment/<int:comment_id>/edit", methods=["GET", "POST"])
@user_login_required
def forum_comment_edit(comment_id):
    """Show and process the edit form for an existing comment.

    GET renders the form pre-filled with the stored text. POST validates
    the submitted text (at least two characters after stripping), saves it
    and redirects to the post; on a validation failure the form is rendered
    again with the submitted text and an error.
    """
    comment = ForumComment.query.get_or_404(comment_id)
    actor = _get_current_user()

    denied = _ensure_forum_interaction_user(actor, post_id=comment.post_id)
    if denied:
        return denied
    if not _can_edit_comment(actor, comment):
        return _forum_redirect_with_error(comment.post_id, "你没有权限编辑该评论")

    problem = None
    draft = comment.content or ""
    if request.method == "POST":
        draft = (request.form.get("content") or "").strip()
        if len(draft) >= 2:
            comment.content = draft
            db.session.commit()
            return _forum_redirect_with_msg(comment.post_id, "评论已更新")
        problem = "评论至少 2 个字符"

    return render_template(
        "forum/comment_form.html",
        error=problem,
        comment=comment,
        content_val=draft,
        action_url=url_for("forum_comment_edit", comment_id=comment.id),
        cancel_url=url_for("forum_post_detail", post_id=comment.post_id),
    )
@app.route("/forum/comment/<int:comment_id>/delete", methods=["POST"])
@user_login_required
def forum_comment_delete(comment_id):
    """Delete a comment and redirect back to its post with a status message.

    Responds with 404 when the comment does not exist; users who fail the
    interaction guard or the edit-permission check get the corresponding
    response instead.
    """
    doomed = ForumComment.query.get_or_404(comment_id)
    actor = _get_current_user()

    denied = _ensure_forum_interaction_user(actor, post_id=doomed.post_id)
    if denied:
        return denied
    if not _can_edit_comment(actor, doomed):
        return _forum_redirect_with_error(doomed.post_id, "你没有权限删除该评论")

    # Remember the parent post id before the row is deleted.
    parent_post_id = doomed.post_id
    db.session.delete(doomed)
    db.session.commit()
    return _forum_redirect_with_msg(parent_post_id, "评论已删除")
@app.route("/forum/report", methods=["POST"])
@user_login_required
def forum_report_create():
    """Record a user report against a forum post or comment.

    Reads ``target_type``, ``target_id``, ``reason`` and ``detail`` from the
    form. Unknown target types and missing targets redirect to the forum
    index. Users cannot report their own content, and a second pending
    report by the same user for the same target is not stored again. The
    report keeps a snapshot of the target's title and content.
    """
    reporter = _get_current_user()
    denied = _ensure_forum_interaction_user(reporter)
    if denied:
        return denied

    form = request.form
    target_type = (form.get("target_type") or "").strip().lower()
    target_id = form.get("target_id", type=int) or 0

    # Unrecognised reasons collapse to the generic "other" choice.
    reason = (form.get("reason") or "其他").strip()
    if reason not in FORUM_REPORT_REASONS:
        reason = "其他"
    # Free-text detail is capped at 500 characters.
    detail = (form.get("detail") or "").strip()[:500]

    if target_type not in ("post", "comment"):
        return redirect(url_for("forum_index"))

    if target_type == "post":
        reported_post = db.session.get(ForumPost, target_id)
        if reported_post is None:
            return redirect(url_for("forum_index"))
        report_post_id = reported_post.id
        owner_id = reported_post.user_id
        snapshot_title = reported_post.title
        snapshot_content = reported_post.content
    else:
        reported_comment = db.session.get(ForumComment, target_id)
        if reported_comment is None:
            return redirect(url_for("forum_index"))
        report_post_id = reported_comment.post_id
        owner_id = reported_comment.user_id
        parent = reported_comment.post_rel
        snapshot_title = parent.title if parent else None
        snapshot_content = reported_comment.content

    if owner_id == reporter.id:
        return _forum_redirect_with_error(report_post_id, "不能举报自己的内容")

    already_pending = ForumReport.query.filter_by(
        reporter_id=reporter.id,
        target_type=target_type,
        target_id=target_id,
        status="pending",
    ).first()
    if already_pending:
        return _forum_redirect_with_msg(report_post_id, "你已举报该内容,请等待处理")

    new_report = ForumReport(
        reporter_id=reporter.id,
        target_type=target_type,
        target_id=target_id,
        reason=reason,
        detail=detail or None,
        snapshot_title=snapshot_title,
        snapshot_content=snapshot_content,
        status="pending",
    )
    db.session.add(new_report)
    db.session.commit()
    return _forum_redirect_with_msg(report_post_id, "举报已提交,感谢反馈")
@app.route("/forum/feed.xml")
def forum_feed():
    """Serve an RSS 2.0 feed of up to 120 forum topics.

    Topics are ordered by ``COALESCE(updated_at, created_at)`` descending,
    then by id descending. The channel's ``lastBuildDate`` is taken from the
    first row, or from the current UTC time when there are no posts. The
    response is marked cacheable for 15 minutes.
    """
    lang = _get_lang()
    activity_at = func.coalesce(ForumPost.updated_at, ForumPost.created_at)
    feed_rows = (
        db.session.query(ForumPost, User.username.label("author_name"))
        .outerjoin(User, User.id == ForumPost.user_id)
        .order_by(activity_at.desc(), ForumPost.id.desc())
        .limit(120)
        .all()
    )

    channel_title = _pick_lang("云价眼论坛最新主题", "VPS Price Forum Latest Topics", lang)
    channel_description = _pick_lang(
        "按最新活跃度输出论坛主题 RSS 订阅,便于跟踪 VPS 讨论更新。",
        "RSS feed of the latest forum activity to track VPS discussions.",
        lang,
    )
    channel_link = _public_url("forum_index", lang=lang)
    self_feed_url = _public_url("forum_feed", lang=lang)

    latest_time = None
    if feed_rows:
        newest_post = feed_rows[0][0]
        latest_time = newest_post.updated_at or newest_post.created_at
    last_build_date = _rfc2822_utc(latest_time or datetime.now(timezone.utc))

    parts = []
    emit = parts.append
    emit('<?xml version="1.0" encoding="UTF-8"?>')
    emit('<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom">')
    emit(" <channel>")
    emit(" <title>{}</title>".format(xml_escape(channel_title)))
    emit(" <description>{}</description>".format(xml_escape(channel_description)))
    emit(" <link>{}</link>".format(xml_escape(channel_link)))
    emit(" <language>{}</language>".format("en-us" if lang == "en" else "zh-cn"))
    emit(" <lastBuildDate>{}</lastBuildDate>".format(xml_escape(last_build_date)))
    emit(' <atom:link href="{}" rel="self" type="application/rss+xml" />'.format(xml_escape(self_feed_url)))

    for topic, author_name in feed_rows:
        topic_url = _public_url("forum_post_detail", lang=lang, post_id=topic.id)
        # Fall back to the channel build date when the helper returns nothing.
        pub_date = _rfc2822_utc(topic.updated_at or topic.created_at) or last_build_date
        author = author_name or _pick_lang("匿名用户", "Anonymous", lang)
        summary = _plain_excerpt(topic.content or "", limit=260)
        category = topic.category or _pick_lang("综合讨论", "General", lang)
        item_title = topic.title or _pick_lang("未命名主题", "Untitled topic", lang)
        emit(" <item>")
        emit(" <title>{}</title>".format(xml_escape(item_title)))
        emit(" <description>{}</description>".format(xml_escape(summary)))
        emit(" <link>{}</link>".format(xml_escape(topic_url)))
        emit(" <guid>{}</guid>".format(xml_escape(topic_url)))
        emit(" <author>{}</author>".format(xml_escape(author)))
        emit(" <category>{}</category>".format(xml_escape(category)))
        emit(" <pubDate>{}</pubDate>".format(xml_escape(pub_date)))
        emit(" </item>")

    emit(" </channel>")
    emit("</rss>")

    response = make_response("\n".join(parts))
    response.mimetype = "application/rss+xml"
    response.headers["Cache-Control"] = "public, max-age=900"
    return response
# ---------- 法务页面 ----------
@app.route("/privacy")
def privacy_policy():
    """Render the privacy policy page with its SEO metadata and WebPage schema."""
    lang = _get_lang()
    title = _pick_lang("隐私政策 | 云价眼", "Privacy Policy | VPS Price", lang)
    description = _pick_lang(
        "了解云价眼如何收集、使用和保护站点访客与论坛用户数据。",
        "How VPS Price collects, uses, and protects visitor and forum user data.",
        lang,
    )
    page_url = _public_url("privacy_policy", lang=lang)

    seo = dict(
        title=title,
        description=description,
        canonical_url=page_url,
        robots="index,follow,max-image-preview:large",
        og_type="article",
        og_url=page_url,
        og_title=title,
        og_description=description,
        og_image=_absolute_url_for("static", filename="img/site-logo-mark.svg"),
        twitter_card="summary",
        twitter_title=title,
        twitter_description=description,
        alternate_links=_alternate_lang_links("privacy_policy"),
    )
    seo_schema = {
        "@context": "https://schema.org",
        "@type": "WebPage",
        "name": title,
        "description": description,
        "url": page_url,
        "inLanguage": "en-US" if lang == "en" else "zh-CN",
    }
    return render_template(
        "privacy.html",
        seo=seo,
        seo_schema=seo_schema,
        updated_on="2026-02-10",
    )
@app.route("/terms")
def terms_of_service():
    """Render the terms of service page with its SEO metadata and WebPage schema."""
    lang = _get_lang()
    title = _pick_lang("服务条款 | 云价眼", "Terms of Service | VPS Price", lang)
    description = _pick_lang(
        "查看云价眼的服务范围、免责声明与论坛使用规范。",
        "Read the VPS Price service scope, disclaimers, and forum usage rules.",
        lang,
    )
    page_url = _public_url("terms_of_service", lang=lang)

    seo = dict(
        title=title,
        description=description,
        canonical_url=page_url,
        robots="index,follow,max-image-preview:large",
        og_type="article",
        og_url=page_url,
        og_title=title,
        og_description=description,
        og_image=_absolute_url_for("static", filename="img/site-logo-mark.svg"),
        twitter_card="summary",
        twitter_title=title,
        twitter_description=description,
        alternate_links=_alternate_lang_links("terms_of_service"),
    )
    seo_schema = {
        "@context": "https://schema.org",
        "@type": "WebPage",
        "name": title,
        "description": description,
        "url": page_url,
        "inLanguage": "en-US" if lang == "en" else "zh-CN",
    }
    return render_template(
        "terms.html",
        seo=seo,
        seo_schema=seo_schema,
        updated_on="2026-02-10",
    )
# ---------- SEO ----------
@app.route("/sitemap.xml")
def sitemap():
    """Serve the sitemap index.

    The index lists the static sitemap first, followed by every paginated
    forum sitemap for ``zh`` and then ``en``. All entries share one
    ``lastmod`` derived from the latest forum content timestamp.
    """
    shared_lastmod = _iso8601_utc(_latest_forum_content_datetime())
    page_count = _forum_sitemap_total_pages()

    entries = [{"loc": _absolute_url_for("sitemap_static"), "lastmod": shared_lastmod}]
    entries.extend(
        {
            "loc": _absolute_url_for("sitemap_forum_page", lang_code=code, page=number),
            "lastmod": shared_lastmod,
        }
        for code in ("zh", "en")
        for number in range(1, page_count + 1)
    )

    response = make_response(_build_sitemap_index_xml(entries))
    response.mimetype = "application/xml"
    response.headers["Cache-Control"] = "public, max-age=1800"
    return response
@app.route("/sitemap-static.xml")
def sitemap_static():
    """Serve the sitemap for non-post pages.

    Covers the home page, forum index and its tab variants, the feed, the
    legal pages, and one forum index URL per category that has posts (at
    most 300 categories, most populated first). Every page is listed once
    per language (``zh`` and ``en``).
    """
    forum_lastmod = _iso8601_utc(_latest_forum_content_datetime())
    collected = []

    def register(endpoint, changefreq, priority, params=None, lastmod=None):
        # Emit one entry per language; both entries share the same
        # alternates object for this endpoint and parameter set.
        url_params = params or {}
        alternates = _sitemap_alternates(endpoint, **url_params)
        collected.extend(
            {
                "loc": _public_url(endpoint, lang=code, **url_params),
                "changefreq": changefreq,
                "priority": priority,
                "lastmod": lastmod,
                "alternates": alternates,
            }
            for code in ("zh", "en")
        )

    register("index", "daily", "1.0")
    register("forum_index", "daily", "0.9", lastmod=forum_lastmod)
    register("forum_feed", "hourly", "0.4", lastmod=forum_lastmod)
    register("forum_index", "daily", "0.8", params={"tab": "new"}, lastmod=forum_lastmod)
    register("forum_index", "daily", "0.8", params={"tab": "hot"}, lastmod=forum_lastmod)
    register("privacy_policy", "monthly", "0.3", lastmod="2026-02-10T00:00:00Z")
    register("terms_of_service", "monthly", "0.3", lastmod="2026-02-10T00:00:00Z")

    latest_per_category = func.max(func.coalesce(ForumPost.updated_at, ForumPost.created_at))
    category_query = (
        db.session.query(ForumPost.category, latest_per_category.label("latest_at"))
        .filter(ForumPost.category.isnot(None), ForumPost.category != "")
        .group_by(ForumPost.category)
        .order_by(func.count(ForumPost.id).desc(), ForumPost.category.asc())
        .limit(300)
    )
    for name, latest_at in category_query.all():
        register(
            "forum_index",
            "daily",
            "0.75",
            params={"category": name},
            lastmod=_iso8601_utc(latest_at),
        )

    response = make_response(_build_sitemap_urlset_xml(collected))
    response.mimetype = "application/xml"
    response.headers["Cache-Control"] = "public, max-age=1800"
    return response
@app.route("/sitemap-forum-<lang_code>-<int:page>.xml")
def sitemap_forum_page(lang_code, page):
    """Serve one page of the forum post sitemap for a language.

    Aborts with 404 when the language is not ``zh``/``en`` or the page
    number is outside ``1..total_pages``. Posts are ordered by
    ``updated_at`` descending, then id descending, and sliced with
    ``SITEMAP_POSTS_PER_FILE``.
    """
    language = (lang_code or "").strip().lower()
    if language not in {"zh", "en"}:
        abort(404)
    if not 1 <= page <= _forum_sitemap_total_pages():
        abort(404)

    page_query = (
        db.session.query(ForumPost.id, ForumPost.updated_at, ForumPost.created_at)
        .order_by(ForumPost.updated_at.desc(), ForumPost.id.desc())
        .offset((page - 1) * SITEMAP_POSTS_PER_FILE)
        .limit(SITEMAP_POSTS_PER_FILE)
    )

    url_entries = []
    for pid, updated_at, created_at in page_query.all():
        entry = {
            "loc": _public_url("forum_post_detail", lang=language, post_id=pid),
            "changefreq": "weekly",
            "priority": "0.8",
            "lastmod": _iso8601_utc(updated_at or created_at),
            "alternates": _sitemap_alternates("forum_post_detail", post_id=pid),
        }
        url_entries.append(entry)

    response = make_response(_build_sitemap_urlset_xml(url_entries))
    response.mimetype = "application/xml"
    response.headers["Cache-Control"] = "public, max-age=1800"
    return response
@app.route("/robots.txt")
def robots():
    """Serve robots.txt with the crawl rules and the absolute sitemap URL."""
    # The only placeholder in the template is the site root before /sitemap.xml.
    template = """User-agent: *
Allow: /
Allow: /forum/feed.xml
Disallow: /admin/
Disallow: /login
Disallow: /register
Disallow: /profile
Disallow: /me
Disallow: /notifications
Disallow: /notification/
Disallow: /forum/post/new
Disallow: /forum/post/*/edit
Disallow: /forum/comment/*/edit
Disallow: /forum/report
Disallow: /api/
Disallow: /*?*q=
Sitemap: {}/sitemap.xml
"""
    body = template.format(_site_root_url())
    response = make_response(body)
    response.mimetype = "text/plain"
    response.headers["Cache-Control"] = "public, max-age=3600"
    return response
@app.route("/ads.txt")
def ads_txt():
    """Serve ads.txt from configuration, a static file, or a placeholder.

    Resolution order:
      1. The ``ADS_TXT_CONTENT`` environment variable (a trailing newline is
         appended when missing), cached for one hour.
      2. ``ads.txt`` inside the application's static folder.
      3. A one-line placeholder comment, cached for ten minutes.

    ``make_response`` comes from the module-level Flask import, so the
    function no longer re-imports it locally.
    """

    def _plain_text(body, max_age):
        # Build a cacheable text/plain response.
        resp = make_response(body)
        resp.mimetype = "text/plain"
        resp.headers["Cache-Control"] = "public, max-age={}".format(max_age)
        return resp

    content = (os.environ.get("ADS_TXT_CONTENT") or "").strip()
    if content:
        body = content if content.endswith("\n") else "{}\n".format(content)
        return _plain_text(body, 3600)

    static_dir = app.static_folder or ""
    if os.path.isfile(os.path.join(static_dir, "ads.txt")):
        return send_from_directory(static_dir, "ads.txt")

    return _plain_text("# Configure ADS_TXT_CONTENT or create static/ads.txt\n", 600)
@app.route("/favicon.ico")
def favicon():
    """Redirect favicon requests to the SVG logo mark in the static folder."""
    icon_url = url_for("static", filename="img/site-logo-mark.svg")
    return redirect(icon_url)
# ---------- 后台 ----------
@app.route("/admin/login", methods=["GET", "POST"])
def admin_login():
    """Show the admin login form and authenticate the submitted password.

    GET renders the form. POST compares the submitted password with
    ``ADMIN_PASSWORD``; a match marks the session as admin and redirects to
    the dashboard, anything else renders the form again with an error.
    """
    # Local import keeps this block self-contained; hmac is standard library.
    import hmac

    if request.method != "POST":
        return render_template("admin/login.html")

    submitted = request.form.get("password", "")
    expected = ADMIN_PASSWORD
    # compare_digest avoids leaking how many leading characters matched via
    # response timing. Both sides are encoded to bytes because the str form
    # of compare_digest only accepts ASCII text.
    if isinstance(expected, str) and hmac.compare_digest(
        submitted.encode("utf-8"), expected.encode("utf-8")
    ):
        session["admin_logged_in"] = True
        return redirect(url_for("admin_dashboard"))

    return render_template("admin/login.html", error="密码错误")
@app.route("/admin/logout")
def admin_logout():
    """Clear the admin flag from the session and redirect to the home page."""
    # pop with a default so logging out twice is harmless.
    session.pop("admin_logged_in", None)
    home_url = url_for("index")
    return redirect(home_url)
@app.route("/admin/api/plan/<int:plan_id>")
@admin_required
def admin_api_plan(plan_id):
    """Return one VPS plan as JSON for the admin UI (404 when missing).

    Prices are converted to ``float`` or left as ``null`` when unset;
    empty text fields become empty strings and the currency defaults to
    ``"CNY"``.
    """

    def _as_float(amount):
        # Keep None so the JSON payload distinguishes "no price" from 0.
        return None if amount is None else float(amount)

    plan = VPSPlan.query.get_or_404(plan_id)
    payload = {
        "id": plan.id,
        "provider_id": plan.provider_id,
        "countries": plan.countries or "",
        "vcpu": plan.vcpu,
        "memory_gb": plan.memory_gb,
        "storage_gb": plan.storage_gb,
        "bandwidth_mbps": plan.bandwidth_mbps,
        "traffic": plan.traffic or "",
        "price_cny": _as_float(plan.price_cny),
        "price_usd": _as_float(plan.price_usd),
        "currency": plan.currency or "CNY",
        "official_url": plan.official_url or "",
    }
    return jsonify(payload)
@app.route("/admin/api/plan/<int:plan_id>/price-history")
@admin_required
def admin_api_plan_price_history(plan_id):
    """Return the 30 most recent price history rows of a plan as a JSON list.

    Rows are ordered by capture time descending, then id descending.
    Responds with 404 when the plan does not exist.
    """

    def _as_float(amount):
        return None if amount is None else float(amount)

    plan = VPSPlan.query.get_or_404(plan_id)
    history_query = (
        PriceHistory.query
        .filter_by(plan_id=plan.id)
        .order_by(PriceHistory.captured_at.desc(), PriceHistory.id.desc())
        .limit(30)
    )

    payload = []
    for entry in history_query.all():
        payload.append({
            "id": entry.id,
            "price_cny": _as_float(entry.price_cny),
            "price_usd": _as_float(entry.price_usd),
            "currency": entry.currency or "CNY",
            "source": entry.source or "",
            "captured_at": entry.captured_at.isoformat() if entry.captured_at else "",
        })
    return jsonify(payload)
@app.route("/admin")
@admin_required
def admin_dashboard():
    """Render the admin dashboard with providers, plans and plan price trends."""
    provider_list = Provider.query.order_by(Provider.name).all()
    plan_list = _query_plans_for_display()
    template_context = {
        "providers": provider_list,
        "plans": plan_list,
        "plan_trends": _build_plan_trend_map(plan_list),
        "country_tags": COUNTRY_TAGS,
    }
    return render_template("admin/dashboard.html", **template_context)
# ---------- 厂商管理 ----------
@app.route("/admin/providers")
@admin_required
def admin_providers():
    """Render the provider list, sorted by provider name."""
    by_name = Provider.query.order_by(Provider.name)
    return render_template("admin/providers.html", providers=by_name.all())
@app.route("/admin/provider/new", methods=["GET", "POST"])
@admin_required
def admin_provider_new():
    """Show and process the form that creates a provider.

    A POST with an empty name or a name that already exists renders the
    form again with an error; otherwise the provider is stored and the
    admin is redirected to its detail page.
    """
    form_template = "admin/provider_form.html"
    if request.method != "POST":
        return render_template(form_template, provider=None)

    submitted_name = request.form.get("name", "").strip()
    submitted_url = request.form.get("official_url", "").strip() or None

    if not submitted_name:
        return render_template(form_template, provider=None, error="请填写厂商名称")
    if Provider.query.filter_by(name=submitted_name).first():
        return render_template(form_template, provider=None, error="该厂商名称已存在")

    created = Provider(name=submitted_name, official_url=submitted_url)
    db.session.add(created)
    db.session.commit()
    return redirect(url_for("admin_provider_detail", provider_id=created.id))
@app.route("/admin/provider/<int:provider_id>")
@admin_required
def admin_provider_detail(provider_id):
    """Render one provider together with its plans and their price trends.

    Plans are matched either through ``provider_id`` or through the plan's
    ``provider`` column equalling the provider name, and are ordered by CNY
    price and then by plan name.
    """
    provider = Provider.query.get_or_404(provider_id)

    belongs_to_provider = or_(
        VPSPlan.provider_id == provider_id,
        VPSPlan.provider == provider.name,
    )
    plan_query = VPSPlan.query.options(joinedload(VPSPlan.provider_rel))
    plan_query = plan_query.filter(belongs_to_provider)
    plan_query = plan_query.order_by(VPSPlan.price_cny.asc(), VPSPlan.name)
    provider_plans = plan_query.all()

    all_providers = Provider.query.order_by(Provider.name).all()
    trend_map = _build_plan_trend_map(provider_plans)

    return render_template(
        "admin/provider_detail.html",
        provider=provider,
        plans=provider_plans,
        plan_trends=trend_map,
        providers=all_providers,
        country_tags=COUNTRY_TAGS,
    )
@app.route("/admin/provider/<int:provider_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_provider_edit(provider_id):
    """Show and process the form that edits a provider.

    GET renders the form for the stored provider (404 when missing). POST
    validates the submitted name before touching the model: an empty name,
    or a name already used by a different provider, renders the form again
    with an error and leaves the stored provider unchanged. On success the
    provider is saved, the plans cache is invalidated and the admin is
    redirected to the provider detail page.
    """
    provider = Provider.query.get_or_404(provider_id)
    if request.method != "POST":
        return render_template("admin/provider_form.html", provider=provider)

    new_name = request.form.get("name", "").strip()
    new_url = request.form.get("official_url", "").strip() or None

    # Validate first so a rejected submission never leaves a half-edited
    # object in the session.
    error = None
    if not new_name:
        error = "请填写厂商名称"
    elif Provider.query.filter(Provider.name == new_name, Provider.id != provider.id).first():
        # Same uniqueness rule and message as the create form.
        error = "该厂商名称已存在"
    if error:
        return render_template("admin/provider_form.html", provider=provider, error=error)

    provider.name = new_name
    provider.official_url = new_url
    db.session.commit()
    # Matches the delete view, which also invalidates the plans cache after
    # changing provider data.
    _invalidate_plans_cache()
    return redirect(url_for("admin_provider_detail", provider_id=provider.id))
@app.route("/admin/provider/<int:provider_id>/delete", methods=["POST"])
@admin_required
def admin_provider_delete(provider_id):
    """Delete a provider while keeping its plans.

    Plans that reference the provider through ``provider_id`` are detached
    (the column is set to NULL) before the provider row is removed; the
    plans cache is invalidated afterwards.
    """
    doomed = Provider.query.get_or_404(provider_id)

    # Keep the plans themselves; only clear their link to this provider.
    linked_plans = VPSPlan.query.filter_by(provider_id=provider_id)
    linked_plans.update({"provider_id": None})

    db.session.delete(doomed)
    db.session.commit()
    _invalidate_plans_cache()
    return redirect(url_for("admin_providers"))
def _parse_sort_order(raw, default=100):
    """Parse a sort-order value into an ``int``, falling back to *default*.

    Args:
        raw: The value to parse, typically a form field string. ``None``,
            blank strings and values that ``int()`` cannot parse yield
            *default*. Non-string values (for example an ``int`` already
            read from the database) are converted through ``str()`` first
            instead of raising ``AttributeError``.
        default: Value returned when *raw* is missing or not an integer.

    Returns:
        The parsed integer, or *default*.
    """
    if raw is None:
        return default
    candidate = str(raw).strip()
    if not candidate:
        return default
    try:
        return int(candidate)
    except ValueError:
        return default
def _admin_user_counts(user_ids):
    """Return per-user activity counts for a batch of user ids.

    One grouped query is run per metric instead of one query per user. The
    result maps ``"posts"``, ``"comments"``, ``"reports"`` and
    ``"unread_notifications"`` to ``{user_id: count}`` dictionaries; users
    with no rows for a metric are simply absent from that dictionary. An
    empty *user_ids* returns four empty dictionaries without querying.
    """
    if not user_ids:
        return {
            metric: {}
            for metric in ("posts", "comments", "reports", "unread_notifications")
        }

    def _count_by(owner_column, pk_column, *extra_criteria):
        # Group rows by owner and count them, restricted to the given users.
        grouped = (
            db.session.query(owner_column, func.count(pk_column))
            .filter(owner_column.in_(user_ids), *extra_criteria)
            .group_by(owner_column)
            .all()
        )
        return {uid: int(total or 0) for uid, total in grouped}

    return {
        "posts": _count_by(ForumPost.user_id, ForumPost.id),
        "comments": _count_by(ForumComment.user_id, ForumComment.id),
        "reports": _count_by(ForumReport.reporter_id, ForumReport.id),
        "unread_notifications": _count_by(
            ForumNotification.user_id,
            ForumNotification.id,
            ForumNotification.is_read.is_(False),
        ),
    }
def _admin_load_user_options(limit=400):
    """Return up to *limit* users ordered by username, then id (both ascending)."""
    ordered_users = User.query.order_by(User.username.asc(), User.id.asc())
    return ordered_users.limit(limit).all()
def _admin_load_post_options(limit=400):
    """Return up to *limit* forum posts, newest first (creation time, then id)."""
    newest_first = ForumPost.query.order_by(ForumPost.created_at.desc(), ForumPost.id.desc())
    return newest_first.limit(limit).all()
def _admin_fill_post_and_user_options(post_options, selected_post_id, user_options, selected_user_id):
    """Make sure the currently selected post and user appear in the option lists.

    When a selected id is set but missing from its list, the record is
    loaded and placed at the front of a new list; the input lists are not
    mutated. Returns the ``(post_options, user_options)`` pair.
    """
    post_listed = any(option.id == selected_post_id for option in post_options)
    if selected_post_id and not post_listed:
        current_post = db.session.get(ForumPost, selected_post_id)
        if current_post:
            post_options = [current_post] + post_options

    user_listed = any(option.id == selected_user_id for option in user_options)
    if selected_user_id and not user_listed:
        current_user_row = db.session.get(User, selected_user_id)
        if current_user_row:
            user_options = [current_user_row] + user_options

    return post_options, user_options
# ---------- 论坛分类管理 ----------
@app.route("/admin/forum/categories")
@admin_required
def admin_forum_categories():
    """Render the forum category list with the number of posts per category name."""
    category_list = _load_forum_categories(active_only=False)

    grouped_counts = (
        db.session.query(ForumPost.category, func.count(ForumPost.id))
        .group_by(ForumPost.category)
        .all()
    )
    posts_by_category = {}
    for category_name, post_total in grouped_counts:
        posts_by_category[category_name] = post_total

    return render_template(
        "admin/forum_categories.html",
        categories=category_list,
        posts_by_category=posts_by_category,
        msg=request.args.get("msg", ""),
        error=request.args.get("error", ""),
    )
@app.route("/admin/forum/category/new", methods=["GET", "POST"])
@admin_required
def admin_forum_category_new():
    """Show and process the form that creates a forum category.

    The name is required, limited to 32 characters and must be unique
    ignoring case. On a validation failure the form is rendered again with
    the submitted values and an error message.
    """
    error = ""
    name_val, sort_order_val, is_active_val = "", 100, True

    if request.method == "POST":
        form = request.form
        name_val = (form.get("name") or "").strip()
        sort_order_val = _parse_sort_order(form.get("sort_order"), 100)
        is_active_val = bool(form.get("is_active"))

        if not name_val:
            error = "请填写分类名称"
        elif len(name_val) > 32:
            error = "分类名称最多 32 个字符"
        elif ForumCategory.query.filter(func.lower(ForumCategory.name) == name_val.lower()).first():
            error = "分类名称已存在"

        if not error:
            new_category = ForumCategory(
                name=name_val,
                sort_order=sort_order_val,
                is_active=is_active_val,
            )
            db.session.add(new_category)
            db.session.commit()
            notice = "已新增分类:{}".format(name_val)
            return redirect(url_for("admin_forum_categories", msg=notice))

    return render_template(
        "admin/forum_category_form.html",
        page_title="新增论坛分类",
        submit_text="创建分类",
        action_url=url_for("admin_forum_category_new"),
        error=error,
        name_val=name_val,
        sort_order_val=sort_order_val,
        is_active_val=is_active_val,
        category_id=None,
    )
@app.route("/admin/forum/category/<int:category_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_forum_category_edit(category_id):
    """Show and process the form that edits a forum category.

    Validation mirrors creation (required name, at most 32 characters,
    unique ignoring case among the other categories) and additionally
    refuses to deactivate the last active category. When the name changes,
    posts stored under the old name are moved to the new one.
    """
    category = ForumCategory.query.get_or_404(category_id)
    error = ""
    name_val = category.name
    sort_order_val = category.sort_order
    is_active_val = bool(category.is_active)

    if request.method == "POST":
        form = request.form
        name_val = (form.get("name") or "").strip()
        sort_order_val = _parse_sort_order(form.get("sort_order"), category.sort_order)
        is_active_val = bool(form.get("is_active"))

        if not name_val:
            error = "请填写分类名称"
        elif len(name_val) > 32:
            error = "分类名称最多 32 个字符"
        elif (
            category.is_active
            and not is_active_val
            and ForumCategory.query.filter_by(is_active=True).count() <= 1
        ):
            error = "至少保留一个启用分类"
        elif (
            ForumCategory.query
            .filter(func.lower(ForumCategory.name) == name_val.lower(), ForumCategory.id != category.id)
            .first()
        ):
            error = "分类名称已存在"
        else:
            previous_name = category.name
            category.name = name_val
            category.sort_order = sort_order_val
            category.is_active = is_active_val
            if previous_name != name_val:
                # Posts store the category by name, so a rename must follow through.
                ForumPost.query.filter_by(category=previous_name).update({"category": name_val})
            db.session.commit()
            notice = "已更新分类:{}".format(name_val)
            return redirect(url_for("admin_forum_categories", msg=notice))

    return render_template(
        "admin/forum_category_form.html",
        page_title="编辑论坛分类",
        submit_text="保存修改",
        action_url=url_for("admin_forum_category_edit", category_id=category.id),
        error=error,
        name_val=name_val,
        sort_order_val=sort_order_val,
        is_active_val=is_active_val,
        category_id=category.id,
    )
@app.route("/admin/forum/category/<int:category_id>/delete", methods=["POST"])
@admin_required
def admin_forum_category_delete(category_id):
    """Delete a forum category and move its posts to a replacement category.

    Deletion is refused when it would remove the last category or the last
    active one. The replacement is the first other category when sorted
    with active ones first, then by sort order and id.
    """
    category = ForumCategory.query.get_or_404(category_id)

    def _back_to_list(**query_args):
        return redirect(url_for("admin_forum_categories", **query_args))

    if ForumCategory.query.count() <= 1:
        return _back_to_list(error="至少保留一个分类,无法删除最后一个")
    if category.is_active and ForumCategory.query.filter_by(is_active=True).count() <= 1:
        return _back_to_list(error="至少保留一个启用分类,无法删除最后一个启用项")

    candidates = ForumCategory.query.filter(ForumCategory.id != category.id)
    candidates = candidates.order_by(
        ForumCategory.is_active.desc(),
        ForumCategory.sort_order.asc(),
        ForumCategory.id.asc(),
    )
    replacement = candidates.first()
    if replacement is None:
        return _back_to_list(error="未找到可替代分类")

    # Posts reference the category by name; repoint them before deleting.
    ForumPost.query.filter_by(category=category.name).update({"category": replacement.name})
    db.session.delete(category)
    db.session.commit()
    return _back_to_list(msg="已删除分类,帖子迁移到:{}".format(replacement.name))
def _get_report_target_info(report):
"""返回举报目标的展示信息。"""
info = {
"exists": False,
"post_id": None,
"title": report.snapshot_title or "",
"content": report.snapshot_content or "",
"author_name": "",
}
if report.target_type == "post":
post = db.session.get(ForumPost, report.target_id)
if post:
info.update({
"exists": True,
"post_id": post.id,
"title": post.title or info["title"],
"content": post.content or info["content"],
"author_name": post.author_rel.username if post.author_rel else "",
})
elif report.target_type == "comment":
comment = db.session.get(ForumComment, report.target_id)
if comment:
info.update({
"exists": True,
"post_id": comment.post_id,
"title": comment.post_rel.title if comment.post_rel else (info["title"] or ""),
"content": comment.content or info["content"],
"author_name": comment.author_rel.username if comment.author_rel else "",
})
if info["content"] and len(info["content"]) > 140:
info["content"] = info["content"][:140] + "..."
return info
@app.route("/admin/forum/reports")
@admin_required
def admin_forum_reports():
    """List forum reports for the admin, filtered by status.

    The ``status`` query argument may be pending, processed, rejected or
    all; anything else falls back to pending. At most 300 reports are
    shown, newest first, together with per-status totals.
    """
    status = (request.args.get("status") or "pending").strip().lower()
    if status not in {"pending", "processed", "rejected", "all"}:
        status = "pending"

    report_query = ForumReport.query.order_by(ForumReport.created_at.desc(), ForumReport.id.desc())
    if status != "all":
        report_query = report_query.filter_by(status=status)

    report_items = [
        {
            "report": item,
            "target": _get_report_target_info(item),
            "reporter_name": item.reporter_rel.username if item.reporter_rel else "用户",
        }
        for item in report_query.limit(300).all()
    ]

    status_totals = (
        db.session.query(ForumReport.status, func.count(ForumReport.id))
        .group_by(ForumReport.status)
        .all()
    )
    status_count_map = {}
    for status_key, total in status_totals:
        status_count_map[status_key] = int(total or 0)

    return render_template(
        "admin/forum_reports.html",
        status=status,
        report_items=report_items,
        status_count_map=status_count_map,
        status_labels=FORUM_REPORT_STATUS_LABELS,
        msg=request.args.get("msg", ""),
        error=request.args.get("error", ""),
    )
@app.route("/admin/forum/report/<int:report_id>/process", methods=["POST"])
@admin_required
def admin_forum_report_process(report_id):
    """Resolve a pending forum report and notify the users involved.

    Form fields:
        action: ``delete_target`` (delete the reported post/comment),
            ``keep`` (mark processed, keep the content) or ``reject``.
        review_note: optional note, truncated to 500 characters; a default
            note is used when it is empty.

    The reporter always receives a ``report_processed`` notification. When
    content was deleted and its owner is not the reporter, the owner
    receives a ``content_removed`` notification.
    """
    report = ForumReport.query.get_or_404(report_id)
    action = (request.form.get("action") or "").strip().lower()
    review_note = (request.form.get("review_note") or "").strip()[:500]
    if report.status != "pending":
        return redirect(url_for("admin_forum_reports", error="该举报已处理"))

    outcome = ""
    target_owner_id = None
    # Post that the notifications link to. It is only set when that post
    # still exists after this request: a deleted post must not be referenced
    # by a freshly created notification (dangling link / FK conflict).
    notify_post_id = None
    target_kind_label = "内容"

    if action == "delete_target":
        if report.target_type == "post":
            target = db.session.get(ForumPost, report.target_id)
            if target:
                target_owner_id = target.user_id
                target_kind_label = "帖子"
                db.session.delete(target)
                outcome = "已删除被举报帖子"
            else:
                outcome = "目标帖子已不存在"
        elif report.target_type == "comment":
            target = db.session.get(ForumComment, report.target_id)
            if target:
                target_owner_id = target.user_id
                # The parent post survives, so it is safe to link to it.
                notify_post_id = target.post_id
                target_kind_label = "评论"
                db.session.delete(target)
                outcome = "已删除被举报评论"
            else:
                outcome = "目标评论已不存在"
        else:
            return redirect(url_for("admin_forum_reports", error="未知举报目标类型"))
        report.status = "processed"
        report.review_note = review_note or outcome
    elif action == "keep":
        report.status = "processed"
        report.review_note = review_note or "审核后保留内容"
        outcome = "已标记为保留"
    elif action == "reject":
        report.status = "rejected"
        report.review_note = review_note or "举报不成立"
        outcome = "已驳回举报"
    else:
        return redirect(url_for("admin_forum_reports", error="未知处理动作"))

    report.reviewed_by = "admin"
    report.reviewed_at = datetime.now(timezone.utc)
    _create_notification(
        user_id=report.reporter_id,
        notif_type="report_processed",
        message="你提交的举报(#{})处理结果:{}".format(report.id, outcome),
        report_id=report.id,
        post_id=notify_post_id,
    )
    # Do not notify twice when the reporter reported their own content.
    if action == "delete_target" and target_owner_id and target_owner_id != report.reporter_id:
        _create_notification(
            user_id=target_owner_id,
            notif_type="content_removed",
            message="你的{}因举报处理已被删除".format(target_kind_label),
            report_id=report.id,
            post_id=notify_post_id,
        )
    db.session.commit()
    return redirect(url_for("admin_forum_reports", msg=outcome))
@app.route("/admin/users")
@admin_required
def admin_users():
    """List users for the admin, optionally filtered by a username keyword.

    The ``q`` query argument is trimmed and capped at 50 characters and is
    matched case-insensitively as a literal substring of the username.
    At most 300 users are shown, newest first, with per-user counters
    loaded through ``_admin_user_counts``.
    """
    keyword = (request.args.get("q") or "").strip()[:50]
    q = User.query
    if keyword:
        # Escape LIKE wildcards so "_" (a legal username character) and "%"
        # are matched literally instead of acting as patterns.
        escaped = keyword.replace("/", "//").replace("%", "/%").replace("_", "/_")
        q = q.filter(User.username.ilike("%{}%".format(escaped), escape="/"))
    users = q.order_by(User.created_at.desc(), User.id.desc()).limit(300).all()
    count_maps = _admin_user_counts([u.id for u in users])
    return render_template(
        "admin/users.html",
        users=users,
        keyword=keyword,
        post_count_map=count_maps["posts"],
        comment_count_map=count_maps["comments"],
        report_count_map=count_maps["reports"],
        unread_notification_count_map=count_maps["unread_notifications"],
        msg=request.args.get("msg", ""),
        error=request.args.get("error", ""),
    )
@app.route("/admin/user/new", methods=["GET", "POST"])
@admin_required
def admin_user_new():
    """Show the create-user form and create the user on a valid POST.

    Validation order: username format, password length (>= 6), password
    confirmation, then case-insensitive username uniqueness.
    """
    error = ""
    username_val = ""
    if request.method == "POST":
        form = request.form
        username_val = (form.get("username") or "").strip()
        password = form.get("password") or ""
        confirm_password = form.get("confirm_password") or ""

        if not _is_valid_username(username_val):
            error = "用户名需为 3-20 位,仅支持字母、数字、下划线"
        elif len(password) < 6:
            error = "密码至少 6 位"
        elif password != confirm_password:
            error = "两次输入的密码不一致"
        else:
            name_matches = func.lower(User.username) == username_val.lower()
            if User.query.filter(name_matches).first():
                error = "用户名已存在"
            else:
                new_user = User(username=username_val)
                new_user.set_password(password)
                db.session.add(new_user)
                db.session.commit()
                notice = "已新增用户:{}".format(username_val)
                return redirect(url_for("admin_users", msg=notice))

    return render_template(
        "admin/user_form.html",
        page_title="新增用户",
        submit_text="创建用户",
        action_url=url_for("admin_user_new"),
        error=error,
        username_val=username_val,
        user_id=None,
    )
@app.route("/admin/user/<int:user_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_user_edit(user_id):
    """Show the edit-user form and apply username/password changes on POST.

    The password is only changed when ``new_password`` is supplied. When
    neither the username nor the password changed, nothing is committed.
    """
    user = User.query.get_or_404(user_id)
    error = ""
    username_val = user.username or ""

    def _taken_by_other(name):
        # Case-insensitive lookup that ignores the user being edited.
        return (
            User.query
            .filter(func.lower(User.username) == name.lower(), User.id != user.id)
            .first()
        )

    if request.method == "POST":
        form = request.form
        username_val = (form.get("username") or "").strip()
        new_password = form.get("new_password") or ""
        confirm_password = form.get("confirm_password") or ""

        if not _is_valid_username(username_val):
            error = "用户名需为 3-20 位,仅支持字母、数字、下划线"
        elif _taken_by_other(username_val):
            error = "用户名已存在"
        elif new_password and len(new_password) < 6:
            error = "新密码至少 6 位"
        elif new_password and new_password != confirm_password:
            error = "两次新密码输入不一致"
        else:
            name_changed = user.username != username_val
            user.username = username_val
            if new_password:
                user.set_password(new_password)
            if name_changed or new_password:
                db.session.commit()
                notice = "已更新用户:{}".format(username_val)
                return redirect(url_for("admin_users", msg=notice))
            return redirect(url_for("admin_users", msg="未检测到变更"))

    return render_template(
        "admin/user_form.html",
        page_title="编辑用户",
        submit_text="保存修改",
        action_url=url_for("admin_user_edit", user_id=user.id),
        error=error,
        username_val=username_val,
        user_id=user.id,
    )
@app.route("/admin/user/<int:user_id>/delete", methods=["POST"])
@admin_required
def admin_user_delete(user_id):
    """Delete a user, refusing to delete the last remaining one.

    On any failure the transaction is rolled back, the exception is logged
    and the admin is redirected back with a generic error message.
    """
    user = User.query.get_or_404(user_id)
    if User.query.count() <= 1:
        return redirect(url_for("admin_users", error="至少保留一个用户,无法删除最后一个"))
    username = user.username or "用户"
    try:
        # Notifications received by other users may reference this user as
        # the actor; clear that reference first to avoid a foreign-key conflict.
        ForumNotification.query.filter_by(actor_id=user.id).update(
            {"actor_id": None}, synchronize_session=False
        )
        db.session.delete(user)
        db.session.commit()
    except Exception:
        db.session.rollback()
        # Keep the best-effort redirect, but record why the delete failed.
        app.logger.exception("Failed to delete user id=%s", user_id)
        return redirect(url_for("admin_users", error="删除失败,请稍后重试"))
    return redirect(url_for("admin_users", msg="已删除用户:{}".format(username)))
@app.route("/admin/user/<int:user_id>/ban", methods=["POST"])
@admin_required
def admin_user_ban(user_id):
    """Ban a user, storing the ban time (UTC) and a reason.

    The ``reason`` form field is trimmed and capped at 255 characters; a
    default reason is stored when it is empty. Already-banned users are
    left untouched.
    """
    user = User.query.get_or_404(user_id)
    reason = (request.form.get("reason") or "").strip()[:255]

    def _to_user_list(template):
        return redirect(url_for("admin_users", msg=template.format(user.username)))

    if user.is_banned:
        return _to_user_list("用户已处于封禁状态:{}")

    user.is_banned = True
    user.banned_at = datetime.now(timezone.utc)
    user.banned_reason = reason or "管理员封禁"
    db.session.commit()
    return _to_user_list("已封禁用户:{}")
@app.route("/admin/user/<int:user_id>/unban", methods=["POST"])
@admin_required
def admin_user_unban(user_id):
    """Lift a user's ban and clear the stored ban time and reason."""
    user = User.query.get_or_404(user_id)

    def _to_user_list(template):
        return redirect(url_for("admin_users", msg=template.format(user.username)))

    if not user.is_banned:
        return _to_user_list("用户未被封禁:{}")

    user.is_banned = False
    user.banned_at = None
    user.banned_reason = None
    db.session.commit()
    return _to_user_list("已解封用户:{}")
@app.route("/admin/forum/posts")
@admin_required
def admin_forum_posts():
    """List forum posts for the admin with comment/like/bookmark counts.

    Query arguments: ``q`` (keyword, capped at 80 characters, matched
    against title, content and author name), ``category`` and
    ``author_id``. At most 400 posts are shown, pinned first, then newest.
    """
    keyword = (request.args.get("q") or "").strip()[:80]
    selected_category = (request.args.get("category") or "").strip() or None
    selected_author_id = request.args.get("author_id", type=int)

    def _per_post_count(model, label):
        # Subquery with one row per post_id and the number of `model` rows.
        return (
            db.session.query(
                model.post_id.label("post_id"),
                func.count(model.id).label(label),
            )
            .group_by(model.post_id)
            .subquery()
        )

    comments_sq = _per_post_count(ForumComment, "comment_count")
    likes_sq = _per_post_count(ForumPostLike, "like_count")
    bookmarks_sq = _per_post_count(ForumPostBookmark, "bookmark_count")

    post_query = (
        db.session.query(
            ForumPost,
            func.coalesce(comments_sq.c.comment_count, 0).label("comment_count"),
            User.username.label("author_name"),
            func.coalesce(likes_sq.c.like_count, 0).label("like_count"),
            func.coalesce(bookmarks_sq.c.bookmark_count, 0).label("bookmark_count"),
        )
        .outerjoin(comments_sq, comments_sq.c.post_id == ForumPost.id)
        .outerjoin(likes_sq, likes_sq.c.post_id == ForumPost.id)
        .outerjoin(bookmarks_sq, bookmarks_sq.c.post_id == ForumPost.id)
        .outerjoin(User, User.id == ForumPost.user_id)
    )
    if selected_category:
        post_query = post_query.filter(ForumPost.category == selected_category)
    if selected_author_id:
        post_query = post_query.filter(ForumPost.user_id == selected_author_id)
    if keyword:
        like_pattern = "%{}%".format(keyword)
        post_query = post_query.filter(
            or_(
                ForumPost.title.ilike(like_pattern),
                ForumPost.content.ilike(like_pattern),
                User.username.ilike(like_pattern),
            )
        )
    rows = (
        post_query
        .order_by(ForumPost.is_pinned.desc(), ForumPost.created_at.desc(), ForumPost.id.desc())
        .limit(400)
        .all()
    )

    # Filter options: configured categories first, then any category name
    # still used by posts, and finally the selected one if it is unknown.
    category_names = list(_get_forum_category_names(active_only=False))
    used_categories = db.session.query(ForumPost.category).distinct().all()
    for (used_name,) in used_categories:
        if used_name and used_name not in category_names:
            category_names.append(used_name)
    if selected_category and selected_category not in category_names:
        category_names.insert(0, selected_category)

    post_total = func.count(ForumPost.id)
    author_rows = (
        db.session.query(User.id, User.username, post_total.label("post_count"))
        .outerjoin(ForumPost, ForumPost.user_id == User.id)
        .group_by(User.id)
        .order_by(func.count(ForumPost.id).desc(), User.username.asc())
        .limit(300)
        .all()
    )

    return render_template(
        "admin/forum_posts.html",
        rows=rows,
        category_names=category_names,
        author_rows=author_rows,
        keyword=keyword,
        selected_category=selected_category,
        selected_author_id=selected_author_id,
        msg=request.args.get("msg", ""),
        error=request.args.get("error", ""),
    )
@app.route("/admin/forum/post/<int:post_id>/moderate", methods=["POST"])
@admin_required
def admin_forum_post_moderate(post_id):
    """Toggle a post's pinned / featured / locked flag.

    The ``action`` form field selects the change: pin, unpin, feature,
    unfeature, lock or unlock. Any other value redirects with an error.
    """
    post = ForumPost.query.get_or_404(post_id)
    action = (request.form.get("action") or "").strip().lower()

    # action -> (attribute on the post, value to assign)
    flag_changes = {
        "pin": ("is_pinned", True),
        "unpin": ("is_pinned", False),
        "feature": ("is_featured", True),
        "unfeature": ("is_featured", False),
        "lock": ("is_locked", True),
        "unlock": ("is_locked", False),
    }
    change = flag_changes.get(action)
    if change is None:
        return redirect(url_for("admin_forum_posts", error="未知帖子管理动作"))

    attr_name, flag_value = change
    setattr(post, attr_name, flag_value)
    db.session.commit()
    return redirect(url_for("admin_forum_posts", msg="已更新帖子 #{} 状态".format(post.id)))
@app.route("/admin/forum/post/new", methods=["GET", "POST"])
@admin_required
def admin_forum_post_new():
    """Show the admin create-post form and create the post on a valid POST.

    Defaults for author and category may be preset through the
    ``author_id`` and ``category`` query arguments. Validation order:
    author exists, category is known, title length 5-160, content >= 10.
    """
    error = ""
    users = _admin_load_user_options(limit=400)
    categories = _get_forum_category_names(active_only=False) or list(DEFAULT_FORUM_CATEGORIES)

    default_author_id = users[0].id if users else None
    default_category = categories[0] if categories else "综合讨论"
    selected_author_id = request.args.get("author_id", type=int) or default_author_id
    selected_category = request.args.get("category") or default_category
    is_pinned_val = is_featured_val = is_locked_val = False
    title_val = content_val = ""

    if request.method == "POST":
        form = request.form
        selected_author_id = form.get("author_id", type=int)
        selected_category = (form.get("category") or "").strip() or selected_category
        is_pinned_val = bool(form.get("is_pinned"))
        is_featured_val = bool(form.get("is_featured"))
        is_locked_val = bool(form.get("is_locked"))
        title_val = (form.get("title") or "").strip()
        content_val = (form.get("content") or "").strip()
        author = db.session.get(User, selected_author_id or 0)

        # (failed?, message) pairs; the first failing one is reported.
        checks = (
            (not author, "请选择有效作者"),
            (selected_category not in categories, "请选择有效分类"),
            (len(title_val) < 5, "标题至少 5 个字符"),
            (len(title_val) > 160, "标题不能超过 160 个字符"),
            (len(content_val) < 10, "内容至少 10 个字符"),
        )
        error = next((message for failed, message in checks if failed), "")
        if not error:
            new_post = ForumPost(
                user_id=author.id,
                category=selected_category,
                title=title_val,
                content=content_val,
                is_pinned=is_pinned_val,
                is_featured=is_featured_val,
                is_locked=is_locked_val,
            )
            db.session.add(new_post)
            db.session.commit()
            return redirect(url_for("admin_forum_posts", msg="已新增帖子"))

    if not users:
        error = error or "当前没有可用用户,请先在用户管理中新增用户"
    return render_template(
        "admin/forum_post_form.html",
        page_title="后台新增帖子",
        submit_text="创建帖子",
        action_url=url_for("admin_forum_post_new"),
        cancel_url=url_for("admin_forum_posts"),
        error=error,
        users=users,
        categories=categories,
        selected_author_id=selected_author_id,
        selected_category=selected_category,
        is_pinned_val=is_pinned_val,
        is_featured_val=is_featured_val,
        is_locked_val=is_locked_val,
        title_val=title_val,
        content_val=content_val,
        post_id=None,
    )
@app.route("/admin/forum/post/<int:post_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_forum_post_edit(post_id):
    """Show the admin edit-post form and apply the changes on a valid POST.

    The post's current category is always offered as an option, even when
    it is no longer among the configured categories. Validation order:
    author exists, category is known, title length 5-160, content >= 10.
    """
    post = ForumPost.query.get_or_404(post_id)
    error = ""
    users = _admin_load_user_options(limit=400)
    # Copy before mutating: the helper's return value must not be modified
    # in place (admin_forum_posts copies it the same way).
    categories = list(_get_forum_category_names(active_only=False))
    if post.category and post.category not in categories:
        categories.insert(0, post.category)

    selected_author_id = post.user_id
    selected_category = post.category or (categories[0] if categories else "综合讨论")
    is_pinned_val = bool(post.is_pinned)
    is_featured_val = bool(post.is_featured)
    is_locked_val = bool(post.is_locked)
    title_val = post.title or ""
    content_val = post.content or ""

    if request.method == "POST":
        selected_author_id = request.form.get("author_id", type=int)
        selected_category = (request.form.get("category") or "").strip() or selected_category
        is_pinned_val = bool(request.form.get("is_pinned"))
        is_featured_val = bool(request.form.get("is_featured"))
        is_locked_val = bool(request.form.get("is_locked"))
        title_val = (request.form.get("title") or "").strip()
        content_val = (request.form.get("content") or "").strip()
        author = db.session.get(User, selected_author_id or 0)
        if not author:
            error = "请选择有效作者"
        elif selected_category not in categories:
            error = "请选择有效分类"
        elif len(title_val) < 5:
            error = "标题至少 5 个字符"
        elif len(title_val) > 160:
            error = "标题不能超过 160 个字符"
        elif len(content_val) < 10:
            error = "内容至少 10 个字符"
        else:
            post.user_id = author.id
            post.category = selected_category
            post.is_pinned = is_pinned_val
            post.is_featured = is_featured_val
            post.is_locked = is_locked_val
            post.title = title_val
            post.content = content_val
            db.session.commit()
            return redirect(url_for("admin_forum_posts", msg="已更新帖子 #{}".format(post.id)))

    if not users:
        error = error or "当前没有可用用户,请先在用户管理中新增用户"
    return render_template(
        "admin/forum_post_form.html",
        page_title="后台编辑帖子",
        submit_text="保存修改",
        action_url=url_for("admin_forum_post_edit", post_id=post.id),
        cancel_url=url_for("admin_forum_posts"),
        error=error,
        users=users,
        categories=categories,
        selected_author_id=selected_author_id,
        selected_category=selected_category,
        is_pinned_val=is_pinned_val,
        is_featured_val=is_featured_val,
        is_locked_val=is_locked_val,
        title_val=title_val,
        content_val=content_val,
        post_id=post.id,
    )
@app.route("/admin/forum/post/<int:post_id>/delete", methods=["POST"])
@admin_required
def admin_forum_post_delete(post_id):
    """Delete a forum post (404 when it does not exist) and return to the list."""
    doomed_post = ForumPost.query.get_or_404(post_id)
    db.session.delete(doomed_post)
    db.session.commit()
    notice = "已删除帖子 #{}".format(post_id)
    return redirect(url_for("admin_forum_posts", msg=notice))
@app.route("/admin/forum/comments")
@admin_required
def admin_forum_comments():
    """List forum comments for the admin with optional filters.

    Query arguments: ``q`` (keyword, capped at 80 characters, matched
    against comment content, post title and author name), ``author_id``
    and ``post_id``. At most 500 comments are shown, newest first.
    """
    keyword = (request.args.get("q") or "").strip()[:80]
    selected_author_id = request.args.get("author_id", type=int)
    selected_post_id = request.args.get("post_id", type=int)

    comment_query = (
        db.session.query(
            ForumComment,
            ForumPost.title.label("post_title"),
            User.username.label("author_name"),
        )
        .join(ForumPost, ForumComment.post_id == ForumPost.id)
        .outerjoin(User, User.id == ForumComment.user_id)
    )
    if selected_post_id:
        comment_query = comment_query.filter(ForumComment.post_id == selected_post_id)
    if selected_author_id:
        comment_query = comment_query.filter(ForumComment.user_id == selected_author_id)
    if keyword:
        like_pattern = "%{}%".format(keyword)
        comment_query = comment_query.filter(
            or_(
                ForumComment.content.ilike(like_pattern),
                ForumPost.title.ilike(like_pattern),
                User.username.ilike(like_pattern),
            )
        )
    rows = (
        comment_query
        .order_by(ForumComment.created_at.desc(), ForumComment.id.desc())
        .limit(500)
        .all()
    )

    comment_total = func.count(ForumComment.id)
    author_rows = (
        db.session.query(User.id, User.username, comment_total.label("comment_count"))
        .outerjoin(ForumComment, ForumComment.user_id == User.id)
        .group_by(User.id)
        .order_by(func.count(ForumComment.id).desc(), User.username.asc())
        .limit(300)
        .all()
    )
    post_rows = (
        db.session.query(ForumPost.id, ForumPost.title)
        .order_by(ForumPost.created_at.desc(), ForumPost.id.desc())
        .limit(300)
        .all()
    )
    # Make sure the selected post is offered as an option even when it is
    # not among the 300 most recent posts.
    if selected_post_id and not any(pid == selected_post_id for pid, _ in post_rows):
        selected_post = db.session.get(ForumPost, selected_post_id)
        if selected_post:
            post_rows = [(selected_post.id, selected_post.title)] + post_rows

    return render_template(
        "admin/forum_comments.html",
        rows=rows,
        author_rows=author_rows,
        post_rows=post_rows,
        keyword=keyword,
        selected_author_id=selected_author_id,
        selected_post_id=selected_post_id,
        msg=request.args.get("msg", ""),
        error=request.args.get("error", ""),
    )
@app.route("/admin/forum/comment/new", methods=["GET", "POST"])
@admin_required
def admin_forum_comment_new():
    """Show the admin create-comment form and create the comment on POST.

    The target post and author may be preset through the ``post_id`` and
    ``user_id`` query arguments. Validation order: post exists, user
    exists, content has at least 2 characters.
    """
    error = ""
    post_options = _admin_load_post_options(limit=400)
    user_options = _admin_load_user_options(limit=400)

    default_post_id = post_options[0].id if post_options else None
    default_user_id = user_options[0].id if user_options else None
    selected_post_id = request.args.get("post_id", type=int) or default_post_id
    selected_user_id = request.args.get("user_id", type=int) or default_user_id
    post_options, user_options = _admin_fill_post_and_user_options(
        post_options, selected_post_id, user_options, selected_user_id
    )
    content_val = ""

    if request.method == "POST":
        form = request.form
        selected_post_id = form.get("post_id", type=int)
        selected_user_id = form.get("user_id", type=int)
        content_val = (form.get("content") or "").strip()
        post_options, user_options = _admin_fill_post_and_user_options(
            post_options, selected_post_id, user_options, selected_user_id
        )
        target_post = db.session.get(ForumPost, selected_post_id or 0)
        target_user = db.session.get(User, selected_user_id or 0)

        # (failed?, message) pairs; the first failing one is reported.
        checks = (
            (not target_post, "请选择有效帖子"),
            (not target_user, "请选择有效用户"),
            (len(content_val) < 2, "评论至少 2 个字符"),
        )
        error = next((message for failed, message in checks if failed), "")
        if not error:
            new_comment = ForumComment(
                post_id=target_post.id,
                user_id=target_user.id,
                content=content_val,
            )
            db.session.add(new_comment)
            db.session.commit()
            return redirect(url_for("admin_forum_comments", post_id=target_post.id, msg="已新增评论"))

    if not post_options:
        error = error or "暂无可评论的帖子,请先新增帖子"
    elif not user_options:
        error = error or "当前没有可用用户,请先在用户管理中新增用户"
    return render_template(
        "admin/forum_comment_form.html",
        page_title="后台新增评论",
        submit_text="创建评论",
        action_url=url_for("admin_forum_comment_new"),
        cancel_url=url_for("admin_forum_comments"),
        error=error,
        post_options=post_options,
        user_options=user_options,
        selected_post_id=selected_post_id,
        selected_user_id=selected_user_id,
        content_val=content_val,
        comment_id=None,
    )
@app.route("/admin/forum/comment/<int:comment_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_forum_comment_edit(comment_id):
    """Show the admin edit-comment form and apply the changes on POST.

    A comment may be moved to another post and/or reassigned to another
    user. Validation order: post exists, user exists, content has at
    least 2 characters.
    """
    comment = ForumComment.query.get_or_404(comment_id)
    error = ""
    selected_post_id = comment.post_id
    selected_user_id = comment.user_id
    post_options, user_options = _admin_fill_post_and_user_options(
        _admin_load_post_options(limit=400),
        selected_post_id,
        _admin_load_user_options(limit=400),
        selected_user_id,
    )
    content_val = comment.content or ""

    if request.method == "POST":
        form = request.form
        selected_post_id = form.get("post_id", type=int)
        selected_user_id = form.get("user_id", type=int)
        content_val = (form.get("content") or "").strip()
        post_options, user_options = _admin_fill_post_and_user_options(
            post_options, selected_post_id, user_options, selected_user_id
        )
        target_post = db.session.get(ForumPost, selected_post_id or 0)
        target_user = db.session.get(User, selected_user_id or 0)

        # (failed?, message) pairs; the first failing one is reported.
        checks = (
            (not target_post, "请选择有效帖子"),
            (not target_user, "请选择有效用户"),
            (len(content_val) < 2, "评论至少 2 个字符"),
        )
        error = next((message for failed, message in checks if failed), "")
        if not error:
            comment.post_id = target_post.id
            comment.user_id = target_user.id
            comment.content = content_val
            db.session.commit()
            notice = "已更新评论 #{}".format(comment.id)
            return redirect(url_for("admin_forum_comments", post_id=target_post.id, msg=notice))

    if not post_options:
        error = error or "暂无可评论的帖子,请先新增帖子"
    elif not user_options:
        error = error or "当前没有可用用户,请先在用户管理中新增用户"
    return render_template(
        "admin/forum_comment_form.html",
        page_title="后台编辑评论",
        submit_text="保存修改",
        action_url=url_for("admin_forum_comment_edit", comment_id=comment.id),
        cancel_url=url_for("admin_forum_comments", post_id=selected_post_id),
        error=error,
        post_options=post_options,
        user_options=user_options,
        selected_post_id=selected_post_id,
        selected_user_id=selected_user_id,
        content_val=content_val,
        comment_id=comment.id,
    )
@app.route("/admin/forum/comment/<int:comment_id>/delete", methods=["POST"])
@admin_required
def admin_forum_comment_delete(comment_id):
    """Delete a comment and return to the comment list of its post."""
    doomed_comment = ForumComment.query.get_or_404(comment_id)
    # Remember the parent post before the row is gone, for the redirect.
    parent_post_id = doomed_comment.post_id
    db.session.delete(doomed_comment)
    db.session.commit()
    notice = "已删除评论 #{}".format(comment_id)
    return redirect(url_for("admin_forum_comments", post_id=parent_post_id, msg=notice))
@app.route("/admin/plan/new", methods=["GET", "POST"])
@admin_required
def admin_plan_new():
    """Show the new-plan form; on POST delegate saving to ``_save_plan``.

    The ``provider_id`` query argument preselects a provider in the form.
    """
    if request.method == "POST":
        return _save_plan(None)
    return render_template(
        "admin/plan_form.html",
        plan=None,
        country_tags=COUNTRY_TAGS,
        providers=Provider.query.order_by(Provider.name).all(),
        preselected_provider_id=request.args.get("provider_id", type=int),
    )
@app.route("/admin/plan/<int:plan_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_plan_edit(plan_id):
    """Save edits to an existing plan on POST; a GET redirects to the dashboard."""
    plan = VPSPlan.query.get_or_404(plan_id)
    if request.method != "POST":
        return redirect(url_for("admin_dashboard"))
    return _save_plan(plan)
def _parse_optional_int(s):
s = (s or "").strip()
if not s:
return None
try:
return int(s)
except ValueError:
return None
def _parse_optional_float(s):
s = (s or "").strip()
if not s:
return None
try:
return float(s)
except ValueError:
return None
def _save_plan(plan):
    """Create or update a VPS plan from the submitted form.

    ``plan`` is None when creating a new plan, otherwise the plan to
    update. When no valid provider is selected the form is re-rendered
    with an error. After saving, a price-history entry is recorded with
    source "manual" and the plans cache is invalidated.
    """
    form = request.form
    provider_id = form.get("provider_id", type=int)
    provider = db.session.get(Provider, provider_id) if provider_id else None
    if not provider:
        providers = Provider.query.order_by(Provider.name).all()
        return render_template(
            "admin/plan_form.html",
            plan=plan,
            country_tags=COUNTRY_TAGS,
            providers=providers,
            preselected_provider_id=provider_id,
            error="请选择厂商",
        )

    # Column values in assignment order; region and name are always cleared.
    values = {
        "provider_id": provider.id,
        "provider": provider.name,
        "region": None,
        "name": None,
        "vcpu": _parse_optional_int(form.get("vcpu")),
        "memory_gb": _parse_optional_int(form.get("memory_gb")),
        "storage_gb": _parse_optional_int(form.get("storage_gb")),
        "bandwidth_mbps": _parse_optional_int(form.get("bandwidth_mbps")),
        "traffic": form.get("traffic", "").strip() or None,
        "price_cny": _parse_optional_float(form.get("price_cny")),
        "price_usd": _parse_optional_float(form.get("price_usd")),
        "currency": form.get("currency", "CNY").strip() or "CNY",
        "official_url": form.get("official_url", "").strip() or None,
        "countries": form.get("countries", "").strip() or None,
    }
    if plan is None:
        plan = VPSPlan(**values)
        db.session.add(plan)
    else:
        for attr_name, attr_value in values.items():
            setattr(plan, attr_name, attr_value)

    db.session.flush()
    _record_price_history(plan, source="manual")
    db.session.commit()
    _invalidate_plans_cache()

    # When the form was opened from a provider detail page, return there.
    from_provider_id = form.get("from_provider_id", type=int)
    if from_provider_id:
        return redirect(url_for("admin_provider_detail", provider_id=from_provider_id))
    return redirect(url_for("admin_dashboard"))
@app.route("/admin/plan/<int:plan_id>/delete", methods=["POST"])
@admin_required
def admin_plan_delete(plan_id):
    """Delete a plan together with its price history, then refresh the cache."""
    doomed_plan = VPSPlan.query.get_or_404(plan_id)
    # Remove the plan's price-history rows before removing the plan itself.
    history_rows = PriceHistory.query.filter_by(plan_id=plan_id)
    history_rows.delete()
    db.session.delete(doomed_plan)
    db.session.commit()
    _invalidate_plans_cache()
    return redirect(url_for("admin_dashboard"))
# ---------- Excel export / import ----------
# Column headers of the Excel sheet, in column order.
EXCEL_HEADERS = [
    "厂商",
    "厂商官网",
    "国家",
    "vCPU",
    "内存GB",
    "存储GB",
    "带宽Mbps",
    "流量",
    "月付人民币",
    "月付美元",
    "货币",
    "配置官网",
]
@app.route("/admin/export/excel")
@admin_required
def admin_export_excel():
    """Export the displayed plans as an .xlsx attachment.

    The sheet starts with ``EXCEL_HEADERS`` followed by one row per plan;
    missing numeric values are written as empty cells.
    """
    workbook = Workbook()
    sheet = workbook.active
    sheet.title = "配置"
    sheet.append(EXCEL_HEADERS)

    def _cell(value):
        # None becomes an empty cell; 0 and other values are kept as-is.
        return "" if value is None else value

    for plan in _query_plans_for_display():
        provider = plan.provider_rel
        provider_url = (provider.official_url if provider else "") or ""
        sheet.append([
            plan.provider_name,
            provider_url,
            plan.countries or "",
            _cell(plan.vcpu),
            _cell(plan.memory_gb),
            _cell(plan.storage_gb),
            _cell(plan.bandwidth_mbps),
            plan.traffic or "",
            _cell(plan.price_cny),
            _cell(plan.price_usd),
            plan.currency or "CNY",
            plan.official_url or "",
        ])

    stream = io.BytesIO()
    workbook.save(stream)
    stream.seek(0)
    return send_file(
        stream,
        mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        as_attachment=True,
        download_name="vps_配置_导出.xlsx",
    )
def _num(v):
if v is None or v == "":
return None
try:
return int(float(v))
except (ValueError, TypeError):
return None
def _float(v):
if v is None or v == "":
return None
try:
return float(v)
except (ValueError, TypeError):
return None
def _opt_text(v):
if v is None:
return None
s = str(v).strip()
return s or None
def _safe_str(v):
if v is None:
return ""
return str(v).strip()
def _eq_optional(a, b):
if a is None and b is None:
return True
if a is None or b is None:
return False
if isinstance(a, float) or isinstance(b, float):
return abs(float(a) - float(b)) < 1e-9
return a == b
def _record_price_history(plan, source):
if plan is None:
return
if plan.price_cny is None and plan.price_usd is None:
return
if plan.id is None:
db.session.flush()
latest = (
PriceHistory.query
.filter_by(plan_id=plan.id)
.order_by(PriceHistory.captured_at.desc(), PriceHistory.id.desc())
.first()
)
currency = _opt_text(plan.currency) or "CNY"
if latest:
same_currency = _safe_str(latest.currency).upper() == _safe_str(currency).upper()
if same_currency and _eq_optional(latest.price_cny, plan.price_cny) and _eq_optional(latest.price_usd, plan.price_usd):
return
db.session.add(PriceHistory(
plan_id=plan.id,
price_cny=plan.price_cny,
price_usd=plan.price_usd,
currency=currency,
source=source,
))
def _display_val(v):
if v is None or v == "":
return ""
if isinstance(v, float):
s = "{:.2f}".format(v).rstrip("0").rstrip(".")
return s if s else "0"
return str(v)
def _row_identity_key(row):
    """Return the tuple that identifies an imported row.

    The key is (provider, vCPU, memory, storage, bandwidth, countries,
    traffic), in the same order as ``_plan_identity_key``.
    """
    provider = _safe_str(row.get("厂商"))
    vcpu, memory_gb, storage_gb, bandwidth_mbps = (
        _num(row.get(column)) for column in ("vCPU", "内存GB", "存储GB", "带宽Mbps")
    )
    countries = _safe_str(row.get("国家"))
    traffic = _safe_str(row.get("流量"))
    return (provider, vcpu, memory_gb, storage_gb, bandwidth_mbps, countries, traffic)
def _plan_identity_key(plan):
    """Return the tuple that identifies an existing plan.

    The key is (provider, vCPU, memory, storage, bandwidth, countries,
    traffic), in the same order as ``_row_identity_key``.
    """
    provider = _safe_str(plan.provider_name)
    countries = _safe_str(plan.countries)
    traffic = _safe_str(plan.traffic)
    return (
        provider,
        plan.vcpu,
        plan.memory_gb,
        plan.storage_gb,
        plan.bandwidth_mbps,
        countries,
        traffic,
    )
def _plan_diff(plan, row):
    """Return the differences between an imported row and an existing plan.

    Each entry is a dict with ``label`` (the Excel column), ``old`` and
    ``new`` raw values, and ``old_display`` / ``new_display`` strings.
    Fields whose values are equal under ``_eq_optional`` are omitted.
    """
    # (Excel column, plan attribute, converter for the imported cell)
    specs = [
        ("国家", "countries", _opt_text),
        ("vCPU", "vcpu", _num),
        ("内存GB", "memory_gb", _num),
        ("存储GB", "storage_gb", _num),
        ("带宽Mbps", "bandwidth_mbps", _num),
        ("流量", "traffic", _opt_text),
        ("月付人民币", "price_cny", _float),
        ("月付美元", "price_usd", _float),
        ("货币", "currency", lambda cell: _opt_text(cell) or "CNY"),
        ("配置官网", "official_url", _opt_text),
    ]
    diffs = []
    for label, attr, convert in specs:
        new_value = convert(row.get(label))
        old_value = getattr(plan, attr)
        if _eq_optional(old_value, new_value):
            continue
        diffs.append({
            "label": label,
            "old": old_value,
            "new": new_value,
            "old_display": _display_val(old_value),
            "new_display": _display_val(new_value),
        })
    return diffs
def _upsert_provider_from_row(row):
    """Find or create the Provider named in an imported row.

    Returns None when the row has no provider name. A new provider is
    added and flushed. An existing provider's official URL is replaced
    only when the row supplies a different, non-empty one.
    """
    name = _safe_str(row.get("厂商"))
    if not name:
        return None
    url = _opt_text(row.get("厂商官网"))

    provider = Provider.query.filter_by(name=name).first()
    if provider:
        if url and provider.official_url != url:
            provider.official_url = url
        return provider

    provider = Provider(name=name, official_url=url)
    db.session.add(provider)
    db.session.flush()
    return provider
def _fill_plan_from_row(plan, row, provider):
    """Copy the values of an imported row onto ``plan``.

    The plan is attached to ``provider``; ``region`` and ``name`` are
    cleared, and the currency defaults to "CNY" when the row has none.
    """
    assignments = (
        ("provider_id", provider.id),
        ("provider", provider.name),
        ("region", None),
        ("name", None),
        ("vcpu", _num(row.get("vCPU"))),
        ("memory_gb", _num(row.get("内存GB"))),
        ("storage_gb", _num(row.get("存储GB"))),
        ("bandwidth_mbps", _num(row.get("带宽Mbps"))),
        ("traffic", _opt_text(row.get("流量"))),
        ("price_cny", _float(row.get("月付人民币"))),
        ("price_usd", _float(row.get("月付美元"))),
        ("currency", _opt_text(row.get("货币")) or "CNY"),
        ("official_url", _opt_text(row.get("配置官网"))),
        ("countries", _opt_text(row.get("国家"))),
    )
    for attr_name, attr_value in assignments:
        setattr(plan, attr_name, attr_value)
@app.route("/admin/import", methods=["GET", "POST"])
@admin_required
def admin_import():
    """Upload an .xlsx file and build an add/update preview.

    GET renders the upload form. POST reads the active worksheet of the
    uploaded workbook (skipping the first row), maps each row's cells
    positionally onto ``EXCEL_HEADERS``, compares every row with the existing
    plans, stores the resulting preview items in the session and redirects to
    the preview page. Validation or parse problems re-render the form with an
    error message.
    """
    if request.method == "GET":
        return render_template("admin/import.html")

    f = request.files.get("file")
    if not f or not f.filename:
        return render_template("admin/import.html", error="请选择 Excel 文件")
    if not f.filename.lower().endswith(".xlsx"):
        return render_template("admin/import.html", error="请上传 .xlsx 文件")

    wb = None
    try:
        wb = load_workbook(io.BytesIO(f.read()), read_only=True, data_only=True)
        ws = wb.active
        # Materialise the rows now: read-only worksheets load lazily and the
        # workbook is closed right after this block.
        rows = list(ws.iter_rows(min_row=2, values_only=True))
    except Exception as e:
        return render_template("admin/import.html", error="解析失败: {}".format(str(e)))
    finally:
        # Read-only workbooks keep the underlying archive open until they are
        # closed explicitly.
        if wb is not None:
            wb.close()

    headers = EXCEL_HEADERS
    parsed = []
    for row in rows:
        # Skip rows where every cell is empty or whitespace.
        if not any(cell is not None and str(cell).strip() for cell in row):
            continue
        d = {}
        for i, h in enumerate(headers):
            if i < len(row):
                v = row[i]
                if v is not None and hasattr(v, "strip"):
                    v = v.strip()
                d[h] = v
            else:
                # Row is shorter than the header list: pad with None.
                d[h] = None
        parsed.append(d)
    if not parsed:
        return render_template("admin/import.html", error="文件中没有有效数据行")

    # Index existing plans by identity key; the first plan seen for a key wins.
    plans = VPSPlan.query.all()
    plan_index = {}
    for p in plans:
        key = _plan_identity_key(p)
        if key not in plan_index:
            plan_index[key] = p

    seen_row_keys = set()
    preview_items = []
    for row in parsed:
        key = _row_identity_key(row)
        provider_name = key[0]
        if not provider_name:
            continue
        # Only the first row for a given identity key is previewed.
        if key in seen_row_keys:
            continue
        seen_row_keys.add(key)
        matched = plan_index.get(key)
        if not matched:
            preview_items.append({
                "action": "add",
                "row": row,
                "changes": [],
                "provider_url_changed": False,
            })
            continue
        changes = _plan_diff(matched, row)
        imported_provider_url = _opt_text(row.get("厂商官网"))
        old_provider_url = _opt_text(matched.provider_rel.official_url if matched.provider_rel else None)
        # A blank imported URL never counts as a change.
        provider_url_changed = bool(imported_provider_url and imported_provider_url != old_provider_url)
        if changes or provider_url_changed:
            preview_items.append({
                "action": "update",
                "plan_id": matched.id,
                "row": row,
                "changes": changes,
                "provider_url_changed": provider_url_changed,
                "provider_url_old": old_provider_url,
                "provider_url_new": imported_provider_url,
            })

    # NOTE(review): if the app uses Flask's default cookie-backed session, a
    # large preview may exceed the cookie size limit; confirm the backend.
    session["import_preview"] = preview_items
    return redirect(url_for("admin_import_preview"))
@app.route("/admin/import/preview", methods=["GET", "POST"])
@admin_required
def admin_import_preview():
    """Show the pending import preview and apply the rows the admin selects.

    GET renders the preview items stored in the session. POST reads the
    checked ``row_index`` values, applies each selected item (creating or
    updating a plan and its provider), commits once, clears the stored
    preview and redirects to the dashboard with a summary message.
    """
    preview_items = session.get("import_preview") or []
    add_count = sum(1 for x in preview_items if x.get("action") == "add")
    update_count = sum(1 for x in preview_items if x.get("action") == "update")
    context = {
        "rows": list(enumerate(preview_items)),
        "add_count": add_count,
        "update_count": update_count,
    }
    if request.method == "GET":
        return render_template("admin/import_preview.html", **context)

    selected = request.form.getlist("row_index")
    if not selected:
        return render_template(
            "admin/import_preview.html",
            error="请至少勾选一行",
            **context
        )

    # Parse the submitted indices defensively. str.isdigit() would accept
    # characters such as "²" that int() rejects, so use isdecimal(), and still
    # tolerate a ValueError from int() (e.g. an excessively long digit string).
    indices = set()
    for raw in selected:
        if not raw.isdecimal():
            continue
        try:
            indices.add(int(raw))
        except ValueError:
            continue

    add_applied = 0
    update_applied = 0
    for i in sorted(indices):
        if not 0 <= i < len(preview_items):
            continue
        item = preview_items[i]
        row = item.get("row") or {}
        provider = _upsert_provider_from_row(row)
        if not provider:
            continue
        action = item.get("action")
        if action == "update":
            plan = db.session.get(VPSPlan, item.get("plan_id"))
            if not plan:
                # The plan vanished since the preview was built: add it anew
                # and count it as an addition.
                plan = VPSPlan()
                db.session.add(plan)
                add_applied += 1
            else:
                update_applied += 1
            _fill_plan_from_row(plan, row, provider)
        else:
            plan = VPSPlan()
            _fill_plan_from_row(plan, row, provider)
            db.session.add(plan)
            add_applied += 1
        _record_price_history(plan, source="import")

    db.session.commit()
    _invalidate_plans_cache()
    session.pop("import_preview", None)
    msg = "已新增 {} 条,更新 {} 条配置".format(add_applied, update_applied)
    return redirect(url_for("admin_dashboard") + "?" + urlencode({"msg": msg}))
if __name__ == "__main__":
    # Direct-execution entry point: start Flask's built-in server on port
    # 5001 with debug mode enabled.
    app.run(port=5001, debug=True)