# agenticAi/core/knowledge_management_layer.py
# Author: Cline — Initial commit (0af0a55)
import json
import pickle
from datetime import datetime, timezone
from typing import Any, Dict, List

import networkx as nx
from loguru import logger
from rdflib import RDF, Graph, Literal, URIRef

from utils.llm_orchestrator import LLMOrchestrator
class KnowledgeManagementLayer:
    """Maintain a NetworkX knowledge graph mirrored into an RDF triple store.

    Entities and relations are extracted from raw information via the LLM
    orchestrator, stored in a directed property graph (`networkx.DiGraph`)
    and mirrored into an `rdflib.Graph` so they can be queried with SPARQL.
    The layer also supports insight generation and on-disk backups.
    """

    def __init__(self, llm_api_key: str):
        """Initialize the Knowledge Management Layer.

        Args:
            llm_api_key: API key forwarded to the LLM orchestrator.
        """
        self.llm_orchestrator = LLMOrchestrator(llm_api_key)
        self.knowledge_graph = nx.DiGraph()  # property-graph view
        self.rdf_graph = Graph()             # triple-store view for SPARQL
        self.setup_logger()

    def setup_logger(self):
        """Configure logging for the knowledge management layer."""
        logger.add("logs/knowledge_management.log", rotation="500 MB")

    async def update_knowledge_graph(
            self, new_info: Dict[str, Any]) -> Dict[str, str]:
        """Update the knowledge graph with new information.

        Args:
            new_info: Arbitrary raw information; entities and relations are
                extracted from it by the LLM.

        Returns:
            ``{'status': 'success', 'message': ...}`` on success, or
            ``{'status': 'error', 'message': ...}`` with the exception text.
        """
        logger.info("Updating knowledge graph with new information")
        entities_added = 0
        relations_added = 0
        try:
            # Extract structured entities/relations from the raw payload.
            processed_info = await self.process_information(new_info)
            for entity in processed_info['entities']:
                self.knowledge_graph.add_node(
                    entity['id'],
                    **entity['attributes']
                )
                entities_added += 1
            for relation in processed_info['relations']:
                self.knowledge_graph.add_edge(
                    relation['source'],
                    relation['target'],
                    **relation['attributes']
                )
                relations_added += 1
            # Keep the RDF mirror in sync with the NetworkX graph.
            await self.update_rdf_graph(processed_info)
            logger.info(
                f"Successfully updated knowledge graph: Added {entities_added} entities and {relations_added} relations")
            return {
                'status': 'success',
                'message': f"Added {entities_added} entities and {relations_added} relations"
            }
        except Exception as e:
            logger.error(f"Error updating knowledge graph: {str(e)}")
            logger.error(
                f"Processed {entities_added} entities and {relations_added} relations before error")
            return {
                'status': 'error',
                'message': str(e)
            }

    async def process_information(
            self, info: Dict[str, Any]) -> Dict[str, Any]:
        """Process raw information using LLM to extract entities and relations.

        Args:
            info: Raw information payload (must be JSON-serializable).

        Returns:
            ``{'entities': [...], 'relations': [...]}`` as produced by
            :meth:`parse_llm_response`.

        Raises:
            Exception: re-raised after logging if either LLM call fails.
        """
        logger.info("Processing information to extract entities and relations")
        try:
            # First pass: entities and their attributes.
            entity_prompt = f"""
        Extract entities and their attributes from the following information:
        {json.dumps(info, indent=2)}
        Return the entities in the following format:
        - Entity ID
        - Entity Type
        - Attributes (key-value pairs)
        """
            entity_response = await self.llm_orchestrator.generate_completion(entity_prompt)
            entities = self.parse_llm_response(entity_response, 'entities')
            logger.info(f"Extracted {len(entities)} entities")
            # Second pass: relations, conditioned on the entities found above.
            relation_prompt = f"""
        Extract relations between entities from the following information:
        {json.dumps(info, indent=2)}
        Entities found:
        {json.dumps(entities, indent=2)}
        Return the relations in the following format:
        - Source Entity ID
        - Target Entity ID
        - Relation Type
        - Attributes (key-value pairs)
        """
            relation_response = await self.llm_orchestrator.generate_completion(relation_prompt)
            relations = self.parse_llm_response(relation_response, 'relations')
            logger.info(f"Extracted {len(relations)} relations")
            return {
                'entities': entities,
                'relations': relations
            }
        except Exception as e:
            logger.error(f"Error processing information: {str(e)}")
            raise

    async def update_rdf_graph(self, processed_info: Dict[str, Any]):
        """Mirror processed entities/relations into the RDF graph.

        Args:
            processed_info: ``{'entities': [...], 'relations': [...]}`` as
                returned by :meth:`process_information`.

        Raises:
            Exception: re-raised after logging on any rdflib failure.
        """
        try:
            for entity in processed_info['entities']:
                subject = URIRef(f"entity:{entity['id']}")
                self.rdf_graph.add(
                    (subject, RDF.type, URIRef(f"type:{entity['type']}")))
                for key, value in entity['attributes'].items():
                    self.rdf_graph.add(
                        (subject, URIRef(f"attribute:{key}"), Literal(value)))
            for relation in processed_info['relations']:
                subject = URIRef(f"entity:{relation['source']}")
                obj = URIRef(f"entity:{relation['target']}")
                predicate = URIRef(f"relation:{relation['type']}")
                self.rdf_graph.add((subject, predicate, obj))
                # NOTE(review): attributes are attached to the shared
                # predicate URI, so every relation of the same type shares
                # them. If per-edge attributes are intended, this needs RDF
                # reification or RDF-star — confirm the intended model.
                for key, value in relation['attributes'].items():
                    self.rdf_graph.add(
                        (predicate, URIRef(f"attribute:{key}"), Literal(value)))
        except Exception as e:
            logger.error(f"Error updating RDF graph: {str(e)}")
            raise

    @staticmethod
    def _strip_code_fences(text: str) -> str:
        """Remove surrounding Markdown code fences from LLM output.

        LLMs frequently wrap generated queries in ```sparql ... ``` fences,
        which rdflib cannot parse; this strips one outer fence pair if present.
        """
        stripped = text.strip()
        if stripped.startswith("```"):
            lines = stripped.splitlines()
            # Drop the opening fence (possibly tagged, e.g. ```sparql).
            lines = lines[1:]
            if lines and lines[-1].strip() == "```":
                lines = lines[:-1]
            stripped = "\n".join(lines)
        return stripped

    async def query_knowledge(self, query: Dict[str, Any]) -> Dict[str, Any]:
        """Query the knowledge graph based on specific criteria.

        Args:
            query: Search criteria, serialized into the SPARQL-generation
                prompt.

        Returns:
            ``{'status': 'success', 'results': [...]}`` or an error dict.
        """
        try:
            # Generate SPARQL query using LLM
            sparql_prompt = f"""
        Generate a SPARQL query for the following search criteria:
        {json.dumps(query, indent=2)}
        """
            sparql_query = await self.llm_orchestrator.generate_completion(sparql_prompt)
            # Strip Markdown fences the LLM may wrap the query in.
            sparql_query = self._strip_code_fences(sparql_query)
            results = self.rdf_graph.query(sparql_query)
            formatted_results = await self.format_query_results(results)
            return {
                'status': 'success',
                'results': formatted_results
            }
        except Exception as e:
            logger.error(f"Error querying knowledge graph: {str(e)}")
            return {
                'status': 'error',
                'message': str(e)
            }

    async def format_query_results(self, results) -> List[Dict[str, Any]]:
        """Convert rdflib SPARQL results into JSON-serializable dicts.

        This method was referenced by :meth:`query_knowledge` but never
        defined, so every query previously failed with ``AttributeError``.

        Args:
            results: An rdflib ``Result`` (SELECT rows expose ``asdict``).

        Returns:
            One dict per row mapping variable names to stringified terms.
        """
        formatted = []
        for row in results:
            if hasattr(row, "asdict"):
                formatted.append(
                    {str(var): str(val) for var, val in row.asdict().items()})
            else:
                # ASK/CONSTRUCT rows are not variable bindings; stringify.
                formatted.append({'value': str(row)})
        return formatted

    async def generate_insights(
            self, context: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Generate insights from the knowledge graph.

        Args:
            context: Context used to select the relevant subgraph and to
                ground the insight prompt.

        Returns:
            Parsed insights (currently ``[]`` — see parse_llm_response).

        Raises:
            Exception: re-raised after logging on failure.
        """
        try:
            # Extract relevant subgraph based on context
            subgraph = self.extract_relevant_subgraph(context)
            insight_prompt = f"""
        Generate insights from the following knowledge graph data:
        Nodes: {len(subgraph.nodes)}
        Edges: {len(subgraph.edges)}
        Context: {json.dumps(context, indent=2)}
        Graph Summary:
        {self.summarize_subgraph(subgraph)}
        """
            insights = await self.llm_orchestrator.generate_completion(insight_prompt)
            return self.parse_llm_response(insights, 'insights')
        except Exception as e:
            logger.error(f"Error generating insights: {str(e)}")
            raise

    def extract_relevant_subgraph(self, context: Dict[str, Any]) -> "nx.DiGraph":
        """Extract a relevant subgraph based on context.

        TODO: placeholder — currently returns the entire graph regardless of
        context; real filtering logic is still to be implemented.
        """
        return self.knowledge_graph

    def summarize_subgraph(self, subgraph: "nx.DiGraph") -> str:
        """Generate a JSON summary of the subgraph.

        Counts nodes and edges by their ``type`` attribute and lists the five
        highest-degree nodes as key entities.
        """
        summary = {
            'node_types': {},
            'edge_types': {},
            'key_entities': []
        }
        # Count node types ('unknown' when a node has no 'type' attribute).
        for node in subgraph.nodes(data=True):
            node_type = node[1].get('type', 'unknown')
            summary['node_types'][node_type] = summary['node_types'].get(
                node_type, 0) + 1
        # Count edge types the same way.
        for edge in subgraph.edges(data=True):
            edge_type = edge[2].get('type', 'unknown')
            summary['edge_types'][edge_type] = summary['edge_types'].get(
                edge_type, 0) + 1
        # Key entities: top five nodes by degree.
        for node in sorted(subgraph.degree, key=lambda x: x[1], reverse=True)[
                :5]:
            summary['key_entities'].append({
                'id': node[0],
                'degree': node[1]
            })
        return json.dumps(summary, indent=2)

    @staticmethod
    def parse_llm_response(
            response: str, response_type: str) -> List[Dict[str, Any]]:
        """Parse LLM response into structured data.

        TODO: placeholder — always returns ``[]``; real parsing by
        ``response_type`` ('entities' / 'relations' / 'insights') is still
        to be implemented.
        """
        return []  # Placeholder return

    async def backup_knowledge(self, backup_path: str):
        """Backup the knowledge graph to timestamped files.

        Args:
            backup_path: Existing directory that receives
                ``knowledge_graph_<ts>.gpickle`` and ``rdf_graph_<ts>.ttl``.

        Raises:
            Exception: re-raised after logging if either backup fails.
        """
        try:
            # Timezone-aware replacement for deprecated datetime.utcnow();
            # produces the identical UTC timestamp string.
            timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
            # nx.write_gpickle was removed in NetworkX 3.0; pickle directly.
            with open(
                    f"{backup_path}/knowledge_graph_{timestamp}.gpickle",
                    "wb") as fh:
                pickle.dump(self.knowledge_graph, fh,
                            pickle.HIGHEST_PROTOCOL)
            # Backup RDF graph
            self.rdf_graph.serialize(
                f"{backup_path}/rdf_graph_{timestamp}.ttl",
                format="turtle")
            logger.info(
                f"Knowledge graph backed up successfully at {timestamp}")
        except Exception as e:
            logger.error(f"Error backing up knowledge graph: {str(e)}")
            raise