-
Notifications
You must be signed in to change notification settings - Fork 380
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#2610] feat(kafka-catalog, core): Adapt the Kafka catalog to CatalogOperationsDispatcher #2694
Conversation
.withComment(comment) | ||
// Because there is no way to store the Gravitino ID in Kafka, therefor we use the topic | ||
// ID as the Gravitino ID | ||
.withProperties(newPropertiesWithId(convertToGravitinoId(topicId), properties)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please add a test for this change.
StringIdentifier stringId = getStringIdFromProperties(topic.properties()); | ||
// Case 1: The topic is not created by Gravitino. | ||
// Note: for Kafka catalog, stringId will not be null. Because there is no way to store the | ||
// Gravitino | ||
// ID in Kafka, therefor we use the topic ID as the Gravitino ID | ||
if (stringId == null) { | ||
return EntityCombinedTopic.of(topic) | ||
.withHiddenPropertiesSet( | ||
getHiddenPropertyNames( | ||
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties())); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should never happened, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As of now, yes.
But if other messaging catalogs that support setting topic property are implemented later, such as Pulsar, this stringId
may be null.
@@ -88,6 +88,7 @@ public <E extends Entity & HasIdentifier> void insert(E e, boolean overwritten) | |||
} else if (e instanceof FilesetEntity) { | |||
FilesetMetaService.getInstance().insertFileset((FilesetEntity) e, overwritten); | |||
} else { | |||
// TODO: Add support for TopicEntity | |||
throw new UnsupportedEntityTypeException( | |||
"Unsupported entity type: %s for insert operation", e.getClass()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you going to do this in another PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, some design work may be needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
What changes were proposed in this pull request?
This PR add the Kafka catalog adaption to the CatalogOperationsDispatcher
Why are the changes needed?
Part of Kafka catalog support work
Fix: #2610
Does this PR introduce any user-facing change?
no
How was this patch tested?
UTs