Skip to content

Commit

Permalink
fix semantics
Browse files Browse the repository at this point in the history
  • Loading branch information
jose-torres committed Jan 9, 2018
1 parent eafe670 commit f825155
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ class ContinuousExecution(
sparkSessionForQuery, lastExecution)(lastExecution.toRdd)
}
} finally {
epochEndpoint.askSync(StopEpochCoordinator)
epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
SparkEnv.get.rpcEnv.stop(epochEndpoint)

epochUpdateThread.interrupt()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ private[continuous] sealed trait EpochCoordinatorMessage extends Serializable
private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage

/**
* Synchronously stop the epoch coordinator. The RpcEndpoint stop() will clear out the message queue
* before terminating the endpoint, but we must be sure no more messages will be processed before we
* can restart the query. The framework unfortunately provides no handle to wait for the queue.
* The RpcEndpoint stop() will wait to clear out the message queue before terminating the
* endpoint. This can lead to a race condition where the query restarts at epoch n, a new
* EpochCoordinator starts at epoch n, and then the old epoch coordinator, still working through
* its queue, commits epoch n + 1. The framework doesn't provide a handle to wait on the message
* queue, so we use a synchronous message to stop any writes to the ContinuousExecution object.
*/
private[sql] case object StopEpochCoordinator extends EpochCoordinatorMessage
private[sql] case object StopContinuousExecutionWrites extends EpochCoordinatorMessage

// Init messages
/**
Expand Down Expand Up @@ -123,7 +125,7 @@ private[continuous] class EpochCoordinator(
override val rpcEnv: RpcEnv)
extends ThreadSafeRpcEndpoint with Logging {

private var stopped: Boolean = false
private var queryWritesStopped: Boolean = false

private var numReaderPartitions: Int = _
private var numWriterPartitions: Int = _
Expand Down Expand Up @@ -162,7 +164,10 @@ private[continuous] class EpochCoordinator(
}

override def receive: PartialFunction[Any, Unit] = {
case _ if stopped => throw new IllegalStateException(s"Coordinator $this stopped")
// Once query writes have been stopped, silently drop every incoming message instead of
// throwing: nothing further is written to the query, and lame-duck tasks that are still
// sending messages do not surface errors.
case _ if queryWritesStopped => ()

case CommitPartitionEpoch(partitionId, epoch, message) =>
logDebug(s"Got commit from partition $partitionId at epoch $epoch: $message")
if (!partitionCommits.isDefinedAt((epoch, partitionId))) {
Expand All @@ -182,7 +187,6 @@ private[continuous] class EpochCoordinator(
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ if stopped => throw new IllegalStateException(s"Coordinator $this stopped")
case GetCurrentEpoch =>
val result = currentDriverEpoch
logDebug(s"Epoch $result")
Expand All @@ -200,8 +204,8 @@ private[continuous] class EpochCoordinator(
numWriterPartitions = numPartitions
context.reply(())

case StopEpochCoordinator =>
stopped = true
case StopContinuousExecutionWrites =>
queryWritesStopped = true
context.reply(())
}
}

0 comments on commit f825155

Please sign in to comment.