Skip to content

Commit

Permalink
[pulseaudio] Make the process method asynchronous (openhab#15179)
Browse files Browse the repository at this point in the history
* [pulseaudio] Make the process method asynchronous

And use the new 'complete' system to signal core that the sound is fully played.

---------

Signed-off-by: Gwendal Roulleau <[email protected]>
Signed-off-by: Matt Myers <[email protected]>
  • Loading branch information
dalgwen authored and matchews committed Aug 9, 2023
1 parent 419ff94 commit 45e1034
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.audio.AudioFormat;
import org.openhab.core.audio.AudioStream;
import org.openhab.core.audio.FixedLengthAudioStream;
import org.openhab.core.audio.SizeableAudioStream;
import org.openhab.core.audio.UnsupportedAudioFormatException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -58,8 +58,8 @@ public ConvertedInputStream(AudioStream innerInputStream)
throws UnsupportedAudioFormatException, UnsupportedAudioFileException, IOException {
this.audioFormat = innerInputStream.getFormat();

if (innerInputStream instanceof FixedLengthAudioStream) {
length = ((FixedLengthAudioStream) innerInputStream).length();
if (innerInputStream instanceof SizeableAudioStream sizeableAudioStream) {
length = sizeableAudioStream.length();
}

pcmNormalizedInputStream = getPCMStreamNormalized(getPCMStream(new BufferedInputStream(innerInputStream)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
import java.net.Socket;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.sound.sampled.UnsupportedAudioFileException;

Expand All @@ -28,9 +29,11 @@
import org.openhab.core.audio.AudioFormat;
import org.openhab.core.audio.AudioSink;
import org.openhab.core.audio.AudioStream;
import org.openhab.core.audio.FixedLengthAudioStream;
import org.openhab.core.audio.FileAudioStream;
import org.openhab.core.audio.UnsupportedAudioFormatException;
import org.openhab.core.audio.UnsupportedAudioStreamException;
import org.openhab.core.audio.utils.AudioSinkUtils;
import org.openhab.core.common.Disposable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,24 +50,29 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen

private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSink.class);

private static final HashSet<AudioFormat> SUPPORTED_FORMATS = new HashSet<>();
private static final HashSet<Class<? extends AudioStream>> SUPPORTED_STREAMS = new HashSet<>();
private AudioSinkUtils audioSinkUtils;

static {
SUPPORTED_FORMATS.add(AudioFormat.WAV);
SUPPORTED_FORMATS.add(AudioFormat.MP3);
SUPPORTED_STREAMS.add(FixedLengthAudioStream.class);
}
private static final Set<AudioFormat> SUPPORTED_FORMATS = Set.of(AudioFormat.WAV, AudioFormat.MP3);
private static final Set<Class<? extends AudioStream>> SUPPORTED_STREAMS = Set.of(AudioStream.class);
private static final AudioFormat TARGET_FORMAT = new AudioFormat(AudioFormat.CONTAINER_WAVE,
AudioFormat.CODEC_PCM_SIGNED, false, 16, 4 * 44100, 44100L, 2);

public PulseAudioAudioSink(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler) {
public PulseAudioAudioSink(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler,
AudioSinkUtils audioSinkUtils) {
super(pulseaudioHandler, scheduler);
this.audioSinkUtils = audioSinkUtils;
}

@Override
public void process(@Nullable AudioStream audioStream)
throws UnsupportedAudioFormatException, UnsupportedAudioStreamException {
processAndComplete(audioStream);
}

@Override
public CompletableFuture<@Nullable Void> processAndComplete(@Nullable AudioStream audioStream) {
if (audioStream == null) {
return;
return CompletableFuture.completedFuture(null);
}
addClientCount();
try (ConvertedInputStream normalizedPCMStream = new ConvertedInputStream(audioStream)) {
Expand All @@ -75,18 +83,38 @@ public void process(@Nullable AudioStream audioStream)
if (clientSocketLocal != null) {
// send raw audio to the socket and to pulse audio
Instant start = Instant.now();
normalizedPCMStream.transferTo(clientSocketLocal.getOutputStream());
if (normalizedPCMStream.getDuration() != -1) { // ensure, if the sound has a duration
if (normalizedPCMStream.getDuration() != -1) {
// ensure, if the sound has a duration
// that we let at least this time for the system to play
normalizedPCMStream.transferTo(clientSocketLocal.getOutputStream());
Instant end = Instant.now();
long millisSecondTimedToSendAudioData = Duration.between(start, end).toMillis();
if (millisSecondTimedToSendAudioData < normalizedPCMStream.getDuration()) {
long timeToSleep = normalizedPCMStream.getDuration() - millisSecondTimedToSendAudioData;
logger.debug("Sleep time to let the system play sound : {}", timeToSleep);
Thread.sleep(timeToSleep);
CompletableFuture<@Nullable Void> soundPlayed = new CompletableFuture<>();
long timeToWait = normalizedPCMStream.getDuration() - millisSecondTimedToSendAudioData;
logger.debug("Some time to let the system play sound : {}", timeToWait);
scheduler.schedule(() -> soundPlayed.complete(null), timeToWait, TimeUnit.MILLISECONDS);
return soundPlayed;
} else {
return CompletableFuture.completedFuture(null);
}
} else {
// We have a second method available to guess the duration, and it is during transfer
Long timeStampEnd = audioSinkUtils.transferAndAnalyzeLength(normalizedPCMStream,
clientSocketLocal.getOutputStream(), TARGET_FORMAT);
CompletableFuture<@Nullable Void> soundPlayed = new CompletableFuture<>();
if (timeStampEnd != null) {
long now = System.nanoTime();
long timeToWait = timeStampEnd - now;
if (timeToWait > 0) {
scheduler.schedule(() -> soundPlayed.complete(null), timeToWait,
TimeUnit.NANOSECONDS);
}
return soundPlayed;
} else {
return CompletableFuture.completedFuture(null);
}
}
break;
}
} catch (IOException e) {
disconnect(); // disconnect force to clear connection in case of socket not cleanly shutdown
Expand All @@ -97,19 +125,34 @@ public void process(@Nullable AudioStream audioStream)
logger.warn(
"Error while trying to send audio to pulseaudio audio sink. Cannot connect to {}:{}, error: {}",
pulseaudioHandler.getHost(), port, e.getMessage());
break;
return CompletableFuture.completedFuture(null);
}
} catch (InterruptedException ie) {
logger.info("Interrupted during sink audio connection: {}", ie.getMessage());
break;
return CompletableFuture.completedFuture(null);
}
}
} catch (UnsupportedAudioFileException | IOException e) {
throw new UnsupportedAudioFormatException("Cannot send sound to the pulseaudio sink",
audioStream.getFormat(), e);
} catch (UnsupportedAudioFileException | UnsupportedAudioFormatException | IOException e) {
return CompletableFuture.failedFuture(new UnsupportedAudioFormatException(
"Cannot send sound to the pulseaudio sink", audioStream.getFormat(), e));
} finally {
minusClientCount();
// if the stream is not needed anymore, then we should call back the AudioStream to let it a chance
// to auto dispose.
if (audioStream instanceof Disposable disposableAudioStream) {
try {
disposableAudioStream.dispose();
} catch (IOException e) {
String fileName = audioStream instanceof FileAudioStream file ? file.toString() : "unknown";
if (logger.isDebugEnabled()) {
logger.debug("Cannot dispose of stream {}", fileName, e);
} else {
logger.warn("Cannot dispose of stream {}, reason {}", fileName, e.getMessage());
}
}
}
}
return CompletableFuture.completedFuture(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.openhab.binding.pulseaudio.internal.discovery.PulseaudioDeviceDiscoveryService;
import org.openhab.binding.pulseaudio.internal.handler.PulseaudioBridgeHandler;
import org.openhab.binding.pulseaudio.internal.handler.PulseaudioHandler;
import org.openhab.core.audio.utils.AudioSinkUtils;
import org.openhab.core.config.core.Configuration;
import org.openhab.core.config.discovery.DiscoveryService;
import org.openhab.core.thing.Bridge;
Expand All @@ -39,6 +40,7 @@
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -61,6 +63,13 @@ public class PulseaudioHandlerFactory extends BaseThingHandlerFactory {

private PulseAudioBindingConfiguration configuration = new PulseAudioBindingConfiguration();

private AudioSinkUtils audioSinkUtils;

@Activate
public PulseaudioHandlerFactory(@Reference AudioSinkUtils audioSinkUtils) {
this.audioSinkUtils = audioSinkUtils;
}

@Override
public boolean supportsThingType(ThingTypeUID thingTypeUID) {
return SUPPORTED_THING_TYPES_UIDS.contains(thingTypeUID);
Expand Down Expand Up @@ -119,7 +128,7 @@ protected void removeHandler(ThingHandler thingHandler) {
registerDeviceDiscoveryService(handler);
return handler;
} else if (PulseaudioHandler.SUPPORTED_THING_TYPES_UIDS.contains(thingTypeUID)) {
return new PulseaudioHandler(thing, bundleContext);
return new PulseaudioHandler(thing, bundleContext, audioSinkUtils);
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.openhab.core.audio.AudioFormat;
import org.openhab.core.audio.AudioSink;
import org.openhab.core.audio.AudioSource;
import org.openhab.core.audio.utils.AudioSinkUtils;
import org.openhab.core.config.core.Configuration;
import org.openhab.core.library.types.DecimalType;
import org.openhab.core.library.types.IncreaseDecreaseType;
Expand Down Expand Up @@ -89,9 +90,12 @@ public class PulseaudioHandler extends BaseThingHandler {

private final BundleContext bundleContext;

public PulseaudioHandler(Thing thing, BundleContext bundleContext) {
private AudioSinkUtils audioSinkUtils;

public PulseaudioHandler(Thing thing, BundleContext bundleContext, AudioSinkUtils audioSinkUtils) {
super(thing);
this.bundleContext = bundleContext;
this.audioSinkUtils = audioSinkUtils;
}

@Override
Expand Down Expand Up @@ -127,7 +131,7 @@ private void audioSinkSetup() {
return;
}
final PulseaudioHandler thisHandler = this;
PulseAudioAudioSink audioSink = new PulseAudioAudioSink(thisHandler, scheduler);
PulseAudioAudioSink audioSink = new PulseAudioAudioSink(thisHandler, scheduler, audioSinkUtils);
scheduler.submit(new Runnable() {
@Override
public void run() {
Expand Down

0 comments on commit 45e1034

Please sign in to comment.