Skip to content

Commit

Permalink
improve raft module and test (#1721)
Browse files Browse the repository at this point in the history
* improve raft moddule
* add raft-server test
* set safe_read=true and use_snapshot=false
* fix leader not wait for apply-task
* add auth support to raft-tools.sh
* don't clear non-shared-storage backend
* set task timer interval from 3s to 1s
* improve CachedSchemaTransaction
* improve update schema status
* remove truncate from project test
* fix codecov: api test report not been uploaded
* improve task cancel() test with cancelled status

Change-Id: I016d3fcc4ab50614afdf452e7c9691ee3cc3c70b
  • Loading branch information
javeme committed Jan 12, 2022
1 parent df0a7b5 commit 35cffbc
Show file tree
Hide file tree
Showing 32 changed files with 849 additions and 249 deletions.
10 changes: 8 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ jobs:
runs-on: ubuntu-20.04
env:
TRAVIS_DIR: hugegraph-dist/src/assembly/travis
REPORT_DIR: target/site/jacoco
BACKEND: ${{ matrix.BACKEND }}
TRIGGER_BRANCH_NAME: ${{ github.ref_name }}
HEAD_BRANCH_NAME: ${{ github.head_ref }}
Expand Down Expand Up @@ -72,9 +73,14 @@ jobs:
- name: Run test
run: |
$TRAVIS_DIR/run-core-test.sh $BACKEND
$TRAVIS_DIR/run-api-test.sh $BACKEND
$TRAVIS_DIR/run-api-test.sh $BACKEND $REPORT_DIR
$TRAVIS_DIR/run-unit-test.sh $BACKEND
- name: Run Raft test
if: ${{ env.BACKEND == 'rocksdb' }}
run: |
$TRAVIS_DIR/run-api-test-for-raft.sh $BACKEND $REPORT_DIR
- name: Run TinkerPop test
if: ${{ env.RELEASE_BRANCH == 'true' }}
run: |
Expand All @@ -83,4 +89,4 @@ jobs:
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v1
with:
file: target/site/jacoco/jacoco.xml
file: ${{ env.REPORT_DIR }}/*.xml
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public Map<String, Object> update(@Context GraphManager manager,
HugeTask<?> task = scheduler.task(IdGenerator.of(id));
if (!task.completed() && !task.cancelling()) {
scheduler.cancel(task);
if (task.cancelling()) {
if (task.cancelling() || task.cancelled()) {
return task.asMap();
}
}
Expand Down
2 changes: 1 addition & 1 deletion hugegraph-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>jraft-core</artifactId>
<version>1.3.5</version>
<version>1.3.9</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,20 +111,7 @@ private void listenChanges() {
event.checkArgs(String.class, HugeType.class, Id.class);
HugeType type = (HugeType) args[1];
Id id = (Id) args[2];
this.arrayCaches.remove(type, id);

id = generateId(type, id);
Object value = this.idCache.get(id);
if (value != null) {
// Invalidate id cache
this.idCache.invalidate(id);

// Invalidate name cache
SchemaElement schema = (SchemaElement) value;
Id prefixedName = generateId(schema.type(),
schema.name());
this.nameCache.invalidate(prefixedName);
}
this.invalidateCache(type, id);
this.resetCachedAll(type);
return true;
} else if (Cache.ACTION_CLEAR.equals(args[0])) {
Expand All @@ -140,21 +127,6 @@ private void listenChanges() {
}
}

private final void resetCachedAll(HugeType type) {
// Set the cache all flag of the schema type to false
this.cachedTypes().put(type, false);
}

private void clearCache(boolean notify) {
this.idCache.clear();
this.nameCache.clear();
this.arrayCaches.clear();

if (notify) {
this.notifyChanges(Cache.ACTION_CLEARED, null, null);
}
}

private void unlistenChanges() {
// Unlisten store event
this.store().provider().unlisten(this.storeEventListener);
Expand All @@ -164,11 +136,16 @@ private void unlistenChanges() {
schemaEventHub.unlisten(Events.CACHE, this.cacheEventListener);
}

private void notifyChanges(String action, HugeType type, Id id) {
private final void notifyChanges(String action, HugeType type, Id id) {
EventHub graphEventHub = this.params().schemaEventHub();
graphEventHub.notify(Events.CACHE, action, type, id);
}

private final void resetCachedAll(HugeType type) {
// Set the cache all flag of the schema type to false
this.cachedTypes().put(type, false);
}

private final void resetCachedAllIfReachedCapacity() {
if (this.idCache.size() >= this.idCache.capacity()) {
LOG.warn("Schema cache reached capacity({}): {}",
Expand All @@ -181,20 +158,17 @@ private final CachedTypes cachedTypes() {
return this.arrayCaches.cachedTypes();
}

private static Id generateId(HugeType type, Id id) {
// NOTE: it's slower performance to use:
// String.format("%x-%s", type.code(), name)
return IdGenerator.of(type.string() + "-" + id.asString());
}
private final void clearCache(boolean notify) {
this.idCache.clear();
this.nameCache.clear();
this.arrayCaches.clear();

private static Id generateId(HugeType type, String name) {
return IdGenerator.of(type.string() + "-" + name);
if (notify) {
this.notifyChanges(Cache.ACTION_CLEARED, null, null);
}
}

@Override
protected void addSchema(SchemaElement schema) {
super.addSchema(schema);

private final void updateCache(SchemaElement schema) {
this.resetCachedAllIfReachedCapacity();

// update id cache
Expand All @@ -207,6 +181,39 @@ protected void addSchema(SchemaElement schema) {

// update optimized array cache
this.arrayCaches.updateIfNeeded(schema);
}

private final void invalidateCache(HugeType type, Id id) {
// remove from id cache and name cache
Id prefixedId = generateId(type, id);
Object value = this.idCache.get(prefixedId);
if (value != null) {
this.idCache.invalidate(prefixedId);

SchemaElement schema = (SchemaElement) value;
Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.invalidate(prefixedName);
}

// remove from optimized array cache
this.arrayCaches.remove(type, id);
}

private static Id generateId(HugeType type, Id id) {
// NOTE: it's slower performance to use:
// String.format("%x-%s", type.code(), name)
return IdGenerator.of(type.string() + "-" + id.asString());
}

private static Id generateId(HugeType type, String name) {
return IdGenerator.of(type.string() + "-" + name);
}

@Override
protected void addSchema(SchemaElement schema) {
super.addSchema(schema);

this.updateCache(schema);

this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id());
}
Expand All @@ -227,19 +234,15 @@ protected <T extends SchemaElement> T getSchema(HugeType type, Id id) {
if (value == null) {
value = super.getSchema(type, id);
if (value != null) {
this.resetCachedAllIfReachedCapacity();

this.idCache.update(prefixedId, value);

SchemaElement schema = (SchemaElement) value;
Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.update(prefixedName, schema);
// update id cache, name cache and optimized array cache
this.updateCache(schema);
}
} else {
// update optimized array cache for the result from id cache
this.arrayCaches.updateIfNeeded((SchemaElement) value);
}

// update optimized array cache
this.arrayCaches.updateIfNeeded((SchemaElement) value);

return (T) value;
}

Expand All @@ -252,13 +255,8 @@ protected <T extends SchemaElement> T getSchema(HugeType type,
if (value == null) {
value = super.getSchema(type, name);
if (value != null) {
this.resetCachedAllIfReachedCapacity();

this.nameCache.update(prefixedName, value);

SchemaElement schema = (SchemaElement) value;
Id prefixedId = generateId(schema.type(), schema.id());
this.idCache.update(prefixedId, schema);
this.updateCache(schema);
}
}
return (T) value;
Expand All @@ -268,18 +266,7 @@ protected <T extends SchemaElement> T getSchema(HugeType type,
protected void removeSchema(SchemaElement schema) {
super.removeSchema(schema);

Id prefixedId = generateId(schema.type(), schema.id());
Object value = this.idCache.get(prefixedId);
if (value != null) {
this.idCache.invalidate(prefixedId);

schema = (SchemaElement) value;
Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.invalidate(prefixedName);
}

// remove from optimized array cache
this.arrayCaches.remove(schema.type(), schema.id());
this.invalidateCache(schema.type(), schema.id());

this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id());
}
Expand All @@ -299,16 +286,13 @@ protected <T extends SchemaElement> List<T> getAllSchema(HugeType type) {
});
return results;
} else {
this.cachedTypes().remove(type);
List<T> results = super.getAllSchema(type);
long free = this.idCache.capacity() - this.idCache.size();
if (results.size() <= free) {
// Update cache
for (T schema : results) {
Id prefixedId = generateId(schema.type(), schema.id());
this.idCache.update(prefixedId, schema);

Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.update(prefixedName, schema);
this.updateCache(schema);
}
this.cachedTypes().putIfAbsent(type, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ public class RaftBackendStore implements BackendStore {
private final BackendStore store;
private final RaftSharedContext context;
private final ThreadLocal<MutationBatch> mutationBatch;
private final boolean isSafeRead;

public RaftBackendStore(BackendStore store, RaftSharedContext context) {
this.store = store;
this.context = context;
this.mutationBatch = new ThreadLocal<>();
this.isSafeRead = this.context.isSafeRead();
}

public BackendStore originStore() {
Expand Down Expand Up @@ -143,7 +145,8 @@ public Iterator<BackendEntry> query(Query query) {

@Override
public Number queryNumber(Query query) {
return (Number) this.queryByRaft(query, o -> this.store.queryNumber(query));
return (Number)
this.queryByRaft(query, o -> this.store.queryNumber(query));
}

@Override
Expand Down Expand Up @@ -186,7 +189,10 @@ public void increaseCounter(HugeType type, long increment) {

@Override
public long getCounter(HugeType type) {
return (Long) this.queryByRaft(type, o -> this.store.getCounter(type));
Object counter = this.queryByRaft(type, true,
o -> this.store.getCounter(type));
assert counter instanceof Long;
return (Long) counter;
}

private Object submitAndWait(StoreAction action, byte[] data) {
Expand All @@ -200,7 +206,12 @@ private Object submitAndWait(StoreCommand command) {
}

private Object queryByRaft(Object query, Function<Object, Object> func) {
if (!this.context.isSafeRead()) {
return this.queryByRaft(query, this.isSafeRead, func);
}

private Object queryByRaft(Object query, boolean safeRead,
Function<Object, Object> func) {
if (!safeRead) {
return func.apply(query);
}

Expand All @@ -212,17 +223,17 @@ public void run(Status status, long index, byte[] reqCtx) {
future.complete(status, () -> func.apply(query));
} else {
future.failure(status, new BackendException(
"Failed to execute query '%s' with read-index: %s",
query, status));
"Failed to do raft read-index: %s",
status));
}
}
};
this.node().node().readIndex(BytesUtil.EMPTY_BYTES, readIndexClosure);
try {
return future.waitFinished();
} catch (Throwable e) {
LOG.warn("Failed to execute query '{}' with read-index: {}",
query, future.status());
LOG.warn("Failed to execute query '{}': {}",
query, future.status(), e);
throw new BackendException("Failed to execute query: %s", e, query);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public void init() {
for (RaftBackendStore store : this.stores()) {
store.init();
}
this.notifyAndWaitEvent(Events.STORE_INITED);
this.notifyAndWaitEvent(Events.STORE_INIT);

LOG.debug("Graph '{}' store has been initialized", this.graph());
}
Expand Down Expand Up @@ -214,7 +214,8 @@ public void initSystemInfo(HugeGraph graph) {

@Override
public void createSnapshot() {
StoreCommand command = new StoreCommand(StoreType.ALL,
// TODO: snapshot for StoreType.ALL instead of StoreType.GRAPH
StoreCommand command = new StoreCommand(StoreType.GRAPH,
StoreAction.SNAPSHOT, null);
RaftStoreClosure closure = new RaftStoreClosure(command);
this.context.node().submitAndWait(command, closure);
Expand Down
Loading

0 comments on commit 35cffbc

Please sign in to comment.