import logging
from pathlib import Path

import pandas as pd
import streamlit as st

from llm_handler import LLMHandler
from processor import DataProcessor


class DDoSAnalyzerUI:
    """Streamlit front end for the DDoS traffic analyzer.

    The UI lets the user pick a colour theme, upload a CSV file, run it
    through ``DataProcessor`` and ``LLMHandler``, and chat with the LLM
    handler about the analysis.

    Streamlit re-executes the script on every interaction, so anything that
    must survive between interactions is kept in ``st.session_state``.

    NOTE(review): several user-facing labels in this file had their emoji
    destroyed by a wrong text decoding.  Labels whose emoji could still be
    identified (sun, light bulb, outbox, speech balloon, shield) have been
    restored; labels that were reduced to a bare "π" are kept unchanged and
    should be restored from the original source.
    """

    # Theme used when session state holds no theme or an unknown theme name.
    DEFAULT_THEME = 'Light'

    # Colour palettes selectable from the sidebar.
    THEMES = {
        'Light': {
            'base': 'light',
            'primaryColor': '#3B82F6',
            'backgroundColor': '#FFFFFF',
            'secondaryBackgroundColor': '#F3F4F6',
            'textColor': '#000000',
        },
        'Dark': {
            'base': 'dark',
            'primaryColor': '#10B981',
            'backgroundColor': '#1F2937',
            'secondaryBackgroundColor': '#374151',
            'textColor': '#FFFFFF',
        },
        'Neon': {
            'base': 'dark',
            'primaryColor': '#22D3EE',
            'backgroundColor': '#000000',
            'secondaryBackgroundColor': '#1F2937',
            'textColor': '#22D3EE',
        },
    }

    # Session-state keys seeded by setup_session_state().  'analysis_stats'
    # and 'analysis_results' hold the outputs of the last successful run.
    SESSION_KEYS = (
        'chat_history',
        'current_file',
        'analysis_complete',
        'selected_theme',
        'preview_data',
        'analysis_stats',
        'analysis_results',
    )

    # Diagnostics go to the log instead of being rendered into the page.
    _log = logging.getLogger(__name__)

    def __init__(self):
        """Configure the page, seed session state and build the helpers."""
        # set_page_config is issued before any other Streamlit command.
        st.set_page_config(
            page_title="DDoS Traffic Analyzer",
            page_icon=":shield:",
            layout="wide",
            initial_sidebar_state="expanded",
        )

        self.setup_session_state()

        self.processor = DataProcessor()
        self.llm_handler = LLMHandler()

        # Per-instance copy so edits to one instance's palettes do not leak
        # into the class-level defaults.
        self.themes = {
            name: dict(palette) for name, palette in self.THEMES.items()
        }

    def setup_session_state(self):
        """Initialize Streamlit session state variables.

        Keys that already exist are left untouched; missing keys start as
        ``None``, except ``chat_history`` which starts as an empty list.
        """
        for key in self.SESSION_KEYS:
            if key not in st.session_state:
                st.session_state[key] = [] if key == 'chat_history' else None
        self._log.debug("Session state setup complete.")

    def _resolve_theme_name(self, requested):
        """Return ``requested`` if it names a known theme, else the default.

        ``selected_theme`` is seeded as ``None``, so a plain
        ``session_state.get(key, default)`` would hand back ``None`` and the
        palette lookup would fail.
        """
        return requested if requested in self.themes else self.DEFAULT_THEME

    @staticmethod
    def _build_theme_css(theme):
        """Return the ``<style>`` block for one palette dictionary."""
        return (
            "<style>\n"
            ".stApp {\n"
            f"    background-color: {theme['backgroundColor']};\n"
            f"    color: {theme['textColor']};\n"
            "}\n"
            '.stTabs [data-baseweb="tab-list"] {\n'
            f"    background-color: {theme['secondaryBackgroundColor']};\n"
            "}\n"
            ".stButton>button {\n"
            f"    background-color: {theme['primaryColor']};\n"
            "    color: white;\n"
            "}\n"
            "</style>\n"
        )

    def apply_theme(self):
        """Apply selected theme to Streamlit app.

        Falls back to ``DEFAULT_THEME`` when no theme (or an unknown one) is
        stored in session state.
        """
        theme_name = self._resolve_theme_name(
            st.session_state.get('selected_theme')
        )
        st.markdown(
            self._build_theme_css(self.themes[theme_name]),
            unsafe_allow_html=True,
        )

    def render_theme_icons(self):
        """Render the theme buttons in the sidebar and apply the chosen theme."""
        st.sidebar.header("π Theme Selector")

        # (theme name, button label).  The 'Dark' label is mojibake residue
        # whose original emoji could not be recovered.
        choices = (
            ('Light', "☀️"),
            ('Dark', "π"),
            ('Neon', "💡"),
        )

        columns = st.sidebar.columns(len(choices))
        for column, (theme_name, icon) in zip(columns, choices):
            # Explicit keys keep the widgets distinct even if two labels
            # ever end up identical.
            if column.button(icon, key=f"theme_button_{theme_name.lower()}"):
                st.session_state.selected_theme = theme_name

        # The injected CSS only lasts for the current script run, so it is
        # re-applied on every run once the user has picked a theme.  Before
        # any pick, Streamlit's own theme is left alone.
        if st.session_state.get('selected_theme') is not None:
            self.apply_theme()

    def render_file_upload(self):
        """Render file upload section in the sidebar.

        Stores the uploaded file and a five-row preview in session state and
        offers a button that starts the analysis.
        """
        st.sidebar.header("📤 Upload Data")

        uploaded_file = st.sidebar.file_uploader(
            "Choose a CSV file", type=['csv'],
            help="Upload your network traffic data for DDoS analysis",
        )
        if not uploaded_file:
            return

        try:
            df = pd.read_csv(uploaded_file)
            # read_csv leaves the buffer at end-of-file; rewind so the stored
            # file object can be read again.
            uploaded_file.seek(0)
        except Exception as e:
            self._log.exception("Could not read uploaded CSV")
            st.error(f"Error processing file: {e}")
            return

        st.session_state.current_file = uploaded_file
        st.session_state.preview_data = df.head(5)

        if st.sidebar.button("π Process Data", use_container_width=True):
            self.process_uploaded_file(uploaded_file)

    def process_uploaded_file(self, file):
        """Process the uploaded file and perform analysis.

        Args:
            file: The uploaded file object handed to
                ``DataProcessor.preprocess_data``.

        On success the statistics and LLM results are stored in session
        state under ``analysis_stats`` / ``analysis_results`` and
        ``analysis_complete`` is set to ``True``.  On failure the error is
        shown and ``analysis_complete`` stays ``False``.
        """
        # Clear the flag first so a failed run does not keep showing the
        # outcome of an earlier successful one.
        st.session_state.analysis_complete = False

        try:
            with st.spinner("Processing data..."):
                # The caller has already parsed this buffer for the preview;
                # start from the beginning again when the object allows it.
                rewind = getattr(file, 'seek', None)
                if callable(rewind):
                    rewind(0)

                df = self.processor.preprocess_data(file)
                stats = self.processor.calculate_statistics(df)
                results = self.llm_handler.analyze_data(df)
        except Exception as e:
            self._log.exception("Analysis failed")
            st.error(f"Error processing file: {e}")
            return

        # Keep the outputs; they were previously computed and thrown away.
        st.session_state.analysis_stats = stats
        st.session_state.analysis_results = results
        st.session_state.analysis_complete = True
        st.success("Analysis completed successfully!")
        self._log.debug("File processing complete.")

    def render_main_content(self):
        """Render main content area with tabs."""
        if not st.session_state.current_file:
            st.info("Please upload a CSV file to begin analysis")
            return

        analysis_tab, chat_tab = st.tabs(["π Analysis", "💬 Chat"])

        with analysis_tab:
            self.render_analysis_tab()

        with chat_tab:
            self.render_chat_tab()

    def render_analysis_tab(self):
        """Render analysis results tab."""
        if st.session_state.preview_data is not None:
            st.subheader("π Data Preview")
            st.dataframe(st.session_state.preview_data)

        if st.session_state.analysis_complete:
            st.subheader("π Analysis Results")
            # TODO: render st.session_state.analysis_stats and
            # st.session_state.analysis_results here.
            st.info("Full analysis results would be displayed here.")

    def render_chat_tab(self):
        """Render interactive chat interface.

        Replays the stored conversation, then handles a newly submitted
        prompt, showing the prompt and the reply in the same run.
        """
        history = st.session_state.chat_history

        for message in history:
            with st.chat_message(message['role']):
                st.write(message['content'])

        prompt = st.chat_input("Ask about the analysis...")
        if not prompt:
            return

        # The replay loop above has already run, so new messages are drawn
        # explicitly; otherwise they would only appear on a later rerun.
        history.append({'role': 'user', 'content': prompt})
        with st.chat_message('user'):
            st.write(prompt)

        try:
            response = self.llm_handler.get_chat_response(prompt)
        except Exception as e:
            self._log.exception("Chat request failed")
            st.error(f"Error generating response: {e}")
            return

        history.append({'role': 'assistant', 'content': response})
        with st.chat_message('assistant'):
            st.write(response)

    def main(self):
        """Main method to run the Streamlit application."""
        self.render_theme_icons()

        st.title("🛡️ DDoS Traffic Analyzer")
        self.render_file_upload()
        self.render_main_content()


# Script entry point: build the UI object and hand control to its main().
if __name__ == "__main__":
    DDoSAnalyzerUI().main()