diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index 735f08b665dd2..80f0cfafccd76 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -35,7 +35,21 @@ import java.io.FileInputStream; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.Base64; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.StreamingOutput; +import javax.ws.rs.core.UriBuilder; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.api.StorageClient; import org.apache.bookkeeper.api.kv.Table; @@ -82,7 +96,7 @@ import org.apache.pulsar.functions.utils.FunctionConfigUtils; import org.apache.pulsar.functions.utils.FunctionMetaDataUtils; import org.apache.pulsar.functions.utils.functions.FunctionArchive; -import org.apache.pulsar.functions.utils.functions.FunctionUtils; +import org.apache.pulsar.functions.utils.io.Connector; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; import org.apache.pulsar.functions.worker.FunctionRuntimeInfo; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; @@ -1319,10 +1333,18 @@ public StreamingOutput downloadFunction(String tenant, String namespace, String ? functionMetaData.getTransformFunctionPackageLocation().getPackagePath() : functionMetaData.getPackageLocation().getPackagePath(); - return getStreamingOutput(pkgPath); + FunctionDetails.ComponentType componentType = transformFunction + ? FunctionDetails.ComponentType.FUNCTION + : InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()); + + return getStreamingOutput(pkgPath, componentType); } private StreamingOutput getStreamingOutput(String pkgPath) { + return getStreamingOutput(pkgPath, null); + } + + private StreamingOutput getStreamingOutput(String pkgPath, FunctionDetails.ComponentType componentType) { return output -> { if (pkgPath.startsWith(Utils.HTTP)) { URL url = URI.create(pkgPath).toURL(); @@ -1333,16 +1355,9 @@ private StreamingOutput getStreamingOutput(String pkgPath) { URI url = URI.create(pkgPath); File file = new File(url.getPath()); Files.copy(file.toPath(), output); - } else if(pkgPath.startsWith(Utils.BUILTIN) && !worker().getWorkerConfig().getUploadBuiltinSinksSources()) { - String sType = pkgPath.replaceFirst("^builtin://", ""); - final String connectorsDir = worker().getWorkerConfig().getConnectorsDirectory(); - log.warn("Processing package {} ; looking at the dir {}", pkgPath, connectorsDir); - TreeMap sinksOrSources = - FunctionUtils.searchForFunctions(connectorsDir, true); - Path narPath = sinksOrSources.get(sType).getArchivePath(); - if (narPath == null) { - throw new IllegalStateException("Didn't find " + pkgPath + " in " + connectorsDir); - } + } else if (pkgPath.startsWith(Utils.BUILTIN) + && !worker().getWorkerConfig().getUploadBuiltinSinksSources()) { + Path narPath = getBuiltinArchivePath(pkgPath, componentType); log.info("Loading {} from {}", pkgPath, narPath); try (InputStream in = new FileInputStream(narPath.toString())) { IOUtils.copy(in, output, 1024); @@ -1354,6 +1369,27 @@ private StreamingOutput getStreamingOutput(String pkgPath) { }; } + private Path getBuiltinArchivePath(String pkgPath, FunctionDetails.ComponentType componentType) { + String type = pkgPath.replaceFirst("^builtin://", ""); + if (!FunctionDetails.ComponentType.FUNCTION.equals(componentType)) { + Connector connector = worker().getConnectorsManager().getConnector(type); + if (connector != null) { + return connector.getArchivePath(); + } + if (componentType != null) { + throw new IllegalStateException("Didn't find " + type + " in built-in connectors"); + } + } + FunctionArchive function = worker().getFunctionsManager().getFunction(type); + if (function != null) { + return function.getArchivePath(); + } + if (componentType != null) { + throw new IllegalStateException("Didn't find " + type + " in built-in functions"); + } + throw new IllegalStateException("Didn't find " + type + " in built-in connectors or functions"); + } + @Override public StreamingOutput downloadFunction(final String path, String clientRole, AuthenticationDataHttps clientAuthenticationDataHttps) {