Skip to content

Commit

Permalink
No type converter warning if final type of body is java.util.Map apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Jul 15, 2020
1 parent 90a38e0 commit 61284fa
Show file tree
Hide file tree
Showing 13 changed files with 198 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public final class Knative {
public static final String KNATIVE_EVENT_TYPE = "knative.event.type";
public static final String KNATIVE_KIND = "knative.kind";
public static final String KNATIVE_API_VERSION = "knative.apiVersion";
public static final String KNATIVE_REPLY = "knative.reply";
public static final String CONTENT_TYPE = "content.type";
public static final String MIME_STRUCTURED_CONTENT_MODE = "application/cloudevents+json";
public static final String MIME_BATCH_CONTENT_MODE = "application/cloudevents-batch+json";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;

import com.fasterxml.jackson.annotation.JsonCreator;
Expand Down Expand Up @@ -139,6 +140,13 @@ public static KnativeServiceDefinition channel(Knative.EndpointKind endpointKind
.build();
}

public static KnativeServiceDefinition sourceChannel(String name, Map<String, String> metadata) {
return serviceBuilder(Knative.Type.channel, name)
.withMeta(metadata)
.withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.source)
.build();
}

public static KnativeServiceDefinition event(Knative.EndpointKind endpointKind, String name, String host, int port) {
return serviceBuilder(Knative.Type.event, name)
.withHost(host)
Expand Down Expand Up @@ -330,6 +338,10 @@ public String getMetadata(String key) {
return getMetadata().get(key);
}

public Optional<String> getOptionalMetadata(String key) {
return Optional.ofNullable(getMetadata(key));
}

public boolean matches(Knative.Type type, String name) {
return Objects.equals(type.name(), getMetadata(Knative.KNATIVE_TYPE))
&& Objects.equals(name, getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
public final class KnativeTransportConfiguration {

private final CloudEvent cloudEvent;

private final boolean removeCloudEventHeadersInReply;
private final boolean reply;

public KnativeTransportConfiguration(CloudEvent cloudEvent, boolean removeCloudEventHeadersInReply) {
public KnativeTransportConfiguration(CloudEvent cloudEvent, boolean removeCloudEventHeadersInReply, boolean reply) {
this.cloudEvent = cloudEvent;
this.removeCloudEventHeadersInReply = removeCloudEventHeadersInReply;
this.reply = reply;
}

public CloudEvent getCloudEvent() {
Expand All @@ -35,4 +36,7 @@ public boolean isRemoveCloudEventHeadersInReply() {
return removeCloudEventHeadersInReply;
}

public boolean isReply() {
return reply;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@
import io.vertx.ext.web.RoutingContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.TypeConverter;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.component.knative.spi.KnativeTransportConfiguration;
import org.apache.camel.component.platform.http.vertx.VertxPlatformHttpRouter;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.DefaultMessage;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.MessageHelper;
import org.apache.camel.util.ObjectHelper;
Expand All @@ -50,7 +49,7 @@
public class KnativeHttpConsumer extends DefaultConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpConsumer.class);

private final KnativeHttpTransport transport;
private final KnativeTransportConfiguration configuration;
private final Predicate<HttpServerRequest> filter;
private final KnativeEnvironment.KnativeServiceDefinition serviceDefinition;
private final VertxPlatformHttpRouter router;
Expand All @@ -60,15 +59,15 @@ public class KnativeHttpConsumer extends DefaultConsumer {
private Route route;

public KnativeHttpConsumer(
KnativeHttpTransport transport,
KnativeTransportConfiguration configuration,
Endpoint endpoint,
KnativeEnvironment.KnativeServiceDefinition serviceDefinition,
VertxPlatformHttpRouter router,
Processor processor) {

super(endpoint, processor);

this.transport = transport;
this.configuration = configuration;
this.serviceDefinition = serviceDefinition;
this.router = router;
this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy();
Expand Down Expand Up @@ -149,18 +148,16 @@ protected void doResume() throws Exception {

private void handleRequest(RoutingContext routingContext) {
final HttpServerRequest request = routingContext.request();
final Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOut);
final Message in = toMessage(request, exchange);
final Exchange exchange = getEndpoint().createExchange();
final Message message = toMessage(request, exchange);

Buffer payload = routingContext.getBody();
if (payload != null) {
in.setBody(payload.getBytes());
message.setBody(payload.getBytes());
} else {
in.setBody(null);
message.setBody(null);
}

exchange.setIn(in);

try {
createUoW(exchange);

Expand Down Expand Up @@ -192,7 +189,7 @@ private void handleRequest(RoutingContext routingContext) {
HttpServerResponse response = toHttpResponse(request, exchange.getMessage());
Buffer body = null;

if (request.response().getStatusCode() != 204) {
if (request.response().getStatusCode() != 204 && configuration.isReply()) {
body = computeResponseBody(exchange.getMessage());

// set the content type in the response.
Expand Down Expand Up @@ -234,7 +231,7 @@ private void handleRequest(RoutingContext routingContext) {
}

private Message toMessage(HttpServerRequest request, Exchange exchange) {
Message message = new DefaultMessage(exchange.getContext());
Message message = exchange.getMessage();
String path = request.path();

if (serviceDefinition.getPath() != null) {
Expand Down Expand Up @@ -275,17 +272,19 @@ private HttpServerResponse toHttpResponse(HttpServerRequest request, Message mes

response.setStatusCode(code);

for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
final String key = entry.getKey();
final Object value = entry.getValue();
if (configuration.isReply()) {
for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
final String key = entry.getKey();
final Object value = entry.getValue();

for (Object it: org.apache.camel.support.ObjectHelper.createIterable(value, null)) {
String headerValue = tc.convertTo(String.class, it);
if (headerValue == null) {
continue;
}
if (!headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, message.getExchange())) {
response.putHeader(key, headerValue);
for (Object it : org.apache.camel.support.ObjectHelper.createIterable(value, null)) {
String headerValue = tc.convertTo(String.class, it);
if (headerValue == null) {
continue;
}
if (!headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, message.getExchange())) {
response.putHeader(key, headerValue);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
public class KnativeHttpProducer extends DefaultAsyncProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpProducer.class);

private final KnativeHttpTransport transport;
private final KnativeEnvironment.KnativeServiceDefinition serviceDefinition;
private final Vertx vertx;
private final WebClientOptions clientOptions;
Expand All @@ -56,14 +55,12 @@ public class KnativeHttpProducer extends DefaultAsyncProducer {
private WebClient client;

public KnativeHttpProducer(
KnativeHttpTransport transport,
Endpoint endpoint,
KnativeEnvironment.KnativeServiceDefinition serviceDefinition,
Vertx vertx,
WebClientOptions clientOptions) {
super(endpoint);

this.transport = transport;
this.serviceDefinition = serviceDefinition;
this.vertx = ObjectHelper.notNull(vertx, "vertx");
this.clientOptions = ObjectHelper.supplyIfEmpty(clientOptions, WebClientOptions::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ protected void doStop() throws Exception {

@Override
public Producer createProducer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeServiceDefinition service) {
return new KnativeHttpProducer(this, endpoint, service, this.router.vertx(), vertxHttpClientOptions);
return new KnativeHttpProducer(endpoint, service, this.router.vertx(), vertxHttpClientOptions);
}

@Override
Expand All @@ -102,7 +102,7 @@ public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration
next = KnativeHttpSupport.withoutCloudEventHeaders(next, config.getCloudEvent());
}

return new KnativeHttpConsumer(this, endpoint, service, this.router, next);
return new KnativeHttpConsumer(config, endpoint, service, this.router, next);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
Expand Down Expand Up @@ -59,10 +60,13 @@
import static org.apache.camel.component.knative.spi.KnativeEnvironment.channel;
import static org.apache.camel.component.knative.spi.KnativeEnvironment.endpoint;
import static org.apache.camel.component.knative.spi.KnativeEnvironment.event;
import static org.apache.camel.component.knative.spi.KnativeEnvironment.sourceChannel;
import static org.apache.camel.component.knative.spi.KnativeEnvironment.sourceEndpoint;
import static org.apache.camel.component.knative.spi.KnativeEnvironment.sourceEvent;
import static org.apache.camel.util.CollectionHelper.mapOf;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.emptyOrNullString;
import static org.hamcrest.Matchers.is;

public class KnativeHttpTest {

Expand Down Expand Up @@ -1250,6 +1254,116 @@ void testNoContent(CloudEvent ce) throws Exception {
}
}

@ParameterizedTest
@EnumSource(CloudEvents.class)
void testNoReply(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
sourceChannel(
"channel",
Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
);

RouteBuilder.addRoutes(context, b -> {
b.from("knative:channel/channel?reply=false")
.setBody().constant(Map.of());
});

context.start();

given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
.when()
.post()
.then()
.statusCode(204)
.body(is(emptyOrNullString()));
}

@ParameterizedTest
@EnumSource(CloudEvents.class)
void testNoReplyMeta(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
sourceChannel(
"channel",
Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_REPLY, "false"
))
);

RouteBuilder.addRoutes(context, b -> {
b.from("knative:channel/channel")
.setBody().constant(Map.of());
});

context.start();

given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
.when()
.post()
.then()
.statusCode(204)
.body(is(emptyOrNullString()));
}

@ParameterizedTest
@EnumSource(CloudEvents.class)
void testNoReplyMetaOverride(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
sourceChannel(
"channel",
Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_REPLY, "true"
))
);

RouteBuilder.addRoutes(context, b -> {
b.from("knative:channel/channel?reply=false")
.setBody().constant(Map.of());
});

context.start();

given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
.when()
.post()
.then()
.statusCode(204)
.body(is(emptyOrNullString()));
}

@ParameterizedTest
@EnumSource(CloudEvents.class)
void testOrdering(CloudEvent ce) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
case "kind": target.getConfiguration().setKind(property(camelContext, java.lang.String.class, value)); return true;
case "lazystartproducer":
case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
case "reply": target.getConfiguration().setReply(property(camelContext, java.lang.Boolean.class, value)); return true;
case "replywithcloudevent":
case "replyWithCloudEvent": target.getConfiguration().setReplyWithCloudEvent(property(camelContext, boolean.class, value)); return true;
case "servicename":
Expand All @@ -66,6 +67,7 @@ public Map<String, Object> getAllOptions(Object target) {
answer.put("filters", java.util.Map.class);
answer.put("kind", java.lang.String.class);
answer.put("lazyStartProducer", boolean.class);
answer.put("reply", java.lang.Boolean.class);
answer.put("replyWithCloudEvent", boolean.class);
answer.put("serviceName", java.lang.String.class);
answer.put("synchronous", boolean.class);
Expand Down Expand Up @@ -98,6 +100,7 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
case "kind": return target.getConfiguration().getKind();
case "lazystartproducer":
case "lazyStartProducer": return target.isLazyStartProducer();
case "reply": return target.getConfiguration().getReply();
case "replywithcloudevent":
case "replyWithCloudEvent": return target.getConfiguration().isReplyWithCloudEvent();
case "servicename":
Expand Down
Loading

0 comments on commit 61284fa

Please sign in to comment.