Skip to content

Commit

Permalink
speed up tinkerpop test
Browse files Browse the repository at this point in the history
how to:
1.speed up rocksdb backend by truncating tables
2.don't need to clear if the database is empty
3.don't need to clear variables if it's empty
4.don't need to commit hbase each len-prefix for delete index-label

potential problems:
1.mysql may block if create task vertex-label when truncating table
2.rocksdb may miss CF if create task vertex-label when truncating table
3.hbase may block if truncate hbase with version<2.0

improve #14

Change-Id: I4b2393aea8b0fc63c1886e984a576a1f5808b25c
  • Loading branch information
javeme authored and zhoney committed Nov 21, 2018
1 parent 255df7e commit 5ef25cf
Show file tree
Hide file tree
Showing 39 changed files with 762 additions and 309 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,10 @@
import javax.ws.rs.core.Context;
import javax.ws.rs.core.SecurityContext;

import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.slf4j.Logger;

import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.core.GraphManager;
import com.baidu.hugegraph.schema.SchemaManager;
import com.baidu.hugegraph.server.RestServer;
import com.baidu.hugegraph.type.define.GraphMode;
import com.baidu.hugegraph.util.E;
Expand Down Expand Up @@ -129,27 +126,7 @@ public void clear(@Context GraphManager manager,
throw new IllegalArgumentException(String.format(
"Please take the message: %s", CONFIRM_CLEAR));
}

// Clear vertex and edge
commit(g, () -> {
g.traversal().E().toStream().forEach(Edge::remove);
g.traversal().V().toStream().forEach(Vertex::remove);
});

// Schema operation will auto commit
SchemaManager schema = g.schema();
schema.getIndexLabels().forEach(elem -> {
schema.indexLabel(elem.name()).remove();
});
schema.getEdgeLabels().forEach(elem -> {
schema.edgeLabel(elem.name()).remove();
});
schema.getVertexLabels().forEach(elem -> {
schema.vertexLabel(elem.name()).remove();
});
schema.getPropertyKeys().forEach(elem -> {
schema.propertyKey(elem.name()).remove();
});
g.truncateBackend();
}

@PUT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ public void clearBackend() {
this.hugegraph.clearBackend();
}

@Override
public void truncateBackend() {
this.verifyPermission(ROLE_ADMIN);
this.hugegraph.truncateBackend();
}

private void verifyPermission() {
/*
* The owner role should match the graph name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ public void open() {
this.session = cluster().connect(keyspace());
}

public boolean opened() {
return this.session != null;
}

@Override
public boolean closed() {
if (this.session == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

package com.baidu.hugegraph.backend.store.cassandra;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -84,7 +87,7 @@ public CassandraStore(final BackendStoreProvider provider,

private void registerMetaHandlers() {
this.registerMetaHandler("metrics", (session, meta, args) -> {
CassandraMetrics metrics = new CassandraMetrics(cluster(), conf);
CassandraMetrics metrics = new CassandraMetrics(cluster(), this.conf);
return metrics.getMetrics();
});
}
Expand Down Expand Up @@ -243,12 +246,25 @@ public BackendFeatures features() {
@Override
public void init() {
this.checkClusterConnected();
this.initKeyspace();

if (this.sessions.session().opened()) {
// Session has ever been opened.
LOG.warn("Session has ever been opened(exist keyspace '{}' before)",
this.keyspace);
} else {
// Create keyspace if needed
if (!this.existsKeyspace()) {
this.initKeyspace();
}
// Open session explicitly to get the exception when it fails
this.sessions.session().open();
}

// Create tables
this.checkSessionConnected();
this.initTables();

LOG.info("Store initialized: {}", this.store);
LOG.debug("Store initialized: {}", this.store);
}

@Override
Expand All @@ -261,7 +277,15 @@ public void clear() {
this.clearKeyspace();
}

LOG.info("Store cleared: {}", this.store);
LOG.debug("Store cleared: {}", this.store);
}

@Override
public void truncate() {
this.checkSessionConnected();

this.truncateTables();
LOG.debug("Store truncated: {}", this.store);
}

@Override
Expand Down Expand Up @@ -359,7 +383,8 @@ protected void initKeyspace() {
replication.putIfAbsent("replication_factor", factor);

Statement stmt = SchemaBuilder.createKeyspace(this.keyspace)
.ifNotExists().with().replication(replication);
.ifNotExists().with()
.replication(replication);

// Create keyspace with non-keyspace-session
LOG.debug("Create keyspace: {}", stmt);
Expand All @@ -371,7 +396,6 @@ protected void initKeyspace() {
session.close();
}
}
this.sessions.session().open();
}

protected void clearKeyspace() {
Expand All @@ -395,18 +419,29 @@ protected boolean existsKeyspace() {

protected void initTables() {
CassandraSessionPool.Session session = this.sessions.session();
for (CassandraTable table : this.tables.values()) {
for (CassandraTable table : this.tables()) {
table.init(session);
}
}

protected void clearTables() {
CassandraSessionPool.Session session = this.sessions.session();
for (CassandraTable table : this.tables.values()) {
for (CassandraTable table : this.tables()) {
table.clear(session);
}
}

protected void truncateTables() {
CassandraSessionPool.Session session = this.sessions.session();
for (CassandraTable table : this.tables()) {
table.truncate(session);
}
}

protected Collection<CassandraTable> tables() {
return this.tables.values();
}

@Override
protected final CassandraTable table(HugeType type) {
assert type != null;
Expand Down Expand Up @@ -472,19 +507,10 @@ public CassandraSchemaStore(BackendStoreProvider provider,
}

@Override
protected void initTables() {
super.initTables();

CassandraSessionPool.Session session = super.sessions.session();
this.counters.init(session);
}

@Override
protected void clearTables() {
super.clearTables();

CassandraSessionPool.Session session = super.sessions.session();
this.counters.clear(session);
protected Collection<CassandraTable> tables() {
List<CassandraTable> tables = new ArrayList<>(super.tables());
tables.add(this.counters);
return tables;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ protected List<Select> query2Select(String table, Query query) {

// NOTE: Cassandra does not support query.offset()
if (query.offset() != 0) {
LOG.warn("Query offset is not supported currently " +
"on Cassandra store, it will be ignored");
LOG.debug("Query offset is not supported on Cassandra store " +
"currently, it will be replaced by [0, offset + limit)");
}

// Set limit
if (query.limit() != Query.NO_LIMIT) {
long total = query.limit() + query.offset();
long total = query.total();
String page = query.page();
if (page == null) {
select.limit((int) total);
Expand Down Expand Up @@ -550,11 +550,16 @@ protected void createTable(CassandraSessionPool.Session session,
session.execute(table);
}

public void dropTable(CassandraSessionPool.Session session) {
protected void dropTable(CassandraSessionPool.Session session) {
LOG.debug("Drop table: {}", this.table());
session.execute(SchemaBuilder.dropTable(this.table()).ifExists());
}

protected void truncateTable(CassandraSessionPool.Session session) {
LOG.debug("Truncate table: {}", this.table());
session.execute(QueryBuilder.truncate(this.table()));
}

protected void createIndex(CassandraSessionPool.Session session,
String indexLabel, HugeKeys column) {
String indexName = joinTableName(this.table(), indexLabel);
Expand All @@ -570,4 +575,8 @@ protected void createIndex(CassandraSessionPool.Session session,
public void clear(CassandraSessionPool.Session session) {
this.dropTable(session);
}

public void truncate(CassandraSessionPool.Session session) {
this.truncateTable(session);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public interface GremlinGraph extends Graph {
public SchemaManager schema();

public String backend();

public void initBackend();
public void clearBackend();
public void truncateBackend();
}
44 changes: 38 additions & 6 deletions hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
Expand Down Expand Up @@ -71,7 +72,6 @@
import com.baidu.hugegraph.util.LockUtil;
import com.baidu.hugegraph.util.Log;
import com.baidu.hugegraph.variables.HugeVariables;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.RateLimiter;

/**
Expand All @@ -97,9 +97,10 @@ public class HugeGraph implements GremlinGraph {
LockUtil.init();
}

private volatile boolean closed;
private volatile GraphMode mode;

private final String name;
private boolean closed;
private GraphMode mode;

private final HugeConfig configuration;

Expand Down Expand Up @@ -192,7 +193,7 @@ public void initBackend() {
this.loadGraphStore().open(this.configuration);
try {
this.storeProvider.init();
new BackendStoreInfo(this).init();
this.initBackendStoreInfo();
} finally {
this.loadGraphStore().close();
this.loadSystemStore().close();
Expand All @@ -202,6 +203,8 @@ public void initBackend() {

@Override
public void clearBackend() {
this.waitUntilAllTasksCompleted();

this.loadSchemaStore().open(this.configuration);
this.loadSystemStore().open(this.configuration);
this.loadGraphStore().open(this.configuration);
Expand All @@ -214,7 +217,29 @@ public void clearBackend() {
}
}

@Override
public void truncateBackend() {
this.waitUntilAllTasksCompleted();

this.storeProvider.truncate();
this.initBackendStoreInfo();
}

private void waitUntilAllTasksCompleted() {
long timeout = this.configuration.get(CoreOptions.TASK_WAIT_TIMEOUT);
try {
this.taskScheduler().waitUntilAllTasksCompleted(timeout);
} catch (TimeoutException e) {
throw new HugeException("Failed to wait all tasks to complete", e);
}
}

private void initBackendStoreInfo() {
new BackendStoreInfo(this).init();
}

private SchemaTransaction openSchemaTransaction() throws HugeException {
this.checkGraphNotClosed();
try {
return new CachedSchemaTransaction(this, this.loadSchemaStore());
} catch (BackendException e) {
Expand All @@ -225,6 +250,7 @@ private SchemaTransaction openSchemaTransaction() throws HugeException {
}

private GraphTransaction openGraphTransaction() throws HugeException {
this.checkGraphNotClosed();
try {
return new CachedGraphTransaction(this, this.loadGraphStore());
} catch (BackendException e) {
Expand All @@ -241,12 +267,16 @@ private BackendStoreProvider loadStoreProvider() {
return BackendProviderFactory.open(backend, this.name);
}

private BackendStore loadSchemaStore() {
private void checkGraphNotClosed() {
E.checkState(!this.closed, "Graph '%s' has been closed", this);
}

public BackendStore loadSchemaStore() {
String name = this.configuration.get(CoreOptions.STORE_SCHEMA);
return this.storeProvider.loadSchemaStore(name);
}

private BackendStore loadGraphStore() {
public BackendStore loadGraphStore() {
String graph = this.configuration.get(CoreOptions.STORE_GRAPH);
return this.storeProvider.loadGraphStore(graph);
}
Expand All @@ -257,6 +287,7 @@ public BackendStore loadSystemStore() {
}

public SchemaTransaction schemaTransaction() {
this.checkGraphNotClosed();
/*
* NOTE: each schema operation will be auto committed,
* Don't need to open tinkerpop tx by readWrite() and commit manually.
Expand All @@ -265,6 +296,7 @@ public SchemaTransaction schemaTransaction() {
}

public GraphTransaction graphTransaction() {
this.checkGraphNotClosed();
/*
* NOTE: graph operations must be committed manually,
* Maybe users need to auto open tinkerpop tx by readWrite().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,8 @@ public synchronized void increaseCounter(HugeType type, long increment) {
AtomicLong value = new AtomicLong(oldValue + increment);
this.counters.put(type, value);
}

public void reset() {
this.counters.clear();
}
}
Loading

0 comments on commit 5ef25cf

Please sign in to comment.