DRLLMTRY / processor.py
yash1506's picture
Update processor.py
65fc544 verified
raw
history blame
3.17 kB
import pandas as pd
import numpy as np
from pathlib import Path
class DataProcessor:
    """Clean an uploaded CSV file and store the result in ``.dataset``.

    The pipeline reads the CSV, asks on standard input which column holds
    the labels and which extra columns to discard, removes the unwanted
    columns, drops every row containing an infinite or missing value, and
    writes the outcome to ``<data_dir>/original.csv``.
    """

    # Label column used when the user just presses Enter. The leading space
    # is part of the name (presumably mirrors the header spelling of the
    # expected CSV files -- confirm against real input).
    DEFAULT_LABEL_COLUMN = " Label"

    # Columns removed on every run; spelled exactly, whitespace included.
    DEFAULT_COLUMNS_TO_DROP = (
        'Unnamed: 0', 'Flow ID',
        ' Source IP', ' Source Port',
        ' Destination IP', ' Destination Port',
        ' Timestamp', 'SimillarHTTP',
    )

    def __init__(self):
        """Initialize DataProcessor class and create dataset directory."""
        # Relative path: the directory is created in the current working
        # directory of the process.
        self.data_dir = Path(".dataset")
        self.data_dir.mkdir(exist_ok=True)

    @staticmethod
    def _prompt(message):
        """Read one line from standard input.

        Returns an empty string when no input is available (EOF), which the
        callers treat the same as pressing Enter, i.e. "keep the default".
        """
        try:
            return input(message)
        except EOFError:
            return ""

    @staticmethod
    def _match_columns(df, name):
        """Return the columns of ``df`` that ``name`` refers to.

        An exact match wins. Otherwise every column whose name equals
        ``name`` once surrounding whitespace is ignored is returned. This
        fallback exists because the prompts strip what the user typed, while
        headers such as ``' Source IP'`` carry a leading space.

        Args:
            df: Object exposing ``columns`` (anything else matches nothing).
            name: Column name to look up.
        Returns:
            list: Matching column names, possibly empty.
        """
        columns = list(getattr(df, "columns", ()))
        if name in columns:
            return [name]
        if not isinstance(name, str):
            return []
        wanted = name.strip()
        return [
            col for col in columns
            if isinstance(col, str) and col.strip() == wanted
        ]

    def preprocess_data(self, file):
        """
        Preprocess the uploaded CSV file.

        Args:
            file: The uploaded CSV file (anything ``pd.read_csv`` accepts).
        Returns:
            pd.DataFrame: The cleaned DataFrame, also saved as
            ``original.csv`` inside ``self.data_dir``.
        Raises:
            ValueError: If there are issues reading or processing the data.
                The original exception is attached as ``__cause__``.
        """
        try:
            raw = pd.read_csv(file)
            label_column = self.get_label_column(raw)
            trimmed = self.drop_unnecessary_columns(raw, label_column)

            # Infinities become NaN first so that a single dropna() removes
            # both kinds of unusable rows.
            cleaned = trimmed.replace([np.inf, -np.inf], np.nan).dropna()

            processed_path = self.data_dir / "original.csv"
            cleaned.to_csv(processed_path, index=False)
            return cleaned
        except Exception as e:
            # Chain the cause so the real traceback is not lost.
            raise ValueError(f"Error processing data: {e}") from e

    def get_label_column(self, df):
        """
        Prompt the user for the label column name.

        Args:
            df: The DataFrame whose columns the answer is matched against.
        Returns:
            str: The label column name. When the chosen name (typed or
            default) matches a column of ``df`` only after ignoring
            surrounding whitespace, the column's real name is returned;
            when nothing matches, the chosen name is returned unchanged.
        """
        default_label_column = self.DEFAULT_LABEL_COLUMN
        print(f"Default label column is '{default_label_column}'.")
        typed = self._prompt(
            "Specify a different label column name (or press Enter to keep default): "
        ).strip()

        chosen = typed or default_label_column
        matches = self._match_columns(df, chosen)
        return matches[0] if matches else chosen

    def drop_unnecessary_columns(self, df, label_column):
        """
        Drop unnecessary columns from the DataFrame.

        Args:
            df: The DataFrame to be cleaned.
            label_column: The label column to retain; it is never dropped,
                even if it appears among the defaults or the user's list.
        Returns:
            pd.DataFrame: DataFrame with unnecessary columns dropped.
        """
        default_columns_to_drop = list(self.DEFAULT_COLUMNS_TO_DROP)
        print(f"Columns to drop by default: {default_columns_to_drop}")
        user_columns_to_drop = self._prompt(
            "Specify additional columns to drop (comma-separated) or press Enter to keep default: "
        )

        # Defaults are matched exactly; only columns that exist are kept.
        candidates = [col for col in default_columns_to_drop if col in df.columns]

        # User entries are matched whitespace-insensitively; blank entries
        # (e.g. from "a,,b" or a trailing comma) are skipped.
        for token in user_columns_to_drop.split(','):
            token = token.strip()
            if token:
                candidates.extend(self._match_columns(df, token))

        protected = set(self._match_columns(df, label_column))
        columns_to_drop = [
            col for col in dict.fromkeys(candidates) if col not in protected
        ]
        return df.drop(columns=columns_to_drop, errors='ignore')