Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support UPDATE and DELETE wite mode #25

Merged
merged 6 commits into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NebulaBatchExecutor<T> {
private static final Logger LOG = LoggerFactory.getLogger(NebulaBatchExecutor.class);
public abstract class NebulaBatchExecutor<T> {

private final ExecutionOptions executionOptions;
private final NebulaBufferedRow nebulaBufferedRow;
private final boolean isVertex;
private final Map<String, Integer> schema;
private final VidTypeEnum vidType;
protected final ExecutionOptions executionOptions;
protected final Map<String, Integer> schema;
protected final VidTypeEnum vidType;

public NebulaBatchExecutor(ExecutionOptions executionOptions, boolean isVertex,
public NebulaBatchExecutor(ExecutionOptions executionOptions,
VidTypeEnum vidType, Map<String, Integer> schema) {
this.executionOptions = executionOptions;
this.nebulaBufferedRow = new NebulaBufferedRow();
this.isVertex = isVertex;
this.vidType = vidType;
this.schema = schema;
}
Expand All @@ -40,50 +35,12 @@ public NebulaBatchExecutor(ExecutionOptions executionOptions, boolean isVertex,
*
* @param record represent vertex or edge
*/
void addToBatch(T record) {
NebulaOutputFormatConverter converter;
if (isVertex) {
converter = new NebulaRowVertexOutputFormatConverter(
(VertexExecutionOptions) executionOptions, vidType, schema);
} else {
converter = new NebulaRowEdgeOutputFormatConverter(
(EdgeExecutionOptions) executionOptions, vidType, schema);
}
String value = converter.createValue(record, executionOptions.getPolicy());
if (value == null) {
return;
}
nebulaBufferedRow.putRow(value);
}
abstract void addToBatch(T record);

/**
* execute the insert statement
*
* @param session graph session
*/
String executeBatch(Session session) {
String propNames = String.join(NebulaConstant.COMMA, executionOptions.getFields());
String values = String.join(NebulaConstant.COMMA, nebulaBufferedRow.getRows());
String exec = String.format(NebulaConstant.BATCH_INSERT_TEMPLATE,
executionOptions.getDataType(), executionOptions.getLabel(), propNames, values);
LOG.info("insert statement={}", exec);
ResultSet execResult = null;
try {
execResult = session.execute(exec);
} catch (Exception e) {
LOG.error("insert error:", e);
nebulaBufferedRow.clean();
return exec;
}

if (execResult.isSucceeded()) {
LOG.debug("insert success");
} else {
LOG.error("insert failed: {}", execResult.getErrorMessage());
nebulaBufferedRow.clean();
return exec;
}
nebulaBufferedRow.clean();
return null;
}
abstract String executeBatch(Session session);
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ public void open(int i, int i1) throws IOException {
LOG.error("failed to get graph session, ", e);
throw new IOException("get graph session error, ", e);
}
ResultSet resultSet;
try {
resultSet = session.execute("USE " + executionOptions.getGraphSpace());
} catch (IOErrorException e) {
LOG.error("switch space error, ", e);
throw new IOException("switch space error,", e);
}
if (!resultSet.isSucceeded()) {
LOG.error("switch space failed, {}", resultSet.getErrorMessage());
throw new RuntimeException("switch space failed, " + resultSet.getErrorMessage());
}

try {
metaClient = metaProvider.getMetaClient();
} catch (TException e) {
Expand All @@ -77,11 +89,12 @@ public void open(int i, int i1) throws IOException {
if (isVertex) {
schema = metaProvider.getTagSchema(metaClient, executionOptions.getGraphSpace(),
executionOptions.getLabel());
nebulaBatchExecutor = new NebulaVertexBatchExecutor(executionOptions, vidType, schema);
} else {
schema = metaProvider.getEdgeSchema(metaClient, executionOptions.getGraphSpace(),
executionOptions.getLabel());
nebulaBatchExecutor = new NebulaEdgeBatchExecutor(executionOptions, vidType, schema);
}
nebulaBatchExecutor = new NebulaBatchExecutor(executionOptions, isVertex, vidType, schema);
}

/**
Expand All @@ -100,24 +113,12 @@ public final synchronized void writeRecord(T row) throws IOException {
* commit batch insert statements
*/
private synchronized void commit() throws IOException {
ResultSet resultSet;
try {
resultSet = session.execute("USE " + executionOptions.getGraphSpace());
} catch (IOErrorException e) {
LOG.error("switch space error, ", e);
throw new IOException("switch space error,", e);
}

if (resultSet.isSucceeded()) {
String errorExec = nebulaBatchExecutor.executeBatch(session);
if (errorExec != null) {
errorBuffer.add(errorExec);
}
long pendingRow = numPendingRow.get();
numPendingRow.compareAndSet(pendingRow, 0);
} else {
LOG.error("switch space failed, ", resultSet.getErrorMessage());
String errorExec = nebulaBatchExecutor.executeBatch(session);
if (errorExec != null) {
errorBuffer.add(errorExec);
}
long pendingRow = numPendingRow.get();
numPendingRow.compareAndSet(pendingRow, 0);
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

package org.apache.flink.connector.nebula.sink;

import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.net.Session;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.utils.NebulaEdge;
import org.apache.flink.connector.nebula.utils.NebulaEdges;
import org.apache.flink.connector.nebula.utils.VidTypeEnum;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NebulaEdgeBatchExecutor<T> extends NebulaBatchExecutor<T> {
private static final Logger LOG = LoggerFactory.getLogger(NebulaEdgeBatchExecutor.class);
private final List<NebulaEdge> nebulaEdgeList;

public NebulaEdgeBatchExecutor(ExecutionOptions executionOptions,
VidTypeEnum vidType, Map<String, Integer> schema) {
super(executionOptions, vidType, schema);
nebulaEdgeList = new ArrayList<>();
}

/**
* put record into buffer
*/
@Override
void addToBatch(T record) {
NebulaRowEdgeOutputFormatConverter converter =
new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) executionOptions,
vidType, schema);
NebulaEdge edge = converter.createEdge((Row) record, executionOptions.getPolicy());
if (edge == null) {
return;
}
nebulaEdgeList.add(edge);
}

@Override
String executeBatch(Session session) {
NebulaEdges nebulaEdges = new NebulaEdges(executionOptions.getLabel(),
executionOptions.getFields(), nebulaEdgeList, executionOptions.getPolicy(),
executionOptions.getPolicy());
// generate the write ngql statement
String statement = null;
switch (executionOptions.getWriteMode()) {
case INSERT:
statement = nebulaEdges.getInsertStatement();
break;
case UPDATE:
statement = nebulaEdges.getUpdateStatement();
break;
case DELETE:
statement = nebulaEdges.getDeleteStatement();
break;
default:
throw new IllegalArgumentException("write mode {} is not supported");
Nicole00 marked this conversation as resolved.
Show resolved Hide resolved
}
LOG.debug("write statement={}", statement);

// execute ngql statement
ResultSet execResult = null;
try {
execResult = session.execute(statement);
} catch (Exception e) {
LOG.error("write data error, ", e);
nebulaEdgeList.clear();
return statement;
}

if (execResult.isSucceeded()) {
LOG.debug("write success");
} else {
LOG.error("write data failed: {}", execResult.getErrorMessage());
nebulaEdgeList.clear();
return statement;
}
nebulaEdgeList.clear();
return null;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@
package org.apache.flink.connector.nebula.sink;

import com.esotericsoftware.minlog.Log;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions;
import org.apache.flink.connector.nebula.utils.NebulaConstant;
import org.apache.flink.connector.nebula.utils.NebulaEdge;
import org.apache.flink.connector.nebula.utils.NebulaUtils;
import org.apache.flink.connector.nebula.utils.PolicyEnum;
import org.apache.flink.connector.nebula.utils.VidTypeEnum;
import org.apache.flink.types.Row;

public class NebulaRowEdgeOutputFormatConverter implements NebulaOutputFormatConverter<Row> {
public class NebulaRowEdgeOutputFormatConverter implements Serializable {

private final int srcIdIndex;
private final int dstIdIndex;
Expand All @@ -44,25 +46,27 @@ public NebulaRowEdgeOutputFormatConverter(EdgeExecutionOptions executionOptions,
}
}

@Override
public String createValue(Row row, PolicyEnum policy) {

public NebulaEdge createEdge(Row row, PolicyEnum policy) {
// check row data
if (row == null || row.getArity() == 0) {
Log.error("empty row");
return null;
}

Object srcId = row.getField(srcIdIndex);
Object dstId = row.getField(dstIdIndex);
if (srcId == null || dstId == null) {
return null;
Nicole00 marked this conversation as resolved.
Show resolved Hide resolved
}
// extract edge properties
List<String> edgeProps = new ArrayList<>();
for (int i : positions) {
String propName = pos2Field.get(i);
int type = schema.get(propName);
edgeProps.add(NebulaUtils.extraValue(row.getField(i), type));
}

// format edge source id and target id
String srcFormatId = srcId.toString();
String dstFormatId = dstId.toString();

Expand All @@ -76,20 +80,19 @@ public String createValue(Row row, PolicyEnum policy) {
}
} else {
assert (vidType == VidTypeEnum.INT);
srcFormatId = String.format(NebulaConstant.ENDPOINT_TEMPLATE, policy.policy(),
srcId.toString());
dstFormatId = String.format(NebulaConstant.ENDPOINT_TEMPLATE, policy.policy(),
dstId.toString());
}

// extract edge rank
Long rank = null;
if (rankIndex >= 0) {
assert row.getField(rankIndex) != null;
Long rank = Long.parseLong(row.getField(rankIndex).toString());
return String.format(NebulaConstant.EDGE_VALUE_TEMPLATE, srcFormatId, dstFormatId,
rank, String.join(NebulaConstant.COMMA, edgeProps));
} else {
return String.format(NebulaConstant.EDGE_VALUE_WITHOUT_RANKING_TEMPLATE, srcFormatId,
dstFormatId, String.join(NebulaConstant.COMMA, edgeProps));
if (row.getField(rankIndex) == null) {
rank = 0L;
} else {
rank = Long.parseLong(row.getField(rankIndex).toString());
}
}

NebulaEdge edge = new NebulaEdge(srcFormatId, dstFormatId, rank, edgeProps);
return edge;
}
}
Loading