WebSockets: better separate common code and protocol-specific code #3405

Merged
merged 5 commits into from
Oct 13, 2021

martinbonnin
Contributor

Follow up from #3401

WebSocketNetworkTransport now takes a WsProtocol.Factory instead of a WsProtocol.

WebSocketNetworkTransport handles opening and closing the websocket and creates a new WsProtocol for each new websocket. The WsProtocol is responsible for actually sending and receiving messages:

  • receives commands through WsProtocol.startOperation and WsProtocol.stopOperation
  • notifies WebSocketNetworkTransport with listener.operationResponse, listener.operationError, listener.operationComplete and listener.networkError
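
For readers skimming the thread, the split described above can be sketched roughly as follows. This is a simplified illustration, not the actual Apollo Kotlin code: `Protocol`, `FakeConnection`, `SimpleProtocol` and the message shapes are hypothetical stand-ins, and only the listener callback names come from the description above.

```kotlin
// Hypothetical, simplified shapes; NOT the real Apollo Kotlin classes.
interface Listener {
    fun operationResponse(id: String, payload: String)
    fun operationError(id: String, payload: String)
    fun operationComplete(id: String)
    fun networkError(cause: Throwable)
}

// Stand-in for an open websocket: it only records what was sent.
class FakeConnection {
    val sent = mutableListOf<String>()
    fun send(text: String) { sent += text }
}

abstract class Protocol(
    protected val connection: FakeConnection,
    protected val listener: Listener,
) {
    abstract fun startOperation(id: String)
    abstract fun stopOperation(id: String)
    abstract fun handleServerMessage(type: String, id: String, payload: String)

    // The transport holds a Factory and calls create() once per new websocket,
    // so no protocol state leaks from one connection to the next.
    fun interface Factory {
        fun create(connection: FakeConnection, listener: Listener): Protocol
    }
}

// A toy protocol: commands become outgoing messages, server messages become
// listener callbacks. The message shapes are made up for illustration.
class SimpleProtocol(
    connection: FakeConnection,
    listener: Listener,
) : Protocol(connection, listener) {
    override fun startOperation(id: String) {
        connection.send("""{"type":"subscribe","id":"$id"}""")
    }

    override fun stopOperation(id: String) {
        connection.send("""{"type":"complete","id":"$id"}""")
    }

    override fun handleServerMessage(type: String, id: String, payload: String) {
        when (type) {
            "next" -> listener.operationResponse(id, payload)
            "error" -> listener.operationError(id, payload)
            "complete" -> listener.operationComplete(id)
            else -> listener.networkError(IllegalStateException("Unknown message type: $type"))
        }
    }
}
```

The point of the factory is that the transport never reuses a protocol instance across reconnections; it asks the factory for a fresh one each time.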

Flow control:

This change makes all buffers UNLIMITED so as to avoid any deadlock. This means that there is no flow control. If the client writes messages faster than the server can handle them, OkHttp will throw when trying to write more than 16 MB, and native will most likely OOM. I think that's OK because messages are very small, and if it ever happens, it will be the manifestation of another bug somewhere else that needs to be fixed.
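
To make the trade-off concrete, here is a small sketch using plain `java.util.concurrent` queues rather than the buffers this PR actually touches (so the types and helper names below are illustrative assumptions only). A bounded buffer pushes back on the producer once it is full, while an unbounded one always accepts and lets memory grow instead.

```kotlin
import java.util.Queue
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.LinkedBlockingQueue

// Illustrative helpers, not part of Apollo Kotlin.
fun boundedQueue(capacity: Int): Queue<String> = ArrayBlockingQueue<String>(capacity)

// LinkedBlockingQueue() has a capacity of Int.MAX_VALUE, i.e. effectively unbounded.
fun unboundedQueue(): Queue<String> = LinkedBlockingQueue<String>()

// Enqueues `count` messages without blocking and returns how many were accepted.
// With a bounded queue, offer() returns false once the queue is full; the blocking
// variant, put(), would suspend the producer instead, which is where a deadlock can
// come from if the consumer is waiting on that same producer.
fun enqueueAll(queue: Queue<String>, count: Int): Int {
    var accepted = 0
    repeat(count) { i ->
        if (queue.offer("message-$i")) accepted++
    }
    return accepted
}
```

With nobody draining the queue, `enqueueAll(boundedQueue(2), 5)` accepts only two messages, whereas `enqueueAll(unboundedQueue(), 5)` accepts all five.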

Graphql-ws "ping":

Because "ping" is a graphql-ws specific message, sending "ping" messages has been moved to GraphQLWsProtocol. It is now controlled with pingIntervalMillis (disabled by default):

    val apolloClient = ApolloClient(
        networkTransport = WebSocketNetworkTransport(
            serverUrl = "http://localhost:9090/graphql",
            protocolFactory = GraphQLWsProtocol.Factory(
                pingIntervalMillis = 60_000
            ),
        )
    )

@dchappelle let me know how that works for you. Also, I'm curious what server you're using. I've been trying to test with https://github.com/martinbonnin/graphql-ws-server and the server responds to any "ping" message with

WebSocket Closed code='4400' reason='Invalid message received'

@martinbonnin changed the title from "WebScokets: better separate common code and protocol-specific code" to "WebSockets: better separate common code and protocol-specific code" on Oct 12, 2021
override fun parseMessage(message: String, webSocketConnection: WebSocketConnection): WsServerMessage {
val map = AnyAdapter.fromJson(BufferedSourceJsonReader(Buffer().writeUtf8(message))) as Map<String, Any?>
/**
* A factory that for [WsProtocol]
Suggested change
- * A factory that for [WsProtocol]
+ * A factory for [WsProtocol]

@dchappelle
Contributor

dchappelle commented Oct 12, 2021

@martinbonnin I'm testing the changes but cannot establish a websocket connection now... "Connection initialization timeout"

error = {ApolloNetworkException@18872} "com.apollographql.apollo3.exception.ApolloNetworkException: Network error while executing RegisterEndpoint"
 platformCause = {ApolloWebSocketClosedException@18880} "com.apollographql.apollo3.exception.ApolloWebSocketClosedException: WebSocket Closed code='4408' reason='Connection initialisation timeout.'"
 backtrace = {Object[20]@18881} 
 cause = {ApolloWebSocketClosedException@18880} "com.apollographql.apollo3.exception.ApolloWebSocketClosedException: WebSocket Closed code='4408' reason='Connection initialisation timeout.'"
 detailMessage = "Network error while executing RegisterEndpoint"
 stackTrace = {StackTraceElement[19]@18886} 
  0 = {StackTraceElement@18914} "com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$$inlined$map$1$2.emit(Collect.kt:145)"
  1 = {StackTraceElement@18915} "kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:15)"
  2 = {StackTraceElement@18916} "kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:15)"
  3 = {StackTraceElement@18917} "kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:77)"
  4 = {StackTraceElement@18918} "kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:59)"
  5 = {StackTraceElement@18919} "com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invokeSuspend(WebSocketNetworkTransport.kt:172)"
  6 = {StackTraceElement@18920} "com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invoke(Unknown Source:11)"
  7 = {StackTraceElement@18921} "com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invoke(Unknown Source:6)"
  8 = {StackTraceElement@18922} "kotlinx.coroutines.flow.FlowKt__LimitKt$transformWhile$1$invokeSuspend$$inlined$collectWhile$1.emit(Limit.kt:144)"
  9 = {StackTraceElement@18923} "com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$$inlined$filter$1$2.emit(Collect.kt:137)"
  10 = {StackTraceElement@18924} "kotlinx.coroutines.flow.SubscribedFlowCollector.emit(Unknown Source:2)"
  11 = {StackTraceElement@18925} "kotlinx.coroutines.flow.SharedFlowImpl.collect(SharedFlow.kt:351)"
  12 = {StackTraceElement@18926} "kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(Unknown Source:15)"
  13 = {StackTraceElement@18927} "kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)"
  14 = {StackTraceElement@18928} "kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)"
  15 = {StackTraceElement@18929} "kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)"
  16 = {StackTraceElement@18930} "kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)"
  17 = {StackTraceElement@18931} "kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)"
  18 = {StackTraceElement@18932} "kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)"
 suppressedExceptions = {Collections$EmptyList@18884}  size = 0
 shadow$_klass_ = {Class@18631} "class com.apollographql.apollo3.exception.ApolloNetworkException"
 shadow$_monitor_ = 0

I noticed a couple of things:

  • connectionAcknowledgeTimeoutMs defaults to -1 if a value is not provided, probably should default to 10_000
  • The client no longer knows if the pong was not received

@martinbonnin
Contributor Author

@dchappelle

WebSocket Closed code='4408' reason='Connection initialisation timeout.'

Damn, this means that connection_init is not received which is super weird. Can you enable logs in OkHttpWebSocketEngine to see if it's sent correctly?

connectionAcknowledgeTimeoutMs defaults to -1 if a value is not provided, probably should default to 10_000

Indeed, fixed.

The client no longer knows if the pong was not received

Is that important ultimately? What would you do if the pong is not received?

@dchappelle
Contributor

@martinbonnin the connection issue is because the message was being sent out as binary instead of string... but I don't see WsFrameType.TEXT anymore?

@martinbonnin
Contributor Author

Aaaah got it, thanks! I'll make it configurable again.

@martinbonnin
Contributor Author

Alright, it should be configurable again, sorry about that. I also took the opportunity to default to "Text", since that is the format of the server messages from https://github.com/martinbonnin/graphql-ws-server, so we might as well be symmetrical by default.
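
As a rough illustration of the text-versus-binary distinction discussed above: the same JSON payload can go out either as a text frame or as a binary frame carrying its UTF-8 bytes, and a server that only parses text frames will not see the latter as a valid message. The names below (`FrameKind`, `OutgoingFrame`, `encodeMessage`) are hypothetical and do not correspond to the actual configuration option in this PR.

```kotlin
// Hypothetical names, for illustration only.
enum class FrameKind { Text, Binary }

sealed interface OutgoingFrame
data class TextFrame(val text: String) : OutgoingFrame
class BinaryFrame(val bytes: ByteArray) : OutgoingFrame

// Wraps a JSON message in the requested frame kind, defaulting to text.
fun encodeMessage(json: String, kind: FrameKind = FrameKind.Text): OutgoingFrame =
    when (kind) {
        FrameKind.Text -> TextFrame(json)
        // Binary frames carry the UTF-8 bytes of the same JSON document.
        FrameKind.Binary -> BinaryFrame(json.encodeToByteArray())
    }
```

Both frames contain the same bytes on the wire apart from the websocket opcode, which is why the failure showed up as a timeout on the server side rather than as a parsing error on the client.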

Let me know what to do with "pong" failures. If required, we can tunnel an error to the subscriptions.

@dchappelle
Contributor

Is that important ultimately? What would you do if the pong is not received?

Currently, we don't do anything if the Pong is not received from the server, nor do we send any payload. However, this could change if we want to report the backend is not responding or we want to send/receive latency metrics in the payload... which could be different for each Ping/Pong.
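
A minimal sketch of what such tracking could look like, assuming a single outstanding ping at a time. `PingTracker` and its members are hypothetical and not part of Apollo Kotlin; the clock is injected so the logic can be exercised without real delays.

```kotlin
// Hypothetical helper, not part of Apollo Kotlin. Not thread-safe.
class PingTracker(
    private val pongTimeoutMillis: Long,
    private val now: () -> Long = { System.currentTimeMillis() },
) {
    // Timestamp of the ping that has not been answered yet, if any.
    private var pendingPingSentAt: Long? = null

    // Round-trip time of the most recent answered ping, or null if none yet.
    var lastLatencyMillis: Long? = null
        private set

    fun onPingSent() {
        pendingPingSentAt = now()
    }

    fun onPongReceived() {
        // Ignore pongs that do not answer a pending ping.
        val sentAt = pendingPingSentAt ?: return
        lastLatencyMillis = now() - sentAt
        pendingPingSentAt = null
    }

    // True when a ping has been waiting for its pong longer than the timeout.
    fun isPongOverdue(): Boolean {
        val sentAt = pendingPingSentAt ?: return false
        return now() - sentAt > pongTimeoutMillis
    }
}
```

A protocol implementation could call `onPingSent()` and `onPongReceived()` from its send and receive paths and check `isPongOverdue()` before the next ping to decide whether to report the backend as unresponsive.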

@martinbonnin
Contributor Author

I see. Thanks for providing details, that helps a ton 👍

What I'm leaning towards is that, for more advanced use cases, you could implement WsProtocol and have full control of the webSocketConnection from there. We could think of adding helper functions so that the implementation would be pretty small. E.g. something like:

    class CustomWsProtocol(
        webSocketConnection: WebSocketConnection,
        listener: Listener,
    ) : WsProtocol(webSocketConnection, listener) {

      override suspend fun connectionInit() {
        GraphQLWsProtocol.connectionInit(connectionPayload, timeout, webSocketConnection)
      }

      override suspend fun startOperation() {
        GraphQLWsProtocol.startOperation(request, webSocketConnection)
      }

      override suspend fun stopOperation() {
        GraphQLWsProtocol.stopOperation(request, webSocketConnection)
      }

      override fun run(scope: CoroutineScope) {
        scope.launch {
          // Send ping here
        }
        super.run(scope)
      }

      override fun handleServerMessage(messageMap: Map<String, Any?>) {
        GraphQLWsProtocol.handleServerMessage(messageMap)

        // Handle pong here
      }
    }

Ultimately, it gives full control over the websocket without adding tons of constructor parameters to GraphQLWsProtocol. The alternative is opening the GraphQLWsProtocol class so that you could inherit and re-implement the run method but that feels harder to maintain in the long run.

@dchappelle
Contributor

Everything is working for me with your latest changes, thanks!
