Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(split): child notify parent catch up #390

Merged
merged 29 commits into from
Mar 11, 2020

Conversation

hycdong
Copy link
Contributor

@hycdong hycdong commented Feb 10, 2020

Simple partition split process

  1. meta receives client partition split request, and change partition count split: meta start partition split #286
  2. replica notices partition count changed during on_config_sync
  3. parent partition create child partition split: parent replica create child replica #291
  4. parent prepare states for child to learn feat(split): parent replica prepare states #299
  5. child partition async learn states from parent feat(split): child replica learn parent prepare list and checkpoint #309 feat(split): child replica apply private logs, in-memory mutations and catch up parent #319
  6. child notify parent catch up
  7. meta server register child partitions
  8. child partition active, and parent recover read and write

More partition split discussion in issue #69 and partition split design doc
This pr solves the part of fifth step of partition split, which is bold in process description.

What this pr solved

  • child_notify_catch_up: child partition sent notify_catch_up_request to primary parent
  • parent_handle_child_catch_up: primary parent handle child's catch_up_request, if all child partitions caught up, parent will copy mutation to child synchronously.
  • sync_point: sync_point is the first decree after parent send write request to child synchronously. When sync_point committed, parent consider child has learned all data, and primary parent will update partition_count of child replica group (not in this pr, but in feat(split): update group partition count #392)

@hycdong hycdong changed the title feat(split): child notify parent catch up [WIP] feat(split): child notify parent catch up Feb 20, 2020
@hycdong hycdong marked this pull request as ready for review February 20, 2020 06:14
Copy link
Contributor

@neverchanje neverchanje left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest creating a separate class for this function to reduce the complexity of replica. Other functions in replica_split.cpp likewise.

Of course, I'm just giving a suggestion here, because it may need a great time to refactor.

class child_notify_caught_up : public pipeline::when<>
{
public:
  void run() { // aka  replica::child_notify_catch_up
    notify_catch_up_rpc rpc;
    rpc.call([this] {
      if(ec==ERR_TIMEOUT) {
        // inherited from pipeline::when, this is a replacement for tasking::enqueue-like retry
        // so that you can get rid of threadpool, tracker, task_code...
        repeat(1_s);
        return;
      }
      ...
    });
  }
}

So how to execute child_caught_up_notifier::run() (child_notify_catch_up)?

class partition_split_executor : public pipeline::base {
public:
  explicit partition_split_executor(replica* r) {
    // set up the executing environment
    thread_pool(LPC_PARTITION_SPLIT).task_tracker(&tracker).from(&child_notify_caught_up);
  }
private:
  child_notify_caught_up _child_notify_caught_up;
};

// executing child_notify_caught_up();
partition_split_executor.run_pipeline();

As you can see, pipeline::base is designed for a more complicated situation, more specifically,
it's designed for multiple stages pipeline. A single function does not worth this refactoring.

Perhaps unconsciously, the split procedure is already modeled into multiple logical stages, each failed stage will retry(pipeline::repeat) or cleanup. It's natural to turn this execution logic into a coded pipeline.

// on_add_child(group_check)
// -> LPC_CREATE_CHILD: create_child_replica
// -> LPC_PARTITION_SPLIT: child_init_replica
// -> LPC_PARTITION_SPLIT: parent_prepare_states (repeat if fail)
// -> LPC_PARTITION_SPLIT: child_copy_prepare_list
// -> LPC_PARTITION_SPLIT_ASYNC_LEARN: child_learn_states
// -> LPC_PARTITION_SPLIT: child_catch_up_states
//   -->> LPC_CATCHUP_WITH_PRIVATE_LOGS: catch_up_with_private_logs
// -> LPC_PARTITION_SPLIT: child_notify_catch_up + RPC_SPLIT_NOTIFY_CATCH_UP
// -> LPC_PARTITION_SPLIT: parent_check_sync_point_commit
// -> ...
class partition_split_executor : public pipeline::base {
public:
  explicit partition_split_executor(replica* parent) {
    // set up the executing environment
    task_tracker(&_tracker);
  }
  void start_split() {
    thread_pool(LPC_CREATE_CHILD)
      .from(_create_child_replica) // initial stage
      .link(_child_init_replica, LPC_PARTITION_SPLIT, _child->thread_hash())
      .link(_parent_prepare_states, LPC_PARTITION_SPLIT, _parent->thread_hash())
      .link(_child_copy_prepare_list, LPC_PARTITION_SPLIT, _child->thread_hash())
      .link(_child_learn_states, LPC_PARTITION_SPLIT_ASYNC_LEARN)
      .link(_child_catch_up_states, LPC_PARTITION_SPLIT, _child->thread_hash());
      .link(_child_notify_catch_up) // LPC_PARTITION_SPLIT, _child->thread_hash()
      .link(_parent_check_sync_point_commit); // end stage

    // a stage that is forked from main pipeline, since it's not the
    // direct edge from `child_catch_up_states` to `_child_notify_catch_up`.
    fork(_catch_up_with_private_logs, 
         LPC_CATCHUP_WITH_PRIVATE_LOGS).link(_child_notify_catch_up);
  }
private:
  task_tracker _tracker;

  replica* _parent;
  replica* _child;
  split_states *_split_states;
  replica_stub *_stub;
};

// `when` and `result` mean when receives the empty argument
// this stage gives empty result to the next stage.
class child_catch_up_states : public pipeline::when<>, pipeline::result<>, {
public:
  void run() {
    if(...) {
      // -> `_child_notify_catch_up`
      step_down_next_stage();
    } else {
      _catch_up_with_private_logs->async();
    }
  }
};

The advantage of the pipeline design is decoupling the new logic from replica, to make it merely a "data class". pipeline is the organization of codes. So people encounter problems with split can quickly understand the entire execution logic. Duplication uses this technique too. Very few additional codes in replica serve for duplication.

I really hope we can at least consider this refactoring. We can have a discussion if you need.

src/dist/replication/replication.thrift Outdated Show resolved Hide resolved
src/dist/replication/lib/replica_split.cpp Outdated Show resolved Hide resolved
src/dist/replication/lib/replica_context.h Outdated Show resolved Hide resolved
src/dist/replication/lib/replica_context.h Outdated Show resolved Hide resolved
src/dist/replication/lib/replica_context.h Outdated Show resolved Hide resolved
src/dist/replication/lib/replica_context.h Outdated Show resolved Hide resolved
src/dist/replication/lib/replica_split.cpp Show resolved Hide resolved
src/dist/replication/lib/replica_split.cpp Outdated Show resolved Hide resolved
@hycdong
Copy link
Contributor Author

hycdong commented Mar 10, 2020

I suggest creating a separate class for this function to reduce the complexity of replica. Other functions in replica_split.cpp likewise.

Of course, I'm just giving a suggestion here, because it may need a great time to refactor.

class child_notify_caught_up : public pipeline::when<>
{
public:
  void run() { // aka  replica::child_notify_catch_up
    notify_catch_up_rpc rpc;
    rpc.call([this] {
      if(ec==ERR_TIMEOUT) {
        // inherited from pipeline::when, this is a replacement for tasking::enqueue-like retry
        // so that you can get rid of threadpool, tracker, task_code...
        repeat(1_s);
        return;
      }
      ...
    });
  }
}

So how to execute child_caught_up_notifier::run() (child_notify_catch_up)?

class partition_split_executor : public pipeline::base {
public:
  explicit partition_split_executor(replica* r) {
    // set up the executing environment
    thread_pool(LPC_PARTITION_SPLIT).task_tracker(&tracker).from(&child_notify_caught_up);
  }
private:
  child_notify_caught_up _child_notify_caught_up;
};

// executing child_notify_caught_up();
partition_split_executor.run_pipeline();

As you can see, pipeline::base is designed for a more complicated situation, more specifically,
it's designed for multiple stages pipeline. A single function does not worth this refactoring.

Perhaps unconsciously, the split procedure is already modeled into multiple logical stages, each failed stage will retry(pipeline::repeat) or cleanup. It's natural to turn this execution logic into a coded pipeline.

// on_add_child(group_check)
// -> LPC_CREATE_CHILD: create_child_replica
// -> LPC_PARTITION_SPLIT: child_init_replica
// -> LPC_PARTITION_SPLIT: parent_prepare_states (repeat if fail)
// -> LPC_PARTITION_SPLIT: child_copy_prepare_list
// -> LPC_PARTITION_SPLIT_ASYNC_LEARN: child_learn_states
// -> LPC_PARTITION_SPLIT: child_catch_up_states
//   -->> LPC_CATCHUP_WITH_PRIVATE_LOGS: catch_up_with_private_logs
// -> LPC_PARTITION_SPLIT: child_notify_catch_up + RPC_SPLIT_NOTIFY_CATCH_UP
// -> LPC_PARTITION_SPLIT: parent_check_sync_point_commit
// -> ...
class partition_split_executor : public pipeline::base {
public:
  explicit partition_split_executor(replica* parent) {
    // set up the executing environment
    task_tracker(&_tracker);
  }
  void start_split() {
    thread_pool(LPC_CREATE_CHILD)
      .from(_create_child_replica) // initial stage
      .link(_child_init_replica, LPC_PARTITION_SPLIT, _child->thread_hash())
      .link(_parent_prepare_states, LPC_PARTITION_SPLIT, _parent->thread_hash())
      .link(_child_copy_prepare_list, LPC_PARTITION_SPLIT, _child->thread_hash())
      .link(_child_learn_states, LPC_PARTITION_SPLIT_ASYNC_LEARN)
      .link(_child_catch_up_states, LPC_PARTITION_SPLIT, _child->thread_hash());
      .link(_child_notify_catch_up) // LPC_PARTITION_SPLIT, _child->thread_hash()
      .link(_parent_check_sync_point_commit); // end stage

    // a stage that is forked from main pipeline, since it's not the
    // direct edge from `child_catch_up_states` to `_child_notify_catch_up`.
    fork(_catch_up_with_private_logs, 
         LPC_CATCHUP_WITH_PRIVATE_LOGS).link(_child_notify_catch_up);
  }
private:
  task_tracker _tracker;

  replica* _parent;
  replica* _child;
  split_states *_split_states;
  replica_stub *_stub;
};

// `when` and `result` mean when receives the empty argument
// this stage gives empty result to the next stage.
class child_catch_up_states : public pipeline::when<>, pipeline::result<>, {
public:
  void run() {
    if(...) {
      // -> `_child_notify_catch_up`
      step_down_next_stage();
    } else {
      _catch_up_with_private_logs->async();
    }
  }
};

The advantage of the pipeline design is decoupling the new logic from replica, to make it merely a "data class". pipeline is the organization of codes. So people encounter problems with split can quickly understand the entire execution logic. Duplication uses this technique too. Very few additional codes in replica serve for duplication.

I really hope we can at least consider this refactoring. We can have a discussion if you need.

I will consider your refactoring suggestion after merging all split core codes. For the one hand, this refactor is a big project. For the other hand, I am not familiar with pipeline logic and not sure whether it is suitable to be used for split. I will learn this logic during merging remainder split code and discuss with you.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants