Commit 77ff8d7
Adjusting tests for 1.15 and 1.16. Removing a redundant one. Style was lost.

schongloo committed Oct 17, 2023
1 parent 26f1734 commit 77ff8d7
Showing 2 changed files with 94 additions and 94 deletions.
@@ -64,19 +64,19 @@ public class TestBucketPartitionerFlinkIcebergSink {

  @RegisterExtension
  private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
      new MiniClusterExtension(
          new MiniClusterResourceConfiguration.Builder()
              .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
              .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
              .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
              .build());

  @RegisterExtension
  private static final HadoopCatalogExtension catalogExtension =
      new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);

  private static final TypeInformation<Row> ROW_TYPE_INFO =
      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());

  // Parallelism = 8 (parallelism > numBuckets) throughout the test suite
  private final int parallelism = NUMBER_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER;
@@ -90,44 +90,44 @@ public class TestBucketPartitionerFlinkIcebergSink {
  private void setupEnvironment(TableSchemaType tableSchemaType) {
    PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets);
    table =
        catalogExtension
            .catalog()
            .createTable(
                TABLE_IDENTIFIER,
                SimpleDataUtil.SCHEMA,
                partitionSpec,
                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
    env =
        StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG)
            .enableCheckpointing(100)
            .setParallelism(parallelism)
            .setMaxParallelism(parallelism * 2);
    tableLoader = catalogExtension.tableLoader();
  }

  private void appendRowsToTable(List<RowData> allRows) throws Exception {
    DataFormatConverters.RowConverter converter =
        new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());

    DataStream<RowData> dataStream =
        env.addSource(
                new BoundedTestSource<>(
                    allRows.stream().map(converter::toExternal).toArray(Row[]::new)),
                ROW_TYPE_INFO)
            .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE))
            .partitionCustom(
                new BucketPartitioner(table.spec()),
                new BucketPartitionKeySelector(
                    table.spec(),
                    table.schema(),
                    FlinkSink.toFlinkRowType(table.schema(), SimpleDataUtil.FLINK_SCHEMA)));

    FlinkSink.forRowData(dataStream)
        .table(table)
        .tableLoader(tableLoader)
        .writeParallelism(parallelism)
        .distributionMode(DistributionMode.NONE)
        .append();

    env.execute("Test Iceberg DataStream");
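In a regular ingestion job the sink would usually be left to do its own shuffle rather than being fed a pre-partitioned stream. Purely as a hedged contrast, using the same builder calls that appear in the hunk above, that alternative would look roughly like this (DistributionMode.HASH is the sink-managed, keyed distribution in Iceberg's Flink API):

    // Alternative shown for comparison only (not part of this test): let FlinkSink
    // shuffle rows by partition key itself instead of an upstream partitionCustom step.
    FlinkSink.forRowData(dataStream)
        .table(table)
        .tableLoader(tableLoader)
        .writeParallelism(parallelism)
        .distributionMode(DistributionMode.HASH) // sink-managed shuffle by partition key
        .append();

The test deliberately sets DistributionMode.NONE so that the partitionCustom step with BucketPartitioner is the only shuffle in front of the writers, which is what the assertions are meant to exercise.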

@@ -136,8 +136,8 @@ private void appendRowsToTable(List<RowData> allRows) throws Exception {

  @ParameterizedTest
  @EnumSource(
      value = TableSchemaType.class,
      names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
  public void testSendRecordsToAllBucketsEvenly(TableSchemaType tableSchemaType) throws Exception {
    setupEnvironment(tableSchemaType);
    List<RowData> rows = generateTestDataRows();
@@ -174,7 +174,7 @@ private List<RowData> generateTestDataRows() {
  }

  private TableTestStats extractPartitionResults(TableSchemaType tableSchemaType)
      throws IOException {
    int totalRecordCount = 0;
    Map<Integer, List<Integer>> writersPerBucket = Maps.newHashMap(); // <BucketId, List<WriterId>>
    Map<Integer, Integer> filesPerBucket = Maps.newHashMap(); // <BucketId, NumFiles>
@@ -192,10 +192,10 @@ private TableTestStats extractPartitionResults(TableSchemaType tableSchemaType)

      totalRecordCount += recordCountInFile;
      int bucketId =
          scanTask
              .file()
              .partition()
              .get(tableSchemaType.bucketPartitionColumnPosition(), Integer.class);
      writersPerBucket.computeIfAbsent(bucketId, k -> Lists.newArrayList());
      writersPerBucket.get(bucketId).add(writerId);
      filesPerBucket.put(bucketId, filesPerBucket.getOrDefault(bucketId, 0) + 1);
@@ -214,10 +214,10 @@ private static class TableTestStats {
    final Map<Integer, Long> rowsPerWriter;

    TableTestStats(
        int totalRecordCount,
        Map<Integer, List<Integer>> writersPerBucket,
        Map<Integer, Integer> numFilesPerBucket,
        Map<Integer, Long> rowsPerWriter) {
      this.totalRowCount = totalRecordCount;
      this.writersPerBucket = writersPerBucket;
      this.numFilesPerBucket = numFilesPerBucket;
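Taken together, the hunks show what testSendRecordsToAllBucketsEvenly relies on: a BucketPartitionKeySelector that extracts the bucket value from each row and a BucketPartitioner that maps that bucket to a writer subtask, with parallelism (8) greater than the number of buckets. As a minimal, hypothetical sketch of that routing idea only — not Iceberg's actual BucketPartitioner, and assuming the key selector hands the partitioner the bucket id as an Integer — a Flink Partitioner could spread each bucket over its own block of subtasks like this:

    // Hypothetical sketch, not the Iceberg implementation: route a bucket id to a
    // writer subtask so that all subtasks receive work when parallelism > numBuckets.
    import org.apache.flink.api.common.functions.Partitioner;

    public class RoundRobinBucketPartitioner implements Partitioner<Integer> {

      private final int numBuckets; // bucket count from the table's partition spec
      private int nextOffset = 0;   // rotates among the writers assigned to one bucket

      public RoundRobinBucketPartitioner(int numBuckets) {
        this.numBuckets = numBuckets;
      }

      @Override
      public int partition(Integer bucketId, int numPartitions) {
        if (numPartitions <= numBuckets) {
          // Fewer writers than buckets: wrap buckets around the available subtasks.
          return bucketId % numPartitions;
        }
        // More writers than buckets: give each bucket its own block of subtasks and
        // cycle inside that block, so every writer eventually receives records.
        // (For brevity this ignores the remainder subtasks when numPartitions is not
        // a multiple of numBuckets.)
        int writersPerBucket = numPartitions / numBuckets;
        int offset = nextOffset++ % writersPerBucket;
        return bucketId * writersPerBucket + offset;
      }
    }

The per-bucket and per-writer counts that extractPartitionResults collects (writersPerBucket, filesPerBucket, rowsPerWriter) are exactly the quantities such a scheme needs to keep balanced, which is what the test asserts on.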
