RedisPubSubAdapter.message() being called with wrong channel #724

Closed
adimarco opened this issue Mar 6, 2018 · 8 comments
Labels
type: bug A general bug

adimarco commented Mar 6, 2018

This is happening with 5.0.0. I have a pretty simple pubsub listener that I'm using to listen to a LOT of different pubsub topics. One of those topics is "heartbeat", which I use as a keepalive for downstream processes. After one of my listeners has been up for a while, it starts seeing messages on the "heartbeat" topic that were not published on it. I assume this is also happening for other topics, but that one is easy to debug because only one message is ever actually published on it.

I have confirmed this by manually connecting to redis with redis-cli and doing a SUBSCRIBE heartbeat and watching the raw output in a terminal window. The message() callback is invoked for messages that were not received on that channel.

The Java code boils down to this:

import java.util.Set;

import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
// assumption: SLF4J logging; the original report did not show its imports
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(MessageProcessor.class);
    private final StatefulRedisPubSubConnection<String, String> conn;

    // takes a connection and adds a listener to it
    public MessageProcessor(StatefulRedisPubSubConnection<String, String> conn) {
        this.conn = conn;

        conn.addListener(new RedisPubSubAdapter<String, String>() {
            @Override
            public void message(String channel, String message) {
                // debug print every heartbeat received
                if ("heartbeat".equals(channel)) {
                    LOG.info("RECEIVED HEARTBEAT: " + message);
                }
                processMessage(channel, message);
            }
        });
    }

    // tell the listener to subscribe to more topics
    public void subscribeToChannels(Set<String> channels) {
        conn.async().subscribe(channels.toArray(new String[0]));
    }

    // placeholder: the real processing happens elsewhere and was not part of the report
    private void processMessage(String channel, String message) {
    }
}

When the message processor is created, it's passed a connection. It creates a listener that passes received messages off to be processed elsewhere. Since I was seeing weird behavior, I had it print every message it receives on the "heartbeat" channel.

The subscribeToChannels() method adds to the list of topics the connection is subscribed to. This list never decreases in size, i.e. unsubscribe(channels) is never called, but it grows toward a finite maximum (around 8000).

Eventually, I start seeing "RECEIVED HEARTBEAT:" log messages that include messages that definitely were not published on the "heartbeat" channel. Also weird: some of those messages appear to be topic names themselves, i.e. the message(channel, message) callback is invoked as message("heartbeat", "someOtherTopicName").

I've confirmed manually via redis-cli that the bogus messages aren't sent on the heartbeat channel. I also have several copies of this Java code running simultaneously, and generally only one of them will "go bad" and start claiming to have received messages with the wrong channel.

I'm reverting to 4.4.0 to see if I can reproduce the problem there as well. I'd be happy to help debug and reproduce however I can.

@mp911de mp911de added the type: bug A general bug label Mar 6, 2018
Collaborator

mp911de commented Mar 6, 2018

Thanks a lot for reporting your issue, great level of detail. Do you use the same connection to publish messages?

Depending on the load you have on the server, you could enable debug logging and dump log events via asynchronous logging to a file (to avoid side effects caused by I/O wait/fsync). Debug/trace logs show the actual payload, which helps to investigate what's going on.

Author

adimarco commented Mar 6, 2018

I was originally using the same connection to publish some messages, but for the sake of narrowing this down I removed that code and the problem remains. This connection is only ever used to receive messages.

I'll see if I can get the async logging going on the server. I just reverted to 4.4.0 this morning and am waiting to see if the problem still exists there. Even with 5.0.0, it wouldn't start happening until the process had been up and running for the better part of 24 hours, so it'll probably be until tomorrow before I know more.

Collaborator

mp911de commented Mar 6, 2018

One thing that can play into that as well is how messages are received.

The receiving piece can't always be sure where to route a Pub/Sub response: to a command or to a Pub/Sub listener. The thing is, when you go into subscribe mode, the client first attempts to decode responses into active commands, and only then routes them to Pub/Sub listeners.

Assume that you send a command (SUBSCRIBE) to the server while you're already subscribed to some other channel. The command is set to receive the next incoming response. At the same time, a different client publishes a message and Redis processes the PUBLISH first, notifying your client that has just sent another SUBSCRIBE.

A client isn't able to distinguish whether the response it has received belongs to the command or whether it's a subscribe confirmation without parsing the actual content.

Something similar can happen for other commands.

Collaborator

mp911de commented Mar 6, 2018

PS: If you know the number of channels is finite, then either subscribe to them all at once or use PSUBSCRIBE * until we have figured out how to fix the issue.

Author

adimarco commented Mar 6, 2018

> Assume that you send a command (SUBSCRIBE) to the server while you're already subscribed to some other channel, the command is set to receive the next incoming response. At the same time, a different client publishes a message and Redis processes the PUBLISH first, notifying your client that has just sent another SUBSCRIBE.

Oh, that's got to be it. Fits the observed problem to a T.

I can use PSUBSCRIBE - but what's the preferred workflow if you need to subscribe to a changing list of topics? Disconnect and re-connect/re-subscribe when the list of topics changes? Sounds like it's maybe a race condition in any case to SUBSCRIBE and then change the list of subscriptions while still in subscribe mode?

Collaborator

mp911de commented Mar 6, 2018

TBCH, at this point, I don't know the answer yet. PSUBSCRIBE * and maintaining the known channels/patterns in the application might be the way to go for now. I wouldn't recommend unsubscribing since you might lose messages.
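A minimal sketch of that workaround, assuming a single PSUBSCRIBE * on the connection and a set of known channels kept in the application. The class `KnownChannelFilter` and its methods are hypothetical names used only for illustration; they are not part of Lettuce. The filter is plain Java so it can be tried without a Redis server.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical helper: tracks the channels the application cares about and
// drops everything else that a catch-all pattern subscription delivers.
final class KnownChannelFilter {
    private final Set<String> known = ConcurrentHashMap.newKeySet();
    private final BiConsumer<String, String> delegate;

    KnownChannelFilter(BiConsumer<String, String> delegate) {
        this.delegate = delegate;
    }

    // Replaces the later SUBSCRIBE calls: growing the set sends nothing to Redis.
    void addChannels(Set<String> channels) {
        known.addAll(channels);
    }

    // Forwards the message only for known channels; returns true if it was forwarded.
    boolean onMessage(String channel, String message) {
        if (!known.contains(channel)) {
            return false;
        }
        delegate.accept(channel, message);
        return true;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        KnownChannelFilter filter = new KnownChannelFilter((c, m) -> seen.add(c + ": " + m));
        filter.addChannels(Collections.singleton("heartbeat"));
        filter.onMessage("heartbeat", "alive");   // forwarded
        filter.onMessage("someOtherTopic", "x");  // dropped
        System.out.println(seen);
    }
}
```

With Lettuce, `onMessage` would be called from the pattern callback `message(String pattern, String channel, String message)` of a `RedisPubSubAdapter` after one `psubscribe("*")`. The trade-off is that the client then receives every message published on the server, so this only fits when that volume is acceptable.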

@mp911de mp911de added this to the Lettuce 4.4.4 milestone Mar 6, 2018
Collaborator

mp911de commented Mar 19, 2018

I think I found an approach to deal with this issue. The crucial bit is whether one or more commands are in progress. Responses can arrive in-line while command decoding is in progress.

Client side

  1. Send command SUBSCRIBE
  2. Send PING command

A second client

  1. Send command PUBLISH

Server side

  1. SUBSCRIBE command arrives
  2. PUBLISH command arrives
  3. PING command arrives

Redis sends responses to commands as they are received/processed, meaning:

Client side

  1. Receives SUBSCRIBE command response (correlated correctly to SUBSCRIBE command)
  2. Receives Pub/Sub message (correlated to PING command)
  3. Receives PING command response (treated as Pub/Sub message - ugh)
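The sequence above can be reproduced with a toy model. This is only a sketch of first-in, first-out correlation, assuming each response is handed to the oldest command that has no response yet; it is not Lettuce's actual decoder, and `NaiveCorrelationDemo` and the response labels are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model only: all names here are hypothetical.
final class NaiveCorrelationDemo {

    // Gives each response to the oldest pending command; once no command is
    // pending, the response goes to the Pub/Sub listener.
    static List<String> route(List<String> pendingCommands, List<String> responses) {
        Deque<String> stack = new ArrayDeque<>(pendingCommands);
        List<String> log = new ArrayList<>();
        for (String response : responses) {
            String command = stack.poll(); // null when nothing is pending
            if (command != null) {
                log.add(command + " <- " + response);
            } else {
                log.add("listener <- " + response);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // The client sent SUBSCRIBE, then PING; another client's PUBLISH was
        // processed by the server in between.
        List<String> log = route(
                Arrays.asList("SUBSCRIBE", "PING"),
                Arrays.asList("subscribe-confirmation", "message(heartbeat)", "PONG"));
        log.forEach(System.out::println);
    }
}
```

In this model the Pub/Sub message completes the PING command and the PING reply ends up at the listener, which matches the client-side steps 2 and 3 listed above.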

Conclusion

We need to record and inspect the actual response if we're using Pub/Sub connections and there are commands without a response. Then we need to inspect the response to see whether it's a PMESSAGE or MESSAGE response to either propagate the message to Pub/Sub listeners or to complete the command.

Does this make sense?
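A rough sketch of the idea in the conclusion, assuming the response has already been decoded into a list of strings. It checks whether the first element is `message` or `pmessage` before deciding where the response goes. `InspectingRouterDemo` is a hypothetical name; the real change lives in Lettuce's PubSubCommandHandler and ReplayOutput and works differently in detail.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model only: all names here are hypothetical.
final class InspectingRouterDemo {

    // A Pub/Sub push is [message, channel, payload] or
    // [pmessage, pattern, channel, payload].
    static boolean isPubSubMessage(List<String> reply) {
        if (reply.isEmpty()) {
            return false;
        }
        String kind = reply.get(0);
        return ("message".equals(kind) && reply.size() == 3)
                || ("pmessage".equals(kind) && reply.size() == 4);
    }

    // Pub/Sub pushes always go to the listener; anything else completes the
    // oldest pending command, if there is one.
    static List<String> route(List<String> pendingCommands, List<List<String>> replies) {
        Deque<String> stack = new ArrayDeque<>(pendingCommands);
        List<String> log = new ArrayList<>();
        for (List<String> reply : replies) {
            if (isPubSubMessage(reply) || stack.isEmpty()) {
                log.add("listener <- " + reply);
            } else {
                log.add(stack.poll() + " <- " + reply);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<List<String>> replies = Arrays.asList(
                Arrays.asList("subscribe", "heartbeat", "1"),
                Arrays.asList("message", "heartbeat", "alive"),
                Arrays.asList("pong", "")); // PING reply shape while subscribed
        route(Arrays.asList("SUBSCRIBE", "PING"), replies).forEach(System.out::println);
    }
}
```

Here the interleaved message reaches the listener and the PING command still receives its own reply. A command whose reply happens to look like a Pub/Sub push would still be ambiguous in this simplified model.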

mp911de added a commit that referenced this issue Mar 19, 2018
Lettuce now inspects Redis responses via PubSubCommandHandler and ReplayOutput to determine whether a received response is a Pub/Sub message or whether the response belongs to a command on the protocol stack. Introspection is required as Redis responses may contain interleaved messages that do not belong to a command or may arrive before the command response.

Previously, interleaved messages could be used to complete commands on the protocol stack, which caused a defunct protocol state.
Collaborator

mp911de commented Mar 19, 2018

Snapshot builds are in place (4.4.4 up to 5.1.0).

@mp911de mp911de closed this as completed Mar 21, 2018