|
import asyncio
import json
import os
import re
import sys
import time
from itertools import combinations as itertools_combinations

import aiohttp
import httpx
import nest_asyncio
import numpy as np
import requests
import streamlit as st
from bs4 import BeautifulSoup
from openai import OpenAI
from sentence_transformers import SentenceTransformer
from SPARQLWrapper import SPARQLWrapper, JSON
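# Streamlit drives its own script runner, so nest_asyncio is applied to let the
# asyncio.run() calls further down nest inside an already-running event loop.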
|
|
|
|
|
nest_asyncio.apply() |
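# Scratch directories: qids_folder caches the candidate QIDs found per mention,
# info_extraction holds the label/alias/description records gathered for them.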
|
|
|
|
|
folder_path = '/home/user/app/qids_folder'
if not os.path.exists(folder_path):
    os.mkdir(folder_path)

folder_path_1 = '/home/user/app/info_extraction'
if not os.path.exists(folder_path_1):
    os.mkdir(folder_path_1)
    print(f"Folder created at {folder_path_1}")
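# Bilingual sentence-embedding model used to compare candidate labels and
# descriptions against the mention and its generated context.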
|
|
|
model = SentenceTransformer("Lajavaness/bilingual-embedding-large", trust_remote_code=True) |
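# Rough sketch of how the embeddings are used later (assumes sentence-transformers
# >= 3.x, which exposes model.similarity for cosine similarity):
#   emb_a = model.encode(["phase microscopy"])
#   emb_b = model.encode(["phase-contrast microscopy"])
#   score = model.similarity(emb_a, emb_b)  # higher score = closer candidate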
|
|
|
async def fetch_json(url, session): |
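    # Helper: GET a URL with the shared aiohttp session and decode the JSON body.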
|
async with session.get(url) as response: |
|
return await response.json() |
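# The MediaWiki search responses consumed below look roughly like
# {"query": {"search": [{"title": "..."}, ...]}}; only the titles are kept.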
|
|
|
async def combination_method(name, session):
    # Search Wikipedia for every two-word combination of the mention's tokens,
    # reusing the caller's aiohttp session.
    data = set()
    tokens = name.split()
    for pair in itertools_combinations(tokens, 2):
        new_word = f"{pair[0]} {pair[1]}"
        url = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={new_word}&srlimit=20&srprop=&srenablerewrites=True&format=json"
        json_data = await fetch_json(url, session)
        for page in json_data.get('query', {}).get('search', []):
            data.add(page.get('title'))
    return data
|
|
|
async def single_method(name, session):
    # Search Wikipedia for each individual token of the mention (hyphens and
    # slashes treated as separators), reusing the caller's aiohttp session.
    data = set()
    tokens = name.replace("-", " ").replace("/", " ").split()
    for token in tokens:
        url = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={token}&srlimit=20&srprop=&srenablerewrites=True&format=json"
        json_data = await fetch_json(url, session)
        for page in json_data.get('query', {}).get('search', []):
            data.add(page.get('title'))
    return data
|
|
|
async def mains(name, deep_search): |
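    # Candidate Retrieval: collect Wikipedia titles for the mention (direct search,
    # spelling suggestion, disambiguation expansion, optional deep search), resolve
    # them to Wikidata QIDs, and cache the QIDs to qids_folder/<name>.json.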
|
data = set() |
|
disam_data = set() |
|
qids = set() |
|
|
|
async with aiohttp.ClientSession() as session: |
|
url = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={name}&srlimit=20&srprop=&srenablerewrites=True&format=json" |
|
json_data = await fetch_json(url, session) |
|
        suggestion = json_data.get('query', {}).get('search', [])
        for page in suggestion:
            data.add(page.get('title'))
|
|
|
wikipedia_url = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={name}&srlimit=1&srprop=&srenablerewrites=True&srinfo=suggestion&format=json" |
|
json_data = await fetch_json(wikipedia_url, session) |
|
suggestion = json_data.get('query', {}).get('searchinfo', {}).get('suggestion') |
|
|
|
if suggestion: |
|
suggested_url = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={suggestion}&srlimit=10&srprop=&srenablerewrites=True&srinfo=suggestion&format=json" |
|
json_suggestion = await fetch_json(suggested_url, session) |
|
results = json_suggestion.get('query', {}).get('search') |
|
for i in results: |
|
data.add(i.get('title')) |
|
|
|
|
|
        if data:
|
for ids in data: |
|
titles = set() |
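                # Fetch the page's outgoing links; if they include "Help:Disambiguation",
                # the page is treated as a disambiguation page and its linked titles are
                # collected instead of the page itself.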
|
wikipedia_disambiguation = f"https://en.wikipedia.org/w/api.php?action=query&generator=links&format=json&redirects=1&titles={ids}&prop=pageprops&gpllimit=500&ppprop=wikibase_item" |
|
json_id = await fetch_json(wikipedia_disambiguation, session) |
|
                try:
                    pages = json_id.get('query', {}).get('pages', {})
                    for k, v in pages.items():
                        titles.add(v.get("title"))
                except Exception:
                    pass
|
|
|
if "Help:Disambiguation" in titles: |
|
for i in titles: |
|
if ":" not in i and "disambiguation" not in i: |
|
disam_data.add(i) |
|
else: |
|
disam_data.add(ids) |
|
|
|
|
|
if deep_search == "Yes": |
|
if len(name.replace("-", " ").split()) >= 3: |
|
combination_names = await combination_method(name, session) |
|
for i in combination_names: |
|
disam_data.add(i) |
|
|
|
|
|
if deep_search == "Yes": |
|
if len(name.replace("-", " ").replace("/", " ").split()) >= 2: |
|
singles = await single_method(name, session) |
|
for i in singles: |
|
disam_data.add(i) |
|
|
|
for ids in disam_data: |
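            # Resolve each collected title to its Wikidata QID via the pageprops API.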
|
try: |
|
wikibase_url = f"https://en.wikipedia.org/w/api.php?action=query&titles={ids}&prop=pageprops&format=json" |
|
json_qid = await fetch_json(wikibase_url, session) |
|
wikidata_qid = json_qid.get('query', {}).get('pages', {}) |
|
for page_id, page_data in wikidata_qid.items(): |
|
page_props = page_data.get('pageprops', {}) |
|
wikibase_item = page_props.get('wikibase_item', None) |
|
if wikibase_item: |
|
qids.add(wikibase_item) |
|
            except Exception:
                pass
|
|
|
with open(f"/home/user/app/qids_folder/{name}.json", "w") as f: |
|
json.dump(list(qids), f) |
|
|
|
|
|
async def get_results(query): |
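    # Note: SPARQLWrapper is synchronous, so despite the async signature this call
    # blocks the event loop while the Wikidata endpoint responds.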
|
user_agent = "WDQS-example Python/%s.%s" % (sys.version_info[0], sys.version_info[1]) |
|
url = "https://query.wikidata.org/sparql" |
|
sparql = SPARQLWrapper(url, agent=user_agent) |
|
sparql.setQuery(query) |
|
sparql.setReturnFormat(JSON) |
|
return sparql.query().convert() |
|
|
|
def get_resultss(query): |
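    # Synchronous twin of get_results, used from the non-async Streamlit code path.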
|
user_agent = "WDQS-example Python/%s.%s" % (sys.version_info[0], sys.version_info[1]) |
|
url = "https://query.wikidata.org/sparql" |
|
sparql = SPARQLWrapper(url, agent=user_agent) |
|
sparql.setQuery(query) |
|
sparql.setReturnFormat(JSON) |
|
return sparql.query().convert() |
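# SPARQL results are consumed below in the standard SPARQL 1.1 JSON format:
# results["results"]["bindings"] is a list of dicts such as
# {"subjectLabel": {"type": "literal", "value": "..."}}.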
|
|
|
|
|
def cleaner(text): |
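    # Normalise extract text: drop backslashes and newlines, remove {...} template
    # remnants, and collapse repeated spaces.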
|
text = text.replace('\\', '').replace('\n', ' ') |
|
text = re.sub(r'\{.*?\}', '', text) |
|
text = re.sub(' +', ' ', text).strip() |
|
return text |
|
|
|
async def retriever(qid): |
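    # Information Gathering: for one QID, fetch the English label, aliases, and a
    # short description (the Wikipedia intro when a sitelink exists, otherwise the
    # Wikidata description), and return one record per label/alias.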
|
async with aiohttp.ClientSession() as session: |
|
list_with_sent = [] |
|
|
|
query_label = f"""SELECT ?subjectLabel |
|
WHERE {{ |
|
wd:{qid} rdfs:label ?subjectLabel . |
|
FILTER(LANG(?subjectLabel) = "en") |
|
}} |
|
""" |
|
|
|
results = await get_results(query_label) |
|
|
|
label = None |
|
if results["results"]["bindings"]: |
|
for result in results["results"]["bindings"]: |
|
for key, value in result.items(): |
|
label = value.get("value", {}).lower() |
|
|
|
query_alias = f"""SELECT ?alias |
|
WHERE {{ |
|
wd:{qid} skos:altLabel ?alias |
|
FILTER(LANG(?alias) = "en") |
|
}} |
|
""" |
|
|
|
alias_list = [] |
|
results = await get_results(query_alias) |
|
|
|
for result in results["results"]["bindings"]: |
|
for key, value in result.items(): |
|
alias = value.get("value", "None") |
|
alias_list.append(alias) |
|
|
|
query_desci = f"""SELECT ?subjectLabel |
|
WHERE {{ |
|
?subjectLabel schema:about wd:{qid} ; |
|
schema:inLanguage "en" ; |
|
schema:isPartOf <https://en.wikipedia.org/> . |
|
}} |
|
""" |
|
|
|
results = await get_results(query_desci) |
|
cleaned_first_para = "None" |
|
|
|
if results["results"]["bindings"]: |
|
for result in results["results"]["bindings"]: |
|
for key, value in result.items(): |
|
desc = value.get("value", "None") |
|
|
|
title = desc.split("/wiki/")[1] |
|
|
|
url = f"https://en.wikipedia.org/w/api.php?action=query&prop=extracts&titles={title}&exintro=&exsentences=2&explaintext=&redirects=&formatversion=2&format=json" |
|
|
|
|
|
json_data = await fetch_json(url, session) |
|
cleaned_first_para = cleaner(json_data.get('query', {}).get('pages', [{}])[0].get('extract', 'None')) |
|
else: |
|
query_desc = f"""SELECT ?subjectLabel |
|
WHERE {{ |
|
wd:{qid} schema:description ?subjectLabel . |
|
FILTER(LANG(?subjectLabel) = "en") |
|
}} |
|
""" |
|
|
|
results = await get_results(query_desc) |
|
if results["results"]["bindings"]: |
|
for result in results["results"]["bindings"]: |
|
for key, value in result.items(): |
|
cleaned_first_para = value.get("value", "None") |
|
|
|
list_with_sent.append({"qid": qid, "label": label, "description": cleaned_first_para}) |
|
|
|
if alias_list: |
|
for alias in alias_list: |
|
list_with_sent.append({"qid": qid, "label": alias.lower(), "description": cleaned_first_para}) |
|
|
|
return list_with_sent |
|
|
|
async def main(name): |
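    # For every cached QID of this mention, gather its label/alias/description
    # records and write them to info_extraction/<name>.json.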
|
with open(f"/home/user/app/qids_folder/{name}.json", "r") as f: |
|
final_list = [] |
|
qids = json.load(f) |
|
for q in qids: |
|
returned_list = await retriever(q) |
|
if returned_list: |
|
final_list.extend(returned_list) |
|
|
|
with open(f"/home/user/app/info_extraction/{name}.json", "w", encoding="utf-8") as flast: |
|
json.dump(final_list, flast) |
|
|
|
def main_cli(): |
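    # Streamlit entry point: chains the five modules (data normalization, candidate
    # retrieval, information gathering, candidate selection, candidate matching).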
|
st.title("✨ Entity Linking Application ✨") |
|
st.caption("This web application is part of my master’s dissertation.") |
|
|
|
if 'run_button' in st.session_state and st.session_state.run_button == True: |
|
st.session_state.running = True |
|
else: |
|
st.session_state.running = False |
|
|
|
api_token = st.text_input("Enter your API key from [GitHub](https://github.com/marketplace/models/azure-openai/gpt-4o):", "", type="password", disabled=st.session_state.running) |
|
|
|
if api_token: |
|
endpoint = "https://models.inference.ai.azure.com" |
|
model_name = "gpt-4o" |
|
client = OpenAI( |
|
base_url=endpoint, |
|
api_key=api_token, |
|
) |
|
st.success("API Token is set for this session.") |
|
    else:
        st.warning("Please enter an API token to proceed.")
        st.stop()  # without a token, `client` is undefined further down
|
|
|
input_sentence_user = st.text_input("Enter a sentence:", "", disabled=st.session_state.running) |
|
    input_mention_user = st.text_input("Enter a textual reference (mention) that is inside the sentence:", "", disabled=st.session_state.running)
|
deep_search = st.selectbox("Perform deep search? (Useful for difficult mentions)", ['Yes', 'No'], index=1, disabled=st.session_state.running) |
|
|
|
if st.button("Run Entity Linking", key="run_button", disabled=st.session_state.running): |
|
if input_sentence_user and input_mention_user: |
|
|
|
if input_mention_user in input_sentence_user: |
|
with st.spinner("Applying Data Normalization module... (1/5)"): |
|
|
|
start_time = time.time() |
|
|
|
list_with_full_names = [] |
|
list_with_names_to_show = [] |
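                    # First LLM call: expand acronyms and split compound labels; the reply is
                    # expected as one "original: expanded" pair per line.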
|
|
|
response = client.chat.completions.create( |
|
messages=[ |
|
{ |
|
"role": "system", |
|
"content": """ |
|
I will give you one or more labels within a sentence. Your task is as follows: |
|
|
|
Identify each label in the sentence, and check if it is an acronym. |
|
|
|
If the label is an acronym, respond with the full name of the acronym. |
|
If the label is not an acronym, respond with the label exactly as it was given to you. |
|
If a label contains multiple terms (e.g., 'phase and DIC microscopy'), treat each term within the label as a separate label. |
|
|
|
This means you should identify and explain each part of the label individually. |
|
Each part should be on its own line in the response. |
|
Context-Specific Terms: If the sentence context suggests a relevant term that applies to each label (such as "study" in 'morphological, sedimentological, and stratigraphical study'), add that term to each label’s explanation. |
|
|
|
Use context clues to determine the appropriate term to add (e.g., 'study' or 'microscopy'). |
|
Output Format: Your response should contain only the explanations, formatted as follows: |
|
|
|
Each label or part of a label should be on a new line. |
|
Do not include any additional text, and do not repeat the original sentence. |
|
Example 1: |
|
|
|
Input: |
|
|
|
label: phase and DIC microscopy |
|
context: Tardigrades have been extracted from samples using centrifugation with Ludox AM™ and mounted on individual microscope slides in Hoyer's medium for identification under phase and DIC microscopy. |
|
Expected response: |
|
|
|
phase: phase microscopy |
|
DIC microscopy: Differential interference contrast microscopy |
|
Example 2: |
|
|
|
Input: |
|
|
|
label: morphological, sedimentological, and stratigraphical study |
|
context: This paper presents results of a morphological, sedimentological, and stratigraphical study of relict beach ridges formed on a prograded coastal barrier in Bream Bay, North Island New Zealand. |
|
Expected response: |
|
|
|
morphological: morphological study |
|
sedimentological: sedimentological study |
|
stratigraphical: stratigraphical study |
|
IMPORTANT: |
|
|
|
Each label, even if nested within another, should be treated as an individual item. |
|
Each individual label or acronym should be output on a separate line. |
|
""" |
|
}, |
|
{ |
|
"role": "user", |
|
"content": f"label:{input_mention_user}, context:{input_sentence_user}" |
|
} |
|
], |
|
temperature=1.0, |
|
top_p=1.0, |
|
max_tokens=1000, |
|
model=model_name |
|
) |
|
|
|
|
|
kati = response.choices[0].message.content.splitlines() |
|
print(response.choices[0].message.content) |
|
for i in kati: |
|
context = i.split(":")[-1].strip() |
|
original_name = i.split(":")[0].strip() |
|
list_with_full_names.append(context) |
|
list_with_names_to_show.append(original_name) |
|
|
|
name = ",".join(list_with_full_names) |
|
|
|
|
|
input_sentence_user = input_sentence_user.replace(input_mention_user, name) |
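                    # The mention in the sentence is replaced by its expanded form(s); a second
                    # LLM call then produces a short Wikipedia-style description for each label.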
|
|
|
response = client.chat.completions.create( |
|
messages=[ |
|
{ |
|
"role": "system", |
|
"content": "Given a label or labels within a sentence, provide a brief description (2-3 sentences) explaining what the label represents, similar to how a Wikipedia entry would. Format your response as follows: label: description. I want only the description of the label, not the role in the context. Include the label in the description as well. For example: Sentiment analysis: Sentiment analysis is the use of natural language processing, text analysis, computational linguistics, and biometrics to systematically identify, extract, quantify, and study affective states and subjective information.\nText analysis: Text mining, text data mining (TDM) or text analytics is the process of deriving high-quality information from text. It involves the discovery by computer of new, previously unknown information, by automatically extracting information from different written resources.", |
|
}, |
|
{ |
|
"role": "user", |
|
"content": f"label:{name}, context:{input_sentence_user}" |
|
} |
|
], |
|
temperature=1.0, |
|
top_p=1.0, |
|
max_tokens=1000, |
|
model=model_name |
|
) |
|
|
|
|
|
z = response.choices[0].message.content.splitlines() |
|
print(response.choices[0].message.content) |
|
list_with_contexts = [] |
|
for i in z: |
|
context = i.split(":")[-1].strip() |
|
list_with_contexts.append(context) |
|
st.write("✅ Applied Data Normilzation module (1/5)") |
|
|
|
async def big_main(mention, deep_search): |
|
mention = mention.split(",") |
|
with st.spinner("Applying Candidate Retrieval module... (2/5)"): |
|
for i in mention: |
|
await mains(i, deep_search) |
|
st.write("✅ Applied Candidate Retrieval module (2/5)") |
|
with st.spinner("Applying Information Gathering module... (3/5)"): |
|
for i in mention: |
|
await main(i) |
|
st.write("✅ Applied Information Gathering module (3/5)") |
|
|
|
asyncio.run(big_main(name, deep_search)) |
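                # Candidate Selection: embed each candidate's label and description and rank
                # candidates by the mean cosine similarity against the expanded mention and
                # its generated description.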
|
|
|
number = 0 |
|
for i,j,o in zip(list_with_full_names,list_with_contexts,list_with_names_to_show): |
|
number += 1 |
|
with st.spinner(f"Applying Candidate Selection module... (4/5) [{number}/{len(list_with_full_names)}] (This may take a while due to limited resources)"): |
|
with open(f"/home/user/app/info_extraction/{i}.json", "r") as f: |
|
json_file = json.load(f) |
|
lista = [] |
|
lista_1 = [] |
|
my_bar = st.progress(0) |
|
for index, element in enumerate(json_file): |
|
qid = element.get("qid") |
|
link = f"https://www.wikidata.org/wiki/{qid}" |
|
label = element.get("label") |
|
description = element.get("description") |
|
|
|
label_emb = model.encode([label]) |
|
desc_emb = model.encode([description]) |
|
|
|
lista.append({link: [label_emb, desc_emb]}) |
|
my_bar.progress((index + 1) / len(json_file)) |
|
print(qid) |
|
|
|
label_dataset_emb = model.encode([i]) |
|
desc_dataset_emb = model.encode([j]) |
|
|
|
for emb in lista: |
|
for k, v in emb.items(): |
|
cossim_label = model.similarity(label_dataset_emb, v[0][0]) |
|
desc_label = model.similarity(desc_dataset_emb, v[1][0]) |
|
emb_mean = np.mean([cossim_label, desc_label]) |
|
lista_1.append({k: emb_mean}) |
|
|
|
sorted_data = sorted(lista_1, key=lambda x: list(x.values())[0], reverse=True) |
|
|
|
my_bar.empty() |
|
st.write(f"✅ Applined Candidate Selection module (4/5) [{number}/{len(list_with_full_names)}]") |
|
with st.spinner(f"Applying Candidate Matching module... (5/5) [{number}/{len(list_with_full_names)}]"): |
|
if sorted_data: |
|
sorted_top = sorted_data[0] |
|
for k, v in sorted_top.items(): |
|
qid = k.split("/")[-1] |
|
|
|
wikidata2wikipedia = f""" |
|
SELECT ?wikipedia |
|
WHERE {{ |
|
?wikipedia schema:about wd:{qid} . |
|
?wikipedia schema:isPartOf <https://en.wikipedia.org/> . |
|
}} |
|
""" |
|
                            wikipedia = "None"
                            results = get_resultss(wikidata2wikipedia)
|
|
|
for result in results["results"]["bindings"]: |
|
for key, value in result.items(): |
|
wikipedia = value.get("value", "None") |
|
|
|
sparql = SPARQLWrapper("http://dbpedia.org/sparql") |
|
wikidata2dbpedia = f""" |
|
SELECT ?dbpedia |
|
WHERE {{ |
|
?dbpedia owl:sameAs <http://www.wikidata.org/entity/{qid}>. |
|
}} |
|
""" |
|
sparql.setQuery(wikidata2dbpedia) |
|
sparql.setReturnFormat(JSON) |
|
                            dbpedia = "None"
                            results = sparql.query().convert()
|
|
|
for result in results["results"]["bindings"]: |
|
dbpedia = result["dbpedia"]["value"] |
|
|
|
st.write(f"✅ Applied Candidate Matching module (5/5) [{number}/{len(list_with_full_names)}]") |
|
st.text(f"The correct entity for '{o}' is:") |
|
st.success(f"Wikipedia: {wikipedia}") |
|
st.success(f"Wikidata: {k}") |
|
st.success(f"DBpedia: {dbpedia}") |
|
else: |
|
st.warning(f"The entity: {o} is NIL.") |
|
else: |
|
st.warning(f"The mention '{input_mention_user}' was NOT found in the sentence.") |
|
else: |
|
st.warning("Please fill in both fields.") |
|
end_time = time.time() |
|
execution_time = end_time - start_time |
|
ETA = time.strftime("%H:%M:%S", time.gmtime(execution_time)) |
|
st.write(f"⌛ Execution time: {ETA}") |
|
|
|
st.button("Rerun", disabled=False) |
|
|
|
|
|
folder_path = "qids_folder" |
|
for filename in os.listdir(folder_path): |
|
file_path = os.path.join(folder_path, filename) |
|
os.remove(file_path) |
|
|
|
folder_path_1 = "info_extraction" |
|
for filename in os.listdir(folder_path_1): |
|
file_path = os.path.join(folder_path_1, filename) |
|
os.remove(file_path) |
|
|
|
if __name__ == "__main__": |
|
main_cli() |