|
import os |
|
import pandas as pd |
|
import cv2 |
|
import numpy as np |
|
import json |
|
import requests |
|
import traceback |
|
import tempfile |
|
|
|
# Module-level debug flag.
# NOTE(review): Flask itself reads FLASK_DEBUG from the process environment,
# not from a Python global, so this assignment alone does not switch Flask's
# debug mode on -- confirm whether any other module imports this name.
FLASK_DEBUG = 1
|
from PIL import Image |
|
|
|
def _encode_jpeg(picture):
    """Return *picture* encoded as an in-memory JPEG buffer, or raise ValueError."""
    ok, buffer = cv2.imencode('.jpg', picture)
    if not ok:
        raise ValueError("JPEG encoding failed")
    return buffer


def preprocess_image(image_path, max_file_size_mb=1, target_file_size_mb=0.5):
    """Load an image, enhance it, and shrink it until its JPEG fits a size cap.

    The image is read with OpenCV, passed through ``enhance_txt`` and encoded
    as JPEG. While the encoded size exceeds ``max_file_size_mb`` the enhanced
    image is scaled down (aiming for ``target_file_size_mb``) and re-encoded.
    Encoding happens in memory, so no temporary files are left behind.

    Args:
        image_path: Path of the image file to load.
        max_file_size_mb: Largest accepted JPEG size, in units of 1024*1024
            bytes.
        target_file_size_mb: Size each resize step aims for, same units.

    Returns:
        The image decoded back from the final JPEG encoding, or ``None`` if
        loading, enhancing or encoding failed (the error is printed).
    """
    bytes_per_mb = 1024 * 1024

    try:
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread reports a missing/unreadable file by returning None.
            raise ValueError(f"could not read image: {image_path}")

        enhanced = enhance_txt(image)
        encoded = _encode_jpeg(enhanced)
        file_size_mb = encoded.nbytes / bytes_per_mb

        # Aiming above the cap would give a scale factor >= 1 and the loop
        # could never terminate, so the target is capped at the maximum.
        target_mb = min(target_file_size_mb, max_file_size_mb)

        while file_size_mb > max_file_size_mb:
            print(f"File size ({file_size_mb} MB) exceeds the maximum allowed size ({max_file_size_mb} MB). Resizing the image.")

            # JPEG size scales roughly with pixel count, hence the square
            # root when converting a size ratio into a per-side factor.
            ratio = np.sqrt(target_mb / file_size_mb)

            # Scale the *current* enhanced image: it may already have been
            # cropped or resized, so the source image's shape is not valid.
            height, width = enhanced.shape[:2]
            new_width = max(1, int(width * ratio))
            new_height = max(1, int(height * ratio))
            if (new_width, new_height) == (width, height):
                # Cannot shrink any further; give up instead of spinning.
                break

            enhanced = cv2.resize(enhanced, (new_width, new_height))
            encoded = _encode_jpeg(enhanced)

            file_size_mb = encoded.nbytes / bytes_per_mb
            print(f"New file size: ({file_size_mb} MB)")

        # Decode the JPEG so the caller gets the pixels as they are after
        # compression, matching what a written-and-reloaded file would hold.
        return cv2.imdecode(encoded, cv2.IMREAD_COLOR)

    except Exception as e:
        print(f"An error occurred in preprocess_image: {str(e)}")
        return None
|
|
|
|
|
def enhance_txt(
    img,
    intensity_increase=20,
    bilateral_filter_diameter=9,
    bilateral_filter_sigma_color=75,
    bilateral_filter_sigma_space=75,
):
    """Brighten and smooth an image, then crop it to a detected quadrilateral.

    Edges are detected on the grayscale version of the input. Among the five
    largest contours, the first one that approximates to four corners is
    taken as the region of interest; if none qualifies (or its area is below
    500 pixels) the whole frame is used. The returned image is the
    brightened, bilateral-filtered input cropped to that region's bounding
    rectangle.

    Args:
        img: Image array in OpenCV BGR layout (converted with
            ``cv2.COLOR_BGR2GRAY``). Presumably 8-bit, as produced by
            ``cv2.imread`` -- TODO confirm for other callers.
        intensity_increase: Value added to every channel of every pixel.
        bilateral_filter_diameter: ``d`` argument of ``cv2.bilateralFilter``.
        bilateral_filter_sigma_color: ``sigmaColor`` argument of the filter.
        bilateral_filter_sigma_space: ``sigmaSpace`` argument of the filter.

    Returns:
        The enhanced image, cropped to the detected region (or uncropped when
        the bounding rectangle does not fit inside the image).
    """
    height, width = img.shape[:2]

    grayscale_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # NOTE(review): a 1x1 Gaussian kernel should leave the image essentially
    # unchanged; kept so the edge detector sees the same input as before.
    blurred = cv2.GaussianBlur(grayscale_img, (1, 1), 0)

    # Canny marks edges with 255; the map is inverted before contour tracing.
    edged = 255 - cv2.Canny(blurred, 100, 150, apertureSize=7)

    # Widen integer images before adding: on a uint8 array the addition
    # itself wraps around (e.g. 250 + 20 -> 14), which np.clip cannot undo.
    widened = img.astype(np.int32) if np.issubdtype(img.dtype, np.integer) else img
    brightened = np.clip(widened + intensity_increase, 0, 255).astype(np.uint8)

    filtered = cv2.bilateralFilter(
        brightened,
        bilateral_filter_diameter,
        bilateral_filter_sigma_color,
        bilateral_filter_sigma_space,
    )

    contours, _ = cv2.findContours(edged.copy(), cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
    largest_contours = sorted(contours, key=cv2.contourArea, reverse=True)[:5]

    # Pick the first large contour whose polygonal approximation has four
    # corners.
    screen_contour = None
    for contour in largest_contours:
        perimeter = cv2.arcLength(contour, True)
        approx = cv2.approxPolyDP(contour, 0.02 * perimeter, True)
        if len(approx) == 4:
            screen_contour = approx
            break

    # Fall back to the full frame when nothing usable was found.
    if screen_contour is None or cv2.contourArea(screen_contour) < 500:
        screen_contour = np.array(
            [[[0, 0]], [[width - 1, 0]], [[width - 1, height - 1]], [[0, height - 1]]],
            dtype=np.int32,
        )

    x, y, box_width, box_height = cv2.boundingRect(screen_contour)

    fits_inside = (
        x >= 0
        and y >= 0
        and x + box_width <= filtered.shape[1]
        and y + box_height <= filtered.shape[0]
    )
    if not fits_inside:
        print("Bounding rectangle is out of image boundaries")
        return filtered

    return filtered[y:y + box_height, x:x + box_width]
|
|
|
def run_tesseract_on_preprocessed_image(preprocessed_image, image_path, timeout=60):
    """Send a preprocessed image to the OCR.space API and save the JSON reply.

    Despite its name, this function does not run Tesseract locally: it writes
    the image to ``static/temp/<name>_preprocessed.jpg``, uploads that file
    to ``https://api.ocr.space/parse/image`` and stores the decoded JSON
    response in ``static/temp/<name>_ocr.json``. ``<name>`` is the base name
    of ``image_path`` up to its first dot.

    Args:
        preprocessed_image: Image array accepted by ``cv2.imwrite``.
        image_path: Path of the source image; only its base name is used.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        Path of the saved JSON file, or ``None`` when the upload fails, the
        service answers with a non-200 status, the reply is flagged as
        errored, or any exception occurs (the error is printed).
    """
    try:
        # partition() keeps the whole name when it contains no dot, whereas
        # slicing with find() == -1 would silently drop the last character.
        image_name = os.path.basename(image_path).partition('.')[0]

        temp_folder = "static/temp"
        os.makedirs(temp_folder, exist_ok=True)

        url = "https://api.ocr.space/parse/image"

        # SECURITY: prefer the OCR_SPACE_API_KEY environment variable. The
        # embedded fallback only keeps existing deployments working; it is
        # exposed in source control and should be rotated and removed.
        api_key = os.environ.get("OCR_SPACE_API_KEY", "K88232854988957")
        language = "eng"

        preprocessed_path = os.path.join(temp_folder, f"{image_name}_preprocessed.jpg")
        ocr_json_path = os.path.join(temp_folder, f"{image_name}_ocr.json")

        # cv2.imwrite reports failure through its return value; without this
        # check a stale file from an earlier run could be uploaded instead.
        if not cv2.imwrite(preprocessed_path, preprocessed_image):
            raise OSError(f"could not write {preprocessed_path}")

        payload = {
            "apikey": api_key,
            "language": language,
            "isOverlayRequired": True,
            "OCREngine": 2,
        }

        with open(preprocessed_path, "rb") as image_file:
            response = requests.post(
                url,
                data=payload,
                files={"file": image_file},
                timeout=timeout,
            )

        if response.status_code != 200:
            print("Error: " + response.text)
            return None

        result = response.json()

        # The service can answer 200 and still flag the request as failed.
        if isinstance(result, dict) and result.get("IsErroredOnProcessing"):
            print(f"Error: {result.get('ErrorMessage')}")
            return None

        with open(ocr_json_path, 'w') as json_file:
            json.dump(result, json_file)
        print("---JSON file saved")

        return ocr_json_path

    except Exception as e:
        print(f"An error occurred during OCR request: {str(e)}")
        return None
|
|
|
def clean_tesseract_output(json_output_path):
    """Extract words and their bounding boxes from a saved OCR JSON file.

    The file is expected to hold a JSON object where
    ``ParsedResults[0].TextOverlay.Lines`` is a list of lines, each with a
    ``Words`` list whose items carry ``WordText``, ``Left``, ``Top``,
    ``Width`` and ``Height``.

    Args:
        json_output_path: Path of the JSON file to read.

    Returns:
        A list of ``{'word_text': str, 'word_box': [left, top, right,
        bottom]}`` dicts in document order, or ``None`` when the file cannot
        be read or does not have the expected structure (the error is
        printed).
    """
    try:
        with open(json_output_path, 'r', encoding='utf-8') as json_file:
            data = json.load(json_file)

        lines = data['ParsedResults'][0]['TextOverlay']['Lines']

        words = []
        for line in lines:
            for word_info in line['Words']:
                left = word_info['Left']
                top = word_info['Top']
                words.append({
                    'word_text': word_info['WordText'],
                    # Convert (left, top, width, height) to corner form.
                    'word_box': [
                        left,
                        top,
                        left + word_info['Width'],
                        top + word_info['Height'],
                    ],
                })

        return words

    # TypeError covers null or non-dict values in the JSON (for example
    # "ParsedResults": null), which would otherwise escape to the caller.
    except (KeyError, IndexError, TypeError, OSError, json.JSONDecodeError) as e:
        print(f"Error cleaning Tesseract output: {str(e)}")
        return None
|
|
|
def prepare_batch_for_inference(image_paths):
    """Preprocess and OCR a list of images and assemble an inference batch.

    Each image goes through ``preprocess_image``,
    ``run_tesseract_on_preprocessed_image`` and ``clean_tesseract_output``.
    Images for which any of these steps fails are left out of the batch, so
    the three returned lists always have the same length and matching order.

    Args:
        image_paths: Paths of the images to process.

    Returns:
        A dict with keys ``"image_path"`` (paths that were processed
        successfully), ``"bboxes"`` (per image, a list of word boxes) and
        ``"words"`` (per image, a list of word strings).
    """
    print(f"Number of images to process: {len(image_paths)}")
    print("1. Preparing for Inference")
    ocr_outputs = []  # (image_path, json_output_path) for successful OCR runs

    print("2. Starting Preprocessing")

    for image_path in image_paths:
        print(f"Processing the image: {image_path}")
        print("3. Preprocessing the Receipt")
        preprocessed_image = preprocess_image(image_path)
        if preprocessed_image is None:
            continue

        print("4. Preprocessing done. Running OCR")
        json_output_path = run_tesseract_on_preprocessed_image(preprocessed_image, image_path)
        print("5. OCR Complete")
        if json_output_path:
            ocr_outputs.append((image_path, json_output_path))

    print("6. Preprocessing and OCR Done")

    # Keep each path together with its parsed words so that a failure in any
    # step cannot shift the lists out of alignment.
    processed_paths = []
    clean_outputs = []
    for image_path, json_output_path in ocr_outputs:
        clean_output = clean_tesseract_output(json_output_path)
        if clean_output is None:
            print(f"Skipping {image_path}: OCR output could not be parsed")
            continue
        processed_paths.append(image_path)
        clean_outputs.append(clean_output)
    print("7. Cleaned OCR output")

    word_lists = [[word['word_text'] for word in clean_output] for clean_output in clean_outputs]
    print("8. Word List Created")

    boxes_lists = [[word['word_box'] for word in clean_output] for clean_output in clean_outputs]
    print("9. Box List Created")

    inference_batch = {
        "image_path": processed_paths,
        "bboxes": boxes_lists,
        "words": word_lists,
    }

    print("10. Prepared for Inference Batch")
    return inference_batch