Skip to content

Commit

Permalink
[EXT] Fix repeated batch after stop gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
zzcclp committed May 19, 2016
1 parent ff2590d commit 3d637ab
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.streaming.scheduler.JobGenerator


private[streaming]
class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
class Checkpoint(ssc: StreamingContext, val checkpointTime: Time, val afterBatchCompletion: Boolean = false)
extends Logging with Serializable {
val master = ssc.sc.master
val framework = ssc.sc.appName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,14 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {

// Batches when the master was down, that is,
// between the checkpoint and current restart time
val checkpointTime = ssc.initialCheckpoint.checkpointTime
val checkpointTime = {

This comment has been minimized.

Copy link
@jerryshao

jerryshao May 19, 2016

don't need { here.

// if checkpoint after batch completion, start from next batch
if (ssc.initialCheckpoint.afterBatchCompletion) {
ssc.initialCheckpoint.checkpointTime + batchDuration
} else {
ssc.initialCheckpoint.checkpointTime
}
}

This comment has been minimized.

Copy link
@jerryshao

jerryshao May 19, 2016

also here.

val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
val downTimes = checkpointTime.until(restartTime, batchDuration)
logInfo("Batches during down time (" + downTimes.size + " batches): "
Expand Down Expand Up @@ -292,7 +299,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
logInfo("Checkpointing graph for time " + time)
ssc.graph.updateCheckpointData(time)
checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
checkpointWriter.write(new Checkpoint(ssc, time, clearCheckpointDataLater), clearCheckpointDataLater)

This comment has been minimized.

Copy link
@jerryshao

jerryshao May 19, 2016

Use clearCheckpointDataLater for afterBatchCompletion is a little wired here, maybe you need to explicitly specify afterBatchCompletion = true here.

}
}

Expand Down

0 comments on commit 3d637ab

Please sign in to comment.