# -*- coding: utf-8 -*- """云服务器价格对比 - Flask 应用""" import io from urllib.parse import urlencode from flask import Flask, render_template, jsonify, request, redirect, url_for, session, send_file from werkzeug.middleware.proxy_fix import ProxyFix from sqlalchemy import text from config import Config from extensions import db from openpyxl import Workbook from openpyxl import load_workbook app = Flask(__name__) app.config.from_object(Config) # 部署在 Nginx 等反向代理后时,信任 X-Forwarded-* 头(HTTPS、真实 IP) app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1) db.init_app(app) from models import VPSPlan, Provider # noqa: E402 def _ensure_mysql_columns(): """为已有 MySQL 表添加缺失列,避免 1054 Unknown column。""" try: engine = db.engine if engine.dialect.name != "mysql": return with engine.connect() as conn: for col, spec in [ ("traffic", "VARCHAR(64) NULL"), ("countries", "VARCHAR(255) NULL"), ("provider_id", "INT NULL"), ]: try: conn.execute(text("ALTER TABLE vps_plans ADD COLUMN {} {}".format(col, spec))) conn.commit() except Exception: conn.rollback() for col, spec in [ ("name", "VARCHAR(128) NULL"), ("region", "VARCHAR(128) NULL"), ("price_cny", "DOUBLE NULL"), ("price_usd", "DOUBLE NULL"), ]: try: conn.execute(text("ALTER TABLE vps_plans MODIFY COLUMN {} {}".format(col, spec))) conn.commit() except Exception: conn.rollback() except Exception: pass # 表不存在或非 MySQL 时忽略 # 启动时自动创建表(若不存在),并为已有表补列 with app.app_context(): db.create_all() _ensure_mysql_columns() ADMIN_PASSWORD = app.config["ADMIN_PASSWORD"] SITE_URL = app.config["SITE_URL"] SITE_NAME = app.config["SITE_NAME"] # 国家/区域标签,供后台表单选择 COUNTRY_TAGS = [ "中国大陆", "中国香港", "中国台湾", "美国", "日本", "新加坡", "韩国", "德国", "英国", "法国", "荷兰", "加拿大", "澳大利亚", "印度", "其他", ] def admin_required(f): from functools import wraps @wraps(f) def wrapped(*args, **kwargs): if not session.get("admin_logged_in"): return redirect(url_for("admin_login")) return f(*args, **kwargs) return wrapped @app.route("/") def index(): plans = VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny).all() return render_template( "index.html", site_url=SITE_URL, site_name=SITE_NAME, plans_json_ld=[p.to_dict() for p in plans], ) @app.route("/api/plans") def api_plans(): plans = VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny).all() return jsonify([p.to_dict() for p in plans]) # ---------- SEO ---------- @app.route("/sitemap.xml") def sitemap(): from flask import make_response url = SITE_URL.rstrip("/") xml = f''' {url}/ weekly 1.0 ''' resp = make_response(xml) resp.mimetype = "application/xml" return resp @app.route("/robots.txt") def robots(): from flask import make_response url = SITE_URL.rstrip("/") txt = f"""User-agent: * Allow: / Sitemap: {url}/sitemap.xml """ resp = make_response(txt) resp.mimetype = "text/plain" return resp # ---------- 后台 ---------- @app.route("/admin/login", methods=["GET", "POST"]) def admin_login(): if request.method == "POST": password = request.form.get("password", "") if password == ADMIN_PASSWORD: session["admin_logged_in"] = True return redirect(url_for("admin_dashboard")) return render_template("admin/login.html", error="密码错误") return render_template("admin/login.html") @app.route("/admin/logout") def admin_logout(): session.pop("admin_logged_in", None) return redirect(url_for("index")) @app.route("/admin/api/plan/") @admin_required def admin_api_plan(plan_id): plan = VPSPlan.query.get_or_404(plan_id) return jsonify({ "id": plan.id, "provider_id": plan.provider_id, "countries": plan.countries or "", "vcpu": plan.vcpu, "memory_gb": plan.memory_gb, "storage_gb": plan.storage_gb, "bandwidth_mbps": plan.bandwidth_mbps, "traffic": plan.traffic or "", "price_cny": float(plan.price_cny) if plan.price_cny is not None else None, "price_usd": float(plan.price_usd) if plan.price_usd is not None else None, "currency": plan.currency or "CNY", "official_url": plan.official_url or "", }) @app.route("/admin") @admin_required def admin_dashboard(): providers = Provider.query.order_by(Provider.name).all() plans = VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny).all() return render_template( "admin/dashboard.html", providers=providers, plans=plans, country_tags=COUNTRY_TAGS, ) # ---------- 厂商管理 ---------- @app.route("/admin/providers") @admin_required def admin_providers(): providers = Provider.query.order_by(Provider.name).all() return render_template("admin/providers.html", providers=providers) @app.route("/admin/provider/new", methods=["GET", "POST"]) @admin_required def admin_provider_new(): if request.method == "POST": name = request.form.get("name", "").strip() official_url = request.form.get("official_url", "").strip() or None if not name: return render_template("admin/provider_form.html", provider=None, error="请填写厂商名称") if Provider.query.filter_by(name=name).first(): return render_template("admin/provider_form.html", provider=None, error="该厂商名称已存在") p = Provider(name=name, official_url=official_url) db.session.add(p) db.session.commit() return redirect(url_for("admin_provider_detail", provider_id=p.id)) return render_template("admin/provider_form.html", provider=None) @app.route("/admin/provider/") @admin_required def admin_provider_detail(provider_id): provider = Provider.query.get_or_404(provider_id) plans = VPSPlan.query.filter( (VPSPlan.provider_id == provider_id) | (VPSPlan.provider == provider.name) ).order_by(VPSPlan.price_cny.asc(), VPSPlan.name).all() providers = Provider.query.order_by(Provider.name).all() return render_template( "admin/provider_detail.html", provider=provider, plans=plans, providers=providers, country_tags=COUNTRY_TAGS, ) @app.route("/admin/provider//edit", methods=["GET", "POST"]) @admin_required def admin_provider_edit(provider_id): provider = Provider.query.get_or_404(provider_id) if request.method == "POST": provider.name = request.form.get("name", "").strip() provider.official_url = request.form.get("official_url", "").strip() or None if not provider.name: return render_template("admin/provider_form.html", provider=provider, error="请填写厂商名称") db.session.commit() return redirect(url_for("admin_provider_detail", provider_id=provider.id)) return render_template("admin/provider_form.html", provider=provider) @app.route("/admin/provider//delete", methods=["POST"]) @admin_required def admin_provider_delete(provider_id): provider = Provider.query.get_or_404(provider_id) # 将该厂商下的配置改为无厂商关联(保留配置,仅清空 provider_id) VPSPlan.query.filter_by(provider_id=provider_id).update({"provider_id": None}) db.session.delete(provider) db.session.commit() return redirect(url_for("admin_providers")) @app.route("/admin/plan/new", methods=["GET", "POST"]) @admin_required def admin_plan_new(): provider_id = request.args.get("provider_id", type=int) if request.method == "POST": return _save_plan(None) providers = Provider.query.order_by(Provider.name).all() return render_template( "admin/plan_form.html", plan=None, country_tags=COUNTRY_TAGS, providers=providers, preselected_provider_id=provider_id, ) @app.route("/admin/plan//edit", methods=["GET", "POST"]) @admin_required def admin_plan_edit(plan_id): plan = VPSPlan.query.get_or_404(plan_id) if request.method == "POST": return _save_plan(plan) return redirect(url_for("admin_dashboard")) def _parse_optional_int(s): s = (s or "").strip() if not s: return None try: return int(s) except ValueError: return None def _parse_optional_float(s): s = (s or "").strip() if not s: return None try: return float(s) except ValueError: return None def _save_plan(plan): provider_id = request.form.get("provider_id", type=int) countries = request.form.get("countries", "").strip() or None vcpu = _parse_optional_int(request.form.get("vcpu")) memory_gb = _parse_optional_int(request.form.get("memory_gb")) storage_gb = _parse_optional_int(request.form.get("storage_gb")) bandwidth_mbps = _parse_optional_int(request.form.get("bandwidth_mbps")) traffic = request.form.get("traffic", "").strip() or None price_cny = _parse_optional_float(request.form.get("price_cny")) price_usd = _parse_optional_float(request.form.get("price_usd")) currency = request.form.get("currency", "CNY").strip() or "CNY" official_url = request.form.get("official_url", "").strip() or None provider = None if provider_id: provider = Provider.query.get(provider_id) if not provider: providers = Provider.query.order_by(Provider.name).all() return render_template( "admin/plan_form.html", plan=plan, country_tags=COUNTRY_TAGS, providers=providers, preselected_provider_id=provider_id, error="请选择厂商", ) if plan is None: plan = VPSPlan( provider_id=provider.id, provider=provider.name, region=None, name=None, vcpu=vcpu, memory_gb=memory_gb, storage_gb=storage_gb, bandwidth_mbps=bandwidth_mbps, traffic=traffic, price_cny=price_cny, price_usd=price_usd, currency=currency, official_url=official_url, countries=countries, ) db.session.add(plan) else: plan.provider_id = provider.id plan.provider = provider.name plan.region = None plan.name = None plan.vcpu = vcpu plan.memory_gb = memory_gb plan.storage_gb = storage_gb plan.bandwidth_mbps = bandwidth_mbps plan.traffic = traffic plan.price_cny = price_cny plan.price_usd = price_usd plan.currency = currency plan.official_url = official_url plan.countries = countries db.session.commit() # 若从厂商详情页进入添加,保存后返回该厂商详情 from_provider_id = request.form.get("from_provider_id", type=int) if from_provider_id: return redirect(url_for("admin_provider_detail", provider_id=from_provider_id)) return redirect(url_for("admin_dashboard")) @app.route("/admin/plan//delete", methods=["POST"]) @admin_required def admin_plan_delete(plan_id): plan = VPSPlan.query.get_or_404(plan_id) db.session.delete(plan) db.session.commit() return redirect(url_for("admin_dashboard")) # ---------- Excel 导出 / 导入 ---------- EXCEL_HEADERS = [ "厂商", "厂商官网", "国家", "vCPU", "内存GB", "存储GB", "带宽Mbps", "流量", "月付人民币", "月付美元", "货币", "配置官网", ] @app.route("/admin/export/excel") @admin_required def admin_export_excel(): wb = Workbook() ws = wb.active ws.title = "配置" ws.append(EXCEL_HEADERS) plans = VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny).all() for p in plans: provider_url = (p.provider_rel.official_url if p.provider_rel else "") or "" ws.append([ p.provider_name, provider_url or "", p.countries or "", p.vcpu if p.vcpu is not None else "", p.memory_gb if p.memory_gb is not None else "", p.storage_gb if p.storage_gb is not None else "", p.bandwidth_mbps if p.bandwidth_mbps is not None else "", p.traffic or "", p.price_cny if p.price_cny is not None else "", p.price_usd if p.price_usd is not None else "", p.currency or "CNY", p.official_url or "", ]) buf = io.BytesIO() wb.save(buf) buf.seek(0) return send_file( buf, mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", as_attachment=True, download_name="vps_配置_导出.xlsx", ) def _plan_match(plan, row): """判断 DB 中的 plan 与导入行是否同一配置(厂商+配置+价格)。""" def eq(a, b): if a is None and b in (None, ""): return True if a is None or b in (None, ""): return a == b return a == b return ( plan.provider_name == (row.get("厂商") or "").strip() and eq(plan.vcpu, _num(row.get("vCPU"))) and eq(plan.memory_gb, _num(row.get("内存GB"))) and eq(plan.storage_gb, _num(row.get("存储GB"))) and eq(plan.bandwidth_mbps, _num(row.get("带宽Mbps"))) and (plan.countries or "").strip() == (row.get("国家") or "").strip() and eq(plan.price_cny, _float(row.get("月付人民币"))) and eq(plan.price_usd, _float(row.get("月付美元"))) ) def _num(v): if v is None or v == "": return None try: return int(float(v)) except (ValueError, TypeError): return None def _float(v): if v is None or v == "": return None try: return float(v) except (ValueError, TypeError): return None @app.route("/admin/import", methods=["GET", "POST"]) @admin_required def admin_import(): if request.method == "GET": return render_template("admin/import.html") f = request.files.get("file") if not f or not f.filename: return render_template("admin/import.html", error="请选择 Excel 文件") if not f.filename.lower().endswith(".xlsx"): return render_template("admin/import.html", error="请上传 .xlsx 文件") try: wb = load_workbook(io.BytesIO(f.read()), read_only=True, data_only=True) ws = wb.active rows = list(ws.iter_rows(min_row=2, values_only=True)) except Exception as e: return render_template("admin/import.html", error="解析失败: {}".format(str(e))) headers = EXCEL_HEADERS parsed = [] for row in rows: if not any(cell is not None and str(cell).strip() for cell in row): continue d = {} for i, h in enumerate(headers): if i < len(row): v = row[i] if v is not None and hasattr(v, "strip"): v = v.strip() d[h] = v else: d[h] = None parsed.append(d) if not parsed: return render_template("admin/import.html", error="文件中没有有效数据行") plans = VPSPlan.query.all() to_add = [] for row in parsed: provider_name = (row.get("厂商") or "").strip() if not provider_name: continue found = False for p in plans: if _plan_match(p, row): found = True break if not found: to_add.append(row) session["import_preview"] = to_add return redirect(url_for("admin_import_preview")) @app.route("/admin/import/preview", methods=["GET", "POST"]) @admin_required def admin_import_preview(): to_add = session.get("import_preview") or [] if request.method == "GET": return render_template("admin/import_preview.html", rows=list(enumerate(to_add))) selected = request.form.getlist("row_index") if not selected: to_add = session.get("import_preview") or [] return render_template("admin/import_preview.html", rows=list(enumerate(to_add)), error="请至少勾选一行") to_add = session.get("import_preview") or [] indices = set(int(x) for x in selected if x.isdigit()) for i in indices: if i < 0 or i >= len(to_add): continue row = to_add[i] provider_name = (row.get("厂商") or "").strip() if not provider_name: continue provider = Provider.query.filter_by(name=provider_name).first() if not provider: provider = Provider(name=provider_name, official_url=(row.get("厂商官网") or "").strip() or None) db.session.add(provider) db.session.flush() plan = VPSPlan( provider_id=provider.id, provider=provider.name, region=None, name=None, vcpu=_num(row.get("vCPU")), memory_gb=_num(row.get("内存GB")), storage_gb=_num(row.get("存储GB")), bandwidth_mbps=_num(row.get("带宽Mbps")), traffic=(row.get("流量") or "").strip() or None, price_cny=_float(row.get("月付人民币")), price_usd=_float(row.get("月付美元")), currency=(row.get("货币") or "CNY").strip() or "CNY", official_url=(row.get("配置官网") or "").strip() or None, countries=(row.get("国家") or "").strip() or None, ) db.session.add(plan) db.session.commit() session.pop("import_preview", None) return redirect(url_for("admin_dashboard") + "?" + urlencode({"msg": "已导入 {} 条配置".format(len(indices))})) if __name__ == "__main__": app.run(debug=True, port=5001)