Spaces:
Runtime error
Runtime error
import random | |
import re | |
from typing import List, Union | |
from interfaces import IProcess | |
from helpers import get_freq_dict, load_text_file, remove_long_spaces | |
from transformers import AutoTokenizer | |
class LoadFile(IProcess): | |
def execute(self, file_path: str): | |
return load_text_file( | |
file_path | |
) | |
class LinesSplitter(IProcess): | |
def __init__(self, sep: str) -> None: | |
super().__init__() | |
self.sep = sep | |
def split(self, line): | |
return line.split(self.sep) | |
def execute(self, data: Union[List[str], str]) -> List[str]: | |
if isinstance(data, str): | |
return data.split(self.sep) | |
results = [] | |
for lines in map(self.split, data): | |
results.extend(lines) | |
return results | |
class LengthFilter(IProcess): | |
def __init__( | |
self, min_length: int, max_length: int | |
) -> None: | |
super().__init__() | |
self.min_length = min_length | |
self.max_length = max_length | |
def execute(self, lines: List[str]): | |
return list(filter( | |
lambda x: self.min_length <= len(x) <= self.max_length, lines | |
)) | |
class WordsNumberFilter(IProcess): | |
def __init__(self, min_words: int, max_words: int) -> None: | |
super().__init__() | |
self.min_words = min_words | |
self.max_words = max_words | |
def _is_valid(self, line: str) -> bool: | |
return self.min_words < line.count(' ') < self.max_words | |
def execute(self, lines: List[str]): | |
return list(filter(self._is_valid, lines)) | |
class TokenizerLengthFilter(IProcess): | |
def __init__(self, max_length: int = 1024) -> None: | |
super().__init__() | |
self.max_length = max_length | |
self.tokenizer = AutoTokenizer.from_pretrained("./tokenizer") | |
def _is_valid(self, line: str) -> bool: | |
data = self.tokenizer.batch_encode_plus([line], max_length=self.max_length, truncation=True,return_overflowing_tokens=True ) | |
if len(data["input_ids"]) > 1: | |
return True | |
else: | |
return False | |
def execute(self, lines: List[str]): | |
return list(filter(self._is_valid, lines)) | |
class WordsFilter(IProcess): | |
def __init__(self, words: List[str]) -> None: | |
super().__init__() | |
self.words = set(words) | |
def _not_contain(self, line: str) -> bool: | |
return not any(( | |
word in line for word in self.words | |
)) | |
def execute(self, lines: List[str]): | |
return list(filter(self._not_contain, lines)) | |
class SoloCharFilter(IProcess): | |
def _not_contain(self, line: str) -> bool: | |
return re.search('^. | . | .$', line) is None | |
def execute(self, lines: List[str]): | |
return list(filter(self._not_contain, lines)) | |
class NumbersFilter(IProcess): | |
def _not_contain(self, line: str) -> bool: | |
return re.search('[0-9]+', line) is None | |
def execute(self, lines: List[str]): | |
return list(filter(self._not_contain, lines)) | |
class OOVFilter(IProcess): | |
def __init__(self, max_oov: int) -> None: | |
super().__init__() | |
self.max_oov = max_oov | |
self.__freq = {} | |
def _is_valid(self, line: str): | |
counter = 0 | |
for word in line.split(' '): | |
counter += (self.__freq[word] == 1) | |
return counter < self.max_oov | |
def execute(self, lines: List[str]): | |
self.__freq = get_freq_dict(lines) | |
return list(filter(self._is_valid, lines)) | |
# text = ["کوردستان وڵاتی کوردانە هەی هەی هەی هەی", "کورد بوون گەوادیە", "ژیان سەختە"] | |
# result = OOVFilter(5).execute(text) | |
# print(result) | |
class CharsRemover(IProcess): | |
def __init__(self, chars: str) -> None: | |
super().__init__() | |
self.pat = f'[{chars}]' | |
def remove(self, line: str) -> str: | |
return re.sub(self.pat, '', line) | |
def execute(self, lines: List[str]) -> List[str]: | |
return list(map(self.remove, lines)) | |
class RepeatedCharsCollapsor(IProcess): | |
def __init__(self, max_repeteion: int) -> None: | |
super().__init__() | |
self.pat = r"(.)\1{}".format(f"{{{2},}}") | |
def collaps(self, line: str) -> str: | |
return re.sub(self.pat, r"\1" * 1, line) | |
def execute(self, lines: List[str]) -> List[str]: | |
return list(map(self.collaps, lines)) | |
class ValidCharsKeeper(IProcess): | |
def __init__(self, valid_chars: str, rep_with=' ') -> None: | |
super().__init__() | |
self.valid_chars = valid_chars | |
self.rep_with = rep_with | |
self.pat = f'[^{self.valid_chars}]' | |
def __keep(self, line: str) -> str: | |
return re.sub(self.pat, ' ', line) | |
def execute(self, lines: List[str]) -> List[str]: | |
return list(map(self.__keep, lines)) | |
class SpacesRemover(IProcess): | |
def __remove(self, line: str) -> str: | |
return remove_long_spaces(line).strip() | |
def execute(self, lines: List[str]): | |
return list(map(self.__remove, lines)) | |
class RandomCharsInjector(IProcess): | |
def __init__(self, chars: str) -> None: | |
super().__init__() | |
self.chars = chars | |
def get_char(self) -> str: | |
return random.choice(self.chars) | |
def execute(self, line: str): | |
length = len(line) | |
idx = random.randint(0, length - 1) | |
return line[:idx] + self.get_char() + line[idx:] | |
class PunctuationRemover(IProcess): | |
def __init__(self) -> None: | |
super().__init__() | |
self.clean_punctuation = re.compile(r"(?<!\d)[.,;:'?!،.؟؛:»«](?!\d)") | |
def __remove_punctuation(self, text: str): | |
"""Remove all punctuation from string, except if it's between digits""" | |
return self.clean_punctuation.sub("", text) | |
def execute(self, line: str): | |
return self.__remove_punctuation(line) | |
class RandomCharsSwapper(IProcess): | |
def execute(self, line: str) -> str: | |
length = len(line) | |
idx = random.randint(0, length - 2) | |
return line[:idx] + line[idx + 1] + line[idx] + line[idx + 2:] | |
class RandomCharRemover(IProcess): | |
def execute(self, line: str) -> str: | |
length = len(line) | |
idx = random.randint(0, length - 1) | |
return line[:idx] + line[idx + 1:] | |
class RandomWordsCollapsor(IProcess): | |
def execute(self, line: str) -> str: | |
indices = [ | |
i for i, char in enumerate(line) | |
if char == ' ' | |
] | |
if len(indices) == 0: | |
return line | |
idx = random.choice(indices) | |
return line[: idx] + line[idx + 1:] | |
class RandomNeighborReplacer(IProcess): | |
def __init__(self, keyboard_rows: List[str], blank: str) -> None: | |
super().__init__() | |
self.lines = keyboard_rows | |
self.blank = blank | |
self.n_rows = len(keyboard_rows) | |
self._mapper = {} | |
self.set_mapper() | |
def __get_left( | |
self, row_idx: int, col_idx: int | |
) -> List[str]: | |
if col_idx == 0: | |
return [] | |
return [self.lines[row_idx][col_idx - 1]] | |
def __get_right( | |
self, row_idx: int, col_idx: int | |
) -> List[str]: | |
if col_idx == (len(self.lines[row_idx]) - 1): | |
return [] | |
return self.lines[row_idx][col_idx + 1] | |
def __get_upper( | |
self, row_idx: int, col_idx: int | |
) -> List[str]: | |
if row_idx == 0: | |
return [] | |
line = self.lines[row_idx - 1] | |
start = max(0, col_idx - 1) | |
end = min(len(line), col_idx + 2) | |
return list(line[start: end]) | |
def __get_lower( | |
self, row_idx: int, col_idx: int | |
) -> List[str]: | |
if row_idx == (self.n_rows - 1): | |
return [] | |
line = self.lines[row_idx + 1] | |
start = max(0, col_idx - 1) | |
end = min(len(line), col_idx + 2) | |
return list(line[start: end]) | |
def set_mapper(self) -> None: | |
funcs = [ | |
self.__get_left, | |
self.__get_right, | |
self.__get_upper, | |
self.__get_lower | |
] | |
for row_idx in range(self.n_rows): | |
for col_idx in range(len(self.lines[row_idx])): | |
items = [] | |
for func in funcs: | |
items.extend(func(row_idx, col_idx)) | |
items = list( | |
filter(lambda x: x != self.blank, items) | |
) | |
char = self.lines[row_idx][col_idx] | |
self._mapper[char] = items.copy() | |
def get_char(self, char: str) -> str: | |
if char not in self._mapper: | |
return char | |
return random.choice(self._mapper[char]) | |
def execute(self, line: str) -> str: | |
length = len(line) | |
idx = random.randint(0, length - 1) | |
return line[:idx] + self.get_char(line[idx]) + line[idx + 1:] | |
class CharsNormalizer(IProcess): | |
def __init__(self, mapper: dict) -> None: | |
super().__init__() | |
self.mapper = mapper | |
def _normalize(self, line: str) -> str: | |
for key, value in self.mapper.items(): | |
line = line.replace(key, value) | |
return line | |
def execute(self, lines: List[str]): | |
return list(filter(self._normalize, lines)) | |
class SentencePermutation(IProcess): | |
def __init__(self, sentences: List[str], augmentation_probability: float = 1) -> None: | |
super().__init__() | |
self.sentences = sentences | |
self.augmentation_probability = augmentation_probability | |
def _combine(self, text: str) -> str: | |
if random.random() < self.augmentation_probability: | |
sentences_to_sample = random.randint(0,10) | |
augmentation_sentences = random.sample(self.sentences, sentences_to_sample) | |
return text + " " + " ".join(augmentation_sentences) | |
else: | |
return text | |
def execute(self, line: str) -> str: | |
# return [self._combine(line) for line in lines] | |
return self._combine(line) | |