# (export artifact: "Spaces: Starting on T4" runtime-log residue from the
#  original paste; kept here as a comment so the module stays parseable)
from io import StringIO | |
from typing import List | |
from unstructured.partition.pptx import partition_pptx | |
from unstructured.cleaners.core import clean_trailing_punctuation, clean_bullets | |
from ordered_multimap import OrderedMultiIndexMapWeakRef | |
# Upper-case tokens that spaCy may tag as NOUN/PROPN/VERB but that actually
# signal a title line continuing onto the next element (so it gets merged).
WRONG_NOUNS = {
    "BY", "IN", "ON", "INTO", "A", "AT", "WITH", "FROM", "TO",
    "AND", "OR", "BUT", "OWN", "BEHIND", "THROUGH", "FIERCE",
    "'S", "&",
}

# Punctuation that never legitimately ends a heading/sentence; an element
# whose text ends with one of these is joined with the following element.
NON_ENDING_PUNCT = {",", ":", ";", "'", "/", "-"}
def process_chunk(chunk, nlp):
    """Merge consecutive upper-case title fragments inside one chunk.

    ``chunk`` is ``[chunk_id, elements]`` where each element is a
    ``[category, text]`` pair (non-list entries are left untouched).
    When an upper-case ``Title``/``UncategorizedText`` element ends in a
    linking word (adposition, conjunction, determiner, ... — or a token
    listed in ``WRONG_NOUNS``), its text is prepended to the next
    upper-case title element and the fragment is removed.

    The chunk is modified in place and also returned for convenience.

    :param chunk: ``[chunk_id, list_of_elements]`` pair.
    :param nlp: callable (e.g. a spaCy pipeline) returning tokens that
        expose ``.pos_``; assumed to return at least one token for
        non-empty text — TODO confirm.
    :return: the same (mutated) chunk object.
    """
    TITLE_CATEGORIES = ('Title', 'UncategorizedText')
    LINKING_POS = {'SYM', 'ADP', 'ADV', 'PART', 'PRON', 'DET', 'AUX',
                   'SCONJ', 'CONJ', 'CCONJ'}
    items = chunk[1]
    merged_indices = []
    for i, current in enumerate(items):
        # BUG FIX: the original compared against `('Title' or
        # 'UncategorizedText')`, which always evaluates to just 'Title',
        # so 'UncategorizedText' elements were never merged.
        if not (isinstance(current, list)
                and current[1].isupper()
                and current[0] in TITLE_CATEGORIES):
            continue
        if i + 1 >= len(items):
            continue
        next_ = items[i + 1]
        if not isinstance(next_, list):
            continue
        if next_[1].isupper() and next_[0] in TITLE_CATEGORIES:
            # Only tokenize when a merge is actually possible.
            tokens = nlp(current[1])
            last = tokens[-1]
            if (last.pos_ in LINKING_POS
                    or (last.pos_ in {'PROPN', 'NOUN', 'VERB'}
                        and str(last) in WRONG_NOUNS)):
                next_[1] = current[1] + ' ' + next_[1]
                merged_indices.append(i)
    # BUG FIX: delete from the end backwards — the original deleted in
    # ascending order, so each deletion shifted the remaining indices and
    # later deletions removed the wrong elements.
    for i in reversed(merged_indices):
        del items[i]
    return chunk
def ppt_chunk(file_like, nlp):
    """Partition a .pptx file into per-slide text chunks and extracted tables.

    Elements produced by ``partition_pptx`` are grouped between ``PageBreak``
    markers.  Fragments ending in non-terminal punctuation are merged with the
    following element; list items / narrative text are pooled per slide; title
    fragments are merged via ``process_chunk``; tables are pulled out (paired
    with the slide title when one is present).

    :param file_like: binary file-like object holding the .pptx data.
    :param nlp: tokenizer callable forwarded to ``process_chunk``.
    :return: ``(weakDict, tables)`` — an ``OrderedMultiIndexMapWeakRef`` of
        chunk text keyed by chunk id / main title / sub title, and a list of
        ``[chunk_id, (title,) table_element]`` entries.
    """
    import time

    start = time.time()
    elements = partition_pptx(file=file_like)
    # Debug timing instrumentation (kept from the original).
    print(f'TIME {time.time() - start}')

    chunks = []
    current_chunk = []
    list_items = set()
    marked = set()  # texts that were merged into the FOLLOWING element

    for i, elem in enumerate(elements):
        elem.text = clean_bullets(elem.text)
        if elem.category == "PageBreak":
            if current_chunk or list_items:
                # Drop fragments whose text was merged into a later element.
                # (BUG FIX: the comprehension no longer shadows `elem`.)
                current_chunk = [item for item in current_chunk
                                 if item[1] not in marked]
                if list_items:
                    list_items -= marked
                    current_chunk.append("\n".join(list_items))
                    list_items = set()
                chunks.append([elem.id, current_chunk])
                current_chunk = []
            continue

        # BUG FIXES: guard against empty text (`elem.text[-1]` raised
        # IndexError) and do the lookahead with a bounds check — the original
        # accessed `elements[i+1]` OUTSIDE its try/except, so the last
        # element still raised IndexError.
        if (elem.category != 'Table'
                and elem.text
                and elem.text[-1] in NON_ENDING_PUNCT
                and i + 1 < len(elements)):
            elements[i + 1].text = elem.text + ' ' + elements[i + 1].text
            marked.add(elem.text)

        if elem.category in ("ListItem", 'NarrativeText'):
            list_items.add(clean_trailing_punctuation(elem.text))
        else:
            current_chunk.append([elem.category, elem.text])

    # process_chunk mutates each chunk in place; no rebinding needed.
    for chunk in chunks:
        process_chunk(chunk, nlp)

    # Pull tables out of the chunks; a chunk consisting only of tables is
    # removed entirely.
    tables = []
    j = 0
    while j < len(chunks):
        chunk_id, sub_chunks = chunks[j]
        kept = []
        only_tables = True
        title = ''
        for i, sub_chunk in enumerate(sub_chunks):
            # NOTE: joined list-item strings fall through both category
            # tests below (a string's [0] is a single character).
            if i == 0 and sub_chunk[0] in ('Title', 'UncategorizedText'):
                title = sub_chunk[1]
            if sub_chunk[0] == 'Table':
                if title != '':
                    tables.append([chunk_id, title, sub_chunk])
                else:
                    tables.append([chunk_id, sub_chunk])
            else:
                kept.append(sub_chunk)
                only_tables = False
        if only_tables:
            del chunks[j]
        else:
            chunks[j] = [chunk_id, kept]
            j += 1

    print(f'TIME INTERMEDIATE {time.time() - start}')

    weakDict = OrderedMultiIndexMapWeakRef()
    metadata_main_title = ''  # carries over across chunks on purpose
    for chunk_id, sub_chunks in chunks:
        nb_titles = 0
        nb_sub_titles = 0
        metadata_sub_title = ''
        inserted = False
        for i, sub_chunk in enumerate(sub_chunks):
            if type(sub_chunk) is list:
                if sub_chunk[0] == 'Title' and sub_chunk[1].isupper():
                    if i == 0 and metadata_main_title != sub_chunk[1]:
                        metadata_main_title = sub_chunk[1]
                    nb_titles += 1
                elif (sub_chunk[0] == 'UncategorizedText'
                        and sub_chunk[1].isupper()):
                    if i in (0, 1) and metadata_sub_title != sub_chunk[1]:
                        metadata_sub_title = sub_chunk[1]
                    nb_sub_titles += 1
            else:
                # First non-list entry (the joined list-item text): insert it
                # under the collected titles when they are unambiguous, then
                # stop scanning this chunk either way.
                if nb_titles <= 1 and nb_sub_titles <= 1:
                    weakDict.insert(
                        chunk_id,
                        sub_chunk,
                        clean_trailing_punctuation(metadata_main_title),
                        clean_trailing_punctuation(metadata_sub_title),
                    )
                    inserted = True
                break
        if not inserted:
            # Fallback: lower-cased concatenation of all list entries.
            cleaned_titles_chunk = "\n".join(
                c[1].lower() for c in sub_chunks if type(c) is list)
            weakDict.insert(chunk_id, cleaned_titles_chunk,
                            metadata_main_title, metadata_sub_title)

    print(f'TIME FINAL {time.time() - start}')
    return weakDict, tables
    # NOTE(review): the original ended with an unreachable
    # `raise NotImplementedError("file type not supported yet(pptx)")`
    # after this return; it was dead code and has been removed.