Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Pass the semigroup to create Mergeable online #687

Merged
merged 2 commits into from
Oct 5, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,19 @@ object CombinedServiceStoreFactory {
def apply[K, V](onlineStore: => MergeableStore[(K, BatchID), V], batchesToKeep: Int)(implicit b: Batcher) = {

lazy val clientStore = ClientStore[K, V](onlineStore, batchesToKeep)(b, onlineStore.semigroup)

new CombinedServiceStoreFactory[K, V] {
def mergeableStore = () => onlineStore
def mergeableBatcher = b
def serviceStore = () => clientStore
}
from[K, V](clientStore, onlineStore, b)
}

/**
 * Builds a CombinedServiceStoreFactory from an offline store and an online store.
 *
 * A ClientStore over both stores is declared as a `lazy val`, constructed with the
 * batcher `b` and the online store's own semigroup, and is then handed to `from`
 * together with the online store and the batcher.
 *
 * @param offlineStore by-name readable store holding a (BatchID, V) per key
 * @param onlineStore by-name mergeable store keyed by (K, BatchID)
 * @param batchesToKeep forwarded unchanged to ClientStore
 * @param b implicit batcher, passed explicitly to ClientStore and to `from`
 */
def apply[K, V](offlineStore: => ReadableStore[K, (BatchID, V)], onlineStore: => MergeableStore[(K, BatchID), V], batchesToKeep: Int)(implicit b: Batcher) = {

// Both stores are by-name and this val is lazy, so nothing is constructed here.
lazy val clientStore = ClientStore[K, V](offlineStore, onlineStore, batchesToKeep)(b, onlineStore.semigroup)
from[K, V](clientStore, onlineStore, b)
}

private[this] def from[K, V](cs: => ReadableStore[K, V], online: => MergeableStore[(K, BatchID), V], b: Batcher): CombinedServiceStoreFactory[K, V] =
new CombinedServiceStoreFactory[K, V] {
def mergeableStore = () => onlineStore
def mergeableStore = _ => online
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistency between _ and ()

Copy link
Collaborator Author

@johnynek johnynek Sep 8, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you mean? They mean different things. _: Semigroup[V] is being ignored here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, of course

def mergeableBatcher = b
def serviceStore = () => clientStore
def serviceStore = () => cs
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.twitter.summingbird.online

import com.twitter.algebird.Semigroup
import com.twitter.storehaus.algebra.{ MergeableStore, Mergeable, StoreAlgebra }
import com.twitter.summingbird.batch.{ Batcher, BatchID }

Expand All @@ -26,23 +27,28 @@ import com.twitter.summingbird.batch.{ Batcher, BatchID }
*/
object MergeableStoreFactory {

def apply[K, V](store: () => Mergeable[K, V], batcher: Batcher) = {
def apply[K, V](store: () => Mergeable[K, V], batcher: Batcher) =
fromWithSemigroup { _: Semigroup[V] => store() }(batcher)

/**
 * Creates a factory for a store keyed by (K, BatchID) from a by-name Mergeable.
 *
 * Delegates to fromWithSemigroup with a function that ignores its Semigroup[V]
 * argument and returns `store`.
 *
 * @param store by-name mergeable store keyed by (K, BatchID)
 * @param batcher implicit batcher forwarded to fromWithSemigroup
 */
def from[K, V](store: => Mergeable[(K, BatchID), V])(implicit batcher: Batcher): MergeableStoreFactory[(K, BatchID), V] =
fromWithSemigroup(_ => store)

def fromWithSemigroup[K, V](fn: Semigroup[V] => Mergeable[K, V])(implicit batcher: Batcher): MergeableStoreFactory[K, V] =
new MergeableStoreFactory[K, V] {
def mergeableStore = store
def mergeableStore = fn
def mergeableBatcher = batcher
}
}

/**
 * Creates a factory for a store keyed by (K, BatchID) from a by-name Mergeable.
 *
 * Wraps `store` in a zero-argument function and delegates to `apply`, passing the
 * implicit batcher explicitly.
 */
def from[K, V](store: => Mergeable[(K, BatchID), V])(implicit batcher: Batcher): MergeableStoreFactory[(K, BatchID), V] =
apply({ () => store }, batcher)

def fromOnlineOnly[K, V](store: => MergeableStore[K, V]): MergeableStoreFactory[(K, BatchID), V] = {
implicit val batcher = Batcher.unit
from(store.convert { k: (K, BatchID) => k._1 })
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fromOnlineOnlyWithSemigroup(_ => store) ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call.

}

/**
 * Creates a factory keyed by (K, BatchID) from a function that builds a
 * MergeableStore[K, V] out of a Semigroup[V].
 *
 * The store produced by `fn` is adapted with `convert` so that a (K, BatchID) key is
 * mapped to its first component, K; the BatchID part of the key is dropped.
 * Batcher.unit is passed explicitly as the batcher.
 *
 * @param fn builds the underlying online store from the semigroup it is given
 */
def fromOnlineOnlyWithSemigroup[K, V](fn: Semigroup[V] => MergeableStore[K, V]): MergeableStoreFactory[(K, BatchID), V] =
fromWithSemigroup(fn.andThen(_.convert { k: (K, BatchID) => k._1 }))(Batcher.unit)
}

trait MergeableStoreFactory[-K, V] extends java.io.Serializable {
def mergeableStore: () => Mergeable[K, V]
def mergeableStore: Semigroup[V] => Mergeable[K, V]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point it seems like this method should be named like a function and not a property. Earlier it was in the spirit of a lazily created mergeable store, so the name was OK. Should we call it createMergeableStore?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I'm a little nervous about that because it led to a lot of serialization problems before (which is why we did this). Calling a method is still lazy, so def mergeableStore = store would have been lazy, but we still got burned enough to add this extra layer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't mean to suggest we change the structure, just the name. MergeableStoreFactory.mergeableStore reads like a property which would provide a mergeable store but it is a function that takes an input to provide a mergeable store.

e.g. when I read factory.mergeableStore I would think that this is a mergeable store object, finding out that it is a function that needs a semigroup as input to produce a mergeable store would be a surprise.

def mergeableBatcher: Batcher
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,3 @@ class WrappedTSInMergeable[K, V](self: Mergeable[K, V]) extends Mergeable[K, (Ti
})
}
}

/** Helpers for adapting a MergeableStoreFactory to timestamped values. */
object MergeableStoreFactoryAlgebra {
/*
The tuples we hand to the store have the form ((K, BatchID), (Timestamp, V)),
but the store itself only holds ((K, BatchID), V); that is, the timestamp is not stored.
Downstream processing still needs these timestamps, however, so we use a Right timestamp,
meaning the last value is taken (which may not be the max(TS)).

The merge operation here takes the inbound (Timestamp, V) value and performs the inner merge
against the store. It then looks up the timestamp handed over from the stream and emits the
result with that timestamp.
*/
/**
 * Wraps a factory of Mergeable[K, V] into a factory of Mergeable[K, (Timestamp, V)].
 *
 * @param supplier factory whose store and batcher are reused
 * @return a factory with the same batcher whose store is the supplier's store wrapped in WrappedTSInMergeable
 */
def wrapOnlineFactory[K, V](supplier: MergeableStoreFactory[K, V]): MergeableStoreFactory[K, (Timestamp, V)] =
{
// Kept as a thunk: the supplier's store is only created when this function is invoked.
val mergeable: () => Mergeable[K, (Timestamp, V)] =
() => { new WrappedTSInMergeable(supplier.mergeableStore()) }

MergeableStoreFactory[K, (Timestamp, V)](mergeable, supplier.mergeableBatcher)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ import scala.util.control.NonFatal
*/

class Summer[Key, Value: Semigroup, Event, S, D, RC](
@transient storeSupplier: MergeableStoreFactory[Key, Value],
@transient storeSupplier: () => Mergeable[Key, Value],
@transient flatMapOp: FlatMapOperation[(Key, (Option[Value], Value)), Event],
@transient successHandler: OnlineSuccessHandler,
@transient exceptionHandler: OnlineExceptionHandler,
Expand Down Expand Up @@ -86,7 +86,7 @@ class Summer[Key, Value: Semigroup, Event, S, D, RC](

override def init(runtimeContext: RC) {
super.init(runtimeContext)
storePromise.setValue(storeBox.get.mergeableStore())
storePromise.setValue(storeBox.get())
store.toString // Do the lazy evaluation now so we can connect before tuples arrive.

successHandlerOpt = if (includeSuccessHandler.get) Some(successHandlerBox.get) else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,6 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird
type ExecutorValueType = (Timestamp, V)
type ExecutorOutputType = (Timestamp, (K, (Option[V], V)))

val supplier: MergeableStoreFactory[ExecutorKeyType, V] = summer.store match {
case m: MergeableStoreFactory[ExecutorKeyType, V] => m
case _ => sys.error("Should never be able to get here, looking for a MergeableStoreFactory from %s".format(summer.store))
}

val wrappedStore: MergeableStoreFactory[ExecutorKeyType, ExecutorValueType] =
MergeableStoreFactoryAlgebra.wrapOnlineFactory(supplier)

val anchorTuples = getOrElse(stormDag, node, AnchorTuples.default)
val metrics = getOrElse(stormDag, node, DEFAULT_SUMMER_STORM_METRICS)
val shouldEmit = stormDag.dependantsOf(node).size > 0
Expand All @@ -218,6 +210,8 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird
val flatmapOp: FlatMapOperation[(ExecutorKeyType, (Option[ExecutorValueType], ExecutorValueType)), ExecutorOutputType] =
FlatMapOperation.apply(storeBaseFMOp)

val supplier: MergeableStoreFactory[ExecutorKeyType, V] = summer.store

val sinkBolt = BaseBolt(
jobID,
metrics.metrics,
Expand All @@ -227,7 +221,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird
ackOnEntry,
maxExecutePerSec,
new executor.Summer(
wrappedStore,
() => new WrappedTSInMergeable(supplier.mergeableStore(semigroup)),
flatmapOp,
getOrElse(stormDag, node, DEFAULT_ONLINE_SUCCESS_HANDLER),
getOrElse(stormDag, node, DEFAULT_ONLINE_EXCEPTION_HANDLER),
Expand Down