105 lines
3.6 KiB
Python
105 lines
3.6 KiB
Python
from __future__ import annotations
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
|
|
def ema(s: pd.Series, span: int) -> pd.Series:
|
|
return s.ewm(span=span, adjust=False).mean()
|
|
|
|
|
|
def rsi(close: pd.Series, period: int) -> pd.Series:
|
|
delta = close.diff()
|
|
up = delta.clip(lower=0.0)
|
|
down = (-delta).clip(lower=0.0)
|
|
|
|
roll_up = up.ewm(alpha=1 / period, adjust=False).mean()
|
|
roll_down = down.ewm(alpha=1 / period, adjust=False).mean()
|
|
|
|
rs = roll_up / roll_down.replace(0.0, np.nan)
|
|
return 100.0 - (100.0 / (1.0 + rs))
|
|
|
|
|
|
def atr(high: pd.Series, low: pd.Series, close: pd.Series, period: int) -> pd.Series:
|
|
prev_close = close.shift(1)
|
|
tr = pd.concat(
|
|
[
|
|
(high - low).abs(),
|
|
(high - prev_close).abs(),
|
|
(low - prev_close).abs(),
|
|
],
|
|
axis=1,
|
|
).max(axis=1)
|
|
return tr.ewm(alpha=1 / period, adjust=False).mean()
|
|
|
|
|
|
def bollinger(close: pd.Series, window: int, n_std: float):
|
|
mid = close.rolling(window=window, min_periods=window).mean()
|
|
std = close.rolling(window=window, min_periods=window).std(ddof=0)
|
|
upper = mid + n_std * std
|
|
lower = mid - n_std * std
|
|
width = (upper - lower) / mid
|
|
return mid, upper, lower, width
|
|
|
|
|
|
def macd(close: pd.Series, fast: int, slow: int, signal: int):
|
|
fast_ema = ema(close, fast)
|
|
slow_ema = ema(close, slow)
|
|
line = fast_ema - slow_ema
|
|
sig = ema(line, signal)
|
|
hist = line - sig
|
|
return line, sig, hist
|
|
|
|
|
|
def stochastic(high: pd.Series, low: pd.Series, close: pd.Series,
|
|
k_period: int = 14, d_period: int = 3):
|
|
"""Stochastic Oscillator (%K and %D)."""
|
|
lowest = low.rolling(window=k_period, min_periods=k_period).min()
|
|
highest = high.rolling(window=k_period, min_periods=k_period).max()
|
|
denom = highest - lowest
|
|
k = 100.0 * (close - lowest) / denom.replace(0.0, np.nan)
|
|
d = k.rolling(window=d_period, min_periods=d_period).mean()
|
|
return k, d
|
|
|
|
|
|
def cci(high: pd.Series, low: pd.Series, close: pd.Series,
|
|
period: int = 20) -> pd.Series:
|
|
"""Commodity Channel Index."""
|
|
tp = (high + low + close) / 3.0
|
|
sma = tp.rolling(window=period, min_periods=period).mean()
|
|
mad = tp.rolling(window=period, min_periods=period).apply(
|
|
lambda x: np.mean(np.abs(x - np.mean(x))), raw=True
|
|
)
|
|
return (tp - sma) / (0.015 * mad.replace(0.0, np.nan))
|
|
|
|
|
|
def adx(high: pd.Series, low: pd.Series, close: pd.Series,
|
|
period: int = 14) -> pd.Series:
|
|
"""Average Directional Index (returns ADX line only)."""
|
|
up_move = high.diff()
|
|
down_move = -low.diff()
|
|
|
|
plus_dm = pd.Series(np.where((up_move > down_move) & (up_move > 0), up_move, 0.0),
|
|
index=high.index)
|
|
minus_dm = pd.Series(np.where((down_move > up_move) & (down_move > 0), down_move, 0.0),
|
|
index=high.index)
|
|
|
|
atr_val = atr(high, low, close, period)
|
|
|
|
plus_di = 100.0 * plus_dm.ewm(alpha=1 / period, adjust=False).mean() / atr_val.replace(0.0, np.nan)
|
|
minus_di = 100.0 * minus_dm.ewm(alpha=1 / period, adjust=False).mean() / atr_val.replace(0.0, np.nan)
|
|
|
|
dx = 100.0 * (plus_di - minus_di).abs() / (plus_di + minus_di).replace(0.0, np.nan)
|
|
adx_line = dx.ewm(alpha=1 / period, adjust=False).mean()
|
|
return adx_line
|
|
|
|
|
|
def keltner_channel(high: pd.Series, low: pd.Series, close: pd.Series,
|
|
ema_period: int = 20, atr_period: int = 14, atr_mult: float = 1.5):
|
|
"""Keltner Channel (mid, upper, lower)."""
|
|
mid = ema(close, ema_period)
|
|
atr_val = atr(high, low, close, atr_period)
|
|
upper = mid + atr_mult * atr_val
|
|
lower = mid - atr_mult * atr_val
|
|
return mid, upper, lower
|