Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ApolloWebSocketClosedException on darwin targets and update docs #6275

Merged
merged 3 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 84 additions & 2 deletions docs/source/advanced/experimental-websockets.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,94 @@ The `com.apollographql.apollo.network.websocket` implementation provides the fol

## Status

While they are `@ApolloExperimental`, we believe the new `.websocket` APIS to be more robust than the non-experimental `.ws` ones. They are safe to use in non-lib use cases.
`.websocket` APIS to be more robust than the non-experimental `.ws` ones, especially in scenarios involving retries/connection errors. They are safe to use in non-lib use cases.
martinbonnin marked this conversation as resolved.
Show resolved Hide resolved

The "experimental" tag is to account for required API breaking changes based on community feedback. Ideally no change will be needed.
The `@ApolloExperimental` annotation accounts for required API breaking changes based on community feedback. Ideally no change will be needed.

After a feedback phase, the current `.ws` APIs will become deprecated and the `.websocket` one promoted to stable by removing the `@ApolloExperimental` annotations.

## Handling errors


### Changing the WebSocket HTTP header on Error

The HTTP headers of `ApolloCall` are honored and different WebSockets are create for different header values. This means you can use an interceptor to change the header value on error:
martinbonnin marked this conversation as resolved.
Show resolved Hide resolved

```kotlin
private class UpdateAuthorizationHeaderInterceptor : ApolloInterceptor {
@OptIn(ExperimentalCoroutinesApi::class)
override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
return flow {
// Retrieve a new access token every time
val request = request.newBuilder()
.addHttpHeader("Authorization", "Bearer ${accessToken()}")
.build()

emitAll(chain.proceed(request))
}
}
}
```

### Changing the connectionPayload on Error

The connection payload is `WsProtocol` specific and you can pass a lambda to your `WsProtocol` constructor that is evaluated every time a WebSocket needs to be created.

```kotlin
ApolloClient.Builder()
.httpServerUrl(mockServer.url())
.subscriptionNetworkTransport(
WebSocketNetworkTransport.Builder()
.serverUrl(mockServer.url())
.wsProtocol(GraphQLWsProtocol(
connectionPayload = {
getFreshConnectionPayload()
}
))
.build()
)
.build()
```

### retryOnErrorInterceptor

By default, `ApolloClient` does not retry. You can override that behaviour with `retryOnErrorInterceptor`. You can combine that interceptor with the [`UpdateAuthorizationHeaderInterceptor`](#changing-the-websocket-http-header-on-error):

```kotlin
private object RetryException : Exception()

private class RetryOnErrorInterceptor : ApolloInterceptor {
override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
var attempt = 0
return flow {
val request = request.newBuilder()
.addHttpHeader("Authorization", "Bearer ${accessToken()}")
.build()

emitAll(chain.proceed(request))
}.onEach {
when (val exception = it.exception) {
is ApolloWebSocketClosedException -> {
if (exception.code == 1002 && exception.reason == "unauthorized") {
attempt = 0 // retry immediately
throw RetryException
}
}
is ApolloNetworkException -> {
// Retry all network exceptions
throw RetryException
}
else -> {
// Terminate the subscription
}
}
}.retryWhen { cause, _ ->
cause is RetryException
}
}
}
```

## Migration guide

### Package name
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package com.apollographql.apollo.network.websocket

import com.apollographql.apollo.api.http.HttpHeader
import com.apollographql.apollo.exception.ApolloException
import com.apollographql.apollo.exception.ApolloNetworkException
import com.apollographql.apollo.exception.ApolloWebSocketClosedException
import com.apollographql.apollo.exception.DefaultApolloException
import com.apollographql.apollo.network.toNSData
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.locks.reentrantLock
import kotlinx.atomicfu.locks.withLock
import kotlinx.cinterop.convert
import okio.ByteString.Companion.toByteString
import okio.internal.commonToUtf8String
import platform.Foundation.NSData
import platform.Foundation.NSMutableURLRequest
import platform.Foundation.NSOperationQueue
Expand All @@ -23,6 +27,7 @@ import platform.Foundation.NSURLSessionWebSocketTask
import platform.Foundation.setHTTPMethod
import platform.Foundation.setValue
import platform.darwin.NSObject
import platform.posix.ENOTCONN

internal class AppleWebSocketEngine : WebSocketEngine {
private val delegate = Delegate()
Expand Down Expand Up @@ -126,7 +131,13 @@ internal class AppleWebSocket(
nsurlSessionWebSocketTask.receiveMessageWithCompletionHandler { message, nsError ->
if (nsError != null) {
if (disposed.compareAndSet(expect = false, update = true)) {
listener.onError(DefaultApolloException("Error reading websocket: ${nsError.localizedDescription}"))
val exception = if (nsError.domain == "NSPOSIXErrorDomain" && nsError.code.toLong() == ENOTCONN.toLong()) {
ApolloWebSocketClosedException(nsurlSessionWebSocketTask.closeCode.convert(), nsurlSessionWebSocketTask.closeReason?.toByteString()
?.toByteArray()?.commonToUtf8String())
} else {
ApolloNetworkException("Error reading websocket: ${nsError.localizedDescription}", platformCause = nsError)
}
listener.onError(exception)
}
} else if (message != null) {
when (message.type) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package test

import com.apollographql.apollo.annotations.ApolloDeprecatedSince
import com.apollographql.apollo.annotations.ApolloExperimental
import com.apollographql.apollo.api.Adapter
import com.apollographql.apollo.api.CompiledField
import com.apollographql.apollo.api.CustomScalarAdapters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package test.network

import app.cash.turbine.test
import com.apollographql.apollo.ApolloClient
import com.apollographql.apollo.annotations.ApolloExperimental
import com.apollographql.apollo.api.ApolloRequest
import com.apollographql.apollo.api.ApolloResponse
import com.apollographql.apollo.api.Operation
Expand All @@ -13,13 +12,10 @@ import com.apollographql.apollo.exception.DefaultApolloException
import com.apollographql.apollo.exception.SubscriptionOperationException
import com.apollographql.apollo.interceptor.ApolloInterceptor
import com.apollographql.apollo.interceptor.ApolloInterceptorChain
import com.apollographql.apollo.interceptor.RetryOnErrorInterceptor
import com.apollographql.apollo.network.websocket.GraphQLWsProtocol
import com.apollographql.apollo.network.websocket.WebSocketNetworkTransport
import com.apollographql.apollo.network.websocket.closeConnection
import test.FooSubscription
import test.FooSubscription.Companion.completeMessage
import test.FooSubscription.Companion.errorMessage
import test.FooSubscription.Companion.nextMessage
import test.network.connectionAckMessage
import com.apollographql.apollo.testing.internal.runTest
import com.apollographql.mockserver.CloseFrame
import com.apollographql.mockserver.MockServer
Expand All @@ -33,12 +29,16 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.retryWhen
import okio.use
import test.FooSubscription
import test.FooSubscription.Companion.completeMessage
import test.FooSubscription.Companion.errorMessage
import test.FooSubscription.Companion.nextMessage
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertIs
Expand Down Expand Up @@ -228,19 +228,9 @@ class WebSocketNetworkTransportTest {
webSocketBody.enqueueMessage(CloseFrame(3666, "closed"))

awaitItem().exception.apply {
@Suppress("DEPRECATION")
when (com.apollographql.apollo.testing.platform()) {
com.apollographql.apollo.testing.Platform.Native -> {
assertIs<DefaultApolloException>(this)
assertTrue(message?.contains("Error reading websocket") == true)
}

else -> {
assertIs<ApolloWebSocketClosedException>(this)
assertEquals(3666, code)
assertEquals("closed", reason)
}
}
assertIs<ApolloWebSocketClosedException>(this)
assertEquals(3666, code)
assertEquals("closed", reason)
}

awaitComplete()
Expand Down Expand Up @@ -291,19 +281,9 @@ class WebSocketNetworkTransportTest {
awaitItem()
serverWriter.enqueueMessage(CloseFrame(1001, "flowThrowsIfNoReconnect"))
awaitItem().exception.apply {
@Suppress("DEPRECATION")
when (com.apollographql.apollo.testing.platform()) {
com.apollographql.apollo.testing.Platform.Native -> {
assertIs<DefaultApolloException>(this)
assertTrue(message?.contains("Error reading websocket") == true)
}

else -> {
assertIs<ApolloWebSocketClosedException>(this)
assertEquals(1001, code)
assertEquals("flowThrowsIfNoReconnect", reason)
}
}
assertIs<ApolloWebSocketClosedException>(this)
assertEquals(1001, code)
assertEquals("flowThrowsIfNoReconnect", reason)
}
awaitComplete()
}
Expand Down Expand Up @@ -376,16 +356,24 @@ class WebSocketNetworkTransportTest {
}

@Test
fun canChangeHeadersInInterceptor() = runTest {
fun canChangeHeadersAndConnectionPayloadOnError() = runTest {
MockServer().use { mockServer ->
var connectionPayload = "init0"
ApolloClient.Builder()
.httpServerUrl(mockServer.url())
.subscriptionNetworkTransport(
WebSocketNetworkTransport.Builder()
.serverUrl(mockServer.url())
.wsProtocol(GraphQLWsProtocol(
connectionPayload = {
connectionPayload.also {
connectionPayload = "init1"
}
}
))
.build()
)
.addInterceptor(MyInterceptor())
.retryOnErrorInterceptor(UpdateAuthorizationHeaderOnError())
.build()
.use { apolloClient ->
val serverWriter0 = mockServer.enqueueWebSocket()
Expand All @@ -394,7 +382,11 @@ class WebSocketNetworkTransportTest {
.test(timeout = 300.seconds) {
val serverReader0 = mockServer.awaitWebSocketRequest()
assertEquals("0", serverReader0.headers.headerValueOf("authorization"))
serverReader0.awaitMessage()
serverReader0.awaitMessage().apply {
assertIs<TextMessage>(this)
assertEquals("{\"type\":\"connection_init\",\"payload\":\"init0\"}", this.text)
}

serverWriter0.enqueueMessage(connectionAckMessage())
val id0 = serverReader0.awaitSubscribe()
serverWriter0.enqueueMessage(nextMessage(id0, 0))
Expand All @@ -407,11 +399,14 @@ class WebSocketNetworkTransportTest {
val serverWriter1 = mockServer.enqueueWebSocket()

// Send an error to the interceptor, should retry the chain under the hood
serverWriter0.enqueueMessage(nextMessage(id0, "unauthorized"))
serverWriter0.enqueueMessage(CloseFrame(1002, "unauthorized"))

val serverReader1 = mockServer.awaitWebSocketRequest()
assertEquals("1", serverReader1.headers.headerValueOf("authorization"))
serverReader1.awaitMessage()
serverReader1.awaitMessage().apply {
assertIs<TextMessage>(this)
assertEquals("{\"type\":\"connection_init\",\"payload\":\"init1\"}", this.text)
}
serverWriter1.enqueueMessage(connectionAckMessage())
val id1 = serverReader1.awaitSubscribe()
serverWriter1.enqueueMessage(nextMessage(id1, 1))
Expand Down Expand Up @@ -459,23 +454,28 @@ class WebSocketNetworkTransportTest {

private object RetryException : Exception()

private class MyInterceptor : ApolloInterceptor {
private class UpdateAuthorizationHeaderOnError : ApolloInterceptor {
@OptIn(ExperimentalCoroutinesApi::class)
override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
var counter = -1

return flow {
counter++
emit(
request.newBuilder()
val request = request.newBuilder()
.addHttpHeader("Authorization", counter.toString())
.build()
)
}.flatMapConcat {
chain.proceed(it)

emitAll(chain.proceed(request))
}.onEach {
if (it.errors.orEmpty().isNotEmpty()) {
throw RetryException
when (val exception = it.exception) {
is ApolloWebSocketClosedException -> {
if (exception.code == 1002 && exception.reason == "unauthorized") {
throw RetryException
}
}
else -> {
// Terminate the subscription if the exception is terminal or retry
}
}
}.retryWhen { cause, _ ->
cause is RetryException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import test.network.connectionAckMessage
import com.apollographql.apollo.testing.internal.runTest
import com.apollographql.mockserver.MockResponse
import com.apollographql.mockserver.MockServer
import com.apollographql.mockserver.assertNoRequest
import com.apollographql.mockserver.awaitWebSocketRequest
import com.apollographql.mockserver.enqueueWebSocket
import kotlinx.coroutines.CoroutineScope
Expand All @@ -22,7 +21,6 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.retryWhen
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
Expand Down
Loading