File size: 5,118 Bytes
3ad7899
 
 
79de1f8
3ad7899
5c59636
b97a311
 
3ad7899
b97a311
5c59636
 
3ad7899
b97a311
3ad7899
5c59636
3ad7899
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5c59636
3ad7899
 
 
 
 
 
 
 
 
 
 
5806da1
3ad7899
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5c59636
3ad7899
5c59636
 
 
 
 
 
287a33f
3ad7899
 
 
 
 
 
 
 
5c59636
 
3ad7899
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import logging
from pathlib import Path
from typing import List, Optional

import pandas as pd
from presidio_analyzer import RecognizerResult
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
from presidio_anonymizer.operators import OperatorType

from names_database import NameDatabase

# Parquet file holding the candidate names; the relative path is resolved
# against the current working directory when the anonymizer is constructed.
name_table = Path('data') / 'ascii_names.parquet'

# Logger shared by every function and method in this module.
logger = logging.getLogger('anonymizer')


class surrogate_anonymizer(AnonymizerEngine):
    '''Presidio anonymizer that replaces detected entities with surrogates.

    ``STUDENT`` entities are swapped for a randomly sampled name, while
    e-mail addresses, phone numbers and URLs are replaced by fixed
    placeholder values (see ``anonymize``).
    '''

    def __init__(self):
        '''Initialise the Presidio engine and load the name resources.'''
        super().__init__()
        # Queried for a gender (by first name) and a country (by last name).
        self.names_db = NameDatabase()
        # Sampling pool; the methods below read its 'country', 'gender',
        # 'count', 'first' and 'last' columns.
        self.names_df = pd.read_parquet(name_table)

    def get_random_name(
            self,
            country: Optional[str] = None,
            gender: Optional[str] = None
    ) -> pd.DataFrame:
        '''Returns two random names from the database as a DataFrame.

        Rows are drawn with the ``count`` column as sampling weights.
        Both rows match gender and country, if provided, unless the
        filtered pool is too small, in which case the draw falls back to
        the whole table.

        :country: ISO country code e.g. "CO" for Colombia
        :gender: 'M' or 'F'
        returns two rows of the names dataframe
        '''
        names_view = self.names_df
        if country:
            names_view = names_view[names_view['country'] == country]
        if gender:
            names_view = names_view[names_view['gender'] == gender]
        # DataFrame.size counts cells (rows x columns), not rows, so on a
        # wide table a pool with fewer than two rows could pass the size
        # threshold and make sample(n=2) raise. Check the row count too.
        if names_view.size < 25 or len(names_view) < 2:
            names_view = self.names_df
        return names_view.sample(n=2, weights=names_view['count'])

    def split_name(self, original_name: str):
        '''Splits name into parts.

        The name is split on whitespace:
        If one token, assume it is a first name.
        If two tokens, first and last name.
        If three tokens, one first name and two last names.
        If four tokens, two first names and two last names.
        Any other token count (including zero) is treated as a failure.

        returns a ``(first_names, last_names)`` tuple; either element is
        ``None`` when that part could not be determined
        '''
        names = original_name.split()
        if len(names) == 1:
            logger.info(f'Splitting to 1 first name: {names}')
            return names[0], None
        elif len(names) == 2:
            logger.info(f'Splitting to 1 first name, 1 last name: {names}')
            return names[0], names[1]
        elif len(names) == 3:
            logger.info(f'Splitting to 1 first name, 2 last names: {names}')
            return names[0], ' '.join(names[1:])
        elif len(names) == 4:
            logger.info(f'Splitting to 2 first names and 2 last names: {names}')
            return ' '.join(names[:2]), ' '.join(names[2:])
        else:
            logger.info(f'Splitting failed, do not match gender/country: {names}')
            return None, None

    def generate_surrogate(self, original_name: str):
        '''Generate a surrogate name.

        The original name is split into first and last parts; the first
        part is used to look up a gender and the last part a country, and
        both are passed on as filters for the random draw. The surrogate
        always has a first name and gets a last name only when the
        original had one.

        returns the surrogate name as a string
        '''
        first_names, last_names = self.split_name(original_name)
        gender = self.names_db.get_gender(first_names) if first_names else None
        logger.debug(f'Gender set to {gender}')
        country = self.names_db.get_country(last_names) if last_names else None
        logger.debug(f'Country set to {country}')

        surrogate_name = ''

        name_candidates = self.get_random_name(gender=gender, country=country)

        # First and last name come from two different sampled rows.
        surrogate_name += name_candidates.iloc[0]['first']
        logger.info(f'First name surrogate is {surrogate_name}')

        if last_names:
            logger.info(f'Combining with {name_candidates.iloc[1]["last"]}')
            surrogate_name += ' ' + name_candidates.iloc[1]['last']

        logger.info(f'Returning surrogate name {surrogate_name}')
        return surrogate_name

    def anonymize(
        self,
        text: str,
        analyzer_results: List[RecognizerResult]
        ) -> Optional[str]:
        '''Anonymize identified input using Presidio Anonymizer.

        :text: the text to anonymize
        :analyzer_results: entities detected in ``text``
        returns the anonymized text, or ``None`` when ``text`` is empty
        '''

        if not text:
            return

        analyzer_results = self._remove_conflicts_and_get_text_manipulation_data(
            analyzer_results
        )

        # The helper is declared with a double underscore in
        # AnonymizerEngine, so it has to be reached through its
        # name-mangled form from this subclass.
        operators = self._AnonymizerEngine__check_or_add_default_operator(
            {
            'STUDENT': OperatorConfig('custom',
                                      {'lambda': self.generate_surrogate}),
            # NOTE(review): this placeholder value looks like an
            # obfuscated address - confirm the intended replacement.
            'EMAIL_ADDRESS': OperatorConfig('replace',
                                            {'new_value': '[email protected]'}),
            'PHONE_NUMBER': OperatorConfig('replace',
                                           {'new_value': '888-888-8888'}),
            'URL': OperatorConfig('replace',
                                  {'new_value': 'aol.com'}),
            }
        )

        res = self._operate(text,
                            analyzer_results,
                            operators,
                            OperatorType.Anonymize)

        return res.text

if __name__ == '__main__':
    # Manual smoke run: DEBUG logging exposes every split/lookup/sample step.
    logging.basicConfig(level=logging.DEBUG)
    engine = surrogate_anonymizer()
    # Samples cover zero to four whitespace-separated tokens, a lone
    # punctuation mark, and a seven-token name that cannot be split.
    samples = (
        'Nora Wang',
        'MJ',
        '',
        '(',
        'Mario Escobar Sanchez',
        'Jane Fonda Michelle Rousseau',
        'Sir Phillipe Ricardo de la Sota Mayor',
    )
    for sample in samples:
        engine.generate_surrogate(sample)