Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FOGL-8002 Add support for delete operations in table registration #1132

Merged
merged 4 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions C/services/storage/include/storage_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class StorageRegistry {
void process(const std::string& payload);
void processTableInsert(const std::string& tableName, const std::string& payload);
void processTableUpdate(const std::string& tableName, const std::string& payload);
void processTableDelete(const std::string& tableName, const std::string& payload);
void registerTable(const std::string& table, const std::string& url);
void unregisterTable(const std::string& table, const std::string& url);
void run();
Expand All @@ -43,6 +44,7 @@ class StorageRegistry {
void filterPayload(const std::string& url, char *payload, const std::string& asset);
void processInsert(char *tableName, char *payload);
void processUpdate(char *tableName, char *payload);
void processDelete(char *tableName, char *payload);
TableRegistration*
parseTableSubscriptionPayload(const std::string& payload);
void insertTestTableReg();
Expand All @@ -59,6 +61,8 @@ class StorageRegistry {
m_tableInsertQueue;
std::queue<StorageRegistry::TableItem>
m_tableUpdateQueue;
std::queue<StorageRegistry::TableItem>
m_tableDeleteQueue;
std::mutex m_qMutex;
std::mutex m_registrationsMutex;
std::mutex m_tableRegistrationsMutex;
Expand Down
2 changes: 2 additions & 0 deletions C/services/storage/storage_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,7 @@ string responsePayload;
int rval = plugin->commonDelete(tableName, payload);
if (rval != -1)
{
registry.processTableDelete(tableName, payload);
responsePayload = "{ \"response\" : \"deleted\", \"rows_affected\" : ";
responsePayload += to_string(rval);
responsePayload += " }";
Expand Down Expand Up @@ -1676,6 +1677,7 @@ string responsePayload;
int rval = plugin->commonDelete(tableName, payload, const_cast<char*>(schemaName.c_str()));
if (rval != -1)
{
registry.processTableDelete(tableName, payload);
responsePayload = "{ \"response\" : \"deleted\", \"rows_affected\" : ";
responsePayload += to_string(rval);
responsePayload += " }";
Expand Down
129 changes: 124 additions & 5 deletions C/services/storage/storage_registry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ StorageRegistry::processTableInsert(const string& tableName, const string& paylo
* if any microservice has registered an interest
* in this table. Called from StorageApi::commonUpdate()
*
* @param payload The table insert payload
* @param payload The table update payload
*/
void
StorageRegistry::processTableUpdate(const string& tableName, const string& payload)
Expand Down Expand Up @@ -167,6 +167,39 @@ StorageRegistry::processTableUpdate(const string& tableName, const string& paylo
}
}

/**
* Process a table delete payload and determine
* if any microservice has registered an interest
* in this table. Called from StorageApi::commonDelete()
*
* @param payload The table delete payload
*/
void
StorageRegistry::processTableDelete(const string& tableName, const string& payload)
{
Logger::getLogger()->info("Checking for registered interest in table %s with delete %s", tableName.c_str(), payload.c_str());

if (m_tableRegistrations.size() > 0)
{
/*
* We have some registrations so queue a copy of the payload
* to be examined in the thread the send table notifications
* to interested parties.
*/
char *table = strdup(tableName.c_str());
char *data = strdup(payload.c_str());

if (data != NULL && table != NULL)
{
time_t now = time(0);
TableItem item = make_tuple(now, table, data);
lock_guard<mutex> guard(m_qMutex);
m_tableDeleteQueue.push(item);
m_cv.notify_all();
}
}
}

/**
* Handle a registration request from a client of the storage layer
*
Expand Down Expand Up @@ -356,7 +389,7 @@ StorageRegistry::run()
#endif
{
unique_lock<mutex> mlock(m_cvMutex);
while (m_queue.size() == 0 && m_tableInsertQueue.size() == 0 && m_tableUpdateQueue.size() == 0)
while (m_queue.size() == 0 && m_tableInsertQueue.size() == 0 && m_tableUpdateQueue.size() == 0 && m_tableDeleteQueue.size() == 0)
{
m_cv.wait_for(mlock, std::chrono::seconds(REGISTRY_SLEEP_TIME));
if (!m_running)
Expand Down Expand Up @@ -435,7 +468,31 @@ StorageRegistry::run()
free(data);
}
}


while (!m_tableDeleteQueue.empty())
{
char *tableName = NULL;

TableItem item = m_tableDeleteQueue.front();
m_tableDeleteQueue.pop();
tableName = get<1>(item);
data = get<2>(item);
#if CHECK_QTIMES
qTime = item.first;
#endif
if (tableName && data)
{
#if CHECK_QTIMES
if (time(0) - qTime > QTIME_THRESHOLD)
{
Logger::getLogger()->error("Table delete data has been queued for %d seconds to be sent to registered party", (time(0) - qTime));
}
#endif
processDelete(tableName, data);
free(tableName);
free(data);
}
}
}
}
}
Expand Down Expand Up @@ -497,7 +554,6 @@ StorageRegistry::sendPayload(const string& url, const char *payload)
size_t found1 = url.find_first_of("/", found + 3);
string hostport = url.substr(found+3, found1 - found - 3);
string resource = url.substr(found1);

HttpClient client(hostport);
try {
client.request("POST", resource, payload);
Expand Down Expand Up @@ -699,7 +755,7 @@ StorageRegistry::processUpdate(char *tableName, char *payload)

if (tblreg->key.empty())
{
// No key to match, send alll updates to table
// No key to match, send all updates to table
sendPayload(tblreg->url, payload);
}
else
Expand Down Expand Up @@ -792,6 +848,69 @@ StorageRegistry::processUpdate(char *tableName, char *payload)
}
}

/**
* Process an incoming payload and distribute as required to registered
* services
*
* @param payload The payload to potentially distribute
*/
void
StorageRegistry::processDelete(char *tableName, char *payload)
{
Document doc;

doc.Parse(payload);
if (doc.HasParseError())
{
Logger::getLogger()->error("Unable to parse table delete payload for table %s, request is %s", tableName, payload);
return;
}

lock_guard<mutex> guard(m_tableRegistrationsMutex);
for (auto & reg : m_tableRegistrations)
{
if (reg.first->compare(tableName) != 0)
continue;

TableRegistration *tblreg = reg.second;

// If key is empty string, no need to match key/value pair in payload
if (tblreg->operation.compare("delete") != 0)
{
continue;
}
if (tblreg->key.empty())
{
// No key to match, send all updates to table
sendPayload(tblreg->url, payload);
}
else
{
if (doc.HasMember("where") && doc["where"].IsObject())
{
const Value& where = doc["where"];
if (where.HasMember("column") && where["column"].IsString() &&
where.HasMember("value") && where["value"].IsString())
{
string updateKey = where["column"].GetString();
string keyValue = where["value"].GetString();
if (updateKey.compare(tblreg->key) == 0 &&
std::find(tblreg->keyValues.begin(), tblreg->keyValues.end(), keyValue)
!= tblreg->keyValues.end())
{
StringBuffer buffer;
Writer<StringBuffer> writer(buffer);
where.Accept(writer);

const char *output = buffer.GetString();
sendPayload(tblreg->url, output);
}
}
}
}
}
}

/**
* Test function to add some dummy/test table subscriptions
*/
Expand Down