From da66b2d40f21762648003fcaa081fed773d5822a Mon Sep 17 00:00:00 2001 From: Sanjeev Kulkarni Date: Thu, 21 Dec 2017 00:04:25 -0800 Subject: [PATCH] Fix compilation (#35) * Create pulsar-functions module (#1) * Create pulsar-functions module * rename `sdk` package to `api` * Added the first cut of the Java interface for Pulsar functions (#2) * Fix compilation issue --- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 2 +- .../runtime/container/FunctionContainerFactory.java | 2 +- .../runtime/container/ThreadFunctionContainer.java | 6 ++++-- .../container/ThreadFunctionContainerFactory.java | 5 +++-- .../pulsar/functions/runtime/spawner/Spawner.java | 13 +++++++++---- .../container/ThreadFunctionContainerTest.java | 2 +- 6 files changed, 19 insertions(+), 11 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index f41476e22bb92..39c19ba73494b 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -108,7 +108,7 @@ void run_functions_cmd() throws Exception { Spawner spawner = Spawner.createSpawner( functionConfig, limitsConfig, - admin.getServiceUrl().toString()); + admin.getServiceUrl().toString(), jarFile); spawner.start(); spawner.join(); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/FunctionContainerFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/FunctionContainerFactory.java index 7fd21188ebeb1..ee69cd5606afe 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/FunctionContainerFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/FunctionContainerFactory.java @@ -33,7 +33,7 @@ public interface FunctionContainerFactory extends AutoCloseable { * @return function container to start/stop instance */ FunctionContainer createContainer( - JavaInstanceConfig instanceConfig); + JavaInstanceConfig instanceConfig, String codeFile); @Override void close(); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainer.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainer.java index ee8873342583c..0ccd1b37d4acc 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainer.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainer.java @@ -64,13 +64,15 @@ class Payload { private final FunctionCacheManager fnCache; private LinkedBlockingQueue queue; private String id; + private String jarFile; ThreadFunctionContainer(JavaInstanceConfig instanceConfig, int maxBufferedTuples, - FunctionCacheManager fnCache, ThreadGroup threadGroup) { + FunctionCacheManager fnCache, ThreadGroup threadGroup, String jarFile) { this.javaInstanceConfig = instanceConfig; this.fnCache = fnCache; this.queue = new LinkedBlockingQueue<>(maxBufferedTuples); this.id = "fn-" + instanceConfig.getFunctionConfig().getName() + "-instance-" + instanceConfig.getInstanceId(); + this.jarFile = jarFile; this.fnThread = new Thread(threadGroup, new Runnable() { @Override @@ -111,7 +113,7 @@ public void start() throws Exception { fnCache.registerFunctionInstance( javaInstanceConfig.getFunctionId(), javaInstanceConfig.getInstanceId(), - Arrays.asList(javaInstanceConfig.getFunctionConfig().getCodeFile()), + Arrays.asList(jarFile), Collections.emptyList()); log.info("Initialize function class loader for function {} at function cache manager", javaInstanceConfig.getFunctionConfig().getName()); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainerFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainerFactory.java index 8e2193e49d4bc..fc94436d56fbc 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainerFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainerFactory.java @@ -40,12 +40,13 @@ public ThreadFunctionContainerFactory(int maxBufferedTuples) { } @Override - public ThreadFunctionContainer createContainer(JavaInstanceConfig instanceConfig) { + public ThreadFunctionContainer createContainer(JavaInstanceConfig instanceConfig, String jarFile) { return new ThreadFunctionContainer( instanceConfig, maxBufferedTuples, fnCache, - threadGroup); + threadGroup, + jarFile); } @Override diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/spawner/Spawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/spawner/Spawner.java index 5151ce2a5d9fd..57ea1cf58ab01 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/spawner/Spawner.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/spawner/Spawner.java @@ -35,7 +35,8 @@ public class Spawner { public static Spawner createSpawner(FunctionConfig fnConfig, LimitsConfig limitsConfig, - String pulsarBrokerRootUrl) { + String pulsarBrokerRootUrl, + String codeFile) { AssignmentInfo assignmentInfo = new AssignmentInfo( fnConfig, new FunctionID(), @@ -44,7 +45,8 @@ public static Spawner createSpawner(FunctionConfig fnConfig, return new Spawner( limitsConfig, assignmentInfo, - pulsarBrokerRootUrl); + pulsarBrokerRootUrl, + codeFile); } private LimitsConfig limitsConfig; @@ -53,17 +55,20 @@ public static Spawner createSpawner(FunctionConfig fnConfig, private ThreadFunctionContainerFactory threadFunctionContainerFactory; private FunctionContainer functionContainer; private SubscriberManager subscriberManager; + private String codeFile; - public Spawner(LimitsConfig limitsConfig, AssignmentInfo assignmentInfo, String pulsarBrokerRootUrl) { + public Spawner(LimitsConfig limitsConfig, AssignmentInfo assignmentInfo, String pulsarBrokerRootUrl, + String codeFile) { this.limitsConfig = limitsConfig; this.assignmentInfo = assignmentInfo; this.pulsarBrokerRootUrl = pulsarBrokerRootUrl; this.threadFunctionContainerFactory = new ThreadFunctionContainerFactory(limitsConfig.getMaxBufferedTuples()); + this.codeFile = codeFile; } public void start() throws Exception { subscriberManager = new SubscriberManager(createSubscriptionName(), pulsarBrokerRootUrl); - functionContainer = threadFunctionContainerFactory.createContainer(createJavaInstanceConfig()); + functionContainer = threadFunctionContainerFactory.createContainer(createJavaInstanceConfig(), codeFile); subscriberManager.addSubscriber(assignmentInfo.getFunctionConfig().getSourceTopic(), functionContainer); functionContainer.start(); } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainerTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainerTest.java index 3c8f71b226390..de3bbd9f3be98 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainerTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainerTest.java @@ -97,7 +97,7 @@ JavaInstanceConfig createJavaInstanceConfig() { public void testConstructor() { JavaInstanceConfig config = createJavaInstanceConfig(); - ThreadFunctionContainer container = factory.createContainer(config); + ThreadFunctionContainer container = factory.createContainer(config, jarFile); assertEquals( "fn-" + config.getFunctionConfig().getName() + "-instance-" + config.getInstanceId(), container.getFnThread().getName());