Skip to content

Commit

Permalink
[#575] feat(jdbc): Support for DataSource and schema operations in JD…
Browse files Browse the repository at this point in the history
…BC catalog. (#703)

### What changes were proposed in this pull request?

We need to add management functionalities for data sources and
operations on schemas in jdbc-common.

### Why are the changes needed?

Fix: #575

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

---------

Co-authored-by: Clearvive <[email protected]>
  • Loading branch information
Clearvive and Clearvive authored Nov 20, 2023
1 parent d3d5aeb commit c4177bb
Show file tree
Hide file tree
Showing 19 changed files with 778 additions and 11 deletions.
8 changes: 7 additions & 1 deletion catalogs/catalog-jdbc-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies {
implementation(libs.bundles.log4j)
implementation(libs.commons.lang3)
implementation(libs.commons.collections4)
implementation(libs.commons.dbcp2)
implementation(libs.substrait.java.core) {
exclude("com.fasterxml.jackson.core")
exclude("com.fasterxml.jackson.datatype")
Expand All @@ -32,6 +33,11 @@ dependencies {
exclude("org.slf4j")
}

testImplementation(libs.commons.io)
testImplementation(libs.sqlite.jdbc)
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
testRuntimeOnly(libs.junit.jupiter.engine)
compileOnly(libs.lombok)
annotationProcessor(libs.lombok)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

import com.datastrato.gravitino.catalog.BaseCatalog;
import com.datastrato.gravitino.catalog.CatalogOperations;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
import com.datastrato.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
import com.datastrato.gravitino.rel.SupportsSchemas;
import com.datastrato.gravitino.rel.TableCatalog;
import java.util.Map;
Expand All @@ -21,7 +24,12 @@ public abstract class JdbcCatalog extends BaseCatalog<JdbcCatalog> {
*/
@Override
protected CatalogOperations newOps(Map<String, String> config) {
JdbcCatalogOperations ops = new JdbcCatalogOperations(entity());
JdbcCatalogOperations ops =
new JdbcCatalogOperations(
entity(),
createExceptionConverter(),
createJdbcTypeConverter(),
createJdbcDatabaseOperations());
ops.initialize(config);
return ops;
}
Expand All @@ -37,4 +45,17 @@ public SupportsSchemas asSchemas() {
public TableCatalog asTableCatalog() {
return (JdbcCatalogOperations) ops();
}

/** @return The {@link JdbcExceptionConverter} to be used by the catalog. */
protected JdbcExceptionConverter createExceptionConverter() {
return new JdbcExceptionConverter() {};
}

/** @return The {@link JdbcTypeConverter} to be used by the catalog. */
protected abstract JdbcTypeConverter createJdbcTypeConverter();

/**
* @return The {@link JdbcDatabaseOperations} to be used by the catalog to manage databases in the
*/
protected abstract JdbcDatabaseOperations createJdbcDatabaseOperations();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,25 @@
*/
package com.datastrato.gravitino.catalog.jdbc;

import static com.datastrato.gravitino.catalog.BaseCatalog.CATALOG_BYPASS_PREFIX;

import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.StringIdentifier;
import com.datastrato.gravitino.catalog.CatalogOperations;
import com.datastrato.gravitino.catalog.PropertiesMetadata;
import com.datastrato.gravitino.catalog.jdbc.config.JdbcConfig;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
import com.datastrato.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
import com.datastrato.gravitino.catalog.jdbc.utils.DataSourceUtils;
import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.NoSuchTableException;
import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import com.datastrato.gravitino.exceptions.TableAlreadyExistsException;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.meta.CatalogEntity;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.SchemaChange;
Expand All @@ -24,7 +33,15 @@
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.utils.MapUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,13 +58,31 @@ public class JdbcCatalogOperations implements CatalogOperations, SupportsSchemas

private final CatalogEntity entity;

private final JdbcExceptionConverter exceptionConverter;

private final JdbcTypeConverter jdbcTypeConverter;

private final JdbcDatabaseOperations jdbcDatabaseOperations;

private DataSource dataSource;

/**
* Constructs a new instance of JdbcCatalogOperations.
*
* @param entity The catalog entity associated with this operations instance.
* @param exceptionConverter The exception converter to be used by the operations.
* @param jdbcTypeConverter The type converter to be used by the operations.
* @param jdbcDatabaseOperations The database operations to be used by the operations.
*/
public JdbcCatalogOperations(CatalogEntity entity) {
public JdbcCatalogOperations(
CatalogEntity entity,
JdbcExceptionConverter exceptionConverter,
JdbcTypeConverter jdbcTypeConverter,
JdbcDatabaseOperations jdbcDatabaseOperations) {
this.entity = entity;
this.exceptionConverter = exceptionConverter;
this.jdbcTypeConverter = jdbcTypeConverter;
this.jdbcDatabaseOperations = jdbcDatabaseOperations;
}

/**
Expand All @@ -58,14 +93,22 @@ public JdbcCatalogOperations(CatalogEntity entity) {
*/
@Override
public void initialize(Map<String, String> conf) throws RuntimeException {
// Key format like gravitino.bypass.a.b
Map<String, String> resultConf =
Maps.newHashMap(MapUtils.getPrefixMap(conf, CATALOG_BYPASS_PREFIX));
JdbcConfig jdbcConfig = new JdbcConfig(resultConf);
this.dataSource = DataSourceUtils.createDataSource(jdbcConfig);
this.jdbcDatabaseOperations.initialize(dataSource, exceptionConverter);
this.jdbcCatalogPropertiesMetadata = new JdbcCatalogPropertiesMetadata();
this.jdbcTablePropertiesMetadata = new JdbcTablePropertiesMetadata();
this.jdbcSchemaPropertiesMetadata = new JdbcSchemaPropertiesMetadata();
}

/** Closes the Jdbc catalog and releases the associated client pool. */
@Override
public void close() {}
public void close() {
DataSourceUtils.closeDataSource(dataSource);
}

/**
* Lists the schemas under the given namespace.
Expand All @@ -76,7 +119,10 @@ public void close() {}
*/
@Override
public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException {
throw new UnsupportedOperationException();
List<String> schemaNames = jdbcDatabaseOperations.list();
return schemaNames.stream()
.map(db -> NameIdentifier.of(namespace, db))
.toArray(NameIdentifier[]::new);
}

/**
Expand All @@ -93,7 +139,27 @@ public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogExc
public JdbcSchema createSchema(
NameIdentifier ident, String comment, Map<String, String> properties)
throws NoSuchCatalogException, SchemaAlreadyExistsException {
throw new UnsupportedOperationException();
StringIdentifier identifier =
Preconditions.checkNotNull(
StringIdentifier.fromProperties(properties),
"The gravitino id attribute does not exist in properties");
String notAllowedKey =
properties.keySet().stream()
.filter(s -> !StringUtils.equals(s, StringIdentifier.ID_KEY))
.collect(Collectors.joining(","));
if (StringUtils.isNotEmpty(notAllowedKey)) {
LOG.warn("The properties [{}] are not allowed to be set in the jdbc schema", notAllowedKey);
}
HashMap<String, String> resultProperties = Maps.newHashMap(properties);
resultProperties.remove(StringIdentifier.ID_KEY);
jdbcDatabaseOperations.create(
ident.name(), StringIdentifier.addToComment(identifier, comment), resultProperties);
return new JdbcSchema.Builder()
.withName(ident.name())
.withProperties(resultProperties)
.withComment(comment)
.withAuditInfo(AuditInfo.EMPTY)
.build();
}

/**
Expand All @@ -105,7 +171,23 @@ public JdbcSchema createSchema(
*/
@Override
public JdbcSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
throw new UnsupportedOperationException();
JdbcSchema load = jdbcDatabaseOperations.load(ident.name());
String comment = load.comment();
StringIdentifier id = StringIdentifier.fromComment(comment);
if (id == null) {
LOG.warn("The comment {} does not contain gravitino id attribute", comment);
return load;
} else {
Map<String, String> properties =
load.properties() == null ? Maps.newHashMap() : Maps.newHashMap(load.properties());
StringIdentifier.addToProperties(id, properties);
return new JdbcSchema.Builder()
.withAuditInfo(load.auditInfo())
.withName(load.name())
.withComment(load.comment())
.withProperties(properties)
.build();
}
}

/**
Expand All @@ -119,7 +201,7 @@ public JdbcSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException
@Override
public JdbcSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
throws NoSuchSchemaException {
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("jdbc-catalog does not support alter the schema");
}

/**
Expand All @@ -132,7 +214,8 @@ public JdbcSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
*/
@Override
public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException {
throw new UnsupportedOperationException();
jdbcDatabaseOperations.delete(ident.name(), cascade);
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,50 @@
*/
package com.datastrato.gravitino.catalog.jdbc;

import static com.datastrato.gravitino.catalog.PropertyEntry.integerPropertyEntry;
import static com.datastrato.gravitino.catalog.PropertyEntry.stringReservedPropertyEntry;

import com.datastrato.gravitino.catalog.BaseCatalogPropertiesMetadata;
import com.datastrato.gravitino.catalog.PropertyEntry;
import java.util.Collections;
import com.datastrato.gravitino.catalog.jdbc.config.JdbcConfig;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;

public class JdbcCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata {
private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;

static {
List<PropertyEntry<?>> propertyEntries =
ImmutableList.of(
stringReservedPropertyEntry(
JdbcConfig.JDBC_URL.getKey(), JdbcConfig.JDBC_URL.getDoc(), true),
stringReservedPropertyEntry(
JdbcConfig.USERNAME.getKey(), JdbcConfig.USERNAME.getDoc(), true),
stringReservedPropertyEntry(
JdbcConfig.PASSWORD.getKey(), JdbcConfig.PASSWORD.getDoc(), true),
integerPropertyEntry(
JdbcConfig.POOL_MIN_SIZE.getKey(),
JdbcConfig.POOL_MIN_SIZE.getDoc(),
false,
false,
JdbcConfig.POOL_MIN_SIZE.getDefaultValue(),
true,
true),
integerPropertyEntry(
JdbcConfig.POOL_MAX_SIZE.getKey(),
JdbcConfig.POOL_MAX_SIZE.getDoc(),
false,
false,
JdbcConfig.POOL_MAX_SIZE.getDefaultValue(),
true,
true));
PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName);
}

@Override
protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
return Collections.emptyMap();
return PROPERTIES_METADATA;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.catalog.jdbc.config;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.config.ConfigBuilder;
import com.datastrato.gravitino.config.ConfigEntry;
import java.util.Map;

public class JdbcConfig extends Config {

public static final ConfigEntry<String> JDBC_URL =
new ConfigBuilder("jdbc-url")
.doc("The url of the Jdbc connection")
.version("0.3.0")
.stringConf()
.createWithDefault(null);

public static final ConfigEntry<String> USERNAME =
new ConfigBuilder("jdbc-user")
.doc("The username of the Jdbc connection")
.version("0.3.0")
.stringConf()
.createWithDefault(null);

public static final ConfigEntry<String> PASSWORD =
new ConfigBuilder("jdbc-password")
.doc("The password of the Jdbc connection")
.version("0.3.0")
.stringConf()
.createWithDefault(null);

public static final ConfigEntry<Integer> POOL_MIN_SIZE =
new ConfigBuilder("jdbc.pool.min-size")
.doc("The minimum number of connections in the pool")
.version("0.3.0")
.intConf()
.createWithDefault(2);

public static final ConfigEntry<Integer> POOL_MAX_SIZE =
new ConfigBuilder("jdbc.pool.max-size")
.doc("The maximum number of connections in the pool")
.version("0.3.0")
.intConf()
.createWithDefault(10);

public String getJdbcUrl() {
return get(JDBC_URL);
}

public String getUsername() {
return get(USERNAME);
}

public String getPassword() {
return get(PASSWORD);
}

public int getPoolMinSize() {
return get(POOL_MIN_SIZE);
}

public int getPoolMaxSize() {
return get(POOL_MAX_SIZE);
}

public JdbcConfig(Map<String, String> properties) {
super(false);
loadFromMap(properties, k -> true);
assert null != getJdbcUrl();
}
}
Loading

0 comments on commit c4177bb

Please sign in to comment.