From 6651bbbab5b33f09cdde83de048d8116b2835de6 Mon Sep 17 00:00:00 2001 From: Christophe Bornet Date: Fri, 30 Sep 2022 22:05:35 +0200 Subject: [PATCH] [fix][functions] Fix the download of builtin Functions (#17877) --- .../worker/rest/api/ComponentImpl.java | 46 +++-- .../api/v3/FunctionApiV3ResourceTest.java | 171 +++++++++++------- 2 files changed, 136 insertions(+), 81 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index 132641c8f01c7..7cd35352bd21f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -46,7 +46,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Objects; -import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -102,7 +101,7 @@ import org.apache.pulsar.functions.utils.FunctionConfigUtils; import org.apache.pulsar.functions.utils.FunctionMetaDataUtils; import org.apache.pulsar.functions.utils.functions.FunctionArchive; -import org.apache.pulsar.functions.utils.functions.FunctionUtils; +import org.apache.pulsar.functions.utils.io.Connector; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; import org.apache.pulsar.functions.worker.FunctionRuntimeInfo; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; @@ -1473,10 +1472,18 @@ public StreamingOutput downloadFunction(String tenant, String namespace, String ? functionMetaData.getTransformFunctionPackageLocation().getPackagePath() : functionMetaData.getPackageLocation().getPackagePath(); - return getStreamingOutput(pkgPath); + FunctionDetails.ComponentType componentType = transformFunction + ? FunctionDetails.ComponentType.FUNCTION + : InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()); + + return getStreamingOutput(pkgPath, componentType); } private StreamingOutput getStreamingOutput(String pkgPath) { + return getStreamingOutput(pkgPath, null); + } + + private StreamingOutput getStreamingOutput(String pkgPath, FunctionDetails.ComponentType componentType) { return output -> { if (pkgPath.startsWith(Utils.HTTP)) { URL url = URI.create(pkgPath).toURL(); @@ -1489,15 +1496,7 @@ private StreamingOutput getStreamingOutput(String pkgPath) { Files.copy(file.toPath(), output); } else if (pkgPath.startsWith(Utils.BUILTIN) && !worker().getWorkerConfig().getUploadBuiltinSinksSources()) { - String sType = pkgPath.replaceFirst("^builtin://", ""); - final String connectorsDir = worker().getWorkerConfig().getConnectorsDirectory(); - log.warn("Processing package {} ; looking at the dir {}", pkgPath, connectorsDir); - TreeMap sinksOrSources = - FunctionUtils.searchForFunctions(connectorsDir, true); - Path narPath = sinksOrSources.get(sType).getArchivePath(); - if (narPath == null) { - throw new IllegalStateException("Didn't find " + pkgPath + " in " + connectorsDir); - } + Path narPath = getBuiltinArchivePath(pkgPath, componentType); log.info("Loading {} from {}", pkgPath, narPath); try (InputStream in = new FileInputStream(narPath.toString())) { IOUtils.copy(in, output, 1024); @@ -1511,7 +1510,7 @@ private StreamingOutput getStreamingOutput(String pkgPath) { output.flush(); } } catch (Exception e) { - log.error("Failed download package {} from packageMangment Service", pkgPath, e); + log.error("Failed download package {} from packageManagement Service", pkgPath, e); } } else { @@ -1520,6 +1519,27 @@ private StreamingOutput getStreamingOutput(String pkgPath) { }; } + private Path getBuiltinArchivePath(String pkgPath, FunctionDetails.ComponentType componentType) { + String type = pkgPath.replaceFirst("^builtin://", ""); + if (!FunctionDetails.ComponentType.FUNCTION.equals(componentType)) { + Connector connector = worker().getConnectorsManager().getConnector(type); + if (connector != null) { + return connector.getArchivePath(); + } + if (componentType != null) { + throw new IllegalStateException("Didn't find " + type + " in built-in connectors"); + } + } + FunctionArchive function = worker().getFunctionsManager().getFunction(type); + if (function != null) { + return function.getArchivePath(); + } + if (componentType != null) { + throw new IllegalStateException("Didn't find " + type + " in built-in functions"); + } + throw new IllegalStateException("Didn't find " + type + " in built-in connectors or functions"); + } + @Override public StreamingOutput downloadFunction( final String path, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java index 8d19869b4738a..fb09a4026a59c 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java @@ -19,7 +19,6 @@ package org.apache.pulsar.functions.worker.rest.api.v3; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; @@ -29,7 +28,6 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.fail; import com.google.common.collect.Lists; import java.io.File; import java.io.FileInputStream; @@ -44,7 +42,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.TreeMap; import java.util.UUID; import java.util.function.Consumer; import javax.ws.rs.core.Response; @@ -78,7 +75,8 @@ import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.functions.utils.FunctionConfigUtils; import org.apache.pulsar.functions.utils.functions.FunctionArchive; -import org.apache.pulsar.functions.utils.functions.FunctionUtils; +import org.apache.pulsar.functions.utils.io.Connector; +import org.apache.pulsar.functions.worker.ConnectorsManager; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.FunctionsManager; @@ -1604,20 +1602,13 @@ public void testDownloadFunctionHttpUrl() throws Exception { String jarHttpUrl = "https://repo1.maven.org/maven2/org/apache/pulsar/pulsar-common/2.4.2/pulsar-common-2.4.2.jar"; String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); - PulsarWorkerService worker = mock(PulsarWorkerService.class); - doReturn(true).when(worker).isInitialized(); - WorkerConfig config = mock(WorkerConfig.class); - when(config.isAuthorizationEnabled()).thenReturn(false); - when(worker.getWorkerConfig()).thenReturn(config); - FunctionsImpl function = new FunctionsImpl(() -> worker); - StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl, null, null); + + StreamingOutput streamOutput = resource.downloadFunction(jarHttpUrl, null, null); File pkgFile = new File(testDir, UUID.randomUUID().toString()); OutputStream output = new FileOutputStream(pkgFile); streamOutput.write(output); Assert.assertTrue(pkgFile.exists()); - if (pkgFile.exists()) { - pkgFile.delete(); - } + pkgFile.delete(); } @Test @@ -1626,53 +1617,61 @@ public void testDownloadFunctionFile() throws Exception { File file = Paths.get(fileUrl.toURI()).toFile(); String fileLocation = file.getAbsolutePath().replace('\\', '/'); String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); - PulsarWorkerService worker = mock(PulsarWorkerService.class); - doReturn(true).when(worker).isInitialized(); - WorkerConfig config = mock(WorkerConfig.class); - when(config.isAuthorizationEnabled()).thenReturn(false); - when(worker.getWorkerConfig()).thenReturn(config); - FunctionsImpl function = new FunctionsImpl(() -> worker); - StreamingOutput streamOutput = function.downloadFunction("file:///" + fileLocation, null, null); + + StreamingOutput streamOutput = resource.downloadFunction("file:///" + fileLocation, null, null); File pkgFile = new File(testDir, UUID.randomUUID().toString()); OutputStream output = new FileOutputStream(pkgFile); streamOutput.write(output); Assert.assertTrue(pkgFile.exists()); - if (pkgFile.exists()) { - pkgFile.delete(); - } + Assert.assertEquals(file.length(), pkgFile.length()); + pkgFile.delete(); } @Test - public void testDownloadFunctionBuiltin() throws Exception { - mockStatic(WorkerUtils.class, ctx -> { - }); - + public void testDownloadFunctionBuiltinConnector() throws Exception { URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml"); File file = Paths.get(fileUrl.toURI()).toFile(); String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); - PulsarWorkerService worker = mock(PulsarWorkerService.class); - doReturn(true).when(worker).isInitialized(); + WorkerConfig config = new WorkerConfig() + .setUploadBuiltinSinksSources(false); + when(mockedWorkerService.getWorkerConfig()).thenReturn(config); - WorkerConfig config = mock(WorkerConfig.class); - when(config.isAuthorizationEnabled()).thenReturn(false); - when(config.getUploadBuiltinSinksSources()).thenReturn(false); - when(config.getConnectorsDirectory()).thenReturn("/connectors"); + Connector connector = Connector.builder().archivePath(file.toPath()).build(); + ConnectorsManager connectorsManager = mock(ConnectorsManager.class); + when(connectorsManager.getConnector("cassandra")).thenReturn(connector); + when(mockedWorkerService.getConnectorsManager()).thenReturn(connectorsManager); - when(worker.getDlogNamespace()).thenReturn(mock(Namespace.class)); - when(worker.getWorkerConfig()).thenReturn(config); - FunctionsImpl function = new FunctionsImpl(() -> worker); + StreamingOutput streamOutput = resource.downloadFunction("builtin://cassandra", null, null); - TreeMap functions = new TreeMap<>(); - FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build(); - functions.put("cassandra", functionArchive); + File pkgFile = new File(testDir, UUID.randomUUID().toString()); + OutputStream output = new FileOutputStream(pkgFile); + streamOutput.write(output); + output.flush(); + output.close(); + Assert.assertTrue(pkgFile.exists()); + Assert.assertTrue(pkgFile.exists()); + Assert.assertEquals(file.length(), pkgFile.length()); + pkgFile.delete(); + } - mockStatic(FunctionUtils.class, ctx -> { - ctx.when(() -> FunctionUtils.searchForFunctions(anyString(), anyBoolean())).thenReturn(functions); + @Test + public void testDownloadFunctionBuiltinFunction() throws Exception { + URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml"); + File file = Paths.get(fileUrl.toURI()).toFile(); + String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); - }); + WorkerConfig config = new WorkerConfig() + .setUploadBuiltinSinksSources(false); + when(mockedWorkerService.getWorkerConfig()).thenReturn(config); + + FunctionsManager functionsManager = mock(FunctionsManager.class); + FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build(); + when(functionsManager.getFunction("exclamation")).thenReturn(functionArchive); + when(mockedWorkerService.getConnectorsManager()).thenReturn(mock(ConnectorsManager.class)); + when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager); - StreamingOutput streamOutput = function.downloadFunction("builtin://cassandra", null, null); + StreamingOutput streamOutput = resource.downloadFunction("builtin://exclamation", null, null); File pkgFile = new File(testDir, UUID.randomUUID().toString()); OutputStream output = new FileOutputStream(pkgFile); @@ -1680,71 +1679,107 @@ public void testDownloadFunctionBuiltin() throws Exception { output.flush(); output.close(); Assert.assertTrue(pkgFile.exists()); - if (pkgFile.exists()) { - Assert.assertEquals(file.length(), pkgFile.length()); - pkgFile.delete(); - } else { - fail("expected file"); - } + Assert.assertEquals(file.length(), pkgFile.length()); + pkgFile.delete(); } @Test - public void testDownloadFunctionByName() throws Exception { + public void testDownloadFunctionBuiltinConnectorByName() throws Exception { URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml"); File file = Paths.get(fileUrl.toURI()).toFile(); - String fileLocation = file.getAbsolutePath().replace('\\', '/'); String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); - doReturn(true).when(mockedWorkerService).isInitialized(); - WorkerConfig config = mock(WorkerConfig.class); - when(config.isAuthorizationEnabled()).thenReturn(false); + WorkerConfig config = new WorkerConfig() + .setUploadBuiltinSinksSources(false); when(mockedWorkerService.getWorkerConfig()).thenReturn(config); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); FunctionMetaData metaData = FunctionMetaData.newBuilder() - .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("file:///" + fileLocation)) + .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("builtin://cassandra")) .setTransformFunctionPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("http://invalid")) + .setFunctionDetails(FunctionDetails.newBuilder().setComponentType(FunctionDetails.ComponentType.SINK)) .build(); when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData); + Connector connector = Connector.builder().archivePath(file.toPath()).build(); + ConnectorsManager connectorsManager = mock(ConnectorsManager.class); + when(connectorsManager.getConnector("cassandra")).thenReturn(connector); + when(mockedWorkerService.getConnectorsManager()).thenReturn(connectorsManager); + StreamingOutput streamOutput = resource.downloadFunction(tenant, namespace, function, null, null, false); File pkgFile = new File(testDir, UUID.randomUUID().toString()); OutputStream output = new FileOutputStream(pkgFile); streamOutput.write(output); Assert.assertTrue(pkgFile.exists()); - if (pkgFile.exists()) { - pkgFile.delete(); - } + Assert.assertEquals(file.length(), pkgFile.length()); + pkgFile.delete(); } @Test - public void testDownloadTransformFunctionByName() throws Exception { + public void testDownloadFunctionBuiltinFunctionByName() throws Exception { URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml"); File file = Paths.get(fileUrl.toURI()).toFile(); - String fileLocation = file.getAbsolutePath().replace('\\', '/'); String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); - doReturn(true).when(mockedWorkerService).isInitialized(); - WorkerConfig config = mock(WorkerConfig.class); - when(config.isAuthorizationEnabled()).thenReturn(false); + WorkerConfig config = new WorkerConfig() + .setUploadBuiltinSinksSources(false); when(mockedWorkerService.getWorkerConfig()).thenReturn(config); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); + FunctionMetaData metaData = FunctionMetaData.newBuilder() + .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("builtin://exclamation")) + .setTransformFunctionPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("http://invalid")) + .setFunctionDetails(FunctionDetails.newBuilder().setComponentType(FunctionDetails.ComponentType.FUNCTION)) + .build(); + when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData); + + FunctionsManager functionsManager = mock(FunctionsManager.class); + FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build(); + when(functionsManager.getFunction("exclamation")).thenReturn(functionArchive); + when(mockedWorkerService.getConnectorsManager()).thenReturn(mock(ConnectorsManager.class)); + when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager); + + StreamingOutput streamOutput = resource.downloadFunction(tenant, namespace, function, null, null, false); + File pkgFile = new File(testDir, UUID.randomUUID().toString()); + OutputStream output = new FileOutputStream(pkgFile); + streamOutput.write(output); + Assert.assertTrue(pkgFile.exists()); + Assert.assertEquals(file.length(), pkgFile.length()); + pkgFile.delete(); + } + + @Test + public void testDownloadTransformFunctionByName() throws Exception { + URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml"); + File file = Paths.get(fileUrl.toURI()).toFile(); + String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + + WorkerConfig workerConfig = new WorkerConfig() + .setUploadBuiltinSinksSources(false); + when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig); + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); + FunctionMetaData metaData = FunctionMetaData.newBuilder() .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("http://invalid")) .setTransformFunctionPackageLocation(PackageLocationMetaData.newBuilder() - .setPackagePath("file:///" + fileLocation)) + .setPackagePath("builtin://exclamation")) .build(); when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData); + FunctionsManager functionsManager = mock(FunctionsManager.class); + FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build(); + when(functionsManager.getFunction("exclamation")).thenReturn(functionArchive); + when(mockedWorkerService.getConnectorsManager()).thenReturn(mock(ConnectorsManager.class)); + when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager); + StreamingOutput streamOutput = resource.downloadFunction(tenant, namespace, function, null, null, true); File pkgFile = new File(testDir, UUID.randomUUID().toString()); OutputStream output = new FileOutputStream(pkgFile); streamOutput.write(output); Assert.assertTrue(pkgFile.exists()); - if (pkgFile.exists()) { - pkgFile.delete(); - } + Assert.assertEquals(file.length(), pkgFile.length()); + pkgFile.delete(); }