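"""SmolAgents Toolbelt: a hub of specialized agents (web research, web scraping,
text processing, data analysis, coding assistance, image processing, and file
management) exposed through a Gradio interface, individually or as a chain."""
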
import logging
import os
import sys
from pathlib import Path
import json
import io
import uuid
import traceback
from typing import Dict, List, Any, Tuple, Optional
from dataclasses import dataclass

# Set UTF-8 encoding for Windows
if sys.platform == 'win32':
    os.environ["PYTHONIOENCODING"] = "utf-8"

import gradio as gr
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
from sklearn.datasets import load_iris
import cv2
from PIL import Image

# Additional libraries for web research & scraping
import wikipedia
import requests
from bs4 import BeautifulSoup
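
# Note: beyond the standard library, the imports above assume the usual PyPI
# distributions (gradio, numpy, pandas, matplotlib, transformers, scikit-learn,
# opencv-python, Pillow, wikipedia, requests, beautifulsoup4); exact versions
# are not pinned here.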

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# ---------------------------
# Agent Context & Memory System
# ---------------------------
@dataclass
class AgentMemory:
    """Two-tier memory: a rolling short-term list and a keyed long-term store."""
    short_term: List[Dict[str, Any]] = None
    long_term: Dict[str, Any] = None

    def __post_init__(self):
        if self.short_term is None:
            self.short_term = []
        if self.long_term is None:
            self.long_term = {}

    def add_short_term(self, data: Dict[str, Any]) -> None:
        self.short_term.append(data)
        # Keep only the last 10 entries
        if len(self.short_term) > 10:
            self.short_term.pop(0)

    def add_long_term(self, key: str, value: Any) -> None:
        self.long_term[key] = value

    def get_recent_context(self, n: int = 3) -> List[Dict[str, Any]]:
        return self.short_term[-n:] if len(self.short_term) >= n else self.short_term

    def search_long_term(self, query: str) -> List[Tuple[str, Any]]:
        results = []
        for key, value in self.long_term.items():
            if query.lower() in key.lower():
                results.append((key, value))
        return results
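
# Illustrative use of AgentMemory (not executed at import time):
#   mem = AgentMemory()
#   mem.add_short_term({"task": "demo", "result": "ok"})      # rolling window of 10 entries
#   mem.add_long_term("research:gradio", {"summary": "..."})  # keyed, persistent store
#   mem.get_recent_context(2)          # last two short-term entries
#   mem.search_long_term("research")   # [("research:gradio", {...})]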

# ---------------------------
# Agent Hub
# ---------------------------
class AgentHub:
    def __init__(self):
        self.agents = {}
        self.global_memory = AgentMemory()
        self.session_id = str(uuid.uuid4())
        # Initialize NLP components
        try:
            self.tokenizer = AutoTokenizer.from_pretrained("distilgpt2")
            self.model = AutoModelForCausalLM.from_pretrained("distilgpt2")
            self.generator = pipeline("text-generation", model=self.model, tokenizer=self.tokenizer)
            logger.info("Initialized text generation pipeline with distilgpt2")
        except Exception as e:
            logger.error(f"Failed to initialize text generation: {e}")
            self.generator = None
        try:
            self.summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
            logger.info("Initialized summarization pipeline")
        except Exception as e:
            logger.error(f"Failed to initialize summarizer: {e}")
            self.summarizer = None

    def register_agent(self, agent_id: str, agent_instance) -> None:
        self.agents[agent_id] = agent_instance
        logger.info(f"Registered agent: {agent_id}")

    def get_agent(self, agent_id: str):
        return self.agents.get(agent_id)

    def broadcast(self, message: Dict[str, Any], exclude: Optional[List[str]] = None) -> Dict[str, List[Dict]]:
        exclude = exclude or []
        responses = {}
        for agent_id, agent in self.agents.items():
            if agent_id not in exclude:
                try:
                    response = agent.process_message(message)
                    responses[agent_id] = response
                except Exception as e:
                    logger.error(f"Error in agent {agent_id}: {e}")
                    responses[agent_id] = {"error": str(e)}
        return responses

    def chain_of_thought(self, initial_task: str, agent_sequence: List[str]) -> Dict[str, Any]:
        results = {"final_output": None, "chain_outputs": [], "errors": []}
        current_input = initial_task
        for agent_id in agent_sequence:
            agent = self.get_agent(agent_id)
            if not agent:
                error = f"Agent {agent_id} not found"
                results["errors"].append(error)
                logger.error(error)
                continue
            try:
                output = agent.process_task(current_input)
                step_result = {"agent": agent_id, "input": current_input, "output": output}
                results["chain_outputs"].append(step_result)
                if isinstance(output, dict) and "text" in output:
                    current_input = output["text"]
                elif isinstance(output, str):
                    current_input = output
                else:
                    current_input = f"Result from {agent_id}: {type(output).__name__} object"
            except Exception as e:
                error = f"Error in agent {agent_id}: {str(e)}\n{traceback.format_exc()}"
                results["errors"].append(error)
                logger.error(error)
        if results["chain_outputs"]:
            last_output = results["chain_outputs"][-1]["output"]
            results["final_output"] = last_output if isinstance(last_output, dict) else {"text": str(last_output)}
        return results
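
# Illustrative use of the hub (agent IDs must match those registered in
# create_agent_hub() below); each step feeds its "text" output to the next agent:
#   hub = create_agent_hub()
#   hub.chain_of_thought("Explain pandas DataFrames", ["web_research", "text_processing"])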

# ---------------------------
# Intelligent Agent Base Class
# ---------------------------
class IntelligentAgent:
    def __init__(self, agent_id: str, hub: AgentHub):
        self.agent_id = agent_id
        self.hub = hub
        self.memory = AgentMemory()
        logger.info(f"Initialized agent: {agent_id}")

    def process_task(self, task: Any) -> Any:
        raise NotImplementedError("Subclasses must implement process_task")

    def process_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
        logger.info(f"Agent {self.agent_id} received message: {message}")
        self.memory.add_short_term({"timestamp": pd.Timestamp.now(), "message": message})
        return {"sender": self.agent_id, "received": True, "action": "acknowledge"}

    def request_assistance(self, target_agent_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
        target_agent = self.hub.get_agent(target_agent_id)
        if not target_agent:
            logger.error(f"Agent {self.agent_id} requested unknown agent: {target_agent_id}")
            return {"error": f"Agent {target_agent_id} not found"}
        request = {"sender": self.agent_id, "type": "assistance_request", "data": data}
        return target_agent.process_message(request)

    def evaluate_result(self, result: Any) -> Dict[str, Any]:
        success = result is not None
        confidence = 0.8 if success else 0.2
        return {"success": success, "confidence": confidence, "timestamp": pd.Timestamp.now().isoformat()}

# ---------------------------
# Specialized Agent Implementations
# ---------------------------
class WebResearchAgent(IntelligentAgent):
    def __init__(self, hub: AgentHub):
        super().__init__("web_research", hub)

    def process_task(self, task: str) -> Dict[str, Any]:
        logger.info(f"WebResearchAgent processing: {task}")
        search_term = task
        if self.hub.summarizer:
            try:
                keywords = task.split()
                if len(keywords) > 5:
                    summary = self.hub.summarizer(task, max_length=20, min_length=5, do_sample=False)
                    search_term = summary[0]['summary_text']
                else:
                    search_term = task
            except Exception as e:
                logger.error(f"Summarization error in WebResearchAgent: {e}")
                search_term = task
        try:
            search_results = wikipedia.search(search_term)
            if not search_results:
                result = {"text": f"No Wikipedia pages found for '{task}'."}
                self.memory.add_short_term({"task": task, "result": result, "success": False})
                return result
            page_title = None
            summary_text = None
            error_details = []
            for candidate in search_results[:3]:
                try:
                    summary_text = wikipedia.summary(candidate, sentences=5)
                    page_title = candidate
                    break
                except (wikipedia.exceptions.DisambiguationError, wikipedia.exceptions.PageError) as e:
                    error_details.append(f"{candidate}: {str(e)}")
                    continue
            if not summary_text:
                result = {"text": f"Failed to get Wikipedia summary for '{task}'. Errors: {'; '.join(error_details)}", "search_results": search_results}
                self.memory.add_short_term({"task": task, "result": result, "success": False})
                return result
            self.memory.add_long_term(f"research:{search_term}", {"page_title": page_title, "summary": summary_text, "timestamp": pd.Timestamp.now().isoformat()})
            result = {"text": f"Research on '{page_title}':\n{summary_text}", "page_title": page_title, "related_topics": search_results[:5], "source": "Wikipedia"}
            self.memory.add_short_term({"task": task, "result": result, "success": True})
            return result
        except Exception as e:
            error_msg = f"Error in web research: {str(e)}"
            logger.error(error_msg)
            result = {"text": error_msg, "error": str(e)}
            self.memory.add_short_term({"task": task, "result": result, "success": False})
            return result

class WebScraperAgent(IntelligentAgent):
    def __init__(self, hub: AgentHub):
        super().__init__("web_scraper", hub)

    def process_task(self, task: str) -> Dict[str, Any]:
        logger.info(f"WebScraperAgent processing URL: {task}")
        if not task.startswith(('http://', 'https://')):
            return {"text": "Invalid URL format. Please provide a URL starting with http:// or https://"}
        try:
            headers = {'User-Agent': 'Mozilla/5.0'}
            response = requests.get(task, headers=headers, timeout=10)
            if response.status_code != 200:
                result = {"text": f"Error: received status code {response.status_code} from {task}"}
                self.memory.add_short_term({"url": task, "result": result, "success": False})
                return result
            soup = BeautifulSoup(response.text, 'html.parser')
            title = soup.title.string.strip() if soup.title and soup.title.string else "No title found"
            main_content = soup.find('main') or soup.find(id='content') or soup.find(class_='content')
            paras = main_content.find_all('p') if main_content else soup.find_all('p')
            content = "\n".join([p.get_text().strip() for p in paras if len(p.get_text().strip()) > 50])
            if len(content) > 2000 and self.hub.summarizer:
                chunks = [content[i:i+1000] for i in range(0, len(content), 1000)]
                summarized_chunks = []
                for chunk in chunks:
                    summary = self.hub.summarizer(chunk, max_length=100, min_length=30, do_sample=False)
                    summarized_chunks.append(summary[0]['summary_text'])
                content = "\n".join(summarized_chunks)
            elif len(content) > 2000:
                content = content[:2000] + "... (content truncated)"
            links = []
            for a in soup.find_all('a', href=True):
                href = a['href']
                if href.startswith('http') and len(links) < 5:
                    links.append({"url": href, "text": a.get_text().strip() or href})
            result = {"text": f"Content from {task}:\n\nTitle: {title}\n\n{content}", "title": title, "raw_content": content, "links": links, "source_url": task}
            self.memory.add_short_term({"url": task, "result": result, "success": True})
            self.memory.add_long_term(f"scraped:{task}", {"title": title, "content_preview": content[:200], "timestamp": pd.Timestamp.now().isoformat()})
            return result
        except requests.RequestException as e:
            error_msg = f"Request error for {task}: {str(e)}"
            logger.error(error_msg)
            return {"text": error_msg, "error": str(e)}
        except Exception as e:
            error_msg = f"Error scraping {task}: {str(e)}"
            logger.error(error_msg)
            return {"text": error_msg, "error": str(e)}

class TextProcessingAgent(IntelligentAgent):
    def __init__(self, hub: AgentHub):
        super().__init__("text_processing", hub)

    def process_task(self, task: str) -> Dict[str, Any]:
        logger.info(f"TextProcessingAgent processing text ({len(task)} chars)")
        if not task or len(task) < 10:
            return {"text": "Text too short to process meaningfully."}
        results = {}
        words = task.split()
        sentences = task.split('. ')
        results["statistics"] = {
            "character_count": len(task),
            "word_count": len(words),
            "estimated_sentences": len(sentences),
            "average_word_length": sum(len(word) for word in words) / len(words) if words else 0
        }
        if len(task) > 5000:
            chunk_size = 500
            chunking_strategy = "character_blocks"
        elif len(words) > 200:
            chunk_size = 50
            chunking_strategy = "word_blocks"
        else:
            chunk_size = 5
            chunking_strategy = "sentence_blocks"
        if chunking_strategy == "character_blocks":
            chunks = [task[i:i+chunk_size] for i in range(0, len(task), chunk_size)]
        elif chunking_strategy == "word_blocks":
            chunks = [' '.join(words[i:i+chunk_size]) for i in range(0, len(words), chunk_size)]
        else:
            chunks = ['. '.join(sentences[i:i+chunk_size]) + '.' for i in range(0, len(sentences), chunk_size)]
        results["chunks"] = chunks
        results["chunking_strategy"] = chunking_strategy
        if self.hub.summarizer and len(task) > 200:
            try:
                task_for_summary = task[:1000] if len(task) > 1000 else task
                summary = self.hub.summarizer(task_for_summary, max_length=100, min_length=30, do_sample=False)
                results["summary"] = summary[0]['summary_text']
            except Exception as e:
                logger.error(f"Summarization error: {e}")
                results["summary_error"] = str(e)
        stop_words = set(['the', 'a', 'an', 'and', 'in', 'on', 'at', 'to', 'for', 'of', 'with'])
        word_freq = {}
        for word in words:
            w = word.lower().strip('.,!?:;()-"\'')
            if w and w not in stop_words and len(w) > 1:
                word_freq[w] = word_freq.get(w, 0) + 1
        results["frequent_words"] = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:10]
        positive_words = set(['good', 'great', 'excellent', 'positive', 'happy', 'best', 'better', 'success'])
        negative_words = set(['bad', 'worst', 'terrible', 'negative', 'sad', 'problem', 'fail', 'issue'])
        pos_count = sum(1 for word in words if word.lower().strip('.,!?:;()-"\'') in positive_words)
        neg_count = sum(1 for word in words if word.lower().strip('.,!?:;()-"\'') in negative_words)
        sentiment = "possibly positive" if pos_count > neg_count and pos_count > 2 else ("possibly negative" if neg_count > pos_count and neg_count > 2 else "neutral or mixed")
        results["basic_sentiment"] = {"assessment": sentiment, "positive_word_count": pos_count, "negative_word_count": neg_count}
        self.memory.add_short_term({"task_preview": task[:100] + "..." if len(task) > 100 else task, "word_count": results["statistics"]["word_count"], "result": results})
        text_response = (
            f"Text Analysis Results:\n- {results['statistics']['word_count']} words, {results['statistics']['character_count']} characters\n"
            f"- Split into {len(chunks)} chunks using {chunking_strategy}\n"
        )
        if "summary" in results:
            text_response += f"\nSummary:\n{results['summary']}\n"
        if results["frequent_words"]:
            text_response += "\nMost frequent words:\n"
            for word, count in results["frequent_words"][:5]:
                text_response += f"- {word}: {count} occurrences\n"
        text_response += f"\nOverall tone appears {results['basic_sentiment']['assessment']}"
        results["text"] = text_response
        return results

class DataAnalysisAgent(IntelligentAgent):
    def __init__(self, hub: AgentHub):
        super().__init__("data_analysis", hub)

    def process_task(self, task: str) -> Dict[str, Any]:
        logger.info(f"DataAnalysisAgent processing: {task}")
        file_path = None
        if "analyze" in task.lower() and ".csv" in task.lower():
            for word in task.split():
                if word.endswith('.csv'):
                    file_path = word
                    break
        if not file_path or not Path(file_path).exists():
            logger.info("No specific CSV file mentioned or file not found, creating sample data")
            if "time series" in task.lower():
                dates = pd.date_range(start='2023-01-01', periods=30, freq='D')
                df = pd.DataFrame({'date': dates, 'value': np.random.normal(100, 15, 30), 'trend': np.linspace(0, 20, 30) + np.random.normal(0, 2, 30)})
                file_path = "sample_timeseries.csv"
            elif "sales" in task.lower():
                products = ['ProductA', 'ProductB', 'ProductC', 'ProductD']
                regions = ['North', 'South', 'East', 'West']
                dates = pd.date_range(start='2023-01-01', periods=50, freq='D')
                data = []
                for _ in range(200):
                    data.append({'date': np.random.choice(dates), 'product': np.random.choice(products), 'region': np.random.choice(regions), 'units_sold': np.random.randint(10, 100), 'revenue': np.random.uniform(100, 1000)})
                df = pd.DataFrame(data)
                file_path = "sample_sales.csv"
            else:
                df = pd.DataFrame({
                    'A': np.random.normal(0, 1, 100),
                    'B': np.random.normal(5, 2, 100),
                    'C': np.random.uniform(-10, 10, 100),
                    'D': np.random.randint(0, 5, 100),
                    'label': np.random.choice(['X', 'Y', 'Z'], 100)
                })
                file_path = "sample_data.csv"
            df.to_csv(file_path, index=False)
            logger.info(f"Created sample data file: {file_path}")
        else:
            try:
                df = pd.read_csv(file_path)
                logger.info(f"Loaded existing file: {file_path}")
            except Exception as e:
                error_msg = f"Error loading CSV file {file_path}: {str(e)}"
                logger.error(error_msg)
                return {"text": error_msg, "error": str(e)}
        analysis_results = {}
        try:
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            analysis_results["summary_stats"] = df[numeric_cols].describe().to_dict()
            categorical_cols = df.select_dtypes(exclude=[np.number]).columns
            for col in categorical_cols:
                if df[col].nunique() < 10:
                    analysis_results[f"{col}_distribution"] = df[col].value_counts().to_dict()
        except Exception as e:
            logger.error(f"Error in basic statistics: {e}")
            analysis_results["stats_error"] = str(e)
        try:
            missing_values = df.isnull().sum().to_dict()
            analysis_results["missing_values"] = {k: v for k, v in missing_values.items() if v > 0}
        except Exception as e:
            logger.error(f"Error in missing values analysis: {e}")
            analysis_results["missing_values_error"] = str(e)
        try:
            if len(numeric_cols) > 1:
                analysis_results["correlations"] = df[numeric_cols].corr().to_dict()
        except Exception as e:
            logger.error(f"Error in correlation analysis: {e}")
            analysis_results["correlation_error"] = str(e)
        try:
            plt.figure(figsize=(10, 8))
            categorical_cols = df.select_dtypes(exclude=[np.number]).columns
            if len(numeric_cols) >= 2:
                plt.subplot(2, 1, 1)
                x_col, y_col = numeric_cols[0], numeric_cols[1]
                sample_df = df.sample(1000) if len(df) > 1000 else df
                if len(categorical_cols) > 0 and df[categorical_cols[0]].nunique() < 10:
                    cat_col = categorical_cols[0]
                    for category, group in sample_df.groupby(cat_col):
                        plt.scatter(group[x_col], group[y_col], label=category, alpha=0.6)
                    plt.legend()
                else:
                    plt.scatter(sample_df[x_col], sample_df[y_col], alpha=0.6)
                plt.xlabel(x_col)
                plt.ylabel(y_col)
                plt.title(f"Scatter Plot: {x_col} vs {y_col}")
                plt.subplot(2, 1, 2)
                if 'date' in df.columns or any('time' in col.lower() for col in df.columns):
                    date_col = [col for col in df.columns if 'date' in col.lower() or 'time' in col.lower()][0]
                    value_col = numeric_cols[0] if numeric_cols[0] != date_col else numeric_cols[1]
                    if not pd.api.types.is_datetime64_dtype(df[date_col]):
                        df[date_col] = pd.to_datetime(df[date_col], errors='coerce')
                    temp_df = df.dropna(subset=[date_col, value_col]).sort_values(date_col)
                    plt.plot(temp_df[date_col], temp_df[value_col])
                    plt.xlabel(date_col)
                    plt.ylabel(value_col)
                    plt.title(f"Time Series: {value_col} over {date_col}")
                    plt.xticks(rotation=45)
                else:
                    plt.hist(df[numeric_cols[0]].dropna(), bins=20, alpha=0.7)
                    plt.xlabel(numeric_cols[0])
                    plt.ylabel('Frequency')
                    plt.title(f"Distribution of {numeric_cols[0]}")
            else:
                if len(categorical_cols) > 0:
                    cat_col = categorical_cols[0]
                    df[cat_col].value_counts().plot(kind='bar')
                    plt.xlabel(cat_col)
                    plt.ylabel('Count')
                    plt.title(f"Counts by {cat_col}")
                    plt.xticks(rotation=45)
                else:
                    plt.hist(df[numeric_cols[0]].dropna(), bins=20)
                    plt.xlabel(numeric_cols[0])
                    plt.ylabel('Frequency')
                    plt.title(f"Distribution of {numeric_cols[0]}")
            plt.tight_layout()
            viz_path = f"{Path(file_path).stem}_viz.png"
            plt.savefig(viz_path)
            plt.close()
            analysis_results["visualization_path"] = viz_path
            analysis_results["visualization_created"] = True
            logger.info(f"Created visualization: {viz_path}")
        except Exception as e:
            logger.error(f"Error creating visualization: {e}")
            analysis_results["visualization_error"] = str(e)
            analysis_results["visualization_created"] = False
        insights = []
        try:
            for col in numeric_cols:
                q1 = df[col].quantile(0.25)
                q3 = df[col].quantile(0.75)
                iqr = q3 - q1
                outlier_count = ((df[col] < (q1 - 1.5 * iqr)) | (df[col] > (q3 + 1.5 * iqr))).sum()
                if outlier_count > 0:
                    insights.append(f"Found {outlier_count} potential outliers in '{col}'")
            if "correlations" in analysis_results:
                for col1, corr_dict in analysis_results["correlations"].items():
                    for col2, corr_val in corr_dict.items():
                        if col1 != col2 and abs(corr_val) > 0.7:
                            insights.append(f"Strong correlation ({corr_val:.2f}) between '{col1}' and '{col2}'")
            for col in categorical_cols:
                if df[col].nunique() < 10:
                    value_counts = df[col].value_counts()
                    most_common = value_counts.idxmax()
                    most_common_pct = value_counts.max() / value_counts.sum() * 100
                    if most_common_pct > 80:
                        insights.append(f"Imbalanced category in '{col}': '{most_common}' accounts for {most_common_pct:.1f}% of data")
            analysis_results["insights"] = insights
        except Exception as e:
            logger.error(f"Error extracting insights: {e}")
            analysis_results["insights_error"] = str(e)
        self.memory.add_short_term({"file": file_path, "columns": list(df.columns), "row_count": len(df), "analysis": analysis_results})
        if "sample" in file_path:
            self.memory.add_long_term(f"analysis:{file_path}", {"file": file_path, "type": "generated", "columns": list(df.columns), "row_count": len(df), "timestamp": pd.Timestamp.now().isoformat()})
        column_list = ", ".join(df.columns[:5]) + (", ..." if len(df.columns) > 5 else "")
        text_response = (
            f"Data Analysis Results for {file_path}\n- Dataset: {len(df)} rows x {len(df.columns)} columns ({column_list})\n"
        )
        if "missing_values" in analysis_results and analysis_results["missing_values"]:
            text_response += f"- Missing values found in {len(analysis_results['missing_values'])} columns\n"
        if insights:
            text_response += "\nKey Insights:\n"
            for i, insight in enumerate(insights[:5], 1):
                text_response += f"{i}. {insight}\n"
            if len(insights) > 5:
                text_response += f"... and {len(insights) - 5} more insights\n"
        text_response += f"\nVisualization saved to {viz_path}" if analysis_results.get("visualization_created") else "\nNo visualization created"
        analysis_results["text"] = text_response
        analysis_results["dataframe_shape"] = df.shape
        analysis_results["data_preview"] = df.head(5).to_dict()
        return analysis_results

class CodingAssistantAgent(IntelligentAgent):
    def __init__(self, hub: AgentHub):
        super().__init__("coding_assistant", hub)
        self.code_snippets = {
            "file_operations": {
                "read_file": '''
def read_file(file_path):
    """Read a file and return its contents"""
    with open(file_path, 'r') as file:
        return file.read()
''',
                "write_file": '''
def write_file(file_path, content):
    """Write content to a file"""
    with open(file_path, 'w') as file:
        file.write(content)
    return True
'''
            },
            "data_processing": {
                "pandas_read_csv": '''
import pandas as pd

def load_csv(file_path):
    """Load a CSV file into a Pandas DataFrame"""
    return pd.read_csv(file_path)
''',
                "pandas_basic_stats": '''
def get_basic_stats(df):
    """Get basic statistics for a DataFrame"""
    numeric_stats = df.describe()
    categorical_columns = df.select_dtypes(include=['object']).columns
    categorical_stats = {col: df[col].value_counts().to_dict() for col in categorical_columns}
    return {
        'numeric': numeric_stats.to_dict(),
        'categorical': categorical_stats
    }
'''
            },
            "visualization": {
                "matplotlib_basic_plot": '''
import matplotlib.pyplot as plt

def create_basic_plot(data, x_col, y_col, title="Plot", kind="line"):
    """Create a basic plot using matplotlib"""
    plt.figure(figsize=(10, 6))
    if kind == "line":
        plt.plot(data[x_col], data[y_col])
    elif kind == "scatter":
        plt.scatter(data[x_col], data[y_col])
    elif kind == "bar":
        plt.bar(data[x_col], data[y_col])
    plt.title(title)
    plt.xlabel(x_col)
    plt.ylabel(y_col)
    plt.tight_layout()
    plt.savefig(f"{title.lower().replace(' ', '_')}.png")
    plt.close()
    return f"{title.lower().replace(' ', '_')}.png"
'''
            },
            "web_scraping": {
                "requests_beautifulsoup": '''
import requests
from bs4 import BeautifulSoup

def scrape_webpage(url):
    """Scrape a webpage and extract text from paragraphs"""
    try:
        response = requests.get(url)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        paragraphs = soup.find_all('p')
        text = [p.get_text() for p in paragraphs]
        return {
            'title': soup.title.string if soup.title else "No title",
            'text': text,
            'url': url
        }
    except Exception as e:
        return {'error': str(e), 'url': url}
'''
            },
            "nlp": {
                "basic_text_analysis": '''
from collections import Counter
import re

def analyze_text(text):
    """Perform basic text analysis"""
    text = text.lower()
    words = re.findall(r'\w+', text)
    word_count = len(words)
    unique_words = len(set(words))
    stop_words = {'the', 'a', 'an', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'and', 'or'}
    word_freq = Counter([w for w in words if w not in stop_words and len(w) > 1])
    return {
        'word_count': word_count,
        'unique_words': unique_words,
        'avg_word_length': sum(len(w) for w in words) / word_count if word_count else 0,
        'most_common': word_freq.most_common(10)
    }
'''
            },
            "machine_learning": {
                "basic_classifier": '''
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report

def train_basic_classifier(X, y, test_size=0.2, random_state=42):
    """Train a basic RandomForest classifier"""
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)
    model = RandomForestClassifier(n_estimators=100, random_state=random_state)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    report = classification_report(y_test, y_pred, output_dict=True)
    return {
        'model': model,
        'accuracy': report['accuracy'],
        'classification_report': report,
        'feature_importance': dict(zip(range(X.shape[1]), model.feature_importances_))
    }
'''
            }
        }

    def process_task(self, task: str) -> Dict[str, Any]:
        logger.info(f"CodingAssistantAgent processing: {task}")
        task_lower = task.lower()
        keyword_mapping = {
            "file": "file_operations",
            "read file": "file_operations",
            "write file": "file_operations",
            "csv": "data_processing",
            "data": "data_processing",
            "pandas": "data_processing",
            "dataframe": "data_processing",
            "plot": "visualization",
            "chart": "visualization",
            "graph": "visualization",
            "visualize": "visualization",
            "matplotlib": "visualization",
            "scrape": "web_scraping",
            "web": "web_scraping",
            "html": "web_scraping",
            "beautifulsoup": "web_scraping",
            "text analysis": "nlp",
            "nlp": "nlp",
            "natural language": "nlp",
            "word count": "nlp",
            "text processing": "nlp",
            "machine learning": "machine_learning",
            "ml": "machine_learning",
            "model": "machine_learning",
            "predict": "machine_learning",
            "classifier": "machine_learning"
        }
        code_category = None
        function_name = None
        for keyword, category in keyword_mapping.items():
            if keyword in task_lower:
                code_category = category
                for func_name in self.code_snippets.get(category, {}):
                    natural_func = func_name.replace('_', ' ')
                    if natural_func in task_lower:
                        function_name = func_name
                        break
                break
        if not code_category:
            if any(word in task_lower for word in ["add", "sum", "calculate", "compute"]):
                code_category = "data_processing"
            elif any(word in task_lower for word in ["show", "display", "generate"]):
                code_category = "visualization"
        if code_category and not function_name and self.code_snippets.get(code_category):
            function_name = next(iter(self.code_snippets[code_category]))
        if not code_category:
            function_parts = [word for word in task_lower.split() if word not in ["a", "the", "an", "to", "for", "function", "code", "create", "make"]]
            func_name = "_".join(function_parts[:2]) if len(function_parts) >= 2 else "custom_function"
            custom_code = f"""
def {func_name}(input_data):
    # Custom function based on your request: '{task}'
    result = None
    # TODO: Implement specific logic based on requirements
    if isinstance(input_data, list):
        result = len(input_data)
    elif isinstance(input_data, str):
        result = input_data.upper()
    elif isinstance(input_data, (int, float)):
        result = input_data * 2
    return {{
        'input': input_data,
        'result': result,
        'status': 'processed'
    }}
"""
            result = {
                "text": f"I've created a custom function template based on your request:\n\n```python\n{custom_code}\n```\n\nThis is a starting point you can customize further.",
                "code": custom_code,
                "language": "python",
                "type": "custom"
            }
        else:
            code_snippet = self.code_snippets[code_category][function_name]
            result = {
                "text": f"Here's a {code_category.replace('_', ' ')} function for {function_name.replace('_', ' ')}:\n\n```python\n{code_snippet}\n```\n\nYou can customize this code.",
                "code": code_snippet,
                "language": "python",
                "category": code_category,
                "function": function_name
            }
        self.memory.add_short_term({"task": task, "code_category": code_category, "function_provided": function_name, "timestamp": pd.Timestamp.now().isoformat()})
        return result

class ImageProcessingAgent(IntelligentAgent):
    def __init__(self, hub: AgentHub):
        super().__init__("image_processing", hub)

    def process_task(self, task: Any) -> Dict[str, Any]:
        logger.info("ImageProcessingAgent processing task")
        image = None
        task_type = None
        if isinstance(task, Image.Image):
            image = task
            task_type = "direct_image"
        elif isinstance(task, str):
            if Path(task).exists() and Path(task).suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff']:
                try:
                    image = Image.open(task)
                    task_type = "image_path"
                except Exception as e:
                    return {"text": f"Error loading image from {task}: {str(e)}", "error": str(e)}
            else:
                task_type = "text_instruction"
        elif isinstance(task, dict) and 'image' in task:
            if isinstance(task['image'], Image.Image):
                image = task['image']
            elif isinstance(task['image'], str) and Path(task['image']).exists():
                try:
                    image = Image.open(task['image'])
                except Exception as e:
                    return {"text": f"Error loading image from {task['image']}: {str(e)}", "error": str(e)}
            task_type = "dict_with_image"
        if task_type == "text_instruction" and not image:
            return {"text": "Please provide an image to process along with instructions."}
        if not image:
            return {"text": "No valid image provided for processing."}
        processing_type = "edge_detection"
        if task_type in ["text_instruction", "dict_with_image"] and isinstance(task, dict):
            instruction = task.get('instruction', '').lower()
            if 'blur' in instruction or 'smooth' in instruction:
                processing_type = "blur"
            elif 'edge' in instruction or 'contour' in instruction:
                processing_type = "edge_detection"
            elif 'gray' in instruction or 'greyscale' in instruction or 'black and white' in instruction:
                processing_type = "grayscale"
            elif 'bright' in instruction or 'contrast' in instruction:
                processing_type = "enhance"
            elif 'resize' in instruction or 'scale' in instruction:
                processing_type = "resize"
        try:
            img_array = np.array(image)
            if img_array.ndim == 3 and img_array.shape[-1] == 4:
                img_cv = cv2.cvtColor(img_array, cv2.COLOR_RGBA2BGR)
            else:
                img_cv = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
            processed_img = None
            processing_details = {"original_size": image.size}
            if processing_type == "edge_detection":
                gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
                edges = cv2.Canny(gray, 100, 200)
                processed_img = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
                processing_details["processing"] = "Edge detection using Canny"
            elif processing_type == "blur":
                processed_img = cv2.GaussianBlur(img_cv, (7, 7), 0)
                processing_details["processing"] = "Gaussian Blur"
            elif processing_type == "grayscale":
                processed_img = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
                processed_img = cv2.cvtColor(processed_img, cv2.COLOR_GRAY2BGR)
                processing_details["processing"] = "Grayscale conversion"
            elif processing_type == "enhance":
                lab = cv2.cvtColor(img_cv, cv2.COLOR_BGR2LAB)
                l, a, b = cv2.split(lab)
                clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
                cl = clahe.apply(l)
                limg = cv2.merge((cl, a, b))
                processed_img = cv2.cvtColor(limg, cv2.COLOR_LAB2BGR)
                processing_details["processing"] = "Contrast enhancement"
            elif processing_type == "resize":
                processed_img = cv2.resize(img_cv, (image.size[0] // 2, image.size[1] // 2))
                processing_details["processing"] = "Resized to half"
            else:
                processed_img = img_cv
                processing_details["processing"] = "No processing applied"
            processed_pil = Image.fromarray(cv2.cvtColor(processed_img, cv2.COLOR_BGR2RGB))
            return {"text": f"Image processing completed with {processing_details['processing']}.", "image": processed_pil, "details": processing_details}
        except Exception as e:
            error_msg = f"Error processing image: {str(e)}\n{traceback.format_exc()}"
            logger.error(error_msg)
            return {"text": f"Error processing image: {str(e)}", "error": str(e)}

class FileManagementAgent(IntelligentAgent):
    def __init__(self, hub: AgentHub):
        super().__init__("file_management", hub)

    def process_task(self, task: str) -> Dict[str, Any]:
        logger.info(f"FileManagementAgent processing: {task}")
        task_lower = task.lower()
        if any(word in task_lower for word in ["create", "make", "generate", "write"]):
            operation = "create"
        elif any(word in task_lower for word in ["read", "open", "show", "display", "content"]):
            operation = "read"
        elif any(word in task_lower for word in ["list", "find", "directory", "folder", "files in"]):
            operation = "list"
        elif any(word in task_lower for word in ["delete", "remove"]):
            operation = "delete"
        else:
            operation = "unknown"
        filename = None
        file_extensions = ['.txt', '.json', '.csv', '.md', '.py', '.html', '.js', '.css']
        words = task.split()
        for word in words:
            for ext in file_extensions:
                if ext in word.lower():
                    filename = word.strip(':"\'.,;')
                    break
            if filename:
                break
        if not filename:
            file_keywords = ["file", "named", "called", "filename"]
            for i, word in enumerate(words):
                if word.lower() in file_keywords and i < len(words) - 1:
                    potential_name = words[i+1].strip(':"\'.,;')
                    if '.' not in potential_name:
                        if "json" in task_lower:
                            potential_name += ".json"
                        elif "csv" in task_lower:
                            potential_name += ".csv"
                        elif "python" in task_lower or "py" in task_lower:
                            potential_name += ".py"
                        else:
                            potential_name += ".txt"
                    filename = potential_name
                    break
        if not filename:
            if "json" in task_lower:
                filename = f"data_{uuid.uuid4().hex[:6]}.json"
            elif "csv" in task_lower:
                filename = f"data_{uuid.uuid4().hex[:6]}.csv"
            elif "python" in task_lower or "py" in task_lower:
                filename = f"script_{uuid.uuid4().hex[:6]}.py"
            elif "log" in task_lower:
                filename = f"log_{uuid.uuid4().hex[:6]}.txt"
            else:
                filename = f"file_{uuid.uuid4().hex[:6]}.txt"
        result = {}
        if operation == "create":
            if filename.endswith('.json'):
                content = json.dumps({
                    "name": "Sample Data",
                    "description": task,
                    "created": pd.Timestamp.now().isoformat(),
                    "values": [1, 2, 3, 4, 5],
                    "metadata": {"source": "FileManagementAgent", "version": "1.0"}
                }, indent=2)
            elif filename.endswith('.csv'):
                content = "id,name,value,timestamp\n"
                for i in range(5):
                    content += f"{i+1},Item{i+1},{np.random.randint(1, 100)},{pd.Timestamp.now().isoformat()}\n"
            elif filename.endswith('.py'):
                content = f"""# Generated Python Script: {filename}
# Created: {pd.Timestamp.now().isoformat()}
# Description: {task}

def main():
    print("Hello from the FileManagementAgent!")
    data = [1, 2, 3, 4, 5]
    result = sum(data)
    print(f"Sample calculation: sum(data) = {{result}}")
    return result

if __name__ == "__main__":
    main()
"""
            else:
                content = f"File created by FileManagementAgent\nCreated: {pd.Timestamp.now().isoformat()}\nBased on request: {task}\n\nThis is sample content."
            try:
                with open(filename, 'w', encoding='utf-8') as f:
                    f.write(content)
                result = {"text": f"Successfully created file: {filename}", "operation": "create", "filename": filename, "size": len(content), "preview": content[:200] + "..." if len(content) > 200 else content}
                self.memory.add_short_term({"operation": "create", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
                self.memory.add_long_term(f"file:{filename}", {"operation": "create", "type": Path(filename).suffix, "timestamp": pd.Timestamp.now().isoformat()})
            except Exception as e:
                error_msg = f"Error creating file {filename}: {str(e)}"
                logger.error(error_msg)
                result = {"text": error_msg, "error": str(e)}
        elif operation == "read":
            if not filename:
                result = {"text": "Please specify a filename to read."}
            elif not Path(filename).exists():
                result = {"text": f"File '{filename}' not found."}
            else:
                try:
                    with open(filename, 'r', encoding='utf-8') as f:
                        content = f.read()
                    result = {"text": f"Content of {filename}:\n\n{content}", "operation": "read", "filename": filename, "content": content, "size": len(content)}
                    self.memory.add_short_term({"operation": "read", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
                except Exception as e:
                    error_msg = f"Error reading file {filename}: {str(e)}"
                    logger.error(error_msg)
                    result = {"text": error_msg, "error": str(e)}
        elif operation == "list":
            try:
                directory = "."
                for term in ["directory", "folder", "in"]:
                    if term in task_lower:
                        parts = task_lower.split(term)
                        if len(parts) > 1:
                            potential_dir = parts[1].strip().split()[0].strip(':"\'.,;')
                            if Path(potential_dir).exists() and Path(potential_dir).is_dir():
                                directory = potential_dir
                extension_filter = None
                for ext in file_extensions:
                    if ext in task_lower:
                        extension_filter = ext
                        break
                files = list(Path(directory).glob('*' + (extension_filter or '')))
                file_groups = {}
                for file in files:
                    file_groups.setdefault(file.suffix, []).append({
                        "name": file.name,
                        "size": file.stat().st_size,
                        "modified": pd.Timestamp(file.stat().st_mtime, unit='s').isoformat()
                    })
                response_text = f"Found {len(files)} files" + (f" with extension {extension_filter}" if extension_filter else "") + f" in {directory}:\n\n"
                for ext, group in file_groups.items():
                    response_text += f"{ext} files ({len(group)}):\n"
                    for file_info in sorted(group, key=lambda x: x["name"]):
                        size_kb = file_info["size"] / 1024
                        response_text += f"- {file_info['name']} ({size_kb:.1f} KB, modified: {file_info['modified']})\n"
                    response_text += "\n"
                result = {"text": response_text, "operation": "list", "directory": directory, "file_count": len(files), "files": file_groups}
                self.memory.add_short_term({"operation": "list", "directory": directory, "file_count": len(files), "timestamp": pd.Timestamp.now().isoformat()})
            except Exception as e:
                error_msg = f"Error listing files: {str(e)}"
                logger.error(error_msg)
                result = {"text": error_msg, "error": str(e)}
        elif operation == "delete":
            if not filename:
                result = {"text": "Please specify a filename to delete."}
            elif not Path(filename).exists():
                result = {"text": f"File '{filename}' not found."}
            else:
                try:
                    os.remove(filename)
                    result = {"text": f"Successfully deleted file: {filename}", "operation": "delete", "filename": filename}
                    self.memory.add_short_term({"operation": "delete", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
                    self.memory.add_long_term(f"file:{filename}", {"operation": "delete", "timestamp": pd.Timestamp.now().isoformat()})
                except Exception as e:
                    error_msg = f"Error deleting file {filename}: {str(e)}"
                    logger.error(error_msg)
                    result = {"text": error_msg, "error": str(e)}
        else:
            result = {"text": f"Unknown operation requested in task: {task}"}
        return result

# ---------------------------
# Gradio Interface Setup
# ---------------------------
def create_agent_hub():
    hub = AgentHub()
    hub.register_agent("web_research", WebResearchAgent(hub))
    hub.register_agent("web_scraper", WebScraperAgent(hub))
    hub.register_agent("text_processing", TextProcessingAgent(hub))
    hub.register_agent("data_analysis", DataAnalysisAgent(hub))
    hub.register_agent("coding_assistant", CodingAssistantAgent(hub))
    hub.register_agent("image_processing", ImageProcessingAgent(hub))
    hub.register_agent("file_management", FileManagementAgent(hub))
    return hub

def create_gradio_interface():
    hub = create_agent_hub()

    def process_request(request_type, input_data, extra_data=""):
        try:
            if request_type == "chain":
                agent_sequence = [agent.strip() for agent in extra_data.split(",") if agent.strip()]
                return hub.chain_of_thought(input_data, agent_sequence)
            else:
                agent = hub.get_agent(request_type)
                if not agent:
                    return {"error": f"Unknown agent type: {request_type}"}
                return agent.process_task(input_data)
        except Exception as e:
            logger.error(f"Error processing request: {e}")
            return {"error": str(e)}

    with gr.Blocks(title="SmolAgents Toolbelt") as interface:
        gr.Markdown("# SmolAgents Toolbelt")
gr.Markdown("A collection of specialized agents for various tasks with evolved logic :contentReference[oaicite:0]{index=0}.") | |
        with gr.Tabs():
            with gr.Tab("Single Agent"):
                agent_type = gr.Dropdown(
                    choices=["web_research", "web_scraper", "text_processing", "data_analysis", "coding_assistant", "image_processing", "file_management"],
                    label="Select Agent",
                    value="web_research"
                )
                with gr.Row():
                    input_text = gr.Textbox(label="Input", placeholder="Enter your request...")
                    extra_input = gr.Textbox(label="Extra (e.g., image path or additional info)", placeholder="Optional extra input...")
                output_text = gr.JSON(label="Output")
                process_btn = gr.Button("Process")
                process_btn.click(fn=process_request, inputs=[agent_type, input_text, extra_input], outputs=output_text)
            with gr.Tab("Chain of Thought"):
                chain_input = gr.Textbox(label="Input", placeholder="Enter your request for the chain...")
                chain_sequence = gr.Textbox(label="Agent Sequence", placeholder="Comma-separated agent names (e.g., text_processing,data_analysis)")
                chain_output = gr.JSON(label="Chain Output")
                chain_type = gr.State("chain")
                chain_btn = gr.Button("Process Chain")
                chain_btn.click(fn=process_request, inputs=[chain_type, chain_input, chain_sequence], outputs=chain_output)
            with gr.Tab("Help"):
                gr.Markdown("""
## Available Agents
- **Web Research Agent**: Searches Wikipedia for information.
- **Web Scraper Agent**: Scrapes content from provided URLs.
- **Text Processing Agent**: Analyzes and processes text.
- **Data Analysis Agent**: Performs data analysis and visualization.
- **Coding Assistant Agent**: Generates code snippets.
- **Image Processing Agent**: Processes images based on instructions.
- **File Management Agent**: Handles file creation, reading, listing, and deletion.

### Usage
1. Select an agent (or choose 'Chain of Thought' for a sequence).
2. Enter your request.
3. For chains, provide a comma-separated list of agent IDs.
""")
    return interface
if __name__ == "__main__": | |
demo = create_gradio_interface() | |
demo.launch(server_name="0.0.0.0", server_port=7860, share=True) | |