diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java new file mode 100644 index 0000000000..ad39c0b5ef --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java @@ -0,0 +1,35 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.election; + +public interface Config { + + String node(); + + int exceedsFailCount(); + + long randomTimeoutMillisecond(); + + long heartBeatIntervalSecond(); + + int exceedsWorkerCount(); + + long baseTimeoutMillisecond(); +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java new file mode 100644 index 0000000000..4bc258623b --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java @@ -0,0 +1,27 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.election; + +public interface RoleElectionStateMachine { + + void shutdown(); + + void apply(StateMachineCallback stateMachineCallback); +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java new file mode 100644 index 0000000000..016f51cb8d --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java @@ -0,0 +1,312 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.election; + +import java.security.SecureRandom; +import java.util.Optional; +import java.util.concurrent.locks.LockSupport; + +import com.baidu.hugegraph.util.E; + +public class RoleElectionStateMachineImpl implements RoleElectionStateMachine { + + private volatile boolean shutdown; + private final Config config; + private volatile RoleState state; + private final RoleTypeDataAdapter roleTypeDataAdapter; + + public RoleElectionStateMachineImpl(Config config, RoleTypeDataAdapter adapter) { + this.config = config; + this.roleTypeDataAdapter = adapter; + this.state = new UnknownState(null); + this.shutdown = false; + } + + @Override + public void shutdown() { + this.shutdown = true; + } + + @Override + public void apply(StateMachineCallback stateMachineCallback) { + int failCount = 0; + StateMachineContextImpl context = new StateMachineContextImpl(this); + while (!this.shutdown) { + E.checkArgumentNotNull(this.state, "State don't be null"); + try { + this.state = state.transform(context); + Callback runnable = this.state.callback(stateMachineCallback); + runnable.call(context); + failCount = 0; + } catch (Throwable e) { + stateMachineCallback.error(context, e); + failCount ++; + if (failCount >= this.config.exceedsFailCount()) { + this.state = new AbdicationState(context.epoch()); + Callback runnable = this.state.callback(stateMachineCallback); + runnable.call(context); + } + } + } + } + + private interface RoleState { + + SecureRandom secureRandom = new SecureRandom(); + + RoleState transform(StateMachineContext context); + + Callback callback(StateMachineCallback callback); + + static void heartBeatPark(StateMachineContext context) { + long heartBeatIntervalSecond = context.config().heartBeatIntervalSecond(); + LockSupport.parkNanos(heartBeatIntervalSecond * 1_000_000_000); + } + + static void randomPark(StateMachineContext context) { + long randomTimeout = context.config().randomTimeoutMillisecond(); + long baseTime = context.config().baseTimeoutMillisecond(); + long timeout = (long) (baseTime + (randomTimeout / 10.0 * secureRandom.nextInt(11))); + LockSupport.parkNanos(timeout * 1_000_000); + } + } + + @FunctionalInterface + private interface Callback { + + void call(StateMachineContext context); + } + + private static class UnknownState implements RoleState { + + final Integer epoch; + + public UnknownState(Integer epoch) { + this.epoch = epoch; + } + + @Override + public RoleState transform(StateMachineContext context) { + RoleTypeDataAdapter adapter = context.adapter(); + Optional roleTypeDataOpt = adapter.query(); + if (!roleTypeDataOpt.isPresent()) { + context.reset(); + Integer nextEpoch = this.epoch == null ? 1 : this.epoch + 1; + context.epoch(nextEpoch); + return new CandidateState(nextEpoch); + } + + RoleTypeData roleTypeData = roleTypeDataOpt.get(); + if (this.epoch != null && roleTypeData.epoch() < this.epoch) { + context.reset(); + Integer nextEpoch = this.epoch + 1; + context.epoch(nextEpoch); + return new CandidateState(nextEpoch); + } + + context.epoch(roleTypeData.epoch()); + if (roleTypeData.isMaster(context.node())) { + return new MasterState(roleTypeData); + } else { + return new WorkerState(roleTypeData); + } + } + + @Override + public Callback callback(StateMachineCallback callback) { + return callback::unknown; + } + } + + private static class AbdicationState implements RoleState { + + private final Integer epoch; + + public AbdicationState(Integer epoch) { + this.epoch = epoch; + } + + @Override + public RoleState transform(StateMachineContext context) { + RoleState.heartBeatPark(context); + return new UnknownState(this.epoch).transform(context); + } + + @Override + public Callback callback(StateMachineCallback callback) { + return callback::abdication; + } + } + + private static class MasterState implements RoleState { + + private final RoleTypeData roleTypeData; + + public MasterState(RoleTypeData roleTypeData) { + this.roleTypeData = roleTypeData; + } + + @Override + public RoleState transform(StateMachineContext context) { + this.roleTypeData.increaseClock(); + RoleState.heartBeatPark(context); + if (context.adapter().updateIfNodePresent(this.roleTypeData)) { + return this; + } + context.reset(); + context.epoch(this.roleTypeData.epoch()); + return new UnknownState(this.roleTypeData.epoch()).transform(context); + } + + @Override + public Callback callback(StateMachineCallback callback) { + return callback::master; + } + } + + private static class WorkerState implements RoleState { + + private RoleTypeData roleTypeData; + private int clock; + + public WorkerState(RoleTypeData roleTypeData) { + this.roleTypeData = roleTypeData; + this.clock = 0; + } + + @Override + public RoleState transform(StateMachineContext context) { + RoleState.heartBeatPark(context); + RoleState nextState = new UnknownState(this.roleTypeData.epoch()).transform(context); + if (nextState instanceof WorkerState) { + this.merge((WorkerState) nextState); + if (this.clock > context.config().exceedsWorkerCount()) { + return new CandidateState(this.roleTypeData.epoch() + 1); + } else { + return this; + } + } else { + return nextState; + } + } + + @Override + public Callback callback(StateMachineCallback callback) { + return callback::worker; + } + + public void merge(WorkerState state) { + if (state.roleTypeData.epoch() > this.roleTypeData.epoch()) { + this.clock = 0; + this.roleTypeData = state.roleTypeData; + } else if (state.roleTypeData.epoch() < this.roleTypeData.epoch()){ + throw new IllegalStateException("Epoch must increase"); + } else if (state.roleTypeData.epoch() == this.roleTypeData.epoch() && + state.roleTypeData.clock() < this.roleTypeData.clock()) { + throw new IllegalStateException("Clock must increase"); + } else if (state.roleTypeData.epoch() == this.roleTypeData.epoch() && + state.roleTypeData.clock() > this.roleTypeData.clock()) { + this.clock = 0; + this.roleTypeData = state.roleTypeData; + } else { + this.clock++; + } + } + } + + private static class CandidateState implements RoleState { + + private final Integer epoch; + + public CandidateState(Integer epoch) { + this.epoch = epoch; + } + + @Override + public RoleState transform(StateMachineContext context) { + RoleState.randomPark(context); + int epoch = this.epoch == null ? 1 : this.epoch; + RoleTypeData roleTypeData = new RoleTypeData(context.config().node(), epoch); + //failover to master success + context.epoch(roleTypeData.epoch()); + if (context.adapter().updateIfNodePresent(roleTypeData)) { + return new MasterState(roleTypeData); + } else { + return new UnknownState(epoch).transform(context); + } + } + + @Override + public Callback callback(StateMachineCallback callback) { + return callback::candidate; + } + } + + private static class StateMachineContextImpl implements StateMachineContext { + + private Integer epoch; + private final String node; + private final RoleElectionStateMachineImpl machine; + + public StateMachineContextImpl(RoleElectionStateMachineImpl machine) { + this.node = machine.config.node(); + this.machine = machine; + } + + @Override + public Integer epoch() { + return this.epoch; + } + + @Override + public String node() { + return this.node; + } + + @Override + public void epoch(Integer epoch) { + this.epoch = epoch; + } + + @Override + public RoleTypeDataAdapter adapter() { + return this.machine.adapter(); + } + + @Override + public Config config() { + return this.machine.config; + } + + @Override + public RoleElectionStateMachine stateMachine() { + return this.machine; + } + + @Override + public void reset() { + this.epoch = null; + } + } + + protected RoleTypeDataAdapter adapter() { + return this.roleTypeDataAdapter; + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeData.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeData.java new file mode 100644 index 0000000000..aa60c47ace --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeData.java @@ -0,0 +1,91 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.election; + +import java.util.Objects; + +public class RoleTypeData { + + private String node; + private long clock; + private int epoch; + + public RoleTypeData(String node, int epoch) { + this(node, epoch, 1); + } + + public RoleTypeData(String node, int epoch, long clock) { + this.node = node; + this.epoch = epoch; + this.clock = clock; + } + + public void increaseClock() { + this.clock++; + } + + public boolean isMaster(String node) { + return Objects.equals(this.node, node); + } + + public int epoch() { + return this.epoch; + } + + public long clock() { + return this.clock; + } + + public void clock(long clock) { + this.clock = clock; + } + + public String node() { + return this.node; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof RoleTypeData)) { + return false; + } + RoleTypeData metaData = (RoleTypeData) obj; + return clock == metaData.clock && + epoch == metaData.epoch && + Objects.equals(node, metaData.node); + } + + @Override + public int hashCode() { + return Objects.hash(node, clock, epoch); + } + + @Override + public String toString() { + return "RoleStateData{" + + "node='" + node + '\'' + + ", clock=" + clock + + ", epoch=" + epoch + + '}'; + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeDataAdapter.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeDataAdapter.java new file mode 100644 index 0000000000..41a2021f09 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeDataAdapter.java @@ -0,0 +1,29 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.election; + +import java.util.Optional; + +public interface RoleTypeDataAdapter { + + boolean updateIfNodePresent(RoleTypeData stateData); + + Optional query(); +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java new file mode 100644 index 0000000000..8403b59502 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java @@ -0,0 +1,35 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.election; + +public interface StateMachineCallback { + + void master(StateMachineContext context); + + void worker(StateMachineContext context); + + void candidate(StateMachineContext context); + + void unknown(StateMachineContext context); + + void abdication(StateMachineContext context); + + void error(StateMachineContext context, Throwable e); +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java new file mode 100644 index 0000000000..a3693f5fac --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java @@ -0,0 +1,37 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.election; + +public interface StateMachineContext { + + Integer epoch(); + + String node(); + + RoleElectionStateMachine stateMachine(); + + void epoch(Integer epoch); + + Config config(); + + RoleTypeDataAdapter adapter(); + + void reset(); +} diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java index 509cdf814a..d6dd4c350e 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java @@ -49,7 +49,8 @@ TaskCoreTest.class, AuthTest.class, MultiGraphsTest.class, - RamTableTest.class + RamTableTest.class, + RoleElectionStateMachineTest.class }) public class CoreTestSuite { diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java new file mode 100644 index 0000000000..6e8af972dc --- /dev/null +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java @@ -0,0 +1,318 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.core; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.LockSupport; + +import com.baidu.hugegraph.election.Config; +import com.baidu.hugegraph.election.RoleElectionStateMachine; +import com.baidu.hugegraph.election.RoleElectionStateMachineImpl; +import com.baidu.hugegraph.election.RoleTypeData; +import com.baidu.hugegraph.election.RoleTypeDataAdapter; +import com.baidu.hugegraph.election.StateMachineCallback; +import com.baidu.hugegraph.election.StateMachineContext; +import com.baidu.hugegraph.testutil.Assert; +import org.junit.Test; + +public class RoleElectionStateMachineTest { + + public static class LogEntry { + + Integer epoch; + + String node; + + Role role; + + enum Role { + master, + worker, + candidate, + abdication, + unknown + } + + public LogEntry(Integer epoch, String node, Role role) { + this.epoch = epoch; + this.node = node; + this.role = role; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof LogEntry)) { + return false; + } + LogEntry logEntry = (LogEntry) obj; + return Objects.equals(epoch, logEntry.epoch) && + Objects.equals(node, logEntry.node) && role == logEntry.role; + } + + @Override + public int hashCode() { + return Objects.hash(epoch, node, role); + } + + @Override + public String toString() { + return "LogEntry{" + + "epoch=" + epoch + + ", node='" + node + '\'' + + ", role=" + role + + '}'; + } + } + + private static class TestConfig implements Config { + + String node; + + public TestConfig(String node) { + this.node = node; + } + + @Override + public String node() { + return this.node; + } + + @Override + public int exceedsFailCount() { + return 2; + } + + @Override + public long randomTimeoutMillisecond() { + return 400; + } + + @Override + public long heartBeatIntervalSecond() { + return 1; + } + + @Override + public int exceedsWorkerCount() { + return 5; + } + + @Override + public long baseTimeoutMillisecond() { + return 100; + } + } + + @Test + public void testStateMachine() throws InterruptedException { + final CountDownLatch stop = new CountDownLatch(4); + final int MAX_COUNT = 200; + final List logRecords = Collections.synchronizedList(new ArrayList<>(MAX_COUNT)); + final List masterNodes = Collections.synchronizedList(new ArrayList<>(MAX_COUNT)); + final StateMachineCallback callback = new StateMachineCallback() { + + @Override + public void master(StateMachineContext context) { + Integer epochId = context.epoch(); + String node = context.node(); + logRecords.add(new LogEntry(epochId, node, LogEntry.Role.master)); + if (logRecords.size() > MAX_COUNT) { + context.stateMachine().shutdown(); + } + System.out.println("master node: " + node); + masterNodes.add(node); + } + + @Override + public void worker(StateMachineContext context) { + Integer epochId = context.epoch(); + String node = context.node(); + logRecords.add(new LogEntry(epochId, node, LogEntry.Role.worker)); + if (logRecords.size() > MAX_COUNT) { + context.stateMachine().shutdown(); + } + } + + @Override + public void candidate(StateMachineContext context) { + Integer epochId = context.epoch(); + String node = context.node(); + logRecords.add(new LogEntry(epochId, node, LogEntry.Role.candidate)); + if (logRecords.size() > MAX_COUNT) { + context.stateMachine().shutdown(); + } + } + + @Override + public void unknown(StateMachineContext context) { + Integer epochId = context.epoch(); + String node = context.node(); + logRecords.add(new LogEntry(epochId, node, LogEntry.Role.unknown)); + if (logRecords.size() > MAX_COUNT) { + context.stateMachine().shutdown(); + } + } + + @Override + public void abdication(StateMachineContext context) { + Integer epochId = context.epoch(); + String node = context.node(); + logRecords.add(new LogEntry(epochId, node, LogEntry.Role.abdication)); + if (logRecords.size() > MAX_COUNT) { + context.stateMachine().shutdown(); + } + } + + @Override + public void error(StateMachineContext context, Throwable e) { + System.out.println("state machine error: node " + context.node() + " message " + e.getMessage()); + } + }; + + final List metaDataLogs = Collections.synchronizedList(new ArrayList<>(100)); + final RoleTypeDataAdapter adapter = new RoleTypeDataAdapter() { + + volatile int epoch = 0; + + final Map data = new ConcurrentHashMap<>(); + + RoleTypeData copy(RoleTypeData stateData) { + if (stateData == null) { + return null; + } + return new RoleTypeData(stateData.node(), stateData.epoch(), stateData.clock()); + } + + @Override + public boolean updateIfNodePresent(RoleTypeData stateData) { + if (stateData.epoch() < this.epoch) { + return false; + } + + RoleTypeData copy = this.copy(stateData); + RoleTypeData newData = data.compute(copy.epoch(), (key, value) -> { + if (copy.epoch() > this.epoch) { + this.epoch = copy.epoch(); + Assert.assertNull(value); + metaDataLogs.add(copy); + System.out.println("The node " + copy + " become new master:"); + return copy; + } + + Assert.assertEquals(value.epoch(), copy.epoch()); + if (Objects.equals(value.node(), copy.node()) && + value.clock() <= copy.clock()) { + System.out.println("The master node " + copy + " keep heartbeat"); + metaDataLogs.add(copy); + if (value.clock() == copy.clock()) { + Assert.fail("Clock must increase when same epoch and node id"); + } + return copy; + } + return value; + + }); + return Objects.equals(newData, copy); + } + + @Override + public Optional query() { + return Optional.ofNullable(this.copy(this.data.get(this.epoch))); + } + }; + + RoleElectionStateMachine[] machines = new RoleElectionStateMachine[4]; + Thread node1 = new Thread(() -> { + Config config = new TestConfig("1"); + RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter); + machines[1] = stateMachine; + stateMachine.apply(callback); + stop.countDown(); + }); + + Thread node2 = new Thread(() -> { + Config config = new TestConfig("2"); + RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter); + machines[2] = stateMachine; + stateMachine.apply(callback); + stop.countDown(); + }); + + Thread node3 = new Thread(() -> { + Config config = new TestConfig("3"); + RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter); + machines[3] = stateMachine; + stateMachine.apply(callback); + stop.countDown(); + }); + + node1.start(); + node2.start(); + node3.start(); + + Thread randomShutdown = new Thread(() -> { + Set dropNodes = new HashSet<>(); + while (dropNodes.size() < 3) { + LockSupport.parkNanos(5_000_000_000L); + int size = masterNodes.size(); + if (size < 1) { + continue; + } + String node = masterNodes.get(size - 1); + if (dropNodes.contains(node)) { + continue; + } + machines[Integer.parseInt(node)].shutdown(); + dropNodes.add(node); + System.out.println("----shutdown machine " + node); + } + stop.countDown(); + }); + + randomShutdown.start(); + stop.await(); + + Assert.assertGt(0, logRecords.size()); + Map masters = new HashMap<>(); + for (LogEntry entry: logRecords) { + if (entry.role == LogEntry.Role.master) { + String lastNode = masters.putIfAbsent(entry.epoch, entry.node); + if (lastNode != null) { + Assert.assertEquals(lastNode, entry.node); + } + } + } + + Assert.assertGt(0, masters.size()); + } +}