diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 0faec661..6041dbd9 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -94,10 +94,10 @@ public interface ConfigurationOptions {
     int SINK_MAX_BLOCKING_TIMES_DEFAULT = 1;
 
     String DORIS_SINK_MAX_BLOCKING_INTERVAL_MS = "doris.sink.max.blocking.interval.ms";
-    int SINK_MAX_BLOCKING_INTERVAL_MS_DEFAULT = 1000;
+    int SINK_MAX_BLOCKING_INTERVAL_MS_DEFAULT = 300000;
 
     String DORIS_SINK_BLOCKING_TRIGGER_KEYS = "doris.sink.block.trigger.keys";
-    String SINK_BLOCKING_TRIGGER_KEYS_DEFAULT = "-235";
+    String SINK_BLOCKING_TRIGGER_KEYS_DEFAULT = "";
 
     /**
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
index 9170cdea..31147c63 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
@@ -182,7 +182,7 @@ private[spark] object Utils {
     }
 
     def shouldBlock(exception: String): Boolean = {
-      blockTriggerKeysArray.exists(exception.contains)
+      blockTriggerKeysArray.nonEmpty && blockTriggerKeysArray.exists(exception.contains)
     }
 
     val result = Try(f)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index 02db0bc7..3512c25c 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -79,6 +79,7 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
 
     var resultRdd = dataFrame.queryExecution.toRdd
     val schema = dataFrame.schema
+    val dfColumns = dataFrame.columns
     if (Objects.nonNull(sinkTaskPartitionSize)) {
       resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
     }
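
For reviewers: a minimal, runnable sketch (not part of the patch) of how the patched shouldBlock predicate behaves with the old and new defaults for doris.sink.block.trigger.keys. The parseTriggerKeys helper, and its assumption that empty entries are filtered out during parsing, are mine for illustration; the connector's actual config parsing may differ.

// Sketch only: mirrors the guarded shouldBlock check from the Utils.scala hunk above.
object ShouldBlockSketch {

  // Hypothetical parsing helper; assumes empty entries are dropped, so the
  // new empty default for doris.sink.block.trigger.keys yields an empty array.
  def parseTriggerKeys(conf: String): Array[String] =
    conf.split(",").map(_.trim).filter(_.nonEmpty)

  // The patched predicate: an empty trigger-key list never blocks, which also
  // avoids the "every string contains the empty string" pitfall of an unguarded check.
  def shouldBlock(triggerKeys: Array[String], exception: String): Boolean =
    triggerKeys.nonEmpty && triggerKeys.exists(exception.contains)

  def main(args: Array[String]): Unit = {
    val err = "stream load failed with -235: too many tablet versions"
    println(shouldBlock(parseTriggerKeys(""), err))     // false: new default, blocking disabled
    println(shouldBlock(parseTriggerKeys("-235"), err)) // true: old default still matches
  }
}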