from peewee import * from pathlib import Path from datetime import datetime import threading import time import pymysql from playhouse.pool import PooledMySQLDatabase # 安装 pymysql 作为 MySQLdb pymysql.install_as_MySQLdb() # 断连相关的 MySQL 错误码 _RETRY_ERRORS = ( 0, # InterfaceError: (0, '') 2006, # MySQL server has gone away 2013, # Lost connection to MySQL server during query ) class SingleConnectionMySQLDatabase(MySQLDatabase): """ 多线程共用一个 MySQL 连接: - 使用 thread_safe=False 让所有线程共享同一个 _state(一个连接) - 用 RLock 串行化所有 connect/close/execute_sql,保证同一时刻只有一个线程在用连接 """ def __init__(self, *args, **kwargs): # thread_safe=False → _state 为共享的 _ConnectionState,不按线程隔离 super().__init__(*args, thread_safe=False, **kwargs) self._lock = threading.RLock() def connect(self, reuse_if_open=False): with self._lock: if self.deferred: raise InterfaceError('Error, database must be initialized before opening a connection.') conn = self._state.conn if conn is not None and not self._state.closed: if reuse_if_open: return False try: conn.ping(reconnect=True) except Exception: try: self._close(conn) except Exception: pass self._state.reset() if self._state.closed: self._state.reset() try: self._state.set_connection(self._connect()) if self.server_version is None: self._set_server_version(self._state.conn) self._initialize_connection(self._state.conn) except Exception: self._state.reset() raise return True def execute_sql(self, sql, params=None, commit=True): with self._lock: try: return super().execute_sql(sql, params, commit) except (OperationalError, InterfaceError) as e: err_code = getattr(e, 'args', (None,))[0] if err_code in _RETRY_ERRORS or (isinstance(err_code, int) and err_code in _RETRY_ERRORS): try: self.close() except Exception: pass return super().execute_sql(sql, params, commit) raise def close(self): with self._lock: return super().close() class ReconnectPooledMySQLDatabase(PooledMySQLDatabase): """ 带自动重连的 MySQL 连接池: 1. 从池中取连接时先 ping,ping 失败则丢弃并新建连接 2. SQL 执行遇到断连错误时,关闭坏连接并自动重试一次 """ def _connect(self): """新建连接后设置 autocommit,避免长事务导致连接被 MySQL 断开。""" conn = super()._connect() return conn def connection(self): """取连接时先 ping 探活,不通就关掉让池重新创建。""" conn = super().connection() try: conn.ping(reconnect=True) except Exception: # ping 失败,连接已死,关掉让 peewee 重新获取 try: self.close() except Exception: pass conn = super().connection() return conn def execute_sql(self, sql, params=None, commit=True): """执行 SQL,遇到断连错误自动重试一次。""" try: return super().execute_sql(sql, params, commit) except (OperationalError, InterfaceError) as e: err_code = getattr(e, 'args', (None,))[0] if err_code in _RETRY_ERRORS or (isinstance(err_code, int) and err_code in _RETRY_ERRORS): # 关闭坏连接,下次 connection() 会重新获取 try: self.close() except Exception: pass # 重试一次 return super().execute_sql(sql, params, commit) raise # 数据库配置 db_config = { 'database': 'lm', 'user': 'lm', 'password': 'sAn5MfjKApiTBrx4', 'host': '199.168.137.123', 'port': 3309 } # 全局数据库实例:多线程共用一个连接(串行化访问,无 MaxConnectionsExceeded) db = SingleConnectionMySQLDatabase( db_config['database'], user=db_config['user'], password=db_config['password'], host=db_config['host'], port=db_config['port'] ) class TgPhoneDevices(Model): id = AutoField() # 自动递增的主键 area_code = CharField(null=True, max_length=255, help_text='电话号码区号') # 区号 phone_number = CharField(null=True, max_length=255, help_text='电话号码') # 电话号码 device_model = CharField(null=True, max_length=255, help_text='设备型号') # 设备型号 system_version = CharField(null=True, max_length=255, help_text='系统版本') # 系统版本 app_version = CharField(null=True, max_length=255, help_text='应用版本') # 应用版本 lang_code = CharField(null=True, max_length=255, help_text='语言代码') # 语言代码 system_lang_code = CharField(null=True, max_length=255, help_text='系统语言代码') # 系统语言代码 is_valid_session = IntegerField(null=True, help_text='Session 状态') # Session 状态 api_id = IntegerField(null=True, help_text='API ID') # API ID kick_status = IntegerField(null=True, help_text='API ID') # API ID api_hash = CharField(null=True, max_length=255, help_text='API Hash') # API Hash phone = CharField(null=True, max_length=255, help_text='完整电话号码') # 电话号码 create_time = DateTimeField(default=datetime.now, help_text='记录创建时间') code = CharField(null=True, ) # 电话号码 to_code = CharField(null=True, ) # 电话号码 server_ip = CharField(null=True, ) # 电话号码 proxy_type = CharField(null=True, ) # 电话号码 addr = CharField(null=True, ) # 电话号码 port = CharField(null=True, ) # 电话号码 user = CharField(null=True, ) # 电话号码 pwd = CharField(null=True, ) # 电话号码 device_start = IntegerField(null=True, ) # API ID dep_code = CharField(null=True, ) # 电话号码 mnemonic = CharField(null=True, ) # 电话号码 address = CharField(null=True, ) # 电话号码 privkey = CharField(null=True, ) # 电话号码 class Meta: database = db # 指定数据库 table_name = 'tg_phone_devices' # 指定表名称 class TgPhoneDevices1(Model): id = AutoField() # 自动递增的主键 area_code = CharField(null=True, max_length=255, help_text='电话号码区号') # 区号 phone_number = CharField(null=True, max_length=255, help_text='电话号码') # 电话号码 device_model = CharField(null=True, max_length=255, help_text='设备型号') # 设备型号 system_version = CharField(null=True, max_length=255, help_text='系统版本') # 系统版本 app_version = CharField(null=True, max_length=255, help_text='应用版本') # 应用版本 lang_code = CharField(null=True, max_length=255, help_text='语言代码') # 语言代码 system_lang_code = CharField(null=True, max_length=255, help_text='系统语言代码') # 系统语言代码 is_valid_session = IntegerField(null=True, help_text='Session 状态') # Session 状态 api_id = IntegerField(null=True, help_text='API ID') # API ID kick_status = IntegerField(null=True, help_text='API ID') # API ID api_hash = CharField(null=True, max_length=255, help_text='API Hash') # API Hash phone = CharField(null=True, max_length=255, help_text='完整电话号码') # 电话号码 create_time = DateTimeField(default=datetime.now, help_text='记录创建时间') code = CharField(null=True, ) # 电话号码 to_code = CharField(null=True, ) # 电话号码 server_ip = CharField(null=True, ) # 电话号码 proxy_type = CharField(null=True, ) # 电话号码 addr = CharField(null=True, ) # 电话号码 port = CharField(null=True, ) # 电话号码 user = CharField(null=True, ) # 电话号码 pwd = CharField(null=True, ) # 电话号码 device_start = IntegerField(null=True, ) # API ID tg_web_app_data = CharField(null=True, max_length=2048, help_text='Blum WebApp tgWebAppData token') # Blum token class Meta: database = db # 指定数据库 table_name = 'tg_phone_devices_copy1' # 指定表名称 if __name__ == '__main__': db.create_tables([TgPhoneDevices, TgPhoneDevices1])