# address_matcher/tools/recordlinkage_funcs.py
# Commit 8c90944 (seanpedrickcase): Allowed for custom output folder. Upgraded Gradio version.
import pandas as pd
from typing import Type, Dict, List, Tuple
import recordlinkage
from datetime import datetime
PandasDataFrame = Type[pd.DataFrame]
PandasSeries = Type[pd.Series]
MatchedResults = Dict[str,Tuple[str,int]]
array = List[str]
today = datetime.now().strftime("%d%m%Y")
today_rev = datetime.now().strftime("%Y%m%d")
from tools.constants import score_cut_off_nnet_street
# ## Recordlinkage matching functions
def compute_match(predict_df_search, ref_search, orig_search_df, matching_variables,
                    text_columns, blocker_column, weights, fuzzy_method):
    """Score candidate address pairs between the search and reference datasets.

    Blocks candidate pairs on ``blocker_column``, then compares each column in
    ``matching_variables``: exact matching for non-text columns, fuzzy string
    similarity for ``text_columns`` (always levenshtein for Postcode,
    ``fuzzy_method`` otherwise).

    NOTE(review): mutates ``predict_df_search`` and ``ref_search`` in place
    (string conversion, sentinel removal, blank handling) — the later joins in
    score_based_match appear to rely on these mutated frames, so do not copy
    here without checking callers.

    Returns a DataFrame of per-column similarity scores indexed by
    (search row, reference row), or an empty DataFrame if blocking produced
    no candidate pairs. ``orig_search_df`` and ``weights`` are unused here.
    """
    # Use the merge command to match group1 and group2
    predict_df_search[matching_variables] = predict_df_search[matching_variables].astype(str)
    # "-999" looks like a missing-value sentinel in the reference data — replaced with empty string
    ref_search[matching_variables] = ref_search[matching_variables].astype(str).replace("-999","")
    # SaoText needs to be exactly the same to get a 'full' match. So I moved that to the exact match group
    exact_columns = list(set(matching_variables) - set(text_columns))
    # Replace all blanks with a space, so they can be included in the fuzzy match searches
    for column in text_columns:
        predict_df_search.loc[predict_df_search[column] == '', column] = ' '
        ref_search.loc[ref_search[column] == '', column] = ' '
    # Score based match functions
    # Create an index of all pairs
    indexer = recordlinkage.Index()
    # Block on selected blocker column
    ## Remove all NAs from predict_df blocker column
    if blocker_column[0] == "PaoStartNumber":
        # Drop rows whose blocker value is NA, empty, or whitespace-only — they cannot block usefully
        predict_df_search = predict_df_search[~(predict_df_search[blocker_column[0]].isna()) & ~(predict_df_search[blocker_column[0]] == '')& ~(predict_df_search[blocker_column[0]].str.contains(r'^\s*$', na=False))]
    indexer.block(blocker_column) #matchkey.block(["Postcode", "PaoStartNumber"])
    # Generate candidate pairs
    pairsSBM = indexer.index(predict_df_search,ref_search)
    print('Running with ' + blocker_column[0] + ' as blocker has created', len(pairsSBM), 'pairs.')
    # If no pairs are found, break
    if len(pairsSBM) == 0: return pd.DataFrame()
    # Call the compare class from the toolkit
    compareSBM = recordlinkage.Compare()
    # Assign variables to matching technique - exact
    for columns in exact_columns:
        compareSBM.exact(columns, columns, label = columns, missing_value = 0)
    # Assign variables to matching technique - fuzzy
    for columns in text_columns:
        if columns == "Postcode":
            # Postcodes are always compared with levenshtein, regardless of fuzzy_method
            compareSBM.string(columns, columns, label = columns, missing_value = 0, method = "levenshtein")
        else:
            compareSBM.string(columns, columns, label = columns, missing_value = 0, method = fuzzy_method)
    ## Run the match - compare each column within the blocks according to exact or fuzzy matching (defined in cells above)
    scoresSBM = compareSBM.compute(pairs = pairsSBM, x = predict_df_search, x_link = ref_search)
    return scoresSBM
def calc_final_nnet_scores(scoresSBM, weights, matching_variables):
    """Weight the per-column similarity scores and compute percentage scores.

    Parameters
    ----------
    scoresSBM : DataFrame of per-column similarity scores, indexed by a
        (level_0, level_1) MultiIndex of (search row, reference row).
    weights : dict mapping column name -> weight; multiplied onto the
        matching columns (pandas aligns dict keys to column names).
    matching_variables : list of the comparison columns to sum into 'score'.

    Returns
    -------
    DataFrame with the original comparison columns plus 'score', 'score_max',
    'score_perc' (rounded to 1 dp) and 'score_perc_max' (the best 'score_perc'
    for each search address), sorted by level_0/score_perc descending.
    """
    # Modify the output scores by the weights set at the start of the code
    scoresSBM_w = scoresSBM * weights
    ### Determine matched rows that score above a threshold
    # Sum all weighted comparison columns into a single score
    scoresSBM_r = scoresSBM_w
    scoresSBM_r['score'] = scoresSBM_r[matching_variables].sum(axis = 1)
    scoresSBM_r['score_max'] = sum(weights.values())
    scoresSBM_r['score_perc'] = (scoresSBM_r['score'] / scoresSBM_r['score_max'])*100
    scoresSBM_r = scoresSBM_r.reset_index()
    # Rename the index if misnamed
    scoresSBM_r = scoresSBM_r.rename(columns={"index":"level_1"}, errors = "ignore")
    # Sort all comparisons by score in descending order
    scoresSBM_r = scoresSBM_r.sort_values(by=["level_0","score_perc"], ascending = False)
    # Max score per search address (level_0), to join back on.
    # BUGFIX: previously .max("score_perc") passed the string positionally into
    # the boolean `numeric_only` parameter (worked only by truthiness); select
    # the column explicitly instead. The redundant extra reset_index() is gone.
    scoresSBM_g = (scoresSBM_r.groupby("level_0", as_index=False)["score_perc"].max()
                   .rename(columns={"score_perc": "score_perc_max"}))
    scoresSBM_search = scoresSBM_r.merge(scoresSBM_g, on = "level_0", how="left")
    scoresSBM_search['score_perc'] = round(scoresSBM_search['score_perc'],1).astype(float)
    scoresSBM_search['score_perc_max'] = round(scoresSBM_search['score_perc_max'],1).astype(float)
    return scoresSBM_search
def join_on_pred_ref_details(scoresSBM_search_m, ref_search, predict_df_search):
    """Attach reference and search address details to the match-score rows.

    Reference rows are joined via the 'level_1' column (reference index) and
    search rows via 'level_0' (prediction index). Where column names clash,
    the reference side keeps the bare name and the prediction side gains a
    '_pred' suffix. Columns are returned in alphabetical order.
    """
    with_ref = scoresSBM_search_m.merge(
        ref_search, left_on="level_1", right_index=True, how="left", suffixes=("", "_ref")
    )
    with_both = with_ref.merge(
        predict_df_search, left_on="level_0", right_index=True, how="left", suffixes=("", "_pred")
    )
    return with_both.reindex(sorted(with_both.columns), axis=1)
def rearrange_columns(scoresSBM_search_m_j, new_join_col, search_df_key_field, blocker_column, standardise):
    """Rename working columns to output names and fix the export column order.

    Returns (scoresSBM_out, start_columns): the trimmed, ordered output frame
    and the list of priority columns placed first. ``blocker_column`` and
    ``standardise`` are accepted for interface compatibility but unused here.
    """
    # Priority columns first; everything else follows alphabetically.
    start_columns = new_join_col.copy()
    start_columns += ["address", "fulladdress", "level_0", "level_1",
                      "score", "score_max", "score_perc", "score_perc_max"]
    remaining = sorted(set(scoresSBM_search_m_j.columns) - set(start_columns))
    scoresSBM_search_m_j = scoresSBM_search_m_j.reindex(start_columns + remaining, axis=1)

    # Map internal working names onto the externally-reported names.
    output_names = {
        'address': 'address_pred',
        'fulladdress': 'address_ref',
        'level_0': 'index_pred',
        'level_1': 'index_ref',
        'score': 'match_score',
        'score_max': 'max_possible_score',
        'score_perc': 'perc_weighted_columns_matched',
        'score_perc_max': 'perc_weighted_columns_matched_max_for_pred_address',
    }
    scoresSBM_search_m_j = scoresSBM_search_m_j.rename(columns=output_names)
    scoresSBM_search_m_j = scoresSBM_search_m_j.sort_values("index_pred", ascending=True)

    # ref_index is just a duplicate of index_ref, needed for outputs
    scoresSBM_search_m_j["ref_index"] = scoresSBM_search_m_j["index_ref"]

    final_cols = new_join_col.copy()
    final_cols += [search_df_key_field, 'full_match_score_based', 'address_pred', 'address_ref',
                   'match_score', 'max_possible_score', 'perc_weighted_columns_matched',
                   'perc_weighted_columns_matched_max_for_pred_address',
                   'Organisation', 'Organisation_ref', 'Organisation_pred',
                   'SaoText', 'SaoText_ref', 'SaoText_pred',
                   'SaoStartNumber', 'SaoStartNumber_ref', 'SaoStartNumber_pred',
                   'SaoStartSuffix', 'SaoStartSuffix_ref', 'SaoStartSuffix_pred',
                   'SaoEndNumber', 'SaoEndNumber_ref', 'SaoEndNumber_pred',
                   'SaoEndSuffix', 'SaoEndSuffix_ref', 'SaoEndSuffix_pred',
                   'PaoStartNumber', 'PaoStartNumber_ref', 'PaoStartNumber_pred',
                   'PaoStartSuffix', 'PaoStartSuffix_ref', 'PaoStartSuffix_pred',
                   'PaoEndNumber', 'PaoEndNumber_ref', 'PaoEndNumber_pred',
                   'PaoEndSuffix', 'PaoEndSuffix_ref', 'PaoEndSuffix_pred',
                   'PaoText', 'PaoText_ref', 'PaoText_pred',
                   'Street', 'Street_ref', 'Street_pred',
                   'PostTown', 'PostTown_ref', 'PostTown_pred',
                   'Postcode', 'Postcode_ref', 'Postcode_pred', 'Postcode_predict',
                   'index_pred', 'index_ref', 'Reference file']
    scoresSBM_out = scoresSBM_search_m_j[final_cols]
    return scoresSBM_out, start_columns
def create_matched_results_nnet(scoresSBM_best, search_df_key_field, orig_search_df, new_join_col, standardise, ref_search, blocker_column, score_cut_off):
    """Build the final 'matched output' table from the best score-based matches.

    Joins the best-scoring reference candidates back onto the original search
    addresses, applies the shared diagnostic shortlist logic
    (create_diag_shortlist), and returns one row per search/reference pairing
    in the standard matched-output column layout, sorted by the search key.

    NOTE(review): mutates ``scoresSBM_best`` (adds address columns) and may
    add a 'ref_index' column to ``ref_search`` — the guard tests for an
    'index' column but creates 'ref_index'; confirm that asymmetry is
    intentional.

    BUGFIX: the two whitespace-collapsing regexes are now raw strings
    (r"\\s{2,}") — previously "\\s" in a plain string relied on Python leaving
    unknown escapes alone, which raises SyntaxWarning on Python 3.12+.
    """
    ### Make the final 'matched output' file
    # Rebuild a readable predicted address from the *_pred comparison columns
    # (iloc[:, 1:-1] skips the first and last *_pred columns).
    scoresSBM_best_pred_cols = scoresSBM_best.filter(regex='_pred$').iloc[:,1:-1]
    scoresSBM_best["search_orig_address"] = (scoresSBM_best_pred_cols.agg(' '.join, axis=1)).str.strip().str.replace(r"\s{2,}", " ", regex=True)
    # Same reconstruction for the reference side
    scoresSBM_best_ref_cols = scoresSBM_best.filter(regex='_ref$').iloc[:,1:-1]
    scoresSBM_best['reference_mod_address'] = (scoresSBM_best_ref_cols.agg(' '.join, axis=1)).str.strip().str.replace(r"\s{2,}", " ", regex=True)
    ## Create matched output df
    # Literal "nan" strings (from earlier astype(str)) are scrubbed to empty
    matched_output_SBM = orig_search_df[[search_df_key_field, "full_address", "postcode", "property_number", "prop_number", "flat_number", "apart_number", "block_number", 'unit_number', "room_number", "house_court_name"]].replace(r"\bnan\b", "", regex=True).infer_objects(copy=False)
    matched_output_SBM[search_df_key_field] = matched_output_SBM[search_df_key_field].astype(str)
    ### Join on the best score-based candidate details
    matched_output_SBM = matched_output_SBM.merge(scoresSBM_best[[search_df_key_field, 'index_ref','address_ref',
    'full_match_score_based', 'Reference file']], on = search_df_key_field, how = "left").\
    rename(columns={"full_address":"search_orig_address"})
    # Ensure a ref_index column exists for the outputs (see NOTE above)
    if 'index' not in ref_search.columns:
        ref_search['ref_index'] = ref_search.index
    # Join on reference address component columns for the diagnostics
    matched_output_SBM = matched_output_SBM.merge(ref_search.drop_duplicates("fulladdress")[["ref_index", "fulladdress", "Postcode", "property_number", "prop_number", "flat_number", "apart_number", "block_number", 'unit_number', "room_number", "house_court_name", "ref_address_stand"]], left_on = "address_ref", right_on = "fulladdress", how = "left", suffixes=('_search', '_reference')).rename(columns={"fulladdress":"reference_orig_address", "ref_address_stand":"reference_list_address"})
    # To replace with number check
    matched_output_SBM = matched_output_SBM.rename(columns={"full_match_score_based":"full_match"})
    matched_output_SBM['property_number_match'] = matched_output_SBM['full_match']
    scores_SBM_best_cols = [search_df_key_field, 'full_match_score_based', 'perc_weighted_columns_matched', 'address_pred']#, "reference_mod_address"]
    scores_SBM_best_cols.extend(new_join_col)
    matched_output_SBM_b = scoresSBM_best[scores_SBM_best_cols]
    matched_output_SBM = matched_output_SBM.merge(matched_output_SBM_b.drop_duplicates(search_df_key_field), on = search_df_key_field, how = "left")
    # Imported here rather than at module level — presumably to avoid a
    # circular import with tools.fuzzy_match; TODO confirm
    from tools.fuzzy_match import create_diag_shortlist
    matched_output_SBM = create_diag_shortlist(matched_output_SBM, "search_orig_address", score_cut_off, blocker_column, fuzzy_col='perc_weighted_columns_matched', search_mod_address="address_pred", resolve_tie_breaks=False)
    matched_output_SBM['standardised_address'] = standardise
    matched_output_SBM = matched_output_SBM.rename(columns={"address_pred":"search_mod_address",
                                                            'perc_weighted_columns_matched':"fuzzy_score"})
    # Standard matched-output column layout
    matched_output_SBM_cols = [search_df_key_field, 'search_orig_address','reference_orig_address',
    'full_match',
    'full_number_match',
    'flat_number_match',
    'room_number_match',
    'block_number_match',
    'property_number_match',
    'close_postcode_match',
    'house_court_name_match',
    'fuzzy_score_match',
    "fuzzy_score",
    'property_number_search', 'property_number_reference',
    'flat_number_search', 'flat_number_reference',
    'room_number_search', 'room_number_reference',
    'block_number_search', 'block_number_reference',
    "unit_number_search","unit_number_reference",
    'house_court_name_search', 'house_court_name_reference',
    "search_mod_address", 'reference_mod_address','Postcode', 'postcode', 'ref_index', 'Reference file']
    matched_output_SBM_cols.extend(new_join_col)
    matched_output_SBM_cols.extend(['standardised_address'])
    matched_output_SBM = matched_output_SBM[matched_output_SBM_cols]
    matched_output_SBM = matched_output_SBM.sort_values(search_df_key_field, ascending=True)
    return matched_output_SBM
def score_based_match(predict_df_search, ref_search, orig_search_df, matching_variables, text_columns, blocker_column, weights, fuzzy_method, score_cut_off, search_df_key_field, standardise, new_join_col, score_cut_off_nnet_street=score_cut_off_nnet_street):
    """Run the full score-based (recordlinkage) matching pipeline.

    Blocks and scores candidate pairs, keeps the top-scoring candidates per
    search address, flags full matches against the relevant cut-off, and
    builds the final matched-output table.

    Returns (scoresSBM_best, matched_output_SBM_best); both are empty
    DataFrames when blocking produces no candidate pairs.
    """
    pair_scores = compute_match(predict_df_search, ref_search, orig_search_df,
                                matching_variables, text_columns, blocker_column,
                                weights, fuzzy_method)
    # No candidate pairs found -> nothing to match
    if pair_scores.empty:
        return pd.DataFrame(), pd.DataFrame()

    ranked = calc_final_nnet_scores(pair_scores, weights, matching_variables)
    # Keep only the highest-scoring candidate(s) for each search address
    top_ranked = ranked[ranked["score_perc"] == ranked["score_perc_max"]]
    joined = join_on_pred_ref_details(top_ranked, ref_search, predict_df_search)

    # Street blocking is more error-prone, so it uses its own (higher) threshold
    threshold = score_cut_off_nnet_street if blocker_column[0] == "Street" else score_cut_off
    joined['full_match_score_based'] = joined['score_perc'] >= threshold

    ### Reorder some columns
    scoresSBM_out, _start_cols = rearrange_columns(joined, new_join_col, search_df_key_field, blocker_column, standardise)
    matched_output = create_matched_results_nnet(scoresSBM_out, search_df_key_field, orig_search_df, new_join_col, standardise, ref_search, blocker_column, score_cut_off)

    # One row per search address, preferring full matches
    matched_output_best = matched_output.sort_values(
        [search_df_key_field, "full_match"], ascending=[True, False]
    ).drop_duplicates(search_df_key_field)
    scoresSBM_best = scoresSBM_out[scoresSBM_out[search_df_key_field].isin(matched_output_best[search_df_key_field])]
    return scoresSBM_best, matched_output_best
def check_matches_against_fuzzy(match_results, scoresSBM, search_df_key_field):
    """Compare score-based (model) matches against earlier fuzzy-match results.

    Parameters
    ----------
    match_results : DataFrame of fuzzy-match output (may be empty); expected
        to contain a 'full_match' flag plus the fuzzy diagnostic columns.
    scoresSBM : DataFrame of score-based match output (rearrange_columns
        layout, including 'full_match_score_based').
    search_df_key_field : name of the unique search-address key column.

    Returns
    -------
    (scoresSBM_m_model_add_matches, scoresSBM_t_model_failed, match_results_out):
        matches the model found that fuzzy matching missed; matches fuzzy
        matching found that the model missed; and the fuzzy results with a
        combined 'full_match_fuzzy_or_score_based' flag (empty when
        match_results was empty).
    """
    if not match_results.empty:
        # BUGFIX: the guard previously checked/created 'fuzz_full_match'
        # BEFORE add_prefix("fuzz_"), so the created column was renamed to a
        # useless 'fuzz_fuzz_full_match' and never protected the later
        # 'fuzz_full_match' lookups. Check the pre-prefix name instead, and
        # use .assign() so the caller's frame is not mutated.
        if 'full_match' not in match_results.columns:
            match_results = match_results.assign(full_match=False)
        # Prefix every fuzzy column, but keep the join key un-prefixed
        match_results = match_results.add_prefix("fuzz_").rename(columns={"fuzz_" + search_df_key_field: search_df_key_field})
        # Merge fuzzy match full matches onto model data
        scoresSBM_m = scoresSBM.merge(match_results.drop_duplicates(search_df_key_field), on = search_df_key_field, how = "left")
    else:
        # No fuzzy results: add placeholder fuzzy columns on a copy so the
        # caller's scoresSBM is not mutated (previously this aliased it).
        scoresSBM_m = scoresSBM.copy()
        scoresSBM_m["fuzz_full_match"] = False
        scoresSBM_m['fuzz_fuzzy_score_match'] = False
        scoresSBM_m['fuzz_property_number_match'] = False
        scoresSBM_m['fuzz_fuzzy_score'] = 0
        scoresSBM_m['fuzz_reference_orig_address'] = ""
    # Score-based full matches
    scoresSBM_t = scoresSBM[scoresSBM["full_match_score_based"]==True]
    ### Create a df of matches the model finds that the fuzzy matching work did not
    scoresSBM_m_model_add_matches = scoresSBM_m[(scoresSBM_m["full_match_score_based"] == True) &\
                                                (scoresSBM_m["fuzz_full_match"] == False)]
    # Key diagnostic columns first; fuzzy working columns dropped below
    first_cols = ['UPRN', search_df_key_field, 'full_match_score_based', 'fuzz_full_match', 'fuzz_fuzzy_score_match', 'fuzz_property_number_match',\
                  'fuzz_fuzzy_score', 'match_score', 'max_possible_score', 'perc_weighted_columns_matched',\
                  'perc_weighted_columns_matched_max_for_pred_address', 'address_pred',\
                  'address_ref', 'fuzz_reference_orig_address']
    last_cols = [col for col in scoresSBM_m_model_add_matches.columns if col not in first_cols]
    scoresSBM_m_model_add_matches = scoresSBM_m_model_add_matches[first_cols+last_cols].drop(['fuzz_search_mod_address',
        'fuzz_reference_mod_address', 'fuzz_fulladdress', 'fuzz_UPRN'], axis=1, errors="ignore")
    ### Create a df for matches the fuzzy matching found that the neural net model does not
    if not match_results.empty:
        scoresSBM_t_model_failed = match_results[(~match_results[search_df_key_field].isin(scoresSBM_t[search_df_key_field])) &\
                                                 (match_results["fuzz_full_match"] == True)]
        scoresSBM_t_model_failed = scoresSBM_t_model_failed.\
            merge(scoresSBM.drop_duplicates(search_df_key_field), on = search_df_key_field, how = "left")
        scoresSBM_t_model_failed = scoresSBM_t_model_failed[first_cols+last_cols].drop(['fuzz_search_mod_address',
            'fuzz_reference_mod_address', 'fuzz_fulladdress', 'fuzz_UPRN'], axis=1, errors="ignore")
    else:
        scoresSBM_t_model_failed = pd.DataFrame()
    ## Join back onto original results file and export
    scoresSBM_new_matches_from_model = scoresSBM_m_model_add_matches.drop_duplicates(search_df_key_field)
    if not match_results.empty:
        match_results_out = match_results.merge(scoresSBM_new_matches_from_model[[search_df_key_field, 'full_match_score_based', 'address_pred',
            'address_ref']], on = search_df_key_field, how = "left")
        # Rows the model did not match come through as NaN -> treat as False
        match_results_out.loc[match_results_out['full_match_score_based'].isna(),'full_match_score_based'] = False
        match_results_out["full_match_fuzzy_or_score_based"] = (match_results_out["fuzz_full_match"] == True) |\
            (match_results_out["full_match_score_based"] == True)
    else: match_results_out = match_results
    return scoresSBM_m_model_add_matches, scoresSBM_t_model_failed, match_results_out