import pandas as pd
import numpy as np
from pathlib import Path


class DataProcessor:
    """Clean an uploaded CSV dataset and store the result in ``.dataset``."""

    # Label column used when neither the caller nor the user names one.
    DEFAULT_LABEL_COLUMN = " Label"

    # Columns removed by default (matched exactly against the CSV header).
    DEFAULT_COLUMNS_TO_DROP = (
        'Unnamed: 0',
        'Flow ID',
        ' Source IP',
        ' Source Port',
        ' Destination IP',
        ' Destination Port',
        ' Timestamp',
        'SimillarHTTP',
    )

    def __init__(self):
        """Initialize DataProcessor class and create dataset directory."""
        self.data_dir = Path(".dataset")
        self.data_dir.mkdir(exist_ok=True)

    def preprocess_data(self, file, *, label_column=None, extra_columns=None):
        """
        Preprocess the uploaded CSV file.

        Reads the CSV, drops the unnecessary columns, replaces +/-infinity
        with NaN, drops every row containing NaN, writes the result to
        ``<data_dir>/original.csv`` and returns it.

        Args:
            file: The uploaded CSV file (anything ``pd.read_csv`` accepts).
            label_column: Optional label column name. When ``None`` the user
                is prompted via ``get_label_column``.
            extra_columns: Optional additional columns to drop, forwarded to
                ``drop_unnecessary_columns``. When ``None`` the user is
                prompted.

        Returns:
            pd.DataFrame: The cleaned DataFrame.

        Raises:
            ValueError: If there are issues reading or processing the data.
                The original exception is attached as ``__cause__``.
        """
        try:
            raw = pd.read_csv(file)

            if label_column is None:
                label_column = self.get_label_column(raw)

            trimmed = self.drop_unnecessary_columns(
                raw, label_column, extra_columns=extra_columns
            )

            # Infinities become NaN first so a single dropna() removes both.
            cleaned = trimmed.replace([np.inf, -np.inf], np.nan).dropna()

            processed_path = self.data_dir / "original.csv"
            cleaned.to_csv(processed_path, index=False)

            return cleaned
        except Exception as e:
            # Chain explicitly so the underlying traceback is preserved.
            raise ValueError(f"Error processing data: {e}") from e

    def get_label_column(self, df):
        """
        Prompt the user for the label column name.

        Args:
            df: The DataFrame. Its columns are used to map the typed name to
                an existing column that differs only in surrounding
                whitespace.

        Returns:
            str: The default label column when the user enters nothing;
            otherwise the matching DataFrame column if one exists, else the
            stripped text the user typed.
        """
        default_label_column = self.DEFAULT_LABEL_COLUMN
        print(f"Default label column is '{default_label_column}'.")
        entered = input(
            "Specify a different label column name (or press Enter to keep default): "
        ).strip()
        if not entered:
            return default_label_column

        # getattr keeps callers that pass a non-DataFrame placeholder working.
        match = self._resolve_column(entered, list(getattr(df, "columns", ())))
        return entered if match is None else match

    def drop_unnecessary_columns(self, df, label_column, extra_columns=None):
        """
        Drop unnecessary columns from the DataFrame.

        Args:
            df: The DataFrame to be cleaned. It is not modified.
            label_column: The label column to retain. It is never dropped,
                even if it appears among the default or extra columns.
            extra_columns: Optional additional columns to drop, either an
                iterable of names or one comma-separated string. Names are
                matched exactly first, then ignoring surrounding whitespace.
                When ``None`` the user is prompted.

        Returns:
            pd.DataFrame: DataFrame with unnecessary columns dropped.
        """
        default_columns_to_drop = list(self.DEFAULT_COLUMNS_TO_DROP)

        if extra_columns is None:
            print(f"Columns to drop by default: {default_columns_to_drop}")
            extra_columns = input(
                "Specify additional columns to drop (comma-separated) or press Enter to keep default: "
            )
        if isinstance(extra_columns, str):
            extra_columns = extra_columns.split(',')

        columns = list(df.columns)

        # Defaults keep their original exact-match semantics.
        candidates = [name for name in default_columns_to_drop if name in columns]
        for entry in extra_columns:
            match = self._resolve_column(entry, columns)
            if match is not None:
                candidates.append(match)

        # The label column is protected from being dropped.
        keep = self._resolve_column(label_column, columns)
        columns_to_drop = [
            name for name in dict.fromkeys(candidates) if name != keep
        ]

        return df.drop(columns=columns_to_drop)

    @staticmethod
    def _resolve_column(name, columns):
        """Return the entry of *columns* matching *name*, or None.

        An exact match wins; otherwise string names are compared with
        surrounding whitespace ignored. Blank names never match.
        """
        if name in columns:
            return name
        if not isinstance(name, str):
            return None
        wanted = name.strip()
        if not wanted:
            return None
        for column in columns:
            if isinstance(column, str) and column.strip() == wanted:
                return column
        return None