# File size: 7,394 Bytes
# 6df3c38
import os
import pandas as pd
import cv2
import numpy as np
import json
import requests
import traceback
import tempfile
from PIL import Image
def preprocess_image(image_path, max_file_size_mb=1, target_file_size_mb=0.5):
    """Binarise an image for OCR and shrink it until its JPEG fits a size cap.

    The image is read with OpenCV, passed through ``enhance_txt`` and encoded
    as a JPEG in a scratch file. While that file is larger than
    ``max_file_size_mb`` the image is downscaled (aiming at
    ``target_file_size_mb``) and re-encoded.

    Args:
        image_path: Path of the image to load with ``cv2.imread``.
        max_file_size_mb: Largest acceptable JPEG size, in megabytes.
        target_file_size_mb: JPEG size aimed for whenever a resize is needed.

    Returns:
        The final JPEG decoded again with ``cv2.imread``, or ``None`` when any
        step raises (the error is printed, not propagated).
    """
    temp_file_path = None
    try:
        # Read the image and enhance the text
        image = cv2.imread(image_path)
        enhanced = enhance_txt(image)

        # mkstemp hands back a path this function owns; one scratch file is
        # reused for every re-encode and removed in the ``finally`` below.
        fd, temp_file_path = tempfile.mkstemp(suffix='.jpg')
        os.close(fd)
        cv2.imwrite(temp_file_path, enhanced)

        # Check file size of the temporary file (converted to megabytes)
        file_size_mb = os.path.getsize(temp_file_path) / (1024 * 1024)

        while file_size_mb > max_file_size_mb:
            print(f"File size ({file_size_mb} MB) exceeds the maximum allowed size ({max_file_size_mb} MB). Resizing the image.")
            # File size grows roughly with pixel count, hence the square root.
            ratio = np.sqrt(target_file_size_mb / file_size_mb)

            # Scale from the CURRENT dimensions: the ratio is derived from the
            # current file size, so applying it to the original dimensions
            # could enlarge an already-shrunk image on later passes.
            current_height, current_width = enhanced.shape[:2]
            new_width = max(1, int(current_width * ratio))
            new_height = max(1, int(current_height * ratio))
            if new_width >= current_width and new_height >= current_height:
                # Nothing left to shrink; stop instead of looping forever.
                break

            # Resize the image and re-encode it
            enhanced = cv2.resize(enhanced, (new_width, new_height))
            cv2.imwrite(temp_file_path, enhanced)

            # Update file size
            file_size_mb = os.path.getsize(temp_file_path) / (1024 * 1024)
            print(f"New file size: ({file_size_mb} MB)")

        # Return the final image as decoded from the JPEG on disk
        return cv2.imread(temp_file_path)
    except Exception as e:
        print(f"An error occurred in preprocess_image: {str(e)}")
        return None
    finally:
        if temp_file_path is not None:
            try:
                os.remove(temp_file_path)
            except OSError:
                # Best-effort cleanup; a leftover scratch file is harmless.
                pass
def enhance_txt(img, intensity_increase=20, bilateral_filter_diameter=9, bilateral_filter_sigma_color=75, bilateral_filter_sigma_space=75):
    """Binarise a BGR image with a global threshold derived from its brightness.

    The threshold is 88% of the mean pixel value inside the central region of
    the image (a 5% margin is trimmed from every side). The image is converted
    to grayscale, passed through a 1x1 Gaussian blur and thresholded with
    ``cv2.THRESH_BINARY`` and a maximum value of 255.

    Args:
        img: Image array accepted by ``cv2.cvtColor(..., cv2.COLOR_BGR2GRAY)``.
        intensity_increase: Accepted for backward compatibility; unused.
        bilateral_filter_diameter: Accepted for backward compatibility; unused.
        bilateral_filter_sigma_color: Accepted for backward compatibility; unused.
        bilateral_filter_sigma_space: Accepted for backward compatibility; unused.

    Returns:
        The single-channel binary image produced by ``cv2.threshold``.

    Note:
        Earlier versions also computed a Canny edge map, a brightness shift
        and a bilateral filter, but none of those results reached the return
        value (and the brightness shift wrapped around on uint8 input). That
        dead work has been removed; the output is unchanged.
    """
    height, width = img.shape[:2]

    # Central region of interest: drop 5% from each edge.
    left, right = int(width * 0.05), int(width * 0.95)
    top, bottom = int(height * 0.05), int(height * 0.95)
    roi = img[top:bottom, left:right]
    if roi.size == 0:
        # Very small images leave an empty slice, whose mean would be NaN;
        # fall back to the whole image.
        roi = img
    threshold = np.mean(roi) * 0.88  # % of average brightness

    # Convert image to grayscale
    grayscale_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Apply Gaussian blur (1x1 kernel, kept so the output stays identical)
    blurred = cv2.GaussianBlur(grayscale_img, (1, 1), 0)

    _, binary = cv2.threshold(blurred, threshold, 255, cv2.THRESH_BINARY)
    return binary
def run_tesseract_on_preprocessed_image(preprocessed_image, image_path, timeout=60):
    """Send a preprocessed image to the OCR.space API and save the JSON reply.

    Despite the name, no local Tesseract binary is invoked: the image is
    written to ``static/temp/<name>_preprocessed.jpg``, posted to
    ``https://api.ocr.space/parse/image`` and the parsed JSON response is
    stored as ``static/temp/<name>_ocr.json``.

    Args:
        preprocessed_image: Image array to write with ``cv2.imwrite``.
        image_path: Path of the source image; only its base name (up to the
            first dot) is used to name the output files.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        Path of the saved JSON file, or ``None`` if the API answers with a
        non-200 status or any step raises (the error is printed).
    """
    try:
        # Keep everything before the first dot. Unlike slicing with
        # ``find('.')``, partition() leaves a dot-less name intact instead of
        # chopping off its last character.
        image_name = os.path.basename(image_path).partition('.')[0]

        # Create the "temp" folder if it doesn't exist (race-free)
        temp_folder = "static/temp"
        os.makedirs(temp_folder, exist_ok=True)

        # Define the OCR API endpoint
        url = "https://api.ocr.space/parse/image"

        # Define the API key and the language
        # NOTE(review): a secret committed to source control. The environment
        # variable takes precedence so the key can be rotated without a code
        # change; the literal remains only as a backward-compatible fallback.
        api_key = os.environ.get("OCR_SPACE_API_KEY", "K88232854988957")
        language = "eng"

        preprocessed_path = os.path.join(temp_folder, f"{image_name}_preprocessed.jpg")
        ocr_json_path = os.path.join(temp_folder, f"{image_name}_ocr.json")

        # Save the preprocessed image
        cv2.imwrite(preprocessed_path, preprocessed_image)

        # Define the payload for the API request
        payload = {
            "apikey": api_key,
            "language": language,
            "isOverlayRequired": True,
            "OCREngine": 2
        }

        # Open the preprocessed image file as binary and send the POST request
        with open(preprocessed_path, "rb") as image_file:
            # Without a timeout a stalled connection would block forever.
            response = requests.post(
                url,
                data=payload,
                files={"file": image_file},
                timeout=timeout,
            )

        # Check the status code of the response
        if response.status_code != 200:
            # Print the error message
            print("Error: " + response.text)
            return None

        # Parse the JSON response and save the OCR result as JSON
        result = response.json()
        with open(ocr_json_path, 'w') as json_file:
            json.dump(result, json_file)
        print("---JSON file saved")
        return ocr_json_path
    except Exception as e:
        print(f"An error occurred during OCR request: {str(e)}")
        return None
def clean_tesseract_output(json_output_path):
    """Flatten a saved OCR JSON response into a list of words with boxes.

    The file is expected to hold ``ParsedResults[0].TextOverlay.Lines``, each
    line carrying a ``Words`` list whose items have ``WordText``, ``Left``,
    ``Top``, ``Width`` and ``Height``.

    Args:
        json_output_path: Path of the JSON file to read.

    Returns:
        A list of ``{'word_text': ..., 'word_box': [left, top, right, bottom]}``
        dicts in reading order, or ``None`` if the file is missing, is not
        valid JSON, or lacks an expected key/index (the error is printed).
    """
    def _to_entry(info):
        # Corner-style box: (left, top) plus the far corner from width/height.
        left, top = info['Left'], info['Top']
        box = [left, top, left + info['Width'], top + info['Height']]
        return {'word_text': info['WordText'], 'word_box': box}

    try:
        with open(json_output_path, 'r') as handle:
            ocr_result = json.load(handle)
        overlay_lines = ocr_result['ParsedResults'][0]['TextOverlay']['Lines']
        return [
            _to_entry(info)
            for text_line in overlay_lines
            for info in text_line['Words']
        ]
    except (KeyError, IndexError, FileNotFoundError, json.JSONDecodeError) as e:
        print(f"Error cleaning Tesseract output: {str(e)}")
        return None
def prepare_batch_for_inference(image_paths):
    """Preprocess images, OCR them and assemble a batch for inference.

    Each path goes through ``preprocess_image``, then
    ``run_tesseract_on_preprocessed_image``; every saved OCR result is then
    parsed with ``clean_tesseract_output``. Progress is printed at each step.

    Args:
        image_paths: Sized iterable of image file paths.

    Returns:
        A dict with three parallel lists:
        ``"image_path"`` (paths that produced a usable OCR result),
        ``"bboxes"`` (one list of ``word_box`` values per image) and
        ``"words"`` (one list of ``word_text`` values per image).
        Images whose preprocessing, OCR request or OCR parsing failed are
        left out of all three lists so that the entries stay aligned; when
        every image succeeds ``"image_path"`` holds the same paths as the
        input.
    """
    print(f"Number of images to process: {len(image_paths)}")  # Print the total number of images to be processed
    print("1. Preparing for Inference")
    ocr_results = []  # (image_path, ocr_json_path) for every successful OCR call

    print("2. Starting Preprocessing")
    for image_path in image_paths:
        print(f"Processing the image: {image_path}")  # Print the image being processed
        print("3. Preprocessing the Receipt")
        preprocessed_image = preprocess_image(image_path)
        if preprocessed_image is None:
            continue
        print("4. Preprocessing done. Running OCR")
        json_output_path = run_tesseract_on_preprocessed_image(preprocessed_image, image_path)
        print("5. OCR Complete")
        if json_output_path:
            ocr_results.append((image_path, json_output_path))
    print("6. Preprocessing and OCR Done")

    # clean_outputs is a list of lists, one per successfully parsed image
    processed_paths = []
    clean_outputs = []
    for image_path, json_output_path in ocr_results:
        clean_output = clean_tesseract_output(json_output_path)
        if clean_output is None:
            # Parsing failed (already reported by clean_tesseract_output);
            # iterating over None below would raise TypeError.
            continue
        processed_paths.append(image_path)
        clean_outputs.append(clean_output)
    print("7. Cleaned OCR output")

    word_lists = [[word['word_text'] for word in clean_output] for clean_output in clean_outputs]
    print("8. Word List Created")
    boxes_lists = [[word['word_box'] for word in clean_output] for clean_output in clean_outputs]
    print("9. Box List Created")

    inference_batch = {
        "image_path": processed_paths,
        "bboxes": boxes_lists,
        "words": word_lists
    }
    print("10. Prepared for Inference Batch")
    return inference_batch