|
import re |
|
import uuid |
|
import pandas as pd |
|
from io import StringIO |
|
from typing import List |
|
|
|
from unstructured.partition.pptx import partition_pptx |
|
from unstructured.cleaners.core import clean_trailing_punctuation, clean_bullets, clean |
|
|
|
from ordered_multimap import OrderedMultiIndexMapWeakRef |
|
|
|
# Uppercase tokens that the POS tagger may label as nouns/verbs but that
# cannot legally END a standalone title (prepositions, conjunctions, the
# possessive "'S", '&', ...).  Used by process_chunk to detect headings
# that were split across two elements.
WRONG_NOUNS = {"BY", "IN", "ON", "INTO", "A", "AT", "WITH", "FROM", "TO", "AND", "OR", "BUT", "OWN", 'BEHIND', 'THROUGH', 'FIERCE', "'S", '&'}

# Trailing punctuation that signals a line of text continues in the next
# element (so the two should be merged before chunking).
NON_ENDING_PUNCT = {',', ':', ';', "'", '/', '-'}
|
|
|
def process_chunk(chunk, nlp):
    """Re-join all-uppercase title lines that were split across elements.

    A Title/UncategorizedText element whose uppercase text ends in a word
    that cannot legally end a heading (preposition, conjunction, determiner,
    ... or one of the WRONG_NOUNS sentinels) is assumed to continue in the
    next title element: the two texts are joined onto the next element and
    the dangling one is removed.

    Args:
        chunk: ``[chunk_id, elements]`` where ``elements`` is a list whose
            items are ``[category, text]`` pairs (non-list items are skipped).
        nlp: spaCy-style callable; ``nlp(text)`` yields tokens exposing
            ``pos_`` and ``str()``.

    Returns:
        The same ``chunk`` object, mutated in place.
    """
    # POS tags that may not end a standalone title.
    bad_ending_pos = {'SYM', 'ADP', 'ADV', 'PART', 'PRON', 'DET', 'AUX',
                      'SCONJ', 'CONJ', 'CCONJ'}
    # BUG FIX: the original compared against ('Title' or 'UncategorizedText'),
    # which evaluates to just 'Title', so UncategorizedText titles never matched.
    title_categories = ('Title', 'UncategorizedText')

    elements = chunk[1]
    marked = []

    for i, current in enumerate(elements):
        if not (isinstance(current, list)
                and current[1].isupper()
                and current[0] in title_categories):
            continue
        if i + 1 >= len(elements):
            continue
        next_ = elements[i + 1]
        if not isinstance(next_, list):
            continue
        if next_[1].isupper() and next_[0] in title_categories:
            tokens = nlp(current[1])
            last = tokens[-1]
            if (last.pos_ in bad_ending_pos
                    or (last.pos_ in {'PROPN', 'NOUN', 'VERB'}
                        and str(last) in WRONG_NOUNS)):
                # Prepend the dangling text onto the following element and
                # remember this index for deletion.
                elements[i + 1][1] = current[1] + ' ' + next_[1]
                marked.append(i)

    # BUG FIX: delete from the end so earlier deletions do not shift the
    # indices of the remaining marked elements.
    for i in reversed(marked):
        del elements[i]

    return chunk
|
|
|
def ppt_chunk(file_like, nlp):
    """Partition a .pptx file into per-slide chunks plus extracted tables.

    Elements from ``partition_pptx`` are grouped between ``PageBreak``
    markers.  Within a slide: list items / narrative text are pooled and
    newline-joined; lines ending in NON_ENDING_PUNCT are merged into the
    following element; uppercase titles split in two are re-joined via
    ``process_chunk``; tables are pulled out into a separate list.

    Args:
        file_like: binary file-like object containing the presentation.
        nlp: spaCy-style pipeline, passed through to ``process_chunk``.

    Returns:
        Tuple ``(weakDict, tables)``: an ``OrderedMultiIndexMapWeakRef``
        keyed by slide PageBreak id, and a list of
        ``[slide_id, title?, table_sub_chunk]`` entries.
    """
    import time

    start = time.time()
    elements = partition_pptx(file=file_like)

    # Debug instrumentation kept from the original implementation.
    for elem in elements:
        print(f'TYPE : {elem.category} TEXT: {elem.text}')
    print(f'TIME {time.time() - start}')

    chunks = []          # [[slide_id, [[category, text], ..., joined_items]], ...]
    current_chunk = []
    list_items = set()   # NOTE(review): a set, so the "\n".join below has arbitrary order
    marked = set()       # texts that were merged forward into the next element

    for i, elem in enumerate(elements):
        elem.text = clean_bullets(elem.text)

        if elem.category == "PageBreak":
            if current_chunk or list_items:
                # Drop elements whose text was merged into a later element.
                current_chunk = [item for item in current_chunk
                                 if item[1] not in marked]
                if list_items:
                    list_items -= marked.intersection(list_items)
                    current_chunk.append("\n".join(list_items))
                    list_items = set()
                chunks.append([elem.id, current_chunk])
                current_chunk = []
            continue

        # BUG FIX: the original caught IndexError around the elements[i+1]
        # lookup only, then dereferenced elements[i+1] anyway, crashing
        # whenever the LAST element ended in a non-ending punctuation mark.
        # It also indexed elem.text[-1] without guarding against empty text.
        if elem.text and elem.text[-1] in NON_ENDING_PUNCT and elem.category != 'Table':
            if i + 1 < len(elements):
                elements[i + 1].text = elem.text + ' ' + elements[i + 1].text
                marked.add(elem.text)

        if elem.category in ("ListItem", "NarrativeText"):
            list_items.add(clean_trailing_punctuation(elem.text))
        else:
            current_chunk.append([elem.category, elem.text])

    # Re-join uppercase titles that were split across two elements
    # (process_chunk mutates each chunk in place).
    for chunk in chunks:
        process_chunk(chunk, nlp)

    # Extract tables; slides containing only tables are removed entirely.
    tables = []
    j = 0
    while j < len(chunks):
        kept_sub_chunks = []
        only_tables = True
        title = ''
        for i, sub_chunk in enumerate(chunks[j][1]):
            if i == 0 and sub_chunk[0] in ('Title', 'UncategorizedText'):
                title = sub_chunk[1]
            if sub_chunk[0] == 'Table':
                if title:
                    tables.append([chunks[j][0], title, sub_chunk])
                else:
                    tables.append([chunks[j][0], sub_chunk])
            else:
                kept_sub_chunks.append(sub_chunk)
                only_tables = False
        if only_tables:
            del chunks[j]  # do not advance j: the next chunk shifts into slot j
        else:
            chunks[j] = [chunks[j][0], kept_sub_chunks]
            j += 1

    print(f'TIME INTERMEDIATE {time.time() - start}')

    weakDict = OrderedMultiIndexMapWeakRef()
    metadata_main_title = ''  # carried over across slides until replaced

    for chunk in chunks:
        nb_titles = 0
        nb_sub_titles = 0
        metadata_sub_title = ''
        inserted = False

        for i, sub_chunk in enumerate(chunk[1]):
            if isinstance(sub_chunk, list):
                if sub_chunk[0] == 'Title' and sub_chunk[1].isupper():
                    if i == 0 and metadata_main_title != sub_chunk[1]:
                        metadata_main_title = sub_chunk[1]
                    nb_titles += 1
                elif sub_chunk[0] == 'UncategorizedText' and sub_chunk[1].isupper():
                    if i in (0, 1) and metadata_sub_title != sub_chunk[1]:
                        metadata_sub_title = sub_chunk[1]
                    nb_sub_titles += 1
                else:
                    # First non-title element: index it under the current
                    # titles (only when the title counts are unambiguous),
                    # then stop scanning this chunk.
                    if nb_titles <= 1 and nb_sub_titles <= 1:
                        weakDict.insert(
                            chunk[0],
                            sub_chunk,
                            clean_trailing_punctuation(metadata_main_title),
                            clean_trailing_punctuation(metadata_sub_title),
                        )
                        inserted = True
                        break

        if not inserted:
            # Fallback: index the lower-cased concatenation of every
            # [category, text] pair.  NOTE(review): plain-string sub-chunks
            # (the joined list items) are skipped here, as in the original
            # code - confirm that dropping them is intended.
            joined = "\n".join(c[1].lower() for c in chunk[1]
                               if isinstance(c, list))
            weakDict.insert(chunk[0], joined,
                            metadata_main_title, metadata_sub_title)
            print(metadata_main_title)
            print(metadata_sub_title)

    # Dead code removed: the original had an unreachable
    # ``raise NotImplementedError("file type not supported yet(pptx)")``
    # after this return.
    return weakDict, tables
|
|
|
def build_prompt_conv():
    """Build the two-message chat prompt used to summarize the user's intent.

    Reads ``st.session_state.user_input``; NOTE(review): ``st`` (Streamlit)
    is not imported in this module - confirm it is available at call time.

    Returns:
        list[dict]: a system persona message followed by the user request.
    """
    system_message = {
        'role': 'system',
        'content': """Assume the role of an innovator who thrives on creativity and resourcefulness. Your responses should encourage new approaches and challenge conventional thinking.

Behavior: Focus on brainstorming and ideation, offering unconventional solutions to problems.

Mannerisms: Use energetic, enthusiastic language that reflects your innovative spirit. Frequently propose ideas that are bold and forward-looking."""
    }
    user_message = {
        'role': 'user',
        'content': f"""Generate a short, single-sentence summary of the user's intent or topic based on their question, capturing the main focus of what they want to discuss.

Question : {st.session_state.user_input}
"""
    }
    return [system_message, user_message]
|
|
|
def find_next_word_after_spaces(input_string):
    """Return the first token that follows a run of two or more whitespace
    characters, or ``None`` when no such run exists."""
    found = re.search(r'\s{2,}(\S+)', input_string)
    return found.group(1) if found else None
|
|
|
|
|
def ppt_chunker(file_like, llm):
    """Split a .pptx file into one text chunk per slide.

    Slides are delimited by ``PageBreak`` elements; each break flushes the
    accumulated text and records the break element's UUID as an int.  Table
    text is appended verbatim (to keep its layout); all other text is
    normalised with ``clean``.

    Args:
        file_like: binary file-like object containing the presentation.
        llm: unused here; kept for interface compatibility with callers.

    Returns:
        Tuple ``(chunks, ids)`` - parallel lists of slide texts and the
        integer form of each slide's PageBreak UUID.
    """
    elements = partition_pptx(file=file_like)

    ids = []
    chunks = []
    current_chunk = ''

    def append_text(text):
        # Join with a newline unless this is the first text on the slide.
        nonlocal current_chunk
        current_chunk = text if current_chunk == '' else current_chunk + '\n' + text

    for elem in elements:
        if elem.category == 'PageBreak':
            ids.append(int(uuid.UUID(elem.id)))
            chunks.append(current_chunk)
            current_chunk = ''
        elif elem.category == 'Table':
            # Debug instrumentation kept from the original implementation.
            result = [find_next_word_after_spaces(line)
                      for line in elem.text.split('\n')]
            print(f'TAB : {pd.DataFrame(result)}')
            append_text(elem.text)
        else:
            append_text(clean(elem.text, extra_whitespace=True, dashes=True,
                              bullets=True, lowercase=True,
                              trailing_punctuation=True))

    # BUG FIX: text accumulated after the last PageBreak used to be silently
    # dropped; flush it with a fresh id so no trailing slide content is lost.
    if current_chunk:
        ids.append(uuid.uuid4().int)
        chunks.append(current_chunk)

    for chunk in chunks:
        print(f' TEXT : {chunk}')
    return chunks, ids