"""Image task: per-camera YOLO smoke detection and its evaluation metrics."""

import os
from datetime import datetime

import numpy as np
import torch
from datasets import load_dataset
from dotenv import load_dotenv
from fastapi import APIRouter
from loguru import logger
from sklearn.metrics import accuracy_score, precision_score, recall_score
from torch.utils.data import DataLoader
from torchvision import transforms
from tqdm import tqdm
from ultralytics import RTDETR
from ultralytics import YOLO

from .utils.emissions import tracker, clean_emissions_data, get_space_info
from .utils.evaluation import ImageEvaluationRequest

load_dotenv()

router = APIRouter()

DESCRIPTION = "Image to detect smoke"
ROUTE = "/image"

# NOTE: not referenced by the code below; kept because other modules may
# import it from here.
device = torch.device("cuda")

# Folder scanned for per-camera model weights.
_MODELS_FOLDER = "models/cameras_dataset"

# Camera identifiers searched for (as substrings) in weight file names and in
# dataset image names. Order matters: the first match wins.
_CAMERA_NAMES = (
    "brison-200",
    "brison-110",
    "courmettes-212",
    "courmettes-160",
    "brison-290",
    "marguerite-29",
)

# Key of the fallback model used when no camera name matches an image.
_DEFAULT_MODEL_KEY = "default"

# Inference resolution passed to every model call.
_INFERENCE_IMAGE_SIZE = 1280


def load_camera_models():
    """Load one YOLO detection model per camera from the models folder.

    Each file in the folder is assigned to the first known camera name (or
    ``"default"``) that appears as a substring of its file name; files that
    match nothing are ignored.

    Returns:
        dict mapping camera name (or ``"default"``) to a loaded ``YOLO`` model.

    Raises:
        FileNotFoundError: if the models folder does not exist.
    """
    if not os.path.exists(_MODELS_FOLDER):
        raise FileNotFoundError(f"The folder '{_MODELS_FOLDER}' does not exist.")

    known_names = (*_CAMERA_NAMES, _DEFAULT_MODEL_KEY)
    models = {}
    # Sorted so that, when several files match the same camera, the one that
    # ends up in the mapping does not depend on directory listing order.
    for file_name in sorted(os.listdir(_MODELS_FOLDER)):
        camera = next((name for name in known_names if name in file_name), None)
        if camera is not None:
            full_path = os.path.join(_MODELS_FOLDER, file_name)
            models[camera] = YOLO(full_path, task="detect")

    if _DEFAULT_MODEL_KEY not in models:
        # evaluate_image falls back to this key for unmatched images.
        logger.warning(
            "No '{}' model found in '{}'; images from unknown cameras will fail.",
            _DEFAULT_MODEL_KEY,
            _MODELS_FOLDER,
        )
    return models


def parse_boxes(annotation_string):
    """Parse all boxes from a single YOLO annotation string.

    The string is a whitespace-separated sequence of numbers, five per box:
    ``class_id x_center y_center width height``. The class id is dropped and
    a trailing incomplete group (fewer than five values) is ignored.

    Returns:
        list of ``[x_center, y_center, width, height]`` float lists.
    """
    values = [float(token) for token in annotation_string.split()]
    # Stop before any group that would not contain five full values.
    return [values[start + 1:start + 5] for start in range(0, len(values) - 4, 5)]


def _yolo_to_corners(box):
    """Convert (x_center, y_center, width, height) to [x1, y1, x2, y2]."""
    x_center, y_center, width, height = box
    half_width = width / 2
    half_height = height / 2
    return np.array([
        x_center - half_width,
        y_center - half_height,
        x_center + half_width,
        y_center + half_height,
    ])


def compute_iou(box1, box2):
    """Compute Intersection over Union (IoU) between two YOLO format boxes.

    Both boxes are ``(x_center, y_center, width, height)``. A small epsilon
    is added to the union so that two zero-area boxes yield 0 instead of a
    division by zero.
    """
    corners1 = _yolo_to_corners(box1)
    corners2 = _yolo_to_corners(box2)

    # Intersection rectangle; clamped to zero when the boxes do not overlap.
    x1 = max(corners1[0], corners2[0])
    y1 = max(corners1[1], corners2[1])
    x2 = min(corners1[2], corners2[2])
    y2 = min(corners1[3], corners2[3])
    intersection = max(0, x2 - x1) * max(0, y2 - y1)

    box1_area = (corners1[2] - corners1[0]) * (corners1[3] - corners1[1])
    box2_area = (corners2[2] - corners2[0]) * (corners2[3] - corners2[1])
    union = box1_area + box2_area - intersection

    return intersection / (union + 1e-6)


def compute_max_iou(true_boxes, pred_box):
    """Compute maximum IoU between a predicted box and all true boxes.

    Returns 0 when ``true_boxes`` is empty.
    """
    return max((compute_iou(true_box, pred_box) for true_box in true_boxes), default=0)


def _normalize_box(box, image_size):
    """Divide a pixel-space (x_center, y_center, width, height) box by the image size.

    ``image_size`` is ``(width, height)``.
    """
    x_center, y_center, width, height = box
    image_width, image_height = image_size
    return [
        x_center / image_width,
        y_center / image_height,
        width / image_width,
        height / image_height,
    ]


def _mean_iou(true_boxes_list, pred_boxes):
    """Average, over images, the max IoU of the image's predicted box.

    ``true_boxes_list`` and ``pred_boxes`` are aligned per image; a ``None``
    prediction (no detection on an image that has smoke) scores 0.
    """
    ious = [
        compute_max_iou(true_boxes, pred_box) if pred_box is not None else 0.0
        for true_boxes, pred_box in zip(true_boxes_list, pred_boxes)
    ]
    return float(np.mean(ious)) if ious else 0.0


# NOTE: the route registration below is currently disabled (commented out).
# @router.post(ROUTE, tags=["Image Task"],
#              description=DESCRIPTION)
async def evaluate_image(request: ImageEvaluationRequest = ImageEvaluationRequest()):
    # def evaluate_image(model_path: str, request: ImageEvaluationRequest = ImageEvaluationRequest()):
    """
    Evaluate image classification and object detection for forest fire smoke.

    Current Model: per-camera YOLO detectors
    - The camera is identified by looking for a known camera name in the
      example's ``image_name``; the matching model is used, otherwise the
      ``"default"`` model.
    - An image is classified as containing smoke when the model returns at
      least one box.

    Metrics:
    - Classification accuracy / precision / recall: whether an image contains
      smoke or not.
    - Object detection: mean IoU over images that truly contain smoke. For
      each such image the highest-confidence predicted box is compared with
      all true boxes and the best IoU is kept; an image with no predicted
      box scores 0.

    Returns:
        dict with submission metadata, the metrics above, and the emissions
        data reported by the tracker for the inference task.
    """
    # Get space info
    username, space_url = get_space_info()

    # Load the dataset; only the validation split is evaluated.
    dataset = load_dataset(request.dataset_name, token=os.getenv("HF_TOKEN"))
    test_dataset = dataset["val"]

    models = load_camera_models()

    # Start tracking emissions
    tracker.start()
    tracker.start_task("inference")

    #--------------------------------------------------------------------------------------------
    # YOUR MODEL INFERENCE CODE HERE
    #--------------------------------------------------------------------------------------------

    predictions = []
    true_labels = []
    # The two lists below hold exactly one entry per image that truly contains
    # smoke, so that they stay aligned when zipped for the IoU computation.
    pred_boxes = []  # normalized best predicted box, or None if no detection
    true_boxes_list = []  # list of lists, each inner list holds one image's true boxes

    for example in test_dataset:
        # Parse true annotation (YOLO format: class_id x_center y_center width height)
        annotation = (example.get("annotations") or "").strip()
        has_smoke = len(annotation) > 0
        true_labels.append(int(has_smoke))

        image_path = example["image_name"]
        image = example["image"]

        # Pick the camera-specific model, falling back to the default one.
        camera = next((cam for cam in _CAMERA_NAMES if cam in image_path), None)
        model = models[camera] if camera else models[_DEFAULT_MODEL_KEY]
        results = model(image, verbose=False, imgsz=_INFERENCE_IMAGE_SIZE)

        detections = results[0].boxes
        boxes = detections.xywh.tolist()

        pred_has_smoke = len(boxes) > 0
        predictions.append(int(pred_has_smoke))

        if has_smoke:
            true_boxes_list.append(parse_boxes(annotation))
            if boxes:
                # Score a single box per image: the most confident detection,
                # converted from pixels to the normalized YOLO format used by
                # the annotations.
                best_index = int(detections.conf.argmax())
                pred_boxes.append(_normalize_box(boxes[best_index], image.size))
            else:
                pred_boxes.append(None)

    #--------------------------------------------------------------------------------------------
    # YOUR MODEL INFERENCE STOPS HERE
    #--------------------------------------------------------------------------------------------

    # Stop tracking emissions
    emissions_data = tracker.stop_task()

    # Calculate classification metrics
    classification_accuracy = accuracy_score(true_labels, predictions)
    classification_precision = precision_score(true_labels, predictions)
    classification_recall = recall_score(true_labels, predictions)

    # Calculate mean IoU for object detection (only for images with smoke)
    mean_iou = _mean_iou(true_boxes_list, pred_boxes)

    # Prepare results dictionary
    results = {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "classification_accuracy": float(classification_accuracy),
        "classification_precision": float(classification_precision),
        "classification_recall": float(classification_recall),
        "mean_iou": mean_iou,
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed,
        },
    }

    return results