-
Notifications
You must be signed in to change notification settings - Fork 150
/
play_long_file.py
executable file
·109 lines (95 loc) · 3.52 KB
/
play_long_file.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#!/usr/bin/env python3
"""Play an audio file using a limited amount of memory.
The soundfile module (https://PySoundFile.readthedocs.io/) must be
installed for this to work. NumPy is not needed.
In contrast to play_file.py, which loads the whole file into memory
before starting playback, this example program only holds a given number
of audio blocks in memory and is therefore able to play files that are
larger than the available RAM.
A similar example could of course be implemented using NumPy,
but this example shows what can be done when NumPy is not available.
"""
import argparse
import queue
import sys
import threading
import sounddevice as sd
import soundfile as sf
def int_or_str(text):
    """Return *text* converted to ``int`` if possible, else *text* itself.

    Serves as an argparse ``type=`` converter so that an option can be
    given either as a numeric ID or as a plain string.
    """
    try:
        value = int(text)
    except ValueError:
        # Not a number: hand the original string back unchanged.
        value = text
    return value
# Stage 1: a minimal parser that only understands --list-devices, so the
# device list can be printed without supplying the FILENAME argument.
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
    '-l', '--list-devices', action='store_true',
    help='show list of audio devices and exit')
args, remaining = parser.parse_known_args()
if args.list_devices:
    print(sd.query_devices())
    parser.exit(0)

# Stage 2: the full parser. It inherits --list-devices from the first
# parser and parses whatever stage 1 did not consume.
parser = argparse.ArgumentParser(
    description=__doc__,
    formatter_class=argparse.RawDescriptionHelpFormatter,
    parents=[parser])
parser.add_argument(
    'filename', metavar='FILENAME',
    help='audio file to be played back')
parser.add_argument(
    '-d', '--device', type=int_or_str,
    help='output device (numeric ID or substring)')
parser.add_argument(
    '-b', '--blocksize', type=int, default=2048,
    help='block size (default: %(default)s)')
parser.add_argument(
    '-q', '--buffersize', type=int, default=20,
    help='number of blocks used for buffering (default: %(default)s)')
args = parser.parse_args(remaining)

# The callback asserts that every request is exactly `blocksize` frames, so
# a fixed, positive block size is required.
if args.blocksize == 0:
    parser.error('blocksize must not be zero')
if args.blocksize < 0:
    # soundfile treats a negative frame count as "read the rest of the
    # file", which would load the whole file into memory at once.
    parser.error('blocksize must not be negative')
if args.buffersize < 1:
    parser.error('buffersize must be at least 1')

# Bounded hand-off between the file-reading code and the audio callback;
# at most `buffersize` blocks are held in memory at any time.
q = queue.Queue(maxsize=args.buffersize)
# Set via the stream's finished_callback once playback has ended.
event = threading.Event()
def callback(outdata, frames, time, status):
    """Stream callback: copy the next queued block into *outdata*.

    outdata -- writable output buffer that must be filled completely
    frames  -- number of frames requested; must equal ``args.blocksize``
    time    -- unused
    status  -- status flags; any set flag is treated as an error

    Raises ``sd.CallbackAbort`` on an output underflow or when the queue
    is empty, and ``sd.CallbackStop`` after writing a block that is
    shorter than *outdata* (i.e. the end of the data).
    """
    assert frames == args.blocksize
    if status.output_underflow:
        print('Output underflow: increase blocksize?', file=sys.stderr)
        raise sd.CallbackAbort
    assert not status

    try:
        block = q.get_nowait()
    except queue.Empty as exc:
        print('Buffer is empty: increase buffersize?', file=sys.stderr)
        raise sd.CallbackAbort from exc

    available = len(block)
    wanted = len(outdata)
    if available >= wanted:
        # Regular case: a full block, playback continues.
        outdata[:] = block
        return

    # Short (possibly empty) block: pad the remainder with zero bytes and
    # tell the stream to stop.
    outdata[:available] = block
    outdata[available:] = bytes(wanted - available)
    raise sd.CallbackStop
# Main playback routine: pre-fill the queue, start the output stream, then
# keep feeding blocks from the file until it is exhausted.
try:
    with sf.SoundFile(args.filename) as f:
        # Pre-fill the queue with up to `buffersize` blocks.
        for _ in range(args.buffersize):
            data = f.buffer_read(args.blocksize, dtype='float32')
            # Queue the block even when it is empty: the callback relies on
            # receiving a short/empty block to raise CallbackStop.  Without
            # it, a file that ends during pre-fill on an exact multiple of
            # blocksize (or an empty file) would make the callback find an
            # empty queue and abort with a misleading "Buffer is empty"
            # message.
            q.put_nowait(data)
            if not data:
                break  # End of file reached during pre-fill
        stream = sd.RawOutputStream(
            samplerate=f.samplerate, blocksize=args.blocksize,
            device=args.device, channels=f.channels, dtype='float32',
            callback=callback, finished_callback=event.set)
        with stream:
            # Duration (in seconds) of a completely filled queue.  If no
            # slot frees up within that time, the callback has stopped
            # consuming blocks.
            timeout = args.blocksize * args.buffersize / f.samplerate
            while data:
                data = f.buffer_read(args.blocksize, dtype='float32')
                # The final, empty read is queued as well; it acts as the
                # end-of-data marker for the callback.
                q.put(data, timeout=timeout)
            event.wait()  # Wait until playback is finished
except KeyboardInterrupt:
    # The text is passed as the exit status; a string status is printed to
    # stderr by sys.exit() and results in exit code 1.
    parser.exit('\nInterrupted by user')
except queue.Full:
    # A timeout occurred, i.e. there was an error in the callback
    parser.exit(1)
except Exception as e:
    parser.exit(type(e).__name__ + ': ' + str(e))