"""Knowledge Management Layer.

Maintains a dual-view knowledge base — a NetworkX property graph and a
parallel RDF triple store — populated via LLM-driven entity/relation
extraction, and supports querying, insight generation, and backups.
"""
import json
import pickle
from datetime import datetime, timezone
from typing import Any, Dict, List

import networkx as nx
from loguru import logger
from rdflib import Graph, Literal, RDF, URIRef

from utils.llm_orchestrator import LLMOrchestrator
class KnowledgeManagementLayer:
    """Maintain a dual-view knowledge base.

    Entities and relations extracted by an LLM are stored twice: as a
    NetworkX directed property graph (``knowledge_graph``) and as an
    rdflib triple store (``rdf_graph``) that can answer SPARQL queries.
    """

    def __init__(self, llm_api_key: str):
        """Initialize the Knowledge Management Layer.

        Args:
            llm_api_key: API key forwarded to the LLM orchestrator.
        """
        self.llm_orchestrator = LLMOrchestrator(llm_api_key)
        self.knowledge_graph = nx.DiGraph()  # property-graph view
        self.rdf_graph = Graph()  # RDF triple-store view
        self.setup_logger()

    def setup_logger(self):
        """Configure logging for the knowledge management layer."""
        # loguru creates the logs/ directory on demand.
        logger.add("logs/knowledge_management.log", rotation="500 MB")

    async def update_knowledge_graph(
            self, new_info: Dict[str, Any]) -> Dict[str, str]:
        """Update the knowledge graph with new information.

        Args:
            new_info: Raw information to be structured by the LLM.

        Returns:
            ``{'status': 'success', 'message': ...}`` on success, or
            ``{'status': 'error', 'message': ...}`` if anything failed
            (a partial update may already have been applied).
        """
        logger.info("Updating knowledge graph with new information")
        entities_added = 0
        relations_added = 0
        try:
            # Ask the LLM to turn the raw info into entities/relations.
            processed_info = await self.process_information(new_info)

            for entity in processed_info['entities']:
                self.knowledge_graph.add_node(
                    entity['id'],
                    **entity['attributes']
                )
                entities_added += 1

            for relation in processed_info['relations']:
                self.knowledge_graph.add_edge(
                    relation['source'],
                    relation['target'],
                    **relation['attributes']
                )
                relations_added += 1

            # Mirror the update into the RDF triple store.
            await self.update_rdf_graph(processed_info)

            logger.info(
                f"Successfully updated knowledge graph: Added {entities_added} entities and {relations_added} relations")
            return {
                'status': 'success',
                'message': f"Added {entities_added} entities and {relations_added} relations"
            }
        except Exception as e:
            logger.error(f"Error updating knowledge graph: {str(e)}")
            logger.error(
                f"Processed {entities_added} entities and {relations_added} relations before error")
            return {
                'status': 'error',
                'message': str(e)
            }

    async def process_information(
            self, info: Dict[str, Any]) -> Dict[str, Any]:
        """Process raw information using LLM to extract entities and relations.

        Args:
            info: Arbitrary JSON-serializable information.

        Returns:
            A dict with ``'entities'`` and ``'relations'`` lists.

        Raises:
            Exception: Re-raised after logging if an LLM call or parse fails.
        """
        logger.info("Processing information to extract entities and relations")
        try:
            # First pass: extract entities.
            entity_prompt = f"""
            Extract entities and their attributes from the following information:
            {json.dumps(info, indent=2)}
            Return the entities in the following format:
            - Entity ID
            - Entity Type
            - Attributes (key-value pairs)
            """
            entity_response = await self.llm_orchestrator.generate_completion(entity_prompt)
            entities = self.parse_llm_response(entity_response, 'entities')
            logger.info(f"Extracted {len(entities)} entities")

            # Second pass: extract relations, conditioned on the entities.
            relation_prompt = f"""
            Extract relations between entities from the following information:
            {json.dumps(info, indent=2)}
            Entities found:
            {json.dumps(entities, indent=2)}
            Return the relations in the following format:
            - Source Entity ID
            - Target Entity ID
            - Relation Type
            - Attributes (key-value pairs)
            """
            relation_response = await self.llm_orchestrator.generate_completion(relation_prompt)
            relations = self.parse_llm_response(relation_response, 'relations')
            logger.info(f"Extracted {len(relations)} relations")

            return {
                'entities': entities,
                'relations': relations
            }
        except Exception as e:
            logger.error(f"Error processing information: {str(e)}")
            raise

    async def update_rdf_graph(self, processed_info: Dict[str, Any]):
        """Update the RDF graph with processed information.

        Args:
            processed_info: Dict with ``'entities'`` and ``'relations'``
                lists, as produced by :meth:`process_information`.

        Raises:
            Exception: Re-raised after logging on any rdflib failure.
        """
        try:
            for entity in processed_info['entities']:
                subject = URIRef(f"entity:{entity['id']}")
                self.rdf_graph.add(
                    (subject, RDF.type, URIRef(f"type:{entity['type']}")))
                for key, value in entity['attributes'].items():
                    self.rdf_graph.add(
                        (subject, URIRef(f"attribute:{key}"), Literal(value)))

            for relation in processed_info['relations']:
                subject = URIRef(f"entity:{relation['source']}")
                obj = URIRef(f"entity:{relation['target']}")
                predicate = URIRef(f"relation:{relation['type']}")
                self.rdf_graph.add((subject, predicate, obj))
                # NOTE: relation attributes are attached to the predicate
                # URI, so they are shared by all edges of the same type.
                for key, value in relation['attributes'].items():
                    self.rdf_graph.add(
                        (predicate, URIRef(f"attribute:{key}"), Literal(value)))
        except Exception as e:
            logger.error(f"Error updating RDF graph: {str(e)}")
            raise

    async def query_knowledge(self, query: Dict[str, Any]) -> Dict[str, Any]:
        """Query the knowledge graph based on specific criteria.

        Args:
            query: JSON-serializable search criteria.

        Returns:
            ``{'status': 'success', 'results': ...}`` or an error dict.
        """
        try:
            # Have the LLM translate the criteria into SPARQL.
            sparql_prompt = f"""
            Generate a SPARQL query for the following search criteria:
            {json.dumps(query, indent=2)}
            """
            sparql_query = await self.llm_orchestrator.generate_completion(sparql_prompt)

            # NOTE(review): executing LLM-generated SPARQL verbatim —
            # consider validating/sandboxing before running it.
            results = self.rdf_graph.query(sparql_query)

            formatted_results = await self.format_query_results(results)
            return {
                'status': 'success',
                'results': formatted_results
            }
        except Exception as e:
            logger.error(f"Error querying knowledge graph: {str(e)}")
            return {
                'status': 'error',
                'message': str(e)
            }

    async def generate_insights(
            self, context: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Generate insights from the knowledge graph.

        Args:
            context: JSON-serializable context used to focus the analysis.

        Returns:
            Parsed insight records from the LLM.

        Raises:
            Exception: Re-raised after logging on failure.
        """
        try:
            # Restrict attention to the context-relevant part of the graph.
            subgraph = self.extract_relevant_subgraph(context)

            insight_prompt = f"""
            Generate insights from the following knowledge graph data:
            Nodes: {len(subgraph.nodes)}
            Edges: {len(subgraph.edges)}
            Context: {json.dumps(context, indent=2)}
            Graph Summary:
            {self.summarize_subgraph(subgraph)}
            """
            insights = await self.llm_orchestrator.generate_completion(insight_prompt)
            return self.parse_llm_response(insights, 'insights')
        except Exception as e:
            logger.error(f"Error generating insights: {str(e)}")
            raise

    def extract_relevant_subgraph(self, context: Dict[str, Any]) -> "nx.DiGraph":
        """Extract a relevant subgraph based on context.

        Placeholder: currently returns the full graph. A real
        implementation would select nodes/edges matching ``context``.
        """
        return self.knowledge_graph

    def summarize_subgraph(self, subgraph: "nx.DiGraph") -> str:
        """Generate a JSON summary of the subgraph.

        The summary counts node and edge ``type`` attributes (missing
        types count as ``'unknown'``) and lists the five highest-degree
        nodes as key entities.
        """
        summary = {
            'node_types': {},
            'edge_types': {},
            'key_entities': []
        }

        for _, attrs in subgraph.nodes(data=True):
            node_type = attrs.get('type', 'unknown')
            summary['node_types'][node_type] = summary['node_types'].get(
                node_type, 0) + 1

        for _, _, attrs in subgraph.edges(data=True):
            edge_type = attrs.get('type', 'unknown')
            summary['edge_types'][edge_type] = summary['edge_types'].get(
                edge_type, 0) + 1

        # Key entities: the five nodes with the highest total degree.
        for node_id, degree in sorted(
                subgraph.degree, key=lambda x: x[1], reverse=True)[:5]:
            summary['key_entities'].append({
                'id': node_id,
                'degree': degree
            })

        return json.dumps(summary, indent=2)

    def parse_llm_response(
            self, response: str, response_type: str) -> List[Dict[str, Any]]:
        """Parse LLM response into structured data.

        Args:
            response: Raw completion text from the LLM.
            response_type: One of ``'entities'``, ``'relations'``,
                or ``'insights'``.

        Returns:
            A list of parsed records (placeholder: currently empty).
        """
        # BUG FIX: the original definition was missing ``self``, so every
        # ``self.parse_llm_response(...)`` call raised a TypeError.
        # TODO: implement real parsing keyed on response_type.
        return []

    async def backup_knowledge(self, backup_path: str):
        """Backup the knowledge graph to a file.

        Writes a timestamped pickle of the NetworkX graph and a Turtle
        serialization of the RDF graph into ``backup_path``.

        Raises:
            Exception: Re-raised after logging on any failure.
        """
        try:
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated).
            timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')

            # nx.write_gpickle was removed in NetworkX 3.0; pickle directly.
            with open(f"{backup_path}/knowledge_graph_{timestamp}.gpickle",
                      "wb") as fh:
                pickle.dump(self.knowledge_graph, fh)

            self.rdf_graph.serialize(
                f"{backup_path}/rdf_graph_{timestamp}.ttl",
                format="turtle")

            logger.info(
                f"Knowledge graph backed up successfully at {timestamp}")
        except Exception as e:
            logger.error(f"Error backing up knowledge graph: {str(e)}")
            raise