Tools / kg /wikidata /api.py
ZackBradshaw's picture
Upload folder using huggingface_hub
e67043b verified
from .utils import *
import pandas as pd
import requests
import json
from ...tool import Tool
# Columns kept from a ``wbsearchentities`` hit when rendering the result table.
_SEARCH_COLUMNS = ("id", "label", "description")


def _empty_table() -> str:
    """Return the markdown rendering of an empty DataFrame (the failure reply)."""
    return pd.DataFrame().to_markdown()


def _parse_entity_ref(text) -> dict:
    """Convert a reference string into the ``{"id", "attr"}`` dict given to ``Slot2Sparql``.

    The first character selects the ``attr`` tag:

    * ``#`` -> ``"tmp"``; the remainder is parsed with ``int``
    * ``Q`` -> ``"wd"``; the whole string is the id
    * ``P`` -> ``"wdt"``; the whole string is the id
    * ``@`` -> ``"wds"``; the remainder is the id
    * anything else -> ``"val"``; the whole string is the id

    Raises:
        IndexError: if ``text`` is empty.
        ValueError: if ``#`` is not followed by an integer.
    """
    marker = text[0]
    if marker == "#":
        return {"id": int(text[1:]), "attr": "tmp"}
    if marker == "Q":
        return {"id": text, "attr": "wd"}
    if marker == "P":
        return {"id": text, "attr": "wdt"}
    if marker == "@":
        return {"id": text[1:], "attr": "wds"}
    return {"id": text, "attr": "val"}


def _run_latest_select(sparql):
    """Run the last select registered on ``sparql``, capped at 2000 rows.

    Returns:
        A ``(rows, state)`` pair: the value returned by ``getResult`` and the
        last entry of that select's ``state`` list.
    """
    last = len(sparql.select_lst) - 1
    # give_str returns a second value that this module never used; ignore it.
    query, _ = sparql.give_str(last)
    rows = getResult(query + "\nLIMIT 2000")
    return rows, sparql.select_lst[last].state[-1]


def _label_or_literal(row: dict, var: str) -> str:
    """Return the display text of binding ``var`` in ``row``.

    A ``uri`` binding is shown through its ``<var>Label`` companion binding
    (empty string when that binding is absent); any other binding is shown
    through its own value.
    """
    binding = row[var]
    if binding["type"] == "uri":
        return row.get(var + "Label", {"value": ""})["value"]
    return binding["value"]


def _relation_columns(binding: dict) -> dict:
    """Return the relation columns for one row's relation binding.

    A ``uri`` binding is resolved with ``get_property_details``; any other
    binding only fills ``relationLabel`` with its raw value.
    """
    if binding["type"] == "uri":
        return get_property_details(binding["value"])
    return {
        "relation": "",
        "relationLabel": binding["value"],
        "relationDescription": "",
    }


def _entity_columns(row: dict, var: str, prefix: str) -> dict:
    """Return ``<prefix>``, ``<prefix>Label`` and ``<prefix>Description`` for ``var``.

    ``<prefix>`` is the last ``/``-separated segment of a ``uri`` binding and
    an empty string for any other binding type.
    """
    binding = row[var]
    return {
        prefix: binding["value"].split("/")[-1] if binding["type"] == "uri" else "",
        prefix + "Label": _label_or_literal(row, var),
        prefix + "Description": row.get(var + "Description", {"value": ""})["value"],
    }


def _search_wikidata(kind: str, text: str) -> str:
    """Search Wikidata via ``wbsearchentities`` and render the hits as markdown.

    Args:
        kind: value of the API ``type`` parameter (``"item"`` or ``"property"``).
        text: the search string.

    Returns:
        A markdown table restricted to the id, label and description columns.
    """
    reply = requests.get(
        "https://www.wikidata.org/w/api.php",
        params={
            "type": kind,
            "action": "wbsearchentities",
            "language": "en",
            "search": text,
            "origin": "*",
            "format": "json",
        },
    ).text
    hits = json.loads(reply)["search"]
    table = pd.DataFrame.from_dict(hits)
    # Drop everything except the wanted columns in one step rather than
    # popping columns while walking the column axis.
    unwanted = [col for col in table.columns if col not in _SEARCH_COLUMNS]
    return table.drop(columns=unwanted).to_markdown()


def build_tool(config) -> Tool:
    """Create the "Search in Wikidata" tool and register its GET endpoints.

    Five endpoints are registered: ``/find_entity``, ``/find_entity_by_tail``,
    ``/get_entity_id``, ``/get_relation_id`` and ``/search_by_code``. Each one
    returns a markdown table; on any exception it prints the error and returns
    the markdown of an empty DataFrame instead of raising.

    Args:
        config: accepted for interface compatibility; not read here.

    Returns:
        The configured ``Tool`` instance.
    """
    # NOTE(review): the endpoint docstrings below are presumably surfaced as
    # endpoint descriptions by ``Tool``; they are kept verbatim for that reason.
    tool = Tool(
        "Search in Wikidata",
        "answering factual questions in wikidata.",
        description_for_model="Plugin for answering factual questions in wikidata.",
        logo_url="https://your-app-url.com/.well-known/logo.png",
        contact_email="[email protected]",
        legal_info_url="[email protected]",
    )
    # One Slot2Sparql instance is shared by all calls of the two find_* endpoints;
    # every call appends to its ``select_lst``.
    sparql = Slot2Sparql()

    @tool.get("/find_entity")
    def find_entity(input):
        """Find all <r, t> that has the relation <input, r, t>. It looks like viewing the main page of the input entity. The result is a table."""
        try:
            sparql.find_entity(_parse_entity_ref(input))
            rows, state = _run_latest_select(sparql)
            # [1:] drops the first character of each encoded name (presumably
            # the leading "?" of a SPARQL variable -- confirm against enc()).
            relation_var = enc(state[1])[1:]
            tail_var = enc(state[2])[1:]
            time_var = enc(state[3])[1:]
            print("RESULT:", rows)
            table = []
            for row in rows:
                record = {}
                record.update(_relation_columns(row[relation_var]))
                record.update(_entity_columns(row, tail_var, "tail"))
                # Rows without a time binding are reported as always valid.
                record["time"] = (
                    _label_or_literal(row, time_var) if time_var in row else "ALWAYS"
                )
                table.append(record)
            return pd.DataFrame.from_dict(table).to_markdown()
        except Exception as exc:
            # The table may not exist yet at this point, so reply with an
            # empty one, exactly like the sibling endpoints do.
            print("Invalid option!\n", exc)
            return _empty_table()

    @tool.get("/find_entity_by_tail")
    def find_entity_by_tail(input: str):
        """Find all <h, r> that has the relation <h, r, input>. It looks like viewing the reverse main page of the input entity. The result is a table."""
        try:
            sparql.find_entity_by_tail(_parse_entity_ref(input))
            rows, state = _run_latest_select(sparql)
            head_var = enc(state[0])[1:]
            relation_var = enc(state[1])[1:]
            table = []
            for row in rows:
                record = {}
                record.update(_relation_columns(row[relation_var]))
                record.update(_entity_columns(row, head_var, "head"))
                table.append(record)
            return pd.DataFrame.from_dict(table).to_markdown()
        except Exception as exc:
            print("Invalid option!\n", exc)
            return _empty_table()

    @tool.get("/get_entity_id")
    def get_entity_id(input: str):
        """Search for all the entities that has the surface form as the input. For example, all the entities that are named ``Obama'', including either person, book, anything else."""
        try:
            return _search_wikidata("item", input)
        except Exception as exc:
            print("Invalid option!\n", exc)
            return _empty_table()

    @tool.get("/get_relation_id")
    def get_relation_id(input: str):
        """Search for all the relations that has the surface form as the input. For example, all the relations that are named ``tax''."""
        try:
            return _search_wikidata("property", input)
        except Exception as exc:
            print("Invalid option!\n", exc)
            return _empty_table()

    @tool.get("/search_by_code")
    def search_by_code(query: str):
        """After knowing the unique id of entity or relation, perform a sparql query. E.g.,
        Select ?music\nWhere {{\nwd:Q00 wdt:P00 ?music.\n}} The entity label will be automatically retrieved.
        """
        try:
            query, basic_sel = convert_sparql_to_backend(query)
            rows = getResult(query)
            for row in rows:
                for sel in basic_sel:
                    if sel not in row:
                        continue
                    value = row[sel]["value"]
                    if not value.startswith("http"):
                        continue
                    # Only bindings whose final path segment starts with "P"
                    # are expanded with property details.
                    if value.split("/")[-1].startswith("P"):
                        row.update(
                            convert(get_property_details_with_name(value, sel))
                        )
            return pd.DataFrame.from_dict(rows).to_markdown()
        except Exception as exc:
            print("Invalid option!\n", exc)
            return _empty_table()

    return tool