Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GH-148] Add Correlation Trend Indicator #149

Merged
merged 1 commit into from
Jun 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
[![codecov](https://codecov.io/gh/jealous/stockstats/branch/master/graph/badge.svg?token=IFMD1pVJ7T)](https://codecov.io/gh/jealous/stockstats)
[![pypi](https://img.shields.io/pypi/v/stockstats.svg)](https://pypi.python.org/pypi/stockstats)

VERSION: 0.5.5
VERSION: 0.6.0

## Introduction

Expand Down Expand Up @@ -63,6 +63,8 @@ Supported statistics/indicators are:
* ROC: Rate of Change
* Coppock: Coppock Curve
* Ichimoku: Ichimoku Cloud
* CTI: Correlation Trend Indicator
* LRMA: Linear Regression Moving Average

## Installation

Expand Down Expand Up @@ -837,6 +839,31 @@ Examples:
* `df['ichimoku_7,22,44']` returns the ichimoku cloud width with window sizes
7, 22, 44

#### [Linear Regression Moving Average](https://www.daytrading.com/moving-linear-regression)

Linear regression works by taking various data points in a sample and
providing a “best fit” line to match the general trend in the data.

Implementation reference:

https://github.com/twopirllc/pandas-ta/blob/main/pandas_ta/overlap/linreg.py

Examples:
* `df['close_10_lrma']` linear regression of close price with window size 10

#### [Correlation Trend Indicator](https://tlc.thinkorswim.com/center/reference/Tech-Indicators/studies-library/C-D/CorrelationTrendIndicator)

Correlation Trend Indicator is a study that estimates
the current direction and strength of a trend.

Implementation is based on the following code:

https://github.com/twopirllc/pandas-ta/blob/main/pandas_ta/momentum/cti.py

Examples:
* `df['cti']` returns the CTI of close price with window 12
* `df['high_5_cti']` returns the CTI of high price with window 5

## Issues

We use [Github Issues](https://github.com/jealous/stockstats/issues) to track
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
numpy>=1.16.6
pandas>=0.24.2
96 changes: 89 additions & 7 deletions stockstats.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

__author__ = 'Cedric Zhuang'

from numpy.lib.stride_tricks import as_strided


def wrap(df, index_column=None):
""" wraps a pandas DataFrame to StockDataFrame
Expand Down Expand Up @@ -114,6 +116,8 @@ class StockDataFrame(pd.DataFrame):

ICHIMOKU = (9, 26, 52)

CTI = 12

MULTI_SPLIT_INDICATORS = ("kama",)

# End of options
Expand Down Expand Up @@ -163,12 +167,14 @@ def _get_p(self, column, shifts):
self.set_nan(cp, shifts)
self[column_name] = cp

def to_ints(self, shifts):
items = map(self._process_shifts_segment, shifts.split(','))
@classmethod
def to_ints(cls, shifts):
items = map(cls._process_shifts_segment, shifts.split(','))
return sorted(list(set(itertools.chain(*items))))

def to_int(self, shifts):
numbers = self.to_ints(shifts)
@classmethod
def to_int(cls, shifts):
numbers = cls.to_ints(shifts)
if len(numbers) != 1:
raise IndexError("only accept 1 number.")
return numbers[0]
Expand Down Expand Up @@ -983,13 +989,24 @@ def _get_sma(self, column, windows):
""" get simple moving average

:param column: column to calculate
:param windows: collection of window of simple moving average
:param windows: window of simple moving average
:return: None
"""
window = self.get_int_positive(windows)
column_name = '{}_{}_sma'.format(column, window)
self[column_name] = self.sma(self[column], window)

def _get_lrma(self, column, window):
""" get linear regression moving average

:param column: column to calculate
:param window: window size
:return: None
"""
window = self.get_int_positive(window)
column_name = '{}_{}_lrma'.format(column, window)
self[column_name] = self.linear_reg(self[column], window)

def _get_roc(self, column, window):
"""get Rate of Change (ROC) of a column

Expand Down Expand Up @@ -1044,6 +1061,69 @@ def _compute(x):
rolling = cls._rolling(series, window)
return rolling.apply(linear(weights), raw=True)

@classmethod
def linear_reg(cls,
series,
window,
correlation=False):
window = cls.get_int_positive(window)

x = range(1, window + 1)
x_sum = 0.5 * window * (window + 1)
x2_sum = x_sum * (2 * window + 1) / 3
divisor = window * x2_sum - x_sum * x_sum

def linear_regression(s):
y_sum = s.sum()
xy_sum = (x * s).sum()

m = (window * xy_sum - x_sum * y_sum) / divisor
b = (y_sum * x2_sum - x_sum * xy_sum) / divisor

if correlation:
y2_sum = (s * s).sum()
rn = window * xy_sum - x_sum * y_sum
rd = (divisor * (window * y2_sum - y_sum * y_sum)) ** 0.5
return rn / rd
return m * (window - 1) + b

def rolling(arr):
strides = arr.strides + (arr.strides[-1],)
shape = arr.shape[:-1] + (arr.shape[-1] - window + 1, window)
return as_strided(arr, shape=shape, strides=strides)

value = [linear_regression(_)
for _ in rolling(np.array(series))]
ret = pd.Series([0.0] * (window - 1) + value,
index=series.index)
return ret

def _get_cti(self, column=None, window=None):
""" get correlation trend indicator

Correlation Trend Indicator is a study that estimates
the current direction and strength of a trend.
https://tlc.thinkorswim.com/center/reference/Tech-Indicators/studies-library/C-D/CorrelationTrendIndicator

:param column: column to calculate, default to 'close'
:param window: window of Correlation Trend Indicator
"""
if column is None and window is None:
column_name = 'cti'
else:
column_name = '{}_{}_cti'.format(column, window)

if column is None:
column = 'close'
if window is None:
window = self.CTI
else:
window = self.get_int_positive(window)

value = self.linear_reg(
self[column], window, correlation=True)
self[column_name] = value

def _get_ema(self, column, windows):
""" get exponential moving average

Expand Down Expand Up @@ -1151,11 +1231,12 @@ def _get_coppock(self, windows=None):
roc_ema = self.linear_wma(fast_roc + slow_roc, window)
self[column_name] = roc_ema

def get_int_positive(self, windows):
@classmethod
def get_int_positive(cls, windows):
if isinstance(windows, int):
window = windows
else:
window = self.to_int(windows)
window = cls.to_int(windows)
if window <= 0:
raise IndexError("window must be greater than 0")
return window
Expand Down Expand Up @@ -1585,6 +1666,7 @@ def handler(self):
('cmo',): self._get_cmo,
('coppock',): self._get_coppock,
('ichimoku',): self._get_ichimoku,
('cti',): self._get_cti,
}

def __init_not_exist_column(self, key):
Expand Down
36 changes: 36 additions & 0 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -838,3 +838,39 @@ def test_coppock(self):
assert_that(c2[20110117], equal_to(0))
assert_that(c2[20110221], near_to(4.649))
assert_that(c2[20110324], near_to(-2.177))

@staticmethod
def test_linear_regression_raw():
arr = [1, 5, 7, 2, 4, 3, 7, 9, 2]
series = pd.Series(arr)
lg = StockDataFrame.linear_reg(series, 5)
assert_that(lg.iloc[3], equal_to(0.0))
assert_that(lg.iloc[8], equal_to(5.2))

cr = StockDataFrame.linear_reg(
series, 5, correlation=True)
assert_that(cr.iloc[3], equal_to(0.0))
assert_that(cr.iloc[8], near_to(0.108))

def test_linear_regression(self):
stock = self.get_stock_90days()
lr = stock['close_10_lrma']
assert_that(lr[20110114], equal_to(0))
assert_that(lr[20110127], near_to(12.782))

def test_cti(self):
stock = self.get_stock_90days()
cti = stock['cti']
assert_that(cti[20110118], equal_to(0))
assert_that(cti[20110131], near_to(-0.113))
assert_that(cti[20110215], near_to(0.369))

cti = stock['close_12_cti']
assert_that(cti[20110118], equal_to(0))
assert_that(cti[20110131], near_to(-0.113))
assert_that(cti[20110215], near_to(0.369))

cti = stock['high_10_cti']
assert_that(cti[20110118], near_to(-0.006))
assert_that(cti[20110131], near_to(-0.043))
assert_that(cti[20110215], near_to(0.5006))