Skip to content

Commit

Permalink
[audio] More capabilities for AudioSink using the AudioServlet
Browse files Browse the repository at this point in the history
AudioServlet can now serve all types of AudioStream multiple times by buffering the data in memory or in a temporary file.
Adding method to ease disposal of temporary file after playing a sound
Adding an identifier to the audio stream for further development (allows an audio sink to cache computation data)

Signed-off-by: Gwendal Roulleau <[email protected]>
  • Loading branch information
dalgwen committed Mar 17, 2023
1 parent a32f1e0 commit bfa283c
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ public interface AudioHTTPServer {
/**
* Creates a relative url for a given {@link AudioStream} where it can be requested multiple times within the given
* time frame.
* This method only accepts {@link FixedLengthAudioStream}s, since it needs to be able to create multiple concurrent
* streams from it, which isn't possible with a regular {@link AudioStream}.
* This method accepts all {@link AudioStream}s, but it is better to use {@link FixedLengthAudioStream}s since it
* needs to be able to create multiple concurrent streams from it.
* If generic {@link AudioStream} is used, we try to keep this capability by storing it in a small memory buffer,
* e.g {@link ByteArrayAudioStream}, or in a cached file if the stream is too long.
* Streams are closed, once they expire.
*
* @param stream the stream to serve on HTTP
* @param seconds number of seconds for which the stream is available through HTTP
* @return the relative URL to access the stream starting with a '/'
*/
String serve(FixedLengthAudioStream stream, int seconds);
String serve(AudioStream stream, int seconds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public interface AudioSink {
*
* In case the audioStream is null, this should be interpreted as a request to end any currently playing stream.
*
* If you call this method and if the sink is synchronous, you should thereafter get rid of a stream implementing
* the {@link FileAudioStream.Disposable} interface by calling the dispose method.
*
* @param audioStream the audio stream to play or null to keep quiet
* @throws UnsupportedAudioFormatException If audioStream format is not supported
* @throws UnsupportedAudioStreamException If audioStream is not supported
Expand Down Expand Up @@ -94,4 +97,17 @@ void process(@Nullable AudioStream audioStream)
* @throws IOException if the volume can not be set
*/
void setVolume(PercentType volume) throws IOException;

/**
* Tells whether the sink is synchronous.
* If true, the caller may dispose of the stream immediately after the process method has returned.
* On the contrary, if the process method of the sink returns before the input stream is entirely consumed,
* then the sink should override this method and return false.
* Please note that by doing so, the sink then has to take care itself of a stream implementing the
* {@link FileAudioStream.Disposable} interface, by calling its dispose method once it has finished
* reading it.
*
* @return true if the caller may dispose of the stream as soon as the process method has returned; this default
* implementation always returns true
*/
default boolean isSynchronous() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.io.InputStream;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;

/**
* Wrapper for a source of audio data.
Expand All @@ -37,4 +38,14 @@ public abstract class AudioStream extends InputStream {
* @return The supported audio format
*/
public abstract AudioFormat getFormat();

/**
* Returns an identifier for this stream.
* Useful for sinks playing the same stream multiple times,
* to avoid repeating computation that has already been done (like reencoding).
*
* @return A string uniquely identifying the stream; this default implementation returns null
*/
public @Nullable String getId() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,13 @@ public synchronized void reset() throws IOException {
public InputStream getClonedStream() throws AudioException {
return getInputStream(file);
}

/**
* Implement this interface if you want the stream consumer to make a callback when the audio stream is not needed
* anymore. You should then delete your file in the {@link #dispose()} method.
*/
@FunctionalInterface
public interface Disposable {
/**
* Called back by the stream consumer once the audio stream is not needed anymore.
*
* @throws IOException if disposing of the stream fails
*/
void dispose() throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.openhab.core.audio.AudioSource;
import org.openhab.core.audio.AudioStream;
import org.openhab.core.audio.FileAudioStream;
import org.openhab.core.audio.FileAudioStream.Disposable;
import org.openhab.core.audio.URLAudioStream;
import org.openhab.core.audio.UnsupportedAudioFormatException;
import org.openhab.core.audio.UnsupportedAudioStreamException;
Expand Down Expand Up @@ -142,8 +143,16 @@ public void play(@Nullable AudioStream audioStream, @Nullable String sinkId, @Nu
}
try {
sink.process(audioStream);
// if the stream is not needed anymore, then we should call back the AudioStream to let it a chance
// to auto dispose:
if (sink.isSynchronous() && audioStream instanceof Disposable disposableAudioStream) {
disposableAudioStream.dispose();
}
} catch (UnsupportedAudioFormatException | UnsupportedAudioStreamException e) {
logger.warn("Error playing '{}': {}", audioStream, e.getMessage(), e);
} catch (IOException e) {
String fileName = audioStream instanceof FileAudioStream file ? file.toString() : "unknown";
logger.warn("Cannot dispose of audio stream {}", fileName, e);
} finally {
if (volume != null && oldVolume != null) {
// restore volume only if it was set before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
*/
package org.openhab.core.audio.internal;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -22,6 +26,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -31,12 +36,16 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.audio.AudioException;
import org.openhab.core.audio.AudioFormat;
import org.openhab.core.audio.AudioHTTPServer;
import org.openhab.core.audio.AudioStream;
import org.openhab.core.audio.ByteArrayAudioStream;
import org.openhab.core.audio.FileAudioStream;
import org.openhab.core.audio.FileAudioStream.Disposable;
import org.openhab.core.audio.FixedLengthAudioStream;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
Expand All @@ -60,12 +69,16 @@ public class AudioServlet extends HttpServlet implements AudioHTTPServer {

private static final List<String> WAV_MIME_TYPES = List.of("audio/wav", "audio/x-wav", "audio/vnd.wave");

// this one MB buffer will help playing multiple times an AudioStream, if the sink cannot do otherwise
private static final int ONETIME_STREAM_BUFFER_MAX_SIZE = 1048576;

static final String SERVLET_PATH = "/audio";

private final Logger logger = LoggerFactory.getLogger(AudioServlet.class);

private final Map<String, AudioStream> oneTimeStreams = new ConcurrentHashMap<>();
private final Map<String, FixedLengthAudioStream> multiTimeStreams = new ConcurrentHashMap<>();
private final Map<String, @NonNull AtomicInteger> currentlyServedResponseByStreamId = new ConcurrentHashMap<>();

private final Map<String, Long> streamTimeouts = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -160,12 +173,18 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws Se
.map(String::trim).collect(Collectors.toList());

try (final InputStream stream = prepareInputStream(streamId, resp, acceptedMimeTypes)) {
if (stream == null) {
logger.debug("Received request for invalid stream id at {}", requestURI);
resp.sendError(HttpServletResponse.SC_NOT_FOUND);
} else {
stream.transferTo(resp.getOutputStream());
resp.flushBuffer();
try {
currentlyServedResponseByStreamId.computeIfAbsent(streamId, (String) -> new AtomicInteger())
.incrementAndGet();
if (stream == null) {
logger.debug("Received request for invalid stream id at {}", requestURI);
resp.sendError(HttpServletResponse.SC_NOT_FOUND);
} else {
stream.transferTo(resp.getOutputStream());
resp.flushBuffer();
}
} finally {
currentlyServedResponseByStreamId.get(streamId).decrementAndGet();
}
} catch (final AudioException ex) {
resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, ex.getMessage());
Expand All @@ -179,10 +198,24 @@ private synchronized void removeTimedOutStreams() {
.map(Entry::getKey).collect(Collectors.toList());

toRemove.forEach(streamId -> {
// the stream has expired, we need to remove it!
final FixedLengthAudioStream stream = multiTimeStreams.remove(streamId);
streamTimeouts.remove(streamId);
tryClose(stream);
// the stream has expired, if no one is using it we need to remove it
Integer numberOfStreamServed = currentlyServedResponseByStreamId.getOrDefault(streamId, new AtomicInteger())
.decrementAndGet();
if (numberOfStreamServed <= 0) {
final FixedLengthAudioStream stream = multiTimeStreams.remove(streamId);
streamTimeouts.remove(streamId);
tryClose(stream);
if (stream instanceof Disposable disposableStream) {
try {
disposableStream.dispose();
} catch (IOException e) {
String fileName = disposableStream instanceof FileAudioStream file ? file.toString()
: "unknown";
logger.warn("Cannot delete temporary file {} for the AudioServlet", fileName, e);
}
}
}

logger.debug("Removed timed out stream {}", streamId);
});
}
Expand All @@ -195,13 +228,42 @@ public String serve(AudioStream stream) {
}

@Override
public String serve(FixedLengthAudioStream stream, int seconds) {
public String serve(AudioStream stream, int seconds) {
String streamId = UUID.randomUUID().toString();
multiTimeStreams.put(streamId, stream);
if (stream instanceof FixedLengthAudioStream fixedLengthAudioStream) {
multiTimeStreams.put(streamId, fixedLengthAudioStream);
} else { // we prefer a FixedLengthAudioStream, but we can try to make one
try {
multiTimeStreams.put(streamId, createFixedLengthInputStream(stream, streamId));
} catch (AudioException | IOException e) {
logger.warn("Cannot precache the audio stream to serve it", e);
}
}
streamTimeouts.put(streamId, System.nanoTime() + TimeUnit.SECONDS.toNanos(seconds));
return getRelativeURL(streamId);
}

/**
 * Converts an arbitrary {@link AudioStream} into a {@link FixedLengthAudioStream}, so that it can be served
 * several times.
 * <p>
 * Streams of at most {@code ONETIME_STREAM_BUFFER_MAX_SIZE} bytes are kept in a memory buffer; longer streams are
 * copied into a temporary file. The source stream is handed to {@code tryClose} in every case, whether this method
 * succeeds or fails, and the temporary file is removed again if the stream could not be cached completely.
 *
 * @param stream the source stream to cache
 * @param streamId the id of the stream, used as prefix for the name of the temporary file
 * @return a stream holding the same data, backed by memory or by a temporary file
 * @throws IOException if the source stream cannot be read or the temporary file cannot be written
 * @throws AudioException if the temporary file cannot be wrapped in an audio stream
 */
private FixedLengthAudioStream createFixedLengthInputStream(AudioStream stream, String streamId)
        throws IOException, AudioException {
    try {
        // read one byte more than the limit, to detect streams that do not fit in the memory buffer
        byte[] dataBytes = stream.readNBytes(ONETIME_STREAM_BUFFER_MAX_SIZE + 1);
        if (dataBytes.length <= ONETIME_STREAM_BUFFER_MAX_SIZE) {
            // we will use an in memory buffer to avoid disk operation
            return new ByteArrayAudioStream(dataBytes, stream.getFormat());
        }

        // in memory max size exceeded, sound is too long, we will use a file
        File tempFile = File.createTempFile(streamId, ".snd");
        tempFile.deleteOnExit();
        boolean cached = false;
        try {
            try (OutputStream outputStream = new FileOutputStream(tempFile)) {
                // first the part already read, then the remainder of the source stream
                outputStream.write(dataBytes);
                stream.transferTo(outputStream);
            }
            FixedLengthAudioStream fixedLengthAudioStreamResult = new TemporaryFileAudioStream(tempFile,
                    stream.getFormat());
            cached = true;
            return fixedLengthAudioStreamResult;
        } finally {
            // do not leave a partially written file behind until the JVM exits
            if (!cached && !tempFile.delete()) {
                logger.warn("Cannot delete temporary file {} for the AudioServlet", tempFile);
            }
        }
    } finally {
        // the source stream is consumed (or broken) at this point, so release it in every case
        tryClose(stream);
    }
}

/**
* Gives read-only access to the streams registered for being served multiple times.
*
* @return an unmodifiable view of the multi-time streams, keyed by stream id
*/
Map<String, FixedLengthAudioStream> getMultiTimeStreams() {
return Collections.unmodifiableMap(multiTimeStreams);
}
Expand All @@ -213,4 +275,19 @@ Map<String, AudioStream> getOneTimeStreams() {
/**
* Builds the relative URL under which the stream with the given id can be requested.
*
* @param streamId the id of the stream
* @return the servlet path, followed by a '/' and the stream id
*/
private String getRelativeURL(String streamId) {
return SERVLET_PATH + "/" + streamId;
}

/**
 * A {@link FileAudioStream} reading from a temporary file, which deletes that file when it is disposed.
 */
private static class TemporaryFileAudioStream extends FileAudioStream implements FileAudioStream.Disposable {

    // the backing file, kept so that it can be deleted on disposal
    private final File file;

    /**
     * Creates an audio stream reading from the given temporary file.
     *
     * @param file the temporary file holding the audio data
     * @param format the format of the audio data
     * @throws AudioException if the parent {@link FileAudioStream} cannot be created for this file
     */
    public TemporaryFileAudioStream(File file, AudioFormat format) throws AudioException {
        super(file, format);
        this.file = file;
    }

    /**
     * Deletes the backing temporary file. Calling this method when the file is already gone is not an error.
     *
     * @throws IOException if the file exists but cannot be deleted
     */
    @Override
    public void dispose() throws IOException {
        Files.deleteIfExists(file.toPath());
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.openhab.core.audio.AudioHTTPServer;
import org.openhab.core.audio.AudioSink;
import org.openhab.core.audio.AudioStream;
import org.openhab.core.audio.FixedLengthAudioStream;
import org.openhab.core.audio.URLAudioStream;
import org.openhab.core.audio.UnsupportedAudioFormatException;
import org.openhab.core.audio.UnsupportedAudioStreamException;
Expand All @@ -49,8 +48,7 @@ public class WebAudioAudioSink implements AudioSink {
private final Logger logger = LoggerFactory.getLogger(WebAudioAudioSink.class);

private static final Set<AudioFormat> SUPPORTED_AUDIO_FORMATS = Set.of(AudioFormat.MP3, AudioFormat.WAV);
private static final Set<Class<? extends AudioStream>> SUPPORTED_AUDIO_STREAMS = Set
.of(FixedLengthAudioStream.class, URLAudioStream.class);
private static final Set<Class<? extends AudioStream>> SUPPORTED_AUDIO_STREAMS = Set.of(AudioStream.class);

private AudioHTTPServer audioHTTPServer;
private EventPublisher eventPublisher;
Expand All @@ -76,14 +74,9 @@ public void process(@Nullable AudioStream audioStream)
// it is an external URL, so we can directly pass this on.
URLAudioStream urlAudioStream = (URLAudioStream) audioStream;
sendEvent(urlAudioStream.getURL());
} else if (audioStream instanceof FixedLengthAudioStream) {
// we need to serve it for a while and make it available to multiple clients, hence only
// FixedLengthAudioStreams are supported.
sendEvent(audioHTTPServer.serve((FixedLengthAudioStream) audioStream, 10).toString());
} else {
throw new UnsupportedAudioStreamException(
"Web audio sink can only handle FixedLengthAudioStreams and URLAudioStreams.",
audioStream.getClass());
// we need to serve it for a while and make it available to multiple clients
sendEvent(audioHTTPServer.serve(audioStream, 10).toString());
}
} catch (IOException e) {
logger.debug("Error while closing the audio stream: {}", e.getMessage(), e);
Expand Down Expand Up @@ -124,4 +117,9 @@ public PercentType getVolume() throws IOException {
public void setVolume(final PercentType volume) throws IOException {
throw new IOException("Web Audio sink does not support volume level changes.");
}

@Override
public boolean isSynchronous() {
// The process method only hands the stream over to the AudioHTTPServer, which serves it over HTTP for a while;
// the stream may therefore still be needed after process has returned, so callers must not dispose of it.
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.openhab.core.audio.AudioException;
import org.openhab.core.audio.AudioFormat;
import org.openhab.core.audio.AudioStream;
import org.openhab.core.audio.FileAudioStream;
import org.openhab.core.audio.FileAudioStream.Disposable;
import org.openhab.core.audio.UnsupportedAudioFormatException;
import org.openhab.core.audio.UnsupportedAudioStreamException;
import org.openhab.core.audio.utils.ToneSynthesizer;
Expand Down Expand Up @@ -147,7 +149,7 @@ private void initToneSynthesizer(@Nullable String listeningMelodyText) {

/**
* Starts a persistent dialog
*
*
* @throws IllegalStateException if keyword spot service is misconfigured
*/
public void start() throws IllegalStateException {
Expand Down Expand Up @@ -383,12 +385,25 @@ protected void say(@Nullable String text) {
if (dialogContext.sink().getSupportedStreams().stream().anyMatch(clazz -> clazz.isInstance(audioStream))) {
try {
dialogContext.sink().process(audioStream);
// if the stream is not needed anymore, then we should call back the AudioStream to let it a chance
// to auto dispose:
if (dialogContext.sink().isSynchronous()
&& audioStream instanceof Disposable disposableAudioStream) {
disposableAudioStream.dispose();
}
} catch (UnsupportedAudioFormatException | UnsupportedAudioStreamException e) {
if (logger.isDebugEnabled()) {
logger.debug("Error saying '{}': {}", text, e.getMessage(), e);
} else {
logger.warn("Error saying '{}': {}", text, e.getMessage());
}
} catch (IOException e) {
String fileName = audioStream instanceof FileAudioStream file ? file.toString() : "unknown";
if (logger.isDebugEnabled()) {
logger.debug("Cannot dispose of stream {}", fileName, e);
} else {
logger.warn("Cannot dispose of stream {}", fileName);
}
}
} else {
logger.warn("Failed playing audio stream '{}' as audio doesn't support it.", audioStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.openhab.core.audio.AudioSink;
import org.openhab.core.audio.AudioSource;
import org.openhab.core.audio.AudioStream;
import org.openhab.core.audio.FileAudioStream;
import org.openhab.core.audio.FileAudioStream.Disposable;
import org.openhab.core.audio.UnsupportedAudioFormatException;
import org.openhab.core.audio.UnsupportedAudioStreamException;
import org.openhab.core.common.ThreadPoolManager;
Expand Down Expand Up @@ -293,6 +295,14 @@ public void say(String text, @Nullable String voiceId, @Nullable String sinkId,
}
try {
sink.process(audioStream);
// if the stream is not needed anymore, then we should call back the AudioStream to let it a chance
// to auto dispose:
if (sink.isSynchronous() && audioStream instanceof Disposable disposableAudioStream) {
disposableAudioStream.dispose();
}
} catch (IOException e) {
String fileName = audioStream instanceof FileAudioStream file ? file.toString() : "unknown";
logger.warn("Cannot dispose of stream {}", fileName, e);
} finally {
if (volume != null && oldVolume != null) {
// restore volume only if it was set before
Expand Down
Loading

0 comments on commit bfa283c

Please sign in to comment.