File size: 5,793 Bytes
74c014c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
import re
import unicodedata
from typing import Any, Dict, List

import torch
import transformers
from transformers import AutoTokenizer, AutoModelForCausalLM
class EndpointHandler:
    """Inference handler: normalize the prompt(s), generate, restore the tags.

    The handler owns a ``text-generation`` pipeline built in ``__init__`` and
    is called with the request payload as a dict.
    """

    def __init__(self, path):
        """Load the tokenizer and model stored at ``path`` and build the pipeline.

        The model is loaded with ``device_map="auto"``, ``torch.bfloat16`` and
        ``load_in_4bit=True``.
        """
        tokenizer = AutoTokenizer.from_pretrained(path)
        model = AutoModelForCausalLM.from_pretrained(
            path, device_map="auto", torch_dtype=torch.bfloat16, load_in_4bit=True
        )
        self.pipeline = transformers.pipeline(
            "text-generation", model=model, tokenizer=tokenizer
        )

    def __call__(self, data: Dict[str, Any]) -> List[Dict[str, str]]:
        """Generate a continuation for the prompt(s) found in ``data``.

        Args:
            data: Request payload. ``data["inputs"]`` is the prompt (a string)
                or a list/tuple of prompts. ``data["parameters"]``, when
                present and not ``None``, overrides the default generation
                parameters. Both keys are popped from ``data``.

        Returns:
            A list of ``{"generated_text": ...}`` dicts: a single entry for a
            string input, one entry per prompt for a list/tuple input. Only
            the first generated sequence of each prompt is kept, and speaker
            tags rewritten during pre-processing are restored in it.

        Raises:
            ValueError: If the inputs are neither a string nor a list/tuple
                (this includes a payload without an ``"inputs"`` key, in which
                case the payload itself is taken as input).
        """
        # process input (falls back to the payload itself when "inputs" is absent)
        inputs = data.pop("inputs", data)

        # default parameters
        parameters = {
            "max_new_tokens": 128,
            "do_sample": True,
            "top_k": 10,
            "temperature": 1.0,
            "return_full_text": False,
        }
        # user parameters; `or {}` tolerates an explicit `"parameters": None`
        parameters.update(data.pop("parameters", None) or {})

        # Fail early with a clear message instead of an obscure unpacking error
        if not isinstance(inputs, (str, list, tuple)):
            raise ValueError(
                "'inputs' must be a string or a list of strings, "
                f"got {type(inputs).__name__}"
            )

        unique = isinstance(inputs, str)
        inputs, denormalize_funcs = claire_text_preproc_conversation(inputs)

        sequences = self.pipeline(inputs, **parameters)

        if unique:
            # Single prompt: `denormalize_funcs` is one callable.
            # Assumes the pipeline returns a list of dicts here.
            return [{"generated_text": denormalize_funcs(sequences[0]["generated_text"])}]

        # Batch: one callable per prompt.
        # Assumes the pipeline returns one list of dicts per prompt.
        assert len(denormalize_funcs) == len(sequences)
        return [
            {"generated_text": denormalize_func(seq[0]["generated_text"])}
            for denormalize_func, seq in zip(denormalize_funcs, sequences)
        ]
def claire_text_preproc_conversation(text):
    """Normalize conversation text and build the function undoing tag changes.

    Args:
        text: A string, or a non-empty list/tuple whose elements are each
            processed recursively. Any other value is passed through as is.

    Returns:
        A pair ``(normalized, denormalize)``.

        * For a string, ``normalized`` is the cleaned-up text and
          ``denormalize`` is a one-argument callable that puts the speaker tags
          rewritten during normalization back to their original spelling in
          the string it is given.
        * For a list/tuple, both members are lists with one entry per element.
        * For any other value, ``normalized`` is the value itself and
          ``denormalize`` is the identity function.
    """
    global _reverse_tag_transfo

    if isinstance(text, (list, tuple)):
        assert len(text)
        # Apply to every element, then transpose [(text, func), ...] into two lists
        texts, denormalize_funcs = zip(*[claire_text_preproc_conversation(t) for t in text])
        return list(texts), list(denormalize_funcs)

    if not isinstance(text, str):
        # Always return a pair, so that callers (including the recursive call
        # above) can unpack the result whatever the input type
        return text, lambda x: x

    text = format_special_characters(text)

    # Drop " - " separators, as well as a dash opening or closing the text
    text = re.sub(" - | -$|^- ", " ", text.strip(" "))

    # Reset the module-level record; format_special_tags() fills it with
    # {normalized tag: original tag} entries
    _reverse_tag_transfo = {}
    text = format_special_tags(text)

    text = collapse_whitespaces_conversations(text)

    if not _reverse_tag_transfo:
        return text, lambda x: x

    # Snapshot the record: the global is reset by the next call
    reverse_tag_transfo = _reverse_tag_transfo.copy()

    def denormalize_func(t):
        # Entries are applied in insertion order
        for k, v in reverse_tag_transfo.items():
            t = t.replace(k, v)
        return t

    return text, denormalize_func
# Matches a bracketed span "[...]" and captures the text between the brackets
_brackets = re.compile(r"\[([^\]]*)\]")
# Matches one or more characters other than "]" followed by ":"
_pattern_speaker = re.compile(r"[^\]]+:")
# Global record of the tag normalizations that were done, so that they can be
# applied back: maps each normalized tag to the original tag
_reverse_tag_transfo = {}
# Original-text prefix of the first anonymized speaker tag met by _format_tag
# (None until one is met); used as the replacement for "[Intervenant "
_anonymized_prefix = None
def format_special_tags(text):
    """Rewrite every bracketed tag of ``text`` and return the resulting string.

    Each "[...]" span is handed to ``_format_special_tags``, which returns its
    replacement. If an anonymized speaker prefix was recorded along the way,
    a generic "[Intervenant " entry pointing to that prefix is added to the
    module-level ``_reverse_tag_transfo`` mapping.
    """
    global _reverse_tag_transfo, _anonymized_prefix

    # Start from a clean slate: the prefix is set again by _format_tag if needed
    _anonymized_prefix = None

    rewritten = _brackets.sub(_format_special_tags, text)

    # The generic anonymization entry goes last in the mapping
    if _anonymized_prefix:
        _reverse_tag_transfo["[Intervenant "] = _anonymized_prefix

    return rewritten
def _format_special_tags(match):
    """Return the replacement for one "[...]" regex match.

    The tag is normalized by ``_format_tag`` when the text inside the brackets
    starts like a speaker label (it matches ``_pattern_speaker``); otherwise
    the tag is removed by returning an empty string.
    """
    inner = match.group(1)
    if _pattern_speaker.match(inner) is None:
        return ""
    return _format_tag(match.group())
def _format_tag(text):
    """Normalize one bracketed tag and record how to restore its original form.

    Args:
        text: The whole tag, brackets included (e.g. "[speaker001:]").

    Returns:
        * A newline followed by "[Intervenant N:]" when the tag starts with
          "speaker", "spk" or "locuteur" (case-insensitive) followed by an
          integer N.
        * ``text`` unchanged when one of these prefixes is not followed by an
          integer.
        * A newline followed by the tag with its speaker name re-capitalized
          for any other tag ending with ":]".
        * An empty string for a tag that does not end with ":]".

    Side effects:
        Each rewritten tag is stored in the module-level
        ``_reverse_tag_transfo`` mapping (normalized tag -> original tag). On
        the first anonymized speaker met, ``_anonymized_prefix`` is set to the
        beginning of the original tag: the bracket, the prefix, and any spaces
        or zeros that directly follow it.
    """
    global _reverse_tag_transfo, _anonymized_prefix

    if not text.endswith(":]"):
        # Not a speaker tag: drop it
        return ""

    # Conversion "[speaker001:]" -> "[Intervenant 1:]"
    lowered = text.lower()
    for prefix in ("speaker", "spk", "locuteur"):
        if not lowered.startswith("[" + prefix):
            continue
        try:
            index = int(text[len(prefix) + 1:-2])
        except ValueError:
            return text
        new_spk_tag = f"[Intervenant {index}:]"
        _reverse_tag_transfo[new_spk_tag] = text
        if _anonymized_prefix is None:
            # Slice the prefix out of the original tag (rather than rebuilding
            # it from the lower-cased candidate) so that its casing is kept,
            # and include the padding (spaces / zeros) that follows it
            end = len(prefix) + 1
            while end < len(text) and text[end] in " 0":
                end += 1
            _anonymized_prefix = text[:end]
        return "\n" + new_spk_tag

    # Capitalize speaker name
    speaker = capitalize(text[1:-2])
    new_spk_tag = f"[{speaker}:]"
    if text != new_spk_tag:
        _reverse_tag_transfo[new_spk_tag] = text
    return "\n" + new_spk_tag
def capitalize(text):
    """Capitalize a speaker name (first and last names), word by word.

    Words are separated by single spaces. Each word is capitalized, except
    all-uppercase words of at most two characters, which are kept as they
    are. In a word containing "-" or "'", each part around that separator is
    capitalized as well, unless the part is all-uppercase. When a word holds
    both separators, the result computed for "'" is the one that is kept.
    """

    def cap_part(part):
        # All-uppercase parts are left untouched
        return part if part.isupper() else part.capitalize()

    rebuilt = []
    for word in text.split(" "):
        is_short_acronym = word.isupper() and len(word) <= 2
        if not is_short_acronym:
            word = word.capitalize()

        final = word
        for sep in ("-", "'"):
            if sep in word:
                # Always recomputed from `word`, hence the last separator wins
                final = sep.join(cap_part(part) for part in word.split(sep))
        rebuilt.append(final)

    return " ".join(rebuilt)
def collapse_whitespaces_conversations(text):
    """Collapse redundant whitespace in a conversation and return the result.

    The substitutions are applied in order; leading whitespace and trailing
    spaces are then stripped (a trailing newline is kept).
    """
    substitutions = (
        (r"\n+", "\n"),         # runs of newlines -> one newline
        (r"[ \t]+", " "),       # runs of spaces / tabs -> one space
        (r"\n ", "\n"),         # no space at the beginning of a line
        (r" ([\.,])", r"\1"),   # no space before a period or a comma
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.lstrip().rstrip(" ")
def format_special_characters(text):
    """Replace typographic variants of common characters by plain equivalents.

    The text is first normalized to Unicode NFC. The regex substitutions below
    are then applied in order: ellipsis, quotation marks (together with the
    horizontal whitespace inside them), apostrophes, low comma-like quote,
    en dash, unbreakable spaces, non-printable characters and superscript
    ordinal suffixes.

    Args:
        text: The string to clean up.

    Returns:
        The cleaned-up string.
    """
    text = unicodedata.normalize("NFC", text)
    for before, after in [
        ("…", "..."),
        (r"[«“][^\S\r\n]*", '"'),
        (r"[^\S\r\n]*[»”″„]", '"'),
        (r"(``|'')", '"'),
        (r"[’‘‛ʿ]", "'"),
        ("‚", ","),
        (r"–", "-"),
        # Unbreakable spaces (no-break space, narrow no-break space), written
        # as escapes: as literals they cannot be told apart from a plain space
        # and are easily lost when the file is copied
        ("[\u00a0\u202f]", " "),
        (r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x9F]", ""),  # non-printable characters
        # ("·", "."),
        (r"ᵉʳ", "er"),  # must come before the single "ᵉ" rule
        (r"ᵉ", "e"),
    ]:
        text = re.sub(before, after, text)
    return text
|