Skip to content

Commit

Permalink
add import status and records count (#162)
Browse files Browse the repository at this point in the history
* add import status and records count

* update test
  • Loading branch information
Nicole00 authored Oct 13, 2023
1 parent e4bd644 commit 93fe258
Show file tree
Hide file tree
Showing 21 changed files with 540 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class GraphProvider(addresses: List[HostAddress], timeout: Int, sslConfigEntry:

def switchSpace(session: Session, space: String): ResultSet = {
val switchStatment = s"use $space"
LOG.info(s"switch space $space")
LOG.info(s">>>>>> switch space $space")
val result = submit(session, switchStatment)
result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ case class DataBaseConfigEntry(graphAddress: List[String],
"nebula.address.meta has wrong format, please make sure the format is [\"ip1:port1\",\"ip2:port2\"]")
}

override def toString: String = super.toString
override def toString: String = {
s"DataBaseConfigEntry:{graphAddress:$graphAddress, space:$space, metaAddress:$metaAddresses}"
}

def getGraphAddress: List[HostAddress] = {
val hostAndPorts = new ListBuffer[HostAddress]
Expand Down Expand Up @@ -94,7 +96,8 @@ case class DataBaseConfigEntry(graphAddress: List[String],
case class UserConfigEntry(user: String, password: String) {
require(user.trim.nonEmpty && password.trim.nonEmpty)

override def toString: String = super.toString
override def toString: String =
s"UserConfigEntry{user:$user, password:xxxxx}"
}

/**
Expand All @@ -106,7 +109,7 @@ case class UserConfigEntry(user: String, password: String) {
case class ConnectionConfigEntry(timeout: Int, retry: Int) {
require(timeout > 0 && retry > 0)

override def toString: String = super.toString
override def toString: String = s"cConnectionConfigEntry:{timeout:$timeout, retry:$retry}"
}

/**
Expand All @@ -119,7 +122,7 @@ case class ConnectionConfigEntry(timeout: Int, retry: Int) {
case class ExecutionConfigEntry(timeout: Int, retry: Int, interval: Int) {
require(timeout > 0 && retry > 0 && interval > 0)

override def toString: String = super.toString
override def toString: String = s"ExecutionConfigEntry:{timeout:$timeout, retry:$retry}"
}

/**
Expand All @@ -131,7 +134,8 @@ case class ExecutionConfigEntry(timeout: Int, retry: Int, interval: Int) {
case class ErrorConfigEntry(errorPath: String, errorMaxSize: Int) {
require(errorPath.trim.nonEmpty && errorMaxSize > 0)

override def toString: String = super.toString
override def toString: String =
s"ErrorConfigEntry:{errorPath:$errorPath, errorMaxSize:$errorMaxSize}"
}

/**
Expand All @@ -143,7 +147,7 @@ case class ErrorConfigEntry(errorPath: String, errorMaxSize: Int) {
case class RateConfigEntry(limit: Int, timeout: Int) {
require(limit > 0 && timeout > 0)

override def toString: String = super.toString
override def toString: String = s"RateConfigEntry:{limit:$limit, timeout:$timeout}"
}

/**
Expand All @@ -168,7 +172,7 @@ case class SslConfigEntry(enableGraph: Boolean,
}
}

override def toString: String = super.toString
override def toString: String = s"SslConfigEntry:{enableGraph:$enableGraph, enableMeta:$enableMeta, signType:${signType.toString}}"
}

case class CaSignParam(caCrtFilePath: String, crtFilePath: String, keyFilePath: String)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class ReloadProcessor(data: DataFrame,
.getPartitionId()}")
errorBuffer.clear()
}
LOG.info(s"data reload in partition ${TaskContext
LOG.info(s">>>>> data reload in partition ${TaskContext
.getPartitionId()} cost ${System.currentTimeMillis() - startTime}ms")
writer.close()
graphProvider.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ class NebulaSSTWriter(path: String) extends Writer {

try {
RocksDB.loadLibrary()
LOG.info("Loading RocksDB successfully")
LOG.info(">>>>> Loading RocksDB successfully")
} catch {
case _: Exception =>
LOG.error("Can't load RocksDB library!")
LOG.error(">>>>> Can't load RocksDB library!")
}

// TODO More Config ...
Expand Down Expand Up @@ -108,7 +108,7 @@ class GenerateSstFile extends Serializable {
}
} catch {
case e: Throwable => {
LOG.error("sst file write error,", e)
LOG.error(">>>>> sst file write error,", e)
batchFailure.add(1)
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
throw new RuntimeException("Switch Failed for " + switchResult.getErrorMessage)
}

LOG.info(s"Connection to ${dataBaseConfigEntry.graphAddress}")
LOG.info(s">>>>>> Connection to ${dataBaseConfigEntry.graphAddress}")
}

def execute(vertices: Vertices, writeMode: WriteMode.Mode): String = {
Expand Down Expand Up @@ -329,16 +329,16 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
val result = graphProvider.submit(session, statement)
if (result.isSucceeded) {
LOG.info(
s" write ${config.name}, batch size(${vertices.values.size}), latency(${result.getLatency})")
s">>>>> write ${config.name}, batch size(${vertices.values.size}), latency(${result.getLatency})")
return null
}
LOG.error(s"write vertex failed for ${result.getErrorMessage}")
LOG.error(s">>>>> write vertex failed for ${result.getErrorMessage} statement: \n $statement")
if (result.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) {
throw new RuntimeException(
s"write ${config.name} failed for E_BAD_PERMISSION: ${result.getErrorMessage}")
}
} else {
LOG.error(s"write vertex failed because write speed is too fast")
LOG.error(s">>>>>> write vertex failed because write speed is too fast")
}
statement
}
Expand All @@ -349,16 +349,16 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
val result = graphProvider.submit(session, statement)
if (result.isSucceeded) {
LOG.info(
s" write ${config.name}, batch size(${edges.values.size}), latency(${result.getLatency}us)")
s">>>>>> write ${config.name}, batch size(${edges.values.size}), latency(${result.getLatency}us)")
return null
}
LOG.error(s"write edge failed for ${result.getErrorMessage}")
LOG.error(s">>>>>> write edge failed for ${result.getErrorMessage}")
if (result.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) {
throw new RuntimeException(
s"write ${config.name} failed for E_BAD_PERMISSION: ${result.getErrorMessage}")
}
} else {
LOG.error(s"write vertex failed because write speed is too fast")
LOG.error(s">>>>>> write vertex failed because write speed is too fast")
}
statement
}
Expand All @@ -369,9 +369,9 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
if (result.isSucceeded) {
return null
}
LOG.error(s"reimport ngql failed for ${result.getErrorMessage}")
LOG.error(s">>>>>> reimport ngql failed for ${result.getErrorMessage}")
} else {
LOG.error(s"reimport ngql failed because write speed is too fast")
LOG.error(s">>>>>> reimport ngql failed because write speed is too fast")
}
ngql
}
Expand Down
Loading

0 comments on commit 93fe258

Please sign in to comment.