import argparse
import os
import signal
import sys
import json
import time
import tempfile
import zipfile

from rich.console import Console
from rich.progress import track
import camelot
import polars as pl
import gradio as gr
from gradio_pdf import PDF

console = Console()


class Interface:
    """Filesystem helpers shared by the command-line and web front ends."""

    @staticmethod
    def get_tempdir():
        """Create a fresh temporary directory.

        Returns:
            A ``(timestamp, temp_dir)`` tuple: the current Unix time in whole
            seconds and the path of the newly created directory. The caller
            is responsible for removing the directory.
        """
        timestamp = int(time.time())
        temp_dir = tempfile.mkdtemp()
        return timestamp, temp_dir

    @staticmethod
    def create_zip(file_list, zip_path, password=None):
        """Write the given files and directories into a new ZIP archive.

        Args:
            file_list: Paths to add. A directory is walked recursively and its
                files are stored relative to that directory; a plain file is
                stored under its base name.
            zip_path: Destination path of the archive (overwritten if present).
            password: Optional password, encoded as UTF-8 and handed to
                ``ZipFile.setpassword``.
        """
        # The keyword is ``allowZip64``; the previous misspelling made every
        # call fail with TypeError before anything was written.
        with zipfile.ZipFile(zip_path, "w", allowZip64=True) as zipf:
            if password:
                # NOTE: setpassword() only sets the default password used when
                # *reading* encrypted members. The standard-library zipfile
                # module cannot create encrypted archives, so the entries
                # written below are stored unencrypted.
                zipf.setpassword(bytes(password, 'utf-8'))
            for item in file_list:
                if os.path.isdir(item):
                    for root, _, files in os.walk(item):
                        for file in files:
                            file_path = os.path.join(root, file)
                            arcname = os.path.relpath(file_path, item)
                            zipf.write(file_path, arcname)
                else:
                    arcname = os.path.basename(item)
                    zipf.write(item, arcname)


class PDFTableParser:
    """Extract tables from PDF files with camelot and save them as CSV."""

    def __init__(self, input_files, output_files, delimiter, edge_tol, row_tol, pages):
        """Store the extraction settings.

        Args:
            input_files: PDF paths to read.
            output_files: CSV paths to write, paired positionally with
                ``input_files``.
            delimiter: Field separator passed to ``write_csv``.
            edge_tol: Forwarded to ``camelot.read_pdf`` as ``edge_tol``.
            row_tol: Forwarded to ``camelot.read_pdf`` as ``row_tol``.
            pages: Forwarded to ``camelot.read_pdf`` as ``pages``.
        """
        self.input_files = input_files
        self.output_files = output_files
        self.delimiter = delimiter
        self.edge_tol = edge_tol
        self.row_tol = row_tol
        self.pages = pages

    def read_tables(self, file_name):
        """Read tables from one PDF using camelot's ``stream`` flavor.

        Returns:
            The object returned by ``camelot.read_pdf``, or ``None`` if reading
            raised; the error is printed to the console instead of propagating.
        """
        try:
            console.print(f"Reading tables from {file_name}...")
            tables = camelot.read_pdf(
                file_name,
                flavor='stream',
                edge_tol=self.edge_tol,
                row_tol=self.row_tol,
                pages=self.pages,
            )
            console.print(f"Found {len(tables)} tables in {file_name}.")
            return tables
        except Exception as e:
            console.print(f"[red]Error reading {file_name}: {e}[/red]")
            return None

    def save_tables_as_csv(self, tables, output_file):
        """Concatenate all tables and write them to ``output_file`` as CSV.

        Failures are printed to the console and swallowed, so callers that
        need to know whether the file was written must check for it.
        """
        try:
            console.print(f"Saving tables to {output_file}...")
            df = pl.concat([pl.DataFrame(table.df) for table in tables])
            df.write_csv(output_file, separator=self.delimiter)
            console.print(f"Saved tables to {output_file}.")
        except Exception as e:
            console.print(f"[red]Error saving to {output_file}: {e}[/red]")

    def estimate_processing_time(self, file_name):
        """Print and return a rough processing-time estimate for a file.

        The estimate is a heuristic over the raw bytes decoded as UTF-8
        (undecodable bytes ignored): newline count, whitespace-separated token
        count and character count, each divided by 1000 and summed.

        Returns:
            The estimate in seconds, or ``0`` if the file could not be read.
        """
        try:
            with open(file_name, 'rb') as f:
                content = f.read().decode('utf-8', errors='ignore')
            # Previously assigned to ``pages`` while the formula read ``lines``;
            # the resulting NameError was swallowed below, so the estimate was
            # always reported as an error and returned as 0.
            lines = content.count('\n')
            words = len(content.split())
            chars = len(content)
            estimated_time = (lines / 1000) + (words / 1000) + (chars / 1000)
            console.print(f"Estimated processing time for {file_name}: {estimated_time:.2f} seconds.")
            return estimated_time
        except Exception as e:
            console.print(f"[red]Error estimating processing time for {file_name}: {e}[/red]")
            return 0

    def process_files(self):
        """Process every input/output pair, showing a progress bar."""
        # Materialise the pairs: a bare zip() has no length, which leaves the
        # progress bar without a total.
        pairs = list(zip(self.input_files, self.output_files))
        for input_file, output_file in track(pairs, description="Processing files"):
            self.estimate_processing_time(input_file)
            tables = self.read_tables(input_file)
            if tables:
                self.save_tables_as_csv(tables, output_file)


class WebUI:
    """Gradio front end for interactive table extraction."""

    def __init__(self):
        pass

    @staticmethod
    def process_pdf(pdf_file, output_path, edge_tol, row_tol, pages):
        """Extract tables from an uploaded PDF and save them in a temp directory.

        Args:
            pdf_file: Path of the uploaded PDF, or a falsy value when the
                input component has been cleared.
            output_path: File name for the CSV; only its base name is used.
            edge_tol: Forwarded to the parser.
            row_tol: Forwarded to the parser.
            pages: Forwarded to the parser.

        Returns:
            ``(dataframe, [csv_path], status_dict)`` on success, or
            ``(None, None, status_dict)`` with ``status == "error"`` otherwise.
        """
        if not pdf_file:
            # The change event also fires when the PDF is cleared; there is
            # nothing to parse in that case.
            return None, None, {"status": "error", "message": "Failed to process PDF"}

        _, tempd = Interface.get_tempdir()
        # output_path comes from a UI text box: keep only the base name so an
        # absolute path or ".." component cannot escape the temp directory.
        safe_name = os.path.basename(output_path or "") or "output.csv"
        tempf = os.path.join(tempd, safe_name)

        parser = PDFTableParser([pdf_file], [tempf], ',', edge_tol, row_tol, pages)
        tables = parser.read_tables(pdf_file)
        if not tables:
            return None, None, {"status": "error", "message": "Failed to process PDF"}

        parser.save_tables_as_csv(tables, tempf)
        if not os.path.isfile(tempf):
            # save_tables_as_csv swallows its own errors, so a missing file is
            # the only signal that saving failed.
            return None, None, {"status": "error", "message": "Failed to save extracted tables"}

        df = pl.concat([pl.DataFrame(table.df) for table in tables])
        return df, [tempf], {"status": "success", "message": f"Processed PDF and saved as {tempf}"}

    def run(self):
        """Build the Gradio interface and launch it."""
        # NOTE(review): the widget nesting and the position of app.launch()
        # were reconstructed from a flattened source; confirm against the
        # intended layout.
        with gr.Blocks(
            title="PDF Table Parser",
            css="body { font-family: Arial, sans-serif; } footer { visibility: hidden; }",
        ) as app:
            gr.Markdown("# PDF Table Parser")
            description = "Upload a PDF file to extract tables"
            gr.Markdown(f"### {description}")
            with gr.Row():
                with gr.Column():
                    pdf_in = PDF(label="Document")
                    with gr.Row():
                        edge_tol = gr.Number(50, label="Edge tol")
                        row_tol = gr.Number(50, label="Row tol")
                    pages = gr.Textbox('1', label="Pages", info="You can pass 'all', '3-end', etc.")
                    output_path = gr.Textbox("output.csv", label="Output Path")
                with gr.Column():
                    status_msg = gr.JSON(label="Status Message")
                    output_files = gr.Files(label="Output Files")
            with gr.Row():
                output_df = gr.Dataframe(label="Extracted Table")
            examples = gr.Examples([["data/demo.pdf"]], inputs=pdf_in)
            # Re-run extraction whenever the selected PDF changes.
            pdf_in.change(
                WebUI.process_pdf,
                inputs=[pdf_in, output_path, edge_tol, row_tol, pages],
                outputs=[output_df, output_files, status_msg],
            )
        app.launch()


def handle_signal(signum, frame):
    """Signal handler: report the interruption and exit with status 1."""
    console.print("\n[red]Process interrupted.[/red]")
    sys.exit(1)


def main(args):
    """Run the command-line conversion for the parsed arguments."""
    parser = PDFTableParser(
        args.input_files,
        args.output_files,
        args.delimiter,
        args.edge_tol,
        args.row_tol,
        args.pages,
    )
    parser.process_files()


if __name__ == "__main__":
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    parser = argparse.ArgumentParser(description="PDF Table Parser")
    # NOTE(review): argparse matches consecutive nargs='+' positionals
    # greedily, so with more than two file arguments input_files takes all but
    # the last one and the length check below fails. --webui also still
    # requires both positionals. Confirm the intended CLI before changing it.
    parser.add_argument("input_files", nargs='+', help="List of input PDF files")
    parser.add_argument("output_files", nargs='+', help="List of output CSV files")
    parser.add_argument("--delimiter", default=',', help="Output file delimiter (default: ,)")
    parser.add_argument("--edge_tol", type=int, default=50, help="Tolerance parameter used to specify the distance between text and table edges (default: 50)")
    parser.add_argument("--row_tol", type=int, default=50, help="Tolerance parameter used to specify the distance between table rows (default: 50)")
    parser.add_argument("--pages", type=str, default='all', help="Pages you can pass the number of pages to process. (default: all)")
    parser.add_argument("--webui", action='store_true', help="Launch the web UI")
    args = parser.parse_args()

    if len(args.input_files) != len(args.output_files):
        console.print("[red]The number of input files and output files must match.[/red]")
        sys.exit(1)

    if args.webui:
        webui = WebUI()
        webui.run()
    else:
        main(args)