from huggingface_hub import InferenceClient
import pandas as pd
import re
from pathlib import Path
import os
from dotenv import load_dotenv


class LLMHandler:
    def __init__(self):
        load_dotenv()
        self.client = InferenceClient(api_key=os.getenv("HF_TOK_KEY"))
        self.model = "Qwen/Qwen2.5-Coder-32B-Instruct"
        self.results_dir = Path("results")
        self.results_dir.mkdir(exist_ok=True)

    def analyze_data(self, df):
        """
        Analyze the dataset using the LLM model.
        """
        # Generate the knowledge prompt summarizing the dataset statistics
        know_prompt = self._generate_knowledge_prompt(df)

        # Process each row once and collect the results
        results = []
        for idx, row in df.iterrows():
            result = self._process_row(row, know_prompt)
            results.append(result)

        # Save results
        results_df = pd.DataFrame(results)
        results_path = self.results_dir / "PROBABILITY_OF_EACH_ROW_DDOS_AND_BENIGN.csv"
        results_df.to_csv(results_path, index=False)

        return results_df

    def _generate_knowledge_prompt(self, df):
        """
        Generate the knowledge prompt for the LLM.
        """
        label_column = self._get_label_column(df)
        df_without_label = df.drop(label_column, axis=1) if label_column else df

        stats = {
            'max': df_without_label.max(),
            'min': df_without_label.min(),
            'median': df_without_label.median(),
            'mean': df_without_label.mean(),
            'variance': df_without_label.var()
        }

        prompt = (
            "Suppose that you are a [[ HIGHLY EXPERIENCED NETWORK TRAFFIC DATA ANALYSIS EXPERT ]]. "
            "You need to help me analyze the data in the DDoS dataset and determine whether the data is "
            "[[ DDoS traffic ]] or [[ normal traffic ]]. "
            "Next, I will give you the maximum, minimum, median, mean, and variance of every column in the "
            "dataset, which may help you make your judgment. "
            "DO A DEEP ANALYSIS AND PROVIDE ACCURATE ANSWERS FOR THE GIVEN TASK:\n\n"
        )

        for column in df_without_label.columns:
            prompt += (f"{column}: max={stats['max'][column]:.1f}, "
                       f"min={stats['min'][column]:.1f}, "
                       f"median={stats['median'][column]:.1f}, "
                       f"mean={stats['mean'][column]:.1f}, "
                       f"variance={stats['variance'][column]:.1f}\n")

        return prompt

    def _process_row(self, row, know_prompt):
        """
        Process a single row using the LLM.
        """
        if isinstance(row, dict):
            row = pd.Series(row)

        # Prepare row data prompt
        row_prompt = self._generate_row_prompt(row)

        # Get LLM response
        messages = [
            {'role': 'user', 'content': know_prompt},
            {'role': 'user', 'content': row_prompt}
        ]

        completion = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            max_tokens=10000
        )

        # Extract probabilities
        response = completion.choices[0].message.content
        probabilities = self._extract_probabilities(response)

        return {
            'index': row.name,  # row is a pandas.Series, so .name holds the original index
            'attack': probabilities[0],
            'benign': probabilities[1],
            'original': response
        }

    def _generate_row_prompt(self, row):
        """
        Generate the prompt for a single row.
        """
        return (
            "Next, I will give you a piece of data about network traffic information. "
            "You need to tell me the probability that this data is DDoS traffic and the probability that it is "
            "normal traffic, directly and separately, and express the probabilities in the format [0.xxx,0.xxx]. "
            "The first number is the probability of DDoS traffic and the second number is the probability of "
            "normal traffic. KEEP IN MIND: [ BOTH NUMBERS ARE DECIMALS BETWEEN 0 AND 1, AND THE SUM OF THE TWO "
            "NUMBERS IS 1 ]. "
            "CLEARLY NOTE THAT YOU HAVE TO PROVIDE ONLY THE PROBABILITY OF EACH OF DDOS AND BENIGN, AND THE "
            "TOTAL PROBABILITY WHEN SUMMED MUST BE 1, i.e. 0 <= P([DDOS, BENIGN]) <= 1, EXPRESSED ONLY IN THE "
            "FORMAT [0.xxx,0.xxx]. "
            "Let's think step by step and explain the reasons for your judgment point by point. "
            "The characteristics of this network traffic data packet are:\n"
            + "\n".join([f"{col}: {val}" for col, val in row.items()])
        )

    def _extract_probabilities(self, response):
        """
        Extract probabilities from the LLM response.
        """
        pattern = r'\[(.*?)\]'
        matches = re.findall(pattern, response)

        if matches and ',' in matches[0]:
            probs = matches[0].split(',')
            try:
                return [float(p) for p in probs[:2]]
            except ValueError:
                return [None, None]
        return [None, None]

    def get_chat_response(self, prompt):
        """
        Get a response for the chat interface.
        """
        messages = [{'role': 'user', 'content': prompt}]

        completion = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            max_tokens=10000
        )

        return completion.choices[0].message.content

    def _get_label_column(self, df):
        """
        Get the label column name.
        """
        # Compare in lowercase so ' Label', 'label', 'LABEL', etc. all match
        potential_labels = [col for col in df.columns if 'label' in col.lower()]
        return potential_labels[0] if potential_labels else None