Skip to content

Commit

Permalink
feat(dup): implement server handling of duplicate rpc (part 1) (#456)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored and hycdong committed Jan 13, 2020
1 parent 0268530 commit c5dec62
Show file tree
Hide file tree
Showing 11 changed files with 265 additions and 41 deletions.
10 changes: 8 additions & 2 deletions src/base/pegasus_value_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ namespace pegasus {

#define PEGASUS_DATA_VERSION_MAX 0u

/// Generates timetag in host endian.
inline uint64_t generate_timetag(uint64_t timestamp, uint8_t cluster_id, bool delete_tag)
{
return timestamp << 8u | cluster_id << 1u | delete_tag;
}

/// Extracts expire_ts from rocksdb value with given version.
/// The value schema must be in v0.
/// \return expire_ts in host endian
Expand All @@ -46,7 +52,7 @@ pegasus_extract_user_data(uint32_t version, std::string &&raw_value, ::dsn::blob
version,
PEGASUS_DATA_VERSION_MAX);

std::string *s = new std::string(std::move(raw_value));
auto *s = new std::string(std::move(raw_value));
dsn::data_input input(*s);
input.skip(sizeof(uint32_t));
dsn::string_view view = input.read_str();
Expand Down Expand Up @@ -127,7 +133,7 @@ class pegasus_value_generator
_write_slices.emplace_back(user_data.data(), user_data.length());
}

return rocksdb::SliceParts(&_write_slices[0], static_cast<int>(_write_slices.size()));
return {&_write_slices[0], static_cast<int>(_write_slices.size())};
}

private:
Expand Down
54 changes: 54 additions & 0 deletions src/base/rrdb_types.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions src/idl/rrdb.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,17 @@ struct duplicate_request

// ID of the cluster where this write comes from.
4: optional byte cluster_id

// Whether to compare the timetag of old value with the new write's.
5: optional bool verify_timetag
}

struct duplicate_response
{
1: optional i32 error;

// hints on the reason why this duplicate failed.
2: optional string error_hint;
}

service rrdb
Expand Down
28 changes: 24 additions & 4 deletions src/include/rrdb/rrdb_types.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/server/pegasus_mutation_duplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
err == dsn::ERR_OK ? _client->get_error_string(perr) : err.to_string(),
rpc.request().timestamp);
}
// duplicating an illegal write to server is unacceptable, fail fast.
dassert_replica(perr != PERR_INVALID_ARGUMENT, rpc.response().error_hint);
} else {
_shipped_ops->increment();
_total_shipped_size +=
Expand Down
Loading

0 comments on commit c5dec62

Please sign in to comment.