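# NOTE(review): the text "Spaces: Paused / Paused" appeared here; it looks like
# page-header residue from a web file viewer rather than part of the script.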
import requests | |
import concurrent.futures | |
import time | |
import os | |
from dotenv import load_dotenv | |
# Load settings from a local .env file into the process environment.
load_dotenv()

# Access variables
dummy_key = os.getenv("dummy_key")
HUGGINGFACE_AUTH_TOKEN = dummy_key
# Report only whether the token was found: echoing the secret itself would
# leak it into terminal scrollback and logs.
print("dummy_key", "<set>" if dummy_key else "<missing>")

# Define the API endpoint.
# Alternative endpoints kept for reference:
#   http://43.204.234.114:8000/api/aadhar_ocr
#   http://127.0.0.1:8000/api/aadhar_ocr
API_URL = "https://auditedge-optimised-ocr.hf.space/api/aadhar_ocr"
auth_token = HUGGINGFACE_AUTH_TOKEN  # Replace with your actual token

# Request headers: the token is sent as a bearer credential.
headers = {"Authorization": f"Bearer {auth_token}"}

# Probe the endpoint with a plain GET and show what it answers.
response = requests.get(API_URL, headers=headers)
print("this is response\n\n", response.text)

# Multipart field name -> path of the image uploaded under that field.
FILE_PATHS = {
    "aadhar_file": "uploads/aadhar/test_one.jpg",
    # "pan_file": "test_images_pan/6ea33087.jpeg",
    # "cheque_file": "test_images_cheque/0f81678a.jpeg",
    # "gst_file": "test_images_gst/0a52fbcb_page3_image_0.jpg",
}

# One-off upload before the concurrency run. The handles are opened one by one
# inside the try so that every handle that was opened is closed again, even if
# a later open() or the POST itself raises.
files = {}
try:
    for _key, _path in FILE_PATHS.items():
        files[_key] = open(_path, "rb")
    response = requests.post(API_URL, headers=headers, files=files)
finally:
    for _handle in files.values():
        _handle.close()
print(response.text)

# To stop after this single request, uncomment the two lines below.
# import sys
# sys.exit()
# Function to send a single POST request | |
def send_request():
    """Upload every file in FILE_PATHS to API_URL in one multipart POST.

    Prints the response object and the elapsed time for the request.

    Returns:
        A ``(status_code, body_text)`` tuple when the HTTP exchange completes,
        or ``("Error", message)`` when ``requests`` raises a
        ``RequestException``.

    Raises:
        OSError: if one of the files in FILE_PATHS cannot be opened; this is
            not caught here.
    """
    files = {}
    try:
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments.
        start_time = time.perf_counter()
        # Open fresh handles for each request: a handle that has already been
        # read to the end by a previous upload would send an empty body.
        for key, path in FILE_PATHS.items():
            files[key] = open(path, "rb")
        response = requests.post(API_URL, headers=headers, files=files)
        print("this is response\n\n", response)
        end_time = time.perf_counter()
        print(f"\nTime taken for one request: {end_time - start_time:.2f} seconds")
        return response.status_code, response.text
    except requests.exceptions.RequestException as e:
        return "Error", str(e)
    finally:
        # Close the files on every path, including when the request fails.
        for file in files.values():
            file.close()
# Main function to send multiple concurrent requests | |
def test_api_concurrency(num_requests):
    """Fire ``num_requests`` calls to ``send_request`` concurrently and report.

    Prints one line per request (status and response text, in submission
    order) followed by the total elapsed time.

    Args:
        num_requests: How many requests to send at the same time.
    """
    start_time = time.perf_counter()
    # Size the pool to the request count: the default worker count depends on
    # the machine's CPU count and may be smaller than num_requests, in which
    # case some requests would queue instead of running concurrently.
    # max_workers must be at least 1, hence the floor.
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=max(1, num_requests)
    ) as executor:
        # Launch multiple requests concurrently.
        futures = [executor.submit(send_request) for _ in range(num_requests)]
        # Collect in submission order so the printed indices match launch order.
        results = [future.result() for future in futures]
    end_time = time.perf_counter()

    # Print results
    for idx, (status, text) in enumerate(results):
        print(f"Request {idx + 1}: Status Code: {status}, Response: {text}")
    print(f"\nTotal time taken: {end_time - start_time:.2f} seconds")
# How many requests to fire at once; adjust to the load you want to simulate.
NUM_REQUESTS = 8

# Start the concurrency run only when this file is executed as a script.
if __name__ == "__main__":
    test_api_concurrency(NUM_REQUESTS)