Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Fix the extra retention problem in ReplaySubject #5892

Merged
merged 2 commits into from
Mar 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 77 additions & 6 deletions src/main/java/io/reactivex/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@

package io.reactivex.subjects;

import io.reactivex.annotations.Nullable;
import java.lang.reflect.Array;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.*;

import io.reactivex.Observer;
import io.reactivex.Scheduler;
import io.reactivex.annotations.CheckReturnValue;
import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.util.NotificationLite;
Expand Down Expand Up @@ -94,8 +93,9 @@
* in a non-blocking and thread-safe manner via {@link #hasValue()}, {@link #getValue()},
* {@link #getValues()} or {@link #getValues(Object[])}.
* <p>
* Note that due to concurrency requirements, a size-bounded {@code ReplaySubject} may hold strong references to more
* source emissions than specified.
* Note that due to concurrency requirements, a size- and time-bounded {@code ReplaySubject} may hold strong references to more
* source emissions than specified while it isn't terminated yet. Use the {@link #cleanupBuffer()} to allow
* such inaccessible items to be cleaned up by GC once no consumer references it anymore.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code ReplaySubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and
Expand Down Expand Up @@ -415,6 +415,24 @@ public T getValue() {
return buffer.getValue();
}

/**
* Makes sure the item cached by the head node in a bounded
* ReplaySubject is released (as it is never part of a replay).
* <p>
* By default, live bounded buffers will remember one item before
* the currently receivable one to ensure subscribers can always
* receive a continuous sequence of items. A terminated ReplaySubject
* automatically releases this inaccessible item.
* <p>
* The method must be called sequentially, similar to the standard
* {@code onXXX} methods.
* @since 2.1.11 - experimental
*/
@Experimental
public void cleanupBuffer() {
buffer.trimHead();
}

/** An empty array to avoid allocation in getValues(). */
private static final Object[] EMPTY_ARRAY = new Object[0];

Expand Down Expand Up @@ -563,6 +581,12 @@ interface ReplayBuffer<T> {
* @return true if successful
*/
boolean compareAndSet(Object expected, Object next);

/**
* Make sure an old inaccessible head value is released
* in a bounded buffer.
*/
void trimHead();
}

static final class ReplayDisposable<T> extends AtomicInteger implements Disposable {
Expand Down Expand Up @@ -619,10 +643,16 @@ public void add(T value) {
@Override
public void addFinal(Object notificationLite) {
buffer.add(notificationLite);
trimHead();
size++;
done = true;
}

@Override
public void trimHead() {
// no-op in this type of buffer
}

@Override
@Nullable
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -839,9 +869,24 @@ public void addFinal(Object notificationLite) {
size++;
t.lazySet(n); // releases both the tail and size

trimHead();
done = true;
}

/**
* Replace a non-empty head node with an empty one to
* allow the GC of the inaccessible old value.
*/
@Override
public void trimHead() {
Node<Object> h = head;
if (h.value != null) {
Node<Object> n = new Node<Object>(null);
n.lazySet(h.get());
head = n;
}
}

@Override
@Nullable
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -1047,12 +1092,24 @@ void trimFinal() {
for (;;) {
TimedNode<Object> next = h.get();
if (next.get() == null) {
head = h;
if (h.value != null) {
TimedNode<Object> lasth = new TimedNode<Object>(null, 0L);
lasth.lazySet(h.get());
head = lasth;
} else {
head = h;
}
break;
}

if (next.time > limit) {
head = h;
if (h.value != null) {
TimedNode<Object> lasth = new TimedNode<Object>(null, 0L);
lasth.lazySet(h.get());
head = lasth;
} else {
head = h;
}
break;
}

Expand Down Expand Up @@ -1085,6 +1142,20 @@ public void addFinal(Object notificationLite) {
done = true;
}

/**
* Replace a non-empty head node with an empty one to
* allow the GC of the inaccessible old value.
*/
@Override
public void trimHead() {
TimedNode<Object> h = head;
if (h.value != null) {
TimedNode<Object> n = new TimedNode<Object>(null, 0);
n.lazySet(h.get());
head = n;
}
}

@Override
@Nullable
@SuppressWarnings("unchecked")
Expand Down
89 changes: 89 additions & 0 deletions src/test/java/io/reactivex/subjects/ReplaySubjectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.reactivex.functions.Function;
import io.reactivex.observers.*;
import io.reactivex.schedulers.*;
import io.reactivex.subjects.ReplaySubject.*;

public class ReplaySubjectTest extends SubjectTest<Integer> {

Expand Down Expand Up @@ -1184,4 +1185,92 @@ public void timedNoOutdatedData() {

source.test().assertResult();
}

@Test
public void noHeadRetentionCompleteSize() {
ReplaySubject<Integer> source = ReplaySubject.createWithSize(1);

source.onNext(1);
source.onNext(2);
source.onComplete();

SizeBoundReplayBuffer<Integer> buf = (SizeBoundReplayBuffer<Integer>)source.buffer;

assertNull(buf.head.value);

Object o = buf.head;

source.cleanupBuffer();

assertSame(o, buf.head);
}


@Test
public void noHeadRetentionSize() {
ReplaySubject<Integer> source = ReplaySubject.createWithSize(1);

source.onNext(1);
source.onNext(2);

SizeBoundReplayBuffer<Integer> buf = (SizeBoundReplayBuffer<Integer>)source.buffer;

assertNotNull(buf.head.value);

source.cleanupBuffer();

assertNull(buf.head.value);

Object o = buf.head;

source.cleanupBuffer();

assertSame(o, buf.head);
}

@Test
public void noHeadRetentionCompleteTime() {
ReplaySubject<Integer> source = ReplaySubject.createWithTime(1, TimeUnit.MINUTES, Schedulers.computation());

source.onNext(1);
source.onNext(2);
source.onComplete();

SizeAndTimeBoundReplayBuffer<Integer> buf = (SizeAndTimeBoundReplayBuffer<Integer>)source.buffer;

assertNull(buf.head.value);

Object o = buf.head;

source.cleanupBuffer();

assertSame(o, buf.head);
}

@Test
public void noHeadRetentionTime() {
TestScheduler sch = new TestScheduler();

ReplaySubject<Integer> source = ReplaySubject.createWithTime(1, TimeUnit.MILLISECONDS, sch);

source.onNext(1);

sch.advanceTimeBy(2, TimeUnit.MILLISECONDS);

source.onNext(2);

SizeAndTimeBoundReplayBuffer<Integer> buf = (SizeAndTimeBoundReplayBuffer<Integer>)source.buffer;

assertNotNull(buf.head.value);

source.cleanupBuffer();

assertNull(buf.head.value);

Object o = buf.head;

source.cleanupBuffer();

assertSame(o, buf.head);
}
}