diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java index 038189e075..69649cf4f2 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java @@ -97,7 +97,7 @@ private AWSDataStorePlugin( sqliteStorageAdapter, AppSyncClient.via(api), () -> pluginConfiguration, - () -> api.getPlugins().isEmpty() ? Orchestrator.Mode.LOCAL_ONLY : Orchestrator.Mode.SYNC_VIA_API + () -> api.getPlugins().isEmpty() ? Orchestrator.State.LOCAL_ONLY : Orchestrator.State.SYNC_VIA_API ); this.userProvidedConfiguration = userProvidedConfiguration; } @@ -235,18 +235,11 @@ private Completable initializeStorageAdapter(Context context) { )); } - private void waitForInitialization(@NonNull Action onComplete, @NonNull Consumer onError) { - Completable.create(emitter -> { - categoryInitializationsPending.await(); - emitter.onComplete(); - }) - .timeout(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS) - .subscribeOn(Schedulers.io()) - .subscribe( - onComplete::call, - throwable -> onError.accept(new DataStoreException("Request failed because DataStore is not " + - "initialized.", throwable, "Retry your request.")) - ); + private Completable waitForInitialization() { + return Completable.fromAction(() -> categoryInitializationsPending.await()) + .timeout(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .subscribeOn(Schedulers.io()) + .doOnError(error -> LOG.error("DataStore initialization timed out.", error)); } /** @@ -254,15 +247,13 @@ private void waitForInitialization(@NonNull Action onComplete, @NonNull Consumer */ @Override public void start(@NonNull Action onComplete, @NonNull Consumer onError) { - waitForInitialization(() -> { - try { - orchestrator.start(); - } catch (DataStoreException exception) { - onError.accept(exception); - return; - } - onComplete.call(); - }, onError); + waitForInitialization() + .andThen(orchestrator.start()) + .subscribeOn(Schedulers.io()) + .subscribe( + onComplete::call, + error -> onError.accept(new DataStoreException("Failed to start DataStore.", error, "Retry.")) + ); } /** @@ -270,12 +261,13 @@ public void start(@NonNull Action onComplete, @NonNull Consumer onError) { - waitForInitialization(() -> orchestrator.stop() - .subscribeOn(Schedulers.io()) - .subscribe( - onComplete::call, - error -> onError.accept(new DataStoreException("Failed to stop DataStore.", error, - "Retry your request."))), onError); + waitForInitialization() + .andThen(orchestrator.stop()) + .subscribeOn(Schedulers.io()) + .subscribe( + onComplete::call, + error -> onError.accept(new DataStoreException("Failed to stop DataStore.", error, "Retry.")) + ); } /** @@ -296,19 +288,6 @@ public void clear(@NonNull Action onComplete, @NonNull Consumer targetMode; - private final AtomicReference currentMode; + private final Supplier targetState; + private final AtomicReference currentState; private final MutationOutbox mutationOutbox; private final CompositeDisposable disposables; private final Scheduler startStopScheduler; @@ -85,7 +83,7 @@ public final class Orchestrator { * {@link AWSDataStorePlugin}'s constructor, the plugin is not fully configured yet. * The reference to the variable returned by the provider only get set after the plugin's * {@link AWSDataStorePlugin#configure(JSONObject, Context)} is invoked by Amplify. - * @param targetMode The desired mode of operation - online, or offline + * @param targetState The desired state of operation - online, or offline */ public Orchestrator( @NonNull final ModelProvider modelProvider, @@ -93,7 +91,7 @@ public Orchestrator( @NonNull final LocalStorageAdapter localStorageAdapter, @NonNull final AppSync appSync, @NonNull final DataStoreConfigurationProvider dataStoreConfigurationProvider, - @NonNull final Supplier targetMode) { + @NonNull final Supplier targetState) { Objects.requireNonNull(modelSchemaRegistry); Objects.requireNonNull(modelProvider); Objects.requireNonNull(appSync); @@ -130,8 +128,8 @@ public Orchestrator( .queryPredicateProvider(queryPredicateProvider) .build(); this.storageObserver = new StorageObserver(localStorageAdapter, mutationOutbox); - this.currentMode = new AtomicReference<>(Mode.STOPPED); - this.targetMode = targetMode; + this.currentState = new AtomicReference<>(State.STOPPED); + this.targetState = targetState; this.disposables = new CompositeDisposable(); this.startStopScheduler = Schedulers.single(); @@ -142,127 +140,68 @@ public Orchestrator( TIMEOUT_SECONDS_PER_MODEL * modelProvider.models().size() ); this.startStopSemaphore = new Semaphore(1); - } - /** - * Checks if the orchestrator is running in the desired target state. - * @return true if so, false otherwise. - */ - public boolean isStarted() { - return ObjectsCompat.equals(targetMode.get(), currentMode.get()); } /** - * Checks if the orchestrator is stopped. - * @return true if so, false otherwise. + * Start the orchestrator. + * @return A completable which emits success when the orchestrator has transitioned to LOCAL_ONLY (synchronously) + * and started (asynchronously) the transition to SYNC_VIA_API, if an API is available. */ - @SuppressWarnings("unused") - public boolean isStopped() { - return Mode.STOPPED.equals(currentMode.get()); + public synchronized Completable start() { + return performSynchronized(Completable.fromAction(() -> { + switch (targetState.get()) { + case LOCAL_ONLY: + transitionToLocalOnly(); + break; + case SYNC_VIA_API: + transitionToApiSync(); + break; + case STOPPED: + default: + break; + } + })); } /** - * Start performing sync operations between the local storage adapter - * and the remote GraphQL endpoint. - * @throws DataStoreException on failure to aquire start stop lock. + * Stop the orchestrator. + * @return A completable which emits success when orchestrator stops */ - public synchronized void start() throws DataStoreException { - if (tryAcquireStartStopLock(LOCAL_OP_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { - if (isStarted()) { - startStopSemaphore.release(); - return; - } - disposables.add(transitionCompletable() - .doOnSubscribe(subscriber -> { - LOG.info("Starting the orchestrator."); - }) - .doOnComplete(() -> { - LOG.info("Orchestrator completed a transition"); - if (isStarted()) { - Amplify.Hub.publish(HubChannel.DATASTORE, - HubEvent.create(DataStoreChannelEventName.READY)); - } - }) - .doOnError(failure -> { - LOG.warn("Failed to start orchestrator.", failure); - }) - .doOnDispose(() -> LOG.debug("Orchestrator disposed a transition.")) - .doFinally(startStopSemaphore::release) - .subscribeOn(startStopScheduler) - .subscribe() - ); - } else { - throw new DataStoreException("Unable to acquire orchestrator lock. Transition currently in " + - "progress.", "Retry your request"); - } + public synchronized Completable stop() { + return performSynchronized(transitionToStopped()); } - private boolean tryAcquireStartStopLock(long opTimeout, TimeUnit timeUnit) { + private Completable performSynchronized(Completable completable) { boolean permitAvailable = startStopSemaphore.availablePermits() > 0; LOG.debug("Attempting to acquire lock. Permits available = " + permitAvailable); try { - if (!startStopSemaphore.tryAcquire(opTimeout, timeUnit)) { - LOG.warn("Unable to acquire orchestrator lock. Transition currently in progress."); - return false; + if (!startStopSemaphore.tryAcquire(LOCAL_OP_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + return Completable.error(new DataStoreException("Timed out acquiring orchestrator lock.", + "Retry your request.")); } } catch (InterruptedException exception) { - return false; + return Completable.error(new DataStoreException("Interrupted while acquiring orchestrator lock.", + "Retry your request.")); } - LOG.debug("Lock acquired."); - return true; + LOG.info("Orchestrator lock acquired."); + return completable.doFinally(() -> { + startStopSemaphore.release(); + LOG.info("Orchestrator lock released."); + }); } - private Completable transitionCompletable() { - Mode current = currentMode.get(); - Mode target = targetMode.get(); - if (ObjectsCompat.equals(current, target)) { - return Completable.complete(); - } - LOG.info(String.format(Locale.US, - "DataStore orchestrator transitioning states. " + - "Current mode = %s, target mode = %s.", current, target - )); - - switch (target) { - case STOPPED: - return transitionToStopped(current); - case LOCAL_ONLY: - return transitionToLocalOnly(current); - case SYNC_VIA_API: - return transitionToApiSync(current); - default: - return unknownMode(target); - } - } - - /** - * Stop the orchestrator. - * @return A completable which emits success when orchestrator stops - */ - public synchronized Completable stop() { - LOG.info("DataStore orchestrator stopping. Current mode = " + currentMode.get().name()); - if (tryAcquireStartStopLock(LOCAL_OP_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { - disposables.clear(); - return transitionToStopped(currentMode.get()) - .subscribeOn(startStopScheduler) - .doFinally(startStopSemaphore::release); - } else { - return Completable.error(new DataStoreException("Unable to acquire orchestrator lock. " + - "Transition currently in progress.", - "Retry your operation")); - } - - } - - private static Completable unknownMode(Mode mode) { - return Completable.error(new DataStoreException( - "Orchestrator state machine made reference to unknown mode = " + mode.name(), - AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION - )); + private DataStoreException unknownStateError(State state) { + return new DataStoreException( + "Orchestrator state machine made reference to unknown state = " + state.name(), + AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION + ); } - private Completable transitionToStopped(Mode current) { - switch (current) { + private Completable transitionToStopped() { + LOG.info("DataStore orchestrator stopping. Current mode = " + currentState.get().name()); + disposables.clear(); + switch (currentState.get()) { case SYNC_VIA_API: return stopApiSync().doFinally(this::stopObservingStorageChanges); case LOCAL_ONLY: @@ -271,35 +210,42 @@ private Completable transitionToStopped(Mode current) { case STOPPED: return Completable.complete(); default: - return unknownMode(current); + return Completable.error(unknownStateError(currentState.get())); + } } - private Completable transitionToLocalOnly(Mode current) { - switch (current) { + private void transitionToLocalOnly() throws DataStoreException { + switch (currentState.get()) { case STOPPED: + LOG.info("Starting the orchestrator."); startObservingStorageChanges(); - return Completable.complete(); + publishReadyEvent(); + break; case LOCAL_ONLY: - return Completable.complete(); + break; case SYNC_VIA_API: - return stopApiSync(); + stopApiSyncBlocking(); + break; default: - return unknownMode(current); + throw unknownStateError(currentState.get()); } } - private Completable transitionToApiSync(Mode current) { - switch (current) { + private void transitionToApiSync() throws DataStoreException { + switch (currentState.get()) { case SYNC_VIA_API: - return Completable.complete(); + break; case LOCAL_ONLY: - return startApiSync(); + startApiSync(); + break; case STOPPED: + LOG.info("Starting the orchestrator."); startObservingStorageChanges(); - return startApiSync(); + startApiSync(); + break; default: - return unknownMode(current); + throw unknownStateError(currentState.get()); } } @@ -307,19 +253,21 @@ private Completable transitionToApiSync(Mode current) { * Start observing the local storage adapter for changes; * enqueue them into the mutation outbox. */ - private void startObservingStorageChanges() { + private void startObservingStorageChanges() throws DataStoreException { LOG.info("Starting to observe local storage changes."); try { boolean subscribed = mutationOutbox.load() .andThen(Completable.create(emitter -> { storageObserver.startObservingStorageChanges(emitter::onComplete); - currentMode.set(Mode.LOCAL_ONLY); + LOG.info("Setting currentState to LOCAL_ONLY"); + currentState.set(State.LOCAL_ONLY); })).blockingAwait(LOCAL_OP_TIMEOUT_SECONDS, TimeUnit.SECONDS); if (!subscribed) { throw new TimeoutException("Timed out while preparing local-only mode."); } } catch (Throwable throwable) { - LOG.warn("Failed to start observing storage changes.", throwable); + throw new DataStoreException("Timed out while starting to observe storage changes.", + AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION); } } @@ -329,14 +277,29 @@ private void startObservingStorageChanges() { private void stopObservingStorageChanges() { LOG.info("Stopping observation of local storage changes."); storageObserver.stopObservingStorageChanges(); - currentMode.set(Mode.STOPPED); + LOG.info("Setting currentState to STOPPED"); + currentState.set(State.STOPPED); } /** * Start syncing models to and from a remote API. * @return A Completable that succeeds when API sync is enabled. */ - private Completable startApiSync() { + private void startApiSync() { + LOG.info("Setting currentState to SYNC_VIA_API"); + currentState.set(State.SYNC_VIA_API); + disposables.add(startApiSyncCompletable() + .doOnComplete(() -> { + LOG.info("Started the orchestrator in API sync mode."); + publishReadyEvent(); + }) + .doOnDispose(() -> LOG.debug("Orchestrator disposed the API sync")) + .subscribeOn(Schedulers.io()) + .subscribe() + ); + } + + private Completable startApiSyncCompletable() { return Completable.create(emitter -> { LOG.info("Starting API synchronization mode."); @@ -372,8 +335,6 @@ private Completable startApiSync() { LOG.debug("Draining subscription buffer..."); subscriptionProcessor.startDrainingMutationBuffer(this::stopApiSyncBlocking); - - currentMode.set(Mode.SYNC_VIA_API); emitter.onComplete(); }) .doOnError(error -> { @@ -383,6 +344,10 @@ private Completable startApiSync() { .onErrorComplete(); } + private void publishReadyEvent() { + Amplify.Hub.publish(HubChannel.DATASTORE, HubEvent.create(DataStoreChannelEventName.READY)); + } + private void stopApiSyncBlocking() { try { boolean stopped = stopApiSync() @@ -407,13 +372,13 @@ private Completable stopApiSync() { mutationProcessor.stopDrainingMutationOutbox(); }) .onErrorComplete() - .doOnComplete(() -> currentMode.set(Mode.LOCAL_ONLY)); + .doOnComplete(() -> currentState.set(State.LOCAL_ONLY)); } /** - * The mode of operation for the Orchestrator's synchronization logic. + * The current state of the Orchestrator. */ - public enum Mode { + public enum State { /** * The sync orchestrator is fully stopped. */ diff --git a/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/OrchestratorTest.java b/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/OrchestratorTest.java index 17a7bd8cfb..3837c37c95 100644 --- a/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/OrchestratorTest.java +++ b/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/OrchestratorTest.java @@ -18,12 +18,12 @@ import com.amplifyframework.AmplifyException; import com.amplifyframework.api.graphql.GraphQLBehavior; import com.amplifyframework.api.graphql.MutationType; +import com.amplifyframework.core.Amplify; import com.amplifyframework.core.model.ModelProvider; import com.amplifyframework.core.model.ModelSchemaRegistry; import com.amplifyframework.core.model.temporal.Temporal; import com.amplifyframework.datastore.DataStoreChannelEventName; import com.amplifyframework.datastore.DataStoreConfiguration; -import com.amplifyframework.datastore.DataStoreException; import com.amplifyframework.datastore.appsync.AppSyncClient; import com.amplifyframework.datastore.appsync.ModelMetadata; import com.amplifyframework.datastore.appsync.ModelWithMetadata; @@ -32,6 +32,7 @@ import com.amplifyframework.datastore.storage.SynchronousStorageAdapter; import com.amplifyframework.hub.HubChannel; import com.amplifyframework.hub.HubEvent; +import com.amplifyframework.logging.Logger; import com.amplifyframework.testmodels.commentsblog.BlogOwner; import com.amplifyframework.testutils.HubAccumulator; import com.amplifyframework.testutils.mocks.ApiMocking; @@ -61,6 +62,8 @@ */ @RunWith(RobolectricTestRunner.class) public final class OrchestratorTest { + private static final Logger LOG = Amplify.Logging.forNamespace("amplify:aws-datastore:test"); + private Orchestrator orchestrator; private HubAccumulator orchestratorInitObserver; private GraphQLBehavior mockApi; @@ -78,9 +81,9 @@ public void setup() throws AmplifyException { // Arrange: create a BlogOwner susan = BlogOwner.builder().name("Susan Quimby").build(); - // SUBSCRIPTIONS_ESTABLISHED indicates that the orchestrator is up and running. + // SYNC_QUERIES_READY indicates that the sync queries have completed. orchestratorInitObserver = - HubAccumulator.create(HubChannel.DATASTORE, DataStoreChannelEventName.SUBSCRIPTIONS_ESTABLISHED, 1) + HubAccumulator.create(HubChannel.DATASTORE, DataStoreChannelEventName.SYNC_QUERIES_READY, 1) .start(); ModelMetadata metadata = new ModelMetadata(susan.getId(), @@ -107,7 +110,7 @@ public void setup() throws AmplifyException { localStorageAdapter, appSync, DataStoreConfiguration::defaults, - () -> Orchestrator.Mode.SYNC_VIA_API + () -> Orchestrator.State.SYNC_VIA_API ); } @@ -122,7 +125,7 @@ public void setup() throws AmplifyException { @Test public void itemsPlacedInStorageArePublishedToNetwork() throws AmplifyException { // Arrange: orchestrator is running - orchestrator.start(); + orchestrator.start().test(); orchestratorInitObserver.await(10, TimeUnit.SECONDS); HubAccumulator accumulator = @@ -148,26 +151,20 @@ public void itemsPlacedInStorageArePublishedToNetwork() throws AmplifyException /** * Verify preventing concurrent state transitions from happening. - * @throws AmplifyException Not expected. */ @Test - public void preventConcurrentStateTransitions() throws AmplifyException { + public void preventConcurrentStateTransitions() { // Arrange: orchestrator is running - orchestrator.start(); + orchestrator.start().test(); // Try to start it in a new thread. boolean success = Completable.create(emitter -> { - new Thread(() -> { - try { - orchestrator.start(); - emitter.onComplete(); - } catch (DataStoreException exception) { - emitter.onError(exception); - } - }).start(); + new Thread(() -> orchestrator.start() + .subscribe(emitter::onComplete, emitter::onError) + ).start(); // Try to start it again on the current thread. - orchestrator.start(); + orchestrator.start().test(); }).blockingAwait(5, TimeUnit.SECONDS); assertTrue("Failed to start orchestrator on a background thread", success);