import argparse
import os
from pathlib import Path

import cv2
import numpy as np
import yaml
from boxmot import DeepOCSORT
from ultralytics import YOLO
from ultralytics.utils.checks import check_imgsz


def main(args):
    # Load the user-defined configs.
    try:
        with open(args.config_path) as file:
            configs = yaml.safe_load(file)
    except OSError:
        raise FileNotFoundError("Please give the correct path to the configs YAML file.")

    videoSrcPath = args.source
    if not os.path.exists(videoSrcPath):
        print(f"Exiting: the video path {videoSrcPath} doesn't exist.")
        return

    cap = cv2.VideoCapture(videoSrcPath)
    frames_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    configs["width"] = width
    configs["height"] = height
    print(f"Input video #frames={frames_count}, fps={fps}, width={width}, height={height}")

    frame_number = 0
    # cv2.namedWindow("output", cv2.WINDOW_AUTOSIZE)

    # Make the inference size a valid multiple of the model's stride.
    imgsz = check_imgsz([width, height], min_dim=2)
    model = YOLO(configs["model"])

    # from deep_sort_realtime.deepsort_tracker import DeepSort
    # tracker = DeepSort()
    tracker = DeepOCSORT(
        model_weights=Path('osnet_x0_25_msmt17.pt'),  # which ReID model to use
        device='cpu:0',
        fp16=False,
    )

    tracked_vehicles = {}
    totalOutgoing = 0
    totalIncoming = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        results = model(frame, imgsz=imgsz)  # run detection on the frame
        configs["classes"] = results[0].names
        object_classes = results[0].boxes.cls.cpu().numpy()
        object_confidences = results[0].boxes.conf.cpu().numpy()
        object_coordinates = results[0].boxes.xyxy.cpu().numpy()

        # Keep only confident detections of the configured vehicle classes,
        # as (x1, y1, x2, y2, conf, cls) rows for the tracker.
        detections = []
        for i in range(len(object_classes)):
            if configs["classes"][int(object_classes[i])] in configs["vehicle_classes"]:
                if object_confidences[i] > configs["min_class_confidence"]:
                    startX, startY = int(object_coordinates[i][0]), int(object_coordinates[i][1])
                    endX, endY = int(object_coordinates[i][2]), int(object_coordinates[i][3])
                    detections.append([startX, startY, endX, endY, object_confidences[i], object_classes[i]])
                    # detections.append(([startX, startY, endX - startX, endY - startY], object_confidences[i], object_classes[i]))
        detections = np.array(detections) if detections else np.empty((0, 6))
        tracks = tracker.update(detections, frame)
        # tracks = tracker.update_tracks(detections, frame=frame)
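        # Each row of `tracks` is assumed to be laid out as
        # [x1, y1, x2, y2, track_id, confidence, class_idx, detection_index],
        # which is how the rows are unpacked below; check your boxmot version
        # if its column layout differs.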
        # Counting lines for the vehicles in the frame.
        frame = cv2.line(frame, (0, configs["height"] - 150), (configs["width"], configs["height"] - 150), (0, 255, 255), 2)
        frame = cv2.line(frame, (0, configs["height"] - 325), (configs["width"], configs["height"] - 325), (0, 255, 255), 2)

        for track in tracks:
            (startX, startY, endX, endY, objectID, confidence, class_idx, _) = track
            objectID = int(objectID)
            class_name = configs["classes"][int(class_idx)]
            colour = configs["class_colours"][class_name]
            # startX, startY, endX, endY = track.original_ltwh[0], track.original_ltwh[1], track.original_ltwh[0] + \
            #     track.original_ltwh[2], track.original_ltwh[1] + track.original_ltwh[3]
            # objectID = track.track_id
            # confidence = track.det_conf
            # class_idx = track.det_class
            centroid = (int((startX + endX) / 2.0), int((startY + endY) / 2.0))

            tracked_vehicle = tracked_vehicles.get(objectID, None)
            if tracked_vehicle is None:
                tracked_vehicle = TrackedVehicle(objectID, centroid)
            else:
                # Negative means the centroid is moving up the frame relative
                # to its running mean position; positive means moving down.
                direction = centroid[1] - np.mean([c[1] for c in tracked_vehicle.centroids])
                tracked_vehicle.centroids.append(centroid)
                # Count each vehicle once, when its centroid sits between the
                # two counting lines and it has a consistent direction.
                if not tracked_vehicle.counted:
                    if direction < 0 and configs["height"] - 325 < centroid[1] < configs["height"] - 150:
                        totalOutgoing += 1
                        tracked_vehicle.direction = "Outgoing"
                        tracked_vehicle.counted = True
                    elif direction > 0 and configs["height"] - 325 < centroid[1] < configs["height"] - 150:
                        totalIncoming += 1
                        tracked_vehicle.direction = "Incoming"
                        tracked_vehicle.counted = True
            tracked_vehicles[objectID] = tracked_vehicle

            # Draw the box, centroid and label for this track.
            y = startY - 10 if startY - 10 > 10 else startY + 10
            text = f'id: {objectID}, class: {class_name}, conf: {confidence * 100:.1f}%'
            frame = cv2.rectangle(frame, (int(startX), int(startY)), (int(endX), int(endY)), colour, 2)
            frame = cv2.circle(frame, (centroid[0], centroid[1]), 4, colour, -1)
            frame = cv2.putText(frame, text, (int(startX), int(y)), cv2.FONT_HERSHEY_SIMPLEX, 0.45, colour, 2)

        frame = cv2.putText(frame, "Frame#: " + str(frame_number), (0, configs["height"] - 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (2, 10, 200), 2)
        frame = cv2.putText(frame, "Incoming#: " + str(totalIncoming), (0, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (2, 10, 200), 2)
        frame = cv2.putText(frame, "Outgoing#: " + str(totalOutgoing), (configs["width"] - 300, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (2, 10, 200), 2)

        # cv2.imshow('output', frame)
        # key = cv2.waitKey(1)
        # # Quit when 'q' is pressed; pause on 'k'.
        # if key == ord('q'):
        #     break
        # elif key == ord('k'):
        #     cv2.waitKey(0)

        frame_number += 1
        yield cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    cap.release()


class TrackedVehicle:
    def __init__(self, objectID, centroid):
        # Store the object ID, then initialize the centroid history with the
        # current centroid.
        self.objectID = objectID
        self.centroids = [centroid]
        self.direction = None
        # Indicates whether the object has already been counted.
        self.counted = False
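# The configs YAML is expected to provide at least the keys read above. A
# hypothetical example (values are illustrative, and the class names must
# match the detector's label set; "width", "height" and "classes" are filled
# in at runtime):
#
#   model: yolov8n.pt
#   vehicle_classes: [car, truck, bus, motorcycle]
#   min_class_confidence: 0.5
#   class_colours:
#     car: [0, 255, 0]
#     truck: [255, 0, 0]
#     bus: [0, 0, 255]
#     motorcycle: [255, 255, 0]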
def get_arguments():
    parser = argparse.ArgumentParser(
        description='Program to open a video, count vehicles and display the result.',
        formatter_class=argparse.RawTextHelpFormatter,
        usage='\n #1: open a single video: >> python3 main.py -s "videoname.MP4" -c "configs.yaml"')
    parser.add_argument('--source', "-s", type=str, required=True, help='source')  # file/folder, 0 for webcam
    parser.add_argument('--device', default='0', help='cuda device, i.e. 0 or 0,1,2,3 or cpu')
    parser.add_argument('--img-size', type=int, default=640, help='inference size (pixels)')  # not used, 640 for now
    parser.add_argument("--config_path", "-c", required=True, help="Path to user-defined configs", type=str)
    return parser.parse_args()


if __name__ == "__main__":
    args = get_arguments()
    print(args)
    # main() is a generator, so it must be iterated for any work to happen.
    for _ in main(args):
        pass
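# A minimal sketch of consuming the frame generator programmatically rather
# than via the CLI entry point above (the output path and VideoWriter settings
# are illustrative, not part of this script):
#
#   writer = cv2.VideoWriter("annotated.mp4", cv2.VideoWriter_fourcc(*"mp4v"),
#                            30, (1920, 1080))
#   for rgb_frame in main(args):
#       writer.write(cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR))  # frames are yielded as RGB
#   writer.release()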