diff --git a/core/src/main/java/com/datastrato/graviton/Config.java b/core/src/main/java/com/datastrato/graviton/Config.java index e49d27bed09..cb297a30a44 100644 --- a/core/src/main/java/com/datastrato/graviton/Config.java +++ b/core/src/main/java/com/datastrato/graviton/Config.java @@ -5,13 +5,13 @@ package com.datastrato.graviton; import com.datastrato.graviton.config.ConfigEntry; +import com.datastrato.graviton.utils.MapUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; -import java.util.Collections; import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; @@ -128,16 +128,7 @@ public String getRawString(String key, String defaultValue) { * @return An unmodifiable map containing configuration entries with keys matching the prefix. */ public Map getConfigsWithPrefix(String prefix) { - Map configs = Maps.newHashMap(); - configMap.forEach( - (k, v) -> { - if (k.startsWith(prefix)) { - String newKey = k.substring(prefix.length()); - configs.put(newKey, v); - } - }); - - return Collections.unmodifiableMap(configs); + return MapUtils.getPrefixMap(configMap, prefix); } /** diff --git a/core/src/main/java/com/datastrato/graviton/GravitonEnv.java b/core/src/main/java/com/datastrato/graviton/GravitonEnv.java index 5c6524c1b49..75024e4d42a 100644 --- a/core/src/main/java/com/datastrato/graviton/GravitonEnv.java +++ b/core/src/main/java/com/datastrato/graviton/GravitonEnv.java @@ -4,6 +4,7 @@ */ package com.datastrato.graviton; +import com.datastrato.graviton.aux.AuxiliaryServiceManager; import com.datastrato.graviton.catalog.CatalogManager; import com.datastrato.graviton.catalog.CatalogOperationDispatcher; import com.datastrato.graviton.meta.MetalakeManager; @@ -34,6 +35,8 @@ public class GravitonEnv { private IdGenerator idGenerator; + private AuxiliaryServiceManager auxServiceManager; + private GravitonEnv() {} private static class InstanceHolder { @@ -77,6 +80,10 @@ public void initialize(Config config) { this.catalogManager = new CatalogManager(config, entityStore, idGenerator); this.catalogOperationDispatcher = new CatalogOperationDispatcher(catalogManager); + this.auxServiceManager = new AuxiliaryServiceManager(); + this.auxServiceManager.serviceInit( + config.getConfigsWithPrefix(AuxiliaryServiceManager.GRAVITON_AUX_SERVICE_PREFIX)); + LOG.info("Graviton Environment is initialized."); } @@ -144,6 +151,10 @@ public IdGenerator idGenerator() { return idGenerator; } + public void start() { + auxServiceManager.serviceStart(); + } + /** Shutdown the Graviton environment. */ public void shutdown() { LOG.info("Shutting down Graviton Environment..."); @@ -160,6 +171,14 @@ public void shutdown() { catalogManager.close(); } + if (auxServiceManager != null) { + try { + auxServiceManager.serviceStop(); + } catch (Exception e) { + LOG.warn("Failed to stop AuxServiceManager", e); + } + } + LOG.info("Graviton Environment is shut down."); } } diff --git a/core/src/main/java/com/datastrato/graviton/aux/AuxiliaryServiceManager.java b/core/src/main/java/com/datastrato/graviton/aux/AuxiliaryServiceManager.java new file mode 100644 index 00000000000..e1954e45ceb --- /dev/null +++ b/core/src/main/java/com/datastrato/graviton/aux/AuxiliaryServiceManager.java @@ -0,0 +1,201 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.graviton.aux; + +import com.datastrato.graviton.utils.IsolatedClassLoader; +import com.datastrato.graviton.utils.MapUtils; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import com.google.common.collect.Streams; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * AuxiliaryServiceManager manage all GravitonAuxiliaryServices with isolated classloader provided + */ +public class AuxiliaryServiceManager { + private static final Logger LOG = LoggerFactory.getLogger(AuxiliaryServiceManager.class); + public static final String GRAVITON_AUX_SERVICE_PREFIX = "graviton.auxService."; + public static final String AUX_SERVICE_NAMES = "auxServiceNames"; + public static final String AUX_SERVICE_CLASSPATH = "auxServiceClasspath"; + + private static final Splitter splitter = Splitter.on(","); + private static final Joiner DOT = Joiner.on("."); + + private Map auxServices = new HashMap<>(); + private Map auxServiceClassLoaders = new HashMap<>(); + + private Exception firstException; + + private Class lookupAuxService( + String provider, ClassLoader cl) { + ServiceLoader loader = + ServiceLoader.load(GravitonAuxiliaryService.class, cl); + List> providers = + Streams.stream(loader.iterator()) + .filter(p -> p.shortName().equalsIgnoreCase(provider)) + .map(GravitonAuxiliaryService::getClass) + .collect(Collectors.toList()); + + if (providers.size() == 0) { + throw new IllegalArgumentException("No GravitonAuxiliaryService found for: " + provider); + } else if (providers.size() > 1) { + throw new IllegalArgumentException( + "Multiple GravitonAuxiliaryService found for: " + provider); + } else { + return Iterables.getOnlyElement(providers); + } + } + + @VisibleForTesting + public GravitonAuxiliaryService loadAuxService( + String auxServiceName, IsolatedClassLoader isolatedClassLoader) throws Exception { + return isolatedClassLoader.withClassLoader( + cl -> { + try { + Class providerClz = + lookupAuxService(auxServiceName, cl); + return providerClz.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @VisibleForTesting + public IsolatedClassLoader getIsolatedClassLoader(String classPath) { + return IsolatedClassLoader.buildClassLoader(classPath); + } + + @VisibleForTesting + static String getValidPath(String auxServiceName, String pathString) { + Preconditions.checkArgument( + StringUtils.isNoneBlank(pathString), + String.format( + "AuxService:%s, %s%s.%s is not set in configuration", + auxServiceName, GRAVITON_AUX_SERVICE_PREFIX, auxServiceName, AUX_SERVICE_CLASSPATH)); + + Path path = Paths.get(pathString); + if (Files.exists(path)) { + return path.toAbsolutePath().toString(); + } + + String gravitonHome = System.getenv("GRAVITON_HOME"); + if (!path.isAbsolute() && gravitonHome != null) { + Path newPath = Paths.get(gravitonHome, pathString); + if (Files.exists(newPath)) { + return newPath.toString(); + } + } + + throw new IllegalArgumentException( + String.format( + "AuxService:%s, classpath: %s not exists, gravitonHome:%s", + auxServiceName, pathString, gravitonHome)); + } + + private void registerAuxService(String auxServiceName, Map config) { + String classPath = config.get(AUX_SERVICE_CLASSPATH); + classPath = getValidPath(auxServiceName, classPath); + LOG.info("AuxService name:{}, config:{}, classpath:{}", auxServiceName, config, classPath); + + IsolatedClassLoader isolatedClassLoader = getIsolatedClassLoader(classPath); + try { + GravitonAuxiliaryService gravitonAuxiliaryService = + loadAuxService(auxServiceName, isolatedClassLoader); + auxServices.put(auxServiceName, gravitonAuxiliaryService); + auxServiceClassLoaders.put(auxServiceName, isolatedClassLoader); + } catch (Exception e) { + LOG.error("Failed to register auxService: {}", auxServiceName, e); + throw new RuntimeException(e); + } + LOG.info("AuxService:{} registered successfully", auxServiceName); + } + + private void registerAuxServices(Map config) { + String auxServiceNames = config.getOrDefault(AUX_SERVICE_NAMES, ""); + splitter + .omitEmptyStrings() + .trimResults() + .splitToStream(auxServiceNames) + .forEach( + auxServiceName -> + registerAuxService( + auxServiceName, MapUtils.getPrefixMap(config, DOT.join(auxServiceName, "")))); + } + + private void doWithClassLoader(String auxServiceName, Consumer func) { + IsolatedClassLoader classLoader = auxServiceClassLoaders.get(auxServiceName); + try { + classLoader.withClassLoader( + cl -> { + try { + func.accept(classLoader); + } catch (Exception e) { + throw new RuntimeException(e); + } + return null; + }); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void serviceInit(Map config) { + registerAuxServices(config); + auxServices.forEach( + (auxServiceName, auxService) -> { + doWithClassLoader( + auxServiceName, + cl -> + auxService.serviceInit( + MapUtils.getPrefixMap(config, DOT.join(auxServiceName, "")))); + }); + } + + public void serviceStart() { + auxServices.forEach( + (auxServiceName, auxService) -> { + doWithClassLoader(auxServiceName, cl -> auxService.serviceStart()); + }); + } + + private void stopQuietly(String auxServiceName, GravitonAuxiliaryService auxiliaryService) { + try { + auxiliaryService.serviceStop(); + } catch (Exception e) { + LOG.warn("AuxService:{} stop failed", auxServiceName, e); + if (firstException == null) { + firstException = e; + } + } + } + + public void serviceStop() throws Exception { + auxServices.forEach( + (auxServiceName, auxService) -> { + doWithClassLoader(auxServiceName, cl -> stopQuietly(auxServiceName, auxService)); + }); + if (firstException != null) { + throw firstException; + } + } +} diff --git a/core/src/main/java/com/datastrato/graviton/aux/GravitonAuxiliaryService.java b/core/src/main/java/com/datastrato/graviton/aux/GravitonAuxiliaryService.java new file mode 100644 index 00000000000..2addaff5551 --- /dev/null +++ b/core/src/main/java/com/datastrato/graviton/aux/GravitonAuxiliaryService.java @@ -0,0 +1,33 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.graviton.aux; + +import java.util.Map; + +/** + * GravitonAuxiliaryService could be managed as Aux Service in GravitonServer with isolated + * classpath + */ +public interface GravitonAuxiliaryService { + + /** + * The name of GravitonAuxiliaryService implementation, GravitonServer will automatically start + * the aux service implementation if the name is added to `graviton.auxService.AuxServiceNames` + */ + String shortName(); + + /** + * @param config , GravitonServer will pass the config with prefix + * `graviton.auxService.{shortName}.` to aux server + */ + void serviceInit(Map config); + + /** Start aux service */ + void serviceStart(); + + /** Stop aux service */ + void serviceStop() throws Exception; +} diff --git a/core/src/main/java/com/datastrato/graviton/catalog/CatalogManager.java b/core/src/main/java/com/datastrato/graviton/catalog/CatalogManager.java index 59685180ec0..91708978e9c 100644 --- a/core/src/main/java/com/datastrato/graviton/catalog/CatalogManager.java +++ b/core/src/main/java/com/datastrato/graviton/catalog/CatalogManager.java @@ -33,17 +33,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Streams; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.Closeable; import java.io.File; import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; import java.time.Instant; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -397,7 +393,7 @@ private CatalogWrapper createCatalogWrapper(CatalogEntity entity) { IsolatedClassLoader classLoader; if (config.get(Configs.CATALOG_LOAD_ISOLATED)) { String pkgPath = buildPkgPath(mergedConf, provider); - classLoader = buildClassLoader(pkgPath); + classLoader = IsolatedClassLoader.buildClassLoader(pkgPath); } else { // This will use the current class loader, it is mainly used for test. classLoader = @@ -481,30 +477,6 @@ private String buildPkgPath(Map conf, String provider) { return pkgPath; } - private IsolatedClassLoader buildClassLoader(String pkgPath) { - // Listing all the jars under the package path and build the isolated class loader. - File pkgFolder = new File(pkgPath); - if (!pkgFolder.exists() - || !pkgFolder.isDirectory() - || !pkgFolder.canRead() - || !pkgFolder.canExecute()) { - throw new IllegalArgumentException("Invalid package path: " + pkgPath); - } - - List jars = Lists.newArrayList(); - Arrays.stream(pkgFolder.listFiles()) - .forEach( - f -> { - try { - jars.add(f.toURI().toURL()); - } catch (MalformedURLException e) { - LOG.warn("Failed to read jar file: {}", f.getAbsolutePath(), e); - } - }); - - return new IsolatedClassLoader(jars, Collections.emptyList(), Collections.emptyList()); - } - private Class lookupCatalogProvider(String provider, ClassLoader cl) { ServiceLoader loader = ServiceLoader.load(CatalogProvider.class, cl); diff --git a/core/src/main/java/com/datastrato/graviton/utils/IsolatedClassLoader.java b/core/src/main/java/com/datastrato/graviton/utils/IsolatedClassLoader.java index f913ef6524a..094ce1d4fc4 100644 --- a/core/src/main/java/com/datastrato/graviton/utils/IsolatedClassLoader.java +++ b/core/src/main/java/com/datastrato/graviton/utils/IsolatedClassLoader.java @@ -4,10 +4,15 @@ */ package com.datastrato.graviton.utils; +import com.google.common.collect.Lists; import java.io.Closeable; +import java.io.File; import java.io.InputStream; +import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.JavaVersion; @@ -67,6 +72,30 @@ public T withClassLoader(ThrowableFunction fn) throws Except } } + public static IsolatedClassLoader buildClassLoader(String pkgPath) { + // Listing all the jars under the package path and build the isolated class loader. + File pkgFolder = new File(pkgPath); + if (!pkgFolder.exists() + || !pkgFolder.isDirectory() + || !pkgFolder.canRead() + || !pkgFolder.canExecute()) { + throw new IllegalArgumentException("Invalid package path: " + pkgPath); + } + + List jars = Lists.newArrayList(); + Arrays.stream(pkgFolder.listFiles()) + .forEach( + f -> { + try { + jars.add(f.toURI().toURL()); + } catch (MalformedURLException e) { + LOG.warn("Failed to read jar file: {}", f.getAbsolutePath(), e); + } + }); + + return new IsolatedClassLoader(jars, Collections.emptyList(), Collections.emptyList()); + } + /** Closes the class loader. */ @Override public void close() { diff --git a/core/src/main/java/com/datastrato/graviton/utils/MapUtils.java b/core/src/main/java/com/datastrato/graviton/utils/MapUtils.java new file mode 100644 index 00000000000..eb56fcb825c --- /dev/null +++ b/core/src/main/java/com/datastrato/graviton/utils/MapUtils.java @@ -0,0 +1,25 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.graviton.utils; + +import com.google.common.collect.Maps; +import java.util.Collections; +import java.util.Map; + +public class MapUtils { + public static Map getPrefixMap(Map m, String prefix) { + Map configs = Maps.newHashMap(); + m.forEach( + (k, v) -> { + if (k.startsWith(prefix)) { + String newKey = k.substring(prefix.length()); + configs.put(newKey, v); + } + }); + + return Collections.unmodifiableMap(configs); + } +} diff --git a/core/src/test/java/com/datastrato/graviton/aux/TestAuxiliaryServiceManager.java b/core/src/test/java/com/datastrato/graviton/aux/TestAuxiliaryServiceManager.java new file mode 100644 index 00000000000..b87450d542a --- /dev/null +++ b/core/src/test/java/com/datastrato/graviton/aux/TestAuxiliaryServiceManager.java @@ -0,0 +1,77 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.graviton.aux; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.datastrato.graviton.utils.IsolatedClassLoader; +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestAuxiliaryServiceManager { + + @Test + public void testGravitonAuxServiceManagerEmptyServiceName() throws Exception { + AuxiliaryServiceManager auxServiceManager = new AuxiliaryServiceManager(); + auxServiceManager.serviceInit(ImmutableMap.of(AuxiliaryServiceManager.AUX_SERVICE_NAMES, "")); + auxServiceManager.serviceStart(); + auxServiceManager.serviceStop(); + } + + @Test + public void testGravitonAuxServiceNotSetClassPath() { + AuxiliaryServiceManager auxServiceManager = new AuxiliaryServiceManager(); + Assertions.assertThrowsExactly( + IllegalArgumentException.class, + () -> + auxServiceManager.serviceInit( + ImmutableMap.of(AuxiliaryServiceManager.AUX_SERVICE_NAMES, "mock1"))); + } + + @Test + public void testGravitonAuxServiceManager() throws Exception { + GravitonAuxiliaryService auxService = mock(GravitonAuxiliaryService.class); + GravitonAuxiliaryService auxService2 = mock(GravitonAuxiliaryService.class); + + IsolatedClassLoader isolatedClassLoader = + new IsolatedClassLoader( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + + AuxiliaryServiceManager auxServiceManager = new AuxiliaryServiceManager(); + AuxiliaryServiceManager spyAuxManager = spy(auxServiceManager); + + doReturn(isolatedClassLoader).when(spyAuxManager).getIsolatedClassLoader(anyString()); + doReturn(auxService).when(spyAuxManager).loadAuxService("mock1", isolatedClassLoader); + doReturn(auxService2).when(spyAuxManager).loadAuxService("mock2", isolatedClassLoader); + + spyAuxManager.serviceInit( + ImmutableMap.of( + AuxiliaryServiceManager.AUX_SERVICE_NAMES, + "mock1,mock2", + "mock1." + AuxiliaryServiceManager.AUX_SERVICE_CLASSPATH, + "/tmp", + "mock2." + AuxiliaryServiceManager.AUX_SERVICE_CLASSPATH, + "/tmp")); + verify(auxService, times(1)).serviceInit(any()); + verify(auxService2, times(1)).serviceInit(any()); + + spyAuxManager.serviceStart(); + verify(auxService, times(1)).serviceStart(); + verify(auxService2, times(1)).serviceStart(); + + spyAuxManager.serviceStop(); + verify(auxService, times(1)).serviceStop(); + verify(auxService2, times(1)).serviceStop(); + } +} diff --git a/core/src/test/java/com/datastrato/graviton/utils/TestMapUtils.java b/core/src/test/java/com/datastrato/graviton/utils/TestMapUtils.java new file mode 100644 index 00000000000..8e180cc6d4a --- /dev/null +++ b/core/src/test/java/com/datastrato/graviton/utils/TestMapUtils.java @@ -0,0 +1,27 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.graviton.utils; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestMapUtils { + + @Test + public void testGetPrefixMap() { + Map configs = ImmutableMap.of("a.b", "", "a.c", "", "", "", "b.a", ""); + + Assertions.assertEquals(ImmutableMap.of(), MapUtils.getPrefixMap(configs, "xx")); + Assertions.assertThrowsExactly( + NullPointerException.class, () -> MapUtils.getPrefixMap(configs, null)); + Assertions.assertEquals( + ImmutableMap.of("b", "", "c", ""), MapUtils.getPrefixMap(configs, "a.")); + Assertions.assertEquals(ImmutableMap.of("a", ""), MapUtils.getPrefixMap(configs, "b.")); + Assertions.assertEquals(configs, MapUtils.getPrefixMap(configs, "")); + } +} diff --git a/server/src/main/java/com/datastrato/graviton/server/GravitonServer.java b/server/src/main/java/com/datastrato/graviton/server/GravitonServer.java index 0483844d1d0..52d1e03c4a6 100644 --- a/server/src/main/java/com/datastrato/graviton/server/GravitonServer.java +++ b/server/src/main/java/com/datastrato/graviton/server/GravitonServer.java @@ -70,6 +70,7 @@ protected void configure() { public void start() throws Exception { server.start(); + gravitonEnv.start(); } public void join() { diff --git a/server/src/test/java/com/datastrato/graviton/server/TestServerConfig.java b/server/src/test/java/com/datastrato/graviton/server/TestServerConfig.java index 5ddf74f3798..a5b69b1c207 100644 --- a/server/src/test/java/com/datastrato/graviton/server/TestServerConfig.java +++ b/server/src/test/java/com/datastrato/graviton/server/TestServerConfig.java @@ -5,6 +5,7 @@ package com.datastrato.graviton.server; import com.datastrato.graviton.Configs; +import com.datastrato.graviton.aux.AuxiliaryServiceManager; import com.datastrato.graviton.config.ConfigEntry; import java.io.File; import java.io.IOException; @@ -41,6 +42,9 @@ public void checkGravitonConfFile() // `Configs` for (Map.Entry entry : properties.entrySet()) { String propKey = (String) entry.getKey(); + if (propKey.startsWith(AuxiliaryServiceManager.GRAVITON_AUX_SERVICE_PREFIX)) { + continue; + } Assertions.assertTrue( configKeyMap.containsKey(propKey), "Config key " + propKey + " is not defined in ConfigEntry");