Optimize kinesis ingestion task assignment after resharding #12235
Overall LGTM. Minor comments.
Approach looks good to me. Need some minor changes.
@@ -88,6 +91,11 @@
  private final AWSCredentialsConfig awsCredentialsConfig;
  private volatile Map<String, Long> currentPartitionTimeLag;

  // Maintain sets of currently closed shards to find "bad" (closed and empty) shards
  // Poll closed shards once and store the result to avoid redundant costly calls to kinesis
  private final Set<String> emptyClosedShardIds = new TreeSet<>();
Should these be in a thread-safe container, or should access to them be protected with a lock?
Thanks for pointing this out, @zachjsh.
This code would be executed by the SeekableStreamSupervisor while executing a RunNotice (scheduled when the status of a task changes) as well as a DynamicAllocationTasksNotice (scheduled for auto-scaling). There is a possibility of contention between these two executions.
We can make the part where the caches are updated synchronized.
Just changing these two caches to a Concurrent version might not be enough, as a whole new list of active shards is fetched in updateClosedShardCache() and the caches must be updated with this new state before any other action is performed.
Synchronizing the whole method updateClosedShardCache would actually be preferable because the state returned by two subsequent calls to recordSupplier.getShards() can be different.
So this call should happen inside the synchronized block, as should the calls to recordSupplier.isClosedShardEmpty().
I hope this doesn't cause bottlenecks though.
The whole method has been synchronized. Thanks!
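A minimal sketch of the synchronization agreed on in this thread. It assumes a hypothetical ShardSource interface standing in for the record supplier, and it assumes the second cache holds closed shards that were found to be non-empty; the class name, method name, and bodies are illustrative and are not the PR's actual code. The point is that the shard listing and the emptiness checks both run inside one synchronized method, so a RunNotice and a DynamicAllocationTasksNotice cannot interleave their updates.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the record supplier; not the Druid interface.
interface ShardSource
{
  Set<String> getClosedShardIds();            // one listing call per refresh
  boolean isClosedShardEmpty(String shardId); // potentially costly remote call
}

class ClosedShardCache
{
  private final ShardSource source;
  private final Set<String> emptyClosedShardIds = new TreeSet<>();
  private final Set<String> nonEmptyClosedShardIds = new TreeSet<>();

  ClosedShardCache(ShardSource source)
  {
    this.source = source;
  }

  // The whole method is synchronized: listing and cache mutation form one atomic step.
  synchronized Set<String> updateAndGetEmptyClosedShards()
  {
    Set<String> closed = source.getClosedShardIds();

    // Drop cache entries for shards that no longer appear in the listing.
    emptyClosedShardIds.retainAll(closed);
    nonEmptyClosedShardIds.retainAll(closed);

    for (String shardId : closed) {
      // Only shards not seen before trigger the costly emptiness check.
      if (emptyClosedShardIds.contains(shardId) || nonEmptyClosedShardIds.contains(shardId)) {
        continue;
      }
      if (source.isClosedShardEmpty(shardId)) {
        emptyClosedShardIds.add(shardId);
      } else {
        nonEmptyClosedShardIds.add(shardId);
      }
    }
    // Return a copy so callers never observe the set while it is being mutated.
    return Collections.unmodifiableSet(new HashSet<>(emptyClosedShardIds));
  }
}
```

Swapping the two sets for concurrent collections alone would not give the same guarantee, because the listing and the per-shard checks could still come from two different snapshots of the stream.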
@@ -2291,9 +2291,30 @@ protected boolean supportsPartitionExpiration()
    return false;
  }

  protected boolean shouldSkipIgnorablePartitions()
Should we not do something similar for Kafka? Why is this not an issue with Kafka?
We haven't encountered something similar in Kafka yet. But the API has been put in place so that if KafkaSupervisor needs to do something similar, it can override the methods shouldSkipIgnorablePartitions() and getIgnorablePartitionIds().
For Kinesis, the ignorable partitions translate to empty and closed shards, which is a concept specific to Kinesis.
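The override points described in this reply can be sketched roughly as follows. The two hook names come from this thread; the class names, the partitionsForAssignment helper, and the constructor arguments are hypothetical and heavily simplified compared with the real supervisors.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Simplified stand-in for the base supervisor; the real class has many more members.
abstract class BaseSupervisorSketch
{
  // Default: nothing is skipped, so supervisors that do not override keep their behavior.
  protected boolean shouldSkipIgnorablePartitions()
  {
    return false;
  }

  protected Set<String> getIgnorablePartitionIds()
  {
    return Collections.emptySet();
  }

  // Hypothetical helper: the partitions that should take part in task assignment.
  Set<String> partitionsForAssignment(Set<String> allPartitionIds)
  {
    Set<String> result = new LinkedHashSet<>(allPartitionIds);
    if (shouldSkipIgnorablePartitions()) {
      result.removeAll(getIgnorablePartitionIds());
    }
    return result;
  }
}

// Kinesis-style override: ignorable partitions are the closed and empty shards.
class KinesisLikeSupervisorSketch extends BaseSupervisorSketch
{
  private final boolean skipIgnorableShards;
  private final Set<String> emptyClosedShardIds;

  KinesisLikeSupervisorSketch(boolean skipIgnorableShards, Set<String> emptyClosedShardIds)
  {
    this.skipIgnorableShards = skipIgnorableShards;
    this.emptyClosedShardIds = emptyClosedShardIds;
  }

  @Override
  protected boolean shouldSkipIgnorablePartitions()
  {
    return skipIgnorableShards;
  }

  @Override
  protected Set<String> getIgnorablePartitionIds()
  {
    return emptyClosedShardIds;
  }
}
```

Because the base implementation returns false and an empty set, a supervisor that does not override the hooks sees no change in which partitions are assigned.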
Thanks for the changes, @AmatyaAvadhanula . LGTM.
   * @return set of shards ignorable by kinesis ingestion
   */
  @Override
  protected Set<String> getIgnorablePartitionIds()
nit - this should be called computeIgnorablePartitionIds() or loadIgnorablePartitionIds()
The current verb suggests a plain getter, but behind the scenes the method can make network calls to fetch the partition IDs.
done
Thanks for making changes. LGTM
This reverts commit 1ec57cb.
Description
KinesisRecordSupplier is used to get a list of all shards during Kinesis ingestion.
Additional methods have been added to determine which shards are closed and empty.
Repeated calls to Kinesis for shard records are avoided by maintaining an in-memory cache in the supervisor.
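One way such an emptiness check could work is sketched below. It relies on the documented Kinesis behavior that reading a closed shard to its end yields no next iterator. ShardReader, Page, and ClosedShardInspector are hypothetical names invented for this illustration; they are neither the AWS SDK nor the methods added to KinesisRecordSupplier in this PR.

```java
import java.util.List;

// Hypothetical, simplified view of a shard reader; not the AWS SDK or Druid API.
interface ShardReader
{
  // Iterator positioned at the oldest record still retained in the shard.
  String oldestIterator(String shardId);

  // One page of records plus the iterator for the next page
  // (null once the end of a closed shard has been reached).
  Page read(String iterator);
}

final class Page
{
  final List<String> records;
  final String nextIterator;

  Page(List<String> records, String nextIterator)
  {
    this.records = records;
    this.nextIterator = nextIterator;
  }
}

final class ClosedShardInspector
{
  private ClosedShardInspector() {}

  // Call only for shards already known to be closed: an open shard keeps
  // returning a next iterator, so this loop would not terminate for it.
  static boolean isClosedShardEmpty(ShardReader reader, String shardId)
  {
    String iterator = reader.oldestIterator(shardId);
    while (iterator != null) {
      Page page = reader.read(iterator);
      if (!page.records.isEmpty()) {
        return false; // at least one record is still readable
      }
      iterator = page.nextIterator;
    }
    return true; // reached the end of the shard without seeing a record
  }
}
```

Each call may need several round trips to the stream, which is why caching the result per shard in the supervisor is worthwhile.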
Proposed Design:
If skipIgnorableShards is set to true, the cache is utilized and updated. This also means that "ignorable" shards no longer participate in task allocation for ingestion or in autoscaler estimations.
Limitations:
Alternative design:
Update the metadata with the end offsets of closed and empty shards. This may be simpler to implement since it doesn't require a cache, but it would waste resources since a task would have to update the metadata.
Key changed/added classes in this PR
KinesisSupervisor