Skip to content

Commit

Permalink
[refactor](stream-load) Adjust the content of the review Added direct…
Browse files Browse the repository at this point in the history
… load via benodes (apache#177)
  • Loading branch information
MYiYang authored Jan 4, 2024
1 parent 6ed3c61 commit 95c278d
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
public interface ConfigurationOptions {
// doris fe node address
String DORIS_FENODES = "doris.fenodes";

String DORIS_BENODES = "doris.benodes";
String DORIS_QUERY_PORT = "doris.query.port";

String DORIS_DEFAULT_CLUSTER = "default_cluster";
Expand Down Expand Up @@ -124,4 +126,7 @@ public interface ConfigurationOptions {
*/
String DORIS_SINK_AUTO_REDIRECT = "doris.sink.auto-redirect";
boolean DORIS_SINK_AUTO_REDIRECT_DEFAULT = false;
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public class DorisStreamLoad implements Serializable {
private static final long cacheExpireTimeout = 4 * 60;
private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
private final String fenodes;
private final String benodes;
private final DataFormat dataFormat;
private String FIELD_DELIMITER;
private final String LINE_DELIMITER;
Expand All @@ -112,6 +113,7 @@ public DorisStreamLoad(SparkSettings settings) {
this.db = dbTable[0];
this.tbl = dbTable[1];
this.fenodes = settings.getProperty(ConfigurationOptions.DORIS_FENODES);
this.benodes = settings.getProperty(ConfigurationOptions.DORIS_BENODES);
String user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
String passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
this.authEncoded = getAuthEncoded(user, passwd);
Expand Down Expand Up @@ -421,7 +423,8 @@ private String getBackend() {
return backend.getIp() + ":" + backend.getHttpPort();
} catch (ExecutionException e) {
throw new RuntimeException("get backends info fail", e);
} catch (IllegalArgumentException e) {
}
catch (IllegalArgumentException e) {
throw new RuntimeException("get frontend info fail", e);
}
}
Expand All @@ -444,9 +447,8 @@ public BackendCacheLoader(SparkSettings settings) {

@Override
public List<BackendV2.BackendRowV2> load(String key) throws Exception {
return RestService.getBackendRows(settings, LOG);
return RestService.getBackendRows(settings, LOG);
}

}

private String generateLoadLabel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_MIN;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLE_IDENTIFIER;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_BENODES;
import static org.apache.doris.spark.util.ErrorMessages.CONNECT_FAILED_MESSAGE;
import static org.apache.doris.spark.util.ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE;
import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;
Expand Down Expand Up @@ -517,10 +518,31 @@ static int tabletCountLimitForOnePartition(Settings cfg, Logger logger) {
@Deprecated
@VisibleForTesting
public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException {
return getBackend(sparkSettings, logger);
}

@Deprecated
@VisibleForTesting
public static String beBackend(SparkSettings sparkSettings , Logger logger) throws DorisException {
return getBackend(sparkSettings, logger);
}

/**
* choice a Doris BE node to request.
* @param logger slf4j logger
* @return the chosen one Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
*/
@VisibleForTesting
public static String randomBackendV2(SparkSettings sparkSettings, Logger logger) throws DorisException {
return getBackend(sparkSettings, logger);
}

private static String getBackend(SparkSettings sparkSettings, Logger logger) throws DorisException {
List<BackendV2.BackendRowV2> backends = getBackendRows(sparkSettings, logger);
Collections.shuffle(backends);
BackendV2.BackendRowV2 backend = backends.get(0);
return backend.getIp()+ ":" + backend.getHttpPort();
return backend.getIp() + ":" + backend.getHttpPort();
}

/**
Expand Down Expand Up @@ -561,48 +583,72 @@ static List<BackendRow> parseBackend(String response, Logger logger) throws Dori
}

/**
* get Doris BE node list.
* get Doris BE nodes.
* @param logger slf4j logger
* @return the Doris BE node list
* @return the Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
*/
@VisibleForTesting
public static List<BackendV2.BackendRowV2> getBackendRows(SparkSettings sparkSettings, Logger logger) throws DorisException {
List<String> feNodeList = allEndpoints(sparkSettings.getProperty(DORIS_FENODES), logger);
for (String feNode : feNodeList){
public static List<BackendV2.BackendRowV2> getBeNodes(SparkSettings sparkSettings, Logger logger) throws DorisException {
List<String> backends = allBeEndpoints(sparkSettings.getProperty(DORIS_BENODES),logger);
List<BackendV2.BackendRowV2> backendRowV2s = new ArrayList<BackendV2.BackendRowV2>();
if (backends == null || backends.isEmpty()) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "benodes", backends);
throw new IllegalArgumentException("benodes", String.valueOf(backends));
}
BackendV2.BackendRowV2 backendRowV2 = new BackendV2.BackendRowV2();
for (int i = 0; i < backends.size(); i++) {
String ip = backends.get(i).substring(0,backends.get(i).indexOf(":"));
try {
String beUrl = String.format("http://%s" + BACKENDS_V2, feNode);
HttpGet httpGet = new HttpGet(beUrl);
String response = send(sparkSettings, httpGet, logger);
logger.info("Backend Info:{}", response);
List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
logger.trace("Parse beNodes '{}'.", backends);
if (backends == null || backends.isEmpty()) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
throw new IllegalArgumentException("beNodes", String.valueOf(backends));
}
return backends;
} catch (ConnectedFailedException e) {
logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage());
Integer port = Integer.valueOf(backends.get(i).substring(backends.get(i).indexOf(":")+1,backends.get(i).length()));
/**
* By default, the BE port you enter is is_alive=true
*/
BackendV2.BackendRowV2 backend = backendRowV2.of(ip,port,true);
backendRowV2s.add(backend);
} catch (NumberFormatException e) {
logger.error("Doris BE is port error, please check configuration");
throw new RuntimeException(e);
}
}
String errMsg = "No Doris FE is available, please check configuration";
logger.error(errMsg);
throw new DorisException(errMsg);
return backendRowV2s;
}

/**
* choice a Doris BE node to request.
* get Doris BE node list.
* @param logger slf4j logger
* @return the chosen one Doris BE node
* @return the Doris BE node list
* @throws IllegalArgumentException BE nodes is illegal
*/
@VisibleForTesting
public static String randomBackendV2(SparkSettings sparkSettings, Logger logger) throws DorisException {
List<BackendV2.BackendRowV2> backends = getBackendRows(sparkSettings, logger);
Collections.shuffle(backends);
BackendV2.BackendRowV2 backend = backends.get(0);
return backend.getIp() + ":" + backend.getHttpPort();
public static List<BackendV2.BackendRowV2> getBackendRows(SparkSettings sparkSettings, Logger logger) throws DorisException {
/**
* If the specified BE does not exist, the FE mode is used
*/
if(notBeNode(sparkSettings,logger)){
List<String> feNodeList = allEndpoints(sparkSettings.getProperty(DORIS_FENODES), logger);
for (String feNode : feNodeList){
try {
String beUrl = String.format("http://%s" + BACKENDS_V2, feNode);
HttpGet httpGet = new HttpGet(beUrl);
String response = send(sparkSettings, httpGet, logger);
logger.info("Backend Info:{}", response);
List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
logger.trace("Parse benodes '{}'.", backends);
if (backends == null || backends.isEmpty()) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "benodes", backends);
throw new IllegalArgumentException("benodes", String.valueOf(backends));
}
return backends;
} catch (ConnectedFailedException e) {
logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage());
}
}
String errMsg = "No Doris FE is available, please check configuration";
logger.error(errMsg);
throw new DorisException(errMsg);
}else {
return getBeNodes(sparkSettings, logger);
}
}

static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) throws DorisException {
Expand Down Expand Up @@ -646,7 +692,7 @@ static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logge
*/
@VisibleForTesting
static List<PartitionDefinition> tabletsMapToPartition(Settings cfg, Map<String, List<Long>> be2Tablets,
String opaquedQueryPlan, String database, String table, Logger logger)
String opaquedQueryPlan, String database, String table, Logger logger)
throws IllegalArgumentException {
int tabletsSize = tabletCountLimitForOnePartition(cfg, logger);
List<PartitionDefinition> partitions = new ArrayList<>();
Expand Down Expand Up @@ -689,4 +735,41 @@ static List<String> allEndpoints(String feNodes, Logger logger) throws IllegalAr
Collections.shuffle(nodes);
return nodes;
}
}

/**
* choice a Doris BE node to request.
*
* @param beNodes Doris BE node list, separate be comma
* @param logger slf4j logger
* @return the array of Doris FE nodes
* @throws IllegalArgumentException fe nodes is illegal
*/
@VisibleForTesting
static List<String> allBeEndpoints(String beNodes, Logger logger) throws IllegalArgumentException {
logger.trace("Parse benodes '{}'.", beNodes);
if (StringUtils.isEmpty(beNodes)) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "benodes", beNodes);
throw new IllegalArgumentException("benodes", beNodes);
}
List<String> nodes = Arrays.stream(beNodes.split(",")).map(String::trim).collect(Collectors.toList());
Collections.shuffle(nodes);
return nodes;
}


/**
* Doris BE node is not
* @param logger slf4j logger
* @return Doris BE node Yes or no
*/
public static Boolean notBeNode(SparkSettings sparkSettings, Logger logger){
String beNodes = sparkSettings.getProperty(DORIS_BENODES);
if(null == beNodes){
return true;
}else {
return false;
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
@JsonIgnoreProperties(ignoreUnknown = true)
public class BackendV2 {


@JsonProperty(value = "backends")
private List<BackendRowV2> backends;

Expand All @@ -37,7 +38,11 @@ public void setRows(List<BackendRowV2> rows) {
this.backends = rows;
}

public static class BackendRowV2 {
public static class BackendRowV2 implements Cloneable{

public BackendRowV2(){
super();
}
@JsonProperty("ip")
public String ip;
@JsonProperty("http_port")
Expand Down Expand Up @@ -68,5 +73,12 @@ public boolean isAlive() {
public void setAlive(boolean alive) {
isAlive = alive;
}
public static BackendRowV2 of(String ip, int httpPort, boolean alive) {
BackendRowV2 rowV2 = new BackendRowV2();
rowV2.setIp(ip);
rowV2.setHttpPort(httpPort);
rowV2.setAlive(alive);
return rowV2;
}
}
}
}

0 comments on commit 95c278d

Please sign in to comment.