-
Notifications
You must be signed in to change notification settings - Fork 61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Notify producer and consumer clients when schemas are uploaded on topic #2060
Conversation
Signed-off-by: muralibasani <[email protected]>
Signed-off-by: muralibasani <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a couple of comments.
NEW_USER_ADDED, | ||
RESET_CONSUMER_OFFSET_REQUESTED, | ||
RESET_CONSUMER_OFFSET_APPROVED, | ||
RESET_CONSUMER_OFFSET_DENIED | ||
RESET_CONSUMER_OFFSET_DENIED, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should there be a trailing comma here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it was by mistake.
String topic = schemaRequest.getTopicname(); | ||
String schemaRequestEnvironment = schemaRequest.getEnvironment(); | ||
Optional<Env> optionalKafkaEnv = | ||
manageDatabase.getKafkaEnvList(tenantId).stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
manageDatabase.getKafkaEnvList(tenantId).stream() | |
Optional<Env> optSchemaEnv = manageDatabase.getEnv(tenantId,schemaRequestEnvironment); | |
//add null pointer checks. | |
Optional<Env> optKafkaEnv = manageDatabase.getEnv(tenantId,optSchemaEnv .get().getAssociatedEnv().getId); |
Signed-off-by: muralibasani <[email protected]>
manageDatabase | ||
.getHandleDbRequests() | ||
.getSyncAcls(optionalKafkaEnv.get().getId(), topic, tenantId); | ||
optionalEnv = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these OptionalEnv and the OptionalKafkaEnv the same object?
Signed-off-by: muralibasani <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Linked issue
Resolves: #1068
What kind of change does this PR introduce?
This change adds a functionality.
Notifies all producers and consumers of a topic for a particular environment, when there are new schemas uploaded onto a topic
Team mail ids are used here
Bug fix
[ X] New feature
Refactor
Docs update
CI update
What is the current behavior?
Describe the state of the application before this PR. Illustrations appreciated (videos, gifs, screenshots).
Currently topic subscribers are not notified when there are schema uploads on topic
What is the new behavior?
Describe the state of the application after this PR. Illustrations appreciated (videos, gifs, screenshots).
This change notifies topic subscribers when there are new schemas uploaded to topic
Other information
Additional changes, explanations of the approach taken, unresolved issues, necessary follow ups, etc.
Requirements (all must be checked before review)
main
branch have been pulledpnpm lint
has been run successfully