diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioHTTPServer.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioHTTPServer.java index 9788b3acbdd..2cb85ee5833 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioHTTPServer.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioHTTPServer.java @@ -12,6 +12,9 @@ */ package org.openhab.core.audio; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + import org.eclipse.jdt.annotation.NonNullByDefault; import org.openhab.core.audio.internal.AudioServlet; @@ -34,19 +37,48 @@ public interface AudioHTTPServer { * * @param stream the stream to serve on HTTP * @return the relative URL to access the stream starting with a '/' + * @deprecated Use {@link AudioHTTPServer#serve(AudioStream, int, boolean, CompletableFuture)} */ + @Deprecated String serve(AudioStream stream); /** * Creates a relative url for a given {@link AudioStream} where it can be requested multiple times within the given * time frame. - * This method only accepts {@link FixedLengthAudioStream}s, since it needs to be able to create multiple concurrent - * streams from it, which isn't possible with a regular {@link AudioStream}. + * This method accepts all {@link AudioStream}s, but it is better to use {@link ClonableAudioStream}s. If generic + * {@link AudioStream} is used, the method tries to add the Clonable capability by storing it in a small memory + * buffer, e.g {@link ByteArrayAudioStream}, or in a cached file if the stream reached the buffer capacity, + * or fails if the stream is too long. * Streams are closed, once they expire. * * @param stream the stream to serve on HTTP * @param seconds number of seconds for which the stream is available through HTTP * @return the relative URL to access the stream starting with a '/' + * @deprecated Use {@link AudioHTTPServer#serve(AudioStream, int, boolean, CompletableFuture)} + */ + @Deprecated + String serve(AudioStream stream, int seconds); + + /** + * Creates a relative url for a given {@link AudioStream} where it can be requested one or multiple times within the + * given time frame. + * This method accepts all {@link AudioStream}s, but if multiTimeStream is set to true it is better to use + * {@link ClonableAudioStream}s. Otherwise, if a generic {@link AudioStream} is used, the method will then try + * to add the Clonable capability by storing it in a small memory buffer, e.g {@link ByteArrayAudioStream}, or in a + * cached file if the stream reached the buffer capacity, or fails to render the sound completely if the stream is + * too long. + * A {@link CompletableFuture} is used to inform the caller that the playback ends in order to clean + * resources and run delayed task, such as restoring volume. + * Streams are closed, once they expire. + * + * @param stream the stream to serve on HTTP + * @param seconds number of seconds for which the stream is available through HTTP. The stream will be deleted only + * if not started, so you can set a duration shorter than the track's duration. + * @param multiTimeStream set to true if this stream should be played multiple time, and thus needs to be made + * Cloneable if it is not already. + * @return information about the {@link StreamServed}, including the relative URL to access the stream starting with + * a '/', and a CompletableFuture to know when the playback ends. + * @throws IOException when the stream is not a {@link ClonableAudioStream} and we cannot get or store it on disk. */ - String serve(FixedLengthAudioStream stream, int seconds); + StreamServed serve(AudioStream stream, int seconds, boolean multiTimeStream) throws IOException; } diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioManager.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioManager.java index a60e5fe4172..d640c0f3996 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioManager.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioManager.java @@ -252,4 +252,15 @@ public interface AudioManager { * @return ids of matching sinks */ Set getSinkIds(String pattern); + + /** + * Handles a volume command change and returns a Runnable to restore it. + * Returning a Runnable allows us to have a no-op Runnable if changing volume back is not needed, and conveniently + * keeping it as one liner usable in a chain for the caller. + * + * @param volume The volume to set + * @param sink The sink to set the volume to + * @return A runnable to restore the volume to its previous value, or no-operation if no change is required. + */ + Runnable handleVolumeCommand(@Nullable PercentType volume, AudioSink sink); } diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioSink.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioSink.java index f1ecca1d43c..adc542a5ce4 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioSink.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioSink.java @@ -15,6 +15,7 @@ import java.io.IOException; import java.util.Locale; import java.util.Set; +import java.util.concurrent.CompletableFuture; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; @@ -58,13 +59,47 @@ public interface AudioSink { * * In case the audioStream is null, this should be interpreted as a request to end any currently playing stream. * + * When the stream is not needed anymore, if the stream implements the {@link org.openhab.core.common.Disposable} + * interface, the sink should hereafter get rid of it by calling the dispose method. + * * @param audioStream the audio stream to play or null to keep quiet * @throws UnsupportedAudioFormatException If audioStream format is not supported * @throws UnsupportedAudioStreamException If audioStream is not supported + * @deprecated Use {@link AudioSink#processAndComplete(AudioStream)} */ + @Deprecated void process(@Nullable AudioStream audioStream) throws UnsupportedAudioFormatException, UnsupportedAudioStreamException; + /** + * Processes the passed {@link AudioStream}, and returns a CompletableFuture that should complete when the sound is + * fully played. It is the sink responsibility to complete this future. + * + * If the passed {@link AudioStream} is not supported by this instance, an {@link UnsupportedAudioStreamException} + * is thrown. + * + * If the passed {@link AudioStream} has an {@link AudioFormat} not supported by this instance, + * an {@link UnsupportedAudioFormatException} is thrown. + * + * In case the audioStream is null, this should be interpreted as a request to end any currently playing stream. + * + * When the stream is not needed anymore, if the stream implements the {@link org.openhab.core.common.Disposable} + * interface, the sink should hereafter get rid of it by calling the dispose method. + * + * @param audioStream the audio stream to play or null to keep quiet + * @return A future completed when the sound is fully played. The method can instead complete with + * UnsupportedAudioFormatException if the audioStream format is not supported, or + * UnsupportedAudioStreamException If audioStream is not supported + */ + default CompletableFuture<@Nullable Void> processAndComplete(@Nullable AudioStream audioStream) { + try { + process(audioStream); + } catch (UnsupportedAudioFormatException | UnsupportedAudioStreamException e) { + return CompletableFuture.failedFuture(e); + } + return CompletableFuture.completedFuture(null); + } + /** * Gets a set containing all supported audio formats * diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioSinkAsync.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioSinkAsync.java new file mode 100644 index 00000000000..c88d20c93c9 --- /dev/null +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioSinkAsync.java @@ -0,0 +1,113 @@ +/** + * Copyright (c) 2010-2023 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.audio; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.openhab.core.common.Disposable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Definition of an audio output like headphones, a speaker or for writing to + * a file / clip. + * Helper class for asynchronous sink : when the process() method returns, the {@link AudioStream} + * may or may not be played. It is the responsibility of the implementing AudioSink class to + * complete the CompletableFuture when playing is done. Any delayed tasks will then be performed, such as volume + * restoration. + * + * @author Gwendal Roulleau - Initial contribution + */ +@NonNullByDefault +public abstract class AudioSinkAsync implements AudioSink { + + private final Logger logger = LoggerFactory.getLogger(AudioSinkAsync.class); + + protected final Map> runnableByAudioStream = new HashMap<>(); + + @Override + public CompletableFuture<@Nullable Void> processAndComplete(@Nullable AudioStream audioStream) { + CompletableFuture<@Nullable Void> completableFuture = new CompletableFuture<@Nullable Void>(); + if (audioStream != null) { + runnableByAudioStream.put(audioStream, completableFuture); + } + try { + processAsynchronously(audioStream); + } catch (UnsupportedAudioFormatException | UnsupportedAudioStreamException e) { + completableFuture.completeExceptionally(e); + } + if (audioStream == null) { + // No need to delay the post process task + completableFuture.complete(null); + } + return completableFuture; + } + + @Override + public void process(@Nullable AudioStream audioStream) + throws UnsupportedAudioFormatException, UnsupportedAudioStreamException { + processAsynchronously(audioStream); + } + + /** + * Processes the passed {@link AudioStream} asynchronously. This method is expected to return before the stream is + * fully played. This is the sink responsibility to call the {@link AudioSinkAsync#playbackFinished(AudioStream)} + * when it is. + * + * If the passed {@link AudioStream} is not supported by this instance, an {@link UnsupportedAudioStreamException} + * is thrown. + * + * If the passed {@link AudioStream} has an {@link AudioFormat} not supported by this instance, + * an {@link UnsupportedAudioFormatException} is thrown. + * + * In case the audioStream is null, this should be interpreted as a request to end any currently playing stream. + * + * @param audioStream the audio stream to play or null to keep quiet + * @throws UnsupportedAudioFormatException If audioStream format is not supported + * @throws UnsupportedAudioStreamException If audioStream is not supported + */ + protected abstract void processAsynchronously(@Nullable AudioStream audioStream) + throws UnsupportedAudioFormatException, UnsupportedAudioStreamException; + + /** + * Will complete the future previously returned, allowing the core to run delayed task. + * + * @param audioStream The AudioStream is the key to find the delayed CompletableFuture in the storage. + */ + protected void playbackFinished(AudioStream audioStream) { + CompletableFuture<@Nullable Void> completableFuture = runnableByAudioStream.remove(audioStream); + if (completableFuture != null) { + completableFuture.complete(null); + } + + // if the stream is not needed anymore, then we should call back the AudioStream to let it a chance + // to auto dispose. + if (audioStream instanceof Disposable disposableAudioStream) { + try { + disposableAudioStream.dispose(); + } catch (IOException e) { + String fileName = audioStream instanceof FileAudioStream file ? file.toString() : "unknown"; + if (logger.isDebugEnabled()) { + logger.debug("Cannot dispose of stream {}", fileName, e); + } else { + logger.warn("Cannot dispose of stream {}, reason {}", fileName, e.getMessage()); + } + } + } + } +} diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioSinkSync.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioSinkSync.java new file mode 100644 index 00000000000..97c888f7468 --- /dev/null +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioSinkSync.java @@ -0,0 +1,85 @@ +/** + * Copyright (c) 2010-2023 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.audio; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.openhab.core.common.Disposable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Definition of an audio output like headphones, a speaker or for writing to + * a file / clip. + * Helper class for synchronous sink : when the process() method returns, + * the source is considered played, and could be disposed. + * Any delayed tasks can then be performed, such as volume restoration. + * + * @author Gwendal Roulleau - Initial contribution + */ +@NonNullByDefault +public abstract class AudioSinkSync implements AudioSink { + + private final Logger logger = LoggerFactory.getLogger(AudioSinkSync.class); + + @Override + public CompletableFuture<@Nullable Void> processAndComplete(@Nullable AudioStream audioStream) { + try { + processSynchronously(audioStream); + return CompletableFuture.completedFuture(null); + } catch (UnsupportedAudioFormatException | UnsupportedAudioStreamException e) { + return CompletableFuture.failedFuture(e); + } finally { + // as the stream is not needed anymore, we should dispose of it + if (audioStream instanceof Disposable disposableAudioStream) { + try { + disposableAudioStream.dispose(); + } catch (IOException e) { + String fileName = audioStream instanceof FileAudioStream file ? file.toString() : "unknown"; + if (logger.isDebugEnabled()) { + logger.debug("Cannot dispose of stream {}", fileName, e); + } else { + logger.warn("Cannot dispose of stream {}, reason {}", fileName, e.getMessage()); + } + } + } + } + } + + @Override + public void process(@Nullable AudioStream audioStream) + throws UnsupportedAudioFormatException, UnsupportedAudioStreamException { + processSynchronously(audioStream); + } + + /** + * Processes the passed {@link AudioStream} and returns only when the playback is ended. + * + * If the passed {@link AudioStream} is not supported by this instance, an {@link UnsupportedAudioStreamException} + * is thrown. + * + * If the passed {@link AudioStream} has an {@link AudioFormat} not supported by this instance, + * an {@link UnsupportedAudioFormatException} is thrown. + * + * In case the audioStream is null, this should be interpreted as a request to end any currently playing stream. + * + * @param audioStream the audio stream to play or null to keep quiet + * @throws UnsupportedAudioFormatException If audioStream format is not supported + * @throws UnsupportedAudioStreamException If audioStream is not supported + */ + protected abstract void processSynchronously(@Nullable AudioStream audioStream) + throws UnsupportedAudioFormatException, UnsupportedAudioStreamException; +} diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioStream.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioStream.java index c122aaf5fc6..560fcc21278 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioStream.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioStream.java @@ -15,6 +15,7 @@ import java.io.InputStream; import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; /** * Wrapper for a source of audio data. @@ -37,4 +38,14 @@ public abstract class AudioStream extends InputStream { * @return The supported audio format */ public abstract AudioFormat getFormat(); + + /** + * Usefull for sinks playing the same stream multiple times, + * to avoid already done computation (like reencoding). + * + * @return A string uniquely identifying the stream. + */ + public @Nullable String getId() { + return null; + } } diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/ClonableAudioStream.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/ClonableAudioStream.java new file mode 100644 index 00000000000..111e1c1d2d9 --- /dev/null +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/ClonableAudioStream.java @@ -0,0 +1,35 @@ +/** + * Copyright (c) 2010-2023 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.audio; + +import java.io.InputStream; + +import org.eclipse.jdt.annotation.NonNullByDefault; + +/** + * This is an {@link AudioStream}, that can be cloned + * + * @author Gwendal Roulleau - Initial contribution, separation from FixedLengthAudioStream + */ +@NonNullByDefault +public abstract class ClonableAudioStream extends AudioStream { + + /** + * Returns a new, fully independent stream instance, which can be read and closed without impacting the original + * instance. + * + * @return a new input stream that can be consumed by the caller + * @throws AudioException if stream cannot be created + */ + public abstract InputStream getClonedStream() throws AudioException; +} diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/FileAudioStream.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/FileAudioStream.java index 481d62353aa..7247742aaa6 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/FileAudioStream.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/FileAudioStream.java @@ -18,10 +18,12 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.nio.file.Files; import org.eclipse.jdt.annotation.NonNullByDefault; import org.openhab.core.audio.utils.AudioStreamUtils; import org.openhab.core.audio.utils.AudioWaveUtils; +import org.openhab.core.common.Disposable; /** * This is an AudioStream from an audio file @@ -31,7 +33,7 @@ * @author Christoph Weitkamp - Refactored use of filename extension */ @NonNullByDefault -public class FileAudioStream extends FixedLengthAudioStream { +public class FileAudioStream extends FixedLengthAudioStream implements Disposable { public static final String WAV_EXTENSION = "wav"; public static final String MP3_EXTENSION = "mp3"; @@ -42,16 +44,22 @@ public class FileAudioStream extends FixedLengthAudioStream { private final AudioFormat audioFormat; private InputStream inputStream; private final long length; + private final boolean isTemporaryFile; public FileAudioStream(File file) throws AudioException { this(file, getAudioFormat(file)); } public FileAudioStream(File file, AudioFormat format) throws AudioException { + this(file, format, false); + } + + public FileAudioStream(File file, AudioFormat format, boolean isTemporaryFile) throws AudioException { this.file = file; this.inputStream = getInputStream(file); this.audioFormat = format; this.length = file.length(); + this.isTemporaryFile = isTemporaryFile; } private static AudioFormat getAudioFormat(File file) throws AudioException { @@ -125,4 +133,11 @@ public synchronized void reset() throws IOException { public InputStream getClonedStream() throws AudioException { return getInputStream(file); } + + @Override + public void dispose() throws IOException { + if (isTemporaryFile) { + Files.delete(file.toPath()); + } + } } diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/FixedLengthAudioStream.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/FixedLengthAudioStream.java index 6e47d35b054..4a737d50f99 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/FixedLengthAudioStream.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/FixedLengthAudioStream.java @@ -12,18 +12,16 @@ */ package org.openhab.core.audio; -import java.io.InputStream; - import org.eclipse.jdt.annotation.NonNullByDefault; /** - * This is an {@link AudioStream}, which can provide information about its absolute length and is able to provide - * cloned streams. + * This is a {@link ClonableAudioStream}, which can also provide information about its absolute length. * * @author Kai Kreuzer - Initial contribution + * @author Gwendal Roulleau - Separate getClonedStream into its own class */ @NonNullByDefault -public abstract class FixedLengthAudioStream extends AudioStream { +public abstract class FixedLengthAudioStream extends ClonableAudioStream { /** * Provides the length of the stream in bytes. @@ -31,13 +29,4 @@ public abstract class FixedLengthAudioStream extends AudioStream { * @return absolute length in bytes */ public abstract long length(); - - /** - * Returns a new, fully independent stream instance, which can be read and closed without impacting the original - * instance. - * - * @return a new input stream that can be consumed by the caller - * @throws AudioException if stream cannot be created - */ - public abstract InputStream getClonedStream() throws AudioException; } diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/StreamServed.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/StreamServed.java new file mode 100644 index 00000000000..35bf5946957 --- /dev/null +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/StreamServed.java @@ -0,0 +1,30 @@ +/** + * Copyright (c) 2010-2023 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.audio; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; + +/** + * Streams served by the AudioHTTPServer. + * + * @author Gwendal Roulleau - Initial contribution + */ +@NonNullByDefault +public record StreamServed(String url, AudioStream audioStream, AtomicInteger currentlyServedStream, AtomicLong timeout, + boolean multiTimeStream, CompletableFuture<@Nullable Void> playEnd) { +} diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/URLAudioStream.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/URLAudioStream.java index fef7e76f579..e43ab639760 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/URLAudioStream.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/URLAudioStream.java @@ -40,7 +40,7 @@ * @author Christoph Weitkamp - Refactored use of filename extension */ @NonNullByDefault -public class URLAudioStream extends AudioStream { +public class URLAudioStream extends ClonableAudioStream { private static final Pattern PLS_STREAM_PATTERN = Pattern.compile("^File[0-9]=(.+)$"); @@ -154,4 +154,9 @@ public void close() throws IOException { public String toString() { return url; } + + @Override + public InputStream getClonedStream() throws AudioException { + return new URLAudioStream(url); + } } diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/AudioManagerImpl.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/AudioManagerImpl.java index 914bedf9115..eb79b9344f5 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/AudioManagerImpl.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/AudioManagerImpl.java @@ -38,8 +38,6 @@ import org.openhab.core.audio.AudioStream; import org.openhab.core.audio.FileAudioStream; import org.openhab.core.audio.URLAudioStream; -import org.openhab.core.audio.UnsupportedAudioFormatException; -import org.openhab.core.audio.UnsupportedAudioStreamException; import org.openhab.core.audio.utils.ToneSynthesizer; import org.openhab.core.config.core.ConfigOptionProvider; import org.openhab.core.config.core.ConfigurableService; @@ -122,39 +120,11 @@ public void play(@Nullable AudioStream audioStream, @Nullable String sinkId) { public void play(@Nullable AudioStream audioStream, @Nullable String sinkId, @Nullable PercentType volume) { AudioSink sink = getSink(sinkId); if (sink != null) { - PercentType oldVolume = null; - // set notification sound volume - if (volume != null) { - try { - // get current volume - oldVolume = sink.getVolume(); - } catch (IOException e) { - logger.debug("An exception occurred while getting the volume of sink '{}' : {}", sink.getId(), - e.getMessage(), e); - } - - try { - sink.setVolume(volume); - } catch (IOException e) { - logger.debug("An exception occurred while setting the volume of sink '{}' : {}", sink.getId(), - e.getMessage(), e); - } - } - try { - sink.process(audioStream); - } catch (UnsupportedAudioFormatException | UnsupportedAudioStreamException e) { - logger.warn("Error playing '{}': {}", audioStream, e.getMessage(), e); - } finally { - if (volume != null && oldVolume != null) { - // restore volume only if it was set before - try { - sink.setVolume(oldVolume); - } catch (IOException e) { - logger.debug("An exception occurred while setting the volume of sink '{}' : {}", sink.getId(), - e.getMessage(), e); - } - } - } + Runnable restoreVolume = handleVolumeCommand(volume, sink); + sink.processAndComplete(audioStream).exceptionally((exception) -> { + logger.warn("Error playing '{}': {}", audioStream, exception.getMessage(), exception); + return null; + }).thenRun(restoreVolume); } else { logger.warn("Failed playing audio stream '{}' as no audio sink was found.", audioStream); } @@ -351,6 +321,53 @@ public Set getSinkIds(String pattern) { return null; } + @Override + public Runnable handleVolumeCommand(@Nullable PercentType volume, AudioSink sink) { + boolean volumeChanged = false; + PercentType oldVolume = null; + + Runnable toRunWhenProcessFinished = () -> { + }; + + if (volume == null) { + return toRunWhenProcessFinished; + } + + // set notification sound volume + try { + // get current volume + oldVolume = sink.getVolume(); + } catch (IOException | UnsupportedOperationException e) { + logger.debug("An exception occurred while getting the volume of sink '{}' : {}", sink.getId(), + e.getMessage(), e); + } + + if (!volume.equals(oldVolume) || oldVolume == null) { + try { + sink.setVolume(volume); + volumeChanged = true; + } catch (IOException | UnsupportedOperationException e) { + logger.debug("An exception occurred while setting the volume of sink '{}' : {}", sink.getId(), + e.getMessage(), e); + } + } + + final PercentType oldVolumeFinal = oldVolume; + // restore volume only if it was set before + if (volumeChanged && oldVolumeFinal != null) { + toRunWhenProcessFinished = () -> { + try { + sink.setVolume(oldVolumeFinal); + } catch (IOException | UnsupportedOperationException e) { + logger.debug("An exception occurred while setting the volume of sink '{}' : {}", sink.getId(), + e.getMessage(), e); + } + }; + } + + return toRunWhenProcessFinished; + } + @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC) protected void addAudioSource(AudioSource audioSource) { this.audioSources.put(audioSource.getId(), audioSource); diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/AudioServlet.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/AudioServlet.java index 9bcee8cef5b..e7acbb867ee 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/AudioServlet.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/AudioServlet.java @@ -12,16 +12,24 @@ */ package org.openhab.core.audio.internal; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -37,9 +45,17 @@ import org.openhab.core.audio.AudioFormat; import org.openhab.core.audio.AudioHTTPServer; import org.openhab.core.audio.AudioStream; +import org.openhab.core.audio.ByteArrayAudioStream; +import org.openhab.core.audio.ClonableAudioStream; +import org.openhab.core.audio.FileAudioStream; import org.openhab.core.audio.FixedLengthAudioStream; +import org.openhab.core.audio.StreamServed; +import org.openhab.core.audio.utils.AudioSinkUtils; +import org.openhab.core.common.ThreadPoolManager; +import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Reference; import org.osgi.service.http.whiteboard.propertytypes.HttpWhiteboardServletName; import org.osgi.service.http.whiteboard.propertytypes.HttpWhiteboardServletPattern; import org.slf4j.Logger; @@ -60,23 +76,34 @@ public class AudioServlet extends HttpServlet implements AudioHTTPServer { private static final List WAV_MIME_TYPES = List.of("audio/wav", "audio/x-wav", "audio/vnd.wave"); + // A 1MB in memory buffer will help playing multiple times an AudioStream, if the sink cannot do otherwise + private static final int ONETIME_STREAM_BUFFER_MAX_SIZE = 1048576; + // 5MB max for a file buffer + private static final int ONETIME_STREAM_FILE_MAX_SIZE = 5242880; + static final String SERVLET_PATH = "/audio"; private final Logger logger = LoggerFactory.getLogger(AudioServlet.class); - private final Map oneTimeStreams = new ConcurrentHashMap<>(); - private final Map multiTimeStreams = new ConcurrentHashMap<>(); + private final Map servedStreams = new ConcurrentHashMap<>(); + + private final ScheduledExecutorService threadPool = ThreadPoolManager + .getScheduledPool(ThreadPoolManager.THREAD_POOL_NAME_COMMON); + @Nullable + ScheduledFuture periodicCleaner; - private final Map streamTimeouts = new ConcurrentHashMap<>(); + private AudioSinkUtils audioSinkUtils; + + @Activate + public AudioServlet(@Reference AudioSinkUtils audioSinkUtils) { + super(); + this.audioSinkUtils = audioSinkUtils; + } @Deactivate protected synchronized void deactivate() { - multiTimeStreams.values().forEach(this::tryClose); - multiTimeStreams.clear(); - streamTimeouts.clear(); - - oneTimeStreams.values().forEach(this::tryClose); - oneTimeStreams.clear(); + servedStreams.values().stream().map(streamServed -> streamServed.audioStream()).forEach(this::tryClose); + servedStreams.clear(); } private void tryClose(@Nullable AudioStream stream) { @@ -88,29 +115,17 @@ private void tryClose(@Nullable AudioStream stream) { } } - private @Nullable InputStream prepareInputStream(final String streamId, final HttpServletResponse resp, + private InputStream prepareInputStream(final StreamServed streamServed, final HttpServletResponse resp, List acceptedMimeTypes) throws AudioException { - final AudioStream stream; - final boolean multiAccess; - if (oneTimeStreams.containsKey(streamId)) { - stream = oneTimeStreams.remove(streamId); - multiAccess = false; - } else if (multiTimeStreams.containsKey(streamId)) { - stream = multiTimeStreams.get(streamId); - multiAccess = true; - } else { - return null; - } - - logger.debug("Stream to serve is {}", streamId); + logger.debug("Stream to serve is {}", streamServed.url()); // try to set the content-type, if possible final String mimeType; - if (AudioFormat.CODEC_MP3.equals(stream.getFormat().getCodec())) { + if (AudioFormat.CODEC_MP3.equals(streamServed.audioStream().getFormat().getCodec())) { mimeType = "audio/mpeg"; - } else if (AudioFormat.CONTAINER_WAVE.equals(stream.getFormat().getContainer())) { + } else if (AudioFormat.CONTAINER_WAVE.equals(streamServed.audioStream().getFormat().getContainer())) { mimeType = WAV_MIME_TYPES.stream().filter(acceptedMimeTypes::contains).findFirst().orElse("audio/wav"); - } else if (AudioFormat.CONTAINER_OGG.equals(stream.getFormat().getContainer())) { + } else if (AudioFormat.CONTAINER_OGG.equals(streamServed.audioStream().getFormat().getContainer())) { mimeType = "audio/ogg"; } else { mimeType = null; @@ -120,16 +135,17 @@ private void tryClose(@Nullable AudioStream stream) { } // try to set the content-length, if possible - if (stream instanceof FixedLengthAudioStream audioStream) { - final long size = audioStream.length(); + if (streamServed.audioStream() instanceof FixedLengthAudioStream fixedLengthServedStream) { + final long size = fixedLengthServedStream.length(); resp.setContentLength((int) size); } - if (multiAccess) { + if (streamServed.multiTimeStream() + && streamServed.audioStream() instanceof ClonableAudioStream clonableAudioStream) { // we need to care about concurrent access and have a separate stream for each thread - return ((FixedLengthAudioStream) stream).getClonedStream(); + return clonableAudioStream.getClonedStream(); } else { - return stream; + return streamServed.audioStream(); } } @@ -146,8 +162,6 @@ private String substringBefore(String str, String separator) { @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - removeTimedOutStreams(); - String requestURI = req.getRequestURI(); if (requestURI == null) { resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "requestURI is null"); @@ -159,55 +173,154 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws Se List acceptedMimeTypes = Stream.of(Objects.requireNonNullElse(req.getHeader("Accept"), "").split(",")) .map(String::trim).collect(Collectors.toList()); - try (final InputStream stream = prepareInputStream(streamId, resp, acceptedMimeTypes)) { - if (stream == null) { - logger.debug("Received request for invalid stream id at {}", requestURI); - resp.sendError(HttpServletResponse.SC_NOT_FOUND); - } else { - stream.transferTo(resp.getOutputStream()); + StreamServed servedStream = servedStreams.get(streamId); + if (servedStream == null) { + logger.debug("Received request for invalid stream id at {}", requestURI); + resp.sendError(HttpServletResponse.SC_NOT_FOUND); + return; + } + + // we count the number of active process using the input stream + AtomicInteger currentlyServedStream = servedStream.currentlyServedStream(); + if (currentlyServedStream.incrementAndGet() == 1 || servedStream.multiTimeStream()) { + try (final InputStream stream = prepareInputStream(servedStream, resp, acceptedMimeTypes)) { + Long endOfPlayTimestamp = audioSinkUtils.transferAndAnalyzeLength(stream, resp.getOutputStream(), + servedStream.audioStream().getFormat()); + // update timeout with the sound duration : + if (endOfPlayTimestamp != null) { + servedStream.timeout().set(Math.max(servedStream.timeout().get(), endOfPlayTimestamp)); + } resp.flushBuffer(); + } catch (final AudioException ex) { + resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, ex.getMessage()); + } finally { + currentlyServedStream.decrementAndGet(); } - } catch (final AudioException ex) { - resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, ex.getMessage()); + } else { + logger.debug("Received request for already consumed stream id at {}", requestURI); + resp.sendError(HttpServletResponse.SC_NOT_FOUND); + return; + } + + // we can immediately dispose and remove, if it is a one time stream + if (!servedStream.multiTimeStream()) { + servedStreams.remove(streamId); + servedStream.playEnd().complete(null); + logger.debug("Removed timed out stream {}", streamId); } } private synchronized void removeTimedOutStreams() { // Build list of expired streams. long now = System.nanoTime(); - final List toRemove = streamTimeouts.entrySet().stream().filter(e -> e.getValue() < now) + final List toRemove = servedStreams.entrySet().stream() + .filter(e -> e.getValue().timeout().get() < now && e.getValue().currentlyServedStream().get() <= 0) .map(Entry::getKey).collect(Collectors.toList()); toRemove.forEach(streamId -> { - // the stream has expired, we need to remove it! - final FixedLengthAudioStream stream = multiTimeStreams.remove(streamId); - streamTimeouts.remove(streamId); - tryClose(stream); - logger.debug("Removed timed out stream {}", streamId); + // the stream has expired and no one is using it, we need to remove it! + StreamServed streamServed = servedStreams.remove(streamId); + if (streamServed != null) { + tryClose(streamServed.audioStream()); + // we can notify the caller of the stream consumption + streamServed.playEnd().complete(null); + logger.debug("Removed timed out stream {}", streamId); + } }); + + // Because the callback should be executed as soon as possible, + // we cannot wait for the next doGet to perform a clean. So we have to schedule a periodic cleaner. + ScheduledFuture periodicCleanerLocal = periodicCleaner; + if (!servedStreams.isEmpty()) { + if (periodicCleanerLocal == null || periodicCleanerLocal.isDone()) { + // reschedule a clean + periodicCleaner = threadPool.scheduleWithFixedDelay(this::removeTimedOutStreams, 5, 5, + TimeUnit.SECONDS); + } + } else if (periodicCleanerLocal != null) { // no more stream to serve, shut the periodic cleaning thread: + periodicCleanerLocal.cancel(true); + periodicCleaner = null; + } } @Override public String serve(AudioStream stream) { - String streamId = UUID.randomUUID().toString(); - oneTimeStreams.put(streamId, stream); - return getRelativeURL(streamId); + try { + // In case the stream is never played, we cannot wait indefinitely before executing the callback. + // so we set a timeout (even if this is a one time stream). + return serve(stream, 10, false).url(); + } catch (IOException e) { + logger.warn("Cannot precache the audio stream to serve it", e); + return getRelativeURL("error"); + } + } + + @Override + public String serve(AudioStream stream, int seconds) { + try { + return serve(stream, seconds, true).url(); + } catch (IOException e) { + logger.warn("Cannot precache the audio stream to serve it", e); + return getRelativeURL("error"); + } } @Override - public String serve(FixedLengthAudioStream stream, int seconds) { + public StreamServed serve(AudioStream originalStream, int seconds, boolean multiTimeStream) throws IOException { String streamId = UUID.randomUUID().toString(); - multiTimeStreams.put(streamId, stream); - streamTimeouts.put(streamId, System.nanoTime() + TimeUnit.SECONDS.toNanos(seconds)); - return getRelativeURL(streamId); + AudioStream audioStream = originalStream; + if (!(originalStream instanceof ClonableAudioStream) && multiTimeStream) { + // we we can try to make a Cloneable stream as it is needed + audioStream = createClonableInputStream(originalStream, streamId); + } + long timeOut = System.nanoTime() + TimeUnit.SECONDS.toNanos(seconds); + CompletableFuture<@Nullable Void> playEnd = new CompletableFuture<@Nullable Void>(); + StreamServed streamToServe = new StreamServed(getRelativeURL(streamId), audioStream, new AtomicInteger(), + new AtomicLong(timeOut), multiTimeStream, playEnd); + servedStreams.put(streamId, streamToServe); + + // try to clean, or a least launch the periodic cleanse: + removeTimedOutStreams(); + + return streamToServe; } - Map getMultiTimeStreams() { - return Collections.unmodifiableMap(multiTimeStreams); + private ClonableAudioStream createClonableInputStream(AudioStream stream, String streamId) throws IOException { + byte[] dataBytes = stream.readNBytes(ONETIME_STREAM_BUFFER_MAX_SIZE + 1); + ClonableAudioStream clonableAudioStreamResult; + if (dataBytes.length <= ONETIME_STREAM_BUFFER_MAX_SIZE) { + // we will use an in memory buffer to avoid disk operation + clonableAudioStreamResult = new ByteArrayAudioStream(dataBytes, stream.getFormat()); + } else { + // in memory max size exceeded, sound is too long, we will use a file + File tempFile = File.createTempFile(streamId, ".snd"); + tempFile.deleteOnExit(); + try (OutputStream outputStream = new FileOutputStream(tempFile)) { + // copy already read data to file : + outputStream.write(dataBytes); + // copy the remaining stream data to a file. + byte[] buf = new byte[8192]; + int length; + // but with a limit + int fileSize = ONETIME_STREAM_BUFFER_MAX_SIZE + 1; + while ((length = stream.read(buf)) != -1 && fileSize < ONETIME_STREAM_FILE_MAX_SIZE) { + int lengthToWrite = Math.min(length, ONETIME_STREAM_FILE_MAX_SIZE - fileSize); + outputStream.write(buf, 0, lengthToWrite); + fileSize += lengthToWrite; + } + } + try { + clonableAudioStreamResult = new FileAudioStream(tempFile, stream.getFormat(), true); + } catch (AudioException e) { // this is in fact a FileNotFoundException and should not happen + throw new IOException("Cannot find the cache file we just created.", e); + } + } + tryClose(stream); + return clonableAudioStreamResult; } - Map getOneTimeStreams() { - return Collections.unmodifiableMap(oneTimeStreams); + Map getServedStreams() { + return Collections.unmodifiableMap(servedStreams); } private String getRelativeURL(String streamId) { diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/javasound/JavaSoundAudioSink.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/javasound/JavaSoundAudioSink.java index 606b20e0195..41208b514f6 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/javasound/JavaSoundAudioSink.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/javasound/JavaSoundAudioSink.java @@ -13,7 +13,6 @@ package org.openhab.core.audio.internal.javasound; import java.io.IOException; -import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.util.Locale; import java.util.Scanner; @@ -32,6 +31,7 @@ import org.eclipse.jdt.annotation.Nullable; import org.openhab.core.audio.AudioFormat; import org.openhab.core.audio.AudioSink; +import org.openhab.core.audio.AudioSinkAsync; import org.openhab.core.audio.AudioStream; import org.openhab.core.audio.URLAudioStream; import org.openhab.core.audio.UnsupportedAudioFormatException; @@ -55,7 +55,7 @@ */ @NonNullByDefault @Component(service = AudioSink.class, immediate = true) -public class JavaSoundAudioSink implements AudioSink { +public class JavaSoundAudioSink extends AudioSinkAsync { private static final Logger LOGGER = LoggerFactory.getLogger(JavaSoundAudioSink.class); @@ -79,13 +79,14 @@ protected void activate(BundleContext context) { } @Override - public synchronized void process(final @Nullable AudioStream audioStream) + public synchronized void processAsynchronously(final @Nullable AudioStream audioStream) throws UnsupportedAudioFormatException, UnsupportedAudioStreamException { if (audioStream != null && !AudioFormat.CODEC_MP3.equals(audioStream.getFormat().getCodec())) { AudioPlayer audioPlayer = new AudioPlayer(audioStream); audioPlayer.start(); try { audioPlayer.join(); + playbackFinished(audioStream); } catch (InterruptedException e) { LOGGER.error("Playing audio has been interrupted."); } @@ -103,8 +104,7 @@ public synchronized void process(final @Nullable AudioStream audioStream) } else { try { // we start a new continuous stream and store its handle - streamPlayer = new Player(audioStream); - playInThread(streamPlayer); + playInThread(audioStream, true); } catch (JavaLayerException e) { LOGGER.error("An exception occurred while playing url audio stream : '{}'", e.getMessage()); } @@ -113,7 +113,7 @@ public synchronized void process(final @Nullable AudioStream audioStream) } else { // we are playing some normal file (no url stream) try { - playInThread(new Player(audioStream)); + playInThread(audioStream, false); } catch (JavaLayerException e) { LOGGER.error("An exception occurred while playing audio : '{}'", e.getMessage()); } @@ -121,17 +121,20 @@ public synchronized void process(final @Nullable AudioStream audioStream) } } - private void playInThread(final @Nullable Player player) { + private void playInThread(final AudioStream audioStream, boolean store) throws JavaLayerException { // run in new thread + Player streamPlayerFinal = new Player(audioStream); + if (store) { // we store its handle in case we want to interrupt it. + streamPlayer = streamPlayerFinal; + } threadFactory.newThread(() -> { - if (player != null) { - try { - player.play(); - } catch (Exception e) { - LOGGER.error("An exception occurred while playing audio : '{}'", e.getMessage()); - } finally { - player.close(); - } + try { + streamPlayerFinal.play(); + } catch (Exception e) { + LOGGER.error("An exception occurred while playing audio : '{}'", e.getMessage()); + } finally { + streamPlayerFinal.close(); + playbackFinished(audioStream); } }).start(); } @@ -174,7 +177,7 @@ public PercentType getVolume() throws IOException { return true; }); if (volumes[0] != null) { - return new PercentType(new BigDecimal(volumes[0] * 100f)); + return new PercentType(Math.round(volumes[0] * 100f)); } else { LOGGER.warn("Cannot determine master volume level - assuming 100%"); return PercentType.HUNDRED; diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/webaudio/WebAudioAudioSink.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/webaudio/WebAudioAudioSink.java index 7c9299b5847..36e492ee772 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/webaudio/WebAudioAudioSink.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/webaudio/WebAudioAudioSink.java @@ -21,8 +21,9 @@ import org.openhab.core.audio.AudioFormat; import org.openhab.core.audio.AudioHTTPServer; import org.openhab.core.audio.AudioSink; +import org.openhab.core.audio.AudioSinkAsync; import org.openhab.core.audio.AudioStream; -import org.openhab.core.audio.FixedLengthAudioStream; +import org.openhab.core.audio.StreamServed; import org.openhab.core.audio.URLAudioStream; import org.openhab.core.audio.UnsupportedAudioFormatException; import org.openhab.core.audio.UnsupportedAudioStreamException; @@ -44,13 +45,12 @@ */ @NonNullByDefault @Component(service = AudioSink.class, immediate = true) -public class WebAudioAudioSink implements AudioSink { +public class WebAudioAudioSink extends AudioSinkAsync { private final Logger logger = LoggerFactory.getLogger(WebAudioAudioSink.class); private static final Set SUPPORTED_AUDIO_FORMATS = Set.of(AudioFormat.MP3, AudioFormat.WAV); - private static final Set> SUPPORTED_AUDIO_STREAMS = Set - .of(FixedLengthAudioStream.class, URLAudioStream.class); + private static final Set> SUPPORTED_AUDIO_STREAMS = Set.of(AudioStream.class); private AudioHTTPServer audioHTTPServer; private EventPublisher eventPublisher; @@ -62,7 +62,7 @@ public WebAudioAudioSink(@Reference AudioHTTPServer audioHTTPServer, @Reference } @Override - public void process(@Nullable AudioStream audioStream) + public void processAsynchronously(@Nullable AudioStream audioStream) throws UnsupportedAudioFormatException, UnsupportedAudioStreamException { if (audioStream == null) { // in case the audioStream is null, this should be interpreted as a request to end any currently playing @@ -70,22 +70,26 @@ public void process(@Nullable AudioStream audioStream) logger.debug("Web Audio sink does not support stopping the currently playing stream."); return; } - try (AudioStream stream = audioStream) { - logger.debug("Received audio stream of format {}", audioStream.getFormat()); - if (audioStream instanceof URLAudioStream urlAudioStream) { + logger.debug("Received audio stream of format {}", audioStream.getFormat()); + if (audioStream instanceof URLAudioStream urlAudioStream) { + try (AudioStream stream = urlAudioStream) { + // in this case only, we need to close the stream by ourself in a try with block, + // because nothing will consume it // it is an external URL, so we can directly pass this on. sendEvent(urlAudioStream.getURL()); - } else if (audioStream instanceof FixedLengthAudioStream lengthAudioStream) { - // we need to serve it for a while and make it available to multiple clients, hence only - // FixedLengthAudioStreams are supported. - sendEvent(audioHTTPServer.serve(lengthAudioStream, 10)); - } else { - throw new UnsupportedAudioStreamException( - "Web audio sink can only handle FixedLengthAudioStreams and URLAudioStreams.", - audioStream.getClass()); + } catch (IOException e) { + logger.debug("Error while closing the audio stream: {}", e.getMessage(), e); + } + } else { + // we need to serve it for a while and make it available to multiple clients + try { + StreamServed servedStream = audioHTTPServer.serve(audioStream, 10, true); + // we will let the HTTP servlet run the delayed task when finished with the stream + servedStream.playEnd().thenRun(() -> this.playbackFinished(audioStream)); + sendEvent(servedStream.url()); + } catch (IOException e) { + logger.warn("Cannot precache the audio stream to serve it", e); } - } catch (IOException e) { - logger.debug("Error while closing the audio stream: {}", e.getMessage(), e); } } diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/utils/AudioSinkUtils.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/utils/AudioSinkUtils.java new file mode 100644 index 00000000000..b5a96e10811 --- /dev/null +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/utils/AudioSinkUtils.java @@ -0,0 +1,43 @@ +/** + * Copyright (c) 2010-2023 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.audio.utils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.openhab.core.audio.AudioFormat; + +/** + * Some utility methods for sink + * + * @author Gwendal Roulleau - Initial contribution + * + */ +@NonNullByDefault +public interface AudioSinkUtils { + + /** + * Transfers data from an input stream to an output stream and computes on the fly its duration + * + * @param in the input stream giving audio data ta play + * @param out the output stream receiving data to play + * @return the timestamp (from System.nanoTime) when the sound should be fully played. Returns null if computing + * time fails. + * @throws IOException if reading from the stream or writing to the stream failed + */ + @Nullable + Long transferAndAnalyzeLength(InputStream in, OutputStream out, AudioFormat audioFormat) throws IOException; +} diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/utils/AudioSinkUtilsImpl.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/utils/AudioSinkUtilsImpl.java new file mode 100644 index 00000000000..f49da0ce8e0 --- /dev/null +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/utils/AudioSinkUtilsImpl.java @@ -0,0 +1,91 @@ +/** + * Copyright (c) 2010-2023 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.audio.utils; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import javazoom.jl.decoder.Bitstream; +import javazoom.jl.decoder.BitstreamException; +import javazoom.jl.decoder.Header; + +import javax.sound.sampled.AudioInputStream; +import javax.sound.sampled.AudioSystem; +import javax.sound.sampled.UnsupportedAudioFileException; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.openhab.core.audio.AudioFormat; +import org.osgi.service.component.annotations.Component; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Some utility methods for sink + * + * @author Gwendal Roulleau - Initial contribution + * + */ +@NonNullByDefault +@Component +public class AudioSinkUtilsImpl implements AudioSinkUtils { + + private final Logger logger = LoggerFactory.getLogger(AudioSinkUtilsImpl.class); + + @Override + public @Nullable Long transferAndAnalyzeLength(InputStream in, OutputStream out, AudioFormat audioFormat) + throws IOException { + // take some data from the stream beginning + byte[] dataBytes = in.readNBytes(8192); + + // beginning sound timestamp : + long startTime = System.nanoTime(); + // copy already read data to the output stream : + out.write(dataBytes); + // transfer everything else + Long dataTransferedLength = dataBytes.length + in.transferTo(out); + + if (dataTransferedLength > 0) { + if (AudioFormat.CODEC_PCM_SIGNED.equals(audioFormat.getCodec())) { + try (AudioInputStream audioInputStream = AudioSystem + .getAudioInputStream(new ByteArrayInputStream(dataBytes))) { + int frameSize = audioInputStream.getFormat().getFrameSize(); + float frameRate = audioInputStream.getFormat().getFrameRate(); + long computedDuration = Float.valueOf((dataTransferedLength / (frameSize * frameRate)) * 1000000000) + .longValue(); + return startTime + computedDuration; + } catch (IOException | UnsupportedAudioFileException e) { + logger.debug("Cannot compute the duration of input stream", e); + return null; + } + } else if (AudioFormat.CODEC_MP3.equals(audioFormat.getCodec())) { + // not precise, no VBR, but better than nothing + Bitstream bitstream = new Bitstream(new ByteArrayInputStream(dataBytes)); + try { + Header h = bitstream.readFrame(); + if (h != null) { + long computedDuration = Float.valueOf(h.total_ms(dataTransferedLength.intValue()) * 1000000) + .longValue(); + return startTime + computedDuration; + } + } catch (BitstreamException ex) { + logger.debug("Cannot compute the duration of input stream", ex); + return null; + } + } + } + + return null; + } +} diff --git a/bundles/org.openhab.core.audio/src/test/java/org/openhab/core/audio/internal/AbstractAudioServletTest.java b/bundles/org.openhab.core.audio/src/test/java/org/openhab/core/audio/internal/AbstractAudioServletTest.java index 14224cb2ac6..c70545adc4e 100644 --- a/bundles/org.openhab.core.audio/src/test/java/org/openhab/core/audio/internal/AbstractAudioServletTest.java +++ b/bundles/org.openhab.core.audio/src/test/java/org/openhab/core/audio/internal/AbstractAudioServletTest.java @@ -33,7 +33,8 @@ import org.openhab.core.audio.AudioFormat; import org.openhab.core.audio.AudioStream; import org.openhab.core.audio.ByteArrayAudioStream; -import org.openhab.core.audio.FixedLengthAudioStream; +import org.openhab.core.audio.utils.AudioSinkUtils; +import org.openhab.core.audio.utils.AudioSinkUtilsImpl; import org.openhab.core.test.TestPortUtil; import org.openhab.core.test.TestServer; import org.openhab.core.test.java.JavaTest; @@ -62,10 +63,11 @@ public abstract class AbstractAudioServletTest extends JavaTest { public @Mock @NonNullByDefault({}) HttpService httpServiceMock; public @Mock @NonNullByDefault({}) HttpContext httpContextMock; + public AudioSinkUtils audioSinkUtils = new AudioSinkUtilsImpl(); @BeforeEach public void setupServerAndClient() { - audioServlet = new AudioServlet(); + audioServlet = new AudioServlet(audioSinkUtils); ServletHolder servletHolder = new ServletHolder(audioServlet); @@ -126,7 +128,7 @@ protected String serveStream(AudioStream stream, @Nullable Integer timeInterval) String path; if (timeInterval != null) { - path = audioServlet.serve((FixedLengthAudioStream) stream, timeInterval); + path = audioServlet.serve(stream, timeInterval); } else { path = audioServlet.serve(stream); } diff --git a/bundles/org.openhab.core.audio/src/test/java/org/openhab/core/audio/internal/AudioServletTest.java b/bundles/org.openhab.core.audio/src/test/java/org/openhab/core/audio/internal/AudioServletTest.java index 13e4957b5a6..6f7200604a6 100644 --- a/bundles/org.openhab.core.audio/src/test/java/org/openhab/core/audio/internal/AudioServletTest.java +++ b/bundles/org.openhab.core.audio/src/test/java/org/openhab/core/audio/internal/AudioServletTest.java @@ -14,10 +14,8 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.*; import java.io.File; import java.util.concurrent.TimeUnit; @@ -29,8 +27,10 @@ import org.junit.jupiter.api.Test; import org.openhab.core.audio.AudioFormat; import org.openhab.core.audio.AudioStream; +import org.openhab.core.audio.ByteArrayAudioStream; import org.openhab.core.audio.FileAudioStream; import org.openhab.core.audio.FixedLengthAudioStream; +import org.openhab.core.audio.StreamServed; import org.openhab.core.audio.internal.utils.BundledSoundFileHandler; /** @@ -128,7 +128,7 @@ public void onlyOneRequestToOneTimeStreamsCanBeMade() throws Exception { } @Test - public void requestToMultitimeStreamCannotBeDoneAfterTheTimeoutOfTheStreamHasExipred() throws Exception { + public void requestToMultitimeStreamCannotBeDoneAfterTheTimeoutOfTheStreamHasExpired() throws Exception { final int streamTimeout = 3; AudioStream audioStream = getByteArrayAudioStream(testByteArray, AudioFormat.CONTAINER_NONE, @@ -151,8 +151,8 @@ public void requestToMultitimeStreamCannotBeDoneAfterTheTimeoutOfTheStreamHasExi assertThat("The response media type was not as expected", response.getMediaType(), is(MEDIA_TYPE_AUDIO_MPEG)); - assertThat("The audio stream was not added to the multitime streams", - audioServlet.getMultiTimeStreams().containsValue(audioStream), is(true)); + assertThat("The audio stream was not added to the multitime streams", audioServlet.getServedStreams() + .values().stream().map(StreamServed::audioStream).toList().contains(audioStream), is(true)); } waitForAssert(() -> { @@ -161,27 +161,54 @@ public void requestToMultitimeStreamCannotBeDoneAfterTheTimeoutOfTheStreamHasExi } catch (Exception e) { throw new IllegalStateException(e); } - assertThat("The audio stream was not removed from multitime streams", - audioServlet.getMultiTimeStreams().containsValue(audioStream), is(false)); + assertThat("The audio stream was not removed from multitime streams", audioServlet.getServedStreams() + .values().stream().map(StreamServed::audioStream).toList().contains(audioStream), is(false)); }); response = getHttpRequest(url).send(); assertThat("The response status was not as expected", response.getStatus(), is(HttpStatus.NOT_FOUND_404)); } + @Test + public void oneTimeStreamIsRecreatedAsAClonable() throws Exception { + AudioStream audioStream = mock(AudioStream.class); + AudioFormat audioFormat = mock(AudioFormat.class); + when(audioStream.getFormat()).thenReturn(audioFormat); + when(audioFormat.getCodec()).thenReturn(AudioFormat.CODEC_MP3); + when(audioStream.readNBytes(anyInt())).thenReturn(testByteArray); + + String url = serveStream(audioStream, 10); + String uuid = url.substring(url.lastIndexOf("/") + 1); + StreamServed servedStream = audioServlet.getServedStreams().get(uuid); + + // does not contain directly the stream because it is now a new stream wrapper + assertThat(servedStream.audioStream(), not(audioStream)); + // it is now a ByteArrayAudioStream wrapper : + assertThat(servedStream.audioStream(), instanceOf(ByteArrayAudioStream.class)); + + ContentResponse response = getHttpRequest(url).send(); + assertThat("The response content was not as expected", response.getContent(), is(testByteArray)); + + verify(audioStream).close(); + } + @Test public void oneTimeStreamIsClosedAndRemovedAfterServed() throws Exception { AudioStream audioStream = mock(AudioStream.class); AudioFormat audioFormat = mock(AudioFormat.class); when(audioStream.getFormat()).thenReturn(audioFormat); when(audioFormat.getCodec()).thenReturn(AudioFormat.CODEC_MP3); + when(audioStream.readNBytes(anyInt())).thenReturn(new byte[] { 1, 2, 3 }); String url = serveStream(audioStream); + assertThat(audioServlet.getServedStreams().values().stream().map(StreamServed::audioStream).toList(), + contains(audioStream)); getHttpRequest(url).send(); verify(audioStream).close(); - assertThat(audioServlet.getOneTimeStreams().values(), not(contains(audioStream))); + assertThat(audioServlet.getServedStreams().values().stream().map(StreamServed::audioStream).toList(), + not(contains(audioStream))); } @Test @@ -195,9 +222,13 @@ public void multiTimeStreamIsClosedAfterExpired() throws Exception { cloneCounter.getAndIncrement(); return clonedStream; }); + when(audioStream.readNBytes(anyInt())).thenReturn(new byte[] { 1, 2, 3 }); + when(clonedStream.readNBytes(anyInt())).thenReturn(new byte[] { 1, 2, 3 }); when(audioFormat.getCodec()).thenReturn(AudioFormat.CODEC_MP3); String url = serveStream(audioStream, 2); + assertThat(audioServlet.getServedStreams().values().stream().map(StreamServed::audioStream).toList(), + contains(audioStream)); waitForAssert(() -> { try { @@ -210,7 +241,8 @@ public void multiTimeStreamIsClosedAfterExpired() throws Exception { }); verify(audioStream).close(); - assertThat(audioServlet.getMultiTimeStreams().values(), not(contains(audioStream))); + assertThat(audioServlet.getServedStreams().values().stream().map(StreamServed::audioStream).toList(), + not(contains(audioStream))); verify(clonedStream, times(cloneCounter.get())).close(); } diff --git a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/VoiceManagerImpl.java b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/VoiceManagerImpl.java index f08df2250ab..de770481a69 100644 --- a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/VoiceManagerImpl.java +++ b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/VoiceManagerImpl.java @@ -12,7 +12,6 @@ */ package org.openhab.core.voice.internal; -import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; @@ -41,8 +40,6 @@ import org.openhab.core.audio.AudioSink; import org.openhab.core.audio.AudioSource; import org.openhab.core.audio.AudioStream; -import org.openhab.core.audio.UnsupportedAudioFormatException; -import org.openhab.core.audio.UnsupportedAudioStreamException; import org.openhab.core.common.ThreadPoolManager; import org.openhab.core.config.core.ConfigOptionProvider; import org.openhab.core.config.core.ConfigurableService; @@ -272,39 +269,12 @@ public void say(String text, @Nullable String voiceId, @Nullable String sinkId, throw new TTSException( "Failed playing audio stream '" + audioStream + "' as audio sink doesn't support it"); } - - PercentType oldVolume = null; - // set notification sound volume - if (volume != null) { - try { - // get current volume - oldVolume = sink.getVolume(); - } catch (IOException e) { - logger.debug("An exception occurred while getting the volume of sink '{}' : {}", sink.getId(), - e.getMessage(), e); - } - - try { - sink.setVolume(volume); - } catch (IOException e) { - logger.debug("An exception occurred while setting the volume of sink '{}' : {}", sink.getId(), - e.getMessage(), e); - } - } - try { - sink.process(audioStream); - } finally { - if (volume != null && oldVolume != null) { - // restore volume only if it was set before - try { - sink.setVolume(oldVolume); - } catch (IOException e) { - logger.debug("An exception occurred while setting the volume of sink '{}' : {}", sink.getId(), - e.getMessage(), e); - } - } - } - } catch (TTSException | UnsupportedAudioFormatException | UnsupportedAudioStreamException e) { + Runnable restoreVolume = audioManager.handleVolumeCommand(volume, sink); + sink.processAndComplete(audioStream).exceptionally(exception -> { + logger.warn("Error playing '{}': {}", audioStream, exception.getMessage(), exception); + return null; + }).thenRun(restoreVolume); + } catch (TTSException e) { if (logger.isDebugEnabled()) { logger.debug("Error saying '{}': {}", text, e.getMessage(), e); } else { diff --git a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/cache/AudioStreamFromCache.java b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/cache/AudioStreamFromCache.java index 1cc79ea458a..360769944e3 100644 --- a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/cache/AudioStreamFromCache.java +++ b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/cache/AudioStreamFromCache.java @@ -32,10 +32,12 @@ public class AudioStreamFromCache extends FixedLengthAudioStream { private InputStreamCacheWrapper inputStream; private AudioFormat audioFormat; + private String key; - public AudioStreamFromCache(InputStreamCacheWrapper inputStream, AudioFormatInfo audioFormat) { + public AudioStreamFromCache(InputStreamCacheWrapper inputStream, AudioFormatInfo audioFormat, String key) { this.inputStream = inputStream; this.audioFormat = audioFormat.toAudioFormat(); + this.key = key; } @Override @@ -101,4 +103,9 @@ public synchronized void reset() throws IOException { public boolean markSupported() { return inputStream.markSupported(); } + + @Override + public @Nullable String getId() { + return key; + } } diff --git a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/cache/TTSLRUCacheImpl.java b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/cache/TTSLRUCacheImpl.java index 03ba00db2ad..ace6df0dce6 100644 --- a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/cache/TTSLRUCacheImpl.java +++ b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/cache/TTSLRUCacheImpl.java @@ -135,7 +135,7 @@ public AudioStream get(CachedTTSService tts, String text, Voice voice, AudioForm // we are sure that the cache is used, and so we can use an AudioStream // implementation that use convenient methods for some client, like getClonedStream() // or mark /reset - return new AudioStreamFromCache(inputStreamCacheWrapper, metadata); + return new AudioStreamFromCache(inputStreamCacheWrapper, metadata, key); } else { // the cache is not used, we can use the original response AudioStream return (AudioStream) fileAndMetadata.getInputStream(); diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/cache/lru/LRUMediaCacheEntry.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/cache/lru/LRUMediaCacheEntry.java index 74c83aaabde..8bf60773cc8 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/cache/lru/LRUMediaCacheEntry.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/cache/lru/LRUMediaCacheEntry.java @@ -25,6 +25,7 @@ import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; +import org.openhab.core.common.Disposable; import org.openhab.core.storage.Storage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -233,6 +234,9 @@ protected void closeStreamClient() throws IOException { if (inputStreamLocal != null) { inputStreamLocal.close(); } + if (inputStreamLocal instanceof Disposable disposableStream) { + disposableStream.dispose(); + } } } } finally { diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/common/Disposable.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/common/Disposable.java new file mode 100644 index 00000000000..b2b4fd9db0d --- /dev/null +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/common/Disposable.java @@ -0,0 +1,28 @@ +/** + * Copyright (c) 2010-2023 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.common; + +import java.io.IOException; + +import org.eclipse.jdt.annotation.NonNullByDefault; + +/** + * For resource needing a callback when they are not needed anymore. + * + * @author Gwendal Roulleau - Initial contribution + */ +@NonNullByDefault +@FunctionalInterface +public interface Disposable { + void dispose() throws IOException; +} diff --git a/bundles/org.openhab.core/src/test/java/org/openhab/core/cache/lru/LRUMediaCacheEntryTest.java b/bundles/org.openhab.core/src/test/java/org/openhab/core/cache/lru/LRUMediaCacheEntryTest.java index 078f80ea043..619ee0c0923 100644 --- a/bundles/org.openhab.core/src/test/java/org/openhab/core/cache/lru/LRUMediaCacheEntryTest.java +++ b/bundles/org.openhab.core/src/test/java/org/openhab/core/cache/lru/LRUMediaCacheEntryTest.java @@ -26,6 +26,8 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; import org.junit.jupiter.api.BeforeEach; @@ -191,10 +193,21 @@ public void loadTwoThreadsAtTheSameTimeFromTheSameSupplierTest() throws IOExcept InputStream actualAudioStream2 = lruMediaCacheEntry.getInputStream(); // read bytes from the two stream concurrently + Mutable<@Nullable IOException> exceptionCatched = new MutableObject<>(); List parallelAudioStreamList = Arrays.asList(actualAudioStream1, actualAudioStream2); - List bytesResultList = parallelAudioStreamList.parallelStream().map(this::readSafe) - .collect(Collectors.toList()); + List bytesResultList = parallelAudioStreamList.parallelStream().map(stream -> { + try { + return stream.readAllBytes(); + } catch (IOException e) { + exceptionCatched.setValue(e); + return new byte[0]; + } + }).collect(Collectors.toList()); + IOException possibleException = exceptionCatched.getValue(); + if (possibleException != null) { + throw possibleException; + } assertArrayEquals(randomData, bytesResultList.get(0)); assertArrayEquals(randomData, bytesResultList.get(1)); @@ -208,14 +221,6 @@ public void loadTwoThreadsAtTheSameTimeFromTheSameSupplierTest() throws IOExcept verifyNoMoreInteractions(ttsServiceMock); } - private byte[] readSafe(InputStream InputStream) { - try { - return InputStream.readAllBytes(); - } catch (IOException e) { - return new byte[0]; - } - } - private byte[] getRandomData(int length) { Random random = new Random(); byte[] randomBytes = new byte[length];