Skip to content

Commit

Permalink
Minor fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Jan 20, 2017
1 parent 7557ade commit cbcd6e8
Showing 1 changed file with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ object StateStore extends Logging {
val MAINTENANCE_INTERVAL_CONFIG = "spark.sql.streaming.stateStore.maintenanceInterval"
val MAINTENANCE_INTERVAL_DEFAULT_SECS = 60

@GuardedBy("loadedProviders")
private val loadedProviders = new mutable.HashMap[StateStoreId, StateStoreProvider]()

class MaintenanceTask(periodMs: Long, task: => Unit, onError: => Unit) {
Expand All @@ -138,25 +139,28 @@ object StateStore extends Logging {
} catch {
case NonFatal(e) =>
logWarning("Error running maintenance thread", e)
future.cancel(false) // do interrupt active run, as this is being called from the run
onError
throw e
}
}
}

private val future: ScheduledFuture[_] = executor.scheduleAtFixedRate(
runnable, periodMs, periodMs, TimeUnit.MILLISECONDS)

def stop(): Unit = { future.cancel(false) }
def stop(): Unit = {
future.cancel(false)
executor.shutdown()
}

def isRunning: Boolean = !future.isCancelled
def isRunning: Boolean = !future.isDone
}

@GuardedBy("loadedProviders")
@volatile private var maintenanceTask: MaintenanceTask = null
private var maintenanceTask: MaintenanceTask = null

@GuardedBy("loadedProviders")
@volatile private var _coordRef: StateStoreCoordinatorRef = null
private var _coordRef: StateStoreCoordinatorRef = null

/** Get or create a store associated with the id. */
def get(
Expand Down Expand Up @@ -238,6 +242,7 @@ object StateStore extends Logging {
} catch {
case NonFatal(e) =>
logWarning(s"Error managing $provider, stopping management thread")
throw e
}
}
}
Expand All @@ -263,7 +268,7 @@ object StateStore extends Logging {
}
}

private def coordinatorRef: Option[StateStoreCoordinatorRef] = synchronized {
private def coordinatorRef: Option[StateStoreCoordinatorRef] = loadedProviders.synchronized {
val env = SparkEnv.get
if (env != null) {
if (_coordRef == null) {
Expand Down

0 comments on commit cbcd6e8

Please sign in to comment.