From b93938914dae3f743cb20da7c5dce7b6129b310e Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Tue, 21 Dec 2021 18:11:51 -0500 Subject: [PATCH 01/19] add readwrite lock for forking --- src/rdb.cpp | 4 ++++ src/readwritelock.h | 37 +++++++++++++++++++++++++++++++++++++ src/server.cpp | 6 ++++++ src/server.h | 2 ++ 4 files changed, 49 insertions(+) create mode 100644 src/readwritelock.h diff --git a/src/rdb.cpp b/src/rdb.cpp index dfeaaa54b..b3e2fda4b 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1511,6 +1511,8 @@ int rdbSaveBackground(rdbSaveInfo *rsi) { g_pserver->dirty_before_bgsave = g_pserver->dirty; g_pserver->lastbgsave_try = time(NULL); + g_forkLock->releaseRead(); + g_forkLock->acquireWrite(); if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) { int retval; @@ -1524,6 +1526,8 @@ int rdbSaveBackground(rdbSaveInfo *rsi) { } exitFromChild((retval == C_OK) ? 0 : 1); } else { + g_forkLock->releaseWrite(); + g_forkLock->acquireRead(); /* Parent */ if (childpid == -1) { g_pserver->lastbgsave_status = C_ERR; diff --git a/src/readwritelock.h b/src/readwritelock.h new file mode 100644 index 000000000..ce8f6c07d --- /dev/null +++ b/src/readwritelock.h @@ -0,0 +1,37 @@ +#pragma once +#include + +class readWriteLock { + std::condition_variable m_cv; + std::mutex m_readLock; + std::mutex m_writeLock; + std::unique_lock m_ul; + int m_readCount = 0; + bool m_wlocked = false; +public: + void acquireRead() { + std::unique_lock rm(m_readLock); + m_readCount++; + } + void acquireWrite() { + m_ul = std::unique_lock(m_readLock); + while (m_readCount > 0) { + m_cv.wait(m_ul); + } + m_writeLock.lock(); + m_wlocked = true; + } + void releaseRead() { + std::unique_lock rm(m_readLock); + m_readCount--; + if (m_readCount == 0) + m_cv.notify_all(); + } + void releaseWrite() { + serverAssert(m_wlocked); + m_wlocked = false; + m_writeLock.unlock(); + m_ul.unlock(); + m_cv.notify_all(); + } +}; \ No newline at end of file diff --git a/src/server.cpp b/src/server.cpp index 60454fe58..ac7a78488 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -67,6 +67,7 @@ #include "aelocker.h" #include "motd.h" #include "t_nhash.h" +#include "readwritelock.h" #ifdef __linux__ #include #include @@ -91,8 +92,10 @@ double R_Zero, R_PosInf, R_NegInf, R_Nan; /* Global vars */ namespace GlobalHidden { struct redisServer server; /* Server global state */ +readWriteLock forkLock; } redisServer *g_pserver = &GlobalHidden::server; +readWriteLock *g_forkLock = &GlobalHidden::forkLock; struct redisServerConst cserver; __thread struct redisServerThreadVars *serverTL = NULL; // thread local server vars std::mutex time_thread_mutex; @@ -2629,6 +2632,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) { sleeping_threads++; serverAssert(sleeping_threads <= cserver.cthreads); } + + g_forkLock->releaseRead(); /* Determine whether the modules are enabled before sleeping, and use that result both here, and after wakeup to avoid double acquire or release of the GIL */ @@ -2651,6 +2656,7 @@ void afterSleep(struct aeEventLoop *eventLoop) { if (!ProcessingEventsWhileBlocked) { wakeTimeThread(); if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/); + g_forkLock->acquireRead(); } } diff --git a/src/server.h b/src/server.h index 7d813c4a0..7f846f199 100644 --- a/src/server.h +++ b/src/server.h @@ -96,6 +96,7 @@ typedef long long ustime_t; /* microsecond time type. */ #include "connection.h" /* Connection abstraction */ #include "serverassert.h" #include "expire.h" +#include "readwritelock.h" #define REDISMODULE_CORE 1 #include "redismodule.h" /* Redis modules API defines. */ @@ -2113,6 +2114,7 @@ typedef struct { //extern struct redisServer server; extern redisServer *g_pserver; +extern readWriteLock *g_forkLock; extern struct redisServerConst cserver; extern __thread struct redisServerThreadVars *serverTL; // thread local server vars extern struct sharedObjectsStruct shared; From 4aaa7b7bd72968345797c2e0e9535b62c34aa25d Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Wed, 22 Dec 2021 11:06:15 -0500 Subject: [PATCH 02/19] add forklock to time thread, wake time thread after acquiring locks --- src/server.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index ac7a78488..581a92322 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2654,9 +2654,9 @@ void afterSleep(struct aeEventLoop *eventLoop) { Don't check here that modules are enabled, rather use the result from beforeSleep Otherwise you may double acquire the GIL and cause deadlocks in the module */ if (!ProcessingEventsWhileBlocked) { - wakeTimeThread(); if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/); g_forkLock->acquireRead(); + wakeTimeThread(); } } @@ -6567,7 +6567,9 @@ void *timeThreadMain(void*) { time_thread_cv.wait(lock); } } + g_forkLock->acquireRead(); updateCachedTime(); + g_forkLock->releaseRead(); clock_nanosleep(CLOCK_MONOTONIC, 0, &delay, NULL); } } From 754d5a3ba1037ad02b4dc8a3ee1fd9864cf47ebf Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Wed, 22 Dec 2021 11:19:24 -0500 Subject: [PATCH 03/19] move fork lock management within redisFork --- src/rdb.cpp | 4 ---- src/server.cpp | 4 ++++ 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index b3e2fda4b..dfeaaa54b 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1511,8 +1511,6 @@ int rdbSaveBackground(rdbSaveInfo *rsi) { g_pserver->dirty_before_bgsave = g_pserver->dirty; g_pserver->lastbgsave_try = time(NULL); - g_forkLock->releaseRead(); - g_forkLock->acquireWrite(); if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) { int retval; @@ -1526,8 +1524,6 @@ int rdbSaveBackground(rdbSaveInfo *rsi) { } exitFromChild((retval == C_OK) ? 0 : 1); } else { - g_forkLock->releaseWrite(); - g_forkLock->acquireRead(); /* Parent */ if (childpid == -1) { g_pserver->lastbgsave_status = C_ERR; diff --git a/src/server.cpp b/src/server.cpp index 581a92322..15ad3b110 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -6236,6 +6236,8 @@ int redisFork(int purpose) { openChildInfoPipe(); } + g_forkLock->releaseRead(); + g_forkLock->acquireWrite(); if ((childpid = fork()) == 0) { /* Child */ g_pserver->in_fork_child = purpose; @@ -6244,6 +6246,8 @@ int redisFork(int purpose) { closeChildUnusedResourceAfterFork(); } else { /* Parent */ + g_forkLock->releaseWrite(); + g_forkLock->acquireRead(); g_pserver->stat_total_forks++; g_pserver->stat_fork_time = ustime()-start; g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */ From d2c080d923ff949b1130dc99fe551c7256df5a1f Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Wed, 22 Dec 2021 13:59:58 -0500 Subject: [PATCH 04/19] further generalize readWrite lock and use it for moduleGIL --- src/module.cpp | 65 ++++++++-------------------------------- src/readwritelock.h | 73 ++++++++++++++++++++++++++++++++++----------- 2 files changed, 68 insertions(+), 70 deletions(-) diff --git a/src/module.cpp b/src/module.cpp index fb2a66fc1..ed581086c 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -365,11 +365,7 @@ typedef struct RedisModuleCommandFilter { static list *moduleCommandFilters; /* Module GIL Variables */ -static int s_cAcquisitionsServer = 0; -static int s_cAcquisitionsModule = 0; -static std::mutex s_mutex; -static std::condition_variable s_cv; -static std::recursive_mutex s_mutexModule; +static readWriteLock s_moduleGIL; thread_local bool g_fModuleThread = false; typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data); @@ -5962,95 +5958,58 @@ void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) { // as the server thread acquisition is sufficient. If we did try to lock we would deadlock static bool FModuleCallBackLock(bool fServerThread) { - return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_cAcquisitionsServer > 0; + return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_moduleGIL.hasReader(); } void moduleAcquireGIL(int fServerThread, int fExclusive) { - std::unique_lock lock(s_mutex); - int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer; - if (FModuleCallBackLock(fServerThread)) { return; } - while (*pcheck > 0) - s_cv.wait(lock); - if (fServerThread) { - ++s_cAcquisitionsServer; + s_moduleGIL.acquireRead(); } else { - // only try to acquire the mutexModule in exclusive mode - if (fExclusive){ - // It is possible that another module thread holds the GIL (and s_mutexModule as a result). - // When said thread goes to release the GIL, it will wait for s_mutex, which this thread owns. - // This thread is however waiting for the GIL (and s_mutexModule) that the other thread owns. - // As a result, a deadlock has occured. - // We release the lock on s_mutex and wait until we are able to safely acquire the GIL - // in order to prevent this deadlock from occuring. - while (!s_mutexModule.try_lock()) - s_cv.wait(lock); - } - ++s_cAcquisitionsModule; - fModuleGILWlocked++; + s_moduleGIL.acquireWrite(fExclusive); } } int moduleTryAcquireGIL(bool fServerThread, int fExclusive) { - std::unique_lock lock(s_mutex, std::defer_lock); - if (!lock.try_lock()) - return 1; - int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer; - if (FModuleCallBackLock(fServerThread)) { return 0; } - if (*pcheck > 0) - return 1; - if (fServerThread) { - ++s_cAcquisitionsServer; + if (!s_moduleGIL.tryAcquireRead()) + return 1; } else { - // only try to acquire the mutexModule in exclusive mode - if (fExclusive){ - if (!s_mutexModule.try_lock()) - return 1; - } - ++s_cAcquisitionsModule; - fModuleGILWlocked++; + if (!s_moduleGIL.tryAcquireWrite(fExclusive)) + return 1; } return 0; } void moduleReleaseGIL(int fServerThread, int fExclusive) { - std::unique_lock lock(s_mutex); - if (FModuleCallBackLock(fServerThread)) { return; } - + if (fServerThread) { - --s_cAcquisitionsServer; + s_moduleGIL.releaseRead(); } else { - // only try to release the mutexModule in exclusive mode - if (fExclusive) - s_mutexModule.unlock(); - --s_cAcquisitionsModule; - fModuleGILWlocked--; + s_moduleGIL.releaseWrite(fExclusive); } - s_cv.notify_all(); } int moduleGILAcquiredByModule(void) { - return fModuleGILWlocked > 0; + return s_moduleGIL.hasWriter(); } diff --git a/src/readwritelock.h b/src/readwritelock.h index ce8f6c07d..f9e3e9d13 100644 --- a/src/readwritelock.h +++ b/src/readwritelock.h @@ -2,36 +2,75 @@ #include class readWriteLock { - std::condition_variable m_cv; std::mutex m_readLock; - std::mutex m_writeLock; - std::unique_lock m_ul; + std::recursive_mutex m_writeLock; + std::condition_variable m_cv; int m_readCount = 0; - bool m_wlocked = false; + int m_writeCount = 0; public: void acquireRead() { std::unique_lock rm(m_readLock); + while (m_writeCount > 0) + m_cv.wait(rm); + m_readCount++; + } + + bool tryAcquireRead() { + std::unique_lock rm(m_readLock, std::defer_lock); + if (!rm.try_lock()) + return false; + if (m_writeCount > 0) + return false; m_readCount++; + return true; } - void acquireWrite() { - m_ul = std::unique_lock(m_readLock); - while (m_readCount > 0) { - m_cv.wait(m_ul); - } - m_writeLock.lock(); - m_wlocked = true; + + void acquireWrite(bool exclusive = true) { + std::unique_lock rm(m_readLock); + while (m_readCount > 0) + m_cv.wait(rm); + if (exclusive) + while(!m_writeLock.try_lock()) + m_cv.wait(rm); + m_writeCount++; + } + + bool tryAcquireWrite(bool exclusive = true) { + std::unique_lock rm(m_readLock, std::defer_lock); + if (!rm.try_lock()) + return false; + if (m_readCount > 0) + return false; + if (exclusive) + if (!m_writeLock.try_lock()) + return false; + m_writeCount++; + return true; } + void releaseRead() { std::unique_lock rm(m_readLock); + serverAssert(m_readCount > 0); m_readCount--; if (m_readCount == 0) m_cv.notify_all(); } - void releaseWrite() { - serverAssert(m_wlocked); - m_wlocked = false; - m_writeLock.unlock(); - m_ul.unlock(); - m_cv.notify_all(); + + void releaseWrite(bool exclusive = true) { + std::unique_lock rm(m_readLock); + serverAssert(m_writeCount > 0); + if (exclusive) + m_writeLock.unlock(); + m_writeCount--; + if (m_writeCount == 0) + m_cv.notify_all(); + } + + bool hasReader() { + return m_readCount > 0; + } + + bool hasWriter() { + return m_writeCount > 0; } }; \ No newline at end of file From eec6b5142ed79132cb964dcd933ecffbf7670526 Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Wed, 22 Dec 2021 14:05:46 -0500 Subject: [PATCH 05/19] explain deadlock possibility in acquireWrite --- src/readwritelock.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/readwritelock.h b/src/readwritelock.h index f9e3e9d13..117046567 100644 --- a/src/readwritelock.h +++ b/src/readwritelock.h @@ -30,6 +30,9 @@ class readWriteLock { while (m_readCount > 0) m_cv.wait(rm); if (exclusive) + /* Another thread might have the write lock while we have the read lock + but won't be able to release it until they can acquire the read lock + so release the read lock and try again instead of waiting to avoid deadlock */ while(!m_writeLock.try_lock()) m_cv.wait(rm); m_writeCount++; From 8dff4f34379e03a9dd6e7866fd6841c78e7234cd Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Wed, 22 Dec 2021 14:06:55 -0500 Subject: [PATCH 06/19] need to acquire forklock on thread start --- src/server.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/server.cpp b/src/server.cpp index 15ad3b110..3027d281e 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -6593,6 +6593,7 @@ void *workerThreadMain(void *parg) } moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run + g_forkLock->acquireRead(); aeEventLoop *el = g_pserver->rgthreadvar[iel].el; try { From 2c932d7b80a03cb5c45777b32589eaf8a9502da7 Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Wed, 22 Dec 2021 14:08:00 -0500 Subject: [PATCH 07/19] release fork lock on thread end --- src/server.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/server.cpp b/src/server.cpp index 3027d281e..f789bbcf6 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -6602,6 +6602,7 @@ void *workerThreadMain(void *parg) catch (ShutdownException) { } + g_forkLock->releaseRead(); moduleReleaseGIL(true); serverAssert(!GlobalLocksAcquired()); aeDeleteEventLoop(el); From c8bd5dbb4f92cc260e09b6e119b2c831f87aee34 Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Wed, 22 Dec 2021 14:37:41 -0500 Subject: [PATCH 08/19] timethread only releases forklock when waiting --- src/server.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/server.cpp b/src/server.cpp index f789bbcf6..b089d57c7 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -6564,16 +6564,17 @@ void *timeThreadMain(void*) { timespec delay; delay.tv_sec = 0; delay.tv_nsec = 100; + g_forkLock->acquireRead(); while (true) { { std::unique_lock lock(time_thread_mutex); if (sleeping_threads >= cserver.cthreads) { + g_forkLock->releaseRead(); time_thread_cv.wait(lock); + g_forkLock->acquireRead(); } } - g_forkLock->acquireRead(); updateCachedTime(); - g_forkLock->releaseRead(); clock_nanosleep(CLOCK_MONOTONIC, 0, &delay, NULL); } } From c167902cbf5ef1c12a9e8247253e061a9f90d9c4 Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Wed, 22 Dec 2021 14:42:14 -0500 Subject: [PATCH 09/19] reacquire fork lock every 500 time thread loops --- src/server.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/server.cpp b/src/server.cpp index b089d57c7..a7375f643 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -6564,6 +6564,7 @@ void *timeThreadMain(void*) { timespec delay; delay.tv_sec = 0; delay.tv_nsec = 100; + int cycle_count = 0; g_forkLock->acquireRead(); while (true) { { @@ -6575,7 +6576,13 @@ void *timeThreadMain(void*) { } } updateCachedTime(); + if (cycle_count == 500) { + g_forkLock->releaseRead(); + g_forkLock->acquireRead(); + cycle_count = 0; + } clock_nanosleep(CLOCK_MONOTONIC, 0, &delay, NULL); + cycle_count++; } } From 9e92c038b814080b96640fccd8ff92c3a5027575 Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Wed, 22 Dec 2021 15:18:45 -0500 Subject: [PATCH 10/19] reset cycle count on cvwait and name constant --- src/server.cpp | 3 ++- src/server.h | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index a7375f643..7085d8a51 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -6573,10 +6573,11 @@ void *timeThreadMain(void*) { g_forkLock->releaseRead(); time_thread_cv.wait(lock); g_forkLock->acquireRead(); + cycle_count = 0; } } updateCachedTime(); - if (cycle_count == 500) { + if (cycle_count == MAX_CYCLES_TO_HOLD_FORK_LOCK) { g_forkLock->releaseRead(); g_forkLock->acquireRead(); cycle_count = 0; diff --git a/src/server.h b/src/server.h index 7f846f199..6ff873416 100644 --- a/src/server.h +++ b/src/server.h @@ -734,6 +734,9 @@ typedef enum { #define REDISMODULE_AUX_BEFORE_RDB (1<<0) #define REDISMODULE_AUX_AFTER_RDB (1<<1) +/* Number of cycles before time thread gives up fork lock */ +#define MAX_CYCLES_TO_HOLD_FORK_LOCK 10 + struct RedisModule; struct RedisModuleIO; struct RedisModuleDigest; From cddd3b6896316e733fea2876857f02dff5dd3db6 Mon Sep 17 00:00:00 2001 From: malavan Date: Thu, 23 Dec 2021 20:59:08 +0000 Subject: [PATCH 11/19] add executeWithoutGlobalLock function --- src/networking.cpp | 99 ++++++++++++++-------------------------------- src/server.cpp | 53 +++++++++++++++++++++++-- src/server.h | 1 + 3 files changed, 80 insertions(+), 73 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index cc031a25a..0ffdf75d2 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -3859,82 +3859,43 @@ int checkClientPauseTimeoutAndReturnIfPaused(void) { * * The function returns the total number of events processed. */ void processEventsWhileBlocked(int iel) { - serverAssert(GlobalLocksAcquired()); - int iterations = 4; /* See the function top-comment. */ - - std::vector vecclients; - listIter li; - listNode *ln; - listRewind(g_pserver->clients, &li); - // All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks - // so unlock here, and save them for reacquisition later - while ((ln = listNext(&li)) != nullptr) - { - client *c = (client*)listNodeValue(ln); - if (c->lock.fOwnLock()) { - serverAssert(c->flags & CLIENT_PROTECTED); // If the client is not protected we have no gurantee they won't be free'd in the event loop - c->lock.unlock(); - vecclients.push_back(c); + int eventsCount = 0; + executeWithoutGlobalLock([&](std::vector& vecclients){ + int iterations = 4; /* See the function top-comment. */ + try + { + ProcessingEventsWhileBlocked = 1; + while (iterations--) { + long long startval = g_pserver->events_processed_while_blocked; + long long ae_events = aeProcessEvents(g_pserver->rgthreadvar[iel].el, + AE_FILE_EVENTS|AE_DONT_WAIT| + AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP); + /* Note that g_pserver->events_processed_while_blocked will also get + * incremeted by callbacks called by the event loop handlers. */ + eventsCount += ae_events; + long long events = eventsCount - startval; + if (!events) break; + } + ProcessingEventsWhileBlocked = 0; } - } - - /* Since we're about to release our lock we need to flush the repl backlog queue */ - bool fReplBacklog = g_pserver->repl_batch_offStart >= 0; - if (fReplBacklog) { - flushReplBacklogToClients(); - g_pserver->repl_batch_idxStart = -1; - g_pserver->repl_batch_offStart = -1; - } - - long long eventsCount = 0; - aeReleaseLock(); - serverAssert(!GlobalLocksAcquired()); - try - { - ProcessingEventsWhileBlocked = 1; - while (iterations--) { - long long startval = g_pserver->events_processed_while_blocked; - long long ae_events = aeProcessEvents(g_pserver->rgthreadvar[iel].el, - AE_FILE_EVENTS|AE_DONT_WAIT| - AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP); - /* Note that g_pserver->events_processed_while_blocked will also get - * incremeted by callbacks called by the event loop handlers. */ - eventsCount += ae_events; - long long events = eventsCount - startval; - if (!events) break; - } - ProcessingEventsWhileBlocked = 0; - } - catch (...) - { - // Caller expects us to be locked so fix and rethrow - ProcessingEventsWhileBlocked = 0; - AeLocker locker; - locker.arm(nullptr); - locker.release(); - for (client *c : vecclients) - c->lock.lock(); - throw; - } - - AeLocker locker; - locker.arm(nullptr); - locker.release(); + catch (...) + { + // Caller expects us to be locked so fix and rethrow + ProcessingEventsWhileBlocked = 0; + AeLocker locker; + locker.arm(nullptr); + locker.release(); + for (client *c : vecclients) + c->lock.lock(); + throw; + } + }); g_pserver->events_processed_while_blocked += eventsCount; whileBlockedCron(); - // Restore it so the calling code is not confused - if (fReplBacklog) { - g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx; - g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; - } - - for (client *c : vecclients) - c->lock.lock(); - // If a different thread processed the shutdown we need to abort the lua command or we will hang if (serverTL->el->stop) throw ShutdownException(); diff --git a/src/server.cpp b/src/server.cpp index 7085d8a51..739246834 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -6224,6 +6224,52 @@ void closeChildUnusedResourceAfterFork() { cserver.pidfile = NULL; } +void executeWithoutGlobalLock(std::function&)> func) { + serverAssert(GlobalLocksAcquired()); + + std::vector vecclients; + listIter li; + listNode *ln; + listRewind(g_pserver->clients, &li); + + // All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks + // so unlock here, and save them for reacquisition later + while ((ln = listNext(&li)) != nullptr) + { + client *c = (client*)listNodeValue(ln); + if (c->lock.fOwnLock()) { + //serverAssert(c->flags & CLIENT_PROTECTED); // If the client is not protected we have no gurantee they won't be free'd in the event loop + c->lock.unlock(); + vecclients.push_back(c); + } + } + + /* Since we're about to release our lock we need to flush the repl backlog queue */ + bool fReplBacklog = g_pserver->repl_batch_offStart >= 0; + if (fReplBacklog) { + flushReplBacklogToClients(); + g_pserver->repl_batch_idxStart = -1; + g_pserver->repl_batch_offStart = -1; + } + + aeReleaseLock(); + serverAssert(!GlobalLocksAcquired()); + func(vecclients); + + AeLocker locker; + locker.arm(nullptr); + locker.release(); + + // Restore it so the calling code is not confused + if (fReplBacklog) { + g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx; + g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; + } + + for (client *c : vecclients) + c->lock.lock(); +} + /* purpose is one of CHILD_TYPE_ types */ int redisFork(int purpose) { int childpid; @@ -6236,8 +6282,7 @@ int redisFork(int purpose) { openChildInfoPipe(); } - g_forkLock->releaseRead(); - g_forkLock->acquireWrite(); + executeWithoutGlobalLock([](std::vector&){ g_forkLock->releaseRead(); g_forkLock->acquireWrite(); }); if ((childpid = fork()) == 0) { /* Child */ g_pserver->in_fork_child = purpose; @@ -6246,8 +6291,7 @@ int redisFork(int purpose) { closeChildUnusedResourceAfterFork(); } else { /* Parent */ - g_forkLock->releaseWrite(); - g_forkLock->acquireRead(); + executeWithoutGlobalLock([](std::vector&){ g_forkLock->releaseWrite(); g_forkLock->acquireRead(); }); g_pserver->stat_total_forks++; g_pserver->stat_fork_time = ustime()-start; g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */ @@ -6585,6 +6629,7 @@ void *timeThreadMain(void*) { clock_nanosleep(CLOCK_MONOTONIC, 0, &delay, NULL); cycle_count++; } + g_forkLock->releaseRead(); } void *workerThreadMain(void *parg) diff --git a/src/server.h b/src/server.h index 6ff873416..fd7574910 100644 --- a/src/server.h +++ b/src/server.h @@ -2501,6 +2501,7 @@ void sendChildInfo(childInfoType info_type, size_t keys, const char *pname); void receiveChildInfo(void); /* Fork helpers */ +void executeWithoutGlobalLock(std::function&)> func); int redisFork(int type); int hasActiveChildProcess(); void resetChildState(); From b4b30dcf9d441a0c3db3421f69d829ae2880466a Mon Sep 17 00:00:00 2001 From: malavan Date: Thu, 23 Dec 2021 21:08:25 +0000 Subject: [PATCH 12/19] add write waiting to readWriteLock to avoid starvation --- src/readwritelock.h | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/readwritelock.h b/src/readwritelock.h index 117046567..dc38513ef 100644 --- a/src/readwritelock.h +++ b/src/readwritelock.h @@ -7,10 +7,11 @@ class readWriteLock { std::condition_variable m_cv; int m_readCount = 0; int m_writeCount = 0; + bool writeWaiting = false; public: void acquireRead() { std::unique_lock rm(m_readLock); - while (m_writeCount > 0) + while (m_writeCount > 0 || writeWaiting) m_cv.wait(rm); m_readCount++; } @@ -19,7 +20,7 @@ class readWriteLock { std::unique_lock rm(m_readLock, std::defer_lock); if (!rm.try_lock()) return false; - if (m_writeCount > 0) + if (m_writeCount > 0 || writeWaiting) return false; m_readCount++; return true; @@ -27,15 +28,18 @@ class readWriteLock { void acquireWrite(bool exclusive = true) { std::unique_lock rm(m_readLock); + writeWaiting = true; while (m_readCount > 0) m_cv.wait(rm); - if (exclusive) + if (exclusive) { /* Another thread might have the write lock while we have the read lock but won't be able to release it until they can acquire the read lock so release the read lock and try again instead of waiting to avoid deadlock */ while(!m_writeLock.try_lock()) m_cv.wait(rm); + } m_writeCount++; + writeWaiting = false; } bool tryAcquireWrite(bool exclusive = true) { @@ -55,8 +59,7 @@ class readWriteLock { std::unique_lock rm(m_readLock); serverAssert(m_readCount > 0); m_readCount--; - if (m_readCount == 0) - m_cv.notify_all(); + m_cv.notify_all(); } void releaseWrite(bool exclusive = true) { @@ -65,8 +68,7 @@ class readWriteLock { if (exclusive) m_writeLock.unlock(); m_writeCount--; - if (m_writeCount == 0) - m_cv.notify_all(); + m_cv.notify_all(); } bool hasReader() { From 9957c11e2d110a8ff35e16f3651e587a80d8bd8d Mon Sep 17 00:00:00 2001 From: malavan Date: Fri, 24 Dec 2021 03:02:51 +0000 Subject: [PATCH 13/19] restore assert that client is safe to unlock --- src/server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index 739246834..501d0dcde 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -6238,7 +6238,7 @@ void executeWithoutGlobalLock(std::function&)> func) { { client *c = (client*)listNodeValue(ln); if (c->lock.fOwnLock()) { - //serverAssert(c->flags & CLIENT_PROTECTED); // If the client is not protected we have no gurantee they won't be free'd in the event loop + serverAssert(c->flags & CLIENT_PROTECTED || c->flags & CLIENT_EXECUTING_COMMAND); // If the client is not protected we have no gurantee they won't be free'd in the event loop c->lock.unlock(); vecclients.push_back(c); } From 6749c2bcf0ba6c820d6562c8c0229649a8f1d36f Mon Sep 17 00:00:00 2001 From: malavan Date: Fri, 24 Dec 2021 06:54:54 +0000 Subject: [PATCH 14/19] add functions to upgrade/downgrade read to/from write in readWriteLock --- src/readwritelock.h | 28 ++++++++++++++++++++++++++++ src/server.cpp | 4 ++-- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/src/readwritelock.h b/src/readwritelock.h index dc38513ef..bec1f5e45 100644 --- a/src/readwritelock.h +++ b/src/readwritelock.h @@ -42,6 +42,23 @@ class readWriteLock { writeWaiting = false; } + void upgradeWrite(bool exclusive = true) { + std::unique_lock rm(m_readLock); + writeWaiting = true; + while (m_readCount > 1) + m_cv.wait(rm); + if (exclusive) { + /* Another thread might have the write lock while we have the read lock + but won't be able to release it until they can acquire the read lock + so release the read lock and try again instead of waiting to avoid deadlock */ + while(!m_writeLock.try_lock()) + m_cv.wait(rm); + } + m_writeCount++; + m_readCount--; + writeWaiting = false; + } + bool tryAcquireWrite(bool exclusive = true) { std::unique_lock rm(m_readLock, std::defer_lock); if (!rm.try_lock()) @@ -71,6 +88,17 @@ class readWriteLock { m_cv.notify_all(); } + void downgradeWrite(bool exclusive = true) { + std::unique_lock rm(m_readLock); + serverAssert(m_writeCount > 0); + if (exclusive) + m_writeLock.unlock(); + m_writeCount--; + while (m_writeCount > 0 || writeWaiting) + m_cv.wait(rm); + m_readCount++; + } + bool hasReader() { return m_readCount > 0; } diff --git a/src/server.cpp b/src/server.cpp index 501d0dcde..425f6b046 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -6282,7 +6282,7 @@ int redisFork(int purpose) { openChildInfoPipe(); } - executeWithoutGlobalLock([](std::vector&){ g_forkLock->releaseRead(); g_forkLock->acquireWrite(); }); + executeWithoutGlobalLock([](std::vector&){ g_forkLock->upgradeWrite(); }); if ((childpid = fork()) == 0) { /* Child */ g_pserver->in_fork_child = purpose; @@ -6291,7 +6291,7 @@ int redisFork(int purpose) { closeChildUnusedResourceAfterFork(); } else { /* Parent */ - executeWithoutGlobalLock([](std::vector&){ g_forkLock->releaseWrite(); g_forkLock->acquireRead(); }); + executeWithoutGlobalLock([](std::vector&){ g_forkLock->downgradeWrite(); }); g_pserver->stat_total_forks++; g_pserver->stat_fork_time = ustime()-start; g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */ From cbb9ac38d26cac1bf8bfd5878ca37e42143149c3 Mon Sep 17 00:00:00 2001 From: malavan Date: Fri, 24 Dec 2021 17:08:27 +0000 Subject: [PATCH 15/19] reduce work done outside global lock --- src/server.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/server.cpp b/src/server.cpp index 425f6b046..0d24e0583 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -6281,8 +6281,8 @@ int redisFork(int purpose) { openChildInfoPipe(); } - - executeWithoutGlobalLock([](std::vector&){ g_forkLock->upgradeWrite(); }); + g_forkLock->releaseRead(); + executeWithoutGlobalLock([](std::vector&){ g_forkLock->acquireWrite(); }); if ((childpid = fork()) == 0) { /* Child */ g_pserver->in_fork_child = purpose; @@ -6291,7 +6291,8 @@ int redisFork(int purpose) { closeChildUnusedResourceAfterFork(); } else { /* Parent */ - executeWithoutGlobalLock([](std::vector&){ g_forkLock->downgradeWrite(); }); + g_forkLock->releaseWrite(); + g_forkLock->acquireRead(); g_pserver->stat_total_forks++; g_pserver->stat_fork_time = ustime()-start; g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */ From bb83aef1dac277d219aff2dbc82102d5fb9a8677 Mon Sep 17 00:00:00 2001 From: malavan Date: Fri, 24 Dec 2021 17:54:19 +0000 Subject: [PATCH 16/19] add latency monitor for forklock and expose writeWaiting status --- src/readwritelock.h | 20 ++++++++++++-------- src/server.cpp | 2 ++ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/readwritelock.h b/src/readwritelock.h index bec1f5e45..d03e1c82b 100644 --- a/src/readwritelock.h +++ b/src/readwritelock.h @@ -7,11 +7,11 @@ class readWriteLock { std::condition_variable m_cv; int m_readCount = 0; int m_writeCount = 0; - bool writeWaiting = false; + bool m_writeWaiting = false; public: void acquireRead() { std::unique_lock rm(m_readLock); - while (m_writeCount > 0 || writeWaiting) + while (m_writeCount > 0 || m_writeWaiting) m_cv.wait(rm); m_readCount++; } @@ -20,7 +20,7 @@ class readWriteLock { std::unique_lock rm(m_readLock, std::defer_lock); if (!rm.try_lock()) return false; - if (m_writeCount > 0 || writeWaiting) + if (m_writeCount > 0 || m_writeWaiting) return false; m_readCount++; return true; @@ -28,7 +28,7 @@ class readWriteLock { void acquireWrite(bool exclusive = true) { std::unique_lock rm(m_readLock); - writeWaiting = true; + m_writeWaiting = true; while (m_readCount > 0) m_cv.wait(rm); if (exclusive) { @@ -39,12 +39,12 @@ class readWriteLock { m_cv.wait(rm); } m_writeCount++; - writeWaiting = false; + m_writeWaiting = false; } void upgradeWrite(bool exclusive = true) { std::unique_lock rm(m_readLock); - writeWaiting = true; + m_writeWaiting = true; while (m_readCount > 1) m_cv.wait(rm); if (exclusive) { @@ -56,7 +56,7 @@ class readWriteLock { } m_writeCount++; m_readCount--; - writeWaiting = false; + m_writeWaiting = false; } bool tryAcquireWrite(bool exclusive = true) { @@ -94,7 +94,7 @@ class readWriteLock { if (exclusive) m_writeLock.unlock(); m_writeCount--; - while (m_writeCount > 0 || writeWaiting) + while (m_writeCount > 0 || m_writeWaiting) m_cv.wait(rm); m_readCount++; } @@ -106,4 +106,8 @@ class readWriteLock { bool hasWriter() { return m_writeCount > 0; } + + bool writeWaiting() { + return m_writeWaiting; + } }; \ No newline at end of file diff --git a/src/server.cpp b/src/server.cpp index 0d24e0583..e473ffd7f 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -6282,7 +6282,9 @@ int redisFork(int purpose) { openChildInfoPipe(); } g_forkLock->releaseRead(); + long long startWriteLock = ustime(); executeWithoutGlobalLock([](std::vector&){ g_forkLock->acquireWrite(); }); + latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000); if ((childpid = fork()) == 0) { /* Child */ g_pserver->in_fork_child = purpose; From b57b1c8ff8e83a2543dc30f50f70051274d3ed7f Mon Sep 17 00:00:00 2001 From: malavan Date: Fri, 24 Dec 2021 18:00:07 +0000 Subject: [PATCH 17/19] don't read query if waiting on fork --- src/networking.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/networking.cpp b/src/networking.cpp index 0ffdf75d2..693f52abd 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2522,6 +2522,9 @@ void readQueryFromClient(connection *conn) { size_t qblen; serverAssert(FCorrectThread(c)); + + if (g_forkLock->writeWaiting()) + return; AeLocker aelock; AssertCorrectThread(c); From 127becc9e0d4d358454c51ae615aa85b89a8136a Mon Sep 17 00:00:00 2001 From: malavan Date: Sun, 2 Jan 2022 11:09:21 +0000 Subject: [PATCH 18/19] catch exceptions and relock in executeWithoutGlobalLock --- src/networking.cpp | 8 +------- src/server.cpp | 17 ++++++++++++++--- src/server.h | 2 +- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 693f52abd..53ed1d413 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -3864,7 +3864,7 @@ int checkClientPauseTimeoutAndReturnIfPaused(void) { void processEventsWhileBlocked(int iel) { int eventsCount = 0; - executeWithoutGlobalLock([&](std::vector& vecclients){ + executeWithoutGlobalLock([&](){ int iterations = 4; /* See the function top-comment. */ try { @@ -3884,13 +3884,7 @@ void processEventsWhileBlocked(int iel) { } catch (...) { - // Caller expects us to be locked so fix and rethrow ProcessingEventsWhileBlocked = 0; - AeLocker locker; - locker.arm(nullptr); - locker.release(); - for (client *c : vecclients) - c->lock.lock(); throw; } }); diff --git a/src/server.cpp b/src/server.cpp index e473ffd7f..f6f94ce54 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -6224,7 +6224,7 @@ void closeChildUnusedResourceAfterFork() { cserver.pidfile = NULL; } -void executeWithoutGlobalLock(std::function&)> func) { +void executeWithoutGlobalLock(std::function func) { serverAssert(GlobalLocksAcquired()); std::vector vecclients; @@ -6254,7 +6254,18 @@ void executeWithoutGlobalLock(std::function&)> func) { aeReleaseLock(); serverAssert(!GlobalLocksAcquired()); - func(vecclients); + try { + func(); + } + catch (...) { + // Caller expects us to be locked so fix and rethrow + AeLocker locker; + locker.arm(nullptr); + locker.release(); + for (client *c : vecclients) + c->lock.lock(); + throw; + } AeLocker locker; locker.arm(nullptr); @@ -6283,7 +6294,7 @@ int redisFork(int purpose) { } g_forkLock->releaseRead(); long long startWriteLock = ustime(); - executeWithoutGlobalLock([](std::vector&){ g_forkLock->acquireWrite(); }); + executeWithoutGlobalLock([](){ g_forkLock->acquireWrite(); }); latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000); if ((childpid = fork()) == 0) { /* Child */ diff --git a/src/server.h b/src/server.h index fd7574910..d0c443934 100644 --- a/src/server.h +++ b/src/server.h @@ -2501,7 +2501,7 @@ void sendChildInfo(childInfoType info_type, size_t keys, const char *pname); void receiveChildInfo(void); /* Fork helpers */ -void executeWithoutGlobalLock(std::function&)> func); +void executeWithoutGlobalLock(std::function func); int redisFork(int type); int hasActiveChildProcess(); void resetChildState(); From 1e3fe8cab38d938c266fc0cfa059b123d9871e1b Mon Sep 17 00:00:00 2001 From: malavan Date: Wed, 12 Jan 2022 23:41:06 +0000 Subject: [PATCH 19/19] forklock only for time thread --- src/networking.cpp | 3 --- src/server.cpp | 9 +-------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 53ed1d413..211ea1046 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2522,9 +2522,6 @@ void readQueryFromClient(connection *conn) { size_t qblen; serverAssert(FCorrectThread(c)); - - if (g_forkLock->writeWaiting()) - return; AeLocker aelock; AssertCorrectThread(c); diff --git a/src/server.cpp b/src/server.cpp index f6f94ce54..b98a44307 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2632,8 +2632,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) { sleeping_threads++; serverAssert(sleeping_threads <= cserver.cthreads); } - - g_forkLock->releaseRead(); /* Determine whether the modules are enabled before sleeping, and use that result both here, and after wakeup to avoid double acquire or release of the GIL */ @@ -2655,7 +2653,6 @@ void afterSleep(struct aeEventLoop *eventLoop) { Otherwise you may double acquire the GIL and cause deadlocks in the module */ if (!ProcessingEventsWhileBlocked) { if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/); - g_forkLock->acquireRead(); wakeTimeThread(); } } @@ -6292,9 +6289,8 @@ int redisFork(int purpose) { openChildInfoPipe(); } - g_forkLock->releaseRead(); long long startWriteLock = ustime(); - executeWithoutGlobalLock([](){ g_forkLock->acquireWrite(); }); + g_forkLock->acquireWrite(); latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000); if ((childpid = fork()) == 0) { /* Child */ @@ -6305,7 +6301,6 @@ int redisFork(int purpose) { } else { /* Parent */ g_forkLock->releaseWrite(); - g_forkLock->acquireRead(); g_pserver->stat_total_forks++; g_pserver->stat_fork_time = ustime()-start; g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */ @@ -6661,7 +6656,6 @@ void *workerThreadMain(void *parg) } moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run - g_forkLock->acquireRead(); aeEventLoop *el = g_pserver->rgthreadvar[iel].el; try { @@ -6670,7 +6664,6 @@ void *workerThreadMain(void *parg) catch (ShutdownException) { } - g_forkLock->releaseRead(); moduleReleaseGIL(true); serverAssert(!GlobalLocksAcquired()); aeDeleteEventLoop(el);