Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(datastore): Add missing onError handlers #2844

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions aws-datastore/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ android {
}

dependencies {
compileOnly(libs.rxlint)

implementation(project(":core"))
implementation(project(":aws-core"))
implementation(project(":aws-api-appsync"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
import java.util.concurrent.TimeUnit;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;

/**
Expand Down Expand Up @@ -99,6 +101,12 @@ public final class AWSDataStorePlugin extends DataStorePlugin<Void> {

private final ReachabilityMonitor reachabilityMonitor;

// Subscriptions that should be disposed when datastore is stopped
private final CompositeDisposable startedDisposables = new CompositeDisposable();

// Subscriptions that have the same lifetime as the plugin
private final CompositeDisposable pluginDisposables = new CompositeDisposable();

private AWSDataStorePlugin(
@NonNull ModelProvider modelProvider,
@NonNull SchemaRegistry schemaRegistry,
Expand Down Expand Up @@ -288,7 +296,11 @@ private void configure(Context context, DataStoreConfiguration configuration) {

reachabilityMonitor.configure(context);

waitForInitialization().subscribe(this::observeNetworkStatus);
Disposable subscription = waitForInitialization().subscribe(
this::observeNetworkStatus,
error -> LOG.error("Datastore did not initialize", error)
);
pluginDisposables.add(subscription);
}

private void publishNetworkStatusEvent(boolean active) {
Expand All @@ -298,17 +310,27 @@ private void publishNetworkStatusEvent(boolean active) {

@SuppressLint({"CheckResult", "RxLeakedSubscription", "RxSubscribeOnError"})
private void observeNetworkStatus() {
reachabilityMonitor.getObservable()
.subscribe(this::publishNetworkStatusEvent);
Disposable subscription = reachabilityMonitor.getObservable()
.subscribe(
this::publishNetworkStatusEvent,
error -> LOG.warn("Unable to subscribe to network status events", error)
);
pluginDisposables.add(subscription);
}

@SuppressLint("CheckResult")
@WorkerThread
@Override
public void initialize(@NonNull Context context) throws AmplifyException {
try {
initializeStorageAdapter(context)
.blockingAwait(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
boolean initialized = initializeStorageAdapter(context)
.blockingAwait(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (!initialized) {
throw new DataStoreException(
"Storage adapter did not initialize within allotted timeout",
AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION
);
}
} catch (Throwable initError) {
throw new AmplifyException(
"Failed to initialize the local storage adapter for the DataStore plugin.",
Expand All @@ -332,8 +354,8 @@ private Completable initializeStorageAdapter(Context context) {

@SuppressLint("RxDefaultScheduler")
private Completable waitForInitialization() {
return Completable.fromAction(() -> categoryInitializationsPending.await())
.timeout(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS)
return Completable.fromAction(categoryInitializationsPending::await)
.timeout(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS, Schedulers.io())
.subscribeOn(Schedulers.io())
.doOnComplete(() -> LOG.info("DataStore plugin initialized."))
.doOnError(error -> LOG.error("DataStore initialization timed out.", error));
Expand All @@ -345,13 +367,14 @@ private Completable waitForInitialization() {
@SuppressLint({"RxLeakedSubscription", "CheckResult"})
@Override
public void start(@NonNull Action onComplete, @NonNull Consumer<DataStoreException> onError) {
waitForInitialization()
Disposable subscription = waitForInitialization()
.andThen(orchestrator.start())
.subscribeOn(Schedulers.io())
.subscribe(
onComplete::call,
error -> onError.accept(new DataStoreException("Failed to start DataStore.", error, "Retry."))
);
startedDisposables.add(subscription);
}

/**
Expand All @@ -360,13 +383,15 @@ public void start(@NonNull Action onComplete, @NonNull Consumer<DataStoreExcepti
@SuppressLint({"RxLeakedSubscription", "CheckResult"})
@Override
public void stop(@NonNull Action onComplete, @NonNull Consumer<DataStoreException> onError) {
waitForInitialization()
startedDisposables.dispose();
Disposable subscription = waitForInitialization()
.andThen(orchestrator.stop())
.subscribeOn(Schedulers.io())
.subscribe(
onComplete::call,
error -> onError.accept(new DataStoreException("Failed to stop DataStore.", error, "Retry."))
);
startedDisposables.add(subscription);
}

/**
Expand All @@ -379,12 +404,19 @@ public void stop(@NonNull Action onComplete, @NonNull Consumer<DataStoreExceptio
*/
@Override
public void clear(@NonNull Action onComplete, @NonNull Consumer<DataStoreException> onError) {
stop(() -> Completable.create(emitter -> sqliteStorageAdapter.clear(emitter::onComplete, emitter::onError))
.subscribeOn(Schedulers.io())
.subscribe(onComplete::call,
throwable -> onError.accept(new DataStoreException("Clear operation failed",
throwable, AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION))),
onError);
stop(
() -> {
Disposable completable = Completable.create(
emitter -> sqliteStorageAdapter.clear(emitter::onComplete, emitter::onError)
)
.subscribeOn(Schedulers.io())
.subscribe(onComplete::call,
throwable -> onError.accept(new DataStoreException("Clear operation failed",
throwable, AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION)));
pluginDisposables.add(completable);
},
onError
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,11 @@ private void onApiSyncFailure(Throwable exception) {
return;
}
LOG.warn("API sync failed - transitioning to LOCAL_ONLY.", exception);
Completable.fromAction(this::transitionToLocalOnly)
.doOnError(error -> LOG.warn("Transition to LOCAL_ONLY failed.", error))
.subscribe();
try {
transitionToLocalOnly();
} catch (Exception error) {
LOG.warn("Transition to LOCAL_ONLY failed.", error);
}
}

private void disposeNetworkChanges() {
Expand All @@ -422,13 +424,16 @@ private void monitorNetworkChanges() {
monitorNetworkChangesDisposable = reachabilityMonitor.getObservable()
.skip(1) // We skip the current online state, we only care about transitions
.filter(ignore -> !State.STOPPED.equals(currentState.get()))
.subscribe(isOnline -> {
if (isOnline) {
transitionToApiSync();
} else {
transitionToLocalOnly();
}
});
.subscribe(
isOnline -> {
if (isOnline) {
transitionToApiSync();
} else {
transitionToLocalOnly();
}
},
error -> LOG.warn("Error observing network changes", error)
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.TimeUnit;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.schedulers.Schedulers;

/**
* Class that defines inner classes and interfaces related to retry strategies.
Expand Down Expand Up @@ -71,7 +72,11 @@ public boolean retryHandler(int attemptNumber, Throwable throwable) {
} else {
final long waitTimeSeconds = Double.valueOf(Math.pow(2, attemptNumber % maxExponent)).longValue();
LOG.debug("Waiting " + waitTimeSeconds + " seconds before retrying");
Completable.timer(TimeUnit.SECONDS.toMillis(waitTimeSeconds), TimeUnit.MILLISECONDS).blockingAwait();
Completable.timer(
TimeUnit.SECONDS.toMillis(waitTimeSeconds),
TimeUnit.MILLISECONDS,
Schedulers.io()
).blockingAwait();
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,10 @@ void startDrainingMutationBuffer() {
.flatMapCompletable(this::mergeEvent)
.doOnError(failure -> LOG.warn("Reading subscriptions buffer has failed.", failure))
.doOnComplete(() -> LOG.warn("Reading from subscriptions buffer is completed."))
.subscribe()
.subscribe(
() -> LOG.info("Subscription data buffer processing complete"),
error -> LOG.warn("Error draining subscription data buffer", error)
)
);
}

Expand Down
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ oauth2 = "0.26.0"
okhttp = "5.0.0-alpha.11"
robolectric = "4.7"
rxjava = "3.0.6"
rxlint = "1.7.8"
slf4j = "2.0.6"
sqlcipher = "4.5.4"
tensorflow = "2.0.0"
Expand Down Expand Up @@ -97,6 +98,7 @@ maplibre-sdk = { module = "org.maplibre.gl:android-sdk", version.ref = "maplibre
oauth2 = { module = "com.google.auth:google-auth-library-oauth2-http", version.ref = "oauth2" }
okhttp = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp" }
rxjava = { module = "io.reactivex.rxjava3:rxjava", version.ref = "rxjava" }
rxlint = { module = "nl.littlerobots.rxlint:rxlint", version.ref = "rxlint" }
slf4j = { module = "org.slf4j:slf4j-api", version.ref = "slf4j"}
sqlcipher= { module = "net.zetetic:android-database-sqlcipher", version.ref = "sqlcipher" }
tensorflow = { module = "org.tensorflow:tensorflow-lite", version.ref="tensorflow" }
Expand Down
Loading