Files
vps_web/app.py
ddrwode b67f94a813 哈哈
2026-02-09 14:28:56 +08:00

549 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""云服务器价格对比 - Flask 应用"""
import io
from urllib.parse import urlencode
from flask import Flask, render_template, jsonify, request, redirect, url_for, session, send_file
from werkzeug.middleware.proxy_fix import ProxyFix
from sqlalchemy import text
from config import Config
from extensions import db
from openpyxl import Workbook
from openpyxl import load_workbook
app = Flask(__name__)
app.config.from_object(Config)
# When deployed behind a reverse proxy such as Nginx, trust one hop of the
# X-Forwarded-For / X-Forwarded-Proto / X-Forwarded-Host headers so the app
# sees the real client IP, the HTTPS scheme and the public host.
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)
db.init_app(app)
from models import VPSPlan, Provider # noqa: E402
def _ensure_mysql_columns():
    """Best-effort schema patch for an already existing MySQL ``vps_plans`` table.

    Adds the columns that newer code expects (avoiding MySQL error 1054,
    "Unknown column") and relaxes a few columns to be nullable. Does nothing
    when the database dialect is not MySQL. Each statement is attempted
    independently; a failing statement (for example because the column is
    already present) is rolled back and skipped. Failures are logged instead
    of being silently discarded, but never propagate to the caller.
    """
    # (statement template, ((column, column spec), ...)) in execution order.
    patches = (
        (
            "ALTER TABLE vps_plans ADD COLUMN {} {}",
            (
                ("traffic", "VARCHAR(64) NULL"),
                ("countries", "VARCHAR(255) NULL"),
                ("provider_id", "INT NULL"),
            ),
        ),
        (
            "ALTER TABLE vps_plans MODIFY COLUMN {} {}",
            (
                ("name", "VARCHAR(128) NULL"),
                ("region", "VARCHAR(128) NULL"),
                ("price_cny", "DOUBLE NULL"),
                ("price_usd", "DOUBLE NULL"),
            ),
        ),
    )
    try:
        engine = db.engine
        if engine.dialect.name != "mysql":
            return
        with engine.connect() as conn:
            for template, columns in patches:
                for col, spec in columns:
                    # Only hard-coded identifiers are interpolated here,
                    # never user input.
                    statement = template.format(col, spec)
                    try:
                        conn.execute(text(statement))
                        conn.commit()
                    except Exception as exc:
                        # Expected when the column already exists; keep going.
                        conn.rollback()
                        app.logger.debug("Schema patch skipped (%s): %s", statement, exc)
    except Exception as exc:
        # The table may not exist yet or the database may be unreachable;
        # startup must not fail because of this optional patch.
        app.logger.warning("Could not verify vps_plans columns: %s", exc)
# At import time, create any missing tables and then patch columns on
# tables that already existed.
with app.app_context():
    db.create_all()
    _ensure_mysql_columns()
# Configuration values read once at import time and used by the views below.
ADMIN_PASSWORD = app.config["ADMIN_PASSWORD"]
SITE_URL = app.config["SITE_URL"]
SITE_NAME = app.config["SITE_NAME"]
# Country/region tags offered for selection in the admin forms.
COUNTRY_TAGS = [
"中国大陆", "中国香港", "中国台湾", "美国", "日本", "新加坡", "韩国",
"德国", "英国", "法国", "荷兰", "加拿大", "澳大利亚", "印度", "其他",
]
def admin_required(f):
    """Decorator that only lets logged-in admins reach the wrapped view.

    If the session lacks a truthy ``admin_logged_in`` flag the request is
    redirected to the ``admin_login`` endpoint; otherwise the view is called
    with its original arguments.
    """
    import functools

    @functools.wraps(f)
    def guarded(*args, **kwargs):
        if session.get("admin_logged_in"):
            return f(*args, **kwargs)
        return redirect(url_for("admin_login"))

    return guarded
@app.route("/")
def index():
    """Render the public landing page.

    All plans, ordered by provider and then CNY price, are serialised with
    ``to_dict()`` and handed to the template as ``plans_json_ld``.
    """
    ordered_plans = VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny).all()
    serialised = [plan.to_dict() for plan in ordered_plans]
    context = {
        "site_url": SITE_URL,
        "site_name": SITE_NAME,
        "plans_json_ld": serialised,
    }
    return render_template("index.html", **context)
@app.route("/api/plans")
def api_plans():
    """Return every plan as a JSON array, ordered by provider then CNY price."""
    query = VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny)
    payload = [plan.to_dict() for plan in query.all()]
    return jsonify(payload)
# ---------- SEO ----------
@app.route("/sitemap.xml")
def sitemap():
    """Serve a one-entry XML sitemap that points at the site root."""
    from flask import make_response

    # Drop trailing slashes so the <loc> below never contains "//".
    url = SITE_URL.rstrip("/")
    document = f'''<?xml version="1.0" encoding="UTF-8"?>
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
<url>
<loc>{url}/</loc>
<changefreq>weekly</changefreq>
<priority>1.0</priority>
</url>
</urlset>'''
    response = make_response(document)
    response.mimetype = "application/xml"
    return response
@app.route("/robots.txt")
def robots():
    """Serve robots.txt: allow all crawlers and advertise the sitemap URL."""
    from flask import make_response

    # Drop trailing slashes so the sitemap URL never contains "//".
    url = SITE_URL.rstrip("/")
    body = f"""User-agent: *
Allow: /
Sitemap: {url}/sitemap.xml
"""
    response = make_response(body)
    response.mimetype = "text/plain"
    return response
# ---------- 后台 ----------
@app.route("/admin/login", methods=["GET", "POST"])
def admin_login():
    """Show the admin login form and handle password submission.

    GET renders the form. POST compares the submitted ``password`` field with
    ``ADMIN_PASSWORD``; on success the session is flagged as logged in and the
    user is redirected to the dashboard, otherwise the form is re-rendered
    with an error message.
    """
    import hmac

    if request.method != "POST":
        return render_template("admin/login.html")

    submitted = request.form.get("password", "")
    # Constant-time comparison so response timing does not leak how much of
    # the password matched. Both sides are encoded because compare_digest
    # rejects non-ASCII str arguments. A non-string configured password can
    # never equal a form string, so it is treated as "no match".
    password_ok = isinstance(ADMIN_PASSWORD, str) and hmac.compare_digest(
        submitted.encode("utf-8"), ADMIN_PASSWORD.encode("utf-8")
    )
    if password_ok:
        session["admin_logged_in"] = True
        return redirect(url_for("admin_dashboard"))
    return render_template("admin/login.html", error="密码错误")
@app.route("/admin/logout")
def admin_logout():
    """Clear the admin login flag from the session and go back to the home page."""
    session.pop("admin_logged_in", None)
    home = url_for("index")
    return redirect(home)
@app.route("/admin/api/plan/<int:plan_id>")
@admin_required
def admin_api_plan(plan_id):
    """Return one plan's editable fields as JSON (404 if the id is unknown).

    Missing text fields are returned as empty strings, a missing currency as
    ``"CNY"``, and missing prices as ``null``.
    """
    plan = VPSPlan.query.get_or_404(plan_id)

    def as_float(value):
        # Prices are converted to plain floats; None is passed through.
        return None if value is None else float(value)

    payload = {
        "id": plan.id,
        "provider_id": plan.provider_id,
        "countries": plan.countries or "",
        "vcpu": plan.vcpu,
        "memory_gb": plan.memory_gb,
        "storage_gb": plan.storage_gb,
        "bandwidth_mbps": plan.bandwidth_mbps,
        "traffic": plan.traffic or "",
        "price_cny": as_float(plan.price_cny),
        "price_usd": as_float(plan.price_usd),
        "currency": plan.currency or "CNY",
        "official_url": plan.official_url or "",
    }
    return jsonify(payload)
@app.route("/admin")
@admin_required
def admin_dashboard():
    """Render the admin dashboard with all providers, all plans and the country tags."""
    context = {
        "providers": Provider.query.order_by(Provider.name).all(),
        "plans": VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny).all(),
        "country_tags": COUNTRY_TAGS,
    }
    return render_template("admin/dashboard.html", **context)
# ---------- 厂商管理 ----------
@app.route("/admin/providers")
@admin_required
def admin_providers():
    """List all providers, sorted by name."""
    by_name = Provider.query.order_by(Provider.name)
    return render_template("admin/providers.html", providers=by_name.all())
@app.route("/admin/provider/new", methods=["GET", "POST"])
@admin_required
def admin_provider_new():
    """Create a provider.

    GET shows an empty form. POST requires a non-empty, not yet used name;
    on success the provider is stored and the user is redirected to its
    detail page, otherwise the form is shown again with an error.
    """
    template = "admin/provider_form.html"
    if request.method != "POST":
        return render_template(template, provider=None)

    name = request.form.get("name", "").strip()
    official_url = request.form.get("official_url", "").strip() or None

    if not name:
        return render_template(template, provider=None, error="请填写厂商名称")
    already_exists = Provider.query.filter_by(name=name).first()
    if already_exists:
        return render_template(template, provider=None, error="该厂商名称已存在")

    new_provider = Provider(name=name, official_url=official_url)
    db.session.add(new_provider)
    db.session.commit()
    return redirect(url_for("admin_provider_detail", provider_id=new_provider.id))
@app.route("/admin/provider/<int:provider_id>")
@admin_required
def admin_provider_detail(provider_id):
    """Show one provider together with its plans (404 if the id is unknown).

    A plan counts as belonging to the provider when either its
    ``provider_id`` matches or its ``provider`` column equals the
    provider's name. Plans are ordered by CNY price, then by name.
    """
    provider = Provider.query.get_or_404(provider_id)
    belongs_here = (VPSPlan.provider_id == provider_id) | (VPSPlan.provider == provider.name)
    provider_plans = (
        VPSPlan.query.filter(belongs_here)
        .order_by(VPSPlan.price_cny.asc(), VPSPlan.name)
        .all()
    )
    all_providers = Provider.query.order_by(Provider.name).all()
    return render_template(
        "admin/provider_detail.html",
        provider=provider,
        plans=provider_plans,
        providers=all_providers,
        country_tags=COUNTRY_TAGS,
    )
@app.route("/admin/provider/<int:provider_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_provider_edit(provider_id):
    """Edit an existing provider (404 if the id is unknown).

    GET shows the form pre-filled with the provider. POST validates the
    submitted name before touching the model: it must be non-empty and must
    not be used by a different provider (the same rule the creation view
    applies). On success the change is committed and the user is redirected
    to the provider's detail page; on failure the form is shown again with
    an error and the stored provider is left unmodified.
    """
    template = "admin/provider_form.html"
    provider = Provider.query.get_or_404(provider_id)
    if request.method != "POST":
        return render_template(template, provider=provider)

    name = request.form.get("name", "").strip()
    official_url = request.form.get("official_url", "").strip() or None

    if not name:
        return render_template(template, provider=provider, error="请填写厂商名称")
    # Reject a name that already belongs to another provider; keeping the
    # current name is of course allowed.
    clash = Provider.query.filter(Provider.name == name, Provider.id != provider.id).first()
    if clash:
        return render_template(template, provider=provider, error="该厂商名称已存在")

    # Only mutate the model once the input is known to be valid, so a
    # rejected submission never leaves pending changes in the session.
    provider.name = name
    provider.official_url = official_url
    db.session.commit()
    return redirect(url_for("admin_provider_detail", provider_id=provider.id))
@app.route("/admin/provider/<int:provider_id>/delete", methods=["POST"])
@admin_required
def admin_provider_delete(provider_id):
    """Delete a provider while keeping its plans (404 if the id is unknown)."""
    doomed = Provider.query.get_or_404(provider_id)
    # Detach the provider's plans instead of deleting them: the plans are
    # kept and only their provider_id is cleared.
    linked_plans = VPSPlan.query.filter_by(provider_id=provider_id)
    linked_plans.update({"provider_id": None})
    db.session.delete(doomed)
    db.session.commit()
    return redirect(url_for("admin_providers"))
@app.route("/admin/plan/new", methods=["GET", "POST"])
@admin_required
def admin_plan_new():
    """Create a plan.

    POST delegates to ``_save_plan(None)``. GET renders an empty plan form;
    an optional integer ``provider_id`` query parameter is passed to the
    template as ``preselected_provider_id``.
    """
    preselected = request.args.get("provider_id", type=int)
    if request.method == "POST":
        return _save_plan(None)
    context = {
        "plan": None,
        "country_tags": COUNTRY_TAGS,
        "providers": Provider.query.order_by(Provider.name).all(),
        "preselected_provider_id": preselected,
    }
    return render_template("admin/plan_form.html", **context)
@app.route("/admin/plan/<int:plan_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_plan_edit(plan_id):
    """Update an existing plan (404 if the id is unknown).

    POST delegates to ``_save_plan``. A GET request has no form of its own
    in this view and is redirected to the dashboard.
    """
    existing = VPSPlan.query.get_or_404(plan_id)
    if request.method != "POST":
        return redirect(url_for("admin_dashboard"))
    return _save_plan(existing)
def _parse_optional_int(s):
s = (s or "").strip()
if not s:
return None
try:
return int(s)
except ValueError:
return None
def _parse_optional_float(s):
s = (s or "").strip()
if not s:
return None
try:
return float(s)
except ValueError:
return None
def _save_plan(plan):
    """Create or update a plan from the submitted form and redirect.

    Args:
        plan: The ``VPSPlan`` to update, or ``None`` to create a new one.

    Returns:
        The plan form re-rendered with an error when no existing provider
        was selected; otherwise a redirect to the provider detail page named
        by the ``from_provider_id`` form field, or to the dashboard when
        that field is absent.
    """
    form = request.form
    provider_id = form.get("provider_id", type=int)

    # A plan must reference an existing provider.
    provider = Provider.query.get(provider_id) if provider_id else None
    if not provider:
        return render_template(
            "admin/plan_form.html",
            plan=plan,
            country_tags=COUNTRY_TAGS,
            providers=Provider.query.order_by(Provider.name).all(),
            preselected_provider_id=provider_id,
            error="请选择厂商",
        )

    # Single source of truth for the column values, shared by the create
    # and the update path. region and name are always reset to None.
    values = {
        "provider_id": provider.id,
        "provider": provider.name,
        "region": None,
        "name": None,
        "vcpu": _parse_optional_int(form.get("vcpu")),
        "memory_gb": _parse_optional_int(form.get("memory_gb")),
        "storage_gb": _parse_optional_int(form.get("storage_gb")),
        "bandwidth_mbps": _parse_optional_int(form.get("bandwidth_mbps")),
        "traffic": form.get("traffic", "").strip() or None,
        "price_cny": _parse_optional_float(form.get("price_cny")),
        "price_usd": _parse_optional_float(form.get("price_usd")),
        "currency": form.get("currency", "CNY").strip() or "CNY",
        "official_url": form.get("official_url", "").strip() or None,
        "countries": form.get("countries", "").strip() or None,
    }

    if plan is None:
        plan = VPSPlan(**values)
        db.session.add(plan)
    else:
        for attribute, value in values.items():
            setattr(plan, attribute, value)
    db.session.commit()

    # When the form was opened from a provider's detail page, return there
    # after saving.
    from_provider_id = form.get("from_provider_id", type=int)
    if from_provider_id:
        return redirect(url_for("admin_provider_detail", provider_id=from_provider_id))
    return redirect(url_for("admin_dashboard"))
@app.route("/admin/plan/<int:plan_id>/delete", methods=["POST"])
@admin_required
def admin_plan_delete(plan_id):
    """Delete one plan (404 if the id is unknown) and return to the dashboard."""
    doomed = VPSPlan.query.get_or_404(plan_id)
    db.session.delete(doomed)
    db.session.commit()
    dashboard = url_for("admin_dashboard")
    return redirect(dashboard)
# ---------- Excel export / import ----------
# Column headers of the spreadsheet, in column order. The export writes them
# as the first row; the import maps cells to these names by position.
EXCEL_HEADERS = [
"厂商", "厂商官网", "国家", "vCPU", "内存GB", "存储GB", "带宽Mbps", "流量",
"月付人民币", "月付美元", "货币", "配置官网",
]
@app.route("/admin/export/excel")
@admin_required
def admin_export_excel():
    """Download every plan as an .xlsx workbook.

    The sheet starts with ``EXCEL_HEADERS`` followed by one row per plan,
    ordered by provider and then CNY price. Missing values are written as
    empty cells, except the currency, which falls back to ``"CNY"``.
    """

    def blank_if_none(value):
        # Keep 0 as a real value; only None becomes an empty cell.
        return "" if value is None else value

    workbook = Workbook()
    sheet = workbook.active
    sheet.title = "配置"
    sheet.append(EXCEL_HEADERS)

    for plan in VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny).all():
        provider_url = (plan.provider_rel.official_url if plan.provider_rel else "") or ""
        sheet.append([
            plan.provider_name,
            provider_url or "",
            plan.countries or "",
            blank_if_none(plan.vcpu),
            blank_if_none(plan.memory_gb),
            blank_if_none(plan.storage_gb),
            blank_if_none(plan.bandwidth_mbps),
            plan.traffic or "",
            blank_if_none(plan.price_cny),
            blank_if_none(plan.price_usd),
            plan.currency or "CNY",
            plan.official_url or "",
        ])

    # Serialise into memory and rewind so send_file streams from the start.
    stream = io.BytesIO()
    workbook.save(stream)
    stream.seek(0)
    return send_file(
        stream,
        mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        as_attachment=True,
        download_name="vps_配置_导出.xlsx",
    )
def _plan_match(plan, row):
    """Tell whether a stored plan and an imported row describe the same plan.

    Compared fields: provider name, vCPU, memory, storage, bandwidth,
    countries, CNY price and USD price. Numeric cells are normalised with
    ``_num`` / ``_float`` first, and a stored ``None`` matches an empty
    imported value.
    """

    def same(stored, imported):
        # A NULL column is considered equal to an empty/missing cell.
        if stored is None and imported in (None, ""):
            return True
        return stored == imported

    # Checks run in a fixed order and stop at the first mismatch.
    if plan.provider_name != (row.get("厂商") or "").strip():
        return False
    if not same(plan.vcpu, _num(row.get("vCPU"))):
        return False
    if not same(plan.memory_gb, _num(row.get("内存GB"))):
        return False
    if not same(plan.storage_gb, _num(row.get("存储GB"))):
        return False
    if not same(plan.bandwidth_mbps, _num(row.get("带宽Mbps"))):
        return False
    if (plan.countries or "").strip() != (row.get("国家") or "").strip():
        return False
    if not same(plan.price_cny, _float(row.get("月付人民币"))):
        return False
    return same(plan.price_usd, _float(row.get("月付美元")))
def _num(v):
if v is None or v == "":
return None
try:
return int(float(v))
except (ValueError, TypeError):
return None
def _float(v):
if v is None or v == "":
return None
try:
return float(v)
except (ValueError, TypeError):
return None
@app.route("/admin/import", methods=["GET", "POST"])
@admin_required
def admin_import():
    """Upload an .xlsx file and stage the rows that are not in the database yet.

    GET renders the upload form. POST reads the active sheet (skipping the
    header row), maps each row's cells to ``EXCEL_HEADERS`` by position,
    drops blank rows, rows without a provider name and rows that match an
    existing plan according to ``_plan_match``, stores the remainder in
    ``session["import_preview"]`` and redirects to the preview page.
    Validation or parsing problems re-render the form with an error.
    """
    if request.method == "GET":
        return render_template("admin/import.html")

    f = request.files.get("file")
    if not f or not f.filename:
        return render_template("admin/import.html", error="请选择 Excel 文件")
    if not f.filename.lower().endswith(".xlsx"):
        return render_template("admin/import.html", error="请上传 .xlsx 文件")

    try:
        wb = load_workbook(io.BytesIO(f.read()), read_only=True, data_only=True)
        try:
            rows = list(wb.active.iter_rows(min_row=2, values_only=True))
        finally:
            # Read-only workbooks keep their source open until closed.
            wb.close()
    except Exception as e:
        return render_template("admin/import.html", error="解析失败: {}".format(str(e)))

    # Columns that are handled as text further down the pipeline. A cell in
    # one of these columns may still hold a number (e.g. a numeric traffic
    # value), so it is converted to str here; otherwise the later .strip()
    # calls would raise AttributeError.
    text_columns = {"厂商", "厂商官网", "国家", "流量", "货币", "配置官网"}

    parsed = []
    for row in rows:
        # Skip rows in which every cell is empty or whitespace.
        if not any(cell is not None and str(cell).strip() for cell in row):
            continue
        record = {}
        for i, header in enumerate(EXCEL_HEADERS):
            value = row[i] if i < len(row) else None
            if value is not None:
                if header in text_columns:
                    value = str(value).strip()
                elif hasattr(value, "strip"):
                    value = value.strip()
            record[header] = value
        parsed.append(record)

    if not parsed:
        return render_template("admin/import.html", error="文件中没有有效数据行")

    plans = VPSPlan.query.all()
    to_add = [
        record
        for record in parsed
        if record["厂商"] and not any(_plan_match(p, record) for p in plans)
    ]

    # NOTE(review): if the app uses Flask's default cookie-based session, a
    # large preview may exceed the browser's cookie size limit - confirm the
    # session backend before importing big files.
    session["import_preview"] = to_add
    return redirect(url_for("admin_import_preview"))
@app.route("/admin/import/preview", methods=["GET", "POST"])
@admin_required
def admin_import_preview():
    """Preview staged import rows and insert the ones the admin ticked.

    GET lists the rows stored in ``session["import_preview"]`` together with
    their indices. POST reads the ``row_index`` form values, creates a plan
    (and, when missing, its provider) for every valid selected row, clears
    the staged rows and redirects to the dashboard with a message stating how
    many plans were actually imported. Submitting without any selection
    re-renders the preview with an error.
    """
    to_add = session.get("import_preview") or []
    if request.method == "GET":
        return render_template("admin/import_preview.html", rows=list(enumerate(to_add)))

    selected = request.form.getlist("row_index")
    if not selected:
        return render_template(
            "admin/import_preview.html",
            rows=list(enumerate(to_add)),
            error="请至少勾选一行",
        )

    def _text(value):
        # Cells may hold numbers as well as strings; normalise to stripped text.
        return "" if value is None else str(value).strip()

    # isdecimal() only admits strings that int() is guaranteed to parse
    # (isdigit() also accepts characters such as superscripts). Duplicates
    # are dropped and rows are processed in sheet order.
    indices = sorted({int(x) for x in selected if x.isdecimal()})

    imported = 0
    for i in indices:
        if i >= len(to_add):
            continue
        row = to_add[i]
        provider_name = _text(row.get("厂商"))
        if not provider_name:
            continue
        provider = Provider.query.filter_by(name=provider_name).first()
        if not provider:
            provider = Provider(
                name=provider_name,
                official_url=_text(row.get("厂商官网")) or None,
            )
            db.session.add(provider)
            # Flush so the new provider gets its id before the plan refers to it.
            db.session.flush()
        plan = VPSPlan(
            provider_id=provider.id,
            provider=provider.name,
            region=None,
            name=None,
            vcpu=_num(row.get("vCPU")),
            memory_gb=_num(row.get("内存GB")),
            storage_gb=_num(row.get("存储GB")),
            bandwidth_mbps=_num(row.get("带宽Mbps")),
            traffic=_text(row.get("流量")) or None,
            price_cny=_float(row.get("月付人民币")),
            price_usd=_float(row.get("月付美元")),
            currency=_text(row.get("货币")) or "CNY",
            official_url=_text(row.get("配置官网")) or None,
            countries=_text(row.get("国家")) or None,
        )
        db.session.add(plan)
        imported += 1

    db.session.commit()
    session.pop("import_preview", None)
    # Report the number of plans really created, not the number of ticked boxes.
    message = urlencode({"msg": "已导入 {} 条配置".format(imported)})
    return redirect(url_for("admin_dashboard") + "?" + message)
# Development entry point only. debug=True enables Werkzeug's interactive
# debugger and must never be exposed in production.
if __name__ == "__main__":
    app.run(debug=True, port=5001)