Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement XREAD #147

Merged
merged 8 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.10"
python-version: "3.11"
- name: Install dependencies
env:
PYTHON_KEYRING_BACKEND: keyring.backends.null.Keyring
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
- uses: actions/setup-python@v4
with:
cache-dependency-path: poetry.lock
python-version: "3.10"
python-version: "3.11"
- name: Install dependencies
env:
PYTHON_KEYRING_BACKEND: keyring.backends.null.Keyring
Expand Down Expand Up @@ -51,15 +51,15 @@ jobs:
max-parallel: 8
fail-fast: false
matrix:
redis-image: [ "redis:6.2.10", "redis:7.0.7" ]
redis-image: [ "redis:6.2.12", "redis:7.0.11" ]
python-version: [ "3.7", "3.8", "3.10", "3.11" ]
redis-py: [ "4.3.6", "4.4.4", "4.5.4" ]
include:
- python-version: "3.10"
- python-version: "3.11"
redis-image: "redis:6.2.10"
redis-py: "4.5.4"
lupa: true
- python-version: "3.10"
- python-version: "3.11"
redis-image: "redis/redis-stack:7.0.6-RC3"
redis-py: "4.5.4"
lupa: true
Expand Down
18 changes: 18 additions & 0 deletions docs/about/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,24 @@ description: Change log of all fakeredis releases

## Next release

## v2.12.0

### 🚀 Features
- Implement `XREAD` #147

## v2.11.2

### 🧰 Bug Fixes

- Unique FakeServer when no connection params are provided (#142)

## v2.11.1

### 🧰 Maintenance

- Minor fixes supporting multiple connections
- Update documentation

## v2.11.0

### 🚀 Features
Expand Down
8 changes: 4 additions & 4 deletions docs/redis-commands/Redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -1485,6 +1485,10 @@ Return the number of messages in a stream.

Returns the messages from a stream within a range of IDs.

### [XREAD](https://redis.io/commands/xread/)

Returns messages from multiple streams with IDs greater than the ones requested. Blocks until a message is available otherwise.

### [XREVRANGE](https://redis.io/commands/xrevrange/)

Returns the messages from a stream within a range of IDs in reverse order.
Expand Down Expand Up @@ -1565,10 +1569,6 @@ Returns information about a stream.

Returns the information and entries from a stream consumer group's pending entries list.

#### [XREAD](https://redis.io/commands/xread/) <small>(not implemented)</small>

Returns messages from multiple streams with IDs greater than the ones requested. Blocks until a message is available otherwise.

#### [XREADGROUP](https://redis.io/commands/xreadgroup/) <small>(not implemented)</small>

Returns new or historical messages from a stream for a consumer in agroup. Blocks until a message is available otherwise.
Expand Down
4 changes: 2 additions & 2 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
mkdocs==1.4.2
mkdocs-material==9.1.8
mkdocs==1.4.3
mkdocs-material==9.1.9
2 changes: 1 addition & 1 deletion fakeredis/_command_args_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def _parse_params(
while i < len(actual_args):
found = False
for key in args_info:
if null_terminate(actual_args[i]).lower() == key:
if null_terminate(actual_args[i]) == key:
arg_position, _ = args_info[key]
results[arg_position], parsed = _parse_params(key, i, actual_args)
i += parsed
Expand Down
13 changes: 10 additions & 3 deletions fakeredis/_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ def parse_id(id_str: str):
return timestamp, sequence

@classmethod
def decode(cls, value):
def decode(cls, value, exclusive=False):
if value == b'-':
return cls(BeforeAny(), True)
elif value == b'+':
return cls(AfterAny(), True)
elif value[:1] == b'(':
return cls(cls.parse_id(value[1:]), True)
return cls(cls.parse_id(value), False)
return cls(cls.parse_id(value), exclusive)


class XStream:
Expand Down Expand Up @@ -88,10 +88,14 @@ def find_index(self, id_str: str) -> Tuple[int, bool]:
ind = bisect.bisect_left(list(map(lambda x: x[0], self._values)), ts_seq)
return ind, self._values[ind][0] == ts_seq

@staticmethod
def _encode_id(record):
return f'{record[0][0]}-{record[0][1]}'.encode()

@staticmethod
def _format_record(record):
results = list(record[1:][0])
return [f'{record[0][0]}-{record[0][1]}'.encode(), results]
return [XStream._encode_id(record), results]

def trim(self,
maxlen: Optional[int] = None,
Expand Down Expand Up @@ -125,3 +129,6 @@ def match(record):
if reverse:
return list(reversed(tuple(matches)))
return list(matches)

def last_item_key(self):
XStream._encode_id(self._values[-1])
1 change: 0 additions & 1 deletion fakeredis/commands_mixins/bitmap_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@


class BitmapCommandsMixin:
# BITMAP commands
# TODO: bitfield, bitfield_ro, bitpos
@staticmethod
def _bytes_as_bin_string(value):
Expand Down
4 changes: 2 additions & 2 deletions fakeredis/commands_mixins/connection_mixin.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from fakeredis import _msgs as msgs
from fakeredis._commands import command, DbIndex
from fakeredis._helpers import SimpleError, OK, SimpleString
from fakeredis._commands import (command, DbIndex)
from fakeredis._helpers import (SimpleError, OK, SimpleString)

PONG = SimpleString(b'PONG')

Expand Down
2 changes: 1 addition & 1 deletion fakeredis/commands_mixins/generic_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from fakeredis._commands import (
command, Key, Int, DbIndex, BeforeAny, CommandItem, SortFloat,
delete_keys, key_value_type, )
from fakeredis._helpers import compile_pattern, SimpleError, OK, casematch
from fakeredis._helpers import (compile_pattern, SimpleError, OK, casematch)
from fakeredis._zset import ZSet


Expand Down
4 changes: 2 additions & 2 deletions fakeredis/commands_mixins/hash_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import math

from fakeredis import _msgs as msgs
from fakeredis._commands import command, Key, Hash, Int, Float
from fakeredis._helpers import SimpleError, OK
from fakeredis._commands import (command, Key, Hash, Int, Float)
from fakeredis._helpers import (SimpleError, OK)


class HashCommandsMixin:
Expand Down
11 changes: 3 additions & 8 deletions fakeredis/commands_mixins/list_mixin.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
import functools

from fakeredis import _msgs as msgs
from fakeredis._commands import (
Key, command, Int, CommandItem, Timeout, fix_range)
from fakeredis._helpers import (
OK, SimpleError, SimpleString, casematch)
from fakeredis._commands import (Key, command, Int, CommandItem, Timeout, fix_range)
from fakeredis._helpers import (OK, SimpleError, SimpleString, casematch)


def _list_pop(get_slice, key, *args):
"""Implements lpop and rpop.

`get_slice` must take a count and return a slice expression for the
range to pop.
`get_slice` must take a count and return a slice expression for the range to pop.
"""
# This implementation is somewhat contorted to match the odd
# behaviours described in https://github.com/redis/redis/issues/9680.
Expand All @@ -36,8 +33,6 @@ def _list_pop(get_slice, key, *args):


class ListCommandsMixin:
# List commands

def _bpop_pass(self, keys, op, first_pass):
for key in keys:
item = CommandItem(key, self._db, item=self._db.get(key), default=[])
Expand Down
1 change: 0 additions & 1 deletion fakeredis/commands_mixins/pubsub_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@


class PubSubCommandsMixin:
# Pubsub commands
def __init__(self, *args, **kwargs):
super(PubSubCommandsMixin, self).__init__(*args, **kwargs)
self._pubsub = 0 # Count of subscriptions
Expand Down
8 changes: 5 additions & 3 deletions fakeredis/commands_mixins/scripting_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import logging

from fakeredis import _msgs as msgs
from fakeredis._commands import command, Int
from fakeredis._helpers import SimpleError, SimpleString, null_terminate, OK, encode_command
from fakeredis._commands import (command, Int)
from fakeredis._helpers import (SimpleError, SimpleString, null_terminate, OK, encode_command)

LOGGER = logging.getLogger('fakeredis')
REDIS_LOG_LEVELS = {
Expand Down Expand Up @@ -84,6 +84,8 @@ def _convert_redis_result(self, lua_runtime, result):
]
return lua_runtime.table_from(converted)
elif isinstance(result, SimpleError):
if result.value.startswith('ERR wrong number of arguments'):
raise SimpleError(msgs.WRONG_ARGS_MSG7)
raise result
else:
raise RuntimeError("Unexpected return type from redis: {}".format(type(result)))
Expand Down Expand Up @@ -175,7 +177,7 @@ def eval(self, script, numkeys, *keys_and_args):
try:
result = lua_runtime.execute(script)
except SimpleError as ex:
if self.version == 6:
if self.version <= 6:
raise SimpleError(msgs.SCRIPT_ERROR_MSG.format(sha1.decode(), ex))
raise SimpleError(ex.value)
except LuaError as ex:
Expand Down
6 changes: 3 additions & 3 deletions fakeredis/commands_mixins/server_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ def save(self):

@command(())
def time(self):
now_us = round(time.time() * 1000000)
now_s = now_us // 1000000
now_us %= 1000000
now_us = round(time.time() * 1_000_000)
now_s = now_us // 1_000_000
now_us %= 1_000_000
return [str(now_s).encode(), str(now_us).encode()]

@command((DbIndex, DbIndex))
Expand Down
45 changes: 42 additions & 3 deletions fakeredis/commands_mixins/streams_mixin.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import functools
from typing import List

import fakeredis._msgs as msgs
from fakeredis._command_args_parsing import extract_args
from fakeredis._commands import Key, command
from fakeredis._helpers import SimpleError
from fakeredis._commands import Key, command, CommandItem
from fakeredis._helpers import SimpleError, casematch
from fakeredis._stream import XStream, StreamRangeTest


class StreamsCommandsMixin:
@command(name="XADD", fixed=(Key(),), repeat=(bytes,), )
def xadd(self, key, *args):

(nomkstream, limit, maxlen, minid), left_args = extract_args(
args, ('nomkstream', '+limit', '~+maxlen', '~minid'), error_on_unexpected=False)
if nomkstream and key.value is None:
Expand Down Expand Up @@ -71,3 +73,40 @@ def xrange(self, key, _min, _max, *args):
def xrevrange(self, key, _min, _max, *args):
(count,), _ = extract_args(args, ('+count',))
return self._xrange(key, _max, _min, True, count)

def _xread(self, stream_start_id_list: List, count: int, first_pass: bool):
max_inf = StreamRangeTest.decode(b'+')
res = list()
for (item, start_id) in stream_start_id_list:
stream_results = self._xrange(item, start_id, max_inf, False, count)
if first_pass and (count is None or len(stream_results) < count):
raise SimpleError(msgs.WRONGTYPE_MSG)
if len(stream_results) > 0:
res.append([item.key, stream_results])
return res

@staticmethod
def _parse_start_id(key: CommandItem, s: bytes) -> StreamRangeTest:
if s == b'$':
return StreamRangeTest.decode(key.value.last_item_key(), exclusive=True)
return StreamRangeTest.decode(s, exclusive=True)

@command(name="XREAD", fixed=(bytes,), repeat=(bytes,))
def xread(self, *args):
(count, timeout,), left_args = extract_args(args, ('+count', '+block',), error_on_unexpected=False)
if (len(left_args) < 3
or not casematch(left_args[0], b'STREAMS')
or len(left_args) % 2 != 1):
raise SimpleError(msgs.SYNTAX_ERROR_MSG)
left_args = left_args[1:]
num_streams = int(len(left_args) / 2)

stream_start_id_list = list()
for i in range(num_streams):
item = CommandItem(left_args[i], self._db, item=self._db.get(left_args[i]), default=None)
start_id = self._parse_start_id(item, left_args[i + num_streams])
stream_start_id_list.append((item, start_id,))
if timeout is None:
return self._xread(stream_start_id_list, count, False)
else:
return self._blocking(timeout, functools.partial(self._xread, stream_start_id_list, count))
50 changes: 25 additions & 25 deletions fakeredis/commands_mixins/string_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,45 @@ def _lcs(s1, s2):
pi = [[0] * (l2 + 1) for _ in range(0, l1 + 1)]

# Algorithm to calculate the length of the longest common subsequence
for i in range(1, l1 + 1):
for j in range(1, l2 + 1):
if s1[i - 1] == s2[j - 1]:
opt[i][j] = opt[i - 1][j - 1] + 1
pi[i][j] = 0
elif opt[i][j - 1] >= opt[i - 1][j]:
opt[i][j] = opt[i][j - 1]
pi[i][j] = 1
for r in range(1, l1 + 1):
for c in range(1, l2 + 1):
if s1[r - 1] == s2[c - 1]:
opt[r][c] = opt[r - 1][c - 1] + 1
pi[r][c] = 0
elif opt[r][c - 1] >= opt[r - 1][c]:
opt[r][c] = opt[r][c - 1]
pi[r][c] = 1
else:
opt[i][j] = opt[i - 1][j]
pi[i][j] = 2
opt[r][c] = opt[r - 1][c]
pi[r][c] = 2
# Length of the longest common subsequence is saved at opt[n][m]

# Algorithm to calculate the longest common subsequence using the Pi array
# Also calculate the list of matches
i, j = l1, l2
r, c = l1, l2
result = ''
matches = list()
s1ind, s2ind, curr_length = None, None, 0

while i > 0 and j > 0:
if pi[i][j] == 0:
result = chr(s1[i - 1]) + result
i -= 1
j -= 1
while r > 0 and c > 0:
if pi[r][c] == 0:
result = chr(s1[r - 1]) + result
r -= 1
c -= 1
curr_length += 1
elif pi[i][j] == 2:
i -= 1
elif pi[r][c] == 2:
r -= 1
else:
j -= 1
c -= 1

if pi[i][j] == 0 and curr_length == 1:
s1ind = i
s2ind = j
elif pi[i][j] > 0 and curr_length > 0:
matches.append([[i, s1ind], [j, s2ind], curr_length])
if pi[r][c] == 0 and curr_length == 1:
s1ind = r
s2ind = c
elif pi[r][c] > 0 and curr_length > 0:
matches.append([[r, s1ind], [c, s2ind], curr_length])
s1ind, s2ind, curr_length = None, None, 0
if curr_length:
matches.append([[s1ind, i], [s2ind, j], curr_length])
matches.append([[s1ind, r], [s2ind, c], curr_length])

return opt[l1][l2], result.encode(), matches

Expand Down
Loading