haha
This commit is contained in:
83
models/migrate_tg_phone_devices.py
Normal file
83
models/migrate_tg_phone_devices.py
Normal file
@@ -0,0 +1,83 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
将 tg_phone_devices.py 中 TgPhoneDevices、TgPhoneDevices1 的数据(SQLite haha.db)
|
||||
迁移到 tg_models.py 对应模型所在的数据库(MySQL)。
|
||||
"""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# 确保项目根目录在 path 中
|
||||
project_root = Path(__file__).resolve().parent.parent
|
||||
if str(project_root) not in sys.path:
|
||||
sys.path.insert(0, str(project_root))
|
||||
|
||||
# 先导入源模型(SQLite)
|
||||
import models.tg_phone_devices as src_module
|
||||
# 再导入目标模型(MySQL)
|
||||
import models.tg_models as tgt_module
|
||||
|
||||
# 要迁移的 (源模型, 目标模型, 表名描述) 列表
|
||||
MODEL_PAIRS = [
|
||||
(src_module.TgPhoneDevices, tgt_module.TgPhoneDevices, 'tg_phone_devices'),
|
||||
(src_module.TgPhoneDevices1, tgt_module.TgPhoneDevices1, 'tg_phone_devices_copy1'),
|
||||
]
|
||||
|
||||
|
||||
def get_data_fields(model):
|
||||
"""获取需要迁移的字段名"""
|
||||
return [f.name for f in model._meta.sorted_fields]
|
||||
|
||||
|
||||
def migrate_one(SourceModel, TargetModel, table_label, batch_size=500, truncate_before=False):
|
||||
"""
|
||||
迁移单个模型的数据。
|
||||
:param table_label: 表名描述,用于日志
|
||||
"""
|
||||
fields = get_data_fields(SourceModel)
|
||||
total = SourceModel.select().count()
|
||||
print(f"[{table_label}] 源表共 {total} 条记录,开始迁移...")
|
||||
|
||||
if truncate_before and total > 0:
|
||||
TargetModel.delete().execute()
|
||||
print(f" 已清空目标表 {table_label}。")
|
||||
|
||||
migrated = 0
|
||||
skipped = 0
|
||||
errors = 0
|
||||
|
||||
for offset in range(0, total, batch_size):
|
||||
batch = list(SourceModel.select().offset(offset).limit(batch_size))
|
||||
for rec in batch:
|
||||
try:
|
||||
row = {f: getattr(rec, f) for f in fields}
|
||||
TargetModel.insert(**row).execute()
|
||||
migrated += 1
|
||||
except Exception as e:
|
||||
if "Duplicate" in str(e) or "1062" in str(e):
|
||||
skipped += 1
|
||||
else:
|
||||
errors += 1
|
||||
print(f" 插入失败 id={getattr(rec, 'id', '?')}: {e}")
|
||||
|
||||
if total > batch_size and (offset + batch_size) % (batch_size * 2) == 0 or offset + batch_size >= total:
|
||||
print(f" 已处理 {min(offset + batch_size, total)}/{total} 条")
|
||||
|
||||
print(f"[{table_label}] 迁移完成: 成功 {migrated}, 跳过(重复) {skipped}, 错误 {errors}\n")
|
||||
|
||||
|
||||
def migrate_all(batch_size=500, truncate_before=False):
|
||||
"""迁移 TgPhoneDevices 和 TgPhoneDevices1。"""
|
||||
tgt_module.db.connect()
|
||||
for SourceModel, TargetModel, table_label in MODEL_PAIRS:
|
||||
tgt_module.db.create_tables([TargetModel], safe=True)
|
||||
migrate_one(SourceModel, TargetModel, table_label, batch_size=batch_size, truncate_before=truncate_before)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description='TgPhoneDevices / TgPhoneDevices1 SQLite -> MySQL 数据迁移')
|
||||
parser.add_argument('--batch-size', type=int, default=500, help='每批条数')
|
||||
parser.add_argument('--truncate', action='store_true', help='迁移前清空目标表(两个表都会清空)')
|
||||
args = parser.parse_args()
|
||||
migrate_all(batch_size=args.batch_size, truncate_before=args.truncate)
|
||||
Reference in New Issue
Block a user