OOP_KG_Transform_POC / database_operations.py
Zaherrr's picture
Upload 8 files
b2ad712 verified
raw
history blame
1.83 kB
from neo4j import GraphDatabase
class Neo4jDatabase:
    """Store, query and delete labelled graphs in a Neo4j database.

    A graph is identified by ``label_prefix``, a node label that
    ``dump_to_neo4j`` attaches (together with ``Node``) to every node it
    creates, and that the other methods match on.

    NOTE(review): ``label_prefix`` is interpolated into the Cypher text
    rather than passed as a query parameter, so callers must only supply
    trusted, syntactically valid label names.

    The instance can be used as a context manager; the driver is closed on
    exit.
    """

    # Upper bound on the number of rows sent with a single UNWIND query.
    _BATCH_SIZE = 1000

    def __init__(self, uri, username, password):
        """Create the underlying driver for ``uri`` with basic credentials."""
        self.driver = GraphDatabase.driver(uri, auth=(username, password))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying driver."""
        self.driver.close()

    def _run_batched(self, session, query, rows):
        """Run ``query`` once per slice of at most ``_BATCH_SIZE`` rows."""
        for start in range(0, len(rows), self._BATCH_SIZE):
            batch = rows[start:start + self._BATCH_SIZE]
            # consume() drains the result so the write has fully executed
            # before the next query is sent or the session is closed.
            session.run(query, rows=batch).consume()

    def dump_to_neo4j(self, nodes, edges, label_prefix):
        """Create the given nodes and edges under ``label_prefix``.

        Args:
            nodes: iterable of mappings with ``id`` and ``label`` keys.
            edges: iterable of mappings with ``source``, ``target`` and
                ``type`` keys; ``source``/``target`` refer to node ids.
            label_prefix: label attached to every created node.

        Raises:
            KeyError: if a node or edge lacks a required key. All input is
                checked before anything is written.

        Edges whose endpoints cannot be matched are not created. Each edge
        becomes a ``RELATION`` relationship carrying the edge type in its
        ``type`` property.
        """
        # Extract only the fields the queries use; this also validates the
        # whole input up front and accepts any iterable, not just lists.
        node_rows = [{"id": node["id"], "label": node["label"]} for node in nodes]
        edge_rows = [
            {"source": edge["source"], "target": edge["target"], "type": edge["type"]}
            for edge in edges
        ]

        create_nodes = (
            "UNWIND $rows AS row "
            f"CREATE (n:{label_prefix}:Node {{id: row.id, label: row.label}})"
        )
        create_edges = (
            "UNWIND $rows AS row "
            f"MATCH (a:{label_prefix}:Node {{id: row.source}}), "
            f"(b:{label_prefix}:Node {{id: row.target}}) "
            "CREATE (a)-[r:RELATION {type: row.type}]->(b)"
        )

        with self.driver.session() as session:
            # Nodes first: the edge query matches on nodes created here.
            self._run_batched(session, create_nodes, node_rows)
            self._run_batched(session, create_edges, edge_rows)

    def check_existing_graph(self, label_prefix):
        """Return True if at least one node carries ``label_prefix``."""
        with self.driver.session() as session:
            result = session.run(f"MATCH (n:{label_prefix}) RETURN count(n) as count")
            record = result.single()
            return record["count"] > 0

    def get_graph_data(self, label_prefix):
        """Load the graph stored under ``label_prefix``.

        Returns:
            ``{"nodes": [{"id", "label"}, ...],
            "edges": [{"source", "target", "type"}, ...]}``.

        The edge ``type`` is read from the relationship's ``type`` property,
        which is where ``dump_to_neo4j`` stores it; relationships without
        that property fall back to their relationship type name.
        """
        node_query = f"MATCH (n:{label_prefix}) RETURN n.id AS id, n.label AS label"
        edge_query = (
            f"MATCH (a:{label_prefix})-[r]->(b:{label_prefix}) "
            "RETURN a.id AS source, b.id AS target, "
            "coalesce(r.type, type(r)) AS type"
        )
        with self.driver.session() as session:
            # Materialise each result before issuing the next query.
            nodes = [
                {"id": record["id"], "label": record["label"]}
                for record in session.run(node_query)
            ]
            edges = [
                {
                    "source": record["source"],
                    "target": record["target"],
                    "type": record["type"],
                }
                for record in session.run(edge_query)
            ]
        return {"nodes": nodes, "edges": edges}

    def delete_graph(self, label_prefix):
        """Delete every node labelled ``label_prefix`` and its relationships."""
        with self.driver.session() as session:
            session.run(f"MATCH (n:{label_prefix}) DETACH DELETE n").consume()