File size: 6,052 Bytes
8ae97f5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import os
import cv2
import torch
import pandas as pd
from tqdm import tqdm
from ultralytics import YOLO
from PIL import Image
import pillow_heif
import numpy as np

class MediaProcessor:
    """Run a YOLO detector over images and videos and save annotated copies.

    Annotated media are written into ``output_path``; per-video detection
    tables are written as CSV files into a ``metadata`` directory relative to
    the current working directory.
    """

    # Box colour used for class ids that have no entry in ``self.colors``.
    _DEFAULT_COLOR = (0, 255, 0)
    # Extensions that are decoded through pillow_heif instead of cv2.imread.
    _HEIF_EXTENSIONS = ('.heic', '.heif')
    # Directory (relative to the CWD) that receives the per-video CSV files.
    _METADATA_DIR = 'metadata'

    def __init__(self, output_path, model_path, batch_size=16):
        """Load the model and remember where results should be written.

        Args:
            output_path: Directory that receives annotated images/videos.
            model_path: Path of the weights file handed to ``YOLO``.
            batch_size: Number of video frames sent to the model per call.
        """
        self.output_path = output_path
        self.model_path = model_path
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.model = YOLO(self.model_path).to(self.device)
        # Colours are stored as RGB triples keyed by class id; they are
        # converted to OpenCV's BGR order at draw time (see _annotate).
        self.colors = {
            0: (255, 0, 0),    # quadrotor - red
            1: (0, 255, 0),    # airplane - green
            2: (0, 0, 255),    # helicopter - blue
            3: (255, 255, 0),  # bird - yellow
            4: (255, 0, 255)   # uav-plane - purple
        }
        self.batch_size = batch_size

    def _ensure_output_dir(self):
        """Create ``self.output_path`` if it is set and does not exist yet."""
        if self.output_path:
            os.makedirs(self.output_path, exist_ok=True)

    def _annotate(self, image, result, thickness):
        """Draw every detection of ``result`` onto ``image`` in place.

        Args:
            image: BGR image array that is modified in place.
            result: One model result exposing ``boxes`` (each box provides
                ``xyxy``, ``conf`` and ``cls``).
            thickness: Line thickness for both the rectangle and the label.

        Returns:
            A list of ``(class_name, confidence, x1, y1, x2, y2)`` tuples,
            one per drawn box, with integer pixel coordinates.
        """
        detections = []
        for box in result.boxes:
            x1, y1, x2, y2 = (int(v) for v in box.xyxy[0].tolist())
            conf = box.conf[0].item()
            cls_id = int(box.cls[0].item())
            class_name = self.model.names[cls_id]
            rgb = self.colors.get(cls_id, self._DEFAULT_COLOR)
            # The images handled here are BGR, so the RGB triple must be
            # reversed or red/blue (and yellow/cyan) would be swapped.
            bgr = tuple(reversed(rgb))
            cv2.rectangle(image, (x1, y1), (x2, y2), bgr, thickness)
            cv2.putText(image, f'{class_name} {conf:.2f}', (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, bgr, thickness)
            detections.append((class_name, conf, x1, y1, x2, y2))
        return detections

    def _process_batch(self, frames, first_frame_num, fps, writer, rows, pbar):
        """Detect, annotate and write one batch of consecutive video frames.

        Args:
            frames: Non-empty list of BGR frames (annotated in place).
            first_frame_num: 1-based frame number of ``frames[0]``.
            fps: Frame rate used to derive timestamps.
            writer: Open ``cv2.VideoWriter`` receiving the annotated frames.
            rows: List that is extended with one CSV row per detection.
            pbar: Progress bar advanced by one per written frame.
        """
        results = self.model(frames, verbose=False)
        for offset, (frame, result) in enumerate(zip(frames, results)):
            current_frame_num = first_frame_num + offset
            # A container without a usable frame rate yields no timestamp
            # (empty CSV cell) instead of a ZeroDivisionError.
            timestamp = current_frame_num / fps if fps and fps > 0 else None
            for detection in self._annotate(frame, result, thickness=1):
                rows.append([current_frame_num, timestamp, *detection])
            writer.write(frame)
            pbar.update(1)

    def process_single_video(self, video_path):
        """Annotate one video and store its detections as a CSV file.

        The annotated video keeps the input's file name and is written into
        ``self.output_path``. Detections go to
        ``metadata/<video name>_detection_results.csv``.

        Args:
            video_path: Path of the video to process.

        Returns:
            Path of the annotated output video.
        """
        self._ensure_output_dir()
        video_name = os.path.basename(video_path)
        output_video_path = os.path.join(self.output_path, video_name)

        columns = ['frame_num', 'timestamp', 'class', 'confidence', 'x1', 'y1', 'x2', 'y2']
        data = []
        # Guard against a non-positive batch size, which would never flush.
        batch_size = max(1, int(self.batch_size))

        cap = cv2.VideoCapture(video_path)
        out = None
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                          int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
            fourcc = cv2.VideoWriter_fourcc(*'avc1')
            out = cv2.VideoWriter(output_video_path, fourcc, fps, frame_size)

            # The reported frame count is only used for the progress bar; it
            # can be wrong or missing, so it must not drive the batching.
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            frames = []
            frame_num = 0

            with tqdm(total=total_frames if total_frames > 0 else None,
                      desc=f"Processing Video {video_name}",
                      position=0, leave=True) as pbar:
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break

                    frames.append(frame)
                    frame_num += 1

                    if len(frames) >= batch_size:
                        self._process_batch(frames, frame_num - len(frames) + 1,
                                            fps, out, data, pbar)
                        frames = []

                # Flush the trailing partial batch so the last frames are
                # never dropped, whatever the container metadata claimed.
                if frames:
                    self._process_batch(frames, frame_num - len(frames) + 1,
                                        fps, out, data, pbar)
        finally:
            # Release the handles even when reading or inference fails.
            cap.release()
            if out is not None:
                out.release()
        cv2.destroyAllWindows()

        os.makedirs(self._METADATA_DIR, exist_ok=True)
        df = pd.DataFrame(data, columns=columns)
        df.to_csv(os.path.join(self._METADATA_DIR, f"{video_name}_detection_results.csv"), index=False)
        print(df)
        return output_video_path

    def load_image(self, path):
        """Load an image file as a BGR array.

        ``.heic`` and ``.heif`` files are decoded with pillow_heif; every
        other path is passed to ``cv2.imread``.

        Args:
            path: Path of the image file.

        Returns:
            The image as a BGR array, or whatever ``cv2.imread`` returns for
            a file it cannot read (``None``).
        """
        if path.lower().endswith(self._HEIF_EXTENSIONS):
            heif_file = pillow_heif.open_heif(path)
            image = Image.frombytes(
                heif_file.mode,
                heif_file.size,
                heif_file.data,
                "raw",
                heif_file.mode,
                heif_file.stride,
            )
            # Drop alpha / expand greyscale so the RGB->BGR conversion below
            # always receives a three-channel image.
            if image.mode != 'RGB':
                image = image.convert('RGB')
            return cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
        return cv2.imread(path)

    def process_images(self, input_paths):
        """Annotate a batch of images and save each one as PNG.

        Every readable input is saved as ``<stem>.png`` inside
        ``self.output_path``. Inputs that cannot be decoded are reported and
        skipped.

        Args:
            input_paths: Paths of the images to process.

        Returns:
            List of paths of the saved PNG files, in input order.
        """
        if not input_paths:
            return []

        loaded = []
        for path in input_paths:
            image = self.load_image(path)
            if image is None:
                print(f"Skipping unreadable image: {path}")
                continue
            loaded.append((path, image))
        if not loaded:
            return []

        self._ensure_output_dir()
        results = self.model([image for _, image in loaded], verbose=False)
        processed_images = []

        for (path, image), result in zip(loaded, results):
            self._annotate(image, result, thickness=2)

            # All images are saved in PNG format.
            stem = os.path.splitext(os.path.basename(path))[0]
            processed_image_path = os.path.join(self.output_path, str(stem) + '.png')
            print(f"Сохранение изображения по пути: {processed_image_path}")
            processed_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
            processed_image.save(processed_image_path, format='PNG')
            processed_images.append(processed_image_path)

        return processed_images

    def process_videos(self, input_paths):
        """Process each video in turn.

        Args:
            input_paths: Paths of the videos to process.

        Returns:
            List of annotated output video paths, in input order.
        """
        return [self.process_single_video(video_path) for video_path in input_paths]

def process_media(input_paths, processor):
    """Route paths to the image or video pipeline based on file extension.

    Paths are matched case-insensitively against fixed image and video
    extension lists; paths matching neither are ignored. A pipeline is only
    invoked when at least one path was selected for it.

    Args:
        input_paths: Iterable of path strings.
        processor: Object providing ``process_images`` and ``process_videos``.

    Returns:
        A ``(images, videos)`` tuple holding what each pipeline returned, or
        an empty list for a pipeline that was not invoked.
    """
    image_suffixes = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.heic', '.heif', '.webp')
    video_suffixes = ('.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm')

    def ends_with(suffixes):
        # Build a case-insensitive extension predicate for ``filter``.
        return lambda candidate: candidate.lower().endswith(suffixes)

    stills = list(filter(ends_with(image_suffixes), input_paths))
    clips = list(filter(ends_with(video_suffixes), input_paths))

    processed_images = processor.process_images(stills) if stills else []
    processed_videos = processor.process_videos(clips) if clips else []
    return processed_images, processed_videos