import argparse
import os
from pathlib import Path
import ftfy
import tensorflow as tf
from lm_dataformat import Reader
from tokenizers import Tokenizer
from transformers import GPT2TokenizerFast
from tqdm import tqdm
import logging
from multiprocessing import Pool, cpu_count
from itertools import repeat
import re
logging.getLogger("transformers").setLevel(logging.ERROR)
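
# Example invocation (script name, paths and values are illustrative only):
#   python create_tfrecords.py --input_dir ./my_data --name openwebtext \
#       --output_dir ./tfrecords --chunk_size 2048 --processes 8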
parser = argparse.ArgumentParser()
parser.add_argument("--input_dir", type=str, help="Path to where your files are located. Files ending in .zst are "
"treated as archives, all others as raw text.")
parser.add_argument("--files_per", type=int, default=100000, help="Text files per tfrecord")
parser.add_argument("--name", type=str, default="openwebtext",
help="Name of output files will be name_i.tfrecords where i is the number of the file")
parser.add_argument("--output_dir", type=str, default="./tfrecords", help="Where to put tfrecords")
parser.add_argument("--encoder_path", type=str,
help="Path to encoder files, or leave unspecified to use GPT2 tokenizer")
parser.add_argument("--minimum_size", type=int, default=100, help="Minimum size a document has to be to be included")
parser.add_argument("--ftfy", action="store_false", help="normalize with ftfy")
parser.add_argument("--wikitext-detokenize", action="store_false", help="use wikitext detokenizer")
parser.add_argument("--separator", nargs="+", type=int, default=[50256],
help="separator to place between files in chunk mode")
parser.add_argument("--chunk_size", type=int, default=2048, help="How big a chunk should be in chunk mode. "
"Should equal your model's context size")
parser.add_argument("--write_dataset_config", action="store_true", help="Write the dataset config file on completion")
parser.add_argument("--processes", type=int, default=0, help="Number of processes to use. Defaults to cpu count.")
args = parser.parse_args()
if not args.output_dir.endswith("/"):
    args.output_dir = args.output_dir + "/"
if not args.input_dir.endswith("/"):
    args.input_dir = args.input_dir + "/"
assert len(args.separator) == 1, "Expected a single separator token"
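
# Reverses WikiText-style tokenization (spaced punctuation, " @-@ " number
# separators, spaced brackets and quotes) so documents read as normal prose
# before being passed to the tokenizer.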
def wikitext_detokenizer(string):
    # contractions
    string = string.replace("s '", "s'")
    string = re.sub(r"/' [0-9]/", r"/'[0-9]/", string)
    # number separators
    string = string.replace(" @-@ ", "-")
    string = string.replace(" @,@ ", ",")
    string = string.replace(" @.@ ", ".")
    # punctuation
    string = string.replace(" : ", ": ")
    string = string.replace(" ; ", "; ")
    string = string.replace(" . ", ". ")
    string = string.replace(" ! ", "! ")
    string = string.replace(" ? ", "? ")
    string = string.replace(" , ", ", ")
    # double brackets
    string = re.sub(r"\(\s*([^\)]*?)\s*\)", r"(\1)", string)
    string = re.sub(r"\[\s*([^\]]*?)\s*\]", r"[\1]", string)
    string = re.sub(r"{\s*([^}]*?)\s*}", r"{\1}", string)
    string = re.sub(r"\"\s*([^\"]*?)\s*\"", r'"\1"', string)
    string = re.sub(r"'\s*([^']*?)\s*'", r"'\1'", string)
    # miscellaneous
    string = string.replace("= = = =", "====")
    string = string.replace("= = =", "===")
    string = string.replace("= =", "==")
    string = string.replace(" " + chr(176) + " ", chr(176))
    string = string.replace(" \n", "\n")
    string = string.replace("\n ", "\n")
    string = string.replace(" N ", " 1 ")
    string = string.replace(" 's", "'s")
    return string

def _int64_feature(value):
    """
    Returns an int64_list from a bool / enum / int / uint.
    """
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))
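
# Each record below is a tf.train.Example with a single int64-list feature named
# "text" holding the token ids of one chunk.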
def write_to_file(writer, data):
    """
    writes data to tfrecord file
    """
    feature = {
        "text": _int64_feature(data)
    }
    tf_example = tf.train.Example(features=tf.train.Features(feature=feature))
    writer.write(tf_example.SerializeToString())
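
# A minimal read-back sketch, not part of the original pipeline: the "text"
# feature name and int64 dtype mirror write_to_file above, and the function name
# is illustrative. It shows one way a consumer could parse these records.
def _example_read_tfrecords(filenames):
    # Parse each serialized tf.train.Example back into a dense 1-D tensor of token ids.
    def parse_fn(serialized):
        parsed = tf.io.parse_single_example(serialized, {"text": tf.io.VarLenFeature(tf.int64)})
        return tf.sparse.to_dense(parsed["text"])
    return tf.data.TFRecordDataset(filenames).map(parse_fn)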

def get_tokenizer(args):
    if args.encoder_path is None:
        return GPT2TokenizerFast.from_pretrained('gpt2')
    else:
        return Tokenizer.from_file(args.encoder_path)

def split_list(l, n):
    # splits list/string into n size chunks
    return [l[i:i + n] for i in range(0, len(l), n)]
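# Example: split_list([1, 2, 3, 4, 5], 2) -> [[1, 2], [3, 4], [5]]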

def archive_to_tokens(f, encoder, args):
    # Generator that yields the tokenized contents of the files in an archive,
    # one document at a time (the short-document carry-over logic lives in create_tfrecords)
    reader = Reader(f)
    for doc in reader.stream_data(threaded=False):
        if args.ftfy:  # fix text with ftfy if specified
            doc = ftfy.fix_text(doc, normalization='NFKC')
        if args.wikitext_detokenize:
            doc = wikitext_detokenizer(doc)
        doc = encoder.encode(doc) + args.separator  # read document from lmd and append separator token
        yield split_list(doc, args.chunk_size)  # split into n_ctx + 1 size chunks
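
# Groups tokenized chunks into batches of `files_per`, writes one .tfrecords file
# per full batch, and returns the next file index plus any leftover chunks that
# did not fill a batch.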
def write_files(files, files_per, output_dir, out_name, start_no, write_remainder=False, process_no=None):
    # writes a list of files to .tfrecords
    if files is None:
        return
    chunks = split_list(files, files_per)
    if len(chunks[-1]) != files_per and not write_remainder:  # pop the last file if its length != files_per
        remainder = chunks.pop(-1)
    else:
        remainder = None  # assuming files = remainder from an old chunk here
        files_per = len(chunks[-1])
    for files in chunks:
        fp = f"{output_dir}/{out_name}_{start_no}"
        if process_no is not None:
            fp += f"_{process_no}"
        fp += f"_{files_per}"  # add number of files in tfrecord to end of fp
        fp += ".tfrecords"
        with tf.io.TFRecordWriter(fp) as writer:
            for f in files:
                write_to_file(writer, f)
        start_no += 1
    return start_no, remainder
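
# Collects every file in input_dir whose name ends with one of the supported
# extensions (jsonl.zst, .txt, .xz, .tar.gz by default).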
def get_files(input_dir, filetypes=None):
    # gets all files of <filetypes> in input_dir
    if filetypes is None:
        filetypes = ["jsonl.zst", ".txt", ".xz", ".tar.gz"]
    files = [list(Path(input_dir).glob(f"*{ft}")) for ft in filetypes]
    return [str(item) for sublist in files for item in sublist]  # flatten list of list -> list and stringify Paths
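
# The checkpoint file stores "<files_processed>, <tfrecord_count>" so an
# interrupted run can resume and skip documents that were already written.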
def read_checkpoint(checkpoint_path, resume_from_checkpoint=True):
    # init checkpointing
    if resume_from_checkpoint and os.path.isfile(checkpoint_path):
        try:
            with open(checkpoint_path, "r") as checkpoint_file:
                resume_files_processed, tfrecord_count = [int(i) for i in checkpoint_file.read().split(", ")]
            print(f"\nResuming from tfrecord no. {tfrecord_count} / file no. {resume_files_processed}")
            return resume_files_processed, tfrecord_count
        except (ValueError, OSError):
            pass  # malformed or unreadable checkpoint, start from scratch
    return 0, 0
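
# Core worker: tokenizes each document, splits the token stream into chunk_size
# pieces, collects tails that are at least minimum_size tokens into a carry-over
# buffer (emitted as its own chunk once it fills), discards shorter tails, and
# periodically flushes finished chunks to .tfrecords files, checkpointing progress.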
def create_tfrecords(params, write_remainder=True, write_every_n_files=1, save_checkpoints=False,
                     resume_from_checkpoint=False, display_pbar=False):
    # iterates through files in input_dir, splitting into <args.chunk_size> chunks and saving a tfrecords file every
    # <args.files_per> chunks.
    files, args, process_no = params
    enc = get_tokenizer(args)  # get tokenizer

    # init metadata
    discarded_files = 0
    files_processed = 0
    pbar = tqdm(desc=f"Writing TFRecord Files to {args.output_dir}. Parsed 0 input files. files_written ",
                disable=not display_pbar)
    checkpoint_path = f"{args.output_dir}/checkpoint.txt"
    resume_files_processed, tfrecord_count = read_checkpoint(checkpoint_path, resume_from_checkpoint)

    data_to_prepend = []
    tokenized_files_array = []

    for f in files:
        for tokenized_files in archive_to_tokens(f, enc, args):
            files_processed += 1
            if files_processed < resume_files_processed:
                continue  # resume from checkpoint

            # if the last chunk < chunk size, but > minimum_size, take it and append it to the beginning of the next file
            n_tokens = len(tokenized_files[-1])
            if n_tokens < args.chunk_size:
                data = tokenized_files.pop(-1)
                if n_tokens >= args.minimum_size:
                    data_to_prepend.extend(data)
                else:
                    discarded_files += 1

            if len(data_to_prepend) >= args.chunk_size:
                # if length of data_to_prepend becomes greater than chunk size, add concatted files to tokenized files
                tokenized_files_array.append(data_to_prepend[:args.chunk_size])
                data_to_prepend = data_to_prepend[args.chunk_size:]

            # add tokenized files > chunk size to main array
            tokenized_files_array.extend(tokenized_files)

            if len(tokenized_files_array) >= args.files_per * write_every_n_files:  # write every n files
                _tfrecord_count, remainder = write_files(tokenized_files_array, files_per=args.files_per,
                                                         output_dir=args.output_dir, out_name=args.name,
                                                         start_no=tfrecord_count, process_no=process_no)
                pbar.update(_tfrecord_count - tfrecord_count)  # update progress bar
                pbar.set_description(
                    f"Writing TFRecord Files to {args.output_dir}. Parsed {files_processed} input files. files_written ")
                tfrecord_count = _tfrecord_count
                tokenized_files_array = remainder if remainder is not None else []  # add remaining files to next chunk
                with open(checkpoint_path, "w") as checkpoint_file:
                    checkpoint_file.write(f"{files_processed}, {tfrecord_count}")

    if len(tokenized_files_array) >= args.files_per:  # also write at end
        _tfrecord_count, remainder = write_files(tokenized_files_array, files_per=args.files_per,
                                                 output_dir=args.output_dir, out_name=args.name,
                                                 start_no=tfrecord_count, process_no=process_no)
        pbar.update(_tfrecord_count - tfrecord_count)
        pbar.set_description(
            f"Writing TFRecord Files to {args.output_dir}. Parsed {files_processed} input files. files_written ")
        tfrecord_count = _tfrecord_count
        with open(checkpoint_path, "w") as checkpoint_file:
            checkpoint_file.write(f"{files_processed}, {tfrecord_count}")
    else:
        remainder = tokenized_files_array  # add remaining to remainder

    if write_remainder:
        # write out the remaining files even if there's less than files_per
        write_files(remainder, files_per=args.files_per, output_dir=args.output_dir, out_name=args.name,
                    start_no=tfrecord_count, write_remainder=True)

    successful_files = files_processed - discarded_files
    return {"discarded": discarded_files, "processed": files_processed, "successful": successful_files}
def create_tfrecords_mp(files, args):
    files = split_list(files, len(files) // args.processes)
    with Pool(processes=args.processes) as pool:
        pbar = tqdm(pool.imap(create_tfrecords, zip(files, repeat(args), range(len(files)))))
        meta = {"discarded": 0, "processed": 0, "successful": 0}
        for results in pbar:
            pbar.update()
            for k, v in results.items():
                meta[k] += v  # update metadata
        return meta

if __name__ == "__main__":
    os.makedirs(args.output_dir, exist_ok=True)  # make output dir if it doesn't exist
    files = get_files(args.input_dir)
    args.chunk_size += 1  # we shift the data by 1 to the right for targets, so increment the chunk size here
    if args.processes == 0:
        args.processes = cpu_count()
    if args.processes > 1:
        results = create_tfrecords_mp(files, args)
    else:
        results = create_tfrecords((files, args, 0), display_pbar=True)
    print(results)