Skip to content

Commit

Permalink
fix replay heartbeat lost update time
Browse files Browse the repository at this point in the history
  • Loading branch information
yujun777 committed Oct 29, 2024
1 parent 40d0f77 commit 685260e
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 9 deletions.
9 changes: 9 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ public class Env {
private AtomicBoolean canRead = new AtomicBoolean(false);
private String toMasterProgress = "";
private BlockingQueue<FrontendNodeType> typeTransferQueue;
private long transferToMasterTime = -1L;

// node name is used for bdbje NodeName.
protected String nodeName;
Expand Down Expand Up @@ -1687,6 +1688,8 @@ private void transferToMaster() {
if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
}

transferToMasterTime = System.currentTimeMillis();
} catch (Throwable e) {
// When failed to transfer to master, we need to exit the process.
// Otherwise, the process will be in an unknown state.
Expand Down Expand Up @@ -1917,6 +1920,8 @@ private void transferToNonMaster(FrontendNodeType newType) {
followerColumnSender = new FollowerColumnSender();
followerColumnSender.start();
}

transferToMasterTime = -1L;
} catch (Throwable e) {
// When failed to transfer to non-master, we need to exit the process.
// Otherwise, the process will be in an unknown state.
Expand All @@ -1925,6 +1930,10 @@ private void transferToNonMaster(FrontendNodeType newType) {
}
}

public long getTransferToMasterTime() {
return transferToMasterTime;
}

// Set global variable 'lower_case_table_names' only when the cluster is initialized.
private void initLowerCaseTableNames() {
if (Config.lower_case_table_names > 2 || Config.lower_case_table_names < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,6 @@ public void checkDecommissionState(Map<String, List<Long>> clusterToBes) {
private boolean completeRouteInfo() {
List<UpdateCloudReplicaInfo> updateReplicaInfos = new ArrayList<UpdateCloudReplicaInfo>();
long[] assignedErrNum = {0L};
long needRehashDeadTime = System.currentTimeMillis() - Config.rehash_tablet_after_be_dead_seconds * 1000L;
loopCloudReplica((Database db, Table table, Partition partition, MaterializedIndex index, String cluster) -> {
boolean assigned = false;
List<Long> beIds = new ArrayList<Long>();
Expand All @@ -527,8 +526,7 @@ private boolean completeRouteInfo() {

// primary backend is alive or dead not long
Backend be = replica.getPrimaryBackend(cluster);
if (be != null && (be.isQueryAvailable()
|| (!be.isQueryDisabled() && be.getLastUpdateMs() > needRehashDeadTime))) {
if (!needRehashTabletOnPrimaryBackend(be)) {
beIds.add(be.getId());
tabletIds.add(tablet.getId());
continue;
Expand Down Expand Up @@ -592,6 +590,40 @@ private boolean completeRouteInfo() {
return true;
}

private boolean needRehashTabletOnPrimaryBackend(Backend be) {
if (be == null) {
return true;
}

// be is alive and not disable query
if (be.isQueryAvailable()) {
return false;
}

// disable query
if (be.isQueryDisabled()) {
return true;
}

// backend's last heartbeat time maybe incorrect because not always write heartbeat editlog.
// only backend state change can write a heartbeat editlog, like alive change to dead, or dead change to alive.
//
// suppose steps as follow:
// 1. be dead at time T1;
// 2. be alive at time T2, since be state change, write an editlog, be's last update ms = T2;
// 3. be heartbeat ok at time T3. but no state change, no write editlog;
// 4. kill -9 master fe at time T4;
// 5. kill -9 be at time T5;
// 6. master fe become alive, it will replay editlog of step 2, now be's last update ms = T2.
// this is incorrect, should least >= T3.
//
// so rehash a primary be need two condititions:
// a. be lost heartbeat for a long time;
// b. fe had become master for a long time;
long ts = System.currentTimeMillis() - Config.rehash_tablet_after_be_dead_seconds * 1000L;
return be.getLastUpdateMs() < ts && Env.getCurrentEnv().getTransferToMasterTime() < ts;
}

public void fillBeToTablets(long be, long tableId, long partId, long indexId, Tablet tablet,
Map<Long, List<Tablet>> globalBeToTablets,
Map<Long, Map<Long, List<Tablet>>> beToTabletsInTable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ public String toString() {
public String getHealthyStatus() {
return "Backend [id=" + id + ", isDecommission: " + isDecommissioned
+ ", backendStatus: " + backendStatus + ", isAlive: " + isAlive.get() + ", lastUpdateTime: "
+ TimeUtils.longToTimeString(lastUpdateMs);
+ TimeUtils.longToTimeString(lastUpdateMs) + "]";
}

/**
Expand Down Expand Up @@ -882,6 +882,10 @@ public boolean handleHbResponse(BackendHbResponse hbResponse, boolean isReplay)
heartbeatErrMsg = "";
this.heartbeatFailureCounter = 0;
} else {
// for a bad BackendHbResponse, its hbTime is last succ hbTime, not this hbTime
if (hbResponse.getHbTime() > 0) {
this.lastUpdateMs = hbResponse.getHbTime();
}
// Only set backend to dead if the heartbeat failure counter exceed threshold.
// And if it is a replay process, must set backend to dead.
if (isReplay || ++this.heartbeatFailureCounter >= Config.max_backend_heartbeat_failure_tolerance_count) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,20 @@ public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort, long
this.beMemory = beMemory;
}

public BackendHbResponse(long beId, String errMsg) {
public BackendHbResponse(long beId, long lastHbTime, String errMsg) {
super(HeartbeatResponse.Type.BACKEND);
this.status = HbStatus.BAD;
this.beId = beId;
this.hbTime = lastHbTime;
this.msg = errMsg;
}

public BackendHbResponse(long beId, String host, String errMsg) {
public BackendHbResponse(long beId, String host, long lastHbTime, String errMsg) {
super(HeartbeatResponse.Type.BACKEND);
this.status = HbStatus.BAD;
this.beId = beId;
this.host = host;
this.hbTime = lastHbTime;
this.msg = errMsg;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,13 @@ public HeartbeatResponse call() {
System.currentTimeMillis(), beStartTime, version, nodeRole,
fragmentNum, lastFragmentUpdateTime, isShutDown, arrowFlightSqlPort, beMemory);
} else {
return new BackendHbResponse(backendId, backend.getHost(),
return new BackendHbResponse(backendId, backend.getHost(), backend.getLastUpdateMs(),
result.getStatus().getErrorMsgs().isEmpty()
? "Unknown error" : result.getStatus().getErrorMsgs().get(0));
}
} catch (Exception e) {
LOG.warn("backend heartbeat got exception", e);
return new BackendHbResponse(backendId, backend.getHost(),
return new BackendHbResponse(backendId, backend.getHost(), backend.getLastUpdateMs(),
Strings.isNullOrEmpty(e.getMessage()) ? "got exception" : e.getMessage());
} finally {
if (client != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ public enum HbStatus {
protected boolean isTypeRead = false;

/**
* msg and hbTime are no need to be synchronized to other Frontends,
* msg no need to be synchronized to other Frontends,
* and only Master Frontend has these info
*/
protected String msg;

@SerializedName(value = "hbTime")
protected long hbTime;

public HeartbeatResponse(Type type) {
Expand Down

0 comments on commit 685260e

Please sign in to comment.