Spaces:
Running
on
T4
Running
on
T4
from io import StringIO | |
from typing import List | |
from unstructured.partition.pptx import partition_pptx | |
from ordered_multimap import OrderedMultiIndexMapWeakRef | |
# Upper-case words that, when they are the LAST token of an all-caps heading,
# indicate the heading was cut mid-phrase and continues in the next element.
# Consulted by process_chunk only when the token is tagged PROPN, NOUN or VERB.
WRONG_NOUNS = {
    "BY", "IN", "ON", "INTO", "A", "AT", "WITH", "FROM", "TO",
    "AND", "OR", "BUT", "OWN", 'BEHIND', 'THROUGH', 'FIERCE', "'S",
}

# Trailing characters that mark an element's text as unfinished; ppt_chunk
# prepends such text to the element that follows it.
NON_ENDING_PUNCT = {',', ':', ';', "'", '/', '-'}
def process_chunk(chunk, nlp):
    """Merge all-caps heading fragments that were split across two elements.

    ``chunk`` is ``[chunk_id, sub_chunks]``. Each sub-chunk is either a
    ``[category, text]`` list or a plain string. When two consecutive
    sub-chunks are both all-caps ``Title``/``UncategorizedText`` entries and
    the first one ends on a word that cannot close a phrase (preposition,
    determiner, conjunction, ... or a word listed in ``WRONG_NOUNS``), the
    first text is prepended to the second and the first entry is removed.

    Args:
        chunk: ``[chunk_id, sub_chunks]``; ``sub_chunks`` is modified in place.
        nlp: Callable taking a string and returning an indexable sequence of
            tokens exposing ``pos_`` and ``str()`` (presumably a spaCy
            pipeline -- confirm against the caller).

    Returns:
        The same ``chunk`` object, after in-place merging.
    """
    heading_categories = ('Title', 'UncategorizedText')
    # Part-of-speech tags that never legitimately end a heading.
    dangling_pos = {'SYM', "ADP", 'ADV', 'PART', 'PRON', 'DET', "AUX",
                    'SCONJ', 'CONJ', "CCONJ"}
    # Tags the tagger tends to assign to all-caps function words; these are
    # only treated as dangling when the word itself is in WRONG_NOUNS.
    ambiguous_pos = {'PROPN', 'NOUN', 'VERB'}

    def is_caps_heading(item):
        return (
            isinstance(item, list)
            and item[0] in heading_categories
            and item[1].isupper()
        )

    sub_chunks = chunk[1]
    absorbed = []
    # Stop one short of the end: the last entry has no successor to merge into.
    for i in range(len(sub_chunks) - 1):
        current = sub_chunks[i]
        following = sub_chunks[i + 1]
        if not (is_caps_heading(current) and is_caps_heading(following)):
            continue

        # Tokenise only once a merge is actually possible.
        tokens = nlp(current[1])
        if len(tokens) == 0:
            continue
        last_token = tokens[-1]
        pos = last_token.pos_
        if pos in dangling_pos or (
            pos in ambiguous_pos and str(last_token) in WRONG_NOUNS
        ):
            # Write into the successor so chained fragments keep accumulating
            # when the loop reaches it on the next iteration.
            following[1] = current[1] + ' ' + following[1]
            absorbed.append(i)

    # Delete from the back so earlier deletions do not shift later indices.
    for i in reversed(absorbed):
        del sub_chunks[i]
    return chunk
def _close_slide(chunk_id, structured, body_texts, merged_away):
    """Build the ``[chunk_id, sub_chunks]`` entry for a finished slide.

    Returns ``None`` when the slide collected nothing at all.
    """
    if not structured and not body_texts:
        return None
    # Drop entries whose text was already folded into the following element.
    sub_chunks = [item for item in structured if item[1] not in merged_away]
    if body_texts:
        sub_chunks.append(
            "\n".join(text for text in body_texts if text not in merged_away)
        )
    return [chunk_id, sub_chunks]


def _group_elements_by_slide(elements):
    """Split partitioned elements into per-slide chunks at each ``PageBreak``.

    ``ListItem`` and ``NarrativeText`` texts of a slide are de-duplicated and
    joined (in document order) into one trailing string; every other element
    is kept as ``[category, text]``. Text ending in a ``NON_ENDING_PUNCT``
    character is prepended to the next element and dropped on its own.
    """
    chunks = []
    structured = []
    body_texts = {}  # dict used as an insertion-ordered set
    merged_away = set()

    for i, elem in enumerate(elements):
        if elem.category == "PageBreak":
            finished = _close_slide(elem.id, structured, body_texts, merged_away)
            if finished is not None:
                chunks.append(finished)
            structured = []
            body_texts = {}
            continue

        text = elem.text
        if text and text[-1] in NON_ENDING_PUNCT and elem.category != 'Table':
            following = elements[i + 1] if i + 1 < len(elements) else None
            # A PageBreak carries no content; merging into it would lose text.
            if following is not None and following.category != "PageBreak":
                following.text = text + ' ' + following.text
                merged_away.add(text)

        if elem.category in ("ListItem", "NarrativeText"):
            body_texts[text] = None
        else:
            structured.append([elem.category, text])

    # Flush whatever follows the last PageBreak; there is no PageBreak
    # element to take an id from, so the last element's id is used instead.
    if elements:
        finished = _close_slide(
            elements[-1].id, structured, body_texts, merged_away
        )
        if finished is not None:
            chunks.append(finished)
    return chunks


def _split_out_tables(chunks):
    """Separate ``Table`` sub-chunks from the rest.

    Returns ``(kept_chunks, tables)``: ``tables`` holds
    ``[chunk_id, table_sub_chunk]`` pairs, and chunks left with no non-table
    content are dropped from ``kept_chunks``.
    """
    kept = []
    tables = []
    for chunk_id, sub_chunks in chunks:
        others = []
        for sub_chunk in sub_chunks:
            if isinstance(sub_chunk, list) and sub_chunk[0] == 'Table':
                tables.append([chunk_id, sub_chunk])
            else:
                others.append(sub_chunk)
        if others:
            kept.append([chunk_id, others])
    return kept, tables


def _index_body_text(chunks):
    """Insert each slide's body string into a new OrderedMultiIndexMapWeakRef.

    The body string is inserted together with the title and/or sub-title that
    directly precede it. The meaning of the positional arguments to
    ``insert`` is defined by OrderedMultiIndexMapWeakRef and is not visible
    here; the argument order of the original calls is kept as-is.
    """
    index = OrderedMultiIndexMapWeakRef()
    main_title = None
    sub_title = None

    for chunk_id, sub_chunks in chunks:
        nb_titles = 0
        nb_sub_titles = 0
        for i, sub_chunk in enumerate(sub_chunks):
            if isinstance(sub_chunk, list):
                if sub_chunk[0] == 'Title':
                    nb_titles += 1
                elif sub_chunk[0] == 'UncategorizedText':
                    nb_sub_titles += 1
                continue

            # Slides with several titles/sub-titles are ambiguous; skip them.
            if nb_titles > 1 or nb_sub_titles > 1:
                continue

            # Explicit bounds checks: a negative index would silently wrap
            # around to the end of the list instead of raising IndexError.
            if i == 0:
                index.insert(chunk_id, sub_chunk)
                break

            previous = sub_chunks[i - 1]
            if previous[0] == 'UncategorizedText':
                sub_title = previous[1]
                if i == 1:
                    index.insert(chunk_id, sub_chunk, sub_title)
                    break
                before_previous = sub_chunks[i - 2]
                if before_previous[0] == 'Title':
                    main_title = before_previous[1]
                    index.insert(chunk_id, sub_chunk, main_title, sub_title)
                    break
                # NOTE(review): a sub-title preceded by something other than a
                # Title inserts nothing, as in the original -- confirm intent.
            elif previous[0] == 'Title':
                main_title = previous[1]
                index.insert(chunk_id, sub_chunk, main_title)
                break
            # NOTE(review): body text preceded by any other category is not
            # indexed, as in the original -- confirm intent.
    return index


def ppt_chunk(file_like, nlp):
    """Partition a .pptx file into title-indexed text chunks and tables.

    Steps: partition the file with ``partition_pptx``, group the elements per
    slide, merge split all-caps headings with ``process_chunk``, pull tables
    out, and index the remaining body text by its preceding titles. Elapsed
    times are printed to stdout after partitioning, after table extraction
    and at the end.

    Args:
        file_like: File object passed to ``partition_pptx(file=...)``.
        nlp: Tokeniser/tagger callable forwarded to ``process_chunk``.

    Returns:
        ``(index, tables)``: the populated ``OrderedMultiIndexMapWeakRef`` and
        a list of ``[chunk_id, ['Table', text]]`` entries.
    """
    import time

    started = time.time()
    elements = partition_pptx(file=file_like)
    f = time.time() - started
    print(f'TIME {f}')

    chunks = _group_elements_by_slide(elements)
    for chunk in chunks:
        process_chunk(chunk, nlp)  # merges in place
    chunks, tables = _split_out_tables(chunks)

    fr = time.time() - started
    print(f'TIME INTERMEDIATE {fr}')

    index = _index_body_text(chunks)

    ft = time.time() - started
    print(f'TIME FINAL {ft}')
    return index, tables