Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve raft module and test #1721

Merged
merged 14 commits into from
Jan 12, 2022
10 changes: 8 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ jobs:
runs-on: ubuntu-20.04
env:
TRAVIS_DIR: hugegraph-dist/src/assembly/travis
REPORT_DIR: target/site/jacoco
BACKEND: ${{ matrix.BACKEND }}
TRIGGER_BRANCH_NAME: ${{ github.ref_name }}
HEAD_BRANCH_NAME: ${{ github.head_ref }}
Expand Down Expand Up @@ -72,9 +73,14 @@ jobs:
- name: Run test
run: |
$TRAVIS_DIR/run-core-test.sh $BACKEND
$TRAVIS_DIR/run-api-test.sh $BACKEND
$TRAVIS_DIR/run-api-test.sh $BACKEND $REPORT_DIR
$TRAVIS_DIR/run-unit-test.sh $BACKEND

- name: Run Raft test
if: ${{ env.BACKEND == 'rocksdb' }}
run: |
$TRAVIS_DIR/run-api-test-for-raft.sh $BACKEND $REPORT_DIR

- name: Run TinkerPop test
if: ${{ env.RELEASE_BRANCH == 'true' }}
run: |
Expand All @@ -83,4 +89,4 @@ jobs:
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v1
with:
file: target/site/jacoco/jacoco.xml
file: ${{ env.REPORT_DIR }}/*.xml
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public Map<String, Object> update(@Context GraphManager manager,
HugeTask<?> task = scheduler.task(IdGenerator.of(id));
if (!task.completed() && !task.cancelling()) {
scheduler.cancel(task);
if (task.cancelling()) {
if (task.cancelling() || task.cancelled()) {
return task.asMap();
}
}
Expand Down
2 changes: 1 addition & 1 deletion hugegraph-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>jraft-core</artifactId>
<version>1.3.5</version>
<version>1.3.9</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,20 +111,7 @@ private void listenChanges() {
event.checkArgs(String.class, HugeType.class, Id.class);
HugeType type = (HugeType) args[1];
Id id = (Id) args[2];
this.arrayCaches.remove(type, id);

id = generateId(type, id);
Object value = this.idCache.get(id);
if (value != null) {
// Invalidate id cache
this.idCache.invalidate(id);

// Invalidate name cache
SchemaElement schema = (SchemaElement) value;
Id prefixedName = generateId(schema.type(),
schema.name());
this.nameCache.invalidate(prefixedName);
}
this.invalidateCache(type, id);
this.resetCachedAll(type);
return true;
} else if (Cache.ACTION_CLEAR.equals(args[0])) {
Expand All @@ -140,21 +127,6 @@ private void listenChanges() {
}
}

private final void resetCachedAll(HugeType type) {
// Set the cache all flag of the schema type to false
this.cachedTypes().put(type, false);
}

private void clearCache(boolean notify) {
this.idCache.clear();
this.nameCache.clear();
this.arrayCaches.clear();

if (notify) {
this.notifyChanges(Cache.ACTION_CLEARED, null, null);
}
}

private void unlistenChanges() {
// Unlisten store event
this.store().provider().unlisten(this.storeEventListener);
Expand All @@ -164,11 +136,16 @@ private void unlistenChanges() {
schemaEventHub.unlisten(Events.CACHE, this.cacheEventListener);
}

private void notifyChanges(String action, HugeType type, Id id) {
private final void notifyChanges(String action, HugeType type, Id id) {
EventHub graphEventHub = this.params().schemaEventHub();
graphEventHub.notify(Events.CACHE, action, type, id);
}

private final void resetCachedAll(HugeType type) {
// Set the cache all flag of the schema type to false
this.cachedTypes().put(type, false);
}

private final void resetCachedAllIfReachedCapacity() {
if (this.idCache.size() >= this.idCache.capacity()) {
LOG.warn("Schema cache reached capacity({}): {}",
Expand All @@ -181,20 +158,17 @@ private final CachedTypes cachedTypes() {
return this.arrayCaches.cachedTypes();
}

private static Id generateId(HugeType type, Id id) {
// NOTE: it's slower performance to use:
// String.format("%x-%s", type.code(), name)
return IdGenerator.of(type.string() + "-" + id.asString());
}
private final void clearCache(boolean notify) {
this.idCache.clear();
this.nameCache.clear();
this.arrayCaches.clear();

private static Id generateId(HugeType type, String name) {
return IdGenerator.of(type.string() + "-" + name);
if (notify) {
this.notifyChanges(Cache.ACTION_CLEARED, null, null);
}
}

@Override
protected void addSchema(SchemaElement schema) {
super.addSchema(schema);

private final void updateCache(SchemaElement schema) {
this.resetCachedAllIfReachedCapacity();

// update id cache
Expand All @@ -207,6 +181,39 @@ protected void addSchema(SchemaElement schema) {

// update optimized array cache
this.arrayCaches.updateIfNeeded(schema);
}

private final void invalidateCache(HugeType type, Id id) {
// remove from id cache and name cache
Id prefixedId = generateId(type, id);
Object value = this.idCache.get(prefixedId);
if (value != null) {
this.idCache.invalidate(prefixedId);

SchemaElement schema = (SchemaElement) value;
Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.invalidate(prefixedName);
}

// remove from optimized array cache
this.arrayCaches.remove(type, id);
}

private static Id generateId(HugeType type, Id id) {
// NOTE: it's slower performance to use:
// String.format("%x-%s", type.code(), name)
return IdGenerator.of(type.string() + "-" + id.asString());
}

private static Id generateId(HugeType type, String name) {
return IdGenerator.of(type.string() + "-" + name);
}

@Override
protected void addSchema(SchemaElement schema) {
super.addSchema(schema);

this.updateCache(schema);

this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id());
}
Expand All @@ -227,19 +234,15 @@ protected <T extends SchemaElement> T getSchema(HugeType type, Id id) {
if (value == null) {
value = super.getSchema(type, id);
if (value != null) {
this.resetCachedAllIfReachedCapacity();

this.idCache.update(prefixedId, value);

SchemaElement schema = (SchemaElement) value;
Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.update(prefixedName, schema);
// update id cache, name cache and optimized array cache
this.updateCache(schema);
}
} else {
// update optimized array cache for the result from id cache
this.arrayCaches.updateIfNeeded((SchemaElement) value);
}

// update optimized array cache
this.arrayCaches.updateIfNeeded((SchemaElement) value);

return (T) value;
}

Expand All @@ -252,13 +255,8 @@ protected <T extends SchemaElement> T getSchema(HugeType type,
if (value == null) {
value = super.getSchema(type, name);
if (value != null) {
this.resetCachedAllIfReachedCapacity();

this.nameCache.update(prefixedName, value);

SchemaElement schema = (SchemaElement) value;
Id prefixedId = generateId(schema.type(), schema.id());
this.idCache.update(prefixedId, schema);
this.updateCache(schema);
}
}
return (T) value;
Expand All @@ -268,18 +266,7 @@ protected <T extends SchemaElement> T getSchema(HugeType type,
protected void removeSchema(SchemaElement schema) {
super.removeSchema(schema);

Id prefixedId = generateId(schema.type(), schema.id());
Object value = this.idCache.get(prefixedId);
if (value != null) {
this.idCache.invalidate(prefixedId);

schema = (SchemaElement) value;
Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.invalidate(prefixedName);
}

// remove from optimized array cache
this.arrayCaches.remove(schema.type(), schema.id());
this.invalidateCache(schema.type(), schema.id());

this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id());
}
Expand All @@ -299,16 +286,13 @@ protected <T extends SchemaElement> List<T> getAllSchema(HugeType type) {
});
return results;
} else {
this.cachedTypes().remove(type);
List<T> results = super.getAllSchema(type);
long free = this.idCache.capacity() - this.idCache.size();
if (results.size() <= free) {
// Update cache
for (T schema : results) {
Id prefixedId = generateId(schema.type(), schema.id());
this.idCache.update(prefixedId, schema);

Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.update(prefixedName, schema);
this.updateCache(schema);
}
this.cachedTypes().putIfAbsent(type, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ public class RaftBackendStore implements BackendStore {
private final BackendStore store;
private final RaftSharedContext context;
private final ThreadLocal<MutationBatch> mutationBatch;
private final boolean isSafeRead;

public RaftBackendStore(BackendStore store, RaftSharedContext context) {
this.store = store;
this.context = context;
this.mutationBatch = new ThreadLocal<>();
this.isSafeRead = this.context.isSafeRead();
}

public BackendStore originStore() {
Expand Down Expand Up @@ -143,7 +145,8 @@ public Iterator<BackendEntry> query(Query query) {

@Override
public Number queryNumber(Query query) {
return (Number) this.queryByRaft(query, o -> this.store.queryNumber(query));
return (Number)
this.queryByRaft(query, o -> this.store.queryNumber(query));
}

@Override
Expand Down Expand Up @@ -186,7 +189,10 @@ public void increaseCounter(HugeType type, long increment) {

@Override
public long getCounter(HugeType type) {
return (Long) this.queryByRaft(type, o -> this.store.getCounter(type));
Object counter = this.queryByRaft(type, true,
o -> this.store.getCounter(type));
assert counter instanceof Long;
return (Long) counter;
}

private Object submitAndWait(StoreAction action, byte[] data) {
Expand All @@ -200,7 +206,12 @@ private Object submitAndWait(StoreCommand command) {
}

private Object queryByRaft(Object query, Function<Object, Object> func) {
if (!this.context.isSafeRead()) {
return this.queryByRaft(query, this.isSafeRead, func);
}

private Object queryByRaft(Object query, boolean safeRead,
Function<Object, Object> func) {
if (!safeRead) {
return func.apply(query);
}

Expand All @@ -212,17 +223,17 @@ public void run(Status status, long index, byte[] reqCtx) {
future.complete(status, () -> func.apply(query));
} else {
future.failure(status, new BackendException(
"Failed to execute query '%s' with read-index: %s",
query, status));
"Failed to do raft read-index: %s",
status));
}
}
};
this.node().node().readIndex(BytesUtil.EMPTY_BYTES, readIndexClosure);
try {
return future.waitFinished();
} catch (Throwable e) {
LOG.warn("Failed to execute query '{}' with read-index: {}",
query, future.status());
LOG.warn("Failed to execute query '{}': {}",
query, future.status(), e);
throw new BackendException("Failed to execute query: %s", e, query);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public void init() {
for (RaftBackendStore store : this.stores()) {
store.init();
}
this.notifyAndWaitEvent(Events.STORE_INITED);
this.notifyAndWaitEvent(Events.STORE_INIT);

LOG.debug("Graph '{}' store has been initialized", this.graph());
}
Expand Down Expand Up @@ -214,7 +214,8 @@ public void initSystemInfo(HugeGraph graph) {

@Override
public void createSnapshot() {
StoreCommand command = new StoreCommand(StoreType.ALL,
// TODO: snapshot for StoreType.ALL instead of StoreType.GRAPH
StoreCommand command = new StoreCommand(StoreType.GRAPH,
StoreAction.SNAPSHOT, null);
RaftStoreClosure closure = new RaftStoreClosure(command);
this.context.node().submitAndWait(command, closure);
Expand Down
Loading