From 64e9532ca1d59aa7299ab9071cd90338de70e5d7 Mon Sep 17 00:00:00 2001 From: Nikita Poltorapavlo Date: Mon, 9 Sep 2024 18:43:32 +0300 Subject: [PATCH 1/2] DELIA-66118: PersistentStore improvements Reason for change: Decouple notification, add timeouts for iarm/sqlite. Pass customer's scope to the event as is. Test Procedure: None Risks: None Signed-off-by: Nikita Poltorapavlo --- PersistentStore/Module.h | 2 + PersistentStore/sqlite/Store2.h | 112 +++++++++++++++--- PersistentStore/sqlite/l1test/Store2Test.cpp | 11 +- .../sqlite/l1test/WorkerPoolImplementation.h | 49 ++++++++ 4 files changed, 155 insertions(+), 19 deletions(-) create mode 100644 PersistentStore/sqlite/l1test/WorkerPoolImplementation.h diff --git a/PersistentStore/Module.h b/PersistentStore/Module.h index 342536a741..88ac1288c7 100644 --- a/PersistentStore/Module.h +++ b/PersistentStore/Module.h @@ -34,6 +34,8 @@ #define MAXVALUE_ENV "PERSISTENTSTORE_MAXVALUE" #define LIMIT_ENV "PERSISTENTSTORE_LIMIT" #define IARM_INIT_NAME "Thunder_Plugins" +#define IARM_TIMEOUT 1000 +#define SQLITE_TIMEOUT 1000 #undef EXTERNAL #define EXTERNAL diff --git a/PersistentStore/sqlite/Store2.h b/PersistentStore/sqlite/Store2.h index 1b9b9bbb13..4091f48661 100644 --- a/PersistentStore/sqlite/Store2.h +++ b/PersistentStore/sqlite/Store2.h @@ -36,6 +36,35 @@ namespace Plugin { public Exchange::IStoreCache, public Exchange::IStoreInspector, public Exchange::IStoreLimit { + private: + class Job : public Core::IDispatch { + public: + Job(Store2* parent, const IStore2::ScopeType scope, const string& ns, const string& key, const string& value) + : _parent(parent) + , _scope(scope) + , _ns(ns) + , _key(key) + , _value(value) + { + _parent->AddRef(); + } + ~Job() override + { + _parent->Release(); + } + void Dispatch() override + { + _parent->OnValueChanged(_scope, _ns, _key, _value); + } + + private: + Store2* _parent; + const IStore2::ScopeType _scope; + const string _ns; + const string _key; + const string _value; + }; + private: Store2(const Store2&) = delete; Store2& operator=(const Store2&) = delete; @@ -75,22 +104,59 @@ namespace Plugin { if (rc != SQLITE_OK) { OnError(__FUNCTION__, rc); } + rc = sqlite3_busy_timeout(_data, SQLITE_TIMEOUT); // Timeout + if (rc != SQLITE_OK) { + OnError(__FUNCTION__, rc); + } const std::vector statements = { "pragma foreign_keys = on;", - "pragma busy_timeout = 1000000;", - "create table if not exists namespace (id integer primary key,name text unique);", - "create table if not exists item (ns integer,key text,value text,foreign key(ns) references namespace(id) on delete cascade on update no action,unique(ns,key) on conflict replace);", - "create table if not exists limits (n integer,size integer,foreign key(n) references namespace(id) on delete cascade on update no action,unique(n) on conflict replace);", + "create table if not exists namespace" + " (id integer primary key,name text unique);", + "create table if not exists item" + " (ns integer,key text,value text," + "foreign key(ns) references namespace(id) on delete cascade on update no action," + "unique(ns,key) on conflict replace);", + "create table if not exists limits" + " (n integer,size integer," + "foreign key(n) references namespace(id) on delete cascade on update no action," + "unique(n) on conflict replace);", "alter table item add column ttl integer;", - "create temporary trigger if not exists ns_empty insert on namespace begin select case when length(new.name) = 0 then raise (fail, 'empty') end; end;", - "create temporary trigger if not exists key_empty insert on item begin select case when length(new.key) = 0 then raise (fail, 'empty') end; end;", - "create temporary trigger if not exists ns_maxvalue insert on namespace begin select case when length(new.name) > " + std::to_string(_maxValue) + " then raise (fail, 'max value') end; end;", - "create temporary trigger if not exists key_maxvalue insert on item begin select case when length(new.key) > " + std::to_string(_maxValue) + " then raise (fail, 'max value') end; end;", - "create temporary trigger if not exists value_maxvalue insert on item begin select case when length(new.value) > " + std::to_string(_maxValue) + " then raise (fail, 'max value') end; end;", - "create temporary trigger if not exists ns_maxsize insert on namespace begin select case when (select sum(s) from (select sum(length(key)+length(value)) s from item union all select sum(length(name)) s from namespace union all select length(new.name) s)) > " + std::to_string(_maxSize) + " then raise (fail, 'max size') end; end;", - "create temporary trigger if not exists item_maxsize insert on item begin select case when (select sum(s) from (select sum(length(key)+length(value)) s from item union all select sum(length(name)) s from namespace union all select length(new.key)+length(new.value) s)) > " + std::to_string(_maxSize) + " then raise (fail, 'max size') end; end;", - "create temporary trigger if not exists item_limit_default insert on item begin select case when (select length(new.key)+length(new.value)+sum(length(key)+length(value)) from item where ns = new.ns) > " + std::to_string(_limit) + " then raise (fail, 'limit') end; end;", - "create temporary trigger if not exists item_limit insert on item begin select case when (select size-length(new.key)-length(new.value)-sum(length(key)+length(value)) from limits inner join item on limits.n = item.ns where n = new.ns) < 0 then raise (fail, 'limit') end; end;" + "create temporary trigger if not exists ns_empty insert on namespace" + " begin select case when length(new.name) = 0" + " then raise (fail, 'empty') end; end;", + "create temporary trigger if not exists key_empty insert on item" + " begin select case when length(new.key) = 0" + " then raise (fail, 'empty') end; end;", + "create temporary trigger if not exists ns_maxvalue insert on namespace" + " begin select case when length(new.name) > " + + std::to_string(_maxValue) + " then raise (fail, 'max value') end; end;", + "create temporary trigger if not exists key_maxvalue insert on item" + " begin select case when length(new.key) > " + + std::to_string(_maxValue) + " then raise (fail, 'max value') end; end;", + "create temporary trigger if not exists value_maxvalue insert on item" + " begin select case when length(new.value) > " + + std::to_string(_maxValue) + " then raise (fail, 'max value') end; end;", + "create temporary trigger if not exists ns_maxsize insert on namespace" + " begin select case when" + " (select sum(s) from (select sum(length(key)+length(value)) s from item" + " union all select sum(length(name)) s from namespace" + " union all select length(new.name) s)) > " + + std::to_string(_maxSize) + " then raise (fail, 'max size') end; end;", + "create temporary trigger if not exists item_maxsize insert on item" + " begin select case when" + " (select sum(s) from (select sum(length(key)+length(value)) s from item" + " union all select sum(length(name)) s from namespace" + " union all select length(new.key)+length(new.value) s)) > " + + std::to_string(_maxSize) + " then raise (fail, 'max size') end; end;", + "create temporary trigger if not exists item_limit_default insert on item" + " begin select case when" + " (select length(new.key)+length(new.value)+sum(length(key)+length(value)) from item where ns = new.ns) > " + + std::to_string(_limit) + " then raise (fail, 'limit') end; end;", + "create temporary trigger if not exists item_limit insert on item" + " begin select case when" + " (select size-length(new.key)-length(new.value)-sum(length(key)+length(value)) from limits" + " inner join item on limits.n = item.ns where n = new.ns) < 0" + " then raise (fail, 'limit') end; end;" }; for (auto& sql : statements) { auto rc = sqlite3_exec(_data, sql.c_str(), nullptr, nullptr, nullptr); @@ -106,13 +172,20 @@ namespace Plugin { OnError(__FUNCTION__, rc); } } - static bool IsTimeSynced() + bool IsTimeSynced() const { #ifdef WITH_SYSMGR + // Get actual state, as it may change at any time... IARM_Bus_Init(IARM_INIT_NAME); IARM_Bus_Connect(); IARM_Bus_SYSMgr_GetSystemStates_Param_t param; - if ((IARM_Bus_Call(IARM_BUS_SYSMGR_NAME, IARM_BUS_SYSMGR_API_GetSystemStates, ¶m, sizeof(param)) != IARM_RESULT_SUCCESS) + if ((IARM_Bus_Call_with_IPCTimeout( + IARM_BUS_SYSMGR_NAME, + IARM_BUS_SYSMGR_API_GetSystemStates, + ¶m, + sizeof(param), + IARM_TIMEOUT) // Timeout + != IARM_RESULT_SUCCESS) || !param.time_source.state) { return false; } @@ -185,7 +258,9 @@ namespace Plugin { } if (rc == SQLITE_DONE) { - OnValueChanged(ns, key, value); + Core::IWorkerPool::Instance().Submit(Core::ProxyType( + Core::ProxyType::Create(this, scope, ns, key, value))); // Decouple notification + result = Core::ERROR_NONE; } else { OnError(__FUNCTION__, rc); @@ -489,7 +564,7 @@ namespace Plugin { END_INTERFACE_MAP private: - void OnValueChanged(const string& ns, const string& key, const string& value) + void OnValueChanged(const IStore2::ScopeType scope, const string& ns, const string& key, const string& value) { Core::SafeSyncType lock(_clientLock); @@ -497,7 +572,8 @@ namespace Plugin { index(_clients.begin()); while (index != _clients.end()) { - (*index)->ValueChanged(IStore2::ScopeType::DEVICE, ns, key, value); + // If main process is out of threads, this can time out, and IPC will mess up... + (*index)->ValueChanged(scope, ns, key, value); index++; } } diff --git a/PersistentStore/sqlite/l1test/Store2Test.cpp b/PersistentStore/sqlite/l1test/Store2Test.cpp index 2ee78da6a4..04454ed49e 100644 --- a/PersistentStore/sqlite/l1test/Store2Test.cpp +++ b/PersistentStore/sqlite/l1test/Store2Test.cpp @@ -3,6 +3,7 @@ #include "../Store2.h" #include "Store2NotificationMock.h" +#include "WorkerPoolImplementation.h" using ::testing::_; using ::testing::Eq; @@ -39,10 +40,18 @@ const auto kLimit40 = 40; class AStore2 : public Test { protected: + WPEFramework::Core::ProxyType workerPool; WPEFramework::Core::ProxyType store2; AStore2() - : store2(WPEFramework::Core::ProxyType::Create(kPath, kMaxSize, kMaxValue, kLimit)) + : workerPool(WPEFramework::Core::ProxyType::Create( + WPEFramework::Core::Thread::DefaultStackSize())) + , store2(WPEFramework::Core::ProxyType::Create(kPath, kMaxSize, kMaxValue, kLimit)) { + WPEFramework::Core::IWorkerPool::Assign(&(*workerPool)); + } + ~AStore2() override + { + WPEFramework::Core::IWorkerPool::Assign(nullptr); } }; diff --git a/PersistentStore/sqlite/l1test/WorkerPoolImplementation.h b/PersistentStore/sqlite/l1test/WorkerPoolImplementation.h new file mode 100644 index 0000000000..f782fa5489 --- /dev/null +++ b/PersistentStore/sqlite/l1test/WorkerPoolImplementation.h @@ -0,0 +1,49 @@ +#pragma once + +#include "../../Module.h" + +class WorkerPoolImplementation + : public WPEFramework::Core::WorkerPool, + public WPEFramework::Core::ThreadPool::ICallback { +private: + class Dispatcher : public WPEFramework::Core::ThreadPool::IDispatcher { + public: + Dispatcher(const Dispatcher&) = delete; + Dispatcher& operator=(const Dispatcher&) = delete; + Dispatcher() = default; + ~Dispatcher() override = default; + + private: + void Initialize() override + { + } + void Deinitialize() override + { + } + void Dispatch(WPEFramework::Core::IDispatch* job) override + { + job->Dispatch(); + } + }; + +public: + WorkerPoolImplementation() = delete; + WorkerPoolImplementation(const WorkerPoolImplementation&) = delete; + WorkerPoolImplementation& operator=(const WorkerPoolImplementation&) = delete; + WorkerPoolImplementation(const uint32_t stackSize) + : WPEFramework::Core::WorkerPool(4 /*threadCount*/, stackSize, 32 /*queueSize*/, &_dispatch, this) + , _dispatch() + { + Run(); + } + ~WorkerPoolImplementation() override + { + Stop(); + } + void Idle() override + { + } + +private: + Dispatcher _dispatch; +}; From 5439e02c0b1d2841f3ca81a1572ee3172436dfcd Mon Sep 17 00:00:00 2001 From: Nikita Poltorapavlo Date: Mon, 9 Sep 2024 19:14:23 +0300 Subject: [PATCH 2/2] adapt to the recent changes in Thunder master branch --- PersistentStore/l0test/ServiceMock.h | 1 + PersistentStore/l1test/ServiceMock.h | 1 + 2 files changed, 2 insertions(+) diff --git a/PersistentStore/l0test/ServiceMock.h b/PersistentStore/l0test/ServiceMock.h index 63fa33379f..c9efdb8da5 100644 --- a/PersistentStore/l0test/ServiceMock.h +++ b/PersistentStore/l0test/ServiceMock.h @@ -53,6 +53,7 @@ class ServiceMock : public WPEFramework::PluginHost::IShell, MOCK_METHOD(void, Unregister, (const IShell::ICOMLink::INotification*), (override)); MOCK_METHOD(WPEFramework::RPC::IRemoteConnection*, RemoteConnection, (const uint32_t), (override)); MOCK_METHOD(void*, Instantiate, (const WPEFramework::RPC::Object&, const uint32_t, uint32_t&), (override)); + MOCK_METHOD(WPEFramework::RPC::IStringIterator*, GetLibrarySearchPaths, (const string&), (const, override)); BEGIN_INTERFACE_MAP(ServiceMock) INTERFACE_ENTRY(IShell) INTERFACE_ENTRY(IShell::ICOMLink) diff --git a/PersistentStore/l1test/ServiceMock.h b/PersistentStore/l1test/ServiceMock.h index 63fa33379f..c9efdb8da5 100644 --- a/PersistentStore/l1test/ServiceMock.h +++ b/PersistentStore/l1test/ServiceMock.h @@ -53,6 +53,7 @@ class ServiceMock : public WPEFramework::PluginHost::IShell, MOCK_METHOD(void, Unregister, (const IShell::ICOMLink::INotification*), (override)); MOCK_METHOD(WPEFramework::RPC::IRemoteConnection*, RemoteConnection, (const uint32_t), (override)); MOCK_METHOD(void*, Instantiate, (const WPEFramework::RPC::Object&, const uint32_t, uint32_t&), (override)); + MOCK_METHOD(WPEFramework::RPC::IStringIterator*, GetLibrarySearchPaths, (const string&), (const, override)); BEGIN_INTERFACE_MAP(ServiceMock) INTERFACE_ENTRY(IShell) INTERFACE_ENTRY(IShell::ICOMLink)