Skip to content

Commit

Permalink
Opt (apache#22)
Browse files Browse the repository at this point in the history
* Create pulsar-functions module (#1)

* Create pulsar-functions module

* rename `sdk` package to `api`

* Added the first cut of the Java interface for Pulsar functions (#2)

* Don't wait for the result. Instead chain it via CompletedFuture
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 228e01b commit 9766676
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
public class ExclamationFunction implements RequestHandler<String, String> {
@Override
public String handleRequest(String input, Context context) {
System.out.println("input = " + input);
return input + "!";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ class Payload {
public String topicName;
public String messageId;
public byte[] msgData;
CompletableFuture<ExecutionResult> result;
Payload(String topicName, String messageId, byte[] msgData) {
this.topicName = topicName;
this.messageId = messageId;
this.msgData = msgData;
this.result = new CompletableFuture<>();
}
}

Expand All @@ -59,15 +61,13 @@ class Payload {
private JavaInstance javaInstance;
private final FunctionCacheManager fnCache;
private LinkedBlockingQueue<Payload> queue;
private LinkedBlockingQueue<JavaExecutionResult> resultQueue;
private String id;

ThreadFunctionContainer(JavaInstanceConfig instanceConfig,
FunctionCacheManager fnCache, ThreadGroup threadGroup) {
this.javaInstanceConfig = instanceConfig;
this.fnCache = fnCache;
this.queue = new LinkedBlockingQueue<>();
this.resultQueue = new LinkedBlockingQueue<>();
this.id = "fn-" + instanceConfig.getFunctionConfig().getName() + "-instance-" + instanceConfig.getInstanceId();
this.fnThread = new Thread(threadGroup,
new Runnable() {
Expand All @@ -81,7 +81,9 @@ public void run() {
Payload payload = queue.take();
result = javaInstance.handleMessage(payload.messageId,
payload.topicName, payload.msgData);
resultQueue.offer(result);
ExecutionResult actualResult = ExecutionResult.fromJavaResult(result,
javaInstanceConfig.getSerDe());
payload.result.complete(actualResult);
} catch (InterruptedException ie) {
log.info("Function thread {} is interrupted", ie);
}
Expand Down Expand Up @@ -147,16 +149,9 @@ public void stop() {

@Override
public CompletableFuture<ExecutionResult> sendMessage(String topicName, String messageId, byte[] data) {
queue.offer(new Payload(topicName, messageId, data));
try {
JavaExecutionResult result = resultQueue.take();
return CompletableFuture.completedFuture(ExecutionResult.fromJavaResult(result,
javaInstanceConfig.getSerDe()));
} catch (InterruptedException e) {
CompletableFuture<ExecutionResult> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
Payload payload = new Payload(topicName, messageId, data);
queue.offer(payload);
return payload.result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public void start() throws Exception {
if (null == msg) {
continue;
}
log.info("Received message {}", msg);
String messageId = convertMessageIdToString(msg.getMessageId());
for (FunctionContainer subscriber : subscriberMap.values()) {
subscriber.sendMessage(topicName, messageId, msg.getData())
Expand Down

0 comments on commit 9766676

Please sign in to comment.