diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java index aa49ecc8a9..e85658f3a6 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java @@ -11,4 +11,6 @@ public interface Config { long heartBeatIntervalSecond(); int exceedsWorkerCount(); + + long baseTimeoutMillisecond(); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java index 845971f0ce..e42b571543 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java @@ -9,9 +9,13 @@ public class MetaData { int epoch; public MetaData(String node, int epoch) { + this(node, epoch, 1); + } + + public MetaData(String node, int epoch, long count) { this.node = node; this.epoch = epoch; - this.count = 1; + this.count = count; } public void increaseCount() { @@ -30,6 +34,14 @@ public long count() { return this.count; } + public void count(long count) { + this.count = count; + } + + public String node() { + return this.node; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -42,4 +54,13 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(node, count, epoch); } + + @Override + public String toString() { + return "MetaData{" + + "node='" + node + '\'' + + ", count=" + count + + ", epoch=" + epoch + + '}'; + } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java index 666c836a70..51933a93d7 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java @@ -5,5 +5,4 @@ public interface RoleElectionStateMachine { void shutdown(); void apply(StateMachineCallback stateMachineCallback); - } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java index 8952c321f1..d79c1c0d0b 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java @@ -1,5 +1,6 @@ package com.baidu.hugegraph.election; +import java.security.SecureRandom; import java.util.Optional; import java.util.concurrent.locks.LockSupport; @@ -50,6 +51,8 @@ public void apply(StateMachineCallback stateMachineCallback) { private interface RoleState { + SecureRandom secureRandom = new SecureRandom(); + RoleState transform(StateMachineContext context); Callback callback(StateMachineCallback callback); @@ -61,7 +64,9 @@ static void heartBeatPark(StateMachineContext context) { static void randomPark(StateMachineContext context) { long randomTimeout = context.config().randomTimeoutMillisecond(); - LockSupport.parkNanos(randomTimeout * 1_000_000); + long baseTime = context.config().baseTimeoutMillisecond(); + long timeout = (long) (baseTime + (randomTimeout / 10.0 * secureRandom.nextInt(11))); + LockSupport.parkNanos(timeout * 1_000_000); } } @@ -91,6 +96,13 @@ public RoleState transform(StateMachineContext context) { } MetaData metaData = metaDataOpt.get(); + if (this.epoch != null && metaData.epoch() < this.epoch) { + context.reset(); + this.epoch = this.epoch == null ? 1 : this.epoch + 1; + context.epoch(this.epoch); + return new CandidateState(this.epoch); + } + context.epoch(metaData.epoch()); if (metaData.isMaster(context.node())) { return new MasterState(metaData); @@ -142,7 +154,7 @@ public RoleState transform(StateMachineContext context) { } context.reset(); context.epoch(this.metaData.epoch()); - return new UnKnownState(this.metaData.epoch()); + return new UnKnownState(this.metaData.epoch()).transform(context); } @Override diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java index 509cdf814a..d6dd4c350e 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java @@ -49,7 +49,8 @@ TaskCoreTest.class, AuthTest.class, MultiGraphsTest.class, - RamTableTest.class + RamTableTest.class, + RoleElectionStateMachineTest.class }) public class CoreTestSuite { diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java index 0956387b21..c7fed73685 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java @@ -1,16 +1,16 @@ package com.baidu.hugegraph.core; -import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.LockSupport; import com.baidu.hugegraph.election.Config; @@ -85,7 +85,7 @@ public String node() { @Override public int exceedsFailCount() { - return 10; + return 2; } @Override @@ -102,13 +102,19 @@ public long heartBeatIntervalSecond() { public int exceedsWorkerCount() { return 5; } + + @Override + public long baseTimeoutMillisecond() { + return 100; + } } @Test public void testStateMachine() throws InterruptedException { - final CountDownLatch stop = new CountDownLatch(3); - final int MAX_COUNT = 100; + final CountDownLatch stop = new CountDownLatch(4); + final int MAX_COUNT = 200; final List logRecords = Collections.synchronizedList(new ArrayList<>(MAX_COUNT)); + final List masterNodes = Collections.synchronizedList(new ArrayList<>(MAX_COUNT)); final StateMachineCallback callback = new StateMachineCallback() { @Override @@ -119,6 +125,8 @@ public void master(StateMachineContext context) { if (logRecords.size() > MAX_COUNT) { context.stateMachine().shutdown(); } + System.out.println("----master " + node); + masterNodes.add(node); } @Override @@ -163,42 +171,74 @@ public void safe(StateMachineContext context) { @Override public void error(StateMachineContext context, Throwable e) { - + System.out.println("----" + context.node() + " " + e.getMessage()); } }; + + final List metaDataLogs = Collections.synchronizedList(new ArrayList<>(100)); final MetaDataAdapter adapter = new MetaDataAdapter() { - int epoch = 0; - int count = 0; + volatile int epoch = 0; Map data = new ConcurrentHashMap<>(); + + MetaData copy(MetaData metaData) { + if (metaData == null) { + return null; + } + return new MetaData(metaData.node(), metaData.epoch(), metaData.count()); + } @Override public boolean postDelyIfPresent(MetaData metaData, long delySecond) { - this.count ++; - LockSupport.parkNanos(delySecond * 1_000_000_000); - if (count > 10) { - throw new RuntimeException("timeout"); + if (delySecond > 0) { + LockSupport.parkNanos(delySecond * 1_000_000_000); + } + if (metaData.epoch() < this.epoch) { + return false; } - MetaData oldData = data.computeIfAbsent(metaData.epoch(), (key) -> { - this.epoch = Math.max(key, epoch); - return metaData; + + MetaData copy = this.copy(metaData); + MetaData newData = data.compute(copy.epoch(), (key, value) -> { + if (copy.epoch() > this.epoch) { + this.epoch = copy.epoch(); + Assert.assertNull(value); + metaDataLogs.add(copy); + System.out.println("----1" + copy); + return copy; + } + + Assert.assertEquals(value.epoch(), copy.epoch()); + if (Objects.equals(value.node(), copy.node()) && + value.count() <= copy.count()) { + System.out.println("----2" + copy); + metaDataLogs.add(copy); + if (value.count() == copy.count()) { + Exception e = new Exception("eq"); + e.printStackTrace(); + } + return copy; + } + return value; + }); - return oldData == metaData; + return Objects.equals(newData, copy); } @Override public Optional queryDelay(long delySecond) { LockSupport.parkNanos(delySecond * 1_000_000_000); - return Optional.ofNullable(this.data.get(this.epoch)); + return Optional.ofNullable(this.copy(this.data.get(this.epoch))); } @Override public Optional query() { - return Optional.ofNullable(this.data.get(this.epoch)); + return Optional.ofNullable(this.copy(this.data.get(this.epoch))); } }; + RoleElectionStateMachine[] machines = new RoleElectionStateMachine[4]; Thread node1 = new Thread(() -> { Config config = new TestConfig("1"); RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter); + machines[1] = stateMachine; stateMachine.apply(callback); stop.countDown(); }); @@ -206,6 +246,7 @@ public Optional query() { Thread node2 = new Thread(() -> { Config config = new TestConfig("2"); RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter); + machines[2] = stateMachine; stateMachine.apply(callback); stop.countDown(); }); @@ -213,6 +254,7 @@ public Optional query() { Thread node3 = new Thread(() -> { Config config = new TestConfig("3"); RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter); + machines[3] = stateMachine; stateMachine.apply(callback); stop.countDown(); }); @@ -221,14 +263,36 @@ public Optional query() { node2.start(); node3.start(); + Thread randomShutdom = new Thread(() -> { + Set dropNodes = new HashSet<>(); + while (dropNodes.size() < 3) { + LockSupport.parkNanos(5_000_000_000L); + int size = masterNodes.size(); + if (size < 1) { + continue; + } + String node = masterNodes.get(size - 1); + if (dropNodes.contains(node)) { + continue; + } + machines[Integer.parseInt(node)].shutdown(); + dropNodes.add(node); + System.out.println("----shutdown machine " + node); + } + stop.countDown(); + }); + + randomShutdom.start(); stop.await(); - Assert.assertTrue(logRecords.size() > MAX_COUNT); + Assert.assertTrue(logRecords.size() > 0); Map masters = new HashMap<>(); for (LogEntry entry: logRecords) { if (entry.role == LogEntry.Role.master) { String lastNode = masters.putIfAbsent(entry.epoch, entry.node); - Assert.assertEquals(lastNode, entry.node); + if (lastNode != null) { + Assert.assertEquals(lastNode, entry.node); + } } }