Skip to content

Commit

Permalink
Support all insert/update/delete operations for dynamic table sink (#81)
Browse files Browse the repository at this point in the history
* Support configuring batching options

* Support insert/update/delete for dynamic table sink

* Add tests

* Fix time zone issue

* Fix 'UPDATE_BEFORE' issue

* Address PR comments
  • Loading branch information
linhr authored Feb 15, 2023
1 parent 29a0db2 commit 91e8132
Show file tree
Hide file tree
Showing 36 changed files with 1,396 additions and 946 deletions.
13 changes: 6 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,18 @@ NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOp
NebulaGraphConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions);
NebulaMetaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);
ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
VertexExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
.setGraphSpace("flinkSink")
.setTag("player")
.setIdIndex(0)
.setFields(Arrays.asList("name", "age"))
.setPositions(Arrays.asList(1, 2))
.setBatch(2)
.builder();
.setBatchSize(2)
.build();
NebulaBatchOutputFormat outPutFormat =
new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
.setExecutionOptions(executionOptions);
NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
NebulaVertexBatchOutputFormat outputFormat = new NebulaVertexBatchOutputFormat(
graphConnectionProvider, metaConnectionProvider, executionOptions);
NebulaSinkFunction<Row> nebulaSinkFunction = new NebulaSinkFunction<>(outputFormat);
DataStream<Row> dataStream = playerSource.map(row -> {
Row record = new org.apache.flink.types.Row(row.size());
for (int i = 0; i < row.size(); i++) {
Expand Down
7 changes: 7 additions & 0 deletions connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public VidTypeEnum getVidType(MetaClient metaClient, String space) {
spaceItem = metaClient.getSpace(space);
} catch (TException | ExecuteFailedException e) {
LOG.error("get space info error, ", e);
return null;
throw new RuntimeException(e);
}
PropertyType vidType = spaceItem.getProperties().getVid_type().getType();
if (vidType == PropertyType.FIXED_STRING) {
Expand All @@ -105,7 +105,7 @@ public Map<String, Integer> getTagSchema(MetaClient metaClient, String space, St
tagSchema = metaClient.getTag(space, tag);
} catch (TException | ExecuteFailedException e) {
LOG.error("get tag schema error, ", e);
return schema;
throw new RuntimeException(e);
}
List<ColumnDef> columnDefs = tagSchema.getColumns();
for (ColumnDef col : columnDefs) {
Expand All @@ -128,7 +128,7 @@ public Map<String, Integer> getEdgeSchema(MetaClient metaClient, String space, S
edgeSchema = metaClient.getEdge(space, edge);
} catch (TException | ExecuteFailedException e) {
LOG.error("get edge schema error, ", e);
return schema;
throw new RuntimeException(e);
}
List<ColumnDef> columnDefs = edgeSchema.getColumns();
for (ColumnDef col : columnDefs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,20 @@
package org.apache.flink.connector.nebula.sink;

import com.vesoft.nebula.client.graph.net.Session;
import java.util.Map;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.utils.VidTypeEnum;

public abstract class NebulaBatchExecutor<T> {

protected final ExecutionOptions executionOptions;
protected final Map<String, Integer> schema;
protected final VidTypeEnum vidType;

public NebulaBatchExecutor(ExecutionOptions executionOptions,
VidTypeEnum vidType, Map<String, Integer> schema) {
this.executionOptions = executionOptions;
this.vidType = vidType;
this.schema = schema;
}
public interface NebulaBatchExecutor<T> {

/**
* put record into buffer
*
* @param record the record representing a vertex or an edge
*/
abstract void addToBatch(T record);
void addToBatch(T record);

/**
* execute the insert statement
* execute the statement
*
* @param session graph session
*/
abstract String executeBatch(Session session);
String executeBatch(Session session);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand All @@ -30,33 +29,35 @@
import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider;
import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.utils.VidTypeEnum;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NebulaBatchOutputFormat<T> extends RichOutputFormat<T> implements Flushable {
public abstract class NebulaBatchOutputFormat<T, OptionsT extends ExecutionOptions>
extends RichOutputFormat<T> implements Flushable {
private static final Logger LOG = LoggerFactory.getLogger(NebulaBatchOutputFormat.class);
private static final long serialVersionUID = 8846672119763512586L;
protected MetaClient metaClient;
protected NebulaBatchExecutor nebulaBatchExecutor;
protected NebulaMetaConnectionProvider metaProvider;
protected ExecutionOptions executionOptions;
protected final NebulaMetaConnectionProvider metaProvider;
protected final NebulaGraphConnectionProvider graphProvider;
protected final OptionsT executionOptions;
protected NebulaBatchExecutor<T> nebulaBatchExecutor;
private volatile AtomicLong numPendingRow;
private NebulaPool nebulaPool;
private Session session;
private NebulaGraphConnectionProvider graphProvider;
private List<String> errorBuffer = new ArrayList<>();
private final List<String> errorBuffer = new ArrayList<>();

private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
private transient volatile boolean closed = false;

public NebulaBatchOutputFormat(
NebulaGraphConnectionProvider graphProvider,
NebulaMetaConnectionProvider metaProvider) {
NebulaMetaConnectionProvider metaProvider,
OptionsT executionOptions) {
this.graphProvider = graphProvider;
this.metaProvider = metaProvider;
this.executionOptions = executionOptions;
}

@Override
Expand Down Expand Up @@ -97,10 +98,10 @@ public void open(int i, int i1) throws IOException {
}

numPendingRow = new AtomicLong(0);
setNebulaBatchExecutor();
nebulaBatchExecutor = createNebulaBatchExecutor();
// Start the scheduled task that submits the buffered records every batchIntervalMs.
// The task is not started when batchIntervalMs is 0 or when the batch size is 1.
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatch() != 1) {
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory(
"nebula-write-output-format"));
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
Expand All @@ -115,27 +116,7 @@ public void open(int i, int i1) throws IOException {
}
}

protected void setNebulaBatchExecutor() {
VidTypeEnum vidType = metaProvider.getVidType(metaClient, executionOptions.getGraphSpace());
boolean isVertex = executionOptions.getDataType().isVertex();
Map<String, Integer> schema;
if (isVertex) {
schema =
metaProvider.getTagSchema(
metaClient,
executionOptions.getGraphSpace(),
executionOptions.getLabel());
nebulaBatchExecutor =
new NebulaVertexBatchExecutor<T>(executionOptions, vidType, schema);
} else {
schema =
metaProvider.getEdgeSchema(
metaClient,
executionOptions.getGraphSpace(),
executionOptions.getLabel());
nebulaBatchExecutor = new NebulaEdgeBatchExecutor<T>(executionOptions, vidType, schema);
}
}
protected abstract NebulaBatchExecutor<T> createNebulaBatchExecutor();

/**
* write one record to buffer
Expand All @@ -144,7 +125,7 @@ protected void setNebulaBatchExecutor() {
public final synchronized void writeRecord(T row) {
nebulaBatchExecutor.addToBatch(row);

if (numPendingRow.incrementAndGet() >= executionOptions.getBatch()) {
if (numPendingRow.incrementAndGet() >= executionOptions.getBatchSize()) {
commit();
}
}
Expand Down Expand Up @@ -199,9 +180,4 @@ public synchronized void flush() throws IOException {
commit();
}
}

public NebulaBatchOutputFormat<T> setExecutionOptions(ExecutionOptions executionOptions) {
this.executionOptions = executionOptions;
return this;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,40 @@
import java.util.List;
import java.util.Map;
import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.utils.NebulaEdge;
import org.apache.flink.connector.nebula.utils.NebulaEdges;
import org.apache.flink.connector.nebula.utils.VidTypeEnum;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NebulaEdgeBatchExecutor<T> extends NebulaBatchExecutor<T> {
public class NebulaEdgeBatchExecutor implements NebulaBatchExecutor<Row> {
private static final Logger LOG = LoggerFactory.getLogger(NebulaEdgeBatchExecutor.class);
protected final List<NebulaEdge> nebulaEdgeList;
private final EdgeExecutionOptions executionOptions;
private final List<NebulaEdge> nebulaEdgeList;
private final NebulaRowEdgeOutputFormatConverter converter;

public NebulaEdgeBatchExecutor(ExecutionOptions executionOptions,
public NebulaEdgeBatchExecutor(EdgeExecutionOptions executionOptions,
VidTypeEnum vidType, Map<String, Integer> schema) {
super(executionOptions, vidType, schema);
nebulaEdgeList = new ArrayList<>();
this.executionOptions = executionOptions;
this.nebulaEdgeList = new ArrayList<>();
this.converter = new NebulaRowEdgeOutputFormatConverter(executionOptions, vidType, schema);
}

/**
* put record into buffer
*/
@Override
void addToBatch(T record) {
NebulaRowEdgeOutputFormatConverter converter =
new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) executionOptions,
vidType, schema);
NebulaEdge edge = converter.createEdge((Row) record, executionOptions.getPolicy());
public void addToBatch(Row record) {
NebulaEdge edge = converter.createEdge(record, executionOptions.getPolicy());
if (edge == null) {
return;
}
nebulaEdgeList.add(edge);
}

@Override
String executeBatch(Session session) {
public String executeBatch(Session session) {
if (nebulaEdgeList.size() == 0) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.apache.flink.connector.nebula.sink;

import java.util.Map;
import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider;
import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions;
import org.apache.flink.connector.nebula.utils.VidTypeEnum;
import org.apache.flink.types.Row;

/**
 * Batch output format for {@link Row} records that are written as edges,
 * configured through {@link EdgeExecutionOptions}.
 */
public class NebulaEdgeBatchOutputFormat
        extends NebulaBatchOutputFormat<Row, EdgeExecutionOptions> {

    /**
     * Creates the output format; all arguments are passed unchanged to the
     * {@link NebulaBatchOutputFormat} constructor.
     *
     * @param graphProvider    provider of graph connections
     * @param metaProvider     provider of meta connections
     * @param executionOptions edge execution options
     */
    public NebulaEdgeBatchOutputFormat(
            NebulaGraphConnectionProvider graphProvider,
            NebulaMetaConnectionProvider metaProvider,
            EdgeExecutionOptions executionOptions) {
        super(graphProvider, metaProvider, executionOptions);
    }

    /**
     * Builds the edge batch executor for this output format.
     *
     * <p>The VID type of the configured graph space and the schema of the
     * configured label are looked up through the meta provider and handed to
     * a new {@link NebulaEdgeBatchExecutor} together with the execution options.
     *
     * @return a new executor that batches {@link Row} records as edges
     */
    @Override
    protected NebulaBatchExecutor<Row> createNebulaBatchExecutor() {
        final String graphSpace = executionOptions.getGraphSpace();
        final VidTypeEnum spaceVidType = metaProvider.getVidType(metaClient, graphSpace);
        final Map<String, Integer> edgeSchema =
                metaProvider.getEdgeSchema(metaClient, graphSpace, executionOptions.getLabel());
        return new NebulaEdgeBatchExecutor(executionOptions, spaceVidType, edgeSchema);
    }
}
Loading

0 comments on commit 91e8132

Please sign in to comment.