From bc2bdc4f8f9b230b12c41262a6dc071e4e5ecbb4 Mon Sep 17 00:00:00 2001 From: Annie Liang <64233642+xinlian12@users.noreply.github.com> Date: Wed, 11 Aug 2021 15:05:57 -0700 Subject: [PATCH] AddingChannelAcquisitionConext (#23464) * add channel acquisition context Co-authored-by: annie-mac Co-authored-by: Kushagra Thapar --- .../resources/spotbugs/spotbugs-exclude.xml | 6 + .../java/com/azure/cosmos/BridgeInternal.java | 12 ++ .../azure/cosmos/CosmosAsyncContainer.java | 2 + .../com/azure/cosmos/CosmosException.java | 10 ++ .../RntbdTransportClient.java | 11 ++ .../directconnectivity/StoreResponse.java | 10 ++ .../directconnectivity/StoreResult.java | 15 +++ .../rntbd/ChannelPromiseWithExpiryTime.java | 15 ++- .../rntbd/RntbdChannelAcquisitionEvent.java | 68 +++++++++++ .../RntbdChannelAcquisitionEventType.java | 22 ++++ .../RntbdChannelAcquisitionTimeline.java | 66 ++++++++++ .../rntbd/RntbdChannelState.java | 69 +++++++++++ .../rntbd/RntbdClientChannelPool.java | 115 +++++++++++++----- .../rntbd/RntbdEndpoint.java | 3 + .../rntbd/RntbdPollChannelEvent.java | 64 ++++++++++ .../rntbd/RntbdRequestManager.java | 12 +- .../rntbd/RntbdRequestRecord.java | 13 ++ .../rntbd/RntbdServiceEndpoint.java | 6 +- ...ainerOpenConnectionsAndInitCachesTest.java | 1 + .../azure/cosmos/CosmosDiagnosticsTest.java | 1 + 20 files changed, 489 insertions(+), 32 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionEvent.java create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionEventType.java create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionTimeline.java create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelState.java create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdPollChannelEvent.java diff --git a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml index 743abfc4fa083..5457436b8cafe 100755 --- a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml +++ b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml @@ -1235,6 +1235,12 @@ + + + + + + diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java index 321f254f0e1a1..93ff9afe934aa 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java @@ -32,6 +32,7 @@ import com.azure.cosmos.implementation.directconnectivity.StoreResponse; import com.azure.cosmos.implementation.directconnectivity.StoreResult; import com.azure.cosmos.implementation.directconnectivity.Uri; +import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpointStatistics; import com.azure.cosmos.implementation.patch.PatchOperation; import com.azure.cosmos.implementation.query.QueryInfo; @@ -216,6 +217,17 @@ public static RequestTimeline getRequestTimeline(E e return e.getRequestTimeline(); } + @Warning(value = INTERNAL_USE_ONLY_WARNING) + public static E setChannelAcquisitionTimeline(E e, RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) { + e.setChannelAcquisitionTimeline(channelAcquisitionTimeline); + return e; + } + + @Warning(value = INTERNAL_USE_ONLY_WARNING) + public static RntbdChannelAcquisitionTimeline getChannelAcqusitionTimeline(E e) { + return e.getChannelAcquisitionTimeline(); + } + @Warning(value = INTERNAL_USE_ONLY_WARNING) public static E setChannelTaskQueueSize(E e, int value) { e.setRntbdChannelTaskQueueSize(value); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java index 8ebb80c36817c..ff83afa920f5a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java @@ -60,6 +60,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; @@ -444,6 +445,7 @@ public CosmosPagedFlux queryItems(String query, Class classType) { */ @Beta(value = Beta.SinceVersion.V4_14_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING) public Mono openConnectionsAndInitCaches() { + if(isInitialized.compareAndSet(false, true)) { return this.getFeedRanges().flatMap(feedRanges -> { List>> fluxList = new ArrayList<>(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java index c5d31e888a5ec..5005791479ab1 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java @@ -11,6 +11,7 @@ import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.directconnectivity.Uri; +import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpointStatistics; import com.azure.cosmos.models.ModelBridgeInternal; import com.fasterxml.jackson.core.JsonProcessingException; @@ -51,6 +52,7 @@ public class CosmosException extends AzureException { private CosmosDiagnostics cosmosDiagnostics; private RequestTimeline requestTimeline; + private RntbdChannelAcquisitionTimeline channelAcquisitionTimeline; private CosmosError cosmosError; private int rntbdChannelTaskQueueSize; @@ -385,6 +387,14 @@ void setRequestTimeline(RequestTimeline requestTimeline) { this.requestTimeline = requestTimeline; } + RntbdChannelAcquisitionTimeline getChannelAcquisitionTimeline() { + return this.channelAcquisitionTimeline; + } + + void setChannelAcquisitionTimeline(RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) { + this.channelAcquisitionTimeline = channelAcquisitionTimeline; + } + void setResourceAddress(String resourceAddress) { this.resourceAddress = resourceAddress; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java index 46da32ba61a3a..d6244ed266182 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java @@ -229,6 +229,7 @@ public Mono invokeStoreAsync(final Uri addressUri, final RxDocume response.setRequestPayloadLength(request.getContentLength()); response.setRntbdChannelTaskQueueSize(record.channelTaskQueueLength()); response.setRntbdPendingRequestSize(record.pendingRequestQueueSize()); + response.setChannelAcquisitionTimeline(record.getChannelAcquisitionTimeline()); } })).onErrorMap(throwable -> { @@ -262,6 +263,7 @@ public Mono invokeStoreAsync(final Uri addressUri, final RxDocume BridgeInternal.setRntbdPendingRequestQueueSize(cosmosException, record.pendingRequestQueueSize()); BridgeInternal.setChannelTaskQueueSize(cosmosException, record.channelTaskQueueLength()); BridgeInternal.setSendingRequestStarted(cosmosException, record.hasSendingRequestStarted()); + BridgeInternal.setChannelAcquisitionTimeline(cosmosException, record.getChannelAcquisitionTimeline()); return cosmosException; }); @@ -417,6 +419,9 @@ public static final class Options { @JsonIgnore() private final UserAgentContainer userAgent; + @JsonProperty() + private final boolean channelAcquisitionContextEnabled; + // endregion // region Constructors @@ -445,6 +450,7 @@ private Options(final Builder builder) { this.shutdownTimeout = builder.shutdownTimeout; this.threadCount = builder.threadCount; this.userAgent = builder.userAgent; + this.channelAcquisitionContextEnabled = builder.channelAcquisitionContextEnabled; this.connectTimeout = builder.connectTimeout == null ? builder.requestTimeout @@ -472,6 +478,7 @@ private Options(final ConnectionPolicy connectionPolicy) { this.shutdownTimeout = Duration.ofSeconds(15L); this.threadCount = 2 * Runtime.getRuntime().availableProcessors(); this.userAgent = new UserAgentContainer(); + this.channelAcquisitionContextEnabled = true; } // endregion @@ -554,6 +561,8 @@ public UserAgentContainer userAgent() { return this.userAgent; } + public boolean isChannelAcquisitionContextEnabled() { return this.channelAcquisitionContextEnabled; } + // endregion // region Methods @@ -695,6 +704,7 @@ public static class Builder { private Duration shutdownTimeout; private int threadCount; private UserAgentContainer userAgent; + private boolean channelAcquisitionContextEnabled; // endregion @@ -723,6 +733,7 @@ public Builder(ConnectionPolicy connectionPolicy) { this.shutdownTimeout = DEFAULT_OPTIONS.shutdownTimeout; this.threadCount = DEFAULT_OPTIONS.threadCount; this.userAgent = DEFAULT_OPTIONS.userAgent; + this.channelAcquisitionContextEnabled = DEFAULT_OPTIONS.channelAcquisitionContextEnabled; } // endregion diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java index 2e1740d9c76cb..8b09ab664a536 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java @@ -7,6 +7,7 @@ import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.RequestTimeline; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpointStatistics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,6 +29,7 @@ public class StoreResponse { private int pendingRequestQueueSize; private int requestPayloadLength; private RequestTimeline requestTimeline; + private RntbdChannelAcquisitionTimeline channelAcquisitionTimeline; private int rntbdChannelTaskQueueSize; private RntbdEndpointStatistics rntbdEndpointStatistics; private int rntbdRequestLength; @@ -170,6 +172,14 @@ RequestTimeline getRequestTimeline() { return this.requestTimeline; } + void setChannelAcquisitionTimeline(RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) { + this.channelAcquisitionTimeline = channelAcquisitionTimeline; + } + + RntbdChannelAcquisitionTimeline getChannelAcquisitionTimeline() { + return this.channelAcquisitionTimeline; + } + void setEndpointStatistics(RntbdEndpointStatistics rntbdEndpointStatistics) { this.rntbdEndpointStatistics = rntbdEndpointStatistics; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java index 702c3774a4855..cd06187e75107 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java @@ -208,6 +208,13 @@ public void serialize(StoreResult storeResult, jsonGenerator.writeObjectField("transportRequestTimeline", storeResult.storeResponse != null ? storeResult.storeResponse.getRequestTimeline() : storeResult.exception != null ? BridgeInternal.getRequestTimeline(storeResult.exception) : null); + + this.writeNonNullObjectField( + jsonGenerator, + "transportRequestChannelAcquisitionContext", + storeResult.storeResponse != null ? storeResult.storeResponse.getChannelAcquisitionTimeline() : + storeResult.exception != null? BridgeInternal.getChannelAcqusitionTimeline(storeResult.exception) : null); + jsonGenerator.writeObjectField("rntbdRequestLengthInBytes", storeResult.storeResponse != null ? storeResult.storeResponse.getRntbdRequestLength() : BridgeInternal.getRntbdRequestLength(storeResult.exception)); jsonGenerator.writeObjectField("rntbdResponseLengthInBytes", storeResult.storeResponse != null ? @@ -225,5 +232,13 @@ public void serialize(StoreResult storeResult, jsonGenerator.writeEndObject(); } + + private void writeNonNullObjectField(JsonGenerator jsonGenerator, String fieldName, Object object) throws IOException { + if (object == null) { + return; + } + + jsonGenerator.writeObjectField(fieldName, object); + } } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/ChannelPromiseWithExpiryTime.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/ChannelPromiseWithExpiryTime.java index 19823e2bb99ca..4ab41117b76f0 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/ChannelPromiseWithExpiryTime.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/ChannelPromiseWithExpiryTime.java @@ -17,13 +17,22 @@ class ChannelPromiseWithExpiryTime implements Promise { private final Promise channelPromise; private final long expiryTimeInNanos; + private final RntbdChannelAcquisitionTimeline channelAcquisitionTimeline; public ChannelPromiseWithExpiryTime(Promise channelPromise, long expiryTimeInNanos) { + this(channelPromise, expiryTimeInNanos, null); + } + + public ChannelPromiseWithExpiryTime( + Promise channelPromise, + long expiryTimeInNanos, + RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) { checkNotNull(channelPromise, "channelPromise must not be null"); checkNotNull(expiryTimeInNanos, "expiryTimeInNanos must not be null"); this.channelPromise = channelPromise; this.expiryTimeInNanos = expiryTimeInNanos; + this.channelAcquisitionTimeline = channelAcquisitionTimeline; } public long getExpiryTimeInNanos() { @@ -172,4 +181,8 @@ public Promise sync() throws InterruptedException { public Promise syncUninterruptibly() { return this.channelPromise.syncUninterruptibly(); } -} \ No newline at end of file + + public RntbdChannelAcquisitionTimeline getChannelAcquisitionTimeline() { + return this.channelAcquisitionTimeline; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionEvent.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionEvent.java new file mode 100644 index 0000000000000..4be91e871404c --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionEvent.java @@ -0,0 +1,68 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.directconnectivity.rntbd; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; + +@JsonSerialize(using = RntbdChannelAcquisitionEvent.RntbdChannelAcquisitionEventJsonSerializer.class) +public class RntbdChannelAcquisitionEvent { + private final Instant createdTime; + private final RntbdChannelAcquisitionEventType eventType; + private volatile Instant completeTime; + + public RntbdChannelAcquisitionEvent(RntbdChannelAcquisitionEventType eventType, Instant createdTime) { + this.eventType = eventType; + this.createdTime = createdTime; + } + + public Instant getCreatedTime() { + return createdTime; + } + + public RntbdChannelAcquisitionEventType getEventType() { + return eventType; + } + + public Instant getCompleteTime() { + return completeTime; + } + + public void setCompleteTime(Instant completeTime) { + this.completeTime = completeTime; + } + + public void complete(Instant completeTime) { + this.completeTime = completeTime; + } + + public void addDetail(Object detail) {} + + public static void addDetail(RntbdChannelAcquisitionEvent event, Object detail) { + if (event != null) { + event.addDetail(detail); + } + } + + public static class RntbdChannelAcquisitionEventJsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + @Override + public void serialize(RntbdChannelAcquisitionEvent event, + JsonGenerator writer, + SerializerProvider serializerProvider) throws IOException { + writer.writeStartObject(); + + writer.writeStringField(event.eventType.toString(), event.createdTime.toString()); + if (event.completeTime != null) { + writer.writeNumberField("durationInMicroSec",Duration.between(event.createdTime, event.completeTime).toNanos()/1000L); + } + + writer.writeEndObject(); + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionEventType.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionEventType.java new file mode 100644 index 0000000000000..d67a5dad3ad04 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionEventType.java @@ -0,0 +1,22 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.directconnectivity.rntbd; + +public enum RntbdChannelAcquisitionEventType { + ATTEMPT_TO_POLL_CHANNEL("poll"), + ADD_TO_PENDING_QUEUE("pending"), + PENDING_TIME_OUT("pendingTimeout"), + ATTEMPT_TO_CREATE_NEW_CHANNEL("startNew"), + ATTEMPT_TO_CREATE_NEW_CHANNEL_COMPLETE("completeNew"); + + private String name; + RntbdChannelAcquisitionEventType(String name) { + this.name = name; + } + + @Override + public String toString(){ + return name; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionTimeline.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionTimeline.java new file mode 100644 index 0000000000000..4692a64dbe6c3 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelAcquisitionTimeline.java @@ -0,0 +1,66 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.directconnectivity.rntbd; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; + +public class RntbdChannelAcquisitionTimeline { + private static final Logger logger = LoggerFactory.getLogger(RntbdChannelAcquisitionTimeline.class); + private final List events; + private volatile RntbdChannelAcquisitionEvent currentEvent; + + public RntbdChannelAcquisitionTimeline() { + this.events = new ArrayList<>(); + } + + public List getEvents() { + return events; + } + + public static RntbdChannelAcquisitionEvent startNewEvent( + RntbdChannelAcquisitionTimeline timeline, + RntbdChannelAcquisitionEventType eventType) { + + if (timeline != null) { + RntbdChannelAcquisitionEvent newEvent = new RntbdChannelAcquisitionEvent(eventType, Instant.now()); + timeline.addNewEvent(newEvent); + + return newEvent; + } + return null; + } + + public static RntbdPollChannelEvent startNewPollEvent( + RntbdChannelAcquisitionTimeline timeline, + int availableChannels, + int acquiredChannels) { + + if (timeline != null) { + RntbdPollChannelEvent newEvent = new RntbdPollChannelEvent(availableChannels, acquiredChannels, Instant.now()); + timeline.addNewEvent(newEvent); + return newEvent; + } + + return null; + } + + private void addNewEvent(RntbdChannelAcquisitionEvent event) { + if (this.currentEvent != null) { + this.currentEvent.complete(event.getCreatedTime()); + } + this.events.add(event); + this.currentEvent = event; + } + + public static void addDetailsToLastEvent(RntbdChannelAcquisitionTimeline timeline, Object detail) { + if (timeline != null && timeline.currentEvent != null){ + RntbdChannelAcquisitionEvent.addDetail(timeline.currentEvent, detail); + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelState.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelState.java new file mode 100644 index 0000000000000..34aa51d710a7e --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdChannelState.java @@ -0,0 +1,69 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.directconnectivity.rntbd; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.io.IOException; + +@JsonSerialize(using = RntbdChannelState.RntbdChannelStateJsonSerializer.class) +public class RntbdChannelState { + public static final RntbdChannelState NULL_REQUEST_MANAGER = new RntbdChannelState(State.NULL_REQUEST_MANAGER, 0); + public static final RntbdChannelState CLOSED = new RntbdChannelState(State.CLOSED, 0); + + private final int pendingRequests; + private final State state; + + public RntbdChannelState(State state, int pendingRequests) { + this.state = state; + this.pendingRequests = pendingRequests; + } + + public static RntbdChannelState ok(int pendingRequests) { + return new RntbdChannelState(State.OK, pendingRequests); + } + + public static RntbdChannelState pendingLimit(int pendingRequests) { + return new RntbdChannelState(State.PENDING_LIMIT, pendingRequests); + } + + public static RntbdChannelState contextNegotiationPending(int pendingRequests) { + return new RntbdChannelState(State.CONTEXT_NEGOTIATION_PENDING, pendingRequests); + } + + public boolean isOk() { + return this.state == State.OK; + } + + public static class RntbdChannelStateJsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + @Override + public void serialize(RntbdChannelState channelState, + JsonGenerator writer, + SerializerProvider serializerProvider) throws IOException { + writer.writeStartObject(); + writer.writeNumberField(channelState.state.toString(), channelState.pendingRequests); + writer.writeEndObject(); + } + } + + enum State{ + OK("ok"), + CLOSED("closed"), + NULL_REQUEST_MANAGER("nullRequestManager"), + PENDING_LIMIT("pendingLimit"), + CONTEXT_NEGOTIATION_PENDING("contextNegotiationPending"); + + private String value; + State(String value) { + this.value = value; + } + + @Override + public String toString(){ + return value; + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java index 4636d4a5c7171..66b4487540ab9 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java @@ -237,6 +237,9 @@ protected void initChannel(final Channel channel) throws Exception { @Override public void onTimeout(AcquireListener task) { task.originalPromise.setFailure(ACQUISITION_TIMEOUT); + RntbdChannelAcquisitionTimeline.startNewEvent( + task.originalPromise.getChannelAcquisitionTimeline(), + RntbdChannelAcquisitionEventType.PENDING_TIME_OUT); } }; @@ -421,6 +424,13 @@ public Future acquire() { System.nanoTime() + this.acquisitionTimeoutInNanos)); } + public Future acquire(RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) { + return this.acquire(new ChannelPromiseWithExpiryTime( + this.bootstrap.config().group().next().newPromise(), + System.nanoTime() + this.acquisitionTimeoutInNanos, + channelAcquisitionTimeline)); + } + /** * Acquire a {@link Channel channel} from the current {@link RntbdClientChannelPool pool}. *

@@ -588,20 +598,23 @@ public String toString() { * * @param promise the promise of a {@link Channel channel}. * - * @see #isChannelServiceable(Channel) + * @see #getChannelState(Channel) (Channel) * @see AcquireTimeoutTask */ private void acquireChannel(final ChannelPromiseWithExpiryTime promise) { this.ensureInEventLoop(); + reportIssueUnless(logger, promise != null, this, "Channel promise should not be null"); + RntbdChannelAcquisitionTimeline channelAcquisitionTimeline = promise.getChannelAcquisitionTimeline(); + if (this.isClosed()) { promise.setFailure(POOL_CLOSED_ON_ACQUIRE); return; } try { - Channel candidate = this.pollChannel(); + Channel candidate = this.pollChannel(channelAcquisitionTimeline); if (candidate != null) { @@ -624,6 +637,11 @@ private void acquireChannel(final ChannelPromiseWithExpiryTime promise) { // If our connection attempt fails, notifyChannelConnect will call us again final Promise anotherPromise = this.newChannelPromiseForToBeEstablishedChannel(promise); + + RntbdChannelAcquisitionTimeline.startNewEvent( + channelAcquisitionTimeline, + RntbdChannelAcquisitionEventType.ATTEMPT_TO_CREATE_NEW_CHANNEL); + final ChannelFuture future = this.bootstrap.clone().attr(POOL_KEY, this).connect(); if (future.isDone()) { @@ -653,9 +671,14 @@ private void acquireChannel(final ChannelPromiseWithExpiryTime promise) { // we accept the risk of reusing the channel even if more than maxPendingRequests are // queued - by picking the channel with the least number of outstanding requests we load // balance reasonably - if (pendingRequestCount < pendingRequestCountMin && isChannelServiceable(channel)) { - pendingRequestCountMin = pendingRequestCount; - candidate = channel; + if (pendingRequestCount < pendingRequestCountMin) { + RntbdChannelState channelState = this.getChannelState(channel); + RntbdChannelAcquisitionTimeline.addDetailsToLastEvent(channelAcquisitionTimeline, channelState); + + if (channelState.isOk()) { + pendingRequestCountMin = pendingRequestCount; + candidate = channel; + } } } } @@ -669,7 +692,10 @@ private void acquireChannel(final ChannelPromiseWithExpiryTime promise) { // we pick the first available channel to avoid the additional cost of load balancing // as long as the load is lower than the load factor threshold above. - if (isChannelServiceable(channel)) { + RntbdChannelState channelState = this.getChannelState(channel); + RntbdChannelAcquisitionTimeline.addDetailsToLastEvent(channelAcquisitionTimeline, channelState); + + if (channelState.isOk()) { if (this.availableChannels.remove(channel)) { this.doAcquireChannel(promise, channel); return; @@ -714,6 +740,10 @@ private void addTaskToPendingAcquisitionQueue(ChannelPromiseWithExpiryTime promi if (!this.pendingAcquisitions.offer(acquireTask)) { promise.setFailure(TOO_MANY_PENDING_ACQUISITIONS); + } else { + RntbdChannelAcquisitionTimeline.startNewEvent( + promise.getChannelAcquisitionTimeline(), + RntbdChannelAcquisitionEventType.ADD_TO_PENDING_QUEUE); } } } @@ -874,7 +904,7 @@ private void doClose() { for (; ; ) { // will remove from available channels - final Channel channel = this.pollChannel(); + final Channel channel = this.pollChannel(null); if (channel == null) { break; } @@ -908,22 +938,6 @@ private void ensureInEventLoop() { Thread.currentThread()); } - /** - * {@code true} if the given {@link Channel channel} is serviceable; {@code false} otherwise. - *

- * A serviceable channel is one that is open, has an {@link RntbdContext RNTBD context}, and has fewer than {@link - * #maxRequestsPerChannel} requests in its pipeline. An inactive channel will not have a {@link RntbdRequestManager - * request manager}. Hence, this method first checks that the channel's request manager is non-null. - * - * @param channel the channel to check. - * - * @return {@code true} if the given {@link Channel channel} is serviceable; {@code false} otherwise. - */ - private boolean isChannelServiceable(final Channel channel) { - final RntbdRequestManager manager = channel.pipeline().get(RntbdRequestManager.class); - return manager != null && manager.isServiceable(this.maxRequestsPerChannel) && channel.isOpen(); - } - /** * Creates a new {@link Channel channel} {@link Promise promise} that completes on a dedicated * {@link EventExecutor executor} to avoid spamming the {@link RntbdClientChannelPool pool}'s @@ -967,7 +981,7 @@ private ChannelPromiseWithExpiryTime createNewChannelPromise( listener.acquired(); anotherPromise.addListener(listener); - return new ChannelPromiseWithExpiryTime(anotherPromise, promise.getExpiryTimeInNanos()); + return new ChannelPromiseWithExpiryTime(anotherPromise, promise.getExpiryTimeInNanos(), promise.getChannelAcquisitionTimeline()); } private void newTimeout( @@ -1101,6 +1115,12 @@ private void notifyChannelConnect(final ChannelFuture future, final Promise + * A serviceable channel is one that is open, has an {@link RntbdContext RNTBD context}, and has fewer than {@link + * #maxRequestsPerChannel} requests in its pipeline. An inactive channel will not have a {@link RntbdRequestManager + * request manager}. Hence, this method first checks that the channel's request manager is non-null. + * + * @param channel the channel to check. + * + * @return {@link RntbdChannelState}. + */ + private RntbdChannelState getChannelState(Channel channel) { + checkNotNull(channel, "Channel cannot be null"); + + final RntbdRequestManager manager = channel.pipeline().get(RntbdRequestManager.class); + if (manager == null) { + return RntbdChannelState.NULL_REQUEST_MANAGER; + } + if (!channel.isOpen()) { + return RntbdChannelState.CLOSED; + } + + return manager.getChannelState(this.maxPendingAcquisitions); + } + /** * Poll a {@link Channel} out of internal storage to reuse it *

@@ -1163,13 +1208,21 @@ private boolean offerChannel(final Channel channel) { * this method serially on a single-threaded EventExecutor. As a result this method need not (and should not) be * synchronized. * + * + * @param channelAcquisitionTimeline the {@link RntbdChannelAcquisitionTimeline}. * @return a value of {@code null}, if no {@link Channel} is ready to be reused * * @see #acquire(Promise) */ - private Channel pollChannel() { + private Channel pollChannel(RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) { ensureInEventLoop(); + RntbdPollChannelEvent event = + RntbdChannelAcquisitionTimeline.startNewPollEvent( + channelAcquisitionTimeline, + this.availableChannels.size(), + this.acquiredChannels.size()); + final Channel first = this.availableChannels.pollFirst(); if (first == null) { @@ -1182,7 +1235,10 @@ private Channel pollChannel() { // Only return channels as servicable here if less than maxPendingRequests // are queued on them - if (this.isChannelServiceable(first)) { + RntbdChannelState channelState = this.getChannelState(first); + RntbdChannelAcquisitionEvent.addDetail(event, channelState); + + if (channelState.isOk()) { return first; } @@ -1193,9 +1249,12 @@ private Channel pollChannel() { if (next.isActive()) { - // Only return channels as servicable here if less than maxPendingRequests + // Only return channels as serviceable here if less than maxPendingRequests // are queued on them - if (this.isChannelServiceable(next)) { + RntbdChannelState state = this.getChannelState(next); + RntbdChannelAcquisitionEvent.addDetail(event, state); + + if (state.isOk()) { return next; } this.availableChannels.offer(next); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java index 17645ce6a08ee..53e5de298ef09 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java @@ -225,6 +225,9 @@ public LogLevel wireLogLevel() { return this.wireLogLevel; } + @JsonProperty + public boolean isChannelAcquisitionContextEnabled() { return this.options.isChannelAcquisitionContextEnabled(); } + @Override public String toString() { return RntbdObjectMapper.toString(this); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdPollChannelEvent.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdPollChannelEvent.java new file mode 100644 index 0000000000000..c2dc5b945c340 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdPollChannelEvent.java @@ -0,0 +1,64 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.directconnectivity.rntbd; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; + +@JsonSerialize(using = RntbdPollChannelEvent.RntbdPollChannelEventJsonSerializer.class) +public class RntbdPollChannelEvent extends RntbdChannelAcquisitionEvent { + private final int availableChannels; + private final int acquiredChannels; + private final List details; + + public RntbdPollChannelEvent(int availableChannels, int acquiredChannels, Instant createdTime) { + + super(RntbdChannelAcquisitionEventType.ATTEMPT_TO_POLL_CHANNEL, createdTime); + this.availableChannels = availableChannels; + this.acquiredChannels = acquiredChannels; + this.details = new ArrayList<>(); + } + + @Override + public void addDetail(Object detail) { + this.details.add(detail); + } + + public static class RntbdPollChannelEventJsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + @Override + public void serialize(RntbdPollChannelEvent event, + JsonGenerator writer, + SerializerProvider serializerProvider) throws IOException { + writer.writeStartObject(); + + writer.writeStringField(event.getEventType().toString(), event.getCreatedTime().toString()); + if (event.availableChannels > 0) { + writer.writeNumberField("availableChannels", event.availableChannels); + } + + if (event.acquiredChannels > 0) { + writer.writeNumberField("acquiredChannels", event.acquiredChannels); + } + + if (event.getCompleteTime() != null) { + writer.writeNumberField( + "durationInMicroSec", + Duration.between(event.getCompleteTime(), event.getCompleteTime()).toNanos()/1000L); + } + + if (event.details != null && event.details.size() > 0) { + writer.writeObjectField("details", event.details); + } + + writer.writeEndObject(); + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java index 98acb16092277..4f4c3165a18a7 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java @@ -568,10 +568,18 @@ boolean hasRntbdContext() { return this.contextFuture.getNow(null) != null; } - boolean isServiceable(final int demand) { + RntbdChannelState getChannelState(final int demand) { reportIssueUnless(this.hasRequestedRntbdContext(), this, "Direct TCP context request was not issued"); + final int limit = this.hasRntbdContext() ? this.pendingRequestLimit : Math.min(this.pendingRequestLimit, demand); - return this.pendingRequests.size() < limit; + if (this.pendingRequests.size() < limit) { + return RntbdChannelState.ok(this.pendingRequests.size()); + } + if (this.hasRntbdContext()) { + return RntbdChannelState.pendingLimit(this.pendingRequests.size()); + } else { + return RntbdChannelState.contextNegotiationPending((this.pendingRequests.size())); + } } void pendWrite(final ByteBuf out, final ChannelPromise promise) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java index 350fae5d10172..813f88726a01f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java @@ -65,6 +65,8 @@ public abstract class RntbdRequestRecord extends CompletableFuture connectedChannel = this.channelPool.acquire(); + final Future connectedChannel = this.channelPool.acquire(requestRecord.getChannelAcquisitionTimeline()); logger.debug("\n [{}]\n {}\n WRITE WHEN CONNECTED {}", this, requestArgs, connectedChannel); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosContainerOpenConnectionsAndInitCachesTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosContainerOpenConnectionsAndInitCachesTest.java index a5abec5425a68..6951e4afb4a0c 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosContainerOpenConnectionsAndInitCachesTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosContainerOpenConnectionsAndInitCachesTest.java @@ -163,6 +163,7 @@ public void loadCachesAndOpenConnectionsToServiceAsyncContainer() throws Illegal @Test(groups = {"simple"}) public void loadCachesAndOpenConnectionsToServiceSyncContainer() throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException { + RntbdTransportClient rntbdTransportClient = (RntbdTransportClient) ReflectionUtils.getTransportClient(directCosmosClient); RntbdEndpoint.Provider provider = ReflectionUtils.getRntbdEndpointProvider(rntbdTransportClient); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java index 6c1eeb9a3ad9c..2cf268cbaa7f5 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java @@ -239,6 +239,7 @@ public void directDiagnostics() { assertThat(diagnostics).contains("\"serializationType\":\"PARTITION_KEY_FETCH_SERIALIZATION\""); assertThat(diagnostics).contains("\"userAgent\":\"" + Utils.getUserAgent() + "\""); assertThat(diagnostics).contains("\"backendLatencyInMs\""); + assertThat(diagnostics).contains("\"transportRequestChannelAcquisitionContext\""); assertThat(createResponse.getDiagnostics().getRegionsContacted()).isNotEmpty(); assertThat(createResponse.getDiagnostics().getDuration()).isNotNull(); validateTransportRequestTimelineDirect(diagnostics);