Commit ec402eb
modify config default value, solve conflict.
CodeCooker17 committed Sep 7, 2023
1 parent 70fe118 commit ec402eb
Showing 3 changed files with 8 additions and 6 deletions.
@@ -94,10 +94,10 @@ public interface ConfigurationOptions {
     int SINK_MAX_BLOCKING_TIMES_DEFAULT = 1;

     String DORIS_SINK_MAX_BLOCKING_INTERVAL_MS = "doris.sink.max.blocking.interval.ms";
-    int SINK_MAX_BLOCKING_INTERVAL_MS_DEFAULT = 1000;
+    int SINK_MAX_BLOCKING_INTERVAL_MS_DEFAULT = 300000;

     String DORIS_SINK_BLOCKING_TRIGGER_KEYS = "doris.sink.block.trigger.keys";
-    String SINK_BLOCKING_TRIGGER_KEYS_DEFAULT = "-235";
+    String SINK_BLOCKING_TRIGGER_KEYS_DEFAULT = "";


     /**
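The two defaults changed here loosen the sink's failure-blocking behavior: the blocking interval grows from 1 s to 300 s, and the trigger-key list defaults to empty, so blocking stays off until keys are configured. A hedged sketch of supplying these options through the Spark writer — the input path, FE address, and table identifier are illustrative placeholders, not taken from this commit:

import org.apache.spark.sql.SparkSession

object BlockingConfigExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("doris-sink-example").getOrCreate()
    val df = spark.read.parquet("input-path") // placeholder source

    df.write
      .format("doris")
      .option("doris.fenodes", "fe-host:8030")                 // placeholder FE address
      .option("doris.table.identifier", "db.table")            // placeholder target table
      .option("doris.sink.max.blocking.interval.ms", "300000") // new default shown above
      .option("doris.sink.block.trigger.keys", "-235")         // opt back in to the old trigger key
      .save()
  }
}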
@@ -182,7 +182,7 @@ private[spark] object Utils {
     }

     def shouldBlock(exception: String): Boolean = {
-      blockTriggerKeysArray.exists(exception.contains)
+      blockTriggerKeysArray.nonEmpty && blockTriggerKeysArray.exists(exception.contains)
     }

     val result = Try(f)
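The guard added here means an empty trigger-key list disables blocking outright, which matches the new empty default above. A minimal, self-contained sketch of the behavior, assuming the comma-separated doris.sink.block.trigger.keys value is parsed with blanks dropped (parseTriggerKeys is hypothetical, not the connector's code):

object ShouldBlockSketch {
  // Hypothetical parsing of doris.sink.block.trigger.keys: split on commas
  // and drop blanks, so the new empty default yields an empty array.
  def parseTriggerKeys(raw: String): Array[String] =
    raw.split(",").map(_.trim).filter(_.nonEmpty)

  // Mirrors the guarded check added in this commit.
  def shouldBlock(keys: Array[String], exception: String): Boolean =
    keys.nonEmpty && keys.exists(exception.contains)

  def main(args: Array[String]): Unit = {
    val disabled = parseTriggerKeys("")      // new default: blocking disabled
    val legacy   = parseTriggerKeys("-235")  // old default: block on -235 errors

    assert(!shouldBlock(disabled, "errCode = -235, tablet writer write failed"))
    assert(shouldBlock(legacy, "errCode = -235, tablet writer write failed"))
    assert(!shouldBlock(legacy, "connect timed out"))
  }
}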
@@ -87,8 +87,9 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
    *
    */
   def flush(batch: Seq[util.List[Object]], dfColumns: Array[String]): Unit = {
-    Utils.retry[util.List[Integer], Exception](maxRetryTimes, maxSinkBlocks, Duration.ofMillis(batchInterValMs.toLong), Duration.ofMillis(maxBlockInterValMs.toLong), blockTriggerKeysArray, logger) {
-      dorisStreamLoader.loadV2(batch.toList.asJava, dfColumns, enable2PC)
+    Utils.retry[util.List[Integer], Exception](maxRetryTimes, maxSinkBlocks, Duration.ofMillis(batchInterValMs.toLong),
+      Duration.ofMillis(maxBlockInterValMs.toLong), blockTriggerKeysArray, logger) {
+      dorisStreamLoader.loadV2(batch.asJava, dfColumns, enable2PC)
     } match {
       case Success(txnIds) => if (enable2PC) handleLoadSuccess(txnIds.asScala, preCommittedTxnAcc)
       case Failure(e) =>
@@ -127,7 +128,8 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
    *
    */
   def flush(batch: Seq[InternalRow], dfColumns: Array[String]): Unit = {
-    Utils.retry[util.List[Integer], Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
+    Utils.retry[util.List[Integer], Exception](maxRetryTimes, maxSinkBlocks, Duration.ofMillis(batchInterValMs.toLong),
+      Duration.ofMillis(maxBlockInterValMs.toLong), blockTriggerKeysArray, logger) {
       dorisStreamLoader.loadStream(convertToObjectList(batch, schema), dfColumns, enable2PC)
     } match {
       case Success(txnIds) => if (enable2PC) handleLoadSuccess(txnIds.asScala, preCommittedTxnAcc)
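The second hunk brings the Seq[InternalRow] flush in line with the updated Utils.retry signature, which it had missed (the "conflict" in the commit message). For reference, a sketch of what a retry-with-blocking helper of this shape could look like; this is an assumption-laden reconstruction from the call sites above, not the connector's actual implementation, and the real helper also takes an exception type parameter that is omitted here for brevity:

import java.time.Duration
import org.slf4j.Logger
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Shape inferred from the call sites above. Ordinary failures retry up to
  // `retryTimes` with `interval` between attempts; failures whose message
  // contains one of `blockTriggerKeys` instead wait the (longer)
  // `blockInterval`, up to `blockTimes` times.
  def retry[R](retryTimes: Int,
               blockTimes: Int,
               interval: Duration,
               blockInterval: Duration,
               blockTriggerKeys: Array[String],
               logger: Logger)(f: => R): Try[R] = {

    def shouldBlock(message: String): Boolean =
      blockTriggerKeys.nonEmpty && blockTriggerKeys.exists(message.contains)

    def attempt(retriesLeft: Int, blocksLeft: Int): Try[R] = Try(f) match {
      case success @ Success(_) => success
      case Failure(e) if shouldBlock(String.valueOf(e.getMessage)) && blocksLeft > 0 =>
        logger.warn(s"Blocking error, waiting ${blockInterval.toMillis} ms before retry", e)
        Thread.sleep(blockInterval.toMillis)
        attempt(retriesLeft, blocksLeft - 1)
      case Failure(e) if retriesLeft > 0 =>
        logger.warn(s"Retrying in ${interval.toMillis} ms", e)
        Thread.sleep(interval.toMillis)
        attempt(retriesLeft - 1, blocksLeft)
      case failure @ Failure(_) => failure
    }

    attempt(retryTimes, blockTimes)
  }
}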
