Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report watchInternal to incubating #5875

Merged
merged 2 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import com.apollographql.apollo3.cache.normalized.api.RecordMerger
import com.apollographql.apollo3.cache.normalized.api.TypePolicyCacheKeyGenerator
import com.apollographql.apollo3.cache.normalized.internal.ApolloCacheInterceptor
import com.apollographql.apollo3.cache.normalized.internal.WatcherInterceptor
import com.apollographql.apollo3.cache.normalized.internal.WatcherSentinel
import com.apollographql.apollo3.exception.ApolloException
import com.apollographql.apollo3.exception.CacheMissException
import com.apollographql.apollo3.interceptor.ApolloInterceptor
Expand All @@ -42,9 +43,9 @@ import com.apollographql.apollo3.interceptor.AutoPersistedQueryInterceptor
import com.apollographql.apollo3.mpp.currentTimeMillis
import com.apollographql.apollo3.network.http.HttpInfo
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlin.jvm.JvmName
import kotlin.jvm.JvmOverloads

Expand Down Expand Up @@ -152,10 +153,7 @@ fun ApolloClient.Builder.logCacheMisses(
return addInterceptor(CacheMissLoggingInterceptor(log))
}

fun ApolloClient.Builder.store(
store: ApolloStore,
writeToCacheAsynchronously: Boolean = false,
): ApolloClient.Builder {
fun ApolloClient.Builder.store(store: ApolloStore, writeToCacheAsynchronously: Boolean = false): ApolloClient.Builder {
check(interceptors.none { it is AutoPersistedQueryInterceptor }) {
"Apollo: the normalized cache must be configured before the auto persisted queries"
}
Expand Down Expand Up @@ -188,8 +186,14 @@ fun <D : Query.Data> ApolloCall<D>.watch(
): Flow<ApolloResponse<D>> = throw UnsupportedOperationException("watch(fetchThrows: Boolean, refetchThrows: Boolean) is no longer supported, use watch() instead")

/**
* Gets the result from the network, then observes the cache for any changes.
* [fetchPolicy] will control how the result is first queried, while [refetchPolicy] will control the subsequent fetches.
* Gets initial response(s) then observes the cache for any changes.
*
* There is a guarantee that the cache is subscribed before the initial response(s) finish emitting. Any update to the cache done after the initial response(s) are received will be received.
*
* [fetchPolicy] controls how the result is first queried, while [refetchPolicy] will control the subsequent fetches.
*
* @see fetchPolicy
* @see refetchPolicy
*/
fun <D : Query.Data> ApolloCall<D>.watch(): Flow<ApolloResponse<D>> {
return flow {
Expand Down Expand Up @@ -223,14 +227,18 @@ fun <D : Query.Data> ApolloCall<D>.watch(): Flow<ApolloResponse<D>> {
}
}


copy().fetchPolicyInterceptor(refetchPolicyInterceptor)
.watch(response?.data)
.onStart {
if (lastResponse != null) {
emit(lastResponse!!)
.watchInternal(response?.data)
.collect {
if (it.exception === WatcherSentinel) {
if (lastResponse != null) {
emit(lastResponse!!)
lastResponse = null
}
} else {
emit(it)
}
}.collect {
emit(it)
}
}
}
Expand All @@ -240,6 +248,14 @@ fun <D : Query.Data> ApolloCall<D>.watch(): Flow<ApolloResponse<D>> {
* The fetch policy set by [fetchPolicy] will be used.
*/
fun <D : Query.Data> ApolloCall<D>.watch(data: D?): Flow<ApolloResponse<D>> {
return watchInternal(data).filter { it.exception !== WatcherSentinel }
}

/**
* Observes the cache for the given data. Unlike [watch], no initial request is executed on the network.
* The fetch policy set by [fetchPolicy] will be used.
*/
internal fun <D : Query.Data> ApolloCall<D>.watchInternal(data: D?): Flow<ApolloResponse<D>> {
return copy().addExecutionContext(WatchContext(data)).toFlow()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,20 @@ import com.apollographql.apollo3.cache.normalized.ApolloStore
import com.apollographql.apollo3.cache.normalized.ApolloStoreInterceptor
import com.apollographql.apollo3.cache.normalized.api.dependentKeys
import com.apollographql.apollo3.cache.normalized.watchContext
import com.apollographql.apollo3.exception.DefaultApolloException
import com.apollographql.apollo3.interceptor.ApolloInterceptor
import com.apollographql.apollo3.interceptor.ApolloInterceptorChain
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onSubscription

internal val WatcherSentinel = DefaultApolloException("The watcher has started")

internal class WatcherInterceptor(val store: ApolloStore) : ApolloInterceptor, ApolloStoreInterceptor {
override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
Expand All @@ -31,16 +37,26 @@ internal class WatcherInterceptor(val store: ApolloStore) : ApolloInterceptor, A
@Suppress("UNCHECKED_CAST")
var watchedKeys: Set<String>? = watchContext.data?.let { store.normalize(request.operation, it as D, customScalarAdapters).values.dependentKeys() }

return store.changedKeys
return (store.changedKeys as SharedFlow<Any>)
.onSubscription {
emit(Unit)
}
.filter { changedKeys ->
if (changedKeys !is Set<*>) {
return@filter true
}
watchedKeys == null || changedKeys.intersect(watchedKeys!!).isNotEmpty()
}.map {
chain.proceed(request.newBuilder().build())
.onEach { response ->
if (response.data != null) {
watchedKeys = store.normalize(request.operation, response.data!!, customScalarAdapters).values.dependentKeys()
if (it == Unit) {
flowOf(ApolloResponse.Builder(request.operation, request.requestUuid).exception(WatcherSentinel).build())
} else {
chain.proceed(request)
.onEach { response ->
if (response.data != null) {
watchedKeys = store.normalize(request.operation, response.data!!, customScalarAdapters).values.dependentKeys()
}
}
}
}
}
.flattenConcatPolyfill()
}
Expand Down
1 change: 1 addition & 0 deletions libraries/apollo-runtime/api/apollo-runtime.klib.api
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ final class com.apollographql.apollo3/ApolloClient : com.apollographql.apollo3.a
final fun httpServerUrl(kotlin/String?): com.apollographql.apollo3/ApolloClient.Builder // com.apollographql.apollo3/ApolloClient.Builder.httpServerUrl|httpServerUrl(kotlin.String?){}[0]
final fun interceptors(kotlin.collections/List<com.apollographql.apollo3.interceptor/ApolloInterceptor>): com.apollographql.apollo3/ApolloClient.Builder // com.apollographql.apollo3/ApolloClient.Builder.interceptors|interceptors(kotlin.collections.List<com.apollographql.apollo3.interceptor.ApolloInterceptor>){}[0]
final fun networkTransport(com.apollographql.apollo3.network/NetworkTransport?): com.apollographql.apollo3/ApolloClient.Builder // com.apollographql.apollo3/ApolloClient.Builder.networkTransport|networkTransport(com.apollographql.apollo3.network.NetworkTransport?){}[0]
final fun removeHttpInterceptor(com.apollographql.apollo3.network.http/HttpInterceptor): com.apollographql.apollo3/ApolloClient.Builder // com.apollographql.apollo3/ApolloClient.Builder.removeHttpInterceptor|removeHttpInterceptor(com.apollographql.apollo3.network.http.HttpInterceptor){}[0]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(unrelated, this was missing from #5858)

final fun removeInterceptor(com.apollographql.apollo3.interceptor/ApolloInterceptor): com.apollographql.apollo3/ApolloClient.Builder // com.apollographql.apollo3/ApolloClient.Builder.removeInterceptor|removeInterceptor(com.apollographql.apollo3.interceptor.ApolloInterceptor){}[0]
final fun sendApqExtensions(kotlin/Boolean?): com.apollographql.apollo3/ApolloClient.Builder // com.apollographql.apollo3/ApolloClient.Builder.sendApqExtensions|sendApqExtensions(kotlin.Boolean?){}[0]
final fun sendDocument(kotlin/Boolean?): com.apollographql.apollo3/ApolloClient.Builder // com.apollographql.apollo3/ApolloClient.Builder.sendDocument|sendDocument(kotlin.Boolean?){}[0]
Expand Down
Loading