diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 7a217a81..bbe12098 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -152,8 +152,6 @@ object Exchange { } else { data.get } - df.cache() - val count = df.count() val startTime = System.currentTimeMillis() val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}") @@ -171,10 +169,8 @@ object Exchange { batchFailure ) processor.process() - df.unpersist() val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f") - LOG.info( - s"import for tag ${tagConfig.name}: data total count: $count, total time: ${costTime}s") + LOG.info(s"import for tag ${tagConfig.name}, total time: ${costTime}s") if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) { LOG.info(s"Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}") LOG.info(s"Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}") @@ -213,8 +209,6 @@ object Exchange { df = dataUdf(df, edgeConfig.dstVertexUdf.get) } - df.cache() - val count = df.count() val startTime = System.currentTimeMillis() val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}") val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${edgeConfig.name}") @@ -230,10 +224,8 @@ object Exchange { batchFailure ) processor.process() - df.unpersist() val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f") - LOG.info( - s"import for edge ${edgeConfig.name}: data total count: $count, total time: ${costTime}s") + LOG.info(s"import for edge ${edgeConfig.name}, total time: ${costTime}s") if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) { LOG.info(s"Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}") LOG.info(s"Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}") diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index f6757f4c..56e3d996 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -152,8 +152,6 @@ object Exchange { } else { data.get } - df.cache() - val count = df.count() val startTime = System.currentTimeMillis() val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}") @@ -171,9 +169,8 @@ object Exchange { batchFailure ) processor.process() - df.unpersist() val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f") - LOG.info(s"import for tag ${tagConfig.name}, data count: $count, cost time: ${costTime}s") + LOG.info(s"import for tag ${tagConfig.name}, cost time: ${costTime}s") if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) { LOG.info(s"Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}") LOG.info(s"Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}") @@ -212,8 +209,6 @@ object Exchange { df = dataUdf(df, edgeConfig.dstVertexUdf.get) } - df.cache() - val count = df.count() val startTime = System.currentTimeMillis() val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}") val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${edgeConfig.name}") @@ -229,10 +224,8 @@ object Exchange { batchFailure ) processor.process() - df.unpersist() val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f") - LOG.info( - s"import for edge ${edgeConfig.name}, data count: $count, cost time: ${costTime}s") + LOG.info(s"import for edge ${edgeConfig.name}, cost time: ${costTime}s") if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) { LOG.info(s"Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}") LOG.info(s"Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}") diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 6ee1917d..9540fdd3 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -152,8 +152,6 @@ object Exchange { } else { data.get } - df.cache() - val count = df.count() val startTime = System.currentTimeMillis() val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}") @@ -171,9 +169,8 @@ object Exchange { batchFailure ) processor.process() - df.unpersist() val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f") - LOG.info(s"import for tag ${tagConfig.name}, data count: $count, cost time: ${costTime}s") + LOG.info(s"import for tag ${tagConfig.name}, cost time: ${costTime}s") if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) { LOG.info(s"Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}") LOG.info(s"Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}") @@ -212,8 +209,6 @@ object Exchange { df = dataUdf(df, edgeConfig.dstVertexUdf.get) } - df.cache() - val count = df.count() val startTime = System.currentTimeMillis() val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}") val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${edgeConfig.name}") @@ -229,10 +224,8 @@ object Exchange { batchFailure ) processor.process() - df.unpersist() val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f") - LOG.info( - s"import for edge ${edgeConfig.name}, data count: $count, cost time: ${costTime}s") + LOG.info(s"import for edge ${edgeConfig.name}, cost time: ${costTime}s") if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) { LOG.info(s"Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}") LOG.info(s"Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}")