Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds AsyncCloseable #21991

Merged
merged 9 commits into from
Jun 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.logging.ClientLogger;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -298,8 +299,8 @@ private void setAndClearChannel() {
}

private void close(T channel) {
if (channel instanceof AsyncAutoCloseable) {
((AsyncAutoCloseable) channel).closeAsync().subscribe();
if (channel instanceof AsyncCloseable) {
((AsyncCloseable) channel).closeAsync().subscribe();
} else if (channel instanceof AutoCloseable) {
try {
((AutoCloseable) channel).close();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
Expand Down Expand Up @@ -37,7 +38,7 @@
/**
* Handles receiving events from Event Hubs service and translating them to proton-j messages.
*/
public class ReactorReceiver implements AmqpReceiveLink, AsyncAutoCloseable, AutoCloseable {
public class ReactorReceiver implements AmqpReceiveLink, AsyncCloseable, AutoCloseable {
private final String entityPath;
private final Receiver receiver;
private final ReceiveLinkHandler handler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.OperationCancelledException;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
Expand Down Expand Up @@ -62,7 +63,7 @@
/**
* Handles scheduling and transmitting events through proton-j to Event Hubs service.
*/
class ReactorSender implements AmqpSendLink, AsyncAutoCloseable, AutoCloseable {
class ReactorSender implements AmqpSendLink, AsyncCloseable, AutoCloseable {
private final String entityPath;
private final Sender sender;
private final SendLinkHandler handler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.UnsignedLong;
Expand Down Expand Up @@ -50,7 +51,7 @@
* Represents a bidirectional link between the message broker and the client. Allows client to send a request to the
* broker and receive the associated response.
*/
public class RequestResponseChannel implements AsyncAutoCloseable {
public class RequestResponseChannel implements AsyncCloseable {
private final ConcurrentSkipListMap<UnsignedLong, MonoSink<Message>> unconfirmedSends =
new ConcurrentSkipListMap<>();
private final AtomicBoolean hasError = new AtomicBoolean();
Expand Down
3 changes: 3 additions & 0 deletions sdk/core/azure-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## 1.17.0-beta.1 (Unreleased)

### Features Added

- Added `AsyncCloseable`

## 1.16.0 (2021-05-07)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.util;

import reactor.core.publisher.Mono;

/**
* Interface for close operations that are asynchronous.
*
* <p><strong>Asynchronously closing a class</strong></p>
* <p>In the snippet below, we have a long-lived {@code NetworkResource} class. There are some operations such
* as closing {@literal I/O}. Instead of returning a sync {@code close()}, we use {@code closeAsync()} so users'
* programs don't block waiting for this operation to complete.</p>
*
* {@codesnippet com.azure.core.util.AsyncCloseable.closeAsync}
*/
public interface AsyncCloseable {
/**
* Begins the close operation. If one is in progress, will return that existing close operation. If the close
* operation is unsuccessful, the Mono completes with an error.
*
* @return A Mono representing the close operation. If the close operation is unsuccessful, the Mono completes with
* an error.
*/
Mono<Void> closeAsync();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.util;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

/**
* Code snippets for {@link AsyncCloseable}.
*/
public class AsyncCloseableJavaDocCodeSnippet {
public void asyncResource() throws IOException {
// BEGIN: com.azure.core.util.AsyncCloseable.closeAsync
conniey marked this conversation as resolved.
Show resolved Hide resolved
NetworkResource resource = new NetworkResource();
resource.longRunningDownload("https://longdownload.com")
.subscribe(
byteBuffer -> System.out.println("Buffer received: " + byteBuffer),
error -> System.err.printf("Error occurred while downloading: %s%n", error),
() -> System.out.println("Completed download operation."));

System.out.println("Press enter to stop downloading.");
System.in.read();

// We block here because it is the end of the main Program function. A real-life program may chain this
// with some other close operations like save download/program state, etc.
resource.closeAsync().block();
// END: com.azure.core.util.AsyncCloseable.closeAsync
}

/**
* A long lived network resource.
*/
static class NetworkResource implements AsyncCloseable {
private final AtomicBoolean isClosed = new AtomicBoolean();
private final Sinks.Empty<Void> closeMono = Sinks.empty();

/**
* Downloads a resource.
*
* @param url URL for the download.
*
* @return A stream of bytes.
*/
Flux<ByteBuffer> longRunningDownload(String url) {
final byte[] bytes = url.getBytes(StandardCharsets.UTF_8);

// Does nothing real but it represents taking from this possibly infinite Flux until
// the closeMono emits a signal.
return Flux.fromStream(IntStream.range(0, bytes.length)
.mapToObj(index -> ByteBuffer.wrap(bytes)))
.takeUntilOther(closeMono.asMono());
}

@Override
public Mono<Void> closeAsync() {
// If the close operation has started, then
if (isClosed.getAndSet(true)) {
return closeMono.asMono();
}

return startAsyncClose().then(closeMono.asMono());
}

private Mono<Void> startAsyncClose() {
return Mono.delay(Duration.ofSeconds(10)).then()
.doOnError(error -> closeMono.emitError(error, Sinks.EmitFailureHandler.FAIL_FAST))
.doOnSuccess(unused -> closeMono.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST));
}
}
}
2 changes: 1 addition & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.16.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
<version>1.17.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.LinkErrorContext;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.AsyncAutoCloseable;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -599,8 +599,8 @@ private void disposeReceiver(AmqpReceiveLink link) {
}

try {
if (link instanceof AsyncAutoCloseable) {
((AsyncAutoCloseable) link).closeAsync().subscribe();
if (link instanceof AsyncCloseable) {
((AsyncCloseable) link).closeAsync().subscribe();
} else {
link.dispose();
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/azure-messaging-servicebus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.16.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
<version>1.17.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.implementation.AsyncAutoCloseable;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.LockContainer;
Expand All @@ -29,7 +29,7 @@
/**
* Represents an session that is received when "any" session is accepted from the service.
*/
class ServiceBusSessionReceiver implements AsyncAutoCloseable, AutoCloseable {
class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable {
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final LockContainer<OffsetDateTime> lockContainer;
private final AtomicReference<OffsetDateTime> sessionLockedUntil = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.AsyncAutoCloseable;
import com.azure.core.util.AsyncCloseable;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import reactor.core.publisher.Mono;

Expand All @@ -13,7 +13,7 @@
/**
* Represents an AMQP receive link.
*/
public interface ServiceBusReceiveLink extends AmqpReceiveLink, AsyncAutoCloseable {
public interface ServiceBusReceiveLink extends AmqpReceiveLink, AsyncCloseable {
/**
* Gets the session id associated with the link.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.AsyncAutoCloseable;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
Expand Down Expand Up @@ -585,8 +585,8 @@ private void disposeReceiver(AmqpReceiveLink link) {
}

try {
if (link instanceof AsyncAutoCloseable) {
((AsyncAutoCloseable) link).closeAsync().subscribe();
if (link instanceof AsyncCloseable) {
((AsyncCloseable) link).closeAsync().subscribe();
} else {
link.dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.ProxyAuthenticationType;
import com.azure.core.amqp.ProxyOptions;
import com.azure.core.amqp.implementation.AsyncAutoCloseable;
import com.azure.core.test.TestBase;
import com.azure.core.test.TestMode;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.Configuration;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
Expand Down Expand Up @@ -335,8 +335,8 @@ protected void dispose(AutoCloseable... closeables) {
continue;
}

if (closeable instanceof AsyncAutoCloseable) {
final Mono<Void> voidMono = ((AsyncAutoCloseable) closeable).closeAsync();
if (closeable instanceof AsyncCloseable) {
final Mono<Void> voidMono = ((AsyncCloseable) closeable).closeAsync();
closeableMonos.add(voidMono);

voidMono.subscribe();
Expand Down