Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add readwrite lock for forking #386

Merged
merged 20 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
b939389
add readwrite lock for forking
MalavanEQAlpha Dec 21, 2021
4aaa7b7
add forklock to time thread, wake time thread after acquiring locks
MalavanEQAlpha Dec 22, 2021
754d5a3
move fork lock management within redisFork
MalavanEQAlpha Dec 22, 2021
d2c080d
further generalize readWrite lock and use it for moduleGIL
MalavanEQAlpha Dec 22, 2021
eec6b51
explain deadlock possibility in acquireWrite
MalavanEQAlpha Dec 22, 2021
8dff4f3
need to acquire forklock on thread start
MalavanEQAlpha Dec 22, 2021
2c932d7
release fork lock on thread end
MalavanEQAlpha Dec 22, 2021
c8bd5db
timethread only releases forklock when waiting
MalavanEQAlpha Dec 22, 2021
c167902
reacquire fork lock every 500 time thread loops
MalavanEQAlpha Dec 22, 2021
9e92c03
reset cycle count on cvwait and name constant
MalavanEQAlpha Dec 22, 2021
cddd3b6
add executeWithoutGlobalLock function
MalavanEQAlpha Dec 23, 2021
b4b30dc
add write waiting to readWriteLock to avoid starvation
MalavanEQAlpha Dec 23, 2021
9957c11
restore assert that client is safe to unlock
MalavanEQAlpha Dec 24, 2021
6749c2b
add functions to upgrade/downgrade read to/from write in readWriteLock
MalavanEQAlpha Dec 24, 2021
cbb9ac3
reduce work done outside global lock
MalavanEQAlpha Dec 24, 2021
bb83aef
add latency monitor for forklock and expose writeWaiting status
MalavanEQAlpha Dec 24, 2021
b57b1c8
don't read query if waiting on fork
MalavanEQAlpha Dec 24, 2021
127becc
catch exceptions and relock in executeWithoutGlobalLock
MalavanEQAlpha Jan 2, 2022
1e3fe8c
forklock only for time thread
MalavanEQAlpha Jan 12, 2022
83563f6
Merge branch 'unstable' into fix_rdb_hang
MalavanEQAlpha Jan 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 12 additions & 53 deletions src/module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,11 +365,7 @@ typedef struct RedisModuleCommandFilter {
static list *moduleCommandFilters;

/* Module GIL Variables */
static int s_cAcquisitionsServer = 0;
static int s_cAcquisitionsModule = 0;
static std::mutex s_mutex;
static std::condition_variable s_cv;
static std::recursive_mutex s_mutexModule;
static readWriteLock s_moduleGIL;
thread_local bool g_fModuleThread = false;

typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data);
Expand Down Expand Up @@ -5969,95 +5965,58 @@ void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
// as the server thread acquisition is sufficient. If we did try to lock we would deadlock
static bool FModuleCallBackLock(bool fServerThread)
{
return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_cAcquisitionsServer > 0;
return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_moduleGIL.hasReader();
}
void moduleAcquireGIL(int fServerThread, int fExclusive) {
std::unique_lock<std::mutex> lock(s_mutex);
int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer;

if (FModuleCallBackLock(fServerThread)) {
return;
}

while (*pcheck > 0)
s_cv.wait(lock);

if (fServerThread)
{
++s_cAcquisitionsServer;
s_moduleGIL.acquireRead();
}
else
{
// only try to acquire the mutexModule in exclusive mode
if (fExclusive){
// It is possible that another module thread holds the GIL (and s_mutexModule as a result).
// When said thread goes to release the GIL, it will wait for s_mutex, which this thread owns.
// This thread is however waiting for the GIL (and s_mutexModule) that the other thread owns.
// As a result, a deadlock has occured.
// We release the lock on s_mutex and wait until we are able to safely acquire the GIL
// in order to prevent this deadlock from occuring.
while (!s_mutexModule.try_lock())
s_cv.wait(lock);
}
++s_cAcquisitionsModule;
fModuleGILWlocked++;
s_moduleGIL.acquireWrite(fExclusive);
}
}

int moduleTryAcquireGIL(bool fServerThread, int fExclusive) {
std::unique_lock<std::mutex> lock(s_mutex, std::defer_lock);
if (!lock.try_lock())
return 1;
int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer;

if (FModuleCallBackLock(fServerThread)) {
return 0;
}

if (*pcheck > 0)
return 1;

if (fServerThread)
{
++s_cAcquisitionsServer;
if (!s_moduleGIL.tryAcquireRead())
return 1;
}
else
{
// only try to acquire the mutexModule in exclusive mode
if (fExclusive){
if (!s_mutexModule.try_lock())
return 1;
}
++s_cAcquisitionsModule;
fModuleGILWlocked++;
if (!s_moduleGIL.tryAcquireWrite(fExclusive))
return 1;
}
return 0;
}

void moduleReleaseGIL(int fServerThread, int fExclusive) {
std::unique_lock<std::mutex> lock(s_mutex);

if (FModuleCallBackLock(fServerThread)) {
return;
}

if (fServerThread)
{
--s_cAcquisitionsServer;
s_moduleGIL.releaseRead();
}
else
{
// only try to release the mutexModule in exclusive mode
if (fExclusive)
s_mutexModule.unlock();
--s_cAcquisitionsModule;
fModuleGILWlocked--;
s_moduleGIL.releaseWrite(fExclusive);
}
s_cv.notify_all();
}

int moduleGILAcquiredByModule(void) {
return fModuleGILWlocked > 0;
return s_moduleGIL.hasWriter();
}


Expand Down
93 changes: 24 additions & 69 deletions src/networking.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3861,82 +3861,37 @@ int checkClientPauseTimeoutAndReturnIfPaused(void) {
*
* The function returns the total number of events processed. */
void processEventsWhileBlocked(int iel) {
serverAssert(GlobalLocksAcquired());
int iterations = 4; /* See the function top-comment. */

std::vector<client*> vecclients;
listIter li;
listNode *ln;
listRewind(g_pserver->clients, &li);

// All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks
// so unlock here, and save them for reacquisition later
while ((ln = listNext(&li)) != nullptr)
{
client *c = (client*)listNodeValue(ln);
if (c->lock.fOwnLock()) {
serverAssert(c->flags & CLIENT_PROTECTED); // If the client is not protected we have no gurantee they won't be free'd in the event loop
c->lock.unlock();
vecclients.push_back(c);
int eventsCount = 0;
executeWithoutGlobalLock([&](){
int iterations = 4; /* See the function top-comment. */
try
{
ProcessingEventsWhileBlocked = 1;
while (iterations--) {
long long startval = g_pserver->events_processed_while_blocked;
long long ae_events = aeProcessEvents(g_pserver->rgthreadvar[iel].el,
AE_FILE_EVENTS|AE_DONT_WAIT|
AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
/* Note that g_pserver->events_processed_while_blocked will also get
* incremeted by callbacks called by the event loop handlers. */
eventsCount += ae_events;
long long events = eventsCount - startval;
if (!events) break;
}
ProcessingEventsWhileBlocked = 0;
}
}

/* Since we're about to release our lock we need to flush the repl backlog queue */
bool fReplBacklog = g_pserver->repl_batch_offStart >= 0;
if (fReplBacklog) {
flushReplBacklogToClients();
g_pserver->repl_batch_idxStart = -1;
g_pserver->repl_batch_offStart = -1;
}

long long eventsCount = 0;
aeReleaseLock();
serverAssert(!GlobalLocksAcquired());
try
{
ProcessingEventsWhileBlocked = 1;
while (iterations--) {
long long startval = g_pserver->events_processed_while_blocked;
long long ae_events = aeProcessEvents(g_pserver->rgthreadvar[iel].el,
AE_FILE_EVENTS|AE_DONT_WAIT|
AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
/* Note that g_pserver->events_processed_while_blocked will also get
* incremeted by callbacks called by the event loop handlers. */
eventsCount += ae_events;
long long events = eventsCount - startval;
if (!events) break;
}
ProcessingEventsWhileBlocked = 0;
}
catch (...)
{
// Caller expects us to be locked so fix and rethrow
ProcessingEventsWhileBlocked = 0;
AeLocker locker;
locker.arm(nullptr);
locker.release();
for (client *c : vecclients)
c->lock.lock();
throw;
}

AeLocker locker;
locker.arm(nullptr);
locker.release();
catch (...)
{
ProcessingEventsWhileBlocked = 0;
throw;
}
});

g_pserver->events_processed_while_blocked += eventsCount;

whileBlockedCron();

// Restore it so the calling code is not confused
if (fReplBacklog) {
g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
}

for (client *c : vecclients)
c->lock.lock();

// If a different thread processed the shutdown we need to abort the lua command or we will hang
if (serverTL->el->stop)
throw ShutdownException();
Expand Down
113 changes: 113 additions & 0 deletions src/readwritelock.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#pragma once
#include <atomic>
#include <condition_variable>
#include <mutex>

/* readWriteLock: a writer-preferring read/write lock used to coordinate the
 * global lock with forking and the module GIL.
 *
 * - Readers run concurrently; a waiting writer (m_writeWaiting) blocks NEW
 *   readers so writers are not starved.
 * - The `exclusive` flag on the write operations additionally serializes
 *   writers against each other via m_writeLock. That mutex is recursive so
 *   the same thread may nest write acquisitions.
 * - hasReader()/hasWriter()/writeWaiting() are lock-free status probes: the
 *   counters are std::atomic so these unlocked reads are data-race free.
 *   All mutations still happen under m_readLock, so every condition-variable
 *   predicate behaves exactly as before.
 */
class readWriteLock {
    std::mutex m_readLock;            // guards all state transitions; paired with m_cv
    std::recursive_mutex m_writeLock; // held by "exclusive" writers; recursive to allow same-thread nesting
    std::condition_variable m_cv;     // notified on every release so waiters re-test their predicate
    // Atomic so the unlocked accessors below may read them without taking
    // m_readLock (fixes a data race on plain ints).
    std::atomic<int> m_readCount { 0 };         // number of active readers
    std::atomic<int> m_writeCount { 0 };        // number of active writers
    std::atomic<bool> m_writeWaiting { false }; // true while a writer waits: new readers are refused
public:
    /* Block until no writer is active or waiting, then register as a reader. */
    void acquireRead() {
        std::unique_lock<std::mutex> rm(m_readLock);
        while (m_writeCount > 0 || m_writeWaiting)
            m_cv.wait(rm);
        m_readCount++;
    }

    /* Non-blocking acquireRead(); returns true iff the read lock was taken. */
    bool tryAcquireRead() {
        std::unique_lock<std::mutex> rm(m_readLock, std::defer_lock);
        if (!rm.try_lock())
            return false;
        if (m_writeCount > 0 || m_writeWaiting)
            return false;
        m_readCount++;
        return true;
    }

    /* Block until all readers are gone, then register as a writer.
     * exclusive == true additionally takes m_writeLock so concurrent
     * exclusive writers are serialized with each other. */
    void acquireWrite(bool exclusive = true) {
        std::unique_lock<std::mutex> rm(m_readLock);
        m_writeWaiting = true;  // stop admitting new readers while we wait
        while (m_readCount > 0)
            m_cv.wait(rm);
        if (exclusive) {
            /* Another thread might have the write lock while we have the read lock
               but won't be able to release it until they can acquire the read lock
               so release the read lock and try again instead of waiting to avoid deadlock */
            while (!m_writeLock.try_lock())
                m_cv.wait(rm);
        }
        m_writeCount++;
        m_writeWaiting = false;
    }

    /* Convert this thread's existing read hold into a write hold.
     * WARNING: if two readers call upgradeWrite concurrently they deadlock —
     * each waits for the other's read count to drop below 2. Callers must
     * ensure at most one reader upgrades at a time. */
    void upgradeWrite(bool exclusive = true) {
        std::unique_lock<std::mutex> rm(m_readLock);
        m_writeWaiting = true;
        while (m_readCount > 1)  // wait out every reader other than ourselves
            m_cv.wait(rm);
        if (exclusive) {
            /* Same deadlock-avoidance loop as acquireWrite: a write-lock holder
               may be blocked trying to take the read lock, so never block on
               m_writeLock while holding m_readLock. */
            while (!m_writeLock.try_lock())
                m_cv.wait(rm);
        }
        m_writeCount++;
        m_readCount--;
        m_writeWaiting = false;
    }

    /* Non-blocking acquireWrite(); returns true iff the write lock was taken. */
    bool tryAcquireWrite(bool exclusive = true) {
        std::unique_lock<std::mutex> rm(m_readLock, std::defer_lock);
        if (!rm.try_lock())
            return false;
        if (m_readCount > 0)
            return false;
        if (exclusive)
            if (!m_writeLock.try_lock())
                return false;
        m_writeCount++;
        return true;
    }

    /* Drop one reader registration and wake any waiters. */
    void releaseRead() {
        std::unique_lock<std::mutex> rm(m_readLock);
        serverAssert(m_readCount > 0);
        m_readCount--;
        m_cv.notify_all();
    }

    /* Drop one writer registration (and m_writeLock if exclusive) and wake waiters. */
    void releaseWrite(bool exclusive = true) {
        std::unique_lock<std::mutex> rm(m_readLock);
        serverAssert(m_writeCount > 0);
        if (exclusive)
            m_writeLock.unlock();
        m_writeCount--;
        m_cv.notify_all();
    }

    /* Convert this thread's write hold back into a read hold.
     * May block: if another writer is active or waiting we queue behind it
     * like a fresh reader would. */
    void downgradeWrite(bool exclusive = true) {
        std::unique_lock<std::mutex> rm(m_readLock);
        serverAssert(m_writeCount > 0);
        if (exclusive)
            m_writeLock.unlock();
        m_writeCount--;
        while (m_writeCount > 0 || m_writeWaiting)
            m_cv.wait(rm);
        m_readCount++;
    }

    /* Lock-free status probes (values may be stale by the time the caller acts). */
    bool hasReader() {
        return m_readCount > 0;
    }

    bool hasWriter() {
        return m_writeCount > 0;
    }

    bool writeWaiting() {
        return m_writeWaiting;
    }
};
Loading