DATAFU-177 Add dedupByAllExcept #46

Merged 1 commit on Dec 9, 2024
3 changes: 3 additions & 0 deletions datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
@@ -56,6 +56,9 @@ object DataFrameOps {
columnsFilter,
columnsFilterKeep)

def dedupByAllExcept(ignoredColumn: String, aggFunction: String => Column = org.apache.spark.sql.functions.max): DataFrame =
SparkDFUtils.dedupByAllExcept(df, ignoredColumn, aggFunction)

def flatten(colName: String): DataFrame = SparkDFUtils.flatten(df, colName)

def changeSchema(newScheme: String*): DataFrame =
…
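For context, a minimal usage sketch of the new implicit syntax (the events DataFrame and its event_id column are hypothetical, and the import assumes the implicit class in DataFrameOps is brought into scope in the usual way):

import datafu.spark.DataFrameOps._

// Collapse rows that differ only in their autogenerated event_id,
// keeping the max event_id per group (the default aggregate function)
val deduped = events.dedupByAllExcept("event_id")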
30 changes: 30 additions & 0 deletions datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
@@ -245,6 +245,36 @@ object SparkDFUtils {
df2.sparkSession.createDataFrame(df2.rdd,ns)
}

/**
* Method for reducing rows when there is one column whose value is not important, but you don't
* want to lose any actual data carried in the other columns. For example, a server may create events
* with an autogenerated event id, and sometimes events get duplicated. You don't want duplicate
* rows that differ only in their event ids, but if any of the other fields are distinct you want
* to keep the rows (with their event ids).
* @param df The DataFrame to deduplicate
* @param ignoredColumn The one column whose value you only need one of per group of otherwise-identical rows
* @param aggFunction The aggregate function used to pick the surviving value of ignoredColumn. Default (in the DataFrameOps wrapper) is max
* @return DataFrame deduplicated on all columns except ignoredColumn
*/
def dedupByAllExcept(df: DataFrame, ignoredColumn: String, aggFunction: String => Column): DataFrame = {
val cols = df.schema.fields.filter(_.name != ignoredColumn).map(i => i.name)
val grouped = df.groupBy(cols.head, cols.tail: _*)
val resultWithAggregateFieldName = grouped.agg(aggFunction(ignoredColumn))

val pattern = s".*\\($ignoredColumn\\)"
val candidateFields = resultWithAggregateFieldName.schema.fields.filter(_.name.matches(pattern))

// Fail clearly if the aggregated column's generated name doesn't match the expected pattern
if (candidateFields.isEmpty) {
throw new Exception(s"Aggregated field name not found after applying aggregate function to '$ignoredColumn'")
} else if (candidateFields.size > 1) {
throw new Exception(s"Multiple fields with names that match aggregate function pattern found for '$ignoredColumn'")
}

// Rename the aggregated field back to its original name and restore the original column order
resultWithAggregateFieldName
  .withColumnRenamed(candidateFields(0).name, ignoredColumn)
  .select(df.columns.head, df.columns.tail: _*)
}

/**
* Returns a DataFrame with the given column (should be a StructType)
* replaced by its inner fields.
…
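Roughly, this is equivalent to the following plain Spark operations (a sketch for ignoredColumn = "event_id" on a hypothetical df; the actual implementation locates and renames the aggregated column by matching its generated name rather than aliasing it up front):

import org.apache.spark.sql.functions.max

val others = df.columns.filter(_ != "event_id")
val deduped = df
  .groupBy(others.head, others.tail: _*)
  .agg(max("event_id").as("event_id"))          // keep one event_id per group
  .select(df.columns.head, df.columns.tail: _*) // restore the original column order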
34 changes: 34 additions & 0 deletions datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
@@ -455,4 +455,38 @@ class DataFrameOpsTests extends FunSuite with DataFrameSuiteBase {

assertDataFrameEquals(expected, actual)
}

test("test_dedup_by-all_except") {

val df = spark.createDataFrame(
Seq(("a", "a", 1, 2, "aa12", "a"),
("a", "a", 1, 1, "aa11", "a"),
("a", "a", 2, 1, "aa21", "a"),
("a", "b", 3, 2, "ab32", "a"),
("b", "a", 1, 1, "ba11", "a"))
).toDF("col_grp1", "col_grp2", "col_grp3", "col_grp4", "col_grp5", "col_grp6")

val noChange = df.dedupByAllExcept("col_grp1")

assertDataFrameNoOrderEquals(df, noChange)

val df2 = spark.createDataFrame(
Seq(("a", "a"),
("b", "a"),
("c", "c"),
("d", "d"),
("e", "e"))
).toDF("col_grp1", "col_grp2")

val loseRow = df2.dedupByAllExcept("col_grp1")

val expected = spark.createDataFrame(
Seq(("b", "a"),
("c", "c"),
("d", "d"),
("e", "e"))
).toDF("col_grp1", "col_grp2")

assertDataFrameNoOrderEquals(expected, loseRow)
}
}
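One more sketch, showing the optional aggregate-function parameter against the df2 data from the test above: with min instead of the default max, the collapsed col_grp2 = "a" group would keep col_grp1 = "a" rather than "b".

// Same dedup, but the surviving value of the ignored column is chosen with min
val keepMin = df2.dedupByAllExcept("col_grp1", org.apache.spark.sql.functions.min)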