
Application-level exceptions in Pub/Sub notifications mess up pub sub decoding state and cause timeouts #997

Closed
giridharkannan opened this issue Mar 9, 2019 · 7 comments
Labels: type: bug (A general bug)
Milestone: 5.1.6

Comments

@giridharkannan

Current Behavior

If a client-side exception occurs while notifying a subscriber, and the next response from the Redis server is an UNSUBSCRIBE response, Lettuce discards the UNSUBSCRIBE data from the buffer, resulting in a RedisCommandTimeoutException.

Input Code

List<RedisURI> rList = new ArrayList<>();
rList.add(new RedisURI("127.0.0.1", 7000, Duration.ofSeconds(60)));
rList.add(new RedisURI("127.0.0.1", 7001, Duration.ofSeconds(60)));
rList.add(new RedisURI("127.0.0.1", 7002, Duration.ofSeconds(60)));
RedisClusterClient clusterClient = RedisClusterClient.create(rList);

StatefulRedisClusterPubSubConnection<String, String> redisSub = clusterClient.connectPubSub();
StatefulRedisClusterConnection<String, String> redisPub = clusterClient.connect();


redisSub.addListener(new RedisPubSubAdapter<String, String>() {
    int i = 0;
    // Throw on every second message to simulate an application-level exception in the listener
    private void sendMsg(String msg, String channel) {
        if (++i % 2 == 0) throw new NullPointerException();
        System.out.println("Channel -> " + channel + " msg -> " + msg);
    }

    @Override
    public void message(String channel, String message) {
        sendMsg(message, channel);
    }

    @Override
    public void message(String pattern, String channel, String message) {
        sendMsg(message, channel);
    }
});

AtomicInteger count = new AtomicInteger();
List<String> subList = new ArrayList<>();
IntStream.range(0, 3).forEach(i -> subList.add("test_sub_"+i));

//Subscribe
IntStream.range(0, subList.size()).forEach(i -> redisSub.sync().subscribe(subList.get(i)));


//Publish
IntStream.range(0, 5).forEach(j -> {
    for(String channel : subList) redisPub.sync().publish(channel, "text c -"+count.getAndIncrement());
});

//Unsubscribe
IntStream.range(0, subList.size()).forEach(i -> redisSub.sync().unsubscribe(subList.get(i)));

//Publish
IntStream.range(0, 5).forEach(j -> {
    for(String channel : subList) redisPub.sync().publish(channel, "text c -"+count.getAndIncrement());
});

Expected behavior/code

The test must not fail with a RedisCommandTimeoutException.

Environment

  • Lettuce version(s): [e.g. 5.1.5.RELEASE, 4.2.2.Final]
  • Redis version: [e.g. 4.0.9]

Possible Solution

The issue occurs because the old output is not cleared, which causes canDecode in PubSubCommandHandler to return false, thereby discarding all the read bytes from the buffer.

Moving every occurrence of output = new PubSubOutput<>(codec); in the PubSubCommandHandler.decode method into a finally block could solve this, but I am not sure whether that is the right solution.
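
Until the decoding state handling is fixed, a possible user-side workaround is to make sure exceptions never escape the listener callbacks. Below is a minimal sketch under that assumption; the SafePubSubListener name and its logging are only illustrative:

import io.lettuce.core.pubsub.RedisPubSubAdapter;

// Illustrative wrapper: keeps application exceptions out of Lettuce's decoding pipeline.
class SafePubSubListener extends RedisPubSubAdapter<String, String> {

    @Override
    public void message(String channel, String message) {
        handleSafely(channel, message);
    }

    @Override
    public void message(String pattern, String channel, String message) {
        handleSafely(channel, message);
    }

    private void handleSafely(String channel, String message) {
        try {
            // application-specific processing goes here
            System.out.println("Channel -> " + channel + " msg -> " + message);
        } catch (RuntimeException e) {
            // log and continue instead of letting the exception reach the channel handler
            e.printStackTrace();
        }
    }
}

Registering it with redisSub.addListener(new SafePubSubListener()) keeps application failures out of the decoding path.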

@mp911de
Collaborator

mp911de commented Mar 10, 2019

Can you provide a test case to reproduce the issue? Spinning up a massive number of messages does not help to reliably diagnose the issue. PubSubCommandHandlerUnitTests is a good starting point for isolated Pub/Sub testing.

@mp911de mp911de added type: bug A general bug status: waiting-for-feedback We need additional information before we can continue labels Mar 10, 2019
@giridharkannan
Author

giridharkannan commented Mar 10, 2019 via email

It took me 3 days to pinpoint this issue, so I can understand your concern.

With the current test code it takes 4 to 5 runs to reproduce the RedisCommandTimeoutException.
If you have difficulty reproducing it, I will try to come up with a better test case.

@mp911de
Collaborator

mp911de commented Mar 10, 2019

Thanks. As a general rule of thumb, raise these kinds of tickets earlier so we can assist you with the problem analysis.

@giridharkannan
Author

giridharkannan commented Mar 10, 2019

With the code snippet below, you should be able to reproduce the issue.

List<RedisURI> rList = new ArrayList<>();
rList.add(new RedisURI("127.0.0.1", 7000, Duration.ofSeconds(15)));
rList.add(new RedisURI("127.0.0.1", 7001, Duration.ofSeconds(15)));
rList.add(new RedisURI("127.0.0.1", 7002, Duration.ofSeconds(15)));
RedisClusterClient clusterClient = RedisClusterClient.create(rList);

StatefulRedisClusterPubSubConnection<String, String> redisSub = clusterClient.connectPubSub();
StatefulRedisClusterConnection<String, String> redisPub = clusterClient.connect();

CountDownLatch subCl = new CountDownLatch(1);


redisSub.addListener(new RedisPubSubAdapter<String, String>() {
    private void sendMsg() {
        try {
            if(subCl.getCount() > 0) {
                subCl.countDown();
                /*
                 * Sleep so that the UNSUBSCRIBE response from the Redis server lands in the
                 * TCP receive queue and is in turn read by Netty in the next event loop run.
                 */
                Thread.sleep(2000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        throw new NullPointerException();
    }

    @Override
    public void message(String channel, String message) {
        sendMsg();
    }

    @Override
    public void message(String pattern, String channel, String message) {
        sendMsg();
    }
});

AtomicInteger count = new AtomicInteger();
String channel = "test_sub_0";

//Subscribe
redisSub.sync().subscribe(channel);

//Publish
redisPub.async().publish(channel, "text c -"+count.getAndIncrement());
redisPub.async().publish(channel, "text c -"+count.getAndIncrement());

subCl.await(); //Wait for first message
redisSub.sync().unsubscribe(channel);

giridharkannan pushed a commit to giridharkannan/lettuce-core that referenced this issue Mar 11, 2019
@mp911de mp911de removed the status: waiting-for-feedback We need additional information before we can continue label Mar 12, 2019
@mp911de mp911de added this to the 5.1.6 milestone Mar 14, 2019
@mp911de
Collaborator

mp911de commented Mar 14, 2019

Thanks a lot for your support. I took the code from your unit test commit and integrated it into Lettuce. It took me quite a while to understand the issue.

The fix is to add try/catch blocks around notification and around listener invocation, with appropriate logging.
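
As an illustrative sketch of that approach (not the actual Lettuce internals; the GuardedNotifier class and all names in it are assumptions for this example), guarding the notification loop could look like this:

import java.util.List;

import io.lettuce.core.pubsub.RedisPubSubListener;

// Illustrative sketch only, not the actual Lettuce code.
class GuardedNotifier {

    private final List<RedisPubSubListener<String, String>> listeners;

    GuardedNotifier(List<RedisPubSubListener<String, String>> listeners) {
        this.listeners = listeners;
    }

    void notifyMessage(String channel, String message) {
        try {
            for (RedisPubSubListener<String, String> listener : listeners) {
                listener.message(channel, message);
            }
        } catch (Exception e) {
            // Log and swallow so the exception never reaches the channel handler;
            // notification stops at the first failing listener.
            System.err.println("Unexpected exception in Pub/Sub listener notification: " + e);
        }
    }
}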

@mp911de mp911de changed the title from "RedisCommandTimeoutException while Unsubscribing" to "Application-level exceptions in Pub/Sub notifications mess up pub sub decoding state and cause timeouts" Mar 14, 2019
mp911de added a commit that referenced this issue Mar 14, 2019
Pub/Sub listener callbacks are now guarded against exceptions bubbling up into channel processing. Instead, exceptions are logged. Listener notification stops on the first exception.

These guards prevent exceptions from interrupting the state update flow, which could previously leave the decoding state machine in an invalid state.
mp911de added a commit that referenced this issue Mar 14, 2019
@mp911de
Collaborator

mp911de commented Mar 14, 2019

That's fixed now.

@mp911de mp911de closed this as completed Mar 14, 2019