From fc5cf994623fc1377d4693407559b854214f7d2e Mon Sep 17 00:00:00 2001
From: codeCooker17
Date: Thu, 6 Jul 2023 10:22:02 +0800
Subject: [PATCH 1/2] add write blocking properties to control write blocking
 retries in the sink.

---
 .../doris/spark/cfg/ConfigurationOptions.java | 10 +++++
 .../org/apache/doris/spark/sql/Utils.scala    | 37 +++++++++++++++++++
 .../doris/spark/writer/DorisWriter.scala      |  9 ++++-
 3 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 09c0416f..0faec661 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -90,6 +90,16 @@ public interface ConfigurationOptions {
 
     int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50;
 
+    String DORIS_SINK_MAX_BLOCKING_TIMES = "doris.sink.max.blocking.times";
+    int SINK_MAX_BLOCKING_TIMES_DEFAULT = 1;
+
+    String DORIS_SINK_MAX_BLOCKING_INTERVAL_MS = "doris.sink.max.blocking.interval.ms";
+    int SINK_MAX_BLOCKING_INTERVAL_MS_DEFAULT = 1000;
+
+    String DORIS_SINK_BLOCKING_TRIGGER_KEYS = "doris.sink.block.trigger.keys";
+    String SINK_BLOCKING_TRIGGER_KEYS_DEFAULT = "-235";
+
+
     /**
      * set types to ignore, split by comma
      * e.g.
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
index 54976a7d..9170cdea 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
@@ -170,6 +170,43 @@ private[spark] object Utils {
     finalParams
   }
 
+  @tailrec
+  def retry[R, T <: Throwable : ClassTag](retryTimes: Int, maxBlockTimes: Int, batchInterval: Duration, maxBlockInterval: Duration,
+                                          blockTriggerKeysArray: Array[String], logger: Logger)(f: => R): Try[R] = {
+    assert(retryTimes >= 0)
+    assert(maxBlockTimes >= 0)
+    var currentBlockInterval = batchInterval
+
+    def increaseBackoffTime(): Unit = {
+      currentBlockInterval = Duration.ofNanos(Math.min(batchInterval.toNanos * 2, maxBlockInterval.toNanos))
+    }
+
+    def shouldBlock(exception: String): Boolean = {
+      blockTriggerKeysArray.exists(exception.contains)
+    }
+
+    val result = Try(f)
+    result match {
+      case Success(result) =>
+        LockSupport.parkNanos(currentBlockInterval.toNanos)
+        Success(result)
+      case Failure(exception: T) if retryTimes > 0 && !shouldBlock(exception.getMessage) =>
+        logger.warn(s"Execution failed caused by: ", exception)
+        logger.warn(s"$retryTimes times retry remaining, the next will be in ${batchInterval.toMillis}ms")
+        LockSupport.parkNanos(batchInterval.toNanos)
+        retry(retryTimes - 1, maxBlockTimes, currentBlockInterval, maxBlockInterval, blockTriggerKeysArray, logger)(f)
+      case Failure(exception: T) if maxBlockTimes > 0 && shouldBlock(exception.getMessage) =>
+        logger.warn(s"Execution failed caused by: ", exception)
+        increaseBackoffTime()
+        logger.warn(s"$maxBlockTimes times write blocking retry remaining, the next will be in ${currentBlockInterval.toMillis}ms")
+        LockSupport.parkNanos(currentBlockInterval.toNanos)
+        retry(retryTimes, maxBlockTimes - 1, currentBlockInterval, maxBlockInterval, blockTriggerKeysArray, logger)(f)
+      case Failure(exception) =>
+        logger.warn(s"Execution failed caused by: ", exception)
+        Failure(exception)
+    }
+  }
+
   @tailrec
   def retry[R, T <: Throwable : ClassTag](retryTimes: Int, interval: Duration, logger: Logger)(f: => R): Try[R] = {
     assert(retryTimes >= 0)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index b278a385..02db0bc7 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -46,6 +46,13 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
     ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
   private val batchInterValMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS,
     ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
+  private val maxSinkBlocks: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_BLOCKING_TIMES,
+    ConfigurationOptions.SINK_MAX_BLOCKING_TIMES_DEFAULT)
+  private val blockTriggerKeys: String = settings.getProperty(ConfigurationOptions.DORIS_SINK_BLOCKING_TRIGGER_KEYS,
+    ConfigurationOptions.SINK_BLOCKING_TRIGGER_KEYS_DEFAULT)
+  private val maxBlockInterValMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_BLOCKING_INTERVAL_MS,
+    ConfigurationOptions.SINK_MAX_BLOCKING_INTERVAL_MS_DEFAULT)
+  private val blockTriggerKeysArray: Array[String] = blockTriggerKeys.split(",")
 
   private val enable2PC: Boolean = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_ENABLE_2PC,
     ConfigurationOptions.DORIS_SINK_ENABLE_2PC_DEFAULT);
@@ -78,7 +85,7 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
     resultRdd.foreachPartition(iterator => {
       while (iterator.hasNext) {
         // do load batch with retries
-        Utils.retry[Int, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
+        Utils.retry[Int, Exception](maxRetryTimes, maxSinkBlocks, Duration.ofMillis(batchInterValMs.toLong), Duration.ofMillis(maxBlockInterValMs.toLong), blockTriggerKeysArray, logger) {
          loadFunc(iterator.asJava, schema)
         } match {
           case Success(txnId) => if (enable2PC) handleLoadSuccess(txnId, preCommittedTxnAcc)
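For review context: the new Utils.retry overload keeps the existing retry budget for ordinary failures and adds a separate blocking budget that is consumed only when the failure message contains one of the configured trigger keys, with the wait doubling on each blocking retry up to the configured cap. Below is a minimal sketch of a call site, not part of this patch: the flakyLoad function and the "-235" trigger key are illustrative, and the object sits in the connector's package because Utils is private[spark].

    package org.apache.doris.spark

    import java.time.Duration
    import org.slf4j.LoggerFactory
    import scala.util.{Failure, Success}
    import org.apache.doris.spark.sql.Utils

    object RetrySketch {
      private val logger = LoggerFactory.getLogger(getClass)

      // Illustrative load that always fails with a -235 style message.
      def flakyLoad(): Int = throw new RuntimeException("stream load failed, err=-235")

      def main(args: Array[String]): Unit = {
        // Up to 3 ordinary retries every 50 ms, plus up to 2 blocking retries
        // whose wait doubles from 50 ms toward a 1 s cap, taken only when the
        // failure message contains "-235".
        Utils.retry[Int, Exception](3, 2, Duration.ofMillis(50), Duration.ofSeconds(1), Array("-235"), logger) {
          flakyLoad()
        } match {
          case Success(txnId) => logger.info(s"batch loaded, txn $txnId")
          case Failure(e) => logger.error("batch failed after all retries", e)
        }
      }
    }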
From 31db70e8a5dc47bfb1df3c94b525ca0c91532eae Mon Sep 17 00:00:00 2001
From: lichuang
Date: Thu, 7 Sep 2023 17:00:29 +0800
Subject: [PATCH 2/2] modify config default values, resolve conflict.
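Raise the default blocking interval cap from 1 s to 5 min and change the default
trigger keys to the empty string, so write blocking is opt-in: with no keys
configured, shouldBlock never matches and only the ordinary retry path runs.
A sketch of a job that opts in (the table identifier, FE address, and
credentials are placeholders):

    import org.apache.spark.sql.SparkSession

    object BlockingSinkExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("doris-sink-blocking").getOrCreate()
        val df = spark.range(10).selectExpr("id", "id * 2 AS value") // placeholder data

        df.write
          .format("doris")
          .option("doris.table.identifier", "db.tbl") // placeholder
          .option("doris.fenodes", "fe-host:8030")    // placeholder
          .option("user", "root")                     // placeholder
          .option("password", "")                     // placeholder
          // opt in to write blocking on -235 (too many tablet versions) errors
          .option("doris.sink.max.blocking.times", "10")
          .option("doris.sink.max.blocking.interval.ms", "300000")
          .option("doris.sink.block.trigger.keys", "-235")
          .save()
      }
    }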
---
 .../java/org/apache/doris/spark/cfg/ConfigurationOptions.java | 4 ++--
 .../src/main/scala/org/apache/doris/spark/sql/Utils.scala     | 2 +-
 .../scala/org/apache/doris/spark/writer/DorisWriter.scala     | 1 +
 3 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 0faec661..6041dbd9 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -94,10 +94,10 @@ public interface ConfigurationOptions {
     int SINK_MAX_BLOCKING_TIMES_DEFAULT = 1;
 
     String DORIS_SINK_MAX_BLOCKING_INTERVAL_MS = "doris.sink.max.blocking.interval.ms";
-    int SINK_MAX_BLOCKING_INTERVAL_MS_DEFAULT = 1000;
+    int SINK_MAX_BLOCKING_INTERVAL_MS_DEFAULT = 300000;
 
     String DORIS_SINK_BLOCKING_TRIGGER_KEYS = "doris.sink.block.trigger.keys";
-    String SINK_BLOCKING_TRIGGER_KEYS_DEFAULT = "-235";
+    String SINK_BLOCKING_TRIGGER_KEYS_DEFAULT = "";
 
 
     /**
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
index 9170cdea..31147c63 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
@@ -182,7 +182,7 @@ private[spark] object Utils {
     }
 
     def shouldBlock(exception: String): Boolean = {
-      blockTriggerKeysArray.exists(exception.contains)
+      blockTriggerKeysArray.exists(key => key.nonEmpty && exception.contains(key))
     }
 
     val result = Try(f)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index 02db0bc7..3512c25c 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -79,6 +79,7 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
 
     var resultRdd = dataFrame.queryExecution.toRdd
     val schema = dataFrame.schema
+    val dfColumns = dataFrame.columns
     if (Objects.nonNull(sinkTaskPartitionSize)) {
       resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
     }
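With the new 300000 ms default cap, the blocking wait escalates by doubling until it is
clamped. The snippet below is not part of the patch; it mirrors the arithmetic of
increaseBackoffTime under an illustrative 50 ms batch interval:

    import java.time.Duration

    object BackoffSketch {
      def main(args: Array[String]): Unit = {
        val cap = Duration.ofMillis(300000) // new SINK_MAX_BLOCKING_INTERVAL_MS_DEFAULT
        var current = Duration.ofMillis(50) // illustrative batch interval
        (1 to 14).foreach { n =>
          // same arithmetic as increaseBackoffTime: double, then clamp to the cap
          current = Duration.ofNanos(math.min(current.toNanos * 2, cap.toNanos))
          println(s"blocking retry $n waits ${current.toMillis} ms")
        }
        // prints 100, 200, 400, ... 204800, then stays at 300000 ms from retry 13 on
      }
    }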