|
import json |
|
import pandas as pd |
|
import asyncio |
|
import base64 |
|
from PIL import Image, ImageDraw |
|
from aiohttp import ClientSession |
|
from io import BytesIO |
|
from data.model import OCR_model |
|
from data.data import name_distribute |
|
|
|
async def getImage(img_url): |
|
async with ClientSession() as session: |
|
try: |
|
async with session.get(img_url) as response: |
|
img_data = await response.read() |
|
return BytesIO(img_data) |
|
except Exception as e: |
|
print({"Error in getImage":str(e)}) |
|
|
|
|
|
async def detection(model,img_content): |
|
try: |
|
img = Image.open(img_content) |
|
|
|
result = model(img,device=0,conf=0.6) |
|
|
|
detection = {} |
|
data = json.loads(result[0].tojson()) |
|
rect_data = [] |
|
for items in data: |
|
rect_data.append(items["box"]) |
|
print(items["box"]) |
|
img = await create_rectangle(Image.open(img_content),rect_data) |
|
if len(data) == 0: |
|
res = {"AI": "No Detection"} |
|
detection.update(res) |
|
else: |
|
df = pd.DataFrame(data) |
|
name_counts = df['name'].value_counts().sort_index() |
|
|
|
for name, count in name_counts.items(): |
|
res = {name: count} |
|
detection.update(res) |
|
return detection, img |
|
except Exception as e: |
|
print({"Error in detection":str(e)}) |
|
|
|
|
|
async def share_iamge(img): |
|
try: |
|
|
|
buffer = BytesIO() |
|
img.save(buffer, format="JPEG") |
|
base64_image = base64.b64encode(buffer.getvalue()).decode("utf-8") |
|
data = {"Arabic Detected": base64_image} |
|
return data |
|
except Exception as e: |
|
print({"Error in share_iamge":str(e)}) |
|
|
|
|
|
|
|
async def create_rectangle(image_data,cords): |
|
try: |
|
drawing = ImageDraw.Draw(image_data) |
|
|
|
for item in cords: |
|
x1 = item["x1"] |
|
x2 = item["x2"] |
|
y1 = item["y1"] |
|
y2 = item["y2"] |
|
drawing.rectangle([x1,y1,x2,y2],outline="red",width=1) |
|
return image_data |
|
except Exception as e: |
|
print({"Error in create_rectangle":str(e)}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
async def format_result(res,conv): |
|
try: |
|
result = {} |
|
for key,value in conv.items(): |
|
if key in res: |
|
data = {value:res[key]} |
|
result.update(data) |
|
return result |
|
except Exception as e: |
|
print({"Error in format_result":str(e)}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
async def mainDet(url): |
|
try: |
|
image = await asyncio.create_task(getImage(url)) |
|
detect_data,img = await asyncio.create_task(detection(OCR_model, image)) |
|
tab_data = await asyncio.create_task(format_result(detect_data,name_distribute)) |
|
img_data = await asyncio.create_task(share_iamge(img)) |
|
result = { |
|
"tabularData":{} if len(tab_data)==0 else tab_data, |
|
"imageData":{} |
|
} |
|
return json.dumps(result) |
|
|
|
except Exception as e: |
|
print({"Error in mainDet":str(e)}) |
|
|
|
|