Skip to content

Commit

Permalink
[apache#2467] feat(kafka-catalog): Add schema operations support for …
Browse files Browse the repository at this point in the history
…Kafka catalog (apache#2521)

### What changes were proposed in this pull request?

This PR tracks the work of adding schema support for the Kafka catalog.
Unlike the other catalogs, the Kafka catalog manages schemas in
Gravitino's store and automatically creates a “default” schema that
contains all topics in the cluster.

### Why are the changes needed?

Fix: apache#2467 

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?

Unit tests (UTs) added.
  • Loading branch information
mchades authored Mar 15, 2024
1 parent e65ba06 commit ffb80e7
Show file tree
Hide file tree
Showing 5 changed files with 382 additions and 6 deletions.
8 changes: 8 additions & 0 deletions catalogs/catalog-messaging-kafka/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ dependencies {
implementation(project(":api"))
implementation(project(":core"))
implementation(project(":common"))

implementation(libs.guava)

testImplementation(libs.junit.jupiter.api)
testImplementation(libs.mockito.core)
testImplementation(libs.commons.io)

testRuntimeOnly(libs.junit.jupiter.engine)
}

tasks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,21 @@
*/
package com.datastrato.gravitino.catalog.kafka;

import static com.datastrato.gravitino.StringIdentifier.ID_KEY;
import static com.datastrato.gravitino.catalog.BaseCatalog.CATALOG_BYPASS_PREFIX;
import static com.datastrato.gravitino.catalog.kafka.KafkaCatalogPropertiesMetadata.BOOTSTRAP_SERVERS;

import com.datastrato.gravitino.Entity;
import com.datastrato.gravitino.EntityStore;
import com.datastrato.gravitino.GravitinoEnv;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.StringIdentifier;
import com.datastrato.gravitino.catalog.BasePropertiesMetadata;
import com.datastrato.gravitino.catalog.CatalogOperations;
import com.datastrato.gravitino.catalog.PropertiesMetadata;
import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
import com.datastrato.gravitino.exceptions.NoSuchEntityException;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.NoSuchTopicException;
import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
Expand All @@ -21,14 +28,22 @@
import com.datastrato.gravitino.messaging.Topic;
import com.datastrato.gravitino.messaging.TopicCatalog;
import com.datastrato.gravitino.messaging.TopicChange;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.meta.CatalogEntity;
import com.datastrato.gravitino.meta.SchemaEntity;
import com.datastrato.gravitino.rel.Schema;
import com.datastrato.gravitino.rel.SchemaChange;
import com.datastrato.gravitino.rel.SupportsSchemas;
import com.datastrato.gravitino.storage.IdGenerator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class KafkaCatalogOperations implements CatalogOperations, SupportsSchemas, TopicCatalog {

Expand All @@ -40,16 +55,48 @@ public class KafkaCatalogOperations implements CatalogOperations, SupportsSchema
new KafkaTopicPropertiesMetadata();

/** Name of the schema that {@code initialize} creates automatically for this catalog. */
private static final String DEFAULT_SCHEMA_NAME = "default";

/** Entity store used to check for, load, and persist schema entities. */
private final EntityStore store;
/** Supplies the unique id assigned to the default schema when it is first created. */
private final IdGenerator idGenerator;
/** Identifier of the default schema; assigned in {@code initialize}. */
@VisibleForTesting NameIdentifier defaultSchemaIdent;
/** Kafka admin client settings assembled in {@code initialize}. */
@VisibleForTesting Properties adminClientConfig;
/** The catalog entity handed to {@code initialize}. */
private CatalogEntity entity;

/**
 * Creates the operations instance with explicitly supplied collaborators.
 *
 * @param store the entity store this instance reads from and writes to
 * @param idGenerator the generator used to obtain ids for newly created entities
 */
@VisibleForTesting
KafkaCatalogOperations(EntityStore store, IdGenerator idGenerator) {
  this.idGenerator = idGenerator;
  this.store = store;
}

/**
 * Creates the operations instance using the entity store and id generator exposed by the
 * {@link GravitinoEnv} singleton.
 */
public KafkaCatalogOperations() {
  // The delegating call must be the first statement of the constructor; it assigns both final
  // fields, so no direct field assignment is needed (or allowed) here.
  this(GravitinoEnv.getInstance().entityStore(), GravitinoEnv.getInstance().idGenerator());
}

/**
 * Validates the catalog configuration, records the catalog entity, assembles the Kafka admin
 * client configuration, and makes sure the default schema exists in the entity store.
 *
 * @param config the catalog configuration; must contain {@code BOOTSTRAP_SERVERS} and
 *     {@code ID_KEY}
 * @param entity the catalog entity these operations belong to
 * @throws IllegalArgumentException if a required configuration key is missing
 * @throws RuntimeException if the default schema cannot be checked or stored
 */
@Override
public void initialize(Map<String, String> config, CatalogEntity entity) throws RuntimeException {
  Preconditions.checkArgument(
      config.containsKey(BOOTSTRAP_SERVERS), "Missing configuration: %s", BOOTSTRAP_SERVERS);
  Preconditions.checkArgument(config.containsKey(ID_KEY), "Missing configuration: %s", ID_KEY);

  this.entity = entity;
  this.defaultSchemaIdent =
      NameIdentifier.of(entity.namespace().level(0), entity.name(), DEFAULT_SCHEMA_NAME);

  // Start from an empty admin client configuration.
  this.adminClientConfig = new Properties();

  // Collect every entry whose key carries the bypass prefix, keyed by the remainder of the key.
  final int prefixLength = CATALOG_BYPASS_PREFIX.length();
  Map<String, String> passThrough =
      config.entrySet().stream()
          .filter(entry -> entry.getKey().startsWith(CATALOG_BYPASS_PREFIX))
          .collect(
              Collectors.toMap(
                  entry -> entry.getKey().substring(prefixLength), Map.Entry::getValue));
  this.adminClientConfig.putAll(passThrough);

  // These are put after the bypass entries, so they replace any bypass entry with the same key.
  this.adminClientConfig.put(BOOTSTRAP_SERVERS, config.get(BOOTSTRAP_SERVERS));
  // The Gravitino catalog id doubles as the admin client id.
  this.adminClientConfig.put("client.id", config.get(ID_KEY));

  createDefaultSchema();
}

@Override
Expand Down Expand Up @@ -96,23 +143,50 @@ public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogExc
/**
 * Always rejects the request: schema creation is not supported by the Kafka catalog.
 *
 * @param ident the identifier of the schema to create (unused)
 * @param comment the schema comment (unused)
 * @param properties the schema properties (unused)
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public Schema createSchema(NameIdentifier ident, String comment, Map<String, String> properties)
    throws NoSuchCatalogException, SchemaAlreadyExistsException {
  // It appears that the "default" schema suffices, so there is no need to support creating schema
  // currently
  throw new UnsupportedOperationException(
      "Kafka catalog does not support schema creation "
          + "because the \"default\" schema already includes all topics");
}

/**
 * Loads a schema from the entity store and converts it into a {@link KafkaSchema}.
 *
 * @param ident the identifier of the schema to load
 * @return the schema carrying the stored name, comment, properties, and audit info
 * @throws NoSuchSchemaException if the store has no schema entity for {@code ident}
 * @throws RuntimeException if the store fails with an {@link IOException}
 */
@Override
public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
  try {
    SchemaEntity schema = store.get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class);

    return KafkaSchema.builder()
        .withName(schema.name())
        .withComment(schema.comment())
        .withProperties(schema.properties())
        .withAuditInfo(schema.auditInfo())
        .build();

  } catch (NoSuchEntityException exception) {
    // Translate the store-level "not found" into the schema-level exception callers expect.
    throw new NoSuchSchemaException(exception, "Schema %s does not exist", ident);
  } catch (IOException ioe) {
    throw new RuntimeException("Failed to load schema " + ident, ioe);
  }
}

/**
 * Rejects every alteration request; altering the default schema is reported as an invalid
 * argument, and altering any other schema as unsupported.
 *
 * @param ident the identifier of the schema to alter
 * @param changes the requested changes (unused)
 * @return never returns normally
 * @throws IllegalArgumentException if {@code ident} is the default schema
 * @throws UnsupportedOperationException for any other schema
 */
@Override
public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
    throws NoSuchSchemaException {
  if (ident.equals(defaultSchemaIdent)) {
    throw new IllegalArgumentException("Cannot alter the default schema");
  }

  // TODO: Implement altering schema after adding support for schema creation
  throw new UnsupportedOperationException("Kafka catalog does not support schema alteration");
}

/**
 * Rejects every drop request; dropping the default schema is reported as an invalid argument,
 * and dropping any other schema as unsupported.
 *
 * @param ident the identifier of the schema to drop
 * @param cascade whether contained objects should be dropped as well (unused)
 * @return never returns normally
 * @throws IllegalArgumentException if {@code ident} is the default schema
 * @throws UnsupportedOperationException for any other schema
 */
@Override
public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException {
  if (ident.equals(defaultSchemaIdent)) {
    throw new IllegalArgumentException("Cannot drop the default schema");
  }
  // TODO: Implement dropping schema after adding support for schema creation
  throw new UnsupportedOperationException("Kafka catalog does not support schema deletion");
}

@Override
Expand Down Expand Up @@ -142,4 +216,43 @@ public PropertiesMetadata filesetPropertiesMetadata() throws UnsupportedOperatio
public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException {
  // Table operations are not offered by the Kafka catalog, so no table property metadata is
  // returned; the call always fails.
  final String reason = "Kafka catalog does not support table operations";
  throw new UnsupportedOperationException(reason);
}

/**
 * Persists the default schema into the entity store unless the store already contains it.
 *
 * @throws RuntimeException if the store fails with an {@link IOException} while checking for or
 *     writing the schema
 */
private void createDefaultSchema() {
  final boolean alreadyPresent;
  try {
    alreadyPresent = store.exists(defaultSchemaIdent, Entity.EntityType.SCHEMA);
  } catch (IOException e) {
    throw new RuntimeException("Failed to check if schema " + defaultSchemaIdent + " exists", e);
  }
  if (alreadyPresent) {
    // The default schema is already stored; leave it untouched.
    return;
  }

  // A new id is requested only when the schema actually has to be created.
  final long schemaId = idGenerator.nextId();
  final ImmutableMap<String, String> schemaProperties =
      ImmutableMap.of(
          ID_KEY, StringIdentifier.fromId(schemaId).toString(),
          BasePropertiesMetadata.GRAVITINO_MANAGED_ENTITY, Boolean.TRUE.toString());

  // The schema is attributed to the creator recorded on the catalog entity.
  final AuditInfo audit =
      AuditInfo.builder()
          .withCreator(entity.auditInfo().creator())
          .withCreateTime(Instant.now())
          .build();
  final Namespace schemaNamespace =
      Namespace.ofSchema(entity.namespace().level(0), entity.name());

  final SchemaEntity defaultSchema =
      SchemaEntity.builder()
          .withName(defaultSchemaIdent.name())
          .withId(schemaId)
          .withNamespace(schemaNamespace)
          .withComment("The default schema of Kafka catalog including all topics")
          .withProperties(schemaProperties)
          .withAuditInfo(audit)
          .build();

  try {
    store.put(defaultSchema, true /* overwrite */);
  } catch (IOException ioe) {
    throw new RuntimeException("Failed to create default schema for Kafka catalog", ioe);
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.kafka;

import com.datastrato.gravitino.catalog.rel.BaseSchema;

/** Schema type of the Kafka catalog, built through {@link Builder}. */
public class KafkaSchema extends BaseSchema {

  /**
   * Creates a new, empty builder.
   *
   * @return a builder for {@link KafkaSchema} instances
   */
  public static Builder builder() {
    return new Builder();
  }

  /** Builder that copies its collected values onto a new {@link KafkaSchema}. */
  public static class Builder extends BaseSchemaBuilder<Builder, KafkaSchema> {

    /**
     * Creates the schema and transfers the name, comment, properties, and audit info held by
     * this builder onto it.
     *
     * @return the populated schema
     */
    @Override
    protected KafkaSchema internalBuild() {
      final KafkaSchema result = new KafkaSchema();
      result.auditInfo = this.auditInfo;
      result.properties = this.properties;
      result.comment = this.comment;
      result.name = this.name;
      return result;
    }
  }
}
Loading

0 comments on commit ffb80e7

Please sign in to comment.