Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix apache/camel-k#1119: do not return cloud events by default #208

Merged
merged 4 commits into from
Dec 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,25 @@ public interface KnativeTransport extends Service {
* Create a camel {@link Producer} in place of the original endpoint for a specific protocol.
*
* @param endpoint the endpoint for which the producer should be created
* @param configuration the general transport configuration
* @param service the service definition containing information about how make reach the target service.
* @return
*/
Producer createProducer(
Endpoint endpoint,
KnativeTransportConfiguration configuration,
KnativeEnvironment.KnativeServiceDefinition service);

/**
* Create a camel {@link Consumer} in place of the original endpoint for a specific protocol.
*
* @param endpoint the endpoint for which the consumer should be created.
* @param configuration the general transport configuration
* @param service the service definition containing information about how make the route reachable from knative.
* @return
*/
Consumer createConsumer(
Endpoint endpoint,
KnativeTransportConfiguration configuration,
KnativeEnvironment.KnativeServiceDefinition service, Processor processor);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.knative.spi;

public final class KnativeTransportConfiguration {

private final CloudEvent cloudEvent;

private final boolean removeCloudEventHeadersInReply;

public KnativeTransportConfiguration(CloudEvent cloudEvent, boolean removeCloudEventHeadersInReply) {
this.cloudEvent = cloudEvent;
this.removeCloudEventHeadersInReply = removeCloudEventHeadersInReply;
}

public CloudEvent getCloudEvent() {
return cloudEvent;
}

public boolean isRemoveCloudEventHeadersInReply() {
return removeCloudEventHeadersInReply;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@
import java.util.stream.Collectors;

import io.vertx.core.http.HttpServerRequest;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.knative.spi.CloudEvent;
import org.apache.camel.component.knative.spi.Knative;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
import org.apache.camel.util.ObjectHelper;

public final class KnativeHttpSupport {
public final class KnativeHttpSupport {
private KnativeHttpSupport() {
}

Expand Down Expand Up @@ -94,4 +99,23 @@ public static Predicate<HttpServerRequest> createFilter(KnativeEnvironment.Knati
return true;
};
}

/**
* Removes cloud event headers at the end of the processing.
*/
public static Processor withoutCloudEventHeaders(Processor delegate, CloudEvent ce) {
return new DelegateAsyncProcessor(delegate) {
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
return processor.process(exchange, doneSync -> {
// remove CloudEvent headers
for (CloudEvent.Attribute attr : ce.attributes()) {
exchange.getMessage().removeHeader(attr.http());
}
callback.done(doneSync);
});
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.camel.Producer;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.component.knative.spi.KnativeTransport;
import org.apache.camel.component.knative.spi.KnativeTransportConfiguration;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
Expand Down Expand Up @@ -198,12 +199,17 @@ protected void doStop() throws Exception {
// *****************************

@Override
public Producer createProducer(Endpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) {
public Producer createProducer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeServiceDefinition service) {
return new KnativeHttpProducer(this, endpoint, service, vertx, vertxHttpClientOptions);
}

@Override
public Consumer createConsumer(Endpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service, Processor processor) {
return new KnativeHttpConsumer(this, endpoint, service, processor);
public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeServiceDefinition service, Processor processor) {
Processor next = processor;
if (config.isRemoveCloudEventHeadersInReply()) {
next = KnativeHttpSupport.withoutCloudEventHeaders(processor, config.getCloudEvent());
}
return new KnativeHttpConsumer(this, endpoint, service, next);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,9 @@ public void configure() throws Exception {
from("knative:endpoint/from")
.convertBodyTo(String.class)
.setBody()
.constant("consumer");
.constant("consumer")
.setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())
.constant("custom");
from("direct:source")
.to("knative://endpoint/to")
.log("${body}")
Expand All @@ -599,6 +601,60 @@ public void configure() throws Exception {

MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
mock.expectedBodiesReceived("consumer");
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), null);
mock.expectedMessageCount(1);

context.start();
context.createProducerTemplate().sendBody("direct:source", "");

mock.assertIsSatisfied();
}

@ParameterizedTest
@EnumSource(CloudEvents.class)
void testReplyCloudEventHeaders(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
endpoint(
Knative.EndpointKind.source,
"from",
"localhost",
port,
KnativeSupport.mapOf(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)),
endpoint(
Knative.EndpointKind.sink,
"to",
"localhost",
port,
KnativeSupport.mapOf(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
);

context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("knative:endpoint/from?replyWithCloudEvent=true")
.convertBodyTo(String.class)
.setBody()
.constant("consumer")
.setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())
.constant("custom");
from("direct:source")
.to("knative://endpoint/to")
.log("${body}")
.to("mock:to");
}
});

MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
mock.expectedBodiesReceived("consumer");
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "custom");
mock.expectedMessageCount(1);

context.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,12 @@ public void setEnvironment(KnativeEnvironment environment) {
configuration.setEnvironment(environment);
}

@Deprecated
public boolean isJsonSerializationEnabled() {
return configuration.isJsonSerializationEnabled();
}

@Deprecated
public void setJsonSerializationEnabled(boolean jsonSerializationEnabled) {
configuration.setJsonSerializationEnabled(jsonSerializationEnabled);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class KnativeConfiguration implements Cloneable {
@UriParam
private String serviceName;
@UriParam(defaultValue = "false")
@Deprecated
private boolean jsonSerializationEnabled;
@UriParam(defaultValue = "0.3", enums = "0.1,0.2,0.3")
private String cloudEventsSpecVersion = CloudEvents.V03.version();
Expand All @@ -46,6 +47,8 @@ public class KnativeConfiguration implements Cloneable {
private String apiVersion;
@UriParam(label = "advanced")
private String kind;
@UriParam(label = "consumer", defaultValue = "false")
private boolean replyWithCloudEvent;

public KnativeConfiguration() {
}
Expand Down Expand Up @@ -78,13 +81,32 @@ public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}

public boolean isReplyWithCloudEvent() {
return replyWithCloudEvent;
}

/**
* Transforms the reply into a cloud event that will be processed by the caller.
*
* When listening to events from a Knative Broker, if this flag is enabled, replies will
* be published to the same Broker where the request comes from (beware that if you don't
* change the "type" of the received message, you may create a loop and receive your same reply).
*
* When this flag is disabled, CloudEvent headers are removed from the reply.
*/
public void setReplyWithCloudEvent(boolean replyWithCloudEvent) {
this.replyWithCloudEvent = replyWithCloudEvent;
}

@Deprecated
public boolean isJsonSerializationEnabled() {
return jsonSerializationEnabled;
}

/**
* Enables automatic serialization to JSON of the produced events.
*/
@Deprecated
public void setJsonSerializationEnabled(boolean jsonSerializationEnabled) {
this.jsonSerializationEnabled = jsonSerializationEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
/**
* Converts objects prior to serializing them to external endpoints or channels
*/
@Deprecated
public class KnativeConversionProcessor implements Processor {

private boolean enabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
import org.apache.camel.component.knative.spi.CloudEvent;
import org.apache.camel.component.knative.spi.Knative;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.component.knative.spi.KnativeTransportConfiguration;
import org.apache.camel.processor.Pipeline;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.camel.support.DefaultEndpoint;
import org.apache.camel.support.PropertyBindingSupport;


/**
* This component allows to interact with KNative events.
*/
Expand Down Expand Up @@ -76,7 +76,7 @@ public Producer createProducer() throws Exception {
final KnativeEnvironment.KnativeServiceDefinition service = lookupServiceDefinition(Knative.EndpointKind.sink);
final Processor ceProcessor = cloudEvent.producer(this, service);
final Processor ceConverter = new KnativeConversionProcessor(configuration.isJsonSerializationEnabled());
final Producer producer = getComponent().getTransport().createProducer(this, service);
final Producer producer = getComponent().getTransport().createProducer(this, createTransportConfiguration(), service);

PropertyBindingSupport.build()
.withCamelContext(getCamelContext())
Expand All @@ -92,8 +92,12 @@ public Producer createProducer() throws Exception {
public Consumer createConsumer(Processor processor) throws Exception {
final KnativeEnvironment.KnativeServiceDefinition service = lookupServiceDefinition(Knative.EndpointKind.source);
final Processor ceProcessor = cloudEvent.consumer(this, service);
final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor);
final Consumer consumer = getComponent().getTransport().createConsumer(this, service, pipeline);
Processor replyProcessor = null;
if (configuration.isReplyWithCloudEvent()) {
replyProcessor = cloudEvent.producer(this, service);
}
final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor, replyProcessor);
final Consumer consumer = getComponent().getTransport().createConsumer(this, createTransportConfiguration(), service, pipeline);

PropertyBindingSupport.build()
.withCamelContext(getCamelContext())
Expand Down Expand Up @@ -148,7 +152,7 @@ KnativeEnvironment.KnativeServiceDefinition lookupServiceDefinition(Knative.Endp
Map<String, String> metadata = new HashMap<>();
metadata.putAll(service.get().getMetadata());

for (Map.Entry<String, Object> entry: configuration.getFilters().entrySet()) {
for (Map.Entry<String, Object> entry : configuration.getFilters().entrySet()) {
String key = entry.getKey();
Object val = entry.getValue();

Expand All @@ -161,7 +165,7 @@ KnativeEnvironment.KnativeServiceDefinition lookupServiceDefinition(Knative.Endp
}
}

for (Map.Entry<String, Object> entry: configuration.getCeOverride().entrySet()) {
for (Map.Entry<String, Object> entry : configuration.getCeOverride().entrySet()) {
String key = entry.getKey();
Object val = entry.getValue();

Expand Down Expand Up @@ -210,4 +214,11 @@ Optional<KnativeEnvironment.KnativeServiceDefinition> lookupServiceDefinition(St
})
.findFirst();
}

private KnativeTransportConfiguration createTransportConfiguration() {
return new KnativeTransportConfiguration(
this.cloudEvent.cloudEvent(),
!this.configuration.isReplyWithCloudEvent()
);
}
}