From e8c9f3243bcc6374da794f395e5bfced50d914d8 Mon Sep 17 00:00:00 2001
From: zhen wang
Date: Thu, 19 Oct 2023 10:08:22 +0800
Subject: [PATCH 1/4] support read meta columns in staged scan

---
 .../spark/extensions/TestMetaColumns.java     | 104 ++++++++++++++++++
 .../iceberg/spark/source/SparkStagedScan.java |   6 +-
 .../spark/source/SparkStagedScanBuilder.java  |  53 ++++++++-
 3 files changed, 158 insertions(+), 5 deletions(-)
 create mode 100644 spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumns.java

diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumns.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumns.java
new file mode 100644
index 000000000000..72845eb7bf05
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumns.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestMetaColumns extends SparkExtensionsTestBase {
+
+  public TestMetaColumns(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        SparkCatalogConfig.HADOOP.catalogName(),
+        SparkCatalogConfig.HADOOP.implementation(),
+        SparkCatalogConfig.HADOOP.properties()
+      }
+    };
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testReadStageTableMeta() throws Exception {
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "c"),
+            new SimpleRecord(4, "d"));
+
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    table.refresh();
+    String tableLocation = table.location();
+
+    Dataset<Row> scanDF2 =
+        spark
+            .read()
+            .format("iceberg")
+            .option(SparkReadOptions.FILE_OPEN_COST, "0")
+            .load(tableLocation);
+
+    Assert.assertEquals("base case", 2, scanDF2.columns().length);
+
+    Dataset<Row> scanDF =
+        spark
+            .read()
+            .format("iceberg")
+            .option(SparkReadOptions.FILE_OPEN_COST, "0")
+            .load(tableLocation)
+            .select("*", "_pos");
+
+    List<Row> rows = scanDF.collectAsList();
+    Assert.assertEquals("should have 4 records", 4, rows.size());
+    Assert.assertEquals("meta columns should be included", 3, scanDF.columns().length);
+  }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
index 0290bf7e84ce..3986760bb4cf 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
@@ -22,6 +22,7 @@
 import java.util.Objects;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -39,9 +40,8 @@ class SparkStagedScan extends SparkScan {
 
   private List<ScanTaskGroup<ScanTask>> taskGroups = null; // lazy cache of tasks
 
-  SparkStagedScan(SparkSession spark, Table table, SparkReadConf readConf) {
-    super(spark, table, readConf, table.schema(), ImmutableList.of(), null);
-
+  SparkStagedScan(SparkSession spark, Table table, Schema expectedSchema, SparkReadConf readConf) {
+    super(spark, table, readConf, expectedSchema, ImmutableList.of(), null);
     this.taskSetId = readConf.scanTaskSetId();
     this.splitSize = readConf.splitSize();
     this.splitLookback = readConf.splitLookback();
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
index 37bbea42e5b1..6d8edc4b6ae1 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
@@ -18,27 +18,76 @@
  */
 package org.apache.iceberg.spark.source;
 
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-class SparkStagedScanBuilder implements ScanBuilder {
+class SparkStagedScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns {
 
   private final SparkSession spark;
   private final Table table;
   private final SparkReadConf readConf;
+  private final List<String> metaColumns = Lists.newArrayList();
+
+  private Schema schema = null;
+
   SparkStagedScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
     this.spark = spark;
     this.table = table;
     this.readConf = new SparkReadConf(spark, table, options);
+    this.schema = table.schema();
   }
 
   @Override
   public Scan build() {
-    return new SparkStagedScan(spark, table, readConf);
+    return new SparkStagedScan(spark, table, schemaWithMetadataColumns(), readConf);
+  }
+
+  @Override
+  public void pruneColumns(StructType requestedSchema) {
+    StructType requestedProjection =
+        new StructType(
+            Stream.of(requestedSchema.fields())
+                .filter(field -> MetadataColumns.nonMetadataColumn(field.name()))
+                .toArray(StructField[]::new));
+
+    // the projection should include all columns that will be returned, including those only used in
+    // filters
+    this.schema = SparkSchemaUtil.prune(schema, requestedProjection);
+
+    Stream.of(requestedSchema.fields())
+        .map(StructField::name)
+        .filter(MetadataColumns::isMetadataColumn)
+        .distinct()
+        .forEach(metaColumns::add);
+  }
+
+  private Schema schemaWithMetadataColumns() {
+    // metadata columns
+    List<Types.NestedField> fields =
+        metaColumns.stream()
+            .distinct()
+            .map(name -> MetadataColumns.metadataColumn(table, name))
+            .collect(Collectors.toList());
+    Schema meta = new Schema(fields);
+
+    // schema or rows returned by readers
+    return TypeUtil.join(schema, meta);
   }
 }

From 7f3e3f6d9e3c082a4bb2708c452af0ceff640cd1 Mon Sep 17 00:00:00 2001
From: zhen wang
Date: Mon, 23 Oct 2023 15:17:38 +0800
Subject: [PATCH 2/4] fix tests

---
 ...estMetaColumnProjectionWithStageScan.java} | 61 +++++++++++++------
 1 file changed, 41 insertions(+), 20 deletions(-)
 rename spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/{TestMetaColumns.java => TestMetaColumnProjectionWithStageScan.java} (59%)

diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumns.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java
similarity index 59%
rename from spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumns.java
rename to spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java
index 72845eb7bf05..b275003c25a8 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumns.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java
@@ -20,8 +20,12 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.ScanTaskSetManager;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.SparkReadOptions;
@@ -29,14 +33,15 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runners.Parameterized;
 
-public class TestMetaColumns extends SparkExtensionsTestBase {
+public class TestMetaColumnProjectionWithStageScan extends SparkExtensionsTestBase {
 
-  public TestMetaColumns(String catalogName, String implementation, Map<String, String> config) {
+  public TestMetaColumnProjectionWithStageScan(
+      String catalogName, String implementation, Map<String, String> config) {
     super(catalogName, implementation, config);
   }
 
   @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
@@ -56,6 +61,12 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
+  private void stageTask(
+      Table tab, String fileSetID, CloseableIterable<ScanTask> tasks) {
+    ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
+    taskSetManager.stageTasks(tab, fileSetID, Lists.newArrayList(tasks));
+  }
+
   @Test
   public void testReadStageTableMeta() throws Exception {
     sql(
@@ -80,25 +91,35 @@ public void testReadStageTableMeta() throws Exception {
     table.refresh();
     String tableLocation = table.location();
 
-    Dataset<Row> scanDF2 =
-        spark
-            .read()
-            .format("iceberg")
-            .option(SparkReadOptions.FILE_OPEN_COST, "0")
-            .load(tableLocation);
+    try (CloseableIterable<ScanTask> tasks = table.newBatchScan().planFiles()) {
+      String fileSetID = UUID.randomUUID().toString();
+      stageTask(table, fileSetID, tasks);
+      Dataset<Row> scanDF2 =
+          spark
+              .read()
+              .format("iceberg")
+              .option(SparkReadOptions.FILE_OPEN_COST, "0")
+              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
+              .load(tableLocation);
 
-    Assert.assertEquals("base case", 2, scanDF2.columns().length);
+      Assertions.assertThat(scanDF2.columns().length).isEqualTo(2);
+    }
 
-    Dataset<Row> scanDF =
-        spark
-            .read()
-            .format("iceberg")
-            .option(SparkReadOptions.FILE_OPEN_COST, "0")
-            .load(tableLocation)
-            .select("*", "_pos");
+    try (CloseableIterable<ScanTask> tasks = table.newBatchScan().planFiles()) {
+      String fileSetID = UUID.randomUUID().toString();
+      stageTask(table, fileSetID, tasks);
+      Dataset<Row> scanDF =
+          spark
+              .read()
+              .format("iceberg")
+              .option(SparkReadOptions.FILE_OPEN_COST, "0")
+              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
+              .load(tableLocation)
+              .select("*", "_pos");
 
-    List<Row> rows = scanDF.collectAsList();
-    Assert.assertEquals("should have 4 records", 4, rows.size());
-    Assert.assertEquals("meta columns should be included", 3, scanDF.columns().length);
+      List<Row> rows = scanDF.collectAsList();
+      Assertions.assertThat(rows.size()).isEqualTo(4);
+      Assertions.assertThat(scanDF.columns().length).isEqualTo(3);
+    }
   }
 }

From 7bbd58ad91ad41ee3c047f9563b755cbaa48e149 Mon Sep 17 00:00:00 2001
From: zhen wang
Date: Thu, 16 Nov 2023 10:17:35 +0800
Subject: [PATCH 3/4] address review comments

---
 ...TestMetaColumnProjectionWithStageScan.java |  6 ++++--
 .../iceberg/spark/source/SparkStagedScan.java |  4 +++-
 .../spark/source/SparkStagedScanBuilder.java  | 19 +++++++++----------
 3 files changed, 16 insertions(+), 13 deletions(-)

diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java
index b275003c25a8..e9013848cf11 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java
@@ -24,6 +24,7 @@
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.ScanTaskSetManager;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.SparkReadOptions;
@@ -118,8 +119,9 @@ public void testReadStageTableMeta() throws Exception {
               .select("*", "_pos");
 
       List<Row> rows = scanDF.collectAsList();
-      Assertions.assertThat(rows.size()).isEqualTo(4);
-      Assertions.assertThat(scanDF.columns().length).isEqualTo(3);
+      ImmutableList<Object[]> expectedRows =
+          ImmutableList.of(row(1L, "a", 0L), row(2L, "b", 1L), row(3L, "c", 2L), row(4L, "d", 3L));
+      assertEquals("result should match", expectedRows, rowsToJava(rows));
     }
   }
 }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
index 3986760bb4cf..49c8bb5bc32f 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
@@ -77,6 +77,7 @@ public boolean equals(Object other) {
     SparkStagedScan that = (SparkStagedScan) other;
     return table().name().equals(that.table().name())
         && Objects.equals(taskSetId, that.taskSetId)
+        && readSchema().equals(that.readSchema())
         && Objects.equals(splitSize, that.splitSize)
         && Objects.equals(splitLookback, that.splitLookback)
         && Objects.equals(openFileCost, that.openFileCost);
@@ -84,7 +85,8 @@ public int hashCode() {
-    return Objects.hash(table().name(), taskSetId, splitSize, splitSize, openFileCost);
+    return Objects.hash(
+        table().name(), readSchema(), taskSetId, splitSize, splitSize, openFileCost);
   }
 
   @Override
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
index 6d8edc4b6ae1..25393888f95c 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
@@ -42,7 +42,6 @@ class SparkStagedScanBuilder implements ScanBuilder, SupportsPushDownRequiredCol
 
   private final SparkSession spark;
   private final Table table;
   private final SparkReadConf readConf;
-  private final List<String> metaColumns = Lists.newArrayList();
 
   private Schema schema = null;
 
@@ -61,14 +60,7 @@ public Scan build() {
 
   @Override
   public void pruneColumns(StructType requestedSchema) {
-    StructType requestedProjection =
-        new StructType(
-            Stream.of(requestedSchema.fields())
-                .filter(field -> MetadataColumns.nonMetadataColumn(field.name()))
-                .toArray(StructField[]::new));
-
-    // the projection should include all columns that will be returned, including those only used in
-    // filters
+    StructType requestedProjection = removeMetaColumns(requestedSchema);
     this.schema = SparkSchemaUtil.prune(schema, requestedProjection);
 
     Stream.of(requestedSchema.fields())
@@ -78,6 +70,13 @@ public void pruneColumns(StructType requestedSchema) {
         .forEach(metaColumns::add);
   }
 
+  private StructType removeMetaColumns(StructType structType) {
+    return new StructType(
+        Stream.of(structType.fields())
+            .filter(field -> MetadataColumns.nonMetadataColumn(field.name()))
+            .toArray(StructField[]::new));
+  }
+
   private Schema schemaWithMetadataColumns() {
     // metadata columns
     List<Types.NestedField> fields =
         metaColumns.stream()
             .distinct()
             .map(name -> MetadataColumns.metadataColumn(table, name))
             .collect(Collectors.toList());
     Schema meta = new Schema(fields);
 
-    // schema or rows returned by readers
+    // schema of rows returned by readers
     return TypeUtil.join(schema, meta);
   }
 }

From ea3eba8a4b83e955e00112be02b6ab83c4549f9d Mon Sep 17 00:00:00 2001
From: zhen wang
Date: Thu, 16 Nov 2023 10:27:04 +0800
Subject: [PATCH 4/4] address review comments

---
 .../java/org/apache/iceberg/spark/source/SparkStagedScan.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
index 49c8bb5bc32f..fd299ade7fdc 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
@@ -86,7 +86,7 @@ && readSchema().equals(that.readSchema())
   @Override
   public int hashCode() {
     return Objects.hash(
-        table().name(), readSchema(), taskSetId, splitSize, splitSize, openFileCost);
+        table().name(), taskSetId, readSchema(), splitSize, splitSize, openFileCost);
   }
 
   @Override