Support all insert/update/delete operations for dynamic table sink #81

Merged · 6 commits · Feb 15, 2023 · Changes from 5 commits
README.md (13 changes: 6 additions & 7 deletions)

@@ -41,19 +41,18 @@ NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOp
 NebulaGraphConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions);
 NebulaMetaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);
 
-ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
+VertexExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
         .setGraphSpace("flinkSink")
         .setTag("player")
         .setIdIndex(0)
         .setFields(Arrays.asList("name", "age"))
         .setPositions(Arrays.asList(1, 2))
-        .setBatch(2)
-        .builder();
+        .setBatchSize(2)
Contributor: We suggest not changing the user interfaces in the current version; we may update them in the next major version. See vesoft-inc/nebula-java#486 (comment).

Contributor Author: Thanks for the suggestion! Yeah, I think it's a good idea to ensure compatibility of user-facing interfaces within the same major version. I've added back the methods such as .setBatch() and .builder() and marked them as @Deprecated, while in README.md I use the new methods. Hopefully this can encourage users to move to the new methods without breaking existing code. Does this approach look fine to you? @Nicole00

Contributor: > Hopefully this can encourage

Great approach, thanks for changing it.
+        .build();
 
-NebulaBatchOutputFormat outPutFormat =
-        new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
-                .setExecutionOptions(executionOptions);
-NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
+NebulaVertexBatchOutputFormat outputFormat = new NebulaVertexBatchOutputFormat(
+        graphConnectionProvider, metaConnectionProvider, executionOptions);
+NebulaSinkFunction<Row> nebulaSinkFunction = new NebulaSinkFunction<>(outputFormat);
 DataStream<Row> dataStream = playerSource.map(row -> {
     Row record = new org.apache.flink.types.Row(row.size());
     for (int i = 0; i < row.size(); i++) {
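To make the compatibility approach from the thread above concrete, here is a self-contained sketch of the deprecation pattern: the old builder methods delegate to the renamed ones, so existing code keeps compiling with a warning. SinkOptionsBuilder and SinkOptions are stand-ins for illustration, not the connector's real classes.

    // Sketch only: old methods delegate to renamed ones and carry @Deprecated.
    public class SinkOptionsBuilder {
        public static class SinkOptions {
            final int batchSize;

            SinkOptions(int batchSize) {
                this.batchSize = batchSize;
            }
        }

        private int batchSize = 1;

        public SinkOptionsBuilder setBatchSize(int batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        /** @deprecated use {@link #setBatchSize(int)} instead. */
        @Deprecated
        public SinkOptionsBuilder setBatch(int batch) {
            return setBatchSize(batch); // thin delegate keeps behavior identical
        }

        public SinkOptions build() {
            return new SinkOptions(batchSize);
        }

        /** @deprecated use {@link #build()} instead. */
        @Deprecated
        public SinkOptions builder() {
            return build();
        }
    }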
connector/pom.xml (7 changes: 7 additions & 0 deletions)

@@ -55,6 +55,13 @@
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
Contributor: Repeated dependency?

Contributor Author: I added this test-jar as a test dependency so that we can use the 'values' table connector when writing Flink integration tests. This connector allows us to build a table from test data in the code. I've seen it used in the official JDBC connector as well.

Contributor: I see, this gives us a new approach for testing.

         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-common</artifactId>
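For illustration, a sketch of the testing approach described in the thread: TestValuesTableFactory ships in the flink-table-planner test-jar and backs the 'values' connector; the method names here are from memory and should be checked against the Flink version in use.

    import java.util.Arrays;
    import java.util.List;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.planner.factories.TestValuesTableFactory;
    import org.apache.flink.types.Row;

    public class ValuesConnectorSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Register in-memory rows; the returned id ties them to a DDL table.
            List<Row> rows = Arrays.asList(Row.of(1L, "Bob", 30), Row.of(2L, "Alice", 32));
            String dataId = TestValuesTableFactory.registerData(rows);

            tEnv.executeSql(
                    "CREATE TABLE players (id BIGINT, name STRING, age INT) "
                            + "WITH ('connector' = 'values', 'data-id' = '" + dataId + "')");

            // An integration test would now create the Nebula sink table, run
            // INSERT INTO nebula_sink SELECT * FROM players, and assert results.
        }
    }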
@@ -81,7 +81,7 @@ public VidTypeEnum getVidType(MetaClient metaClient, String space) {
         spaceItem = metaClient.getSpace(space);
     } catch (TException | ExecuteFailedException e) {
         LOG.error("get space info error, ", e);
-        return null;
+        throw new RuntimeException(e);
Contributor: Good change, thanks~

     }
     PropertyType vidType = spaceItem.getProperties().getVid_type().getType();
     if (vidType == PropertyType.FIXED_STRING) {

@@ -105,7 +105,7 @@ public Map<String, Integer> getTagSchema(MetaClient metaClient, String space, St
         tagSchema = metaClient.getTag(space, tag);
     } catch (TException | ExecuteFailedException e) {
         LOG.error("get tag schema error, ", e);
-        return schema;
+        throw new RuntimeException(e);
     }
     List<ColumnDef> columnDefs = tagSchema.getColumns();
     for (ColumnDef col : columnDefs) {
@@ -128,7 +128,7 @@ public Map<String, Integer> getEdgeSchema(MetaClient metaClient, String space, S
         edgeSchema = metaClient.getEdge(space, edge);
     } catch (TException | ExecuteFailedException e) {
         LOG.error("get edge schema error, ", e);
-        return schema;
+        throw new RuntimeException(e);
     }
     List<ColumnDef> columnDefs = edgeSchema.getColumns();
     for (ColumnDef col : columnDefs) {
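All three changes follow the same fail-fast pattern: a failed metadata lookup now throws with the original exception as the cause, instead of returning null or an empty schema that would only surface as a confusing error at write time. A self-contained illustration of the contract change (not the connector's code):

    import java.util.HashMap;
    import java.util.Map;

    public class FailFastSketch {
        // Stand-in for getTagSchema(): throws on failure instead of returning
        // an empty map, so the job fails at open() rather than mid-write.
        static Map<String, Integer> getTagSchema(boolean metaReachable) {
            if (!metaReachable) {
                throw new RuntimeException(
                        new IllegalStateException("meta service unreachable"));
            }
            Map<String, Integer> schema = new HashMap<>();
            schema.put("name", 1); // property name -> type code (values illustrative)
            schema.put("age", 2);
            return schema;
        }

        public static void main(String[] args) {
            System.out.println(getTagSchema(true)); // fails fast when false
        }
    }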
NebulaBatchExecutor.java

@@ -6,34 +6,20 @@
 package org.apache.flink.connector.nebula.sink;
 
 import com.vesoft.nebula.client.graph.net.Session;
-import java.util.Map;
-import org.apache.flink.connector.nebula.statement.ExecutionOptions;
-import org.apache.flink.connector.nebula.utils.VidTypeEnum;
 
-public abstract class NebulaBatchExecutor<T> {
-
-    protected final ExecutionOptions executionOptions;
-    protected final Map<String, Integer> schema;
-    protected final VidTypeEnum vidType;
-
-    public NebulaBatchExecutor(ExecutionOptions executionOptions,
-                               VidTypeEnum vidType, Map<String, Integer> schema) {
-        this.executionOptions = executionOptions;
-        this.vidType = vidType;
-        this.schema = schema;
-    }
+public interface NebulaBatchExecutor<T> {
 
     /**
      * put record into buffer
      *
      * @param record represent vertex or edge
      */
-    abstract void addToBatch(T record);
+    void addToBatch(T record);
 
     /**
-     * execute the insert statement
+     * execute the statement
      *
      * @param session graph session
      */
-    abstract String executeBatch(Session session);
+    String executeBatch(Session session);
 }
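Since NebulaBatchExecutor is now a plain interface, each executor owns its own state instead of inheriting fields from a base class. A hypothetical minimal implementation that buffers ready-made nGQL statements (the real vertex and edge executors build statements from Rows, and the exact return contract of executeBatch is assumed here):

    import com.vesoft.nebula.client.graph.net.Session;
    import java.util.ArrayList;
    import java.util.List;

    public class StatementBatchExecutor implements NebulaBatchExecutor<String> {
        private final List<String> statements = new ArrayList<>();

        @Override
        public void addToBatch(String record) {
            statements.add(record);
        }

        @Override
        public String executeBatch(Session session) {
            if (statements.isEmpty()) {
                return null; // nothing buffered; mirrors the edge executor
            }
            String batch = String.join(";", statements);
            statements.clear();
            // A real executor would submit the batch through the session
            // (e.g. session.execute(batch)) and report failed statements.
            return batch; // assumed contract: return the executed statement text
        }
    }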
NebulaBatchOutputFormat.java

@@ -19,7 +19,6 @@
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;

@@ -30,33 +29,35 @@
 import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider;
 import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
 import org.apache.flink.connector.nebula.statement.ExecutionOptions;
-import org.apache.flink.connector.nebula.utils.VidTypeEnum;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class NebulaBatchOutputFormat<T> extends RichOutputFormat<T> implements Flushable {
+public abstract class NebulaBatchOutputFormat<T, OptionsT extends ExecutionOptions>
+        extends RichOutputFormat<T> implements Flushable {
     private static final Logger LOG = LoggerFactory.getLogger(NebulaBatchOutputFormat.class);
     private static final long serialVersionUID = 8846672119763512586L;
     protected MetaClient metaClient;
-    protected NebulaBatchExecutor nebulaBatchExecutor;
-    protected NebulaMetaConnectionProvider metaProvider;
-    protected ExecutionOptions executionOptions;
+    protected final NebulaMetaConnectionProvider metaProvider;
+    protected final NebulaGraphConnectionProvider graphProvider;
+    protected final OptionsT executionOptions;
+    protected NebulaBatchExecutor<T> nebulaBatchExecutor;
     private volatile AtomicLong numPendingRow;
     private NebulaPool nebulaPool;
     private Session session;
-    private NebulaGraphConnectionProvider graphProvider;
-    private List<String> errorBuffer = new ArrayList<>();
+    private final List<String> errorBuffer = new ArrayList<>();
 
     private transient ScheduledExecutorService scheduler;
     private transient ScheduledFuture<?> scheduledFuture;
     private transient volatile boolean closed = false;
 
     public NebulaBatchOutputFormat(
             NebulaGraphConnectionProvider graphProvider,
-            NebulaMetaConnectionProvider metaProvider) {
+            NebulaMetaConnectionProvider metaProvider,
+            OptionsT executionOptions) {
         this.graphProvider = graphProvider;
         this.metaProvider = metaProvider;
+        this.executionOptions = executionOptions;
     }
 
     @Override

@@ -97,10 +98,10 @@ public void open(int i, int i1) throws IOException {
         }
 
         numPendingRow = new AtomicLong(0);
-        setNebulaBatchExecutor();
+        nebulaBatchExecutor = createNebulaBatchExecutor();
         // start the schedule task: submit the buffer records every batchInterval.
         // If batchIntervalMs is 0, do not start the scheduler task.
-        if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatch() != 1) {
+        if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
            this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory(
                     "nebula-write-output-format"));
             this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {

@@ -115,27 +116,7 @@ public void open(int i, int i1) throws IOException {
         }
     }
 
-    protected void setNebulaBatchExecutor() {
-        VidTypeEnum vidType = metaProvider.getVidType(metaClient, executionOptions.getGraphSpace());
-        boolean isVertex = executionOptions.getDataType().isVertex();
-        Map<String, Integer> schema;
-        if (isVertex) {
-            schema =
-                    metaProvider.getTagSchema(
-                            metaClient,
-                            executionOptions.getGraphSpace(),
-                            executionOptions.getLabel());
-            nebulaBatchExecutor =
-                    new NebulaVertexBatchExecutor<T>(executionOptions, vidType, schema);
-        } else {
-            schema =
-                    metaProvider.getEdgeSchema(
-                            metaClient,
-                            executionOptions.getGraphSpace(),
-                            executionOptions.getLabel());
-            nebulaBatchExecutor = new NebulaEdgeBatchExecutor<T>(executionOptions, vidType, schema);
-        }
-    }
+    protected abstract NebulaBatchExecutor<T> createNebulaBatchExecutor();
 
     /**
      * write one record to buffer

@@ -144,7 +125,7 @@ protected void setNebulaBatchExecutor() {
     public final synchronized void writeRecord(T row) {
         nebulaBatchExecutor.addToBatch(row);
 
-        if (numPendingRow.incrementAndGet() >= executionOptions.getBatch()) {
+        if (numPendingRow.incrementAndGet() >= executionOptions.getBatchSize()) {
             commit();
         }
     }

@@ -199,9 +180,4 @@ public synchronized void flush() throws IOException {
             commit();
         }
     }
-
-    public NebulaBatchOutputFormat<T> setExecutionOptions(ExecutionOptions executionOptions) {
-        this.executionOptions = executionOptions;
-        return this;
-    }
 }
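Tying the refactor together: open() starts the periodic flush task only when batchIntervalMs is non-zero and batchSize is not 1, so flushing can be size-triggered, time-triggered, or both. A configuration sketch continuing the README example's variables; setBatchIntervalMs is an assumed builder method (only the getter appears in the diff above):

    // Sketch: size- and time-based flushing with the refactored output format.
    VertexExecutionOptions options = new VertexExecutionOptions.ExecutionOptionBuilder()
            .setGraphSpace("flinkSink")
            .setTag("player")
            .setIdIndex(0)
            .setFields(Arrays.asList("name", "age"))
            .setPositions(Arrays.asList(1, 2))
            .setBatchSize(256)        // commit once 256 rows are buffered...
            .setBatchIntervalMs(1000) // ...or at the latest after one second (assumed setter)
            .build();

    NebulaVertexBatchOutputFormat outputFormat = new NebulaVertexBatchOutputFormat(
            graphConnectionProvider, metaConnectionProvider, options);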

This file was deleted.

NebulaEdgeBatchExecutor.java

@@ -11,41 +11,40 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions;
-import org.apache.flink.connector.nebula.statement.ExecutionOptions;
 import org.apache.flink.connector.nebula.utils.NebulaEdge;
 import org.apache.flink.connector.nebula.utils.NebulaEdges;
 import org.apache.flink.connector.nebula.utils.VidTypeEnum;
 import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class NebulaEdgeBatchExecutor<T> extends NebulaBatchExecutor<T> {
+public class NebulaEdgeBatchExecutor implements NebulaBatchExecutor<Row> {
     private static final Logger LOG = LoggerFactory.getLogger(NebulaEdgeBatchExecutor.class);
-    protected final List<NebulaEdge> nebulaEdgeList;
+    private final EdgeExecutionOptions executionOptions;
+    private final List<NebulaEdge> nebulaEdgeList;
+    private final NebulaRowEdgeOutputFormatConverter converter;
 
-    public NebulaEdgeBatchExecutor(ExecutionOptions executionOptions,
+    public NebulaEdgeBatchExecutor(EdgeExecutionOptions executionOptions,
                                    VidTypeEnum vidType, Map<String, Integer> schema) {
-        super(executionOptions, vidType, schema);
-        nebulaEdgeList = new ArrayList<>();
+        this.executionOptions = executionOptions;
+        this.nebulaEdgeList = new ArrayList<>();
+        this.converter = new NebulaRowEdgeOutputFormatConverter(executionOptions, vidType, schema);
     }
 
     /**
      * put record into buffer
      */
     @Override
-    void addToBatch(T record) {
-        NebulaRowEdgeOutputFormatConverter converter =
-                new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) executionOptions,
-                        vidType, schema);
-        NebulaEdge edge = converter.createEdge((Row) record, executionOptions.getPolicy());
+    public void addToBatch(Row record) {
+        NebulaEdge edge = converter.createEdge(record, executionOptions.getPolicy());
         if (edge == null) {
             return;
         }
         nebulaEdgeList.add(edge);
     }
 
     @Override
-    String executeBatch(Session session) {
+    public String executeBatch(Session session) {
         if (nebulaEdgeList.size() == 0) {
             return null;
         }
NebulaEdgeBatchOutputFormat.java (new file)

@@ -0,0 +1,27 @@
+package org.apache.flink.connector.nebula.sink;
+
+import java.util.Map;
+import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider;
+import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
+import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions;
+import org.apache.flink.connector.nebula.utils.VidTypeEnum;
+import org.apache.flink.types.Row;
+
+public class NebulaEdgeBatchOutputFormat
+        extends NebulaBatchOutputFormat<Row, EdgeExecutionOptions> {
+    public NebulaEdgeBatchOutputFormat(NebulaGraphConnectionProvider graphProvider,
+                                       NebulaMetaConnectionProvider metaProvider,
+                                       EdgeExecutionOptions executionOptions) {
+        super(graphProvider, metaProvider, executionOptions);
+    }
+
+    @Override
+    protected NebulaBatchExecutor<Row> createNebulaBatchExecutor() {
+        VidTypeEnum vidType = metaProvider.getVidType(metaClient, executionOptions.getGraphSpace());
+        Map<String, Integer> schema = metaProvider.getEdgeSchema(
+                metaClient,
+                executionOptions.getGraphSpace(),
+                executionOptions.getLabel());
+        return new NebulaEdgeBatchExecutor(executionOptions, vidType, schema);
+    }
+}
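A usage sketch for the new edge output format, mirroring the README's vertex example; the EdgeExecutionOptions builder methods shown here (setEdge, setSrcIndex, setDstIndex, setRankIndex) are assumed to match the existing connector API, and the connection providers are set up as in the README:

    // Sketch: wiring NebulaEdgeBatchOutputFormat into a DataStream<Row> job.
    EdgeExecutionOptions edgeOptions = new EdgeExecutionOptions.ExecutionOptionBuilder()
            .setGraphSpace("flinkSink")
            .setEdge("follow")
            .setSrcIndex(0)   // row field holding the source vertex id
            .setDstIndex(1)   // row field holding the destination vertex id
            .setRankIndex(2)  // row field holding the edge rank
            .setFields(Arrays.asList("degree"))
            .setPositions(Arrays.asList(3))
            .setBatchSize(2)
            .build();

    NebulaEdgeBatchOutputFormat edgeFormat = new NebulaEdgeBatchOutputFormat(
            graphConnectionProvider, metaConnectionProvider, edgeOptions);
    NebulaSinkFunction<Row> edgeSink = new NebulaSinkFunction<>(edgeFormat);
    dataStream.addSink(edgeSink); // dataStream as built in the README example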