optimised-ocr / multiple_request.py
AuditEdge's picture
final commit
cce4094
import requests
import concurrent.futures
import time
import os
from dotenv import load_dotenv
load_dotenv()
# Access variables
dummy_key = os.getenv("dummy_key")
HUGGINGFACE_AUTH_TOKEN = dummy_key
print("dummy_key",dummy_key)
# Define the API endpoint
#http://43.204.234.114:8000/api/aadhar_ocr
# API_URL = "http://127.0.0.1:8000/api/aadhar_ocr"
API_URL = "https://auditedge-optimised-ocr.hf.space/api/aadhar_ocr"
auth_token = HUGGINGFACE_AUTH_TOKEN # Replace with your actual token
# Request Headers
headers = {
"Authorization": f"Bearer {auth_token}"}
response = requests.get(API_URL,headers=headers)
print("this is response\n\n",response.text)
# Define the file paths
FILE_PATHS = {
"aadhar_file": "uploads/aadhar/test_one.jpg",
# "pan_file": "test_images_pan/6ea33087.jpeg",
# "cheque_file": "test_images_cheque/0f81678a.jpeg",
# "gst_file": "test_images_gst/0a52fbcb_page3_image_0.jpg",
}
files = {key: open(path, "rb") for key, path in FILE_PATHS.items()}
response = requests.post(API_URL,headers=headers, files=files)
print(response.text)
# import sys
# sys.exit()
# Function to send a single POST request
def send_request():
try:
start_time = time.time()
# Open files dynamically for each request
files = {key: open(path, "rb") for key, path in FILE_PATHS.items()}
response = requests.post(API_URL,headers=headers, files=files)
print("this is response\n\n",response)
end_time = time.time()
print(f"\nTime taken for one request: {end_time - start_time:.2f} seconds")
# Close the files after the request
for file in files.values():
file.close()
return response.status_code, response.text
except requests.exceptions.RequestException as e:
return "Error", str(e)
# Main function to send multiple concurrent requests
def test_api_concurrency(num_requests):
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
# Launch multiple requests concurrently
results = list(executor.map(lambda _: send_request(), range(num_requests)))
end_time = time.time()
# Print results
for idx, (status, text) in enumerate(results):
print(f"Request {idx + 1}: Status Code: {status}, Response: {text}")
print(f"\nTotal time taken: {end_time - start_time:.2f} seconds")
# Number of concurrent requests
NUM_REQUESTS = 8 # Adjust this number based on your testing needs
if __name__ == "__main__":
test_api_concurrency(NUM_REQUESTS)