# suchkow's picture
# Upload 48 files
# 25e7dcb verified
import pandas as pd
import numpy as np
import os
def add_sentiment(stock_data: pd.DataFrame, file_name) -> pd.DataFrame:
    """Left-join sentiment data read from a CSV onto a stock DataFrame.

    Args:
        stock_data: Frame whose index is matched against the CSV's ``date``
            column.
        file_name: Path of the CSV, resolved relative to the directory that
            contains this module. The file must have a ``date`` column, which
            is parsed as dates and used as the index.

    Returns:
        A new frame with every row of ``stock_data`` plus the CSV's columns;
        rows whose index has no match in the CSV get NaN in those columns.
    """
    module_dir = os.path.dirname(__file__)
    sentiment = pd.read_csv(
        os.path.join(module_dir, file_name),
        index_col='date',
        parse_dates=['date'],
    )
    # Index-on-index left merge: keeps all stock rows, in their original order.
    return stock_data.merge(sentiment, how='left', left_index=True, right_index=True)
def calc_rsi(over: pd.Series, fn_roll: callable, scale_down: bool = True) -> pd.Series:
    """Compute the Relative Strength Index (RSI) of a series.

    Args:
        over: Series of values (e.g. prices) to compute the RSI over.
        fn_roll: Callable applied separately to the gain series and the loss
            series to smooth them, e.g. ``lambda s: s.ewm(span=14).mean()``
            or ``lambda s: s.rolling(14).mean()``. Any window length works.
        scale_down: If True, divide the result by 100 so it lies in [0, 1]
            rather than [0, 100].

    Returns:
        A Series named ``'rsi'`` aligned to ``over.index``. The first element
        is NaN (it has no previous value to difference against), as is every
        element for which ``fn_roll`` returned NaN (e.g. a rolling warm-up).

    Raises:
        AssertionError: If any computed (non-NaN) RSI value is outside
            [0, 100].
    """
    # Step-to-step change; the first row is NaN (no predecessor), so drop it.
    # `.iloc` makes the slice positional regardless of the index type.
    delta = over.diff().iloc[1:]

    # Split into gains (up) and losses (down), both as non-negative series.
    up, down = delta.clip(lower=0), delta.clip(upper=0).abs()
    roll_up, roll_down = fn_roll(up), fn_roll(down)

    rs = roll_up / roll_down
    raw_rsi = 100.0 - (100.0 / (1.0 + rs))

    # Pin the degenerate cases explicitly so that a zero `roll_down` cannot
    # leave inf/NaN behind: no losses -> 100, no gains -> 0. The first
    # matching condition wins, so a completely flat window yields 100.
    # (pandas >= 2.2.0 could express this with `Series.case_when` instead.)
    rsi = pd.Series(
        np.select([roll_down == 0, roll_up == 0, True], [100, 0, raw_rsi]),
        index=raw_rsi.index,
        name='rsi',
    )

    # Sanity-check the range on computed values only. Warm-up NaNs are skipped
    # by value rather than by a fixed offset, so the check does not depend on
    # the window length used inside `fn_roll`.
    computed = rsi.dropna()
    assert computed.between(0, 100).all(), 'RSI out of the [0, 100] range'

    # Restore the dropped first row (as NaN) so the result aligns with `over`.
    rsi = rsi.reindex(over.index)
    if scale_down:
        rsi = rsi / 100
    return rsi
def scale_stock_data(data: pd.DataFrame, scale_volume: float, scale_price: float) -> pd.DataFrame:
    """Divide the volume and price columns of ``data`` by fixed scale factors.

    The frame is modified in place and the same object is returned.

    Args:
        data: Frame with ``volume``, ``open``, ``high``, ``low``, ``close``
            and ``adj_close`` columns.
        scale_volume: Divisor applied to the ``volume`` column.
        scale_price: Divisor applied to each of the five price columns.

    Returns:
        The same ``data`` object, with the six columns rescaled.
    """
    data['volume'] = data['volume'] / scale_volume
    # All price-like columns share one divisor.
    for price_col in ('open', 'high', 'low', 'close', 'adj_close'):
        data[price_col] = data[price_col] / scale_price
    return data
def calculate_indicators(prices_dataframe: pd.DataFrame, use_regular_close=False) -> pd.DataFrame:
    """Add return and EWM-based indicator columns to a price frame.

    Args:
        prices_dataframe: Frame with an ``adj_close`` column (and a ``close``
            column when ``use_regular_close`` is True).
        use_regular_close: If True, drop ``adj_close`` and derive every
            indicator from ``close``; otherwise derive them from
            ``adj_close``.

    Returns:
        The frame with these columns added, in this order: ``return``
        (one-step percentage change), ``log1p_return``, ``rsi_ema`` (RSI
        smoothed with a span-14 EWM mean), ``ewma_20``, ``ewma_60``,
        ``ewmstd_20`` and ``macd`` (span-12 EWM mean minus span-26 EWM mean).

    Note:
        With ``use_regular_close=True`` the columns are added to the new frame
        returned by ``drop``, so the caller's frame is left untouched. With the
        default, the columns are added to the passed-in frame itself.
    """
    col = 'close' if use_regular_close else 'adj_close'
    if use_regular_close:
        prices_dataframe = prices_dataframe.drop(columns=['adj_close'])

    price = prices_dataframe[col]

    def ewm_mean(span):
        # Exponentially weighted mean of the selected price column.
        return price.ewm(span=span).mean()

    # Returns; the first row has no predecessor and ends up NaN.
    prices_dataframe['return'] = price.pct_change(1).iloc[1:]
    prices_dataframe['log1p_return'] = np.log1p(prices_dataframe['return'])

    # Momentum. An SMA-based RSI variant (rolling(14).mean()) is disabled.
    prices_dataframe['rsi_ema'] = calc_rsi(price, lambda s: s.ewm(span=14).mean())

    # Trend and volatility. The SMA-20 / rolling-std-20 columns and the
    # Bollinger-style upper/lower band columns are disabled.
    prices_dataframe['ewma_20'] = ewm_mean(20)
    prices_dataframe['ewma_60'] = ewm_mean(60)
    prices_dataframe['ewmstd_20'] = price.ewm(span=20).std()
    prices_dataframe['macd'] = ewm_mean(12) - ewm_mean(26)
    return prices_dataframe