From f602c4794f80d01ee1b57326aabd50097f992a86 Mon Sep 17 00:00:00 2001 From: Freya Arbjerg Date: Mon, 8 Apr 2024 13:20:45 +0200 Subject: [PATCH] Add potential thread leak fix --- .../lavaplayer/track/BaseAudioTrack.java | 15 ++++-- .../track/playback/AudioTrackExecutor.java | 3 +- .../playback/LocalAudioTrackExecutor.java | 53 ++++++------------- 3 files changed, 27 insertions(+), 44 deletions(-) diff --git a/main/src/main/java/com/sedmelluq/discord/lavaplayer/track/BaseAudioTrack.java b/main/src/main/java/com/sedmelluq/discord/lavaplayer/track/BaseAudioTrack.java index be44fe8f..5b8e9932 100644 --- a/main/src/main/java/com/sedmelluq/discord/lavaplayer/track/BaseAudioTrack.java +++ b/main/src/main/java/com/sedmelluq/discord/lavaplayer/track/BaseAudioTrack.java @@ -11,6 +11,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; /** * Abstract base for all audio tracks with an executor @@ -18,7 +19,7 @@ public abstract class BaseAudioTrack implements InternalAudioTrack { private final PrimordialAudioTrackExecutor initialExecutor; private final AtomicBoolean executorAssigned; - private volatile AudioTrackExecutor activeExecutor; + private final AtomicReference activeExecutor; protected final AudioTrackInfo trackInfo; protected final AtomicLong accurateDuration; private volatile Object userData; @@ -29,7 +30,7 @@ public abstract class BaseAudioTrack implements InternalAudioTrack { public BaseAudioTrack(AudioTrackInfo trackInfo) { this.initialExecutor = new PrimordialAudioTrackExecutor(trackInfo); this.executorAssigned = new AtomicBoolean(); - this.activeExecutor = null; + this.activeExecutor = new AtomicReference<>(); this.trackInfo = trackInfo; this.accurateDuration = new AtomicLong(); } @@ -40,7 +41,7 @@ public void assignExecutor(AudioTrackExecutor executor, boolean applyPrimordialS if (applyPrimordialState) { initialExecutor.applyStateToExecutor(executor); } - activeExecutor = executor; + activeExecutor.set(executor); } else { throw new IllegalStateException("Cannot play the same instance of a track twice, use track.makeClone()."); } @@ -48,13 +49,17 @@ public void assignExecutor(AudioTrackExecutor executor, boolean applyPrimordialS @Override public AudioTrackExecutor getActiveExecutor() { - AudioTrackExecutor executor = activeExecutor; + AudioTrackExecutor executor = activeExecutor.get(); return executor != null ? executor : initialExecutor; } @Override public void stop() { - getActiveExecutor().stop(); + AudioTrackExecutor executor = activeExecutor.getAndSet(null); + if (executor == null) return; + + initialExecutor.setPosition(executor.getPosition()); + executor.stop(); } @Override diff --git a/main/src/main/java/com/sedmelluq/discord/lavaplayer/track/playback/AudioTrackExecutor.java b/main/src/main/java/com/sedmelluq/discord/lavaplayer/track/playback/AudioTrackExecutor.java index e88a255e..77c4f590 100644 --- a/main/src/main/java/com/sedmelluq/discord/lavaplayer/track/playback/AudioTrackExecutor.java +++ b/main/src/main/java/com/sedmelluq/discord/lavaplayer/track/playback/AudioTrackExecutor.java @@ -21,7 +21,8 @@ public interface AudioTrackExecutor extends AudioFrameProvider { void execute(TrackStateListener listener); /** - * Stop playing the track, terminating the thread that is filling the frame buffer. + * Stop playing the track, terminating the thread that is filling the frame buffer. Subsequent playback requires + * a new executor. */ void stop(); diff --git a/main/src/main/java/com/sedmelluq/discord/lavaplayer/track/playback/LocalAudioTrackExecutor.java b/main/src/main/java/com/sedmelluq/discord/lavaplayer/track/playback/LocalAudioTrackExecutor.java index 980f7f0e..a6ea5f0b 100644 --- a/main/src/main/java/com/sedmelluq/discord/lavaplayer/track/playback/LocalAudioTrackExecutor.java +++ b/main/src/main/java/com/sedmelluq/discord/lavaplayer/track/playback/LocalAudioTrackExecutor.java @@ -33,7 +33,7 @@ public class LocalAudioTrackExecutor implements AudioTrackExecutor { private final boolean useSeekGhosting; private final AudioFrameBuffer frameBuffer; private final AtomicReference playingThread = new AtomicReference<>(); - private final AtomicBoolean queuedStop = new AtomicBoolean(false); + private final AtomicBoolean disposedOf = new AtomicBoolean(false); private final AtomicLong queuedSeek = new AtomicLong(-1); private final AtomicLong lastFrameTimecode = new AtomicLong(0); private final AtomicReference state = new AtomicReference<>(AudioTrackState.INACTIVE); @@ -56,7 +56,7 @@ public LocalAudioTrackExecutor(InternalAudioTrack audioTrack, AudioConfiguration this.audioTrack = audioTrack; AudioDataFormat currentFormat = configuration.getOutputFormat(); - this.frameBuffer = configuration.getFrameBufferFactory().create(bufferDuration, currentFormat, queuedStop); + this.frameBuffer = configuration.getFrameBufferFactory().create(bufferDuration, currentFormat, disposedOf); this.processingContext = new AudioProcessingContext(configuration, frameBuffer, playerOptions, currentFormat); this.useSeekGhosting = useSeekGhosting; } @@ -92,7 +92,15 @@ public void execute(TrackStateListener listener) { log.debug("Cleared a stray interrupt."); } - if (playingThread.compareAndSet(null, Thread.currentThread())) { + synchronized (actionSynchronizer) { + if (disposedOf.get()) { + log.warn("Attempt to execute executor that has been disposed of"); + return; + } + } + + boolean wasUpdated = playingThread.compareAndSet(null, Thread.currentThread()); + if (wasUpdated) { log.debug("Starting to play track {} locally with listener {}", audioTrack.getInfo().identifier, listener); state.set(AudioTrackState.LOADING); @@ -105,7 +113,7 @@ public void execute(TrackStateListener listener) { // Temporarily clear the interrupted status so it would not disrupt listener methods. interrupt = findInterrupt(e); - if (interrupt != null && checkStopped()) { + if (interrupt != null) { log.debug("Track {} was interrupted outside of execution loop.", audioTrack.getIdentifier()); } else { frameBuffer.setTerminateOnEmpty(); @@ -140,12 +148,11 @@ public void execute(TrackStateListener listener) { @Override public void stop() { synchronized (actionSynchronizer) { + disposedOf.set(true); Thread thread = playingThread.get(); if (thread != null) { log.debug("Requesting stop for track {}", audioTrack.getIdentifier()); - - queuedStop.compareAndSet(false, true); thread.interrupt(); } else { log.debug("Tried to stop track {} which is not playing.", audioTrack.getIdentifier()); @@ -153,18 +160,6 @@ public void stop() { } } - /** - * @return True if the track has been scheduled to stop and then clears the scheduled stop bit. - */ - public boolean checkStopped() { - if (queuedStop.compareAndSet(true, false)) { - state.set(AudioTrackState.STOPPING); - return true; - } - - return false; - } - /** * Wait until all the frames from the frame buffer have been consumed. Keeps the buffering thread alive to keep it * interruptible for seeking until buffer is empty. @@ -176,24 +171,6 @@ public void waitOnEnd() throws InterruptedException { frameBuffer.waitForTermination(); } - /** - * Interrupt the buffering thread, either stop or seek should have been set beforehand. - * - * @return True if there was a thread to interrupt. - */ - public boolean interrupt() { - synchronized (actionSynchronizer) { - Thread thread = playingThread.get(); - - if (thread != null) { - thread.interrupt(); - return true; - } - - return false; - } - } - @Override public long getPosition() { long seek = queuedSeek.get(); @@ -336,7 +313,7 @@ private void interruptForSeek() { private boolean handlePlaybackInterrupt(InterruptedException interruption, SeekExecutor seekExecutor) { Thread.interrupted(); - if (checkStopped()) { + if (disposedOf.get()) { markerTracker.trigger(STOPPED); return false; } @@ -345,7 +322,7 @@ private boolean handlePlaybackInterrupt(InterruptedException interruption, SeekE if (seekResult != SeekResult.NO_SEEK) { // Double-check, might have received a stop request while seeking - if (checkStopped()) { + if (disposedOf.get()) { markerTracker.trigger(STOPPED); return false; } else {