Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for selective synchronization as a startup flag #1076

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <sys/resource.h>
#include <sys/time.h>
#include <signal.h>
#include <string.h>

#include <bedrockVersion.h>
#include <BedrockCore.h>
Expand Down Expand Up @@ -1338,6 +1339,20 @@ BedrockServer::BedrockServer(const SData& args_)
ref(_leaderVersion),
ref(_syncNodeQueuedCommands),
ref(*this));

// Add syncType arg. If no -syncType specified, default to QUORUM
_isSyncSet = args.isSet("-syncType");
_syncType = -1;
if (_isSyncSet) {
string cmd = args["-syncType"];
if ( strcasecmp(cmd.c_str(),SC_ONE) == 0) {
_syncType = SQLiteNode::ONE;
} else if (strcasecmp(cmd.c_str(),SC_ASYNC) == 0) {
_syncType = SQLiteNode::ASYNC;
} else if (strcasecmp(cmd.c_str(),SC_QUORUM) == 0) {
_syncType = SQLiteNode::QUORUM;
}
}
}

BedrockServer::~BedrockServer() {
Expand Down Expand Up @@ -1606,6 +1621,15 @@ void BedrockServer::postPoll(fd_map& fdm, uint64_t& nextActivity) {
// Create a command.
unique_ptr<BedrockCommand> command = getCommandFromPlugins(move(request));

// Check if -syncType is set and query starts with INSERT/DELETE/UPDATE
if (_isSyncSet && command->request.methodLine.compare("QUERY")) {
string query = STrim(SToUpper(command->request["query"]));
if (SStartsWith(SToUpper(query),"INSERT") || SStartsWith(SToUpper(query),"DELETE") || SStartsWith(SToUpper(query),"UPDATE")) {
command->writeConsistency = (SQLiteNode::ConsistencyLevel)_syncType;
SINFO("Forcing " << _syncType << " consistency for command " << command->request.methodLine);
}
}

if (command->writeConsistency != SQLiteNode::QUORUM
&& _syncCommands.find(command->request.methodLine) != _syncCommands.end()) {

Expand Down
8 changes: 8 additions & 0 deletions BedrockServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
#include "BedrockCommandQueue.h"
#include "BedrockTimeoutCommandQueue.h"

#define SC_ONE "ONE"
#define SC_QUORUM "QUORUM"
#define SC_ASYNC "ASYNC"

class BedrockServer : public SQLiteServer {
public:

Expand Down Expand Up @@ -483,4 +487,8 @@ class BedrockServer : public SQLiteServer {
// This is a snapshot of the state of the node taken at the beginning of any call to peekCommand or processCommand
// so that the state can't change for the lifetime of that call, from the view of that function.
static thread_local atomic<SQLiteNode::State> _nodeStateSnapshot;

// These allow for the ability to startup with -syncType while specifying ONE/ASYNC/QUORUM
bool _isSyncSet;
int _syncType;
};
2 changes: 2 additions & 0 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ To see a full list of Bedrock's configuration options, just run `bedrock -?` on
-readThreads <#> Number of read threads to start (min 1, defaults to 1)
-queryLog <filename> Set the query log filename (default 'queryLog.csv', SIGUSR2/SIGQUIT to enable/disable)
-maxJournalSize <#commits> Number of commits to retainin the historical journal (default 1000000)
-syncType <value> Selective synchronization (QUORUM, ONE, ASYNC) and (defaults to QUORUM)
-synchronous <value> Set the PRAGMA schema.synchronous

Quick Start Tips:
-----------------
Expand Down
1 change: 1 addition & 0 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ int main(int argc, char* argv[]) {
<< endl;
cout << "-maxJournalSize <#commits> Number of commits to retain in the historical journal (default 1000000)"
<< endl;
cout << "-syncType <value> Selective synchronization (QUORUM, ONE, ASYNC) and (defaults to QUORUM) " << endl;
cout << "-synchronous <value> Set the PRAGMA schema.synchronous "
"(defaults see https://sqlite.org/pragma.html#pragma_synchronous)"
<< endl;
Expand Down