# Largely inspired from https://github.com/king-menin/yttm_transformers_tokenizer/blob/master/tokenization_yttm.py
from collections import OrderedDict
from fairseq.data import Dictionary
from transformers.tokenization_utils import PreTrainedTokenizer
from transformers.dynamic_module_utils import custom_object_save
from transformers.utils import (
is_tokenizers_available,
logging,
)
from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, Sequence, Tuple, Union
import copy
import os
import shutil
import stanza
import youtokentome as yttm
import json
logger = logging.get_logger(__name__)
# Slow tokenizers used to be saved in three separate files
SPECIAL_TOKENS_MAP_FILE = "special_tokens_map.json"
ADDED_TOKENS_FILE = "added_tokens.json"
TOKENIZER_CONFIG_FILE = "tokenizer_config.json"
if is_tokenizers_available():
from tokenizers import AddedToken
from tokenizers import Encoding as EncodingFast
else:
    from dataclasses import dataclass, field

    @dataclass(frozen=True, eq=True)
class AddedToken:
"""
        AddedToken represents a token to be added to a Tokenizer. An AddedToken can have special options defining
        the way it should behave.
"""
content: str = field(default_factory=str)
single_word: bool = False
lstrip: bool = False
rstrip: bool = False
normalized: bool = True
def __getstate__(self):
return self.__dict__
@dataclass
class EncodingFast:
"""This is dummy class because without the `tokenizers` library we don't have these objects anyway"""
pass
class BertDictionary(Dictionary):
"""Dictionary for BERT tasks
extended from Dictionary by adding support for cls as well as mask symbols"""
def __init__(
self,
pad='[PAD]',
unk='[UNK]',
cls='[CLS]',
mask='[MASK]',
sep='[SEP]'
):
super().__init__(pad=pad, unk=unk)
(
self.cls_word,
self.mask_word,
self.sep_word,
) = cls, mask, sep
self.is_end = None
self.nspecial = len(self.symbols)
def mask(self):
"""Helper to get index of mask symbol"""
idx = self.index(self.mask_word)
return idx
def is_end_word(self, idx):
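        # Lazily cache, for each symbol, whether it ends a word (fairseq BPE marks word ends with the "</w>" suffix).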
if self.is_end is None:
self.is_end = [self.symbols[i].endswith("</w>") for i in range(len(self))]
return self.is_end[idx]
class FB2Tokenizer(PreTrainedTokenizer):
"""
YTTMTransformersTokenizer BPE tokenizer. Peculiarities:
- Byte-level Byte-Pair-Encoding
- Requires a space to start the input string => the encoding methods should be called with the
``add_prefix_space`` flag set to ``True``.
Otherwise, this tokenizer ``encode`` and ``decode`` method will not conserve
the absence of a space at the beginning of a string:
::
tokenizer.decode(tokenizer.encode("Hello", add_special_tokens=False))
This tokenizer inherits from :class:`~transformers.PreTrainedTokenizer` which contains most of the methods. Users
should refer to the superclass for more information regarding methods.
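    Example (a minimal usage sketch; ``vocab.txt`` and ``bpe.model`` are placeholder paths for files produced by
    fairseq preprocessing and YouTokenToMe training):
    ::
        tokenizer = FB2Tokenizer(vocab_file="vocab.txt", bpe_model="bpe.model")
        tokens = tokenizer.tokenize("Bonjour le monde !")
        ids = tokenizer.convert_tokens_to_ids(tokens)
        text = tokenizer.convert_tokens_to_string(tokens)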
    Args:
        vocab_file (:obj:`str`):
            Path to the fairseq vocabulary file.
        bpe_model (:obj:`str`):
            Path to the YouTokenToMe BPE model file.
        unk_token (:obj:`string`, `optional`, defaults to :obj:`"[UNK]"`):
            The unknown token. A token that is not in the vocabulary cannot be converted to an ID and is set to
            be this token instead.
        bos_token (:obj:`string`, `optional`, defaults to :obj:`"<s>"`):
            The beginning of sequence token.
        eos_token (:obj:`string`, `optional`, defaults to :obj:`"</s>"`):
            The end of sequence token.
        pad_token (:obj:`string`, `optional`, defaults to :obj:`"[PAD]"`):
            The padding token.
        model_max_length (:obj:`int`, `optional`, defaults to 512):
            The maximum length in number of tokens for the inputs to the transformer model. When the tokenizer
            is loaded with `from_pretrained`, this will be set to the value stored for the associated model.
    """
vocab_files_names = {"vocab_file": "vocab.txt", "bpe_model": "bpe.model"}
def __init__(
self,
vocab_file,
bpe_model,
unk_token="[UNK]",
bos_token="<s>",
cls_token="<s>",
eos_token="</s>",
pad_token="[PAD]",
mask_token="[MASK]",
sep_token="</s>",
model_max_length=512,
**kwargs
):
super().__init__(
bos_token=bos_token,
eos_token=eos_token,
unk_token=unk_token,
pad_token=pad_token,
cls_token=cls_token,
sep_token=sep_token,
mask_token=mask_token,
model_max_length=model_max_length,
**kwargs
)
# no default special tokens - you can update this value if you add special tokens
#self.max_len_single_sentence = model_max_length - 2
# no default special tokens - you can update this value if you add special tokens
#self.max_len_sentences_pair = model_max_length - 2
vocab_file = str(vocab_file)
self.vocab_file = str(vocab_file)
self.bpe_model_path = str(bpe_model)
self.vocab_files_names = {'vocab_file': 'vocab.txt', 'bpe_model': 'bpe.model'}
try:
import stanza
import youtokentome as yttm
import fairseq
except ImportError:
raise ImportError("You need to install stanza, youtokentome and fairseq to use this tokenizer")
if os.path.isfile(bpe_model):
self.bpe = yttm.BPE(bpe_model, n_threads=-1)
else:
raise OSError("bpe_model should be a path to model file")
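        # French word tokenizer from stanza; sentence splitting is disabled because `_tokenize` feeds it one
        # sentence at a time.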
        self.nlp = stanza.Pipeline(
            lang='fr',
            processors='tokenize',
            tokenize_no_ssplit=True,
            tokenize_batch_size=128,
            use_gpu=True,
            verbose=False,
        )
self.vocab_file = vocab_file
self.cache = {}
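        # Token ids come from the fairseq dictionary; the mask token is appended because fairseq dictionaries
        # do not include it by default.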
self.dictionary = BertDictionary.load(vocab_file)
self.dictionary.add_symbol(mask_token)
self.vocab = OrderedDict([(key, val) for val, key in enumerate(self.dictionary.symbols)])
self.encoder = self.vocab
self.decoder = {k: v for k, v in enumerate(self.dictionary.symbols)}
@property
def vocab_size(self) -> int:
return len(self.vocab)
def get_vocab(self):
return dict(self.vocab)
def save_vocabulary(self, save_directory: str, filename_prefix: Optional[str] = None) -> Tuple[str]:
"""
Save only the vocabulary of the tokenizer (vocabulary + added tokens).
This method won't save the configuration and special token mappings of the tokenizer. Use
[`~PreTrainedTokenizerFast._save_pretrained`] to save the whole state of the tokenizer.
Args:
save_directory (`str`):
The directory in which to save the vocabulary.
filename_prefix (`str`, *optional*):
                An optional prefix to add to the names of the saved files.
Returns:
`Tuple(str)`: Paths to the files saved.
"""
        if not os.path.isdir(save_directory):
            logger.error(f"Provided path ({save_directory}) should be a directory")
            return ()
        bpe_save_file = os.path.join(save_directory, (filename_prefix + "-" if filename_prefix else "") + "bpe.model")
        if os.path.abspath(self.bpe_model_path) != os.path.abspath(bpe_save_file):
            shutil.copyfile(self.bpe_model_path, bpe_save_file)
        self.bpe_model_path = bpe_save_file
        vocab_save_file = os.path.join(save_directory, (filename_prefix + "-" if filename_prefix else "") + "vocab.txt")
        if os.path.abspath(self.vocab_file) != os.path.abspath(vocab_save_file):
            shutil.copyfile(self.vocab_file, vocab_save_file)
        self.vocab_file = vocab_save_file
        return bpe_save_file, vocab_save_file
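    # Usage sketch (the directory path is illustrative): ``tokenizer.save_vocabulary("./saved")`` copies
    # ``bpe.model`` and ``vocab.txt`` into ``./saved`` and returns the two new file paths.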
    def replace_brackets(self, sentence):
        """Map raw parentheses in a stanza sentence to the PTB-style -LRB- / -RRB- tokens."""
        sent = []
        for tok in sentence.tokens:
            tok = tok.text
            if tok == "(":
                tok = "-LRB-"
            elif tok == ")":
                tok = "-RRB-"
            sent.append(tok)
        return sent
def _tokenize(self, text: str, **kwargs):
"""Converts a string in a sequence of tokens (string), using the tokenizer.
Split in words for word-based vocabulary or sub-words for sub-word-based vocabularies (BPE).
"""
sent = self.nlp([stanza.Document([], text=text)])[0].sentences[0]
sent = ' '.join(self.replace_brackets(sent))
bpe = self.bpe.encode([sent], output_type=yttm.OutputType.SUBWORD)[0]
return bpe
def tokenize(self, text: Union[List[str], str], add_special_tokens=True, **kwargs):
if isinstance(text, list):
return list(map(
lambda x: self.tokenize(x, add_special_tokens=add_special_tokens, **kwargs),
text
))
res = self._tokenize(text)
if add_special_tokens:
res = [self.bos_token] + res + [self.eos_token]
return res
def _convert_token_to_id(self, token):
""" Converts a token (str) in an id using the vocab. """
return self.vocab.get(token, self.vocab.get(self.unk_token))
def _convert_id_to_token(self, index):
"""Converts an index (integer) in a token (str) using the vocab."""
return self.decoder.get(index)
    def convert_tokens_to_string(self, tokens: List[str]):
        """Converts a sequence of tokens (strings) into a single string."""
        # Drop the special tokens added by `tokenize` before letting YouTokenToMe detokenize the subwords.
        if tokens and tokens[0] == self.bos_token:
            tokens = tokens[1:]
        if tokens and tokens[-1] == self.eos_token:
            tokens = tokens[:-1]
        return self.bpe.decode(list(map(self.bpe.subword_to_id, tokens)))[0]
#@classmethod
#def from_pretrained(self, cls, **kwargs):
# """Load from file. Actually only call __init__"""
# return cls(**kwargs)
def save_pretrained(
self,
save_directory: Union[str, os.PathLike],
legacy_format: Optional[bool] = None,
filename_prefix: Optional[str] = None,
push_to_hub: bool = False,
**kwargs,
) -> Tuple[str]:
"""
Save the full tokenizer state.
        This method makes sure the full tokenizer can then be re-loaded using the
        [`~tokenization_utils_base.PreTrainedTokenizer.from_pretrained`] class method.
        Warning: this won't save modifications you may have applied to the tokenizer after the instantiation (for
        instance, modifying `tokenizer.do_lower_case` after creation).
Args:
save_directory (`str` or `os.PathLike`): The path to a directory where the tokenizer will be saved.
legacy_format (`bool`, *optional*):
Only applicable for a fast tokenizer. If unset (default), will save the tokenizer in the unified JSON
format as well as in legacy format if it exists, i.e. with tokenizer specific vocabulary and a separate
                added_tokens file.
If `False`, will only save the tokenizer in the unified JSON format. This format is incompatible with
"slow" tokenizers (not powered by the *tokenizers* library), so the tokenizer will not be able to be
loaded in the corresponding "slow" tokenizer.
                If `True`, will save the tokenizer in legacy format. If the "slow" tokenizer doesn't exist, a value
error is raised.
            filename_prefix (`str`, *optional*):
A prefix to add to the names of the files saved by the tokenizer.
push_to_hub (`bool`, *optional*, defaults to `False`):
Whether or not to push your model to the Hugging Face model hub after saving it. You can specify the
repository you want to push to with `repo_id` (will default to the name of `save_directory` in your
namespace).
kwargs:
                Additional keyword arguments passed along to the [`~utils.PushToHubMixin.push_to_hub`] method.
Returns:
A tuple of `str`: The files saved.
"""
if os.path.isfile(save_directory):
logger.error(f"Provided path ({save_directory}) should be a directory, not a file")
return
os.makedirs(save_directory, exist_ok=True)
if push_to_hub:
commit_message = kwargs.pop("commit_message", None)
repo_id = kwargs.pop("repo_id", save_directory.split(os.path.sep)[-1])
repo_id, token = self._create_repo(repo_id, **kwargs)
files_timestamps = self._get_files_timestamps(save_directory)
special_tokens_map_file = os.path.join(
save_directory, (filename_prefix + "-" if filename_prefix else "") + SPECIAL_TOKENS_MAP_FILE
)
tokenizer_config_file = os.path.join(
save_directory, (filename_prefix + "-" if filename_prefix else "") + TOKENIZER_CONFIG_FILE
)
tokenizer_config = copy.deepcopy(self.init_kwargs)
# TODO: Ensure the modified attributes (those are also in the __init__ kwargs) will give identical tokenizers
# target_keys = self.init_kwargs.keys()
target_keys = ["model_max_length"]
for k in target_keys:
if hasattr(self, k):
tokenizer_config[k] = getattr(self, k)
if len(self.init_inputs) > 0:
tokenizer_config["init_inputs"] = copy.deepcopy(self.init_inputs)
for file_id in self.vocab_files_names.keys():
tokenizer_config.pop(file_id, None)
# Sanitize AddedTokens
def convert_added_tokens(obj: Union[AddedToken, Any], add_type_field=True):
if isinstance(obj, AddedToken):
out = obj.__getstate__()
if add_type_field:
out["__type"] = "AddedToken"
return out
elif isinstance(obj, (list, tuple)):
return list(convert_added_tokens(o, add_type_field=add_type_field) for o in obj)
elif isinstance(obj, dict):
return {k: convert_added_tokens(v, add_type_field=add_type_field) for k, v in obj.items()}
return obj
# add_type_field=True to allow dicts in the kwargs / differentiate from AddedToken serialization
tokenizer_config = convert_added_tokens(tokenizer_config, add_type_field=True)
# Add tokenizer class to the tokenizer config to be able to reload it with from_pretrained
tokenizer_class = self.__class__.__name__
# Remove the Fast at the end unless we have a special `PreTrainedTokenizerFast`
if tokenizer_class.endswith("Fast") and tokenizer_class != "PreTrainedTokenizerFast":
tokenizer_class = tokenizer_class[:-4]
tokenizer_config["tokenizer_class"] = tokenizer_class
if getattr(self, "_auto_map", None) is not None:
tokenizer_config["auto_map"] = self._auto_map
if getattr(self, "_processor_class", None) is not None:
tokenizer_config["processor_class"] = self._processor_class
# If we have a custom model, we copy the file defining it in the folder and set the attributes so it can be
# loaded from the Hub.
if self._auto_class is not None:
custom_object_save(self, save_directory, config=tokenizer_config)
#tokenizer_config["vocab_file"] = "vocab.txt"
#tokenizer_config["bpe_model"] = "bpe.model"
with open(tokenizer_config_file, "w", encoding="utf-8") as f:
out_str = json.dumps(tokenizer_config, indent=2, sort_keys=True, ensure_ascii=False) + "\n"
f.write(out_str)
logger.info(f"tokenizer config file saved in {tokenizer_config_file}")
# Sanitize AddedTokens in special_tokens_map
write_dict = convert_added_tokens(self.special_tokens_map_extended, add_type_field=False)
with open(special_tokens_map_file, "w", encoding="utf-8") as f:
out_str = json.dumps(write_dict, indent=2, sort_keys=True, ensure_ascii=False) + "\n"
f.write(out_str)
logger.info(f"Special tokens file saved in {special_tokens_map_file}")
file_names = (tokenizer_config_file, special_tokens_map_file)
save_files = self._save_pretrained(
save_directory=save_directory,
file_names=file_names,
legacy_format=legacy_format,
filename_prefix=filename_prefix,
)
if push_to_hub:
self._upload_modified_files(
save_directory, repo_id, files_timestamps, commit_message=commit_message, token=token
)
return save_files
def _save_pretrained(
self,
save_directory: Union[str, os.PathLike],
file_names: Tuple[str],
legacy_format: Optional[bool] = None,
filename_prefix: Optional[str] = None,
) -> Tuple[str]:
"""
Save a tokenizer using the slow-tokenizer/legacy format: vocabulary + added tokens.
Fast tokenizers can also be saved in a unique JSON file containing {config + vocab + added-tokens} using the
specific [`~tokenization_utils_fast.PreTrainedTokenizerFast._save_pretrained`]
"""
if legacy_format is False:
raise ValueError(
"Only fast tokenizers (instances of PreTrainedTokenizerFast) can be saved in non legacy format."
)
save_directory = str(save_directory)
added_tokens_file = os.path.join(
save_directory, (filename_prefix + "-" if filename_prefix else "") + ADDED_TOKENS_FILE
)
added_vocab = self.get_added_vocab()
if added_vocab:
with open(added_tokens_file, "w", encoding="utf-8") as f:
out_str = json.dumps(added_vocab, indent=2, sort_keys=True, ensure_ascii=False) + "\n"
f.write(out_str)
logger.info(f"added tokens file saved in {added_tokens_file}")
vocab_files = self.save_vocabulary(save_directory, filename_prefix=filename_prefix)
return file_names + vocab_files + (added_tokens_file,)