Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update WavPlayer to improved version from PicoFX #17

Merged
merged 1 commit into from
Sep 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 144 additions & 77 deletions lib/pimoroni_yukon/devices/audio.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
# SPDX-FileCopyrightText: 2023 Christopher Parrott for Pimoroni Ltd
# SPDX-FileCopyrightText: 2024 Christopher Parrott for Pimoroni Ltd
#
# SPDX-License-Identifier: MIT

import os
import math
import struct
from machine import I2S
from machine import I2S, Pin

"""
A class for playing Wav files out of an I2S audio amp. It can also play pure tones.
Expand All @@ -14,6 +13,70 @@
"""


class WavReader:
def __init__(self, file):
self.wav_file = open(file, "rb")
self._parse(self.wav_file)

def _parse(self, wav_file):
chunk_ID = wav_file.read(4)
if chunk_ID != b"RIFF":
raise ValueError("WAV chunk ID invalid")
_ = wav_file.read(4) # chunk_size
if wav_file.read(4) != b"WAVE":
raise ValueError("WAV format invalid")
sub_chunk1_ID = wav_file.read(4)
if sub_chunk1_ID != b"fmt ":
raise ValueError("WAV sub chunk 1 ID invalid")
_ = wav_file.read(4) # sub_chunk1_size
_ = struct.unpack("<H", wav_file.read(2))[0] # audio_format
num_channels = struct.unpack("<H", wav_file.read(2))[0]

if num_channels == 1:
self.format = I2S.MONO
else:
self.format = I2S.STEREO

self.sample_rate = struct.unpack("<I", wav_file.read(4))[0]
# if sample_rate != 44_100 and sample_rate != 48_000:
# raise ValueError(f"WAV sample rate of {sample_rate} invalid. Only 44.1KHz or 48KHz audio are supported")

_ = struct.unpack("<I", wav_file.read(4))[0] # byte_rate
_ = struct.unpack("<H", wav_file.read(2))[0] # block_align
self.bits_per_sample = struct.unpack("<H", wav_file.read(2))[0]

# usually the sub chunk2 ID ("data") comes next, but
# some online MP3->WAV converters add
# binary data before "data". So, read a fairly large
# block of bytes and search for "data".

binary_block = wav_file.read(200)
offset = binary_block.find(b"data")
if offset == -1:
raise ValueError("WAV sub chunk 2 ID not found")

self.offset = offset + 44

wav_file.seek(offset + 40)
self.size = struct.unpack("<I", wav_file.read(4))[0]

wav_file.seek(self.offset)

def seek(self, pos):
return self.wav_file.seek(pos + self.offset)

def tell(self):
return self.wav_file.tell() - self.offset

def readinto(self, buf):
max_bytes = self.size - self.tell()
max_bytes = max(0, min(len(buf), max_bytes))
return self.wav_file.readinto(buf[:max_bytes])

def close(self):
self.wav_file.close()


class WavPlayer:
# Internal states
PLAY = 0
Expand All @@ -25,21 +88,32 @@ class WavPlayer:
MODE_WAV = 0
MODE_TONE = 1

TONE_SINE = 0
TONE_SQUARE = 1
TONE_TRIANGLE = 2

# Default buffer length
SILENCE_BUFFER_LENGTH = 1000
WAV_BUFFER_LENGTH = 10000
INTERNAL_BUFFER_LENGTH = 20000
SILENCE_BUFFER_LENGTH = 1024
WAV_BUFFER_LENGTH = 1024
INTERNAL_BUFFER_LENGTH = WAV_BUFFER_LENGTH * 2

TONE_SAMPLE_RATE = 44_100
TONE_BITS_PER_SAMPLE = 16
TONE_FULL_WAVES = 2

def __init__(self, id, sck_pin, ws_pin, sd_pin, ibuf_len=INTERNAL_BUFFER_LENGTH, root="/"):
def __init__(self, id, sck_pin, ws_pin, sd_pin, amp_enable=None, ibuf_len=INTERNAL_BUFFER_LENGTH, root="/"):
self.__id = id
self.__sck_pin = sck_pin
self.__ws_pin = ws_pin
self.__sd_pin = sd_pin
self.__ibuf_len = ibuf_len
self.__enable = None

# Manually tweak the tone amplitude for equal loudness of sine/square/triangle
self.__amplitude_scale = [1.0, 0.2, 0.5]

if amp_enable is not None:
self.__enable = Pin(amp_enable, Pin.OUT)

# Set the directory to search for files in
self.set_root(root)
Expand All @@ -48,7 +122,6 @@ def __init__(self, id, sck_pin, ws_pin, sd_pin, ibuf_len=INTERNAL_BUFFER_LENGTH,
self.__mode = WavPlayer.MODE_WAV
self.__wav_file = None
self.__loop_wav = False
self.__first_sample_offset = None
self.__flush_count = 0
self.__audio_out = None

Expand All @@ -66,44 +139,60 @@ def set_root(self, root):
self.__root = root.rstrip("/") + "/"

def play_wav(self, wav_file, loop=False):
if os.listdir(self.__root).count(wav_file) == 0:
raise ValueError(f"'{wav_file}' not found")

self.__stop_i2s() # Stop any active playback and terminate the I2S instance

self.__wav_file = open(self.__root + wav_file, "rb") # Open the chosen WAV file in read-only, binary mode
try:
self.__wav_file = open(self.__root + wav_file, "rb") # Open the chosen WAV file in read-only, binary mode
except OSError:
raise ValueError(f"'{wav_file}' not found")
self.__loop_wav = loop # Record if the user wants the file to loop
self.__loop_count = 0 # Count loops for debugging purposes

# Parse the WAV file, returning the necessary parameters to initialise I2S communication
format, sample_rate, bits_per_sample, self.__first_sample_offset = WavPlayer.__parse_wav(self.__wav_file)
self.__wav_file = WavReader(self.__root + wav_file)

self.__wav_file.seek(self.__first_sample_offset) # Advance to first byte of sample data

self.__start_i2s(bits=bits_per_sample,
format=format,
rate=sample_rate,
self.__start_i2s(bits=self.__wav_file.bits_per_sample,
format=self.__wav_file.format,
rate=self.__wav_file.sample_rate,
state=WavPlayer.PLAY,
mode=WavPlayer.MODE_WAV)

def play_tone(self, frequency, amplitude):
def play_tone(self, frequency, amplitude, shape=TONE_SINE):
if frequency < 20.0 or frequency > 20_000:
raise ValueError("frequency out of range. Expected between 20Hz and 20KHz")

if amplitude < 0.0 or amplitude > 1.0:
raise ValueError("amplitude out of range. Expected 0.0 to 1.0")

if not isinstance(shape, (list, tuple)):
shape = (shape, )

# Create a buffer containing the pure tone samples
samples_per_cycle = self.TONE_SAMPLE_RATE // frequency
sample_size_in_bytes = self.TONE_BITS_PER_SAMPLE // 8
samples = bytearray(self.TONE_FULL_WAVES * samples_per_cycle * sample_size_in_bytes)
range = pow(2, self.TONE_BITS_PER_SAMPLE) // 2
maximum = (pow(2, self.TONE_BITS_PER_SAMPLE) // 2 - 1) * amplitude

format = "<h" if self.TONE_BITS_PER_SAMPLE == 16 else "<l"

# Populate the buffer with multiple cycles to avoid it completing too quickly and causing drop outs
for i in range(samples_per_cycle * self.TONE_FULL_WAVES):
sample = int((range - 1) * (math.sin(2 * math.pi * i / samples_per_cycle)) * amplitude)
struct.pack_into(format, samples, i * sample_size_in_bytes, sample)
sample = 0
if self.TONE_TRIANGLE in shape:
triangle = (i % samples_per_cycle) - (samples_per_cycle // 2)
triangle /= samples_per_cycle
triangle *= self.__amplitude_scale[self.TONE_TRIANGLE]
sample += triangle
if self.TONE_SINE in shape:
sine = math.sin(2 * math.pi * i / samples_per_cycle)
sine *= self.__amplitude_scale[self.TONE_SINE]
sample += sine
if self.TONE_SQUARE in shape:
square = 1 if (i % samples_per_cycle) < (samples_per_cycle // 2) else -1
square *= self.__amplitude_scale[self.TONE_SQUARE]
sample += square
sample = max(-1, min(1, sample))
struct.pack_into(format, samples, i * sample_size_in_bytes, int(sample * maximum))

# Are we not already playing tones?
if not (self.__mode == WavPlayer.MODE_TONE and (self.__state == WavPlayer.PLAY or self.__state == WavPlayer.PAUSE)):
Expand Down Expand Up @@ -137,6 +226,9 @@ def stop(self):
else:
self.__state = WavPlayer.STOP

def deinit(self):
self.__stop_i2s()

def is_playing(self):
return self.__state != WavPlayer.NONE and self.__state != WavPlayer.STOP

Expand Down Expand Up @@ -164,45 +256,61 @@ def __start_i2s(self, bits=16, format=I2S.MONO, rate=44_100, state=STOP, mode=MO
self.__audio_out.irq(self.__i2s_callback)
self.__audio_out.write(self.__silence_samples)

if self.__enable is not None:
self.__enable.on()

def __stop_i2s(self):
self.stop() # Stop any active playback
while self.is_playing(): # and wait for it to complete
pass

if self.__enable is not None:
self.__enable.off()

if self.__audio_out is not None:
self.__audio_out.deinit() # Deinit any active I2S comms

self.__state == WavPlayer.NONE # Return to the none state

def __i2s_callback(self, arg):
### PLAY ###
# PLAY
if self.__state == WavPlayer.PLAY:
if self.__mode == WavPlayer.MODE_WAV:
num_read = self.__wav_file.readinto(self.__wav_samples_mv) # Read the next section of the WAV file

# Have we reached the end of the file?
if num_read == 0:
# Do we want to loop the WAV playback?
if self.__loop_wav:
_ = self.__wav_file.seek(self.__first_sample_offset) # Play again, so advance to first byte of sample data
if self.__loop_wav: # Looped playback
loop_read = 0
while loop_read < self.WAV_BUFFER_LENGTH:
num_read = self.__wav_file.readinto(self.__wav_samples_mv[loop_read:]) # Read the next section of the WAV file
loop_read += num_read
if num_read == 0:
_ = self.__wav_file.seek(0) # Play again, so advance to first byte of sample data
self.__loop_count += 1
self.__audio_out.write(self.__wav_samples_mv)

else: # Single shot playback
num_read = self.__wav_file.readinto(self.__wav_samples_mv)

if num_read:
self.__audio_out.write(self.__wav_samples_mv[: num_read]) # We are within the file, so write out the next audio samples
else:
self.__audio_out.write(self.__silence_samples) # Play silence to end this callback

# Have we reached the end of the file? (num_read is either 0 or a short read)
if num_read < self.WAV_BUFFER_LENGTH:
# Do we want to loop the WAV playback?
self.__wav_file.close() # Stop playing, so close the file
self.__state = WavPlayer.FLUSH # and enter the flush state on the next callback

self.__audio_out.write(self.__silence_samples) # In both cases play silence to end this callback
else:
self.__audio_out.write(self.__wav_samples_mv[: num_read]) # We are within the file, so write out the next audio samples
else:
if self.__queued_samples is not None:
self.__tone_samples = self.__queued_samples
self.__queued_samples = None
self.__audio_out.write(self.__tone_samples)

### PAUSE or STOP ###
# PAUSE or STOP
elif self.__state == WavPlayer.PAUSE or self.__state == WavPlayer.STOP:
self.__audio_out.write(self.__silence_samples) # Play silence

### FLUSH ###
# FLUSH
elif self.__state == WavPlayer.FLUSH:
# Flush is used to allow the residual audio samples in the internal buffer to be written
# to the I2S peripheral. This step avoids part of the sound file from being cut off
Expand All @@ -212,47 +320,6 @@ def __i2s_callback(self, arg):
self.__state = WavPlayer.STOP # Enter the stop state on the next callback
self.__audio_out.write(self.__silence_samples) # Play silence

### NONE ###
# NONE
elif self.__state == WavPlayer.NONE:
pass

@staticmethod
def __parse_wav(wav_file):
chunk_ID = wav_file.read(4)
if chunk_ID != b"RIFF":
raise ValueError("WAV chunk ID invalid")
_ = wav_file.read(4) # chunk_size
format = wav_file.read(4)
if format != b"WAVE":
raise ValueError("WAV format invalid")
sub_chunk1_ID = wav_file.read(4)
if sub_chunk1_ID != b"fmt ":
raise ValueError("WAV sub chunk 1 ID invalid")
_ = wav_file.read(4) # sub_chunk1_size
_ = struct.unpack("<H", wav_file.read(2))[0] # audio_format
num_channels = struct.unpack("<H", wav_file.read(2))[0]

if num_channels == 1:
format = I2S.MONO
else:
format = I2S.STEREO

sample_rate = struct.unpack("<I", wav_file.read(4))[0]
if sample_rate != 44_100 and sample_rate != 48_000:
raise ValueError(f"WAV sample rate of {sample_rate} invalid. Only 44.1KHz or 48KHz audio are supported")

_ = struct.unpack("<I", wav_file.read(4))[0] # byte_rate
_ = struct.unpack("<H", wav_file.read(2))[0] # block_align
bits_per_sample = struct.unpack("<H", wav_file.read(2))[0]

# usually the sub chunk2 ID ("data") comes next, but
# some online MP3->WAV converters add
# binary data before "data". So, read a fairly large
# block of bytes and search for "data".

binary_block = wav_file.read(200)
offset = binary_block.find(b"data")
if offset == -1:
raise ValueError("WAV sub chunk 2 ID not found")

return (format, sample_rate, bits_per_sample, 44 + offset)