Skip to content

Commit

Permalink
improve code
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxxoo committed Aug 17, 2022
1 parent f3c5ab6 commit 664c614
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,27 +105,27 @@ public UnKnownState(Integer epoch) {
@Override
public RoleState transform(StateMachineContext context) {
RoleTypeDataAdapter adapter = context.adapter();
Optional<RoleStateData> stateDataOpt = adapter.query();
if (!stateDataOpt.isPresent()) {
Optional<RoleTypeData> roleTypeDataOpt = adapter.query();
if (!roleTypeDataOpt.isPresent()) {
context.reset();
Integer nextEpoch = this.epoch == null ? 1 : this.epoch + 1;
context.epoch(nextEpoch);
return new CandidateState(nextEpoch);
}

RoleStateData stateData = stateDataOpt.get();
if (this.epoch != null && stateData.epoch() < this.epoch) {
RoleTypeData roleTypeData = roleTypeDataOpt.get();
if (this.epoch != null && roleTypeData.epoch() < this.epoch) {
context.reset();
Integer nextEpoch = this.epoch + 1;
context.epoch(nextEpoch);
return new CandidateState(nextEpoch);
}

context.epoch(stateData.epoch());
if (stateData.isMaster(context.node())) {
return new MasterState(stateData);
context.epoch(roleTypeData.epoch());
if (roleTypeData.isMaster(context.node())) {
return new MasterState(roleTypeData);
} else {
return new WorkerState(stateData);
return new WorkerState(roleTypeData);
}
}

Expand Down Expand Up @@ -157,22 +157,22 @@ public Callback callback(StateMachineCallback callback) {

private static class MasterState implements RoleState {

private final RoleStateData stateData;
private final RoleTypeData roleTypeData;

public MasterState(RoleStateData stateData) {
this.stateData = stateData;
public MasterState(RoleTypeData roleTypeData) {
this.roleTypeData = roleTypeData;
}

@Override
public RoleState transform(StateMachineContext context) {
this.stateData.increaseClock();
this.roleTypeData.increaseClock();
RoleState.heartBeatPark(context);
if (context.adapter().updateIfNodePresent(this.stateData)) {
if (context.adapter().updateIfNodePresent(this.roleTypeData)) {
return this;
}
context.reset();
context.epoch(this.stateData.epoch());
return new UnKnownState(this.stateData.epoch()).transform(context);
context.epoch(this.roleTypeData.epoch());
return new UnKnownState(this.roleTypeData.epoch()).transform(context);
}

@Override
Expand All @@ -183,22 +183,22 @@ public Callback callback(StateMachineCallback callback) {

private static class WorkerState implements RoleState {

private RoleStateData stateData;
private int count;
private RoleTypeData roleTypeData;
private int clock;

public WorkerState(RoleStateData stateData) {
this.stateData = stateData;
this.count = 0;
public WorkerState(RoleTypeData roleTypeData) {
this.roleTypeData = roleTypeData;
this.clock = 0;
}

@Override
public RoleState transform(StateMachineContext context) {
RoleState.heartBeatPark(context);
RoleState nextState = new UnKnownState(this.stateData.epoch()).transform(context);
RoleState nextState = new UnKnownState(this.roleTypeData.epoch()).transform(context);
if (nextState instanceof WorkerState) {
this.merge((WorkerState) nextState);
if (this.count > context.config().exceedsWorkerCount()) {
return new CandidateState(this.stateData.epoch() + 1);
if (this.clock > context.config().exceedsWorkerCount()) {
return new CandidateState(this.roleTypeData.epoch() + 1);
} else {
return this;
}
Expand All @@ -213,20 +213,20 @@ public Callback callback(StateMachineCallback callback) {
}

public void merge(WorkerState state) {
if (state.stateData.epoch() > this.stateData.epoch()) {
this.count = 0;
this.stateData = state.stateData;
} else if (state.stateData.epoch() < this.stateData.epoch()){
if (state.roleTypeData.epoch() > this.roleTypeData.epoch()) {
this.clock = 0;
this.roleTypeData = state.roleTypeData;
} else if (state.roleTypeData.epoch() < this.roleTypeData.epoch()){
throw new IllegalStateException("Epoch must increase");
} else if (state.stateData.epoch() == this.stateData.epoch() &&
state.stateData.clock() < this.stateData.clock()) {
} else if (state.roleTypeData.epoch() == this.roleTypeData.epoch() &&
state.roleTypeData.clock() < this.roleTypeData.clock()) {
throw new IllegalStateException("Clock must increase");
} else if (state.stateData.epoch() == this.stateData.epoch() &&
state.stateData.clock() > this.stateData.clock()) {
this.count = 0;
this.stateData = state.stateData;
} else if (state.roleTypeData.epoch() == this.roleTypeData.epoch() &&
state.roleTypeData.clock() > this.roleTypeData.clock()) {
this.clock = 0;
this.roleTypeData = state.roleTypeData;
} else {
this.count++;
this.clock++;
}
}
}
Expand All @@ -243,11 +243,11 @@ public CandidateState(Integer epoch) {
public RoleState transform(StateMachineContext context) {
RoleState.randomPark(context);
int epoch = this.epoch == null ? 1 : this.epoch;
RoleStateData stateData = new RoleStateData(context.config().node(), epoch);
RoleTypeData roleTypeData = new RoleTypeData(context.config().node(), epoch);
//failover to master success
context.epoch(stateData.epoch());
if (context.adapter().updateIfNodePresent(stateData)) {
return new MasterState(stateData);
context.epoch(roleTypeData.epoch());
if (context.adapter().updateIfNodePresent(roleTypeData)) {
return new MasterState(roleTypeData);
} else {
return new UnKnownState(epoch).transform(context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@

import java.util.Objects;

public class RoleStateData {
public class RoleTypeData {

private String node;
private long clock;
private int epoch;

public RoleStateData(String node, int epoch) {
public RoleTypeData(String node, int epoch) {
this(node, epoch, 1);
}

public RoleStateData(String node, int epoch, long clock) {
public RoleTypeData(String node, int epoch, long clock) {
this.node = node;
this.epoch = epoch;
this.clock = clock;
Expand Down Expand Up @@ -66,10 +66,10 @@ public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof RoleStateData)) {
if (!(obj instanceof RoleTypeData)) {
return false;
}
RoleStateData metaData = (RoleStateData) obj;
RoleTypeData metaData = (RoleTypeData) obj;
return clock == metaData.clock &&
epoch == metaData.epoch &&
Objects.equals(node, metaData.node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

public interface RoleTypeDataAdapter {

boolean updateIfNodePresent(RoleStateData stateData);
boolean updateIfNodePresent(RoleTypeData stateData);

Optional<RoleStateData> query();
Optional<RoleTypeData> query();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.junit.Test;

import com.baidu.hugegraph.election.Config;
import com.baidu.hugegraph.election.RoleStateData;
import com.baidu.hugegraph.election.RoleTypeData;
import com.baidu.hugegraph.election.RoleTypeDataAdapter;
import com.baidu.hugegraph.election.RoleElectionStateMachine;
import com.baidu.hugegraph.election.RoleElectionStateMachineImpl;
Expand Down Expand Up @@ -69,8 +69,12 @@ public LogEntry(Integer epoch, String node, Role role) {

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (!(obj instanceof LogEntry)) return false;
if (this == obj) {
return true;
}
if (!(obj instanceof LogEntry)) {
return false;
}
LogEntry logEntry = (LogEntry) obj;
return Objects.equals(epoch, logEntry.epoch) &&
Objects.equals(node, logEntry.node) && role == logEntry.role;
Expand Down Expand Up @@ -196,28 +200,28 @@ public void error(StateMachineContext context, Throwable e) {
}
};

final List<RoleStateData> metaDataLogs = Collections.synchronizedList(new ArrayList<>(100));
final List<RoleTypeData> metaDataLogs = Collections.synchronizedList(new ArrayList<>(100));
final RoleTypeDataAdapter adapter = new RoleTypeDataAdapter() {

volatile int epoch = 0;

final Map<Integer, RoleStateData> data = new ConcurrentHashMap<>();
final Map<Integer, RoleTypeData> data = new ConcurrentHashMap<>();

RoleStateData copy(RoleStateData stateData) {
RoleTypeData copy(RoleTypeData stateData) {
if (stateData == null) {
return null;
}
return new RoleStateData(stateData.node(), stateData.epoch(), stateData.clock());
return new RoleTypeData(stateData.node(), stateData.epoch(), stateData.clock());
}

@Override
public boolean updateIfNodePresent(RoleStateData stateData) {
public boolean updateIfNodePresent(RoleTypeData stateData) {
if (stateData.epoch() < this.epoch) {
return false;
}

RoleStateData copy = this.copy(stateData);
RoleStateData newData = data.compute(copy.epoch(), (key, value) -> {
RoleTypeData copy = this.copy(stateData);
RoleTypeData newData = data.compute(copy.epoch(), (key, value) -> {
if (copy.epoch() > this.epoch) {
this.epoch = copy.epoch();
Assert.assertNull(value);
Expand All @@ -244,7 +248,7 @@ public boolean updateIfNodePresent(RoleStateData stateData) {
}

@Override
public Optional<RoleStateData> query() {
public Optional<RoleTypeData> query() {
return Optional.ofNullable(this.copy(this.data.get(this.epoch)));
}
};
Expand Down

0 comments on commit 664c614

Please sign in to comment.