diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml
index 023607af..4c8644c5 100644
--- a/spark-doris-connector/pom.xml
+++ b/spark-doris-connector/pom.xml
@@ -26,7 +26,7 @@
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.apache.doris</groupId>
-   <artifactId>spark-doris-connector-${spark.minor.version}_${scala.version}</artifactId>
+   <artifactId>spark-doris-connector-${spark.major.version}_${scala.version}</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <name>Spark Doris Connector</name>
    <url>https://doris.apache.org/</url>
@@ -64,7 +64,7 @@
        <scala.version>${env.scala.version}</scala.version>
        <spark.version>${env.spark.version}</spark.version>
-        <spark.minor.version>${env.spark.minor.version}</spark.minor.version>
+        <spark.major.version>${env.spark.major.version}</spark.major.version>
        <libthrift.version>0.13.0</libthrift.version>
        <arrow.version>5.0.0</arrow.version>
        <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
@@ -285,6 +285,7 @@
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
+                           <goal>add-source</goal>
                        </goals>
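Note: a Spark artifact suffix such as 3.1 names the Spark major.minor release line, so spark.major.version describes the coordinate more accurately than spark.minor.version did. Assuming Spark 3.1.x and Scala 2.12 at build time, the renamed property would yield the coordinate spark-doris-connector-3.1_2.12 (illustrative values, not pinned by this patch).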
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index f375c769..67684aaf 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -47,6 +47,8 @@
import java.util.Calendar;
import java.util.Properties;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
/**
* DorisStreamLoad
@@ -79,7 +81,7 @@ public DorisStreamLoad(String hostPort, String db, String tbl, String user, Stri
this.user = user;
this.passwd = passwd;
this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
- this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+ this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(UTF_8));
}
public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException {
@@ -91,7 +93,7 @@ public DorisStreamLoad(SparkSettings settings) throws IOException, DorisExceptio
this.user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
- this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+ this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(UTF_8));
this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
@@ -112,7 +114,7 @@ public DorisStreamLoad(SparkSettings settings, String[] dfColumns) throws IOExce
this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
- this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+ this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(UTF_8));
this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
this.dfColumns = dfColumns;
@@ -257,20 +259,21 @@ private LoadResponse loadBatch(String value) {
HttpURLConnection feConn = null;
HttpURLConnection beConn = null;
+ BufferedReader br = null;
int status = -1;
try {
// build request and send to new be location
beConn = getConnection(loadUrlStr, label);
// send data to be
BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());
- bos.write(value.getBytes("UTF-8"));
+ bos.write(value.getBytes(UTF_8));
bos.close();
// get respond
status = beConn.getResponseCode();
String respMsg = beConn.getResponseMessage();
InputStream stream = (InputStream) beConn.getContent();
- BufferedReader br = new BufferedReader(new InputStreamReader(stream));
+ br = new BufferedReader(new InputStreamReader(stream, UTF_8));
StringBuilder response = new StringBuilder();
String line;
while ((line = br.readLine()) != null) {
@@ -290,6 +293,13 @@ private LoadResponse loadBatch(String value) {
if (beConn != null) {
beConn.disconnect();
}
+ if (br != null) {
+ try {
+ br.close();
+ } catch (IOException e) {
+ LOG.warn("Exception occurred while closing BufferedReader: {}", e.getMessage());
+ }
+ }
}
}
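Note: the Base64/UTF_8 expression repeated across the three constructors builds the HTTP Basic auth token, and the new finally branch guarantees the response reader is released on every path (the BufferedOutputStream above it is still closed only on the success path, which the same pattern could cover). A minimal Scala sketch of both patterns follows; basicAuth and readAll are illustrative names, not connector API:

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64

object StreamLoadPatterns {
  // Credential token as DorisStreamLoad builds it: base64("user:passwd") over UTF-8 bytes.
  def basicAuth(user: String, passwd: String): String =
    Base64.getEncoder.encodeToString(s"$user:$passwd".getBytes(UTF_8))

  // Drain a response stream with an explicit charset; try/finally closes the
  // reader on success and failure alike, as the added finally block does by hand.
  def readAll(stream: InputStream): String = {
    val br = new BufferedReader(new InputStreamReader(stream, UTF_8))
    try Iterator.continually(br.readLine()).takeWhile(_ != null).mkString
    finally br.close()
  }
}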
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index 03643b2e..cd262f6c 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -31,10 +31,9 @@ import org.apache.doris.spark.rest.PartitionDefinition
import org.apache.doris.spark.rest.models.Schema
import org.apache.doris.spark.serialization.{Routing, RowBatch}
import org.apache.doris.spark.sql.SchemaUtils
-import org.apache.doris.spark.util.ErrorMessages
import org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE
import org.apache.doris.thrift.{TScanCloseParams, TScanNextBatchParams, TScanOpenParams, TScanOpenResult}
-import org.apache.log4j.Logger
+import org.apache.spark.internal.Logging
import scala.util.control.Breaks
@@ -43,8 +42,7 @@ import scala.util.control.Breaks
* @param partition Doris RDD partition
* @param settings request configuration
*/
-class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
- private val logger = Logger.getLogger(classOf[ScalaValueReader])
+class ScalaValueReader(partition: PartitionDefinition, settings: Settings) extends Logging {
protected val client = new BackendClient(new Routing(partition.getBeAddress), settings)
protected val clientLock =
@@ -57,7 +55,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
protected lazy val deserializeArrowToRowBatchAsync: Boolean = Try {
settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC, DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString).toBoolean
} getOrElse {
- logger.warn(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE, DORIS_DESERIALIZE_ARROW_ASYNC, settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC))
+ logWarning(s"Parse '${DORIS_DESERIALIZE_ARROW_ASYNC}' to boolean failed. Original string is '${settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC)}'." )
DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT
}
@@ -65,7 +63,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
val blockingQueueSize = Try {
settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE, DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT.toString).toInt
} getOrElse {
- logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_DESERIALIZE_QUEUE_SIZE, settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE))
+ logWarning(s"Parse '${DORIS_DESERIALIZE_QUEUE_SIZE}' to number failed. Original string is '${settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE)}'.")
DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT
}
@@ -89,21 +87,21 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
val batchSize = Try {
settings.getProperty(DORIS_BATCH_SIZE, DORIS_BATCH_SIZE_DEFAULT.toString).toInt
} getOrElse {
- logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_BATCH_SIZE, settings.getProperty(DORIS_BATCH_SIZE))
+ logWarning(s"Parse '${DORIS_BATCH_SIZE}' to number failed. Original string is '${settings.getProperty(DORIS_BATCH_SIZE)}'." )
DORIS_BATCH_SIZE_DEFAULT
}
val queryDorisTimeout = Try {
settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S, DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT.toString).toInt
} getOrElse {
- logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_REQUEST_QUERY_TIMEOUT_S, settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S))
+ logWarning(s"Parse '${DORIS_REQUEST_QUERY_TIMEOUT_S}' to number failed. Original string is '${settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S)}'.")
DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT
}
val execMemLimit = Try {
settings.getProperty(DORIS_EXEC_MEM_LIMIT, DORIS_EXEC_MEM_LIMIT_DEFAULT.toString).toLong
} getOrElse {
- logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_EXEC_MEM_LIMIT, settings.getProperty(DORIS_EXEC_MEM_LIMIT))
+ logWarning(s"Parse '${DORIS_EXEC_MEM_LIMIT}' to number failed. Original string is '${settings.getProperty(DORIS_EXEC_MEM_LIMIT)}'.")
DORIS_EXEC_MEM_LIMIT_DEFAULT
}
@@ -113,7 +111,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
params.setUser(settings.getProperty(DORIS_REQUEST_AUTH_USER, ""))
params.setPasswd(settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, ""))
- logger.debug(s"Open scan params is, " +
+ logDebug(s"Open scan params is, " +
s"cluster: ${params.getCluster}, " +
s"database: ${params.getDatabase}, " +
s"table: ${params.getTable}, " +
@@ -159,7 +157,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
started
}
- logger.debug(s"Open scan result is, contextId: $contextId, schema: $schema.")
+ logDebug(s"Open scan result is, contextId: $contextId, schema: $schema.")
/**
* read data and cached in rowBatch.
@@ -213,7 +211,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
*/
def next: AnyRef = {
if (!hasNext) {
- logger.error(SHOULD_NOT_HAPPEN_MESSAGE)
+ logError(SHOULD_NOT_HAPPEN_MESSAGE)
throw new ShouldNeverHappenException
}
rowBatch.next
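Note: org.apache.spark.internal.Logging mixes in logDebug/logWarning/logError with by-name message parameters, so the interpolated string is only built when that level is enabled; this is what replaces the removed per-class log4j Logger. A minimal Scala sketch of the same Try { ... } getOrElse { ... } fallback used throughout the reader; the ParseWithFallback class, conf map, and literal key/default are illustrative, not connector API:

import org.apache.spark.internal.Logging
import scala.util.Try

class ParseWithFallback(conf: Map[String, String]) extends Logging {
  private val raw = conf.getOrElse("doris.batch.size", "1024")

  // On an unparsable value, warn through the mixed-in trait and fall back to the
  // default, mirroring how ScalaValueReader derives batchSize and its peers.
  val batchSize: Int = Try(raw.toInt).getOrElse {
    logWarning(s"Parse 'doris.batch.size' to number failed. Original string is '$raw'.")
    1024
  }
}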