|
|
|
import datetime |
|
|
|
import re |
|
import json |
|
import pandas as pd |
|
|
|
import gradio as gr |
|
|
|
from gliner import GLiNER |
|
|
|
from transformers import pipeline |
|
|
|
|
|
# Entity extractor: a GLiNER checkpoint, switched to evaluation mode right
# after loading.
model = GLiNER.from_pretrained("chris32/gliner_multi_pii_real_state-v2")
model.eval()

# Transformers pipeline built from the checkpoint named below, pinned to CPU.
model_name = "chris32/distilbert-base-spanish-uncased-finetuned-text-intelligence"
pipe = pipeline(model=model_name, device="cpu")

# A remodeling year is accepted only if it lies fewer than this many years
# before CURRENT_YEAR (see extract_remodeling_year_from_string).
YEAR_OF_REMODELING_LIMIT = 100
# Evaluated once, when the module is imported.
CURRENT_YEAR = datetime.date.today().year
# NOTE(review): not referenced anywhere in the visible part of this module.
SCORE_LIMIT_SIMILARITY_NAMES = 70
|
|
|
def clean_text(text):
    """Strip HTML markup from *text* and normalise its whitespace.

    Steps, applied in this order:

    1. ``<br>`` / ``<br/>`` tags are replaced by ``" # "`` so that line
       breaks remain visible as a separator.
    2. Every remaining ``<...>`` tag is removed.
    3. The ``&nbsp;`` and ``&amp;`` HTML entities are decoded.
    4. Runs of whitespace collapse to a single space and the ends are
       trimmed.
    5. ``".."`` becomes ``"."`` and ``",,"`` becomes ``","`` (single pass).

    Args:
        text: Raw listing description, possibly containing HTML.

    Returns:
        The cleaned string.
    """
    replacement_char = " # "
    text = re.sub(r'<br\s*\/?>', replacement_char, text)

    # Drop whatever markup is left after the <br> handling.
    cleaned_text = re.sub(r'<[^>]*>', '', text)

    # Decode the entities explicitly. &nbsp; goes first so that a literal
    # "&amp;nbsp;" ends up as the text "&nbsp;" instead of a space.
    cleaned_text = cleaned_text.replace("&nbsp;", " ").replace("&amp;", "&")

    cleaned_text = re.sub(r'\s+', ' ', cleaned_text)
    cleaned_text = cleaned_text.strip()

    # One pass only: "..." becomes "..", not ".".
    cleaned_text = cleaned_text.replace("..", ".").replace(",,", ",")

    return cleaned_text
|
|
|
def format_gliner_predictions(prediction):
    """Flatten a list of entity predictions into one dictionary.

    Each item of *prediction* is read through its ``label``, ``text``,
    ``score``, ``start`` and ``end`` fields. Only the highest-scoring item
    of every label is kept, and it contributes three keys:

    * ``pred_<label>``: the entity text
    * ``prob_<label>``: the entity score
    * ``pos_<label>``:  the ``(start, end)`` tuple

    Returns an empty dictionary when *prediction* is empty.
    """
    if len(prediction) == 0:
        return dict()

    # Sorting by score first makes "keep first" mean "keep best".
    best = pd.DataFrame(prediction).sort_values("score", ascending=False)
    best = best.drop_duplicates(subset="label", keep="first")

    best["position"] = best.apply(
        lambda span: (span["start"], span["end"]), axis=1
    )

    # Same insertion order as before: all texts, then scores, then positions.
    predictions_formatted = {}
    for prefix, column in (("pred_", "text"), ("prob_", "score"), ("pos_", "position")):
        keys = best["label"].map(lambda name: f"{prefix}{name}")
        predictions_formatted.update(best.set_index(keys)[column].to_dict())

    return predictions_formatted
|
|
|
def clean_prediction(row, feature_name, threshols_dict, clean_functions_dict):
    """Return the cleaned prediction of one feature, or None if its score is too low.

    NOTE: this module defines ``clean_prediction`` a second time further
    down with the same body; that later definition is the one in effect
    once the module has been fully loaded.

    Args:
        row: Mapping that holds ``pred_<feature_name>`` and
            ``prob_<feature_name>``.
        feature_name: Label whose prediction is being cleaned.
        threshols_dict: Minimum score per label (strictly greater passes).
        clean_functions_dict: Cleaning callable per label.

    Returns:
        The cleaning callable's result, or None when the score does not
        exceed the threshold.
    """
    raw_value = row[f"pred_{feature_name}"]
    confidence = row[f"prob_{feature_name}"]

    if confidence > threshols_dict[feature_name]:
        return clean_functions_dict[feature_name](raw_value)
    return None
|
|
|
# Substrings that disqualify a surface prediction: extract_surface_from_string
# returns None when the lower-cased text contains any of them.
surfaces_words_to_omit = [
    "ha",
    "hect",
    "lts",
    "litros",
    "mil",
]

# NOTE(review): not referenced anywhere in the visible part of this module.
tower_name_key_words_to_keep = [
    "torr",
    "towe",
]
|
|
|
def has_number(string):
    """Return True when *string* contains at least one digit."""
    return re.search(r'\d', string) is not None
|
|
|
def contains_multiplication(string):
    """Return True when *string* looks like ``<number> ... x <number>``.

    A match needs a number (digits and commas, optional decimal part),
    optionally followed by words, then ``x`` or ``X``, then a second number,
    e.g. ``"10x20"`` or ``"8 mts X 20 mts"``.

    The "optional words" part is written as ``(?:\\w[\\w\\s]*)?``. It accepts
    exactly the same text as the nested form ``(?:\\w+\\s*)*`` but cannot
    split one run of word characters in exponentially many ways, so a long
    word with no ``x`` in it no longer stalls the regex engine.
    """
    pattern = (
        r'\b([\d,]+(?:\.\d+)?)\s*(?:\w[\w\s]*)?'
        r'[xX]\s*([\d,]+(?:\.\d+)?)\s*(?:\w[\w\s]*)?\b'
    )
    return re.search(pattern, string) is not None
|
|
|
def extract_first_number_from_string(text):
    """Locate the first number in *text*.

    Returns:
        ``(number, start, end)`` where *number* is the matched text parsed
        as a float and truncated to ``int``, and *start* / *end* are the
        match offsets in *text*. ``(None, None, None)`` when *text* is not
        a string or holds no number.
    """
    not_found = (None, None, None)
    if not isinstance(text, str):
        return not_found

    found = re.search(r'\b\d*\.?\d+\b|\d*\.?\d+', text)
    if found is None:
        return not_found

    start_pos, end_pos = found.span()
    return int(float(found.group())), start_pos, end_pos
|
|
|
def get_character(string, index):
    """Return ``string[index]``, or None when *index* is at or past the end."""
    return string[index] if index < len(string) else None
|
|
|
def find_valid_comma_separated_number(string):
    """Parse a leading ``d,ddd``-style number (one thousands separator).

    *string* must start with one to three digits, a comma and exactly three
    digits, followed either by the end of the string or by a character that
    is neither a digit nor a comma.

    Returns:
        The number as an ``int`` with the comma removed, or None when the
        start of *string* does not have that shape.
    """
    grouped = re.match(r'^(\d{1,3},\d{3})(?:[^0-9,]|$)', string)
    if grouped is None:
        return None
    return int(grouped.group(1).replace(",", ""))
|
|
|
def extract_surface_from_string(string: str) -> int:
    """Extract a surface value from an entity string.

    None is returned (despite the ``int`` annotation) when:

    * *string* is not a ``str`` or contains no digit,
    * it looks like a multiplication (``contains_multiplication``),
    * its lower-cased form contains any entry of ``surfaces_words_to_omit``
      (plain substring test),
    * no number can be located, or
    * the first number is followed by a comma but the text from that number
      onwards is not a valid ``d,ddd`` group.

    Otherwise the first number is returned as an ``int``; for a valid
    ``d,ddd`` group the comma is treated as a thousands separator.
    """
    if not isinstance(string, str):
        return None

    if not has_number(string):
        return None

    if contains_multiplication(string):
        return None

    lowered = string.lower()
    if any(word in lowered for word in surfaces_words_to_omit):
        return None

    number, start_pos, end_pos = extract_first_number_from_string(string)
    if not isinstance(number, int):
        return None

    if get_character(string, end_pos) == ",":
        # Hand over everything from the number to the END of the string.
        # Cutting the last character off would turn "1,200" into "1,20"
        # (rejected) and "1,2000" into "1,200" (wrongly accepted).
        return find_valid_comma_separated_number(string[start_pos:])

    return number
|
|
|
def clean_prediction(row, feature_name, threshols_dict, clean_functions_dict):
    """Apply the feature's cleaning function when its score clears the threshold.

    Args:
        row: Mapping with the ``pred_<feature_name>`` and
            ``prob_<feature_name>`` entries.
        feature_name: Label to process.
        threshols_dict: Per-label score threshold; the score has to be
            strictly greater to pass.
        clean_functions_dict: Per-label cleaning callable.

    Returns:
        Whatever the cleaning callable returns, or None when the score is
        not above the threshold.
    """
    prediction, prob = row[f"pred_{feature_name}"], row[f"prob_{feature_name}"]

    if prob > threshols_dict[feature_name]:
        clean_function = clean_functions_dict[feature_name]
        return clean_function(prediction)

    return None
|
|
|
def extract_remodeling_year_from_string(string):
    """Derive a remodeling year from an entity string.

    A standalone four-digit number is taken as the year. Failing that, a
    ``"<n> <year word>"`` expression (year, years, anio, año, ...) is read
    as "n years ago" and subtracted from ``CURRENT_YEAR``.

    Returns:
        The year when it is not in the future and lies fewer than
        ``YEAR_OF_REMODELING_LIMIT`` years back; None otherwise, and also
        when *string* is not a ``str`` or neither pattern matches.
    """
    if not isinstance(string, str):
        return None

    four_digits = re.search(r'\b\d{4}\b', string)
    if four_digits:
        year_predicted = int(four_digits.group())
    else:
        elapsed = re.search(r'(\d+) (year|years|anio|año|an|añ)', string.lower(), re.IGNORECASE)
        if not elapsed:
            return None
        year_predicted = CURRENT_YEAR - int(elapsed.group(1))

    years_ago = CURRENT_YEAR - year_predicted
    if 0 <= years_ago < YEAR_OF_REMODELING_LIMIT:
        return year_predicted
    return None
|
|
|
def extract_valid_string_left_dotted(string, text, pos):
    """Re-attach a ``<digits>.`` prefix that was left outside an entity span.

    An entity such as ``"200 m2"`` taken from ``"... 1.200 m2"`` has lost
    its leading ``"1."``. When *string* starts with a digit and is preceded
    in *text* by a digit followed by a dot, up to five characters to its
    left are inspected:

    * if they are non-digits followed by one to three digits and the dot,
      the match (left context included) is returned;
    * otherwise None is returned, because the entity is the tail of a
      longer number that cannot be reconstructed.

    In every other case *string* is returned unchanged. This includes
    entities that sit within the first five characters of *text*: they are
    handled with a shorter left window instead of being discarded.

    Args:
        string: Entity text; anything that is not a ``str`` yields None.
        text: Full text the entity was extracted from.
        pos: ``(start, end)`` offsets of the entity inside *text*.

    Returns:
        The possibly extended entity text, or None.
    """
    if not isinstance(string, str):
        return None

    left_pos, right_pos = pos

    # Only number-like entities can have lost a dotted prefix.
    if not string or not string[0].isdigit():
        return string

    # A "<digit>." prefix needs at least two characters on the left.
    if left_pos < 2 or text[left_pos - 1] != "." or not text[left_pos - 2].isdigit():
        return string

    window_start = left_pos - 5
    if window_start >= 0:
        # Full five-character window: reject it when it begins inside a
        # longer number (digit, dot or comma as first character).
        prefix_guard = r'^(?![\d.,])'
    else:
        # The window is cut short by the start of the text, so nothing can
        # precede a leading digit; only a leading separator is suspicious.
        window_start = 0
        prefix_guard = r'^(?![.,])'

    sub_text = text[window_start:right_pos]
    pattern = prefix_guard + r'\D*\d{1,3}\.' + re.escape(string)
    match = re.search(pattern, sub_text)
    return match.group(0) if match else None
|
|
|
|
|
# Cleaning callable per label: surfaces are parsed into integers, the
# remodeling year into a year, and name labels pass through unchanged.
clean_functions_dict = {
    **{
        surface_label: extract_surface_from_string
        for surface_label in (
            "SUPERFICIE_TERRAZA",
            "SUPERFICIE_JARDIN",
            "SUPERFICIE_TERRENO",
            "SUPERFICIE_HABITABLE",
            "SUPERFICIE_BALCON",
        )
    },
    "AÑO_REMODELACIÓN": extract_remodeling_year_from_string,
    **{
        name_label: (lambda x: x)
        for name_label in (
            "NOMBRE_COMPLETO_ARQUITECTO",
            'NOMBRE_CLUB_GOLF',
            'NOMBRE_TORRE',
            'NOMBRE_CONDOMINIO',
            'NOMBRE_DESARROLLO',
        )
    },
}
|
|
|
# Minimum score per label. clean_prediction keeps a prediction only when its
# score is strictly greater than the value listed here.
#
# A uniform 0.9 table used to be assigned to this name immediately before
# this one; it was overwritten before anything could read it, so only the
# effective values remain.
#
# The subtractions are kept as written so the resulting floats are unchanged.
# NOTE(review): they look like manual relaxations of tuned values - confirm.
threshols_dict = {
    "SUPERFICIE_BALCON": 0.7697697697697697,
    "SUPERFICIE_TERRAZA": 0.953953953953954,
    "SUPERFICIE_JARDIN": 0.9519519519519519,
    "SUPERFICIE_TERRENO": 0.980980980980981 - 0.05,
    "SUPERFICIE_HABITABLE": 0.978978978978979 - 0.02,
    "AÑO_REMODELACIÓN": 0.996996996996997 - 0.01,
    "NOMBRE_COMPLETO_ARQUITECTO": 0.8878878878878879,
    "NOMBRE_CLUB_GOLF": 0.8708708708708709,
    "NOMBRE_TORRE": 0.8458458458458459 - 0.04,
    "NOMBRE_CONDOMINIO": 0.965965965965966,
    "NOMBRE_DESARROLLO": 0.9229229229229229,
}
|
|
|
# Classifier label -> value reported as NIVELES_CASA. LABEL_0 maps to None,
# LABEL_1..LABEL_3 map to the integers 1..3.
label_names_dict = {
    'LABEL_0': None,
    **{f"LABEL_{level}": level for level in (1, 2, 3)},
}

# clean_prediction_bert accepts a label only when its score is strictly
# greater than this limit.
BERT_SCORE_LIMIT = 0.980819808198082
|
|
|
def extract_max_label_score(probabilities):
    """Return ``(label, score)`` of the highest-scoring entry.

    *probabilities* is an iterable of mappings with ``'label'`` and
    ``'score'`` keys. On a tie the earliest entry wins.
    """
    best = max(probabilities, key=lambda entry: entry['score'])
    return best['label'], best['score']
|
|
|
def clean_prediction_bert(label, score):
    """Translate a classifier label through ``label_names_dict``.

    Returns None when *score* is not strictly above ``BERT_SCORE_LIMIT`` or
    when *label* has no entry in ``label_names_dict``.
    """
    return label_names_dict.get(label, None) if score > BERT_SCORE_LIMIT else None
|
|
|
|
|
# Keyword arguments forwarded to every `pipe(...)` call in generate_answer.
pipe_config = dict(
    batch_size=8,
    truncation=True,
    max_length=250,
    add_special_tokens=True,
    return_all_scores=True,
    padding=True,
)
|
|
|
def generate_answer(text):
    """Run both models on *text* and return a printable report.

    Pipeline:

    1. Clean the text with ``clean_text``.
    2. Ask ``model`` for entities of the labels listed below (threshold
       0.4) and flatten them with ``format_gliner_predictions``.
    3. For surface labels, repair entities that lost a dotted prefix
       (``extract_valid_string_left_dotted``).
    4. Clean every predicted label with ``clean_prediction``; only ``str``
       and ``int`` results are kept.
    5. Run ``pipe`` on the text, store its best label / score under
       ``NIVELES_CASA`` / ``prob_NIVELES_CASA`` in the raw result, and keep
       the cleaned value when it is an ``int``.

    Returns:
        A string holding the cleaned result and the raw result, both as
        indented JSON.
    """
    labels = [
        'SUPERFICIE_JARDIN',
        'NOMBRE_CLUB_GOLF',
        'SUPERFICIE_TERRENO',
        'SUPERFICIE_HABITABLE',
        'SUPERFICIE_TERRAZA',
        'NOMBRE_COMPLETO_ARQUITECTO',
        'SUPERFICIE_BALCON',
        'NOMBRE_DESARROLLO',
        'NOMBRE_TORRE',
        'NOMBRE_CONDOMINIO',
        'AÑO_REMODELACIÓN'
    ]
    feature_surfaces = [
        'SUPERFICIE_BALCON',
        'SUPERFICIE_TERRAZA',
        'SUPERFICIE_JARDIN',
        'SUPERFICIE_TERRENO',
        'SUPERFICIE_HABITABLE',
    ]

    text = clean_text(text)

    # Entity extraction and flattening.
    entities = model.predict_entities(text, labels, threshold=0.4)
    entities_formatted = format_gliner_predictions(entities)

    # Surface entities may have lost a "<digits>." prefix; try to restore it.
    for feature_name in feature_surfaces:
        pred_key = f"pred_{feature_name}"
        if entities_formatted.get(pred_key, None) is not None:
            entities_formatted[pred_key] = extract_valid_string_left_dotted(
                entities_formatted[pred_key],
                text,
                entities_formatted[f"pos_{feature_name}"],
            )

    # Recover the label names from the pred_/prob_/pos_ keys.
    feature_names = {
        key.replace("pred_", "").replace("prob_", "").replace("pos_", "")
        for key in entities_formatted
    }
    entities_cleaned = {}
    for feature_name in feature_names:
        cleaned = clean_prediction(
            entities_formatted, feature_name, threshols_dict, clean_functions_dict
        )
        if isinstance(cleaned, (str, int)):
            entities_cleaned[feature_name] = cleaned

    # Classifier prediction, reported as NIVELES_CASA.
    predictions = pipe([text], **pipe_config)
    label, score = extract_max_label_score(predictions[0])
    entities_formatted["NIVELES_CASA"] = label
    entities_formatted["prob_NIVELES_CASA"] = score
    levels = clean_prediction_bert(label, score)
    if isinstance(levels, int):
        entities_cleaned["NIVELES_CASA"] = levels

    clean_json = json.dumps(entities_cleaned, indent=4, ensure_ascii=False)
    raw_json = json.dumps(entities_formatted, indent=4, ensure_ascii=False)
    return "".join(["Clean Result:", clean_json, "\n \n", "Raw Result:", raw_json])
|
|
|
|
|
|
|
|
|
# Text-in / text-out Gradio UI around generate_answer; it is launched as soon
# as this module runs.
iface = gr.Interface(
    generate_answer,
    "text",
    "text",
    title="Text Intelligence for Real State",
    description="Input text describing the property.",
)
iface.launch()
|
|