diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java index 21b1b1d39c..7a02482b4b 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java @@ -566,6 +566,8 @@ public void dispose() { // the others had non-zero. By removing this 'blocking' child, the others // are now free to receive events parent.manageRequests(); + // make sure the last known node is not retained + index = null; } } /** @@ -824,6 +826,7 @@ public final void replay(InnerSubscription output) { } for (;;) { if (output.isDisposed()) { + output.index = null; return; } @@ -864,6 +867,7 @@ public final void replay(InnerSubscription output) { break; } if (output.isDisposed()) { + output.index = null; return; } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java index 89db184d6b..197ada9706 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java @@ -453,6 +453,8 @@ public void dispose() { cancelled = true; // remove this from the parent parent.remove(this); + // make sure the last known node is not retained + index = null; } } /** @@ -686,6 +688,7 @@ public final void replay(InnerDisposable output) { for (;;) { if (output.isDisposed()) { + output.index = null; return; } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java index dcf7eea347..b3bea718bd 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java @@ -17,6 +17,7 @@ import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; +import java.lang.management.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -1976,4 +1977,67 @@ public void currentDisposedWhenConnecting() { assertFalse(fr.current.get().isDisposed()); } + + @Test + public void noBoundedRetentionViaThreadLocal() throws Exception { + Flowable source = Flowable.range(1, 200) + .map(new Function() { + @Override + public byte[] apply(Integer v) throws Exception { + return new byte[1024 * 1024]; + } + }) + .replay(new Function, Publisher>() { + @Override + public Publisher apply(final Flowable f) throws Exception { + return f.take(1) + .concatMap(new Function>() { + @Override + public Publisher apply(byte[] v) throws Exception { + return f; + } + }); + } + }, 1) + .takeLast(1) + ; + + System.out.println("Bounded Replay Leak check: Wait before GC"); + Thread.sleep(1000); + + System.out.println("Bounded Replay Leak check: GC"); + System.gc(); + + Thread.sleep(500); + + final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); + long initial = memHeap.getUsed(); + + System.out.printf("Bounded Replay Leak check: Starting: %.3f MB%n", initial / 1024.0 / 1024.0); + + final AtomicLong after = new AtomicLong(); + + source.subscribe(new Consumer() { + @Override + public void accept(byte[] v) throws Exception { + System.out.println("Bounded Replay Leak check: Wait before GC 2"); + Thread.sleep(1000); + + System.out.println("Bounded Replay Leak check: GC 2"); + System.gc(); + + Thread.sleep(500); + + after.set(memoryMXBean.getHeapMemoryUsage().getUsed()); + } + }); + + System.out.printf("Bounded Replay Leak check: After: %.3f MB%n", after.get() / 1024.0 / 1024.0); + + if (initial + 100 * 1024 * 1024 < after.get()) { + Assert.fail("Bounded Replay Leak check: Memory leak detected: " + (initial / 1024.0 / 1024.0) + + " -> " + after.get() / 1024.0 / 1024.0); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java index 2592361cd6..40d09f4f33 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java @@ -17,9 +17,10 @@ import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; +import java.lang.management.*; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import org.junit.*; import org.mockito.InOrder; @@ -1713,4 +1714,66 @@ public void noHeadRetentionTime() { assertSame(o, buf.get()); } -} + + @Test + public void noBoundedRetentionViaThreadLocal() throws Exception { + Observable source = Observable.range(1, 200) + .map(new Function() { + @Override + public byte[] apply(Integer v) throws Exception { + return new byte[1024 * 1024]; + } + }) + .replay(new Function, Observable>() { + @Override + public Observable apply(final Observable o) throws Exception { + return o.take(1) + .concatMap(new Function>() { + @Override + public Observable apply(byte[] v) throws Exception { + return o; + } + }); + } + }, 1) + .takeLast(1) + ; + + System.out.println("Bounded Replay Leak check: Wait before GC"); + Thread.sleep(1000); + + System.out.println("Bounded Replay Leak check: GC"); + System.gc(); + + Thread.sleep(500); + + final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); + long initial = memHeap.getUsed(); + + System.out.printf("Bounded Replay Leak check: Starting: %.3f MB%n", initial / 1024.0 / 1024.0); + + final AtomicLong after = new AtomicLong(); + + source.subscribe(new Consumer() { + @Override + public void accept(byte[] v) throws Exception { + System.out.println("Bounded Replay Leak check: Wait before GC 2"); + Thread.sleep(1000); + + System.out.println("Bounded Replay Leak check: GC 2"); + System.gc(); + + Thread.sleep(500); + + after.set(memoryMXBean.getHeapMemoryUsage().getUsed()); + } + }); + + System.out.printf("Bounded Replay Leak check: After: %.3f MB%n", after.get() / 1024.0 / 1024.0); + + if (initial + 100 * 1024 * 1024 < after.get()) { + Assert.fail("Bounded Replay Leak check: Memory leak detected: " + (initial / 1024.0 / 1024.0) + + " -> " + after.get() / 1024.0 / 1024.0); + } + }} diff --git a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java index 810e80d9e5..cf930b062a 100644 --- a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java +++ b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java @@ -17,18 +17,19 @@ import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; +import java.lang.management.*; import java.util.Arrays; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.*; -import org.junit.Test; +import org.junit.*; import org.mockito.*; import org.reactivestreams.*; import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.TestException; -import io.reactivex.functions.Function; +import io.reactivex.functions.*; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.processors.ReplayProcessor.*; import io.reactivex.schedulers.*; @@ -1692,4 +1693,62 @@ public void noHeadRetentionTime() { public void invalidRequest() { TestHelper.assertBadRequestReported(ReplayProcessor.create()); } + + @Test + public void noBoundedRetentionViaThreadLocal() throws Exception { + final ReplayProcessor rp = ReplayProcessor.createWithSize(1); + + Flowable source = rp.take(1) + .concatMap(new Function>() { + @Override + public Publisher apply(byte[] v) throws Exception { + return rp; + } + }) + .takeLast(1) + ; + + System.out.println("Bounded Replay Leak check: Wait before GC"); + Thread.sleep(1000); + + System.out.println("Bounded Replay Leak check: GC"); + System.gc(); + + Thread.sleep(500); + + final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); + long initial = memHeap.getUsed(); + + System.out.printf("Bounded Replay Leak check: Starting: %.3f MB%n", initial / 1024.0 / 1024.0); + + final AtomicLong after = new AtomicLong(); + + source.subscribe(new Consumer() { + @Override + public void accept(byte[] v) throws Exception { + System.out.println("Bounded Replay Leak check: Wait before GC 2"); + Thread.sleep(1000); + + System.out.println("Bounded Replay Leak check: GC 2"); + System.gc(); + + Thread.sleep(500); + + after.set(memoryMXBean.getHeapMemoryUsage().getUsed()); + } + }); + + for (int i = 0; i < 200; i++) { + rp.onNext(new byte[1024 * 1024]); + } + rp.onComplete(); + + System.out.printf("Bounded Replay Leak check: After: %.3f MB%n", after.get() / 1024.0 / 1024.0); + + if (initial + 100 * 1024 * 1024 < after.get()) { + Assert.fail("Bounded Replay Leak check: Memory leak detected: " + (initial / 1024.0 / 1024.0) + + " -> " + after.get() / 1024.0 / 1024.0); + } + } } diff --git a/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java b/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java index 2326241cfd..aa91270226 100644 --- a/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java +++ b/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java @@ -17,17 +17,18 @@ import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; +import java.lang.management.*; import java.util.Arrays; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.*; -import org.junit.Test; +import org.junit.*; import org.mockito.*; import io.reactivex.*; import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; -import io.reactivex.functions.Function; +import io.reactivex.functions.*; import io.reactivex.observers.*; import io.reactivex.schedulers.*; import io.reactivex.subjects.ReplaySubject.*; @@ -1284,4 +1285,62 @@ public void noHeadRetentionTime() { assertSame(o, buf.head); } + + @Test + public void noBoundedRetentionViaThreadLocal() throws Exception { + final ReplaySubject rs = ReplaySubject.createWithSize(1); + + Observable source = rs.take(1) + .concatMap(new Function>() { + @Override + public Observable apply(byte[] v) throws Exception { + return rs; + } + }) + .takeLast(1) + ; + + System.out.println("Bounded Replay Leak check: Wait before GC"); + Thread.sleep(1000); + + System.out.println("Bounded Replay Leak check: GC"); + System.gc(); + + Thread.sleep(500); + + final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); + long initial = memHeap.getUsed(); + + System.out.printf("Bounded Replay Leak check: Starting: %.3f MB%n", initial / 1024.0 / 1024.0); + + final AtomicLong after = new AtomicLong(); + + source.subscribe(new Consumer() { + @Override + public void accept(byte[] v) throws Exception { + System.out.println("Bounded Replay Leak check: Wait before GC 2"); + Thread.sleep(1000); + + System.out.println("Bounded Replay Leak check: GC 2"); + System.gc(); + + Thread.sleep(500); + + after.set(memoryMXBean.getHeapMemoryUsage().getUsed()); + } + }); + + for (int i = 0; i < 200; i++) { + rs.onNext(new byte[1024 * 1024]); + } + rs.onComplete(); + + System.out.printf("Bounded Replay Leak check: After: %.3f MB%n", after.get() / 1024.0 / 1024.0); + + if (initial + 100 * 1024 * 1024 < after.get()) { + Assert.fail("Bounded Replay Leak check: Memory leak detected: " + (initial / 1024.0 / 1024.0) + + " -> " + after.get() / 1024.0 / 1024.0); + } + } }