Files
lm_code/adaptive_third_strategy/indicators.py
ddrwode 970080a2e6 hahaa
2026-01-31 10:35:25 +08:00

87 lines
2.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Technical indicators: EMA and ATR.
"""
from bisect import bisect_right
from itertools import accumulate
from typing import List, Dict, Optional
def ema(series: List[float], period: int) -> List[Optional[float]]:
    """Compute the exponential moving average of ``series``.

    The first EMA value is seeded with the simple average of the first
    ``period`` samples; every later value is
    ``sample * k + previous_ema * (1 - k)`` with ``k = 2 / (period + 1)``.

    Args:
        series: Input samples, oldest first.
        period: EMA window length.

    Returns:
        A list with the same length as ``series``. The first
        ``period - 1`` entries are ``None``. If ``period < 1`` or the
        series holds fewer than ``period`` samples, every entry is ``None``
        (an empty series yields an empty list).
    """
    if not series:
        return []
    n = len(series)
    # Without at least `period` samples there is no valid seed average;
    # report "not available" for every position instead of emitting a
    # list that is longer than the input and ends in a bogus value.
    if period < 1 or n < period:
        return [None] * n
    k = 2.0 / (period + 1)
    out: List[Optional[float]] = [None] * (period - 1)
    prev = sum(series[:period]) / period
    out.append(prev)
    for sample in series[period:]:
        prev = sample * k + prev * (1 - k)
        out.append(prev)
    return out
def atr(high: List[float], low: List[float], close: List[float], period: int) -> List[Optional[float]]:
    """Compute an average true range by passing the true-range series to ``ema``.

    The true range of the first bar is ``high - low``. For every later bar
    it is the largest of ``high - low``, ``|high - previous close|`` and
    ``|low - previous close|``.

    Args:
        high: Bar highs, oldest first.
        low: Bar lows, same length as ``close``.
        close: Bar closes.
        period: Window length forwarded to ``ema``.

    Returns:
        A list with the same length as ``close``. Every entry is ``None``
        when ``period < 1``, when there are fewer than ``period + 1`` bars,
        or when the three inputs differ in length. Otherwise the result is
        ``ema(true_ranges, period)``, i.e. the first value appears at index
        ``period - 1``.
    """
    n = len(close)
    if period < 1 or n < period + 1 or len(high) != n or len(low) != n:
        return [None] * n
    # The first bar has no previous close, so its range is just high - low.
    tr_list: List[float] = [high[0] - low[0]]
    # zip stops at the shortest input, pairing bar i (i >= 1) with close[i - 1].
    for bar_high, bar_low, prev_close in zip(high[1:], low[1:], close):
        tr_list.append(
            max(
                bar_high - bar_low,
                abs(bar_high - prev_close),
                abs(bar_low - prev_close),
            )
        )
    return ema(tr_list, period)
def get_ema_atr_from_klines(klines: List[Dict], ema_period: int, atr_period: int) -> tuple:
    """Compute the close-price EMA and the ATR for a list of klines.

    Each kline is read through its ``"close"``, ``"high"`` and ``"low"``
    keys, and every value is converted with ``float()``.

    Returns:
        ``(ema_list, atr_list)``: the results of ``ema(close, ema_period)``
        and ``atr(high, low, close, atr_period)``.
    """

    def column(field: str) -> List[float]:
        # Pull one price field out of every kline as floats.
        return [float(bar[field]) for bar in klines]

    closes = column("close")
    highs = column("high")
    lows = column("low")
    return ema(closes, ema_period), atr(highs, lows, closes, atr_period)
def align_higher_tf_ema(
    klines_5m: List[Dict],
    klines_higher: List[Dict],
    ema_fast: int,
    ema_slow: int
) -> List[Dict]:
    """Compute fast/slow EMAs on higher-timeframe closes, aligned to 5m klines.

    For each 5-minute kline, the higher-timeframe bar used is the last bar
    of the leading run of ``klines_higher`` whose ``"id"`` is ``<=`` the
    5-minute kline's ``"id"``.
    NOTE(review): ids are presumably bar timestamps in ascending order --
    confirm against the caller.

    Args:
        klines_5m: 5-minute klines; only the ``"id"`` key is read.
        klines_higher: Higher-timeframe klines; ``"id"`` and ``"close"``
            are read.
        ema_fast: Period of the fast EMA.
        ema_slow: Period of the slow EMA.

    Returns:
        A list with the same length as ``klines_5m``. Each item is
        ``{"ema_fast": float, "ema_slow": float}``, or an empty dict when
        no value is available for that kline (no matching higher bar, EMA
        not yet warmed up, too few higher bars, or a non-positive period).
        Every item is a distinct dict object.
    """
    if (
        not klines_higher
        or ema_fast < 1
        or ema_slow < 1
        or len(klines_higher) < ema_slow
    ):
        # One fresh dict per slot: `[{}] * n` would alias a single dict, so
        # mutating one entry would silently change all of them.
        return [{} for _ in klines_5m]

    close_hi = [float(k["close"]) for k in klines_higher]
    ema_f = ema(close_hi, ema_fast)
    ema_s = ema(close_hi, ema_slow)

    # Running maximum of the higher-timeframe ids. It is non-decreasing, and
    # its first entry greater than t sits exactly at the first id greater
    # than t, so a binary search on it finds the same bar as a front-to-back
    # scan that stops at the first id > t -- without the O(n * m) cost.
    id_prefix_max = list(accumulate((k["id"] for k in klines_higher), max))

    result: List[Dict] = []
    for k5 in klines_5m:
        idx = bisect_right(id_prefix_max, k5["id"]) - 1
        if idx < 0 or idx < ema_slow - 1:
            # No higher bar at or before this kline, or slow EMA not ready.
            result.append({})
            continue
        fast = ema_f[idx]
        slow = ema_s[idx]
        if fast is None or slow is None:
            result.append({})
        else:
            result.append({"ema_fast": fast, "ema_slow": slow})
    return result