-
Notifications
You must be signed in to change notification settings - Fork 5
/
h5_generator.py
177 lines (122 loc) · 5.36 KB
/
h5_generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from tensorflow.keras.utils import Sequence
import h5py as h5
import numpy as np
import math
"""
Provides H5Generator, a Keras Sequence that can be passed to model.fit to batch large h5 files, and
train_test_split, which returns a train and a test H5Generator for the same file.
Note: when only a train set or only a test set is needed and no shuffling is required, using H5Generator directly is faster.
Usage example for only train or test set:
from h5_generator import H5Generator
X_train = H5Generator(path_to_file, 480)
# Load model here
history = model.fit(X_train, epochs = 100)
Usage example for train/test split:
from h5_generator import train_test_split as h5_tts
X_train, X_test = h5_tts(path_to_file, batch_size = 480, test_size = 0.2, random_state = 42)
# Load model here
history = model.fit(X_train, epochs = 100, validation_data = (X_test))
"""
class H5Generator(Sequence):
    """Keras ``Sequence`` that reads batches lazily from an HDF5 file.

    The generator works in one of two modes:

    * index mode (``idxs`` is given): each batch is made of the rows whose
      positions are listed in the matching slice of ``idxs``;
    * contiguous mode (``idxs`` is None): batches are consecutive row ranges
      taken from the window ``[start_pos, start_pos + length)``.

    Arguments:
        path - path to h5 file.
        batch_size - number of samples per batch, must be positive.
        x_name - name of X dataset in h5 file, default: "X".
        y_name - name of Y dataset in h5 file, default: "Y".
        idxs - optional 1-D array-like of row positions to serve.
        length - number of rows in the contiguous window. If None, it is
            read from the file on first use as "rows of y_name - start_pos".
        start_pos - first row of the contiguous window, default: 0.
    """

    def __init__(self, path, batch_size, x_name='X', y_name='Y',
                 idxs=None, length=None, start_pos=0):
        # Harmless for older Sequence implementations; assumed to be what
        # newer Keras versions expect from subclasses.
        super().__init__()
        if batch_size <= 0:
            raise ValueError('batch_size must be a positive integer!')
        self.h5_file = path
        # Honour the dataset names passed by the caller (they used to be
        # silently replaced by the hard-coded 'X' and 'Y').
        self.x_name = x_name
        self.y_name = y_name
        self.batch_size = batch_size
        # Accept any array-like; an ndarray passed in is kept as-is.
        self.idxs = None if idxs is None else np.asarray(idxs)
        self.length = length
        self.start_pos = start_pos

    def _num_samples(self):
        """Return how many samples this generator serves in total."""
        if self.idxs is not None:
            return self.idxs.shape[0]
        # "is None" rather than truthiness: an explicit length of 0 means an
        # empty window and must not be replaced by the size of the file.
        if self.length is None:
            with h5.File(self.h5_file, 'r') as f:
                total = f[self.y_name].shape[0]
            # Rows before start_pos are not part of the window.
            self.length = max(total - self.start_pos, 0)
        return self.length

    def __len__(self):
        """Return the number of batches (the last one may be shorter)."""
        return math.ceil(self._num_samples() / self.batch_size)

    def __getitem__(self, idx):
        """Return the tuple (batch_x, batch_y) for batch number ``idx``."""
        first = idx * self.batch_size
        last = first + self.batch_size
        if self.idxs is None:
            # Resolve the window size before the file is opened below, so
            # the file is never opened twice at the same time.
            window_end = self.start_pos + self._num_samples()
            # Clamp to the window so the final batch cannot spill over into
            # rows that belong to another split.
            hi = min(self.start_pos + last, window_end)
            lo = min(self.start_pos + first, hi)
        # The file is opened per batch, so no handle is kept as object state.
        with h5.File(self.h5_file, 'r') as f:
            X, Y = f[self.x_name], f[self.y_name]
            if self.idxs is not None:
                # Positions are sorted before being used as a fancy index
                # (h5py expects index lists in increasing order).
                batch_idxs = np.sort(self.idxs[first:last]).tolist()
                batch_x = X[batch_idxs]
                batch_y = Y[batch_idxs]
            else:
                batch_x = X[lo:hi]
                batch_y = Y[lo:hi]
        return batch_x, batch_y
def train_test_split(path, batch_size, x_name='X', y_name='Y',
                     test_size=None, train_size=None,
                     random_state=None, shuffle=True):
    """
    Returns H5Generator objects for train/test split.

    Keyword arguments are made to replicate
    sklearn.model_selection.train_test_split arguments behaviour.
    Note, that for only a test or only a train generator without shuffling,
    H5Generator(...) is preferable to train, _ = train_test_split(...),
    due to faster data reading.

    Arguments:
        path - path to h5 file.
        batch_size - batch size.
        x_name - name of X dataset in h5 file, default: "X".
        y_name - name of Y dataset in h5 file, default: "Y".
        test_size - [0, 1] - test split ratio, default: None.
        train_size - [0, 1] - train split ratio, default: None.
            If both test_size and train_size are None, then test_size will
            default to 0.25. If only one is given, the other split receives
            all remaining samples.
        random_state - random state for numpy.random.seed(); None leaves the
            global numpy random state untouched.
        shuffle - shuffle data? Default: True.

    Returns:
        Tuple (train_generator, test_generator).

    Raises:
        ValueError - if a ratio is outside [0, 1] or if
            test_size + train_size > 1.
    """
    for ratio in (test_size, train_size):
        if ratio is not None and not 0. <= ratio <= 1.:
            raise ValueError('test_size and train_size parameters are invalid!')
    if test_size is not None and train_size is not None:
        if test_size + train_size > 1.:
            raise ValueError('test_size and train_size parameters are invalid!')

    # "is not None" so that random_state = 0 is honoured as a valid seed.
    if random_state is not None:
        np.random.seed(random_state)

    with h5.File(path, 'r') as f:
        data_length = f[y_name].shape[0]

    # Turn the ratios into sample counts. The count is always derived from
    # the ratio that was actually supplied: deriving the test count from
    # (1. - train_size) picks up floating point error, e.g. 10 samples with
    # train_size = 0.7 used to give 4 test samples instead of 3.
    if train_size is None:
        ratio = 0.25 if test_size is None else test_size
        n_test = math.ceil(data_length * ratio)
        n_train = data_length - n_test
    elif test_size is None:
        n_train = math.floor(data_length * train_size)
        n_test = data_length - n_train
    else:
        n_test = math.ceil(data_length * test_size)
        # Guard against rounding pushing the two splits past the data end.
        n_train = min(math.floor(data_length * train_size),
                      data_length - n_test)

    if shuffle:
        # Shuffled split: each generator gets its own set of row positions.
        idxs = np.arange(data_length)
        np.random.shuffle(idxs)
        test_idxs = idxs[:n_test]
        train_idxs = idxs[n_test:n_test + n_train]
        X_train = H5Generator(path, batch_size, x_name, y_name, train_idxs)
        X_test = H5Generator(path, batch_size, x_name, y_name, test_idxs)
    else:
        # Ordered split: test takes the leading rows and train the rows that
        # follow, both as contiguous windows.
        X_train = H5Generator(path, batch_size, x_name, y_name,
                              start_pos=n_test, length=n_train)
        X_test = H5Generator(path, batch_size, x_name, y_name,
                             start_pos=0, length=n_test)
    return X_train, X_test