Skip to content

Commit

Permalink
Save the failed requests before quitting due to TooManyErrorsException (#172)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicole00 authored Nov 6, 2023
1 parent 079d161 commit 9432d3f
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,14 @@ class EdgeProcessor(spark: SparkSession,
batchFailure.add(1)
recordFailure.add(edge.toList.size)
if (batchFailure.value >= config.errorConfig.errorMaxSize) {
writeErrorStatement(errorBuffer)
throw TooManyErrorsException(
s"There are too many failed batches, batch amount: ${batchFailure.value}, " +
s"your config max error size: ${config.errorConfig.errorMaxSize}")
}
}
}
if (errorBuffer.nonEmpty) {
val appId = SparkEnv.get.blockManager.conf.getAppId
ErrorHandler.save(
errorBuffer,
s"${config.errorConfig.errorPath}/${appId}/${edgeConfig.name}.${TaskContext.getPartitionId}")
errorBuffer.clear()
}
writeErrorStatement(errorBuffer)
LOG.info(s"edge ${edgeConfig.name} import in spark partition ${TaskContext
.getPartitionId()} cost ${System.currentTimeMillis() - startTime}ms")
writer.close()
Expand Down Expand Up @@ -408,4 +403,14 @@ class EdgeProcessor(spark: SparkSession,
val edgeValue = codec.encodeEdge(edgeItem, nebulaKeys.asJava, values.asJava)
(positiveEdgeKey, reverseEdgeKey, edgeValue)
}

/**
  * Flush buffered failed edge statements to the configured error path, then empty the buffer.
  *
  * Output file layout: <errorPath>/<sparkAppId>/<edgeName>.<sparkPartitionId>.
  * A no-op when the buffer is empty. Mutates (clears) the passed-in buffer after saving.
  *
  * @param errorBuffer accumulated failed write statements for this partition
  */
private def writeErrorStatement(errorBuffer: ArrayBuffer[String]): Unit = {
  if (errorBuffer.nonEmpty) {
    // Spark application id namespaces error files across runs.
    val appId = SparkEnv.get.blockManager.conf.getAppId
    val errorFilePath =
      s"${config.errorConfig.errorPath}/${appId}/${edgeConfig.name}.${TaskContext.getPartitionId}"
    ErrorHandler.save(errorBuffer, errorFilePath)
    errorBuffer.clear()
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,14 @@ class VerticesProcessor(spark: SparkSession,
batchFailure.add(1)
recordFailure.add(vertex.toList.size)
if (batchFailure.value >= config.errorConfig.errorMaxSize) {
writeErrorStatement(errorBuffer)
throw TooManyErrorsException(
s"There are too many failed batches, batch amount: ${batchFailure.value}, " +
s"your config max error size: ${config.errorConfig.errorMaxSize}")
}
}
}
if (errorBuffer.nonEmpty) {
val appId = SparkEnv.get.blockManager.conf.getAppId
ErrorHandler.save(
errorBuffer,
s"${config.errorConfig.errorPath}/${appId}/${tagConfig.name}.${TaskContext.getPartitionId()}")
errorBuffer.clear()
}
writeErrorStatement(errorBuffer)
LOG.info(s">>>>>> tag ${tagConfig.name} import in spark partition ${TaskContext
.getPartitionId()} cost ${System.currentTimeMillis() - startTime} ms")
writer.close()
Expand Down Expand Up @@ -326,4 +321,14 @@ class VerticesProcessor(spark: SparkSession,
val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes)
(orphanVertexKey, vertexKey, vertexValue)
}

/**
  * Flush buffered failed vertex statements to the configured error path, then empty the buffer.
  *
  * Output file layout: <errorPath>/<sparkAppId>/<tagName>.<sparkPartitionId>.
  * A no-op when the buffer is empty. Mutates (clears) the passed-in buffer after saving.
  *
  * @param errorBuffer accumulated failed write statements for this partition
  */
private def writeErrorStatement(errorBuffer: ArrayBuffer[String]): Unit = {
  if (errorBuffer.nonEmpty) {
    // Spark application id namespaces error files across runs.
    val appId = SparkEnv.get.blockManager.conf.getAppId
    val errorFilePath =
      s"${config.errorConfig.errorPath}/${appId}/${tagConfig.name}.${TaskContext.getPartitionId}"
    ErrorHandler.save(errorBuffer, errorFilePath)
    errorBuffer.clear()
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,14 @@ class EdgeProcessor(spark: SparkSession,
batchFailure.add(1)
recordFailure.add(edge.toList.size)
if (batchFailure.value >= config.errorConfig.errorMaxSize) {
writeErrorStatement(errorBuffer)
throw TooManyErrorsException(
s"There are too many failed batches, batch amount: ${batchFailure.value}, " +
s"your config max error size: ${config.errorConfig.errorMaxSize}")
}
}
}
if (errorBuffer.nonEmpty) {
val appId = SparkEnv.get.blockManager.conf.getAppId
ErrorHandler.save(
errorBuffer,
s"${config.errorConfig.errorPath}/${appId}/${edgeConfig.name}.${TaskContext.getPartitionId}")
errorBuffer.clear()
}
writeErrorStatement(errorBuffer)
LOG.info(s">>>>> edge ${edgeConfig.name} import in spark partition ${TaskContext
.getPartitionId()} cost ${System.currentTimeMillis() - startTime}ms")
writer.close()
Expand Down Expand Up @@ -431,4 +426,14 @@ class EdgeProcessor(spark: SparkSession,
val edgeValue = codec.encodeEdge(edgeItem, nebulaKeys.asJava, values.asJava)
(positiveEdgeKey, reverseEdgeKey, edgeValue)
}

/**
  * Flush buffered failed edge statements to the configured error path, then empty the buffer.
  *
  * Output file layout: <errorPath>/<sparkAppId>/<edgeName>.<sparkPartitionId>.
  * A no-op when the buffer is empty. Mutates (clears) the passed-in buffer after saving.
  *
  * @param errorBuffer accumulated failed write statements for this partition
  */
private def writeErrorStatement(errorBuffer: ArrayBuffer[String]): Unit = {
  // Consistency fix: space after the parameter colon, matching the sibling
  // writeErrorStatement definitions in the other processor files.
  if (errorBuffer.nonEmpty) {
    // Spark application id namespaces error files across runs.
    val appId = SparkEnv.get.blockManager.conf.getAppId
    ErrorHandler.save(
      errorBuffer,
      s"${config.errorConfig.errorPath}/${appId}/${edgeConfig.name}.${TaskContext.getPartitionId}")
    errorBuffer.clear()
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,14 @@ class VerticesProcessor(spark: SparkSession,
batchFailure.add(1)
recordFailure.add(vertex.toList.size)
if (batchFailure.value >= config.errorConfig.errorMaxSize) {
writeErrorStatement(errorBuffer)
throw TooManyErrorsException(
s"There are too many failed batches, batch amount: ${batchFailure.value}, " +
s"your config max error size: ${config.errorConfig.errorMaxSize}")
}
}
}
if (errorBuffer.nonEmpty) {
val appId = SparkEnv.get.blockManager.conf.getAppId
ErrorHandler.save(
errorBuffer,
s"${config.errorConfig.errorPath}/${appId}/${tagConfig.name}.${TaskContext.getPartitionId()}")
errorBuffer.clear()
}
writeErrorStatement(errorBuffer)
LOG.info(s">>>>> tag ${tagConfig.name} import in spark partition ${TaskContext
.getPartitionId()} cost ${System.currentTimeMillis() - startTime} ms")
writer.close()
Expand Down Expand Up @@ -340,4 +335,14 @@ class VerticesProcessor(spark: SparkSession,
val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes)
(orphanVertexKey, vertexKey, vertexValue)
}

/**
  * Flush buffered failed vertex statements to the configured error path, then empty the buffer.
  *
  * Output file layout: <errorPath>/<sparkAppId>/<tagName>.<sparkPartitionId>.
  * A no-op when the buffer is empty. Mutates (clears) the passed-in buffer after saving.
  *
  * @param errorBuffer accumulated failed write statements for this partition
  */
private def writeErrorStatement(errorBuffer: ArrayBuffer[String]): Unit = {
  if (errorBuffer.nonEmpty) {
    // Spark application id namespaces error files across runs.
    val appId = SparkEnv.get.blockManager.conf.getAppId
    val errorFilePath =
      s"${config.errorConfig.errorPath}/${appId}/${tagConfig.name}.${TaskContext.getPartitionId}"
    ErrorHandler.save(errorBuffer, errorFilePath)
    errorBuffer.clear()
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,14 @@ class EdgeProcessor(spark: SparkSession,
recordFailure.add(edge.toList.size)

if (batchFailure.value >= config.errorConfig.errorMaxSize) {
writeErrorStatement(errorBuffer)
throw TooManyErrorsException(
s"There are too many failed batches, batch amount: ${batchFailure.value}, " +
s"your config max error size: ${config.errorConfig.errorMaxSize}")
}
}
}
if (errorBuffer.nonEmpty) {
val appId = SparkEnv.get.blockManager.conf.getAppId
ErrorHandler.save(
errorBuffer,
s"${config.errorConfig.errorPath}/${appId}/${edgeConfig.name}.${TaskContext.getPartitionId}")
errorBuffer.clear()
}
writeErrorStatement(errorBuffer)
LOG.info(s">>>>> edge ${edgeConfig.name} import in spark partition ${TaskContext
.getPartitionId()} cost ${System.currentTimeMillis() - startTime}ms")
writer.close()
Expand Down Expand Up @@ -431,4 +426,14 @@ class EdgeProcessor(spark: SparkSession,
val edgeValue = codec.encodeEdge(edgeItem, nebulaKeys.asJava, values.asJava)
(positiveEdgeKey, reverseEdgeKey, edgeValue)
}

/**
  * Flush buffered failed edge statements to the configured error path, then empty the buffer.
  *
  * Output file layout: <errorPath>/<sparkAppId>/<edgeName>.<sparkPartitionId>.
  * A no-op when the buffer is empty. Mutates (clears) the passed-in buffer after saving.
  *
  * @param errorBuffer accumulated failed write statements for this partition
  */
private def writeErrorStatement(errorBuffer: ArrayBuffer[String]): Unit = {
  if (errorBuffer.nonEmpty) {
    // Spark application id namespaces error files across runs.
    val appId = SparkEnv.get.blockManager.conf.getAppId
    val errorFilePath =
      s"${config.errorConfig.errorPath}/${appId}/${edgeConfig.name}.${TaskContext.getPartitionId}"
    ErrorHandler.save(errorBuffer, errorFilePath)
    errorBuffer.clear()
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,14 @@ class VerticesProcessor(spark: SparkSession,
batchFailure.add(1)
recordFailure.add(vertex.toList.size)
if (batchFailure.value >= config.errorConfig.errorMaxSize) {
writeErrorStatement(errorBuffer)
throw TooManyErrorsException(
s"There are too many failed batches, batch amount: ${batchFailure.value}, " +
s"your config max error size: ${config.errorConfig.errorMaxSize}")
}
}
}
if (errorBuffer.nonEmpty) {
val appId = SparkEnv.get.blockManager.conf.getAppId
ErrorHandler.save(
errorBuffer,
s"${config.errorConfig.errorPath}/${appId}/${tagConfig.name}.${TaskContext.getPartitionId()}")
errorBuffer.clear()
}
writeErrorStatement(errorBuffer)
LOG.info(s">>>>> tag ${tagConfig.name} import in spark partition ${TaskContext
.getPartitionId()} cost ${System.currentTimeMillis() - startTime} ms")
writer.close()
Expand Down Expand Up @@ -345,4 +340,14 @@ class VerticesProcessor(spark: SparkSession,
val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes)
(orphanVertexKey, vertexKey, vertexValue)
}

/**
  * Flush buffered failed vertex statements to the configured error path, then empty the buffer.
  *
  * Output file layout: <errorPath>/<sparkAppId>/<tagName>.<sparkPartitionId>.
  * A no-op when the buffer is empty. Mutates (clears) the passed-in buffer after saving.
  *
  * @param errorBuffer accumulated failed write statements for this partition
  */
private def writeErrorStatement(errorBuffer: ArrayBuffer[String]): Unit = {
  if (errorBuffer.nonEmpty) {
    // Spark application id namespaces error files across runs.
    val appId = SparkEnv.get.blockManager.conf.getAppId
    val errorFilePath =
      s"${config.errorConfig.errorPath}/${appId}/${tagConfig.name}.${TaskContext.getPartitionId}"
    ErrorHandler.save(errorBuffer, errorFilePath)
    errorBuffer.clear()
  }
}
}

0 comments on commit 9432d3f

Please sign in to comment.