from fastapi import APIRouter
from datetime import datetime
from datasets import load_dataset
import numpy as np
from sklearn.metrics import accuracy_score
import os
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import torch
from transformers import MobileViTImageProcessor, MobileViTForSemanticSegmentation
import cv2
from dotenv import load_dotenv

from .utils.evaluation import ImageEvaluationRequest
from .utils.emissions import tracker, clean_emissions_data, get_space_info

load_dotenv()
router = APIRouter()

DESCRIPTION = "Mobile-ViT Smoke Detection"
ROUTE = "/image"

device = "cpu"
model_path = "mobilevit_segmentation_full_data.pth"

feature_extractor = MobileViTImageProcessor.from_pretrained("apple/deeplabv3-mobilevit-xx-small")
model = MobileViTForSemanticSegmentation.from_pretrained("apple/deeplabv3-mobilevit-xx-small").to(device)
model.load_state_dict(torch.load(model_path, map_location=torch.device("cpu")))
model.eval()
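# Note: mobilevit_segmentation_full_data.pth is assumed to be a checkpoint
# fine-tuned from apple/deeplabv3-mobilevit-xx-small and shipped alongside this
# file; load_state_dict() raises a RuntimeError if the tensor shapes differ.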
class SmokeDataset(Dataset):
    def __init__(self, dataset, feature_extractor, target_size=(224, 224)):
        self.dataset = dataset
        self.feature_extractor = feature_extractor
        self.target_size = target_size

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        example = self.dataset[idx]
        image = example["image"]
        annotation = example.get("annotations", "").strip()

        # Ensure the image is resized to a fixed target size using PIL
        if isinstance(image, torch.Tensor):
            image = Image.fromarray(image.numpy())
        # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is its replacement
        resized_image = image.resize(self.target_size, Image.LANCZOS)

        # Process the image with the feature extractor
        features = self.feature_extractor(images=resized_image, return_tensors="pt").pixel_values
        return features.squeeze(0), annotation
def collate_fn(batch):
    images, annotations = zip(*batch)
    images = torch.stack(images)  # Ensure the batch has a uniform shape
    return images, annotations
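# Illustrative usage (a sketch; `test_dataset` stands for the Hugging Face test
# split loaded in evaluate_image() below):
#
#   smoke_dataset = SmokeDataset(test_dataset, feature_extractor)
#   loader = DataLoader(smoke_dataset, batch_size=8, collate_fn=collate_fn)
#   for pixel_values, annotations in loader:
#       ...  # pixel_values: (batch, 3, H, W) float tensor; annotations: tuple of str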
def preprocess(image):
    # Resize the input image to a fixed size (512, 512)
    image = image.resize((512, 512))
    # Swap channels from RGB to BGR. The divide/multiply by 255 below cancel
    # out, so the net effect of this function is resize + channel swap.
    image = np.array(image)[:, :, ::-1]
    image = np.array(image, dtype=np.float32) / 255.0
    # Return a PIL Image for feature-extractor compatibility
    return Image.fromarray((image * 255).astype(np.uint8))
def preprocess_batch(images):
    """
    Preprocess a batch of images for MobileViT inference.
    Resize to a fixed size (512, 512) and return as PIL Images.
    """
    preprocessed_images = []
    for image in images:
        resized_image = image.resize((512, 512))
        image_array = np.array(resized_image)[:, :, ::-1]  # Convert RGB to BGR
        image_float = np.array(image_array, dtype=np.float32) / 255.0
        processed_image = Image.fromarray((image_float * 255).astype(np.uint8))
        preprocessed_images.append(processed_image)
    return preprocessed_images
def get_bounding_boxes_from_mask(mask):
    """Extract bounding boxes (x1, y1, x2, y2, in pixels) from a binary mask."""
    pred_boxes = []
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    for contour in contours:
        # Filter by contour point count as a cheap proxy for small/noisy blobs
        if len(contour) > 5:
            x, y, w, h = cv2.boundingRect(contour)
            pred_boxes.append((x, y, x + w, y + h))
    return pred_boxes
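# Caveat worth noting: CHAIN_APPROX_SIMPLE compresses straight segments, so a
# perfectly rectangular blob reduces to 4 contour points and is dropped by the
# len(contour) > 5 filter; irregular smoke-like blobs typically pass. A sketch:
#
#   mask = np.zeros((512, 512), dtype=np.uint8)
#   cv2.circle(mask, (100, 100), 20, 1, -1)   # filled circular blob
#   get_bounding_boxes_from_mask(mask)        # -> roughly [(80, 80, 121, 121)]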
def parse_boxes(annotation_string):
    """Parse multiple boxes from a single annotation string.
    Each box has 5 values: class_id, x_center, y_center, width, height"""
    values = [float(x) for x in annotation_string.strip().split()]
    boxes = []
    # Each box has 5 values
    for i in range(0, len(values), 5):
        if i + 5 <= len(values):
            # Skip class_id (the first value) and take the next 4 values
            box = values[i + 1:i + 5]
            boxes.append(box)
    return boxes
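# Worked example (hypothetical annotation string with two boxes):
#
#   parse_boxes("0 0.5 0.5 0.2 0.1 0 0.25 0.25 0.1 0.1")
#   # -> [[0.5, 0.5, 0.2, 0.1], [0.25, 0.25, 0.1, 0.1]]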
def compute_iou(box1, box2):
    """Compute Intersection over Union (IoU) between two YOLO-format boxes."""
    # Convert YOLO format (x_center, y_center, width, height) to corners
    def yolo_to_corners(box):
        x_center, y_center, width, height = box
        x1 = x_center - width / 2
        y1 = y_center - height / 2
        x2 = x_center + width / 2
        y2 = y_center + height / 2
        return np.array([x1, y1, x2, y2])

    box1_corners = yolo_to_corners(box1)
    box2_corners = yolo_to_corners(box2)

    # Intersection rectangle
    x1 = max(box1_corners[0], box2_corners[0])
    y1 = max(box1_corners[1], box2_corners[1])
    x2 = min(box1_corners[2], box2_corners[2])
    y2 = min(box1_corners[3], box2_corners[3])
    intersection = max(0, x2 - x1) * max(0, y2 - y1)

    # Union = sum of areas minus intersection
    box1_area = (box1_corners[2] - box1_corners[0]) * (box1_corners[3] - box1_corners[1])
    box2_area = (box2_corners[2] - box2_corners[0]) * (box2_corners[3] - box2_corners[1])
    union = box1_area + box2_area - intersection

    # Small epsilon guards against division by zero for degenerate boxes
    return intersection / (union + 1e-6)
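# Worked example (hypothetical boxes): (0.5, 0.5, 0.4, 0.4) has corners
# (0.3, 0.3, 0.7, 0.7) and (0.7, 0.7, 0.4, 0.4) has corners (0.5, 0.5, 0.9, 0.9).
# Intersection = 0.2 * 0.2 = 0.04; union = 0.16 + 0.16 - 0.04 = 0.28;
# IoU = 0.04 / 0.28 ≈ 0.143.
#
#   compute_iou((0.5, 0.5, 0.4, 0.4), (0.7, 0.7, 0.4, 0.4))  # ≈ 0.143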
def compute_max_iou(true_boxes, pred_box):
    """Compute the maximum IoU between a predicted box and all true boxes."""
    max_iou = 0
    for true_box in true_boxes:
        iou = compute_iou(true_box, pred_box)
        max_iou = max(max_iou, iou)
    return max_iou
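# E.g., with the hypothetical boxes above plus a non-overlapping one:
#
#   compute_max_iou([(0.5, 0.5, 0.4, 0.4), (0.2, 0.2, 0.1, 0.1)], (0.7, 0.7, 0.4, 0.4))
#   # ≈ 0.143 (the best match among the true boxes)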
# Register the handler on the router so that POST /image reaches it
@router.post(ROUTE)
async def evaluate_image(request: ImageEvaluationRequest):
    """
    Evaluate image classification and object detection for forest fire smoke.

    Current model: fine-tuned MobileViT semantic segmentation
    ("apple/deeplabv3-mobilevit-xx-small"). An image is classified as
    containing smoke when the predicted mask yields at least one bounding box.

    Metrics:
    - Classification accuracy: whether an image contains smoke or not
    - Object detection accuracy: IoU (Intersection over Union) for smoke bounding boxes
    """
    # Get space info
    username, space_url = get_space_info()

    # Load the dataset and take the test split
    dataset = load_dataset(request.dataset_name, token=os.getenv("HF_TOKEN"))
    test_dataset = dataset["test"]

    # Start tracking emissions
    tracker.start()
    tracker.start_task("inference")
    #--------------------------------------------------------------------------------------------
    # MODEL INFERENCE CODE
    # MobileViT segmentation inference replaces the template's random baseline
    #--------------------------------------------------------------------------------------------

    smoke_dataset = SmokeDataset(test_dataset, feature_extractor)
    # The DataLoader must wrap SmokeDataset (not the raw HF split): collate_fn
    # expects the (pixel_values, annotation) tuples that SmokeDataset yields.
    dataloader = DataLoader(smoke_dataset, batch_size=8, collate_fn=collate_fn)

    predictions = []
    true_labels = []
    pred_boxes = []
    true_boxes_list = []

    for batch_images, batch_annotations in dataloader:
        batch_images = batch_images.to(device)

        with torch.no_grad():
            outputs = model(pixel_values=batch_images)
            logits = outputs.logits

        # Threshold the smoke-class channel to get binary masks
        probabilities = torch.sigmoid(logits)
        batch_predicted_masks = (probabilities[:, 1, :, :] > 0.30).cpu().numpy().astype(np.uint8)

        # Post-process predictions and collect metric inputs
        for mask, annotation in zip(batch_predicted_masks, batch_annotations):
            predicted_mask_resized = cv2.resize(mask, (512, 512), interpolation=cv2.INTER_NEAREST)
            predicted_boxes = get_bounding_boxes_from_mask(predicted_mask_resized)
            pred_boxes.append(predicted_boxes)

            # An image is predicted to contain smoke if any box was found
            predictions.append(1 if len(predicted_boxes) > 0 else 0)
            true_labels.append(1 if annotation else 0)

            # Parse the ground-truth boxes for this image
            true_boxes_list.append(parse_boxes(annotation) if annotation else [])
    # Legacy single-image inference path, kept for reference:
    # for example in test_dataset:
    #     # Extract image and annotations
    #     image = example["image"]
    #     original_shape = image.size
    #     annotation = example.get("annotations", "").strip()
    #     has_smoke = len(annotation) > 0
    #     true_labels.append(1 if has_smoke else 0)
    #     if has_smoke:
    #         image_true_boxes = parse_boxes(annotation)
    #         if image_true_boxes:
    #             true_boxes_list.append(image_true_boxes)
    #         else:
    #             true_boxes_list.append([])
    #     else:
    #         true_boxes_list.append([])
    #
    #     # Model inference: preprocess the image
    #     image = preprocess(image)
    #     # Ensure correct feature extraction
    #     image_input = feature_extractor(images=image, return_tensors="pt").pixel_values
    #     # Perform inference
    #     with torch.no_grad():
    #         outputs = model(pixel_values=image_input)
    #         logits = outputs.logits
    #     # Threshold and process the segmentation mask
    #     probabilities = torch.sigmoid(logits)
    #     predicted_mask = (probabilities[0, 1] > 0.30).cpu().numpy().astype(np.uint8)
    #     predicted_mask_resized = cv2.resize(predicted_mask, (512, 512), interpolation=cv2.INTER_NEAREST)
    #     # Extract bounding boxes
    #     predicted_boxes = get_bounding_boxes_from_mask(predicted_mask_resized)
    #     pred_boxes.append(predicted_boxes)
    #     # Smoke prediction based on bounding-box presence
    #     predictions.append(1 if len(predicted_boxes) > 0 else 0)
    #     print(f"Prediction : {1 if len(predicted_boxes) > 0 else 0}")

    # Filter to images where both true and predicted boxes exist:
    # filtered_true_boxes_list = []
    # filtered_pred_boxes = []
    # for true_boxes, pred_boxes_entry in zip(true_boxes_list, pred_boxes):
    #     if true_boxes and pred_boxes_entry:
    #         filtered_true_boxes_list.append(true_boxes)
    #         filtered_pred_boxes.append(pred_boxes_entry)
    # true_boxes_list = filtered_true_boxes_list
    # pred_boxes = filtered_pred_boxes
    #--------------------------------------------------------------------------------------------
    # MODEL INFERENCE STOPS HERE
    #--------------------------------------------------------------------------------------------

    # Stop tracking emissions
    emissions_data = tracker.stop_task()

    # Calculate classification accuracy
    classification_accuracy = accuracy_score(true_labels, predictions)

    # Calculate mean IoU for object detection (only for images with smoke).
    # Predicted boxes are pixel-space corners on the 512x512 mask, while true
    # boxes are normalized YOLO format, so convert before comparing. For each
    # image, take the best IoU over all (true box, predicted box) pairs.
    ious = []
    for true_boxes, image_pred_boxes in zip(true_boxes_list, pred_boxes):
        if not true_boxes or not image_pred_boxes:
            continue
        image_ious = []
        for x1, y1, x2, y2 in image_pred_boxes:
            pred_box_yolo = ((x1 + x2) / 2 / 512, (y1 + y2) / 2 / 512,
                             (x2 - x1) / 512, (y2 - y1) / 512)
            image_ious.append(compute_max_iou(true_boxes, pred_box_yolo))
        ious.append(max(image_ious))
    mean_iou = float(np.mean(ious)) if ious else 0.0
    # Prepare results dictionary
    results = {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "classification_accuracy": float(classification_accuracy),
        "mean_iou": mean_iou,
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed,
        },
    }

    return results
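# Minimal usage sketch (assumes a FastAPI app in the surrounding project; the
# module path "tasks.image" is illustrative, not confirmed by this file):
#
#   from fastapi import FastAPI
#   from tasks.image import router
#
#   app = FastAPI()
#   app.include_router(router)
#   # POST /image with an ImageEvaluationRequest body runs evaluate_image()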