Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix style for raft module #1423

Merged
merged 3 commits into from
Apr 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ public class RaftBackendStore implements BackendStore {

private final BackendStore store;
private final RaftSharedContext context;
private final ThreadLocal<MutationBatch> threadLocalBatch;
private final ThreadLocal<MutationBatch> mutationBatch;

public RaftBackendStore(BackendStore store, RaftSharedContext context) {
this.store = store;
this.context = context;
this.threadLocalBatch = new ThreadLocal<>();
this.mutationBatch = new ThreadLocal<>();
}

public BackendStore originStore() {
Expand Down Expand Up @@ -137,18 +137,18 @@ public void mutate(BackendMutation mutation) {
@Override
@SuppressWarnings("unchecked")
public Iterator<BackendEntry> query(Query query) {
return (Iterator<BackendEntry>) this.queryByRaft(
query, o -> store.query(query));
return (Iterator<BackendEntry>)
this.queryByRaft(query, o -> this.store.query(query));
}

@Override
public Number queryNumber(Query query) {
return (Number) this.queryByRaft(query, o -> store.queryNumber(query));
return (Number) this.queryByRaft(query, o -> this.store.queryNumber(query));
}

@Override
public void beginTx() {
// Don't write raft log, commitTx(in statemachine) will call beginTx
// Don't write raft log, commitTx(in StateMachine) will call beginTx
}

@Override
Expand Down Expand Up @@ -195,7 +195,7 @@ private Object submitAndWait(StoreAction action, byte[] data) {
}

private Object submitAndWait(StoreCommand command) {
StoreClosure closure = new StoreClosure(command);
RaftStoreClosure closure = new RaftStoreClosure(command);
return this.node().submitAndWait(command, closure);
}

Expand All @@ -221,39 +221,40 @@ public void run(Status status, long index, byte[] reqCtx) {
try {
return future.waitFinished();
} catch (Throwable e) {
LOG.warn("Failed to execute query {} with read-index: {}",
LOG.warn("Failed to execute query '{}' with read-index: {}",
query, future.status());
throw new BackendException("Failed to execute query", e);
throw new BackendException("Failed to execute query: %s", e, query);
}
}

private MutationBatch getOrNewBatch() {
MutationBatch batch = this.mutationBatch.get();
if (batch == null) {
batch = new MutationBatch();
this.mutationBatch.set(batch);
}
return batch;
}

private static class MutationBatch {

private List<BackendMutation> mutations;
// This object will stay in memory for a long time
private final List<BackendMutation> mutations;

public MutationBatch() {
this.mutations = new ArrayList<>();
this.mutations = new ArrayList<>((int) Query.COMMIT_BATCH);
}

public void add(BackendMutation mutation) {
this.mutations.add(mutation);
}

public void clear() {
this.mutations = new ArrayList<>();
this.mutations.clear();
}
}

private MutationBatch getOrNewBatch() {
MutationBatch batch = this.threadLocalBatch.get();
if (batch == null) {
batch = new MutationBatch();
this.threadLocalBatch.set(batch);
}
return batch;
}

public static class IncrCounter {
protected static final class IncrCounter {

private HugeType type;
private long increment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public void open(String name) {
public void waitStoreStarted() {
this.context.initRaftNode();
LOG.info("The raft node is initialized");

this.context.waitRaftNodeStarted();
LOG.info("The raft store is started");
}
Expand Down Expand Up @@ -202,7 +203,7 @@ public void initSystemInfo(HugeGraph graph) {
public void createSnapshot() {
StoreCommand command = new StoreCommand(StoreType.ALL,
StoreAction.SNAPSHOT, null);
StoreClosure closure = new StoreClosure(command);
RaftStoreClosure closure = new RaftStoreClosure(command);
this.context.node().submitAndWait(command, closure);
LOG.debug("Graph '{}' has writed snapshot", this.graph());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package com.baidu.hugegraph.backend.store.raft;

import static com.baidu.hugegraph.backend.store.raft.RaftSharedContext.WAIT_RAFT_LOG_TIMEOUT;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -36,7 +34,7 @@

public class RaftClosure<T> implements Closure {

private static final Logger LOG = Log.logger(StoreClosure.class);
private static final Logger LOG = Log.logger(RaftStoreClosure.class);

private final CompletableFuture<RaftResult<T>> future;

Expand All @@ -59,7 +57,8 @@ public Status status() {

private RaftResult<T> get() {
try {
return this.future.get(WAIT_RAFT_LOG_TIMEOUT, TimeUnit.MILLISECONDS);
return this.future.get(RaftSharedContext.WAIT_RAFTLOG_TIMEOUT,
TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw new BackendException("ExecutionException", e);
} catch (InterruptedException e) {
Expand All @@ -83,7 +82,7 @@ public void run(Status status) {
this.complete(status, () -> null);
} else {
LOG.error("Failed to apply command: {}", status);
String msg = "Failed to apply command in raft node with error : " +
String msg = "Failed to apply command in raft node with error: " +
status.getErrorMsg();
this.failure(status, new BackendException(msg));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public List<String> listPeers() {
public String getLeader() {
PeerId leaderId = this.raftNode.leaderId();
E.checkState(leaderId != null,
"There is no leader for raft group %s", this.group);
"There is no leader for raft group '%s'", this.group);
return leaderId.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package com.baidu.hugegraph.backend.store.raft;

import static com.baidu.hugegraph.backend.store.raft.RaftSharedContext.BUSY_SLEEP_FACTOR;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -44,7 +42,7 @@
import com.baidu.hugegraph.util.LZ4Util;
import com.baidu.hugegraph.util.Log;

public class RaftNode {
public final class RaftNode {

private static final Logger LOG = Log.logger(RaftNode.class);

Expand All @@ -63,7 +61,7 @@ public RaftNode(RaftSharedContext context) {
} catch (IOException e) {
throw new BackendException("Failed to init raft node", e);
}
this.node.addReplicatorStateListener(new RaftNodeStateListener());
this.node.addReplicatorStateListener(new RaftStateListener());
this.leaderInfo = new AtomicReference<>(LeaderInfo.NO_LEADER);
this.started = new AtomicBoolean(false);
this.busyCounter = new AtomicInteger();
Expand All @@ -74,6 +72,7 @@ public RaftSharedContext context() {
}

public Node node() {
assert this.node != null;
return this.node;
}

Expand Down Expand Up @@ -107,7 +106,7 @@ public void snapshot() {
this.node().snapshot(future);
future.waitFinished();
} catch (Throwable e) {
throw new BackendException("Failed to generate snapshot", e);
throw new BackendException("Failed to create snapshot", e);
}
}

Expand All @@ -126,7 +125,7 @@ private Node initRaftNode() throws IOException {
return raftGroupService.start(false);
}

private void submitCommand(StoreCommand command, StoreClosure closure) {
private void submitCommand(StoreCommand command, RaftStoreClosure closure) {
// Wait leader elected
LeaderInfo leaderInfo = this.waitLeaderElected(
RaftSharedContext.NO_TIMEOUT);
Expand All @@ -152,7 +151,7 @@ private void submitCommand(StoreCommand command, StoreClosure closure) {
this.node.apply(task);
}

public Object submitAndWait(StoreCommand command, StoreClosure future) {
public Object submitAndWait(StoreCommand command, RaftStoreClosure future) {
this.submitCommand(command, future);
try {
/*
Expand All @@ -177,9 +176,8 @@ protected LeaderInfo waitLeaderElected(int timeout) {
try {
Thread.sleep(RaftSharedContext.POLL_INTERVAL);
} catch (InterruptedException e) {
throw new BackendException(
"Waiting for raft group '%s' election is interrupted",
e, group);
LOG.info("Waiting for raft group '{}' election is " +
"interrupted: {}", group, e);
}
long consumedTime = System.currentTimeMillis() - beginTime;
if (timeout > 0 && consumedTime >= timeout) {
Expand Down Expand Up @@ -212,8 +210,7 @@ public void run(Status status, long index, byte[] reqCtx) {
try {
Thread.sleep(RaftSharedContext.POLL_INTERVAL);
} catch (InterruptedException e) {
throw new BackendException("Try to sleep a while for waiting " +
"heartbeat is interrupted", e);
LOG.info("Waiting for heartbeat is interrupted: {}", e);
}
long consumedTime = System.currentTimeMillis() - beginTime;
if (timeout > 0 && consumedTime >= timeout) {
Expand All @@ -232,13 +229,13 @@ private void waitIfBusy() {
return;
}
// It may lead many thread sleep, but this is exactly what I want
long time = counter * BUSY_SLEEP_FACTOR;
LOG.info("The node {} will sleep {} ms", this.node, time);
long time = counter * RaftSharedContext.BUSY_SLEEP_FACTOR;
LOG.info("The node {} will try to sleep {} ms", this.node, time);
try {
Thread.sleep(time);
} catch (InterruptedException e) {
// throw busy exception
throw new BackendException("The raft backend store is busy");
// Throw busy exception if the request is timeout
throw new BackendException("The raft backend store is busy", e);
} finally {
if (this.busyCounter.get() > 0) {
synchronized (this) {
Expand All @@ -256,13 +253,11 @@ public String toString() {
return String.format("[%s-%s]", this.context.group(), this.nodeId());
}

private class RaftNodeStateListener implements ReplicatorStateListener {
protected final class RaftStateListener implements ReplicatorStateListener {

// unit is ms
private static final long ERROR_PRINT_INTERVAL = 60 * 1000;
private volatile long lastPrintTime;

public RaftNodeStateListener() {
public RaftStateListener() {
this.lastPrintTime = 0L;
}

Expand All @@ -271,17 +266,23 @@ public void onCreated(PeerId peer) {
LOG.info("The node {} replicator has created", peer);
}

@Override
public void onDestroyed(PeerId peer) {
LOG.warn("Replicator '{}' is ready to go offline", peer);
}

@Override
public void onError(PeerId peer, Status status) {
long now = System.currentTimeMillis();
if (now - this.lastPrintTime >= ERROR_PRINT_INTERVAL) {
long interval = now - this.lastPrintTime;
if (interval >= RaftSharedContext.LOG_WARN_INTERVAL) {
LOG.warn("Replicator meet error: {}", status);
this.lastPrintTime = now;
}
if (this.isWriteBufferOverflow(status)) {
// increment busy counter
// Increment busy counter
int count = RaftNode.this.busyCounter.incrementAndGet();
LOG.info("Increase busy counter: [{}]", count);
LOG.info("Increase busy counter due to overflow: [{}]", count);
}
}

Expand Down Expand Up @@ -312,11 +313,6 @@ private boolean isRpcTimeout(Status status) {
status.getErrorMsg() != null &&
status.getErrorMsg().contains(expectMsg);
}

@Override
public void onDestroyed(PeerId peer) {
LOG.warn("Replicator {} prepare to offline", peer);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.alipay.sofa.jraft.Status;
import com.baidu.hugegraph.util.E;

public class RaftResult<T> {
public final class RaftResult<T> {

private final Status status;
private final Supplier<T> callback;
Expand Down
Loading