DefaultEndpoint future listener recycle lose command context on requeue failures #734

Closed
gszpak opened this issue Mar 17, 2018 · 6 comments
Labels
type: bug A general bug
Milestone
Lettuce 5.0.3

Comments

@gszpak
Contributor

gszpak commented Mar 17, 2018

Hi Mark, you probably already hate me, but I have another issue for you. :)

I'm using snapshots of version 5.0.3.

I noticed that when the connection is closed while commands are being written to the channel, some of these commands are not completed exceptionally.

I wrote a simple piece of code to reproduce it:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;

public class LettuceBlockingClose {

    public static <T> CompletionStage<List<T>> join(List<CompletionStage<T>> stages) {
        CompletionStage<List<T>> current = CompletableFuture.completedFuture(new ArrayList<T>());
        for (int i = 0; i < stages.size(); ++i) {
            current = current.thenCombine(stages.get(i), (List<T> acc, T val) -> {
                acc.add(val);
                return acc;
            });
        }
        return current;
    }

    private static CompletionStage<List<Map<String, String>>> sendCommands(
            RedisAsyncCommands<String, String> commands, int numOfCommands) {
        commands.setAutoFlushCommands(false);
        List<CompletionStage<Map<String, String>>> responseFutures = new ArrayList<>(numOfCommands);
        for (int i = 0; i < numOfCommands; ++i) {
            responseFutures.add(commands.hgetall(String.format("key-%d", i)));
        }
        commands.flushCommands();
        commands.setAutoFlushCommands(true);
        return join(responseFutures);
    }

    public static void main(String[] args) throws Throwable {
        String host = args[0];
        int port = Integer.valueOf(args[1]);
        int numOfCommands = Integer.valueOf(args[2]);

        RedisURI redisURI = RedisURI.Builder.redis(host, port).build();
        RedisClient client = RedisClient.create(redisURI);
        StatefulRedisConnection<String, String> connection = client.connect();
        RedisAsyncCommands<String, String> commands = connection.async();

        CompletionStage<List<Map<String, String>>> commandsStage = sendCommands(commands, numOfCommands);
        System.out.println("Commands sent");
        connection.close();
        commandsStage.toCompletableFuture().get();
        System.out.println("Responses received");

        client.shutdown();
    }
}

When I execute it with e.g. 100,000 commands, it freezes because some commands are never completed. The interesting part is that the number of commands completed exceptionally is always the same (e.g., 16384 for 100k).
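
As a side note, bounding the wait turns the silent freeze into an observable timeout; a sketch using plain java.util.concurrent, with an arbitrary 30-second timeout and an illustrative helper class:

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWait {

    // Wait with a timeout instead of blocking forever, so a hang like the one
    // above surfaces as a TimeoutException that can be logged and investigated.
    static void awaitOrReport(CompletionStage<List<Map<String, String>>> commandsStage) throws Exception {
        try {
            commandsStage.toCompletableFuture().get(30, TimeUnit.SECONDS);
            System.out.println("Responses received");
        } catch (TimeoutException e) {
            System.out.println("Timed out: some commands were never completed");
        }
    }
}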

Here is my guess about what happens: when CommandHandler.channelInactive is called, not all written commands have been added to the stack yet, i.e., not all AddToStack listeners have been processed. So, during this call: https://github.com/lettuce-io/lettuce-core/blob/5ef6722c2cf1e56edd328ffebf57e791f3ca1350/src/main/java/io/lettuce/core/protocol/CommandHandler.java#L303 some commands are not on the stack and remain uncompleted.

I'm not sure how to fix it yet - I'll try to create a PR during the weekend. Any suggestions are welcome.

Thanks!

@mp911de mp911de added the type: bug A general bug label Mar 17, 2018
@mp911de
Collaborator

mp911de commented Mar 17, 2018

Thanks a lot for reporting the issue.

This error happens because of the recently introduced pooling of future callbacks. What happens here is that we resubmit commands to the endpoint.

The endpoint is already closed, which makes the command submission fail. The exception is caught, but we lose the commands because the future callback was already recycled, so command completion runs into a NullPointerException.

The fix is quite simple: we need to retain the sent command(s) and use these to perform the completion.
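
A minimal sketch of that idea (illustrative names, not the actual DefaultEndpoint code): the write listener captures the commands it was registered for and, if resubmitting them fails, completes exactly those commands instead of relying on shared state that may already have been recycled. requeue(...) is a hypothetical placeholder.

import java.util.List;

import io.lettuce.core.protocol.RedisCommand;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;

class RetainingWriteListener implements ChannelFutureListener {

    // Commands captured at registration time, so they survive listener recycling.
    private final List<RedisCommand<?, ?, ?>> sentCommands;

    RetainingWriteListener(List<RedisCommand<?, ?, ?>> sentCommands) {
        this.sentCommands = sentCommands;
    }

    @Override
    public void operationComplete(ChannelFuture future) {
        if (future.isSuccess()) {
            return;
        }
        try {
            requeue(sentCommands); // hypothetical resubmission to the endpoint
        } catch (Exception e) {
            // Complete exactly the commands this listener was registered for,
            // instead of reading recycled state (which caused the NPE).
            for (RedisCommand<?, ?, ?> command : sentCommands) {
                command.completeExceptionally(e);
            }
        }
    }

    private void requeue(List<RedisCommand<?, ?, ?>> commands) {
        // Placeholder for the endpoint's requeue logic; it throws when the endpoint
        // is already closed or queue bounds are exceeded.
        throw new UnsupportedOperationException("illustrative only");
    }
}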

@mp911de mp911de changed the title from "Some sent commands are not completed exceptionally on connection.close()" to "DefaultEndpoint future listener recycle lose command context on requeue failures" Mar 17, 2018
@mp911de mp911de added this to the Lettuce 5.0.3 milestone Mar 17, 2018
mp911de added a commit that referenced this issue Mar 17, 2018
Lettuce now retains the actual sent commands during exceptional completion. Failures on requeue can occur if the connection is closed or queue bounds are exceeded. Previously, we lost command context because the listener was recycled, hence command completion ran into NullPointerExceptions.
mp911de added a commit that referenced this issue Mar 17, 2018
Lettuce now retains the actual sent commands during exceptional completion. Failures on requeue can occur if the connection is closed or queue bounds are exceeded. Previously, we lost command context because the listener was recycled, hence command completion ran into NullPointerExceptions.
@mp911de
Collaborator

mp911de commented Mar 17, 2018

That's fixed now; snapshot builds are available.

@mp911de mp911de closed this as completed Mar 17, 2018
@gszpak
Contributor Author

gszpak commented Mar 20, 2018

Hi Mark, thanks a lot for fixing that part of code.

I'd like to reopen the issue, because unfortunately we still see it happening and I am not able to reproduce it. One important thing is that we want to control reconnection on our own. Therefore, we have set autoReconnect to false and DisconnectedBehavior to REJECT_COMMANDS, so DefaultEndpoint uses AtMostOnceListener in our case.
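
For reference, that configuration looks roughly like this with the Lettuce 5 ClientOptions API (the host/port and the helper class are illustrative):

import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;

class ManualReconnectConfig {

    static RedisClient createClient() {
        RedisClient client = RedisClient.create(RedisURI.create("redis://localhost:6379"));
        // Disable auto-reconnect and reject commands while disconnected,
        // so reconnection is handled entirely by the application.
        client.setOptions(ClientOptions.builder()
                .autoReconnect(false)
                .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS)
                .build());
        return client;
    }
}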

My hypothesis was the following:

  • commands are written successfully, but the listener that adds them to the stack is not called immediately; instead, the listener invocations are added to the event loop's queue
  • when close is called, CommandHandler's stack does not contain all sent commands, so it cannot cancel them in notifyDrainQueuedCommands.

Actually, after reading the netty code, I think this hypothesis was wrong. As far as I understood the code, CommandHandler.channelInactive will also be called via executor.execute(task), i.e., the task will be added at the end of the event loop's queue. So, it would have to be added before the tasks adding commands to the stack, but that would mean the command was written successfully after the channel had been closed, which is impossible.
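
To make the ordering argument concrete, here is a tiny sketch of the FIFO property being relied on, using a plain single-threaded executor as a stand-in for the channel's event loop (an analogy, not netty code):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class EventLoopOrderingSketch {

    public static void main(String[] args) {
        // A single-threaded executor processes tasks strictly in submission order,
        // like a netty event loop's task queue.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();

        eventLoop.execute(() -> System.out.println("1: add command to stack"));
        eventLoop.execute(() -> System.out.println("2: channelInactive / drain queued commands"));

        // Prints 1 before 2: a channelInactive task submitted after the AddToStack
        // task cannot overtake it, which is why the hypothesis above does not hold.
        eventLoop.shutdown();
    }
}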

@mp911de
Collaborator

mp911de commented Mar 20, 2018

All netty channel interaction that happens from inside netty is single-threaded, meaning there is no multi-threading aspect when working inside a ChannelHandler. Code outside of that (e.g., DefaultEndpoint calling Channel.write(…)) is multi-threaded.

ChannelFuture listeners are called immediately as soon as the actual write is finished (see also ChannelOutboundBuffer.remove(…)). From that perspective, no commands get added to the stack after a disconnect.

From a disconnect perspective, channelInactive(…) happens before write ChannelPromises are failed.

I think At-Most-Once mode makes a difference here. This ticket was addressing At-Least-Once mode.
Care to file a new ticket? IMO it makes sense to inspect commandBuffer and disconnectedBuffer in DefaultEndpoint and the stack of CommandHandler.
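
A small self-contained sketch of the write-listener timing described above, using Netty's EmbeddedChannel, where writes complete synchronously so the listener fires as soon as the write is processed (an illustration, not lettuce's actual handler code):

import io.netty.channel.embedded.EmbeddedChannel;

class WriteListenerTimingSketch {

    public static void main(String[] args) {
        EmbeddedChannel channel = new EmbeddedChannel();

        // The listener runs as soon as the write has actually been processed
        // (cf. ChannelOutboundBuffer.remove(...)); this is the point at which an
        // AddToStack-style listener would put a command onto the stack.
        channel.writeAndFlush("HGETALL key-0").addListener(future -> {
            if (future.isSuccess()) {
                System.out.println("write finished -> add command to stack");
            } else {
                System.out.println("write failed: " + future.cause());
            }
        });

        channel.finish();
    }
}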

@gszpak
Contributor Author

gszpak commented Mar 20, 2018

Hey Mark, I'm sorry - the bug was in my code.

Thanks for the help and apologies for bothering you!

@mp911de
Collaborator

mp911de commented Mar 20, 2018

Glad to hear you were able to figure it out. Happy to help.
