
GraphQL TypeSafe Client in Quarkus Smallrye: Retaining WebSocket Connection Failure in Multi, preventing server reconnection on retry #38595

Open
ITrium-Salah opened this issue Feb 5, 2024 · 9 comments

@ITrium-Salah

ITrium-Salah commented Feb 5, 2024

### Describe the bug

When using the Quarkus SmallRye typesafe GraphQL client (`io.quarkus:quarkus-smallrye-graphql-client`), the loss of the WebSocket connection is cached inside the Multi: the client retains the failure, and on every subsequent retry the same failure is re-emitted in the stream. As a result, the client never attempts to reconnect to the server.

### Expected behavior

The client should detect the loss of the WebSocket connection and attempt to reconnect, instead of replaying the cached connection failure when the Multi is retried. The cached failure should be invalidated on retry.

### Actual behavior

  1. Start the GraphQL server with a subscription endpoint.
  2. Start the GraphQL client with a subscription.
  3. The subscription works: messages are received and logged.
  4. Stop the GraphQL server.
  5. The GraphQL client receives a failure (expected).
  6. Each retry subscribes to the Multi again, but the same exception instance is logged over and over (`io.netty.channel.AbstractChannel$AnnotatedConnectException@15462526` in the log below).
  7. Restart the GraphQL server.
  8. Retries still log the same exception instance.
  9. The client never reconnects.
```
2024-02-05 23:01:12,018 ERROR [MASKED] (vert.x-eventloop-thread-3) item:Message 134
2024-02-05 23:01:17,007 ERROR [MASKED] (vert.x-eventloop-thread-3) Item received:Message 135
2024-02-05 23:01:17,007 ERROR [MASKED] (vert.x-eventloop-thread-3) item:Message 135
2024-02-05 23:01:19,419 ERROR [io.ver.cor.net.imp.ConnectionBase] (vert.x-eventloop-thread-3) Connection reset
2024-02-05 23:01:19,421 ERROR [MASKED] (vert.x-eventloop-thread-3) Erreur: java.net.SocketException: Connection reset
2024-02-05 23:01:19,428 ERROR [MASKED] (vert.x-eventloop-thread-3) Exception instance:java.net.SocketException@2006a90c
2024-02-05 23:01:24,458 ERROR [MASKED] (vert.x-eventloop-thread-3) Erreur: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:8081
Caused by: java.net.ConnectException: Connection refused: no further information
2024-02-05 23:01:24,459 ERROR [MASKED] (vert.x-eventloop-thread-3) Exception instance:io.netty.channel.AbstractChannel$AnnotatedConnectException@15462526
2024-02-05 23:01:29,461 ERROR [MASKED] (executor-thread-1) Erreur: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:8081
Caused by: java.net.ConnectException: Connection refused: no further information
2024-02-05 23:01:29,462 ERROR [MASKED] (executor-thread-1) Exception instance:io.netty.channel.AbstractChannel$AnnotatedConnectException@15462526
2024-02-05 23:01:34,465 ERROR [MASKED] (executor-thread-1) Erreur: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:8081
Caused by: java.net.ConnectException: Connection refused: no further information
2024-02-05 23:01:34,466 ERROR [MASKED] (executor-thread-1) Exception instance:io.netty.channel.AbstractChannel$AnnotatedConnectException@15462526
2024-02-05 23:01:39,468 ERROR [MASKED] (executor-thread-1) Erreur: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:8081
Caused by: java.net.ConnectException: Connection refused: no further information
```



### How to Reproduce?

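1. Server part

```java
import io.quarkus.scheduler.Scheduled;
import io.smallrye.graphql.api.Subscription;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.microprofile.graphql.GraphQLApi;

@GraphQLApi
@Slf4j
public class TestSubscription {
    // Hot stream shared by all subscribers
    BroadcastProcessor<String> processor = BroadcastProcessor.create();

    int i = 0;

    // Publish a numbered message every 5 seconds
    @Scheduled(every = "5s")
    public void test() {
        processor.onNext(String.format("Message %s", ++i));
    }

    @Subscription
    public Multi<String> testSouscription() {
        return processor;
    }
}
```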
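2. Client part

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import jakarta.inject.Inject;
import java.time.Duration;

// Members of the client bean (class declaration and @Slf4j logger omitted)
@Inject
TestClientApi clientApi;

private Multi<String> testSub() {
    return clientApi.testSouscription()
            .onItem().invoke(s -> log.error("Item received:{}", s))
            .onFailure().invoke(throwable -> {
                log.error("Erreur", throwable);
                // Print the instance identity to see whether the same exception is replayed
                log.error("Exception instance:{}", throwable.getClass().getName() + "@" + Integer.toHexString(throwable.hashCode()));
            })
            // Wait 5 seconds, then resubscribe through a fresh call to testSub()
            .onFailure().call(() -> Uni.createFrom().voidItem().onItem().delayIt().by(Duration.ofSeconds(5)))
            .onFailure().recoverWithMulti(this::testSub);
}
```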

3. Actions
   1. Start the GraphQL server with a subscription endpoint.
   2. Start the GraphQL client with a subscription.
   3. Stop the GraphQL server.
   4. Restart the GraphQL server after 30 seconds.
### Output of `uname -a` or `ver`

_No response_

### Output of `java -version`

openjdk version 21.0.1

### Quarkus version or git rev

Quarkus 3.5.3

### Build tool (ie. output of `mvnw --version` or `gradlew --version`)

_No response_

### Additional information

I think the problem comes from this class, `io.smallrye.graphql.client.vertx.typesafe.VertxTypesafeGraphQLClientProxy`:

```java
// Excerpt from VertxTypesafeGraphQLClientProxy
private Uni<WebSocketSubprotocolHandler> webSocketHandler() {
    return webSocketHandler.updateAndGet(currentValue -> {
        if (currentValue == null) {
            return Uni.createFrom().<WebSocketSubprotocolHandler> emitter(handlerEmitter -> {
                List<String> subprotocolIds = subprotocols.stream().map(i -> i.getProtocolId()).collect(toList());
                MultiMap headers = HeadersMultiMap.headers()
                        .addAll(new HeaderBuilder(api, null, additionalHeaders).build());
                websocketUrl.get().subscribe().with(wsUrl -> {
                    httpClient.webSocketAbs(wsUrl, headers, WebsocketVersion.V13, subprotocolIds,
                            result -> {
                                if (result.succeeded()) {
                                    WebSocket webSocket = result.result();
                                    WebSocketSubprotocolHandler handler = BuiltinWebsocketSubprotocolHandlers
                                            .createHandlerFor(webSocket.subProtocol(), webSocket,
                                                    subscriptionInitializationTimeout, initPayload, () -> {
                                                        // Reset callback: only registered after a successful connection
                                                        webSocketHandler.set(null);
                                                    });
                                    handlerEmitter.complete(handler);
                                    log.debug("Using websocket subprotocol handler: " + handler);
                                } else {
                                    // The Uni fails here, and webSocketHandler is not reset
                                    handlerEmitter.fail(result.cause());
                                }
                            });
                });
            }).memoize().indefinitely(); // caches the outcome, including a failure
        } else {
            return currentValue;
        }
    });
}
```
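`webSocketHandler` is only reset to `null` through the callback passed to `createHandlerFor`, which is registered only after a WebSocket has been opened successfully. When the connection attempt itself fails, the emitter fails, and because the `Uni` is memoized indefinitely, that failed `Uni` stays in `webSocketHandler`. Every subsequent subscription therefore receives the same failure instead of opening a new connection.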
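The invalidation the report asks for can be sketched in plain Java. This is a minimal illustration, not SmallRye code: `CompletableFuture` stands in for the memoized `Uni`, and `RetryableCache` is a hypothetical name. A successful or in-flight connection attempt is shared, but a failed one is evicted before the failure is propagated, so the next subscriber triggers a fresh attempt instead of receiving the cached exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not part of SmallRye): shares a successful or in-flight
 * connection attempt, but evicts a failed one so the next caller retries.
 */
final class RetryableCache<T> {
    private final AtomicReference<CompletableFuture<T>> cached = new AtomicReference<>();
    private final Supplier<CompletableFuture<T>> connect;

    RetryableCache(Supplier<CompletableFuture<T>> connect) {
        this.connect = connect;
    }

    CompletableFuture<T> get() {
        CompletableFuture<T> candidate = new CompletableFuture<>();
        CompletableFuture<T> existing = cached.compareAndExchange(null, candidate);
        if (existing != null) {
            return existing; // reuse the in-flight or successful attempt
        }
        CompletableFuture<T> attempt;
        try {
            attempt = connect.get();
        } catch (RuntimeException e) {
            attempt = CompletableFuture.failedFuture(e); // treat a synchronous throw as a failed attempt
        }
        attempt.whenComplete((value, error) -> {
            if (error != null) {
                // Evict first, so a retry triggered by this failure starts a new attempt
                cached.compareAndSet(candidate, null);
                candidate.completeExceptionally(error);
            } else {
                candidate.complete(value);
            }
        });
        return candidate;
    }

    /** Counterpart of webSocketHandler.set(null): call when an open connection closes. */
    void invalidate() {
        cached.set(null);
    }
}
```

Evicting with `compareAndSet(candidate, null)` rather than `set(null)` avoids clearing a newer attempt that another caller may already have started.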
ITrium-Salah added the `kind/bug` (Something isn't working) label on Feb 5, 2024

quarkus-bot bot commented Feb 5, 2024

/cc @Ladicek (smallrye), @jmartisk (graphql,smallrye), @phillip-kruger (graphql,smallrye), @radcortez (smallrye)

@jmartisk
Contributor

jmartisk commented Feb 6, 2024

Hi, could you have a look at smallrye/smallrye-graphql#2030 which is my proposed solution? It turns out we

  • wrongly cache the handler when the connection to the target server is refused
  • when the connection can't be established, we drop the exception (typically, Connection refused or something), which I think means the Multi won't get a proper failure callback, and retrying mechanisms won't work

I'd also suggest replacing your retry logic

```java
.onFailure().call(() -> Uni.createFrom().voidItem().onItem().delayIt().by(Duration.ofSeconds(5)))
.onFailure().recoverWithMulti(this::testSub);
```

with something like

```java
.onFailure().retry().withBackOff(Duration.ofSeconds(5)).atMost(MAX_ATTEMPTS)
```

to make sure it is retried more times than just once.
Hopefully the PR solves your problem?!
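For comparison with the fixed 5-second delay in the original code, here is a sketch of the delay schedule a bounded exponential back-off produces. It is a generic illustration under simple assumptions (doubling, a hypothetical 30-second cap, no jitter, and `BackOff` is a hypothetical name); Mutiny's own `withBackOff(...).atMost(...)` may differ in detail, for example by adding jitter.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Generic exponential back-off (hypothetical helper): doubling delays with a cap, no jitter.
final class BackOff {
    private BackOff() {}

    /** Returns the delay to wait before each of the first maxAttempts retries. */
    static List<Duration> delays(Duration initial, Duration max, int maxAttempts) {
        List<Duration> result = new ArrayList<>();
        Duration current = initial;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            result.add(current.compareTo(max) > 0 ? max : current);
            if (current.compareTo(max) < 0) {
                current = current.multipliedBy(2); // stop doubling once the cap is reached
            }
        }
        return result;
    }
}
```

For example, `BackOff.delays(Duration.ofSeconds(5), Duration.ofSeconds(30), 5)` yields 5 s, 10 s, 20 s, 30 s, 30 s: after the fifth retry the attempts stop, which is what `atMost(MAX_ATTEMPTS)` bounds.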

@jmartisk
Contributor

jmartisk commented Feb 6, 2024

One more important thing to note: this retry logic probably won't work if you shut down the server gracefully, because the server then gets a chance to send a proper close status, and the client treats it as a normal shutdown. The Multi therefore completes instead of failing. You have to `kill -9` the server or something like that.

To reconnect when the subscription ends successfully, you'd need something like

```java
.onCompletion().switchTo(this::testSub)
```
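The difference between retrying on failure and also reconnecting on completion can be modeled with a simplified, blocking sketch. `ReconnectLoop`, `Outcome`, and `subscribeOnce` are hypothetical names, and real Mutiny streams are non-blocking; the point is only that a failure-only policy stops after a graceful close, while reconnecting on completion as well keeps the client connected.

```java
import java.util.function.Supplier;

// Simplified, blocking model (hypothetical names): subscribeOnce stands in for one
// subscription session and reports how it ended.
final class ReconnectLoop {
    enum Outcome { COMPLETED, FAILED }

    private ReconnectLoop() {}

    /**
     * Runs sessions until one ends in a way the policy does not reconnect on,
     * or until maxSessions sessions have been started. Returns the session count.
     */
    static int sessions(Supplier<Outcome> subscribeOnce, boolean reconnectOnCompletion, int maxSessions) {
        int sessions = 0;
        while (sessions < maxSessions) {
            sessions++;
            Outcome outcome = subscribeOnce.get();
            // Failures always trigger a reconnect (the role of onFailure().retry());
            // a graceful close only does when reconnectOnCompletion is set
            // (the role of onCompletion().switchTo(...)).
            boolean reconnect = outcome == Outcome.FAILED || reconnectOnCompletion;
            if (!reconnect) {
                break;
            }
        }
        return sessions;
    }
}
```

With sessions ending FAILED, COMPLETED, FAILED and a limit of 3, the failure-only policy (`reconnectOnCompletion = false`) stops after 2 sessions, while `reconnectOnCompletion = true` runs all 3.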

@ITrium-Salah
Author

ITrium-Salah commented Feb 6, 2024


I can't use `withBackOff` because it is not allowed in combination with `until`. Is there a proper way to retry every X seconds until a predicate is true? I use the delay plus `recoverWithMulti` so that I can switch to a different Multi if I need to stop the connection process. By the way, the code in my example retries every 5 seconds (not just once).

@ITrium-Salah
Author


Thank you, I will take this into account.

@ITrium-Salah
Author

@jmartisk How can I try the code from the pull request? I use Maven.

@jmartisk
Contributor

jmartisk commented Feb 6, 2024

> How can I try the code from the pull request? I use Maven.

Replied in the PR.

> Do you have a proper way to retry every X seconds until a predicate is true?

Good question. I'm not really sure why delay + until is forbidden and how to implement it (you could theoretically just implement an until predicate that contains a sleep, but that would block the thread), maybe @jponge can advise?

@ITrium-Salah
Author


This solution works for me: `Uni.createFrom().voidItem().onItem().delayIt().by(Duration.ofSeconds(5)).onItem().transformToMulti(....).onFailure().retry().until(predicate)`. But it would be cleaner if `retry().withBackOff().until()` were allowed.
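The workaround above can be sketched without Mutiny. This is a plain-JDK analogue, not the Mutiny API (a swapped-in technique: `CompletableFuture` plus a `ScheduledExecutorService`; `DelayedRetry` and `retryEvery` are hypothetical names): each attempt is "wait, then connect", and any failure accepted by the predicate re-schedules the whole sequence. Because the wait is scheduled rather than slept, no thread is blocked, which is the concern raised about putting a sleep inside an `until` predicate.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class DelayedRetry {
    private DelayedRetry() {}

    /**
     * Repeats "wait `delay`, then run `attempt`" until an attempt succeeds or
     * `shouldRetry` rejects the failure. The wait is scheduled, not slept.
     */
    static <T> CompletableFuture<T> retryEvery(Supplier<CompletableFuture<T>> attempt,
                                               Duration delay,
                                               Predicate<Throwable> shouldRetry,
                                               ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        schedule(attempt, delay, shouldRetry, scheduler, result);
        return result;
    }

    private static <T> void schedule(Supplier<CompletableFuture<T>> attempt,
                                     Duration delay,
                                     Predicate<Throwable> shouldRetry,
                                     ScheduledExecutorService scheduler,
                                     CompletableFuture<T> result) {
        scheduler.schedule(() -> {
            CompletableFuture<T> current;
            try {
                current = attempt.get();
            } catch (RuntimeException e) {
                current = CompletableFuture.failedFuture(e); // synchronous throw counts as a failure
            }
            current.whenComplete((value, error) -> {
                if (error == null) {
                    result.complete(value);
                } else if (shouldRetry.test(error)) {
                    // Start over: wait again, then make a fresh attempt
                    schedule(attempt, delay, shouldRetry, scheduler, result);
                } else {
                    result.completeExceptionally(error);
                }
            });
        }, delay.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```

The predicate receives whatever the attempt failed with; if the attempt's future is derived through stages such as `thenApply`, that may be a `CompletionException` wrapping the original cause.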

@jponge
Member

jponge commented Feb 7, 2024

It's true we can't combine until with exponential backoff constructs, but I can't recall why we don't support this, possibly a scheduler mismatch of some kind. I've opened smallrye/smallrye-mutiny#1510 to remind us to revisit the assumptions and perhaps add this feature.
