From 8539898fe10c17db6fccb7495d42bc98fa8fa97c Mon Sep 17 00:00:00 2001 From: Martin Bonnin Date: Tue, 10 Mar 2020 01:11:24 +0100 Subject: [PATCH] wrap more channel.offer calls Also remove `toChannel` extension functions in favor of `toFlow` --- .../apollo/coroutines/CoroutinesExtensions.kt | 127 ++---------------- .../apollo/CoroutinesApolloTest.kt | 114 ---------------- 2 files changed, 9 insertions(+), 232 deletions(-) diff --git a/apollo-coroutines-support/src/main/kotlin/com/apollographql/apollo/coroutines/CoroutinesExtensions.kt b/apollo-coroutines-support/src/main/kotlin/com/apollographql/apollo/coroutines/CoroutinesExtensions.kt index 9d8fb3012e1..8a8289798c1 100644 --- a/apollo-coroutines-support/src/main/kotlin/com/apollographql/apollo/coroutines/CoroutinesExtensions.kt +++ b/apollo-coroutines-support/src/main/kotlin/com/apollographql/apollo/coroutines/CoroutinesExtensions.kt @@ -11,35 +11,6 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.flow.* -private class ChannelCallback(val channel: Channel>) : ApolloCall.Callback() { - - override fun onResponse(response: Response) { - runCatching { - channel.offer(response) - } - } - - override fun onFailure(e: ApolloException) { - channel.close(e) - } - - override fun onStatusEvent(event: ApolloCall.StatusEvent) { - if (event == ApolloCall.StatusEvent.COMPLETED) { - channel.close() - } - } -} - -private fun checkCapacity(capacity: Int) { - when (capacity) { - Channel.UNLIMITED, - Channel.CONFLATED -> return - else -> - // Everything else than UNLIMITED or CONFLATED does not guarantee that channel.offer() succeeds all the time. - // We don't support these use cases for now - throw IllegalArgumentException("Bad channel capacity ($capacity). Only UNLIMITED and CONFLATED are supported") - } -} /** * Converts an [ApolloCall] to an [Flow]. @@ -52,7 +23,9 @@ fun ApolloCall.toFlow() = callbackFlow { clone().enqueue( object : ApolloCall.Callback() { override fun onResponse(response: Response) { - offer(response) + runCatching { + offer(response) + } } override fun onFailure(e: ApolloException) { @@ -80,7 +53,9 @@ fun ApolloQueryWatcher.toFlow() = callbackFlow { clone().enqueueAndWatch( object : ApolloCall.Callback() { override fun onResponse(response: Response) { - offer(response) + runCatching { + offer(response) + } } override fun onFailure(e: ApolloException) { @@ -97,28 +72,6 @@ fun ApolloQueryWatcher.toFlow() = callbackFlow { awaitClose { this@toFlow.cancel() } } -/** - * Converts an [ApolloCall] to an [Channel]. The number of values produced by the channel is based on the - * [com.apollographql.apollo.fetcher.ResponseFetcher] used with the call. - * - * @param the value type. - * @param capacity used for the underlying channel. Only [Channel.UNLIMITED] and [Channel.CONFLATED] are supported at the moment - * @throws IllegalArgumentException if capacity is not [Channel.UNLIMITED] or [Channel.CONFLATED] - * @return a channel which emits [Responses] - */ -@ExperimentalCoroutinesApi -@Deprecated(message = "Use toFlow instead", replaceWith = ReplaceWith("toFlow()")) -fun ApolloCall.toChannel(capacity: Int = Channel.UNLIMITED): Channel> { - checkCapacity(capacity) - val channel = Channel>(capacity) - - channel.invokeOnClose { - cancel() - } - enqueue(ChannelCallback(channel)) - - return channel -} /** * Converts an [ApolloCall] to an [Deferred]. This is a convenience method that will only return the first value emitted. @@ -152,70 +105,6 @@ fun ApolloCall.toDeferred(): Deferred> { return deferred } -/** - * Converts an [ApolloQueryWatcher] to an [Channel]. - * - * @param the value type. - * @param capacity used for the underlying channel. Only [Channel.UNLIMITED] and [Channel.CONFLATED] are supported at the moment - * @throws IllegalArgumentException if capacity is not [Channel.UNLIMITED] or [Channel.CONFLATED] - * @return a channel which emits [Responses] - */ -@ExperimentalCoroutinesApi -@Deprecated(message = "Use toFlow instead", replaceWith = ReplaceWith("toFlow()")) -fun ApolloQueryWatcher.toChannel(capacity: Int = Channel.UNLIMITED): Channel> { - checkCapacity(capacity) - val channel = Channel>(capacity) - channel.invokeOnClose { - cancel() - } - enqueueAndWatch(ChannelCallback(channel)) - - return channel -} - -/** - * Converts an [ApolloSubscriptionCall] to an [Channel]. - * - * @param the value type. - * @param capacity the {@link Capacity} used for the underlying channel. Only [Channel.UNLIMITED] and [Channel.CONFLATED] are supported - * at the moment - * @throws IllegalArgumentException if capacity is not [Channel.UNLIMITED] or [Channel.CONFLATED] - * @return a channel which emits [Responses] - */ -@ExperimentalCoroutinesApi -@Deprecated(message = "Use toFlow instead", replaceWith = ReplaceWith("toFlow()")) -fun ApolloSubscriptionCall.toChannel(capacity: Int = Channel.UNLIMITED): Channel> { - checkCapacity(capacity) - val channel = Channel>(capacity) - channel.invokeOnClose { - cancel() - } - execute(object : ApolloSubscriptionCall.Callback { - override fun onConnected() { - } - - override fun onResponse(response: Response) { - runCatching { - channel.offer(response) - } - } - - override fun onFailure(e: ApolloException) { - channel.close(e) - } - - override fun onCompleted() { - channel.close() - } - - override fun onTerminated() { - channel.close() - } - }) - - return channel -} - /** * Converts an [ApolloSubscriptionCall] to an [Flow]. * @@ -230,7 +119,9 @@ fun ApolloSubscriptionCall.toFlow(): Flow> = callbackFlow { } override fun onResponse(response: Response) { - channel.offer(response) + runCatching { + channel.offer(response) + } } override fun onFailure(e: ApolloException) { diff --git a/apollo-integration/src/test/java/com/apollographql/apollo/CoroutinesApolloTest.kt b/apollo-integration/src/test/java/com/apollographql/apollo/CoroutinesApolloTest.kt index dbe78d34591..eea1551ddc6 100644 --- a/apollo-integration/src/test/java/com/apollographql/apollo/CoroutinesApolloTest.kt +++ b/apollo-integration/src/test/java/com/apollographql/apollo/CoroutinesApolloTest.kt @@ -7,7 +7,6 @@ import com.apollographql.apollo.api.Input import com.apollographql.apollo.api.Response import com.apollographql.apollo.cache.normalized.lru.EvictionPolicy import com.apollographql.apollo.cache.normalized.lru.LruNormalizedCacheFactory -import com.apollographql.apollo.coroutines.toChannel import com.apollographql.apollo.coroutines.toDeferred import com.apollographql.apollo.coroutines.toFlow import com.apollographql.apollo.coroutines.toJob @@ -51,18 +50,6 @@ class CoroutinesApolloTest { .build() } - @Test - fun callChannelProducesValue() { - server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID)) - - val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).toChannel() - runBlocking { - val response = channel.receive() - - assertThat(response.data()!!.hero()!!.name()).isEqualTo("R2-D2") - } - } - @Test fun callDeferredProducesValue() { server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID)) @@ -73,22 +60,6 @@ class CoroutinesApolloTest { } } - @Test - fun errorIsTriggered() { - server.enqueue(MockResponse().setResponseCode(200).setBody("nonsense")) - - val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).toChannel() - runBlocking { - var exception: java.lang.Exception? = null - try { - channel.receive() - } catch (e: Exception) { - exception = e - } - assertThat(exception is ApolloParseException).isTrue() - } - } - @Test fun prefetchCompletes() { server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID)) @@ -111,91 +82,6 @@ class CoroutinesApolloTest { } } - @Test - fun queryWatcherUpdatedSameQueryDifferentResults() { - server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID)) - - val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).watcher().toChannel() - - server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_CHANGE)) - apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))) - .responseFetcher(ApolloResponseFetchers.NETWORK_ONLY) - .enqueue(null) - - runBlocking { - val response0 = channel.receive() - assertThat(response0.data()!!.hero()!!.name()).isEqualTo("R2-D2") - - val response1 = channel.receive() - assertThat(response1.data()!!.hero()!!.name()).isEqualTo("Artoo") - } - } - - @Test - fun queryWatcherNotUpdatedSameQuerySameResults() { - server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID)) - - val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).watcher().toChannel() - - server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID)) - apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))) - .responseFetcher(ApolloResponseFetchers.NETWORK_ONLY) - .enqueue(null) - - runBlocking { - val response0 = channel.receive() - assertThat(response0.data()!!.hero()!!.name()).isEqualTo("R2-D2") - - assertThat(channel.isEmpty).isEqualTo(true) - } - } - - @Test - fun queryWatcherUpdatedDifferentQueryDifferentResults() { - server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID)) - - val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).watcher().toChannel() - - server.enqueue(mockResponse("HeroAndFriendsNameWithIdsNameChange.json")) - apolloClient.query(HeroAndFriendsNamesWithIDsQuery(Input.fromNullable(Episode.NEWHOPE))) - .enqueue(null) - - runBlocking { - val response0 = channel.receive() - assertThat(response0.data()!!.hero()!!.name()).isEqualTo("R2-D2") - - val response1 = channel.receive() - assertThat(response1.data()!!.hero()!!.name()).isEqualTo("Artoo") - } - } - - @Test - fun queryWatcherUpdatedConflatedOnlyReturnsLastResult() { - server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID)) - - val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).watcher().toChannel(Channel.CONFLATED) - - server.enqueue(mockResponse("HeroAndFriendsNameWithIdsNameChange.json")) - apolloClient.query(HeroAndFriendsNamesWithIDsQuery(Input.fromNullable(Episode.NEWHOPE))) - .enqueue(null) - - runBlocking { - delay(500) - val response1 = channel.receive() - assertThat(response1.data()!!.hero()!!.name()).isEqualTo("Artoo") - } - } - - @Test - fun queryWatcherCancelClosesChannel() { - server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID)) - - val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).watcher().toChannel() - - channel.cancel() - assertThat(channel.isClosedForReceive).isEqualTo(true) - } - @Test fun flowCanBeRead() { server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))