|
import pandas as pd |
|
import numpy as np |
|
from pathlib import Path |
|
|
|
class DataProcessor:
    """Interactive cleaner for an uploaded CSV dataset.

    The processor reads a CSV file, asks the user (via ``input``) which column
    holds the label and which extra columns to discard, removes rows that
    contain infinite or missing values, and writes the result to
    ``<data_dir>/original.csv``.
    """

    # Label column used when the user just presses Enter at the prompt.
    DEFAULT_LABEL_COLUMN = " Label"

    # Columns removed on every run. These are matched exactly, so the leading
    # spaces in some names are significant.
    DEFAULT_COLUMNS_TO_DROP = (
        'Unnamed: 0', 'Flow ID',
        ' Source IP', ' Source Port',
        ' Destination IP', ' Destination Port',
        ' Timestamp', 'SimillarHTTP',
    )

    def __init__(self):
        """Initialize DataProcessor class and create dataset directory."""
        self.data_dir = Path(".dataset")
        self.data_dir.mkdir(exist_ok=True)

    def preprocess_data(self, file):
        """
        Preprocess the uploaded CSV file.

        Steps: read the CSV, prompt for the label column, drop unnecessary
        columns, turn +/-inf into NaN, drop every row containing NaN, and
        save the result as ``original.csv`` inside ``self.data_dir``.

        Args:
            file: The uploaded CSV file (anything ``pd.read_csv`` accepts).

        Returns:
            pd.DataFrame: The cleaned DataFrame.

        Raises:
            ValueError: If there are issues reading or processing the data.
                The underlying exception is attached as ``__cause__``.
        """
        try:
            raw = pd.read_csv(file)
            label_column = self.get_label_column(raw)
            trimmed = self.drop_unnecessary_columns(raw, label_column)

            # Infinite values are treated like missing ones so that a single
            # dropna() removes both kinds of unusable rows.
            ddos_data = trimmed.replace([np.inf, -np.inf], np.nan).dropna()

            processed_path = self.data_dir / "original.csv"
            ddos_data.to_csv(processed_path, index=False)
            return ddos_data
        except Exception as e:
            # Chain explicitly so the original traceback stays attached.
            raise ValueError(f"Error processing data: {e}") from e

    @staticmethod
    def _resolve_column(df, name):
        """Return the column of ``df`` that ``name`` refers to, or ``None``.

        An exact match wins; otherwise the first column whose name equals
        ``name`` once surrounding whitespace is ignored on both sides.
        """
        columns = list(getattr(df, "columns", ()))
        if name in columns:
            return name
        wanted = name.strip()
        for column in columns:
            if isinstance(column, str) and column.strip() == wanted:
                return column
        return None

    def get_label_column(self, df):
        """
        Prompt the user for the label column name.

        Pressing Enter keeps the default (``" Label"``). A typed name is
        stripped and then resolved against ``df``'s columns ignoring
        surrounding whitespace, so typing ``Label`` selects a column that is
        actually called ``" Label"``. If nothing in ``df`` matches, the
        stripped input is returned unchanged.

        Args:
            df: The DataFrame whose columns are used to resolve the name.

        Returns:
            str: The label column name.
        """
        default_label_column = self.DEFAULT_LABEL_COLUMN
        print(f"Default label column is '{default_label_column}'.")
        entered = input("Specify a different label column name (or press Enter to keep default): ").strip()

        if not entered:
            return default_label_column

        resolved = self._resolve_column(df, entered)
        return entered if resolved is None else resolved

    def drop_unnecessary_columns(self, df, label_column):
        """
        Drop unnecessary columns from the DataFrame.

        The default columns are always dropped when present (exact name
        match). The user may add more as a comma-separated list; those names
        are compared with surrounding whitespace ignored, and empty entries
        (for example from a trailing comma) are skipped. ``label_column`` is
        never dropped, even if it is listed.

        Args:
            df: The DataFrame to be cleaned.
            label_column: The label column to retain.

        Returns:
            pd.DataFrame: DataFrame with unnecessary columns dropped.
        """
        default_columns_to_drop = list(self.DEFAULT_COLUMNS_TO_DROP)

        print(f"Columns to drop by default: {default_columns_to_drop}")
        user_columns_to_drop = input("Specify additional columns to drop (comma-separated) or press Enter to keep default: ")

        # Normalised user request: stripped names with blanks discarded.
        requested = {name.strip() for name in user_columns_to_drop.split(',')}
        requested.discard('')

        columns_to_drop = [
            column
            for column in df.columns
            if column != label_column
            and (
                column in default_columns_to_drop
                or (isinstance(column, str) and column.strip() in requested)
            )
        ]

        return df.drop(columns=columns_to_drop)
|
|