diff --git a/.github/workflows/pull_request.yaml b/.github/workflows/pull_request.yaml index 4636b0edee..e729437185 100644 --- a/.github/workflows/pull_request.yaml +++ b/.github/workflows/pull_request.yaml @@ -37,7 +37,7 @@ jobs: needs: lint runs-on: self-hosted container: - image: apachepegasus/ci-env + image: registry.cn-beijing.aliyuncs.com/apachepegasus/thirdparties-bin:ubuntu1804 env: CCACHE_DIR: /tmp/ccache/pegasus CCACHE_MAXSIZE: 10G @@ -45,14 +45,19 @@ jobs: # Place ccache compilation intermediate results in host memory, that's shared among containers. - /tmp/ccache/pegasus:/tmp/ccache/pegasus # Read docs at https://docs.docker.com/storage/tmpfs/ for more details of using tmpfs in docker. - options: --mount type=tmpfs,destination=/tmp/pegasus,tmpfs-size=10737418240 --cap-add=SYS_PTRACE + options: --mount type=tmpfs,destination=/tmp/pegasus --cap-add=SYS_PTRACE + defaults: + run: + shell: bash + working-directory: /root/rdsn steps: - - uses: actions/checkout@v2 - with: - fetch-depth: 1 + - name: Clone rdsn source + working-directory: /root + run: | + git clone --depth=1 https://hub.fastgit.org/XiaoMi/rdsn.git - name: Unpack prebuilt third-parties if: contains(github.event.pull_request.labels.*.name, 'thirdparty-modified') == false - run: unzip /root/pegasus-thirdparty-output.zip -d ./thirdparty + run: unzip /root/thirdparties-bin.zip -d ./thirdparty - name: Rebuild third-parties if: contains(github.event.pull_request.labels.*.name, 'thirdparty-modified') working-directory: thirdparty @@ -63,4 +68,6 @@ jobs: - name: Compilation run: ./run.sh build -c --skip_thirdparty - name: Unit Testing - run: ./run.sh test --skip_thirdparty + run: | + export LD_LIBRARY_PATH=/root/rdsn/thirdparty/output/lib:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server + ./run.sh test --skip_thirdparty diff --git a/src/replica/replica.h b/src/replica/replica.h index 5886f7ee6b..f79d14fcdd 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -287,6 +287,15 @@ class replica : public serverlet, public ref_counter, public replica_ba void notify_learn_completion(); error_code apply_learned_state_from_private_log(learn_state &state); + // Prepares in-memory mutations for the replica's learning. + // Returns false if there's no delta data in cache (aka prepare-list). + bool prepare_cached_learn_state(const learn_request &request, + decree learn_start_decree, + decree local_committed_decree, + /*out*/ remote_learner_state &learner_state, + /*out*/ learn_response &response, + /*out*/ bool &delayed_replay_prepare_list); + // Gets the position where this round of the learning process should begin. // This method is called on primary-side. // TODO(wutao1): mark it const diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp index ca8c79f544..23dceb27d5 100644 --- a/src/replica/replica_learn.cpp +++ b/src/replica/replica_learn.cpp @@ -437,62 +437,14 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request) response.last_committed_decree = local_committed_decree; response.err = ERR_OK; - // set prepare_start_decree when to-be-learn state is covered by prepare list, - // note min_decree can be NOT present in prepare list when list.count == 0 - if (learn_start_decree > _prepare_list->min_decree() || - (learn_start_decree == _prepare_list->min_decree() && _prepare_list->count() > 0)) { - if (learner_state.prepare_start_decree == invalid_decree) { - // start from (last_committed_decree + 1) - learner_state.prepare_start_decree = local_committed_decree + 1; - - cleanup_preparing_mutations(false); - - // the replayed prepare msg needs to be AFTER the learning response msg - // to reduce probability that preparing messages arrive remote early than - // learning response msg. - delayed_replay_prepare_list = true; - - ddebug("%s: on_learn[%016" PRIx64 - "]: learner = %s, set prepare_start_decree = %" PRId64, - name(), - request.signature, - request.learner.to_string(), - local_committed_decree + 1); - } - - response.prepare_start_decree = learner_state.prepare_start_decree; - } else { - learner_state.prepare_start_decree = invalid_decree; - } - - // only learn mutation cache in range of [learn_start_decree, prepare_start_decree), - // in this case, the state on the PS should be contiguous (+ to-be-sent prepare list) - if (response.prepare_start_decree != invalid_decree) { - binary_writer writer; - int count = 0; - for (decree d = learn_start_decree; d < response.prepare_start_decree; d++) { - auto mu = _prepare_list->get_mutation_by_decree(d); - dassert(mu != nullptr, "mutation must not be nullptr, decree = %" PRId64 "", d); - mu->write_to(writer, nullptr); - count++; - } - response.type = learn_type::LT_CACHE; - response.state.meta = writer.get_buffer(); - ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, learn mutation cache succeed, " - "learn_start_decree = %" PRId64 ", prepare_start_decree = %" PRId64 ", " - "learn_mutation_count = %d, learn_data_size = %d", - name(), - request.signature, - request.learner.to_string(), - learn_start_decree, - response.prepare_start_decree, - count, - response.state.meta.length()); - } - // learn delta state or checkpoint - // in this case, the state on the PS is still incomplete - else { + bool should_learn_cache = prepare_cached_learn_state(request, + learn_start_decree, + local_committed_decree, + learner_state, + response, + delayed_replay_prepare_list); + if (!should_learn_cache) { if (learn_start_decree > _app->last_durable_decree()) { ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " "because learn_start_decree(%" PRId64 ") > _app->last_durable_decree(%" PRId64 @@ -1004,6 +956,69 @@ void replica::on_learn_reply(error_code err, learn_request &&req, learn_response } } +bool replica::prepare_cached_learn_state(const learn_request &request, + decree learn_start_decree, + decree local_committed_decree, + /*out*/ remote_learner_state &learner_state, + /*out*/ learn_response &response, + /*out*/ bool &delayed_replay_prepare_list) +{ + // set prepare_start_decree when to-be-learn state is covered by prepare list, + // note min_decree can be NOT present in prepare list when list.count == 0 + if (learn_start_decree > _prepare_list->min_decree() || + (learn_start_decree == _prepare_list->min_decree() && _prepare_list->count() > 0)) { + if (learner_state.prepare_start_decree == invalid_decree) { + // start from (last_committed_decree + 1) + learner_state.prepare_start_decree = local_committed_decree + 1; + + cleanup_preparing_mutations(false); + + // the replayed prepare msg needs to be AFTER the learning response msg + // to reduce probability that preparing messages arrive remote early than + // learning response msg. + delayed_replay_prepare_list = true; + + ddebug("%s: on_learn[%016" PRIx64 + "]: learner = %s, set prepare_start_decree = %" PRId64, + name(), + request.signature, + request.learner.to_string(), + local_committed_decree + 1); + } + + response.prepare_start_decree = learner_state.prepare_start_decree; + } else { + learner_state.prepare_start_decree = invalid_decree; + } + + // only learn mutation cache in range of [learn_start_decree, prepare_start_decree), + // in this case, the state on the PS should be contiguous (+ to-be-sent prepare list) + if (response.prepare_start_decree != invalid_decree) { + binary_writer writer; + int count = 0; + for (decree d = learn_start_decree; d < response.prepare_start_decree; d++) { + auto mu = _prepare_list->get_mutation_by_decree(d); + dassert(mu != nullptr, "mutation must not be nullptr, decree = %" PRId64 "", d); + mu->write_to(writer, nullptr); + count++; + } + response.type = learn_type::LT_CACHE; + response.state.meta = writer.get_buffer(); + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, learn mutation cache succeed, " + "learn_start_decree = %" PRId64 ", prepare_start_decree = %" PRId64 ", " + "learn_mutation_count = %d, learn_data_size = %d", + name(), + request.signature, + request.learner.to_string(), + learn_start_decree, + response.prepare_start_decree, + count, + response.state.meta.length()); + return true; + } + return false; +} + void replica::on_copy_remote_state_completed(error_code err, size_t size, uint64_t copy_start_time,