diff --git a/camel-k-main/camel-k-runtime-health/src/main/java/org/apache/camel/k/health/HealthContextCustomizer.java b/camel-k-main/camel-k-runtime-health/src/main/java/org/apache/camel/k/health/HealthContextCustomizer.java index f1308d2b2..cd31d2385 100644 --- a/camel-k-main/camel-k-runtime-health/src/main/java/org/apache/camel/k/health/HealthContextCustomizer.java +++ b/camel-k-main/camel-k-runtime-health/src/main/java/org/apache/camel/k/health/HealthContextCustomizer.java @@ -24,7 +24,6 @@ import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.Route; -import io.vertx.ext.web.Router; import org.apache.camel.CamelContext; import org.apache.camel.Ordered; import org.apache.camel.health.HealthCheck; @@ -35,7 +34,7 @@ import org.apache.camel.impl.health.ContextHealthCheck; import org.apache.camel.impl.health.RoutesHealthCheckRepository; import org.apache.camel.k.ContextCustomizer; -import org.apache.camel.k.http.PlatformHttpRouter; +import org.apache.camel.k.http.PlatformHttp; public class HealthContextCustomizer implements ContextCustomizer { public static final String DEFAULT_PATH = "/health"; @@ -110,12 +109,17 @@ public void apply(CamelContext camelContext) { // add health route addRoute( camelContext, - PlatformHttpRouter.lookup(camelContext).get() + PlatformHttp.lookup(camelContext) ); } - private Route addRoute(CamelContext camelContext, Router router) { - return router.route(HttpMethod.GET, path).handler(routingContext -> { + private Route addRoute(CamelContext camelContext, PlatformHttp platformHttp) { + Route route = platformHttp.router().route(HttpMethod.GET, path); + + // add global handlers first i.e. body handler + platformHttp.handlers().forEach(route::handler); + + route.handler(routingContext -> { int code = 200; Collection results = HealthCheckHelper.invoke( @@ -158,5 +162,7 @@ private Route addRoute(CamelContext camelContext, Router router) { .setStatusCode(code) .end(Json.encodePrettily(response)); }); + + return route; } } diff --git a/camel-k-main/pom.xml b/camel-k-main/pom.xml index eb217ef43..5bcb63b88 100644 --- a/camel-k-main/pom.xml +++ b/camel-k-main/pom.xml @@ -31,7 +31,6 @@ camel-k-runtime-main camel-k-runtime-health - camel-k-runtime-http diff --git a/camel-k-quarkus/camel-k-quarkus-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/DeploymentProcessor.java b/camel-k-quarkus/camel-k-quarkus-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/DeploymentProcessor.java index bb21f4f49..159f946cf 100644 --- a/camel-k-quarkus/camel-k-quarkus-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/DeploymentProcessor.java +++ b/camel-k-quarkus/camel-k-quarkus-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/DeploymentProcessor.java @@ -22,6 +22,8 @@ import io.quarkus.deployment.annotations.Record; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; import io.quarkus.vertx.core.deployment.CoreVertxBuildItem; +import io.quarkus.vertx.http.deployment.BodyHandlerBuildItem; +import io.quarkus.vertx.http.deployment.VertxWebRouterBuildItem; import org.apache.camel.component.knative.spi.KnativeEnvironment; import org.apache.camel.k.quarkus.knative.KnativeRecorder; import org.apache.camel.quarkus.core.deployment.CamelRuntimeBeanBuildItem; @@ -41,11 +43,19 @@ void servicesFilters(BuildProducer serviceFilter) { @Record(ExecutionTime.RUNTIME_INIT) @BuildStep - CamelRuntimeBeanBuildItem knativeComponent(KnativeRecorder recorder, CoreVertxBuildItem vertx) { + CamelRuntimeBeanBuildItem knativeComponent( + KnativeRecorder recorder, + CoreVertxBuildItem vertx, + VertxWebRouterBuildItem router, + BodyHandlerBuildItem bodyHandlerBuildItem) { + return new CamelRuntimeBeanBuildItem( "knative", "org.apache.camel.component.knative.KnativeComponent", - recorder.createKnativeComponent(vertx.getVertx()) + recorder.createKnativeComponent( + vertx.getVertx(), + router.getRouter(), + bodyHandlerBuildItem.getHandler()) ); } diff --git a/camel-k-quarkus/camel-k-quarkus-knative/runtime/src/main/java/org/apache/camel/k/quarkus/knative/KnativeRecorder.java b/camel-k-quarkus/camel-k-quarkus-knative/runtime/src/main/java/org/apache/camel/k/quarkus/knative/KnativeRecorder.java index 24f078008..2ce29c6fc 100644 --- a/camel-k-quarkus/camel-k-quarkus-knative/runtime/src/main/java/org/apache/camel/k/quarkus/knative/KnativeRecorder.java +++ b/camel-k-quarkus/camel-k-quarkus-knative/runtime/src/main/java/org/apache/camel/k/quarkus/knative/KnativeRecorder.java @@ -16,19 +16,32 @@ */ package org.apache.camel.k.quarkus.knative; +import java.util.Collections; import java.util.function.Supplier; import io.quarkus.runtime.RuntimeValue; import io.quarkus.runtime.annotations.Recorder; +import io.vertx.core.Handler; import io.vertx.core.Vertx; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.RoutingContext; import org.apache.camel.component.knative.KnativeComponent; import org.apache.camel.component.knative.http.KnativeHttpTransport; +import org.apache.camel.k.http.PlatformHttp; @Recorder public class KnativeRecorder { - public RuntimeValue createKnativeComponent(Supplier vertx) { + public RuntimeValue createKnativeComponent( + Supplier vertx, + RuntimeValue router, + Handler bodyHandler) { + KnativeHttpTransport transport = new KnativeHttpTransport(); - transport.setVertx(vertx.get()); + transport.setPlatformHttp(new PlatformHttp( + vertx.get(), + router.getValue(), + Collections.singletonList(bodyHandler) + )); KnativeComponent component = new KnativeComponent(); component.setTransport(transport); diff --git a/camel-k-main/camel-k-runtime-http/pom.xml b/camel-k-runtime-http/pom.xml similarity index 98% rename from camel-k-main/camel-k-runtime-http/pom.xml rename to camel-k-runtime-http/pom.xml index 934c70e58..aabec84cf 100644 --- a/camel-k-main/camel-k-runtime-http/pom.xml +++ b/camel-k-runtime-http/pom.xml @@ -20,9 +20,10 @@ org.apache.camel.k - camel-k-main + camel-k-runtime-parent 1.2.2-SNAPSHOT + 4.0.0 camel-k-runtime-http diff --git a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpRouter.java b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttp.java similarity index 59% rename from camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpRouter.java rename to camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttp.java index 5f33f2fff..52ab74ed7 100644 --- a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpRouter.java +++ b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttp.java @@ -16,35 +16,60 @@ */ package org.apache.camel.k.http; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; import io.vertx.ext.web.Router; +import io.vertx.ext.web.RoutingContext; import org.apache.camel.CamelContext; import org.apache.camel.component.platform.http.PlatformHttpConstants; import org.apache.camel.support.CamelContextHelper; -public class PlatformHttpRouter { +public class PlatformHttp { public static final String PLATFORM_HTTP_ROUTER_NAME = PlatformHttpConstants.PLATFORM_HTTP_COMPONENT_NAME + "-router"; + private final Vertx vertx; private final Router router; + private final List> handlers; + + public PlatformHttp(Vertx vertx, Router router) { + this.vertx = vertx; + this.router = router; + this.handlers = Collections.emptyList(); + } - public PlatformHttpRouter(Router router) { + public PlatformHttp(Vertx vertx, Router router, List> handlers) { + this.vertx = vertx; this.router = router; + this.handlers = Collections.unmodifiableList(new ArrayList<>(handlers)); } - public Router get() { + public Vertx vertx() { + return vertx; + } + + public Router router() { return router; } + public List> handlers() { + return handlers; + } + // ********************** // // Helpers // // ********************** - public static PlatformHttpRouter lookup(CamelContext camelContext) { + public static PlatformHttp lookup(CamelContext camelContext) { return CamelContextHelper.mandatoryLookup( camelContext, - PlatformHttpRouter.PLATFORM_HTTP_ROUTER_NAME, - PlatformHttpRouter.class + PlatformHttp.PLATFORM_HTTP_ROUTER_NAME, + PlatformHttp.class ); } } diff --git a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServer.java b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServer.java similarity index 94% rename from camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServer.java rename to camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServer.java index 6d1bd7de1..2e864091c 100644 --- a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServer.java +++ b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServer.java @@ -16,6 +16,7 @@ */ package org.apache.camel.k.http; +import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.CountDownLatch; @@ -33,7 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class PlatformHttpServer extends ServiceSupport { +public final class PlatformHttpServer extends ServiceSupport { private static final Logger LOGGER = LoggerFactory.getLogger(PlatformHttpServer.class); private final CamelContext context; @@ -70,16 +71,12 @@ private CompletionStage startAsync() { final Router router = Router.router(vertx); final Router subRouter = Router.router(vertx); - router.route() - .order(Integer.MIN_VALUE) - .handler(ctx -> { - ctx.request().resume(); - createBodyHandler().handle(ctx); - }); - router.mountSubRouter(configuration.getPath(), subRouter); - context.getRegistry().bind(PlatformHttpRouter.PLATFORM_HTTP_ROUTER_NAME, new PlatformHttpRouter(subRouter)); + context.getRegistry().bind( + PlatformHttp.PLATFORM_HTTP_ROUTER_NAME, + new PlatformHttp(vertx, subRouter, Collections.singletonList(createBodyHandler())) + ); //HttpServerOptions options = new HttpServerOptions(); if (configuration.getSslContextParameters() != null) { diff --git a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceConfiguration.java b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceConfiguration.java similarity index 91% rename from camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceConfiguration.java rename to camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceConfiguration.java index 659ecab8d..5aa5a3ef7 100644 --- a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceConfiguration.java +++ b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceConfiguration.java @@ -30,7 +30,7 @@ public class PlatformHttpServiceConfiguration { private String path = DEFAULT_PATH; private BigInteger maxBodySize; - private BodyHandler bodyHandler = new BodyHandler(); + private BodyHandlerConfiguration bodyHandlerConfiguration = new BodyHandlerConfiguration(); private SSLContextParameters sslContextParameters; public String getBindHost() { @@ -65,12 +65,12 @@ public void setMaxBodySize(BigInteger maxBodySize) { this.maxBodySize = maxBodySize; } - public BodyHandler getBodyHandler() { - return bodyHandler; + public BodyHandlerConfiguration getBodyHandler() { + return bodyHandlerConfiguration; } - public void setBodyHandler(BodyHandler bodyHandler) { - this.bodyHandler = bodyHandler; + public void setBodyHandler(BodyHandlerConfiguration bodyHandler) { + this.bodyHandlerConfiguration = bodyHandler; } public SSLContextParameters getSslContextParameters() { @@ -81,7 +81,7 @@ public void setSslContextParameters(SSLContextParameters sslContextParameters) { this.sslContextParameters = sslContextParameters; } - public static class BodyHandler { + public static class BodyHandlerConfiguration { private boolean handleFileUploads = true; private String uploadsDirectory = "file-uploads"; private boolean mergeFormAttributes = true; diff --git a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceContextCustomizer.java b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceContextCustomizer.java similarity index 100% rename from camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceContextCustomizer.java rename to camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceContextCustomizer.java diff --git a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceEndpoint.java b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceEndpoint.java similarity index 100% rename from camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceEndpoint.java rename to camel-k-runtime-http/src/main/java/org/apache/camel/k/http/PlatformHttpServiceEndpoint.java diff --git a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpConsumer.java b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpConsumer.java similarity index 98% rename from camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpConsumer.java rename to camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpConsumer.java index e438c3235..300fe915a 100644 --- a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpConsumer.java +++ b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpConsumer.java @@ -45,7 +45,7 @@ import org.apache.camel.TypeConverter; import org.apache.camel.component.platform.http.PlatformHttpEndpoint; import org.apache.camel.component.platform.http.spi.Method; -import org.apache.camel.k.http.PlatformHttpRouter; +import org.apache.camel.k.http.PlatformHttp; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.support.DefaultConsumer; import org.apache.camel.support.DefaultMessage; @@ -75,10 +75,12 @@ protected void doStart() throws Exception { super.doStart(); final PlatformHttpEndpoint endpoint = getEndpoint(); - final PlatformHttpRouter router = PlatformHttpRouter.lookup(endpoint.getCamelContext()); + final PlatformHttp router = PlatformHttp.lookup(endpoint.getCamelContext()); final String path = endpoint.getPath(); final String vertxPathParamPath = PATH_PARAMETER_PATTERN.matcher(path).replaceAll(":$1"); - final Route newRoute = router.get().route(vertxPathParamPath); + final Route newRoute = router.router().route(vertxPathParamPath); + + router.handlers().forEach(newRoute::handler); final Set methods = Method.parseList(endpoint.getHttpMethodRestrict()); if (!methods.equals(Method.getAll())) { diff --git a/camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpEngine.java b/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpEngine.java similarity index 100% rename from camel-k-main/camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpEngine.java rename to camel-k-runtime-http/src/main/java/org/apache/camel/k/http/engine/RuntimePlatformHttpEngine.java diff --git a/camel-k-main/camel-k-runtime-http/src/main/resources/META-INF/services/org/apache/camel/k/customizer/platform-http b/camel-k-runtime-http/src/main/resources/META-INF/services/org/apache/camel/k/customizer/platform-http similarity index 100% rename from camel-k-main/camel-k-runtime-http/src/main/resources/META-INF/services/org/apache/camel/k/customizer/platform-http rename to camel-k-runtime-http/src/main/resources/META-INF/services/org/apache/camel/k/customizer/platform-http diff --git a/camel-k-main/camel-k-runtime-http/src/test/java/org/apache/camel/k/http/PlatformHttpServiceCustomizerTest.java b/camel-k-runtime-http/src/test/java/org/apache/camel/k/http/PlatformHttpServiceCustomizerTest.java similarity index 98% rename from camel-k-main/camel-k-runtime-http/src/test/java/org/apache/camel/k/http/PlatformHttpServiceCustomizerTest.java rename to camel-k-runtime-http/src/test/java/org/apache/camel/k/http/PlatformHttpServiceCustomizerTest.java index 5756d3711..61435ac3f 100644 --- a/camel-k-main/camel-k-runtime-http/src/test/java/org/apache/camel/k/http/PlatformHttpServiceCustomizerTest.java +++ b/camel-k-runtime-http/src/test/java/org/apache/camel/k/http/PlatformHttpServiceCustomizerTest.java @@ -50,7 +50,7 @@ public void testPlatformHttpServiceCustomizer(String path) throws Exception { httpService.apply(runtime.getCamelContext()); - PlatformHttpRouter.lookup(runtime.getCamelContext()).get().route(HttpMethod.GET, "/my/path") + PlatformHttp.lookup(runtime.getCamelContext()).router().route(HttpMethod.GET, "/my/path") .handler(routingContext -> { JsonObject response = new JsonObject(); response.put("status", "UP"); diff --git a/camel-k-main/camel-k-runtime-http/src/test/resources/log4j2-test.xml b/camel-k-runtime-http/src/test/resources/log4j2-test.xml similarity index 100% rename from camel-k-main/camel-k-runtime-http/src/test/resources/log4j2-test.xml rename to camel-k-runtime-http/src/test/resources/log4j2-test.xml diff --git a/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java b/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java index 271928969..e977e703f 100644 --- a/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java +++ b/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java @@ -35,6 +35,7 @@ import org.apache.camel.k.Source; import org.apache.camel.k.SourceLoader; import org.apache.camel.k.Sources; +import org.apache.camel.k.http.PlatformHttpServiceContextCustomizer; import org.apache.camel.k.listener.RoutesConfigurer; import org.apache.camel.k.loader.java.JavaSourceLoader; import org.apache.camel.model.ModelCamelContext; @@ -69,6 +70,7 @@ public void testWrapLoader(String uri) throws Exception { LOGGER.info("uri: {}", uri); final int port = AvailablePortFinder.getNextAvailable(); + final int platformHttpPort = AvailablePortFinder.getNextAvailable(); final String data = UUID.randomUUID().toString(); KnativeComponent component = new KnativeComponent(); @@ -173,10 +175,16 @@ public void testWrapLoaderWithBeanRegistration() throws Exception { static class TestRuntime implements Runtime { private final CamelContext camelContext; private final List builders; + private final int platformHttpPort; public TestRuntime() { this.camelContext = new DefaultCamelContext(); this.builders = new ArrayList<>(); + this.platformHttpPort = AvailablePortFinder.getNextAvailable(); + + PlatformHttpServiceContextCustomizer httpService = new PlatformHttpServiceContextCustomizer(); + httpService.setBindPort(platformHttpPort); + httpService.apply(this.camelContext); } @Override diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java index 1702ae234..d0547bfcb 100644 --- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java +++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java @@ -124,6 +124,17 @@ public static KnativeServiceDefinition endpoint(Knative.EndpointKind endpointKin ); } + public static KnativeServiceDefinition sourceEndpoint(String name, Map metadata) { + return entry( + Knative.EndpointKind.source, + Knative.Type.endpoint, + name, + null, + -1, + metadata + ); + } + public static KnativeServiceDefinition channel(Knative.EndpointKind endpointKind, String name, String host, int port) { return entry( endpointKind, @@ -157,6 +168,28 @@ public static KnativeServiceDefinition event(Knative.EndpointKind endpointKind, ); } + public static KnativeServiceDefinition sourceEvent(String name) { + return entry( + Knative.EndpointKind.source, + Knative.Type.event, + name, + null, + -1, + Collections.emptyMap() + ); + } + + public static KnativeServiceDefinition sourceEvent(String name, Map metadata) { + return entry( + Knative.EndpointKind.source, + Knative.Type.event, + name, + null, + -1, + metadata + ); + } + public static KnativeServiceDefinition event(Knative.EndpointKind endpointKind, String name, String host, int port, Map metadata) { return entry( endpointKind, @@ -198,8 +231,8 @@ public static final class KnativeServiceDefinition extends DefaultServiceDefinit public KnativeServiceDefinition( @JsonProperty(value = "type", required = true) Knative.Type type, @JsonProperty(value = "name", required = true) String name, - @JsonProperty(value = "host", required = true) String host, - @JsonProperty(value = "port", required = true) int port, + @JsonProperty(value = "host", required = false) String host, + @JsonProperty(value = "port", required = false) int port, @JsonProperty(value = "metadata", required = false) Map metadata) { super( diff --git a/camel-knative/camel-knative-http/pom.xml b/camel-knative/camel-knative-http/pom.xml index 58f9f1338..731bcface 100644 --- a/camel-knative/camel-knative-http/pom.xml +++ b/camel-knative/camel-knative-http/pom.xml @@ -35,12 +35,6 @@ - - org.slf4j - slf4j-api - ${slf4j.version} - - org.apache.camel camel-core-engine @@ -56,10 +50,10 @@ - io.vertx - vertx-web - ${vertx.version} + org.apache.camel.k + camel-k-runtime-http + io.vertx vertx-web-client @@ -116,6 +110,11 @@ camel-undertow test + + org.apache.camel + camel-http + test + org.apache.camel camel-bean @@ -140,12 +139,19 @@ ${junit.version} test + org.assertj assertj-core ${assertj.version} test + + io.rest-assured + rest-assured + ${rest-assured.version} + test + org.apache.logging.log4j diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java index aad1725a0..a7d83d333 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java @@ -16,56 +16,10 @@ */ package org.apache.camel.component.knative.http; -import java.util.Objects; -import java.util.regex.Pattern; - -import io.vertx.core.Handler; -import io.vertx.core.http.HttpServerRequest; - public final class KnativeHttp { public static final int DEFAULT_PORT = 8080; public static final String DEFAULT_PATH = "/"; - public static final Pattern ENDPOINT_PATTERN = Pattern.compile("([0-9a-zA-Z][\\w\\.-]+):(\\d+)\\/?(.*)"); private KnativeHttp() { } - - public static final class ServerKey { - private final String host; - private final int port; - - public ServerKey(String host, int port) { - this.host = host; - this.port = port; - } - - public String getHost() { - return host; - } - - public int getPort() { - return port; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ServerKey key = (ServerKey) o; - return getPort() == key.getPort() && getHost().equals(key.getHost()); - } - - @Override - public int hashCode() { - return Objects.hash(getHost(), getPort()); - } - } - - public interface PredicatedHandler extends Handler { - boolean canHandle(HttpServerRequest event); - } } diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java index 3759ccc66..d8d36888e 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java @@ -27,6 +27,8 @@ import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; +import io.vertx.ext.web.Route; +import io.vertx.ext.web.RoutingContext; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; @@ -35,109 +37,160 @@ import org.apache.camel.Processor; import org.apache.camel.TypeConverter; import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.apache.camel.k.http.PlatformHttp; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.support.DefaultConsumer; import org.apache.camel.support.DefaultMessage; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.MessageHelper; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KnativeHttpConsumer extends DefaultConsumer { + private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpConsumer.class); -public class KnativeHttpConsumer extends DefaultConsumer implements KnativeHttp.PredicatedHandler { private final KnativeHttpTransport transport; private final Predicate filter; - private final KnativeHttp.ServerKey key; private final KnativeEnvironment.KnativeServiceDefinition serviceDefinition; + private final PlatformHttp platformHttp; private final HeaderFilterStrategy headerFilterStrategy; + private String basePath; + private Route route; + public KnativeHttpConsumer( - KnativeHttpTransport transport, - Endpoint endpoint, - KnativeEnvironment.KnativeServiceDefinition serviceDefinition, - Processor processor) { + KnativeHttpTransport transport, + Endpoint endpoint, + KnativeEnvironment.KnativeServiceDefinition serviceDefinition, + PlatformHttp platformHttp, + Processor processor) { super(endpoint, processor); this.transport = transport; this.serviceDefinition = serviceDefinition; + this.platformHttp = platformHttp; this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy(); - this.key = new KnativeHttp.ServerKey(serviceDefinition.getHost(), serviceDefinition.getPortOrDefault(KnativeHttp.DEFAULT_PORT)); this.filter = KnativeHttpSupport.createFilter(serviceDefinition); } + public String getBasePath() { + return basePath; + } + + public void setBasePath(String basePath) { + this.basePath = basePath; + } + @Override protected void doStart() throws Exception { - this.transport.getDispatcher(key).bind(this); + if (route == null) { + String path = ObjectHelper.supplyIfEmpty(serviceDefinition.getPath(), () -> KnativeHttp.DEFAULT_PATH); + if (ObjectHelper.isNotEmpty(basePath)) { + path = basePath + path; + } + + LOGGER.debug("Creating route for path: {}", path); + + route = platformHttp.router().route( + HttpMethod.POST, + path + ); + + // add common handlers + platformHttp.handlers().forEach(route::handler); + + route.handler(routingContext -> { + LOGGER.debug("Handling {}", routingContext); + + if (filter.test(routingContext.request())) { + handleRequest(routingContext); + } else { + LOGGER.debug("Cannot handle request on {}, next", getEndpoint().getEndpointUri()); + routingContext.next(); + } + }); + } super.doStart(); } @Override protected void doStop() throws Exception { - this.transport.getDispatcher(key).unbind(this); + if (route != null) { + route.remove(); + } super.doStop(); } @Override - public boolean canHandle(HttpServerRequest request) { - return filter.test(request); + protected void doSuspend() throws Exception { + if (route != null) { + route.disable(); + } } @Override - public void handle(HttpServerRequest request) { - if (request.method() == HttpMethod.POST) { - final Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOut); - final Message in = toMessage(request, exchange); + protected void doResume() throws Exception { + if (route != null) { + route.enable(); + } + } - request.bodyHandler(buffer -> { - in.setBody(buffer.getBytes()); + private void handleRequest(RoutingContext routingContext) { + final HttpServerRequest request = routingContext.request(); + final Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOut); + final Message in = toMessage(request, exchange); - exchange.setIn(in); + Buffer payload = routingContext.getBody(); + if (payload != null) { + in.setBody(payload.getBytes()); + } else { + in.setBody(null); + } + exchange.setIn(in); + + try { + createUoW(exchange); + getAsyncProcessor().process(exchange, doneSync -> { try { - createUoW(exchange); - getAsyncProcessor().process(exchange, doneSync -> { - try { - HttpServerResponse response = toHttpResponse(request, exchange.getMessage()); - Buffer body = null; - - if (request.response().getStatusCode() != 204) { - body = computeResponseBody(exchange.getMessage()); - - // set the content type in the response. - String contentType = MessageHelper.getContentType(exchange.getMessage()); - if (contentType != null) { - // set content-type - response.putHeader(Exchange.CONTENT_TYPE, contentType); - } - } - - if (body != null) { - request.response().end(body); - } else { - request.response().setStatusCode(204); - request.response().end(); - } - } catch (Exception e) { - getExceptionHandler().handleException(e); + HttpServerResponse response = toHttpResponse(request, exchange.getMessage()); + Buffer body = null; + + if (request.response().getStatusCode() != 204) { + body = computeResponseBody(exchange.getMessage()); + + // set the content type in the response. + String contentType = MessageHelper.getContentType(exchange.getMessage()); + if (contentType != null) { + // set content-type + response.putHeader(Exchange.CONTENT_TYPE, contentType); } - }); + } + + if (body != null) { + request.response().end(body); + } else { + request.response().setStatusCode(204); + request.response().end(); + } } catch (Exception e) { getExceptionHandler().handleException(e); - - request.response().setStatusCode(500); - request.response().putHeader(Exchange.CONTENT_TYPE, "text/plain"); - request.response().end(e.getMessage()); - } finally { - doneUoW(exchange); } }); - } else { - request.response().setStatusCode(405); - request.response().putHeader(Exchange.CONTENT_TYPE, "text/plain"); - request.response().end("Unsupported method"); + } catch (Exception e) { + getExceptionHandler().handleException(e); - throw new IllegalArgumentException("Unsupported method: " + request.method()); + request.response().setStatusCode(500); + request.response().putHeader(Exchange.CONTENT_TYPE, "text/plain"); + request.response().end(e.getMessage()); + } finally { + doneUoW(exchange); } + } private Message toMessage(HttpServerRequest request, Exchange exchange) { @@ -224,5 +277,4 @@ private Buffer computeResponseBody(Message message) throws NoTypeConversionAvail ? Buffer.buffer(message.getExchange().getContext().getTypeConverter().mandatoryConvertTo(byte[].class, body)) : null; } - } diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java deleted file mode 100644 index 621fb1a2f..000000000 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.knative.http; - -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; - -import io.vertx.core.Handler; -import io.vertx.core.Vertx; -import io.vertx.core.http.HttpMethod; -import io.vertx.core.http.HttpServer; -import io.vertx.core.http.HttpServerOptions; -import io.vertx.core.http.HttpServerRequest; -import io.vertx.core.http.HttpServerResponse; -import org.apache.camel.Exchange; -import org.apache.camel.support.service.ServiceSupport; -import org.apache.camel.util.ObjectHelper; -import org.apache.camel.util.ReferenceCount; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public final class KnativeHttpConsumerDispatcher { - private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpConsumerDispatcher.class); - - private final Vertx vertx; - private final KnativeHttp.ServerKey key; - private final ReferenceCount refCnt; - private final Set handlers; - private final HttpServerWrapper server; - private final HttpServerOptions serverOptions; - private final ExecutorService executor; - - public KnativeHttpConsumerDispatcher(ExecutorService executor, Vertx vertx, KnativeHttp.ServerKey key, HttpServerOptions serverOptions) { - this.executor = executor; - this.vertx = vertx; - this.serverOptions = ObjectHelper.supplyIfEmpty(serverOptions, HttpServerOptions::new); - this.server = new HttpServerWrapper(); - - this.handlers = new CopyOnWriteArraySet<>(); - this.key = key; - this.refCnt = ReferenceCount.on(server::start, server::stop); - } - - public void bind(KnativeHttp.PredicatedHandler handler) { - if (handlers.add(handler)) { - refCnt.retain(); - } - } - - public void unbind(KnativeHttp.PredicatedHandler handler) { - if (handlers.remove(handler)) { - refCnt.release(); - } - } - - private final class HttpServerWrapper extends ServiceSupport implements Handler { - private HttpServer server; - - @Override - protected void doStart() throws Exception { - LOGGER.info("Starting Vert.x HttpServer on {}:{}}", - key.getHost(), - key.getPort() - ); - - startAsync().toCompletableFuture().join(); - } - - @Override - protected void doStop() throws Exception { - LOGGER.info("Stopping Vert.x HttpServer on {}:{}", - key.getHost(), - key.getPort()); - - try { - if (server != null) { - stopAsync().toCompletableFuture().join(); - } - } finally { - this.server = null; - } - } - - private CompletionStage startAsync() { - server = vertx.createHttpServer(serverOptions); - server.requestHandler(this); - - return CompletableFuture.runAsync( - () -> { - CountDownLatch latch = new CountDownLatch(1); - - server.listen(key.getPort(), key.getHost(), result -> { - try { - if (result.failed()) { - LOGGER.warn("Failed to start Vert.x HttpServer on {}:{}, reason: {}", - key.getHost(), - key.getPort(), - result.cause().getMessage() - ); - - throw new RuntimeException(result.cause()); - } - - LOGGER.info("Vert.x HttpServer started on {}:{}", key.getHost(), key.getPort()); - } finally { - latch.countDown(); - } - }); - - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }, - executor - ); - } - - protected CompletionStage stopAsync() { - return CompletableFuture.runAsync( - () -> { - CountDownLatch latch = new CountDownLatch(1); - - server.close(result -> { - try { - if (result.failed()) { - LOGGER.warn("Failed to close Vert.x HttpServer reason: {}", - result.cause().getMessage() - ); - - throw new RuntimeException(result.cause()); - } - - LOGGER.info("Vert.x HttpServer stopped"); - } finally { - latch.countDown(); - } - }); - - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }, - executor - ); - } - - @Override - public void handle(HttpServerRequest request) { - if (request.method() != HttpMethod.POST) { - HttpServerResponse response = request.response(); - response.setStatusCode(405); - response.putHeader(Exchange.CONTENT_TYPE, "text/plain"); - response.end("Unsupported method: " + request.method()); - - return; - } - - LOGGER.debug("received exchange on path: {}, headers: {}", - request.path(), - request.headers() - ); - - for (KnativeHttp.PredicatedHandler handler: handlers) { - if (handler.canHandle(request)) { - handler.handle(request); - return; - } - } - - LOGGER.warn("No handler found for path: {}, headers: {}", - request.path(), - request.headers() - ); - - HttpServerResponse response = request.response(); - response.setStatusCode(404); - response.putHeader(Exchange.CONTENT_TYPE, "text/plain"); - response.end("No matching condition found"); - } - } -} diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java index 124d91a4a..1e1b7ab7b 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java @@ -19,7 +19,6 @@ import java.util.Map; import io.vertx.core.MultiMap; -import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpHeaders; import io.vertx.ext.web.client.HttpResponse; @@ -32,6 +31,7 @@ import org.apache.camel.InvalidPayloadException; import org.apache.camel.Message; import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.apache.camel.k.http.PlatformHttp; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.support.DefaultMessage; @@ -46,7 +46,7 @@ public class KnativeHttpProducer extends DefaultAsyncProducer { private final KnativeHttpTransport transport; private final KnativeEnvironment.KnativeServiceDefinition serviceDefinition; - private final Vertx vertx; + private final PlatformHttp platformHttp; private final WebClientOptions clientOptions; private final HeaderFilterStrategy headerFilterStrategy; @@ -56,13 +56,13 @@ public KnativeHttpProducer( KnativeHttpTransport transport, Endpoint endpoint, KnativeEnvironment.KnativeServiceDefinition serviceDefinition, - Vertx vertx, + PlatformHttp platformHttp, WebClientOptions clientOptions) { super(endpoint); this.transport = transport; this.serviceDefinition = serviceDefinition; - this.vertx = ObjectHelper.notNull(vertx, "vertx"); + this.platformHttp = ObjectHelper.notNull(platformHttp, "vertx"); this.clientOptions = ObjectHelper.supplyIfEmpty(clientOptions, WebClientOptions::new); this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy(); } @@ -166,7 +166,7 @@ public boolean process(Exchange exchange, AsyncCallback callback) { protected void doInit() throws Exception { super.doInit(); - this.client = WebClient.create(vertx, clientOptions); + this.client = WebClient.create(platformHttp.vertx(), clientOptions); } @Override @@ -181,11 +181,8 @@ protected void doStop() throws Exception { } private String getURI() { - String p = serviceDefinition.getPath(); - - if (p == null) { - p = KnativeHttp.DEFAULT_PATH; - } else if (!p.startsWith("/")) { + String p = ObjectHelper.supplyIfEmpty(serviceDefinition.getPath(), () -> KnativeHttp.DEFAULT_PATH); + if (!p.startsWith("/")) { p = "/" + p; } diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java index faeba6a08..f113e781b 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java @@ -31,7 +31,6 @@ import org.apache.camel.component.knative.spi.Knative; import org.apache.camel.component.knative.spi.KnativeEnvironment; import org.apache.camel.support.processor.DelegateAsyncProcessor; -import org.apache.camel.util.ObjectHelper; public final class KnativeHttpSupport { private KnativeHttpSupport() { @@ -63,13 +62,7 @@ public static Predicate createFilter(KnativeEnvironment.Knati e -> e.getValue() )); - - String path = ObjectHelper.supplyIfEmpty(serviceDefinition.getPath(), () -> KnativeHttp.DEFAULT_PATH); - return v -> { - if (!Objects.equals(path, v.path())) { - return false; - } if (filters.isEmpty()) { return true; } diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java index 5850a534a..aa15464f4 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java @@ -16,16 +16,6 @@ */ package org.apache.camel.component.knative.http; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; - -import io.vertx.core.Vertx; -import io.vertx.core.VertxOptions; -import io.vertx.core.http.HttpServerOptions; import io.vertx.ext.web.client.WebClientOptions; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; @@ -36,66 +26,33 @@ import org.apache.camel.component.knative.spi.KnativeEnvironment; import org.apache.camel.component.knative.spi.KnativeTransport; import org.apache.camel.component.knative.spi.KnativeTransportConfiguration; +import org.apache.camel.k.http.PlatformHttp; import org.apache.camel.support.service.ServiceSupport; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class KnativeHttpTransport extends ServiceSupport implements CamelContextAware, KnativeTransport { - private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpTransport.class); - - private final Map registry; - - private Vertx vertx; - private VertxOptions vertxOptions; - private HttpServerOptions vertxHttpServerOptions; + private PlatformHttp platformHttp; private WebClientOptions vertxHttpClientOptions; private CamelContext camelContext; - private boolean localVertx; - private ExecutorService executor; - public KnativeHttpTransport() { - this.registry = new ConcurrentHashMap<>(); - this.localVertx = false; - } - - public Vertx getVertx() { - return vertx; - } - - public void setVertx(Vertx vertx) { - this.vertx = vertx; - } - - public VertxOptions getVertxOptions() { - return vertxOptions; } - public void setVertxOptions(VertxOptions vertxOptions) { - this.vertxOptions = vertxOptions; + public PlatformHttp getPlatformHttp() { + return platformHttp; } - public HttpServerOptions getVertxHttpServerOptions() { - return vertxHttpServerOptions; + public void setPlatformHttp(PlatformHttp platformHttp) { + this.platformHttp = platformHttp; } - public void setVertxHttpServerOptions(HttpServerOptions vertxHttpServerOptions) { - this.vertxHttpServerOptions = vertxHttpServerOptions; - } - - public WebClientOptions getVertxHttpClientOptions() { + public WebClientOptions getClientOptions() { return vertxHttpClientOptions; } - public void setVertxHttpClientOptions(WebClientOptions vertxHttpClientOptions) { + public void setClientOptions(WebClientOptions vertxHttpClientOptions) { this.vertxHttpClientOptions = vertxHttpClientOptions; } - KnativeHttpConsumerDispatcher getDispatcher(KnativeHttp.ServerKey key) { - return registry.computeIfAbsent(key, k -> new KnativeHttpConsumerDispatcher(executor, vertx, k, vertxHttpServerOptions)); - } - @Override public void setCamelContext(CamelContext camelContext) { this.camelContext = camelContext; @@ -114,82 +71,13 @@ public CamelContext getCamelContext() { @Override protected void doStart() throws Exception { - this.executor = getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "knative-http-component"); - - if (this.vertx != null) { - LOGGER.info("Using Vert.x instance configured on component: {}", this.vertx); - return; - } - - if (this.vertx == null) { - Set instances = getCamelContext().getRegistry().findByType(Vertx.class); - if (instances.size() == 1) { - this.vertx = instances.iterator().next(); - - // - // if this method is executed before the container is fully started, - // it may return a null reference, may be related to: - // - // https://groups.google.com/forum/#!topic/quarkus-dev/qSo65fTyYVA - // - if (this.vertx != null) { - LOGGER.info("Found Vert.x instance in registry: {}", this.vertx); - } - } - } - - if (this.vertx == null) { - LOGGER.info("Creating new Vert.x instance"); - - VertxOptions options = ObjectHelper.supplyIfEmpty(this.vertxOptions, VertxOptions::new); - - this.vertx = Vertx.vertx(options); - this.localVertx = true; + if (this.platformHttp == null) { + this.platformHttp = PlatformHttp.lookup(camelContext); } } @Override protected void doStop() throws Exception { - if (this.vertx != null && this.localVertx) { - Future future = this.executor.submit( - () -> { - CountDownLatch latch = new CountDownLatch(1); - - this.vertx.close(result -> { - try { - if (result.failed()) { - LOGGER.warn("Failed to close Vert.x HttpServer reason: {}", - result.cause().getMessage() - ); - - throw new RuntimeException(result.cause()); - } - - LOGGER.info("Vert.x HttpServer stopped"); - } finally { - latch.countDown(); - } - }); - - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - ); - - try { - future.get(); - } finally { - this.vertx = null; - this.localVertx = false; - } - } - - if (this.executor != null) { - getCamelContext().getExecutorServiceManager().shutdownNow(this.executor); - } } // ***************************** @@ -200,7 +88,7 @@ protected void doStop() throws Exception { @Override public Producer createProducer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeServiceDefinition service) { - return new KnativeHttpProducer(this, endpoint, service, vertx, vertxHttpClientOptions); + return new KnativeHttpProducer(this, endpoint, service, this.platformHttp, vertxHttpClientOptions); } @Override @@ -209,7 +97,7 @@ public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration if (config.isRemoveCloudEventHeadersInReply()) { next = KnativeHttpSupport.withoutCloudEventHeaders(processor, config.getCloudEvent()); } - return new KnativeHttpConsumer(this, endpoint, service, next); + return new KnativeHttpConsumer(this, endpoint, service, this.platformHttp, next); } } diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java index 06bba766d..405356d46 100644 --- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java +++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java @@ -27,7 +27,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import com.fasterxml.jackson.databind.ObjectMapper; +import io.restassured.RestAssured; +import io.restassured.mapper.ObjectMapperType; import io.undertow.Undertow; import io.undertow.server.HttpServerExchange; import io.undertow.util.HeaderMap; @@ -45,24 +46,32 @@ import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.http.base.HttpOperationFailedException; import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.k.http.PlatformHttpServiceContextCustomizer; import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.util.ObjectHelper; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import static io.restassured.RestAssured.config; +import static io.restassured.RestAssured.given; +import static io.restassured.config.EncoderConfig.encoderConfig; import static org.apache.camel.component.knative.http.KnativeHttpTestSupport.configureKnativeComponent; import static org.apache.camel.component.knative.spi.KnativeEnvironment.channel; import static org.apache.camel.component.knative.spi.KnativeEnvironment.endpoint; import static org.apache.camel.component.knative.spi.KnativeEnvironment.event; +import static org.apache.camel.component.knative.spi.KnativeEnvironment.sourceEndpoint; +import static org.apache.camel.component.knative.spi.KnativeEnvironment.sourceEvent; +import static org.apache.camel.util.CollectionHelper.mapOf; import static org.assertj.core.api.Assertions.assertThat; public class KnativeHttpTest { private CamelContext context; private ProducerTemplate template; - private int port; + private int platformHttpPort; // ************************** // @@ -74,7 +83,14 @@ public class KnativeHttpTest { public void before() { this.context = new DefaultCamelContext(); this.template = this.context.createProducerTemplate(); - this.port = AvailablePortFinder.getNextAvailable(); + this.platformHttpPort = AvailablePortFinder.getNextAvailable(); + + PlatformHttpServiceContextCustomizer httpService = new PlatformHttpServiceContextCustomizer(); + httpService.setBindPort(this.platformHttpPort); + httpService.apply(context); + + RestAssured.port = platformHttpPort; + RestAssured.config = config().encoderConfig(encoderConfig().appendDefaultContentCharsetToContentTypeIfUndefined(false)); } @AfterEach @@ -97,7 +113,88 @@ void testCreateComponent() { assertThat(context.getComponent("knative")).isInstanceOfSatisfying(KnativeComponent.class, c -> { assertThat(c.getTransport()).isInstanceOf(KnativeHttpTransport.class); }); + } + + void doTestKnativeSource(CloudEvent ce, String basePath, String path) throws Exception { + KnativeComponent component = configureKnativeComponent( + context, + CloudEvents.V03, + sourceEndpoint( + "myEndpoint", + KnativeSupport.mapOf( + Knative.SERVICE_META_PATH, path, + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain" + )) + ); + + if (ObjectHelper.isNotEmpty(basePath)) { + component.getConfiguration().addTransportOptions("basePath", basePath); + } + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("knative:endpoint/myEndpoint") + .to("mock:ce"); + } + }); + + context.start(); + MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event"); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere"); + mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())); + mock.expectedBodiesReceived("test"); + mock.expectedMessageCount(1); + + String targetPath = ObjectHelper.supplyIfEmpty(path, () -> "/"); + if (ObjectHelper.isNotEmpty(basePath)) { + targetPath = basePath + targetPath; + } + + given() + .body("test") + .header(Exchange.CONTENT_TYPE, "text/plain") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere") + .when() + .post(targetPath) + .then() + .statusCode(200); + + mock.assertIsSatisfied(); + } + + @ParameterizedTest + @EnumSource(CloudEvents.class) + void testKnativeSource(CloudEvent ce) throws Exception { + doTestKnativeSource(ce, null, null); + } + + @ParameterizedTest + @EnumSource(CloudEvents.class) + void testKnativeSourceWithPath(CloudEvent ce) throws Exception { + doTestKnativeSource(ce, null, "/a/path"); + } + + @ParameterizedTest + @EnumSource(CloudEvents.class) + void testKnativeSourceWithBasePath(CloudEvent ce) throws Exception { + doTestKnativeSource(ce, "/base", null); + } + + @ParameterizedTest + @EnumSource(CloudEvents.class) + void testKnativeSourceWithBasePathAndPath(CloudEvent ce) throws Exception { + doTestKnativeSource(ce, "/base", "/a/path"); } @ParameterizedTest @@ -110,7 +207,7 @@ void testInvokeEndpoint(CloudEvent ce) throws Exception { Knative.EndpointKind.sink, "myEndpoint", "localhost", - port, + platformHttpPort, KnativeSupport.mapOf( Knative.SERVICE_META_PATH, "/a/path", Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", @@ -123,7 +220,7 @@ void testInvokeEndpoint(CloudEvent ce) throws Exception { public void configure() throws Exception { from("direct:source") .to("knative:endpoint/myEndpoint"); - fromF("undertow:http://localhost:%d/a/path", port) + from("platform-http:/a/path") .to("mock:ce"); } }); @@ -151,13 +248,9 @@ void testConsumeStructuredContent(CloudEvent ce) throws Exception { configureKnativeComponent( context, ce, - endpoint( - Knative.EndpointKind.source, + sourceEndpoint( "myEndpoint", - "localhost", - port, KnativeSupport.mapOf( - Knative.SERVICE_META_PATH, "/a/path", Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) @@ -168,8 +261,6 @@ void testConsumeStructuredContent(CloudEvent ce) throws Exception { public void configure() throws Exception { from("knative:endpoint/myEndpoint") .to("mock:ce"); - from("direct:source") - .toF("undertow:http://localhost:%d/a/path", port); } }); @@ -185,23 +276,30 @@ public void configure() throws Exception { mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); - context.createProducerTemplate().send( - "direct:source", - e -> { - e.getMessage().setHeader(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE); - - if (Objects.equals(CloudEvents.V01.version(), ce.version())) { - e.getMessage().setBody(new ObjectMapper().writeValueAsString(KnativeSupport.mapOf( - "cloudEventsVersion", ce.version(), - "eventType", "org.apache.camel.event", - "eventID", "myEventID", - "eventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()), - "source", "/somewhere", - "contentType", "text/plain", - "data", "test" - ))); - } else if (Objects.equals(CloudEvents.V02.version(), ce.version())) { - e.getMessage().setBody(new ObjectMapper().writeValueAsString(KnativeSupport.mapOf( + if (Objects.equals(CloudEvents.V01.version(), ce.version())) { + given() + .contentType(Knative.MIME_STRUCTURED_CONTENT_MODE) + .body( + mapOf( + "cloudEventsVersion", ce.version(), + "eventType", "org.apache.camel.event", + "eventID", "myEventID", + "eventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()), + "source", "/somewhere", + "contentType", "text/plain", + "data", "test" + ), + ObjectMapperType.JACKSON_2 + ) + .when() + .post() + .then() + .statusCode(200); + } else if (Objects.equals(CloudEvents.V02.version(), ce.version())) { + given() + .contentType(Knative.MIME_STRUCTURED_CONTENT_MODE) + .body( + mapOf( "specversion", ce.version(), "type", "org.apache.camel.event", "id", "myEventID", @@ -209,9 +307,18 @@ public void configure() throws Exception { "source", "/somewhere", "contenttype", "text/plain", "data", "test" - ))); - } else if (Objects.equals(CloudEvents.V03.version(), ce.version())) { - e.getMessage().setBody(new ObjectMapper().writeValueAsString(KnativeSupport.mapOf( + ), + ObjectMapperType.JACKSON_2 + ) + .when() + .post() + .then() + .statusCode(200); + } else if (Objects.equals(CloudEvents.V03.version(), ce.version())) { + given() + .contentType(Knative.MIME_STRUCTURED_CONTENT_MODE) + .body( + mapOf( "specversion", ce.version(), "type", "org.apache.camel.event", "id", "myEventID", @@ -219,12 +326,16 @@ public void configure() throws Exception { "source", "/somewhere", "datacontenttype", "text/plain", "data", "test" - ))); - } else { - throw new IllegalArgumentException("Unknown CloudEvent spec: " + ce.version()); - } - } - ); + ), + ObjectMapperType.JACKSON_2 + ) + .when() + .post() + .then() + .statusCode(200); + } else { + throw new IllegalArgumentException("Unknown CloudEvent spec: " + ce.version()); + } mock.assertIsSatisfied(); } @@ -235,13 +346,9 @@ void testConsumeContent(CloudEvent ce) throws Exception { configureKnativeComponent( context, ce, - endpoint( - Knative.EndpointKind.source, + sourceEndpoint( "myEndpoint", - "localhost", - port, KnativeSupport.mapOf( - Knative.SERVICE_META_PATH, "/a/path", Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) @@ -252,8 +359,6 @@ void testConsumeContent(CloudEvent ce) throws Exception { public void configure() throws Exception { from("knative:endpoint/myEndpoint") .to("mock:ce"); - from("direct:source") - .toF("undertow:http://localhost:%d/a/path", port); } }); @@ -274,18 +379,18 @@ public void configure() throws Exception { mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); - context.createProducerTemplate().send( - "direct:source", - e -> { - e.getMessage().setHeader(Exchange.CONTENT_TYPE, "text/plain"); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event"); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID"); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere"); - e.getMessage().setBody("test"); - } - ); + given() + .body("test") + .header(Exchange.CONTENT_TYPE, "text/plain") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere") + .when() + .post() + .then() + .statusCode(200); mock.assertIsSatisfied(); } @@ -296,21 +401,15 @@ void testConsumeContentWithFilter(CloudEvent ce) throws Exception { configureKnativeComponent( context, ce, - endpoint( - Knative.EndpointKind.source, + sourceEndpoint( "ep1", - "localhost", - port, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1" )), - endpoint( - Knative.EndpointKind.source, + sourceEndpoint( "ep2", - "localhost", - port, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", @@ -329,13 +428,6 @@ public void configure() throws Exception { .convertBodyTo(String.class) .to("log:ce2?showAll=true&multiline=true") .to("mock:ce2"); - - from("direct:source") - .setBody() - .constant("test") - .setHeader(Exchange.HTTP_METHOD) - .constant("POST") - .toD("undertow:http://localhost:" + port); } }); @@ -359,26 +451,31 @@ public void configure() throws Exception { mock2.expectedBodiesReceived("test"); mock2.expectedMessageCount(1); - context.createProducerTemplate().send( - "direct:source", - e -> { - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event"); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1"); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1"); - } - ); - context.createProducerTemplate().send( - "direct:source", - e -> { - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event"); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2"); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE2"); - } - ); + given() + .body("test") + .header(Exchange.CONTENT_TYPE, "text/plain") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1") + .when() + .post() + .then() + .statusCode(200); + + given() + .body("test") + .header(Exchange.CONTENT_TYPE, "text/plain") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE2") + .when() + .post() + .then() + .statusCode(200); mock1.assertIsSatisfied(); mock2.assertIsSatisfied(); @@ -390,21 +487,15 @@ void testConsumeContentWithRegExFilter(CloudEvent ce) throws Exception { configureKnativeComponent( context, ce, - endpoint( - Knative.EndpointKind.source, + sourceEndpoint( "ep1", - "localhost", - port, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE[01234]" )), - endpoint( - Knative.EndpointKind.source, + sourceEndpoint( "ep2", - "localhost", - port, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", @@ -423,13 +514,6 @@ public void configure() throws Exception { .convertBodyTo(String.class) .to("log:ce2?showAll=true&multiline=true") .to("mock:ce2"); - - from("direct:source") - .setBody() - .constant("test") - .setHeader(Exchange.HTTP_METHOD) - .constant("POST") - .toD("undertow:http://localhost:" + port); } }); @@ -453,26 +537,31 @@ public void configure() throws Exception { mock2.expectedBodiesReceived("test"); mock2.expectedMessageCount(1); - context.createProducerTemplate().send( - "direct:source", - e -> { - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event"); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1"); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE0"); - } - ); - context.createProducerTemplate().send( - "direct:source", - e -> { - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event"); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2"); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE5"); - } - ); + given() + .body("test") + .header(Exchange.CONTENT_TYPE, "text/plain") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE0") + .when() + .post() + .then() + .statusCode(200); + + given() + .body("test") + .header(Exchange.CONTENT_TYPE, "text/plain") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE5") + .when() + .post() + .then() + .statusCode(200); mock1.assertIsSatisfied(); mock2.assertIsSatisfied(); @@ -484,11 +573,7 @@ void testConsumeEventContent(CloudEvent ce) throws Exception { configureKnativeComponent( context, ce, - event( - Knative.EndpointKind.source, - "default", - "localhost", - port) + sourceEvent("default") ); context.addRoutes(new RouteBuilder() { @@ -502,13 +587,6 @@ public void configure() throws Exception { .convertBodyTo(String.class) .to("log:ce2?showAll=true&multiline=true") .to("mock:ce2"); - - from("direct:source") - .setBody() - .constant("test") - .setHeader(Exchange.HTTP_METHOD) - .constant("POST") - .toD("undertow:http://localhost:" + port); } }); @@ -532,26 +610,31 @@ public void configure() throws Exception { mock2.expectedBodiesReceived("test"); mock2.expectedMessageCount(1); - context.createProducerTemplate().send( - "direct:source", - e -> { - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event1"); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1"); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1"); - } - ); - context.createProducerTemplate().send( - "direct:source", - e -> { - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event2"); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2"); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE2"); - } - ); + given() + .body("test") + .header(Exchange.CONTENT_TYPE, "text/plain") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event1") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1") + .when() + .post() + .then() + .statusCode(200); + + given() + .body("test") + .header(Exchange.CONTENT_TYPE, "text/plain") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event2") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE2") + .when() + .post() + .then() + .statusCode(200); mock1.assertIsSatisfied(); mock2.assertIsSatisfied(); @@ -563,11 +646,8 @@ void testReply(CloudEvent ce) throws Exception { configureKnativeComponent( context, ce, - endpoint( - Knative.EndpointKind.source, + sourceEndpoint( "from", - "localhost", - port, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" @@ -576,7 +656,7 @@ void testReply(CloudEvent ce) throws Exception { Knative.EndpointKind.sink, "to", "localhost", - port, + platformHttpPort, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" @@ -616,11 +696,8 @@ void testReplyCloudEventHeaders(CloudEvent ce) throws Exception { configureKnativeComponent( context, ce, - endpoint( - Knative.EndpointKind.source, + sourceEndpoint( "from", - "localhost", - port, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" @@ -629,7 +706,7 @@ void testReplyCloudEventHeaders(CloudEvent ce) throws Exception { Knative.EndpointKind.sink, "to", "localhost", - port, + platformHttpPort, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" @@ -673,7 +750,7 @@ void testInvokeServiceWithoutHost(CloudEvent ce) throws Exception { Knative.EndpointKind.sink, "test", "", - port, + platformHttpPort, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" @@ -705,7 +782,7 @@ void testInvokeNotExistingEndpoint(CloudEvent ce) throws Exception { Knative.EndpointKind.sink, "test", "localhost", - port, + platformHttpPort, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" @@ -724,7 +801,7 @@ void testInvokeNotExistingEndpoint(CloudEvent ce) throws Exception { Exchange exchange = template.request("direct:start", e -> e.getMessage().setBody("")); assertThat(exchange.isFailed()).isTrue(); assertThat(exchange.getException()).isInstanceOf(CamelException.class); - assertThat(exchange.getException()).hasMessageStartingWith("HTTP operation failed invoking http://localhost:" + port + "/"); + assertThat(exchange.getException()).hasMessageStartingWith("HTTP operation failed invoking http://localhost:" + platformHttpPort + "/"); } @ParameterizedTest @@ -733,22 +810,16 @@ void testRemoveConsumer(CloudEvent ce) throws Exception { configureKnativeComponent( context, ce, - endpoint( - Knative.EndpointKind.source, + sourceEndpoint( "ep1", - "localhost", - port, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_FILTER_PREFIX + "h", "h1" ) ), - endpoint( - Knative.EndpointKind.source, + sourceEndpoint( "ep2", - "localhost", - port, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", @@ -768,7 +839,7 @@ void testRemoveConsumer(CloudEvent ce) throws Exception { RouteBuilder.addRoutes(context, b -> { b.from("direct:start") .setHeader("h").body() - .toF("undertow:http://localhost:%d", port); + .toF("http://localhost:%d", platformHttpPort); }); context.start(); @@ -790,22 +861,16 @@ void testAddConsumer(CloudEvent ce) throws Exception { configureKnativeComponent( context, ce, - endpoint( - Knative.EndpointKind.source, + sourceEndpoint( "ep1", - "localhost", - port, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_FILTER_PREFIX + "h", "h1" ) ), - endpoint( - Knative.EndpointKind.source, + sourceEndpoint( "ep2", - "localhost", - port, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", @@ -822,7 +887,7 @@ void testAddConsumer(CloudEvent ce) throws Exception { RouteBuilder.addRoutes(context, b -> { b.from("direct:start") .setHeader("h").body() - .toF("undertow:http://localhost:%d", port); + .toF("http://localhost:%d", platformHttpPort); }); context.start(); @@ -853,7 +918,7 @@ void testInvokeEndpointWithError(CloudEvent ce) throws Exception { Knative.EndpointKind.sink, "ep", "localhost", - port, + platformHttpPort, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" @@ -865,7 +930,7 @@ void testInvokeEndpointWithError(CloudEvent ce) throws Exception { b.from("direct:start") .to("knative:endpoint/ep") .to("mock:start"); - b.fromF("undertow:http://0.0.0.0:%d", port) + b.fromF("platform-http:/") .routeId("endpoint") .process(e -> { throw new RuntimeException("endpoint error"); @@ -891,16 +956,13 @@ void testEvents(CloudEvent ce) throws Exception { Knative.EndpointKind.sink, "default", "localhost", - port, + platformHttpPort, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )), - event( - Knative.EndpointKind.source, + sourceEvent( "default", - "localhost", - port, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" @@ -943,18 +1005,15 @@ void testEventsWithTypeAndVersion(CloudEvent ce) throws Exception { Knative.EndpointKind.sink, "default", "localhost", - port, + platformHttpPort, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_KIND, "MyObject", Knative.KNATIVE_API_VERSION, "v1" )), - event( - Knative.EndpointKind.source, + sourceEvent( "default", - "localhost", - port, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", @@ -995,22 +1054,16 @@ void testConsumeContentWithTypeAndVersion(CloudEvent ce) throws Exception { configureKnativeComponent( context, ce, - endpoint( - Knative.EndpointKind.source, + sourceEndpoint( "myEndpoint", - "localhost", - port + 1, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_KIND, "MyObject", Knative.KNATIVE_API_VERSION, "v1" )), - endpoint( - Knative.EndpointKind.source, + sourceEndpoint( "myEndpoint", - "localhost", - port, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", @@ -1024,8 +1077,6 @@ void testConsumeContentWithTypeAndVersion(CloudEvent ce) throws Exception { public void configure() throws Exception { from("knative:endpoint/myEndpoint?kind=MyObject&apiVersion=v2") .to("mock:ce"); - from("direct:source") - .toF("undertow:http://localhost:%d", port); } }); @@ -1041,18 +1092,18 @@ public void configure() throws Exception { mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); - context.createProducerTemplate().send( - "direct:source", - e -> { - e.getMessage().setHeader(Exchange.CONTENT_TYPE, "text/plain"); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event"); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID"); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere"); - e.getMessage().setBody("test"); - } - ); + given() + .body("test") + .header(Exchange.CONTENT_TYPE, "text/plain") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere") + .when() + .post() + .then() + .statusCode(200); mock.assertIsSatisfied(); } @@ -1063,11 +1114,8 @@ void testWrongMethod(CloudEvent ce) throws Exception { configureKnativeComponent( context, ce, - endpoint( - Knative.EndpointKind.source, + sourceEndpoint( "myEndpoint", - "localhost", - port, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" @@ -1079,18 +1127,18 @@ void testWrongMethod(CloudEvent ce) throws Exception { public void configure() throws Exception { from("knative:endpoint/myEndpoint") .to("mock:ce"); - from("direct:start") - .toF("undertow:http://localhost:%d", port); } }); context.start(); - Exchange exchange = template.request("direct:start", e -> e.getMessage().setBody(null)); - assertThat(exchange.isFailed()).isTrue(); - assertThat(exchange.getException()).isInstanceOf(CamelException.class); - assertThat(exchange.getException()).hasMessageStartingWith("HTTP operation failed invoking"); - assertThat(exchange.getException()).hasMessageEndingWith("with statusCode: 405"); + given() + .body("test") + .header(Exchange.CONTENT_TYPE, "text/plain") + .when() + .get() + .then() + .statusCode(404); } @ParameterizedTest @@ -1103,7 +1151,7 @@ void testNoBody(CloudEvent ce) throws Exception { Knative.EndpointKind.sink, "myEndpoint", "localhost", - port, + platformHttpPort, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" @@ -1129,7 +1177,6 @@ public void configure() throws Exception { @ParameterizedTest @EnumSource(CloudEvents.class) void testNoContent(CloudEvent ce) throws Exception { - final int messagesPort = AvailablePortFinder.getNextAvailable(); final int wordsPort = AvailablePortFinder.getNextAvailable(); configureKnativeComponent( @@ -1138,8 +1185,8 @@ void testNoContent(CloudEvent ce) throws Exception { channel( Knative.EndpointKind.source, "messages", - "localhost", - messagesPort, + null, + -1, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" @@ -1148,7 +1195,7 @@ void testNoContent(CloudEvent ce) throws Exception { Knative.EndpointKind.sink, "messages", "localhost", - messagesPort, + platformHttpPort, KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" @@ -1202,11 +1249,8 @@ void testOrdering(CloudEvent ce) throws Exception { .ints(0, 100) .distinct() .limit(10) - .mapToObj(i -> endpoint( - Knative.EndpointKind.source, - "channel-" + i, - "localhost", - port, + .mapToObj(i -> sourceEndpoint( + "ep-" + i, KnativeSupport.mapOf(Knative.KNATIVE_FILTER_PREFIX + "MyHeader", "channel-" + i))) .collect(Collectors.toList()); @@ -1216,8 +1260,8 @@ void testOrdering(CloudEvent ce) throws Exception { @Override public void configure() throws Exception { from("direct:start") - .routeId("undertow") - .toF("undertow:http://localhost:%d", port) + .routeId("http") + .toF("http://localhost:%d", platformHttpPort) .convertBodyTo(String.class); for (KnativeEnvironment.KnativeServiceDefinition definition: hops) { @@ -1249,6 +1293,8 @@ public void configure() throws Exception { @ParameterizedTest @EnumSource(CloudEvents.class) void testHeaders(CloudEvent ce) throws Exception { + final int port = AvailablePortFinder.getNextAvailable(); + configureKnativeComponent( context, ce, @@ -1302,6 +1348,7 @@ void testHeaders(CloudEvent ce) throws Exception { @ParameterizedTest @EnumSource(CloudEvents.class) void testHeadersOverrideFromEnv(CloudEvent ce) throws Exception { + final int port = AvailablePortFinder.getNextAvailable(); final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(); final String typeHeaderVal = UUID.randomUUID().toString(); final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(); @@ -1362,6 +1409,7 @@ void testHeadersOverrideFromEnv(CloudEvent ce) throws Exception { @ParameterizedTest @EnumSource(CloudEvents.class) void testHeadersOverrideFromURI(CloudEvent ce) throws Exception { + final int port = AvailablePortFinder.getNextAvailable(); final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(); final String typeHeaderVal = UUID.randomUUID().toString(); final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(); @@ -1422,6 +1470,7 @@ void testHeadersOverrideFromURI(CloudEvent ce) throws Exception { @ParameterizedTest @EnumSource(CloudEvents.class) void testHeadersOverrideFromConf(CloudEvent ce) throws Exception { + final int port = AvailablePortFinder.getNextAvailable(); final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(); final String typeHeaderVal = UUID.randomUUID().toString(); final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(); @@ -1485,6 +1534,8 @@ void testHeadersOverrideFromConf(CloudEvent ce) throws Exception { @ParameterizedTest @EnumSource(CloudEvents.class) void testHeadersOverrideFromRouteWithCamelHeader(CloudEvent ce) throws Exception { + final int port = AvailablePortFinder.getNextAvailable(); + configureKnativeComponent( context, ce, @@ -1539,6 +1590,8 @@ void testHeadersOverrideFromRouteWithCamelHeader(CloudEvent ce) throws Exception @ParameterizedTest @EnumSource(CloudEvents.class) void testHeadersOverrideFromRouteWithCEHeader(CloudEvent ce) throws Exception { + final int port = AvailablePortFinder.getNextAvailable(); + configureKnativeComponent( context, ce, diff --git a/camel-knative/camel-knative-http/src/test/resources/log4j2-test.xml b/camel-knative/camel-knative-http/src/test/resources/log4j2-test.xml index 82b517bb2..8c95e5457 100644 --- a/camel-knative/camel-knative-http/src/test/resources/log4j2-test.xml +++ b/camel-knative/camel-knative-http/src/test/resources/log4j2-test.xml @@ -26,10 +26,11 @@ + + + - + diff --git a/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json b/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json index 310c12a48..5cebe3ca5 100644 --- a/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json +++ b/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json @@ -16,7 +16,7 @@ "firstVersion": "3.0.0", "groupId": "org.apache.camel.k", "artifactId": "camel-knative", - "version": "1.2.0-SNAPSHOT" + "version": "1.2.2-SNAPSHOT" }, "componentProperties": { "cloudEventsSpecVersion": { "kind": "property", "displayName": "Cloud Events Spec Version", "group": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Set the version of the cloudevents spec." }, @@ -24,7 +24,7 @@ "environment": { "kind": "property", "displayName": "Environment", "group": "common", "required": false, "type": "object", "javaType": "org.apache.camel.component.knative.spi.KnativeEnvironment", "deprecated": false, "secret": false, "description": "The environment" }, "environmentPath": { "kind": "property", "displayName": "Environment Path", "group": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "The path ot the environment definition" }, "transport": { "kind": "property", "displayName": "Transport", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.knative.spi.KnativeTransport", "deprecated": false, "secret": false, "defaultValue": "http", "description": "The transport implementation." }, - "transportOptions": { "kind": "property", "displayName": "Transport Options", "group": "common", "required": false, "type": "object", "javaType": "java.util.Map", "deprecated": false, "secret": false, "description": "Transport options." }, + "transportOptions": { "kind": "property", "displayName": "Transport Options", "group": "common", "label": "", "required": false, "type": "object", "javaType": "java.util.Map", "deprecated": false, "secret": false, "description": "Transport options." }, "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored." }, "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing." }, "basicPropertyBinding": { "kind": "property", "displayName": "Basic Property Binding", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities" } diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java index f33553a65..9cbfa6b80 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java @@ -52,6 +52,9 @@ public class KnativeComponent extends DefaultComponent { @Metadata(defaultValue = "http") private KnativeTransport transport; + @Metadata + private Map transportOptions; + private boolean managedTransport = true; public KnativeComponent() { @@ -62,6 +65,7 @@ public KnativeComponent(CamelContext context) { super(context); this.configuration = new KnativeConfiguration(); + this.configuration.setTransportOptions(new HashMap<>()); } // ************************ @@ -114,17 +118,6 @@ public void setCloudEventsSpecVersion(String cloudEventSpecVersion) { configuration.setCloudEventsSpecVersion(cloudEventSpecVersion); } - public Map getTransportOptions() { - return configuration.getTransportOptions(); - } - - /** - * Transport options. - */ - public void setTransportOptions(Map transportOptions) { - configuration.setTransportOptions(transportOptions); - } - public Knative.Protocol getProtocol() { return protocol; } @@ -148,6 +141,17 @@ public void setTransport(KnativeTransport transport) { this.transport = transport; } + public Map getTransportOptions() { + return configuration.getTransportOptions(); + } + + /** + * Transport options. + */ + public void setTransportOptions(Map transportOptions) { + configuration.setTransportOptions(transportOptions); + } + // ************************ // // Lifecycle @@ -208,7 +212,7 @@ protected void doStop() throws Exception { @Override protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception { if (ObjectHelper.isEmpty(remaining)) { - throw new IllegalArgumentException("Expecting URI in the forof: 'knative:type/name', got '" + uri + "'"); + throw new IllegalArgumentException("Expecting URI in the form of: 'knative:type/name', got '" + uri + "'"); } final String type = StringHelper.before(remaining, "/"); diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java index bd29d70dc..825275e86 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.knative; +import java.util.HashMap; import java.util.Map; import org.apache.camel.RuntimeCamelException; @@ -128,6 +129,17 @@ public void setTransportOptions(Map transportOptions) { this.transportOptions = transportOptions; } + /** + * Add a transport option. + */ + public void addTransportOptions(String key, Object value) { + if (this.transportOptions == null) { + this.transportOptions = new HashMap<>(); + } + + this.transportOptions.put(key, value); + } + public Map getFilters() { return filters; } diff --git a/examples/camel-k-runtime-example-knative/src/main/resources/application.properties b/examples/camel-k-runtime-example-knative/data/application.properties similarity index 88% rename from examples/camel-k-runtime-example-knative/src/main/resources/application.properties rename to examples/camel-k-runtime-example-knative/data/application.properties index 85a8fe9c8..1dc40847b 100644 --- a/examples/camel-k-runtime-example-knative/src/main/resources/application.properties +++ b/examples/camel-k-runtime-example-knative/data/application.properties @@ -20,6 +20,12 @@ # logging.level.org.apache.camel.k = DEBUG +# +# camel-k +# +customizer.platform-http.enabled = true +customizer.platform-http.bind-port = 8080 + # # camel - main # @@ -30,4 +36,4 @@ camel.main.stream-caching-spool-directory = ${java.io.tmpdir}/camel-k # # Camel - components # -camel.component.knative.environment-path = file:src/main/resources/env.json \ No newline at end of file +camel.component.knative.environment-path = file:data/env.json \ No newline at end of file diff --git a/examples/camel-k-runtime-example-knative/src/main/resources/env.json b/examples/camel-k-runtime-example-knative/data/env.json similarity index 76% rename from examples/camel-k-runtime-example-knative/src/main/resources/env.json rename to examples/camel-k-runtime-example-knative/data/env.json index 24dbf20d2..204a3e08f 100644 --- a/examples/camel-k-runtime-example-knative/src/main/resources/env.json +++ b/examples/camel-k-runtime-example-knative/data/env.json @@ -2,8 +2,6 @@ "services": [{ "type": "endpoint", "name": "from", - "host": "0.0.0.0", - "port": 9090, "metadata": { "camel.endpoint.kind": "source" } diff --git a/examples/camel-k-runtime-example-knative/src/main/resources/routes.yaml b/examples/camel-k-runtime-example-knative/data/routes.yaml similarity index 100% rename from examples/camel-k-runtime-example-knative/src/main/resources/routes.yaml rename to examples/camel-k-runtime-example-knative/data/routes.yaml diff --git a/examples/camel-k-runtime-example-knative/pom.xml b/examples/camel-k-runtime-example-knative/pom.xml index 57eb161fd..c8a706436 100644 --- a/examples/camel-k-runtime-example-knative/pom.xml +++ b/examples/camel-k-runtime-example-knative/pom.xml @@ -71,11 +71,11 @@ camel.k.conf - ${project.basedir}/src/main/resources/application.properties + ${project.basedir}/data/application.properties camel.k.routes - file:${project.basedir}/src/main/resources/routes.yaml + file:${project.basedir}/data/routes.yaml diff --git a/pom.xml b/pom.xml index d6a58fea1..912b8a4c7 100644 --- a/pom.xml +++ b/pom.xml @@ -217,6 +217,7 @@ camel-k-loader-java camel-k-runtime-cron + camel-k-runtime-http camel-k-runtime-knative camel-k-runtime-master camel-k-runtime-webhook