Files
lm_code/evm_助记词查询余额.py
2026-01-12 14:27:56 +08:00

416 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
EVM助记词碰撞工具 - 高速版本
批量生成助记词并检查余额,发现有钱包自动保存
依赖安装:
pip install mnemonic eth-account web3 loguru
使用方法:
python evm_助记词查询余额.py
"""
import os
import json
import threading
from datetime import datetime
from mnemonic import Mnemonic
from eth_account import Account
from web3 import Web3
from loguru import logger
import time
# 尝试导入POA中间件
try:
from web3.middleware import geth_poa_middleware
except ImportError:
try:
from web3.middleware.geth_poa import geth_poa_middleware
except ImportError:
geth_poa_middleware = None
# 配置日志
logger.remove()
logger.add(lambda msg: print(msg, end=''), format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | {message}", level="INFO")
# 全局统计
stats = {
'total_checked': 0,
'found_balance': 0,
'start_time': time.time(),
'lock': threading.Lock()
}
# 结果保存文件
RESULTS_FILE = "found_wallets.txt"
SAVE_LOCK = threading.Lock()
class FastEVMWallet:
"""高速EVM钱包碰撞工具"""
# 优化的RPC端点使用多个备用节点提高速度
RPC_ENDPOINTS = {
"ethereum": [
"https://eth.llamarpc.com",
"https://rpc.ankr.com/eth",
"https://ethereum.publicnode.com",
],
"bsc": [
"https://bsc-dataseed1.binance.org",
"https://bsc-dataseed2.binance.org",
"https://rpc.ankr.com/bsc",
],
# "polygon": [
# "https://polygon-rpc.com",
# "https://rpc.ankr.com/polygon",
# ],
# "arbitrum": [
# "https://arb1.arbitrum.io/rpc",
# "https://rpc.ankr.com/arbitrum",
# ],
# "optimism": [
# "https://mainnet.optimism.io",
# "https://rpc.ankr.com/optimism",
# ],
"base": [
"https://mainnet.base.org",
"https://rpc.ankr.com/base",
],
}
def __init__(self, chains=None):
"""
初始化
Args:
chains: 要查询的链列表None表示查询所有链
"""
self.mnemo = Mnemonic("english")
Account.enable_unaudited_hdwallet_features()
# 选择要查询的链(默认只查询主要链以提高速度)
if chains is None:
self.chains = ["ethereum", "bsc", "polygon", "arbitrum", "base"]
else:
self.chains = chains
# 为每个链创建Web3连接池
self.w3_pools = {}
self._init_connections()
def _init_connections(self):
"""初始化RPC连接"""
logger.info("正在初始化RPC连接...")
for chain in self.chains:
if chain in self.RPC_ENDPOINTS:
# 尝试所有RPC节点直到找到一个可用的
w3 = None
for rpc_url in self.RPC_ENDPOINTS[chain]:
try:
# 设置更短的超时时间,避免阻塞
w3 = Web3(Web3.HTTPProvider(rpc_url, request_kwargs={'timeout': 3}))
# 某些链需要POA中间件
if chain in ["bsc", "polygon"] and geth_poa_middleware is not None:
try:
w3.middleware_onion.inject(geth_poa_middleware, layer=0)
except:
pass
# 测试连接(快速测试)
try:
if w3.is_connected():
self.w3_pools[chain] = w3
logger.info(f"{chain} 连接成功: {rpc_url}")
break
except:
continue
except Exception as e:
continue
if chain not in self.w3_pools:
logger.warning(f"{chain} 连接失败,将跳过该链")
def generate_mnemonic(self, strength=128):
"""快速生成助记词"""
return self.mnemo.generate(strength=strength)
def mnemonic_to_address(self, mnemonic, account_index=0):
"""
从助记词快速生成地址(不生成私钥,节省时间)
Returns:
str: 钱包地址失败返回None
"""
try:
account = Account.from_mnemonic(mnemonic, account_path=f"m/44'/60'/0'/0/{account_index}")
return account.address
except:
return None
def quick_check_balance(self, address):
"""
快速检查地址是否有余额(只检查主要链,一旦发现余额就返回)
Returns:
dict: 如果有余额返回余额信息否则返回None
"""
# 只检查最快的链ethereum和bsc提高速度
priority_chains = ["ethereum", "bsc"]
for chain in priority_chains:
if chain not in self.chains:
continue
try:
w3 = self.w3_pools.get(chain)
if not w3:
continue
# 查询余额(设置超时,避免阻塞)
try:
balance_wei = w3.eth.get_balance(address)
if balance_wei > 0:
balance_eth = Web3.from_wei(balance_wei, 'ether')
return {
'chain': chain,
'address': address,
'balance_wei': balance_wei,
'balance_eth': float(balance_eth),
'balance_formatted': f"{balance_eth:.6f} ETH"
}
except Exception as e:
# 查询失败,尝试下一个链
continue
except:
continue
return None
def check_all_chains(self, address):
"""
检查所有链的余额(发现余额后调用)
Returns:
list: 所有有余额的链信息
"""
results = []
for chain in self.chains:
try:
w3 = self.w3_pools.get(chain)
if not w3 or not w3.is_connected():
continue
balance_wei = w3.eth.get_balance(address)
if balance_wei > 0:
balance_eth = Web3.from_wei(balance_wei, 'ether')
results.append({
'chain': chain,
'balance_wei': balance_wei,
'balance_eth': float(balance_eth),
'balance_formatted': f"{balance_eth:.6f} ETH"
})
except:
continue
return results
def save_found_wallet(mnemonic, address, private_key, balances):
"""保存发现的钱包信息"""
with SAVE_LOCK:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
info = {
'timestamp': timestamp,
'mnemonic': mnemonic,
'address': address,
'private_key': private_key,
'balances': balances
}
# 保存到文件
with open(RESULTS_FILE, 'a', encoding='utf-8') as f:
f.write("="*80 + "\n")
f.write(f"发现时间: {timestamp}\n")
f.write(f"助记词: {mnemonic}\n")
f.write(f"地址: {address}\n")
f.write(f"私钥: {private_key}\n")
f.write("余额信息:\n")
for balance in balances:
f.write(f" {balance['chain'].upper()}: {balance['balance_formatted']}\n")
f.write("="*80 + "\n\n")
# 同时保存JSON格式
json_file = RESULTS_FILE.replace('.txt', '.json')
try:
if os.path.exists(json_file):
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
else:
data = []
data.append(info)
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except:
pass
def worker_thread(wallet_tool, thread_id):
"""工作线程:生成助记词并检查余额"""
global stats
# 每个线程独立计数,减少锁竞争
local_count = 0
thread_start_time = time.time()
while True:
try:
# 生成助记词
mnemonic = wallet_tool.generate_mnemonic()
# 生成地址
address = wallet_tool.mnemonic_to_address(mnemonic)
if not address:
local_count += 1
# 立即更新统计(不等待)
if local_count >= 1: # 每次都更新,确保统计正常
with stats['lock']:
stats['total_checked'] += local_count
local_count = 0
continue
# 先更新统计(在查询余额之前)
local_count += 1
if local_count >= 1: # 每次都更新
with stats['lock']:
stats['total_checked'] += local_count
total = stats['total_checked']
found = stats['found_balance']
if total % 20 == 0: # 每20次打印一次
elapsed = time.time() - stats['start_time']
speed = total / elapsed if elapsed > 0 else 0
logger.info(f"已检查: {total} | 发现: {found} | 速度: {speed:.1f} 个/秒")
local_count = 0
# 快速检查余额(只检查主要链,设置超时)
try:
balance_info = wallet_tool.quick_check_balance(address)
except:
balance_info = None
# 如果发现余额
if balance_info:
# 生成私钥
account = Account.from_mnemonic(mnemonic, account_path="m/44'/60'/0'/0/0")
private_key = account.key.hex()
# 检查所有链的余额
all_balances = wallet_tool.check_all_chains(address)
# 保存结果
save_found_wallet(mnemonic, address, private_key, all_balances)
# 更新统计并打印
with stats['lock']:
stats['found_balance'] += 1
logger.success(f"🎉 发现余额钱包!")
logger.success(f"地址: {address}")
logger.success(f"助记词: {mnemonic}")
for bal in all_balances:
logger.success(f" {bal['chain'].upper()}: {bal['balance_formatted']}")
except Exception as e:
# 即使出错也更新统计
local_count += 1
if local_count >= 1:
with stats['lock']:
stats['total_checked'] += local_count
local_count = 0
continue
def main():
"""主函数"""
print("="*80)
print("EVM助记词碰撞工具 - 高速版本")
print("="*80)
print(f"结果将保存到: {RESULTS_FILE}")
print(f"查询的链: ethereum, bsc, polygon, arbitrum, base")
print("="*80)
print()
# 选择要查询的链(可以修改这里)
chains = ["ethereum", "bsc", "polygon", "arbitrum", "base"]
# 创建钱包工具
wallet_tool = FastEVMWallet(chains=chains)
# 设置线程数根据CPU核心数调整
num_threads = os.cpu_count() or 4
print(f"启动 {num_threads} 个工作线程...")
print("按 Ctrl+C 停止程序\n")
# 创建线程
threads = []
for i in range(num_threads):
t = threading.Thread(target=worker_thread, args=(wallet_tool, i), daemon=True)
t.start()
threads.append(t)
# 等待一小段时间确保线程启动
time.sleep(1)
# 检查是否有线程在运行
active_threads = sum(1 for t in threads if t.is_alive())
if active_threads == 0:
logger.error("所有工作线程都停止了请检查RPC连接")
return
else:
logger.info(f"{active_threads} 个工作线程正在运行")
# 测试:生成一个测试地址并查询,验证功能是否正常
logger.info("正在测试功能...")
test_mnemonic = wallet_tool.generate_mnemonic()
test_address = wallet_tool.mnemonic_to_address(test_mnemonic)
if test_address:
logger.info(f"测试地址生成成功: {test_address[:10]}...")
test_balance = wallet_tool.quick_check_balance(test_address)
if test_balance is None:
logger.info("测试余额查询成功(无余额,正常)")
else:
logger.warning(f"测试发现余额!这不太可能,请检查")
else:
logger.error("测试地址生成失败!")
try:
# 主线程等待
last_count = 0
while True:
time.sleep(2) # 每2秒更新一次显示
# 定期打印统计信息
elapsed = time.time() - stats['start_time']
current_count = stats['total_checked']
# 如果计数没有变化,可能是线程有问题
if elapsed > 10 and current_count == last_count and current_count == 0:
logger.warning("警告工作线程可能没有正常工作请检查RPC连接")
if elapsed > 0:
speed = current_count / elapsed if current_count > 0 else 0
print(f"\r运行时间: {int(elapsed)}秒 | 已检查: {current_count} | 发现: {stats['found_balance']} | 速度: {speed:.1f} 个/秒", end='', flush=True)
last_count = current_count
except KeyboardInterrupt:
print("\n\n正在停止...")
print(f"\n最终统计:")
print(f" 总检查数: {stats['total_checked']}")
print(f" 发现余额: {stats['found_balance']}")
print(f" 结果已保存到: {RESULTS_FILE}")
if __name__ == "__main__":
main()