Skip to content

Commit

Permalink
add a retry interval config
Browse files — browse the repository at this point in the history
  • Loading branch information
fornaix committed Sep 15, 2023
1 parent 787de06 commit f709ba6
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public interface ConfigurationOptions {

int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50;

String DORIS_SINK_RETRY_INTERVAL_MS = "doris.sink.retry.interval.ms";

int DORIS_SINK_RETRY_INTERVAL_MS_DEFAULT = 50;

/**
* set types to ignore, split by comma
* e.g.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import java.io.IOException
import java.time.Duration
import java.util
import java.util.Objects
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.LockSupport
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Success}
Expand All @@ -46,8 +48,10 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
private val sinkTaskPartitionSize: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
private val sinkTaskUseRepartition: Boolean = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION,
ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
private val batchInterValMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS,
private val batchIntervalMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS,
ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
private val retryIntervalMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_RETRY_INTERVAL_MS,
ConfigurationOptions.DORIS_SINK_RETRY_INTERVAL_MS_DEFAULT)

private val enable2PC: Boolean = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_ENABLE_2PC,
ConfigurationOptions.DORIS_SINK_ENABLE_2PC_DEFAULT);
Expand Down Expand Up @@ -80,10 +84,12 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
*
*/
def flush(batch: Seq[util.List[Object]], dfColumns: Array[String]): Unit = {
Utils.retry[util.List[Integer], Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
Utils.retry[util.List[Integer], Exception](maxRetryTimes, Duration.ofMillis(retryIntervalMs.toLong), logger) {
dorisStreamLoader.loadV2(batch.asJava, dfColumns, enable2PC)
} match {
case Success(txnIds) => if (enable2PC) handleLoadSuccess(txnIds.asScala, preCommittedTxnAcc)
case Success(txnIds) =>
if (enable2PC) handleLoadSuccess(txnIds.asScala, preCommittedTxnAcc)
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(batchIntervalMs.toLong))
case Failure(e) =>
if (enable2PC) handleLoadFailure(preCommittedTxnAcc)
throw new IOException(
Expand Down Expand Up @@ -120,10 +126,12 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
*
*/
def flush(batch: Seq[InternalRow], dfColumns: Array[String]): Unit = {
Utils.retry[util.List[Integer], Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
Utils.retry[util.List[Integer], Exception](maxRetryTimes, Duration.ofMillis(retryIntervalMs.toLong), logger) {
dorisStreamLoader.loadStream(convertToObjectList(batch, schema), dfColumns, enable2PC)
} match {
case Success(txnIds) => if (enable2PC) handleLoadSuccess(txnIds.asScala, preCommittedTxnAcc)
case Success(txnIds) =>
if (enable2PC) handleLoadSuccess(txnIds.asScala, preCommittedTxnAcc)
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(batchIntervalMs.toLong))
case Failure(e) =>
if (enable2PC) handleLoadFailure(preCommittedTxnAcc)
throw new IOException(
Expand Down

0 comments on commit f709ba6

Please sign in to comment.