This repository has been archived by the owner on Jun 7, 2018. It is now read-only.

co-routine consumer. #18

Open
d0ugal opened this issue Jul 4, 2014 · 3 comments

Comments

@d0ugal
Owner

d0ugal commented Jul 4, 2014

I'm not quite sure what to call this, maybe @kalfa can correct me :)

At the moment the API works by us passing in a callback function. I wonder if we can do something like this with a coroutine instead:

rfxcom = AsyncioTransport(dev_name)
while True:
    packet = yield from rfxcom
    print(packet)

Any thoughts on how this could be implemented, or on a similar API?

@kalfa
Contributor

kalfa commented Jul 4, 2014

A bit vague :)

Trying to make an educated guess (which is anyway the opinion of someone who started using asyncio a few weeks ago :P):

You're seeing the transport as a producer for packets, which is nice, especially because it decouples two stages of the workflow.
You need something to consume the packets as they come in and act upon them.

You might be able to do it with a coroutine, but what you wrote is too simplified for a normal workflow. This is why there are Transports and Protocols.
What you wrote does not take into account that the protocol might want to write something as well. The data flow between transport and protocol is bi-directional.

Which means: there is no simple implementation of that in an rfxcom context :)
If the question was something else (e.g. for the sake of curiosity, can I do it?) let me know.

Going deeper:

The pattern used by asyncio is Transport + Protocol. The protocol is your consumer when there is something to read (and your producer when there is something to write).
The transport calls the protocol when it has read something (a protocol instance is associated with each transport), IIRC by calling proto.data_received(pkt).

This acts pretty much like your lines of code, while adding stages which are important:

The transport is required to notify its protocol when a connection has been made, via proto.connection_made(transport_instance), which tells the protocol how to communicate back to the transport if required.
Then, on each pkt received, it calls proto.data_received(pkt), as said.

The protocol has the biz logic about what to do with the pkt. The transport is a dumb I/O executor.
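
A minimal sketch of that handshake, written against the asyncio.Protocol base class. PacketProtocol and FakeTransport are hypothetical names for illustration only, not part of rfxcom, and FakeTransport merely stands in for a real serial transport:

```python
import asyncio


class PacketProtocol(asyncio.Protocol):
    """Hypothetical protocol: holds the logic for handling packets."""

    def __init__(self):
        self.transport = None
        self.received = []

    def connection_made(self, transport):
        # Keep a reference so the protocol can write back later.
        self.transport = transport

    def data_received(self, data):
        # Business logic lives here, not in the transport.
        self.received.append(data)


class FakeTransport:
    """Stand-in for a real transport; it only records what is written."""

    def __init__(self, protocol):
        self.written = []
        self.protocol = protocol
        # Stage 1: tell the protocol how to reach the transport.
        protocol.connection_made(self)

    def write(self, data):
        self.written.append(data)

    def feed(self, data):
        # Stage 2: a real transport would do this when the device has data.
        self.protocol.data_received(data)
```

The protocol never touches the device itself: it only sees data_received calls and, when it needs to reply, goes through the transport reference it was given.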

In my opinion, to apply that to rfxcom:

  • the transport does what it's doing now, in general terms:
    • it adds readers and writers for the serial line, which the event loop calls when the line is readable or writable.
    • the writer is buffered (queued) so that only a single pkt is in the writing critical section at any one time.
    • the reader can be buffered too, especially if you need a serialised workflow. It can be a priority queue or a per-pkt-type queue. You'll call proto.data_received(pkt) to send the pkt to its protocol according to the queuing policy, if any. You probably want to keep it simple at the beginning and just send it right away.
    • in terms of interface implementation, you might want to implement the WriteTransport and ReadTransport interfaces (IIRC those are the names).
    • the transport will call the protocol when required by the interface workflow.
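
The buffered writer from the list above could be sketched roughly like this. WriteBuffer, write_func and on_writable are hypothetical names; the assumption is that on_writable would be registered as the writer callback (for example with loop.add_writer) so that each call sends at most one pkt:

```python
from collections import deque


class WriteBuffer:
    """Hypothetical write queue: at most one packet is sent per callback."""

    def __init__(self, write_func):
        self._queue = deque()
        self._write = write_func  # e.g. the serial device's write method

    def write(self, pkt):
        # Callers only enqueue; nothing touches the device here.
        self._queue.append(pkt)

    def on_writable(self):
        # Meant to be called by the event loop when the device is writable.
        if self._queue:
            self._write(self._queue.popleft())
```

Packets leave in the order they were queued, and an empty queue makes on_writable a no-op.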

What you need is to decouple the biz logic completely from the transport and put it into a protocol.
The protocol is the one that knows how to deal with pkts (i.e. the protocol is not a pkt, but a pkt manager).
This is currently performed by the transport via do_callbacks, which dispatches each pkt to the right pkt handler.
This should be done by the protocol (the transport is agnostic of such things).

So in code terms:
def read(self):
    data = [read something]
    [checks data is coherent]
    self.loop.call_soon(self.protocol.data_received, data)

That is it. The data_received call will then do what do_callback does, but on the protocol side.

I hope this helps. I guess this turned out to be the issue for refactoring (#17) though :)
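
A runnable sketch of that split, under stated assumptions: DispatchProtocol, SketchTransport and the source object are hypothetical, and picking the handler from the first byte is only for illustration, not the real rfxcom packet layout:

```python
import asyncio


class DispatchProtocol(asyncio.Protocol):
    """Hypothetical protocol that routes each packet to a handler."""

    def __init__(self, handlers):
        # Maps a type byte to a callable. Using the first byte as the
        # type is an assumption made for this sketch only.
        self.handlers = handlers

    def data_received(self, data):
        handler = self.handlers.get(data[0])
        if handler is not None:
            handler(data)


class SketchTransport:
    """Hypothetical transport: reads bytes and hands them to the protocol."""

    def __init__(self, loop, protocol, source):
        self.loop = loop
        self.protocol = protocol
        self.source = source  # any object with a read() method

    def read(self):
        data = self.source.read()
        if not data:  # minimal coherence check: ignore empty reads
            return
        # Schedule the protocol call instead of running it inline.
        self.loop.call_soon(self.protocol.data_received, data)
```

Because of call_soon, the handler does not run inside read(); it runs on the next pass of the event loop, which keeps the read callback short.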

@kalfa
Contributor

kalfa commented Jul 4, 2014

Since I'm in the mood :p

I guess this would also work with the code you wrote.
I'm not considering the limitations of the approach, which are irrelevant for the sake of understanding.
Also, this is not the asyncio way of doing things, so it would be much harder for people to read.

I guess that you can do:

asyncio.async(rfxcom)
loop.run_forever()

and it would work, after modifying the Transport class to yield the values in the read buffer one at a time.

I think that asyncio.async() needs an object which implements the full generator protocol, which is quite long to implement for a quick proof of concept.

So I'd modify the class to be an iterator which yields the next pkt. (I guess it needs to block if there is nothing to read, though, which is not optimal. It cannot raise StopIteration, as in theory it's an infinite generator.)
This is the tricky part, I guess.

Then wrap the transport, which is not a generator, into a coroutine:

@asyncio.coroutine
def transport_wrapper(transport):
    yield from transport

asyncio.async(transport_wrapper(rfxcom))
loop.run_forever()

These are (more or less, probably closer to less than to more 😄) the steps.

Edit: I missed the part where you print the pkt. I guess it can go in the transport_wrapper 😄
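
One way around the blocking problem, sketched under assumptions: put an asyncio.Queue between the protocol and the consumer, so the consumer suspends instead of blocking when nothing has arrived. This swaps in async/await for the yield from style used above (asyncio.async was later deprecated in favour of asyncio.ensure_future, and async is a reserved word from Python 3.7). QueueProtocol, consume and the limit parameter are hypothetical names:

```python
import asyncio


class QueueProtocol(asyncio.Protocol):
    """Hypothetical protocol that parks incoming packets in a queue."""

    def __init__(self):
        self.packets = asyncio.Queue()

    def data_received(self, data):
        # Never blocks: the queue is unbounded in this sketch.
        self.packets.put_nowait(data)


async def consume(protocol, handle, limit=None):
    """Pass packets to handle(); stop after `limit` packets if given."""
    count = 0
    while limit is None or count < limit:
        # Suspends this coroutine (not the loop) until a packet arrives.
        packet = await protocol.packets.get()
        handle(packet)
        count += 1
```

With limit left as None, this is the infinite `while True` loop from the original question, and consume(proto, print) would cover the print step.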

@d0ugal
Owner Author

d0ugal commented Jul 4, 2014

Okay, wow! Thanks for this, I'll need to find some time to digest it and follow up.
