From 1c6f04a66aa087676891405aa29f4699142dee10 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Fri, 4 Aug 2023 14:15:41 +0100 Subject: [PATCH] FOGL-8002 Add support for delete operations in table registration (#1132) * Checkpoint Signed-off-by: Mark Riddoch * FOGL-8002 Add support for delete operations in table registration Signed-off-by: Mark Riddoch --------- Signed-off-by: Mark Riddoch --- C/services/storage/include/storage_registry.h | 4 + C/services/storage/storage_api.cpp | 2 + C/services/storage/storage_registry.cpp | 129 +++++++++++++++++- 3 files changed, 130 insertions(+), 5 deletions(-) diff --git a/C/services/storage/include/storage_registry.h b/C/services/storage/include/storage_registry.h index 1bb207f9b..bdb109965 100644 --- a/C/services/storage/include/storage_registry.h +++ b/C/services/storage/include/storage_registry.h @@ -34,6 +34,7 @@ class StorageRegistry { void process(const std::string& payload); void processTableInsert(const std::string& tableName, const std::string& payload); void processTableUpdate(const std::string& tableName, const std::string& payload); + void processTableDelete(const std::string& tableName, const std::string& payload); void registerTable(const std::string& table, const std::string& url); void unregisterTable(const std::string& table, const std::string& url); void run(); @@ -43,6 +44,7 @@ class StorageRegistry { void filterPayload(const std::string& url, char *payload, const std::string& asset); void processInsert(char *tableName, char *payload); void processUpdate(char *tableName, char *payload); + void processDelete(char *tableName, char *payload); TableRegistration* parseTableSubscriptionPayload(const std::string& payload); void insertTestTableReg(); @@ -59,6 +61,8 @@ class StorageRegistry { m_tableInsertQueue; std::queue m_tableUpdateQueue; + std::queue + m_tableDeleteQueue; std::mutex m_qMutex; std::mutex m_registrationsMutex; std::mutex m_tableRegistrationsMutex; diff --git a/C/services/storage/storage_api.cpp b/C/services/storage/storage_api.cpp index 79fddd15e..3810b9ad9 100644 --- a/C/services/storage/storage_api.cpp +++ b/C/services/storage/storage_api.cpp @@ -771,6 +771,7 @@ string responsePayload; int rval = plugin->commonDelete(tableName, payload); if (rval != -1) { + registry.processTableDelete(tableName, payload); responsePayload = "{ \"response\" : \"deleted\", \"rows_affected\" : "; responsePayload += to_string(rval); responsePayload += " }"; @@ -1676,6 +1677,7 @@ string responsePayload; int rval = plugin->commonDelete(tableName, payload, const_cast(schemaName.c_str())); if (rval != -1) { + registry.processTableDelete(tableName, payload); responsePayload = "{ \"response\" : \"deleted\", \"rows_affected\" : "; responsePayload += to_string(rval); responsePayload += " }"; diff --git a/C/services/storage/storage_registry.cpp b/C/services/storage/storage_registry.cpp index cb56133b5..9d4c0bc17 100644 --- a/C/services/storage/storage_registry.cpp +++ b/C/services/storage/storage_registry.cpp @@ -139,7 +139,7 @@ StorageRegistry::processTableInsert(const string& tableName, const string& paylo * if any microservice has registered an interest * in this table. Called from StorageApi::commonUpdate() * - * @param payload The table insert payload + * @param payload The table update payload */ void StorageRegistry::processTableUpdate(const string& tableName, const string& payload) @@ -167,6 +167,39 @@ StorageRegistry::processTableUpdate(const string& tableName, const string& paylo } } +/** + * Process a table delete payload and determine + * if any microservice has registered an interest + * in this table. Called from StorageApi::commonDelete() + * + * @param payload The table delete payload + */ +void +StorageRegistry::processTableDelete(const string& tableName, const string& payload) +{ + Logger::getLogger()->info("Checking for registered interest in table %s with delete %s", tableName.c_str(), payload.c_str()); + + if (m_tableRegistrations.size() > 0) + { + /* + * We have some registrations so queue a copy of the payload + * to be examined in the thread the send table notifications + * to interested parties. + */ + char *table = strdup(tableName.c_str()); + char *data = strdup(payload.c_str()); + + if (data != NULL && table != NULL) + { + time_t now = time(0); + TableItem item = make_tuple(now, table, data); + lock_guard guard(m_qMutex); + m_tableDeleteQueue.push(item); + m_cv.notify_all(); + } + } +} + /** * Handle a registration request from a client of the storage layer * @@ -356,7 +389,7 @@ StorageRegistry::run() #endif { unique_lock mlock(m_cvMutex); - while (m_queue.size() == 0 && m_tableInsertQueue.size() == 0 && m_tableUpdateQueue.size() == 0) + while (m_queue.size() == 0 && m_tableInsertQueue.size() == 0 && m_tableUpdateQueue.size() == 0 && m_tableDeleteQueue.size() == 0) { m_cv.wait_for(mlock, std::chrono::seconds(REGISTRY_SLEEP_TIME)); if (!m_running) @@ -435,7 +468,31 @@ StorageRegistry::run() free(data); } } - + + while (!m_tableDeleteQueue.empty()) + { + char *tableName = NULL; + + TableItem item = m_tableDeleteQueue.front(); + m_tableDeleteQueue.pop(); + tableName = get<1>(item); + data = get<2>(item); +#if CHECK_QTIMES + qTime = item.first; +#endif + if (tableName && data) + { +#if CHECK_QTIMES + if (time(0) - qTime > QTIME_THRESHOLD) + { + Logger::getLogger()->error("Table delete data has been queued for %d seconds to be sent to registered party", (time(0) - qTime)); + } +#endif + processDelete(tableName, data); + free(tableName); + free(data); + } + } } } } @@ -497,7 +554,6 @@ StorageRegistry::sendPayload(const string& url, const char *payload) size_t found1 = url.find_first_of("/", found + 3); string hostport = url.substr(found+3, found1 - found - 3); string resource = url.substr(found1); - HttpClient client(hostport); try { client.request("POST", resource, payload); @@ -699,7 +755,7 @@ StorageRegistry::processUpdate(char *tableName, char *payload) if (tblreg->key.empty()) { - // No key to match, send alll updates to table + // No key to match, send all updates to table sendPayload(tblreg->url, payload); } else @@ -792,6 +848,69 @@ StorageRegistry::processUpdate(char *tableName, char *payload) } } +/** + * Process an incoming payload and distribute as required to registered + * services + * + * @param payload The payload to potentially distribute + */ +void +StorageRegistry::processDelete(char *tableName, char *payload) +{ + Document doc; + + doc.Parse(payload); + if (doc.HasParseError()) + { + Logger::getLogger()->error("Unable to parse table delete payload for table %s, request is %s", tableName, payload); + return; + } + + lock_guard guard(m_tableRegistrationsMutex); + for (auto & reg : m_tableRegistrations) + { + if (reg.first->compare(tableName) != 0) + continue; + + TableRegistration *tblreg = reg.second; + + // If key is empty string, no need to match key/value pair in payload + if (tblreg->operation.compare("delete") != 0) + { + continue; + } + if (tblreg->key.empty()) + { + // No key to match, send all updates to table + sendPayload(tblreg->url, payload); + } + else + { + if (doc.HasMember("where") && doc["where"].IsObject()) + { + const Value& where = doc["where"]; + if (where.HasMember("column") && where["column"].IsString() && + where.HasMember("value") && where["value"].IsString()) + { + string updateKey = where["column"].GetString(); + string keyValue = where["value"].GetString(); + if (updateKey.compare(tblreg->key) == 0 && + std::find(tblreg->keyValues.begin(), tblreg->keyValues.end(), keyValue) + != tblreg->keyValues.end()) + { + StringBuffer buffer; + Writer writer(buffer); + where.Accept(writer); + + const char *output = buffer.GetString(); + sendPayload(tblreg->url, output); + } + } + } + } + } +} + /** * Test function to add some dummy/test table subscriptions */