|
import gradio as gr |
|
from PIL import Image, ImageDraw, ImageFont |
|
from transformers import pipeline |
|
import cv2 |
|
import numpy as np |
|
import tempfile |
|
import os |
|
|
|
|
|
# Build the detection pipeline once at import time; process_video() calls this
# same object for every frame instead of constructing a new one per call.
object_detector = pipeline(task="object-detection", model="facebook/detr-resnet-50")
|
|
|
def draw_bounding_boxes(frame, detections):
    """
    Return a copy of a BGR frame annotated with one labelled box per detection.

    Args:
        frame: Image array in OpenCV's BGR channel order.
        detections: Iterable of dicts, each with a 'box' dict holding
            'xmin'/'ymin'/'xmax'/'ymax', plus 'label' and 'score' entries.

    Returns:
        A new BGR array with the boxes and "<label> <score>" captions drawn.
    """
    # Drawing is done with PIL, which works in RGB, so convert on the way in
    # and back to BGR on the way out.
    canvas = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    painter = ImageDraw.Draw(canvas)
    label_font = ImageFont.load_default()

    for det in detections:
        coords = det['box']
        left, top, right, bottom = (
            int(coords[key]) for key in ('xmin', 'ymin', 'xmax', 'ymax')
        )
        painter.rectangle([(left, top), (right, bottom)], outline="red", width=3)

        caption = f"{det['label']} {det['score']:.2f}"
        anchor = (left, top)

        # Fill the caption's bounding box first so the white text stays
        # readable on top of any frame content.
        x0, y0, x1, y1 = painter.textbbox(anchor, caption, font=label_font)
        painter.rectangle([(x0, y0), (x1, y1)], fill="red")
        painter.text(anchor, caption, fill="white", font=label_font)

    return cv2.cvtColor(np.array(canvas), cv2.COLOR_RGB2BGR)
|
|
|
def create_output_writer(cap, output_path):
    """
    Open a cv2.VideoWriter matching the capture's geometry, trying several codecs.

    Args:
        cap: cv2.VideoCapture whose frame size and frame rate are copied.
        output_path: Desired output path. Its extension is replaced with the
            container extension paired with whichever codec is being tried.

    Returns:
        Tuple (writer, output_file): the opened cv2.VideoWriter and the path
        it actually writes to (the extension may differ from output_path).

    Raises:
        ValueError: If the capture reports a non-positive frame size, or if
            none of the candidate codecs could be opened.
    """
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    if frame_width <= 0 or frame_height <= 0:
        raise ValueError(
            f"Invalid frame size reported by video: {frame_width}x{frame_height}"
        )

    # Keep the fractional frame rate (e.g. 29.97): truncating it to an int
    # makes the output play at a different speed than the input.
    fps = float(cap.get(cv2.CAP_PROP_FPS))
    if not 0 < fps < float("inf"):
        # The property can read back as 0 (or NaN) when the backend cannot
        # determine it; fall back to a common default rather than passing an
        # unusable rate to the writer.
        fps = 30.0

    # (FourCC, container extension) pairs, tried in order.
    codecs = [
        ('mp4v', '.mp4'),
        ('avc1', '.mp4'),
        ('XVID', '.avi'),
        ('MJPG', '.avi'),
    ]

    base_path = os.path.splitext(output_path)[0]

    for codec, ext in codecs:
        output_file = base_path + ext
        out = None
        try:
            fourcc = cv2.VideoWriter_fourcc(*codec)
            out = cv2.VideoWriter(output_file, fourcc, fps, (frame_width, frame_height))
            if out.isOpened():
                return out, output_file
        except Exception as e:
            print(f"Failed with codec {codec}: {str(e)}")

        # This codec did not work: release the writer before trying the next
        # one so a half-initialised writer does not keep the file handle.
        if out is not None:
            out.release()

    raise ValueError("Could not initialize any video codec")
|
|
|
def frame_to_pil(frame):
    """Return a PIL Image built from an OpenCV frame, swapping BGR to RGB."""
    return Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
|
|
|
def process_video(video_path, progress=gr.Progress()):
    """
    Run object detection on every frame of a video and save an annotated copy.

    The result is written to ~/Videos/ObjectDetection/output_video.<ext>,
    where <ext> depends on the codec create_output_writer() manages to open.

    Args:
        video_path: Path of the input video file.
        progress: Gradio progress tracker, updated once per frame.

    Returns:
        Path of the annotated output video.

    Raises:
        gr.Error: If the video cannot be opened, no writer can be created,
            or the output file is missing or empty. The original exception
            is chained as the cause.
    """
    cap = None
    out = None
    try:
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                raise ValueError("Could not open video file")

            # The reported frame count comes from container metadata and can
            # be 0 or inaccurate, so it must never be used as a bare divisor.
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            output_dir = os.path.join(os.path.expanduser("~"), "Videos", "ObjectDetection")
            os.makedirs(output_dir, exist_ok=True)
            output_path = os.path.join(output_dir, "output_video.mp4")

            # The writer may pick a different container, so take the path it
            # actually opened.
            out, output_path = create_output_writer(cap, output_path)

            frame_count = 0
            process_every_n_frames = 1

            progress(0, desc="Processing video...")

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                frame_count += 1

                if frame_count % process_every_n_frames == 0:
                    try:
                        detections = object_detector(frame_to_pil(frame))
                        frame = draw_bounding_boxes(frame, detections)
                    except Exception as e:
                        # Best effort: a frame that fails detection is still
                        # written, just without annotations.
                        print(f"Error processing frame {frame_count}: {str(e)}")

                out.write(frame)

                if total_frames > 0:
                    # Clamp: more frames may be decoded than were reported.
                    progress(
                        min(frame_count / total_frames, 1.0),
                        desc=f"Processing frame {frame_count}/{total_frames}",
                    )
                else:
                    # Total unknown: report steps completed with no total.
                    progress((frame_count, None), desc=f"Processing frame {frame_count}")
        finally:
            # Release on every path. The writer has to be released before the
            # size check below so the file is finalised on disk first.
            if cap is not None:
                cap.release()
            if out is not None:
                out.release()

        if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
            raise ValueError("Output video file is empty or was not created")

        return output_path

    except Exception as e:
        print(f"Error processing video: {str(e)}")
        raise gr.Error(f"Error processing video: {str(e)}") from e
|
|
|
def detect_objects_in_video(video, progress=gr.Progress()):
    """
    Gradio interface function for video object detection.

    Args:
        video: Path of the uploaded video, or None when nothing was uploaded.
        progress: Gradio progress tracker. It is declared here because Gradio
            looks for a gr.Progress default on the function registered with
            the interface; it is then forwarded to process_video().

    Returns:
        Path of the annotated output video.

    Raises:
        gr.Error: If no video was supplied or processing failed.
    """
    if video is None:
        raise gr.Error("Please upload a video file")

    try:
        return process_video(video, progress)
    except gr.Error:
        # Already a user-facing error; re-wrapping it would stack a second
        # "Error ..." prefix onto the message shown in the UI.
        raise
    except Exception as e:
        raise gr.Error(f"Error during video processing: {str(e)}") from e
|
|
|
|
|
# Gradio UI: one uploaded video in, one annotated video out.
demo = gr.Interface(
    fn=detect_objects_in_video,
    inputs=[gr.Video(label="Upload Video")],
    outputs=[gr.Video(label="Processed Video")],
    title="@GenAILearniverse Project: Video Object Detection",
    description="""
    Upload a video to detect and track objects within it.
    The application will process the video and draw bounding boxes around detected objects
    with their labels and confidence scores.
    Note: Processing may take some time depending on the video length.
    """,
    examples=[],
    cache_examples=False,
)


# Start the web server only when executed as a script, not when imported.
if __name__ == "__main__":
    demo.launch()