xpending(K, Consumer, Range, Limit) fails with ERR syntax error using Limit.unlimited() #1302

Closed
nagaran1 opened this issue Jun 1, 2020 · 2 comments
Labels
type: bug A general bug
Milestone

Comments

@nagaran1

nagaran1 commented Jun 1, 2020

I am building an application that operates on Redis streams using the Add, ReadGroup, Del, Ack, Pending, and Claim commands through the spring-boot-starter-data-redis-reactive dependency. I have configured a ReactiveRedisConnectionFactory bean and a ReactiveRedisOperations bean as shown below:

@Bean
@Primary
public ReactiveRedisConnectionFactory connectionFactory(RedisStandaloneConfiguration redisStandaloneConfiguration) {

    LettuceClientConfiguration configuration = LettuceClientConfiguration.builder()
            .clientOptions(clientOptions()).build();
    return new LettuceConnectionFactory(redisStandaloneConfiguration, configuration);
}

@Bean
@Primary
ReactiveRedisOperations<String, String> redisOperations(ReactiveRedisConnectionFactory factory) {
    RedisSerializationContext.RedisSerializationContextBuilder<String, String> builder =
            RedisSerializationContext.newSerializationContext(new StringRedisSerializer());
    RedisSerializationContext<String, String> context = builder.build();
    return new ReactiveRedisTemplate<String, String>(factory, context);
}

Using opsForStream(), I can add, delete, acknowledge, and read messages (including consumer-group reads). For example:

this.reactiveRedisOperations
        .opsForStream()
        .add(streamName, Collections.singletonMap("body", message))
        .subscribe();

However, the snippet below fails with ERR syntax error. I expect it to return PendingMessages so that I can reclaim them.

this.reactiveRedisOperations
        .opsForStream()
        .pending("mystream", Consumer.from("mygroup", "myservice"))
        .subscribe(pendingMessages -> System.out.println(pendingMessages.size()));

Exception Trace:

2020-06-01 19:59:01.773  WARN 38696 --- [ioEventLoop-7-1] io.lettuce.core.protocol.CommandHandler  : null Unexpected exception during request: reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: ERR syntax error

reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: ERR syntax error
Caused by: org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: ERR syntax error
	at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:54) ~[spring-data-redis-2.3.0.RELEASE.jar:2.3.0.RELEASE]
	at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:41) ~[spring-data-redis-2.3.0.RELEASE.jar:2.3.0.RELEASE]
	at org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection.lambda$translateException$0(LettuceReactiveRedisConnection.java:293) ~[spring-data-redis-2.3.0.RELEASE.jar:2.3.0.RELEASE]
	at reactor.core.publisher.Flux.lambda$onErrorMap$28(Flux.java:6504) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:88) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onError(MonoFlatMapMany.java:247) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:134) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:134) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onError(MonoCollectList.java:106) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onError(RedisPublisher.java:890) ~[lettuce-core-5.3.0.RELEASE.jar:5.3.0.RELEASE]
	at io.lettuce.core.RedisPublisher$State.onError(RedisPublisher.java:687) ~[lettuce-core-5.3.0.RELEASE.jar:5.3.0.RELEASE]
	at io.lettuce.core.RedisPublisher$RedisSubscription.onError(RedisPublisher.java:344) ~[lettuce-core-5.3.0.RELEASE.jar:5.3.0.RELEASE]
	at io.lettuce.core.RedisPublisher$SubscriptionCommand.onError(RedisPublisher.java:800) ~[lettuce-core-5.3.0.RELEASE.jar:5.3.0.RELEASE]
	at io.lettuce.core.RedisPublisher$SubscriptionCommand.complete(RedisPublisher.java:736) ~[lettuce-core-5.3.0.RELEASE.jar:5.3.0.RELEASE]
	at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:654) [lettuce-core-5.3.0.RELEASE.jar:5.3.0.RELEASE]
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:614) [lettuce-core-5.3.0.RELEASE.jar:5.3.0.RELEASE]
	at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:565) [lettuce-core-5.3.0.RELEASE.jar:5.3.0.RELEASE]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [netty-common-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.49.Final.jar:4.1.49.Final]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_251]
Caused by: io.lettuce.core.RedisCommandExecutionException: ERR syntax error
	at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:135) ~[lettuce-core-5.3.0.RELEASE.jar:5.3.0.RELEASE]
	at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:108) ~[lettuce-core-5.3.0.RELEASE.jar:5.3.0.RELEASE]
	... 20 common frames omitted

Environment

  • Lettuce version(s): 5.3.0.RELEASE
  • Redis version: 6.0.2 and 5.0.6

Additional context

The XPENDING call below works fine and produces a PendingMessagesSummary.

this.reactiveRedisOperations
        .opsForStream()
        .pending("mystream", "mygroup")
        .subscribe(pendingMessagesSummary -> {
            long totalPendingMessages = pendingMessagesSummary.getTotalPendingMessages();
        });
@nagaran1 nagaran1 added the type: bug A general bug label Jun 1, 2020
@nagaran1 nagaran1 changed the title Pending Exception in ReactiveRedisOperations.opsForStream().pending() Jun 1, 2020
@nagaran1 nagaran1 changed the title Exception in ReactiveRedisOperations.opsForStream().pending() io.lettuce.core.RedisCommandExecutionException: ERR syntax error in ReactiveRedisOperations.opsForStream().pending() Jun 1, 2020
@nagaran1
Author

nagaran1 commented Jun 2, 2020

This issue occurs with both Redis 5.0.x and 6.0.x.

@mp911de
Collaborator

mp911de commented Jun 8, 2020

Thanks for reporting the issue. There's a bit of a conflict here. Redis changes the output format based on the input arguments. The xpending(K, Consumer, Range, Limit) method can be called with an unbounded Range and an unlimited Limit. While there's a substitution for an unbounded range (- and +), there's no substitution for an unlimited count. Without specifying a limit, there's no way to obtain the "range" representation.

Callers of this method expect a stable output format, not one that changes across invocations. Therefore, we need to apply an artificial limit, and the value we pick is Long.MAX_VALUE.
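
To make the conflict concrete, here is a minimal sketch of how the XPENDING argument list could be assembled with and without the synthetic limit. It is not Lettuce's actual command builder: the class `XPendingArgsSketch`, its method names, and the use of a `null` count to stand in for `Limit.unlimited()` are all assumptions made for illustration. It relies only on the Redis 5.0/6.0 syntax `XPENDING key group [start end count] [consumer]`, where a consumer is only valid after `start end count`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; this is NOT Lettuce's real command builder.
final class XPendingArgsSketch {

    // Models the behaviour described in this issue: with no limit (null count),
    // only the consumer name is appended, which Redis rejects with
    // "ERR syntax error" because a consumer requires "start end count" first.
    static List<String> withoutSyntheticLimit(String key, String group, String consumer, Long count) {
        List<String> command = base(key, group);
        if (count != null) {
            command.addAll(Arrays.asList("-", "+", Long.toString(count)));
        }
        command.add(consumer);
        return command;
    }

    // Models the fix: an unlimited count is replaced by Long.MAX_VALUE so the
    // range form (start end count consumer) is always sent.
    static List<String> withSyntheticLimit(String key, String group, String consumer, Long count) {
        long effectiveCount = (count != null) ? count : Long.MAX_VALUE;
        List<String> command = base(key, group);
        command.addAll(Arrays.asList("-", "+", Long.toString(effectiveCount)));
        command.add(consumer);
        return command;
    }

    private static List<String> base(String key, String group) {
        return new ArrayList<>(Arrays.asList("XPENDING", key, group));
    }

    public static void main(String[] argv) {
        // Invalid form: XPENDING mystream mygroup myservice
        System.out.println(String.join(" ", withoutSyntheticLimit("mystream", "mygroup", "myservice", null)));
        // Valid range form: XPENDING mystream mygroup - + 9223372036854775807 myservice
        System.out.println(String.join(" ", withSyntheticLimit("mystream", "mygroup", "myservice", null)));
    }
}
```

The first form is the one that triggers the syntax error reported above; the second always yields the range-style reply, so the caller's result type stays the same regardless of the limit passed in.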

@mp911de mp911de changed the title io.lettuce.core.RedisCommandExecutionException: ERR syntax error in ReactiveRedisOperations.opsForStream().pending() xpending(K, Consumer, Range, Limit) fails with ERR syntax error using Limit.unlimited() Jun 8, 2020
mp911de added a commit that referenced this issue Jun 8, 2020
xpending with Range and Limit aims for a range format of the XPENDING command output. Previously, calling this method without Limit appended just the consumer name which is not a valid combination for that command. We now apply a synthetic limit of Long.MAX_VALUE to obtain the range format to avoid format changes or syntax errors.
@mp911de mp911de added this to the 5.3.1 milestone Jun 8, 2020
@mp911de mp911de closed this as completed Jun 8, 2020
mp911de added a commit that referenced this issue Jun 19, 2020