# project67 / app.py
import logging
import os
import sys
from pathlib import Path
import json
import io
import uuid
import traceback
from typing import Dict, List, Any, Tuple, Optional
from dataclasses import dataclass
# Set UTF-8 encoding for Windows
if sys.platform == 'win32':
os.environ["PYTHONIOENCODING"] = "utf-8"
import gradio as gr
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
from sklearn.datasets import load_iris
import cv2
from PIL import Image
# Additional libraries for web research & scraping
import wikipedia
import requests
from bs4 import BeautifulSoup
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ---------------------------
# Agent Context & Memory System
# ---------------------------
@dataclass
class AgentMemory:
short_term: List[Dict[str, Any]] = None
long_term: Dict[str, Any] = None
def __post_init__(self):
if self.short_term is None:
self.short_term = []
if self.long_term is None:
self.long_term = {}
def add_short_term(self, data: Dict[str, Any]) -> None:
self.short_term.append(data)
if len(self.short_term) > 10:
self.short_term.pop(0)
def add_long_term(self, key: str, value: Any) -> None:
self.long_term[key] = value
def get_recent_context(self, n: int = 3) -> List[Dict[str, Any]]:
return self.short_term[-n:] if len(self.short_term) >= n else self.short_term
def search_long_term(self, query: str) -> List[Tuple[str, Any]]:
results = []
for key, value in self.long_term.items():
if query.lower() in key.lower():
results.append((key, value))
return results
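# Illustrative usage (sketch, not executed anywhere by itself): each agent below owns an
# AgentMemory; short_term is a rolling window of the last 10 entries and long_term is a
# simple keyed store searched by substring match.
#   mem = AgentMemory()
#   mem.add_short_term({"task": "look up pandas"})
#   mem.add_long_term("research:pandas", {"summary": "..."})
#   recent = mem.get_recent_context(n=2)
#   hits = mem.search_long_term("pandas")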
# ---------------------------
# Agent Hub
# ---------------------------
class AgentHub:
def __init__(self):
self.agents = {}
self.global_memory = AgentMemory()
self.session_id = str(uuid.uuid4())
try:
self.tokenizer = AutoTokenizer.from_pretrained("distilgpt2")
self.model = AutoModelForCausalLM.from_pretrained("distilgpt2")
self.generator = pipeline("text-generation", model=self.model, tokenizer=self.tokenizer)
logger.info("Initialized text generation pipeline with distilgpt2")
except Exception as e:
logger.error(f"Failed to initialize text generation: {e}")
self.generator = None
try:
self.summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
logger.info("Initialized summarization pipeline")
except Exception as e:
logger.error(f"Failed to initialize summarizer: {e}")
self.summarizer = None
def register_agent(self, agent_id: str, agent_instance) -> None:
self.agents[agent_id] = agent_instance
logger.info(f"Registered agent: {agent_id}")
def get_agent(self, agent_id: str):
return self.agents.get(agent_id)
def broadcast(self, message: Dict[str, Any], exclude: Optional[List[str]] = None) -> Dict[str, List[Dict]]:
exclude = exclude or []
responses = {}
for agent_id, agent in self.agents.items():
if agent_id not in exclude:
try:
response = agent.process_message(message)
responses[agent_id] = response
except Exception as e:
logger.error(f"Error in agent {agent_id}: {e}")
responses[agent_id] = {"error": str(e)}
return responses
def chain_of_thought(self, initial_task: str, agent_sequence: List[str]) -> Dict[str, Any]:
results = {"final_output": None, "chain_outputs": [], "errors": []}
current_input = initial_task
for agent_id in agent_sequence:
agent = self.get_agent(agent_id)
if not agent:
error = f"Agent {agent_id} not found"
results["errors"].append(error)
logger.error(error)
continue
try:
output = agent.process_task(current_input)
step_result = {"agent": agent_id, "input": current_input, "output": output}
results["chain_outputs"].append(step_result)
if isinstance(output, dict) and "text" in output:
current_input = output["text"]
elif isinstance(output, str):
current_input = output
else:
current_input = f"Result from {agent_id}: {type(output).__name__} object"
except Exception as e:
error = f"Error in agent {agent_id}: {str(e)}\n{traceback.format_exc()}"
results["errors"].append(error)
logger.error(error)
if results["chain_outputs"]:
last_output = results["chain_outputs"][-1]["output"]
results["final_output"] = last_output if isinstance(last_output, dict) else {"text": str(last_output)}
return results
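# Illustrative chain usage (sketch): each step's "text" output (or stringified output)
# becomes the next agent's input; final_output is None if every step errored.
#   hub = create_agent_hub()  # defined near the bottom of this file
#   result = hub.chain_of_thought("Research pandas and summarize",
#                                 ["web_research", "text_processing"])
#   print(result["final_output"]["text"] if result["final_output"] else result["errors"])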
# ---------------------------
# Intelligent Agent Base Class
# ---------------------------
class IntelligentAgent:
def __init__(self, agent_id: str, hub: AgentHub):
self.agent_id = agent_id
self.hub = hub
self.memory = AgentMemory()
logger.info(f"Initialized agent: {agent_id}")
def process_task(self, task: Any) -> Any:
raise NotImplementedError("Subclasses must implement process_task")
def process_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
logger.info(f"Agent {self.agent_id} received message: {message}")
self.memory.add_short_term({"timestamp": pd.Timestamp.now(), "message": message})
return {"sender": self.agent_id, "received": True, "action": "acknowledge"}
def request_assistance(self, target_agent_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
target_agent = self.hub.get_agent(target_agent_id)
if not target_agent:
logger.error(f"Agent {self.agent_id} requested unknown agent: {target_agent_id}")
return {"error": f"Agent {target_agent_id} not found"}
request = {"sender": self.agent_id, "type": "assistance_request", "data": data}
return target_agent.process_message(request)
def evaluate_result(self, result: Any) -> Dict[str, Any]:
success = result is not None
confidence = 0.8 if success else 0.2
return {"success": success, "confidence": confidence, "timestamp": pd.Timestamp.now().isoformat()}
# ---------------------------
# Specialized Agent Implementations
# ---------------------------
class WebResearchAgent(IntelligentAgent):
def __init__(self, hub: AgentHub):
super().__init__("web_research", hub)
def process_task(self, task: str) -> Dict[str, Any]:
logger.info(f"WebResearchAgent processing: {task}")
search_term = task
if self.hub.summarizer:
try:
keywords = task.split()
if len(keywords) > 5:
summary = self.hub.summarizer(task, max_length=20, min_length=5, do_sample=False)
search_term = summary[0]['summary_text']
else:
search_term = task
except Exception as e:
logger.error(f"Summarization error in WebResearchAgent: {e}")
search_term = task
try:
search_results = wikipedia.search(search_term)
if not search_results:
result = {"text": f"No Wikipedia pages found for '{task}'."}
self.memory.add_short_term({"task": task, "result": result, "success": False})
return result
page_title = None
summary_text = None
error_details = []
for candidate in search_results[:3]:
try:
summary_text = wikipedia.summary(candidate, sentences=5)
page_title = candidate
break
except (wikipedia.exceptions.DisambiguationError, wikipedia.exceptions.PageError) as e:
error_details.append(f"{candidate}: {str(e)}")
continue
if not summary_text:
result = {"text": f"Failed to get Wikipedia summary for '{task}'. Errors: {'; '.join(error_details)}", "search_results": search_results}
self.memory.add_short_term({"task": task, "result": result, "success": False})
return result
self.memory.add_long_term(f"research:{search_term}", {"page_title": page_title, "summary": summary_text, "timestamp": pd.Timestamp.now().isoformat()})
result = {"text": f"Research on '{page_title}':\n{summary_text}", "page_title": page_title, "related_topics": search_results[:5], "source": "Wikipedia"}
self.memory.add_short_term({"task": task, "result": result, "success": True})
return result
except Exception as e:
error_msg = f"Error in web research: {str(e)}"
logger.error(error_msg)
result = {"text": error_msg, "error": str(e)}
self.memory.add_short_term({"task": task, "result": result, "success": False})
return result
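# Example (sketch): long queries are first condensed into a search term when the shared
# summarizer is available; the agent then tries the top three Wikipedia hits and returns
# the first five-sentence summary it can fetch.
#   hub.get_agent("web_research").process_task("History of the Python programming language")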
class WebScraperAgent(IntelligentAgent):
def __init__(self, hub: AgentHub):
super().__init__("web_scraper", hub)
def process_task(self, task: str) -> Dict[str, Any]:
logger.info(f"WebScraperAgent processing URL: {task}")
if not task.startswith(('http://', 'https://')):
return {"text": "Invalid URL format. Please provide a URL starting with http:// or https://"}
try:
headers = {'User-Agent': 'Mozilla/5.0'}
response = requests.get(task, headers=headers, timeout=10)
if response.status_code != 200:
result = {"text": f"Error: received status code {response.status_code} from {task}"}
self.memory.add_short_term({"url": task, "result": result, "success": False})
return result
soup = BeautifulSoup(response.text, 'html.parser')
title = soup.title.string.strip() if soup.title and soup.title.string else "No title found"
main_content = soup.find('main') or soup.find(id='content') or soup.find(class_='content')
paras = main_content.find_all('p') if main_content else soup.find_all('p')
content = "\n".join([p.get_text().strip() for p in paras if len(p.get_text().strip()) > 50])
if len(content) > 2000 and self.hub.summarizer:
chunks = [content[i:i+1000] for i in range(0, len(content), 1000)]
summarized_chunks = []
for chunk in chunks:
summary = self.hub.summarizer(chunk, max_length=100, min_length=30, do_sample=False)
summarized_chunks.append(summary[0]['summary_text'])
content = "\n".join(summarized_chunks)
elif len(content) > 2000:
content = content[:2000] + "... (content truncated)"
links = []
for a in soup.find_all('a', href=True):
href = a['href']
if href.startswith('http') and len(links) < 5:
links.append({"url": href, "text": a.get_text().strip() or href})
result = {"text": f"Content from {task}:\n\nTitle: {title}\n\n{content}", "title": title, "raw_content": content, "links": links, "source_url": task}
self.memory.add_short_term({"url": task, "result": result, "success": True})
self.memory.add_long_term(f"scraped:{task}", {"title": title, "content_preview": content[:200], "timestamp": pd.Timestamp.now().isoformat()})
return result
except requests.RequestException as e:
error_msg = f"Request error for {task}: {str(e)}"
logger.error(error_msg)
return {"text": error_msg, "error": str(e)}
except Exception as e:
error_msg = f"Error scraping {task}: {str(e)}"
logger.error(error_msg)
return {"text": error_msg, "error": str(e)}
class TextProcessingAgent(IntelligentAgent):
def __init__(self, hub: AgentHub):
super().__init__("text_processing", hub)
def process_task(self, task: str) -> Dict[str, Any]:
logger.info(f"TextProcessingAgent processing text ({len(task)} chars)")
if not task or len(task) < 10:
return {"text": "Text too short to process meaningfully."}
results = {}
words = task.split()
sentences = task.split('. ')
results["statistics"] = {
"character_count": len(task),
"word_count": len(words),
"estimated_sentences": len(sentences),
"average_word_length": sum(len(word) for word in words) / len(words) if words else 0
}
if len(task) > 5000:
chunk_size = 500
chunking_strategy = "character_blocks"
elif len(words) > 200:
chunk_size = 50
chunking_strategy = "word_blocks"
else:
chunk_size = 5
chunking_strategy = "sentence_blocks"
if chunking_strategy == "character_blocks":
chunks = [task[i:i+chunk_size] for i in range(0, len(task), chunk_size)]
elif chunking_strategy == "word_blocks":
chunks = [' '.join(words[i:i+chunk_size]) for i in range(0, len(words), chunk_size)]
else:
chunks = ['. '.join(sentences[i:i+chunk_size]) + '.' for i in range(0, len(sentences), chunk_size)]
results["chunks"] = chunks
results["chunking_strategy"] = chunking_strategy
if self.hub.summarizer and len(task) > 200:
try:
task_for_summary = task[:1000] if len(task) > 1000 else task
summary = self.hub.summarizer(task_for_summary, max_length=100, min_length=30, do_sample=False)
results["summary"] = summary[0]['summary_text']
except Exception as e:
logger.error(f"Summarization error: {e}")
results["summary_error"] = str(e)
stop_words = set(['the', 'a', 'an', 'and', 'in', 'on', 'at', 'to', 'for', 'of', 'with'])
word_freq = {}
for word in words:
w = word.lower().strip('.,!?:;()-"\'')
if w and w not in stop_words and len(w) > 1:
word_freq[w] = word_freq.get(w, 0) + 1
results["frequent_words"] = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:10]
positive_words = set(['good', 'great', 'excellent', 'positive', 'happy', 'best', 'better', 'success'])
negative_words = set(['bad', 'worst', 'terrible', 'negative', 'sad', 'problem', 'fail', 'issue'])
pos_count = sum(1 for word in words if word.lower().strip('.,!?:;()-"\'') in positive_words)
neg_count = sum(1 for word in words if word.lower().strip('.,!?:;()-"\'') in negative_words)
sentiment = "possibly positive" if pos_count > neg_count and pos_count > 2 else ("possibly negative" if neg_count > pos_count and neg_count > 2 else "neutral or mixed")
results["basic_sentiment"] = {"assessment": sentiment, "positive_word_count": pos_count, "negative_word_count": neg_count}
self.memory.add_short_term({"task_preview": task[:100] + "..." if len(task) > 100 else task, "word_count": results["statistics"]["word_count"], "result": results})
text_response = (
f"Text Analysis Results:\n- {results['statistics']['word_count']} words, {results['statistics']['character_count']} characters\n"
f"- Split into {len(chunks)} chunks using {chunking_strategy}\n"
)
if "summary" in results:
text_response += f"\nSummary:\n{results['summary']}\n"
if results["frequent_words"]:
text_response += "\nMost frequent words:\n"
for word, count in results["frequent_words"][:5]:
text_response += f"- {word}: {count} occurrences\n"
text_response += f"\nOverall tone appears {results['basic_sentiment']['assessment']}"
results["text"] = text_response
return results
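# Result shape (sketch): the returned dict carries "statistics", "chunks",
# "chunking_strategy", "frequent_words", "basic_sentiment", a human-readable "text" field,
# and "summary" when the shared summarizer is available and the input exceeds 200 characters.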
class DataAnalysisAgent(IntelligentAgent):
def __init__(self, hub: AgentHub):
super().__init__("data_analysis", hub)
def process_task(self, task: str) -> Dict[str, Any]:
logger.info(f"DataAnalysisAgent processing: {task}")
file_path = None
if "analyze" in task.lower() and ".csv" in task.lower():
for word in task.split():
if word.endswith('.csv'):
file_path = word
break
if not file_path or not Path(file_path).exists():
logger.info("No specific CSV file mentioned or file not found, creating sample data")
if "time series" in task.lower():
dates = pd.date_range(start='2023-01-01', periods=30, freq='D')
df = pd.DataFrame({'date': dates, 'value': np.random.normal(100, 15, 30), 'trend': np.linspace(0, 20, 30) + np.random.normal(0, 2, 30)})
file_path = "sample_timeseries.csv"
elif "sales" in task.lower():
products = ['ProductA', 'ProductB', 'ProductC', 'ProductD']
regions = ['North', 'South', 'East', 'West']
dates = pd.date_range(start='2023-01-01', periods=50, freq='D')
data = []
for _ in range(200):
data.append({'date': np.random.choice(dates), 'product': np.random.choice(products), 'region': np.random.choice(regions), 'units_sold': np.random.randint(10, 100), 'revenue': np.random.uniform(100, 1000)})
df = pd.DataFrame(data)
file_path = "sample_sales.csv"
else:
df = pd.DataFrame({
'A': np.random.normal(0, 1, 100),
'B': np.random.normal(5, 2, 100),
'C': np.random.uniform(-10, 10, 100),
'D': np.random.randint(0, 5, 100),
'label': np.random.choice(['X', 'Y', 'Z'], 100)
})
file_path = "sample_data.csv"
df.to_csv(file_path, index=False)
logger.info(f"Created sample data file: {file_path}")
else:
try:
df = pd.read_csv(file_path)
logger.info(f"Loaded existing file: {file_path}")
except Exception as e:
error_msg = f"Error loading CSV file {file_path}: {str(e)}"
logger.error(error_msg)
return {"text": error_msg, "error": str(e)}
analysis_results = {}
try:
numeric_cols = df.select_dtypes(include=[np.number]).columns
analysis_results["summary_stats"] = df[numeric_cols].describe().to_dict()
categorical_cols = df.select_dtypes(exclude=[np.number]).columns
for col in categorical_cols:
if df[col].nunique() < 10:
analysis_results[f"{col}_distribution"] = df[col].value_counts().to_dict()
except Exception as e:
logger.error(f"Error in basic statistics: {e}")
analysis_results["stats_error"] = str(e)
try:
missing_values = df.isnull().sum().to_dict()
analysis_results["missing_values"] = {k: v for k, v in missing_values.items() if v > 0}
except Exception as e:
logger.error(f"Error in missing values analysis: {e}")
analysis_results["missing_values_error"] = str(e)
try:
if len(numeric_cols) > 1:
analysis_results["correlations"] = df[numeric_cols].corr().to_dict()
except Exception as e:
logger.error(f"Error in correlation analysis: {e}")
analysis_results["correlation_error"] = str(e)
try:
plt.figure(figsize=(10, 8))
categorical_cols = df.select_dtypes(exclude=[np.number]).columns
if len(numeric_cols) >= 2:
plt.subplot(2, 1, 1)
x_col, y_col = numeric_cols[0], numeric_cols[1]
sample_df = df.sample(1000) if len(df) > 1000 else df
if len(categorical_cols) > 0 and df[categorical_cols[0]].nunique() < 10:
cat_col = categorical_cols[0]
for category, group in sample_df.groupby(cat_col):
plt.scatter(group[x_col], group[y_col], label=category, alpha=0.6)
plt.legend()
else:
plt.scatter(sample_df[x_col], sample_df[y_col], alpha=0.6)
plt.xlabel(x_col)
plt.ylabel(y_col)
plt.title(f"Scatter Plot: {x_col} vs {y_col}")
plt.subplot(2, 1, 2)
if 'date' in df.columns or any('time' in col.lower() for col in df.columns):
date_col = [col for col in df.columns if 'date' in col.lower() or 'time' in col.lower()][0]
value_col = numeric_cols[0] if numeric_cols[0] != date_col else numeric_cols[1]
if not pd.api.types.is_datetime64_dtype(df[date_col]):
df[date_col] = pd.to_datetime(df[date_col], errors='coerce')
temp_df = df.dropna(subset=[date_col, value_col]).sort_values(date_col)
plt.plot(temp_df[date_col], temp_df[value_col])
plt.xlabel(date_col)
plt.ylabel(value_col)
plt.title(f"Time Series: {value_col} over {date_col}")
plt.xticks(rotation=45)
else:
plt.hist(df[numeric_cols[0]].dropna(), bins=20, alpha=0.7)
plt.xlabel(numeric_cols[0])
plt.ylabel('Frequency')
plt.title(f"Distribution of {numeric_cols[0]}")
else:
if len(categorical_cols) > 0:
cat_col = categorical_cols[0]
df[cat_col].value_counts().plot(kind='bar')
plt.xlabel(cat_col)
plt.ylabel('Count')
plt.title(f"Counts by {cat_col}")
plt.xticks(rotation=45)
else:
plt.hist(df[numeric_cols[0]].dropna(), bins=20)
plt.xlabel(numeric_cols[0])
plt.ylabel('Frequency')
plt.title(f"Distribution of {numeric_cols[0]}")
plt.tight_layout()
viz_path = f"{Path(file_path).stem}_viz.png"
plt.savefig(viz_path)
plt.close()
analysis_results["visualization_path"] = viz_path
analysis_results["visualization_created"] = True
logger.info(f"Created visualization: {viz_path}")
except Exception as e:
logger.error(f"Error creating visualization: {e}")
analysis_results["visualization_error"] = str(e)
analysis_results["visualization_created"] = False
insights = []
try:
for col in numeric_cols:
q1 = df[col].quantile(0.25)
q3 = df[col].quantile(0.75)
iqr = q3 - q1
outlier_count = ((df[col] < (q1 - 1.5 * iqr)) | (df[col] > (q3 + 1.5 * iqr))).sum()
if outlier_count > 0:
insights.append(f"Found {outlier_count} potential outliers in '{col}'")
if "correlations" in analysis_results:
for col1, corr_dict in analysis_results["correlations"].items():
for col2, corr_val in corr_dict.items():
if col1 != col2 and abs(corr_val) > 0.7:
insights.append(f"Strong correlation ({corr_val:.2f}) between '{col1}' and '{col2}'")
for col in categorical_cols:
if df[col].nunique() < 10:
value_counts = df[col].value_counts()
most_common = value_counts.idxmax()
most_common_pct = value_counts.max() / value_counts.sum() * 100
if most_common_pct > 80:
insights.append(f"Imbalanced category in '{col}': '{most_common}' accounts for {most_common_pct:.1f}% of data")
analysis_results["insights"] = insights
except Exception as e:
logger.error(f"Error extracting insights: {e}")
analysis_results["insights_error"] = str(e)
self.memory.add_short_term({"file": file_path, "columns": list(df.columns), "row_count": len(df), "analysis": analysis_results})
if "sample" in file_path:
self.memory.add_long_term(f"analysis:{file_path}", {"file": file_path, "type": "generated", "columns": list(df.columns), "row_count": len(df), "timestamp": pd.Timestamp.now().isoformat()})
column_list = ", ".join(df.columns[:5]) + (", ..." if len(df.columns) > 5 else "")
text_response = (
f"Data Analysis Results for {file_path}\n- Dataset: {len(df)} rows x {len(df.columns)} columns ({column_list})\n"
)
if "missing_values" in analysis_results and analysis_results["missing_values"]:
text_response += f"- Missing values found in {len(analysis_results['missing_values'])} columns\n"
if insights:
text_response += "\nKey Insights:\n"
for i, insight in enumerate(insights[:5], 1):
text_response += f"{i}. {insight}\n"
if len(insights) > 5:
text_response += f"... and {len(insights) - 5} more insights\n"
text_response += f"\nVisualization saved to {viz_path}" if analysis_results.get("visualization_created") else "\nNo visualization created"
analysis_results["text"] = text_response
analysis_results["dataframe_shape"] = df.shape
analysis_results["data_preview"] = df.head(5).to_dict()
return analysis_results
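# Example (sketch): a task that names no existing CSV generates sample data first,
# assuming pandas and matplotlib can write to the working directory.
#   hub.get_agent("data_analysis").process_task("analyze sales data")
#   # -> creates sample_sales.csv and sample_sales_viz.png, then reports insights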
class CodingAssistantAgent(IntelligentAgent):
def __init__(self, hub: AgentHub):
super().__init__("coding_assistant", hub)
self.code_snippets = {
"file_operations": {
"read_file": '''
def read_file(file_path):
"""Read a file and return its contents"""
with open(file_path, 'r') as file:
return file.read()
''',
"write_file": '''
def write_file(file_path, content):
"""Write content to a file"""
with open(file_path, 'w') as file:
file.write(content)
return True
'''
},
"data_processing": {
"pandas_read_csv": '''
import pandas as pd
def load_csv(file_path):
"""Load a CSV file into a Pandas DataFrame"""
return pd.read_csv(file_path)
''',
"pandas_basic_stats": '''
def get_basic_stats(df):
"""Get basic statistics for a DataFrame"""
numeric_stats = df.describe()
categorical_columns = df.select_dtypes(include=['object']).columns
categorical_stats = {col: df[col].value_counts().to_dict() for col in categorical_columns}
return {
'numeric': numeric_stats.to_dict(),
'categorical': categorical_stats
}
'''
},
"visualization": {
"matplotlib_basic_plot": '''
import matplotlib.pyplot as plt
def create_basic_plot(data, x_col, y_col, title="Plot", kind="line"):
"""Create a basic plot using matplotlib"""
plt.figure(figsize=(10, 6))
if kind == "line":
plt.plot(data[x_col], data[y_col])
elif kind == "scatter":
plt.scatter(data[x_col], data[y_col])
elif kind == "bar":
plt.bar(data[x_col], data[y_col])
plt.title(title)
plt.xlabel(x_col)
plt.ylabel(y_col)
plt.tight_layout()
plt.savefig(f"{title.lower().replace(' ', '_')}.png")
plt.close()
return f"{title.lower().replace(' ', '_')}.png"
'''
},
"web_scraping": {
"requests_beautifulsoup": '''
import requests
from bs4 import BeautifulSoup
def scrape_webpage(url):
"""Scrape a webpage and extract text from paragraphs"""
try:
response = requests.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
paragraphs = soup.find_all('p')
text = [p.get_text() for p in paragraphs]
return {
'title': soup.title.string if soup.title else "No title",
'text': text,
'url': url
}
except Exception as e:
return {'error': str(e), 'url': url}
'''
},
"nlp": {
"basic_text_analysis": '''
from collections import Counter
import re
def analyze_text(text):
"""Perform basic text analysis"""
text = text.lower()
words = re.findall(r'\w+', text)
word_count = len(words)
unique_words = len(set(words))
stop_words = {'the', 'a', 'an', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'and', 'or'}
word_freq = Counter([w for w in words if w not in stop_words and len(w) > 1])
return {
'word_count': word_count,
'unique_words': unique_words,
'avg_word_length': sum(len(w) for w in words) / word_count if word_count else 0,
'most_common': word_freq.most_common(10)
}
'''
},
"machine_learning": {
"basic_classifier": '''
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
def train_basic_classifier(X, y, test_size=0.2, random_state=42):
"""Train a basic RandomForest classifier"""
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)
model = RandomForestClassifier(n_estimators=100, random_state=random_state)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
report = classification_report(y_test, y_pred, output_dict=True)
return {
'model': model,
'accuracy': report['accuracy'],
'classification_report': report,
'feature_importance': dict(zip(range(X.shape[1]), model.feature_importances_))
}
'''
}
}
def process_task(self, task: str) -> Dict[str, Any]:
logger.info(f"CodingAssistantAgent processing: {task}")
task_lower = task.lower()
keyword_mapping = {
"file": "file_operations",
"read file": "file_operations",
"write file": "file_operations",
"csv": "data_processing",
"data": "data_processing",
"pandas": "data_processing",
"dataframe": "data_processing",
"plot": "visualization",
"chart": "visualization",
"graph": "visualization",
"visualize": "visualization",
"matplotlib": "visualization",
"scrape": "web_scraping",
"web": "web_scraping",
"html": "web_scraping",
"beautifulsoup": "web_scraping",
"text analysis": "nlp",
"nlp": "nlp",
"natural language": "nlp",
"word count": "nlp",
"text processing": "nlp",
"machine learning": "machine_learning",
"ml": "machine_learning",
"model": "machine_learning",
"predict": "machine_learning",
"classifier": "machine_learning"
}
code_category = None
function_name = None
for keyword, category in keyword_mapping.items():
if keyword in task_lower:
code_category = category
for func_name in self.code_snippets.get(category, {}):
natural_func = func_name.replace('_', ' ')
if natural_func in task_lower:
function_name = func_name
break
break
if not code_category:
if any(word in task_lower for word in ["add", "sum", "calculate", "compute"]):
code_category = "data_processing"
elif any(word in task_lower for word in ["show", "display", "generate"]):
code_category = "visualization"
if code_category and not function_name and self.code_snippets.get(code_category):
function_name = next(iter(self.code_snippets[code_category]))
if not code_category:
function_parts = [word for word in task_lower.split() if word not in ["a", "the", "an", "to", "for", "function", "code", "create", "make"]]
func_name = "_".join(function_parts[:2]) if len(function_parts) >= 2 else "custom_function"
custom_code = f"""
def {func_name}(input_data):
# Custom function based on your request: '{task}'
result = None
# TODO: Implement specific logic based on requirements
if isinstance(input_data, list):
result = len(input_data)
elif isinstance(input_data, str):
result = input_data.upper()
elif isinstance(input_data, (int, float)):
result = input_data * 2
return {{
'input': input_data,
'result': result,
'status': 'processed'
}}
"""
result = {
"text": f"I've created a custom function template based on your request:\n\n```python\n{custom_code}\n```\n\nThis is a starting point you can customize further.",
"code": custom_code,
"language": "python",
"type": "custom"
}
else:
code_snippet = self.code_snippets[code_category][function_name]
result = {
"text": f"Here's a {code_category.replace('_', ' ')} function for {function_name.replace('_', ' ')}:\n\n```python\n{code_snippet}\n```\n\nYou can customize this code.",
"code": code_snippet,
"language": "python",
"category": code_category,
"function": function_name
}
self.memory.add_short_term({"task": task, "code_category": code_category, "function_provided": function_name, "timestamp": pd.Timestamp.now().isoformat()})
return result
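# Example (sketch): keyword routing picks a snippet category and, if no function name is
# spelled out, falls back to the first snippet in that category; unmatched requests get a
# generated template instead.
#   hub.get_agent("coding_assistant").process_task("show me a matplotlib plot")
#   # -> returns the matplotlib_basic_plot snippet from the visualization category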
class ImageProcessingAgent(IntelligentAgent):
def __init__(self, hub: AgentHub):
super().__init__("image_processing", hub)
def process_task(self, task: Any) -> Dict[str, Any]:
logger.info("ImageProcessingAgent processing task")
image = None
task_type = None
if isinstance(task, Image.Image):
image = task
task_type = "direct_image"
elif isinstance(task, str):
if Path(task).exists() and Path(task).suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff']:
try:
image = Image.open(task)
task_type = "image_path"
except Exception as e:
return {"text": f"Error loading image from {task}: {str(e)}", "error": str(e)}
else:
task_type = "text_instruction"
elif isinstance(task, dict) and 'image' in task:
if isinstance(task['image'], Image.Image):
image = task['image']
elif isinstance(task['image'], str) and Path(task['image']).exists():
try:
image = Image.open(task['image'])
except Exception as e:
return {"text": f"Error loading image from {task['image']}: {str(e)}", "error": str(e)}
task_type = "dict_with_image"
if task_type == "text_instruction" and not image:
return {"text": "Please provide an image to process along with instructions."}
if not image:
return {"text": "No valid image provided for processing."}
processing_type = "edge_detection"
if task_type in ["text_instruction", "dict_with_image"] and isinstance(task, dict):
instruction = task.get('instruction', '').lower()
if 'blur' in instruction or 'smooth' in instruction:
processing_type = "blur"
elif 'edge' in instruction or 'contour' in instruction:
processing_type = "edge_detection"
elif 'gray' in instruction or 'greyscale' in instruction or 'black and white' in instruction:
processing_type = "grayscale"
elif 'bright' in instruction or 'contrast' in instruction:
processing_type = "enhance"
elif 'resize' in instruction or 'scale' in instruction:
processing_type = "resize"
try:
img_array = np.array(image)
if img_array.ndim == 3 and img_array.shape[-1] == 4:
img_cv = cv2.cvtColor(img_array, cv2.COLOR_RGBA2BGR)
elif img_array.ndim == 2:
# Grayscale input: promote to 3 channels so the filters below work uniformly
img_cv = cv2.cvtColor(img_array, cv2.COLOR_GRAY2BGR)
else:
img_cv = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
processed_img = None
processing_details = {"original_size": image.size}
if processing_type == "edge_detection":
gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
edges = cv2.Canny(gray, 100, 200)
processed_img = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
processing_details["processing"] = "Edge detection using Canny"
elif processing_type == "blur":
processed_img = cv2.GaussianBlur(img_cv, (7, 7), 0)
processing_details["processing"] = "Gaussian Blur"
elif processing_type == "grayscale":
processed_img = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
processed_img = cv2.cvtColor(processed_img, cv2.COLOR_GRAY2BGR)
processing_details["processing"] = "Grayscale conversion"
elif processing_type == "enhance":
lab = cv2.cvtColor(img_cv, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
cl = clahe.apply(l)
limg = cv2.merge((cl, a, b))
processed_img = cv2.cvtColor(limg, cv2.COLOR_LAB2BGR)
processing_details["processing"] = "Contrast enhancement"
elif processing_type == "resize":
processed_img = cv2.resize(img_cv, (image.size[0]//2, image.size[1]//2))
processing_details["processing"] = "Resized to half"
else:
processed_img = img_cv
processing_details["processing"] = "No processing applied"
processed_pil = Image.fromarray(cv2.cvtColor(processed_img, cv2.COLOR_BGR2RGB))
return {"text": f"Image processing completed with {processing_details['processing']}.", "image": processed_pil, "details": processing_details}
except Exception as e:
error_msg = f"Error processing image: {str(e)}\n{traceback.format_exc()}"
logger.error(error_msg)
return {"text": f"Error processing image: {str(e)}", "error": str(e)}
class FileManagementAgent(IntelligentAgent):
def __init__(self, hub: AgentHub):
super().__init__("file_management", hub)
def process_task(self, task: str) -> Dict[str, Any]:
logger.info(f"FileManagementAgent processing: {task}")
task_lower = task.lower()
if any(word in task_lower for word in ["create", "make", "generate", "write"]):
operation = "create"
elif any(word in task_lower for word in ["read", "open", "show", "display", "content"]):
operation = "read"
elif any(word in task_lower for word in ["list", "find", "directory", "folder", "files in"]):
operation = "list"
elif any(word in task_lower for word in ["delete", "remove"]):
operation = "delete"
else:
operation = "unknown"
filename = None
file_extensions = ['.txt', '.json', '.csv', '.md', '.py', '.html', '.js', '.css']
words = task.split()
for word in words:
for ext in file_extensions:
if ext in word.lower():
filename = word.strip(':"\'.,;')
break
if filename:
break
if not filename:
file_keywords = ["file", "named", "called", "filename"]
for i, word in enumerate(words):
if word.lower() in file_keywords and i < len(words) - 1:
potential_name = words[i+1].strip(':"\'.,;')
if '.' not in potential_name:
if "json" in task_lower:
potential_name += ".json"
elif "csv" in task_lower:
potential_name += ".csv"
elif "python" in task_lower or "py" in task_lower:
potential_name += ".py"
else:
potential_name += ".txt"
filename = potential_name
break
if not filename:
if "json" in task_lower:
filename = f"data_{uuid.uuid4().hex[:6]}.json"
elif "csv" in task_lower:
filename = f"data_{uuid.uuid4().hex[:6]}.csv"
elif "python" in task_lower or "py" in task_lower:
filename = f"script_{uuid.uuid4().hex[:6]}.py"
elif "log" in task_lower:
filename = f"log_{uuid.uuid4().hex[:6]}.txt"
else:
filename = f"file_{uuid.uuid4().hex[:6]}.txt"
result = {}
if operation == "create":
if filename.endswith('.json'):
content = json.dumps({
"name": "Sample Data",
"description": task,
"created": pd.Timestamp.now().isoformat(),
"values": [1, 2, 3, 4, 5],
"metadata": {"source": "FileManagementAgent", "version": "1.0"}
}, indent=2)
elif filename.endswith('.csv'):
content = "id,name,value,timestamp\n"
for i in range(5):
content += f"{i+1},Item{i+1},{np.random.randint(1, 100)},{pd.Timestamp.now().isoformat()}\n"
elif filename.endswith('.py'):
content = f"""# Generated Python Script: {filename}
# Created: {pd.Timestamp.now().isoformat()}
# Description: {task}
def main():
print("Hello from the FileManagementAgent!")
data = [1, 2, 3, 4, 5]
result = sum(data)
print(f"Sample calculation: sum(data) = {{result}}")
return result
if __name__ == "__main__":
main()
"""
else:
content = f"File created by FileManagementAgent\nCreated: {pd.Timestamp.now().isoformat()}\nBased on request: {task}\n\nThis is sample content."
try:
with open(filename, 'w', encoding='utf-8') as f:
f.write(content)
result = {"text": f"Successfully created file: {filename}", "operation": "create", "filename": filename, "size": len(content), "preview": content[:200] + "..." if len(content) > 200 else content}
self.memory.add_short_term({"operation": "create", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
self.memory.add_long_term(f"file:{filename}", {"operation": "create", "type": Path(filename).suffix, "timestamp": pd.Timestamp.now().isoformat()})
except Exception as e:
error_msg = f"Error creating file {filename}: {str(e)}"
logger.error(error_msg)
result = {"text": error_msg, "error": str(e)}
elif operation == "read":
if not filename:
result = {"text": "Please specify a filename to read."}
elif not Path(filename).exists():
result = {"text": f"File '{filename}' not found."}
else:
try:
with open(filename, 'r', encoding='utf-8') as f:
content = f.read()
result = {"text": f"Content of {filename}:\n\n{content}", "operation": "read", "filename": filename, "content": content, "size": len(content)}
self.memory.add_short_term({"operation": "read", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
except Exception as e:
error_msg = f"Error reading file {filename}: {str(e)}"
logger.error(error_msg)
result = {"text": error_msg, "error": str(e)}
elif operation == "list":
try:
directory = "."
for term in ["directory", "folder", "in"]:
if term in task_lower:
parts = task_lower.split(term)
if len(parts) > 1:
potential_dir = parts[1].strip().split()[0].strip(':"\'.,;')
if Path(potential_dir).exists() and Path(potential_dir).is_dir():
directory = potential_dir
extension_filter = None
for ext in file_extensions:
if ext in task_lower:
extension_filter = ext
break
files = list(Path(directory).glob('*' + (extension_filter or '')))
file_groups = {}
for file in files:
file_groups.setdefault(file.suffix, []).append({
"name": file.name,
"size": file.stat().st_size,
"modified": pd.Timestamp(file.stat().st_mtime, unit='s').isoformat()
})
response_text = f"Found {len(files)} files" + (f" with extension {extension_filter}" if extension_filter else "") + f" in {directory}:\n\n"
for ext, group in file_groups.items():
response_text += f"{ext} files ({len(group)}):\n"
for file_info in sorted(group, key=lambda x: x["name"]):
size_kb = file_info["size"] / 1024
response_text += f"- {file_info['name']} ({size_kb:.1f} KB, modified: {file_info['modified']})\n"
response_text += "\n"
result = {"text": response_text, "operation": "list", "directory": directory, "file_count": len(files), "files": file_groups}
self.memory.add_short_term({"operation": "list", "directory": directory, "file_count": len(files), "timestamp": pd.Timestamp.now().isoformat()})
except Exception as e:
error_msg = f"Error listing files: {str(e)}"
logger.error(error_msg)
result = {"text": error_msg, "error": str(e)}
elif operation == "delete":
if not filename:
result = {"text": "Please specify a filename to delete."}
elif not Path(filename).exists():
result = {"text": f"File '{filename}' not found."}
else:
try:
os.remove(filename)
result = {"text": f"Successfully deleted file: {filename}", "operation": "delete", "filename": filename}
self.memory.add_short_term({"operation": "delete", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
self.memory.add_long_term(f"file:{filename}", {"operation": "delete", "timestamp": pd.Timestamp.now().isoformat()})
except Exception as e:
error_msg = f"Error deleting file {filename}: {str(e)}"
logger.error(error_msg)
result = {"text": error_msg, "error": str(e)}
else:
result = {"text": f"Unknown operation requested in task: {task}"}
return result
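# Example (sketch): the operation (create/read/list/delete) and the filename are both
# inferred from the request text; missing names fall back to generated ones.
#   hub.get_agent("file_management").process_task("create a file called notes.txt")
#   # -> writes notes.txt with placeholder content and returns a short preview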
# ---------------------------
# Gradio Interface Setup
# ---------------------------
def create_agent_hub():
hub = AgentHub()
hub.register_agent("web_research", WebResearchAgent(hub))
hub.register_agent("web_scraper", WebScraperAgent(hub))
hub.register_agent("text_processing", TextProcessingAgent(hub))
hub.register_agent("data_analysis", DataAnalysisAgent(hub))
hub.register_agent("coding_assistant", CodingAssistantAgent(hub))
hub.register_agent("image_processing", ImageProcessingAgent(hub))
hub.register_agent("file_management", FileManagementAgent(hub))
return hub
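# The hub can also be used headlessly (sketch), without the Gradio interface:
#   hub = create_agent_hub()
#   print(hub.get_agent("web_research").process_task("Python programming language")["text"])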
def create_gradio_interface():
hub = create_agent_hub()
def process_request(request_type, input_data, extra_data=""):
try:
if request_type == "chain":
agent_sequence = [agent.strip() for agent in extra_data.split(",") if agent.strip()]
return hub.chain_of_thought(input_data, agent_sequence)
else:
agent = hub.get_agent(request_type)
if not agent:
return {"error": f"Unknown agent type: {request_type}"}
return agent.process_task(input_data)
except Exception as e:
logger.error(f"Error processing request: {e}")
return {"error": str(e)}
with gr.Blocks(title="SmolAgents Toolbelt") as interface:
gr.Markdown("# SmolAgents Toolbelt")
gr.Markdown("A collection of specialized agents for various tasks with evolved logic :contentReference[oaicite:0]{index=0}.")
with gr.Tabs():
with gr.Tab("Single Agent"):
agent_type = gr.Dropdown(
choices=["web_research", "web_scraper", "text_processing", "data_analysis", "coding_assistant", "image_processing", "file_management"],
label="Select Agent",
value="web_research"
)
with gr.Row():
input_text = gr.Textbox(label="Input", placeholder="Enter your request...")
extra_input = gr.Textbox(label="Extra (e.g., image path or additional info)", placeholder="Optional extra input...")
output_text = gr.JSON(label="Output")
process_btn = gr.Button("Process")
process_btn.click(fn=process_request, inputs=[agent_type, input_text, extra_input], outputs=output_text)
with gr.Tab("Chain of Thought"):
chain_input = gr.Textbox(label="Input", placeholder="Enter your request for the chain...")
chain_sequence = gr.Textbox(label="Agent Sequence", placeholder="Comma-separated agent names (e.g., text_processing,data_analysis)")
chain_output = gr.JSON(label="Chain Output")
chain_type = gr.State("chain")
chain_btn = gr.Button("Process Chain")
chain_btn.click(fn=process_request, inputs=[chain_type, chain_input, chain_sequence], outputs=chain_output)
with gr.Tab("Help"):
gr.Markdown("""
## Available Agents
- **Web Research Agent**: Searches Wikipedia for information.
- **Web Scraper Agent**: Scrapes content from provided URLs.
- **Text Processing Agent**: Analyzes and processes text.
- **Data Analysis Agent**: Performs data analysis and visualization.
- **Coding Assistant Agent**: Generates code snippets.
- **Image Processing Agent**: Processes images based on instructions.
- **File Management Agent**: Handles file creation, reading, listing, and deletion.
### Usage
1. Select an agent (or choose 'Chain of Thought' for a sequence).
2. Enter your request.
3. For chains, provide a comma-separated list of agent IDs.
""")
return interface
if __name__ == "__main__":
demo = create_gradio_interface()
demo.launch(server_name="0.0.0.0", server_port=7860, share=True)