Files
to_session/evm助记词/core.py
Administrator a0720d80dc fefdwef
2025-11-12 12:54:37 +08:00

588 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import copy
from fractions import Fraction
import json
from multiprocessing import JoinableQueue
import os
from pathlib import Path
import queue
import random
import signal
import time
from diskcache import Cache
from loguru import logger
from hexbytes import HexBytes
import psutil
from tabulate import tabulate
from web3 import Account, Web3
from web3.exceptions import Web3RPCError
from types import SimpleNamespace
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
import re
from datetime import datetime, timedelta
from dateutil import tz
from eth_abi import encode
from eth_utils import to_hex, function_signature_to_4byte_selector
from functools import wraps
from curl_cffi import requests
from urllib.parse import urljoin
from web3.types import Wei
import threading
DEFAULT_ACCEPT_GAS_PRICE = {
# chainId: accept_gas_price
1868: 0.00000000015,
743111: 0.0000001,
743111: 0.0000001,
}
DEFAULT_GAS_FACTOR = 1 # gas放大比例
# ---------start------
EIP_1559_TYPE = 2 # 代表 EIP-1559 交易类型
LEGACY_TYPE = 0 # 代表传统类型的交易Legacy Transaction
# ---------end--------
def send_raw_transaction(
config: SimpleNamespace,
) -> tuple[HexBytes, Literal["成功", "失败"]] | Any:
"""常用支持参数: default_gas_price_enabled accept_gas_price gas_factor scalingFactor eip_txn debug"""
logger.info(f"amout:[{config.amout}] wallet_address:[{config.wallet_address}]")
request_kwargs = None
if hasattr(config, "request_kwargs"):
request_kwargs = config.request_kwargs
logger.info(request_kwargs)
web3 = Web3(Web3.HTTPProvider(config.rpc_url, request_kwargs=request_kwargs))
gas_factor = getattr(config, "gas_factor", DEFAULT_GAS_FACTOR)
if gas_factor != DEFAULT_GAS_FACTOR:
logger.success(f"设置调整gas_factor: {gas_factor}")
gas_price = max(int(web3.eth.gas_price), int(getattr(config, "gasPrice", 0)))
setattr(config, "chainId", web3.eth.chain_id)
if (
config.chainId in DEFAULT_ACCEPT_GAS_PRICE.keys()
and not hasattr(config, "accept_gas_price")
and getattr(config, "default_gas_price_enabled", True)
):
accept_gas_price = DEFAULT_ACCEPT_GAS_PRICE.get(config.chainId)
logger.info(f"设置 default_accept_gas_price:{accept_gas_price}")
config.accept_gas_price = DEFAULT_ACCEPT_GAS_PRICE.get(config.chainId)
if getattr(config, "accept_gas_price", None) and not getattr(
config, "debug", False
):
set_time = datetime.now(tz=tz.gettz("Asia/Shanghai"))
while True:
table = tabulate(
[
[
"accept_gas_price",
web3.to_wei(config.accept_gas_price, "ether"),
set_time.strftime("%Y-%m-%d %H:%M:%S"),
],
[
"gas_price",
gas_price,
datetime.now(tz=tz.gettz("Asia/Shanghai")).strftime(
"%Y-%m-%d %H:%M:%S"
),
],
],
["Description", "Value (wei)", "Time"],
tablefmt="pretty",
stralign="left",
)
print(f"\r{table}", end="", flush=True)
for _ in range(5):
print("\033[A", end="")
if gas_price <= web3.to_wei(config.accept_gas_price, "ether"):
wait_need_retry = getattr(config, "wait_need_retry", True)
if wait_need_retry and datetime.now(
tz=tz.gettz("Asia/Shanghai")
) - set_time > timedelta(minutes=1):
raise RuntimeError(
f"设置的gasPrice:{config.gasPrice}达到预期,但时间过长."
)
break
elif hasattr(config, "gasPrice"):
raise RuntimeError(
f"设置的gasPrice:{config.gasPrice}与设置的accept_gas_price:{config.accept_gas_price}冲突."
)
time.sleep(10)
gas_price = int(web3.eth.gas_price * gas_factor)
for _ in range(20):
print("\033[B", end="")
print("\n")
transaction = {
"from": Web3.to_checksum_address(config.wallet_address),
"value": web3.to_wei(config.amout, "ether"),
}
if getattr(config, "to_address", ""):
transaction["to"] = Web3.to_checksum_address(config.to_address)
if getattr(config, "data", ""):
transaction["data"] = config.data
try:
nonce = web3.eth.get_transaction_count(
Web3.to_checksum_address(config.wallet_address)
)
maxPriorityFeePerGas = getattr(config, "maxPriorityFeePerGas", gas_price)
maxFeePerGas = getattr(config, "maxFeePerGas", gas_price)
if maxPriorityFeePerGas != gas_price and maxFeePerGas == gas_price:
latest_block = web3.eth.get_block("latest")
base_fee = latest_block["baseFeePerGas"]
maxFeePerGas = base_fee + maxPriorityFeePerGas
logger.info(
f"base_fee:{base_fee} maxPriorityFeePerGas:{maxPriorityFeePerGas} maxFeePerGas:{maxFeePerGas}"
)
gas_estimate = web3.eth.estimate_gas(transaction)
logger.info(
f'gas_estimate:{gas_estimate} gas_price: {gas_price} cost: {((gas_estimate * gas_price) + web3.to_wei(config.amout, "ether")) / 10**18} ETH'
)
if getattr(config, "scalingFactor", None):
gas_estimate = gas_estimate * config.scalingFactor
logger.info(
f'scalingFactor:{config.scalingFactor} gas_factor:{gas_factor} gas_price:[{Web3.from_wei(gas_price,'Gwei')}]Gwei gas_estimate:{gas_estimate} cost: {((gas_estimate * gas_price) + web3.to_wei(config.amout, "ether")) / 10**18} ETH'
)
gas_estimate = max(int(getattr(config, "gasLimit", 0)), int(gas_estimate))
eip_txn = getattr(config, "eip_txn", EIP_1559_TYPE)
if eip_txn == EIP_1559_TYPE:
txn_type_parm = {
"maxFeePerGas": maxFeePerGas,
"maxPriorityFeePerGas": maxPriorityFeePerGas,
}
elif eip_txn == LEGACY_TYPE:
txn_type_parm = {"gasPrice": gas_price}
transaction = {
**transaction,
"gas": int(gas_estimate),
**txn_type_parm,
"nonce": nonce,
"chainId": config.chainId,
}
# logger.info(transaction)
signed_txn = web3.eth.account.sign_transaction(
transaction, private_key=config.private_key
)
if getattr(config, "debug", False):
logger.debug(f"模拟发送信息,退出:{json.dumps(transaction,indent=2)}")
return
tx_hash = web3.eth.send_raw_transaction(signed_txn.raw_transaction)
logger.info(f"Transaction hash: {tx_hash.hex()}")
receipt = web3.eth.wait_for_transaction_receipt(tx_hash)
ret = "成功" if receipt.get("status") == 1 else "失败"
logger.info(
f"Transaction receipt: {config.wallet_address}[{tx_hash.hex()}] {'成功' if receipt.get('status') == 1 else '失败'}"
)
return tx_hash, ret
except Web3RPCError as e:
error_message = str(e)
if "insufficient funds" in error_message and getattr(config, "adjust", False):
logger.warning(f"余额不足,调整转账金额...{error_message}")
amount_wei = Web3.to_wei(config.amout, "ether")
match_overshot = re.search(r"overshot (\d+)", error_message)
match_have_want = re.search(r"have (\d+) want (\d+)", error_message)
if match_overshot:
overshot_wei = int(match_overshot.group(1))
gas_cost_wei = gas_estimate * gas_price
logger.info(
f"预计交易 Gas 费用: {Web3.from_wei(gas_cost_wei, 'ether')} ETH"
)
logger.info(f"交易超出: {Web3.from_wei(overshot_wei, 'ether')} ETH")
gas_buffer_wei = int(gas_cost_wei * 0.05) # 预留 5% buffer
new_amount_wei = amount_wei - overshot_wei - gas_buffer_wei
elif match_have_want:
have_wei = int(match_have_want.group(1))
want_wei = int(match_have_want.group(2))
shortage_wei = want_wei - have_wei # 计算缺少的金额
logger.info(f"资金缺口: {Web3.from_wei(shortage_wei, 'ether')} ETH")
new_amount_wei = amount_wei - shortage_wei # 直接减少 value 以适应余额
if new_amount_wei < 0:
logger.error(
f"余额不足,无法覆盖交易费用 [amount: {amount_wei}, overshot: {overshot_wei}, gas_buffer_wei: {gas_buffer_wei}]"
)
raise e
config.amout = Web3.from_wei(new_amount_wei, "ether")
logger.info(f"调整后的金额{Web3.from_wei(new_amount_wei, "ether")}")
config.adjust = True
time.sleep(0.5)
return send_raw_transaction(config)
raise e
def is_gas_accept(web3: Web3, accept_gas_price=0):
set_time = datetime.now(tz=tz.gettz("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S")
accept_gas_price = max(
accept_gas_price, DEFAULT_ACCEPT_GAS_PRICE[web3.eth.chain_id]
)
assert accept_gas_price > 0, f"没有可设置的accept_gas_price"
while True:
gas_price = int(web3.eth.gas_price)
table = tabulate(
[
[
"accept_gas_price",
web3.to_wei(accept_gas_price, "ether"),
set_time,
],
[
"gas_price",
gas_price,
datetime.now(tz=tz.gettz("Asia/Shanghai")).strftime(
"%Y-%m-%d %H:%M:%S"
),
],
],
["Description", "Value (wei)", "Time"],
tablefmt="pretty",
stralign="left",
)
print(f"\r{table}", end="", flush=True)
for _ in range(5):
print("\033[A", end="")
if gas_price <= web3.to_wei(accept_gas_price, "ether"):
break
time.sleep(10)
for _ in range(10):
print("\033[B", end="")
print("\n")
return True
def direct_send_raw_transaction(
config: SimpleNamespace,
) -> tuple[HexBytes, Literal["成功", "失败"]] | Any:
logger.info(f"amout:{config.value} wallet_address:{config.wallet_address}")
request_kwargs = None
if hasattr(config, "request_kwargs"):
request_kwargs = config.request_kwargs
logger.info(request_kwargs)
web3 = Web3(Web3.HTTPProvider(config.rpc_url, request_kwargs=request_kwargs))
nonce = web3.eth.get_transaction_count(
Web3.to_checksum_address(config.wallet_address)
)
transaction = {
"to": Web3.to_checksum_address(config.to_address),
"from": Web3.to_checksum_address(config.wallet_address),
"value": config.value,
}
if hasattr(config, "data") and config.data:
transaction["data"] = config.data
transaction = {
**transaction,
"gas": int(config.gas),
"maxFeePerGas": int(config.maxFeePerGas),
"maxPriorityFeePerGas": int(config.maxPriorityFeePerGas),
"nonce": nonce,
"chainId": config.chainId,
}
signed_txn = web3.eth.account.sign_transaction(
transaction, private_key=config.private_key
)
tx_hash = web3.eth.send_raw_transaction(signed_txn.raw_transaction)
logger.info(f"Transaction hash: {tx_hash.hex()}")
receipt = web3.eth.wait_for_transaction_receipt(tx_hash)
ret = "成功" if receipt.get("status") == 1 else "失败"
logger.info(
f"Transaction receipt: {'成功' if receipt.get('status') == 1 else '失败'}"
)
return tx_hash, ret
def get_balance(config: SimpleNamespace):
web3 = Web3(Web3.HTTPProvider(config.rpc_url))
balance_wei = web3.eth.get_balance(Web3.to_checksum_address(config.wallet_address))
return Web3.from_wei(balance_wei, "ether")
def strip_0x_prefix(address):
return (
address[2:].lower()
if isinstance(address, str) and address.startswith("0x")
else address
)
def sign_message(private_key, msg, prefix="0x"):
from eth_account.messages import encode_defunct
from web3 import AsyncWeb3
w3 = AsyncWeb3(AsyncWeb3.AsyncHTTPProvider("https://cloudflare-eth.com"))
account = w3.eth.account.from_key(private_key)
signature = account.sign_message(encode_defunct(text=msg))
signature = signature.signature.hex()
return f"{prefix}{signature}"
def sign_typed_data(private_key, domain_data, message_types, message_data, prefix="0x"):
"""EIP-712 类型签名"""
signed_typed_data = Account.sign_typed_data(
private_key, domain_data, message_types, message_data
)
logger.info(signed_typed_data.message_hash)
return f"{prefix}{signed_typed_data.signature.hex()}"
def abi_encode_data(function_signature, types, args):
method_id = function_signature_to_4byte_selector(function_signature)
encoded_params = encode(types, args)
final_calldata = to_hex(method_id + encoded_params)
# logger.info(final_calldata)
return final_calldata
def thread_start(batch, execute, workers_num=3, delay=2, **kwargs):
task_queue = queue.Queue()
time.sleep(5)
for data in batch:
task_queue.put(data)
def submit_next_task(
executor: ThreadPoolExecutor, initial_limit=workers_num, delay=delay
):
count = 0
while not task_queue.empty():
data = task_queue.get()
# 限制初始任务提交速率
if count < initial_limit:
time.sleep(delay)
count += 1
future = executor.submit(execute, data, **kwargs)
future.add_done_callback(lambda _: task_queue.task_done())
# 等待所有任务完成
task_queue.join()
with ThreadPoolExecutor(max_workers=workers_num) as executor:
submit_next_task(executor)
task_queue.join()
logger.info("All tasks completed")
def multiprocess_start(batch, execute, workers_num=3, delay=2):
task_queue = JoinableQueue()
for data in batch:
task_queue.put(data)
def submit_next_task(
executor: ProcessPoolExecutor, initial_limit=workers_num, delay=delay
):
count = 0
while not task_queue.empty():
data = task_queue.get()
# 限制初始任务提交速率
if count < initial_limit:
time.sleep(delay)
count += 1
future = executor.submit(execute, data)
future.add_done_callback(lambda _: task_queue.task_done())
task_queue.join()
with ProcessPoolExecutor(max_workers=workers_num) as executor:
try:
submit_next_task(executor)
except KeyboardInterrupt:
# executor.shutdown(wait=False, cancel_futures=True)
os.kill(os.getpid(), signal.SIGTERM)
# finally:
# executor.shutdown(wait=True)
logger.info("All tasks completed")
def worker(execute):
@wraps(execute)
def wrapper(data):
try:
if asyncio.iscoroutinefunction(execute):
need_close = False
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
need_close = True
task = loop.create_task(execute(data))
task.set_name(f"MARK-Task-{threading.get_ident()}")
loop.run_until_complete(task)
if need_close:
loop.close()
else:
execute(data) # 执行同步函数
except Exception as e:
logger.exception("Error occurred during execution: %s", e)
return wrapper
def get_identifier_with_file(file_path) -> str:
path = Path(file_path)
return path.parent.parent.name + "." + path.parent.name + "." + path.stem
def generate_random_amount(min_val: float, max_val: float, precision=9):
"""
生成一个随机金额,并将其格式化为指定精度,返回格式化后的浮动值和对应的十六进制字符串。
:param min_val: 随机数的最小值
:param max_val: 随机数的最大值
:param precision: 保留的精度默认为9位
:return: 返回格式化的浮动值和对应的十六进制字符串
"""
amt = random.uniform(float(min_val), float(max_val)) # 在指定范围内生成随机浮动数
formatted_value = float(f"{amt:.{precision}f}") # 按照指定精度格式化
scaled_value = int(formatted_value * 10 ** (precision)) # 按照精度放大
return formatted_value, hex(scaled_value)[2:]
def balance_of(rpc, token_address, owner_address):
rpc = rpc if isinstance(rpc, Web3) else Web3(Web3.HTTPProvider(rpc))
abi = json.loads(
'[{"inputs":[{"internalType":"address","name":"user","type":"address"}],"name":"balanceOf","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"}]'
)
contract = rpc.eth.contract(
address=Web3.to_checksum_address(token_address), abi=abi
)
ret = contract.functions.balanceOf(Web3.to_checksum_address(owner_address)).call()
logger.success(ret)
return ret
def balance(rpc, address, unit: Literal["ether"] = "ether"):
rpc = rpc if isinstance(rpc, Web3) else Web3(Web3.HTTPProvider(rpc))
balance_wei = rpc.eth.get_balance(Web3.to_checksum_address(address))
ret = Web3.from_wei(balance_wei, unit)
logger.success(ret)
return ret
def get_transactions(rpc, address, session=None, **kwargs):
session = session if session else requests.Session(impersonate="chrome124")
# url = f"{rpc}/addresses/{address}/transactions"
url = urljoin(rpc, f"addresses/{address}/transactions")
headers = {"accept": "application/json"}
response = session.get(url, headers=headers, params=kwargs)
try:
response.raise_for_status()
except Exception as e:
logger.error(response.text)
raise e
return response.json()
def check_date(date_string: str | None | datetime, delta: int | timedelta = 0) -> bool:
def _parse_date(date_input):
if not date_input:
return None
if isinstance(date_input, datetime):
return date_input.date()
if isinstance(date_input, int):
if date_input > 10**10: # 判断是否是毫秒级时间戳
date_input = date_input / 1000 # 转换为秒级
return datetime.fromtimestamp(date_input).date()
match = re.search(r"\d{4}-\d{2}-\d{2}", date_input)
if match:
return datetime.strptime(match.group(0), "%Y-%m-%d").date()
return None
date_obj = _parse_date(date_string)
if not date_obj:
return False
if isinstance(delta, (int, float)):
delta = timedelta(days=delta)
target_date = datetime.today().date() + delta
return date_obj >= target_date if delta.days else date_obj == target_date
is_today = check_date
def get_nested_value(data: Dict, path: List[str]) -> Optional[str]:
"""根据路径列表获取嵌套字典的值"""
_data = copy.deepcopy(data)
for key in path:
if isinstance(_data, dict) and key in _data:
_data = _data[key]
else:
return None
return _data.lower() if isinstance(_data, str) else None
def get_latest_tx_time(
transactions: list,
to_address: Optional[Union[str, List[str]]] = None,
need_success=True,
to_address_path=["to", "hash"],
) -> Tuple[Optional[Dict], Union[Literal["无交易记录"], datetime]]:
"""
获取最新的交易记录时间,可筛选特定地址和是否仅筛选成功交易。
:param transactions: 交易列表,每个交易是一个字典,包含 'status', 'timestamp', 'to' 等字段。
:param to_address: 目标地址(可选)。
:param need_success: 是否仅筛选成功交易,默认为 True。
:return: (最新交易, 最新交易时间);如果无匹配交易,返回 (None, "无交易记录")。
"""
address_list = (
[to_address.lower()]
if isinstance(to_address, str)
else (
[addr.lower() for addr in to_address]
if isinstance(to_address, list)
else []
)
)
filtered_txs = [
tx
for tx in transactions
if (to_address is None or get_nested_value(tx, to_address_path) in address_list)
and (not need_success or tx.get("status") in ("ok", "success", "1", 1))
]
if not filtered_txs:
return None, "无交易记录"
latest_tx = max(filtered_txs, key=lambda tx: tx["timestamp"])
return latest_tx, latest_tx["timestamp"]
def apply_slippage(amount: Wei, slippage_tolerance: Fraction) -> Wei:
"""
根据滑点容忍度计算扣除后的实际输出金额。
:param amount: 原始金额 (整数)
:param slippage_tolerance: 滑点容忍度 (Fraction),例如 Fraction(5, 1000) 表示 0.5%
:return: 调整后的金额 (整数)
"""
factor = Fraction(1) / (Fraction(1) + slippage_tolerance)
return Web3.to_wei(int(factor * amount), "wei")
def kill_process_by_port(port: int):
"""根据端口号杀死进程"""
for conn in psutil.net_connections(kind="inet"):
if conn.laddr.port == port:
try:
p = psutil.Process(conn.pid)
p.terminate() # 先尝试优雅终止
p.wait(timeout=3) # 等待进程退出
except psutil.NoSuchProcess:
logger.info(f"进程 {conn.pid} 已不存在")
except psutil.TimeoutExpired:
p.kill() # 强制杀死进程
logger.info(f"进程 {conn.pid} 强制杀死")
else:
logger.info(f"成功杀死进程 {conn.pid}")
return
logger.info(f"未找到占用端口 {port} 的进程")
def get_cdp_url(port):
url = f"http://localhost:{port}/json/version"
response = requests.get(url)
cdp_url = response.json().get("webSocketDebuggerUrl")
logger.success(cdp_url)
return cdp_url
cache = Cache(Path(__file__).resolve().parent.parent / "cache")