Skip to content

Commit

Permalink
fix: bugfix on incr (#608)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored Sep 25, 2020
1 parent 49b85c2 commit 95ac849
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 48 deletions.
98 changes: 50 additions & 48 deletions src/server/pegasus_write_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,59 +182,61 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
uint32_t new_expire_ts = 0;
db_get_context get_ctx;
int err = db_get(raw_key, &get_ctx);
if (err == 0) {
if (!get_ctx.found) {
// old value is not found, set to 0 before increment
new_value = update.increment;
new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
} else if (get_ctx.expired) {
// ttl timeout, set to 0 before increment
_pfc_recent_expire_count->increment();
if (err != 0) {
resp.error = err;
return err;
}
if (!get_ctx.found) {
// old value is not found, set to 0 before increment
new_value = update.increment;
new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
} else if (get_ctx.expired) {
// ttl timeout, set to 0 before increment
_pfc_recent_expire_count->increment();
new_value = update.increment;
new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
} else {
::dsn::blob old_value;
pegasus_extract_user_data(
_pegasus_data_version, std::move(get_ctx.raw_value), old_value);
if (old_value.length() == 0) {
// empty old value, set to 0 before increment
new_value = update.increment;
new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
} else {
::dsn::blob old_value;
pegasus_extract_user_data(
_pegasus_data_version, std::move(get_ctx.raw_value), old_value);
if (old_value.length() == 0) {
// empty old value, set to 0 before increment
new_value = update.increment;
} else {
int64_t old_value_int;
if (!dsn::buf2int64(old_value, old_value_int)) {
// invalid old value
derror_replica("incr failed: decree = {}, error = "
"old value \"{}\" is not an integer or out of range",
decree,
utils::c_escape_string(old_value));
resp.error = rocksdb::Status::kInvalidArgument;
// we should write empty record to update rocksdb's last flushed decree
return empty_put(decree);
}
new_value = old_value_int + update.increment;
if ((update.increment > 0 && new_value < old_value_int) ||
(update.increment < 0 && new_value > old_value_int)) {
// new value is out of range, return old value by 'new_value'
derror_replica("incr failed: decree = {}, error = "
"new value is out of range, old_value = {}, increment = {}",
decree,
old_value_int,
update.increment);
resp.error = rocksdb::Status::kInvalidArgument;
resp.new_value = old_value_int;
// we should write empty record to update rocksdb's last flushed decree
return empty_put(decree);
}
int64_t old_value_int;
if (!dsn::buf2int64(old_value, old_value_int)) {
// invalid old value
derror_replica("incr failed: decree = {}, error = "
"old value \"{}\" is not an integer or out of range",
decree,
utils::c_escape_string(old_value));
resp.error = rocksdb::Status::kInvalidArgument;
// we should write empty record to update rocksdb's last flushed decree
return empty_put(decree);
}
// set new ttl
if (update.expire_ts_seconds == 0) {
new_expire_ts = get_ctx.expire_ts;
} else if (update.expire_ts_seconds < 0) {
new_expire_ts = 0;
} else { // update.expire_ts_seconds > 0
new_expire_ts = update.expire_ts_seconds;
new_value = old_value_int + update.increment;
if ((update.increment > 0 && new_value < old_value_int) ||
(update.increment < 0 && new_value > old_value_int)) {
// new value is out of range, return old value by 'new_value'
derror_replica("incr failed: decree = {}, error = "
"new value is out of range, old_value = {}, increment = {}",
decree,
old_value_int,
update.increment);
resp.error = rocksdb::Status::kInvalidArgument;
resp.new_value = old_value_int;
// we should write empty record to update rocksdb's last flushed decree
return empty_put(decree);
}
}
// set new ttl
if (update.expire_ts_seconds == 0) {
new_expire_ts = get_ctx.expire_ts;
} else if (update.expire_ts_seconds < 0) {
new_expire_ts = 0;
} else { // update.expire_ts_seconds > 0
new_expire_ts = update.expire_ts_seconds;
}
}

resp.error =
Expand Down
26 changes: 26 additions & 0 deletions src/server/test/pegasus_write_service_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,5 +217,31 @@ TEST_F(incr_test, invalid_incr)
ASSERT_EQ(resp.new_value, 100);
}

TEST_F(incr_test, fail_on_get)
{
dsn::fail::setup();
dsn::fail::cfg("db_get", "100%1*return()");
// when db_get failed, incr should return an error.

req.increment = 10;
_write_impl->incr(1, req, resp);
ASSERT_EQ(resp.error, FAIL_DB_GET);

dsn::fail::teardown();
}

TEST_F(incr_test, fail_on_put)
{
dsn::fail::setup();
dsn::fail::cfg("db_write_batch_put", "100%1*return()");
// when rocksdb put failed, incr should return an error.

req.increment = 10;
_write_impl->incr(1, req, resp);
ASSERT_EQ(resp.error, FAIL_DB_WRITE_BATCH_PUT);

dsn::fail::teardown();
}

} // namespace server
} // namespace pegasus

0 comments on commit 95ac849

Please sign in to comment.