Skip to content

Commit

Permalink
fix apache/camel-k#1119: do not return cloud events by default
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolaferraro committed Dec 13, 2019
1 parent 5f21a87 commit 933f2a5
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,9 @@ public void configure() throws Exception {
from("knative:endpoint/from")
.convertBodyTo(String.class)
.setBody()
.constant("consumer");
.constant("consumer")
.setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())
.constant("custom");
from("direct:source")
.to("knative://endpoint/to")
.log("${body}")
Expand All @@ -599,6 +601,60 @@ public void configure() throws Exception {

MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
mock.expectedBodiesReceived("consumer");
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), null);
mock.expectedMessageCount(1);

context.start();
context.createProducerTemplate().sendBody("direct:source", "");

mock.assertIsSatisfied();
}

@ParameterizedTest
@EnumSource(CloudEvents.class)
void testReplyCloudEventHeaders(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
endpoint(
Knative.EndpointKind.source,
"from",
"localhost",
port,
KnativeSupport.mapOf(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)),
endpoint(
Knative.EndpointKind.sink,
"to",
"localhost",
port,
KnativeSupport.mapOf(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
);

context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("knative:endpoint/from?replyWithCloudEvent=true")
.convertBodyTo(String.class)
.setBody()
.constant("consumer")
.setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())
.constant("custom");
from("direct:source")
.to("knative://endpoint/to")
.log("${body}")
.to("mock:to");
}
});

MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
mock.expectedBodiesReceived("consumer");
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "custom");
mock.expectedMessageCount(1);

context.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class KnativeConfiguration implements Cloneable {
private String apiVersion;
@UriParam(label = "advanced")
private String kind;
@UriParam(label = "consumer", defaultValue = "false")
private boolean replyWithCloudEvent = false;

public KnativeConfiguration() {
}
Expand Down Expand Up @@ -79,6 +81,23 @@ public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}

public boolean isReplyWithCloudEvent() {
return replyWithCloudEvent;
}

/**
* Transforms the reply into a cloud event that will be processed by the caller.
*
* When listening to events from a Knative Broker, if this flag is enabled, replies will
* be published to the same Broker where the request comes from (beware that if you don't
* change the "type" of the received message, you may create a loop and receive your same reply).
*
* When this flag is disabled, CloudEvent headers are removed from the reply.
*/
public void setReplyWithCloudEvent(boolean replyWithCloudEvent) {
this.replyWithCloudEvent = replyWithCloudEvent;
}

@Deprecated
public boolean isJsonSerializationEnabled() {
return jsonSerializationEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public Producer createProducer() throws Exception {
public Consumer createConsumer(Processor processor) throws Exception {
final KnativeEnvironment.KnativeServiceDefinition service = lookupServiceDefinition(Knative.EndpointKind.source);
final Processor ceProcessor = cloudEvent.consumer(this, service);
final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor);
final Processor replyProcessor = new KnativeReplyProcessor(this, service, cloudEvent, configuration.isReplyWithCloudEvent());
final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor, replyProcessor);
final Consumer consumer = getComponent().getTransport().createConsumer(this, service, pipeline);

PropertyBindingSupport.build()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.knative;

import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.component.knative.ce.CloudEventProcessor;
import org.apache.camel.component.knative.spi.CloudEvent;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.support.processor.DelegateAsyncProcessor;

/**
* The KnativeReplyProcessor handles the processing of replies returned by the consumer.
*/
public class KnativeReplyProcessor extends DelegateAsyncProcessor {

private final boolean cloudEventEnabled;

private final CloudEventProcessor cloudEventProcessor;

public KnativeReplyProcessor(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service, CloudEventProcessor cloudEventProcessor, boolean cloudEventEnabled) {
super(cloudEventEnabled ? cloudEventProcessor.producer(endpoint, service) : null);

this.cloudEventEnabled = cloudEventEnabled;
this.cloudEventProcessor = cloudEventProcessor;
}

@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
if (cloudEventEnabled) {
// Delegate to CloudEvent processor
return processor.process(exchange, callback);
}

// remove CloudEvent headers
for (CloudEvent.Attribute attr : cloudEventProcessor.cloudEvent().attributes()) {
exchange.getMessage().removeHeader(attr.http());
}
callback.done(true);
return true;
}

}

0 comments on commit 933f2a5

Please sign in to comment.