Files
vps_web/app.py
ddrwode 5fe60fdd57 哈哈
2026-02-09 22:36:32 +08:00

1033 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""Cloud server (VPS) price comparison - Flask application."""
import io
from datetime import datetime
from urllib.parse import urlencode
from flask import Flask, render_template, jsonify, request, redirect, url_for, session, send_file
from werkzeug.middleware.proxy_fix import ProxyFix
from sqlalchemy import text, func
from config import Config
from extensions import db
from openpyxl import Workbook
from openpyxl import load_workbook
# Create the Flask application and load its settings from the project Config object.
app = Flask(__name__)
app.config.from_object(Config)
# When deployed behind a reverse proxy such as Nginx, trust the X-Forwarded-*
# headers (HTTPS scheme, real client IP, host) from one proxy hop.
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)
# Bind the shared ``db`` extension object to this application.
db.init_app(app)
from models import VPSPlan, Provider, PriceHistory, User, ForumPost, ForumComment # noqa: E402
def _ensure_mysql_columns():
    """Best-effort schema patch for an already existing MySQL ``vps_plans`` table.

    Tries to add columns that older tables may lack (avoiding MySQL error
    1054 "Unknown column") and to relax a few columns to nullable. Each
    statement is attempted on its own; a failing statement is rolled back
    and the next one is still tried. Does nothing on non-MySQL dialects.
    """
    columns_to_add = (
        ("traffic", "VARCHAR(64) NULL"),
        ("countries", "VARCHAR(255) NULL"),
        ("provider_id", "INT NULL"),
    )
    columns_to_relax = (
        ("name", "VARCHAR(128) NULL"),
        ("region", "VARCHAR(128) NULL"),
        ("price_cny", "DOUBLE NULL"),
        ("price_usd", "DOUBLE NULL"),
    )
    # ADD statements run first, then MODIFY statements, in declaration order.
    statements = [
        "ALTER TABLE vps_plans ADD COLUMN {} {}".format(column, ddl)
        for column, ddl in columns_to_add
    ]
    statements.extend(
        "ALTER TABLE vps_plans MODIFY COLUMN {} {}".format(column, ddl)
        for column, ddl in columns_to_relax
    )
    try:
        engine = db.engine
        if engine.dialect.name != "mysql":
            return
        with engine.connect() as connection:
            for statement in statements:
                try:
                    connection.execute(text(statement))
                    connection.commit()
                except Exception:
                    # Typically "column already exists"; undo and move on.
                    connection.rollback()
    except Exception:
        # Table missing or database unavailable: deliberately ignored.
        pass
def _ensure_price_history_baseline():
    """Seed a first price snapshot for plans that have no history rows yet.

    The baseline lets later price changes be reported as a rise or fall.
    Plans with neither a CNY nor a USD price are skipped. Any exception
    rolls the session back and is otherwise swallowed.
    """
    try:
        plans_without_history = (
            db.session.query(VPSPlan)
            .outerjoin(PriceHistory, PriceHistory.plan_id == VPSPlan.id)
            .filter(PriceHistory.id.is_(None))
            .all()
        )
        if plans_without_history:
            for plan in plans_without_history:
                has_price = plan.price_cny is not None or plan.price_usd is not None
                if has_price:
                    snapshot = PriceHistory(
                        plan_id=plan.id,
                        price_cny=plan.price_cny,
                        price_usd=plan.price_usd,
                        currency=(plan.currency or "CNY"),
                        source="bootstrap",
                    )
                    db.session.add(snapshot)
            db.session.commit()
    except Exception:
        db.session.rollback()
# At import time: create any missing tables, then patch columns on tables
# that already exist and seed baseline price-history rows.
with app.app_context():
    db.create_all()
    _ensure_mysql_columns()
    _ensure_price_history_baseline()
# Settings copied out of the Flask config once at import time.
ADMIN_PASSWORD = app.config["ADMIN_PASSWORD"]
SITE_URL = app.config["SITE_URL"]
SITE_NAME = app.config["SITE_NAME"]
# Country/region tags offered as choices in the admin forms.
COUNTRY_TAGS = [
    "中国大陆", "中国香港", "中国台湾", "美国", "日本", "新加坡", "韩国",
    "德国", "英国", "法国", "荷兰", "加拿大", "澳大利亚", "印度", "其他",
]
# Display labels for the ``source`` value stored on price-history records.
PRICE_SOURCE_LABELS = {
    "manual": "手工编辑",
    "import": "Excel 导入",
    "bootstrap": "基线",
}
def _get_current_user():
    """Return the ``User`` referenced by the session, or ``None``.

    A ``user_id`` that no longer resolves to a user is removed from the
    session.
    """
    uid = session.get("user_id")
    if uid:
        account = User.query.get(uid)
        if not account:
            # Stale id (user no longer exists): forget it.
            session.pop("user_id", None)
        return account
    return None
def _is_valid_username(username):
if not username:
return False
if len(username) < 3 or len(username) > 20:
return False
return all(ch.isalnum() or ch == "_" for ch in username)
def _safe_next_url(default_endpoint):
    """Return the request's ``next`` target if it is a safe same-site path.

    The ``next`` value (query string or form) is used only when it is a
    local absolute path. Anything else falls back to the URL of
    ``default_endpoint``.

    Args:
        default_endpoint: Flask endpoint name used for the fallback URL.

    Returns:
        A path string suitable for ``redirect``.
    """
    target = (request.values.get("next") or "").strip()
    is_local_path = (
        target.startswith("/")
        # "//host" is a scheme-relative URL pointing at another site.
        and not target.startswith("//")
        # Browsers treat "\" like "/", so "/\host" also leaves the site.
        and "\\" not in target
        # Browsers drop tab/newline characters from URLs ("/\t/host" becomes
        # "//host"); reject every ASCII control character.
        and not any(ord(char) < 0x20 or ord(char) == 0x7F for char in target)
    )
    if is_local_path:
        return target
    return url_for(default_endpoint)
@app.context_processor
def inject_global_user():
    """Expose the current user and the admin-login flag to every template."""
    context = {}
    context["current_user"] = _get_current_user()
    context["admin_logged_in"] = bool(session.get("admin_logged_in"))
    return context
def _currency_symbol(currency):
return "¥" if (currency or "CNY").upper() == "CNY" else "$"
def _format_money(currency, value):
    """Format ``value`` with two decimals, prefixed by the currency symbol."""
    symbol = _currency_symbol(currency)
    return f"{symbol}{float(value):.2f}"
def _format_history_time(dt):
return dt.strftime("%Y-%m-%d %H:%M") if dt else ""
def _pick_price_pair(latest, previous=None):
if previous is None:
if latest.price_cny is not None:
return "CNY", float(latest.price_cny), None
if latest.price_usd is not None:
return "USD", float(latest.price_usd), None
return None, None, None
if latest.price_cny is not None and previous.price_cny is not None:
return "CNY", float(latest.price_cny), float(previous.price_cny)
if latest.price_usd is not None and previous.price_usd is not None:
return "USD", float(latest.price_usd), float(previous.price_usd)
return None, None, None
def _build_price_trend(latest, previous=None):
    """Describe how a price moved between its two most recent snapshots.

    Args:
        latest: Most recent price-history record. ``price_cny``,
            ``price_usd``, ``source`` and ``captured_at`` are read from it.
        previous: The record before ``latest``, or ``None`` if there is none.

    Returns:
        ``None`` when no comparable price exists. Otherwise a dict with
        ``direction`` ("new", "flat", "up" or "down"), ``delta_text`` and
        ``meta_text``.
    """
    currency, current_value, previous_value = _pick_price_pair(latest, previous)
    if currency is None or current_value is None:
        return None
    source = PRICE_SOURCE_LABELS.get(latest.source or "", latest.source or "未知来源")
    meta = "当前 {} · {} · {}".format(
        _format_money(currency, current_value),
        _format_history_time(latest.captured_at),
        source,
    )
    if previous_value is None:
        return {
            "direction": "new",
            "delta_text": "首次记录",
            "meta_text": meta,
        }
    diff = current_value - previous_value
    if abs(diff) < 1e-9:
        return {
            "direction": "flat",
            "delta_text": "→ 持平",
            "meta_text": meta,
        }
    rising = diff > 0
    direction = "up" if rising else "down"
    # Both branches used to yield an empty string, leaving delta_text with a
    # leading space. Arrows chosen to match the "→" of the flat case.
    # NOTE(review): confirm these are the glyphs the templates expect.
    arrow = "↑" if rising else "↓"
    sign = "+" if rising else "-"
    delta_text = "{} {}{}{:,.2f}".format(arrow, sign, _currency_symbol(currency), abs(diff))
    # Skip the percentage when the previous price is (near) zero.
    if abs(previous_value) > 1e-9:
        pct = diff / previous_value * 100
        delta_text += " ({:+.2f}%)".format(pct)
    return {
        "direction": direction,
        "delta_text": delta_text,
        "meta_text": meta,
    }
def _build_plan_trend_map(plans):
    """Map plan id to its price-trend dict for the given plans.

    Only the two newest history rows per plan are considered. Plans with no
    history, or whose trend cannot be computed, are left out of the result.
    """
    ids = [plan.id for plan in plans if plan.id is not None]
    if not ids:
        return {}
    history = (
        PriceHistory.query
        .filter(PriceHistory.plan_id.in_(ids))
        .order_by(PriceHistory.plan_id.asc(), PriceHistory.captured_at.desc(), PriceHistory.id.desc())
        .all()
    )
    # Rows arrive newest-first within each plan; keep the first two per plan.
    newest_by_plan = {}
    for entry in history:
        kept = newest_by_plan.setdefault(entry.plan_id, [])
        if len(kept) < 2:
            kept.append(entry)
    trends = {}
    for plan_id, kept in newest_by_plan.items():
        latest, previous = (kept + [None, None])[:2]
        if not latest:
            continue
        trend = _build_price_trend(latest, previous)
        if trend:
            trends[plan_id] = trend
    return trends
def admin_required(f):
    """Decorator: redirect to the admin login page unless the session is an admin."""
    import functools

    @functools.wraps(f)
    def guarded(*args, **kwargs):
        if session.get("admin_logged_in"):
            return f(*args, **kwargs)
        return redirect(url_for("admin_login"))

    return guarded
def user_login_required(f):
    """Decorator: send anonymous visitors to the login page.

    The current request path is passed along as ``next``.
    """
    import functools

    @functools.wraps(f)
    def guarded(*args, **kwargs):
        if _get_current_user():
            return f(*args, **kwargs)
        return redirect(url_for("user_login", next=request.path))

    return guarded
@app.route("/")
def index():
    """Render the public home page with all plans serialised for the template."""
    ordered_plans = VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny).all()
    serialised = [plan.to_dict() for plan in ordered_plans]
    return render_template(
        "index.html",
        site_url=SITE_URL,
        site_name=SITE_NAME,
        plans_json_ld=serialised,
    )
@app.route("/api/plans")
def api_plans():
    """Return every plan as JSON, ordered by provider and then CNY price."""
    ordered_plans = VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny).all()
    payload = [plan.to_dict() for plan in ordered_plans]
    return jsonify(payload)
# ---------- Front-end users and forum ----------
@app.route("/register", methods=["GET", "POST"])
def user_register():
    """Show the registration form and create an account on a valid POST.

    Logged-in users are redirected to the forum. After a successful
    registration the user is logged in and redirected to the safe ``next``
    target (forum index by default).
    """
    if _get_current_user():
        return redirect(url_for("forum_index"))

    def render(error=None):
        return render_template("auth/register.html", error=error)

    if request.method != "POST":
        return render()

    form = request.form
    username = (form.get("username") or "").strip()
    password = form.get("password") or ""
    confirm_password = form.get("confirm_password") or ""

    if not _is_valid_username(username):
        return render("用户名需为 3-20 位,仅支持字母、数字、下划线")
    if len(password) < 6:
        return render("密码至少 6 位")
    if password != confirm_password:
        return render("两次输入的密码不一致")
    # Uniqueness is checked case-insensitively.
    name_taken = User.query.filter(func.lower(User.username) == username.lower()).first()
    if name_taken:
        return render("用户名已存在")

    account = User(username=username)
    account.set_password(password)
    account.last_login_at = datetime.utcnow()
    db.session.add(account)
    db.session.commit()
    session["user_id"] = account.id
    return redirect(_safe_next_url("forum_index"))
@app.route("/login", methods=["GET", "POST"])
def user_login():
    """Show the login form and authenticate on POST.

    The username lookup is case-insensitive. On success the login time is
    recorded, the user id is stored in the session and the browser is sent
    to the safe ``next`` target (forum index by default).
    """
    if _get_current_user():
        return redirect(url_for("forum_index"))
    if request.method != "POST":
        return render_template("auth/login.html", error=None)

    username = (request.form.get("username") or "").strip()
    password = request.form.get("password") or ""
    account = User.query.filter(func.lower(User.username) == username.lower()).first()
    authenticated = bool(account) and account.check_password(password)
    if not authenticated:
        return render_template("auth/login.html", error="用户名或密码错误")

    account.last_login_at = datetime.utcnow()
    db.session.commit()
    session["user_id"] = account.id
    return redirect(_safe_next_url("forum_index"))
@app.route("/logout")
def user_logout():
    """Forget the logged-in user and return to the forum index."""
    session.pop("user_id", None)
    forum_url = url_for("forum_index")
    return redirect(forum_url)
@app.route("/forum")
def forum_index():
    """List all forum posts, newest first."""
    newest_first = ForumPost.query.order_by(ForumPost.created_at.desc(), ForumPost.id.desc())
    return render_template("forum/index.html", posts=newest_first.all())
@app.route("/forum/post/new", methods=["GET", "POST"])
@user_login_required
def forum_post_new():
    """Show the new-post form and create the post on a valid POST.

    On a validation error the form is re-rendered with the message and the
    values the user typed.
    """
    user = _get_current_user()
    title = content = ""
    error = None
    if request.method == "POST":
        title = (request.form.get("title") or "").strip()
        content = (request.form.get("content") or "").strip()
        # Ordered checks; the first failing one supplies the error message.
        checks = (
            (len(title) < 5, "标题至少 5 个字符"),
            (len(title) > 160, "标题不能超过 160 个字符"),
            (len(content) < 10, "内容至少 10 个字符"),
        )
        error = next((message for failed, message in checks if failed), None)
        if error is None:
            post = ForumPost(user_id=user.id, title=title, content=content)
            db.session.add(post)
            db.session.commit()
            return redirect(url_for("forum_post_detail", post_id=post.id))
    return render_template(
        "forum/post_form.html",
        error=error,
        title_val=title,
        content_val=content,
    )
@app.route("/forum/post/<int:post_id>")
def forum_post_detail(post_id):
    """Show one post with its comments in chronological order (404 if missing).

    Optional ``msg`` and ``error`` query parameters are passed to the
    template as flash-style texts.
    """
    post = ForumPost.query.get_or_404(post_id)
    oldest_first = ForumComment.query.filter_by(post_id=post.id).order_by(
        ForumComment.created_at.asc(), ForumComment.id.asc()
    )
    template_args = {
        "post": post,
        "comments": oldest_first.all(),
        "message": request.args.get("msg") or "",
        "error": request.args.get("error") or "",
    }
    return render_template("forum/post_detail.html", **template_args)
@app.route("/forum/post/<int:post_id>/comment", methods=["POST"])
@user_login_required
def forum_post_comment(post_id):
    """Add a comment to a post, then redirect back to the post page.

    Comments shorter than two characters are rejected. The outcome is
    reported through the ``error`` or ``msg`` query parameter.
    """
    post = ForumPost.query.get_or_404(post_id)
    user = _get_current_user()
    content = (request.form.get("content") or "").strip()
    if len(content) >= 2:
        db.session.add(ForumComment(post_id=post.id, user_id=user.id, content=content))
        db.session.commit()
        outcome = {"msg": "评论发布成功"}
    else:
        outcome = {"error": "评论至少 2 个字符"}
    return redirect(url_for("forum_post_detail", post_id=post.id, **outcome))
# ---------- SEO ----------
@app.route("/sitemap.xml")
def sitemap():
    """Serve a static two-entry sitemap (home page and forum) as XML."""
    from flask import make_response

    base = SITE_URL.rstrip("/")
    document = f'''<?xml version="1.0" encoding="UTF-8"?>
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
<url>
<loc>{base}/</loc>
<changefreq>weekly</changefreq>
<priority>1.0</priority>
</url>
<url>
<loc>{base}/forum</loc>
<changefreq>daily</changefreq>
<priority>0.9</priority>
</url>
</urlset>'''
    response = make_response(document)
    response.mimetype = "application/xml"
    return response
@app.route("/robots.txt")
def robots():
    """Serve robots.txt: allow everything and point at the sitemap."""
    from flask import make_response

    base = SITE_URL.rstrip("/")
    # The trailing empty item yields the final newline.
    lines = ["User-agent: *", "Allow: /", f"Sitemap: {base}/sitemap.xml", ""]
    response = make_response("\n".join(lines))
    response.mimetype = "text/plain"
    return response
# ---------- Admin back office ----------
@app.route("/admin/login", methods=["GET", "POST"])
def admin_login():
    """Show the admin login form and check the submitted password on POST.

    On success the session is flagged as admin and the browser is sent to
    the dashboard. Otherwise the form is re-rendered with an error.
    """
    import hmac

    if request.method != "POST":
        return render_template("admin/login.html")

    submitted = request.form.get("password", "")
    expected = ADMIN_PASSWORD
    # Constant-time comparison so response timing does not leak how much of
    # the password matched. Both sides are encoded because compare_digest
    # rejects non-ASCII str. A non-string configured password never matches,
    # as with the previous ``==`` check.
    password_ok = isinstance(expected, str) and hmac.compare_digest(
        submitted.encode("utf-8"), expected.encode("utf-8")
    )
    if password_ok:
        session["admin_logged_in"] = True
        return redirect(url_for("admin_dashboard"))
    return render_template("admin/login.html", error="密码错误")
@app.route("/admin/logout")
def admin_logout():
    """Clear the admin flag from the session and return to the home page."""
    session.pop("admin_logged_in", None)
    home_url = url_for("index")
    return redirect(home_url)
@app.route("/admin/api/plan/<int:plan_id>")
@admin_required
def admin_api_plan(plan_id):
    """Return one plan's editable fields as JSON (404 if it does not exist)."""
    plan = VPSPlan.query.get_or_404(plan_id)

    def as_float(value):
        # Prices are sent as plain floats; missing prices stay null.
        return None if value is None else float(value)

    payload = {
        "id": plan.id,
        "provider_id": plan.provider_id,
        "countries": plan.countries or "",
        "vcpu": plan.vcpu,
        "memory_gb": plan.memory_gb,
        "storage_gb": plan.storage_gb,
        "bandwidth_mbps": plan.bandwidth_mbps,
        "traffic": plan.traffic or "",
        "price_cny": as_float(plan.price_cny),
        "price_usd": as_float(plan.price_usd),
        "currency": plan.currency or "CNY",
        "official_url": plan.official_url or "",
    }
    return jsonify(payload)
@app.route("/admin/api/plan/<int:plan_id>/price-history")
@admin_required
def admin_api_plan_price_history(plan_id):
    """Return up to 30 of a plan's most recent price snapshots as JSON."""
    plan = VPSPlan.query.get_or_404(plan_id)
    recent = (
        PriceHistory.query
        .filter_by(plan_id=plan.id)
        .order_by(PriceHistory.captured_at.desc(), PriceHistory.id.desc())
        .limit(30)
        .all()
    )

    def as_float(value):
        return None if value is None else float(value)

    payload = []
    for snapshot in recent:
        payload.append({
            "id": snapshot.id,
            "price_cny": as_float(snapshot.price_cny),
            "price_usd": as_float(snapshot.price_usd),
            "currency": snapshot.currency or "CNY",
            "source": snapshot.source or "",
            "captured_at": snapshot.captured_at.isoformat() if snapshot.captured_at else "",
        })
    return jsonify(payload)
@app.route("/admin")
@admin_required
def admin_dashboard():
    """Render the admin dashboard: providers, all plans and their price trends."""
    providers = Provider.query.order_by(Provider.name).all()
    plans = VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny).all()
    template_args = {
        "providers": providers,
        "plans": plans,
        "plan_trends": _build_plan_trend_map(plans),
        "country_tags": COUNTRY_TAGS,
    }
    return render_template("admin/dashboard.html", **template_args)
# ---------- Provider management ----------
@app.route("/admin/providers")
@admin_required
def admin_providers():
    """List all providers, sorted by name."""
    by_name = Provider.query.order_by(Provider.name)
    return render_template("admin/providers.html", providers=by_name.all())
@app.route("/admin/provider/new", methods=["GET", "POST"])
@admin_required
def admin_provider_new():
    """Show the provider form and create a provider on a valid POST.

    The name is required and must not already exist. On success the browser
    is redirected to the new provider's detail page.
    """
    form_template = "admin/provider_form.html"
    if request.method != "POST":
        return render_template(form_template, provider=None)

    name = request.form.get("name", "").strip()
    official_url = request.form.get("official_url", "").strip() or None
    if not name:
        return render_template(form_template, provider=None, error="请填写厂商名称")
    already_exists = Provider.query.filter_by(name=name).first()
    if already_exists:
        return render_template(form_template, provider=None, error="该厂商名称已存在")

    created = Provider(name=name, official_url=official_url)
    db.session.add(created)
    db.session.commit()
    return redirect(url_for("admin_provider_detail", provider_id=created.id))
@app.route("/admin/provider/<int:provider_id>")
@admin_required
def admin_provider_detail(provider_id):
    """Show one provider with its plans and their price trends.

    A plan belongs to the provider if its ``provider_id`` matches or its
    ``provider`` text column equals the provider's name.
    """
    provider = Provider.query.get_or_404(provider_id)
    belongs_to_provider = (VPSPlan.provider_id == provider_id) | (VPSPlan.provider == provider.name)
    plans = (
        VPSPlan.query
        .filter(belongs_to_provider)
        .order_by(VPSPlan.price_cny.asc(), VPSPlan.name)
        .all()
    )
    all_providers = Provider.query.order_by(Provider.name).all()
    trends = _build_plan_trend_map(plans)
    return render_template(
        "admin/provider_detail.html",
        provider=provider,
        plans=plans,
        plan_trends=trends,
        providers=all_providers,
        country_tags=COUNTRY_TAGS,
    )
@app.route("/admin/provider/<int:provider_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_provider_edit(provider_id):
    """Show the provider form and apply name/URL changes on a valid POST.

    Input is validated before the provider object is touched, so a rejected
    submission leaves no half-applied change in the session. The new name
    must be non-empty and must not belong to another provider, matching the
    rule enforced when creating a provider.
    """
    provider = Provider.query.get_or_404(provider_id)
    form_template = "admin/provider_form.html"
    if request.method != "POST":
        return render_template(form_template, provider=provider)

    new_name = request.form.get("name", "").strip()
    new_url = request.form.get("official_url", "").strip() or None
    if not new_name:
        return render_template(form_template, provider=provider, error="请填写厂商名称")
    # Reject a rename that would collide with a different provider.
    name_clash = Provider.query.filter(
        Provider.name == new_name,
        Provider.id != provider.id,
    ).first()
    if name_clash:
        return render_template(form_template, provider=provider, error="该厂商名称已存在")

    # NOTE(review): plans also store the provider name in ``VPSPlan.provider``;
    # confirm whether a rename should be propagated to that column.
    provider.name = new_name
    provider.official_url = new_url
    db.session.commit()
    return redirect(url_for("admin_provider_detail", provider_id=provider.id))
@app.route("/admin/provider/<int:provider_id>/delete", methods=["POST"])
@admin_required
def admin_provider_delete(provider_id):
    """Delete a provider while keeping its plans.

    Plans linked to the provider are detached by clearing ``provider_id``;
    the plans themselves are not removed.
    """
    doomed = Provider.query.get_or_404(provider_id)
    linked_plans = VPSPlan.query.filter_by(provider_id=provider_id)
    linked_plans.update({"provider_id": None})
    db.session.delete(doomed)
    db.session.commit()
    return redirect(url_for("admin_providers"))
@app.route("/admin/plan/new", methods=["GET", "POST"])
@admin_required
def admin_plan_new():
    """Show the empty plan form; on POST delegate to ``_save_plan``.

    An optional ``provider_id`` query parameter preselects the provider.
    """
    preselected = request.args.get("provider_id", type=int)
    if request.method == "POST":
        return _save_plan(None)
    template_args = {
        "plan": None,
        "country_tags": COUNTRY_TAGS,
        "providers": Provider.query.order_by(Provider.name).all(),
        "preselected_provider_id": preselected,
    }
    return render_template("admin/plan_form.html", **template_args)
@app.route("/admin/plan/<int:plan_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_plan_edit(plan_id):
    """Save edits to an existing plan on POST; a GET just returns to the dashboard."""
    plan = VPSPlan.query.get_or_404(plan_id)
    if request.method != "POST":
        return redirect(url_for("admin_dashboard"))
    return _save_plan(plan)
def _parse_optional_int(s):
s = (s or "").strip()
if not s:
return None
try:
return int(s)
except ValueError:
return None
def _parse_optional_float(s):
s = (s or "").strip()
if not s:
return None
try:
return float(s)
except ValueError:
return None
def _save_plan(plan):
    """Create or update a plan from the submitted admin form.

    Args:
        plan: The existing plan to update, or ``None`` to create a new one.

    Returns:
        The re-rendered form with an error when no valid provider was
        chosen. Otherwise a redirect to the originating provider page (when
        ``from_provider_id`` is posted) or to the dashboard.
    """
    form = request.form
    provider_id = form.get("provider_id", type=int)
    submitted = {
        "vcpu": _parse_optional_int(form.get("vcpu")),
        "memory_gb": _parse_optional_int(form.get("memory_gb")),
        "storage_gb": _parse_optional_int(form.get("storage_gb")),
        "bandwidth_mbps": _parse_optional_int(form.get("bandwidth_mbps")),
        "traffic": form.get("traffic", "").strip() or None,
        "price_cny": _parse_optional_float(form.get("price_cny")),
        "price_usd": _parse_optional_float(form.get("price_usd")),
        "currency": form.get("currency", "CNY").strip() or "CNY",
        "official_url": form.get("official_url", "").strip() or None,
        "countries": form.get("countries", "").strip() or None,
    }

    provider = Provider.query.get(provider_id) if provider_id else None
    if not provider:
        all_providers = Provider.query.order_by(Provider.name).all()
        return render_template(
            "admin/plan_form.html",
            plan=plan,
            country_tags=COUNTRY_TAGS,
            providers=all_providers,
            preselected_provider_id=provider_id,
            error="请选择厂商",
        )

    # ``region`` and ``name`` are always cleared by this form.
    attributes = {
        "provider_id": provider.id,
        "provider": provider.name,
        "region": None,
        "name": None,
    }
    attributes.update(submitted)

    if plan is None:
        plan = VPSPlan(**attributes)
        db.session.add(plan)
    else:
        for field, value in attributes.items():
            setattr(plan, field, value)

    # Flush first so a new plan has an id before its price snapshot is recorded.
    db.session.flush()
    _record_price_history(plan, source="manual")
    db.session.commit()

    # If the form was opened from a provider detail page, go back there.
    origin_provider_id = form.get("from_provider_id", type=int)
    if origin_provider_id:
        return redirect(url_for("admin_provider_detail", provider_id=origin_provider_id))
    return redirect(url_for("admin_dashboard"))
@app.route("/admin/plan/<int:plan_id>/delete", methods=["POST"])
@admin_required
def admin_plan_delete(plan_id):
    """Delete a plan together with all of its price-history rows."""
    doomed = VPSPlan.query.get_or_404(plan_id)
    # Remove the dependent history rows first, then the plan itself.
    PriceHistory.query.filter_by(plan_id=plan_id).delete()
    db.session.delete(doomed)
    db.session.commit()
    return redirect(url_for("admin_dashboard"))
# ---------- Excel export / import ----------
# Column headers, in order, of the exported sheet. The import reads cells by
# position against this same list.
EXCEL_HEADERS = [
    "厂商", "厂商官网", "国家", "vCPU", "内存GB", "存储GB", "带宽Mbps", "流量",
    "月付人民币", "月付美元", "货币", "配置官网",
]
@app.route("/admin/export/excel")
@admin_required
def admin_export_excel():
    """Export every plan to an in-memory .xlsx workbook and send it as a download."""
    workbook = Workbook()
    sheet = workbook.active
    sheet.title = "配置"
    sheet.append(EXCEL_HEADERS)

    def blank_if_none(value):
        # Missing numbers are written as empty cells.
        return "" if value is None else value

    for plan in VPSPlan.query.order_by(VPSPlan.provider, VPSPlan.price_cny).all():
        provider_url = (plan.provider_rel.official_url if plan.provider_rel else "") or ""
        record = [
            plan.provider_name,
            provider_url or "",
            plan.countries or "",
            blank_if_none(plan.vcpu),
            blank_if_none(plan.memory_gb),
            blank_if_none(plan.storage_gb),
            blank_if_none(plan.bandwidth_mbps),
            plan.traffic or "",
            blank_if_none(plan.price_cny),
            blank_if_none(plan.price_usd),
            plan.currency or "CNY",
            plan.official_url or "",
        ]
        sheet.append(record)

    stream = io.BytesIO()
    workbook.save(stream)
    stream.seek(0)
    return send_file(
        stream,
        mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        as_attachment=True,
        download_name="vps_配置_导出.xlsx",
    )
def _num(v):
if v is None or v == "":
return None
try:
return int(float(v))
except (ValueError, TypeError):
return None
def _float(v):
if v is None or v == "":
return None
try:
return float(v)
except (ValueError, TypeError):
return None
def _opt_text(v):
if v is None:
return None
s = str(v).strip()
return s or None
def _safe_str(v):
if v is None:
return ""
return str(v).strip()
def _eq_optional(a, b):
if a is None and b is None:
return True
if a is None or b is None:
return False
if isinstance(a, float) or isinstance(b, float):
return abs(float(a) - float(b)) < 1e-9
return a == b
def _record_price_history(plan, source):
    """Queue a price snapshot for ``plan`` unless nothing changed.

    Nothing is recorded when ``plan`` is ``None``, when it has no price at
    all, or when its newest snapshot already has the same currency and
    prices. The snapshot is added to the session but not committed here.

    Args:
        plan: The plan whose current prices should be captured.
        source: Origin tag stored on the snapshot (e.g. "manual", "import").
    """
    if plan is None or (plan.price_cny is None and plan.price_usd is None):
        return
    if plan.id is None:
        # A snapshot needs the plan's primary key; flush to obtain it.
        db.session.flush()
    newest = (
        PriceHistory.query
        .filter_by(plan_id=plan.id)
        .order_by(PriceHistory.captured_at.desc(), PriceHistory.id.desc())
        .first()
    )
    currency = _opt_text(plan.currency) or "CNY"
    if newest:
        unchanged = (
            _safe_str(newest.currency).upper() == _safe_str(currency).upper()
            and _eq_optional(newest.price_cny, plan.price_cny)
            and _eq_optional(newest.price_usd, plan.price_usd)
        )
        if unchanged:
            return
    snapshot = PriceHistory(
        plan_id=plan.id,
        price_cny=plan.price_cny,
        price_usd=plan.price_usd,
        currency=currency,
        source=source,
    )
    db.session.add(snapshot)
def _display_val(v):
if v is None or v == "":
return ""
if isinstance(v, float):
s = "{:.2f}".format(v).rstrip("0").rstrip(".")
return s if s else "0"
return str(v)
def _row_identity_key(row):
    """Build the tuple that identifies an imported row.

    The key is made of provider, vCPU, memory, storage, bandwidth, country
    and traffic, in the same order as ``_plan_identity_key``.
    """
    provider = _safe_str(row.get("厂商"))
    vcpu, memory, storage, bandwidth = (
        _num(row.get(column)) for column in ("vCPU", "内存GB", "存储GB", "带宽Mbps")
    )
    country = _safe_str(row.get("国家"))
    traffic = _safe_str(row.get("流量"))
    return (provider, vcpu, memory, storage, bandwidth, country, traffic)
def _plan_identity_key(plan):
    """Build the tuple that identifies a stored plan.

    The fields and their order match ``_row_identity_key``.
    """
    hardware = (plan.vcpu, plan.memory_gb, plan.storage_gb, plan.bandwidth_mbps)
    provider = _safe_str(plan.provider_name)
    location_and_traffic = (_safe_str(plan.countries), _safe_str(plan.traffic))
    return (provider,) + hardware + location_and_traffic
def _plan_diff(plan, row):
    """Return the fields where an imported row differs from an existing plan.

    Each entry is a dict with ``label``, ``old``, ``new``, ``old_display``
    and ``new_display``. An empty list means nothing changed.
    """
    def currency_or_default(value):
        return _opt_text(value) or "CNY"

    # (sheet column, plan attribute, cell parser)
    field_specs = (
        ("国家", "countries", _opt_text),
        ("vCPU", "vcpu", _num),
        ("内存GB", "memory_gb", _num),
        ("存储GB", "storage_gb", _num),
        ("带宽Mbps", "bandwidth_mbps", _num),
        ("流量", "traffic", _opt_text),
        ("月付人民币", "price_cny", _float),
        ("月付美元", "price_usd", _float),
        ("货币", "currency", currency_or_default),
        ("配置官网", "official_url", _opt_text),
    )
    incoming = [(label, attr, parse(row.get(label))) for label, attr, parse in field_specs]

    differences = []
    for label, attr, new_value in incoming:
        old_value = getattr(plan, attr)
        if _eq_optional(old_value, new_value):
            continue
        differences.append({
            "label": label,
            "old": old_value,
            "new": new_value,
            "old_display": _display_val(old_value),
            "new_display": _display_val(new_value),
        })
    return differences
def _upsert_provider_from_row(row):
    """Find or create the provider named in an imported row.

    A missing provider is created and flushed so that it has an id. An
    existing provider only has its URL replaced when the row supplies a
    different, non-empty one. Returns ``None`` when the row names no
    provider.
    """
    name = _safe_str(row.get("厂商"))
    if not name:
        return None
    url_from_row = _opt_text(row.get("厂商官网"))
    provider = Provider.query.filter_by(name=name).first()
    if provider:
        if url_from_row and provider.official_url != url_from_row:
            provider.official_url = url_from_row
        return provider
    provider = Provider(name=name, official_url=url_from_row)
    db.session.add(provider)
    db.session.flush()
    return provider
def _fill_plan_from_row(plan, row, provider):
    """Overwrite ``plan``'s fields with the values of an imported row.

    ``region`` and ``name`` are always cleared. The currency defaults to
    "CNY" when the cell is blank.
    """
    new_values = {
        "provider_id": provider.id,
        "provider": provider.name,
        "region": None,
        "name": None,
        "vcpu": _num(row.get("vCPU")),
        "memory_gb": _num(row.get("内存GB")),
        "storage_gb": _num(row.get("存储GB")),
        "bandwidth_mbps": _num(row.get("带宽Mbps")),
        "traffic": _opt_text(row.get("流量")),
        "price_cny": _float(row.get("月付人民币")),
        "price_usd": _float(row.get("月付美元")),
        "currency": _opt_text(row.get("货币")) or "CNY",
        "official_url": _opt_text(row.get("配置官网")),
        "countries": _opt_text(row.get("国家")),
    }
    for attribute, value in new_values.items():
        setattr(plan, attribute, value)
@app.route("/admin/import", methods=["GET", "POST"])
@admin_required
def admin_import():
    """Upload an .xlsx file and build an add/update preview of its rows.

    GET renders the upload form. POST parses the first worksheet (cells are
    read by position against ``EXCEL_HEADERS``; the header row is skipped),
    matches each row to an existing plan by identity key, stores the
    resulting preview in the session and redirects to the preview page.
    Rows without a provider, duplicate rows and unchanged rows are left out
    of the preview.
    """
    if request.method == "GET":
        return render_template("admin/import.html")
    f = request.files.get("file")
    if not f or not f.filename:
        return render_template("admin/import.html", error="请选择 Excel 文件")
    if not f.filename.lower().endswith(".xlsx"):
        return render_template("admin/import.html", error="请上传 .xlsx 文件")
    try:
        wb = load_workbook(io.BytesIO(f.read()), read_only=True, data_only=True)
        try:
            ws = wb.active
            # A workbook with no active sheet simply contributes no rows.
            rows = [] if ws is None else list(ws.iter_rows(min_row=2, values_only=True))
        finally:
            # Read-only workbooks load lazily and must be closed explicitly.
            wb.close()
    except Exception as e:
        return render_template("admin/import.html", error="解析失败: {}".format(str(e)))

    parsed = []
    for row in rows:
        # Skip rows whose cells are all empty or whitespace.
        if not any(cell is not None and str(cell).strip() for cell in row):
            continue
        d = {}
        for i, h in enumerate(EXCEL_HEADERS):
            v = row[i] if i < len(row) else None
            if v is not None and hasattr(v, "strip"):
                v = v.strip()
            d[h] = v
        parsed.append(d)
    if not parsed:
        return render_template("admin/import.html", error="文件中没有有效数据行")

    # Index existing plans by identity key; the first plan wins on duplicates.
    plan_index = {}
    for p in VPSPlan.query.all():
        plan_index.setdefault(_plan_identity_key(p), p)

    seen_row_keys = set()
    preview_items = []
    for row in parsed:
        key = _row_identity_key(row)
        provider_name = key[0]
        if not provider_name or key in seen_row_keys:
            continue
        seen_row_keys.add(key)
        matched = plan_index.get(key)
        if not matched:
            preview_items.append({
                "action": "add",
                "row": row,
                "changes": [],
                "provider_url_changed": False,
            })
            continue
        changes = _plan_diff(matched, row)
        imported_provider_url = _opt_text(row.get("厂商官网"))
        old_provider_url = _opt_text(matched.provider_rel.official_url if matched.provider_rel else None)
        provider_url_changed = bool(imported_provider_url and imported_provider_url != old_provider_url)
        if changes or provider_url_changed:
            preview_items.append({
                "action": "update",
                "plan_id": matched.id,
                "row": row,
                "changes": changes,
                "provider_url_changed": provider_url_changed,
                "provider_url_old": old_provider_url,
                "provider_url_new": imported_provider_url,
            })
    # NOTE(review): the whole preview is kept in the session; if sessions are
    # cookie-backed, large imports may exceed the cookie size limit. Confirm.
    session["import_preview"] = preview_items
    return redirect(url_for("admin_import_preview"))
@app.route("/admin/import/preview", methods=["GET", "POST"])
@admin_required
def admin_import_preview():
    """Show the pending import preview and apply the rows the admin ticked.

    GET renders the preview stored in the session. POST applies the rows
    whose indices were submitted as ``row_index``, records price history for
    each affected plan, commits once, clears the preview and redirects to
    the dashboard with a summary message.
    """
    preview_items = session.get("import_preview") or []
    add_count = sum(1 for x in preview_items if x.get("action") == "add")
    update_count = sum(1 for x in preview_items if x.get("action") == "update")

    def render_preview(**extra):
        return render_template(
            "admin/import_preview.html",
            rows=list(enumerate(preview_items)),
            add_count=add_count,
            update_count=update_count,
            **extra
        )

    if request.method == "GET":
        return render_preview()
    selected = request.form.getlist("row_index")
    if not selected:
        return render_preview(error="请至少勾选一行")

    # str.isdigit() also accepts characters such as "²" that int() rejects,
    # so guard the conversion instead of trusting the filter alone.
    chosen = set()
    for raw in selected:
        if not raw.isdigit():
            continue
        try:
            chosen.add(int(raw))
        except ValueError:
            continue

    add_applied = 0
    update_applied = 0
    for i in sorted(chosen):
        if i < 0 or i >= len(preview_items):
            continue
        item = preview_items[i]
        row = item.get("row") or {}
        provider = _upsert_provider_from_row(row)
        if not provider:
            continue
        if item.get("action") == "update":
            plan = VPSPlan.query.get(item.get("plan_id"))
            if not plan:
                # The plan vanished since the preview was built: add it anew.
                plan = VPSPlan()
                db.session.add(plan)
                add_applied += 1
            else:
                update_applied += 1
            _fill_plan_from_row(plan, row, provider)
        else:
            plan = VPSPlan()
            _fill_plan_from_row(plan, row, provider)
            db.session.add(plan)
            add_applied += 1
        _record_price_history(plan, source="import")
    db.session.commit()
    session.pop("import_preview", None)
    msg = "已新增 {} 条,更新 {} 条配置".format(add_applied, update_applied)
    return redirect(url_for("admin_dashboard") + "?" + urlencode({"msg": msg}))
if __name__ == "__main__":
    # Direct-run entry point: Flask's built-in server on port 5001, debug mode on.
    app.run(port=5001, debug=True)