diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java index a9dbbc08a..dea93d9bf 100644 --- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java +++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java @@ -29,6 +29,7 @@ public final class Knative { public static final String KNATIVE_EVENT_TYPE = "knative.event.type"; public static final String KNATIVE_KIND = "knative.kind"; public static final String KNATIVE_API_VERSION = "knative.apiVersion"; + public static final String KNATIVE_REPLY = "knative.reply"; public static final String CONTENT_TYPE = "content.type"; public static final String MIME_STRUCTURED_CONTENT_MODE = "application/cloudevents+json"; public static final String MIME_BATCH_CONTENT_MODE = "application/cloudevents-batch+json"; diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java index 5c9b7a8e7..cfca7dca3 100644 --- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java +++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.stream.Stream; import com.fasterxml.jackson.annotation.JsonCreator; @@ -139,6 +140,13 @@ public static KnativeServiceDefinition channel(Knative.EndpointKind endpointKind .build(); } + public static KnativeServiceDefinition sourceChannel(String name, Map metadata) { + return serviceBuilder(Knative.Type.channel, name) + .withMeta(metadata) + .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.source) + .build(); + } + public static KnativeServiceDefinition event(Knative.EndpointKind endpointKind, String name, String host, int port) { return serviceBuilder(Knative.Type.event, name) .withHost(host) @@ -330,6 +338,10 @@ public String getMetadata(String key) { return getMetadata().get(key); } + public Optional getOptionalMetadata(String key) { + return Optional.ofNullable(getMetadata(key)); + } + public boolean matches(Knative.Type type, String name) { return Objects.equals(type.name(), getMetadata(Knative.KNATIVE_TYPE)) && Objects.equals(name, getName()); diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransportConfiguration.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransportConfiguration.java index fc94034da..8ce752f20 100644 --- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransportConfiguration.java +++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransportConfiguration.java @@ -19,12 +19,13 @@ public final class KnativeTransportConfiguration { private final CloudEvent cloudEvent; - private final boolean removeCloudEventHeadersInReply; + private final boolean reply; - public KnativeTransportConfiguration(CloudEvent cloudEvent, boolean removeCloudEventHeadersInReply) { + public KnativeTransportConfiguration(CloudEvent cloudEvent, boolean removeCloudEventHeadersInReply, boolean reply) { this.cloudEvent = cloudEvent; this.removeCloudEventHeadersInReply = removeCloudEventHeadersInReply; + this.reply = reply; } public CloudEvent getCloudEvent() { @@ -35,4 +36,7 @@ public boolean isRemoveCloudEventHeadersInReply() { return removeCloudEventHeadersInReply; } + public boolean isReply() { + return reply; + } } diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java index 4ce46b140..fc83036ed 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java @@ -31,16 +31,15 @@ import io.vertx.ext.web.RoutingContext; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; import org.apache.camel.Message; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Processor; import org.apache.camel.TypeConverter; import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.apache.camel.component.knative.spi.KnativeTransportConfiguration; import org.apache.camel.component.platform.http.vertx.VertxPlatformHttpRouter; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.support.DefaultConsumer; -import org.apache.camel.support.DefaultMessage; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.MessageHelper; import org.apache.camel.util.ObjectHelper; @@ -50,7 +49,7 @@ public class KnativeHttpConsumer extends DefaultConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpConsumer.class); - private final KnativeHttpTransport transport; + private final KnativeTransportConfiguration configuration; private final Predicate filter; private final KnativeEnvironment.KnativeServiceDefinition serviceDefinition; private final VertxPlatformHttpRouter router; @@ -60,7 +59,7 @@ public class KnativeHttpConsumer extends DefaultConsumer { private Route route; public KnativeHttpConsumer( - KnativeHttpTransport transport, + KnativeTransportConfiguration configuration, Endpoint endpoint, KnativeEnvironment.KnativeServiceDefinition serviceDefinition, VertxPlatformHttpRouter router, @@ -68,7 +67,7 @@ public KnativeHttpConsumer( super(endpoint, processor); - this.transport = transport; + this.configuration = configuration; this.serviceDefinition = serviceDefinition; this.router = router; this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy(); @@ -149,18 +148,16 @@ protected void doResume() throws Exception { private void handleRequest(RoutingContext routingContext) { final HttpServerRequest request = routingContext.request(); - final Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOut); - final Message in = toMessage(request, exchange); + final Exchange exchange = getEndpoint().createExchange(); + final Message message = toMessage(request, exchange); Buffer payload = routingContext.getBody(); if (payload != null) { - in.setBody(payload.getBytes()); + message.setBody(payload.getBytes()); } else { - in.setBody(null); + message.setBody(null); } - exchange.setIn(in); - try { createUoW(exchange); @@ -192,7 +189,7 @@ private void handleRequest(RoutingContext routingContext) { HttpServerResponse response = toHttpResponse(request, exchange.getMessage()); Buffer body = null; - if (request.response().getStatusCode() != 204) { + if (request.response().getStatusCode() != 204 && configuration.isReply()) { body = computeResponseBody(exchange.getMessage()); // set the content type in the response. @@ -234,7 +231,7 @@ private void handleRequest(RoutingContext routingContext) { } private Message toMessage(HttpServerRequest request, Exchange exchange) { - Message message = new DefaultMessage(exchange.getContext()); + Message message = exchange.getMessage(); String path = request.path(); if (serviceDefinition.getPath() != null) { @@ -275,17 +272,19 @@ private HttpServerResponse toHttpResponse(HttpServerRequest request, Message mes response.setStatusCode(code); - for (Map.Entry entry : message.getHeaders().entrySet()) { - final String key = entry.getKey(); - final Object value = entry.getValue(); + if (configuration.isReply()) { + for (Map.Entry entry : message.getHeaders().entrySet()) { + final String key = entry.getKey(); + final Object value = entry.getValue(); - for (Object it: org.apache.camel.support.ObjectHelper.createIterable(value, null)) { - String headerValue = tc.convertTo(String.class, it); - if (headerValue == null) { - continue; - } - if (!headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, message.getExchange())) { - response.putHeader(key, headerValue); + for (Object it : org.apache.camel.support.ObjectHelper.createIterable(value, null)) { + String headerValue = tc.convertTo(String.class, it); + if (headerValue == null) { + continue; + } + if (!headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, message.getExchange())) { + response.putHeader(key, headerValue); + } } } } diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java index 010bfb332..67045d1be 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java @@ -46,7 +46,6 @@ public class KnativeHttpProducer extends DefaultAsyncProducer { private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpProducer.class); - private final KnativeHttpTransport transport; private final KnativeEnvironment.KnativeServiceDefinition serviceDefinition; private final Vertx vertx; private final WebClientOptions clientOptions; @@ -56,14 +55,12 @@ public class KnativeHttpProducer extends DefaultAsyncProducer { private WebClient client; public KnativeHttpProducer( - KnativeHttpTransport transport, Endpoint endpoint, KnativeEnvironment.KnativeServiceDefinition serviceDefinition, Vertx vertx, WebClientOptions clientOptions) { super(endpoint); - this.transport = transport; this.serviceDefinition = serviceDefinition; this.vertx = ObjectHelper.notNull(vertx, "vertx"); this.clientOptions = ObjectHelper.supplyIfEmpty(clientOptions, WebClientOptions::new); diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java index 1666d7a79..af3f7d43e 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java @@ -91,7 +91,7 @@ protected void doStop() throws Exception { @Override public Producer createProducer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeServiceDefinition service) { - return new KnativeHttpProducer(this, endpoint, service, this.router.vertx(), vertxHttpClientOptions); + return new KnativeHttpProducer(endpoint, service, this.router.vertx(), vertxHttpClientOptions); } @Override @@ -102,7 +102,7 @@ public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration next = KnativeHttpSupport.withoutCloudEventHeaders(next, config.getCloudEvent()); } - return new KnativeHttpConsumer(this, endpoint, service, this.router, next); + return new KnativeHttpConsumer(config, endpoint, service, this.router, next); } } diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java index ee55fccf1..caaf91b10 100644 --- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java +++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java @@ -20,6 +20,7 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Random; import java.util.UUID; @@ -59,10 +60,13 @@ import static org.apache.camel.component.knative.spi.KnativeEnvironment.channel; import static org.apache.camel.component.knative.spi.KnativeEnvironment.endpoint; import static org.apache.camel.component.knative.spi.KnativeEnvironment.event; +import static org.apache.camel.component.knative.spi.KnativeEnvironment.sourceChannel; import static org.apache.camel.component.knative.spi.KnativeEnvironment.sourceEndpoint; import static org.apache.camel.component.knative.spi.KnativeEnvironment.sourceEvent; import static org.apache.camel.util.CollectionHelper.mapOf; import static org.assertj.core.api.Assertions.assertThat; +import static org.hamcrest.Matchers.emptyOrNullString; +import static org.hamcrest.Matchers.is; public class KnativeHttpTest { @@ -1250,6 +1254,116 @@ void testNoContent(CloudEvent ce) throws Exception { } } + @ParameterizedTest + @EnumSource(CloudEvents.class) + void testNoReply(CloudEvent ce) throws Exception { + configureKnativeComponent( + context, + ce, + sourceChannel( + "channel", + Map.of( + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain" + )) + ); + + RouteBuilder.addRoutes(context, b -> { + b.from("knative:channel/channel?reply=false") + .setBody().constant(Map.of()); + }); + + context.start(); + + given() + .body("test") + .header(Exchange.CONTENT_TYPE, "text/plain") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere") + .when() + .post() + .then() + .statusCode(204) + .body(is(emptyOrNullString())); + } + + @ParameterizedTest + @EnumSource(CloudEvents.class) + void testNoReplyMeta(CloudEvent ce) throws Exception { + configureKnativeComponent( + context, + ce, + sourceChannel( + "channel", + Map.of( + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain", + Knative.KNATIVE_REPLY, "false" + )) + ); + + RouteBuilder.addRoutes(context, b -> { + b.from("knative:channel/channel") + .setBody().constant(Map.of()); + }); + + context.start(); + + given() + .body("test") + .header(Exchange.CONTENT_TYPE, "text/plain") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere") + .when() + .post() + .then() + .statusCode(204) + .body(is(emptyOrNullString())); + } + + @ParameterizedTest + @EnumSource(CloudEvents.class) + void testNoReplyMetaOverride(CloudEvent ce) throws Exception { + configureKnativeComponent( + context, + ce, + sourceChannel( + "channel", + Map.of( + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain", + Knative.KNATIVE_REPLY, "true" + )) + ); + + RouteBuilder.addRoutes(context, b -> { + b.from("knative:channel/channel?reply=false") + .setBody().constant(Map.of()); + }); + + context.start(); + + given() + .body("test") + .header(Exchange.CONTENT_TYPE, "text/plain") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere") + .when() + .post() + .then() + .statusCode(204) + .body(is(emptyOrNullString())); + } + @ParameterizedTest @EnumSource(CloudEvents.class) void testOrdering(CloudEvent ce) throws Exception { diff --git a/camel-knative/camel-knative/src/generated/java/org/apache/camel/component/knative/KnativeEndpointConfigurer.java b/camel-knative/camel-knative/src/generated/java/org/apache/camel/component/knative/KnativeEndpointConfigurer.java index 05a910050..d99b5103b 100644 --- a/camel-knative/camel-knative/src/generated/java/org/apache/camel/component/knative/KnativeEndpointConfigurer.java +++ b/camel-knative/camel-knative/src/generated/java/org/apache/camel/component/knative/KnativeEndpointConfigurer.java @@ -40,6 +40,7 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj case "kind": target.getConfiguration().setKind(property(camelContext, java.lang.String.class, value)); return true; case "lazystartproducer": case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true; + case "reply": target.getConfiguration().setReply(property(camelContext, java.lang.Boolean.class, value)); return true; case "replywithcloudevent": case "replyWithCloudEvent": target.getConfiguration().setReplyWithCloudEvent(property(camelContext, boolean.class, value)); return true; case "servicename": @@ -66,6 +67,7 @@ public Map getAllOptions(Object target) { answer.put("filters", java.util.Map.class); answer.put("kind", java.lang.String.class); answer.put("lazyStartProducer", boolean.class); + answer.put("reply", java.lang.Boolean.class); answer.put("replyWithCloudEvent", boolean.class); answer.put("serviceName", java.lang.String.class); answer.put("synchronous", boolean.class); @@ -98,6 +100,7 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) { case "kind": return target.getConfiguration().getKind(); case "lazystartproducer": case "lazyStartProducer": return target.isLazyStartProducer(); + case "reply": return target.getConfiguration().getReply(); case "replywithcloudevent": case "replyWithCloudEvent": return target.getConfiguration().isReplyWithCloudEvent(); case "servicename": diff --git a/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json b/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json index 12371c697..4954d47c5 100644 --- a/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json +++ b/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json @@ -41,6 +41,7 @@ "replyWithCloudEvent": { "kind": "parameter", "displayName": "Reply With Cloud Event", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "Transforms the reply into a cloud event that will be processed by the caller. When listening to events from a Knative Broker, if this flag is enabled, replies will be published to the same Broker where the request comes from (beware that if you don't change the type of the received message, you may create a loop and receive your same reply). When this flag is disabled, CloudEvent headers are removed from the reply." }, "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored." }, "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, + "reply": { "kind": "parameter", "displayName": "Reply", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "secret": false, "defaultValue": "true", "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "If the consumer should construct a full reply to knative request." }, "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing." }, "apiVersion": { "kind": "parameter", "displayName": "Api Version", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "The version of the k8s resource referenced by the endpoint." }, "basicPropertyBinding": { "kind": "parameter", "displayName": "Basic Property Binding", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities" }, diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java index 63b8e8b53..ee5f168c5 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java @@ -29,7 +29,6 @@ import org.apache.camel.spi.Metadata; import org.apache.camel.spi.annotations.Component; import org.apache.camel.support.DefaultComponent; -import org.apache.camel.support.PropertyBindingSupport; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.PropertiesHelper; @@ -227,14 +226,10 @@ protected Endpoint createEndpoint(String uri, String remaining, Map lookupServiceDefinition(St .findFirst(); } - private KnativeTransportConfiguration createTransportConfiguration() { + private KnativeTransportConfiguration createTransportConfiguration(KnativeEnvironment.KnativeServiceDefinition definition) { return new KnativeTransportConfiguration( this.cloudEvent.cloudEvent(), - !this.configuration.isReplyWithCloudEvent() + !this.configuration.isReplyWithCloudEvent(), + ObjectHelper.supplyIfEmpty( + this.configuration.getReply(), + () -> definition.getOptionalMetadata(Knative.KNATIVE_REPLY).map(Boolean::parseBoolean).orElse(true) + ) ); } diff --git a/tooling/camel-k-test/pom.xml b/tooling/camel-k-test/pom.xml index 508a3d328..5f43ebb57 100644 --- a/tooling/camel-k-test/pom.xml +++ b/tooling/camel-k-test/pom.xml @@ -39,6 +39,11 @@ assertj-core + + org.hamcrest + hamcrest-core + + io.rest-assured rest-assured