Spaces:
Runtime error
Runtime error
import pandas as pd | |
from typing import Type, Dict, List, Tuple | |
import recordlinkage | |
from datetime import datetime | |
PandasDataFrame = Type[pd.DataFrame] | |
PandasSeries = Type[pd.Series] | |
MatchedResults = Dict[str,Tuple[str,int]] | |
array = List[str] | |
today = datetime.now().strftime("%d%m%Y") | |
today_rev = datetime.now().strftime("%Y%m%d") | |
from tools.constants import score_cut_off_nnet_street | |
# ## Recordlinkage matching functions | |
def compute_match(predict_df_search, ref_search, orig_search_df, matching_variables, | |
text_columns, blocker_column, weights, fuzzy_method): | |
# Use the merge command to match group1 and group2 | |
predict_df_search[matching_variables] = predict_df_search[matching_variables].astype(str) | |
ref_search[matching_variables] = ref_search[matching_variables].astype(str).replace("-999","") | |
# SaoText needs to be exactly the same to get a 'full' match. So I moved that to the exact match group | |
exact_columns = list(set(matching_variables) - set(text_columns)) | |
# Replace all blanks with a space, so they can be included in the fuzzy match searches | |
for column in text_columns: | |
predict_df_search.loc[predict_df_search[column] == '', column] = ' ' | |
ref_search.loc[ref_search[column] == '', column] = ' ' | |
# Score based match functions | |
# Create an index of all pairs | |
indexer = recordlinkage.Index() | |
# Block on selected blocker column | |
## Remove all NAs from predict_df blocker column | |
if blocker_column[0] == "PaoStartNumber": | |
predict_df_search = predict_df_search[~(predict_df_search[blocker_column[0]].isna()) & ~(predict_df_search[blocker_column[0]] == '')& ~(predict_df_search[blocker_column[0]].str.contains(r'^\s*$', na=False))] | |
indexer.block(blocker_column) #matchkey.block(["Postcode", "PaoStartNumber"]) | |
# Generate candidate pairs | |
pairsSBM = indexer.index(predict_df_search,ref_search) | |
print('Running with ' + blocker_column[0] + ' as blocker has created', len(pairsSBM), 'pairs.') | |
# If no pairs are found, break | |
if len(pairsSBM) == 0: return pd.DataFrame() | |
# Call the compare class from the toolkit | |
compareSBM = recordlinkage.Compare() | |
# Assign variables to matching technique - exact | |
for columns in exact_columns: | |
compareSBM.exact(columns, columns, label = columns, missing_value = 0) | |
# Assign variables to matching technique - fuzzy | |
for columns in text_columns: | |
if columns == "Postcode": | |
compareSBM.string(columns, columns, label = columns, missing_value = 0, method = "levenshtein") | |
else: | |
compareSBM.string(columns, columns, label = columns, missing_value = 0, method = fuzzy_method) | |
## Run the match - compare each column within the blocks according to exact or fuzzy matching (defined in cells above) | |
scoresSBM = compareSBM.compute(pairs = pairsSBM, x = predict_df_search, x_link = ref_search) | |
return scoresSBM | |
def calc_final_nnet_scores(scoresSBM, weights, matching_variables): | |
#Modify the output scores by the weights set at the start of the code | |
scoresSBM_w = scoresSBM*weights | |
### Determine matched roles that score above a threshold | |
# Sum all columns | |
scoresSBM_r = scoresSBM_w | |
scoresSBM_r['score'] = scoresSBM_r[matching_variables].sum(axis = 1) | |
scoresSBM_r['score_max'] = sum(weights.values()) # + 2 for the additional scoring from the weighted variables a couple of cells above | |
scoresSBM_r['score_perc'] = (scoresSBM_r['score'] / scoresSBM_r['score_max'])*100 | |
scoresSBM_r = scoresSBM_r.reset_index() | |
# Rename the index if misnamed | |
scoresSBM_r = scoresSBM_r.rename(columns={"index":"level_1"}, errors = "ignore") | |
# Sort all comparisons by score in descending order | |
scoresSBM_r = scoresSBM_r.sort_values(by=["level_0","score_perc"], ascending = False) | |
# Within each search address, remove anything below the max | |
scoresSBM_g = scoresSBM_r.reset_index() | |
# Get maximum score to join on | |
scoresSBM_g = scoresSBM_g.groupby("level_0").max("score_perc").reset_index()[["level_0", "score_perc"]] | |
scoresSBM_g =scoresSBM_g.rename(columns={"score_perc":"score_perc_max"}) | |
scoresSBM_search = scoresSBM_r.merge(scoresSBM_g, on = "level_0", how="left") | |
scoresSBM_search['score_perc'] = round(scoresSBM_search['score_perc'],1).astype(float) | |
scoresSBM_search['score_perc_max'] = round(scoresSBM_search['score_perc_max'],1).astype(float) | |
return scoresSBM_search | |
def join_on_pred_ref_details(scoresSBM_search_m, ref_search, predict_df_search): | |
## Join back search and ref_df address details onto matching df | |
scoresSBM_search_m_j = scoresSBM_search_m.merge(ref_search, left_on="level_1", right_index=True, how = "left", suffixes=("", "_ref")) | |
scoresSBM_search_m_j = scoresSBM_search_m_j.merge(predict_df_search, left_on="level_0", right_index=True,how="left", suffixes=("", "_pred")) | |
scoresSBM_search_m_j = scoresSBM_search_m_j.reindex(sorted(scoresSBM_search_m_j.columns), axis=1) | |
return scoresSBM_search_m_j | |
def rearrange_columns(scoresSBM_search_m_j, new_join_col, search_df_key_field, blocker_column, standardise): | |
start_columns = new_join_col.copy() | |
start_columns.extend(["address", "fulladdress", "level_0", "level_1","score","score_max","score_perc","score_perc_max"]) | |
other_columns = list(set(scoresSBM_search_m_j.columns) - set(start_columns)) | |
all_columns_order = start_columns.copy() | |
all_columns_order.extend(sorted(other_columns)) | |
# Place important columns at start | |
scoresSBM_search_m_j = scoresSBM_search_m_j.reindex(all_columns_order, axis=1) | |
scoresSBM_search_m_j = scoresSBM_search_m_j.rename(columns={'address':'address_pred', | |
'fulladdress':'address_ref', | |
'level_0':'index_pred', | |
'level_1':'index_ref', | |
'score':'match_score', | |
'score_max':'max_possible_score', | |
'score_perc':'perc_weighted_columns_matched', | |
'score_perc_max':'perc_weighted_columns_matched_max_for_pred_address'}) | |
scoresSBM_search_m_j = scoresSBM_search_m_j.sort_values("index_pred", ascending = True) | |
# ref_index is just a duplicate of index_ref, needed for outputs | |
scoresSBM_search_m_j["ref_index"] = scoresSBM_search_m_j["index_ref"] | |
#search_df_j = orig_search_df[["full_address_search", search_df_key_field]] | |
#scoresSBM_out = scoresSBM_search_m_j.merge(search_df_j, left_on = "address_pred", right_on = "full_address_search", how = "left") | |
final_cols = new_join_col.copy() | |
final_cols.extend([search_df_key_field, 'full_match_score_based', 'address_pred', 'address_ref',\ | |
'match_score', 'max_possible_score', 'perc_weighted_columns_matched',\ | |
'perc_weighted_columns_matched_max_for_pred_address',\ | |
'Organisation', 'Organisation_ref', 'Organisation_pred',\ | |
'SaoText', 'SaoText_ref', 'SaoText_pred',\ | |
'SaoStartNumber', 'SaoStartNumber_ref', 'SaoStartNumber_pred',\ | |
'SaoStartSuffix', 'SaoStartSuffix_ref', 'SaoStartSuffix_pred',\ | |
'SaoEndNumber', 'SaoEndNumber_ref', 'SaoEndNumber_pred',\ | |
'SaoEndSuffix', 'SaoEndSuffix_ref', 'SaoEndSuffix_pred',\ | |
'PaoStartNumber', 'PaoStartNumber_ref', 'PaoStartNumber_pred',\ | |
'PaoStartSuffix', 'PaoStartSuffix_ref', 'PaoStartSuffix_pred',\ | |
'PaoEndNumber', 'PaoEndNumber_ref', 'PaoEndNumber_pred',\ | |
'PaoEndSuffix', 'PaoEndSuffix_ref', 'PaoEndSuffix_pred',\ | |
'PaoText', 'PaoText_ref', 'PaoText_pred',\ | |
'Street', 'Street_ref', 'Street_pred',\ | |
'PostTown', 'PostTown_ref', 'PostTown_pred',\ | |
'Postcode', 'Postcode_ref', 'Postcode_pred', 'Postcode_predict',\ | |
'index_pred', 'index_ref', 'Reference file' | |
]) | |
scoresSBM_out = scoresSBM_search_m_j[final_cols] | |
return scoresSBM_out, start_columns | |
def create_matched_results_nnet(scoresSBM_best, search_df_key_field, orig_search_df, new_join_col, standardise, ref_search, blocker_column, score_cut_off): | |
### Make the final 'matched output' file | |
scoresSBM_best_pred_cols = scoresSBM_best.filter(regex='_pred$').iloc[:,1:-1] | |
scoresSBM_best["search_orig_address"] = (scoresSBM_best_pred_cols.agg(' '.join, axis=1)).str.strip().str.replace("\s{2,}", " ", regex=True) | |
scoresSBM_best_ref_cols = scoresSBM_best.filter(regex='_ref$').iloc[:,1:-1] | |
scoresSBM_best['reference_mod_address'] = (scoresSBM_best_ref_cols.agg(' '.join, axis=1)).str.strip().str.replace("\s{2,}", " ", regex=True) | |
## Create matched output df | |
matched_output_SBM = orig_search_df[[search_df_key_field, "full_address", "postcode", "property_number", "prop_number", "flat_number", "apart_number", "block_number", 'unit_number', "room_number", "house_court_name"]].replace(r"\bnan\b", "", regex=True).infer_objects(copy=False) | |
matched_output_SBM[search_df_key_field] = matched_output_SBM[search_df_key_field].astype(str) | |
### | |
matched_output_SBM = matched_output_SBM.merge(scoresSBM_best[[search_df_key_field, 'index_ref','address_ref', | |
'full_match_score_based', 'Reference file']], on = search_df_key_field, how = "left").\ | |
rename(columns={"full_address":"search_orig_address"}) | |
if 'index' not in ref_search.columns: | |
ref_search['ref_index'] = ref_search.index | |
matched_output_SBM = matched_output_SBM.merge(ref_search.drop_duplicates("fulladdress")[["ref_index", "fulladdress", "Postcode", "property_number", "prop_number", "flat_number", "apart_number", "block_number", 'unit_number', "room_number", "house_court_name", "ref_address_stand"]], left_on = "address_ref", right_on = "fulladdress", how = "left", suffixes=('_search', '_reference')).rename(columns={"fulladdress":"reference_orig_address", "ref_address_stand":"reference_list_address"}) | |
# To replace with number check | |
matched_output_SBM = matched_output_SBM.rename(columns={"full_match_score_based":"full_match"}) | |
matched_output_SBM['property_number_match'] = matched_output_SBM['full_match'] | |
scores_SBM_best_cols = [search_df_key_field, 'full_match_score_based', 'perc_weighted_columns_matched', 'address_pred']#, "reference_mod_address"] | |
scores_SBM_best_cols.extend(new_join_col) | |
matched_output_SBM_b = scoresSBM_best[scores_SBM_best_cols] | |
matched_output_SBM = matched_output_SBM.merge(matched_output_SBM_b.drop_duplicates(search_df_key_field), on = search_df_key_field, how = "left") | |
from tools.fuzzy_match import create_diag_shortlist | |
matched_output_SBM = create_diag_shortlist(matched_output_SBM, "search_orig_address", score_cut_off, blocker_column, fuzzy_col='perc_weighted_columns_matched', search_mod_address="address_pred", resolve_tie_breaks=False) | |
matched_output_SBM['standardised_address'] = standardise | |
matched_output_SBM = matched_output_SBM.rename(columns={"address_pred":"search_mod_address", | |
'perc_weighted_columns_matched':"fuzzy_score"}) | |
matched_output_SBM_cols = [search_df_key_field, 'search_orig_address','reference_orig_address', | |
'full_match', | |
'full_number_match', | |
'flat_number_match', | |
'room_number_match', | |
'block_number_match', | |
'property_number_match', | |
'close_postcode_match', | |
'house_court_name_match', | |
'fuzzy_score_match', | |
"fuzzy_score", | |
'property_number_search', 'property_number_reference', | |
'flat_number_search', 'flat_number_reference', | |
'room_number_search', 'room_number_reference', | |
'block_number_search', 'block_number_reference', | |
"unit_number_search","unit_number_reference", | |
'house_court_name_search', 'house_court_name_reference', | |
"search_mod_address", 'reference_mod_address','Postcode', 'postcode', 'ref_index', 'Reference file'] | |
matched_output_SBM_cols.extend(new_join_col) | |
matched_output_SBM_cols.extend(['standardised_address']) | |
matched_output_SBM = matched_output_SBM[matched_output_SBM_cols] | |
matched_output_SBM = matched_output_SBM.sort_values(search_df_key_field, ascending=True) | |
return matched_output_SBM | |
def score_based_match(predict_df_search, ref_search, orig_search_df, matching_variables, text_columns, blocker_column, weights, fuzzy_method, score_cut_off, search_df_key_field, standardise, new_join_col, score_cut_off_nnet_street=score_cut_off_nnet_street): | |
scoresSBM = compute_match(predict_df_search, ref_search, orig_search_df, matching_variables, text_columns, blocker_column, weights, fuzzy_method) | |
if scoresSBM.empty: | |
# If no pairs are found, break | |
return pd.DataFrame(), pd.DataFrame() | |
scoresSBM_search = calc_final_nnet_scores(scoresSBM, weights, matching_variables) | |
# Filter potential matched address scores to those with highest scores only | |
scoresSBM_search_m = scoresSBM_search[scoresSBM_search["score_perc"] == scoresSBM_search["score_perc_max"]] | |
scoresSBM_search_m_j = join_on_pred_ref_details(scoresSBM_search_m, ref_search, predict_df_search) | |
# When blocking by street, may to have an increased threshold as this is more prone to making mistakes | |
if blocker_column[0] == "Street": scoresSBM_search_m_j['full_match_score_based'] = (scoresSBM_search_m_j['score_perc'] >= score_cut_off_nnet_street) | |
else: scoresSBM_search_m_j['full_match_score_based'] = (scoresSBM_search_m_j['score_perc'] >= score_cut_off) | |
### Reorder some columns | |
scoresSBM_out, start_columns = rearrange_columns(scoresSBM_search_m_j, new_join_col, search_df_key_field, blocker_column, standardise) | |
matched_output_SBM = create_matched_results_nnet(scoresSBM_out, search_df_key_field, orig_search_df, new_join_col, standardise, ref_search, blocker_column, score_cut_off) | |
matched_output_SBM_best = matched_output_SBM.sort_values([search_df_key_field, "full_match"], ascending = [True, False]).drop_duplicates(search_df_key_field) | |
scoresSBM_best = scoresSBM_out[scoresSBM_out[search_df_key_field].isin(matched_output_SBM_best[search_df_key_field])] | |
return scoresSBM_best, matched_output_SBM_best | |
def check_matches_against_fuzzy(match_results, scoresSBM, search_df_key_field): | |
if not match_results.empty: | |
if 'fuzz_full_match' not in match_results.columns: | |
match_results['fuzz_full_match'] = False | |
match_results = match_results.add_prefix("fuzz_").rename(columns={"fuzz_"+search_df_key_field:search_df_key_field}) | |
#Merge fuzzy match full matches onto model data | |
scoresSBM_m = scoresSBM.merge(match_results.drop_duplicates(search_df_key_field), on = search_df_key_field, how = "left") | |
else: | |
scoresSBM_m = scoresSBM | |
scoresSBM_m["fuzz_full_match"] = False | |
scoresSBM_m['fuzz_fuzzy_score_match'] = False | |
scoresSBM_m['fuzz_property_number_match'] = False | |
scoresSBM_m['fuzz_fuzzy_score'] = 0 | |
scoresSBM_m['fuzz_reference_orig_address'] = "" | |
scoresSBM_t = scoresSBM[scoresSBM["full_match_score_based"]==True] | |
### Create a df of matches the model finds that the fuzzy matching work did not | |
scoresSBM_m_model_add_matches = scoresSBM_m[(scoresSBM_m["full_match_score_based"] == True) &\ | |
(scoresSBM_m["fuzz_full_match"] == False)] | |
# Drop some irrelevant columns | |
first_cols = ['UPRN', search_df_key_field, 'full_match_score_based', 'fuzz_full_match', 'fuzz_fuzzy_score_match', 'fuzz_property_number_match',\ | |
'fuzz_fuzzy_score', 'match_score', 'max_possible_score', 'perc_weighted_columns_matched',\ | |
'perc_weighted_columns_matched_max_for_pred_address', 'address_pred',\ | |
'address_ref', 'fuzz_reference_orig_address'] | |
last_cols = [col for col in scoresSBM_m_model_add_matches.columns if col not in first_cols] | |
scoresSBM_m_model_add_matches = scoresSBM_m_model_add_matches[first_cols+last_cols].drop(['fuzz_search_mod_address', | |
'fuzz_reference_mod_address', 'fuzz_fulladdress', 'fuzz_UPRN'], axis=1, errors="ignore") | |
### Create a df for matches the fuzzy matching found that the neural net model does not | |
if not match_results.empty: | |
scoresSBM_t_model_failed = match_results[(~match_results[search_df_key_field].isin(scoresSBM_t[search_df_key_field])) &\ | |
(match_results["fuzz_full_match"] == True)] | |
scoresSBM_t_model_failed = scoresSBM_t_model_failed.\ | |
merge(scoresSBM.drop_duplicates(search_df_key_field), on = search_df_key_field, how = "left") | |
scoresSBM_t_model_failed = scoresSBM_t_model_failed[first_cols+last_cols].drop(['fuzz_search_mod_address', | |
'fuzz_reference_mod_address', 'fuzz_fulladdress', 'fuzz_UPRN'], axis=1, errors="ignore") | |
else: | |
scoresSBM_t_model_failed = pd.DataFrame() | |
## Join back onto original results file and export | |
scoresSBM_new_matches_from_model = scoresSBM_m_model_add_matches.drop_duplicates(search_df_key_field) | |
if not match_results.empty: | |
match_results_out = match_results.merge(scoresSBM_new_matches_from_model[[search_df_key_field, 'full_match_score_based', 'address_pred', | |
'address_ref']], on = search_df_key_field, how = "left") | |
match_results_out.loc[match_results_out['full_match_score_based'].isna(),'full_match_score_based'] = False | |
#match_results_out['full_match_score_based'].value_counts() | |
match_results_out["full_match_fuzzy_or_score_based"] = (match_results_out["fuzz_full_match"] == True) |\ | |
(match_results_out["full_match_score_based"] == True) | |
else: match_results_out = match_results | |
return scoresSBM_m_model_add_matches, scoresSBM_t_model_failed, match_results_out | |