|
from io import StringIO |
|
from typing import List |
|
|
|
from unstructured.partition.pptx import partition_pptx |
|
|
|
from ordered_multimap import OrderedMultiIndexMapWeakRef |
|
|
|
# Upper-case words that must not end a heading. process_chunk consults this
# set when the last token of an all-caps heading is tagged PROPN/NOUN/VERB:
# if the token's text is listed here, the heading is merged into the next one.
WRONG_NOUNS = {
    "'S",
    "&",
    "A",
    "AND",
    "AT",
    "BEHIND",
    "BUT",
    "BY",
    "FIERCE",
    "FROM",
    "IN",
    "INTO",
    "ON",
    "OR",
    "OWN",
    "THROUGH",
    "TO",
    "WITH",
}

# Trailing characters that mark an element's text as unfinished. ppt_chunk
# prepends such text to the element that follows it.
NON_ENDING_PUNCT = {
    "'",
    ",",
    "-",
    "/",
    ":",
    ";",
}
|
|
|
def process_chunk(chunk, nlp):
    """Merge all-caps headings that were split across consecutive elements.

    ``chunk`` is a two-item list ``[chunk_id, items]``. Each entry of
    ``items`` is either a ``[category, text]`` list or some other object;
    non-list entries are never merged. Two neighbouring entries are merged
    when both are upper-case ``'Title'`` or ``'UncategorizedText'`` elements
    and the last token of the first one either carries a function-word
    part-of-speech tag or is tagged PROPN/NOUN/VERB while its text is listed
    in ``WRONG_NOUNS``. Merging prepends the first text to the second entry
    and removes the first entry.

    Args:
        chunk: ``[chunk_id, items]``; ``items`` is modified in place.
        nlp: Callable turning a string into an indexable sequence of tokens
            that expose ``pos_`` (presumably a spaCy pipeline - TODO confirm).

    Returns:
        The same ``chunk`` object that was passed in.
    """
    heading_categories = ('Title', 'UncategorizedText')
    dangling_pos = {'SYM', 'ADP', 'ADV', 'PART', 'PRON', 'DET', 'AUX',
                    'SCONJ', 'CONJ', 'CCONJ'}
    mistagged_pos = {'PROPN', 'NOUN', 'VERB'}

    def is_caps_heading(item):
        # A membership test is required here: ``x == ('Title' or 'Other')``
        # only ever compares against 'Title'.
        return (
            isinstance(item, list)
            and item[0] in heading_categories
            and item[1].isupper()
        )

    items = chunk[1]
    merged_indices = []

    for i in range(len(items) - 1):
        current = items[i]
        following = items[i + 1]

        # Check both neighbours before tokenizing so nlp() only runs on
        # headings that could actually be merged.
        if not (is_caps_heading(current) and is_caps_heading(following)):
            continue

        tokens = nlp(current[1])
        if len(tokens) == 0:
            continue

        last = tokens[-1]
        if (last.pos_ in dangling_pos) or (
            (last.pos_ in mistagged_pos) and (str(last) in WRONG_NOUNS)
        ):
            # Mutating the next entry lets a chain of fragments accumulate
            # into the final heading as the loop advances.
            following[1] = current[1] + ' ' + following[1]
            merged_indices.append(i)

    # Delete from the back so earlier deletions do not shift the positions
    # that are still waiting to be removed.
    for i in reversed(merged_indices):
        del items[i]

    return chunk
|
|
|
def _group_elements_by_slide(elements):
    """Split partitioned elements into ``[chunk_id, items]`` lists, one per slide."""
    chunks = []
    items = []        # [category, text] pairs of the slide being built
    body_lines = {}   # ListItem/NarrativeText texts; dict keys = ordered, de-duplicated

    def close_slide(chunk_id):
        slide_items = list(items)
        if body_lines:
            # All body text of a slide becomes one trailing string entry.
            slide_items.append("\n".join(body_lines))
        if slide_items:
            chunks.append([chunk_id, slide_items])
        items.clear()
        body_lines.clear()

    last_index = len(elements) - 1
    for i, elem in enumerate(elements):
        if elem.category == "PageBreak":
            close_slide(elem.id)
            continue

        text = elem.text
        if (
            text
            and (text[-1] in NON_ENDING_PUNCT)
            and (elem.category != 'Table')
            and i < last_index
            and elements[i + 1].category != "PageBreak"
        ):
            # The text stops on non-terminal punctuation: carry it into the
            # next element of the same slide instead of emitting it on its
            # own. Without a following element on this slide the text is
            # kept as-is so it is not lost.
            elements[i + 1].text = text + ' ' + elements[i + 1].text
            continue

        if (elem.category == "ListItem") or (elem.category == 'NarrativeText'):
            body_lines[text] = None
        else:
            items.append([elem.category, text])

    # Content after the last PageBreak would otherwise never be emitted; it
    # is keyed by the id of the final element.
    if items or body_lines:
        close_slide(elements[-1].id)

    return chunks


def _split_out_tables(chunks):
    """Move ``'Table'`` items out of the chunks; return ``(remaining_chunks, tables)``."""
    tables = []
    remaining_chunks = []

    for chunk_id, sub_chunks in chunks:
        title = ''
        kept = []

        for i, sub_chunk in enumerate(sub_chunks):
            print(f'TEST : {sub_chunk}')
            # The joined body text is a plain string, not a [category, text] pair.
            is_element = isinstance(sub_chunk, list)

            if (i == 0) and is_element and (sub_chunk[0] in ('Title', 'UncategorizedText')):
                title = sub_chunk[1]

            if is_element and sub_chunk[0] == 'Table':
                if title != '':
                    tables.append([chunk_id, title, sub_chunk])
                else:
                    tables.append([chunk_id, sub_chunk])
            else:
                kept.append(sub_chunk)

        # A chunk that held nothing but tables is dropped entirely.
        if kept:
            remaining_chunks.append([chunk_id, kept])

    return remaining_chunks, tables


def _index_chunks(chunks):
    """Insert every chunk's text into a new ``OrderedMultiIndexMapWeakRef``."""
    index = OrderedMultiIndexMapWeakRef()
    # NOTE(review): the compare-before-assign on these two variables is kept
    # from the original code; presumably it lets equal titles on different
    # slides reuse one string object - confirm against the map's semantics.
    metadata_main_title = None
    metadata_sub_title = None

    for chunk_id, sub_chunks in chunks:
        nb_titles = 0
        nb_sub_titles = 0

        for i, sub_chunk in enumerate(sub_chunks):
            if isinstance(sub_chunk, list):
                if sub_chunk[0] == 'Title':
                    nb_titles += 1
                elif sub_chunk[0] == 'UncategorizedText':
                    nb_sub_titles += 1
                continue

            # sub_chunk is the joined body text of the slide.
            if (nb_titles > 1) or (nb_sub_titles > 1):
                continue

            # Positions are tested explicitly: sub_chunks[i - 1] with i == 0
            # would wrap around to the last entry instead of failing.
            if i == 0:
                index.insert(chunk_id, sub_chunk)
                break

            previous = sub_chunks[i - 1]
            if previous[0] == 'UncategorizedText':
                if metadata_sub_title != previous[1]:
                    metadata_sub_title = previous[1]

                if i == 1:
                    index.insert(chunk_id, sub_chunk, metadata_sub_title)
                    break

                before_previous = sub_chunks[i - 2]
                if before_previous[0] == 'Title':
                    if metadata_main_title != before_previous[1]:
                        metadata_main_title = before_previous[1]
                    index.insert(chunk_id, sub_chunk, metadata_main_title, metadata_sub_title)
                    break
            elif previous[0] == 'Title':
                if metadata_main_title != previous[1]:
                    metadata_main_title = previous[1]
                index.insert(chunk_id, sub_chunk, metadata_main_title)
                break
        else:
            # No body text could be paired with titles: store the whole
            # chunk as a single text.
            index.insert(
                chunk_id,
                "\n".join(
                    part[1] if isinstance(part, list) else part
                    for part in sub_chunks
                ),
            )

    return index


def ppt_chunk(file_like, nlp):
    """Partition a PPTX file and index its text, returning tables separately.

    Steps: partition the file with ``partition_pptx``; group the elements
    into one chunk per slide (split on ``PageBreak`` elements); merge split
    all-caps headings with ``process_chunk``; pull ``'Table'`` items out of
    the chunks; insert the remaining text into an
    ``OrderedMultiIndexMapWeakRef``. Timing and debug lines are printed to
    stdout along the way.

    Args:
        file_like: Passed to ``partition_pptx`` as its ``file`` argument.
        nlp: Tokenizer callable forwarded to ``process_chunk``.

    Returns:
        A tuple ``(index, tables)``. ``index`` is the populated
        ``OrderedMultiIndexMapWeakRef``. ``tables`` is a list whose entries
        are ``[chunk_id, title, ['Table', text]]`` when the slide starts with
        a non-empty ``'Title'``/``'UncategorizedText'`` element, otherwise
        ``[chunk_id, ['Table', text]]``. ``chunk_id`` is the id of the
        ``PageBreak`` that closes the slide, or the id of the last element
        for content after the final ``PageBreak``.
    """
    import time

    s = time.time()
    elements = partition_pptx(file=file_like)
    f = time.time() - s
    print(f'TIME {f}')

    chunks = _group_elements_by_slide(elements)

    # process_chunk edits each chunk in place.
    for chunk in chunks:
        process_chunk(chunk, nlp)

    chunks, tables = _split_out_tables(chunks)

    fr = time.time() - s
    print(f'TIME INTERMEDIATE {fr}')

    weakDict = _index_chunks(chunks)

    ft = time.time() - s
    print(f'TIME FINAL {ft}')

    return weakDict, tables