Skip to content

Commit

Permalink
add catalog meta cache
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyongxiang.alpha committed Jan 10, 2025
1 parent 0fd26a1 commit fc1d05a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ public class AmoroManagementConf {
.defaultValue("admin")
.withDescription("The administrator password");

public static final ConfigOption<Duration> CACHE_CATALOG_META_DURATION =
ConfigOptions.key("cache.catalog-meta.duration")
.durationType()
.defaultValue(Duration.ofSeconds(60))
.withDescription("TTL for catalog metadata.");

public static final ConfigOption<Integer> TABLE_MANIFEST_IO_THREAD_COUNT =
ConfigOptions.key("table-manifest-io.thread-count")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,66 @@
import org.apache.amoro.exception.IllegalMetadataException;
import org.apache.amoro.exception.ObjectNotExistsException;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
import org.apache.amoro.shade.guava32.com.google.common.cache.CacheBuilder;
import org.apache.amoro.shade.guava32.com.google.common.cache.CacheLoader;
import org.apache.amoro.shade.guava32.com.google.common.cache.LoadingCache;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class DefaultCatalogManager extends PersistentBase implements CatalogManager {

private static final Logger LOG = LoggerFactory.getLogger(DefaultCatalogManager.class);
protected final Configurations serverConfiguration;
private final LoadingCache<String, Optional<CatalogMeta>> metaCache;

private final Map<String, ServerCatalog> serverCatalogMap = Maps.newConcurrentMap();

public DefaultCatalogManager(Configurations serverConfiguration) {
this.serverConfiguration = serverConfiguration;
Duration cacheTtl = serverConfiguration.get(AmoroManagementConf.CACHE_CATALOG_META_DURATION);
metaCache =
CacheBuilder.newBuilder()
.maximumSize(100)
.expireAfterWrite(cacheTtl)
.build(
new CacheLoader<String, Optional<CatalogMeta>>() {
@Override
public @NotNull Optional<CatalogMeta> load(@NotNull String key) throws Exception {
return Optional.ofNullable(
getAs(CatalogMetaMapper.class, mapper -> mapper.getCatalog(key)));
}
});

listCatalogMetas()
.forEach(
c -> {
ServerCatalog serverCatalog =
CatalogBuilder.buildServerCatalog(c, serverConfiguration);
serverCatalogMap.put(c.getCatalogName(), serverCatalog);
metaCache.put(c.getCatalogName(), Optional.of(c));
LOG.info("Load catalog {}, type:{}", c.getCatalogName(), c.getCatalogType());
});
LOG.info("DefaultCatalogManager initialized, total catalogs: {}", serverCatalogMap.size());
}

@Override
public List<CatalogMeta> listCatalogMetas() {
return getAs(CatalogMetaMapper.class, CatalogMetaMapper::getCatalogs);
return getAs(CatalogMetaMapper.class, CatalogMetaMapper::getCatalogs)
.stream()
.peek(c -> metaCache.put(c.getCatalogName(), Optional.of(c)))
.collect(Collectors.toList());
}

@Override
Expand All @@ -66,10 +93,11 @@ public CatalogMeta getCatalogMeta(String catalogName) {
}

private Optional<CatalogMeta> getCatalogMetaOptional(String catalogName) {
return Optional.ofNullable(
getAs(
CatalogMetaMapper.class,
catalogMetaMapper -> catalogMetaMapper.getCatalog(catalogName)));
try {
return metaCache.get(catalogName);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

@Override
Expand Down Expand Up @@ -151,6 +179,7 @@ public void dropCatalog(String catalogName) {
}
}
mapper.deleteCatalog(catalogName);
metaCache.invalidate(catalogName);
});

disposeCatalog(catalogName);
Expand All @@ -160,6 +189,8 @@ public void dropCatalog(String catalogName) {
public void updateCatalog(CatalogMeta catalogMeta) {
ServerCatalog catalog = getServerCatalog(catalogMeta.getCatalogName());
validateCatalogUpdate(catalog.getMetadata(), catalogMeta);

metaCache.invalidate(catalogMeta.getCatalogName());
catalog.updateMetadata(catalogMeta);
LOG.info("Update catalog metadata: {}", catalogMeta.getCatalogName());
}
Expand All @@ -178,9 +209,10 @@ private void disposeCatalog(String name) {
c.dispose();
return null;
});
metaCache.invalidate(name);
}

private boolean isInternal(CatalogMeta meta) {
private static boolean isInternal(CatalogMeta meta) {
return CatalogMetaProperties.CATALOG_TYPE_AMS.equalsIgnoreCase(meta.getCatalogType());
}
}
4 changes: 4 additions & 0 deletions dist/src/main/amoro-bin/conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ ams:
table-manifest-io:
thread-count: 20

cache:
catalog-meta:
duration: 60s

database:
type: derby
jdbc-driver-class: org.apache.derby.jdbc.EmbeddedDriver
Expand Down

0 comments on commit fc1d05a

Please sign in to comment.