From 3f979c9f520903284f1a9a72e74cefd596386083 Mon Sep 17 00:00:00 2001 From: Anqi Date: Thu, 23 Nov 2023 11:04:24 +0800 Subject: [PATCH 1/2] update the numPartitions for maxcompute to Long --- .../main/scala/com/vesoft/exchange/common/config/Configs.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala index 0627c39..6faaf62 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala @@ -902,7 +902,7 @@ object Configs { case SourceCategory.MAXCOMPUTE => { val table = config.getString("table") val partitionSpec = getStringOrNull(config, "partitionSpec") - val numPartitions = getOrElse(config, "numPartitions", "1") + val numPartitions = getOrElse(config, "numPartitions", 1).toString val sentence = getStringOrNull(config, "sentence") MaxComputeConfigEntry( From e59fa4f443229ff3fa9bc39133e6426ab2119f54 Mon Sep 17 00:00:00 2001 From: Anqi Date: Thu, 23 Nov 2023 11:08:45 +0800 Subject: [PATCH 2/2] update the repartition for streaming data --- .../com/vesoft/nebula/exchange/Exchange.scala | 13 ++++++++----- .../com/vesoft/nebula/exchange/Exchange.scala | 16 ++++++++++------ .../com/vesoft/nebula/exchange/Exchange.scala | 13 ++++++++----- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 6aad409..db04722 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -439,12 +439,15 @@ object Exchange { private[this] def repartition(frame: DataFrame, partition: Int, sourceCategory: SourceCategory.Value): DataFrame = { - val currentPart = frame.rdd.partitions.length - if (partition > 0 && currentPart != partition - && !CheckPointHandler.checkSupportResume(sourceCategory)) { - frame.repartition(partition).toDF - } else { + if (frame.isStreaming || partition <= 0 || CheckPointHandler.checkSupportResume(sourceCategory)) { frame + } else { + val currentPart = frame.rdd.partitions.length + if (currentPart == partition) { + frame + } else { + frame.repartition(partition).toDF + } } } diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 3f06528..13f95f6 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -322,7 +322,8 @@ object Exchange { s">>>>>> total client recordsFailure:${totalClientRecordFailure} \n" + s">>>>>> total SST failure:${totalSstRecordFailure} \n" + s">>>>>> total SST Success:${totalSstRecordSuccess}") - LOG.info(s">>>>>> exchange import qps: ${(totalClientRecordSuccess/duration).formatted("%.2f")}/s") + LOG.info( + s">>>>>> exchange import qps: ${(totalClientRecordSuccess / duration).formatted("%.2f")}/s") } /** @@ -436,12 +437,15 @@ object Exchange { private[this] def repartition(frame: DataFrame, partition: Int, sourceCategory: SourceCategory.Value): DataFrame = { - val currentPart = frame.rdd.partitions.length - if (partition > 0 && currentPart != partition - && !CheckPointHandler.checkSupportResume(sourceCategory)) { - frame.repartition(partition).toDF - } else { + if (frame.isStreaming || partition <= 0 || CheckPointHandler.checkSupportResume(sourceCategory)) { frame + } else { + val currentPart = frame.rdd.partitions.length + if (currentPart == partition) { + frame + } else { + frame.repartition(partition).toDF + } } } diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 8729deb..af105a4 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -435,12 +435,15 @@ object Exchange { private[this] def repartition(frame: DataFrame, partition: Int, sourceCategory: SourceCategory.Value): DataFrame = { - val currentPart = frame.rdd.partitions.length - if (partition > 0 && currentPart != partition - && !CheckPointHandler.checkSupportResume(sourceCategory)) { - frame.repartition(partition).toDF - } else { + if (frame.isStreaming || partition <= 0 || CheckPointHandler.checkSupportResume(sourceCategory)) { frame + } else { + val currentPart = frame.rdd.partitions.length + if (currentPart == partition) { + frame + } else { + frame.repartition(partition).toDF + } } }