Skip to content

Commit

Permalink
Release semaphore if an error occurs while starting or stopping the O…
Browse files Browse the repository at this point in the history
…rchestrator
  • Loading branch information
mattcreaser committed Apr 19, 2023
1 parent dbf7bf0 commit 346dfa7
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 66 deletions.
46 changes: 42 additions & 4 deletions .idea/codeStyles/Project.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,13 @@ private Completable performSynchronized(Action action) {
"Retry your request."));
}
LOG.info("Orchestrator lock acquired.");
return Completable.fromAction(action)
.andThen(
Completable.fromAction(
() -> {
startStopSemaphore.release();
LOG.info("Orchestrator lock released.");
}
)
);
return Completable.fromAction(action).doOnError((e) -> {
startStopSemaphore.release();
LOG.info("Orchestrator lock released.");
}).andThen(Completable.fromAction(() -> {
startStopSemaphore.release();
LOG.info("Orchestrator lock released.");
}));
}

private void unknownState(State state) throws DataStoreException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@

package com.amplifyframework.datastore.syncengine;

import android.content.Context;
import androidx.annotation.NonNull;

import com.amplifyframework.AmplifyException;
import com.amplifyframework.api.graphql.GraphQLBehavior;
import com.amplifyframework.api.graphql.MutationType;
import com.amplifyframework.core.Amplify;
import com.amplifyframework.core.model.ModelProvider;
import com.amplifyframework.core.model.SchemaRegistry;
import com.amplifyframework.core.model.temporal.Temporal;
Expand All @@ -35,7 +31,6 @@
import com.amplifyframework.datastore.storage.SynchronousStorageAdapter;
import com.amplifyframework.hub.HubChannel;
import com.amplifyframework.hub.HubEvent;
import com.amplifyframework.logging.Logger;
import com.amplifyframework.testmodels.commentsblog.BlogOwner;
import com.amplifyframework.testutils.HubAccumulator;
import com.amplifyframework.testutils.mocks.ApiMocking;
Expand All @@ -57,21 +52,24 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
* Tests the {@link Orchestrator}.
*/
@RunWith(RobolectricTestRunner.class)
public final class OrchestratorTest {
private static final Logger LOG = Amplify.Logging.forNamespace("amplify:aws-datastore:test");

private Orchestrator orchestrator;
private HubAccumulator orchestratorInitObserver;
private GraphQLBehavior mockApi;
private InMemoryStorageAdapter localStorageAdapter;
private BlogOwner susan;
private final GraphQLBehavior mockApi = mock(GraphQLBehavior.class);
private final InMemoryStorageAdapter localStorageAdapter = InMemoryStorageAdapter.create();
private final BlogOwner susan = BlogOwner.builder().name("Susan Quimby").build();

private final ReachabilityMonitor reachabilityMonitor = mock(ReachabilityMonitor.class);

/**
* Setup mocks and other common elements.
Expand All @@ -81,75 +79,59 @@ public final class OrchestratorTest {
@Before
public void setup() throws AmplifyException {
ShadowLog.stream = System.out;
// Arrange: create a BlogOwner
susan = BlogOwner.builder().name("Susan Quimby").build();

// SYNC_QUERIES_READY indicates that the sync queries have completed.
orchestratorInitObserver =
HubAccumulator.create(HubChannel.DATASTORE, DataStoreChannelEventName.SYNC_QUERIES_READY, 1)
.start();
.start();

ModelMetadata metadata = new ModelMetadata(susan.getId(),
false,
1,
Temporal.Timestamp.now());
ModelMetadata metadata = new ModelMetadata(
susan.getId(),
false,
1,
Temporal.Timestamp.now()
);
ModelWithMetadata<BlogOwner> modelWithMetadata = new ModelWithMetadata<>(susan, metadata);
// Mock behaviors from for the API category
mockApi = mock(GraphQLBehavior.class);
ApiMocking.mockSubscriptionStart(mockApi);
ApiMocking.mockSuccessfulMutation(mockApi, susan.getId(), modelWithMetadata);
ApiMocking.mockSuccessfulQuery(mockApi, modelWithMetadata);
AppSyncClient appSync = AppSyncClient.via(mockApi);

localStorageAdapter = InMemoryStorageAdapter.create();
ModelProvider modelProvider = SimpleModelProvider.withRandomVersion(BlogOwner.class);
SchemaRegistry schemaRegistry = SchemaRegistry.instance();
schemaRegistry.clear();
schemaRegistry.register(modelProvider.models());

ReachabilityMonitor reachabilityMonitor = new ReachabilityMonitor() {
@Override
public void configure(@NonNull Context context) { }

@Override
public void configure(@NonNull Context context, @NonNull ConnectivityProvider connectivityProvider) {}

@NonNull
@Override
public Observable<Boolean> getObservable() {
return Observable.just(true);
}
};

orchestrator =
new Orchestrator(modelProvider,
schemaRegistry,
localStorageAdapter,
appSync,
DataStoreConfiguration::defaults,
() -> Orchestrator.State.SYNC_VIA_API,
reachabilityMonitor,
true
);
when(reachabilityMonitor.getObservable()).thenReturn(Observable.just(true));

orchestrator = new Orchestrator(
modelProvider,
schemaRegistry,
localStorageAdapter,
appSync,
DataStoreConfiguration::defaults,
() -> Orchestrator.State.SYNC_VIA_API,
reachabilityMonitor,
true
);
}

/**
* When an item is placed into storage, a cascade of
* things happen which should ultimately result in a mutation call
* to the API category, with an {@link MutationType} corresponding to the type of
* modification that was made to the storage.
* When an item is placed into storage, a cascade of things happen which should ultimately result in a mutation call
* to the API category, with an {@link MutationType} corresponding to the type of modification that was made to the
* storage.
* @throws AmplifyException On failure to load model schema into registry
*/
@SuppressWarnings("unchecked") // Casting ? in HubEvent<?> to PendingMutation<? extends Model>
@Test
public void itemsPlacedInStorageArePublishedToNetwork() throws AmplifyException {
// Arrange: orchestrator is running
orchestrator.start().test();
orchestrator.start().test().assertComplete();

orchestratorInitObserver.await(10, TimeUnit.SECONDS);
HubAccumulator accumulator =
HubAccumulator.create(HubChannel.DATASTORE, isProcessed(susan), 1)
.start();
HubAccumulator accumulator = HubAccumulator.create(HubChannel.DATASTORE, isProcessed(susan), 1).start();

// Act: Put BlogOwner into storage, and wait for it to complete.
SynchronousStorageAdapter.delegatingTo(localStorageAdapter).save(susan);

Expand All @@ -174,16 +156,16 @@ public void itemsPlacedInStorageArePublishedToNetwork() throws AmplifyException
@Test
public void preventConcurrentStateTransitions() {
// Arrange: orchestrator is running
orchestrator.start().test();
orchestrator.start().test().assertComplete();

// Try to start it in a new thread.
boolean success = Completable.create(emitter -> {
new Thread(() -> orchestrator.start()
.subscribe(emitter::onComplete, emitter::onError)
.subscribe(emitter::onComplete, emitter::onError)
).start();

// Try to start it again on the current thread.
orchestrator.start().test();
orchestrator.start().test().assertComplete();
}).blockingAwait(5, TimeUnit.SECONDS);
assertTrue("Failed to start orchestrator on a background thread", success);

Expand All @@ -192,4 +174,25 @@ public void preventConcurrentStateTransitions() {

assertTrue(orchestrator.stop().blockingAwait(5, TimeUnit.SECONDS));
}

/**
* Verify that an error that occurs during a start/stop still releases the semaphore and allows retrying.
*/
@Test
public void preventLockupOnStartStopError() {
IllegalStateException exception = new IllegalStateException("Simulated failure");

// Not a particularly realistic failure, but the exact cause isn't important.
when(reachabilityMonitor.getObservable()).thenThrow(exception);

// This will fail, but it should not leave the orchestrator in a bad state.
orchestrator.start().test().assertError(exception);

// Reset the mock so that it won't throw an error on second attempt
reset(reachabilityMonitor);
when(reachabilityMonitor.getObservable()).thenReturn(Observable.just(true));

// Now we should be able to start successfully
orchestrator.start().test().assertComplete();
}
}

0 comments on commit 346dfa7

Please sign in to comment.