Skip to content

Commit

Permalink
kamelets: polished apache#375
Browse files Browse the repository at this point in the history
  • Loading branch information
davsclaus authored and lburgazzoli committed Aug 11, 2020
1 parent 7473f18 commit daa2e5a
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 21 deletions.
2 changes: 1 addition & 1 deletion camel-kamelet/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core-engine</artifactId>
<artifactId>camel-support</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.util.Map;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProducer;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
Expand All @@ -26,9 +28,9 @@
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriPath;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.DefaultEndpoint;
import org.apache.camel.support.DefaultProducer;
import org.apache.camel.support.service.ServiceHelper;

@UriEndpoint(
Expand Down Expand Up @@ -81,7 +83,6 @@ public Producer createProducer() throws Exception {
public Consumer createConsumer(Processor processor) throws Exception {
Consumer answer = new KemeletConsumer(processor);
configureConsumer(answer);

return answer;
}

Expand Down Expand Up @@ -117,52 +118,46 @@ protected void doStart() throws Exception {
endpoint = getCamelContext().getEndpoint(kameletUri);
consumer = endpoint.createConsumer(getProcessor());

ServiceHelper.startService(endpoint);
ServiceHelper.startService(consumer);

ServiceHelper.startService(endpoint, consumer);
super.doStart();
}

@Override
protected void doStop() throws Exception {
ServiceHelper.stopService(endpoint);
ServiceHelper.stopService(consumer);

ServiceHelper.stopService(consumer, endpoint);
super.doStop();
}
}

private class KameletProducer extends DefaultProducer {
private class KameletProducer extends DefaultAsyncProducer {
private volatile Endpoint endpoint;
private volatile Producer producer;
private volatile AsyncProducer producer;

public KameletProducer() {
super(KameletEndpoint.this);
}

@Override
public void process(Exchange exchange) throws Exception {
public boolean process(Exchange exchange, AsyncCallback callback) {
if (producer != null) {
producer.process(exchange);
return producer.process(exchange, callback);
} else {
callback.done(true);
return true;
}
}

@Override
protected void doStart() throws Exception {
endpoint = getCamelContext().getEndpoint(kameletUri);
producer = endpoint.createProducer();

ServiceHelper.startService(endpoint);
ServiceHelper.startService(producer);

producer = endpoint.createAsyncProducer();
ServiceHelper.startService(endpoint, producer);
super.doStart();
}

@Override
protected void doStop() throws Exception {
ServiceHelper.stopService(endpoint);
ServiceHelper.stopService(producer);

ServiceHelper.stopService(producer, endpoint);
super.doStop();
}
}
Expand Down

0 comments on commit daa2e5a

Please sign in to comment.