Files
mini_code/models/tg_models.py

210 lines
8.7 KiB
Python
Raw Normal View History

2025-12-01 17:22:53 +08:00
from peewee import *
2026-01-12 09:52:44 +08:00
from pathlib import Path
from datetime import datetime
2026-02-13 22:59:13 +08:00
import threading
import time
2025-12-01 17:22:53 +08:00
2026-01-12 09:52:44 +08:00
import pymysql
2026-02-13 22:59:13 +08:00
from playhouse.pool import PooledMySQLDatabase
2026-01-12 09:52:44 +08:00
# 安装 pymysql 作为 MySQLdb
pymysql.install_as_MySQLdb()
2026-02-13 22:59:13 +08:00
# 断连相关的 MySQL 错误码
_RETRY_ERRORS = (
0, # InterfaceError: (0, '')
2006, # MySQL server has gone away
2013, # Lost connection to MySQL server during query
)
class SingleConnectionMySQLDatabase(MySQLDatabase):
"""
多线程共用一个 MySQL 连接
- 使用 thread_safe=False 让所有线程共享同一个 _state一个连接
- RLock 串行化所有 connect/close/execute_sql保证同一时刻只有一个线程在用连接
"""
def __init__(self, *args, **kwargs):
# thread_safe=False → _state 为共享的 _ConnectionState不按线程隔离
super().__init__(*args, thread_safe=False, **kwargs)
self._lock = threading.RLock()
def connect(self, reuse_if_open=False):
with self._lock:
if self.deferred:
raise InterfaceError('Error, database must be initialized before opening a connection.')
conn = self._state.conn
if conn is not None and not self._state.closed:
if reuse_if_open:
return False
try:
conn.ping(reconnect=True)
except Exception:
try:
self._close(conn)
except Exception:
pass
self._state.reset()
if self._state.closed:
self._state.reset()
try:
self._state.set_connection(self._connect())
if self.server_version is None:
self._set_server_version(self._state.conn)
self._initialize_connection(self._state.conn)
except Exception:
self._state.reset()
raise
return True
def execute_sql(self, sql, params=None, commit=True):
with self._lock:
try:
return super().execute_sql(sql, params, commit)
except (OperationalError, InterfaceError) as e:
err_code = getattr(e, 'args', (None,))[0]
if err_code in _RETRY_ERRORS or (isinstance(err_code, int) and err_code in _RETRY_ERRORS):
try:
self.close()
except Exception:
pass
return super().execute_sql(sql, params, commit)
raise
def close(self):
with self._lock:
return super().close()
class ReconnectPooledMySQLDatabase(PooledMySQLDatabase):
"""
带自动重连的 MySQL 连接池
1. 从池中取连接时先 pingping 失败则丢弃并新建连接
2. SQL 执行遇到断连错误时关闭坏连接并自动重试一次
"""
def _connect(self):
"""新建连接后设置 autocommit避免长事务导致连接被 MySQL 断开。"""
conn = super()._connect()
return conn
def connection(self):
"""取连接时先 ping 探活,不通就关掉让池重新创建。"""
conn = super().connection()
try:
conn.ping(reconnect=True)
except Exception:
# ping 失败,连接已死,关掉让 peewee 重新获取
try:
self.close()
except Exception:
pass
conn = super().connection()
return conn
def execute_sql(self, sql, params=None, commit=True):
"""执行 SQL遇到断连错误自动重试一次。"""
try:
return super().execute_sql(sql, params, commit)
except (OperationalError, InterfaceError) as e:
err_code = getattr(e, 'args', (None,))[0]
if err_code in _RETRY_ERRORS or (isinstance(err_code, int) and err_code in _RETRY_ERRORS):
# 关闭坏连接,下次 connection() 会重新获取
try:
self.close()
except Exception:
pass
# 重试一次
return super().execute_sql(sql, params, commit)
raise
2026-01-12 09:52:44 +08:00
# 数据库配置
db_config = {
2026-02-03 13:09:01 +08:00
'database': 'lm',
'user': 'lm',
'password': 'sAn5MfjKApiTBrx4',
2026-02-13 22:59:13 +08:00
'host': '199.168.137.123',
'port': 3309
2026-01-12 09:52:44 +08:00
}
2026-02-13 22:59:13 +08:00
# 全局数据库实例:多线程共用一个连接(串行化访问,无 MaxConnectionsExceeded
db = SingleConnectionMySQLDatabase(
2026-01-12 09:52:44 +08:00
db_config['database'],
user=db_config['user'],
password=db_config['password'],
host=db_config['host'],
port=db_config['port']
)
class TgPhoneDevices(Model):
id = AutoField() # 自动递增的主键
area_code = CharField(null=True, max_length=255, help_text='电话号码区号') # 区号
phone_number = CharField(null=True, max_length=255, help_text='电话号码') # 电话号码
device_model = CharField(null=True, max_length=255, help_text='设备型号') # 设备型号
system_version = CharField(null=True, max_length=255, help_text='系统版本') # 系统版本
app_version = CharField(null=True, max_length=255, help_text='应用版本') # 应用版本
lang_code = CharField(null=True, max_length=255, help_text='语言代码') # 语言代码
system_lang_code = CharField(null=True, max_length=255, help_text='系统语言代码') # 系统语言代码
is_valid_session = IntegerField(null=True, help_text='Session 状态') # Session 状态
api_id = IntegerField(null=True, help_text='API ID') # API ID
kick_status = IntegerField(null=True, help_text='API ID') # API ID
api_hash = CharField(null=True, max_length=255, help_text='API Hash') # API Hash
phone = CharField(null=True, max_length=255, help_text='完整电话号码') # 电话号码
create_time = DateTimeField(default=datetime.now, help_text='记录创建时间')
code = CharField(null=True, ) # 电话号码
to_code = CharField(null=True, ) # 电话号码
server_ip = CharField(null=True, ) # 电话号码
2025-12-01 17:22:53 +08:00
proxy_type = CharField(null=True, ) # 电话号码
addr = CharField(null=True, ) # 电话号码
port = CharField(null=True, ) # 电话号码
user = CharField(null=True, ) # 电话号码
pwd = CharField(null=True, ) # 电话号码
2026-01-12 09:52:44 +08:00
device_start = IntegerField(null=True, ) # API ID
2025-12-01 17:22:53 +08:00
2026-01-12 09:52:44 +08:00
dep_code = CharField(null=True, ) # 电话号码
mnemonic = CharField(null=True, ) # 电话号码
address = CharField(null=True, ) # 电话号码
privkey = CharField(null=True, ) # 电话号码
2025-12-01 17:22:53 +08:00
class Meta:
2026-01-12 09:52:44 +08:00
database = db # 指定数据库
table_name = 'tg_phone_devices' # 指定表名称
2025-12-01 17:22:53 +08:00
2026-01-12 09:52:44 +08:00
class TgPhoneDevices1(Model):
id = AutoField() # 自动递增的主键
area_code = CharField(null=True, max_length=255, help_text='电话号码区号') # 区号
phone_number = CharField(null=True, max_length=255, help_text='电话号码') # 电话号码
device_model = CharField(null=True, max_length=255, help_text='设备型号') # 设备型号
system_version = CharField(null=True, max_length=255, help_text='系统版本') # 系统版本
app_version = CharField(null=True, max_length=255, help_text='应用版本') # 应用版本
lang_code = CharField(null=True, max_length=255, help_text='语言代码') # 语言代码
system_lang_code = CharField(null=True, max_length=255, help_text='系统语言代码') # 系统语言代码
is_valid_session = IntegerField(null=True, help_text='Session 状态') # Session 状态
api_id = IntegerField(null=True, help_text='API ID') # API ID
kick_status = IntegerField(null=True, help_text='API ID') # API ID
api_hash = CharField(null=True, max_length=255, help_text='API Hash') # API Hash
phone = CharField(null=True, max_length=255, help_text='完整电话号码') # 电话号码
create_time = DateTimeField(default=datetime.now, help_text='记录创建时间')
code = CharField(null=True, ) # 电话号码
to_code = CharField(null=True, ) # 电话号码
server_ip = CharField(null=True, ) # 电话号码
proxy_type = CharField(null=True, ) # 电话号码
addr = CharField(null=True, ) # 电话号码
port = CharField(null=True, ) # 电话号码
user = CharField(null=True, ) # 电话号码
pwd = CharField(null=True, ) # 电话号码
device_start = IntegerField(null=True, ) # API ID
2026-02-13 22:59:13 +08:00
tg_web_app_data = CharField(null=True, max_length=2048, help_text='Blum WebApp tgWebAppData token') # Blum token
2025-12-01 17:22:53 +08:00
class Meta:
2026-01-12 09:52:44 +08:00
database = db # 指定数据库
table_name = 'tg_phone_devices_copy1' # 指定表名称
2025-12-01 17:22:53 +08:00
2026-01-12 09:52:44 +08:00
if __name__ == '__main__':
db.create_tables([TgPhoneDevices, TgPhoneDevices1])