Skip to content

Commit

Permalink
Merge pull request #38034 from cescoffier/vertx-4.5.1
Browse files Browse the repository at this point in the history
Update to Vertx 4.5.1
  • Loading branch information
gsmet authored Jan 5, 2024
2 parents e23f243 + e189b59 commit 76db2af
Show file tree
Hide file tree
Showing 21 changed files with 181 additions and 83 deletions.
12 changes: 6 additions & 6 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
<opentelemetry.version>1.32.0</opentelemetry.version>
<opentelemetry-alpha.version>1.32.0-alpha</opentelemetry-alpha.version>
<opentelemetry-semconv.version>1.21.0-alpha</opentelemetry-semconv.version> <!-- keep in sync with opentelemetry-java-instrumentation in the alpha bom-->
<quarkus-http.version>5.0.3.Final</quarkus-http.version>
<quarkus-http.version>5.1.0.Final</quarkus-http.version>
<micrometer.version>1.11.5</micrometer.version><!-- keep in sync with hdrhistogram -->
<hdrhistogram.version>2.1.12</hdrhistogram.version><!-- keep in sync with micrometer -->
<google-auth.version>0.22.0</google-auth.version>
Expand All @@ -61,9 +61,9 @@
<smallrye-context-propagation.version>2.1.0</smallrye-context-propagation.version>
<smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
<smallrye-reactive-types-converter.version>3.0.1</smallrye-reactive-types-converter.version>
<smallrye-mutiny-vertx-binding.version>3.7.2</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>4.13.0</smallrye-reactive-messaging.version>
<smallrye-stork.version>2.4.0</smallrye-stork.version>
<smallrye-mutiny-vertx-binding.version>3.8.0</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>4.14.0</smallrye-reactive-messaging.version>
<smallrye-stork.version>2.5.0</smallrye-stork.version>
<jakarta.activation.version>2.1.2</jakarta.activation.version>
<jakarta.annotation-api.version>2.1.1</jakarta.annotation-api.version>
<jakarta.authentication-api>3.0.0</jakarta.authentication-api>
Expand Down Expand Up @@ -121,7 +121,7 @@
<wildfly-client-config.version>1.0.1.Final</wildfly-client-config.version>
<wildfly-elytron.version>2.2.2.Final</wildfly-elytron.version>
<jboss-threads.version>3.5.1.Final</jboss-threads.version>
<vertx.version>4.4.6</vertx.version>
<vertx.version>4.5.1</vertx.version>
<httpclient.version>4.5.14</httpclient.version>
<httpcore.version>4.4.16</httpcore.version>
<httpasync.version>4.1.5</httpasync.version>
Expand All @@ -144,7 +144,7 @@
<infinispan.version>14.0.21.Final</infinispan.version>
<infinispan.protostream.version>4.6.5.Final</infinispan.protostream.version>
<caffeine.version>3.1.5</caffeine.version>
<netty.version>4.1.100.Final</netty.version>
<netty.version>4.1.103.Final</netty.version>
<brotli4j.version>1.12.0</brotli4j.version>
<reactive-streams.version>1.0.4</reactive-streams.version>
<jboss-logging.version>3.5.3.Final</jboss-logging.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.quarkus.funqy.runtime.bindings.http.FunqyHttpBindingRecorder;
import io.quarkus.jackson.runtime.ObjectMapperProducer;
import io.quarkus.vertx.core.deployment.CoreVertxBuildItem;
import io.quarkus.vertx.http.deployment.RequireBodyHandlerBuildItem;
import io.quarkus.vertx.http.deployment.RouteBuildItem;
import io.quarkus.vertx.http.runtime.HttpBuildTimeConfig;
import io.vertx.core.Handler;
Expand All @@ -40,6 +41,15 @@ public void markObjectMapper(BuildProducer<UnremovableBeanBuildItem> unremovable
new UnremovableBeanBuildItem.BeanClassNameExclusion(ObjectMapperProducer.class.getName())));
}

@BuildStep
public RequireBodyHandlerBuildItem requestBodyHandler(List<FunctionBuildItem> functions) {
if (functions.isEmpty()) {
return null;
}
// Require the body handler if there are functions as they may require the HTTP body
return new RequireBodyHandlerBuildItem();
}

@BuildStep()
@Record(STATIC_INIT)
public void staticInit(FunqyHttpBindingRecorder binding,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,25 @@ public void handle(RoutingContext routingContext) {
dispatch(routingContext, invoker, finalInput);
});
} else if (routingContext.request().method() == HttpMethod.POST) {
routingContext.request().bodyHandler(buff -> {
Object input = null;
if (buff.length() > 0) {
ByteBufInputStream in = new ByteBufInputStream(buff.getByteBuf());
ObjectReader reader = (ObjectReader) invoker.getBindingContext().get(ObjectReader.class.getName());
try {
input = reader.readValue((InputStream) in);
} catch (Exception e) {
log.error("Failed to unmarshal input", e);
routingContext.fail(400);
return;
}
var buff = routingContext.getBody();
Object input = null;
if (buff != null && buff.length() > 0) {
ByteBufInputStream in = new ByteBufInputStream(buff.getByteBuf());
ObjectReader reader = (ObjectReader) invoker.getBindingContext().get(ObjectReader.class.getName());
try {
input = reader.readValue((InputStream) in);
} catch (Exception e) {
log.error("Failed to unmarshal input", e);
routingContext.fail(400);
return;
}
}
Object finalInput = input;
executor.execute(new Runnable() {
@Override
public void run() {
VertxRequestHandler.this.dispatch(routingContext, invoker, finalInput);
}
Object finalInput = input;
executor.execute(() -> {
dispatch(routingContext, invoker, finalInput);
});
});
} else {
routingContext.fail(405);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.quarkus.funqy.runtime.bindings.knative.events.KnativeEventsBindingRecorder;
import io.quarkus.jackson.runtime.ObjectMapperProducer;
import io.quarkus.vertx.core.deployment.CoreVertxBuildItem;
import io.quarkus.vertx.http.deployment.RequireBodyHandlerBuildItem;
import io.quarkus.vertx.http.deployment.RouteBuildItem;
import io.quarkus.vertx.http.runtime.HttpBuildTimeConfig;
import io.vertx.core.Handler;
Expand All @@ -42,6 +43,14 @@ public void markObjectMapper(BuildProducer<UnremovableBeanBuildItem> unremovable
new UnremovableBeanBuildItem.BeanClassNameExclusion(ObjectMapperProducer.class.getName())));
}

@BuildStep
public RequireBodyHandlerBuildItem requireBodyHandler(List<FunctionBuildItem> functions) {
if (!functions.isEmpty()) {
return new RequireBodyHandlerBuildItem();
}
return null;
}

@BuildStep()
@Record(STATIC_INIT)
public void staticInit(KnativeEventsBindingRecorder binding,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ private void processCloudEvent(RoutingContext routingContext) {
final HttpServerResponse httpResponse = routingContext.response();
final boolean binaryCE = httpRequest.headers().contains("ce-id");

httpRequest.bodyHandler(bodyBuff -> executor.execute(() -> {
final Buffer bodyBuff = routingContext.body().buffer();
executor.execute(() -> {
try {
final String ceType;
final String ceSpecVersion;
Expand Down Expand Up @@ -409,7 +410,7 @@ private void processCloudEvent(RoutingContext routingContext) {
} catch (Throwable t) {
routingContext.fail(t);
}
}));
});

}

Expand Down Expand Up @@ -481,26 +482,25 @@ private void processHttpRequest(CloudEvent event, RoutingContext routingContext,
routingContext.fail(500, t);
}
} else if (routingContext.request().method() == HttpMethod.POST) {
routingContext.request().bodyHandler(buff -> {
try {
Object input = null;
if (buff.length() > 0) {
ByteBufInputStream in = new ByteBufInputStream(buff.getByteBuf());
ObjectReader reader = (ObjectReader) invoker.getBindingContext().get(DATA_OBJECT_READER);
try {
input = reader.readValue((InputStream) in);
} catch (JsonProcessingException e) {
log.error("Failed to unmarshal input", e);
routingContext.fail(400);
return;
}
Buffer buff = routingContext.body().buffer();
try {
Object input = null;
if (buff.length() > 0) {
ByteBufInputStream in = new ByteBufInputStream(buff.getByteBuf());
ObjectReader reader = (ObjectReader) invoker.getBindingContext().get(DATA_OBJECT_READER);
try {
input = reader.readValue((InputStream) in);
} catch (JsonProcessingException e) {
log.error("Failed to unmarshal input", e);
routingContext.fail(400);
return;
}
execute(event, routingContext, invoker, input);
} catch (Throwable t) {
log.error(t);
routingContext.fail(500, t);
}
});
execute(event, routingContext, invoker, input);
} catch (Throwable t) {
log.error(t);
routingContext.fail(500, t);
}
} else {
routingContext.fail(405);
log.error("Must be POST or GET for: " + invoker.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Uni;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.WorkerContext;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

Expand Down Expand Up @@ -73,7 +71,7 @@ public String invokeFromIoThread(String s) {
service.sayHello(HelloRequest.newBuilder().setName(s).build())
.map(HelloReply::getMessage)
.invoke(() -> assertThat(Vertx.currentContext()).isNotNull().isEqualTo(context))
.invoke(() -> assertThat(Vertx.currentContext().getDelegate()).isInstanceOf(EventLoopContext.class))
.invoke(() -> assertThat(Vertx.currentContext().getDelegate()).isInstanceOf(ContextInternal.class))
.subscribe().with(e::complete, e::fail);
});
}).await().atMost(Duration.ofSeconds(5));
Expand All @@ -87,7 +85,6 @@ public String invokeFromDuplicatedContext(String s) {
service.sayHello(HelloRequest.newBuilder().setName(s).build())
.map(HelloReply::getMessage)
.invoke(() -> assertThat(Vertx.currentContext().getDelegate())
.isNotInstanceOf(EventLoopContext.class).isNotInstanceOf(WorkerContext.class)
.isEqualTo(duplicate))
.subscribe().with(e::complete, e::fail);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Uni;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.WorkerContext;
import io.vertx.core.impl.ContextInternal;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

Expand Down Expand Up @@ -79,7 +78,7 @@ public String invokeFromIoThread(String s) {
service.sayHello(HelloRequest.newBuilder().setName(s).build())
.map(HelloReply::getMessage)
.invoke(() -> assertThat(Vertx.currentContext()).isNotNull().isEqualTo(context))
.invoke(() -> assertThat(Vertx.currentContext().getDelegate()).isInstanceOf(EventLoopContext.class))
.invoke(() -> assertThat(Vertx.currentContext().getDelegate()).isInstanceOf(ContextInternal.class))
.map(r -> r + " " + Thread.currentThread().getName())
.subscribe().with(e::complete, e::fail);
});
Expand All @@ -94,7 +93,6 @@ public String invokeFromDuplicatedContext(String s) {
service.sayHello(HelloRequest.newBuilder().setName(s).build())
.map(HelloReply::getMessage)
.invoke(() -> assertThat(Vertx.currentContext().getDelegate())
.isNotInstanceOf(EventLoopContext.class).isNotInstanceOf(WorkerContext.class)
.isEqualTo(duplicate))
.map(r -> r + " " + Thread.currentThread().getName())
.subscribe().with(e::complete, e::fail);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.ShutdownContext;
import io.vertx.core.Vertx;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;

public class InProcessGrpcServerBuilderProvider implements GrpcBuilderProvider<InProcessServerBuilder> {
Expand All @@ -35,7 +35,7 @@ public ServerBuilder<InProcessServerBuilder> createServerBuilder(Vertx vertx, Gr
// wrap with Vert.x context, so that the context interceptors work
VertxInternal vxi = (VertxInternal) vertx;
Executor delegate = vertx.nettyEventLoopGroup();
EventLoopContext context = vxi.createEventLoopContext();
ContextInternal context = vxi.createEventLoopContext();
Executor executor = command -> delegate.execute(() -> context.dispatch(command));
builder.executor(executor);
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;

import io.grpc.ServerInterceptor;
import io.grpc.ServerMethodDefinition;
Expand Down Expand Up @@ -90,8 +91,14 @@ private static void forceSet(Object object, String fieldName, Object value)

public static void shutdown() {
if (server != null) {
server.shutdown();
server = null;
try {
server.shutdown();
} catch (RejectedExecutionException ignored) {
// Ignore this, it means the application is already shutting down
} finally {
server = null;
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.ShutdownContext;
import io.vertx.core.Vertx;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;

public class XdsGrpcServerBuilderProvider implements GrpcBuilderProvider<XdsServerBuilder> {
Expand All @@ -54,7 +54,7 @@ public ServerBuilder<XdsServerBuilder> createServerBuilder(Vertx vertx, GrpcServ
// wrap with Vert.x context, so that the context interceptors work
VertxInternal vxi = (VertxInternal) vertx;
Executor delegate = vertx.nettyEventLoopGroup();
EventLoopContext context = vxi.createEventLoopContext();
ContextInternal context = vxi.createEventLoopContext();
Executor executor = command -> delegate.execute(() -> context.dispatch(command));
builder.executor(executor);
// custom XDS interceptors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,11 @@ public interface RedisClient {

Response pfcount(List<String> args);

@Deprecated
Response pfdebug(List<String> args);

Response pfdebug(String command, String key);

Response pfmerge(List<String> args);

Response pfselftest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,10 +426,16 @@ static ReactiveRedisClient createClient(String name) {

Response pfcountAndAwait(List<String> args);

@Deprecated
Uni<Response> pfdebug(List<String> args);

@Deprecated
Response pfdebugAndAwait(List<String> args);

Uni<Response> pfdebug(String command, String key);

Response pfdebugAndAwait(String command, String key);

Uni<Response> pfmerge(List<String> args);

Response pfmergeAndAwait(List<String> args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -986,12 +986,22 @@ public Response pfcountAndAwait(List<String> args) {

@Override
public Uni<Response> pfdebug(List<String> args) {
return redisAPI.pfdebug(args);
return redisAPI.pfdebug(args.get(0), args.get(1));
}

@Override
public Response pfdebugAndAwait(List<String> args) {
return redisAPI.pfdebugAndAwait(args);
return redisAPI.pfdebugAndAwait(args.get(0), args.get(1));
}

@Override
public Uni<Response> pfdebug(String command, String key) {
return redisAPI.pfdebug(command, key);
}

@Override
public Response pfdebugAndAwait(String command, String key) {
return redisAPI.pfdebugAndAwait(command, key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,12 @@ public Response pfcount(List<String> args) {

@Override
public Response pfdebug(List<String> args) {
return await(redisAPI.pfdebug(args));
return await(redisAPI.pfdebug(args.get(0), args.get(1)));
}

@Override
public Response pfdebug(String command, String key) {
return await(redisAPI.pfdebug(command, key));
}

@Override
Expand Down
Loading

0 comments on commit 76db2af

Please sign in to comment.