diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 4a80eb35ca6f51..19d25e9ffa1e4a 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -45,6 +45,7 @@ #include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_context.h" #include "thrift/protocol/TDebugProtocol.h" +#include "util/debug_points.h" #include "util/doris_metrics.h" #include "util/thrift_rpc_helper.h" #include "util/time.h" @@ -242,6 +243,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { request.__set_timeout(ctx->timeout_second); } request.__set_request_id(ctx->id.to_thrift()); + request.__set_backend_id(_exec_env->master_info()->backend_id); TLoadTxnBeginResult result; Status status; @@ -374,6 +376,8 @@ void StreamLoadExecutor::get_commit_request(StreamLoadContext* ctx, } Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { + DBUG_EXECUTE_IF("StreamLoadExecutor.commit_txn.block", DBUG_BLOCK); + DorisMetrics::instance()->stream_load_txn_commit_request_total->increment(1); TLoadTxnCommitRequest request; diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md index 1c2ab9939a175a..fbab5cfcd157c2 100644 --- a/docs/en/docs/admin-manual/config/fe-config.md +++ b/docs/en/docs/admin-manual/config/fe-config.md @@ -587,6 +587,16 @@ Is it possible to configure dynamically: true Whether it is a configuration item unique to the Master FE node: true +#### `abort_txn_after_lost_heartbeat_time_second` + +The time, in seconds, that the FE waits after losing a BE's heartbeat before aborting the transactions coordinated by that BE. The default value is 300, which means a BE's transactions are aborted once its heartbeat has been lost for 300 seconds. 
+ +Default: 300(s) + +Is it possible to configure dynamically: true + +Whether it is a configuration item unique to the Master FE node: true + #### `enable_access_file_without_broker` Default:false diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md index c93b2d17e01747..60e2198b711f82 100644 --- a/docs/zh-CN/docs/admin-manual/config/fe-config.md +++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md @@ -587,6 +587,16 @@ FE向BE的BackendService发送rpc请求时的超时时间,单位:毫秒。 是否为 Master FE 节点独有的配置项:true +#### `abort_txn_after_lost_heartbeat_time_second` + +丢失be心跳后丢弃be事务的时间。默认时间为三百秒,当三百秒fe没有接收到be心跳时,会丢弃该be的所有事务。 + +默认值:300(秒) + +是否可以动态配置:true + +是否为 Master FE 节点独有的配置项:true + #### `enable_access_file_without_broker` 默认值:false diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index a5c0ad36ad8975..44882f217683f0 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1812,6 +1812,20 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static long max_backend_heartbeat_failure_tolerance_count = 1; + /** + * Seconds to wait after a BE's heartbeat is lost before aborting the transactions it coordinates. + * The default is 300: a BE's transactions are aborted once its heartbeat has been lost for 300 seconds. + */ + @ConfField(mutable = true, masterOnly = true) + public static int abort_txn_after_lost_heartbeat_time_second = 300; + + /** + * Heartbeat interval in seconds. + * Default is 5, which means every 5 seconds, the master will send a heartbeat to all backends. + */ + @ConfField(mutable = false, masterOnly = false) + public static int heartbeat_interval_second = 5; + /** * The iceberg and hudi table will be removed in v1.3 * Use multi catalog instead. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 9625db3ea5c4c5..f95efcd3474c9f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -53,6 +53,7 @@ import org.apache.doris.planner.external.jdbc.JdbcTableSink; import org.apache.doris.qe.ConnectContext; import org.apache.doris.rewrite.ExprRewriter; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TUniqueId; @@ -358,7 +359,9 @@ public void analyze(Analyzer analyzer) throws UserException { LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING; transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), Lists.newArrayList(targetTable.getId()), label.getLabelName(), - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), sourceType, timeoutSecond); } isTransactionBegin = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java index 7308a22540234e..bdfffbe4802f00 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java @@ -27,7 +27,7 @@ public class ClientPool { static GenericKeyedObjectPoolConfig heartbeatConfig = new GenericKeyedObjectPoolConfig(); - static int heartbeatTimeoutMs = FeConstants.heartbeat_interval_second * 1000; + static int heartbeatTimeoutMs = Config.heartbeat_interval_second * 1000; static GenericKeyedObjectPoolConfig backendConfig = new GenericKeyedObjectPoolConfig(); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java index 1afa12856f05e8..2f45d87e8951d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java @@ -34,7 +34,6 @@ public class FeConstants { public static int shortkey_max_column_count = 3; public static int shortkey_maxsize_bytes = 36; - public static int heartbeat_interval_second = 5; public static int checkpoint_interval_second = 60; // 1 minutes // dpp version diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index b358ea60b9ae96..c021c62f367e28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -22,6 +22,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.LoadException; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.httpv2.entity.RestBaseResult; import org.apache.doris.httpv2.exception.UnauthorizedException; @@ -229,6 +230,11 @@ private Object executeStreamLoad2PC(HttpServletRequest request, String db) { } private TNetworkAddress selectRedirectBackend(String clusterName) throws LoadException { + long debugBackendId = DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId", -1L); + if (debugBackendId != -1L) { + Backend backend = Env.getCurrentSystemInfo().getBackend(debugBackendId); + return new TNetworkAddress(backend.getHost(), backend.getHttpPort()); + } String qualifiedUser = ConnectContext.get().getQualifiedUser(); Set userTags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser); BeSelectionPolicy policy = new 
BeSelectionPolicy.Builder() diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java index b771eb8c9fa03b..70b3765227a962 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java @@ -69,6 +69,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.QueryStateException; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; @@ -245,7 +246,9 @@ public void process(DeleteStmt stmt) throws DdlException, QueryStateException { // begin txn here and generate txn id transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), Lists.newArrayList(olapTable.getId()), label, null, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), TransactionState.LoadJobSourceType.FRONTEND, jobId, Config.stream_load_default_timeout_second); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 656b9b9d699a33..4591bb91b031b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -49,6 +49,7 @@ import org.apache.doris.load.FailMsg; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.BeginTransactionException; @@ 
-104,7 +105,9 @@ public void beginTxn() QuotaExceedException, MetaNotFoundException { transactionId = Env.getCurrentGlobalTransactionMgr() .beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id, getTimeout()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index 09a85d3dffe692..878a400e2815ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -61,6 +61,7 @@ import org.apache.doris.load.EtlStatus; import org.apache.doris.load.FailMsg; import org.apache.doris.qe.OriginStatement; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.sparkdpp.DppResult; import org.apache.doris.sparkdpp.EtlJobConfig; @@ -198,7 +199,9 @@ public void beginTxn() QuotaExceedException, MetaNotFoundException { transactionId = Env.getCurrentGlobalTransactionMgr() .beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), LoadJobSourceType.FRONTEND, id, getTimeout()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index 10d57e66d67334..ae2570224c40ac 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -27,6 +27,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TRoutineLoadTask; import org.apache.doris.transaction.BeginTransactionException; @@ -206,7 +207,9 @@ public boolean beginTxn() throws UserException { try { txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(routineLoadJob.getDbId(), Lists.newArrayList(routineLoadJob.getTableId()), DebugUtil.printId(id), null, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob.getId(), timeoutMs / 1000); } catch (DuplicatedRequestException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java index 5b10ecea818ffa..825d63292efaac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java @@ -32,6 +32,7 @@ import org.apache.doris.load.sync.model.Data; import org.apache.doris.proto.InternalService; import org.apache.doris.qe.InsertStreamTxnExecutor; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.task.SyncTask; import org.apache.doris.task.SyncTaskPool; @@ -133,8 +134,10 @@ public void beginTxn(long batchId) throws UserException, TException, TimeoutExce try { long txnId = 
globalTransactionMgr.beginTransaction(db.getId(), Lists.newArrayList(tbl.getId()), label, - new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, - FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond); + new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), + sourceType, timeoutSecond); String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); request = new TStreamLoadPutRequest() .setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java index 994f5e36055f17..db6598a1e1a7a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java @@ -34,6 +34,7 @@ import org.apache.doris.qe.Coordinator; import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.task.LoadEtlTask; import org.apache.doris.thrift.TQueryType; @@ -80,7 +81,9 @@ public Transaction(ConnectContext ctx, Database database, Table table, String la this.coordinator = new Coordinator(ctx, null, planner, ctx.getStatsErrorEstimator()); this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( database.getId(), ImmutableList.of(table.getId()), labelName, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout()); this.createAt = System.currentTimeMillis(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java index ace88c8ac0da1c..48c7744576c922 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java @@ -19,7 +19,6 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; import org.apache.doris.common.UserException; @@ -176,7 +175,7 @@ public static void addToBlacklist(Long backendID, String reason) { return; } - blacklistBackends.put(backendID, Pair.of(FeConstants.heartbeat_interval_second + 1, reason)); + blacklistBackends.put(backendID, Pair.of(Config.heartbeat_interval_second + 1, reason)); LOG.warn("add backend {} to black list. reason: {}", backendID, reason); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 8dda65e81f78a9..f47121348f860e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -160,6 +160,7 @@ import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.util.InternalQueryBuffer; @@ -1908,9 +1909,10 @@ private void beginTxn(String dbName, String tblName) throws UserException, TExce String label = txnEntry.getLabel(); if (Env.getCurrentEnv().isMaster()) { long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( - txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), - label, new TransactionState.TxnCoordinator( - TransactionState.TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()), + txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), label, + new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), sourceType, timeoutSecond); txnConf.setTxnId(txnId); String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java b/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java index a7ac522b5bd713..ecd544c8bffded 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java @@ -26,10 +26,12 @@ public class ExecuteEnv { private static volatile ExecuteEnv INSTANCE; private MultiLoadMgr multiLoadMgr; private ConnectScheduler scheduler; + private long startupTime; private ExecuteEnv() { multiLoadMgr = new MultiLoadMgr(); scheduler = new ConnectScheduler(Config.qe_max_connection); + startupTime = System.currentTimeMillis(); } public static ExecuteEnv getInstance() { @@ -50,4 +52,9 @@ public ConnectScheduler getScheduler() { public MultiLoadMgr getMultiLoadMgr() { return multiLoadMgr; } + + public long getStartupTime() { + return startupTime; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 36fdec157ac6c4..a38921221fd661 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1249,10 +1249,12 @@ private TLoadTxnBeginResult loadTxnBeginImpl(TLoadTxnBeginRequest request, Strin OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl, TableType.OLAP); // begin long timeoutSecond = request.isSetTimeout() ? 
request.getTimeout() : Config.stream_load_default_timeout_second; + Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId()); + long startTime = backend != null ? backend.getLastStartTime() : 0; + TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, request.getBackendId(), clientIp, startTime); long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( db.getId(), Lists.newArrayList(table.getId()), request.getLabel(), request.getRequestId(), - new TxnCoordinator(TxnSourceType.BE, clientIp), - TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond); + txnCoord, TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond); TLoadTxnBeginResult result = new TLoadTxnBeginResult(); result.setTxnId(txnId).setDbId(db.getId()); return result; @@ -1356,10 +1358,12 @@ private TBeginTxnResult beginTxnImpl(TBeginTxnRequest request, String clientIp) // step 5: get timeout long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second; + Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId()); + long startTime = backend != null ? 
backend.getLastStartTime() : 0; + TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, request.getBackendId(), clientIp, startTime); // step 6: begin transaction long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( - db.getId(), tableIdList, request.getLabel(), request.getRequestId(), - new TxnCoordinator(TxnSourceType.BE, clientIp), + db.getId(), tableIdList, request.getLabel(), request.getRequestId(), txnCoord, TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond); // step 7: return result diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 7c081c12cd07b8..f621ae1e322f38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -77,7 +77,7 @@ public class HeartbeatMgr extends MasterDaemon { private static volatile AtomicReference masterInfo = new AtomicReference<>(); public HeartbeatMgr(SystemInfoService nodeMgr, boolean needRegisterMetric) { - super("heartbeat mgr", FeConstants.heartbeat_interval_second * 1000); + super("heartbeat mgr", Config.heartbeat_interval_second * 1000); this.nodeMgr = nodeMgr; this.executor = ThreadPoolManager.newDaemonFixedThreadPool(Config.heartbeat_mgr_threads_num, Config.heartbeat_mgr_blocking_queue_size, "heartbeat-mgr-pool", needRegisterMetric); @@ -168,13 +168,21 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) { BackendHbResponse hbResponse = (BackendHbResponse) response; Backend be = nodeMgr.getBackend(hbResponse.getBeId()); if (be != null) { + long oldStartTime = be.getLastStartTime(); boolean isChanged = be.handleHbResponse(hbResponse, isReplay); - if (hbResponse.getStatus() != HbStatus.OK) { + if (hbResponse.getStatus() == HbStatus.OK) { + long newStartTime = be.getLastStartTime(); + if (!isReplay && oldStartTime != newStartTime) { + 
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeRestart( + be.getId(), be.getHost(), newStartTime); + } + } else { // invalid all connections cached in ClientPool ClientPool.backendPool.clearPool(new TNetworkAddress(be.getHost(), be.getBePort())); - if (!isReplay) { - Env.getCurrentEnv().getGlobalTransactionMgr() - .abortTxnWhenCoordinateBeDown(be.getHost(), 100); + if (!isReplay && System.currentTimeMillis() - be.getLastUpdateMs() + >= Config.abort_txn_after_lost_heartbeat_time_second * 1000L) { + Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeDown( + be.getId(), be.getHost(), 100); } } return isChanged; diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 7ef043136baa2b..d733d863f4e136 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -1733,13 +1733,16 @@ public TransactionState getTransactionStateByCallbackId(long callbackId) { return null; } - public List> getTransactionIdByCoordinateBe(String coordinateHost, int limit) { + public List> getPrepareTransactionIdByCoordinateBe(long coordinateBeId, + String coordinateHost, int limit) { ArrayList> txnInfos = new ArrayList<>(); readLock(); try { idToRunningTransactionState.values().stream() .filter(t -> (t.getCoordinator().sourceType == TransactionState.TxnSourceType.BE - && t.getCoordinator().ip.equals(coordinateHost))) + && t.getTransactionStatus() == TransactionStatus.PREPARE + && t.getCoordinator().ip.equals(coordinateHost) + && (t.getCoordinator().id == 0 || t.getCoordinator().id == coordinateBeId))) .limit(limit) .forEach(t -> txnInfos.add(Pair.of(t.getDbId(), t.getTransactionId()))); } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 22a019c4c0dd03..35c2195eb33055 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -653,10 +653,12 @@ public TransactionState getTransactionStateByCallbackId(long dbId, long callback } } - public List> getTransactionIdByCoordinateBe(String coordinateHost, int limit) { + private List> getPrepareTransactionIdByCoordinateBe(long coordinateBeId, + String coordinateHost, int limit) { ArrayList> txnInfos = new ArrayList<>(); for (DatabaseTransactionMgr databaseTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) { - txnInfos.addAll(databaseTransactionMgr.getTransactionIdByCoordinateBe(coordinateHost, limit)); + txnInfos.addAll(databaseTransactionMgr.getPrepareTransactionIdByCoordinateBe( + coordinateBeId, coordinateHost, limit)); if (txnInfos.size() > limit) { break; } @@ -664,19 +666,33 @@ public List> getTransactionIdByCoordinateBe(String coordinateHo return txnInfos.size() > limit ? 
new ArrayList<>(txnInfos.subList(0, limit)) : txnInfos; } + public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String coordinateHost, long beStartTime) { + List> transactionIdByCoordinateBe + = getPrepareTransactionIdByCoordinateBe(coordinateBeId, coordinateHost, Integer.MAX_VALUE); + for (Pair txnInfo : transactionIdByCoordinateBe) { + try { + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(txnInfo.first); + TransactionState transactionState = dbTransactionMgr.getTransactionState(txnInfo.second); + long coordStartTime = transactionState.getCoordinator().startTime; + if (coordStartTime < beStartTime) { + dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE restart", null); + } + } catch (UserException e) { + LOG.warn("Abort txn on coordinate BE {} failed, msg={}", coordinateHost, e.getMessage()); + } + } + } + /** * If a Coordinate BE is down when running txn, the txn will remain in FE until killed by timeout * So when FE identify the Coordinate BE is down, FE should cancel it initiative */ - public void abortTxnWhenCoordinateBeDown(String coordinateHost, int limit) { - List> transactionIdByCoordinateBe = getTransactionIdByCoordinateBe(coordinateHost, limit); + public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String coordinateHost, int limit) { + List> transactionIdByCoordinateBe + = getPrepareTransactionIdByCoordinateBe(coordinateBeId, coordinateHost, limit); for (Pair txnInfo : transactionIdByCoordinateBe) { try { DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(txnInfo.first); - TransactionState transactionState = dbTransactionMgr.getTransactionState(txnInfo.second); - if (transactionState.getTransactionStatus() == TransactionStatus.PRECOMMITTED) { - continue; - } dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE is down", null); } catch (UserException e) { LOG.warn("Abort txn on coordinate BE {} failed, msg={}", coordinateHost, e.getMessage()); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index f9a094eceb9b0f..cdef27c935924e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -162,15 +162,23 @@ public static TxnSourceType valueOf(int flag) { public static class TxnCoordinator { @SerializedName(value = "sourceType") public TxnSourceType sourceType; + // backendId for backend, 0 for frontend + @SerializedName(value = "id") + public long id = 0; @SerializedName(value = "ip") public String ip; + // frontend/backend start time + @SerializedName(value = "st") + public long startTime = 0; public TxnCoordinator() { } - public TxnCoordinator(TxnSourceType sourceType, String ip) { + public TxnCoordinator(TxnSourceType sourceType, long id, String ip, long startTime) { this.sourceType = sourceType; + this.id = id; this.ip = ip; + this.startTime = startTime; } @Override @@ -301,7 +309,8 @@ public TransactionState() { this.transactionId = -1; this.label = ""; this.idToTableCommitInfos = Maps.newHashMap(); - this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, "127.0.0.1"); // mocked, to avoid NPE + // mocked, to avoid NPE + this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, 0, "127.0.0.1", System.currentTimeMillis()); this.transactionStatus = TransactionStatus.PREPARE; this.sourceType = LoadJobSourceType.FRONTEND; this.prepareTime = -1; @@ -721,7 +730,7 @@ public void readFields(DataInput in) throws IOException { info.readFields(in); idToTableCommitInfos.put(info.getTableId(), info); } - txnCoordinator = new TxnCoordinator(TxnSourceType.valueOf(in.readInt()), Text.readString(in)); + txnCoordinator = new TxnCoordinator(TxnSourceType.valueOf(in.readInt()), 0, Text.readString(in), 0); transactionStatus = TransactionStatus.valueOf(in.readInt()); sourceType = 
LoadJobSourceType.valueOf(in.readInt()); prepareTime = in.readLong(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java index 4a413495e98f2e..e288d046de1e17 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java @@ -54,7 +54,7 @@ protected void beforeCreatingConnectContext() throws Exception { Config.disable_balance = true; Config.schedule_batch_size = 1000; Config.schedule_slot_num_per_hdd_path = 1000; - FeConstants.heartbeat_interval_second = 5; + Config.heartbeat_interval_second = 5; Config.max_backend_heartbeat_failure_tolerance_count = 1; Config.min_clone_task_timeout_sec = 20 * 60 * 1000; } @@ -114,7 +114,7 @@ public void test() throws Exception { params2.put("deadBeIds", String.valueOf(destBeId)); DebugPointUtil.addDebugPointWithParams("HeartbeatMgr.BackendHeartbeatHandler", params2); - Thread.sleep((FeConstants.heartbeat_interval_second + Thread.sleep((Config.heartbeat_interval_second * Config.max_backend_heartbeat_failure_tolerance_count + 4) * 1000L); destBe = Env.getCurrentSystemInfo().getBackend(destBeId); diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java index b917bc41a7e6b3..ff5f5292be8926 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java @@ -66,7 +66,7 @@ protected void beforeCreatingConnectContext() throws Exception { Config.disable_balance = true; Config.schedule_batch_size = 1000; Config.schedule_slot_num_per_hdd_path = 1000; - FeConstants.heartbeat_interval_second = 5; + Config.heartbeat_interval_second = 5; } @Test diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java index 6ba2d2715663dd..ac13900d2cfff6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java @@ -17,7 +17,7 @@ package org.apache.doris.qe; -import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Config; import org.apache.doris.common.Reference; import org.apache.doris.common.UserException; import org.apache.doris.system.Backend; @@ -47,7 +47,7 @@ public class SimpleSchedulerTest { @BeforeClass public static void setUp() { SimpleScheduler.init(); - FeConstants.heartbeat_interval_second = 2; + Config.heartbeat_interval_second = 2; be1 = new Backend(1000L, "192.168.100.0", 9050); be2 = new Backend(1001L, "192.168.100.1", 9050); be3 = new Backend(1002L, "192.168.100.2", 9050); @@ -139,7 +139,7 @@ public void run() { t3.join(); Assert.assertFalse(SimpleScheduler.isAvailable(be1)); - Thread.sleep((FeConstants.heartbeat_interval_second + 5) * 1000); + Thread.sleep((Config.heartbeat_interval_second + 5) * 1000); Assert.assertTrue(SimpleScheduler.isAvailable(be1)); } @@ -194,7 +194,7 @@ public void testGetHostAbnormal() throws UserException, InterruptedException { System.out.println(e.getMessage()); } - Thread.sleep((FeConstants.heartbeat_interval_second + 5) * 1000); + Thread.sleep((Config.heartbeat_interval_second + 5) * 1000); Assert.assertNotNull(SimpleScheduler.getHost(locations.get(0).backend_id, locations, backends, ref)); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java index 9108570e5e4e09..ea63a5e18b10d0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java +++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java @@ -57,8 +57,8 @@ public class DatabaseTransactionMgrTest { private static Env slaveEnv; private static Map LabelToTxnId; - private TransactionState.TxnCoordinator transactionSource = - new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe"); + private TransactionState.TxnCoordinator transactionSource = new TransactionState.TxnCoordinator( + TransactionState.TxnSourceType.FE, 0, "localfe", System.currentTimeMillis()); public static void setTransactionFinishPublish(TransactionState transactionState, List backendIds) { for (long backendId : backendIds) { @@ -118,7 +118,9 @@ public Map addTransactionToTransactionMgr() throws UserException { masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId1); labelToTxnId.put(CatalogTestUtil.testTxnLabel1, transactionId1); - TransactionState.TxnCoordinator beTransactionSource = new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.BE, "be1"); + // txn 2, 3, 4 + TransactionState.TxnCoordinator beTransactionSource = new TransactionState.TxnCoordinator( + TransactionState.TxnSourceType.BE, 0, "be1", System.currentTimeMillis()); long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1), CatalogTestUtil.testTxnLabel2, beTransactionSource, @@ -204,7 +206,7 @@ public void testAbortTransactionWithNotFoundException() throws UserException { @Test public void testGetTransactionIdByCoordinateBe() throws UserException { DatabaseTransactionMgr masterDbTransMgr = masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1); - List> transactionInfoList = masterDbTransMgr.getTransactionIdByCoordinateBe("be1", 10); + List> transactionInfoList = masterDbTransMgr.getPrepareTransactionIdByCoordinateBe(0, "be1", 10); Assert.assertEquals(3, transactionInfoList.size()); Assert.assertEquals(CatalogTestUtil.testDbId1, 
transactionInfoList.get(0).first.longValue()); Assert.assertEquals(TransactionStatus.PREPARE, diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index 414f5cf03c4db5..cc00237a4c48ed 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -77,7 +77,8 @@ public class GlobalTransactionMgrTest { private static Env masterEnv; private static Env slaveEnv; - private TransactionState.TxnCoordinator transactionSource = new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe"); + private TransactionState.TxnCoordinator transactionSource = new TransactionState.TxnCoordinator( + TransactionState.TxnSourceType.FE, 0, "localfe", System.currentTimeMillis()); @Before public void setUp() throws InstantiationException, IllegalAccessException, IllegalArgumentException, @@ -323,7 +324,9 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo); TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null, - LoadJobSourceType.ROUTINE_LOAD_TASK, new TxnCoordinator(TxnSourceType.BE, "be1"), routineLoadJob.getId(), + LoadJobSourceType.ROUTINE_LOAD_TASK, + new TxnCoordinator(TxnSourceType.BE, 0, "be1", System.currentTimeMillis()), + routineLoadJob.getId(), Config.stream_load_default_timeout_second); transactionState.setTransactionStatus(TransactionStatus.PREPARE); masterTransMgr.getCallbackFactory().addCallback(routineLoadJob); @@ -395,7 +398,9 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); 
routineLoadTaskInfoList.add(routineLoadTaskInfo); TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null, - LoadJobSourceType.ROUTINE_LOAD_TASK, new TxnCoordinator(TxnSourceType.BE, "be1"), routineLoadJob.getId(), + LoadJobSourceType.ROUTINE_LOAD_TASK, + new TxnCoordinator(TxnSourceType.BE, 0, "be1", System.currentTimeMillis()), + routineLoadJob.getId(), Config.stream_load_default_timeout_second); transactionState.setTransactionStatus(TransactionStatus.PREPARE); masterTransMgr.getCallbackFactory().addCallback(routineLoadJob); diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java index c20b2097f8d22e..f08b7478d069d0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java @@ -61,8 +61,9 @@ public void testSerDe() throws IOException { UUID uuid = UUID.randomUUID(); TransactionState transactionState = new TransactionState(1000L, Lists.newArrayList(20000L, 20001L), 3000, "label123", new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()), - LoadJobSourceType.BACKEND_STREAMING, new TxnCoordinator(TxnSourceType.BE, "127.0.0.1"), 50000L, - 60 * 1000L); + LoadJobSourceType.BACKEND_STREAMING, + new TxnCoordinator(TxnSourceType.BE, 0, "127.0.0.1", System.currentTimeMillis()), + 50000L, 60 * 1000L); transactionState.write(out); out.flush(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index c61fcd28afeb6c..59e4eae0a3729d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -405,7 +405,7 @@ protected void createDorisCluster(String runningDir, 
int backendNum) } private void checkBEHeartbeat(List bes) throws InterruptedException { - int maxTry = FeConstants.heartbeat_interval_second + 5; + int maxTry = Config.heartbeat_interval_second + 5; boolean allAlive = false; while (maxTry-- > 0 && !allAlive) { Thread.sleep(1000); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index da4c915e9b0745..75a6537ee85cf4 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -559,6 +559,7 @@ struct TLoadTxnBeginRequest { 10: optional i64 timeout 11: optional Types.TUniqueId request_id 12: optional string token + 13: optional i64 backend_id } struct TLoadTxnBeginResult { @@ -581,6 +582,7 @@ struct TBeginTxnRequest { 9: optional i64 timeout 10: optional Types.TUniqueId request_id 11: optional string token + 12: optional i64 backend_id } struct TBeginTxnResult { diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index afc23cc2cc01cb..9a14346af10bd8 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -422,6 +422,41 @@ class Suite implements GroovyInterceptable { return result; } + long getTableId(String tableName) { + return getTableId(getDbName(), tableName) + } + + long getTableId(String dbName, String tableName) { + def dbInfo = sql "show proc '/dbs'" + for(List row : dbInfo) { + if (row[1].replace("default_cluster:", "").equals(dbName)) { + def tbInfo = sql "show proc '/dbs/${row[0]}' " + for (List tb : tbInfo) { + if (tb[1].equals(tableName)) { + return tb[0].toLong() + } + } + } + } + } + + long getDbId() { + return getDbId(getDbName()) + } + + long getDbId(String dbName) { + def dbInfo = sql "show proc '/dbs'" + for (List row : dbInfo) { + if 
(row[1].replace("default_cluster:", "").equals(dbName)) { + return row[0].toLong() + } + } + } + + String getDbName() { + return context.dbName + } + List> order_sql(String sqlStr) { return sql(sqlStr, true) } @@ -681,6 +716,33 @@ class Suite implements GroovyInterceptable { return hdfs.downLoad(label) } + void runStreamLoadExample(String tableName, String coordidateBeHostPort = "") { + def backends = sql_return_maparray "show backends" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + id int, + name varchar(255) + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_num" = "${backends.size()}" + ) + """ + + streamLoad { + table tableName + set 'column_separator', ',' + file context.config.dataPath + "/demo_p0/streamload_input.csv" + time 10000 + if (!coordidateBeHostPort.equals("")) { + def pos = coordidateBeHostPort.indexOf(':') + def host = coordidateBeHostPort.substring(0, pos) + def httpPort = coordidateBeHostPort.substring(pos + 1).toInteger() + directToBe host, httpPort + } + } + } + void streamLoad(Closure actionSupplier) { runAction(new StreamLoadAction(context), actionSupplier) } diff --git a/regression-test/suites/demo_p0/streamLoad_action.groovy b/regression-test/suites/demo_p0/streamLoad_action.groovy index a11aed7a1c52d6..b0ca182c8d971c 100644 --- a/regression-test/suites/demo_p0/streamLoad_action.groovy +++ b/regression-test/suites/demo_p0/streamLoad_action.groovy @@ -124,6 +124,11 @@ suite("streamLoad_action") { LIMIT 5; """ + def tableName2 = "test_streamload_action2" + runStreamLoadExample(tableName2) + sql """ DROP TABLE ${tableName} """ + sql """ DROP TABLE ${tableName2}""" + sql """ DROP TABLE B """ } diff --git a/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy b/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy new file mode 100644 index 00000000000000..cd2b8ea9e48b3a --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy 
@@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.http.NoHttpResponseException + +suite('test_coordidator_be_restart') { + def options = new ClusterOptions() + options.cloudMode = false + options.enableDebugPoints() + + docker(options) { + def db = context.config.getDbNameByFile(context.file) + def tableName1 = 'tbl_test_coordidator_be_restart_t1' + setFeConfig('abort_txn_after_lost_heartbeat_time_second', 3600) + + def dbId = getDbId() + + def txns = sql_return_maparray "show proc '/transactions/${dbId}/running'" + assertEquals(0, txns.size()) + txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'" + assertEquals(0, txns.size()) + + def coordinatorBe = cluster.getAllBackends().get(0) + def coordinatorBeHost = coordinatorBe.host + + GetDebugPoint().enableDebugPointForAllFEs('LoadAction.selectRedirectBackend.backendId', [value: coordinatorBe.backendId]) + GetDebugPoint().enableDebugPointForAllBEs('StreamLoadExecutor.commit_txn.block') + + thread { + try { + runStreamLoadExample(tableName1, coordinatorBe.host + ':' + coordinatorBe.httpPort) + } catch (NoHttpResponseException t) { + // be down will raise 
NoHttpResponseException + } + } + + sleep(5000) + txns = sql_return_maparray "show proc '/transactions/${dbId}/running'" + logger.info('running txns: ' + txns) + assertEquals(1, txns.size()) + for (def txn : txns) { + assertEquals('PREPARE', txn.TransactionStatus) + } + + txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'" + assertEquals(0, txns.size()) + + // coordinatorBe shutdown not abort txn because abort_txn_after_lost_heartbeat_time_second = 3600 + cluster.stopBackends(coordinatorBe.index) + def isDead = false + for (def i = 0; i < 10; i++) { + def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } + if (!be.Alive.toBoolean()) { + isDead = true + break + } + sleep 1000 + } + assertTrue(isDead) + sleep 5000 + txns = sql_return_maparray "show proc '/transactions/${dbId}/running'" + logger.info('running txns: ' + txns) + assertEquals(1, txns.size()) + for (def txn : txns) { + assertEquals('PREPARE', txn.TransactionStatus) + } + + // coordinatorBe restart, abort txn on it + cluster.startBackends(coordinatorBe.index) + def isAlive = false + for (def i = 0; i < 20; i++) { + def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } + if (be.Alive.toBoolean()) { + isAlive = true + break + } + sleep 1000 + } + assertTrue(isAlive) + sleep 5000 + txns = sql_return_maparray "show proc '/transactions/${dbId}/running'" + logger.info('running txns: ' + txns) + assertEquals(0, txns.size()) + txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'" + logger.info('finished txns: ' + txns) + assertEquals(1, txns.size()) + for (def txn : txns) { + assertEquals('ABORTED', txn.TransactionStatus) + } + } +}