diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index ef23713a5010b4..af1b7ff4f5dd00 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -422,6 +422,7 @@ public class Env { private AtomicBoolean canRead = new AtomicBoolean(false); private String toMasterProgress = ""; private BlockingQueue typeTransferQueue; + private long transferToMasterTime = -1L; // node name is used for bdbje NodeName. protected String nodeName; @@ -1687,6 +1688,8 @@ private void transferToMaster() { if (analysisManager != null) { analysisManager.getStatisticsCache().preHeat(); } + + transferToMasterTime = System.currentTimeMillis(); } catch (Throwable e) { // When failed to transfer to master, we need to exit the process. // Otherwise, the process will be in an unknown state. @@ -1917,6 +1920,8 @@ private void transferToNonMaster(FrontendNodeType newType) { followerColumnSender = new FollowerColumnSender(); followerColumnSender.start(); } + + transferToMasterTime = -1L; } catch (Throwable e) { // When failed to transfer to non-master, we need to exit the process. // Otherwise, the process will be in an unknown state. @@ -1925,6 +1930,10 @@ private void transferToNonMaster(FrontendNodeType newType) { } } + public long getTransferToMasterTime() { + return transferToMasterTime; + } + // Set global variable 'lower_case_table_names' only when the cluster is initialized. private void initLowerCaseTableNames() { if (Config.lower_case_table_names > 2 || Config.lower_case_table_names < 0) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java index 78947afdb11e39..ba824efb346295 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java @@ -508,7 +508,6 @@ public void checkDecommissionState(Map> clusterToBes) { private boolean completeRouteInfo() { List updateReplicaInfos = new ArrayList(); long[] assignedErrNum = {0L}; - long needRehashDeadTime = System.currentTimeMillis() - Config.rehash_tablet_after_be_dead_seconds * 1000L; loopCloudReplica((Database db, Table table, Partition partition, MaterializedIndex index, String cluster) -> { boolean assigned = false; List beIds = new ArrayList(); @@ -527,8 +526,7 @@ private boolean completeRouteInfo() { // primary backend is alive or dead not long Backend be = replica.getPrimaryBackend(cluster); - if (be != null && (be.isQueryAvailable() - || (!be.isQueryDisabled() && be.getLastUpdateMs() > needRehashDeadTime))) { + if (!needRehashTabletOnPrimaryBackend(be)) { beIds.add(be.getId()); tabletIds.add(tablet.getId()); continue; @@ -592,6 +590,40 @@ private boolean completeRouteInfo() { return true; } + private boolean needRehashTabletOnPrimaryBackend(Backend be) { + if (be == null) { + return true; + } + + // be is alive and not disable query + if (be.isQueryAvailable()) { + return false; + } + + // disable query + if (be.isQueryDisabled()) { + return true; + } + + // backend's last heartbeat time maybe incorrect because not always write heartbeat editlog. + // only backend state change can write a heartbeat editlog, like alive change to dead, or dead change to alive. + // + // suppose steps as follow: + // 1. be dead at time T1; + // 2. be alive at time T2, since be state change, write an editlog, be's last update ms = T2; + // 3. be heartbeat ok at time T3. but no state change, no write editlog; + // 4. kill -9 master fe at time T4; + // 5. kill -9 be at time T5; + // 6. master fe become alive, it will replay editlog of step 2, now be's last update ms = T2. + // this is incorrect, should least >= T3. + // + // so rehash a primary be need two condititions: + // a. be lost heartbeat for a long time; + // b. fe had become master for a long time; + long ts = System.currentTimeMillis() - Config.rehash_tablet_after_be_dead_seconds * 1000L; + return be.getLastUpdateMs() < ts && Env.getCurrentEnv().getTransferToMasterTime() < ts; + } + public void fillBeToTablets(long be, long tableId, long partId, long indexId, Tablet tablet, Map> globalBeToTablets, Map>> beToTabletsInTable, diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index b0304648b73471..bf21d1c8166d7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -803,7 +803,7 @@ public String toString() { public String getHealthyStatus() { return "Backend [id=" + id + ", isDecommission: " + isDecommissioned + ", backendStatus: " + backendStatus + ", isAlive: " + isAlive.get() + ", lastUpdateTime: " - + TimeUtils.longToTimeString(lastUpdateMs); + + TimeUtils.longToTimeString(lastUpdateMs) + "]"; } /** @@ -882,6 +882,10 @@ public boolean handleHbResponse(BackendHbResponse hbResponse, boolean isReplay) heartbeatErrMsg = ""; this.heartbeatFailureCounter = 0; } else { + // for a bad BackendHbResponse, its hbTime is last succ hbTime, not this hbTime + if (hbResponse.getHbTime() > 0) { + this.lastUpdateMs = hbResponse.getHbTime(); + } // Only set backend to dead if the heartbeat failure counter exceed threshold. // And if it is a replay process, must set backend to dead. if (isReplay || ++this.heartbeatFailureCounter >= Config.max_backend_heartbeat_failure_tolerance_count) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java index a0311a9b737847..88fe998ad4d7cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java @@ -98,18 +98,20 @@ public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort, long this.beMemory = beMemory; } - public BackendHbResponse(long beId, String errMsg) { + public BackendHbResponse(long beId, long lastHbTime, String errMsg) { super(HeartbeatResponse.Type.BACKEND); this.status = HbStatus.BAD; this.beId = beId; + this.hbTime = lastHbTime; this.msg = errMsg; } - public BackendHbResponse(long beId, String host, String errMsg) { + public BackendHbResponse(long beId, String host, long lastHbTime, String errMsg) { super(HeartbeatResponse.Type.BACKEND); this.status = HbStatus.BAD; this.beId = beId; this.host = host; + this.hbTime = lastHbTime; this.msg = errMsg; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 3fc09b31f2d312..89f55239f7fb56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -315,13 +315,13 @@ public HeartbeatResponse call() { System.currentTimeMillis(), beStartTime, version, nodeRole, fragmentNum, lastFragmentUpdateTime, isShutDown, arrowFlightSqlPort, beMemory); } else { - return new BackendHbResponse(backendId, backend.getHost(), + return new BackendHbResponse(backendId, backend.getHost(), backend.getLastUpdateMs(), result.getStatus().getErrorMsgs().isEmpty() ? "Unknown error" : result.getStatus().getErrorMsgs().get(0)); } } catch (Exception e) { LOG.warn("backend heartbeat got exception", e); - return new BackendHbResponse(backendId, backend.getHost(), + return new BackendHbResponse(backendId, backend.getHost(), backend.getLastUpdateMs(), Strings.isNullOrEmpty(e.getMessage()) ? "got exception" : e.getMessage()); } finally { if (client != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatResponse.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatResponse.java index 447ffad81899aa..3fffd1214503d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatResponse.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatResponse.java @@ -51,10 +51,12 @@ public enum HbStatus { protected boolean isTypeRead = false; /** - * msg and hbTime are no need to be synchronized to other Frontends, + * msg no need to be synchronized to other Frontends, * and only Master Frontend has these info */ protected String msg; + + @SerializedName(value = "hbTime") protected long hbTime; public HeartbeatResponse(Type type) {