diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index 239e1d7b0fdd5b..4829289aee21f8 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -47,7 +47,6 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; -import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -102,7 +101,7 @@ import org.apache.pulsar.functions.utils.FunctionConfigUtils; import org.apache.pulsar.functions.utils.FunctionMetaDataUtils; import org.apache.pulsar.functions.utils.functions.FunctionArchive; -import org.apache.pulsar.functions.utils.functions.FunctionUtils; +import org.apache.pulsar.functions.utils.io.Connector; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; import org.apache.pulsar.functions.worker.FunctionRuntimeInfo; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; @@ -1436,14 +1435,22 @@ public StreamingOutput downloadFunction(String tenant, String namespace, String String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName)); } - String pkgPath = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName) - .getPackageLocation().getPackagePath(); + FunctionMetaData functionMetaData = + functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); + String pkgPath = functionMetaData.getPackageLocation().getPackagePath(); + + FunctionDetails.ComponentType componentType = + InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()); - return getStreamingOutput(pkgPath); + return getStreamingOutput(pkgPath, componentType); } private StreamingOutput getStreamingOutput(String pkgPath) { - final StreamingOutput streamingOutput = output -> { + return getStreamingOutput(pkgPath, null); + } + + private StreamingOutput getStreamingOutput(String pkgPath, FunctionDetails.ComponentType componentType) { + return output -> { if (pkgPath.startsWith(Utils.HTTP)) { URL url = URI.create(pkgPath).toURL(); try (InputStream inputStream = url.openStream()) { @@ -1455,15 +1462,7 @@ private StreamingOutput getStreamingOutput(String pkgPath) { Files.copy(file.toPath(), output); } else if (pkgPath.startsWith(Utils.BUILTIN) && !worker().getWorkerConfig().getUploadBuiltinSinksSources()) { - String sType = pkgPath.replaceFirst("^builtin://", ""); - final String connectorsDir = worker().getWorkerConfig().getConnectorsDirectory(); - log.warn("Processing package {} ; looking at the dir {}", pkgPath, connectorsDir); - TreeMap sinksOrSources = - FunctionUtils.searchForFunctions(connectorsDir, true); - Path narPath = sinksOrSources.get(sType).getArchivePath(); - if (narPath == null) { - throw new IllegalStateException("Didn't find " + pkgPath + " in " + connectorsDir); - } + Path narPath = getBuiltinArchivePath(pkgPath, componentType); log.info("Loading {} from {}", pkgPath, narPath); try (InputStream in = new FileInputStream(narPath.toString())) { IOUtils.copy(in, output, 1024); @@ -1477,14 +1476,34 @@ private StreamingOutput getStreamingOutput(String pkgPath) { output.flush(); } } catch (Exception e) { - log.error("Failed download package {} from packageMangment Service", pkgPath, e); + log.error("Failed download package {} from packageManagement Service", pkgPath, e); } } else { WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), output, pkgPath); } }; - return streamingOutput; + } + + private Path getBuiltinArchivePath(String pkgPath, FunctionDetails.ComponentType componentType) { + String type = pkgPath.replaceFirst("^builtin://", ""); + if (!FunctionDetails.ComponentType.FUNCTION.equals(componentType)) { + Connector connector = worker().getConnectorsManager().getConnector(type); + if (connector != null) { + return connector.getArchivePath(); + } + if (componentType != null) { + throw new IllegalStateException("Didn't find " + type + " in built-in connectors"); + } + } + FunctionArchive function = worker().getFunctionsManager().getFunction(type); + if (function != null) { + return function.getArchivePath(); + } + if (componentType != null) { + throw new IllegalStateException("Didn't find " + type + " in built-in functions"); + } + throw new IllegalStateException("Didn't find " + type + " in built-in connectors or functions"); } @Override diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java index a868d6325064e1..ff76413bd8daca 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java @@ -19,7 +19,6 @@ package org.apache.pulsar.functions.worker.rest.api.v3; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; @@ -29,7 +28,6 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.fail; import com.google.common.collect.Lists; import java.io.File; import java.io.FileInputStream; @@ -44,7 +42,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.TreeMap; import java.util.UUID; import java.util.function.Consumer; import javax.ws.rs.core.Response; @@ -78,7 +75,8 @@ import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.functions.utils.FunctionConfigUtils; import org.apache.pulsar.functions.utils.functions.FunctionArchive; -import org.apache.pulsar.functions.utils.functions.FunctionUtils; +import org.apache.pulsar.functions.utils.io.Connector; +import org.apache.pulsar.functions.worker.ConnectorsManager; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.FunctionsManager; @@ -1604,20 +1602,13 @@ public void testDownloadFunctionHttpUrl() throws Exception { String jarHttpUrl = "https://repo1.maven.org/maven2/org/apache/pulsar/pulsar-common/2.4.2/pulsar-common-2.4.2.jar"; String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); - PulsarWorkerService worker = mock(PulsarWorkerService.class); - doReturn(true).when(worker).isInitialized(); - WorkerConfig config = mock(WorkerConfig.class); - when(config.isAuthorizationEnabled()).thenReturn(false); - when(worker.getWorkerConfig()).thenReturn(config); - FunctionsImpl function = new FunctionsImpl(() -> worker); - StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl, null, null); + + StreamingOutput streamOutput = resource.downloadFunction(jarHttpUrl, null, null); File pkgFile = new File(testDir, UUID.randomUUID().toString()); OutputStream output = new FileOutputStream(pkgFile); streamOutput.write(output); Assert.assertTrue(pkgFile.exists()); - if (pkgFile.exists()) { - pkgFile.delete(); - } + pkgFile.delete(); } @Test @@ -1626,53 +1617,61 @@ public void testDownloadFunctionFile() throws Exception { File file = Paths.get(fileUrl.toURI()).toFile(); String fileLocation = file.getAbsolutePath().replace('\\', '/'); String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); - PulsarWorkerService worker = mock(PulsarWorkerService.class); - doReturn(true).when(worker).isInitialized(); - WorkerConfig config = mock(WorkerConfig.class); - when(config.isAuthorizationEnabled()).thenReturn(false); - when(worker.getWorkerConfig()).thenReturn(config); - FunctionsImpl function = new FunctionsImpl(() -> worker); - StreamingOutput streamOutput = function.downloadFunction("file:///" + fileLocation, null, null); + + StreamingOutput streamOutput = resource.downloadFunction("file:///" + fileLocation, null, null); File pkgFile = new File(testDir, UUID.randomUUID().toString()); OutputStream output = new FileOutputStream(pkgFile); streamOutput.write(output); Assert.assertTrue(pkgFile.exists()); - if (pkgFile.exists()) { - pkgFile.delete(); - } + Assert.assertEquals(file.length(), pkgFile.length()); + pkgFile.delete(); } @Test - public void testDownloadFunctionBuiltin() throws Exception { - mockStatic(WorkerUtils.class, ctx -> { - }); - + public void testDownloadFunctionBuiltinConnector() throws Exception { URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml"); File file = Paths.get(fileUrl.toURI()).toFile(); String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); - PulsarWorkerService worker = mock(PulsarWorkerService.class); - doReturn(true).when(worker).isInitialized(); + WorkerConfig config = new WorkerConfig() + .setUploadBuiltinSinksSources(false); + when(mockedWorkerService.getWorkerConfig()).thenReturn(config); - WorkerConfig config = mock(WorkerConfig.class); - when(config.isAuthorizationEnabled()).thenReturn(false); - when(config.getUploadBuiltinSinksSources()).thenReturn(false); - when(config.getConnectorsDirectory()).thenReturn("/connectors"); + Connector connector = Connector.builder().archivePath(file.toPath()).build(); + ConnectorsManager connectorsManager = mock(ConnectorsManager.class); + when(connectorsManager.getConnector("cassandra")).thenReturn(connector); + when(mockedWorkerService.getConnectorsManager()).thenReturn(connectorsManager); - when(worker.getDlogNamespace()).thenReturn(mock(Namespace.class)); - when(worker.getWorkerConfig()).thenReturn(config); - FunctionsImpl function = new FunctionsImpl(() -> worker); + StreamingOutput streamOutput = resource.downloadFunction("builtin://cassandra", null, null); - TreeMap functions = new TreeMap<>(); - FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build(); - functions.put("cassandra", functionArchive); + File pkgFile = new File(testDir, UUID.randomUUID().toString()); + OutputStream output = new FileOutputStream(pkgFile); + streamOutput.write(output); + output.flush(); + output.close(); + Assert.assertTrue(pkgFile.exists()); + Assert.assertTrue(pkgFile.exists()); + Assert.assertEquals(file.length(), pkgFile.length()); + pkgFile.delete(); + } - mockStatic(FunctionUtils.class, ctx -> { - ctx.when(() -> FunctionUtils.searchForFunctions(anyString(), anyBoolean())).thenReturn(functions); + @Test + public void testDownloadFunctionBuiltinFunction() throws Exception { + URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml"); + File file = Paths.get(fileUrl.toURI()).toFile(); + String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); - }); + WorkerConfig config = new WorkerConfig() + .setUploadBuiltinSinksSources(false); + when(mockedWorkerService.getWorkerConfig()).thenReturn(config); - StreamingOutput streamOutput = function.downloadFunction("builtin://cassandra", null, null); + FunctionsManager functionsManager = mock(FunctionsManager.class); + FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build(); + when(functionsManager.getFunction("exclamation")).thenReturn(functionArchive); + when(mockedWorkerService.getConnectorsManager()).thenReturn(mock(ConnectorsManager.class)); + when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager); + + StreamingOutput streamOutput = resource.downloadFunction("builtin://exclamation", null, null); File pkgFile = new File(testDir, UUID.randomUUID().toString()); OutputStream output = new FileOutputStream(pkgFile); @@ -1680,12 +1679,107 @@ public void testDownloadFunctionBuiltin() throws Exception { output.flush(); output.close(); Assert.assertTrue(pkgFile.exists()); - if (pkgFile.exists()) { - Assert.assertEquals(file.length(), pkgFile.length()); - pkgFile.delete(); - } else { - fail("expected file"); - } + Assert.assertEquals(file.length(), pkgFile.length()); + pkgFile.delete(); + } + + @Test + public void testDownloadFunctionBuiltinConnectorByName() throws Exception { + URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml"); + File file = Paths.get(fileUrl.toURI()).toFile(); + String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + WorkerConfig config = new WorkerConfig() + .setUploadBuiltinSinksSources(false); + when(mockedWorkerService.getWorkerConfig()).thenReturn(config); + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); + + FunctionMetaData metaData = FunctionMetaData.newBuilder() + .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("builtin://cassandra")) + .setTransformFunctionPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("http://invalid")) + .setFunctionDetails(FunctionDetails.newBuilder().setComponentType(FunctionDetails.ComponentType.SINK)) + .build(); + when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData); + + Connector connector = Connector.builder().archivePath(file.toPath()).build(); + ConnectorsManager connectorsManager = mock(ConnectorsManager.class); + when(connectorsManager.getConnector("cassandra")).thenReturn(connector); + when(mockedWorkerService.getConnectorsManager()).thenReturn(connectorsManager); + + StreamingOutput streamOutput = resource.downloadFunction(tenant, namespace, function, null, null); + File pkgFile = new File(testDir, UUID.randomUUID().toString()); + OutputStream output = new FileOutputStream(pkgFile); + streamOutput.write(output); + Assert.assertTrue(pkgFile.exists()); + Assert.assertEquals(file.length(), pkgFile.length()); + pkgFile.delete(); + } + + @Test + public void testDownloadFunctionBuiltinFunctionByName() throws Exception { + URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml"); + File file = Paths.get(fileUrl.toURI()).toFile(); + String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + WorkerConfig config = new WorkerConfig() + .setUploadBuiltinSinksSources(false); + when(mockedWorkerService.getWorkerConfig()).thenReturn(config); + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); + + FunctionMetaData metaData = FunctionMetaData.newBuilder() + .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("builtin://exclamation")) + .setTransformFunctionPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("http://invalid")) + .setFunctionDetails(FunctionDetails.newBuilder().setComponentType(FunctionDetails.ComponentType.FUNCTION)) + .build(); + when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData); + + FunctionsManager functionsManager = mock(FunctionsManager.class); + FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build(); + when(functionsManager.getFunction("exclamation")).thenReturn(functionArchive); + when(mockedWorkerService.getConnectorsManager()).thenReturn(mock(ConnectorsManager.class)); + when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager); + + StreamingOutput streamOutput = resource.downloadFunction(tenant, namespace, function, null, null); + File pkgFile = new File(testDir, UUID.randomUUID().toString()); + OutputStream output = new FileOutputStream(pkgFile); + streamOutput.write(output); + Assert.assertTrue(pkgFile.exists()); + Assert.assertEquals(file.length(), pkgFile.length()); + pkgFile.delete(); + } + + @Test + public void testDownloadTransformFunctionByName() throws Exception { + URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml"); + File file = Paths.get(fileUrl.toURI()).toFile(); + String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + + WorkerConfig workerConfig = new WorkerConfig() + .setUploadBuiltinSinksSources(false); + when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig); + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); + + FunctionMetaData metaData = FunctionMetaData.newBuilder() + .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("http://invalid")) + .setTransformFunctionPackageLocation(PackageLocationMetaData.newBuilder() + .setPackagePath("builtin://exclamation")) + .build(); + when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData); + + FunctionsManager functionsManager = mock(FunctionsManager.class); + FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build(); + when(functionsManager.getFunction("exclamation")).thenReturn(functionArchive); + when(mockedWorkerService.getConnectorsManager()).thenReturn(mock(ConnectorsManager.class)); + when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager); + + StreamingOutput streamOutput = resource.downloadFunction(tenant, namespace, function, null, null); + File pkgFile = new File(testDir, UUID.randomUUID().toString()); + OutputStream output = new FileOutputStream(pkgFile); + streamOutput.write(output); + Assert.assertTrue(pkgFile.exists()); + Assert.assertEquals(file.length(), pkgFile.length()); + pkgFile.delete(); } @Test