diff --git a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml
index 743abfc4fa083..5457436b8cafe 100755
--- a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml
+++ b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml
@@ -1235,6 +1235,12 @@
+
+
+
+
+
+
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java
index 321f254f0e1a1..93ff9afe934aa 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java
@@ -32,6 +32,7 @@
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.StoreResult;
import com.azure.cosmos.implementation.directconnectivity.Uri;
+import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpointStatistics;
import com.azure.cosmos.implementation.patch.PatchOperation;
import com.azure.cosmos.implementation.query.QueryInfo;
@@ -216,6 +217,17 @@ public static RequestTimeline getRequestTimeline(E e
return e.getRequestTimeline();
}
+ @Warning(value = INTERNAL_USE_ONLY_WARNING)
+ public static E setChannelAcquisitionTimeline(E e, RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) {
+ e.setChannelAcquisitionTimeline(channelAcquisitionTimeline);
+ return e;
+ }
+
+ @Warning(value = INTERNAL_USE_ONLY_WARNING)
+ public static RntbdChannelAcquisitionTimeline getChannelAcqusitionTimeline(E e) {
+ return e.getChannelAcquisitionTimeline();
+ }
+
@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static E setChannelTaskQueueSize(E e, int value) {
e.setRntbdChannelTaskQueueSize(value);
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java
index 8ebb80c36817c..ff83afa920f5a 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java
@@ -60,6 +60,7 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -444,6 +445,7 @@ public CosmosPagedFlux queryItems(String query, Class classType) {
*/
@Beta(value = Beta.SinceVersion.V4_14_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public Mono openConnectionsAndInitCaches() {
+
if(isInitialized.compareAndSet(false, true)) {
return this.getFeedRanges().flatMap(feedRanges -> {
List>> fluxList = new ArrayList<>();
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java
index c5d31e888a5ec..5005791479ab1 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java
@@ -11,6 +11,7 @@
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.directconnectivity.Uri;
+import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpointStatistics;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -51,6 +52,7 @@ public class CosmosException extends AzureException {
private CosmosDiagnostics cosmosDiagnostics;
private RequestTimeline requestTimeline;
+ private RntbdChannelAcquisitionTimeline channelAcquisitionTimeline;
private CosmosError cosmosError;
private int rntbdChannelTaskQueueSize;
@@ -385,6 +387,14 @@ void setRequestTimeline(RequestTimeline requestTimeline) {
this.requestTimeline = requestTimeline;
}
+ RntbdChannelAcquisitionTimeline getChannelAcquisitionTimeline() {
+ return this.channelAcquisitionTimeline;
+ }
+
+ void setChannelAcquisitionTimeline(RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) {
+ this.channelAcquisitionTimeline = channelAcquisitionTimeline;
+ }
+
void setResourceAddress(String resourceAddress) {
this.resourceAddress = resourceAddress;
}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java
index 46da32ba61a3a..d6244ed266182 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java
@@ -229,6 +229,7 @@ public Mono invokeStoreAsync(final Uri addressUri, final RxDocume
response.setRequestPayloadLength(request.getContentLength());
response.setRntbdChannelTaskQueueSize(record.channelTaskQueueLength());
response.setRntbdPendingRequestSize(record.pendingRequestQueueSize());
+ response.setChannelAcquisitionTimeline(record.getChannelAcquisitionTimeline());
}
})).onErrorMap(throwable -> {
@@ -262,6 +263,7 @@ public Mono invokeStoreAsync(final Uri addressUri, final RxDocume
BridgeInternal.setRntbdPendingRequestQueueSize(cosmosException, record.pendingRequestQueueSize());
BridgeInternal.setChannelTaskQueueSize(cosmosException, record.channelTaskQueueLength());
BridgeInternal.setSendingRequestStarted(cosmosException, record.hasSendingRequestStarted());
+ BridgeInternal.setChannelAcquisitionTimeline(cosmosException, record.getChannelAcquisitionTimeline());
return cosmosException;
});
@@ -417,6 +419,9 @@ public static final class Options {
@JsonIgnore()
private final UserAgentContainer userAgent;
+ @JsonProperty()
+ private final boolean channelAcquisitionContextEnabled;
+
// endregion
// region Constructors
@@ -445,6 +450,7 @@ private Options(final Builder builder) {
this.shutdownTimeout = builder.shutdownTimeout;
this.threadCount = builder.threadCount;
this.userAgent = builder.userAgent;
+ this.channelAcquisitionContextEnabled = builder.channelAcquisitionContextEnabled;
this.connectTimeout = builder.connectTimeout == null
? builder.requestTimeout
@@ -472,6 +478,7 @@ private Options(final ConnectionPolicy connectionPolicy) {
this.shutdownTimeout = Duration.ofSeconds(15L);
this.threadCount = 2 * Runtime.getRuntime().availableProcessors();
this.userAgent = new UserAgentContainer();
+ this.channelAcquisitionContextEnabled = true;
}
// endregion
@@ -554,6 +561,8 @@ public UserAgentContainer userAgent() {
return this.userAgent;
}
+ public boolean isChannelAcquisitionContextEnabled() { return this.channelAcquisitionContextEnabled; }
+
// endregion
// region Methods
@@ -695,6 +704,7 @@ public static class Builder {
private Duration shutdownTimeout;
private int threadCount;
private UserAgentContainer userAgent;
+ private boolean channelAcquisitionContextEnabled;
// endregion
@@ -723,6 +733,7 @@ public Builder(ConnectionPolicy connectionPolicy) {
this.shutdownTimeout = DEFAULT_OPTIONS.shutdownTimeout;
this.threadCount = DEFAULT_OPTIONS.threadCount;
this.userAgent = DEFAULT_OPTIONS.userAgent;
+ this.channelAcquisitionContextEnabled = DEFAULT_OPTIONS.channelAcquisitionContextEnabled;
}
// endregion
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java
index 2e1740d9c76cb..8b09ab664a536 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java
@@ -7,6 +7,7 @@
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.RequestTimeline;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
+import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpointStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,6 +29,7 @@ public class StoreResponse {
private int pendingRequestQueueSize;
private int requestPayloadLength;
private RequestTimeline requestTimeline;
+ private RntbdChannelAcquisitionTimeline channelAcquisitionTimeline;
private int rntbdChannelTaskQueueSize;
private RntbdEndpointStatistics rntbdEndpointStatistics;
private int rntbdRequestLength;
@@ -170,6 +172,14 @@ RequestTimeline getRequestTimeline() {
return this.requestTimeline;
}
+ void setChannelAcquisitionTimeline(RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) {
+ this.channelAcquisitionTimeline = channelAcquisitionTimeline;
+ }
+
+ RntbdChannelAcquisitionTimeline getChannelAcquisitionTimeline() {
+ return this.channelAcquisitionTimeline;
+ }
+
void setEndpointStatistics(RntbdEndpointStatistics rntbdEndpointStatistics) {
this.rntbdEndpointStatistics = rntbdEndpointStatistics;
}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java
index 702c3774a4855..cd06187e75107 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java
@@ -208,6 +208,13 @@ public void serialize(StoreResult storeResult,
jsonGenerator.writeObjectField("transportRequestTimeline", storeResult.storeResponse != null ?
storeResult.storeResponse.getRequestTimeline() :
storeResult.exception != null ? BridgeInternal.getRequestTimeline(storeResult.exception) : null);
+
+ this.writeNonNullObjectField(
+ jsonGenerator,
+ "transportRequestChannelAcquisitionContext",
+ storeResult.storeResponse != null ? storeResult.storeResponse.getChannelAcquisitionTimeline() :
+ storeResult.exception != null? BridgeInternal.getChannelAcqusitionTimeline(storeResult.exception) : null);
+
jsonGenerator.writeObjectField("rntbdRequestLengthInBytes", storeResult.storeResponse != null ?
storeResult.storeResponse.getRntbdRequestLength() : BridgeInternal.getRntbdRequestLength(storeResult.exception));
jsonGenerator.writeObjectField("rntbdResponseLengthInBytes", storeResult.storeResponse != null ?
@@ -225,5 +232,13 @@ public void serialize(StoreResult storeResult,
jsonGenerator.writeEndObject();
}
+
+ private void writeNonNullObjectField(JsonGenerator jsonGenerator, String fieldName, Object object) throws IOException {
+ if (object == null) {
+ return;
+ }
+
+ jsonGenerator.writeObjectField(fieldName, object);
+ }
}
}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/ChannelPromiseWithExpiryTime.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/ChannelPromiseWithExpiryTime.java
index 19823e2bb99ca..4ab41117b76f0 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/ChannelPromiseWithExpiryTime.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/ChannelPromiseWithExpiryTime.java
@@ -17,13 +17,22 @@
class ChannelPromiseWithExpiryTime implements Promise {
private final Promise channelPromise;
private final long expiryTimeInNanos;
+ private final RntbdChannelAcquisitionTimeline channelAcquisitionTimeline;
public ChannelPromiseWithExpiryTime(Promise channelPromise, long expiryTimeInNanos) {
+ this(channelPromise, expiryTimeInNanos, null);
+ }
+
+ public ChannelPromiseWithExpiryTime(
+ Promise channelPromise,
+ long expiryTimeInNanos,
+ RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) {
checkNotNull(channelPromise, "channelPromise must not be null");
checkNotNull(expiryTimeInNanos, "expiryTimeInNanos must not be null");
this.channelPromise = channelPromise;
this.expiryTimeInNanos = expiryTimeInNanos;
+ this.channelAcquisitionTimeline = channelAcquisitionTimeline;
}
public long getExpiryTimeInNanos() {
@@ -172,4 +181,8 @@ public Promise sync() throws InterruptedException {
public Promise syncUninterruptibly() {
return this.channelPromise.syncUninterruptibly();
}
-}
\ No newline at end of file
+
+ public RntbdChannelAcquisitionTimeline getChannelAcquisitionTimeline() {
+ return this.channelAcquisitionTimeline;
+ }
+}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionEvent.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionEvent.java
new file mode 100644
index 0000000000000..4be91e871404c
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionEvent.java
@@ -0,0 +1,68 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.implementation.directconnectivity.rntbd;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+
+@JsonSerialize(using = RntbdChannelAcquisitionEvent.RntbdChannelAcquisitionEventJsonSerializer.class)
+public class RntbdChannelAcquisitionEvent {
+ private final Instant createdTime;
+ private final RntbdChannelAcquisitionEventType eventType;
+ private volatile Instant completeTime;
+
+ public RntbdChannelAcquisitionEvent(RntbdChannelAcquisitionEventType eventType, Instant createdTime) {
+ this.eventType = eventType;
+ this.createdTime = createdTime;
+ }
+
+ public Instant getCreatedTime() {
+ return createdTime;
+ }
+
+ public RntbdChannelAcquisitionEventType getEventType() {
+ return eventType;
+ }
+
+ public Instant getCompleteTime() {
+ return completeTime;
+ }
+
+ public void setCompleteTime(Instant completeTime) {
+ this.completeTime = completeTime;
+ }
+
+ public void complete(Instant completeTime) {
+ this.completeTime = completeTime;
+ }
+
+ public void addDetail(Object detail) {}
+
+ public static void addDetail(RntbdChannelAcquisitionEvent event, Object detail) {
+ if (event != null) {
+ event.addDetail(detail);
+ }
+ }
+
+ public static class RntbdChannelAcquisitionEventJsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer {
+ @Override
+ public void serialize(RntbdChannelAcquisitionEvent event,
+ JsonGenerator writer,
+ SerializerProvider serializerProvider) throws IOException {
+ writer.writeStartObject();
+
+ writer.writeStringField(event.eventType.toString(), event.createdTime.toString());
+ if (event.completeTime != null) {
+ writer.writeNumberField("durationInMicroSec",Duration.between(event.createdTime, event.completeTime).toNanos()/1000L);
+ }
+
+ writer.writeEndObject();
+ }
+ }
+}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionEventType.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionEventType.java
new file mode 100644
index 0000000000000..d67a5dad3ad04
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionEventType.java
@@ -0,0 +1,22 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.implementation.directconnectivity.rntbd;
+
+public enum RntbdChannelAcquisitionEventType {
+ ATTEMPT_TO_POLL_CHANNEL("poll"),
+ ADD_TO_PENDING_QUEUE("pending"),
+ PENDING_TIME_OUT("pendingTimeout"),
+ ATTEMPT_TO_CREATE_NEW_CHANNEL("startNew"),
+ ATTEMPT_TO_CREATE_NEW_CHANNEL_COMPLETE("completeNew");
+
+ private String name;
+ RntbdChannelAcquisitionEventType(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString(){
+ return name;
+ }
+}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionTimeline.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionTimeline.java
new file mode 100644
index 0000000000000..4692a64dbe6c3
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionTimeline.java
@@ -0,0 +1,66 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.implementation.directconnectivity.rntbd;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RntbdChannelAcquisitionTimeline {
+ private static final Logger logger = LoggerFactory.getLogger(RntbdChannelAcquisitionTimeline.class);
+ private final List events;
+ private volatile RntbdChannelAcquisitionEvent currentEvent;
+
+ public RntbdChannelAcquisitionTimeline() {
+ this.events = new ArrayList<>();
+ }
+
+ public List getEvents() {
+ return events;
+ }
+
+ public static RntbdChannelAcquisitionEvent startNewEvent(
+ RntbdChannelAcquisitionTimeline timeline,
+ RntbdChannelAcquisitionEventType eventType) {
+
+ if (timeline != null) {
+ RntbdChannelAcquisitionEvent newEvent = new RntbdChannelAcquisitionEvent(eventType, Instant.now());
+ timeline.addNewEvent(newEvent);
+
+ return newEvent;
+ }
+ return null;
+ }
+
+ public static RntbdPollChannelEvent startNewPollEvent(
+ RntbdChannelAcquisitionTimeline timeline,
+ int availableChannels,
+ int acquiredChannels) {
+
+ if (timeline != null) {
+ RntbdPollChannelEvent newEvent = new RntbdPollChannelEvent(availableChannels, acquiredChannels, Instant.now());
+ timeline.addNewEvent(newEvent);
+ return newEvent;
+ }
+
+ return null;
+ }
+
+ private void addNewEvent(RntbdChannelAcquisitionEvent event) {
+ if (this.currentEvent != null) {
+ this.currentEvent.complete(event.getCreatedTime());
+ }
+ this.events.add(event);
+ this.currentEvent = event;
+ }
+
+ public static void addDetailsToLastEvent(RntbdChannelAcquisitionTimeline timeline, Object detail) {
+ if (timeline != null && timeline.currentEvent != null){
+ RntbdChannelAcquisitionEvent.addDetail(timeline.currentEvent, detail);
+ }
+ }
+}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelState.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelState.java
new file mode 100644
index 0000000000000..34aa51d710a7e
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelState.java
@@ -0,0 +1,69 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.implementation.directconnectivity.rntbd;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.io.IOException;
+
+@JsonSerialize(using = RntbdChannelState.RntbdChannelStateJsonSerializer.class)
+public class RntbdChannelState {
+ public static final RntbdChannelState NULL_REQUEST_MANAGER = new RntbdChannelState(State.NULL_REQUEST_MANAGER, 0);
+ public static final RntbdChannelState CLOSED = new RntbdChannelState(State.CLOSED, 0);
+
+ private final int pendingRequests;
+ private final State state;
+
+ public RntbdChannelState(State state, int pendingRequests) {
+ this.state = state;
+ this.pendingRequests = pendingRequests;
+ }
+
+ public static RntbdChannelState ok(int pendingRequests) {
+ return new RntbdChannelState(State.OK, pendingRequests);
+ }
+
+ public static RntbdChannelState pendingLimit(int pendingRequests) {
+ return new RntbdChannelState(State.PENDING_LIMIT, pendingRequests);
+ }
+
+ public static RntbdChannelState contextNegotiationPending(int pendingRequests) {
+ return new RntbdChannelState(State.CONTEXT_NEGOTIATION_PENDING, pendingRequests);
+ }
+
+ public boolean isOk() {
+ return this.state == State.OK;
+ }
+
+ public static class RntbdChannelStateJsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer {
+ @Override
+ public void serialize(RntbdChannelState channelState,
+ JsonGenerator writer,
+ SerializerProvider serializerProvider) throws IOException {
+ writer.writeStartObject();
+ writer.writeNumberField(channelState.state.toString(), channelState.pendingRequests);
+ writer.writeEndObject();
+ }
+ }
+
+ enum State{
+ OK("ok"),
+ CLOSED("closed"),
+ NULL_REQUEST_MANAGER("nullRequestManager"),
+ PENDING_LIMIT("pendingLimit"),
+ CONTEXT_NEGOTIATION_PENDING("contextNegotiationPending");
+
+ private String value;
+ State(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString(){
+ return value;
+ }
+ }
+}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java
index 4636d4a5c7171..66b4487540ab9 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java
@@ -237,6 +237,9 @@ protected void initChannel(final Channel channel) throws Exception {
@Override
public void onTimeout(AcquireListener task) {
task.originalPromise.setFailure(ACQUISITION_TIMEOUT);
+ RntbdChannelAcquisitionTimeline.startNewEvent(
+ task.originalPromise.getChannelAcquisitionTimeline(),
+ RntbdChannelAcquisitionEventType.PENDING_TIME_OUT);
}
};
@@ -421,6 +424,13 @@ public Future acquire() {
System.nanoTime() + this.acquisitionTimeoutInNanos));
}
+ public Future acquire(RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) {
+ return this.acquire(new ChannelPromiseWithExpiryTime(
+ this.bootstrap.config().group().next().newPromise(),
+ System.nanoTime() + this.acquisitionTimeoutInNanos,
+ channelAcquisitionTimeline));
+ }
+
/**
* Acquire a {@link Channel channel} from the current {@link RntbdClientChannelPool pool}.
*
@@ -588,20 +598,23 @@ public String toString() {
*
* @param promise the promise of a {@link Channel channel}.
*
- * @see #isChannelServiceable(Channel)
+ * @see #getChannelState(Channel) (Channel)
* @see AcquireTimeoutTask
*/
private void acquireChannel(final ChannelPromiseWithExpiryTime promise) {
this.ensureInEventLoop();
+ reportIssueUnless(logger, promise != null, this, "Channel promise should not be null");
+ RntbdChannelAcquisitionTimeline channelAcquisitionTimeline = promise.getChannelAcquisitionTimeline();
+
if (this.isClosed()) {
promise.setFailure(POOL_CLOSED_ON_ACQUIRE);
return;
}
try {
- Channel candidate = this.pollChannel();
+ Channel candidate = this.pollChannel(channelAcquisitionTimeline);
if (candidate != null) {
@@ -624,6 +637,11 @@ private void acquireChannel(final ChannelPromiseWithExpiryTime promise) {
// If our connection attempt fails, notifyChannelConnect will call us again
final Promise anotherPromise = this.newChannelPromiseForToBeEstablishedChannel(promise);
+
+ RntbdChannelAcquisitionTimeline.startNewEvent(
+ channelAcquisitionTimeline,
+ RntbdChannelAcquisitionEventType.ATTEMPT_TO_CREATE_NEW_CHANNEL);
+
final ChannelFuture future = this.bootstrap.clone().attr(POOL_KEY, this).connect();
if (future.isDone()) {
@@ -653,9 +671,14 @@ private void acquireChannel(final ChannelPromiseWithExpiryTime promise) {
// we accept the risk of reusing the channel even if more than maxPendingRequests are
// queued - by picking the channel with the least number of outstanding requests we load
// balance reasonably
- if (pendingRequestCount < pendingRequestCountMin && isChannelServiceable(channel)) {
- pendingRequestCountMin = pendingRequestCount;
- candidate = channel;
+ if (pendingRequestCount < pendingRequestCountMin) {
+ RntbdChannelState channelState = this.getChannelState(channel);
+ RntbdChannelAcquisitionTimeline.addDetailsToLastEvent(channelAcquisitionTimeline, channelState);
+
+ if (channelState.isOk()) {
+ pendingRequestCountMin = pendingRequestCount;
+ candidate = channel;
+ }
}
}
}
@@ -669,7 +692,10 @@ private void acquireChannel(final ChannelPromiseWithExpiryTime promise) {
// we pick the first available channel to avoid the additional cost of load balancing
// as long as the load is lower than the load factor threshold above.
- if (isChannelServiceable(channel)) {
+ RntbdChannelState channelState = this.getChannelState(channel);
+ RntbdChannelAcquisitionTimeline.addDetailsToLastEvent(channelAcquisitionTimeline, channelState);
+
+ if (channelState.isOk()) {
if (this.availableChannels.remove(channel)) {
this.doAcquireChannel(promise, channel);
return;
@@ -714,6 +740,10 @@ private void addTaskToPendingAcquisitionQueue(ChannelPromiseWithExpiryTime promi
if (!this.pendingAcquisitions.offer(acquireTask)) {
promise.setFailure(TOO_MANY_PENDING_ACQUISITIONS);
+ } else {
+ RntbdChannelAcquisitionTimeline.startNewEvent(
+ promise.getChannelAcquisitionTimeline(),
+ RntbdChannelAcquisitionEventType.ADD_TO_PENDING_QUEUE);
}
}
}
@@ -874,7 +904,7 @@ private void doClose() {
for (; ; ) {
// will remove from available channels
- final Channel channel = this.pollChannel();
+ final Channel channel = this.pollChannel(null);
if (channel == null) {
break;
}
@@ -908,22 +938,6 @@ private void ensureInEventLoop() {
Thread.currentThread());
}
- /**
- * {@code true} if the given {@link Channel channel} is serviceable; {@code false} otherwise.
- *
- * A serviceable channel is one that is open, has an {@link RntbdContext RNTBD context}, and has fewer than {@link
- * #maxRequestsPerChannel} requests in its pipeline. An inactive channel will not have a {@link RntbdRequestManager
- * request manager}. Hence, this method first checks that the channel's request manager is non-null.
- *
- * @param channel the channel to check.
- *
- * @return {@code true} if the given {@link Channel channel} is serviceable; {@code false} otherwise.
- */
- private boolean isChannelServiceable(final Channel channel) {
- final RntbdRequestManager manager = channel.pipeline().get(RntbdRequestManager.class);
- return manager != null && manager.isServiceable(this.maxRequestsPerChannel) && channel.isOpen();
- }
-
/**
* Creates a new {@link Channel channel} {@link Promise promise} that completes on a dedicated
* {@link EventExecutor executor} to avoid spamming the {@link RntbdClientChannelPool pool}'s
@@ -967,7 +981,7 @@ private ChannelPromiseWithExpiryTime createNewChannelPromise(
listener.acquired();
anotherPromise.addListener(listener);
- return new ChannelPromiseWithExpiryTime(anotherPromise, promise.getExpiryTimeInNanos());
+ return new ChannelPromiseWithExpiryTime(anotherPromise, promise.getExpiryTimeInNanos(), promise.getChannelAcquisitionTimeline());
}
private void newTimeout(
@@ -1101,6 +1115,12 @@ private void notifyChannelConnect(final ChannelFuture future, final Promise
+ * A serviceable channel is one that is open, has an {@link RntbdContext RNTBD context}, and has fewer than {@link
+ * #maxRequestsPerChannel} requests in its pipeline. An inactive channel will not have a {@link RntbdRequestManager
+ * request manager}. Hence, this method first checks that the channel's request manager is non-null.
+ *
+ * @param channel the channel to check.
+ *
+ * @return {@link RntbdChannelState}.
+ */
+ private RntbdChannelState getChannelState(Channel channel) {
+ checkNotNull(channel, "Channel cannot be null");
+
+ final RntbdRequestManager manager = channel.pipeline().get(RntbdRequestManager.class);
+ if (manager == null) {
+ return RntbdChannelState.NULL_REQUEST_MANAGER;
+ }
+ if (!channel.isOpen()) {
+ return RntbdChannelState.CLOSED;
+ }
+
+ return manager.getChannelState(this.maxPendingAcquisitions);
+ }
+
/**
* Poll a {@link Channel} out of internal storage to reuse it
*
@@ -1163,13 +1208,21 @@ private boolean offerChannel(final Channel channel) {
* this method serially on a single-threaded EventExecutor. As a result this method need not (and should not) be
* synchronized.
*
+ *
+ * @param channelAcquisitionTimeline the {@link RntbdChannelAcquisitionTimeline}.
* @return a value of {@code null}, if no {@link Channel} is ready to be reused
*
* @see #acquire(Promise)
*/
- private Channel pollChannel() {
+ private Channel pollChannel(RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) {
ensureInEventLoop();
+ RntbdPollChannelEvent event =
+ RntbdChannelAcquisitionTimeline.startNewPollEvent(
+ channelAcquisitionTimeline,
+ this.availableChannels.size(),
+ this.acquiredChannels.size());
+
final Channel first = this.availableChannels.pollFirst();
if (first == null) {
@@ -1182,7 +1235,10 @@ private Channel pollChannel() {
// Only return channels as servicable here if less than maxPendingRequests
// are queued on them
- if (this.isChannelServiceable(first)) {
+ RntbdChannelState channelState = this.getChannelState(first);
+ RntbdChannelAcquisitionEvent.addDetail(event, channelState);
+
+ if (channelState.isOk()) {
return first;
}
@@ -1193,9 +1249,12 @@ private Channel pollChannel() {
if (next.isActive()) {
- // Only return channels as servicable here if less than maxPendingRequests
+ // Only return channels as serviceable here if less than maxPendingRequests
// are queued on them
- if (this.isChannelServiceable(next)) {
+ RntbdChannelState state = this.getChannelState(next);
+ RntbdChannelAcquisitionEvent.addDetail(event, state);
+
+ if (state.isOk()) {
return next;
}
this.availableChannels.offer(next);
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java
index 17645ce6a08ee..53e5de298ef09 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java
@@ -225,6 +225,9 @@ public LogLevel wireLogLevel() {
return this.wireLogLevel;
}
+ @JsonProperty
+ public boolean isChannelAcquisitionContextEnabled() { return this.options.isChannelAcquisitionContextEnabled(); }
+
@Override
public String toString() {
return RntbdObjectMapper.toString(this);
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdPollChannelEvent.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdPollChannelEvent.java
new file mode 100644
index 0000000000000..c2dc5b945c340
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdPollChannelEvent.java
@@ -0,0 +1,64 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.implementation.directconnectivity.rntbd;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+
+@JsonSerialize(using = RntbdPollChannelEvent.RntbdPollChannelEventJsonSerializer.class)
+public class RntbdPollChannelEvent extends RntbdChannelAcquisitionEvent {
+ private final int availableChannels;
+ private final int acquiredChannels;
+ private final List