Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support asynchronous calls to redis #241

Closed
xetorthio opened this issue Dec 20, 2011 · 38 comments
Closed

support asynchronous calls to redis #241

xetorthio opened this issue Dec 20, 2011 · 38 comments

Comments

@xetorthio
Copy link
Contributor

No description provided.

@DivineTraube
Copy link

Is it likely to be available soon? I'm on the verge of implementing it myself as I need asynchronous calls for performance reasons.

@xetorthio
Copy link
Contributor Author

Hopefully it will be available very soon.
How fast do you need it?

On Tue, Aug 21, 2012 at 9:34 AM, DivineTraube [email protected]:

Is it likely to be available soon? I'm on the verge of implementing it
myself as I need asynchronous calls for performance reasons.


Reply to this email directly or view it on GitHubhttps://github.com//issues/241#issuecomment-7899502.

@DivineTraube
Copy link

That sounds good. I could wait for up to four weeks. Is it likely to be finished in that time frame?

@xetorthio
Copy link
Contributor Author

I think we should have something releasable next week. If you can test it
and give your feedback, that would be awesome!

On Tue, Aug 21, 2012 at 11:15 AM, DivineTraube [email protected]:

That sounds good. I could wait for up to four weeks. Is it likely to be
finished in that time frame?


Reply to this email directly or view it on GitHubhttps://github.com//issues/241#issuecomment-7902220.

@DivineTraube
Copy link

I will be happy to do that.

@DivineTraube
Copy link

Is it testable by now?

@samhendley
Copy link
Contributor

Hey DivineTraube, I did a simple async implementation, does that do what you are looking for?

https://github.com/samhendley/jedis/commits/version_tests

@subnetmarco
Copy link

Any updates?

@paulicka
Copy link

paulicka commented Apr 9, 2014

Anything new and exciting here with async jedis?
Please, please, please!
;-)

@sscarduzio
Copy link

👍

@sdeleuze
Copy link

With the recent release of Java 8,n being able to have async operations with Redis would be awesome. Any updates ?

@gdubicki
Copy link

Any progress on async jedis?

@HeartSaVioR
Copy link
Contributor

Hello all!

I have requested for switching Jedis IO to NIO. #647
When pull request is merged, we may really think about non-blocking operations using Java's NIO.
(There would be many hurdles to switch blocking to non-blocking, but we can do it eventually.)

Btw, I'm beginner of NIO and async, I wish to know your client code example for make decision of way of supporting async.
(making Future object, or calling callback function, etc.)
Note that Jedis should run from at least JDK 1.6.
Client code would be fine to use upper than JDK 1.6.

Thanks in advance!

@sdeleuze
Copy link

That's good news thanks !

For client code, you should have a look to https://github.com/sdeleuze/opensnap/tree/master/opensnap-server/src/main/java/opensnap
It currently uses MongoDB async java driver 3.0.

Since Jedis should run from at least JDK 1.6, providing a custom completable future is the best option I think. Please have a look to https://github.com/mongodb/mongo-java-driver/blob/3.0.x/driver/src/main/org/mongodb/MongoFuture.java and https://github.com/mongodb/mongo-java-driver/blob/3.0.x/driver/src/main/org/mongodb/connection/SingleResultCallback.java

Please note they use clearly separated classes (and even module) in order to avoid polluting synchronous one. java.util.concurrent.Future is not interesting to support since it only allow async blocking implementations.

On the API side, the main point is to use a callback with an interface with only 1 method in order to make it possible to use lambda for people using Java 8.

I will be happy to update my OpenSnap example with any alpha level release of async Jedis in order to send you feedbacks.

@xetorthio
Copy link
Contributor Author

Those are great tips! Thanks a lot! And yes, any feedback is super
important!

On Sun, May 25, 2014 at 7:52 PM, Sébastien Deleuze <[email protected]

wrote:

That's good news thanks !

For client code, you should have a look to
https://github.com/sdeleuze/opensnap/tree/master/opensnap-server/src/main/java/opensnap
It currently uses MongoDB async java driver 3.0.

Since Jedis should run from at least JDK 1.6, providing a custom
completable future is the best option I think. Please have a look to
https://github.com/mongodb/mongo-java-driver/blob/3.0.x/driver/src/main/org/mongodb/MongoFuture.javaand
https://github.com/mongodb/mongo-java-driver/blob/3.0.x/driver/src/main/org/mongodb/connection/SingleResultCallback.java

Please note they use clearly separated classes (and even module) in order
to avoid polluting synchronous one. java.util.concurrent.Future is not
interesting to support since it only allow async blocking implementations.

On the API side, the main point is to use a callback with an interface
with only 1 method in order to make it possible to use lambda for people
using Java 8.

I will be happy to update my OpenSnap example with any alpha level release
of async Jedis in order to send you feedbacks.


Reply to this email directly or view it on GitHubhttps://github.com//issues/241#issuecomment-44149205
.

@HeartSaVioR
Copy link
Contributor

Finally Jedis would have Async Jedis APIs by pulling #713 to master!
Please review and comment to #713 so we can improve #713 better.
Thanks for your patient!

@nurnabisiddique
Copy link

is it available?

@nurnabisiddique
Copy link

now jedis support async pubsub? any help?

@HeartSaVioR
Copy link
Contributor

@Nurnabi It isn't available and it doesn't support pubsub yet.
We're planning to introduce async feature to 3.0.0.

@Librael
Copy link

Librael commented Feb 12, 2015

Do you have any planing date/month for 3.0.0 ?
Release 3.0.0 will support async feature for sure or is it not completely decided yet?
If yes - it will be a Future result or Callback registration or both?
Thank you!

@nykolaslima
Copy link
Contributor

We will support async feature for sure.
We're still studying future vs callback.

I believe we still don't have a schedule date, we want to do it as fast as
possible.
On Thu, Feb 12, 2015 at 12:11 PM Librael [email protected] wrote:

Do you have any planing date/month for 3.0.0 ?
Release 3.0.0 will support async feature for sure or is it not completely
decided yet?
If yes - it will be a Future result or Callback registration or both?
Thank you!


Reply to this email directly or view it on GitHub
#241 (comment).

@sdeleuze
Copy link

Rather than just Async, a Stream based API like MongoDB or CouchBase are developing would be even more powerful. See http://blog.couchbase.com/why-couchbase-chose-rxjava-new-java-sdk for more details ...

@HeartSaVioR
Copy link
Contributor

@sdeleuze
Hello!
Actually I don't have any experiences on stream for Java.
(I know map / filter / reduce cause I'm Python programmer, and studied Scala a bit. ;) )
Since Jedis 3.0.0 should support JDK 7, it would be some kind of pain to implement custom Future (like CompletableFuture) and support it.

Could you give a hand to show some implementations? Thanks in advance!

@HeartSaVioR
Copy link
Contributor

@sdeleuze
I've just take a look at RxJava and it's amazing!
I can see the benefit of supporting 'stream', but since Redis has various operations and its return types are different, so I'm wondering how can I create stream based on Redis response.
One more question, when there're some ERRs inside of stream, we can't use traditional approach - throwing JedisDataException because it would stop whole stream.
Is it better to have a Option class which can have output or Exception?

@sdeleuze
Copy link

@HeartSaVioR Glad you like it. In term of API, some possible use cases for streams (Observable) are:

  • Return Observable<?> where you return List<?> in your current API.
  • Use it for Publish/Subscribe

You can see an example of converting Netty data pipeline to RxJava here (using what RxJava call a Subject which is also an Observable.

@sdeleuze
Copy link

Exposing just a Reactive Streams based API, like Mongo is currently doing, could also make sense.

@ivansanchezvera
Copy link

@HeartSaVioR is looking at an interesting direction, RxJava is great, couchbase client is using it and performance is great.
Is easy to use but I thing its kinda tricky to implement.
RxJava is really reactive and saves you the hustle to deal with futures yourself.

@HeartSaVioR
Copy link
Contributor

@ivansanchezvera
Yes. Actually I feel very tricky to implement with RxJava because most of commands returns single value, and there're many kinds of return types.
But I also agree that callback could be hell, and Java Future (under Java 8) is not fully async, so I'll give it a try.

@msavy
Copy link

msavy commented Mar 6, 2015

I broadly agree with @sdeleuze that Futures are, unfortunately, largely useless in applications where non-blocking is required as well as asynchronicity. For instance, on a platform such as Vert.x, one cannot block the thread under any circumstances, and polling is drastically suboptimal, so callbacks (with the library itself using async techniques to avoid blocking) is required.

Of course the Vert.x guys made their own implementation, so you could use that for inspiration, but it'd be nice to have a generic client.

@samhendley
Copy link
Contributor

This shouldn't be too tricky to do. The hardest part is you need to delegate a thread to consuming the server responses and feeding them into the futures you returned to your clients.

Here is a snippet that uses an underlying Pipeline client that actually has alot of what we need other than a "pump thread". This isn't quite what we want since the subscribe method here will block waiting for each response. I don't have time to actually implement and test the full fledged version but I could provide guidance on how it can be implemented. (Or possibly if I get real bored one evening it might get done).

public interface RxRedisCommands {
  Observable<Long> append(String key, String value);
  Observable<String> get(String key);
  Observable<Long> incr(String key);
}

class RxJedisClient implements RxRedisCommands {

    private final Pipeline pipeline;

    private volatile boolean keepRunning;
    private final AtomicLong outstandingResponses = new AtomicLong();

    public RxJedisClient(Pipeline client) {
        this.pipeline = client;
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (keepRunning) {
                    try {
                        while (outstandingResponses.get() > 0) {
                            pipeline.sync();
                        }
                        synchronized (outstandingResponses) {
                            while (outstandingResponses.get() == 0) {
                                outstandingResponses.wait();
                            }
                        }
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        });
    }

    public void destroy(){
        keepRunning = false;
        synchronized (outstandingResponses) {
            notify();
        }
    }

    protected <T> Observable<T> getObservable(final Response<T> response) {
        Observable<T> lr = Observable.create(new Observable.OnSubscribe<T>() {
            @Override
            public void call(Subscriber<? super T> subscriber) {
                subscriber.onNext(response.get());
                subscriber.onCompleted();
            }
        });

        outstandingResponses.incrementAndGet();
        synchronized (outstandingResponses) {
            notify();
        }
        return lr;
    }

    public Observable<Long> append(String key, String value) {
        return getObservable(pipeline.append(key, value));
    }

    @Override
    public Observable<String> get(String key) {
        return getObservable(pipeline.get(key));
    }

    @Override
    public Observable<Long> incr(String key) {
        return getObservable(pipeline.incr(key));
    }
}

@HeartSaVioR
Copy link
Contributor

@samhendley Hey, thanks for guiding!
Actually I thought it could help us when only Observable has two or more responses to treat it stream.
Is there any noticeable overheads when we use Observable for each response?
If it isn't I'll apply RxJava to #713 and try to complete this long discussion. :)

@samhendley
Copy link
Contributor

I personally haven't been sold that Observable<List<T>> should be flattened to Observable<T>. In the vast majority of the jedis cases that that wouldn't make so I think it would be confusing to apply it to the few places where it maps well. I would directly copy the PipelineCommands interface and just change Response<X> to Observable<X>.

I look forward to seeing this applied to that PR.

@sdeleuze
Copy link

sdeleuze commented Apr 9, 2015

Observable<List<T>> is usually a code smell, and indeed using Observable<T> is far more better.

Also have a look to this issue (ReactiveX/RxJava#1594) that intend to provide a promise like type in RxJava (a strong need from my POV).

@wongswoon
Copy link

@xetorthio what's the time dose asyncJedis RELESE ?
https://github.com/xetorthio/jedis-netty-exploration,the demo does not work

@frankxieke
Copy link

##does it work ansync Jedis client?

@ordiy
Copy link

ordiy commented Jul 20, 2017

English not my primary language, but i try my best clearly describe this question.I use vertx build a async mirco service. Vertx throw an “Thread Block" exception. I submit this exception details ,hope to help yours solve this problem.

<!-- pom -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-core</artifactId>
            <version>3.4.1</version>
        </dependency>

the code:

    public Future<List<String>> getUd(String rId,String em,String platform){
        Future<List<String>> reFuture = Future.future();
        try {
            String keyPrefix = "i";
            String bucketPrefix = "ib";
            if("a".equals(platform)){
                keyPrefix = "a";
                bucketPrefix = "ab";
            }
            String uids = jedisCluster.getJedisCluster().get(keyPrefix + em);
            if (StringUtils.isBlank(uids)){
                String key = bucketPrefix + em.substring(0,8);
                String filed = em.substring(8);
                uids = jedisCluster.getJedisCluster().hget(key, filed);
            }
            if (StringUtils.isBlank(uids)) {
                reFuture.tryFail(new RuntimeException(" data not found"));
            }else {
                log.info("{} have get uids {} ,will complete future ",rId,uids);
                reFuture.complete(Lists.newArrayList(uids.split(",")));
            }
        } catch (Exception e) {
            reFuture.tryFail(e.getCause());
        }
        return reFuture;
    }

log details

2017-07-20 15:51:40.534 [vertx-blocked-thread-checker] WARN  io.vertx.core.impl.BlockedThreadChecker:167- Thread Thread[vert.x-eventloop-thread-2,5,main] has been blocked for 2995 ms, time limit is 2000
2017-07-20 15:51:41.535 [vertx-blocked-thread-checker] WARN  io.vertx.core.impl.BlockedThreadChecker:167- Thread Thread[vert.x-eventloop-thread-2,5,main] has been blocked for 3996 ms, time limit is 2000
2017-07-20 15:51:42.535 [vertx-blocked-thread-checker] WARN  io.vertx.core.impl.BlockedThreadChecker:167- Thread Thread[vert.x-eventloop-thread-2,5,main] has been blocked for 4996 ms, time limit is 2000
2017-07-20 15:51:43.537 [vertx-blocked-thread-checker] WARN  io.vertx.core.impl.BlockedThreadChecker:167- Thread Thread[vert.x-eventloop-thread-2,5,main] has been blocked for 5997 ms, time limit is 2000
io.vertx.core.VertxException: Thread blocked
	at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
	at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at redis.clients.jedis.Connection.connect(Connection.java:184)
	at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:93)
	at redis.clients.jedis.BinaryJedis.connect(BinaryJedis.java:1767)
	at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:106)
	at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:836)
	at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:434)
	at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:361)
	at redis.clients.util.Pool.getResource(Pool.java:49)
	at redis.clients.jedis.JedisPool.getResource(JedisPool.java:226)
	at redis.clients.jedis.JedisSlotBasedConnectionHandler.getConnectionFromSlot(JedisSlotBasedConnectionHandler.java:66)
	at redis.clients.jedis.JedisClusterCommand.runWithRetries(JedisClusterCommand.java:116)
	at redis.clients.jedis.JedisClusterCommand.runWithRetries(JedisClusterCommand.java:141)
	at redis.clients.jedis.JedisClusterCommand.runWithRetries(JedisClusterCommand.java:141)
	at redis.clients.jedis.JedisClusterCommand.run(JedisClusterCommand.java:31)
	at redis.clients.jedis.JedisCluster.get(JedisCluster.java:124)

	at io.vertx.ext.web.impl.RouteImpl.handleContext(RouteImpl.java:217)
	at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:78)
	at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:118)
	at io.vertx.ext.web.impl.RouterImpl.accept(RouterImpl.java:79)
	at io.vertx.core.http.impl.ServerConnection.handleRequest(ServerConnection.java:285)
	at io.vertx.core.http.impl.ServerConnection.processMessage(ServerConnection.java:429)
	at io.vertx.core.http.impl.ServerConnection.handleMessage(ServerConnection.java:131)
	at io.vertx.core.http.impl.HttpServerImpl$ServerHandler.lambda$createConnAndHandle$1(HttpServerImpl.java:763)
	at io.vertx.core.http.impl.HttpServerImpl$ServerHandler$$Lambda$39/1904759218.run(Unknown Source)
	at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:335)
	at io.vertx.core.impl.ContextImpl$$Lambda$22/214649470.run(Unknown Source)
	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:193)
	at io.vertx.core.http.impl.HttpServerImpl$ServerHandler.createConnAndHandle(HttpServerImpl.java:757)
	at io.vertx.core.http.impl.HttpServerImpl$ServerHandler.doMessageReceived(HttpServerImpl.java:621)
	at io.vertx.core.http.impl.HttpServerImpl$ServerHandler.doMessageReceived(HttpServerImpl.java:573)
	at io.vertx.core.http.impl.VertxHttpHandler.channelRead(VertxHttpHandler.java:76)
	at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:122)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
	at io.vertx.core.http.impl.HttpServerImpl$Http1xOrHttp2Handler.http1(HttpServerImpl.java:1064)
	at io.vertx.core.http.impl.HttpServerImpl$Http1xOrHttp2Handler.channelRead(HttpServerImpl.java:1035)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:642)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:565)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:479)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:441)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at java.lang.Thread.run(Thread.java:748)

@baiwfg2
Copy link

baiwfg2 commented Mar 6, 2019

I also hope async redis would be available soon.

@marcosnils
Copy link
Contributor

Let's continue the discussion here: #713

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests