# src/relationships.py
"""SQLite-backed storage for entities, events and entity relationships."""

from typing import Dict, List, Tuple, Optional, Any
import sqlite3
from datetime import datetime
from dataclasses import dataclass


@dataclass
class Entity:
    """Entity data structure.

    The fields correspond to the columns of the ``entities`` table
    (``text``/``type`` map to ``entity_text``/``entity_type``).
    """

    id: Optional[int]
    text: str
    type: str
    first_seen: str
    last_seen: str
    frequency: int
    confidence: float


@dataclass
class Relationship:
    """Relationship data structure.

    The fields correspond to the columns of the ``entity_relationships``
    table (``source_id``/``target_id`` map to ``source_entity_id``/
    ``target_entity_id`` and ``type`` to ``relationship_type``).
    """

    id: Optional[int]
    source_id: int
    target_id: int
    type: str
    confidence: float
    first_seen: str
    last_seen: str


class RelationshipEngine:
    """Engine for managing entity and event relationships.

    Owns a single SQLite connection (``self.conn``).  The engine can be used
    as a context manager, in which case the connection is closed on exit.
    """

    def __init__(self, db_path: str = ':memory:'):
        """Open (or create) the database at *db_path* and ensure the schema.

        Args:
            db_path: Path passed to ``sqlite3.connect``; defaults to an
                in-memory database.
        """
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        # Rows must be sqlite3.Row so that get_entity_relationships() can
        # turn them into dicts keyed by column name.
        self.conn.row_factory = sqlite3.Row
        self.setup_database()

    def __enter__(self) -> "RelationshipEngine":
        """Return the engine itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the underlying connection when leaving a ``with`` block."""
        self.close()

    def close(self) -> None:
        """Close the underlying SQLite connection."""
        self.conn.close()

    def setup_database(self) -> None:
        """Initialize database schema.

        Creates the ``events``, ``entities``, ``event_entities`` and
        ``entity_relationships`` tables plus their indexes if they do not
        exist yet.
        """
        # NOTE(review): the FOREIGN KEY clauses are declarative only; SQLite
        # does not enforce them unless ``PRAGMA foreign_keys = ON`` is set on
        # the connection.  Enforcement is left off here because this class
        # never inserts into ``events`` itself.
        self.conn.executescript('''
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY,
                text TEXT,
                timestamp DATETIME,
                confidence REAL
            );

            CREATE TABLE IF NOT EXISTS entities (
                id INTEGER PRIMARY KEY,
                entity_text TEXT,
                entity_type TEXT,
                first_seen DATETIME,
                last_seen DATETIME,
                frequency INTEGER DEFAULT 1,
                confidence REAL
            );

            CREATE TABLE IF NOT EXISTS event_entities (
                event_id INTEGER,
                entity_id INTEGER,
                FOREIGN KEY (event_id) REFERENCES events(id),
                FOREIGN KEY (entity_id) REFERENCES entities(id),
                PRIMARY KEY (event_id, entity_id)
            );

            CREATE TABLE IF NOT EXISTS entity_relationships (
                id INTEGER PRIMARY KEY,
                source_entity_id INTEGER,
                target_entity_id INTEGER,
                relationship_type TEXT,
                confidence REAL,
                first_seen DATETIME,
                last_seen DATETIME,
                FOREIGN KEY (source_entity_id) REFERENCES entities(id),
                FOREIGN KEY (target_entity_id) REFERENCES entities(id)
            );

            CREATE INDEX IF NOT EXISTS idx_entity_text
                ON entities(entity_text, entity_type);
            CREATE INDEX IF NOT EXISTS idx_event_entities
                ON event_entities(event_id, entity_id);
            CREATE INDEX IF NOT EXISTS idx_entity_relationships
                ON entity_relationships(source_entity_id, target_entity_id);
        ''')
        self.conn.commit()

    def store_entities(self, event_id: int,
                       entities_dict: Dict[str, List[str]]) -> None:
        """Store or update entities and their relationships to events.

        For every ``(entity_type, entity_text)`` pair, an existing row has
        its ``frequency`` incremented and ``last_seen`` refreshed; otherwise
        a new row is inserted with confidence 1.0.  Each entity is then
        linked to *event_id* in ``event_entities``.

        Args:
            event_id: Id of the event the entities belong to.
            entities_dict: Mapping of entity type to a list of entity texts.
                Values that are not lists are skipped.
        """
        now = datetime.now().isoformat()

        # ``with self.conn`` commits on success and rolls back on error, so a
        # failure part-way through never leaves half of the entities stored.
        with self.conn:
            for entity_type, entities in entities_dict.items():
                if not isinstance(entities, list):
                    continue

                for entity_text in entities:
                    existing = self.conn.execute(
                        'SELECT id FROM entities '
                        'WHERE entity_text = ? AND entity_type = ?',
                        (entity_text, entity_type),
                    ).fetchone()

                    if existing:
                        entity_id = existing[0]
                        # Increment inside SQL instead of read-modify-write
                        # in Python.
                        self.conn.execute('''
                            UPDATE entities
                            SET frequency = COALESCE(frequency, 0) + 1,
                                last_seen = ?
                            WHERE id = ?
                        ''', (now, entity_id))
                    else:
                        cursor = self.conn.execute('''
                            INSERT INTO entities
                                (entity_text, entity_type, first_seen,
                                 last_seen, confidence)
                            VALUES (?, ?, ?, ?, ?)
                        ''', (entity_text, entity_type, now, now, 1.0))
                        entity_id = cursor.lastrowid

                    # The composite primary key makes re-linking a no-op.
                    self.conn.execute('''
                        INSERT OR IGNORE INTO event_entities
                            (event_id, entity_id)
                        VALUES (?, ?)
                    ''', (event_id, entity_id))

    def find_related_events(self, event_data: Dict) -> List[sqlite3.Row]:
        """Find events related through shared entities.

        Args:
            event_data: Dict whose ``'entities'`` key maps entity types to
                lists of entity texts.  Non-list values are ignored.

        Returns:
            Up to five ``events`` rows (with an extra ``shared_entities``
            column) ordered by number of matching entities, then by
            timestamp, both descending.  Empty list when *event_data*
            carries no entity texts.
        """
        # ``or {}`` tolerates an explicit ``'entities': None``.
        entity_map = event_data.get('entities') or {}

        entity_texts = []
        for entities in entity_map.values():
            if isinstance(entities, list):
                entity_texts.extend(entities)

        if not entity_texts:
            return []

        # Only '?' markers are interpolated; the values themselves are bound.
        placeholders = ','.join(['?'] * len(entity_texts))
        query = f'''
            SELECT e.*, COUNT(ee.entity_id) AS shared_entities
            FROM events e
            JOIN event_entities ee ON e.id = ee.event_id
            JOIN entities ent ON ee.entity_id = ent.id
            WHERE ent.entity_text IN ({placeholders})
            GROUP BY e.id
            ORDER BY shared_entities DESC, e.timestamp DESC
            LIMIT 5
        '''
        return self.conn.execute(query, entity_texts).fetchall()

    def update_entity_relationships(self, event_id: int) -> None:
        """Update relationships between entities in an event.

        Every pair of entities linked to *event_id* whose entity types
        differ gets a relationship created or refreshed; pairs of the same
        type are skipped.

        Args:
            event_id: Id of the event whose entities should be related.
        """
        entities = self.conn.execute('''
            SELECT e.id, e.entity_text, e.entity_type
            FROM entities e
            JOIN event_entities ee ON e.id = ee.entity_id
            WHERE ee.event_id = ?
        ''', (event_id,)).fetchall()

        now = datetime.now().isoformat()

        # Commit all pair updates together, or roll all of them back.
        with self.conn:
            for i, first in enumerate(entities):
                for second in entities[i + 1:]:
                    if first[2] == second[2]:
                        continue
                    relationship_type = f"{first[2]}_to_{second[2]}"
                    self._update_relationship(
                        first[0], second[0], relationship_type, now
                    )

    def _update_relationship(self, source_id: int, target_id: int,
                             rel_type: str, timestamp: str) -> None:
        """Update or create a relationship between entities.

        An existing relationship between the two entities (in either
        direction) has ``last_seen`` refreshed and ``confidence`` raised by
        0.1, capped at 1.0.  Otherwise a new row is inserted with confidence
        0.5.  The caller is responsible for committing.

        Args:
            source_id: Id of the first entity.
            target_id: Id of the second entity.
            rel_type: Relationship type stored on a newly created row.
            timestamp: Value written to ``last_seen`` (and ``first_seen``
                for a new row).
        """
        existing = self.conn.execute('''
            SELECT id FROM entity_relationships
            WHERE (source_entity_id = ? AND target_entity_id = ?)
               OR (source_entity_id = ? AND target_entity_id = ?)
        ''', (source_id, target_id, target_id, source_id)).fetchone()

        if existing:
            # Multi-argument MIN is SQLite's scalar minimum; it keeps the
            # confidence from growing past 1.0 on repeated co-occurrence.
            self.conn.execute('''
                UPDATE entity_relationships
                SET last_seen = ?,
                    confidence = MIN(1.0, confidence + 0.1)
                WHERE id = ?
            ''', (timestamp, existing[0]))
        else:
            self.conn.execute('''
                INSERT INTO entity_relationships
                    (source_entity_id, target_entity_id, relationship_type,
                     confidence, first_seen, last_seen)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (source_id, target_id, rel_type, 0.5, timestamp, timestamp))

    def get_entity_relationships(self, event_id: int) -> List[Dict[str, Any]]:
        """Get all relationships for entities in an event.

        Args:
            event_id: Id of the event to inspect.

        Returns:
            One dict per relationship that has at least one endpoint linked
            to *event_id*.  Each dict holds the ``entity_relationships``
            columns plus ``source_text``, ``source_type``, ``target_text``
            and ``target_type``.
        """
        # DISTINCT is required: a relationship whose two endpoints both
        # belong to the event matches two event_entities rows.
        query = '''
            SELECT DISTINCT
                er.*,
                e1.entity_text AS source_text,
                e1.entity_type AS source_type,
                e2.entity_text AS target_text,
                e2.entity_type AS target_type
            FROM event_entities ee
            JOIN entity_relationships er
                ON ee.entity_id IN (er.source_entity_id, er.target_entity_id)
            JOIN entities e1 ON er.source_entity_id = e1.id
            JOIN entities e2 ON er.target_entity_id = e2.id
            WHERE ee.event_id = ?
        '''
        rows = self.conn.execute(query, (event_id,)).fetchall()
        return [dict(row) for row in rows]