Skip to content

Commit

Permalink
fix apache/camel-k#1119: moving transport related stuff out of core
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolaferraro committed Dec 13, 2019
1 parent 933f2a5 commit 2a2e6e1
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,25 @@ public interface KnativeTransport extends Service {
* Create a camel {@link Producer} in place of the original endpoint for a specific protocol.
*
* @param endpoint the endpoint for which the producer should be created
* @param configuration the general transport configuration
* @param service the service definition containing information about how make reach the target service.
* @return
*/
Producer createProducer(
Endpoint endpoint,
KnativeTransportConfiguration configuration,
KnativeEnvironment.KnativeServiceDefinition service);

/**
* Create a camel {@link Consumer} in place of the original endpoint for a specific protocol.
*
* @param endpoint the endpoint for which the consumer should be created.
* @param configuration the general transport configuration
* @param service the service definition containing information about how make the route reachable from knative.
* @return
*/
Consumer createConsumer(
Endpoint endpoint,
KnativeTransportConfiguration configuration,
KnativeEnvironment.KnativeServiceDefinition service, Processor processor);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.knative.spi;

public final class KnativeTransportConfiguration {

private final CloudEvent cloudEvent;

private final boolean removeCloudEventHeadersInReply;

public KnativeTransportConfiguration(CloudEvent cloudEvent, boolean removeCloudEventHeadersInReply) {
this.cloudEvent = cloudEvent;
this.removeCloudEventHeadersInReply = removeCloudEventHeadersInReply;
}

public CloudEvent getCloudEvent() {
return cloudEvent;
}

public boolean isRemoveCloudEventHeadersInReply() {
return removeCloudEventHeadersInReply;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@
import java.util.stream.Collectors;

import io.vertx.core.http.HttpServerRequest;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.knative.spi.CloudEvent;
import org.apache.camel.component.knative.spi.Knative;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
import org.apache.camel.util.ObjectHelper;

public final class KnativeHttpSupport {
public final class KnativeHttpSupport {
private KnativeHttpSupport() {
}

Expand Down Expand Up @@ -94,4 +99,23 @@ public static Predicate<HttpServerRequest> createFilter(KnativeEnvironment.Knati
return true;
};
}

/**
* Removes cloud event headers at the end of the processing.
*/
public static Processor withoutCloudEventHeaders(Processor delegate, CloudEvent ce) {
return new DelegateAsyncProcessor(delegate) {
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
return processor.process(exchange, doneSync -> {
// remove CloudEvent headers
for (CloudEvent.Attribute attr : ce.attributes()) {
exchange.getMessage().removeHeader(attr.http());
}
callback.done(doneSync);
});
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.camel.Producer;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.component.knative.spi.KnativeTransport;
import org.apache.camel.component.knative.spi.KnativeTransportConfiguration;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
Expand Down Expand Up @@ -198,12 +199,17 @@ protected void doStop() throws Exception {
// *****************************

@Override
public Producer createProducer(Endpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) {
public Producer createProducer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeServiceDefinition service) {
return new KnativeHttpProducer(this, endpoint, service, vertx, vertxHttpClientOptions);
}

@Override
public Consumer createConsumer(Endpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service, Processor processor) {
return new KnativeHttpConsumer(this, endpoint, service, processor);
public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeServiceDefinition service, Processor processor) {
Processor next = processor;
if (config.isRemoveCloudEventHeadersInReply()) {
next = KnativeHttpSupport.withoutCloudEventHeaders(processor, config.getCloudEvent());
}
return new KnativeHttpConsumer(this, endpoint, service, next);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class KnativeConfiguration implements Cloneable {
@UriParam(label = "advanced")
private String kind;
@UriParam(label = "consumer", defaultValue = "false")
private boolean replyWithCloudEvent = false;
private boolean replyWithCloudEvent;

public KnativeConfiguration() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
import org.apache.camel.component.knative.spi.CloudEvent;
import org.apache.camel.component.knative.spi.Knative;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.component.knative.spi.KnativeTransportConfiguration;
import org.apache.camel.processor.Pipeline;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.camel.support.DefaultEndpoint;
import org.apache.camel.support.PropertyBindingSupport;


/**
* This component allows to interact with KNative events.
*/
Expand Down Expand Up @@ -76,7 +76,7 @@ public Producer createProducer() throws Exception {
final KnativeEnvironment.KnativeServiceDefinition service = lookupServiceDefinition(Knative.EndpointKind.sink);
final Processor ceProcessor = cloudEvent.producer(this, service);
final Processor ceConverter = new KnativeConversionProcessor(configuration.isJsonSerializationEnabled());
final Producer producer = getComponent().getTransport().createProducer(this, service);
final Producer producer = getComponent().getTransport().createProducer(this, createTransportConfiguration(), service);

PropertyBindingSupport.build()
.withCamelContext(getCamelContext())
Expand All @@ -94,7 +94,7 @@ public Consumer createConsumer(Processor processor) throws Exception {
final Processor ceProcessor = cloudEvent.consumer(this, service);
final Processor replyProcessor = new KnativeReplyProcessor(this, service, cloudEvent, configuration.isReplyWithCloudEvent());
final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor, replyProcessor);
final Consumer consumer = getComponent().getTransport().createConsumer(this, service, pipeline);
final Consumer consumer = getComponent().getTransport().createConsumer(this, createTransportConfiguration(), service, pipeline);

PropertyBindingSupport.build()
.withCamelContext(getCamelContext())
Expand Down Expand Up @@ -149,7 +149,7 @@ KnativeEnvironment.KnativeServiceDefinition lookupServiceDefinition(Knative.Endp
Map<String, String> metadata = new HashMap<>();
metadata.putAll(service.get().getMetadata());

for (Map.Entry<String, Object> entry: configuration.getFilters().entrySet()) {
for (Map.Entry<String, Object> entry : configuration.getFilters().entrySet()) {
String key = entry.getKey();
Object val = entry.getValue();

Expand All @@ -162,7 +162,7 @@ KnativeEnvironment.KnativeServiceDefinition lookupServiceDefinition(Knative.Endp
}
}

for (Map.Entry<String, Object> entry: configuration.getCeOverride().entrySet()) {
for (Map.Entry<String, Object> entry : configuration.getCeOverride().entrySet()) {
String key = entry.getKey();
Object val = entry.getValue();

Expand Down Expand Up @@ -211,4 +211,11 @@ Optional<KnativeEnvironment.KnativeServiceDefinition> lookupServiceDefinition(St
})
.findFirst();
}

private KnativeTransportConfiguration createTransportConfiguration() {
return new KnativeTransportConfiguration(
this.cloudEvent.cloudEvent(),
!this.configuration.isReplyWithCloudEvent()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.component.knative.ce.CloudEventProcessor;
import org.apache.camel.component.knative.spi.CloudEvent;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.support.processor.DelegateAsyncProcessor;

Expand All @@ -32,7 +31,8 @@ public class KnativeReplyProcessor extends DelegateAsyncProcessor {

private final CloudEventProcessor cloudEventProcessor;

public KnativeReplyProcessor(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service, CloudEventProcessor cloudEventProcessor, boolean cloudEventEnabled) {
public KnativeReplyProcessor(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service, CloudEventProcessor cloudEventProcessor,
boolean cloudEventEnabled) {
super(cloudEventEnabled ? cloudEventProcessor.producer(endpoint, service) : null);

this.cloudEventEnabled = cloudEventEnabled;
Expand All @@ -46,10 +46,6 @@ public boolean process(Exchange exchange, AsyncCallback callback) {
return processor.process(exchange, callback);
}

// remove CloudEvent headers
for (CloudEvent.Attribute attr : cloudEventProcessor.cloudEvent().attributes()) {
exchange.getMessage().removeHeader(attr.http());
}
callback.done(true);
return true;
}
Expand Down

0 comments on commit 2a2e6e1

Please sign in to comment.