Skip to content

Commit

Permalink
Merge branch 'consensus/reputation' into 0.21
Browse files Browse the repository at this point in the history
  • Loading branch information
lostystyg committed Jun 13, 2022
2 parents 039f41f + bb11a49 commit 67a8917
Show file tree
Hide file tree
Showing 57 changed files with 1,721 additions and 1,422 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@
<ClCompile Include="..\..\src\pocketdb\models\dto\Complain.hpp" />
<ClCompile Include="..\..\src\pocketdb\models\dto\Default.hpp" />
<ClCompile Include="..\..\src\pocketdb\models\dto\Post.hpp" />
<ClCompile Include="..\..\src\pocketdb\models\dto\ReturnDtoModels.hpp" />
<ClCompile Include="..\..\src\pocketdb\models\dto\DtoModels.hpp" />
<ClCompile Include="..\..\src\pocketdb\models\dto\ScoreComment.hpp" />
<ClCompile Include="..\..\src\pocketdb\models\dto\Subscribe.hpp" />
<ClCompile Include="..\..\src\pocketdb\models\dto\SubscribeCancel.hpp" />
Expand Down
4 changes: 3 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ add_library(${POCKETDB}
pocketdb/models/base/TransactionOutput.h
pocketdb/models/base/SocialTransaction.h
pocketdb/models/base/Rating.h
pocketdb/models/base/ReturnDtoModels.h
pocketdb/models/base/DtoModels.h
pocketdb/models/dto/Default.h
pocketdb/models/dto/Coinbase.h
pocketdb/models/dto/Coinstake.h
Expand Down Expand Up @@ -1082,6 +1082,8 @@ add_library(${POCKETCOIN_SERVER}
pocketdb/repositories/CheckpointRepository.cpp
pocketdb/repositories/SystemRepository.h
pocketdb/repositories/SystemRepository.cpp
pocketdb/repositories/MigrationRepository.h
pocketdb/repositories/MigrationRepository.cpp
pocketdb/repositories/web/NotifierRepository.h
pocketdb/repositories/web/NotifierRepository.cpp
pocketdb/repositories/web/WebRepository.h
Expand Down
4 changes: 3 additions & 1 deletion src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ POCKETDB_H = \
pocketdb/repositories/RatingsRepository.h \
pocketdb/repositories/CheckpointRepository.h \
pocketdb/repositories/SystemRepository.h \
pocketdb/repositories/MigrationRepository.h \
pocketdb/repositories/web/WebRepository.h \
pocketdb/repositories/web/WebRpcRepository.h \
pocketdb/repositories/web/NotifierRepository.h \
Expand Down Expand Up @@ -190,7 +191,7 @@ POCKETDB_H = \
pocketdb/models/base/SocialTransaction.h \
pocketdb/models/base/Rating.h \
pocketdb/models/base/Payload.h \
pocketdb/models/base/ReturnDtoModels.h \
pocketdb/models/base/DtoModels.h \
\
pocketdb/models/dto/Default.h \
pocketdb/models/dto/Coinbase.h \
Expand Down Expand Up @@ -243,6 +244,7 @@ POCKETDB_CPP = \
pocketdb/repositories/RatingsRepository.cpp \
pocketdb/repositories/CheckpointRepository.cpp \
pocketdb/repositories/SystemRepository.cpp \
pocketdb/repositories/MigrationRepository.cpp \
pocketdb/repositories/web/WebRepository.cpp \
pocketdb/repositories/web/WebRpcRepository.cpp \
pocketdb/repositories/web/NotifierRepository.cpp \
Expand Down
2 changes: 0 additions & 2 deletions src/chainparams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ class CMainParams : public CChainParams
consensus.nRuleChangeActivationThreshold = 95;
consensus.nMinerConfirmationWindow = 100;
consensus.nPosFirstBlock = 1020;
consensus.fPosRequiresPeers = false;
consensus.nStakeMinAge = 60 * 60;
consensus.nPosTargetSpacing = consensus.nPowTargetSpacing;
consensus.nPosTargetTimespan = 7500;
Expand Down Expand Up @@ -209,7 +208,6 @@ class CTestNetParams : public CChainParams
consensus.nMinerConfirmationWindow = 100; // nPowTargetTimespan / nPowTargetSpacing

consensus.nPosFirstBlock = 1020;
consensus.fPosRequiresPeers = false;
consensus.nStakeMinAge = 30 * 60;
consensus.nPosTargetSpacing = consensus.nPowTargetSpacing;
consensus.nPosTargetTimespan = 7500;
Expand Down
1 change: 0 additions & 1 deletion src/consensus/params.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ struct Params {
int64_t nStakeMinimumThreshold;
int64_t nStakeMaximumThreshold;

bool fPosRequiresPeers;
int nDailyBlockCount;
unsigned int nModifierInterval;

Expand Down
85 changes: 52 additions & 33 deletions src/eventloop.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ template<class T>
class Queue
{
public:
using condCheck = std::function<bool()>;

/**
* Pop the next object from queue.
* If queue is empty - blocks current thread until new value comes to queue
Expand All @@ -33,42 +35,50 @@ class Queue
* @return true if element was filled
* @return false if element was not filled
*/
bool GetNext(T& out)
bool GetNext(T& out, const condCheck& pre, const condCheck& post)
{
std::unique_lock<std::mutex> lock(m_mutex);
if (!GetPostConditionCheck()) {
return false;
WAIT_LOCK(m_mutex, lock);

if (pre) {
if (!pre()) {
return false;
}
}

if (m_queue.empty()) {
m_cv.wait(lock);
}
if (m_queue.empty()) {
// Just return false because if we are here - queue waiting was interrupted and wi need to unblock waiting threads.
// False indicates that there is no out value and thread can call GetNext() again if it was not expected to interrupt.
return false;

if (post) {
if (!post()) {
return false;
}
}
if (!GetPostConditionCheck()) {

if (m_queue.empty()) {
return false;
}

out = std::forward<T>(m_queue.front());
m_queue.pop();
return true;
}

bool Add(T entry)
{
{
std::unique_lock<std::mutex> lock(m_mutex);
if (!AddConditionCheck()) {
return false;
}
m_queue.push(std::forward<T>(entry));
LOCK(m_mutex);

if (!AddConditionCheck()) {
return false;
}

m_queue.push(std::forward<T>(entry));
m_cv.notify_one();
return true;
}
void Interrupt()
{
LOCK(m_mutex);
// This just simply unblocks all threads that are waiting for value.
// If there are multiple threads working with a single queue this will have the following workflow:
// 1) All threads that are waiting in GetNext() will be unblocked.
Expand All @@ -80,7 +90,7 @@ class Queue

size_t Size()
{
std::unique_lock<std::mutex> lock(m_mutex);
LOCK(m_mutex);
return _Size();
}

Expand All @@ -90,19 +100,14 @@ class Queue
virtual bool AddConditionCheck() {
return true;
}
virtual bool GetPreconditionCheck() {
return true;
}
virtual bool GetPostConditionCheck() {
return true;
}

size_t _Size()
{
return m_queue.size();
}
private:
std::queue<T> m_queue;
std::mutex m_mutex;
Mutex m_mutex;
std::condition_variable m_cv;
};

Expand Down Expand Up @@ -169,22 +174,33 @@ class QueueEventLoopThread

void Start(std::optional<std::string> name = std::nullopt)
{
LOCK(m_running_mutex);
m_fRunning = true;
m_thread = std::thread([name, &fRunning = m_fRunning, queue = m_queue, queueProcessor = m_queueProcessor](){

m_thread = std::thread([name, &fRunning = m_fRunning, queue = m_queue, queueProcessor = m_queueProcessor]()
{
if (name) {
util::ThreadRename(name->c_str());
}
while (fRunning) {
try {

// This is going to be executed after internal queue mutex lock to prevent
// stopping thread between this check and starting to wait.
auto preAndPostCheck = [&]() -> bool { return fRunning; };

while (fRunning)
{
try
{
T entry;
auto res = queue->GetNext(entry);
auto res = queue->GetNext(entry, preAndPostCheck, preAndPostCheck);

// If res is false - someone else interrupts queue and if current thread still wants to run just call GetNext() again
if (res && fRunning) {
if (res)
queueProcessor->Process(std::forward<T>(entry));
}
} catch (const std::exception& e) {
fRunning = false;
LogPrintf("Shutting down %s event loop thread because of exception: %s", name.value_or(""), e.what());
}
catch (const std::exception& e)
{
LogPrintf("%s event loop thread exception: %s", name.value_or(""), e.what());
}
}
});
Expand All @@ -196,10 +212,12 @@ class QueueEventLoopThread
*/
void Stop()
{
LOCK(m_running_mutex);
if (m_fRunning) {
m_fRunning = false;
m_queue->Interrupt();
}

// Try join anyway because otherwise there could be a situation when thread is stopped but not joined
// that is a bad practise.
if (m_thread.joinable()) {
Expand All @@ -216,7 +234,8 @@ class QueueEventLoopThread
std::thread m_thread;
std::shared_ptr<Queue<T>> m_queue;
std::atomic_bool m_fRunning = true;
Mutex m_running_mutex;
std::shared_ptr<IQueueProcessor<T>> m_queueProcessor;
};

#endif // POCKETCOIN_EVENTLOOP_H
#endif // POCKETCOIN_EVENTLOOP_H
20 changes: 14 additions & 6 deletions src/httpserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,21 +569,21 @@ HTTPSocket::~HTTPSocket()
}
}

void HTTPSocket::StartThreads(std::shared_ptr<Queue<std::unique_ptr<HTTPClosure>>> queue, int threadCount, bool selfDbConnection)
void HTTPSocket::StartThreads(const std::string name, std::shared_ptr<Queue<std::unique_ptr<HTTPClosure>>> queue, int threadCount, bool selfDbConnection)
{
for (int i = 0; i < threadCount; i++) {
// Creating exec processor for every thread to guarantee each thread will have its own sqliteConnection.
// If unique sqliteConnection for each thread is not required, execProcessor can be shared between threads
auto execProcessor = std::make_shared<ExecutorSqlite>(selfDbConnection);
auto thread = std::make_shared<QueueEventLoopThread<std::unique_ptr<HTTPClosure>>>(queue, std::move(execProcessor));
thread->Start(strprintf("pocketcoin-httpworker.%i", i));
thread->Start(name);
m_thread_http_workers.emplace_back(thread);
}
}

void HTTPSocket::StartHTTPSocket(int threadCount, bool selfDbConnection)
{
StartThreads(m_workQueue, threadCount, selfDbConnection);
StartThreads("HTTPSocket::StartHTTPSocket", m_workQueue, threadCount, selfDbConnection);
}

void HTTPSocket::StopHTTPSocket()
Expand All @@ -608,6 +608,9 @@ void HTTPSocket::StopHTTPSocket()

void HTTPSocket::InterruptHTTPSocket()
{
if (m_thread_http_workers.empty())
return;

if (m_eventHTTP)
{
// Reject requests on current connections
Expand Down Expand Up @@ -801,14 +804,19 @@ HTTPWebSocket::~HTTPWebSocket() = default;

void HTTPWebSocket::StartHTTPSocket(int threadCount, int threadPostCount, bool selfDbConnection)
{
StartThreads(m_workQueue, threadCount, selfDbConnection);
StartThreads(m_workPostQueue, threadPostCount, selfDbConnection);
StartThreads("HTTPWebSocket::StartHTTPSocket (GET)", m_workQueue, threadCount, selfDbConnection);
StartThreads("HTTPWebSocket::StartHTTPSocket (POST)", m_workPostQueue, threadPostCount, selfDbConnection);
}

void HTTPWebSocket::StopHTTPSocket()
{
HTTPSocket::StopHTTPSocket();
// Interrupting socket here because stop without interrupting is illegal.
InterruptHTTPSocket();

// Resetting queue as it has done previously that restricts running this socket again.
// However this doesn't affect current rpc handlers because they handle their own shared_ptr of queue, but adding new rpc handlers
// is UB after this call.
m_workQueue.reset();
m_workPostQueue.reset();
}

Expand Down
3 changes: 1 addition & 2 deletions src/httpserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ class HTTPSocket
std::vector<std::shared_ptr<QueueEventLoopThread<std::unique_ptr<HTTPClosure>>>> m_thread_http_workers;

protected:
void StartThreads(std::shared_ptr<Queue<std::unique_ptr<HTTPClosure>>> queue, int threadCount, bool selfDbConnection);
void StartThreads(const std::string name, std::shared_ptr<Queue<std::unique_ptr<HTTPClosure>>> queue, int threadCount, bool selfDbConnection);

public:
HTTPSocket(struct event_base* base, int timeout, int queueDepth, bool publicAccess);
Expand Down Expand Up @@ -328,7 +328,6 @@ class HTTPWebSocket: public HTTPSocket

HTTPWebSocket(struct event_base* base, int timeout, int queueDepth, int queuePostDepth, bool publicAccess);
~HTTPWebSocket();

void StartHTTPSocket(int threadCount, int threadPostCount, bool selfDbConnection);
void StopHTTPSocket();
void InterruptHTTPSocket();
Expand Down
Loading

0 comments on commit 67a8917

Please sign in to comment.