Skip to content

Commit

Permalink
[#575] feat(jdbc): Support for DataSource and schema operations in JDBC catalog.
Browse files Browse the repository at this point in the history
  • Loading branch information
Clearvive authored and Clearvive committed Nov 9, 2023
1 parent b1e19c3 commit 22656a0
Show file tree
Hide file tree
Showing 17 changed files with 847 additions and 11 deletions.
8 changes: 7 additions & 1 deletion catalogs/catalog-jdbc-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ dependencies {
implementation(libs.jackson.datatype.jdk8)
implementation(libs.jackson.datatype.jsr310)
implementation(libs.guava)
implementation(libs.bundles.log4j)
implementation(libs.commons.lang3)
implementation(libs.bundles.log4j)
implementation(libs.commons.collections4)
implementation(libs.substrait.java.core) {
exclude("com.fasterxml.jackson.core")
Expand All @@ -31,7 +31,13 @@ dependencies {
exclude("com.google.code.findbugs")
exclude("org.slf4j")
}
implementation(libs.bundles.datasource)

testImplementation(libs.commons.io)
testImplementation(libs.sqlite.jdbc)
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
testRuntimeOnly(libs.junit.jupiter.engine)
compileOnly(libs.lombok)
annotationProcessor(libs.lombok)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import com.datastrato.gravitino.catalog.BaseCatalog;
import com.datastrato.gravitino.catalog.CatalogOperations;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
import com.datastrato.gravitino.rel.SupportsSchemas;
import com.datastrato.gravitino.rel.TableCatalog;
import java.util.Map;
Expand All @@ -21,7 +23,8 @@ public abstract class JdbcCatalog extends BaseCatalog<JdbcCatalog> {
*/
/**
 * Creates the catalog operations, wiring in the dialect-specific exception converter and SQL
 * type converter supplied by the concrete catalog subclass.
 *
 * @param config The configuration map used to initialize the operations.
 * @return The initialized {@link JdbcCatalogOperations} instance.
 */
@Override
protected CatalogOperations newOps(Map<String, String> config) {
  // Reconstructed post-commit body: the flattened diff had left the superseded
  // one-argument construction in place alongside this one, declaring `ops` twice.
  JdbcCatalogOperations ops =
      new JdbcCatalogOperations(entity(), createExceptionConverter(), createJdbcTypeConverter());
  ops.initialize(config);
  return ops;
}
Expand All @@ -37,4 +40,12 @@ public SupportsSchemas asSchemas() {
public TableCatalog asTableCatalog() {
return (JdbcCatalogOperations) ops();
}

/**
 * Supplies the exception converter used by this catalog's operations.
 *
 * @return The {@link JdbcExceptionConverter} to be used by the catalog.
 */
protected JdbcExceptionConverter createExceptionConverter() {
  // Default: an anonymous subclass that inherits the base conversion behavior unchanged;
  // dialect-specific catalogs may override this factory method.
  JdbcExceptionConverter defaultConverter = new JdbcExceptionConverter() {};
  return defaultConverter;
}

/**
 * Supplies the dialect-specific SQL type converter; each concrete JDBC catalog must implement
 * this to map between Gravitino types and its database's column types.
 *
 * @return The {@link JdbcTypeConverter} to be used by the catalog.
 */
protected abstract JdbcTypeConverter createJdbcTypeConverter();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,24 @@
*/
package com.datastrato.gravitino.catalog.jdbc;

import static com.datastrato.gravitino.catalog.BaseCatalog.CATALOG_BYPASS_PREFIX;

import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.catalog.CatalogOperations;
import com.datastrato.gravitino.catalog.PropertiesMetadata;
import com.datastrato.gravitino.catalog.jdbc.config.JdbcConfig;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
import com.datastrato.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
import com.datastrato.gravitino.catalog.jdbc.utils.DataSourceUtils;
import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.NoSuchTableException;
import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import com.datastrato.gravitino.exceptions.TableAlreadyExistsException;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.meta.CatalogEntity;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.Distribution;
Expand All @@ -24,6 +32,9 @@
import com.datastrato.gravitino.rel.TableCatalog;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.transforms.Transform;
import com.datastrato.gravitino.utils.MapUtils;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,13 +52,26 @@ public class JdbcCatalogOperations implements CatalogOperations, SupportsSchemas

private final CatalogEntity entity;

private final JdbcExceptionConverter exceptionConverter;

private final JdbcTypeConverter jdbcTypeConverter;

private JdbcDatabaseOperations jdbcDatabaseOperations;

/**
 * Constructs a new instance of JdbcCatalogOperations.
 *
 * @param entity The catalog entity associated with this operations instance.
 * @param exceptionConverter The exception converter to be used by the operations.
 * @param jdbcTypeConverter The type converter to be used by the operations.
 */
public JdbcCatalogOperations(
    CatalogEntity entity,
    JdbcExceptionConverter exceptionConverter,
    JdbcTypeConverter jdbcTypeConverter) {
  // Reconstructed post-commit constructor: the flattened diff had fused the superseded
  // one-argument constructor header into this signature, producing invalid Java.
  this.entity = entity;
  this.exceptionConverter = exceptionConverter;
  this.jdbcTypeConverter = jdbcTypeConverter;
}

/**
 * Initializes the JDBC catalog operations, building the backing data source from the catalog
 * configuration.
 *
 * @param conf The configuration of this JDBC catalog operations.
 * @throws RuntimeException If initialization fails (as declared by the interface).
 */
@Override
public void initialize(Map<String, String> conf) throws RuntimeException {
  // Keys of the form "gravitino.bypass.a.b" are forwarded to the JDBC layer as-is.
  Map<String, String> prefixMap = MapUtils.getPrefixMap(conf, CATALOG_BYPASS_PREFIX);
  this.jdbcCatalogPropertiesMetadata = new JdbcCatalogPropertiesMetadata();
  // Keep only the keys declared in JdbcCatalogPropertiesMetadata.GRAVITINO_CONFIG_TO_JDBC.
  // (The previous comment referred to GRAVITINO_CONFIG_TO_ICEBERG — a copy-paste slip from
  // the Iceberg catalog; this class uses GRAVITINO_CONFIG_TO_JDBC.)
  Map<String, String> gravitinoConfig =
      this.jdbcCatalogPropertiesMetadata.transformProperties(conf);

  // Declared Gravitino keys take precedence over bypass keys on conflict.
  Map<String, String> resultConf = Maps.newHashMap(prefixMap);
  resultConf.putAll(gravitinoConfig);

  JdbcConfig jdbcConfig = new JdbcConfig(resultConf);
  this.jdbcDatabaseOperations =
      new JdbcDatabaseOperations(DataSourceUtils.createDataSource(jdbcConfig), exceptionConverter);
  this.jdbcTablePropertiesMetadata = new JdbcTablePropertiesMetadata();
  this.jdbcSchemaPropertiesMetadata = new JdbcSchemaPropertiesMetadata();
}
Expand All @@ -76,7 +113,10 @@ public void close() {}
*/
/**
 * Lists the schemas (databases) visible through the JDBC connection.
 *
 * @param namespace The namespace under which the schema identifiers are created.
 * @return One {@link NameIdentifier} per database reported by the underlying operations.
 * @throws NoSuchCatalogException If the catalog does not exist.
 */
@Override
public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException {
  // Reconstructed post-commit body: the flattened diff had kept the superseded
  // "throw new UnsupportedOperationException();" line, making this body unreachable.
  List<String> schemaNames = jdbcDatabaseOperations.list();
  return schemaNames.stream()
      .map(db -> NameIdentifier.of(namespace, db))
      .toArray(NameIdentifier[]::new);
}

/**
Expand All @@ -93,7 +133,8 @@ public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogExc
public JdbcSchema createSchema(
NameIdentifier ident, String comment, Map<String, String> properties)
throws NoSuchCatalogException, SchemaAlreadyExistsException {
throw new UnsupportedOperationException();
jdbcDatabaseOperations.create(ident.name());
return new JdbcSchema.Builder().withName(ident.name()).withAuditInfo(AuditInfo.EMPTY).build();
}

/**
 * Loads the schema with the given identifier.
 *
 * @param ident The identifier of the schema to load.
 * @return A {@link JdbcSchema} carrying the schema name (no further metadata is loaded here).
 * @throws NoSuchSchemaException If no database with that name exists.
 */
@Override
public JdbcSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
  // Existence check first so callers get the documented exception rather than a JDBC error.
  if (!jdbcDatabaseOperations.exist(ident.name())) {
    throw new NoSuchSchemaException(ident.name());
  }
  return new JdbcSchema.Builder().withName(ident.name()).withAuditInfo(AuditInfo.EMPTY).build();
}

/**
Expand All @@ -119,7 +163,7 @@ public JdbcSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException
@Override
public JdbcSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
throws NoSuchSchemaException {
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("jdbc-catalog does not support alter the schema");
}

/**
 * Drops the schema (database) with the given identifier.
 *
 * @param ident The identifier of the schema to drop.
 * @param cascade Whether contained tables should be dropped as well.
 * @return Always {@code true}.
 * @throws NonEmptySchemaException If the schema is not empty and cascade is false.
 */
@Override
public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException {
  // NOTE(review): cascade is ignored and success is reported unconditionally — confirm that
  // jdbcDatabaseOperations.delete rejects non-empty schemas when cascade is false.
  jdbcDatabaseOperations.delete(ident.name());
  return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,88 @@
*/
package com.datastrato.gravitino.catalog.jdbc;

import static com.datastrato.gravitino.catalog.PropertyEntry.booleanReservedPropertyEntry;
import static com.datastrato.gravitino.catalog.PropertyEntry.integerPropertyEntry;
import static com.datastrato.gravitino.catalog.PropertyEntry.stringReservedPropertyEntry;

import com.datastrato.gravitino.catalog.BaseCatalogPropertiesMetadata;
import com.datastrato.gravitino.catalog.PropertyEntry;
import java.util.Collections;
import com.datastrato.gravitino.catalog.jdbc.config.JdbcConfig;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;

/**
 * Catalog-level properties metadata for the JDBC catalog. Declares the JDBC connection and
 * connection-pool property entries and maps Gravitino config keys onto JDBC config keys.
 */
public class JdbcCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata {
  private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;

  // Currently an identity mapping; kept as an explicit map so individual keys could be
  // renamed between the Gravitino and JDBC layers without touching callers.
  public static final Map<String, String> GRAVITINO_CONFIG_TO_JDBC =
      ImmutableMap.<String, String>builder()
          .put(JdbcConfig.JDBC_URL.getKey(), JdbcConfig.JDBC_URL.getKey())
          .put(JdbcConfig.DRIVER.getKey(), JdbcConfig.DRIVER.getKey())
          .put(JdbcConfig.USERNAME.getKey(), JdbcConfig.USERNAME.getKey())
          .put(JdbcConfig.PASSWORD.getKey(), JdbcConfig.PASSWORD.getKey())
          .put(JdbcConfig.POOL_TYPE.getKey(), JdbcConfig.POOL_TYPE.getKey())
          .put(JdbcConfig.POOL_PREFIX_NAME.getKey(), JdbcConfig.POOL_PREFIX_NAME.getKey())
          .put(JdbcConfig.POOL_TEST_QUERY.getKey(), JdbcConfig.POOL_TEST_QUERY.getKey())
          .put(JdbcConfig.POOL_MIN_SIZE.getKey(), JdbcConfig.POOL_MIN_SIZE.getKey())
          .put(JdbcConfig.POOL_MAX_SIZE.getKey(), JdbcConfig.POOL_MAX_SIZE.getKey())
          .put(JdbcConfig.POOL_IDLE_TEST.getKey(), JdbcConfig.POOL_IDLE_TEST.getKey())
          .build(); // was ".build();;" — stray second semicolon removed

  static {
    // NOTE(review): DRIVER appears in GRAVITINO_CONFIG_TO_JDBC but has no PropertyEntry
    // below — confirm whether it should be declared here as well.
    List<PropertyEntry<?>> propertyEntries =
        ImmutableList.of(
            stringReservedPropertyEntry(
                JdbcConfig.JDBC_URL.getKey(), JdbcConfig.JDBC_URL.getDoc(), true),
            stringReservedPropertyEntry(
                JdbcConfig.USERNAME.getKey(), JdbcConfig.USERNAME.getDoc(), true),
            stringReservedPropertyEntry(
                JdbcConfig.PASSWORD.getKey(), JdbcConfig.PASSWORD.getDoc(), true),
            stringReservedPropertyEntry(
                JdbcConfig.POOL_TYPE.getKey(), JdbcConfig.POOL_TYPE.getDoc(), true),
            stringReservedPropertyEntry(
                JdbcConfig.POOL_PREFIX_NAME.getKey(), JdbcConfig.POOL_PREFIX_NAME.getDoc(), true),
            stringReservedPropertyEntry(
                JdbcConfig.POOL_TEST_QUERY.getKey(), JdbcConfig.POOL_TEST_QUERY.getDoc(), true),
            integerPropertyEntry(
                JdbcConfig.POOL_MIN_SIZE.getKey(),
                JdbcConfig.POOL_MIN_SIZE.getDoc(),
                false,
                false,
                JdbcConfig.POOL_MIN_SIZE.getDefaultValue(),
                true,
                true),
            integerPropertyEntry(
                JdbcConfig.POOL_MAX_SIZE.getKey(),
                JdbcConfig.POOL_MAX_SIZE.getDoc(),
                false,
                false,
                JdbcConfig.POOL_MAX_SIZE.getDefaultValue(),
                true,
                true),
            booleanReservedPropertyEntry(
                JdbcConfig.POOL_IDLE_TEST.getKey(),
                JdbcConfig.POOL_IDLE_TEST.getDoc(),
                JdbcConfig.POOL_IDLE_TEST.getDefaultValue(),
                true));
    PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName);
  }

  @Override
  protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
    // Reconstructed post-commit body: the flattened diff had kept the superseded
    // "return Collections.emptyMap();" line ahead of this return.
    return PROPERTIES_METADATA;
  }

  /**
   * Keeps only the entries of {@code conf} whose keys appear in
   * {@link #GRAVITINO_CONFIG_TO_JDBC}, renaming each key according to that mapping.
   *
   * @param conf The raw catalog configuration.
   * @return A new mutable map containing only the JDBC-relevant entries.
   */
  public Map<String, String> transformProperties(Map<String, String> conf) {
    Map<String, String> gravitinoConfig = Maps.newHashMap();
    conf.forEach(
        (key, value) -> {
          if (GRAVITINO_CONFIG_TO_JDBC.containsKey(key)) {
            gravitinoConfig.put(GRAVITINO_CONFIG_TO_JDBC.get(key), value);
          }
        });
    return gravitinoConfig;
  }
}
Loading

0 comments on commit 22656a0

Please sign in to comment.