Skip to content

Commit

Permalink
wrap more channel.offer calls (#2058)
Browse files Browse the repository at this point in the history
Also remove `toChannel` extension functions in favor of `toFlow`
  • Loading branch information
martinbonnin authored Mar 11, 2020
1 parent b22ebcd commit 7e4dca7
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 232 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,6 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

private class ChannelCallback<T>(val channel: Channel<Response<T>>) : ApolloCall.Callback<T>() {

override fun onResponse(response: Response<T>) {
runCatching {
channel.offer(response)
}
}

override fun onFailure(e: ApolloException) {
channel.close(e)
}

override fun onStatusEvent(event: ApolloCall.StatusEvent) {
if (event == ApolloCall.StatusEvent.COMPLETED) {
channel.close()
}
}
}

private fun checkCapacity(capacity: Int) {
when (capacity) {
Channel.UNLIMITED,
Channel.CONFLATED -> return
else ->
// Everything else than UNLIMITED or CONFLATED does not guarantee that channel.offer() succeeds all the time.
// We don't support these use cases for now
throw IllegalArgumentException("Bad channel capacity ($capacity). Only UNLIMITED and CONFLATED are supported")
}
}

/**
* Converts an [ApolloCall] to an [Flow].
Expand All @@ -52,7 +23,9 @@ fun <T> ApolloCall<T>.toFlow() = callbackFlow {
clone().enqueue(
object : ApolloCall.Callback<T>() {
override fun onResponse(response: Response<T>) {
offer(response)
runCatching {
offer(response)
}
}

override fun onFailure(e: ApolloException) {
Expand Down Expand Up @@ -80,7 +53,9 @@ fun <T> ApolloQueryWatcher<T>.toFlow() = callbackFlow {
clone().enqueueAndWatch(
object : ApolloCall.Callback<T>() {
override fun onResponse(response: Response<T>) {
offer(response)
runCatching {
offer(response)
}
}

override fun onFailure(e: ApolloException) {
Expand All @@ -97,28 +72,6 @@ fun <T> ApolloQueryWatcher<T>.toFlow() = callbackFlow {
awaitClose { this@toFlow.cancel() }
}

/**
* Converts an [ApolloCall] to an [Channel]. The number of values produced by the channel is based on the
* [com.apollographql.apollo.fetcher.ResponseFetcher] used with the call.
*
* @param <T> the value type.
* @param capacity used for the underlying channel. Only [Channel.UNLIMITED] and [Channel.CONFLATED] are supported at the moment
* @throws IllegalArgumentException if capacity is not [Channel.UNLIMITED] or [Channel.CONFLATED]
* @return a channel which emits [Responses<T>]
*/
@ExperimentalCoroutinesApi
@Deprecated(message = "Use toFlow instead", replaceWith = ReplaceWith("toFlow()"))
fun <T> ApolloCall<T>.toChannel(capacity: Int = Channel.UNLIMITED): Channel<Response<T>> {
checkCapacity(capacity)
val channel = Channel<Response<T>>(capacity)

channel.invokeOnClose {
cancel()
}
enqueue(ChannelCallback(channel))

return channel
}

/**
* Converts an [ApolloCall] to an [Deferred]. This is a convenience method that will only return the first value emitted.
Expand Down Expand Up @@ -152,70 +105,6 @@ fun <T> ApolloCall<T>.toDeferred(): Deferred<Response<T>> {
return deferred
}

/**
* Converts an [ApolloQueryWatcher] to an [Channel].
*
* @param <T> the value type.
* @param capacity used for the underlying channel. Only [Channel.UNLIMITED] and [Channel.CONFLATED] are supported at the moment
* @throws IllegalArgumentException if capacity is not [Channel.UNLIMITED] or [Channel.CONFLATED]
* @return a channel which emits [Responses<T>]
*/
@ExperimentalCoroutinesApi
@Deprecated(message = "Use toFlow instead", replaceWith = ReplaceWith("toFlow()"))
fun <T> ApolloQueryWatcher<T>.toChannel(capacity: Int = Channel.UNLIMITED): Channel<Response<T>> {
checkCapacity(capacity)
val channel = Channel<Response<T>>(capacity)
channel.invokeOnClose {
cancel()
}
enqueueAndWatch(ChannelCallback(channel))

return channel
}

/**
* Converts an [ApolloSubscriptionCall] to an [Channel].
*
* @param <T> the value type.
* @param capacity the {@link Capacity} used for the underlying channel. Only [Channel.UNLIMITED] and [Channel.CONFLATED] are supported
* at the moment
* @throws IllegalArgumentException if capacity is not [Channel.UNLIMITED] or [Channel.CONFLATED]
* @return a channel which emits [Responses<T>]
*/
@ExperimentalCoroutinesApi
@Deprecated(message = "Use toFlow instead", replaceWith = ReplaceWith("toFlow()"))
fun <T> ApolloSubscriptionCall<T>.toChannel(capacity: Int = Channel.UNLIMITED): Channel<Response<T>> {
checkCapacity(capacity)
val channel = Channel<Response<T>>(capacity)
channel.invokeOnClose {
cancel()
}
execute(object : ApolloSubscriptionCall.Callback<T> {
override fun onConnected() {
}

override fun onResponse(response: Response<T>) {
runCatching {
channel.offer(response)
}
}

override fun onFailure(e: ApolloException) {
channel.close(e)
}

override fun onCompleted() {
channel.close()
}

override fun onTerminated() {
channel.close()
}
})

return channel
}

/**
* Converts an [ApolloSubscriptionCall] to an [Flow].
*
Expand All @@ -230,7 +119,9 @@ fun <T> ApolloSubscriptionCall<T>.toFlow(): Flow<Response<T>> = callbackFlow {
}

override fun onResponse(response: Response<T>) {
channel.offer(response)
runCatching {
channel.offer(response)
}
}

override fun onFailure(e: ApolloException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import com.apollographql.apollo.api.Input
import com.apollographql.apollo.api.Response
import com.apollographql.apollo.cache.normalized.lru.EvictionPolicy
import com.apollographql.apollo.cache.normalized.lru.LruNormalizedCacheFactory
import com.apollographql.apollo.coroutines.toChannel
import com.apollographql.apollo.coroutines.toDeferred
import com.apollographql.apollo.coroutines.toFlow
import com.apollographql.apollo.coroutines.toJob
Expand Down Expand Up @@ -51,18 +50,6 @@ class CoroutinesApolloTest {
.build()
}

@Test
fun callChannelProducesValue() {
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))

val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).toChannel()
runBlocking {
val response = channel.receive()

assertThat(response.data!!.hero()!!.name()).isEqualTo("R2-D2")
}
}

@Test
fun callDeferredProducesValue() {
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))
Expand All @@ -73,22 +60,6 @@ class CoroutinesApolloTest {
}
}

@Test
fun errorIsTriggered() {
server.enqueue(MockResponse().setResponseCode(200).setBody("nonsense"))

val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).toChannel()
runBlocking {
var exception: java.lang.Exception? = null
try {
channel.receive()
} catch (e: Exception) {
exception = e
}
assertThat(exception is ApolloParseException).isTrue()
}
}

@Test
fun prefetchCompletes() {
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))
Expand All @@ -111,91 +82,6 @@ class CoroutinesApolloTest {
}
}

@Test
fun queryWatcherUpdatedSameQueryDifferentResults() {
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))

val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).watcher().toChannel()

server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_CHANGE))
apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE)))
.responseFetcher(ApolloResponseFetchers.NETWORK_ONLY)
.enqueue(null)

runBlocking {
val response0 = channel.receive()
assertThat(response0.data!!.hero()!!.name()).isEqualTo("R2-D2")

val response1 = channel.receive()
assertThat(response1.data!!.hero()!!.name()).isEqualTo("Artoo")
}
}

@Test
fun queryWatcherNotUpdatedSameQuerySameResults() {
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))

val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).watcher().toChannel()

server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))
apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE)))
.responseFetcher(ApolloResponseFetchers.NETWORK_ONLY)
.enqueue(null)

runBlocking {
val response0 = channel.receive()
assertThat(response0.data!!.hero()!!.name()).isEqualTo("R2-D2")

assertThat(channel.isEmpty).isEqualTo(true)
}
}

@Test
fun queryWatcherUpdatedDifferentQueryDifferentResults() {
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))

val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).watcher().toChannel()

server.enqueue(mockResponse("HeroAndFriendsNameWithIdsNameChange.json"))
apolloClient.query(HeroAndFriendsNamesWithIDsQuery(Input.fromNullable(Episode.NEWHOPE)))
.enqueue(null)

runBlocking {
val response0 = channel.receive()
assertThat(response0.data!!.hero()!!.name()).isEqualTo("R2-D2")

val response1 = channel.receive()
assertThat(response1.data!!.hero()!!.name()).isEqualTo("Artoo")
}
}

@Test
fun queryWatcherUpdatedConflatedOnlyReturnsLastResult() {
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))

val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).watcher().toChannel(Channel.CONFLATED)

server.enqueue(mockResponse("HeroAndFriendsNameWithIdsNameChange.json"))
apolloClient.query(HeroAndFriendsNamesWithIDsQuery(Input.fromNullable(Episode.NEWHOPE)))
.enqueue(null)

runBlocking {
delay(500)
val response1 = channel.receive()
assertThat(response1.data!!.hero()!!.name()).isEqualTo("Artoo")
}
}

@Test
fun queryWatcherCancelClosesChannel() {
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))

val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).watcher().toChannel()

channel.cancel()
assertThat(channel.isClosedForReceive).isEqualTo(true)
}

@Test
fun flowCanBeRead() {
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))
Expand Down

0 comments on commit 7e4dca7

Please sign in to comment.