Skip to content

Commit

Permalink
AddingChannelAcquisitionConext (#23464)
Browse files Browse the repository at this point in the history
* add channel acquisition context

Co-authored-by: annie-mac <[email protected]>
Co-authored-by: Kushagra Thapar <[email protected]>
  • Loading branch information
3 people authored Aug 11, 2021
1 parent 4e3ea3e commit bc2bdc4
Show file tree
Hide file tree
Showing 20 changed files with 489 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1235,6 +1235,12 @@
<Bug pattern="SE_BAD_FIELD"/>
</Match>

<Match>
<Class name="com.azure.cosmos.CosmosException"/>
<Field name="channelAcquisitionTimeline"/>
<Bug pattern="SE_BAD_FIELD"/>
</Match>

<!-- Bug: https://github.com/Azure/azure-sdk-for-java/issues/9088 -->
<Match>
<Class name="com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdContextException"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.StoreResult;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpointStatistics;
import com.azure.cosmos.implementation.patch.PatchOperation;
import com.azure.cosmos.implementation.query.QueryInfo;
Expand Down Expand Up @@ -216,6 +217,17 @@ public static <E extends CosmosException> RequestTimeline getRequestTimeline(E e
return e.getRequestTimeline();
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <E extends CosmosException> E setChannelAcquisitionTimeline(E e, RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) {
e.setChannelAcquisitionTimeline(channelAcquisitionTimeline);
return e;
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <E extends CosmosException> RntbdChannelAcquisitionTimeline getChannelAcqusitionTimeline(E e) {
return e.getChannelAcquisitionTimeline();
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <E extends CosmosException> E setChannelTaskQueueSize(E e, int value) {
e.setRntbdChannelTaskQueueSize(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -444,6 +445,7 @@ public <T> CosmosPagedFlux<T> queryItems(String query, Class<T> classType) {
*/
@Beta(value = Beta.SinceVersion.V4_14_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public Mono<Void> openConnectionsAndInitCaches() {

if(isInitialized.compareAndSet(false, true)) {
return this.getFeedRanges().flatMap(feedRanges -> {
List<Flux<FeedResponse<ObjectNode>>> fluxList = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpointStatistics;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -51,6 +52,7 @@ public class CosmosException extends AzureException {

private CosmosDiagnostics cosmosDiagnostics;
private RequestTimeline requestTimeline;
private RntbdChannelAcquisitionTimeline channelAcquisitionTimeline;
private CosmosError cosmosError;
private int rntbdChannelTaskQueueSize;

Expand Down Expand Up @@ -385,6 +387,14 @@ void setRequestTimeline(RequestTimeline requestTimeline) {
this.requestTimeline = requestTimeline;
}

RntbdChannelAcquisitionTimeline getChannelAcquisitionTimeline() {
return this.channelAcquisitionTimeline;
}

void setChannelAcquisitionTimeline(RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) {
this.channelAcquisitionTimeline = channelAcquisitionTimeline;
}

void setResourceAddress(String resourceAddress) {
this.resourceAddress = resourceAddress;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ public Mono<StoreResponse> invokeStoreAsync(final Uri addressUri, final RxDocume
response.setRequestPayloadLength(request.getContentLength());
response.setRntbdChannelTaskQueueSize(record.channelTaskQueueLength());
response.setRntbdPendingRequestSize(record.pendingRequestQueueSize());
response.setChannelAcquisitionTimeline(record.getChannelAcquisitionTimeline());
}

})).onErrorMap(throwable -> {
Expand Down Expand Up @@ -262,6 +263,7 @@ public Mono<StoreResponse> invokeStoreAsync(final Uri addressUri, final RxDocume
BridgeInternal.setRntbdPendingRequestQueueSize(cosmosException, record.pendingRequestQueueSize());
BridgeInternal.setChannelTaskQueueSize(cosmosException, record.channelTaskQueueLength());
BridgeInternal.setSendingRequestStarted(cosmosException, record.hasSendingRequestStarted());
BridgeInternal.setChannelAcquisitionTimeline(cosmosException, record.getChannelAcquisitionTimeline());

return cosmosException;
});
Expand Down Expand Up @@ -417,6 +419,9 @@ public static final class Options {
@JsonIgnore()
private final UserAgentContainer userAgent;

@JsonProperty()
private final boolean channelAcquisitionContextEnabled;

// endregion

// region Constructors
Expand Down Expand Up @@ -445,6 +450,7 @@ private Options(final Builder builder) {
this.shutdownTimeout = builder.shutdownTimeout;
this.threadCount = builder.threadCount;
this.userAgent = builder.userAgent;
this.channelAcquisitionContextEnabled = builder.channelAcquisitionContextEnabled;

this.connectTimeout = builder.connectTimeout == null
? builder.requestTimeout
Expand Down Expand Up @@ -472,6 +478,7 @@ private Options(final ConnectionPolicy connectionPolicy) {
this.shutdownTimeout = Duration.ofSeconds(15L);
this.threadCount = 2 * Runtime.getRuntime().availableProcessors();
this.userAgent = new UserAgentContainer();
this.channelAcquisitionContextEnabled = true;
}

// endregion
Expand Down Expand Up @@ -554,6 +561,8 @@ public UserAgentContainer userAgent() {
return this.userAgent;
}

public boolean isChannelAcquisitionContextEnabled() { return this.channelAcquisitionContextEnabled; }

// endregion

// region Methods
Expand Down Expand Up @@ -695,6 +704,7 @@ public static class Builder {
private Duration shutdownTimeout;
private int threadCount;
private UserAgentContainer userAgent;
private boolean channelAcquisitionContextEnabled;

// endregion

Expand Down Expand Up @@ -723,6 +733,7 @@ public Builder(ConnectionPolicy connectionPolicy) {
this.shutdownTimeout = DEFAULT_OPTIONS.shutdownTimeout;
this.threadCount = DEFAULT_OPTIONS.threadCount;
this.userAgent = DEFAULT_OPTIONS.userAgent;
this.channelAcquisitionContextEnabled = DEFAULT_OPTIONS.channelAcquisitionContextEnabled;
}

// endregion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.RequestTimeline;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpointStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -28,6 +29,7 @@ public class StoreResponse {
private int pendingRequestQueueSize;
private int requestPayloadLength;
private RequestTimeline requestTimeline;
private RntbdChannelAcquisitionTimeline channelAcquisitionTimeline;
private int rntbdChannelTaskQueueSize;
private RntbdEndpointStatistics rntbdEndpointStatistics;
private int rntbdRequestLength;
Expand Down Expand Up @@ -170,6 +172,14 @@ RequestTimeline getRequestTimeline() {
return this.requestTimeline;
}

void setChannelAcquisitionTimeline(RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) {
this.channelAcquisitionTimeline = channelAcquisitionTimeline;
}

RntbdChannelAcquisitionTimeline getChannelAcquisitionTimeline() {
return this.channelAcquisitionTimeline;
}

void setEndpointStatistics(RntbdEndpointStatistics rntbdEndpointStatistics) {
this.rntbdEndpointStatistics = rntbdEndpointStatistics;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ public void serialize(StoreResult storeResult,
jsonGenerator.writeObjectField("transportRequestTimeline", storeResult.storeResponse != null ?
storeResult.storeResponse.getRequestTimeline() :
storeResult.exception != null ? BridgeInternal.getRequestTimeline(storeResult.exception) : null);

this.writeNonNullObjectField(
jsonGenerator,
"transportRequestChannelAcquisitionContext",
storeResult.storeResponse != null ? storeResult.storeResponse.getChannelAcquisitionTimeline() :
storeResult.exception != null? BridgeInternal.getChannelAcqusitionTimeline(storeResult.exception) : null);

jsonGenerator.writeObjectField("rntbdRequestLengthInBytes", storeResult.storeResponse != null ?
storeResult.storeResponse.getRntbdRequestLength() : BridgeInternal.getRntbdRequestLength(storeResult.exception));
jsonGenerator.writeObjectField("rntbdResponseLengthInBytes", storeResult.storeResponse != null ?
Expand All @@ -225,5 +232,13 @@ public void serialize(StoreResult storeResult,

jsonGenerator.writeEndObject();
}

private void writeNonNullObjectField(JsonGenerator jsonGenerator, String fieldName, Object object) throws IOException {
if (object == null) {
return;
}

jsonGenerator.writeObjectField(fieldName, object);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,22 @@
class ChannelPromiseWithExpiryTime implements Promise<Channel> {
private final Promise<Channel> channelPromise;
private final long expiryTimeInNanos;
private final RntbdChannelAcquisitionTimeline channelAcquisitionTimeline;

public ChannelPromiseWithExpiryTime(Promise<Channel> channelPromise, long expiryTimeInNanos) {
this(channelPromise, expiryTimeInNanos, null);
}

public ChannelPromiseWithExpiryTime(
Promise<Channel> channelPromise,
long expiryTimeInNanos,
RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) {
checkNotNull(channelPromise, "channelPromise must not be null");
checkNotNull(expiryTimeInNanos, "expiryTimeInNanos must not be null");

this.channelPromise = channelPromise;
this.expiryTimeInNanos = expiryTimeInNanos;
this.channelAcquisitionTimeline = channelAcquisitionTimeline;
}

public long getExpiryTimeInNanos() {
Expand Down Expand Up @@ -172,4 +181,8 @@ public Promise<Channel> sync() throws InterruptedException {
public Promise<Channel> syncUninterruptibly() {
return this.channelPromise.syncUninterruptibly();
}
}

public RntbdChannelAcquisitionTimeline getChannelAcquisitionTimeline() {
return this.channelAcquisitionTimeline;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.directconnectivity.rntbd;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;

@JsonSerialize(using = RntbdChannelAcquisitionEvent.RntbdChannelAcquisitionEventJsonSerializer.class)
public class RntbdChannelAcquisitionEvent {
private final Instant createdTime;
private final RntbdChannelAcquisitionEventType eventType;
private volatile Instant completeTime;

public RntbdChannelAcquisitionEvent(RntbdChannelAcquisitionEventType eventType, Instant createdTime) {
this.eventType = eventType;
this.createdTime = createdTime;
}

public Instant getCreatedTime() {
return createdTime;
}

public RntbdChannelAcquisitionEventType getEventType() {
return eventType;
}

public Instant getCompleteTime() {
return completeTime;
}

public void setCompleteTime(Instant completeTime) {
this.completeTime = completeTime;
}

public void complete(Instant completeTime) {
this.completeTime = completeTime;
}

public void addDetail(Object detail) {}

public static void addDetail(RntbdChannelAcquisitionEvent event, Object detail) {
if (event != null) {
event.addDetail(detail);
}
}

public static class RntbdChannelAcquisitionEventJsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer<RntbdChannelAcquisitionEvent> {
@Override
public void serialize(RntbdChannelAcquisitionEvent event,
JsonGenerator writer,
SerializerProvider serializerProvider) throws IOException {
writer.writeStartObject();

writer.writeStringField(event.eventType.toString(), event.createdTime.toString());
if (event.completeTime != null) {
writer.writeNumberField("durationInMicroSec",Duration.between(event.createdTime, event.completeTime).toNanos()/1000L);
}

writer.writeEndObject();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.directconnectivity.rntbd;

public enum RntbdChannelAcquisitionEventType {
ATTEMPT_TO_POLL_CHANNEL("poll"),
ADD_TO_PENDING_QUEUE("pending"),
PENDING_TIME_OUT("pendingTimeout"),
ATTEMPT_TO_CREATE_NEW_CHANNEL("startNew"),
ATTEMPT_TO_CREATE_NEW_CHANNEL_COMPLETE("completeNew");

private String name;
RntbdChannelAcquisitionEventType(String name) {
this.name = name;
}

@Override
public String toString(){
return name;
}
}
Loading

0 comments on commit bc2bdc4

Please sign in to comment.