
unexpected ordering #35

Open
cmusser opened this issue May 21, 2019 · 4 comments


cmusser commented May 21, 2019

I'm having some unexpected results with a program that uses redis_async. For a series of items (structs with "key" and "data" fields), it does a Redis GET and, depending on the result, possibly a SET. The processing of earlier items in this sequence ought to affect the processing of later ones, but doesn't until the next run of the program. The code is basically a simple de-duplication algorithm that only puts things in Redis if they don't exist or are different: it attempts to GET each object in the series from Redis. If the object is not found, or the value in Redis is different, it issues a SET for the object. For identical objects it does nothing.

Given a series of three identical objects, I'd expect the program to store the value for the first object, then do nothing for the remaining two. Instead, the code performs three SET operations, as if the initial SET (or the subsequent ones) had no effect. I've confirmed that the SETs did indeed happen. I want the sequence to be (GET->SET), (GET->NOOP), (GET->NOOP), but according to the traces I put in, the actual sequence is GET, GET, GET, SET, SET, SET. The requests are being pipelined in a way that causes all the GETs to complete before any SETs do, so from the standpoint of the program the searched-for object doesn't exist in Redis yet. Of course, when you run the program again, the object is in Redis and it correctly identifies the objects as duplicates.

My first attempt at this created a Vec of three futures and then used future::join_all() to combine them. On reflection, this didn't seem right, because it doesn't say anything about the order in which things should happen; it seemed to just launch all three concurrently (with the initial GETs at the head of the queue). I also tried creating a "stream" of futures with for_each(), but that didn't change the order in which things happened. Is there a way to create an ordered sequence of "future chains" (meaning the GET->SET, or possibly GET->NOOP, pairs) such that each pair completes before the next pair of operations starts? A sketch of the join_all attempt is below.
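For reference, here's roughly what that first join_all attempt looks like. This is a minimal sketch rather than my exact code: the Event struct, the get_dedup_future helper, and the redis_async send/resp_array! calls are simplified stand-ins for what the real program does, so the details may not be precise.

```rust
use futures::{future, Future};
use redis_async::{client::PairedConnection, error::Error, resp_array};

struct Event {
    key: String,
    data: String,
}

// GET the current value; if it's missing or different, SET the new one.
fn get_dedup_future(
    conn: PairedConnection,
    event: Event,
) -> impl Future<Item = (), Error = Error> {
    conn.send::<Option<String>>(resp_array!["GET", event.key.clone()])
        .and_then(move |existing| {
            if existing.as_ref() == Some(&event.data) {
                // Identical value already stored: nothing to do.
                future::Either::A(future::ok(()))
            } else {
                // Missing or different: store the new value.
                future::Either::B(
                    conn.send::<String>(resp_array!["SET", event.key, event.data])
                        .map(|_| ()),
                )
            }
        })
}

// The problematic version: the three futures are created up front and then
// driven concurrently by join_all; the observed order is GET, GET, GET,
// SET, SET, SET.
fn dedup_all(
    conn: PairedConnection,
    events: Vec<Event>,
) -> impl Future<Item = (), Error = Error> {
    let futures: Vec<_> = events
        .into_iter()
        .map(|event| get_dedup_future(conn.clone(), event))
        .collect();
    future::join_all(futures).map(|_| ())
}
```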

@benashford (Owner)

Hello,

Thanks for raising this, it's an interesting problem.

By the sound of the description, your diagnosis in the third paragraph is probably correct. future::join_all polls each future in turn, moving on to the next whenever the current one is blocked waiting for a response (or for a buffer to become ready, etc.). In practice this means it would send three GETs, then three SETs.

I'm not quite sure what would have happened with the for_each example, without seeing some example code.

The guarantee that redis_async offers in terms of ordering is that messages are sent in the same order as the calls to send on PairedConnection. So in this case the three futures would need to be combined with and_then instead, to ensure they run as one sequence rather than concurrently. Specifically, the send containing the next GET shouldn't happen until after the SET command of the previous future.
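For two items, that could look something like the following sketch, assuming a per-item helper along the lines of the get_dedup_future in your sketch above (the names and signature are illustrative):

```rust
use futures::Future;
use redis_async::{client::PairedConnection, error::Error};

// Sequential version for two events: the second per-item future is only
// constructed (and its GET only issued) inside the and_then closure, i.e.
// after the first GET -> maybe-SET has completed.
fn dedup_two(
    conn: PairedConnection,
    first: Event,
    second: Event,
) -> impl Future<Item = (), Error = Error> {
    get_dedup_future(conn.clone(), first)
        .and_then(move |_| get_dedup_future(conn, second))
}
```

For an arbitrary number of items the same idea applies; the chain just needs to be built up one link at a time rather than all at once.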

This assumes that nothing else is sending commands for the same key, of course. If this were intended for a situation where Redis is shared by multiple processes all doing the same thing, you'd need Redis's transactional features (https://redis.io/topics/transactions), which unfortunately aren't yet supported by redis_async. And because you are checking the result of the GET in the client before deciding to SET, transactions alone wouldn't solve your problem either.

Another alternative is to put the GET -> check the result -> SET logic in a Lua script that Redis runs via EVAL. This makes each GET/compare/SET operation atomic within one particular Redis instance, regardless of how many clients are connected, which would mean the original future::join_all example would also work.
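A sketch of what that could look like with redis_async, sending an EVAL like any other command (the Lua script, the dedup_via_eval name, and the exact send call here are illustrative and untested):

```rust
use futures::Future;
use redis_async::{client::PairedConnection, error::Error, resp_array};

// GET/compare/SET as a single atomic step inside Redis. Returns 1 if a SET
// was performed, 0 if the stored value already matched.
const DEDUP_SCRIPT: &str = r#"
local existing = redis.call('GET', KEYS[1])
if existing == ARGV[1] then
    return 0
else
    redis.call('SET', KEYS[1], ARGV[1])
    return 1
end
"#;

fn dedup_via_eval(
    conn: &PairedConnection,
    key: String,
    data: String,
) -> impl Future<Item = i64, Error = Error> {
    // EVAL <script> <numkeys> <key...> <arg...>
    conn.send::<i64>(resp_array!["EVAL", DEDUP_SCRIPT, "1", key, data])
}
```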


cmusser commented May 23, 2019

I tried an experiment chaining two of the futures I'm using with and_then(), and as you predicted, that does work correctly. I don't know how to make that work with a sequence of values of arbitrary length, though, which is why I tried using for_each(). I've pared this down to a sample project, https://github.com/cmusser/dupcheck, if you want to take a look at what I'm doing. It has both the broken for_each-based logic and the hardwired and_then-based logic, selectable by command-line options.

@benashford (Owner)

Ah OK, I can see it now. This is an interesting edge-case.

When the call to send is made on the connection, the outgoing message is serialised and added to a queue (this is done eagerly as there's no I/O at this stage). So in this code, because the call to send is made when filling the vector before for_eaching through the stream, all the GETs were at the head of the queue.

The solution for this would be to drop lines 112 to 115 and, instead of line 116, do:

iter_ok::<_, redis_async::error::Error>(events).for_each(move |event| get_dedup_future(conn.clone(), event))

This will make sure that the following GET is only initiated after processing the previous SET.

An alternative would be to wrap the implementation of get_dedup_future inside a future::lazy; this defers the send until the future is polled, which will be inside the for_each.
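Something like this, keeping the existing helper and just deferring it (a sketch using future::lazy from futures 0.1; the wrapper name is illustrative):

```rust
use futures::{future, Future};
use redis_async::{client::PairedConnection, error::Error};

// Same shape as get_dedup_future, but nothing is serialised or queued until
// the returned future is first polled, which happens inside the for_each.
fn get_dedup_future_lazy(
    conn: PairedConnection,
    event: Event,
) -> impl Future<Item = (), Error = Error> {
    future::lazy(move || get_dedup_future(conn, event))
}
```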

(I'm now thinking a future version of the library should probably offer send_eager and send_lazy options on the connection, to make this eager/deferred behaviour an explicit choice...)

Personally I prefer the first option, but whichever works best depends on the idioms of the wider application.


cmusser commented May 23, 2019

I just stumbled on this solution in a slightly different form myself. Some background: the "add a future to the Vec" loop exists because I'd tried earlier to create a Vec of futures with map() and did it incorrectly. I attempted it again and ended up with the following, which behaves correctly:

let v = events.into_iter().map(move |event| get_dedup_future(conn.clone(), event));
iter_ok::<_, redis_async::error::Error>(v).for_each(|f| f)

This is a more verbose equivalent of what you have.

It took me a bit to understand why this worked. To restate what you said: calling get_dedup_future() in that loop in turn calls send, which actually does cause a Redis request to be queued. It's not "in the future"; it happens right then and there. So N of those calls in a loop results in N GETs at the head of the line. Putting the call inside the for_each closure has the effect of deferring each of those send calls, so that they happen in sequence. Wow. OK. Thanks for the insight on this one.
