diff --git a/examples/src/main/java/io/dapr/examples/OpenTelemetryConfig.java b/examples/src/main/java/io/dapr/examples/OpenTelemetryConfig.java index e273ae55c..84909e291 100644 --- a/examples/src/main/java/io/dapr/examples/OpenTelemetryConfig.java +++ b/examples/src/main/java/io/dapr/examples/OpenTelemetryConfig.java @@ -89,7 +89,7 @@ public static OpenTelemetry createOpenTelemetry() { * Converts current OpenTelemetry's context into Reactor's context. * @return Reactor's context. */ - public static reactor.util.context.Context getReactorContext() { + public static reactor.util.context.ContextView getReactorContext() { return getReactorContext(Context.current()); } diff --git a/examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java b/examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java index e1c1306b0..a6552f9cd 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java @@ -71,7 +71,7 @@ public static void main(String[] args) throws Exception { System.out.println("Going to publish message : " + message); } BulkPublishResponse res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages) - .subscriberContext(getReactorContext()).block(); + .contextWrite(getReactorContext()).block(); System.out.println("Published the set of messages in a single call to Dapr"); if (res != null) { if (res.getFailedEntries().size() > 0) { diff --git a/examples/src/main/java/io/dapr/examples/pubsub/PublisherWithTracing.java b/examples/src/main/java/io/dapr/examples/pubsub/PublisherWithTracing.java index fe9bf93e5..cc8b3d1ac 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/PublisherWithTracing.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/PublisherWithTracing.java @@ -63,7 +63,7 @@ public static void main(String[] args) throws Exception { client.publishEvent( PUBSUB_NAME, TOPIC_NAME, - message).subscriberContext(getReactorContext()).block(); + message).contextWrite(getReactorContext()).block(); System.out.println("Published message: " + message); try { diff --git a/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java b/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java index 9078fb0d5..3ddfa1d38 100644 --- a/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java +++ b/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java @@ -67,7 +67,7 @@ public static void main(String[] args) throws Exception { InvokeMethodRequest sleepRequest = new InvokeMethodRequest(SERVICE_APP_ID, "proxy_sleep") .setHttpExtension(HttpExtension.POST); return client.invokeMethod(sleepRequest, TypeRef.get(Void.class)); - }).subscriberContext(getReactorContext()).block(); + }).contextWrite(getReactorContext()).block(); } } } diff --git a/examples/src/main/java/io/dapr/examples/tracing/TracingDemoMiddleServiceController.java b/examples/src/main/java/io/dapr/examples/tracing/TracingDemoMiddleServiceController.java index 9a3111e85..323d4c023 100644 --- a/examples/src/main/java/io/dapr/examples/tracing/TracingDemoMiddleServiceController.java +++ b/examples/src/main/java/io/dapr/examples/tracing/TracingDemoMiddleServiceController.java @@ -58,7 +58,7 @@ public Mono echo( InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "echo") .setBody(body) .setHttpExtension(HttpExtension.POST); - return client.invokeMethod(request, TypeRef.get(byte[].class)).subscriberContext(getReactorContext(context)); + return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context)); } /** @@ -71,7 +71,7 @@ public Mono echo( public Mono sleep(@RequestAttribute(name = "opentelemetry-context") Context context) { InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "sleep") .setHttpExtension(HttpExtension.POST); - return client.invokeMethod(request, TypeRef.get(byte[].class)).subscriberContext(getReactorContext(context)).then(); + return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context)).then(); } } diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java b/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java index a5b963a8f..844d2ace3 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java @@ -29,7 +29,7 @@ import io.grpc.stub.StreamObserver; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; -import reactor.util.context.Context; +import reactor.util.context.ContextView; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; @@ -65,7 +65,7 @@ public Mono invoke(String actorType, String actorId, String methodName, .setMethod(methodName) .setData(jsonPayload == null ? ByteString.EMPTY : ByteString.copyFrom(jsonPayload)) .build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, client).invokeActor(req, it) ) @@ -109,7 +109,7 @@ public void start(final Listener responseListener, final Metadata metadat * @param client GRPC client for Dapr. * @return Client after adding interceptors. */ - private static DaprGrpc.DaprStub intercept(Context context, DaprGrpc.DaprStub client) { + private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) { return GrpcWrapper.intercept(context, client); } diff --git a/sdk-tests/pom.xml b/sdk-tests/pom.xml index 2546b97d9..699db1606 100644 --- a/sdk-tests/pom.xml +++ b/sdk-tests/pom.xml @@ -131,7 +131,7 @@ io.projectreactor reactor-core - 3.3.11.RELEASE + 3.5.0 test diff --git a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java index 73ac5ca0a..ef1a66c03 100644 --- a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java @@ -101,7 +101,7 @@ public void testInvokeTimeout() throws Exception { .block(Duration.ofMillis(10))).getMessage(); long delay = System.currentTimeMillis() - started; assertTrue(delay <= 500); // 500 ms is a reasonable delay if the request timed out. - assertEquals("Timeout on blocking read for 10 MILLISECONDS", message); + assertEquals("Timeout on blocking read for 10000000 NANOSECONDS", message); } } diff --git a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java index 39cb07db8..f18cf5504 100644 --- a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java @@ -118,7 +118,7 @@ public void testInvokeTimeout() throws Exception { }).getMessage(); long delay = System.currentTimeMillis() - started; assertTrue(delay <= 200); // 200 ms is a reasonable delay if the request timed out. - assertEquals("Timeout on blocking read for 10 MILLISECONDS", message); + assertEquals("Timeout on blocking read for 10000000 NANOSECONDS", message); } } diff --git a/sdk-tests/src/test/java/io/dapr/it/tracing/grpc/TracingIT.java b/sdk-tests/src/test/java/io/dapr/it/tracing/grpc/TracingIT.java index 7e604f357..facabb10e 100644 --- a/sdk-tests/src/test/java/io/dapr/it/tracing/grpc/TracingIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/tracing/grpc/TracingIT.java @@ -79,7 +79,7 @@ public void testInvoke() throws Exception { try (Scope scope = span.makeCurrent()) { SleepRequest req = SleepRequest.newBuilder().setSeconds(1).build(); client.invokeMethod(daprRun.getAppName(), "sleepOverGRPC", req.toByteArray(), HttpExtension.POST) - .subscriberContext(getReactorContext()) + .contextWrite(getReactorContext()) .block(); } } diff --git a/sdk-tests/src/test/java/io/dapr/it/tracing/http/TracingIT.java b/sdk-tests/src/test/java/io/dapr/it/tracing/http/TracingIT.java index a9301966f..0b13e4a27 100644 --- a/sdk-tests/src/test/java/io/dapr/it/tracing/http/TracingIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/tracing/http/TracingIT.java @@ -76,7 +76,7 @@ public void testInvoke() throws Exception { try (DaprClient client = new DaprClientBuilder().build()) { try (Scope scope = span.makeCurrent()) { client.invokeMethod(daprRun.getAppName(), "sleep", 1, HttpExtension.POST) - .subscriberContext(getReactorContext()) + .contextWrite(getReactorContext()) .block(); } } diff --git a/sdk/pom.xml b/sdk/pom.xml index 69a13deb5..3bb2b599a 100644 --- a/sdk/pom.xml +++ b/sdk/pom.xml @@ -44,7 +44,7 @@ io.projectreactor reactor-core - 3.3.11.RELEASE + 3.5.0 com.squareup.okhttp3 diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index cdc6c88b6..e038ea4c2 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -67,7 +67,7 @@ import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; -import reactor.util.context.Context; +import reactor.util.context.ContextView; import java.io.Closeable; import java.io.IOException; @@ -181,7 +181,7 @@ public Mono publishEvent(PublishEventRequest request) { envelopeBuilder.putAllMetadata(metadata); } - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it) @@ -254,7 +254,7 @@ public Mono> publishEvents(BulkPublishRequest requ for (BulkPublishEntry entry: request.getEntries()) { entryMap.put(entry.getEntryId(), entry); } - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, asyncStub).bulkPublishEventAlpha1(envelopeBuilder.build(), it) @@ -298,7 +298,7 @@ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef // gRPC to gRPC does not handle metadata in Dapr runtime proto. // gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342 - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, asyncStub).invokeService(envelope, it) ) @@ -345,7 +345,7 @@ public Mono invokeBinding(InvokeBindingRequest request, TypeRef type) } DaprProtos.InvokeBindingRequest envelope = builder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, asyncStub).invokeBinding(envelope, it) ) @@ -392,7 +392,7 @@ public Mono> getState(GetStateRequest request, TypeRef type) { DaprProtos.GetStateRequest envelope = builder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, asyncStub).getState(envelope, it) @@ -441,7 +441,7 @@ public Mono>> getBulkState(GetBulkStateRequest request, TypeRe DaprProtos.GetBulkStateRequest envelope = builder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono(it -> intercept(context, asyncStub) .getBulkState(envelope, it) ) @@ -525,7 +525,7 @@ public Mono executeStateTransaction(ExecuteStateTransactionRequest request } DaprProtos.ExecuteStateTransactionRequest req = builder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono(it -> intercept(context, asyncStub).executeStateTransaction(req, it)) ).then(); } catch (Exception e) { @@ -551,7 +551,7 @@ public Mono saveBulkState(SaveStateRequest request) { } DaprProtos.SaveStateRequest req = builder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono(it -> intercept(context, asyncStub).saveState(req, it)) ).then(); } catch (Exception ex) { @@ -635,7 +635,7 @@ public Mono deleteState(DeleteStateRequest request) { DaprProtos.DeleteStateRequest req = builder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono(it -> intercept(context, asyncStub).deleteState(req, it)) ).then(); } catch (Exception ex) { @@ -713,7 +713,7 @@ public Mono> getSecret(GetSecretRequest request) { } DaprProtos.GetSecretRequest req = requestBuilder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono(it -> intercept(context, asyncStub).getSecret(req, it)) ).map(DaprProtos.GetSecretResponse::getDataMap); } @@ -738,7 +738,7 @@ public Mono>> getBulkSecret(GetBulkSecretRequest DaprProtos.GetBulkSecretRequest envelope = builder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, asyncStub).getBulkSecret(envelope, it) @@ -791,7 +791,7 @@ public Mono> queryState(QueryStateRequest request, Typ DaprProtos.QueryStateRequest envelope = builder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, asyncStub).queryStateAlpha1(envelope, it) ) @@ -855,7 +855,7 @@ public void close() throws Exception { */ @Override public Mono shutdown() { - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, asyncStub).shutdown(Empty.getDefaultInstance(), it)) ).then(); @@ -889,7 +889,7 @@ public Mono> getConfiguration(GetConfigurationReq } private Mono> getConfigurationAlpha1(DaprProtos.GetConfigurationRequest envelope) { - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, asyncStub).getConfigurationAlpha1(envelope, it) @@ -1034,7 +1034,7 @@ public void start(final Listener responseListener, final Metadata metadat * @param client GRPC client for Dapr. * @return Client after adding interceptors. */ - private static DaprGrpc.DaprStub intercept(Context context, DaprGrpc.DaprStub client) { + private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) { return GrpcWrapper.intercept(context, client); } diff --git a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java index 2ec7bf780..286dbe8c1 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java @@ -177,7 +177,7 @@ public Mono publishEvent(PublishEventRequest request) { String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "publish", pubsubName, topic }; Map> queryArgs = metadataToQueryArgs(metadata); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client.invokeApi( DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, serializedEvent, headers, context ) @@ -237,7 +237,7 @@ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef } else { headers.put(Metadata.CONTENT_TYPE, objectSerializer.getContentType()); } - Mono response = Mono.subscriberContext().flatMap( + Mono response = Mono.deferContextual( context -> this.client.invokeApi(httpMethod, pathSegments.toArray(new String[0]), httpExtension.getQueryParams(), serializedRequestBody, headers, context) ); @@ -309,7 +309,7 @@ public Mono invokeBinding(InvokeBindingRequest request, TypeRef type) String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "bindings", name }; - Mono response = Mono.subscriberContext().flatMap( + Mono response = Mono.deferContextual( context -> this.client.invokeApi( httpMethod, pathSegments, null, payload, null, context) ); @@ -349,7 +349,7 @@ public Mono>> getBulkState(GetBulkStateRequest request, TypeRe String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, "bulk" }; Map> queryArgs = metadataToQueryArgs(metadata); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client .invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, requestBody, null, context) ).flatMap(s -> { @@ -394,7 +394,7 @@ public Mono> getState(GetStateRequest request, TypeRef type) { String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, key }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client .invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryParams, null, context) ).flatMap(s -> { @@ -452,7 +452,7 @@ public Mono executeStateTransaction(ExecuteStateTransactionRequest request String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, "transaction" }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client.invokeApi( DaprHttp.HttpMethods.POST.name(), pathSegments, null, serializedOperationBody, null, context ) @@ -500,7 +500,7 @@ public Mono saveBulkState(SaveStateRequest request) { String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client.invokeApi( DaprHttp.HttpMethods.POST.name(), pathSegments, null, serializedStateBody, null, context) ).then(); @@ -543,7 +543,7 @@ public Mono deleteState(DeleteStateRequest request) { String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, key }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client.invokeApi( DaprHttp.HttpMethods.DELETE.name(), pathSegments, queryParams, headers, context) ).then(); @@ -631,7 +631,7 @@ public Mono> getSecret(GetSecretRequest request) { Map> queryArgs = metadataToQueryArgs(metadata); String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "secrets", secretStoreName, key }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client .invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryArgs, (String) null, null, context) ).flatMap(response -> { @@ -667,7 +667,7 @@ public Mono>> getBulkSecret(GetBulkSecretRequest Map> queryArgs = metadataToQueryArgs(metadata); String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "secrets", secretStoreName, "bulk" }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client .invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryArgs, (String) null, null, context) ).flatMap(response -> { @@ -709,17 +709,17 @@ public Mono> queryState(QueryStateRequest request, Typ } else { throw new IllegalArgumentException("Both query and queryString fields are not set."); } - return Mono.subscriberContext().flatMap( - context -> this.client - .invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, - queryArgs, serializedRequest, null, context) - ).flatMap(response -> { - try { - return Mono.justOrEmpty(buildQueryStateResponse(response, type)); - } catch (Exception e) { - return DaprException.wrapMono(e); - } - }); + return Mono.deferContextual( + context -> this.client + .invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, + queryArgs, serializedRequest, null, context) + ).flatMap(response -> { + try { + return Mono.justOrEmpty(buildQueryStateResponse(response, type)); + } catch (Exception e) { + return DaprException.wrapMono(e); + } + }); } catch (Exception e) { return DaprException.wrapMono(e); } @@ -739,14 +739,14 @@ public void close() { @Override public Mono shutdown() { String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "shutdown" }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> client.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, null, null, context)) .then(); } private QueryStateResponse buildQueryStateResponse(DaprHttp.Response response, - TypeRef type) throws IOException { + TypeRef type) throws IOException { JsonNode root = INTERNAL_SERIALIZER.parseNode(response.getBody()); if (!root.has("results")) { return new QueryStateResponse<>(Collections.emptyList(), null); @@ -810,36 +810,36 @@ public Mono> getConfiguration(GetConfigurationReq queryParams.putAll(queryArgs); String[] pathSegments = new String[] {DaprHttp.ALPHA_1_API_VERSION, "configuration", configurationStoreName }; - return Mono.subscriberContext().flatMap( - context -> this.client - .invokeApi( - DaprHttp.HttpMethods.GET.name(), - pathSegments, queryParams, - (String) null, null, context) + return Mono.deferContextual( + context -> this.client + .invokeApi( + DaprHttp.HttpMethods.GET.name(), + pathSegments, queryParams, + (String) null, null, context) ).map( - response -> { - try { - Map m = INTERNAL_SERIALIZER.deserialize(response.getBody(), Map.class); - Set set = m.keySet(); - JsonNode root = INTERNAL_SERIALIZER.parseNode(response.getBody()); - Iterator itr = set.iterator(); - Map result = new HashMap<>(); - while (itr.hasNext()) { - String key = itr.next(); - String value = root.get(key).path("value").asText(); - String version = root.get(key).path("version").asText(); - result.put(key, new ConfigurationItem( - key, - value, - version, - new HashMap<>() - )); - } - return Collections.unmodifiableMap(result); - } catch (IOException e) { - throw new RuntimeException(e); - } + response -> { + try { + Map m = INTERNAL_SERIALIZER.deserialize(response.getBody(), Map.class); + Set set = m.keySet(); + JsonNode root = INTERNAL_SERIALIZER.parseNode(response.getBody()); + Iterator itr = set.iterator(); + Map result = new HashMap<>(); + while (itr.hasNext()) { + String key = itr.next(); + String value = root.get(key).path("value").asText(); + String version = root.get(key).path("version").asText(); + result.put(key, new ConfigurationItem( + key, + value, + version, + new HashMap<>() + )); } + return Collections.unmodifiableMap(result); + } catch (IOException e) { + throw new RuntimeException(e); + } + } ); } catch (Exception ex) { return DaprException.wrapMono(ex); @@ -871,12 +871,12 @@ public Flux subscribeConfiguration(SubscribeConf String[] pathSegments = new String[] { DaprHttp.ALPHA_1_API_VERSION, "configuration", configurationStoreName, "subscribe" }; - SubscribeConfigurationResponse res = Mono.subscriberContext().flatMap( - context -> this.client.invokeApi( - DaprHttp.HttpMethods.GET.name(), - pathSegments, queryParams, - (String) null, null, context - ) + SubscribeConfigurationResponse res = Mono.deferContextual( + context -> this.client.invokeApi( + DaprHttp.HttpMethods.GET.name(), + pathSegments, queryParams, + (String) null, null, context + ) ).map(response -> { try { JsonNode root = INTERNAL_SERIALIZER.parseNode(response.getBody()); @@ -913,7 +913,7 @@ public Mono unsubscribeConfiguration(Unsubscri String[] pathSegments = new String[] { DaprHttp.ALPHA_1_API_VERSION, "configuration", configStoreName, id, "unsubscribe" }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client .invokeApi( DaprHttp.HttpMethods.GET.name(), diff --git a/sdk/src/main/java/io/dapr/client/DaprHttp.java b/sdk/src/main/java/io/dapr/client/DaprHttp.java index 66b9c9a06..830bfad61 100644 --- a/sdk/src/main/java/io/dapr/client/DaprHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprHttp.java @@ -29,7 +29,7 @@ import okhttp3.ResponseBody; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Mono; -import reactor.util.context.Context; +import reactor.util.context.ContextView; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -183,7 +183,7 @@ public Mono invokeApi( String[] pathSegments, Map> urlParameters, Map headers, - Context context) { + ContextView context) { return this.invokeApi(method, pathSegments, urlParameters, (byte[]) null, headers, context); } @@ -204,7 +204,7 @@ public Mono invokeApi( Map> urlParameters, String content, Map headers, - Context context) { + ContextView context) { return this.invokeApi( method, pathSegments, urlParameters, content == null @@ -224,12 +224,12 @@ public Mono invokeApi( * @return Asynchronous response */ public Mono invokeApi( - String method, - String[] pathSegments, - Map> urlParameters, - byte[] content, - Map headers, - Context context) { + String method, + String[] pathSegments, + Map> urlParameters, + byte[] content, + Map headers, + ContextView context) { // fromCallable() is needed so the invocation does not happen early, causing a hot mono. return Mono.fromCallable(() -> doInvokeApi(method, pathSegments, urlParameters, content, headers, context)) .flatMap(f -> Mono.fromFuture(f)); @@ -256,10 +256,10 @@ public void close() { * @return CompletableFuture for Response. */ private CompletableFuture doInvokeApi(String method, - String[] pathSegments, - Map> urlParameters, - byte[] content, Map headers, - Context context) { + String[] pathSegments, + Map> urlParameters, + byte[] content, Map headers, + ContextView context) { final String requestId = UUID.randomUUID().toString(); RequestBody body; @@ -282,8 +282,8 @@ private CompletableFuture doInvokeApi(String method, Optional.ofNullable(urlParameters).orElse(Collections.emptyMap()).entrySet().stream() .forEach(urlParameter -> Optional.ofNullable(urlParameter.getValue()).orElse(Collections.emptyList()).stream() - .forEach(urlParameterValue -> - urlBuilder.addQueryParameter(urlParameter.getKey(), urlParameterValue))); + .forEach(urlParameterValue -> + urlBuilder.addQueryParameter(urlParameter.getKey(), urlParameterValue))); Request.Builder requestBuilder = new Request.Builder() .url(urlBuilder.build()) @@ -305,7 +305,6 @@ private CompletableFuture doInvokeApi(String method, if (daprApiToken != null) { requestBuilder.addHeader(Headers.DAPR_API_TOKEN, daprApiToken); } - requestBuilder.addHeader(Headers.DAPR_USER_AGENT, Version.getSdkVersion()); if (headers != null) { diff --git a/sdk/src/main/java/io/dapr/internal/opencensus/GrpcWrapper.java b/sdk/src/main/java/io/dapr/internal/opencensus/GrpcWrapper.java index b4940d853..61db83627 100644 --- a/sdk/src/main/java/io/dapr/internal/opencensus/GrpcWrapper.java +++ b/sdk/src/main/java/io/dapr/internal/opencensus/GrpcWrapper.java @@ -23,9 +23,9 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import reactor.util.context.Context; +import reactor.util.context.ContextView; import java.util.Map; -import java.util.Optional; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -61,7 +61,7 @@ private GrpcWrapper() { * @param client GRPC client for Dapr. * @return Client after adding interceptors. */ - public static DaprGrpc.DaprStub intercept(final Context context, DaprGrpc.DaprStub client) { + public static DaprGrpc.DaprStub intercept(final ContextView context, DaprGrpc.DaprStub client) { ClientInterceptor interceptor = new ClientInterceptor() { @Override public ClientCall interceptCall( diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java index e323d9b57..2b747a3bf 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java @@ -42,6 +42,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import reactor.util.context.ContextView; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -189,7 +190,7 @@ public void invokeServiceVoidWithTracingTest() { .setBody("request") .setHttpExtension(HttpExtension.NONE); Mono result = this.client.invokeMethod(req, TypeRef.get(Void.class)) - .subscriberContext(it -> it.putAll(contextCopy == null ? Context.empty() : contextCopy)); + .contextWrite(it -> it.putAll(contextCopy == null ? (ContextView) Context.empty() : contextCopy)); result.block(); } diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java index d1831fb23..d5df3de38 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java @@ -53,6 +53,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import reactor.util.context.ContextView; import static io.dapr.utils.TestUtils.assertThrowsDaprException; import static io.dapr.utils.TestUtils.findFreePort; @@ -422,7 +423,7 @@ public void invokeServiceWithContext() { .setBody("request") .setHttpExtension(HttpExtension.POST); Mono result = daprClientHttp.invokeMethod(req, TypeRef.get(Void.class)) - .subscriberContext(it -> it.putAll(context)); + .contextWrite(it -> it.putAll((ContextView) context)); result.block(); } diff --git a/sdk/src/test/java/io/dapr/client/DaprHttpStub.java b/sdk/src/test/java/io/dapr/client/DaprHttpStub.java index d83f517c5..f6a52b8c4 100644 --- a/sdk/src/test/java/io/dapr/client/DaprHttpStub.java +++ b/sdk/src/test/java/io/dapr/client/DaprHttpStub.java @@ -14,7 +14,7 @@ package io.dapr.client; import reactor.core.publisher.Mono; -import reactor.util.context.Context; +import reactor.util.context.ContextView; import java.util.List; import java.util.Map; @@ -45,7 +45,7 @@ public Mono invokeApi(String method, String[] pathSegments, Map> urlParameters, Map headers, - Context context) { + ContextView context) { return Mono.empty(); } @@ -58,7 +58,7 @@ public Mono invokeApi(String method, Map> urlParameters, String content, Map headers, - Context context) { + ContextView context) { return Mono.empty(); } @@ -71,7 +71,7 @@ public Mono invokeApi(String method, Map> urlParameters, byte[] content, Map headers, - Context context) { + ContextView context) { return Mono.empty(); }