Skip to content

Commit

Permalink
tiny improve
Browse files Browse the repository at this point in the history
Change-Id: I845cc78b20b94eee6bd2aff43038dd22c1a7834a
  • Loading branch information
Linary committed Mar 22, 2021
1 parent dced7f1 commit 1c8460b
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ public void truncateBackend() {
* When restarting, load the snapshot first and then read backend,
* will not encounter such an intermediate state.
*/
this.storeProvider.writeSnapshot();
this.storeProvider.createSnapshot();
} finally {
LockUtil.unlock(this.name, LockUtil.GRAPH_LOCK);
}
Expand All @@ -371,7 +371,7 @@ public void truncateBackend() {
public void createSnapshot() {
LockUtil.lock(this.name, LockUtil.GRAPH_LOCK);
try {
this.storeProvider.writeSnapshot();
this.storeProvider.createSnapshot();
} finally {
LockUtil.unlock(this.name, LockUtil.GRAPH_LOCK);
}
Expand All @@ -382,7 +382,7 @@ public void createSnapshot() {
public void resumeSnapshot() {
LockUtil.lock(this.name, LockUtil.GRAPH_LOCK);
try {
this.storeProvider.readSnapshot();
this.storeProvider.resumeSnapshot();
} finally {
LockUtil.unlock(this.name, LockUtil.GRAPH_LOCK);
}
Expand Down Expand Up @@ -1445,7 +1445,7 @@ public void invalid2(HugeType type, Object[] ids) {

@Override
public void clear(HugeType type) {
this.hub.notify(Events.CACHE, Cache.ACTION_CLEAR, type, null);
this.hub.notify(Events.CACHE, Cache.ACTION_CLEAR, type);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,19 +157,23 @@ public void initSystemInfo(HugeGraph graph) {
}

@Override
public void writeSnapshot() {
public void createSnapshot() {
long begin = System.currentTimeMillis();
String snapshotPrefix = StoreSnapshotFile.SNAPSHOT_DIR;
for (BackendStore store : this.stores.values()) {
store.writeSnapshot(snapshotPrefix);
store.createSnapshot(snapshotPrefix);
}

LOG.info("Create snapshot cost {}ms",
System.currentTimeMillis() - begin);
}

@Override
public void readSnapshot() {
public void resumeSnapshot() {
String snapshotPrefix = StoreSnapshotFile.SNAPSHOT_DIR;
Set<String> snapshotDirs = new HashSet<>();
for (BackendStore store : this.stores.values()) {
snapshotDirs.addAll(store.readSnapshot(snapshotPrefix));
snapshotDirs.addAll(store.resumeSnapshot(snapshotPrefix));
}
// Delete all snapshot parent directories
for (String snapshotDir : snapshotDirs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ public default void setCounterLowest(HugeType type, long lowest) {
// Get current counter for a specific type
public long getCounter(HugeType type);

public default Set<String> writeSnapshot(String snapshotDir) {
public default Set<String> createSnapshot(String snapshotDir) {
throw new UnsupportedOperationException("writeSnapshot");
}

public default Set<String> readSnapshot(String snapshotDir) {
public default Set<String> resumeSnapshot(String snapshotDir) {
throw new UnsupportedOperationException("readSnapshot");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public interface BackendStoreProvider {

public void initSystemInfo(HugeGraph graph);

public void writeSnapshot();
public void createSnapshot();

public void readSnapshot();
public void resumeSnapshot();

public void listen(EventListener listener);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public void initSystemInfo(HugeGraph graph) {
}

@Override
public void writeSnapshot() {
public void createSnapshot() {
StoreCommand command = new StoreCommand(StoreType.ALL,
StoreAction.SNAPSHOT, null);
StoreClosure closure = new StoreClosure(command);
Expand All @@ -208,7 +208,7 @@ public void writeSnapshot() {
}

@Override
public void readSnapshot() {
public void resumeSnapshot() {
// jraft doesn't expose API to load snapshot
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -43,6 +44,7 @@
import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.HugeGraphParams;
import com.baidu.hugegraph.backend.cache.Cache;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.store.BackendStore;
import com.baidu.hugegraph.backend.store.raft.rpc.ListPeersProcessor;
import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.StoreType;
Expand Down Expand Up @@ -259,7 +261,7 @@ public void clearCache() {
this.notifyCache(Cache.ACTION_CLEAR, HugeType.VERTEX, null);
}

protected void notifyCache(String action, HugeType type, Object id) {
protected void notifyCache(String action, HugeType type, List<Id> ids) {
EventHub eventHub;
if (type.isGraph()) {
eventHub = this.params.graphEventHub();
Expand All @@ -270,7 +272,15 @@ protected void notifyCache(String action, HugeType type, Object id) {
}
try {
// How to avoid update cache from server info
eventHub.notify(Events.CACHE, action, type, id);
if (ids == null) {
eventHub.call(Events.CACHE, action, type);
} else {
if (ids.size() == 1) {
eventHub.call(Events.CACHE, action, type, ids.get(0));
} else {
eventHub.call(Events.CACHE, action, type, ids.toArray());
}
}
} catch (RejectedExecutionException e) {
LOG.warn("Can't update cache due to EventHub is too busy");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,15 @@ public void save(SnapshotWriter writer, Closure done,
// Write snapshot to real directory
Set<String> snapshotDirs = this.doSnapshotSave();
executor.execute(() -> {
long begin = System.currentTimeMillis();
Set<String> tarSnapshotFiles =
this.compressSnapshotDir(snapshotDirs, done);
String jraftSnapshotPath =
this.writeManifest(writer, tarSnapshotFiles, done);
this.deleteSnapshotDir(snapshotDirs, done);
this.compressJraftSnapshotDir(writer, jraftSnapshotPath, done);
LOG.info("Compress snapshot cost {}ms",
System.currentTimeMillis() - begin);
});
} catch (Throwable t) {
LOG.error("Failed to save snapshot", t);
Expand Down Expand Up @@ -111,15 +114,15 @@ public boolean load(SnapshotReader reader) {
private Set<String> doSnapshotSave() {
Set<String> snapshotDirs = new HashSet<>();
for (RaftBackendStore store : this.stores) {
snapshotDirs.addAll(store.originStore().writeSnapshot(SNAPSHOT_DIR));
snapshotDirs.addAll(store.originStore().createSnapshot(SNAPSHOT_DIR));
}
LOG.info("All snapshot dirs: {}", snapshotDirs);
return snapshotDirs;
}

private void doSnapshotLoad() {
for (RaftBackendStore store : this.stores) {
store.originStore().readSnapshot(SNAPSHOT_DIR);
store.originStore().resumeSnapshot(SNAPSHOT_DIR);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import com.baidu.hugegraph.backend.query.Query;
import com.baidu.hugegraph.backend.serializer.BytesBuffer;
import com.baidu.hugegraph.backend.store.BackendAction;
import com.baidu.hugegraph.backend.store.BackendEntry;
import com.baidu.hugegraph.backend.store.BackendMutation;
import com.baidu.hugegraph.backend.store.BackendStore;
import com.baidu.hugegraph.backend.store.raft.RaftBackendStore.IncrCounter;
Expand Down Expand Up @@ -86,21 +85,13 @@ private void updateCacheIfNeeded(BackendMutation mutation,
return;
}
for (HugeType type : mutation.types()) {
if (type.isSchema()) {
java.util.Iterator<BackendAction> it = mutation.mutation(type);
while (it.hasNext()) {
BackendEntry entry = it.next().entry();
this.context.notifyCache(Cache.ACTION_INVALID, type,
entry.originId());
}
} else if (type.isGraph()) {
List<Id> ids = new ArrayList<>((int) Query.COMMIT_BATCH);
List<Id> ids = new ArrayList<>((int) Query.COMMIT_BATCH);
if (type.isSchema() || type.isGraph()) {
java.util.Iterator<BackendAction> it = mutation.mutation(type);
while (it.hasNext()) {
ids.add(it.next().entry().originId());
}
this.context.notifyCache(Cache.ACTION_INVALID, type,
ids.toArray());
this.context.notifyCache(Cache.ACTION_INVALID, type, ids);
} else {
// Ignore other types due to not cached
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public class HugeSecurityManager extends SecurityManager {
);

private static final Set<String> SOFA_RPC_CLASSES = ImmutableSet.of(
"com.alipay.sofa.rpc.tracer.sofatracer.RpcSofaTracer"
"com.alipay.sofa.rpc.tracer.sofatracer.RpcSofaTracer",
"com.alipay.sofa.rpc.client.AbstractCluster"
);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ protected Session session(HugeType tableType) {
}

@Override
public Set<String> writeSnapshot(String snapshotPrefix) {
public Set<String> createSnapshot(String snapshotPrefix) {
Lock readLock = this.storeLock.readLock();
readLock.lock();
try {
Expand Down Expand Up @@ -641,7 +641,7 @@ public Set<String> writeSnapshot(String snapshotPrefix) {
}

@Override
public Set<String> readSnapshot(String snapshotPrefix) {
public Set<String> resumeSnapshot(String snapshotPrefix) {
Lock readLock = this.storeLock.readLock();
readLock.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5489,7 +5489,11 @@ public void testRemoveEdgesOfSuperVertex() {
guido.remove();

// Clear all
graph.truncateBackend();
try {
graph.truncateBackend();
} catch (UnsupportedOperationException ignored) {
// pass
}
}

@Test
Expand Down

0 comments on commit 1c8460b

Please sign in to comment.