Files
lm_code/adaptive_third_strategy/indicators.py

87 lines
2.7 KiB
Python
Raw Normal View History

2026-01-31 10:35:25 +08:00
# -*- coding: utf-8 -*-
"""
技术指标EMAATR
"""
from typing import List, Dict, Optional
def ema(series: List[float], period: int) -> List[Optional[float]]:
"""EMA(period),前 period-1 个为 None"""
if not series or period < 1:
return []
k = 2.0 / (period + 1)
out: List[Optional[float]] = [None] * (period - 1)
s = sum(series[:period])
out.append(s / period)
for i in range(period, len(series)):
val = series[i] * k + out[-1] * (1 - k) # type: ignore
out.append(val)
return out
def atr(high: List[float], low: List[float], close: List[float], period: int) -> List[Optional[float]]:
"""ATR(period),前 period 个为 None"""
n = len(close)
if n < period + 1 or len(high) != n or len(low) != n:
return [None] * n
tr_list: List[float] = []
for i in range(n):
if i == 0:
tr_list.append(high[0] - low[0])
else:
tr = max(
high[i] - low[i],
abs(high[i] - close[i - 1]),
abs(low[i] - close[i - 1])
)
tr_list.append(tr)
return ema(tr_list, period)
def get_ema_atr_from_klines(klines: List[Dict], ema_period: int, atr_period: int
) -> tuple:
"""
从K线列表计算收盘价 EMA ATR
返回 (ema_list, atr_list)长度与 klines 一致
"""
close = [float(k["close"]) for k in klines]
high = [float(k["high"]) for k in klines]
low = [float(k["low"]) for k in klines]
ema_list = ema(close, ema_period)
atr_list = atr(high, low, close, atr_period)
return ema_list, atr_list
def align_higher_tf_ema(
klines_5m: List[Dict],
klines_higher: List[Dict],
ema_fast: int,
ema_slow: int
) -> List[Dict]:
"""
根据更高周期K线计算 EMA并按 5 分钟时间对齐
返回列表长度与 klines_5m 一致每项为 {"ema_fast": float, "ema_slow": float} None
"""
if not klines_higher or len(klines_higher) < ema_slow:
return [{}] * len(klines_5m)
close_hi = [float(k["close"]) for k in klines_higher]
ema_f = ema(close_hi, ema_fast)
ema_s = ema(close_hi, ema_slow)
id_hi = [k["id"] for k in klines_higher]
result = []
for k5 in klines_5m:
t = k5["id"]
# 当前 5m 时刻所属的更高周期:取 <= t 的最后一根
idx = -1
for i, tid in enumerate(id_hi):
if tid <= t:
idx = i
else:
break
if idx >= ema_slow - 1 and ema_f[idx] is not None and ema_s[idx] is not None:
result.append({"ema_fast": ema_f[idx], "ema_slow": ema_s[idx]})
else:
result.append({})
return result