Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(datastore): Release semaphore if an error occurs while starting or stopping the Orchestrator #2398

Merged
merged 2 commits into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 42 additions & 4 deletions .idea/codeStyles/Project.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,13 @@ private Completable performSynchronized(Action action) {
"Retry your request."));
}
LOG.info("Orchestrator lock acquired.");
return Completable.fromAction(action)
.andThen(
Completable.fromAction(
() -> {
startStopSemaphore.release();
LOG.info("Orchestrator lock released.");
}
)
);
return Completable.fromAction(action).doOnError((e) -> {
startStopSemaphore.release();
LOG.info("Orchestrator lock released.");
}).andThen(Completable.fromAction(() -> {
startStopSemaphore.release();
LOG.info("Orchestrator lock released.");
}));
}

private void unknownState(State state) throws DataStoreException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@

package com.amplifyframework.datastore.syncengine;

import android.content.Context;
import androidx.annotation.NonNull;

import com.amplifyframework.AmplifyException;
import com.amplifyframework.api.graphql.GraphQLBehavior;
import com.amplifyframework.api.graphql.MutationType;
import com.amplifyframework.core.Amplify;
import com.amplifyframework.core.model.ModelProvider;
import com.amplifyframework.core.model.SchemaRegistry;
import com.amplifyframework.core.model.temporal.Temporal;
Expand All @@ -35,7 +31,6 @@
import com.amplifyframework.datastore.storage.SynchronousStorageAdapter;
import com.amplifyframework.hub.HubChannel;
import com.amplifyframework.hub.HubEvent;
import com.amplifyframework.logging.Logger;
import com.amplifyframework.testmodels.commentsblog.BlogOwner;
import com.amplifyframework.testutils.HubAccumulator;
import com.amplifyframework.testutils.mocks.ApiMocking;
Expand All @@ -57,21 +52,24 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
* Tests the {@link Orchestrator}.
*/
@RunWith(RobolectricTestRunner.class)
public final class OrchestratorTest {
private static final Logger LOG = Amplify.Logging.forNamespace("amplify:aws-datastore:test");

private Orchestrator orchestrator;
private HubAccumulator orchestratorInitObserver;
private GraphQLBehavior mockApi;
private InMemoryStorageAdapter localStorageAdapter;
private BlogOwner susan;
private final GraphQLBehavior mockApi = mock(GraphQLBehavior.class);
private final InMemoryStorageAdapter localStorageAdapter = InMemoryStorageAdapter.create();
private final BlogOwner susan = BlogOwner.builder().name("Susan Quimby").build();

private final ReachabilityMonitor reachabilityMonitor = mock(ReachabilityMonitor.class);

/**
* Setup mocks and other common elements.
Expand All @@ -81,75 +79,59 @@ public final class OrchestratorTest {
@Before
public void setup() throws AmplifyException {
ShadowLog.stream = System.out;
// Arrange: create a BlogOwner
susan = BlogOwner.builder().name("Susan Quimby").build();

// SYNC_QUERIES_READY indicates that the sync queries have completed.
orchestratorInitObserver =
HubAccumulator.create(HubChannel.DATASTORE, DataStoreChannelEventName.SYNC_QUERIES_READY, 1)
.start();
.start();

ModelMetadata metadata = new ModelMetadata(susan.getId(),
false,
1,
Temporal.Timestamp.now());
ModelMetadata metadata = new ModelMetadata(
susan.getId(),
false,
1,
Temporal.Timestamp.now()
);
ModelWithMetadata<BlogOwner> modelWithMetadata = new ModelWithMetadata<>(susan, metadata);
// Mock behaviors from for the API category
mockApi = mock(GraphQLBehavior.class);
ApiMocking.mockSubscriptionStart(mockApi);
ApiMocking.mockSuccessfulMutation(mockApi, susan.getId(), modelWithMetadata);
ApiMocking.mockSuccessfulQuery(mockApi, modelWithMetadata);
AppSyncClient appSync = AppSyncClient.via(mockApi);

localStorageAdapter = InMemoryStorageAdapter.create();
ModelProvider modelProvider = SimpleModelProvider.withRandomVersion(BlogOwner.class);
SchemaRegistry schemaRegistry = SchemaRegistry.instance();
schemaRegistry.clear();
schemaRegistry.register(modelProvider.models());

ReachabilityMonitor reachabilityMonitor = new ReachabilityMonitor() {
@Override
public void configure(@NonNull Context context) { }

@Override
public void configure(@NonNull Context context, @NonNull ConnectivityProvider connectivityProvider) {}

@NonNull
@Override
public Observable<Boolean> getObservable() {
return Observable.just(true);
}
};

orchestrator =
new Orchestrator(modelProvider,
schemaRegistry,
localStorageAdapter,
appSync,
DataStoreConfiguration::defaults,
() -> Orchestrator.State.SYNC_VIA_API,
reachabilityMonitor,
true
);
when(reachabilityMonitor.getObservable()).thenReturn(Observable.just(true));

orchestrator = new Orchestrator(
modelProvider,
schemaRegistry,
localStorageAdapter,
appSync,
DataStoreConfiguration::defaults,
() -> Orchestrator.State.SYNC_VIA_API,
reachabilityMonitor,
true
);
}

/**
* When an item is placed into storage, a cascade of
* things happen which should ultimately result in a mutation call
* to the API category, with an {@link MutationType} corresponding to the type of
* modification that was made to the storage.
* When an item is placed into storage, a cascade of things happen which should ultimately result in a mutation call
* to the API category, with an {@link MutationType} corresponding to the type of modification that was made to the
* storage.
* @throws AmplifyException On failure to load model schema into registry
*/
@SuppressWarnings("unchecked") // Casting ? in HubEvent<?> to PendingMutation<? extends Model>
@Test
public void itemsPlacedInStorageArePublishedToNetwork() throws AmplifyException {
// Arrange: orchestrator is running
orchestrator.start().test();
orchestrator.start().test().assertComplete();

orchestratorInitObserver.await(10, TimeUnit.SECONDS);
HubAccumulator accumulator =
HubAccumulator.create(HubChannel.DATASTORE, isProcessed(susan), 1)
.start();
HubAccumulator accumulator = HubAccumulator.create(HubChannel.DATASTORE, isProcessed(susan), 1).start();

// Act: Put BlogOwner into storage, and wait for it to complete.
SynchronousStorageAdapter.delegatingTo(localStorageAdapter).save(susan);

Expand All @@ -174,16 +156,16 @@ public void itemsPlacedInStorageArePublishedToNetwork() throws AmplifyException
@Test
public void preventConcurrentStateTransitions() {
// Arrange: orchestrator is running
orchestrator.start().test();
orchestrator.start().test().assertComplete();

// Try to start it in a new thread.
boolean success = Completable.create(emitter -> {
new Thread(() -> orchestrator.start()
.subscribe(emitter::onComplete, emitter::onError)
.subscribe(emitter::onComplete, emitter::onError)
).start();

// Try to start it again on the current thread.
orchestrator.start().test();
orchestrator.start().test().assertComplete();
}).blockingAwait(5, TimeUnit.SECONDS);
assertTrue("Failed to start orchestrator on a background thread", success);

Expand All @@ -192,4 +174,25 @@ public void preventConcurrentStateTransitions() {

assertTrue(orchestrator.stop().blockingAwait(5, TimeUnit.SECONDS));
}

/**
* Verify that an error that occurs during a start/stop still releases the semaphore and allows retrying.
*/
@Test
public void preventLockupOnStartStopError() {
IllegalStateException exception = new IllegalStateException("Simulated failure");

// Not a particularly realistic failure, but the exact cause isn't important.
when(reachabilityMonitor.getObservable()).thenThrow(exception);

// This will fail, but it should not leave the orchestrator in a bad state.
orchestrator.start().test().assertError(exception);

// Reset the mock so that it won't throw an error on second attempt
reset(reachabilityMonitor);
when(reachabilityMonitor.getObservable()).thenReturn(Observable.just(true));

// Now we should be able to start successfully
orchestrator.start().test().assertComplete();
}
}