diff --git a/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala b/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
index daad4692..4b204fc8 100644
--- a/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
@@ -56,6 +56,9 @@ object DataFrameOps {
                           columnsFilter,
                           columnsFilterKeep)
 
+    def dedupByAllExcept(ignoredColumn: String, aggFunction: String => Column = org.apache.spark.sql.functions.max): DataFrame =
+      SparkDFUtils.dedupByAllExcept(df, ignoredColumn, aggFunction)
+
     def flatten(colName: String): DataFrame = SparkDFUtils.flatten(df, colName)
 
     def changeSchema(newScheme: String*): DataFrame =
diff --git a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
index 27324600..2b347c39 100644
--- a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
@@ -245,6 +245,36 @@ object SparkDFUtils {
     df2.sparkSession.createDataFrame(df2.rdd,ns)
   }
 
+  /**
+   * Reduces rows when one column's exact value is not important, without losing any data from the other columns.
+   * For example, a server may create events with an auto-generated event id, and some events may be duplicated.
+   * You don't want duplicate rows that differ only in their event ids, but if any of the other fields are
+   * distinct you want to keep the rows (with their event ids).
+   * @param df the DataFrame to deduplicate
+   * @param ignoredColumn the one column whose value you only need one of per group
+   * @param aggFunction the aggregate function used to pick that value (max by default)
+   * @return a DataFrame with the same schema, deduplicated on all columns except ignoredColumn
+   */
+  def dedupByAllExcept(df: DataFrame, ignoredColumn: String, aggFunction: String => Column): DataFrame = {
+    val cols = df.schema.fields.filter(_.name != ignoredColumn).map(_.name)
+    val grouped = df.groupBy(cols.head, cols.tail: _*)
+    val resultWithAggregateFieldName = grouped.agg(aggFunction(ignoredColumn))
+
+    val pattern = s".*\\($ignoredColumn\\)"
+    val candidateFields = resultWithAggregateFieldName.schema.fields.filter(_.name.matches(pattern))
+
+    // Cleanly notify users if something unexpected happened to the aggregated field name
+    if (candidateFields.isEmpty) {
+      throw new Exception(s"Aggregated field name not found after applying aggregate function to '$ignoredColumn'")
+    } else if (candidateFields.size > 1) {
+      throw new Exception(s"Multiple fields match the aggregate function pattern for '$ignoredColumn'")
+    }
+
+    // rename the aggregated field back to its original name and restore the original column order
+    resultWithAggregateFieldName.withColumnRenamed(candidateFields.head.name, ignoredColumn)
+      .select(df.columns.head, df.columns.tail: _*)
+  }
+
   /**
    * Returns a DataFrame with the given column (should be a StructType)
    * replaced by its inner fields.
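Note (not part of the patch): a minimal sketch of what dedupByAllExcept does under the hood — group by every column except the ignored one, aggregate the ignored column, and restore the original column order. The `events` data, column names, and the local SparkSession below are made up for illustration, and the rename-by-pattern step is simplified with an explicit alias.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

val spark = SparkSession.builder().master("local[*]").appName("dedup-sketch").getOrCreate()

// Duplicate events that differ only in their auto-generated event id
val events = spark.createDataFrame(Seq(
  ("click", "2024-01-01", "evt-1"),
  ("click", "2024-01-01", "evt-2"),
  ("view",  "2024-01-01", "evt-3")
)).toDF("action", "day", "event_id")

// Roughly equivalent to events.dedupByAllExcept("event_id"):
// group by all other columns and keep one representative event id per group
val deduped = events
  .groupBy("action", "day")
  .agg(max("event_id").as("event_id"))
  .select("action", "day", "event_id")

deduped.show()  // 2 rows: the duplicated "click" row collapses to one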
diff --git a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
index 75fa8da9..ae5873cc 100644
--- a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
+++ b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
@@ -455,4 +455,38 @@ class DataFrameOpsTests extends FunSuite with DataFrameSuiteBase {
     assertDataFrameEquals(expected, actual)
   }
 
+  test("test_dedup_by_all_except") {
+
+    val df = spark.createDataFrame(
+      Seq(("a", "a", 1, 2, "aa12", "a"),
+          ("a", "a", 1, 1, "aa11", "a"),
+          ("a", "a", 2, 1, "aa21", "a"),
+          ("a", "b", 3, 2, "ab32", "a"),
+          ("b", "a", 1, 1, "ba11", "a"))
+    ).toDF("col_grp1", "col_grp2", "col_grp3", "col_grp4", "col_grp5", "col_grp6")
+
+    val noChange = df.dedupByAllExcept("col_grp1")
+
+    assertDataFrameNoOrderEquals(df, noChange)
+
+    val df2 = spark.createDataFrame(
+      Seq(("a", "a"),
+          ("b", "a"),
+          ("c", "c"),
+          ("d", "d"),
+          ("e", "e"))
+    ).toDF("col_grp1", "col_grp2")
+
+    val loseRow = df2.dedupByAllExcept("col_grp1")
+
+    val expected = spark.createDataFrame(
+      Seq(("b", "a"),
+          ("c", "c"),
+          ("d", "d"),
+          ("e", "e"))
+    ).toDF("col_grp1", "col_grp2")
+
+    assertDataFrameNoOrderEquals(expected, loseRow)
+  }
+
 }
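Note (not part of the patch): a hypothetical usage sketch of the new method through the DataFrameOps implicits, including a non-default aggregate, which the test above does not exercise. It assumes the usual `import datafu.spark.DataFrameOps._` entry point and an existing SparkSession named `spark`; the data and column names are made up.

import datafu.spark.DataFrameOps._
import org.apache.spark.sql.functions.min

val df = spark.createDataFrame(Seq(
  ("a", "x"),
  ("b", "x"),
  ("c", "y")
)).toDF("id", "payload")

// Default aggregate (max): the two "x" rows collapse, keeping id "b"
df.dedupByAllExcept("id").show()

// Custom aggregate: pass min to keep id "a" instead
df.dedupByAllExcept("id", c => min(c)).show()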