os1187 beki commited on
Commit
98d4ef4
0 Parent(s):

Duplicate from beki/pii-anonymizer

Browse files

Co-authored-by: Benjamin Kilimnik <[email protected]>

Files changed (6) hide show
  1. .gitattributes +31 -0
  2. README.md +14 -0
  3. app.py +212 -0
  4. flair_recognizer.py +245 -0
  5. requirements.txt +8 -0
  6. spacy_recognizer.py +131 -0
.gitattributes ADDED
@@ -0,0 +1,31 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ *.7z filter=lfs diff=lfs merge=lfs -text
2
+ *.arrow filter=lfs diff=lfs merge=lfs -text
3
+ *.bin filter=lfs diff=lfs merge=lfs -text
4
+ *.bz2 filter=lfs diff=lfs merge=lfs -text
5
+ *.ftz filter=lfs diff=lfs merge=lfs -text
6
+ *.gz filter=lfs diff=lfs merge=lfs -text
7
+ *.h5 filter=lfs diff=lfs merge=lfs -text
8
+ *.joblib filter=lfs diff=lfs merge=lfs -text
9
+ *.lfs.* filter=lfs diff=lfs merge=lfs -text
10
+ *.model filter=lfs diff=lfs merge=lfs -text
11
+ *.msgpack filter=lfs diff=lfs merge=lfs -text
12
+ *.npy filter=lfs diff=lfs merge=lfs -text
13
+ *.npz filter=lfs diff=lfs merge=lfs -text
14
+ *.onnx filter=lfs diff=lfs merge=lfs -text
15
+ *.ot filter=lfs diff=lfs merge=lfs -text
16
+ *.parquet filter=lfs diff=lfs merge=lfs -text
17
+ *.pickle filter=lfs diff=lfs merge=lfs -text
18
+ *.pkl filter=lfs diff=lfs merge=lfs -text
19
+ *.pb filter=lfs diff=lfs merge=lfs -text
20
+ *.pt filter=lfs diff=lfs merge=lfs -text
21
+ *.pth filter=lfs diff=lfs merge=lfs -text
22
+ *.rar filter=lfs diff=lfs merge=lfs -text
23
+ saved_model/**/* filter=lfs diff=lfs merge=lfs -text
24
+ *.tar.* filter=lfs diff=lfs merge=lfs -text
25
+ *.tflite filter=lfs diff=lfs merge=lfs -text
26
+ *.tgz filter=lfs diff=lfs merge=lfs -text
27
+ *.wasm filter=lfs diff=lfs merge=lfs -text
28
+ *.xz filter=lfs diff=lfs merge=lfs -text
29
+ *.zip filter=lfs diff=lfs merge=lfs -text
30
+ *.zst filter=lfs diff=lfs merge=lfs -text
31
+ *tfevents* filter=lfs diff=lfs merge=lfs -text
README.md ADDED
@@ -0,0 +1,14 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ ---
2
+ title: Presidio with custom PII models trained on PII data generated by Privy
3
+ emoji: 📊
4
+ colorFrom: purple
5
+ colorTo: pink
6
+ sdk: streamlit
7
+ sdk_version: 1.10.0
8
+ app_file: app.py
9
+ pinned: false
10
+ license: mit
11
+ duplicated_from: beki/pii-anonymizer
12
+ ---
13
+
14
+ Check out the configuration reference at https://huggingface.co/docs/hub/spaces-config-reference
app.py ADDED
@@ -0,0 +1,212 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+ """Streamlit app for Presidio + Privy-trained PII models."""
3
+
4
+ import spacy
5
+ from spacy_recognizer import CustomSpacyRecognizer
6
+ from presidio_analyzer.nlp_engine import NlpEngineProvider
7
+ from presidio_anonymizer import AnonymizerEngine
8
+ from presidio_analyzer import AnalyzerEngine, RecognizerRegistry
9
+ import pandas as pd
10
+ from annotated_text import annotated_text
11
+ from json import JSONEncoder
12
+ import json
13
+ import warnings
14
+ import streamlit as st
15
+ import os
16
+ os.environ["TOKENIZERS_PARALLELISM"] = "false"
17
+ warnings.filterwarnings('ignore')
18
+ # from flair_recognizer import FlairRecognizer
19
+
20
+ # Helper methods
21
+ @st.cache(allow_output_mutation=True)
22
+ def analyzer_engine():
23
+ """Return AnalyzerEngine."""
24
+
25
+ spacy_recognizer = CustomSpacyRecognizer()
26
+
27
+ configuration = {
28
+ "nlp_engine_name": "spacy",
29
+ "models": [
30
+ {"lang_code": "en", "model_name": "en_spacy_pii_distilbert"}],
31
+ }
32
+
33
+ # Create NLP engine based on configuration
34
+ provider = NlpEngineProvider(nlp_configuration=configuration)
35
+ nlp_engine = provider.create_engine()
36
+
37
+ registry = RecognizerRegistry()
38
+ # add rule-based recognizers
39
+ registry.load_predefined_recognizers(nlp_engine=nlp_engine)
40
+ registry.add_recognizer(spacy_recognizer)
41
+ # remove the nlp engine we passed, to use custom label mappings
42
+ registry.remove_recognizer("SpacyRecognizer")
43
+
44
+ analyzer = AnalyzerEngine(nlp_engine=nlp_engine,
45
+ registry=registry, supported_languages=["en"])
46
+
47
+ # uncomment for flair-based NLP recognizer
48
+ # flair_recognizer = FlairRecognizer()
49
+ # registry.load_predefined_recognizers()
50
+ # registry.add_recognizer(flair_recognizer)
51
+ # analyzer = AnalyzerEngine(registry=registry, supported_languages=["en"])
52
+ return analyzer
53
+
54
+
55
+ @st.cache(allow_output_mutation=True)
56
+ def anonymizer_engine():
57
+ """Return AnonymizerEngine."""
58
+ return AnonymizerEngine()
59
+
60
+
61
+ def get_supported_entities():
62
+ """Return supported entities from the Analyzer Engine."""
63
+ return analyzer_engine().get_supported_entities()
64
+
65
+
66
+ def analyze(**kwargs):
67
+ """Analyze input using Analyzer engine and input arguments (kwargs)."""
68
+ if "entities" not in kwargs or "All" in kwargs["entities"]:
69
+ kwargs["entities"] = None
70
+ return analyzer_engine().analyze(**kwargs)
71
+
72
+
73
+ def anonymize(text, analyze_results):
74
+ """Anonymize identified input using Presidio Abonymizer."""
75
+ if not text:
76
+ return
77
+ res = anonymizer_engine().anonymize(text, analyze_results)
78
+ return res.text
79
+
80
+
81
+ def annotate(text, st_analyze_results, st_entities):
82
+ tokens = []
83
+ # sort by start index
84
+ results = sorted(st_analyze_results, key=lambda x: x.start)
85
+ for i, res in enumerate(results):
86
+ if i == 0:
87
+ tokens.append(text[:res.start])
88
+
89
+ # append entity text and entity type
90
+ tokens.append((text[res.start: res.end], res.entity_type))
91
+
92
+ # if another entity coming i.e. we're not at the last results element, add text up to next entity
93
+ if i != len(results) - 1:
94
+ tokens.append(text[res.end:results[i+1].start])
95
+ # if no more entities coming, add all remaining text
96
+ else:
97
+ tokens.append(text[res.end:])
98
+ return tokens
99
+
100
+
101
+ st.set_page_config(page_title="Privy + Presidio demo (English)", layout="wide")
102
+
103
+ # Side bar
104
+ st.sidebar.markdown(
105
+ """
106
+ Detect and anonymize PII in text using an [NLP model](https://huggingface.co/beki/en_spacy_pii_distilbert) trained on protocol traces (JSON, SQL, XML etc.) generated by
107
+ [Privy](https://github.com/pixie-io/pixie/tree/main/src/datagen/pii/privy) and rule-based classifiers from [Presidio](https://aka.ms/presidio).
108
+ """
109
+ )
110
+
111
+ st_entities = st.sidebar.multiselect(
112
+ label="Which entities to look for?",
113
+ options=get_supported_entities(),
114
+ default=list(get_supported_entities()),
115
+ )
116
+
117
+ st_threshold = st.sidebar.slider(
118
+ label="Acceptance threshold", min_value=0.0, max_value=1.0, value=0.35
119
+ )
120
+
121
+ st_return_decision_process = st.sidebar.checkbox(
122
+ "Add analysis explanations in json")
123
+
124
+ st.sidebar.info(
125
+ "Privy is an open source framework for synthetic data generation in protocol trace formats (json, sql, html etc). Presidio is an open source framework for PII detection and anonymization. "
126
+ "For more info visit [privy](https://github.com/pixie-io/pixie/tree/main/src/datagen/pii/privy) and [aka.ms/presidio](https://aka.ms/presidio)"
127
+ )
128
+
129
+
130
+ # Main panel
131
+ analyzer_load_state = st.info(
132
+ "Starting Presidio analyzer and loading Privy-trained PII model...")
133
+ engine = analyzer_engine()
134
+ analyzer_load_state.empty()
135
+
136
+
137
+ st_text = st.text_area(
138
+ label="Type in some text",
139
+ value="SELECT shipping FROM users WHERE shipping = '201 Thayer St Providence RI 02912'"
140
+ "\n\n"
141
+ "{user: Willie Porter, ip: 192.168.2.80, email: [email protected]}",
142
+ height=200,
143
+ )
144
+
145
+ button = st.button("Detect PII")
146
+
147
+ if 'first_load' not in st.session_state:
148
+ st.session_state['first_load'] = True
149
+
150
+ # After
151
+ st.subheader("Analyzed")
152
+ with st.spinner("Analyzing..."):
153
+ if button or st.session_state.first_load:
154
+ st_analyze_results = analyze(
155
+ text=st_text,
156
+ entities=st_entities,
157
+ language="en",
158
+ score_threshold=st_threshold,
159
+ return_decision_process=st_return_decision_process,
160
+ )
161
+ annotated_tokens = annotate(st_text, st_analyze_results, st_entities)
162
+ # annotated_tokens
163
+ annotated_text(*annotated_tokens)
164
+ # vertical space
165
+ st.text("")
166
+
167
+ st.subheader("Anonymized")
168
+
169
+ with st.spinner("Anonymizing..."):
170
+ if button or st.session_state.first_load:
171
+ st_anonymize_results = anonymize(st_text, st_analyze_results)
172
+ st_anonymize_results
173
+
174
+
175
+ # table result
176
+ st.subheader("Detailed Findings")
177
+ if st_analyze_results:
178
+ res_dicts = [r.to_dict() for r in st_analyze_results]
179
+ for d in res_dicts:
180
+ d['Value'] = st_text[d['start']:d['end']]
181
+ df = pd.DataFrame.from_records(res_dicts)
182
+ df = df[["entity_type", "Value", "score", "start", "end"]].rename(
183
+ {
184
+ "entity_type": "Entity type",
185
+ "start": "Start",
186
+ "end": "End",
187
+ "score": "Confidence",
188
+ },
189
+ axis=1,
190
+ )
191
+
192
+ st.dataframe(df, width=1000)
193
+ else:
194
+ st.text("No findings")
195
+
196
+ st.session_state['first_load'] = True
197
+
198
+ # json result
199
+
200
+
201
+ class ToDictListEncoder(JSONEncoder):
202
+ """Encode dict to json."""
203
+
204
+ def default(self, o):
205
+ """Encode to JSON using to_dict."""
206
+ if o:
207
+ return o.to_dict()
208
+ return []
209
+
210
+
211
+ if st_return_decision_process:
212
+ st.json(json.dumps(st_analyze_results, cls=ToDictListEncoder))
flair_recognizer.py ADDED
@@ -0,0 +1,245 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ from typing import Optional, List, Tuple, Set
3
+
4
+ from presidio_analyzer import (
5
+ RecognizerResult,
6
+ EntityRecognizer,
7
+ AnalysisExplanation,
8
+ )
9
+ from presidio_analyzer.nlp_engine import NlpArtifacts
10
+
11
+ try:
12
+ from flair.data import Sentence
13
+ from flair.models import SequenceTagger
14
+ except ImportError:
15
+ print("Flair is not installed")
16
+
17
+
18
+ logger = logging.getLogger("presidio-analyzer")
19
+
20
+
21
+ class FlairRecognizer(EntityRecognizer):
22
+ """
23
+ Wrapper for a flair model, if needed to be used within Presidio Analyzer.
24
+ :example:
25
+ >from presidio_analyzer import AnalyzerEngine, RecognizerRegistry
26
+ >flair_recognizer = FlairRecognizer()
27
+ >registry = RecognizerRegistry()
28
+ >registry.add_recognizer(flair_recognizer)
29
+ >analyzer = AnalyzerEngine(registry=registry)
30
+ >results = analyzer.analyze(
31
+ > "My name is Christopher and I live in Irbid.",
32
+ > language="en",
33
+ > return_decision_process=True,
34
+ >)
35
+ >for result in results:
36
+ > print(result)
37
+ > print(result.analysis_explanation)
38
+ """
39
+
40
+ ENTITIES = [
41
+ "LOCATION",
42
+ "PERSON",
43
+ "NRP",
44
+ "GPE",
45
+ "ORGANIZATION",
46
+ "MAC_ADDRESS",
47
+ "US_BANK_NUMBER",
48
+ "IMEI",
49
+ "TITLE",
50
+ "LICENSE_PLATE",
51
+ "US_PASSPORT",
52
+ "CURRENCY",
53
+ "ROUTING_NUMBER",
54
+ "US_ITIN",
55
+ "US_BANK_NUMBER",
56
+ "US_DRIVER_LICENSE",
57
+ "AGE",
58
+ "PASSWORD",
59
+ "SWIFT_CODE",
60
+ ]
61
+
62
+ DEFAULT_EXPLANATION = "Identified as {} by Flair's Named Entity Recognition"
63
+
64
+ CHECK_LABEL_GROUPS = [
65
+ ({"LOCATION"}, {"LOC", "LOCATION", "STREET_ADDRESS", "COORDINATE"}),
66
+ ({"PERSON"}, {"PER", "PERSON"}),
67
+ ({"NRP"}, {"NORP", "NRP"}),
68
+ ({"GPE"}, {"GPE"}),
69
+ ({"ORGANIZATION"}, {"ORG"}),
70
+ ({"MAC_ADDRESS"}, {"MAC_ADDRESS"}),
71
+ ({"US_BANK_NUMBER"}, {"US_BANK_NUMBER"}),
72
+ ({"IMEI"}, {"IMEI"}),
73
+ ({"TITLE"}, {"TITLE"}),
74
+ ({"LICENSE_PLATE"}, {"LICENSE_PLATE"}),
75
+ ({"US_PASSPORT"}, {"US_PASSPORT"}),
76
+ ({"CURRENCY"}, {"CURRENCY"}),
77
+ ({"ROUTING_NUMBER"}, {"ROUTING_NUMBER"}),
78
+ ({"AGE"}, {"AGE"}),
79
+ ({"CURRENCY"}, {"CURRENCY"}),
80
+ ({"SWIFT_CODE"}, {"SWIFT_CODE"}),
81
+ ({"US_ITIN"}, {"US_ITIN"}),
82
+ ({"US_BANK_NUMBER"}, {"US_BANK_NUMBER"}),
83
+ ({"US_DRIVER_LICENSE"}, {"US_DRIVER_LICENSE"}),
84
+ ]
85
+
86
+ MODEL_LANGUAGES = {
87
+ "en":"beki/flair-pii-english-large",
88
+ # "en":"flair-trf.pt",
89
+ }
90
+
91
+ PRESIDIO_EQUIVALENCES = {
92
+ "PER": "PERSON",
93
+ "LOC": "LOCATION",
94
+ "ORG": "ORGANIZATION",
95
+ "NROP": "NRP",
96
+ "URL": "URL",
97
+ "US_ITIN": "US_ITIN",
98
+ "US_PASSPORT": "US_PASSPORT",
99
+ "IBAN_CODE": "IBAN_CODE",
100
+ "IP_ADDRESS": "IP_ADDRESS",
101
+ "EMAIL_ADDRESS": "EMAIL",
102
+ "US_DRIVER_LICENSE": "US_DRIVER_LICENSE",
103
+ "US_BANK_NUMBER": "US_BANK_NUMBER",
104
+ }
105
+
106
+ def __init__(
107
+ self,
108
+ supported_language: str = "en",
109
+ supported_entities: Optional[List[str]] = None,
110
+ check_label_groups: Optional[Tuple[Set, Set]] = None,
111
+ model: SequenceTagger = None,
112
+ ):
113
+ self.check_label_groups = (
114
+ check_label_groups if check_label_groups else self.CHECK_LABEL_GROUPS
115
+ )
116
+
117
+ supported_entities = supported_entities if supported_entities else self.ENTITIES
118
+ self.model = (
119
+ model
120
+ if model
121
+ else SequenceTagger.load(self.MODEL_LANGUAGES.get(supported_language))
122
+ )
123
+
124
+ super().__init__(
125
+ supported_entities=supported_entities,
126
+ supported_language=supported_language,
127
+ name="Flair Analytics",
128
+ )
129
+
130
+ def load(self) -> None:
131
+ """Load the model, not used. Model is loaded during initialization."""
132
+ pass
133
+
134
+ def get_supported_entities(self) -> List[str]:
135
+ """
136
+ Return supported entities by this model.
137
+ :return: List of the supported entities.
138
+ """
139
+ return self.supported_entities
140
+
141
+ # Class to use Flair with Presidio as an external recognizer.
142
+ def analyze(
143
+ self, text: str, entities: List[str], nlp_artifacts: NlpArtifacts = None
144
+ ) -> List[RecognizerResult]:
145
+ """
146
+ Analyze text using Text Analytics.
147
+ :param text: The text for analysis.
148
+ :param entities: Not working properly for this recognizer.
149
+ :param nlp_artifacts: Not used by this recognizer.
150
+ :param language: Text language. Supported languages in MODEL_LANGUAGES
151
+ :return: The list of Presidio RecognizerResult constructed from the recognized
152
+ Flair detections.
153
+ """
154
+
155
+ results = []
156
+
157
+ sentences = Sentence(text)
158
+ self.model.predict(sentences)
159
+
160
+ # If there are no specific list of entities, we will look for all of it.
161
+ if not entities:
162
+ entities = self.supported_entities
163
+
164
+ for entity in entities:
165
+ if entity not in self.supported_entities:
166
+ continue
167
+
168
+ for ent in sentences.get_spans("ner"):
169
+ if not self.__check_label(
170
+ entity, ent.labels[0].value, self.check_label_groups
171
+ ):
172
+ continue
173
+ textual_explanation = self.DEFAULT_EXPLANATION.format(
174
+ ent.labels[0].value
175
+ )
176
+ explanation = self.build_flair_explanation(
177
+ round(ent.score, 2), textual_explanation
178
+ )
179
+ flair_result = self._convert_to_recognizer_result(ent, explanation)
180
+
181
+ results.append(flair_result)
182
+
183
+ return results
184
+
185
+ def _convert_to_recognizer_result(self, entity, explanation) -> RecognizerResult:
186
+
187
+ entity_type = self.PRESIDIO_EQUIVALENCES.get(entity.tag, entity.tag)
188
+ flair_score = round(entity.score, 2)
189
+
190
+ flair_results = RecognizerResult(
191
+ entity_type=entity_type,
192
+ start=entity.start_position,
193
+ end=entity.end_position,
194
+ score=flair_score,
195
+ analysis_explanation=explanation,
196
+ )
197
+
198
+ return flair_results
199
+
200
+ def build_flair_explanation(
201
+ self, original_score: float, explanation: str
202
+ ) -> AnalysisExplanation:
203
+ """
204
+ Create explanation for why this result was detected.
205
+ :param original_score: Score given by this recognizer
206
+ :param explanation: Explanation string
207
+ :return:
208
+ """
209
+ explanation = AnalysisExplanation(
210
+ recognizer=self.__class__.__name__,
211
+ original_score=original_score,
212
+ textual_explanation=explanation,
213
+ )
214
+ return explanation
215
+
216
+ @staticmethod
217
+ def __check_label(
218
+ entity: str, label: str, check_label_groups: Tuple[Set, Set]
219
+ ) -> bool:
220
+ return any(
221
+ [entity in egrp and label in lgrp for egrp, lgrp in check_label_groups]
222
+ )
223
+
224
+
225
+ if __name__ == "__main__":
226
+
227
+ from presidio_analyzer import AnalyzerEngine, RecognizerRegistry
228
+
229
+ flair_recognizer = (
230
+ FlairRecognizer()
231
+ ) # This would download a very large (+2GB) model on the first run
232
+
233
+ registry = RecognizerRegistry()
234
+ registry.add_recognizer(flair_recognizer)
235
+
236
+ analyzer = AnalyzerEngine(registry=registry)
237
+
238
+ results = analyzer.analyze(
239
+ "{first_name: Moustafa, sale_id: 235234}",
240
+ language="en",
241
+ return_decision_process=True,
242
+ )
243
+ for result in results:
244
+ print(result)
245
+ print(result.analysis_explanation)
requirements.txt ADDED
@@ -0,0 +1,8 @@
 
 
 
 
 
 
 
 
 
1
+ pandas
2
+ streamlit
3
+ presidio-anonymizer
4
+ presidio-analyzer
5
+ torch
6
+ #flair@git+https://github.com/flairNLP/flair.git@d4ed67bf663e4066517f00397412510d90043653
7
+ st-annotated-text
8
+ https://huggingface.co/beki/en_spacy_pii_distilbert/resolve/main/en_spacy_pii_distilbert-any-py3-none-any.whl
spacy_recognizer.py ADDED
@@ -0,0 +1,131 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ from typing import Optional, List, Tuple, Set
3
+
4
+ from presidio_analyzer import (
5
+ RecognizerResult,
6
+ LocalRecognizer,
7
+ AnalysisExplanation,
8
+ )
9
+ from presidio_analyzer.nlp_engine import NlpArtifacts
10
+ from presidio_analyzer.predefined_recognizers.spacy_recognizer import SpacyRecognizer
11
+
12
+ logger = logging.getLogger("presidio-analyzer")
13
+
14
+
15
+ class CustomSpacyRecognizer(LocalRecognizer):
16
+
17
+ ENTITIES = [
18
+ "LOCATION",
19
+ "PERSON",
20
+ "NRP",
21
+ "ORGANIZATION",
22
+ "DATE_TIME",
23
+ ]
24
+
25
+ DEFAULT_EXPLANATION = "Identified as {} by Spacy's Named Entity Recognition (Privy-trained)"
26
+
27
+ CHECK_LABEL_GROUPS = [
28
+ ({"LOCATION"}, {"LOC", "LOCATION", "STREET_ADDRESS", "COORDINATE"}),
29
+ ({"PERSON"}, {"PER", "PERSON"}),
30
+ ({"NRP"}, {"NORP", "NRP"}),
31
+ ({"ORGANIZATION"}, {"ORG"}),
32
+ ({"DATE_TIME"}, {"DATE_TIME"}),
33
+ ]
34
+
35
+ MODEL_LANGUAGES = {
36
+ "en": "beki/en_spacy_pii_distilbert",
37
+ }
38
+
39
+ PRESIDIO_EQUIVALENCES = {
40
+ "PER": "PERSON",
41
+ "LOC": "LOCATION",
42
+ "ORG": "ORGANIZATION",
43
+ "NROP": "NRP",
44
+ "DATE_TIME": "DATE_TIME",
45
+ }
46
+
47
+ def __init__(
48
+ self,
49
+ supported_language: str = "en",
50
+ supported_entities: Optional[List[str]] = None,
51
+ check_label_groups: Optional[Tuple[Set, Set]] = None,
52
+ context: Optional[List[str]] = None,
53
+ ner_strength: float = 0.85,
54
+ ):
55
+ self.ner_strength = ner_strength
56
+ self.check_label_groups = (
57
+ check_label_groups if check_label_groups else self.CHECK_LABEL_GROUPS
58
+ )
59
+ supported_entities = supported_entities if supported_entities else self.ENTITIES
60
+ super().__init__(
61
+ supported_entities=supported_entities,
62
+ supported_language=supported_language,
63
+ )
64
+
65
+ def load(self) -> None:
66
+ """Load the model, not used. Model is loaded during initialization."""
67
+ pass
68
+
69
+ def get_supported_entities(self) -> List[str]:
70
+ """
71
+ Return supported entities by this model.
72
+ :return: List of the supported entities.
73
+ """
74
+ return self.supported_entities
75
+
76
+ def build_spacy_explanation(
77
+ self, original_score: float, explanation: str
78
+ ) -> AnalysisExplanation:
79
+ """
80
+ Create explanation for why this result was detected.
81
+ :param original_score: Score given by this recognizer
82
+ :param explanation: Explanation string
83
+ :return:
84
+ """
85
+ explanation = AnalysisExplanation(
86
+ recognizer=self.__class__.__name__,
87
+ original_score=original_score,
88
+ textual_explanation=explanation,
89
+ )
90
+ return explanation
91
+
92
+ def analyze(self, text, entities, nlp_artifacts=None): # noqa D102
93
+ results = []
94
+ if not nlp_artifacts:
95
+ logger.warning("Skipping SpaCy, nlp artifacts not provided...")
96
+ return results
97
+
98
+ ner_entities = nlp_artifacts.entities
99
+
100
+ for entity in entities:
101
+ if entity not in self.supported_entities:
102
+ continue
103
+ for ent in ner_entities:
104
+ if not self.__check_label(entity, ent.label_, self.check_label_groups):
105
+ continue
106
+ textual_explanation = self.DEFAULT_EXPLANATION.format(
107
+ ent.label_)
108
+ explanation = self.build_spacy_explanation(
109
+ self.ner_strength, textual_explanation
110
+ )
111
+ spacy_result = RecognizerResult(
112
+ entity_type=entity,
113
+ start=ent.start_char,
114
+ end=ent.end_char,
115
+ score=self.ner_strength,
116
+ analysis_explanation=explanation,
117
+ recognition_metadata={
118
+ RecognizerResult.RECOGNIZER_NAME_KEY: self.name
119
+ },
120
+ )
121
+ results.append(spacy_result)
122
+
123
+ return results
124
+
125
+ @staticmethod
126
+ def __check_label(
127
+ entity: str, label: str, check_label_groups: Tuple[Set, Set]
128
+ ) -> bool:
129
+ return any(
130
+ [entity in egrp and label in lgrp for egrp, lgrp in check_label_groups]
131
+ )