# Infinity/utils/large_file_util.py
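"""Split large JSONL files into fixed-size part files.

Each input file is expected to encode its total line count at the end of its
name (e.g. ``captions_000123456.jsonl``); the count is parsed from the
filename and used to size the chunks.
"""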
import argparse
import glob
import os
import os.path as osp
import threading
import time

import tqdm


def save_lines(lines, filename):
    """Write a list of lines to `filename`, creating parent directories as needed."""
    os.makedirs(osp.dirname(filename), exist_ok=True)
    with open(filename, 'w') as f:
        f.writelines(lines)
    del lines  # drop the local reference so the chunk can be garbage-collected


def get_part_jsonls(filepath, total_line_number, parts=512):
    """Map each chunk id to its part-file path; `missing` is True if any part file is absent."""
    dirname = osp.dirname(filepath)
    filename, ext = osp.splitext(osp.basename(filepath))
    if parts == 1:
        return False, {1: filepath}
    save_dir = osp.join(dirname, f'{parts:04d}_parts')
    chunk_id2save_files = {}
    missing = False
    chunk_size = int(total_line_number / parts)
    for chunk_id in range(1, parts + 1):
        if chunk_id == parts:
            # the last part absorbs the remainder lines
            num_of_lines = total_line_number - chunk_size * (parts - 1)
        else:
            num_of_lines = chunk_size
        # encode chunk id, total parts, and line count in the part filename
        chunk_id2save_files[chunk_id] = osp.join(save_dir, f'{filename}_{chunk_id:04d}_{parts:04d}_{num_of_lines:09d}{ext}')
        if not osp.exists(chunk_id2save_files[chunk_id]):
            missing = True
    return missing, chunk_id2save_files
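
# Example: for data_000001000.jsonl (a hypothetical name) with parts=4, the
# part files land in 0004_parts/ as data_0001_0004_000000250.jsonl ...
# data_0004_0004_000000250.jsonl; the last part absorbs any remainder lines.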


def split_large_txt_files(filepath, chunk_id2save_files):
    """Stream `filepath` line by line, flushing each full chunk to its part file."""
    thread_list = []
    chunk_id = 1
    with open(filepath, 'r') as f:
        chunk = []
        pbar = tqdm.tqdm(total=len(chunk_id2save_files))
        for line in f:
            chunk.append(line)
            # the expected line count for this chunk is encoded in its filename
            cur_chunk_size = int(osp.splitext(osp.basename(chunk_id2save_files[chunk_id]))[0].split('_')[-1])
            if len(chunk) >= cur_chunk_size:
                pbar.update(1)
                thread_list.append(threading.Thread(target=save_lines, args=(chunk, chunk_id2save_files[chunk_id])))
                thread_list[-1].start()
                chunk = []
                chunk_id += 1
        pbar.close()
    # every line must have been flushed to a part file; leftovers mean the
    # line count parsed from the filename was wrong
    assert not len(chunk), f'{len(chunk)} lines left unassigned'
    for thread in thread_list:
        thread.join()
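
# Design note: each full chunk is handed to its own writer thread so disk
# writes overlap with reading the next chunk; all writer threads are joined
# before the function returns.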


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--jsonl_folder', type=str, default='')
    parser.add_argument('--parts', type=int, default=600)
    args = parser.parse_args()
    for jsonl_filepath in sorted(glob.glob(osp.join(args.jsonl_folder, '*.jsonl'))):
        print(jsonl_filepath)
        t1 = time.time()
        # the total line count is parsed from the filename's trailing `_<count>.jsonl`
        line_num = int(jsonl_filepath.split('_')[-1].split('.')[0])
        missing, chunk_id2save_files = get_part_jsonls(jsonl_filepath, line_num, parts=args.parts)
        split_large_txt_files(jsonl_filepath, chunk_id2save_files)
        t2 = time.time()
        print(f'split takes {t2 - t1:.2f}s')
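
# Usage (paths illustrative):
#   python large_file_util.py --jsonl_folder /path/to/jsonls --parts 600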