diff --git a/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/SparkSstFileGenerator.scala b/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/SparkSstFileGenerator.scala
index 7a2c17c5469..2912c4ba74c 100644
--- a/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/SparkSstFileGenerator.scala
+++ b/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/SparkSstFileGenerator.scala
@@ -301,7 +301,7 @@ object SparkSstFileGenerator {
           val keyEncoded: Array[Byte] = NativeClient.createVertexKey(partitionId, vertexId, tagType, DefaultVersion)
           val valuesEncoded: Array[Byte] = NativeClient.encode(values.toArray)
           log.debug(s"Tag(partition=${partitionId}): " + DatatypeConverter.printHexBinary(keyEncoded) + " = " + DatatypeConverter.printHexBinary(valuesEncoded))
-          (PartitionIdAndBytesEncoded(partitionId.toLong, new BytesWritable(keyEncoded)), new PartitionIdAndValueBinaryWritable(partitionId, new BytesWritable(valuesEncoded)))
+          (PartitionIdAndBytesEncoded(partitionId, new BytesWritable(keyEncoded)), new PartitionIdAndValueBinaryWritable(partitionId, new BytesWritable(valuesEncoded)))
         }
       }.repartitionAndSortWithinPartitions(new SortByKeyPartitioner(repartitionNumber)).map(v => (v._1.valueEncoded, v._2)).saveAsNewAPIHadoopFile(localSstFileOutput, classOf[BytesWritable], classOf[PartitionIdAndValueBinaryWritable], classOf[SstFileOutputFormat])
     }
@@ -350,7 +350,7 @@ object SparkSstFileGenerator {
             val valuesEncoded: Array[Byte] = NativeClient.encode(values.toArray)
             log.debug(s"Edge(partition=${partitionId}): " + DatatypeConverter.printHexBinary(keyEncoded) + " = " + DatatypeConverter.printHexBinary(valuesEncoded))

-            (PartitionIdAndBytesEncoded(id, new BytesWritable(keyEncoded)), new PartitionIdAndValueBinaryWritable(partitionId, new BytesWritable(valuesEncoded), VertexOrEdgeEnum.Edge))
+            (PartitionIdAndBytesEncoded(partitionId, new BytesWritable(keyEncoded)), new PartitionIdAndValueBinaryWritable(partitionId, new BytesWritable(valuesEncoded), VertexOrEdgeEnum.Edge))
           }
         }
       }.repartitionAndSortWithinPartitions(new SortByKeyPartitioner(repartitionNumber)).map(v => (v._1.valueEncoded, v._2)).saveAsNewAPIHadoopFile(localSstFileOutput, classOf[BytesWritable], classOf[PartitionIdAndValueBinaryWritable], classOf[SstFileOutputFormat])
diff --git a/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/SstFileOutputFormat.scala b/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/SstFileOutputFormat.scala
index 4d8412217a9..497ecb5e2e8 100644
--- a/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/SstFileOutputFormat.scala
+++ b/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/SstFileOutputFormat.scala
@@ -1,6 +1,6 @@
 package com.vesoft.tools

-import java.io.File
+import java.io.{File, IOException}

 import com.vesoft.tools.VertexOrEdgeEnum.VertexOrEdgeEnum
 import javax.xml.bind.DatatypeConverter
@@ -23,30 +23,30 @@ import scala.sys.process._
  * worker_node1
  * |
  * |-sstFileOutput
- * |
- * |--1
- * |  |
- * |  |——vertex-${FIRST_KEY}.data
- * |  |--edge-${FIRST_KEY}.data
- * |
- * |--2
- * |
- * |——vertex-${FIRST_KEY}.data
- * |--edge-${FIRST_KEY}.data
+ * |
+ * |--1
+ * |  |
+ * |  |——vertex-${FIRST_KEY}.data
+ * |  |--edge-${FIRST_KEY}.data
+ * |
+ * |--2
+ * |
+ * |——vertex-${FIRST_KEY}.data
+ * |--edge-${FIRST_KEY}.data
  * worker_node2
  * |
  * |-sstFileOutput
- * |
- * |--1
- * |  |
- * |  |——vertex-${FIRST_KEY}.data
- * |  |--edge-${FIRST_KEY}.data
- * |
- * |--2
- * |
- * |——vertex-${FIRST_KEY}.data
- * |--edge-${FIRST_KEY}.data
- */
+ * |
+ * |--1
+ * |  |
+ * |  |——vertex-${FIRST_KEY}.data
+ * |  |--edge-${FIRST_KEY}.data
+ * |
+ * |--2
+ * |
+ * |——vertex-${FIRST_KEY}.data
+ * |--edge-${FIRST_KEY}.data
+ **/
 class SstFileOutputFormat extends FileOutputFormat[BytesWritable, PartitionIdAndValueBinaryWritable] {
   override def getRecordWriter(job: TaskAttemptContext): RecordWriter[BytesWritable, PartitionIdAndValueBinaryWritable] = {
     if (FileOutputFormat.getCompressOutput(job)) {
@@ -196,8 +196,16 @@ class SstRecordWriter(localSstFileOutput: String, configuration: Configuration)
       val destinationHdfsDir = s"${hdfsParentDir}${hdfsDirectory}"
       val destinationPath = new Path(destinationHdfsDir)

-      if (!hdfsFileSystem.exists(destinationPath)) {
-        hdfsFileSystem.mkdirs(destinationPath)
+      try {
+        if (!hdfsFileSystem.exists(destinationPath)) {
+          hdfsFileSystem.mkdirs(destinationPath)
+        }
+      }
+      catch {
+        case e: IOException => {
+          log.error(s"Error when mkdir hdfs dir ${destinationPath}", e)
+          throw e
+        }
       }

       val command = List(s"${hadoopHome}/bin/hdfs", "dfs", "-copyFromLocal", s"${localSstFile}", destinationHdfsDir)
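For context on the first two hunks: the tuple's key wrapper now carries the partition id (rather than a vertex id or the loop variable id), so that SortByKeyPartitioner can route every record of a Nebula partition to the same Spark task before the SST files are written. Below is a minimal, self-contained sketch of that routing pattern; it is an illustration only, not the repository's code. The names ByPartitionIdPartitioner and PartitionRoutingSketch are hypothetical, a plain (Int, String) tuple stands in for PartitionIdAndBytesEncoded, and partition ids are assumed non-negative.

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

// Routes each record to the Spark task owning its partition id,
// which is carried as the first element of the key tuple.
class ByPartitionIdPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key match {
    case (partitionId: Int, _) => partitionId % numPartitions // ids assumed >= 0
    case _                     => 0
  }
}

object PartitionRoutingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("routing-sketch").setMaster("local[2]"))

    // Key = (partitionId, encodedKey), value = encoded properties. If the
    // partition id were not part of the key, records of one partition would
    // scatter across tasks and each task's SST file would mix partitions.
    val records = sc.parallelize(Seq(
      ((1, "key-b"), "value-1"),
      ((2, "key-a"), "value-2"),
      ((1, "key-a"), "value-3")
    ))

    // Shuffle by partition id, then sort keys within each task; the implicit
    // Ordering on (Int, String) provides the per-task sort order.
    val sorted = records.repartitionAndSortWithinPartitions(new ByPartitionIdPartitioner(2))
    sorted.glom().collect().foreach(part => println(part.mkString(", ")))

    sc.stop()
  }
}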
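The last hunk wraps the HDFS directory creation in a try/catch so an IOException is logged and rethrown instead of failing silently. The sketch below shows the same defensive pattern in isolation, under stated assumptions: the object name HdfsDirEnsure, the ensureDir helper, and the /tmp/sstFileOutput/1 path are hypothetical, and error reporting goes to stderr rather than the tool's log field.

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsDirEnsure {
  // Create the destination directory on HDFS if it is missing; log and
  // rethrow any IOException so the caller (e.g. a Spark task) fails visibly.
  def ensureDir(fs: FileSystem, dir: String): Unit = {
    val destinationPath = new Path(dir)
    try {
      if (!fs.exists(destinationPath)) {
        fs.mkdirs(destinationPath)
      }
    } catch {
      case e: IOException =>
        System.err.println(s"Error when mkdir hdfs dir $destinationPath: ${e.getMessage}")
        throw e
    }
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration()) // uses the default Hadoop configuration
    ensureDir(fs, "/tmp/sstFileOutput/1")        // hypothetical destination directory
  }
}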