Virtual cluster wide topic id cache #11
Conversation
//TODO revisit error handling
try {
    final CompletableFuture<String> topicNameFuture = topicUuidToNameCache.getTopicName(originalUuid);
    return topicNameFuture != null ? topicNameFuture.get(5, TimeUnit.SECONDS) : null;
This blocking in the Netty thread seems risky. If the future isn't completed, then connections on this event loop will slow to a crawl.
Maybe it's better to redundantly request the metadata while handling the request, or respond with an error message if the future isn't complete.
Yeah, that's a fair call re: blocking.
We should maybe switch to getNow and just error if it returns null.
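A minimal sketch of that approach, assuming Caffeine's AsyncCache and Kafka's Uuid as in the PR; the class and method names here are illustrative, not the PR's actual API:

import java.time.Duration;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.Uuid;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;

// Sketch only: never block the Netty event loop waiting for a metadata fetch.
class NonBlockingTopicIdCache {
    private final AsyncCache<Uuid, String> topicNamesById =
            Caffeine.newBuilder().expireAfterAccess(Duration.ofMinutes(10)).buildAsync();

    /**
     * Returns the cached topic name, or null if the mapping is unknown or the
     * in-flight metadata fetch has not completed yet. Never blocks, so it is
     * safe to call from an event-loop thread.
     */
    String getTopicNameNow(Uuid topicId) {
        CompletableFuture<String> pending = topicNamesById.getIfPresent(topicId);
        // getNow returns the fallback value immediately instead of waiting
        return pending != null ? pending.getNow(null) : null;
    }
}

The caller can then treat a null result as "unresolved" and surface a retriable error instead of stalling the connection.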
One property I think we'll want is that the metadata fetch is re-attempted pretty quickly in case the initial one failed. I guess that's part of //TODO revisit error handling.
Maybe the futures should be removed from the cache. Or the cache entry could be an object that also carries a timestamp, so callers could decide to retry.
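A hypothetical shape for such a timestamped entry (not in the PR): pairing the pending future with the time the fetch was started lets callers decide whether a failed or long-pending lookup is old enough to retry.

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;

// Illustrative entry type; field and method names are assumptions.
record CachedTopicName(CompletableFuture<String> name, Instant requestedAt) {

    boolean shouldRetry(Duration retryAfter) {
        // retry if the fetch failed, or has been pending longer than retryAfter
        return name.isCompletedExceptionally()
                || (!name.isDone() && Instant.now().isAfter(requestedAt.plus(retryAfter)));
    }
}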
private final AsyncCache<Uuid, String> topicNamesById;

public TopicIdCache() {
    this(Caffeine.newBuilder().expireAfterAccess(Duration.ofMinutes(10)).buildAsync());
Any reason for expiring? Is this to keep only relevant/used mappings cached?
Yeah. Can't say I gave it too much thought. I wanted to avoid the data going stale or leaking in case the proxy missed a metadata update that deleted a topic.
I suppose if a Kafka use case involved short-lived topics, then this would be a concern.
I think it should probably be a bounded cache as well, given we are going to have one per VirtualCluster.
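A sketch of bounding the per-virtual-cluster cache with Caffeine's maximumSize, on top of the PR's existing expireAfterAccess policy; the 1,000-entry limit is an illustrative number, not from the PR.

// Bounded, expiring async cache; eviction uses Caffeine's size-based policy.
AsyncCache<Uuid, String> topicNamesById = Caffeine.newBuilder()
        .expireAfterAccess(Duration.ofMinutes(10))
        .maximumSize(1_000)
        .buildAsync();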
Force-pushed from 3bab12f to e130c3a
@@ -22,4 +29,9 @@ public TopicEncryptionConfig(@JsonProperty(value = IN_MEMORY_POLICY_REPOSITORY_P
    public PolicyRepository getPolicyRepository() {
        return inMemoryPolicyRepository.getPolicyRepository();
    }

    public TopicIdCache getTopicUuidToNameCache() {
        return virtualClusterToTopicUUIDToTopicNameCache.computeIfAbsent("VIRTUAL_CLUSTER_ID", (key) -> new TopicIdCache());
Is the intent here that the config will be given access to the VirtualCluster name, or its UID?
Haha, you spotted my deliberate fudge. I'm currently working on https://github.com/sambarker/kroxylicious/tree/name_that_cluster. My current suspicion is it will need to be name-based, as we are leaning towards relaxed restrictions on cluster IDs.
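If it does end up name-based, one possible shape (an assumption, not the PR's final design) is to pass the virtual cluster name into the lookup instead of the hard-coded placeholder:

// Hypothetical signature change: key the per-cluster caches by virtual cluster name.
public TopicIdCache getTopicUuidToNameCache(String virtualClusterName) {
    return virtualClusterToTopicUUIDToTopicNameCache.computeIfAbsent(virtualClusterName, key -> new TopicIdCache());
}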
final MetadataRequest metadataRequest = builder.build(builder.latestAllowedVersion());
topicIdsToResolve.forEach(uuid -> topicNamesById.put(uuid, new CompletableFuture<>()));
context.<MetadataResponseData> sendRequest(metadataRequest.version(), metadataRequest.data())
        .whenComplete((metadataResponseData, throwable) -> {
What path is followed when topicId is not found?
None yet 😁 I haven't spun up a real cluster to work out what that would look like (this is the sort of reason it's still a draft PR).
I suspect it will need to fail the future, or even just complete it with null and let it get re-queried.
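One possible sketch of that path, assuming Kafka's MetadataResponseData and the PR's topicNamesById / topicIdsToResolve names; the helper method itself and the exception choice are assumptions, not the PR's implementation:

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.MetadataResponseData;

// Complete the futures for ids the response resolved; fail the rest so a
// later request can trigger a fresh metadata fetch.
void completePendingLookups(MetadataResponseData response, Set<Uuid> topicIdsToResolve) {
    Set<Uuid> resolved = new HashSet<>();
    response.topics().forEach(topic -> {
        CompletableFuture<String> pending = topicNamesById.getIfPresent(topic.topicId());
        if (pending != null) {
            pending.complete(topic.name());
            resolved.add(topic.topicId());
        }
    });
    topicIdsToResolve.stream()
            .filter(uuid -> !resolved.contains(uuid))
            .map(topicNamesById::getIfPresent)
            .filter(Objects::nonNull)
            .forEach(pending -> pending.completeExceptionally(
                    new IllegalStateException("topic id not present in metadata response")));
}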
Kudos, SonarCloud Quality Gate passed! 0 Bugs, no coverage information.
Fixes: #2