diff --git a/src/IStorage.h b/src/IStorage.h index 79726daf5..82f8087a0 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -7,6 +7,13 @@ #define METADATA_DB_IDENTIFIER "c299fde0-6d42-4ec4-b939-34f680ffe39f" struct StorageToken { + enum class TokenType { + SingleRead, + SingleWrite, + Delete, + BatchWrite, + }; + TokenType type; std::unordered_set setc; struct redisDbPersistentData *db; virtual ~StorageToken() {} @@ -46,6 +53,9 @@ class IStorage virtual StorageToken *begin_retrieve(struct aeEventLoop *, aePostFunctionTokenProc, sds *, size_t) {return nullptr;}; virtual void complete_retrieve(StorageToken * /*tok*/, callbackSingle /*fn*/) {}; + virtual StorageToken* begin_endWriteBatch(struct aeEventLoop *, aePostFunctionTokenProc*) {} // NOP + virtual void complete_endWriteBatch(StorageToken * /*tok*/) {}; + virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) { beginWriteBatch(); for (size_t ielem = 0; ielem < celem; ++ielem) { diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp index c0ff305b1..51d2e8c1f 100644 --- a/src/StorageCache.cpp +++ b/src/StorageCache.cpp @@ -233,4 +233,5 @@ void StorageCache::emergencyFreeCache() { dictRelease(d); }); } -} \ No newline at end of file +} + diff --git a/src/StorageCache.h b/src/StorageCache.h index 614f8c27b..18fd7f7e4 100644 --- a/src/StorageCache.h +++ b/src/StorageCache.h @@ -45,6 +45,8 @@ class StorageCache void retrieve(sds key, IStorage::callbackSingle fn) const; StorageToken *begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc proc, sds *rgkey, size_t ckey); void complete_retrieve(StorageToken *tok, IStorage::callbackSingle fn); + StorageToken* begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* proc) {return m_spstorage->begin_endWriteBatch(el,proc);} // NOP + void complete_endWriteBatch(StorageToken *tok) {m_spstorage->complete_endWriteBatch(tok);}; bool erase(sds key); void emergencyFreeCache(); bool keycacheIsEnabled() const { return m_pdict != nullptr; } diff --git a/src/db.cpp b/src/db.cpp index 013473163..9a0f35684 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -3087,7 +3087,14 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot ** m_pdbSnapshotStorageFlush = nullptr; } if (m_spstorage != nullptr) - m_spstorage->endWriteBatch(); + { + auto *tok = m_spstorage->begin_endWriteBatch(serverTL->el, storageLoadCallback); + if (tok != nullptr) + { + tok->db = this; + tok->type = StorageToken::TokenType::BatchWrite; + } + } } redisDbPersistentData::~redisDbPersistentData() @@ -3420,7 +3427,13 @@ void redisDbPersistentData::prefetchKeysFlash(std::unordered_set &setc) void redisDbPersistentData::processStorageToken(StorageToken *tok) { auto setc = std::move(tok->setc); - tok->db->m_spstorage->complete_retrieve(tok, [&](const char *szKey, size_t cbKey, const void *data, size_t cb) { + switch (tok->type) + { + + case StorageToken::TokenType::SingleRead: + { + tok->db->m_spstorage->complete_retrieve(tok, [&](const char *szKey, size_t cbKey, const void *data, size_t cb) + { auto *db = tok->db; size_t offset = 0; sds key = sdsnewlen(szKey, -((ssize_t)cbKey)); @@ -3451,11 +3464,23 @@ void redisDbPersistentData::processStorageToken(StorageToken *tok) { serverAssert(db->m_setexpire->find(key) != db->m_setexpire->end()); } serverAssert(o->FExpires() == (db->m_setexpire->find(key) != db->m_setexpire->end())); - } - }); - tok = nullptr; // Invalid past this point + } }); + break; + } + case StorageToken::TokenType::BatchWrite: + { + tok->db->m_spstorage->complete_endWriteBatch(tok); + break; + } + default: + serverAssert((tok->type == StorageToken::TokenType::SingleRead) || (tok->type == StorageToken::TokenType::BatchWrite)); + break; + } //switch end - for (client *c : setc) { + tok = nullptr; // Invalid past this point + + for (client *c : setc) + { std::unique_lock ul(c->lock); if (c->flags & CLIENT_BLOCKED) unblockClient(c); diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp index 16aed43b7..56f00821f 100644 --- a/src/storage/rocksdb.cpp +++ b/src/storage/rocksdb.cpp @@ -255,6 +255,31 @@ void RocksDBStorageProvider::endWriteBatch() m_lock.unlock(); } +struct BatchStorageToken : public StorageToken { + std::shared_ptr tspdb; // Note: This must be first so it is deleted last + std::unique_ptr tspbatch; +}; + +StorageToken* RocksDBStorageProvider::begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* callback){ + BatchStorageToken *tok = new BatchStorageToken(); + tok->tspbatch = std::move(m_spbatch); + tok->tspdb = m_spdb; + m_spbatch = nullptr; + m_lock.unlock(); + (*m_pfactory->m_wqueue)->AddWorkFunction([this, el,callback,tok]{ + tok->tspdb->Write(WriteOptions(),tok->tspbatch.get()->GetWriteBatch()); + aePostFunction(el,callback,tok); + }); + + return tok; +} + +void RocksDBStorageProvider::complete_endWriteBatch(StorageToken* tok){ + delete tok; + tok = nullptr; +} + + void RocksDBStorageProvider::batch_lock() { m_lock.lock(); @@ -330,4 +355,4 @@ void RocksDBStorageProvider::complete_retrieve(StorageToken *tok, callbackSingle } } delete rtok; -} \ No newline at end of file +} diff --git a/src/storage/rocksdb.h b/src/storage/rocksdb.h index 01edb4975..d5587c9e9 100644 --- a/src/storage/rocksdb.h +++ b/src/storage/rocksdb.h @@ -42,6 +42,8 @@ class RocksDBStorageProvider : public IStorage virtual void beginWriteBatch() override; virtual void endWriteBatch() override; + virtual StorageToken* begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* proc); + virtual void complete_endWriteBatch(StorageToken *tok); virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) override;