Skip to content

Commit

Permalink
Fix tests (apache#24)
Browse files Browse the repository at this point in the history
* Create pulsar-functions module (#1)

* Create pulsar-functions module

* rename `sdk` package to `api`

* Added the first cut of the Java interface for Pulsar functions (#2)

* Fix void type handling
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent fc1f71c commit 5f3b93e
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public class JavaInstance {
private RequestHandler requestHandler;
private RawRequestHandler rawRequestHandler;
private ExecutorService executorService;
private JavaExecutionResult executionResult;
private SerDe serDe;

public static Object createObject(String userClassName) {
Expand Down Expand Up @@ -88,7 +87,6 @@ public JavaInstance(JavaInstanceConfig config, Object object) {
}

executorService = Executors.newFixedThreadPool(1);
this.executionResult = new JavaExecutionResult();
this.serDe = config.getSerDe();
}

Expand All @@ -99,14 +97,16 @@ private void computeInputAndOutputTypes() {
}

private void verifySupportedType(Type type, boolean allowVoid) {
if (!(supportedInputTypes.contains(type) || (allowVoid && !type.equals(Void.TYPE)))) {
if (!allowVoid && !supportedInputTypes.contains(type)) {
throw new RuntimeException("Non Basic types not yet supported: " + type);
} else if (!(supportedInputTypes.contains(type) || type.equals(Void.class))) {
throw new RuntimeException("Non Basic types not yet supported: " + type);
}
}

public JavaExecutionResult handleMessage(String messageId, String topicName, byte[] data) {
context.setCurrentMessageContext(messageId, topicName);
executionResult.reset();
JavaExecutionResult executionResult = new JavaExecutionResult();
Future<?> future = executorService.submit(() -> {
if (requestHandler != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,19 @@ public UnSupportedClass handleRequest(String input, Context context) throws Exce
}
}

private class VoidInputHandler implements RequestHandler<Void, String> {
@Override
public String handleRequest(Void input, Context context) throws Exception {
return new String("Interesting");
}
}

private class VoidOutputHandler implements RequestHandler<String, Void> {
@Override
public Void handleRequest(String input, Context context) throws Exception {
return null;
}
}
/**
* Verify that functions running longer than time budget fails with Timeout exception
* @throws Exception
Expand Down Expand Up @@ -147,4 +160,36 @@ public void testUnsupportedClasses() {
assertFalse(true);
}
}

/**
* Verify that JavaInstance does not support functions that take Void type as input
*/
@Test
public void testVoidInputClasses() {
JavaInstanceConfig config = new JavaInstanceConfig();
try {
JavaInstance instance = new JavaInstance(config, new VoidInputHandler());
assertFalse(true);
} catch (RuntimeException ex) {
// Good
} catch (Exception ex) {
assertFalse(true);
}
}

/**
* Verify that JavaInstance does support functions that output Void type
*/
@Test
public void testVoidOutputClasses() {
JavaInstanceConfig config = new JavaInstanceConfig();
config.setTimeBudgetInMs(2000);
config.setSerDe(new JavaSerDe());
JavaInstance instance = new JavaInstance(config, new VoidOutputHandler());
String testString = "ABC123";
JavaExecutionResult result = instance.handleMessage("1", "r", serialize(testString));
assertNull(result.getUserException());
assertNull(result.getTimeoutException());
assertNull(result.getResult());
}
}

0 comments on commit 5f3b93e

Please sign in to comment.