From bfa283c5ce352c6d34eeba9c9ab0af1fcf747230 Mon Sep 17 00:00:00 2001 From: Gwendal Roulleau Date: Sat, 11 Mar 2023 03:23:04 +0100 Subject: [PATCH] [audio] More capabilities for AudioSink using the AudioServlet AudioServlet can now serve all type of AudioStream multiple times by buffering data in memory or in temporary file. Adding method to ease disposal of temporary file after playing a sound Adding an identifyier to audio stream for further development (allow audio sink to cache computation data) Signed-off-by: Gwendal Roulleau --- .../openhab/core/audio/AudioHTTPServer.java | 8 +- .../org/openhab/core/audio/AudioSink.java | 16 +++ .../org/openhab/core/audio/AudioStream.java | 11 ++ .../openhab/core/audio/FileAudioStream.java | 9 ++ .../core/audio/internal/AudioManagerImpl.java | 9 ++ .../core/audio/internal/AudioServlet.java | 101 +++++++++++++++--- .../internal/webaudio/WebAudioAudioSink.java | 18 ++-- .../core/voice/internal/DialogProcessor.java | 17 ++- .../core/voice/internal/VoiceManagerImpl.java | 10 ++ .../internal/cache/AudioStreamFromCache.java | 9 +- .../voice/internal/cache/TTSLRUCacheImpl.java | 2 +- 11 files changed, 182 insertions(+), 28 deletions(-) diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioHTTPServer.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioHTTPServer.java index 9788b3acbdd..d7192a38700 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioHTTPServer.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioHTTPServer.java @@ -40,13 +40,15 @@ public interface AudioHTTPServer { /** * Creates a relative url for a given {@link AudioStream} where it can be requested multiple times within the given * time frame. - * This method only accepts {@link FixedLengthAudioStream}s, since it needs to be able to create multiple concurrent - * streams from it, which isn't possible with a regular {@link AudioStream}. + * This method accepts all {@link AudioStream}s, but it is better to use {@link FixedLengthAudioStream}s since it + * needs to be able to create multiple concurrent streams from it. + * If generic {@link AudioStream} is used, we try to keep this capability by storing it in a small memory buffer, + * e.g {@link ByteArrayAudioStream}, or in a cached file if the stream is too long. * Streams are closed, once they expire. * * @param stream the stream to serve on HTTP * @param seconds number of seconds for which the stream is available through HTTP * @return the relative URL to access the stream starting with a '/' */ - String serve(FixedLengthAudioStream stream, int seconds); + String serve(AudioStream stream, int seconds); } diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioSink.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioSink.java index f1ecca1d43c..a312e88a187 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioSink.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioSink.java @@ -58,6 +58,9 @@ public interface AudioSink { * * In case the audioStream is null, this should be interpreted as a request to end any currently playing stream. * + * If you call this method and if the sink is synchronous, you should thereafter get rid of a stream implementing + * the {@link FileAudioStream.Disposable} interface by calling the dispose method. + * * @param audioStream the audio stream to play or null to keep quiet * @throws UnsupportedAudioFormatException If audioStream format is not supported * @throws UnsupportedAudioStreamException If audioStream is not supported @@ -94,4 +97,17 @@ void process(@Nullable AudioStream audioStream) * @throws IOException if the volume can not be set */ void setVolume(PercentType volume) throws IOException; + + /** + * Tell if the sink is synchronous. + * If true, caller may dispose of the stream immediately after the process method. + * On the contrary, if in the process method, the sink returns before the input stream is entirely consumed, + * then the sink should override this method and return false. + * Please note that by doing so, the sink should then take care itself of the InputStream implementing the + * {@link FileAudioStream.Disposable} interface by calling the dispose method when finishing + * reading it. + */ + default boolean isSynchronous() { + return true; + } } diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioStream.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioStream.java index c122aaf5fc6..560fcc21278 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioStream.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioStream.java @@ -15,6 +15,7 @@ import java.io.InputStream; import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; /** * Wrapper for a source of audio data. @@ -37,4 +38,14 @@ public abstract class AudioStream extends InputStream { * @return The supported audio format */ public abstract AudioFormat getFormat(); + + /** + * Usefull for sinks playing the same stream multiple times, + * to avoid already done computation (like reencoding). + * + * @return A string uniquely identifying the stream. + */ + public @Nullable String getId() { + return null; + } } diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/FileAudioStream.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/FileAudioStream.java index 481d62353aa..be0f9f83748 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/FileAudioStream.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/FileAudioStream.java @@ -125,4 +125,13 @@ public synchronized void reset() throws IOException { public InputStream getClonedStream() throws AudioException { return getInputStream(file); } + + /** + * Implements this interface if you want the stream consumer to make a callback when the audio stream is not needed + * anymore. You should then delete your file in the delete method. + */ + @FunctionalInterface + public interface Disposable { + void dispose() throws IOException; + } } diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/AudioManagerImpl.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/AudioManagerImpl.java index 914bedf9115..c18012167c6 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/AudioManagerImpl.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/AudioManagerImpl.java @@ -37,6 +37,7 @@ import org.openhab.core.audio.AudioSource; import org.openhab.core.audio.AudioStream; import org.openhab.core.audio.FileAudioStream; +import org.openhab.core.audio.FileAudioStream.Disposable; import org.openhab.core.audio.URLAudioStream; import org.openhab.core.audio.UnsupportedAudioFormatException; import org.openhab.core.audio.UnsupportedAudioStreamException; @@ -142,8 +143,16 @@ public void play(@Nullable AudioStream audioStream, @Nullable String sinkId, @Nu } try { sink.process(audioStream); + // if the stream is not needed anymore, then we should call back the AudioStream to let it a chance + // to auto dispose: + if (sink.isSynchronous() && audioStream instanceof Disposable disposableAudioStream) { + disposableAudioStream.dispose(); + } } catch (UnsupportedAudioFormatException | UnsupportedAudioStreamException e) { logger.warn("Error playing '{}': {}", audioStream, e.getMessage(), e); + } catch (IOException e) { + String fileName = audioStream instanceof FileAudioStream file ? file.toString() : "unknown"; + logger.warn("Cannot dispose of audio stream {}", fileName, e); } finally { if (volume != null && oldVolume != null) { // restore volume only if it was set before diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/AudioServlet.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/AudioServlet.java index 8a704810bbd..aa9a4d3ef57 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/AudioServlet.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/AudioServlet.java @@ -12,8 +12,12 @@ */ package org.openhab.core.audio.internal; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; import java.util.Collections; import java.util.List; import java.util.Map; @@ -22,6 +26,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -31,12 +36,16 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; import org.openhab.core.audio.AudioException; import org.openhab.core.audio.AudioFormat; import org.openhab.core.audio.AudioHTTPServer; import org.openhab.core.audio.AudioStream; +import org.openhab.core.audio.ByteArrayAudioStream; +import org.openhab.core.audio.FileAudioStream; +import org.openhab.core.audio.FileAudioStream.Disposable; import org.openhab.core.audio.FixedLengthAudioStream; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Deactivate; @@ -60,12 +69,16 @@ public class AudioServlet extends HttpServlet implements AudioHTTPServer { private static final List WAV_MIME_TYPES = List.of("audio/wav", "audio/x-wav", "audio/vnd.wave"); + // this one MB buffer will help playing multiple times an AudioStream, if the sink cannot do otherwise + private static final int ONETIME_STREAM_BUFFER_MAX_SIZE = 1048576; + static final String SERVLET_PATH = "/audio"; private final Logger logger = LoggerFactory.getLogger(AudioServlet.class); private final Map oneTimeStreams = new ConcurrentHashMap<>(); private final Map multiTimeStreams = new ConcurrentHashMap<>(); + private final Map currentlyServedResponseByStreamId = new ConcurrentHashMap<>(); private final Map streamTimeouts = new ConcurrentHashMap<>(); @@ -160,12 +173,18 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws Se .map(String::trim).collect(Collectors.toList()); try (final InputStream stream = prepareInputStream(streamId, resp, acceptedMimeTypes)) { - if (stream == null) { - logger.debug("Received request for invalid stream id at {}", requestURI); - resp.sendError(HttpServletResponse.SC_NOT_FOUND); - } else { - stream.transferTo(resp.getOutputStream()); - resp.flushBuffer(); + try { + currentlyServedResponseByStreamId.computeIfAbsent(streamId, (String) -> new AtomicInteger()) + .incrementAndGet(); + if (stream == null) { + logger.debug("Received request for invalid stream id at {}", requestURI); + resp.sendError(HttpServletResponse.SC_NOT_FOUND); + } else { + stream.transferTo(resp.getOutputStream()); + resp.flushBuffer(); + } + } finally { + currentlyServedResponseByStreamId.get(streamId).decrementAndGet(); } } catch (final AudioException ex) { resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, ex.getMessage()); @@ -179,10 +198,24 @@ private synchronized void removeTimedOutStreams() { .map(Entry::getKey).collect(Collectors.toList()); toRemove.forEach(streamId -> { - // the stream has expired, we need to remove it! - final FixedLengthAudioStream stream = multiTimeStreams.remove(streamId); - streamTimeouts.remove(streamId); - tryClose(stream); + // the stream has expired, if no one is using it we need to remove it + Integer numberOfStreamServed = currentlyServedResponseByStreamId.getOrDefault(streamId, new AtomicInteger()) + .decrementAndGet(); + if (numberOfStreamServed <= 0) { + final FixedLengthAudioStream stream = multiTimeStreams.remove(streamId); + streamTimeouts.remove(streamId); + tryClose(stream); + if (stream instanceof Disposable disposableStream) { + try { + disposableStream.dispose(); + } catch (IOException e) { + String fileName = disposableStream instanceof FileAudioStream file ? file.toString() + : "unknown"; + logger.warn("Cannot delete temporary file {} for the AudioServlet", fileName, e); + } + } + } + logger.debug("Removed timed out stream {}", streamId); }); } @@ -195,13 +228,42 @@ public String serve(AudioStream stream) { } @Override - public String serve(FixedLengthAudioStream stream, int seconds) { + public String serve(AudioStream stream, int seconds) { String streamId = UUID.randomUUID().toString(); - multiTimeStreams.put(streamId, stream); + if (stream instanceof FixedLengthAudioStream fixedLengthAudioStream) { + multiTimeStreams.put(streamId, fixedLengthAudioStream); + } else { // we prefer a FixedLengthAudioStream, but we can try to make one + try { + multiTimeStreams.put(streamId, createFixedLengthInputStream(stream, streamId)); + } catch (AudioException | IOException e) { + logger.warn("Cannot precache the audio stream to serve it", e); + } + } streamTimeouts.put(streamId, System.nanoTime() + TimeUnit.SECONDS.toNanos(seconds)); return getRelativeURL(streamId); } + private FixedLengthAudioStream createFixedLengthInputStream(AudioStream stream, String streamId) + throws IOException, AudioException { + byte[] dataBytes = stream.readNBytes(ONETIME_STREAM_BUFFER_MAX_SIZE + 1); + FixedLengthAudioStream fixedLengthAudioStreamResult; + if (dataBytes.length <= ONETIME_STREAM_BUFFER_MAX_SIZE) { + // we will use an in memory buffer to avoid disk operation + fixedLengthAudioStreamResult = new ByteArrayAudioStream(dataBytes, stream.getFormat()); + } else { + // in memory max size exceeded, sound is too long, we will use a file + File tempFile = File.createTempFile(streamId, ".snd"); + tempFile.deleteOnExit(); + try (OutputStream outputStream = new FileOutputStream(tempFile)) { + outputStream.write(dataBytes); + stream.transferTo(outputStream); + } + fixedLengthAudioStreamResult = new TemporaryFileAudioStream(tempFile, stream.getFormat()); + } + tryClose(stream); + return fixedLengthAudioStreamResult; + } + Map getMultiTimeStreams() { return Collections.unmodifiableMap(multiTimeStreams); } @@ -213,4 +275,19 @@ Map getOneTimeStreams() { private String getRelativeURL(String streamId) { return SERVLET_PATH + "/" + streamId; } + + private static class TemporaryFileAudioStream extends FileAudioStream implements FileAudioStream.Disposable { + + private File file; + + public TemporaryFileAudioStream(File file, AudioFormat format) throws AudioException { + super(file, format); + this.file = file; + } + + @Override + public void dispose() throws IOException { + Files.delete(file.toPath()); + } + } } diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/webaudio/WebAudioAudioSink.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/webaudio/WebAudioAudioSink.java index 8227f10de77..282fe8d2ed7 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/webaudio/WebAudioAudioSink.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/webaudio/WebAudioAudioSink.java @@ -22,7 +22,6 @@ import org.openhab.core.audio.AudioHTTPServer; import org.openhab.core.audio.AudioSink; import org.openhab.core.audio.AudioStream; -import org.openhab.core.audio.FixedLengthAudioStream; import org.openhab.core.audio.URLAudioStream; import org.openhab.core.audio.UnsupportedAudioFormatException; import org.openhab.core.audio.UnsupportedAudioStreamException; @@ -49,8 +48,7 @@ public class WebAudioAudioSink implements AudioSink { private final Logger logger = LoggerFactory.getLogger(WebAudioAudioSink.class); private static final Set SUPPORTED_AUDIO_FORMATS = Set.of(AudioFormat.MP3, AudioFormat.WAV); - private static final Set> SUPPORTED_AUDIO_STREAMS = Set - .of(FixedLengthAudioStream.class, URLAudioStream.class); + private static final Set> SUPPORTED_AUDIO_STREAMS = Set.of(AudioStream.class); private AudioHTTPServer audioHTTPServer; private EventPublisher eventPublisher; @@ -76,14 +74,9 @@ public void process(@Nullable AudioStream audioStream) // it is an external URL, so we can directly pass this on. URLAudioStream urlAudioStream = (URLAudioStream) audioStream; sendEvent(urlAudioStream.getURL()); - } else if (audioStream instanceof FixedLengthAudioStream) { - // we need to serve it for a while and make it available to multiple clients, hence only - // FixedLengthAudioStreams are supported. - sendEvent(audioHTTPServer.serve((FixedLengthAudioStream) audioStream, 10).toString()); } else { - throw new UnsupportedAudioStreamException( - "Web audio sink can only handle FixedLengthAudioStreams and URLAudioStreams.", - audioStream.getClass()); + // we need to serve it for a while and make it available to multiple clients + sendEvent(audioHTTPServer.serve(audioStream, 10).toString()); } } catch (IOException e) { logger.debug("Error while closing the audio stream: {}", e.getMessage(), e); @@ -124,4 +117,9 @@ public PercentType getVolume() throws IOException { public void setVolume(final PercentType volume) throws IOException { throw new IOException("Web Audio sink does not support volume level changes."); } + + @Override + public boolean isSynchronous() { + return false; + } } diff --git a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/DialogProcessor.java b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/DialogProcessor.java index 7fd5a8bfa24..73d71166c6f 100644 --- a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/DialogProcessor.java +++ b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/DialogProcessor.java @@ -25,6 +25,8 @@ import org.openhab.core.audio.AudioException; import org.openhab.core.audio.AudioFormat; import org.openhab.core.audio.AudioStream; +import org.openhab.core.audio.FileAudioStream; +import org.openhab.core.audio.FileAudioStream.Disposable; import org.openhab.core.audio.UnsupportedAudioFormatException; import org.openhab.core.audio.UnsupportedAudioStreamException; import org.openhab.core.audio.utils.ToneSynthesizer; @@ -147,7 +149,7 @@ private void initToneSynthesizer(@Nullable String listeningMelodyText) { /** * Starts a persistent dialog - * + * * @throws IllegalStateException if keyword spot service is misconfigured */ public void start() throws IllegalStateException { @@ -383,12 +385,25 @@ protected void say(@Nullable String text) { if (dialogContext.sink().getSupportedStreams().stream().anyMatch(clazz -> clazz.isInstance(audioStream))) { try { dialogContext.sink().process(audioStream); + // if the stream is not needed anymore, then we should call back the AudioStream to let it a chance + // to auto dispose: + if (dialogContext.sink().isSynchronous() + && audioStream instanceof Disposable disposableAudioStream) { + disposableAudioStream.dispose(); + } } catch (UnsupportedAudioFormatException | UnsupportedAudioStreamException e) { if (logger.isDebugEnabled()) { logger.debug("Error saying '{}': {}", text, e.getMessage(), e); } else { logger.warn("Error saying '{}': {}", text, e.getMessage()); } + } catch (IOException e) { + String fileName = audioStream instanceof FileAudioStream file ? file.toString() : "unknown"; + if (logger.isDebugEnabled()) { + logger.debug("Cannot dispose of stream {}", fileName, e); + } else { + logger.warn("Cannot dispose of stream {}", fileName); + } } } else { logger.warn("Failed playing audio stream '{}' as audio doesn't support it.", audioStream); diff --git a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/VoiceManagerImpl.java b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/VoiceManagerImpl.java index 105eb6e7d75..cfd792531c6 100644 --- a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/VoiceManagerImpl.java +++ b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/VoiceManagerImpl.java @@ -41,6 +41,8 @@ import org.openhab.core.audio.AudioSink; import org.openhab.core.audio.AudioSource; import org.openhab.core.audio.AudioStream; +import org.openhab.core.audio.FileAudioStream; +import org.openhab.core.audio.FileAudioStream.Disposable; import org.openhab.core.audio.UnsupportedAudioFormatException; import org.openhab.core.audio.UnsupportedAudioStreamException; import org.openhab.core.common.ThreadPoolManager; @@ -293,6 +295,14 @@ public void say(String text, @Nullable String voiceId, @Nullable String sinkId, } try { sink.process(audioStream); + // if the stream is not needed anymore, then we should call back the AudioStream to let it a chance + // to auto dispose: + if (sink.isSynchronous() && audioStream instanceof Disposable disposableAudioStream) { + disposableAudioStream.dispose(); + } + } catch (IOException e) { + String fileName = audioStream instanceof FileAudioStream file ? file.toString() : "unknown"; + logger.warn("Cannot dispose of stream {}", fileName, e); } finally { if (volume != null && oldVolume != null) { // restore volume only if it was set before diff --git a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/cache/AudioStreamFromCache.java b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/cache/AudioStreamFromCache.java index 1cc79ea458a..360769944e3 100644 --- a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/cache/AudioStreamFromCache.java +++ b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/cache/AudioStreamFromCache.java @@ -32,10 +32,12 @@ public class AudioStreamFromCache extends FixedLengthAudioStream { private InputStreamCacheWrapper inputStream; private AudioFormat audioFormat; + private String key; - public AudioStreamFromCache(InputStreamCacheWrapper inputStream, AudioFormatInfo audioFormat) { + public AudioStreamFromCache(InputStreamCacheWrapper inputStream, AudioFormatInfo audioFormat, String key) { this.inputStream = inputStream; this.audioFormat = audioFormat.toAudioFormat(); + this.key = key; } @Override @@ -101,4 +103,9 @@ public synchronized void reset() throws IOException { public boolean markSupported() { return inputStream.markSupported(); } + + @Override + public @Nullable String getId() { + return key; + } } diff --git a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/cache/TTSLRUCacheImpl.java b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/cache/TTSLRUCacheImpl.java index 015aa9e4218..dba5cfddddf 100644 --- a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/cache/TTSLRUCacheImpl.java +++ b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/cache/TTSLRUCacheImpl.java @@ -136,7 +136,7 @@ public AudioStream get(CachedTTSService tts, String text, Voice voice, AudioForm // we are sure that the cache is used, and so we can use an AudioStream // implementation that use convenient methods for some client, like getClonedStream() // or mark /reset - return new AudioStreamFromCache(inputStreamCacheWrapper, metadata); + return new AudioStreamFromCache(inputStreamCacheWrapper, metadata, key); } else { // the cache is not used, we can use the original response AudioStream return (AudioStream) fileAndMetadata.getInputStream();