From 29f4daab3cacc458840f8bbd0e8cd433577cb924 Mon Sep 17 00:00:00 2001 From: "liwen.2022" Date: Wed, 9 Aug 2023 14:35:50 +0800 Subject: [PATCH] optimize: check timeout more efficient Change-Id: I103c0aad5bcc313cce02e091f7cd11bf4dd0eb32 --- .../storage/dledger/DLedgerEntryPusher.java | 24 ++++++++++--------- .../statemachine/StateMachineCaller.java | 2 +- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java index 9467f5fd..e7a6a490 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java @@ -216,21 +216,23 @@ public boolean completeResponseFuture(final ApplyEntry task) { /** * Check responseFutures timeout from {beginIndex} in currentTerm */ - public void checkResponseFuturesTimeout() { + public void checkResponseFuturesTimeout(long beginIndex) { final long term = this.memberState.currTerm(); + long maxIndex = this.memberState.getCommittedIndex() + dLedgerConfig.getMaxPendingRequestsNum() + 1; + if (maxIndex > this.memberState.getLedgerEndIndex()) { + maxIndex = this.memberState.getLedgerEndIndex() + 1; + } ConcurrentMap closureMap = this.pendingClosure.get(term); if (closureMap != null) { - for (Map.Entry futureEntry : closureMap.entrySet()) { - if (futureEntry == null) { - continue; - } - Closure closure = futureEntry.getValue(); + for (long i = beginIndex; i < maxIndex; i++) { + Closure closure = closureMap.get(i); if (closure == null) { - continue; - } - if (closure.isTimeOut()) { + // index may be removed for complete, we should continue scan + } else if (closure.isTimeOut()) { closure.done(Status.error(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT)); - closureMap.remove(futureEntry.getKey()); + closureMap.remove(i); + } else { + break; } } } @@ -306,7 +308,7 @@ public void doWork() { } if (DLedgerUtils.elapsed(lastCheckTimeoutTimeMs) > 1000) { // clear the timeout pending closure should check all since it can timeout for different index - checkResponseFuturesTimeout(); + checkResponseFuturesTimeout(DLedgerEntryPusher.this.memberState.getAppliedIndex() + 1); lastCheckTimeoutTimeMs = System.currentTimeMillis(); } if (!memberState.isLeader()) { diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java b/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java index 99defbb0..340c80f9 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java @@ -203,7 +203,7 @@ private void doCommitted(final long committedIndex) { // Check response timeout. if (iter.getCompleteAckNums() == 0) { if (this.entryPusher != null) { - this.entryPusher.checkResponseFuturesTimeout(); + this.entryPusher.checkResponseFuturesTimeout(this.memberState.getAppliedIndex() + 1); } } }