# Largely inspired from https://github.com/king-menin/yttm_transformers_tokenizer/blob/master/tokenization_yttm.py
import copy
import json
import os
import shutil
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, Sequence, Tuple, Union

import stanza
import youtokentome as yttm
from fairseq.data import Dictionary
from transformers.dynamic_module_utils import custom_object_save
from transformers.tokenization_utils import PreTrainedTokenizer
from transformers.utils import (
    is_tokenizers_available,
    logging,
)

logger = logging.get_logger(__name__)

# Slow tokenizers used to be saved in three separated files
SPECIAL_TOKENS_MAP_FILE = "special_tokens_map.json"
ADDED_TOKENS_FILE = "added_tokens.json"
TOKENIZER_CONFIG_FILE = "tokenizer_config.json"

if is_tokenizers_available():
    from tokenizers import AddedToken
    from tokenizers import Encoding as EncodingFast
else:

    @dataclass(frozen=True, eq=True)
    class AddedToken:
        """
        AddedToken represents a token to be added to a Tokenizer.

        An AddedToken can have special options defining the way it should behave.
        This fallback is only defined when the `tokenizers` library is unavailable.
        """

        content: str = field(default_factory=str)
        single_word: bool = False
        lstrip: bool = False
        rstrip: bool = False
        normalized: bool = True

        def __getstate__(self):
            """Return the instance attribute dict (used when serializing the token)."""
            return self.__dict__

    @dataclass
    class EncodingFast:
        """This is dummy class because without the `tokenizers` library we don't have these objects anyway"""

        pass


class BertDictionary(Dictionary):
    """Dictionary for BERT tasks extended from Dictionary by adding support for cls as well as mask symbols"""

    def __init__(
        self,
        pad="[PAD]",
        unk="[UNK]",
        cls="[CLS]",
        mask="[MASK]",
        sep="[SEP]",
    ):
        """
        Build the dictionary and remember the cls / mask / sep symbols.

        Only `pad` and `unk` are forwarded to the parent constructor; `cls`, `mask` and `sep` are stored as
        attributes and are not added as symbols here.
        """
        super().__init__(pad=pad, unk=unk)
        self.cls_word = cls
        self.mask_word = mask
        self.sep_word = sep
        # Lazily-built cache used by `is_end_word`.
        self.is_end = None
        self.nspecial = len(self.symbols)

    def mask(self):
        """Helper to get index of mask symbol"""
        return self.index(self.mask_word)

    def is_end_word(self, idx):
        """Return the cached end-of-word flag for the symbol at index `idx`."""
        if self.is_end is None:
            # NOTE(review): `endswith("")` is True for every string, so every symbol is currently flagged as an
            # end of word. The intended suffix marker may have been lost -- confirm before relying on this.
            self.is_end = [symbol.endswith("") for symbol in self.symbols]
        return self.is_end[idx]


class FB2Tokenizer(PreTrainedTokenizer):
    """
    Slow tokenizer combining stanza word tokenization with a YouTokenToMe (yttm) BPE model.

    Text is first tokenized by a French stanza pipeline, round brackets are replaced by `-LRB-` / `-RRB-`, the
    words are joined with single spaces and the result is split into sub-words by the BPE model. Token <-> id
    conversion uses the symbols of a fairseq dictionary loaded from `vocab_file` (plus the mask token).

    This tokenizer inherits from :class:`~transformers.PreTrainedTokenizer` which contains most of the methods.
    Users should refer to the superclass for more information regarding methods.

    Args:
        vocab_file (:obj:`str`):
            Path to the vocabulary (fairseq dictionary) file.
        bpe_model (:obj:`str`):
            Path to the YouTokenToMe BPE model file.
        unk_token (:obj:`str`, `optional`, defaults to ``"[UNK]"``):
            The unknown token. A token that is not in the vocabulary cannot be converted to an ID and is set to
            be this token instead.
        bos_token (:obj:`str`, `optional`, defaults to ``""``):
            The beginning of sequence token.
        cls_token (:obj:`str`, `optional`, defaults to ``""``):
            The classifier token.
        eos_token (:obj:`str`, `optional`, defaults to ``""``):
            The end of sequence token.
        pad_token (:obj:`str`, `optional`, defaults to ``"[PAD]"``):
            The padding token.
        mask_token (:obj:`str`, `optional`, defaults to ``"[MASK]"``):
            The mask token. It is added to the loaded dictionary.
        sep_token (:obj:`str`, `optional`, defaults to ``""``):
            The separator token.
        model_max_length (:obj:`int`, `optional`, defaults to 512):
            The maximum length in number of tokens for the inputs to the transformer model.

    NOTE(review): several special-token defaults are empty strings; confirm these are the intended values.
    """

    vocab_files_names = {"vocab_file": "vocab.txt", "bpe_model": "bpe.model"}

    # Round brackets are rewritten to these placeholders before BPE encoding.
    _BRACKET_REPLACEMENTS = {"(": "-LRB-", ")": "-RRB-"}

    def __init__(
        self,
        vocab_file,
        bpe_model,
        unk_token="[UNK]",
        bos_token="",
        cls_token="",
        eos_token="",
        pad_token="[PAD]",
        mask_token="[MASK]",
        sep_token="",
        model_max_length=512,
        **kwargs
    ):
        """
        Load the BPE model, the stanza pipeline and the vocabulary.

        Raises:
            ImportError: if stanza, youtokentome or fairseq cannot be imported.
            OSError: if `bpe_model` is not an existing file.
        """
        # NOTE(review): the parent constructor runs before the vocabulary is built; some transformers releases
        # may query the vocabulary during `__init__` -- confirm against the pinned transformers version.
        super().__init__(
            bos_token=bos_token,
            eos_token=eos_token,
            unk_token=unk_token,
            pad_token=pad_token,
            cls_token=cls_token,
            sep_token=sep_token,
            mask_token=mask_token,
            model_max_length=model_max_length,
            **kwargs
        )
        # no default special tokens - you can update this value if you add special tokens
        # self.max_len_single_sentence = model_max_length - 2
        # no default special tokens - you can update this value if you add special tokens
        # self.max_len_sentences_pair = model_max_length - 2

        # Paths are normalized to `str` so they can be reused when saving.
        self.vocab_file = str(vocab_file)
        self.bpe_model_path = str(bpe_model)
        self.vocab_files_names = {'vocab_file': 'vocab.txt', 'bpe_model': 'bpe.model'}

        try:
            import stanza
            import youtokentome as yttm
            import fairseq  # noqa: F401  (imported only to check availability)
        except ImportError as err:
            raise ImportError("You need to install stanza, youtokentome and fairseq to use this tokenizer") from err

        if not os.path.isfile(self.bpe_model_path):
            raise OSError("bpe_model should be a path to model file")
        self.bpe = yttm.BPE(self.bpe_model_path, n_threads=-1)

        # Sentence splitting is disabled: the pipeline is used for word tokenization only.
        self.nlp = stanza.Pipeline(
            lang='fr',
            processors='tokenize',
            tokenize_no_ssplit=True,
            use_gpu=True,
            tokenize_batch_size=128,
            verbose=False,
        )

        self.cache = {}
        self.dictionary = BertDictionary.load(self.vocab_file)
        self.dictionary.add_symbol(mask_token)

        # token -> id and id -> token tables, both derived from the dictionary symbol order.
        symbols = self.dictionary.symbols
        self.vocab = OrderedDict((symbol, index) for index, symbol in enumerate(symbols))
        self.encoder = self.vocab
        self.decoder = dict(enumerate(symbols))

    @property
    def vocab_size(self) -> int:
        """Number of entries in the vocabulary."""
        return len(self.vocab)

    def get_vocab(self):
        """Return a plain `dict` copy of the token -> id vocabulary."""
        return dict(self.vocab)

    @staticmethod
    def _copy_file(source: str, destination: str) -> None:
        """Copy `source` to `destination`; do nothing when both are the same file."""
        try:
            shutil.copy(source, destination)
        except shutil.SameFileError:
            # Saving into the directory the files were loaded from: nothing to copy.
            pass

    def save_vocabulary(self, save_directory: str, filename_prefix: Optional[str] = None) -> Tuple[str, ...]:
        """
        Save only the vocabulary of the tokenizer: the BPE model file and the vocabulary file.

        Both files are copied into `save_directory`, and `self.bpe_model_path` / `self.vocab_file` are updated to
        point to the copies. This method won't save the configuration and special token mappings of the
        tokenizer; use `save_pretrained` to save the whole state of the tokenizer.

        Args:
            save_directory (`str`):
                The directory in which to save the vocabulary.
            filename_prefix (`str`, *optional*):
                An optional prefix to add to the names of the saved files.

        Returns:
            `Tuple(str)`: Paths to the files saved, as `(bpe_model_file, vocab_file)`.

        Raises:
            NotADirectoryError: if `save_directory` is not an existing directory.
            OSError: if a source file cannot be copied.
        """
        if not os.path.isdir(save_directory):
            raise NotADirectoryError(f"Provided path ({save_directory}) should be a directory")

        prefix = filename_prefix + "-" if filename_prefix else ""
        bpe_save_file = os.path.join(save_directory, prefix + "bpe.model")
        vocab_save_file = os.path.join(save_directory, prefix + "vocab.txt")

        # Copy through the standard library instead of shelling out: paths are never
        # interpreted by a shell and a failed copy raises instead of being ignored.
        self._copy_file(self.bpe_model_path, bpe_save_file)
        self.bpe_model_path = bpe_save_file

        self._copy_file(self.vocab_file, vocab_save_file)
        self.vocab_file = vocab_save_file

        return bpe_save_file, vocab_save_file

    def replace_brackets(self, sentence):
        """
        Return the token texts of `sentence` with "(" / ")" replaced by "-LRB-" / "-RRB-".

        Args:
            sentence: object exposing a `tokens` sequence whose items have a `text` attribute.

        Returns:
            `List[str]`: one string per token, in order.
        """
        replacements = self._BRACKET_REPLACEMENTS
        return [replacements.get(token.text, token.text) for token in sentence.tokens]

    def _tokenize(self, text: str, **kwargs):
        """
        Converts a string in a sequence of sub-word tokens (strings).

        The text is word-tokenized by the stanza pipeline, brackets are replaced, and the space-joined words are
        encoded by the BPE model. Returns an empty list when stanza yields no sentence.
        """
        document = self.nlp([stanza.Document([], text=text)])[0]
        sentences = document.sentences
        if not sentences:
            return []
        # NOTE(review): only the first sentence is used; confirm that the pipeline never returns more than one
        # sentence for the inputs of interest.
        joined = ' '.join(self.replace_brackets(sentences[0]))
        return self.bpe.encode([joined], output_type=yttm.OutputType.SUBWORD)[0]

    def tokenize(self, text: Union[List[str], str], add_special_tokens=True, **kwargs):
        """
        Tokenize a string, or each string of a list.

        Args:
            text: a single string, or a list of strings (a list yields a list of token lists).
            add_special_tokens: when true, wrap each token list with `bos_token` and `eos_token`.

        Returns:
            A list of tokens for a string input, or a list of such lists for a list input.
        """
        if isinstance(text, list):
            return [self.tokenize(item, add_special_tokens=add_special_tokens, **kwargs) for item in text]

        tokens = self._tokenize(text)
        if add_special_tokens:
            tokens = [self.bos_token] + tokens + [self.eos_token]
        return tokens

    def _convert_token_to_id(self, token):
        """Converts a token (str) in an id using the vocab; unknown tokens map to the id of `unk_token`."""
        return self.vocab.get(token, self.vocab.get(self.unk_token))

    def _convert_id_to_token(self, index):
        """Converts an index (integer) in a token (str) using the vocab; returns `None` for an unknown index."""
        return self.decoder.get(index)

    def convert_tokens_to_string(self, tokens: List[str]):
        """
        Converts a sequence of tokens (strings) in a single string.

        A leading `bos_token` and a trailing `eos_token` are stripped before decoding with the BPE model.
        Returns an empty string when no token is left to decode.
        """
        if tokens and tokens[0] == self.bos_token:
            tokens = tokens[1:]
        if tokens and tokens[-1] == self.eos_token:
            tokens = tokens[:-1]
        if not tokens:
            return ""
        subword_ids = [self.bpe.subword_to_id(token) for token in tokens]
        return self.bpe.decode(subword_ids)[0]

    #@classmethod
    #def from_pretrained(self, cls, **kwargs):
    #    """Load from file. Actually only call __init__"""
    #    return cls(**kwargs)

    def save_pretrained(
        self,
        save_directory: Union[str, os.PathLike],
        legacy_format: Optional[bool] = None,
        filename_prefix: Optional[str] = None,
        push_to_hub: bool = False,
        **kwargs,
    ) -> Tuple[str, ...]:
        """
        Save the full tokenizer state.

        This method make sure the full tokenizer can then be re-loaded using the
        [`~tokenization_utils_base.PreTrainedTokenizer.from_pretrained`] class method.

        Warning: This won't save modifications you may have applied to the tokenizer after the instantiation
        (for instance, modifying `tokenizer.do_lower_case` after creation).

        Args:
            save_directory (`str` or `os.PathLike`): The path to a directory where the tokenizer will be saved.
            legacy_format (`bool`, *optional*):
                Only applicable for a fast tokenizer. This tokenizer is a slow tokenizer: passing `False` makes
                `_save_pretrained` raise a `ValueError`.
            filename_prefix: (`str`, *optional*):
                A prefix to add to the names of the files saved by the tokenizer.
            push_to_hub (`bool`, *optional*, defaults to `False`):
                Whether or not to push your model to the Hugging Face model hub after saving it. You can specify
                the repository you want to push to with `repo_id` (will default to the name of `save_directory`
                in your namespace).
            kwargs:
                Additional key word arguments passed along to the [`~utils.PushToHubMixin.push_to_hub`] method.

        Returns:
            A tuple of `str`: The files saved. `None` is returned (after logging an error) when
            `save_directory` is an existing file.
        """
        if os.path.isfile(save_directory):
            logger.error(f"Provided path ({save_directory}) should be a directory, not a file")
            return

        os.makedirs(save_directory, exist_ok=True)
        # Normalize path-like objects so the string operations below work.
        save_directory = str(save_directory)

        if push_to_hub:
            commit_message = kwargs.pop("commit_message", None)
            repo_id = kwargs.pop("repo_id", save_directory.split(os.path.sep)[-1])
            repo_id, token = self._create_repo(repo_id, **kwargs)
            files_timestamps = self._get_files_timestamps(save_directory)

        prefix = filename_prefix + "-" if filename_prefix else ""
        special_tokens_map_file = os.path.join(save_directory, prefix + SPECIAL_TOKENS_MAP_FILE)
        tokenizer_config_file = os.path.join(save_directory, prefix + TOKENIZER_CONFIG_FILE)

        tokenizer_config = copy.deepcopy(self.init_kwargs)

        # TODO: Ensure the modified attributes (those are also in the __init__ kwargs) will give identical tokenizers
        # target_keys = self.init_kwargs.keys()
        target_keys = ["model_max_length"]
        for key in target_keys:
            if hasattr(self, key):
                tokenizer_config[key] = getattr(self, key)

        if len(self.init_inputs) > 0:
            tokenizer_config["init_inputs"] = copy.deepcopy(self.init_inputs)
        # File locations are not stored in the config.
        for file_id in self.vocab_files_names:
            tokenizer_config.pop(file_id, None)

        # Sanitize AddedTokens
        def convert_added_tokens(obj: Union[AddedToken, Any], add_type_field=True):
            """Recursively turn `AddedToken` objects into JSON-serializable state dicts."""
            if isinstance(obj, AddedToken):
                out = obj.__getstate__()
                if add_type_field:
                    out["__type"] = "AddedToken"
                return out
            elif isinstance(obj, (list, tuple)):
                return [convert_added_tokens(item, add_type_field=add_type_field) for item in obj]
            elif isinstance(obj, dict):
                return {k: convert_added_tokens(v, add_type_field=add_type_field) for k, v in obj.items()}
            return obj

        # add_type_field=True to allow dicts in the kwargs / differentiate from AddedToken serialization
        tokenizer_config = convert_added_tokens(tokenizer_config, add_type_field=True)

        # Add tokenizer class to the tokenizer config to be able to reload it with from_pretrained
        tokenizer_class = self.__class__.__name__
        # Remove the Fast at the end unless we have a special `PreTrainedTokenizerFast`
        if tokenizer_class.endswith("Fast") and tokenizer_class != "PreTrainedTokenizerFast":
            tokenizer_class = tokenizer_class[:-4]
        tokenizer_config["tokenizer_class"] = tokenizer_class
        if getattr(self, "_auto_map", None) is not None:
            tokenizer_config["auto_map"] = self._auto_map
        if getattr(self, "_processor_class", None) is not None:
            tokenizer_config["processor_class"] = self._processor_class

        # If we have a custom model, we copy the file defining it in the folder and set the attributes so it can be
        # loaded from the Hub.
        if self._auto_class is not None:
            custom_object_save(self, save_directory, config=tokenizer_config)

        #tokenizer_config["vocab_file"] = "vocab.txt"
        #tokenizer_config["bpe_model"] = "bpe.model"

        with open(tokenizer_config_file, "w", encoding="utf-8") as f:
            out_str = json.dumps(tokenizer_config, indent=2, sort_keys=True, ensure_ascii=False) + "\n"
            f.write(out_str)
        logger.info(f"tokenizer config file saved in {tokenizer_config_file}")

        # Sanitize AddedTokens in special_tokens_map
        write_dict = convert_added_tokens(self.special_tokens_map_extended, add_type_field=False)
        with open(special_tokens_map_file, "w", encoding="utf-8") as f:
            out_str = json.dumps(write_dict, indent=2, sort_keys=True, ensure_ascii=False) + "\n"
            f.write(out_str)
        logger.info(f"Special tokens file saved in {special_tokens_map_file}")

        file_names = (tokenizer_config_file, special_tokens_map_file)

        save_files = self._save_pretrained(
            save_directory=save_directory,
            file_names=file_names,
            legacy_format=legacy_format,
            filename_prefix=filename_prefix,
        )

        if push_to_hub:
            self._upload_modified_files(
                save_directory, repo_id, files_timestamps, commit_message=commit_message, token=token
            )

        return save_files

    def _save_pretrained(
        self,
        save_directory: Union[str, os.PathLike],
        file_names: Tuple[str, ...],
        legacy_format: Optional[bool] = None,
        filename_prefix: Optional[str] = None,
    ) -> Tuple[str, ...]:
        """
        Save a tokenizer using the slow-tokenizer/legacy format: vocabulary + added tokens.

        Fast tokenizers can also be saved in a unique JSON file containing {config + vocab + added-tokens} using
        the specific [`~tokenization_utils_fast.PreTrainedTokenizerFast._save_pretrained`]

        Returns:
            `file_names` followed by the paths returned by `save_vocabulary` and the added-tokens file path
            (the latter is included even when no added-tokens file was written).

        Raises:
            ValueError: if `legacy_format` is `False`.
        """
        if legacy_format is False:
            raise ValueError(
                "Only fast tokenizers (instances of PreTrainedTokenizerFast) can be saved in non legacy format."
            )

        save_directory = str(save_directory)

        prefix = filename_prefix + "-" if filename_prefix else ""
        added_tokens_file = os.path.join(save_directory, prefix + ADDED_TOKENS_FILE)
        added_vocab = self.get_added_vocab()
        if added_vocab:
            with open(added_tokens_file, "w", encoding="utf-8") as f:
                out_str = json.dumps(added_vocab, indent=2, sort_keys=True, ensure_ascii=False) + "\n"
                f.write(out_str)
            logger.info(f"added tokens file saved in {added_tokens_file}")

        vocab_files = self.save_vocabulary(save_directory, filename_prefix=filename_prefix)

        return file_names + vocab_files + (added_tokens_file,)