diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala index 471e5fb..86dcf55 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala @@ -78,19 +78,14 @@ class EdgeProcessor(spark: SparkSession, batchFailure.add(1) recordFailure.add(edge.toList.size) if (batchFailure.value >= config.errorConfig.errorMaxSize) { + writeErrorStatement(errorBuffer) throw TooManyErrorsException( s"There are too many failed batches, batch amount: ${batchFailure.value}, " + s"your config max error size: ${config.errorConfig.errorMaxSize}") } } } - if (errorBuffer.nonEmpty) { - val appId = SparkEnv.get.blockManager.conf.getAppId - ErrorHandler.save( - errorBuffer, - s"${config.errorConfig.errorPath}/${appId}/${edgeConfig.name}.${TaskContext.getPartitionId}") - errorBuffer.clear() - } + writeErrorStatement(errorBuffer) LOG.info(s"edge ${edgeConfig.name} import in spark partition ${TaskContext .getPartitionId()} cost ${System.currentTimeMillis() - startTime}ms") writer.close() @@ -408,4 +403,14 @@ class EdgeProcessor(spark: SparkSession, val edgeValue = codec.encodeEdge(edgeItem, nebulaKeys.asJava, values.asJava) (positiveEdgeKey, reverseEdgeKey, edgeValue) } + + private def writeErrorStatement(errorBuffer: ArrayBuffer[String]): Unit = { + if (errorBuffer.nonEmpty) { + val appId = SparkEnv.get.blockManager.conf.getAppId + ErrorHandler.save( + errorBuffer, + s"${config.errorConfig.errorPath}/${appId}/${edgeConfig.name}.${TaskContext.getPartitionId}") + errorBuffer.clear() + } + } } diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala 
b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index 1213c76..73d53ae 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -85,19 +85,14 @@ class VerticesProcessor(spark: SparkSession, batchFailure.add(1) recordFailure.add(vertex.toList.size) if (batchFailure.value >= config.errorConfig.errorMaxSize) { + writeErrorStatement(errorBuffer) throw TooManyErrorsException( s"There are too many failed batches, batch amount: ${batchFailure.value}, " + s"your config max error size: ${config.errorConfig.errorMaxSize}") } } } - if (errorBuffer.nonEmpty) { - val appId = SparkEnv.get.blockManager.conf.getAppId - ErrorHandler.save( - errorBuffer, - s"${config.errorConfig.errorPath}/${appId}/${tagConfig.name}.${TaskContext.getPartitionId()}") - errorBuffer.clear() - } + writeErrorStatement(errorBuffer) LOG.info(s">>>>>> tag ${tagConfig.name} import in spark partition ${TaskContext .getPartitionId()} cost ${System.currentTimeMillis() - startTime} ms") writer.close() @@ -326,4 +321,14 @@ class VerticesProcessor(spark: SparkSession, val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes) (orphanVertexKey, vertexKey, vertexValue) } + + private def writeErrorStatement(errorBuffer: ArrayBuffer[String]): Unit = { + if (errorBuffer.nonEmpty) { + val appId = SparkEnv.get.blockManager.conf.getAppId + ErrorHandler.save( + errorBuffer, + s"${config.errorConfig.errorPath}/${appId}/${tagConfig.name}.${TaskContext.getPartitionId}") + errorBuffer.clear() + } + } } diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala index b9861a4..a458c3b 100644 --- 
a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala @@ -79,19 +79,14 @@ class EdgeProcessor(spark: SparkSession, batchFailure.add(1) recordFailure.add(edge.toList.size) if (batchFailure.value >= config.errorConfig.errorMaxSize) { + writeErrorStatement(errorBuffer) throw TooManyErrorsException( s"There are too many failed batches, batch amount: ${batchFailure.value}, " + s"your config max error size: ${config.errorConfig.errorMaxSize}") } } } - if (errorBuffer.nonEmpty) { - val appId = SparkEnv.get.blockManager.conf.getAppId - ErrorHandler.save( - errorBuffer, - s"${config.errorConfig.errorPath}/${appId}/${edgeConfig.name}.${TaskContext.getPartitionId}") - errorBuffer.clear() - } + writeErrorStatement(errorBuffer) LOG.info(s">>>>> edge ${edgeConfig.name} import in spark partition ${TaskContext .getPartitionId()} cost ${System.currentTimeMillis() - startTime}ms") writer.close() @@ -431,4 +426,14 @@ class EdgeProcessor(spark: SparkSession, val edgeValue = codec.encodeEdge(edgeItem, nebulaKeys.asJava, values.asJava) (positiveEdgeKey, reverseEdgeKey, edgeValue) } + + private def writeErrorStatement(errorBuffer: ArrayBuffer[String]): Unit = { + if (errorBuffer.nonEmpty) { + val appId = SparkEnv.get.blockManager.conf.getAppId + ErrorHandler.save( + errorBuffer, + s"${config.errorConfig.errorPath}/${appId}/${edgeConfig.name}.${TaskContext.getPartitionId}") + errorBuffer.clear() + } + } } diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index 55c3a8a..ece0052 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ 
b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -87,19 +87,14 @@ class VerticesProcessor(spark: SparkSession, batchFailure.add(1) recordFailure.add(vertex.toList.size) if (batchFailure.value >= config.errorConfig.errorMaxSize) { + writeErrorStatement(errorBuffer) throw TooManyErrorsException( s"There are too many failed batches, batch amount: ${batchFailure.value}, " + s"your config max error size: ${config.errorConfig.errorMaxSize}") } } } - if (errorBuffer.nonEmpty) { - val appId = SparkEnv.get.blockManager.conf.getAppId - ErrorHandler.save( - errorBuffer, - s"${config.errorConfig.errorPath}/${appId}/${tagConfig.name}.${TaskContext.getPartitionId()}") - errorBuffer.clear() - } + writeErrorStatement(errorBuffer) LOG.info(s">>>>> tag ${tagConfig.name} import in spark partition ${TaskContext .getPartitionId()} cost ${System.currentTimeMillis() - startTime} ms") writer.close() @@ -340,4 +335,14 @@ class VerticesProcessor(spark: SparkSession, val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes) (orphanVertexKey, vertexKey, vertexValue) } + + private def writeErrorStatement(errorBuffer: ArrayBuffer[String]): Unit = { + if (errorBuffer.nonEmpty) { + val appId = SparkEnv.get.blockManager.conf.getAppId + ErrorHandler.save( + errorBuffer, + s"${config.errorConfig.errorPath}/${appId}/${tagConfig.name}.${TaskContext.getPartitionId}") + errorBuffer.clear() + } + } } diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala index 334274b..a7002ec 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala @@ -80,19 +80,14 @@ class EdgeProcessor(spark: SparkSession, 
recordFailure.add(edge.toList.size) if (batchFailure.value >= config.errorConfig.errorMaxSize) { + writeErrorStatement(errorBuffer) throw TooManyErrorsException( s"There are too many failed batches, batch amount: ${batchFailure.value}, " + s"your config max error size: ${config.errorConfig.errorMaxSize}") } } } - if (errorBuffer.nonEmpty) { - val appId = SparkEnv.get.blockManager.conf.getAppId - ErrorHandler.save( - errorBuffer, - s"${config.errorConfig.errorPath}/${appId}/${edgeConfig.name}.${TaskContext.getPartitionId}") - errorBuffer.clear() - } + writeErrorStatement(errorBuffer) LOG.info(s">>>>> edge ${edgeConfig.name} import in spark partition ${TaskContext .getPartitionId()} cost ${System.currentTimeMillis() - startTime}ms") writer.close() @@ -431,4 +426,14 @@ class EdgeProcessor(spark: SparkSession, val edgeValue = codec.encodeEdge(edgeItem, nebulaKeys.asJava, values.asJava) (positiveEdgeKey, reverseEdgeKey, edgeValue) } + + private def writeErrorStatement(errorBuffer: ArrayBuffer[String]): Unit = { + if (errorBuffer.nonEmpty) { + val appId = SparkEnv.get.blockManager.conf.getAppId + ErrorHandler.save( + errorBuffer, + s"${config.errorConfig.errorPath}/${appId}/${edgeConfig.name}.${TaskContext.getPartitionId}") + errorBuffer.clear() + } + } } diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index 1fe43cc..d39785e 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -87,19 +87,14 @@ class VerticesProcessor(spark: SparkSession, batchFailure.add(1) recordFailure.add(vertex.toList.size) if (batchFailure.value >= config.errorConfig.errorMaxSize) { + writeErrorStatement(errorBuffer) throw TooManyErrorsException( s"There are too 
many failed batches, batch amount: ${batchFailure.value}, " + s"your config max error size: ${config.errorConfig.errorMaxSize}") } } } - if (errorBuffer.nonEmpty) { - val appId = SparkEnv.get.blockManager.conf.getAppId - ErrorHandler.save( - errorBuffer, - s"${config.errorConfig.errorPath}/${appId}/${tagConfig.name}.${TaskContext.getPartitionId()}") - errorBuffer.clear() - } + writeErrorStatement(errorBuffer) LOG.info(s">>>>> tag ${tagConfig.name} import in spark partition ${TaskContext .getPartitionId()} cost ${System.currentTimeMillis() - startTime} ms") writer.close() @@ -345,4 +340,14 @@ class VerticesProcessor(spark: SparkSession, val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes) (orphanVertexKey, vertexKey, vertexValue) } + + private def writeErrorStatement(errorBuffer: ArrayBuffer[String]): Unit = { + if (errorBuffer.nonEmpty) { + val appId = SparkEnv.get.blockManager.conf.getAppId + ErrorHandler.save( + errorBuffer, + s"${config.errorConfig.errorPath}/${appId}/${tagConfig.name}.${TaskContext.getPartitionId}") + errorBuffer.clear() + } + } }