Skip to content

Commit

Permalink
Merge pull request #5688 from npoltorapavlo/DELIA-66118-persistentstore
Browse files Browse the repository at this point in the history
DELIA-66118: PersistentStore improvements
  • Loading branch information
anand-ky authored Sep 10, 2024
2 parents 4789e6b + 5439e02 commit 2262024
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 19 deletions.
2 changes: 2 additions & 0 deletions PersistentStore/Module.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
#define MAXVALUE_ENV "PERSISTENTSTORE_MAXVALUE"
#define LIMIT_ENV "PERSISTENTSTORE_LIMIT"
#define IARM_INIT_NAME "Thunder_Plugins"
#define IARM_TIMEOUT 1000
#define SQLITE_TIMEOUT 1000

#undef EXTERNAL
#define EXTERNAL
1 change: 1 addition & 0 deletions PersistentStore/l0test/ServiceMock.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class ServiceMock : public WPEFramework::PluginHost::IShell,
MOCK_METHOD(void, Unregister, (const IShell::ICOMLink::INotification*), (override));
MOCK_METHOD(WPEFramework::RPC::IRemoteConnection*, RemoteConnection, (const uint32_t), (override));
MOCK_METHOD(void*, Instantiate, (const WPEFramework::RPC::Object&, const uint32_t, uint32_t&), (override));
MOCK_METHOD(WPEFramework::RPC::IStringIterator*, GetLibrarySearchPaths, (const string&), (const, override));
BEGIN_INTERFACE_MAP(ServiceMock)
INTERFACE_ENTRY(IShell)
INTERFACE_ENTRY(IShell::ICOMLink)
Expand Down
1 change: 1 addition & 0 deletions PersistentStore/l1test/ServiceMock.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class ServiceMock : public WPEFramework::PluginHost::IShell,
MOCK_METHOD(void, Unregister, (const IShell::ICOMLink::INotification*), (override));
MOCK_METHOD(WPEFramework::RPC::IRemoteConnection*, RemoteConnection, (const uint32_t), (override));
MOCK_METHOD(void*, Instantiate, (const WPEFramework::RPC::Object&, const uint32_t, uint32_t&), (override));
MOCK_METHOD(WPEFramework::RPC::IStringIterator*, GetLibrarySearchPaths, (const string&), (const, override));
BEGIN_INTERFACE_MAP(ServiceMock)
INTERFACE_ENTRY(IShell)
INTERFACE_ENTRY(IShell::ICOMLink)
Expand Down
112 changes: 94 additions & 18 deletions PersistentStore/sqlite/Store2.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,35 @@ namespace Plugin {
public Exchange::IStoreCache,
public Exchange::IStoreInspector,
public Exchange::IStoreLimit {
private:
class Job : public Core::IDispatch {
public:
Job(Store2* parent, const IStore2::ScopeType scope, const string& ns, const string& key, const string& value)
: _parent(parent)
, _scope(scope)
, _ns(ns)
, _key(key)
, _value(value)
{
_parent->AddRef();
}
~Job() override
{
_parent->Release();
}
void Dispatch() override
{
_parent->OnValueChanged(_scope, _ns, _key, _value);
}

private:
Store2* _parent;
const IStore2::ScopeType _scope;
const string _ns;
const string _key;
const string _value;
};

private:
Store2(const Store2&) = delete;
Store2& operator=(const Store2&) = delete;
Expand Down Expand Up @@ -75,22 +104,59 @@ namespace Plugin {
if (rc != SQLITE_OK) {
OnError(__FUNCTION__, rc);
}
rc = sqlite3_busy_timeout(_data, SQLITE_TIMEOUT); // Timeout
if (rc != SQLITE_OK) {
OnError(__FUNCTION__, rc);
}
const std::vector<string> statements = {
"pragma foreign_keys = on;",
"pragma busy_timeout = 1000000;",
"create table if not exists namespace (id integer primary key,name text unique);",
"create table if not exists item (ns integer,key text,value text,foreign key(ns) references namespace(id) on delete cascade on update no action,unique(ns,key) on conflict replace);",
"create table if not exists limits (n integer,size integer,foreign key(n) references namespace(id) on delete cascade on update no action,unique(n) on conflict replace);",
"create table if not exists namespace"
" (id integer primary key,name text unique);",
"create table if not exists item"
" (ns integer,key text,value text,"
"foreign key(ns) references namespace(id) on delete cascade on update no action,"
"unique(ns,key) on conflict replace);",
"create table if not exists limits"
" (n integer,size integer,"
"foreign key(n) references namespace(id) on delete cascade on update no action,"
"unique(n) on conflict replace);",
"alter table item add column ttl integer;",
"create temporary trigger if not exists ns_empty insert on namespace begin select case when length(new.name) = 0 then raise (fail, 'empty') end; end;",
"create temporary trigger if not exists key_empty insert on item begin select case when length(new.key) = 0 then raise (fail, 'empty') end; end;",
"create temporary trigger if not exists ns_maxvalue insert on namespace begin select case when length(new.name) > " + std::to_string(_maxValue) + " then raise (fail, 'max value') end; end;",
"create temporary trigger if not exists key_maxvalue insert on item begin select case when length(new.key) > " + std::to_string(_maxValue) + " then raise (fail, 'max value') end; end;",
"create temporary trigger if not exists value_maxvalue insert on item begin select case when length(new.value) > " + std::to_string(_maxValue) + " then raise (fail, 'max value') end; end;",
"create temporary trigger if not exists ns_maxsize insert on namespace begin select case when (select sum(s) from (select sum(length(key)+length(value)) s from item union all select sum(length(name)) s from namespace union all select length(new.name) s)) > " + std::to_string(_maxSize) + " then raise (fail, 'max size') end; end;",
"create temporary trigger if not exists item_maxsize insert on item begin select case when (select sum(s) from (select sum(length(key)+length(value)) s from item union all select sum(length(name)) s from namespace union all select length(new.key)+length(new.value) s)) > " + std::to_string(_maxSize) + " then raise (fail, 'max size') end; end;",
"create temporary trigger if not exists item_limit_default insert on item begin select case when (select length(new.key)+length(new.value)+sum(length(key)+length(value)) from item where ns = new.ns) > " + std::to_string(_limit) + " then raise (fail, 'limit') end; end;",
"create temporary trigger if not exists item_limit insert on item begin select case when (select size-length(new.key)-length(new.value)-sum(length(key)+length(value)) from limits inner join item on limits.n = item.ns where n = new.ns) < 0 then raise (fail, 'limit') end; end;"
"create temporary trigger if not exists ns_empty insert on namespace"
" begin select case when length(new.name) = 0"
" then raise (fail, 'empty') end; end;",
"create temporary trigger if not exists key_empty insert on item"
" begin select case when length(new.key) = 0"
" then raise (fail, 'empty') end; end;",
"create temporary trigger if not exists ns_maxvalue insert on namespace"
" begin select case when length(new.name) > "
+ std::to_string(_maxValue) + " then raise (fail, 'max value') end; end;",
"create temporary trigger if not exists key_maxvalue insert on item"
" begin select case when length(new.key) > "
+ std::to_string(_maxValue) + " then raise (fail, 'max value') end; end;",
"create temporary trigger if not exists value_maxvalue insert on item"
" begin select case when length(new.value) > "
+ std::to_string(_maxValue) + " then raise (fail, 'max value') end; end;",
"create temporary trigger if not exists ns_maxsize insert on namespace"
" begin select case when"
" (select sum(s) from (select sum(length(key)+length(value)) s from item"
" union all select sum(length(name)) s from namespace"
" union all select length(new.name) s)) > "
+ std::to_string(_maxSize) + " then raise (fail, 'max size') end; end;",
"create temporary trigger if not exists item_maxsize insert on item"
" begin select case when"
" (select sum(s) from (select sum(length(key)+length(value)) s from item"
" union all select sum(length(name)) s from namespace"
" union all select length(new.key)+length(new.value) s)) > "
+ std::to_string(_maxSize) + " then raise (fail, 'max size') end; end;",
"create temporary trigger if not exists item_limit_default insert on item"
" begin select case when"
" (select length(new.key)+length(new.value)+sum(length(key)+length(value)) from item where ns = new.ns) > "
+ std::to_string(_limit) + " then raise (fail, 'limit') end; end;",
"create temporary trigger if not exists item_limit insert on item"
" begin select case when"
" (select size-length(new.key)-length(new.value)-sum(length(key)+length(value)) from limits"
" inner join item on limits.n = item.ns where n = new.ns) < 0"
" then raise (fail, 'limit') end; end;"
};
for (auto& sql : statements) {
auto rc = sqlite3_exec(_data, sql.c_str(), nullptr, nullptr, nullptr);
Expand All @@ -106,13 +172,20 @@ namespace Plugin {
OnError(__FUNCTION__, rc);
}
}
static bool IsTimeSynced()
bool IsTimeSynced() const
{
#ifdef WITH_SYSMGR
// Get actual state, as it may change at any time...
IARM_Bus_Init(IARM_INIT_NAME);
IARM_Bus_Connect();
IARM_Bus_SYSMgr_GetSystemStates_Param_t param;
if ((IARM_Bus_Call(IARM_BUS_SYSMGR_NAME, IARM_BUS_SYSMGR_API_GetSystemStates, &param, sizeof(param)) != IARM_RESULT_SUCCESS)
if ((IARM_Bus_Call_with_IPCTimeout(
IARM_BUS_SYSMGR_NAME,
IARM_BUS_SYSMGR_API_GetSystemStates,
&param,
sizeof(param),
IARM_TIMEOUT) // Timeout
!= IARM_RESULT_SUCCESS)
|| !param.time_source.state) {
return false;
}
Expand Down Expand Up @@ -185,7 +258,9 @@ namespace Plugin {
}

if (rc == SQLITE_DONE) {
OnValueChanged(ns, key, value);
Core::IWorkerPool::Instance().Submit(Core::ProxyType<Core::IDispatch>(
Core::ProxyType<Job>::Create(this, scope, ns, key, value))); // Decouple notification

result = Core::ERROR_NONE;
} else {
OnError(__FUNCTION__, rc);
Expand Down Expand Up @@ -489,15 +564,16 @@ namespace Plugin {
END_INTERFACE_MAP

private:
void OnValueChanged(const string& ns, const string& key, const string& value)
void OnValueChanged(const IStore2::ScopeType scope, const string& ns, const string& key, const string& value)
{
Core::SafeSyncType<Core::CriticalSection> lock(_clientLock);

std::list<INotification*>::iterator
index(_clients.begin());

while (index != _clients.end()) {
(*index)->ValueChanged(IStore2::ScopeType::DEVICE, ns, key, value);
// If main process is out of threads, this can time out, and IPC will mess up...
(*index)->ValueChanged(scope, ns, key, value);
index++;
}
}
Expand Down
11 changes: 10 additions & 1 deletion PersistentStore/sqlite/l1test/Store2Test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "../Store2.h"
#include "Store2NotificationMock.h"
#include "WorkerPoolImplementation.h"

using ::testing::_;
using ::testing::Eq;
Expand Down Expand Up @@ -39,10 +40,18 @@ const auto kLimit40 = 40;

class AStore2 : public Test {
protected:
WPEFramework::Core::ProxyType<WorkerPoolImplementation> workerPool;
WPEFramework::Core::ProxyType<Store2> store2;
AStore2()
: store2(WPEFramework::Core::ProxyType<Store2>::Create(kPath, kMaxSize, kMaxValue, kLimit))
: workerPool(WPEFramework::Core::ProxyType<WorkerPoolImplementation>::Create(
WPEFramework::Core::Thread::DefaultStackSize()))
, store2(WPEFramework::Core::ProxyType<Store2>::Create(kPath, kMaxSize, kMaxValue, kLimit))
{
WPEFramework::Core::IWorkerPool::Assign(&(*workerPool));
}
~AStore2() override
{
WPEFramework::Core::IWorkerPool::Assign(nullptr);
}
};

Expand Down
49 changes: 49 additions & 0 deletions PersistentStore/sqlite/l1test/WorkerPoolImplementation.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#pragma once

#include "../../Module.h"

class WorkerPoolImplementation
: public WPEFramework::Core::WorkerPool,
public WPEFramework::Core::ThreadPool::ICallback {
private:
class Dispatcher : public WPEFramework::Core::ThreadPool::IDispatcher {
public:
Dispatcher(const Dispatcher&) = delete;
Dispatcher& operator=(const Dispatcher&) = delete;
Dispatcher() = default;
~Dispatcher() override = default;

private:
void Initialize() override
{
}
void Deinitialize() override
{
}
void Dispatch(WPEFramework::Core::IDispatch* job) override
{
job->Dispatch();
}
};

public:
WorkerPoolImplementation() = delete;
WorkerPoolImplementation(const WorkerPoolImplementation&) = delete;
WorkerPoolImplementation& operator=(const WorkerPoolImplementation&) = delete;
WorkerPoolImplementation(const uint32_t stackSize)
: WPEFramework::Core::WorkerPool(4 /*threadCount*/, stackSize, 32 /*queueSize*/, &_dispatch, this)
, _dispatch()
{
Run();
}
~WorkerPoolImplementation() override
{
Stop();
}
void Idle() override
{
}

private:
Dispatcher _dispatch;
};

0 comments on commit 2262024

Please sign in to comment.