Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](merge cloud) Fix setting of cloud BE tag map #32864

Merged
merged 3 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,13 @@ private void checkToAddCluster(Map<String, ClusterPB> remoteClusterIdToPB, Set<S
LOG.debug("begin to add clusterId: {}", addId);
}
// Attach tag to BEs
Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
String clusterName = remoteClusterIdToPB.get(addId).getClusterName();
String clusterId = remoteClusterIdToPB.get(addId).getClusterId();
String publicEndpoint = remoteClusterIdToPB.get(addId).getPublicEndpoint();
String privateEndpoint = remoteClusterIdToPB.get(addId).getPrivateEndpoint();
newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, publicEndpoint);
newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, privateEndpoint);
// For old versions that do not have the status field set
ClusterStatus clusterStatus = remoteClusterIdToPB.get(addId).hasClusterStatus()
? remoteClusterIdToPB.get(addId).getClusterStatus() : ClusterStatus.NORMAL;
newTagMap.put(Tag.CLOUD_CLUSTER_STATUS, String.valueOf(clusterStatus));
MetricRepo.registerCloudMetrics(clusterId, clusterName);
//toAdd.forEach(i -> i.setTagMap(newTagMap));
List<Backend> toAdd = new ArrayList<>();
Expand All @@ -117,6 +111,12 @@ private void checkToAddCluster(Map<String, ClusterPB> remoteClusterIdToPB, Set<S
continue;
}
Backend b = new Backend(Env.getCurrentEnv().getNextId(), addr, node.getHeartbeatPort());
Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
newTagMap.put(Tag.CLOUD_CLUSTER_STATUS, String.valueOf(clusterStatus));
newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, publicEndpoint);
newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, privateEndpoint);
newTagMap.put(Tag.CLOUD_UNIQUE_ID, node.getCloudUniqueId());
b.setTagMap(newTagMap);
toAdd.add(b);
Expand Down Expand Up @@ -250,13 +250,6 @@ private void checkDiffNode(Map<String, ClusterPB> remoteClusterIdToPB,

updateStatus(currentBes, expectedBes);

// Attach tag to BEs
Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
newTagMap.put(Tag.CLOUD_CLUSTER_NAME, remoteClusterIdToPB.get(cid).getClusterName());
newTagMap.put(Tag.CLOUD_CLUSTER_ID, remoteClusterIdToPB.get(cid).getClusterId());
newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, remoteClusterIdToPB.get(cid).getPublicEndpoint());
newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, remoteClusterIdToPB.get(cid).getPrivateEndpoint());

diffNodes(toAdd, toDel, () -> {
Map<String, Backend> currentMap = new HashMap<>();
for (Backend be : currentBes) {
Expand All @@ -280,6 +273,14 @@ private void checkDiffNode(Map<String, ClusterPB> remoteClusterIdToPB,
if (node.hasIsSmoothUpgrade()) {
b.setSmoothUpgradeDst(node.getIsSmoothUpgrade());
}

// Attach tag to BEs
Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
newTagMap.put(Tag.CLOUD_CLUSTER_NAME, remoteClusterIdToPB.get(cid).getClusterName());
newTagMap.put(Tag.CLOUD_CLUSTER_ID, remoteClusterIdToPB.get(cid).getClusterId());
newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, remoteClusterIdToPB.get(cid).getPublicEndpoint());
newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT,
remoteClusterIdToPB.get(cid).getPrivateEndpoint());
newTagMap.put(Tag.CLOUD_UNIQUE_ID, node.getCloudUniqueId());
b.setTagMap(newTagMap);
nodeMap.put(endpoint, b);
Expand Down Expand Up @@ -350,8 +351,8 @@ private void checkFeNodesMapValid() {
}

private void getCloudObserverFes() {
Cloud.GetClusterResponse response = CloudSystemInfoService
.getCloudCluster(Config.cloud_sql_server_cluster_name, Config.cloud_sql_server_cluster_id, "");
Cloud.GetClusterResponse response = cloudSystemInfoService.getCloudCluster(
Config.cloud_sql_server_cluster_name, Config.cloud_sql_server_cluster_id, "");
if (!response.hasStatus() || !response.getStatus().hasCode()
|| response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
LOG.warn("failed to get cloud cluster due to incomplete response, "
Expand Down Expand Up @@ -416,7 +417,7 @@ private void getCloudObserverFes() {
return;
}
try {
CloudSystemInfoService.updateFrontends(toAdd, toDel);
cloudSystemInfoService.updateFrontends(toAdd, toDel);
} catch (DdlException e) {
LOG.warn("update cloud frontends exception e: {}, msg: {}", e, e.getMessage());
}
Expand All @@ -426,7 +427,7 @@ private void getCloudBackends() {
Map<String, List<Backend>> clusterIdToBackend = cloudSystemInfoService.getCloudClusterIdToBackend();
// RPC to meta service, to get the cluster_id(s) the mysql user can use
// NOTE: rpc args all empty, use cluster_unique_id to get a instance's all cluster info.
Cloud.GetClusterResponse response = CloudSystemInfoService.getCloudCluster("", "", "");
Cloud.GetClusterResponse response = cloudSystemInfoService.getCloudCluster("", "", "");
if (!response.hasStatus() || !response.getStatus().hasCode()
|| (response.getStatus().getCode() != Cloud.MetaServiceCode.OK
&& response.getStatus().getCode() != MetaServiceCode.CLUSTER_NOT_FOUND)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ public static String genFeNodeNameFromMeta(String host, int port, long timeMs) {

private Cloud.NodeInfoPB getLocalTypeFromMetaService() {
// get helperNodes from ms
Cloud.GetClusterResponse response = CloudSystemInfoService.getCloudCluster(
Config.cloud_sql_server_cluster_name, Config.cloud_sql_server_cluster_id, "");
Cloud.GetClusterResponse response = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getCloudCluster(Config.cloud_sql_server_cluster_name, Config.cloud_sql_server_cluster_id, "");
if (!response.hasStatus() || !response.getStatus().hasCode()
|| response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
LOG.warn("failed to get cloud cluster due to incomplete response, "
Expand Down Expand Up @@ -392,7 +392,7 @@ public void checkCloudClusterPriv(String clusterName) throws DdlException {

public void changeCloudCluster(String clusterName, ConnectContext ctx) throws DdlException {
checkCloudClusterPriv(clusterName);
CloudSystemInfoService.waitForAutoStart(clusterName);
((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStart(clusterName);
try {
((CloudSystemInfoService) Env.getCurrentSystemInfo()).addCloudCluster(clusterName, "");
} catch (UserException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private long getBackendIdImpl(String cluster) {

// if cluster is SUSPENDED, wait
try {
CloudSystemInfoService.waitForAutoStart(cluster);
((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStart(cluster);
} catch (DdlException e) {
// this function can't throw an exception, so just log it
LOG.warn("cant resume cluster {}", cluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
Expand All @@ -33,14 +34,14 @@ public CloudLoadManager(LoadJobScheduler loadJobScheduler) {

@Override
public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException, UserException {
CloudSystemInfoService.waitForAutoStartCurrentCluster();
((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStartCurrentCluster();

return super.createLoadJobFromStmt(stmt);
}

@Override
public long createLoadJobFromStmt(InsertStmt stmt) throws DdlException {
CloudSystemInfoService.waitForAutoStartCurrentCluster();
((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStartCurrentCluster();

return super.createLoadJobFromStmt(stmt);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public Pair<Map<Tag, List<Long>>, TStorageMedium> selectBackendIdsForReplicaCrea
* @param clusterId cluster id
* @return
*/
public static Cloud.GetClusterResponse getCloudCluster(String clusterName, String clusterId, String userName) {
public Cloud.GetClusterResponse getCloudCluster(String clusterName, String clusterId, String userName) {
Cloud.GetClusterRequest.Builder builder = Cloud.GetClusterRequest.newBuilder();
builder.setCloudUniqueId(Config.cloud_unique_id)
.setClusterName(clusterName).setClusterId(clusterId).setMysqlUserName(userName);
Expand Down Expand Up @@ -261,8 +261,8 @@ public void updateCloudClusterMap(List<Backend> toAdd, List<Backend> toDel) {
}


public static synchronized void updateFrontends(List<Frontend> toAdd,
List<Frontend> toDel) throws DdlException {
public synchronized void updateFrontends(List<Frontend> toAdd, List<Frontend> toDel)
throws DdlException {
if (LOG.isDebugEnabled()) {
LOG.debug("updateCloudFrontends toAdd={} toDel={}", toAdd, toDel);
}
Expand Down Expand Up @@ -570,7 +570,7 @@ public void setInstanceStatus(InstanceInfoPB.Status instanceStatus) {
this.instanceStatus = instanceStatus;
}

public static void waitForAutoStartCurrentCluster() throws DdlException {
public void waitForAutoStartCurrentCluster() throws DdlException {
ConnectContext context = ConnectContext.get();
if (context != null) {
String cloudCluster = context.getCloudCluster();
Expand All @@ -580,7 +580,7 @@ public static void waitForAutoStartCurrentCluster() throws DdlException {
}
}

public static String getClusterNameAutoStart(final String clusterName) {
public String getClusterNameAutoStart(final String clusterName) {
if (!Strings.isNullOrEmpty(clusterName)) {
return clusterName;
}
Expand All @@ -607,7 +607,7 @@ public static String getClusterNameAutoStart(final String clusterName) {
return cloudClusterTypeAndName.clusterName;
}

public static void waitForAutoStart(String clusterName) throws DdlException {
public void waitForAutoStart(String clusterName) throws DdlException {
if (Config.isNotCloudMode()) {
return;
}
Expand All @@ -616,7 +616,7 @@ public static void waitForAutoStart(String clusterName) throws DdlException {
LOG.warn("auto start in cloud mode, but clusterName empty {}", clusterName);
return;
}
String clusterStatus = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudStatusByName(clusterName);
String clusterStatus = getCloudStatusByName(clusterName);
if (Strings.isNullOrEmpty(clusterStatus)) {
// for cluster rename or cluster dropped
LOG.warn("cant find clusterStatus in fe, clusterName {}", clusterName);
Expand All @@ -631,8 +631,7 @@ public static void waitForAutoStart(String clusterName) throws DdlException {
builder.setOp(Cloud.AlterClusterRequest.Operation.SET_CLUSTER_STATUS);

Cloud.ClusterPB.Builder clusterBuilder = Cloud.ClusterPB.newBuilder();
clusterBuilder.setClusterId(((CloudSystemInfoService)
Env.getCurrentSystemInfo()).getCloudClusterIdByName(clusterName));
clusterBuilder.setClusterId(getCloudClusterIdByName(clusterName));
clusterBuilder.setClusterStatus(Cloud.ClusterStatus.TO_RESUME);
builder.setCluster(clusterBuilder);

Expand Down Expand Up @@ -671,7 +670,7 @@ public static void waitForAutoStart(String clusterName) throws DdlException {
} catch (InterruptedException e) {
LOG.info("change cluster sleep wait InterruptedException: ", e);
}
clusterStatus = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudStatusByName(clusterName);
clusterStatus = getCloudStatusByName(clusterName);
}
if (retryTime >= retryTimes) {
// auto start timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,8 @@ private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
deadCloudClusterStatus);
if (Strings.isNullOrEmpty(deadCloudClusterStatus)
|| ClusterStatus.valueOf(deadCloudClusterStatus) != ClusterStatus.NORMAL) {
CloudSystemInfoService.waitForAutoStart(deadCloudClusterClusterName);
((CloudSystemInfoService) Env.getCurrentSystemInfo())
.waitForAutoStart(deadCloudClusterClusterName);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class StreamLoadAction implements SuiteAction {
Map<String, String> headers
SuiteContext context
boolean directToBe = false
boolean twoPhaseCommit = false

StreamLoadAction(SuiteContext context) {
this.address = context.getFeHttpAddress()
Expand Down Expand Up @@ -137,6 +138,22 @@ class StreamLoadAction implements SuiteAction {
this.time = time.call()
}

void twoPhaseCommit(boolean twoPhaseCommit) {
this.twoPhaseCommit = twoPhaseCommit;
}

void twoPhaseCommit(Closure<Boolean> twoPhaseCommit) {
this.twoPhaseCommit = twoPhaseCommit.call();
}

// compatible with selectdb case
void isCloud(boolean isCloud) {
}

// compatible with selectdb case
void isCloud(Closure<Boolean> isCloud) {
}

void check(@ClosureParams(value = FromString, options = ["String,Throwable,Long,Long"]) Closure check) {
this.check = check
}
Expand All @@ -156,8 +173,14 @@ class StreamLoadAction implements SuiteAction {
long startTime = System.currentTimeMillis()
def isHttpStream = headers.containsKey("version")
try {
def uri = isHttpStream ? "http://${address.hostString}:${address.port}/api/_http_stream"
: "http://${address.hostString}:${address.port}/api/${db}/${table}/_stream_load"
def uri = ""
if (isHttpStream) {
uri = "http://${address.hostString}:${address.port}/api/_http_stream"
} else if (twoPhaseCommit) {
uri = "http://${address.hostString}:${address.port}/api/${db}/_stream_load_2pc"
} else {
uri = "http://${address.hostString}:${address.port}/api/${db}/${table}/_stream_load"
}
HttpClients.createDefault().withCloseable { client ->
RequestBuilder requestBuilder = prepareRequestHeader(RequestBuilder.put(uri))
HttpEntity httpEntity = prepareHttpEntity(client)
Expand Down Expand Up @@ -362,6 +385,10 @@ class StreamLoadAction implements SuiteAction {
def jsonSlurper = new JsonSlurper()
def parsed = jsonSlurper.parseText(responseText)
String status = parsed.Status
if (twoPhaseCommit) {
status = parsed.status
return status;
}
long txnId = parsed.TxnId
if (!status.equalsIgnoreCase("Publish Timeout")) {
return status;
Expand Down
Loading