From 769b3d710897882a5f89d559d2188f18715fbeed Mon Sep 17 00:00:00 2001 From: Anqi Date: Wed, 17 May 2023 15:37:24 +0800 Subject: [PATCH] remove data count --- .../scala/com/vesoft/nebula/exchange/Exchange.scala | 12 ++---------- .../scala/com/vesoft/nebula/exchange/Exchange.scala | 10 ++-------- .../scala/com/vesoft/nebula/exchange/Exchange.scala | 11 ++--------- 3 files changed, 6 insertions(+), 27 deletions(-) diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 44bcf0b3..973f85ac 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -142,8 +142,6 @@ object Exchange { val fields = tagConfig.vertexField :: tagConfig.fields val data = createDataSource(spark, tagConfig.dataSourceConfigEntry, fields) if (data.isDefined && !c.dry) { - data.get.cache() - val count = data.get.count() val startTime = System.currentTimeMillis() val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}") @@ -161,10 +159,8 @@ object Exchange { batchFailure ) processor.process() - data.get.unpersist() val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f") - LOG.info( - s"import for tag ${tagConfig.name}: data total count: $count, total time: ${costTime}s") + LOG.info(s"import for tag ${tagConfig.name}, total time: ${costTime}s") if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) { LOG.info(s"Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}") LOG.info(s"Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}") @@ -195,8 +191,6 @@ object Exchange { } val data = createDataSource(spark, edgeConfig.dataSourceConfigEntry, fields) if (data.isDefined && !c.dry) { - data.get.cache() - val count = data.get.count() val startTime = System.currentTimeMillis() val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}") val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${edgeConfig.name}") @@ -212,10 +206,8 @@ object Exchange { batchFailure ) processor.process() - data.get.unpersist() val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f") - LOG.info( - s"import for edge ${edgeConfig.name}: data total count: $count, total time: ${costTime}s") + LOG.info(s"import for edge ${edgeConfig.name}, total time: ${costTime}s") if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) { LOG.info(s"Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}") LOG.info(s"Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}") diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 073f8045..82d96405 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -142,8 +142,6 @@ object Exchange { val fields = tagConfig.vertexField :: tagConfig.fields val data = createDataSource(spark, tagConfig.dataSourceConfigEntry, fields) if (data.isDefined && !c.dry) { - data.get.cache() - val count = data.get.count() val startTime = System.currentTimeMillis() val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}") @@ -161,9 +159,8 @@ object Exchange { batchFailure ) processor.process() - data.get.unpersist() val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f") - LOG.info(s"import for tag ${tagConfig.name}, data count: $count, cost time: ${costTime}s") + LOG.info(s"import for tag ${tagConfig.name}, cost time: ${costTime}s") if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) { LOG.info(s"Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}") LOG.info(s"Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}") @@ -194,8 +191,6 @@ object Exchange { } val data = createDataSource(spark, edgeConfig.dataSourceConfigEntry, fields) if (data.isDefined && !c.dry) { - data.get.cache() - val count = data.get.count() val startTime = System.currentTimeMillis() val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}") val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${edgeConfig.name}") @@ -213,8 +208,7 @@ object Exchange { processor.process() data.get.unpersist() val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f") - LOG.info( - s"import for edge ${edgeConfig.name}, data count: $count, cost time: ${costTime}s") + LOG.info(s"import for edge ${edgeConfig.name}, cost time: ${costTime}s") if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) { LOG.info(s"Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}") LOG.info(s"Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}") diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 58143415..a631c753 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -142,8 +142,6 @@ object Exchange { val fields = tagConfig.vertexField :: tagConfig.fields val data = createDataSource(spark, tagConfig.dataSourceConfigEntry, fields) if (data.isDefined && !c.dry) { - data.get.cache() - val count = data.get.count() val startTime = System.currentTimeMillis() val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}") @@ -161,9 +159,8 @@ object Exchange { batchFailure ) processor.process() - data.get.unpersist() val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f") - LOG.info(s"import for tag ${tagConfig.name}, data count: $count, cost time: ${costTime}s") + LOG.info(s"import for tag ${tagConfig.name}, cost time: ${costTime}s") if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) { LOG.info(s"Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}") LOG.info(s"Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}") @@ -194,8 +191,6 @@ object Exchange { } val data = createDataSource(spark, edgeConfig.dataSourceConfigEntry, fields) if (data.isDefined && !c.dry) { - data.get.cache() - val count = data.get.count() val startTime = System.currentTimeMillis() val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}") val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${edgeConfig.name}") @@ -211,10 +206,8 @@ object Exchange { batchFailure ) processor.process() - data.get.unpersist() val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f") - LOG.info( - s"import for edge ${edgeConfig.name}, data count: $count, cost time: ${costTime}s") + LOG.info(s"import for edge ${edgeConfig.name}, cost time: ${costTime}s") if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) { LOG.info(s"Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}") LOG.info(s"Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}")