Skip to content

Commit

Permalink
Bump from reactor 2.3.5.RELEASE to 2.7.8 (#830)
Browse files Browse the repository at this point in the history
* Bump from reactor 2.3.5.RELEASE to 2.7.8

Signed-off-by: Sergio <[email protected]>

* Simplification

Signed-off-by: Sergio <[email protected]>

---------

Signed-off-by: Sergio <[email protected]>
  • Loading branch information
champel authored Mar 22, 2023
1 parent 7d78d18 commit 7649ae4
Show file tree
Hide file tree
Showing 19 changed files with 114 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public static OpenTelemetry createOpenTelemetry() {
* Converts current OpenTelemetry's context into Reactor's context.
* @return Reactor's context.
*/
public static reactor.util.context.Context getReactorContext() {
public static reactor.util.context.ContextView getReactorContext() {
return getReactorContext(Context.current());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static void main(String[] args) throws Exception {
System.out.println("Going to publish message : " + message);
}
BulkPublishResponse<?> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages)
.subscriberContext(getReactorContext()).block();
.contextWrite(getReactorContext()).block();
System.out.println("Published the set of messages in a single call to Dapr");
if (res != null) {
if (res.getFailedEntries().size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static void main(String[] args) throws Exception {
client.publishEvent(
PUBSUB_NAME,
TOPIC_NAME,
message).subscriberContext(getReactorContext()).block();
message).contextWrite(getReactorContext()).block();
System.out.println("Published message: " + message);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static void main(String[] args) throws Exception {
InvokeMethodRequest sleepRequest = new InvokeMethodRequest(SERVICE_APP_ID, "proxy_sleep")
.setHttpExtension(HttpExtension.POST);
return client.invokeMethod(sleepRequest, TypeRef.get(Void.class));
}).subscriberContext(getReactorContext()).block();
}).contextWrite(getReactorContext()).block();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public Mono<byte[]> echo(
InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "echo")
.setBody(body)
.setHttpExtension(HttpExtension.POST);
return client.invokeMethod(request, TypeRef.get(byte[].class)).subscriberContext(getReactorContext(context));
return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context));
}

/**
Expand All @@ -71,7 +71,7 @@ public Mono<byte[]> echo(
public Mono<Void> sleep(@RequestAttribute(name = "opentelemetry-context") Context context) {
InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "sleep")
.setHttpExtension(HttpExtension.POST);
return client.invokeMethod(request, TypeRef.get(byte[].class)).subscriberContext(getReactorContext(context)).then();
return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context)).then();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.grpc.stub.StreamObserver;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
Expand Down Expand Up @@ -65,7 +65,7 @@ public Mono<byte[]> invoke(String actorType, String actorId, String methodName,
.setMethod(methodName)
.setData(jsonPayload == null ? ByteString.EMPTY : ByteString.copyFrom(jsonPayload))
.build();
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<DaprProtos.InvokeActorResponse>createMono(
it -> intercept(context, client).invokeActor(req, it)
)
Expand Down Expand Up @@ -109,7 +109,7 @@ public void start(final Listener<RespT> responseListener, final Metadata metadat
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(Context context, DaprGrpc.DaprStub client) {
private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) {
return GrpcWrapper.intercept(context, client);
}

Expand Down
2 changes: 1 addition & 1 deletion sdk-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.11.RELEASE</version>
<version>3.5.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void testInvokeTimeout() throws Exception {
.block(Duration.ofMillis(10))).getMessage();
long delay = System.currentTimeMillis() - started;
assertTrue(delay <= 500); // 500 ms is a reasonable delay if the request timed out.
assertEquals("Timeout on blocking read for 10 MILLISECONDS", message);
assertEquals("Timeout on blocking read for 10000000 NANOSECONDS", message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void testInvokeTimeout() throws Exception {
}).getMessage();
long delay = System.currentTimeMillis() - started;
assertTrue(delay <= 200); // 200 ms is a reasonable delay if the request timed out.
assertEquals("Timeout on blocking read for 10 MILLISECONDS", message);
assertEquals("Timeout on blocking read for 10000000 NANOSECONDS", message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void testInvoke() throws Exception {
try (Scope scope = span.makeCurrent()) {
SleepRequest req = SleepRequest.newBuilder().setSeconds(1).build();
client.invokeMethod(daprRun.getAppName(), "sleepOverGRPC", req.toByteArray(), HttpExtension.POST)
.subscriberContext(getReactorContext())
.contextWrite(getReactorContext())
.block();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void testInvoke() throws Exception {
try (DaprClient client = new DaprClientBuilder().build()) {
try (Scope scope = span.makeCurrent()) {
client.invokeMethod(daprRun.getAppName(), "sleep", 1, HttpExtension.POST)
.subscriberContext(getReactorContext())
.contextWrite(getReactorContext())
.block();
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.11.RELEASE</version>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
Expand Down
32 changes: 16 additions & 16 deletions sdk/src/main/java/io/dapr/client/DaprClientGrpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -181,7 +181,7 @@ public Mono<Void> publishEvent(PublishEventRequest request) {
envelopeBuilder.putAllMetadata(metadata);
}

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context ->
this.<Empty>createMono(
it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it)
Expand Down Expand Up @@ -254,7 +254,7 @@ public <T> Mono<BulkPublishResponse<T>> publishEvents(BulkPublishRequest<T> requ
for (BulkPublishEntry<T> entry: request.getEntries()) {
entryMap.put(entry.getEntryId(), entry);
}
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context ->
this.<DaprProtos.BulkPublishResponse>createMono(
it -> intercept(context, asyncStub).bulkPublishEventAlpha1(envelopeBuilder.build(), it)
Expand Down Expand Up @@ -298,7 +298,7 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
// gRPC to gRPC does not handle metadata in Dapr runtime proto.
// gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<CommonProtos.InvokeResponse>createMono(
it -> intercept(context, asyncStub).invokeService(envelope, it)
)
Expand Down Expand Up @@ -345,7 +345,7 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)
}
DaprProtos.InvokeBindingRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
)
Expand Down Expand Up @@ -392,7 +392,7 @@ public <T> Mono<State<T>> getState(GetStateRequest request, TypeRef<T> type) {

DaprProtos.GetStateRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context ->
this.<DaprProtos.GetStateResponse>createMono(
it -> intercept(context, asyncStub).getState(envelope, it)
Expand Down Expand Up @@ -441,7 +441,7 @@ public <T> Mono<List<State<T>>> getBulkState(GetBulkStateRequest request, TypeRe

DaprProtos.GetBulkStateRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<DaprProtos.GetBulkStateResponse>createMono(it -> intercept(context, asyncStub)
.getBulkState(envelope, it)
)
Expand Down Expand Up @@ -525,7 +525,7 @@ public Mono<Void> executeStateTransaction(ExecuteStateTransactionRequest request
}
DaprProtos.ExecuteStateTransactionRequest req = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).executeStateTransaction(req, it))
).then();
} catch (Exception e) {
Expand All @@ -551,7 +551,7 @@ public Mono<Void> saveBulkState(SaveStateRequest request) {
}
DaprProtos.SaveStateRequest req = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).saveState(req, it))
).then();
} catch (Exception ex) {
Expand Down Expand Up @@ -635,7 +635,7 @@ public Mono<Void> deleteState(DeleteStateRequest request) {

DaprProtos.DeleteStateRequest req = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).deleteState(req, it))
).then();
} catch (Exception ex) {
Expand Down Expand Up @@ -713,7 +713,7 @@ public Mono<Map<String, String>> getSecret(GetSecretRequest request) {
}
DaprProtos.GetSecretRequest req = requestBuilder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<DaprProtos.GetSecretResponse>createMono(it -> intercept(context, asyncStub).getSecret(req, it))
).map(DaprProtos.GetSecretResponse::getDataMap);
}
Expand All @@ -738,7 +738,7 @@ public Mono<Map<String, Map<String, String>>> getBulkSecret(GetBulkSecretRequest

DaprProtos.GetBulkSecretRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context ->
this.<DaprProtos.GetBulkSecretResponse>createMono(
it -> intercept(context, asyncStub).getBulkSecret(envelope, it)
Expand Down Expand Up @@ -791,7 +791,7 @@ public <T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, Typ

DaprProtos.QueryStateRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<DaprProtos.QueryStateResponse>createMono(
it -> intercept(context, asyncStub).queryStateAlpha1(envelope, it)
)
Expand Down Expand Up @@ -855,7 +855,7 @@ public void close() throws Exception {
*/
@Override
public Mono<Void> shutdown() {
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<Empty>createMono(
it -> intercept(context, asyncStub).shutdown(Empty.getDefaultInstance(), it))
).then();
Expand Down Expand Up @@ -889,7 +889,7 @@ public Mono<Map<String, ConfigurationItem>> getConfiguration(GetConfigurationReq
}

private Mono<Map<String, ConfigurationItem>> getConfigurationAlpha1(DaprProtos.GetConfigurationRequest envelope) {
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context ->
this.<DaprProtos.GetConfigurationResponse>createMono(
it -> intercept(context, asyncStub).getConfigurationAlpha1(envelope, it)
Expand Down Expand Up @@ -1034,7 +1034,7 @@ public void start(final Listener<RespT> responseListener, final Metadata metadat
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(Context context, DaprGrpc.DaprStub client) {
private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) {
return GrpcWrapper.intercept(context, client);
}

Expand Down
Loading

0 comments on commit 7649ae4

Please sign in to comment.