Skip to content

Commit

Permalink
Revert "refactor client tracking, fix atomicity, squashing and multi/… (
Browse files Browse the repository at this point in the history
#3122)

Revert "refactor client tracking, fix atomicity, squashing and multi/exec (#2970)"

This reverts commit b1063f7.
  • Loading branch information
adiholden authored Jun 4, 2024
1 parent 6e33261 commit 2a5a53f
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 351 deletions.
15 changes: 11 additions & 4 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
cc_->conn_closing = true; // Signal dispatch to close.
evc_.notify();
phase_ = SHUTTING_DOWN;

VLOG(2) << "Before dispatch_fb.join()";
dispatch_fb_.JoinIfNeeded();
VLOG(2) << "After dispatch_fb.join()";
Expand Down Expand Up @@ -1123,10 +1124,6 @@ void Connection::HandleMigrateRequest() {
}
}

// This triggers when a pub/sub connection both publish and subscribe to the
// same channel. See #3035 on github for details.
// DCHECK(dispatch_q_.empty());

// In case we Yield()ed in Migrate() above, dispatch_fb_ might have been started.
LaunchDispatchFiberIfNeeded();
}
Expand Down Expand Up @@ -1655,6 +1652,16 @@ void Connection::RequestAsyncMigration(util::fb2::ProactorBase* dest) {
migration_request_ = dest;
}

void Connection::SetClientTrackingSwitch(bool is_on) {
tracking_enabled_ = is_on;
if (tracking_enabled_)
cc_->subscriptions++;
}

bool Connection::IsTrackingOn() const {
return tracking_enabled_;
}

void Connection::StartTrafficLogging(string_view path) {
OpenTrafficLogger(path);
}
Expand Down
6 changes: 6 additions & 0 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ class Connection : public util::Connection {
// Connections will migrate at most once, and only when the flag --migrate_connections is true.
void RequestAsyncMigration(util::fb2::ProactorBase* dest);

void SetClientTrackingSwitch(bool is_on);

bool IsTrackingOn() const;

// Starts traffic logging in the calling thread. Must be a proactor thread.
// Each thread creates its own log file combining requests from all the connections in
// that thread. A noop if the thread is already logging.
Expand Down Expand Up @@ -446,6 +450,8 @@ class Connection : public util::Connection {
// Per-thread queue backpressure structs.
static thread_local QueueBackpressure tl_queue_backpressure_;

// a flag indicating whether the client has turned on client tracking.
bool tracking_enabled_ = false;
bool skip_next_squashing_ = false; // Forcefully skip next squashing

// Connection migration vars, see RequestAsyncMigration() above.
Expand Down
3 changes: 2 additions & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ add_library(dfly_transaction db_slice.cc malloc_stats.cc blocking_controller.cc
common.cc journal/journal.cc journal/types.cc journal/journal_slice.cc
server_state.cc table.cc top_keys.cc transaction.cc tx_base.cc
serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc
${TX_LINUX_SRCS} acl/acl_log.cc slowlog.cc)
${TX_LINUX_SRCS} acl/acl_log.cc slowlog.cc
)

SET(SEARCH_FILES search/search_family.cc search/doc_index.cc search/doc_accessors.cc
search/aggregator.cc)
Expand Down
8 changes: 0 additions & 8 deletions src/server/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,4 @@ void ConnectionState::ExecInfo::ClearWatched() {
watched_existed = 0;
}

bool ConnectionState::ClientTracking::ShouldTrackKeys() const {
if (!IsTrackingOn()) {
return false;
}

return !optin_ || (seq_num_ == (1 + caching_seq_num_));
}

} // namespace dfly
83 changes: 0 additions & 83 deletions src/server/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,87 +147,6 @@ struct ConnectionState {

size_t UsedMemory() const;

// Client tracking is a per-connection state machine that adheres to the requirements
// of the CLIENT TRACKING command. Note that the semantics described below are enforced
// by the tests in server_family_test. The rules are:
// 1. If CLIENT TRACKING is ON then each READ command must be tracked. Invalidation
// messages are sent `only once`. Subsequent changes of the same key require the
// client to re-read the key in order to receive the next invalidation message.
// 2. CLIENT TRACKING ON OPTIN turns on optional tracking. Read commands are not
// tracked unless the client issues a CLIENT CACHING YES command which conditionally
// allows the tracking of the command that follows CACHING YES). For example:
// >> CLIENT TRACKING ON
// >> CLIENT CACHING YES
// >> GET foo <--------------------- From now foo is being tracked
// However:
// >> CLIENT TRACKING ON
// >> CLIENT CACHING YES
// >> SET foo bar
// >> GET foo <--------------------- is *NOT* tracked since GET does not succeed CACHING
// Also, in the context of multi transactions, CLIENT CACHING YES is *STICKY*:
// >> CLIENT TRACKING ON
// >> CLIENT CACHING YES
// >> MULTI
// >> GET foo
// >> SET foo bar
// >> GET brother_foo
// >> EXEC
// From this point onwards `foo` and `get` keys are tracked. Same aplies if CACHING YES
// is used within the MULTI/EXEC block.
//
// The state machine implements the above rules. We need to track:
// 1. If TRACKING is ON and OPTIN
// 2. Stickiness of CACHING as described above
//
// We introduce a monotonic counter called sequence number which we increment only:
// * On InvokeCmd when we are not Collecting (multi)
// We introduce another counter called caching_seq_num which is set to seq_num
// when the users sends a CLIENT CACHING YES command
// If seq_num == caching_seq_num + 1 then we know that we should Track().
class ClientTracking {
public:
// Sets to true when CLIENT TRACKING is ON
void SetClientTracking(bool is_on) {
tracking_enabled_ = is_on;
}

// Increment current sequence number
void IncrementSequenceNumber() {
++seq_num_;
}

// Set if OPTIN subcommand is used in CLIENT TRACKING
void SetOptin(bool optin) {
optin_ = optin;
}

// Check if the keys should be tracked. Result adheres to the state machine described above.
bool ShouldTrackKeys() const;

// Check only if CLIENT TRACKING is ON
bool IsTrackingOn() const {
return tracking_enabled_;
}

// Called by CLIENT CACHING YES and caches the current seq_num_
void SetCachingSequenceNumber(bool is_multi) {
// We need -1 when we are in multi
caching_seq_num_ = is_multi && seq_num_ != 0 ? seq_num_ - 1 : seq_num_;
}

void ResetCachingSequenceNumber() {
caching_seq_num_ = 0;
}

private:
// a flag indicating whether the client has turned on client tracking.
bool tracking_enabled_ = false;
bool optin_ = false;
// sequence number
size_t seq_num_ = 0;
size_t caching_seq_num_ = 0;
};

public:
DbIndex db_index = 0;

Expand All @@ -242,7 +161,6 @@ struct ConnectionState {
std::optional<SquashingInfo> squashing_info;
std::unique_ptr<ScriptInfo> script_info;
std::unique_ptr<SubscribeInfo> subscribe_info;
ClientTracking tracking_info_;
};

class ConnectionContext : public facade::ConnectionContext {
Expand All @@ -265,7 +183,6 @@ class ConnectionContext : public facade::ConnectionContext {
// TODO: to introduce proper accessors.
Transaction* transaction = nullptr;
const CommandId* cid = nullptr;

ConnectionState conn_state;

DbIndex db_index() const {
Expand Down
39 changes: 17 additions & 22 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1423,29 +1423,24 @@ void DbSlice::SendInvalidationTrackingMessage(std::string_view key) {
return;

auto it = client_tracking_map_.find(key);
if (it == client_tracking_map_.end()) {
return;
}
auto& client_set = it->second;
// Notify all the clients. We copy key because we dispatch briefly below and
// we need to preserve its lifetime
// TODO this key is further copied within DispatchFiber. Fix this.
auto cb = [key = std::string(key), client_set = std::move(client_set)](unsigned idx,
util::ProactorBase*) {
for (auto& client : client_set) {
if (client.IsExpired() || (client.Thread() != idx)) {
continue;
}
auto* conn = client.Get();
auto* cntx = static_cast<ConnectionContext*>(conn->cntx());
if (cntx && cntx->conn_state.tracking_info_.IsTrackingOn()) {
conn->SendInvalidationMessageAsync({key});
if (it != client_tracking_map_.end()) {
// notify all the clients.
auto& client_set = it->second;
auto cb = [key, client_set = std::move(client_set)](unsigned idx, util::ProactorBase*) {
for (auto it = client_set.begin(); it != client_set.end(); ++it) {
if ((unsigned int)it->Thread() != idx)
continue;
facade::Connection* conn = it->Get();
if ((conn != nullptr) && conn->IsTrackingOn()) {
std::string key_str = {key.begin(), key.end()};
conn->SendInvalidationMessageAsync({key_str});
}
}
}
};
shard_set->pool()->DispatchBrief(std::move(cb));
// remove this key from the tracking table as the key no longer exists
client_tracking_map_.erase(key);
};
shard_set->pool()->DispatchBrief(std::move(cb));
// remove this key from the tracking table as the key no longer exists
client_tracking_map_.erase(key);
}
}

void DbSlice::PerformDeletion(PrimeIterator del_it, DbTable* table) {
Expand Down
Loading

0 comments on commit 2a5a53f

Please sign in to comment.