langdonholmes committed on
Commit
3ad7899
1 Parent(s): e37bcc3

refactor anonymizer with inheritance

Browse files
Files changed (3) hide show
  1. anonymizer.py +115 -56
  2. app.py +4 -5
  3. names_database.py +21 -28
anonymizer.py CHANGED
@@ -1,67 +1,111 @@
1
- from typing import List
 
 
2
 
 
3
  from presidio_analyzer import RecognizerResult
4
  from presidio_anonymizer import AnonymizerEngine
5
  from presidio_anonymizer.entities import OperatorConfig
 
6
 
7
  from names_database import NameDatabase
8
 
9
- names_db = NameDatabase()
10
 
11
- def split_name(original_name: str):
12
- '''Splits name into parts.
13
- If one token, assume it is a first name.
14
- If two tokens, first and last name.
15
- If three tokens, one first name and two last names.
16
- If four tokens, two first names and two last names.'''
17
- names = original_name.split()
18
- if len(names) == 1:
19
- return names[0], None
20
- elif len(names) == 2:
21
- return names[0], names[1]
22
- elif len(names) == 3:
23
- return names[0], ' '.join(names[1:])
24
- elif len(names) == 4:
25
- return ' '.join(names[:2]), ' '.join(names[2:])
26
- else:
27
- return None, None
28
 
29
- def generate_surrogate(original_name: str):
30
- '''Generate a surrogate name.
31
- '''
32
- first_names, last_names = split_name(original_name)
33
- gender = names_db.get_gender(first_names) if first_names else None
34
- country = names_db.get_country(last_names) if last_names else None
35
-
36
- surrogate_name = ''
37
-
38
- name_candidates = names_db.get_random_name(
39
- gender=gender,
40
- country=country)
41
-
42
- surrogate_name += name_candidates.iloc[0]['first']
43
-
44
- if last_names:
45
- surrogate_name += ' ' + name_candidates.iloc[1]['last']
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46
 
47
- return surrogate_name
 
 
 
 
 
 
 
 
 
 
48
 
49
- def anonymize(
50
- anonymizer: AnonymizerEngine,
51
- text: str,
52
- analyze_results: List[RecognizerResult]
53
- ):
54
- '''Anonymize identified input using Presidio Anonymizer.'''
55
-
56
- if not text:
57
- return
58
-
59
- res = anonymizer.anonymize(
60
- text,
61
- analyze_results,
62
- operators={
 
 
63
  'STUDENT': OperatorConfig('custom',
64
- {'lambda': generate_surrogate}),
65
  'EMAIL_ADDRESS': OperatorConfig('replace',
66
  {'new_value': '[email protected]'}),
67
  'PHONE_NUMBER': OperatorConfig('replace',
@@ -69,9 +113,24 @@ def anonymize(
69
  'URL': OperatorConfig('replace',
70
  {'new_value': 'aol.com'}),
71
  }
72
- )
73
-
74
- return res.text
 
 
 
 
 
75
 
76
  if __name__ == '__main__':
77
- print(generate_surrogate('Nora Wang'))
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ from pathlib import Path
3
+ from typing import List, Optional
4
 
5
+ import pandas as pd
6
  from presidio_analyzer import RecognizerResult
7
  from presidio_anonymizer import AnonymizerEngine
8
  from presidio_anonymizer.entities import OperatorConfig
9
+ from presidio_anonymizer.operators import OperatorType
10
 
11
  from names_database import NameDatabase
12
 
13
# Relative path of the parquet name table that surrogate_anonymizer.__init__
# loads with pd.read_parquet.
name_table = Path('data') / 'ascii_names.parquet'

# Module-wide logger; NameDatabase requests the same 'anonymizer' logger name.
logger = logging.getLogger('anonymizer')
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16
 
17
+
18
+ class surrogate_anonymizer(AnonymizerEngine):
19
def __init__(self):
    '''Initialise the base AnonymizerEngine and load the name resources.

    names_db answers the gender/country lookups made in generate_surrogate;
    names_df is the table that get_random_name samples surrogates from.
    '''
    super().__init__()
    # Lookup helper for the gender / country of an original name.
    self.names_db = NameDatabase()
    # Read once at construction so every surrogate reuses the same table.
    self.names_df = pd.read_parquet(name_table)
23
+
24
+
25
+ def get_random_name(
26
+ self,
27
+ country: Optional[str] = None,
28
+ gender: Optional[str] = None
29
+ ) -> pd.DataFrame:
30
+ '''Returns two random names from the database as a DataFrame.
31
+ Both rows match gender and country, if provided.
32
+ :country: ISO country code e.g. "CO" for Columbia
33
+ :gender: 'M' or 'F'
34
+ returns two rows of the names dataframe
35
+ '''
36
+ names_view = self.names_df
37
+ if country:
38
+ names_view = names_view[names_view['country'] == country]
39
+ if gender:
40
+ names_view = names_view[names_view['gender'] == gender]
41
+ if names_view.size < 25:
42
+ return self.names_df.sample(n=2, weights=self.names_df['count'])
43
+ return names_view.sample(n=2, weights=names_view['count'])
44
+
45
def split_name(self, original_name: str):
    '''Split a whitespace-separated name into (first names, last names).

    1 token  -> (token, None): treated as a first name.
    2 tokens -> (first, last).
    3 tokens -> (first, 'last last'): one first name, two last names.
    4 tokens -> ('first first', 'last last').
    Any other token count, including zero, returns (None, None).
    '''
    names = original_name.split()
    token_count = len(names)

    if token_count == 1:
        logger.info(f'Splitting to 1 first name: {names}')
        return names[0], None

    if token_count == 2:
        logger.info(f'Splitting to 1 first name, 1 last name: {names}')
        return names[0], names[1]

    if token_count == 3:
        logger.info(f'Splitting to 1 first name, 2 last names: {names}')
        first, *lasts = names
        return first, ' '.join(lasts)

    if token_count == 4:
        logger.info(f'Splitting to 2 first names and 2 last names: {names}')
        firsts, lasts = names[:2], names[2:]
        return ' '.join(firsts), ' '.join(lasts)

    # Zero tokens or more than four: give up on splitting, so the caller
    # looks up neither gender nor country.
    logger.info(f'Splitting failed, do not match gender/country: {names}')
    return None, None
67
+
68
def generate_surrogate(self, original_name: str):
    '''Build a replacement name for original_name.

    The name is split into first/last parts; the first part is used to look
    up a gender and the last part a country, and both are passed to
    get_random_name. The surrogate takes its first name from the first
    sampled row and, only when the original had a last name, its last name
    from the second sampled row.
    '''
    first_names, last_names = self.split_name(original_name)

    # Each lookup is skipped (left as None) when its name part is missing.
    gender = None
    if first_names:
        gender = self.names_db.get_gender(first_names)
    logger.debug(f'Gender set to {gender}')

    country = None
    if last_names:
        country = self.names_db.get_country(last_names)
    logger.debug(f'Country set to {country}')

    name_candidates = self.get_random_name(gender=gender, country=country)

    surrogate_name = ''
    surrogate_name += name_candidates.iloc[0]['first']
    logger.info(f'First name surrogate is {surrogate_name}')

    if last_names:
        # First and last name come from different sampled rows.
        logger.info(f'Combining with {name_candidates.iloc[1]["last"]}')
        surrogate_name += ' ' + name_candidates.iloc[1]['last']

    logger.info(f'Returning surrogate name {surrogate_name}')
    return surrogate_name
90
 
91
+ def anonymize(
92
+ self,
93
+ text: str,
94
+ analyzer_results: List[RecognizerResult]
95
+ ):
96
+ '''Anonymize identified input using Presidio Anonymizer.'''
97
+
98
+ if not text:
99
+ return
100
+
101
+ analyzer_results = self._remove_conflicts_and_get_text_manipulation_data(
102
+ analyzer_results
103
+ )
104
+
105
+ operators = self._AnonymizerEngine__check_or_add_default_operator(
106
+ {
107
  'STUDENT': OperatorConfig('custom',
108
+ {'lambda': self.generate_surrogate}),
109
  'EMAIL_ADDRESS': OperatorConfig('replace',
110
  {'new_value': '[email protected]'}),
111
  'PHONE_NUMBER': OperatorConfig('replace',
 
113
  'URL': OperatorConfig('replace',
114
  {'new_value': 'aol.com'}),
115
  }
116
+ )
117
+
118
+ res = self._operate(text,
119
+ analyzer_results,
120
+ operators,
121
+ OperatorType.Anonymize)
122
+
123
+ return res.text
124
 
125
  if __name__ == '__main__':
126
+ logging.basicConfig(level=logging.DEBUG)
127
+ anonymizer = surrogate_anonymizer()
128
+ test_names = ['Nora Wang',
129
+ 'MJ',
130
+ '',
131
+ '(',
132
+ 'Mario Escobar Sanchez',
133
+ 'Jane Fonda Michelle Rousseau',
134
+ 'Sir Phillipe Ricardo de la Sota Mayor']
135
+ for name in test_names:
136
+ anonymizer.generate_surrogate(name)
app.py CHANGED
@@ -2,7 +2,7 @@
2
  '''Streamlit app for Student Name Detection models.'''
3
 
4
  from analyzer import prepare_analyzer
5
- from anonymizer import anonymize
6
  from presidio_anonymizer import AnonymizerEngine
7
  import pandas as pd
8
  from annotated_text import annotated_text
@@ -31,8 +31,8 @@ def analyzer_engine():
31
 
32
  @st.cache(allow_output_mutation=True)
33
  def anonymizer_engine():
34
- '''Return AnonymizerEngine.'''
35
- return AnonymizerEngine()
36
 
37
  def annotate(text, st_analyze_results, st_entities):
38
  tokens = []
@@ -116,10 +116,9 @@ with st.spinner('Analyzing...'):
116
  st.text('')
117
 
118
  st.subheader('Anonymized')
119
-
120
  with st.spinner('Anonymizing...'):
121
  if button or st.session_state.first_load:
122
- st_anonymize_results = anonymize(anonymizer_engine(),
123
  st_text,
124
  st_analyze_results)
125
  st_anonymize_results
 
2
  '''Streamlit app for Student Name Detection models.'''
3
 
4
  from analyzer import prepare_analyzer
5
+ from anonymizer import surrogate_anonymizer
6
  from presidio_anonymizer import AnonymizerEngine
7
  import pandas as pd
8
  from annotated_text import annotated_text
 
31
 
32
  @st.cache(allow_output_mutation=True)
33
  def anonymizer_engine():
34
+ '''Return generate surrogate anonymizer.'''
35
+ return surrogate_anonymizer()
36
 
37
  def annotate(text, st_analyze_results, st_entities):
38
  tokens = []
 
116
  st.text('')
117
 
118
  st.subheader('Anonymized')
 
119
  with st.spinner('Anonymizing...'):
120
  if button or st.session_state.first_load:
121
+ st_anonymize_results = anonymizer_engine().anonymize(
122
  st_text,
123
  st_analyze_results)
124
  st_anonymize_results
names_database.py CHANGED
@@ -1,42 +1,35 @@
1
- from pathlib import Path
2
- from typing import Optional
3
 
4
- import pandas as pd
5
  from names_dataset import NameDataset, NameWrapper
6
 
7
- name_table = Path('data', 'ascii_names.parquet')
8
 
9
  class NameDatabase(NameDataset):
10
  def __init__(self) -> None:
11
  super().__init__()
12
- self.names = pd.read_parquet(name_table)
13
-
14
- def get_random_name(
15
- self,
16
- country: Optional[str] = None,
17
- gender: Optional[str] = None
18
- ):
19
- '''country: ISO country code in 'alpha 2' format
20
- gender: 'M' or 'F'
21
- returns two rows of the names dataframe
22
- '''
23
- names_view = self.names
24
- if country:
25
- names_view = names_view[names_view['country'] == country]
26
- if gender:
27
- names_view = names_view[names_view['gender'] == gender]
28
- if names_view.size < 25:
29
- return self.names.sample(n=2, weights=self.names['count'])
30
- return names_view.sample(n=2, weights=names_view['count'])
31
 
32
- def search(self, name: str):
 
 
 
 
33
  key = name.strip().title()
34
  fn = self.first_names.get(key) if self.first_names is not None else None
35
  ln = self.last_names.get(key) if self.last_names is not None else None
36
  return {'first_name': fn, 'last_name': ln}
37
 
38
- def get_gender(self, first_names: str):
39
- return NameWrapper(self.search(first_names)).gender
 
 
 
 
40
 
41
- def get_country(self, last_names: str):
42
- return NameWrapper(self.search(last_names)).country
 
 
 
 
 
1
import logging
from typing import Optional

from names_dataset import NameDataset, NameWrapper
5
 
 
6
 
7
  class NameDatabase(NameDataset):
8
  def __init__(self) -> None:
9
  super().__init__()
10
+
11
+ self.logger = logging.getLogger('anonymizer')
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12
 
13
+ def search(self, name: str) -> dict:
14
+ '''Returns all entries associated with a name string.
15
+ The name string can be multiple tokens.
16
+ Both first and last names will be matched.
17
+ '''
18
  key = name.strip().title()
19
  fn = self.first_names.get(key) if self.first_names is not None else None
20
  ln = self.last_names.get(key) if self.last_names is not None else None
21
  return {'first_name': fn, 'last_name': ln}
22
 
23
+ def get_gender(self, first_names: str) -> str:
24
+ '''Return the most frequent gender code for a specific last name,
25
+ or None if a match cannot be found.
26
+ '''
27
+ gender = NameWrapper(self.search(first_names)).gender
28
+ return gender if gender else None
29
 
30
+ def get_country(self, last_names: str) -> str:
31
+ '''Return the most frequent country code for a specific last name,
32
+ or None if a match cannot be found.
33
+ '''
34
+ country = NameWrapper(self.search(last_names)).country
35
+ return country if country else None