# Provenance (from the hosting page): uploaded by quantaji, commit 21a2300
# ("add post-process file"), file size 8.02 kB.
import os
import hashlib
from subprocess import call
from shiny import App, reactive, render, ui
from core.read_pdf import process_pdf, temp_dir
from core.chatgpt.utils import generate_latex_slide
# MD5 hex digest of the PDF that was last preprocessed successfully, or None
# while no PDF has been preprocessed yet. It is assigned inside server()'s
# preprocess_result handler and compared against the current upload's digest
# in preprocess_status / preprocess_result.
# NOTE(review): this is a module-level global, so it is presumably shared by
# every session of the app rather than tracked per user — confirm intended.
last_pdf_md5_preprocess_stage = None
def compute_hash(file_pth, chunk_size=65536):
    """Return the hexadecimal MD5 digest of the file at *file_pth*.

    The file is read in fixed-size chunks so that a large upload is never
    held in memory all at once. The digest is used only to detect whether
    the same file content has been seen before, not for security purposes.

    Args:
        file_pth: Path of the file to hash.
        chunk_size: Number of bytes read per iteration.

    Returns:
        The MD5 digest as a lowercase hexadecimal string.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    digest = hashlib.md5()
    with open(file_pth, 'rb') as file_to_check:
        # iter() with a sentinel stops as soon as read() returns b'' (EOF).
        for chunk in iter(lambda: file_to_check.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()
def ui_card(title, *args):
    """Wrap *args* in a card-styled ``div`` with *title* as its header.

    The outer ``div`` carries the CSS classes ``card mb-4``; it contains a
    ``card-header`` div holding *title* followed by a ``card-body`` div
    holding every element of *args*.

    Returns:
        A one-element tuple containing the outer ``div``.
    """
    header = ui.div(title, class_="card-header")
    body = ui.div({"class": "card-body"}, *args)
    card = ui.div({"class": "card mb-4"}, header, body)
    # The original contract is a 1-tuple, not the bare tag.
    return (card,)
# Card 1: upload a PDF, preprocess it and download the preprocessed result.
_preprocess_card = ui_card(
    "Upload PDF and Preprocess",
    ui.input_file("input_pdf", "Choose a .pdf file to upload:", multiple=False),
    ui.output_text("upload_file_status"),
    ui.p(
        ui.input_action_button("preprocess_action", "Preprocess file", class_="btn-primary"),
        ui.output_text("preprocess_result"),
    ),
    ui.output_text("preprocess_status"),
    ui.download_button("download_preprocessed", "Download preprocessed file"),
)

# Card 2: upload bullet points, generate the beamer source, compile it and
# download either the source or the compiled slides.
# The "complie_*" ids are spelled exactly as the server function expects them.
_bullet_card = ui_card(
    "Upload the generated bullet points in pre-defined format.",
    ui.input_file("input_bullet", "Choose a .tex bullet-point file to upload:", multiple=False),
    ui.output_text("upload_bullet_status"),
    ui.p(
        ui.input_action_button("process_bullet", "Generate .tex", class_="btn-primary"),
        ui.output_text("process_bullet_result"),
    ),
    ui.p(ui.download_button("download_beamer", "Download beamer source code")),
    ui.p(
        ui.input_action_button("complie_latex", "Compile the latex file generated before.", class_="btn-primary"),
        ui.output_text("complie_latex_result"),
    ),
    ui.p(ui.download_button("download_slide", "Download slide generated")),
)

# Full page: title, PDF card, explanatory note, bullet-point card.
app_ui = ui.page_fluid(
    ui.h1("Document2Slide Demo"),
    _preprocess_card,
    ui.h3("Due to gpt-4's unreliable service, we choose to show our demo locally. You can refer to ./core/chatgpt/generate_slides.py for this pipeline."),
    _bullet_card,
)
def server(input, output, session):
    """Register the reactive outputs and download handlers of the demo.

    Two pipelines are wired up:

    * PDF: report the upload status, run ``process_pdf`` when the
      "preprocess" button is clicked, and offer the preprocessed directory
      as a zip archive.
    * Bullet points: report the upload status, turn the uploaded text into a
      ``.tex`` file with ``generate_latex_slide``, compile it with
      ``latexmk -xelatex``, and offer both the ``.tex`` and the ``.pdf``.

    Args:
        input: Shiny inputs; ``input_pdf``, ``input_bullet`` and the three
            action buttons are read here.
        output: Shiny output registrar, used as a decorator.
        session: Shiny session, used to register the download handlers.
    """

    def _first_upload(file_infos, mime_type):
        """Return the first uploaded file's info dict if its type is *mime_type*, else None."""
        if file_infos and file_infos[0]['type'] == mime_type:
            return file_infos[0]
        return None

    def _bullet_artifact(extension):
        """Return the path under temp_dir derived from the bullet upload's name, or None."""
        file_infos = input.input_bullet()
        if not file_infos:
            return None
        # basename() keeps a client-supplied name from escaping temp_dir;
        # splitext() drops the extension whatever its length.
        stem = os.path.splitext(os.path.basename(file_infos[0]['name']))[0]
        return str(os.path.join(temp_dir, stem + extension))

    @output
    @render.text
    def upload_file_status():
        """Describe whether a PDF has been uploaded."""
        file_infos = input.input_pdf()
        # file_infos looks like:
        # [{'name': 'Poster.pdf', 'size': 598394, 'type': 'application/pdf',
        #   'datapath': '/tmp/fileupload-2c21fv0a/tmpi91sy07h/0.pdf'}]
        if not file_infos:
            return "There is no file provided currently."
        if file_infos[0]['type'] != 'application/pdf':
            return "the file you provide is not in PDF format, upload another one!"
        return "PDF file successfully uploaded!"

    @output
    @render.text
    def preprocess_status():
        """Tell the user whether the uploaded PDF still needs preprocessing."""
        pdf = _first_upload(input.input_pdf(), 'application/pdf')
        # Check for a missing upload first: comparing a missing digest (None)
        # with the initial None global would wrongly report
        # "already preprocessed" before anything has been uploaded.
        if pdf is None:
            return "No PDF ready currently, please upload a PDF!"
        if compute_hash(pdf['datapath']) == last_pdf_md5_preprocess_stage:
            return "PDF already preprocessed! You can continue!"
        return "Ready to preprocess the PDF!"

    @output
    @render.text
    @reactive.event(input.preprocess_action)  # Take a dependency on the button
    async def preprocess_result():
        """Run process_pdf on the uploaded PDF unless its content was already processed."""
        global last_pdf_md5_preprocess_stage
        pdf = _first_upload(input.input_pdf(), 'application/pdf')
        if pdf is None:
            return "No PDF provided!"
        # basename() guards against a client-supplied name with path separators.
        file_name = os.path.basename(pdf['name'])
        original_pdf_pth = pdf['datapath']
        new_pdf_pth = os.path.join(os.path.dirname(original_pdf_pth), file_name)
        try:
            # Give the temporary upload its original file name before it is
            # handed to process_pdf; skip when it already carries that name
            # (e.g. on a second click).
            if original_pdf_pth != new_pdf_pth:
                os.rename(original_pdf_pth, new_pdf_pth)
                pdf['datapath'] = new_pdf_pth
            file_md5 = compute_hash(new_pdf_pth)
            if file_md5 == last_pdf_md5_preprocess_stage:
                return "Already processed!!!"
            process_pdf(pdf_pth=new_pdf_pth, file_name=file_name)
            # Remember the digest only after process_pdf succeeded.
            last_pdf_md5_preprocess_stage = file_md5
            return "Process successfully!"
        except Exception:
            # Best-effort: any failure is reported to the user as text.
            return "Something wrong happen, please switch to another file!"

    @session.download()
    def download_preprocessed():
        """Zip the preprocessed directory and return the archive path, or None if unavailable."""
        file_infos = input.input_pdf()
        if not file_infos:
            return None
        # NOTE(review): dropping the last four characters is assumed to match
        # how process_pdf names its output directory — confirm in core.read_pdf.
        file_name = file_infos[0]['name'][:-4]
        preprocessed_file_dir = os.path.join(temp_dir, file_name)
        zip_pth = str(os.path.join(temp_dir, file_name + '.zip'))
        if os.path.exists(preprocessed_file_dir):  # this dir exists
            call(['zip', '-r', file_name + '.zip', './' + file_name], cwd=temp_dir)
        # Only hand back a path that actually exists on disk.
        return zip_pth if os.path.exists(zip_pth) else None

    @output
    @render.text
    def upload_bullet_status():
        """Describe whether a plain-text bullet-point file has been uploaded."""
        file_infos = input.input_bullet()
        if not file_infos:
            return "There is no file provided currently."
        if file_infos[0]['type'] != 'text/plain':
            return "the file you provide is not in txt format, upload another one!"
        return "txt file successfully uploaded!"

    @output
    @render.text
    @reactive.event(input.process_bullet)  # Take a dependency on the button
    async def process_bullet_result():
        """Convert the uploaded bullet-point text into a .tex file under temp_dir."""
        bullet = _first_upload(input.input_bullet(), 'text/plain')
        if bullet is None:
            return "No .txt provided, please upload one!"
        output_tex_pth = _bullet_artifact('.tex')
        try:
            with open(bullet['datapath'], 'r') as f:
                slide = f.read()
            os.makedirs(temp_dir, exist_ok=True)
            generate_latex_slide(slide, output_tex_pth)
            return "Generate .tex file successful!"
        except Exception:
            # Best-effort: any failure is reported to the user as text.
            return "Something run happened please which to another file!"

    @session.download()
    def download_beamer():
        """Return the path of the generated .tex file, or None if it does not exist."""
        tex_pth = _bullet_artifact('.tex')
        if tex_pth is None or not os.path.exists(tex_pth):
            return None
        return tex_pth

    @output
    @render.text
    @reactive.event(input.complie_latex)  # Take a dependency on the button
    async def complie_latex_result():
        """Compile the generated .tex file with latexmk and report the outcome."""
        tex_pth = _bullet_artifact('.tex')
        if tex_pth is None:
            return "No file uploaded yet!"
        if not os.path.exists(tex_pth):
            return "No .tex file yet, please upload a .txt bullet point file and convert it to beamer tex."
        try:
            # Run inside temp_dir with the bare file name as the argument.
            return_code = call(["latexmk", "-xelatex", os.path.basename(tex_pth)], cwd=temp_dir)
        except OSError:
            # e.g. latexmk is not installed on this machine.
            return "Compile fail!"
        if return_code == 0:
            return "Compile sucessful!"
        return "Compile fail!"

    @session.download()
    def download_slide():
        """Return the path of the compiled slide PDF, or None if it does not exist."""
        pdf_pth = _bullet_artifact('.pdf')
        if pdf_pth is None or not os.path.exists(pdf_pth):
            return None
        return pdf_pth
# Application object: pairs the page layout (app_ui) with the server function.
app = App(app_ui, server)