diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml
index 3ff7c9d5239..97cf436339e 100644
--- a/.github/workflows/pull_request.yml
+++ b/.github/workflows/pull_request.yml
@@ -254,18 +254,6 @@ jobs:
       volumes:
         - /tmp/ccache/nebula/${{ matrix.os }}-${{ matrix.compiler }}:/tmp/ccache/nebula/${{ matrix.os }}-${{ matrix.compiler }}
       options: --cap-add=SYS_PTRACE
-    services:
-      elasticsearch:
-        image: elasticsearch:7.17.7
-        ports:
-          - 9200:9200
-        env:
-          discovery.type: single-node
-        options: >-
-          --health-cmd "curl elasticsearch:9200"
-          --health-interval 10s
-          --health-timeout 5s
-          --health-retries 5
     steps:
       - uses: webiny/action-post-run@2.0.1
         with:
diff --git a/src/kvstore/plugins/elasticsearch/ESListener.cpp b/src/kvstore/plugins/elasticsearch/ESListener.cpp
index 4ea02a4e78b..e770269f5ed 100644
--- a/src/kvstore/plugins/elasticsearch/ESListener.cpp
+++ b/src/kvstore/plugins/elasticsearch/ESListener.cpp
@@ -221,8 +221,9 @@ void ESListener::processLogs() {
   }
   LogID lastApplyId = -1;

-  // the kv pair which can sync to remote safely
-  std::vector<KV> data;
+  // // the kv pair which can sync to remote safely
+  // std::vector<KV> data;
+  BatchHolder batch;

   while (iter->valid()) {
     lastApplyId = iter->logId();
@@ -238,28 +239,52 @@ void ESListener::processLogs() {
       case OP_PUT: {
         auto pieces = decodeMultiValues(log);
         DCHECK_EQ(2, pieces.size());
-        data.emplace_back(pieces[0], pieces[1]);
+        batch.put(pieces[0].toString(), pieces[1].toString());
         break;
       }
       case OP_MULTI_PUT: {
         auto kvs = decodeMultiValues(log);
         DCHECK_EQ(0, kvs.size() % 2);
         for (size_t i = 0; i < kvs.size(); i += 2) {
-          data.emplace_back(kvs[i], kvs[i + 1]);
+          batch.put(kvs[i].toString(), kvs[i + 1].toString());
         }
         break;
       }
-      case OP_REMOVE:
-      case OP_REMOVE_RANGE:
+      case OP_REMOVE: {
+        auto key = decodeSingleValue(log);
+        batch.remove(key.toString());
+        break;
+      }
+      case OP_REMOVE_RANGE: {
+        auto kvs = decodeMultiValues(log);
+        DCHECK_EQ(2, kvs.size());
+        batch.rangeRemove(kvs[0].toString(), kvs[1].toString());
+        break;
+      }
       case OP_MULTI_REMOVE: {
+        auto keys = decodeMultiValues(log);
+        for (auto key : keys) {
+          batch.remove(key.toString());
+        }
         break;
       }
       case OP_BATCH_WRITE: {
-        auto batch = decodeBatchValue(log);
-        for (auto& op : batch) {
+        auto batchData = decodeBatchValue(log);
+        for (auto& op : batchData) {
           // OP_BATCH_REMOVE and OP_BATCH_REMOVE_RANGE is igored
-          if (op.first == BatchLogType::OP_BATCH_PUT) {
-            data.emplace_back(op.second.first, op.second.second);
+          switch (op.first) {
+            case BatchLogType::OP_BATCH_PUT: {
+              batch.put(op.second.first.toString(), op.second.second.toString());
+              break;
+            }
+            case BatchLogType::OP_BATCH_REMOVE: {
+              batch.remove(op.second.first.toString());
+              break;
+            }
+            case BatchLogType::OP_BATCH_REMOVE_RANGE: {
+              batch.rangeRemove(op.second.first.toString(), op.second.second.toString());
+              break;
+            }
           }
         }
         break;
@@ -275,13 +300,14 @@
       }
     }

-    if (static_cast<int32_t>(data.size()) > FLAGS_listener_commit_batch_size) {
+    if (static_cast<int32_t>(batch.size()) > FLAGS_listener_commit_batch_size) {
       break;
     }
     ++(*iter);
   }
+  // apply to state machine

-  if (lastApplyId != -1 && apply(data)) {
+  if (lastApplyId != -1 && apply(batch)) {
     std::lock_guard<std::mutex> guard(raftLock_);
     lastApplyLogId_ = lastApplyId;
     persist(committedLogId_, term_, lastApplyLogId_);
@@ -298,15 +324,14 @@ std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> ESListener::commitSnapshot
   VLOG(2) << idStr_ << "Listener is committing snapshot.";
   int64_t count = 0;
   int64_t size = 0;
-  std::vector<KV> data;
-  data.reserve(rows.size());
+  BatchHolder batch;
   for (const auto& row : rows) {
     count++;
     size += row.size();
     auto kv = decodeKV(row);
-    data.emplace_back(kv.first, kv.second);
+    batch.put(kv.first.toString(), kv.second.toString());
   }
-  if (!apply(data)) {
+  if (!apply(batch)) {
     LOG(INFO) << idStr_ << "Failed to apply data while committing snapshot.";
     return {
         nebula::cpp2::ErrorCode::E_RAFT_PERSIST_SNAPSHOT_FAILED, kNoSnapshotCount, kNoSnapshotSize};
diff --git a/src/kvstore/plugins/elasticsearch/ESListener.h b/src/kvstore/plugins/elasticsearch/ESListener.h
index f9e2780cd76..c53ff57251e 100644
--- a/src/kvstore/plugins/elasticsearch/ESListener.h
+++ b/src/kvstore/plugins/elasticsearch/ESListener.h
@@ -55,7 +55,6 @@ class ESListener : public Listener {
    */
   bool apply(const BatchHolder& batch);
-
   /**
    * @brief Persist commitLogId commitLogTerm and lastApplyLogId
    */
diff --git a/src/kvstore/test/NebulaListenerTest.cpp b/src/kvstore/test/NebulaListenerTest.cpp
index 5d91fba3cf1..718ce8a298f 100644
--- a/src/kvstore/test/NebulaListenerTest.cpp
+++ b/src/kvstore/test/NebulaListenerTest.cpp
@@ -63,15 +63,14 @@ class DummyListener : public Listener {
     int64_t size = 0;
     std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> result{
         nebula::cpp2::ErrorCode::SUCCEEDED, count, size};
-    std::vector<KV> data;
-    data.reserve(rows.size());
+    BatchHolder batch;
     for (const auto& row : rows) {
       count++;
       size += row.size();
       auto kv = decodeKV(row);
-      data.emplace_back(kv.first, kv.second);
+      batch.put(kv.first.toString(), kv.second.toString());
     }
-    if (!apply(data)) {
+    if (!apply(batch)) {
       LOG(INFO) << idStr_ << "Failed to apply data while committing snapshot.";
       result = {nebula::cpp2::ErrorCode::E_RAFT_PERSIST_SNAPSHOT_FAILED,
                 kNoSnapshotCount,
@@ -124,8 +123,8 @@ class DummyListener : public Listener {
     }
     LogID lastApplyId = -1;

-    // the kv pair which can sync to remote safely
-    std::vector<KV> data;
+    // // the kv pair which can sync to remote safely
+    BatchHolder batch;

     while (iter->valid()) {
       lastApplyId = iter->logId();
@@ -141,28 +140,52 @@ class DummyListener : public Listener {
         case OP_PUT: {
           auto pieces = decodeMultiValues(log);
           DCHECK_EQ(2, pieces.size());
-          data.emplace_back(pieces[0], pieces[1]);
+          batch.put(pieces[0].toString(), pieces[1].toString());
           break;
         }
         case OP_MULTI_PUT: {
           auto kvs = decodeMultiValues(log);
           DCHECK_EQ(0, kvs.size() % 2);
           for (size_t i = 0; i < kvs.size(); i += 2) {
-            data.emplace_back(kvs[i], kvs[i + 1]);
+            batch.put(kvs[i].toString(), kvs[i + 1].toString());
           }
           break;
         }
-        case OP_REMOVE:
-        case OP_REMOVE_RANGE:
+        case OP_REMOVE: {
+          auto key = decodeSingleValue(log);
+          batch.remove(key.toString());
+          break;
+        }
+        case OP_REMOVE_RANGE: {
+          auto kvs = decodeMultiValues(log);
+          DCHECK_EQ(2, kvs.size());
+          batch.rangeRemove(kvs[0].toString(), kvs[1].toString());
+          break;
+        }
         case OP_MULTI_REMOVE: {
+          auto keys = decodeMultiValues(log);
+          for (auto key : keys) {
+            batch.remove(key.toString());
+          }
           break;
         }
         case OP_BATCH_WRITE: {
-          auto batchValue = decodeBatchValue(log);
-          for (auto& op : batch) {
+          auto batchData = decodeBatchValue(log);
+          for (auto& op : batchData) {
             // OP_BATCH_REMOVE and OP_BATCH_REMOVE_RANGE is igored
-            if (op.first == BatchLogType::OP_BATCH_PUT) {
-              data.emplace_back(op.second.first, op.second.second);
+            switch (op.first) {
+              case BatchLogType::OP_BATCH_PUT: {
+                batch.put(op.second.first.toString(), op.second.second.toString());
+                break;
+              }
+              case BatchLogType::OP_BATCH_REMOVE: {
+                batch.remove(op.second.first.toString());
+                break;
+              }
+              case BatchLogType::OP_BATCH_REMOVE_RANGE: {
+                batch.rangeRemove(op.second.first.toString(), op.second.second.toString());
+                break;
+              }
            }
          }
          break;
@@ -180,7 +203,7 @@ class DummyListener : public Listener {
       ++(*iter);
     }
     // apply to state machine
-    if (lastApplyId != -1 && apply(data)) {
+    if (lastApplyId != -1 && apply(batch)) {
       std::lock_guard<std::mutex> guard(raftLock_);
       lastApplyLogId_ = lastApplyId;
       persist(committedLogId_, term_, lastApplyLogId_);
@@ -192,7 +215,7 @@ class DummyListener : public Listener {
  protected:
   void init() override {}

-  bool apply(const BatchHolder& batch) override {
+  bool apply(const BatchHolder& batch) {
     for (auto& log : batch.getBatch()) {
       switch (std::get<0>(log)) {
         case BatchLogType::OP_BATCH_PUT: {