Skip to content

Commit

Permalink
improve code
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxxoo committed Aug 17, 2022
1 parent f342b47 commit b458556
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@

public class RoleElectionStateMachineImpl implements RoleElectionStateMachine {

private volatile boolean shutdown = false;
private volatile boolean shutdown;
private final Config config;
private volatile RoleState state;
private final RoleStataDataAdapter roleStataDataAdapter;
private final RoleTypeDataAdapter roleTypeDataAdapter;

public RoleElectionStateMachineImpl(Config config, RoleStataDataAdapter adapter) {
public RoleElectionStateMachineImpl(Config config, RoleTypeDataAdapter adapter) {
this.config = config;
this.roleStataDataAdapter = adapter;
this.roleTypeDataAdapter = adapter;
this.state = new UnKnownState(null);
this.shutdown = false;
}

@Override
Expand All @@ -58,7 +59,7 @@ public void apply(StateMachineCallback stateMachineCallback) {
stateMachineCallback.error(context, e);
failCount ++;
if (failCount >= this.config.exceedsFailCount()) {
this.state = new SafeState(context.epoch());
this.state = new AbdicationState(context.epoch());
Callback runnable = this.state.callback(stateMachineCallback);
runnable.call(context);
}
Expand Down Expand Up @@ -103,7 +104,7 @@ public UnKnownState(Integer epoch) {

@Override
public RoleState transform(StateMachineContext context) {
RoleStataDataAdapter adapter = context.adapter();
RoleTypeDataAdapter adapter = context.adapter();
Optional<RoleStateData> stateDataOpt = adapter.query();
if (!stateDataOpt.isPresent()) {
context.reset();
Expand Down Expand Up @@ -134,11 +135,11 @@ public Callback callback(StateMachineCallback callback) {
}
}

private static class SafeState implements RoleState {
private static class AbdicationState implements RoleState {

private final Integer epoch;

public SafeState(Integer epoch) {
public AbdicationState(Integer epoch) {
this.epoch = epoch;
}

Expand All @@ -150,7 +151,7 @@ public RoleState transform(StateMachineContext context) {

@Override
public Callback callback(StateMachineCallback callback) {
return callback::safe;
return callback::abdication;
}
}

Expand All @@ -164,9 +165,9 @@ public MasterState(RoleStateData stateData) {

@Override
public RoleState transform(StateMachineContext context) {
this.stateData.increaseCount();
this.stateData.increaseClock();
RoleState.heartBeatPark(context);
if (context.adapter().delayIfNodePresent(this.stateData, -1)) {
if (context.adapter().updateIfNodePresent(this.stateData, -1)) {
return this;
}
context.reset();
Expand All @@ -183,10 +184,11 @@ public Callback callback(StateMachineCallback callback) {
private static class WorkerState implements RoleState {

private RoleStateData stateData;
private int count = 0;
private int count;

public WorkerState(RoleStateData stateData) {
this.stateData = stateData;
this.count = 0;
}

@Override
Expand Down Expand Up @@ -217,10 +219,10 @@ public void merge(WorkerState state) {
} else if (state.stateData.epoch() < this.stateData.epoch()){
throw new IllegalStateException("Epoch must increase");
} else if (state.stateData.epoch() == this.stateData.epoch() &&
state.stateData.count() < this.stateData.count()) {
throw new IllegalStateException("Meta count must increase");
state.stateData.clock() < this.stateData.clock()) {
throw new IllegalStateException("Clock must increase");
} else if (state.stateData.epoch() == this.stateData.epoch() &&
state.stateData.count() > this.stateData.count()) {
state.stateData.clock() > this.stateData.clock()) {
this.count = 0;
this.stateData = state.stateData;
} else {
Expand All @@ -244,10 +246,10 @@ public RoleState transform(StateMachineContext context) {
RoleStateData stateData = new RoleStateData(context.config().node(), epoch);
//failover to master success
context.epoch(stateData.epoch());
if (context.adapter().delayIfNodePresent(stateData, -1)) {
if (context.adapter().updateIfNodePresent(stateData, -1)) {
return new MasterState(stateData);
} else {
return new WorkerState(stateData);
return new UnKnownState(epoch).transform(context);
}
}

Expand Down Expand Up @@ -284,7 +286,7 @@ public void epoch(Integer epoch) {
}

@Override
public RoleStataDataAdapter adapter() {
public RoleTypeDataAdapter adapter() {
return this.machine.adapter();
}

Expand All @@ -304,7 +306,7 @@ public void reset() {
}
}

protected RoleStataDataAdapter adapter() {
return this.roleStataDataAdapter;
protected RoleTypeDataAdapter adapter() {
return this.roleTypeDataAdapter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,21 @@
public class RoleStateData {

private String node;
private long count;
private long clock;
private int epoch;

public RoleStateData(String node, int epoch) {
this(node, epoch, 1);
}

public RoleStateData(String node, int epoch, long count) {
public RoleStateData(String node, int epoch, long clock) {
this.node = node;
this.epoch = epoch;
this.count = count;
this.clock = clock;
}

public void increaseCount() {
this.count++;
public void increaseClock() {
this.clock++;
}

public boolean isMaster(String node) {
Expand All @@ -49,42 +49,42 @@ public int epoch() {
return this.epoch;
}

public long count() {
return this.count;
public long clock() {
return this.clock;
}

public void count(long count) {
this.count = count;
public void clock(long clock) {
this.clock = clock;
}

public String node() {
return this.node;
}

@Override
public boolean equals(Object o) {
if (this == o) {
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(o instanceof RoleStateData)) {
if (!(obj instanceof RoleStateData)) {
return false;
}
RoleStateData metaData = (RoleStateData) o;
return count == metaData.count &&
RoleStateData metaData = (RoleStateData) obj;
return clock == metaData.clock &&
epoch == metaData.epoch &&
Objects.equals(node, metaData.node);
}

@Override
public int hashCode() {
return Objects.hash(node, count, epoch);
return Objects.hash(node, clock, epoch);
}

@Override
public String toString() {
return "RoleStateData{" +
"node='" + node + '\'' +
", count=" + count +
", clock=" + clock +
", epoch=" + epoch +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

import java.util.Optional;

public interface RoleStataDataAdapter {
public interface RoleTypeDataAdapter {

boolean delayIfNodePresent(RoleStateData metaData, long delaySecond);
boolean updateIfNodePresent(RoleStateData metaData, long delaySecond);

Optional<RoleStateData> queryWithDelay(long delaySecond);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface StateMachineCallback {

void unknown(StateMachineContext context);

void safe(StateMachineContext context);
void abdication(StateMachineContext context);

void error(StateMachineContext context, Throwable e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface StateMachineContext {

Config config();

RoleStataDataAdapter adapter();
RoleTypeDataAdapter adapter();

void reset();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.LockSupport;

import org.junit.Assert;
import org.junit.Test;

import com.baidu.hugegraph.election.Config;
import com.baidu.hugegraph.election.RoleStateData;
import com.baidu.hugegraph.election.RoleStataDataAdapter;
import com.baidu.hugegraph.election.RoleTypeDataAdapter;
import com.baidu.hugegraph.election.RoleElectionStateMachine;
import com.baidu.hugegraph.election.RoleElectionStateMachineImpl;
import com.baidu.hugegraph.election.StateMachineCallback;
import com.baidu.hugegraph.election.StateMachineContext;
import com.baidu.hugegraph.testutil.Assert;

public class RoleElectionStateMachineTest {

Expand All @@ -57,7 +57,7 @@ enum Role {
master,
worker,
candidate,
safe,
abdication,
unknown
}

Expand All @@ -68,11 +68,12 @@ public LogEntry(Integer epoch, String node, Role role) {
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof LogEntry)) return false;
LogEntry logEntry = (LogEntry) o;
return Objects.equals(epoch, logEntry.epoch) && Objects.equals(node, logEntry.node) && role == logEntry.role;
public boolean equals(Object obj) {
if (this == obj) return true;
if (!(obj instanceof LogEntry)) return false;
LogEntry logEntry = (LogEntry) obj;
return Objects.equals(epoch, logEntry.epoch) &&
Objects.equals(node, logEntry.node) && role == logEntry.role;
}

@Override
Expand Down Expand Up @@ -180,10 +181,10 @@ public void unknown(StateMachineContext context) {
}

@Override
public void safe(StateMachineContext context) {
public void abdication(StateMachineContext context) {
Integer epochId = context.epoch();
String node = context.node();
logRecords.add(new LogEntry(epochId, node, LogEntry.Role.safe));
logRecords.add(new LogEntry(epochId, node, LogEntry.Role.abdication));
if (logRecords.size() > MAX_COUNT) {
context.stateMachine().shutdown();
}
Expand All @@ -196,7 +197,7 @@ public void error(StateMachineContext context, Throwable e) {
};

final List<RoleStateData> metaDataLogs = Collections.synchronizedList(new ArrayList<>(100));
final RoleStataDataAdapter adapter = new RoleStataDataAdapter() {
final RoleTypeDataAdapter adapter = new RoleTypeDataAdapter() {

volatile int epoch = 0;

Expand All @@ -206,11 +207,11 @@ RoleStateData copy(RoleStateData stateData) {
if (stateData == null) {
return null;
}
return new RoleStateData(stateData.node(), stateData.epoch(), stateData.count());
return new RoleStateData(stateData.node(), stateData.epoch(), stateData.clock());
}

@Override
public boolean delayIfNodePresent(RoleStateData stateData, long delaySecond) {
public boolean updateIfNodePresent(RoleStateData stateData, long delaySecond) {
if (delaySecond > 0) {
LockSupport.parkNanos(delaySecond * 1_000_000_000);
}
Expand All @@ -230,10 +231,10 @@ public boolean delayIfNodePresent(RoleStateData stateData, long delaySecond) {

Assert.assertEquals(value.epoch(), copy.epoch());
if (Objects.equals(value.node(), copy.node()) &&
value.count() <= copy.count()) {
value.clock() <= copy.clock()) {
System.out.println("----2" + copy);
metaDataLogs.add(copy);
if (value.count() == copy.count()) {
if (value.clock() == copy.clock()) {
Exception e = new Exception("eq");
e.printStackTrace();
}
Expand Down Expand Up @@ -308,7 +309,7 @@ public Optional<RoleStateData> query() {
randomShutdown.start();
stop.await();

Assert.assertTrue(logRecords.size() > 0);
Assert.assertGt(0, logRecords.size());
Map<Integer, String> masters = new HashMap<>();
for (LogEntry entry: logRecords) {
if (entry.role == LogEntry.Role.master) {
Expand All @@ -319,6 +320,6 @@ public Optional<RoleStateData> query() {
}
}

Assert.assertTrue(masters.size() > 0);
Assert.assertGt(0, masters.size());
}
}

0 comments on commit b458556

Please sign in to comment.