Skip to content

Commit

Permalink
Update Threading.md and change the dispatcher earlier in the chain (#…
Browse files Browse the repository at this point in the history
…4319)

* Change the dispatcher earlier

* Update Threading.md

* remove defaultDispatcher

* restore Dispatchers.IO

* Kdoc

* use Unconfined

* revert dcebug

* KDoc nitpick

* update tests
  • Loading branch information
martinbonnin authored Mar 20, 2024
1 parent 0ee42a8 commit dcd27b9
Show file tree
Hide file tree
Showing 22 changed files with 243 additions and 224 deletions.
108 changes: 9 additions & 99 deletions design-docs/Threading.md
Original file line number Diff line number Diff line change
@@ -1,56 +1,33 @@
# Thoughts on Android threading and concurrency

A tentative high level overview of what threads are used and why we came to this. K/N makes the whole thing more complex than what it would have been on the JVM so this document tries to keep track of the various decisions

### Goals
* JVM should be at least as fast as 2.x
* Minimize number of threads and context changes

### Non-goals
* Atomic cache requests (see the [detailed explanation](#appendix-1-non-goal-atomic-cached-requests) at the end of this
document).
* Multi-threaded coroutines on native.
* JS is out of scope so far. It _should_ be easier since it's single threaded but it's left out for now.
* Linux/Windows is also out of scope.

A tentative high level overview of what threads are used and why we came to this.

At a high level, Apollo-Android does multiple things:
* HTTP requests
* Json deserialization (for data)
* Json serialization (for variables)
* Json deserialization (for data)
* Normalization
* Cache writing
* De-normalization
* Cache reading
* Cache writing
* Plumbing: the work of connecting the above steps together

Most of the above steps need to happen in a background thread as they are potentially CPU expensive (normalization), IO expensive (HTTP) or both (SQLite), except maybe the plumbing that is just passing plates around. That being said, running the plumbing in the main thread means that we have to pay the price of a context switch every time we need to perform an operation. So it makes sense to run the plumbing on the same thread as was previously executing to avoid the context switch.

## Mutable, shared state that requires synchronization

If everything were immutable, we could run each request on a separate thread and let them execute concurrently. Unfortunately, there is some state that requires synchronization:
Some state in the pipeline is shared require synchronisation:

* Normalized cache
* Store listeners
* Websocket IDLE timeout
* HTTP2 connection state with HTTP2 multiplexing, some state is needed there
* ResponseAdapterCache: This currently caches the `ResponseAdapters` so that they don't have to lookup their field `ResponseAdapters`. The fact that this is mutable and that it doesn't work for recursive models encourages to remove that behaviour and look up the custom scalar adapters every time.
* Websocket for subscription
* HTTP2 socket

**On the JVM**, synchronization is typically done using locks (or read-write locks for better granularity).

**On native**, with the [new Memory Manager](https://github.com/JetBrains/kotlin/blob/master/kotlin-native/NEW_MM.md),
it is now also possible to mutate states from different threads,
and [atomicfu](https://github.com/Kotlin/kotlinx.atomicfu) offers locks that work in multiplatform code.

The section below discuss what to use in what situation, starting with the bigger constraints.
Earlier versions of Apollo Kotlin used immutable data and the old K/N memory model to handle synchronisation. Newer versions use standard locking mechanisms for this.

## Why coroutines in interceptors?

We want to expose a `<Flow>` API so coroutines are definitely a must there. Internally, that isn't required though. Libraries like OkHttp or SqlDelight do most of their work without coroutines and expose a coroutine API at the very last moment, before returning to the user. We decided to go with coroutines in interceptors because:
* it handles cancellation automatically.
* more importantly, it doesn't keep a thread waiting while another operation like an Async HTTP request is executing (more on that [below](#sync-vs-async-http-requests)).

That last point is important. While cancellation could be implemented manually, implementing a state machine that waits on HTTP requests would be way harder and error-prone
Mainly for historical reasons. The coroutines APIs are convenient to use in Kotlin, expose a standard cancellation API, and it was decided to use them early in the Kotlin conversion.

## Sync vs Async HTTP requests

Expand All @@ -59,71 +36,4 @@ synchronous using semaphores, it is hard to do so because that would most likely
that would mean that we pay the context switching price in all cases and also that keeps a thread waiting just doing
nothing so a coroutine is way more efficient there.

**On the JVM**, there are less restrictions. OkHttp has as `synchronous` API that could potentially avoid a context
switch. One pitfall is cancellation as it would have to happen from a separate thread but that might actually work. Not
reusing the OkHttp threadpool means that it won't be able to be shared with other OkHttp clients but since GraphQL
usually connects to a single host, it's not clear what would be shared there.

## Sync vs Async Cache

**On iOS**, here as well, there isn't much choice as the cache is fundamentally mutable and will need to be run from its
own thread. The difference with NSURLSession is that we have more control over where the callback happens. We can decide
the thread where the work and callback happen.

**On the JVM**, the traditional way to do this would involve ReadWriteLock. ReadWriteLock allow:
* concurrent reads to the DB
* don't switch contexts

On the other hand,

* it's not clear if Android's SQLiteOpenHelper allows concurrent reads
* there's a price to take the lock. It would need to be measured how much it is. In high load, this might also keep threads waiting

**On the JVM**, using the async version would remove all contention on the database. It would also allow to handle cache writes asynchronously as a "Fire & Forget" thing. At this stage it's not clear which one would perform better so it should be configurable. Also we might need to debounce/read the previous value in which case we definitely need the return value.

## Conclusion

There are still questions:
* Can a synchronous OkHttp call be cancelled?
* Does Android allow concurrent SQLite reads?
* Do we need the return value from `cache.write()` or can this be debounced later on?

With all that, the typical flows should be:

* iOS (6 context changes)
* Callsite (Main) -> CacheRead (Cache) -> Plumbing (Main) -> HTTP (NSURLSession) -> Plumbing (Main) -> CacheWrite (Cache) -> Response (Main)
* JVM-synccache-asynchttp (4 context changes):
* Callsite (Main) -> CacheRead (IO) -> Plumbing (IO) -> HTTP (OkHttp) -> Plumbing (IO) -> CacheWrite (IO) -> Response (Main)
* JVM-synccache-synchttp (2 context changes):
* Callsite (Main) -> CacheRead (IO) -> Plumbing (IO) -> HTTP (IO) -> Plumbing (IO) -> CacheWrite (IO) -> Response (Main)
* JVM-asynccache-asynchttp (6 context changes):
* Callsite (Main) -> CacheRead (Cache) -> Plumbing (IO) -> HTTP (OkHttp) -> Plumbing (IO) -> CacheWrite (Cache) -> Response (Main)
* JVM-asynccache-synchttp (4 context changes):
* Callsite (Main) -> CacheRead (Cache) -> Plumbing (IO) -> HTTP (IO) -> Plumbing (IO) -> CacheWrite (Cache) -> Response (Main)

Note that plumbing above contains potentially not-cheap operations like normalization or serializing variables.

## Appendix-1 Non-goal: Atomic Cached Requests


Apollo Kotlin has no concept of "Atomic request". Launching the same request twice in a row will most likely end up in the request being sent to the network twice even if the first one will ultimately cache it (but this is not guaranteed either):

```kotlin
val response1 = launch {
// Since "hero" is not in cache, this will go to the network
apolloClient.query(HeroQuery()).execute()
}
val response2 = launch {
// This will most likely go to the network even though it's the same request as above
// If another request is modifying the cache, what is returned depends the timings of the different request
apolloClient.query(HeroQuery()).execute()
}
```

On the other hand, waiting for one query to complete before launching the next one is guaranteed to have a predictable cache state. Especially if asynchronous cache write is implemented, the second query should wait until the write is written by the first one to read the cache:

```kotlin
val response1 = apolloClient.query(HeroQuery()).execute()
// If no other request is executing and the first one was cached, response2 will return the cached result
val response2 = apolloClient.query(HeroQuery()).execute()
```
**On the JVM**, there are less restrictions. OkHttp has as synchronous API that [has proven to be quite efficient](https://github.com/grpc/grpc-java/issues/6696)/
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package com.apollographql.apollo3.cache.http

import com.apollographql.apollo3.ApolloClient
import com.apollographql.apollo3.ConcurrencyInfo
import com.apollographql.apollo3.api.ApolloRequest
import com.apollographql.apollo3.api.ApolloResponse
import com.apollographql.apollo3.api.ExecutionContext
Expand All @@ -23,7 +22,6 @@ import com.apollographql.apollo3.network.http.HttpInterceptor
import com.apollographql.apollo3.network.http.HttpInterceptorChain
import com.apollographql.apollo3.network.http.HttpNetworkTransport
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import okio.FileSystem
Expand Down Expand Up @@ -139,7 +137,7 @@ fun ApolloClient.Builder.httpCache(
}
}.onCompletion {
synchronized(apolloRequestToCacheKey) { apolloRequestToCacheKey.remove(request.requestUuid.toString()) }
}.flowOn(request.executionContext[ConcurrencyInfo]!!.dispatcher)
}
} else {
this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,7 @@ package com.apollographql.apollo3.android
import androidx.test.espresso.IdlingResource
import com.apollographql.apollo3.ApolloClient
import com.apollographql.apollo3.api.ApolloRequest
import com.apollographql.apollo3.api.ApolloResponse
import com.apollographql.apollo3.api.Operation
import com.apollographql.apollo3.api.Subscription
import com.apollographql.apollo3.interceptor.ApolloInterceptor
import com.apollographql.apollo3.interceptor.ApolloInterceptorChain
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import com.apollographql.apollo3.internal.ApolloClientListener

class ApolloIdlingResource(
private val resourceName: String,
Expand Down Expand Up @@ -46,21 +39,15 @@ class ApolloIdlingResource(
}

fun ApolloClient.Builder.idlingResource(idlingResource: ApolloIdlingResource): ApolloClient.Builder {
check(!interceptors.any { it is IdlingResourceInterceptor }) { "idlingResource was already set, can only be set once" }
return addInterceptor(IdlingResourceInterceptor(idlingResource))
return addListener(IdlingResourceListener(idlingResource))
}

private class IdlingResourceInterceptor(private val idlingResource: ApolloIdlingResource) : ApolloInterceptor {
override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
// Do not update the idling resource on subscriptions as they will never terminate
return if (request.operation !is Subscription) {
chain.proceed(request).onStart {
idlingResource.operationStart()
}.onCompletion {
idlingResource.operationEnd()
}
} else {
chain.proceed(request)
}
private class IdlingResourceListener(private val idlingResource: ApolloIdlingResource) : ApolloClientListener {
override fun requestStarted(request: ApolloRequest<*>) {
idlingResource.operationStart()
}

override fun requestCompleted(request: ApolloRequest<*>) {
idlingResource.operationEnd()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import com.apollographql.apollo3.mpp.currentTimeMillis
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -107,7 +106,7 @@ internal class ApolloCacheInterceptor(
}

else -> error("Unknown operation ${request.operation}")
}.flowOn(request.executionContext[ConcurrencyInfo]!!.dispatcher)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.apollographql.apollo3.cache.normalized.api.NormalizedCacheFactory
import com.apollographql.apollo3.cache.normalized.api.TypePolicyCacheKeyGenerator
import com.apollographql.apollo3.cache.normalized.internal.ApolloCacheInterceptor
import com.apollographql.apollo3.cache.normalized.internal.WatcherInterceptor
import com.apollographql.apollo3.cache.normalized.internal.WatcherSentinel
import com.apollographql.apollo3.exception.ApolloException
import com.apollographql.apollo3.exception.CacheMissException
import com.apollographql.apollo3.interceptor.ApolloInterceptor
Expand All @@ -32,9 +33,9 @@ import com.apollographql.apollo3.interceptor.AutoPersistedQueryInterceptor
import com.apollographql.apollo3.mpp.currentTimeMillis
import com.apollographql.apollo3.network.http.HttpInfo
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlin.jvm.JvmName
import kotlin.jvm.JvmOverloads

Expand Down Expand Up @@ -144,7 +145,13 @@ fun <D : Query.Data> ApolloCall<D>.watch(

/**
* Gets initial response(s) then observes the cache for any changes.
*
* There is a guarantee that the cache is subscribed before the initial response(s) finish emitting. Any update to the cache done after the initial response(s) are received will be received.
*
* [fetchPolicy] controls how the result is first queried, while [refetchPolicy] will control the subsequent fetches.
*
* @see fetchPolicy
* @see refetchPolicy
*/
fun <D : Query.Data> ApolloCall<D>.watch(): Flow<ApolloResponse<D>> {
return flow {
Expand Down Expand Up @@ -178,14 +185,18 @@ fun <D : Query.Data> ApolloCall<D>.watch(): Flow<ApolloResponse<D>> {
}
}


copy().fetchPolicyInterceptor(refetchPolicyInterceptor)
.watch(response?.data)
.onStart {
if (lastResponse != null) {
emit(lastResponse!!)
.watchInternal(response?.data)
.collect {
if (it.exception === WatcherSentinel) {
if (lastResponse != null) {
emit(lastResponse!!)
lastResponse = null
}
} else {
emit(it)
}
}.collect {
emit(it)
}
}
}
Expand All @@ -195,6 +206,14 @@ fun <D : Query.Data> ApolloCall<D>.watch(): Flow<ApolloResponse<D>> {
* The fetch policy set by [fetchPolicy] will be used.
*/
fun <D : Query.Data> ApolloCall<D>.watch(data: D?): Flow<ApolloResponse<D>> {
return watchInternal(data).filter { it.exception !== WatcherSentinel }
}

/**
* Observes the cache for the given data. Unlike [watch], no initial request is executed on the network.
* The fetch policy set by [fetchPolicy] will be used.
*/
internal fun <D : Query.Data> ApolloCall<D>.watchInternal(data: D?): Flow<ApolloResponse<D>> {
return copy().addExecutionContext(WatchContext(data)).toFlow()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import com.apollographql.apollo3.mpp.currentTimeMillis
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -107,7 +106,7 @@ internal class ApolloCacheInterceptor(
}

else -> error("Unknown operation ${request.operation}")
}.flowOn(request.executionContext[ConcurrencyInfo]!!.dispatcher)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ internal class DefaultApolloStore(
// I think as long as the refetchPolicy is [FetchPolicy.CacheOnly] everything should be fine as there is no reentrant emission.
// If the refetechPolicy is something else, we should certainly try to detect it in the cache interceptor
extraBufferCapacity = 10,
onBufferOverflow = BufferOverflow.DROP_OLDEST
onBufferOverflow = BufferOverflow.DROP_LATEST
)

override val changedKeys = changedKeysEvents.asSharedFlow()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,20 @@ import com.apollographql.apollo3.api.Query
import com.apollographql.apollo3.cache.normalized.ApolloStore
import com.apollographql.apollo3.cache.normalized.api.dependentKeys
import com.apollographql.apollo3.cache.normalized.watchContext
import com.apollographql.apollo3.exception.DefaultApolloException
import com.apollographql.apollo3.interceptor.ApolloInterceptor
import com.apollographql.apollo3.interceptor.ApolloInterceptorChain
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onSubscription

internal val WatcherSentinel = DefaultApolloException("The watcher has started")

internal class WatcherInterceptor(val store: ApolloStore) : ApolloInterceptor {
override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
Expand All @@ -30,16 +36,26 @@ internal class WatcherInterceptor(val store: ApolloStore) : ApolloInterceptor {
@Suppress("UNCHECKED_CAST")
var watchedKeys: Set<String>? = watchContext.data?.let { store.normalize(request.operation, it as D, customScalarAdapters).values.dependentKeys() }

return store.changedKeys
return (store.changedKeys as SharedFlow<Any>)
.onSubscription {
emit(Unit)
}
.filter { changedKeys ->
if (changedKeys !is Set<*>) {
return@filter true
}
watchedKeys == null || changedKeys.intersect(watchedKeys!!).isNotEmpty()
}.map {
chain.proceed(request.newBuilder().build())
.onEach { response ->
if (response.data != null) {
watchedKeys = store.normalize(request.operation, response.data!!, customScalarAdapters).values.dependentKeys()
if (it == Unit) {
flowOf(ApolloResponse.Builder(request.operation, request.requestUuid).exception(WatcherSentinel).build())
} else {
chain.proceed(request.newBuilder().build())
.onEach { response ->
if (response.data != null) {
watchedKeys = store.normalize(request.operation, response.data!!, customScalarAdapters).values.dependentKeys()
}
}
}
}
}
.flattenConcatPolyfill()
}
Expand Down
8 changes: 0 additions & 8 deletions libraries/apollo-runtime/api/android/apollo-runtime.api
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,6 @@ public final class com/apollographql/apollo3/AutoPersistedQueryInfoKt {
public static final fun getAutoPersistedQueryInfo (Lcom/apollographql/apollo3/api/ApolloResponse;)Lcom/apollographql/apollo3/AutoPersistedQueryInfo;
}

public final class com/apollographql/apollo3/ConcurrencyInfo : com/apollographql/apollo3/api/ExecutionContext$Element {
public static final field Key Lcom/apollographql/apollo3/ConcurrencyInfo$Key;
public fun <init> (Lkotlinx/coroutines/CoroutineDispatcher;Lkotlinx/coroutines/CoroutineScope;)V
public final fun getCoroutineScope ()Lkotlinx/coroutines/CoroutineScope;
public final fun getDispatcher ()Lkotlinx/coroutines/CoroutineDispatcher;
public fun getKey ()Lcom/apollographql/apollo3/api/ExecutionContext$Key;
}

public final class com/apollographql/apollo3/ConcurrencyInfo$Key : com/apollographql/apollo3/api/ExecutionContext$Key {
}

Expand Down
Loading

0 comments on commit dcd27b9

Please sign in to comment.