From bfe93f03db36e2a62e7e281bd586ca657d39273f Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Thu, 24 Feb 2022 17:23:50 +0800 Subject: [PATCH 1/7] support updateIfPresent/updateIfAbsent operation Change-Id: I63f179e0b6c2e2f86e6ae185ce3a7115a4da7657 --- .../cache/CachedSchemaTransaction.java | 9 ++++++ .../hugegraph/backend/store/BackendTable.java | 29 +++++++++++++++++++ .../backend/store/memory/InMemoryDBStore.java | 6 ++++ .../backend/store/memory/InMemoryDBTable.java | 7 +++++ .../backend/tx/AbstractTransaction.java | 10 +++++++ .../backend/tx/SchemaTransaction.java | 28 ++++++++++++++++-- .../baidu/hugegraph/type/define/Action.java | 10 ++++++- .../backend/store/rocksdb/RocksDBStore.java | 6 ++++ .../backend/store/rocksdb/RocksDBTable.java | 8 +++++ 9 files changed, 109 insertions(+), 4 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java index 0e7a55328c..fb3fe79c2b 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java @@ -209,6 +209,15 @@ private static Id generateId(HugeType type, String name) { return IdGenerator.of(type.string() + "-" + name); } + @Override + protected void updateSchema(SchemaElement schema) { + super.updateSchema(schema); + + this.updateCache(schema); + + this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id()); + } + @Override protected void addSchema(SchemaElement schema) { super.addSchema(schema); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java index 7126e4a632..f15f97dc7d 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java @@ -66,6 +66,30 @@ protected void registerMetaHandlers() { // pass } + public void updateIfPresent(Session session, Entry entry) { + // TODO: use fine-grained row lock + synchronized (this.table) { + if (this.queryExist(session, entry)) { + this.insert(session, entry); + if (session != null) { + session.commit(); + } + } + } + } + + public void updateIfAbsent(Session session, Entry entry) { + // TODO: use fine-grained row lock + synchronized (this.table) { + if (!this.queryExist(session, entry)) { + this.insert(session, entry); + if (session != null) { + session.commit(); + } + } + } + } + /** * Mapping query-type to table-type * @param query origin query @@ -112,6 +136,11 @@ public static final String joinTableName(String prefix, String table) { public abstract Number queryNumber(Session session, Query query); +// public abstract boolean queryExist(Session session, Entry entry); + public boolean queryExist(Session session, Entry entry) { + return false; + } + public abstract void insert(Session session, Entry entry); public abstract void delete(Session session, Entry entry); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBStore.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBStore.java index 5d90fc95b1..ead0553531 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBStore.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBStore.java @@ -155,6 +155,12 @@ protected void mutate(BackendAction item) { LOG.debug("[store {}] eliminate entry: {}", this.store, entry); table.eliminate(null, entry); break; + case UPDATE_IF_PRESENT: + table.updateIfPresent(null, entry); + break; + case UPDATE_IF_ABSENT: + table.updateIfAbsent(null, entry); + break; default: throw new BackendException("Unsupported mutate type: %s", item.action()); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBTable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBTable.java index 7d5583dffd..2280488589 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBTable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBTable.java @@ -50,6 +50,7 @@ import com.baidu.hugegraph.util.InsertionOrderUtil; import com.baidu.hugegraph.util.Log; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; public class InMemoryDBTable extends BackendTable { @@ -133,6 +134,12 @@ public void eliminate(BackendSession session, TextBackendEntry entry) { } } + @Override + public boolean queryExist(BackendSession session, TextBackendEntry entry) { + List ids = ImmutableList.of(entry.id()); + return !this.queryById(ids, this.store).isEmpty(); + } + @Override public Number queryNumber(BackendSession session, Query query) { Aggregate aggregate = query.aggregateNotNull(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java index 454ea2af4e..db7d54eaca 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java @@ -414,6 +414,16 @@ public void doRemove(BackendEntry entry) { this.doAction(Action.DELETE, entry); } + @Watched(prefix = "tx") + public void doUpdateIfPresent(BackendEntry entry) { + this.doAction(Action.UPDATE_IF_PRESENT, entry); + } + + @Watched(prefix = "tx") + public void doUpdateIfAbsent(BackendEntry entry) { + this.doAction(Action.UPDATE_IF_ABSENT, entry); + } + protected void doAction(Action action, BackendEntry entry) { LOG.debug("Transaction {} entry {}", action, entry); E.checkNotNull(entry, "entry"); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java index d0496333ef..98cf4309d2 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java @@ -378,13 +378,20 @@ public boolean existsSchemaId(HugeType type, Id id) { } protected void updateSchema(SchemaElement schema) { - this.addSchema(schema); + LOG.debug("SchemaTransaction update {} with id '{}'", + schema.type(), schema.id()); + this.saveSchema(schema, true); } protected void addSchema(SchemaElement schema) { LOG.debug("SchemaTransaction add {} with id '{}'", schema.type(), schema.id()); setCreateTimeIfNeeded(schema); + this.saveSchema(schema, false); + } + + private void saveSchema(SchemaElement schema, boolean update) { + BackendEntry entry = this.serialize(schema); // System schema just put into SystemSchemaStore in memory if (schema.longId() < 0L) { @@ -396,9 +403,19 @@ protected void addSchema(SchemaElement schema) { try { locks.lockWrites(LockUtil.hugeType2Group(schema.type()), schema.id()); + this.beforeWrite(); - this.doInsert(this.serialize(schema)); - this.indexTx.updateNameIndex(schema, false); + + if (update) { + this.doUpdateIfPresent(entry); + // TODO: support updateIfPresent + this.indexTx.updateNameIndex(schema, false); + } else { + // TODO: support updateIfAbsentProperty + this.doInsert(entry); + this.indexTx.updateNameIndex(schema, false); + } + this.afterWrite(); } finally { locks.unlock(); @@ -459,6 +476,11 @@ protected List getAllSchema(HugeType type) { List results = new ArrayList<>(); Query query = new Query(type); Iterator entries = this.query(query).iterator(); + /* + * Can use MapperIterator instead if don't need to debug: + * new MapperIterator<>(entries, entry -> this.deserialize(entry, type)) + * QueryResults.fillList(iter, results); + */ try { while (entries.hasNext()) { BackendEntry entry = entries.next(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/type/define/Action.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/type/define/Action.java index 8411e929ea..255627d53c 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/type/define/Action.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/type/define/Action.java @@ -27,7 +27,11 @@ public enum Action implements SerialEnum { ELIMINATE(3, "eliminate"), - DELETE(4, "delete"); + DELETE(4, "delete"), + + UPDATE_IF_PRESENT(5, "update_if_present"), + + UPDATE_IF_ABSENT(6, "update_if_absent"); private final byte code; private final String name; @@ -61,6 +65,10 @@ public static Action fromCode(byte code) { return ELIMINATE; case 4: return DELETE; + case 5: + return UPDATE_IF_PRESENT; + case 6: + return UPDATE_IF_ABSENT; default: throw new AssertionError("Unsupported action code: " + code); } diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java index cd14d3e6da..56230ac931 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java @@ -473,6 +473,12 @@ private void mutate(Session session, BackendAction item) { case ELIMINATE: table.eliminate(session, entry); break; + case UPDATE_IF_PRESENT: + table.updateIfPresent(session, entry); + break; + case UPDATE_IF_ABSENT: + table.updateIfAbsent(session, entry); + break; default: throw new AssertionError(String.format( "Unsupported mutate action: %s", item.action())); diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTable.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTable.java index 427161bfff..48c9a1af1a 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTable.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTable.java @@ -121,6 +121,14 @@ public void eliminate(Session session, BackendEntry entry) { this.delete(session, entry); } + @Override + public boolean queryExist(Session session, BackendEntry entry) { + Id id = entry.id(); + try (BackendColumnIterator iter = this.queryById(session, id)) { + return iter.hasNext(); + } + } + @Override public Number queryNumber(Session session, Query query) { Aggregate aggregate = query.aggregateNotNull(); From ce5da349f3fd29cbfbcd19d487631055e5cb37fc Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Mon, 28 Feb 2022 20:11:00 +0800 Subject: [PATCH 2/7] implement for other backends Change-Id: Iab9ab42ba0504664ce3320c8905b2d92d01d849a --- .../store/cassandra/CassandraStore.java | 54 +++++++++++-------- .../store/cassandra/CassandraTable.java | 14 +++++ .../hugegraph/backend/store/BackendTable.java | 7 ++- .../backend/store/hbase/HbaseStore.java | 6 +++ .../backend/store/hbase/HbaseTable.java | 8 +++ .../backend/store/mysql/MysqlStore.java | 6 +++ .../backend/store/mysql/MysqlTable.java | 18 ++++++- 7 files changed, 85 insertions(+), 28 deletions(-) diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java index 75bf12f0c8..147664312b 100644 --- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java +++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java @@ -227,17 +227,25 @@ private void mutate(CassandraSessionPool.Session session, LOG.warn("The entry will be ignored due to no change: {}", entry); } + CassandraTable table; + if (!entry.olap()) { + // Oltp table + table = this.table(entry.type()); + } else { + if (entry.type().isIndex()) { + // Olap index + table = this.table(this.olapTableName(entry.type())); + } else { + // Olap vertex + table = this.table(this.olapTableName(entry.subId())); + } + } + switch (item.action()) { case INSERT: - // Insert olap vertex - if (entry.olap()) { - this.table(this.olapTableName(entry.subId())) - .insert(session, entry.row()); - break; - } // Insert entry if (entry.selfChanged()) { - this.table(entry.type()).insert(session, entry.row()); + table.insert(session, entry.row()); } // Insert sub rows (edges) for (CassandraBackendEntry.Row row : entry.subRows()) { @@ -245,15 +253,9 @@ private void mutate(CassandraSessionPool.Session session, } break; case DELETE: - // Delete olap vertex index by index label - if (entry.olap()) { - this.table(this.olapTableName(entry.type())) - .delete(session, entry.row()); - break; - } // Delete entry if (entry.selfChanged()) { - this.table(entry.type()).delete(session, entry.row()); + table.delete(session, entry.row()); } // Delete sub rows (edges) for (CassandraBackendEntry.Row row : entry.subRows()) { @@ -261,15 +263,9 @@ private void mutate(CassandraSessionPool.Session session, } break; case APPEND: - // Append olap vertex index - if (entry.olap()) { - this.table(this.olapTableName(entry.type())) - .append(session, entry.row()); - break; - } // Append entry if (entry.selfChanged()) { - this.table(entry.type()).append(session, entry.row()); + table.append(session, entry.row()); } // Append sub rows (edges) for (CassandraBackendEntry.Row row : entry.subRows()) { @@ -279,13 +275,27 @@ private void mutate(CassandraSessionPool.Session session, case ELIMINATE: // Eliminate entry if (entry.selfChanged()) { - this.table(entry.type()).eliminate(session, entry.row()); + table.eliminate(session, entry.row()); } // Eliminate sub rows (edges) for (CassandraBackendEntry.Row row : entry.subRows()) { this.table(row.type()).eliminate(session, row); } break; + case UPDATE_IF_PRESENT: + if (entry.selfChanged()) { + // TODO: forward to master-writer node + table.updateIfPresent(session, entry.row()); + } + assert entry.subRows().isEmpty() : entry.subRows(); + break; + case UPDATE_IF_ABSENT: + if (entry.selfChanged()) { + // TODO: forward to master-writer node + table.updateIfAbsent(session, entry.row()); + } + assert entry.subRows().isEmpty() : entry.subRows(); + break; default: throw new AssertionError(String.format( "Unsupported mutate action: %s", item.action())); diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java index 548726b4a4..f9d65fe0ff 100644 --- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java +++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java @@ -38,6 +38,7 @@ import com.baidu.hugegraph.backend.query.Aggregate; import com.baidu.hugegraph.backend.query.Condition; import com.baidu.hugegraph.backend.query.Condition.Relation; +import com.baidu.hugegraph.backend.query.IdQuery; import com.baidu.hugegraph.backend.query.Query; import com.baidu.hugegraph.backend.query.Query.Order; import com.baidu.hugegraph.backend.store.BackendEntry; @@ -46,6 +47,7 @@ import com.baidu.hugegraph.exception.NotFoundException; import com.baidu.hugegraph.exception.NotSupportException; import com.baidu.hugegraph.iterator.ExtendableIterator; +import com.baidu.hugegraph.iterator.WrappedIterator; import com.baidu.hugegraph.type.HugeType; import com.baidu.hugegraph.type.define.HugeKeys; import com.baidu.hugegraph.util.CopyUtil; @@ -97,6 +99,18 @@ protected void registerMetaHandlers() { }); } + @Override + public boolean queryExist(CassandraSessionPool.Session session, + CassandraBackendEntry.Row entry) { + Query query = new IdQuery.OneIdQuery(HugeType.UNKNOWN, entry.id()); + Iterator iter = this.query(session, query); + try { + return iter.hasNext(); + } finally { + WrappedIterator.close(iter); + } + } + @Override public Number queryNumber(CassandraSessionPool.Session session, Query query) { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java index f15f97dc7d..9753076ebf 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java @@ -69,6 +69,7 @@ protected void registerMetaHandlers() { public void updateIfPresent(Session session, Entry entry) { // TODO: use fine-grained row lock synchronized (this.table) { + assert !session.hasChanges(); if (this.queryExist(session, entry)) { this.insert(session, entry); if (session != null) { @@ -81,6 +82,7 @@ public void updateIfPresent(Session session, Entry entry) { public void updateIfAbsent(Session session, Entry entry) { // TODO: use fine-grained row lock synchronized (this.table) { + assert !session.hasChanges(); if (!this.queryExist(session, entry)) { this.insert(session, entry); if (session != null) { @@ -136,10 +138,7 @@ public static final String joinTableName(String prefix, String table) { public abstract Number queryNumber(Session session, Query query); -// public abstract boolean queryExist(Session session, Entry entry); - public boolean queryExist(Session session, Entry entry) { - return false; - } + public abstract boolean queryExist(Session session, Entry entry); public abstract void insert(Session session, Entry entry); diff --git a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java index 628048a5d7..9cd5c233a4 100644 --- a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java +++ b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java @@ -215,6 +215,12 @@ private void mutate(Session session, BackendAction item) { case ELIMINATE: table.eliminate(session, entry); break; + case UPDATE_IF_PRESENT: + table.updateIfPresent(session, entry); + break; + case UPDATE_IF_ABSENT: + table.updateIfAbsent(session, entry); + break; default: throw new AssertionError(String.format( "Unsupported mutate action: %s", item.action())); diff --git a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java index 3cca9a5dcc..b4009dd9c5 100644 --- a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java +++ b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java @@ -144,6 +144,14 @@ public void eliminate(Session session, BackendEntry entry) { this.delete(session, entry); } + @Override + public boolean queryExist(Session session, BackendEntry entry) { + Id id = entry.id(); + try (RowIterator iter = this.queryById(session, id)) { + return iter.hasNext(); + } + } + @Override public Number queryNumber(Session session, Query query) { Aggregate aggregate = query.aggregateNotNull(); diff --git a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java index 858b22edb5..90318c8db2 100644 --- a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java +++ b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java @@ -260,6 +260,12 @@ private void mutate(Session session, BackendAction item) { case ELIMINATE: table.eliminate(session, entry.row()); break; + case UPDATE_IF_PRESENT: + table.updateIfPresent(session, entry.row()); + break; + case UPDATE_IF_ABSENT: + table.updateIfAbsent(session, entry.row()); + break; default: throw new AssertionError(String.format( "Unsupported mutate action: %s", item.action())); diff --git a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java index f872b6f56a..727513334d 100644 --- a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java +++ b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java @@ -39,6 +39,7 @@ import com.baidu.hugegraph.backend.query.Aggregate; import com.baidu.hugegraph.backend.query.Condition; import com.baidu.hugegraph.backend.query.ConditionQuery; +import com.baidu.hugegraph.backend.query.IdQuery; import com.baidu.hugegraph.backend.query.Query; import com.baidu.hugegraph.backend.store.BackendEntry; import com.baidu.hugegraph.backend.store.BackendTable; @@ -48,6 +49,8 @@ import com.baidu.hugegraph.backend.store.mysql.MysqlSessions.Session; import com.baidu.hugegraph.exception.NotFoundException; import com.baidu.hugegraph.iterator.ExtendableIterator; +import com.baidu.hugegraph.iterator.WrappedIterator; +import com.baidu.hugegraph.type.HugeType; import com.baidu.hugegraph.type.define.HugeKeys; import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.Log; @@ -328,6 +331,17 @@ public void eliminate(Session session, MysqlBackendEntry.Row entry) { this.delete(session, entry); } + @Override + public boolean queryExist(Session session, MysqlBackendEntry.Row entry) { + Query query = new IdQuery.OneIdQuery(HugeType.UNKNOWN, entry.id()); + Iterator iter = this.query(session, query); + try { + return iter.hasNext(); + } finally { + WrappedIterator.close(iter); + } + } + @Override public Number queryNumber(Session session, Query query) { Aggregate aggregate = query.aggregateNotNull(); @@ -353,8 +367,8 @@ public Iterator query(Session session, Query query) { } protected Iterator query(Session session, Query query, - BiFunction> - parser) { + BiFunction> parser) { ExtendableIterator rs = new ExtendableIterator<>(); if (query.limit() == 0L && !query.noLimit()) { From 73c7fb810d47c0b39cd5a94bbd07fc1790703593 Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Tue, 1 Mar 2022 18:15:24 +0800 Subject: [PATCH 3/7] fix mysql/pgsql Change-Id: I397fd5d300c563cd9298b76ab336bff78f2564ea --- .../backend/store/mysql/MysqlTable.java | 201 +++++++++++++----- .../store/postgresql/PostgresqlTable.java | 47 ++-- 2 files changed, 183 insertions(+), 65 deletions(-) diff --git a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java index 727513334d..69efd96bda 100644 --- a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java +++ b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java @@ -63,10 +63,12 @@ public abstract class MysqlTable private static final String DECIMAL = "DECIMAL"; - // The template for insert and delete statements + // The template cache for insert and delete statements private String insertTemplate; private String insertTemplateTtl; private String deleteTemplate; + private String updateIfPresentTemplate; + private String updateIfAbsentTemplate; private final MysqlShardSplitter shardSplitter; @@ -75,6 +77,9 @@ public MysqlTable(String table) { this.insertTemplate = null; this.insertTemplateTtl = null; this.deleteTemplate = null; + this.updateIfPresentTemplate = null; + this.updateIfAbsentTemplate = null; + this.shardSplitter = new MysqlShardSplitter(this.table()); } @@ -178,31 +183,56 @@ protected List idColumnValue(Id id) { return ImmutableList.of(id.asObject()); } - protected String buildInsertTemplate(MysqlBackendEntry.Row entry) { - if (entry.ttl() != 0L) { - return this.buildInsertTemplateWithTtl(entry); - } - if (this.insertTemplate != null) { - return this.insertTemplate; + protected void insertOrUpdate(Session session, String template, + List params) { + PreparedStatement insertStmt; + try { + // Create or get insert prepare statement + insertStmt = session.prepareStatement(template); + int i = 1; + for (Object param : params) { + insertStmt.setObject(i++, param); + } + } catch (SQLException e) { + throw new BackendException("Failed to prepare statement '%s' " + + "with params: %s", template, params); } - - this.insertTemplate = this.buildInsertTemplateForce(entry); - return this.insertTemplate; + session.add(insertStmt); } - protected String buildInsertTemplateWithTtl(MysqlBackendEntry.Row entry) { - assert entry.ttl() != 0L; - if (this.insertTemplateTtl != null) { + protected final String buildUpdateTemplate(MysqlBackendEntry.Row entry) { + if (entry.ttl() != 0L) { + if (this.insertTemplateTtl != null) { + return this.insertTemplateTtl; + } + + this.insertTemplateTtl = this.buildUpdateForcedTemplate(entry); return this.insertTemplateTtl; + } else { + if (this.insertTemplate != null) { + return this.insertTemplate; + } + + this.insertTemplate = this.buildUpdateForcedTemplate(entry); + return this.insertTemplate; } + } - this.insertTemplateTtl = this.buildInsertTemplateForce(entry); - return this.insertTemplateTtl; + protected String buildUpdateForcedTemplate(MysqlBackendEntry.Row entry) { + StringBuilder insert = new StringBuilder(); + insert.append("REPLACE INTO ").append(this.table()); + return this.buildInsertKeys(insert, entry); } - protected String buildInsertTemplateForce(MysqlBackendEntry.Row entry) { + protected String buildUpdateIfAbsentTemplate(MysqlBackendEntry.Row entry) { StringBuilder insert = new StringBuilder(); - insert.append("REPLACE INTO ").append(this.table()).append(" ("); + insert.append("INSERT IGNORE INTO ").append(this.table()); + return this.buildInsertKeys(insert, entry); + } + + protected String buildInsertKeys(StringBuilder insert, + MysqlBackendEntry.Row entry) { + insert.append(" ("); int i = 0; int n = entry.columns().size(); @@ -213,7 +243,7 @@ protected String buildInsertTemplateForce(MysqlBackendEntry.Row entry) { } } insert.append(") VALUES ("); - // Fill with '?' + // Fill with '?' as a placeholder for (i = 0; i < n; i++) { insert.append("?"); if (i != n - 1) { @@ -225,11 +255,76 @@ protected String buildInsertTemplateForce(MysqlBackendEntry.Row entry) { return insert.toString(); } - protected String buildDeleteTemplate(List idNames) { - if (this.deleteTemplate != null) { - return this.deleteTemplate; + protected List buildUpdateForcedParams(MysqlBackendEntry.Row entry) { + return this.buildColumnsParams(entry); + } + + protected List buildUpdateIfAbsentParams(MysqlBackendEntry.Row entry) { + return this.buildColumnsParams(entry); + } + + protected List buildColumnsParams(MysqlBackendEntry.Row entry) { + return this.buildColumnsParams(entry, null); + } + + protected List buildColumnsParams(MysqlBackendEntry.Row entry, + List skipKeys) { + List objects = new ArrayList<>(); + for (Map.Entry e : entry.columns().entrySet()) { + HugeKeys key = e.getKey(); + Object value = e.getValue(); + if (skipKeys != null && skipKeys.contains(key)) { + continue; + } + String type = this.tableDefine().columns().get(key); + if (type.startsWith(DECIMAL)) { + value = new BigDecimal(value.toString()); + } + objects.add(value); } + return objects; + } + + protected String buildUpdateIfPresentTemplate(MysqlBackendEntry.Row entry) { + StringBuilder update = new StringBuilder(); + update.append("UPDATE ").append(this.table()); + update.append(" SET "); + + List idNames = this.idColumnName(); + + int i = 0; + int size = entry.columns().size(); + for (HugeKeys key : entry.columns().keySet()) { + if (idNames.contains(key)) { + size--; + continue; + } + update.append(formatKey(key)); + update.append("=?"); + if (++i != size) { + update.append(", "); + } + } + + WhereBuilder where = this.newWhereBuilder(); + where.and(formatKeys(idNames), "="); + update.append(where.build()); + + return update.toString(); + } + + protected List buildUpdateIfPresentParams(MysqlBackendEntry.Row entry) { + List idNames = this.idColumnName(); + List params = this.buildColumnsParams(entry, idNames); + + List idValues = this.idColumnValue(entry); + params.addAll(idValues); + + return params; + } + + protected String buildDeleteTemplate(List idNames) { StringBuilder delete = new StringBuilder(); delete.append("DELETE FROM ").append(this.table()); this.appendPartition(delete); @@ -238,8 +333,7 @@ protected String buildDeleteTemplate(List idNames) { where.and(formatKeys(idNames), "="); delete.append(where.build()); - this.deleteTemplate = delete.toString(); - return this.deleteTemplate; + return delete.toString(); } protected String buildDropTemplate() { @@ -259,40 +353,21 @@ protected void appendPartition(StringBuilder sb) { */ @Override public void insert(Session session, MysqlBackendEntry.Row entry) { - String template = this.buildInsertTemplate(entry); - - PreparedStatement insertStmt; - try { - // Create or get insert prepare statement - insertStmt = session.prepareStatement(template); - int i = 1; - for (Object object : this.buildInsertObjects(entry)) { - insertStmt.setObject(i++, object); - } - } catch (SQLException e) { - throw new BackendException("Failed to prepare statement '%s'" + - "for entry: %s", template, entry); - } - session.add(insertStmt); - } - - protected List buildInsertObjects(MysqlBackendEntry.Row entry) { - List objects = new ArrayList<>(); - for (Map.Entry e : entry.columns().entrySet()) { - Object value = e.getValue(); - String type = this.tableDefine().columns().get(e.getKey()); - if (type.startsWith(DECIMAL)) { - value = new BigDecimal(value.toString()); - } - objects.add(value); - } - return objects; + String template = this.buildUpdateTemplate(entry); + List params = this.buildUpdateForcedParams(entry); + this.insertOrUpdate(session, template, params); } @Override public void delete(Session session, MysqlBackendEntry.Row entry) { List idNames = this.idColumnName(); - String template = this.buildDeleteTemplate(idNames); + + String template = this.deleteTemplate; + if (template == null) { + template = this.buildDeleteTemplate(idNames); + this.deleteTemplate = template; + } + PreparedStatement deleteStmt; try { deleteStmt = session.prepareStatement(template); @@ -331,6 +406,28 @@ public void eliminate(Session session, MysqlBackendEntry.Row entry) { this.delete(session, entry); } + @Override + public void updateIfPresent(Session session, MysqlBackendEntry.Row entry) { + String template = this.updateIfPresentTemplate; + if (template == null) { + template = this.buildUpdateIfPresentTemplate(entry); + this.updateIfPresentTemplate = template; + } + List params = this.buildUpdateIfPresentParams(entry); + this.insertOrUpdate(session, template, params); + } + + @Override + public void updateIfAbsent(Session session, MysqlBackendEntry.Row entry) { + String template = this.updateIfAbsentTemplate; + if (template == null) { + template = this.buildUpdateIfAbsentTemplate(entry); + this.updateIfAbsentTemplate = template; + } + List params = this.buildUpdateIfAbsentParams(entry); + this.insertOrUpdate(session, template, params); + } + @Override public boolean queryExist(Session session, MysqlBackendEntry.Row entry) { Query query = new IdQuery.OneIdQuery(HugeType.UNKNOWN, entry.id()); diff --git a/hugegraph-postgresql/src/main/java/com/baidu/hugegraph/backend/store/postgresql/PostgresqlTable.java b/hugegraph-postgresql/src/main/java/com/baidu/hugegraph/backend/store/postgresql/PostgresqlTable.java index f811274c48..e3b0bea0be 100644 --- a/hugegraph-postgresql/src/main/java/com/baidu/hugegraph/backend/store/postgresql/PostgresqlTable.java +++ b/hugegraph-postgresql/src/main/java/com/baidu/hugegraph/backend/store/postgresql/PostgresqlTable.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.util.Strings; +import com.baidu.hugegraph.backend.serializer.TableBackendEntry.Row; import com.baidu.hugegraph.backend.store.mysql.MysqlBackendEntry; import com.baidu.hugegraph.backend.store.mysql.MysqlSessions.Session; import com.baidu.hugegraph.backend.store.mysql.MysqlTable; @@ -54,15 +55,31 @@ protected String engine(Session session) { } @Override - protected List buildInsertObjects(MysqlBackendEntry.Row entry) { - List objects = new ArrayList<>(); - objects.addAll(super.buildInsertObjects(entry)); - objects.addAll(super.buildInsertObjects(entry)); - return objects; + protected String buildUpdateForcedTemplate(MysqlBackendEntry.Row entry) { + return this.buildInsertKeys(entry, false); } @Override - protected String buildInsertTemplateForce(MysqlBackendEntry.Row entry) { + protected List buildUpdateForcedParams(MysqlBackendEntry.Row entry) { + List params = new ArrayList<>(); + List allColumns = this.buildColumnsParams(entry); + params.addAll(allColumns); + params.addAll(allColumns); + return params; + } + + @Override + protected String buildUpdateIfAbsentTemplate(Row entry) { + return this.buildInsertKeys(entry, true); + } + + @Override + protected List buildUpdateIfAbsentParams(MysqlBackendEntry.Row entry) { + return this.buildColumnsParams(entry); + } + + protected String buildInsertKeys(MysqlBackendEntry.Row entry, + boolean ignoreConflicts) { StringBuilder insert = new StringBuilder(); insert.append("INSERT INTO ").append(this.table()).append(" ("); @@ -95,13 +112,17 @@ protected String buildInsertTemplateForce(MysqlBackendEntry.Row entry) { } insert.append(")"); - i = 0; - size = entry.columns().keySet().size(); - insert.append(" DO UPDATE SET "); - for (HugeKeys key : entry.columns().keySet()) { - insert.append(formatKey(key)).append(" = ?"); - if (++i != size) { - insert.append(", "); + if (ignoreConflicts) { + insert.append(" DO NOTHING"); + } else { + i = 0; + size = entry.columns().keySet().size(); + insert.append(" DO UPDATE SET "); + for (HugeKeys key : entry.columns().keySet()) { + insert.append(formatKey(key)).append(" = ?"); + if (++i != size) { + insert.append(", "); + } } } From 40a5196a5b815f63b3885be4cf9aa03ca59c6ac3 Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Wed, 2 Mar 2022 16:44:51 +0800 Subject: [PATCH 4/7] call updateIfAbsent() for schema insert Change-Id: I02989962791fa369600f8dedbfd56e2b40c79077 --- .../com/baidu/hugegraph/backend/store/BackendTable.java | 4 ++-- .../com/baidu/hugegraph/backend/tx/SchemaTransaction.java | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java index 9753076ebf..f8ec5fe238 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java @@ -69,7 +69,7 @@ protected void registerMetaHandlers() { public void updateIfPresent(Session session, Entry entry) { // TODO: use fine-grained row lock synchronized (this.table) { - assert !session.hasChanges(); + assert session == null || !session.hasChanges(); if (this.queryExist(session, entry)) { this.insert(session, entry); if (session != null) { @@ -82,7 +82,7 @@ public void updateIfPresent(Session session, Entry entry) { public void updateIfAbsent(Session session, Entry entry) { // TODO: use fine-grained row lock synchronized (this.table) { - assert !session.hasChanges(); + assert session == null || !session.hasChanges(); if (!this.queryExist(session, entry)) { this.insert(session, entry); if (session != null) { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java index 98cf4309d2..340b66928c 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java @@ -408,11 +408,11 @@ private void saveSchema(SchemaElement schema, boolean update) { if (update) { this.doUpdateIfPresent(entry); - // TODO: support updateIfPresent + // TODO: also support updateIfPresent for index-update this.indexTx.updateNameIndex(schema, false); } else { - // TODO: support updateIfAbsentProperty - this.doInsert(entry); + // TODO: support updateIfAbsentProperty (property: label name) + this.doUpdateIfAbsent(entry); this.indexTx.updateNameIndex(schema, false); } From c1e9ceb9823c0288ea2cf9af1f8d0643b272781a Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Wed, 2 Mar 2022 20:52:15 +0800 Subject: [PATCH 5/7] add updateSchema() api for schema tx Change-Id: I760db3c964f913d3c552c1d85193d9f3e0d401a4 --- .../hugegraph/auth/HugeGraphAuthProxy.java | 24 +++++++++ .../java/com/baidu/hugegraph/HugeGraph.java | 12 ++++- .../baidu/hugegraph/StandardHugeGraph.java | 36 ++++++++++--- .../backend/tx/SchemaTransaction.java | 52 +++++++++++++------ .../schema/builder/EdgeLabelBuilder.java | 4 +- .../schema/builder/IndexLabelBuilder.java | 6 +-- .../schema/builder/PropertyKeyBuilder.java | 4 +- .../schema/builder/VertexLabelBuilder.java | 4 +- 8 files changed, 108 insertions(+), 34 deletions(-) diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java index d3efb5aa35..33c5cfa283 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java @@ -195,6 +195,12 @@ public Id addPropertyKey(PropertyKey key) { return this.hugegraph.addPropertyKey(key); } + @Override + public void updatePropertyKey(PropertyKey key) { + verifySchemaPermission(HugePermission.WRITE, key); + this.hugegraph.updatePropertyKey(key); + } + @Override public Id removePropertyKey(Id key) { PropertyKey pkey = this.hugegraph.propertyKey(key); @@ -240,6 +246,12 @@ public void addVertexLabel(VertexLabel label) { this.hugegraph.addVertexLabel(label); } + @Override + public void updateVertexLabel(VertexLabel label) { + verifySchemaPermission(HugePermission.WRITE, label); + this.hugegraph.updateVertexLabel(label); + } + @Override public Id removeVertexLabel(Id id) { VertexLabel label = this.hugegraph.vertexLabel(id); @@ -293,6 +305,12 @@ public void addEdgeLabel(EdgeLabel label) { this.hugegraph.addEdgeLabel(label); } + @Override + public void updateEdgeLabel(EdgeLabel label) { + verifySchemaPermission(HugePermission.WRITE, label); + this.hugegraph.updateEdgeLabel(label); + } + @Override public Id removeEdgeLabel(Id id) { EdgeLabel label = this.hugegraph.edgeLabel(id); @@ -339,6 +357,12 @@ public void addIndexLabel(SchemaLabel schemaLabel, IndexLabel indexLabel) { this.hugegraph.addIndexLabel(schemaLabel, indexLabel); } + @Override + public void updateIndexLabel(IndexLabel label) { + verifySchemaPermission(HugePermission.WRITE, label); + this.hugegraph.updateIndexLabel(label); + } + @Override public Id removeIndexLabel(Id id) { IndexLabel label = this.hugegraph.indexLabel(id); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java index ca388899ce..18d9f795e5 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java @@ -72,6 +72,8 @@ public interface HugeGraph extends Graph { Id addPropertyKey(PropertyKey key); + void updatePropertyKey(PropertyKey key); + Id removePropertyKey(Id key); Id clearPropertyKey(PropertyKey propertyKey); @@ -84,7 +86,9 @@ public interface HugeGraph extends Graph { boolean existsPropertyKey(String key); - void addVertexLabel(VertexLabel vertexLabel); + void addVertexLabel(VertexLabel label); + + void updateVertexLabel(VertexLabel label); Id removeVertexLabel(Id label); @@ -100,7 +104,9 @@ public interface HugeGraph extends Graph { boolean existsLinkLabel(Id vertexLabel); - void addEdgeLabel(EdgeLabel edgeLabel); + void addEdgeLabel(EdgeLabel label); + + void updateEdgeLabel(EdgeLabel label); Id removeEdgeLabel(Id label); @@ -116,6 +122,8 @@ public interface HugeGraph extends Graph { void addIndexLabel(SchemaLabel schemaLabel, IndexLabel indexLabel); + void updateIndexLabel(IndexLabel label); + Id removeIndexLabel(Id label); Id rebuildIndex(SchemaElement schema); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java index 217fc5cbba..72f8838899 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java @@ -714,6 +714,12 @@ public Id addPropertyKey(PropertyKey pkey) { return this.schemaTransaction().addPropertyKey(pkey); } + @Override + public void updatePropertyKey(PropertyKey pkey) { + assert this.name.equals(pkey.graph().name()); + this.schemaTransaction().updatePropertyKey(pkey); + } + @Override public Id removePropertyKey(Id pkey) { if (this.propertyKey(pkey).olap()) { @@ -756,9 +762,15 @@ public boolean existsPropertyKey(String name) { } @Override - public void addVertexLabel(VertexLabel vertexLabel) { - assert this.name.equals(vertexLabel.graph().name()); - this.schemaTransaction().addVertexLabel(vertexLabel); + public void addVertexLabel(VertexLabel label) { + assert this.name.equals(label.graph().name()); + this.schemaTransaction().addVertexLabel(label); + } + + @Override + public void updateVertexLabel(VertexLabel label) { + assert this.name.equals(label.graph().name()); + this.schemaTransaction().updateVertexLabel(label); } @Override @@ -812,9 +824,15 @@ public boolean existsLinkLabel(Id vertexLabel) { } @Override - public void addEdgeLabel(EdgeLabel edgeLabel) { - assert this.name.equals(edgeLabel.graph().name()); - this.schemaTransaction().addEdgeLabel(edgeLabel); + public void addEdgeLabel(EdgeLabel label) { + assert this.name.equals(label.graph().name()); + this.schemaTransaction().addEdgeLabel(label); + } + + @Override + public void updateEdgeLabel(EdgeLabel label) { + assert this.name.equals(label.graph().name()); + this.schemaTransaction().updateEdgeLabel(label); } @Override @@ -863,6 +881,12 @@ public void addIndexLabel(SchemaLabel schemaLabel, IndexLabel indexLabel) { this.schemaTransaction().addIndexLabel(schemaLabel, indexLabel); } + @Override + public void updateIndexLabel(IndexLabel label) { + assert this.name.equals(label.graph().name()); + this.schemaTransaction().updateIndexLabel(label); + } + @Override public Id removeIndexLabel(Id id) { return this.schemaTransaction().removeIndexLabel(id); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java index 340b66928c..7887fd5141 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java @@ -124,10 +124,15 @@ public List getIndexLabels() { @Watched(prefix = "schema") public Id addPropertyKey(PropertyKey propertyKey) { this.addSchema(propertyKey); - if (propertyKey.olap()) { - return this.createOlapPk(propertyKey); + if (!propertyKey.olap()) { + return IdGenerator.ZERO; } - return IdGenerator.ZERO; + return this.createOlapPk(propertyKey); + } + + @Watched(prefix = "schema") + public void updatePropertyKey(PropertyKey propertyKey) { + this.updateSchema(propertyKey); } @Watched(prefix = "schema") @@ -185,6 +190,11 @@ public void addVertexLabel(VertexLabel vertexLabel) { this.addSchema(vertexLabel); } + @Watched(prefix = "schema") + public void updateVertexLabel(VertexLabel vertexLabel) { + this.updateSchema(vertexLabel); + } + @Watched(prefix = "schema") public VertexLabel getVertexLabel(Id id) { E.checkArgumentNotNull(id, "Vertex label id can't be null"); @@ -217,6 +227,11 @@ public void addEdgeLabel(EdgeLabel edgeLabel) { this.addSchema(edgeLabel); } + @Watched(prefix = "schema") + public void updateEdgeLabel(EdgeLabel edgeLabel) { + this.updateSchema(edgeLabel); + } + @Watched(prefix = "schema") public EdgeLabel getEdgeLabel(Id id) { E.checkArgumentNotNull(id, "Edge label id can't be null"); @@ -239,49 +254,54 @@ public Id removeEdgeLabel(Id id) { } @Watched(prefix = "schema") - public void addIndexLabel(SchemaLabel schemaLabel, IndexLabel indexLabel) { + public void addIndexLabel(SchemaLabel baseLabel, IndexLabel indexLabel) { this.addSchema(indexLabel); /* * Update index name in base-label(VL/EL) * TODO: should wrap update base-label and create index in one tx. */ - if (schemaLabel.equals(VertexLabel.OLAP_VL)) { + if (baseLabel.equals(VertexLabel.OLAP_VL)) { return; } // FIXME: move schemaLabel update into updateSchema() lock block instead - synchronized (schemaLabel) { - schemaLabel.addIndexLabel(indexLabel.id()); - this.updateSchema(schemaLabel); + synchronized (baseLabel) { + baseLabel.addIndexLabel(indexLabel.id()); + this.updateSchema(baseLabel); } } + @Watched(prefix = "schema") + public void updateIndexLabel(IndexLabel indexLabel) { + this.updateSchema(indexLabel); + } + @Watched(prefix = "schema") public void removeIndexLabelFromBaseLabel(IndexLabel indexLabel) { HugeType baseType = indexLabel.baseType(); Id baseValue = indexLabel.baseValue(); - SchemaLabel schemaLabel; + SchemaLabel baseLabel; if (baseType == HugeType.VERTEX_LABEL) { - schemaLabel = this.getVertexLabel(baseValue); + baseLabel = this.getVertexLabel(baseValue); } else { assert baseType == HugeType.EDGE_LABEL; - schemaLabel = this.getEdgeLabel(baseValue); + baseLabel = this.getEdgeLabel(baseValue); } - if (schemaLabel == null) { + if (baseLabel == null) { LOG.info("The base label '{}' of index label '{}' " + "may be deleted before", baseValue, indexLabel); return; } - if (schemaLabel.equals(VertexLabel.OLAP_VL)) { + if (baseLabel.equals(VertexLabel.OLAP_VL)) { return; } // FIXME: move schemaLabel update into updateSchema() lock block instead - synchronized (schemaLabel) { - schemaLabel.removeIndexLabel(indexLabel.id()); - this.updateSchema(schemaLabel); + synchronized (baseLabel) { + baseLabel.removeIndexLabel(indexLabel.id()); + this.updateSchema(baseLabel); } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/EdgeLabelBuilder.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/EdgeLabelBuilder.java index 338b45fb2e..13b84d2be6 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/EdgeLabelBuilder.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/EdgeLabelBuilder.java @@ -262,7 +262,7 @@ public EdgeLabel append() { edgeLabel.nullableKey(propertyKey.id()); } edgeLabel.userdata(this.userdata); - this.graph().addEdgeLabel(edgeLabel); + this.graph().updateEdgeLabel(edgeLabel); return edgeLabel; } @@ -280,7 +280,7 @@ public EdgeLabel eliminate() { Userdata.check(this.userdata, Action.ELIMINATE); edgeLabel.removeUserdata(this.userdata); - this.graph().addEdgeLabel(edgeLabel); + this.graph().updateEdgeLabel(edgeLabel); return edgeLabel; } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/IndexLabelBuilder.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/IndexLabelBuilder.java index 44b21f4bec..ab00d7f81b 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/IndexLabelBuilder.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/IndexLabelBuilder.java @@ -287,8 +287,7 @@ public IndexLabel append() { this.checkStableVars(); Userdata.check(this.userdata, Action.APPEND); indexLabel.userdata(this.userdata); - SchemaLabel schemaLabel = indexLabel.baseLabel(); - this.graph().addIndexLabel(schemaLabel, indexLabel); + this.graph().updateIndexLabel(indexLabel); return indexLabel; } @@ -303,8 +302,7 @@ public IndexLabel eliminate() { Userdata.check(this.userdata, Action.ELIMINATE); indexLabel.removeUserdata(this.userdata); - SchemaLabel schemaLabel = indexLabel.baseLabel(); - this.graph().addIndexLabel(schemaLabel, indexLabel); + this.graph().updateIndexLabel(indexLabel); return indexLabel; } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/PropertyKeyBuilder.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/PropertyKeyBuilder.java index 965b5b6fed..92941cfcc1 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/PropertyKeyBuilder.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/PropertyKeyBuilder.java @@ -194,7 +194,7 @@ public PropertyKey append() { Userdata.check(this.userdata, Action.APPEND); propertyKey.userdata(this.userdata); - this.graph().addPropertyKey(propertyKey); + this.graph().updatePropertyKey(propertyKey); return propertyKey; } @@ -209,7 +209,7 @@ public PropertyKey eliminate() { Userdata.check(this.userdata, Action.ELIMINATE); propertyKey.removeUserdata(this.userdata); - this.graph().addPropertyKey(propertyKey); + this.graph().updatePropertyKey(propertyKey); return propertyKey; } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/VertexLabelBuilder.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/VertexLabelBuilder.java index 75fb3285c1..533245d717 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/VertexLabelBuilder.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/VertexLabelBuilder.java @@ -228,7 +228,7 @@ public VertexLabel append() { vertexLabel.nullableKey(propertyKey.id()); } vertexLabel.userdata(this.userdata); - this.graph().addVertexLabel(vertexLabel); + this.graph().updateVertexLabel(vertexLabel); return vertexLabel; } @@ -246,7 +246,7 @@ public VertexLabel eliminate() { Userdata.check(this.userdata, Action.ELIMINATE); vertexLabel.removeUserdata(this.userdata); - this.graph().addVertexLabel(vertexLabel); + this.graph().updateVertexLabel(vertexLabel); return vertexLabel; } From 327acaedc2060f484c3417b9a3790494b796755b Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Mon, 6 Jun 2022 11:57:02 +0800 Subject: [PATCH 6/7] Do schema update in the lock block Change-Id: I2daad8825939b04ff9b1374fff930ac9c4173f72 --- .../cache/CachedSchemaTransaction.java | 6 ++- .../backend/tx/SchemaTransaction.java | 54 +++++++++++-------- 2 files changed, 35 insertions(+), 25 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java index fb3fe79c2b..8c42b28950 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; import com.baidu.hugegraph.HugeGraphParams; import com.baidu.hugegraph.backend.id.Id; @@ -210,8 +211,9 @@ private static Id generateId(HugeType type, String name) { } @Override - protected void updateSchema(SchemaElement schema) { - super.updateSchema(schema); + protected void updateSchema(SchemaElement schema, + Consumer updateCallback) { + super.updateSchema(schema, updateCallback); this.updateCache(schema); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java index 7887fd5141..0c79ac714b 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.function.Consumer; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator; @@ -132,7 +133,7 @@ public Id addPropertyKey(PropertyKey propertyKey) { @Watched(prefix = "schema") public void updatePropertyKey(PropertyKey propertyKey) { - this.updateSchema(propertyKey); + this.updateSchema(propertyKey, null); } @Watched(prefix = "schema") @@ -192,7 +193,7 @@ public void addVertexLabel(VertexLabel vertexLabel) { @Watched(prefix = "schema") public void updateVertexLabel(VertexLabel vertexLabel) { - this.updateSchema(vertexLabel); + this.updateSchema(vertexLabel, null); } @Watched(prefix = "schema") @@ -229,7 +230,7 @@ public void addEdgeLabel(EdgeLabel edgeLabel) { @Watched(prefix = "schema") public void updateEdgeLabel(EdgeLabel edgeLabel) { - this.updateSchema(edgeLabel); + this.updateSchema(edgeLabel, null); } @Watched(prefix = "schema") @@ -255,26 +256,25 @@ public Id removeEdgeLabel(Id id) { @Watched(prefix = "schema") public void addIndexLabel(SchemaLabel baseLabel, IndexLabel indexLabel) { - this.addSchema(indexLabel); - /* - * Update index name in base-label(VL/EL) + * Create index and update index name in base-label(VL/EL) * TODO: should wrap update base-label and create index in one tx. */ + this.addSchema(indexLabel); + if (baseLabel.equals(VertexLabel.OLAP_VL)) { return; } - // FIXME: move schemaLabel update into updateSchema() lock block instead - synchronized (baseLabel) { + this.updateSchema(baseLabel, schema -> { + // NOTE: Do schema update in the lock block baseLabel.addIndexLabel(indexLabel.id()); - this.updateSchema(baseLabel); - } + }); } @Watched(prefix = "schema") public void updateIndexLabel(IndexLabel indexLabel) { - this.updateSchema(indexLabel); + this.updateSchema(indexLabel, null); } @Watched(prefix = "schema") @@ -298,11 +298,10 @@ public void removeIndexLabelFromBaseLabel(IndexLabel indexLabel) { return; } - // FIXME: move schemaLabel update into updateSchema() lock block instead - synchronized (baseLabel) { + this.updateSchema(baseLabel, schema -> { + // NOTE: Do schema update in the lock block baseLabel.removeIndexLabel(indexLabel.id()); - this.updateSchema(baseLabel); - } + }); } @Watched(prefix = "schema") @@ -388,8 +387,11 @@ public void updateSchemaStatus(SchemaElement schema, SchemaStatus status) { LOG.warn("Can't update schema '{}', it may be deleted", schema); return; } - schema.status(status); - this.updateSchema(schema); + + this.updateSchema(schema, schemaToUpdate -> { + // NOTE: Do schema update in the lock block + schema.status(status); + }); } @Watched(prefix = "schema") @@ -397,22 +399,22 @@ public boolean existsSchemaId(HugeType type, Id id) { return this.getSchema(type, id) != null; } - protected void updateSchema(SchemaElement schema) { + protected void updateSchema(SchemaElement schema, + Consumer updateCallback) { LOG.debug("SchemaTransaction update {} with id '{}'", schema.type(), schema.id()); - this.saveSchema(schema, true); + this.saveSchema(schema, true, updateCallback); } protected void addSchema(SchemaElement schema) { LOG.debug("SchemaTransaction add {} with id '{}'", schema.type(), schema.id()); setCreateTimeIfNeeded(schema); - this.saveSchema(schema, false); + this.saveSchema(schema, false, null); } - private void saveSchema(SchemaElement schema, boolean update) { - BackendEntry entry = this.serialize(schema); - + private void saveSchema(SchemaElement schema, boolean update, + Consumer updateCallback) { // System schema just put into SystemSchemaStore in memory if (schema.longId() < 0L) { this.systemSchemaStore.add(schema); @@ -424,6 +426,12 @@ private void saveSchema(SchemaElement schema, boolean update) { locks.lockWrites(LockUtil.hugeType2Group(schema.type()), schema.id()); + if (updateCallback != null) { + // NOTE: Do schema update in the lock block + updateCallback.accept(schema); + } + BackendEntry entry = this.serialize(schema); + this.beforeWrite(); if (update) { From cdac3ef883e328c2ab256e883ecf2c1cf48f8bf5 Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Mon, 6 Jun 2022 13:44:47 +0800 Subject: [PATCH 7/7] fix '~task_status' not indexed in label '~task' Change-Id: I6383df825a003ff0e3f619398ee7c2873194eb56 --- .../hugegraph/backend/tx/SchemaTransaction.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java index 0c79ac714b..77ebba1d8e 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java @@ -415,21 +415,22 @@ protected void addSchema(SchemaElement schema) { private void saveSchema(SchemaElement schema, boolean update, Consumer updateCallback) { - // System schema just put into SystemSchemaStore in memory - if (schema.longId() < 0L) { - this.systemSchemaStore.add(schema); - return; - } - + // Lock for schema update LockUtil.Locks locks = new LockUtil.Locks(this.params().name()); try { - locks.lockWrites(LockUtil.hugeType2Group(schema.type()), - schema.id()); + locks.lockWrites(LockUtil.hugeType2Group(schema.type()), schema.id()); if (updateCallback != null) { // NOTE: Do schema update in the lock block updateCallback.accept(schema); } + + // System schema just put into SystemSchemaStore in memory + if (schema.longId() < 0L) { + this.systemSchemaStore.add(schema); + return; + } + BackendEntry entry = this.serialize(schema); this.beforeWrite();