|
import os |
|
import io |
|
import fitz |
|
from PIL import Image, ImageOps, ImageChops |
|
from docx import Document |
|
from rembg import remove |
|
import gradio as gr |
|
from hezar.models import Model |
|
from ultralytics import YOLO |
|
import json |
|
import logging |
|
import shutil |
|
|
|
|
|
# Application-wide logging: timestamp, severity, then the message.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Create the working directories up front; "output_images" is where the
# pipeline writes the intermediate page images it later runs detection on.
os.makedirs("static", exist_ok=True)
os.makedirs("output_images", exist_ok=True)
|
|
|
def remove_readonly(func, path, excinfo):
    """Make *path* writable and retry the operation that failed on it.

    Used in this file as the ``onerror`` callback of ``shutil.rmtree``.

    Args:
        func: The callable that failed; it is invoked again as ``func(path)``.
        path: Filesystem path the failed operation was working on.
        excinfo: Exception information supplied by the caller; unused.
    """
    # ``stat`` is not among the module-level imports, so bring it into scope
    # here; without this the handler itself fails with NameError.
    import stat

    os.chmod(path, stat.S_IWRITE)
    func(path)
|
|
|
# Start-up cleanup: delete a 'runs' directory located next to this script, if
# one exists, so detection output from an earlier session is not left behind.
current_dir = os.path.dirname(os.path.abspath(__file__))
ultralytics_path = os.path.join(current_dir, 'runs')

if os.path.exists(ultralytics_path):
    # remove_readonly clears the read-only bit and retries entries that fail.
    shutil.rmtree(ultralytics_path, onerror=remove_readonly)
|
|
|
def trim_whitespace(image):
    """Crop *image* to the bounding box of its non-white content.

    Args:
        image: A PIL image.

    Returns:
        The result of cropping *image* to the bounding box computed on an
        inverted grayscale copy of it.
    """
    # After inversion pure white becomes 0, so the bounding box reported by
    # getbbox() encloses every pixel that was not pure white.
    content_box = ImageChops.invert(ImageOps.grayscale(image)).getbbox()
    return image.crop(content_box)
|
|
|
def convert_pdf_to_images(pdf_path, zoom=2):
    """Render every page of a PDF to a whitespace-trimmed PIL image.

    Args:
        pdf_path: Path of the PDF file to open.
        zoom: Scale factor applied to both axes when rasterising each page.

    Returns:
        A list with one trimmed RGB image per page, in page order.
    """
    # The scaling matrix is the same for every page, so build it once.
    matrix = fitz.Matrix(zoom, zoom)
    images = []

    # The context manager closes the document even if rendering a page fails;
    # previously the handle was never released.
    with fitz.open(pdf_path) as pdf_document:
        for page in pdf_document:
            pix = page.get_pixmap(matrix=matrix)
            image = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
            images.append(trim_whitespace(image))

    logging.info(f"Converted PDF {pdf_path} to images.")
    return images
|
|
|
def convert_docx_to_jpeg(docx_bytes):
    """Extract the images embedded in a DOCX document as JPEG-encoded images.

    Args:
        docx_bytes: The document, either as raw ``bytes``/``bytearray`` or as
            a filesystem path / binary file-like object. Raw bytes are wrapped
            in an in-memory stream; anything else is handed to ``Document``
            unchanged.

    Returns:
        A list of PIL images, one per relationship whose target reference
        contains "image", each re-encoded as an RGB JPEG held in memory.
    """
    # The visible caller passes a file path, and io.BytesIO() rejects str, so
    # only wrap values that really are raw bytes.
    if isinstance(docx_bytes, (bytes, bytearray)):
        source = io.BytesIO(docx_bytes)
    else:
        source = docx_bytes
    document = Document(source)

    images = []
    for rel in document.part.rels.values():
        if "image" not in rel.target_ref:
            continue
        embedded = Image.open(io.BytesIO(rel.target_part.blob))
        jpeg_buffer = io.BytesIO()
        # JPEG has no alpha or palette support, hence the RGB conversion.
        embedded.convert('RGB').save(jpeg_buffer, format="JPEG")
        jpeg_buffer.seek(0)
        images.append(Image.open(jpeg_buffer))

    logging.info("Converted DOCX to images.")
    return images
|
|
|
def remove_background_from_image(image):
    """Pass *image* through ``remove`` and return whatever it produces.

    Args:
        image: The image to strip the background from.

    Returns:
        The value returned by ``remove(image)``.
    """
    cutout = remove(image)
    logging.info("Removed background from image.")
    return cutout
|
|
|
def process_file(input_file):
    """Turn an uploaded file into background-free images saved as JPEGs.

    Images are opened directly, PDFs are rendered page by page, and Word
    documents contribute their embedded images. Every resulting image has its
    background removed and is written to ``output_images/image_<i>.jpg``.

    Args:
        input_file: Uploaded file object; its ``name`` attribute is used as
            the path on disk and to determine the file type.

    Returns:
        The list of background-removed images, or the string
        ``"File format not supported."`` for an unrecognised extension.
    """
    file_extension = os.path.splitext(input_file.name)[1].lower()

    if file_extension in ('.png', '.jpeg', '.jpg', '.bmp', '.gif'):
        images = [Image.open(input_file)]
    elif file_extension == '.pdf':
        images = convert_pdf_to_images(input_file.name)
    elif file_extension in ('.docx', '.doc'):
        # NOTE(review): legacy .doc files are routed through the DOCX reader;
        # confirm that the reader can actually open them.
        # convert_docx_to_jpeg takes the document's bytes, not its path.
        with open(input_file.name, 'rb') as docx_file:
            images = convert_docx_to_jpeg(docx_file.read())
    else:
        logging.error("File format not supported.")
        return "File format not supported."

    images = [remove_background_from_image(image) for image in images]

    input_folder = 'output_images'
    os.makedirs(input_folder, exist_ok=True)

    # Drop the images written for a previous upload; otherwise a shorter
    # document leaves stale image_<i>.jpg files that get detected again.
    for stale_name in os.listdir(input_folder):
        if stale_name.startswith('image_') and stale_name.endswith('.jpg'):
            os.remove(os.path.join(input_folder, stale_name))

    for i, img in enumerate(images):
        # JPEG cannot store an alpha channel.
        if img.mode == 'RGBA':
            img = img.convert('RGB')
        img.save(os.path.join(input_folder, f'image_{i}.jpg'))

    logging.info("Processed file and saved images.")
    return images
|
|
|
def run_detection_and_ocr():
    """Detect fields in the saved page images, read them, and dump JSON.

    Runs the field-detection YOLO model over ``output_images`` with crop
    saving enabled, reads every saved crop (text OCR for the textual labels,
    digit detection for the rest) and writes the collected results to
    ``output.json``.

    Returns:
        The path of the JSON file that was written (``'output.json'``).
    """
    ocr_model = Model.load('hezarai/crnn-fa-printed-96-long')
    yolo_model_check = YOLO("best_300_D_check.pt")
    yolo_model_numbers = YOLO("P_D_T.pt")

    input_folder = 'output_images'
    output_folder = 'runs/detect/predict'
    crop_folder = os.path.join(output_folder, 'crops')

    # Ultralytics numbers its output folders (predict, predict2, ...) when the
    # target already exists, while this function always reads 'predict'.
    # Removing the previous run keeps the crops read below in sync with the
    # prediction made here instead of with the first run of the process.
    if os.path.isdir(output_folder):
        shutil.rmtree(output_folder)

    yolo_model_check.predict(input_folder, save=True, conf=0.5, save_crop=True)
    logging.info("Ran YOLO detection for check model.")

    image_names = [
        name for name in os.listdir(input_folder)
        if name.endswith(('.JPEG', '.jpg'))
    ]

    results = []
    if image_names and os.path.exists(crop_folder):
        # NOTE(review): crops are not filtered by source image, so every
        # entry lists all crops. They are therefore read once and reused
        # rather than re-running OCR per image.
        crops = _read_all_crops(crop_folder, ocr_model, yolo_model_numbers)
        for filename in image_names:
            results.append({
                'image': filename,
                'crops': list(crops),
            })
    logging.info("Processed detection and OCR.")

    output_json_path = 'output.json'
    with open(output_json_path, 'w', encoding='utf-8') as f:
        json.dump(results, f, ensure_ascii=False, indent=4)
    logging.info("Saved results to JSON.")
    return output_json_path


def _read_all_crops(crop_folder, ocr_model, yolo_model_numbers):
    """Return one prediction record per crop file under *crop_folder*."""
    # Labels whose crops contain free text; every other label is read as digits.
    text_labels = ('mablagh_H', 'owner', 'vajh')

    crops = []
    for crop_label in os.listdir(crop_folder):
        crop_label_folder = os.path.join(crop_folder, crop_label)
        if not os.path.isdir(crop_label_folder):
            continue
        for crop_filename in os.listdir(crop_label_folder):
            crop_image_path = os.path.join(crop_label_folder, crop_filename)
            if crop_label in text_labels:
                text_prediction = predict_text(ocr_model, crop_image_path)
            else:
                text_prediction = process_numbers(yolo_model_numbers, crop_image_path)
            crops.append({
                'crop_image_path': crop_image_path,
                'text_prediction': text_prediction,
                'class_label': crop_label,
            })
    return crops
|
|
|
def predict_text(model, image_path):
    """Read the text in the image at *image_path* using *model*.

    Args:
        model: Object exposing ``predict(image)``.
        image_path: Path of the image file to read.

    Returns:
        The ``'text'`` values of the prediction joined by spaces when the
        model returns a list, ``str(output)`` for any other return type, or
        ``"N/A"`` when the image file does not exist.
    """
    try:
        # The image is resized to 320x320 before being handed to the model.
        resized = Image.open(image_path).resize((320, 320))
        output = model.predict(resized)
        if not isinstance(output, list):
            return str(output)
        pieces = [item['text'] for item in output]
        result = ' '.join(pieces)
        logging.info(f"Predicted text for {image_path}.")
        return result
    except FileNotFoundError:
        logging.error(f"File not found: {image_path}.")
        return "N/A"
|
|
|
# Maps detector class names to the character written to the output; the
# '4q' and '6q' classes collapse onto their digit and '-' is emitted as '/'.
_NUMBER_LABEL_MAP = {
    '-': '/',
    '0': '0',
    '1': '1',
    '2': '2',
    '3': '3',
    '4': '4',
    '4q': '4',
    '5': '5',
    '6': '6',
    '6q': '6',
    '7': '7',
    '8': '8',
    '9': '9',
}


def process_numbers(model, image_path, conf=0.5):
    """Read a left-to-right string of digits from an image with a detector.

    Args:
        model: Callable detector; ``model(image_path, conf=..., save_crop=False)``
            must return a sequence whose first item has a ``boxes`` iterable,
            and ``model.names`` must map class ids to class names.
        image_path: Path of the image to analyse.
        conf: Confidence threshold forwarded to the detector (default 0.5,
            the value that used to be hard-coded).

    Returns:
        The mapped characters of all detections, ordered by the left edge of
        their bounding boxes. Class names missing from the label map
        contribute nothing.
    """
    results = model(image_path, conf=conf, save_crop=False)

    detected_objects = []
    for box in results[0].boxes:
        class_id = int(box.cls[0].cpu().numpy())
        label = model.names[class_id]
        detected_objects.append({
            'bbox': box.xyxy[0].cpu().numpy().tolist(),
            'label': _NUMBER_LABEL_MAP.get(label, ''),
        })

    # bbox[0] is the first coordinate of the box (its left edge in xyxy form),
    # so sorting on it restores reading order.
    detected_objects.sort(key=lambda obj: obj['bbox'][0])
    logging.info(f"Processed numbers for {image_path}.")
    return ''.join(obj['label'] for obj in detected_objects)
|
|
|
def gradio_interface(input_file):
    """Gradio callback: run the full pipeline on an upload and return JSON.

    Args:
        input_file: The uploaded file object, or ``None`` when nothing was
            uploaded.

    Returns:
        The detection/OCR results loaded from the generated JSON file, or a
        ``{"error": message}`` dictionary when there is no usable input.
    """
    if input_file is None:
        logging.error("No file uploaded.")
        return {"error": "No file uploaded."}

    processed = process_file(input_file)
    if isinstance(processed, str):
        # process_file reports an unsupported format by returning a message.
        # Stop here instead of running detection on whatever images an
        # earlier upload left behind.
        return {"error": processed}

    json_output = run_detection_and_ocr()
    with open(json_output, 'r', encoding='utf-8') as f:
        data = json.load(f)
    logging.info("Generated JSON output for Gradio interface.")
    return data
|
|
|
# Web front-end: one file upload in, the pipeline's JSON result out.
iface = gr.Interface(
    title="Document to JSON Converter with Background Removal",
    fn=gradio_interface,
    inputs=gr.File(label="Upload Word, PDF, or Image"),
    outputs=gr.JSON(label="JSON Output"),
)
|
|
|
# Start the web UI only when this file is executed as a script.
if __name__ == "__main__":
    logging.info("Starting Gradio interface.")
    iface.launch()