import logging
import re

import spacy
import streamlit as st
from presidio_anonymizer import AnonymizerEngine
from presidio_analyzer import AnalyzerEngine, PatternRecognizer, RecognizerResult, EntityRecognizer
from annotated_text import annotated_text
from flair_recognizer import FlairRecognizer
from detoxify import Detoxify

###############################
#### Render Streamlit page ####
###############################

st.title("Anonymise your text!")
st.markdown(
    "This mini-app anonymises text using Flair and Presidio. You can find the code in the Files and Versions tabs in the [HuggingFace page](https://huggingface.co./spaces/arogeriogel/anonymise_this)"
)

# Configure logger
logging.basicConfig(format="\n%(asctime)s\n%(message)s", level=logging.INFO, force=True)

##############################
###### Define functions ######
##############################


# NOTE(review): newer Streamlit releases replace st.cache with
# st.cache_resource; switch once the deployed version supports it:
# @st.cache_resource(show_spinner="Fetching model from cache...")
@st.cache(allow_output_mutation=True)
def analyzer_engine():
    """Return a cached AnalyzerEngine with the Flair recognizer registered."""
    analyzer = AnalyzerEngine()
    flair_recognizer = FlairRecognizer()
    analyzer.registry.add_recognizer(flair_recognizer)
    return analyzer


@st.cache(allow_output_mutation=True)
def toxicity_model():
    """Return the Detoxify 'original' model, built once and cached.

    Uses the same caching decorator as the engines so the model is not
    re-instantiated on every analysis request.
    """
    return Detoxify('original')


def analyze(**kwargs):
    """Analyze input using Analyzer engine and input arguments (kwargs).

    A missing or ``None`` ``entities`` argument, or one containing "All",
    is passed to the engine as ``None``. The results are stored in
    ``st.session_state.analyze_results``.
    """
    entities = kwargs.get("entities")
    # Guard against an explicit entities=None, which `"All" in None` would
    # turn into a TypeError.
    if entities is None or "All" in entities:
        kwargs["entities"] = None
    st.session_state.analyze_results = analyzer_engine().analyze(**kwargs)


def annotate():
    """Build the token list for ``annotated_text`` from the session state.

    Returns a list alternating plain text slices and ``(entity_text,
    entity_type)`` tuples, ordered by position in the text. Returns an
    empty list when there are no analysis results.
    """
    text = st.session_state.text
    # sort by start index (stable, so the first result per start index wins)
    results = sorted(st.session_state.analyze_results, key=lambda res: res.start)
    if not results:
        return []

    tokens = []
    cursor = 0  # index of the first character not yet emitted
    for res in results:
        # A result starting before the cursor overlaps (or repeats) a span
        # that was already emitted; skip it so no text is duplicated.
        if res.start < cursor:
            continue
        # plain text between the previous entity (or text start) and this one
        tokens.append(text[cursor:res.start])
        # entity text and entity type
        tokens.append((text[res.start:res.end], res.entity_type))
        cursor = res.end
    # no more entities coming: add all remaining text
    tokens.append(text[cursor:])
    return tokens


def get_supported_entities():
    """Return supported entities from the Analyzer Engine."""
    return analyzer_engine().get_supported_entities()


def _is_toxic(scores):
    """Return True when any score in the label -> score mapping is too high.

    The overall 'toxicity' label uses a 0.65 threshold; every other label
    uses 0.5.
    """
    return any(
        score > (0.65 if label == 'toxicity' else 0.5)
        for label, score in scores.items()
    )


def analyze_text():
    """Validate the entered text and run entity analysis on it.

    Sets ``st.session_state.text_error`` and returns early when the text is
    empty or flagged as toxic. Otherwise runs the analyzer, then applies the
    manual deny list and allow list when they were provided.
    """
    if not st.session_state.text:
        st.session_state.text_error = "Please enter your text"
        return

    toxicity_results = toxicity_model().predict(st.session_state.text)
    if _is_toxic(toxicity_results):
        st.session_state.text_error = "Your text entry was detected as toxic, please re-write it."
        return

    with text_spinner_placeholder:
        with st.spinner("Please wait while your text is being analysed..."):
            # NOTE(review): this writes the raw, un-anonymised input to the log.
            logging.info(f"This is the text being analysed: {st.session_state.text}")
            st.session_state.text_error = ""
            st.session_state.n_requests += 1
            analyze(
                text=st.session_state.text,
                entities=st_entities,
                language="en",
                return_decision_process=False,
            )
            if st.session_state.excluded_words:
                exclude_manual_input()
            if st.session_state.allowed_words:
                allow_manual_input()
            logging.info(
                f"analyse results: {st.session_state.analyze_results}\n"
            )


def _deny_list_to_regex(deny_list):
    """
    Convert a list of words to a matching regex.

    To be analyzed by the analyze method as any other regex patterns.

    :param deny_list: the list of words to detect
    :return: the regex of the words for detection
    """
    # Escape deny list elements as preparation for regex
    escaped_deny_list = [re.escape(element) for element in deny_list]
    regex = r"(?:^|(?<=\W))(" + "|".join(escaped_deny_list) + r")(?:(?=\W)|$)"
    return regex


def exclude_manual_input():
    """Add every occurrence of the user-supplied words to the results.

    ``st.session_state.excluded_words`` is read as a comma-separated list.
    Each match in the text that is not already covered by a result with the
    same start and end is appended as a 'MANUALLY ADDED' entity.
    """
    deny_list = [word.strip() for word in st.session_state.excluded_words.split(',')]
    # Drop empty entries (e.g. from a trailing comma); an empty alternative
    # in the regex would match the empty string everywhere.
    deny_list = [word for word in deny_list if word]

    results = []
    if deny_list:
        # spans already detected, so the same span is not added twice
        known_spans = {
            (token.start, token.end) for token in st.session_state.analyze_results
        }
        deny_list_pattern = _deny_list_to_regex(deny_list)
        for match in re.finditer(deny_list_pattern, st.session_state.text):
            start, end = match.span()
            # Skip empty results
            if start == end:
                continue
            if (start, end) in known_spans:
                continue
            results.append(
                RecognizerResult(
                    entity_type='MANUALLY ADDED',
                    start=start,
                    end=end,
                    score=1.0,
                )
            )

    results = EntityRecognizer.remove_duplicates(results)
    st.session_state.analyze_results.extend(results)

    logging.info(
        f"analyse results after adding excluded words: {st.session_state.analyze_results}\n"
    )


def allow_manual_input():
    """Remove results whose text is one of the user-supplied allowed words.

    ``st.session_state.allowed_words`` is read as a comma-separated list and
    compared word by word (exact, case-sensitive match after stripping
    surrounding whitespace), not as a substring of the raw input string.
    """
    allowed = {word.strip() for word in st.session_state.allowed_words.split(',')}
    allowed.discard("")

    text = st.session_state.text
    analyze_results_filtered = [
        token
        for token in st.session_state.analyze_results
        if text[token.start:token.end].strip() not in allowed
    ]
    logging.info(
        f"analyse results after removing allowed words: {analyze_results_filtered}\n"
    )
    st.session_state.analyze_results = analyze_results_filtered


# NOTE(review): same st.cache -> st.cache_resource migration as above:
# @st.cache_resource(show_spinner="Fetching model from cache...")
@st.cache(allow_output_mutation=True)
def anonymizer_engine():
    """Return AnonymizerEngine."""
    return AnonymizerEngine()


def anonymise_text():
    """Anonymise the entered text using the current analysis results.

    Runs ``analyze_text`` first when no results exist yet. Stores the output
    in ``st.session_state.anon_results``; on failure only
    ``st.session_state.text_error`` is set.
    """
    if st.session_state.n_requests >= 50:
        st.session_state.text_error = "Too many requests. Please wait a few seconds before anonymising more text."
        logging.info(f"Session request limit reached: {st.session_state.n_requests}")
        st.session_state.n_requests = 1
        # Stop here: falling through would clear the message just set and
        # process the request anyway.
        return

    st.session_state.text_error = ""
    if not st.session_state.text:
        st.session_state.text_error = "Please enter your text"
        return

    if not st.session_state.analyze_results:
        analyze_text()
        # analyze_text reports problems (e.g. toxic text) through text_error;
        # do not anonymise or overwrite the message in that case.
        if st.session_state.text_error:
            return

    with text_spinner_placeholder:
        with st.spinner("Please wait while your text is being anonymised..."):
            anon_results = anonymizer_engine().anonymize(
                st.session_state.text, st.session_state.analyze_results
            )
            st.session_state.text_error = ""
            st.session_state.n_requests += 1
            st.session_state.anon_results = anon_results
            logging.info(
                f"text anonymised: {st.session_state.anon_results}"
            )


def clear_results():
    """Reset analysis and anonymisation results (widget on_change callback)."""
    st.session_state.anon_results = ""
    # Kept as a list so later .extend() calls and the anonymizer always
    # receive the same type.
    st.session_state.analyze_results = []


#######################################
#### Initialize "global" variables ####
#######################################

if "text_error" not in st.session_state:
    st.session_state.text_error = ""

if "analyze_results" not in st.session_state:
    st.session_state.analyze_results = []

if "anon_results" not in st.session_state:
    st.session_state.anon_results = ""

if "n_requests" not in st.session_state:
    st.session_state.n_requests = 0

##############################
####### Page arguments #######
##############################

# Every widget with a key is automatically added to Session State as a global variable.
# In Streamlit, interacting with a widget triggers a rerun and variables defined
# in the code get reinitialized after each rerun.
# If a callback function is associated with a widget then a change in the widget
# triggers the following sequence: First the callback function is executed and then
# the app executes from top to bottom.
# Free-text inputs. Editing any of them invalidates previously computed
# results, hence the shared clear_results callback.
st.text_input(
    label="Text",
    placeholder="Write your text here",
    key='text',
    on_change=clear_results,
)
st.text_input(
    label="Data to be redacted (optional)",
    placeholder="John, Mary, London",
    key='excluded_words',
    on_change=clear_results,
)
st.text_input(
    label="Data to be ignored (optional)",
    placeholder="NHS, GEL, Lab",
    key='allowed_words',
    on_change=clear_results,
)

# Sidebar entity picker; every supported entity is selected by default.
supported_entities = get_supported_entities()
st_entities = st.sidebar.multiselect(
    label="Which entities to look for?",
    options=supported_entities,
    default=list(supported_entities),
)

##############################
######## Page buttons ########
##############################

# st.button returns True on the run triggered by its click
col1, col2 = st.columns(2)

with col1:
    analyze_now = st.button(
        label="Analyse text",
        type="primary",
        on_click=analyze_text,
    )

with col2:
    anonymise_now = st.button(
        label="Anonymise text",
        type="primary",
        on_click=anonymise_text,
    )

##############################
######## Page actions ########
##############################

# Placeholder the callbacks use to show their spinner
text_spinner_placeholder = st.empty()

if st.session_state.text_error:
    st.error(st.session_state.text_error)

# Left column: annotated text plus the raw analysis results
with col1:
    if st.session_state.analyze_results:
        annotated_tokens = annotate()
        annotated_text(*annotated_tokens)
        st.write(st.session_state.analyze_results)
    elif analyze_now and not st.session_state.text_error:
        st.write("### No PII was found. ###")

# Right column: anonymised text
with col2:
    if st.session_state.anon_results:
        st.write(st.session_state.anon_results.text)
    nothing_detected = not st.session_state.analyze_results
    if nothing_detected and anonymise_now and not st.session_state.text_error:
        st.write("### No PII was found. ###")