import os

import numpy as np
import pandas as pd


def add_sentiment(stock_data: pd.DataFrame, file_name) -> pd.DataFrame:
    """Left-join sentiment data read from a CSV onto ``stock_data``.

    The CSV is looked up relative to the directory containing this module.
    Its ``date`` column is parsed as dates and used as the index, and the
    join is performed index-to-index.

    Args:
        stock_data: Frame to enrich; every one of its rows is kept.
        file_name: CSV path relative to this module's directory.

    Returns:
        A new frame with the CSV's columns appended to ``stock_data``.
    """
    file_path = os.path.join(os.path.dirname(__file__), file_name)
    sentiment_data = pd.read_csv(file_path, index_col='date', parse_dates=['date'])
    return pd.merge(
        stock_data,
        sentiment_data,
        left_index=True,
        right_index=True,
        how='left',
    )


def calc_rsi(over: pd.Series, fn_roll: callable, scale_down: bool = True,
             *, length: int = 14) -> pd.Series:
    """Compute the Relative Strength Index of a price series.

    Args:
        over: Series the RSI is computed over.
        fn_roll: Smoothing function applied separately to the gain and loss
            series, e.g. ``lambda s: s.ewm(span=14).mean()``.
        scale_down: If true, divide the result by 100.
        length: Look-back length used by ``fn_roll`` (expected to be >= 1).
            The first ``length - 1`` values are skipped by the range sanity
            check because a simple moving average leaves them NaN.

    Returns:
        RSI aligned to ``over.index``, named ``'rsi'``. The first element is
        NaN because it has no previous value to take a difference with.

    Raises:
        AssertionError: If a value past the warm-up window is outside
            ``[0, 100]`` (this includes NaN).
    """
    # Step-to-step change; the first row is NaN (no predecessor), so drop it.
    delta = over.diff().iloc[1:]

    # Split into gains and (absolute) losses.
    up = delta.clip(lower=0)
    down = delta.clip(upper=0).abs()
    roll_up, roll_down = fn_roll(up), fn_roll(down)

    rs = roll_up / roll_down
    rsi = 100.0 - (100.0 / (1.0 + rs))

    # Pin the degenerate cases explicitly so a zero `roll_down` (division by
    # zero) cannot leave inf and/or nan values behind.
    rsi[:] = np.select([roll_down == 0, roll_up == 0, True], [100, 0, rsi])
    # rsi = rsi.case_when([((roll_down == 0), 100), ((roll_up == 0), 0)])
    # This alternative to np.select works only for pd.__version__ >= 2.2.0.
    rsi.name = 'rsi'

    # Sanity-check the range. rsi[:length - 1] is excluded because it is NaN
    # for SMA.
    valid_rsi = rsi.iloc[length - 1:]
    assert ((0 <= valid_rsi) & (valid_rsi <= 100)).all(), \
        'RSI outside [0, 100] after the warm-up window'

    # Restore the row removed after diff() so the result lines up with `over`.
    rsi = rsi.reindex(over.index)
    if scale_down:
        rsi = rsi / 100
    return rsi


def scale_stock_data(data: pd.DataFrame, scale_volume: float, scale_price: float) -> pd.DataFrame:
    """Divide the volume and price columns of ``data`` by constant factors.

    ``volume`` is divided by ``scale_volume``; ``open``, ``high``, ``low``,
    ``close`` and ``adj_close`` are divided by ``scale_price``.

    Note:
        ``data`` is modified in place and the same object is returned.
    """
    data['volume'] = data['volume'] / scale_volume
    for price_col in ('open', 'high', 'low', 'close', 'adj_close'):
        data[price_col] = data[price_col] / scale_price
    return data


def calculate_indicators(prices_dataframe: pd.DataFrame, use_regular_close=False) -> pd.DataFrame:
    """Return a copy of ``prices_dataframe`` with indicator columns added.

    Added columns: ``return``, ``log1p_return``, ``rsi_ema``, ``ewma_20``,
    ``ewma_60``, ``ewmstd_20`` and ``macd``.

    Args:
        prices_dataframe: Price data. Must contain ``close`` when
            ``use_regular_close`` is true, otherwise ``adj_close``.
        use_regular_close: Compute everything from ``close`` and leave
            ``adj_close`` out of the result (it may be absent from the
            input). Otherwise compute from ``adj_close``.

    Returns:
        A new frame; the caller's frame is never modified, whichever column
        is selected.
    """
    if use_regular_close:
        col = 'close'
        # drop() already returns a new frame; errors='ignore' tolerates input
        # that has no adj_close column in the first place.
        frame = prices_dataframe.drop(columns=['adj_close'], errors='ignore')
    else:
        col = 'adj_close'
        # Copy so both modes leave the caller's frame untouched.
        frame = prices_dataframe.copy()

    price = frame[col]

    # The first return is NaN because there is no previous price.
    frame['return'] = price.pct_change(1)
    frame['log1p_return'] = np.log1p(frame['return'])

    frame['rsi_ema'] = calc_rsi(price, lambda s: s.ewm(span=14).mean())
    # frame['rsi_sma'] = calc_rsi(price, lambda s: s.rolling(14).mean())

    # frame['smstd_20'] = price.rolling(window=20).std()
    # frame['sma_20'] = price.rolling(window=20).mean()
    frame['ewma_20'] = price.ewm(span=20).mean()
    frame['ewma_60'] = price.ewm(span=60).mean()
    frame['ewmstd_20'] = price.ewm(span=20).std()

    # Difference between the span-12 and span-26 exponential means.
    frame['macd'] = price.ewm(span=12).mean() - price.ewm(span=26).mean()

    # frame['upper_band_sma'] = frame.ewma_20 + (2 * frame.ewmstd_20)
    # frame['lower_band_sma'] = frame.sma_20 - (2 * frame.ewmstd_20)
    # frame['upper_band_ewma'] = frame.ewma_20 + (2 * frame.ewmstd_20)
    # frame['lower_band_ewma'] = frame.ewma_20 - (2 * frame.ewmstd_20)
    return frame