-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flink: Implement data statistics coordinator to aggregate data statistics from operator subtasks #7360
Flink: Implement data statistics coordinator to aggregate data statistics from operator subtasks #7360
Conversation
@hililiwei can you also help review? |
...v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregateDataStatistics.java
Outdated
Show resolved
Hide resolved
...v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregateDataStatistics.java
Outdated
Show resolved
Hide resolved
...v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregateDataStatistics.java
Outdated
Show resolved
Hide resolved
...v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregateDataStatistics.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
...nk/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java
Outdated
Show resolved
Hide resolved
...nk/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java
Outdated
Show resolved
Hide resolved
...nk/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java
Outdated
Show resolved
Hide resolved
.../v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java
Outdated
Show resolved
Hide resolved
...nk/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
...v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregateDataStatistics.java
Outdated
Show resolved
Hide resolved
...nk/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java
Outdated
Show resolved
Hide resolved
...nk/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java
Outdated
Show resolved
Hide resolved
...nk/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java
Outdated
Show resolved
Hide resolved
...k/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorProvider.java
Outdated
Show resolved
Hide resolved
...17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/GlobalStatisticsAggregator.java
Outdated
Show resolved
Hide resolved
...v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregateDataStatistics.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
...17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/GlobalStatisticsAggregator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
...nk/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
...nk/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
LOG.info( | ||
"Restoring data statistic coordinator {} from checkpoint {}.", operatorName, checkpointId); | ||
lastCompletedAggregator = | ||
InstantiationUtil.deserializeObject( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
later, we should follow up with a separate PR with a more stable serializer for checkpoint state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, I will have a separate PR for the serializer
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
...nk/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
LOG.info( | ||
"Restoring data statistic coordinator {} from checkpoint {}.", operatorName, checkpointId); | ||
lastCompletedAggregator = | ||
InstantiationUtil.deserializeObject( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, I will have a separate PR for the serializer
...nk/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
9595f7a
to
bb332cc
Compare
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
...17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/GlobalStatisticsAggregator.java
Outdated
Show resolved
Hide resolved
...k/src/main/java/org/apache/iceberg/flink/sink/shuffle/GlobalStatisticsAggregatorTracker.java
Outdated
Show resolved
Hide resolved
...k/src/main/java/org/apache/iceberg/flink/sink/shuffle/GlobalStatisticsAggregatorTracker.java
Outdated
Show resolved
Hide resolved
…tics from operator subtasks
…taStatistics to GlobalStatisticsAggregator
…d remove start check in coordinator close function
17f83b2
to
c1ff366
Compare
5690c28
to
fc36623
Compare
…pletion in class GlobalStatisticsAggregatorTracker
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Show resolved
Hide resolved
...k/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorProvider.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
.../v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java
Outdated
Show resolved
Hide resolved
.../v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java
Outdated
Show resolved
Hide resolved
...k/src/main/java/org/apache/iceberg/flink/sink/shuffle/GlobalStatisticsAggregatorTracker.java
Outdated
Show resolved
Hide resolved
...k/src/main/java/org/apache/iceberg/flink/sink/shuffle/GlobalStatisticsAggregatorTracker.java
Outdated
Show resolved
Hide resolved
...k/src/main/java/org/apache/iceberg/flink/sink/shuffle/GlobalStatisticsAggregatorTracker.java
Outdated
Show resolved
Hide resolved
…d operator name to logging
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Show resolved
Hide resolved
.../v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java
Outdated
Show resolved
Hide resolved
...flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinator.java
Show resolved
Hide resolved
...flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
new RowDataSerializer(RowType.of(new VarCharType()))); | ||
provider = | ||
new DataStatisticsCoordinatorProvider<>( | ||
"DataStatisticsCoordinatorProviderTest", OPERATOR_ID, statisticsSerializer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: DataStatisticsCoordinatorProviderTest
. maybe just use class name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will update
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant getClass().getSimpleName()
...c/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinatorProvider.java
Show resolved
Hide resolved
...7/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestGlobalStatisticsTracker.java
Outdated
Show resolved
Hide resolved
…r to track the processed subtasks
…cs and AggregatedStatisticsTracker
DataStatisticsEvent<MapDataStatistics, Map<RowData, Long>> | ||
checkpoint1Subtask0DataStatisticEvent = | ||
DataStatisticsEvent.create(1, checkpoint1Subtask0DataStatistic, statisticsSerializer); | ||
Assert.assertNull( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we convert everything to assertj? I believe the guideline for new unit test should be assertj.
https://iceberg.apache.org/contribute/#assertj
example for equals
Assertions.assertThat(UpdateRequirementParser.toJson(actual))
.as("AssertRefSnapshotId should convert to the correct JSON value")
.isEqualTo(expected);
...7/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java
Outdated
Show resolved
Hide resolved
...7/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java
Outdated
Show resolved
Hide resolved
...7/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java
Outdated
Show resolved
Hide resolved
...7/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
....17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java
Outdated
Show resolved
Hide resolved
...k/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorProvider.java
Show resolved
Hide resolved
new RowDataSerializer(RowType.of(new VarCharType()))); | ||
provider = | ||
new DataStatisticsCoordinatorProvider<>( | ||
"DataStatisticsCoordinatorProviderTest", OPERATOR_ID, statisticsSerializer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant getClass().getSimpleName()
...c/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinatorProvider.java
Outdated
Show resolved
Hide resolved
...17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
Outdated
Show resolved
Hide resolved
thanks @yegangy0718 for the contribution! |
…dinator to aggregate data statistics from operator subtasks
…dinator to aggregate data statistics from operator subtasks
…dinator to aggregate data statistics from operator subtasks
…dinator to aggregate data statistics from operator subtasks
…r to aggregate data statistics from operator subtasks (#8747) Co-authored-by: gang_ye <[email protected]>
…r to aggregate data statistics from operator subtasks (#8749) Co-authored-by: gang_ye <[email protected]>
This PR is created as part of issue #6303 and project https://github.com/apache/iceberg/projects/27
Motivation:
In this PR, we implement DataStatisticsCoordinator to receive DataStatisticsEvent from DataStatisticsOperator, aggregate those DataStatisticsEvent from operator subtasks via AggregateDataStatistics and then send the aggregation result back to operators.
Changes: