This repository has been archived by the owner on Oct 18, 2021. It is now read-only.

Commit

Merge pull request #118 from Nicole00/v2.5
add summary report for import
HarrisChu authored Aug 3, 2021
2 parents 2b48014 + 6ba9818 commit 4d4bb7a
Showing 2 changed files with 31 additions and 3 deletions.
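
In short: each import phase now counts the source rows and times the import, then logs both as a one-line summary alongside the existing accumulator totals. Based on the LOG.info formats added below, a tag import would print something like the following (the player tag name and all numbers are illustrative, not output from this commit):

data source count: 51, import for tag player cost time: 3.25 s
Client-Import: batchSuccess.player: 51
Client-Import: batchFailure.player: 0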
Exchange.scala
@@ -8,7 +8,6 @@ package com.vesoft.nebula.exchange
 
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import java.io.File
-
 import com.vesoft.nebula.exchange.config.{
   ClickHouseConfigEntry,
   Configs,
@@ -136,6 +135,8 @@ object Exchange {
 
         val data = createDataSource(spark, tagConfig.dataSourceConfigEntry)
         if (data.isDefined && !c.dry) {
+          val count     = data.get.count()
+          val startTime = System.currentTimeMillis()
           val batchSuccess =
             spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
           val batchFailure =
@@ -150,6 +151,10 @@ object Exchange {
             batchSuccess,
             batchFailure)
           processor.process()
+          val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
+          LOG.info(
+            s"data source count: ${count}, " +
+              s"import for tag ${tagConfig.name} cost time: ${costTime} s")
           if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
             LOG.info(s"Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}")
             LOG.info(s"Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}")
@@ -174,6 +179,8 @@ object Exchange {
         LOG.info(s"nebula keys: ${nebulaKeys.mkString(", ")}")
         val data = createDataSource(spark, edgeConfig.dataSourceConfigEntry)
         if (data.isDefined && !c.dry) {
+          val count     = data.get.count()
+          val startTime = System.currentTimeMillis()
           val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}")
           val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${edgeConfig.name}")
 
@@ -187,6 +194,10 @@ object Exchange {
             batchFailure
           )
           processor.process()
+          val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
+          LOG.info(
+            s"data source count: ${count}, " +
+              s"import for edge ${edgeConfig.name} cost time: ${costTime} s")
           if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
             LOG.info(s"Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}")
             LOG.info(s"Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}")
@@ -205,9 +216,12 @@ object Exchange {
       val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
       val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
       val data = spark.read.text(configs.errorConfig.errorPath)
-      data.count()
-      val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
+      val count     = data.count()
+      val startTime = System.currentTimeMillis()
+      val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
       processor.process()
+      val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
+      LOG.info(s"reimport ngql count: ${count}, cost time: ${costTime} s")
       LOG.info(s"batchSuccess.reimport: ${batchSuccess.value}")
       LOG.info(s"batchFailure.reimport: ${batchFailure.value}")
     }
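
The reimport summary relies on the same Spark accumulator mechanism as the tag and edge paths: executors add to batchSuccess/batchFailure while processing, and the driver reads the merged totals afterwards. A small sketch of that behavior, with a made-up dataset and a pretend one-in-ten failure rate:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object AccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("acc-sketch").getOrCreate()
    val sc: SparkContext = spark.sparkContext

    val batchSuccess = sc.longAccumulator("batchSuccess.reimport")
    val batchFailure = sc.longAccumulator("batchFailure.reimport")

    // Executors add to the accumulators; values merge back on the driver.
    sc.parallelize(1 to 100).foreach { i =>
      if (i % 10 == 0) batchFailure.add(1) // pretend every tenth statement fails
      else batchSuccess.add(1)
    }

    println(s"batchSuccess.reimport: ${batchSuccess.value}") // 90
    println(s"batchFailure.reimport: ${batchFailure.value}") // 10
    spark.stop()
  }
}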
HDFSUtils.scala
@@ -6,12 +6,15 @@
 
 package com.vesoft.nebula.exchange.utils
 
+import java.io.File
 import java.nio.charset.Charset
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.log4j.Logger
 import scala.io.Source
 
 object HDFSUtils {
+  private[this] val LOG = Logger.getLogger(this.getClass)
 
   def getFileSystem(namenode: String = null): FileSystem = {
     val conf = new Configuration()
@@ -63,6 +66,17 @@ object HDFSUtils {
   }
 
   def upload(localPath: String, remotePath: String, namenode: String = null): Unit = {
+    try {
+      val localFile = new File(localPath)
+      if (!localFile.exists() || localFile.length() <= 0) {
+        return
+      }
+    } catch {
+      case e: Throwable =>
+        LOG.warn("failed to check whether the local file is empty; you can ignore this error. " +
+                 "If there are empty sst files in your HDFS, please delete them manually",
+                 e)
+    }
     val system = getFileSystem(namenode)
     try {
       system.copyFromLocalFile(new Path(localPath), new Path(remotePath))
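A hedged usage sketch of the new guard in HDFSUtils.upload: a zero-byte local file now causes an early return instead of landing as an empty sst file in HDFS. The temp-file setup and remote paths are illustrative, and a real run assumes fs.defaultFS points at your HDFS namenode (or that a namenode is passed explicitly):

import java.io.File
import java.nio.file.Files
import com.vesoft.nebula.exchange.utils.HDFSUtils

object UploadGuardSketch {
  def main(args: Array[String]): Unit = {
    val empty = File.createTempFile("part-", ".sst") // zero-byte file
    // Guard triggers: the file exists but has length 0, so upload returns without copying.
    HDFSUtils.upload(empty.getAbsolutePath, "/sst/1/part-0.sst")

    val nonEmpty = File.createTempFile("part-", ".sst")
    Files.write(nonEmpty.toPath, Array[Byte](1, 2, 3))
    // Guard passes: proceeds to getFileSystem and copyFromLocalFile as before.
    HDFSUtils.upload(nonEmpty.getAbsolutePath, "/sst/1/part-1.sst")
  }
}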
