diff --git a/camel-k-main/camel-k-runtime-health/src/main/java/org/apache/camel/k/health/HealthContextCustomizer.java b/camel-k-main/camel-k-runtime-health/src/main/java/org/apache/camel/k/health/HealthContextCustomizer.java index f1308d2b2..d12d9b54e 100644 --- a/camel-k-main/camel-k-runtime-health/src/main/java/org/apache/camel/k/health/HealthContextCustomizer.java +++ b/camel-k-main/camel-k-runtime-health/src/main/java/org/apache/camel/k/health/HealthContextCustomizer.java @@ -35,7 +35,7 @@ import org.apache.camel.impl.health.ContextHealthCheck; import org.apache.camel.impl.health.RoutesHealthCheckRepository; import org.apache.camel.k.ContextCustomizer; -import org.apache.camel.k.http.PlatformHttpRouter; +import org.apache.camel.k.http.PlatformHttp; public class HealthContextCustomizer implements ContextCustomizer { public static final String DEFAULT_PATH = "/health"; @@ -110,7 +110,7 @@ public void apply(CamelContext camelContext) { // add health route addRoute( camelContext, - PlatformHttpRouter.lookup(camelContext).get() + PlatformHttp.lookup(camelContext).router() ); } diff --git a/camel-k-main/pom.xml b/camel-k-main/pom.xml index f5b61900d..669c3bb70 100644 --- a/camel-k-main/pom.xml +++ b/camel-k-main/pom.xml @@ -31,7 +31,6 @@ camel-k-runtime-main camel-k-runtime-health - camel-k-runtime-http diff --git a/camel-k-quarkus/camel-k-quarkus-knative/runtime/src/main/java/org/apache/camel/k/quarkus/knative/KnativeRecorder.java b/camel-k-quarkus/camel-k-quarkus-knative/runtime/src/main/java/org/apache/camel/k/quarkus/knative/KnativeRecorder.java index 24f078008..fb6844e02 100644 --- a/camel-k-quarkus/camel-k-quarkus-knative/runtime/src/main/java/org/apache/camel/k/quarkus/knative/KnativeRecorder.java +++ b/camel-k-quarkus/camel-k-quarkus-knative/runtime/src/main/java/org/apache/camel/k/quarkus/knative/KnativeRecorder.java @@ -28,7 +28,6 @@ public class KnativeRecorder { public RuntimeValue createKnativeComponent(Supplier vertx) { KnativeHttpTransport transport = new KnativeHttpTransport(); - transport.setVertx(vertx.get()); KnativeComponent component = new KnativeComponent(); component.setTransport(transport); diff --git a/camel-k-main/camel-k-runtime-http/pom.xml b/camel-k-runtime-http/pom.xml similarity index 98% rename from camel-k-main/camel-k-runtime-http/pom.xml rename to camel-k-runtime-http/pom.xml index 97db7863a..012c9c9b6 100644 --- a/camel-k-main/camel-k-runtime-http/pom.xml +++ b/camel-k-runtime-http/pom.xml @@ -20,7 +20,7 @@ org.apache.camel.k - camel-k-main + camel-k-runtime-parent 1.2.0-SNAPSHOT 4.0.0 diff --git a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpRouter.java b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttp.java similarity index 78% rename from camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpRouter.java rename to camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttp.java index 5f33f2fff..91914b0c1 100644 --- a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpRouter.java +++ b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttp.java @@ -16,21 +16,28 @@ */ package org.apache.camel.k.http; +import io.vertx.core.Vertx; import io.vertx.ext.web.Router; import org.apache.camel.CamelContext; import org.apache.camel.component.platform.http.PlatformHttpConstants; import org.apache.camel.support.CamelContextHelper; -public class PlatformHttpRouter { +public class PlatformHttp { public static final String PLATFORM_HTTP_ROUTER_NAME = PlatformHttpConstants.PLATFORM_HTTP_COMPONENT_NAME + "-router"; + private final Vertx vertx; private final Router router; - public PlatformHttpRouter(Router router) { + public PlatformHttp(Vertx vertx, Router router) { + this.vertx = vertx; this.router = router; } - public Router get() { + public Vertx vertx() { + return vertx; + } + + public Router router() { return router; } @@ -40,11 +47,11 @@ public Router get() { // // ********************** - public static PlatformHttpRouter lookup(CamelContext camelContext) { + public static PlatformHttp lookup(CamelContext camelContext) { return CamelContextHelper.mandatoryLookup( camelContext, - PlatformHttpRouter.PLATFORM_HTTP_ROUTER_NAME, - PlatformHttpRouter.class + PlatformHttp.PLATFORM_HTTP_ROUTER_NAME, + PlatformHttp.class ); } } diff --git a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServer.java b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServer.java similarity index 98% rename from camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServer.java rename to camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServer.java index 6d1bd7de1..6126d2956 100644 --- a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServer.java +++ b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServer.java @@ -79,7 +79,7 @@ private CompletionStage startAsync() { router.mountSubRouter(configuration.getPath(), subRouter); - context.getRegistry().bind(PlatformHttpRouter.PLATFORM_HTTP_ROUTER_NAME, new PlatformHttpRouter(subRouter)); + context.getRegistry().bind(PlatformHttp.PLATFORM_HTTP_ROUTER_NAME, new PlatformHttp(subRouter)); //HttpServerOptions options = new HttpServerOptions(); if (configuration.getSslContextParameters() != null) { diff --git a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceConfiguration.java b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceConfiguration.java similarity index 100% rename from camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceConfiguration.java rename to camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceConfiguration.java diff --git a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceContextCustomizer.java b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceContextCustomizer.java similarity index 100% rename from camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceContextCustomizer.java rename to camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceContextCustomizer.java diff --git a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceEndpoint.java b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceEndpoint.java similarity index 100% rename from camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceEndpoint.java rename to camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceEndpoint.java diff --git a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpConsumer.java b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpConsumer.java similarity index 98% rename from camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpConsumer.java rename to camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpConsumer.java index e438c3235..1b1bd8c9b 100644 --- a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpConsumer.java +++ b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpConsumer.java @@ -45,7 +45,7 @@ import org.apache.camel.TypeConverter; import org.apache.camel.component.platform.http.PlatformHttpEndpoint; import org.apache.camel.component.platform.http.spi.Method; -import org.apache.camel.k.http.PlatformHttpRouter; +import org.apache.camel.k.http.PlatformHttp; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.support.DefaultConsumer; import org.apache.camel.support.DefaultMessage; @@ -75,10 +75,10 @@ protected void doStart() throws Exception { super.doStart(); final PlatformHttpEndpoint endpoint = getEndpoint(); - final PlatformHttpRouter router = PlatformHttpRouter.lookup(endpoint.getCamelContext()); + final PlatformHttp router = PlatformHttp.lookup(endpoint.getCamelContext()); final String path = endpoint.getPath(); final String vertxPathParamPath = PATH_PARAMETER_PATTERN.matcher(path).replaceAll(":$1"); - final Route newRoute = router.get().route(vertxPathParamPath); + final Route newRoute = router.router().route(vertxPathParamPath); final Set methods = Method.parseList(endpoint.getHttpMethodRestrict()); if (!methods.equals(Method.getAll())) { diff --git a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpEngine.java b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpEngine.java similarity index 100% rename from camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpEngine.java rename to camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpEngine.java diff --git a/camel-k-main/camel-k-runtime-http/src/main/resources/META-INF/services/org/apache/camel/k/customizer/platform-http b/camel-k-runtime-http/src/main/resources/META-INF/services/org/apache/camel/k/customizer/platform-http similarity index 100% rename from camel-k-main/camel-k-runtime-http/src/main/resources/META-INF/services/org/apache/camel/k/customizer/platform-http rename to camel-k-runtime-http/src/main/resources/META-INF/services/org/apache/camel/k/customizer/platform-http diff --git a/camel-k-main/camel-k-runtime-http/src/test/java/org/apache/camel/k/http/PlatformHttpServiceCustomizerTest.java b/camel-k-runtime-http/src/test/java/org/apache/camel/k/http/PlatformHttpServiceCustomizerTest.java similarity index 98% rename from camel-k-main/camel-k-runtime-http/src/test/java/org/apache/camel/k/http/PlatformHttpServiceCustomizerTest.java rename to camel-k-runtime-http/src/test/java/org/apache/camel/k/http/PlatformHttpServiceCustomizerTest.java index 5756d3711..61435ac3f 100644 --- a/camel-k-main/camel-k-runtime-http/src/test/java/org/apache/camel/k/http/PlatformHttpServiceCustomizerTest.java +++ b/camel-k-runtime-http/src/test/java/org/apache/camel/k/http/PlatformHttpServiceCustomizerTest.java @@ -50,7 +50,7 @@ public void testPlatformHttpServiceCustomizer(String path) throws Exception { httpService.apply(runtime.getCamelContext()); - PlatformHttpRouter.lookup(runtime.getCamelContext()).get().route(HttpMethod.GET, "/my/path") + PlatformHttp.lookup(runtime.getCamelContext()).router().route(HttpMethod.GET, "/my/path") .handler(routingContext -> { JsonObject response = new JsonObject(); response.put("status", "UP"); diff --git a/camel-k-main/camel-k-runtime-http/src/test/resources/log4j2-test.xml b/camel-k-runtime-http/src/test/resources/log4j2-test.xml similarity index 100% rename from camel-k-main/camel-k-runtime-http/src/test/resources/log4j2-test.xml rename to camel-k-runtime-http/src/test/resources/log4j2-test.xml diff --git a/camel-knative/camel-knative-http/pom.xml b/camel-knative/camel-knative-http/pom.xml index a63101593..3a4661615 100644 --- a/camel-knative/camel-knative-http/pom.xml +++ b/camel-knative/camel-knative-http/pom.xml @@ -35,12 +35,6 @@ - - org.slf4j - slf4j-api - ${slf4j.version} - - org.apache.camel camel-core-engine @@ -56,10 +50,10 @@ - io.vertx - vertx-web - ${vertx.version} + org.apache.camel.k + camel-k-runtime-http + io.vertx vertx-web-client diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java index 3759ccc66..748b519d6 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java @@ -44,35 +44,36 @@ public class KnativeHttpConsumer extends DefaultConsumer implements KnativeHttp.PredicatedHandler { private final KnativeHttpTransport transport; private final Predicate filter; - private final KnativeHttp.ServerKey key; private final KnativeEnvironment.KnativeServiceDefinition serviceDefinition; + private final KnativeHttpConsumerDispatcher dispatcher; private final HeaderFilterStrategy headerFilterStrategy; public KnativeHttpConsumer( KnativeHttpTransport transport, Endpoint endpoint, KnativeEnvironment.KnativeServiceDefinition serviceDefinition, + KnativeHttpConsumerDispatcher dispatcher, Processor processor) { super(endpoint, processor); this.transport = transport; this.serviceDefinition = serviceDefinition; + this.dispatcher = dispatcher; this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy(); - this.key = new KnativeHttp.ServerKey(serviceDefinition.getHost(), serviceDefinition.getPortOrDefault(KnativeHttp.DEFAULT_PORT)); this.filter = KnativeHttpSupport.createFilter(serviceDefinition); } @Override protected void doStart() throws Exception { - this.transport.getDispatcher(key).bind(this); + this.dispatcher.bind(this); super.doStart(); } @Override protected void doStop() throws Exception { - this.transport.getDispatcher(key).unbind(this); + this.dispatcher.unbind(this); super.doStop(); } diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java index 621fb1a2f..71084caa4 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java @@ -17,187 +17,67 @@ package org.apache.camel.component.knative.http; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import io.vertx.core.Handler; -import io.vertx.core.Vertx; import io.vertx.core.http.HttpMethod; -import io.vertx.core.http.HttpServer; -import io.vertx.core.http.HttpServerOptions; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; +import io.vertx.ext.web.RoutingContext; import org.apache.camel.Exchange; -import org.apache.camel.support.service.ServiceSupport; -import org.apache.camel.util.ObjectHelper; -import org.apache.camel.util.ReferenceCount; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public final class KnativeHttpConsumerDispatcher { +public final class KnativeHttpConsumerDispatcher implements Handler { private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpConsumerDispatcher.class); - private final Vertx vertx; - private final KnativeHttp.ServerKey key; - private final ReferenceCount refCnt; private final Set handlers; - private final HttpServerWrapper server; - private final HttpServerOptions serverOptions; - private final ExecutorService executor; - - public KnativeHttpConsumerDispatcher(ExecutorService executor, Vertx vertx, KnativeHttp.ServerKey key, HttpServerOptions serverOptions) { - this.executor = executor; - this.vertx = vertx; - this.serverOptions = ObjectHelper.supplyIfEmpty(serverOptions, HttpServerOptions::new); - this.server = new HttpServerWrapper(); + public KnativeHttpConsumerDispatcher() { this.handlers = new CopyOnWriteArraySet<>(); - this.key = key; - this.refCnt = ReferenceCount.on(server::start, server::stop); } public void bind(KnativeHttp.PredicatedHandler handler) { - if (handlers.add(handler)) { - refCnt.retain(); - } + handlers.add(handler); } public void unbind(KnativeHttp.PredicatedHandler handler) { - if (handlers.remove(handler)) { - refCnt.release(); - } + handlers.remove(handler); } - private final class HttpServerWrapper extends ServiceSupport implements Handler { - private HttpServer server; - - @Override - protected void doStart() throws Exception { - LOGGER.info("Starting Vert.x HttpServer on {}:{}}", - key.getHost(), - key.getPort() - ); - - startAsync().toCompletableFuture().join(); - } - - @Override - protected void doStop() throws Exception { - LOGGER.info("Stopping Vert.x HttpServer on {}:{}", - key.getHost(), - key.getPort()); - - try { - if (server != null) { - stopAsync().toCompletableFuture().join(); - } - } finally { - this.server = null; - } - } - - private CompletionStage startAsync() { - server = vertx.createHttpServer(serverOptions); - server.requestHandler(this); - - return CompletableFuture.runAsync( - () -> { - CountDownLatch latch = new CountDownLatch(1); - - server.listen(key.getPort(), key.getHost(), result -> { - try { - if (result.failed()) { - LOGGER.warn("Failed to start Vert.x HttpServer on {}:{}, reason: {}", - key.getHost(), - key.getPort(), - result.cause().getMessage() - ); - - throw new RuntimeException(result.cause()); - } - - LOGGER.info("Vert.x HttpServer started on {}:{}", key.getHost(), key.getPort()); - } finally { - latch.countDown(); - } - }); + @Override + public void handle(RoutingContext routingContext) { + HttpServerRequest request = routingContext.request(); - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }, - executor - ); - } - - protected CompletionStage stopAsync() { - return CompletableFuture.runAsync( - () -> { - CountDownLatch latch = new CountDownLatch(1); - - server.close(result -> { - try { - if (result.failed()) { - LOGGER.warn("Failed to close Vert.x HttpServer reason: {}", - result.cause().getMessage() - ); - - throw new RuntimeException(result.cause()); - } - - LOGGER.info("Vert.x HttpServer stopped"); - } finally { - latch.countDown(); - } - }); + if (request.method() != HttpMethod.POST) { + HttpServerResponse response = request.response(); + response.setStatusCode(405); + response.putHeader(Exchange.CONTENT_TYPE, "text/plain"); + response.end("Unsupported method: " + request.method()); - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }, - executor - ); + return; } - @Override - public void handle(HttpServerRequest request) { - if (request.method() != HttpMethod.POST) { - HttpServerResponse response = request.response(); - response.setStatusCode(405); - response.putHeader(Exchange.CONTENT_TYPE, "text/plain"); - response.end("Unsupported method: " + request.method()); + LOGGER.debug("received exchange on path: {}, headers: {}", + request.path(), + request.headers() + ); + for (KnativeHttp.PredicatedHandler handler: handlers) { + if (handler.canHandle(request)) { + handler.handle(request); return; } + } - LOGGER.debug("received exchange on path: {}, headers: {}", - request.path(), - request.headers() - ); - - for (KnativeHttp.PredicatedHandler handler: handlers) { - if (handler.canHandle(request)) { - handler.handle(request); - return; - } - } - - LOGGER.warn("No handler found for path: {}, headers: {}", - request.path(), - request.headers() - ); + LOGGER.warn("No handler found for path: {}, headers: {}", + request.path(), + request.headers() + ); - HttpServerResponse response = request.response(); - response.setStatusCode(404); - response.putHeader(Exchange.CONTENT_TYPE, "text/plain"); - response.end("No matching condition found"); - } + HttpServerResponse response = request.response(); + response.setStatusCode(404); + response.putHeader(Exchange.CONTENT_TYPE, "text/plain"); + response.end("No matching condition found"); } } diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java index 5850a534a..dd6e50de0 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java @@ -16,16 +16,7 @@ */ package org.apache.camel.component.knative.http; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; - -import io.vertx.core.Vertx; -import io.vertx.core.VertxOptions; -import io.vertx.core.http.HttpServerOptions; +import io.vertx.ext.web.Route; import io.vertx.ext.web.client.WebClientOptions; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; @@ -36,52 +27,25 @@ import org.apache.camel.component.knative.spi.KnativeEnvironment; import org.apache.camel.component.knative.spi.KnativeTransport; import org.apache.camel.component.knative.spi.KnativeTransportConfiguration; +import org.apache.camel.k.http.PlatformHttp; import org.apache.camel.support.service.ServiceSupport; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class KnativeHttpTransport extends ServiceSupport implements CamelContextAware, KnativeTransport { - private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpTransport.class); - - private final Map registry; - - private Vertx vertx; - private VertxOptions vertxOptions; - private HttpServerOptions vertxHttpServerOptions; + private PlatformHttp platformHttp; private WebClientOptions vertxHttpClientOptions; private CamelContext camelContext; - - private boolean localVertx; - private ExecutorService executor; + private KnativeHttpConsumerDispatcher dispatcher; + private Route route; public KnativeHttpTransport() { - this.registry = new ConcurrentHashMap<>(); - this.localVertx = false; - } - - public Vertx getVertx() { - return vertx; - } - - public void setVertx(Vertx vertx) { - this.vertx = vertx; - } - - public VertxOptions getVertxOptions() { - return vertxOptions; - } - - public void setVertxOptions(VertxOptions vertxOptions) { - this.vertxOptions = vertxOptions; } - public HttpServerOptions getVertxHttpServerOptions() { - return vertxHttpServerOptions; + public PlatformHttp getPlatformHttp() { + return platformHttp; } - public void setVertxHttpServerOptions(HttpServerOptions vertxHttpServerOptions) { - this.vertxHttpServerOptions = vertxHttpServerOptions; + public void setPlatformHttp(PlatformHttp platformHttp) { + this.platformHttp = platformHttp; } public WebClientOptions getVertxHttpClientOptions() { @@ -92,10 +56,6 @@ public void setVertxHttpClientOptions(WebClientOptions vertxHttpClientOptions) { this.vertxHttpClientOptions = vertxHttpClientOptions; } - KnativeHttpConsumerDispatcher getDispatcher(KnativeHttp.ServerKey key) { - return registry.computeIfAbsent(key, k -> new KnativeHttpConsumerDispatcher(executor, vertx, k, vertxHttpServerOptions)); - } - @Override public void setCamelContext(CamelContext camelContext) { this.camelContext = camelContext; @@ -114,81 +74,17 @@ public CamelContext getCamelContext() { @Override protected void doStart() throws Exception { - this.executor = getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "knative-http-component"); - - if (this.vertx != null) { - LOGGER.info("Using Vert.x instance configured on component: {}", this.vertx); - return; - } - - if (this.vertx == null) { - Set instances = getCamelContext().getRegistry().findByType(Vertx.class); - if (instances.size() == 1) { - this.vertx = instances.iterator().next(); - - // - // if this method is executed before the container is fully started, - // it may return a null reference, may be related to: - // - // https://groups.google.com/forum/#!topic/quarkus-dev/qSo65fTyYVA - // - if (this.vertx != null) { - LOGGER.info("Found Vert.x instance in registry: {}", this.vertx); - } - } - } + this.dispatcher = new KnativeHttpConsumerDispatcher(); - if (this.vertx == null) { - LOGGER.info("Creating new Vert.x instance"); - - VertxOptions options = ObjectHelper.supplyIfEmpty(this.vertxOptions, VertxOptions::new); - - this.vertx = Vertx.vertx(options); - this.localVertx = true; - } + this.platformHttp = PlatformHttp.lookup(camelContext); + this.route = this.platformHttp.router().route("/"); + this.route.handler(this.dispatcher); } @Override protected void doStop() throws Exception { - if (this.vertx != null && this.localVertx) { - Future future = this.executor.submit( - () -> { - CountDownLatch latch = new CountDownLatch(1); - - this.vertx.close(result -> { - try { - if (result.failed()) { - LOGGER.warn("Failed to close Vert.x HttpServer reason: {}", - result.cause().getMessage() - ); - - throw new RuntimeException(result.cause()); - } - - LOGGER.info("Vert.x HttpServer stopped"); - } finally { - latch.countDown(); - } - }); - - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - ); - - try { - future.get(); - } finally { - this.vertx = null; - this.localVertx = false; - } - } - - if (this.executor != null) { - getCamelContext().getExecutorServiceManager().shutdownNow(this.executor); + if (this.route == null) { + this.route.remove(); } } @@ -200,7 +96,7 @@ protected void doStop() throws Exception { @Override public Producer createProducer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeServiceDefinition service) { - return new KnativeHttpProducer(this, endpoint, service, vertx, vertxHttpClientOptions); + return new KnativeHttpProducer(this, endpoint, service, this.platformHttp.vertx(), vertxHttpClientOptions); } @Override @@ -209,7 +105,7 @@ public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration if (config.isRemoveCloudEventHeadersInReply()) { next = KnativeHttpSupport.withoutCloudEventHeaders(processor, config.getCloudEvent()); } - return new KnativeHttpConsumer(this, endpoint, service, next); + return new KnativeHttpConsumer(this, endpoint, service, new KnativeHttpConsumerDispatcher(), next); } } diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java index f33553a65..f67dfc957 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java @@ -208,7 +208,7 @@ protected void doStop() throws Exception { @Override protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception { if (ObjectHelper.isEmpty(remaining)) { - throw new IllegalArgumentException("Expecting URI in the forof: 'knative:type/name', got '" + uri + "'"); + throw new IllegalArgumentException("Expecting URI in the form of: 'knative:type/name', got '" + uri + "'"); } final String type = StringHelper.before(remaining, "/"); diff --git a/pom.xml b/pom.xml index 926d79c77..7c2ac17ce 100644 --- a/pom.xml +++ b/pom.xml @@ -217,6 +217,7 @@ camel-k-loader-java camel-k-runtime-cron + camel-k-runtime-http camel-k-runtime-knative camel-k-runtime-master camel-k-runtime-webhook