diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index f4e96bbd7a8366..dd0aca5923e74a 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2033,6 +2033,12 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static long max_backend_heartbeat_failure_tolerance_count = 1; + /** + * Even if a backend is healthy, still write a heartbeat editlog to update backend's lastUpdateMs of bdb image. + */ + @ConfField(mutable = true, masterOnly = true) + public static int editlog_healthy_heartbeat_seconds = 300; + /** * Abort transaction time after lost heartbeat. * The default value is 300s, which means transactions of be will be aborted after lost heartbeat 300s. diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index da55ecee0de603..974c0e0cae13a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -47,6 +47,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.security.SecureRandom; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -153,6 +154,8 @@ public class Backend implements Writable { // send some queries to this BE, it is not an important problem. private AtomicBoolean isShutDown = new AtomicBoolean(false); + private long nextForceEditlogHeartbeatTime = System.currentTimeMillis() + (new SecureRandom()).nextInt(60 * 1000); + public Backend() { this.host = ""; this.version = ""; @@ -876,7 +879,18 @@ public boolean handleHbResponse(BackendHbResponse hbResponse, boolean isReplay) heartbeatErrMsg = ""; this.heartbeatFailureCounter = 0; + + // even if no change, write an editlog to make lastUpdateMs in image update + if (System.currentTimeMillis() >= this.nextForceEditlogHeartbeatTime) { + isChanged = true; + int delaySecond = Config.editlog_healthy_heartbeat_seconds + (new SecureRandom()).nextInt(60); + this.nextForceEditlogHeartbeatTime = System.currentTimeMillis() + delaySecond * 1000L; + } } else { + // for a bad BackendHbResponse, its hbTime is last succ hbTime, not this hbTime + if (hbResponse.getHbTime() > 0) { + this.lastUpdateMs = hbResponse.getHbTime(); + } // Only set backend to dead if the heartbeat failure counter exceed threshold. // And if it is a replay process, must set backend to dead. if (isReplay || ++this.heartbeatFailureCounter >= Config.max_backend_heartbeat_failure_tolerance_count) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java index a0311a9b737847..479966d2ff3c8c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java @@ -98,18 +98,12 @@ public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort, long this.beMemory = beMemory; } - public BackendHbResponse(long beId, String errMsg) { - super(HeartbeatResponse.Type.BACKEND); - this.status = HbStatus.BAD; - this.beId = beId; - this.msg = errMsg; - } - - public BackendHbResponse(long beId, String host, String errMsg) { + public BackendHbResponse(long beId, String host, long lastHbTime, String errMsg) { super(HeartbeatResponse.Type.BACKEND); this.status = HbStatus.BAD; this.beId = beId; this.host = host; + this.hbTime = lastHbTime; this.msg = errMsg; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 3fc09b31f2d312..89f55239f7fb56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -315,13 +315,13 @@ public HeartbeatResponse call() { System.currentTimeMillis(), beStartTime, version, nodeRole, fragmentNum, lastFragmentUpdateTime, isShutDown, arrowFlightSqlPort, beMemory); } else { - return new BackendHbResponse(backendId, backend.getHost(), + return new BackendHbResponse(backendId, backend.getHost(), backend.getLastUpdateMs(), result.getStatus().getErrorMsgs().isEmpty() ? "Unknown error" : result.getStatus().getErrorMsgs().get(0)); } } catch (Exception e) { LOG.warn("backend heartbeat got exception", e); - return new BackendHbResponse(backendId, backend.getHost(), + return new BackendHbResponse(backendId, backend.getHost(), backend.getLastUpdateMs(), Strings.isNullOrEmpty(e.getMessage()) ? "got exception" : e.getMessage()); } finally { if (client != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatResponse.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatResponse.java index 447ffad81899aa..3fffd1214503d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatResponse.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatResponse.java @@ -51,10 +51,12 @@ public enum HbStatus { protected boolean isTypeRead = false; /** - * msg and hbTime are no need to be synchronized to other Frontends, + * msg no need to be synchronized to other Frontends, * and only Master Frontend has these info */ protected String msg; + + @SerializedName(value = "hbTime") protected long hbTime; public HeartbeatResponse(Type type) {