From 698a8807e84403c20da6fd497b191a7ad10f2d03 Mon Sep 17 00:00:00 2001 From: lburgazzoli Date: Fri, 15 May 2020 14:21:07 +0200 Subject: [PATCH] https://github.com/apache/camel-k-runtime/issues/326 --- camel-knative/camel-knative-http/pom.xml | 5 - .../knative/http/KnativeHttpSupport.java | 30 +- .../knative/http/KnativeHttpTransport.java | 8 +- .../knative/http/KnativeHttpServer.java | 214 ++++++ .../knative/http/KnativeHttpTest.java | 656 ++++++++---------- .../ce/AbstractCloudEventProcessor.java | 32 +- 6 files changed, 563 insertions(+), 382 deletions(-) create mode 100644 camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpServer.java diff --git a/camel-knative/camel-knative-http/pom.xml b/camel-knative/camel-knative-http/pom.xml index abd18b096..12f6b5d1c 100644 --- a/camel-knative/camel-knative-http/pom.xml +++ b/camel-knative/camel-knative-http/pom.xml @@ -92,11 +92,6 @@ camel-log test - - org.apache.camel - camel-undertow - test - org.apache.camel camel-http diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java index f113e781b..36ef1ca2b 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java @@ -26,6 +26,7 @@ import io.vertx.core.http.HttpServerRequest; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.component.knative.spi.CloudEvent; import org.apache.camel.component.knative.spi.Knative; @@ -101,10 +102,37 @@ public static Processor withoutCloudEventHeaders(Processor delegate, CloudEvent @Override public boolean process(Exchange exchange, AsyncCallback callback) { return processor.process(exchange, doneSync -> { + final Message message = exchange.getMessage(); + // remove CloudEvent headers for (CloudEvent.Attribute attr : ce.attributes()) { - exchange.getMessage().removeHeader(attr.http()); + message.removeHeader(attr.http()); + } + + callback.done(doneSync); + }); + } + }; + } + + /** + * Remap camel headers to cloud event http headers. + */ + public static Processor remalCloudEventHeaders(Processor delegate, CloudEvent ce) { + return new DelegateAsyncProcessor(delegate) { + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + return processor.process(exchange, doneSync -> { + final Message message = exchange.getMessage(); + + // remap CloudEvent camel --> http + for (CloudEvent.Attribute attr : ce.attributes()) { + Object value = message.getHeader(attr.id()); + if (value != null) { + message.setHeader(attr.http(), value); + } } + callback.done(doneSync); }); } diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java index 9e7eb51c6..cdd7c8b09 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java @@ -32,7 +32,7 @@ public class KnativeHttpTransport extends ServiceSupport implements CamelContextAware, KnativeTransport { public static final int DEFAULT_PORT = 8080; public static final String DEFAULT_PATH = "/"; - + private PlatformHttp platformHttp; private WebClientOptions vertxHttpClientOptions; private CamelContext camelContext; @@ -96,10 +96,12 @@ public Producer createProducer(Endpoint endpoint, KnativeTransportConfiguration @Override public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeServiceDefinition service, Processor processor) { - Processor next = processor; + Processor next = KnativeHttpSupport.remalCloudEventHeaders(processor, config.getCloudEvent()); + if (config.isRemoveCloudEventHeadersInReply()) { - next = KnativeHttpSupport.withoutCloudEventHeaders(processor, config.getCloudEvent()); + next = KnativeHttpSupport.withoutCloudEventHeaders(next, config.getCloudEvent()); } + return new KnativeHttpConsumer(this, endpoint, service, this.platformHttp, next); } diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpServer.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpServer.java new file mode 100644 index 000000000..cf3a50700 --- /dev/null +++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpServer.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.knative.http; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.RoutingContext; +import io.vertx.ext.web.handler.BodyHandler; +import org.apache.camel.CamelContext; +import org.apache.camel.component.platform.http.PlatformHttpConstants; +import org.apache.camel.support.service.ServiceSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KnativeHttpServer extends ServiceSupport { + private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpServer.class); + + private final CamelContext context; + private final String host; + private final int port; + private final String path; + + private Vertx vertx; + private Router router; + private ExecutorService executor; + private HttpServer server; + private BlockingQueue requests; + private Handler handler; + + public KnativeHttpServer(CamelContext context, int port) { + this(context, "localhost", port, "/", null); + } + + public KnativeHttpServer(CamelContext context, int port, Handler handler) { + this(context, "localhost", port, "/", handler); + } + + public KnativeHttpServer(CamelContext context, String host, int port, String path) { + this(context, host, port, path, null); + } + + public KnativeHttpServer(CamelContext context, String host, int port, String path, Handler handler) { + this.context = context; + this.host = host; + this.port = port; + this.path = path; + this.requests = new LinkedBlockingQueue<>(); + this.handler = handler != null + ? handler + : event -> { + event.response().setStatusCode(200); + event.response().end(); + }; + } + + public HttpServerRequest poll(int timeout, TimeUnit unit) throws InterruptedException { + return requests.poll(timeout, unit); + } + + @Override + protected void doStart() throws Exception { + this.executor = context.getExecutorServiceManager().newSingleThreadExecutor(this, "knative-http-server"); + this.vertx = Vertx.vertx(); + this.server = vertx.createHttpServer(); + this.router = Router.router(vertx); + this.router.route(path) + .handler(event -> { + event.request().resume(); + BodyHandler.create().handle(event); + }) + .handler(event -> { + this.requests.offer(event.request()); + event.next(); + }) + .handler(handler); + + CompletableFuture.runAsync( + () -> { + CountDownLatch latch = new CountDownLatch(1); + server.requestHandler(router).listen(port, host, result -> { + try { + if (result.failed()) { + LOGGER.warn("Failed to start Vert.x HttpServer on {}:{}, reason: {}", + host, + port, + result.cause().getMessage() + ); + + throw new RuntimeException(result.cause()); + } + + LOGGER.info("Vert.x HttpServer started on {}:{}", host, port); + } finally { + latch.countDown(); + } + }); + + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + executor + ).toCompletableFuture().join(); + } + + @Override + protected void doStop() throws Exception { + try { + if (server != null) { + CompletableFuture.runAsync( + () -> { + CountDownLatch latch = new CountDownLatch(1); + + // remove the platform-http component + context.removeComponent(PlatformHttpConstants.PLATFORM_HTTP_COMPONENT_NAME); + + server.close(result -> { + try { + if (result.failed()) { + LOGGER.warn("Failed to close Vert.x HttpServer reason: {}", + result.cause().getMessage() + ); + + throw new RuntimeException(result.cause()); + } + + LOGGER.info("Vert.x HttpServer stopped"); + } finally { + latch.countDown(); + } + }); + + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + executor + ).toCompletableFuture().join(); + } + } finally { + this.server = null; + } + + if (vertx != null) { + Future future = executor.submit( + () -> { + CountDownLatch latch = new CountDownLatch(1); + + vertx.close(result -> { + try { + if (result.failed()) { + LOGGER.warn("Failed to close Vert.x reason: {}", + result.cause().getMessage() + ); + + throw new RuntimeException(result.cause()); + } + + LOGGER.info("Vert.x stopped"); + } finally { + latch.countDown(); + } + }); + + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + ); + + try { + future.get(); + } finally { + vertx = null; + } + } + + if (executor != null) { + context.getExecutorServiceManager().shutdown(executor); + executor = null; + } + } +} diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java index 657a74ed3..e0d661218 100644 --- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java +++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java @@ -23,15 +23,12 @@ import java.util.Objects; import java.util.Random; import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import io.restassured.RestAssured; import io.restassured.mapper.ObjectMapperType; -import io.undertow.Undertow; -import io.undertow.server.HttpServerExchange; -import io.undertow.util.HeaderMap; +import io.vertx.core.http.HttpServerRequest; import org.apache.camel.CamelContext; import org.apache.camel.CamelException; import org.apache.camel.Exchange; @@ -117,7 +114,7 @@ void testCreateComponent() { void doTestKnativeSource(CloudEvent ce, String basePath, String path) throws Exception { KnativeComponent component = configureKnativeComponent( context, - CloudEvents.V03, + ce, sourceEndpoint( "myEndpoint", mapOf( @@ -131,23 +128,20 @@ void doTestKnativeSource(CloudEvent ce, String basePath, String path) throws Exc component.getConfiguration().addTransportOptions("basePath", basePath); } - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("knative:endpoint/myEndpoint") - .to("mock:ce"); - } + RouteBuilder.addRoutes(context, b -> { + b.from("knative:endpoint/myEndpoint") + .to("mock:ce"); }); context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event"); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere"); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version()); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event"); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "/somewhere"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http())); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME)); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_ID)); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); @@ -214,14 +208,11 @@ void testInvokeEndpoint(CloudEvent ce) throws Exception { )) ); - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("direct:source") - .to("knative:endpoint/myEndpoint"); - from("platform-http:/a/path") - .to("mock:ce"); - } + RouteBuilder.addRoutes(context, b -> { + b.from("direct:source") + .to("knative:endpoint/myEndpoint"); + b.from("platform-http:/a/path") + .to("mock:ce"); }); context.start(); @@ -236,7 +227,7 @@ public void configure() throws Exception { mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); - context.createProducerTemplate().sendBody("direct:source", "test"); + template.sendBody("direct:source", "test"); mock.assertIsSatisfied(); } @@ -255,23 +246,20 @@ void testConsumeStructuredContent(CloudEvent ce) throws Exception { )) ); - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("knative:endpoint/myEndpoint") - .to("mock:ce"); - } + RouteBuilder.addRoutes(context, b -> { + b.from("knative:endpoint/myEndpoint") + .to("mock:ce"); }); context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event"); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID"); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "/somewhere"); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version()); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event"); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, "myEventID"); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "/somewhere"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME)); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); @@ -280,19 +268,19 @@ public void configure() throws Exception { .contentType(Knative.MIME_STRUCTURED_CONTENT_MODE) .body( mapOf( - "cloudEventsVersion", ce.version(), - "eventType", "org.apache.camel.event", - "eventID", "myEventID", - "eventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()), - "source", "/somewhere", - "contentType", "text/plain", - "data", "test" + "cloudEventsVersion", ce.version(), + "eventType", "org.apache.camel.event", + "eventID", "myEventID", + "eventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()), + "source", "/somewhere", + "contentType", "text/plain", + "data", "test" ), ObjectMapperType.JACKSON_2 ) - .when() + .when() .post() - .then() + .then() .statusCode(200); } else if (Objects.equals(CloudEvents.V02.version(), ce.version())) { given() @@ -309,9 +297,9 @@ public void configure() throws Exception { ), ObjectMapperType.JACKSON_2 ) - .when() + .when() .post() - .then() + .then() .statusCode(200); } else if (Objects.equals(CloudEvents.V03.version(), ce.version())) { given() @@ -328,9 +316,9 @@ public void configure() throws Exception { ), ObjectMapperType.JACKSON_2 ) - .when() + .when() .post() - .then() + .then() .statusCode(200); } else { throw new IllegalArgumentException("Unknown CloudEvent spec: " + ce.version()); @@ -353,28 +341,20 @@ void testConsumeContent(CloudEvent ce) throws Exception { )) ); - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("knative:endpoint/myEndpoint") - .to("mock:ce"); - } + RouteBuilder.addRoutes(context, b -> { + b.from("knative:endpoint/myEndpoint") + .to("mock:ce"); }); context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event"); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event"); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID"); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID"); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "/somewhere"); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere"); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version()); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event"); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, "myEventID"); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "/somewhere"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME)); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); @@ -416,37 +396,34 @@ void testConsumeContentWithFilter(CloudEvent ce) throws Exception { )) ); - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("knative:endpoint/ep1") - .convertBodyTo(String.class) - .to("log:ce1?showAll=true&multiline=true") - .to("mock:ce1"); - from("knative:endpoint/ep2") - .convertBodyTo(String.class) - .to("log:ce2?showAll=true&multiline=true") - .to("mock:ce2"); - } + RouteBuilder.addRoutes(context, b -> { + b.from("knative:endpoint/ep1") + .convertBodyTo(String.class) + .to("log:ce1?showAll=true&multiline=true") + .to("mock:ce1"); + b.from("knative:endpoint/ep2") + .convertBodyTo(String.class) + .to("log:ce2?showAll=true&multiline=true") + .to("mock:ce2"); }); context.start(); MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class); - mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); - mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); - mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event"); - mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID1"); - mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "CE1"); + mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME)); + mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version()); + mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event"); + mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, "myEventID1"); + mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "CE1"); mock1.expectedBodiesReceived("test"); mock1.expectedMessageCount(1); MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class); - mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); - mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); - mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event"); - mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID2"); - mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "CE2"); + mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME)); + mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version()); + mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event"); + mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, "myEventID2"); + mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "CE2"); mock2.expectedBodiesReceived("test"); mock2.expectedMessageCount(1); @@ -458,9 +435,9 @@ public void configure() throws Exception { .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1") .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1") - .when() + .when() .post() - .then() + .then() .statusCode(200); given() @@ -502,37 +479,34 @@ void testConsumeContentWithRegExFilter(CloudEvent ce) throws Exception { )) ); - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("knative:endpoint/ep1") - .convertBodyTo(String.class) - .to("log:ce1?showAll=true&multiline=true") - .to("mock:ce1"); - from("knative:endpoint/ep2") - .convertBodyTo(String.class) - .to("log:ce2?showAll=true&multiline=true") - .to("mock:ce2"); - } + RouteBuilder.addRoutes(context, b -> { + b.from("knative:endpoint/ep1") + .convertBodyTo(String.class) + .to("log:ce1?showAll=true&multiline=true") + .to("mock:ce1"); + b.from("knative:endpoint/ep2") + .convertBodyTo(String.class) + .to("log:ce2?showAll=true&multiline=true") + .to("mock:ce2"); }); context.start(); MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class); - mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); - mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); - mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event"); - mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID1"); - mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "CE0"); + mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME)); + mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version()); + mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event"); + mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, "myEventID1"); + mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "CE0"); mock1.expectedBodiesReceived("test"); mock1.expectedMessageCount(1); MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class); - mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); - mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); - mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event"); - mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID2"); - mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "CE5"); + mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME)); + mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version()); + mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event"); + mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, "myEventID2"); + mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "CE5"); mock2.expectedBodiesReceived("test"); mock2.expectedMessageCount(1); @@ -575,37 +549,34 @@ void testConsumeEventContent(CloudEvent ce) throws Exception { sourceEvent("default") ); - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("knative:event/event1") - .convertBodyTo(String.class) - .to("log:ce1?showAll=true&multiline=true") - .to("mock:ce1"); - from("knative:event/event2") - .convertBodyTo(String.class) - .to("log:ce2?showAll=true&multiline=true") - .to("mock:ce2"); - } + RouteBuilder.addRoutes(context, b -> { + b.from("knative:event/event1") + .convertBodyTo(String.class) + .to("log:ce1?showAll=true&multiline=true") + .to("mock:ce1"); + b.from("knative:event/event2") + .convertBodyTo(String.class) + .to("log:ce2?showAll=true&multiline=true") + .to("mock:ce2"); }); context.start(); MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class); - mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); - mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); - mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "event1"); - mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID1"); - mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "CE1"); + mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME)); + mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version()); + mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "event1"); + mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, "myEventID1"); + mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "CE1"); mock1.expectedBodiesReceived("test"); mock1.expectedMessageCount(1); MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class); - mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); - mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); - mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "event2"); - mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID2"); - mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "CE2"); + mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME)); + mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version()); + mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "event2"); + mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, "myEventID2"); + mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "CE2"); mock2.expectedBodiesReceived("test"); mock2.expectedMessageCount(1); @@ -662,20 +633,17 @@ void testReply(CloudEvent ce) throws Exception { )) ); - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("knative:endpoint/from") - .convertBodyTo(String.class) - .setBody() - .constant("consumer") - .setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http()) - .constant("custom"); - from("direct:source") - .to("knative://endpoint/to") - .log("${body}") - .to("mock:to"); - } + RouteBuilder.addRoutes(context, b -> { + b.from("knative:endpoint/from") + .convertBodyTo(String.class) + .setBody() + .constant("consumer") + .setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE) + .constant("custom"); + b.from("direct:source") + .to("knative://endpoint/to") + .log("${body}") + .to("mock:to"); }); MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class); @@ -684,7 +652,7 @@ public void configure() throws Exception { mock.expectedMessageCount(1); context.start(); - context.createProducerTemplate().sendBody("direct:source", ""); + template.sendBody("direct:source", ""); mock.assertIsSatisfied(); } @@ -712,20 +680,17 @@ void testReplyCloudEventHeaders(CloudEvent ce) throws Exception { )) ); - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("knative:endpoint/from?replyWithCloudEvent=true") - .convertBodyTo(String.class) - .setBody() - .constant("consumer") - .setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http()) - .constant("custom"); - from("direct:source") - .to("knative://endpoint/to") - .log("${body}") - .to("mock:to"); - } + RouteBuilder.addRoutes(context, b -> { + b.from("knative:endpoint/from?replyWithCloudEvent=true") + .convertBodyTo(String.class) + .setBody() + .constant("consumer") + .setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE) + .constant("custom"); + b.from("direct:source") + .to("knative://endpoint/to") + .log("${body}") + .to("mock:to"); }); MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class); @@ -734,7 +699,7 @@ public void configure() throws Exception { mock.expectedMessageCount(1); context.start(); - context.createProducerTemplate().sendBody("direct:source", ""); + template.sendBody("direct:source", ""); mock.assertIsSatisfied(); } @@ -968,28 +933,25 @@ void testEvents(CloudEvent ce) throws Exception { )) ); - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("direct:source") - .to("knative:event/myEvent"); - fromF("knative:event/myEvent") - .to("mock:ce"); - } + RouteBuilder.addRoutes(context, b -> { + b.from("direct:source") + .to("knative:event/myEvent"); + b.from("knative:event/myEvent") + .to("mock:ce"); }); context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "myEvent"); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version()); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "myEvent"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME)); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_ID)); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); - context.createProducerTemplate().sendBody("direct:source", "test"); + template.sendBody("direct:source", "test"); mock.assertIsSatisfied(); } @@ -1021,28 +983,25 @@ void testEventsWithTypeAndVersion(CloudEvent ce) throws Exception { )) ); - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("direct:source") - .to("knative:event/myEvent?kind=MyObject&apiVersion=v1"); - from("knative:event/myEvent?kind=MyOtherObject&apiVersion=v2") - .to("mock:ce"); - } + RouteBuilder.addRoutes(context, b -> { + b.from("direct:source") + .to("knative:event/myEvent?kind=MyObject&apiVersion=v1"); + b.from("knative:event/myEvent?kind=MyOtherObject&apiVersion=v2") + .to("mock:ce"); }); context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "myEvent"); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version()); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "myEvent"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME)); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_ID)); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); - context.createProducerTemplate().sendBody("direct:source", "test"); + template.sendBody("direct:source", "test"); mock.assertIsSatisfied(); } @@ -1071,23 +1030,20 @@ void testConsumeContentWithTypeAndVersion(CloudEvent ce) throws Exception { )) ); - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("knative:endpoint/myEndpoint?kind=MyObject&apiVersion=v2") - .to("mock:ce"); - } + RouteBuilder.addRoutes(context, b -> { + b.from("knative:endpoint/myEndpoint?kind=MyObject&apiVersion=v2") + .to("mock:ce"); }); context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event"); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID"); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "/somewhere"); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version()); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event"); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, "myEventID"); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "/somewhere"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME)); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); @@ -1121,12 +1077,9 @@ void testWrongMethod(CloudEvent ce) throws Exception { )) ); - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("knative:endpoint/myEndpoint") - .to("mock:ce"); - } + RouteBuilder.addRoutes(context, b -> { + b.from("knative:endpoint/myEndpoint") + .to("mock:ce"); }); context.start(); @@ -1134,9 +1087,9 @@ public void configure() throws Exception { given() .body("test") .header(Exchange.CONTENT_TYPE, "text/plain") - .when() + .when() .get() - .then() + .then() .statusCode(404); } @@ -1157,12 +1110,9 @@ void testNoBody(CloudEvent ce) throws Exception { )) ); - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("direct:start") - .to("knative:endpoint/myEndpoint"); - } + RouteBuilder.addRoutes(context, b -> { + b.from("direct:start") + .to("knative:endpoint/myEndpoint"); }); context.start(); @@ -1177,6 +1127,10 @@ public void configure() throws Exception { @EnumSource(CloudEvents.class) void testNoContent(CloudEvent ce) throws Exception { final int wordsPort = AvailablePortFinder.getNextAvailable(); + final KnativeHttpServer server = new KnativeHttpServer(context, wordsPort, event -> { + event.response().setStatusCode(204); + event.response().end(""); + }); configureKnativeComponent( context, @@ -1210,25 +1164,14 @@ void testNoContent(CloudEvent ce) throws Exception { )) ); - Undertow server = Undertow.builder() - .addHttpListener(wordsPort, "localhost") - .setHandler(exchange -> { - exchange.setStatusCode(204); - exchange.getResponseSender().send(""); - }) - .build(); - try { server.start(); - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("knative:channel/messages") - .transform().simple("transformed ${body}") - .log("${body}") - .to("knative:channel/words"); - } + RouteBuilder.addRoutes(context, b -> { + b.from("knative:channel/messages") + .transform().simple("transformed ${body}") + .log("${body}") + .to("knative:channel/words"); }); context.start(); @@ -1255,26 +1198,23 @@ void testOrdering(CloudEvent ce) throws Exception { configureKnativeComponent(context, ce, hops); - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("direct:start") - .routeId("http") - .toF("http://localhost:%d", platformHttpPort) - .convertBodyTo(String.class); - - for (KnativeEnvironment.KnativeServiceDefinition definition: hops) { - fromF("knative:endpoint/%s", definition.getName()) - .routeId(definition.getName()) - .setBody().constant(definition.getName()); - } + RouteBuilder.addRoutes(context, b -> { + b.from("direct:start") + .routeId("http") + .toF("http://localhost:%d", platformHttpPort) + .convertBodyTo(String.class); + + for (KnativeEnvironment.KnativeServiceDefinition definition : hops) { + b.fromF("knative:endpoint/%s", definition.getName()) + .routeId(definition.getName()) + .setBody().constant(definition.getName()); } }); context.start(); List hopsDone = new ArrayList<>(); - for (KnativeEnvironment.KnativeServiceDefinition definition: hops) { + for (KnativeEnvironment.KnativeServiceDefinition definition : hops) { hopsDone.add(definition.getName()); Exchange result = template.request( @@ -1293,6 +1233,7 @@ public void configure() throws Exception { @EnumSource(CloudEvents.class) void testHeaders(CloudEvent ce) throws Exception { final int port = AvailablePortFinder.getNextAvailable(); + final KnativeHttpServer server = new KnativeHttpServer(context, port); configureKnativeComponent( context, @@ -1309,17 +1250,6 @@ void testHeaders(CloudEvent ce) throws Exception { ) ); - CountDownLatch latch = new CountDownLatch(1); - AtomicReference exchange = new AtomicReference<>(); - - Undertow server = Undertow.builder() - .addHttpListener(port, "localhost") - .setHandler(se -> { - exchange.set(se); - latch.countDown(); - }) - .build(); - RouteBuilder.addRoutes(context, b -> { b.from("direct:start") .to("knative:endpoint/ep"); @@ -1330,16 +1260,13 @@ void testHeaders(CloudEvent ce) throws Exception { server.start(); template.sendBody("direct:start", ""); - latch.await(); - - HeaderMap headers = exchange.get().getRequestHeaders(); - - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("org.apache.camel.event"); - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep"); - assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); - } finally { + HttpServerRequest request = server.poll(30, TimeUnit.SECONDS); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("org.apache.camel.event"); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep"); + assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); + } finally { server.stop(); } } @@ -1348,6 +1275,7 @@ void testHeaders(CloudEvent ce) throws Exception { @EnumSource(CloudEvents.class) void testHeadersOverrideFromEnv(CloudEvent ce) throws Exception { final int port = AvailablePortFinder.getNextAvailable(); + final KnativeHttpServer server = new KnativeHttpServer(context, port); final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(); final String typeHeaderVal = UUID.randomUUID().toString(); final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(); @@ -1370,17 +1298,6 @@ void testHeadersOverrideFromEnv(CloudEvent ce) throws Exception { ) ); - CountDownLatch latch = new CountDownLatch(1); - AtomicReference exchange = new AtomicReference<>(); - - Undertow server = Undertow.builder() - .addHttpListener(port, "localhost") - .setHandler(se -> { - exchange.set(se); - latch.countDown(); - }) - .build(); - RouteBuilder.addRoutes(context, b -> { b.from("direct:start") .to("knative:endpoint/ep"); @@ -1391,16 +1308,13 @@ void testHeadersOverrideFromEnv(CloudEvent ce) throws Exception { server.start(); template.sendBody("direct:start", ""); - latch.await(); - - HeaderMap headers = exchange.get().getRequestHeaders(); - - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal); - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal); - assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); - } finally { + HttpServerRequest request = server.poll(30, TimeUnit.SECONDS); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal); + assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); + } finally { server.stop(); } } @@ -1409,6 +1323,7 @@ void testHeadersOverrideFromEnv(CloudEvent ce) throws Exception { @EnumSource(CloudEvents.class) void testHeadersOverrideFromURI(CloudEvent ce) throws Exception { final int port = AvailablePortFinder.getNextAvailable(); + final KnativeHttpServer server = new KnativeHttpServer(context, port); final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(); final String typeHeaderVal = UUID.randomUUID().toString(); final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(); @@ -1429,17 +1344,6 @@ void testHeadersOverrideFromURI(CloudEvent ce) throws Exception { ) ); - CountDownLatch latch = new CountDownLatch(1); - AtomicReference exchange = new AtomicReference<>(); - - Undertow server = Undertow.builder() - .addHttpListener(port, "localhost") - .setHandler(se -> { - exchange.set(se); - latch.countDown(); - }) - .build(); - RouteBuilder.addRoutes(context, b -> { b.from("direct:start") .toF("knative:endpoint/ep?%s=%s&%s=%s", @@ -1452,16 +1356,13 @@ void testHeadersOverrideFromURI(CloudEvent ce) throws Exception { server.start(); template.sendBody("direct:start", ""); - latch.await(); - - HeaderMap headers = exchange.get().getRequestHeaders(); - - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal); - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal); - assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); - } finally { + HttpServerRequest request = server.poll(30, TimeUnit.SECONDS); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal); + assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); + } finally { server.stop(); } } @@ -1470,6 +1371,7 @@ void testHeadersOverrideFromURI(CloudEvent ce) throws Exception { @EnumSource(CloudEvents.class) void testHeadersOverrideFromConf(CloudEvent ce) throws Exception { final int port = AvailablePortFinder.getNextAvailable(); + final KnativeHttpServer server = new KnativeHttpServer(context, port); final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(); final String typeHeaderVal = UUID.randomUUID().toString(); final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(); @@ -1495,17 +1397,6 @@ void testHeadersOverrideFromConf(CloudEvent ce) throws Exception { Knative.KNATIVE_CE_OVERRIDE_PREFIX + sourceHeaderKey, sourceHeaderVal )); - CountDownLatch latch = new CountDownLatch(1); - AtomicReference exchange = new AtomicReference<>(); - - Undertow server = Undertow.builder() - .addHttpListener(port, "localhost") - .setHandler(se -> { - exchange.set(se); - latch.countDown(); - }) - .build(); - RouteBuilder.addRoutes(context, b -> { b.from("direct:start") .to("knative:endpoint/ep"); @@ -1516,16 +1407,13 @@ void testHeadersOverrideFromConf(CloudEvent ce) throws Exception { server.start(); template.sendBody("direct:start", ""); - latch.await(); - - HeaderMap headers = exchange.get().getRequestHeaders(); - - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal); - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal); - assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); - } finally { + HttpServerRequest request = server.poll(30, TimeUnit.SECONDS); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal); + assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); + } finally { server.stop(); } } @@ -1534,6 +1422,7 @@ void testHeadersOverrideFromConf(CloudEvent ce) throws Exception { @EnumSource(CloudEvents.class) void testHeadersOverrideFromRouteWithCamelHeader(CloudEvent ce) throws Exception { final int port = AvailablePortFinder.getNextAvailable(); + final KnativeHttpServer server = new KnativeHttpServer(context, port); configureKnativeComponent( context, @@ -1550,17 +1439,6 @@ void testHeadersOverrideFromRouteWithCamelHeader(CloudEvent ce) throws Exception ) ); - CountDownLatch latch = new CountDownLatch(1); - AtomicReference exchange = new AtomicReference<>(); - - Undertow server = Undertow.builder() - .addHttpListener(port, "localhost") - .setHandler(se -> { - exchange.set(se); - latch.countDown(); - }) - .build(); - RouteBuilder.addRoutes(context, b -> { b.from("direct:start") .setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).constant("myType") @@ -1572,16 +1450,13 @@ void testHeadersOverrideFromRouteWithCamelHeader(CloudEvent ce) throws Exception server.start(); template.sendBody("direct:start", ""); - latch.await(); - - HeaderMap headers = exchange.get().getRequestHeaders(); - - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("myType"); - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep"); - assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); - } finally { + HttpServerRequest request = server.poll(30, TimeUnit.SECONDS); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("myType"); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep"); + assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); + } finally { server.stop(); } } @@ -1590,6 +1465,7 @@ void testHeadersOverrideFromRouteWithCamelHeader(CloudEvent ce) throws Exception @EnumSource(CloudEvents.class) void testHeadersOverrideFromRouteWithCEHeader(CloudEvent ce) throws Exception { final int port = AvailablePortFinder.getNextAvailable(); + final KnativeHttpServer server = new KnativeHttpServer(context, port); configureKnativeComponent( context, @@ -1606,17 +1482,6 @@ void testHeadersOverrideFromRouteWithCEHeader(CloudEvent ce) throws Exception { ) ); - CountDownLatch latch = new CountDownLatch(1); - AtomicReference exchange = new AtomicReference<>(); - - Undertow server = Undertow.builder() - .addHttpListener(port, "localhost") - .setHandler(se -> { - exchange.set(se); - latch.countDown(); - }) - .build(); - RouteBuilder.addRoutes(context, b -> { b.from("direct:start") .setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http()).constant("fromCEHeader") @@ -1629,16 +1494,69 @@ void testHeadersOverrideFromRouteWithCEHeader(CloudEvent ce) throws Exception { server.start(); template.sendBody("direct:start", ""); - latch.await(); + HttpServerRequest request = server.poll(30, TimeUnit.SECONDS); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("fromCEHeader"); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep"); + assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); + } finally { + server.stop(); + } + } - HeaderMap headers = exchange.get().getRequestHeaders(); + @ParameterizedTest + @EnumSource(CloudEvents.class) + void testEventBridge(CloudEvent ce) throws Exception { + final int port = AvailablePortFinder.getNextAvailable(); + final KnativeHttpServer server = new KnativeHttpServer(context, port); + + configureKnativeComponent( + context, + ce, + event( + Knative.EndpointKind.sink, + "event.sink", + "localhost", + port, + mapOf( + Knative.CONTENT_TYPE, "text/plain" + )), + sourceEvent( + "event.source", + mapOf( + Knative.CONTENT_TYPE, "text/plain" + )) + ); + + RouteBuilder.addRoutes(context, b -> { + b.from("knative:event/event.source") + .to("knative:event/event.sink"); + }); + + context.start(); + + try { + server.start(); + + given() + .body("test") + .header(Exchange.CONTENT_TYPE, "text/plain") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event.source") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere") + .when() + .post() + .then() + .statusCode(204); - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("fromCEHeader"); - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); - assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep"); - assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); - } finally { + HttpServerRequest request = server.poll(30, TimeUnit.SECONDS); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("event.sink"); + assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); + } finally { server.stop(); } } diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java index 73fac6e5c..d5a73da1f 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java @@ -31,6 +31,8 @@ import org.apache.camel.component.knative.spi.CloudEvent; import org.apache.camel.component.knative.spi.Knative; import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; abstract class AbstractCloudEventProcessor implements CloudEventProcessor { private final CloudEvent cloudEvent; @@ -57,7 +59,7 @@ public Processor consumer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeSe final Map headers = exchange.getIn().getHeaders(); for (CloudEvent.Attribute attribute: ce.attributes()) { - Object val = headers.get(attribute.http()); + Object val = headers.remove(attribute.http()); if (val != null) { headers.put(attribute.id(), val); } @@ -75,6 +77,7 @@ public Processor consumer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeSe @Override public Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) { final CloudEvent ce = cloudEvent(); + final Logger logger = LoggerFactory.getLogger(getClass()); return exchange -> { final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE); @@ -89,12 +92,33 @@ public Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeSe headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType); + // + // in case of events, the type of the event defined as URI param so we need + // to override it to avoid the event type be overridden by Messages's headers + // + if (endpoint.getType() == Knative.Type.event) { + Object eventType = headers.get(CloudEvent.CAMEL_CLOUD_EVENT_TYPE); + + if (eventType != null) { + logger.debug("Detected the presence of {} header with value {}: it will be ignored and replaced by value set as uri parameter {}", + CloudEvent.CAMEL_CLOUD_EVENT_TYPE, + eventType, + endpoint.getName()); + } + + headers.put(cloudEvent().mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), endpoint.getName()); + } else { + setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_TYPE, () -> { + return service.getMetadata().getOrDefault( + Knative.KNATIVE_EVENT_TYPE, + endpoint.getConfiguration().getCloudEventsType() + ); + }); + } + setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_ID, exchange::getExchangeId); setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, endpoint::getEndpointUri); setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce::version); - setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_TYPE, () -> { - return service.getMetadata().getOrDefault(Knative.KNATIVE_EVENT_TYPE, endpoint.getConfiguration().getCloudEventsType()); - }); setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_TIME, () -> { final ZonedDateTime created = ZonedDateTime.ofInstant(Instant.ofEpochMilli(exchange.getCreated()), ZoneId.systemDefault()); final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);