Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(split-client): c++ client split related implementation #798

Merged
merged 4 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/dsn/utility/error_code.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,5 @@ DEFINE_ERR_CODE(ERR_SASL_INCOMPLETE)
DEFINE_ERR_CODE(ERR_ACL_DENY)
DEFINE_ERR_CODE(ERR_SPLITTING)
DEFINE_ERR_CODE(ERR_PARENT_PARTITION_MISUSED)
DEFINE_ERR_CODE(ERR_CHILD_NOT_READY)
} // namespace dsn
10 changes: 5 additions & 5 deletions src/client/partition_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ void partition_resolver::call_task(const rpc_response_task_ptr &t)
// update gpid when necessary
auto &hdr = *(t->get_request()->header);
if (hdr.gpid.value() != result.pid.value()) {
dassert(hdr.gpid.value() == 0, "inconsistent gpid");
hdr.gpid = result.pid;

// update thread hash if not assigned by applications
if (hdr.client.thread_hash == 0) {
if (hdr.client.thread_hash == 0 // thread_hash is not assigned by applications
||
hdr.gpid.value() != 0 // requests sent to a child partition are redirected to its parent
) {
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
hdr.client.thread_hash = result.pid.thread_hash();
}
hdr.gpid = result.pid;
}
dsn_rpc_call(result.address, t.get());
},
Expand Down
61 changes: 40 additions & 21 deletions src/client/partition_resolver_simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* THE SOFTWARE.
*/

#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/utils.h>
#include <dsn/utility/rand.h>
#include <dsn/tool-api/async_calls.h>
Expand All @@ -48,7 +49,13 @@ void partition_resolver_simple::resolve(uint64_t partition_hash,
if (_app_partition_count != -1) {
idx = get_partition_index(_app_partition_count, partition_hash);
rpc_address target;
if (ERR_OK == get_address(idx, target)) {
auto err = get_address(idx, target);
if (dsn_unlikely(err == ERR_CHILD_NOT_READY)) {
// child partition is not ready, its requests should be sent to parent partition
idx -= _app_partition_count / 2;
err = get_address(idx, target);
}
if (dsn_likely(err == ERR_OK)) {
callback(resolve_result{ERR_OK, target, {_app_id, idx}});
return;
}
Expand All @@ -68,28 +75,34 @@ void partition_resolver_simple::resolve(uint64_t partition_hash,

void partition_resolver_simple::on_access_failure(int partition_index, error_code err)
{
if (-1 != partition_index &&
err != ERR_CAPACITY_EXCEEDED // no need for reconfiguration on primary
&&
err != ERR_NOT_ENOUGH_MEMBER // primary won't change and we only r/w on primary in this
if (-1 == partition_index ||
err == ERR_CAPACITY_EXCEEDED // no need for reconfiguration on primary
||
err == ERR_NOT_ENOUGH_MEMBER // primary won't change and we only r/w on primary in this
// provider
&&
err != ERR_OPERATION_DISABLED // operation disabled
&&
err != ERR_BUSY // busy (rpc busy or throttling busy)
||
err == ERR_OPERATION_DISABLED // operation disabled
||
err == ERR_BUSY // busy (rpc busy or throttling busy)
||
err == ERR_SPLITTING // partition is splitting, reject read and write
) {
ddebug("clear partition configuration cache %d.%d due to access failure %s",
_app_id,
partition_index,
err.to_string());
return;
}

{
zauto_write_lock l(_config_lock);
auto it = _config_cache.find(partition_index);
if (it != _config_cache.end()) {
_config_cache.erase(it);
}
}
zauto_write_lock l(_config_lock);
if (err == ERR_PARENT_PARTITION_MISUSED) {
ddebug_f("clear all partition configuration cache due to access failure {} at {}.{}",
err,
_app_id,
partition_index);
_app_partition_count = -1;
} else {
ddebug_f("clear partition configuration cache {}.{} due to access failure {}",
_app_id,
partition_index,
err);
_config_cache.erase(partition_index);
}
}

Expand Down Expand Up @@ -272,7 +285,9 @@ void partition_resolver_simple::query_config_reply(error_code err,
_app_id,
resp.app_id);
}
if (_app_partition_count != -1 && _app_partition_count != resp.partition_count) {
if (_app_partition_count != -1 && _app_partition_count != resp.partition_count &&
_app_partition_count * 2 != resp.partition_count &&
_app_partition_count != resp.partition_count * 2) {
dassert(false,
"partition count is changed (mostly the app was removed and created with "
"the same name), local Vs remote: %u vs %u ",
Expand Down Expand Up @@ -429,6 +444,10 @@ error_code partition_resolver_simple::get_address(int partition_index, /*out*/ r
auto it = _config_cache.find(partition_index);
if (it != _config_cache.end()) {
// config = it->second->config;
if (it->second->config.ballot < 0) {
// the client queried the config of a splitting app; the child partition is not ready yet
return ERR_CHILD_NOT_READY;
}
addr = get_address(it->second->config);
if (addr.is_invalid()) {
return ERR_IO_PENDING;
Expand Down