Skip to content
This repository has been archived by the owner on Sep 10, 2021. It is now read-only.

Commit

Permalink
test using scalding 0.18
Browse files Browse the repository at this point in the history
  • Loading branch information
johnynek committed Feb 16, 2018
1 parent 4e0b832 commit d0e156e
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 17 deletions.
21 changes: 9 additions & 12 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def scalaBinaryVersion(scalaVersion: String) = scalaVersion match {

def isScala210x(scalaVersion: String) = scalaBinaryVersion(scalaVersion) == "2.10"

val algebirdVersion = "0.12.0"
val algebirdVersion = "0.13.3"
val bijectionVersion = "0.9.1"
val chillVersion = "0.7.3"
val commonsHttpClientVersion = "3.1"
Expand All @@ -27,7 +27,7 @@ val log4jVersion = "1.2.16"
val novocodeJunitVersion = "0.10"
val scalaCheckVersion = "1.12.2"
val scalatestVersion = "2.2.4"
val scaldingVersion = "0.16.0-RC3"
val scaldingVersion = "0.18.0-RC1-stripe"
val slf4jVersion = "1.6.6"
val storehausVersion = "0.13.0"
val stormDep = "storm" % "storm" % "0.9.0-wip15" //This project also compiles with the latest storm, which is in fact required to run the example
Expand Down Expand Up @@ -99,16 +99,13 @@ val sharedSettings = extraSettings ++ Seq(
ReleaseStep(action = Command.process("sonatypeReleaseAll", _)),
pushChanges),

publishTo := {
val v = version.value
Some(
if (v.trim.toUpperCase.endsWith("SNAPSHOT"))
Opts.resolver.sonatypeSnapshots
else
Opts.resolver.sonatypeStaging
//"twttr" at "http://artifactory.local.twitter.com/libs-releases-local"
)
},
publishTo := Some(
if (version.value.trim.endsWith("SNAPSHOT")) {
sys.props.get("snapshots.url").map("snapshots" at _).getOrElse(Opts.resolver.sonatypeSnapshots)
} else {
sys.props.get("releases.url").map("releases" at _).getOrElse(Opts.resolver.sonatypeStaging)
}
),

pomExtra := (
<url>https://github.com/twitter/summingbird</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ object BatchID {
.flatMap {
case (min, max, cnt) =>
if ((min + cnt) == (max + 1L)) {
Some(Interval.leftClosedRightOpen(min, max.next).right.get)
Some(Interval.leftClosedRightOpen(min, max.next): Interval[BatchID])
} else {
// These batches are not contiguous, not an interval
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,11 @@ object Timestamp {
implicit val timestampSuccessible: Successible[Timestamp] = new Successible[Timestamp] {
def next(old: Timestamp) = if (old.milliSinceEpoch != Long.MaxValue) Some(old.next) else None
def ordering: Ordering[Timestamp] = Timestamp.orderingOnTimestamp
def partialOrdering = Timestamp.orderingOnTimestamp
}

implicit val timestampPredecessible: Predecessible[Timestamp] = new Predecessible[Timestamp] {
def prev(old: Timestamp) = if (old.milliSinceEpoch != Long.MinValue) Some(old.prev) else None
def ordering: Ordering[Timestamp] = Timestamp.orderingOnTimestamp
def partialOrdering = Timestamp.orderingOnTimestamp
}

// This is a right semigroup, that given any two Timestamps just take the one on the right.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private[scalding] class VersionedState(meta: HDFSMetadata, startDate: Option[Tim
Interval.leftClosedRightOpen(
batcher.earliestTimeOf(beginning),
batcher.earliestTimeOf(end)
).right.get
)
}

def willAccept(available: Interval[Timestamp]) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import com.twitter.chill.java.IterableRegistrar
import com.twitter.scalding.Config
import com.twitter.scalding.Mode
import com.twitter.scalding.{ Tool => STool, Source => SSource, TimePathedSource => STPS, _ }
import com.twitter.scalding.typed.cascading_backend.CascadingBackend
import com.twitter.summingbird._
import com.twitter.summingbird.batch._
import com.twitter.summingbird.batch.option.{ FlatMapShards, Reducers }
Expand Down Expand Up @@ -615,6 +616,7 @@ object Scalding {
* writes we do, the second time we only read, we do not
* do any new writes.
*/
CascadingBackend.planTypedWrites(fd, mode)
Execution.fromFn { (_, _) => fd }
.flatMap { _ =>
// Now plan again and use summingbird's built in support
Expand Down Expand Up @@ -780,6 +782,7 @@ class Scalding(
.flatMap {
case (ts, pipe) =>
// Now we have a populated flowDef, time to let Cascading do it's thing:
CascadingBackend.planTypedWrites(flowDef, mode)
try {
if (flowDef.getSinks.isEmpty) {
Right((ts, None))
Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.10.0-RC2"
version in ThisBuild := "0.10.0-M8-stripe"

0 comments on commit d0e156e

Please sign in to comment.