Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to Vertx 4.5.1 #38034

Merged
merged 13 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
<opentelemetry.version>1.32.0</opentelemetry.version>
<opentelemetry-alpha.version>1.32.0-alpha</opentelemetry-alpha.version>
<opentelemetry-semconv.version>1.21.0-alpha</opentelemetry-semconv.version> <!-- keep in sync with opentelemetry-java-instrumentation in the alpha bom-->
<quarkus-http.version>5.0.3.Final</quarkus-http.version>
<quarkus-http.version>5.1.0.Final</quarkus-http.version>
<micrometer.version>1.11.5</micrometer.version><!-- keep in sync with hdrhistogram -->
<hdrhistogram.version>2.1.12</hdrhistogram.version><!-- keep in sync with micrometer -->
<google-auth.version>0.22.0</google-auth.version>
Expand All @@ -61,9 +61,9 @@
<smallrye-context-propagation.version>2.1.0</smallrye-context-propagation.version>
<smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
<smallrye-reactive-types-converter.version>3.0.1</smallrye-reactive-types-converter.version>
<smallrye-mutiny-vertx-binding.version>3.7.2</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>4.13.0</smallrye-reactive-messaging.version>
<smallrye-stork.version>2.4.0</smallrye-stork.version>
<smallrye-mutiny-vertx-binding.version>3.8.0</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>4.14.0</smallrye-reactive-messaging.version>
<smallrye-stork.version>2.5.0</smallrye-stork.version>
<jakarta.activation.version>2.1.2</jakarta.activation.version>
<jakarta.annotation-api.version>2.1.1</jakarta.annotation-api.version>
<jakarta.authentication-api>3.0.0</jakarta.authentication-api>
Expand Down Expand Up @@ -121,7 +121,7 @@
<wildfly-client-config.version>1.0.1.Final</wildfly-client-config.version>
<wildfly-elytron.version>2.2.2.Final</wildfly-elytron.version>
<jboss-threads.version>3.5.1.Final</jboss-threads.version>
<vertx.version>4.4.6</vertx.version>
<vertx.version>4.5.1</vertx.version>
<httpclient.version>4.5.14</httpclient.version>
<httpcore.version>4.4.16</httpcore.version>
<httpasync.version>4.1.5</httpasync.version>
Expand All @@ -144,7 +144,7 @@
<infinispan.version>14.0.21.Final</infinispan.version>
<infinispan.protostream.version>4.6.5.Final</infinispan.protostream.version>
<caffeine.version>3.1.5</caffeine.version>
<netty.version>4.1.100.Final</netty.version>
<netty.version>4.1.103.Final</netty.version>
<brotli4j.version>1.12.0</brotli4j.version>
<reactive-streams.version>1.0.4</reactive-streams.version>
<jboss-logging.version>3.5.3.Final</jboss-logging.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.quarkus.funqy.runtime.bindings.http.FunqyHttpBindingRecorder;
import io.quarkus.jackson.runtime.ObjectMapperProducer;
import io.quarkus.vertx.core.deployment.CoreVertxBuildItem;
import io.quarkus.vertx.http.deployment.RequireBodyHandlerBuildItem;
import io.quarkus.vertx.http.deployment.RouteBuildItem;
import io.quarkus.vertx.http.runtime.HttpBuildTimeConfig;
import io.vertx.core.Handler;
Expand All @@ -40,6 +41,15 @@ public void markObjectMapper(BuildProducer<UnremovableBeanBuildItem> unremovable
new UnremovableBeanBuildItem.BeanClassNameExclusion(ObjectMapperProducer.class.getName())));
}

@BuildStep
public RequireBodyHandlerBuildItem requestBodyHandler(List<FunctionBuildItem> functions) {
if (functions.isEmpty()) {
return null;
}
// Require the body handler if there are functions as they may require the HTTP body
return new RequireBodyHandlerBuildItem();
}

@BuildStep()
@Record(STATIC_INIT)
public void staticInit(FunqyHttpBindingRecorder binding,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,25 @@ public void handle(RoutingContext routingContext) {
dispatch(routingContext, invoker, finalInput);
});
} else if (routingContext.request().method() == HttpMethod.POST) {
routingContext.request().bodyHandler(buff -> {
Object input = null;
if (buff.length() > 0) {
ByteBufInputStream in = new ByteBufInputStream(buff.getByteBuf());
ObjectReader reader = (ObjectReader) invoker.getBindingContext().get(ObjectReader.class.getName());
try {
input = reader.readValue((InputStream) in);
} catch (Exception e) {
log.error("Failed to unmarshal input", e);
routingContext.fail(400);
return;
}
var buff = routingContext.getBody();
Object input = null;
if (buff != null && buff.length() > 0) {
ByteBufInputStream in = new ByteBufInputStream(buff.getByteBuf());
ObjectReader reader = (ObjectReader) invoker.getBindingContext().get(ObjectReader.class.getName());
try {
input = reader.readValue((InputStream) in);
} catch (Exception e) {
log.error("Failed to unmarshal input", e);
routingContext.fail(400);
return;
}
}
Object finalInput = input;
executor.execute(new Runnable() {
@Override
public void run() {
VertxRequestHandler.this.dispatch(routingContext, invoker, finalInput);
}
Object finalInput = input;
executor.execute(() -> {
dispatch(routingContext, invoker, finalInput);
});
});
} else {
routingContext.fail(405);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.quarkus.funqy.runtime.bindings.knative.events.KnativeEventsBindingRecorder;
import io.quarkus.jackson.runtime.ObjectMapperProducer;
import io.quarkus.vertx.core.deployment.CoreVertxBuildItem;
import io.quarkus.vertx.http.deployment.RequireBodyHandlerBuildItem;
import io.quarkus.vertx.http.deployment.RouteBuildItem;
import io.quarkus.vertx.http.runtime.HttpBuildTimeConfig;
import io.vertx.core.Handler;
Expand All @@ -42,6 +43,14 @@ public void markObjectMapper(BuildProducer<UnremovableBeanBuildItem> unremovable
new UnremovableBeanBuildItem.BeanClassNameExclusion(ObjectMapperProducer.class.getName())));
}

@BuildStep
public RequireBodyHandlerBuildItem requireBodyHandler(List<FunctionBuildItem> functions) {
if (!functions.isEmpty()) {
return new RequireBodyHandlerBuildItem();
}
return null;
}

@BuildStep()
@Record(STATIC_INIT)
public void staticInit(KnativeEventsBindingRecorder binding,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ private void processCloudEvent(RoutingContext routingContext) {
final HttpServerResponse httpResponse = routingContext.response();
final boolean binaryCE = httpRequest.headers().contains("ce-id");

httpRequest.bodyHandler(bodyBuff -> executor.execute(() -> {
final Buffer bodyBuff = routingContext.body().buffer();
executor.execute(() -> {
try {
final String ceType;
final String ceSpecVersion;
Expand Down Expand Up @@ -409,7 +410,7 @@ private void processCloudEvent(RoutingContext routingContext) {
} catch (Throwable t) {
routingContext.fail(t);
}
}));
});

}

Expand Down Expand Up @@ -481,26 +482,25 @@ private void processHttpRequest(CloudEvent event, RoutingContext routingContext,
routingContext.fail(500, t);
}
} else if (routingContext.request().method() == HttpMethod.POST) {
routingContext.request().bodyHandler(buff -> {
try {
Object input = null;
if (buff.length() > 0) {
ByteBufInputStream in = new ByteBufInputStream(buff.getByteBuf());
ObjectReader reader = (ObjectReader) invoker.getBindingContext().get(DATA_OBJECT_READER);
try {
input = reader.readValue((InputStream) in);
} catch (JsonProcessingException e) {
log.error("Failed to unmarshal input", e);
routingContext.fail(400);
return;
}
Buffer buff = routingContext.body().buffer();
try {
Object input = null;
if (buff.length() > 0) {
ByteBufInputStream in = new ByteBufInputStream(buff.getByteBuf());
ObjectReader reader = (ObjectReader) invoker.getBindingContext().get(DATA_OBJECT_READER);
try {
input = reader.readValue((InputStream) in);
} catch (JsonProcessingException e) {
log.error("Failed to unmarshal input", e);
routingContext.fail(400);
return;
}
execute(event, routingContext, invoker, input);
} catch (Throwable t) {
log.error(t);
routingContext.fail(500, t);
}
});
execute(event, routingContext, invoker, input);
} catch (Throwable t) {
log.error(t);
routingContext.fail(500, t);
}
} else {
routingContext.fail(405);
log.error("Must be POST or GET for: " + invoker.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Uni;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.WorkerContext;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

Expand Down Expand Up @@ -73,7 +71,7 @@ public String invokeFromIoThread(String s) {
service.sayHello(HelloRequest.newBuilder().setName(s).build())
.map(HelloReply::getMessage)
.invoke(() -> assertThat(Vertx.currentContext()).isNotNull().isEqualTo(context))
.invoke(() -> assertThat(Vertx.currentContext().getDelegate()).isInstanceOf(EventLoopContext.class))
.invoke(() -> assertThat(Vertx.currentContext().getDelegate()).isInstanceOf(ContextInternal.class))
.subscribe().with(e::complete, e::fail);
});
}).await().atMost(Duration.ofSeconds(5));
Expand All @@ -87,7 +85,6 @@ public String invokeFromDuplicatedContext(String s) {
service.sayHello(HelloRequest.newBuilder().setName(s).build())
.map(HelloReply::getMessage)
.invoke(() -> assertThat(Vertx.currentContext().getDelegate())
.isNotInstanceOf(EventLoopContext.class).isNotInstanceOf(WorkerContext.class)
.isEqualTo(duplicate))
.subscribe().with(e::complete, e::fail);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Uni;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.WorkerContext;
import io.vertx.core.impl.ContextInternal;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

Expand Down Expand Up @@ -79,7 +78,7 @@ public String invokeFromIoThread(String s) {
service.sayHello(HelloRequest.newBuilder().setName(s).build())
.map(HelloReply::getMessage)
.invoke(() -> assertThat(Vertx.currentContext()).isNotNull().isEqualTo(context))
.invoke(() -> assertThat(Vertx.currentContext().getDelegate()).isInstanceOf(EventLoopContext.class))
.invoke(() -> assertThat(Vertx.currentContext().getDelegate()).isInstanceOf(ContextInternal.class))
.map(r -> r + " " + Thread.currentThread().getName())
.subscribe().with(e::complete, e::fail);
});
Expand All @@ -94,7 +93,6 @@ public String invokeFromDuplicatedContext(String s) {
service.sayHello(HelloRequest.newBuilder().setName(s).build())
.map(HelloReply::getMessage)
.invoke(() -> assertThat(Vertx.currentContext().getDelegate())
.isNotInstanceOf(EventLoopContext.class).isNotInstanceOf(WorkerContext.class)
.isEqualTo(duplicate))
.map(r -> r + " " + Thread.currentThread().getName())
.subscribe().with(e::complete, e::fail);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.ShutdownContext;
import io.vertx.core.Vertx;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;

public class InProcessGrpcServerBuilderProvider implements GrpcBuilderProvider<InProcessServerBuilder> {
Expand All @@ -35,7 +35,7 @@ public ServerBuilder<InProcessServerBuilder> createServerBuilder(Vertx vertx, Gr
// wrap with Vert.x context, so that the context interceptors work
VertxInternal vxi = (VertxInternal) vertx;
Executor delegate = vertx.nettyEventLoopGroup();
EventLoopContext context = vxi.createEventLoopContext();
ContextInternal context = vxi.createEventLoopContext();
Executor executor = command -> delegate.execute(() -> context.dispatch(command));
builder.executor(executor);
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;

import io.grpc.ServerInterceptor;
import io.grpc.ServerMethodDefinition;
Expand Down Expand Up @@ -90,8 +91,14 @@ private static void forceSet(Object object, String fieldName, Object value)

public static void shutdown() {
if (server != null) {
server.shutdown();
server = null;
try {
server.shutdown();
} catch (RejectedExecutionException ignored) {
// Ignore this, it means the application is already shutting down
} finally {
server = null;
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.ShutdownContext;
import io.vertx.core.Vertx;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;

public class XdsGrpcServerBuilderProvider implements GrpcBuilderProvider<XdsServerBuilder> {
Expand All @@ -54,7 +54,7 @@ public ServerBuilder<XdsServerBuilder> createServerBuilder(Vertx vertx, GrpcServ
// wrap with Vert.x context, so that the context interceptors work
VertxInternal vxi = (VertxInternal) vertx;
Executor delegate = vertx.nettyEventLoopGroup();
EventLoopContext context = vxi.createEventLoopContext();
ContextInternal context = vxi.createEventLoopContext();
Executor executor = command -> delegate.execute(() -> context.dispatch(command));
builder.executor(executor);
// custom XDS interceptors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,11 @@ public interface RedisClient {

Response pfcount(List<String> args);

@Deprecated
Response pfdebug(List<String> args);

Response pfdebug(String command, String key);

Response pfmerge(List<String> args);

Response pfselftest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,10 +426,16 @@ static ReactiveRedisClient createClient(String name) {

Response pfcountAndAwait(List<String> args);

@Deprecated
Uni<Response> pfdebug(List<String> args);

@Deprecated
Response pfdebugAndAwait(List<String> args);

Uni<Response> pfdebug(String command, String key);

Response pfdebugAndAwait(String command, String key);

Uni<Response> pfmerge(List<String> args);

Response pfmergeAndAwait(List<String> args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -986,12 +986,22 @@ public Response pfcountAndAwait(List<String> args) {

@Override
public Uni<Response> pfdebug(List<String> args) {
return redisAPI.pfdebug(args);
return redisAPI.pfdebug(args.get(0), args.get(1));
}

@Override
public Response pfdebugAndAwait(List<String> args) {
return redisAPI.pfdebugAndAwait(args);
return redisAPI.pfdebugAndAwait(args.get(0), args.get(1));
}

@Override
public Uni<Response> pfdebug(String command, String key) {
return redisAPI.pfdebug(command, key);
}

@Override
public Response pfdebugAndAwait(String command, String key) {
return redisAPI.pfdebugAndAwait(command, key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,12 @@ public Response pfcount(List<String> args) {

@Override
public Response pfdebug(List<String> args) {
return await(redisAPI.pfdebug(args));
return await(redisAPI.pfdebug(args.get(0), args.get(1)));
}

@Override
public Response pfdebug(String command, String key) {
return await(redisAPI.pfdebug(command, key));
}

@Override
Expand Down
Loading
Loading