Spark: Remove extra columns for ColumnBatch #11551
base: main
@@ -21,11 +21,13 @@
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
import static org.apache.iceberg.spark.source.SparkSQLExecutionHelper.lastExecutedMetricValue;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.File;
import java.io.IOException;
import java.time.LocalDate;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
@@ -86,6 +88,7 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -95,7 +98,6 @@
@ExtendWith(ParameterizedTestExtension.class)
public class TestSparkReaderDeletes extends DeleteReadTests {

  private static TestHiveMetastore metastore = null;
  protected static SparkSession spark = null;
  protected static HiveCatalog catalog = null;
@@ -622,6 +624,51 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOException
    assertThat(rowSet(tblName, tbl, "*")).hasSize(193);
  }

  @TestTemplate
  public void testEqualityDeleteWithDifferentScanAndDeleteColumns() throws IOException {
PR author: This test is expected to pass even without the fix provided by this PR. Currently, the extra columns returned to Spark do not cause any problems. However, with Comet native execution, since Comet allocates arrays in a pre-allocated list and relies on the requested schema to determine the number of columns in the batch, this test would fail without the fix proposed in this PR.

Reviewer: Is it possible to check the intermediate results? For example, checking the ColumnarBatch returned by the reader.

PR author: I have changed the test to check the number of columns in the ColumnarBatch.
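For context, a minimal hypothetical sketch (not Comet's actual code) of why extra columns matter to a consumer that sizes its buffers from the requested schema: if the reader hands back more column vectors than the schema asked for, a pre-allocated slot array is too small and indexing past it fails.

import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

class RequestedSchemaConsumer {
  // one pre-allocated slot per requested column
  private final ColumnVector[] slots;

  RequestedSchemaConsumer(StructType requestedSchema) {
    this.slots = new ColumnVector[requestedSchema.fields().length];
  }

  void accept(ColumnarBatch batch) {
    // throws ArrayIndexOutOfBoundsException when the batch carries columns
    // that were not part of the requested schema
    for (int i = 0; i < batch.numCols(); i++) {
      slots[i] = batch.column(i);
    }
  }
}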
    assumeThat(format).isEqualTo(FileFormat.PARQUET);
    initDateTable();

    Schema deleteRowSchema = dateTable.schema().select("dt");
    Record dataDelete = GenericRecord.create(deleteRowSchema);
    List<Record> dataDeletes =
        Lists.newArrayList(
            dataDelete.copy("dt", LocalDate.parse("2021-09-01")),
            dataDelete.copy("dt", LocalDate.parse("2021-09-02")),
            dataDelete.copy("dt", LocalDate.parse("2021-09-03")));

    DeleteFile eqDeletes =
        FileHelpers.writeDeleteFile(
            dateTable,
            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
            TestHelpers.Row.of(0),
            dataDeletes.subList(0, 3),
            deleteRowSchema);

    dateTable.newRowDelta().addDeletes(eqDeletes).commit();

    CloseableIterable<CombinedScanTask> tasks =
        TableScanUtil.planTasks(
            dateTable.newScan().planFiles(),
            TableProperties.METADATA_SPLIT_SIZE_DEFAULT,
            TableProperties.SPLIT_LOOKBACK_DEFAULT,
            TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);

    for (CombinedScanTask task : tasks) {
      try (BatchDataReader reader =
          new BatchDataReader(
              // expected column is id, while the equality filter column is dt
              dateTable, task, dateTable.schema(), dateTable.schema().select("id"), false, 7)) {
        while (reader.next()) {
          ColumnarBatch columnarBatch = reader.get();
          int numOfCols = columnarBatch.numCols();
          assertThat(numOfCols).as("Number of columns").isEqualTo(1);
Comment on lines +664 to +665

Reviewer: Minor: also check the column type to make sure the expected column is returned.

PR author: Added. Thanks!
          assertThat(columnarBatch.column(0).dataType()).as("Column type").isEqualTo(IntegerType);
        }
      }
    }
  }
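Stepping back from the test, a hedged sketch of the general idea the PR title describes (a hypothetical helper, not the actual patch): when extra columns are read only to evaluate equality deletes, the batch handed back to Spark can be pruned down to the expected schema before it is returned.

import java.util.Arrays;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

final class BatchPruning {
  // assumes the expected (requested) columns come first in `vectors`;
  // columns read only for applying deletes are dropped before the batch
  // is handed to Spark
  static ColumnarBatch pruneToExpectedSchema(
      ColumnVector[] vectors, int expectedColumnCount, int numRows) {
    ColumnVector[] kept = Arrays.copyOf(vectors, expectedColumnCount);
    return new ColumnarBatch(kept, numRows);
  }
}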

  private static final Schema PROJECTION_SCHEMA =
      new Schema(
          required(1, "id", Types.IntegerType.get()),
Reviewer: It would be helpful to keep the following comments as part of the JavaDoc for the method to provide clarity and context.

PR author: Sorry, missed this part. Added.
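A possible shape for that JavaDoc, based on the inline comment in the test (an illustration only; the wording actually merged may differ):

  /**
   * Verifies that when the scanned column (id) differs from the equality-delete column (dt),
   * the ColumnarBatch returned to Spark contains only the requested column, not the extra
   * column that was read solely to apply the equality deletes.
   */
  @TestTemplate
  public void testEqualityDeleteWithDifferentScanAndDeleteColumns() throws IOException {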