# test-space/main2.py
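"""Vehicle counting demo.

Runs Ultralytics YOLO detection and boxmot DeepOCSORT tracking on a video,
counts vehicles crossing a horizontal band near the bottom of the frame,
and yields the annotated frames as RGB arrays.
"""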
import argparse
import os
from pathlib import Path

import cv2
import numpy as np
import yaml
from boxmot import DeepOCSORT
from ultralytics import YOLO
from ultralytics.utils.checks import check_imgsz
def main(args):
    # Load the user-defined configuration (model name, class filters, colours, ...).
    try:
        with open(args.config_path) as file:
            configs = yaml.safe_load(file)
    except OSError as exc:
        raise FileNotFoundError("Please give the correct path to the configs YAML file.") from exc
    videoSrcPath = args.source
    if not os.path.exists(videoSrcPath):
        print(f"Exiting: the video path {videoSrcPath} doesn't exist.")
        return
cap = cv2.VideoCapture(videoSrcPath)
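    # Probe the stream geometry so the counting lines can be placed
    # relative to the frame.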
    frames_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    configs["width"] = width
    configs["height"] = height
    print(f"Input video: #frames={frames_count}, fps={fps}, width={width}, height={height}")
frame_number = 0
# cv2.namedWindow("output", cv2.WINDOW_AUTOSIZE)
    # Round the frame size to a valid inference size; Ultralytics expects
    # imgsz as (height, width).
    imgsz = check_imgsz([height, width], min_dim=2)
    model = YOLO(configs["model"])
    # Alternative tracker:
    # from deep_sort_realtime.deepsort_tracker import DeepSort
    # tracker = DeepSort()
    tracker = DeepOCSORT(
        model_weights=Path("osnet_x0_25_msmt17.pt"),  # ReID model used for appearance matching
        device="cpu",  # or a CUDA index such as "0"
        fp16=False,
    )
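    # DeepOCSORT pairs OC-SORT motion association with ReID appearance features,
    # so track identities survive short occlusions better than IoU-only matching.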
tracked_vehicles = {}
totalOutgoing = 0
totalIncoming = 0
while cap.isOpened():
ret, frame = cap.read()
if ret:
            results = model(frame, imgsz=imgsz)  # run detection on the current frame
            configs["classes"] = results[0].names
            # .cpu() keeps this working when inference runs on a GPU.
            object_classes = results[0].boxes.cls.cpu().numpy()
            object_confidences = results[0].boxes.conf.cpu().numpy()
            object_coordinates = results[0].boxes.xyxy.cpu().numpy()
            detections = []
            for i in range(len(object_classes)):
                if configs["classes"][int(object_classes[i])] in configs["vehicle_classes"]:
                    if object_confidences[i] > configs["min_class_confidence"]:
                        startX, startY = int(object_coordinates[i][0]), int(object_coordinates[i][1])
                        endX, endY = int(object_coordinates[i][2]), int(object_coordinates[i][3])
                        detections.append([startX, startY, endX, endY, object_confidences[i], object_classes[i]])
                        # DeepSort variant: detections.append(([startX, startY, endX - startX, endY - startY], object_confidences[i], object_classes[i]))
            # boxmot expects an (N, 6) array of [x1, y1, x2, y2, conf, cls];
            # keep that shape even when nothing was detected.
            detections = np.array(detections) if detections else np.empty((0, 6))
tracks = tracker.update(detections, frame)
            # DeepSort variant: tracks = tracker.update_tracks(detections, frame=frame)
            # Draw the two horizontal lines that bound the counting band.
            frame = cv2.line(
                frame, (0, configs["height"] - 150), (configs["width"], configs["height"] - 150), (0, 255, 255), 2
            )
            frame = cv2.line(
                frame, (0, configs["height"] - 325), (configs["width"], configs["height"] - 325), (0, 255, 255), 2
            )
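            # Each DeepOCSORT track row is [x1, y1, x2, y2, track_id, conf, cls, det_ind].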
            for track in tracks:
                startX, startY, endX, endY, objectID, confidence, class_idx, _ = track
                class_idx = int(class_idx)  # results[0].names is keyed by int
                # DeepSort variant:
                # startX, startY = track.original_ltwh[0], track.original_ltwh[1]
                # endX, endY = startX + track.original_ltwh[2], startY + track.original_ltwh[3]
                # objectID, confidence, class_idx = track.track_id, track.det_conf, track.det_class
centroid = (int((startX + endX) / 2.0), int((startY + endY) / 2.0))
tracked_vehicle = tracked_vehicles.get(objectID, None)
if tracked_vehicle is None:
tracked_vehicle = TrackedVehicle(objectID, centroid)
else:
                    # Negative = moving up the frame (outgoing); positive = moving down (incoming).
                    direction = centroid[1] - np.mean([c[1] for c in tracked_vehicle.centroids])
tracked_vehicle.centroids.append(centroid)
                    if not tracked_vehicle.counted:
                        in_band = configs["height"] - 325 < centroid[1] < configs["height"] - 150
                        if direction < 0 and in_band:
                            totalOutgoing += 1
                            tracked_vehicle.direction = "Outgoing"
                            tracked_vehicle.counted = True
                        elif direction > 0 and in_band:
                            totalIncoming += 1
                            tracked_vehicle.direction = "Incoming"
                            tracked_vehicle.counted = True
tracked_vehicles[objectID] = tracked_vehicle
                # Draw the label above the box unless that would leave the frame.
                y = startY - 10 if startY - 10 > 10 else startY + 10
text = f'id: {int(objectID)}, class: {configs["classes"][class_idx]}, conf: {confidence * 100:.1f}%'
frame = cv2.rectangle(frame, (int(startX), int(startY)), (int(endX), int(endY)),
configs["class_colours"][configs["classes"][class_idx]], 2)
frame = cv2.circle(frame, (centroid[0], centroid[1]), 4,
configs["class_colours"][configs["classes"][class_idx]], -1)
frame = cv2.putText(frame, text, (int(startX), int(y)), cv2.FONT_HERSHEY_SIMPLEX, 0.45,
configs["class_colours"][configs["classes"][class_idx]], 2)
frame = cv2.putText(frame, "Frame#: " + str(frame_number), (0, configs["height"] - 30), cv2.FONT_HERSHEY_SIMPLEX,
1, (2, 10, 200), 2)
frame = cv2.putText(frame, "Incoming#: " + str(totalIncoming), (0, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (2, 10, 200), 2)
frame = cv2.putText(frame, "Outgoing#: " + str(totalOutgoing), (configs["width"] - 300, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (2, 10, 200), 2)
# cv2.imshow('output', frame)
# key = cv2.waitKey(1)
# Quit when 'q' is pressed
# if key == ord('q'):
# break
# elif key == ord('k'):
# cv2.waitKey(0)
            frame_number += 1
            yield cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        else:
            # End of stream (or a read error): stop instead of spinning forever.
            break
    cap.release()
class TrackedVehicle:
def __init__(self, objectID, centroid):
# store the object ID, then initialize a list of centroids
# using the current centroid
self.objectID = objectID
self.centroids = [centroid]
self.direction = None
# initialize a boolean used to indicate if the object has
# already been counted or not
self.counted = False
def get_arguments():
    parser = argparse.ArgumentParser(
        description="Count incoming and outgoing vehicles in a video.",
        formatter_class=argparse.RawTextHelpFormatter,
        usage='\n #1: open a single video: >> python3 main2.py -s "videoname.MP4" -c configs.yaml',
    )
    parser.add_argument("--source", "-s", type=str, required=True, help="path to the input video (0 for webcam)")
    parser.add_argument("--device", default="0", help="cuda device, i.e. 0 or 0,1,2,3 or cpu")  # currently unused
    parser.add_argument("--img-size", type=int, default=640, help="inference size (pixels)")  # currently unused
    parser.add_argument("--config_path", "-c", required=True, help="path to the user-defined configs YAML", type=str)
    return parser.parse_args()
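# Illustrative configs.yaml (the key names are taken from the usages above;
# the values are assumptions, not the project's shipped defaults):
#
#   model: yolov8n.pt
#   vehicle_classes: [car, truck, bus, motorcycle]
#   min_class_confidence: 0.5
#   class_colours:
#     car: [0, 255, 0]
#     truck: [255, 0, 0]
#     bus: [0, 0, 255]
#     motorcycle: [255, 255, 0]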
if __name__ == "__main__":
    args = get_arguments()
    print(args)
    # main() is a generator (it yields annotated RGB frames), so it must be
    # drained for any frames to actually be processed.
    for _ in main(args):
        pass