File size: 5,393 Bytes
7dda936
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import logging
from pathlib import Path
from typing import List, Optional, Tuple

import pandas as pd
from presidio_analyzer import RecognizerResult
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
from presidio_anonymizer.operators import OperatorType

from names_database import NameDatabase

# Parquet table of candidate names; surrogate_anonymizer.__init__ loads it
# with pandas. The path is relative, so it is resolved against the current
# working directory of the process.
name_table = Path('data', 'ascii_names.parquet')

# Module-wide logger; every method below reports through it.
logger = logging.getLogger('anonymizer')

class surrogate_anonymizer(AnonymizerEngine):
    '''Presidio anonymizer that swaps detected names for surrogate names.

    STUDENT entities are replaced by a randomly drawn name from the names
    table; EMAIL_ADDRESS, PHONE_NUMBER and URL entities are replaced by
    fixed placeholder values (see ``anonymize``).
    '''

    def __init__(self):
        '''Initialise the parent engine and load the name resources.'''
        super().__init__()
        self.names_db = NameDatabase()
        self.names_df = pd.read_parquet(name_table)

    def get_random_name(
            self,
            country: Optional[str] = None,
            gender: Optional[str] = None
    ) -> pd.DataFrame:
        '''Returns two random names from the database as a DataFrame.
        Both rows match gender and country, if provided and if enough
        matching rows exist; otherwise the rows are drawn from the whole
        table. Rows are sampled with the 'count' column as weights.
        :country: ISO country code e.g. "CO" for Colombia
        :gender: 'M' or 'F'
        returns two rows of the names dataframe
        '''
        names_view = self.names_df
        if country:
            names_view = names_view[names_view['country'] == country]
        if gender:
            names_view = names_view[names_view['gender'] == gender]
        # DataFrame.size counts cells (rows x columns), not rows, so on a
        # wide table the size check alone could let a one-row view through
        # and make sample(n=2) raise. The explicit row-count check closes
        # that gap; the original size threshold is kept unchanged.
        if len(names_view) < 2 or names_view.size < 25:
            names_view = self.names_df
        return names_view.sample(n=2, weights=names_view['count'])

    def split_name(
            self,
            original_name: str
    ) -> Tuple[Optional[str], Optional[str]]:
        '''Splits name into parts.
        If one token, assume it is a first name.
        If two tokens, first and last name.
        If three tokens, one first name and two last names.
        If four tokens, two first names and two last names.
        Any other token count (including an empty string) is treated as a
        failed split.
        returns (first names, last names); either element may be None
        '''
        names = original_name.split()
        if len(names) == 1:
            logger.info('Splitting to 1 first name: %s', names)
            return names[0], None
        elif len(names) == 2:
            logger.info('Splitting to 1 first name, 1 last name: %s', names)
            return names[0], names[1]
        elif len(names) == 3:
            logger.info('Splitting to 1 first name, 2 last names: %s', names)
            return names[0], ' '.join(names[1:])
        elif len(names) == 4:
            logger.info(
                'Splitting to 2 first names and 2 last names: %s', names)
            return ' '.join(names[:2]), ' '.join(names[2:])
        else:
            logger.info(
                'Splitting failed, do not match gender/country: %s', names)
            return None, None

    def generate_surrogate(self, original_name: str) -> str:
        '''Generate a surrogate name.
        The gender is looked up from the first name(s) and the country from
        the last name(s); both are used to filter the random draw. A last
        name is only included when the original name had one.
        :original_name: the name to replace
        returns the surrogate name
        '''
        if original_name == 'PII':
            # Every time we call this function, Presidio will validate it
            # by testing that the function returns a str when the input is
            # 'PII'. Bypass this test.
            return 'PII'

        first_names, last_names = self.split_name(original_name)
        gender = self.names_db.get_gender(first_names) if first_names else None
        logger.debug('Gender set to %s', gender)
        country = self.names_db.get_country(last_names) if last_names else None
        logger.debug('Country set to %s', country)

        name_candidates = self.get_random_name(gender=gender, country=country)

        # First and last name come from two different sampled rows.
        # NOTE(review): presumably so the surrogate is not a full name that
        # exists as-is in the table -- confirm.
        surrogate_name = name_candidates.iloc[0]['first']
        logger.info('First name surrogate is %s', surrogate_name)

        if last_names:
            surrogate_last = name_candidates.iloc[1]['last']
            logger.info('Combining with %s', surrogate_last)
            surrogate_name += ' ' + surrogate_last

        logger.info('Returning surrogate name %s', surrogate_name)
        return surrogate_name

    def anonymize(
        self,
        text: str,
        analyzer_results: List[RecognizerResult]
        ) -> Optional[str]:
        '''Anonymize identified input using Presidio Anonymizer.
        :text: the text to anonymize
        :analyzer_results: entities found in the text by the analyzer
        returns the anonymized text, or None when text is empty or None
        '''

        if not text:
            return None

        analyzer_results = self._remove_conflicts_and_get_text_manipulation_data(
            analyzer_results
        )

        # __check_or_add_default_operator is a double-underscore method of
        # AnonymizerEngine, so from this subclass it has to be reached
        # through its name-mangled form.
        operators = self._AnonymizerEngine__check_or_add_default_operator(
            {
            'STUDENT': OperatorConfig('custom',
                                      {'lambda': self.generate_surrogate}),
            # NOTE(review): '[email protected]' looks like an
            # email-obfuscation placeholder left by a web page rather than
            # a deliberately chosen address -- confirm the intended value.
            'EMAIL_ADDRESS': OperatorConfig('replace',
                                            {'new_value': '[email protected]'}),
            'PHONE_NUMBER': OperatorConfig('replace',
                                           {'new_value': '888-888-8888'}),
            'URL': OperatorConfig('replace',
                                  {'new_value': 'aol.com'}),
            }
        )

        res = self._operate(text,
                            analyzer_results,
                            operators,
                            OperatorType.Anonymize)

        return res.text

if __name__ == '__main__':
    # Manual smoke test: run a handful of representative inputs through
    # generate_surrogate with DEBUG logging on. The return values are
    # discarded; the results are read from the log output.
    logging.basicConfig(level=logging.DEBUG)
    anonymizer = surrogate_anonymizer()
    # One input per token-count case, plus an empty string and a lone
    # punctuation mark.
    sample_inputs = (
        'Nora Wang',
        'MJ',
        '',
        '(',
        'Mario Escobar Sanchez',
        'Jane Fonda Michelle Rousseau',
        'Sir Phillipe Ricardo de la Sota Mayor',
    )
    for sample in sample_inputs:
        anonymizer.generate_surrogate(sample)