Avoid Persisting NumRecord of RemoveFile to Checkpoint
This PR avoids persisting the `numRecords` field of the `RemoveFile` action to the checkpoint by removing that attribute from the constructor of the `RemoveFile` case class, ensuring that `RemoveFile.numRecords` is never written out to the Delta checkpoint.

Resolves #1229.

How was this patch tested? We write out a checkpoint, read it back as Parquet, and verify that its schema does not contain `numRecords`.

Does this PR introduce any user-facing changes? No.

Closes #1230.

GitOrigin-RevId: 518e46c0622cca4277729e9e6e7ebb08452619f3
kamcheungting-db authored and vkorukanti committed Jul 11, 2022
1 parent d4b70a8 commit 5842b5f
Showing 2 changed files with 56 additions and 4 deletions.
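
The mechanism behind the fix: Delta writes the checkpoint as Parquet from a Dataset of the action case classes, so the checkpoint schema is derived by Spark's encoder from the constructor parameters and is unaffected by Jackson annotations such as `@JsonIgnore`. A minimal sketch of that behaviour, using hypothetical stand-in case classes rather than Delta's real actions:

```scala
import org.apache.spark.sql.Encoders

// Hypothetical stand-ins for RemoveFile before and after this PR.
case class RemoveFileBefore(path: String, size: Option[Long] = None, numRecords: Option[Long] = None)
case class RemoveFileAfter(path: String, size: Option[Long] = None) {
  // Not a constructor parameter, so Spark's encoder never sees it.
  var numRecords: Option[Long] = None
}

// The encoder-derived schema is what ends up as the checkpoint's Parquet schema.
println(Encoders.product[RemoveFileBefore].schema.fieldNames.mkString(", "))
// path, size, numRecords  <- numRecords would leak into the checkpoint
println(Encoders.product[RemoveFileAfter].schema.fieldNames.mkString(", "))
// path, size              <- numRecords stays out
```

This is why the field is removed from the constructor rather than hidden behind yet another Jackson annotation: only constructor parameters reach the checkpoint schema.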
@@ -277,11 +277,12 @@ case class AddFile(
       dataChange: Boolean = true): RemoveFile = {
     var newTags = tags
     // scalastyle:off
-    RemoveFile(
+    val removedFile = RemoveFile(
       path, Some(timestamp), dataChange,
       extendedFileMetadata = Some(true), partitionValues, Some(size), newTags
     )
     // scalastyle:on
+    removedFile
   }
 
   @JsonIgnore
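
Going by the signature shown, the hunk above sits inside `AddFile.removeWithTimestamp`; it now binds the constructed action to a local `removedFile` before returning it. Since `numRecords` becomes a mutable `@JsonIgnore` field on `RemoveFile` (next hunk), a caller that wants to keep the record count around in memory would set it on the returned action. A hypothetical sketch, not code from this PR:

```scala
// Hypothetical caller: attach the in-memory record count after building the action.
val removed: RemoveFile = addFile.removeWithTimestamp(System.currentTimeMillis())
removed.numRecords = Some(1234L)  // carried in memory; never serialized (see the sketch below)
```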
@@ -361,7 +362,6 @@ object AddFile {
  * nullable by setting their type Option.
  */
 // scalastyle:off
-@JsonIgnoreProperties(Array("numRecords"))
 case class RemoveFile(
     path: String,
     @JsonDeserialize(contentAs = classOf[java.lang.Long])
@@ -371,14 +371,17 @@ case class RemoveFile(
     partitionValues: Map[String, String] = null,
     @JsonDeserialize(contentAs = classOf[java.lang.Long])
     size: Option[Long] = None,
-    tags: Map[String, String] = null,
-    numRecords: Option[Long] = None
+    tags: Map[String, String] = null
 ) extends FileAction {
   override def wrap: SingleAction = SingleAction(remove = this)
 
   @JsonIgnore
   val delTimestamp: Long = deletionTimestamp.getOrElse(0L)
 
+  /** The number of records contained inside the removed file. */
+  @JsonIgnore
+  var numRecords: Option[Long] = None
+
 
   @JsonIgnore
   def optimizedTargetSize: Option[Long] =
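On the serialization side, `numRecords` is now invisible both to Spark's encoder (it is no longer a constructor parameter, so it cannot appear in the checkpoint schema) and to Jackson (the `@JsonIgnore` var is skipped when the action is written to the JSON log). A small sketch of the JSON side, using a plain Jackson mapper as a stand-in for the mapper Delta actually serializes actions with:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Stand-in mapper; the real log write goes through Delta's own JSON utilities
// and wraps the action in SingleAction first (see `wrap` above).
val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

val remove = RemoveFile("part-00000-abc.snappy.parquet", Some(1657500000000L))
remove.numRecords = Some(42L)

// The @JsonIgnore var never shows up in the serialized action.
println(mapper.writeValueAsString(remove))
// {"path":"part-00000-abc.snappy.parquet","deletionTimestamp":1657500000000,"dataChange":true,...}
```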
@@ -32,6 +32,7 @@ import org.apache.hadoop.util.Progressable
 
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StructType
 
 
 class CheckpointsSuite extends QueryTest
@@ -206,6 +207,54 @@ class CheckpointsSuite extends QueryTest
       }
     }
   }
+
+  test("checkpoint does not contain remove.numRecords field") {
+    withTempDir { tempDir =>
+      var expectedRemoveFileSchema = Seq(
+        "path",
+        "deletionTimestamp",
+        "dataChange",
+        "extendedFileMetadata",
+        "partitionValues",
+        "size",
+        "tags")
+      val tablePath = tempDir.getAbsolutePath
+      // Append rows [0, 9] to table and merge tablePath.
+      spark.range(end = 10).write.format("delta").mode("overwrite").save(tablePath)
+      spark.range(5, 15).createOrReplaceTempView("src")
+      sql(
+        s"""
+           |MERGE INTO delta.`$tempDir` t USING src s ON t.id = s.id
+           |WHEN MATCHED THEN DELETE
+           |WHEN NOT MATCHED THEN INSERT *
+           |""".stripMargin)
+      val deltaLog = DeltaLog.forTable(spark, tablePath)
+      deltaLog.checkpoint()
+      var checkpointFile = FileNames.checkpointFileSingular(deltaLog.logPath, 1).toString
+      var checkpointSchema = spark.read.format(source = "parquet").load(checkpointFile).schema
+      var removeSchemaName = checkpointSchema("remove").dataType.asInstanceOf[StructType].fieldNames
+      assert(removeSchemaName.toSeq === expectedRemoveFileSchema)
+      checkAnswer(
+        spark.sql(s"select * from delta.`$tablePath`"),
+        Seq(0, 1, 2, 3, 4, 10, 11, 12, 13, 14).map { i => Row(i) })
+      // Append rows [0, 9] to table and merge one more time.
+      spark.range(end = 10).write.format("delta").mode("append").save(tablePath)
+      sql(
+        s"""
+           |MERGE INTO delta.`$tempDir` t USING src s ON t.id = s.id
+           |WHEN MATCHED THEN DELETE
+           |WHEN NOT MATCHED THEN INSERT *
+           |""".stripMargin)
+      deltaLog.checkpoint()
+      checkpointFile = FileNames.checkpointFileSingular(deltaLog.logPath, 1).toString
+      checkpointSchema = spark.read.format(source = "parquet").load(checkpointFile).schema
+      removeSchemaName = checkpointSchema("remove").dataType.asInstanceOf[StructType].fieldNames
+      assert(removeSchemaName.toSeq === expectedRemoveFileSchema)
+      checkAnswer(
+        spark.sql(s"select * from delta.`$tablePath`"),
+        Seq(0, 0, 1, 1, 2, 2, 3, 3, 4, 4).map { i => Row(i) })
+    }
+  }
 }
 
 /**
