Skip to content

Commit

Permalink
[fix][functions] Fix the download of builtin Functions (apache#17877)
Browse files Browse the repository at this point in the history
(cherry picked from commit 603b5f9)
  • Loading branch information
cbornet authored and nicoloboschi committed Sep 29, 2022
1 parent 4723dfe commit 00fe3ed
Showing 1 changed file with 48 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,21 @@
import java.io.FileInputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Base64;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
Expand Down Expand Up @@ -82,7 +96,7 @@
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
Expand Down Expand Up @@ -1319,10 +1333,18 @@ public StreamingOutput downloadFunction(String tenant, String namespace, String
? functionMetaData.getTransformFunctionPackageLocation().getPackagePath()
: functionMetaData.getPackageLocation().getPackagePath();

return getStreamingOutput(pkgPath);
FunctionDetails.ComponentType componentType = transformFunction
? FunctionDetails.ComponentType.FUNCTION
: InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails());

return getStreamingOutput(pkgPath, componentType);
}

private StreamingOutput getStreamingOutput(String pkgPath) {
return getStreamingOutput(pkgPath, null);
}

private StreamingOutput getStreamingOutput(String pkgPath, FunctionDetails.ComponentType componentType) {
return output -> {
if (pkgPath.startsWith(Utils.HTTP)) {
URL url = URI.create(pkgPath).toURL();
Expand All @@ -1333,16 +1355,9 @@ private StreamingOutput getStreamingOutput(String pkgPath) {
URI url = URI.create(pkgPath);
File file = new File(url.getPath());
Files.copy(file.toPath(), output);
} else if(pkgPath.startsWith(Utils.BUILTIN) && !worker().getWorkerConfig().getUploadBuiltinSinksSources()) {
String sType = pkgPath.replaceFirst("^builtin://", "");
final String connectorsDir = worker().getWorkerConfig().getConnectorsDirectory();
log.warn("Processing package {} ; looking at the dir {}", pkgPath, connectorsDir);
TreeMap<String, FunctionArchive> sinksOrSources =
FunctionUtils.searchForFunctions(connectorsDir, true);
Path narPath = sinksOrSources.get(sType).getArchivePath();
if (narPath == null) {
throw new IllegalStateException("Didn't find " + pkgPath + " in " + connectorsDir);
}
} else if (pkgPath.startsWith(Utils.BUILTIN)
&& !worker().getWorkerConfig().getUploadBuiltinSinksSources()) {
Path narPath = getBuiltinArchivePath(pkgPath, componentType);
log.info("Loading {} from {}", pkgPath, narPath);
try (InputStream in = new FileInputStream(narPath.toString())) {
IOUtils.copy(in, output, 1024);
Expand All @@ -1354,6 +1369,27 @@ private StreamingOutput getStreamingOutput(String pkgPath) {
};
}

private Path getBuiltinArchivePath(String pkgPath, FunctionDetails.ComponentType componentType) {
String type = pkgPath.replaceFirst("^builtin://", "");
if (!FunctionDetails.ComponentType.FUNCTION.equals(componentType)) {
Connector connector = worker().getConnectorsManager().getConnector(type);
if (connector != null) {
return connector.getArchivePath();
}
if (componentType != null) {
throw new IllegalStateException("Didn't find " + type + " in built-in connectors");
}
}
FunctionArchive function = worker().getFunctionsManager().getFunction(type);
if (function != null) {
return function.getArchivePath();
}
if (componentType != null) {
throw new IllegalStateException("Didn't find " + type + " in built-in functions");
}
throw new IllegalStateException("Didn't find " + type + " in built-in connectors or functions");
}

@Override
public StreamingOutput downloadFunction(final String path, String clientRole, AuthenticationDataHttps clientAuthenticationDataHttps) {

Expand Down

0 comments on commit 00fe3ed

Please sign in to comment.