-
Notifications
You must be signed in to change notification settings - Fork 16
/
segmentor.py
145 lines (120 loc) · 5.44 KB
/
segmentor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import os
import random
from typing import List, Tuple
import matplotlib.pyplot as plt
import numpy as np
import torch
from PIL.Image import Image
from matplotlib.pyplot import figure, imshow, axis
from pytube import YouTube
from torch import nn
from pipeline import BuildDataset
# A segment: the images that constitute it, paired with its length in seconds.
Segment = Tuple[List[Image], int]
class Segmentor:
    """Locate and visualize runs of frames that a classifier labels as class 1.

    Every still of a video is classified together with its audio; windows of
    frames that start and end on a positive prediction, span at least
    ``min_frames`` frames and whose mean prediction reaches ``threshold`` are
    reported as segments.
    """

    def __init__(self,
                 model: nn.Module,
                 min_frames: int,
                 threshold: float):
        """Store the classifier and the segmentation parameters.

        Args:
            model: Network invoked as ``model(audio_batch, image_batch)``.
            min_frames: Minimum number of frames a segment must span.
            threshold: Minimum mean prediction a window needs in order to be
                accepted as a segment.
        """
        self.model = model
        self.min_frames = min_frames
        self.threshold = threshold

    @staticmethod
    def _segmentor(preds: List[int],
                   min_frames: int,
                   threshold: float) -> List[List[int]]:
        """Return the frame indices of every non-overlapping segment.

        For each start frame predicted as 1, the longest window is kept that
        ends on a frame predicted as 1, spans at least ``min_frames`` frames
        and has a mean prediction of at least ``threshold``. Overlapping
        windows are then resolved by keeping the longer one (the earlier one
        on ties).

        Args:
            preds: Per-frame predictions; frames equal to 1 are positives.
            min_frames: Minimum window length in frames.
            threshold: Minimum mean of ``preds`` over a window.

        Returns:
            One list of consecutive frame indices per segment, ordered by
            start index.
        """
        n = len(preds)
        # A window always holds at least one frame, so smaller values of
        # ``min_frames`` impose no additional constraint.
        shortest = max(min_frames, 1)

        # prefix[k] == sum(preds[:k]); makes every window mean an O(1) lookup.
        prefix = np.concatenate(([0.0], np.cumsum(preds, dtype=float)))

        candidates = []
        for idx_start in range(n - shortest + 1):
            if preds[idx_start] != 1:
                continue
            # Walk the end points from the far end: the first window that
            # qualifies is the longest one for this start.
            for idx_end in range(n - 1, idx_start + shortest - 2, -1):
                if preds[idx_end] != 1:
                    continue
                frames = idx_end - idx_start + 1
                window_mean = (prefix[idx_end + 1] - prefix[idx_start]) / frames
                if window_mean >= threshold:
                    candidates.append((idx_start, idx_end))
                    break

        def first_overlap():
            """Return the first index pair ``(i, j)``, ``i < j``, of overlapping candidates, or ``None``."""
            for i, (ref_start, ref_end) in enumerate(candidates):
                for j in range(i + 1, len(candidates)):
                    comp_start, comp_end = candidates[j]
                    if comp_start <= ref_end and ref_start <= comp_end:
                        return i, j
            return None

        # Drop the shorter member of the first overlapping pair and rescan
        # until no two candidates overlap.
        pair = first_overlap()
        while pair is not None:
            i, j = pair
            ref_start, ref_end = candidates[i]
            comp_start, comp_end = candidates[j]
            if comp_end - comp_start > ref_end - ref_start:
                del candidates[i]
            else:
                del candidates[j]
            pair = first_overlap()

        return [list(range(idx_start, idx_end + 1))
                for idx_start, idx_end in candidates]

    @staticmethod
    def _torch_img_to_pil(img: torch.Tensor) -> Image:
        """Convert an image tensor back to an image via ``BuildDataset.transform_reverse``."""
        return BuildDataset.transform_reverse(img)

    @staticmethod
    def _get_segment_len(indices: List[int]) -> int:
        """Return the number of frames spanned by ``indices`` (inclusive of both ends)."""
        return max(indices) - min(indices) + 1

    def segmentor(self, preds: List[int], images: List[torch.Tensor]) -> List[Segment]:
        """Group frames into segments using the instance's parameters.

        Args:
            preds: Per-frame predictions, aligned with ``images``.
            images: Per-frame image tensors.

        Returns:
            One ``(images, length)`` tuple per segment, where ``images`` are
            the converted frames of the segment and ``length`` is the number
            of frames it spans.
        """
        segment_list = self._segmentor(preds, self.min_frames, self.threshold)
        return [
            ([self._torch_img_to_pil(images[idx]) for idx in segment_idx],
             self._get_segment_len(segment_idx))
            for segment_idx in segment_list
        ]

    def _predict(self, audio: torch.Tensor, image: torch.Tensor) -> int:
        """Classify a single frame and return the index of the top-scoring class.

        A leading batch dimension is added to both inputs before the model is
        called; the class is taken along dimension 1 of the model output
        (presumably ``(1, num_classes)`` -- confirm against the model).
        """
        # Pure inference: no need to record operations for autograd.
        with torch.no_grad():
            output = self.model(audio.unsqueeze(0), image.unsqueeze(0))
        _, predicted = torch.max(output, 1)
        return int(predicted[0])

    def get_segments(self, path_video: str) -> List[Segment]:
        """Extract audio and stills from ``path_video``, classify each frame and segment the result."""
        audio, images = BuildDataset.one_video_extract_audio_and_stills(path_video)
        preds = [self._predict(audio[idx], images[idx]) for idx in range(len(images))]
        return self.segmentor(preds, images)

    @staticmethod
    def show_images_horizontally(images: List[Image]) -> None:
        """Display ``images`` side by side in a single row."""
        # https://stackoverflow.com/questions/36006136/how-to-display-images-in-a-row-with-ipython-display
        fig = figure(figsize=(20, 20))
        number_of_files = len(images)
        for position, image in enumerate(images, start=1):
            ax = fig.add_subplot(1, number_of_files, position)
            ax.imshow(image)
            ax.axis('off')
        plt.show()

    def visualize_segments(self, path_video: str, n_to_show: int = 10) -> None:
        """Print a summary of the segments found in a video and show sample frames.

        For every segment the first, a random selection and the last
        ``n_to_show`` frames are displayed; segments with fewer frames show
        all the frames they have.

        Args:
            path_video: Path of the video file to analyse.
            n_to_show: Maximum number of frames per displayed row.
        """
        segments = self.get_segments(path_video)
        print(f'Found {len(segments)} segments')
        for i, (segment_images, segment_len) in enumerate(segments):
            # A segment may hold fewer images than requested; sampling more
            # than available would raise ValueError.
            n_shown = min(n_to_show, len(segment_images))
            print(f'Segment {i + 1}, {segment_len} seconds')
            print(f'First {n_to_show}')
            self.show_images_horizontally(segment_images[:n_shown])
            print(f'{n_to_show} random shots')
            self.show_images_horizontally(random.sample(segment_images, n_shown))
            print(f'Last {n_to_show}')
            # Explicit start index so that a count of 0 yields no images
            # (a ``[-0:]`` slice would return the whole list).
            self.show_images_horizontally(segment_images[len(segment_images) - n_shown:])
            print('=' * 10)

    @staticmethod
    def _download_youtube_video(youtube_id: str, show_title: bool = True) -> str:
        """Download the first available stream of a YouTube video.

        Args:
            youtube_id: Video identifier (the ``v`` query parameter).
            show_title: Whether to print the video title.

        Returns:
            The stream's default filename, i.e. the name the download is
            saved under.
        """
        yt = YouTube(f'http://youtube.com/watch?v={youtube_id}')
        if show_title:
            print(f'Title: {yt.title}')
        yt_stream = yt.streams.first()
        path = f'{yt_stream.default_filename}'
        yt_stream.download()
        return path

    def visualize_segments_youtube(self,
                                   youtube_id: str,
                                   n_to_show: int = 10,
                                   show_title: bool = True,
                                   remove_file: bool = True) -> None:
        """Download a YouTube video, visualize its segments and optionally delete the file.

        Args:
            youtube_id: Video identifier (the ``v`` query parameter).
            n_to_show: Maximum number of frames per displayed row.
            show_title: Whether to print the video title.
            remove_file: Whether to delete the downloaded file afterwards.
        """
        path = self._download_youtube_video(youtube_id, show_title)
        try:
            self.visualize_segments(path, n_to_show)
        finally:
            # Clean up even when visualization fails part-way through.
            if remove_file:
                os.remove(path)