From 204d309e90b630928842239193af22f78c45c698 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Sun, 22 Sep 2024 17:20:35 +0300 Subject: [PATCH] Get rid of Address --- cluster-api/pom.xml | 8 +- .../java/io/scalecube/cluster/Cluster.java | 9 +- .../java/io/scalecube/cluster/Member.java | 11 +- .../cluster/membership/MembershipConfig.java | 9 +- cluster/pom.xml | 8 +- .../io/scalecube/cluster/ClusterImpl.java | 54 +-- .../fdetector/FailureDetectorImpl.java | 27 +- .../cluster/gossip/GossipProtocolImpl.java | 5 +- .../membership/MembershipProtocol.java | 5 +- .../membership/MembershipProtocolImpl.java | 54 +-- .../cluster/metadata/MetadataStoreImpl.java | 9 +- .../io/scalecube/cluster/ClusterTest.java | 102 +++--- .../fdetector/FailureDetectorTest.java | 313 ++++-------------- .../cluster/gossip/GossipDelayTest.java | 26 +- .../cluster/gossip/GossipProtocolTest.java | 15 +- .../cluster/gossip/GossipRequestTest.java | 3 +- .../membership/MembershipProtocolTest.java | 174 +++------- .../membership/MembershipRecordTest.java | 5 +- pom.xml | 13 +- 19 files changed, 290 insertions(+), 560 deletions(-) diff --git a/cluster-api/pom.xml b/cluster-api/pom.xml index c81d3a51..16ff3dcf 100644 --- a/cluster-api/pom.xml +++ b/cluster-api/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 @@ -12,10 +14,6 @@ ScaleCube/ClusterApi - - io.scalecube - scalecube-commons - ${project.groupId} scalecube-transport-api diff --git a/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java b/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java index c0df446b..6c949224 100644 --- a/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java +++ b/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java @@ -1,7 +1,6 @@ package io.scalecube.cluster; import io.scalecube.cluster.transport.api.Message; -import io.scalecube.net.Address; import java.util.Collection; import java.util.Optional; import reactor.core.publisher.Mono; @@ -10,11 +9,11 @@ public interface Cluster { /** - * Returns {@link Address} of this cluster instance. + * Returns address of this cluster instance. * * @return cluster address */ - Address address(); + String address(); /** * Spreads given message between cluster members using gossiping protocol. @@ -52,7 +51,7 @@ public interface Cluster { * * @return member by id */ - Optional member(String id); + Optional memberById(String id); /** * Returns cluster member by given address or null if no member with such address exists at joined @@ -60,7 +59,7 @@ public interface Cluster { * * @return member by address */ - Optional member(Address address); + Optional memberByAddress(String address); /** * Returns list of all members of the joined cluster. This will include all cluster members diff --git a/cluster-api/src/main/java/io/scalecube/cluster/Member.java b/cluster-api/src/main/java/io/scalecube/cluster/Member.java index 65dc7835..8963930f 100644 --- a/cluster-api/src/main/java/io/scalecube/cluster/Member.java +++ b/cluster-api/src/main/java/io/scalecube/cluster/Member.java @@ -1,7 +1,6 @@ package io.scalecube.cluster; import io.scalecube.cluster.membership.MembershipConfig; -import io.scalecube.net.Address; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; @@ -20,7 +19,7 @@ public final class Member implements Externalizable { private String id; private String alias; - private Address address; + private String address; private String namespace; public Member() {} @@ -33,7 +32,7 @@ public Member() {} * @param address member address; not null * @param namespace namespace; not null */ - public Member(String id, String alias, Address address, String namespace) { + public Member(String id, String alias, String address, String namespace) { this.id = Objects.requireNonNull(id, "member id"); this.alias = alias; // optional this.address = Objects.requireNonNull(address, "member address"); @@ -76,7 +75,7 @@ public String namespace() { * @see io.scalecube.cluster.transport.api.TransportConfig#port(int) * @return member address */ - public Address address() { + public String address() { return address; } @@ -110,7 +109,7 @@ public void writeExternal(ObjectOutput out) throws IOException { out.writeUTF(alias); } // address - out.writeUTF(address.toString()); + out.writeUTF(address); // namespace out.writeUTF(namespace); } @@ -125,7 +124,7 @@ public void readExternal(ObjectInput in) throws IOException { alias = in.readUTF(); } // address - address = Address.from(in.readUTF()); + address = in.readUTF(); // namespace this.namespace = in.readUTF(); } diff --git a/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java b/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java index 01b8114c..630210e3 100644 --- a/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java +++ b/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java @@ -1,6 +1,5 @@ package io.scalecube.cluster.membership; -import io.scalecube.net.Address; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -24,7 +23,7 @@ public final class MembershipConfig implements Cloneable { public static final int DEFAULT_LOCAL_SUSPICION_MULT = 3; public static final int DEFAULT_LOCAL_SYNC_INTERVAL = 15_000; - private List
seedMembers = Collections.emptyList(); + private List seedMembers = Collections.emptyList(); private int syncInterval = DEFAULT_SYNC_INTERVAL; private int syncTimeout = DEFAULT_SYNC_TIMEOUT; private int suspicionMult = DEFAULT_SUSPICION_MULT; @@ -67,7 +66,7 @@ public static MembershipConfig defaultLocalConfig() { .syncInterval(DEFAULT_LOCAL_SYNC_INTERVAL); } - public List
seedMembers() { + public List seedMembers() { return seedMembers; } @@ -77,7 +76,7 @@ public List
seedMembers() { * @param seedMembers seed members * @return new {@code MembershipConfig} instance */ - public MembershipConfig seedMembers(Address... seedMembers) { + public MembershipConfig seedMembers(String... seedMembers) { return seedMembers(Arrays.asList(seedMembers)); } @@ -87,7 +86,7 @@ public MembershipConfig seedMembers(Address... seedMembers) { * @param seedMembers seed members * @return new {@code MembershipConfig} instance */ - public MembershipConfig seedMembers(List
seedMembers) { + public MembershipConfig seedMembers(List seedMembers) { MembershipConfig m = clone(); m.seedMembers = Collections.unmodifiableList(new ArrayList<>(seedMembers)); return m; diff --git a/cluster/pom.xml b/cluster/pom.xml index 1fd9ae60..8b9287ed 100644 --- a/cluster/pom.xml +++ b/cluster/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 @@ -12,10 +14,6 @@ ScaleCube/Cluster - - io.scalecube - scalecube-commons - ${project.groupId} scalecube-transport-api diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java index ff8a67fb..27309586 100644 --- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java @@ -1,6 +1,8 @@ package io.scalecube.cluster; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; +import static io.scalecube.cluster.transport.api.Transport.parseHost; +import static io.scalecube.cluster.transport.api.Transport.parsePort; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.scalecube.cluster.fdetector.FailureDetectorConfig; import io.scalecube.cluster.fdetector.FailureDetectorImpl; @@ -16,9 +18,9 @@ import io.scalecube.cluster.transport.api.Transport; import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.cluster.transport.api.TransportFactory; -import io.scalecube.net.Address; import java.io.Serializable; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.Objects; @@ -111,14 +113,14 @@ private void initLifecycle() { start .asMono() .then(doStart()) - .doOnSuccess(avoid -> onStart.emitEmpty(RETRY_NON_SERIALIZED)) - .doOnError(th -> onStart.emitError(th, RETRY_NON_SERIALIZED)) + .doOnSuccess(avoid -> onStart.emitEmpty(busyLooping(Duration.ofSeconds(3)))) + .doOnError(th -> onStart.emitError(th, busyLooping(Duration.ofSeconds(3)))) .subscribe(null, th -> LOGGER.error("[{}][doStart] Exception occurred:", localMember, th)); shutdown .asMono() .then(doShutdown()) - .doFinally(s -> onShutdown.emitEmpty(RETRY_NON_SERIALIZED)) + .doFinally(s -> onShutdown.emitEmpty(busyLooping(Duration.ofSeconds(3)))) .subscribe( null, th -> @@ -224,7 +226,7 @@ public ClusterImpl handler(Function handler) { public Mono start() { return Mono.defer( () -> { - start.emitEmpty(RETRY_NON_SERIALIZED); + start.emitEmpty(busyLooping(Duration.ofSeconds(3))); return onStart.asMono().thenReturn(this); }); } @@ -241,10 +243,10 @@ private Mono doStart0() { return Transport.bind(config.transportConfig()) .flatMap( boundTransport -> { - localMember = createLocalMember(Address.from(boundTransport.address())); + localMember = createLocalMember(boundTransport.address()); transport = new SenderAwareTransport(boundTransport, localMember.address()); - scheduler = Schedulers.newSingle("sc-cluster-" + localMember.address().port(), true); + scheduler = Schedulers.newSingle("sc-cluster-" + localMember.address(), true); failureDetector = new FailureDetectorImpl( @@ -283,9 +285,10 @@ private Mono doStart0() { /*.publishOn(scheduler)*/ // Dont uncomment, already beign executed inside scalecube-cluster thread .subscribe( - event -> membershipSink.emitNext(event, RETRY_NON_SERIALIZED), + event -> + membershipSink.emitNext(event, busyLooping(Duration.ofSeconds(3))), ex -> LOGGER.error("[{}][membership][error] cause:", localMember, ex), - () -> membershipSink.emitComplete(RETRY_NON_SERIALIZED))); + () -> membershipSink.emitComplete(busyLooping(Duration.ofSeconds(3))))); return Mono.fromRunnable(() -> failureDetector.start()) .then(Mono.fromRunnable(() -> gossip.start())) @@ -371,14 +374,13 @@ private Flux listenMembership() { * @param address transport address * @return local cluster member with cluster address and cluster member id */ - private Member createLocalMember(Address address) { - int port = Optional.ofNullable(config.externalPort()).orElse(address.port()); + private Member createLocalMember(String address) { + final String finalHost = + Optional.ofNullable(config.externalHost()).orElseGet(() -> parseHost(address)); + final int finalPort = + Optional.ofNullable(config.externalPort()).orElseGet(() -> parsePort(address)); - // calculate local member cluster address - Address memberAddress = - Optional.ofNullable(config.externalHost()) - .map(host -> Address.create(host, port)) - .orElseGet(() -> Address.create(address.host(), port)); + final String memberAddress = finalHost + ":" + finalPort; return new Member( config.memberId() != null ? config.memberId() : UUID.randomUUID().toString(), @@ -388,7 +390,7 @@ private Member createLocalMember(Address address) { } @Override - public Address address() { + public String address() { return member().address(); } @@ -431,13 +433,13 @@ public Member member() { } @Override - public Optional member(String id) { - return membership.member(id); + public Optional memberById(String id) { + return membership.memberById(id); } @Override - public Optional member(Address address) { - return membership.member(address); + public Optional memberByAddress(String address) { + return membership.memberByAddress(address); } @Override @@ -449,7 +451,7 @@ public Mono updateMetadata(T metadata) { @Override public void shutdown() { - shutdown.emitEmpty(RETRY_NON_SERIALIZED); + shutdown.emitEmpty(busyLooping(Duration.ofSeconds(3))); } private Mono doShutdown() { @@ -498,9 +500,9 @@ public Mono onShutdown() { private static class SenderAwareTransport implements Transport { private final Transport transport; - private final Address address; + private final String address; - private SenderAwareTransport(Transport transport, Address address) { + private SenderAwareTransport(Transport transport, String address) { this.transport = Objects.requireNonNull(transport); this.address = Objects.requireNonNull(address); } @@ -541,7 +543,7 @@ public Flux listen() { } private Message enhanceWithSender(Message message) { - return Message.with(message).sender(address.toString()).build(); + return Message.with(message).sender(address).build(); } } } diff --git a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java index 40860a89..274f5a08 100644 --- a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java @@ -1,6 +1,6 @@ package io.scalecube.cluster.fdetector; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.scalecube.cluster.Member; import io.scalecube.cluster.fdetector.PingData.AckType; @@ -8,7 +8,6 @@ import io.scalecube.cluster.membership.MembershipEvent; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.net.Address; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -117,7 +116,7 @@ public void stop() { actionsDisposables.dispose(); // Stop publishing events - sink.emitComplete(RETRY_NON_SERIALIZED); + sink.emitComplete(busyLooping(Duration.ofSeconds(3))); } @Override @@ -145,9 +144,9 @@ private void doPing() { Message pingMsg = Message.withData(pingData).qualifier(PING).correlationId(cid).build(); LOGGER.debug("[{}][{}] Send Ping to {}", localMember, period, pingMember); - Address address = pingMember.address(); + String address = pingMember.address(); transport - .requestResponse(address.toString(), pingMsg) + .requestResponse(address, pingMsg) .timeout(Duration.ofMillis(config.pingTimeout()), scheduler) .publishOn(scheduler) .subscribe( @@ -190,7 +189,7 @@ private void doPingReq( pingReqMembers.forEach( member -> transport - .requestResponse(member.address().toString(), pingReqMsg) + .requestResponse(member.address(), pingReqMsg) .timeout(timeout, scheduler) .publishOn(scheduler) .subscribe( @@ -232,7 +231,7 @@ private void onMessage(Message message) { /** Listens to PING message and answers with ACK. */ private void onPing(Message message) { long period = this.currentPeriod; - Address sender = Address.from(message.sender()); + String sender = message.sender(); LOGGER.debug("[{}][{}] Received Ping from {}", localMember, period, sender); PingData data = message.data(); data = data.withAckType(AckType.DEST_OK); @@ -249,10 +248,10 @@ private void onPing(Message message) { String correlationId = message.correlationId(); Message ackMessage = Message.withData(data).qualifier(PING_ACK).correlationId(correlationId).build(); - Address address = data.getFrom().address(); + String address = data.getFrom().address(); LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, address); transport - .send(address.toString(), ackMessage) + .send(address, ackMessage) .subscribe( null, ex -> @@ -275,10 +274,10 @@ private void onPingReq(Message message) { PingData pingReqData = new PingData(localMember, target, originalIssuer); Message pingMessage = Message.withData(pingReqData).qualifier(PING).correlationId(correlationId).build(); - Address address = target.address(); + String address = target.address(); LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, address); transport - .send(address.toString(), pingMessage) + .send(address, pingMessage) .subscribe( null, ex -> @@ -305,10 +304,10 @@ private void onTransitPingAck(Message message) { PingData originalAckData = new PingData(target, data.getTo()).withAckType(ackType); Message originalAckMessage = Message.withData(originalAckData).qualifier(PING_ACK).correlationId(correlationId).build(); - Address address = target.address(); + String address = target.address(); LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, address); transport - .send(address.toString(), originalAckMessage) + .send(address, originalAckMessage) .subscribe( null, ex -> @@ -378,7 +377,7 @@ private List selectPingReqMembers(Member pingMember) { private void publishPingResult(long period, Member member, MemberStatus status) { LOGGER.debug("[{}][{}] Member {} detected as {}", localMember, period, member, status); - sink.emitNext(new FailureDetectorEvent(member, status), RETRY_NON_SERIALIZED); + sink.emitNext(new FailureDetectorEvent(member, status), busyLooping(Duration.ofSeconds(3))); } private MemberStatus computeMemberStatus(Message message, long period) { diff --git a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java index 6e76bd5c..3657c06b 100644 --- a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java @@ -7,7 +7,6 @@ import io.scalecube.cluster.membership.MembershipEvent; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.net.Address; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -288,14 +287,14 @@ private void spreadGossipsTo(long period, Member member) { } // Send gossip request - Address address = member.address(); + String address = member.address(); gossips.stream() .map(this::buildGossipRequestMessage) .forEach( message -> transport - .send(address.toString(), message) + .send(address, message) .subscribe( null, ex -> diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocol.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocol.java index 08907e21..d127a097 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocol.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocol.java @@ -1,7 +1,6 @@ package io.scalecube.cluster.membership; import io.scalecube.cluster.Member; -import io.scalecube.net.Address; import java.util.Collection; import java.util.Optional; import reactor.core.publisher.Flux; @@ -53,7 +52,7 @@ public interface MembershipProtocol { * * @return member by id */ - Optional member(String id); + Optional memberById(String id); /** * Returns cluster member by given address or null if no member with such address exists at joined @@ -61,5 +60,5 @@ public interface MembershipProtocol { * * @return member by address */ - Optional member(Address address); + Optional memberByAddress(String address); } diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java index ad3240d5..5c73af81 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java @@ -3,6 +3,7 @@ import static io.scalecube.cluster.membership.MemberStatus.ALIVE; import static io.scalecube.cluster.membership.MemberStatus.DEAD; import static io.scalecube.cluster.membership.MemberStatus.LEAVING; +import static io.scalecube.cluster.transport.api.Transport.parsePort; import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.scalecube.cluster.ClusterConfig; @@ -15,8 +16,8 @@ import io.scalecube.cluster.metadata.MetadataStore; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.net.Address; import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.file.Path; import java.nio.file.Paths; @@ -74,7 +75,7 @@ private enum MembershipUpdateReason { private final Transport transport; private final MembershipConfig membershipConfig; private final FailureDetectorConfig failureDetectorConfig; - private final List
seedMembers; + private final List seedMembers; private final FailureDetector failureDetector; private final GossipProtocol gossipProtocol; private final MetadataStore metadataStore; @@ -160,18 +161,18 @@ public MembershipProtocolImpl( } // Remove duplicates and local address(es) - private List
cleanUpSeedMembers(Collection
seedMembers) { - InetAddress localIpAddress = Address.getLocalIpAddress(); + private List cleanUpSeedMembers(Collection seedMembers) { + InetAddress localIpAddress = getLocalIpAddress(); String hostAddress = localIpAddress.getHostAddress(); String hostName = localIpAddress.getHostName(); - Address memberAddr = localMember.address(); - Address transportAddr = Address.from(transport.address()); - Address memberAddrByHostAddress = Address.create(hostAddress, memberAddr.port()); - Address transportAddrByHostAddress = Address.create(hostAddress, transportAddr.port()); - Address memberAddByHostName = Address.create(hostName, memberAddr.port()); - Address transportAddrByHostName = Address.create(hostName, transportAddr.port()); + String memberAddr = localMember.address(); + String transportAddr = transport.address(); + String memberAddrByHostAddress = hostAddress + ":" + parsePort(memberAddr); + String transportAddrByHostAddress = hostAddress + ":" + parsePort(transportAddr); + String memberAddByHostName = hostName + ":" + parsePort(memberAddr); + String transportAddrByHostName = hostName + ":" + parsePort(transportAddr); return new LinkedHashSet<>(seedMembers) .stream() @@ -184,7 +185,15 @@ private List
cleanUpSeedMembers(Collection
seedMembers) { .collect(Collectors.toList()); } - private boolean checkAddressesNotEqual(Address address0, Address address1) { + private static InetAddress getLocalIpAddress() { + try { + return InetAddress.getLocalHost(); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + + private boolean checkAddressesNotEqual(String address0, String address1) { if (!address0.equals(address1)) { return true; } else { @@ -256,8 +265,7 @@ private void start0(MonoSink sink) { address -> transport .requestResponse( - address.toString(), - prepareSyncDataMsg(SYNC, UUID.randomUUID().toString())) + address, prepareSyncDataMsg(SYNC, UUID.randomUUID().toString())) .doOnError( ex -> LOGGER.warn( @@ -321,18 +329,18 @@ public Member member() { } @Override - public Optional member(String id) { + public Optional memberById(String id) { return Optional.ofNullable(members.get(id)); } @Override - public Optional member(Address address) { + public Optional memberByAddress(String address) { return new ArrayList<>(members.values()) .stream().filter(member -> member.address().equals(address)).findFirst(); } private void doSync() { - Address address = selectSyncAddress().orElse(null); + String address = selectSyncAddress().orElse(null); if (address == null) { return; } @@ -340,7 +348,7 @@ private void doSync() { Message message = prepareSyncDataMsg(SYNC, null); LOGGER.debug("[{}][doSync] Send Sync to {}", localMember, address); transport - .send(address.toString(), message) + .send(address, message) .subscribe( null, ex -> @@ -391,14 +399,14 @@ private Mono onSyncAck(Message syncAckMsg, boolean onStart) { private Mono onSync(Message syncMsg) { return Mono.defer( () -> { - final Address sender = Address.from(syncMsg.sender()); + final String sender = syncMsg.sender(); LOGGER.debug("[{}] Received Sync from {}", localMember, sender); return syncMembership(syncMsg.data(), false) .doOnSuccess( avoid -> { Message message = prepareSyncDataMsg(SYNC_ACK, syncMsg.correlationId()); transport - .send(sender.toString(), message) + .send(sender, message) .subscribe( null, ex -> @@ -426,9 +434,9 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) { // Alive won't override SUSPECT so issue instead extra sync with member to force it spread // alive with inc + 1 Message syncMsg = prepareSyncDataMsg(SYNC, null); - Address address = fdEvent.member().address(); + String address = fdEvent.member().address(); transport - .send(address.toString(), syncMsg) + .send(address, syncMsg) .subscribe( null, ex -> @@ -465,8 +473,8 @@ private void onMembershipGossip(Message message) { } } - private Optional
selectSyncAddress() { - List
addresses = + private Optional selectSyncAddress() { + List addresses = Stream.concat(seedMembers.stream(), otherMembers().stream().map(Member::address)) .collect(Collectors.collectingAndThen(Collectors.toSet(), ArrayList::new)); Collections.shuffle(addresses); diff --git a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java index e7d0ad9a..f86680cf 100644 --- a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java @@ -4,7 +4,6 @@ import io.scalecube.cluster.Member; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.net.Address; import java.nio.ByteBuffer; import java.time.Duration; import java.util.HashMap; @@ -148,7 +147,7 @@ public Mono fetchMetadata(Member member) { return Mono.defer( () -> { final String cid = UUID.randomUUID().toString(); - final Address targetAddress = member.address(); + final String targetAddress = member.address(); LOGGER.debug("[{}][{}] Getting metadata for member {}", localMember, cid, member); @@ -160,7 +159,7 @@ public Mono fetchMetadata(Member member) { .build(); return transport - .requestResponse(targetAddress.toString(), request) + .requestResponse(targetAddress, request) .timeout(Duration.ofMillis(config.metadataTimeout()), scheduler) .publishOn(scheduler) .doOnSuccess( @@ -196,7 +195,7 @@ private void onMessage(Message message) { } private void onMetadataRequest(Message message) { - final Address sender = Address.from(message.sender()); + final String sender = message.sender(); LOGGER.debug("[{}] Received GetMetadataReq from {}", localMember, sender); GetMetadataRequest reqData = message.data(); @@ -225,7 +224,7 @@ private void onMetadataRequest(Message message) { LOGGER.debug("[{}] Send GetMetadataResp to {}", localMember, sender); transport - .send(sender.toString(), response) + .send(sender, response) .subscribe( null, ex -> diff --git a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java index dd104ade..f8ac8eda 100644 --- a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java @@ -1,5 +1,6 @@ package io.scalecube.cluster; +import static io.scalecube.cluster.transport.api.Transport.parsePort; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -8,9 +9,9 @@ import io.scalecube.cluster.membership.MembershipEvent; import io.scalecube.cluster.membership.MembershipEvent.Type; import io.scalecube.cluster.metadata.MetadataCodec; -import io.scalecube.net.Address; import io.scalecube.transport.netty.tcp.TcpTransportFactory; import java.net.InetAddress; +import java.net.UnknownHostException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -37,7 +38,7 @@ public class ClusterTest extends BaseTest { @Test public void testStartStopRepeatedly() throws Exception { - Address address = Address.from("localhost:4848"); + String address = "localhost:4848"; // Start seed node Cluster seedNode = @@ -45,7 +46,7 @@ public void testStartStopRepeatedly() throws Exception { .gossip(opts -> opts.gossipInterval(100)) .failureDetector(opts -> opts.pingInterval(100)) .membership(opts -> opts.syncInterval(100)) - .transport(opts -> opts.port(address.port())) + .transport(opts -> opts.port(parsePort(address))) .transport(opts -> opts.connectTimeout(CONNECT_TIMEOUT)) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -72,7 +73,7 @@ public void testStartStopRepeatedly() throws Exception { .gossip(opts -> opts.gossipInterval(100)) .failureDetector(opts -> opts.pingInterval(100)) .membership(opts -> opts.syncInterval(100)) - .transport(opts -> opts.port(address.port())) + .transport(opts -> opts.port(parsePort(address))) .transport(opts -> opts.connectTimeout(CONNECT_TIMEOUT)) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -101,8 +102,8 @@ public void testMembersAccessFromScheduler() { // Members by address - Optional otherNodeOnSeedNode = seedNode.member(otherNode.address()); - Optional seedNodeOnOtherNode = otherNode.member(seedNode.address()); + Optional otherNodeOnSeedNode = seedNode.memberByAddress(otherNode.address()); + Optional seedNodeOnOtherNode = otherNode.memberByAddress(seedNode.address()); assertEquals(otherNode.member(), otherNodeOnSeedNode.orElse(null)); assertEquals(seedNode.member(), seedNodeOnOtherNode.orElse(null)); @@ -112,15 +113,11 @@ public void testMembersAccessFromScheduler() { @Test public void testJoinLocalhostIgnored() throws InterruptedException { - InetAddress localIpAddress = Address.getLocalIpAddress(); - Address localAddressByHostname = Address.create(localIpAddress.getHostName(), 4801); - Address localAddressByIp = Address.create(localIpAddress.getHostAddress(), 4801); - Address[] addresses = { - Address.from("localhost:4801"), - Address.from("127.0.0.1:4801"), - Address.from("127.0.1.1:4801"), - localAddressByHostname, - localAddressByIp + InetAddress localIpAddress = getLocalIpAddress(); + String localAddressByHostname = localIpAddress.getHostName() + ":" + 4801; + String localAddressByIp = localIpAddress.getHostAddress() + ":" + 4801; + String[] addresses = { + "localhost:4801", "127.0.0.1:4801", "127.0.1.1:4801", localAddressByHostname, localAddressByIp }; // Start seed node @@ -141,15 +138,11 @@ public void testJoinLocalhostIgnored() throws InterruptedException { @Test public void testJoinLocalhostIgnoredWithOverride() throws InterruptedException { - InetAddress localIpAddress = Address.getLocalIpAddress(); - Address localAddressByHostname = Address.create(localIpAddress.getHostName(), 7878); - Address localAddressByIp = Address.create(localIpAddress.getHostAddress(), 7878); - Address[] addresses = { - Address.from("localhost:7878"), - Address.from("127.0.0.1:7878"), - Address.from("127.0.1.1:7878"), - localAddressByHostname, - localAddressByIp + InetAddress localIpAddress = getLocalIpAddress(); + String localAddressByHostname = localIpAddress.getHostName() + ":" + 7878; + String localAddressByIp = localIpAddress.getHostAddress() + ":" + 7878; + String[] addresses = { + "localhost:7878", "127.0.0.1:7878", "127.0.1.1:7878", localAddressByHostname, localAddressByIp }; // Start seed node @@ -242,7 +235,7 @@ public void onMembershipEvent(MembershipEvent event) { // Check all test members know valid metadata for (Cluster node : otherNodes) { - Optional memberOptional = node.member(metadataNode.member().id()); + Optional memberOptional = node.memberById(metadataNode.member().id()); assertTrue(memberOptional.isPresent()); Member member = memberOptional.get(); assertEquals(metadata, node.metadata(member).orElse(null)); @@ -254,7 +247,7 @@ public void onMembershipEvent(MembershipEvent event) { // Check all nodes had updated metadata member for (Cluster node : otherNodes) { - Optional memberOptional = node.member(metadataNode.member().id()); + Optional memberOptional = node.memberById(metadataNode.member().id()); assertTrue(memberOptional.isPresent()); Member member = memberOptional.get(); assertEquals(updatedMetadata, node.metadata(member).orElse(null)); @@ -315,7 +308,7 @@ public void onMembershipEvent(MembershipEvent event) { // Check all test members know valid metadata for (Cluster node : otherNodes) { - Optional memberOptional = node.member(metadataNode.member().id()); + Optional memberOptional = node.memberById(metadataNode.member().id()); assertTrue(memberOptional.isPresent()); Member member = memberOptional.get(); assertEquals(metadata, node.metadata(member).orElse(null)); @@ -328,7 +321,7 @@ public void onMembershipEvent(MembershipEvent event) { // Check all nodes had updated metadata member for (Cluster node : otherNodes) { - Optional memberOptional = node.member(metadataNode.member().id()); + Optional memberOptional = node.memberById(metadataNode.member().id()); assertTrue(memberOptional.isPresent()); Member member = memberOptional.get(); //noinspection unchecked,OptionalGetWithoutIsPresent @@ -393,7 +386,7 @@ public void onMembershipEvent(MembershipEvent event) { // Check all test members know valid metadata for (Cluster node : otherNodes) { - Optional memberOptional = node.member(metadataNode.member().id()); + Optional memberOptional = node.memberById(metadataNode.member().id()); assertTrue(memberOptional.isPresent()); Member member = memberOptional.get(); assertEquals(metadata, node.metadata(member).orElse(null)); @@ -407,7 +400,7 @@ public void onMembershipEvent(MembershipEvent event) { // Check all nodes had updated metadata member for (Cluster node : otherNodes) { - Optional memberOptional = node.member(metadataNode.member().id()); + Optional memberOptional = node.memberById(metadataNode.member().id()); assertTrue(memberOptional.isPresent()); Member member = memberOptional.get(); //noinspection unchecked,OptionalGetWithoutIsPresent @@ -559,9 +552,9 @@ public void testJoinSeedClusterWithNoExistingSeedMember() { // Start seed node Cluster seedNode = new ClusterImpl().transportFactory(TcpTransportFactory::new).startAwait(); - Address nonExistingSeed1 = Address.from("localhost:1234"); - Address nonExistingSeed2 = Address.from("localhost:5678"); - Address[] seeds = new Address[] {nonExistingSeed1, nonExistingSeed2, seedNode.address()}; + String nonExistingSeed1 = "localhost:1234"; + String nonExistingSeed2 = "localhost:5678"; + String[] seeds = new String[] {nonExistingSeed1, nonExistingSeed2, seedNode.address()}; Cluster otherNode = new ClusterImpl() @@ -575,29 +568,15 @@ public void testJoinSeedClusterWithNoExistingSeedMember() { shutdown(Arrays.asList(seedNode, otherNode)); } - private void shutdown(List nodes) { - try { - Mono.whenDelayError( - nodes.stream() - .peek(Cluster::shutdown) - .map(Cluster::onShutdown) - .collect(Collectors.toList())) - .block(TIMEOUT); - } catch (Exception ex) { - LOGGER.error("Exception on cluster shutdown", ex); - } - } - @Test public void testExplicitLocalMemberId() { - ClusterConfig config = ClusterConfig.defaultConfig() - .memberId("test-member"); + ClusterConfig config = ClusterConfig.defaultConfig().memberId("test-member"); ClusterImpl cluster = null; try { - cluster = (ClusterImpl) new ClusterImpl(config) - .transportFactory(TcpTransportFactory::new) - .startAwait(); + cluster = + (ClusterImpl) + new ClusterImpl(config).transportFactory(TcpTransportFactory::new).startAwait(); assertEquals("test-member", cluster.member().id()); } finally { @@ -606,4 +585,25 @@ public void testExplicitLocalMemberId() { } } } + + private static InetAddress getLocalIpAddress() { + try { + return InetAddress.getLocalHost(); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + + private void shutdown(List nodes) { + try { + Mono.whenDelayError( + nodes.stream() + .peek(Cluster::shutdown) + .map(Cluster::onShutdown) + .collect(Collectors.toList())) + .block(TIMEOUT); + } catch (Exception ex) { + LOGGER.error("Exception on cluster shutdown", ex); + } + } } diff --git a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java index 0ca750b1..bcb89765 100644 --- a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java @@ -2,6 +2,7 @@ import static io.scalecube.cluster.membership.MemberStatus.ALIVE; import static io.scalecube.cluster.membership.MemberStatus.SUSPECT; +import static io.scalecube.cluster.transport.api.Transport.parsePort; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -12,7 +13,6 @@ import io.scalecube.cluster.transport.api.Transport; import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.cluster.utils.NetworkEmulatorTransport; -import io.scalecube.net.Address; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -53,9 +53,7 @@ public void testTrusted() { Transport a = createTransport(); Transport b = createTransport(); Transport c = createTransport(); - List
members = - Arrays.asList( - Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); + List members = Arrays.asList(a.address(), b.address(), c.address()); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -70,24 +68,9 @@ public void testTrusted() { Future> listB = listenNextEventFor(fdB, members); Future> listC = listenNextEventFor(fdC, members); - assertStatus( - Address.from(a.address()), - ALIVE, - awaitEvents(listA), - Address.from(b.address()), - Address.from(c.address())); - assertStatus( - Address.from(b.address()), - ALIVE, - awaitEvents(listB), - Address.from(a.address()), - Address.from(c.address())); - assertStatus( - Address.from(c.address()), - ALIVE, - awaitEvents(listC), - Address.from(a.address()), - Address.from(b.address())); + assertStatus(a.address(), ALIVE, awaitEvents(listA), b.address(), c.address()); + assertStatus(b.address(), ALIVE, awaitEvents(listB), a.address(), c.address()); + assertStatus(c.address(), ALIVE, awaitEvents(listC), a.address(), b.address()); } finally { stop(fdetectors); } @@ -99,9 +82,7 @@ public void testSuspected() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
members = - Arrays.asList( - Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); + List members = Arrays.asList(a.address(), b.address(), c.address()); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -110,12 +91,9 @@ public void testSuspected() { List fdetectors = Arrays.asList(fdA, fdB, fdC); // block all traffic - a.networkEmulator() - .blockOutbound(members.stream().map(Address::toString).collect(Collectors.toList())); - b.networkEmulator() - .blockOutbound(members.stream().map(Address::toString).collect(Collectors.toList())); - c.networkEmulator() - .blockOutbound(members.stream().map(Address::toString).collect(Collectors.toList())); + a.networkEmulator().blockOutbound(members); + b.networkEmulator().blockOutbound(members); + c.networkEmulator().blockOutbound(members); try { start(fdetectors); @@ -124,24 +102,9 @@ public void testSuspected() { Future> listB = listenNextEventFor(fdB, members); Future> listC = listenNextEventFor(fdC, members); - assertStatus( - Address.from(a.address()), - SUSPECT, - awaitEvents(listA), - Address.from(b.address()), - Address.from(c.address())); - assertStatus( - Address.from(b.address()), - SUSPECT, - awaitEvents(listB), - Address.from(a.address()), - Address.from(c.address())); - assertStatus( - Address.from(c.address()), - SUSPECT, - awaitEvents(listC), - Address.from(a.address()), - Address.from(b.address())); + assertStatus(a.address(), SUSPECT, awaitEvents(listA), b.address(), c.address()); + assertStatus(b.address(), SUSPECT, awaitEvents(listB), a.address(), c.address()); + assertStatus(c.address(), SUSPECT, awaitEvents(listC), a.address(), b.address()); } finally { a.networkEmulator().unblockAllOutbound(); b.networkEmulator().unblockAllOutbound(); @@ -156,9 +119,7 @@ public void testTrustedDespiteBadNetwork() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
members = - Arrays.asList( - Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); + List members = Arrays.asList(a.address(), b.address(), c.address()); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -176,24 +137,9 @@ public void testTrustedDespiteBadNetwork() { try { start(fdetectors); - assertStatus( - Address.from(a.address()), - ALIVE, - awaitEvents(listA), - Address.from(b.address()), - Address.from(c.address())); - assertStatus( - Address.from(b.address()), - ALIVE, - awaitEvents(listB), - Address.from(a.address()), - Address.from(c.address())); - assertStatus( - Address.from(c.address()), - ALIVE, - awaitEvents(listC), - Address.from(a.address()), - Address.from(b.address())); + assertStatus(a.address(), ALIVE, awaitEvents(listA), b.address(), c.address()); + assertStatus(b.address(), ALIVE, awaitEvents(listB), a.address(), c.address()); + assertStatus(c.address(), ALIVE, awaitEvents(listC), a.address(), b.address()); } finally { stop(fdetectors); } @@ -205,9 +151,7 @@ public void testTrustedDespiteDifferentPingTimings() { Transport a = createTransport(); Transport b = createTransport(); Transport c = createTransport(); - List
members = - Arrays.asList( - Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); + List members = Arrays.asList(a.address(), b.address(), c.address()); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -224,24 +168,9 @@ public void testTrustedDespiteDifferentPingTimings() { Future> listB = listenNextEventFor(fdB, members); Future> listC = listenNextEventFor(fdC, members); - assertStatus( - Address.from(a.address()), - ALIVE, - awaitEvents(listA), - Address.from(b.address()), - Address.from(c.address())); - assertStatus( - Address.from(b.address()), - ALIVE, - awaitEvents(listB), - Address.from(a.address()), - Address.from(c.address())); - assertStatus( - Address.from(c.address()), - ALIVE, - awaitEvents(listC), - Address.from(a.address()), - Address.from(b.address())); + assertStatus(a.address(), ALIVE, awaitEvents(listA), b.address(), c.address()); + assertStatus(b.address(), ALIVE, awaitEvents(listB), a.address(), c.address()); + assertStatus(c.address(), ALIVE, awaitEvents(listC), a.address(), b.address()); } finally { stop(fdetectors); } @@ -254,12 +183,7 @@ public void testSuspectedMemberWithBadNetworkGetsPartitioned() throws Exception NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); NetworkEmulatorTransport d = createTransport(); - List
members = - Arrays.asList( - Address.from(a.address()), - Address.from(b.address()), - Address.from(c.address()), - Address.from(d.address())); + List members = Arrays.asList(a.address(), b.address(), c.address(), d.address()); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -269,8 +193,7 @@ public void testSuspectedMemberWithBadNetworkGetsPartitioned() throws Exception List fdetectors = Arrays.asList(fdA, fdB, fdC, fdD); // Block traffic on member A to all cluster members - a.networkEmulator() - .blockOutbound(members.stream().map(Address::toString).collect(Collectors.toList())); + a.networkEmulator().blockOutbound(members); try { final Future> listA = listenNextEventFor(fdA, members); @@ -281,19 +204,16 @@ public void testSuspectedMemberWithBadNetworkGetsPartitioned() throws Exception start(fdetectors); assertStatus( - Address.from(a.address()), + a.address(), SUSPECT, awaitEvents(listA), - Address.from(b.address()), - Address.from(c.address()), - Address.from(d.address())); // node A + b.address(), + c.address(), + d.address()); // node A // partitioned - assertStatus( - Address.from(b.address()), SUSPECT, awaitEvents(listB), Address.from(a.address())); - assertStatus( - Address.from(c.address()), SUSPECT, awaitEvents(listC), Address.from(a.address())); - assertStatus( - Address.from(d.address()), SUSPECT, awaitEvents(listD), Address.from(a.address())); + assertStatus(b.address(), SUSPECT, awaitEvents(listB), a.address()); + assertStatus(c.address(), SUSPECT, awaitEvents(listC), a.address()); + assertStatus(d.address(), SUSPECT, awaitEvents(listD), a.address()); // Unblock traffic on member A a.networkEmulator().unblockAllOutbound(); @@ -306,34 +226,10 @@ public void testSuspectedMemberWithBadNetworkGetsPartitioned() throws Exception // Check member A recovers - assertStatus( - Address.from(a.address()), - ALIVE, - awaitEvents(listA0), - Address.from(b.address()), - Address.from(c.address()), - Address.from(d.address())); - assertStatus( - Address.from(b.address()), - ALIVE, - awaitEvents(listB0), - Address.from(a.address()), - Address.from(c.address()), - Address.from(d.address())); - assertStatus( - Address.from(c.address()), - ALIVE, - awaitEvents(listC0), - Address.from(a.address()), - Address.from(b.address()), - Address.from(d.address())); - assertStatus( - Address.from(d.address()), - ALIVE, - awaitEvents(listD0), - Address.from(a.address()), - Address.from(b.address()), - Address.from(c.address())); + assertStatus(a.address(), ALIVE, awaitEvents(listA0), b.address(), c.address(), d.address()); + assertStatus(b.address(), ALIVE, awaitEvents(listB0), a.address(), c.address(), d.address()); + assertStatus(c.address(), ALIVE, awaitEvents(listC0), a.address(), b.address(), d.address()); + assertStatus(d.address(), ALIVE, awaitEvents(listD0), a.address(), b.address(), c.address()); } finally { stop(fdetectors); } @@ -346,12 +242,7 @@ public void testSuspectedMemberWithNormalNetworkGetsPartitioned() throws Excepti NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); NetworkEmulatorTransport d = createTransport(); - List
members = - Arrays.asList( - Address.from(a.address()), - Address.from(b.address()), - Address.from(c.address()), - Address.from(d.address())); + List members = Arrays.asList(a.address(), b.address(), c.address(), d.address()); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -373,19 +264,16 @@ public void testSuspectedMemberWithNormalNetworkGetsPartitioned() throws Excepti start(fdetectors); + assertStatus(a.address(), SUSPECT, awaitEvents(listA), d.address()); + assertStatus(b.address(), SUSPECT, awaitEvents(listB), d.address()); + assertStatus(c.address(), SUSPECT, awaitEvents(listC), d.address()); assertStatus( - Address.from(a.address()), SUSPECT, awaitEvents(listA), Address.from(d.address())); - assertStatus( - Address.from(b.address()), SUSPECT, awaitEvents(listB), Address.from(d.address())); - assertStatus( - Address.from(c.address()), SUSPECT, awaitEvents(listC), Address.from(d.address())); - assertStatus( - Address.from(d.address()), + d.address(), SUSPECT, awaitEvents(listD), - Address.from(a.address()), - Address.from(b.address()), - Address.from(c.address())); // node D + a.address(), + b.address(), + c.address()); // node D // partitioned // Unblock traffic to member D on other members @@ -401,34 +289,10 @@ public void testSuspectedMemberWithNormalNetworkGetsPartitioned() throws Excepti // Check member D recovers - assertStatus( - Address.from(a.address()), - ALIVE, - awaitEvents(listA0), - Address.from(b.address()), - Address.from(c.address()), - Address.from(d.address())); - assertStatus( - Address.from(b.address()), - ALIVE, - awaitEvents(listB0), - Address.from(a.address()), - Address.from(c.address()), - Address.from(d.address())); - assertStatus( - Address.from(c.address()), - ALIVE, - awaitEvents(listC0), - Address.from(a.address()), - Address.from(b.address()), - Address.from(d.address())); - assertStatus( - Address.from(d.address()), - ALIVE, - awaitEvents(listD0), - Address.from(a.address()), - Address.from(b.address()), - Address.from(c.address())); + assertStatus(a.address(), ALIVE, awaitEvents(listA0), b.address(), c.address(), d.address()); + assertStatus(b.address(), ALIVE, awaitEvents(listB0), a.address(), c.address(), d.address()); + assertStatus(c.address(), ALIVE, awaitEvents(listC0), a.address(), b.address(), d.address()); + assertStatus(d.address(), ALIVE, awaitEvents(listD0), a.address(), b.address(), c.address()); } finally { stop(fdetectors); } @@ -439,7 +303,7 @@ public void testMemberStatusChangeAfterNetworkRecovery() throws Exception { // Create transports NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); - List
members = Arrays.asList(Address.from(a.address()), Address.from(b.address())); + List members = Arrays.asList(a.address(), b.address()); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -456,10 +320,8 @@ public void testMemberStatusChangeAfterNetworkRecovery() throws Exception { try { start(fdetectors); - assertStatus( - Address.from(a.address()), SUSPECT, awaitEvents(listA), Address.from(b.address())); - assertStatus( - Address.from(b.address()), SUSPECT, awaitEvents(listB), Address.from(a.address())); + assertStatus(a.address(), SUSPECT, awaitEvents(listA), b.address()); + assertStatus(b.address(), SUSPECT, awaitEvents(listB), a.address()); // Unblock A and B members: A-->B, B-->A a.networkEmulator().unblockAllOutbound(); @@ -471,8 +333,8 @@ public void testMemberStatusChangeAfterNetworkRecovery() throws Exception { listA = listenNextEventFor(fdA, members); listB = listenNextEventFor(fdB, members); - assertStatus(Address.from(a.address()), ALIVE, awaitEvents(listA), Address.from(b.address())); - assertStatus(Address.from(b.address()), ALIVE, awaitEvents(listB), Address.from(a.address())); + assertStatus(a.address(), ALIVE, awaitEvents(listA), b.address()); + assertStatus(b.address(), ALIVE, awaitEvents(listB), a.address()); } finally { stop(fdetectors); } @@ -484,9 +346,7 @@ public void testStatusChangeAfterMemberRestart() throws Exception { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport x = createTransport(); - List
members = - Arrays.asList( - Address.from(a.address()), Address.from(b.address()), Address.from(x.address())); + List members = Arrays.asList(a.address(), b.address(), x.address()); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -505,31 +365,16 @@ public void testStatusChangeAfterMemberRestart() throws Exception { try { start(fdetectors); - assertStatus( - Address.from(a.address()), - ALIVE, - awaitEvents(listA), - Address.from(b.address()), - Address.from(x.address())); - assertStatus( - Address.from(b.address()), - ALIVE, - awaitEvents(listB), - Address.from(a.address()), - Address.from(x.address())); - assertStatus( - Address.from(x.address()), - ALIVE, - awaitEvents(listX), - Address.from(a.address()), - Address.from(b.address())); + assertStatus(a.address(), ALIVE, awaitEvents(listA), b.address(), x.address()); + assertStatus(b.address(), ALIVE, awaitEvents(listB), a.address(), x.address()); + assertStatus(x.address(), ALIVE, awaitEvents(listX), a.address(), b.address()); // stop node X stop(Collections.singletonList(fdX)); TimeUnit.SECONDS.sleep(2); // restart node X as XX - xx = createTransport(new TransportConfig().port(Address.from(x.address()).port())); + xx = createTransport(new TransportConfig().port(parsePort(x.address()))); assertEquals(x.address(), xx.address()); fdetectors = Arrays.asList(fdA, fdB, fdXx = createFd(xx, members)); @@ -544,30 +389,15 @@ public void testStatusChangeAfterMemberRestart() throws Exception { // TODO [AK]: It would be more correct to consider restarted member as a new member, so x is // still suspected! - assertStatus( - Address.from(a.address()), - ALIVE, - awaitEvents(listA), - Address.from(b.address()), - Address.from(xx.address())); - assertStatus( - Address.from(b.address()), - ALIVE, - awaitEvents(listB), - Address.from(a.address()), - Address.from(xx.address())); - assertStatus( - Address.from(xx.address()), - ALIVE, - awaitEvents(listXx), - Address.from(a.address()), - Address.from(b.address())); + assertStatus(a.address(), ALIVE, awaitEvents(listA), b.address(), xx.address()); + assertStatus(b.address(), ALIVE, awaitEvents(listB), a.address(), xx.address()); + assertStatus(xx.address(), ALIVE, awaitEvents(listXx), a.address(), b.address()); } finally { stop(fdetectors); } } - private FailureDetectorImpl createFd(Transport transport, List
members) { + private FailureDetectorImpl createFd(Transport transport, List members) { FailureDetectorConfig failureDetectorConfig = FailureDetectorConfig.defaultLocalConfig() // faster config for local testing .pingTimeout(100) @@ -577,19 +407,16 @@ private FailureDetectorImpl createFd(Transport transport, List
members) } private FailureDetectorImpl createFd( - Transport transport, List
addresses, FailureDetectorConfig config) { + Transport transport, List addresses, FailureDetectorConfig config) { Member localMember = new Member( - "member-" + Address.from(transport.address()).port(), - null, - Address.from(transport.address()), - NAMESPACE); + "member-" + parsePort(transport.address()), null, transport.address(), NAMESPACE); Flux membershipFlux = Flux.fromIterable(addresses) - .filter(address -> !transport.address().equals(address.toString())) - .map(address -> new Member("member-" + address.port(), null, address, NAMESPACE)) + .filter(address -> !transport.address().equals(address)) + .map(address -> new Member("member-" + parsePort(address), null, address, NAMESPACE)) .map(member -> MembershipEvent.createAdded(member, null, 0)); return new FailureDetectorImpl(localMember, transport, membershipFlux, config, scheduler); @@ -611,11 +438,11 @@ private static void stop(List fdetectors) { } private static void assertStatus( - Address address, + String address, MemberStatus status, Collection events, - Address... expected) { - List
actual = + String... expected) { + List actual = events.stream() .filter(event -> event.status() == status) .map(FailureDetectorEvent::member) @@ -628,7 +455,7 @@ private static void assertStatus( address, expected.length, status, Arrays.toString(expected), events); assertEquals(expected.length, actual.size(), msg1); - for (Address member : expected) { + for (String member : expected) { String msg2 = String.format("Node %s expected as %s %s, but was: %s", address, status, member, events); assertTrue(actual.contains(member), msg2); @@ -636,16 +463,16 @@ private static void assertStatus( } private static Future> listenNextEventFor( - FailureDetectorImpl fd, List
addresses) { + FailureDetectorImpl fd, List addresses) { final Transport transport = BaseTest.getField(fd, "transport"); addresses = new ArrayList<>(addresses); - addresses.remove(Address.from(transport.address())); // exclude self + addresses.remove(transport.address()); // exclude self if (addresses.isEmpty()) { throw new IllegalArgumentException(); } List> resultFuture = new ArrayList<>(); - for (final Address member : addresses) { + for (final String member : addresses) { final CompletableFuture future = new CompletableFuture<>(); fd.listen().filter(event -> event.member().address() == member).subscribe(future::complete); resultFuture.add(future); diff --git a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java index 823459c3..1ea1e583 100644 --- a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java @@ -1,5 +1,6 @@ package io.scalecube.cluster.gossip; +import static io.scalecube.cluster.transport.api.Transport.parsePort; import static org.junit.jupiter.api.Assertions.assertEquals; import io.scalecube.cluster.BaseTest; @@ -9,7 +10,6 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; import io.scalecube.cluster.utils.NetworkEmulatorTransport; -import io.scalecube.net.Address; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; @@ -38,24 +38,15 @@ public void testMessageDelayMoreThanGossipSweepTime() throws InterruptedExceptio final GossipProtocolImpl gossipProtocol1 = initGossipProtocol( transport1, - Arrays.asList( - Address.from(transport1.address()), - Address.from(transport2.address()), - Address.from(transport3.address()))); + Arrays.asList(transport1.address(), transport2.address(), transport3.address())); final GossipProtocolImpl gossipProtocol2 = initGossipProtocol( transport2, - Arrays.asList( - Address.from(transport1.address()), - Address.from(transport2.address()), - Address.from(transport3.address()))); + Arrays.asList(transport1.address(), transport2.address(), transport3.address())); final GossipProtocolImpl gossipProtocol3 = initGossipProtocol( transport3, - Arrays.asList( - Address.from(transport1.address()), - Address.from(transport2.address()), - Address.from(transport3.address()))); + Arrays.asList(transport1.address(), transport2.address(), transport3.address())); final AtomicInteger protocol1GossipCounter = new AtomicInteger(0); final AtomicInteger protocol2GossipCounter = new AtomicInteger(0); @@ -83,7 +74,7 @@ private NetworkEmulatorTransport getNetworkEmulatorTransport(int lostPercent, in return transport; } - private GossipProtocolImpl initGossipProtocol(Transport transport, List
members) { + private GossipProtocolImpl initGossipProtocol(Transport transport, List members) { GossipConfig gossipConfig = new GossipConfig() .gossipFanout(gossipFanout) @@ -92,15 +83,12 @@ private GossipProtocolImpl initGossipProtocol(Transport transport, List
Member localMember = new Member( - "member-" + Address.from(transport.address()).port(), - null, - Address.from(transport.address()), - NAMESPACE); + "member-" + parsePort(transport.address()), null, transport.address(), NAMESPACE); Flux membershipFlux = Flux.fromIterable(members) .filter(address -> !transport.address().equals(address)) - .map(address -> new Member("member-" + address.port(), null, address, NAMESPACE)) + .map(address -> new Member("member-" + parsePort(address), null, address, NAMESPACE)) .map(member -> MembershipEvent.createAdded(member, null, 0)); GossipProtocolImpl gossipProtocol = diff --git a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java index 12e5856f..9842c07c 100644 --- a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java @@ -4,6 +4,7 @@ import static io.scalecube.cluster.ClusterMath.gossipDisseminationTime; import static io.scalecube.cluster.ClusterMath.maxMessagesPerGossipPerNode; import static io.scalecube.cluster.ClusterMath.maxMessagesPerGossipTotal; +import static io.scalecube.cluster.transport.api.Transport.parsePort; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -15,7 +16,6 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; import io.scalecube.cluster.utils.NetworkEmulatorTransport; -import io.scalecube.net.Address; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -230,9 +230,9 @@ private LongSummaryStatistics computeMessageLostStats(List g private List initGossipProtocols(int count, int lostPercent, int meanDelay) { final List transports = initTransports(count, lostPercent, meanDelay); - List
members = new ArrayList<>(); + List members = new ArrayList<>(); for (Transport transport : transports) { - members.add(Address.from(transport.address())); + members.add(transport.address()); } List gossipProtocols = new ArrayList<>(); for (Transport transport : transports) { @@ -251,7 +251,7 @@ private List initTransports(int count, int lostPercent, int meanDelay return transports; } - private GossipProtocolImpl initGossipProtocol(Transport transport, List
members) { + private GossipProtocolImpl initGossipProtocol(Transport transport, List members) { GossipConfig gossipConfig = new GossipConfig() .gossipFanout(gossipFanout) @@ -260,15 +260,12 @@ private GossipProtocolImpl initGossipProtocol(Transport transport, List
Member localMember = new Member( - "member-" + Address.from(transport.address()).port(), - null, - Address.from(transport.address()), - NAMESPACE); + "member-" + parsePort(transport.address()), null, transport.address(), NAMESPACE); Flux membershipFlux = Flux.fromIterable(members) .filter(address -> !transport.address().equals(address)) - .map(address -> new Member("member-" + address.port(), null, address, NAMESPACE)) + .map(address -> new Member("member-" + parsePort(address), null, address, NAMESPACE)) .map(member -> MembershipEvent.createAdded(member, null, 0)); GossipProtocolImpl gossipProtocol = diff --git a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipRequestTest.java b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipRequestTest.java index e2dfa1fc..37a3e86d 100644 --- a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipRequestTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipRequestTest.java @@ -8,7 +8,6 @@ import io.scalecube.cluster.Member; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.MessageCodec; -import io.scalecube.net.Address; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.Serializable; @@ -39,7 +38,7 @@ public void init() { @Test public void testSerializationAndDeserialization() throws Exception { - Member from = new Member("0", null, Address.from("localhost:1234"), NAMESPACE); + Member from = new Member("0", null, "localhost:1234", NAMESPACE); List gossips = getGossips(); Message message = Message.withData(new GossipRequest(gossips, from.id())).correlationId("CORR_ID").build(); diff --git a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java index ee1fdc53..310e947f 100644 --- a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java @@ -1,5 +1,6 @@ package io.scalecube.cluster.membership; +import static io.scalecube.cluster.transport.api.Transport.parsePort; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -18,7 +19,6 @@ import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.cluster.utils.NetworkEmulator; import io.scalecube.cluster.utils.NetworkEmulatorTransport; -import io.scalecube.net.Address; import java.net.InetAddress; import java.net.UnknownHostException; import java.time.Duration; @@ -76,9 +76,7 @@ public void testLeaveCluster() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
addresses = - Arrays.asList( - Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); + List addresses = Arrays.asList(a.address(), b.address(), c.address()); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -111,10 +109,8 @@ public void testLeaveCluster() { public void testLeaveClusterCameBeforeAlive() { final NetworkEmulatorTransport a = createTransport(); final NetworkEmulatorTransport b = createTransport(); - final Member anotherMember = - new Member("leavingNodeId-1", null, Address.from("localhost:9236"), NAMESPACE); - final List
addresses = - Arrays.asList(Address.from(a.address()), Address.from(b.address())); + final Member anotherMember = new Member("leavingNodeId-1", null, "localhost:9236", NAMESPACE); + final List addresses = Arrays.asList(a.address(), b.address()); final MembershipProtocolImpl cmA = createMembership(a, addresses); final MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -157,10 +153,8 @@ public void testLeaveClusterCameBeforeAlive() { public void testLeaveClusterOnly() { final NetworkEmulatorTransport a = createTransport(); final NetworkEmulatorTransport b = createTransport(); - final Member anotherMember = - new Member("leavingNodeId-1", null, Address.from("localhost:9236"), NAMESPACE); - final List
addresses = - Arrays.asList(Address.from(a.address()), Address.from(b.address())); + final Member anotherMember = new Member("leavingNodeId-1", null, "localhost:9236", NAMESPACE); + final List addresses = Arrays.asList(a.address(), b.address()); final MembershipProtocolImpl cmA = createMembership(a, addresses); final MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -192,10 +186,8 @@ public void testLeaveClusterOnly() { public void testLeaveClusterOnSuspectedNode() { final NetworkEmulatorTransport a = createTransport(); final NetworkEmulatorTransport b = createTransport(); - final Member anotherMember = - new Member("leavingNodeId-1", null, Address.from("localhost:9236"), NAMESPACE); - final List
addresses = - Arrays.asList(Address.from(a.address()), Address.from(b.address())); + final Member anotherMember = new Member("leavingNodeId-1", null, "localhost:9236", NAMESPACE); + final List addresses = Arrays.asList(a.address(), b.address()); final MembershipProtocolImpl cmA = createMembership(a, addresses); final MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -239,8 +231,7 @@ public void testLeaveClusterOnSuspectedNode() { public void testLeaveClusterOnAliveAndSuspectedNode() { final NetworkEmulatorTransport a = createTransport(); final NetworkEmulatorTransport b = createTransport(); - final List
addresses = - Arrays.asList(Address.from(a.address()), Address.from(b.address())); + final List addresses = Arrays.asList(a.address(), b.address()); final MembershipProtocolImpl cmA = createMembership(a, addresses); final MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -276,9 +267,7 @@ public void testInitialPhaseOk() { Transport a = createTransport(); Transport b = createTransport(); Transport c = createTransport(); - List
addresses = - Arrays.asList( - Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); + List addresses = Arrays.asList(a.address(), b.address(), c.address()); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -303,9 +292,7 @@ public void testNetworkPartitionDueNoOutboundThenRecover() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
addresses = - Arrays.asList( - Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); + List addresses = Arrays.asList(a.address(), b.address(), c.address()); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -314,12 +301,9 @@ public void testNetworkPartitionDueNoOutboundThenRecover() { awaitSeconds(3); // Block traffic - a.networkEmulator() - .blockOutbound(addresses.stream().map(Address::toString).collect(Collectors.toList())); - b.networkEmulator() - .blockOutbound(addresses.stream().map(Address::toString).collect(Collectors.toList())); - c.networkEmulator() - .blockOutbound(addresses.stream().map(Address::toString).collect(Collectors.toList())); + a.networkEmulator().blockOutbound(addresses); + b.networkEmulator().blockOutbound(addresses); + c.networkEmulator().blockOutbound(addresses); try { @@ -354,9 +338,7 @@ public void testMemberLostNetworkDueNoOutboundThenRecover() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
members = - Arrays.asList( - Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); + List members = Arrays.asList(a.address(), b.address(), c.address()); MembershipProtocolImpl cmA = createMembership(a, members); MembershipProtocolImpl cmB = createMembership(b, members); @@ -412,9 +394,7 @@ public void testNetworkPartitionTwiceDueNoOutboundThenRecover() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
addresses = - Arrays.asList( - Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); + List addresses = Arrays.asList(a.address(), b.address(), c.address()); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -484,9 +464,7 @@ public void testNetworkLostOnAllNodesDueNoOutboundThenRecover() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
addresses = - Arrays.asList( - Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); + List addresses = Arrays.asList(a.address(), b.address(), c.address()); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -502,12 +480,9 @@ public void testNetworkLostOnAllNodesDueNoOutboundThenRecover() { assertTrusted(cmC, cmB.member(), cmA.member()); assertNoSuspected(cmC); - a.networkEmulator() - .blockOutbound(addresses.stream().map(Address::toString).collect(Collectors.toList())); - b.networkEmulator() - .blockOutbound(addresses.stream().map(Address::toString).collect(Collectors.toList())); - c.networkEmulator() - .blockOutbound(addresses.stream().map(Address::toString).collect(Collectors.toList())); + a.networkEmulator().blockOutbound(addresses); + b.networkEmulator().blockOutbound(addresses); + c.networkEmulator().blockOutbound(addresses); awaitSeconds(1); @@ -545,12 +520,7 @@ public void testLongNetworkPartitionDueNoOutboundThenRemoved() { NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); NetworkEmulatorTransport d = createTransport(); - List
addresses = - Arrays.asList( - Address.from(a.address()), - Address.from(b.address()), - Address.from(c.address()), - Address.from(d.address())); + List addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address()); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -603,12 +573,7 @@ public void testRestartStoppedMembers() { NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); NetworkEmulatorTransport d = createTransport(); - List
addresses = - Arrays.asList( - Address.from(a.address()), - Address.from(b.address()), - Address.from(c.address()), - Address.from(d.address())); + List addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address()); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -688,12 +653,7 @@ public void testRestartStoppedMembersOnSameAddresses() { NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); NetworkEmulatorTransport d = createTransport(); - List
addresses = - Arrays.asList( - Address.from(a.address()), - Address.from(b.address()), - Address.from(c.address()), - Address.from(d.address())); + List addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address()); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -728,8 +688,8 @@ public void testRestartStoppedMembersOnSameAddresses() { assertSuspected(cmB, cmC.member(), cmD.member()); // Restart C and D on same ports - c_Restarted = createTransport(new TransportConfig().port(Address.from(c.address()).port())); - d_Restarted = createTransport(new TransportConfig().port(Address.from(d.address()).port())); + c_Restarted = createTransport(new TransportConfig().port(parsePort(c.address()))); + d_Restarted = createTransport(new TransportConfig().port(parsePort(d.address()))); cmC_Restarted = createMembership(c_Restarted, addresses); cmD_Restarted = createMembership(d_Restarted, addresses); @@ -765,14 +725,10 @@ public void testLimitedSeedMembers() { NetworkEmulatorTransport e = createTransport(); MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList()); - MembershipProtocolImpl cmB = - createMembership(b, Collections.singletonList(Address.from(a.address()))); - MembershipProtocolImpl cmC = - createMembership(c, Collections.singletonList(Address.from(a.address()))); - MembershipProtocolImpl cmD = - createMembership(d, Collections.singletonList(Address.from(b.address()))); - MembershipProtocolImpl cmE = - createMembership(e, Collections.singletonList(Address.from(b.address()))); + MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address())); + MembershipProtocolImpl cmC = createMembership(c, Collections.singletonList(a.address())); + MembershipProtocolImpl cmD = createMembership(d, Collections.singletonList(b.address())); + MembershipProtocolImpl cmE = createMembership(e, Collections.singletonList(b.address())); try { awaitSeconds(3); @@ -806,24 +762,16 @@ public void testOverrideMemberAddress() throws UnknownHostException { createMembership(a, testConfig(Collections.emptyList()).externalHost(localAddress)); MembershipProtocolImpl cmB = createMembership( - b, - testConfig(Collections.singletonList(Address.from(a.address()))) - .externalHost(localAddress)); + b, testConfig(Collections.singletonList(a.address())).externalHost(localAddress)); MembershipProtocolImpl cmC = createMembership( - c, - testConfig(Collections.singletonList(Address.from(a.address()))) - .externalHost(localAddress)); + c, testConfig(Collections.singletonList(a.address())).externalHost(localAddress)); MembershipProtocolImpl cmD = createMembership( - d, - testConfig(Collections.singletonList(Address.from(b.address()))) - .externalHost(localAddress)); + d, testConfig(Collections.singletonList(b.address())).externalHost(localAddress)); MembershipProtocolImpl cmE = createMembership( - e, - testConfig(Collections.singletonList(Address.from(b.address()))) - .externalHost(localAddress)); + e, testConfig(Collections.singletonList(b.address())).externalHost(localAddress)); try { awaitSeconds(3); @@ -853,10 +801,9 @@ public void testNodeJoinClusterWithNoInbound() { c_noInbound.networkEmulator().blockAllInbound(); MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList()); - MembershipProtocolImpl cmB = - createMembership(b, Collections.singletonList(Address.from(a.address()))); + MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address())); MembershipProtocolImpl cm_noInbound = - createMembership(c_noInbound, Collections.singletonList(Address.from(a.address()))); + createMembership(c_noInbound, Collections.singletonList(a.address())); awaitSeconds(3); @@ -881,11 +828,9 @@ public void testNodeJoinClusterWithNoInboundThenInboundRecover() { c_noInboundThenInboundOk.networkEmulator().blockAllInbound(); MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList()); - MembershipProtocolImpl cmB = - createMembership(b, Collections.singletonList(Address.from(a.address()))); + MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address())); MembershipProtocolImpl cm_noInboundThenInboundOk = - createMembership( - c_noInboundThenInboundOk, Collections.singletonList(Address.from(a.address()))); + createMembership(c_noInboundThenInboundOk, Collections.singletonList(a.address())); awaitSeconds(3); @@ -917,10 +862,8 @@ public void testNetworkPartitionDueNoInboundThenRemoved() { NetworkEmulatorTransport c = createTransport(); MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList()); - MembershipProtocolImpl cmB = - createMembership(b, Collections.singletonList(Address.from(a.address()))); - MembershipProtocolImpl cmC = - createMembership(c, Collections.singletonList(Address.from(a.address()))); + MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address())); + MembershipProtocolImpl cmC = createMembership(c, Collections.singletonList(a.address())); try { awaitSeconds(3); @@ -959,10 +902,8 @@ public void testNetworkPartitionDueNoInboundUntilRemovedThenInboundRecover() { NetworkEmulatorTransport c = createTransport(); MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList()); - MembershipProtocolImpl cmB = - createMembership(b, Collections.singletonList(Address.from(a.address()))); - MembershipProtocolImpl cmC = - createMembership(c, Collections.singletonList(Address.from(a.address()))); + MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address())); + MembershipProtocolImpl cmC = createMembership(c, Collections.singletonList(a.address())); try { awaitSeconds(3); @@ -1013,10 +954,8 @@ public void testNetworkPartitionBetweenTwoMembersDueNoInbound() { NetworkEmulatorTransport c = createTransport(); MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList()); - MembershipProtocolImpl cmB = - createMembership(b, Collections.singletonList(Address.from(a.address()))); - MembershipProtocolImpl cmC = - createMembership(c, Collections.singletonList(Address.from(a.address()))); + MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address())); + MembershipProtocolImpl cmC = createMembership(c, Collections.singletonList(a.address())); try { awaitSeconds(3); @@ -1045,10 +984,8 @@ public void testNetworkPartitionBetweenTwoMembersDueNoOutbound() { NetworkEmulatorTransport c = createTransport(); MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList()); - MembershipProtocolImpl cmB = - createMembership(b, Collections.singletonList(Address.from(a.address()))); - MembershipProtocolImpl cmC = - createMembership(c, Collections.singletonList(Address.from(a.address()))); + MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address())); + MembershipProtocolImpl cmC = createMembership(c, Collections.singletonList(a.address())); try { awaitSeconds(3); @@ -1077,10 +1014,8 @@ public void testNetworkPartitionBetweenTwoMembersDueNoTrafficAtAll() { NetworkEmulatorTransport c = createTransport(); MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList()); - MembershipProtocolImpl cmB = - createMembership(b, Collections.singletonList(Address.from(a.address()))); - MembershipProtocolImpl cmC = - createMembership(c, Collections.singletonList(Address.from(a.address()))); + MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address())); + MembershipProtocolImpl cmC = createMembership(c, Collections.singletonList(a.address())); try { awaitSeconds(3); @@ -1109,12 +1044,7 @@ public void testNetworkPartitionManyDueNoInboundThenRemovedThenRecover() { NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); NetworkEmulatorTransport d = createTransport(); - List
addresses = - Arrays.asList( - Address.from(a.address()), - Address.from(b.address()), - Address.from(c.address()), - Address.from(d.address())); + List addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address()); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -1184,7 +1114,7 @@ public void testNetworkPartitionManyDueNoInboundThenRemovedThenRecover() { } } - private static ClusterConfig testConfig(List
seedAddresses) { + private static ClusterConfig testConfig(List seedAddresses) { // Create faster config for local testing return new ClusterConfig() .membership(opts -> opts.seedMembers(seedAddresses)) @@ -1196,14 +1126,12 @@ private static ClusterConfig testConfig(List
seedAddresses) { .metadataTimeout(100); } - private MembershipProtocolImpl createMembership( - Transport transport, List
seedAddresses) { + private MembershipProtocolImpl createMembership(Transport transport, List seedAddresses) { return createMembership(transport, testConfig(seedAddresses)); } private MembershipProtocolImpl createMembership(Transport transport, ClusterConfig config) { - Member localMember = - new Member(newMemberId(), null, Address.from(transport.address()), NAMESPACE); + Member localMember = new Member(newMemberId(), null, transport.address(), NAMESPACE); Sinks.Many membershipProcessor = Sinks.many().multicast().directBestEffort(); diff --git a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipRecordTest.java b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipRecordTest.java index 0a5028cb..54a9c869 100644 --- a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipRecordTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipRecordTest.java @@ -9,13 +9,12 @@ import io.scalecube.cluster.BaseTest; import io.scalecube.cluster.Member; -import io.scalecube.net.Address; import org.junit.jupiter.api.Test; public class MembershipRecordTest extends BaseTest { - private final Member member = new Member("0", "0", Address.from("localhost:1234"), "ns-0"); - private final Member anotherMember = new Member("1", "1", Address.from("localhost:4567"), "ns-1"); + private final Member member = new Member("0", "0", "localhost:1234", "ns-0"); + private final Member anotherMember = new Member("1", "1", "localhost:4567", "ns-1"); private final MembershipRecord r0Null = null; diff --git a/pom.xml b/pom.xml index e21c8b82..584671d8 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 @@ -33,8 +35,6 @@ - 1.0.24 - 1.7.36 2.17.2 2020.0.32 @@ -117,13 +117,6 @@ import - - - io.scalecube - scalecube-commons - ${scalecube-commons.version} - - com.fasterxml.jackson