Commit: fix edge partition error

qianyong committed May 24, 2019
1 parent 8f663e9 commit 1b2041d

Showing 2 changed files with 34 additions and 26 deletions.
@@ -301,7 +301,7 @@ object SparkSstFileGenerator {
val keyEncoded: Array[Byte] = NativeClient.createVertexKey(partitionId, vertexId, tagType, DefaultVersion)
val valuesEncoded: Array[Byte] = NativeClient.encode(values.toArray)
log.debug(s"Tag(partition=${partitionId}): " + DatatypeConverter.printHexBinary(keyEncoded) + " = " + DatatypeConverter.printHexBinary(valuesEncoded))
-      (PartitionIdAndBytesEncoded(partitionId.toLong, new BytesWritable(keyEncoded)), new PartitionIdAndValueBinaryWritable(partitionId, new BytesWritable(valuesEncoded)))
+      (PartitionIdAndBytesEncoded(partitionId, new BytesWritable(keyEncoded)), new PartitionIdAndValueBinaryWritable(partitionId, new BytesWritable(valuesEncoded)))
}
}.repartitionAndSortWithinPartitions(new SortByKeyPartitioner(repartitionNumber)).map(v => (v._1.valueEncoded, v._2)).saveAsNewAPIHadoopFile(localSstFileOutput, classOf[BytesWritable], classOf[PartitionIdAndValueBinaryWritable], classOf[SstFileOutputFormat])
}
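A note on the hunk above: the vertex path previously passed `partitionId.toLong` into the shuffle key; after this commit both the vertex path here and the edge path in the next hunk key on the same `partitionId` value. The job then leans on `repartitionAndSortWithinPartitions` to group records by partition and sort keys before `SstFileOutputFormat` writes them, since SST files require keys in sorted order. A minimal, self-contained sketch of that primitive (demo data, local master, and names are assumed for illustration; not the repo's code):

```scala
// repartitionAndSortWithinPartitions routes every record through the given
// partitioner, then sorts by key inside each resulting Spark partition --
// the sorted order that SST files must be written in.
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object RepartitionSortSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
    val records = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b"), (1, "z")))

    // Records with the same key land in the same partition, already key-sorted.
    val sorted = records.repartitionAndSortWithinPartitions(new HashPartitioner(2))
    sorted.glom().collect().foreach(part => println(part.mkString(", ")))

    sc.stop()
  }
}
```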
@@ -350,7 +350,7 @@ object SparkSstFileGenerator {

val valuesEncoded: Array[Byte] = NativeClient.encode(values.toArray)
log.debug(s"Edge(partition=${partitionId}): " + DatatypeConverter.printHexBinary(keyEncoded) + " = " + DatatypeConverter.printHexBinary(valuesEncoded))
-        (PartitionIdAndBytesEncoded(id, new BytesWritable(keyEncoded)), new PartitionIdAndValueBinaryWritable(partitionId, new BytesWritable(valuesEncoded), VertexOrEdgeEnum.Edge))
+        (PartitionIdAndBytesEncoded(partitionId, new BytesWritable(keyEncoded)), new PartitionIdAndValueBinaryWritable(partitionId, new BytesWritable(valuesEncoded), VertexOrEdgeEnum.Edge))
}
}
}.repartitionAndSortWithinPartitions(new SortByKeyPartitioner(repartitionNumber)).map(v => (v._1.valueEncoded, v._2)).saveAsNewAPIHadoopFile(localSstFileOutput, classOf[BytesWritable], classOf[PartitionIdAndValueBinaryWritable], classOf[SstFileOutputFormat])
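This hunk is the bug behind the commit title: the edge path built its shuffle key from `id` rather than `partitionId`, so an edge record could be routed to a different Spark partition than the one its companion value (which already carried `partitionId`) belonged to. A hedged sketch of a partitioner in the spirit of `SortByKeyPartitioner` (illustrative names; the repo's real implementation may differ):

```scala
// The shuffle key must carry the NebulaGraph partition id so that vertex and
// edge records encoding the same graph partition end up in the same Spark
// partition, and therefore in the same SST file.
import org.apache.spark.Partitioner

// Stand-in for PartitionIdAndBytesEncoded: a key carrying the graph partition id.
case class KeyWithPartition(partitionId: Long, valueEncoded: Array[Byte])

class ByGraphPartition(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key match {
    // Route purely on the embedded graph partition id.
    case KeyWithPartition(pid, _) => (pid % numPartitions).toInt
    case _                        => 0
  }
}
```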
@@ -1,6 +1,6 @@
package com.vesoft.tools

-import java.io.File
+import java.io.{File, IOException}

import com.vesoft.tools.VertexOrEdgeEnum.VertexOrEdgeEnum
import javax.xml.bind.DatatypeConverter
@@ -23,30 +23,30 @@ import scala.sys.process._
* worker_node1
* |
* |-sstFileOutput
- * |
- * |--1
- * | |
- * | |——vertex-${FIRST_KEY}.data
- * | |--edge-${FIRST_KEY}.data
- * |
- * |--2
- * |
- * |——vertex-${FIRST_KEY}.data
- * |--edge-${FIRST_KEY}.data
+ * |
+ * |--1
+ * | |
+ * | |——vertex-${FIRST_KEY1}.data
+ * | |--edge-${FIRST_KEY}.data
+ * |
+ * |--2
+ * |
+ * |——vertex-${FIRST_KEY}.data
+ * |--edge-${FIRST_KEY}.data
* worker_node2
* |
* |-sstFileOutput
- * |
- * |--1
- * | |
- * | |——vertex-${FIRST_KEY}.data
- * | |--edge-${FIRST_KEY}.data
- * |
- * |--2
- * |
- * |——vertex-${FIRST_KEY}.data
- * |--edge-${FIRST_KEY}.data
- */
+ * |
+ * |--1
+ * | |
+ * | |——vertex-${FIRST_KEY}.data
+ * | |--edge-${FIRST_KEY}.data
+ * |
+ * |--2
+ * |
+ * |——vertex-${FIRST_KEY}.data
+ * |--edge-${FIRST_KEY}.data
+ **/
class SstFileOutputFormat extends FileOutputFormat[BytesWritable, PartitionIdAndValueBinaryWritable] {
override def getRecordWriter(job: TaskAttemptContext): RecordWriter[BytesWritable, PartitionIdAndValueBinaryWritable] = {
if (FileOutputFormat.getCompressOutput(job)) {
@@ -196,8 +196,16 @@ class SstRecordWriter(localSstFileOutput: String, configuration: Configuration)
val destinationHdfsDir = s"${hdfsParentDir}${hdfsDirectory}"
val destinationPath = new Path(destinationHdfsDir)

-      if (!hdfsFileSystem.exists(destinationPath)) {
-        hdfsFileSystem.mkdirs(destinationPath)
+      try {
+        if (!hdfsFileSystem.exists(destinationPath)) {
+          hdfsFileSystem.mkdirs(destinationPath)
+        }
+      }
+      catch {
+        case e: IOException => {
+          log.error(s"Error when mkdir hdfs dir ${destinationPath}", e)
+          throw e
+        }
+      }
}

val command = List(s"${hadoopHome}/bin/hdfs", "dfs", "-copyFromLocal", s"${localSstFile}", destinationHdfsDir)
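With this patch, a failing `mkdirs` is logged and rethrown instead of letting the subsequent `hdfs dfs -copyFromLocal` shell command run against a missing directory. For comparison, a sketch of performing the whole step through the Hadoop FileSystem API already used here, rather than shelling out (assumed helper names; not the commit's code):

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsUploadSketch {
  @throws[IOException]
  def upload(localSstFile: String, destinationHdfsDir: String, conf: Configuration): Unit = {
    val fs   = FileSystem.get(conf)
    val dest = new Path(destinationHdfsDir)

    if (!fs.exists(dest)) {
      fs.mkdirs(dest) // may throw IOException, mirroring the patch's try/catch above
    }
    // delSrc = false: keep the local SST file after the upload.
    fs.copyFromLocalFile(false, new Path(localSstFile), dest)
  }
}
```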