From 2162ebff77b2b85d7dde5f44fb903a661b15ab30 Mon Sep 17 00:00:00 2001 From: Terry Date: Wed, 17 Aug 2022 08:17:56 +0800 Subject: [PATCH 1/6] code work ZhangYinhan --- .../embedding/struct2vec/Struct2vec.scala | 315 ++++++++++++++++++ .../graph/embedding/struct2vec/utils.scala | 16 + 2 files changed, 331 insertions(+) create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/utils.scala diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala new file mode 100644 index 000000000..9343970b9 --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala @@ -0,0 +1,315 @@ +package com.tencent.angel.graph.embedding.struct2vec + +import breeze.linalg.BroadcastedColumns.broadcastOp +import breeze.linalg.BroadcastedRows.broadcastOp +import breeze.linalg.Vector.castFunc + +import java.lang.Math.{abs, max, min} +import breeze.linalg.sum +import breeze.numerics.exp + +import scala.collection.JavaConversions.asJavaCollection +import scala.collection.immutable.Map +import scala.collection.mutable +import scala.collection.mutable.{ArrayBuffer, ListBuffer, Queue, Set} +import scala.util.Random + +class Struct2vec { + + private val graph_Nodes : List[Int] = List() + private val idx2Nodes : List[Int] = List() + private val Nodes2idx : List[Int] = List() + private val idx : List[Int] = List.range(0,idx2Nodes.length,1) + private val graph : List[Int] = List() + private val embedding :Map[String,Int] = Map() + + val reuse :Boolean => _ + val output :String => _ + val opt1_reduce_len :Boolean => _ + val opt2_reduce_sim_calc :Boolean => _ + val opt3_num_layers: Int => _ + +// def this() { +// this(s"Node2Vec_${(new Random).nextInt()}") +// } +// +// def setOutputDir(in: String): Unit = { +// output = in +// } + + + private def Init( + name: String, walk_length: Int, workers: Int, stay_prob: Float, opt1_reduce_len: Boolean, + opt2_reduce_sim_calc: Boolean, opt3_num_layers: Int, reuse : Boolean + ) = { + //inital parmas + val opt1_reduce_len :Boolean = opt1_reduce_len + val opt2_reduce_sim_calc :Boolean = opt2_reduce_sim_calc + val opt3_num_layers: Int = opt3_num_layers + + } + + private def create_context_graph(max_num_layers: Int, workers: Int): Unit = { + + } + + def compute_orderd_degreelist(max_num_layers: Int): Map[Int, Int] = { + var degreeList : Map[Int, Int] = Map() + var nodes = graph_Nodes + + for(node <- nodes){ + degreeList+=(node->get_order_degreelist_node(node,max_num_layers)) + } + return degreeList + } + + def get_order_degreelist_node(root: Int, max_num_layers: Int=0):Int = { + + if (max_num_layers == 0) {max_num_layers = Double.PositiveInfinity } + + var order_degree_sequence_dict:Map[Int,Int] = Map() + + var visited:ListBuffer[Boolean] = ListBuffer.fill(graph_Nodes.length)(false) + + val queue : Queue[Int] = Queue() + var level =0 + + // in queue + queue += root + visited(root) = true + + while(queue.length >0 && level <= max_num_layers){ + var count :Int = queue.length + + + while(count > 0){ + var top = queue.dequeue() + var node = idx2Nodes(top) + var degree = graph(node) + + if (opt1_reduce_len == true){ + var degree_list : mutable.Map[Int,Int] = mutable.Map() + degree_list(degree) = degree_list.get(degree) + 1 + }else{ + var 
degree_list : List[Int] = List() + degree_list:+degree + } + + + + for (nei <- graph(node)){ + val nei_idx = Nodes2idx(nei) + if (visited(nei_idx) == false){ + visited(nei_idx) = true + queue.enqueue(nei_idx) + } + } + count-=1 + } + + var order_degree_list:ListBuffer[(Int,Int)] = ListBuffer() + if(opt1_reduce_len == true){ + for((degree :Int,freq :Int)<- degree_list){ + order_degree_list.append((degree,freq)) + } + order_degree_list.sortBy(_._1) + + }else{order_degree_list.sorted} + + order_degree_sequence_dict+=(level-> order_degree_list) + level-=1 + } + } + + def compute_structural_distance(max_num_layers:Int,workers:Int=1,verbose:Int=0): Unit = { + + if (opt1_reduce_len == true){ + var dist_func = cost_max() + }else{ + var dist_func = cost() + } + + var degreeList = compute_orderd_degreelist(max_num_layers) + + if (opt2_reduce_sim_calc == true){ + var degrees = create_vector() + var degreeListSelected: Map[Int,Int] = Map() + var vertices: Map[Int,Int] = Map() + val n_nodes = idx.length + for(v<-idx){ + var nbs = get_vertices(v,graph(idx2Nodes(v)),degrees,n_nodes) + vertices+=(v-> nbs) + degreeListSelected+=(v->degreeList(v)) + for(n<-nbs){ + degreeListSelected+=(n-> degreeList(n)) + } + } + + }else{ + var vertices :Map[Int,List[Int]] = Map() + for(v<-degreeList) { + for(vd <- degreeList.keys){ + if (vd>v){vertices+=(v->vd)} + } + } +// for(part_list in partition_dict(vertices,workers)){ +// var results = Parallel(workers)(delayed(cpmpute_dtw_dist)(part_list,degreelist,dist_func)) +// } + // var dtw_dist = new Map[Map[]]() +// val structural_dist = convert_dtw_struc_dist(dtw_dist) + + } +// return structural_dist + } + + def create_vector(){ + val degrees :Map[Int,Map[String,ListBuffer[Int]]]= Map() + val degrees_sorted :Set[Int] = Set() + val G = graph + for(v<-idx){ + var degree :Int = G(idx2Nodes(v)).length + degrees_sorted.add(degree) +// if(degrees.contains(degree) == false){ +// degrees(degree) = +// } + degrees(degree)("vertices").append(v) + degrees_sorted = degrees_sorted.toArray + degrees_sorted.sorted + + var l = degrees_sorted.length + var index = 0 + for(degree<- degree_sorted){ + if(index>0){ + degrees(degree)+=("before"-> degrees_sorted(index-1)) + } + if(index<(l-1)){ + degrees(degree)+=("after"-> degrees_sorted(index+1)) + } + } + } + return degrees + } + + def get_layer_rep(pair_distance:Map[(Int,Int),Map[Int,(Int,Int)]]) { + val layer_distances:Map[Int,Map[(Int,Int),(Int,Int)]]= Map() + var layer_adj:Map[Int,Map[Int,ListBuffer[Int]]] = Map() + + for((v_pair,layer_dist)<- pair_distance ){ + for((layer,distance)<- layer_dist){ + var vx:Int = v_pair._1 + var vy:Int = v_pair._2 + + layer_distances(layer)(vx,vy) -> (layer,distance) + layer_adj(layer)(vx).append(vy) + layer_adj(layer)(vy).append(vx) + + } + } + return layer_adj,layer_distances + } + + def get_transition_probs(layers_adj:Map[Int,Map[Int,ListBuffer[Int]]], + layers_distances:Map[Int,Map[(Int,Int),(Int,Int)]]) { + + var layers_alias : Map[Int,Map[Int,Map[Int,List[Int]]]] = Map() + var layers_accept :Map[Int,Map[Int,Map[Int,List[Int]]]] = Map() + + for(layer<- layers_adj.keys){ +// var neighbors :Map[Int,ListBuffer[Int]] = layers_adj(layer) +// var layer_distances : Map[(Int,Int),(Int,Int)] = layers_distances(layer) + var node_alias_dict :Map[Int,Map[Int,List[Int]]] = Map() + var node_accept_dict :Map[Int,Map[Int,List[Int]]] = Map() + var norm_weights :Map[Int,ListBuffer[Double]] = Map() + + for((v,neighbors)<-layers_adj(layer)){ + var edge_list:ListBuffer[Double] = ListBuffer() + var sum_weight :Double = 0.0 
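+        // Sketch of the intended weighting (struc2vec paper convention, not verified against this
+        // code): the edge weight between v and a neighbour n in layer k is w_k(v, n) = exp(-f_k(v, n)),
+        // where f_k is the structural distance stored in layers_distances. The loop below collects the
+        // exponentiated weights in edge_list and their total in sum_weight, so they can be normalised
+        // into the per-node probability distribution that is meant to feed the alias tables.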
+ + for(n<- neighbors){ + if(layers_distances.contains((v,n))==true){ + edge_list.append(exp(layers_distances(layer)(v,n)) + }else{ + edge_list.append(exp(layers_distances(layer)(n,v)) + } + sum_weight += exp(layers_distances(layer)(n,v)) + + edge_list = for(x<-edge_list) yield x/sum_weight + norm_weights+=(v->edge_list) +// node_alias_dict+=(v->create_alias_table(edge_list)) +// node_accept_dict+=(v->create_alias_table(edge_list)) + } + layers_alias+=(layer->node_alias_dict) + layers_accept+=(layer->node_accept_dict) + } + } + return layers_accept , layers_alias + } + + def cost(a:List[Int],b:List[Int]):Double ={ + val ep=0.5 + val m = max(a(0),b(0))+ep + val mi = min(a(0),b(0))+ep + val result = ((m/mi)-1) + return result + } + + def cost_min(a:List[Int],b:List[Int]): Double ={ + val ep=0.5 + val m = max(a(0),b(0))+ep + val mi = min(a(0),b(0))+ep + val result = ((m/mi)-1) * min(a(1),b(1)) + return result + } + + def cost_max(a:List[Int],b:List[Int]):Double={ + val ep=0.5 + val m = max(a(0),b(0))+ep + val mi = min(a(0),b(0))+ep + val result = ((m/mi)-1)*max(a(1),b(1)) + return result + } + + def convert_dtw_struc_dist(distances:Map[Map[Int,List[(Int,Int)]],Map[Int,List[Int]]],startLayer:Int=1) = { + for((vertices , layers)<-distances){ + var keys_layers = layers.keys.toList + var startLayer:Int = min(keys_layers.length,startLayer) + + for(layer <- 0 to startLayer) {keys_layers.remove(0)} + for(layer <- keys_layers) {layers(layer)+=layers(layer - 1)} + } + return distances + } + + def verifyDegrees(degree:Int,degree_v_root:Int,degree_a:Int,degree_b:Int): Int ={ + var degree_now :Int = 0 + if(degree_b == -1){ + var degree_now:Int = degree_a + } else if (degree_a == -1) { + var degree_now :Int= degree_b + } else if (abs(degree_b - degree_v_root) < abs(degree_a - degree_v_root)){ + var degree_now :Int= degree_b + }else{ + var degree_now :Int= degree_a + } + return degree_now + } + + def compute_dtw_dist(part_list:List[(Int,List[Int])],degreeList:Map[Int,List[Int]],dist_func:String){ + var dtw_dist :Map[(Int,Int),Int] = Map() + for((v1,nbs)<- part_list){ + var lists_v1 = degreeList(v1) + for(v2<-nbs){ + var lists_v2 = degreeList(v2) + var max_layer = min(lists_v1.length,lists_v2.length) + for(layer<- 0 to max_layer){ + var dist , path = fastdtw(list_v1(layer),list_v2(layer),radius=1,dist=dist_func) + dtw_dist((v1,v2))+=(layer->dist) + } + } + } + return dtw_dist + } + + + +} diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/utils.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/utils.scala new file mode 100644 index 000000000..ac58d3629 --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/utils.scala @@ -0,0 +1,16 @@ +package com.tencent.angel.graph.embedding.struct2vec + + + +object utils { + +// def preprocess_nxgraph() = { +// +// } + + + + + } + +} From 67937249f9ad3895d4302927b78cc44eaa6b3a36 Mon Sep 17 00:00:00 2001 From: YinHan-Zhang <1513032551@qq.com> Date: Fri, 19 Aug 2022 02:27:14 -0700 Subject: [PATCH 2/6] this is a test --- .../main/java/com/tencent/angel/graph/utils/GeneralGetUtils.java | 1 + 1 file changed, 1 insertion(+) diff --git a/spark-on-angel/graph/src/main/java/com/tencent/angel/graph/utils/GeneralGetUtils.java b/spark-on-angel/graph/src/main/java/com/tencent/angel/graph/utils/GeneralGetUtils.java index cda772d68..81578d9ff 100644 --- a/spark-on-angel/graph/src/main/java/com/tencent/angel/graph/utils/GeneralGetUtils.java +++ 
b/spark-on-angel/graph/src/main/java/com/tencent/angel/graph/utils/GeneralGetUtils.java @@ -30,6 +30,7 @@ public static PartitionGetResult partitionGet(PSContext psContext, PartitionGetP MatrixMeta meta = psContext.getMatrixMetaManager().getMatrixMeta(param.getMatrixId()); try { + System.out.println("sssaaaa") return new PartGeneralGetResult(meta.getValueClass(), nodeIds, data); } catch (ClassNotFoundException e) { throw new AngelException("Can not get value class "); From c25e0bf7f5d5271aca8134af9356042570014d40 Mon Sep 17 00:00:00 2001 From: Terry Date: Mon, 22 Aug 2022 08:31:55 +0800 Subject: [PATCH 3/6] Week 5 Code Work --- .../embedding/struct2vec/Alias_table.scala | 53 +++++ .../embedding/struct2vec/BiasWalker.scala | 63 ++++++ .../graph/embedding/struct2vec/DTW.scala | 77 +++++++ .../embedding/struct2vec/Struct2vec.scala | 94 +++++---- .../Struct2vecGraphParatition.scala | 94 +++++++++ .../struct2vec/Struct2vecPSModel.scala | 67 ++++++ .../struct2vec/Struct2vecParams.scala | 199 ++++++++++++++++++ .../graph/embedding/struct2vec/fastdtw.scala | 31 +++ .../struct2vec/fastdtwUtils/CostMatrix.scala | 122 +++++++++++ .../fastdtwUtils/IndexConstraints.scala | 104 +++++++++ .../struct2vec/fastdtwUtils/Space.scala | 115 ++++++++++ .../fastdtwUtils/TimeSeriesElement.scala | 11 + .../struct2vec/fastdtwUtils/VectorValue.scala | 35 +++ 13 files changed, 1024 insertions(+), 41 deletions(-) create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Alias_table.scala create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/BiasWalker.scala create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/DTW.scala create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecGraphParatition.scala create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecPSModel.scala create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecParams.scala create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtw.scala create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/CostMatrix.scala create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/IndexConstraints.scala create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/Space.scala create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/TimeSeriesElement.scala create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/VectorValue.scala diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Alias_table.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Alias_table.scala new file mode 100644 index 000000000..b0ece9aed --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Alias_table.scala @@ -0,0 +1,53 @@ +package com.tencent.angel.graph.embedding.struct2vec + +import scala.collection.mutable.ArrayBuffer +import scala.util.Random + +object Alias_table { + + def createAliasTable(areaRatio: Array[Float]): (Array[Float], Array[Int]) = { + val len = 
areaRatio.length + val small = ArrayBuffer[Int]() + val large = ArrayBuffer[Int]() + val accept = Array.fill(len)(0f) + val alias = Array.fill(len)(0) + + for (idx <- areaRatio.indices) { + if (areaRatio(idx) < 1.0) small.append(idx) else large.append(idx) + } + while (small.nonEmpty && large.nonEmpty) { + val smallIndex = small.remove(small.size - 1) + val largeIndex = large.remove(large.size - 1) + accept(smallIndex) = areaRatio(smallIndex) + alias(smallIndex) = largeIndex + areaRatio(largeIndex) = areaRatio(largeIndex) - (1 - areaRatio(smallIndex)) + if (areaRatio(largeIndex) < 1.0) small.append(largeIndex) else large.append(largeIndex) + } + while (small.nonEmpty) { + val smallIndex = small.remove(small.size - 1) + accept(smallIndex) = 1 + } + + while (large.nonEmpty) { + val largeIndex = large.remove(large.size - 1) + accept(largeIndex) = 1 + } + (accept, alias) + } + + def alias_sample(rand:Random, accept: Array[Float], alias: Array[Int], sampleNum: Int): Array[Int] = { + + val indices = new Array[Int](sampleNum) + + for (i <- (0 until sampleNum)) { + val id = rand.nextInt(accept.length) + val v = rand.nextDouble().toFloat + if (v < accept(id)) { + indices(i) = id + } else { + indices(i) = alias(id) + } + } + indices + } +} diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/BiasWalker.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/BiasWalker.scala new file mode 100644 index 000000000..e2a46b621 --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/BiasWalker.scala @@ -0,0 +1,63 @@ +package com.tencent.angel.graph.embedding.struct2vec + +import struct2vec.Alias_table.alias_sample + +import scala.collection.mutable.ListBuffer +import scala.util.Random + +class BiasWalker (srcNodesArray: Array[Long]) { + + private var idx2Node = srcNodesArray + private var idx: Array[Int] = Array(idx2Node.length) + for(i <- 0 to idx2Node.length) idx(i)=i + +// def simulate_walks(num_walks: Int,walk_length: Int,stay_prob: Double =0.3) ={ +// +// +// } + + def _simulate_walks(nodes:Array[Int],num_walks: Int,walk_length: Int,stay_prob: Double, + layers_adj:Array[Array[Int]],layers_accept:Array[Array[Float]], + layers_alias:Array[Array[Float]],gamma:Array[Array[Float]]) ={ + var walks = new Array[ListBuffer[Long]](num_walks) + Random.shuffle(nodes) + for( i <- 0 to num_walks ) { + for(node <- nodes) + walks(i) = exec_random_walk(layers_adj,layers_accept,layers_alias,node,walk_length,gamma,stay_prob) + } + walks + } + + def exec_random_walk(graphs: Array[Array[Array[Int]]], layers_accept: Array[Array[Array[Float]]], layers_alias: Array[Array[Array[Int]]] + , node: Int, walk_length: Int, gamma: Array[Array[Float]], stay_prob: Double)={ + var Initlayer: Int = 0 + var layer: Int = 0 + var path: ListBuffer[Long] = ListBuffer(idx.length) + path.append(idx2Node(node)) + + //同一层级 + while (path.length < walk_length){ + var rand = Random.nextFloat() + if (rand < stay_prob) { + node = ChooseNeigbor(node,graphs,layers_alias,layers_accept,layer) + path.append(idx2Node(node)) + }else{ // 不同层级 + var x = math.log(gamma(layer)(node))+1 + var p_moveup = (x / (x+1)) // 升层级的概率 + + if((rand > p_moveup)&&(layer > Initlayer)) //降层级 + layer = layer - 1 + if ((graphs.contains(layer+1))&&(graphs(layer+1).contains(node))) //升层级 + layer = layer + 1 + } + path + } + def ChooseNeigbor(node: Int, graphs: Array[Array[Array[Int]]], layers_alias: Array[Array[Array[Int]]], + layers_accept: 
Array[Array[Array[Float]]], layer: Int) ={ + val node_list = graphs(layer)(node) + val idx = alias_sample(Random,layers_accept(layer)(node),layers_alias(layer)(node),1) + //node_list(idx) + } + + } +} diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/DTW.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/DTW.scala new file mode 100644 index 000000000..0cd403227 --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/DTW.scala @@ -0,0 +1,77 @@ +package com.tencent.angel.graph.embedding.struct2vec + +import com.tencent.angel.graph.embedding.struct2vec.fastdtwUtils.{ CostMatrix, IndexConstraints, MatrixEntry, PassthroughIndexConstraints, Space, TimeSeriesElement } + +/** + * A wrapper for the cost matrix calculation based on a specified metric + * + * @param space + */ +class DTW[T](space: Space[T]) { + + /** + * @param left + * @param right + * @param constraints + * @return + */ + def costMatrix(left: Seq[TimeSeriesElement[T]], right: Seq[TimeSeriesElement[T]], constraints: IndexConstraints = new PassthroughIndexConstraints): CostMatrix = + (left, right) match { + case (Nil, _) | (_, Nil) => //one of the series is empty + CostMatrix() + case _ => CostMatrix({ + + // COST MATRIX: + // 5|_|_|_|_|_|_|E| E = min Global Cost + // 4|_|_|_|_|_|_|_| S = Start point + // 3|_|_|_|_|_|_|_| each cell = min global cost to get to that point + // j 2|_|_|_|_|_|_|_| + // 1|_|_|_|_|_|_|_| + // 0|S|_|_|_|_|_|_| + // 0 1 2 3 4 5 6 + // i + // access is M(i,j)... column-row + + left.foldLeft(Seq[Seq[MatrixEntry]]()) { (columns, curLeft: TimeSeriesElement[T]) => + val i = columns.length //this is the x-index + val (smallestIndex, _) = constraints.columnRange(i, right.length - 1) + val constrainedRight: Seq[TimeSeriesElement[T]] = constraints.mask(right, i) + columns :+ ( + if (columns.isEmpty) { + //i = 0 + //create a matrix entry for every j within the applied constraints + constrainedRight.foldLeft(Seq[MatrixEntry]()) { (column: Seq[MatrixEntry], curLeft: TimeSeriesElement[T]) => + val lastDistance = column.lastOption.getOrElse(MatrixEntry(i -> 0, smallestIndex)) + column :+ MatrixEntry(i -> (smallestIndex + column.length), lastDistance.value + space.distance(left.head.v, curLeft.v)) + } + } else { + //i != 0 + val lastColumn: Seq[MatrixEntry] = constraints.constrain(columns.last, i) //i-1, * + val bottomElement: MatrixEntry = MatrixEntry(i -> smallestIndex, lastColumn.head.value + space.distance(curLeft.v, constrainedRight.head.v)) + + val filledOptionalLastColumn = lastColumn.map { v => Option(v) } ++ Seq.fill(constrainedRight.length)(None) + val slidingValues: Seq[Seq[Option[MatrixEntry]]] = filledOptionalLastColumn.sliding(2).toIndexedSeq + val neighbors: Seq[(TimeSeriesElement[T], Seq[Option[MatrixEntry]])] = constrainedRight.drop(1).zip(slidingValues) + + //neighbors is an element in the right series with the cost matrix entries for the West and Southwest directions + neighbors.foldLeft(Seq(bottomElement)) { (column: Seq[MatrixEntry], neighborhood: (TimeSeriesElement[T], Seq[Option[MatrixEntry]])) => + + val j = column.length + smallestIndex + val curRight = neighborhood._1 + val neighboringValues: Seq[Option[MatrixEntry]] = neighborhood._2 + + val costSouth: Double = column.last.value //i, j-1 + val minGlobalCost = (Seq(costSouth) ++ neighboringValues.flatten.map(_.value)).min + + //add an entry to the current column (for curLeft) containing the total cost to consider curRight 
a match + column :+ MatrixEntry(i -> j, minGlobalCost + space.distance(curLeft.v, curRight.v)) + } + + }) + + } + + }: _*) + } + +} diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala index 9343970b9..ac0262f51 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala @@ -1,58 +1,63 @@ package com.tencent.angel.graph.embedding.struct2vec -import breeze.linalg.BroadcastedColumns.broadcastOp -import breeze.linalg.BroadcastedRows.broadcastOp -import breeze.linalg.Vector.castFunc +import com.tencent.angel.graph.embedding.struct2vec.{Struct2vecParams,Struct2vecGraphPartition} import java.lang.Math.{abs, max, min} -import breeze.linalg.sum -import breeze.numerics.exp - import scala.collection.JavaConversions.asJavaCollection import scala.collection.immutable.Map import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, ListBuffer, Queue, Set} import scala.util.Random +import struct2vec.Struct2vecParams -class Struct2vec { - - private val graph_Nodes : List[Int] = List() - private val idx2Nodes : List[Int] = List() - private val Nodes2idx : List[Int] = List() - private val idx : List[Int] = List.range(0,idx2Nodes.length,1) - private val graph : List[Int] = List() - private val embedding :Map[String,Int] = Map() - - val reuse :Boolean => _ - val output :String => _ - val opt1_reduce_len :Boolean => _ - val opt2_reduce_sim_calc :Boolean => _ - val opt3_num_layers: Int => _ - -// def this() { -// this(s"Node2Vec_${(new Random).nextInt()}") -// } -// -// def setOutputDir(in: String): Unit = { -// output = in -// } - - - private def Init( - name: String, walk_length: Int, workers: Int, stay_prob: Float, opt1_reduce_len: Boolean, - opt2_reduce_sim_calc: Boolean, opt3_num_layers: Int, reuse : Boolean - ) = { - //inital parmas - val opt1_reduce_len :Boolean = opt1_reduce_len - val opt2_reduce_sim_calc :Boolean = opt2_reduce_sim_calc - val opt3_num_layers: Int = opt3_num_layers - } - private def create_context_graph(max_num_layers: Int, workers: Int): Unit = { +class Struct2vec(params: Struct2vecParams ) { + +// private val graph_Nodes : List[Int] = List() +// private val idx2Nodes : List[Int] = List() +// private val Nodes2idx : List[Int] = List() +// private val idx : List[Int] = List.range(0,idx2Nodes.length,1) +// private val graph : List[Int] = List() +// private val embedding :Map[String,Int] = Map() + + + + + private def createMatrix(name: String, + numRow: Int, + minId: Long, + maxId: Long, + rowType: RowType, + psNumPartition: Int, + data: RDD[(Long, Long)], + useBalancePartition: Boolean, + percent: Float): PSMatrixImpl = { + + val modelContext = new ModelContext($(psPartitionNum), minId, maxId, -1, name, + SparkContext.getOrCreate().hadoopConfiguration) + + val matrix = ModelContextUtils.createMatrixContext(modelContext, rowType, + classOf[LongArrayElement]) + + if (useBalancePartition && (!modelContext.isUseHashPartition)) { + index = data.flatMap(f => Iterator(f._1, f._2)) + .persist($(storageLevel)) + LoadBalancePartitioner.partition(index, modelContext.getMaxNodeId, + modelContext.getPartitionNum, matrix, percent) + } + + val psMatrix = PSMatrix.matrix(matrix) + val psMatrixImpl = new PSMatrixImpl(psMatrix.id, matrix.getName, 1, + modelContext.getMaxNodeId, 
matrix.getRowType) + + if (useBalancePartition && (!modelContext.isUseHashPartition)) + index.unpersist() + psMatrixImpl } + //计算有序的度列表 def compute_orderd_degreelist(max_num_layers: Int): Map[Int, Int] = { var degreeList : Map[Int, Int] = Map() var nodes = graph_Nodes @@ -63,6 +68,7 @@ class Struct2vec { return degreeList } + //获取有序度的节点 def get_order_degreelist_node(root: Int, max_num_layers: Int=0):Int = { if (max_num_layers == 0) {max_num_layers = Double.PositiveInfinity } @@ -121,6 +127,7 @@ class Struct2vec { } } + //计算结构距离 def compute_structural_distance(max_num_layers:Int,workers:Int=1,verbose:Int=0): Unit = { if (opt1_reduce_len == true){ @@ -190,6 +197,7 @@ class Struct2vec { return degrees } + //获得层级 def get_layer_rep(pair_distance:Map[(Int,Int),Map[Int,(Int,Int)]]) { val layer_distances:Map[Int,Map[(Int,Int),(Int,Int)]]= Map() var layer_adj:Map[Int,Map[Int,ListBuffer[Int]]] = Map() @@ -208,6 +216,7 @@ class Struct2vec { return layer_adj,layer_distances } + //权重表 def get_transition_probs(layers_adj:Map[Int,Map[Int,ListBuffer[Int]]], layers_distances:Map[Int,Map[(Int,Int),(Int,Int)]]) { @@ -269,6 +278,7 @@ class Struct2vec { return result } + //dtw转换成结构距离 def convert_dtw_struc_dist(distances:Map[Map[Int,List[(Int,Int)]],Map[Int,List[Int]]],startLayer:Int=1) = { for((vertices , layers)<-distances){ var keys_layers = layers.keys.toList @@ -280,6 +290,7 @@ class Struct2vec { return distances } + //确定度 def verifyDegrees(degree:Int,degree_v_root:Int,degree_a:Int,degree_b:Int): Int ={ var degree_now :Int = 0 if(degree_b == -1){ @@ -294,6 +305,7 @@ class Struct2vec { return degree_now } + //计算dtw距离 def compute_dtw_dist(part_list:List[(Int,List[Int])],degreeList:Map[Int,List[Int]],dist_func:String){ var dtw_dist :Map[(Int,Int),Int] = Map() for((v1,nbs)<- part_list){ @@ -307,7 +319,7 @@ class Struct2vec { } } } - return dtw_dist + dtw_dist } diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecGraphParatition.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecGraphParatition.scala new file mode 100644 index 000000000..2ea084260 --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecGraphParatition.scala @@ -0,0 +1,94 @@ +package com.tencent.angel.graph.embedding.struct2vec + +import com.tencent.angel.graph.psf.neighbors.SampleNeighborsWithCount.NeighborsAliasTableElement + +import scala.util.Random + + +import scala.collection.mutable.ArrayBuffer +import scala.util.Random + + +class Struct2vecGraphPartition(index: Int, srcNodesArray: Array[Long], srcNodesSamplePaths: Array[Array[Long]], batchSize: Int) { + + def process(model: Struct2vecPSModel, iteration: Int): Struct2vecGraphPartition = { + println(s"partition $index: ---------- iteration $iteration starts ----------") + val rnd = new Random() + //按块读取 + srcNodesArray.indices.sliding(batchSize, batchSize).foreach { nodesIndex => + //尾节点集合 + val pullNodes = srcNodesSamplePaths.slice(nodesIndex.head, nodesIndex.last + 1) + .map(a => (a.last, 1)).groupBy(_._1).map(t => (t._1, t._2.size)) + val (nodes, count) = pullNodes.unzip + + //获取节点的邻居 + val beforeSample = System.currentTimeMillis() + val nodesToNeighboes = model.getSampledNeighbors(model.edgesPsMatrix, nodes.toArray, count.toArray) + + println(s"partition $index, iter $iteration, sampleTime: ${System.currentTimeMillis() - beforeSample} ms") + + //遍历每一个节点 nodesIndex 节点列表 + for (idx <- nodesIndex) { + val oldPath = 
srcNodesSamplePaths(idx) // 前一个节点的下标 + val oldPathTail = oldPath.last // 尾节点路径 + val tailNeighbors = nodesToNeighboes.get(oldPathTail) //尾节点的邻居 + if (tailNeighbors.nonEmpty) { + val sampleFromNeighbors = tailNeighbors(rnd.nextInt(tailNeighbors.size)) // sample a node randomly from tail node's neighbors + srcNodesSamplePaths(idx) = Array.concat(oldPath, Array(sampleFromNeighbors)) // merge old path and sample node + } + + } + } + + println(s"partition $index: ---------- iteration $iteration terminated ----------") + + //return idx2Node -> index srcNodesArray -> node2idx + new Struct2vecGraphPartition(index, srcNodesArray, srcNodesSamplePaths, batchSize) + + } + + + //存储路径 + def save(): Array[Array[Long]] = + srcNodesSamplePaths + + def deepClone(): Struct2vecGraphPartition = { + val newSrcNodesArray = srcNodesArray.clone() + val newPaths = new Array[Array[Long]](srcNodesSamplePaths.length) + var i = 0 + while (i < srcNodesSamplePaths.length) { + newPaths(i) = srcNodesSamplePaths(i) + i += 1 + } + new Struct2vecGraphPartition(index, newSrcNodesArray, newPaths, batchSize) + } +} + + +//单例对象 +object Struct2vecGraphPartition { + def initPSMatrixAndNodePath(model: Struct2vecPSModel, index: Int, iterator: Iterator[(Long, Array[Long], Array[Float], Array[Int])], batchSize: Int): DeepWalkGraphPartition = { + val srcNodes = new LongArrayList() + iterator.sliding(batchSize, batchSize).foreach { pairs => + val nodeId2Neighbors = new Long2ObjectOpenHashMap[NeighborsAliasTableElement](pairs.length) + + pairs.foreach { case (src, neighbors, accept, alias) => + val elem = new NeighborsAliasTableElement(neighbors, accept, alias) + nodeId2Neighbors.put(src, elem) + srcNodes.add(src) + } + model.initNodeNei(nodeId2Neighbors) + nodeId2Neighbors.clear() + } + initNodePaths(index,srcNodes.toLongArray(),batchSize) + } + + def initNodePaths(index: Int, iterator: Array[Long], batchSize: Int): Struct2vecGraphPartition = { + val srcNodesSamplePaths = ArrayBuffer[Array[Long]]() + iterator.foreach { node => + srcNodesSamplePaths.append(Array(node)) + } + new Struct2vecGraphPartition(index, iterator, srcNodesSamplePaths.toArray, batchSize) + } +} + diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecPSModel.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecPSModel.scala new file mode 100644 index 000000000..09f1c229a --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecPSModel.scala @@ -0,0 +1,67 @@ +package com.tencent.angel.graph.embedding.struct2vec + + +import com.tencent.angel.graph.common.param.ModelContext +import com.tencent.angel.graph.common.psf.param.LongKeysUpdateParam +import com.tencent.angel.graph.common.psf.result.GetLongsResult +import com.tencent.angel.graph.model.general.init.GeneralInit +import com.tencent.angel.graph.utils.ModelContextUtils +import com.tencent.angel.ml.matrix.RowType +import com.tencent.angel.ps.storage.vector.element.IElement +import com.tencent.angel.spark.ml.util.LoadBalancePartitioner +import com.tencent.angel.spark.models.PSMatrix +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap +import org.apache.spark.rdd.RDD +import com.tencent.angel.graph.psf.neighbors.SampleNeighborsWithCount.{GetNeighborWithCountParam, GetNeighborsWithCount, NeighborsAliasTableElement} + + +class Struct2vecPSModel(val edgesPsMatrix: PSMatrix) extends Serializable { + //分布式获取节点的邻接表 + + def initNodeNei(msgs: 
Long2ObjectOpenHashMap[NeighborsAliasTableElement]): Unit = { + //msgs是储存图节点的HashMap + val nodeIds = new Array[Long](msgs.size()) + val neighborElems = new Array[IElement](msgs.size()) //邻居节点 + val iter = msgs.long2ObjectEntrySet().fastIterator() //迭代器 + var index = 0 + //遍历迭代器,构建节点的id数组和邻居数组 + while (iter.hasNext) { + val i = iter.next() + nodeIds(index) = i.getLongKey + neighborElems(index) = i.getValue + index += 1 + } + //用构建的节点数组和邻居数组初始化边的矩阵 + edgesPsMatrix.psfUpdate(new GeneralInit(new LongKeysUpdateParam(edgesPsMatrix.id, nodeIds, neighborElems))).get() + } + + //取节点测试 + def getSampledNeighbors(psMatrix: PSMatrix, nodeIds: Array[Long], count: Array[Int]): Long2ObjectOpenHashMap[Array[Long]] = { + psMatrix.psfGet( + new GetNeighborsWithCount( + new GetNeighborWithCountParam(psMatrix.id, nodeIds, count))).asInstanceOf[GetLongsResult].getData + } + + def checkpoint(): Unit = { + edgesPsMatrix.checkpoint() + } + +} + +//伴生对象 +object Struct2vecPSModel { + def apply(modelContext: ModelContext, data: RDD[Long], + useBalancePartition: Boolean, balancePartitionPercent: Float): Struct2vecPSModel= { + //创建上下文 + val matrix = ModelContextUtils.createMatrixContext(modelContext, RowType.T_ANY_LONGKEY_SPARSE, classOf[NeighborsAliasTableElement]) + + // 调用后删除上下文 + if (!modelContext.isUseHashPartition && useBalancePartition) + LoadBalancePartitioner.partition( + data, modelContext.getMaxNodeId, modelContext.getPartitionNum, matrix, balancePartitionPercent) + + val psMatrix = PSMatrix.matrix(matrix) + new DeepWalkPSModel(psMatrix) + + } +} diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecParams.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecParams.scala new file mode 100644 index 000000000..d839dfe5d --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecParams.scala @@ -0,0 +1,199 @@ +package com.tencent.angel.graph.embedding.struct2vec + +class Struct2vecParams extends Serializable { + + var partitionNum: Int = _ + var walkLength: Int = _ + var stay_prob: Float = _ + var opt1_reduce_len: Boolean = true + var opt2_reduce_sim_calc: Boolean = false + var opt3_num_layers:Int = _ + var embeddingDim: Int = _ + var negSample: Int = _ + var learningRate: Float = _ + var decayRate: Float = _ + var batchSize: Int = _ + var logStep: Int = _ + var numEpoch: Int = _ + var maxIndex: Int = _ + var minIndex: Int = _ + var sampleRate: Float = _ + var numPSPart: Int = 1 + var modelPath: String = _ + var extraInputEmbeddingPath: String = _ + var extraContextEmbeddingPath: String = _ + var checkpointInterval: Int = Int.MaxValue + var saveModelInterval: Int = Int.MaxValue + var order: Int = _ + var nodesNumPerRow: Int = -1 + var numRowDataSet: Option[Long] = None + var seed: Int = _ + var maxLength: Int = -1 + var nodeTypePath: String = "" + var saveContextEmbedding: Boolean = _ + var output:String = _ + + def setMaxLength(maxLength: Int): this.type = { + this.maxLength = maxLength + this + } + + def setNumRowDataSet(numRowDataSet: Long): this.type = { + this.numRowDataSet = Some(numRowDataSet) + this + } + + def setSeed(seed: Int): this.type = { + this.seed = seed + this + } + + def setPartitionNum(partitionNum: Int): this.type = { + require(partitionNum > 0, s"require partitionNum > 0, $partitionNum given") + this.partitionNum = partitionNum + this + } + + def setWalkLength(walkLength: Int): this.type = { + require(walkLength > 0, s"require walkLength > 0, 
$walkLength given") + this.walkLength = walkLength + this + } + + def setEmbeddingDim(embeddingDim: Int): this.type = { + require(embeddingDim > 0, s"require embedding dimension > 0, $embeddingDim given") + this.embeddingDim = embeddingDim + this + } + + def setNegSample(negSample: Int): this.type = { + require(negSample > 0, s"require num of negative sample > 0, $negSample given") + this.negSample = negSample + this + } + + def setLearningRate(learningRate: Float): this.type = { + require(learningRate > 0, s"require learning rate > 0, $learningRate given") + this.learningRate = learningRate + this + } + + def setDecayRate(decayRate: Float): this.type = { + require(decayRate > 0, s"require decay rate > 0, $decayRate given") + this.decayRate = decayRate + this + } + + def setBatchSize(batchSize: Int): this.type = { + require(batchSize > 0, s"require batch size > 0, $batchSize given") + this.batchSize = batchSize + this + } + + def setLogStep(logStep: Int): this.type = { + require(logStep > 0, s"require log step > 0, $logStep given") + this.logStep = logStep + this + } + + def setNumEpoch(numEpoch: Int): this.type = { + require(numEpoch > 0, s"require num of epoch > 0, $numEpoch given") + this.numEpoch = numEpoch + this + } + + def setMaxIndex(maxIndex: Long): this.type = { + require(maxIndex > 0 && maxIndex < Int.MaxValue, s"require maxIndex > 0 && maxIndex < Int.maxValue, $maxIndex given") + this.maxIndex = maxIndex.toInt + this + } + + def setMinIndex(minIndex: Long): this.type = { + require(minIndex >= 0 && minIndex < Int.MaxValue, s"require minIndex >= 0 && minIndex < Int.maxValue, $minIndex given") + this.minIndex = minIndex.toInt + this + } + + def setSampleRate(sampleRate: Float): this.type = { + require(sampleRate > 0, s"sample rate belongs to [0, 1], $sampleRate given") + this.sampleRate = sampleRate + this + } + + def setModelPath(modelPath: String): this.type = { + require(null != modelPath && modelPath.nonEmpty, s"require non empty path to save model, $modelPath given") + this.modelPath = modelPath + this + } + + def setExtraInputEmbeddingPath(extraInputEmbeddingPath: String): this.type = { + this.extraInputEmbeddingPath = extraInputEmbeddingPath + this + } + + def setExtraContextEmbeddingPath(extraContextEmbeddingPath: String): this.type = { + this.extraContextEmbeddingPath = extraContextEmbeddingPath + this + } + + def setNodeTypePath(nodeTypePath: String): this.type = { + this.nodeTypePath = nodeTypePath + this + } + + def setSaveContextEmbedding(saveContextEmbedding: Boolean): this.type = { + this.saveContextEmbedding = saveContextEmbedding + this + } + + def setopt1_reduce_len(opt1_reduce_len: Boolean): this.type = { + this.opt1_reduce_len = opt1_reduce_len + this + } + + def setopt2_reduce_sim_calc(opt2_reduce_sim_calc: Boolean): this.type = { + this.opt2_reduce_sim_calc = opt2_reduce_sim_calc + this + } + + def setopt3_num_layers(opt3_num_layers: Int): this.type = { + this.opt3_num_layers = opt3_num_layers + this + } + + def setModelCPInterval(modelCPInterval: Int): this.type = { + require(modelCPInterval > 0, s"model checkpoint interval > 0, $modelCPInterval given") + this.checkpointInterval = modelCPInterval + this + } + + def setModelSaveInterval(modelSaveInterval: Int): this.type = { + require(modelSaveInterval > 0, s"model save interval > 0, $modelSaveInterval given") + this.saveModelInterval = modelSaveInterval + this + } + + def setOrder(order: Int): this.type = { + require(order == 1 || order == 2, s"order equals 1 or 2, $order given") + this.order = order + this 
+ } + + def setNumPSPart(numPSPart: Option[Int]): this.type = { + require(numPSPart.fold(true)(_ > 0), s"require num of PS part > 0, $numPSPart given") + numPSPart.foreach(this.numPSPart = _) + this + } + + def setNodesNumPerRow(nodesNumPerRow: Option[Int]): this.type = { + nodesNumPerRow.foreach(this.nodesNumPerRow = _) + this + } + + def setoutPut(output: String): this.type = { + this.output = output + this + } + + +} diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtw.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtw.scala new file mode 100644 index 000000000..49db8d037 --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtw.scala @@ -0,0 +1,31 @@ +package com.tencent.angel.graph.embedding.struct2vec.fastdtw + +import com.tencent.angel.graph.embedding.struct2vec.DTW +import com.tencent.angel.graph.embedding.struct2vec.fastdtwUtils.{CostMatrix, RangeDiagonalConstraints, Space, TimeSeriesElement} + +/** + * @param searchRadius + * @param space + * @tparam T + */ +class fastdtw[T](searchRadius: Int, space: Space[T]) { + + lazy val requiredSearchRaadius: Int = math.max(0, searchRadius) + lazy val minTsSize: Int = requiredSearchRaadius + 2 + lazy val dtw: DTW[T] = new DTW(space) + + /** + * @param left + * @param right + * @return + */ + //@tailrec + final def evaluate(left: Seq[TimeSeriesElement[T]], right: Seq[TimeSeriesElement[T]]): CostMatrix = + (space.coarsen(left.length / 2), space.coarsen(right.length / 2), left.size > minTsSize && right.size > minTsSize) match { + case (Some(coarsenLeft), Some(coarsenRight), true) => + val newLeft = coarsenLeft(left) + val newRight = coarsenRight(right) + dtw.costMatrix(left, right, RangeDiagonalConstraints.fromCostMatrix(evaluate(newLeft, newRight))) + case _ => dtw.costMatrix(left, right) + } +} diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/CostMatrix.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/CostMatrix.scala new file mode 100644 index 000000000..9c4fc8dfe --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/CostMatrix.scala @@ -0,0 +1,122 @@ +package struct2vec.fastdtwUtils + +import scala.annotation.tailrec + +/** + * An entry in a matrix + * + * @param index tuple representing index (i,j) + * @param value value of the matrix + */ +final case class MatrixEntry(index: (Int, Int), value: Double) + +/** + * A direction of travel when traversing a matrix + */ +trait Direction +object Direction extends Direction { + case object Southwest extends Direction + case object South extends Direction + case object West extends Direction +} + +/** + * A matrix representing the incremental distance cost incurred by comparing two time series at indices i and j + * + * @param structure + */ +final case class CostMatrix(structure: Seq[MatrixEntry]*) extends Seq[Seq[MatrixEntry]] { + + def apply(i: Int): Seq[MatrixEntry] = { + structure(i) + } + + def apply(i: Int, j: Int): MatrixEntry = { + structure(i)(j) + } + + override def length: Int = structure.length + + /** + * Prunes the top-right-most element from the cost matrix and + * + * @param direction + * @return + */ + def reduce(direction: Direction): CostMatrix = { + val topRightY: Int = structure.lastOption.flatMap(_.lastOption.map(_.index._2)).getOrElse(0) + import Direction._ + 
direction match { + case Southwest => //prune on diagonal (column above and row to right) + CostMatrix(structure.init.map(_.takeWhile(_.index._2 < topRightY)): _*) + case South => CostMatrix(structure.map(_.takeWhile(_.index._2 < topRightY)): _*) //prune the row above + case West => CostMatrix(structure.dropRight(1): _*) //prune the column to the right + } + } + + /** + * `optimalPath` is the cheapest sequence of comparison indices, ie. the calculated warp path of the DTW algorithm + * `optimalCost` is the cost of traversing this path + */ + lazy val (optimalPath: Seq[(Int, Int)], optimalCost: Double) = { + val initialEntries = structure.lastOption.flatMap(_.lastOption.map(o => Seq(o))).getOrElse(Seq()) + val entries: Seq[MatrixEntry] = computeOptimalPath.reverse ++ initialEntries + + entries.map(_.index) -> entries.lastOption.map(_.value).getOrElse(0D) + } + + @tailrec + private def dropRightWhile(f: MatrixEntry => Boolean, maxTimes: Int = Int.MaxValue)(column: Seq[MatrixEntry]): Seq[MatrixEntry] = { + if (maxTimes == 0 || column.isEmpty || !f(column.last)) + column + else + dropRightWhile(f, maxTimes - 1)(column.dropRight(1)) + } + private def shouldDrop(y: Int): MatrixEntry => Boolean = { e: MatrixEntry => e.index._2 >= y } + + private def computeOptimalPath: Seq[MatrixEntry] = { + val lastColumn = structure.lastOption + val secondToLastColumn = structure.dropRight(1).lastOption + + (lastColumn, secondToLastColumn) match { + case (None, _) | (_, None) => //less than 2 columns exist, we're done + Seq() + case (Some(l), Some(sl)) => + val (_, curY) = l.last.index + + //item, south ++ west + val evaluate = Seq( + dropRightWhile(shouldDrop(curY), 1)(sl).lastOption.map { _ -> Direction.Southwest }, + dropRightWhile(shouldDrop(curY), 1)(l).lastOption.map { _ -> Direction.South }, + sl.lastOption.map { _ -> Direction.West }).flatten + if (evaluate.isEmpty) { + Seq() + } else { + val (min, minAt) = evaluate.minBy(_._1.value) + min +: reduce(minAt).computeOptimalPath + } + } + } + + def asString: String = structure.foldLeft(Seq[String]()) { (acc, cur) => + if (acc.isEmpty) { + cur.map(_.value.round.toString) + } else { + acc.zip(cur).map { case (existing, item) => s"$existing ${item.value.round}" } + } + }.reverse.mkString("\n") + + override def iterator: Iterator[Seq[MatrixEntry]] = structure.iterator +} + +object CostMatrix { + + /** Creates a cost matrix from raw values */ + def fromValues(columns: Seq[Double]*): CostMatrix = CostMatrix(columns.foldLeft(Seq[Seq[MatrixEntry]]()) { (acc, cur) => + val columnIndex = acc.length + acc :+ cur.foldLeft(Seq[MatrixEntry]()) { (col, item) => + col :+ MatrixEntry(columnIndex -> col.length, item) + } + }: _*) + +} diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/IndexConstraints.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/IndexConstraints.scala new file mode 100644 index 000000000..f7386cb98 --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/IndexConstraints.scala @@ -0,0 +1,104 @@ +package struct2vec.fastdtwUtils + +/** + * A contract for constraining which indices should be explored in a DTW traversal + */ +trait IndexConstraints { + /** Returns the min and max row indices for a given column index with a maximum number of entries */ + def columnRange(i: Int, max: Int): (Int, Int) + + /** Applies `columnRange` to a column of `TimeAndVector` entries */ + def mask[T](t: 
Seq[TimeSeriesElement[T]], column: Int): Seq[TimeSeriesElement[T]] + + /** Applies `columnRange` to a column of `MatrixEntry`s */ + def constrain(t: Seq[MatrixEntry], column: Int): Seq[MatrixEntry] +} + +/** + * A base class for constraints that are constrained around the diagonal of the matrix + */ +abstract class DiagonalConstraints() extends IndexConstraints { + def mask[T](t: Seq[TimeSeriesElement[T]], column: Int): Seq[TimeSeriesElement[T]] = { + val (minIndex, maxIndex) = columnRange(column, t.length - 1) + t.slice(minIndex, maxIndex + 1) + } + + def constrain(t: Seq[MatrixEntry], column: Int): Seq[MatrixEntry] = { + val (minIndex, maxIndex) = columnRange(column, t.maxBy(_.index._2).index._2) + t.filter { e => e.index._2 >= minIndex && e.index._2 <= maxIndex } + } +} + +/** + * Constraints allowing exploration of a window above and below the diagonal + * + * eg. applying window = 1 to + * 1 2 3 4 x x 3 4 + * 5 6 7 8 => x 6 7 8 + * 9 8 7 6 9 8 7 x + * 5 4 3 2 5 4 x x + * + * @param window radius of the window around the diagonal + */ +final case class UniformDiagonalConstraints(window: Int = 3) extends DiagonalConstraints { + def columnRange(i: Int, max: Int): (Int, Int) = math.max(0, i - window) -> math.min(max, i + window) +} + +/** + * Constraints allowing arbitrary specification of constraints by column. Defaults to all values if a + * column's range is not explicitly specified + * + * @param ranges explicit map of column index => range + */ +final case class RangeDiagonalConstraints(ranges: Map[Int, (Int, Int)]) extends DiagonalConstraints { + def columnRange(i: Int, max: Int): (Int, Int) = ranges.get(i).map { + case (lower, upper) => + lower -> math.min(upper, max) + }.getOrElse(0 -> max) +} + +object RangeDiagonalConstraints { + + /** + * Calculates the expected `RangeDiagonalConstraints` from a `CostMatrix`'s optimal path. 
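+   * (This is roughly the FastDTW projection step: every cell (i, j) on the low-resolution optimal
+   * path is expanded to columns 2i and 2i+1, rows 2j to 2j+1, of the higher-resolution matrix.)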
+ * Assumes the cost matrix is sorted by its indices and has values specified + * + * @param lowResCost + * @return + */ + def fromCostMatrix(lowResCost: CostMatrix): RangeDiagonalConstraints = RangeDiagonalConstraints { + lowResCost.optimalPath.foldLeft(Map[Int, (Int, Int)]()) { + case (acc, cur) => + //assume matrix columns are in order and are populated + val minY = 2 * cur._2 + val maxY = 2 * cur._2 + 1 + + val currentIndex = 2 * cur._1 + val previousIndex = currentIndex - 1 + val currentRange = acc.get(currentIndex) + val currentRangeNext = acc.get(currentIndex + 1) + val previousRange = acc.get(previousIndex) + acc ++ { + (currentRange, currentRangeNext, previousRange) match { + case (Some((mn, mx)), Some((mnn, mxn)), _) => + val range1 = math.min(mn, minY) -> math.max(mx, maxY) + val range2 = math.min(mnn, minY) -> math.max(mxn, maxY) + Map(currentIndex -> range1, currentIndex + 1 -> range2) + case (_, _, Some((mn, mx))) if mx < minY => + Map(currentIndex -> (minY - 1 -> maxY), currentIndex + 1 -> (minY -> maxY), previousIndex -> (mn -> (mx + 1))) + case _ => + Map(currentIndex -> (minY -> maxY), currentIndex + 1 -> (minY -> maxY)) + } + } + } + } +} + +/** + * A fully permissive `IndexConstraints` implementation + */ +class PassthroughIndexConstraints extends IndexConstraints { + def columnRange(i: Int, max: Int): (Int, Int) = 0 -> max + def mask[T](t: Seq[TimeSeriesElement[T]], column: Int): Seq[TimeSeriesElement[T]] = t + def constrain(t: Seq[MatrixEntry], column: Int): Seq[MatrixEntry] = t +} \ No newline at end of file diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/Space.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/Space.scala new file mode 100644 index 000000000..8551e8b3a --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/Space.scala @@ -0,0 +1,115 @@ +package struct2vec.fastdtwUtils + +/** + * A type, a metric, and some associated functions that allow coarsening and estimation of missing value contributions. + * + * @tparam T + */ +trait Space[T] { + /** + * The value to add to a distance metric when a value is missing + * + * @param otherValue + * @return + */ + def missingValue(otherValue: Option[T]): Double + + /** + * The distance function between two `T` when the values exist + * + * @return + */ + def metric: (T, T) => Double + + /** + * Takes non-overlapping groups of `resolution` points and applies a coarsening operation to them such that + * the value on the neighborhood so defined is minimized when it contains a point that's a + * minimum under the existing metric at high resolution. + * + * Should be `None` if no such functions are known. 
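+   * (For instance, `EuclideanSpace` below coarsens by averaging each group of points, while
+   * `HammingSpace` concatenates the grouped strings and `JaccardSpace` declares no coarsening.)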
+ * + * @param resolution + * @return + */ + def coarsen(resolution: Int): Option[Seq[TimeSeriesElement[T]] => Seq[TimeSeriesElement[T]]] + + /** + * The distance function accounting for missing values + * + * @return + */ + def distance: (Option[T], Option[T]) => Double = { + case (Some(a), Some(b)) => metric(a, b) + case (None, a) => missingValue(a) + case (a, None) => missingValue(a) + } + +} + +/** + * Mixin for spaces where a missing value is ignored in the distance calculations + * + * @tparam T + */ +trait MissingValueContributesNothing[T] { space: Space[T] => + def missingValue(otherValue: Option[T]): Double = 0 +} + +/** + * Vector[Double] space with a standard euclidean metric and ignored missing values + */ +object EuclideanSpace extends Space[VectorValue[Double]] with MissingValueContributesNothing[VectorValue[Double]] { + override def metric: (VectorValue[Double], VectorValue[Double]) => Double = { + (l: VectorValue[Double], r: VectorValue[Double]) => + val ds = r.v.zip(l.v).foldLeft(0D) { (acc, cur) => + val d = cur._1 - cur._2 + acc + d * d + } + math.sqrt(ds) + } + + override def coarsen(resolution: Int): Option[Seq[TimeSeriesElement[VectorValue[Double]]] => Seq[TimeSeriesElement[VectorValue[Double]]]] = Some { input => + input.grouped(input.size / resolution).map { groupedItems: Seq[TimeSeriesElement[VectorValue[Double]]] => //grouped items need to all be added together + val nonEmptyItems = groupedItems.dropWhile(_.v.isEmpty) + val summedVector = VectorValue(nonEmptyItems.drop(1).foldLeft(nonEmptyItems.head.v.get.toSeq) { (acc: Seq[Double], cur: TimeSeriesElement[VectorValue[Double]]) => + cur.v.map { vv => + acc.zip(vv).map { case (l: Double, r: Double) => l + r } + }.getOrElse(acc) //assumption about missing values here: they have no effect + }.map(_ / groupedItems.length.toDouble): _*) + TimeSeriesElement(groupedItems.head.t, Some(summedVector)) + }.toSeq + } +} + +/** + * String space counting locations where strings don't match and ignoring missing contributions + */ +object HammingSpace extends Space[String] with MissingValueContributesNothing[String] { + override def metric: (String, String) => Double = { (i, j) => i.zip(j).count { case (l, r) => l != r } } + + override def coarsen(resolution: Int): Option[Seq[TimeSeriesElement[String]] => Seq[TimeSeriesElement[String]]] = Some { input => + input.grouped(input.size / resolution).map { groupedItems => + val v: String = groupedItems.foldLeft("") { (acc: String, cur: TimeSeriesElement[String]) => + val s: String = cur.v.getOrElse("") + acc + s + } + TimeSeriesElement(groupedItems.head.t, Some(v)) + }.toSeq + } +} + +/** + * String space using a Jaccard metric and ignoring missing contributions + */ +object JaccardSpace extends Space[String] with MissingValueContributesNothing[String] { + override def metric: (String, String) => Double = { + case (i: String, j: String) if i.nonEmpty || j.nonEmpty => + val is = i.toSet + val js = j.toSet + val intersect = (is intersect js).size.toDouble + intersect / (is.size.toDouble + js.size.toDouble - intersect) + case _ => 1 + } + + override def coarsen(resolution: Int): Option[Seq[TimeSeriesElement[String]] => Seq[TimeSeriesElement[String]]] = None +} diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/TimeSeriesElement.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/TimeSeriesElement.scala new file mode 100644 index 000000000..328316448 --- /dev/null +++ 
b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/TimeSeriesElement.scala @@ -0,0 +1,11 @@ +package struct2vec.fastdtwUtils + +import com.madsync.time.DateTime + +/** + * A timestamp and some optional value + * + * @param t timestamp associated with this entry + * @param v value associated with this element + */ +final case class TimeSeriesElement[T](t: DateTime, v: Option[T]) {} diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/VectorValue.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/VectorValue.scala new file mode 100644 index 000000000..595844e2b --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/VectorValue.scala @@ -0,0 +1,35 @@ +package struct2vec.fastdtwUtils + +/** + * A "vector" of BigDecimal values + * + * @param v a varagrs sequence of big decimals making up the values for each dimension + */ +final case class VectorValue[T](v: T*) extends Seq[T] { + override def length: Int = v.size + + override def iterator: Iterator[T] = v.iterator + + def apply(n: Int): T = { + v(n) + } + + /** Creates a copy of this vector with the value `v` at the specified `index`` */ + def withElement(index: Int, v: T): VectorValue[T] = { + val r = index match { + case 0 => VectorValue(v) ++ tail + case x if x == (length - 1) => init :+ v + case _ => (dropRight(length - index) :+ v) ++ drop(index + 1) + } + VectorValue(r: _*) + } + +} + +object VectorValue { + /** Creates an empty vector */ + def empty[T](size: Int, init: T): VectorValue[T] = { + VectorValue(Seq.fill(size)(init): _*) + } + +} From c680e55319eed38857e68c264324b3e4a078e831 Mon Sep 17 00:00:00 2001 From: Terry Date: Mon, 29 Aug 2022 17:55:06 +0800 Subject: [PATCH 4/6] work week 6 --- .../embedding/struct2vec/Alias_table.scala | 13 +- .../embedding/struct2vec/BiasWalker.scala | 2 +- .../graph/embedding/struct2vec/DTW.scala | 4 +- .../embedding/struct2vec/Struct2vec.scala | 480 ++++++++++-------- .../Struct2vecGraphParatition.scala | 8 +- .../struct2vec/Struct2vecPSModel.scala | 2 +- .../struct2vec/Struct2vecParams.scala | 7 + .../graph/embedding/struct2vec/fastdtw.scala | 6 +- .../fastdtwUtils/TimeSeriesElement.scala | 2 +- 9 files changed, 301 insertions(+), 223 deletions(-) diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Alias_table.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Alias_table.scala index b0ece9aed..dcca23567 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Alias_table.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Alias_table.scala @@ -1,10 +1,21 @@ -package com.tencent.angel.graph.embedding.struct2vec +package struct2vec import scala.collection.mutable.ArrayBuffer import scala.util.Random object Alias_table { + def calcAliasTable(partId: Int, iter: Iterator[(Long, Array[(Long, Float)])]): Iterator[(Long, Array[Long], Array[Float], Array[Int])] = { + iter.map { case (src, neighbors) => + val (events, weights) = neighbors.unzip + val weightsSum = weights.sum + val len = weights.length + val areaRatio = weights.map(_ / weightsSum * len) + val (accept, alias) = createAliasTable(areaRatio) + (src, events, accept, alias) + } + } + def createAliasTable(areaRatio: Array[Float]): (Array[Float], Array[Int]) = { val len = areaRatio.length 
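    // Classic alias-table construction (Vose's method): indices whose normalised weight is below 1
    // go into `small`, the rest into `large`; each small slot is then topped up from a large one so
    // every slot carries total mass 1, which is what makes alias_sample below O(1) per draw.
    // Hypothetical usage, not part of this patch:
    //   val (accept, alias) = createAliasTable(Array(0.5f, 1.5f, 1.0f))
    //   val picks = alias_sample(new Random(), accept, alias, 10)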
val small = ArrayBuffer[Int]() diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/BiasWalker.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/BiasWalker.scala index e2a46b621..87e8a4449 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/BiasWalker.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/BiasWalker.scala @@ -1,4 +1,4 @@ -package com.tencent.angel.graph.embedding.struct2vec +package struct2vec import struct2vec.Alias_table.alias_sample diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/DTW.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/DTW.scala index 0cd403227..d70ea782a 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/DTW.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/DTW.scala @@ -1,6 +1,6 @@ -package com.tencent.angel.graph.embedding.struct2vec +package struct2vec -import com.tencent.angel.graph.embedding.struct2vec.fastdtwUtils.{ CostMatrix, IndexConstraints, MatrixEntry, PassthroughIndexConstraints, Space, TimeSeriesElement } +import fastdtwUtils.{ CostMatrix, IndexConstraints, MatrixEntry, PassthroughIndexConstraints, Space, TimeSeriesElement } /** * A wrapper for the cost matrix calculation based on a specified metric diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala index ac0262f51..c3a8bd59f 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala @@ -1,257 +1,308 @@ -package com.tencent.angel.graph.embedding.struct2vec +package struct2vec -import com.tencent.angel.graph.embedding.struct2vec.{Struct2vecParams,Struct2vecGraphPartition} +import com.tencent.angel.graph.embedding.struct2vec.{Struct2vecGraphPartition, Struct2vecParams} -import java.lang.Math.{abs, max, min} +import java.lang.Math.{abs, log, max, min} import scala.collection.JavaConversions.asJavaCollection import scala.collection.immutable.Map import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, ListBuffer, Queue, Set} import scala.util.Random import struct2vec.Struct2vecParams +import struct2vec.fastdtw.fastdtw +import struct2vec.fastdtwUtils.{EuclideanSpace, TimeSeriesElement, VectorValue} + +import java.util +import java.util.ArrayList +import scala.util.control.Breaks.break class Struct2vec(params: Struct2vecParams ) { // private val graph_Nodes : List[Int] = List() -// private val idx2Nodes : List[Int] = List() -// private val Nodes2idx : List[Int] = List() + private val idx2Nodes : Array[Int] = Array() + private val Nodes2idx : List[Int] = List() // private val idx : List[Int] = List.range(0,idx2Nodes.length,1) // private val graph : List[Int] = List() // private val embedding :Map[String,Int] = Map() + override def transform(dataset: Dataset[_]): DataFrame = { + //create origin edges RDD and data preprocessing + val rawEdges = NeighborDataOps.loadEdgesWithWeight(dataset, params.srcNodeIdCol), params.dstNodeIdCol, params.weightCol, params.isWeighted, params.needReplicaEdge, true, false, false) + 
rawEdges.repartition(params.partitionNum).persist(params.StorageLevel.DISK_ONLY) + val (minId, maxId, numEdges) = Stats.summarizeWithWeight(rawEdges) + println(s"minId=$minId maxId=$maxId numEdges=$numEdges level=${params.StorageLevel}") + for(i <- 0 to maxId) idx2Nodes(i) = i - private def createMatrix(name: String, - numRow: Int, - minId: Long, - maxId: Long, - rowType: RowType, - psNumPartition: Int, - data: RDD[(Long, Long)], - useBalancePartition: Boolean, - percent: Float): PSMatrixImpl = { + val edges = rawEdges.map { case (src, dst, w) => (src, (dst, w)) } - val modelContext = new ModelContext($(psPartitionNum), minId, maxId, -1, name, - SparkContext.getOrCreate().hadoopConfiguration) + // calc alias table for each node + val aliasTable = edges.groupByKey(params.partitionNum).map(x => (x._1, x._2.toArray.distinct)) + .mapPartitionsWithIndex { case (partId, iter) => + Alias_table.calcAliasTable(partId, iter) + } - val matrix = ModelContextUtils.createMatrixContext(modelContext, rowType, - classOf[LongArrayElement]) + //ps process;create ps nodes adjacency matrix + println("start to run ps") + PSContext.getOrCreate(SparkContext.getOrCreate()) - if (useBalancePartition && (!modelContext.isUseHashPartition)) { - index = data.flatMap(f => Iterator(f._1, f._2)) - .persist($(storageLevel)) - LoadBalancePartitioner.partition(index, modelContext.getMaxNodeId, - modelContext.getPartitionNum, matrix, percent) - } + // Create model + val modelContext = new ModelContext(params.psPartitionNum, minId, maxId + 1, -1, + "struct2vec", SparkContext.getOrCreate().hadoopConfiguration) - val psMatrix = PSMatrix.matrix(matrix) - val psMatrixImpl = new PSMatrixImpl(psMatrix.id, matrix.getName, 1, - modelContext.getMaxNodeId, matrix.getRowType) + // val data = edges.map(_._2._1) // ps loadBalance by in degree + val data = edges.flatMap(f => Iterator(f._1, f._2._1)) //拿出(src,neighbors) - if (useBalancePartition && (!modelContext.isUseHashPartition)) - index.unpersist() + //val model = DeepWalkPSModel.fromMinMax(minId, maxId, data, $(psPartitionNum), useBalancePartition = $(useBalancePartition)) + val model = Struct2vecPSModel(modelContext, data, params.useBalancePartition, params.balancePartitionPercent) + val degreed_list = compute_orderd_degreelist(data,params.max_num_layers ) + val degrees = create_vector(data) - psMatrixImpl - } + //push node adjacency list into ps matrix; create graph with (node,sample path) + val graphOri = aliasTable.mapPartitionsWithIndex((index, adjTable) => + Iterator(Struct2vecGraphPartition.initPSMatrixAndNodePath(model, index, adjTable, params.batchSize)))) - //计算有序的度列表 - def compute_orderd_degreelist(max_num_layers: Int): Map[Int, Int] = { - var degreeList : Map[Int, Int] = Map() - var nodes = graph_Nodes + graphOri.persist($(storageLevel)) + //trigger action + graphOri.foreachPartition(_ => Unit) - for(node <- nodes){ - degreeList+=(node->get_order_degreelist_node(node,max_num_layers)) - } - return degreeList - } - - //获取有序度的节点 - def get_order_degreelist_node(root: Int, max_num_layers: Int=0):Int = { - if (max_num_layers == 0) {max_num_layers = Double.PositiveInfinity } - var order_degree_sequence_dict:Map[Int,Int] = Map() + } - var visited:ListBuffer[Boolean] = ListBuffer.fill(graph_Nodes.length)(false) + def compute_orderd_degreelist(graph_adj:Iterator[(Long,Array[Long])],max_num_layers:Int):ArrayBuffer[Array[Array[(Long,Long)]]]={ + val order_list: ArrayBuffer[Array[Array[(Long,Long)]]] = ArrayBuffer() + graph_adj.foreach { f => + 
order_list.append(get_orderd_degreelist(f._1.toInt,f._2,max_num_layers)) + } + order_list //[level,[node,[order,(degree,count)]] + } - val queue : Queue[Int] = Queue() - var level =0 + //计算有序的度列表,获取单个节点的有序度序列 graph:Array[(Long, Array[Long])] + def get_orderd_degreelist(root:Int,neighbors:Array[Long],max_num_layers: Int): Array[Array[(Long,Long)]] = { + var ordered_degree_sequence_dict :Array[Array[(Long,Long)]] = Array() + var visited :ArrayBuffer[Boolean] = ArrayBuffer.fill(idx2Nodes.length)(false) + var queue :mutable.Queue[Int] = mutable.Queue() + var degree_list:ArrayBuffer[Long] = ArrayBuffer() + var orderd_degree_list : Array[(Long,Long)] = Array() - // in queue - queue += root + var level = 0 visited(root) = true - while(queue.length >0 && level <= max_num_layers){ - var count :Int = queue.length - - - while(count > 0){ - var top = queue.dequeue() - var node = idx2Nodes(top) - var degree = graph(node) - - if (opt1_reduce_len == true){ - var degree_list : mutable.Map[Int,Int] = mutable.Map() - degree_list(degree) = degree_list.get(degree) + 1 - }else{ - var degree_list : List[Int] = List() - degree_list:+degree - } + while(queue.length>0 && level < max_num_layers ){ + var count = queue.length + //opt1_method + while(queue.length>0){ + var top = queue.dequeue().toInt + var node = idx2Nodes(top) + var degree = neighbors.length + degree_list(degree) += 1 // count node freq - for (nei <- graph(node)){ - val nei_idx = Nodes2idx(nei) - if (visited(nei_idx) == false){ - visited(nei_idx) = true - queue.enqueue(nei_idx) + for(nei <- neighbors){ + var nei_index = Nodes2idx(nei.toInt) + if( !visited(nei_index)){ + visited(nei_index) = true + queue.enqueue(nei_index) } } count-=1 } + for(degree <- degree_list;index <- 0 to degree_list.length) { + orderd_degree_list(index) = (degree,degree_list(degree.toInt)) + } + orderd_degree_list.sortBy(f => f._1) + ordered_degree_sequence_dict(level) = orderd_degree_list + level+=1 + } - var order_degree_list:ListBuffer[(Int,Int)] = ListBuffer() - if(opt1_reduce_len == true){ - for((degree :Int,freq :Int)<- degree_list){ - order_degree_list.append((degree,freq)) - } - order_degree_list.sortBy(_._1) + ordered_degree_sequence_dict + } - }else{order_degree_list.sorted} + // - order_degree_sequence_dict+=(level-> order_degree_list) - level-=1 - } - } //计算结构距离 - def compute_structural_distance(max_num_layers:Int,workers:Int=1,verbose:Int=0): Unit = { + def compute_structural_distance(degreeList:ArrayBuffer[Array[Array[(Long,Long)]]], + graph_adj:Iterator[(Long,Array[Long])], + max_num_layers:Int, + degrees:(ArrayBuffer[Long],Array[Long],Array[Long]), + workers:Int=1,verbose:Int=0): Unit = { - if (opt1_reduce_len == true){ - var dist_func = cost_max() - }else{ - var dist_func = cost() - } + if (params.opt2_reduce_sim_calc == true) { + var degreeListSelected: Array[Array[Array[(Long,Long)]]] = Array() + var vertices: Array[ArrayBuffer[Long]] = Array() - var degreeList = compute_orderd_degreelist(max_num_layers) - - if (opt2_reduce_sim_calc == true){ - var degrees = create_vector() - var degreeListSelected: Map[Int,Int] = Map() - var vertices: Map[Int,Int] = Map() - val n_nodes = idx.length - for(v<-idx){ - var nbs = get_vertices(v,graph(idx2Nodes(v)),degrees,n_nodes) - vertices+=(v-> nbs) - degreeListSelected+=(v->degreeList(v)) - for(n<-nbs){ - degreeListSelected+=(n-> degreeList(n)) - } + val n_nodes = idx2Nodes.length + + graph_adj.foreach{ case (v, neighbors) => + var nbs = get_vertices(v,neighbors.length,degrees, n_nodes) + vertices(v.toInt) = nbs // store nbs 
+ degreeListSelected(v.toInt)=degreeList(v.toInt) //store dist + + for(n <- nbs) + degreeListSelected(n.toInt)=degreeList(n.toInt) //store dist of nbs } + + + }else{ - var vertices :Map[Int,List[Int]] = Map() - for(v<-degreeList) { - for(vd <- degreeList.keys){ - if (vd>v){vertices+=(v->vd)} - } - } + var vertices: Array[Array[Int]] = Array() + + for(v <- idx2Nodes) { + vertices(v) = for(vd <- idx2Nodes if vd > v ) yield vd + } + } // for(part_list in partition_dict(vertices,workers)){ // var results = Parallel(workers)(delayed(cpmpute_dtw_dist)(part_list,degreelist,dist_func)) // } - // var dtw_dist = new Map[Map[]]() -// val structural_dist = convert_dtw_struc_dist(dtw_dist) + var dtw_dist = compute_dtw_dist(graph_adj,degreeList) + structural_dist = convert_dtw_struc_dist(dtw_dist) + distances + } + + def create_vector(graph:Iterator[(Long,Array[Long])] ) = { + var degrees_sort :Set[Long] = Set() + var vertices:ArrayBuffer[Long] = ArrayBuffer() + + graph.foreach{ case (node,neighbors) => + var degree = neighbors.length + degrees_sort.add(degree) + vertices.append(node) } -// return structural_dist + var degrees_sorted = degrees_sort.toArray.sorted + var l = degrees_sorted.length + + var degrees_before:Array[Long] = Array.fill(l)(0) + var degrees_after:Array[Long] = Array.fill(l)(0) + + var index = 0 + for(degree<- degrees_sorted) { + if (index > 0) { + degrees_before(index) = degrees_sorted(index - 1) + } + if (index < (l - 1)) { + degrees_after(index) =degrees_sorted(index + 1) + } + index += 1 + } + (vertices,degrees_before,degrees_after) // ( degree->数组索引,(node,before,after)) } - def create_vector(){ - val degrees :Map[Int,Map[String,ListBuffer[Int]]]= Map() - val degrees_sorted :Set[Int] = Set() - val G = graph - for(v<-idx){ - var degree :Int = G(idx2Nodes(v)).length - degrees_sorted.add(degree) -// if(degrees.contains(degree) == false){ -// degrees(degree) = -// } - degrees(degree)("vertices").append(v) - degrees_sorted = degrees_sorted.toArray - degrees_sorted.sorted - - var l = degrees_sorted.length - var index = 0 - for(degree<- degree_sorted){ - if(index>0){ - degrees(degree)+=("before"-> degrees_sorted(index-1)) - } - if(index<(l-1)){ - degrees(degree)+=("after"-> degrees_sorted(index+1)) + + def judge_degree_one(before:Long):Long = { + if(before==0) -1 + else before + } + + def judge_degree_two(now:Long,degree_b:Long,before:Long,after:Long):Long = { + if(degree_b==now) judge_degree_one(before) + else judge_degree_one(after) + } + // opt2 + def get_vertices(v: Long, + degree_v: Int, + degrees: (ArrayBuffer[Long], Array[Long], Array[Long]), + n_nodes: Int): ArrayBuffer[Long] = { + val nodes = degrees._1 + val before = degrees._2 + val after = degrees._3 + val a_vertices_selected = 2 * (log(n_nodes)/log(2)) + var vertices :ArrayBuffer[Long] = ArrayBuffer() + try{ + var c_v = 0 + for(v2 <- nodes){ + if(v!=v2){ + vertices.append(v2) + c_v += 1 + if(c_v > a_vertices_selected) + return vertices// stop Iteration } } + var degree_a = judge_degree_one(before(degree_v)) + var degree_b = judge_degree_one(after(degree_v)) + + + var degree_now = verifyDegrees(degree_v,degree_a,degree_b) + + //nearest vaild degree + while (true){ + vertices.foreach(v2 =>{ + if(v!=v2){ + vertices.append(v2) + c_v+=1 + if(c_v > a_vertices_selected) + return vertices // stop Iteration + } + }) + degree_a = judge_degree_two(degree_now,degree_b,before(degree_b.toInt),after(degree_a.toInt)) + degree_b = judge_degree_two(degree_now,degree_b,before(degree_b.toInt),after(degree_a.toInt)) + + if(degree_a == -1 & 
degree_b == -1) + return vertices // stop Iteration + + degree_now = verifyDegrees(degree_v,degree_a,degree_b) + } } - return degrees + vertices } - //获得层级 - def get_layer_rep(pair_distance:Map[(Int,Int),Map[Int,(Int,Int)]]) { - val layer_distances:Map[Int,Map[(Int,Int),(Int,Int)]]= Map() - var layer_adj:Map[Int,Map[Int,ListBuffer[Int]]] = Map() + def get_layer_rep(pair_distance:Array[((Long,Long),ArrayBuffer[(Int,Double)])]) ={ + + var layer_distances:Array[(Long,Long,Double)] = Array() //(vx,vy,distance) + var layer_adj:Array[(Long,Long,Long)] = Array() // (layer,vx,vy) - for((v_pair,layer_dist)<- pair_distance ){ - for((layer,distance)<- layer_dist){ - var vx:Int = v_pair._1 - var vy:Int = v_pair._2 + pair_distance.foreach{ case (v_pair,layer_distance) => + for((layer,distance)<- layer_distance){ + var vx = v_pair._1 + var vy = v_pair._2 - layer_distances(layer)(vx,vy) -> (layer,distance) - layer_adj(layer)(vx).append(vy) - layer_adj(layer)(vy).append(vx) + layer_distances(layer) = (vx,vy,distance) + + layer_adj(layer) = (layer,vx,vy) + layer_adj(layer) = (layer,vy,vx) } } - return layer_adj,layer_distances + (layer_adj,layer_distances) } + def change_layeradj(layers_adj:Array[(Int,Long,Long)]) = { + layers_adj.map{case(layer,vx,vy) => (layer,(vx,vy))}.groupBy(_._1).map(x=>(x._1,x._2.map(x=>(x._2._2)))).toArray + + } + def change_layer_distance(layers_distance:Array[(Long,Long,Double)]) ={ + layers_distance.map{case(vx,vy,distance) => ((vx,vy),distance)}.toArray + } //权重表 - def get_transition_probs(layers_adj:Map[Int,Map[Int,ListBuffer[Int]]], - layers_distances:Map[Int,Map[(Int,Int),(Int,Int)]]) { - - var layers_alias : Map[Int,Map[Int,Map[Int,List[Int]]]] = Map() - var layers_accept :Map[Int,Map[Int,Map[Int,List[Int]]]] = Map() - - for(layer<- layers_adj.keys){ -// var neighbors :Map[Int,ListBuffer[Int]] = layers_adj(layer) -// var layer_distances : Map[(Int,Int),(Int,Int)] = layers_distances(layer) - var node_alias_dict :Map[Int,Map[Int,List[Int]]] = Map() - var node_accept_dict :Map[Int,Map[Int,List[Int]]] = Map() - var norm_weights :Map[Int,ListBuffer[Double]] = Map() - - for((v,neighbors)<-layers_adj(layer)){ - var edge_list:ListBuffer[Double] = ListBuffer() - var sum_weight :Double = 0.0 - - for(n<- neighbors){ - if(layers_distances.contains((v,n))==true){ - edge_list.append(exp(layers_distances(layer)(v,n)) - }else{ - edge_list.append(exp(layers_distances(layer)(n,v)) - } - sum_weight += exp(layers_distances(layer)(n,v)) + def get_transition_probs(layers_distance:Array[((Long,Long),Double)], + layers_adj:Array[(Int,Array[Long])]) ={ - edge_list = for(x<-edge_list) yield x/sum_weight - norm_weights+=(v->edge_list) -// node_alias_dict+=(v->create_alias_table(edge_list)) -// node_accept_dict+=(v->create_alias_table(edge_list)) +// var layers_alias +// var layers_accept + + for(layer <- 0 to layers_adj.length){ + var neighbors = layers_adj(layer) + var layer_distance = layers_distance(layer) + + for( (v,nei) <- neighbors ){ + var e_list : ArrayBuffer[Double]= ArrayBuffer() + var sum_weight =0.0 + + for(n <- nei) { +// var wd = layer_distance() + } } - layers_alias+=(layer->node_alias_dict) - layers_accept+=(layer->node_accept_dict) } - } - return layers_accept , layers_alias + + +// (layers_accept , layers_alias) } def cost(a:List[Int],b:List[Int]):Double ={ @@ -278,48 +329,57 @@ class Struct2vec(params: Struct2vecParams ) { return result } - //dtw转换成结构距离 - def convert_dtw_struc_dist(distances:Map[Map[Int,List[(Int,Int)]],Map[Int,List[Int]]],startLayer:Int=1) = { - for((vertices , 
layers)<-distances){ - var keys_layers = layers.keys.toList - var startLayer:Int = min(keys_layers.length,startLayer) - - for(layer <- 0 to startLayer) {keys_layers.remove(0)} - for(layer <- keys_layers) {layers(layer)+=layers(layer - 1)} + //dtw转换成结构距离 (layer,(v1,v2,dist)) + def convert_dtw_struc_dist(distances:ArrayBuffer[(Long,Long,Int,Double)],startLayer:Int =1) = { + var dist = distances.map{ case (src,dist,layer,distance) => ((src,dist),(layer,distance))} + var pair = dist.groupBy(_._1).map(x=>(x._1,x._2.map(x=>(x._2._1,x._2._2)))).toArray //((v1,v2),ArrayBuffer(layer,distance)) + for((vertices,layer_distance)<-pair){ + var keys_layers = for((layer,distance) <- layer_distance) yield layer + keys_layers.sorted + var startLayer = min(keys_layers.length,startLayer) + for (layer <- 0 to startLayer) + keys_layers.remove(0) + + for(layer <- keys_layers) + layer_distance(layer) = (layer,layer_distance(layer)._2+layer_distance(layer-1)._2) } - return distances + pair } //确定度 - def verifyDegrees(degree:Int,degree_v_root:Int,degree_a:Int,degree_b:Int): Int ={ - var degree_now :Int = 0 - if(degree_b == -1){ - var degree_now:Int = degree_a - } else if (degree_a == -1) { - var degree_now :Int= degree_b - } else if (abs(degree_b - degree_v_root) < abs(degree_a - degree_v_root)){ - var degree_now :Int= degree_b - }else{ - var degree_now :Int= degree_a - } - return degree_now + def verifyDegrees(degree_v_root:Int,degree_a:Long,degree_b:Long): Long ={ + if(degree_b == -1) + degree_a + else if (degree_a == -1) + degree_b + else if (abs(degree_b - degree_v_root) < abs(degree_a - degree_v_root)) + degree_b + else + degree_a } //计算dtw距离 - def compute_dtw_dist(part_list:List[(Int,List[Int])],degreeList:Map[Int,List[Int]],dist_func:String){ - var dtw_dist :Map[(Int,Int),Int] = Map() - for((v1,nbs)<- part_list){ - var lists_v1 = degreeList(v1) - for(v2<-nbs){ - var lists_v2 = degreeList(v2) + def compute_dtw_dist(part_graph:Iterator[(Long,Array[Long])],degreeList:ArrayBuffer[Array[Array[(Long,Long)]]]) ={ + val pair_v = new ArrayBuffer[(Long,Long,Int,Double)] +// val layer_dist = new ArrayList[(Int,Double)] + + part_graph.foreach{ case (v1,neighbors) => + var lists_v1 = degreeList(v1.toInt) + neighbors.foreach( v2 =>{ + var lists_v2 = degreeList(v2.toInt) var max_layer = min(lists_v1.length,lists_v2.length) - for(layer<- 0 to max_layer){ - var dist , path = fastdtw(list_v1(layer),list_v2(layer),radius=1,dist=dist_func) - dtw_dist((v1,v2))+=(layer->dist) + + for(layer <- 0 to max_layer) yield { + var v1_degree_list = lists_v1(layer).map(f=>f._1.toDouble).toSeq.map(v=>TimeSeriesElement(Some(VectorValue(v)))) + var v2_degree_list = lists_v2(layer).map(f=>f._1.toDouble).toSeq.map(v=>TimeSeriesElement(Some(VectorValue(v)))) + var fdtw = new fastdtw(1,EuclideanSpace) + // var path = fdtw.evaluate(v1_degree_list,v2_degree_list).optimalPath + var dist = fdtw.evaluate(v1_degree_list,v2_degree_list).optimalCost + pair_v.append((v1,v2,layer,dist)) } - } + }) } - dtw_dist + pair_v } diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecGraphParatition.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecGraphParatition.scala index 2ea084260..d8ecdccc2 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecGraphParatition.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecGraphParatition.scala @@ -1,10 +1,8 @@ -package 
com.tencent.angel.graph.embedding.struct2vec +package struct2vec import com.tencent.angel.graph.psf.neighbors.SampleNeighborsWithCount.NeighborsAliasTableElement import scala.util.Random - - import scala.collection.mutable.ArrayBuffer import scala.util.Random @@ -67,7 +65,7 @@ class Struct2vecGraphPartition(index: Int, srcNodesArray: Array[Long], srcNodesS //单例对象 object Struct2vecGraphPartition { - def initPSMatrixAndNodePath(model: Struct2vecPSModel, index: Int, iterator: Iterator[(Long, Array[Long], Array[Float], Array[Int])], batchSize: Int): DeepWalkGraphPartition = { + def initPSMatrixAndNodePath(model: Struct2vecPSModel, index: Int, iterator: Iterator[(Long, Array[Long], Array[Float], Array[Int])], batchSize: Int): Struct2vecGraphPartition = { val srcNodes = new LongArrayList() iterator.sliding(batchSize, batchSize).foreach { pairs => val nodeId2Neighbors = new Long2ObjectOpenHashMap[NeighborsAliasTableElement](pairs.length) @@ -90,5 +88,7 @@ object Struct2vecGraphPartition { } new Struct2vecGraphPartition(index, iterator, srcNodesSamplePaths.toArray, batchSize) } + + } diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecPSModel.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecPSModel.scala index 09f1c229a..ca82d37ef 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecPSModel.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecPSModel.scala @@ -1,4 +1,4 @@ -package com.tencent.angel.graph.embedding.struct2vec +package struct2vec import com.tencent.angel.graph.common.param.ModelContext diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecParams.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecParams.scala index d839dfe5d..4e3df8bd2 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecParams.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecParams.scala @@ -8,6 +8,7 @@ class Struct2vecParams extends Serializable { var opt1_reduce_len: Boolean = true var opt2_reduce_sim_calc: Boolean = false var opt3_num_layers:Int = _ + var max_num_layers:Int = _ var embeddingDim: Int = _ var negSample: Int = _ var learningRate: Float = _ @@ -190,6 +191,12 @@ class Struct2vecParams extends Serializable { this } + def setMaxLayers(layers: Int): this.type = { + require(max_num_layers>0, s"require num of max layer > 0, $max_num_layers given") + this.max_num_layers = layers + this + } + def setoutPut(output: String): this.type = { this.output = output this diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtw.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtw.scala index 49db8d037..1219edd7a 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtw.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtw.scala @@ -1,7 +1,7 @@ -package com.tencent.angel.graph.embedding.struct2vec.fastdtw +package struct2vec.fastdtw -import com.tencent.angel.graph.embedding.struct2vec.DTW -import com.tencent.angel.graph.embedding.struct2vec.fastdtwUtils.{CostMatrix, RangeDiagonalConstraints, Space, TimeSeriesElement} +import struct2vec.DTW 
+import struct2vec.fastdtwUtils.{CostMatrix, RangeDiagonalConstraints, Space, TimeSeriesElement} /** * @param searchRadius diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/TimeSeriesElement.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/TimeSeriesElement.scala index 328316448..d1a4d8962 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/TimeSeriesElement.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/fastdtwUtils/TimeSeriesElement.scala @@ -8,4 +8,4 @@ import com.madsync.time.DateTime * @param t timestamp associated with this entry * @param v value associated with this element */ -final case class TimeSeriesElement[T](t: DateTime, v: Option[T]) {} +final case class TimeSeriesElement[T](v: Option[T]) {} From 39a5b2bd3b02d87139d638746f20cc10cb194ca5 Mon Sep 17 00:00:00 2001 From: yinhan <1513032551@qq.com> Date: Wed, 7 Sep 2022 10:08:29 +0800 Subject: [PATCH 5/6] update code work --- .../graph/embedding/struct2vec/Example.scala | 74 +++++ .../embedding/struct2vec/Struct2vec.scala | 266 ++++++++++++------ .../Struct2vecGraphParatition.scala | 5 +- .../struct2vec/Struct2vecPSModel.scala | 4 +- .../graph/embedding/struct2vec/utils.scala | 16 -- 5 files changed, 258 insertions(+), 107 deletions(-) create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Example.scala delete mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/utils.scala diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Example.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Example.scala new file mode 100644 index 000000000..793fb9f3a --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Example.scala @@ -0,0 +1,74 @@ +package com.tencent.angel.graph.embedding.struct2vec + +import com.tencent.angel.conf.AngelConf +import com.tencent.angel.graph.embedding.struct2vec.Struct2vec +import com.tencent.angel.graph.utils.GraphIO +import com.tencent.angel.spark.context.PSContext +import org.apache.spark.storage.StorageLevel +import org.apache.spark.{SparkConf, SparkContext} + +object Example { + def main(args: Array[String]): Unit = { + val input = "data/bc/karate_club_network.txt" + val storageLevel = StorageLevel.fromString("MEMORY_ONLY") + val batchSize = 10 + val output = "data/output/output1" + val stay_prob: Double = 0.3 + val opt1_reduce_len: Boolean = true + val opt2_reduce_sim_calc: Boolean = false + val opt3_num_layers:Int = 10 + val max_num_layers:Int = 10 + val srcIndex = 0 + val dstIndex = 1 + val weightIndex = 2 + val psPartitionNum = 1 + val partitionNum = 1 + val useEdgeBalancePartition = false + val isWeighted = false + val needReplicateEdge =true + + val sep = " " + val walkLength = 10 + + + start() + + val struct2vec = new Struct2vec() + .setStorageLevel(storageLevel) + .setPSPartitionNum(psPartitionNum) + .setSrcNodeIdCol("src") + .setDstNodeIdCol("dst") + .setWeightCol("weight") + .setBatchSize(batchSize) + .setWalkLength(walkLength) + .setPartitionNum(partitionNum) + .setIsWeighted(isWeighted) + .setNeedReplicaEdge(needReplicateEdge) + .setUseEdgeBalancePartition(useEdgeBalancePartition) + .setEpochNum(2) + + struct2vec.setOutputDir(output) + val df = GraphIO.load(input, isWeighted = 
isWeighted, srcIndex, dstIndex, weightIndex, sep = sep) + val mapping = struct2vec.transform(df) + + stop() + } + + def start(mode: String = "local[4]"): Unit = { + val conf = new SparkConf() + conf.setMaster(mode) + conf.setAppName("Struct2vec") + conf.set(AngelConf.ANGEL_PSAGENT_UPDATE_SPLIT_ADAPTION_ENABLE, "false") + val sc = new SparkContext(conf) + sc.setLogLevel("ERROR") + sc.setCheckpointDir("data/cp") + //PSContext.getOrCreate(sc) + } + + def stop(): Unit = { + PSContext.stop() + SparkContext.getOrCreate().stop() + } + +} + diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala index c3a8bd59f..b3f3d9375 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala @@ -1,37 +1,46 @@ -package struct2vec +package com.tencent.angel.graph.embedding.struct2vec import com.tencent.angel.graph.embedding.struct2vec.{Struct2vecGraphPartition, Struct2vecParams} - -import java.lang.Math.{abs, log, max, min} -import scala.collection.JavaConversions.asJavaCollection -import scala.collection.immutable.Map -import scala.collection.mutable +import com.tencent.angel.graph.common.param.ModelContext +import com.tencent.angel.graph.data.neighbor.NeighborDataOps +import com.tencent.angel.graph.utils.params._ +import com.tencent.angel.graph.utils.{GraphIO, Stats} +import com.tencent.angel.spark.context.PSContext +import org.apache.spark.SparkContext +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.param.ParamMap +import org.apache.spark.ml.util.Identifiable +import org.apache.spark.sql.types.{StructType, _} +import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.storage.StorageLevel + +import java.lang.Math.{abs, exp, log, max, min} +import scala.collection.{Seq, mutable} import scala.collection.mutable.{ArrayBuffer, ListBuffer, Queue, Set} -import scala.util.Random -import struct2vec.Struct2vecParams import struct2vec.fastdtw.fastdtw import struct2vec.fastdtwUtils.{EuclideanSpace, TimeSeriesElement, VectorValue} +import struct2vec.Alias_table.createAliasTable -import java.util -import java.util.ArrayList -import scala.util.control.Breaks.break - +import java.lang.Math.exp class Struct2vec(params: Struct2vecParams ) { -// private val graph_Nodes : List[Int] = List() + private val idx2Nodes : Array[Int] = Array() - private val Nodes2idx : List[Int] = List() -// private val idx : List[Int] = List.range(0,idx2Nodes.length,1) -// private val graph : List[Int] = List() -// private val embedding :Map[String,Int] = Map() + private var output: String = _ + + def this() = this(Identifiable.randomUID("Struct2vec")) + + def setOutputDir(in: String): Unit = { + output = in + } override def transform(dataset: Dataset[_]): DataFrame = { //create origin edges RDD and data preprocessing - val rawEdges = NeighborDataOps.loadEdgesWithWeight(dataset, params.srcNodeIdCol), params.dstNodeIdCol, params.weightCol, params.isWeighted, params.needReplicaEdge, true, false, false) + val rawEdges = NeighborDataOps.loadEdgesWithWeight(dataset, params.srcNodeIdCol, params.dstNodeIdCol, params.weightCol, params.isWeighted, params.needReplicaEdge, true, false, false) rawEdges.repartition(params.partitionNum).persist(params.StorageLevel.DISK_ONLY) val (minId, maxId, numEdges) = 
Stats.summarizeWithWeight(rawEdges) println(s"minId=$minId maxId=$maxId numEdges=$numEdges level=${params.StorageLevel}") @@ -69,10 +78,66 @@ class Struct2vec(params: Struct2vecParams ) { //trigger action graphOri.foreachPartition(_ => Unit) + // checkpoint + model.checkpoint() + + var epoch = 0 + while (epoch < $(epochNum)) { + var graph = graphOri.map(x => x.deepClone()) + //sample paths with random walk + var curIteration = 0 + var prev = graph + val beginTime = System.currentTimeMillis() + do { + val beforeSample = System.currentTimeMillis() + curIteration += 1 + graph = prev.map(_.process(model, curIteration)) + graph.persist($(storageLevel)) + graph.count() + prev.unpersist(true) + prev = graph + var sampleTime = (System.currentTimeMillis() - beforeSample) + println(s"epoch $epoch, iter $curIteration, sampleTime: $sampleTime") + } while (curIteration < $(walkLength) - 1) + + + val EndTime = (System.currentTimeMillis() - beginTime) + println(s"epoch $epoch, Struct2vecWithWeight all sampleTime: $EndTime") + + val temp = graph.flatMap(_.save()) + println(s"epoch $epoch, num path: ${temp.count()}") + println(s"epoch $epoch, num invalid path: ${ + temp.filter(_.length != ${ + walkLength + }).count() + }") + val tempRe = dataset.sparkSession.createDataFrame(temp.map(x => Row(x.mkString(" "))), transformSchema(dataset.schema)) + if (epoch == 0) { + GraphIO.save(tempRe, output) + } + + else { + GraphIO.appendSave(tempRe, output) + } + println(s"epoch $epoch, saved results to $output") + epoch += 1 + graph.unpersist() + } + val t = SparkContext.getOrCreate().parallelize(List("1", "2"), 1) + dataset.sparkSession.createDataFrame(t.map(x => Row(x)), transformSchema(dataset.schema)) + } + + override def transformSchema(schema: StructType): StructType = { + StructType(Seq(StructField("path", StringType, nullable = false))) } + + override def copy(extra: ParamMap): Transformer = defaultCopy(extra) + + + def compute_orderd_degreelist(graph_adj:Iterator[(Long,Array[Long])],max_num_layers:Int):ArrayBuffer[Array[Array[(Long,Long)]]]={ val order_list: ArrayBuffer[Array[Array[(Long,Long)]]] = ArrayBuffer() graph_adj.foreach { f => @@ -104,7 +169,7 @@ class Struct2vec(params: Struct2vecParams ) { degree_list(degree) += 1 // count node freq for(nei <- neighbors){ - var nei_index = Nodes2idx(nei.toInt) + var nei_index = idx2Nodes(nei.toInt) if( !visited(nei_index)){ visited(nei_index) = true queue.enqueue(nei_index) @@ -131,7 +196,7 @@ class Struct2vec(params: Struct2vecParams ) { graph_adj:Iterator[(Long,Array[Long])], max_num_layers:Int, degrees:(ArrayBuffer[Long],Array[Long],Array[Long]), - workers:Int=1,verbose:Int=0): Unit = { + workers:Int=1,verbose:Int=0) = { if (params.opt2_reduce_sim_calc == true) { var degreeListSelected: Array[Array[Array[(Long,Long)]]] = Array() @@ -162,8 +227,8 @@ class Struct2vec(params: Struct2vecParams ) { // var results = Parallel(workers)(delayed(cpmpute_dtw_dist)(part_list,degreelist,dist_func)) // } var dtw_dist = compute_dtw_dist(graph_adj,degreeList) - structural_dist = convert_dtw_struc_dist(dtw_dist) - distances + var structural_dist = convert_dtw_struc_dist(dtw_dist) + structural_dist } @@ -194,7 +259,17 @@ class Struct2vec(params: Struct2vecParams ) { } (vertices,degrees_before,degrees_after) // ( degree->数组索引,(node,before,after)) } - + //确定度 + def verifyDegrees(degree_v_root:Int,degree_a:Long,degree_b:Long): Long ={ + if(degree_b == -1) + degree_a + else if (degree_a == -1) + degree_b + else if (abs(degree_b - degree_v_root) < abs(degree_a - degree_v_root)) + 
degree_b + else + degree_a + } def judge_degree_one(before:Long):Long = { if(before==0) -1 @@ -280,108 +355,125 @@ class Struct2vec(params: Struct2vecParams ) { def change_layer_distance(layers_distance:Array[(Long,Long,Double)]) ={ layers_distance.map{case(vx,vy,distance) => ((vx,vy),distance)}.toArray } - //权重表 - def get_transition_probs(layers_distance:Array[((Long,Long),Double)], - layers_adj:Array[(Int,Array[Long])]) ={ -// var layers_alias -// var layers_accept + def search_distance(layers_distance:Array[((Long,Long),Double)],v:Tuple2[Long,Long]):Float = { + layers_distance.foreach { + case (v_pair, dist) => + if (v_pair == v) { + return dist.toFloat + } + } + 0 + } - for(layer <- 0 to layers_adj.length){ - var neighbors = layers_adj(layer) - var layer_distance = layers_distance(layer) + //转移概率 + def get_transition_probs(layers_distance: Array[((Long, Long), Double)], + layers_adj: Array[(Int, Array[Long])]) = { - for( (v,nei) <- neighbors ){ - var e_list : ArrayBuffer[Double]= ArrayBuffer() - var sum_weight =0.0 + var layers_alias: Array[Array[Int]] = Array() + var layers_accept: Array[Array[Float]] = Array() + var layers = 1 + val v_pair = layers_distance.map(f => f._1) + var wd, w = 0.0 + var norm_weights: Array[ArrayBuffer[Float]] = Array() - for(n <- nei) { -// var wd = layer_distance() - } - } + layers_adj.foreach { case (v, neighbors) => + + var e_list: ArrayBuffer[Float] = ArrayBuffer() + var sum_weight = 0.0 + + + for (nei <- neighbors) { + if (v_pair.contains((v, nei))) { + wd = search_distance(layers_distance, (v, nei)) + } else wd = search_distance(layers_distance, (nei, v)) + + w = exp(-wd) + e_list.append(w.toFloat) + sum_weight += w } + e_list = for (x <- e_list) yield { + x / sum_weight + } + var e_list_w: Array[Float] = Array() + for (i <- 0 to e_list.length) e_list_w(i) = e_list(i) + norm_weights(v) = e_list + var acp_alias = createAliasTable(e_list_w) + layers_accept(layers) = acp_alias._1 + layers_alias(layers) = acp_alias._2 -// (layers_accept , layers_alias) + } + (layers_accept, layers_alias) } - def cost(a:List[Int],b:List[Int]):Double ={ - val ep=0.5 - val m = max(a(0),b(0))+ep - val mi = min(a(0),b(0))+ep - val result = ((m/mi)-1) + + def cost(a: List[Int], b: List[Int]): Double = { + val ep = 0.5 + val m = max(a(0), b(0)) + ep + val mi = min(a(0), b(0)) + ep + val result = ((m / mi) - 1) return result } - def cost_min(a:List[Int],b:List[Int]): Double ={ - val ep=0.5 - val m = max(a(0),b(0))+ep - val mi = min(a(0),b(0))+ep - val result = ((m/mi)-1) * min(a(1),b(1)) + def cost_min(a: List[Int], b: List[Int]): Double = { + val ep = 0.5 + val m = max(a(0), b(0)) + ep + val mi = min(a(0), b(0)) + ep + val result = ((m / mi) - 1) * min(a(1), b(1)) return result } - def cost_max(a:List[Int],b:List[Int]):Double={ - val ep=0.5 - val m = max(a(0),b(0))+ep - val mi = min(a(0),b(0))+ep - val result = ((m/mi)-1)*max(a(1),b(1)) + def cost_max(a: List[Int], b: List[Int]): Double = { + val ep = 0.5 + val m = max(a(0), b(0)) + ep + val mi = min(a(0), b(0)) + ep + val result = ((m / mi) - 1) * max(a(1), b(1)) return result } //dtw转换成结构距离 (layer,(v1,v2,dist)) - def convert_dtw_struc_dist(distances:ArrayBuffer[(Long,Long,Int,Double)],startLayer:Int =1) = { - var dist = distances.map{ case (src,dist,layer,distance) => ((src,dist),(layer,distance))} - var pair = dist.groupBy(_._1).map(x=>(x._1,x._2.map(x=>(x._2._1,x._2._2)))).toArray //((v1,v2),ArrayBuffer(layer,distance)) - for((vertices,layer_distance)<-pair){ - var keys_layers = for((layer,distance) <- layer_distance) yield 
layer + def convert_dtw_struc_dist(distances: ArrayBuffer[(Long, Long, Int, Double)], startLayer: Int = 1) = { + var dist = distances.map { case (src, dist, layer, distance) => ((src, dist), (layer, distance)) } + var pair = dist.groupBy(_._1).map(x => (x._1, x._2.map(x => (x._2._1, x._2._2)))).toArray //((v1,v2),ArrayBuffer(layer,distance)) + for ((vertices, layer_distance) <- pair) { + var keys_layers = for ((layer, distance) <- layer_distance) yield layer keys_layers.sorted - var startLayer = min(keys_layers.length,startLayer) + var startLayer = min(keys_layers.length, startLayer) for (layer <- 0 to startLayer) keys_layers.remove(0) - for(layer <- keys_layers) - layer_distance(layer) = (layer,layer_distance(layer)._2+layer_distance(layer-1)._2) + for (layer <- keys_layers) + layer_distance(layer) = (layer, layer_distance(layer)._2 + layer_distance(layer - 1)._2) } pair } - //确定度 - def verifyDegrees(degree_v_root:Int,degree_a:Long,degree_b:Long): Long ={ - if(degree_b == -1) - degree_a - else if (degree_a == -1) - degree_b - else if (abs(degree_b - degree_v_root) < abs(degree_a - degree_v_root)) - degree_b - else - degree_a - } //计算dtw距离 - def compute_dtw_dist(part_graph:Iterator[(Long,Array[Long])],degreeList:ArrayBuffer[Array[Array[(Long,Long)]]]) ={ - val pair_v = new ArrayBuffer[(Long,Long,Int,Double)] -// val layer_dist = new ArrayList[(Int,Double)] + def compute_dtw_dist(part_graph: Iterator[(Long, Array[Long])], degreeList: ArrayBuffer[Array[Array[(Long, Long)]]]) = { + val pair_v = new ArrayBuffer[(Long, Long, Int, Double)] + // val layer_dist = new ArrayList[(Int,Double)] - part_graph.foreach{ case (v1,neighbors) => + part_graph.foreach { case (v1, neighbors) => var lists_v1 = degreeList(v1.toInt) - neighbors.foreach( v2 =>{ + neighbors.foreach(v2 => { var lists_v2 = degreeList(v2.toInt) - var max_layer = min(lists_v1.length,lists_v2.length) + var max_layer = min(lists_v1.length, lists_v2.length) - for(layer <- 0 to max_layer) yield { - var v1_degree_list = lists_v1(layer).map(f=>f._1.toDouble).toSeq.map(v=>TimeSeriesElement(Some(VectorValue(v)))) - var v2_degree_list = lists_v2(layer).map(f=>f._1.toDouble).toSeq.map(v=>TimeSeriesElement(Some(VectorValue(v)))) - var fdtw = new fastdtw(1,EuclideanSpace) + for (layer <- 0 to max_layer) yield { + var v1_degree_list = lists_v1(layer).map(f => f._1.toDouble).toSeq.map(v => TimeSeriesElement(Some(VectorValue(v)))) + var v2_degree_list = lists_v2(layer).map(f => f._1.toDouble).toSeq.map(v => TimeSeriesElement(Some(VectorValue(v)))) + var fdtw = new fastdtw(1, EuclideanSpace) // var path = fdtw.evaluate(v1_degree_list,v2_degree_list).optimalPath - var dist = fdtw.evaluate(v1_degree_list,v2_degree_list).optimalCost - pair_v.append((v1,v2,layer,dist)) + var dist = fdtw.evaluate(v1_degree_list, v2_degree_list).optimalCost + pair_v.append((v1, v2, layer, dist)) } }) } - pair_v + pair_v } - } + diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecGraphParatition.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecGraphParatition.scala index d8ecdccc2..6f972f429 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecGraphParatition.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecGraphParatition.scala @@ -1,10 +1,11 @@ -package struct2vec +package com.tencent.angel.graph.embedding.struct2vec import 
com.tencent.angel.graph.psf.neighbors.SampleNeighborsWithCount.NeighborsAliasTableElement -import scala.util.Random import scala.collection.mutable.ArrayBuffer import scala.util.Random +import scala.collection.mutable.ArrayBuffer + class Struct2vecGraphPartition(index: Int, srcNodesArray: Array[Long], srcNodesSamplePaths: Array[Array[Long]], batchSize: Int) { diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecPSModel.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecPSModel.scala index ca82d37ef..4166ffd42 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecPSModel.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecPSModel.scala @@ -1,4 +1,4 @@ -package struct2vec +package com.tencent.angel.graph.embedding.struct2vec import com.tencent.angel.graph.common.param.ModelContext @@ -61,7 +61,7 @@ object Struct2vecPSModel { data, modelContext.getMaxNodeId, modelContext.getPartitionNum, matrix, balancePartitionPercent) val psMatrix = PSMatrix.matrix(matrix) - new DeepWalkPSModel(psMatrix) + new Struct2vecPSModel(psMatrix) } } diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/utils.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/utils.scala deleted file mode 100644 index ac58d3629..000000000 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/utils.scala +++ /dev/null @@ -1,16 +0,0 @@ -package com.tencent.angel.graph.embedding.struct2vec - - - -object utils { - -// def preprocess_nxgraph() = { -// -// } - - - - - } - -} From a78cae13a19a440033326145837d9c65f86117c3 Mon Sep 17 00:00:00 2001 From: yinhan <1513032551@qq.com> Date: Mon, 12 Sep 2022 22:19:45 +0800 Subject: [PATCH 6/6] code work --- .../embedding/struct2vec/Struct2vec.scala | 32 ++++++++++--------- .../struct2vec/Struct2vecParams.scala | 11 ++++++- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala index b3f3d9375..532e3f312 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vec.scala @@ -13,6 +13,7 @@ import org.apache.spark.ml.util.Identifiable import org.apache.spark.sql.types.{StructType, _} import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.storage.StorageLevel +import struct2vec.Alias_table import java.lang.Math.{abs, exp, log, max, min} import scala.collection.{Seq, mutable} @@ -27,11 +28,10 @@ import java.lang.Math.exp class Struct2vec(params: Struct2vecParams ) { - private val idx2Nodes : Array[Int] = Array() + val idx2Nodes : Array[Long] = Array() - private var output: String = _ + var output: String = _ - def this() = this(Identifiable.randomUID("Struct2vec")) def setOutputDir(in: String): Unit = { output = in @@ -44,7 +44,9 @@ class Struct2vec(params: Struct2vecParams ) { rawEdges.repartition(params.partitionNum).persist(params.StorageLevel.DISK_ONLY) val (minId, maxId, numEdges) = Stats.summarizeWithWeight(rawEdges) println(s"minId=$minId maxId=$maxId numEdges=$numEdges level=${params.StorageLevel}") - for(i <- 0 to 
maxId) idx2Nodes(i) = i + for(i <- 0 to maxId) { + idx2Nodes(i)=i + } val edges = rawEdges.map { case (src, dst, w) => (src, (dst, w)) } @@ -62,11 +64,11 @@ class Struct2vec(params: Struct2vecParams ) { val modelContext = new ModelContext(params.psPartitionNum, minId, maxId + 1, -1, "struct2vec", SparkContext.getOrCreate().hadoopConfiguration) - // val data = edges.map(_._2._1) // ps loadBalance by in degree val data = edges.flatMap(f => Iterator(f._1, f._2._1)) //拿出(src,neighbors) - //val model = DeepWalkPSModel.fromMinMax(minId, maxId, data, $(psPartitionNum), useBalancePartition = $(useBalancePartition)) + val model = Struct2vecPSModel(modelContext, data, params.useBalancePartition, params.balancePartitionPercent) + val degreed_list = compute_orderd_degreelist(data,params.max_num_layers ) val degrees = create_vector(data) @@ -74,7 +76,7 @@ class Struct2vec(params: Struct2vecParams ) { val graphOri = aliasTable.mapPartitionsWithIndex((index, adjTable) => Iterator(Struct2vecGraphPartition.initPSMatrixAndNodePath(model, index, adjTable, params.batchSize)))) - graphOri.persist($(storageLevel)) + graphOri.persist((params.storageLevel)) //trigger action graphOri.foreachPartition(_ => Unit) @@ -82,7 +84,7 @@ class Struct2vec(params: Struct2vecParams ) { model.checkpoint() var epoch = 0 - while (epoch < $(epochNum)) { + while (epoch < params.epochNum) { var graph = graphOri.map(x => x.deepClone()) //sample paths with random walk var curIteration = 0 @@ -92,13 +94,13 @@ class Struct2vec(params: Struct2vecParams ) { val beforeSample = System.currentTimeMillis() curIteration += 1 graph = prev.map(_.process(model, curIteration)) - graph.persist($(storageLevel)) + graph.persist(params.storageLevel) graph.count() prev.unpersist(true) prev = graph var sampleTime = (System.currentTimeMillis() - beforeSample) println(s"epoch $epoch, iter $curIteration, sampleTime: $sampleTime") - } while (curIteration < $(walkLength) - 1) + } while (curIteration < params.walkLength - 1) val EndTime = (System.currentTimeMillis() - beginTime) @@ -108,7 +110,7 @@ class Struct2vec(params: Struct2vecParams ) { println(s"epoch $epoch, num path: ${temp.count()}") println(s"epoch $epoch, num invalid path: ${ temp.filter(_.length != ${ - walkLength + params.walkLength }).count() }") val tempRe = dataset.sparkSession.createDataFrame(temp.map(x => Row(x.mkString(" "))), transformSchema(dataset.schema)) @@ -170,9 +172,9 @@ class Struct2vec(params: Struct2vecParams ) { for(nei <- neighbors){ var nei_index = idx2Nodes(nei.toInt) - if( !visited(nei_index)){ - visited(nei_index) = true - queue.enqueue(nei_index) + if( !visited(nei_index.toInt)){ + visited(nei_index.toInt) = true + queue.enqueue(nei_index.toInt) } } count-=1 @@ -220,7 +222,7 @@ class Struct2vec(params: Struct2vecParams ) { var vertices: Array[Array[Int]] = Array() for(v <- idx2Nodes) { - vertices(v) = for(vd <- idx2Nodes if vd > v ) yield vd + vertices(v) = for(vd <- idx2Nodes if vd > v ) yield vd.toInt } } // for(part_list in partition_dict(vertices,workers)){ diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecParams.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecParams.scala index 4e3df8bd2..1206d72a7 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecParams.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struct2vec/Struct2vecParams.scala @@ -1,9 +1,18 @@ package 
com.tencent.angel.graph.embedding.struct2vec
+import org.apache.spark.storage.StorageLevel
+
 class Struct2vecParams extends Serializable {
+  var srcNodeIdCol = "src"
+  var dstNodeIdCol = "dst"
+  var weightCol = "weight"
   var partitionNum: Int = _
   var walkLength: Int = _
+  var psPartitionNum: Int = _
+  var useBalancePartition: Boolean = true
+  var balancePartitionPercent: Float = _
+  var storageLevel = StorageLevel.MEMORY_ONLY
   var stay_prob: Float = _
   var opt1_reduce_len: Boolean = true
   var opt2_reduce_sim_calc: Boolean = false
@@ -15,7 +24,7 @@ class Struct2vecParams extends Serializable {
   var decayRate: Float = _
   var batchSize: Int = _
   var logStep: Int = _
-  var numEpoch: Int = _
+  var epochNum: Int = _
   var maxIndex: Int = _
   var minIndex: Int = _
   var sampleRate: Float = _
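
For reference, the weighted sampling step these patches rely on is the alias method: Alias_table.calcAliasTable scales each node's edge weights into area ratios, createAliasTable (only partially shown above) turns them into accept/alias arrays, and BiasWalker draws from them via alias_sample; get_transition_probs likewise converts layer structural distances into weights with exp(-distance) before building the tables. The sketch below is not the patch's implementation; it is a minimal standalone illustration of that technique, and the object name AliasSketch, the method aliasSample, and the example distances in main are illustrative assumptions only.

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object AliasSketch {

  // Build accept/alias arrays from probabilities pre-scaled so they average 1,
  // i.e. areaRatio(i) = p(i) * n, as calcAliasTable does. Runs in O(n).
  def createAliasTable(areaRatio: Array[Float]): (Array[Float], Array[Int]) = {
    val len = areaRatio.length
    val accept = Array.fill(len)(0f)
    val alias = Array.fill(len)(0)
    val small = ArrayBuffer[Int]()
    val large = ArrayBuffer[Int]()
    val ratios = areaRatio.clone()

    for (i <- 0 until len) {
      if (ratios(i) < 1.0f) small += i else large += i
    }
    while (small.nonEmpty && large.nonEmpty) {
      val s = small.remove(small.length - 1)
      val l = large.remove(large.length - 1)
      accept(s) = ratios(s)          // keep column s with this probability
      alias(s) = l                   // otherwise jump to column l
      ratios(l) = ratios(l) - (1.0f - ratios(s)) // l donated the remainder of column s
      if (ratios(l) < 1.0f) small += l else large += l
    }
    small.foreach(i => accept(i) = 1.0f)
    large.foreach(i => accept(i) = 1.0f)
    (accept, alias)
  }

  // O(1) draw: pick a column uniformly, then keep it or follow its alias.
  def aliasSample(accept: Array[Float], alias: Array[Int], rand: Random): Int = {
    val col = rand.nextInt(accept.length)
    if (rand.nextFloat() < accept(col)) col else alias(col)
  }

  def main(args: Array[String]): Unit = {
    // Layer weights in the struct2vec style: w = exp(-structural distance),
    // then scaled to area ratios before the table is built.
    val dists = Array(0.1, 0.5, 2.0)
    val w = dists.map(d => math.exp(-d).toFloat)
    val ratios = w.map(_ / w.sum * w.length)
    val (accept, alias) = createAliasTable(ratios)
    val rand = new Random(42)
    val draws = Array.fill(100000)(aliasSample(accept, alias, rand))
    (0 until dists.length).foreach(i => println(s"neighbour $i sampled ${draws.count(_ == i)} times"))
  }
}

Building accept/alias once costs O(degree) per node, after which every step of the biased walk is an O(1) draw, which is presumably why the patches push the precomputed tables into the PS matrix (NeighborsAliasTableElement) instead of re-normalising edge weights at every step.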