Skip to content

Commit

Permalink
feat(dup): handle non-idempotent writes during duplication (#411)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored Mar 19, 2020
1 parent 558cfc7 commit 0c3fbcf
Show file tree
Hide file tree
Showing 11 changed files with 48 additions and 15 deletions.
6 changes: 0 additions & 6 deletions src/dist/replication/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ replication_options::replication_options()
verbose_commit_log_on_start = false;
delay_for_fd_timeout_on_start = false;
empty_write_disabled = false;
allow_non_idempotent_write = false;
duplication_enabled = true;

prepare_timeout_ms_for_secondaries = 1000;
Expand Down Expand Up @@ -265,11 +264,6 @@ void replication_options::initialize()
"empty_write_disabled",
empty_write_disabled,
"whether to disable empty write, default is false");
allow_non_idempotent_write =
dsn_config_get_value_bool("replication",
"allow_non_idempotent_write",
allow_non_idempotent_write,
"whether to allow non-idempotent write, default is false");

duplication_enabled = dsn_config_get_value_bool(
"replication", "duplication_enabled", duplication_enabled, "is duplication enabled");
Expand Down
1 change: 0 additions & 1 deletion src/dist/replication/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ class replication_options
bool verbose_commit_log_on_start;
bool delay_for_fd_timeout_on_start;
bool empty_write_disabled;
bool allow_non_idempotent_write;
bool duplication_enabled;

int32_t prepare_timeout_ms_for_secondaries;
Expand Down
9 changes: 8 additions & 1 deletion src/dist/replication/lib/duplication/mutation_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,14 @@ add_mutation_if_valid(mutation_ptr &mu, mutation_tuple_set &mutations, decree st
if (update.code == RPC_REPLICATION_WRITE_EMPTY) {
continue;
}

// Ignore non-idempotent writes.
// Normally a duplicating replica will reply non-idempotent writes with
// ERR_OPERATION_DISABLED, but there could still be a mutation written
// before the duplication was added.
// To ignore means this write will be lost, which is acceptable under this rare case.
if (!task_spec::get(update.code)->rpc_request_is_write_idempotent) {
continue;
}
blob bb;
if (update.data.buffer() != nullptr) {
bb = std::move(update.data);
Expand Down
10 changes: 10 additions & 0 deletions src/dist/replication/lib/duplication/test/duplication_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
namespace dsn {
namespace replication {

DEFINE_STORAGE_WRITE_RPC_CODE(RPC_DUPLICATION_IDEMPOTENT_WRITE, NOT_ALLOW_BATCH, IS_IDEMPOTENT)
DEFINE_STORAGE_WRITE_RPC_CODE(RPC_DUPLICATION_NON_IDEMPOTENT_WRITE, NOT_ALLOW_BATCH, NOT_IDEMPOTENT)

class duplication_test_base : public replica_test_base
{
public:
Expand Down Expand Up @@ -55,6 +58,13 @@ class duplication_test_base : public replica_test_base
EXPECT_EQ(err, error_s::ok());
return log_file_map;
}

mutation_ptr create_test_mutation(int64_t decree, string_view data) override
{
auto mut = replica_test_base::create_test_mutation(decree, data);
mut->data.updates[0].code = RPC_DUPLICATION_IDEMPOTENT_WRITE; // must be idempotent write
return mut;
}
};

} // namespace replication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
namespace dsn {
namespace replication {

DEFINE_TASK_CODE_RPC(RPC_RRDB_RRDB_PUT, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT);
DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_PUT, ALLOW_BATCH, IS_IDEMPOTENT)

class load_from_private_log_test : public duplication_test_base
{
Expand Down
16 changes: 14 additions & 2 deletions src/dist/replication/lib/duplication/test/mutation_batch_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@
* THE SOFTWARE.
*/

#include "dist/replication/test/replica_test/unit_test/replica_test_base.h"
#include "duplication_test_base.h"
#include "dist/replication/lib/duplication/mutation_batch.h"

namespace dsn {
namespace replication {

class mutation_batch_test : public replica_test_base
class mutation_batch_test : public duplication_test_base
{
public:
};

TEST_F(mutation_batch_test, add_mutation_if_valid)
Expand Down Expand Up @@ -59,5 +60,16 @@ TEST_F(mutation_batch_test, add_mutation_if_valid)
ASSERT_EQ(result.size(), 2);
}

TEST_F(mutation_batch_test, ignore_non_idempotent_write)
{
mutation_tuple_set result;

std::string s = "hello";
mutation_ptr mu = create_test_mutation(1, s);
mu->data.updates[0].code = RPC_DUPLICATION_NON_IDEMPOTENT_WRITE;
add_mutation_if_valid(mu, result, 0);
ASSERT_EQ(result.size(), 0);
}

} // namespace replication
} // namespace dsn
7 changes: 6 additions & 1 deletion src/dist/replication/lib/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ replica::replica(
_cur_download_size(0),
_restore_progress(0),
_restore_status(ERR_OK),
_duplication_mgr(new replica_duplicator_manager(this))
_duplication_mgr(new replica_duplicator_manager(this)),
_duplicating(app.duplicating)
{
dassert(_app_info.app_type != "", "");
dassert(stub != nullptr, "");
Expand All @@ -79,6 +80,10 @@ replica::replica(
_counter_recent_write_throttling_reject_count.init_app_counter(
"eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());

counter_str = fmt::format("dup.disabled_non_idempotent_write_count@{}", _app_info.app_name);
_counter_dup_disabled_non_idempotent_write_count.init_app_counter(
"eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());

// init table level latency perf counters
init_table_level_latency_counters();

Expand Down
4 changes: 3 additions & 1 deletion src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// Duplication
//
replica_duplicator_manager *get_duplication_manager() const { return _duplication_mgr.get(); }
bool is_duplicating() const { return _app_info.duplicating; }
bool is_duplicating() const { return _duplicating; }

void update_last_checkpoint_generate_time();

Expand Down Expand Up @@ -495,6 +495,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

// duplication
std::unique_ptr<replica_duplicator_manager> _duplication_mgr;
bool _duplicating{false};

// partition split
// _child_gpid = gpid({app_id},{pidx}+{old_partition_count}) for parent partition
Expand All @@ -512,6 +513,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
perf_counter_wrapper _counter_recent_write_throttling_delay_count;
perf_counter_wrapper _counter_recent_write_throttling_reject_count;
std::vector<perf_counter *> _counters_table_level_latency;
perf_counter_wrapper _counter_dup_disabled_non_idempotent_write_count;
perf_counter_wrapper _counter_backup_request_qps;

dsn::task_tracker _tracker;
Expand Down
5 changes: 4 additions & 1 deletion src/dist/replication/lib/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
}

task_spec *spec = task_spec::get(request->rpc_code());
if (!_options->allow_non_idempotent_write && !spec->rpc_request_is_write_idempotent) {
if (is_duplicating() && !spec->rpc_request_is_write_idempotent) {
// Ignore non-idempotent write, because duplication provides no guarantee of atomicity to
// make this write produce the same result on multiple clusters.
_counter_dup_disabled_non_idempotent_write_count->increment();
response_client_write(request, ERR_OPERATION_DISABLED);
return;
}
Expand Down
1 change: 1 addition & 0 deletions src/dist/replication/lib/replica_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,7 @@ void replica::on_config_sync(const app_info &info, const partition_configuration
return;

update_app_envs(info.envs);
_duplicating = info.duplicating;

if (status() == partition_status::PS_PRIMARY ||
nullptr != _primary_states.reconfiguration_task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ struct replica_test_base : replica_stub_test_base

replica_test_base() { _replica = create_mock_replica(stub.get(), 1, 1, _log_dir.c_str()); }

mutation_ptr create_test_mutation(int64_t decree, string_view data)
virtual mutation_ptr create_test_mutation(int64_t decree, string_view data)
{
mutation_ptr mu(new mutation());
mu->data.header.ballot = 1;
Expand Down

0 comments on commit 0c3fbcf

Please sign in to comment.