This commit is contained in:
27942
2026-01-12 14:27:56 +08:00
parent 4ad7545422
commit a06c4cf12f
2 changed files with 427 additions and 2 deletions

View File

@@ -0,0 +1,415 @@
"""
EVM助记词碰撞工具 - 高速版本
批量生成助记词并检查余额,发现有钱包自动保存
依赖安装:
pip install mnemonic eth-account web3 loguru
使用方法:
python evm_助记词查询余额.py
"""
import os
import json
import threading
from datetime import datetime
from mnemonic import Mnemonic
from eth_account import Account
from web3 import Web3
from loguru import logger
import time
# 尝试导入POA中间件
try:
from web3.middleware import geth_poa_middleware
except ImportError:
try:
from web3.middleware.geth_poa import geth_poa_middleware
except ImportError:
geth_poa_middleware = None
# 配置日志
logger.remove()
logger.add(lambda msg: print(msg, end=''), format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | {message}", level="INFO")
# 全局统计
stats = {
'total_checked': 0,
'found_balance': 0,
'start_time': time.time(),
'lock': threading.Lock()
}
# 结果保存文件
RESULTS_FILE = "found_wallets.txt"
SAVE_LOCK = threading.Lock()
class FastEVMWallet:
"""高速EVM钱包碰撞工具"""
# 优化的RPC端点使用多个备用节点提高速度
RPC_ENDPOINTS = {
"ethereum": [
"https://eth.llamarpc.com",
"https://rpc.ankr.com/eth",
"https://ethereum.publicnode.com",
],
"bsc": [
"https://bsc-dataseed1.binance.org",
"https://bsc-dataseed2.binance.org",
"https://rpc.ankr.com/bsc",
],
# "polygon": [
# "https://polygon-rpc.com",
# "https://rpc.ankr.com/polygon",
# ],
# "arbitrum": [
# "https://arb1.arbitrum.io/rpc",
# "https://rpc.ankr.com/arbitrum",
# ],
# "optimism": [
# "https://mainnet.optimism.io",
# "https://rpc.ankr.com/optimism",
# ],
"base": [
"https://mainnet.base.org",
"https://rpc.ankr.com/base",
],
}
def __init__(self, chains=None):
"""
初始化
Args:
chains: 要查询的链列表None表示查询所有链
"""
self.mnemo = Mnemonic("english")
Account.enable_unaudited_hdwallet_features()
# 选择要查询的链(默认只查询主要链以提高速度)
if chains is None:
self.chains = ["ethereum", "bsc", "polygon", "arbitrum", "base"]
else:
self.chains = chains
# 为每个链创建Web3连接池
self.w3_pools = {}
self._init_connections()
def _init_connections(self):
"""初始化RPC连接"""
logger.info("正在初始化RPC连接...")
for chain in self.chains:
if chain in self.RPC_ENDPOINTS:
# 尝试所有RPC节点直到找到一个可用的
w3 = None
for rpc_url in self.RPC_ENDPOINTS[chain]:
try:
# 设置更短的超时时间,避免阻塞
w3 = Web3(Web3.HTTPProvider(rpc_url, request_kwargs={'timeout': 3}))
# 某些链需要POA中间件
if chain in ["bsc", "polygon"] and geth_poa_middleware is not None:
try:
w3.middleware_onion.inject(geth_poa_middleware, layer=0)
except:
pass
# 测试连接(快速测试)
try:
if w3.is_connected():
self.w3_pools[chain] = w3
logger.info(f"{chain} 连接成功: {rpc_url}")
break
except:
continue
except Exception as e:
continue
if chain not in self.w3_pools:
logger.warning(f"{chain} 连接失败,将跳过该链")
def generate_mnemonic(self, strength=128):
"""快速生成助记词"""
return self.mnemo.generate(strength=strength)
def mnemonic_to_address(self, mnemonic, account_index=0):
"""
从助记词快速生成地址(不生成私钥,节省时间)
Returns:
str: 钱包地址失败返回None
"""
try:
account = Account.from_mnemonic(mnemonic, account_path=f"m/44'/60'/0'/0/{account_index}")
return account.address
except:
return None
def quick_check_balance(self, address):
"""
快速检查地址是否有余额(只检查主要链,一旦发现余额就返回)
Returns:
dict: 如果有余额返回余额信息否则返回None
"""
# 只检查最快的链ethereum和bsc提高速度
priority_chains = ["ethereum", "bsc"]
for chain in priority_chains:
if chain not in self.chains:
continue
try:
w3 = self.w3_pools.get(chain)
if not w3:
continue
# 查询余额(设置超时,避免阻塞)
try:
balance_wei = w3.eth.get_balance(address)
if balance_wei > 0:
balance_eth = Web3.from_wei(balance_wei, 'ether')
return {
'chain': chain,
'address': address,
'balance_wei': balance_wei,
'balance_eth': float(balance_eth),
'balance_formatted': f"{balance_eth:.6f} ETH"
}
except Exception as e:
# 查询失败,尝试下一个链
continue
except:
continue
return None
def check_all_chains(self, address):
"""
检查所有链的余额(发现余额后调用)
Returns:
list: 所有有余额的链信息
"""
results = []
for chain in self.chains:
try:
w3 = self.w3_pools.get(chain)
if not w3 or not w3.is_connected():
continue
balance_wei = w3.eth.get_balance(address)
if balance_wei > 0:
balance_eth = Web3.from_wei(balance_wei, 'ether')
results.append({
'chain': chain,
'balance_wei': balance_wei,
'balance_eth': float(balance_eth),
'balance_formatted': f"{balance_eth:.6f} ETH"
})
except:
continue
return results
def save_found_wallet(mnemonic, address, private_key, balances):
"""保存发现的钱包信息"""
with SAVE_LOCK:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
info = {
'timestamp': timestamp,
'mnemonic': mnemonic,
'address': address,
'private_key': private_key,
'balances': balances
}
# 保存到文件
with open(RESULTS_FILE, 'a', encoding='utf-8') as f:
f.write("="*80 + "\n")
f.write(f"发现时间: {timestamp}\n")
f.write(f"助记词: {mnemonic}\n")
f.write(f"地址: {address}\n")
f.write(f"私钥: {private_key}\n")
f.write("余额信息:\n")
for balance in balances:
f.write(f" {balance['chain'].upper()}: {balance['balance_formatted']}\n")
f.write("="*80 + "\n\n")
# 同时保存JSON格式
json_file = RESULTS_FILE.replace('.txt', '.json')
try:
if os.path.exists(json_file):
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
else:
data = []
data.append(info)
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except:
pass
def worker_thread(wallet_tool, thread_id):
"""工作线程:生成助记词并检查余额"""
global stats
# 每个线程独立计数,减少锁竞争
local_count = 0
thread_start_time = time.time()
while True:
try:
# 生成助记词
mnemonic = wallet_tool.generate_mnemonic()
# 生成地址
address = wallet_tool.mnemonic_to_address(mnemonic)
if not address:
local_count += 1
# 立即更新统计(不等待)
if local_count >= 1: # 每次都更新,确保统计正常
with stats['lock']:
stats['total_checked'] += local_count
local_count = 0
continue
# 先更新统计(在查询余额之前)
local_count += 1
if local_count >= 1: # 每次都更新
with stats['lock']:
stats['total_checked'] += local_count
total = stats['total_checked']
found = stats['found_balance']
if total % 20 == 0: # 每20次打印一次
elapsed = time.time() - stats['start_time']
speed = total / elapsed if elapsed > 0 else 0
logger.info(f"已检查: {total} | 发现: {found} | 速度: {speed:.1f} 个/秒")
local_count = 0
# 快速检查余额(只检查主要链,设置超时)
try:
balance_info = wallet_tool.quick_check_balance(address)
except:
balance_info = None
# 如果发现余额
if balance_info:
# 生成私钥
account = Account.from_mnemonic(mnemonic, account_path="m/44'/60'/0'/0/0")
private_key = account.key.hex()
# 检查所有链的余额
all_balances = wallet_tool.check_all_chains(address)
# 保存结果
save_found_wallet(mnemonic, address, private_key, all_balances)
# 更新统计并打印
with stats['lock']:
stats['found_balance'] += 1
logger.success(f"🎉 发现余额钱包!")
logger.success(f"地址: {address}")
logger.success(f"助记词: {mnemonic}")
for bal in all_balances:
logger.success(f" {bal['chain'].upper()}: {bal['balance_formatted']}")
except Exception as e:
# 即使出错也更新统计
local_count += 1
if local_count >= 1:
with stats['lock']:
stats['total_checked'] += local_count
local_count = 0
continue
def main():
"""主函数"""
print("="*80)
print("EVM助记词碰撞工具 - 高速版本")
print("="*80)
print(f"结果将保存到: {RESULTS_FILE}")
print(f"查询的链: ethereum, bsc, polygon, arbitrum, base")
print("="*80)
print()
# 选择要查询的链(可以修改这里)
chains = ["ethereum", "bsc", "polygon", "arbitrum", "base"]
# 创建钱包工具
wallet_tool = FastEVMWallet(chains=chains)
# 设置线程数根据CPU核心数调整
num_threads = os.cpu_count() or 4
print(f"启动 {num_threads} 个工作线程...")
print("按 Ctrl+C 停止程序\n")
# 创建线程
threads = []
for i in range(num_threads):
t = threading.Thread(target=worker_thread, args=(wallet_tool, i), daemon=True)
t.start()
threads.append(t)
# 等待一小段时间确保线程启动
time.sleep(1)
# 检查是否有线程在运行
active_threads = sum(1 for t in threads if t.is_alive())
if active_threads == 0:
logger.error("所有工作线程都停止了请检查RPC连接")
return
else:
logger.info(f"{active_threads} 个工作线程正在运行")
# 测试:生成一个测试地址并查询,验证功能是否正常
logger.info("正在测试功能...")
test_mnemonic = wallet_tool.generate_mnemonic()
test_address = wallet_tool.mnemonic_to_address(test_mnemonic)
if test_address:
logger.info(f"测试地址生成成功: {test_address[:10]}...")
test_balance = wallet_tool.quick_check_balance(test_address)
if test_balance is None:
logger.info("测试余额查询成功(无余额,正常)")
else:
logger.warning(f"测试发现余额!这不太可能,请检查")
else:
logger.error("测试地址生成失败!")
try:
# 主线程等待
last_count = 0
while True:
time.sleep(2) # 每2秒更新一次显示
# 定期打印统计信息
elapsed = time.time() - stats['start_time']
current_count = stats['total_checked']
# 如果计数没有变化,可能是线程有问题
if elapsed > 10 and current_count == last_count and current_count == 0:
logger.warning("警告工作线程可能没有正常工作请检查RPC连接")
if elapsed > 0:
speed = current_count / elapsed if current_count > 0 else 0
print(f"\r运行时间: {int(elapsed)}秒 | 已检查: {current_count} | 发现: {stats['found_balance']} | 速度: {speed:.1f} 个/秒", end='', flush=True)
last_count = current_count
except KeyboardInterrupt:
print("\n\n正在停止...")
print(f"\n最终统计:")
print(f" 总检查数: {stats['total_checked']}")
print(f" 发现余额: {stats['found_balance']}")
print(f" 结果已保存到: {RESULTS_FILE}")
if __name__ == "__main__":
main()

View File

@@ -320,11 +320,21 @@ def backtest_15m_trend_optimized(dates: List[str]):
# ========================= 运行示例(优化版盈利计算) =========================
if __name__ == '__main__':
dates = []
# 获取当前日期
today = datetime.datetime.now()
target_year = 2026
for month in range(1, 2):
# 获取该月的实际天数
days_in_month = calendar.monthrange(2026, month)[1]
days_in_month = calendar.monthrange(target_year, month)[1]
for day in range(1, days_in_month + 1):
dates.append(f"2025-{month:02d}-{day:02d}")
# 只添加今天及之前的日期
date_str = f"{target_year}-{month:02d}-{day:02d}"
date_obj = datetime.datetime.strptime(date_str, '%Y-%m-%d')
# 如果日期在今天之后,跳过
if date_obj > today:
break
dates.append(date_str)
print(dates)