Skip to content

Commit

Permalink
improve code
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxxoo committed Aug 16, 2022
1 parent a0a0878 commit d366667
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ public interface Config {
long heartBeatIntervalSecond();

int exceedsWorkerCount();

long baseTimeoutMillisecond();
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@ public class MetaData {
int epoch;

public MetaData(String node, int epoch) {
this(node, epoch, 1);
}

public MetaData(String node, int epoch, long count) {
this.node = node;
this.epoch = epoch;
this.count = 1;
this.count = count;
}

public void increaseCount() {
Expand All @@ -30,6 +34,14 @@ public long count() {
return this.count;
}

public void count(long count) {
this.count = count;
}

public String node() {
return this.node;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -42,4 +54,13 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(node, count, epoch);
}

@Override
public String toString() {
return "MetaData{" +
"node='" + node + '\'' +
", count=" + count +
", epoch=" + epoch +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,4 @@ public interface RoleElectionStateMachine {
void shutdown();

void apply(StateMachineCallback stateMachineCallback);

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.baidu.hugegraph.election;

import java.security.SecureRandom;
import java.util.Optional;
import java.util.concurrent.locks.LockSupport;

Expand Down Expand Up @@ -50,6 +51,8 @@ public void apply(StateMachineCallback stateMachineCallback) {

private interface RoleState {

SecureRandom secureRandom = new SecureRandom();

RoleState transform(StateMachineContext context);

Callback callback(StateMachineCallback callback);
Expand All @@ -61,7 +64,9 @@ static void heartBeatPark(StateMachineContext context) {

static void randomPark(StateMachineContext context) {
long randomTimeout = context.config().randomTimeoutMillisecond();
LockSupport.parkNanos(randomTimeout * 1_000_000);
long baseTime = context.config().baseTimeoutMillisecond();
long timeout = (long) (baseTime + (randomTimeout / 10.0 * secureRandom.nextInt(11)));
LockSupport.parkNanos(timeout * 1_000_000);
}
}

Expand Down Expand Up @@ -91,6 +96,13 @@ public RoleState transform(StateMachineContext context) {
}

MetaData metaData = metaDataOpt.get();
if (this.epoch != null && metaData.epoch() < this.epoch) {
context.reset();
this.epoch = this.epoch == null ? 1 : this.epoch + 1;
context.epoch(this.epoch);
return new CandidateState(this.epoch);
}

context.epoch(metaData.epoch());
if (metaData.isMaster(context.node())) {
return new MasterState(metaData);
Expand Down Expand Up @@ -142,7 +154,7 @@ public RoleState transform(StateMachineContext context) {
}
context.reset();
context.epoch(this.metaData.epoch());
return new UnKnownState(this.metaData.epoch());
return new UnKnownState(this.metaData.epoch()).transform(context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
TaskCoreTest.class,
AuthTest.class,
MultiGraphsTest.class,
RamTableTest.class
RamTableTest.class,
RoleElectionStateMachineTest.class
})
public class CoreTestSuite {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package com.baidu.hugegraph.core;

import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.LockSupport;

import com.baidu.hugegraph.election.Config;
Expand Down Expand Up @@ -85,7 +85,7 @@ public String node() {

@Override
public int exceedsFailCount() {
return 10;
return 2;
}

@Override
Expand All @@ -102,13 +102,19 @@ public long heartBeatIntervalSecond() {
public int exceedsWorkerCount() {
return 5;
}

@Override
public long baseTimeoutMillisecond() {
return 100;
}
}

@Test
public void testStateMachine() throws InterruptedException {
final CountDownLatch stop = new CountDownLatch(3);
final int MAX_COUNT = 100;
final CountDownLatch stop = new CountDownLatch(4);
final int MAX_COUNT = 200;
final List<LogEntry> logRecords = Collections.synchronizedList(new ArrayList<>(MAX_COUNT));
final List<String> masterNodes = Collections.synchronizedList(new ArrayList<>(MAX_COUNT));
final StateMachineCallback callback = new StateMachineCallback() {

@Override
Expand All @@ -119,6 +125,8 @@ public void master(StateMachineContext context) {
if (logRecords.size() > MAX_COUNT) {
context.stateMachine().shutdown();
}
System.out.println("----master " + node);
masterNodes.add(node);
}

@Override
Expand Down Expand Up @@ -163,56 +171,90 @@ public void safe(StateMachineContext context) {

@Override
public void error(StateMachineContext context, Throwable e) {

System.out.println("----" + context.node() + " " + e.getMessage());
}
};

final List<MetaData> metaDataLogs = Collections.synchronizedList(new ArrayList<>(100));
final MetaDataAdapter adapter = new MetaDataAdapter() {
int epoch = 0;
int count = 0;
volatile int epoch = 0;
Map<Integer, MetaData> data = new ConcurrentHashMap<>();

MetaData copy(MetaData metaData) {
if (metaData == null) {
return null;
}
return new MetaData(metaData.node(), metaData.epoch(), metaData.count());
}
@Override
public boolean postDelyIfPresent(MetaData metaData, long delySecond) {
this.count ++;
LockSupport.parkNanos(delySecond * 1_000_000_000);
if (count > 10) {
throw new RuntimeException("timeout");
if (delySecond > 0) {
LockSupport.parkNanos(delySecond * 1_000_000_000);
}
if (metaData.epoch() < this.epoch) {
return false;
}
MetaData oldData = data.computeIfAbsent(metaData.epoch(), (key) -> {
this.epoch = Math.max(key, epoch);
return metaData;

MetaData copy = this.copy(metaData);
MetaData newData = data.compute(copy.epoch(), (key, value) -> {
if (copy.epoch() > this.epoch) {
this.epoch = copy.epoch();
Assert.assertNull(value);
metaDataLogs.add(copy);
System.out.println("----1" + copy);
return copy;
}

Assert.assertEquals(value.epoch(), copy.epoch());
if (Objects.equals(value.node(), copy.node()) &&
value.count() <= copy.count()) {
System.out.println("----2" + copy);
metaDataLogs.add(copy);
if (value.count() == copy.count()) {
Exception e = new Exception("eq");
e.printStackTrace();
}
return copy;
}
return value;

});
return oldData == metaData;
return Objects.equals(newData, copy);
}

@Override
public Optional<MetaData> queryDelay(long delySecond) {
LockSupport.parkNanos(delySecond * 1_000_000_000);
return Optional.ofNullable(this.data.get(this.epoch));
return Optional.ofNullable(this.copy(this.data.get(this.epoch)));
}

@Override
public Optional<MetaData> query() {
return Optional.ofNullable(this.data.get(this.epoch));
return Optional.ofNullable(this.copy(this.data.get(this.epoch)));
}
};

RoleElectionStateMachine[] machines = new RoleElectionStateMachine[4];
Thread node1 = new Thread(() -> {
Config config = new TestConfig("1");
RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter);
machines[1] = stateMachine;
stateMachine.apply(callback);
stop.countDown();
});

Thread node2 = new Thread(() -> {
Config config = new TestConfig("2");
RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter);
machines[2] = stateMachine;
stateMachine.apply(callback);
stop.countDown();
});

Thread node3 = new Thread(() -> {
Config config = new TestConfig("3");
RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter);
machines[3] = stateMachine;
stateMachine.apply(callback);
stop.countDown();
});
Expand All @@ -221,14 +263,36 @@ public Optional<MetaData> query() {
node2.start();
node3.start();

Thread randomShutdown = new Thread(() -> {
Set<String> dropNodes = new HashSet<>();
while (dropNodes.size() < 3) {
LockSupport.parkNanos(5_000_000_000L);
int size = masterNodes.size();
if (size < 1) {
continue;
}
String node = masterNodes.get(size - 1);
if (dropNodes.contains(node)) {
continue;
}
machines[Integer.parseInt(node)].shutdown();
dropNodes.add(node);
System.out.println("----shutdown machine " + node);
}
stop.countDown();
});

randomShutdown.start();
stop.await();

Assert.assertTrue(logRecords.size() > MAX_COUNT);
Assert.assertTrue(logRecords.size() > 0);
Map<Integer, String> masters = new HashMap<>();
for (LogEntry entry: logRecords) {
if (entry.role == LogEntry.Role.master) {
String lastNode = masters.putIfAbsent(entry.epoch, entry.node);
Assert.assertEquals(lastNode, entry.node);
if (lastNode != null) {
Assert.assertEquals(lastNode, entry.node);
}
}
}

Expand Down

0 comments on commit d366667

Please sign in to comment.