diff --git a/catalogs/catalog-messaging-kafka/build.gradle.kts b/catalogs/catalog-messaging-kafka/build.gradle.kts index 2cc2ea2478a..a992063ee09 100644 --- a/catalogs/catalog-messaging-kafka/build.gradle.kts +++ b/catalogs/catalog-messaging-kafka/build.gradle.kts @@ -14,6 +14,14 @@ dependencies { implementation(project(":api")) implementation(project(":core")) implementation(project(":common")) + + implementation(libs.guava) + + testImplementation(libs.junit.jupiter.api) + testImplementation(libs.mockito.core) + testImplementation(libs.commons.io) + + testRuntimeOnly(libs.junit.jupiter.engine) } tasks { diff --git a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java index 50bd60ca93b..3f8cedf11cc 100644 --- a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java +++ b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java @@ -4,14 +4,21 @@ */ package com.datastrato.gravitino.catalog.kafka; +import static com.datastrato.gravitino.StringIdentifier.ID_KEY; +import static com.datastrato.gravitino.catalog.BaseCatalog.CATALOG_BYPASS_PREFIX; +import static com.datastrato.gravitino.catalog.kafka.KafkaCatalogPropertiesMetadata.BOOTSTRAP_SERVERS; + import com.datastrato.gravitino.Entity; import com.datastrato.gravitino.EntityStore; import com.datastrato.gravitino.GravitinoEnv; import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.StringIdentifier; +import com.datastrato.gravitino.catalog.BasePropertiesMetadata; import com.datastrato.gravitino.catalog.CatalogOperations; import com.datastrato.gravitino.catalog.PropertiesMetadata; import com.datastrato.gravitino.exceptions.NoSuchCatalogException; +import com.datastrato.gravitino.exceptions.NoSuchEntityException; import com.datastrato.gravitino.exceptions.NoSuchSchemaException; import com.datastrato.gravitino.exceptions.NoSuchTopicException; import com.datastrato.gravitino.exceptions.NonEmptySchemaException; @@ -21,14 +28,22 @@ import com.datastrato.gravitino.messaging.Topic; import com.datastrato.gravitino.messaging.TopicCatalog; import com.datastrato.gravitino.messaging.TopicChange; +import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.meta.SchemaEntity; import com.datastrato.gravitino.rel.Schema; import com.datastrato.gravitino.rel.SchemaChange; import com.datastrato.gravitino.rel.SupportsSchemas; +import com.datastrato.gravitino.storage.IdGenerator; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import java.io.IOException; +import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; public class KafkaCatalogOperations implements CatalogOperations, SupportsSchemas, TopicCatalog { @@ -40,16 +55,48 @@ public class KafkaCatalogOperations implements CatalogOperations, SupportsSchema new KafkaTopicPropertiesMetadata(); private final EntityStore store; + private final IdGenerator idGenerator; + private final String DEFAULT_SCHEMA_NAME = "default"; + @VisibleForTesting NameIdentifier defaultSchemaIdent; + @VisibleForTesting Properties adminClientConfig; private CatalogEntity entity; + @VisibleForTesting + KafkaCatalogOperations(EntityStore store, IdGenerator idGenerator) { + this.store = store; + this.idGenerator = idGenerator; + } + public KafkaCatalogOperations() { - this.store = GravitinoEnv.getInstance().entityStore(); + this(GravitinoEnv.getInstance().entityStore(), GravitinoEnv.getInstance().idGenerator()); } @Override public void initialize(Map config, CatalogEntity entity) throws RuntimeException { + Preconditions.checkArgument( + config.containsKey(BOOTSTRAP_SERVERS), "Missing configuration: %s", BOOTSTRAP_SERVERS); + Preconditions.checkArgument(config.containsKey(ID_KEY), "Missing configuration: %s", ID_KEY); + this.entity = entity; - // TODO: Implement Kafka catalog initialization, such as creating a default schema. + this.defaultSchemaIdent = + NameIdentifier.of(entity.namespace().level(0), entity.name(), DEFAULT_SCHEMA_NAME); + + // Initialize the Kafka AdminClient configuration + adminClientConfig = new Properties(); + + Map bypassConfigs = + config.entrySet().stream() + .filter(e -> e.getKey().startsWith(CATALOG_BYPASS_PREFIX)) + .collect( + Collectors.toMap( + e -> e.getKey().substring(CATALOG_BYPASS_PREFIX.length()), + Map.Entry::getValue)); + adminClientConfig.putAll(bypassConfigs); + adminClientConfig.put(BOOTSTRAP_SERVERS, config.get(BOOTSTRAP_SERVERS)); + // use gravitino catalog id as the admin client id + adminClientConfig.put("client.id", config.get(ID_KEY)); + + createDefaultSchema(); } @Override @@ -96,23 +143,50 @@ public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogExc @Override public Schema createSchema(NameIdentifier ident, String comment, Map properties) throws NoSuchCatalogException, SchemaAlreadyExistsException { - throw new UnsupportedOperationException(); + // It appears that the "default" schema suffices, so there is no need to support creating schema + // currently + throw new UnsupportedOperationException( + "Kafka catalog does not support schema creation " + + "because the \"default\" schema already includes all topics"); } @Override public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException { - throw new UnsupportedOperationException(); + try { + SchemaEntity schema = store.get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class); + + return KafkaSchema.builder() + .withName(schema.name()) + .withComment(schema.comment()) + .withProperties(schema.properties()) + .withAuditInfo(schema.auditInfo()) + .build(); + + } catch (NoSuchEntityException exception) { + throw new NoSuchSchemaException(exception, "Schema %s does not exist", ident); + } catch (IOException ioe) { + throw new RuntimeException("Failed to load schema " + ident, ioe); + } } @Override public Schema alterSchema(NameIdentifier ident, SchemaChange... changes) throws NoSuchSchemaException { - throw new UnsupportedOperationException(); + if (ident.equals(defaultSchemaIdent)) { + throw new IllegalArgumentException("Cannot alter the default schema"); + } + + // TODO: Implement altering schema after adding support for schema creation + throw new UnsupportedOperationException("Kafka catalog does not support schema alteration"); } @Override public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException { - throw new UnsupportedOperationException(); + if (ident.equals(defaultSchemaIdent)) { + throw new IllegalArgumentException("Cannot drop the default schema"); + } + // TODO: Implement dropping schema after adding support for schema creation + throw new UnsupportedOperationException("Kafka catalog does not support schema deletion"); } @Override @@ -142,4 +216,43 @@ public PropertiesMetadata filesetPropertiesMetadata() throws UnsupportedOperatio public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException { throw new UnsupportedOperationException("Kafka catalog does not support table operations"); } + + private void createDefaultSchema() { + // If the default schema already exists, do nothing + try { + if (store.exists(defaultSchemaIdent, Entity.EntityType.SCHEMA)) { + return; + } + } catch (IOException e) { + throw new RuntimeException("Failed to check if schema " + defaultSchemaIdent + " exists", e); + } + + // Create the default schema + long uid = idGenerator.nextId(); + ImmutableMap properties = + ImmutableMap.builder() + .put(ID_KEY, StringIdentifier.fromId(uid).toString()) + .put(BasePropertiesMetadata.GRAVITINO_MANAGED_ENTITY, Boolean.TRUE.toString()) + .build(); + + SchemaEntity defaultSchema = + SchemaEntity.builder() + .withName(defaultSchemaIdent.name()) + .withId(uid) + .withNamespace(Namespace.ofSchema(entity.namespace().level(0), entity.name())) + .withComment("The default schema of Kafka catalog including all topics") + .withProperties(properties) + .withAuditInfo( + AuditInfo.builder() + .withCreator(entity.auditInfo().creator()) + .withCreateTime(Instant.now()) + .build()) + .build(); + + try { + store.put(defaultSchema, true /* overwrite */); + } catch (IOException ioe) { + throw new RuntimeException("Failed to create default schema for Kafka catalog", ioe); + } + } } diff --git a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaSchema.java b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaSchema.java new file mode 100644 index 00000000000..a81814e50bc --- /dev/null +++ b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaSchema.java @@ -0,0 +1,27 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.kafka; + +import com.datastrato.gravitino.catalog.rel.BaseSchema; + +public class KafkaSchema extends BaseSchema { + + public static Builder builder() { + return new Builder(); + } + + public static class Builder extends BaseSchemaBuilder { + + @Override + protected KafkaSchema internalBuild() { + KafkaSchema schema = new KafkaSchema(); + schema.name = name; + schema.comment = comment; + schema.properties = properties; + schema.auditInfo = auditInfo; + return schema; + } + } +} diff --git a/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/TestKafkaCatalogOperations.java b/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/TestKafkaCatalogOperations.java new file mode 100644 index 00000000000..062fb71c9ab --- /dev/null +++ b/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/TestKafkaCatalogOperations.java @@ -0,0 +1,222 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.kafka; + +import static com.datastrato.gravitino.Catalog.Type.MESSAGING; +import static com.datastrato.gravitino.Configs.DEFAULT_ENTITY_KV_STORE; +import static com.datastrato.gravitino.Configs.ENTITY_KV_STORE; +import static com.datastrato.gravitino.Configs.ENTITY_STORE; +import static com.datastrato.gravitino.Configs.ENTRY_KV_ROCKSDB_BACKEND_PATH; +import static com.datastrato.gravitino.Configs.KV_DELETE_AFTER_TIME; +import static com.datastrato.gravitino.Configs.STORE_TRANSACTION_MAX_SKEW_TIME; +import static com.datastrato.gravitino.StringIdentifier.ID_KEY; +import static com.datastrato.gravitino.catalog.kafka.KafkaCatalogPropertiesMetadata.BOOTSTRAP_SERVERS; + +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.Configs; +import com.datastrato.gravitino.EntitySerDeFactory; +import com.datastrato.gravitino.EntityStore; +import com.datastrato.gravitino.EntityStoreFactory; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.catalog.BasePropertiesMetadata; +import com.datastrato.gravitino.meta.AuditInfo; +import com.datastrato.gravitino.meta.CatalogEntity; +import com.datastrato.gravitino.rel.Schema; +import com.datastrato.gravitino.rel.SchemaChange; +import com.datastrato.gravitino.storage.IdGenerator; +import com.datastrato.gravitino.storage.RandomIdGenerator; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.time.Instant; +import java.util.Map; +import java.util.UUID; +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class TestKafkaCatalogOperations { + + private static final String ROCKS_DB_STORE_PATH = + "/tmp/gravitino_test_entityStore_" + UUID.randomUUID().toString().replace("-", ""); + private static final String METALAKE_NAME = "metalake"; + private static final String CATALOG_NAME = "test_kafka_catalog"; + private static final Map MOCK_CATALOG_PROPERTIES = + ImmutableMap.of( + BOOTSTRAP_SERVERS, "localhost:9092", ID_KEY, "gravitino.v1.uid33220758755757000"); + private static EntityStore store; + private static IdGenerator idGenerator; + private static CatalogEntity kafkaCatalogEntity; + private static KafkaCatalogOperations kafkaCatalogOperations; + + @BeforeAll + public static void setUp() { + Config config = Mockito.mock(Config.class); + Mockito.when(config.get(ENTITY_STORE)).thenReturn("kv"); + Mockito.when(config.get(ENTITY_KV_STORE)).thenReturn(DEFAULT_ENTITY_KV_STORE); + Mockito.when(config.get(Configs.ENTITY_SERDE)).thenReturn("proto"); + Mockito.when(config.get(ENTRY_KV_ROCKSDB_BACKEND_PATH)).thenReturn(ROCKS_DB_STORE_PATH); + + Assertions.assertEquals(ROCKS_DB_STORE_PATH, config.get(ENTRY_KV_ROCKSDB_BACKEND_PATH)); + Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L); + Mockito.when(config.get(KV_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L); + + store = EntityStoreFactory.createEntityStore(config); + store.initialize(config); + store.setSerDe(EntitySerDeFactory.createEntitySerDe(config)); + idGenerator = new RandomIdGenerator(); + kafkaCatalogEntity = + CatalogEntity.builder() + .withId(1L) + .withName(CATALOG_NAME) + .withNamespace(Namespace.of(METALAKE_NAME)) + .withType(MESSAGING) + .withProvider("kafka") + .withAuditInfo( + AuditInfo.builder() + .withCreator("testKafkaUser") + .withCreateTime(Instant.now()) + .build()) + .build(); + + kafkaCatalogOperations = new KafkaCatalogOperations(store, idGenerator); + kafkaCatalogOperations.initialize(MOCK_CATALOG_PROPERTIES, kafkaCatalogEntity); + } + + @AfterAll + public static void tearDown() throws IOException { + store.close(); + FileUtils.deleteDirectory(FileUtils.getFile(ROCKS_DB_STORE_PATH)); + } + + @Test + public void testKafkaCatalogConfiguration() { + String catalogName = "test_kafka_catalog_configuration"; + CatalogEntity catalogEntity = + CatalogEntity.builder() + .withId(2L) + .withName(catalogName) + .withNamespace(Namespace.of(METALAKE_NAME)) + .withType(MESSAGING) + .withProvider("kafka") + .withAuditInfo( + AuditInfo.builder() + .withCreator("testKafkaUser") + .withCreateTime(Instant.now()) + .build()) + .build(); + KafkaCatalogOperations ops = new KafkaCatalogOperations(store, idGenerator); + Assertions.assertNull(ops.adminClientConfig); + + ops.initialize(MOCK_CATALOG_PROPERTIES, catalogEntity); + Assertions.assertNotNull(ops.adminClientConfig); + Assertions.assertEquals(2, ops.adminClientConfig.size()); + Assertions.assertEquals( + MOCK_CATALOG_PROPERTIES.get(BOOTSTRAP_SERVERS), + ops.adminClientConfig.get(BOOTSTRAP_SERVERS)); + Assertions.assertEquals( + MOCK_CATALOG_PROPERTIES.get(ID_KEY), ops.adminClientConfig.get("client.id")); + } + + @Test + public void testInitialization() { + String catalogName = "test_kafka_catalog_initialization"; + CatalogEntity catalogEntity = + CatalogEntity.builder() + .withId(2L) + .withName(catalogName) + .withNamespace(Namespace.of(METALAKE_NAME)) + .withType(MESSAGING) + .withProvider("kafka") + .withAuditInfo( + AuditInfo.builder() + .withCreator("testKafkaUser") + .withCreateTime(Instant.now()) + .build()) + .build(); + KafkaCatalogOperations ops = new KafkaCatalogOperations(store, idGenerator); + ops.initialize(MOCK_CATALOG_PROPERTIES, catalogEntity); + + Assertions.assertNotNull(ops.defaultSchemaIdent); + Assertions.assertEquals("default", ops.defaultSchemaIdent.name()); + Assertions.assertEquals( + METALAKE_NAME + "." + catalogName, ops.defaultSchemaIdent.namespace().toString()); + + Assertions.assertTrue(ops.schemaExists(ops.defaultSchemaIdent)); + Schema schema = ops.loadSchema(ops.defaultSchemaIdent); + Assertions.assertEquals("default", schema.name()); + } + + @Test + public void testCreateSchema() { + NameIdentifier ident = NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, "test_schema"); + + UnsupportedOperationException exception = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> kafkaCatalogOperations.createSchema(ident, null, null)); + Assertions.assertEquals( + "Kafka catalog does not support schema creation because the \"default\" schema already includes all topics", + exception.getMessage()); + } + + @Test + public void testLoadSchema() { + NameIdentifier ident = NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, "default"); + Schema schema = kafkaCatalogOperations.loadSchema(ident); + + Assertions.assertEquals("default", schema.name()); + Assertions.assertEquals( + "The default schema of Kafka catalog including all topics", schema.comment()); + Assertions.assertEquals(2, schema.properties().size()); + Assertions.assertTrue( + schema.properties().containsKey(BasePropertiesMetadata.GRAVITINO_MANAGED_ENTITY)); + Assertions.assertEquals("true", schema.properties().get("gravitino.managed.entity")); + } + + @Test + public void testAlterSchema() { + Exception exception = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + kafkaCatalogOperations.alterSchema( + NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, "default"), + SchemaChange.removeProperty("key1"))); + Assertions.assertEquals("Cannot alter the default schema", exception.getMessage()); + + exception = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> + kafkaCatalogOperations.alterSchema( + NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, "test_schema"), + SchemaChange.removeProperty("key1"))); + Assertions.assertEquals( + "Kafka catalog does not support schema alteration", exception.getMessage()); + } + + @Test + public void testDropSchema() { + Exception exception = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + kafkaCatalogOperations.dropSchema( + NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, "default"), true)); + Assertions.assertEquals("Cannot drop the default schema", exception.getMessage()); + + NameIdentifier ident = NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, "test_schema"); + exception = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> kafkaCatalogOperations.dropSchema(ident, true)); + Assertions.assertEquals( + "Kafka catalog does not support schema deletion", exception.getMessage()); + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/meta/SchemaEntity.java b/core/src/main/java/com/datastrato/gravitino/meta/SchemaEntity.java index 55f73835933..274a7db4317 100644 --- a/core/src/main/java/com/datastrato/gravitino/meta/SchemaEntity.java +++ b/core/src/main/java/com/datastrato/gravitino/meta/SchemaEntity.java @@ -40,6 +40,12 @@ public class SchemaEntity implements Entity, Auditable, HasIdentifier { private Map properties; + private SchemaEntity() {} + + public static Builder builder() { + return new Builder(); + } + /** * Returns an unmodifiable map of the fields and their corresponding values for this schema. *