"""Static-analysis agent for Python repositories."""

import ast
import os
import re
from typing import Any, Dict, List

from loguru import logger


class CodeAnalysisAgent:
    """Run lightweight AST- and regex-based checks over Python source files."""

    # Node types counted by ``calculate_complexity``.
    _COMPLEXITY_NODES = (
        ast.If,
        ast.While,
        ast.For,
        ast.AsyncFor,
        ast.With,
        ast.AsyncWith,
        ast.Try,
    )

    # Patterns used by ``detect_vulnerabilities``; compiled once per class.
    _SQL_PATTERN = re.compile(r"SELECT \* FROM .* WHERE .*")
    _CREDENTIAL_PATTERN = re.compile(
        r"password\s*=\s*['\"].*['\"]", re.IGNORECASE
    )

    # loguru's ``logger`` is global to the process, so the file sink must be
    # registered only once no matter how many agents are created.
    _logger_configured = False

    def __init__(self):
        """Initialize the Code Analysis Agent."""
        logger.info("Initializing CodeAnalysisAgent")
        self.capabilities = [
            "code_analysis",
            "static_analysis",
            "repository_analysis",
            "code_summarization",
            "vulnerability_detection",
            "debugging",
            "bug_detection",
            "code_smell_detection",
        ]
        self.setup_logger()

    async def find_bugs(self, file_path: str) -> List[str]:
        """Find potential bugs in a code file using static analysis.

        Currently reports calls to the builtin-named ``open`` that are not
        used directly as the context expression of a ``with`` / ``async with``
        item. This is a heuristic: a file that is opened and later closed
        explicitly is still reported.

        Args:
            file_path: Path of the Python file to inspect.

        Returns:
            One message per finding, formatted as
            ``"Line <n>: Potential unclosed file"``. If the file cannot be
            read or parsed, the error is logged and the findings collected so
            far (normally none) are returned.
        """
        logger.info(f"Finding bugs in file {file_path}")
        bugs: List[str] = []
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                source_code = f.read()

            tree = ast.parse(source_code)

            # ast nodes carry no parent links, so first record every
            # expression that is managed by a with-statement.
            managed_exprs = set()
            for node in ast.walk(tree):
                if isinstance(node, (ast.With, ast.AsyncWith)):
                    for item in node.items:
                        managed_exprs.add(id(item.context_expr))

            # Example: Find unclosed files
            for node in ast.walk(tree):
                if not isinstance(node, ast.Call):
                    continue
                if not (isinstance(node.func, ast.Name)
                        and node.func.id == "open"):
                    continue
                if id(node) in managed_exprs:
                    continue
                bugs.append(f"Line {node.lineno}: Potential unclosed file")

            # Add more bug detection patterns here

        except Exception as e:
            # Best-effort analysis: report the failure and return what we have.
            logger.error(f"Error finding bugs in file {file_path}: {str(e)}")

        return bugs

    def setup_logger(self):
        """Configure logging for the agent.

        Registers the rotating file sink on the global loguru logger the
        first time it is called; later calls (for example from additional
        instances) do nothing, so records are not written more than once.
        """
        if CodeAnalysisAgent._logger_configured:
            return
        logger.add("logs/code_analysis_agent.log", rotation="500 MB")
        CodeAnalysisAgent._logger_configured = True

    async def analyze_repository(self, repo_path: str) -> Dict[str, Any]:
        """Analyze a code repository.

        Args:
            repo_path: Root directory to scan for ``.py`` files.

        Returns:
            On success, a dict with ``"status": "success"``, the per-file
            ``"results"`` list and a ``"summary"`` dict. If an exception
            escapes, a dict with ``"status": "error"`` and the ``"message"``.
        """
        logger.info(f"Analyzing repository at {repo_path}")
        try:
            code_files = self.collect_code_files(repo_path)
            analysis_results = [
                await self.analyze_file(file_path) for file_path in code_files
            ]
            summary = self.generate_repository_summary(analysis_results)
            logger.info(f"Finished analyzing repository at {repo_path}")
            return {
                "status": "success",
                "results": analysis_results,
                "summary": summary,
            }
        except Exception as e:
            logger.error(f"Error analyzing repository {repo_path}: {str(e)}")
            return {
                "status": "error",
                "message": str(e),
            }

    def collect_code_files(self, repo_path: str) -> List[str]:
        """Collect all code files in a repository.

        Args:
            repo_path: Root directory to walk recursively.

        Returns:
            Paths of every file whose name ends with ``.py``, in
            ``os.walk`` order.
        """
        # Currently only supports Python files
        return [
            os.path.join(root, file)
            for root, _, files in os.walk(repo_path)
            for file in files
            if file.endswith(".py")
        ]

    async def analyze_file(self, file_path: str) -> Dict[str, Any]:
        """Analyze a single code file.

        Args:
            file_path: Path of the Python file to analyze.

        Returns:
            A dict with ``"file_path"``, ``"functions"`` (names of all
            ``def`` and ``async def`` definitions), ``"classes"``,
            ``"complexity"`` and ``"vulnerabilities"``. If reading or
            parsing fails, a dict with ``"file_path"`` and ``"error"``.
        """
        logger.info(f"Analyzing file {file_path}")
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                source_code = f.read()

            # Basic static analysis using AST
            tree = ast.parse(source_code)

            # Extract functions and classes in a single traversal.
            functions: List[str] = []
            classes: List[str] = []
            for node in ast.walk(tree):
                if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    functions.append(node.name)
                elif isinstance(node, ast.ClassDef):
                    classes.append(node.name)

            # Basic complexity analysis (Cyclomatic Complexity)
            complexity = self.calculate_complexity(tree)

            # Identify potential vulnerabilities (basic example)
            vulnerabilities = self.detect_vulnerabilities(source_code)

            return {
                "file_path": file_path,
                "functions": functions,
                "classes": classes,
                "complexity": complexity,
                "vulnerabilities": vulnerabilities,
            }
        except Exception as e:
            logger.error(f"Error analyzing file {file_path}: {str(e)}")
            return {
                "file_path": file_path,
                "error": str(e),
            }

    def calculate_complexity(self, tree: ast.AST) -> int:
        """Calculate the Cyclomatic Complexity of a code snippet.

        This is a rough approximation: it returns 1 plus the number of
        ``if``, ``while``, ``for``, ``async for``, ``with``, ``async with``
        and ``try`` nodes found anywhere in ``tree``.

        Args:
            tree: Parsed AST to measure.

        Returns:
            The complexity score (at least 1).
        """
        return 1 + sum(
            isinstance(node, self._COMPLEXITY_NODES) for node in ast.walk(tree)
        )

    def detect_vulnerabilities(self, source_code: str) -> List[str]:
        """Detect potential vulnerabilities in the code (basic example).

        Args:
            source_code: Raw text of the file to scan.

        Returns:
            At most one message per category: a ``SELECT * FROM ... WHERE``
            pattern (possible SQL injection) and a case-insensitive
            ``password = "<literal>"`` assignment (possible hardcoded
            credentials).
        """
        vulnerabilities = []

        # Detect SQL injection patterns
        if self._SQL_PATTERN.search(source_code):
            vulnerabilities.append("Potential SQL injection vulnerability")

        # Detect hardcoded credentials
        if self._CREDENTIAL_PATTERN.search(source_code):
            vulnerabilities.append("Potential hardcoded credentials")

        return vulnerabilities

    def detect_code_smells(self, tree: ast.AST) -> List[str]:
        """Detect potential code smells in the code.

        Size is measured as the number of top-level statements in the
        definition's body, not as a line count.

        Args:
            tree: Parsed AST to inspect.

        Returns:
            Messages for every function (``def`` or ``async def``) with more
            than 50 body statements, followed by messages for every class
            with more than 100 body statements.
        """
        nodes = list(ast.walk(tree))

        # Example: Long function
        code_smells = [
            f"Line {node.lineno}: Long function"
            for node in nodes
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
            and len(node.body) > 50
        ]

        # Example: Large class
        code_smells.extend(
            f"Line {node.lineno}: Large class"
            for node in nodes
            if isinstance(node, ast.ClassDef) and len(node.body) > 100
        )

        return code_smells

    def generate_repository_summary(
            self, analysis_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Generate a summary of the repository analysis.

        Args:
            analysis_results: Per-file dicts as returned by ``analyze_file``.

        Returns:
            A dict with ``"total_files"``, ``"total_functions"``,
            ``"total_classes"``, ``"average_complexity"`` and
            ``"total_vulnerabilities"``. The average is taken over the
            results that carry a ``"complexity"`` value, so files that
            failed analysis do not pull it toward zero; it is 0 when no
            file was analysed successfully.
        """
        total_files = len(analysis_results)
        total_functions = sum(
            len(result.get("functions", [])) for result in analysis_results
        )
        total_classes = sum(
            len(result.get("classes", [])) for result in analysis_results
        )
        total_vulnerabilities = sum(
            len(result.get("vulnerabilities", []))
            for result in analysis_results
        )

        complexities = [
            result["complexity"]
            for result in analysis_results
            if "complexity" in result
        ]
        average_complexity = (
            sum(complexities) / len(complexities) if complexities else 0
        )

        return {
            "total_files": total_files,
            "total_functions": total_functions,
            "total_classes": total_classes,
            "average_complexity": average_complexity,
            "total_vulnerabilities": total_vulnerabilities,
        }