Skip to content

Commit

Permalink
[fix][functions] Fix the download of builtin Functions (#17877)
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet authored Sep 30, 2022
1 parent c0b3039 commit 6651bbb
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -102,7 +101,7 @@
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
Expand Down Expand Up @@ -1473,10 +1472,18 @@ public StreamingOutput downloadFunction(String tenant, String namespace, String
? functionMetaData.getTransformFunctionPackageLocation().getPackagePath()
: functionMetaData.getPackageLocation().getPackagePath();

return getStreamingOutput(pkgPath);
FunctionDetails.ComponentType componentType = transformFunction
? FunctionDetails.ComponentType.FUNCTION
: InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails());

return getStreamingOutput(pkgPath, componentType);
}

private StreamingOutput getStreamingOutput(String pkgPath) {
return getStreamingOutput(pkgPath, null);
}

private StreamingOutput getStreamingOutput(String pkgPath, FunctionDetails.ComponentType componentType) {
return output -> {
if (pkgPath.startsWith(Utils.HTTP)) {
URL url = URI.create(pkgPath).toURL();
Expand All @@ -1489,15 +1496,7 @@ private StreamingOutput getStreamingOutput(String pkgPath) {
Files.copy(file.toPath(), output);
} else if (pkgPath.startsWith(Utils.BUILTIN)
&& !worker().getWorkerConfig().getUploadBuiltinSinksSources()) {
String sType = pkgPath.replaceFirst("^builtin://", "");
final String connectorsDir = worker().getWorkerConfig().getConnectorsDirectory();
log.warn("Processing package {} ; looking at the dir {}", pkgPath, connectorsDir);
TreeMap<String, FunctionArchive> sinksOrSources =
FunctionUtils.searchForFunctions(connectorsDir, true);
Path narPath = sinksOrSources.get(sType).getArchivePath();
if (narPath == null) {
throw new IllegalStateException("Didn't find " + pkgPath + " in " + connectorsDir);
}
Path narPath = getBuiltinArchivePath(pkgPath, componentType);
log.info("Loading {} from {}", pkgPath, narPath);
try (InputStream in = new FileInputStream(narPath.toString())) {
IOUtils.copy(in, output, 1024);
Expand All @@ -1511,7 +1510,7 @@ private StreamingOutput getStreamingOutput(String pkgPath) {
output.flush();
}
} catch (Exception e) {
log.error("Failed download package {} from packageMangment Service", pkgPath, e);
log.error("Failed download package {} from packageManagement Service", pkgPath, e);

}
} else {
Expand All @@ -1520,6 +1519,27 @@ private StreamingOutput getStreamingOutput(String pkgPath) {
};
}

private Path getBuiltinArchivePath(String pkgPath, FunctionDetails.ComponentType componentType) {
String type = pkgPath.replaceFirst("^builtin://", "");
if (!FunctionDetails.ComponentType.FUNCTION.equals(componentType)) {
Connector connector = worker().getConnectorsManager().getConnector(type);
if (connector != null) {
return connector.getArchivePath();
}
if (componentType != null) {
throw new IllegalStateException("Didn't find " + type + " in built-in connectors");
}
}
FunctionArchive function = worker().getFunctionsManager().getFunction(type);
if (function != null) {
return function.getArchivePath();
}
if (componentType != null) {
throw new IllegalStateException("Didn't find " + type + " in built-in functions");
}
throw new IllegalStateException("Didn't find " + type + " in built-in connectors or functions");
}

@Override
public StreamingOutput downloadFunction(
final String path, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.functions.worker.rest.api.v3;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
Expand All @@ -29,7 +28,6 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
Expand All @@ -44,7 +42,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Consumer;
import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -78,7 +75,8 @@
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.FunctionsManager;
Expand Down Expand Up @@ -1604,20 +1602,13 @@ public void testDownloadFunctionHttpUrl() throws Exception {
String jarHttpUrl =
"https://repo1.maven.org/maven2/org/apache/pulsar/pulsar-common/2.4.2/pulsar-common-2.4.2.jar";
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
PulsarWorkerService worker = mock(PulsarWorkerService.class);
doReturn(true).when(worker).isInitialized();
WorkerConfig config = mock(WorkerConfig.class);
when(config.isAuthorizationEnabled()).thenReturn(false);
when(worker.getWorkerConfig()).thenReturn(config);
FunctionsImpl function = new FunctionsImpl(() -> worker);
StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl, null, null);

StreamingOutput streamOutput = resource.downloadFunction(jarHttpUrl, null, null);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
Assert.assertTrue(pkgFile.exists());
if (pkgFile.exists()) {
pkgFile.delete();
}
pkgFile.delete();
}

@Test
Expand All @@ -1626,125 +1617,169 @@ public void testDownloadFunctionFile() throws Exception {
File file = Paths.get(fileUrl.toURI()).toFile();
String fileLocation = file.getAbsolutePath().replace('\\', '/');
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
PulsarWorkerService worker = mock(PulsarWorkerService.class);
doReturn(true).when(worker).isInitialized();
WorkerConfig config = mock(WorkerConfig.class);
when(config.isAuthorizationEnabled()).thenReturn(false);
when(worker.getWorkerConfig()).thenReturn(config);
FunctionsImpl function = new FunctionsImpl(() -> worker);
StreamingOutput streamOutput = function.downloadFunction("file:///" + fileLocation, null, null);

StreamingOutput streamOutput = resource.downloadFunction("file:///" + fileLocation, null, null);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
Assert.assertTrue(pkgFile.exists());
if (pkgFile.exists()) {
pkgFile.delete();
}
Assert.assertEquals(file.length(), pkgFile.length());
pkgFile.delete();
}

@Test
public void testDownloadFunctionBuiltin() throws Exception {
mockStatic(WorkerUtils.class, ctx -> {
});

public void testDownloadFunctionBuiltinConnector() throws Exception {
URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
File file = Paths.get(fileUrl.toURI()).toFile();
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();

PulsarWorkerService worker = mock(PulsarWorkerService.class);
doReturn(true).when(worker).isInitialized();
WorkerConfig config = new WorkerConfig()
.setUploadBuiltinSinksSources(false);
when(mockedWorkerService.getWorkerConfig()).thenReturn(config);

WorkerConfig config = mock(WorkerConfig.class);
when(config.isAuthorizationEnabled()).thenReturn(false);
when(config.getUploadBuiltinSinksSources()).thenReturn(false);
when(config.getConnectorsDirectory()).thenReturn("/connectors");
Connector connector = Connector.builder().archivePath(file.toPath()).build();
ConnectorsManager connectorsManager = mock(ConnectorsManager.class);
when(connectorsManager.getConnector("cassandra")).thenReturn(connector);
when(mockedWorkerService.getConnectorsManager()).thenReturn(connectorsManager);

when(worker.getDlogNamespace()).thenReturn(mock(Namespace.class));
when(worker.getWorkerConfig()).thenReturn(config);
FunctionsImpl function = new FunctionsImpl(() -> worker);
StreamingOutput streamOutput = resource.downloadFunction("builtin://cassandra", null, null);

TreeMap<String, FunctionArchive> functions = new TreeMap<>();
FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build();
functions.put("cassandra", functionArchive);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
output.flush();
output.close();
Assert.assertTrue(pkgFile.exists());
Assert.assertTrue(pkgFile.exists());
Assert.assertEquals(file.length(), pkgFile.length());
pkgFile.delete();
}

mockStatic(FunctionUtils.class, ctx -> {
ctx.when(() -> FunctionUtils.searchForFunctions(anyString(), anyBoolean())).thenReturn(functions);
@Test
public void testDownloadFunctionBuiltinFunction() throws Exception {
URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
File file = Paths.get(fileUrl.toURI()).toFile();
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();

});
WorkerConfig config = new WorkerConfig()
.setUploadBuiltinSinksSources(false);
when(mockedWorkerService.getWorkerConfig()).thenReturn(config);

FunctionsManager functionsManager = mock(FunctionsManager.class);
FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build();
when(functionsManager.getFunction("exclamation")).thenReturn(functionArchive);
when(mockedWorkerService.getConnectorsManager()).thenReturn(mock(ConnectorsManager.class));
when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager);

StreamingOutput streamOutput = function.downloadFunction("builtin://cassandra", null, null);
StreamingOutput streamOutput = resource.downloadFunction("builtin://exclamation", null, null);

File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
output.flush();
output.close();
Assert.assertTrue(pkgFile.exists());
if (pkgFile.exists()) {
Assert.assertEquals(file.length(), pkgFile.length());
pkgFile.delete();
} else {
fail("expected file");
}
Assert.assertEquals(file.length(), pkgFile.length());
pkgFile.delete();
}

@Test
public void testDownloadFunctionByName() throws Exception {
public void testDownloadFunctionBuiltinConnectorByName() throws Exception {
URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
File file = Paths.get(fileUrl.toURI()).toFile();
String fileLocation = file.getAbsolutePath().replace('\\', '/');
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
doReturn(true).when(mockedWorkerService).isInitialized();
WorkerConfig config = mock(WorkerConfig.class);
when(config.isAuthorizationEnabled()).thenReturn(false);
WorkerConfig config = new WorkerConfig()
.setUploadBuiltinSinksSources(false);
when(mockedWorkerService.getWorkerConfig()).thenReturn(config);

when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);

FunctionMetaData metaData = FunctionMetaData.newBuilder()
.setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("file:///" + fileLocation))
.setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("builtin://cassandra"))
.setTransformFunctionPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("http://invalid"))
.setFunctionDetails(FunctionDetails.newBuilder().setComponentType(FunctionDetails.ComponentType.SINK))
.build();
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);

Connector connector = Connector.builder().archivePath(file.toPath()).build();
ConnectorsManager connectorsManager = mock(ConnectorsManager.class);
when(connectorsManager.getConnector("cassandra")).thenReturn(connector);
when(mockedWorkerService.getConnectorsManager()).thenReturn(connectorsManager);

StreamingOutput streamOutput = resource.downloadFunction(tenant, namespace, function, null, null, false);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
Assert.assertTrue(pkgFile.exists());
if (pkgFile.exists()) {
pkgFile.delete();
}
Assert.assertEquals(file.length(), pkgFile.length());
pkgFile.delete();
}

@Test
public void testDownloadTransformFunctionByName() throws Exception {
public void testDownloadFunctionBuiltinFunctionByName() throws Exception {
URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
File file = Paths.get(fileUrl.toURI()).toFile();
String fileLocation = file.getAbsolutePath().replace('\\', '/');
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
doReturn(true).when(mockedWorkerService).isInitialized();
WorkerConfig config = mock(WorkerConfig.class);
when(config.isAuthorizationEnabled()).thenReturn(false);
WorkerConfig config = new WorkerConfig()
.setUploadBuiltinSinksSources(false);
when(mockedWorkerService.getWorkerConfig()).thenReturn(config);

when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);

FunctionMetaData metaData = FunctionMetaData.newBuilder()
.setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("builtin://exclamation"))
.setTransformFunctionPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("http://invalid"))
.setFunctionDetails(FunctionDetails.newBuilder().setComponentType(FunctionDetails.ComponentType.FUNCTION))
.build();
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);

FunctionsManager functionsManager = mock(FunctionsManager.class);
FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build();
when(functionsManager.getFunction("exclamation")).thenReturn(functionArchive);
when(mockedWorkerService.getConnectorsManager()).thenReturn(mock(ConnectorsManager.class));
when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager);

StreamingOutput streamOutput = resource.downloadFunction(tenant, namespace, function, null, null, false);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
Assert.assertTrue(pkgFile.exists());
Assert.assertEquals(file.length(), pkgFile.length());
pkgFile.delete();
}

@Test
public void testDownloadTransformFunctionByName() throws Exception {
URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
File file = Paths.get(fileUrl.toURI()).toFile();
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();

WorkerConfig workerConfig = new WorkerConfig()
.setUploadBuiltinSinksSources(false);
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);

when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);

FunctionMetaData metaData = FunctionMetaData.newBuilder()
.setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("http://invalid"))
.setTransformFunctionPackageLocation(PackageLocationMetaData.newBuilder()
.setPackagePath("file:///" + fileLocation))
.setPackagePath("builtin://exclamation"))
.build();
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);

FunctionsManager functionsManager = mock(FunctionsManager.class);
FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build();
when(functionsManager.getFunction("exclamation")).thenReturn(functionArchive);
when(mockedWorkerService.getConnectorsManager()).thenReturn(mock(ConnectorsManager.class));
when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager);

StreamingOutput streamOutput = resource.downloadFunction(tenant, namespace, function, null, null, true);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
Assert.assertTrue(pkgFile.exists());
if (pkgFile.exists()) {
pkgFile.delete();
}
Assert.assertEquals(file.length(), pkgFile.length());
pkgFile.delete();
}


Expand Down

0 comments on commit 6651bbb

Please sign in to comment.