Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(datastore): register network callback only once in reachability monitor #2434

Merged
merged 3 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import android.net.Network
import androidx.annotation.VisibleForTesting
import com.amplifyframework.datastore.DataStoreException
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.ObservableEmitter
import io.reactivex.rxjava3.core.ObservableOnSubscribe
import io.reactivex.rxjava3.subjects.BehaviorSubject
import java.util.concurrent.TimeUnit

/**
Expand Down Expand Up @@ -54,25 +53,33 @@ public interface ReachabilityMonitor {
}

private class ReachabilityMonitorImpl constructor(val schedulerProvider: SchedulerProvider) : ReachabilityMonitor {
private var emitter: ObservableOnSubscribe<Boolean>? = null
private val subject = BehaviorSubject.create<Boolean>()
private var connectivityProvider: ConnectivityProvider? = null

override fun configure(context: Context) {
return configure(context, DefaultConnectivityProvider())
}

override fun configure(context: Context, connectivityProvider: ConnectivityProvider) {
emitter = ObservableOnSubscribe { emitter ->
val callback = getCallback(emitter)
connectivityProvider.registerDefaultNetworkCallback(context, callback)
// Provide the current network status upon subscription.
emitter.onNext(connectivityProvider.hasActiveNetwork)
}
this.connectivityProvider = connectivityProvider
connectivityProvider.registerDefaultNetworkCallback(
context,
object : NetworkCallback() {
override fun onAvailable(network: Network) {
subject.onNext(true)
}

override fun onLost(network: Network) {
subject.onNext(false)
}
}
)
}

override fun getObservable(): Observable<Boolean> {
emitter?.let { emitter ->
return Observable.create(emitter)
.subscribeOn(schedulerProvider.io())
connectivityProvider?.let { connectivityProvider ->
return subject.subscribeOn(schedulerProvider.io())
.doOnSubscribe { subject.onNext(connectivityProvider.hasActiveNetwork) }
.debounce(250, TimeUnit.MILLISECONDS, schedulerProvider.computation())
} ?: run {
throw DataStoreException(
Expand All @@ -81,17 +88,6 @@ private class ReachabilityMonitorImpl constructor(val schedulerProvider: Schedul
)
}
}

private fun getCallback(emitter: ObservableEmitter<Boolean>): NetworkCallback {
return object : NetworkCallback() {
override fun onAvailable(network: Network) {
emitter.onNext(true)
}
override fun onLost(network: Network) {
emitter.onNext(false)
}
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.schedulers.TestScheduler
import io.reactivex.rxjava3.subscribers.TestSubscriber
import java.util.concurrent.TimeUnit
import org.junit.Assert.assertEquals
import org.junit.Test
import org.mockito.Mockito.mock

Expand Down Expand Up @@ -69,4 +70,47 @@ class ReachabilityMonitorTest {

testSubscriber.assertValues(true, false, true, true)
}

/**
* Test that calling getObservable() multiple times only results in the network
* callback being registered once.
*/
@Test
fun testNetworkCallbackRegisteredOnce() {
var networkCallback: ConnectivityManager.NetworkCallback? = null
var numCallbacksRegistered = 0

val connectivityProvider = object : ConnectivityProvider {
override val hasActiveNetwork: Boolean
get() = run {
return true
}
override fun registerDefaultNetworkCallback(
context: Context,
callback: ConnectivityManager.NetworkCallback
) {
networkCallback = callback
numCallbacksRegistered += 1
}
}

// TestScheduler allows the virtual time to be advanced by exact amounts, to allow for repeatable tests
val testScheduler = TestScheduler()
val reachabilityMonitor = ReachabilityMonitor.createForTesting(TestSchedulerProvider(testScheduler))
val mockContext = mock(Context::class.java)
reachabilityMonitor.configure(mockContext, connectivityProvider)

reachabilityMonitor.getObservable().subscribe()
val network = mock(Network::class.java)
// Should provide initial network state (true) upon subscription (after debounce)
testScheduler.advanceTimeBy(251, TimeUnit.MILLISECONDS)
networkCallback!!.onAvailable(network)

reachabilityMonitor.getObservable().subscribe()
testScheduler.advanceTimeBy(251, TimeUnit.MILLISECONDS)
networkCallback!!.onAvailable(network)

// Only 1 network callback should be registered
assertEquals(1, numCallbacksRegistered)
}
}
2 changes: 1 addition & 1 deletion aws-push-notifications-pinpoint-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ dependencies {
//noinspection GradleDependency
implementation("androidx.lifecycle:lifecycle-livedata-ktx:$lifecycleVersion")
//noinspection GradleDependency
implementation("com.google.android.material:material:1.8.0")
implementation(dependency.google.material)

testImplementation(testDependency.junit)
testImplementation(testDependency.mockk)
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ dependencyResolutionManagement {
library("rxjava", "io.reactivex.rxjava3:rxjava:3.0.6")

// Google
library("google-material", "com.google.android.material:material:1.4.0")
library("google-material", "com.google.android.material:material:1.8.0")
library("firebasemessaging", "com.google.firebase:firebase-messaging-ktx:23.1.0")

// Misc
Expand Down