Skip to content

Commit

Permalink
add fusiform_similarity,rings_detect and kcore ap algorithm (#5)
Browse files Browse the repository at this point in the history
* improve
* move c_label to lower layer and add appendRow(value)
* add community limit 100w for louvain
* improve louvain log
* fix louvain bug

Change-Id: I886ac3e7a3f0dfd49e66fdf544f97f6f7db615df
  • Loading branch information
zhoney authored and javeme committed Oct 19, 2022
1 parent fa306b4 commit bd3a0be
Show file tree
Hide file tree
Showing 8 changed files with 623 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public abstract class AbstractAlgorithm implements Algorithm {
public static final String KEY_CLEAR = "clear";
public static final String KEY_CAPACITY = "capacity";
public static final String KEY_LIMIT = "limit";
public static final String KEY_ALPHA = "alpha";

public static final long DEFAULT_CAPACITY = 10000000L;
public static final long DEFAULT_LIMIT = 100L;
Expand All @@ -92,6 +93,9 @@ public abstract class AbstractAlgorithm implements Algorithm {
public static final long DEFAULT_TIMES = 20L;
public static final long DEFAULT_STABLE_TIMES= 3L;
public static final double DEFAULT_PRECISION = 1.0 / 1000;
public static final double DEFAULT_ALPHA = 0.5D;

public static final String C_LABEL = "c_label";

@Override
public void checkParameters(Map<String, Object> parameters) {
Expand Down Expand Up @@ -119,6 +123,21 @@ protected static Directions direction(Map<String, Object> parameters) {
return parseDirection(direction);
}

/**
 * Read the optional "alpha" parameter.
 *
 * @param parameters the raw algorithm parameters
 * @return the value stored under {@code KEY_ALPHA} after it has passed
 *         {@code checkAlpha()}, or {@code DEFAULT_ALPHA} when the key
 *         is not present
 */
protected static double alpha(Map<String, Object> parameters) {
    if (parameters.containsKey(KEY_ALPHA)) {
        double value = parameterDouble(parameters, KEY_ALPHA);
        // Reject out-of-range values before handing them to the caller
        checkAlpha(value);
        return value;
    }
    return DEFAULT_ALPHA;
}

/**
 * Validate that alpha lies in the half-open interval (0, 1].
 * The check is delegated to {@code E.checkArgument()}, which receives
 * the message template and the offending value.
 *
 * @param alpha the value to validate
 */
public static void checkAlpha(double alpha) {
    E.checkArgument(alpha > 0 && alpha <= 1.0,
                    "The alpha must be in range (0, 1], but got %s",
                    alpha);
}

protected static long top(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_TOP)) {
return 0L;
Expand Down Expand Up @@ -281,10 +300,15 @@ protected Iterator<Vertex> vertices(long limit) {
return this.graph().vertices(query);
}

/**
 * Shortcut for the four-argument overload that uses {@code C_LABEL}
 * as the property key and {@code clabel} as the value to match.
 */
protected Iterator<Vertex> vertices(Object label, Object clabel,
                                    long limit) {
    final String key = C_LABEL;
    return this.vertices(label, key, clabel, limit);
}

protected Iterator<Vertex> vertices(Object label, String key,
Object value, long limit) {
Iterator<Vertex> vertices = this.vertices(label, limit);
if (key != null) {
if (value != null) {
vertices = filter(vertices, key, value);
}
return vertices;
Expand Down Expand Up @@ -490,6 +514,11 @@ public void appendRaw(String key, String rawJson) {
this.checkSizeLimit();
}

/**
 * Append an already-serialized JSON fragment, followed by a comma,
 * to the buffer and then run the size-limit check.
 *
 * @param rawJson the fragment to append as-is (it is not escaped here)
 */
public void appendRaw(String rawJson) {
    final char separator = ',';
    this.json.append(rawJson).append(separator);
    this.checkSizeLimit();
}

public void append(Set<Entry<Id, MutableLong>> kvs) {
for (Map.Entry<Id, MutableLong> top : kvs) {
this.append(top.getKey(), top.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@
import com.baidu.hugegraph.job.algorithm.cent.DegreeCentralityAlgorithm;
import com.baidu.hugegraph.job.algorithm.cent.EigenvectorCentralityAlgorithm;
import com.baidu.hugegraph.job.algorithm.comm.ClusterCoeffcientAlgorithm;
import com.baidu.hugegraph.job.algorithm.comm.KCoreAlgorithm;
import com.baidu.hugegraph.job.algorithm.comm.LouvainAlgorithm;
import com.baidu.hugegraph.job.algorithm.comm.LpaAlgorithm;
import com.baidu.hugegraph.job.algorithm.comm.TriangleCountAlgorithm;
import com.baidu.hugegraph.job.algorithm.path.RingsDetectAlgorithm;
import com.baidu.hugegraph.job.algorithm.similarity.FusiformSimilarityAlgorithm;

public class AlgorithmPool {

Expand All @@ -48,6 +51,10 @@ public class AlgorithmPool {
INSTANCE.register(new ClusterCoeffcientAlgorithm());
INSTANCE.register(new LpaAlgorithm());
INSTANCE.register(new LouvainAlgorithm());

INSTANCE.register(new FusiformSimilarityAlgorithm());
INSTANCE.register(new RingsDetectAlgorithm());
INSTANCE.register(new KCoreAlgorithm());
}

private final Map<String, Algorithm> algorithms;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,9 @@

import com.baidu.hugegraph.job.Job;
import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm;
import com.baidu.hugegraph.job.algorithm.comm.LpaAlgorithm;

public abstract class AbstractCentAlgorithm extends AbstractAlgorithm {

protected static final String C_LABEL = LpaAlgorithm.Traverser.C_LABEL;

@Override
public String category() {
return CATEGORY_CENT;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.job.algorithm.comm;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;

import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.query.Query;
import com.baidu.hugegraph.job.Job;
import com.baidu.hugegraph.schema.EdgeLabel;
import com.baidu.hugegraph.traversal.algorithm.FusiformSimilarityTraverser;
import com.baidu.hugegraph.type.define.Directions;
import com.baidu.hugegraph.util.CollectionUtil;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.JsonUtil;
import com.google.common.collect.ImmutableSet;

/**
 * Community algorithm registered under the name "k_core".
 *
 * For every source vertex it runs a fusiform-similarity traversal and
 * then prunes the returned similars down to a set of vertex ids (see
 * {@code KcoreTraverser.extractKcore()}). The resulting sets are written
 * to a JSON list under the key "kcores", optionally merged when they
 * share vertices.
 */
public class KCoreAlgorithm extends AbstractCommAlgorithm {

    public static final String KEY_K = "k";
    public static final String KEY_MERGED = "merged";

    public static final int DEFAULT_K = 3;

    @Override
    public String name() {
        return "k_core";
    }

    /**
     * Parse every parameter once so that invalid input is rejected
     * before the job is actually executed.
     */
    @Override
    public void checkParameters(Map<String, Object> parameters) {
        k(parameters);
        alpha(parameters);
        merged(parameters);
        degree(parameters);
        sourceLabel(parameters);
        sourceCLabel(parameters);
        direction(parameters);
        edgeLabel(parameters);
    }

    @Override
    public Object call(Job<Object> job, Map<String, Object> parameters) {
        Traverser traverser = new Traverser(job);
        return traverser.kcore(sourceLabel(parameters),
                               sourceCLabel(parameters),
                               direction(parameters), edgeLabel(parameters),
                               k(parameters), alpha(parameters),
                               degree(parameters), merged(parameters));
    }

    /**
     * Read the optional "k" parameter.
     *
     * @return the parsed value (checked to be greater than 1), or
     *         {@code DEFAULT_K} when the key is absent
     */
    protected static int k(Map<String, Object> parameters) {
        if (!parameters.containsKey(KEY_K)) {
            return DEFAULT_K;
        }
        int k = parameterInt(parameters, KEY_K);
        E.checkArgument(k > 1, "The k of kcore must be > 1, but got %s", k);
        return k;
    }

    /**
     * Read the optional "merged" flag, which defaults to false.
     */
    protected static boolean merged(Map<String, Object> parameters) {
        if (!parameters.containsKey(KEY_MERGED)) {
            return false;
        }
        return parameterBoolean(parameters, KEY_MERGED);
    }

    /**
     * Job-side driver: iterates the source vertices and serializes the
     * k-core found for each of them.
     */
    public static class Traverser extends AlgoTraverser {

        public Traverser(Job<Object> job) {
            super(job);
        }

        /**
         * Compute one k-core per source vertex and return them as JSON.
         *
         * @param sourceLabel  label used to select the source vertices
         * @param sourceCLabel c_label value used to filter source vertices
         * @param dir          edge direction passed to the traversal
         * @param label        edge label name, or null for no edge label
         * @param k            the k of the k-core
         * @param alpha        similarity ratio passed to the traversal
         * @param degree       degree argument passed to the traversal
         * @param merged       when true, k-cores that share at least one
         *                     vertex are united before being written
         * @return the value of {@code JsonMap.asJson()} for an object
         *         holding the list "kcores"
         */
        public Object kcore(String sourceLabel, String sourceCLabel,
                            Directions dir, String label, int k, double alpha,
                            long degree, boolean merged) {
            HugeGraph graph = this.graph();
            Iterator<Vertex> vertices = this.vertices(sourceLabel, sourceCLabel,
                                                      Query.NO_LIMIT);
            EdgeLabel edgeLabel = label == null ? null : graph.edgeLabel(label);

            KcoreTraverser traverser = new KcoreTraverser(graph);
            JsonMap kcoresJson = new JsonMap();
            kcoresJson.startObject();
            kcoresJson.appendKey("kcores");
            kcoresJson.startList();
            // Only filled in merged mode, written out after the loop
            Set<Set<Id>> kcoreSet = new HashSet<>();
            while (vertices.hasNext()) {
                this.updateProgress(++this.progress);
                Vertex vertex = vertices.next();
                Set<Id> kcore = traverser.kcore(IteratorUtils.of(vertex),
                                                dir, edgeLabel, k, alpha,
                                                degree);
                if (kcore.isEmpty()) {
                    continue;
                }
                if (merged) {
                    mergeKcores(kcoreSet, kcore);
                } else {
                    // Unmerged results are streamed out immediately
                    kcoresJson.appendRaw(JsonUtil.toJson(kcore));
                }
            }
            if (merged) {
                for (Set<Id> kcore : kcoreSet) {
                    kcoresJson.appendRaw(JsonUtil.toJson(kcore));
                }
            }
            kcoresJson.endList();
            kcoresJson.endObject();

            return kcoresJson.asJson();
        }

        /**
         * Add {@code kcore} to {@code kcores}, first absorbing every
         * existing k-core that intersects it.
         *
         * Note that {@code kcore} is modified in place, so it must be
         * a mutable set.
         */
        @SuppressWarnings("unchecked")
        private static void mergeKcores(Set<Set<Id>> kcores, Set<Id> kcore) {
            /*
             * Collect the merging kcores first, because they are going to
             * be removed from all kcores and that can't be done while
             * iterating it.
             * Besides, one new kcore may connect to multiple existing
             * kcores.
             */
            Set<Set<Id>> mergingKcores = new HashSet<>();
            for (Set<Id> existedKcore : kcores) {
                if (CollectionUtil.hasIntersection(existedKcore, kcore)) {
                    mergingKcores.add(existedKcore);
                }
            }
            // No-op when nothing intersects
            for (Set<Id> mergingKcore : mergingKcores) {
                kcores.remove(mergingKcore);
                kcore.addAll(mergingKcore);
            }
            kcores.add(kcore);
        }
    }

    /**
     * Graph-side traverser: reuses the fusiform-similarity traversal and
     * prunes its result into a k-core.
     */
    public static class KcoreTraverser extends FusiformSimilarityTraverser {

        public KcoreTraverser(HugeGraph graph) {
            super(graph);
        }

        /**
         * Run the fusiform-similarity traversal for the given vertices
         * and extract a k-core from the result.
         *
         * @return the ids of the k-core, or an empty immutable set when
         *         the traversal returns nothing; the set returned for a
         *         non-empty traversal result may also be empty if every
         *         similar is pruned
         */
        public Set<Id> kcore(Iterator<Vertex> vertices, Directions direction,
                             EdgeLabel label, int k, double alpha,
                             long degree) {
            int minNeighbors = (int) Math.floor(1 / alpha * k);
            SimilarsMap map = fusiformSimilarity(vertices, direction, label,
                                                 minNeighbors, alpha, k - 1,
                                                 0, null, 1, degree,
                                                 NO_LIMIT, NO_LIMIT, true);
            if (map.isEmpty()) {
                return ImmutableSet.of();
            }
            return extractKcore(map, k);
        }

        /**
         * Prune the similars of the single source in {@code similarsMap}
         * until both rules below hold, then collect the survivors:
         * 1. every intermediary id kept by a similar is kept by at least
         *    {@code k - 1} similars
         * 2. every similar keeps at least {@code k} intermediary ids
         *
         * @return the source id plus the ids and intermediary ids of the
         *         surviving similars, or an empty set if none survives
         */
        @SuppressWarnings("unchecked")
        private static Set<Id> extractKcore(SimilarsMap similarsMap, int k) {
            assert similarsMap.size() == 1;
            Map.Entry<Id, Set<Similar>> entry = similarsMap.entrySet()
                                                           .iterator().next();
            Id source = entry.getKey();
            Set<KcoreSimilar> similars = new HashSet<>();
            for (Similar similar : entry.getValue()) {
                similars.add(new KcoreSimilar(similar));
            }

            boolean stop;
            do {
                stop = true;
                // Do statistics: how many similars keep each intermediary
                Map<Id, MutableInt> counts = new HashMap<>();
                for (KcoreSimilar similar : similars) {
                    for (Id id : similar.ids()) {
                        MutableInt count = counts.get(id);
                        if (count == null) {
                            count = new MutableInt(0);
                            counts.put(id, count);
                        }
                        count.increment();
                    }
                }
                /*
                 * Iterate similars to:
                 * 1. delete failed similar
                 * 2. delete failed intermediaries in survive similar
                 * 3. update statistics
                 */
                Set<KcoreSimilar> failedSimilars = new HashSet<>();
                for (KcoreSimilar similar : similars) {
                    Set<Id> failedIds = new HashSet<>();
                    for (Id id : similar.ids()) {
                        MutableInt count = counts.get(id);
                        if (count.getValue() < k - 1) {
                            count.decrement();
                            failedIds.add(id);
                            stop = false;
                        }
                    }

                    Set<Id> survivedIds = new HashSet<>(similar.ids());
                    survivedIds.removeAll(failedIds);
                    if (survivedIds.size() < k) {
                        for (Id id : survivedIds) {
                            counts.get(id).decrement();
                        }
                        failedSimilars.add(similar);
                        /*
                         * Dropping a similar lowers the counts of its ids,
                         * and similars visited earlier in this pass have
                         * not seen the lowered counts, so another pass is
                         * required to re-check them.
                         */
                        stop = false;
                    } else {
                        similar.ids(survivedIds);
                    }
                }
                similars.removeAll(failedSimilars);
            } while (!stop);

            if (similars.isEmpty()) {
                return ImmutableSet.of();
            }
            Set<Id> kcores = new HashSet<>();
            kcores.add(source);
            for (KcoreSimilar similar : similars) {
                kcores.add(similar.id());
                kcores.addAll(similar.ids());
            }
            return kcores;
        }
    }

    /**
     * A similar whose set of intermediary ids can be shrunk while
     * pruning, without touching the intermediaries held by the parent.
     */
    private static class KcoreSimilar extends
                         FusiformSimilarityTraverser.Similar {

        // Working copy of the intermediaries, created lazily if null
        private Set<Id> ids;

        public KcoreSimilar(Id id, double score, List<Id> intermediaries) {
            super(id, score, intermediaries);
            this.ids = null;
        }

        public KcoreSimilar(FusiformSimilarityTraverser.Similar similar) {
            super(similar.id(), similar.score(), similar.intermediaries());
            this.ids = new HashSet<>(this.intermediaries());
        }

        public Set<Id> ids() {
            if (this.ids == null) {
                this.ids = new HashSet<>(this.intermediaries());
            }
            return this.ids;
        }

        public void ids(Set<Id> ids) {
            this.ids = ids;
        }
    }
}
Loading

0 comments on commit bd3a0be

Please sign in to comment.