import logging
import os
import sys
from pathlib import Path
import json
import io
import uuid
import traceback
from typing import Dict, List, Any, Tuple, Optional
from dataclasses import dataclass

# Set UTF-8 encoding for Windows
if sys.platform == 'win32':
    os.environ["PYTHONIOENCODING"] = "utf-8"

import gradio as gr
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
from sklearn.datasets import load_iris
import cv2
from PIL import Image

# Additional libraries for web research & scraping
import wikipedia
import requests
from bs4 import BeautifulSoup

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# ---------------------------
# Agent Context & Memory System
# ---------------------------
@dataclass
class AgentMemory:
    short_term: List[Dict[str, Any]] = None
    long_term: Dict[str, Any] = None

    def __post_init__(self):
        if self.short_term is None:
            self.short_term = []
        if self.long_term is None:
            self.long_term = {}

    def add_short_term(self, data: Dict[str, Any]) -> None:
        self.short_term.append(data)
        if len(self.short_term) > 10:
            self.short_term.pop(0)

    def add_long_term(self, key: str, value: Any) -> None:
        self.long_term[key] = value

    def get_recent_context(self, n: int = 3) -> List[Dict[str, Any]]:
        return self.short_term[-n:] if len(self.short_term) >= n else self.short_term

    def search_long_term(self, query: str) -> List[Tuple[str, Any]]:
        results = []
        for key, value in self.long_term.items():
            if query.lower() in key.lower():
                results.append((key, value))
        return results

# ---------------------------
# Agent Hub
# ---------------------------
class AgentHub:
    def __init__(self):
        self.agents = {}
        self.global_memory = AgentMemory()
        self.session_id = str(uuid.uuid4())
        try:
            self.tokenizer = AutoTokenizer.from_pretrained("distilgpt2")
            self.model = AutoModelForCausalLM.from_pretrained("distilgpt2")
            self.generator = pipeline("text-generation", model=self.model, tokenizer=self.tokenizer)
            logger.info("Initialized text generation pipeline with distilgpt2")
        except Exception as e:
            logger.error(f"Failed to initialize text generation: {e}")
            self.generator = None
        try:
            self.summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
            logger.info("Initialized summarization pipeline")
        except Exception as e:
            logger.error(f"Failed to initialize summarizer: {e}")
            self.summarizer = None

    def register_agent(self, agent_id: str, agent_instance) -> None:
        self.agents[agent_id] = agent_instance
        logger.info(f"Registered agent: {agent_id}")

    def get_agent(self, agent_id: str):
        return self.agents.get(agent_id)

    def broadcast(self, message: Dict[str, Any], exclude: Optional[List[str]] = None) -> Dict[str, List[Dict]]:
        exclude = exclude or []
        responses = {}
        for agent_id, agent in self.agents.items():
            if agent_id not in exclude:
                try:
                    response = agent.process_message(message)
                    responses[agent_id] = response
                except Exception as e:
                    logger.error(f"Error in agent {agent_id}: {e}")
                    responses[agent_id] = {"error": str(e)}
        return responses

    def chain_of_thought(self, initial_task: str, agent_sequence: List[str]) -> Dict[str, Any]:
        results = {"final_output": None, "chain_outputs": [], "errors": []}
        current_input = initial_task
        for agent_id in agent_sequence:
            agent = self.get_agent(agent_id)
            if not agent:
                error = f"Agent {agent_id} not found"
                results["errors"].append(error)
                logger.error(error)
                continue
            try:
                output = agent.process_task(current_input)
                step_result = {"agent": agent_id, "input": current_input, "output": output}
                results["chain_outputs"].append(step_result)
                if isinstance(output, dict) and "text" in output:
                    current_input = output["text"]
                elif isinstance(output, str):
                    current_input = output
                else:
                    current_input = f"Result from {agent_id}: {type(output).__name__} object"
            except Exception as e:
                error = f"Error in agent {agent_id}: {str(e)}\n{traceback.format_exc()}"
                results["errors"].append(error)
                logger.error(error)
        if results["chain_outputs"]:
            last_output = results["chain_outputs"][-1]["output"]
            results["final_output"] = last_output if isinstance(last_output, dict) else {"text": str(last_output)}
        return results

# ---------------------------
# Intelligent Agent Base Class
# ---------------------------
class IntelligentAgent:
    def __init__(self, agent_id: str, hub: AgentHub):
        self.agent_id = agent_id
        self.hub = hub
        self.memory = AgentMemory()
        logger.info(f"Initialized agent: {agent_id}")

    def process_task(self, task: Any) -> Any:
        raise NotImplementedError("Subclasses must implement process_task")

    def process_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
        logger.info(f"Agent {self.agent_id} received message: {message}")
        self.memory.add_short_term({"timestamp": pd.Timestamp.now(), "message": message})
        return {"sender": self.agent_id, "received": True, "action": "acknowledge"}

    def request_assistance(self, target_agent_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
        target_agent = self.hub.get_agent(target_agent_id)
        if not target_agent:
            logger.error(f"Agent {self.agent_id} requested unknown agent: {target_agent_id}")
            return {"error": f"Agent {target_agent_id} not found"}
        request = {"sender": self.agent_id, "type": "assistance_request", "data": data}
        return target_agent.process_message(request)

    def evaluate_result(self, result: Any) -> Dict[str, Any]:
        success = result is not None
        confidence = 0.8 if success else 0.2
        return {"success": success, "confidence": confidence, "timestamp": pd.Timestamp.now().isoformat()}
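
# Illustrative sketch (hypothetical, not registered by create_agent_hub below): a
# minimal subclass showing the contract the hub relies on -- process_task() returns
# a dict with a "text" key so that AgentHub.chain_of_thought can forward the output
# to the next agent in a sequence.
class EchoAgent(IntelligentAgent):
    def __init__(self, hub: AgentHub):
        super().__init__("echo", hub)

    def process_task(self, task: Any) -> Dict[str, Any]:
        # Record the request in short-term memory, then hand the text straight back.
        self.memory.add_short_term({"task": task, "timestamp": pd.Timestamp.now().isoformat()})
        return {"text": f"Echo from {self.agent_id}: {task}"}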

# ---------------------------
# Specialized Agent Implementations
# ---------------------------
class WebResearchAgent(IntelligentAgent):
    def __init__(self, hub: AgentHub):
        super().__init__("web_research", hub)

    def process_task(self, task: str) -> Dict[str, Any]:
        logger.info(f"WebResearchAgent processing: {task}")
        search_term = task
        if self.hub.summarizer:
            try:
                keywords = task.split()
                if len(keywords) > 5:
                    summary = self.hub.summarizer(task, max_length=20, min_length=5, do_sample=False)
                    search_term = summary[0]['summary_text']
                else:
                    search_term = task
            except Exception as e:
                logger.error(f"Summarization error in WebResearchAgent: {e}")
                search_term = task
        try:
            search_results = wikipedia.search(search_term)
            if not search_results:
                result = {"text": f"No Wikipedia pages found for '{task}'."}
                self.memory.add_short_term({"task": task, "result": result, "success": False})
                return result
            page_title = None
            summary_text = None
            error_details = []
            for candidate in search_results[:3]:
                try:
                    summary_text = wikipedia.summary(candidate, sentences=5)
                    page_title = candidate
                    break
                except (wikipedia.exceptions.DisambiguationError, wikipedia.exceptions.PageError) as e:
                    error_details.append(f"{candidate}: {str(e)}")
                    continue
            if not summary_text:
                result = {"text": f"Failed to get Wikipedia summary for '{task}'. Errors: {'; '.join(error_details)}",
                          "search_results": search_results}
                self.memory.add_short_term({"task": task, "result": result, "success": False})
                return result
            self.memory.add_long_term(f"research:{search_term}",
                                      {"page_title": page_title, "summary": summary_text,
                                       "timestamp": pd.Timestamp.now().isoformat()})
            result = {"text": f"Research on '{page_title}':\n{summary_text}",
                      "page_title": page_title,
                      "related_topics": search_results[:5],
                      "source": "Wikipedia"}
            self.memory.add_short_term({"task": task, "result": result, "success": True})
            return result
        except Exception as e:
            error_msg = f"Error in web research: {str(e)}"
            logger.error(error_msg)
            result = {"text": error_msg, "error": str(e)}
            self.memory.add_short_term({"task": task, "result": result, "success": False})
            return result

class WebScraperAgent(IntelligentAgent):
    def __init__(self, hub: AgentHub):
        super().__init__("web_scraper", hub)

    def process_task(self, task: str) -> Dict[str, Any]:
        logger.info(f"WebScraperAgent processing URL: {task}")
        if not task.startswith(('http://', 'https://')):
            return {"text": "Invalid URL format. Please provide a URL starting with http:// or https://"}
        try:
            headers = {'User-Agent': 'Mozilla/5.0'}
            response = requests.get(task, headers=headers, timeout=10)
            if response.status_code != 200:
                result = {"text": f"Error: received status code {response.status_code} from {task}"}
                self.memory.add_short_term({"url": task, "result": result, "success": False})
                return result
            soup = BeautifulSoup(response.text, 'html.parser')
            title = soup.title.string.strip() if soup.title and soup.title.string else "No title found"
            main_content = soup.find('main') or soup.find(id='content') or soup.find(class_='content')
            paras = main_content.find_all('p') if main_content else soup.find_all('p')
            content = "\n".join([p.get_text().strip() for p in paras if len(p.get_text().strip()) > 50])
            if len(content) > 2000 and self.hub.summarizer:
                chunks = [content[i:i+1000] for i in range(0, len(content), 1000)]
                summarized_chunks = []
                for chunk in chunks:
                    summary = self.hub.summarizer(chunk, max_length=100, min_length=30, do_sample=False)
                    summarized_chunks.append(summary[0]['summary_text'])
                content = "\n".join(summarized_chunks)
            elif len(content) > 2000:
                content = content[:2000] + "... (content truncated)"
            links = []
            for a in soup.find_all('a', href=True):
                href = a['href']
                if href.startswith('http') and len(links) < 5:
                    links.append({"url": href, "text": a.get_text().strip() or href})
            result = {"text": f"Content from {task}:\n\nTitle: {title}\n\n{content}",
                      "title": title,
                      "raw_content": content,
                      "links": links,
                      "source_url": task}
            self.memory.add_short_term({"url": task, "result": result, "success": True})
            self.memory.add_long_term(f"scraped:{task}",
                                      {"title": title, "content_preview": content[:200],
                                       "timestamp": pd.Timestamp.now().isoformat()})
            return result
        except requests.RequestException as e:
            error_msg = f"Request error for {task}: {str(e)}"
            logger.error(error_msg)
            return {"text": error_msg, "error": str(e)}
        except Exception as e:
            error_msg = f"Error scraping {task}: {str(e)}"
            logger.error(error_msg)
            return {"text": error_msg, "error": str(e)}

class TextProcessingAgent(IntelligentAgent):
    def __init__(self, hub: AgentHub):
        super().__init__("text_processing", hub)

    def process_task(self, task: str) -> Dict[str, Any]:
        logger.info(f"TextProcessingAgent processing text ({len(task)} chars)")
        if not task or len(task) < 10:
            return {"text": "Text too short to process meaningfully."}
        results = {}
        words = task.split()
        sentences = task.split('. ')
        results["statistics"] = {
            "character_count": len(task),
            "word_count": len(words),
            "estimated_sentences": len(sentences),
            "average_word_length": sum(len(word) for word in words) / len(words) if words else 0
        }
        if len(task) > 5000:
            chunk_size = 500
            chunking_strategy = "character_blocks"
        elif len(words) > 200:
            chunk_size = 50
            chunking_strategy = "word_blocks"
        else:
            chunk_size = 5
            chunking_strategy = "sentence_blocks"
        if chunking_strategy == "character_blocks":
            chunks = [task[i:i+chunk_size] for i in range(0, len(task), chunk_size)]
        elif chunking_strategy == "word_blocks":
            chunks = [' '.join(words[i:i+chunk_size]) for i in range(0, len(words), chunk_size)]
        else:
            chunks = ['. '.join(sentences[i:i+chunk_size]) + '.' for i in range(0, len(sentences), chunk_size)]
        results["chunks"] = chunks
        results["chunking_strategy"] = chunking_strategy
        if self.hub.summarizer and len(task) > 200:
            try:
                task_for_summary = task[:1000] if len(task) > 1000 else task
                summary = self.hub.summarizer(task_for_summary, max_length=100, min_length=30, do_sample=False)
                results["summary"] = summary[0]['summary_text']
            except Exception as e:
                logger.error(f"Summarization error: {e}")
                results["summary_error"] = str(e)
        stop_words = set(['the', 'a', 'an', 'and', 'in', 'on', 'at', 'to', 'for', 'of', 'with'])
        word_freq = {}
        for word in words:
            w = word.lower().strip('.,!?:;()-"\'')
            if w and w not in stop_words and len(w) > 1:
                word_freq[w] = word_freq.get(w, 0) + 1
        results["frequent_words"] = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:10]
        positive_words = set(['good', 'great', 'excellent', 'positive', 'happy', 'best', 'better', 'success'])
        negative_words = set(['bad', 'worst', 'terrible', 'negative', 'sad', 'problem', 'fail', 'issue'])
        pos_count = sum(1 for word in words if word.lower().strip('.,!?:;()-"\'') in positive_words)
        neg_count = sum(1 for word in words if word.lower().strip('.,!?:;()-"\'') in negative_words)
        sentiment = "possibly positive" if pos_count > neg_count and pos_count > 2 else ("possibly negative" if neg_count > pos_count and neg_count > 2 else "neutral or mixed")
        results["basic_sentiment"] = {"assessment": sentiment, "positive_word_count": pos_count, "negative_word_count": neg_count}
        self.memory.add_short_term({"task_preview": task[:100] + "..." if len(task) > 100 else task,
                                    "word_count": results["statistics"]["word_count"],
                                    "result": results})
        text_response = (
            f"Text Analysis Results:\n- {results['statistics']['word_count']} words, {results['statistics']['character_count']} characters\n"
            f"- Split into {len(chunks)} chunks using {chunking_strategy}\n"
        )
        if "summary" in results:
            text_response += f"\nSummary:\n{results['summary']}\n"
        if results["frequent_words"]:
            text_response += "\nMost frequent words:\n"
            for word, count in results["frequent_words"][:5]:
                text_response += f"- {word}: {count} occurrences\n"
        text_response += f"\nOverall tone appears {results['basic_sentiment']['assessment']}"
        results["text"] = text_response
        return results

class DataAnalysisAgent(IntelligentAgent):
    def __init__(self, hub: AgentHub):
        super().__init__("data_analysis", hub)

    def process_task(self, task: str) -> Dict[str, Any]:
        logger.info(f"DataAnalysisAgent processing: {task}")
        file_path = None
        if "analyze" in task.lower() and ".csv" in task.lower():
            for word in task.split():
                if word.endswith('.csv'):
                    file_path = word
                    break
        if not file_path or not Path(file_path).exists():
            logger.info("No specific CSV file mentioned or file not found, creating sample data")
            if "time series" in task.lower():
                dates = pd.date_range(start='2023-01-01', periods=30, freq='D')
                df = pd.DataFrame({'date': dates,
                                   'value': np.random.normal(100, 15, 30),
                                   'trend': np.linspace(0, 20, 30) + np.random.normal(0, 2, 30)})
                file_path = "sample_timeseries.csv"
            elif "sales" in task.lower():
                products = ['ProductA', 'ProductB', 'ProductC', 'ProductD']
                regions = ['North', 'South', 'East', 'West']
                dates = pd.date_range(start='2023-01-01', periods=50, freq='D')
                data = []
                for _ in range(200):
                    data.append({'date': np.random.choice(dates),
                                 'product': np.random.choice(products),
                                 'region': np.random.choice(regions),
                                 'units_sold': np.random.randint(10, 100),
                                 'revenue': np.random.uniform(100, 1000)})
                df = pd.DataFrame(data)
                file_path = "sample_sales.csv"
            else:
                df = pd.DataFrame({
                    'A': np.random.normal(0, 1, 100),
                    'B': np.random.normal(5, 2, 100),
                    'C': np.random.uniform(-10, 10, 100),
                    'D': np.random.randint(0, 5, 100),
                    'label': np.random.choice(['X', 'Y', 'Z'], 100)
                })
                file_path = "sample_data.csv"
            df.to_csv(file_path, index=False)
            logger.info(f"Created sample data file: {file_path}")
        else:
            try:
                df = pd.read_csv(file_path)
                logger.info(f"Loaded existing file: {file_path}")
            except Exception as e:
                error_msg = f"Error loading CSV file {file_path}: {str(e)}"
                logger.error(error_msg)
                return {"text": error_msg, "error": str(e)}
        analysis_results = {}
        try:
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            analysis_results["summary_stats"] = df[numeric_cols].describe().to_dict()
            categorical_cols = df.select_dtypes(exclude=[np.number]).columns
            for col in categorical_cols:
                if df[col].nunique() < 10:
                    analysis_results[f"{col}_distribution"] = df[col].value_counts().to_dict()
        except Exception as e:
            logger.error(f"Error in basic statistics: {e}")
            analysis_results["stats_error"] = str(e)
        try:
            missing_values = df.isnull().sum().to_dict()
            analysis_results["missing_values"] = {k: v for k, v in missing_values.items() if v > 0}
        except Exception as e:
            logger.error(f"Error in missing values analysis: {e}")
            analysis_results["missing_values_error"] = str(e)
        try:
            if len(numeric_cols) > 1:
                analysis_results["correlations"] = df[numeric_cols].corr().to_dict()
        except Exception as e:
            logger.error(f"Error in correlation analysis: {e}")
            analysis_results["correlation_error"] = str(e)
        try:
            plt.figure(figsize=(10, 8))
            categorical_cols = df.select_dtypes(exclude=[np.number]).columns
            if len(numeric_cols) >= 2:
                plt.subplot(2, 1, 1)
                x_col, y_col = numeric_cols[0], numeric_cols[1]
                sample_df = df.sample(1000) if len(df) > 1000 else df
                if len(categorical_cols) > 0 and df[categorical_cols[0]].nunique() < 10:
                    cat_col = categorical_cols[0]
                    for category, group in sample_df.groupby(cat_col):
                        plt.scatter(group[x_col], group[y_col], label=category, alpha=0.6)
                    plt.legend()
                else:
                    plt.scatter(sample_df[x_col], sample_df[y_col], alpha=0.6)
                plt.xlabel(x_col)
                plt.ylabel(y_col)
                plt.title(f"Scatter Plot: {x_col} vs {y_col}")
                plt.subplot(2, 1, 2)
                if 'date' in df.columns or any('time' in col.lower() for col in df.columns):
                    date_col = [col for col in df.columns if 'date' in col.lower() or 'time' in col.lower()][0]
                    value_col = numeric_cols[0] if numeric_cols[0] != date_col else numeric_cols[1]
                    if not pd.api.types.is_datetime64_dtype(df[date_col]):
                        df[date_col] = pd.to_datetime(df[date_col], errors='coerce')
                    temp_df = df.dropna(subset=[date_col, value_col]).sort_values(date_col)
                    plt.plot(temp_df[date_col], temp_df[value_col])
                    plt.xlabel(date_col)
                    plt.ylabel(value_col)
                    plt.title(f"Time Series: {value_col} over {date_col}")
                    plt.xticks(rotation=45)
                else:
                    plt.hist(df[numeric_cols[0]].dropna(), bins=20, alpha=0.7)
                    plt.xlabel(numeric_cols[0])
                    plt.ylabel('Frequency')
                    plt.title(f"Distribution of {numeric_cols[0]}")
            else:
                if len(categorical_cols) > 0:
                    cat_col = categorical_cols[0]
                    df[cat_col].value_counts().plot(kind='bar')
                    plt.xlabel(cat_col)
                    plt.ylabel('Count')
                    plt.title(f"Counts by {cat_col}")
                    plt.xticks(rotation=45)
                else:
                    plt.hist(df[numeric_cols[0]].dropna(), bins=20)
                    plt.xlabel(numeric_cols[0])
                    plt.ylabel('Frequency')
                    plt.title(f"Distribution of {numeric_cols[0]}")
            plt.tight_layout()
            viz_path = f"{Path(file_path).stem}_viz.png"
            plt.savefig(viz_path)
            plt.close()
            analysis_results["visualization_path"] = viz_path
            analysis_results["visualization_created"] = True
            logger.info(f"Created visualization: {viz_path}")
        except Exception as e:
            logger.error(f"Error creating visualization: {e}")
            analysis_results["visualization_error"] = str(e)
            analysis_results["visualization_created"] = False
        insights = []
        try:
            for col in numeric_cols:
                q1 = df[col].quantile(0.25)
                q3 = df[col].quantile(0.75)
                iqr = q3 - q1
                outlier_count = ((df[col] < (q1 - 1.5 * iqr)) | (df[col] > (q3 + 1.5 * iqr))).sum()
                if outlier_count > 0:
                    insights.append(f"Found {outlier_count} potential outliers in '{col}'")
            if "correlations" in analysis_results:
                for col1, corr_dict in analysis_results["correlations"].items():
                    for col2, corr_val in corr_dict.items():
                        if col1 != col2 and abs(corr_val) > 0.7:
                            insights.append(f"Strong correlation ({corr_val:.2f}) between '{col1}' and '{col2}'")
            for col in categorical_cols:
                if df[col].nunique() < 10:
                    value_counts = df[col].value_counts()
                    most_common = value_counts.idxmax()
                    most_common_pct = value_counts.max() / value_counts.sum() * 100
                    if most_common_pct > 80:
                        insights.append(f"Imbalanced category in '{col}': '{most_common}' accounts for {most_common_pct:.1f}% of data")
            analysis_results["insights"] = insights
        except Exception as e:
            logger.error(f"Error extracting insights: {e}")
            analysis_results["insights_error"] = str(e)
        self.memory.add_short_term({"file": file_path, "columns": list(df.columns), "row_count": len(df), "analysis": analysis_results})
        if "sample" in file_path:
            self.memory.add_long_term(f"analysis:{file_path}",
                                      {"file": file_path, "type": "generated",
                                       "columns": list(df.columns), "row_count": len(df),
                                       "timestamp": pd.Timestamp.now().isoformat()})
        column_list = ", ".join(df.columns[:5]) + (", ..." if len(df.columns) > 5 else "")
        text_response = (
            f"Data Analysis Results for {file_path}\n- Dataset: {len(df)} rows x {len(df.columns)} columns ({column_list})\n"
        )
        if "missing_values" in analysis_results and analysis_results["missing_values"]:
            text_response += f"- Missing values found in {len(analysis_results['missing_values'])} columns\n"
        if insights:
            text_response += "\nKey Insights:\n"
            for i, insight in enumerate(insights[:5], 1):
                text_response += f"{i}. {insight}\n"
            if len(insights) > 5:
                text_response += f"... and {len(insights) - 5} more insights\n"
        text_response += f"\nVisualization saved to {viz_path}" if analysis_results.get("visualization_created") else "\nNo visualization created"
        analysis_results["text"] = text_response
        analysis_results["dataframe_shape"] = df.shape
        analysis_results["data_preview"] = df.head(5).to_dict()
        return analysis_results

class CodingAssistantAgent(IntelligentAgent):
    def __init__(self, hub: AgentHub):
        super().__init__("coding_assistant", hub)
        self.code_snippets = {
            "file_operations": {
                "read_file": '''
def read_file(file_path):
    """Read a file and return its contents"""
    with open(file_path, 'r') as file:
        return file.read()
''',
                "write_file": '''
def write_file(file_path, content):
    """Write content to a file"""
    with open(file_path, 'w') as file:
        file.write(content)
    return True
'''
            },
            "data_processing": {
                "pandas_read_csv": '''
import pandas as pd

def load_csv(file_path):
    """Load a CSV file into a Pandas DataFrame"""
    return pd.read_csv(file_path)
''',
                "pandas_basic_stats": '''
def get_basic_stats(df):
    """Get basic statistics for a DataFrame"""
    numeric_stats = df.describe()
    categorical_columns = df.select_dtypes(include=['object']).columns
    categorical_stats = {col: df[col].value_counts().to_dict() for col in categorical_columns}
    return {
        'numeric': numeric_stats.to_dict(),
        'categorical': categorical_stats
    }
'''
            },
            "visualization": {
                "matplotlib_basic_plot": '''
import matplotlib.pyplot as plt

def create_basic_plot(data, x_col, y_col, title="Plot", kind="line"):
    """Create a basic plot using matplotlib"""
    plt.figure(figsize=(10, 6))
    if kind == "line":
        plt.plot(data[x_col], data[y_col])
    elif kind == "scatter":
        plt.scatter(data[x_col], data[y_col])
    elif kind == "bar":
        plt.bar(data[x_col], data[y_col])
    plt.title(title)
    plt.xlabel(x_col)
    plt.ylabel(y_col)
    plt.tight_layout()
    plt.savefig(f"{title.lower().replace(' ', '_')}.png")
    plt.close()
    return f"{title.lower().replace(' ', '_')}.png"
'''
            },
            "web_scraping": {
                "requests_beautifulsoup": '''
import requests
from bs4 import BeautifulSoup

def scrape_webpage(url):
    """Scrape a webpage and extract text from paragraphs"""
    try:
        response = requests.get(url)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        paragraphs = soup.find_all('p')
        text = [p.get_text() for p in paragraphs]
        return {
            'title': soup.title.string if soup.title else "No title",
            'text': text,
            'url': url
        }
    except Exception as e:
        return {'error': str(e), 'url': url}
'''
            },
            "nlp": {
                "basic_text_analysis": '''
from collections import Counter
import re

def analyze_text(text):
    """Perform basic text analysis"""
    text = text.lower()
    words = re.findall(r'\w+', text)
    word_count = len(words)
    unique_words = len(set(words))
    stop_words = {'the', 'a', 'an', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'and', 'or'}
    word_freq = Counter([w for w in words if w not in stop_words and len(w) > 1])
    return {
        'word_count': word_count,
        'unique_words': unique_words,
        'avg_word_length': sum(len(w) for w in words) / word_count if word_count else 0,
        'most_common': word_freq.most_common(10)
    }
'''
            },
            "machine_learning": {
                "basic_classifier": '''
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report

def train_basic_classifier(X, y, test_size=0.2, random_state=42):
    """Train a basic RandomForest classifier"""
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)
    model = RandomForestClassifier(n_estimators=100, random_state=random_state)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    report = classification_report(y_test, y_pred, output_dict=True)
    return {
        'model': model,
        'accuracy': report['accuracy'],
        'classification_report': report,
        'feature_importance': dict(zip(range(X.shape[1]), model.feature_importances_))
    }
'''
            }
        }

    def process_task(self, task: str) -> Dict[str, Any]:
        logger.info(f"CodingAssistantAgent processing: {task}")
        task_lower = task.lower()
        keyword_mapping = {
            "file": "file_operations", "read file": "file_operations", "write file": "file_operations",
            "csv": "data_processing", "data": "data_processing", "pandas": "data_processing", "dataframe": "data_processing",
            "plot": "visualization", "chart": "visualization", "graph": "visualization", "visualize": "visualization", "matplotlib": "visualization",
            "scrape": "web_scraping", "web": "web_scraping", "html": "web_scraping", "beautifulsoup": "web_scraping",
            "text analysis": "nlp", "nlp": "nlp", "natural language": "nlp", "word count": "nlp", "text processing": "nlp",
            "machine learning": "machine_learning", "ml": "machine_learning", "model": "machine_learning", "predict": "machine_learning", "classifier": "machine_learning"
        }
        code_category = None
        function_name = None
        for keyword, category in keyword_mapping.items():
            if keyword in task_lower:
                code_category = category
                for func_name in self.code_snippets.get(category, {}):
                    natural_func = func_name.replace('_', ' ')
                    if natural_func in task_lower:
                        function_name = func_name
                        break
                break
        if not code_category:
            if any(word in task_lower for word in ["add", "sum", "calculate", "compute"]):
                code_category = "data_processing"
            elif any(word in task_lower for word in ["show", "display", "generate"]):
                code_category = "visualization"
        if code_category and not function_name and self.code_snippets.get(code_category):
            function_name = next(iter(self.code_snippets[code_category]))
        if not code_category:
            function_parts = [word for word in task_lower.split() if word not in ["a", "the", "an", "to", "for", "function", "code", "create", "make"]]
            func_name = "_".join(function_parts[:2]) if len(function_parts) >= 2 else "custom_function"
            custom_code = f"""
def {func_name}(input_data):
    # Custom function based on your request: '{task}'
    result = None
    # TODO: Implement specific logic based on requirements
    if isinstance(input_data, list):
        result = len(input_data)
    elif isinstance(input_data, str):
        result = input_data.upper()
    elif isinstance(input_data, (int, float)):
        result = input_data * 2
    return {{
        'input': input_data,
        'result': result,
        'status': 'processed'
    }}
"""
            result = {
                "text": f"I've created a custom function template based on your request:\n\n```python\n{custom_code}\n```\n\nThis is a starting point you can customize further.",
                "code": custom_code,
                "language": "python",
                "type": "custom"
            }
        else:
            code_snippet = self.code_snippets[code_category][function_name]
            result = {
                "text": f"Here's a {code_category.replace('_', ' ')} function for {function_name.replace('_', ' ')}:\n\n```python\n{code_snippet}\n```\n\nYou can customize this code.",
                "code": code_snippet,
                "language": "python",
                "category": code_category,
                "function": function_name
            }
        self.memory.add_short_term({"task": task, "code_category": code_category, "function_provided": function_name, "timestamp": pd.Timestamp.now().isoformat()})
        return result

class ImageProcessingAgent(IntelligentAgent):
    def __init__(self, hub: AgentHub):
        super().__init__("image_processing", hub)

    def process_task(self, task: Any) -> Dict[str, Any]:
        logger.info("ImageProcessingAgent processing task")
        image = None
        task_type = None
        if isinstance(task, Image.Image):
            image = task
            task_type = "direct_image"
        elif isinstance(task, str):
            if Path(task).exists() and Path(task).suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff']:
                try:
                    image = Image.open(task)
                    task_type = "image_path"
                except Exception as e:
                    return {"text": f"Error loading image from {task}: {str(e)}", "error": str(e)}
            else:
                task_type = "text_instruction"
        elif isinstance(task, dict) and 'image' in task:
            if isinstance(task['image'], Image.Image):
                image = task['image']
            elif isinstance(task['image'], str) and Path(task['image']).exists():
                try:
                    image = Image.open(task['image'])
                except Exception as e:
                    return {"text": f"Error loading image from {task['image']}: {str(e)}", "error": str(e)}
            task_type = "dict_with_image"
        if task_type == "text_instruction" and not image:
            return {"text": "Please provide an image to process along with instructions."}
        if not image:
            return {"text": "No valid image provided for processing."}
        processing_type = "edge_detection"
        if task_type in ["text_instruction", "dict_with_image"] and isinstance(task, dict):
            instruction = task.get('instruction', '').lower()
            if 'blur' in instruction or 'smooth' in instruction:
                processing_type = "blur"
            elif 'edge' in instruction or 'contour' in instruction:
                processing_type = "edge_detection"
            elif 'gray' in instruction or 'greyscale' in instruction or 'black and white' in instruction:
                processing_type = "grayscale"
            elif 'bright' in instruction or 'contrast' in instruction:
                processing_type = "enhance"
            elif 'resize' in instruction or 'scale' in instruction:
                processing_type = "resize"
        try:
            img_array = np.array(image)
            if img_array.ndim == 3 and img_array.shape[-1] == 4:
                img_cv = cv2.cvtColor(img_array, cv2.COLOR_RGBA2BGR)
            else:
                img_cv = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
            processed_img = None
            processing_details = {"original_size": image.size}
            if processing_type == "edge_detection":
                gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
                edges = cv2.Canny(gray, 100, 200)
                processed_img = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
                processing_details["processing"] = "Edge detection using Canny"
            elif processing_type == "blur":
                processed_img = cv2.GaussianBlur(img_cv, (7, 7), 0)
                processing_details["processing"] = "Gaussian Blur"
            elif processing_type == "grayscale":
                processed_img = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
                processed_img = cv2.cvtColor(processed_img, cv2.COLOR_GRAY2BGR)
                processing_details["processing"] = "Grayscale conversion"
            elif processing_type == "enhance":
                lab = cv2.cvtColor(img_cv, cv2.COLOR_BGR2LAB)
                l, a, b = cv2.split(lab)
                clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
                cl = clahe.apply(l)
                limg = cv2.merge((cl, a, b))
                processed_img = cv2.cvtColor(limg, cv2.COLOR_LAB2BGR)
                processing_details["processing"] = "Contrast enhancement"
            elif processing_type == "resize":
                processed_img = cv2.resize(img_cv, (image.size[0]//2, image.size[1]//2))
                processing_details["processing"] = "Resized to half"
            else:
                processed_img = img_cv
                processing_details["processing"] = "No processing applied"
            processed_pil = Image.fromarray(cv2.cvtColor(processed_img, cv2.COLOR_BGR2RGB))
            return {"text": f"Image processing completed with {processing_details['processing']}.",
                    "image": processed_pil,
                    "details": processing_details}
        except Exception as e:
            error_msg = f"Error processing image: {str(e)}\n{traceback.format_exc()}"
            logger.error(error_msg)
            return {"text": f"Error processing image: {str(e)}", "error": str(e)}

class FileManagementAgent(IntelligentAgent):
    def __init__(self, hub: AgentHub):
        super().__init__("file_management", hub)

    def process_task(self, task: str) -> Dict[str, Any]:
        logger.info(f"FileManagementAgent processing: {task}")
        task_lower = task.lower()
        if any(word in task_lower for word in ["create", "make", "generate", "write"]):
            operation = "create"
        elif any(word in task_lower for word in ["read", "open", "show", "display", "content"]):
            operation = "read"
        elif any(word in task_lower for word in ["list", "find", "directory", "folder", "files in"]):
            operation = "list"
        elif any(word in task_lower for word in ["delete", "remove"]):
            operation = "delete"
        else:
            operation = "unknown"
        filename = None
        file_extensions = ['.txt', '.json', '.csv', '.md', '.py', '.html', '.js', '.css']
        words = task.split()
        for word in words:
            for ext in file_extensions:
                if ext in word.lower():
                    filename = word.strip(':"\'.,;')
                    break
            if filename:
                break
        if not filename:
            file_keywords = ["file", "named", "called", "filename"]
            for i, word in enumerate(words):
                if word.lower() in file_keywords and i < len(words) - 1:
                    potential_name = words[i+1].strip(':"\'.,;')
                    if '.' not in potential_name:
                        if "json" in task_lower:
                            potential_name += ".json"
                        elif "csv" in task_lower:
                            potential_name += ".csv"
                        elif "python" in task_lower or "py" in task_lower:
                            potential_name += ".py"
                        else:
                            potential_name += ".txt"
                    filename = potential_name
                    break
        if not filename:
            if "json" in task_lower:
                filename = f"data_{uuid.uuid4().hex[:6]}.json"
            elif "csv" in task_lower:
                filename = f"data_{uuid.uuid4().hex[:6]}.csv"
            elif "python" in task_lower or "py" in task_lower:
                filename = f"script_{uuid.uuid4().hex[:6]}.py"
            elif "log" in task_lower:
                filename = f"log_{uuid.uuid4().hex[:6]}.txt"
            else:
                filename = f"file_{uuid.uuid4().hex[:6]}.txt"
        result = {}
        if operation == "create":
            if filename.endswith('.json'):
                content = json.dumps({
                    "name": "Sample Data",
                    "description": task,
                    "created": pd.Timestamp.now().isoformat(),
                    "values": [1, 2, 3, 4, 5],
                    "metadata": {"source": "FileManagementAgent", "version": "1.0"}
                }, indent=2)
            elif filename.endswith('.csv'):
                content = "id,name,value,timestamp\n"
                for i in range(5):
                    content += f"{i+1},Item{i+1},{np.random.randint(1, 100)},{pd.Timestamp.now().isoformat()}\n"
            elif filename.endswith('.py'):
                content = f"""# Generated Python Script: {filename}
# Created: {pd.Timestamp.now().isoformat()}
# Description: {task}

def main():
    print("Hello from the FileManagementAgent!")
    data = [1, 2, 3, 4, 5]
    result = sum(data)
    print(f"Sample calculation: sum(data) = {{result}}")
    return result

if __name__ == "__main__":
    main()
"""
            else:
                content = f"File created by FileManagementAgent\nCreated: {pd.Timestamp.now().isoformat()}\nBased on request: {task}\n\nThis is sample content."
            try:
                with open(filename, 'w', encoding='utf-8') as f:
                    f.write(content)
                result = {"text": f"Successfully created file: {filename}",
                          "operation": "create",
                          "filename": filename,
                          "size": len(content),
                          "preview": content[:200] + "..." if len(content) > 200 else content}
                self.memory.add_short_term({"operation": "create", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
                self.memory.add_long_term(f"file:{filename}", {"operation": "create", "type": Path(filename).suffix, "timestamp": pd.Timestamp.now().isoformat()})
            except Exception as e:
                error_msg = f"Error creating file {filename}: {str(e)}"
                logger.error(error_msg)
                result = {"text": error_msg, "error": str(e)}
        elif operation == "read":
            if not filename:
                result = {"text": "Please specify a filename to read."}
            elif not Path(filename).exists():
                result = {"text": f"File '{filename}' not found."}
            else:
                try:
                    with open(filename, 'r', encoding='utf-8') as f:
                        content = f.read()
                    result = {"text": f"Content of {filename}:\n\n{content}",
                              "operation": "read",
                              "filename": filename,
                              "content": content,
                              "size": len(content)}
                    self.memory.add_short_term({"operation": "read", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
                except Exception as e:
                    error_msg = f"Error reading file {filename}: {str(e)}"
                    logger.error(error_msg)
                    result = {"text": error_msg, "error": str(e)}
        elif operation == "list":
            try:
                directory = "."
                for term in ["directory", "folder", "in"]:
                    if term in task_lower:
                        parts = task_lower.split(term)
                        if len(parts) > 1:
                            potential_dir = parts[1].strip().split()[0].strip(':"\'.,;')
                            if Path(potential_dir).exists() and Path(potential_dir).is_dir():
                                directory = potential_dir
                extension_filter = None
                for ext in file_extensions:
                    if ext in task_lower:
                        extension_filter = ext
                        break
                files = list(Path(directory).glob('*' + (extension_filter or '')))
                file_groups = {}
                for file in files:
                    file_groups.setdefault(file.suffix, []).append({
                        "name": file.name,
                        "size": file.stat().st_size,
                        "modified": pd.Timestamp(file.stat().st_mtime, unit='s').isoformat()
                    })
                response_text = f"Found {len(files)} files" + (f" with extension {extension_filter}" if extension_filter else "") + f" in {directory}:\n\n"
                for ext, group in file_groups.items():
                    response_text += f"{ext} files ({len(group)}):\n"
                    for file_info in sorted(group, key=lambda x: x["name"]):
                        size_kb = file_info["size"] / 1024
                        response_text += f"- {file_info['name']} ({size_kb:.1f} KB, modified: {file_info['modified']})\n"
                    response_text += "\n"
                result = {"text": response_text,
                          "operation": "list",
                          "directory": directory,
                          "file_count": len(files),
                          "files": file_groups}
                self.memory.add_short_term({"operation": "list", "directory": directory, "file_count": len(files), "timestamp": pd.Timestamp.now().isoformat()})
            except Exception as e:
                error_msg = f"Error listing files: {str(e)}"
                logger.error(error_msg)
                result = {"text": error_msg, "error": str(e)}
        elif operation == "delete":
            if not filename:
                result = {"text": "Please specify a filename to delete."}
            elif not Path(filename).exists():
                result = {"text": f"File '{filename}' not found."}
            else:
                try:
                    os.remove(filename)
                    result = {"text": f"Successfully deleted file: {filename}", "operation": "delete", "filename": filename}
                    self.memory.add_short_term({"operation": "delete", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
                    self.memory.add_long_term(f"file:{filename}", {"operation": "delete", "timestamp": pd.Timestamp.now().isoformat()})
                except Exception as e:
                    error_msg = f"Error deleting file {filename}: {str(e)}"
                    logger.error(error_msg)
                    result = {"text": error_msg, "error": str(e)}
        else:
            result = {"text": f"Unknown operation requested in task: {task}"}
        return result

# ---------------------------
# Gradio Interface Setup
# ---------------------------
def create_agent_hub():
    hub = AgentHub()
    hub.register_agent("web_research", WebResearchAgent(hub))
    hub.register_agent("web_scraper", WebScraperAgent(hub))
    hub.register_agent("text_processing", TextProcessingAgent(hub))
    hub.register_agent("data_analysis", DataAnalysisAgent(hub))
    hub.register_agent("coding_assistant", CodingAssistantAgent(hub))
    hub.register_agent("image_processing", ImageProcessingAgent(hub))
    hub.register_agent("file_management", FileManagementAgent(hub))
    return hub

def create_gradio_interface():
    hub = create_agent_hub()

    def process_request(request_type, input_data, extra_data=""):
        try:
            if request_type == "chain":
                agent_sequence = [agent.strip() for agent in extra_data.split(",") if agent.strip()]
                return hub.chain_of_thought(input_data, agent_sequence)
            else:
                agent = hub.get_agent(request_type)
                if not agent:
                    return {"error": f"Unknown agent type: {request_type}"}
                return agent.process_task(input_data)
        except Exception as e:
            logger.error(f"Error processing request: {e}")
            return {"error": str(e)}

    with gr.Blocks(title="SmolAgents Toolbelt") as interface:
        gr.Markdown("# SmolAgents Toolbelt")
        gr.Markdown("A collection of specialized agents for various tasks with evolved logic.")
        with gr.Tabs():
            with gr.Tab("Single Agent"):
                agent_type = gr.Dropdown(
                    choices=["web_research", "web_scraper", "text_processing", "data_analysis",
                             "coding_assistant", "image_processing", "file_management"],
                    label="Select Agent",
                    value="web_research"
                )
                with gr.Row():
                    input_text = gr.Textbox(label="Input", placeholder="Enter your request...")
                    extra_input = gr.Textbox(label="Extra (e.g., image path or additional info)", placeholder="Optional extra input...")
                output_text = gr.JSON(label="Output")
                process_btn = gr.Button("Process")
                process_btn.click(fn=process_request, inputs=[agent_type, input_text, extra_input], outputs=output_text)
            with gr.Tab("Chain of Thought"):
                chain_input = gr.Textbox(label="Input", placeholder="Enter your request for the chain...")
                chain_sequence = gr.Textbox(label="Agent Sequence", placeholder="Comma-separated agent names (e.g., text_processing,data_analysis)")
                chain_output = gr.JSON(label="Chain Output")
                chain_type = gr.State("chain")
                chain_btn = gr.Button("Process Chain")
                chain_btn.click(fn=process_request, inputs=[chain_type, chain_input, chain_sequence], outputs=chain_output)
            with gr.Tab("Help"):
                gr.Markdown("""
                ## Available Agents
                - **Web Research Agent**: Searches Wikipedia for information.
                - **Web Scraper Agent**: Scrapes content from provided URLs.
                - **Text Processing Agent**: Analyzes and processes text.
                - **Data Analysis Agent**: Performs data analysis and visualization.
                - **Coding Assistant Agent**: Generates code snippets.
                - **Image Processing Agent**: Processes images based on instructions.
                - **File Management Agent**: Handles file creation, reading, listing, and deletion.

                ### Usage
                1. Select an agent (or choose 'Chain of Thought' for a sequence).
                2. Enter your request.
                3. For chains, provide a comma-separated list of agent IDs.
                """)
    return interface

if __name__ == "__main__":
    demo = create_gradio_interface()
    demo.launch(server_name="0.0.0.0", server_port=7860, share=True)
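
# ---------------------------
# Programmatic usage (illustrative)
# ---------------------------
# A minimal sketch of driving the hub without the Gradio UI, assuming the
# transformers models download successfully when AgentHub starts. The task
# strings below are made-up examples, not fixtures shipped with this file:
#
#     hub = create_agent_hub()
#
#     research = hub.get_agent("web_research").process_task("History of the Python programming language")
#     print(research["text"])
#
#     chain = hub.chain_of_thought(
#         "Summarize recent trends in renewable energy",
#         ["web_research", "text_processing"],
#     )
#     print(chain["final_output"]["text"])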