diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 09c0416f..6041dbd9 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -90,6 +90,16 @@ public interface ConfigurationOptions {
 
     int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50;
 
+    String DORIS_SINK_MAX_BLOCKING_TIMES = "doris.sink.max.blocking.times";
+    int SINK_MAX_BLOCKING_TIMES_DEFAULT = 1;
+
+    String DORIS_SINK_MAX_BLOCKING_INTERVAL_MS = "doris.sink.max.blocking.interval.ms";
+    int SINK_MAX_BLOCKING_INTERVAL_MS_DEFAULT = 300000;
+
+    String DORIS_SINK_BLOCKING_TRIGGER_KEYS = "doris.sink.block.trigger.keys";
+    String SINK_BLOCKING_TRIGGER_KEYS_DEFAULT = "";
+
+
     /**
      * set types to ignore, split by comma
      * e.g.
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
index 54976a7d..31147c63 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
@@ -170,6 +170,43 @@ private[spark] object Utils {
     finalParams
   }
 
+  @tailrec
+  def retry[R, T <: Throwable : ClassTag](retryTimes: Int, maxBlockTimes: Int, batchInterval: Duration, maxBlockInterval: Duration,
+                                          blockTriggerKeysArray: Array[String], logger: Logger)(f: => R): Try[R] = {
+    assert(retryTimes >= 0)
+    assert(maxBlockTimes >= 0)
+    var currentBlockInterval = batchInterval
+
+    def increaseBackoffTime(): Unit = {
+      currentBlockInterval = Duration.ofNanos(Math.min(batchInterval.toNanos * 2, maxBlockInterval.toNanos))
+    }
+
+    def shouldBlock(exception: String): Boolean = {
+      blockTriggerKeysArray.nonEmpty && blockTriggerKeysArray.exists(exception.contains)
+    }
+
+    val result = Try(f)
+    result match {
+      case Success(result) =>
+        LockSupport.parkNanos(currentBlockInterval.toNanos)
+        Success(result)
+      case Failure(exception: T) if retryTimes > 0 && !shouldBlock(exception.getMessage) =>
+        logger.warn(s"Execution failed caused by: ", exception)
+        logger.warn(s"$retryTimes times retry remaining, the next will be in ${batchInterval.toMillis}ms")
+        LockSupport.parkNanos(batchInterval.toNanos)
+        retry(retryTimes - 1, maxBlockTimes, currentBlockInterval, maxBlockInterval, blockTriggerKeysArray, logger)(f)
+      case Failure(exception: T) if maxBlockTimes > 0 && shouldBlock(exception.getMessage) =>
+        logger.warn(s"Execution failed caused by: ", exception)
+        increaseBackoffTime()
+        logger.warn(s"$maxBlockTimes times write blocking retry remaining, the next will be in ${currentBlockInterval.toMillis}ms")
+        LockSupport.parkNanos(currentBlockInterval.toNanos)
+        retry(retryTimes, maxBlockTimes - 1, currentBlockInterval, maxBlockInterval, blockTriggerKeysArray, logger)(f)
+      case Failure(exception) =>
+        logger.warn(s"Execution failed caused by: ", exception)
+        Failure(exception)
+    }
+  }
+
   @tailrec
   def retry[R, T <: Throwable : ClassTag](retryTimes: Int, interval: Duration, logger: Logger)(f: => R): Try[R] = {
     assert(retryTimes >= 0)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index b278a385..3512c25c 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -46,6 +46,13 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
     ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
   private val batchInterValMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS,
     ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
+  private val maxSinkBlocks: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_BLOCKING_TIMES,
+    ConfigurationOptions.SINK_MAX_BLOCKING_TIMES_DEFAULT)
+  private val blockTriggerKeys: String = settings.getProperty(ConfigurationOptions.DORIS_SINK_BLOCKING_TRIGGER_KEYS,
+    ConfigurationOptions.SINK_BLOCKING_TRIGGER_KEYS_DEFAULT)
+  private val maxBlockInterValMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_BLOCKING_INTERVAL_MS,
+    ConfigurationOptions.SINK_MAX_BLOCKING_INTERVAL_MS_DEFAULT)
+  private val blockTriggerKeysArray: Array[String] = blockTriggerKeys.split(",")
 
   private val enable2PC: Boolean = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_ENABLE_2PC,
     ConfigurationOptions.DORIS_SINK_ENABLE_2PC_DEFAULT);
@@ -72,13 +79,14 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
 
     var resultRdd = dataFrame.queryExecution.toRdd
     val schema = dataFrame.schema
+    val dfColumns = dataFrame.columns
     if (Objects.nonNull(sinkTaskPartitionSize)) {
       resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
     }
     resultRdd.foreachPartition(iterator => {
       while (iterator.hasNext) {
         // do load batch with retries
-        Utils.retry[Int, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
+        Utils.retry[Int, Exception](maxRetryTimes, maxSinkBlocks, Duration.ofMillis(batchInterValMs.toLong), Duration.ofMillis(maxBlockInterValMs.toLong), blockTriggerKeysArray, logger) {
           loadFunc(iterator.asJava, schema)
         } match {
           case Success(txnId) => if (enable2PC) handleLoadSuccess(txnId, preCommittedTxnAcc)