Skip to content

Commit

Permalink
Merge pull request twitter#462 from vidma/features/refactor-map-aggre…
Browse files Browse the repository at this point in the history
…gator

Refactor MapAggregator
  • Loading branch information
johnynek committed Jul 15, 2015
2 parents 917e6bf + 5cfe69f commit 2bdab21
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -341,24 +341,26 @@ class TupleAggregatorsTest extends WordSpec with Matchers {
val MinLongAgg = Aggregator.min[Int].andThenPresent{ _.toLong }

"Create an aggregator from 1 (key, aggregator) pair" in {
val agg: Aggregator[Int, Long, Map[String, Long]] = MapAggregator(
val agg: MapMonoidAggregator[Int, Long, String, Long] = MapAggregator(
("key1", SizeAgg))
val expectedMap = Map("key1" -> 6)
assert(agg(data) == expectedMap)
assert(agg.keys == expectedMap.keys)
}

"Create an aggregator from 2 (key, aggregator) pairs" in {
val agg: Aggregator[Int, Tuple2[Int, Long], Map[String, Long]] = MapAggregator(
val agg: MapAggregator[Int, Tuple2[Int, Long], String, Long] = MapAggregator(
("key1", MinLongAgg),
("key2", SizeAgg))
val expectedMap = Map(
"key1" -> 0,
"key2" -> 6)
assert(agg(data) == expectedMap)
assert(agg.keys == expectedMap.keys)
}

"Create an aggregator from 3 (key, aggregator) pairs" in {
val agg: Aggregator[Int, Tuple3[Int, Long, Int], Map[String, Long]] = MapAggregator(
val agg: MapAggregator[Int, Tuple3[Int, Long, Int], String, Long] = MapAggregator(
("key1", MinLongAgg),
("key2", SizeAgg),
("key3", MinLongAgg))
Expand All @@ -367,10 +369,11 @@ class TupleAggregatorsTest extends WordSpec with Matchers {
"key2" -> 6,
"key3" -> 0)
assert(agg(data) == expectedMap)
assert(agg.keys == expectedMap.keys)
}

"Create an aggregator from 4 (key, aggregator) pairs" in {
val agg: Aggregator[Int, Tuple4[Int, Long, Int, Long], Map[String, Long]] = MapAggregator(
val agg: MapAggregator[Int, Tuple4[Int, Long, Int, Long], String, Long] = MapAggregator(
("key1", MinLongAgg),
("key2", SizeAgg),
("key3", MinLongAgg),
Expand All @@ -381,6 +384,7 @@ class TupleAggregatorsTest extends WordSpec with Matchers {
"key3" -> 0,
"key4" -> 6)
assert(agg(data) == expectedMap)
assert(agg.keys == expectedMap.keys)
}

"Create an aggregator from 5 (key, aggregator) pairs" in {
Expand Down
89 changes: 56 additions & 33 deletions project/GenTupleAggregators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,25 @@ object MultiAggregator {
genMethods(22, "def", Some("apply"), true) + "\n" + "}")

val mapAggPlace = dir / "com" / "twitter" / "algebird" / "MapAggregator.scala"
IO.write(mapAggPlace,
"""package com.twitter.algebird
object MapAggregator {
""" +
genMapMethods(22) + "\n" +
genMapMethods(22, true) + "\n" + "}")
IO.write(
mapAggPlace,
s"""
|package com.twitter.algebird
|
|trait MapAggregator[A, B, K, C] extends Aggregator[A, B, Map[K, C]] {
| def keys: Set[K]
|}
|
|trait MapMonoidAggregator[A, B, K, C] extends MonoidAggregator[A, B, Map[K, C]] {
| def keys: Set[K]
|}
|
|object MapAggregator {
| ${genMapMethods(22)}
| ${genMapMethods(22, isMonoid = true)}
|}
""".stripMargin
)

Seq(tupleAggPlace, multiAggPlace, mapAggPlace)
}
Expand Down Expand Up @@ -66,39 +78,50 @@ object MapAggregator {
}

def genMapMethods(max: Int, isMonoid: Boolean = false): String = {
val aggType = if (isMonoid) "Monoid" else ""
val inputAggregatorType = if (isMonoid) "MonoidAggregator" else "Aggregator"
val mapAggregatorType = if (isMonoid) "MapMonoidAggregator" else "MapAggregator"

val semigroupType = if (isMonoid) "monoid" else "semigroup"

// there's no Semigroup[Tuple1[T]], so just use T as intermediary type instead of Tuple1[T]
// TODO: keys for 1 item
val aggregatorForOneItem = s"""
|def apply[K, A, B, C](aggDef: (K, ${aggType}Aggregator[A, B, C])): ${aggType}Aggregator[A, B, Map[K, C]] = {
| val (key, agg) = aggDef
| agg.andThenPresent(value => Map(key -> value))
|def apply[K, A, B, C](agg: (K, ${inputAggregatorType}[A, B, C])): ${mapAggregatorType}[A, B, K, C] = {
| new ${mapAggregatorType}[A, B, K, C] {
| def prepare(a: A) = agg._2.prepare(a)
| val ${semigroupType} = agg._2.${semigroupType}
| def present(b: B) = Map(agg._1 -> agg._2.present(b))
| def keys = Set(agg._1)
| }
|}
""".stripMargin

(2 to max).map(i => {
val nums = (1 to i)
val bs = nums.map("B" + _).mkString(", ")
val aggs = nums.map(x => "agg%s: Tuple2[K, %sAggregator[A, B%s, C]]".format(x, aggType, x)).mkString(", ")
val prepares = nums.map(x => "agg%s._2.prepare(a)".format(x)).mkString(", ")
val semiType = if (isMonoid) "monoid" else "semigroup"
val semigroups = nums.map(x => "agg%s._2.%s".format(x, semiType)).mkString(", ")
val semigroup = "new Tuple%d%s()(%s)".format(i, semiType.capitalize, semigroups)
val present = nums.map(x => "agg%s._1 -> agg%s._2.present(b._%s)".format(x, x, x)).mkString(", ")
val tupleBs = "Tuple%d[%s]".format(i, bs)
(2 to max).map(aggrCount => {
val aggrNums = 1 to aggrCount

"""
def apply[K, A, %s, C](%s): %sAggregator[A, %s, Map[K, C]] = {
new %sAggregator[A, %s, Map[K, C]] {
def prepare(a: A) = (%s)
val %s = %s
def present(b: %s) = Map(%s)
}
}""".format(bs, aggs, aggType, tupleBs,
aggType, tupleBs,
prepares,
semiType, semigroup,
tupleBs, present)
val inputAggs = aggrNums.map(i => s"agg$i: (K, ${inputAggregatorType}[A, B$i, C])").mkString(", ")

val bs = aggrNums.map("B" + _).mkString(", ")
val tupleBs = s"Tuple${aggrCount}[$bs]"

s"""
|def apply[K, A, $bs, C]($inputAggs): ${mapAggregatorType}[A, $tupleBs, K, C] = {
| new ${mapAggregatorType}[A, $tupleBs, K, C] {
| def prepare(a: A) = (
| ${aggrNums.map(i => s"agg${i}._2.prepare(a)").mkString(", ")}
| )
| // a field for semigroup/monoid that combines all input aggregators
| val $semigroupType = new Tuple${aggrCount}${semigroupType.capitalize}()(
| ${aggrNums.map(i => s"agg${i}._2.$semigroupType").mkString(", ")}
| )
| def present(b: $tupleBs) = Map(
| ${aggrNums.map(i => s"agg${i}._1 -> agg${i}._2.present(b._${i})").mkString(", ")}
| )
| def keys: Set[K] = Set(
| ${aggrNums.map(i => s"agg${i}._1").mkString(", ")}
| )
| }
|}""".stripMargin
}).mkString("\n") + aggregatorForOneItem
}
}

0 comments on commit 2bdab21

Please sign in to comment.