|
import pandas as pd
|
|
import numpy as np
|
|
import os
|
|
|
|
|
|
def add_sentiment(stock_data: pd.DataFrame, file_name) -> pd.DataFrame:
    """Left-merge sentiment records from a CSV file onto ``stock_data``.

    Args:
        stock_data: Frame whose index is matched against the CSV's ``date``
            column.
        file_name: Path of the CSV file, joined onto the directory that
            contains this module. The file must have a ``date`` column,
            which is parsed as dates and used as the index.

    Returns:
        A new frame holding every row of ``stock_data`` plus the columns of
        the CSV; rows with no matching date get NaN in the added columns.
    """
    module_dir = os.path.dirname(__file__)
    csv_path = os.path.join(module_dir, file_name)

    sentiment = pd.read_csv(csv_path, index_col='date', parse_dates=['date'])

    # how='left' keeps all stock rows even when no sentiment exists for a date.
    return pd.merge(
        left=stock_data,
        right=sentiment,
        how='left',
        left_index=True,
        right_index=True,
    )
|
|
|
|
|
|
def calc_rsi(over: pd.Series, fn_roll: callable, scale_down: bool = True) -> pd.Series:
    """Compute a relative strength index (RSI) for ``over``.

    The series is differenced, split into gains and absolute losses, and each
    part is smoothed with ``fn_roll``. The RSI is ``100 - 100 / (1 + rs)``
    where ``rs`` is smoothed gain divided by smoothed loss.

    Args:
        over: Series to compute the RSI over (e.g. a price column).
        fn_roll: Smoothing function applied separately to the gain and loss
            series; it takes a Series and returns a Series on the same index,
            for example ``lambda s: s.ewm(span=14).mean()``.
        scale_down: If true, divide the result by 100 so it lies in [0, 1]
            instead of [0, 100].

    Returns:
        A Series named ``'rsi'`` indexed like ``over``. The first element is
        NaN (no difference exists for it), as are any positions where
        ``fn_roll`` is still warming up.

    Raises:
        AssertionError: If any non-NaN RSI value from position 13 onwards
            falls outside [0, 100].
    """
    # The first difference is always NaN; drop it so it never reaches fn_roll.
    delta = over.diff().iloc[1:]

    # Gains keep positive moves, losses keep the magnitude of negative moves.
    up = delta.clip(lower=0)
    down = delta.clip(upper=0).abs()

    roll_up = fn_roll(up)
    roll_down = fn_roll(down)

    rs = roll_up / roll_down
    rsi = 100.0 - (100.0 / (1.0 + rs))

    # Pin the degenerate cases instead of relying on inf/NaN arithmetic:
    # no losses -> 100, no gains -> 0, otherwise the computed value.
    rsi[:] = np.select([roll_down == 0, roll_up == 0, True], [100, 0, rsi])
    rsi.name = 'rsi'

    # Sanity-check the range after the first 13 samples. NaN values are
    # skipped: a smoother with a longer warm-up (e.g. rolling(20).mean())
    # still yields NaN there, and NaN would fail both comparisons even though
    # nothing is out of range.
    settled = rsi.iloc[13:].dropna()
    assert ((0 <= settled) & (settled <= 100)).all(), 'RSI outside [0, 100]'

    # Restore the original index so the dropped first row comes back as NaN.
    rsi = rsi.reindex(over.index)

    if scale_down:
        rsi = rsi / 100

    return rsi
|
|
|
|
|
|
def scale_stock_data(data: pd.DataFrame, scale_volume: float, scale_price: float) -> pd.DataFrame:
    """Divide the volume and price columns of ``data`` by fixed scale factors.

    Args:
        data: Frame with ``volume``, ``open``, ``high``, ``low``, ``close``
            and ``adj_close`` columns. It is modified in place.
        scale_volume: Divisor applied to the ``volume`` column.
        scale_price: Divisor applied to each of the five price columns.

    Returns:
        The same ``data`` object, after scaling.
    """
    # Column -> divisor, listed in the order the columns are rewritten.
    divisors = {
        'volume': scale_volume,
        'open': scale_price,
        'high': scale_price,
        'low': scale_price,
        'close': scale_price,
        'adj_close': scale_price,
    }
    for column, divisor in divisors.items():
        data[column] = data[column] / divisor
    return data
|
|
|
|
|
|
def calculate_indicators(prices_dataframe: pd.DataFrame, use_regular_close=False) -> pd.DataFrame:
    """Append return and technical-indicator columns to a price frame.

    Columns added, all derived from the selected price column:
    ``return`` (one-period percentage change), ``log1p_return``
    (``log1p`` of ``return``), ``rsi_ema`` (``calc_rsi`` smoothed with a
    14-span exponentially weighted mean), ``ewma_20`` and ``ewma_60``
    (exponentially weighted means), ``ewmstd_20`` (exponentially weighted
    standard deviation) and ``macd`` (12-span minus 26-span exponentially
    weighted mean).

    Args:
        prices_dataframe: Frame with an ``adj_close`` column and, when
            ``use_regular_close`` is true, a ``close`` column.
        use_regular_close: If true, indicators are computed from ``close``
            and ``adj_close`` is dropped from the result. Otherwise they are
            computed from ``adj_close``.

    Returns:
        The frame with the indicator columns added. With
        ``use_regular_close`` false this is the input object itself, modified
        in place; with it true it is the new frame produced by ``drop``.
    """
    if not use_regular_close:
        col = 'adj_close'
    else:
        col = 'close'
        # drop() returns a new frame, so the caller's frame is left untouched
        # on this path.
        prices_dataframe = prices_dataframe.drop(columns=['adj_close'])

    def _ema_14(series):
        return series.ewm(span=14).mean()

    # Assignment aligns on the index, so the skipped first row stays NaN.
    one_period_change = prices_dataframe[col].pct_change(1)
    prices_dataframe['return'] = one_period_change.iloc[1:]
    prices_dataframe['log1p_return'] = np.log1p(prices_dataframe['return'])

    prices_dataframe['rsi_ema'] = calc_rsi(prices_dataframe[col], _ema_14)

    for column_name, span in (('ewma_20', 20), ('ewma_60', 60)):
        prices_dataframe[column_name] = prices_dataframe[col].ewm(span=span).mean()

    prices_dataframe['ewmstd_20'] = prices_dataframe[col].ewm(span=20).std()

    fast_mean = prices_dataframe[col].ewm(span=12).mean()
    slow_mean = prices_dataframe[col].ewm(span=26).mean()
    prices_dataframe['macd'] = fast_mean - slow_mean

    return prices_dataframe
|
|
|