Skip to content

Commit

Permalink
Change the locking mechanism for ExplorationProgressController to work
Browse files Browse the repository at this point in the history
with the current MediatorLiveData implementation (see #90 for more
context). Fix existing progress controller tests and add a few more. All
current progress controller tests are passing.
  • Loading branch information
BenHenning committed Oct 3, 2019
1 parent ccbac0e commit 5f326fc
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 121 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package org.oppia.domain.exploration

import androidx.lifecycle.LiveData
import androidx.lifecycle.MutableLiveData
import org.oppia.app.model.Exploration
import org.oppia.util.data.AsyncResult
import org.oppia.util.data.DataProviders
import javax.inject.Inject

private const val EXPLORATION_DATA_PROVIDER_ID = "ExplorationDataProvider"
private const val START_PLAYING_EXPLORATION_RESULT_DATA_PROVIDER_ID = "StartPlayingExplorationResultDataProvider"
private const val STOP_PLAYING_EXPLORATION_RESULT_DATA_PROVIDER_ID = "StopPlayingExplorationResultDataProvider"

/**
* Controller for loading explorations by ID, or beginning to play an exploration.
Expand Down Expand Up @@ -41,21 +40,25 @@ class ExplorationDataController @Inject constructor(
* fail to load, but this provides early-failure detection.
*/
fun startPlayingExploration(explorationId: String): LiveData<AsyncResult<Any?>> {
val operation = explorationProgressController.beginExplorationAsync(explorationId)
val dataProvider = dataProviders.createDeferredDataProviderAsync(
START_PLAYING_EXPLORATION_RESULT_DATA_PROVIDER_ID, operation)
return dataProviders.convertToLiveData(dataProvider)
return try {
explorationProgressController.beginExplorationAsync(explorationId)
MutableLiveData(AsyncResult.success<Any?>(null))
} catch (e: Exception) {
MutableLiveData(AsyncResult.failed(e))
}
}

/**
* Finishes the most recent exploration started by [startPlayingExploration]. This method should only be called if an
* active exploration is being played, otherwise an exception will be thrown.
*/
fun stopPlayingExploration(): LiveData<AsyncResult<Any?>> {
val operation = explorationProgressController.finishExplorationAsync()
val dataProvider = dataProviders.createDeferredDataProviderAsync(
STOP_PLAYING_EXPLORATION_RESULT_DATA_PROVIDER_ID, operation)
return dataProviders.convertToLiveData(dataProvider)
return try {
explorationProgressController.finishExplorationAsync()
MutableLiveData(AsyncResult.success<Any?>(null))
} catch (e: Exception) {
MutableLiveData(AsyncResult.failed(e))
}
}

@Suppress("RedundantSuspendModifier") // DataProviders expects this function to be a suspend function.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.oppia.domain.exploration

import androidx.lifecycle.LiveData
import kotlinx.coroutines.Deferred
import androidx.lifecycle.MutableLiveData
import org.oppia.app.model.AnswerAndResponse
import org.oppia.app.model.AnswerOutcome
import org.oppia.app.model.CompletedState
Expand All @@ -16,33 +16,27 @@ import org.oppia.domain.classify.AnswerClassificationController
import org.oppia.util.data.AsyncDataSubscriptionManager
import org.oppia.util.data.AsyncResult
import org.oppia.util.data.DataProviders
import org.oppia.util.data.InMemoryBlockingCache
import java.util.concurrent.locks.ReentrantLock
import javax.inject.Inject
import javax.inject.Singleton
import kotlin.concurrent.withLock

// TODO(#186): Use an interaction repository to retrieve whether a specific ID corresponds to a terminal interaction.
private const val TERMINAL_INTERACTION_ID = "EndExploration"

private const val CURRENT_STATE_DATA_PROVIDER_ID = "CurrentStateDataProvider"
private const val SUBMIT_ANSWER_RESULT_DATA_PROVIDER_ID = "SubmitAnswerResultDataProvider"
private const val MOVE_TO_NEXT_STATE_RESULT_DATA_PROVIDER_ID = "MoveToNextStateResultDataProvider"
private const val MOVE_TO_PREVIOUS_STATE_RESULT_DATA_PROVIDER_ID = "MoveToPreviousStateResultDataProvider"

/**
* Controller that tracks and reports the learner's ephemeral/non-persisted progress through an exploration. Note that
* this controller only supports one active exploration at a time.
*
* The current exploration session is started via the exploration data controller.
*
* This class uses an [InMemoryBlockingCache] for performing synchronization. This means that although access to state
* is thread-safe, ordering is still arbitrary. Calling code should take care to ensure that uses of this class off the
* main thread do not specifically depend on ordering since observing state changes cannot be done reliably without a
* UI-bound [LiveData] observer. Note that the UI-bound observers are reliable since the in-memory cache guarantees that
* the ordering of operations to the controller have to be observed in equal order on the UI thread.
* This class is thread-safe, but the order of applied operations is arbitrary. Calling code should take care to ensure
* that uses of this class do not specifically depend on ordering.
*/
@Singleton
class ExplorationProgressController @Inject constructor(
inMemoryBlockingCacheFactory: InMemoryBlockingCache.Factory,
private val dataProviders: DataProviders,
private val asyncDataSubscriptionManager: AsyncDataSubscriptionManager,
private val explorationRetriever: ExplorationRetriever,
Expand All @@ -52,31 +46,35 @@ class ExplorationProgressController @Inject constructor(
// TODO(#179): Add support for parameters.
// TODO(#181): Add support for solutions.
// TODO(#182): Add support for refresher explorations.
// TODO(#90): Update the internal locking of this controller to use something like an in-memory blocking cache to
// simplify state locking. However, doing this correctly requires a fix in MediatorLiveData to avoid unexpected
// cancellations in chained cross-scope coroutines.

private val currentStateDataProvider =
dataProviders.createInMemoryDataProviderAsync(CURRENT_STATE_DATA_PROVIDER_ID, this::retrieveCurrentStateAsync)
private val explorationProgress = inMemoryBlockingCacheFactory.create(ExplorationProgress())
private val explorationProgress = ExplorationProgress()
private val explorationProgressLock = ReentrantLock()

/** Resets this controller to begin playing the specified [Exploration]. */
internal fun beginExplorationAsync(explorationId: String): Deferred<*> {
return explorationProgress.updateInPlaceIfPresentAsync { progress ->
check(progress.playStage == PlayStage.NOT_PLAYING) {
internal fun beginExplorationAsync(explorationId: String) {
explorationProgressLock.withLock {
check(explorationProgress.playStage == PlayStage.NOT_PLAYING) {
"Expected to finish previous exploration before starting a new one."
}

progress.currentExplorationId = explorationId
progress.advancePlayStageTo(PlayStage.LOADING_EXPLORATION)
explorationProgress.currentExplorationId = explorationId
explorationProgress.advancePlayStageTo(PlayStage.LOADING_EXPLORATION)
asyncDataSubscriptionManager.notifyChangeAsync(CURRENT_STATE_DATA_PROVIDER_ID)
}
}

/** Indicates that the current exploration being played is now completed. */
internal fun finishExplorationAsync(): Deferred<*> {
return explorationProgress.updateInPlaceIfPresentAsync { progress ->
check(progress.playStage != PlayStage.NOT_PLAYING) {
internal fun finishExplorationAsync() {
explorationProgressLock.withLock {
check(explorationProgress.playStage != PlayStage.NOT_PLAYING) {
"Cannot finish playing an exploration that hasn't yet been started"
}
progress.advancePlayStageTo(PlayStage.NOT_PLAYING)
explorationProgress.advancePlayStageTo(PlayStage.NOT_PLAYING)
}
}

Expand Down Expand Up @@ -107,39 +105,40 @@ class ExplorationProgressController @Inject constructor(
* that point.
*/
fun submitAnswer(answer: InteractionObject): LiveData<AsyncResult<AnswerOutcome>> {
val operation = explorationProgress.updateInPlaceIfPresentAsync { progress ->
check(progress.playStage != PlayStage.NOT_PLAYING) {
"Cannot submit an answer if an exploration is not being played."
}
check(progress.playStage != PlayStage.SUBMITTING_ANSWER) {
"Cannot submit an answer while another answer is pending."
}
check(progress.playStage != PlayStage.LOADING_EXPLORATION) {
"Cannot submit an answer while the exploration is being loaded."
}

// Notify observers that the submitted answer is currently pending.
progress.advancePlayStageTo(PlayStage.SUBMITTING_ANSWER)
asyncDataSubscriptionManager.notifyChangeAsync(CURRENT_STATE_DATA_PROVIDER_ID)
try {
explorationProgressLock.withLock {
check(explorationProgress.playStage != PlayStage.NOT_PLAYING) {
"Cannot submit an answer if an exploration is not being played."
}
check(explorationProgress.playStage != PlayStage.SUBMITTING_ANSWER) {
"Cannot submit an answer while another answer is pending."
}
check(explorationProgress.playStage != PlayStage.LOADING_EXPLORATION) {
"Cannot submit an answer while the exploration is being loaded."
}

val topPendingState = progress.stateDeck.getPendingTopState()
val outcome = answerClassificationController.classify(topPendingState, answer)
val answerOutcome = progress.stateGraph.computeAnswerOutcomeForResult(topPendingState, outcome)
progress.stateDeck.submitAnswer(answer, answerOutcome.feedback)
// Follow the answer's outcome to another part of the graph if it's different.
if (answerOutcome.destinationCase == AnswerOutcome.DestinationCase.STATE_NAME) {
progress.stateDeck.pushState(progress.stateGraph.getState(answerOutcome.stateName))
}
// Notify observers that the submitted answer is currently pending.
explorationProgress.advancePlayStageTo(PlayStage.SUBMITTING_ANSWER)
asyncDataSubscriptionManager.notifyChangeAsync(CURRENT_STATE_DATA_PROVIDER_ID)

val topPendingState = explorationProgress.stateDeck.getPendingTopState()
val outcome = answerClassificationController.classify(topPendingState, answer)
val answerOutcome = explorationProgress.stateGraph.computeAnswerOutcomeForResult(topPendingState, outcome)
explorationProgress.stateDeck.submitAnswer(answer, answerOutcome.feedback)
// Follow the answer's outcome to another part of the graph if it's different.
if (answerOutcome.destinationCase == AnswerOutcome.DestinationCase.STATE_NAME) {
explorationProgress.stateDeck.pushState(explorationProgress.stateGraph.getState(answerOutcome.stateName))
}

// Return to viewing the state.
progress.advancePlayStageTo(PlayStage.VIEWING_STATE)
asyncDataSubscriptionManager.notifyChangeAsync(CURRENT_STATE_DATA_PROVIDER_ID)
// Return to viewing the state.
explorationProgress.advancePlayStageTo(PlayStage.VIEWING_STATE)
asyncDataSubscriptionManager.notifyChangeAsync(CURRENT_STATE_DATA_PROVIDER_ID)

answerOutcome // Propagate the classification result.
return MutableLiveData(AsyncResult.success(answerOutcome))
}
} catch (e: Exception) {
return MutableLiveData(AsyncResult.failed(e))
}

val dataProvider = dataProviders.createDeferredDataProviderAsync(SUBMIT_ANSWER_RESULT_DATA_PROVIDER_ID, operation)
return dataProviders.convertToLiveData(dataProvider)
}

/**
Expand All @@ -161,23 +160,24 @@ class ExplorationProgressController @Inject constructor(
* instead rely on [getCurrentState] for observing a successful transition to another state.
*/
fun moveToPreviousState(): LiveData<AsyncResult<Any?>> {
val operation: Deferred<*> = explorationProgress.updateInPlaceIfPresentAsync { progress ->
check(progress.playStage != PlayStage.NOT_PLAYING) {
"Cannot navigate to a previous state if an exploration is not being played."
}
check(progress.playStage != PlayStage.LOADING_EXPLORATION) {
"Cannot navigate to a previous state if an exploration is being loaded."
}
check(progress.playStage != PlayStage.SUBMITTING_ANSWER) {
"Cannot navigate to a previous state if an answer submission is pending."
try {
explorationProgressLock.withLock {
check(explorationProgress.playStage != PlayStage.NOT_PLAYING) {
"Cannot navigate to a previous state if an exploration is not being played."
}
check(explorationProgress.playStage != PlayStage.LOADING_EXPLORATION) {
"Cannot navigate to a previous state if an exploration is being loaded."
}
check(explorationProgress.playStage != PlayStage.SUBMITTING_ANSWER) {
"Cannot navigate to a previous state if an answer submission is pending."
}
explorationProgress.stateDeck.navigateToPreviousState()
asyncDataSubscriptionManager.notifyChangeAsync(CURRENT_STATE_DATA_PROVIDER_ID)
}
progress.stateDeck.navigateToPreviousState()
asyncDataSubscriptionManager.notifyChangeAsync(CURRENT_STATE_DATA_PROVIDER_ID)
return MutableLiveData(AsyncResult.success<Any?>(null))
} catch (e: Exception) {
return MutableLiveData(AsyncResult.failed(e))
}

val dataProvider = dataProviders.createDeferredDataProviderAsync(
MOVE_TO_NEXT_STATE_RESULT_DATA_PROVIDER_ID, operation)
return dataProviders.convertToLiveData(dataProvider)
}

/**
Expand All @@ -195,23 +195,24 @@ class ExplorationProgressController @Inject constructor(
* [getCurrentState] for observing a successful transition to another state.
*/
fun moveToNextState(): LiveData<AsyncResult<Any?>> {
val operation: Deferred<*> = explorationProgress.updateInPlaceIfPresentAsync { progress ->
check(progress.playStage != PlayStage.NOT_PLAYING) {
"Cannot navigate to a next state if an exploration is not being played."
}
check(progress.playStage != PlayStage.LOADING_EXPLORATION) {
"Cannot navigate to a next state if an exploration is being loaded."
}
check(progress.playStage != PlayStage.SUBMITTING_ANSWER) {
"Cannot navigate to a next state if an answer submission is pending."
try {
explorationProgressLock.withLock {
check(explorationProgress.playStage != PlayStage.NOT_PLAYING) {
"Cannot navigate to a next state if an exploration is not being played."
}
check(explorationProgress.playStage != PlayStage.LOADING_EXPLORATION) {
"Cannot navigate to a next state if an exploration is being loaded."
}
check(explorationProgress.playStage != PlayStage.SUBMITTING_ANSWER) {
"Cannot navigate to a next state if an answer submission is pending."
}
explorationProgress.stateDeck.navigateToNextState()
asyncDataSubscriptionManager.notifyChangeAsync(CURRENT_STATE_DATA_PROVIDER_ID)
}
progress.stateDeck.navigateToNextState()
asyncDataSubscriptionManager.notifyChangeAsync(CURRENT_STATE_DATA_PROVIDER_ID)
return MutableLiveData(AsyncResult.success<Any?>(null))
} catch (e: Exception) {
return MutableLiveData(AsyncResult.failed(e))
}

val dataProvider = dataProviders.createDeferredDataProviderAsync(
MOVE_TO_PREVIOUS_STATE_RESULT_DATA_PROVIDER_ID, operation)
return dataProviders.convertToLiveData(dataProvider)
}

/**
Expand Down Expand Up @@ -242,33 +243,55 @@ class ExplorationProgressController @Inject constructor(

private suspend fun retrieveCurrentStateAsync(): AsyncResult<EphemeralState> {
return try {
retrieveCurrentStateWithinCacheAsync().await()
retrieveCurrentStateWithinCacheAsync()
} catch (e: Exception) {
AsyncResult.failed(e)
}
}

private suspend fun retrieveCurrentStateWithinCacheAsync(): Deferred<AsyncResult<EphemeralState>> {
return explorationProgress.updateInPlaceIfPresentAsync { progress ->
when (progress.playStage) {
private suspend fun retrieveCurrentStateWithinCacheAsync(): AsyncResult<EphemeralState> {
var explorationId: String? = null
lateinit var currentStage: PlayStage
explorationProgressLock.withLock {
currentStage = explorationProgress.playStage
if (currentStage == PlayStage.LOADING_EXPLORATION) {
explorationId = explorationProgress.currentExplorationId
}
}

val exploration: Exploration? =
if (explorationId != null) explorationRetriever.loadExploration(explorationId!!) else null

explorationProgressLock.withLock {
// It's possible for the exploration ID or stage to change between critical sections. However, this is the only
// way to ensure the exploration is loaded since suspended functions cannot be called within a mutex.
check(exploration == null || explorationProgress.currentExplorationId == explorationId) {
"Encountered race condition when retrieving exploration. ID changed from $explorationId" +
" to ${explorationProgress.currentExplorationId}"
}
check(explorationProgress.playStage == currentStage) {
"Encountered race condition when retrieving exploration. ID changed from $explorationId" +
" to ${explorationProgress.currentExplorationId}"
}
return when (explorationProgress.playStage) {
PlayStage.NOT_PLAYING -> AsyncResult.pending()
PlayStage.LOADING_EXPLORATION -> {
try {
finishLoadExploration(progress)
AsyncResult.success(progress.stateDeck.getCurrentEphemeralState())
// The exploration must be available for this stage since it was loaded above.
finishLoadExploration(exploration!!, explorationProgress)
AsyncResult.success(explorationProgress.stateDeck.getCurrentEphemeralState())
} catch (e: Exception) {
AsyncResult.failed<EphemeralState>(e)
}
}
PlayStage.VIEWING_STATE -> AsyncResult.success(progress.stateDeck.getCurrentEphemeralState())
PlayStage.VIEWING_STATE -> AsyncResult.success(explorationProgress.stateDeck.getCurrentEphemeralState())
PlayStage.SUBMITTING_ANSWER -> AsyncResult.pending()
}
}
}

private suspend fun finishLoadExploration(progress: ExplorationProgress) {
private fun finishLoadExploration(exploration: Exploration, progress: ExplorationProgress) {
// The exploration must be initialized first since other lazy fields depend on it being inited.
val exploration = explorationRetriever.loadExploration(progress.currentExplorationId)
progress.currentExploration = exploration
progress.stateGraph.resetStateGraph(exploration.statesMap)
progress.stateDeck.resetDeck(progress.stateGraph.getState(exploration.initStateName))
Expand Down
Loading

0 comments on commit 5f326fc

Please sign in to comment.