Spaces:
Runtime error
Runtime error
from copy import deepcopy | |
from SPARQLWrapper import SPARQLWrapper, JSON | |
import csv | |
import regex as re | |
import os | |
# Absolute path of the directory that contains this module; property.csv is
# looked up relative to it.
DIRPATH = os.path.dirname(os.path.abspath(__file__))
class PropertyDetails:
    """In-memory table mapping a property id to its label and description.

    The table is read once, at construction time, from a comma-separated
    file whose rows are ``property id, label, description``.
    """

    def __init__(self, path=None):
        """Load the property table.

        Args:
            path: CSV file to read. When omitted, ``property.csv`` inside
                the directory named by the module-level ``DIRPATH`` is used.
        """
        if path is None:
            path = f"{DIRPATH}/property.csv"
        self.prop_details = {}
        # newline="" lets the csv module do its own line-ending handling, so
        # quoted fields containing line breaks are parsed correctly.
        with open(path, "r", encoding="utf-8", newline="") as f:
            for row in csv.reader(f, delimiter=","):
                # Blank or truncated rows would otherwise raise IndexError
                # and abort loading of the whole table.
                if len(row) < 3:
                    continue
                self.prop_details[row[0]] = [row[1], row[2]]

    def get_details(self, prop_id):
        """Return ``[label, description]`` for *prop_id*, or ``["", ""]`` if unknown."""
        return self.prop_details.get(prop_id, ["", ""])
# Running counter that enc() increments to number the ?sid_N statement
# variables it generates.
sid_num = 0
# Module-wide property table, loaded from property.csv at import time.
propdetails = PropertyDetails()
def convert_sparql_to_backend(query):
    """Extend a SPARQL query with label columns, the label service and a limit.

    The first line of *query* is treated as the SELECT clause. Unless that
    clause contains an aggregate of the form ``(FUNC(... ?var) AS ...)``,
    ``?varLabel`` and ``?varDescription`` are appended for every selected
    variable. A ``SERVICE wikibase:label`` pattern is inserted in front of
    the last ``}`` of the query and ``LIMIT 200`` is appended.

    Args:
        query: SPARQL text whose SELECT clause is on the first line.

    Returns:
        A ``(query, names)`` tuple: the rewritten query and the selected
        variable names without the leading ``?`` (aggregated variables are
        left out).
    """
    # A variable that has a space or parenthesis on both sides. The pattern
    # lives inside a lookahead so that two neighbouring variables can share
    # one delimiter ("?x ?y"); this works with both `re` and `regex` and
    # needs no `overlapped=True`.
    var_pattern = r"(?=[() ](\?[a-zA-Z0-9_-]+)[() ])"
    aggregate_pattern = r"\(.+\(.* (\?.+)\) [Aa][Ss].*\)"

    lines = query.split("\n")
    # The trailing blank gives the last variable its closing delimiter.
    select = lines[0] + " "

    variables = re.findall(var_pattern, select)
    aggregated = re.findall(aggregate_pattern, select)
    for var in aggregated:
        # The aggregate capture is greedy and may not be a plain variable
        # name; only drop it when it really is one of the matches.
        if var in variables:
            variables.remove(var)

    if not aggregated:
        for var in variables:
            select += " " + var + "Label"
            select += " " + var + "Description"
    lines[0] = select
    query = "\n".join(lines)

    # NOTE: rfind yields -1 when the query has no "}", in which case the
    # service pattern ends up before the final character, as it always did.
    service_pos = query.rfind("}")
    query = (
        query[:service_pos]
        + 'SERVICE wikibase:label { bd:serviceParam wikibase:language "en". }\n'
        + query[service_pos:]
        + "\nLIMIT 200"
    )
    return query, [var[1:] for var in variables]
def get_property_details_with_name(url: str, name: str):
    """Describe the property (or literal) found at *url* under keys based on *name*.

    Args:
        url: Property URI, or a string carrying a quoted literal.
        name: Key prefix; the result has the keys ``name``,
            ``name + "Label"`` and ``name + "Description"``.

    Returns:
        A dict with those three keys:
          * last path segment starts with ``#``: the segment as id, empty
            label and description;
          * fifth ``/``-separated segment starts with ``"``: empty id, the
            text between the first pair of double quotes as label;
          * single property id: id plus label/description from the
            module-level ``propdetails`` table;
          * several ids joined by ``+``: the joined ids, the joined labels
            and an empty description.
    """
    segments = url.split("/")
    prop_id = segments[-1]
    # Short URLs have no fifth segment; treat that as "not a literal"
    # instead of raising IndexError.
    const_segment = segments[4] if len(segments) > 4 else ""

    if prop_id.startswith("#"):
        return {
            name: prop_id,
            name + "Label": "",
            name + "Description": "",
        }
    if const_segment.startswith('"'):
        # The literal's text sits between the first two double quotes.
        label = url.split('"')[1]
        return {
            name: "",
            name + "Label": label,
            name + "Description": "",
        }

    ids = prop_id.split("+")
    if len(ids) == 1:
        prop = propdetails.get_details(prop_id)
        return {
            name: ids[0],
            name + "Label": prop[0],
            name + "Description": prop[1],
        }
    labels = [propdetails.get_details(single_id)[0] for single_id in ids]
    return {
        name: "+".join(ids),
        name + "Label": "+".join(labels),
        name + "Description": "",
    }
def convert(dictt):
    """Wrap every value of *dictt* as ``{"value": value}`` in place and return *dictt*."""
    wrapped = {key: {"value": value} for key, value in dictt.items()}
    # update() only overwrites existing keys, so the caller's dict object is
    # modified in place and keeps its key order.
    dictt.update(wrapped)
    return dictt
def getResult(query):
    """Send *query* to the Wikidata SPARQL endpoint and return its result bindings.

    The response is requested as JSON; the list stored under
    ``["results"]["bindings"]`` is returned.
    """
    endpoint = "https://query.wikidata.org/sparql"
    user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
    client = SPARQLWrapper(endpoint, agent=user_agent)
    client.setReturnFormat(JSON)
    client.setQuery(query)
    payload = client.query().convert()
    return payload["results"]["bindings"]
def get_details_sparql(id):
    """Return a SPARQL query fetching the English label and description of ``wd:<id>``."""
    template = 'wd:{} rdfs:label ?label. wd:{} schema:description ?description. FILTER(lang(?label) = "en" && lang(?description) = "en")'
    where_body = template.format(id, id)
    pieces = ["SELECT ?label ?description \nWHERE\n{\n", where_body, "\n}\n LIMIT 1"]
    return "".join(pieces)
def get_entity_details(url: str):
    """Get the English label and description of a Wikidata entity.

    Args:
        url: Entity URI; the last ``/``-separated segment is taken as the id.

    Returns:
        A dict with the keys ``id``, ``label``, ``description`` and ``uri``.
        ``label``/``description`` are ``None`` whenever no lookup is made:
          * id does not start with ``P`` or ``Q`` (this includes ``#...``
            ids): only ``id`` is filled in;
          * *url* does not start with ``http``: every field is ``None``;
          * host is not ``www.wikidata.org``: only ``uri`` is filled in.
        After a lookup the dict also has a ``type`` key: ``""`` when the
        query returned no rows, ``"uri"`` otherwise.
    """
    from urllib.parse import urlparse

    entity_id = url.split("/")[-1]
    if not entity_id.startswith(("P", "Q")):
        return {"id": entity_id, "label": None, "description": None, "uri": None}
    if not url.startswith("http"):
        return {"id": None, "label": None, "description": None, "uri": None}

    # Compare the parsed host rather than a fixed character slice: the slice
    # url[7:23] only lined up for "http://" and so rejected every
    # "https://www.wikidata.org/..." URL, while accepting any host that
    # merely started with "www.wikidata.org".
    try:
        host = urlparse(url).netloc
    except ValueError:
        host = ""
    if host != "www.wikidata.org":
        return {"id": None, "label": None, "description": None, "uri": url}

    result = getResult(get_details_sparql(entity_id))
    if not result:
        return {
            "id": entity_id,
            "label": None,
            "description": None,
            "uri": url,
            "type": "",
        }
    first = result[0]
    return {
        "id": entity_id,
        "label": first.get("label", {"value": ""})["value"],
        "description": first.get("description", {"value": ""})["value"],
        "uri": url,
        "type": "uri",
    }
def get_property_details(url: str):
    """Get the label and description of the property (or literal) at *url*.

    Args:
        url: Property URI, or a string carrying a quoted literal.

    Returns:
        A dict with the keys ``prop``, ``propLabel`` and ``propDescription``:
          * last path segment starts with ``#``: the segment as ``prop``,
            empty label and description;
          * fifth ``/``-separated segment starts with ``"``: empty ``prop``,
            the text between the first pair of double quotes as label;
          * single property id: id plus label/description from the
            module-level ``propdetails`` table;
          * several ids joined by ``+``: the joined ids, the joined labels
            and an empty description.
    """
    segments = url.split("/")
    prop_id = segments[-1]
    # Short URLs have no fifth segment; treat that as "not a literal"
    # instead of raising IndexError.
    const_segment = segments[4] if len(segments) > 4 else ""

    if prop_id.startswith("#"):
        return {
            "prop": prop_id,
            "propLabel": "",
            "propDescription": "",
        }
    if const_segment.startswith('"'):
        # The literal's text sits between the first two double quotes.
        label = url.split('"')[1]
        return {
            "prop": "",
            "propLabel": label,
            "propDescription": "",
        }

    ids = prop_id.split("+")
    if len(ids) == 1:
        prop = propdetails.get_details(prop_id)
        return {
            "prop": ids[0],
            "propLabel": prop[0],
            "propDescription": prop[1],
        }
    labels = [propdetails.get_details(single_id)[0] for single_id in ids]
    return {
        "prop": "+".join(ids),
        "propLabel": "+".join(labels),
        "propDescription": "",
    }
def enc(i):
    """Encode an entity descriptor as SPARQL text.

    Args:
        i: Dict with an ``"attr"`` key and a non-``None`` ``"id"``.

    Returns:
        * ``attr == "tmp"``: ``?tmp<id>_``
        * ``attr == "val"``: the id itself
        * ``attr == "sid"``: ``?sid_<id>``
        * id containing ``|``: ``attr:a|attr:b|...`` (one entry per
          alternative)
        * ``attr == "wdt"``: a ``p:<id> ?sid_N.\\n?sid_N ps:<id>`` pair,
          where N is the freshly incremented global ``sid_num``
        * anything else: ``attr:<id>``

    Side effects:
        Increments the module-level ``sid_num`` once for a ``wdt`` id and
        once per alternative of a ``|``-separated id.
    """
    global sid_num
    assert "attr" in i and i["id"] is not None

    attr = i["attr"]
    # Ids may be ints; normalise once so the "|" split cannot fail on them.
    ident = str(i["id"])

    if attr == "tmp":
        return "?" + "tmp" + ident + "_"
    if attr == "val":
        return ident
    if attr == "sid":
        return "?" + "sid_" + ident

    alternatives = ident.split("|")
    if len(alternatives) > 1:
        sid_num += len(alternatives)
        return "|".join(attr + ":" + alt for alt in alternatives)

    if attr == "wdt":
        sid_num += 1
        return "p:{} ?sid_{}.\n?sid_{} ps:{}".format(ident, sid_num, sid_num, ident)
    return attr + ":" + ident
class Slot2Sparql:
    """Build SPARQL query strings step by step.

    Every builder method pushes a new ``selection`` onto ``select_lst``
    (merging in the patterns of earlier selections it depends on), and
    ``give_str`` renders any of them. ``num`` is the id the next temporary
    variable will receive.
    """

    class selection:
        """State of one SPARQL query under construction."""

        _LABEL_SERVICE = (
            'SERVICE wikibase:label { bd:serviceParam wikibase:language "en". }\n'
        )

        def __init__(self):
            self.str0 = "SELECT "  # text emitted before the selected variables
            self.str1 = ""  # text that closes an aggregate opened in str0
            self.str2 = ""  # extra projection (the COUNT column of a GROUP BY)
            self.select = []  # entities listed after SELECT
            self.select_sid = []  # most recent statement-id variable
            self.new_select = []  # tmp ids created by aggregate / LIMIT selects
            self.trip = []  # graph patterns placed inside WHERE { ... }
            self.tmp = []  # temporary variables known to this query
            self.state = []  # entity rows, one per pattern group added
            self.tail = []  # clauses appended after the WHERE block
            self.find_tuple_match = {}  # tuple slot -> variable name (no "?")

        def _where_lines(self):
            """Render ``trip`` one pattern per line, each terminated by ".".

            A pattern ending in "{" gets no dot, since "{." is not valid
            SPARQL.
            """
            return "".join(
                pattern + ("\n" if pattern.endswith("{") else ".\n")
                for pattern in self.trip
            )

        def _tail_lines(self):
            """Render ``tail``: each clause on its own line, then a final newline."""
            return "".join("\n" + clause for clause in self.tail) + "\n"

        def give_str(self):
            """Return the full query text for this selection.

            Label/description columns and the label service are added only
            when no aggregate wrapper is active (``str1`` is empty).
            """
            need_label = len(self.str1) == 0
            parts = [self.str0]
            for s in self.select:
                cur_enc = enc(s)
                parts.append(cur_enc)
                if need_label:
                    parts.append(" {}Label {}Description".format(cur_enc, cur_enc))
                if len(self.select) == 1:
                    parts.append(self.str1)
                    parts.append(self.str2)
                parts.append(" ")
            for s in self.select_sid:
                parts.append(enc(s))
            parts.append("\nWHERE\n{\n")
            parts.append(self._where_lines())
            if need_label:
                parts.append(self._LABEL_SERVICE)
            parts.append("}")
            parts.append(self._tail_lines())
            return "".join(parts)

        def set_select(self, sele):
            """Select only *sele* and drop any aggregate wrapper."""
            self.str0 = "SELECT "
            self.select = [sele]
            self.str1 = ""

        def clear_all(self):
            """Reset the query text, patterns, variables and tail clauses.

            ``select_sid``, ``new_select`` and ``find_tuple_match`` are left
            untouched.
            """
            self.str0 = "SELECT "
            self.str1 = ""
            self.str2 = ""
            self.select = []
            self.trip = []
            self.tmp = []
            self.state = []
            self.tail = []

        def getCount(self):
            """Return a query that counts the selected variable(s) as ``?cnt``."""
            parts = [self.str0, "(COUNT("]
            for s in self.select:
                parts.append(enc(s))
                if len(self.select) == 1:
                    parts.append(self.str1)
                parts.append(" ")
            parts.append(") AS ?cnt)")
            parts.append("\nWHERE\n{\n")
            # Same rendering rule as give_str, so that no "{." is produced.
            parts.append(self._where_lines())
            parts.append("}")
            parts.append(self._tail_lines())
            return "".join(parts)

    def __init__(self):
        # Without the global declaration the assignment below only created
        # a local that was thrown away, so the statement counter was never
        # reset.
        global sid_num
        self.select_lst = []
        self.num = 0
        sid_num = 0

    def clear_all(self):
        """Forget all selections and restart temporary-variable numbering."""
        self.select_lst = []
        self.num = 0

    def new_select_lst(self):
        """Push a fresh, empty selection."""
        self.select_lst.append(self.selection())

    def prev_select_lst(self, idx):
        """Push a deep copy of the selection at position *idx*."""
        self.select_lst.append(deepcopy(self.select_lst[idx]))

    def _new_tmp(self):
        """Create the next temporary-variable entity and advance ``num``."""
        ent = {"attr": "tmp", "id": self.num}
        self.num += 1
        return ent

    @staticmethod
    def _rename(sel, old_ents, new_ent):
        """Make *sel* refer to *new_ent* wherever it referred to any of *old_ents*.

        Rewrites the pattern and tail strings and rebuilds the state rows.
        The rows are rebuilt, not edited in place, because find_select_lst
        shares row objects between selections.
        """
        new_name = enc(new_ent)
        old_names = [enc(ent) for ent in old_ents]

        def swap(text):
            for old_name in old_names:
                text = text.replace(old_name, new_name)
            return text

        sel.trip = [swap(line) for line in sel.trip]
        # The original loop only rebound its loop variable, so state was
        # never actually updated.
        sel.state = [
            [new_ent if item in old_ents else item for item in row]
            for row in sel.state
        ]
        sel.tail = [swap(line) for line in sel.tail]

    def find_select_lst(self, tar):
        """Merge into the newest selection the earlier one that selects *tar*.

        Does nothing when *tar* is already a temporary of the newest
        selection. Otherwise the most recent earlier selection whose
        ``select`` contains *tar* contributes its patterns, state rows,
        temporaries and tail clauses.
        """
        assert tar["attr"] == "tmp" and tar["id"] < self.num
        cur = self.select_lst[-1]
        if tar in cur.tmp:
            return
        for prev in reversed(self.select_lst[:-1]):
            if tar in prev.select:
                cur.trip.extend(prev.trip)
                cur.state.extend(prev.state)
                cur.tmp.extend(prev.tmp)
                cur.tail.extend(prev.tail)
                return

    def head(
        self,
    ):
        pass

    def body(
        self,
    ):
        pass

    def find_tuple(self, tup):
        """Add a triple pattern; its temporary-variable slots become the selection.

        Args:
            tup: Mapping with the slots ``"x"``, ``"y"`` and ``"z"``, each
                an entity dict. A ``tmp`` entity whose id is ``None`` is
                given a fresh id (the dict is modified). When the id of
                ``y`` is a string, it is split on ``+`` into a chain of
                ``wdt`` properties linked through fresh temporaries.
        """
        self.new_select_lst()
        cur = self.select_lst[-1]
        target = []
        for slot in tup:
            ent = tup[slot]
            if ent["attr"] == "tmp" and ent["id"] is None:
                # Brand-new temporary variable.
                ent["id"] = self.num
                self.num += 1
                target.append(ent)
                cur.find_tuple_match[slot] = enc(ent)[1:]
                cur.tmp.append(ent)
            elif ent["attr"] == "tmp":
                assert ent["id"] < self.num
                self.find_select_lst(ent)
                target.append(ent)
                cur.find_tuple_match[slot] = enc(ent)[1:]

        # NOTE(review): block nesting was reconstructed from flattened text.
        # As read here, `target` is empty only when the tuple has no tmp
        # slot at all, so the loop below cannot find one and the final
        # change_tmpidx(target[0]) would raise IndexError - confirm against
        # the original layout.
        if not target:
            is_triplet_full = True
            for slot in tup:
                if tup[slot]["attr"] == "tmp":
                    self.find_select_lst(tup[slot])
                    target.append(tup[slot])
                    cur.find_tuple_match[slot] = enc(tup[slot])[1:]
                    break
        else:
            is_triplet_full = False

        cur.select = target
        cur.state.append([tup["x"], tup["y"], tup["z"]])

        if isinstance(tup["y"]["id"], str):
            y_id_splited = tup["y"]["id"].split("+")
        else:
            y_id_splited = []

        # x, then one fresh temporary per intermediate hop, then z.
        chain = [tup["x"]]
        for _ in range(len(y_id_splited) - 1):
            chain.append(self._new_tmp())
        chain.append(tup["z"])

        if y_id_splited:
            hops = []
            for idx, prop_id in enumerate(y_id_splited):
                step = {"attr": "wdt", "id": prop_id}
                # List elements are evaluated left to right; enc(step)
                # advances the global statement counter.
                hops.append(
                    " ".join([enc(chain[idx]), enc(step), enc(chain[idx + 1])])
                )
            str1 = ".\n".join(hops)
        else:
            str1 = " ".join([enc(tup["x"]), enc(tup["y"]), enc(tup["z"])])
        print(str1)
        cur.select_sid = [{"attr": "sid", "id": sid_num}]
        cur.trip.append(str1)
        if is_triplet_full:
            self.change_tmpidx(target[0])

    def find_entity(self, ent1):
        """Select three new variables for statements whose subject is *ent1*.

        The WHERE block is a UNION of a direct triple ``ent1 ?p ?v`` and a
        two-hop pattern that also binds the third variable through
        ``pq:P585``. A FILTER then restricts ``?p`` by URI prefix.
        """
        self.new_select_lst()
        cur = self.select_lst[-1]
        cur.str0 += "DISTINCT "
        cur.select = [self._new_tmp() for _ in range(3)]
        cur.tmp.extend(cur.select)
        inner = [self._new_tmp() for _ in range(2)]
        if ent1["attr"] == "tmp":
            self.find_select_lst(ent1)

        pred, value, qualifier = cur.select
        # ent1 is the head of the triple.
        cur.state.append([ent1, pred, value, qualifier])

        cur.trip.append("{")
        cur.trip.append(" ".join([enc(ent1), enc(pred), enc(value)]))
        cur.trip.append("}\nUNION\n{")
        cur.trip.append(" ".join([enc(ent1), enc(inner[0]), enc(inner[1])]))
        cur.trip.append(
            enc(inner[1])
            + " pq:P585 "
            + enc(qualifier)
            + ";\n"
            + enc(pred)
            + " "
            + enc(value)
        )
        cur.trip.append("}")

        if ent1["attr"] == "wds":
            str1 = 'FILTER(STRSTARTS ( STR ( {} ), "http://www.wikidata.org/prop/" ))'.format(
                enc(pred)
            )
        else:
            str1 = 'FILTER(STRSTARTS ( STR ( {} ), "http://www.wikidata.org/prop/direct/" ) || STRSTARTS ( STR ( {} ),"http://www.wikidata.org/prop/statement/" ))'.format(
                enc(pred), enc(pred)
            )
        cur.trip.append(str1)

    def find_entity_by_tail(self, ent1):
        """Select ``?s ?p`` for triples ``?s ?p ent1``, filtered by URI prefix."""
        self.new_select_lst()
        cur = self.select_lst[-1]
        cur.str0 += "DISTINCT "
        cur.select = [self._new_tmp() for _ in range(2)]
        cur.tmp.extend(cur.select)
        if ent1["attr"] == "tmp":
            self.find_select_lst(ent1)

        subj, pred = cur.select
        # ent1 is the tail of the triple.
        cur.state.append([subj, pred, ent1])
        cur.trip.append(" ".join([enc(subj), enc(pred), enc(ent1)]))
        cur.trip.append(
            'FILTER(STRSTARTS ( STR ( {} ), "http://www.wikidata.org/entity/Q" ))'.format(
                enc(subj)
            )
        )
        cur.trip.append(
            'FILTER(STRSTARTS ( STR ( {} ), "http://www.wikidata.org/prop/" ))'.format(
                enc(pred)
            )
        )

    def find_entity_by_relation(self, ent1):
        """Select ``?s ?o`` for triples ``?s ent1 ?o`` whose subject is a Q entity."""
        self.new_select_lst()
        cur = self.select_lst[-1]
        cur.str0 += "DISTINCT "
        cur.select = [self._new_tmp() for _ in range(2)]
        cur.tmp.extend(cur.select)
        if ent1["attr"] == "tmp":
            self.find_select_lst(ent1)

        subj, obj = cur.select
        # The state row keeps the order [subject, object, ent1].
        cur.state.append([subj, obj, ent1])
        cur.trip.append(" ".join([enc(subj), enc(ent1), enc(obj)]))
        cur.trip.append(
            'FILTER(STRSTARTS ( STR ( {} ), "http://www.wikidata.org/entity/Q" ))'.format(
                enc(subj)
            )
        )

    def binary_operation(self, ent1, op, ent2):
        """Combine *ent1* and *ent2* with *op* in a new selection.

        * comparison (``> < = != >= <=``): adds a FILTER and re-selects
          *ent1* under a fresh temporary (plus *ent2* if it is a temporary);
        * arithmetic (``+ - * /``): BINDs the result to a fresh temporary;
        * set operation (``&&``, ``||``, ``~``): joins the two source
          queries (plain group, UNION or MINUS) and renames both operands
          to one fresh temporary.

        Any other *op* is ignored.
        """
        if op in (">", "<", "=", "!=", ">=", "<="):
            self.new_select_lst()
            cur = self.select_lst[-1]
            assert ent1["attr"] == "tmp"
            self.find_select_lst(ent1)
            # The comparison is expressed as a FILTER.
            cur.trip.append(
                "FILTER (" + enc(ent1) + " " + op + " " + enc(ent2) + ")"
            )
            cur.select = [ent1]
            self.change_tmpidx(ent1)
            if ent2["attr"] == "tmp":
                cur.select.append(ent2)
        elif op in ("+", "-", "*", "/"):
            self.new_select_lst()
            cur = self.select_lst[-1]
            if ent1["attr"] == "tmp":
                self.find_select_lst(ent1)
            if ent2["attr"] == "tmp":
                self.find_select_lst(ent2)
            # The result goes into a new temporary, e.g.
            # BIND (?tmpA_ / 365.2425 AS ?tmpB_)
            t = self._new_tmp()
            cur.select = [t]
            cur.tmp.append(t)
            # No trailing "." here: the renderer terminates every pattern
            # itself, and a second dot would be a SPARQL syntax error.
            cur.trip.append(
                "BIND ("
                + enc(ent1)
                + " "
                + op
                + " "
                + enc(ent2)
                + " AS "
                + enc(t)
                + ")"
            )
        elif op in ("&&", "||", "~"):
            self.new_select_lst()
            cur = self.select_lst[-1]
            assert ent1["attr"] == ent2["attr"] == "tmp"
            cur.trip.append("{")
            self.find_select_lst(ent1)
            if op == "||":
                cur.trip.append("}\nUNION\n{")
            elif op == "~":
                cur.trip.append("}\nMINUS\n{")
            # "&&" keeps both operand queries inside the same group.
            self.find_select_lst(ent2)
            cur.trip.append("}")
            # Both operands are replaced by one new temporary.
            t = self._new_tmp()
            cur.select = [t]
            cur.tmp.append(t)
            cur.tmp.remove(ent1)
            cur.tmp.remove(ent2)
            self._rename(cur, [ent1, ent2], t)

    def unitary_operation(self, ent, op, last_sparql_idx):
        """Apply a single-operand SPARQL operation.

        * ``ORDER`` / ``GROUP (ASC)`` / ``GROUP (DESC)``: adds an
          ``ORDER BY`` or ``GROUP BY`` tail clause; grouping also projects
          a ``?cnt`` count and orders by it;
        * ``LIMIT`` / ``OFFSET``: copies selection *last_sparql_idx* and
          appends the clause with *ent* as its argument;
        * ``DISTINCT`` / ``REDUCED``: adds the keyword after SELECT;
        * ``MIN MAX AVG SUM COUNT SAMPLE``: wraps the selected variable in
          the aggregate and binds it to a fresh temporary.

        Any other *op* is ignored.
        """
        if op in ("ORDER", "GROUP (ASC)", "GROUP (DESC)"):
            self.new_select_lst()
            cur = self.select_lst[-1]
            self.find_select_lst(ent)
            cur.select = [ent]
            words = op.split(" ")
            grouping = "GROUP" in words
            str1 = words[0] + " BY " + enc(ent)
            if grouping:
                str1 += " {}Label {}Description".format(enc(ent), enc(ent))
                if words[-1] == "(DESC)":
                    str1 += "\nORDER BY DESC(?cnt)"
                else:
                    str1 += "\nORDER BY ?cnt"
            cur.tail.append(str1)
            self.change_tmpidx(ent)
            if grouping:
                cur.str2 = " (COUNT({}) AS ?cnt) ".format(enc(cur.select[0]))
        elif op in ("LIMIT", "OFFSET"):
            self.prev_select_lst(last_sparql_idx)
            cur = self.select_lst[-1]
            cur.tail.append(op + " " + enc(ent))
            self.change_tmpidx(cur.select[0])
            cur.new_select = cur.select
        elif op in ("DISTINCT", "REDUCED"):
            self.new_select_lst()
            cur = self.select_lst[-1]
            self.find_select_lst(ent)
            cur.select = [ent]
            cur.str0 += op + " "
        elif op in ("MIN", "MAX", "AVG", "SUM", "COUNT", "SAMPLE"):
            self.new_select_lst()
            cur = self.select_lst[-1]
            self.find_select_lst(ent)
            t = self._new_tmp()
            cur.new_select = [t]
            cur.tmp.append(t)
            cur.select = [ent]
            # str0 opens "(OP(" and str1 closes it with ") AS ?tmpN_)".
            cur.str0 += "(" + op + "("
            cur.str1 += ") AS " + enc(t) + ")"

    def give_str(self, sparqlIdx=-1):
        """Return ``(query_text, selected_entities)`` for selection *sparqlIdx*."""
        sel = self.select_lst[sparqlIdx]
        return sel.give_str(), sel.select

    def give_tmp(self, sparqlIdx=-1):
        """Return the temporary variables of selection *sparqlIdx*."""
        return self.select_lst[sparqlIdx].tmp

    def change_tmpidx(self, ent1, sparqlIdx=-1):
        """Give *ent1* a fresh temporary id throughout selection *sparqlIdx*.

        The new temporary becomes the only selected entity and replaces
        *ent1* in ``tmp``, the patterns, the state rows, the tail clauses
        and ``str2``.

        Raises:
            ValueError: if *ent1* is not among the selection's temporaries.
        """
        sel = self.select_lst[sparqlIdx]
        t = self._new_tmp()
        sel.select = [t]
        sel.tmp.append(t)
        sel.tmp.remove(ent1)
        self._rename(sel, [ent1], t)
        sel.str2 = sel.str2.replace(enc(ent1), enc(t))