Skip to content

Commit

Permalink
FOGL-8002 Add support for delete operations in table registration (#1132
Browse files Browse the repository at this point in the history
)

* Checkpoint

Signed-off-by: Mark Riddoch <[email protected]>

* FOGL-8002 Add support for delete operations in table registration

Signed-off-by: Mark Riddoch <[email protected]>

---------

Signed-off-by: Mark Riddoch <[email protected]>
  • Loading branch information
MarkRiddoch authored Aug 4, 2023
1 parent 4617e08 commit 1c6f04a
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 5 deletions.
4 changes: 4 additions & 0 deletions C/services/storage/include/storage_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class StorageRegistry {
void process(const std::string& payload);
void processTableInsert(const std::string& tableName, const std::string& payload);
void processTableUpdate(const std::string& tableName, const std::string& payload);
void processTableDelete(const std::string& tableName, const std::string& payload);
void registerTable(const std::string& table, const std::string& url);
void unregisterTable(const std::string& table, const std::string& url);
void run();
Expand All @@ -43,6 +44,7 @@ class StorageRegistry {
void filterPayload(const std::string& url, char *payload, const std::string& asset);
void processInsert(char *tableName, char *payload);
void processUpdate(char *tableName, char *payload);
void processDelete(char *tableName, char *payload);
TableRegistration*
parseTableSubscriptionPayload(const std::string& payload);
void insertTestTableReg();
Expand All @@ -59,6 +61,8 @@ class StorageRegistry {
m_tableInsertQueue;
std::queue<StorageRegistry::TableItem>
m_tableUpdateQueue;
std::queue<StorageRegistry::TableItem>
m_tableDeleteQueue;
std::mutex m_qMutex;
std::mutex m_registrationsMutex;
std::mutex m_tableRegistrationsMutex;
Expand Down
2 changes: 2 additions & 0 deletions C/services/storage/storage_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,7 @@ string responsePayload;
int rval = plugin->commonDelete(tableName, payload);
if (rval != -1)
{
registry.processTableDelete(tableName, payload);
responsePayload = "{ \"response\" : \"deleted\", \"rows_affected\" : ";
responsePayload += to_string(rval);
responsePayload += " }";
Expand Down Expand Up @@ -1676,6 +1677,7 @@ string responsePayload;
int rval = plugin->commonDelete(tableName, payload, const_cast<char*>(schemaName.c_str()));
if (rval != -1)
{
registry.processTableDelete(tableName, payload);
responsePayload = "{ \"response\" : \"deleted\", \"rows_affected\" : ";
responsePayload += to_string(rval);
responsePayload += " }";
Expand Down
129 changes: 124 additions & 5 deletions C/services/storage/storage_registry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ StorageRegistry::processTableInsert(const string& tableName, const string& paylo
* if any microservice has registered an interest
* in this table. Called from StorageApi::commonUpdate()
*
* @param payload The table insert payload
* @param payload The table update payload
*/
void
StorageRegistry::processTableUpdate(const string& tableName, const string& payload)
Expand Down Expand Up @@ -167,6 +167,39 @@ StorageRegistry::processTableUpdate(const string& tableName, const string& paylo
}
}

/**
* Process a table delete payload and determine
* if any microservice has registered an interest
* in this table. Called from StorageApi::commonDelete()
*
* @param payload The table delete payload
*/
void
StorageRegistry::processTableDelete(const string& tableName, const string& payload)
{
Logger::getLogger()->info("Checking for registered interest in table %s with delete %s", tableName.c_str(), payload.c_str());

if (m_tableRegistrations.size() > 0)
{
/*
* We have some registrations so queue a copy of the payload
* to be examined in the thread the send table notifications
* to interested parties.
*/
char *table = strdup(tableName.c_str());
char *data = strdup(payload.c_str());

if (data != NULL && table != NULL)
{
time_t now = time(0);
TableItem item = make_tuple(now, table, data);
lock_guard<mutex> guard(m_qMutex);
m_tableDeleteQueue.push(item);
m_cv.notify_all();
}
}
}

/**
* Handle a registration request from a client of the storage layer
*
Expand Down Expand Up @@ -356,7 +389,7 @@ StorageRegistry::run()
#endif
{
unique_lock<mutex> mlock(m_cvMutex);
while (m_queue.size() == 0 && m_tableInsertQueue.size() == 0 && m_tableUpdateQueue.size() == 0)
while (m_queue.size() == 0 && m_tableInsertQueue.size() == 0 && m_tableUpdateQueue.size() == 0 && m_tableDeleteQueue.size() == 0)
{
m_cv.wait_for(mlock, std::chrono::seconds(REGISTRY_SLEEP_TIME));
if (!m_running)
Expand Down Expand Up @@ -435,7 +468,31 @@ StorageRegistry::run()
free(data);
}
}


while (!m_tableDeleteQueue.empty())
{
char *tableName = NULL;

TableItem item = m_tableDeleteQueue.front();
m_tableDeleteQueue.pop();
tableName = get<1>(item);
data = get<2>(item);
#if CHECK_QTIMES
qTime = item.first;
#endif
if (tableName && data)
{
#if CHECK_QTIMES
if (time(0) - qTime > QTIME_THRESHOLD)
{
Logger::getLogger()->error("Table delete data has been queued for %d seconds to be sent to registered party", (time(0) - qTime));
}
#endif
processDelete(tableName, data);
free(tableName);
free(data);
}
}
}
}
}
Expand Down Expand Up @@ -497,7 +554,6 @@ StorageRegistry::sendPayload(const string& url, const char *payload)
size_t found1 = url.find_first_of("/", found + 3);
string hostport = url.substr(found+3, found1 - found - 3);
string resource = url.substr(found1);

HttpClient client(hostport);
try {
client.request("POST", resource, payload);
Expand Down Expand Up @@ -699,7 +755,7 @@ StorageRegistry::processUpdate(char *tableName, char *payload)

if (tblreg->key.empty())
{
// No key to match, send alll updates to table
// No key to match, send all updates to table
sendPayload(tblreg->url, payload);
}
else
Expand Down Expand Up @@ -792,6 +848,69 @@ StorageRegistry::processUpdate(char *tableName, char *payload)
}
}

/**
* Process an incoming payload and distribute as required to registered
* services
*
* @param payload The payload to potentially distribute
*/
void
StorageRegistry::processDelete(char *tableName, char *payload)
{
Document doc;

doc.Parse(payload);
if (doc.HasParseError())
{
Logger::getLogger()->error("Unable to parse table delete payload for table %s, request is %s", tableName, payload);
return;
}

lock_guard<mutex> guard(m_tableRegistrationsMutex);
for (auto & reg : m_tableRegistrations)
{
if (reg.first->compare(tableName) != 0)
continue;

TableRegistration *tblreg = reg.second;

// If key is empty string, no need to match key/value pair in payload
if (tblreg->operation.compare("delete") != 0)
{
continue;
}
if (tblreg->key.empty())
{
// No key to match, send all updates to table
sendPayload(tblreg->url, payload);
}
else
{
if (doc.HasMember("where") && doc["where"].IsObject())
{
const Value& where = doc["where"];
if (where.HasMember("column") && where["column"].IsString() &&
where.HasMember("value") && where["value"].IsString())
{
string updateKey = where["column"].GetString();
string keyValue = where["value"].GetString();
if (updateKey.compare(tblreg->key) == 0 &&
std::find(tblreg->keyValues.begin(), tblreg->keyValues.end(), keyValue)
!= tblreg->keyValues.end())
{
StringBuffer buffer;
Writer<StringBuffer> writer(buffer);
where.Accept(writer);

const char *output = buffer.GetString();
sendPayload(tblreg->url, output);
}
}
}
}
}
}

/**
* Test function to add some dummy/test table subscriptions
*/
Expand Down

0 comments on commit 1c6f04a

Please sign in to comment.