From ebf45d66db3fe696ab6253a511a763a041776a1a Mon Sep 17 00:00:00 2001 From: Kamil Cudnik Date: Thu, 14 Sep 2017 17:02:31 -0700 Subject: [PATCH] [syncd]: Refactor mutexes (#221) --- syncd/syncd.cpp | 32 ++-- syncd/syncd.h | 5 +- syncd/syncd_counters.cpp | 4 +- syncd/syncd_notifications.cpp | 285 +++++++++++++++++++++++++++++----- syncd/syncd_saiswitch.cpp | 2 - 5 files changed, 272 insertions(+), 56 deletions(-) diff --git a/syncd/syncd.cpp b/syncd/syncd.cpp index 808b820f6a76..d82899d6cf53 100644 --- a/syncd/syncd.cpp +++ b/syncd/syncd.cpp @@ -7,7 +7,19 @@ #include #include -std::mutex g_db_mutex; +/** + * @brief Global mutex for thread synchronization + * + * Purpose of this mutex is to synchronize multiple threads like main thread, + * counters and notifications as well as all operations which require multiple + * Redis DB access. + * + * For example: query DB for next VID id number, and then put map RID and VID + * to Redis. From syncd point of view this entire operation should be atomic + * and no other thread should access DB or make assumption on previous + * information until entire operation will finish. + */ +std::mutex g_mutex; std::shared_ptr g_redisClient; std::shared_ptr getResponse; @@ -41,7 +53,7 @@ std::map> switches; * could be vlan members, bridge ports etc. * * We need this list to later on not put them back to temp view mode when doing - * populate existing obejcts in apply view mode. + * populate existing objects in apply view mode. * * Object ids here a VIDs. 
*/ @@ -327,8 +339,6 @@ sai_object_id_t translate_rid_to_vid( _In_ sai_object_id_t rid, _In_ sai_object_id_t switch_vid) { - std::lock_guard lock(g_db_mutex); - SWSS_LOG_ENTER(); /* @@ -516,7 +526,6 @@ void translate_rid_to_vid_list( sai_object_id_t translate_vid_to_rid( _In_ sai_object_id_t vid) { - std::lock_guard lock(g_db_mutex); SWSS_LOG_ENTER(); @@ -679,7 +688,6 @@ void snoop_get_attr( SWSS_LOG_DEBUG("%s", key.c_str()); - std::lock_guard lock(g_db_mutex); g_redisClient->hset(key, attr_id, attr_value); } @@ -1208,7 +1216,6 @@ sai_status_t handle_generic( */ { - std::lock_guard lock(g_db_mutex); g_redisClient->hset(VIDTORID, str_vid, str_rid); g_redisClient->hset(RIDTOVID, str_rid, str_vid); @@ -1246,7 +1253,6 @@ sai_status_t handle_generic( */ { - std::lock_guard lock(g_db_mutex); g_redisClient->hdel(VIDTORID, str_vid); g_redisClient->hdel(RIDTOVID, str_rid); @@ -1411,7 +1417,6 @@ void clearTempView() * We need to expose api to execute user lua script not only predefined. */ - std::lock_guard lock(g_db_mutex); for (const auto &key: g_redisClient->keys(pattern)) { @@ -2107,6 +2112,8 @@ sai_status_t processBulkEvent( sai_status_t processEvent( _In_ swss::ConsumerTable &consumer) { + std::lock_guard lock(g_mutex); + SWSS_LOG_ENTER(); swss::KeyOpFieldsValuesTuple kco; @@ -2613,7 +2620,6 @@ bool handleRestartQuery(swss::NotificationConsumer &restartQuery) bool isVeryFirstRun() { - std::lock_guard lock(g_db_mutex); SWSS_LOG_ENTER(); @@ -2761,6 +2767,8 @@ void performWarmRestart() void onSyncdStart(bool warmStart) { + std::lock_guard lock(g_mutex); + /* * It may happen that after initialize we will receive some port * notifications with port'ids that are not in redis db yet, so after @@ -2987,6 +2995,8 @@ int main(int argc, char **argv) startCountersThread(options.countersThreadIntervalInSeconds); } + startNotificationsProcessingThread(); + SWSS_LOG_NOTICE("syncd listening for events"); swss::Select s; @@ -3056,6 +3066,8 @@ int main(int argc, char **argv) 
SWSS_LOG_ERROR("failed to uninitialize api: %s", sai_serialize_status(status).c_str()); } + stopNotificationsProcessingThread(); + SWSS_LOG_NOTICE("uninitialize finished"); exit(EXIT_SUCCESS); diff --git a/syncd/syncd.h b/syncd/syncd.h index 6ba0a6aac159..83a4d3fd7ead 100644 --- a/syncd/syncd.h +++ b/syncd/syncd.h @@ -58,7 +58,7 @@ extern "C" { #define SWITCH_SAI_THRIFT_RPC_SERVER_PORT 9092 #endif // SAITHRIFT -extern std::mutex g_db_mutex; +extern std::mutex g_mutex; extern std::map> switches; @@ -110,4 +110,7 @@ bool is_set_attribute_workaround( _In_ sai_attr_id_t attrid, _In_ sai_status_t status); +void startNotificationsProcessingThread(); +void stopNotificationsProcessingThread(); + #endif // __SYNCD_H__ diff --git a/syncd/syncd_counters.cpp b/syncd/syncd_counters.cpp index 768a2d5a8b77..f77135e15e86 100644 --- a/syncd/syncd_counters.cpp +++ b/syncd/syncd_counters.cpp @@ -29,8 +29,10 @@ void collectCountersThread( for (auto sw: switches) { + std::lock_guard lock(g_mutex); + /* - * Collect counters should be under mutex sice configuration can + * Collect counters should be under mutex since configuration can * change and we don't want that during counters collection. 
*/ diff --git a/syncd/syncd_notifications.cpp b/syncd/syncd_notifications.cpp index b2ac622473ce..c2323313c100 100644 --- a/syncd/syncd_notifications.cpp +++ b/syncd/syncd_notifications.cpp @@ -1,8 +1,9 @@ #include "syncd.h" #include "sairedis.h" -// mutex to protect notification send call -std::mutex g_ntf_mutex; +#include +#include +#include void send_notification( _In_ std::string op, @@ -29,12 +30,10 @@ void send_notification( send_notification(op, data, entry); } -void on_switch_state_change( +void process_on_switch_state_change( _In_ sai_object_id_t switch_rid, _In_ sai_switch_oper_status_t switch_oper_status) { - std::lock_guard lock(g_ntf_mutex); - SWSS_LOG_ENTER(); sai_object_id_t switch_vid = translate_rid_to_vid(switch_rid, SAI_NULL_OBJECT_ID); @@ -98,8 +97,6 @@ void redisPutFdbEntryToAsicView( const std::string &strField = fvField(e); const std::string &strValue = fvValue(e); - std::lock_guard lock(g_db_mutex); - g_redisClient->hset(key, strField, strValue); } @@ -125,17 +122,13 @@ void redisPutFdbEntryToAsicView( std::string strAttrId = sai_serialize_attr_id(*meta); std::string strAttrValue = sai_serialize_attr_value(*meta, attr); - std::lock_guard lock(g_db_mutex); - g_redisClient->hset(key, strAttrId, strAttrValue); } -void on_fdb_event( +void process_on_fdb_event( _In_ uint32_t count, _In_ sai_fdb_event_notification_data_t *data) { - std::lock_guard lock(g_ntf_mutex); - SWSS_LOG_ENTER(); SWSS_LOG_DEBUG("fdb event count: %d", count); @@ -167,12 +160,10 @@ void on_fdb_event( send_notification("fdb_event", s); } -void on_port_state_change( +void process_on_port_state_change( _In_ uint32_t count, _In_ sai_port_oper_status_notification_t *data) { - std::lock_guard lock(g_ntf_mutex); - SWSS_LOG_ENTER(); SWSS_LOG_DEBUG("port notification count: %u", count); @@ -198,11 +189,9 @@ void on_port_state_change( send_notification("port_state_change", s); } -void on_switch_shutdown_request( +void process_on_switch_shutdown_request( _In_ sai_object_id_t switch_rid) 
{ - std::lock_guard lock(g_ntf_mutex); - SWSS_LOG_ENTER(); sai_object_id_t switch_vid = translate_rid_to_vid(switch_rid, SAI_NULL_OBJECT_ID); @@ -212,6 +201,174 @@ void on_switch_shutdown_request( send_notification("switch_shutdown_request", s); } +void handle_switch_state_change( + _In_ const std::string &data) +{ + SWSS_LOG_ENTER(); + + sai_switch_oper_status_t switch_oper_status; + sai_object_id_t switch_id; + + sai_deserialize_switch_oper_status(data, switch_id, switch_oper_status); + + process_on_switch_state_change(switch_id, switch_oper_status); +} + +void handle_fdb_event( + _In_ const std::string &data) +{ + SWSS_LOG_ENTER(); + + uint32_t count; + sai_fdb_event_notification_data_t *fdbevent = NULL; + + sai_deserialize_fdb_event_ntf(data, count, &fdbevent); + + process_on_fdb_event(count, fdbevent); + + sai_deserialize_free_fdb_event_ntf(count, fdbevent); +} + +void handle_port_state_change( + _In_ const std::string &data) +{ + SWSS_LOG_ENTER(); + + uint32_t count; + sai_port_oper_status_notification_t *portoperstatus = NULL; + + sai_deserialize_port_oper_status_ntf(data, count, &portoperstatus); + + process_on_port_state_change(count, portoperstatus); + + sai_deserialize_free_port_oper_status_ntf(count, portoperstatus); +} + +void handle_switch_shutdown_request( + _In_ const std::string &data) +{ + SWSS_LOG_ENTER(); + + sai_object_id_t switch_id; + + sai_deserialize_switch_shutdown_request(data, switch_id); + + process_on_switch_shutdown_request(switch_id); +} + +void processNotification( + _In_ const swss::KeyOpFieldsValuesTuple &item) +{ + std::lock_guard lock(g_mutex); + + SWSS_LOG_ENTER(); + + std::string notification = kfvKey(item); + std::string data = kfvOp(item); + + if (notification == "switch_state_change") + { + handle_switch_state_change(data); + } + else if (notification == "fdb_event") + { + handle_fdb_event(data); + } + else if (notification == "port_state_change") + { + handle_port_state_change(data); + } + else if (notification == 
"switch_shutdown_request") + { + handle_switch_shutdown_request(data); + } + else + { + SWSS_LOG_ERROR("unknow notification: %s", notification.c_str()); + } +} + +// condition variable will be used to notify processing thread +// that some notiffication arrived + +std::condition_variable cv; +std::mutex queue_mutex; +std::queue ntf_queue; + +void enqueue_notification( + _In_ std::string op, + _In_ std::string data, + _In_ std::vector &entry) +{ + SWSS_LOG_ENTER(); + + SWSS_LOG_INFO("%s %s", op.c_str(), data.c_str()); + + swss::KeyOpFieldsValuesTuple item(op, data, entry); + + // this is notification context, so we need to protect queue + + std::lock_guard lock(queue_mutex); + + ntf_queue.push(item); + + cv.notify_all(); +} + +void enqueue_notification( + _In_ std::string op, + _In_ std::string data) +{ + SWSS_LOG_ENTER(); + + std::vector entry; + + enqueue_notification(op, data, entry); +} + +void on_switch_state_change( + _In_ sai_object_id_t switch_id, + _In_ sai_switch_oper_status_t switch_oper_status) +{ + SWSS_LOG_ENTER(); + + std::string s = sai_serialize_switch_oper_status(switch_id, switch_oper_status); + + enqueue_notification("switch_state_change", s); +} + +void on_fdb_event( + _In_ uint32_t count, + _In_ sai_fdb_event_notification_data_t *data) +{ + SWSS_LOG_ENTER(); + + std::string s = sai_serialize_fdb_event_ntf(count, data); + + enqueue_notification("fdb_event", s); +} + +void on_port_state_change( + _In_ uint32_t count, + _In_ sai_port_oper_status_notification_t *data) +{ + SWSS_LOG_ENTER(); + + std::string s = sai_serialize_port_oper_status_ntf(count, data); + + enqueue_notification("port_state_change", s); +} + +void on_switch_shutdown_request( + _In_ sai_object_id_t switch_id) +{ + SWSS_LOG_ENTER(); + + std::string s = sai_serialize_switch_shutdown_request(switch_id); + + enqueue_notification("switch_shutdown_request", ""); +} + void on_packet_event( _In_ sai_object_id_t switch_id, _In_ const void *buffer, @@ -219,42 +376,86 @@ void 
on_packet_event( _In_ uint32_t attr_count, _In_ const sai_attribute_t *attr_list) { - std::lock_guard lock(g_ntf_mutex); - SWSS_LOG_ENTER(); SWSS_LOG_ERROR("not implemented"); +} - /* - TODO translate switch id - std::string s; - sai_serialize_primitive(buffer_size, s); +// determine whether notification thread is running + +volatile bool runThread; - sai_serialize_buffer(buffer, buffer_size, s); +std::mutex ntf_mutex; +std::unique_lock ulock(ntf_mutex); - std::vector entry; +bool tryDequeue( + _Out_ swss::KeyOpFieldsValuesTuple &item) +{ + std::lock_guard lock(queue_mutex); - entry = SaiAttributeList::serialize_attr_list( - SAI_OBJECT_TYPE_PACKET, - attr_count, - attr_list, - false); + SWSS_LOG_ENTER(); - // since attr_list is const, we can't replace rid's - // we need to create copy of that list + if (ntf_queue.empty()) + { + return false; + } - SaiAttributeList copy(SAI_OBJECT_TYPE_PACKET, entry, false); + item = ntf_queue.front(); - translate_rid_to_vid_list(SAI_OBJECT_TYPE_PACKET, copy.get_attr_count(), copy.get_attr_list()); + ntf_queue.pop(); - entry = SaiAttributeList::serialize_attr_list( - SAI_OBJECT_TYPE_PACKET, - copy.get_attr_count(), - copy.get_attr_list(), - false); + return true; +} + +void ntf_process_function() +{ + SWSS_LOG_ENTER(); + + while (runThread) + { + cv.wait(ulock); + + // this is notifications processing thread context, which is different + // from SAI notifications context, we can safely use g_mutex here, + // processing each notification is under same mutex as processing main + // events, counters and reinit + + swss::KeyOpFieldsValuesTuple item; + + while (tryDequeue(item)) + { + processNotification(item); + } + } +} + +std::shared_ptr ntf_process_thread; + +void startNotificationsProcessingThread() +{ + SWSS_LOG_ENTER(); + + runThread = true; + + ntf_process_thread = std::make_shared(ntf_process_function); + + // NOTE: thread must stay joinable, it is joined in stopNotificationsProcessingThread +} + +void stopNotificationsProcessingThread() +{ + SWSS_LOG_ENTER(); + + runThread = false; + 
+ cv.notify_all(); + + if (ntf_process_thread != nullptr) + { + ntf_process_thread->join(); + } - send_notification("packet_event", s, entry); - */ + ntf_process_thread = nullptr; } sai_switch_state_change_notification_fn on_switch_state_change_ntf = on_switch_state_change; diff --git a/syncd/syncd_saiswitch.cpp b/syncd/syncd_saiswitch.cpp index f85b656c66e6..4b6edf4548a8 100644 --- a/syncd/syncd_saiswitch.cpp +++ b/syncd/syncd_saiswitch.cpp @@ -604,8 +604,6 @@ void SaiSwitch::collectCounters( values.push_back(fvt); } - std::lock_guard lock(g_db_mutex); - countersTable.set(strPortId, values, ""); } }