Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support updateIfPresent/updateIfAbsent operation #1897

Merged
merged 7 commits into from
Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ public Id addPropertyKey(PropertyKey key) {
return this.hugegraph.addPropertyKey(key);
}

@Override
public void updatePropertyKey(PropertyKey key) {
verifySchemaPermission(HugePermission.WRITE, key);
this.hugegraph.updatePropertyKey(key);
}

@Override
public Id removePropertyKey(Id key) {
PropertyKey pkey = this.hugegraph.propertyKey(key);
Expand Down Expand Up @@ -240,6 +246,12 @@ public void addVertexLabel(VertexLabel label) {
this.hugegraph.addVertexLabel(label);
}

@Override
public void updateVertexLabel(VertexLabel label) {
verifySchemaPermission(HugePermission.WRITE, label);
this.hugegraph.updateVertexLabel(label);
}

@Override
public Id removeVertexLabel(Id id) {
VertexLabel label = this.hugegraph.vertexLabel(id);
Expand Down Expand Up @@ -293,6 +305,12 @@ public void addEdgeLabel(EdgeLabel label) {
this.hugegraph.addEdgeLabel(label);
}

@Override
public void updateEdgeLabel(EdgeLabel label) {
verifySchemaPermission(HugePermission.WRITE, label);
this.hugegraph.updateEdgeLabel(label);
}

@Override
public Id removeEdgeLabel(Id id) {
EdgeLabel label = this.hugegraph.edgeLabel(id);
Expand Down Expand Up @@ -339,6 +357,12 @@ public void addIndexLabel(SchemaLabel schemaLabel, IndexLabel indexLabel) {
this.hugegraph.addIndexLabel(schemaLabel, indexLabel);
}

@Override
public void updateIndexLabel(IndexLabel label) {
verifySchemaPermission(HugePermission.WRITE, label);
this.hugegraph.updateIndexLabel(label);
}

@Override
public Id removeIndexLabel(Id id) {
IndexLabel label = this.hugegraph.indexLabel(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,49 +227,45 @@ private void mutate(CassandraSessionPool.Session session,
LOG.warn("The entry will be ignored due to no change: {}", entry);
}

CassandraTable table;
if (!entry.olap()) {
// Oltp table
table = this.table(entry.type());
} else {
if (entry.type().isIndex()) {
// Olap index
table = this.table(this.olapTableName(entry.type()));
} else {
// Olap vertex
table = this.table(this.olapTableName(entry.subId()));
}
}

switch (item.action()) {
case INSERT:
// Insert olap vertex
if (entry.olap()) {
this.table(this.olapTableName(entry.subId()))
.insert(session, entry.row());
break;
}
// Insert entry
if (entry.selfChanged()) {
this.table(entry.type()).insert(session, entry.row());
table.insert(session, entry.row());
}
// Insert sub rows (edges)
for (CassandraBackendEntry.Row row : entry.subRows()) {
this.table(row.type()).insert(session, row);
}
break;
case DELETE:
// Delete olap vertex index by index label
if (entry.olap()) {
this.table(this.olapTableName(entry.type()))
.delete(session, entry.row());
break;
}
// Delete entry
if (entry.selfChanged()) {
this.table(entry.type()).delete(session, entry.row());
table.delete(session, entry.row());
}
// Delete sub rows (edges)
for (CassandraBackendEntry.Row row : entry.subRows()) {
this.table(row.type()).delete(session, row);
}
break;
case APPEND:
// Append olap vertex index
if (entry.olap()) {
this.table(this.olapTableName(entry.type()))
.append(session, entry.row());
break;
}
// Append entry
if (entry.selfChanged()) {
this.table(entry.type()).append(session, entry.row());
table.append(session, entry.row());
}
// Append sub rows (edges)
for (CassandraBackendEntry.Row row : entry.subRows()) {
Expand All @@ -279,13 +275,27 @@ private void mutate(CassandraSessionPool.Session session,
case ELIMINATE:
// Eliminate entry
if (entry.selfChanged()) {
this.table(entry.type()).eliminate(session, entry.row());
table.eliminate(session, entry.row());
}
// Eliminate sub rows (edges)
for (CassandraBackendEntry.Row row : entry.subRows()) {
this.table(row.type()).eliminate(session, row);
}
break;
case UPDATE_IF_PRESENT:
if (entry.selfChanged()) {
// TODO: forward to master-writer node
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: override updateIfAbsent for cassandra by INSERT xx IF NOT EXISTS
https://docs.datastax.com/en/cql-oss/3.3/cql/cql_reference/cqlInsert.html

table.updateIfPresent(session, entry.row());
}
assert entry.subRows().isEmpty() : entry.subRows();
break;
case UPDATE_IF_ABSENT:
if (entry.selfChanged()) {
// TODO: forward to master-writer node
table.updateIfAbsent(session, entry.row());
}
assert entry.subRows().isEmpty() : entry.subRows();
break;
default:
throw new AssertionError(String.format(
"Unsupported mutate action: %s", item.action()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.baidu.hugegraph.backend.query.Aggregate;
import com.baidu.hugegraph.backend.query.Condition;
import com.baidu.hugegraph.backend.query.Condition.Relation;
import com.baidu.hugegraph.backend.query.IdQuery;
import com.baidu.hugegraph.backend.query.Query;
import com.baidu.hugegraph.backend.query.Query.Order;
import com.baidu.hugegraph.backend.store.BackendEntry;
Expand All @@ -46,6 +47,7 @@
import com.baidu.hugegraph.exception.NotFoundException;
import com.baidu.hugegraph.exception.NotSupportException;
import com.baidu.hugegraph.iterator.ExtendableIterator;
import com.baidu.hugegraph.iterator.WrappedIterator;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.type.define.HugeKeys;
import com.baidu.hugegraph.util.CopyUtil;
Expand Down Expand Up @@ -97,6 +99,18 @@ protected void registerMetaHandlers() {
});
}

@Override
public boolean queryExist(CassandraSessionPool.Session session,
CassandraBackendEntry.Row entry) {
Query query = new IdQuery.OneIdQuery(HugeType.UNKNOWN, entry.id());
Iterator<BackendEntry> iter = this.query(session, query);
try {
return iter.hasNext();
} finally {
WrappedIterator.close(iter);
}
}

@Override
public Number queryNumber(CassandraSessionPool.Session session,
Query query) {
Expand Down
12 changes: 10 additions & 2 deletions hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public interface HugeGraph extends Graph {

Id addPropertyKey(PropertyKey key);

void updatePropertyKey(PropertyKey key);

Id removePropertyKey(Id key);

Id clearPropertyKey(PropertyKey propertyKey);
Expand All @@ -84,7 +86,9 @@ public interface HugeGraph extends Graph {

boolean existsPropertyKey(String key);

void addVertexLabel(VertexLabel vertexLabel);
void addVertexLabel(VertexLabel label);

void updateVertexLabel(VertexLabel label);

Id removeVertexLabel(Id label);

Expand All @@ -100,7 +104,9 @@ public interface HugeGraph extends Graph {

boolean existsLinkLabel(Id vertexLabel);

void addEdgeLabel(EdgeLabel edgeLabel);
void addEdgeLabel(EdgeLabel label);

void updateEdgeLabel(EdgeLabel label);

Id removeEdgeLabel(Id label);

Expand All @@ -116,6 +122,8 @@ public interface HugeGraph extends Graph {

void addIndexLabel(SchemaLabel schemaLabel, IndexLabel indexLabel);

void updateIndexLabel(IndexLabel label);

Id removeIndexLabel(Id label);

Id rebuildIndex(SchemaElement schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,12 @@ public Id addPropertyKey(PropertyKey pkey) {
return this.schemaTransaction().addPropertyKey(pkey);
}

@Override
public void updatePropertyKey(PropertyKey pkey) {
assert this.name.equals(pkey.graph().name());
this.schemaTransaction().updatePropertyKey(pkey);
}

@Override
public Id removePropertyKey(Id pkey) {
if (this.propertyKey(pkey).olap()) {
Expand Down Expand Up @@ -756,9 +762,15 @@ public boolean existsPropertyKey(String name) {
}

@Override
public void addVertexLabel(VertexLabel vertexLabel) {
assert this.name.equals(vertexLabel.graph().name());
this.schemaTransaction().addVertexLabel(vertexLabel);
public void addVertexLabel(VertexLabel label) {
assert this.name.equals(label.graph().name());
this.schemaTransaction().addVertexLabel(label);
}

@Override
public void updateVertexLabel(VertexLabel label) {
assert this.name.equals(label.graph().name());
this.schemaTransaction().updateVertexLabel(label);
}

@Override
Expand Down Expand Up @@ -812,9 +824,15 @@ public boolean existsLinkLabel(Id vertexLabel) {
}

@Override
public void addEdgeLabel(EdgeLabel edgeLabel) {
assert this.name.equals(edgeLabel.graph().name());
this.schemaTransaction().addEdgeLabel(edgeLabel);
public void addEdgeLabel(EdgeLabel label) {
assert this.name.equals(label.graph().name());
this.schemaTransaction().addEdgeLabel(label);
}

@Override
public void updateEdgeLabel(EdgeLabel label) {
assert this.name.equals(label.graph().name());
this.schemaTransaction().updateEdgeLabel(label);
}

@Override
Expand Down Expand Up @@ -863,6 +881,12 @@ public void addIndexLabel(SchemaLabel schemaLabel, IndexLabel indexLabel) {
this.schemaTransaction().addIndexLabel(schemaLabel, indexLabel);
}

@Override
public void updateIndexLabel(IndexLabel label) {
assert this.name.equals(label.graph().name());
this.schemaTransaction().updateIndexLabel(label);
}

@Override
public Id removeIndexLabel(Id id) {
return this.schemaTransaction().removeIndexLabel(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

import com.baidu.hugegraph.HugeGraphParams;
import com.baidu.hugegraph.backend.id.Id;
Expand Down Expand Up @@ -209,6 +210,16 @@ private static Id generateId(HugeType type, String name) {
return IdGenerator.of(type.string() + "-" + name);
}

@Override
protected void updateSchema(SchemaElement schema,
Consumer<SchemaElement> updateCallback) {
super.updateSchema(schema, updateCallback);

this.updateCache(schema);

this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id());
}

@Override
protected void addSchema(SchemaElement schema) {
super.addSchema(schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,32 @@ protected void registerMetaHandlers() {
// pass
}

public void updateIfPresent(Session session, Entry entry) {
// TODO: use fine-grained row lock
synchronized (this.table) {
assert session == null || !session.hasChanges();
if (this.queryExist(session, entry)) {
this.insert(session, entry);
if (session != null) {
session.commit();
}
}
}
}

public void updateIfAbsent(Session session, Entry entry) {
// TODO: use fine-grained row lock
synchronized (this.table) {
assert session == null || !session.hasChanges();
if (!this.queryExist(session, entry)) {
this.insert(session, entry);
if (session != null) {
session.commit();
}
}
}
}

/**
* Mapping query-type to table-type
* @param query origin query
Expand Down Expand Up @@ -112,6 +138,8 @@ public static final String joinTableName(String prefix, String table) {

public abstract Number queryNumber(Session session, Query query);

public abstract boolean queryExist(Session session, Entry entry);

public abstract void insert(Session session, Entry entry);

public abstract void delete(Session session, Entry entry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ protected void mutate(BackendAction item) {
LOG.debug("[store {}] eliminate entry: {}", this.store, entry);
table.eliminate(null, entry);
break;
case UPDATE_IF_PRESENT:
table.updateIfPresent(null, entry);
break;
case UPDATE_IF_ABSENT:
table.updateIfAbsent(null, entry);
break;
default:
throw new BackendException("Unsupported mutate type: %s",
item.action());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.baidu.hugegraph.util.InsertionOrderUtil;
import com.baidu.hugegraph.util.Log;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;

public class InMemoryDBTable extends BackendTable<BackendSession,
TextBackendEntry> {
Expand Down Expand Up @@ -133,6 +134,12 @@ public void eliminate(BackendSession session, TextBackendEntry entry) {
}
}

@Override
public boolean queryExist(BackendSession session, TextBackendEntry entry) {
List<Id> ids = ImmutableList.of(entry.id());
return !this.queryById(ids, this.store).isEmpty();
}

@Override
public Number queryNumber(BackendSession session, Query query) {
Aggregate aggregate = query.aggregateNotNull();
Expand Down
Loading