Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): cluster role automatic management #1943

Merged
merged 9 commits into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@

public class RoleElectionStateMachineImpl implements RoleElectionStateMachine {

private volatile boolean shutdown = false;
private volatile boolean shutdown;
private final Config config;
private volatile RoleState state;
private final RoleStataDataAdapter roleStataDataAdapter;
private final RoleTypeDataAdapter roleTypeDataAdapter;

public RoleElectionStateMachineImpl(Config config, RoleStataDataAdapter adapter) {
public RoleElectionStateMachineImpl(Config config, RoleTypeDataAdapter adapter) {
this.config = config;
this.roleStataDataAdapter = adapter;
this.roleTypeDataAdapter = adapter;
this.state = new UnKnownState(null);
this.shutdown = false;
}

@Override
Expand All @@ -58,7 +59,7 @@ public void apply(StateMachineCallback stateMachineCallback) {
stateMachineCallback.error(context, e);
failCount ++;
if (failCount >= this.config.exceedsFailCount()) {
this.state = new SafeState(context.epoch());
this.state = new AbdicationState(context.epoch());
Callback runnable = this.state.callback(stateMachineCallback);
runnable.call(context);
}
Expand Down Expand Up @@ -103,7 +104,7 @@ public UnKnownState(Integer epoch) {

@Override
public RoleState transform(StateMachineContext context) {
RoleStataDataAdapter adapter = context.adapter();
RoleTypeDataAdapter adapter = context.adapter();
Optional<RoleStateData> stateDataOpt = adapter.query();
if (!stateDataOpt.isPresent()) {
context.reset();
Expand Down Expand Up @@ -134,11 +135,11 @@ public Callback callback(StateMachineCallback callback) {
}
}

private static class SafeState implements RoleState {
private static class AbdicationState implements RoleState {

private final Integer epoch;

public SafeState(Integer epoch) {
public AbdicationState(Integer epoch) {
this.epoch = epoch;
}

Expand All @@ -150,7 +151,7 @@ public RoleState transform(StateMachineContext context) {

@Override
public Callback callback(StateMachineCallback callback) {
return callback::safe;
return callback::abdication;
}
}

Expand All @@ -164,9 +165,9 @@ public MasterState(RoleStateData stateData) {

@Override
public RoleState transform(StateMachineContext context) {
this.stateData.increaseCount();
this.stateData.increaseClock();
RoleState.heartBeatPark(context);
if (context.adapter().delayIfNodePresent(this.stateData, -1)) {
if (context.adapter().updateIfNodePresent(this.stateData)) {
return this;
}
context.reset();
Expand All @@ -183,10 +184,11 @@ public Callback callback(StateMachineCallback callback) {
private static class WorkerState implements RoleState {

private RoleStateData stateData;
private int count = 0;
private int count;
zyxxoo marked this conversation as resolved.
Show resolved Hide resolved

public WorkerState(RoleStateData stateData) {
this.stateData = stateData;
this.count = 0;
}

@Override
Expand Down Expand Up @@ -217,10 +219,10 @@ public void merge(WorkerState state) {
} else if (state.stateData.epoch() < this.stateData.epoch()){
throw new IllegalStateException("Epoch must increase");
} else if (state.stateData.epoch() == this.stateData.epoch() &&
state.stateData.count() < this.stateData.count()) {
throw new IllegalStateException("Meta count must increase");
state.stateData.clock() < this.stateData.clock()) {
throw new IllegalStateException("Clock must increase");
} else if (state.stateData.epoch() == this.stateData.epoch() &&
state.stateData.count() > this.stateData.count()) {
state.stateData.clock() > this.stateData.clock()) {
this.count = 0;
this.stateData = state.stateData;
} else {
Expand All @@ -244,10 +246,10 @@ public RoleState transform(StateMachineContext context) {
RoleStateData stateData = new RoleStateData(context.config().node(), epoch);
//failover to master success
context.epoch(stateData.epoch());
if (context.adapter().delayIfNodePresent(stateData, -1)) {
if (context.adapter().updateIfNodePresent(stateData)) {
return new MasterState(stateData);
} else {
return new WorkerState(stateData);
return new UnKnownState(epoch).transform(context);
zyxxoo marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -284,7 +286,7 @@ public void epoch(Integer epoch) {
}

@Override
public RoleStataDataAdapter adapter() {
public RoleTypeDataAdapter adapter() {
return this.machine.adapter();
}

Expand All @@ -304,7 +306,7 @@ public void reset() {
}
}

protected RoleStataDataAdapter adapter() {
return this.roleStataDataAdapter;
protected RoleTypeDataAdapter adapter() {
return this.roleTypeDataAdapter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,21 @@
public class RoleStateData {

private String node;
private long count;
private long clock;
private int epoch;

public RoleStateData(String node, int epoch) {
this(node, epoch, 1);
}

public RoleStateData(String node, int epoch, long count) {
public RoleStateData(String node, int epoch, long clock) {
this.node = node;
this.epoch = epoch;
this.count = count;
this.clock = clock;
}

public void increaseCount() {
this.count++;
public void increaseClock() {
this.clock++;
}

public boolean isMaster(String node) {
Expand All @@ -49,42 +49,42 @@ public int epoch() {
return this.epoch;
}

public long count() {
return this.count;
public long clock() {
return this.clock;
}

public void count(long count) {
this.count = count;
public void clock(long clock) {
this.clock = clock;
}

public String node() {
return this.node;
}

@Override
public boolean equals(Object o) {
if (this == o) {
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(o instanceof RoleStateData)) {
if (!(obj instanceof RoleStateData)) {
return false;
}
RoleStateData metaData = (RoleStateData) o;
return count == metaData.count &&
RoleStateData metaData = (RoleStateData) obj;
return clock == metaData.clock &&
epoch == metaData.epoch &&
Objects.equals(node, metaData.node);
}

@Override
public int hashCode() {
return Objects.hash(node, count, epoch);
return Objects.hash(node, clock, epoch);
}

@Override
public String toString() {
return "RoleStateData{" +
"node='" + node + '\'' +
", count=" + count +
", clock=" + clock +
", epoch=" + epoch +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@

import java.util.Optional;

public interface RoleStataDataAdapter {
public interface RoleTypeDataAdapter {
zyxxoo marked this conversation as resolved.
Show resolved Hide resolved

boolean delayIfNodePresent(RoleStateData metaData, long delaySecond);

Optional<RoleStateData> queryWithDelay(long delaySecond);
boolean updateIfNodePresent(RoleStateData stateData);

Optional<RoleStateData> query();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface StateMachineCallback {

void unknown(StateMachineContext context);

void safe(StateMachineContext context);
void abdication(StateMachineContext context);

void error(StateMachineContext context, Throwable e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface StateMachineContext {

Config config();

RoleStataDataAdapter adapter();
RoleTypeDataAdapter adapter();

void reset();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.LockSupport;

import org.junit.Assert;
import org.junit.Test;

import com.baidu.hugegraph.election.Config;
import com.baidu.hugegraph.election.RoleStateData;
import com.baidu.hugegraph.election.RoleStataDataAdapter;
import com.baidu.hugegraph.election.RoleTypeDataAdapter;
import com.baidu.hugegraph.election.RoleElectionStateMachine;
import com.baidu.hugegraph.election.RoleElectionStateMachineImpl;
import com.baidu.hugegraph.election.StateMachineCallback;
import com.baidu.hugegraph.election.StateMachineContext;
import com.baidu.hugegraph.testutil.Assert;

public class RoleElectionStateMachineTest {

Expand All @@ -57,7 +57,7 @@ enum Role {
master,
worker,
candidate,
safe,
abdication,
unknown
}

Expand All @@ -68,11 +68,12 @@ public LogEntry(Integer epoch, String node, Role role) {
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof LogEntry)) return false;
LogEntry logEntry = (LogEntry) o;
return Objects.equals(epoch, logEntry.epoch) && Objects.equals(node, logEntry.node) && role == logEntry.role;
public boolean equals(Object obj) {
if (this == obj) return true;
zyxxoo marked this conversation as resolved.
Show resolved Hide resolved
if (!(obj instanceof LogEntry)) return false;
LogEntry logEntry = (LogEntry) obj;
return Objects.equals(epoch, logEntry.epoch) &&
Objects.equals(node, logEntry.node) && role == logEntry.role;
}

@Override
Expand Down Expand Up @@ -180,10 +181,10 @@ public void unknown(StateMachineContext context) {
}

@Override
public void safe(StateMachineContext context) {
public void abdication(StateMachineContext context) {
Integer epochId = context.epoch();
String node = context.node();
logRecords.add(new LogEntry(epochId, node, LogEntry.Role.safe));
logRecords.add(new LogEntry(epochId, node, LogEntry.Role.abdication));
if (logRecords.size() > MAX_COUNT) {
context.stateMachine().shutdown();
}
Expand All @@ -196,7 +197,7 @@ public void error(StateMachineContext context, Throwable e) {
};
zyxxoo marked this conversation as resolved.
Show resolved Hide resolved

final List<RoleStateData> metaDataLogs = Collections.synchronizedList(new ArrayList<>(100));
final RoleStataDataAdapter adapter = new RoleStataDataAdapter() {
final RoleTypeDataAdapter adapter = new RoleTypeDataAdapter() {

volatile int epoch = 0;

Expand All @@ -206,14 +207,11 @@ RoleStateData copy(RoleStateData stateData) {
if (stateData == null) {
return null;
}
return new RoleStateData(stateData.node(), stateData.epoch(), stateData.count());
return new RoleStateData(stateData.node(), stateData.epoch(), stateData.clock());
}

@Override
public boolean delayIfNodePresent(RoleStateData stateData, long delaySecond) {
if (delaySecond > 0) {
LockSupport.parkNanos(delaySecond * 1_000_000_000);
}
public boolean updateIfNodePresent(RoleStateData stateData) {
if (stateData.epoch() < this.epoch) {
return false;
}
Expand All @@ -230,10 +228,10 @@ public boolean delayIfNodePresent(RoleStateData stateData, long delaySecond) {

Assert.assertEquals(value.epoch(), copy.epoch());
if (Objects.equals(value.node(), copy.node()) &&
value.count() <= copy.count()) {
value.clock() <= copy.clock()) {
System.out.println("----2" + copy);
zyxxoo marked this conversation as resolved.
Show resolved Hide resolved
metaDataLogs.add(copy);
if (value.count() == copy.count()) {
if (value.clock() == copy.clock()) {
Exception e = new Exception("eq");
e.printStackTrace();
zyxxoo marked this conversation as resolved.
Show resolved Hide resolved
}
Expand All @@ -245,12 +243,6 @@ public boolean delayIfNodePresent(RoleStateData stateData, long delaySecond) {
return Objects.equals(newData, copy);
}

@Override
public Optional<RoleStateData> queryWithDelay(long delaySecond) {
LockSupport.parkNanos(delaySecond * 1_000_000_000);
return Optional.ofNullable(this.copy(this.data.get(this.epoch)));
}

@Override
public Optional<RoleStateData> query() {
return Optional.ofNullable(this.copy(this.data.get(this.epoch)));
Expand Down Expand Up @@ -308,7 +300,7 @@ public Optional<RoleStateData> query() {
randomShutdown.start();
stop.await();

Assert.assertTrue(logRecords.size() > 0);
Assert.assertGt(0, logRecords.size());
Map<Integer, String> masters = new HashMap<>();
for (LogEntry entry: logRecords) {
if (entry.role == LogEntry.Role.master) {
Expand All @@ -319,6 +311,6 @@ public Optional<RoleStateData> query() {
}
}

Assert.assertTrue(masters.size() > 0);
Assert.assertGt(0, masters.size());
}
}