Skip to content

Commit

Permalink
fix #860 Add an API for disposing LoopResources with a specific quiet…
Browse files Browse the repository at this point in the history
… period and timeout
  • Loading branch information
violetagg committed Jan 10, 2020
1 parent 01da92b commit 6b4d187
Show file tree
Hide file tree
Showing 12 changed files with 175 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/docs/asciidoc/tcp-client.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ The following listing shows the default configuration for the Event Loop Group:
[source,java,indent=0]
./../../main/java/reactor/netty/ReactorNetty.java
----
include::{sourcedir}/reactor/netty/ReactorNetty.java[lines=75..95]
include::{sourcedir}/reactor/netty/ReactorNetty.java[lines=76..105]
----
====

Expand Down
2 changes: 1 addition & 1 deletion src/docs/asciidoc/tcp-server.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ The default configuration for the `Event Loop Group` is the following:
[source,java,indent=0]
./../../main/java/reactor/netty/ReactorNetty.java
----
include::{sourcedir}/reactor/netty/ReactorNetty.java[lines=75..95]
include::{sourcedir}/reactor/netty/ReactorNetty.java[lines=76..105]
----
====

Expand Down
2 changes: 1 addition & 1 deletion src/docs/asciidoc/udp-client.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ The following listing shows the default configuration for the "`Event Loop Group
[source,java,indent=0]
./../../main/java/reactor/netty/ReactorNetty.java
----
include::{sourcedir}/reactor/netty/ReactorNetty.java[lines=75..95]
include::{sourcedir}/reactor/netty/ReactorNetty.java[lines=76..105]
----
====

Expand Down
2 changes: 1 addition & 1 deletion src/docs/asciidoc/udp-server.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ The default configuration for the "`Event Loop Group`" is the following:
[source,java,indent=0]
./../../main/java/reactor/netty/ReactorNetty.java
----
include::{sourcedir}/reactor/netty/ReactorNetty.java[lines=75..95]
include::{sourcedir}/reactor/netty/ReactorNetty.java[lines=76..105]
----
====

Expand Down
11 changes: 10 additions & 1 deletion src/main/java/reactor/netty/ReactorNetty.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,16 @@ public final class ReactorNetty {
* (but with a minimum value of 4)
*/
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";

/**
* Default quite period that guarantees that the disposal of the underlying LoopResources
* will not happen, fallback to 2 seconds.
*/
public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
/**
* Default maximum amount of time to wait until the disposal of the underlying LoopResources
* regardless if a task was submitted during the quiet period, fallback to 15 seconds.
*/
public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";

/**
* Default value whether the native transport (epoll, kqueue) will be preferred,
Expand Down
22 changes: 21 additions & 1 deletion src/main/java/reactor/netty/http/HttpResources.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package reactor.netty.http;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

Expand Down Expand Up @@ -88,14 +89,33 @@ public static void disposeLoopsAndConnections() {
* Prepare to shutdown the global {@link HttpResources} without resetting them,
* effectively cleaning up associated resources without creating new ones. This only
* occurs when the returned {@link Mono} is subscribed to.
* The quiet period will be {@code 2s} and the timeout will be {@code 15s}
*
* @return a {@link Mono} triggering the {@link #disposeLoopsAndConnections()} when subscribed to.
*/
public static Mono<Void> disposeLoopsAndConnectionsLater() {
return disposeLoopsAndConnectionsLater(Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD),
Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT));
}

/**
* Prepare to shutdown the global {@link HttpResources} without resetting them,
* effectively cleaning up associated resources without creating new ones. This only
* occurs when the returned {@link Mono} is subscribed to.
* It is guaranteed that the disposal of the underlying LoopResources will not happen before
* {@code quietPeriod} is over. If a task is submitted during the {@code quietPeriod},
* it is guaranteed to be accepted and the {@code quietPeriod} will start over.
*
* @param quietPeriod the quiet period as described above
* @param timeout the maximum amount of time to wait until the disposal of the underlying
* LoopResources regardless if a task was submitted during the quiet period
* @return a {@link Mono} triggering the {@link #disposeLoopsAndConnections()} when subscribed to.
*/
public static Mono<Void> disposeLoopsAndConnectionsLater(Duration quietPeriod, Duration timeout) {
return Mono.defer(() -> {
HttpResources resources = httpResources.getAndSet(null);
if (resources != null) {
return resources._disposeLater();
return resources._disposeLater(quietPeriod, timeout);
}
return Mono.empty();
});
Expand Down
25 changes: 18 additions & 7 deletions src/main/java/reactor/netty/resources/DefaultLoopResources.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package reactor.netty.resources;

import java.time.Duration;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -92,8 +94,11 @@ public boolean isDisposed() {

@Override
@SuppressWarnings("unchecked")
public Mono<Void> disposeLater() {
public Mono<Void> disposeLater(Duration quietPeriod, Duration timeout) {
return Mono.defer(() -> {
long quietPeriodMillis = quietPeriod.toMillis();
long timeoutMillis = timeout.toMillis();

EventLoopGroup serverLoopsGroup = serverLoops.get();
EventLoopGroup clientLoopsGroup = clientLoops.get();
EventLoopGroup serverSelectLoopsGroup = serverSelectLoops.get();
Expand All @@ -109,22 +114,28 @@ public Mono<Void> disposeLater() {
Mono<?> cnsrvlMono = Mono.empty();
if(running.compareAndSet(true, false)) {
if (clientLoopsGroup != null) {
clMono = FutureMono.from((Future) clientLoopsGroup.shutdownGracefully());
clMono = FutureMono.from((Future) clientLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
}
if (serverSelectLoopsGroup != null) {
sslMono = FutureMono.from((Future) serverSelectLoopsGroup.shutdownGracefully());
sslMono = FutureMono.from((Future) serverSelectLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
}
if (serverLoopsGroup != null) {
slMono = FutureMono.from((Future) serverLoopsGroup.shutdownGracefully());
slMono = FutureMono.from((Future) serverLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
}
if(cacheNativeClientGroup != null){
cnclMono = FutureMono.from((Future) cacheNativeClientGroup.shutdownGracefully());
cnclMono = FutureMono.from((Future) cacheNativeClientGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
}
if(cacheNativeSelectGroup != null){
cnslMono = FutureMono.from((Future) cacheNativeSelectGroup.shutdownGracefully());
cnslMono = FutureMono.from((Future) cacheNativeSelectGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
}
if(cacheNativeServerGroup != null){
cnsrvlMono = FutureMono.from((Future) cacheNativeServerGroup.shutdownGracefully());
cnsrvlMono = FutureMono.from((Future) cacheNativeServerGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
}
}

Expand Down
34 changes: 34 additions & 0 deletions src/main/java/reactor/netty/resources/LoopResources.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package reactor.netty.resources;

import java.time.Duration;
import java.util.Objects;

import io.netty.channel.Channel;
Expand Down Expand Up @@ -61,6 +62,22 @@ public interface LoopResources extends Disposable {
ReactorNetty.NATIVE,
"true"));

/**
* Default quite period that guarantees that the disposal of the underlying LoopResources
* will not happen, fallback to 2 seconds.
*/
long DEFAULT_SHUTDOWN_QUIET_PERIOD = Long.parseLong(System.getProperty(
ReactorNetty.SHUTDOWN_QUIET_PERIOD,
"" + 2));

/**
* Default maximum amount of time to wait until the disposal of the underlying LoopResources
* regardless if a task was submitted during the quiet period, fallback to 15 seconds.
*/
long DEFAULT_SHUTDOWN_TIMEOUT = Long.parseLong(System.getProperty(
ReactorNetty.SHUTDOWN_TIMEOUT,
"" + 15));

/**
* Create a delegating {@link EventLoopGroup} which reuse local event loop if already
* working
Expand Down Expand Up @@ -240,10 +257,27 @@ default void dispose() {

/**
* Returns a Mono that triggers the disposal of the underlying LoopResources when subscribed to.
* The quiet period will be {@code 2s} and the timeout will be {@code 15s}
*
* @return a Mono representing the completion of the LoopResources disposal.
**/
default Mono<Void> disposeLater() {
return disposeLater(Duration.ofSeconds(DEFAULT_SHUTDOWN_QUIET_PERIOD),
Duration.ofSeconds(DEFAULT_SHUTDOWN_TIMEOUT));
}

/**
* Returns a Mono that triggers the disposal of the underlying LoopResources when subscribed to.
* It is guaranteed that the disposal of the underlying LoopResources will not happen before
* {@code quietPeriod} is over. If a task is submitted during the {@code quietPeriod},
* it is guaranteed to be accepted and the {@code quietPeriod} will start over.
*
* @param quietPeriod the quiet period as described above
* @param timeout the maximum amount of time to wait until the disposal of the underlying
* LoopResources regardless if a task was submitted during the quiet period
* @return a Mono representing the completion of the LoopResources disposal.
**/
default Mono<Void> disposeLater(Duration quietPeriod, Duration timeout) {
//noop default
return Mono.empty();
}
Expand Down
45 changes: 42 additions & 3 deletions src/main/java/reactor/netty/tcp/TcpResources.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package reactor.netty.tcp;

import java.net.SocketAddress;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -98,14 +99,33 @@ public static void disposeLoopsAndConnections() {
* Prepare to shutdown the global {@link TcpResources} without resetting them,
* effectively cleaning up associated resources without creating new ones. This only
* occurs when the returned {@link Mono} is subscribed to.
* The quiet period will be {@code 2s} and the timeout will be {@code 15s}
*
* @return a {@link Mono} triggering the {@link #disposeLoopsAndConnections()} when subscribed to.
*/
public static Mono<Void> disposeLoopsAndConnectionsLater() {
return disposeLoopsAndConnectionsLater(Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD),
Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT));
}

/**
* Prepare to shutdown the global {@link TcpResources} without resetting them,
* effectively cleaning up associated resources without creating new ones. This only
* occurs when the returned {@link Mono} is subscribed to.
* It is guaranteed that the disposal of the underlying LoopResources will not happen before
* {@code quietPeriod} is over. If a task is submitted during the {@code quietPeriod},
* it is guaranteed to be accepted and the {@code quietPeriod} will start over.
*
* @param quietPeriod the quiet period as described above
* @param timeout the maximum amount of time to wait until the disposal of the underlying
* LoopResources regardless if a task was submitted during the quiet period
* @return a {@link Mono} triggering the {@link #disposeLoopsAndConnections()} when subscribed to.
*/
public static Mono<Void> disposeLoopsAndConnectionsLater(Duration quietPeriod, Duration timeout) {
return Mono.defer(() -> {
TcpResources resources = tcpResources.getAndSet(null);
if (resources != null) {
return resources._disposeLater();
return resources._disposeLater(quietPeriod, timeout);
}
return Mono.empty();
});
Expand Down Expand Up @@ -147,11 +167,30 @@ protected void _dispose(){

/**
* Dispose underlying resources in a listenable fashion.
* The quiet period will be {@code 2s} and the timeout will be {@code 15s}
*
* @return the Mono that represents the end of disposal
* @deprecated Use {@link #_disposeLater(Duration, Duration)}
*/
@Deprecated
protected Mono<Void> _disposeLater() {
return Mono.when(
defaultLoops.disposeLater(), defaultProvider.disposeLater());
return disposeLater(Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD),
Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT));
}

/**
* Dispose underlying resources in a listenable fashion.
* It is guaranteed that the disposal of the underlying LoopResources will not happen before
* {@code quietPeriod} is over. If a task is submitted during the {@code quietPeriod},
* it is guaranteed to be accepted and the {@code quietPeriod} will start over.
*
* @param quietPeriod the quiet period as described above
* @param timeout the maximum amount of time to wait until the disposal of the underlying
* LoopResources regardless if a task was submitted during the quiet period
* @return the Mono that represents the end of disposal
*/
protected Mono<Void> _disposeLater(Duration quietPeriod, Duration timeout) {
return Mono.when(defaultLoops.disposeLater(quietPeriod, timeout), defaultProvider.disposeLater());
}

@Override
Expand Down
44 changes: 42 additions & 2 deletions src/main/java/reactor/netty/udp/UdpResources.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package reactor.netty.udp;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -86,14 +87,33 @@ public static void shutdown() {
* Prepare to shutdown the global {@link UdpResources} without resetting them,
* effectively cleaning up associated resources without creating new ones. This only
* occurs when the returned {@link Mono} is subscribed to.
* The quiet period will be {@code 2s} and the timeout will be {@code 15s}
*
* @return a {@link Mono} triggering the {@link #shutdown()} when subscribed to.
*/
public static Mono<Void> shutdownLater() {
return shutdownLater(Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD),
Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT));
}

/**
* Prepare to shutdown the global {@link UdpResources} without resetting them,
* effectively cleaning up associated resources without creating new ones. This only
* occurs when the returned {@link Mono} is subscribed to.
* It is guaranteed that the disposal of the underlying LoopResources will not happen before
* {@code quietPeriod} is over. If a task is submitted during the {@code quietPeriod},
* it is guaranteed to be accepted and the {@code quietPeriod} will start over.
*
* @param quietPeriod the quiet period as described above
* @param timeout the maximum amount of time to wait until the disposal of the underlying
* LoopResources regardless if a task was submitted during the quiet period
* @return a {@link Mono} triggering the {@link #shutdown()} when subscribed to.
*/
public static Mono<Void> shutdownLater(Duration quietPeriod, Duration timeout) {
return Mono.defer(() -> {
UdpResources resources = udpResources.getAndSet(null);
if (resources != null) {
return resources._disposeLater();
return resources._disposeLater(quietPeriod, timeout);
}
return Mono.empty();
});
Expand All @@ -114,10 +134,30 @@ protected void _dispose(){

/**
* Dispose underlying resources in a listenable fashion.
* The quiet period will be {@code 2s} and the timeout will be {@code 15s}
*
* @return the Mono that represents the end of disposal
* @deprecated Use {@link #_disposeLater(Duration, Duration)}
*/
@Deprecated
protected Mono<Void> _disposeLater() {
return defaultLoops.disposeLater();
return _disposeLater(Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD),
Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT));
}

/**
* Dispose underlying resources in a listenable fashion.
* It is guaranteed that the disposal of the underlying LoopResources will not happen before
* {@code quietPeriod} is over. If a task is submitted during the {@code quietPeriod},
* it is guaranteed to be accepted and the {@code quietPeriod} will start over.
*
* @param quietPeriod the quiet period as described above
* @param timeout the maximum amount of time to wait until the disposal of the underlying
* LoopResources regardless if a task was submitted during the quiet period
* @return the Mono that represents the end of disposal
*/
protected Mono<Void> _disposeLater(Duration quietPeriod, Duration timeout) {
return defaultLoops.disposeLater(quietPeriod, timeout);
}

@Override
Expand Down
3 changes: 2 additions & 1 deletion src/test/java/reactor/netty/http/HttpResourcesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package reactor.netty.http;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

import io.netty.bootstrap.Bootstrap;
Expand Down Expand Up @@ -50,7 +51,7 @@ public EventLoopGroup onServer(boolean useNative) {
}

@Override
public Mono<Void> disposeLater() {
public Mono<Void> disposeLater(Duration quietPeriod, Duration timeout) {
return Mono.<Void>empty().doOnSuccess(c -> loopDisposed.set(true));
}

Expand Down
3 changes: 2 additions & 1 deletion src/test/java/reactor/netty/tcp/TcpResourcesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package reactor.netty.tcp;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -56,7 +57,7 @@ public EventLoopGroup onServer(boolean useNative) {
}

@Override
public Mono<Void> disposeLater() {
public Mono<Void> disposeLater(Duration quietPeriod, Duration timeout) {
return Mono.<Void>empty().doOnSuccess(c -> loopDisposed.set(true));
}

Expand Down

0 comments on commit 6b4d187

Please sign in to comment.