diff --git a/camel-kamelet/pom.xml b/camel-kamelet/pom.xml index 33e09c34f..7083b874f 100644 --- a/camel-kamelet/pom.xml +++ b/camel-kamelet/pom.xml @@ -43,7 +43,7 @@ org.apache.camel - camel-core-engine + camel-support org.apache.camel diff --git a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java index 22cd5430b..d3f0eac62 100644 --- a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java +++ b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java @@ -18,6 +18,8 @@ import java.util.Map; +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProducer; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ -26,9 +28,9 @@ import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriPath; +import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.support.DefaultConsumer; import org.apache.camel.support.DefaultEndpoint; -import org.apache.camel.support.DefaultProducer; import org.apache.camel.support.service.ServiceHelper; @UriEndpoint( @@ -81,7 +83,6 @@ public Producer createProducer() throws Exception { public Consumer createConsumer(Processor processor) throws Exception { Consumer answer = new KemeletConsumer(processor); configureConsumer(answer); - return answer; } @@ -117,52 +118,46 @@ protected void doStart() throws Exception { endpoint = getCamelContext().getEndpoint(kameletUri); consumer = endpoint.createConsumer(getProcessor()); - ServiceHelper.startService(endpoint); - ServiceHelper.startService(consumer); - + ServiceHelper.startService(endpoint, consumer); super.doStart(); } @Override protected void doStop() throws Exception { - ServiceHelper.stopService(endpoint); - ServiceHelper.stopService(consumer); - + ServiceHelper.stopService(consumer, endpoint); super.doStop(); } } - private class KameletProducer extends DefaultProducer { + private class KameletProducer extends DefaultAsyncProducer { private volatile Endpoint endpoint; - private volatile Producer producer; + private volatile AsyncProducer producer; public KameletProducer() { super(KameletEndpoint.this); } @Override - public void process(Exchange exchange) throws Exception { + public boolean process(Exchange exchange, AsyncCallback callback) { if (producer != null) { - producer.process(exchange); + return producer.process(exchange, callback); + } else { + callback.done(true); + return true; } } @Override protected void doStart() throws Exception { endpoint = getCamelContext().getEndpoint(kameletUri); - producer = endpoint.createProducer(); - - ServiceHelper.startService(endpoint); - ServiceHelper.startService(producer); - + producer = endpoint.createAsyncProducer(); + ServiceHelper.startService(endpoint, producer); super.doStart(); } @Override protected void doStop() throws Exception { - ServiceHelper.stopService(endpoint); - ServiceHelper.stopService(producer); - + ServiceHelper.stopService(producer, endpoint); super.doStop(); } }