diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml
index 77b37c16..a5a7e7a1 100644
--- a/spark-doris-connector/pom.xml
+++ b/spark-doris-connector/pom.xml
@@ -27,9 +27,10 @@
org.apache.doris
spark-doris-connector-${spark.major.version}_${scala.version}
- 1.0.0-SNAPSHOT
+ ${revision}
Spark Doris Connector
https://doris.apache.org/
+
Apache 2.0 License
@@ -37,16 +38,19 @@
repo
+
scm:git:https://git@github.com/apache/doris-spark-connector.git
scm:git:https://git@github.com/apache/doris-spark-connector.git
scm:git:https://git@github.com/apache/doris-spark-connector.git
HEAD
+
GitHub
https://github.com/apache/doris/issues
+
Dev Mailing List
@@ -61,7 +65,9 @@
commits-unsubscribe@doris.apache.org
+
+ 1.2.0-SNAPSHOT
3.1.2
3.1
2.12
@@ -70,7 +76,7 @@
UTF-8
github
4.1.77.Final
- 2.13.3
+ 2.10.5
1.0.0
@@ -180,6 +186,170 @@
+
+
+
+
+ org.apache.maven.plugins
+ maven-gpg-plugin
+
+
+ sign-artifacts
+ verify
+
+ sign
+
+
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ 3.2.0
+
+
+ add-source
+ generate-sources
+
+ add-source
+
+
+
+
+
+
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+ 3.2.1
+
+
+ scala-compile-first
+ process-resources
+
+ compile
+
+
+
+ scala-test-compile
+ process-test-resources
+
+ testCompile
+
+
+
+
+
+ -feature
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.2.1
+
+
+
+ com.google.code.findbugs:*
+ org.slf4j:*
+
+
+
+
+ org.apache.arrow
+ org.apache.doris.shaded.org.apache.arrow
+
+
+ io.netty
+ org.apache.doris.shaded.io.netty
+
+
+ com.fasterxml.jackson
+ org.apache.doris.shaded.com.fasterxml.jackson
+
+
+ org.apache.commons.codec
+ org.apache.doris.shaded.org.apache.commons.codec
+
+
+ com.google.flatbuffers
+ org.apache.doris.shaded.com.google.flatbuffers
+
+
+ org.apache.thrift
+ org.apache.doris.shaded.org.apache.thrift
+
+
+
+
+
+ package
+
+ shade
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ 8
+
+
+
+ org.codehaus.mojo
+ license-maven-plugin
+ 2.0.0
+
+
+ add-third-party
+
+ add-third-party
+
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
+
+ org.codehaus.mojo
+ license-maven-plugin
+
+
+
+
+
@@ -202,6 +372,7 @@
+
general-env
@@ -218,178 +389,103 @@
+
+
+ apache-release
+
+
+
+
+ org.apache.maven.plugins
+ maven-gpg-plugin
+
+
+ sign-artifacts
+ verify
+
+ sign
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+
+ true
+
+ false
+
+
+
+ attach-javadocs
+
+ jar
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+
+ true
+
+
+
+ compile
+
+ jar
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-release-plugin
+
+ ${releaseArgs}
+
+
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+ 3.0.0-M1
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+
+
+
+ org.apache.maven.plugins
+ maven-gpg-plugin
+
+
+
+ org.apache.maven.plugins
+ maven-release-plugin
+
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+
+
+
+
-
-
-
- org.apache.maven.plugins
- maven-gpg-plugin
-
-
- sign-artifacts
- verify
-
- sign
-
-
-
-
-
-
- org.codehaus.mojo
- build-helper-maven-plugin
- 3.2.0
-
-
- add-source
- generate-sources
-
- add-source
-
-
-
-
-
-
-
-
-
-
-
-
- net.alchim31.maven
- scala-maven-plugin
- 3.2.1
-
-
- scala-compile-first
- process-resources
-
- compile
-
-
-
- scala-test-compile
- process-test-resources
-
- testCompile
-
-
-
-
-
- -feature
-
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
- 3.2.1
-
-
-
- com.google.code.findbugs:*
- org.slf4j:*
-
-
-
-
- org.apache.arrow
- org.apache.doris.shaded.org.apache.arrow
-
-
- io.netty
- org.apache.doris.shaded.io.netty
-
-
- com.fasterxml.jackson
- org.apache.doris.shaded.com.fasterxml.jackson
-
-
- org.apache.commons.codec
- org.apache.doris.shaded.org.apache.commons.codec
-
-
- com.google.flatbuffers
- org.apache.doris.shaded.com.google.flatbuffers
-
-
- org.apache.thrift
- org.apache.doris.shaded.org.apache.thrift
-
-
-
-
-
- package
-
- shade
-
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
-
-
- 8
-
-
-
- org.apache.maven.plugins
- maven-javadoc-plugin
-
- true
-
- false
-
-
-
- attach-javadocs
-
- jar
-
-
-
-
-
- org.apache.maven.plugins
- maven-source-plugin
-
- true
-
-
-
- compile
-
- jar
-
-
-
-
-
- org.codehaus.mojo
- license-maven-plugin
- 2.0.0
-
-
- add-third-party
-
- add-third-party
-
-
-
-
-
- org.apache.maven.plugins
- maven-release-plugin
-
- ${releaseArgs}
-
-
-
-
-
+
\ No newline at end of file
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index a3f40611..a93e7fe0 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -90,4 +90,16 @@ public interface ConfigurationOptions {
int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50;
+
+
+
+ /*
+ Submit at most one StreamLoad task per partition, so that if a Spark task
+ fails and is retried, the same batch of data is not submitted to Doris
+ through repeated StreamLoad tasks.
+ */
+ String DORIS_SINK_PER_PARTITION_TASK_ATOMICITY = "doris.sink.per.partition.task.atomicity";
+
+ boolean DORIS_SINK_PER_PARTITION_TASK_ATOMICITY_DEFAULT = false;
+
}
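
For reference, a minimal batch-write sketch showing where the new switch plugs in. Only `doris.sink.per.partition.task.atomicity` comes from this change; the format name, the other option keys, and all connection values are the usual connector options with placeholder values.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

// Hedged sketch, not part of this patch: assumes the connector is on the classpath
// and that example_db.example_table already exists with a matching schema.
object PerPartitionAtomicitySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("doris-sink-sketch").getOrCreate()
    val df = spark.range(0, 1000).toDF("id")
    df.write
      .format("doris")
      .option("doris.fenodes", "fe_host:8030")                      // placeholder FE address
      .option("doris.table.identifier", "example_db.example_table") // placeholder target table
      .option("user", "root")                                       // placeholder credentials
      .option("password", "")
      // new in this change: one StreamLoad task per partition, so a retried
      // Spark task does not re-submit batches that were already loaded
      .option("doris.sink.per.partition.task.atomicity", "true")
      .mode(SaveMode.Append)
      .save()
    spark.stop()
  }
}
```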
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java
similarity index 90%
rename from spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
rename to spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java
index 1d891261..d3dab491 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java
@@ -15,17 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.spark;
+package org.apache.doris.spark.load;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
import org.apache.doris.spark.cfg.SparkSettings;
-import org.apache.doris.spark.exception.DorisException;
-
-import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
similarity index 96%
rename from spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
rename to spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 6738c099..61379e36 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -14,15 +14,8 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.spark;
+package org.apache.doris.spark.load;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.SparkSettings;
import org.apache.doris.spark.exception.StreamLoadException;
@@ -30,6 +23,14 @@
import org.apache.doris.spark.rest.models.BackendV2;
import org.apache.doris.spark.rest.models.RespContent;
import org.apache.doris.spark.util.ListUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
@@ -45,10 +46,17 @@
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
-import java.sql.Date;
import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -73,13 +81,11 @@ public class DorisStreamLoad implements Serializable {
private String tbl;
private String authEncoded;
private String columns;
- private String[] dfColumns;
private String maxFilterRatio;
private Map<String, String> streamLoadProp;
private static final long cacheExpireTimeout = 4 * 60;
private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
private final String fileType;
- private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
public DorisStreamLoad(SparkSettings settings) {
String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
@@ -101,11 +107,6 @@ public DorisStreamLoad(SparkSettings settings) {
}
}
- public DorisStreamLoad(SparkSettings settings, String[] dfColumns) {
- this(settings);
- this.dfColumns = dfColumns;
- }
-
public String getLoadUrlStr() {
if (StringUtils.isEmpty(loadUrlStr)) {
return "";
@@ -168,7 +169,7 @@ public String listToString(List<List<Object>> rows) {
}
- public void loadV2(List<List<Object>> rows) throws StreamLoadException, JsonProcessingException {
+ public void loadV2(List<List<Object>> rows, String[] dfColumns) throws StreamLoadException, JsonProcessingException {
if (fileType.equals("csv")) {
load(listToString(rows));
} else if(fileType.equals("json")) {
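
The signature change above means the DataFrame's column names now travel with each call instead of being fixed at construction time, which is what lets CachedDorisStreamLoadClient hand out a single shared loader. A minimal caller-side sketch; the object and method names below are ours, not part of this patch.

```scala
import org.apache.doris.spark.cfg.SparkSettings
import org.apache.doris.spark.load.{CachedDorisStreamLoadClient, DorisStreamLoad}

import scala.collection.JavaConverters._

object LoadV2CallSketch {
  // Illustrative helper: converts a Scala batch to the java.util.List<java.util.List<Object>>
  // shape loadV2 expects and passes the column names per call.
  def loadBatch(settings: SparkSettings, batch: Seq[Seq[AnyRef]], dfColumns: Array[String]): Unit = {
    val loader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
    val rows = batch.map(_.asJava).toList.asJava
    loader.loadV2(rows, dfColumns)
  }
}
```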
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index e469f38a..94fab9e6 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -17,9 +17,9 @@
package org.apache.doris.spark.sql
-import org.apache.doris.spark.DorisStreamLoad
-import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.cfg.SparkSettings
import org.apache.doris.spark.sql.DorisSourceProvider.SHORT_NAME
+import org.apache.doris.spark.writer.DorisWriter
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources._
@@ -28,12 +28,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.slf4j.{Logger, LoggerFactory}
-import java.io.IOException
-import java.time.Duration
-import java.util
-import java.util.Objects
import scala.collection.JavaConverters.mapAsJavaMapConverter
-import scala.util.{Failure, Success}
private[sql] class DorisSourceProvider extends DataSourceRegister
with RelationProvider
@@ -60,58 +55,9 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf)
sparkSettings.merge(Utils.params(parameters, logger).asJava)
// init stream loader
- val dorisStreamLoader = new DorisStreamLoad(sparkSettings, data.columns)
+ val writer = new DorisWriter(sparkSettings)
+ writer.write(data)
- val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
- val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
- val sinkTaskPartitionSize = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
- val sinkTaskUseRepartition = sparkSettings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
- val batchInterValMs = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS, ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
-
- logger.info(s"maxRowCount ${maxRowCount}")
- logger.info(s"maxRetryTimes ${maxRetryTimes}")
- logger.info(s"batchInterVarMs ${batchInterValMs}")
-
- var resultRdd = data.rdd
- if (Objects.nonNull(sinkTaskPartitionSize)) {
- resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
- }
-
- resultRdd.foreachPartition(partition => {
- val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
- partition.foreach(row => {
- val line: util.List[Object] = new util.ArrayList[Object]()
- for (i <- 0 until row.size) {
- val field = row.get(i)
- line.add(field.asInstanceOf[AnyRef])
- }
- rowsBuffer.add(line)
- if (rowsBuffer.size > maxRowCount - 1 ) {
- flush()
- }
- })
- // flush buffer
- if (!rowsBuffer.isEmpty) {
- flush()
- }
-
- /**
- * flush data to Doris and do retry when flush error
- *
- */
- def flush(): Unit = {
- Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
- dorisStreamLoader.loadV2(rowsBuffer)
- rowsBuffer.clear()
- } match {
- case Success(_) =>
- case Failure(e) =>
- throw new IOException(
- s"Failed to load $maxRowCount batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", e)
- }
- }
-
- })
new BaseRelation {
override def sqlContext: SQLContext = unsupportedException
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index 46448205..342e940e 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -17,69 +17,27 @@
package org.apache.doris.spark.sql
-import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
-import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
-import org.apache.spark.rdd.RDD
+import org.apache.doris.spark.cfg.SparkSettings
+import org.apache.doris.spark.writer.DorisWriter
import org.apache.spark.sql.execution.streaming.Sink
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext}
import org.slf4j.{Logger, LoggerFactory}
-import java.io.IOException
-import java.time.Duration
-import java.util
-import java.util.Objects
-import scala.collection.JavaConverters._
-import scala.util.{Failure, Success}
-
private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSettings) extends Sink with Serializable {
private val logger: Logger = LoggerFactory.getLogger(classOf[DorisStreamLoadSink].getName)
@volatile private var latestBatchId = -1L
- val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
- val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
- val sinkTaskPartitionSize = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
- val sinkTaskUseRepartition = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
- val batchInterValMs = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS, ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
- val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
+ private val writer = new DorisWriter(settings)
override def addBatch(batchId: Long, data: DataFrame): Unit = {
if (batchId <= latestBatchId) {
logger.info(s"Skipping already committed batch $batchId")
} else {
- write(data.rdd)
+ writer.write(data)
latestBatchId = batchId
}
}
- def write(rdd: RDD[Row]): Unit = {
- var resultRdd = rdd
- if (Objects.nonNull(sinkTaskPartitionSize)) {
- resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
- }
- resultRdd
- .map(_.toSeq.map(_.asInstanceOf[AnyRef]).toList.asJava)
- .foreachPartition(partition => {
- partition
- .grouped(batchSize)
- .foreach(batch => flush(batch))
- })
-
- /**
- * flush data to Doris and do retry when flush error
- *
- */
- def flush(batch: Iterable[util.List[Object]]): Unit = {
- Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
- dorisStreamLoader.loadV2(batch.toList.asJava)
- } match {
- case Success(_) =>
- case Failure(e) =>
- throw new IOException(
- s"Failed to load batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max $maxRetryTimes retry times.", e)
- }
- }
- }
-
override def toString: String = "DorisStreamLoadSink"
}
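
On the streaming side, addBatch now funnels into the same DorisWriter. A hedged sketch of a query that exercises this sink; the rate source, checkpoint path, and connection values are placeholders, and the source schema would have to match the target table in a real job.

```scala
import org.apache.spark.sql.SparkSession

object DorisStreamSinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("doris-stream-sink-sketch").getOrCreate()
    // placeholder source; substitute a stream whose schema matches the Doris table
    val source = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

    val query = source.writeStream
      .format("doris")
      .option("checkpointLocation", "/tmp/doris-sink-checkpoint")   // placeholder path
      .option("doris.fenodes", "fe_host:8030")                      // placeholder FE address
      .option("doris.table.identifier", "example_db.example_table") // placeholder target table
      .option("user", "root")
      .option("password", "")
      .start()
    query.awaitTermination()
  }
}
```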
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
index ba6fa861..2f3a5bb0 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
@@ -31,7 +31,7 @@ import scala.annotation.tailrec
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}
-private[sql] object Utils {
+private[spark] object Utils {
/**
* quote column name
* @param colName column name
@@ -169,7 +169,9 @@ private[sql] object Utils {
assert(retryTimes >= 0)
val result = Try(f)
result match {
- case Success(result) => Success(result)
+ case Success(result) =>
+ LockSupport.parkNanos(interval.toNanos)
+ Success(result)
case Failure(exception: T) if retryTimes > 0 =>
logger.warn(s"Execution failed caused by: ", exception)
logger.warn(s"$retryTimes times retry remaining, the next will be in ${interval.toMillis}ms")
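
With this change the Success branch also parks for the interval, so consecutive successful flushes are paced by doris.sink.batch.interval.ms rather than only failed attempts being delayed. A standalone sketch of that semantic (illustrative only; the failure-handling branch of the real Utils.retry continues beyond this hunk):

```scala
import java.time.Duration
import java.util.concurrent.locks.LockSupport

import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object PacedRetrySketch {
  // Runs f, retrying up to `retries` times; parks for `interval` after every
  // attempt, including a successful one, to pace the next batch.
  @tailrec
  def retryWithPacing[T](retries: Int, interval: Duration)(f: => T): Try[T] =
    Try(f) match {
      case Success(v) =>
        LockSupport.parkNanos(interval.toNanos) // pacing on success, as in this change
        Success(v)
      case Failure(_) if retries > 0 =>
        LockSupport.parkNanos(interval.toNanos)
        retryWithPacing(retries - 1, interval)(f)
      case failure => failure
    }
}
```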
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
new file mode 100644
index 00000000..66f58c60
--- /dev/null
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.writer
+
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.load.{CachedDorisStreamLoadClient, DorisStreamLoad}
+import org.apache.doris.spark.sql.Utils
+import org.apache.spark.sql.DataFrame
+import org.slf4j.{Logger, LoggerFactory}
+
+import java.io.IOException
+import java.time.Duration
+import java.util
+import java.util.Objects
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success}
+
+class DorisWriter(settings: SparkSettings) extends Serializable {
+
+ private val logger: Logger = LoggerFactory.getLogger(classOf[DorisWriter])
+
+ val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE,
+ ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
+ private val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES,
+ ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
+ private val sinkTaskPartitionSize: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
+ private val sinkTaskUseRepartition: Boolean = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION,
+ ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
+ private val batchInterValMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS,
+ ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
+ val partitionTaskAtomicity = settings.getProperty(ConfigurationOptions.DORIS_SINK_PER_PARTITION_TASK_ATOMICITY,
+ ConfigurationOptions.DORIS_SINK_PER_PARTITION_TASK_ATOMICITY_DEFAULT.toString).toBoolean
+ private val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
+
+ def write(dataFrame: DataFrame): Unit = {
+ var resultRdd = dataFrame.rdd
+ val dfColumns = dataFrame.columns
+ if (Objects.nonNull(sinkTaskPartitionSize)) {
+ resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
+ }
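+ // When per-partition atomicity is enabled, the whole partition is flushed through a
+ // single StreamLoad task; otherwise rows are grouped into batches of batchSize and
+ // flushed batch by batch.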
+ if (partitionTaskAtomicity) {
+ resultRdd
+ .map(_.toSeq.map(_.asInstanceOf[AnyRef]).toList.asJava)
+ .foreachPartition(partition => {
+ flush(partition.toIterable, dfColumns)
+ })
+ } else {
+ resultRdd
+ .map(_.toSeq.map(_.asInstanceOf[AnyRef]).toList.asJava)
+ .foreachPartition(partition => {
+ partition
+ .grouped(batchSize)
+ .foreach(batch => flush(batch, dfColumns))
+ })
+ }
+
+ /**
+ * Flush a batch of rows to Doris and retry when the flush fails.
+ *
+ */
+ def flush(batch: Iterable[util.List[Object]], dfColumns: Array[String]): Unit = {
+ Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
+ dorisStreamLoader.loadV2(batch.toList.asJava, dfColumns)
+ } match {
+ case Success(_) =>
+ case Failure(e) =>
+ throw new IOException(
+ s"Failed to load batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", e)
+ }
+ }
+
+ }
+
+
+}
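
Taken together, both the batch relation and the streaming sink now reduce to constructing a DorisWriter and calling write. A hedged sketch of driving it directly; the merged option keys and values are placeholders for the usual connector settings, and merge is used the same way DorisSourceProvider uses it above.

```scala
import org.apache.doris.spark.cfg.SparkSettings
import org.apache.doris.spark.writer.DorisWriter
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._

object DorisWriterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("doris-writer-sketch").getOrCreate()
    val settings = new SparkSettings(spark.sparkContext.getConf)
    // placeholder connector options, merged the same way DorisSourceProvider merges them
    settings.merge(Map(
      "doris.fenodes" -> "fe_host:8030",
      "doris.table.identifier" -> "example_db.example_table",
      "user" -> "root",
      "password" -> ""
    ).asJava)
    new DorisWriter(settings).write(spark.range(0, 10).toDF("id"))
    spark.stop()
  }
}
```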