Skip to content

Commit

Permalink
kamelets: create a camel-kamelet component #375
Browse files Browse the repository at this point in the history
  • Loading branch information
davsclaus authored and lburgazzoli committed Sep 14, 2020
1 parent 4684d10 commit c7143ee
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,21 @@
*/
package org.apache.camel.component.kamelet;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.model.ModelCamelContext;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.CamelEvent;
import org.apache.camel.spi.annotations.Component;
import org.apache.camel.support.DefaultComponent;
import org.apache.camel.support.EventNotifierSupport;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.StringHelper;

@Component(Kamelet.SCHEME)
Expand All @@ -35,14 +43,19 @@ public KameletComponent(CamelContext context) {
super(context);
}

// use as temporary to keep track of created kamelet endpoints during startup as we need to defer
// create routes from templates until camel context has finished loading all routes and whatnot
private final List<KameletEndpoint> endpoints = new ArrayList<>();
private volatile RouteTemplateEventNotifier notifier;

@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
final String templateId = extractTemplateId(remaining);
final String routeId = extractRouteId(remaining);

//
// The properties for the kamelets are determined by global properties
// and local endpoint parametes,
// and local endpoint parameters,
//
// Global parameters are loaded in the following order:
//
Expand Down Expand Up @@ -104,4 +117,80 @@ private Map<String, Object> extractKameletProperties(String... elements) {

return properties;
}

@Override
protected void doInit() throws Exception {
super.doInit();

if (!getCamelContext().isRunAllowed()) {
// setup event listener which must be started to get triggered during initialization of camel context
notifier = new RouteTemplateEventNotifier(this);
ServiceHelper.startService(notifier);
getCamelContext().getManagementStrategy().addEventNotifier(notifier);
}
}

@Override
protected void doStop() throws Exception {
if (notifier != null) {
ServiceHelper.stopService(notifier);
getCamelContext().getManagementStrategy().removeEventNotifier(notifier);
notifier = null;
}
super.doStop();
}

void onEndpointAdd(KameletEndpoint endpoint) {
if (notifier == null) {
try {
addRouteFromTemplate(endpoint);
} catch (Exception e) {
throw RuntimeCamelException.wrapRuntimeException(e);
}
} else {
// remember endpoints as we defer adding routes for them till later
this.endpoints.add(endpoint);
}
}

void addRouteFromTemplate(KameletEndpoint endpoint) throws Exception {
ModelCamelContext context = endpoint.getCamelContext().adapt(ModelCamelContext.class);
String id = context.addRouteFromTemplate(endpoint.getRouteId(), endpoint.getTemplateId(), endpoint.getKameletProperties());
RouteDefinition def = context.getRouteDefinition(id);
if (!def.isPrepared()) {
List<RouteDefinition> list = new ArrayList<>(1);
list.add(def);
context.startRouteDefinitions(list);
}
}

private static class RouteTemplateEventNotifier extends EventNotifierSupport {

private final KameletComponent component;

public RouteTemplateEventNotifier(KameletComponent component) {
this.component = component;
}

@Override
public void notify(CamelEvent event) throws Exception {
for (KameletEndpoint endpoint : component.endpoints) {
component.addRouteFromTemplate(endpoint);
}
component.endpoints.clear();
// we were only needed during initializing/starting up camel, so remove after use
ServiceHelper.stopService(this);
component.getCamelContext().getManagementStrategy().removeEventNotifier(this);
component.notifier = null;
}

@Override
public boolean isEnabled(CamelEvent event) {
// we only care about this event during startup as its triggered when
// all route and route template definitions have been added and prepared
// so this allows us to hook into the right moment
return event instanceof CamelEvent.CamelContextInitializedEvent;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.util.Map;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProducer;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
Expand All @@ -26,9 +28,9 @@
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriPath;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.DefaultEndpoint;
import org.apache.camel.support.DefaultProducer;
import org.apache.camel.support.service.ServiceHelper;

@UriEndpoint(
Expand Down Expand Up @@ -64,6 +66,11 @@ public KameletEndpoint(
this.kameletUri = "direct:" + routeId;
}

@Override
public KameletComponent getComponent() {
return (KameletComponent) super.getComponent();
}

public String getTemplateId() {
return templateId;
}
Expand All @@ -72,6 +79,10 @@ public String getRouteId() {
return routeId;
}

public Map<String, Object> getKameletProperties() {
return kameletProperties;
}

@Override
public Producer createProducer() throws Exception {
return new KameletProducer();
Expand All @@ -81,21 +92,14 @@ public Producer createProducer() throws Exception {
public Consumer createConsumer(Processor processor) throws Exception {
Consumer answer = new KemeletConsumer(processor);
configureConsumer(answer);

return answer;
}

@Override
protected void doStart() throws Exception {
try {
// Add a route to the camel context from the given template
// TODO: add validation (requires: https://issues.apache.org/jira/browse/CAMEL-15312)
getCamelContext().addRouteFromTemplate(routeId, templateId, kameletProperties);
} catch (Exception e) {
throw new IllegalArgumentException(e);
}

super.doStart();
protected void doInit() throws Exception {
super.doInit();
// only need to add during init phase
getComponent().onEndpointAdd(this);
}

// *********************************
Expand All @@ -117,52 +121,46 @@ protected void doStart() throws Exception {
endpoint = getCamelContext().getEndpoint(kameletUri);
consumer = endpoint.createConsumer(getProcessor());

ServiceHelper.startService(endpoint);
ServiceHelper.startService(consumer);

ServiceHelper.startService(endpoint, consumer);
super.doStart();
}

@Override
protected void doStop() throws Exception {
ServiceHelper.stopService(endpoint);
ServiceHelper.stopService(consumer);

ServiceHelper.stopService(consumer, endpoint);
super.doStop();
}
}

private class KameletProducer extends DefaultProducer {
private class KameletProducer extends DefaultAsyncProducer {
private volatile Endpoint endpoint;
private volatile Producer producer;
private volatile AsyncProducer producer;

public KameletProducer() {
super(KameletEndpoint.this);
}

@Override
public void process(Exchange exchange) throws Exception {
public boolean process(Exchange exchange, AsyncCallback callback) {
if (producer != null) {
producer.process(exchange);
return producer.process(exchange, callback);
} else {
callback.done(true);
return true;
}
}

@Override
protected void doStart() throws Exception {
endpoint = getCamelContext().getEndpoint(kameletUri);
producer = endpoint.createProducer();

ServiceHelper.startService(endpoint);
ServiceHelper.startService(producer);

producer = endpoint.createAsyncProducer();
ServiceHelper.startService(endpoint, producer);
super.doStart();
}

@Override
protected void doStop() throws Exception {
ServiceHelper.stopService(endpoint);
ServiceHelper.stopService(producer);

ServiceHelper.stopService(producer, endpoint);
super.doStop();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.kamelet;

import java.util.UUID;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import static org.assertj.core.api.Assertions.assertThat;

public class KameletAddAfterCamelStartedTest {
private static final Logger LOGGER = LoggerFactory.getLogger(KameletAddAfterCamelStartedTest.class);

@Test
public void test() throws Exception {
String body = UUID.randomUUID().toString();

CamelContext context = new DefaultCamelContext();
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
routeTemplate("setBody")
.templateParameter("bodyValue")
.from("direct:{{routeId}}")
.setBody().constant("{{bodyValue}}");
}
});

/*
context.addRouteFromTemplate("setBody")
.routeId("test")
.parameter("routeId", "test")
.parameter("bodyValue", body)
.build();
*/

// start camel here and add routes with kamelts later
context.start();

context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
// routes
from("direct:template")
.toF("kamelet:setBody/test?bodyValue=%s", body)
.to("log:1");
}
});

assertThat(
context.createFluentProducerTemplate().to("direct:template").withBody("test").request(String.class)
).isEqualTo(body);

context.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void configure() throws Exception {
public void configure() throws Exception {
// routes
from("direct:template")
.to("kamelet:setBody/test?bodyValue=bv")
.toF("kamelet:setBody/test?bodyValue=%s", body)
.to("log:1");
}
});
Expand Down

0 comments on commit c7143ee

Please sign in to comment.