Support spark.sql.parquet.datetimeRebaseModeInRead=LEGACY and spark.sql.parquet.int96RebaseModeInRead=LEGACY [databricks] #9649

Merged
62 commits merged on Nov 15, 2023
Changes from 57 commits
Commits
62 commits
e368aa6
Add check for nested types
ttnghia Aug 28, 2023
7da416b
Recursively check for rebasing
ttnghia Nov 2, 2023
df8f861
Extract common code
ttnghia Nov 2, 2023
95d19ee
Allow nested type in rebase check
ttnghia Nov 2, 2023
b426610
Enable nested timestamp in roundtrip test
ttnghia Nov 2, 2023
7343b17
Fix another test
ttnghia Nov 2, 2023
0d48f57
Merge branch 'check_rebase_nested' into rebase_datatime
ttnghia Nov 2, 2023
024e6c9
Enable `LEGACY` rebase in read
ttnghia Nov 2, 2023
9a39628
Remove comment
ttnghia Nov 2, 2023
e686bb0
Change function/class signatures
ttnghia Nov 2, 2023
b49963e
Merge branch 'branch-23.12' into rebase_datatime
ttnghia Nov 3, 2023
2c232f8
Complete modification
ttnghia Nov 3, 2023
ac0f3e4
Misc
ttnghia Nov 3, 2023
c773794
Add explicit type
ttnghia Nov 3, 2023
29df7cd
Rename file and add some stuff in DateTimeRebaseHelpers.scala
ttnghia Nov 3, 2023
1b5112d
Move file and rename class
ttnghia Nov 4, 2023
63342a9
Adopt new enum type
ttnghia Nov 4, 2023
6b2d795
Add name for the enum classes
ttnghia Nov 4, 2023
37aa40b
Change exception messages
ttnghia Nov 4, 2023
d4cdc1b
Merge branch 'branch-23.12' into refactor_parquet_scan
ttnghia Nov 4, 2023
03f681e
Does not yet support legacy rebase in read
ttnghia Nov 5, 2023
14f230f
Change legacy to corrected mode
ttnghia Nov 5, 2023
1b464ec
Extract common code
ttnghia Nov 5, 2023
0d26d97
Rename functions
ttnghia Nov 5, 2023
c2504fd
Reformat
ttnghia Nov 5, 2023
edb6c81
Make classes serializable
ttnghia Nov 5, 2023
ea86e8f
Revert "Support rebase checking for nested dates and timestamps (#9617)"
ttnghia Nov 6, 2023
b14463f
Merge branch 'refactor_parquet_scan' into rebase_datatime
ttnghia Nov 6, 2023
adc8ae2
Implement date time rebase
ttnghia Nov 6, 2023
791573c
Optimize rebase op
ttnghia Nov 6, 2023
54e959f
Merge branch 'branch-23.12' into refactor_parquet_scan
ttnghia Nov 6, 2023
3f01690
Change comment
ttnghia Nov 6, 2023
6d9c20b
Merge branch 'refactor_parquet_scan' into rebase_datatime
ttnghia Nov 6, 2023
8c63273
Move tests
ttnghia Nov 6, 2023
1b1fdc3
Add test for datatime rebase
ttnghia Nov 6, 2023
e6559ce
Various changes
ttnghia Nov 6, 2023
74fe84a
Various changes
ttnghia Nov 6, 2023
a455a90
Fix compile errors
ttnghia Nov 6, 2023
b87493c
Fix comments
ttnghia Nov 6, 2023
321e516
Fix indentations
ttnghia Nov 6, 2023
4bc33be
Merge branch 'refactor_parquet_scan' into rebase_datatime
ttnghia Nov 6, 2023
4aab36b
Change comments and indentations
ttnghia Nov 6, 2023
c615925
Merge branch 'branch-23.12' into rebase_datatime
ttnghia Nov 7, 2023
be92368
Write different timestamp types in test
ttnghia Nov 7, 2023
b09c61f
Fix conversion if timestamp is not micros
ttnghia Nov 7, 2023
00d96e4
Rename var
ttnghia Nov 7, 2023
7d81311
Dont have to down cast after up cast
ttnghia Nov 7, 2023
116bf3e
Change comment
ttnghia Nov 7, 2023
273b2c4
Still cast timestamp to the old type after rebasing
ttnghia Nov 7, 2023
996d9d4
Rename test
ttnghia Nov 7, 2023
5fd6ef5
Should not transform non-datetime types
ttnghia Nov 7, 2023
4144655
Fix test
ttnghia Nov 7, 2023
5a8b44c
Update tests
ttnghia Nov 7, 2023
247f47f
Change tests
ttnghia Nov 7, 2023
fbe37d7
Merge branch 'branch-23.12' into rebase_datatime
ttnghia Nov 7, 2023
4a92d54
Change configs
ttnghia Nov 8, 2023
238d7dc
Merge branch 'branch-23.12' into rebase_datatime
ttnghia Nov 10, 2023
29367d4
Merge branch 'branch-23.12' into rebase_datatime
ttnghia Nov 14, 2023
bd9ee93
Add comment for `TimestampNTZType`
ttnghia Nov 14, 2023
8cf4820
Test with mixed rebase configs
ttnghia Nov 14, 2023
74bef2e
Handle different rebase configs
ttnghia Nov 14, 2023
8930fbd
Merge branch 'branch-23.12' into rebase_datatime
ttnghia Nov 15, 2023
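Before the file changes, a note on usage: the two read-side settings named in the PR title are ordinary Spark SQL configs. The sketch below shows how a user might enable them in a PySpark session once this support is in place; the plugin setting and input path are illustrative assumptions, not something specified in this PR.

```python
from pyspark.sql import SparkSession

# Illustrative session setup; the spark.plugins value assumes a typical
# RAPIDS Accelerator deployment and is not prescribed by this PR.
spark = (SparkSession.builder
         .appName("legacy-rebase-read")
         .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
         .config("spark.sql.parquet.datetimeRebaseModeInRead", "LEGACY")
         .config("spark.sql.parquet.int96RebaseModeInRead", "LEGACY")
         .getOrCreate())

# Parquet files written with the hybrid Julian calendar (Spark 2.x, or Spark 3.x
# with the *RebaseModeInWrite configs set to LEGACY) are rebased to the
# proleptic Gregorian calendar on read.
df = spark.read.parquet("/data/legacy_written_parquet")  # hypothetical path
df.show()
```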
96 changes: 30 additions & 66 deletions integration_tests/src/main/python/parquet_test.py
@@ -310,11 +310,11 @@ def test_parquet_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, v1
lambda spark: rf(spark).select(f.col('a') >= s0),
conf=all_confs)

parquet_ts_write_options = ['INT96', 'TIMESTAMP_MICROS', 'TIMESTAMP_MILLIS']

# Once https://github.com/NVIDIA/spark-rapids/issues/1126 is fixed delete this test and merge it
# into test_ts_read_round_trip nested timestamps and dates are not supported right now.
# into test_parquet_read_roundtrip_datetime
@pytest.mark.parametrize('gen', [ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))),
ArrayGen(ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))))], ids=idfn)
@pytest.mark.parametrize('ts_write', parquet_ts_write_options)
@@ -334,50 +334,35 @@ def test_parquet_ts_read_round_trip_nested(gen, spark_tmp_path, ts_write, ts_reb
lambda spark : spark.read.parquet(data_path),
conf=all_confs)

# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with
# timestamp_gen
@pytest.mark.parametrize('gen', [TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))], ids=idfn)
@pytest.mark.parametrize('ts_write', parquet_ts_write_options)
@pytest.mark.parametrize('ts_rebase', ['CORRECTED', 'LEGACY'])
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_ts_read_round_trip(gen, spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs):
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
lambda spark : unary_op_df(spark, gen).write.parquet(data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase,
'spark.sql.legacy.parquet.int96RebaseModeInWrite': ts_rebase,
'spark.sql.parquet.outputTimestampType': ts_write})
all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path),
conf=all_confs)

def readParquetCatchException(spark, data_path):
with pytest.raises(Exception) as e_info:
df = spark.read.parquet(data_path).collect()
assert e_info.match(r".*SparkUpgradeException.*")
parquet_gens_legacy_list = [[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, date_gen, timestamp_gen]]

# Once https://github.com/NVIDIA/spark-rapids/issues/1126 is fixed nested timestamps and dates should be added in
# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with
# timestamp_gen
@pytest.mark.parametrize('gen', [TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))], ids=idfn)
@pytest.mark.parametrize('ts_write', parquet_ts_write_options)
@pytest.mark.parametrize('ts_rebase', ['LEGACY'])
@pytest.mark.parametrize('parquet_gens', parquet_gens_legacy_list, ids=idfn)
@pytest.mark.parametrize('ts_type', parquet_ts_write_options)
@pytest.mark.parametrize('ts_rebase_write', ['CORRECTED', 'LEGACY'])
@pytest.mark.parametrize('ts_rebase_read', ['CORRECTED', 'LEGACY'])
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_ts_read_fails_datetime_legacy(gen, spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs):
def test_parquet_read_roundtrip_datetime(spark_tmp_path, parquet_gens, ts_type,
ts_rebase_write, ts_rebase_read,
reader_confs, v1_enabled_list):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
data_path = spark_tmp_path + '/PARQUET_DATA'
write_confs = {'spark.sql.parquet.outputTimestampType': ts_type,
'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase_write,
'spark.sql.legacy.parquet.int96RebaseModeInWrite': ts_rebase_write}

with_cpu_session(
lambda spark : unary_op_df(spark, gen).write.parquet(data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase,
'spark.sql.legacy.parquet.int96RebaseModeInWrite': ts_rebase,
'spark.sql.parquet.outputTimestampType': ts_write})
all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
with_gpu_session(
lambda spark : readParquetCatchException(spark, data_path),
conf=all_confs)
lambda spark: gen_df(spark, gen_list).write.parquet(data_path),
conf=write_confs)
read_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.legacy.parquet.datetimeRebaseModeInRead': ts_rebase_read,
'spark.sql.legacy.parquet.int96RebaseModeInRead': ts_rebase_read})
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.parquet(data_path),
conf=read_confs)

# This is legacy format, which is totally different from datetime legacy rebase mode.
@pytest.mark.parametrize('parquet_gens', [[byte_gen, short_gen, decimal_gen_32bit], decimal_gens,
[ArrayGen(decimal_gen_32bit, max_length=10)],
[StructGen([['child0', decimal_gen_32bit]])]], ids=idfn)
@@ -388,32 +373,11 @@ def test_parquet_decimal_read_legacy(spark_tmp_path, parquet_gens, read_func, re
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
lambda spark : gen_df(spark, gen_list).write.parquet(data_path),
conf={'spark.sql.parquet.writeLegacyFormat': 'true'})
all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(read_func(data_path), conf=all_confs)


parquet_gens_legacy_list = [[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens,
pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/133')),
pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/133'))]

@pytest.mark.parametrize('parquet_gens', parquet_gens_legacy_list, ids=idfn)
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_parquet_read_round_trip_legacy(spark_tmp_path, parquet_gens, v1_enabled_list, reader_confs):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
lambda spark : gen_df(spark, gen_list).write.parquet(data_path),
conf=rebase_write_legacy_conf)
all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path),
conf=all_confs)

@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
@pytest.mark.parametrize('batch_size', [100, INT_MAX])
@@ -1004,7 +968,7 @@ def test_parquet_reading_from_unaligned_pages_basic_filters_with_nulls(spark_tmp


conf_for_parquet_aggregate_pushdown = {
"spark.sql.parquet.aggregatePushdown": "true",
"spark.sql.sources.useV1SourceList": ""
}

@@ -1491,15 +1455,15 @@ def test_parquet_read_count(spark_tmp_path):
def test_read_case_col_name(spark_tmp_path, read_func, v1_enabled_list, reader_confs, col_name):
all_confs = copy_and_update(reader_confs, {
'spark.sql.sources.useV1SourceList': v1_enabled_list})
gen_list =[('k0', LongGen(nullable=False, min_val=0, max_val=0)),
('k1', LongGen(nullable=False, min_val=1, max_val=1)),
('k2', LongGen(nullable=False, min_val=2, max_val=2)),
('k3', LongGen(nullable=False, min_val=3, max_val=3)),
('v0', LongGen()),
('v1', LongGen()),
('v2', LongGen()),
('v3', LongGen())]

gen = StructGen(gen_list, nullable=False)
data_path = spark_tmp_path + '/PAR_DATA'
reader = read_func(data_path)
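As a complement to the new `test_parquet_read_roundtrip_datetime` above, here is a standalone sketch of the same write/read pattern without the integration-test harness. The path and sample value are illustrative; the write side uses the legacy config names exactly as the test does.

```python
from datetime import datetime, timezone
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rebase-roundtrip-sketch").getOrCreate()
path = "/tmp/parquet_rebase_roundtrip"  # illustrative path

# Write an old timestamp with the legacy (hybrid Julian) calendar, mirroring
# what the test does on the CPU via with_cpu_session.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "LEGACY")
df = spark.createDataFrame([(datetime(1582, 10, 4, tzinfo=timezone.utc),)], ["ts"])
df.write.mode("overwrite").parquet(path)

# Read it back with LEGACY rebase on read -- the mode this PR adds GPU support for.
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")
spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY")
spark.read.parquet(path).show(truncate=False)
```

With both sides set to LEGACY the values round-trip unchanged; mixing CORRECTED and LEGACY is what the `ts_rebase_write`/`ts_rebase_read` parametrization in the test exercises.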
114 changes: 85 additions & 29 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
Expand Up @@ -37,7 +37,7 @@ import com.nvidia.spark.rapids.ParquetPartitionReader.{CopyRange, LocalCopy}
import com.nvidia.spark.rapids.RapidsConf.ParquetFooterReaderType
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.filecache.FileCache
import com.nvidia.spark.rapids.jni.{ParquetFooter, SplitAndRetryOOM}
import com.nvidia.spark.rapids.jni.{DateTimeRebase, ParquetFooter, SplitAndRetryOOM}
import com.nvidia.spark.rapids.shims.{GpuParquetCrypto, GpuTypeShims, ParquetLegacyNanoAsLongShims, ParquetSchemaClipShims, ParquetStringPredShims, ReaderUtils, ShimFilePartitionReaderFactory, SparkShimImpl}
import org.apache.commons.io.IOUtils
import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream}
@@ -156,24 +156,7 @@ object GpuParquetScan {
tagSupport(scan.sparkSession, schema, scanMeta)
}

def throwIfRebaseNeeded(table: Table, dateRebaseMode: DateTimeRebaseMode,
timestampRebaseMode: DateTimeRebaseMode): Unit = {
(0 until table.getNumberOfColumns).foreach { i =>
val col = table.getColumn(i)
if (dateRebaseMode != DateTimeRebaseCorrected &&
DateTimeRebaseUtils.isDateRebaseNeededInRead(col)) {
throw DataSourceUtils.newRebaseExceptionInRead("Parquet")
}
else if (timestampRebaseMode != DateTimeRebaseCorrected &&
DateTimeRebaseUtils.isTimeRebaseNeededInRead(col)) {
throw DataSourceUtils.newRebaseExceptionInRead("Parquet")
}
}
}

def tagSupport(
sparkSession: SparkSession,
readSchema: StructType,
def tagSupport(sparkSession: SparkSession, readSchema: StructType,
meta: RapidsMeta[_, _, _]): Unit = {
val sqlConf = sparkSession.conf

@@ -196,10 +179,12 @@ object GpuParquetScan {
val schemaHasTimestamps = readSchema.exists { field =>
TrampolineUtil.dataTypeExistsRecursively(field.dataType, _.isInstanceOf[TimestampType])
}

def isTsOrDate(dt: DataType): Boolean = dt match {
case TimestampType | DateType => true
case _ => false
}

val schemaMightNeedNestedRebase = readSchema.exists { field =>
if (DataTypeUtils.isNestedType(field.dataType)) {
TrampolineUtil.dataTypeExistsRecursively(field.dataType, isTsOrDate)
@@ -316,17 +301,85 @@ object GpuParquetScan {
* @return the updated target batch size.
*/
def splitTargetBatchSize(targetBatchSize: Long, useChunkedReader: Boolean): Long = {
if (!useChunkedReader) {
throw new SplitAndRetryOOM("GPU OutOfMemory: could not split inputs " +
"chunked parquet reader is configured off")
}
val ret = targetBatchSize / 2
if (targetBatchSize < minTargetBatchSizeMiB * 1024 * 1024) {
throw new SplitAndRetryOOM("GPU OutOfMemory: could not split input " +
s"target batch size to less than $minTargetBatchSizeMiB MiB")
}
ret
}

def throwIfRebaseNeededInExceptionMode(table: Table, dateRebaseMode: DateTimeRebaseMode,
timestampRebaseMode: DateTimeRebaseMode): Unit = {
(0 until table.getNumberOfColumns).foreach { i =>
val col = table.getColumn(i)
if (dateRebaseMode == DateTimeRebaseException &&
DateTimeRebaseUtils.isDateRebaseNeededInRead(col)) {
throw DataSourceUtils.newRebaseExceptionInRead("Parquet")
}
else if (timestampRebaseMode == DateTimeRebaseException &&
DateTimeRebaseUtils.isTimeRebaseNeededInRead(col)) {
throw DataSourceUtils.newRebaseExceptionInRead("Parquet")
}
}
}

def rebaseDateTime(table: Table, dateRebaseMode: DateTimeRebaseMode,
timestampRebaseMode: DateTimeRebaseMode): Table = {
val tableHasDate = (0 until table.getNumberOfColumns).exists { i =>
checkTypeRecursively(table.getColumn(i), { dtype => dtype == DType.TIMESTAMP_DAYS })
}
val tableHasTimestamp = (0 until table.getNumberOfColumns).exists { i =>
checkTypeRecursively(table.getColumn(i), { dtype => dtype.isTimestampType })
}

if ((tableHasDate && dateRebaseMode == DateTimeRebaseLegacy) ||
(tableHasTimestamp && timestampRebaseMode == DateTimeRebaseLegacy)) {
// Need to close the input table when returning a new table.
withResource(table) { tmpTable =>
val newColumns = (0 until tmpTable.getNumberOfColumns).map { i =>
deepTransformRebaseDateTime(tmpTable.getColumn(i))
}
withResource(newColumns) { newCols =>
new Table(newCols: _*)
}
}
} else {
table
}
}

private def checkTypeRecursively(column: ColumnView, f: DType => Boolean): Boolean = {
column.getType match {
case DType.LIST | DType.STRUCT => (0 until column.getNumChildren).exists(i =>
withResource(column.getChildColumnView(i)) { child =>
checkTypeRecursively(child, f)
})
case t: DType => f(t)
}
}

private def deepTransformRebaseDateTime(cv: ColumnVector): ColumnVector = {
ColumnCastUtil.deepTransform(cv) {
case (cv, _) if cv.getType.isTimestampType =>
if (cv.getType == DType.TIMESTAMP_DAYS || cv.getType == DType.TIMESTAMP_MICROSECONDS) {
DateTimeRebase.rebaseJulianToGregorian(cv)
} else {
// This is just a backup: it should not be reached since timestamps are already
// converted to MICROSECONDS when reading Parquet files.
val oldType = cv.getType
withResource(cv.castTo(DType.TIMESTAMP_MICROSECONDS)) { cvAsMicros =>
withResource(DateTimeRebase.rebaseJulianToGregorian(cvAsMicros)) { rebasedTs =>
rebasedTs.castTo(oldType)
}
}
}
}
}
}

// contains meta about all the blocks in a file
@@ -2627,17 +2680,19 @@ object MakeParquetTableProducer extends Logging {
logWarning(s"Wrote data for ${splits.mkString(", ")} to $p")
}
}
GpuParquetScan.throwIfRebaseNeeded(table, dateRebaseMode,
GpuParquetScan.throwIfRebaseNeededInExceptionMode(table, dateRebaseMode,
timestampRebaseMode)
if (readDataSchema.length < table.getNumberOfColumns) {
throw new QueryExecutionException(s"Expected ${readDataSchema.length} columns " +
s"but read ${table.getNumberOfColumns} from ${splits.mkString("; ")}")
}
}
metrics(NUM_OUTPUT_BATCHES) += 1
val ret = ParquetSchemaUtils.evolveSchemaIfNeededAndClose(table,
val evolvedSchemaTable = ParquetSchemaUtils.evolveSchemaIfNeededAndClose(table,
clippedParquetSchema, readDataSchema, isSchemaCaseSensitive, useFieldId)
new SingleGpuDataProducer(ret)
val outputTable = GpuParquetScan.rebaseDateTime(evolvedSchemaTable, dateRebaseMode,
timestampRebaseMode)
new SingleGpuDataProducer(outputTable)
}
}
}
@@ -2682,15 +2737,16 @@ case class ParquetTableReader(
}

closeOnExcept(table) { _ =>
GpuParquetScan.throwIfRebaseNeeded(table, dateRebaseMode, timestampRebaseMode)
GpuParquetScan.throwIfRebaseNeededInExceptionMode(table, dateRebaseMode, timestampRebaseMode)
if (readDataSchema.length < table.getNumberOfColumns) {
throw new QueryExecutionException(s"Expected ${readDataSchema.length} columns " +
s"but read ${table.getNumberOfColumns} from $splitsString")
}
}
metrics(NUM_OUTPUT_BATCHES) += 1
ParquetSchemaUtils.evolveSchemaIfNeededAndClose(table,
val evolvedSchemaTable = ParquetSchemaUtils.evolveSchemaIfNeededAndClose(table,
clippedParquetSchema, readDataSchema, isSchemaCaseSensitive, useFieldId)
GpuParquetScan.rebaseDateTime(evolvedSchemaTable, dateRebaseMode, timestampRebaseMode)
}

override def close(): Unit = {
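Finally, for the EXCEPTION path that `throwIfRebaseNeededInExceptionMode` preserves, the user-visible behavior can be sketched as follows. The path is illustrative, and the message check mirrors the removed `readParquetCatchException` helper rather than any API added by this PR.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rebase-exception-sketch").getOrCreate()
# Hypothetical file written with the *RebaseModeInWrite configs set to LEGACY
# and containing dates/timestamps old enough to be ambiguous between calendars.
path = "/tmp/parquet_written_with_legacy_rebase"

spark.conf.set("spark.sql.parquet.datetimeRebaseModeInRead", "EXCEPTION")
spark.conf.set("spark.sql.parquet.int96RebaseModeInRead", "EXCEPTION")
try:
    spark.read.parquet(path).collect()
except Exception as e:
    # Spark surfaces a SparkUpgradeException; checking the message is what the
    # old readParquetCatchException helper did.
    assert "SparkUpgradeException" in str(e)
```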