Spaces:
Sleeping
Sleeping
File size: 6,203 Bytes
613e5d1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
"""
Download, preprocess and serve the TinyStories dataset as a DataLoader.
"""
import argparse
import glob
import json
import os
import random
from typing import List
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np
import requests
import torch
import torch.distributed as dist
from tqdm import tqdm
from tokenizer import Tokenizer
DATA_CACHE_DIR = "data"
def download_file(url: str, fname: str, chunk_size=1024):
"""Helper function to download a file from a given url"""
resp = requests.get(url, stream=True)
total = int(resp.headers.get("content-length", 0))
with open(fname, "wb") as file, tqdm(
desc=fname,
total=total,
unit="iB",
unit_scale=True,
unit_divisor=1024,
) as bar:
for data in resp.iter_content(chunk_size=chunk_size):
size = file.write(data)
bar.update(size)
def download():
"""Downloads the dataset to disk."""
os.makedirs(DATA_CACHE_DIR, exist_ok=True)
# download the TinyStories dataset, unless it's already downloaded
data_url = "https://huggingface.co./datasets/roneneldan/TinyStories/resolve/main/TinyStories_all_data.tar.gz"
data_filename = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data.tar.gz")
if not os.path.exists(data_filename):
print(f"Downloading {data_url} to {data_filename}...")
download_file(data_url, data_filename)
else:
print(f"{data_filename} already exists, skipping download...")
# unpack the tar.gz file into all the data shards (json files)
data_dir = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data")
if not os.path.exists(data_dir):
os.makedirs(data_dir, exist_ok=True)
print(f"Unpacking {data_filename}...")
os.system(f"tar -xzf {data_filename} -C {data_dir}")
else:
print(f"{data_dir} already exists, skipping unpacking...")
# print a single example just for debugging and such
shard_filenames = sorted(glob.glob(os.path.join(data_dir, "*.json")))
with open(shard_filenames[0], "r") as f:
data = json.load(f)
print("Download done.")
print(f"Number of shards: {len(shard_filenames)}")
print(f"Example story:\n{data[0]}")
def pretokenize():
enc = Tokenizer()
def process_shard(shard):
with open(shard, "r") as f:
data = json.load(f)
all_tokens = []
for example in tqdm(data):
text = example["story"]
text = text.strip() # get rid of leading/trailing whitespace
tokens = enc.encode(text, bos=True, eos=False) # encode the text, use BOS
all_tokens.extend(tokens)
# convert to uint16 nparray
all_tokens = np.array(all_tokens, dtype=np.uint16)
# write to disk
tokenized_filename = shard.replace(".json", ".bin")
with open(tokenized_filename, "wb") as f:
f.write(all_tokens.tobytes())
print(f"Saved {tokenized_filename}")
# iterate the shards and tokenize all of them one by one
data_dir = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data")
shard_filenames = sorted(glob.glob(os.path.join(data_dir, "*.json")))
# process all the shards in a threadpool
with ThreadPoolExecutor(max_workers=8) as executor:
executor.map(process_shard, shard_filenames)
print("Done.")
class PretokDataset(torch.utils.data.IterableDataset):
"""Loads pretokenized examples from disk and yields them as PyTorch tensors."""
def __init__(self, split, max_seq_len):
super().__init__()
self.split = split
self.max_seq_len = max_seq_len
def __iter__(self):
# get worker info within a DataLoader
worker_info = torch.utils.data.get_worker_info()
worker_id = worker_info.id if worker_info else 0
# get DDP rank info
rank = dist.get_rank() if dist.is_initialized() else 0
# combine the worker_id and worker_rank to create a unique seed for rng
seed = 42 + worker_id + 1337 * rank
rng = random.Random(seed)
print(f"Created a PretokDataset with rng seed {seed}")
data_dir = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data")
shard_filenames = sorted(glob.glob(os.path.join(data_dir, "*.bin")))
# train/test split. let's use only shard 0 for test split, rest train
shard_filenames = shard_filenames[1:] if self.split == "train" else shard_filenames[:1]
while True:
rng.shuffle(shard_filenames)
for shard in shard_filenames:
# open the dataset for reading but keep it on disk with memmap
m = np.memmap(shard, dtype=np.uint16, mode="r")
num_batches = len(m) // self.max_seq_len
num_batches -= 1 # drop the last partial batch
assert num_batches > 0, "this shard is way too small? investigate."
ixs = list(range(num_batches))
rng.shuffle(ixs)
for ix in ixs:
start = ix * self.max_seq_len
end = start + self.max_seq_len + 1
# calling .astype will copy the data into a new numpy array, now in RAM
chunk = torch.from_numpy((m[start:end]).astype(np.int64))
x = chunk[:-1]
y = chunk[1:]
yield x, y
class Task:
@staticmethod
def iter_batches(split, batch_size, max_seq_len, device, num_workers=0):
ds = PretokDataset(split, max_seq_len)
dl = torch.utils.data.DataLoader(
ds, batch_size=batch_size, pin_memory=True, num_workers=num_workers
)
for x, y in dl:
x = x.to(device, non_blocking=True)
y = y.to(device, non_blocking=True)
yield x, y
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("stage", type=str, choices=["download", "train_tokenizer", "pretokenize"])
args = parser.parse_args()
# depending on the stage call the appropriate function
fun = {
"download": download,
"pretokenize": pretokenize,
}
fun[args.stage]() |