Setting Directed Flag Causes NullPointerException #29

Open
qantik opened this issue Dec 10, 2017 · 1 comment

@qantik commented Dec 10, 2017

Hi,

Somehow the Spark implementation crashes whenever the directed flag is set to true.
I ran it with both the karate.edgelist example and a dummy two-edge graph (a minimal
example is sketched after the trace). The exception is always raised at the same
location inside initTransitionProb, after the graph has been loaded.

java.lang.NullPointerException
at Node2vec$$anonfun$initTransitionProb$2.apply(Node2vec.scala:69)
at Node2vec$$anonfun$initTransitionProb$2.apply(Node2vec.scala:68)
at org.apache.spark.graphx.impl.VertexPartitionBaseOps.map(VertexPartitionBaseOps.scala:61)
at org.apache.spark.graphx.impl.GraphImpl$$anonfun$5.apply(GraphImpl.scala:129)
at org.apache.spark.graphx.impl.GraphImpl$$anonfun$5.apply(GraphImpl.scala:129)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
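
For example, a directed graph as small as this hypothetical edgelist (assuming the src dst weight input format) should be enough to reproduce it, since node 3 only ever appears as a destination:

    1 2 1.0
    2 3 1.0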

Cheers.

@datazhen commented Apr 27, 2018

Reason

I hit the same error when running val (j, q) = GraphOps.setupAlias(nodeAttr.neighbors) in GraphOps.initTransitionProb. It is caused by the nodeAttr object being null.

Detail

  1. Some dst nodes should end up in the node2attr object but never do, because the following code only creates entries keyed by src:
    val node2attr = triplets.map { case (src, dst, weight) =>
      (src, Array((dst, weight)))
    }.reduceByKey(_ ++ _).map { case (srcId, neighbors: Array[(Long, Double)]) =>
      var neighbors_ : Array[(Long, Double)] = neighbors.groupBy(_._1).map { case (group, traversable) =>
        traversable.head
      }.toArray
      if (neighbors_.length > bcMaxDegree.value) {
        neighbors_ = neighbors.sortWith { case (left, right) => left._2 > right._2 }.slice(0, bcMaxDegree.value)
      }
      // ... the map then returns (srcId, NodeAttr(...)); a node that never occurs as src gets no entry
    }
  2. Then, when the graph object is created with val graph = Graph(indexedNodes, indexedEdges), the missing dst nodes mentioned above are created with the default vertex attribute, i.e. as [vertexId, null] instead of [vertexId, NodeAttr]. That is where the error comes from (see the sketch below).
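
A minimal sketch of that GraphX behavior, assuming a spark-shell session with a live SparkContext sc (the vertex and edge data here are hypothetical):

    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    // Vertex 2L appears only as a destination and has no entry in vertices.
    val vertices: RDD[(VertexId, String)] = sc.parallelize(Seq((1L, "attr1")))
    val edges: RDD[Edge[Double]] = sc.parallelize(Seq(Edge(1L, 2L, 1.0)))

    // Graph() fills in missing vertices with the default vertex attribute,
    // which is null for reference types unless one is passed explicitly.
    val g = Graph(vertices, edges)
    g.vertices.collect().foreach(println) // prints (1,attr1) and (2,null)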

Solutions

To fix the problem, two places need to be modified, as shown below:

  1. GraphOps.initTransitionProb
    val graph = Graph(indexedNodes, indexedEdges).mapVertices[NodeAttr] { case (vertexId, nodeAttr) =>
      if (nodeAttr != null) { // added: guard against the default null attribute
        val (j, q) = GraphOps.setupAlias(nodeAttr.neighbors)
        val nextNodeIndex = GraphOps.drawAlias(j, q)
        nodeAttr.path = Array(vertexId, nodeAttr.neighbors(nextNodeIndex)._1)
        nodeAttr
      } else {
        NodeAttr() // create a new, empty attribute object
      }
    }
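
For context, here is a sketch of what the NodeAttr case class presumably looks like (the field names follow the snippets above; the empty-array defaults are an assumption):

    // Assumed shape of NodeAttr; the defaults make NodeAttr() an empty attribute.
    case class NodeAttr(
      var neighbors: Array[(Long, Double)] = Array.empty, // (dstId, weight) pairs
      var path: Array[Long] = Array.empty                 // random-walk path built so far
    )

With those defaults, NodeAttr().path.nonEmpty is false, so the filter added in step 2 below drops exactly the vertices patched in here.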
  2. Node2Vec.randomWalk
    // added: .filter(x => x._2.path.nonEmpty)
    val examples = g.vertices.filter(x => x._2.path.nonEmpty).cache
    ...

    // added condition: attr.dstNeighbors != null && attr.dstNeighbors.nonEmpty
    iter.map { case (edge, (attr, pathBuffer)) =>
      try {
        if (pathBuffer != null && pathBuffer.nonEmpty &&
            attr.dstNeighbors != null && attr.dstNeighbors.nonEmpty) {
          val nextNodeIndex = GraphOps.drawAlias(attr.J, attr.q)
          val nextNodeId = attr.dstNeighbors(nextNodeIndex)
          s"$pathBuffer\t$nextNodeId"
        } else {
          pathBuffer // added: pass the buffer through unchanged
        }
      } catch {
        case e: Exception => throw new RuntimeException(e.getMessage)
      }
    }
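
With both changes applied, runs with the directed flag set should no longer throw: dst-only sink nodes get an empty NodeAttr, the filter removes them from the walk seeds, and the extra null/empty checks keep them from breaking walks that reach them.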

Hope this can help you! Good luck!
