-
Notifications
You must be signed in to change notification settings - Fork 9
/
harvest_frq.py
119 lines (105 loc) · 3.95 KB
/
harvest_frq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import logging
logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO, datefmt='%x %a %X')
if __name__ == '__main__':
logging.info('Importing packages')
import numpy as np
import soundfile as sf
import pyworld as world
import sys
import time
import struct
import os
import traceback
from multiprocessing import freeze_support
import concurrent.futures
from argparse import ArgumentParser
# Module-wide F0 bounds, copied from pyworld's default floor/ceiling values.
# base_frq() falls back to these when its f0_min / f0_max arguments are None.
f0_floor = world.default_f0_floor
f0_ceil = world.default_f0_ceil
def base_frq(f0, f0_min=None, f0_max=None):
    """Return a stability-weighted average of the in-range values of ``f0``.

    Every frame whose value lies within ``[f0_min, f0_max]`` contributes to
    the average with weight ``2 ** (-q * q)``, where ``q`` is the local slope
    of the contour (one-sided difference at the two ends, central difference
    elsewhere). Flat stretches therefore dominate and fast pitch movement is
    nearly ignored.

    Args:
        f0: Indexable sequence of F0 values (list or 1-D array).
        f0_min: Lowest value counted; ``None`` uses the module ``f0_floor``.
        f0_max: Highest value counted; ``None`` uses the module ``f0_ceil``.

    Returns:
        The weighted average, or ``0`` when no frame is inside the range.
    """
    if f0_min is None:
        f0_min = f0_floor
    if f0_max is None:
        f0_max = f0_ceil

    n = len(f0)
    last = n - 1
    weighted_sum = 0
    tally = 0

    for i in range(n):
        value = f0[i]
        if not (f0_min <= value <= f0_max):
            continue

        if n == 1:
            # A lone frame has no neighbour to difference against; treating
            # it as flat avoids indexing f0[1] on a one-element contour.
            slope = 0
        elif i == 0:
            slope = f0[1] - value
        elif i == last:
            slope = value - f0[i - 1]
        else:
            slope = (f0[i + 1] - f0[i - 1]) / 2

        weight = 2 ** (-slope * slope)
        weighted_sum += value * weight
        tally += weight

    if tally > 0:
        return weighted_sum / tally
    return weighted_sum
def frq_gen(floc, f0_max=880, hop=256):
    """Analyse one audio file and write ``<name>_wav.frq`` next to it.

    F0 is estimated with WORLD's Harvest, a per-frame amplitude is derived
    from the CheapTrick spectral envelope, and both are stored in a binary
    file laid out as:

        8 bytes   magic ``FREQ0003``
        int32     ``hop`` (samples per frame)
        float64   base frequency from ``base_frq``
        16 bytes  zero padding
        int32     number of frames
        per frame: float64 F0, float64 amplitude

    All fields are written little-endian explicitly instead of relying on
    the byte order of the machine running the script.

    Args:
        floc: Path of the audio file to read.
        f0_max: F0 ceiling handed to Harvest and to ``base_frq``.
        hop: Frame hop in samples; converted to a frame period in ms.
    """
    started = time.perf_counter()
    fname, _ = os.path.splitext(floc)
    basename = os.path.basename(fname)

    logging.info(f'Making {basename}_wav.frq 1/4')
    x, fs = sf.read(floc)
    if x.ndim > 1:
        # soundfile returns (frames, channels) for multi-channel audio.
        # NOTE(review): assumes pyworld only accepts a 1-D signal, so the
        # channels are averaged to mono -- confirm against pyworld.
        x = np.ascontiguousarray(x.mean(axis=1))

    logging.info(f'Making {basename}_wav.frq 2/4')
    frame_period = 1000 * hop / fs
    f0, time_axis = world.harvest(x, fs, f0_ceil=f0_max, frame_period=frame_period)
    base_f0 = base_frq(f0, f0_max=f0_max)

    logging.info(f'Making {basename}_wav.frq 3/4')
    sp = world.cheaptrick(x, f0, time_axis, fs)
    amp = np.mean(np.sqrt(sp), axis=1)

    logging.info(f'Making {basename}_wav.frq 4/4')
    frame_count = f0.shape[0]
    # '<' fixes little-endian order and disables padding, so the header is
    # exactly 8 + 4 + 8 + 16 + 4 = 40 bytes.
    header = struct.pack('<8sid16xi', b'FREQ0003', hop, base_f0, frame_count)
    # Interleave (f0, amp) per frame and emit them in a single write rather
    # than one tiny write per frame.
    frames = np.column_stack((f0, amp)).astype('<f8')
    with open(fname + '_wav.frq', 'wb') as f:
        f.write(header)
        f.write(frames.tobytes())

    finished = time.perf_counter()
    logging.info(f'{basename}_wav.frq finished at {finished - started:.3f} seconds.')
def process_directory(args):
    """Generate a .frq file for every .wav file found under ``args.path``.

    The directory tree is walked recursively. Files are processed one after
    another when ``args.num_threads`` is 1, otherwise in a process pool.

    Args:
        args: Object with a ``path`` attribute (directory to scan) and a
            ``num_threads`` attribute (worker count; 0 means one worker per
            available CPU, negative values are treated as 0).
    """
    logging.info('Received directory. Listing files')
    # Match the extension case-insensitively so files named ``.WAV`` are
    # picked up as well.
    samples = [
        os.path.join(root, file)
        for root, _dirs, files in os.walk(args.path)
        for file in files
        if file.lower().endswith('.wav')
    ]
    logging.info(f'Listed {len(samples)} file{"s" if len(samples) != 1 else ""}')

    num_threads = max(0, args.num_threads)
    if num_threads == 1:
        logging.info('Running single threaded')
        t0 = time.perf_counter()
        for sample in samples:
            frq_gen(sample)
        t = time.perf_counter()
    else:
        # os.cpu_count must be *called*; it may return None when the count
        # cannot be determined, in which case fall back to a single worker.
        workers = (os.cpu_count() or 1) if num_threads == 0 else num_threads
        logging.info(f'Starting process pool with {workers} threads.')
        t0 = time.perf_counter()
        with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
            pending = {executor.submit(frq_gen, sample): sample for sample in samples}
            # Collect every result so a failure inside a worker is reported
            # instead of vanishing; the remaining files keep processing.
            for future in concurrent.futures.as_completed(pending):
                error = future.exception()
                if error is not None:
                    logging.error(f'Failed to process {pending[future]}: {error!r}')
        t = time.perf_counter()
    logging.info(f'Whole operation took {t - t0:.3f} seconds.')
if __name__ == '__main__':
    # Script entry point: parse the command line, then process either a
    # single file or a whole directory. The console is paused afterwards
    # (and after printing a traceback on failure) before the script exits.
    freeze_support()
    try:
        cli = ArgumentParser(
            description="Generate .frq files using WORLD's Harvest F0 estimation algorithm."
        )
        cli.add_argument(
            'path',
            help='The path to a .wav file or a directory with .wav files.',
        )
        cli.add_argument(
            '--num-threads',
            '-n',
            type=int,
            default=1,
            help='How many threads to use. Default is running single threaded. Input zero to use all available threads.',
        )
        # Unrecognised arguments are tolerated and discarded.
        args, _ = cli.parse_known_args()

        if not os.path.isfile(args.path):
            process_directory(args)
        else:
            logging.info('Received file')
            frq_gen(args.path)
        os.system('pause')
    except Exception as e:
        # Print the full traceback to stdout, then pause as on success.
        report = ''.join(traceback.format_exception(e.__class__, e, e.__traceback__))
        print(report, end='')
        os.system('pause')