New algorithms implemented in Giraph #88

Open · wants to merge 5 commits into master
7 changes: 3 additions & 4 deletions dga-giraph/build.gradle
@@ -22,12 +22,12 @@ configurations {

dependencies {
compile project(':dga-core')
-compile('org.apache.giraph:giraph-core:1.1.0-hadoop2') {
+compile('org.apache.giraph:giraph-core:1.2.0-SNAPSHOT') {
exclude module: 'guava'
exclude module: 'zookeeper'
}
hadoopProvided group: 'org.apache.hadoop', name: 'hadoop-client', version: cdh_version
-testCompile 'org.apache.giraph:giraph-core:1.1.0-hadoop2'
+testCompile 'org.apache.giraph:giraph-core:1.2.0'
testCompile "org.mockito:mockito-core:1.9.5"
testCompile 'junit:junit:4.11'
testCompile 'commons-httpclient:commons-httpclient:3.0.1'
@@ -50,7 +50,6 @@ task distConf(dependsOn: 'assemble', type: Copy) {

task dist(dependsOn: 'distJars', type: Copy) {
from "src/main/resources/"
include "dga-mr1-giraph"
include "dga-yarn-giraph"
include "dga-*-giraph"
into "${buildDir}/dist/bin"
}
2 changes: 1 addition & 1 deletion dga-giraph/gradle.properties
@@ -1,2 +1,2 @@
-cdh5version=2.6.0-cdh5.4.0
+cdh5version=2.6.0-cdh5.4.2
cdh4version=2.0.0-mr1-cdh4.7.0
102 changes: 102 additions & 0 deletions dga-giraph/src/main/java/com/soteradefense/dga/BfsTree/BfsTreeComputation.java
@@ -0,0 +1,102 @@
package com.soteradefense.dga.BfsTree;

import com.soteradefense.dga.DGALoggingUtil;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerGlobalCommUsage;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Iterator;

/**
*
* Starting from a provided search key (vertex id) we build a BFS tree over the graph. Each vertex's
* value ends up holding the id of its parent in the BFS tree rooted at the search key; vertices that
* are never reached keep the value -1.
*
* All vertex ids must be > 0 for valid results: default vertex values are initialized to 0, and
* vertices with no outgoing edges are not loaded in superstep 0, so their default vertex (parent id)
* value cannot reliably be set to -1 up front. Instead we check for parent values < 1 and explicitly
* reset them to -1 at each superstep.
*
* Input: directed edge list of integer vertex ids in csv format.
*
* Created by ekimbrel on 9/24/15.
*/
public class BfsTreeComputation extends BasicComputation<IntWritable, IntWritable, NullWritable, IntWritable> {

private static final Logger logger = LoggerFactory.getLogger(BfsTreeComputation.class);

@Override
public void initialize(GraphState graphState, WorkerClientRequestProcessor<IntWritable, IntWritable, NullWritable> workerClientRequestProcessor, GraphTaskManager<IntWritable, IntWritable, NullWritable> graphTaskManager, WorkerGlobalCommUsage workerGlobalCommUsage, WorkerContext workerContext) {
super.initialize(graphState, workerClientRequestProcessor, graphTaskManager, workerGlobalCommUsage, workerContext);
DGALoggingUtil.setDGALogLevel(this.getConf());
}

@Override
public void compute(Vertex<IntWritable, IntWritable, NullWritable> vertex, Iterable<IntWritable> messages) throws IOException {

long step = getSuperstep();
int thisVertexId = vertex.getId().get();
if (thisVertexId < 1){
throw new IllegalStateException("Invalid vertex id: all ids must be > 0 for bfs-tree-computation");
}


// on superstep 0, send the initial messages from the root vertex to its adjacent vertices.
if (0 == step){

// default all parent values to -1
vertex.setValue(new IntWritable(-1));

// get the search key from the global aggregator (set by BfsTreeMasterCompute)
int searchKey = ( (IntWritable) getAggregatedValue(BfsTreeMasterCompute.SEARCH_KEY_AGG)).get();

// if the search key matches this vertex, set its parent to itself
// and send thisVertexId to all adjacent vertices.
if (searchKey == thisVertexId){
vertex.setValue(new IntWritable(thisVertexId));
this.sendMessageToAllEdges(vertex,vertex.getId());
}

}

/*
on each superstep after superstep 0:
if this vertex receives any messages:
if this vertex's parent is -1:
set its parent to the largest received message value
send this vertex's id to all adjacent vertices
*/

else {

// 0 can appear as a default value for vertices that aren't loaded until later in the computation because they have no out edges;
// as a result we must check for and replace any 0s with -1, and all graph vertices must have an id >= 1
if (vertex.getValue().get() < 1){
vertex.setValue(new IntWritable(-1));
}

int thisParentId = vertex.getValue().get();
if (-1 == thisParentId){
// to ensure consistent results across multiple runs, take the max message value
int maxValue = -1;
for (IntWritable message: messages) maxValue = Math.max(maxValue,message.get());

if (maxValue > -1){
vertex.setValue(new IntWritable(maxValue));
sendMessageToAllEdges(vertex,vertex.getId());
}
}
}

// all vertices vote to halt after every superstep.
vertex.voteToHalt();

}
}
47 changes: 47 additions & 0 deletions dga-giraph/src/main/java/com/soteradefense/dga/BfsTree/BfsTreeMasterCompute.java
@@ -0,0 +1,47 @@
package com.soteradefense.dga.BfsTree;

import com.soteradefense.dga.DGALoggingUtil;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.hadoop.io.IntWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.giraph.aggregators.IntOverwriteAggregator;

/**
* Created by ekimbrel on 9/24/15.
*
* For the algorithm description see BfsTreeComputation.
*
* The job of the master compute class in the BFS tree computation is to set the initial search key
* in a global aggregator before superstep 0.
*/
public class BfsTreeMasterCompute extends DefaultMasterCompute {


private static final Logger logger = LoggerFactory.getLogger(BfsTreeMasterCompute.class);
public static final String SEARCH_KEY_AGG = "com.soteradefense.dga.BfsTree.searchKeyAggregator";
public static final String SEARCH_KEY_CONF = "dga.bfstree.searchkey";
private int searchKey;


@Override
public void initialize() throws InstantiationException, IllegalAccessException {
DGALoggingUtil.setDGALogLevel(this.getConf());
registerAggregator(SEARCH_KEY_AGG,IntOverwriteAggregator.class);
searchKey = Integer.parseInt(getConf().get(SEARCH_KEY_CONF,"-1"));
if (searchKey == -1){
throw new IllegalArgumentException("Search key value must be set to a positive integer. Set -ca "+SEARCH_KEY_CONF+"=<value> when running BfsTree");
}
}

@Override
public void compute() {
if (0 == this.getSuperstep()){
setAggregatedValue(SEARCH_KEY_AGG,new IntWritable(searchKey));
}
}


}
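The two classes above are enough to try the analytic locally. Below is a minimal local-mode sketch, not part of this pull request, that runs the BFS tree job on a toy graph with Giraph's InternalVertexRunner test utility; the runner's (conf, vertexInputData, edgeInputData) signature, the use of the stock IdWithValueTextOutputFormat, and the class name are assumptions, so treat the expected values in the comments as illustrative only.

package com.soteradefense.dga.BfsTree;

import com.soteradefense.dga.combiners.IntIntMaxMessageCombiner;
import com.soteradefense.dga.io.formats.DirectedIntCsvEdgeInputFormat;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
import org.apache.giraph.utils.InternalVertexRunner;

public class BfsTreeLocalRunSketch {
    public static void main(String[] args) throws Exception {
        GiraphConfiguration conf = new GiraphConfiguration();
        conf.setComputationClass(BfsTreeComputation.class);
        conf.setMasterComputeClass(BfsTreeMasterCompute.class);
        conf.setMessageCombinerClass(IntIntMaxMessageCombiner.class);
        conf.setEdgeInputFormatClass(DirectedIntCsvEdgeInputFormat.class);
        conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
        conf.set(BfsTreeMasterCompute.SEARCH_KEY_CONF, "1");

        // Toy graph rooted at vertex 1: 1 -> 2, 1 -> 3, 2 -> 4.
        String[] edges = {"1,2", "1,3", "2,4"};

        // Expected parent values: 1 -> 1, 2 -> 1, 3 -> 1, 4 -> 2.
        for (String line : InternalVertexRunner.run(conf, null, edges)) {
            System.out.println(line);
        }
    }
}

In the real job the same computation, master compute, combiner, and the DGA input/output formats are wired together by DGARunner from the command line (see the bfstree branch further down).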
@@ -43,7 +43,9 @@ public static void printUsageAndExit(Options options, int exitCode) {
"\t\thbse - High Betweenness Set Extraction\n" +
"\t\twcc - Weakly Connected Components\n" +
"\t\tlc - Leaf Compression\n" +
-"\t\tpr - Page Rank\n";
+"\t\tpr - Page Rank\n" +
+"\t\ttricount - Triangle Counting\n" +
+"\t\tbfstree - BFS Tree\n";
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp(commandLine, options);
System.exit(exitCode);
89 changes: 89 additions & 0 deletions dga-giraph/src/main/java/com/soteradefense/dga/DGARunner.java
@@ -19,13 +19,21 @@
package com.soteradefense.dga;


import com.soteradefense.dga.BfsTree.BfsTreeComputation;
import com.soteradefense.dga.BfsTree.BfsTreeMasterCompute;
import com.soteradefense.dga.combiners.IntIntMaxMessageCombiner;
import com.soteradefense.dga.hbse.HBSEComputation;
import com.soteradefense.dga.hbse.HBSEConfigurationConstants;
import com.soteradefense.dga.hbse.HBSEMasterCompute;
import com.soteradefense.dga.io.formats.*;
import com.soteradefense.dga.lc.LeafCompressionComputation;
import com.soteradefense.dga.pr.NormalizedPageRankComputation;
import com.soteradefense.dga.pr.PageRankComputation;
import com.soteradefense.dga.pr.PageRankMasterCompute;
import com.soteradefense.dga.scan1.Scan1Computation;
import com.soteradefense.dga.triangles.TriangleCountCombiner;
import com.soteradefense.dga.triangles.TriangleCountComputation;
import com.soteradefense.dga.triangles.TriangleCountMasterCompute;
import com.soteradefense.dga.wcc.WeaklyConnectedComponentComputation;
import org.apache.commons.cli.Options;
import org.apache.giraph.GiraphRunner;
@@ -49,6 +57,10 @@ public class DGARunner {
supportedAnalytics.add("hbse");
supportedAnalytics.add("lc");
supportedAnalytics.add("pr");
supportedAnalytics.add("pr-norm");
supportedAnalytics.add("scan1");
supportedAnalytics.add("tricount");
supportedAnalytics.add("bfstree");
}

public void run(String[] args) throws Exception {
@@ -162,8 +174,85 @@ public void run(String[] args) throws Exception {

String[] giraphArgs = finalConf.convertToCommandLineArguments(PageRankComputation.class.getCanonicalName());
System.exit(ToolRunner.run(new GiraphRunner(), giraphArgs));
} else if (analytic.equals("pr-norm")) {

logger.info("Analytic: PageRank-Normalized");
DGAConfiguration requiredConf = new DGAConfiguration();
requiredConf.setDGAGiraphProperty("-eif", DGATextEdgeValueInputFormat.class.getCanonicalName());
requiredConf.setDGAGiraphProperty("-vof", PageRankVertexOutputFormat.class.getCanonicalName());
requiredConf.setDGAGiraphProperty("-eip", inputPath);
requiredConf.setDGAGiraphProperty("-mc", PageRankMasterCompute.class.getCanonicalName());
requiredConf.setDGAGiraphProperty("-op", outputPath);
requiredConf.setDGAGiraphProperty("-vsd", outputPath);
requiredConf.setCustomProperty(DGAEdgeTDTOutputFormat.WRITE_VERTEX_VALUE, "true");
DGAConfiguration finalConf = DGAConfiguration.coalesce(fileConf, commandLineConf, requiredConf);

finalConf.setLibDir(libDir);

String[] giraphArgs = finalConf.convertToCommandLineArguments(NormalizedPageRankComputation.class.getCanonicalName());
System.exit(ToolRunner.run(new GiraphRunner(), giraphArgs));

} else if (analytic.equals("scan1")) {
logger.info("Analytic: Scan1");
DGAConfiguration requiredConf = new DGAConfiguration();
requiredConf.setDGAGiraphProperty("-eif", UndirectedIntCsvEdgeInputFormat.class.getCanonicalName());
requiredConf.setDGAGiraphProperty("-vof", IntVertexOutputFormat.class.getCanonicalName());
requiredConf.setDGAGiraphProperty("-eip", inputPath);
requiredConf.setDGAGiraphProperty("-op", outputPath);
requiredConf.setDGAGiraphProperty("-vsd", outputPath);
requiredConf.setCustomProperty("io.edge.reverse.duplicator", "true");
DGAConfiguration minimalDefaults = new DGAConfiguration();
minimalDefaults.setCustomProperty(DGAEdgeTTTOutputFormat.WRITE_VERTEX_VALUE, "true");
DGAConfiguration finalConf = DGAConfiguration.coalesce(minimalDefaults, fileConf, commandLineConf, requiredConf);

finalConf.setLibDir(libDir);

String[] giraphArgs = finalConf.convertToCommandLineArguments(Scan1Computation.class.getCanonicalName());
System.exit(ToolRunner.run(new GiraphRunner(), giraphArgs));

} else if (analytic.equals("tricount")) {
logger.info("Analytic: Triangle Counting");
DGAConfiguration requiredConf = new DGAConfiguration();
requiredConf.setDGAGiraphProperty("-eif", UndirectedIntCsvEdgeInputFormat.class.getCanonicalName());
requiredConf.setDGAGiraphProperty("-vof", IntVertexOutputFormat.class.getCanonicalName());
requiredConf.setDGAGiraphProperty("-mc", TriangleCountMasterCompute.class.getCanonicalName());
requiredConf.setDGAGiraphProperty("-eip", inputPath);
requiredConf.setDGAGiraphProperty("-op", outputPath);
requiredConf.setDGAGiraphProperty("-vsd", outputPath);
//requiredConf.setCustomProperty("giraph.messageCombinerClass", TriangleCountCombiner.class.getCanonicalName());
DGAConfiguration minimalDefaults = new DGAConfiguration();
minimalDefaults.setCustomProperty(DGAEdgeTTTOutputFormat.WRITE_VERTEX_VALUE, "true");
DGAConfiguration finalConf = DGAConfiguration.coalesce(minimalDefaults, fileConf, commandLineConf, requiredConf);

finalConf.setLibDir(libDir);

String[] giraphArgs = finalConf.convertToCommandLineArguments(TriangleCountComputation.class.getCanonicalName());
System.exit(ToolRunner.run(new GiraphRunner(), giraphArgs));

} else if (analytic.equals("bfstree")) {
logger.info("Analytic: BFS Tree");
DGAConfiguration requiredConf = new DGAConfiguration();
requiredConf.setDGAGiraphProperty("-eif", DirectedIntCsvEdgeInputFormat.class.getCanonicalName());
requiredConf.setDGAGiraphProperty("-vof", IntVertexOutputFormat.class.getCanonicalName());
requiredConf.setDGAGiraphProperty("-mc", BfsTreeMasterCompute.class.getCanonicalName());
requiredConf.setDGAGiraphProperty("-eip", inputPath);
requiredConf.setDGAGiraphProperty("-op", outputPath);
requiredConf.setDGAGiraphProperty("-vsd", outputPath);
requiredConf.setCustomProperty("giraph.messageCombinerClass", IntIntMaxMessageCombiner.class.getCanonicalName());
DGAConfiguration minimalDefaults = new DGAConfiguration();
minimalDefaults.setCustomProperty(DGAEdgeTTTOutputFormat.WRITE_VERTEX_VALUE, "true");
DGAConfiguration finalConf = DGAConfiguration.coalesce(minimalDefaults, fileConf, commandLineConf, requiredConf);

finalConf.setLibDir(libDir);

String[] giraphArgs = finalConf.convertToCommandLineArguments(BfsTreeComputation.class.getCanonicalName());
System.exit(ToolRunner.run(new GiraphRunner(), giraphArgs));
}





} catch (Exception e) {
logger.error("Unable to run analytic; ", e);
}
28 changes: 28 additions & 0 deletions dga-giraph/src/main/java/com/soteradefense/dga/combiners/IntIntMaxMessageCombiner.java
@@ -0,0 +1,28 @@
package com.soteradefense.dga.combiners;


import org.apache.hadoop.io.IntWritable;
import org.apache.giraph.combiner.MessageCombiner;

/**
* For IntWritable vertex ids sending IntWritable messages, where only a single message per vertex
* is desired: keep the message with the highest value and discard the others.
*
* Created by ekimbrel on 9/24/15.
*/
public class IntIntMaxMessageCombiner implements MessageCombiner<IntWritable,IntWritable> {

public void combine(IntWritable vertexId, IntWritable originalMessage, IntWritable messageToCombine) {
int original = originalMessage.get();
int other = messageToCombine.get();
if (other > original){
originalMessage.set(other);
}
}


public IntWritable createInitialMessage() {
return new IntWritable(Integer.MIN_VALUE);
}
}
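As a quick sanity check on the combiner's semantics, here is a hypothetical standalone demo, not part of this pull request, showing that only the largest message value survives combining; the vertex id argument is ignored by this combiner.

package com.soteradefense.dga.combiners;

import org.apache.hadoop.io.IntWritable;

public class IntIntMaxMessageCombinerDemo {
    public static void main(String[] args) {
        IntIntMaxMessageCombiner combiner = new IntIntMaxMessageCombiner();
        IntWritable vertexId = new IntWritable(7);                  // ignored by this combiner
        IntWritable accumulated = combiner.createInitialMessage();  // starts at Integer.MIN_VALUE

        // Combine two messages destined for the same vertex; the larger value wins each time.
        combiner.combine(vertexId, accumulated, new IntWritable(3));
        combiner.combine(vertexId, accumulated, new IntWritable(5));

        System.out.println(accumulated.get()); // prints 5
    }
}

DGARunner registers this combiner for the bfstree job through the giraph.messageCombinerClass property, which matches BfsTreeComputation only ever keeping the maximum message value it receives.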
52 changes: 52 additions & 0 deletions dga-giraph/src/main/java/com/soteradefense/dga/io/formats/DirectedIntCsvEdgeInputFormat.java
@@ -0,0 +1,52 @@
package com.soteradefense.dga.io.formats;

import com.soteradefense.dga.DGALoggingUtil;
import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.io.ReverseEdgeDuplicator;
import org.apache.giraph.io.formats.TextEdgeInputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.giraph.utils.IntPair;

import java.io.IOException;

/**
* Reads a simple directed edge list in csv format, "VertexId,VertexId", with no edge values.
*/
public class DirectedIntCsvEdgeInputFormat extends TextEdgeInputFormat<IntWritable,NullWritable>{

public EdgeReader<IntWritable, NullWritable> createEdgeReader(InputSplit split, TaskAttemptContext context) throws IOException {
return new DirectedIntCsvEdgeReader();
}

public class DirectedIntCsvEdgeReader extends TextEdgeReaderFromEachLineProcessed<IntPair> {
@Override
protected IntPair preprocessLine(Text line) throws IOException {
String[] tokens = line.toString().split(",");
return new IntPair(Integer.parseInt(tokens[0]),Integer.parseInt(tokens[1]));
}

@Override
protected IntWritable getSourceVertexId(IntPair endpoints)
throws IOException {
return new IntWritable(endpoints.getFirst());
}

@Override
protected IntWritable getTargetVertexId(IntPair endpoints)
throws IOException {
return new IntWritable(endpoints.getSecond());
}

@Override
protected NullWritable getValue(IntPair endpoints) throws IOException {
return NullWritable.get();
}


}

}
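The directed reader above imports ReverseEdgeDuplicator without using it, and the scan1 and tricount branches in DGARunner reference an UndirectedIntCsvEdgeInputFormat that is not included in this diff. A minimal sketch of how such an undirected variant could wrap the directed reader follows; the class name and approach are assumptions, and the class actually used by the repository may be implemented differently.

package com.soteradefense.dga.io.formats;

import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.io.ReverseEdgeDuplicator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

// Hypothetical sketch: emit each "a,b" line as both a->b and b->a so the graph is treated as undirected.
public class UndirectedIntCsvEdgeInputFormatSketch extends DirectedIntCsvEdgeInputFormat {

    @Override
    public EdgeReader<IntWritable, NullWritable> createEdgeReader(InputSplit split, TaskAttemptContext context) throws IOException {
        return new ReverseEdgeDuplicator<IntWritable, NullWritable>(super.createEdgeReader(split, context));
    }
}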