Skip to content

Commit

Permalink
#697 Add conflict resolution logic to SparkUtils.copyMetadata.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Jul 31, 2024
1 parent 6d89cec commit 1417d55
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,17 +243,26 @@ object SparkUtils extends Logging {
/**
* Copies metadata from one schema to another as long as names and data types are the same.
*
* @param schemaFrom Schema to copy metadata from.
* @param schemaTo Schema to copy metadata to.
* @param overwrite If true, the metadata of schemaTo is not retained
* @param schemaFrom Schema to copy metadata from.
* @param schemaTo Schema to copy metadata to.
* @param overwrite If true, the metadata of schemaTo is not retained
* @param sourcePreferred If true, schemaFrom metadata is used on conflicts, schemaTo otherwise.
* @return Same schema as schemaTo with metadata from schemaFrom.
*/
def copyMetadata(schemaFrom: StructType, schemaTo: StructType, overwrite: Boolean = false): StructType = {
def copyMetadata(schemaFrom: StructType,
schemaTo: StructType,
overwrite: Boolean = false,
sourcePreferred: Boolean = false): StructType = {
def joinMetadata(from: Metadata, to: Metadata): Metadata = {
val newMetadataMerged = new MetadataBuilder

newMetadataMerged.withMetadata(from)
newMetadataMerged.withMetadata(to)
if (sourcePreferred) {
newMetadataMerged.withMetadata(to)
newMetadataMerged.withMetadata(from)
} else {
newMetadataMerged.withMetadata(from)
newMetadataMerged.withMetadata(to)
}

newMetadataMerged.build()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,58 @@ class SparkUtilsSuite extends AnyFunSuite with SparkTestBase with BinaryFileFixt
assert(newDf.schema.fields.head.metadata.getLong("maxLength") == 120)
}

test("copyMetadata should retain metadata on conflicts by default") {
val df1 = List(1, 2, 3).toDF("col1")
val df2 = List(1, 2, 3).toDF("col1")

val metadata1 = new MetadataBuilder()
metadata1.putString("comment", "Test")
metadata1.putLong("maxLength", 100)

val metadata2 = new MetadataBuilder()
metadata2.putLong("maxLength", 120)
metadata2.putLong("newMetadata", 180)

val schema1WithMetadata = StructType(Seq(df1.schema.fields.head.copy(metadata = metadata1.build())))
val schema2WithMetadata = StructType(Seq(df2.schema.fields.head.copy(metadata = metadata2.build())))

val df1WithMetadata = spark.createDataFrame(df2.rdd, schema1WithMetadata)

val schemaWithMetadata = SparkUtils.copyMetadata(df1WithMetadata.schema, schema2WithMetadata)

val newDf = spark.createDataFrame(df2.rdd, schemaWithMetadata)

assert(newDf.schema.fields.head.metadata.getString("comment") == "Test")
assert(newDf.schema.fields.head.metadata.getLong("maxLength") == 120)
assert(newDf.schema.fields.head.metadata.getLong("newMetadata") == 180)
}

test("copyMetadata should overwrite metadata on conflicts when sourcePreferred=true") {
val df1 = List(1, 2, 3).toDF("col1")
val df2 = List(1, 2, 3).toDF("col1")

val metadata1 = new MetadataBuilder()
metadata1.putString("comment", "Test")
metadata1.putLong("maxLength", 100)

val metadata2 = new MetadataBuilder()
metadata2.putLong("maxLength", 120)
metadata2.putLong("newMetadata", 180)

val schema1WithMetadata = StructType(Seq(df1.schema.fields.head.copy(metadata = metadata1.build())))
val schema2WithMetadata = StructType(Seq(df2.schema.fields.head.copy(metadata = metadata2.build())))

val df1WithMetadata = spark.createDataFrame(df2.rdd, schema1WithMetadata)

val schemaWithMetadata = SparkUtils.copyMetadata(df1WithMetadata.schema, schema2WithMetadata, sourcePreferred = true)

val newDf = spark.createDataFrame(df2.rdd, schemaWithMetadata)

assert(newDf.schema.fields.head.metadata.getString("comment") == "Test")
assert(newDf.schema.fields.head.metadata.getLong("maxLength") == 100)
assert(newDf.schema.fields.head.metadata.getLong("newMetadata") == 180)
}

test("copyMetadata should not retain original metadata when overwrite = true") {
val df1 = List(1, 2, 3).toDF("col1")
val df2 = List(1, 2, 3).toDF("col1")
Expand Down

0 comments on commit 1417d55

Please sign in to comment.