Race condition in RedisPublisher DEMAND.request() and DEMAND.onDataAvailable() #634

Closed
mayamoon opened this issue Oct 24, 2017 · 16 comments
Labels: type: bug A general bug
Milestone: Lettuce 5.0.1

Comments

@mayamoon

mayamoon commented Oct 24, 2017

I wrote some very simple code to test the Lettuce 5.0 Redis Cluster client:

public static void main(String[] args) throws InterruptedException {
    ClientResources res = DefaultClientResources.builder()
            .ioThreadPoolSize(4)
            .computationThreadPoolSize(4)
            .build();
    RedisClusterClient client = RedisClusterClient.create(res, Arrays.asList(
            RedisURI.create("redis://172.18.211.189:8001"),
            RedisURI.create("redis://172.18.211.189:8002"),
            RedisURI.create("redis://172.18.211.189:8003")));
    StatefulRedisClusterConnection<String, String> connect = client.connect();
    while (true) {
        Flux<String> keys = connect.reactive().keys("*");
        keys.count()
            .log()
            .doOnCancel(() -> {
                System.out.println("cancel");
                System.out.println(Thread.currentThread().getId());
            })
            .subscribe((Long s) -> {
                System.out.println(s);
                System.out.println(Thread.currentThread().getId());
            });
        keys.blockLast();
    }
}

It prints the count three times, then blocks and never produces any further output. I am not very good at Java, please help me fix this problem.

=========Log data==============================================
Oct 24, 2017 4:15:01 PM reactor.util.Loggers$JdkLogger info
INFO: | onSubscribe(reactor.core.publisher.MonoCount$CountSubscriber@20c0a64d)
Oct 24, 2017 4:15:01 PM reactor.util.Loggers$JdkLogger info
INFO: | request(unbounded)
cancel
15
5230
15
Oct 24, 2017 4:15:01 PM reactor.util.Loggers$JdkLogger info
INFO: | onNext(5230)
Oct 24, 2017 4:15:01 PM reactor.util.Loggers$JdkLogger info
INFO: | cancel()
Oct 24, 2017 4:15:01 PM reactor.util.Loggers$JdkLogger info
INFO: | onSubscribe(reactor.core.publisher.MonoCount$CountSubscriber@4b213651)
Oct 24, 2017 4:15:01 PM reactor.util.Loggers$JdkLogger info
INFO: | request(unbounded)
Oct 24, 2017 4:15:01 PM reactor.util.Loggers$JdkLogger info
INFO: | onNext(5230)
Oct 24, 2017 4:15:01 PM reactor.util.Loggers$JdkLogger info
INFO: | cancel()
cancel
15
5230
15
Oct 24, 2017 4:15:01 PM reactor.util.Loggers$JdkLogger info
INFO: | onSubscribe(reactor.core.publisher.MonoCount$CountSubscriber@4241e0f4)
Oct 24, 2017 4:15:01 PM reactor.util.Loggers$JdkLogger info
INFO: | request(unbounded)
Oct 24, 2017 4:15:01 PM reactor.util.Loggers$JdkLogger info
INFO: | onNext(5230)
Oct 24, 2017 4:15:01 PM reactor.util.Loggers$JdkLogger info
INFO: | cancel()
cancel
17
5230
17

@mp911de
Collaborator

mp911de commented Oct 24, 2017

Please learn how to properly format code and logs.

The attached Gist is no longer accessible.

@mp911de mp911de added the status: waiting-for-feedback We need additional information before we can continue label Oct 24, 2017
@mayamoon
Author

mayamoon commented Oct 24, 2017

@mp911de, thank you for replying. I changed my code to the following:

public class RedisApplication {
    public static void main(String[] args) throws InterruptedException {
        ClientResources res = DefaultClientResources.builder()
                .ioThreadPoolSize(4)
                .computationThreadPoolSize(4)
                .build();
        RedisClusterClient client = RedisClusterClient.create(res, Arrays.asList(
                RedisURI.create("redis://172.18.211.189:8001"),
                RedisURI.create("redis://172.18.211.189:8002"),
                RedisURI.create("redis://172.18.211.189:8003")));
        StatefulRedisClusterConnection<String, String> connect = client.connect();
        while (true) {
            RedisAdvancedClusterReactiveCommands<String, String> reactive = connect.reactive();
            Flux<String> keys = reactive.keys("urn:XBS:Hotel:CompanyBasicInfo*");
            Long count = keys
                    .count()
                    .blockMillis(50000);
            System.out.println(count);
        }
    }
}

The application blocked and never got the value back.
After the timeout I got an exception:

Exception in thread "main" java.lang.IllegalStateException: Timeout on blocking read
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:107)
	at reactor.core.publisher.Mono.blockMillis(Mono.java:1172)
	at RedisApplication.main(RedisApplication.java:44)

I debugged the application and took a thread dump:

"Attach Listener@2138" daemon prio=5 tid=0x5 nid=NA runnable
  java.lang.Thread.State: RUNNABLE

"Finalizer@2140" daemon prio=8 tid=0x3 nid=NA waiting
  java.lang.Thread.State: WAITING
	  at java.lang.Object.wait(Object.java:-1)
	  at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
	  at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
	  at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"lettuce-nioEventLoop-4-1@1779" daemon prio=5 tid=0xf nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll0(WindowsSelectorImpl.java:-1)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll(WindowsSelectorImpl.java:296)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.access$400(WindowsSelectorImpl.java:278)
	  at sun.nio.ch.WindowsSelectorImpl.doSelect(WindowsSelectorImpl.java:159)
	  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	  - locked <0x867> (a sun.nio.ch.WindowsSelectorImpl)
	  - locked <0x868> (a java.util.Collections$UnmodifiableSet)
	  - locked <0x869> (a io.netty.channel.nio.SelectedSelectionKeySet)
	  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	  at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:759)
	  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:400)
	  at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
	  at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	  at java.lang.Thread.run(Thread.java:748)

"lettuce-nioEventLoop-4-2@1933" daemon prio=5 tid=0x11 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll0(WindowsSelectorImpl.java:-1)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll(WindowsSelectorImpl.java:296)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.access$400(WindowsSelectorImpl.java:278)
	  at sun.nio.ch.WindowsSelectorImpl.doSelect(WindowsSelectorImpl.java:159)
	  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	  - locked <0x864> (a sun.nio.ch.WindowsSelectorImpl)
	  - locked <0x865> (a java.util.Collections$UnmodifiableSet)
	  - locked <0x866> (a io.netty.channel.nio.SelectedSelectionKeySet)
	  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	  at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:759)
	  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:400)
	  at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
	  at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	  at java.lang.Thread.run(Thread.java:748)

"lettuce-nioEventLoop-4-3@1935" daemon prio=5 tid=0x12 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll0(WindowsSelectorImpl.java:-1)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll(WindowsSelectorImpl.java:296)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.access$400(WindowsSelectorImpl.java:278)
	  at sun.nio.ch.WindowsSelectorImpl.doSelect(WindowsSelectorImpl.java:159)
	  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	  - locked <0x861> (a sun.nio.ch.WindowsSelectorImpl)
	  - locked <0x862> (a java.util.Collections$UnmodifiableSet)
	  - locked <0x863> (a io.netty.channel.nio.SelectedSelectionKeySet)
	  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	  at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:759)
	  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:400)
	  at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
	  at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	  at java.lang.Thread.run(Thread.java:748)

"lettuce-nioEventLoop-4-4@2029" daemon prio=5 tid=0x13 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll0(WindowsSelectorImpl.java:-1)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll(WindowsSelectorImpl.java:296)
	  at sun.nio.ch.WindowsSelectorImpl$SubSelector.access$400(WindowsSelectorImpl.java:278)
	  at sun.nio.ch.WindowsSelectorImpl.doSelect(WindowsSelectorImpl.java:159)
	  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	  - locked <0x85e> (a sun.nio.ch.WindowsSelectorImpl)
	  - locked <0x85f> (a java.util.Collections$UnmodifiableSet)
	  - locked <0x860> (a io.netty.channel.nio.SelectedSelectionKeySet)
	  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	  at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:759)
	  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:400)
	  at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
	  at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	  at java.lang.Thread.run(Thread.java:748)

"main@1" prio=5 tid=0x1 nid=NA waiting
  java.lang.Thread.State: WAITING
	  at sun.misc.Unsafe.park(Unsafe.java:-1)
	  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	  at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
	  at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
	  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
	  at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:105)
	  at reactor.core.publisher.Mono.blockMillis(Mono.java:1172)
	  at RedisApplication.main(RedisApplication.java:44)

"Reference Handler@2141" daemon prio=10 tid=0x2 nid=NA waiting
  java.lang.Thread.State: WAITING
	  at java.lang.Object.wait(Object.java:-1)
	  at java.lang.Object.wait(Object.java:502)
	  at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
	  at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"Signal Dispatcher@2139" daemon prio=9 tid=0x4 nid=NA runnable
  java.lang.Thread.State: RUNNABLE

"threadDeathWatcher-5-1@1825" daemon prio=1 tid=0x10 nid=NA sleeping
  java.lang.Thread.State: TIMED_WAITING
	  at java.lang.Thread.sleep(Thread.java:-1)
	  at io.netty.util.ThreadDeathWatcher$Watcher.run(ThreadDeathWatcher.java:150)
	  at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	  at java.lang.Thread.run(Thread.java:748)

@mp911de
Collaborator

mp911de commented Oct 24, 2017

Several points come to my attention:

  • The reactive API isn't the right place to start if you're not yet familiar with Java. Use the synchronous API first to get familiar with how things work; reactive APIs come with a higher level of complexity.
  • Mono.blockMillis(…) was removed in Project Reactor 3.1, which indicates you're using an outdated version of Reactor. Please make sure to update to Reactor 3.1.0.RELEASE or 3.1.1.RELEASE.
  • Your previous log shows cancels without any further details. It would make sense to set a breakpoint inside the .doOnCancel(…) lambda and capture the stack trace to see why the subscription gets canceled.
  • In your first code sample, you're subscribing to the reactive publisher twice: once with .subscribe(…) and once with .blockLast(). The two subscriptions run the command individually and the two invocations are not related to each other, i.e. .blockLast() does not affect or synchronize the previous subscription; see the sketch below.
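
For illustration, here is a minimal sketch of the same count with a single subscription. It assumes the connect variable from the first sample and that java.time.Duration is imported; block(Duration) is the Reactor 3.1 replacement for the removed blockMillis(…):

// Single subscription: block(...) subscribes and waits for the result itself,
// so no separate subscribe(...) call is needed.
// Assumes `connect` is the StatefulRedisClusterConnection from the sample above.
Flux<String> keys = connect.reactive().keys("*");
Long count = keys.count()                                  // Mono<Long>
        .doOnCancel(() -> System.out.println("cancel"))
        .block(Duration.ofSeconds(50));                    // blocking subscription with a timeout
System.out.println(count);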

@mayamoon
Author

mayamoon commented Oct 25, 2017

@mp911de
I updated my pom.xml dependencies:

<dependencies>
    <dependency>
        <groupId>io.lettuce</groupId>
        <artifactId>lettuce-core</artifactId>
        <version>5.0.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.1.1.RELEASE</version>
    </dependency>
</dependencies>

and added logging inside the reactive pipeline:

ClientResources res = DefaultClientResources.builder()
        .ioThreadPoolSize(4)
        .computationThreadPoolSize(4)
        .build();
RedisClusterClient client = RedisClusterClient.create(res, Arrays.asList(
        RedisURI.create("redis://172.18.21.189:8001"),
        RedisURI.create("redis://172.18.21.189:8002"),
        RedisURI.create("redis://172.18.21.189:8003")));
StatefulRedisClusterConnection<String, String> connect = client.connect();
while (true) {
    RedisAdvancedClusterReactiveCommands<String, String> reactive = connect.reactive();
    Flux<String> keys = reactive.keys("urn:PBS:Hotel:CtripBasicInfo*");
    Long count = keys
            .count()
            .log()
            .block();
    System.out.println(count);
}

and got the following log:

[DEBUG] (main) Using Console logging
[DEBUG] (main) Starting UnsafeSupport init in Java 1.8
[TRACE] (main) sun.misc.Unsafe.theUnsafe ok
[TRACE] (main) sun.misc.Unsafe.copyMemory ok
[TRACE] (main) java.nio.Buffer.address ok
[DEBUG] (main) Unsafe is available
[ INFO] (main) | onSubscribe([Fuseable] MonoCount.CountSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (lettuce-nioEventLoop-4-1) | onNext(2786)
2786
[ INFO] (lettuce-nioEventLoop-4-1) | onComplete()
[ INFO] (main) | onSubscribe([Fuseable] MonoCount.CountSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (lettuce-nioEventLoop-4-1) | onNext(2786)
[ INFO] (lettuce-nioEventLoop-4-1) | onComplete()
2786
[ INFO] (main) | onSubscribe([Fuseable] MonoCount.CountSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (lettuce-nioEventLoop-4-2) | onNext(2786)
[ INFO] (lettuce-nioEventLoop-4-2) | onComplete()
2786
[ INFO] (main) | onSubscribe([Fuseable] MonoCount.CountSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (lettuce-nioEventLoop-4-4) | onNext(2786)
[ INFO] (lettuce-nioEventLoop-4-4) | onComplete()
2786
[ INFO] (main) | onSubscribe([Fuseable] MonoCount.CountSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (lettuce-nioEventLoop-4-1) | onNext(2786)
[ INFO] (lettuce-nioEventLoop-4-1) | onComplete()
2786
[ INFO] (main) | onSubscribe([Fuseable] MonoCount.CountSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (lettuce-nioEventLoop-4-2) | onNext(2786)
[ INFO] (lettuce-nioEventLoop-4-2) | onComplete()
2786
[ INFO] (main) | onSubscribe([Fuseable] MonoCount.CountSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (lettuce-nioEventLoop-4-2) | onNext(2786)
[ INFO] (lettuce-nioEventLoop-4-2) | onComplete()
2786
[ INFO] (main) | onSubscribe([Fuseable] MonoCount.CountSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (lettuce-nioEventLoop-4-1) | onNext(2786)
2786
[ INFO] (lettuce-nioEventLoop-4-1) | onComplete()
[ INFO] (main) | onSubscribe([Fuseable] MonoCount.CountSubscriber)
[ INFO] (main) | request(unbounded)

After 2786 is printed for the 8th time, the application hangs.

@mayamoon
Author

I configured the logger to print detailed info:
Oct 25, 2017 11:23:30 AM io.lettuce.core.RedisPublisher$RedisSubscription onDataAvailable
FINEST: DEMAND onDataAvailable()
Oct 25, 2017 11:23:30 AM io.lettuce.core.RedisPublisher$RedisSubscription onDataAvailable
FINEST: DEMAND onDataAvailable()
Oct 25, 2017 11:23:30 AM io.lettuce.core.RedisPublisher$RedisSubscription onDataAvailable
FINEST: DEMAND onDataAvailable()
Oct 25, 2017 11:23:30 AM io.lettuce.core.RedisPublisher$RedisSubscription onDataAvailable
FINEST: DEMAND onDataAvailable()
Oct 25, 2017 11:23:30 AM io.lettuce.core.RedisPublisher$RedisSubscription onDataAvailable
FINEST: DEMAND onDataAvailable()
Oct 25, 2017 11:23:30 AM io.lettuce.core.RedisPublisher$RedisSubscription onDataAvailable
FINEST: DEMAND onDataAvailable()
Oct 25, 2017 11:23:30 AM io.lettuce.core.protocol.RedisStateMachine decode
FINE: Decoded SubscriptionCommand [type=KEYS, output=KeyListOutput [output=[], error='null'], commandType=io.lettuce.core.protocol.Command], empty stack: true
Oct 25, 2017 11:23:30 AM io.lettuce.core.RedisPublisher$RedisSubscription onAllDataRead
FINEST: DEMAND onAllDataRead()

@mp911de mp911de added type: bug A general bug and removed status: waiting-for-feedback We need additional information before we can continue labels Oct 26, 2017
@mp911de
Collaborator

mp911de commented Oct 26, 2017

Thanks a lot. With enough keys in Redis I'm able to reproduce the issue.

@mp911de mp911de added this to the Lettuce 5.0.1 milestone Oct 26, 2017
@mp911de mp911de changed the title Reactive Keys Command DeadLock Race condition in RedisPublisher DEMAND.request() and DEMAND.onDataAvailable() Oct 26, 2017
@mp911de
Collaborator

mp911de commented Oct 26, 2017

I found the cause of the issue: It's a race condition between DEMAND.onDataAvailable(…) and DEMAND.request(…).

It's possible for two concurrent threads, one processing onDataAvailable() (a) and the other issuing a request(n) (b), to run into a race:

  • (b) invokes request(…) in the DEMAND state.
  • (a) sees there is no further demand and sets the state to NO_DEMAND
  • (b) performs the actual increment of demand
  • (a) completes without further emission
  • (b) starves, no further emission

The fix is to introduce checks in both places (see the sketch below):

  • after registering demand in the DEMAND state
  • after processing onDataAvailable() in the DEMAND state
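
A minimal sketch of the double-check idea follows. The names and the heavily simplified state machine are hypothetical, not the actual RedisPublisher implementation:

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Simplified illustration of the double-check described above; hypothetical
// names, not lettuce's actual code.
class DemandRaceSketch {

    enum State { NO_DEMAND, DEMAND }

    private final AtomicReference<State> state = new AtomicReference<>(State.NO_DEMAND);
    private final AtomicLong demand = new AtomicLong();

    // (b) the requesting side
    void request(long n) {
        demand.addAndGet(n);
        // Re-check after registering demand: the emitting side may have just
        // switched to NO_DEMAND, so take over emission if that happened.
        if (state.compareAndSet(State.NO_DEMAND, State.DEMAND)) {
            emit();
        }
    }

    // (a) the emitting side, running in the DEMAND state
    void onDataAvailable() {
        emit();
        state.set(State.NO_DEMAND);
        // Re-check after the state transition: demand may have been registered
        // concurrently while this thread was deciding to stop.
        if (demand.get() > 0 && state.compareAndSet(State.NO_DEMAND, State.DEMAND)) {
            emit();
        }
    }

    private void emit() {
        // drain buffered elements while demand > 0 (omitted for brevity)
    }
}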

mp911de added a commit that referenced this issue Oct 26, 2017
This commit fixes a race condition between two concurrent threads calling DEMAND.request(…) and DEMAND.onDataAvailable(…), which resulted in registered demand but no further emission. The publishing part now checks whether new demand was registered after the state transition, and the requesting thread checks for a state transition after registering demand. If one of the newly introduced conditions yields true, the outstanding signals are processed.

Three key components create the race:

* the command is completed
* the requesting thread does not pull because of the DEMAND state
* the emitting thread completes prematurely

The race is possible because the requesting thread calls request(…) in the DEMAND state after the publishing thread has determined there is no further demand. The publishing side did not check whether demand was registered after its state transition to NO_DEMAND, and the requesting code did not check whether a state transition had occurred in the meantime, so the publisher never completed.
mp911de added a commit that referenced this issue Oct 26, 2017
This commit fixes a race condition between two concurrent threads calling DEMAND.request(…) and DEMAND.onDataAvailable(…), which resulted in registered demand but no further emission. The publishing part now checks whether new demand was registered after the state transition, and the requesting thread checks for a state transition after registering demand. If one of the newly introduced conditions yields true, the outstanding signals are processed.

Three key components create the race:

* the command is completed
* the requesting thread does not pull because of the DEMAND state
* the emitting thread completes prematurely

The race is possible because the requesting thread calls request(…) in the DEMAND state after the publishing thread has determined there is no further demand. The publishing side did not check whether demand was registered after its state transition to NO_DEMAND, and the requesting code did not check whether a state transition had occurred in the meantime, so the publisher never completed.
@mp911de
Collaborator

mp911de commented Oct 26, 2017

Looks like the change fixed the issue only partially...

@mp911de
Collaborator

mp911de commented Oct 26, 2017

I fixed the issue in 5.0.1.BUILD-SNAPSHOT. Care to give it a try?

<dependency>
  <groupId>io.lettuce</groupId>
  <artifactId>lettuce-core</artifactId>
  <version>5.0.1.BUILD-SNAPSHOT</version>
</dependency>

<repositories>
  <repository>
    <id>sonatype-nexus-snapshots</id>
    <url>http://oss.sonatype.org/content/repositories/snapshots</url>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

mp911de added a commit that referenced this issue Oct 26, 2017
Fix completion when another thread concurrently signals demand, moving the state from NO_DEMAND to DEMAND, while a completion occurs at the same time. Because completion checked against the previously observed state to perform the state change only once, it compared against a stale state and therefore never succeeded in that case.

Completion now executes in any state as long as the queue is empty.
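
A hypothetical sketch of the completion guard described in this commit, illustrative only and not lettuce's actual code:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only: completion retries from whatever the current state is,
// instead of comparing against a state snapshot that a concurrent request(n)
// may already have moved from NO_DEMAND to DEMAND.
class CompletionSketch<T> {

    enum State { NO_DEMAND, DEMAND, COMPLETED }

    private final AtomicReference<State> state = new AtomicReference<>(State.NO_DEMAND);
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();

    void onAllDataRead(Runnable onComplete) {
        // Complete in any state as long as there is nothing left to emit.
        while (queue.isEmpty()) {
            State current = state.get();
            if (current == State.COMPLETED) {
                return;
            }
            if (state.compareAndSet(current, State.COMPLETED)) {
                onComplete.run(); // e.g. subscriber.onComplete()
                return;
            }
            // Lost the CAS to a concurrent state change; re-read and retry.
        }
    }
}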
mp911de added a commit that referenced this issue Oct 26, 2017
Replace AtomicReference with AtomicReferenceFieldUpdater.
mp911de added a commit that referenced this issue Oct 26, 2017
Fix completion when another thread concurrently signals demand, moving the state from NO_DEMAND to DEMAND, while a completion occurs at the same time. Because completion checked against the previously observed state to perform the state change only once, it compared against a stale state and therefore never succeeded in that case.

Completion now executes in any state as long as the queue is empty.
mp911de added a commit that referenced this issue Oct 26, 2017
Replace AtomicReference with AtomicReferenceFieldUpdater.
mp911de added a commit that referenced this issue Oct 26, 2017
RedisPublisher now considers an empty subscription buffer as demand to continue reading from the transport. Reading prefetches data and ensures the publisher has enough data buffered to satisfy demand from the buffer. Reading from the transport also ensures that the command completion is read, since the completion does not count toward the actual demand.
mp911de added a commit that referenced this issue Oct 26, 2017
RedisPublisher now considers an empty subscription buffer as demand to continue reading from the transport. Reading prefetches data and ensures the publisher has enough data buffered to satisfy demand from the buffer. Reading from the transport also ensures that the command completion is read, since the completion does not count toward the actual demand.
@mayamoon
Author

mayamoon commented Oct 27, 2017

I already tried 5.0.1.BUILD-SNAPSHOT; unfortunately, the issue still occurs when I comment out my detailed logger configuration. It looks like enabling logging can mitigate the situation. My OS is Windows Server 2008 Enterprise, JDK 9.0.

@mp911de
Collaborator

mp911de commented Oct 27, 2017

Enabling logging changes the timing. The fixed issues were caused by concurrency. Do you know the snapshot version (e.g. lettuce-core-5.0.1.BUILD-20171026.194537-22.jar) that you've used?

The issue was caused by multiple, individual problems. I fixed some of these after my previous comment.

@mayamoon
Author

@mp911de my snapshot version is lettuce-core-5.0.1.BUILD-20171026.194537-22.jar, the same as the one you mentioned.

@mp911de
Collaborator

mp911de commented Oct 30, 2017

I tried to reproduce the issue after the changes, but I'm no longer able to reproduce it with 5.0.1.

@mayamoon
Author

mayamoon commented Oct 31, 2017

@mp911de
Do you have enough data? I have a Redis cluster with three nodes containing 166,826 keys, and the issue is very easy to reproduce.
Please also try my sample code:

public class RedisApplication {
    public static void main(String[] args) throws Exception {
        // System.setProperty("java.util.logging.config.file", "C:\\Sources\\Java\\LettuceRedisTest\\src\\main\\resources\\logging.properties");

        ClientResources res = DefaultClientResources.builder()
                .ioThreadPoolSize(10)
                .computationThreadPoolSize(20)
                .build();
        final RedisClusterClient client = RedisClusterClient.create(res, Arrays.asList(
                RedisURI.create("redis://***.**.**.***:8001"),
                RedisURI.create("redis://***.**.**.***:8002"),
                RedisURI.create("redis://***.**.**.***:8003")));
        StatefulRedisClusterConnection<String, String> connect = client.connect();

        while (true) {

            RedisAdvancedClusterReactiveCommands<String, String> reactive = connect.reactive();
            Flux<String> keys = reactive.keys("*");
            Long count = keys
                    .count()
                    .log()
                    .block();

            System.out.println("get data:" + count);

            // System.in.read();
        }
    }
}

In my sample, the final output is:
[ INFO] (lettuce-nioEventLoop-4-9) | onComplete()
[ INFO] (main) | onSubscribe([Fuseable] MonoCount.CountSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (lettuce-nioEventLoop-4-8) | onNext(166827)
[ INFO] (lettuce-nioEventLoop-4-8) | onComplete()
get data:166827
[ INFO] (main) | onSubscribe([Fuseable] MonoCount.CountSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (lettuce-nioEventLoop-4-10) | onNext(166827)
get data:166827
[ INFO] (lettuce-nioEventLoop-4-10) | onComplete()
[ INFO] (main) | onSubscribe([Fuseable] MonoCount.CountSubscriber)
[ INFO] (main) | request(unbounded)

mp911de added a commit that referenced this issue Oct 31, 2017
Lettuce now sends a netty event if auto-read is disabled so that config changes stay within the channel thread. The auto-read state is always set based on whether the demand-aware sink has demand.
mp911de added a commit that referenced this issue Oct 31, 2017
Lettuce now sends a netty event if auto-read is disabled so that config changes stay within the channel thread. The auto-read state is always set based on whether the demand-aware sink has demand.
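
A hedged sketch of the demand-based auto-read toggle described in these commits, using standard netty APIs. Note that the commits describe a custom netty event; the sketch below simply hops onto the event loop to convey the same idea and is not lettuce's actual code:

import io.netty.channel.Channel;

// Illustrative only: keep auto-read changes on the channel thread and derive
// the auto-read flag from whether the demand-aware sink currently has demand.
final class AutoReadSketch {

    static void updateAutoRead(Channel channel, boolean hasDemand) {
        if (channel.eventLoop().inEventLoop()) {
            channel.config().setAutoRead(hasDemand);
        } else {
            // Hop onto the channel's event loop so the config change happens there.
            channel.eventLoop().execute(() -> channel.config().setAutoRead(hasDemand));
        }
    }

    private AutoReadSketch() {
    }
}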
@mp911de
Collaborator

mp911de commented Oct 31, 2017

Thanks for the hint about the amount of data. I created a bigger cluster with more data and was able to reproduce and fix the issue. A new build is available via lettuce-core-5.0.1.BUILD-20171031.102600-26-sources.jar.

@mp911de
Collaborator

mp911de commented Nov 16, 2017

Closing as this issue is solved and no longer reproducible.

@mp911de mp911de closed this as completed Nov 16, 2017