Skip to content

Commit

Permalink
Report watchInternal to incubating (apollographql#5875)
Browse files Browse the repository at this point in the history
* Report watchInternal to incubating

* Fix apiDump
  • Loading branch information
BoD committed Jul 1, 2024
1 parent 1b92dbd commit d7803c6
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import com.apollographql.apollo3.cache.normalized.api.RecordMerger
import com.apollographql.apollo3.cache.normalized.api.TypePolicyCacheKeyGenerator
import com.apollographql.apollo3.cache.normalized.internal.ApolloCacheInterceptor
import com.apollographql.apollo3.cache.normalized.internal.WatcherInterceptor
import com.apollographql.apollo3.cache.normalized.internal.WatcherSentinel
import com.apollographql.apollo3.exception.ApolloException
import com.apollographql.apollo3.exception.CacheMissException
import com.apollographql.apollo3.interceptor.ApolloInterceptor
Expand All @@ -42,9 +43,9 @@ import com.apollographql.apollo3.interceptor.AutoPersistedQueryInterceptor
import com.apollographql.apollo3.mpp.currentTimeMillis
import com.apollographql.apollo3.network.http.HttpInfo
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlin.jvm.JvmName
import kotlin.jvm.JvmOverloads

Expand Down Expand Up @@ -152,10 +153,7 @@ fun ApolloClient.Builder.logCacheMisses(
return addInterceptor(CacheMissLoggingInterceptor(log))
}

fun ApolloClient.Builder.store(
store: ApolloStore,
writeToCacheAsynchronously: Boolean = false,
): ApolloClient.Builder {
fun ApolloClient.Builder.store(store: ApolloStore, writeToCacheAsynchronously: Boolean = false): ApolloClient.Builder {
check(interceptors.none { it is AutoPersistedQueryInterceptor }) {
"Apollo: the normalized cache must be configured before the auto persisted queries"
}
Expand Down Expand Up @@ -188,8 +186,14 @@ fun <D : Query.Data> ApolloCall<D>.watch(
): Flow<ApolloResponse<D>> = throw UnsupportedOperationException("watch(fetchThrows: Boolean, refetchThrows: Boolean) is no longer supported, use watch() instead")

/**
* Gets the result from the network, then observes the cache for any changes.
* [fetchPolicy] will control how the result is first queried, while [refetchPolicy] will control the subsequent fetches.
* Gets initial response(s) then observes the cache for any changes.
*
* There is a guarantee that the cache is subscribed before the initial response(s) finish emitting. Any update to the cache done after the initial response(s) are received will be received.
*
* [fetchPolicy] controls how the result is first queried, while [refetchPolicy] will control the subsequent fetches.
*
* @see fetchPolicy
* @see refetchPolicy
*/
fun <D : Query.Data> ApolloCall<D>.watch(): Flow<ApolloResponse<D>> {
return flow {
Expand Down Expand Up @@ -223,14 +227,18 @@ fun <D : Query.Data> ApolloCall<D>.watch(): Flow<ApolloResponse<D>> {
}
}


copy().fetchPolicyInterceptor(refetchPolicyInterceptor)
.watch(response?.data)
.onStart {
if (lastResponse != null) {
emit(lastResponse!!)
.watchInternal(response?.data)
.collect {
if (it.exception === WatcherSentinel) {
if (lastResponse != null) {
emit(lastResponse!!)
lastResponse = null
}
} else {
emit(it)
}
}.collect {
emit(it)
}
}
}
Expand All @@ -240,6 +248,14 @@ fun <D : Query.Data> ApolloCall<D>.watch(): Flow<ApolloResponse<D>> {
* The fetch policy set by [fetchPolicy] will be used.
*/
fun <D : Query.Data> ApolloCall<D>.watch(data: D?): Flow<ApolloResponse<D>> {
return watchInternal(data).filter { it.exception !== WatcherSentinel }
}

/**
* Observes the cache for the given data. Unlike [watch], no initial request is executed on the network.
* The fetch policy set by [fetchPolicy] will be used.
*/
internal fun <D : Query.Data> ApolloCall<D>.watchInternal(data: D?): Flow<ApolloResponse<D>> {
return copy().addExecutionContext(WatchContext(data)).toFlow()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,20 @@ import com.apollographql.apollo3.cache.normalized.ApolloStore
import com.apollographql.apollo3.cache.normalized.ApolloStoreInterceptor
import com.apollographql.apollo3.cache.normalized.api.dependentKeys
import com.apollographql.apollo3.cache.normalized.watchContext
import com.apollographql.apollo3.exception.DefaultApolloException
import com.apollographql.apollo3.interceptor.ApolloInterceptor
import com.apollographql.apollo3.interceptor.ApolloInterceptorChain
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onSubscription

internal val WatcherSentinel = DefaultApolloException("The watcher has started")

internal class WatcherInterceptor(val store: ApolloStore) : ApolloInterceptor, ApolloStoreInterceptor {
override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
Expand All @@ -31,16 +37,26 @@ internal class WatcherInterceptor(val store: ApolloStore) : ApolloInterceptor, A
@Suppress("UNCHECKED_CAST")
var watchedKeys: Set<String>? = watchContext.data?.let { store.normalize(request.operation, it as D, customScalarAdapters).values.dependentKeys() }

return store.changedKeys
return (store.changedKeys as SharedFlow<Any>)
.onSubscription {
emit(Unit)
}
.filter { changedKeys ->
if (changedKeys !is Set<*>) {
return@filter true
}
watchedKeys == null || changedKeys.intersect(watchedKeys!!).isNotEmpty()
}.map {
chain.proceed(request.newBuilder().build())
.onEach { response ->
if (response.data != null) {
watchedKeys = store.normalize(request.operation, response.data!!, customScalarAdapters).values.dependentKeys()
if (it == Unit) {
flowOf(ApolloResponse.Builder(request.operation, request.requestUuid).exception(WatcherSentinel).build())
} else {
chain.proceed(request)
.onEach { response ->
if (response.data != null) {
watchedKeys = store.normalize(request.operation, response.data!!, customScalarAdapters).values.dependentKeys()
}
}
}
}
}
.flattenConcatPolyfill()
}
Expand Down
1 change: 1 addition & 0 deletions libraries/apollo-runtime/api/apollo-runtime.klib.api
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ final class com.apollographql.apollo3/ApolloClient : com.apollographql.apollo3.a
final fun httpServerUrl(kotlin/String?): com.apollographql.apollo3/ApolloClient.Builder // com.apollographql.apollo3/ApolloClient.Builder.httpServerUrl|httpServerUrl(kotlin.String?){}[0]
final fun interceptors(kotlin.collections/List<com.apollographql.apollo3.interceptor/ApolloInterceptor>): com.apollographql.apollo3/ApolloClient.Builder // com.apollographql.apollo3/ApolloClient.Builder.interceptors|interceptors(kotlin.collections.List<com.apollographql.apollo3.interceptor.ApolloInterceptor>){}[0]
final fun networkTransport(com.apollographql.apollo3.network/NetworkTransport?): com.apollographql.apollo3/ApolloClient.Builder // com.apollographql.apollo3/ApolloClient.Builder.networkTransport|networkTransport(com.apollographql.apollo3.network.NetworkTransport?){}[0]
final fun removeHttpInterceptor(com.apollographql.apollo3.network.http/HttpInterceptor): com.apollographql.apollo3/ApolloClient.Builder // com.apollographql.apollo3/ApolloClient.Builder.removeHttpInterceptor|removeHttpInterceptor(com.apollographql.apollo3.network.http.HttpInterceptor){}[0]
final fun removeInterceptor(com.apollographql.apollo3.interceptor/ApolloInterceptor): com.apollographql.apollo3/ApolloClient.Builder // com.apollographql.apollo3/ApolloClient.Builder.removeInterceptor|removeInterceptor(com.apollographql.apollo3.interceptor.ApolloInterceptor){}[0]
final fun sendApqExtensions(kotlin/Boolean?): com.apollographql.apollo3/ApolloClient.Builder // com.apollographql.apollo3/ApolloClient.Builder.sendApqExtensions|sendApqExtensions(kotlin.Boolean?){}[0]
final fun sendDocument(kotlin/Boolean?): com.apollographql.apollo3/ApolloClient.Builder // com.apollographql.apollo3/ApolloClient.Builder.sendDocument|sendDocument(kotlin.Boolean?){}[0]
Expand Down

0 comments on commit d7803c6

Please sign in to comment.