Skip to content

Commit

Permalink
Add potential thread leak fix
Browse files Browse the repository at this point in the history
  • Loading branch information
freyacodes committed Apr 8, 2024
1 parent 4232036 commit f602c47
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
* Abstract base for all audio tracks with an executor
*/
public abstract class BaseAudioTrack implements InternalAudioTrack {
private final PrimordialAudioTrackExecutor initialExecutor;
private final AtomicBoolean executorAssigned;
private volatile AudioTrackExecutor activeExecutor;
private final AtomicReference<AudioTrackExecutor> activeExecutor;
protected final AudioTrackInfo trackInfo;
protected final AtomicLong accurateDuration;
private volatile Object userData;
Expand All @@ -29,7 +30,7 @@ public abstract class BaseAudioTrack implements InternalAudioTrack {
public BaseAudioTrack(AudioTrackInfo trackInfo) {
this.initialExecutor = new PrimordialAudioTrackExecutor(trackInfo);
this.executorAssigned = new AtomicBoolean();
this.activeExecutor = null;
this.activeExecutor = new AtomicReference<>();
this.trackInfo = trackInfo;
this.accurateDuration = new AtomicLong();
}
Expand All @@ -40,21 +41,25 @@ public void assignExecutor(AudioTrackExecutor executor, boolean applyPrimordialS
if (applyPrimordialState) {
initialExecutor.applyStateToExecutor(executor);
}
activeExecutor = executor;
activeExecutor.set(executor);
} else {
throw new IllegalStateException("Cannot play the same instance of a track twice, use track.makeClone().");
}
}

@Override
public AudioTrackExecutor getActiveExecutor() {
AudioTrackExecutor executor = activeExecutor;
AudioTrackExecutor executor = activeExecutor.get();
return executor != null ? executor : initialExecutor;
}

@Override
public void stop() {
getActiveExecutor().stop();
AudioTrackExecutor executor = activeExecutor.getAndSet(null);
if (executor == null) return;

initialExecutor.setPosition(executor.getPosition());
executor.stop();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ public interface AudioTrackExecutor extends AudioFrameProvider {
void execute(TrackStateListener listener);

/**
* Stop playing the track, terminating the thread that is filling the frame buffer.
* Stop playing the track, terminating the thread that is filling the frame buffer. Subsequent playback requires
* a new executor.
*/
void stop();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class LocalAudioTrackExecutor implements AudioTrackExecutor {
private final boolean useSeekGhosting;
private final AudioFrameBuffer frameBuffer;
private final AtomicReference<Thread> playingThread = new AtomicReference<>();
private final AtomicBoolean queuedStop = new AtomicBoolean(false);
private final AtomicBoolean disposedOf = new AtomicBoolean(false);
private final AtomicLong queuedSeek = new AtomicLong(-1);
private final AtomicLong lastFrameTimecode = new AtomicLong(0);
private final AtomicReference<AudioTrackState> state = new AtomicReference<>(AudioTrackState.INACTIVE);
Expand All @@ -56,7 +56,7 @@ public LocalAudioTrackExecutor(InternalAudioTrack audioTrack, AudioConfiguration

this.audioTrack = audioTrack;
AudioDataFormat currentFormat = configuration.getOutputFormat();
this.frameBuffer = configuration.getFrameBufferFactory().create(bufferDuration, currentFormat, queuedStop);
this.frameBuffer = configuration.getFrameBufferFactory().create(bufferDuration, currentFormat, disposedOf);
this.processingContext = new AudioProcessingContext(configuration, frameBuffer, playerOptions, currentFormat);
this.useSeekGhosting = useSeekGhosting;
}
Expand Down Expand Up @@ -92,7 +92,15 @@ public void execute(TrackStateListener listener) {
log.debug("Cleared a stray interrupt.");
}

if (playingThread.compareAndSet(null, Thread.currentThread())) {
synchronized (actionSynchronizer) {
if (disposedOf.get()) {
log.warn("Attempt to execute executor that has been disposed of");
return;
}
}

boolean wasUpdated = playingThread.compareAndSet(null, Thread.currentThread());
if (wasUpdated) {
log.debug("Starting to play track {} locally with listener {}", audioTrack.getInfo().identifier, listener);

state.set(AudioTrackState.LOADING);
Expand All @@ -105,7 +113,7 @@ public void execute(TrackStateListener listener) {
// Temporarily clear the interrupted status so it would not disrupt listener methods.
interrupt = findInterrupt(e);

if (interrupt != null && checkStopped()) {
if (interrupt != null) {
log.debug("Track {} was interrupted outside of execution loop.", audioTrack.getIdentifier());
} else {
frameBuffer.setTerminateOnEmpty();
Expand Down Expand Up @@ -140,31 +148,18 @@ public void execute(TrackStateListener listener) {
@Override
public void stop() {
synchronized (actionSynchronizer) {
disposedOf.set(true);
Thread thread = playingThread.get();

if (thread != null) {
log.debug("Requesting stop for track {}", audioTrack.getIdentifier());

queuedStop.compareAndSet(false, true);
thread.interrupt();
} else {
log.debug("Tried to stop track {} which is not playing.", audioTrack.getIdentifier());
}
}
}

/**
* @return True if the track has been scheduled to stop and then clears the scheduled stop bit.
*/
public boolean checkStopped() {
if (queuedStop.compareAndSet(true, false)) {
state.set(AudioTrackState.STOPPING);
return true;
}

return false;
}

/**
* Wait until all the frames from the frame buffer have been consumed. Keeps the buffering thread alive to keep it
* interruptible for seeking until buffer is empty.
Expand All @@ -176,24 +171,6 @@ public void waitOnEnd() throws InterruptedException {
frameBuffer.waitForTermination();
}

/**
* Interrupt the buffering thread, either stop or seek should have been set beforehand.
*
* @return True if there was a thread to interrupt.
*/
public boolean interrupt() {
synchronized (actionSynchronizer) {
Thread thread = playingThread.get();

if (thread != null) {
thread.interrupt();
return true;
}

return false;
}
}

@Override
public long getPosition() {
long seek = queuedSeek.get();
Expand Down Expand Up @@ -336,7 +313,7 @@ private void interruptForSeek() {
private boolean handlePlaybackInterrupt(InterruptedException interruption, SeekExecutor seekExecutor) {
Thread.interrupted();

if (checkStopped()) {
if (disposedOf.get()) {
markerTracker.trigger(STOPPED);
return false;
}
Expand All @@ -345,7 +322,7 @@ private boolean handlePlaybackInterrupt(InterruptedException interruption, SeekE

if (seekResult != SeekResult.NO_SEEK) {
// Double-check, might have received a stop request while seeking
if (checkStopped()) {
if (disposedOf.get()) {
markerTracker.trigger(STOPPED);
return false;
} else {
Expand Down

0 comments on commit f602c47

Please sign in to comment.