From ffb80e76e916046ec9e1cb7d6f84cc02c522887c Mon Sep 17 00:00:00 2001 From: mchades Date: Fri, 15 Mar 2024 13:46:36 +0800 Subject: [PATCH] [#2467] feat(kafka-catalog): Add schema operations support for Kafka catalog (#2521) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? This PR tracks the work of adding schema support for the Kafka catalog. Unlike the other catalogs, the Kafka catalog manages schemas in Gravitino's store, and will automatically create a “default” schema to contain all topics in the cluster. ### Why are the changes needed? Fix: #2467 ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? UTs added --- .../catalog-messaging-kafka/build.gradle.kts | 8 + .../catalog/kafka/KafkaCatalogOperations.java | 125 +++++++++- .../gravitino/catalog/kafka/KafkaSchema.java | 27 +++ .../kafka/TestKafkaCatalogOperations.java | 222 ++++++++++++++++++ .../gravitino/meta/SchemaEntity.java | 6 + 5 files changed, 382 insertions(+), 6 deletions(-) create mode 100644 catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaSchema.java create mode 100644 catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/TestKafkaCatalogOperations.java diff --git a/catalogs/catalog-messaging-kafka/build.gradle.kts b/catalogs/catalog-messaging-kafka/build.gradle.kts index 2cc2ea2478a..a992063ee09 100644 --- a/catalogs/catalog-messaging-kafka/build.gradle.kts +++ b/catalogs/catalog-messaging-kafka/build.gradle.kts @@ -14,6 +14,14 @@ dependencies { implementation(project(":api")) implementation(project(":core")) implementation(project(":common")) + + implementation(libs.guava) + + testImplementation(libs.junit.jupiter.api) + testImplementation(libs.mockito.core) + testImplementation(libs.commons.io) + + testRuntimeOnly(libs.junit.jupiter.engine) } tasks { diff --git 
a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java index 50bd60ca93b..3f8cedf11cc 100644 --- a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java +++ b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java @@ -4,14 +4,21 @@ */ package com.datastrato.gravitino.catalog.kafka; +import static com.datastrato.gravitino.StringIdentifier.ID_KEY; +import static com.datastrato.gravitino.catalog.BaseCatalog.CATALOG_BYPASS_PREFIX; +import static com.datastrato.gravitino.catalog.kafka.KafkaCatalogPropertiesMetadata.BOOTSTRAP_SERVERS; + import com.datastrato.gravitino.Entity; import com.datastrato.gravitino.EntityStore; import com.datastrato.gravitino.GravitinoEnv; import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.StringIdentifier; +import com.datastrato.gravitino.catalog.BasePropertiesMetadata; import com.datastrato.gravitino.catalog.CatalogOperations; import com.datastrato.gravitino.catalog.PropertiesMetadata; import com.datastrato.gravitino.exceptions.NoSuchCatalogException; +import com.datastrato.gravitino.exceptions.NoSuchEntityException; import com.datastrato.gravitino.exceptions.NoSuchSchemaException; import com.datastrato.gravitino.exceptions.NoSuchTopicException; import com.datastrato.gravitino.exceptions.NonEmptySchemaException; @@ -21,14 +28,22 @@ import com.datastrato.gravitino.messaging.Topic; import com.datastrato.gravitino.messaging.TopicCatalog; import com.datastrato.gravitino.messaging.TopicChange; +import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.meta.SchemaEntity; import com.datastrato.gravitino.rel.Schema; 
import com.datastrato.gravitino.rel.SchemaChange; import com.datastrato.gravitino.rel.SupportsSchemas; +import com.datastrato.gravitino.storage.IdGenerator; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import java.io.IOException; +import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; public class KafkaCatalogOperations implements CatalogOperations, SupportsSchemas, TopicCatalog { @@ -40,16 +55,48 @@ public class KafkaCatalogOperations implements CatalogOperations, SupportsSchema new KafkaTopicPropertiesMetadata(); private final EntityStore store; + private final IdGenerator idGenerator; + private final String DEFAULT_SCHEMA_NAME = "default"; + @VisibleForTesting NameIdentifier defaultSchemaIdent; + @VisibleForTesting Properties adminClientConfig; private CatalogEntity entity; + @VisibleForTesting + KafkaCatalogOperations(EntityStore store, IdGenerator idGenerator) { + this.store = store; + this.idGenerator = idGenerator; + } + public KafkaCatalogOperations() { - this.store = GravitinoEnv.getInstance().entityStore(); + this(GravitinoEnv.getInstance().entityStore(), GravitinoEnv.getInstance().idGenerator()); } @Override public void initialize(Map config, CatalogEntity entity) throws RuntimeException { + Preconditions.checkArgument( + config.containsKey(BOOTSTRAP_SERVERS), "Missing configuration: %s", BOOTSTRAP_SERVERS); + Preconditions.checkArgument(config.containsKey(ID_KEY), "Missing configuration: %s", ID_KEY); + this.entity = entity; - // TODO: Implement Kafka catalog initialization, such as creating a default schema. 
+ this.defaultSchemaIdent = + NameIdentifier.of(entity.namespace().level(0), entity.name(), DEFAULT_SCHEMA_NAME); + + // Initialize the Kafka AdminClient configuration + adminClientConfig = new Properties(); + + Map bypassConfigs = + config.entrySet().stream() + .filter(e -> e.getKey().startsWith(CATALOG_BYPASS_PREFIX)) + .collect( + Collectors.toMap( + e -> e.getKey().substring(CATALOG_BYPASS_PREFIX.length()), + Map.Entry::getValue)); + adminClientConfig.putAll(bypassConfigs); + adminClientConfig.put(BOOTSTRAP_SERVERS, config.get(BOOTSTRAP_SERVERS)); + // use gravitino catalog id as the admin client id + adminClientConfig.put("client.id", config.get(ID_KEY)); + + createDefaultSchema(); } @Override @@ -96,23 +143,50 @@ public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogExc @Override public Schema createSchema(NameIdentifier ident, String comment, Map properties) throws NoSuchCatalogException, SchemaAlreadyExistsException { - throw new UnsupportedOperationException(); + // It appears that the "default" schema suffices, so there is no need to support creating schema + // currently + throw new UnsupportedOperationException( + "Kafka catalog does not support schema creation " + + "because the \"default\" schema already includes all topics"); } @Override public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException { - throw new UnsupportedOperationException(); + try { + SchemaEntity schema = store.get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class); + + return KafkaSchema.builder() + .withName(schema.name()) + .withComment(schema.comment()) + .withProperties(schema.properties()) + .withAuditInfo(schema.auditInfo()) + .build(); + + } catch (NoSuchEntityException exception) { + throw new NoSuchSchemaException(exception, "Schema %s does not exist", ident); + } catch (IOException ioe) { + throw new RuntimeException("Failed to load schema " + ident, ioe); + } } @Override public Schema alterSchema(NameIdentifier ident, 
SchemaChange... changes) throws NoSuchSchemaException { - throw new UnsupportedOperationException(); + if (ident.equals(defaultSchemaIdent)) { + throw new IllegalArgumentException("Cannot alter the default schema"); + } + + // TODO: Implement altering schema after adding support for schema creation + throw new UnsupportedOperationException("Kafka catalog does not support schema alteration"); } @Override public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException { - throw new UnsupportedOperationException(); + if (ident.equals(defaultSchemaIdent)) { + throw new IllegalArgumentException("Cannot drop the default schema"); + } + // TODO: Implement dropping schema after adding support for schema creation + throw new UnsupportedOperationException("Kafka catalog does not support schema deletion"); } @Override @@ -142,4 +216,43 @@ public PropertiesMetadata filesetPropertiesMetadata() throws UnsupportedOperatio public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException { throw new UnsupportedOperationException("Kafka catalog does not support table operations"); } + + private void createDefaultSchema() { + // If the default schema already exists, do nothing + try { + if (store.exists(defaultSchemaIdent, Entity.EntityType.SCHEMA)) { + return; + } + } catch (IOException e) { + throw new RuntimeException("Failed to check if schema " + defaultSchemaIdent + " exists", e); + } + + // Create the default schema + long uid = idGenerator.nextId(); + ImmutableMap properties = + ImmutableMap.builder() + .put(ID_KEY, StringIdentifier.fromId(uid).toString()) + .put(BasePropertiesMetadata.GRAVITINO_MANAGED_ENTITY, Boolean.TRUE.toString()) + .build(); + + SchemaEntity defaultSchema = + SchemaEntity.builder() + .withName(defaultSchemaIdent.name()) + .withId(uid) + .withNamespace(Namespace.ofSchema(entity.namespace().level(0), entity.name())) + .withComment("The default schema of Kafka catalog including all topics") + 
.withProperties(properties) + .withAuditInfo( + AuditInfo.builder() + .withCreator(entity.auditInfo().creator()) + .withCreateTime(Instant.now()) + .build()) + .build(); + + try { + store.put(defaultSchema, true /* overwrite */); + } catch (IOException ioe) { + throw new RuntimeException("Failed to create default schema for Kafka catalog", ioe); + } + } } diff --git a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaSchema.java b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaSchema.java new file mode 100644 index 00000000000..a81814e50bc --- /dev/null +++ b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaSchema.java @@ -0,0 +1,27 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.kafka; + +import com.datastrato.gravitino.catalog.rel.BaseSchema; + +public class KafkaSchema extends BaseSchema { + + public static Builder builder() { + return new Builder(); + } + + public static class Builder extends BaseSchemaBuilder { + + @Override + protected KafkaSchema internalBuild() { + KafkaSchema schema = new KafkaSchema(); + schema.name = name; + schema.comment = comment; + schema.properties = properties; + schema.auditInfo = auditInfo; + return schema; + } + } +} diff --git a/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/TestKafkaCatalogOperations.java b/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/TestKafkaCatalogOperations.java new file mode 100644 index 00000000000..062fb71c9ab --- /dev/null +++ b/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/TestKafkaCatalogOperations.java @@ -0,0 +1,222 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.catalog.kafka; + +import static com.datastrato.gravitino.Catalog.Type.MESSAGING; +import static com.datastrato.gravitino.Configs.DEFAULT_ENTITY_KV_STORE; +import static com.datastrato.gravitino.Configs.ENTITY_KV_STORE; +import static com.datastrato.gravitino.Configs.ENTITY_STORE; +import static com.datastrato.gravitino.Configs.ENTRY_KV_ROCKSDB_BACKEND_PATH; +import static com.datastrato.gravitino.Configs.KV_DELETE_AFTER_TIME; +import static com.datastrato.gravitino.Configs.STORE_TRANSACTION_MAX_SKEW_TIME; +import static com.datastrato.gravitino.StringIdentifier.ID_KEY; +import static com.datastrato.gravitino.catalog.kafka.KafkaCatalogPropertiesMetadata.BOOTSTRAP_SERVERS; + +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.Configs; +import com.datastrato.gravitino.EntitySerDeFactory; +import com.datastrato.gravitino.EntityStore; +import com.datastrato.gravitino.EntityStoreFactory; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.catalog.BasePropertiesMetadata; +import com.datastrato.gravitino.meta.AuditInfo; +import com.datastrato.gravitino.meta.CatalogEntity; +import com.datastrato.gravitino.rel.Schema; +import com.datastrato.gravitino.rel.SchemaChange; +import com.datastrato.gravitino.storage.IdGenerator; +import com.datastrato.gravitino.storage.RandomIdGenerator; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.time.Instant; +import java.util.Map; +import java.util.UUID; +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class TestKafkaCatalogOperations { + + private static final String ROCKS_DB_STORE_PATH = + "/tmp/gravitino_test_entityStore_" + UUID.randomUUID().toString().replace("-", ""); + 
private static final String METALAKE_NAME = "metalake"; + private static final String CATALOG_NAME = "test_kafka_catalog"; + private static final Map MOCK_CATALOG_PROPERTIES = + ImmutableMap.of( + BOOTSTRAP_SERVERS, "localhost:9092", ID_KEY, "gravitino.v1.uid33220758755757000"); + private static EntityStore store; + private static IdGenerator idGenerator; + private static CatalogEntity kafkaCatalogEntity; + private static KafkaCatalogOperations kafkaCatalogOperations; + + @BeforeAll + public static void setUp() { + Config config = Mockito.mock(Config.class); + Mockito.when(config.get(ENTITY_STORE)).thenReturn("kv"); + Mockito.when(config.get(ENTITY_KV_STORE)).thenReturn(DEFAULT_ENTITY_KV_STORE); + Mockito.when(config.get(Configs.ENTITY_SERDE)).thenReturn("proto"); + Mockito.when(config.get(ENTRY_KV_ROCKSDB_BACKEND_PATH)).thenReturn(ROCKS_DB_STORE_PATH); + + Assertions.assertEquals(ROCKS_DB_STORE_PATH, config.get(ENTRY_KV_ROCKSDB_BACKEND_PATH)); + Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L); + Mockito.when(config.get(KV_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L); + + store = EntityStoreFactory.createEntityStore(config); + store.initialize(config); + store.setSerDe(EntitySerDeFactory.createEntitySerDe(config)); + idGenerator = new RandomIdGenerator(); + kafkaCatalogEntity = + CatalogEntity.builder() + .withId(1L) + .withName(CATALOG_NAME) + .withNamespace(Namespace.of(METALAKE_NAME)) + .withType(MESSAGING) + .withProvider("kafka") + .withAuditInfo( + AuditInfo.builder() + .withCreator("testKafkaUser") + .withCreateTime(Instant.now()) + .build()) + .build(); + + kafkaCatalogOperations = new KafkaCatalogOperations(store, idGenerator); + kafkaCatalogOperations.initialize(MOCK_CATALOG_PROPERTIES, kafkaCatalogEntity); + } + + @AfterAll + public static void tearDown() throws IOException { + store.close(); + FileUtils.deleteDirectory(FileUtils.getFile(ROCKS_DB_STORE_PATH)); + } + + @Test + public void testKafkaCatalogConfiguration() { + 
String catalogName = "test_kafka_catalog_configuration"; + CatalogEntity catalogEntity = + CatalogEntity.builder() + .withId(2L) + .withName(catalogName) + .withNamespace(Namespace.of(METALAKE_NAME)) + .withType(MESSAGING) + .withProvider("kafka") + .withAuditInfo( + AuditInfo.builder() + .withCreator("testKafkaUser") + .withCreateTime(Instant.now()) + .build()) + .build(); + KafkaCatalogOperations ops = new KafkaCatalogOperations(store, idGenerator); + Assertions.assertNull(ops.adminClientConfig); + + ops.initialize(MOCK_CATALOG_PROPERTIES, catalogEntity); + Assertions.assertNotNull(ops.adminClientConfig); + Assertions.assertEquals(2, ops.adminClientConfig.size()); + Assertions.assertEquals( + MOCK_CATALOG_PROPERTIES.get(BOOTSTRAP_SERVERS), + ops.adminClientConfig.get(BOOTSTRAP_SERVERS)); + Assertions.assertEquals( + MOCK_CATALOG_PROPERTIES.get(ID_KEY), ops.adminClientConfig.get("client.id")); + } + + @Test + public void testInitialization() { + String catalogName = "test_kafka_catalog_initialization"; + CatalogEntity catalogEntity = + CatalogEntity.builder() + .withId(2L) + .withName(catalogName) + .withNamespace(Namespace.of(METALAKE_NAME)) + .withType(MESSAGING) + .withProvider("kafka") + .withAuditInfo( + AuditInfo.builder() + .withCreator("testKafkaUser") + .withCreateTime(Instant.now()) + .build()) + .build(); + KafkaCatalogOperations ops = new KafkaCatalogOperations(store, idGenerator); + ops.initialize(MOCK_CATALOG_PROPERTIES, catalogEntity); + + Assertions.assertNotNull(ops.defaultSchemaIdent); + Assertions.assertEquals("default", ops.defaultSchemaIdent.name()); + Assertions.assertEquals( + METALAKE_NAME + "." 
+ catalogName, ops.defaultSchemaIdent.namespace().toString()); + + Assertions.assertTrue(ops.schemaExists(ops.defaultSchemaIdent)); + Schema schema = ops.loadSchema(ops.defaultSchemaIdent); + Assertions.assertEquals("default", schema.name()); + } + + @Test + public void testCreateSchema() { + NameIdentifier ident = NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, "test_schema"); + + UnsupportedOperationException exception = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> kafkaCatalogOperations.createSchema(ident, null, null)); + Assertions.assertEquals( + "Kafka catalog does not support schema creation because the \"default\" schema already includes all topics", + exception.getMessage()); + } + + @Test + public void testLoadSchema() { + NameIdentifier ident = NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, "default"); + Schema schema = kafkaCatalogOperations.loadSchema(ident); + + Assertions.assertEquals("default", schema.name()); + Assertions.assertEquals( + "The default schema of Kafka catalog including all topics", schema.comment()); + Assertions.assertEquals(2, schema.properties().size()); + Assertions.assertTrue( + schema.properties().containsKey(BasePropertiesMetadata.GRAVITINO_MANAGED_ENTITY)); + Assertions.assertEquals("true", schema.properties().get("gravitino.managed.entity")); + } + + @Test + public void testAlterSchema() { + Exception exception = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + kafkaCatalogOperations.alterSchema( + NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, "default"), + SchemaChange.removeProperty("key1"))); + Assertions.assertEquals("Cannot alter the default schema", exception.getMessage()); + + exception = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> + kafkaCatalogOperations.alterSchema( + NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, "test_schema"), + SchemaChange.removeProperty("key1"))); + Assertions.assertEquals( + "Kafka catalog does not support 
schema alteration", exception.getMessage()); + } + + @Test + public void testDropSchema() { + Exception exception = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + kafkaCatalogOperations.dropSchema( + NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, "default"), true)); + Assertions.assertEquals("Cannot drop the default schema", exception.getMessage()); + + NameIdentifier ident = NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, "test_schema"); + exception = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> kafkaCatalogOperations.dropSchema(ident, true)); + Assertions.assertEquals( + "Kafka catalog does not support schema deletion", exception.getMessage()); + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/meta/SchemaEntity.java b/core/src/main/java/com/datastrato/gravitino/meta/SchemaEntity.java index 55f73835933..274a7db4317 100644 --- a/core/src/main/java/com/datastrato/gravitino/meta/SchemaEntity.java +++ b/core/src/main/java/com/datastrato/gravitino/meta/SchemaEntity.java @@ -40,6 +40,12 @@ public class SchemaEntity implements Entity, Auditable, HasIdentifier { private Map properties; + private SchemaEntity() {} + + public static Builder builder() { + return new Builder(); + } + /** * Returns an unmodifiable map of the fields and their corresponding values for this schema. *