Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TransactionCommand applied incorrectly #1625

Closed
checky opened this issue Feb 15, 2021 · 8 comments
Closed

TransactionCommand applied incorrectly #1625

checky opened this issue Feb 15, 2021 · 8 comments
Labels
type: bug A general bug
Milestone

Comments

@checky
Copy link

checky commented Feb 15, 2021

Bug Report

Current Behavior

During load testing, I found that some commands throw RedisCommandTimeoutException. When I checked the DEBUG log in io.lettuce.core.protocol, I found that the transaction was applied to an incorrect command.

Below is the log. The transactional commands of the function1 are processed as SubscriptionCommand. The first "WATCH" command of the function2 is processed as TransactionalCommand. Even though the "WATCH" command received response from redis server, it throws the RedisCommandTimeoutException.

2021-02-05 02:53:37,709 DEBUG [my-service-executor-2] DefaultEndpoint: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, epid=0x49] write() writeAndFlush command SubscriptionCommand [type=MULTI, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-02-05 02:53:37,709 DEBUG [lettuce-epollEventLoop-4-1] CommandHandler: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, chid=0x49] write(ctx, SubscriptionCommand [type=MULTI, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], promise)
2021-02-05 02:53:37,709 DEBUG [lettuce-epollEventLoop-4-1] CommandEncoder: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379] writing command SubscriptionCommand [type=MULTI, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-02-05 02:53:37,710 DEBUG [lettuce-epollEventLoop-4-1] CommandHandler: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, chid=0x49] Received: 5 bytes, 1 commands in the stack
2021-02-05 02:53:37,710 DEBUG [lettuce-epollEventLoop-4-1] CommandHandler: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, chid=0x49] Stack contains: 1 commands
2021-02-05 02:53:37,710 DEBUG [lettuce-epollEventLoop-4-1] DefaultEndpoint: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, epid=0x49] write() writeAndFlush command SubscriptionCommand [type=HSET, output=BooleanOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-02-05 02:53:37,710 DEBUG [lettuce-epollEventLoop-4-1] CommandHandler: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, chid=0x49] write(ctx, SubscriptionCommand [type=HSET, output=BooleanOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], promise)
2021-02-05 02:53:37,710 DEBUG [lettuce-epollEventLoop-4-1] CommandEncoder: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379] writing command SubscriptionCommand [type=HSET, output=BooleanOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-02-05 02:53:37,710 DEBUG [lettuce-epollEventLoop-4-1] DefaultEndpoint: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, epid=0x49] write() done
2021-02-05 02:53:37,710 DEBUG [lettuce-epollEventLoop-4-1] DefaultEndpoint: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, epid=0x49] write() writeAndFlush command SubscriptionCommand [type=EXPIRE, output=BooleanOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-02-05 02:53:37,710 DEBUG [lettuce-epollEventLoop-4-1] CommandHandler: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, chid=0x49] write(ctx, SubscriptionCommand [type=EXPIRE, output=BooleanOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], promise)
2021-02-05 02:53:37,710 DEBUG [lettuce-epollEventLoop-4-1] CommandEncoder: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379] writing command SubscriptionCommand [type=EXPIRE, output=BooleanOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-02-05 02:53:37,710 DEBUG [lettuce-epollEventLoop-4-1] DefaultEndpoint: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, epid=0x49] write() done
2021-02-05 02:53:37,710 DEBUG [lettuce-epollEventLoop-4-1] DefaultEndpoint: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, epid=0x49] write() writeAndFlush command SubscriptionCommand [type=EXPIRE, output=BooleanOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-02-05 02:53:37,710 DEBUG [lettuce-epollEventLoop-4-1] CommandHandler: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, chid=0x49] write(ctx, SubscriptionCommand [type=EXPIRE, output=BooleanOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], promise)
2021-02-05 02:53:37,710 DEBUG [lettuce-epollEventLoop-4-1] CommandEncoder: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379] writing command SubscriptionCommand [type=EXPIRE, output=BooleanOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-02-05 02:53:37,710 DEBUG [lettuce-epollEventLoop-4-1] DefaultEndpoint: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, epid=0x49] write() done
2021-02-05 02:53:37,710 DEBUG [lettuce-epollEventLoop-4-1] DefaultEndpoint: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, epid=0x49] write() writeAndFlush command SubscriptionCommand [type=EXEC, output=MultiOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-02-05 02:53:37,710 DEBUG [lettuce-epollEventLoop-4-1] CommandHandler: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, chid=0x49] write(ctx, SubscriptionCommand [type=EXEC, output=MultiOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], promise)
2021-02-05 02:53:37,710 DEBUG [lettuce-epollEventLoop-4-1] CommandEncoder: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379] writing command SubscriptionCommand [type=EXEC, output=MultiOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-02-05 02:53:37,710 DEBUG [lettuce-epollEventLoop-4-1] DefaultEndpoint: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, epid=0x49] write() done
2021-02-05 02:53:37,710 DEBUG [my-service-executor-2] DefaultEndpoint: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, epid=0x49] write() done
2021-02-05 02:53:37,711 DEBUG [lettuce-epollEventLoop-4-1] CommandHandler: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, chid=0x49] Received: 9 bytes, 4 commands in the stack
2021-02-05 02:53:37,711 DEBUG [lettuce-epollEventLoop-4-1] CommandHandler: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, chid=0x49] Stack contains: 4 commands
2021-02-05 02:53:37,712 DEBUG [lettuce-epollEventLoop-4-1] CommandHandler: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, chid=0x49] Received: 34 bytes, 3 commands in the stack
2021-02-05 02:53:37,712 DEBUG [lettuce-epollEventLoop-4-1] CommandHandler: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, chid=0x49] Stack contains: 3 commands
2021-02-05 02:53:37,712 DEBUG [lettuce-epollEventLoop-4-1] CommandHandler: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, chid=0x49] Stack contains: 2 commands
2021-02-05 02:53:37,712 DEBUG [lettuce-epollEventLoop-4-1] CommandHandler: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, chid=0x49] Stack contains: 1 commands
2021-02-05 02:53:37,839 DEBUG [my-service-executor-2] DefaultEndpoint: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, epid=0x49] write() writeAndFlush command TransactionalCommand [type=WATCH, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.RedisPublisher$SubscriptionCommand]
2021-02-05 02:53:37,839 DEBUG [lettuce-epollEventLoop-4-1] CommandHandler: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, chid=0x49] write(ctx, TransactionalCommand [type=WATCH, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.RedisPublisher$SubscriptionCommand], promise)
2021-02-05 02:53:37,839 DEBUG [lettuce-epollEventLoop-4-1] CommandEncoder: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379] writing command TransactionalCommand [type=WATCH, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.RedisPublisher$SubscriptionCommand]
2021-02-05 02:53:37,839 DEBUG [my-service-executor-2] DefaultEndpoint: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, epid=0x49] write() done
2021-02-05 02:53:37,840 DEBUG [lettuce-epollEventLoop-4-1] CommandHandler: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, chid=0x49] Received: 5 bytes, 1 commands in the stack
2021-02-05 02:53:37,840 DEBUG [lettuce-epollEventLoop-4-1] CommandHandler: [channel=0xa0e0cdc5, /172.31.25.155:59746 -> 172.31.11.187/172.31.11.187:6379, chid=0x49] Stack contains: 1 commands

This issue occurs when the caller thread of the "MULTI" command runs delayed the StatefulRedisConnectionImpl.dispatch (RedisCommand <K, V, T> command)'s finally statement. You can see the "write() done" of the MULTI command logged after the "EXEC" command is done. (focus on the thread name of the log)

    @Override
    public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {

        RedisCommand<K, V, T> toSend = preProcessCommand(command);

        try {
            return super.dispatch(toSend);
        } finally {
            if (command.getType().name().equals(MULTI.name())) {
                multi = (multi == null ? new MultiOutput<>(codec) : multi);
            }
        }
    }

Input Code

function1

return reactiveCommands.multi()
              .flatMap(multiResult -> {
                reactiveCommands.hset(sessionInventoryRedisKey, String.valueOf(sessionKey), sessionInventoryItem.toJson()).subscribe();
                reactiveCommands.expire(sessionInventoryRedisKey, maxSessionExpireSeconds).subscribe();
                reactiveCommands.expire(sessionRedisKey, sessionExpireSeconds).subscribe();
                return reactiveCommands.exec();
              })
              .doOnSuccess(transactionResult -> {
                // do something..
              }

function2

return reactiveCommands.watch(sessionInventoryRedisKey)
              .flatMap(watchResult -> reactiveCommands.hgetall(sessionInventoryRedisKey))
              .doOnSuccess(result -> {
                // do something..
              }

Expected behavior/code

The commands that come after the "MULTI" command must be processed by TransactionalCommand regardless of the thread order.

Environment

  • Lettuce version(s): 5.3.6.RELEASE
  • Redis version: 5.0.4

Possible Solution

How about set multi field before run super.dispatch(sentCommands); in StatefulRedisConnectionImpl.dispatch (RedisCommand <K, V, T> command).

    @Override
    public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {

        RedisCommand<K, V, T> toSend = preProcessCommand(command);

        if (command.getType().name().equals(MULTI.name())) {
            multi = (multi == null ? new MultiOutput<>(codec) : multi);
        }

        return super.dispatch(toSend);
    }
@mp911de
Copy link
Collaborator

mp911de commented Mar 4, 2021

Are you using dedicated connections for each transaction or are you sharing a single connection across multiple concurrent Reactive Streams subscriptions?

@mp911de mp911de added the status: waiting-for-feedback We need additional information before we can continue label Mar 4, 2021
@checky
Copy link
Author

checky commented Mar 4, 2021

I'm using dedicated connections for each transaction.
Although omitted in the previous code, every APIs that use transactions have the following code structure.

return this.redisConnectionManager.getConnectionForTransaction(userId)
        .flatMap(pooledRedisConnection -> {
          RedisReactiveCommands<String, String> reactiveCommands = pooledRedisConnection.reactive();
          return reactiveCommands.watch(sessionInventoryRedisKey)
              .flatMap(watchResult -> reactiveCommands.hgetall(sessionInventoryRedisKey))
              // ... do something ...

This problem doesn't happen often. It mainly occurs in high cpu utilization > 80% while executing transactions more than 2000 tps.

@mp911de
Copy link
Collaborator

mp911de commented Mar 5, 2021

Interestingly, the multi state isn't activate after sending the MULTI command. Did you try locally moving the multi block to before calling super.dispatch? Each approach is error prone to different usages or error scenarios. Enabling multi before sending the command can leave the connection in multi state although the command was rejected.

@checky
Copy link
Author

checky commented Mar 5, 2021

Do you mean, did I have tried the "Possible Solution" that i suggested by myself? If the question is that, the answer is "no". Sorry. I am very beginner, so I haven't tried modifying myself yet.
I saw public Collection<RedisCommand<K, V, ?>> dispatch(Collection<? extends RedisCommand<K, V, ?>> commands) method. That method has set multi state before super.dispatch(sentCommands). So, I thought it can be possible to modify similarly. Below is that method.

    @Override
    public Collection<RedisCommand<K, V, ?>> dispatch(Collection<? extends RedisCommand<K, V, ?>> commands) {

        List<RedisCommand<K, V, ?>> sentCommands = new ArrayList<>(commands.size());

        commands.forEach(o -> {
            RedisCommand<K, V, ?> command = preProcessCommand(o);

            sentCommands.add(command);
            if (command.getType().name().equals(MULTI.name())) {
                multi = (multi == null ? new MultiOutput<>(codec) : multi);
            }
        });

        return super.dispatch(sentCommands);
    }

If the suggested solution has a problem with an error scenario, how about setting the multi state via attachOnComplete in the preProcessCommand method? If you set the multi state of the connection before propagated to my code's onComplete(), the race condition problem is gone.

@mp911de
Copy link
Collaborator

mp911de commented Mar 5, 2021

the issue with attachOnComplete is that you then no longer can make use of pipelining because onComplete happens when Redis responds which requires awaiting completion of MULTI. Right now, we enable output collection via multi without the need to await a Redis response. That allows triggering commands right away using e.g. the async API.

@checky
Copy link
Author

checky commented Mar 8, 2021

OK, I understood that using attachOnComplete is not possible solution.

Today I tried to modify lettuce-core souce code, and applied it to my application manually. When I move the multi block to before calling super.dispatch, the problem was not reproduced. The solution works well for me. Maybe this modification can make side effect. But even now, the multi block is in the finally statement. Even if errors occured in the super.dispatch, the multi block will be executed. So I thought the additional issue - leave the connection in multi state although the command was rejected - is already exists, and should be treated as separate issue.

@mp911de
Copy link
Collaborator

mp911de commented Mar 8, 2021

Thanks for the update. Do you want to submit a pull request?

@mp911de
Copy link
Collaborator

mp911de commented Mar 8, 2021

I filed #1650 as follow up.

checky pushed a commit to checky/lettuce-core that referenced this issue Mar 9, 2021
mp911de pushed a commit that referenced this issue Mar 11, 2021
mp911de pushed a commit that referenced this issue Mar 11, 2021
@mp911de mp911de added type: bug A general bug and removed status: waiting-for-feedback We need additional information before we can continue labels Mar 11, 2021
@mp911de mp911de added this to the 6.0.3 milestone Mar 11, 2021
@mp911de mp911de closed this as completed Mar 11, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type: bug A general bug
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants