Skip to content

Commit

Permalink
Add output
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Aug 21, 2024
1 parent 8ab455e commit d8b0dbc
Show file tree
Hide file tree
Showing 23 changed files with 1,450 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ build/
*.pyc

# maven ignore
output/

apache-hugegraph-*-incubating-*/
*.war
*.zip
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.computer.core.output;

import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.worker.Computation;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

public abstract class AbstractComputerOutput implements ComputerOutput {

private static final Logger LOG = Log.logger(ComputerOutput.class);

private String name;
private int partition;

@Override
public void init(Config config, int partition) {
Computation<?> computation = config.createObject(
ComputerOptions.WORKER_COMPUTATION_CLASS);
this.name = computation.name();
this.partition = partition;

LOG.info("Start write back partition {} for {}",
this.partition(), this.name());
}

@Override
public String name() {
return this.name;
}

public int partition() {
return this.partition;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.computer.core.output;

import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.vertex.Vertex;
import org.apache.hugegraph.computer.core.worker.Computation;

/**
* Computer output is used to output computer results. There is an output object
* for every partition.
*/
public interface ComputerOutput {

/**
* Initialize the output. Create connection to target output system.
*/
void init(Config config, int partition);

/**
* For each vertex in partition, this method is called regardless
* vertex's status.
*/
void write(Vertex vertex);

/**
* Write filter.
* True to commit the computation result, otherwise not to commit.
*/
default boolean filter(Config config, Computation computation, Vertex vertex) {
return true;
}

/**
* Merge output files of multiple partitions if applicable.
*/
default void mergePartitions(Config config) {
// pass
}

/**
* Close the connection to target output system. Commit if target output
* required.
*/
void close();

/**
* The name of output property.
*/
String name();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.computer.core.output.hg;

import java.util.ArrayList;
import java.util.List;

import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.vertex.Vertex;
import org.apache.hugegraph.computer.core.output.AbstractComputerOutput;
import org.apache.hugegraph.computer.core.output.hg.task.TaskManager;
import org.apache.hugegraph.driver.HugeClient;
import org.apache.hugegraph.structure.constant.WriteType;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

public abstract class HugeGraphOutput<T> extends AbstractComputerOutput {

private static final Logger LOG = Log.logger(HugeGraphOutput.class);

private TaskManager taskManager;
private List<org.apache.hugegraph.structure.graph.Vertex> localVertices;
private int batchSize;
private WriteType writeType;

@Override
public void init(Config config, int partition) {
super.init(config, partition);

this.taskManager = new TaskManager(config);
this.batchSize = config.get(ComputerOptions.OUTPUT_BATCH_SIZE);
this.localVertices = new ArrayList<>(this.batchSize);
this.writeType = WriteType.valueOf(
config.get(ComputerOptions.OUTPUT_RESULT_WRITE_TYPE));

this.prepareSchema();
}

public HugeClient client() {
return this.taskManager.client();
}

@Override
public void write(Vertex vertex) {
this.localVertices.add(this.constructHugeVertex(vertex));
if (this.localVertices.size() >= this.batchSize) {
this.commit();
}
}

@Override
public void close() {
if (!this.localVertices.isEmpty()) {
this.commit();
}
this.taskManager.waitFinished();
this.taskManager.shutdown();
LOG.info("End write back partition {}", this.partition());
}

protected void commit() {
this.taskManager.submitBatch(this.localVertices);
LOG.info("Write back {} vertices", this.localVertices.size());

this.localVertices = new ArrayList<>(this.batchSize);
}

protected org.apache.hugegraph.structure.graph.Vertex constructHugeVertex(
Vertex vertex) {
org.apache.hugegraph.structure.graph.Vertex hugeVertex =
new org.apache.hugegraph.structure.graph.Vertex(null);
hugeVertex.id(vertex.id().asObject());
hugeVertex.property(this.name(), this.value(vertex));
return hugeVertex;
}

protected T value(Vertex vertex) {
@SuppressWarnings("unchecked")
T value = (T) vertex.value().value();
return value;
}

protected WriteType writeType() {
return this.writeType;
}

protected abstract void prepareSchema();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.computer.core.output.hg.exceptions;

public class WriteBackException extends RuntimeException {

private static final long serialVersionUID = 5504623124963497613L;

public WriteBackException(String message) {
super(message);
}

public WriteBackException(String message, Throwable cause) {
super(message, cause);
}

public WriteBackException(String message, Object... args) {
super(String.format(message, args));
}

public WriteBackException(String message, Throwable cause, Object... args) {
super(String.format(message, args), cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.computer.core.output.hg.metrics;

import java.util.concurrent.atomic.LongAdder;

public final class LoadMetrics {

private final LongAdder insertSuccess;
private final LongAdder insertFailure;

public LoadMetrics() {
this.insertSuccess = new LongAdder();
this.insertFailure = new LongAdder();
}

public long insertSuccess() {
return this.insertSuccess.longValue();
}

public void plusInsertSuccess(long count) {
this.insertSuccess.add(count);
}

public long insertFailure() {
return this.insertFailure.longValue();
}

public void increaseInsertFailure() {
this.insertFailure.increment();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.computer.core.output.hg.metrics;

public final class LoadReport {

private long vertexInsertSuccess;
private long vertexInsertFailure;

public long vertexInsertSuccess() {
return this.vertexInsertSuccess;
}

public long vertexInsertFailure() {
return this.vertexInsertFailure;
}

public static LoadReport collect(LoadSummary summary) {
LoadReport report = new LoadReport();
LoadMetrics metrics = summary.metrics();
report.vertexInsertSuccess += metrics.insertSuccess();
report.vertexInsertFailure += metrics.insertFailure();
return report;
}
}
Loading

0 comments on commit d8b0dbc

Please sign in to comment.