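"""Vehicle counting from a video stream.

Runs YOLO detection on each frame, tracks detections with DeepOCSORT, and counts
vehicles as incoming or outgoing when their centroids cross a counting band near
the bottom of the frame. main() is a generator that yields annotated RGB frames,
so it can also be consumed by a streaming UI.
"""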
import argparse
import cv2
import numpy as np
import yaml
import os
import time
from boxmot import DeepOCSORT
def main(args):
    # Load the user-defined configs from the YAML file.
    try:
        with open(args.config_path) as file:
            configs = yaml.safe_load(file)
    except (OSError, yaml.YAMLError) as exc:
        raise FileNotFoundError("Please give the correct path to the configs YAML file.") from exc
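    # Illustrative config structure (assumption, inferred from the keys read below;
    # adjust names and values to your own YAML file):
    #
    #   model: yolov8n.pt
    #   vehicle_classes: [car, truck, bus, motorcycle]
    #   min_class_confidence: 0.5
    #   class_colours:            # BGR colours keyed by class name
    #     car: [0, 255, 0]
    #     truck: [255, 0, 0]
    #     bus: [0, 0, 255]
    #     motorcycle: [255, 255, 0]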
    videoSrcPath = args.source
    if not os.path.exists(videoSrcPath):
        print(f"Exiting: the video path {videoSrcPath} doesn't exist.")
        return
    cap = cv2.VideoCapture(videoSrcPath)
    frames_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    configs["width"] = width
    configs["height"] = height
    print(f"Input video #frames={frames_count}, fps={fps}, width={width}, height={height}")
    frame_number = 0
    # cv2.namedWindow("output", cv2.WINDOW_AUTOSIZE)
    from ultralytics.utils.checks import check_imgsz  # validate/round the inference size to the model stride
    imgsz = check_imgsz([width, height], min_dim=2)
    from ultralytics import YOLO
    model = YOLO(configs["model"])
    # from deep_sort_realtime.deepsort_tracker import DeepSort
    # tracker = DeepSort()
    from pathlib import Path
    tracker = DeepOCSORT(
        model_weights=Path('osnet_x0_25_msmt17.pt'),  # which ReID model to use
        device='cpu:0',
        fp16=False,
    )
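    # Note (assumption about boxmot's behaviour, which may vary by version): the ReID
    # weights 'osnet_x0_25_msmt17.pt' are either found locally or fetched on first use;
    # pass a CUDA device string instead of 'cpu:0' to run the ReID model on a GPU.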
    tracked_vehicles = {}
    totalOutgoing = 0
    totalIncoming = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if ret:
            results = model(frame, imgsz=imgsz)  # run detection on the current frame
            configs["classes"] = results[0].names
            object_classes = results[0].boxes.cls.cpu().numpy()
            object_confidences = results[0].boxes.conf.cpu().numpy()
            object_coordinates = results[0].boxes.xyxy.cpu().numpy()
            detections = []
            for i in range(len(object_classes)):
                if configs["classes"][int(object_classes[i])] in configs["vehicle_classes"]:
                    if object_confidences[i] > configs["min_class_confidence"]:
                        startX, startY = int(object_coordinates[i][0]), int(object_coordinates[i][1])
                        endX, endY = int(object_coordinates[i][2]), int(object_coordinates[i][3])
                        detections.append([startX, startY, endX, endY, object_confidences[i], object_classes[i]])
                        # detections.append(([startX, startY, endX - startX, endY - startY], object_confidences[i], object_classes[i]))
            detections = np.array(detections)
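            # Defensive guard (my assumption about the tracker's expected input, not from the
            # original code): with no detections, np.array([]) is 1-D, while the tracker expects
            # an (N, 6) array of [x1, y1, x2, y2, conf, cls] rows.
            if detections.size == 0:
                detections = np.empty((0, 6))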
            tracks = tracker.update(detections, frame)
            # tracks = tracker.update_tracks(detections, frame=frame)
            # Counting lines for the vehicles in the frame.
            frame = cv2.line(
                frame, (0, configs["height"] - 150), (configs["width"], configs["height"] - 150), (0, 255, 255), 2
            )
            frame = cv2.line(
                frame, (0, configs["height"] - 325), (configs["width"], configs["height"] - 325), (0, 255, 255), 2
            )
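            # The two horizontal lines bound a 175 px counting band near the bottom of the
            # frame; a vehicle is counted once when its centroid moves through this band.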
            for track in tracks:
                (startX, startY, endX, endY, objectID, confidence, class_idx, _) = track
                # startX, startY, endX, endY = track.original_ltwh[0], track.original_ltwh[1], \
                #     track.original_ltwh[0] + track.original_ltwh[2], track.original_ltwh[1] + track.original_ltwh[3]
                # objectID = track.track_id
                # confidence = track.det_conf
                # class_idx = track.det_class
                centroid = (int((startX + endX) / 2.0), int((startY + endY) / 2.0))
                tracked_vehicle = tracked_vehicles.get(objectID, None)
                if tracked_vehicle is None:
                    tracked_vehicle = TrackedVehicle(objectID, centroid)
                else:
                    # Negative direction = moving up the frame (outgoing), positive = moving down (incoming).
                    direction = centroid[1] - np.mean([c[1] for c in tracked_vehicle.centroids])
                    tracked_vehicle.centroids.append(centroid)
                    if not tracked_vehicle.counted:
                        if direction < 0 and configs["height"] - 325 < centroid[1] < configs["height"] - 150:
                            totalOutgoing += 1
                            tracked_vehicle.direction = "Outgoing"
                            tracked_vehicle.counted = True
                        elif direction > 0 and configs["height"] - 325 < centroid[1] < configs["height"] - 150:
                            totalIncoming += 1
                            tracked_vehicle.direction = "Incoming"
                            tracked_vehicle.counted = True
                tracked_vehicles[objectID] = tracked_vehicle
                y = startY - 10 if startY - 10 > 10 else startY + 10
                text = f'id: {int(objectID)}, class: {configs["classes"][int(class_idx)]}, conf: {confidence * 100:.1f}%'
                colour = configs["class_colours"][configs["classes"][int(class_idx)]]
                frame = cv2.rectangle(frame, (int(startX), int(startY)), (int(endX), int(endY)), colour, 2)
                frame = cv2.circle(frame, (centroid[0], centroid[1]), 4, colour, -1)
                frame = cv2.putText(frame, text, (int(startX), int(y)), cv2.FONT_HERSHEY_SIMPLEX, 0.45, colour, 2)
frame = cv2.putText(frame, "Frame#: " + str(frame_number), (0, configs["height"] - 30), cv2.FONT_HERSHEY_SIMPLEX, | |
1, (2, 10, 200), 2) | |
frame = cv2.putText(frame, "Incoming#: " + str(totalIncoming), (0, 30), | |
cv2.FONT_HERSHEY_SIMPLEX, 1, (2, 10, 200), 2) | |
frame = cv2.putText(frame, "Outgoing#: " + str(totalOutgoing), (configs["width"] - 300, 30), | |
cv2.FONT_HERSHEY_SIMPLEX, 1, (2, 10, 200), 2) | |
            # cv2.imshow('output', frame)
            # key = cv2.waitKey(1)
            # Quit when 'q' is pressed, pause when 'k' is pressed.
            # if key == ord('q'):
            #     break
            # elif key == ord('k'):
            #     cv2.waitKey(0)
            frame_number += 1
            yield cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        else:
            # No more frames to read: stop instead of looping forever.
            break
    cap.release()
class TrackedVehicle:
    def __init__(self, objectID, centroid):
        # Store the object ID, then initialize the list of centroids with the current one.
        self.objectID = objectID
        self.centroids = [centroid]
        self.direction = None
        # Indicates whether this object has already been counted.
        self.counted = False
def get_arguments():
    parser = argparse.ArgumentParser(description='Open a video, detect and track vehicles, and count incoming/outgoing traffic.',
                                     formatter_class=argparse.RawTextHelpFormatter,
                                     usage='\n #1: open a single video: >> python3 main.py -s "videoname.MP4" -c "configs.yaml"')
    parser.add_argument('--source', '-s', type=str, required=True, help='source')  # file/folder, 0 for webcam
    parser.add_argument('--device', default='0', help='cuda device, i.e. 0 or 0,1,2,3 or cpu')
    parser.add_argument('--img-size', type=int, default=640, help='inference size (pixels)')  # not used, 640 for now
    parser.add_argument('--config_path', '-c', required=True, help='Path to user-defined configs', type=str)
    return parser.parse_args()
if __name__ == "__main__":
    args = get_arguments()
    print(args)
    # main() is a generator, so iterate over it to actually process the video.
    for _ in main(args):
        pass
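# Sketch (not part of the original script): how the yielded RGB frames could instead be
# written to a video file. 'output.mp4', the 'mp4v' codec and the fixed 30 fps are
# illustrative choices, not requirements of this code.
#
#     args = get_arguments()
#     writer = None
#     for rgb in main(args):
#         bgr = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)  # VideoWriter expects BGR frames
#         if writer is None:
#             h, w = bgr.shape[:2]
#             writer = cv2.VideoWriter('output.mp4', cv2.VideoWriter_fourcc(*'mp4v'), 30, (w, h))
#         writer.write(bgr)
#     if writer is not None:
#         writer.release()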