Skip to content

Commit

Permalink
knative: split producer and consumer #521
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Nov 11, 2020
1 parent c9ada01 commit d3e54ee
Show file tree
Hide file tree
Showing 19 changed files with 378 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.vertx.core.deployment.CoreVertxBuildItem;
import io.quarkus.vertx.http.deployment.BodyHandlerBuildItem;
import io.quarkus.vertx.http.deployment.VertxWebRouterBuildItem;
import org.apache.camel.component.knative.KnativeComponent;
import org.apache.camel.component.knative.KnativeConstants;
import org.apache.camel.component.knative.spi.KnativeConsumerFactory;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.component.knative.spi.KnativeProducerFactory;
import org.apache.camel.k.quarkus.knative.KnativeRecorder;
import org.apache.camel.quarkus.core.deployment.spi.CamelRuntimeBeanBuildItem;
import org.apache.camel.quarkus.core.deployment.spi.CamelServiceFilter;
Expand Down Expand Up @@ -63,16 +65,20 @@ List<CamelServiceFilterBuildItem> servicesFilters() {
CamelRuntimeBeanBuildItem knativeComponent(
KnativeRecorder recorder,
CoreVertxBuildItem vertx,
VertxWebRouterBuildItem router,
BodyHandlerBuildItem bodyHandlerBuildItem) {
VertxWebRouterBuildItem router) {

RuntimeValue<KnativeProducerFactory> producerFactory = vertx != null
? recorder.createKnativeHttpProducerFactory(vertx.getVertx())
: null;

RuntimeValue<KnativeConsumerFactory> consumerFactory = router != null
? recorder.createKnativeHttpConsumerFactory(router.getRouter())
: null;

return new CamelRuntimeBeanBuildItem(
KnativeConstants.SCHEME,
KnativeComponent.class.getName(),
recorder.createKnativeComponent(
vertx.getVertx(),
router.getRouter(),
bodyHandlerBuildItem.getHandler())
recorder.createKnativeComponent(producerFactory, consumerFactory)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,47 @@

import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.annotations.Recorder;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import org.apache.camel.component.knative.KnativeComponent;
import org.apache.camel.component.knative.http.KnativeHttpTransport;
import org.apache.camel.component.platform.http.vertx.VertxPlatformHttpRouter;
import org.apache.camel.component.knative.http.KnativeHttpConsumerFactory;
import org.apache.camel.component.knative.http.KnativeHttpProducerFactory;
import org.apache.camel.component.knative.spi.KnativeConsumerFactory;
import org.apache.camel.component.knative.spi.KnativeProducerFactory;

@Recorder
public class KnativeRecorder {

public RuntimeValue<KnativeProducerFactory> createKnativeHttpProducerFactory(
Supplier<Vertx> vertx) {

KnativeHttpProducerFactory producerFactory = new KnativeHttpProducerFactory();
producerFactory.setVertx(vertx.get());

return new RuntimeValue<>(producerFactory);
}

public RuntimeValue<KnativeConsumerFactory> createKnativeHttpConsumerFactory(
RuntimeValue<Router> router) {

KnativeHttpConsumerFactory consumerFactory = new KnativeHttpConsumerFactory();
consumerFactory.setRouter(router.getValue());

return new RuntimeValue<>(consumerFactory);
}

public RuntimeValue<KnativeComponent> createKnativeComponent(
Supplier<Vertx> vertx,
RuntimeValue<Router> router,
Handler<RoutingContext> bodyHandler) {

KnativeHttpTransport transport = new KnativeHttpTransport();
transport.setRouter(new VertxPlatformHttpRouter(vertx.get(), router.getValue()) {
@Override
public Handler<RoutingContext> bodyHandler() {
return bodyHandler;
}
});
RuntimeValue<KnativeProducerFactory> producerFactory,
RuntimeValue<KnativeConsumerFactory> consumerFactory) {

KnativeComponent component = new KnativeComponent();
component.setTransport(transport);

if (producerFactory != null) {
component.setProducerFactory(producerFactory.getValue());
}
if (consumerFactory != null) {
component.setConsumerFactory(consumerFactory.getValue());
}

return new RuntimeValue<>(component);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,9 @@
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.Service;

public interface KnativeTransport extends Service {
/**
* Create a camel {@link Producer} in place of the original endpoint for a specific protocol.
*
* @param endpoint the endpoint for which the producer should be created
* @param configuration the general transport configuration
* @param service the service definition containing information about how make reach the target service.
* @return
*/
Producer createProducer(
Endpoint endpoint,
KnativeTransportConfiguration configuration,
KnativeEnvironment.KnativeResource service);

public interface KnativeConsumerFactory extends Service {
/**
* Create a camel {@link Consumer} in place of the original endpoint for a specific protocol.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.knative.spi;

import org.apache.camel.Endpoint;
import org.apache.camel.Producer;
import org.apache.camel.Service;

public interface KnativeProducerFactory extends Service {
/**
* Create a camel {@link Producer} in place of the original endpoint for a specific protocol.
*
* @param endpoint the endpoint for which the producer should be created
* @param configuration the general transport configuration
* @param service the service definition containing information about how make reach the target service.
* @return
*/
Producer createProducer(
Endpoint endpoint,
KnativeTransportConfiguration configuration,
KnativeEnvironment.KnativeResource service);
}
16 changes: 11 additions & 5 deletions components/camel-knative/camel-knative-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@
</dependency>

<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-platform-http-vertx</artifactId>
<groupId>io.vertx</groupId>
<artifactId>vertx-web-client</artifactId>
<version>${vertx-version}</version>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web-client</artifactId>
<artifactId>vertx-web</artifactId>
<version>${vertx-version}</version>
</dependency>

Expand Down Expand Up @@ -95,12 +96,17 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-http</artifactId>
<artifactId>camel-core-languages</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core-languages</artifactId>
<artifactId>camel-platform-http-vertx</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-http</artifactId>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,21 @@

import java.io.PrintWriter;
import java.io.StringWriter;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Map;
import java.util.function.Predicate;

import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
Expand All @@ -37,7 +41,6 @@
import org.apache.camel.TypeConverter;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.component.knative.spi.KnativeTransportConfiguration;
import org.apache.camel.component.platform.http.vertx.VertxPlatformHttpRouter;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.ExchangeHelper;
Expand All @@ -52,17 +55,19 @@ public class KnativeHttpConsumer extends DefaultConsumer {
private final KnativeTransportConfiguration configuration;
private final Predicate<HttpServerRequest> filter;
private final KnativeEnvironment.KnativeResource serviceDefinition;
private final VertxPlatformHttpRouter router;
private final Router router;
private final HeaderFilterStrategy headerFilterStrategy;

private String basePath;
private Route route;
private BigInteger maxBodySize;
private boolean preallocateBodyBuffer;

public KnativeHttpConsumer(
KnativeTransportConfiguration configuration,
Endpoint endpoint,
KnativeEnvironment.KnativeResource serviceDefinition,
VertxPlatformHttpRouter router,
Router router,
Processor processor) {

super(endpoint, processor);
Expand All @@ -72,6 +77,7 @@ public KnativeHttpConsumer(
this.router = router;
this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy();
this.filter = KnativeHttpSupport.createFilter(serviceDefinition);
this.preallocateBodyBuffer = true;
}

public String getBasePath() {
Expand All @@ -82,10 +88,29 @@ public void setBasePath(String basePath) {
this.basePath = basePath;
}

public BigInteger getMaxBodySize() {
return maxBodySize;
}

public void setMaxBodySize(BigInteger maxBodySize) {
this.maxBodySize = maxBodySize;
}

public boolean isPreallocateBodyBuffer() {
return preallocateBodyBuffer;
}

public void setPreallocateBodyBuffer(boolean preallocateBodyBuffer) {
this.preallocateBodyBuffer = preallocateBodyBuffer;
}

@Override
protected void doStart() throws Exception {
if (route == null) {
String path = ObjectHelper.supplyIfEmpty(serviceDefinition.getPath(), () -> KnativeHttpTransport.DEFAULT_PATH);
String path = serviceDefinition.getPath();
if (ObjectHelper.isEmpty(path)) {
path = "/";
}
if (ObjectHelper.isNotEmpty(basePath)) {
path = basePath + path;
}
Expand All @@ -97,8 +122,20 @@ protected void doStart() throws Exception {
path
);

BodyHandler bodyHandler = BodyHandler.create();
bodyHandler.setPreallocateBodyBuffer(this.preallocateBodyBuffer);
if (this.maxBodySize != null) {
bodyHandler.setBodyLimit(this.maxBodySize.longValueExact());
}

// add body handler
route.handler(router.bodyHandler());
route.handler(new Handler<RoutingContext>() {
@Override
public void handle(RoutingContext event) {
event.request().resume();
bodyHandler.handle(event);
}
});

// add knative handler
route.handler(routingContext -> {
Expand Down Expand Up @@ -161,7 +198,7 @@ private void handleRequest(RoutingContext routingContext) {
// from("knative:event/my.event")
// .to("http://{{env:PROJECT}}.{{env:NAMESPACE}}.svc.cluster.local/service");
//
router.vertx().executeBlocking(
routingContext.vertx().executeBlocking(
promise -> {
try {
createUoW(exchange);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.knative.http;

import java.util.Objects;

import io.vertx.ext.web.Router;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
import org.apache.camel.component.knative.spi.KnativeConsumerFactory;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.component.knative.spi.KnativeTransportConfiguration;
import org.apache.camel.support.service.ServiceSupport;

public class KnativeHttpConsumerFactory extends ServiceSupport implements CamelContextAware, KnativeConsumerFactory {
private Router router;
private CamelContext camelContext;

public Router getRouter() {
return router;
}

public KnativeHttpConsumerFactory setRouter(Router router) {
this.router = router;
return this;
}

@Override
public void setCamelContext(CamelContext camelContext) {
this.camelContext = camelContext;
}

@Override
public CamelContext getCamelContext() {
return camelContext;
}

@Override
public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeResource service, Processor processor) {
Objects.requireNonNull(this.router, "router");

return new KnativeHttpConsumer(
config,
endpoint,
service,
this.router,
processor);
}

}
Loading

0 comments on commit d3e54ee

Please sign in to comment.