diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index cc4a3c09d9cb7b..801f3166861497 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -94,19 +94,13 @@ private void checkToAddCluster(Map remoteClusterIdToPB, Set
-            Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
             String clusterName = remoteClusterIdToPB.get(addId).getClusterName();
             String clusterId = remoteClusterIdToPB.get(addId).getClusterId();
             String publicEndpoint = remoteClusterIdToPB.get(addId).getPublicEndpoint();
             String privateEndpoint = remoteClusterIdToPB.get(addId).getPrivateEndpoint();
-            newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
-            newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
-            newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, publicEndpoint);
-            newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, privateEndpoint);
             // For old versions that do no have status field set
             ClusterStatus clusterStatus = remoteClusterIdToPB.get(addId).hasClusterStatus()
                     ? remoteClusterIdToPB.get(addId).getClusterStatus() : ClusterStatus.NORMAL;
-            newTagMap.put(Tag.CLOUD_CLUSTER_STATUS, String.valueOf(clusterStatus));
             MetricRepo.registerCloudMetrics(clusterId, clusterName);
             //toAdd.forEach(i -> i.setTagMap(newTagMap));
             List<Backend> toAdd = new ArrayList<>();
@@ -117,6 +111,12 @@ private void checkToAddCluster(Map remoteClusterIdToPB, Set
+                Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
+                newTagMap.put(Tag.CLOUD_CLUSTER_STATUS, String.valueOf(clusterStatus));
+                newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+                newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+                newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, publicEndpoint);
+                newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, privateEndpoint);
                 newTagMap.put(Tag.CLOUD_UNIQUE_ID, node.getCloudUniqueId());
                 b.setTagMap(newTagMap);
                 toAdd.add(b);
@@ -250,13 +250,6 @@ private void checkDiffNode(Map remoteClusterIdToPB,
 
         updateStatus(currentBes, expectedBes);
 
-        // Attach tag to BEs
-        Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
-        newTagMap.put(Tag.CLOUD_CLUSTER_NAME, remoteClusterIdToPB.get(cid).getClusterName());
-        newTagMap.put(Tag.CLOUD_CLUSTER_ID, remoteClusterIdToPB.get(cid).getClusterId());
-        newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, remoteClusterIdToPB.get(cid).getPublicEndpoint());
-        newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, remoteClusterIdToPB.get(cid).getPrivateEndpoint());
-
         diffNodes(toAdd, toDel, () -> {
             Map<String, Backend> currentMap = new HashMap<>();
             for (Backend be : currentBes) {
@@ -280,6 +273,14 @@ private void checkDiffNode(Map remoteClusterIdToPB,
             if (node.hasIsSmoothUpgrade()) {
                 b.setSmoothUpgradeDst(node.getIsSmoothUpgrade());
             }
+
+            // Attach tag to BEs
+            Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
+            newTagMap.put(Tag.CLOUD_CLUSTER_NAME, remoteClusterIdToPB.get(cid).getClusterName());
+            newTagMap.put(Tag.CLOUD_CLUSTER_ID, remoteClusterIdToPB.get(cid).getClusterId());
+            newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, remoteClusterIdToPB.get(cid).getPublicEndpoint());
+            newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT,
+                    remoteClusterIdToPB.get(cid).getPrivateEndpoint());
             newTagMap.put(Tag.CLOUD_UNIQUE_ID, node.getCloudUniqueId());
             b.setTagMap(newTagMap);
             nodeMap.put(endpoint, b);
@@ -350,8 +351,8 @@ private void checkFeNodesMapValid() {
     }
 
     private void getCloudObserverFes() {
-        Cloud.GetClusterResponse response = CloudSystemInfoService
-                .getCloudCluster(Config.cloud_sql_server_cluster_name, Config.cloud_sql_server_cluster_id, "");
+        Cloud.GetClusterResponse response = cloudSystemInfoService.getCloudCluster(
+                Config.cloud_sql_server_cluster_name, Config.cloud_sql_server_cluster_id, "");
         if (!response.hasStatus() || !response.getStatus().hasCode()
                 || response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
             LOG.warn("failed to get cloud cluster due to incomplete response, "
@@ -416,7 +417,7 @@ private void getCloudObserverFes() {
             return;
         }
         try {
-            CloudSystemInfoService.updateFrontends(toAdd, toDel);
+            cloudSystemInfoService.updateFrontends(toAdd, toDel);
         } catch (DdlException e) {
             LOG.warn("update cloud frontends exception e: {}, msg: {}", e, e.getMessage());
         }
@@ -426,7 +427,7 @@ private void getCloudBackends() {
         Map<String, List<Backend>> clusterIdToBackend = cloudSystemInfoService.getCloudClusterIdToBackend();
         //rpc to ms, to get mysql user can use cluster_id
         // NOTE: rpc args all empty, use cluster_unique_id to get a instance's all cluster info.
-        Cloud.GetClusterResponse response = CloudSystemInfoService.getCloudCluster("", "", "");
+        Cloud.GetClusterResponse response = cloudSystemInfoService.getCloudCluster("", "", "");
         if (!response.hasStatus() || !response.getStatus().hasCode()
                 || (response.getStatus().getCode() != Cloud.MetaServiceCode.OK
                 && response.getStatus().getCode() != MetaServiceCode.CLUSTER_NOT_FOUND)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index 613fef3be685f5..7c37f1dbcffa98 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -83,8 +83,8 @@ public static String genFeNodeNameFromMeta(String host, int port, long timeMs) {
 
     private Cloud.NodeInfoPB getLocalTypeFromMetaService() {
         // get helperNodes from ms
-        Cloud.GetClusterResponse response = CloudSystemInfoService.getCloudCluster(
-                Config.cloud_sql_server_cluster_name, Config.cloud_sql_server_cluster_id, "");
+        Cloud.GetClusterResponse response = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+                .getCloudCluster(Config.cloud_sql_server_cluster_name, Config.cloud_sql_server_cluster_id, "");
         if (!response.hasStatus() || !response.getStatus().hasCode()
                 || response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
             LOG.warn("failed to get cloud cluster due to incomplete response, "
@@ -392,7 +392,7 @@ public void checkCloudClusterPriv(String clusterName) throws DdlException {
 
     public void changeCloudCluster(String clusterName, ConnectContext ctx) throws DdlException {
         checkCloudClusterPriv(clusterName);
-        CloudSystemInfoService.waitForAutoStart(clusterName);
+        ((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStart(clusterName);
         try {
             ((CloudSystemInfoService) Env.getCurrentSystemInfo()).addCloudCluster(clusterName, "");
         } catch (UserException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index a51e332a784218..aebc66128ed844 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -163,7 +163,7 @@ private long getBackendIdImpl(String cluster) {
 
         // if cluster is SUSPENDED, wait
         try {
-            CloudSystemInfoService.waitForAutoStart(cluster);
+            ((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStart(cluster);
         } catch (DdlException e) {
             // this function cant throw exception. so just log it
             LOG.warn("cant resume cluster {}", cluster);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
index 1d0bfc23f6c2de..5caa2108c592fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
@@ -19,6 +19,7 @@
 
 import org.apache.doris.analysis.InsertStmt;
 import org.apache.doris.analysis.LoadStmt;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
@@ -33,14 +34,14 @@ public CloudLoadManager(LoadJobScheduler loadJobScheduler) {
 
     @Override
     public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException, UserException {
-        CloudSystemInfoService.waitForAutoStartCurrentCluster();
+        ((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStartCurrentCluster();
         return super.createLoadJobFromStmt(stmt);
     }
 
     @Override
     public long createLoadJobFromStmt(InsertStmt stmt) throws DdlException {
-        CloudSystemInfoService.waitForAutoStartCurrentCluster();
+        ((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStartCurrentCluster();
         return super.createLoadJobFromStmt(stmt);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 7171fbafd16267..59ba7340dfc65c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -89,7 +89,7 @@ public Pair>, TStorageMedium> selectBackendIdsForReplicaCrea
      * @param clusterId cluster id
      * @return
      */
-    public static Cloud.GetClusterResponse getCloudCluster(String clusterName, String clusterId, String userName) {
+    public Cloud.GetClusterResponse getCloudCluster(String clusterName, String clusterId, String userName) {
         Cloud.GetClusterRequest.Builder builder = Cloud.GetClusterRequest.newBuilder();
         builder.setCloudUniqueId(Config.cloud_unique_id)
                 .setClusterName(clusterName).setClusterId(clusterId).setMysqlUserName(userName);
@@ -261,8 +261,8 @@ public void updateCloudClusterMap(List toAdd, List toDel) {
 
     }
 
-    public static synchronized void updateFrontends(List<Frontend> toAdd,
-            List<Frontend> toDel) throws DdlException {
+    public synchronized void updateFrontends(List<Frontend> toAdd, List<Frontend> toDel)
+            throws DdlException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("updateCloudFrontends toAdd={} toDel={}", toAdd, toDel);
         }
@@ -570,7 +570,7 @@ public void setInstanceStatus(InstanceInfoPB.Status instanceStatus) {
         this.instanceStatus = instanceStatus;
     }
 
-    public static void waitForAutoStartCurrentCluster() throws DdlException {
+    public void waitForAutoStartCurrentCluster() throws DdlException {
         ConnectContext context = ConnectContext.get();
         if (context != null) {
             String cloudCluster = context.getCloudCluster();
@@ -580,7 +580,7 @@ public static void waitForAutoStartCurrentCluster() throws DdlException {
         }
     }
 
-    public static String getClusterNameAutoStart(final String clusterName) {
+    public String getClusterNameAutoStart(final String clusterName) {
         if (!Strings.isNullOrEmpty(clusterName)) {
             return clusterName;
         }
@@ -607,7 +607,7 @@ public static String getClusterNameAutoStart(final String clusterName) {
         return cloudClusterTypeAndName.clusterName;
     }
 
-    public static void waitForAutoStart(String clusterName) throws DdlException {
+    public void waitForAutoStart(String clusterName) throws DdlException {
         if (Config.isNotCloudMode()) {
             return;
         }
@@ -616,7 +616,7 @@ public static void waitForAutoStart(String clusterName) throws DdlException {
             LOG.warn("auto start in cloud mode, but clusterName empty {}", clusterName);
             return;
         }
-        String clusterStatus = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudStatusByName(clusterName);
+        String clusterStatus = getCloudStatusByName(clusterName);
         if (Strings.isNullOrEmpty(clusterStatus)) {
             // for cluster rename or cluster dropped
             LOG.warn("cant find clusterStatus in fe, clusterName {}", clusterName);
@@ -631,8 +631,7 @@ public static void waitForAutoStart(String clusterName) throws DdlException {
 
         builder.setOp(Cloud.AlterClusterRequest.Operation.SET_CLUSTER_STATUS);
         Cloud.ClusterPB.Builder clusterBuilder = Cloud.ClusterPB.newBuilder();
-        clusterBuilder.setClusterId(((CloudSystemInfoService)
-                Env.getCurrentSystemInfo()).getCloudClusterIdByName(clusterName));
+        clusterBuilder.setClusterId(getCloudClusterIdByName(clusterName));
         clusterBuilder.setClusterStatus(Cloud.ClusterStatus.TO_RESUME);
         builder.setCluster(clusterBuilder);
 
@@ -671,7 +670,7 @@ public static void waitForAutoStart(String clusterName) throws DdlException {
             } catch (InterruptedException e) {
                 LOG.info("change cluster sleep wait InterruptedException: ", e);
             }
-            clusterStatus = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudStatusByName(clusterName);
+            clusterStatus = getCloudStatusByName(clusterName);
         }
         if (retryTime >= retryTimes) {
             // auto start timeout
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 40af03d9237d51..783323b03fac72 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -820,7 +820,8 @@ private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
                             deadCloudClusterStatus);
                     if (Strings.isNullOrEmpty(deadCloudClusterStatus)
                             || ClusterStatus.valueOf(deadCloudClusterStatus) != ClusterStatus.NORMAL) {
-                        CloudSystemInfoService.waitForAutoStart(deadCloudClusterClusterName);
+                        ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+                                .waitForAutoStart(deadCloudClusterClusterName);
                     }
                 }
             }
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
index 56c80e88a40857..606b9bc4ac8922 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
@@ -54,6 +54,7 @@ class StreamLoadAction implements SuiteAction {
     Map headers
     SuiteContext context
     boolean directToBe = false
+    boolean twoPhaseCommit = false
 
     StreamLoadAction(SuiteContext context) {
         this.address = context.getFeHttpAddress()
@@ -137,6 +138,22 @@ class StreamLoadAction implements SuiteAction {
         this.time = time.call()
     }
 
+    void twoPhaseCommit(boolean twoPhaseCommit) {
+        this.twoPhaseCommit = twoPhaseCommit;
+    }
+
+    void twoPhaseCommit(Closure twoPhaseCommit) {
+        this.twoPhaseCommit = twoPhaseCommit.call();
+    }
+
+    // compatible with selectdb case
+    void isCloud(boolean isCloud) {
+    }
+
+    // compatible with selectdb case
+    void isCloud(boolean isCloud) {
+    }
+
+    // compatible with selectdb case
+    void isCloud(Closure isCloud) {
+    }
+
     void check(@ClosureParams(value = FromString, options = ["String,Throwable,Long,Long"]) Closure check) {
         this.check = check
     }
@@ -156,8 +173,14 @@ class StreamLoadAction implements SuiteAction {
         long startTime = System.currentTimeMillis()
         def isHttpStream = headers.containsKey("version")
         try {
-            def uri = isHttpStream ? "http://${address.hostString}:${address.port}/api/_http_stream"
-                    : "http://${address.hostString}:${address.port}/api/${db}/${table}/_stream_load"
+            def uri = ""
+            if (isHttpStream) {
+                uri = "http://${address.hostString}:${address.port}/api/_http_stream"
+            } else if (twoPhaseCommit) {
+                uri = "http://${address.hostString}:${address.port}/api/${db}/_stream_load_2pc"
+            } else {
+                uri = "http://${address.hostString}:${address.port}/api/${db}/${table}/_stream_load"
+            }
             HttpClients.createDefault().withCloseable { client ->
                 RequestBuilder requestBuilder = prepareRequestHeader(RequestBuilder.put(uri))
                 HttpEntity httpEntity = prepareHttpEntity(client)
@@ -362,6 +385,10 @@ class StreamLoadAction implements SuiteAction {
         def jsonSlurper = new JsonSlurper()
         def parsed = jsonSlurper.parseText(responseText)
         String status = parsed.Status
+        if (twoPhaseCommit) {
+            status = parsed.status
+            return status;
+        }
         long txnId = parsed.TxnId
         if (!status.equalsIgnoreCase("Publish Timeout")) {
             return status;
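The StreamLoadAction.groovy hunks above add a twoPhaseCommit option that routes the request to /api/${db}/_stream_load_2pc and reads the lower-case status field of the 2PC response. Below is a minimal sketch, not part of this patch, of how a regression suite might drive that option; the suite name, table, data file, and the two_phase_commit/txn_id/txn_operation headers are illustrative assumptions rather than values taken from this diff.

// Illustrative sketch only (not part of the patch): exercising the new twoPhaseCommit flag.
suite("stream_load_2pc_example") {                    // hypothetical suite name
    def tableName = "example_tbl"                     // hypothetical table

    // 1. Regular stream load with two_phase_commit enabled, so the txn stays precommitted.
    streamLoad {
        table tableName
        set 'two_phase_commit', 'true'
        set 'column_separator', ','
        file 'example_data.csv'                       // hypothetical data file
    }

    // 2. Commit (or abort) the transaction through /api/${db}/_stream_load_2pc,
    //    the endpoint selected when twoPhaseCommit is true.
    streamLoad {
        table tableName
        twoPhaseCommit true
        set 'txn_id', '12345'                         // txn id returned by the first load (hypothetical)
        set 'txn_operation', 'commit'                 // or 'abort'
        check { result, exception, startTime, endTime ->
            // With twoPhaseCommit the action returns parsed.status from the 2PC response.
            assert "success".equalsIgnoreCase(result)
        }
    }
}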