[#415] feat(core, server, catalogs): Add property validation logic for catalog (#452)

### What changes were proposed in this pull request?

1. Add property validation logic for catalogs.
2. Move the property validation code for tables to `PropertiesMetadata`.
3. Fix some typos.

### Why are the changes needed?

Currently, we do not validate properties when creating or altering catalogs, which is risky and error-prone.


Fix: #415 

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

Add `TestPropertyMeta` and `TestPropertyValidator`
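
A minimal sketch of the two validation entry points this change wires into `CatalogManager` (the `PropertiesMetadata` method names and argument shapes are taken from the diff below; the import path, the wrapper class, and the behavior on invalid input are assumptions):

```java
import com.datastrato.graviton.catalog.PropertiesMetadata; // assumed package
import com.google.common.collect.ImmutableMap;
import java.util.Map;

class PropertyValidationSketch {
  // Hypothetical driver; 'metadata' would come from CatalogOperations#catalogPropertiesMetadata().
  static void validate(PropertiesMetadata metadata) {
    // Create path: all supplied properties are checked in one call.
    Map<String, String> createProps = ImmutableMap.of("key1", "value1");
    metadata.validatePropertyForCreate(createProps);

    // Alter path: CatalogChange objects are first split into upserts and deletes
    // (see getCatalogAlterProperty in the CatalogManager diff), then validated together.
    Map<String, String> upserts = ImmutableMap.of("key1", "value2");
    Map<String, String> deletes = ImmutableMap.of("key2", "key2");
    metadata.validatePropertyForAlter(upserts, deletes);
  }
}
```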
yuqi1129 authored Oct 8, 2023
1 parent 80d1661 commit e7843e2
Showing 21 changed files with 514 additions and 124 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
@@ -127,6 +127,7 @@ tasks.rat {
// Ignore files we track but do not distribute
"**/.github/**/*",
"dev/docker/**/*.xml",
"**/*.log",
)

// Add .gitignore excludes to the Apache Rat exclusion list.
@@ -32,6 +32,7 @@
import com.datastrato.graviton.rel.transforms.Transforms;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
@@ -742,4 +743,10 @@ private static String currentUser() {
public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException {
return tablePropertiesMetadata;
}

@Override
public PropertiesMetadata catalogPropertiesMetadata() throws UnsupportedOperationException {
// TODO(yuqi): We will implement this in next PR
return Maps::newHashMap;
}
}
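
A quick note on the `Maps::newHashMap` placeholder above: it compiles only if `PropertiesMetadata` is effectively a functional interface whose single abstract method is `propertyEntries()` returning `Map<String, PropertyEntry<?>>` — an assumption inferred from this diff. Under that assumption the stub simply advertises an empty set of catalog-specific properties:

```java
// Assumption: propertyEntries() is the sole abstract method of PropertiesMetadata,
// so a method reference to Maps.newHashMap() satisfies the interface.
PropertiesMetadata placeholder = Maps::newHashMap;
// placeholder.propertyEntries() -> a fresh, empty map on every call
```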
@@ -4,14 +4,14 @@
*/
package com.datastrato.graviton.catalog.hive;

import com.datastrato.graviton.catalog.BasePropertiesMetadata;
import com.datastrato.graviton.catalog.PropertyEntry;
import com.datastrato.graviton.catalog.TablePropertiesMetadata;
import com.google.common.collect.Maps;
import java.util.Map;

public class HiveTablePropertiesMetadata extends TablePropertiesMetadata {
public class HiveTablePropertiesMetadata extends BasePropertiesMetadata {
@Override
protected Map<String, PropertyEntry<?>> tablePropertyEntries() {
protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
// TODO(Minghuang): support Hive table property specs
return Maps.newHashMap();
}
@@ -29,6 +29,7 @@
import com.datastrato.graviton.rel.transforms.Transform;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
@@ -384,4 +385,9 @@ private static String currentUser() {
public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException {
return tablePropertiesMetadata;
}

@Override
public PropertiesMetadata catalogPropertiesMetadata() throws UnsupportedOperationException {
return Maps::newHashMap;
}
}
@@ -4,14 +4,14 @@
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg;

import com.datastrato.graviton.catalog.BasePropertiesMetadata;
import com.datastrato.graviton.catalog.PropertyEntry;
import com.datastrato.graviton.catalog.TablePropertiesMetadata;
import com.google.common.collect.Maps;
import java.util.Map;

public class IcebergTablePropertiesMetadata extends TablePropertiesMetadata {
public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata {
@Override
protected Map<String, PropertyEntry<?>> tablePropertyEntries() {
protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
// TODO: support Iceberg table property specs
return Maps.newHashMap();
}
2 changes: 1 addition & 1 deletion core/src/main/java/com/datastrato/graviton/Entity.java
@@ -22,7 +22,7 @@ enum EntityType {

AUDIT("au", 65534);

// Short name can be used to identify the entity type in the logs, peristent storage, etc.
// Short name can be used to identify the entity type in the logs, persistent storage, etc.
private final String shortName;
private final int index;

4 changes: 2 additions & 2 deletions core/src/main/java/com/datastrato/graviton/EntityStore.java
@@ -106,7 +106,7 @@ <E extends Entity & HasIdentifier> void put(E e, boolean overwritten)
* @return E the updated entity
* @throws IOException if the store operation fails
* @throws NoSuchEntityException if the entity does not exist
* @throws AlreadyExistsException if the updated entity already exitsed.
* @throws AlreadyExistsException if the updated entity already existed.
*/
<E extends Entity & HasIdentifier> E update(
NameIdentifier ident, Class<E> type, EntityType entityType, Function<E, E> updater)
@@ -143,7 +143,7 @@ default boolean delete(NameIdentifier ident, EntityType entityType) throws IOExc
*
* @param ident the name identifier of the entity
* @param entityType the type of the entity to be deleted
* @param cascade support cacade detele or not
* @param cascade support cascade delete or not
* @return true if the entity exists and is deleted successfully, false otherwise
* @throws IOException if the delete operation fails
*/
@@ -9,6 +9,7 @@
import com.datastrato.graviton.CatalogProvider;
import com.datastrato.graviton.meta.CatalogEntity;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.Map;

/**
@@ -32,6 +33,7 @@ public abstract class BaseCatalog<T extends BaseCatalog>

private volatile CatalogOperations ops;

private volatile Map<String, String> properties;
/**
* Creates a new instance of CatalogOperations.
*
@@ -45,6 +47,11 @@ public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationE
return ops().tablePropertiesMetadata();
}

@Override
public PropertiesMetadata catalogPropertiesMetadata() throws UnsupportedOperationException {
return ops().catalogPropertiesMetadata();
}

/**
* Retrieves the CatalogOperations instance associated with this catalog. Lazily initializes the
* instance if not already created.
@@ -123,8 +130,19 @@ public String comment() {

@Override
public Map<String, String> properties() {
Preconditions.checkArgument(entity != null, "entity is not set");
return entity.getProperties();
if (properties == null) {
synchronized (this) {
if (properties == null) {
Preconditions.checkArgument(entity != null, "entity is not set");
properties = Maps.newHashMap(entity.getProperties());
properties
.entrySet()
.removeIf(
entry -> ops().catalogPropertiesMetadata().isHiddenProperty(entry.getKey()));
}
}
}
return properties;
}

@Override
@@ -13,22 +13,22 @@
import java.util.List;
import java.util.Map;

public abstract class TablePropertiesMetadata implements PropertiesMetadata {
public abstract class BasePropertiesMetadata implements PropertiesMetadata {

private static final Map<String, PropertyEntry<?>> BASIC_TABLE_PROPERTY_ENTRIES;
private static final Map<String, PropertyEntry<?>> BASIC_PROPERTY_ENTRIES;

private Map<String, PropertyEntry<?>> propertyEntries;
private volatile Map<String, PropertyEntry<?>> propertyEntries;

static {
List<PropertyEntry<?>> basicTablePropertyEntries =
// basicPropertyEntries is shared by all entities
List<PropertyEntry<?>> basicPropertyEntries =
ImmutableList.of(
PropertyEntry.stringReservedPropertyEntry(
ID_KEY,
"To differentiate the entities created directly by the underlying sources",
true));

BASIC_TABLE_PROPERTY_ENTRIES =
Maps.uniqueIndex(basicTablePropertyEntries, PropertyEntry::getName);
BASIC_PROPERTY_ENTRIES = Maps.uniqueIndex(basicPropertyEntries, PropertyEntry::getName);
}

@Override
@@ -37,14 +37,13 @@ public Map<String, PropertyEntry<?>> propertyEntries() {
synchronized (this) {
if (propertyEntries == null) {
ImmutableMap.Builder<String, PropertyEntry<?>> builder = ImmutableMap.builder();
Map<String, PropertyEntry<?>> catalogTableProperty = tablePropertyEntries();
builder.putAll(catalogTableProperty);
Map<String, PropertyEntry<?>> properties = specificPropertyEntries();
builder.putAll(properties);

BASIC_TABLE_PROPERTY_ENTRIES.forEach(
BASIC_PROPERTY_ENTRIES.forEach(
(name, entry) -> {
Preconditions.checkArgument(
!catalogTableProperty.containsKey(name),
"Property metadata already exists: " + name);
!properties.containsKey(name), "Property metadata already exists: " + name);
builder.put(name, entry);
});

@@ -55,5 +54,5 @@ public Map<String, PropertyEntry<?>> propertyEntries() {
return propertyEntries;
}

protected abstract Map<String, PropertyEntry<?>> tablePropertyEntries();
protected abstract Map<String, PropertyEntry<?>> specificPropertyEntries();
}
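
To make the merge above concrete, a hypothetical subclass (illustration only, not part of this PR): it contributes no specific entries, so the map exposed by `propertyEntries()` holds just the shared basic entries such as `ID_KEY`; a subclass whose specific entries reused a basic name would fail the `Preconditions` check.

```java
import com.datastrato.graviton.catalog.BasePropertiesMetadata;
import com.datastrato.graviton.catalog.PropertyEntry;
import com.google.common.collect.Maps;
import java.util.Map;

// Hypothetical example; mirrors the pattern of HiveTablePropertiesMetadata above.
public class MyCatalogPropertiesMetadata extends BasePropertiesMetadata {
  @Override
  protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
    return Maps.newHashMap(); // no catalog-specific entries in this sketch
  }
}
```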
@@ -6,6 +6,8 @@

import com.datastrato.graviton.Catalog;
import com.datastrato.graviton.CatalogChange;
import com.datastrato.graviton.CatalogChange.RemoveProperty;
import com.datastrato.graviton.CatalogChange.SetProperty;
import com.datastrato.graviton.CatalogProvider;
import com.datastrato.graviton.Config;
import com.datastrato.graviton.Configs;
@@ -40,6 +42,7 @@
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -49,6 +52,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -210,7 +214,8 @@ public NameIdentifier[] listCatalogs(Namespace namespace) throws NoSuchMetalakeE
*/
@Override
public Catalog loadCatalog(NameIdentifier ident) throws NoSuchCatalogException {
return loadCatalogAndWrap(ident).catalog;
CatalogWrapper wrapper = loadCatalogInternal(ident);
return wrapper.catalog;
}

/**
@@ -266,18 +271,44 @@ public Catalog createCatalog(
store.put(e, false /* overwrite */);
return e;
});
return catalogCache.get(ident, id -> createCatalogWrapper(entity)).catalog;

} catch (EntityAlreadyExistsException ee) {
LOG.warn("Catalog {} already exists", ident, ee);
CatalogWrapper wrapper = catalogCache.get(ident, id -> createCatalogWrapper(entity));
wrapper.doWithPropertiesMeta(
f -> {
f.catalogPropertiesMetadata().validatePropertyForCreate(properties);
return null;
});
return wrapper.catalog;
} catch (EntityAlreadyExistsException e1) {
LOG.warn("Catalog {} already exists", ident, e1);
throw new CatalogAlreadyExistsException("Catalog " + ident + " already exists");

} catch (IOException ioe) {
LOG.error("Failed to create catalog {}", ident, ioe);
throw new RuntimeException(ioe);
} catch (IllegalArgumentException | NoSuchMetalakeException e2) {
throw e2;
} catch (Exception e3) {
LOG.error("Failed to create catalog {}", ident, e3);
throw new RuntimeException(e3);
}
}

private Pair<Map<String, String>, Map<String, String>> getCatalogAlterProperty(
CatalogChange... catalogChanges) {
Map<String, String> upserts = Maps.newHashMap();
Map<String, String> deletes = Maps.newHashMap();

Arrays.stream(catalogChanges)
.forEach(
tableChange -> {
if (tableChange instanceof SetProperty) {
SetProperty setProperty = (SetProperty) tableChange;
upserts.put(setProperty.getProperty(), setProperty.getValue());
} else if (tableChange instanceof RemoveProperty) {
RemoveProperty removeProperty = (RemoveProperty) tableChange;
deletes.put(removeProperty.getProperty(), removeProperty.getProperty());
}
});

return Pair.of(upserts, deletes);
}
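
A hypothetical call into the helper above (the `CatalogChange.setProperty`/`removeProperty` factory methods are assumptions; only the `SetProperty` and `RemoveProperty` types appear in this diff). Note that `deletes` maps each removed property name to itself, so only its key set matters to the validator:

```java
// Illustration only; factory method names are assumed.
Pair<Map<String, String>, Map<String, String>> split =
    getCatalogAlterProperty(
        CatalogChange.setProperty("key1", "value1"),
        CatalogChange.removeProperty("key2"));
// split.getLeft()  -> {key1=value1}   (upserts)
// split.getRight() -> {key2=key2}     (deletes)
```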

/**
* Alters an existing catalog with the specified changes.
*
@@ -292,8 +323,29 @@ public Catalog alterCatalog(NameIdentifier ident, CatalogChange... changes)
throws NoSuchCatalogException, IllegalArgumentException {
// There could be a race issue that someone is using the catalog from cache while we are
// updating it.
catalogCache.invalidate(ident);

CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
if (catalogWrapper == null) {
throw new NoSuchCatalogException("Catalog " + ident + " does not exist");
}

try {
catalogWrapper.doWithPropertiesMeta(
f -> {
Pair<Map<String, String>, Map<String, String>> alterProperty =
getCatalogAlterProperty(changes);
f.catalogPropertiesMetadata()
.validatePropertyForAlter(alterProperty.getLeft(), alterProperty.getRight());
return null;
});
} catch (IllegalArgumentException e1) {
throw e1;
} catch (Exception e) {
LOG.error("Failed to alter catalog {}", ident, e);
throw new RuntimeException(e);
}

catalogCache.invalidate(ident);
try {
CatalogEntity updatedCatalog =
store.update(
@@ -434,7 +486,24 @@ private CatalogWrapper createCatalogWrapper(CatalogEntity entity) {

// Initialize the catalog
catalog = catalog.withCatalogEntity(entity).withCatalogConf(mergedConf);
return new CatalogWrapper(catalog, classLoader);
CatalogWrapper wrapper = new CatalogWrapper(catalog, classLoader);

// Call wrapper.catalog.properties() here so that BaseCatalog#properties is populated inside the
// IsolatedClassLoader. properties() has to run in the IsolatedClassLoader because it loads the
// concrete catalog class (e.g., HiveCatalog). Preloading the value this way lets the
// AppClassLoader read the properties later without touching the isolated classes.
try {
wrapper.doWithPropertiesMeta(
f -> {
wrapper.catalog.properties();
return null;
});
} catch (Exception e) {
LOG.error("Failed to load catalog '{}' properties", entity.name(), e);
throw new RuntimeException(e);
}
return wrapper;
}

static Map<String, String> mergeConf(Map<String, String> properties, Map<String, String> conf) {