ariahmed's picture
Upload folder using huggingface_hub
e489264 verified
import random
import re
from typing import List, Union
from interfaces import IProcess
from helpers import get_freq_dict, load_text_file, remove_long_spaces
from transformers import AutoTokenizer
class LoadFile(IProcess):
    """Pipeline step that reads a file through ``load_text_file``."""

    def execute(self, file_path: str):
        """Return whatever ``load_text_file`` produces for ``file_path``."""
        content = load_text_file(file_path)
        return content
class LinesSplitter(IProcess):
    """Split text on a fixed separator and flatten the pieces into one list."""

    def __init__(self, sep: str) -> None:
        super().__init__()
        self.sep = sep

    def split(self, line):
        """Split a single string on the configured separator."""
        return line.split(self.sep)

    def execute(self, data: Union[List[str], str]) -> List[str]:
        """Split ``data`` on ``self.sep``.

        A single string is split directly; any other iterable of strings is
        split item by item and the pieces are concatenated in order.
        """
        if isinstance(data, str):
            return data.split(self.sep)
        return [piece for item in data for piece in self.split(item)]
class LengthFilter(IProcess):
    """Keep lines whose character length lies in an inclusive range."""

    def __init__(self, min_length: int, max_length: int) -> None:
        super().__init__()
        self.min_length = min_length
        self.max_length = max_length

    def execute(self, lines: List[str]):
        """Return the lines with ``min_length <= len(line) <= max_length``."""
        kept = []
        for line in lines:
            if self.min_length <= len(line) <= self.max_length:
                kept.append(line)
        return kept
class WordsNumberFilter(IProcess):
    """Keep lines whose space count lies strictly between two bounds.

    NOTE(review): the check counts ``' '`` characters, not words, and both
    bounds are exclusive, so a single-spaced line needs between
    ``min_words + 2`` and ``max_words`` words to pass. Confirm this is the
    intended meaning of ``min_words`` / ``max_words``.
    """

    def __init__(self, min_words: int, max_words: int) -> None:
        super().__init__()
        self.min_words = min_words
        self.max_words = max_words

    def _is_valid(self, line: str) -> bool:
        """Return True when ``min_words < number of spaces < max_words``."""
        n_spaces = line.count(' ')
        return n_spaces > self.min_words and n_spaces < self.max_words

    def execute(self, lines: List[str]):
        """Return the lines accepted by ``_is_valid``, in input order."""
        return [line for line in lines if self._is_valid(line)]
class TokenizerLengthFilter(IProcess):
    """Filter lines by how many ``input_ids`` rows the tokenizer returns.

    The tokenizer is loaded from the local ``./tokenizer`` directory when the
    step is constructed.

    NOTE(review): a line is kept only when its encoding has MORE than one
    ``input_ids`` entry. With ``truncation`` and ``return_overflowing_tokens``
    enabled that presumably means the line exceeded ``max_length`` tokens, so
    this step appears to keep over-long lines rather than drop them.
    Confirm against the tokenizer's behaviour and the intended use.
    """

    def __init__(self, max_length: int = 1024) -> None:
        super().__init__()
        self.max_length = max_length
        self.tokenizer = AutoTokenizer.from_pretrained("./tokenizer")

    def _is_valid(self, line: str) -> bool:
        """Return True when encoding ``line`` yields more than one row."""
        encoded = self.tokenizer.batch_encode_plus(
            [line],
            max_length=self.max_length,
            truncation=True,
            return_overflowing_tokens=True,
        )
        return len(encoded["input_ids"]) > 1

    def execute(self, lines: List[str]):
        """Return the lines accepted by ``_is_valid``, in input order."""
        return [line for line in lines if self._is_valid(line)]
class WordsFilter(IProcess):
    """Drop every line that contains any of the given words as a substring."""

    def __init__(self, words: List[str]) -> None:
        super().__init__()
        # Stored as a set, so duplicate entries are checked only once.
        self.words = set(words)

    def _not_contain(self, line: str) -> bool:
        """Return True when none of the words occurs anywhere in ``line``."""
        for word in self.words:
            if word in line:
                return False
        return True

    def execute(self, lines: List[str]):
        """Return the lines that contain none of the configured words."""
        return [line for line in lines if self._not_contain(line)]
class SoloCharFilter(IProcess):
    """Drop lines that contain a single character standing alone.

    A character counts as standing alone when it is the first character and
    is followed by a space, is surrounded by spaces, or is the last character
    and is preceded by a space.
    """

    def _not_contain(self, line: str) -> bool:
        """Return True when ``line`` has no isolated single character."""
        found = re.search('^. | . | .$', line)
        return not found

    def execute(self, lines: List[str]):
        """Return the lines without an isolated single character."""
        return [line for line in lines if self._not_contain(line)]
class NumbersFilter(IProcess):
    """Drop lines that contain at least one ASCII digit (``0``-``9``)."""

    def _not_contain(self, line: str) -> bool:
        """Return True when ``line`` has no ASCII digit."""
        found = re.search('[0-9]+', line)
        return not found

    def execute(self, lines: List[str]):
        """Return the lines that contain no ASCII digit."""
        return [line for line in lines if self._not_contain(line)]
class OOVFilter(IProcess):
    """Drop lines with too many words whose recorded frequency is exactly 1.

    The frequency table is rebuilt from the input on every ``execute`` call
    via ``get_freq_dict``.
    """

    def __init__(self, max_oov: int) -> None:
        super().__init__()
        self.max_oov = max_oov
        self.__freq = {}

    def _is_valid(self, line: str):
        """Return True when fewer than ``max_oov`` words have frequency 1.

        Words are obtained by splitting on a single space. A word missing
        from the frequency table raises ``KeyError``.
        """
        rare_words = sum(
            1 for word in line.split(' ') if self.__freq[word] == 1
        )
        return rare_words < self.max_oov

    def execute(self, lines: List[str]):
        """Build the frequency table from ``lines`` and filter them with it."""
        # presumably get_freq_dict maps each word to its count over all lines
        # -- verify against helpers.get_freq_dict
        self.__freq = get_freq_dict(lines)
        return [line for line in lines if self._is_valid(line)]
# text = ["کوردستان وڵاتی کوردانە هەی هەی هەی هەی", "کورد بوون گەوادیە", "ژیان سەختە"]
# result = OOVFilter(5).execute(text)
# print(result)
class CharsRemover(IProcess):
    """Delete every occurrence of the given characters from each line.

    ``chars`` is placed unescaped inside a regex character class, so regex
    class syntax such as ranges (``0-9``) is interpreted as such.
    """

    def __init__(self, chars: str) -> None:
        super().__init__()
        self.pat = '[{}]'.format(chars)

    def remove(self, line: str) -> str:
        """Return ``line`` with all characters matching the class removed."""
        cleaned = re.sub(self.pat, '', line)
        return cleaned

    def execute(self, lines: List[str]) -> List[str]:
        """Apply ``remove`` to every line and return the results in order."""
        return [self.remove(line) for line in lines]
class RepeatedCharsCollapsor(IProcess):
    """Collapse any run of three or more identical characters to one.

    NOTE(review): ``max_repeteion`` is accepted but never used; the run
    threshold (3 or more) and the replacement length (1) are hard-coded.
    Confirm whether the parameter was meant to drive them.
    """

    def __init__(self, max_repeteion: int) -> None:
        super().__init__()
        # A character followed by at least two more copies of itself.
        self.pat = r"(.)\1{2,}"

    def collaps(self, line: str) -> str:
        """Return ``line`` with each long run replaced by one character."""
        return re.sub(self.pat, r"\1", line)

    def execute(self, lines: List[str]) -> List[str]:
        """Apply ``collaps`` to every line and return the results in order."""
        return [self.collaps(line) for line in lines]
class ValidCharsKeeper(IProcess):
    """Replace every character outside an allowed set with a substitute.

    ``valid_chars`` is placed unescaped inside a negated regex character
    class, so class syntax such as ranges (``a-z``) is interpreted as such.

    Args:
        valid_chars: Body of the character class describing allowed chars.
        rep_with: Text substituted for each disallowed character. Defaults
            to a single space.
    """

    def __init__(self, valid_chars: str, rep_with=' ') -> None:
        super().__init__()
        self.valid_chars = valid_chars
        self.rep_with = rep_with
        self.pat = f'[^{self.valid_chars}]'

    def __keep(self, line: str) -> str:
        """Return ``line`` with disallowed characters replaced."""
        # Use the configured replacement; it was previously ignored in
        # favour of a hard-coded space.
        return re.sub(self.pat, self.rep_with, line)

    def execute(self, lines: List[str]) -> List[str]:
        """Apply the replacement to every line and return them in order."""
        return [self.__keep(line) for line in lines]
class SpacesRemover(IProcess):
    """Run each line through ``remove_long_spaces`` and strip its ends."""

    def __remove(self, line: str) -> str:
        """Return ``line`` after ``remove_long_spaces`` and ``strip``."""
        squeezed = remove_long_spaces(line)
        return squeezed.strip()

    def execute(self, lines: List[str]):
        """Clean every line and return the results in input order."""
        return [self.__remove(line) for line in lines]
class RandomCharsInjector(IProcess):
    """Insert one randomly chosen character at a random position in a line.

    Args:
        chars: Pool of characters to draw the injected character from.
    """

    def __init__(self, chars: str) -> None:
        super().__init__()
        self.chars = chars

    def get_char(self) -> str:
        """Return one character picked at random from ``self.chars``."""
        return random.choice(self.chars)

    def execute(self, line: str):
        """Return ``line`` with one extra character inserted.

        The character goes in front of a randomly chosen existing character,
        so it is never appended after the last one. An empty line yields
        just the injected character.
        """
        length = len(line)
        # random.randint(0, -1) raises ValueError, so an empty line is
        # handled by inserting at position 0.
        idx = random.randint(0, length - 1) if length else 0
        return line[:idx] + self.get_char() + line[idx:]
class PunctuationRemover(IProcess):
    """Strip punctuation marks that are not adjacent to a digit.

    A mark from the pattern's character class is removed only when the
    character before it and the character after it are both non-digits;
    a mark touching a digit on either side (``3.14``, ``5.``) is left alone.
    """

    def __init__(self) -> None:
        super().__init__()
        self.clean_punctuation = re.compile(r"(?<!\d)[.,;:'?!،.؟؛:»«](?!\d)")

    def execute(self, line: str):
        """Return ``line`` with the matching punctuation marks deleted."""
        stripped = self.clean_punctuation.sub("", line)
        return stripped
class RandomCharsSwapper(IProcess):
    """Swap one randomly chosen pair of adjacent characters in a line."""

    def execute(self, line: str) -> str:
        """Return ``line`` with two neighbouring characters exchanged.

        Lines shorter than two characters have nothing to swap and are
        returned unchanged.
        """
        length = len(line)
        if length < 2:
            # random.randint(0, length - 2) would raise ValueError here.
            return line
        idx = random.randint(0, length - 2)
        return line[:idx] + line[idx + 1] + line[idx] + line[idx + 2:]
class RandomCharRemover(IProcess):
    """Delete one randomly chosen character from a line."""

    def execute(self, line: str) -> str:
        """Return ``line`` with a single character removed.

        An empty line has nothing to remove and is returned unchanged.
        """
        if not line:
            # random.randint(0, -1) would raise ValueError here.
            return line
        idx = random.randint(0, len(line) - 1)
        return line[:idx] + line[idx + 1:]
class RandomWordsCollapsor(IProcess):
    """Join two adjacent words by deleting one randomly chosen space."""

    def execute(self, line: str) -> str:
        """Return ``line`` with one space removed.

        A line without any space character is returned unchanged.
        """
        space_positions = [match.start() for match in re.finditer(' ', line)]
        if not space_positions:
            return line
        drop_at = random.choice(space_positions)
        return line[:drop_at] + line[drop_at + 1:]
class RandomNeighborReplacer(IProcess):
    """Replace one random character of a line with a keyboard neighbour.

    The neighbour table is built once from ``keyboard_rows``. For each key
    it holds the key to the left, the key to the right, and up to three keys
    (columns ``col - 1`` to ``col + 1``) on the row above and the row below,
    with the ``blank`` placeholder filtered out.

    Args:
        keyboard_rows: Keyboard layout, one entry per row.
        blank: Placeholder symbol that is never offered as a replacement.
    """

    def __init__(self, keyboard_rows: List[str], blank: str) -> None:
        super().__init__()
        self.lines = keyboard_rows
        self.blank = blank
        self.n_rows = len(keyboard_rows)
        self._mapper = {}
        self.set_mapper()

    def __get_left(self, row_idx: int, col_idx: int) -> List[str]:
        """Return the key left of the position, or nothing at column 0."""
        if col_idx == 0:
            return []
        return [self.lines[row_idx][col_idx - 1]]

    def __get_right(self, row_idx: int, col_idx: int) -> List[str]:
        """Return the key right of the position, or nothing at row end."""
        if col_idx == (len(self.lines[row_idx]) - 1):
            return []
        # Wrapped in a list so the return type matches the annotation and
        # the other neighbour helpers.
        return [self.lines[row_idx][col_idx + 1]]

    def __get_upper(self, row_idx: int, col_idx: int) -> List[str]:
        """Return up to three keys from the row above the position."""
        if row_idx == 0:
            return []
        line = self.lines[row_idx - 1]
        start = max(0, col_idx - 1)
        end = min(len(line), col_idx + 2)
        return list(line[start:end])

    def __get_lower(self, row_idx: int, col_idx: int) -> List[str]:
        """Return up to three keys from the row below the position."""
        if row_idx == (self.n_rows - 1):
            return []
        line = self.lines[row_idx + 1]
        start = max(0, col_idx - 1)
        end = min(len(line), col_idx + 2)
        return list(line[start:end])

    def set_mapper(self) -> None:
        """Fill ``self._mapper`` with the non-blank neighbours of every key."""
        finders = (
            self.__get_left,
            self.__get_right,
            self.__get_upper,
            self.__get_lower,
        )
        for row_idx, row in enumerate(self.lines):
            for col_idx, char in enumerate(row):
                neighbors = []
                for find in finders:
                    neighbors.extend(find(row_idx, col_idx))
                self._mapper[char] = [
                    item for item in neighbors if item != self.blank
                ]

    def get_char(self, char: str) -> str:
        """Return a random neighbour of ``char``.

        ``char`` itself is returned when it is not on the keyboard or has no
        non-blank neighbour.
        """
        neighbors = self._mapper.get(char)
        if not neighbors:
            # random.choice raises IndexError on an empty sequence.
            return char
        return random.choice(neighbors)

    def execute(self, line: str) -> str:
        """Return ``line`` with one random character replaced.

        An empty line is returned unchanged.
        """
        if not line:
            # random.randint(0, -1) would raise ValueError here.
            return line
        idx = random.randint(0, len(line) - 1)
        return line[:idx] + self.get_char(line[idx]) + line[idx + 1:]
class CharsNormalizer(IProcess):
    """Rewrite each line by applying a table of substring replacements.

    Args:
        mapper: Maps each substring to replace to its replacement text.
    """

    def __init__(self, mapper: dict) -> None:
        super().__init__()
        self.mapper = mapper

    def _normalize(self, line: str) -> str:
        """Return ``line`` after applying every replacement in turn.

        Replacements run sequentially in the mapper's iteration order, so a
        later entry sees the output of earlier ones.
        """
        for key, value in self.mapper.items():
            line = line.replace(key, value)
        return line

    def execute(self, lines: List[str]):
        """Return the normalized form of every line, in input order."""
        # This must map, not filter: filtering used the normalized string
        # only as a truth value and returned the original lines unchanged.
        return [self._normalize(line) for line in lines]
class SentencePermutation(IProcess):
    """Randomly append sentences from a fixed pool to a piece of text.

    Args:
        sentences: Pool of sentences to sample from, without replacement.
        augmentation_probability: Chance that a text is augmented. The
            default of 1 augments every text.
    """

    def __init__(self, sentences: List[str], augmentation_probability: float = 1) -> None:
        super().__init__()
        self.sentences = sentences
        self.augmentation_probability = augmentation_probability

    def _combine(self, text: str) -> str:
        """Return ``text``, possibly followed by up to 10 sampled sentences.

        The sampled sentences are joined to ``text`` and to each other with
        single spaces. When no sentence is drawn, ``text`` is returned as is.
        """
        if random.random() >= self.augmentation_probability:
            return text
        # random.sample raises ValueError when asked for more items than the
        # pool holds, so the draw is capped at the pool size.
        upper_bound = min(10, len(self.sentences))
        sentences_to_sample = random.randint(0, upper_bound)
        if sentences_to_sample == 0:
            # Avoid leaving a dangling separator after the text.
            return text
        augmentation_sentences = random.sample(self.sentences, sentences_to_sample)
        return text + " " + " ".join(augmentation_sentences)

    def execute(self, line: str) -> str:
        """Augment a single line; see ``_combine``."""
        return self._combine(line)