Add properties to control write blocking.
CodeCooker17 committed Sep 13, 2023
1 parent 0daf6c4 commit fc5cf99
Showing 3 changed files with 55 additions and 1 deletion.
ConfigurationOptions.java
@@ -90,6 +90,16 @@ public interface ConfigurationOptions {

    int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50;

    // Maximum number of retries to perform when a load fails with a write blocking error.
    String DORIS_SINK_MAX_BLOCKING_TIMES = "doris.sink.max.blocking.times";
    int SINK_MAX_BLOCKING_TIMES_DEFAULT = 1;

    // Upper bound, in milliseconds, on the exponential backoff between blocking retries.
    String DORIS_SINK_MAX_BLOCKING_INTERVAL_MS = "doris.sink.max.blocking.interval.ms";
    int SINK_MAX_BLOCKING_INTERVAL_MS_DEFAULT = 1000;

    // Comma-separated substrings that mark a failure message as write blocking;
    // "-235" is the Doris "too many tablet versions" error code.
    String DORIS_SINK_BLOCKING_TRIGGER_KEYS = "doris.sink.block.trigger.keys";
    String SINK_BLOCKING_TRIGGER_KEYS_DEFAULT = "-235";

    /**
     * set types to ignore, split by comma
     * e.g.
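Taken together, the three options let a job tune how the sink backs off when Doris signals write pressure instead of failing fast. Below is a minimal sketch of setting them on a batch write; only the three blocking keys come from this commit, while the `doris` data source name, the connection options, and all values are the connector's usual plumbing plus illustrative placeholders:

```scala
// Illustrative wiring of the new properties into a Spark batch write.
// Only the three "blocking" keys below are introduced by this commit.
df.write
  .format("doris")
  .option("doris.fenodes", "fe-host:8030")               // placeholder FE address
  .option("doris.table.identifier", "db.tbl")            // placeholder target table
  .option("doris.sink.max.blocking.times", "3")          // allow up to 3 blocking retries
  .option("doris.sink.max.blocking.interval.ms", "5000") // cap the backoff at 5 s
  .option("doris.sink.block.trigger.keys", "-235")       // messages containing "-235" block
  .save()
```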
Utils.scala
@@ -170,6 +170,43 @@ private[spark] object Utils {
    finalParams
  }

  @tailrec
  def retry[R, T <: Throwable : ClassTag](retryTimes: Int, maxBlockTimes: Int, batchInterval: Duration,
                                          maxBlockInterval: Duration, blockTriggerKeysArray: Array[String],
                                          logger: Logger)(f: => R): Try[R] = {
    assert(retryTimes >= 0)
    assert(maxBlockTimes >= 0)
    var currentBlockInterval = batchInterval

    // Exponential backoff: double the interval this call was given, capped at maxBlockInterval.
    // The blocking branch below recurses with the grown interval, so the doubling compounds.
    def increaseBackoffTime(): Unit = {
      currentBlockInterval = Duration.ofNanos(Math.min(batchInterval.toNanos * 2, maxBlockInterval.toNanos))
    }

    // A failure counts as write blocking when its message contains any configured trigger key.
    def shouldBlock(exception: String): Boolean = {
      blockTriggerKeysArray.exists(exception.contains)
    }

    val result = Try(f)
    result match {
      case Success(result) =>
        // Pace consecutive batches: park for the current interval before returning.
        LockSupport.parkNanos(currentBlockInterval.toNanos)
        Success(result)
      case Failure(exception: T) if retryTimes > 0 && !shouldBlock(exception.getMessage) =>
        // Ordinary failure: retry at the fixed batch interval.
        logger.warn("Execution failed, caused by: ", exception)
        logger.warn(s"$retryTimes times retry remaining, the next will be in ${batchInterval.toMillis}ms")
        LockSupport.parkNanos(batchInterval.toNanos)
        retry(retryTimes - 1, maxBlockTimes, currentBlockInterval, maxBlockInterval, blockTriggerKeysArray, logger)(f)
      case Failure(exception: T) if maxBlockTimes > 0 && shouldBlock(exception.getMessage) =>
        // Write blocking failure: back off exponentially and spend a blocking retry instead of a plain one.
        logger.warn("Execution failed, caused by: ", exception)
        increaseBackoffTime()
        logger.warn(s"$maxBlockTimes times write blocking retry remaining, the next will be in ${currentBlockInterval.toMillis}ms")
        LockSupport.parkNanos(currentBlockInterval.toNanos)
        retry(retryTimes, maxBlockTimes - 1, currentBlockInterval, maxBlockInterval, blockTriggerKeysArray, logger)(f)
      case Failure(exception) =>
        // Retries exhausted or a non-matching failure: give up and surface it.
        logger.warn("Execution failed, caused by: ", exception)
        Failure(exception)
    }
  }

  @tailrec
  def retry[R, T <: Throwable : ClassTag](retryTimes: Int, interval: Duration, logger: Logger)(f: => R): Try[R] = {
    assert(retryTimes >= 0)
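A direct call of the new overload might look like the following sketch; the parameter names match the signature above, while `doStreamLoad`, the logger name, and the thrown message are hypothetical stand-ins:

```scala
import java.time.Duration
import org.slf4j.LoggerFactory
import scala.util.Try

val logger = LoggerFactory.getLogger("retry-demo")

// Hypothetical load function that fails with a blocking error message.
def doStreamLoad(): Int =
  throw new Exception("stream load rejected: [-235] too many tablet versions")

// One plain retry, up to three blocking retries; "-235" in the message
// routes the failure to the exponential-backoff branch.
val result: Try[Int] = Utils.retry[Int, Exception](
  retryTimes = 1,
  maxBlockTimes = 3,
  batchInterval = Duration.ofMillis(50),
  maxBlockInterval = Duration.ofMillis(1000),
  blockTriggerKeysArray = Array("-235"),
  logger = logger
)(doStreamLoad())
```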
DorisWriter.scala
@@ -46,6 +46,13 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
    ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
  private val batchInterValMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS,
    ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
  private val maxSinkBlocks: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_BLOCKING_TIMES,
    ConfigurationOptions.SINK_MAX_BLOCKING_TIMES_DEFAULT)
  private val blockTriggerKeys: String = settings.getProperty(ConfigurationOptions.DORIS_SINK_BLOCKING_TRIGGER_KEYS,
    ConfigurationOptions.SINK_BLOCKING_TRIGGER_KEYS_DEFAULT)
  private val maxBlockInterValMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_BLOCKING_INTERVAL_MS,
    ConfigurationOptions.SINK_MAX_BLOCKING_INTERVAL_MS_DEFAULT)
  // Trigger keys arrive as one comma-separated string and are matched as substrings of failure messages.
  private val blockTriggerKeysArray: Array[String] = blockTriggerKeys.split(",")

  private val enable2PC: Boolean = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_ENABLE_2PC,
    ConfigurationOptions.DORIS_SINK_ENABLE_2PC_DEFAULT)
@@ -78,7 +85,7 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
    resultRdd.foreachPartition(iterator => {
      while (iterator.hasNext) {
        // do load batch with retries; the call now passes the blocking parameters as well
        // (previously: Utils.retry[Int, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger))
        Utils.retry[Int, Exception](maxRetryTimes, maxSinkBlocks, Duration.ofMillis(batchInterValMs.toLong),
          Duration.ofMillis(maxBlockInterValMs.toLong), blockTriggerKeysArray, logger) {
          loadFunc(iterator.asJava, schema)
        } match {
          case Success(txnId) => if (enable2PC) handleLoadSuccess(txnId, preCommittedTxnAcc)
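With the defaults above (50 ms batch interval, 1000 ms cap), each write blocking retry doubles the wait until it hits the cap, because the blocking branch recurses with the grown interval. A small standalone sketch of that schedule; the helper below is hypothetical and only mirrors the arithmetic of increaseBackoffTime plus the recursive call:

```scala
import java.time.Duration

// Hypothetical helper reproducing the backoff schedule of Utils.retry:
// each step doubles the previous interval, capped at maxBlockInterval.
def blockingSchedule(batchInterval: Duration, maxBlockInterval: Duration, maxBlockTimes: Int): Seq[Duration] =
  Iterator.iterate(batchInterval) { current =>
    Duration.ofNanos(math.min(current.toNanos * 2, maxBlockInterval.toNanos))
  }.drop(1).take(maxBlockTimes).toSeq

// With the defaults and six blocking retries, the waits are
// 100, 200, 400, 800, 1000, 1000 ms.
blockingSchedule(Duration.ofMillis(50), Duration.ofMillis(1000), 6)
  .foreach(d => println(s"${d.toMillis} ms"))
```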
