From 6b4d1876f4a3e4d17481315fb84bf06e17209f6b Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 8 Jan 2020 17:30:11 +0200 Subject: [PATCH] fix #860 Add an API for disposing LoopResources with a specific quiet period and timeout --- src/docs/asciidoc/tcp-client.adoc | 2 +- src/docs/asciidoc/tcp-server.adoc | 2 +- src/docs/asciidoc/udp-client.adoc | 2 +- src/docs/asciidoc/udp-server.adoc | 2 +- src/main/java/reactor/netty/ReactorNetty.java | 11 ++++- .../reactor/netty/http/HttpResources.java | 22 ++++++++- .../netty/resources/DefaultLoopResources.java | 25 ++++++++--- .../netty/resources/LoopResources.java | 34 ++++++++++++++ .../java/reactor/netty/tcp/TcpResources.java | 45 +++++++++++++++++-- .../java/reactor/netty/udp/UdpResources.java | 44 +++++++++++++++++- .../reactor/netty/http/HttpResourcesTest.java | 3 +- .../reactor/netty/tcp/TcpResourcesTest.java | 3 +- 12 files changed, 175 insertions(+), 20 deletions(-) diff --git a/src/docs/asciidoc/tcp-client.adoc b/src/docs/asciidoc/tcp-client.adoc index f3327afb72..449ceb0f3a 100644 --- a/src/docs/asciidoc/tcp-client.adoc +++ b/src/docs/asciidoc/tcp-client.adoc @@ -269,7 +269,7 @@ The following listing shows the default configuration for the Event Loop Group: [source,java,indent=0] ./../../main/java/reactor/netty/ReactorNetty.java ---- -include::{sourcedir}/reactor/netty/ReactorNetty.java[lines=75..95] +include::{sourcedir}/reactor/netty/ReactorNetty.java[lines=76..105] ---- ==== diff --git a/src/docs/asciidoc/tcp-server.adoc b/src/docs/asciidoc/tcp-server.adoc index d59c37b512..897b6701dc 100644 --- a/src/docs/asciidoc/tcp-server.adoc +++ b/src/docs/asciidoc/tcp-server.adoc @@ -260,7 +260,7 @@ The default configuration for the `Event Loop Group` is the following: [source,java,indent=0] ./../../main/java/reactor/netty/ReactorNetty.java ---- -include::{sourcedir}/reactor/netty/ReactorNetty.java[lines=75..95] +include::{sourcedir}/reactor/netty/ReactorNetty.java[lines=76..105] ---- ==== diff --git a/src/docs/asciidoc/udp-client.adoc b/src/docs/asciidoc/udp-client.adoc index 5c053d14de..700c7576ea 100644 --- a/src/docs/asciidoc/udp-client.adoc +++ b/src/docs/asciidoc/udp-client.adoc @@ -277,7 +277,7 @@ The following listing shows the default configuration for the "`Event Loop Group [source,java,indent=0] ./../../main/java/reactor/netty/ReactorNetty.java ---- -include::{sourcedir}/reactor/netty/ReactorNetty.java[lines=75..95] +include::{sourcedir}/reactor/netty/ReactorNetty.java[lines=76..105] ---- ==== diff --git a/src/docs/asciidoc/udp-server.adoc b/src/docs/asciidoc/udp-server.adoc index fe5ab48ba4..acc46d6b3d 100644 --- a/src/docs/asciidoc/udp-server.adoc +++ b/src/docs/asciidoc/udp-server.adoc @@ -297,7 +297,7 @@ The default configuration for the "`Event Loop Group`" is the following: [source,java,indent=0] ./../../main/java/reactor/netty/ReactorNetty.java ---- -include::{sourcedir}/reactor/netty/ReactorNetty.java[lines=75..95] +include::{sourcedir}/reactor/netty/ReactorNetty.java[lines=76..105] ---- ==== diff --git a/src/main/java/reactor/netty/ReactorNetty.java b/src/main/java/reactor/netty/ReactorNetty.java index f3d7cb6fe8..94179a769a 100644 --- a/src/main/java/reactor/netty/ReactorNetty.java +++ b/src/main/java/reactor/netty/ReactorNetty.java @@ -87,7 +87,16 @@ public final class ReactorNetty { * (but with a minimum value of 4) */ public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount"; - + /** + * Default quite period that guarantees that the disposal of the underlying LoopResources + * will not happen, fallback to 2 seconds. + */ + public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod"; + /** + * Default maximum amount of time to wait until the disposal of the underlying LoopResources + * regardless if a task was submitted during the quiet period, fallback to 15 seconds. + */ + public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout"; /** * Default value whether the native transport (epoll, kqueue) will be preferred, diff --git a/src/main/java/reactor/netty/http/HttpResources.java b/src/main/java/reactor/netty/http/HttpResources.java index 224a948ceb..3c6f7850dd 100644 --- a/src/main/java/reactor/netty/http/HttpResources.java +++ b/src/main/java/reactor/netty/http/HttpResources.java @@ -16,6 +16,7 @@ package reactor.netty.http; +import java.time.Duration; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @@ -88,14 +89,33 @@ public static void disposeLoopsAndConnections() { * Prepare to shutdown the global {@link HttpResources} without resetting them, * effectively cleaning up associated resources without creating new ones. This only * occurs when the returned {@link Mono} is subscribed to. + * The quiet period will be {@code 2s} and the timeout will be {@code 15s} * * @return a {@link Mono} triggering the {@link #disposeLoopsAndConnections()} when subscribed to. */ public static Mono disposeLoopsAndConnectionsLater() { + return disposeLoopsAndConnectionsLater(Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD), + Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT)); + } + + /** + * Prepare to shutdown the global {@link HttpResources} without resetting them, + * effectively cleaning up associated resources without creating new ones. This only + * occurs when the returned {@link Mono} is subscribed to. + * It is guaranteed that the disposal of the underlying LoopResources will not happen before + * {@code quietPeriod} is over. If a task is submitted during the {@code quietPeriod}, + * it is guaranteed to be accepted and the {@code quietPeriod} will start over. + * + * @param quietPeriod the quiet period as described above + * @param timeout the maximum amount of time to wait until the disposal of the underlying + * LoopResources regardless if a task was submitted during the quiet period + * @return a {@link Mono} triggering the {@link #disposeLoopsAndConnections()} when subscribed to. + */ + public static Mono disposeLoopsAndConnectionsLater(Duration quietPeriod, Duration timeout) { return Mono.defer(() -> { HttpResources resources = httpResources.getAndSet(null); if (resources != null) { - return resources._disposeLater(); + return resources._disposeLater(quietPeriod, timeout); } return Mono.empty(); }); diff --git a/src/main/java/reactor/netty/resources/DefaultLoopResources.java b/src/main/java/reactor/netty/resources/DefaultLoopResources.java index 6eade0ab27..842f920b47 100644 --- a/src/main/java/reactor/netty/resources/DefaultLoopResources.java +++ b/src/main/java/reactor/netty/resources/DefaultLoopResources.java @@ -16,7 +16,9 @@ package reactor.netty.resources; +import java.time.Duration; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -92,8 +94,11 @@ public boolean isDisposed() { @Override @SuppressWarnings("unchecked") - public Mono disposeLater() { + public Mono disposeLater(Duration quietPeriod, Duration timeout) { return Mono.defer(() -> { + long quietPeriodMillis = quietPeriod.toMillis(); + long timeoutMillis = timeout.toMillis(); + EventLoopGroup serverLoopsGroup = serverLoops.get(); EventLoopGroup clientLoopsGroup = clientLoops.get(); EventLoopGroup serverSelectLoopsGroup = serverSelectLoops.get(); @@ -109,22 +114,28 @@ public Mono disposeLater() { Mono cnsrvlMono = Mono.empty(); if(running.compareAndSet(true, false)) { if (clientLoopsGroup != null) { - clMono = FutureMono.from((Future) clientLoopsGroup.shutdownGracefully()); + clMono = FutureMono.from((Future) clientLoopsGroup.shutdownGracefully( + quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)); } if (serverSelectLoopsGroup != null) { - sslMono = FutureMono.from((Future) serverSelectLoopsGroup.shutdownGracefully()); + sslMono = FutureMono.from((Future) serverSelectLoopsGroup.shutdownGracefully( + quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)); } if (serverLoopsGroup != null) { - slMono = FutureMono.from((Future) serverLoopsGroup.shutdownGracefully()); + slMono = FutureMono.from((Future) serverLoopsGroup.shutdownGracefully( + quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)); } if(cacheNativeClientGroup != null){ - cnclMono = FutureMono.from((Future) cacheNativeClientGroup.shutdownGracefully()); + cnclMono = FutureMono.from((Future) cacheNativeClientGroup.shutdownGracefully( + quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)); } if(cacheNativeSelectGroup != null){ - cnslMono = FutureMono.from((Future) cacheNativeSelectGroup.shutdownGracefully()); + cnslMono = FutureMono.from((Future) cacheNativeSelectGroup.shutdownGracefully( + quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)); } if(cacheNativeServerGroup != null){ - cnsrvlMono = FutureMono.from((Future) cacheNativeServerGroup.shutdownGracefully()); + cnsrvlMono = FutureMono.from((Future) cacheNativeServerGroup.shutdownGracefully( + quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)); } } diff --git a/src/main/java/reactor/netty/resources/LoopResources.java b/src/main/java/reactor/netty/resources/LoopResources.java index ad311b84d2..5f1070d600 100644 --- a/src/main/java/reactor/netty/resources/LoopResources.java +++ b/src/main/java/reactor/netty/resources/LoopResources.java @@ -15,6 +15,7 @@ */ package reactor.netty.resources; +import java.time.Duration; import java.util.Objects; import io.netty.channel.Channel; @@ -61,6 +62,22 @@ public interface LoopResources extends Disposable { ReactorNetty.NATIVE, "true")); + /** + * Default quite period that guarantees that the disposal of the underlying LoopResources + * will not happen, fallback to 2 seconds. + */ + long DEFAULT_SHUTDOWN_QUIET_PERIOD = Long.parseLong(System.getProperty( + ReactorNetty.SHUTDOWN_QUIET_PERIOD, + "" + 2)); + + /** + * Default maximum amount of time to wait until the disposal of the underlying LoopResources + * regardless if a task was submitted during the quiet period, fallback to 15 seconds. + */ + long DEFAULT_SHUTDOWN_TIMEOUT = Long.parseLong(System.getProperty( + ReactorNetty.SHUTDOWN_TIMEOUT, + "" + 15)); + /** * Create a delegating {@link EventLoopGroup} which reuse local event loop if already * working @@ -240,10 +257,27 @@ default void dispose() { /** * Returns a Mono that triggers the disposal of the underlying LoopResources when subscribed to. + * The quiet period will be {@code 2s} and the timeout will be {@code 15s} * * @return a Mono representing the completion of the LoopResources disposal. **/ default Mono disposeLater() { + return disposeLater(Duration.ofSeconds(DEFAULT_SHUTDOWN_QUIET_PERIOD), + Duration.ofSeconds(DEFAULT_SHUTDOWN_TIMEOUT)); + } + + /** + * Returns a Mono that triggers the disposal of the underlying LoopResources when subscribed to. + * It is guaranteed that the disposal of the underlying LoopResources will not happen before + * {@code quietPeriod} is over. If a task is submitted during the {@code quietPeriod}, + * it is guaranteed to be accepted and the {@code quietPeriod} will start over. + * + * @param quietPeriod the quiet period as described above + * @param timeout the maximum amount of time to wait until the disposal of the underlying + * LoopResources regardless if a task was submitted during the quiet period + * @return a Mono representing the completion of the LoopResources disposal. + **/ + default Mono disposeLater(Duration quietPeriod, Duration timeout) { //noop default return Mono.empty(); } diff --git a/src/main/java/reactor/netty/tcp/TcpResources.java b/src/main/java/reactor/netty/tcp/TcpResources.java index 5f1f0d55ca..309344b305 100644 --- a/src/main/java/reactor/netty/tcp/TcpResources.java +++ b/src/main/java/reactor/netty/tcp/TcpResources.java @@ -17,6 +17,7 @@ package reactor.netty.tcp; import java.net.SocketAddress; +import java.time.Duration; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import javax.annotation.Nullable; @@ -98,14 +99,33 @@ public static void disposeLoopsAndConnections() { * Prepare to shutdown the global {@link TcpResources} without resetting them, * effectively cleaning up associated resources without creating new ones. This only * occurs when the returned {@link Mono} is subscribed to. + * The quiet period will be {@code 2s} and the timeout will be {@code 15s} * * @return a {@link Mono} triggering the {@link #disposeLoopsAndConnections()} when subscribed to. */ public static Mono disposeLoopsAndConnectionsLater() { + return disposeLoopsAndConnectionsLater(Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD), + Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT)); + } + + /** + * Prepare to shutdown the global {@link TcpResources} without resetting them, + * effectively cleaning up associated resources without creating new ones. This only + * occurs when the returned {@link Mono} is subscribed to. + * It is guaranteed that the disposal of the underlying LoopResources will not happen before + * {@code quietPeriod} is over. If a task is submitted during the {@code quietPeriod}, + * it is guaranteed to be accepted and the {@code quietPeriod} will start over. + * + * @param quietPeriod the quiet period as described above + * @param timeout the maximum amount of time to wait until the disposal of the underlying + * LoopResources regardless if a task was submitted during the quiet period + * @return a {@link Mono} triggering the {@link #disposeLoopsAndConnections()} when subscribed to. + */ + public static Mono disposeLoopsAndConnectionsLater(Duration quietPeriod, Duration timeout) { return Mono.defer(() -> { TcpResources resources = tcpResources.getAndSet(null); if (resources != null) { - return resources._disposeLater(); + return resources._disposeLater(quietPeriod, timeout); } return Mono.empty(); }); @@ -147,11 +167,30 @@ protected void _dispose(){ /** * Dispose underlying resources in a listenable fashion. + * The quiet period will be {@code 2s} and the timeout will be {@code 15s} + * * @return the Mono that represents the end of disposal + * @deprecated Use {@link #_disposeLater(Duration, Duration)} */ + @Deprecated protected Mono _disposeLater() { - return Mono.when( - defaultLoops.disposeLater(), defaultProvider.disposeLater()); + return disposeLater(Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD), + Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT)); + } + + /** + * Dispose underlying resources in a listenable fashion. + * It is guaranteed that the disposal of the underlying LoopResources will not happen before + * {@code quietPeriod} is over. If a task is submitted during the {@code quietPeriod}, + * it is guaranteed to be accepted and the {@code quietPeriod} will start over. + * + * @param quietPeriod the quiet period as described above + * @param timeout the maximum amount of time to wait until the disposal of the underlying + * LoopResources regardless if a task was submitted during the quiet period + * @return the Mono that represents the end of disposal + */ + protected Mono _disposeLater(Duration quietPeriod, Duration timeout) { + return Mono.when(defaultLoops.disposeLater(quietPeriod, timeout), defaultProvider.disposeLater()); } @Override diff --git a/src/main/java/reactor/netty/udp/UdpResources.java b/src/main/java/reactor/netty/udp/UdpResources.java index 437e542f05..5f2f4658fb 100644 --- a/src/main/java/reactor/netty/udp/UdpResources.java +++ b/src/main/java/reactor/netty/udp/UdpResources.java @@ -16,6 +16,7 @@ package reactor.netty.udp; +import java.time.Duration; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import javax.annotation.Nullable; @@ -86,14 +87,33 @@ public static void shutdown() { * Prepare to shutdown the global {@link UdpResources} without resetting them, * effectively cleaning up associated resources without creating new ones. This only * occurs when the returned {@link Mono} is subscribed to. + * The quiet period will be {@code 2s} and the timeout will be {@code 15s} * * @return a {@link Mono} triggering the {@link #shutdown()} when subscribed to. */ public static Mono shutdownLater() { + return shutdownLater(Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD), + Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT)); + } + + /** + * Prepare to shutdown the global {@link UdpResources} without resetting them, + * effectively cleaning up associated resources without creating new ones. This only + * occurs when the returned {@link Mono} is subscribed to. + * It is guaranteed that the disposal of the underlying LoopResources will not happen before + * {@code quietPeriod} is over. If a task is submitted during the {@code quietPeriod}, + * it is guaranteed to be accepted and the {@code quietPeriod} will start over. + * + * @param quietPeriod the quiet period as described above + * @param timeout the maximum amount of time to wait until the disposal of the underlying + * LoopResources regardless if a task was submitted during the quiet period + * @return a {@link Mono} triggering the {@link #shutdown()} when subscribed to. + */ + public static Mono shutdownLater(Duration quietPeriod, Duration timeout) { return Mono.defer(() -> { UdpResources resources = udpResources.getAndSet(null); if (resources != null) { - return resources._disposeLater(); + return resources._disposeLater(quietPeriod, timeout); } return Mono.empty(); }); @@ -114,10 +134,30 @@ protected void _dispose(){ /** * Dispose underlying resources in a listenable fashion. + * The quiet period will be {@code 2s} and the timeout will be {@code 15s} + * * @return the Mono that represents the end of disposal + * @deprecated Use {@link #_disposeLater(Duration, Duration)} */ + @Deprecated protected Mono _disposeLater() { - return defaultLoops.disposeLater(); + return _disposeLater(Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD), + Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT)); + } + + /** + * Dispose underlying resources in a listenable fashion. + * It is guaranteed that the disposal of the underlying LoopResources will not happen before + * {@code quietPeriod} is over. If a task is submitted during the {@code quietPeriod}, + * it is guaranteed to be accepted and the {@code quietPeriod} will start over. + * + * @param quietPeriod the quiet period as described above + * @param timeout the maximum amount of time to wait until the disposal of the underlying + * LoopResources regardless if a task was submitted during the quiet period + * @return the Mono that represents the end of disposal + */ + protected Mono _disposeLater(Duration quietPeriod, Duration timeout) { + return defaultLoops.disposeLater(quietPeriod, timeout); } @Override diff --git a/src/test/java/reactor/netty/http/HttpResourcesTest.java b/src/test/java/reactor/netty/http/HttpResourcesTest.java index 2f8b0ea44c..5759a78d19 100644 --- a/src/test/java/reactor/netty/http/HttpResourcesTest.java +++ b/src/test/java/reactor/netty/http/HttpResourcesTest.java @@ -15,6 +15,7 @@ */ package reactor.netty.http; +import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; import io.netty.bootstrap.Bootstrap; @@ -50,7 +51,7 @@ public EventLoopGroup onServer(boolean useNative) { } @Override - public Mono disposeLater() { + public Mono disposeLater(Duration quietPeriod, Duration timeout) { return Mono.empty().doOnSuccess(c -> loopDisposed.set(true)); } diff --git a/src/test/java/reactor/netty/tcp/TcpResourcesTest.java b/src/test/java/reactor/netty/tcp/TcpResourcesTest.java index 919a6af80b..0b8010fb9d 100644 --- a/src/test/java/reactor/netty/tcp/TcpResourcesTest.java +++ b/src/test/java/reactor/netty/tcp/TcpResourcesTest.java @@ -15,6 +15,7 @@ */ package reactor.netty.tcp; +import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -56,7 +57,7 @@ public EventLoopGroup onServer(boolean useNative) { } @Override - public Mono disposeLater() { + public Mono disposeLater(Duration quietPeriod, Duration timeout) { return Mono.empty().doOnSuccess(c -> loopDisposed.set(true)); }