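"""Multi-agent toolbelt demo.

A hub of specialized agents (web research, web scraping, text processing,
data analysis, coding assistance, image processing, file management) that
can be invoked individually or chained, exposed through a Gradio interface.
"""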
import logging
import os
import sys
from pathlib import Path
import json
import io
import uuid
import traceback
from typing import Dict, List, Any, Tuple, Optional
from dataclasses import dataclass
# Set UTF-8 encoding for Windows
if sys.platform == 'win32':
os.environ["PYTHONIOENCODING"] = "utf-8"
import gradio as gr
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
from sklearn.datasets import load_iris
import cv2
from PIL import Image
# Additional libraries for web research & scraping
import wikipedia
import requests
from bs4 import BeautifulSoup
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ---------------------------
# Agent Context & Memory System
# ---------------------------
@dataclass
class AgentMemory:
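    """Lightweight per-agent memory.

    `short_term` holds the ten most recent interaction records (oldest
    entries are dropped first); `long_term` is a key/value store whose keys
    can be searched by case-insensitive substring.
    """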
short_term: List[Dict[str, Any]] = None
long_term: Dict[str, Any] = None
def __post_init__(self):
if self.short_term is None:
self.short_term = []
if self.long_term is None:
self.long_term = {}
def add_short_term(self, data: Dict[str, Any]) -> None:
self.short_term.append(data)
if len(self.short_term) > 10:
self.short_term.pop(0)
def add_long_term(self, key: str, value: Any) -> None:
self.long_term[key] = value
def get_recent_context(self, n: int = 3) -> List[Dict[str, Any]]:
return self.short_term[-n:] if len(self.short_term) >= n else self.short_term
def search_long_term(self, query: str) -> List[Tuple[str, Any]]:
results = []
for key, value in self.long_term.items():
if query.lower() in key.lower():
results.append((key, value))
return results
# ---------------------------
# Agent Hub
# ---------------------------
class AgentHub:
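    """Central registry that wires the agents together.

    Attempts to load a distilgpt2 text-generation pipeline and a
    facebook/bart-large-cnn summarizer; if either fails, the corresponding
    attribute is set to None and agents fall back to non-ML behaviour.
    Provides broadcast messaging and sequential chain-of-thought execution.
    """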
def __init__(self):
self.agents = {}
self.global_memory = AgentMemory()
self.session_id = str(uuid.uuid4())
try:
self.tokenizer = AutoTokenizer.from_pretrained("distilgpt2")
self.model = AutoModelForCausalLM.from_pretrained("distilgpt2")
self.generator = pipeline("text-generation", model=self.model, tokenizer=self.tokenizer)
logger.info("Initialized text generation pipeline with distilgpt2")
except Exception as e:
logger.error(f"Failed to initialize text generation: {e}")
self.generator = None
try:
self.summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
logger.info("Initialized summarization pipeline")
except Exception as e:
logger.error(f"Failed to initialize summarizer: {e}")
self.summarizer = None
def register_agent(self, agent_id: str, agent_instance) -> None:
self.agents[agent_id] = agent_instance
logger.info(f"Registered agent: {agent_id}")
def get_agent(self, agent_id: str):
return self.agents.get(agent_id)
    def broadcast(self, message: Dict[str, Any], exclude: Optional[List[str]] = None) -> Dict[str, Dict[str, Any]]:
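        """Send `message` to every registered agent not listed in `exclude`,
        returning each agent's response (or error) keyed by agent id."""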
exclude = exclude or []
responses = {}
for agent_id, agent in self.agents.items():
if agent_id not in exclude:
try:
response = agent.process_message(message)
responses[agent_id] = response
except Exception as e:
logger.error(f"Error in agent {agent_id}: {e}")
responses[agent_id] = {"error": str(e)}
return responses
def chain_of_thought(self, initial_task: str, agent_sequence: List[str]) -> Dict[str, Any]:
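        """Run `initial_task` through `agent_sequence` in order, feeding each
        agent's text output into the next. Returns per-step outputs, any
        errors, and a final output dict (non-dict results are wrapped under a
        "text" key)."""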
results = {"final_output": None, "chain_outputs": [], "errors": []}
current_input = initial_task
for agent_id in agent_sequence:
agent = self.get_agent(agent_id)
if not agent:
error = f"Agent {agent_id} not found"
results["errors"].append(error)
logger.error(error)
continue
try:
output = agent.process_task(current_input)
step_result = {"agent": agent_id, "input": current_input, "output": output}
results["chain_outputs"].append(step_result)
if isinstance(output, dict) and "text" in output:
current_input = output["text"]
elif isinstance(output, str):
current_input = output
else:
current_input = f"Result from {agent_id}: {type(output).__name__} object"
except Exception as e:
error = f"Error in agent {agent_id}: {str(e)}\n{traceback.format_exc()}"
results["errors"].append(error)
logger.error(error)
if results["chain_outputs"]:
last_output = results["chain_outputs"][-1]["output"]
results["final_output"] = last_output if isinstance(last_output, dict) else {"text": str(last_output)}
return results
# ---------------------------
# Intelligent Agent Base Class
# ---------------------------
class IntelligentAgent:
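    """Base class for all agents.

    Holds an agent id, a reference to the shared AgentHub, and a private
    AgentMemory. Subclasses implement `process_task`; `process_message` and
    `request_assistance` support simple agent-to-agent communication.
    """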
def __init__(self, agent_id: str, hub: AgentHub):
self.agent_id = agent_id
self.hub = hub
self.memory = AgentMemory()
logger.info(f"Initialized agent: {agent_id}")
def process_task(self, task: Any) -> Any:
raise NotImplementedError("Subclasses must implement process_task")
def process_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
logger.info(f"Agent {self.agent_id} received message: {message}")
self.memory.add_short_term({"timestamp": pd.Timestamp.now(), "message": message})
return {"sender": self.agent_id, "received": True, "action": "acknowledge"}
def request_assistance(self, target_agent_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
target_agent = self.hub.get_agent(target_agent_id)
if not target_agent:
logger.error(f"Agent {self.agent_id} requested unknown agent: {target_agent_id}")
return {"error": f"Agent {target_agent_id} not found"}
request = {"sender": self.agent_id, "type": "assistance_request", "data": data}
return target_agent.process_message(request)
def evaluate_result(self, result: Any) -> Dict[str, Any]:
success = result is not None
confidence = 0.8 if success else 0.2
return {"success": success, "confidence": confidence, "timestamp": pd.Timestamp.now().isoformat()}
# ---------------------------
# Specialized Agent Implementations
# ---------------------------
class WebResearchAgent(IntelligentAgent):
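    """Looks a topic up on Wikipedia.

    Long queries (more than five words) are first condensed with the hub's
    summarizer; the first three search hits are tried until one yields a
    five-sentence summary.
    """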
def __init__(self, hub: AgentHub):
super().__init__("web_research", hub)
def process_task(self, task: str) -> Dict[str, Any]:
logger.info(f"WebResearchAgent processing: {task}")
search_term = task
if self.hub.summarizer:
try:
keywords = task.split()
if len(keywords) > 5:
summary = self.hub.summarizer(task, max_length=20, min_length=5, do_sample=False)
search_term = summary[0]['summary_text']
else:
search_term = task
except Exception as e:
logger.error(f"Summarization error in WebResearchAgent: {e}")
search_term = task
try:
search_results = wikipedia.search(search_term)
if not search_results:
result = {"text": f"No Wikipedia pages found for '{task}'."}
self.memory.add_short_term({"task": task, "result": result, "success": False})
return result
page_title = None
summary_text = None
error_details = []
for candidate in search_results[:3]:
try:
summary_text = wikipedia.summary(candidate, sentences=5)
page_title = candidate
break
except (wikipedia.exceptions.DisambiguationError, wikipedia.exceptions.PageError) as e:
error_details.append(f"{candidate}: {str(e)}")
continue
if not summary_text:
result = {"text": f"Failed to get Wikipedia summary for '{task}'. Errors: {'; '.join(error_details)}", "search_results": search_results}
self.memory.add_short_term({"task": task, "result": result, "success": False})
return result
self.memory.add_long_term(f"research:{search_term}", {"page_title": page_title, "summary": summary_text, "timestamp": pd.Timestamp.now().isoformat()})
result = {"text": f"Research on '{page_title}':\n{summary_text}", "page_title": page_title, "related_topics": search_results[:5], "source": "Wikipedia"}
self.memory.add_short_term({"task": task, "result": result, "success": True})
return result
except Exception as e:
error_msg = f"Error in web research: {str(e)}"
logger.error(error_msg)
result = {"text": error_msg, "error": str(e)}
self.memory.add_short_term({"task": task, "result": result, "success": False})
return result
class WebScraperAgent(IntelligentAgent):
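    """Scrapes a single URL.

    Downloads the page with requests, extracts the title, paragraph text and
    up to five outbound links with BeautifulSoup, and summarizes or truncates
    content longer than 2000 characters.
    """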
def __init__(self, hub: AgentHub):
super().__init__("web_scraper", hub)
def process_task(self, task: str) -> Dict[str, Any]:
logger.info(f"WebScraperAgent processing URL: {task}")
if not task.startswith(('http://', 'https://')):
return {"text": "Invalid URL format. Please provide a URL starting with http:// or https://"}
try:
headers = {'User-Agent': 'Mozilla/5.0'}
response = requests.get(task, headers=headers, timeout=10)
if response.status_code != 200:
result = {"text": f"Error: received status code {response.status_code} from {task}"}
self.memory.add_short_term({"url": task, "result": result, "success": False})
return result
soup = BeautifulSoup(response.text, 'html.parser')
title = soup.title.string.strip() if soup.title and soup.title.string else "No title found"
main_content = soup.find('main') or soup.find(id='content') or soup.find(class_='content')
paras = main_content.find_all('p') if main_content else soup.find_all('p')
content = "\n".join([p.get_text().strip() for p in paras if len(p.get_text().strip()) > 50])
if len(content) > 2000 and self.hub.summarizer:
chunks = [content[i:i+1000] for i in range(0, len(content), 1000)]
summarized_chunks = []
for chunk in chunks:
summary = self.hub.summarizer(chunk, max_length=100, min_length=30, do_sample=False)
summarized_chunks.append(summary[0]['summary_text'])
content = "\n".join(summarized_chunks)
elif len(content) > 2000:
content = content[:2000] + "... (content truncated)"
links = []
for a in soup.find_all('a', href=True):
href = a['href']
if href.startswith('http') and len(links) < 5:
links.append({"url": href, "text": a.get_text().strip() or href})
result = {"text": f"Content from {task}:\n\nTitle: {title}\n\n{content}", "title": title, "raw_content": content, "links": links, "source_url": task}
self.memory.add_short_term({"url": task, "result": result, "success": True})
self.memory.add_long_term(f"scraped:{task}", {"title": title, "content_preview": content[:200], "timestamp": pd.Timestamp.now().isoformat()})
return result
except requests.RequestException as e:
error_msg = f"Request error for {task}: {str(e)}"
logger.error(error_msg)
return {"text": error_msg, "error": str(e)}
except Exception as e:
error_msg = f"Error scraping {task}: {str(e)}"
logger.error(error_msg)
return {"text": error_msg, "error": str(e)}
class TextProcessingAgent(IntelligentAgent):
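    """Analyzes raw text.

    Produces basic statistics, chunks the text by characters, words or
    sentences depending on its length, lists the most frequent non-stopwords,
    and applies a small-lexicon sentiment heuristic; a summary is added when
    the hub's summarizer is available.
    """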
def __init__(self, hub: AgentHub):
super().__init__("text_processing", hub)
def process_task(self, task: str) -> Dict[str, Any]:
logger.info(f"TextProcessingAgent processing text ({len(task)} chars)")
if not task or len(task) < 10:
return {"text": "Text too short to process meaningfully."}
results = {}
words = task.split()
sentences = task.split('. ')
results["statistics"] = {
"character_count": len(task),
"word_count": len(words),
"estimated_sentences": len(sentences),
"average_word_length": sum(len(word) for word in words) / len(words) if words else 0
}
if len(task) > 5000:
chunk_size = 500
chunking_strategy = "character_blocks"
elif len(words) > 200:
chunk_size = 50
chunking_strategy = "word_blocks"
else:
chunk_size = 5
chunking_strategy = "sentence_blocks"
if chunking_strategy == "character_blocks":
chunks = [task[i:i+chunk_size] for i in range(0, len(task), chunk_size)]
elif chunking_strategy == "word_blocks":
chunks = [' '.join(words[i:i+chunk_size]) for i in range(0, len(words), chunk_size)]
else:
chunks = ['. '.join(sentences[i:i+chunk_size]) + '.' for i in range(0, len(sentences), chunk_size)]
results["chunks"] = chunks
results["chunking_strategy"] = chunking_strategy
if self.hub.summarizer and len(task) > 200:
try:
task_for_summary = task[:1000] if len(task) > 1000 else task
summary = self.hub.summarizer(task_for_summary, max_length=100, min_length=30, do_sample=False)
results["summary"] = summary[0]['summary_text']
except Exception as e:
logger.error(f"Summarization error: {e}")
results["summary_error"] = str(e)
stop_words = set(['the', 'a', 'an', 'and', 'in', 'on', 'at', 'to', 'for', 'of', 'with'])
word_freq = {}
for word in words:
w = word.lower().strip('.,!?:;()-"\'')
if w and w not in stop_words and len(w) > 1:
word_freq[w] = word_freq.get(w, 0) + 1
results["frequent_words"] = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:10]
positive_words = set(['good', 'great', 'excellent', 'positive', 'happy', 'best', 'better', 'success'])
negative_words = set(['bad', 'worst', 'terrible', 'negative', 'sad', 'problem', 'fail', 'issue'])
pos_count = sum(1 for word in words if word.lower().strip('.,!?:;()-"\'') in positive_words)
neg_count = sum(1 for word in words if word.lower().strip('.,!?:;()-"\'') in negative_words)
sentiment = "possibly positive" if pos_count > neg_count and pos_count > 2 else ("possibly negative" if neg_count > pos_count and neg_count > 2 else "neutral or mixed")
results["basic_sentiment"] = {"assessment": sentiment, "positive_word_count": pos_count, "negative_word_count": neg_count}
self.memory.add_short_term({"task_preview": task[:100] + "..." if len(task) > 100 else task, "word_count": results["statistics"]["word_count"], "result": results})
text_response = (
f"Text Analysis Results:\n- {results['statistics']['word_count']} words, {results['statistics']['character_count']} characters\n"
f"- Split into {len(chunks)} chunks using {chunking_strategy}\n"
)
if "summary" in results:
text_response += f"\nSummary:\n{results['summary']}\n"
if results["frequent_words"]:
text_response += "\nMost frequent words:\n"
for word, count in results["frequent_words"][:5]:
text_response += f"- {word}: {count} occurrences\n"
text_response += f"\nOverall tone appears {results['basic_sentiment']['assessment']}"
results["text"] = text_response
return results
class DataAnalysisAgent(IntelligentAgent):
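    """Analyzes tabular data.

    Loads a CSV named in the task or generates sample data (time series,
    sales, or generic), computes descriptive statistics, missing values and
    correlations, saves a matplotlib figure, and reports simple outlier,
    correlation and class-imbalance insights.
    """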
def __init__(self, hub: AgentHub):
super().__init__("data_analysis", hub)
def process_task(self, task: str) -> Dict[str, Any]:
logger.info(f"DataAnalysisAgent processing: {task}")
file_path = None
if "analyze" in task.lower() and ".csv" in task.lower():
for word in task.split():
if word.endswith('.csv'):
file_path = word
break
if not file_path or not Path(file_path).exists():
logger.info("No specific CSV file mentioned or file not found, creating sample data")
if "time series" in task.lower():
dates = pd.date_range(start='2023-01-01', periods=30, freq='D')
df = pd.DataFrame({'date': dates, 'value': np.random.normal(100, 15, 30), 'trend': np.linspace(0, 20, 30) + np.random.normal(0, 2, 30)})
file_path = "sample_timeseries.csv"
elif "sales" in task.lower():
products = ['ProductA', 'ProductB', 'ProductC', 'ProductD']
regions = ['North', 'South', 'East', 'West']
dates = pd.date_range(start='2023-01-01', periods=50, freq='D')
data = []
for _ in range(200):
data.append({'date': np.random.choice(dates), 'product': np.random.choice(products), 'region': np.random.choice(regions), 'units_sold': np.random.randint(10, 100), 'revenue': np.random.uniform(100, 1000)})
df = pd.DataFrame(data)
file_path = "sample_sales.csv"
else:
df = pd.DataFrame({
'A': np.random.normal(0, 1, 100),
'B': np.random.normal(5, 2, 100),
'C': np.random.uniform(-10, 10, 100),
'D': np.random.randint(0, 5, 100),
'label': np.random.choice(['X', 'Y', 'Z'], 100)
})
file_path = "sample_data.csv"
df.to_csv(file_path, index=False)
logger.info(f"Created sample data file: {file_path}")
else:
try:
df = pd.read_csv(file_path)
logger.info(f"Loaded existing file: {file_path}")
except Exception as e:
error_msg = f"Error loading CSV file {file_path}: {str(e)}"
logger.error(error_msg)
return {"text": error_msg, "error": str(e)}
analysis_results = {}
try:
numeric_cols = df.select_dtypes(include=[np.number]).columns
analysis_results["summary_stats"] = df[numeric_cols].describe().to_dict()
categorical_cols = df.select_dtypes(exclude=[np.number]).columns
for col in categorical_cols:
if df[col].nunique() < 10:
analysis_results[f"{col}_distribution"] = df[col].value_counts().to_dict()
except Exception as e:
logger.error(f"Error in basic statistics: {e}")
analysis_results["stats_error"] = str(e)
try:
missing_values = df.isnull().sum().to_dict()
analysis_results["missing_values"] = {k: v for k, v in missing_values.items() if v > 0}
except Exception as e:
logger.error(f"Error in missing values analysis: {e}")
analysis_results["missing_values_error"] = str(e)
try:
if len(numeric_cols) > 1:
analysis_results["correlations"] = df[numeric_cols].corr().to_dict()
except Exception as e:
logger.error(f"Error in correlation analysis: {e}")
analysis_results["correlation_error"] = str(e)
try:
plt.figure(figsize=(10, 8))
categorical_cols = df.select_dtypes(exclude=[np.number]).columns
if len(numeric_cols) >= 2:
plt.subplot(2, 1, 1)
x_col, y_col = numeric_cols[0], numeric_cols[1]
sample_df = df.sample(1000) if len(df) > 1000 else df
if len(categorical_cols) > 0 and df[categorical_cols[0]].nunique() < 10:
cat_col = categorical_cols[0]
for category, group in sample_df.groupby(cat_col):
plt.scatter(group[x_col], group[y_col], label=category, alpha=0.6)
plt.legend()
else:
plt.scatter(sample_df[x_col], sample_df[y_col], alpha=0.6)
plt.xlabel(x_col)
plt.ylabel(y_col)
plt.title(f"Scatter Plot: {x_col} vs {y_col}")
plt.subplot(2, 1, 2)
if 'date' in df.columns or any('time' in col.lower() for col in df.columns):
date_col = [col for col in df.columns if 'date' in col.lower() or 'time' in col.lower()][0]
value_col = numeric_cols[0] if numeric_cols[0] != date_col else numeric_cols[1]
if not pd.api.types.is_datetime64_dtype(df[date_col]):
df[date_col] = pd.to_datetime(df[date_col], errors='coerce')
temp_df = df.dropna(subset=[date_col, value_col]).sort_values(date_col)
plt.plot(temp_df[date_col], temp_df[value_col])
plt.xlabel(date_col)
plt.ylabel(value_col)
plt.title(f"Time Series: {value_col} over {date_col}")
plt.xticks(rotation=45)
else:
plt.hist(df[numeric_cols[0]].dropna(), bins=20, alpha=0.7)
plt.xlabel(numeric_cols[0])
plt.ylabel('Frequency')
plt.title(f"Distribution of {numeric_cols[0]}")
else:
if len(categorical_cols) > 0:
cat_col = categorical_cols[0]
df[cat_col].value_counts().plot(kind='bar')
plt.xlabel(cat_col)
plt.ylabel('Count')
plt.title(f"Counts by {cat_col}")
plt.xticks(rotation=45)
else:
plt.hist(df[numeric_cols[0]].dropna(), bins=20)
plt.xlabel(numeric_cols[0])
plt.ylabel('Frequency')
plt.title(f"Distribution of {numeric_cols[0]}")
plt.tight_layout()
viz_path = f"{Path(file_path).stem}_viz.png"
plt.savefig(viz_path)
plt.close()
analysis_results["visualization_path"] = viz_path
analysis_results["visualization_created"] = True
logger.info(f"Created visualization: {viz_path}")
except Exception as e:
logger.error(f"Error creating visualization: {e}")
analysis_results["visualization_error"] = str(e)
analysis_results["visualization_created"] = False
insights = []
try:
for col in numeric_cols:
q1 = df[col].quantile(0.25)
q3 = df[col].quantile(0.75)
iqr = q3 - q1
outlier_count = ((df[col] < (q1 - 1.5 * iqr)) | (df[col] > (q3 + 1.5 * iqr))).sum()
if outlier_count > 0:
insights.append(f"Found {outlier_count} potential outliers in '{col}'")
if "correlations" in analysis_results:
for col1, corr_dict in analysis_results["correlations"].items():
for col2, corr_val in corr_dict.items():
if col1 != col2 and abs(corr_val) > 0.7:
insights.append(f"Strong correlation ({corr_val:.2f}) between '{col1}' and '{col2}'")
for col in categorical_cols:
if df[col].nunique() < 10:
value_counts = df[col].value_counts()
most_common = value_counts.idxmax()
most_common_pct = value_counts.max() / value_counts.sum() * 100
if most_common_pct > 80:
insights.append(f"Imbalanced category in '{col}': '{most_common}' accounts for {most_common_pct:.1f}% of data")
analysis_results["insights"] = insights
except Exception as e:
logger.error(f"Error extracting insights: {e}")
analysis_results["insights_error"] = str(e)
self.memory.add_short_term({"file": file_path, "columns": list(df.columns), "row_count": len(df), "analysis": analysis_results})
if "sample" in file_path:
self.memory.add_long_term(f"analysis:{file_path}", {"file": file_path, "type": "generated", "columns": list(df.columns), "row_count": len(df), "timestamp": pd.Timestamp.now().isoformat()})
column_list = ", ".join(df.columns[:5]) + (", ..." if len(df.columns) > 5 else "")
text_response = (
f"Data Analysis Results for {file_path}\n- Dataset: {len(df)} rows x {len(df.columns)} columns ({column_list})\n"
)
if "missing_values" in analysis_results and analysis_results["missing_values"]:
text_response += f"- Missing values found in {len(analysis_results['missing_values'])} columns\n"
if insights:
text_response += "\nKey Insights:\n"
for i, insight in enumerate(insights[:5], 1):
text_response += f"{i}. {insight}\n"
if len(insights) > 5:
text_response += f"... and {len(insights) - 5} more insights\n"
text_response += f"\nVisualization saved to {viz_path}" if analysis_results.get("visualization_created") else "\nNo visualization created"
analysis_results["text"] = text_response
analysis_results["dataframe_shape"] = df.shape
analysis_results["data_preview"] = df.head(5).to_dict()
return analysis_results
class CodingAssistantAgent(IntelligentAgent):
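    """Returns canned code snippets matched to keywords in the request
    (file operations, pandas, matplotlib, web scraping, NLP, scikit-learn),
    falling back to a generic function template when nothing matches."""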
def __init__(self, hub: AgentHub):
super().__init__("coding_assistant", hub)
self.code_snippets = {
"file_operations": {
"read_file": '''
def read_file(file_path):
"""Read a file and return its contents"""
with open(file_path, 'r') as file:
return file.read()
''',
"write_file": '''
def write_file(file_path, content):
"""Write content to a file"""
with open(file_path, 'w') as file:
file.write(content)
return True
'''
},
"data_processing": {
"pandas_read_csv": '''
import pandas as pd
def load_csv(file_path):
"""Load a CSV file into a Pandas DataFrame"""
return pd.read_csv(file_path)
''',
"pandas_basic_stats": '''
def get_basic_stats(df):
"""Get basic statistics for a DataFrame"""
numeric_stats = df.describe()
categorical_columns = df.select_dtypes(include=['object']).columns
categorical_stats = {col: df[col].value_counts().to_dict() for col in categorical_columns}
return {
'numeric': numeric_stats.to_dict(),
'categorical': categorical_stats
}
'''
},
"visualization": {
"matplotlib_basic_plot": '''
import matplotlib.pyplot as plt
def create_basic_plot(data, x_col, y_col, title="Plot", kind="line"):
"""Create a basic plot using matplotlib"""
plt.figure(figsize=(10, 6))
if kind == "line":
plt.plot(data[x_col], data[y_col])
elif kind == "scatter":
plt.scatter(data[x_col], data[y_col])
elif kind == "bar":
plt.bar(data[x_col], data[y_col])
plt.title(title)
plt.xlabel(x_col)
plt.ylabel(y_col)
plt.tight_layout()
plt.savefig(f"{title.lower().replace(' ', '_')}.png")
plt.close()
return f"{title.lower().replace(' ', '_')}.png"
'''
},
"web_scraping": {
"requests_beautifulsoup": '''
import requests
from bs4 import BeautifulSoup
def scrape_webpage(url):
"""Scrape a webpage and extract text from paragraphs"""
try:
response = requests.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
paragraphs = soup.find_all('p')
text = [p.get_text() for p in paragraphs]
return {
'title': soup.title.string if soup.title else "No title",
'text': text,
'url': url
}
except Exception as e:
return {'error': str(e), 'url': url}
'''
},
"nlp": {
"basic_text_analysis": '''
from collections import Counter
import re
def analyze_text(text):
"""Perform basic text analysis"""
text = text.lower()
words = re.findall(r'\w+', text)
word_count = len(words)
unique_words = len(set(words))
stop_words = {'the', 'a', 'an', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'and', 'or'}
word_freq = Counter([w for w in words if w not in stop_words and len(w) > 1])
return {
'word_count': word_count,
'unique_words': unique_words,
'avg_word_length': sum(len(w) for w in words) / word_count if word_count else 0,
'most_common': word_freq.most_common(10)
}
'''
},
"machine_learning": {
"basic_classifier": '''
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
def train_basic_classifier(X, y, test_size=0.2, random_state=42):
"""Train a basic RandomForest classifier"""
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)
model = RandomForestClassifier(n_estimators=100, random_state=random_state)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
report = classification_report(y_test, y_pred, output_dict=True)
return {
'model': model,
'accuracy': report['accuracy'],
'classification_report': report,
'feature_importance': dict(zip(range(X.shape[1]), model.feature_importances_))
}
'''
}
}
def process_task(self, task: str) -> Dict[str, Any]:
logger.info(f"CodingAssistantAgent processing: {task}")
task_lower = task.lower()
keyword_mapping = {
"file": "file_operations",
"read file": "file_operations",
"write file": "file_operations",
"csv": "data_processing",
"data": "data_processing",
"pandas": "data_processing",
"dataframe": "data_processing",
"plot": "visualization",
"chart": "visualization",
"graph": "visualization",
"visualize": "visualization",
"matplotlib": "visualization",
"scrape": "web_scraping",
"web": "web_scraping",
"html": "web_scraping",
"beautifulsoup": "web_scraping",
"text analysis": "nlp",
"nlp": "nlp",
"natural language": "nlp",
"word count": "nlp",
"text processing": "nlp",
"machine learning": "machine_learning",
"ml": "machine_learning",
"model": "machine_learning",
"predict": "machine_learning",
"classifier": "machine_learning"
}
code_category = None
function_name = None
for keyword, category in keyword_mapping.items():
if keyword in task_lower:
code_category = category
for func_name in self.code_snippets.get(category, {}):
natural_func = func_name.replace('_', ' ')
if natural_func in task_lower:
function_name = func_name
break
break
if not code_category:
if any(word in task_lower for word in ["add", "sum", "calculate", "compute"]):
code_category = "data_processing"
elif any(word in task_lower for word in ["show", "display", "generate"]):
code_category = "visualization"
if code_category and not function_name and self.code_snippets.get(code_category):
function_name = next(iter(self.code_snippets[code_category]))
if not code_category:
function_parts = [word for word in task_lower.split() if word not in ["a", "the", "an", "to", "for", "function", "code", "create", "make"]]
func_name = "_".join(function_parts[:2]) if len(function_parts) >= 2 else "custom_function"
custom_code = f"""
def {func_name}(input_data):
# Custom function based on your request: '{task}'
result = None
# TODO: Implement specific logic based on requirements
if isinstance(input_data, list):
result = len(input_data)
elif isinstance(input_data, str):
result = input_data.upper()
elif isinstance(input_data, (int, float)):
result = input_data * 2
return {{
'input': input_data,
'result': result,
'status': 'processed'
}}
"""
result = {
"text": f"I've created a custom function template based on your request:\n\n```python\n{custom_code}\n```\n\nThis is a starting point you can customize further.",
"code": custom_code,
"language": "python",
"type": "custom"
}
else:
code_snippet = self.code_snippets[code_category][function_name]
result = {
"text": f"Here's a {code_category.replace('_', ' ')} function for {function_name.replace('_', ' ')}:\n\n```python\n{code_snippet}\n```\n\nYou can customize this code.",
"code": code_snippet,
"language": "python",
"category": code_category,
"function": function_name
}
self.memory.add_short_term({"task": task, "code_category": code_category, "function_provided": function_name, "timestamp": pd.Timestamp.now().isoformat()})
return result
class ImageProcessingAgent(IntelligentAgent):
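    """Applies a basic OpenCV operation to an image supplied as a PIL image,
    a file path, or a dict with an 'image' key and optional 'instruction':
    Canny edge detection by default, or blur, grayscale, CLAHE contrast
    enhancement, or half-size resize."""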
def __init__(self, hub: AgentHub):
super().__init__("image_processing", hub)
def process_task(self, task: Any) -> Dict[str, Any]:
logger.info("ImageProcessingAgent processing task")
image = None
task_type = None
if isinstance(task, Image.Image):
image = task
task_type = "direct_image"
elif isinstance(task, str):
if Path(task).exists() and Path(task).suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff']:
try:
image = Image.open(task)
task_type = "image_path"
except Exception as e:
return {"text": f"Error loading image from {task}: {str(e)}", "error": str(e)}
else:
task_type = "text_instruction"
elif isinstance(task, dict) and 'image' in task:
if isinstance(task['image'], Image.Image):
image = task['image']
elif isinstance(task['image'], str) and Path(task['image']).exists():
try:
image = Image.open(task['image'])
except Exception as e:
return {"text": f"Error loading image from {task['image']}: {str(e)}", "error": str(e)}
task_type = "dict_with_image"
if task_type == "text_instruction" and not image:
return {"text": "Please provide an image to process along with instructions."}
if not image:
return {"text": "No valid image provided for processing."}
processing_type = "edge_detection"
if task_type in ["text_instruction", "dict_with_image"] and isinstance(task, dict):
instruction = task.get('instruction', '').lower()
if 'blur' in instruction or 'smooth' in instruction:
processing_type = "blur"
elif 'edge' in instruction or 'contour' in instruction:
processing_type = "edge_detection"
elif 'gray' in instruction or 'greyscale' in instruction or 'black and white' in instruction:
processing_type = "grayscale"
elif 'bright' in instruction or 'contrast' in instruction:
processing_type = "enhance"
elif 'resize' in instruction or 'scale' in instruction:
processing_type = "resize"
try:
img_array = np.array(image)
if img_array.ndim == 3 and img_array.shape[-1] == 4:
img_cv = cv2.cvtColor(img_array, cv2.COLOR_RGBA2BGR)
else:
img_cv = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
processed_img = None
processing_details = {"original_size": image.size}
if processing_type == "edge_detection":
gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
edges = cv2.Canny(gray, 100, 200)
processed_img = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
processing_details["processing"] = "Edge detection using Canny"
elif processing_type == "blur":
processed_img = cv2.GaussianBlur(img_cv, (7, 7), 0)
processing_details["processing"] = "Gaussian Blur"
elif processing_type == "grayscale":
processed_img = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
processed_img = cv2.cvtColor(processed_img, cv2.COLOR_GRAY2BGR)
processing_details["processing"] = "Grayscale conversion"
elif processing_type == "enhance":
lab = cv2.cvtColor(img_cv, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
cl = clahe.apply(l)
limg = cv2.merge((cl, a, b))
processed_img = cv2.cvtColor(limg, cv2.COLOR_LAB2BGR)
processing_details["processing"] = "Contrast enhancement"
elif processing_type == "resize":
processed_img = cv2.resize(img_cv, (image.size[0]//2, image.size[1]//2))
processing_details["processing"] = "Resized to half"
else:
processed_img = img_cv
processing_details["processing"] = "No processing applied"
processed_pil = Image.fromarray(cv2.cvtColor(processed_img, cv2.COLOR_BGR2RGB))
return {"text": f"Image processing completed with {processing_details['processing']}.", "image": processed_pil, "details": processing_details}
except Exception as e:
error_msg = f"Error processing image: {str(e)}\n{traceback.format_exc()}"
logger.error(error_msg)
return {"text": f"Error processing image: {str(e)}", "error": str(e)}
class FileManagementAgent(IntelligentAgent):
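    """Performs simple filesystem operations (create, read, list, delete)
    inferred from the request text, extracting or generating a filename and
    producing sample content for newly created files."""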
def __init__(self, hub: AgentHub):
super().__init__("file_management", hub)
def process_task(self, task: str) -> Dict[str, Any]:
logger.info(f"FileManagementAgent processing: {task}")
task_lower = task.lower()
if any(word in task_lower for word in ["create", "make", "generate", "write"]):
operation = "create"
elif any(word in task_lower for word in ["read", "open", "show", "display", "content"]):
operation = "read"
elif any(word in task_lower for word in ["list", "find", "directory", "folder", "files in"]):
operation = "list"
elif any(word in task_lower for word in ["delete", "remove"]):
operation = "delete"
else:
operation = "unknown"
filename = None
file_extensions = ['.txt', '.json', '.csv', '.md', '.py', '.html', '.js', '.css']
words = task.split()
for word in words:
for ext in file_extensions:
if ext in word.lower():
filename = word.strip(':"\'.,;')
break
if filename:
break
if not filename:
file_keywords = ["file", "named", "called", "filename"]
for i, word in enumerate(words):
if word.lower() in file_keywords and i < len(words) - 1:
potential_name = words[i+1].strip(':"\'.,;')
if '.' not in potential_name:
if "json" in task_lower:
potential_name += ".json"
elif "csv" in task_lower:
potential_name += ".csv"
elif "python" in task_lower or "py" in task_lower:
potential_name += ".py"
else:
potential_name += ".txt"
filename = potential_name
break
if not filename:
if "json" in task_lower:
filename = f"data_{uuid.uuid4().hex[:6]}.json"
elif "csv" in task_lower:
filename = f"data_{uuid.uuid4().hex[:6]}.csv"
elif "python" in task_lower or "py" in task_lower:
filename = f"script_{uuid.uuid4().hex[:6]}.py"
elif "log" in task_lower:
filename = f"log_{uuid.uuid4().hex[:6]}.txt"
else:
filename = f"file_{uuid.uuid4().hex[:6]}.txt"
result = {}
if operation == "create":
if filename.endswith('.json'):
content = json.dumps({
"name": "Sample Data",
"description": task,
"created": pd.Timestamp.now().isoformat(),
"values": [1, 2, 3, 4, 5],
"metadata": {"source": "FileManagementAgent", "version": "1.0"}
}, indent=2)
elif filename.endswith('.csv'):
content = "id,name,value,timestamp\n"
for i in range(5):
content += f"{i+1},Item{i+1},{np.random.randint(1, 100)},{pd.Timestamp.now().isoformat()}\n"
elif filename.endswith('.py'):
content = f"""# Generated Python Script: {filename}
# Created: {pd.Timestamp.now().isoformat()}
# Description: {task}
def main():
print("Hello from the FileManagementAgent!")
data = [1, 2, 3, 4, 5]
result = sum(data)
print(f"Sample calculation: sum(data) = {{result}}")
return result
if __name__ == "__main__":
main()
"""
else:
content = f"File created by FileManagementAgent\nCreated: {pd.Timestamp.now().isoformat()}\nBased on request: {task}\n\nThis is sample content."
try:
with open(filename, 'w', encoding='utf-8') as f:
f.write(content)
result = {"text": f"Successfully created file: {filename}", "operation": "create", "filename": filename, "size": len(content), "preview": content[:200] + "..." if len(content) > 200 else content}
self.memory.add_short_term({"operation": "create", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
self.memory.add_long_term(f"file:{filename}", {"operation": "create", "type": Path(filename).suffix, "timestamp": pd.Timestamp.now().isoformat()})
except Exception as e:
error_msg = f"Error creating file {filename}: {str(e)}"
logger.error(error_msg)
result = {"text": error_msg, "error": str(e)}
elif operation == "read":
if not filename:
result = {"text": "Please specify a filename to read."}
elif not Path(filename).exists():
result = {"text": f"File '{filename}' not found."}
else:
try:
with open(filename, 'r', encoding='utf-8') as f:
content = f.read()
result = {"text": f"Content of {filename}:\n\n{content}", "operation": "read", "filename": filename, "content": content, "size": len(content)}
self.memory.add_short_term({"operation": "read", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
except Exception as e:
error_msg = f"Error reading file {filename}: {str(e)}"
logger.error(error_msg)
result = {"text": error_msg, "error": str(e)}
elif operation == "list":
try:
directory = "."
for term in ["directory", "folder", "in"]:
if term in task_lower:
parts = task_lower.split(term)
if len(parts) > 1:
potential_dir = parts[1].strip().split()[0].strip(':"\'.,;')
if Path(potential_dir).exists() and Path(potential_dir).is_dir():
directory = potential_dir
extension_filter = None
for ext in file_extensions:
if ext in task_lower:
extension_filter = ext
break
files = list(Path(directory).glob('*' + (extension_filter or '')))
file_groups = {}
for file in files:
file_groups.setdefault(file.suffix, []).append({
"name": file.name,
"size": file.stat().st_size,
"modified": pd.Timestamp(file.stat().st_mtime, unit='s').isoformat()
})
response_text = f"Found {len(files)} files" + (f" with extension {extension_filter}" if extension_filter else "") + f" in {directory}:\n\n"
for ext, group in file_groups.items():
response_text += f"{ext} files ({len(group)}):\n"
for file_info in sorted(group, key=lambda x: x["name"]):
size_kb = file_info["size"] / 1024
response_text += f"- {file_info['name']} ({size_kb:.1f} KB, modified: {file_info['modified']})\n"
response_text += "\n"
result = {"text": response_text, "operation": "list", "directory": directory, "file_count": len(files), "files": file_groups}
self.memory.add_short_term({"operation": "list", "directory": directory, "file_count": len(files), "timestamp": pd.Timestamp.now().isoformat()})
except Exception as e:
error_msg = f"Error listing files: {str(e)}"
logger.error(error_msg)
result = {"text": error_msg, "error": str(e)}
elif operation == "delete":
if not filename:
result = {"text": "Please specify a filename to delete."}
elif not Path(filename).exists():
result = {"text": f"File '{filename}' not found."}
else:
try:
os.remove(filename)
result = {"text": f"Successfully deleted file: {filename}", "operation": "delete", "filename": filename}
self.memory.add_short_term({"operation": "delete", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
self.memory.add_long_term(f"file:{filename}", {"operation": "delete", "timestamp": pd.Timestamp.now().isoformat()})
except Exception as e:
error_msg = f"Error deleting file {filename}: {str(e)}"
logger.error(error_msg)
result = {"text": error_msg, "error": str(e)}
else:
result = {"text": f"Unknown operation requested in task: {task}"}
return result
# ---------------------------
# Gradio Interface Setup
# ---------------------------
def create_agent_hub():
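    """Create the hub and register all seven specialized agents under their ids."""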
hub = AgentHub()
hub.register_agent("web_research", WebResearchAgent(hub))
hub.register_agent("web_scraper", WebScraperAgent(hub))
hub.register_agent("text_processing", TextProcessingAgent(hub))
hub.register_agent("data_analysis", DataAnalysisAgent(hub))
hub.register_agent("coding_assistant", CodingAssistantAgent(hub))
hub.register_agent("image_processing", ImageProcessingAgent(hub))
hub.register_agent("file_management", FileManagementAgent(hub))
return hub
def create_gradio_interface():
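    """Build the Gradio Blocks UI with a single-agent tab, a chain-of-thought
    tab, and a help tab, all routed through `process_request`."""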
hub = create_agent_hub()
def process_request(request_type, input_data, extra_data=""):
try:
if request_type == "chain":
agent_sequence = [agent.strip() for agent in extra_data.split(",") if agent.strip()]
return hub.chain_of_thought(input_data, agent_sequence)
else:
agent = hub.get_agent(request_type)
if not agent:
return {"error": f"Unknown agent type: {request_type}"}
return agent.process_task(input_data)
except Exception as e:
logger.error(f"Error processing request: {e}")
return {"error": str(e)}
with gr.Blocks(title="SmolAgents Toolbelt") as interface:
gr.Markdown("# SmolAgents Toolbelt")
gr.Markdown("A collection of specialized agents for various tasks with evolved logic :contentReference[oaicite:0]{index=0}.")
with gr.Tabs():
with gr.Tab("Single Agent"):
agent_type = gr.Dropdown(
choices=["web_research", "web_scraper", "text_processing", "data_analysis", "coding_assistant", "image_processing", "file_management"],
label="Select Agent",
value="web_research"
)
with gr.Row():
input_text = gr.Textbox(label="Input", placeholder="Enter your request...")
extra_input = gr.Textbox(label="Extra (e.g., image path or additional info)", placeholder="Optional extra input...")
output_text = gr.JSON(label="Output")
process_btn = gr.Button("Process")
process_btn.click(fn=process_request, inputs=[agent_type, input_text, extra_input], outputs=output_text)
with gr.Tab("Chain of Thought"):
chain_input = gr.Textbox(label="Input", placeholder="Enter your request for the chain...")
chain_sequence = gr.Textbox(label="Agent Sequence", placeholder="Comma-separated agent names (e.g., text_processing,data_analysis)")
chain_output = gr.JSON(label="Chain Output")
chain_type = gr.State("chain")
chain_btn = gr.Button("Process Chain")
chain_btn.click(fn=process_request, inputs=[chain_type, chain_input, chain_sequence], outputs=chain_output)
with gr.Tab("Help"):
gr.Markdown("""
## Available Agents
- **Web Research Agent**: Searches Wikipedia for information.
- **Web Scraper Agent**: Scrapes content from provided URLs.
- **Text Processing Agent**: Analyzes and processes text.
- **Data Analysis Agent**: Performs data analysis and visualization.
- **Coding Assistant Agent**: Generates code snippets.
- **Image Processing Agent**: Processes images based on instructions.
- **File Management Agent**: Handles file creation, reading, listing, and deletion.
### Usage
1. Select an agent (or choose 'Chain of Thought' for a sequence).
2. Enter your request.
3. For chains, provide a comma-separated list of agent IDs.
""")
return interface
if __name__ == "__main__":
demo = create_gradio_interface()
demo.launch(server_name="0.0.0.0", server_port=7860, share=True)
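    # Illustrative programmatic use (same agents as registered above):
    # hub = create_agent_hub()
    # result = hub.chain_of_thought("Quantum computing", ["web_research", "text_processing"])
    # print(result["final_output"]["text"])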