Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make receiving messages event driven #2

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

vickash
Copy link

@vickash vickash commented Jul 22, 2023

I originally wanted to stop #get from blocking so the main loop could do other things. I thought messages would be in the queue and could be checked occasionally.

Then I realized the mqtt event objects all point to the same location for topic and data, so every subsequent message overwrites the previous.

Instead of using another queue with the actual values, I went ahead and made the whole thing event driven. The client sets a generic mruby proc that gets called inside the event handler, similar to the Wi-Fi gem.

When a message is received:

  • Suspend main task
  • Generic proc called (gives topic and message to mruby)
  • In mruby, look up (array of procs) if there's a specific callback for the given topic
  • Run the callback if it exists, giving the message
  • Resume main task

@yuuu
Copy link
Contributor

yuuu commented Jul 23, 2023

@vickash
Thanks for your Pull Request.
You are correct, #get had a problem with subsequent messages being overwritten.

I also had an idea to use asynchronous callbacks. But in this way mrb_state is referenced by multiple tasks. Is there any concern about this?

@vickash
Copy link
Author

vickash commented Jul 24, 2023

But in this way mrb_state is referenced by multiple tasks. Is there any concern about this?

That's a good question. Unless I'm mistaken, the only other time the mruby task is paused and "taken over" is during the Wi-Fi connect and disconnect blocks?

So would be hard to notice if this is an issue, since MQTT events won't happen until Wi-Fi connects anyway.

Either way, could be solved with a semaphore, no?

@yuuu
Copy link
Contributor

yuuu commented Jul 24, 2023

@vickash
You are right, the only situations in mruby-esp32 that suspend tasks are Wi-Fi and MQTT.
However, similar situations are expected to increase as SPI and I2C are supported in the future.

For example, the following code may cause mruby to crash.

wifi = ESP32::WiFi.new

wifi.on_connected do |ip|
  mqtt.connect
  mqtt.subscribe("mruby-esp32-mqtt") do |message|
    # Process messages
  end
end

wifi.connect('SSID', 'password')

After much consideration, I decided that "mrb_state should only be handled in a single task.
If you have a better way, please let me know 🙏

@vickash
Copy link
Author

vickash commented Jul 24, 2023

wifi = ESP32::WiFi.new

wifi.on_connected do |ip|
  mqtt.connect
  mqtt.subscribe("mruby-esp32-mqtt") do |message|
    # Process messages
  end
end

wifi.connect('SSID', 'password')

You're right that this doesn't work, but it isn't because #subscribe is hooking its callback. All that does is set a few variables. MQTT data events aren't happening yet, and that isn't running in the context of an MQTT event. It's a Wi-Fi event.

The root cause is mqtt_wait_for_event. Leaving mqtt.connect alone inside the block causes the spinlock error too.

I haven't dug deeper, but my guess is that the block given to wifi.on_connected is happening inside an event handler where the RTOS doesn't want you to wait indefinitely for another event.

This style of code is probably possible only if everything becomes event driven.

Example:

  • #connect is called inside the WiFi event.

  • Set @connected ivar in mqtt client to false

  • Call C function to start the connection, but return immediately from #connect

  • Meanwhile in the mqtt event handler, wait for the connected event. When it arrives, pause mruby_task, set @connected to true, and run any blocks saved to run on connect (see below)

  • #subscribe is called now

  • Set @subscribed to false (would have to be an array or hash, but simplifying)

  • Check if @connected is true

    • If it is, call C function to subscribe and add message callback immediately
    • If it isn't, then hook a callback to the connect event that will start the subscription and hook the message callback
    • There could be a weird race condition here if mruby_task gets paused exactly after checking the value of @connected
    • Either way, the subscribe event handler sets @subscribed to true

This should work in the wifi.on_connected block, and is a cleaner looking way of handling disconnects, but it's probably a lot of work, and we'll need a way for mruby to interact with the RTOS semaphores to avoid race conditions like that example.

That should be the eventual goal, but I'm just trying to make message subscription usable for now.

To deal with disconnects, I was planning to use ivars (set by the event handlers) to store the state of the wifi and mqtt connections. Then, in the main loop, check those ivars periodically, and rerun the setup process in the same linear blocking way as the startup procedure.

This ties up the main task, but shouldn't be needed often. Event driven absolutely will be better since it leaves the main task free to do whatever in case it can't reconnect, but we'll get there eventually.

However, similar situations are expected to increase as SPI and I2C are supported in the future.

Yes, but only for SPI and I2C slave modes I think? I also want to do interrupt driven digital input pins eventually, so it will matter there too, but I think the semaphore will be fine.

Think of it like using interrupts on an Arduino. mruby_task is our main loop and the other RTOS tasks with their events want to interrupt it. Each event handler takes the semaphore before it pauses mruby and releases it after it resumes. This prevents "interrupts" from overlapping. If we add an interface so mruby can "disable interrupts", i.e. take the semaphore, we can run mruby code guaranteed not to be paused for an event, solving problems like the @connected race condition above.

@vickash
Copy link
Author

vickash commented Jul 24, 2023

@yuuu, it looks like there is no need to suspend and resume mruby_task. In fact, doing so was causing delays in my main loop to end prematurely when a message is received.

Simply calling mrb_yield_argv and running a block works just fine, with delays staying the correct length, or at least close enough. Might be adding the event time? Haven't tried to figure that out yet.

I used the same pattern as adding the data callback to deal with these events:

  • Connected
  • Disconnected
  • Unsubscribed

The wait functions have been removed, and code like that example you gave should work. However, you'll need to increase the event loop's stack size. I used 16k.

Try this for a more thorough test:

wifi = ESP32::WiFi.new
mqtt = ESP32::MQTT::Client.new('test.mosquitto.org', 1883)

message_counter = 0

# Nested callback hooks
wifi.on_connected do |ip|
  puts "Connected with IP: #{ip}"
  
  mqtt.connect
  mqtt.subscribe("mruby-esp32-rx") do |message|
    message_counter += 1
    puts "Message from mruby-esp32-rx: #{message}. Counter: #{message_counter}"
  end
end

wifi.connect('SSID', 'password')

# Ensure connection before trying to publish. Not sure if needed.
while !mqtt.connected do
  ESP32::System.delay(100)
end

loop do
  mqtt.publish("mruby-esp32-tx", 'message')
  ESP32::System.delay(10000)
end

I added comments for where the semaphore should be given and released, but I'm not sure this is even necessary. It looks like mrb_yield_argv is doing something similar to the GIL on CRuby, but I can't find any documentation on it. Can you figure that out and confirm?

If that's the case, the two event handlers in the wifi gem could be changed so they don't suspend mruby_task and everything will play well together.

@vickash
Copy link
Author

vickash commented Jul 24, 2023

I added comments for where the semaphore should be given and released, but I'm not sure this is even necessary. It looks like mrb_yield_argv is doing something similar to the GIL on CRuby, but I can't find any documentation on it. Can you figure that out and confirm?

Ok, this definitely isn't the case. It only worked because I had a long delay.

@vickash
Copy link
Author

vickash commented Jul 24, 2023

This has gotten too complex, so I'm giving up on this branch.

@yuuu let me know how this sounds instead:

  • Block for non data events, just like the original code
  • Don't block for data, but add the messages to some new queue
  • Add callbacks through #subscribe like my first commit here
  • Add a method like Client#run that the user has to call in the main loop. It empties the queue and runs the callbacks with each message.

This gives the same feel, with one extra line. We could do something similar to handle both wifi and mqtt disconnect and reconnect too.

@yuuu
Copy link
Contributor

yuuu commented Jul 24, 2023

@vickash
I really appreciate your various validations.

mrb_yield_argv without suspend is as risky as you say, and it's hard for me to accept. I'm sorry I can't give you a better suggestion.

I have some concerns about the suggestion at #2 (comment).

  • A callback would be ideal. However, it is difficult to execute yield in another context (another task), and I think that various side effects will occur.
    • In what context will yield be executed in your plan?
  • Is it necessary to add Client#run ?

@vickash
Copy link
Author

vickash commented Jul 24, 2023

  • In what context will yield be executed in your plan?

There's no yield at all in my new plan. By "callback" I mean #subscribe takes the block to handle messages up front, rather than having that code inside the main loop. Client#run pops messages off the queue, checks the topic and calls the right block with it. That all happens inside mruby_task.

The only thing happening outside is adding items to the queue. This isn't truly asynchronous, since you're checking for messages once every loop, but I looked at how CircuitPython handles MQTT, and it's basically this.

The only thing the user has to remember to add to their main loop is Client#run.

It might, but doesn't have to, if you're ok with extending it.

#get can be reworked to pop a message off the new queue (i.e. the one holding actual message values, not pointers) while still blocking.

@yuuu
Copy link
Contributor

yuuu commented Jul 25, 2023

@vickash
I see. If there is no yielding in another task and #get can get the message, I think it is a good plan.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants