Skip to content

Commit

Permalink
[#321] feat(core): add auxiliary service support in Graviton (#406)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

add a aux service to graviton to support Iceberg REST server

### Why are the changes needed?
support Iceberg REST server

part: #321
will propose another PR to start Iceberg REST server.

### Does this PR introduce _any_ user-facing change?
add  Iceberg REST server

### How was this patch tested?
1. UT
2. start the Iceberg REST server in local env. 

### design docs 

https://docs.google.com/document/d/1N_9NtaoHIeKQ36Q4fYZ9eGDotvjlB0-7lHKSoT76IZk/edit#heading=h.kbhoexq6d6ba
  • Loading branch information
FANNG1 authored Sep 20, 2023
1 parent b260814 commit 0212b05
Show file tree
Hide file tree
Showing 11 changed files with 419 additions and 40 deletions.
13 changes: 2 additions & 11 deletions core/src/main/java/com/datastrato/graviton/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
package com.datastrato.graviton;

import com.datastrato.graviton.config.ConfigEntry;
import com.datastrato.graviton.utils.MapUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.Collections;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
Expand Down Expand Up @@ -128,16 +128,7 @@ public String getRawString(String key, String defaultValue) {
* @return An unmodifiable map containing configuration entries with keys matching the prefix.
*/
public Map<String, String> getConfigsWithPrefix(String prefix) {
Map<String, String> configs = Maps.newHashMap();
configMap.forEach(
(k, v) -> {
if (k.startsWith(prefix)) {
String newKey = k.substring(prefix.length());
configs.put(newKey, v);
}
});

return Collections.unmodifiableMap(configs);
return MapUtils.getPrefixMap(configMap, prefix);
}

/**
Expand Down
19 changes: 19 additions & 0 deletions core/src/main/java/com/datastrato/graviton/GravitonEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package com.datastrato.graviton;

import com.datastrato.graviton.aux.AuxiliaryServiceManager;
import com.datastrato.graviton.catalog.CatalogManager;
import com.datastrato.graviton.catalog.CatalogOperationDispatcher;
import com.datastrato.graviton.meta.MetalakeManager;
Expand Down Expand Up @@ -34,6 +35,8 @@ public class GravitonEnv {

private IdGenerator idGenerator;

private AuxiliaryServiceManager auxServiceManager;

private GravitonEnv() {}

private static class InstanceHolder {
Expand Down Expand Up @@ -77,6 +80,10 @@ public void initialize(Config config) {
this.catalogManager = new CatalogManager(config, entityStore, idGenerator);
this.catalogOperationDispatcher = new CatalogOperationDispatcher(catalogManager);

this.auxServiceManager = new AuxiliaryServiceManager();
this.auxServiceManager.serviceInit(
config.getConfigsWithPrefix(AuxiliaryServiceManager.GRAVITON_AUX_SERVICE_PREFIX));

LOG.info("Graviton Environment is initialized.");
}

Expand Down Expand Up @@ -144,6 +151,10 @@ public IdGenerator idGenerator() {
return idGenerator;
}

public void start() {
auxServiceManager.serviceStart();
}

/** Shutdown the Graviton environment. */
public void shutdown() {
LOG.info("Shutting down Graviton Environment...");
Expand All @@ -160,6 +171,14 @@ public void shutdown() {
catalogManager.close();
}

if (auxServiceManager != null) {
try {
auxServiceManager.serviceStop();
} catch (Exception e) {
LOG.warn("Failed to stop AuxServiceManager", e);
}
}

LOG.info("Graviton Environment is shut down.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.graviton.aux;

import com.datastrato.graviton.utils.IsolatedClassLoader;
import com.datastrato.graviton.utils.MapUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* AuxiliaryServiceManager manage all GravitonAuxiliaryServices with isolated classloader provided
*/
public class AuxiliaryServiceManager {
private static final Logger LOG = LoggerFactory.getLogger(AuxiliaryServiceManager.class);
public static final String GRAVITON_AUX_SERVICE_PREFIX = "graviton.auxService.";
public static final String AUX_SERVICE_NAMES = "auxServiceNames";
public static final String AUX_SERVICE_CLASSPATH = "auxServiceClasspath";

private static final Splitter splitter = Splitter.on(",");
private static final Joiner DOT = Joiner.on(".");

private Map<String, GravitonAuxiliaryService> auxServices = new HashMap<>();
private Map<String, IsolatedClassLoader> auxServiceClassLoaders = new HashMap<>();

private Exception firstException;

private Class<? extends GravitonAuxiliaryService> lookupAuxService(
String provider, ClassLoader cl) {
ServiceLoader<GravitonAuxiliaryService> loader =
ServiceLoader.load(GravitonAuxiliaryService.class, cl);
List<Class<? extends GravitonAuxiliaryService>> providers =
Streams.stream(loader.iterator())
.filter(p -> p.shortName().equalsIgnoreCase(provider))
.map(GravitonAuxiliaryService::getClass)
.collect(Collectors.toList());

if (providers.size() == 0) {
throw new IllegalArgumentException("No GravitonAuxiliaryService found for: " + provider);
} else if (providers.size() > 1) {
throw new IllegalArgumentException(
"Multiple GravitonAuxiliaryService found for: " + provider);
} else {
return Iterables.getOnlyElement(providers);
}
}

@VisibleForTesting
public GravitonAuxiliaryService loadAuxService(
String auxServiceName, IsolatedClassLoader isolatedClassLoader) throws Exception {
return isolatedClassLoader.withClassLoader(
cl -> {
try {
Class<? extends GravitonAuxiliaryService> providerClz =
lookupAuxService(auxServiceName, cl);
return providerClz.newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

@VisibleForTesting
public IsolatedClassLoader getIsolatedClassLoader(String classPath) {
return IsolatedClassLoader.buildClassLoader(classPath);
}

@VisibleForTesting
static String getValidPath(String auxServiceName, String pathString) {
Preconditions.checkArgument(
StringUtils.isNoneBlank(pathString),
String.format(
"AuxService:%s, %s%s.%s is not set in configuration",
auxServiceName, GRAVITON_AUX_SERVICE_PREFIX, auxServiceName, AUX_SERVICE_CLASSPATH));

Path path = Paths.get(pathString);
if (Files.exists(path)) {
return path.toAbsolutePath().toString();
}

String gravitonHome = System.getenv("GRAVITON_HOME");
if (!path.isAbsolute() && gravitonHome != null) {
Path newPath = Paths.get(gravitonHome, pathString);
if (Files.exists(newPath)) {
return newPath.toString();
}
}

throw new IllegalArgumentException(
String.format(
"AuxService:%s, classpath: %s not exists, gravitonHome:%s",
auxServiceName, pathString, gravitonHome));
}

private void registerAuxService(String auxServiceName, Map<String, String> config) {
String classPath = config.get(AUX_SERVICE_CLASSPATH);
classPath = getValidPath(auxServiceName, classPath);
LOG.info("AuxService name:{}, config:{}, classpath:{}", auxServiceName, config, classPath);

IsolatedClassLoader isolatedClassLoader = getIsolatedClassLoader(classPath);
try {
GravitonAuxiliaryService gravitonAuxiliaryService =
loadAuxService(auxServiceName, isolatedClassLoader);
auxServices.put(auxServiceName, gravitonAuxiliaryService);
auxServiceClassLoaders.put(auxServiceName, isolatedClassLoader);
} catch (Exception e) {
LOG.error("Failed to register auxService: {}", auxServiceName, e);
throw new RuntimeException(e);
}
LOG.info("AuxService:{} registered successfully", auxServiceName);
}

private void registerAuxServices(Map<String, String> config) {
String auxServiceNames = config.getOrDefault(AUX_SERVICE_NAMES, "");
splitter
.omitEmptyStrings()
.trimResults()
.splitToStream(auxServiceNames)
.forEach(
auxServiceName ->
registerAuxService(
auxServiceName, MapUtils.getPrefixMap(config, DOT.join(auxServiceName, ""))));
}

private void doWithClassLoader(String auxServiceName, Consumer<IsolatedClassLoader> func) {
IsolatedClassLoader classLoader = auxServiceClassLoaders.get(auxServiceName);
try {
classLoader.withClassLoader(
cl -> {
try {
func.accept(classLoader);
} catch (Exception e) {
throw new RuntimeException(e);
}
return null;
});
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public void serviceInit(Map<String, String> config) {
registerAuxServices(config);
auxServices.forEach(
(auxServiceName, auxService) -> {
doWithClassLoader(
auxServiceName,
cl ->
auxService.serviceInit(
MapUtils.getPrefixMap(config, DOT.join(auxServiceName, ""))));
});
}

public void serviceStart() {
auxServices.forEach(
(auxServiceName, auxService) -> {
doWithClassLoader(auxServiceName, cl -> auxService.serviceStart());
});
}

private void stopQuietly(String auxServiceName, GravitonAuxiliaryService auxiliaryService) {
try {
auxiliaryService.serviceStop();
} catch (Exception e) {
LOG.warn("AuxService:{} stop failed", auxServiceName, e);
if (firstException == null) {
firstException = e;
}
}
}

public void serviceStop() throws Exception {
auxServices.forEach(
(auxServiceName, auxService) -> {
doWithClassLoader(auxServiceName, cl -> stopQuietly(auxServiceName, auxService));
});
if (firstException != null) {
throw firstException;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.graviton.aux;

import java.util.Map;

/**
* GravitonAuxiliaryService could be managed as Aux Service in GravitonServer with isolated
* classpath
*/
public interface GravitonAuxiliaryService {

/**
* The name of GravitonAuxiliaryService implementation, GravitonServer will automatically start
* the aux service implementation if the name is added to `graviton.auxService.AuxServiceNames`
*/
String shortName();

/**
* @param config , GravitonServer will pass the config with prefix
* `graviton.auxService.{shortName}.` to aux server
*/
void serviceInit(Map<String, String> config);

/** Start aux service */
void serviceStart();

/** Stop aux service */
void serviceStop() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,13 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -397,7 +393,7 @@ private CatalogWrapper createCatalogWrapper(CatalogEntity entity) {
IsolatedClassLoader classLoader;
if (config.get(Configs.CATALOG_LOAD_ISOLATED)) {
String pkgPath = buildPkgPath(mergedConf, provider);
classLoader = buildClassLoader(pkgPath);
classLoader = IsolatedClassLoader.buildClassLoader(pkgPath);
} else {
// This will use the current class loader, it is mainly used for test.
classLoader =
Expand Down Expand Up @@ -481,30 +477,6 @@ private String buildPkgPath(Map<String, String> conf, String provider) {
return pkgPath;
}

private IsolatedClassLoader buildClassLoader(String pkgPath) {
// Listing all the jars under the package path and build the isolated class loader.
File pkgFolder = new File(pkgPath);
if (!pkgFolder.exists()
|| !pkgFolder.isDirectory()
|| !pkgFolder.canRead()
|| !pkgFolder.canExecute()) {
throw new IllegalArgumentException("Invalid package path: " + pkgPath);
}

List<URL> jars = Lists.newArrayList();
Arrays.stream(pkgFolder.listFiles())
.forEach(
f -> {
try {
jars.add(f.toURI().toURL());
} catch (MalformedURLException e) {
LOG.warn("Failed to read jar file: {}", f.getAbsolutePath(), e);
}
});

return new IsolatedClassLoader(jars, Collections.emptyList(), Collections.emptyList());
}

private Class<? extends CatalogProvider> lookupCatalogProvider(String provider, ClassLoader cl) {
ServiceLoader<CatalogProvider> loader = ServiceLoader.load(CatalogProvider.class, cl);

Expand Down
Loading

0 comments on commit 0212b05

Please sign in to comment.