Skip to content

Commit

Permalink
support time slot for batch write (#33)
Browse files Browse the repository at this point in the history
* rebase master

* refactor ssl name

* fix NebulaPool's not serializable

* fix typo

* support time solt for batch write
  • Loading branch information
Nicole00 authored Nov 2, 2021
1 parent 7272b2e commit caf8ade
Show file tree
Hide file tree
Showing 11 changed files with 92 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,10 @@

package org.apache.flink.connector.nebula.sink;

import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.net.Session;
import java.util.Map;
import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.statement.VertexExecutionOptions;
import org.apache.flink.connector.nebula.utils.NebulaConstant;
import org.apache.flink.connector.nebula.utils.VidTypeEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class NebulaBatchExecutor<T> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider;
import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.utils.VidTypeEnum;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,6 +50,10 @@ public class NebulaBatchOutputFormat<T> extends RichOutputFormat<T> implements F
private ExecutionOptions executionOptions;
private List<String> errorBuffer = new ArrayList<>();

private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
private transient volatile boolean closed = false;

public NebulaBatchOutputFormat(NebulaGraphConnectionProvider graphProvider,
NebulaMetaConnectionProvider metaProvider) {
this.graphProvider = graphProvider;
Expand Down Expand Up @@ -102,13 +111,28 @@ public void open(int i, int i1) throws IOException {
executionOptions.getLabel());
nebulaBatchExecutor = new NebulaEdgeBatchExecutor(executionOptions, vidType, schema);
}
// start the schedule task: submit the buffer records every batchInterval.
// If batchIntervalMs is 0, do not start the scheduler task.
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatch() != 1) {
this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory(
"nebula-write-output-format"));
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
synchronized (NebulaBatchOutputFormat.this) {
if (!closed) {
commit();
}
} },
executionOptions.getBatchIntervalMs(),
executionOptions.getBatchIntervalMs(),
TimeUnit.MILLISECONDS);
}
}

/**
* write one record to buffer
*/
@Override
public final synchronized void writeRecord(T row) throws IOException {
public final synchronized void writeRecord(T row) {
nebulaBatchExecutor.addToBatch(row);

if (numPendingRow.incrementAndGet() >= executionOptions.getBatch()) {
Expand All @@ -119,7 +143,7 @@ public final synchronized void writeRecord(T row) throws IOException {
/**
* commit batch insert statements
*/
private synchronized void commit() throws IOException {
private synchronized void commit() {
String errorExec = nebulaBatchExecutor.executeBatch(session);
if (errorExec != null) {
errorBuffer.add(errorExec);
Expand All @@ -132,21 +156,28 @@ private synchronized void commit() throws IOException {
* commit the batch write operator before release connection
*/
@Override
public final synchronized void close() throws IOException {
if (numPendingRow.get() > 0) {
commit();
}
if (!errorBuffer.isEmpty()) {
LOG.error("insert error statements: {}", errorBuffer);
}
if (session != null) {
session.release();
}
if (nebulaPool != null) {
nebulaPool.close();
}
if (metaClient != null) {
metaClient.close();
public final synchronized void close() {
if (!closed) {
closed = true;
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
scheduler.shutdown();
}
if (numPendingRow != null && numPendingRow.get() > 0) {
commit();
}
if (!errorBuffer.isEmpty()) {
LOG.error("insert error statements: {}", errorBuffer);
}
if (session != null) {
session.release();
}
if (nebulaPool != null) {
nebulaPool.close();
}
if (metaClient != null) {
metaClient.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ void addToBatch(T record) {

@Override
String executeBatch(Session session) {
if (nebulaEdgeList.size() == 0) {
return null;
}
NebulaEdges nebulaEdges = new NebulaEdges(executionOptions.getLabel(),
executionOptions.getFields(), nebulaEdgeList, executionOptions.getPolicy(),
executionOptions.getPolicy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.List;
import java.util.Map;
import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions;
import org.apache.flink.connector.nebula.utils.NebulaConstant;
import org.apache.flink.connector.nebula.utils.NebulaEdge;
import org.apache.flink.connector.nebula.utils.NebulaUtils;
import org.apache.flink.connector.nebula.utils.PolicyEnum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.List;
import java.util.Map;
import org.apache.flink.connector.nebula.statement.VertexExecutionOptions;
import org.apache.flink.connector.nebula.utils.NebulaConstant;
import org.apache.flink.connector.nebula.utils.NebulaUtils;
import org.apache.flink.connector.nebula.utils.NebulaVertex;
import org.apache.flink.connector.nebula.utils.PolicyEnum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ public void open(Configuration parameters) throws Exception {
}

@Override
public void close() throws Exception {
public void close() {
outPutFormat.close();
}

@Override
public void invoke(T value, Context context) throws Exception {
public void invoke(T value, Context context) {
checkErrorAndRethrow();
outPutFormat.writeRecord(value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ void addToBatch(T record) {

@Override
String executeBatch(Session session) {
if (nebulaVertexList.size() == 0) {
return null;
}
NebulaVertices nebulaVertices = new NebulaVertices(executionOptions.getLabel(),
executionOptions.getFields(), nebulaVertexList, executionOptions.getPolicy());
// generate the write ngql statement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.apache.flink.connector.nebula.statement;

import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_BATCH_INTERVAL_MS;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_ROW_INFO_INDEX;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_SCAN_LIMIT;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_WRITE_BATCH;
Expand Down Expand Up @@ -41,10 +42,10 @@ public class EdgeExecutionOptions extends ExecutionOptions {
private EdgeExecutionOptions(String graphSpace, String executeStatement, List<String> fields,
List<Integer> positions, boolean noColumn, int limit,
long startTime, long endTime, long batch, PolicyEnum policy,
WriteModeEnum mode,
String edge, int srcIndex, int dstIndex, int rankIndex) {
WriteModeEnum mode, String edge, int srcIndex, int dstIndex,
int rankIndex, long batchIntervalMs) {
super(graphSpace, executeStatement, fields, positions, noColumn, limit, startTime,
endTime, batch, policy, mode);
endTime, batch, policy, mode, batchIntervalMs);
this.edge = edge;
this.srcIndex = srcIndex;
this.dstIndex = dstIndex;
Expand Down Expand Up @@ -88,6 +89,7 @@ public static class ExecutionOptionBuilder {
private long startTime = 0;
private long endTime = Long.MAX_VALUE;
private int batch = DEFAULT_WRITE_BATCH;
private long batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
private PolicyEnum policy = null;
private WriteModeEnum mode = WriteModeEnum.INSERT;
private int srcIndex = DEFAULT_ROW_INFO_INDEX;
Expand Down Expand Up @@ -171,6 +173,11 @@ public ExecutionOptionBuilder setWriteMode(WriteModeEnum mode) {
return this;
}

public ExecutionOptionBuilder setBathIntervalMs(long batchIntervalMs) {
this.batchIntervalMs = batchIntervalMs;
return this;
}

public ExecutionOptions builder() {
if (graphSpace == null || graphSpace.trim().isEmpty()) {
throw new IllegalArgumentException("graph space can not be empty.");
Expand All @@ -180,8 +187,7 @@ public ExecutionOptions builder() {
}
return new EdgeExecutionOptions(graphSpace, executeStatement, fields, positions,
noColumn, limit, startTime, endTime, batch, policy, mode, edge, srcIndex,
dstIndex,
rankIndex);
dstIndex, rankIndex, batchIntervalMs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ public abstract class ExecutionOptions implements Serializable {
*/
private WriteModeEnum writeMode;

/**
* interval between write submit
*/
private long batchIntervalMs;


protected ExecutionOptions(String graphSpace,
String executeStatement,
Expand All @@ -143,7 +148,8 @@ protected ExecutionOptions(String graphSpace,
long endTime,
long batch,
PolicyEnum policy,
WriteModeEnum writeMode) {
WriteModeEnum writeMode,
long batchIntervalMs) {
this.graphSpace = graphSpace;

this.executeStatement = executeStatement;
Expand All @@ -156,6 +162,7 @@ protected ExecutionOptions(String graphSpace,
this.batch = batch;
this.policy = policy;
this.writeMode = writeMode;
this.batchIntervalMs = batchIntervalMs;
}

public String getGraphSpace() {
Expand Down Expand Up @@ -206,6 +213,10 @@ public WriteModeEnum getWriteMode() {
return writeMode;
}

public long getBatchIntervalMs() {
return batchIntervalMs;
}

@Override
public String toString() {
return "ExecutionOptions{"
Expand All @@ -220,6 +231,7 @@ public String toString() {
+ ", batch=" + batch
+ ", policy=" + policy
+ ", mode=" + writeMode
+ ", batchIntervalMs=" + batchIntervalMs
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.apache.flink.connector.nebula.statement;

import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_BATCH_INTERVAL_MS;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_ROW_INFO_INDEX;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_SCAN_LIMIT;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_WRITE_BATCH;
Expand Down Expand Up @@ -39,9 +40,10 @@ public VertexExecutionOptions(String graphSpace,
PolicyEnum policy,
WriteModeEnum mode,
String tag,
int idIndex) {
int idIndex,
long batchIntervalMs) {
super(graphSpace, executeStatement, fields, positions, noColumn, limit, startTime,
endTime, batch, policy, mode);
endTime, batch, policy, mode, batchIntervalMs);
this.tag = tag;
this.idIndex = idIndex;
}
Expand Down Expand Up @@ -71,6 +73,7 @@ public static class ExecutionOptionBuilder {
private long startTime = 0;
private long endTime = Long.MAX_VALUE;
private int batch = DEFAULT_WRITE_BATCH;
private long batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
private PolicyEnum policy = null;
private WriteModeEnum mode = WriteModeEnum.INSERT;
private int idIndex = DEFAULT_ROW_INFO_INDEX;
Expand Down Expand Up @@ -146,6 +149,11 @@ public ExecutionOptionBuilder setWriteMode(WriteModeEnum mode) {
return this;
}

public ExecutionOptionBuilder setBathIntervalMs(long batchIntervalMs) {
this.batchIntervalMs = batchIntervalMs;
return this;
}

public ExecutionOptions builder() {
if (graphSpace == null || graphSpace.trim().isEmpty()) {
throw new IllegalArgumentException("graph space can not be empty.");
Expand All @@ -155,7 +163,7 @@ public ExecutionOptions builder() {
}
return new VertexExecutionOptions(graphSpace, executeStatement, fields,
positions, noColumn, limit, startTime, endTime, batch, policy, mode, tag,
idIndex);
idIndex, batchIntervalMs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class NebulaConstant {
// default value for read & write
public static final int DEFAULT_SCAN_LIMIT = 2000;
public static final int DEFAULT_WRITE_BATCH = 2000;
public static final long DEFAULT_BATCH_INTERVAL_MS = 0;
public static final int DEFAULT_ROW_INFO_INDEX = -1;

// default value for connection
Expand Down

0 comments on commit caf8ade

Please sign in to comment.