
Spark: Remove extra columns for ColumnBatch #11551

Open
wants to merge 4 commits into base: main

Conversation

huaxingao
Contributor

For Equality Delete, we build a ColumnarBatchReader that includes the equality delete filter columns so we can read their values and determine which rows are deleted. If these filter columns are not among the requested columns, they are extra and should be removed before the ColumnarBatch is returned to Spark.

Suppose the table schema includes C1, C2, C3, C4, C5, the query is SELECT C5 FROM table, and the equality delete filter is on C3 and C4.

We read the values of C3 and C4 to identify which rows are deleted, but we do not want to include those values in the ColumnarBatch that we return to Spark.
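To make the removal step concrete, here is a minimal Java sketch of trimming a ColumnarBatch down to the requested columns. The class and method names are illustrative only (they are not the PR's actual code), and it assumes, as discussed later in this thread, that the delete-filter columns are appended at the end of the vector array:

import java.util.Arrays;

import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

class ExtraColumnTrimmer {
  // Keep only the first expectedColumnCount vectors; any delete-filter columns
  // appended after them are dropped before the batch is handed to Spark.
  static ColumnarBatch trim(ColumnVector[] vectors, int numRows, int expectedColumnCount) {
    if (vectors.length <= expectedColumnCount) {
      return new ColumnarBatch(vectors, numRows); // nothing extra to remove
    }
    ColumnVector[] kept = Arrays.copyOf(vectors, expectedColumnCount);
    return new ColumnarBatch(kept, numRows);
  }
}

In the C1..C5 example above, vectors would hold C5, C3, C4 and expectedColumnCount would be 1, so only C5 is returned to Spark.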

@@ -622,6 +624,41 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio
assertThat(rowSet(tblName, tbl, "*")).hasSize(193);
}

@TestTemplate
public void testEqualityDeleteWithDifferentScanAndDeleteColumns() throws IOException {
Contributor Author

This test passes even without the fix in this PR, because the extra columns returned to Spark currently do not cause any problems. However, with Comet native execution, Comet allocates arrays in a pre-allocated list and relies on the requested schema to determine the number of columns in the batch, so this test would fail without the fix proposed in this PR.

Contributor

Is it possible to check the intermediate results instead, for example the ColumnarBatch returned to Spark? That way we could avoid taking Comet as a dependency for the test.

Contributor Author

I have changed the test to check the number of columns in the ColumnarBatch.

Comment on lines 56 to 57
// is 2. Since when creating the DeleteFilter, we append these extra columns in the end of the
// requested schema, we can just remove them from the end of the ColumnVector.
Contributor

@singhpk234 singhpk234 Nov 14, 2024

[doubt] Would it be possible to fix this at the place where these extra columns are appended to the end of the requested schema? That would probably help us avoid the extra memory in the first place, as well as the expensive copy of the ColumnarBatch.

Contributor Author

Thanks for your comment!

The extra columns are appended to the requested schema in DeleteFilter.fileProjection. The values of these extra columns are read in ColumnarBatchReader and used to identify which rows are deleted in applyEqDelete. I remove the extra columns right after calling applyEqDelete.

Contributor

@singhpk234 singhpk234 Nov 18, 2024

Thank you for the response!

[doubt] Considering that applyEqDelete already does another projection on top of the schema returned from DeleteFilter.fileProjection:

Schema deleteSchema = TypeUtil.select(requiredSchema, ids);

could we instead add another parameter to fileProjection, like we did here, to include the additional fields based on a boolean flag? That way we would request only the columns we actually need in the first place, and avoid removing extra columns after filter evaluation.
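For context, here is a small self-contained illustration of the TypeUtil.select call quoted above. The schema and field ids mirror the C1..C5 example from the description and are assumptions made for illustration; this is not code from the PR:

import java.util.Set;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class DeleteSchemaExample {
  public static void main(String[] args) {
    // Required schema after fileProjection: the requested column C5 plus the
    // equality delete columns C3 and C4 appended at the end.
    Schema requiredSchema =
        new Schema(
            Types.NestedField.required(5, "C5", Types.IntegerType.get()),
            Types.NestedField.required(3, "C3", Types.IntegerType.get()),
            Types.NestedField.required(4, "C4", Types.IntegerType.get()));

    // Field ids of the equality delete columns.
    Set<Integer> ids = Set.of(3, 4);

    // Projects only the columns needed to evaluate the equality delete.
    Schema deleteSchema = TypeUtil.select(requiredSchema, ids);
    System.out.println(deleteSchema);
  }
}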

Contributor Author

Thanks for taking a look!
We traverse the schema and build a VectorizedReader for each column in VectorizedReaderBuilder, and this happens before DeleteFilter.fileProjection.

@huaxingao
Contributor Author

cc @flyrain @szehon-ho @viirya

Contributor

@flyrain flyrain left a comment

Thanks for working on it, @huaxingao! Left some comments.

@@ -245,5 +259,16 @@ void applyEqDelete(ColumnarBatch columnarBatch) {

columnarBatch.setNumRows(currentRowId);
}

ColumnarBatch removeExtraColumnsFromColumnarBatch(
Contributor

Minor suggestion: Simplify to removeExtraColumns?

Contributor Author

Done. Thanks


@@ -125,4 +126,25 @@ protected VectorizedReader<?> vectorizedReader(List<VectorizedReader<?>> reorder
return reader;
}
}

private static int numOfExtraColumns(DeleteFilter deleteFilter) {
Contributor

minor: DeleteFilter -> DeleteFilter<InternalRow>

Contributor Author

Fixed. Thanks

Comment on lines 133 to 141
// For Equality Delete, the requiredColumns and expectedColumns may not be the
// same. For example, suppose the table schema is C1, C2, C3, C4, C5, the query is
// SELECT C5 FROM table, and the equality delete filter is on C3, C4. Then
// the requestedSchema is C5, and the required schema is C5, C3 and C4. The
// vectorized reader also needs to read the C3 and C4 columns to figure out
// which rows are deleted. However, after figuring out the deleted rows, the
// extra column values do not need to be returned to Spark.
// We compute numOfExtraColumns so we can remove these extra columns
// from the ColumnarBatch later.
Contributor

Can we move the comment to the method's Javadoc?

Contributor Author

Moved. Thanks

// from ColumnBatch later.
List<Types.NestedField> requiredColumns = deleteFilter.requiredSchema().columns();
List<Types.NestedField> expectedColumns = deleteFilter.requestedSchema().columns();
return requiredColumns.size() - expectedColumns.size();
Contributor

Are we sure the extra columns are consistently appended to the end of the array?

Contributor Author

Yes, because the extra columns are appended to the end of requestedSchema in DeleteFilter.fileProjection.
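For readers of the thread, a hedged sketch of that ordering guarantee (illustrative only, not the actual DeleteFilter.fileProjection code): the requested columns come first and the columns needed only for delete evaluation are appended after them, which is why trimming from the end of the batch recovers the requested projection.

import java.util.ArrayList;
import java.util.List;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

class FileProjectionSketch {
  // Conceptual only: append the columns that are missing from the requested
  // schema (e.g. equality delete columns) after the requested columns, so the
  // extra columns always occupy the tail of the resulting schema and batch.
  static Schema appendMissingColumns(Schema requested, List<Types.NestedField> missing) {
    List<Types.NestedField> columns = new ArrayList<>(requested.columns());
    columns.addAll(missing);
    return new Schema(columns);
  }
}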

@@ -73,6 +74,7 @@ protected DeleteFilter(
boolean needRowPosCol) {
this.filePath = filePath;
this.counter = counter;
this.requestedSchema = requestedSchema;
Contributor

Can we be consistent with the name? requestedSchema or expectedSchema? I guess expectedSchema is more commonly used.

Contributor Author

Changed to expectedSchema


public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
public ColumnarBatchReader(List<VectorizedReader<?>> readers, int numExtraCol) {
Contributor

Can we use the DeleteFilter (the deletes field) within the class, so that no extra parameter is required? We could move the numOfExtraColumns method into this class in that case.

Contributor Author

Changed. Thanks

Contributor

@flyrain flyrain left a comment

Thanks, @huaxingao, for the update! The changes look great overall. I've left a minor refactoring suggestion for consideration. 😊

Comment on lines +663 to +664
int numOfCols = columnarBatch.numCols();
assertThat(numOfCols).as("Number of columns").isEqualTo(1);
Contributor

Minor: also check the column type to make sure dt is removed, like the following?

          // only the expected column(id) is kept
          assertThat(columnarBatch.numCols()).as("Number of columns").isEqualTo(1);
          assertThat(columnarBatch.column(0).dataType()).as("Column type").isEqualTo(IntegerType);

Contributor Author

Added. Thanks!

Comment on lines 275 to 286
int numOfExtraColumns = numOfExtraColumns(deletes);
if (numOfExtraColumns > 0) {
int newLength = arrowColumnVectors.length - numOfExtraColumns;
// In DeleteFilter.fileProjection, the columns for missingIds (the columns required
// for equality delete or ROW_POSITION) are appended to the end of the expectedSchema.
// Therefore, these extra columns can be removed from the end of arrowColumnVectors.
ColumnVector[] newColumns = Arrays.copyOf(arrowColumnVectors, newLength);
return new ColumnarBatch(newColumns, columnarBatch.numRows());
} else {
return columnarBatch;
}
}
Contributor

How about refactoring it like this, so that the numOfExtraColumns method is not needed? Can we also move all related comments to this method's Javadoc?

      int expectedColumnSize = deletes.expectedSchema().columns().size();
      if (arrowColumnVectors.length > expectedColumnSize) {
        ColumnVector[] newColumns = Arrays.copyOf(arrowColumnVectors, expectedColumnSize);
        return new ColumnarBatch(newColumns, columnarBatch.numRows());
      } else {
        return columnarBatch;
      }

Contributor Author

Good suggestion! Changed.
