Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Complex streaming doc/examples #96

Closed
eric-burel opened this issue Mar 12, 2018 · 8 comments
Closed

Complex streaming doc/examples #96

eric-burel opened this issue Mar 12, 2018 · 8 comments

Comments

@eric-burel
Copy link

Hi,

I crosspost my Stack Overflow post here because the issue is related to ZeroRPC doc. I try to set up a Pub/Sub workflow, between a Node client and a Python server, and I did not find any example/doc/insights on this, I am a bit lost.

As you may know, ZeroRPC documentation is sparse. I can't get Streaming between a Python server and a Node client to work.

Here is the Python method:

    @zerorpc.stream
    def PublishWhaterver(self, some_args):
        yield "love"
        yield "stack"
        yield "overflow"

Here is the Node call:

    export const tryStream = () => {
          connectedZerorpcClient.invoke('PublishWhatever', (error, res, more) => {
          console.log('STREAM', res, more);
      });
    };

This code will log "STREAM love", and then do nothing.

So here are my questions:

  • In the Python server code, am I supposed to call PublishWhatever with relevant args so that it yield additionnal values ?
  • In the Node client, should I call some recursive function when there is more data ?

What I am trying to implement is a Pub/Sub system but right now implementation seems to only exists for a Python server and a Python client, there are no Node example.

The example on the main page and tests are not relevant either, it shows how to stream an array or Python iterator that already exists when the invoke method is called.
Here the messages are generated during some heavy computations, I want the server to be able to tell the client "here, some data are ready" and never disconnect.

Thanks for the help!

Original post: https://stackoverflow.com/questions/49241365/streaming-with-zerorpc

@eric-burel eric-burel changed the title Complex streaming issue Complex streaming doc/examples Mar 12, 2018
@bombela
Copy link
Member

bombela commented Mar 12, 2018

PublishWhaterver takes one argument, but you never provide it from the nodjs client. See http://www.zerorpc.io/ for some basic examples. Note sure why do you still get at least one message from the server though.

@zerorpc.stream instructs zerorpc to consume the result of the function as an iterable. xrange is an iterable. A function that make uses of yield becomes a factory returning an iterable. A list is an iterable etc.

@eric-burel
Copy link
Author

eric-burel commented Mar 12, 2018

Thanks @bombela for the fast answer. I phrased my question badly, sorry, it was a unclear.
I think I almost got it indeed for the server-side part. I wrote an infinite generator that consumes a queue, the call to PublishWhatever returns the corresponding iterable. I added an argument because I thought I would need to call it from the server but now I got it, the current version takes no argument.

That's the client side that bugs me the most, I am not really sure of what happens. Since this is an infinite iterator that triggers some Python computing, how could I consume it client side ? Is zeroRPC handling the connection somehow ? Like when I call next() on my client-side iterator, it will call next() on its server-side counterpart and send me the result through the wire ?

What I meant with the example is that xrange is a finite and "instateneous" iterator, you could have sent the whole array instead so it makes the code a bit trivial and did not help me to understand how thing works.
What bugs me is how it works with an infinite iterator, and "non instaneous" stuffs. I found examples in Python so I am starting to grasp it but nothing with a Node client.

Those might be noob questions but I hope it could help to provide more complex examples for future users.

I found this example in JS for the generator consumption but that's not really what I seek, I want the client to actually listen to the server all the time and not to poll it. Just as if I wrote an event listener with a socket (but I'd like to enjoy zeroRPC robustness and scalability).

Edit: Maybe streams are not the right tool for this job though, I should call the underlying socket directly and establish a connection? But that would shortcut zeroRPC, I don't fully understand the consequences of this.

@bombela
Copy link
Member

bombela commented Mar 12, 2018

Thanks for this excellent question! I had not realized it was what you wanted to know in the first place :)

zerorpc has two type of RPC (remote procedure call) pattern. One is called request/response and is the default. The other is called a stream (really should be called request/stream) and is opt-in (with @zerorpc.stream in python, nodejs has a stream method etc).

request/response

The client sends one request to the server. This request contains the name of the function to execute on the server. The result from the execution of the function is then sent back to the client as a single response on the network.

In Python, if you have one big array, this array is sent as a single response. One big response.

request/stream

Similar to request/response the client sends one request to the server. The result of the function is interpreted as an iterable in Python (in nodejs you call the response callback repeatedly). Every item from the iterable is then sent as one individual response on the network.

In Python, if you have one big array, this array is iterated, and every element is sent as a single small response. 10 elements, 10 responses.

client api

In Python, the client exposes the stream as an iterable object. You can keep the server and client code untouched and applying @zerorpc.stream on server will change the behavior on the network.

Example, the server returns a list(), and the client converts the response to a list(). Semantically, the server and clients are manipulating a list. Now if you want the list to be transferred one item at the time on the network, you only have to decorate the server function with @zerorpc.stream.

Because nodejs doesn't have a first class notion of iterable, the code has to be different. There is no abstraction hiding the iteration. On the network though, everything is the same.

As you figured out, a stream doesn't have to ever terminate. It can be infinite. zerorpc doesn't care.

You can use streaming for breaking down big responses into smaller pieces. Or for pub/sub, as a mean of sending a message when some event occur.

implementation details

Behind the scene, zerorpc represent every communication as channels.

A request/response will open a channel, send a single message on it, receive a single message, and then the channel is closed on both sides.

A request/stream will open a channel, send a single message, and then will receive an infinity of messages. Eventually the server can add a flag to a message informing the client that it is the last message of the stream. Then the server and client both closes theirs channel.

If you wonder if this means we can add other RPC pattern: yes, yes we can. For example stream/response, or stream/stream etc. or 10request/3response or whatever we want really. For now though, we stick to request/response and request/stream.

As we seen, a channel can be opened for any length of time. And it doesn't have to be busy at all time. A pub/sub could be sending a single message every hour for example.

heartbeating

To keep track of the status of the channel, zerorpc sends a periodic heartbeat message on it (every 5s by default). The server and client send this heartbeat message independently. And both side keep track of how long it has elapsed since they last received an heartbeat from the other. After twice the heartbeat frequency (5x*2 == 10s by default) without any news of the other side, the channel is dropped and a LostRemote error raised/returned.

This means you never get stuck forever because the network disconnects, or the other side is stuck in some ways or another.

Heartbeating can be turned off. Sometimes you do not want to pay the cost of heartbeating. Or because you are doing load-balancing across a bunch of workers using the request/response pattern, and ZMQ will try to load-balance heartbeat messages, breaking the short-lived channel. Or because you have a bunch of CPU bound workers that are too busy to send any heartbeat messages etc.

Of course when it is off, you cannot differentiate anymore between a network err and a stuck remote process. Pick your poison. If you are doing local dispatch via a unix socket, you can usually forgo heartbeating without much trouble.

client timeout

Finally, there is the notion of client timeout (30s by default). This is how long zerorpc will wait until it receives the first response message on the channel. Heartbeat messages are not responses. You can set the timeout to any value. On the request/response pattern, this translates to how long the client is waiting for a response from the server. On the request/stream pattern, how long the client is waiting for the first response on the stream.

Do not hesitate asking for more details.

@eric-burel
Copy link
Author

Much thanks for this detailed answer!
I am almost there I think, at least I can consume the infinite stream. I'll need to test how it behaves in my app (I only ran a small test case), I'll keep you informed and post the examples here.

If I manage to provide an example of the Node client implemention, where I could share it to help? Your post could almost make a nice Medium post or doc page if I could illustrate it with examples.

@eric-burel
Copy link
Author

eric-burel commented Mar 13, 2018

I am almost done! However I have issues with queues. I wrote the following iterator:

def data_ready_iterator():
    while True:
        if not DATA_READY_QUEUE.empty():
            logging.debug('Some data is ready')
            data = DATA_READY_QUEUE.get()
            yield {'result': 0, 'data': data}
            logging.debug('Done yielding data')
            DATA_READY_QUEUE.task_done()

Edit: the while loop provokes some deadlock because the DATA_READY_QUEUE seems to be always locked due to the empty() call, so this solution does not work.
Edit 2: There is this stack question but answers are irrelevant, they only handle finite data. I need to consume the queue all the time and consider it infinite, so I need to use a while loop (or at least some polling).
Edit 3: using the underlying socket would be a solution bu I am not sure if I can access it

I could rephrase the question this way: how would you yield a value in the stream on an event that happens in a consumer thread ? Queues are a solution to share messages between threads, but I can't manage to yield when a new message arrives. Maybe I could do some polling instead ? But still I'll need some kind of infinite loop, so a thread...

@bombela
Copy link
Member

bombela commented Mar 13, 2018

Threads and gevent don't play nice together. This is because gevent implicitly starts an ioloop for you to schedule coroutines (greenlets). Every time a coroutine uses what appears to be a blocking gevent function, it yields to the ioloop. The ioloop in turn checks for IO being completed, and schedules any coroutine ready for progress.

In other words, gevent has one ioloop, in a single thread, scheduling multiple coroutines concurrently. There is no parallelism. And in fact, you can get in great trouble when mixing threads and gevent (see some discussion there 0rpc/zerorpc-python/issues/136).

The queue from the standard python library is designed for threading. Any blocking operation will block the current thread when waiting on the queue. If you use it from a coroutine, the gevent thread is now blocked, and gevent ioloop has no chance to run. Deadlock.

The queue from gevent is designed for cooperative coroutine. Any blocking operation will block the current coroutine. The gevent ioloop will schedule another coroutine or just wait for some IO to complete. Then resume the original coroutine whenever the blocking operation is completed.

If what you care is concurrency and you do not need parallelism (ie: using multiple physical CPUs), then simply use a coroutine gevent.spawn and gevent.queue.

If you really want to use multiple CPUs, Python multi-threading is rarely the solution. This is because the main implementation of the Python interpreter locks everything globally (the dreaded GIL). If you are calling C functions though, it will effectively run in paralleled. But any Python code will run in lock-step.

A good solution is to start a bunch of worker sub-processes (see http://www.gevent.org/gevent.subprocess.html), and communicate with them via zerorpc. You might want to turn off heart-beating if workers are CPU bound. And use a unix socket instead of TCP on the loopback for slightly better performances.

Instantiate a zerorpc Client in the master process for every worker. Add a little function to select which worker to use next. The simplest solution simply round-robin, but you can go as far as you want. Then communicate with the worker, wait for the result, return the result.

In pseudo code this could look like:

# Master
class MyService:
  def __init__(self):
     self.workers = []
     for i in range(num_workers):
         endpoint = '/tmp/myworker.sock.%d' % i
         p = subprocess.Popen("./myworker", endpoint)
         self.workers.append((p, zerorpc.Client(endpoint)))
     self.round_robin_idx = 0

   def _next_available_worker(self):
     r = self.workers[round_robin_idx][1]
     self.round_robin_idx = (self.round_robin_idx + 1) % len(self.workers)
     return r

   def something_to_do(self, some_arg):
      return self._next_available_worker().something_to_do_using_lotta_cpu(some_arg)

s = zerorpc.Server(MyService())
s.bind(...)
s.run()

# Worker
s = zerorpc.Server({'something_to_do_using_lotta_cpu': lambda args: return "done some work"})
s.bind(sys.argv[1])
s.run()

Let me know if you need any clarification.

If you want to share some extra nodejs examples, maybe issue a pull request against https://github.com/0rpc/0rpc.github.io . And feel free to write any blog post you want. I can proof read the implementation details if you want.

@eric-burel
Copy link
Author

eric-burel commented Jun 20, 2018

Hi,

I owe you an answer since you took a lot of time to explain this to me.
In the end, using ZeroRPC with streams was far too complicated. This model is suited for actual streaming: when you are asked a resource and you want to send it a piece at a time. This is still quite a synchronous workflow (data are known in advance or can be computed on demand using a generator), while I needed a really asynchronous workflow where data can evolve randomly either client side (when the user clicks somewhere) or server side (when the computation service produce results).

I ended up using websockets, that are more suited for real-time communication. It was not trivial to setup the threads too from a Python beginner standpoint but that should be far easier for a Python expert than relying on ZeroRPC for the same use case. Here is a SO question with more details on this: https://stackoverflow.com/questions/49696964/push-data-to-a-distant-client-from-a-python-thread-running-on-a-server.

Thanks again for your support

@Robinxbnie
Copy link

Robinxbnie commented Oct 23, 2019

@bombela.

I have a question on realization on "request/stream". I have an actual need. Let's say, I have a request to get a very long string that python-server will produce. Could you please show me with some code snip how to realize it (using stream to pass the string to the node-client).

in server:
... def createBigStrStream(self, bigStr): # some code to prepare stream, or in other words, chop the big String into pieces return # piece, or smaller string one by one

in node-client"
let bigArr = [] client.invoke("createBigStrStream", bigStr, function(error, res, more) { bigArr.push(res) console.log(res); console.log("bigArr = ", bigArr) // if no more coming, we can use the bigArr here... });

I am stocked here and the stream example on the web with range() not clearly solve my confusion.
I tried to use generator, while got the error:

TypeError: can not serialize 'generator' object

The most import question: what parameter type should I put to the "return" statement?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants