xreadgroup throws an exception if message was trimmed when reading pending messages #1116

Closed
xeizmendi opened this issue Jan 10, 2019 · 7 comments


@xeizmendi
Contributor

xeizmendi commented Jan 10, 2019

Version: 3.0.1

Platform: Python 3.6.6 on Debian 9 and Redis 5.0.2

Description: If messages are deleted from a stream (via XADD with MAXLEN, or via XTRIM) after a consumer group has consumed them but before they are acked, then reading the pending messages with XREADGROUP and id 0 makes Redis return (nil) as the payload of those messages instead of field key/value pairs. This causes xreadgroup to throw an exception:

[2019-01-10 12:00:10,206] [16] [ERROR] Error reading messages
Traceback (most recent call last):
  File "/code/app/daemons/__init__.py", line 38, in run
    stream_messages = self.redis_client.xreadgroup("consumer-group", "consumer", {"stream": "0"}, count=1000)
  File "/usr/local/lib/python3.6/site-packages/redis/client.py", line 2185, in xreadgroup
    return self.execute_command('XREADGROUP', *pieces)
  File "/usr/local/lib/python3.6/site-packages/redis/client.py", line 755, in execute_command
    return self.parse_response(connection, command_name, **options)
  File "/usr/local/lib/python3.6/site-packages/redis/client.py", line 774, in parse_response
    return self.response_callbacks[command_name](response, **options)
  File "/usr/local/lib/python3.6/site-packages/redis/client.py", line 275, in parse_xread
    return [[nativestr(r[0]), parse_stream_list(r[1])] for r in response]
  File "/usr/local/lib/python3.6/site-packages/redis/client.py", line 275, in <listcomp>
    return [[nativestr(r[0]), parse_stream_list(r[1])] for r in response]
  File "/usr/local/lib/python3.6/site-packages/redis/client.py", line 246, in parse_stream_list
    return [(r[0], pairs_to_dict(r[1])) for r in response]
  File "/usr/local/lib/python3.6/site-packages/redis/client.py", line 246, in <listcomp>
    return [(r[0], pairs_to_dict(r[1])) for r in response]
  File "/usr/local/lib/python3.6/site-packages/redis/client.py", line 195, in pairs_to_dict
    it = iter(response)
TypeError: 'NoneType' object is not iterable

You can see this is the expected behaviour in Redis's source code:
https://github.com/antirez/redis/blob/5.0.2/src/t_stream.c#L1031
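
A minimal sketch of the tolerant parsing discussed in this issue, not necessarily the patch that was merged. The function names mirror those in the traceback, but the bodies are simplified stand-ins (no `nativestr` decoding), and `raw_reply` is a hand-written assumption of what the server might return for one trimmed and one live pending entry:

```python
def pairs_to_dict(response):
    """Turn a flat [k1, v1, k2, v2, ...] list into a dict.

    A None payload (entry trimmed or deleted while still pending)
    becomes an empty dict instead of raising TypeError.
    """
    if response is None:
        return {}
    it = iter(response)
    return dict(zip(it, it))


def parse_stream_list(response):
    """Parse a list of [message_id, flat_field_list] entries."""
    return [(r[0], pairs_to_dict(r[1])) for r in response]


def parse_xread(response):
    """Parse a list of [stream_name, entries] pairs."""
    if response is None:
        return []
    return [[r[0], parse_stream_list(r[1])] for r in response]


# Hand-written example reply (assumption): the first pending entry
# was trimmed, so its payload is None (nil on the wire).
raw_reply = [
    [b"stream", [
        [b"1547121610206-0", None],
        [b"1547121610207-0", [b"my_field", b"value"]],
    ]],
]
parsed = parse_xread(raw_reply)
```

With this guard the trimmed entry keeps its ID but carries an empty dict, so callers can tell it apart from a live message.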

@itamarhaber
Member

Hello @xeizmendi

Thanks for detecting the issue and submitting a corrective PR.

@andymccurdy
Contributor

@itamarhaber What's the intended behavior on the server side in this situation? Leaving the Message ID hanging around in the consumer group with a nil payload seems weird to me and might be an edge case that wasn't thought of.

I'm fine to merge this but curious about the thinking on the server side.

@andymccurdy
Contributor

This could have other unintended side effects for users. For instance, if someone has code like this:

for stream, messages in r.xreadgroup(...):
    # each entry is a (message_id, fields) tuple
    for message_id, message in messages:
        my_field = message['my_field']
        ...

If all your messages contain 'my_field', it's reasonably safe to assume that the above code works fine. However, with this PR you would now also need to account for an empty message.

Granted, returning an empty dict is far better than raising an exception.

@itamarhaber
Member

The thinking afaik is that this is ok. Unacknowledged messages stay in the PEL even if deleted, and that's the meaning of a nil payload.

As for the above snippet, yeah it would break but a person who's trimming/deleting a stream should probably do something like:

    for message_id, message in messages:
        if not message:
            # The message was deleted before it was acked - do something :)
            continue
        my_field = message['my_field']
        ...

Ref: redis/redis#5754 (and mebbe redis/redis#5739).
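
One way to act on the empty-payload check is to collect the IDs of trimmed entries separately so they can be handled in bulk. The sketch below is only an illustration: `split_trimmed` is a hypothetical helper, not part of redis-py, and it assumes each parsed entry is a `(message_id, fields)` tuple with an empty dict standing in for a nil payload.

```python
def split_trimmed(messages):
    """Separate live entries from entries whose payload is empty.

    `messages` is assumed to be a list of (message_id, fields) tuples.
    Returns (live_entries, trimmed_ids).
    """
    live = []
    trimmed_ids = []
    for message_id, fields in messages:
        if fields:
            live.append((message_id, fields))
        else:
            # Empty payload: the entry was deleted before it was acked.
            trimmed_ids.append(message_id)
    return live, trimmed_ids


# Illustrative data only, not real server output.
messages = [
    ("1-0", {}),
    ("2-0", {"my_field": "a"}),
    ("3-0", {}),
]
live, trimmed_ids = split_trimmed(messages)
```

A caller could then pass the collected IDs to `xack`, for example `r.xack("stream", "consumer-group", *trimmed_ids)`, so they leave the PEL. Whether acknowledging lost entries is the right policy depends on the application.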

@soloestoy

Related to redis/redis#5718

@xeizmendi
Contributor Author

Any chance the PR will be merged?
The issue remains, and the current implementation in master throws an exception, which is worse than returning an empty dictionary.

andymccurdy added a commit that referenced this issue Jan 27, 2019
@andymccurdy
Contributor

Merged. Thanks!
