# -*- coding: utf-8 -*-
"""
Technical indicators: EMA and ATR.
"""
from bisect import bisect_right
from typing import Dict, List, Optional, Tuple


def ema(series: List[float], period: int) -> List[Optional[float]]:
    """Return the exponential moving average of *series*.

    The first ``period - 1`` entries are ``None``; entry ``period - 1`` is
    seeded with the simple average of the first ``period`` values, and every
    later entry applies the smoothing factor ``k = 2 / (period + 1)``.

    Args:
        series: Input values.
        period: EMA window length; must be >= 1.

    Returns:
        A list with the same length as *series*. If *series* is shorter than
        *period*, every entry is ``None``. An empty list is returned when
        *series* is empty or *period* is less than 1.
    """
    if not series or period < 1:
        return []

    n = len(series)
    if n < period:
        # Not enough data to seed the average: keep the output aligned with
        # the input instead of emitting a bogus, over-long list.
        return [None] * n

    k = 2.0 / (period + 1)
    out: List[Optional[float]] = [None] * (period - 1)

    prev = sum(series[:period]) / period
    out.append(prev)
    for value in series[period:]:
        prev = value * k + prev * (1 - k)
        out.append(prev)
    return out


def atr(
    high: List[float],
    low: List[float],
    close: List[float],
    period: int,
) -> List[Optional[float]]:
    """Return the average true range, smoothed with :func:`ema`.

    The true range of the first bar is ``high[0] - low[0]``; for later bars
    it is the largest of ``high - low``, ``|high - prev_close|`` and
    ``|low - prev_close|``. The true-range series is then passed through
    ``ema(..., period)``, so smoothing uses ``k = 2 / (period + 1)`` and the
    first ``period - 1`` entries of the result are ``None``.

    Args:
        high: Bar highs.
        low: Bar lows.
        close: Bar closes.
        period: Smoothing window length; must be >= 1.

    Returns:
        A list with the same length as *close*. All entries are ``None`` when
        there are fewer than ``period + 1`` bars, when the input lengths
        differ, or when *period* is less than 1.
    """
    n = len(close)
    if period < 1 or n < period + 1 or len(high) != n or len(low) != n:
        return [None] * n

    # The first bar has no previous close, so its true range is high - low.
    tr_list: List[float] = [high[0] - low[0]]
    for i in range(1, n):
        prev_close = close[i - 1]
        tr_list.append(
            max(
                high[i] - low[i],
                abs(high[i] - prev_close),
                abs(low[i] - prev_close),
            )
        )
    return ema(tr_list, period)


def get_ema_atr_from_klines(
    klines: List[Dict],
    ema_period: int,
    atr_period: int,
) -> Tuple[List[Optional[float]], List[Optional[float]]]:
    """Compute the close-price EMA and the ATR from a list of klines.

    Each kline is a mapping providing ``"close"``, ``"high"`` and ``"low"``
    values convertible with ``float()``.

    Args:
        klines: Kline mappings.
        ema_period: Window length for the close-price EMA.
        atr_period: Window length for the ATR.

    Returns:
        ``(ema_list, atr_list)``. For periods >= 1 both lists have the same
        length as *klines*.
    """
    close = [float(k["close"]) for k in klines]
    high = [float(k["high"]) for k in klines]
    low = [float(k["low"]) for k in klines]
    return ema(close, ema_period), atr(high, low, close, atr_period)


def align_higher_tf_ema(
    klines_5m: List[Dict],
    klines_higher: List[Dict],
    ema_fast: int,
    ema_slow: int,
) -> List[Dict]:
    """Compute higher-timeframe EMAs and align them to the 5-minute klines.

    For each 5m kline, the higher-timeframe bar used is the last one whose
    ``"id"`` is <= the 5m kline's ``"id"``.

    Args:
        klines_5m: 5-minute klines; each must provide ``"id"``.
        klines_higher: Higher-timeframe klines providing ``"id"`` and
            ``"close"``. Must be sorted by ascending ``"id"``.
            NOTE(review): ids are presumably bar-open timestamps - confirm
            against the caller.
        ema_fast: Fast EMA period.
        ema_slow: Slow EMA period.

    Returns:
        A list with the same length as *klines_5m*. Each item is
        ``{"ema_fast": float, "ema_slow": float}``, or an empty dict when no
        value is available for that kline (not enough higher-timeframe data,
        no matching bar, or an invalid period). Every item is a distinct
        dict object.
    """
    if (
        not klines_higher
        or len(klines_higher) < ema_slow
        or ema_fast < 1
        or ema_slow < 1
    ):
        # Build independent dicts: ``[{}] * n`` would alias one shared object.
        return [{} for _ in klines_5m]

    close_hi = [float(k["close"]) for k in klines_higher]
    ema_f = ema(close_hi, ema_fast)
    ema_s = ema(close_hi, ema_slow)
    id_hi = [k["id"] for k in klines_higher]

    result: List[Dict] = []
    for k5 in klines_5m:
        # Index of the last higher-timeframe bar with id <= this 5m id
        # (-1 when every higher-timeframe bar starts later).
        idx = bisect_right(id_hi, k5["id"]) - 1
        if (
            idx >= ema_slow - 1
            and ema_f[idx] is not None
            and ema_s[idx] is not None
        ):
            result.append({"ema_fast": ema_f[idx], "ema_slow": ema_s[idx]})
        else:
            result.append({})
    return result