Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap more channel.offer calls #2058

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,6 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

private class ChannelCallback<T>(val channel: Channel<Response<T>>) : ApolloCall.Callback<T>() {

override fun onResponse(response: Response<T>) {
runCatching {
channel.offer(response)
}
}

override fun onFailure(e: ApolloException) {
channel.close(e)
}

override fun onStatusEvent(event: ApolloCall.StatusEvent) {
if (event == ApolloCall.StatusEvent.COMPLETED) {
channel.close()
}
}
}

private fun checkCapacity(capacity: Int) {
when (capacity) {
Channel.UNLIMITED,
Channel.CONFLATED -> return
else ->
// Everything else than UNLIMITED or CONFLATED does not guarantee that channel.offer() succeeds all the time.
// We don't support these use cases for now
throw IllegalArgumentException("Bad channel capacity ($capacity). Only UNLIMITED and CONFLATED are supported")
}
}

/**
* Converts an [ApolloCall] to an [Flow].
Expand All @@ -52,7 +23,9 @@ fun <T> ApolloCall<T>.toFlow() = callbackFlow {
clone().enqueue(
object : ApolloCall.Callback<T>() {
override fun onResponse(response: Response<T>) {
offer(response)
runCatching {
offer(response)
}
}

override fun onFailure(e: ApolloException) {
Expand Down Expand Up @@ -80,7 +53,9 @@ fun <T> ApolloQueryWatcher<T>.toFlow() = callbackFlow {
clone().enqueueAndWatch(
object : ApolloCall.Callback<T>() {
override fun onResponse(response: Response<T>) {
offer(response)
runCatching {
offer(response)
}
}

override fun onFailure(e: ApolloException) {
Expand All @@ -97,28 +72,6 @@ fun <T> ApolloQueryWatcher<T>.toFlow() = callbackFlow {
awaitClose { [email protected]() }
}

/**
* Converts an [ApolloCall] to an [Channel]. The number of values produced by the channel is based on the
* [com.apollographql.apollo.fetcher.ResponseFetcher] used with the call.
*
* @param <T> the value type.
* @param capacity used for the underlying channel. Only [Channel.UNLIMITED] and [Channel.CONFLATED] are supported at the moment
* @throws IllegalArgumentException if capacity is not [Channel.UNLIMITED] or [Channel.CONFLATED]
* @return a channel which emits [Responses<T>]
*/
@ExperimentalCoroutinesApi
@Deprecated(message = "Use toFlow instead", replaceWith = ReplaceWith("toFlow()"))
fun <T> ApolloCall<T>.toChannel(capacity: Int = Channel.UNLIMITED): Channel<Response<T>> {
checkCapacity(capacity)
val channel = Channel<Response<T>>(capacity)

channel.invokeOnClose {
cancel()
}
enqueue(ChannelCallback(channel))

return channel
}

/**
* Converts an [ApolloCall] to an [Deferred]. This is a convenience method that will only return the first value emitted.
Expand Down Expand Up @@ -152,70 +105,6 @@ fun <T> ApolloCall<T>.toDeferred(): Deferred<Response<T>> {
return deferred
}

/**
* Converts an [ApolloQueryWatcher] to an [Channel].
*
* @param <T> the value type.
* @param capacity used for the underlying channel. Only [Channel.UNLIMITED] and [Channel.CONFLATED] are supported at the moment
* @throws IllegalArgumentException if capacity is not [Channel.UNLIMITED] or [Channel.CONFLATED]
* @return a channel which emits [Responses<T>]
*/
@ExperimentalCoroutinesApi
@Deprecated(message = "Use toFlow instead", replaceWith = ReplaceWith("toFlow()"))
fun <T> ApolloQueryWatcher<T>.toChannel(capacity: Int = Channel.UNLIMITED): Channel<Response<T>> {
checkCapacity(capacity)
val channel = Channel<Response<T>>(capacity)
channel.invokeOnClose {
cancel()
}
enqueueAndWatch(ChannelCallback(channel))

return channel
}

/**
* Converts an [ApolloSubscriptionCall] to an [Channel].
*
* @param <T> the value type.
* @param capacity the {@link Capacity} used for the underlying channel. Only [Channel.UNLIMITED] and [Channel.CONFLATED] are supported
* at the moment
* @throws IllegalArgumentException if capacity is not [Channel.UNLIMITED] or [Channel.CONFLATED]
* @return a channel which emits [Responses<T>]
*/
@ExperimentalCoroutinesApi
@Deprecated(message = "Use toFlow instead", replaceWith = ReplaceWith("toFlow()"))
fun <T> ApolloSubscriptionCall<T>.toChannel(capacity: Int = Channel.UNLIMITED): Channel<Response<T>> {
checkCapacity(capacity)
val channel = Channel<Response<T>>(capacity)
channel.invokeOnClose {
cancel()
}
execute(object : ApolloSubscriptionCall.Callback<T> {
override fun onConnected() {
}

override fun onResponse(response: Response<T>) {
runCatching {
channel.offer(response)
}
}

override fun onFailure(e: ApolloException) {
channel.close(e)
}

override fun onCompleted() {
channel.close()
}

override fun onTerminated() {
channel.close()
}
})

return channel
}

/**
* Converts an [ApolloSubscriptionCall] to an [Flow].
*
Expand All @@ -230,7 +119,9 @@ fun <T> ApolloSubscriptionCall<T>.toFlow(): Flow<Response<T>> = callbackFlow {
}

override fun onResponse(response: Response<T>) {
channel.offer(response)
runCatching {
channel.offer(response)
}
}

override fun onFailure(e: ApolloException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import com.apollographql.apollo.api.Input
import com.apollographql.apollo.api.Response
import com.apollographql.apollo.cache.normalized.lru.EvictionPolicy
import com.apollographql.apollo.cache.normalized.lru.LruNormalizedCacheFactory
import com.apollographql.apollo.coroutines.toChannel
import com.apollographql.apollo.coroutines.toDeferred
import com.apollographql.apollo.coroutines.toFlow
import com.apollographql.apollo.coroutines.toJob
Expand Down Expand Up @@ -51,18 +50,6 @@ class CoroutinesApolloTest {
.build()
}

@Test
fun callChannelProducesValue() {
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))

val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).toChannel()
runBlocking {
val response = channel.receive()

assertThat(response.data!!.hero()!!.name()).isEqualTo("R2-D2")
}
}

@Test
fun callDeferredProducesValue() {
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))
Expand All @@ -73,22 +60,6 @@ class CoroutinesApolloTest {
}
}

@Test
fun errorIsTriggered() {
server.enqueue(MockResponse().setResponseCode(200).setBody("nonsense"))

val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).toChannel()
runBlocking {
var exception: java.lang.Exception? = null
try {
channel.receive()
} catch (e: Exception) {
exception = e
}
assertThat(exception is ApolloParseException).isTrue()
}
}

@Test
fun prefetchCompletes() {
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))
Expand All @@ -111,91 +82,6 @@ class CoroutinesApolloTest {
}
}

@Test
fun queryWatcherUpdatedSameQueryDifferentResults() {
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))

val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).watcher().toChannel()

server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_CHANGE))
apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE)))
.responseFetcher(ApolloResponseFetchers.NETWORK_ONLY)
.enqueue(null)

runBlocking {
val response0 = channel.receive()
assertThat(response0.data!!.hero()!!.name()).isEqualTo("R2-D2")

val response1 = channel.receive()
assertThat(response1.data!!.hero()!!.name()).isEqualTo("Artoo")
}
}

@Test
fun queryWatcherNotUpdatedSameQuerySameResults() {
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))

val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).watcher().toChannel()

server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))
apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE)))
.responseFetcher(ApolloResponseFetchers.NETWORK_ONLY)
.enqueue(null)

runBlocking {
val response0 = channel.receive()
assertThat(response0.data!!.hero()!!.name()).isEqualTo("R2-D2")

assertThat(channel.isEmpty).isEqualTo(true)
}
}

@Test
fun queryWatcherUpdatedDifferentQueryDifferentResults() {
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))

val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).watcher().toChannel()

server.enqueue(mockResponse("HeroAndFriendsNameWithIdsNameChange.json"))
apolloClient.query(HeroAndFriendsNamesWithIDsQuery(Input.fromNullable(Episode.NEWHOPE)))
.enqueue(null)

runBlocking {
val response0 = channel.receive()
assertThat(response0.data!!.hero()!!.name()).isEqualTo("R2-D2")

val response1 = channel.receive()
assertThat(response1.data!!.hero()!!.name()).isEqualTo("Artoo")
}
}

@Test
fun queryWatcherUpdatedConflatedOnlyReturnsLastResult() {
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))

val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).watcher().toChannel(Channel.CONFLATED)

server.enqueue(mockResponse("HeroAndFriendsNameWithIdsNameChange.json"))
apolloClient.query(HeroAndFriendsNamesWithIDsQuery(Input.fromNullable(Episode.NEWHOPE)))
.enqueue(null)

runBlocking {
delay(500)
val response1 = channel.receive()
assertThat(response1.data!!.hero()!!.name()).isEqualTo("Artoo")
}
}

@Test
fun queryWatcherCancelClosesChannel() {
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))

val channel = apolloClient.query(EpisodeHeroNameQuery(Input.fromNullable(Episode.EMPIRE))).watcher().toChannel()

channel.cancel()
assertThat(channel.isClosedForReceive).isEqualTo(true)
}

@Test
fun flowCanBeRead() {
server.enqueue(mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID))
Expand Down