diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index 132641c8f01c7..7cd35352bd21f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -46,7 +46,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Objects; -import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -102,7 +101,7 @@ import org.apache.pulsar.functions.utils.FunctionConfigUtils; import org.apache.pulsar.functions.utils.FunctionMetaDataUtils; import org.apache.pulsar.functions.utils.functions.FunctionArchive; -import org.apache.pulsar.functions.utils.functions.FunctionUtils; +import org.apache.pulsar.functions.utils.io.Connector; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; import org.apache.pulsar.functions.worker.FunctionRuntimeInfo; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; @@ -1473,10 +1472,18 @@ public StreamingOutput downloadFunction(String tenant, String namespace, String ? functionMetaData.getTransformFunctionPackageLocation().getPackagePath() : functionMetaData.getPackageLocation().getPackagePath(); - return getStreamingOutput(pkgPath); + FunctionDetails.ComponentType componentType = transformFunction + ? FunctionDetails.ComponentType.FUNCTION + : InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()); + + return getStreamingOutput(pkgPath, componentType); } private StreamingOutput getStreamingOutput(String pkgPath) { + return getStreamingOutput(pkgPath, null); + } + + private StreamingOutput getStreamingOutput(String pkgPath, FunctionDetails.ComponentType componentType) { return output -> { if (pkgPath.startsWith(Utils.HTTP)) { URL url = URI.create(pkgPath).toURL(); @@ -1489,15 +1496,7 @@ private StreamingOutput getStreamingOutput(String pkgPath) { Files.copy(file.toPath(), output); } else if (pkgPath.startsWith(Utils.BUILTIN) && !worker().getWorkerConfig().getUploadBuiltinSinksSources()) { - String sType = pkgPath.replaceFirst("^builtin://", ""); - final String connectorsDir = worker().getWorkerConfig().getConnectorsDirectory(); - log.warn("Processing package {} ; looking at the dir {}", pkgPath, connectorsDir); - TreeMap sinksOrSources = - FunctionUtils.searchForFunctions(connectorsDir, true); - Path narPath = sinksOrSources.get(sType).getArchivePath(); - if (narPath == null) { - throw new IllegalStateException("Didn't find " + pkgPath + " in " + connectorsDir); - } + Path narPath = getBuiltinArchivePath(pkgPath, componentType); log.info("Loading {} from {}", pkgPath, narPath); try (InputStream in = new FileInputStream(narPath.toString())) { IOUtils.copy(in, output, 1024); @@ -1511,7 +1510,7 @@ private StreamingOutput getStreamingOutput(String pkgPath) { output.flush(); } } catch (Exception e) { - log.error("Failed download package {} from packageMangment Service", pkgPath, e); + log.error("Failed download package {} from packageManagement Service", pkgPath, e); } } else { @@ -1520,6 +1519,27 @@ private StreamingOutput getStreamingOutput(String pkgPath) { }; } + private Path getBuiltinArchivePath(String pkgPath, FunctionDetails.ComponentType componentType) { + String type = pkgPath.replaceFirst("^builtin://", ""); + if (!FunctionDetails.ComponentType.FUNCTION.equals(componentType)) { + Connector connector = worker().getConnectorsManager().getConnector(type); + if (connector != null) { + return connector.getArchivePath(); + } + if (componentType != null) { + throw new IllegalStateException("Didn't find " + type + " in built-in connectors"); + } + } + FunctionArchive function = worker().getFunctionsManager().getFunction(type); + if (function != null) { + return function.getArchivePath(); + } + if (componentType != null) { + throw new IllegalStateException("Didn't find " + type + " in built-in functions"); + } + throw new IllegalStateException("Didn't find " + type + " in built-in connectors or functions"); + } + @Override public StreamingOutput downloadFunction( final String path, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {