From d31c6ced415b5a1390b976ac52ca8b3a9500b2c1 Mon Sep 17 00:00:00 2001 From: Sergey Polyakov Date: Fri, 16 Feb 2024 17:33:57 +0200 Subject: [PATCH 1/6] Remove the unused active object queue implementation --- system/inc/active_object.h | 36 -- system/inc/channel.h | 852 ------------------------------------- 2 files changed, 888 deletions(-) delete mode 100644 system/inc/channel.h diff --git a/system/inc/active_object.h b/system/inc/active_object.h index af2af4e90f..2752fca1af 100644 --- a/system/inc/active_object.h +++ b/system/inc/active_object.h @@ -29,7 +29,6 @@ #include #include -#include "channel.h" #include "concurrent_hal.h" #include "hal_platform.h" @@ -344,41 +343,6 @@ class ActiveObjectBase }; - -template -class ActiveObjectChannel : public ActiveObjectBase -{ - cpp::channel _channel; - -protected: - - virtual bool take(Item& item) override - { - return cpp::select().recv_only(_channel, item).try_once(); - } - - virtual bool put(const Item& item) override - { - _channel.send(item); - return true; - } - - -public: - - ActiveObjectChannel(ActiveObjectConfiguration& config) : ActiveObjectBase(config) {} - - /** - * Start the asynchronous processing for this active object. - */ - void start() - { - _channel = cpp::channel(); - start_thread(); - } - -}; - class ActiveObjectQueue : public ActiveObjectBase { protected: diff --git a/system/inc/channel.h b/system/inc/channel.h deleted file mode 100644 index 6e3146a41d..0000000000 --- a/system/inc/channel.h +++ /dev/null @@ -1,852 +0,0 @@ -// Copyright 2014, Alex Horn. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -#ifndef CPP_CHANNEL_H -#define CPP_CHANNEL_H - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace cpp -{ - -namespace internal -{ - -#if __cplusplus <= 201103L -// since C++14 in std, see Herb Sutter's blog -template -std::unique_ptr make_unique(Args&& ...args) -{ - return std::unique_ptr(new T(std::forward(args)...)); -} -#else - using std::make_unique; -#endif - -template -struct _is_exception_safe : - std::integral_constant::value or - std::is_nothrow_move_constructible::value> -{}; - -// Note that currently handshakes between send/receives inside selects -// have higher priority compared to sends/receives outside selects. - -// TODO: investigate and ideally also discuss other scheduling algorithms -template -class _channel -{ -static_assert(N < std::numeric_limits::max(), - "N must be strictly less than the largest possible size_t value"); - -private: - std::mutex m_mutex; - std::condition_variable m_send_begin_cv; - std::condition_variable m_send_end_cv; - std::condition_variable m_recv_cv; - - // FIFO order - std::deque> m_queue; - - bool m_is_send_done; - bool m_is_try_send_done; - bool m_is_recv_ready; - bool m_is_try_send_ready; - bool m_is_try_recv_ready; - - bool is_full() const - { - return m_queue.size() > N; - } - - // Is nonblocking receive and nonblocking send ready for handshake? - bool is_try_ready() const - { - return m_is_try_recv_ready && m_is_try_send_ready; - } - - // Block calling thread until queue becomes nonempty. While waiting - // (i.e. queue is empty), give try_send() a chance to succeed. - // - // \pre: calling thread owns lock - // \post: queue is nonempty and calling thread still owns lock - void _pre_blocking_recv(std::unique_lock& lock) - { - m_is_recv_ready = true; - m_recv_cv.wait(lock, [this]{ return !m_queue.empty(); }); - - // TODO: support the case where both ends of a channel are inside a select - assert(!is_try_ready()); - } - - // Pop front of queue and unblock one _send() (if any) - // - // \pre: calling thread must own lock and queue is nonempty - // \post: calling thread doesn't own lock anymore, and protocol with - // try_send() and try_recv() is fulfilled - void _post_blocking_recv(std::unique_lock& lock) - { - // If queue is full, then there exists either a _send() waiting - // for m_send_end_cv, or try_send() has just enqueued an element. - // - // In general, the converse is false: if there exists a blocking send, - // then a nonblocking receive might have just dequeued an element, - // i.e. queue is not full. - assert(!is_full() || !m_is_send_done || !m_is_try_send_done); - - // blocking and nonblocking send can never occur simultaneously - assert(m_is_try_send_done || m_is_send_done); - - m_queue.pop_front(); - assert(!is_full()); - - // protocol with nonblocking calls - m_is_try_send_done = true; - m_is_recv_ready = false; - m_is_try_recv_ready = false; - m_is_try_send_ready = false; - - // Consider two concurrent _send() calls denoted by s and s'. - // - // Suppose s is waiting to enqueue an element (i.e. m_send_begin_cv), - // whereas s' is waiting for an acknowledgment (i.e. m_send_end_cv) - // that its previously enqueued element has been dequeued. Since s' - // is waiting and the flag m_is_send_done is only modified by _send(), - // m_is_send_done is false. Hence, we notify m_send_end_cv. This - // causes s' to notify s, thereby allowing s to proceed if possible. - // - // Now suppose there is no such s' (say, due to the fact that the - // queue never became full). Then, m_is_send_done == true. Thus, - // m_send_begin_cv is notified and s proceeds if possible. Note - // that if we hadn't notified s this way, then it could deadlock - // in case that it waited on m_is_try_send_done to become true. - if (m_is_send_done) - { - // unlock before notifying threads; otherwise, the - // notified thread would unnecessarily block again - lock.unlock(); - - // nonblocking, see also note below about notifications - m_send_begin_cv.notify_one(); - } - else - { - // unlock before notifying threads; otherwise, the - // notified thread would unnecessarily block again - lock.unlock(); - - // we rely on the following semantics of notify_one(): - // - // if a notification n is issued to s (i.e. s is chosen from among - // a set of threads waiting on a condition variable associated with - // mutex m) but another thread t locks m before s wakes up (i.e. s - // has not owned yet m after n had been issued), then n is retried - // as soon as t unlocks m. The retries repeat until n arrives at s - // meaning that s actually owns m and checks its wait guard. - m_send_end_cv.notify_one(); - } - } - - template - void _send(U&&); - -public: - // \pre: calling thread must own mutex() - // \post: calling thread doesn't own mutex() anymore - template - bool try_send(std::unique_lock&, U&&); - - // \pre: calling thread must own mutex() - // \post: calling thread doesn't own mutex() anymore - std::pair> try_recv_ptr( - std::unique_lock&); - - _channel(const _channel&) = delete; - - // Propagates exceptions thrown by std::condition_variable constructor - _channel() - : m_mutex(), - m_send_begin_cv(), - m_send_end_cv(), - m_recv_cv(), - m_queue(), - m_is_send_done(true), - m_is_try_send_done(true), - m_is_recv_ready(false), - m_is_try_send_ready(false), - m_is_try_recv_ready(false) {} - - // channel lock - std::mutex& mutex() - { - return m_mutex; - } - - // Propagates exceptions thrown by std::condition_variable::wait() - void send(const T& t) - { - _send(t); - } - - // Propagates exceptions thrown by std::condition_variable::wait() - void send(T&& t) - { - _send(std::move(t)); - } - - // Propagates exceptions thrown by std::condition_variable::wait() - T recv(); - - // Propagates exceptions thrown by std::condition_variable::wait() - void recv(T&); - - // Propagates exceptions thrown by std::condition_variable::wait() - std::unique_ptr recv_ptr(); -}; - -} - -template class ichannel; -template class ochannel; - -/// Go-style concurrency - -/// Thread synchronization mechanism as in the Go language. -/// As in Go, cpp::channel are first-class values. -/// -/// Unlike Go, however, cpp::channels cannot be nil -/// not closed. This simplifies the usage of the library. -/// -/// The template arguments are as follows: -/// -/// * T -- type of data to be communicated over channel -/// * N is zero -- synchronous channel -/// * N is positive -- asynchronous channel with queue size N -/// -/// Note that cpp::channel::recv() is only supported if T is -/// exception safe. This is automatically checked at compile time. -/// If T is not exception safe, use any of the other receive member -/// functions. -/// -/// \see http://golang.org/ref/spec#Channel_types -/// \see http://golang.org/ref/spec#Send_statements -/// \see http://golang.org/ref/spec#Receive_operator -/// \see http://golang.org/doc/effective_go.html#chan_of_chan -template -class channel -{ -static_assert(N < std::numeric_limits::max(), - "N must be strictly less than the largest possible size_t value"); - -private: - friend class ichannel; - friend class ochannel; - - std::shared_ptr> m_channel_ptr; - -public: - channel(const channel& other) noexcept - : m_channel_ptr(other.m_channel_ptr) {} - - // Propagates exceptions thrown by std::condition_variable constructor - channel() - : m_channel_ptr(std::make_shared>()) {} - - channel& operator=(const channel& other) noexcept - { - m_channel_ptr = other.m_channel_ptr; - return *this; - } - - bool operator==(const channel& other) const noexcept - { - return m_channel_ptr == other.m_channel_ptr; - } - - bool operator!=(const channel& other) const noexcept - { - return m_channel_ptr != other.m_channel_ptr; - } - - bool operator==(const ichannel&) const noexcept; - bool operator!=(const ichannel&) const noexcept; - - bool operator==(const ochannel&) const noexcept; - bool operator!=(const ochannel&) const noexcept; - - // Propagates exceptions thrown by std::condition_variable::wait() - void send(const T& t) - { - m_channel_ptr->send(t); - } - - // Propagates exceptions thrown by std::condition_variable::wait() - void send(T&& t) - { - m_channel_ptr->send(std::move(t)); - } - - // Propagates exceptions thrown by std::condition_variable::wait() - T recv() - { - static_assert(internal::_is_exception_safe::value, - "Cannot guarantee exception safety, use another recv operator"); - - return m_channel_ptr->recv(); - } - - // Propagates exceptions thrown by std::condition_variable::wait() - std::unique_ptr recv_ptr() - { - return m_channel_ptr->recv_ptr(); - } - - // Propagates exceptions thrown by std::condition_variable::wait() - void recv(T& t) - { - m_channel_ptr->recv(t); - } -}; - -class select; - -/// Can only be used to receive elements of type T -template -class ichannel -{ -private: - friend class select; - friend class channel; - std::shared_ptr> m_channel_ptr; - -public: - ichannel(const channel& other) noexcept - : m_channel_ptr(other.m_channel_ptr) {} - - ichannel(const ichannel& other) noexcept - : m_channel_ptr(other.m_channel_ptr) {} - - ichannel(ichannel&& other) noexcept - : m_channel_ptr(std::move(other.m_channel_ptr)) {} - - ichannel& operator=(const ichannel& other) noexcept - { - m_channel_ptr = other.m_channel_ptr; - return *this; - } - - bool operator==(const ichannel& other) const noexcept - { - return m_channel_ptr == other.m_channel_ptr; - } - - bool operator!=(const ichannel& other) const noexcept - { - return m_channel_ptr != other.m_channel_ptr; - } - - // Propagates exceptions thrown by std::condition_variable::wait() - T recv() - { - static_assert(internal::_is_exception_safe::value, - "Cannot guarantee exception safety, use another recv operator"); - - return m_channel_ptr->recv(); - } - - // Propagates exceptions thrown by std::condition_variable::wait() - void recv(T& t) - { - m_channel_ptr->recv(t); - } - - // Propagates exceptions thrown by std::condition_variable::wait() - std::unique_ptr recv_ptr() - { - return m_channel_ptr->recv_ptr(); - } - -}; - -/// Can only be used to send elements of type T -template -class ochannel -{ -private: - friend class select; - friend class channel; - std::shared_ptr> m_channel_ptr; - -public: - ochannel(const channel& other) noexcept - : m_channel_ptr(other.m_channel_ptr) {} - - ochannel(const ochannel& other) noexcept - : m_channel_ptr(other.m_channel_ptr) {} - - ochannel(ochannel&& other) noexcept - : m_channel_ptr(std::move(other.m_channel_ptr)) {} - - ochannel& operator=(const ochannel& other) noexcept - { - m_channel_ptr = other.m_channel_ptr; - return *this; - } - - bool operator==(const ochannel& other) const noexcept - { - return m_channel_ptr == other.m_channel_ptr; - } - - bool operator!=(const ochannel& other) const noexcept - { - return m_channel_ptr != other.m_channel_ptr; - } - - // Propagates exceptions thrown by std::condition_variable::wait() - void send(const T& t) - { - m_channel_ptr->send(t); - } - - // Propagates exceptions thrown by std::condition_variable::wait() - void send(T&& t) - { - m_channel_ptr->send(std::move(t)); - } -}; - -/// Go's select statement - -/// \see http://golang.org/ref/spec#Select_statements -/// -/// \warning select objects must not be shared between threads -/// -// TODO: investigate and ideally discuss pseudo-random distribution -class select -{ -private: - template - class try_send_nullary - { - private: - template::type> - static bool _run(ochannel& c, U&& u, NullaryFunction f) - { - internal::_channel& _c = *c.m_channel_ptr; - std::unique_lock lock(_c.mutex(), std::defer_lock); - if (lock.try_lock() && _c.try_send(lock, std::forward(u))) - { - assert(!lock.owns_lock()); - f(); - return true; - } - - return false; - } - - public: - bool operator()(ochannel& c, const T& t, NullaryFunction f) - { - return _run(c, t, f); - } - - bool operator()(ochannel& c, T&& t, NullaryFunction f) - { - return _run(c, std::move(t), f); - } - }; - - template - struct try_recv_nullary - { - bool operator()(ichannel& c, T& t, NullaryFunction f) - { - internal::_channel& _c = *c.m_channel_ptr; - std::unique_lock lock(_c.mutex(), std::defer_lock); - if (lock.try_lock()) - { - std::pair> pair = _c.try_recv_ptr(lock); - if (pair.first) - { - assert(!lock.owns_lock()); - t = *pair.second; - f(); - return true; - } - } - - return false; - } - }; - - template - struct try_recv_unary - { - bool operator()(ichannel& c, UnaryFunction f) - { - internal::_channel& _c = *c.m_channel_ptr; - std::unique_lock lock(_c.mutex(), std::defer_lock); - if (lock.try_lock()) - { - std::pair> pair = _c.try_recv_ptr(lock); - if (pair.first) - { - assert(!lock.owns_lock()); - f(std::move(*pair.second)); - return true; - } - } - - return false; - } - }; - - typedef std::function try_function; - typedef std::vector try_functions; - try_functions m_try_functions; - - std::mt19937 random_gen; - -public: - select() - : m_try_functions(), - random_gen(std::time(nullptr)) {} - - /* send cases */ - - template::type> - select& send_only(channel c, T&& t) - { - return send_only(ochannel(c), std::forward(t)); - } - - template::type> - select& send_only(ochannel c, T&& t) - { - return send(c, std::forward(t), [](){ /* skip */ }); - } - - template::type> - select& send(channel c, T&& t, NullaryFunction f) - { - return send(ochannel(c), std::forward(t), f); - } - - template::type> - select& send(ochannel c, T&& t, NullaryFunction f) - { - m_try_functions.push_back(std::bind( - try_send_nullary(), c, std::forward(t), f)); - return *this; - } - - /* receive cases */ - - template - select& recv_only(channel c, T& t) - { - return recv_only(ichannel(c), t); - } - - template - select& recv_only(ichannel c, T& t) - { - return recv(c, t, [](){ /* skip */ }); - } - - template - select& recv(channel c, T& t, NullaryFunction f) - { - return recv(ichannel(c), t, f); - } - - template - select& recv(ichannel c, T& t, NullaryFunction f) - { - m_try_functions.push_back(std::bind( - try_recv_nullary(), c, std::ref(t), f)); - return *this; - } - - template - select& recv(channel c, UnaryFunction f) - { - return recv(ichannel(c), f); - } - - template - select& recv(ichannel c, UnaryFunction f) - { - m_try_functions.push_back(std::bind( - try_recv_unary(), c, f)); - return *this; - } - - /// Nonblocking like Go's select statement with default case - - /// Returns true if and only if exactly one case succeeded - bool try_once() - { - const try_functions::size_type n = m_try_functions.size(), i = random_gen(); - for(try_functions::size_type j = 0; j < n; j++) - { - if (m_try_functions.at((i + j) % n)()) - return true; - } - return false; - } - - void wait() - { - const try_functions::size_type n = m_try_functions.size(); - try_functions::size_type i = random_gen(); - for(;;) - { - i = (i + 1) % n; - if (m_try_functions.at(i)()) - break; - } - } - - // Propagates any exception thrown by std::this_thread::sleep_for - template - void wait(const std::chrono::duration& sleep) - { - const try_functions::size_type n = m_try_functions.size(); - try_functions::size_type i = random_gen(); - for(;;) - { - i = (i + 1) % n; - if (m_try_functions.at(i)()) - break; - - std::this_thread::sleep_for(sleep); - } - } -}; - -template -bool channel::operator==(const ichannel& other) const noexcept -{ - return m_channel_ptr == other.m_channel_ptr; -} - -template -bool channel::operator!=(const ichannel& other) const noexcept -{ - return m_channel_ptr != other.m_channel_ptr; -} - -template -bool channel::operator==(const ochannel& other) const noexcept -{ - return m_channel_ptr == other.m_channel_ptr; -} - -template -bool channel::operator!=(const ochannel& other) const noexcept -{ - return m_channel_ptr != other.m_channel_ptr; -} - -template -template -bool internal::_channel::try_send( - std::unique_lock& lock, U&& u) -{ - m_is_try_send_ready = true; - - // TODO: support the case where both ends of a channel are inside a select - assert(!is_try_ready()); - - if ((!m_is_send_done || !m_is_try_send_done || is_full() || - (0 == N - m_queue.size() && !m_is_recv_ready))) - { - // TODO: investigate potential LLVM libc++ RAII unlocking issue - lock.unlock(); - return false; - } - - assert(m_is_send_done); - assert(m_is_try_send_done); - assert(!is_full()); - - // if enqueue should block, there must be a receiver waiting - m_is_try_send_done = 0 < N - m_queue.size(); - assert(m_is_try_send_done || m_is_recv_ready); - - m_queue.emplace_back(std::this_thread::get_id(), std::forward(u)); - - // Let v be the value enqueued by try_send(). If m_is_try_send_done - // is false, no other sender (whether blocking or not) can enqueue a - // value until a receiver has dequeued v, thereby ensuring the channel - // FIFO order when the queue is filled up by try_send(). Moreover, in - // that case, since m_is_try_send_done implies m_is_recv_ready, such a - // receiver is guaranteed to exist, and it will reset m_is_try_send_done - // to true so that other senders can make progress after v has been - // dequeued. And by notifying m_recv_cv, other receivers waiting for - // the queue to become nonempty can make progress as well. - lock.unlock(); - m_recv_cv.notify_one(); - return true; -} - -template -std::pair> internal::_channel::try_recv_ptr( - std::unique_lock& lock) -{ - m_is_try_recv_ready = true; - - if (m_queue.empty()) - return std::make_pair(false, std::unique_ptr(nullptr)); - - // If queue is full, then there exists either a _send() waiting - // for m_send_end_cv, or try_send() has just enqueued an element. - // - // In general, the converse is false: if there exists a blocking send, - // then a nonblocking receive might have just dequeued an element, - // i.e. queue is not full. - assert(!is_full() || !m_is_send_done || !m_is_try_send_done); - - // blocking and nonblocking send can never occur simultaneously - assert(m_is_try_send_done || m_is_send_done); - - std::pair pair(std::move(m_queue.front())); - assert(!is_full() || std::this_thread::get_id() != pair.first); - - // move/copy before pop_front() to ensure strong exception safety - std::unique_ptr t_ptr(make_unique(std::move(pair.second))); - - m_queue.pop_front(); - assert(!is_full()); - - // protocol with nonblocking calls - m_is_try_send_done = true; - m_is_try_recv_ready = false; - m_is_try_send_ready = false; - - // see also explanation in _channel::_post_blocking_recv() - if (m_is_send_done) - { - lock.unlock(); - m_send_begin_cv.notify_one(); - } - else - { - lock.unlock(); - m_send_end_cv.notify_one(); - } - - return std::make_pair(true, std::move(t_ptr)); -} - -template -template -void internal::_channel::_send(U&& u) -{ - // unlock before notifying threads; otherwise, the - // notified thread would unnecessarily block again - { - // wait (if necessary) until queue is no longer full and any - // previous _send() has successfully enqueued element - std::unique_lock lock(m_mutex); - m_send_begin_cv.wait(lock, [this]{ return m_is_send_done && - m_is_try_send_done && !is_full(); }); - - assert(m_is_send_done); - assert(m_is_try_send_done); - assert(!is_full()); - - // TODO: support the case where both ends of a channel are inside a select - assert(!is_try_ready()); - - m_queue.emplace_back(std::this_thread::get_id(), std::forward(u)); - m_is_send_done = false; - } - - // nonblocking - m_recv_cv.notify_one(); - - // wait (if necessary) until u has been received by another thread - { - std::unique_lock lock(m_mutex); - - // It is enough to check !is_full() because m_is_send_done == false. - // Therefore, no other thread could have caused the queue to fill up - // during the brief time we didn't own the lock. - // - // Performance note: unblocks after at least N successful recv calls - m_send_end_cv.wait(lock, [this]{ return !is_full(); }); - m_is_send_done = true; - } - - // see also explanation in _channel::recv() - m_send_begin_cv.notify_one(); -} - -template -T internal::_channel::recv() -{ - static_assert(internal::_is_exception_safe::value, - "Cannot guarantee exception safety, use another recv operator"); - - std::unique_lock lock(m_mutex); - _pre_blocking_recv(lock); - - std::pair pair(std::move(m_queue.front())); - assert(!is_full() || std::this_thread::get_id() != pair.first); - - _post_blocking_recv(lock); - return std::move(pair.second); -} - -template -void internal::_channel::recv(T& t) -{ - std::unique_lock lock(m_mutex); - _pre_blocking_recv(lock); - - std::pair pair(std::move(m_queue.front())); - assert(!is_full() || std::this_thread::get_id() != pair.first); - - // assignment before pop_front() to ensure strong exception safety - t = std::move(pair.second); - _post_blocking_recv(lock); -} - -template -std::unique_ptr internal::_channel::recv_ptr() -{ - std::unique_lock lock(m_mutex); - _pre_blocking_recv(lock); - - std::pair pair(std::move(m_queue.front())); - assert(!is_full() || std::this_thread::get_id() != pair.first); - - // move/copy before pop_front() to ensure strong exception safety - std::unique_ptr t_ptr(make_unique(std::move(pair.second))); - _post_blocking_recv(lock); - return t_ptr; -} - -} - -#endif \ No newline at end of file From 4f1d836cfb9ee269c19ca0276050536b64901d54 Mon Sep 17 00:00:00 2001 From: Sergey Polyakov Date: Fri, 16 Feb 2024 18:32:11 +0200 Subject: [PATCH 2/6] Drop chunk notification events if the event queue is full --- system/inc/active_object.h | 14 +++++++------- system/inc/system_event.h | 3 ++- system/inc/system_threading.h | 7 +++++++ system/src/firmware_update.cpp | 2 +- system/src/system_event.cpp | 10 +++++++--- 5 files changed, 24 insertions(+), 12 deletions(-) diff --git a/system/inc/active_object.h b/system/inc/active_object.h index 2752fca1af..79d95f0f87 100644 --- a/system/inc/active_object.h +++ b/system/inc/active_object.h @@ -283,7 +283,7 @@ class ActiveObjectBase // todo - concurrent queue should be a strategy so it's pluggable without requiring inheritance virtual bool take(Item& item)=0; - virtual bool put(Item& item)=0; + virtual bool put(Item& item, bool dontBlock = false)=0; /** * Static thread entrypoint to run this active object loop. @@ -315,13 +315,13 @@ class ActiveObjectBase return started; } - template void invoke_async(const std::function& work) + template void invoke_async(const std::function& work, bool dontBlock = false) { auto task = new AsyncTask(work); if (task) { Item message = task; - if (!put(message)) + if (!put(message, dontBlock)) delete task; } } @@ -354,9 +354,9 @@ class ActiveObjectQueue : public ActiveObjectBase return !os_queue_take(queue, &result, configuration.take_wait, nullptr); } - virtual bool put(Item& item) + virtual bool put(Item& item, bool dontBlock) { - return !os_queue_put(queue, &item, configuration.put_wait, nullptr); + return !os_queue_put(queue, &item, dontBlock ? 0 : configuration.put_wait, nullptr); } void createQueue() @@ -422,9 +422,9 @@ class ActiveObjectThreadQueue : public ActiveObjectQueue return r; } - virtual bool put(Item& item) override + virtual bool put(Item& item, bool dontBlock) override { - bool r = ActiveObjectQueue::put(item); + bool r = ActiveObjectQueue::put(item, dontBlock); if (r && _thread != OS_THREAD_INVALID_HANDLE) { os_thread_notify(_thread, nullptr); } diff --git a/system/inc/system_event.h b/system/inc/system_event.h index a350bad032..885d9af48d 100644 --- a/system/inc/system_event.h +++ b/system/inc/system_event.h @@ -113,7 +113,8 @@ enum SystemEventsParam { * Flags altering the behavior of the `system_notify_event()` function. */ enum SystemNotifyEventFlag { - NOTIFY_SYNCHRONOUSLY = 0x01 + NOTIFY_SYNCHRONOUSLY = 0x01, + NOTIFY_IF_POSSIBLE = 0x02 }; #define SYSTEM_EVENT_CONTEXT_VERSION (2) diff --git a/system/inc/system_threading.h b/system/inc/system_threading.h index c48852bcf5..d5e4a2b88b 100644 --- a/system/inc/system_threading.h +++ b/system/inc/system_threading.h @@ -86,6 +86,13 @@ os_mutex_recursive_t mutex_usb_serial(); return; \ } +#define _THREAD_CONTEXT_TRY_ASYNC(thread, fn) \ + if (thread.isStarted() && !thread.isCurrentThread()) { \ + auto lambda = [=]() { (fn); }; \ + thread.invoke_async(particle::FFL(lambda), true /* dontBlock */); \ + return; \ + } + // execute synchronously on the system thread. Since the parameter lifetime is // assumed to be bound by the caller, the parameters don't need marshalling // fn: the function call to perform. This is textually substitued into a lambda, with the diff --git a/system/src/firmware_update.cpp b/system/src/firmware_update.cpp index 8a6be95312..7c6de77e99 100644 --- a/system/src/firmware_update.cpp +++ b/system/src/firmware_update.cpp @@ -259,7 +259,7 @@ int FirmwareUpdate::saveChunk(const char* chunkData, size_t chunkSize, size_t ch // Generate a system event fileDesc_.chunk_address = fileDesc_.file_address + chunkOffset; fileDesc_.chunk_size = chunkSize; - system_notify_event(firmware_update, firmware_update_progress, &fileDesc_); + system_notify_event(firmware_update, firmware_update_progress, &fileDesc_, nullptr /* fn */, nullptr /* fndata */, NOTIFY_IF_POSSIBLE); lastActiveTime_ = HAL_Timer_Get_Milli_Seconds(); return 0; } diff --git a/system/src/system_event.cpp b/system/src/system_event.cpp index a84f5e1338..eb504f8e36 100644 --- a/system/src/system_event.cpp +++ b/system/src/system_event.cpp @@ -87,9 +87,13 @@ void system_notify_event_impl(system_event_t event, uint32_t data, void* pointer } } -void system_notify_event_async(system_event_t event, uint32_t data, void* pointer, void (*fn)(void* data), void* fndata) { +void system_notify_event_async(system_event_t event, uint32_t data, void* pointer, void (*fn)(void* data), void* fndata, bool dontBlock = false) { // run event notifications on the application thread - APPLICATION_THREAD_CONTEXT_ASYNC(system_notify_event_async(event, data, pointer, fn, fndata)); + if (dontBlock) { + APPLICATION_THREAD_CONTEXT_ASYNC(system_notify_event_async(event, data, pointer, fn, fndata)); + } else { + _THREAD_CONTEXT_TRY_ASYNC(particle::ApplicationThread, system_notify_event_async(event, data, pointer, fn, fndata)); // FIXME + } #if PLATFORM_THREADING std::lock_guard lk(sSubscriptionsMutex); #endif // PLATFORM_THREADING @@ -203,7 +207,7 @@ void system_notify_event(system_event_t event, uint32_t data, void* pointer, voi SystemISRTaskQueue.enqueue(task); }; } else { - system_notify_event_async(event, data, pointer, fn, fndata); + system_notify_event_async(event, data, pointer, fn, fndata, flags & NOTIFY_IF_POSSIBLE); } } From d6fa39d4aecb4c271560515e4a581240d7f43e54 Mon Sep 17 00:00:00 2001 From: Sergey Polyakov Date: Mon, 19 Feb 2024 18:54:23 +0200 Subject: [PATCH 3/6] Minor fixes --- hal/src/newhal/static_recursive_mutex.h | 31 +++++++++++++++++++ system/inc/active_object.h | 17 +++++----- system/inc/system_event.h | 4 +-- system/inc/system_threading.h | 4 ++- system/src/firmware_update.cpp | 2 +- system/src/main.cpp | 2 ++ system/src/system_event.cpp | 41 ++++++++++++++++--------- 7 files changed, 75 insertions(+), 26 deletions(-) create mode 100644 hal/src/newhal/static_recursive_mutex.h diff --git a/hal/src/newhal/static_recursive_mutex.h b/hal/src/newhal/static_recursive_mutex.h new file mode 100644 index 0000000000..061650b234 --- /dev/null +++ b/hal/src/newhal/static_recursive_mutex.h @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2024 Particle Industries, Inc. All rights reserved. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see . + */ + +#pragma once + +class StaticRecursiveMutex { +public: + StaticRecursiveMutex() = default; + + bool lock(unsigned timeout = 0) { + return true; + } + + bool unlock() { + return true; + } +}; diff --git a/system/inc/active_object.h b/system/inc/active_object.h index 79d95f0f87..35b2afb593 100644 --- a/system/inc/active_object.h +++ b/system/inc/active_object.h @@ -315,16 +315,19 @@ class ActiveObjectBase return started; } - template void invoke_async(const std::function& work, bool dontBlock = false) + template bool invoke_async(const std::function& work, bool dontBlock = false) { auto task = new AsyncTask(work); - if (task) - { - Item message = task; - if (!put(message, dontBlock)) - delete task; + if (!task) { + return false; + } + Item message = task; + if (!put(message, dontBlock)) { + delete task; + return false; } - } + return true; + } template SystemPromise* invoke_future(const std::function& work) { diff --git a/system/inc/system_event.h b/system/inc/system_event.h index 885d9af48d..3e6d8eab14 100644 --- a/system/inc/system_event.h +++ b/system/inc/system_event.h @@ -113,8 +113,8 @@ enum SystemEventsParam { * Flags altering the behavior of the `system_notify_event()` function. */ enum SystemNotifyEventFlag { - NOTIFY_SYNCHRONOUSLY = 0x01, - NOTIFY_IF_POSSIBLE = 0x02 + NOTIFY_SYNCHRONOUSLY = 0x01, ///< Invoke the event handlers directly in the calling thread. + NOTIFY_DONT_BLOCK = 0x02 ///< Ignore the event if the event queue of the application thread is full. }; #define SYSTEM_EVENT_CONTEXT_VERSION (2) diff --git a/system/inc/system_threading.h b/system/inc/system_threading.h index d5e4a2b88b..a5a912ebd0 100644 --- a/system/inc/system_threading.h +++ b/system/inc/system_threading.h @@ -86,7 +86,7 @@ os_mutex_recursive_t mutex_usb_serial(); return; \ } -#define _THREAD_CONTEXT_TRY_ASYNC(thread, fn) \ +#define _THREAD_CONTEXT_ASYNC_TRY(thread, fn) \ if (thread.isStarted() && !thread.isCurrentThread()) { \ auto lambda = [=]() { (fn); }; \ thread.invoke_async(particle::FFL(lambda), true /* dontBlock */); \ @@ -112,6 +112,7 @@ os_mutex_recursive_t mutex_usb_serial(); #else // !PLATFORM_THREADING #define _THREAD_CONTEXT_ASYNC(thread, fn) +#define _THREAD_CONTEXT_ASYNC_TRY(thread, fn) #define _THREAD_CONTEXT_ASYNC_RESULT(thread, fn, result) #define SYSTEM_THREAD_CONTEXT_SYNC(fn) @@ -123,6 +124,7 @@ os_mutex_recursive_t mutex_usb_serial(); #define SYSTEM_THREAD_CONTEXT_ASYNC(fn) _THREAD_CONTEXT_ASYNC(particle::SystemThread, fn) #define SYSTEM_THREAD_CONTEXT_ASYNC_RESULT(fn, result) _THREAD_CONTEXT_ASYNC_RESULT(particle::SystemThread, fn, result) #define APPLICATION_THREAD_CONTEXT_ASYNC(fn) _THREAD_CONTEXT_ASYNC(particle::ApplicationThread, fn) +#define APPLICATION_THREAD_CONTEXT_ASYNC_TRY(fn) _THREAD_CONTEXT_ASYNC_TRY(particle::ApplicationThread, fn) #define APPLICATION_THREAD_CONTEXT_ASYNC_RESULT(fn, result) _THREAD_CONTEXT_ASYNC_RESULT(particle::ApplicationThread, fn, result) // Perform an asynchronous function call if not on the system thread, diff --git a/system/src/firmware_update.cpp b/system/src/firmware_update.cpp index 7c6de77e99..56dedafecc 100644 --- a/system/src/firmware_update.cpp +++ b/system/src/firmware_update.cpp @@ -259,7 +259,7 @@ int FirmwareUpdate::saveChunk(const char* chunkData, size_t chunkSize, size_t ch // Generate a system event fileDesc_.chunk_address = fileDesc_.file_address + chunkOffset; fileDesc_.chunk_size = chunkSize; - system_notify_event(firmware_update, firmware_update_progress, &fileDesc_, nullptr /* fn */, nullptr /* fndata */, NOTIFY_IF_POSSIBLE); + system_notify_event(firmware_update, firmware_update_progress, &fileDesc_, nullptr /* fn */, nullptr /* fndata */, NOTIFY_DONT_BLOCK); lastActiveTime_ = HAL_Timer_Get_Milli_Seconds(); return 0; } diff --git a/system/src/main.cpp b/system/src/main.cpp index bb1f99c3d5..57f5954d14 100644 --- a/system/src/main.cpp +++ b/system/src/main.cpp @@ -31,6 +31,8 @@ // STATIC_ASSERT macro clashes with the nRF SDK #define NO_STATIC_ASSERT +#include + #include "debug.h" #include "system_event.h" #include "system_mode.h" diff --git a/system/src/system_event.cpp b/system/src/system_event.cpp index eb504f8e36..5dbbbae48f 100644 --- a/system/src/system_event.cpp +++ b/system/src/system_event.cpp @@ -17,14 +17,17 @@ ****************************************************************************** */ +#define NO_STATIC_ASSERT + #include "system_event.h" #include "system_threading.h" #include "interrupts_hal.h" #include "system_task.h" +#include "static_recursive_mutex.h" +#include "scope_guard.h" #include #include "spark_wiring_vector.h" #include "spark_wiring_interrupts.h" -#include "spark_wiring_thread.h" #include namespace { @@ -75,10 +78,20 @@ struct SystemEventSubscription { // for now a simple implementation spark::Vector subscriptions; #if PLATFORM_THREADING -RecursiveMutex sSubscriptionsMutex; +StaticRecursiveMutex sSubscriptionsMutex; #endif // PLATFORM_THREADING -void system_notify_event_impl(system_event_t event, uint32_t data, void* pointer, void (*fn)(void* data), void* fndata) { +void system_notify_event_impl(system_event_t event, uint32_t data, void* pointer, void (*fn)(void* data), void* fndata, bool isIsr) { +#if PLATFORM_THREADING + if (!isIsr) { + sSubscriptionsMutex.lock(); + } + SCOPE_GUARD({ + if (!isIsr) { + sSubscriptionsMutex.unlock(); + } + }); +#endif // PLATFORM_THREADING for (SystemEventSubscription& subscription : subscriptions) { subscription.notify(event, data, pointer); } @@ -90,14 +103,11 @@ void system_notify_event_impl(system_event_t event, uint32_t data, void* pointer void system_notify_event_async(system_event_t event, uint32_t data, void* pointer, void (*fn)(void* data), void* fndata, bool dontBlock = false) { // run event notifications on the application thread if (dontBlock) { - APPLICATION_THREAD_CONTEXT_ASYNC(system_notify_event_async(event, data, pointer, fn, fndata)); + APPLICATION_THREAD_CONTEXT_ASYNC_TRY(system_notify_event_async(event, data, pointer, fn, fndata, dontBlock)); } else { - _THREAD_CONTEXT_TRY_ASYNC(particle::ApplicationThread, system_notify_event_async(event, data, pointer, fn, fndata)); // FIXME + APPLICATION_THREAD_CONTEXT_ASYNC(system_notify_event_async(event, data, pointer, fn, fndata, dontBlock)); } -#if PLATFORM_THREADING - std::lock_guard lk(sSubscriptionsMutex); -#endif // PLATFORM_THREADING - system_notify_event_impl(event, data, pointer, fn, fndata); + system_notify_event_impl(event, data, pointer, fn, fndata, false /* isIsr */); } class SystemEventTask : public ISRTaskQueue::Task { @@ -149,7 +159,7 @@ int system_subscribe_event(system_event_t events, system_event_handler_t* handle // Modification of subscriptions normally happens from thread context, so for events generated outside ISR // context, only mutex acquisition is sufficient to keep things thread safe (see system_notify_event_async()) #if PLATFORM_THREADING - std::lock_guard lk(sSubscriptionsMutex); + std::lock_guard lk(sSubscriptionsMutex); #endif // PLATFORM_THREADING int r = 0; ATOMIC_BLOCK() { @@ -168,7 +178,7 @@ void system_unsubscribe_event(system_event_t events, system_event_handler_t* han // Modification of subscriptions normally happens from thread context, so for events generated outside ISR // context, only mutex acquisition is sufficient to keep things thread safe (see system_notify_event_async()) #if PLATFORM_THREADING - std::lock_guard lk(sSubscriptionsMutex); + std::lock_guard lk(sSubscriptionsMutex); #endif // PLATFORM_THREADING ATOMIC_BLOCK() { auto it = std::remove_if(subscriptions.begin(), subscriptions.end(), [events, handler, context](const SystemEventSubscription& sub) { @@ -199,15 +209,16 @@ void system_notify_event(system_event_t event, uint32_t data, void* pointer, voi unsigned flags) { // TODO: Add an API that would allow user applications to control which event handlers can be // executed synchronously, possibly in the context of an ISR + bool isIsr = hal_interrupt_is_isr(); if (flags & NOTIFY_SYNCHRONOUSLY) { - system_notify_event_impl(event, data, pointer, fn, fndata); - } else if (hal_interrupt_is_isr()) { + system_notify_event_impl(event, data, pointer, fn, fndata, isIsr); + } else if (isIsr) { auto task = systemPoolNew(event, data, pointer, fn, fndata); if (task) { SystemISRTaskQueue.enqueue(task); - }; + } } else { - system_notify_event_async(event, data, pointer, fn, fndata, flags & NOTIFY_IF_POSSIBLE); + system_notify_event_async(event, data, pointer, fn, fndata, flags & NOTIFY_DONT_BLOCK); } } From 570aab766e2436547f60ec9ad4adb6ea5c1bee33 Mon Sep 17 00:00:00 2001 From: Sergey Polyakov Date: Mon, 19 Feb 2024 20:50:28 +0200 Subject: [PATCH 4/6] Add test --- user/tests/wiring/no_fixture/thread.cpp | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/user/tests/wiring/no_fixture/thread.cpp b/user/tests/wiring/no_fixture/thread.cpp index f433e1c611..83ddf7ef84 100644 --- a/user/tests/wiring/no_fixture/thread.cpp +++ b/user/tests/wiring/no_fixture/thread.cpp @@ -248,6 +248,24 @@ test(THREAD_08_newlib_reent_impure_ptr_changes_on_context_switch) assertEqual((uintptr_t)testImpure, (uintptr_t)_impure_ptr); } +test(THREAD_09_dont_block_event_queue_option) +{ + if (system_thread_get_state(nullptr) != spark::feature::ENABLED) { + skip(); + return; + } + ActiveObjectBase* app = (ActiveObjectBase*)system_internal(0, nullptr); // Returns application thread instance + test_val_fn1 = 0; + std::function fn = increment; + for (int i = 0; i < 20; ++i) { + assertTrue(app->invoke_async(fn, true /* dontBlock */)); + } + assertFalse(app->invoke_async(fn, true)); + while (Particle.process()) { + } + assertEqual((int)test_val_fn1, 20); +} + // todo - test for SingleThreadedSection From b5657393dc8f191301dc912e11935c21ac588f07 Mon Sep 17 00:00:00 2001 From: Sergey Polyakov Date: Thu, 22 Feb 2024 18:33:55 +0200 Subject: [PATCH 5/6] Inline methods that depend on PARTICLE_WIRING_PRINT_NO_FLOAT to avoid linker errors --- wiring/inc/spark_wiring_print.h | 76 +++++++++++++++++++++++++++++-- wiring/src/spark_wiring_print.cpp | 73 ----------------------------- 2 files changed, 71 insertions(+), 78 deletions(-) diff --git a/wiring/inc/spark_wiring_print.h b/wiring/inc/spark_wiring_print.h index adc2748f14..6eea33abc3 100644 --- a/wiring/inc/spark_wiring_print.h +++ b/wiring/inc/spark_wiring_print.h @@ -35,6 +35,7 @@ #include "spark_wiring_printable.h" #include "spark_wiring_fixed_point.h" +#include #include #include @@ -65,9 +66,60 @@ class Print size_t printNumber(unsigned long, uint8_t); size_t printNumber(unsigned long long, uint8_t); + #ifndef PARTICLE_WIRING_PRINT_NO_FLOAT - size_t printFloat(double, uint8_t); + size_t printFloat(double number, uint8_t digits) { + size_t n = 0; + + if (std::isnan(number)) { + return print("nan"); + } + if (std::isinf(number)) { + return print("inf"); + } + if (number > 4294967040.0) { + return print ("ovf"); // constant determined empirically + } + if (number <-4294967040.0) { + return print ("ovf"); // constant determined empirically + } + + // Handle negative numbers + if (number < 0.0) { + n += print('-'); + number = -number; + } + + // Round correctly so that print(1.999, 2) prints as "2.00" + double rounding = 0.5; + for (uint8_t i = 0; i < digits; ++i) { + rounding /= 10.0; + } + + number += rounding; + + // Extract the integer part of the number and print it + unsigned long int_part = (unsigned long)number; + double remainder = number - (double)int_part; + n += print(int_part); + + // Print the decimal point, but only if there are digits beyond + if (digits > 0) { + n += print("."); + } + + // Extract digits from the remainder one at a time + while (digits-- > 0) { + remainder *= 10.0; + int toPrint = int(remainder); + n += print(toPrint); + remainder -= toPrint; + } + + return n; + } #endif // PARTICLE_WIRING_PRINT_NO_FLOAT + size_t printVariant(const particle::Variant& var); protected: @@ -92,9 +144,15 @@ class Print template ::value && (std::is_integral::value || std::is_convertible::value || std::is_convertible::value), int> = 0> size_t print(T, int = DEC); + #ifndef PARTICLE_WIRING_PRINT_NO_FLOAT - size_t print(float, int = 2); - size_t print(double, int = 2); + size_t print(float n, int digits) { + return printFloat((double)n, digits); + } + + size_t print(double n, int digits) { + return printFloat(n, digits); + } #endif // PARTICLE_WIRING_PRINT_NO_FLOAT // Prevent implicit constructors of Variant from affecting overload resolution @@ -115,9 +173,17 @@ class Print n += println(); return n; } + #ifndef PARTICLE_WIRING_PRINT_NO_FLOAT - size_t println(float, int = 2); - size_t println(double, int = 2); + size_t println(float num, int digits) { + return println((double)num, digits); + } + + size_t println(double num, int digits) { + size_t n = print(num, digits); + n += println(); + return n; + } #endif // PARTICLE_WIRING_PRINT_NO_FLOAT template, int> = 0> diff --git a/wiring/src/spark_wiring_print.cpp b/wiring/src/spark_wiring_print.cpp index 779db026bf..1944cffac2 100644 --- a/wiring/src/spark_wiring_print.cpp +++ b/wiring/src/spark_wiring_print.cpp @@ -124,18 +124,6 @@ size_t Print::print(char c) return write(c); } -#ifndef PARTICLE_WIRING_PRINT_NO_FLOAT -size_t Print::print(float n, int digits) -{ - return printFloat((double)n, digits); -} - -size_t Print::print(double n, int digits) -{ - return printFloat(n, digits); -} -#endif // PARTICLE_WIRING_PRINT_NO_FLOAT - size_t Print::print(const Printable& x) { return x.printTo(*this); @@ -167,20 +155,6 @@ size_t Print::println(char c) return n; } -#ifndef PARTICLE_WIRING_PRINT_NO_FLOAT -size_t Print::println(float num, int digits) -{ - return println((double)num, digits); -} - -size_t Print::println(double num, int digits) -{ - size_t n = print(num, digits); - n += println(); - return n; -} -#endif // PARTICLE_WIRING_PRINT_NO_FLOAT - size_t Print::println(const Printable& x) { size_t n = print(x); @@ -234,53 +208,6 @@ size_t Print::printNumber(unsigned long n, uint8_t base) { return write(str); } -#ifndef PARTICLE_WIRING_PRINT_NO_FLOAT -size_t Print::printFloat(double number, uint8_t digits) -{ - size_t n = 0; - - if (isnan(number)) return print("nan"); - if (isinf(number)) return print("inf"); - if (number > 4294967040.0) return print ("ovf"); // constant determined empirically - if (number <-4294967040.0) return print ("ovf"); // constant determined empirically - - // Handle negative numbers - if (number < 0.0) - { - n += print('-'); - number = -number; - } - - // Round correctly so that print(1.999, 2) prints as "2.00" - double rounding = 0.5; - for (uint8_t i=0; i 0) { - n += print("."); - } - - // Extract digits from the remainder one at a time - while (digits-- > 0) - { - remainder *= 10.0; - int toPrint = int(remainder); - n += print(toPrint); - remainder -= toPrint; - } - - return n; -} -#endif // PARTICLE_WIRING_PRINT_NO_FLOAT - size_t Print::printVariant(const Variant& var) { JSONStreamWriter writer(*this); writeVariant(var, writer); From 28e9d840d7a86ebc1efded263bc86b025b0fc3b7 Mon Sep 17 00:00:00 2001 From: Andrey Tolstoy Date: Mon, 26 Feb 2024 18:41:09 +0700 Subject: [PATCH 6/6] [wiring] Print: fix default float digits --- wiring/inc/spark_wiring_print.h | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/wiring/inc/spark_wiring_print.h b/wiring/inc/spark_wiring_print.h index 6eea33abc3..3af8efcf62 100644 --- a/wiring/inc/spark_wiring_print.h +++ b/wiring/inc/spark_wiring_print.h @@ -68,6 +68,9 @@ class Print size_t printNumber(unsigned long long, uint8_t); #ifndef PARTICLE_WIRING_PRINT_NO_FLOAT + + static constexpr auto FLOAT_DEFAULT_FRACTIONAL_DIGITS = 2; + size_t printFloat(double number, uint8_t digits) { size_t n = 0; @@ -146,11 +149,11 @@ class Print size_t print(T, int = DEC); #ifndef PARTICLE_WIRING_PRINT_NO_FLOAT - size_t print(float n, int digits) { + size_t print(float n, int digits = FLOAT_DEFAULT_FRACTIONAL_DIGITS) { return printFloat((double)n, digits); } - size_t print(double n, int digits) { + size_t print(double n, int digits = FLOAT_DEFAULT_FRACTIONAL_DIGITS) { return printFloat(n, digits); } #endif // PARTICLE_WIRING_PRINT_NO_FLOAT @@ -175,11 +178,11 @@ class Print } #ifndef PARTICLE_WIRING_PRINT_NO_FLOAT - size_t println(float num, int digits) { + size_t println(float num, int digits = FLOAT_DEFAULT_FRACTIONAL_DIGITS) { return println((double)num, digits); } - size_t println(double num, int digits) { + size_t println(double num, int digits = FLOAT_DEFAULT_FRACTIONAL_DIGITS) { size_t n = print(num, digits); n += println(); return n;