# -*- coding: utf-8 -*- """云服务器价格对比 - Flask 应用""" import io from datetime import datetime, timezone from urllib.parse import urlencode from flask import Flask, render_template, jsonify, request, redirect, url_for, session, send_file from werkzeug.middleware.proxy_fix import ProxyFix from sqlalchemy import text, func, or_ from config import Config from extensions import db from openpyxl import Workbook from openpyxl import load_workbook app = Flask(__name__) app.config.from_object(Config) # 部署在 Nginx 等反向代理后时,信任 X-Forwarded-* 头(HTTPS、真实 IP) app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1) db.init_app(app) from models import ( VPSPlan, Provider, PriceHistory, User, ForumPost, ForumComment, ForumCategory, ForumReport, ForumNotification, ForumPostLike, ForumPostBookmark, ) # noqa: E402 def _ensure_mysql_columns(): """为已有 MySQL 表添加缺失列,避免 1054 Unknown column。""" try: engine = db.engine if engine.dialect.name != "mysql": return with engine.connect() as conn: for col, spec in [ ("traffic", "VARCHAR(64) NULL"), ("countries", "VARCHAR(255) NULL"), ("provider_id", "INT NULL"), ]: try: conn.execute(text("ALTER TABLE vps_plans ADD COLUMN {} {}".format(col, spec))) conn.commit() except Exception: conn.rollback() for col, spec in [ ("name", "VARCHAR(128) NULL"), ("region", "VARCHAR(128) NULL"), ("price_cny", "DOUBLE NULL"), ("price_usd", "DOUBLE NULL"), ]: try: conn.execute(text("ALTER TABLE vps_plans MODIFY COLUMN {} {}".format(col, spec))) conn.commit() except Exception: conn.rollback() except Exception: pass # 表不存在或非 MySQL 时忽略 def _ensure_forum_columns(): """为已有论坛表补齐后续新增字段。""" try: engine = db.engine dialect = engine.dialect.name with engine.connect() as conn: if dialect == "mysql": alters = [ "ALTER TABLE forum_posts ADD COLUMN category VARCHAR(32) NOT NULL DEFAULT '综合讨论'", "ALTER TABLE forum_posts ADD COLUMN view_count INT NOT NULL DEFAULT 0", ] else: alters = [ "ALTER TABLE forum_posts ADD COLUMN category TEXT DEFAULT '综合讨论'", "ALTER TABLE forum_posts ADD COLUMN view_count INTEGER DEFAULT 0", ] for sql in alters: try: conn.execute(text(sql)) conn.commit() except Exception: conn.rollback() except Exception: pass def _ensure_forum_manage_columns(): """为用户与论坛帖子补齐管理字段(封禁/置顶/精华/锁帖)。""" try: engine = db.engine dialect = engine.dialect.name with engine.connect() as conn: if dialect == "mysql": alters = [ "ALTER TABLE users ADD COLUMN is_banned TINYINT(1) NOT NULL DEFAULT 0", "ALTER TABLE users ADD COLUMN banned_at DATETIME NULL", "ALTER TABLE users ADD COLUMN banned_reason VARCHAR(255) NULL", "ALTER TABLE forum_posts ADD COLUMN is_pinned TINYINT(1) NOT NULL DEFAULT 0", "ALTER TABLE forum_posts ADD COLUMN is_featured TINYINT(1) NOT NULL DEFAULT 0", "ALTER TABLE forum_posts ADD COLUMN is_locked TINYINT(1) NOT NULL DEFAULT 0", ] else: alters = [ "ALTER TABLE users ADD COLUMN is_banned INTEGER DEFAULT 0", "ALTER TABLE users ADD COLUMN banned_at DATETIME", "ALTER TABLE users ADD COLUMN banned_reason TEXT", "ALTER TABLE forum_posts ADD COLUMN is_pinned INTEGER DEFAULT 0", "ALTER TABLE forum_posts ADD COLUMN is_featured INTEGER DEFAULT 0", "ALTER TABLE forum_posts ADD COLUMN is_locked INTEGER DEFAULT 0", ] for sql in alters: try: conn.execute(text(sql)) conn.commit() except Exception: conn.rollback() except Exception: pass DEFAULT_FORUM_CATEGORIES = [ "综合讨论", "VPS 评测", "优惠活动", "运维经验", "新手提问", ] def _ensure_forum_categories_seed(): """初始化论坛默认分类。""" try: if ForumCategory.query.count() > 0: return for idx, name in enumerate(DEFAULT_FORUM_CATEGORIES, start=1): db.session.add(ForumCategory( name=name, sort_order=idx * 10, is_active=True, )) db.session.commit() except Exception: db.session.rollback() def _ensure_price_history_baseline(): """为历史数据补首条价格快照,便于后续计算涨跌。""" try: missing = ( db.session.query(VPSPlan) .outerjoin(PriceHistory, PriceHistory.plan_id == VPSPlan.id) .filter(PriceHistory.id.is_(None)) .all() ) if not missing: return for p in missing: if p.price_cny is None and p.price_usd is None: continue db.session.add(PriceHistory( plan_id=p.id, price_cny=p.price_cny, price_usd=p.price_usd, currency=(p.currency or "CNY"), source="bootstrap", )) db.session.commit() except Exception: db.session.rollback() # 启动时自动创建表(若不存在),并为已有表补列 with app.app_context(): db.create_all() _ensure_mysql_columns() _ensure_forum_columns() _ensure_forum_manage_columns() _ensure_forum_categories_seed() _ensure_price_history_baseline() ADMIN_PASSWORD = app.config["ADMIN_PASSWORD"] SITE_URL = app.config["SITE_URL"] SITE_NAME = app.config["SITE_NAME"] # 国家/区域标签,供后台表单选择 COUNTRY_TAGS = [ "中国大陆", "中国香港", "中国台湾", "美国", "日本", "新加坡", "韩国", "德国", "英国", "法国", "荷兰", "加拿大", "澳大利亚", "印度", "其他", ] PRICE_SOURCE_LABELS = { "manual": "手工编辑", "import": "Excel 导入", "bootstrap": "基线", } FORUM_REPORT_REASONS = [ "垃圾广告", "辱骂攻击", "违法违规", "虚假信息", "其他", ] FORUM_REPORT_STATUS_LABELS = { "pending": "待处理", "processed": "已处理", "rejected": "已驳回", } FORUM_NOTIFICATION_TYPE_LABELS = { "post_commented": "帖子新评论", "thread_replied": "主题新回复", "report_processed": "举报处理结果", "content_removed": "内容处理通知", } def _get_current_user(): user_id = session.get("user_id") if not user_id: return None user = db.session.get(User, user_id) if not user: session.pop("user_id", None) return user def _is_banned_user(user): return bool(user and bool(user.is_banned)) def _user_ban_message(user): if not user: return "账号状态异常" reason = (user.banned_reason or "").strip() if reason: return "账号已被封禁:{}".format(reason) return "账号已被封禁" def _is_valid_username(username): if not username: return False if len(username) < 3 or len(username) > 20: return False return all(ch.isalnum() or ch == "_" for ch in username) def _safe_next_url(default_endpoint): nxt = (request.values.get("next") or "").strip() if nxt.startswith("/") and not nxt.startswith("//"): return nxt return url_for(default_endpoint) def _safe_form_next_url(default_url): nxt = (request.form.get("next") or request.args.get("next") or "").strip() if nxt.startswith("/") and not nxt.startswith("//"): return nxt return default_url def _create_notification( user_id, notif_type, message, actor_id=None, post_id=None, comment_id=None, report_id=None, ): """创建站内通知(由调用方控制事务提交)。""" if not user_id or not message: return db.session.add(ForumNotification( user_id=user_id, actor_id=actor_id, notif_type=notif_type, post_id=post_id, comment_id=comment_id, report_id=report_id, message=message[:255], is_read=False, )) def _notification_target_url(notification): if notification.post_id: exists = db.session.get(ForumPost, notification.post_id) if exists: return url_for("forum_post_detail", post_id=notification.post_id) return url_for("user_notifications") def _load_forum_categories(active_only=True): """读取论坛分类(默认只读启用项)。""" try: q = ForumCategory.query if active_only: q = q.filter_by(is_active=True) return q.order_by(ForumCategory.sort_order.asc(), ForumCategory.id.asc()).all() except Exception: return [] def _get_forum_category_names(active_only=True): rows = _load_forum_categories(active_only=active_only) names = [x.name for x in rows if x.name] if names: return names # 若全部被停用,前台仍回退到已存在分类,避免下拉为空。 if active_only: rows = _load_forum_categories(active_only=False) names = [x.name for x in rows if x.name] if names: return names return list(DEFAULT_FORUM_CATEGORIES) @app.context_processor def inject_global_user(): current_user = _get_current_user() notifications_unread_count = 0 if current_user: notifications_unread_count = ForumNotification.query.filter_by( user_id=current_user.id, is_read=False, ).count() return { "current_user": current_user, "admin_logged_in": bool(session.get("admin_logged_in")), "forum_categories": _get_forum_category_names(active_only=True), "forum_report_reasons": FORUM_REPORT_REASONS, "notifications_unread_count": notifications_unread_count, } def _humanize_time(dt): if not dt: return "" if dt.tzinfo is None: now = datetime.utcnow() else: now = datetime.now(dt.tzinfo) delta = now - dt seconds = int(delta.total_seconds()) if seconds < 0: return dt.strftime("%Y-%m-%d") if seconds < 60: return "刚刚" if seconds < 3600: return "{} 分钟前".format(seconds // 60) if seconds < 86400: return "{} 小时前".format(seconds // 3600) if seconds < 86400 * 14: return "{} 天前".format(seconds // 86400) return dt.strftime("%Y-%m-%d") def _build_forum_post_cards(rows): """将论坛查询结果行转换为列表卡片数据。""" cards = [] for post, reply_count, latest_activity, author_name, like_count, bookmark_count in rows: latest_activity = latest_activity or post.created_at username = author_name or "用户" cards.append({ "post": post, "reply_count": int(reply_count or 0), "view_count": int(post.view_count or 0), "like_count": int(like_count or 0), "bookmark_count": int(bookmark_count or 0), "latest_activity": latest_activity, "latest_activity_text": _humanize_time(latest_activity), "author_name": username, "author_initial": (username[0] if username else "?").upper(), }) return cards def _build_forum_url(tab="latest", category=None, q=None, page=1): """构建论坛列表页链接,并尽量保持 URL 简洁。""" params = {} if (tab or "latest") != "latest": params["tab"] = tab if category: params["category"] = category if q: params["q"] = q if page and int(page) > 1: params["page"] = int(page) return url_for("forum_index", **params) def _query_forum_post_rows(active_tab="latest", selected_category=None, search_query=None, author_id=None): """论坛列表查询:支持最新/新帖/热门 + 分类过滤 + 关键词搜索。""" comment_stats_subq = ( db.session.query( ForumComment.post_id.label("post_id"), func.count(ForumComment.id).label("comment_count"), func.max(ForumComment.created_at).label("latest_comment_at"), ) .group_by(ForumComment.post_id) .subquery() ) comment_count_expr = func.coalesce(comment_stats_subq.c.comment_count, 0) latest_activity_expr = func.coalesce(comment_stats_subq.c.latest_comment_at, ForumPost.created_at) like_stats_subq = ( db.session.query( ForumPostLike.post_id.label("post_id"), func.count(ForumPostLike.id).label("like_count"), ) .group_by(ForumPostLike.post_id) .subquery() ) bookmark_stats_subq = ( db.session.query( ForumPostBookmark.post_id.label("post_id"), func.count(ForumPostBookmark.id).label("bookmark_count"), ) .group_by(ForumPostBookmark.post_id) .subquery() ) like_count_expr = func.coalesce(like_stats_subq.c.like_count, 0) bookmark_count_expr = func.coalesce(bookmark_stats_subq.c.bookmark_count, 0) q = ( db.session.query( ForumPost, comment_count_expr.label("comment_count"), latest_activity_expr.label("latest_activity"), User.username.label("author_name"), like_count_expr.label("like_count"), bookmark_count_expr.label("bookmark_count"), ) .outerjoin(comment_stats_subq, comment_stats_subq.c.post_id == ForumPost.id) .outerjoin(like_stats_subq, like_stats_subq.c.post_id == ForumPost.id) .outerjoin(bookmark_stats_subq, bookmark_stats_subq.c.post_id == ForumPost.id) .outerjoin(User, User.id == ForumPost.user_id) ) if selected_category: q = q.filter(ForumPost.category == selected_category) if author_id is not None: q = q.filter(ForumPost.user_id == author_id) if search_query: pattern = "%{}%".format(search_query) q = q.filter( or_( ForumPost.title.ilike(pattern), ForumPost.content.ilike(pattern), User.username.ilike(pattern), ) ) if active_tab == "hot": q = q.order_by( ForumPost.is_pinned.desc(), comment_count_expr.desc(), like_count_expr.desc(), ForumPost.view_count.desc(), latest_activity_expr.desc(), ForumPost.id.desc(), ) elif active_tab == "new": q = q.order_by(ForumPost.is_pinned.desc(), ForumPost.created_at.desc(), ForumPost.id.desc()) else: q = q.order_by(ForumPost.is_pinned.desc(), latest_activity_expr.desc(), ForumPost.id.desc()) return q def _forum_sidebar_data(): category_counts = ( db.session.query(ForumPost.category, func.count(ForumPost.id)) .group_by(ForumPost.category) .order_by(func.count(ForumPost.id).desc()) .all() ) active_users = ( db.session.query(User.username, func.count(ForumPost.id).label("post_count")) .outerjoin(ForumPost, ForumPost.user_id == User.id) .group_by(User.id) .order_by(func.count(ForumPost.id).desc(), User.created_at.asc()) .limit(6) .all() ) return { "total_users": User.query.count(), "total_posts": ForumPost.query.count(), "total_comments": ForumComment.query.count(), "category_counts": list(category_counts), "active_users": list(active_users), } def _currency_symbol(currency): return "¥" if (currency or "CNY").upper() == "CNY" else "$" def _format_money(currency, value): return "{}{:.2f}".format(_currency_symbol(currency), float(value)) def _format_history_time(dt): return dt.strftime("%Y-%m-%d %H:%M") if dt else "" def _pick_price_pair(latest, previous=None): if previous is None: if latest.price_cny is not None: return "CNY", float(latest.price_cny), None if latest.price_usd is not None: return "USD", float(latest.price_usd), None return None, None, None if latest.price_cny is not None and previous.price_cny is not None: return "CNY", float(latest.price_cny), float(previous.price_cny) if latest.price_usd is not None and previous.price_usd is not None: return "USD", float(latest.price_usd), float(previous.price_usd) return None, None, None def _build_price_trend(latest, previous=None): currency, current_value, previous_value = _pick_price_pair(latest, previous) if currency is None or current_value is None: return None source = PRICE_SOURCE_LABELS.get(latest.source or "", latest.source or "未知来源") meta = "当前 {} · {} · {}".format( _format_money(currency, current_value), _format_history_time(latest.captured_at), source, ) if previous_value is None: return { "direction": "new", "delta_text": "首次记录", "meta_text": meta, } diff = current_value - previous_value if abs(diff) < 1e-9: return { "direction": "flat", "delta_text": "→ 持平", "meta_text": meta, } direction = "up" if diff > 0 else "down" arrow = "↑" if diff > 0 else "↓" sign = "+" if diff > 0 else "-" delta_text = "{} {}{}{:,.2f}".format(arrow, sign, _currency_symbol(currency), abs(diff)) if abs(previous_value) > 1e-9: pct = diff / previous_value * 100 delta_text += " ({:+.2f}%)".format(pct) return { "direction": direction, "delta_text": delta_text, "meta_text": meta, } def _build_plan_trend_map(plans): plan_ids = [p.id for p in plans if p.id is not None] if not plan_ids: return {} rows = ( PriceHistory.query .filter(PriceHistory.plan_id.in_(plan_ids)) .order_by(PriceHistory.plan_id.asc(), PriceHistory.captured_at.desc(), PriceHistory.id.desc()) .all() ) grouped = {} for row in rows: bucket = grouped.setdefault(row.plan_id, []) if len(bucket) < 2: bucket.append(row) result = {} for plan_id, bucket in grouped.items(): latest = bucket[0] if bucket else None previous = bucket[1] if len(bucket) > 1 else None trend = _build_price_trend(latest, previous) if latest else None if trend: result[plan_id] = trend return result def admin_required(f): from functools import wraps @wraps(f) def wrapped(*args, **kwargs): if not session.get("admin_logged_in"): return redirect(url_for("admin_login")) return f(*args, **kwargs) return wrapped def user_login_required(f): from functools import wraps @wraps(f) def wrapped(*args, **kwargs): user = _get_current_user() if not user: return redirect(url_for("user_login", next=request.path)) if _is_banned_user(user): session.pop("user_id", None) return redirect(url_for("user_login", next=request.path, error=_user_ban_message(user))) return f(*args, **kwargs) return wrapped def _ensure_forum_interaction_user(user, post_id=None): """校验当前登录用户是否可进行论坛互动动作。""" if not _is_banned_user(user): return None text = _user_ban_message(user) if post_id: return _forum_redirect_with_error(post_id, text) return redirect(url_for("forum_index", error=text)) def _can_edit_post(user, post): if not user or not post: return False return post.user_id == user.id def _can_edit_comment(user, comment): if not user or not comment: return False return comment.user_id == user.id def _forum_redirect_with_error(post_id, text_msg): return redirect(url_for("forum_post_detail", post_id=post_id, error=text_msg)) def _forum_redirect_with_msg(post_id, text_msg): return redirect(url_for("forum_post_detail", post_id=post_id, msg=text_msg)) # 首页多语言文案(中文 / English) I18N = { "zh": { "tagline": "云服务器价格一目了然", "filter_provider": "厂商", "filter_region": "区域", "filter_memory": "内存 ≥", "filter_price": "价格区间", "filter_currency": "货币", "search_placeholder": "搜索厂商、配置...", "all": "全部", "unlimited": "不限", "btn_reset": "重置筛选", "th_provider": "厂商", "th_country": "国家", "th_config": "配置", "th_vcpu": "vCPU", "th_memory": "内存", "th_storage": "存储", "th_bandwidth": "带宽", "th_traffic": "流量", "th_price": "月付价格", "th_action": "操作", "disclaimer": "* 价格仅供参考,以各厂商官网为准。部分为按量/包年折算月价。", "footer_note": "数据仅供参考 · 请以云厂商官网实时报价为准", "contact_label": "联系我们", "empty_state": "未找到匹配的方案", "load_error": "数据加载失败,请刷新页面重试", "search_label": "搜索", "price_under50": "< ¥50", "price_50_100": "¥50-100", "price_100_300": "¥100-300", "price_300_500": "¥300-500", "price_over500": "> ¥500", "cny": "人民币 (¥)", "usd": "美元 ($)", }, "en": { "tagline": "VPS & cloud server prices at a glance", "filter_provider": "Provider", "filter_region": "Region", "filter_memory": "Memory ≥", "filter_price": "Price range", "filter_currency": "Currency", "search_placeholder": "Search provider, config...", "all": "All", "unlimited": "Any", "btn_reset": "Reset", "th_provider": "Provider", "th_country": "Country", "th_config": "Config", "th_vcpu": "vCPU", "th_memory": "Memory", "th_storage": "Storage", "th_bandwidth": "Bandwidth", "th_traffic": "Traffic", "th_price": "Monthly", "th_action": "Action", "disclaimer": "* Prices are indicative. See provider sites for current rates.", "footer_note": "Data for reference only. Check provider sites for latest pricing.", "contact_label": "Contact", "empty_state": "No matching plans found", "load_error": "Failed to load data. Please refresh.", "search_label": "Search", "price_under50": "< 50", "price_50_100": "50-100", "price_100_300": "100-300", "price_300_500": "300-500", "price_over500": "> 500", "cny": "CNY (¥)", "usd": "USD ($)", }, } @app.route("/") def index(): lang = request.args.get("lang") or session.get("lang", "zh") if lang not in ("zh", "en"): lang = "zh" session["lang"] = lang t = I18N[lang] plans = VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny).all() return render_template( "index.html", site_url=SITE_URL, site_name=SITE_NAME, plans_json_ld=[p.to_dict() for p in plans], lang=lang, t=t, ) @app.route("/api/plans") def api_plans(): plans = VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny).all() return jsonify([p.to_dict() for p in plans]) # ---------- 前台用户与论坛 ---------- @app.route("/register", methods=["GET", "POST"]) def user_register(): current = _get_current_user() if current: if _is_banned_user(current): session.pop("user_id", None) else: return redirect(url_for("forum_index")) error = None if request.method == "POST": username = (request.form.get("username") or "").strip() password = request.form.get("password") or "" confirm_password = request.form.get("confirm_password") or "" if not _is_valid_username(username): error = "用户名需为 3-20 位,仅支持字母、数字、下划线" elif len(password) < 6: error = "密码至少 6 位" elif password != confirm_password: error = "两次输入的密码不一致" elif User.query.filter(func.lower(User.username) == username.lower()).first(): error = "用户名已存在" else: user = User(username=username) user.set_password(password) user.last_login_at = datetime.now(timezone.utc) db.session.add(user) db.session.commit() session["user_id"] = user.id return redirect(_safe_next_url("forum_index")) return render_template("auth/register.html", error=error) @app.route("/login", methods=["GET", "POST"]) def user_login(): current = _get_current_user() if current: if _is_banned_user(current): session.pop("user_id", None) else: return redirect(url_for("forum_index")) error = (request.args.get("error") or "").strip() or None if request.method == "POST": username = (request.form.get("username") or "").strip() password = request.form.get("password") or "" user = User.query.filter(func.lower(User.username) == username.lower()).first() if not user or not user.check_password(password): error = "用户名或密码错误" elif _is_banned_user(user): error = _user_ban_message(user) else: user.last_login_at = datetime.now(timezone.utc) db.session.commit() session["user_id"] = user.id return redirect(_safe_next_url("forum_index")) return render_template("auth/login.html", error=error) @app.route("/logout") def user_logout(): session.pop("user_id", None) return redirect(url_for("forum_index")) @app.route("/profile") def user_profile_redirect(): return redirect(url_for("user_profile")) @app.route("/me", methods=["GET", "POST"]) @user_login_required def user_profile(): user = _get_current_user() tab = (request.args.get("tab") or "posts").strip().lower() if tab not in {"posts", "comments", "likes", "bookmarks", "settings"}: tab = "posts" if request.method == "POST": action = (request.form.get("action") or "").strip().lower() if action == "profile": username = (request.form.get("username") or "").strip() if username == user.username: return redirect(url_for("user_profile", tab="settings", msg="资料未变更")) if not _is_valid_username(username): return redirect(url_for("user_profile", tab="settings", error="用户名需为 3-20 位,仅支持字母、数字、下划线")) exists = ( User.query .filter(func.lower(User.username) == username.lower(), User.id != user.id) .first() ) if exists: return redirect(url_for("user_profile", tab="settings", error="用户名已存在")) user.username = username db.session.commit() return redirect(url_for("user_profile", tab="settings", msg="用户名已更新")) if action == "password": current_password = request.form.get("current_password") or "" new_password = request.form.get("new_password") or "" confirm_password = request.form.get("confirm_password") or "" if not user.check_password(current_password): return redirect(url_for("user_profile", tab="settings", error="当前密码错误")) if len(new_password) < 6: return redirect(url_for("user_profile", tab="settings", error="新密码至少 6 位")) if new_password != confirm_password: return redirect(url_for("user_profile", tab="settings", error="两次新密码输入不一致")) user.set_password(new_password) db.session.commit() return redirect(url_for("user_profile", tab="settings", msg="密码已更新")) return redirect(url_for("user_profile", tab="settings", error="未知操作")) my_post_rows = ( _query_forum_post_rows(active_tab="latest", author_id=user.id) .limit(60) .all() ) my_post_cards = _build_forum_post_cards(my_post_rows) my_comment_rows = ( db.session.query( ForumComment, ForumPost.id.label("post_id"), ForumPost.title.label("post_title"), ) .join(ForumPost, ForumComment.post_id == ForumPost.id) .filter(ForumComment.user_id == user.id) .order_by(ForumComment.created_at.desc(), ForumComment.id.desc()) .limit(120) .all() ) my_comment_items = [ { "comment": c, "post_id": post_id, "post_title": post_title, } for c, post_id, post_title in my_comment_rows ] my_like_rows = ( db.session.query( ForumPostLike, ForumPost.id.label("post_id"), ForumPost.title.label("post_title"), ForumPost.category.label("post_category"), ForumPost.created_at.label("post_created_at"), ) .join(ForumPost, ForumPostLike.post_id == ForumPost.id) .filter(ForumPostLike.user_id == user.id) .order_by(ForumPostLike.created_at.desc(), ForumPostLike.id.desc()) .limit(120) .all() ) my_like_items = [ { "like": like_row, "post_id": post_id, "post_title": post_title, "post_category": post_category, "post_created_at": post_created_at, } for like_row, post_id, post_title, post_category, post_created_at in my_like_rows ] my_bookmark_rows = ( db.session.query( ForumPostBookmark, ForumPost.id.label("post_id"), ForumPost.title.label("post_title"), ForumPost.category.label("post_category"), ForumPost.created_at.label("post_created_at"), ) .join(ForumPost, ForumPostBookmark.post_id == ForumPost.id) .filter(ForumPostBookmark.user_id == user.id) .order_by(ForumPostBookmark.created_at.desc(), ForumPostBookmark.id.desc()) .limit(120) .all() ) my_bookmark_items = [ { "bookmark": bookmark_row, "post_id": post_id, "post_title": post_title, "post_category": post_category, "post_created_at": post_created_at, } for bookmark_row, post_id, post_title, post_category, post_created_at in my_bookmark_rows ] stats = { "post_count": ForumPost.query.filter_by(user_id=user.id).count(), "comment_count": ForumComment.query.filter_by(user_id=user.id).count(), "like_count": ForumPostLike.query.filter_by(user_id=user.id).count(), "bookmark_count": ForumPostBookmark.query.filter_by(user_id=user.id).count(), "report_count": ForumReport.query.filter_by(reporter_id=user.id).count(), "pending_report_count": ForumReport.query.filter_by(reporter_id=user.id, status="pending").count(), "notification_count": ForumNotification.query.filter_by(user_id=user.id).count(), "unread_notification_count": ForumNotification.query.filter_by(user_id=user.id, is_read=False).count(), } return render_template( "forum/profile.html", profile_user=user, active_tab=tab, my_post_cards=my_post_cards, my_comment_items=my_comment_items, my_like_items=my_like_items, my_bookmark_items=my_bookmark_items, stats=stats, message=request.args.get("msg") or "", error=request.args.get("error") or "", ) @app.route("/notifications") @user_login_required def user_notifications(): user = _get_current_user() status = (request.args.get("status") or "all").strip().lower() if status not in {"all", "unread", "read"}: status = "all" q = ForumNotification.query.filter_by(user_id=user.id) if status == "unread": q = q.filter_by(is_read=False) elif status == "read": q = q.filter_by(is_read=True) rows = q.order_by(ForumNotification.created_at.desc(), ForumNotification.id.desc()).limit(300).all() items = [] for n in rows: items.append({ "notification": n, "type_label": FORUM_NOTIFICATION_TYPE_LABELS.get(n.notif_type, n.notif_type or "通知"), "actor_name": n.actor_rel.username if n.actor_rel else "", "target_url": _notification_target_url(n), "time_text": _humanize_time(n.created_at), }) unread_count = ForumNotification.query.filter_by(user_id=user.id, is_read=False).count() read_count = ForumNotification.query.filter_by(user_id=user.id, is_read=True).count() return render_template( "forum/notifications.html", active_status=status, notification_items=items, unread_count=unread_count, read_count=read_count, total_count=unread_count + read_count, message=request.args.get("msg") or "", error=request.args.get("error") or "", ) @app.route("/notification//go") @user_login_required def user_notification_go(notification_id): user = _get_current_user() n = ForumNotification.query.get_or_404(notification_id) if n.user_id != user.id: return redirect(url_for("user_notifications", error="无权访问该通知")) if not n.is_read: n.is_read = True db.session.commit() return redirect(_notification_target_url(n)) @app.route("/notification//read", methods=["POST"]) @user_login_required def user_notification_read(notification_id): user = _get_current_user() n = ForumNotification.query.get_or_404(notification_id) if n.user_id != user.id: return redirect(url_for("user_notifications", error="无权操作该通知")) if not n.is_read: n.is_read = True db.session.commit() next_url = (request.form.get("next") or "").strip() if next_url.startswith("/") and not next_url.startswith("//"): return redirect(next_url) return redirect(url_for("user_notifications", msg="已标记为已读")) @app.route("/notifications/read-all", methods=["POST"]) @user_login_required def user_notifications_read_all(): user = _get_current_user() unread = ForumNotification.query.filter_by(user_id=user.id, is_read=False) updated = unread.update({"is_read": True}, synchronize_session=False) db.session.commit() msg = "已全部标记为已读" if updated else "没有未读通知" return redirect(url_for("user_notifications", msg=msg)) @app.route("/forum") def forum_index(): active_tab = (request.args.get("tab") or "latest").strip().lower() if active_tab not in {"latest", "new", "hot"}: active_tab = "latest" selected_category = (request.args.get("category") or "").strip() or None if selected_category and len(selected_category) > 32: selected_category = selected_category[:32] search_query = (request.args.get("q") or "").strip() if len(search_query) > 80: search_query = search_query[:80] page = request.args.get("page", type=int) or 1 if page < 1: page = 1 per_page = 20 rows_query = _query_forum_post_rows( active_tab=active_tab, selected_category=selected_category, search_query=search_query or None, ) total_posts = rows_query.order_by(None).count() total_pages = max((total_posts + per_page - 1) // per_page, 1) if page > total_pages: page = total_pages rows = rows_query.offset((page - 1) * per_page).limit(per_page).all() post_cards = _build_forum_post_cards(rows) sidebar = _forum_sidebar_data() category_count_map = {name: int(count or 0) for name, count in (sidebar.get("category_counts") or [])} category_names = list(_get_forum_category_names(active_only=True)) for name in category_count_map.keys(): if name and name not in category_names: category_names.append(name) if selected_category and selected_category not in category_names: category_names.insert(0, selected_category) tab_defs = [ ("latest", "最新"), ("new", "新帖"), ("hot", "热门"), ] tab_links = [ { "key": key, "label": label, "url": _build_forum_url( tab=key, category=selected_category, q=search_query or None, page=1, ), "active": active_tab == key, } for key, label in tab_defs ] category_links = [ { "name": "全部", "count": None, "url": _build_forum_url(tab=active_tab, category=None, q=search_query or None, page=1), "active": selected_category is None, } ] for name in category_names: category_links.append({ "name": name, "count": category_count_map.get(name, 0), "url": _build_forum_url(tab=active_tab, category=name, q=search_query or None, page=1), "active": selected_category == name, }) category_nav_url = _build_forum_url( tab=active_tab, category=selected_category or (category_names[0] if category_names else None), q=search_query or None, page=1, ) window_start = max(1, page - 2) window_end = min(total_pages, page + 2) page_links = [ { "num": num, "url": _build_forum_url( tab=active_tab, category=selected_category, q=search_query or None, page=num, ), "active": num == page, } for num in range(window_start, window_end + 1) ] has_filters = bool(selected_category or search_query or active_tab != "latest") if search_query and selected_category: empty_hint = "当前分类下没有匹配关键词的帖子。" elif search_query: empty_hint = "没有匹配关键词的帖子。" elif selected_category: empty_hint = "该分类暂时没有帖子。" else: empty_hint = "当前没有帖子,点击右上角按钮发布第一条内容。" result_start = ((page - 1) * per_page + 1) if total_posts else 0 result_end = min(page * per_page, total_posts) if total_posts else 0 return render_template( "forum/index.html", post_cards=post_cards, sidebar=sidebar, active_tab=active_tab, selected_category=selected_category, search_query=search_query, tab_links=tab_links, category_links=category_links, category_nav_url=category_nav_url, total_posts=total_posts, total_pages=total_pages, current_page=page, page_links=page_links, has_prev=(page > 1), has_next=(page < total_pages), prev_page_url=_build_forum_url( tab=active_tab, category=selected_category, q=search_query or None, page=page - 1, ), next_page_url=_build_forum_url( tab=active_tab, category=selected_category, q=search_query or None, page=page + 1, ), clear_search_url=_build_forum_url( tab=active_tab, category=selected_category, q=None, page=1, ), clear_all_url=_build_forum_url(tab="latest", category=None, q=None, page=1), has_filters=has_filters, empty_hint=empty_hint, result_start=result_start, result_end=result_end, message=request.args.get("msg") or "", error=request.args.get("error") or "", ) @app.route("/forum/post/new", methods=["GET", "POST"]) @user_login_required def forum_post_new(): user = _get_current_user() blocked_resp = _ensure_forum_interaction_user(user) if blocked_resp: return blocked_resp error = None title = "" content = "" available_categories = _get_forum_category_names(active_only=True) category = available_categories[0] if available_categories else "综合讨论" if request.method == "POST": title = (request.form.get("title") or "").strip() content = (request.form.get("content") or "").strip() category = (request.form.get("category") or "").strip() or category if category not in available_categories: category = available_categories[0] if available_categories else "综合讨论" if len(title) < 5: error = "标题至少 5 个字符" elif len(title) > 160: error = "标题不能超过 160 个字符" elif len(content) < 10: error = "内容至少 10 个字符" else: post = ForumPost( user_id=user.id, category=category, title=title, content=content, ) db.session.add(post) db.session.commit() return redirect(url_for("forum_post_detail", post_id=post.id)) return render_template( "forum/post_form.html", error=error, title_val=title, content_val=content, category_val=category, categories=available_categories, page_title="创建新主题", submit_text="发布主题", action_url=url_for("forum_post_new"), cancel_url=url_for("forum_index"), form_mode="create", ) @app.route("/forum/post//edit", methods=["GET", "POST"]) @user_login_required def forum_post_edit(post_id): post = ForumPost.query.get_or_404(post_id) user = _get_current_user() blocked_resp = _ensure_forum_interaction_user(user, post_id=post.id) if blocked_resp: return blocked_resp if not _can_edit_post(user, post): return _forum_redirect_with_error(post.id, "你没有权限编辑该帖子") error = None title = post.title or "" content = post.content or "" available_categories = _get_forum_category_names(active_only=True) if post.category and post.category not in available_categories: available_categories.insert(0, post.category) category = post.category or (available_categories[0] if available_categories else "综合讨论") if request.method == "POST": title = (request.form.get("title") or "").strip() content = (request.form.get("content") or "").strip() category = (request.form.get("category") or "").strip() or category if category not in available_categories: category = available_categories[0] if available_categories else "综合讨论" if len(title) < 5: error = "标题至少 5 个字符" elif len(title) > 160: error = "标题不能超过 160 个字符" elif len(content) < 10: error = "内容至少 10 个字符" else: post.title = title post.content = content post.category = category db.session.commit() return _forum_redirect_with_msg(post.id, "帖子已更新") return render_template( "forum/post_form.html", error=error, title_val=title, content_val=content, category_val=category, categories=available_categories, page_title="编辑主题", submit_text="保存修改", action_url=url_for("forum_post_edit", post_id=post.id), cancel_url=url_for("forum_post_detail", post_id=post.id), form_mode="edit", ) @app.route("/forum/post//delete", methods=["POST"]) @user_login_required def forum_post_delete(post_id): post = ForumPost.query.get_or_404(post_id) user = _get_current_user() blocked_resp = _ensure_forum_interaction_user(user, post_id=post.id) if blocked_resp: return blocked_resp if not _can_edit_post(user, post): return _forum_redirect_with_error(post.id, "你没有权限删除该帖子") db.session.delete(post) db.session.commit() return redirect(url_for("forum_index")) @app.route("/forum/post/") def forum_post_detail(post_id): post = ForumPost.query.get_or_404(post_id) current_user = _get_current_user() viewed_posts = session.get("viewed_posts") or [] if post.id not in viewed_posts: post.view_count = int(post.view_count or 0) + 1 viewed_posts.append(post.id) session["viewed_posts"] = viewed_posts[-200:] db.session.commit() comments = ( ForumComment.query .filter_by(post_id=post.id) .order_by(ForumComment.created_at.asc(), ForumComment.id.asc()) .all() ) like_count = ForumPostLike.query.filter_by(post_id=post.id).count() bookmark_count = ForumPostBookmark.query.filter_by(post_id=post.id).count() liked_by_me = False bookmarked_by_me = False can_interact = bool(current_user and not _is_banned_user(current_user)) if current_user: liked_by_me = ( ForumPostLike.query .filter_by(post_id=post.id, user_id=current_user.id) .first() is not None ) bookmarked_by_me = ( ForumPostBookmark.query .filter_by(post_id=post.id, user_id=current_user.id) .first() is not None ) sidebar = _forum_sidebar_data() return render_template( "forum/post_detail.html", post=post, comments=comments, like_count=like_count, bookmark_count=bookmark_count, liked_by_me=liked_by_me, bookmarked_by_me=bookmarked_by_me, can_interact=can_interact, sidebar=sidebar, message=request.args.get("msg") or "", error=request.args.get("error") or "", ) @app.route("/forum/post//like", methods=["POST"]) @user_login_required def forum_post_like_toggle(post_id): post = ForumPost.query.get_or_404(post_id) user = _get_current_user() blocked_resp = _ensure_forum_interaction_user(user, post_id=post.id) if blocked_resp: return blocked_resp exists = ForumPostLike.query.filter_by(post_id=post.id, user_id=user.id).first() if exists: db.session.delete(exists) db.session.commit() return redirect(_safe_form_next_url(url_for("forum_post_detail", post_id=post.id, msg="已取消点赞"))) db.session.add(ForumPostLike(post_id=post.id, user_id=user.id)) db.session.commit() return redirect(_safe_form_next_url(url_for("forum_post_detail", post_id=post.id, msg="已点赞该帖子"))) @app.route("/forum/post//bookmark", methods=["POST"]) @user_login_required def forum_post_bookmark_toggle(post_id): post = ForumPost.query.get_or_404(post_id) user = _get_current_user() blocked_resp = _ensure_forum_interaction_user(user, post_id=post.id) if blocked_resp: return blocked_resp exists = ForumPostBookmark.query.filter_by(post_id=post.id, user_id=user.id).first() if exists: db.session.delete(exists) db.session.commit() return redirect(_safe_form_next_url(url_for("forum_post_detail", post_id=post.id, msg="已取消收藏"))) db.session.add(ForumPostBookmark(post_id=post.id, user_id=user.id)) db.session.commit() return redirect(_safe_form_next_url(url_for("forum_post_detail", post_id=post.id, msg="已收藏该帖子"))) @app.route("/forum/post//comment", methods=["POST"]) @user_login_required def forum_post_comment(post_id): post = ForumPost.query.get_or_404(post_id) user = _get_current_user() blocked_resp = _ensure_forum_interaction_user(user, post_id=post.id) if blocked_resp: return blocked_resp if post.is_locked: return _forum_redirect_with_error(post.id, "该帖子已锁定,暂不允许新增评论") content = (request.form.get("content") or "").strip() if len(content) < 2: return redirect(url_for("forum_post_detail", post_id=post.id, error="评论至少 2 个字符")) comment = ForumComment( post_id=post.id, user_id=user.id, content=content, ) db.session.add(comment) db.session.flush() actor_name = user.username or "用户" post_title = post.title or "主题" if post.user_id and post.user_id != user.id: _create_notification( user_id=post.user_id, notif_type="post_commented", message="{} 评论了你的帖子《{}》".format(actor_name, post_title), actor_id=user.id, post_id=post.id, comment_id=comment.id, ) participant_rows = ( db.session.query(ForumComment.user_id) .filter( ForumComment.post_id == post.id, ForumComment.user_id.isnot(None), ForumComment.user_id != user.id, ) .distinct() .limit(50) .all() ) for (uid,) in participant_rows: if not uid: continue if uid == post.user_id or uid == user.id: continue _create_notification( user_id=uid, notif_type="thread_replied", message="{} 在你参与的主题《{}》有新回复".format(actor_name, post_title), actor_id=user.id, post_id=post.id, comment_id=comment.id, ) db.session.commit() return redirect(url_for("forum_post_detail", post_id=post.id, msg="评论发布成功")) @app.route("/forum/comment//edit", methods=["GET", "POST"]) @user_login_required def forum_comment_edit(comment_id): comment = ForumComment.query.get_or_404(comment_id) user = _get_current_user() blocked_resp = _ensure_forum_interaction_user(user, post_id=comment.post_id) if blocked_resp: return blocked_resp if not _can_edit_comment(user, comment): return _forum_redirect_with_error(comment.post_id, "你没有权限编辑该评论") error = None content = comment.content or "" if request.method == "POST": content = (request.form.get("content") or "").strip() if len(content) < 2: error = "评论至少 2 个字符" else: comment.content = content db.session.commit() return _forum_redirect_with_msg(comment.post_id, "评论已更新") return render_template( "forum/comment_form.html", error=error, comment=comment, content_val=content, action_url=url_for("forum_comment_edit", comment_id=comment.id), cancel_url=url_for("forum_post_detail", post_id=comment.post_id), ) @app.route("/forum/comment//delete", methods=["POST"]) @user_login_required def forum_comment_delete(comment_id): comment = ForumComment.query.get_or_404(comment_id) user = _get_current_user() blocked_resp = _ensure_forum_interaction_user(user, post_id=comment.post_id) if blocked_resp: return blocked_resp if not _can_edit_comment(user, comment): return _forum_redirect_with_error(comment.post_id, "你没有权限删除该评论") post_id = comment.post_id db.session.delete(comment) db.session.commit() return _forum_redirect_with_msg(post_id, "评论已删除") @app.route("/forum/report", methods=["POST"]) @user_login_required def forum_report_create(): user = _get_current_user() blocked_resp = _ensure_forum_interaction_user(user) if blocked_resp: return blocked_resp target_type = (request.form.get("target_type") or "").strip().lower() target_id = request.form.get("target_id", type=int) or 0 reason = (request.form.get("reason") or "其他").strip() detail = (request.form.get("detail") or "").strip() if len(detail) > 500: detail = detail[:500] if reason not in FORUM_REPORT_REASONS: reason = "其他" report_post_id = None target_owner_id = None snapshot_title = None snapshot_content = None if target_type == "post": target_post = db.session.get(ForumPost, target_id) if target_post is None: return redirect(url_for("forum_index")) report_post_id = target_post.id target_owner_id = target_post.user_id snapshot_title = target_post.title snapshot_content = target_post.content elif target_type == "comment": target_comment = db.session.get(ForumComment, target_id) if target_comment is None: return redirect(url_for("forum_index")) report_post_id = target_comment.post_id target_owner_id = target_comment.user_id snapshot_title = target_comment.post_rel.title if target_comment.post_rel else None snapshot_content = target_comment.content else: return redirect(url_for("forum_index")) if target_owner_id == user.id: return _forum_redirect_with_error(report_post_id, "不能举报自己的内容") exists = ForumReport.query.filter_by( reporter_id=user.id, target_type=target_type, target_id=target_id, status="pending", ).first() if exists: return _forum_redirect_with_msg(report_post_id, "你已举报该内容,请等待处理") db.session.add(ForumReport( reporter_id=user.id, target_type=target_type, target_id=target_id, reason=reason, detail=detail or None, snapshot_title=snapshot_title, snapshot_content=snapshot_content, status="pending", )) db.session.commit() return _forum_redirect_with_msg(report_post_id, "举报已提交,感谢反馈") # ---------- SEO ---------- @app.route("/sitemap.xml") def sitemap(): from flask import make_response url = SITE_URL.rstrip("/") xml = f''' {url}/ weekly 1.0 {url}/forum daily 0.9 ''' resp = make_response(xml) resp.mimetype = "application/xml" return resp @app.route("/robots.txt") def robots(): from flask import make_response url = SITE_URL.rstrip("/") txt = f"""User-agent: * Allow: / Sitemap: {url}/sitemap.xml """ resp = make_response(txt) resp.mimetype = "text/plain" return resp # ---------- 后台 ---------- @app.route("/admin/login", methods=["GET", "POST"]) def admin_login(): if request.method == "POST": password = request.form.get("password", "") if password == ADMIN_PASSWORD: session["admin_logged_in"] = True return redirect(url_for("admin_dashboard")) return render_template("admin/login.html", error="密码错误") return render_template("admin/login.html") @app.route("/admin/logout") def admin_logout(): session.pop("admin_logged_in", None) return redirect(url_for("index")) @app.route("/admin/api/plan/") @admin_required def admin_api_plan(plan_id): plan = VPSPlan.query.get_or_404(plan_id) return jsonify({ "id": plan.id, "provider_id": plan.provider_id, "countries": plan.countries or "", "vcpu": plan.vcpu, "memory_gb": plan.memory_gb, "storage_gb": plan.storage_gb, "bandwidth_mbps": plan.bandwidth_mbps, "traffic": plan.traffic or "", "price_cny": float(plan.price_cny) if plan.price_cny is not None else None, "price_usd": float(plan.price_usd) if plan.price_usd is not None else None, "currency": plan.currency or "CNY", "official_url": plan.official_url or "", }) @app.route("/admin/api/plan//price-history") @admin_required def admin_api_plan_price_history(plan_id): plan = VPSPlan.query.get_or_404(plan_id) rows = ( PriceHistory.query .filter_by(plan_id=plan.id) .order_by(PriceHistory.captured_at.desc(), PriceHistory.id.desc()) .limit(30) .all() ) return jsonify([ { "id": r.id, "price_cny": float(r.price_cny) if r.price_cny is not None else None, "price_usd": float(r.price_usd) if r.price_usd is not None else None, "currency": r.currency or "CNY", "source": r.source or "", "captured_at": r.captured_at.isoformat() if r.captured_at else "", } for r in rows ]) @app.route("/admin") @admin_required def admin_dashboard(): providers = Provider.query.order_by(Provider.name).all() plans = VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny).all() plan_trends = _build_plan_trend_map(plans) return render_template( "admin/dashboard.html", providers=providers, plans=plans, plan_trends=plan_trends, country_tags=COUNTRY_TAGS, ) # ---------- 厂商管理 ---------- @app.route("/admin/providers") @admin_required def admin_providers(): providers = Provider.query.order_by(Provider.name).all() return render_template("admin/providers.html", providers=providers) @app.route("/admin/provider/new", methods=["GET", "POST"]) @admin_required def admin_provider_new(): if request.method == "POST": name = request.form.get("name", "").strip() official_url = request.form.get("official_url", "").strip() or None if not name: return render_template("admin/provider_form.html", provider=None, error="请填写厂商名称") if Provider.query.filter_by(name=name).first(): return render_template("admin/provider_form.html", provider=None, error="该厂商名称已存在") p = Provider(name=name, official_url=official_url) db.session.add(p) db.session.commit() return redirect(url_for("admin_provider_detail", provider_id=p.id)) return render_template("admin/provider_form.html", provider=None) @app.route("/admin/provider/") @admin_required def admin_provider_detail(provider_id): provider = Provider.query.get_or_404(provider_id) plans = VPSPlan.query.filter( (VPSPlan.provider_id == provider_id) | (VPSPlan.provider == provider.name) ).order_by(VPSPlan.price_cny.asc(), VPSPlan.name).all() providers = Provider.query.order_by(Provider.name).all() plan_trends = _build_plan_trend_map(plans) return render_template( "admin/provider_detail.html", provider=provider, plans=plans, plan_trends=plan_trends, providers=providers, country_tags=COUNTRY_TAGS, ) @app.route("/admin/provider//edit", methods=["GET", "POST"]) @admin_required def admin_provider_edit(provider_id): provider = Provider.query.get_or_404(provider_id) if request.method == "POST": provider.name = request.form.get("name", "").strip() provider.official_url = request.form.get("official_url", "").strip() or None if not provider.name: return render_template("admin/provider_form.html", provider=provider, error="请填写厂商名称") db.session.commit() return redirect(url_for("admin_provider_detail", provider_id=provider.id)) return render_template("admin/provider_form.html", provider=provider) @app.route("/admin/provider//delete", methods=["POST"]) @admin_required def admin_provider_delete(provider_id): provider = Provider.query.get_or_404(provider_id) # 将该厂商下的配置改为无厂商关联(保留配置,仅清空 provider_id) VPSPlan.query.filter_by(provider_id=provider_id).update({"provider_id": None}) db.session.delete(provider) db.session.commit() return redirect(url_for("admin_providers")) def _parse_sort_order(raw, default=100): s = (raw or "").strip() if not s: return default try: return int(s) except ValueError: return default def _admin_user_counts(user_ids): """批量统计用户维度数据,减少列表页 N+1 查询。""" if not user_ids: return { "posts": {}, "comments": {}, "reports": {}, "unread_notifications": {}, } post_counts = { uid: int(cnt or 0) for uid, cnt in ( db.session.query(ForumPost.user_id, func.count(ForumPost.id)) .filter(ForumPost.user_id.in_(user_ids)) .group_by(ForumPost.user_id) .all() ) } comment_counts = { uid: int(cnt or 0) for uid, cnt in ( db.session.query(ForumComment.user_id, func.count(ForumComment.id)) .filter(ForumComment.user_id.in_(user_ids)) .group_by(ForumComment.user_id) .all() ) } report_counts = { uid: int(cnt or 0) for uid, cnt in ( db.session.query(ForumReport.reporter_id, func.count(ForumReport.id)) .filter(ForumReport.reporter_id.in_(user_ids)) .group_by(ForumReport.reporter_id) .all() ) } unread_notification_counts = { uid: int(cnt or 0) for uid, cnt in ( db.session.query(ForumNotification.user_id, func.count(ForumNotification.id)) .filter(ForumNotification.user_id.in_(user_ids), ForumNotification.is_read.is_(False)) .group_by(ForumNotification.user_id) .all() ) } return { "posts": post_counts, "comments": comment_counts, "reports": report_counts, "unread_notifications": unread_notification_counts, } def _admin_load_user_options(limit=400): return ( User.query .order_by(User.username.asc(), User.id.asc()) .limit(limit) .all() ) def _admin_load_post_options(limit=400): return ( ForumPost.query .order_by(ForumPost.created_at.desc(), ForumPost.id.desc()) .limit(limit) .all() ) def _admin_fill_post_and_user_options(post_options, selected_post_id, user_options, selected_user_id): """确保编辑场景中的当前值始终出现在下拉框中。""" if selected_post_id and all(p.id != selected_post_id for p in post_options): selected_post = db.session.get(ForumPost, selected_post_id) if selected_post: post_options = [selected_post] + post_options if selected_user_id and all(u.id != selected_user_id for u in user_options): selected_user = db.session.get(User, selected_user_id) if selected_user: user_options = [selected_user] + user_options return post_options, user_options # ---------- 论坛分类管理 ---------- @app.route("/admin/forum/categories") @admin_required def admin_forum_categories(): categories = _load_forum_categories(active_only=False) posts_by_category = { name: count for name, count in ( db.session.query(ForumPost.category, func.count(ForumPost.id)) .group_by(ForumPost.category) .all() ) } return render_template( "admin/forum_categories.html", categories=categories, posts_by_category=posts_by_category, msg=request.args.get("msg", ""), error=request.args.get("error", ""), ) @app.route("/admin/forum/category/new", methods=["GET", "POST"]) @admin_required def admin_forum_category_new(): error = "" name_val = "" sort_order_val = 100 is_active_val = True if request.method == "POST": name_val = (request.form.get("name") or "").strip() sort_order_val = _parse_sort_order(request.form.get("sort_order"), 100) is_active_val = bool(request.form.get("is_active")) if not name_val: error = "请填写分类名称" elif len(name_val) > 32: error = "分类名称最多 32 个字符" elif ForumCategory.query.filter(func.lower(ForumCategory.name) == name_val.lower()).first(): error = "分类名称已存在" else: db.session.add(ForumCategory( name=name_val, sort_order=sort_order_val, is_active=is_active_val, )) db.session.commit() return redirect(url_for("admin_forum_categories", msg="已新增分类:{}".format(name_val))) return render_template( "admin/forum_category_form.html", page_title="新增论坛分类", submit_text="创建分类", action_url=url_for("admin_forum_category_new"), error=error, name_val=name_val, sort_order_val=sort_order_val, is_active_val=is_active_val, category_id=None, ) @app.route("/admin/forum/category//edit", methods=["GET", "POST"]) @admin_required def admin_forum_category_edit(category_id): category = ForumCategory.query.get_or_404(category_id) error = "" name_val = category.name sort_order_val = category.sort_order is_active_val = bool(category.is_active) if request.method == "POST": name_val = (request.form.get("name") or "").strip() sort_order_val = _parse_sort_order(request.form.get("sort_order"), category.sort_order) is_active_val = bool(request.form.get("is_active")) if not name_val: error = "请填写分类名称" elif len(name_val) > 32: error = "分类名称最多 32 个字符" elif category.is_active and not is_active_val and ForumCategory.query.filter_by(is_active=True).count() <= 1: error = "至少保留一个启用分类" else: exists = ( ForumCategory.query .filter(func.lower(ForumCategory.name) == name_val.lower(), ForumCategory.id != category.id) .first() ) if exists: error = "分类名称已存在" else: old_name = category.name category.name = name_val category.sort_order = sort_order_val category.is_active = is_active_val if old_name != name_val: ForumPost.query.filter_by(category=old_name).update({"category": name_val}) db.session.commit() return redirect(url_for("admin_forum_categories", msg="已更新分类:{}".format(name_val))) return render_template( "admin/forum_category_form.html", page_title="编辑论坛分类", submit_text="保存修改", action_url=url_for("admin_forum_category_edit", category_id=category.id), error=error, name_val=name_val, sort_order_val=sort_order_val, is_active_val=is_active_val, category_id=category.id, ) @app.route("/admin/forum/category//delete", methods=["POST"]) @admin_required def admin_forum_category_delete(category_id): category = ForumCategory.query.get_or_404(category_id) total = ForumCategory.query.count() if total <= 1: return redirect(url_for("admin_forum_categories", error="至少保留一个分类,无法删除最后一个")) if category.is_active and ForumCategory.query.filter_by(is_active=True).count() <= 1: return redirect(url_for("admin_forum_categories", error="至少保留一个启用分类,无法删除最后一个启用项")) replacement = ( ForumCategory.query .filter(ForumCategory.id != category.id) .order_by(ForumCategory.is_active.desc(), ForumCategory.sort_order.asc(), ForumCategory.id.asc()) .first() ) if replacement is None: return redirect(url_for("admin_forum_categories", error="未找到可替代分类")) ForumPost.query.filter_by(category=category.name).update({"category": replacement.name}) db.session.delete(category) db.session.commit() return redirect(url_for("admin_forum_categories", msg="已删除分类,帖子迁移到:{}".format(replacement.name))) def _get_report_target_info(report): """返回举报目标的展示信息。""" info = { "exists": False, "post_id": None, "title": report.snapshot_title or "", "content": report.snapshot_content or "", "author_name": "", } if report.target_type == "post": post = db.session.get(ForumPost, report.target_id) if post: info.update({ "exists": True, "post_id": post.id, "title": post.title or info["title"], "content": post.content or info["content"], "author_name": post.author_rel.username if post.author_rel else "", }) elif report.target_type == "comment": comment = db.session.get(ForumComment, report.target_id) if comment: info.update({ "exists": True, "post_id": comment.post_id, "title": comment.post_rel.title if comment.post_rel else (info["title"] or ""), "content": comment.content or info["content"], "author_name": comment.author_rel.username if comment.author_rel else "", }) if info["content"] and len(info["content"]) > 140: info["content"] = info["content"][:140] + "..." return info @app.route("/admin/forum/reports") @admin_required def admin_forum_reports(): status = (request.args.get("status") or "pending").strip().lower() if status not in {"pending", "processed", "rejected", "all"}: status = "pending" q = ForumReport.query.order_by(ForumReport.created_at.desc(), ForumReport.id.desc()) if status != "all": q = q.filter_by(status=status) reports = q.limit(300).all() report_items = [] for r in reports: report_items.append({ "report": r, "target": _get_report_target_info(r), "reporter_name": r.reporter_rel.username if r.reporter_rel else "用户", }) grouped = ( db.session.query(ForumReport.status, func.count(ForumReport.id)) .group_by(ForumReport.status) .all() ) count_map = {k: int(v or 0) for k, v in grouped} return render_template( "admin/forum_reports.html", status=status, report_items=report_items, status_count_map=count_map, status_labels=FORUM_REPORT_STATUS_LABELS, msg=request.args.get("msg", ""), error=request.args.get("error", ""), ) @app.route("/admin/forum/report//process", methods=["POST"]) @admin_required def admin_forum_report_process(report_id): report = ForumReport.query.get_or_404(report_id) action = (request.form.get("action") or "").strip().lower() review_note = (request.form.get("review_note") or "").strip() if len(review_note) > 500: review_note = review_note[:500] if report.status != "pending": return redirect(url_for("admin_forum_reports", error="该举报已处理")) outcome = "" target_owner_id = None target_post_id = None target_kind_label = "内容" if action == "delete_target": deleted = False if report.target_type == "post": target = db.session.get(ForumPost, report.target_id) if target: target_owner_id = target.user_id target_post_id = target.id target_kind_label = "帖子" db.session.delete(target) deleted = True outcome = "已删除被举报帖子" if deleted else "目标帖子已不存在" elif report.target_type == "comment": target = db.session.get(ForumComment, report.target_id) if target: target_owner_id = target.user_id target_post_id = target.post_id target_kind_label = "评论" db.session.delete(target) deleted = True outcome = "已删除被举报评论" if deleted else "目标评论已不存在" else: return redirect(url_for("admin_forum_reports", error="未知举报目标类型")) report.status = "processed" report.review_note = review_note or outcome elif action == "keep": report.status = "processed" report.review_note = review_note or "审核后保留内容" outcome = "已标记为保留" elif action == "reject": report.status = "rejected" report.review_note = review_note or "举报不成立" outcome = "已驳回举报" else: return redirect(url_for("admin_forum_reports", error="未知处理动作")) report.reviewed_by = "admin" report.reviewed_at = datetime.now(timezone.utc) _create_notification( user_id=report.reporter_id, notif_type="report_processed", message="你提交的举报(#{})处理结果:{}".format(report.id, outcome), report_id=report.id, post_id=target_post_id, ) if action == "delete_target" and target_owner_id and target_owner_id != report.reporter_id: _create_notification( user_id=target_owner_id, notif_type="content_removed", message="你的{}因举报处理已被删除".format(target_kind_label), report_id=report.id, post_id=target_post_id, ) db.session.commit() return redirect(url_for("admin_forum_reports", msg=outcome)) @app.route("/admin/users") @admin_required def admin_users(): keyword = (request.args.get("q") or "").strip() if len(keyword) > 50: keyword = keyword[:50] q = User.query if keyword: pattern = "%{}%".format(keyword) q = q.filter(User.username.ilike(pattern)) users = q.order_by(User.created_at.desc(), User.id.desc()).limit(300).all() user_ids = [u.id for u in users] count_maps = _admin_user_counts(user_ids) return render_template( "admin/users.html", users=users, keyword=keyword, post_count_map=count_maps["posts"], comment_count_map=count_maps["comments"], report_count_map=count_maps["reports"], unread_notification_count_map=count_maps["unread_notifications"], msg=request.args.get("msg", ""), error=request.args.get("error", ""), ) @app.route("/admin/user/new", methods=["GET", "POST"]) @admin_required def admin_user_new(): error = "" username_val = "" if request.method == "POST": username_val = (request.form.get("username") or "").strip() password = request.form.get("password") or "" confirm_password = request.form.get("confirm_password") or "" if not _is_valid_username(username_val): error = "用户名需为 3-20 位,仅支持字母、数字、下划线" elif len(password) < 6: error = "密码至少 6 位" elif password != confirm_password: error = "两次输入的密码不一致" elif User.query.filter(func.lower(User.username) == username_val.lower()).first(): error = "用户名已存在" else: user = User(username=username_val) user.set_password(password) db.session.add(user) db.session.commit() return redirect(url_for("admin_users", msg="已新增用户:{}".format(username_val))) return render_template( "admin/user_form.html", page_title="新增用户", submit_text="创建用户", action_url=url_for("admin_user_new"), error=error, username_val=username_val, user_id=None, ) @app.route("/admin/user//edit", methods=["GET", "POST"]) @admin_required def admin_user_edit(user_id): user = User.query.get_or_404(user_id) error = "" username_val = user.username or "" if request.method == "POST": username_val = (request.form.get("username") or "").strip() new_password = request.form.get("new_password") or "" confirm_password = request.form.get("confirm_password") or "" if not _is_valid_username(username_val): error = "用户名需为 3-20 位,仅支持字母、数字、下划线" elif ( User.query .filter(func.lower(User.username) == username_val.lower(), User.id != user.id) .first() ): error = "用户名已存在" elif new_password and len(new_password) < 6: error = "新密码至少 6 位" elif new_password and new_password != confirm_password: error = "两次新密码输入不一致" else: old_username = user.username user.username = username_val changed = False if old_username != username_val: changed = True if new_password: user.set_password(new_password) changed = True if changed: db.session.commit() return redirect(url_for("admin_users", msg="已更新用户:{}".format(username_val))) return redirect(url_for("admin_users", msg="未检测到变更")) return render_template( "admin/user_form.html", page_title="编辑用户", submit_text="保存修改", action_url=url_for("admin_user_edit", user_id=user.id), error=error, username_val=username_val, user_id=user.id, ) @app.route("/admin/user//delete", methods=["POST"]) @admin_required def admin_user_delete(user_id): user = User.query.get_or_404(user_id) if User.query.count() <= 1: return redirect(url_for("admin_users", error="至少保留一个用户,无法删除最后一个")) username = user.username or "用户" try: # 其他用户已收到的通知可能引用该用户为 actor,删除前置空避免外键冲突。 ForumNotification.query.filter_by(actor_id=user.id).update({"actor_id": None}, synchronize_session=False) db.session.delete(user) db.session.commit() return redirect(url_for("admin_users", msg="已删除用户:{}".format(username))) except Exception: db.session.rollback() return redirect(url_for("admin_users", error="删除失败,请稍后重试")) @app.route("/admin/user//ban", methods=["POST"]) @admin_required def admin_user_ban(user_id): user = User.query.get_or_404(user_id) reason = (request.form.get("reason") or "").strip() if len(reason) > 255: reason = reason[:255] if user.is_banned: return redirect(url_for("admin_users", msg="用户已处于封禁状态:{}".format(user.username))) user.is_banned = True user.banned_at = datetime.now(timezone.utc) user.banned_reason = reason or "管理员封禁" db.session.commit() return redirect(url_for("admin_users", msg="已封禁用户:{}".format(user.username))) @app.route("/admin/user//unban", methods=["POST"]) @admin_required def admin_user_unban(user_id): user = User.query.get_or_404(user_id) if not user.is_banned: return redirect(url_for("admin_users", msg="用户未被封禁:{}".format(user.username))) user.is_banned = False user.banned_at = None user.banned_reason = None db.session.commit() return redirect(url_for("admin_users", msg="已解封用户:{}".format(user.username))) @app.route("/admin/forum/posts") @admin_required def admin_forum_posts(): keyword = (request.args.get("q") or "").strip() if len(keyword) > 80: keyword = keyword[:80] selected_category = (request.args.get("category") or "").strip() or None selected_author_id = request.args.get("author_id", type=int) comment_stats_subq = ( db.session.query( ForumComment.post_id.label("post_id"), func.count(ForumComment.id).label("comment_count"), ) .group_by(ForumComment.post_id) .subquery() ) like_stats_subq = ( db.session.query( ForumPostLike.post_id.label("post_id"), func.count(ForumPostLike.id).label("like_count"), ) .group_by(ForumPostLike.post_id) .subquery() ) bookmark_stats_subq = ( db.session.query( ForumPostBookmark.post_id.label("post_id"), func.count(ForumPostBookmark.id).label("bookmark_count"), ) .group_by(ForumPostBookmark.post_id) .subquery() ) q = ( db.session.query( ForumPost, func.coalesce(comment_stats_subq.c.comment_count, 0).label("comment_count"), User.username.label("author_name"), func.coalesce(like_stats_subq.c.like_count, 0).label("like_count"), func.coalesce(bookmark_stats_subq.c.bookmark_count, 0).label("bookmark_count"), ) .outerjoin(comment_stats_subq, comment_stats_subq.c.post_id == ForumPost.id) .outerjoin(like_stats_subq, like_stats_subq.c.post_id == ForumPost.id) .outerjoin(bookmark_stats_subq, bookmark_stats_subq.c.post_id == ForumPost.id) .outerjoin(User, User.id == ForumPost.user_id) ) if selected_category: q = q.filter(ForumPost.category == selected_category) if selected_author_id: q = q.filter(ForumPost.user_id == selected_author_id) if keyword: pattern = "%{}%".format(keyword) q = q.filter( or_( ForumPost.title.ilike(pattern), ForumPost.content.ilike(pattern), User.username.ilike(pattern), ) ) rows = q.order_by(ForumPost.is_pinned.desc(), ForumPost.created_at.desc(), ForumPost.id.desc()).limit(400).all() category_names = list(_get_forum_category_names(active_only=False)) for (name,) in db.session.query(ForumPost.category).distinct().all(): if name and name not in category_names: category_names.append(name) if selected_category and selected_category not in category_names: category_names.insert(0, selected_category) author_rows = ( db.session.query( User.id, User.username, func.count(ForumPost.id).label("post_count"), ) .outerjoin(ForumPost, ForumPost.user_id == User.id) .group_by(User.id) .order_by(func.count(ForumPost.id).desc(), User.username.asc()) .limit(300) .all() ) return render_template( "admin/forum_posts.html", rows=rows, category_names=category_names, author_rows=author_rows, keyword=keyword, selected_category=selected_category, selected_author_id=selected_author_id, msg=request.args.get("msg", ""), error=request.args.get("error", ""), ) @app.route("/admin/forum/post//moderate", methods=["POST"]) @admin_required def admin_forum_post_moderate(post_id): post = ForumPost.query.get_or_404(post_id) action = (request.form.get("action") or "").strip().lower() if action == "pin": post.is_pinned = True elif action == "unpin": post.is_pinned = False elif action == "feature": post.is_featured = True elif action == "unfeature": post.is_featured = False elif action == "lock": post.is_locked = True elif action == "unlock": post.is_locked = False else: return redirect(url_for("admin_forum_posts", error="未知帖子管理动作")) db.session.commit() return redirect(url_for("admin_forum_posts", msg="已更新帖子 #{} 状态".format(post.id))) @app.route("/admin/forum/post/new", methods=["GET", "POST"]) @admin_required def admin_forum_post_new(): error = "" users = _admin_load_user_options(limit=400) categories = _get_forum_category_names(active_only=False) if not categories: categories = list(DEFAULT_FORUM_CATEGORIES) selected_author_id = request.args.get("author_id", type=int) or (users[0].id if users else None) selected_category = request.args.get("category") or (categories[0] if categories else "综合讨论") is_pinned_val = False is_featured_val = False is_locked_val = False title_val = "" content_val = "" if request.method == "POST": selected_author_id = request.form.get("author_id", type=int) selected_category = (request.form.get("category") or "").strip() or selected_category is_pinned_val = bool(request.form.get("is_pinned")) is_featured_val = bool(request.form.get("is_featured")) is_locked_val = bool(request.form.get("is_locked")) title_val = (request.form.get("title") or "").strip() content_val = (request.form.get("content") or "").strip() author = db.session.get(User, selected_author_id or 0) if not author: error = "请选择有效作者" elif selected_category not in categories: error = "请选择有效分类" elif len(title_val) < 5: error = "标题至少 5 个字符" elif len(title_val) > 160: error = "标题不能超过 160 个字符" elif len(content_val) < 10: error = "内容至少 10 个字符" else: db.session.add(ForumPost( user_id=author.id, category=selected_category, title=title_val, content=content_val, is_pinned=is_pinned_val, is_featured=is_featured_val, is_locked=is_locked_val, )) db.session.commit() return redirect(url_for("admin_forum_posts", msg="已新增帖子")) if not users: error = error or "当前没有可用用户,请先在用户管理中新增用户" return render_template( "admin/forum_post_form.html", page_title="后台新增帖子", submit_text="创建帖子", action_url=url_for("admin_forum_post_new"), cancel_url=url_for("admin_forum_posts"), error=error, users=users, categories=categories, selected_author_id=selected_author_id, selected_category=selected_category, is_pinned_val=is_pinned_val, is_featured_val=is_featured_val, is_locked_val=is_locked_val, title_val=title_val, content_val=content_val, post_id=None, ) @app.route("/admin/forum/post//edit", methods=["GET", "POST"]) @admin_required def admin_forum_post_edit(post_id): post = ForumPost.query.get_or_404(post_id) error = "" users = _admin_load_user_options(limit=400) categories = _get_forum_category_names(active_only=False) if post.category and post.category not in categories: categories.insert(0, post.category) selected_author_id = post.user_id selected_category = post.category or (categories[0] if categories else "综合讨论") is_pinned_val = bool(post.is_pinned) is_featured_val = bool(post.is_featured) is_locked_val = bool(post.is_locked) title_val = post.title or "" content_val = post.content or "" if request.method == "POST": selected_author_id = request.form.get("author_id", type=int) selected_category = (request.form.get("category") or "").strip() or selected_category is_pinned_val = bool(request.form.get("is_pinned")) is_featured_val = bool(request.form.get("is_featured")) is_locked_val = bool(request.form.get("is_locked")) title_val = (request.form.get("title") or "").strip() content_val = (request.form.get("content") or "").strip() author = db.session.get(User, selected_author_id or 0) if not author: error = "请选择有效作者" elif selected_category not in categories: error = "请选择有效分类" elif len(title_val) < 5: error = "标题至少 5 个字符" elif len(title_val) > 160: error = "标题不能超过 160 个字符" elif len(content_val) < 10: error = "内容至少 10 个字符" else: post.user_id = author.id post.category = selected_category post.is_pinned = is_pinned_val post.is_featured = is_featured_val post.is_locked = is_locked_val post.title = title_val post.content = content_val db.session.commit() return redirect(url_for("admin_forum_posts", msg="已更新帖子 #{}".format(post.id))) if not users: error = error or "当前没有可用用户,请先在用户管理中新增用户" return render_template( "admin/forum_post_form.html", page_title="后台编辑帖子", submit_text="保存修改", action_url=url_for("admin_forum_post_edit", post_id=post.id), cancel_url=url_for("admin_forum_posts"), error=error, users=users, categories=categories, selected_author_id=selected_author_id, selected_category=selected_category, is_pinned_val=is_pinned_val, is_featured_val=is_featured_val, is_locked_val=is_locked_val, title_val=title_val, content_val=content_val, post_id=post.id, ) @app.route("/admin/forum/post//delete", methods=["POST"]) @admin_required def admin_forum_post_delete(post_id): post = ForumPost.query.get_or_404(post_id) db.session.delete(post) db.session.commit() return redirect(url_for("admin_forum_posts", msg="已删除帖子 #{}".format(post_id))) @app.route("/admin/forum/comments") @admin_required def admin_forum_comments(): keyword = (request.args.get("q") or "").strip() if len(keyword) > 80: keyword = keyword[:80] selected_author_id = request.args.get("author_id", type=int) selected_post_id = request.args.get("post_id", type=int) q = ( db.session.query( ForumComment, ForumPost.title.label("post_title"), User.username.label("author_name"), ) .join(ForumPost, ForumComment.post_id == ForumPost.id) .outerjoin(User, User.id == ForumComment.user_id) ) if selected_post_id: q = q.filter(ForumComment.post_id == selected_post_id) if selected_author_id: q = q.filter(ForumComment.user_id == selected_author_id) if keyword: pattern = "%{}%".format(keyword) q = q.filter( or_( ForumComment.content.ilike(pattern), ForumPost.title.ilike(pattern), User.username.ilike(pattern), ) ) rows = q.order_by(ForumComment.created_at.desc(), ForumComment.id.desc()).limit(500).all() author_rows = ( db.session.query( User.id, User.username, func.count(ForumComment.id).label("comment_count"), ) .outerjoin(ForumComment, ForumComment.user_id == User.id) .group_by(User.id) .order_by(func.count(ForumComment.id).desc(), User.username.asc()) .limit(300) .all() ) post_rows = ( db.session.query( ForumPost.id, ForumPost.title, ) .order_by(ForumPost.created_at.desc(), ForumPost.id.desc()) .limit(300) .all() ) if selected_post_id and all(pid != selected_post_id for pid, _ in post_rows): selected_post = db.session.get(ForumPost, selected_post_id) if selected_post: post_rows = [(selected_post.id, selected_post.title)] + post_rows return render_template( "admin/forum_comments.html", rows=rows, author_rows=author_rows, post_rows=post_rows, keyword=keyword, selected_author_id=selected_author_id, selected_post_id=selected_post_id, msg=request.args.get("msg", ""), error=request.args.get("error", ""), ) @app.route("/admin/forum/comment/new", methods=["GET", "POST"]) @admin_required def admin_forum_comment_new(): error = "" post_options = _admin_load_post_options(limit=400) user_options = _admin_load_user_options(limit=400) selected_post_id = request.args.get("post_id", type=int) or (post_options[0].id if post_options else None) selected_user_id = request.args.get("user_id", type=int) or (user_options[0].id if user_options else None) post_options, user_options = _admin_fill_post_and_user_options( post_options, selected_post_id, user_options, selected_user_id, ) content_val = "" if request.method == "POST": selected_post_id = request.form.get("post_id", type=int) selected_user_id = request.form.get("user_id", type=int) content_val = (request.form.get("content") or "").strip() post_options, user_options = _admin_fill_post_and_user_options( post_options, selected_post_id, user_options, selected_user_id, ) target_post = db.session.get(ForumPost, selected_post_id or 0) target_user = db.session.get(User, selected_user_id or 0) if not target_post: error = "请选择有效帖子" elif not target_user: error = "请选择有效用户" elif len(content_val) < 2: error = "评论至少 2 个字符" else: db.session.add(ForumComment( post_id=target_post.id, user_id=target_user.id, content=content_val, )) db.session.commit() return redirect(url_for("admin_forum_comments", post_id=target_post.id, msg="已新增评论")) if not post_options: error = error or "暂无可评论的帖子,请先新增帖子" elif not user_options: error = error or "当前没有可用用户,请先在用户管理中新增用户" return render_template( "admin/forum_comment_form.html", page_title="后台新增评论", submit_text="创建评论", action_url=url_for("admin_forum_comment_new"), cancel_url=url_for("admin_forum_comments"), error=error, post_options=post_options, user_options=user_options, selected_post_id=selected_post_id, selected_user_id=selected_user_id, content_val=content_val, comment_id=None, ) @app.route("/admin/forum/comment//edit", methods=["GET", "POST"]) @admin_required def admin_forum_comment_edit(comment_id): comment = ForumComment.query.get_or_404(comment_id) error = "" post_options = _admin_load_post_options(limit=400) user_options = _admin_load_user_options(limit=400) selected_post_id = comment.post_id selected_user_id = comment.user_id post_options, user_options = _admin_fill_post_and_user_options( post_options, selected_post_id, user_options, selected_user_id, ) content_val = comment.content or "" if request.method == "POST": selected_post_id = request.form.get("post_id", type=int) selected_user_id = request.form.get("user_id", type=int) content_val = (request.form.get("content") or "").strip() post_options, user_options = _admin_fill_post_and_user_options( post_options, selected_post_id, user_options, selected_user_id, ) target_post = db.session.get(ForumPost, selected_post_id or 0) target_user = db.session.get(User, selected_user_id or 0) if not target_post: error = "请选择有效帖子" elif not target_user: error = "请选择有效用户" elif len(content_val) < 2: error = "评论至少 2 个字符" else: comment.post_id = target_post.id comment.user_id = target_user.id comment.content = content_val db.session.commit() return redirect(url_for("admin_forum_comments", post_id=target_post.id, msg="已更新评论 #{}".format(comment.id))) if not post_options: error = error or "暂无可评论的帖子,请先新增帖子" elif not user_options: error = error or "当前没有可用用户,请先在用户管理中新增用户" return render_template( "admin/forum_comment_form.html", page_title="后台编辑评论", submit_text="保存修改", action_url=url_for("admin_forum_comment_edit", comment_id=comment.id), cancel_url=url_for("admin_forum_comments", post_id=selected_post_id), error=error, post_options=post_options, user_options=user_options, selected_post_id=selected_post_id, selected_user_id=selected_user_id, content_val=content_val, comment_id=comment.id, ) @app.route("/admin/forum/comment//delete", methods=["POST"]) @admin_required def admin_forum_comment_delete(comment_id): comment = ForumComment.query.get_or_404(comment_id) post_id = comment.post_id db.session.delete(comment) db.session.commit() return redirect(url_for("admin_forum_comments", post_id=post_id, msg="已删除评论 #{}".format(comment_id))) @app.route("/admin/plan/new", methods=["GET", "POST"]) @admin_required def admin_plan_new(): provider_id = request.args.get("provider_id", type=int) if request.method == "POST": return _save_plan(None) providers = Provider.query.order_by(Provider.name).all() return render_template( "admin/plan_form.html", plan=None, country_tags=COUNTRY_TAGS, providers=providers, preselected_provider_id=provider_id, ) @app.route("/admin/plan//edit", methods=["GET", "POST"]) @admin_required def admin_plan_edit(plan_id): plan = VPSPlan.query.get_or_404(plan_id) if request.method == "POST": return _save_plan(plan) return redirect(url_for("admin_dashboard")) def _parse_optional_int(s): s = (s or "").strip() if not s: return None try: return int(s) except ValueError: return None def _parse_optional_float(s): s = (s or "").strip() if not s: return None try: return float(s) except ValueError: return None def _save_plan(plan): provider_id = request.form.get("provider_id", type=int) countries = request.form.get("countries", "").strip() or None vcpu = _parse_optional_int(request.form.get("vcpu")) memory_gb = _parse_optional_int(request.form.get("memory_gb")) storage_gb = _parse_optional_int(request.form.get("storage_gb")) bandwidth_mbps = _parse_optional_int(request.form.get("bandwidth_mbps")) traffic = request.form.get("traffic", "").strip() or None price_cny = _parse_optional_float(request.form.get("price_cny")) price_usd = _parse_optional_float(request.form.get("price_usd")) currency = request.form.get("currency", "CNY").strip() or "CNY" official_url = request.form.get("official_url", "").strip() or None provider = None if provider_id: provider = db.session.get(Provider, provider_id) if not provider: providers = Provider.query.order_by(Provider.name).all() return render_template( "admin/plan_form.html", plan=plan, country_tags=COUNTRY_TAGS, providers=providers, preselected_provider_id=provider_id, error="请选择厂商", ) if plan is None: plan = VPSPlan( provider_id=provider.id, provider=provider.name, region=None, name=None, vcpu=vcpu, memory_gb=memory_gb, storage_gb=storage_gb, bandwidth_mbps=bandwidth_mbps, traffic=traffic, price_cny=price_cny, price_usd=price_usd, currency=currency, official_url=official_url, countries=countries, ) db.session.add(plan) else: plan.provider_id = provider.id plan.provider = provider.name plan.region = None plan.name = None plan.vcpu = vcpu plan.memory_gb = memory_gb plan.storage_gb = storage_gb plan.bandwidth_mbps = bandwidth_mbps plan.traffic = traffic plan.price_cny = price_cny plan.price_usd = price_usd plan.currency = currency plan.official_url = official_url plan.countries = countries db.session.flush() _record_price_history(plan, source="manual") db.session.commit() # 若从厂商详情页进入添加,保存后返回该厂商详情 from_provider_id = request.form.get("from_provider_id", type=int) if from_provider_id: return redirect(url_for("admin_provider_detail", provider_id=from_provider_id)) return redirect(url_for("admin_dashboard")) @app.route("/admin/plan//delete", methods=["POST"]) @admin_required def admin_plan_delete(plan_id): plan = VPSPlan.query.get_or_404(plan_id) PriceHistory.query.filter_by(plan_id=plan_id).delete() db.session.delete(plan) db.session.commit() return redirect(url_for("admin_dashboard")) # ---------- Excel 导出 / 导入 ---------- EXCEL_HEADERS = [ "厂商", "厂商官网", "国家", "vCPU", "内存GB", "存储GB", "带宽Mbps", "流量", "月付人民币", "月付美元", "货币", "配置官网", ] @app.route("/admin/export/excel") @admin_required def admin_export_excel(): wb = Workbook() ws = wb.active ws.title = "配置" ws.append(EXCEL_HEADERS) plans = VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny).all() for p in plans: provider_url = (p.provider_rel.official_url if p.provider_rel else "") or "" ws.append([ p.provider_name, provider_url or "", p.countries or "", p.vcpu if p.vcpu is not None else "", p.memory_gb if p.memory_gb is not None else "", p.storage_gb if p.storage_gb is not None else "", p.bandwidth_mbps if p.bandwidth_mbps is not None else "", p.traffic or "", p.price_cny if p.price_cny is not None else "", p.price_usd if p.price_usd is not None else "", p.currency or "CNY", p.official_url or "", ]) buf = io.BytesIO() wb.save(buf) buf.seek(0) return send_file( buf, mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", as_attachment=True, download_name="vps_配置_导出.xlsx", ) def _num(v): if v is None or v == "": return None try: return int(float(v)) except (ValueError, TypeError): return None def _float(v): if v is None or v == "": return None try: return float(v) except (ValueError, TypeError): return None def _opt_text(v): if v is None: return None s = str(v).strip() return s or None def _safe_str(v): if v is None: return "" return str(v).strip() def _eq_optional(a, b): if a is None and b is None: return True if a is None or b is None: return False if isinstance(a, float) or isinstance(b, float): return abs(float(a) - float(b)) < 1e-9 return a == b def _record_price_history(plan, source): if plan is None: return if plan.price_cny is None and plan.price_usd is None: return if plan.id is None: db.session.flush() latest = ( PriceHistory.query .filter_by(plan_id=plan.id) .order_by(PriceHistory.captured_at.desc(), PriceHistory.id.desc()) .first() ) currency = _opt_text(plan.currency) or "CNY" if latest: same_currency = _safe_str(latest.currency).upper() == _safe_str(currency).upper() if same_currency and _eq_optional(latest.price_cny, plan.price_cny) and _eq_optional(latest.price_usd, plan.price_usd): return db.session.add(PriceHistory( plan_id=plan.id, price_cny=plan.price_cny, price_usd=plan.price_usd, currency=currency, source=source, )) def _display_val(v): if v is None or v == "": return "—" if isinstance(v, float): s = "{:.2f}".format(v).rstrip("0").rstrip(".") return s if s else "0" return str(v) def _row_identity_key(row): return ( _safe_str(row.get("厂商")), _num(row.get("vCPU")), _num(row.get("内存GB")), _num(row.get("存储GB")), _num(row.get("带宽Mbps")), _safe_str(row.get("国家")), _safe_str(row.get("流量")), ) def _plan_identity_key(plan): return ( _safe_str(plan.provider_name), plan.vcpu, plan.memory_gb, plan.storage_gb, plan.bandwidth_mbps, _safe_str(plan.countries), _safe_str(plan.traffic), ) def _plan_diff(plan, row): """返回导入行相对于现有 plan 的差异列表。""" fields = [ ("国家", "countries", _opt_text(row.get("国家"))), ("vCPU", "vcpu", _num(row.get("vCPU"))), ("内存GB", "memory_gb", _num(row.get("内存GB"))), ("存储GB", "storage_gb", _num(row.get("存储GB"))), ("带宽Mbps", "bandwidth_mbps", _num(row.get("带宽Mbps"))), ("流量", "traffic", _opt_text(row.get("流量"))), ("月付人民币", "price_cny", _float(row.get("月付人民币"))), ("月付美元", "price_usd", _float(row.get("月付美元"))), ("货币", "currency", _opt_text(row.get("货币")) or "CNY"), ("配置官网", "official_url", _opt_text(row.get("配置官网"))), ] diffs = [] for label, attr, new_value in fields: old_value = getattr(plan, attr) if not _eq_optional(old_value, new_value): diffs.append({ "label": label, "old": old_value, "new": new_value, "old_display": _display_val(old_value), "new_display": _display_val(new_value), }) return diffs def _upsert_provider_from_row(row): provider_name = _safe_str(row.get("厂商")) if not provider_name: return None imported_provider_url = _opt_text(row.get("厂商官网")) provider = Provider.query.filter_by(name=provider_name).first() if not provider: provider = Provider(name=provider_name, official_url=imported_provider_url) db.session.add(provider) db.session.flush() elif imported_provider_url and provider.official_url != imported_provider_url: provider.official_url = imported_provider_url return provider def _fill_plan_from_row(plan, row, provider): plan.provider_id = provider.id plan.provider = provider.name plan.region = None plan.name = None plan.vcpu = _num(row.get("vCPU")) plan.memory_gb = _num(row.get("内存GB")) plan.storage_gb = _num(row.get("存储GB")) plan.bandwidth_mbps = _num(row.get("带宽Mbps")) plan.traffic = _opt_text(row.get("流量")) plan.price_cny = _float(row.get("月付人民币")) plan.price_usd = _float(row.get("月付美元")) plan.currency = _opt_text(row.get("货币")) or "CNY" plan.official_url = _opt_text(row.get("配置官网")) plan.countries = _opt_text(row.get("国家")) @app.route("/admin/import", methods=["GET", "POST"]) @admin_required def admin_import(): if request.method == "GET": return render_template("admin/import.html") f = request.files.get("file") if not f or not f.filename: return render_template("admin/import.html", error="请选择 Excel 文件") if not f.filename.lower().endswith(".xlsx"): return render_template("admin/import.html", error="请上传 .xlsx 文件") try: wb = load_workbook(io.BytesIO(f.read()), read_only=True, data_only=True) ws = wb.active rows = list(ws.iter_rows(min_row=2, values_only=True)) except Exception as e: return render_template("admin/import.html", error="解析失败: {}".format(str(e))) headers = EXCEL_HEADERS parsed = [] for row in rows: if not any(cell is not None and str(cell).strip() for cell in row): continue d = {} for i, h in enumerate(headers): if i < len(row): v = row[i] if v is not None and hasattr(v, "strip"): v = v.strip() d[h] = v else: d[h] = None parsed.append(d) if not parsed: return render_template("admin/import.html", error="文件中没有有效数据行") plans = VPSPlan.query.all() plan_index = {} for p in plans: key = _plan_identity_key(p) if key not in plan_index: plan_index[key] = p seen_row_keys = set() preview_items = [] for row in parsed: key = _row_identity_key(row) provider_name = key[0] if not provider_name: continue if key in seen_row_keys: continue seen_row_keys.add(key) matched = plan_index.get(key) if not matched: preview_items.append({ "action": "add", "row": row, "changes": [], "provider_url_changed": False, }) continue changes = _plan_diff(matched, row) imported_provider_url = _opt_text(row.get("厂商官网")) old_provider_url = _opt_text(matched.provider_rel.official_url if matched.provider_rel else None) provider_url_changed = bool(imported_provider_url and imported_provider_url != old_provider_url) if changes or provider_url_changed: preview_items.append({ "action": "update", "plan_id": matched.id, "row": row, "changes": changes, "provider_url_changed": provider_url_changed, "provider_url_old": old_provider_url, "provider_url_new": imported_provider_url, }) session["import_preview"] = preview_items return redirect(url_for("admin_import_preview")) @app.route("/admin/import/preview", methods=["GET", "POST"]) @admin_required def admin_import_preview(): preview_items = session.get("import_preview") or [] add_count = sum(1 for x in preview_items if x.get("action") == "add") update_count = sum(1 for x in preview_items if x.get("action") == "update") if request.method == "GET": return render_template( "admin/import_preview.html", rows=list(enumerate(preview_items)), add_count=add_count, update_count=update_count, ) selected = request.form.getlist("row_index") if not selected: return render_template( "admin/import_preview.html", rows=list(enumerate(preview_items)), add_count=add_count, update_count=update_count, error="请至少勾选一行", ) indices = sorted(set(int(x) for x in selected if x.isdigit())) add_applied = 0 update_applied = 0 for i in indices: if i < 0 or i >= len(preview_items): continue item = preview_items[i] row = item.get("row") or {} provider = _upsert_provider_from_row(row) if not provider: continue action = item.get("action") if action == "update": plan = db.session.get(VPSPlan, item.get("plan_id")) if not plan: plan = VPSPlan() db.session.add(plan) add_applied += 1 else: update_applied += 1 _fill_plan_from_row(plan, row, provider) else: plan = VPSPlan() _fill_plan_from_row(plan, row, provider) db.session.add(plan) add_applied += 1 _record_price_history(plan, source="import") db.session.commit() session.pop("import_preview", None) msg = "已新增 {} 条,更新 {} 条配置".format(add_applied, update_applied) return redirect(url_for("admin_dashboard") + "?" + urlencode({"msg": msg})) if __name__ == "__main__": app.run(debug=True, port=5001)