From a06c4cf12f22c3e7d918f5ef4282d7e3cd66bcc5 Mon Sep 17 00:00:00 2001 From: 27942 Date: Mon, 12 Jan 2026 14:27:56 +0800 Subject: [PATCH] fewfef --- evm_助记词查询余额.py | 415 +++++++++++++++++++ weex/长期持有信号/读取数据库数据-30分钟版.py | 14 +- 2 files changed, 427 insertions(+), 2 deletions(-) create mode 100644 evm_助记词查询余额.py diff --git a/evm_助记词查询余额.py b/evm_助记词查询余额.py new file mode 100644 index 0000000..7e8f301 --- /dev/null +++ b/evm_助记词查询余额.py @@ -0,0 +1,415 @@ +""" +EVM助记词碰撞工具 - 高速版本 +批量生成助记词并检查余额,发现有钱包自动保存 + +依赖安装: + pip install mnemonic eth-account web3 loguru + +使用方法: + python evm_助记词查询余额.py +""" + +import os +import json +import threading +from datetime import datetime +from mnemonic import Mnemonic +from eth_account import Account +from web3 import Web3 +from loguru import logger +import time + +# 尝试导入POA中间件 +try: + from web3.middleware import geth_poa_middleware +except ImportError: + try: + from web3.middleware.geth_poa import geth_poa_middleware + except ImportError: + geth_poa_middleware = None + +# 配置日志 +logger.remove() +logger.add(lambda msg: print(msg, end=''), format="{time:HH:mm:ss} | {level: <8} | {message}", level="INFO") + +# 全局统计 +stats = { + 'total_checked': 0, + 'found_balance': 0, + 'start_time': time.time(), + 'lock': threading.Lock() +} + +# 结果保存文件 +RESULTS_FILE = "found_wallets.txt" +SAVE_LOCK = threading.Lock() + + +class FastEVMWallet: + """高速EVM钱包碰撞工具""" + + # 优化的RPC端点(使用多个备用节点提高速度) + RPC_ENDPOINTS = { + "ethereum": [ + "https://eth.llamarpc.com", + "https://rpc.ankr.com/eth", + "https://ethereum.publicnode.com", + ], + "bsc": [ + "https://bsc-dataseed1.binance.org", + "https://bsc-dataseed2.binance.org", + "https://rpc.ankr.com/bsc", + ], + # "polygon": [ + # "https://polygon-rpc.com", + # "https://rpc.ankr.com/polygon", + # ], + # "arbitrum": [ + # "https://arb1.arbitrum.io/rpc", + # "https://rpc.ankr.com/arbitrum", + # ], + # "optimism": [ + # "https://mainnet.optimism.io", + # "https://rpc.ankr.com/optimism", + # ], + "base": [ + "https://mainnet.base.org", + "https://rpc.ankr.com/base", + ], + } + + def __init__(self, chains=None): + """ + 初始化 + + Args: + chains: 要查询的链列表,None表示查询所有链 + """ + self.mnemo = Mnemonic("english") + Account.enable_unaudited_hdwallet_features() + + # 选择要查询的链(默认只查询主要链以提高速度) + if chains is None: + self.chains = ["ethereum", "bsc", "polygon", "arbitrum", "base"] + else: + self.chains = chains + + # 为每个链创建Web3连接池 + self.w3_pools = {} + self._init_connections() + + def _init_connections(self): + """初始化RPC连接""" + logger.info("正在初始化RPC连接...") + for chain in self.chains: + if chain in self.RPC_ENDPOINTS: + # 尝试所有RPC节点,直到找到一个可用的 + w3 = None + for rpc_url in self.RPC_ENDPOINTS[chain]: + try: + # 设置更短的超时时间,避免阻塞 + w3 = Web3(Web3.HTTPProvider(rpc_url, request_kwargs={'timeout': 3})) + + # 某些链需要POA中间件 + if chain in ["bsc", "polygon"] and geth_poa_middleware is not None: + try: + w3.middleware_onion.inject(geth_poa_middleware, layer=0) + except: + pass + + # 测试连接(快速测试) + try: + if w3.is_connected(): + self.w3_pools[chain] = w3 + logger.info(f"✓ {chain} 连接成功: {rpc_url}") + break + except: + continue + except Exception as e: + continue + + if chain not in self.w3_pools: + logger.warning(f"✗ {chain} 连接失败,将跳过该链") + + def generate_mnemonic(self, strength=128): + """快速生成助记词""" + return self.mnemo.generate(strength=strength) + + def mnemonic_to_address(self, mnemonic, account_index=0): + """ + 从助记词快速生成地址(不生成私钥,节省时间) + + Returns: + str: 钱包地址,失败返回None + """ + try: + account = Account.from_mnemonic(mnemonic, account_path=f"m/44'/60'/0'/0/{account_index}") + return account.address + except: + return None + + def quick_check_balance(self, address): + """ + 快速检查地址是否有余额(只检查主要链,一旦发现余额就返回) + + Returns: + dict: 如果有余额返回余额信息,否则返回None + """ + # 只检查最快的链(ethereum和bsc),提高速度 + priority_chains = ["ethereum", "bsc"] + + for chain in priority_chains: + if chain not in self.chains: + continue + try: + w3 = self.w3_pools.get(chain) + if not w3: + continue + + # 查询余额(设置超时,避免阻塞) + try: + balance_wei = w3.eth.get_balance(address) + + if balance_wei > 0: + balance_eth = Web3.from_wei(balance_wei, 'ether') + return { + 'chain': chain, + 'address': address, + 'balance_wei': balance_wei, + 'balance_eth': float(balance_eth), + 'balance_formatted': f"{balance_eth:.6f} ETH" + } + except Exception as e: + # 查询失败,尝试下一个链 + continue + except: + continue + + return None + + def check_all_chains(self, address): + """ + 检查所有链的余额(发现余额后调用) + + Returns: + list: 所有有余额的链信息 + """ + results = [] + for chain in self.chains: + try: + w3 = self.w3_pools.get(chain) + if not w3 or not w3.is_connected(): + continue + + balance_wei = w3.eth.get_balance(address) + if balance_wei > 0: + balance_eth = Web3.from_wei(balance_wei, 'ether') + results.append({ + 'chain': chain, + 'balance_wei': balance_wei, + 'balance_eth': float(balance_eth), + 'balance_formatted': f"{balance_eth:.6f} ETH" + }) + except: + continue + + return results + + +def save_found_wallet(mnemonic, address, private_key, balances): + """保存发现的钱包信息""" + with SAVE_LOCK: + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + info = { + 'timestamp': timestamp, + 'mnemonic': mnemonic, + 'address': address, + 'private_key': private_key, + 'balances': balances + } + + # 保存到文件 + with open(RESULTS_FILE, 'a', encoding='utf-8') as f: + f.write("="*80 + "\n") + f.write(f"发现时间: {timestamp}\n") + f.write(f"助记词: {mnemonic}\n") + f.write(f"地址: {address}\n") + f.write(f"私钥: {private_key}\n") + f.write("余额信息:\n") + for balance in balances: + f.write(f" {balance['chain'].upper()}: {balance['balance_formatted']}\n") + f.write("="*80 + "\n\n") + + # 同时保存JSON格式 + json_file = RESULTS_FILE.replace('.txt', '.json') + try: + if os.path.exists(json_file): + with open(json_file, 'r', encoding='utf-8') as f: + data = json.load(f) + else: + data = [] + + data.append(info) + with open(json_file, 'w', encoding='utf-8') as f: + json.dump(data, f, ensure_ascii=False, indent=2) + except: + pass + + +def worker_thread(wallet_tool, thread_id): + """工作线程:生成助记词并检查余额""" + global stats + + # 每个线程独立计数,减少锁竞争 + local_count = 0 + thread_start_time = time.time() + + while True: + try: + # 生成助记词 + mnemonic = wallet_tool.generate_mnemonic() + + # 生成地址 + address = wallet_tool.mnemonic_to_address(mnemonic) + if not address: + local_count += 1 + # 立即更新统计(不等待) + if local_count >= 1: # 每次都更新,确保统计正常 + with stats['lock']: + stats['total_checked'] += local_count + local_count = 0 + continue + + # 先更新统计(在查询余额之前) + local_count += 1 + if local_count >= 1: # 每次都更新 + with stats['lock']: + stats['total_checked'] += local_count + total = stats['total_checked'] + found = stats['found_balance'] + if total % 20 == 0: # 每20次打印一次 + elapsed = time.time() - stats['start_time'] + speed = total / elapsed if elapsed > 0 else 0 + logger.info(f"已检查: {total} | 发现: {found} | 速度: {speed:.1f} 个/秒") + local_count = 0 + + # 快速检查余额(只检查主要链,设置超时) + try: + balance_info = wallet_tool.quick_check_balance(address) + except: + balance_info = None + + # 如果发现余额 + if balance_info: + # 生成私钥 + account = Account.from_mnemonic(mnemonic, account_path="m/44'/60'/0'/0/0") + private_key = account.key.hex() + + # 检查所有链的余额 + all_balances = wallet_tool.check_all_chains(address) + + # 保存结果 + save_found_wallet(mnemonic, address, private_key, all_balances) + + # 更新统计并打印 + with stats['lock']: + stats['found_balance'] += 1 + + logger.success(f"🎉 发现余额钱包!") + logger.success(f"地址: {address}") + logger.success(f"助记词: {mnemonic}") + for bal in all_balances: + logger.success(f" {bal['chain'].upper()}: {bal['balance_formatted']}") + + except Exception as e: + # 即使出错也更新统计 + local_count += 1 + if local_count >= 1: + with stats['lock']: + stats['total_checked'] += local_count + local_count = 0 + continue + + +def main(): + """主函数""" + print("="*80) + print("EVM助记词碰撞工具 - 高速版本") + print("="*80) + print(f"结果将保存到: {RESULTS_FILE}") + print(f"查询的链: ethereum, bsc, polygon, arbitrum, base") + print("="*80) + print() + + # 选择要查询的链(可以修改这里) + chains = ["ethereum", "bsc", "polygon", "arbitrum", "base"] + + # 创建钱包工具 + wallet_tool = FastEVMWallet(chains=chains) + + # 设置线程数(根据CPU核心数调整) + num_threads = os.cpu_count() or 4 + print(f"启动 {num_threads} 个工作线程...") + print("按 Ctrl+C 停止程序\n") + + # 创建线程 + threads = [] + + for i in range(num_threads): + t = threading.Thread(target=worker_thread, args=(wallet_tool, i), daemon=True) + t.start() + threads.append(t) + + # 等待一小段时间确保线程启动 + time.sleep(1) + + # 检查是否有线程在运行 + active_threads = sum(1 for t in threads if t.is_alive()) + if active_threads == 0: + logger.error("所有工作线程都停止了,请检查RPC连接") + return + else: + logger.info(f"✓ {active_threads} 个工作线程正在运行") + + # 测试:生成一个测试地址并查询,验证功能是否正常 + logger.info("正在测试功能...") + test_mnemonic = wallet_tool.generate_mnemonic() + test_address = wallet_tool.mnemonic_to_address(test_mnemonic) + if test_address: + logger.info(f"测试地址生成成功: {test_address[:10]}...") + test_balance = wallet_tool.quick_check_balance(test_address) + if test_balance is None: + logger.info("测试余额查询成功(无余额,正常)") + else: + logger.warning(f"测试发现余额!这不太可能,请检查") + else: + logger.error("测试地址生成失败!") + + try: + # 主线程等待 + last_count = 0 + while True: + time.sleep(2) # 每2秒更新一次显示 + # 定期打印统计信息 + elapsed = time.time() - stats['start_time'] + current_count = stats['total_checked'] + + # 如果计数没有变化,可能是线程有问题 + if elapsed > 10 and current_count == last_count and current_count == 0: + logger.warning("警告:工作线程可能没有正常工作,请检查RPC连接") + + if elapsed > 0: + speed = current_count / elapsed if current_count > 0 else 0 + print(f"\r运行时间: {int(elapsed)}秒 | 已检查: {current_count} | 发现: {stats['found_balance']} | 速度: {speed:.1f} 个/秒", end='', flush=True) + + last_count = current_count + except KeyboardInterrupt: + print("\n\n正在停止...") + print(f"\n最终统计:") + print(f" 总检查数: {stats['total_checked']}") + print(f" 发现余额: {stats['found_balance']}") + print(f" 结果已保存到: {RESULTS_FILE}") + + +if __name__ == "__main__": + main() diff --git a/weex/长期持有信号/读取数据库数据-30分钟版.py b/weex/长期持有信号/读取数据库数据-30分钟版.py index 6772dc1..db9663e 100644 --- a/weex/长期持有信号/读取数据库数据-30分钟版.py +++ b/weex/长期持有信号/读取数据库数据-30分钟版.py @@ -320,11 +320,21 @@ def backtest_15m_trend_optimized(dates: List[str]): # ========================= 运行示例(优化版盈利计算) ========================= if __name__ == '__main__': dates = [] + # 获取当前日期 + today = datetime.datetime.now() + target_year = 2026 + for month in range(1, 2): # 获取该月的实际天数 - days_in_month = calendar.monthrange(2026, month)[1] + days_in_month = calendar.monthrange(target_year, month)[1] for day in range(1, days_in_month + 1): - dates.append(f"2025-{month:02d}-{day:02d}") + # 只添加今天及之前的日期 + date_str = f"{target_year}-{month:02d}-{day:02d}" + date_obj = datetime.datetime.strptime(date_str, '%Y-%m-%d') + # 如果日期在今天之后,跳过 + if date_obj > today: + break + dates.append(date_str) print(dates)