From 346dfa76b00f290c1bad94db1e73a268278997ab Mon Sep 17 00:00:00 2001 From: Matt Creaser Date: Tue, 18 Apr 2023 14:15:29 -0300 Subject: [PATCH] Release semaphore if an error occurs while starting or stopping the Orchestrator --- .idea/codeStyles/Project.xml | 46 +++++++- .../datastore/syncengine/Orchestrator.java | 16 ++- .../syncengine/OrchestratorTest.java | 109 +++++++++--------- 3 files changed, 105 insertions(+), 66 deletions(-) diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml index 42cc79e213..04d8266a43 100644 --- a/.idea/codeStyles/Project.xml +++ b/.idea/codeStyles/Project.xml @@ -1,6 +1,33 @@ + - + \ No newline at end of file diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java index bf507d977b..10833faa5b 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java @@ -194,15 +194,13 @@ private Completable performSynchronized(Action action) { "Retry your request.")); } LOG.info("Orchestrator lock acquired."); - return Completable.fromAction(action) - .andThen( - Completable.fromAction( - () -> { - startStopSemaphore.release(); - LOG.info("Orchestrator lock released."); - } - ) - ); + return Completable.fromAction(action).doOnError((e) -> { + startStopSemaphore.release(); + LOG.info("Orchestrator lock released."); + }).andThen(Completable.fromAction(() -> { + startStopSemaphore.release(); + LOG.info("Orchestrator lock released."); + })); } private void unknownState(State state) throws DataStoreException { diff --git a/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/OrchestratorTest.java b/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/OrchestratorTest.java index 3f9b063907..583166cff8 100644 --- a/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/OrchestratorTest.java +++ b/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/OrchestratorTest.java @@ -15,13 +15,9 @@ package com.amplifyframework.datastore.syncengine; -import android.content.Context; -import androidx.annotation.NonNull; - import com.amplifyframework.AmplifyException; import com.amplifyframework.api.graphql.GraphQLBehavior; import com.amplifyframework.api.graphql.MutationType; -import com.amplifyframework.core.Amplify; import com.amplifyframework.core.model.ModelProvider; import com.amplifyframework.core.model.SchemaRegistry; import com.amplifyframework.core.model.temporal.Temporal; @@ -35,7 +31,6 @@ import com.amplifyframework.datastore.storage.SynchronousStorageAdapter; import com.amplifyframework.hub.HubChannel; import com.amplifyframework.hub.HubEvent; -import com.amplifyframework.logging.Logger; import com.amplifyframework.testmodels.commentsblog.BlogOwner; import com.amplifyframework.testutils.HubAccumulator; import com.amplifyframework.testutils.mocks.ApiMocking; @@ -57,21 +52,24 @@ import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests the {@link Orchestrator}. */ @RunWith(RobolectricTestRunner.class) public final class OrchestratorTest { - private static final Logger LOG = Amplify.Logging.forNamespace("amplify:aws-datastore:test"); private Orchestrator orchestrator; private HubAccumulator orchestratorInitObserver; - private GraphQLBehavior mockApi; - private InMemoryStorageAdapter localStorageAdapter; - private BlogOwner susan; + private final GraphQLBehavior mockApi = mock(GraphQLBehavior.class); + private final InMemoryStorageAdapter localStorageAdapter = InMemoryStorageAdapter.create(); + private final BlogOwner susan = BlogOwner.builder().name("Susan Quimby").build(); + + private final ReachabilityMonitor reachabilityMonitor = mock(ReachabilityMonitor.class); /** * Setup mocks and other common elements. @@ -81,75 +79,59 @@ public final class OrchestratorTest { @Before public void setup() throws AmplifyException { ShadowLog.stream = System.out; - // Arrange: create a BlogOwner - susan = BlogOwner.builder().name("Susan Quimby").build(); // SYNC_QUERIES_READY indicates that the sync queries have completed. orchestratorInitObserver = HubAccumulator.create(HubChannel.DATASTORE, DataStoreChannelEventName.SYNC_QUERIES_READY, 1) - .start(); + .start(); - ModelMetadata metadata = new ModelMetadata(susan.getId(), - false, - 1, - Temporal.Timestamp.now()); + ModelMetadata metadata = new ModelMetadata( + susan.getId(), + false, + 1, + Temporal.Timestamp.now() + ); ModelWithMetadata modelWithMetadata = new ModelWithMetadata<>(susan, metadata); // Mock behaviors from for the API category - mockApi = mock(GraphQLBehavior.class); ApiMocking.mockSubscriptionStart(mockApi); ApiMocking.mockSuccessfulMutation(mockApi, susan.getId(), modelWithMetadata); ApiMocking.mockSuccessfulQuery(mockApi, modelWithMetadata); AppSyncClient appSync = AppSyncClient.via(mockApi); - localStorageAdapter = InMemoryStorageAdapter.create(); ModelProvider modelProvider = SimpleModelProvider.withRandomVersion(BlogOwner.class); SchemaRegistry schemaRegistry = SchemaRegistry.instance(); schemaRegistry.clear(); schemaRegistry.register(modelProvider.models()); - ReachabilityMonitor reachabilityMonitor = new ReachabilityMonitor() { - @Override - public void configure(@NonNull Context context) { } - - @Override - public void configure(@NonNull Context context, @NonNull ConnectivityProvider connectivityProvider) {} - - @NonNull - @Override - public Observable getObservable() { - return Observable.just(true); - } - }; - - orchestrator = - new Orchestrator(modelProvider, - schemaRegistry, - localStorageAdapter, - appSync, - DataStoreConfiguration::defaults, - () -> Orchestrator.State.SYNC_VIA_API, - reachabilityMonitor, - true - ); + when(reachabilityMonitor.getObservable()).thenReturn(Observable.just(true)); + + orchestrator = new Orchestrator( + modelProvider, + schemaRegistry, + localStorageAdapter, + appSync, + DataStoreConfiguration::defaults, + () -> Orchestrator.State.SYNC_VIA_API, + reachabilityMonitor, + true + ); } /** - * When an item is placed into storage, a cascade of - * things happen which should ultimately result in a mutation call - * to the API category, with an {@link MutationType} corresponding to the type of - * modification that was made to the storage. + * When an item is placed into storage, a cascade of things happen which should ultimately result in a mutation call + * to the API category, with an {@link MutationType} corresponding to the type of modification that was made to the + * storage. * @throws AmplifyException On failure to load model schema into registry */ @SuppressWarnings("unchecked") // Casting ? in HubEvent to PendingMutation @Test public void itemsPlacedInStorageArePublishedToNetwork() throws AmplifyException { // Arrange: orchestrator is running - orchestrator.start().test(); + orchestrator.start().test().assertComplete(); orchestratorInitObserver.await(10, TimeUnit.SECONDS); - HubAccumulator accumulator = - HubAccumulator.create(HubChannel.DATASTORE, isProcessed(susan), 1) - .start(); + HubAccumulator accumulator = HubAccumulator.create(HubChannel.DATASTORE, isProcessed(susan), 1).start(); + // Act: Put BlogOwner into storage, and wait for it to complete. SynchronousStorageAdapter.delegatingTo(localStorageAdapter).save(susan); @@ -174,16 +156,16 @@ public void itemsPlacedInStorageArePublishedToNetwork() throws AmplifyException @Test public void preventConcurrentStateTransitions() { // Arrange: orchestrator is running - orchestrator.start().test(); + orchestrator.start().test().assertComplete(); // Try to start it in a new thread. boolean success = Completable.create(emitter -> { new Thread(() -> orchestrator.start() - .subscribe(emitter::onComplete, emitter::onError) + .subscribe(emitter::onComplete, emitter::onError) ).start(); // Try to start it again on the current thread. - orchestrator.start().test(); + orchestrator.start().test().assertComplete(); }).blockingAwait(5, TimeUnit.SECONDS); assertTrue("Failed to start orchestrator on a background thread", success); @@ -192,4 +174,25 @@ public void preventConcurrentStateTransitions() { assertTrue(orchestrator.stop().blockingAwait(5, TimeUnit.SECONDS)); } + + /** + * Verify that an error that occurs during a start/stop still releases the semaphore and allows retrying. + */ + @Test + public void preventLockupOnStartStopError() { + IllegalStateException exception = new IllegalStateException("Simulated failure"); + + // Not a particularly realistic failure, but the exact cause isn't important. + when(reachabilityMonitor.getObservable()).thenThrow(exception); + + // This will fail, but it should not leave the orchestrator in a bad state. + orchestrator.start().test().assertError(exception); + + // Reset the mock so that it won't throw an error on second attempt + reset(reachabilityMonitor); + when(reachabilityMonitor.getObservable()).thenReturn(Observable.just(true)); + + // Now we should be able to start successfully + orchestrator.start().test().assertComplete(); + } }