diff --git a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java index d51a7526..552c10bc 100644 --- a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java +++ b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java @@ -1,7 +1,6 @@ package io.scalecube.cluster.utils; import io.scalecube.cluster.transport.api.Message; -import io.scalecube.net.Address; import java.time.Duration; import java.util.Arrays; import java.util.Collection; @@ -30,21 +29,21 @@ public final class NetworkEmulator { private volatile OutboundSettings defaultOutboundSettings = new OutboundSettings(0, 0); private volatile InboundSettings defaultInboundSettings = new InboundSettings(true); - private final Map outboundSettings = new ConcurrentHashMap<>(); - private final Map inboundSettings = new ConcurrentHashMap<>(); + private final Map outboundSettings = new ConcurrentHashMap<>(); + private final Map inboundSettings = new ConcurrentHashMap<>(); private final AtomicLong totalMessageSentCount = new AtomicLong(); private final AtomicLong totalOutboundMessageLostCount = new AtomicLong(); private final AtomicLong totalInboundMessageLostCount = new AtomicLong(); - private final Address address; + private final String address; /** * Creates new instance of network emulator. * * @param address local address */ - NetworkEmulator(Address address) { + NetworkEmulator(String address) { this.address = address; } @@ -56,7 +55,7 @@ public final class NetworkEmulator { * @param destination address of target endpoint * @return network outbound settings */ - public OutboundSettings outboundSettings(Address destination) { + public OutboundSettings outboundSettings(String destination) { return outboundSettings.getOrDefault(destination, defaultOutboundSettings); } @@ -67,7 +66,7 @@ public OutboundSettings outboundSettings(Address destination) { * @param lossPercent loss in percents * @param meanDelay mean delay */ - public void outboundSettings(Address destination, int lossPercent, int meanDelay) { + public void outboundSettings(String destination, int lossPercent, int meanDelay) { OutboundSettings settings = new OutboundSettings(lossPercent, meanDelay); outboundSettings.put(destination, settings); LOGGER.debug("[{}] Set outbound settings {} to {}", address, settings, destination); @@ -103,7 +102,7 @@ public void unblockAllOutbound() { * * @param destinations collection of target endpoints where to apply */ - public void blockOutbound(Address... destinations) { + public void blockOutbound(String... destinations) { blockOutbound(Arrays.asList(destinations)); } @@ -112,8 +111,8 @@ public void blockOutbound(Address... destinations) { * * @param destinations collection of target endpoints where to apply */ - public void blockOutbound(Collection
destinations) { - for (Address destination : destinations) { + public void blockOutbound(Collection destinations) { + for (String destination : destinations) { outboundSettings.put(destination, new OutboundSettings(100, 0)); } LOGGER.debug("[{}] Blocked outbound to {}", address, destinations); @@ -124,7 +123,7 @@ public void blockOutbound(Collection
destinations) { * * @param destinations collection of target endpoints where to apply */ - public void unblockOutbound(Address... destinations) { + public void unblockOutbound(String... destinations) { unblockOutbound(Arrays.asList(destinations)); } @@ -133,7 +132,7 @@ public void unblockOutbound(Address... destinations) { * * @param destinations collection of target endpoints where to apply */ - public void unblockOutbound(Collection
destinations) { + public void unblockOutbound(Collection destinations) { destinations.forEach(outboundSettings::remove); LOGGER.debug("[{}] Unblocked outbound {}", address, destinations); } @@ -164,7 +163,7 @@ public long totalOutboundMessageLostCount() { * @param address target address * @return mono message */ - public Mono tryFailOutbound(Message msg, Address address) { + public Mono tryFailOutbound(Message msg, String address) { return Mono.defer( () -> { totalMessageSentCount.incrementAndGet(); @@ -187,7 +186,7 @@ public Mono tryFailOutbound(Message msg, Address address) { * @param address target address * @return mono message */ - public Mono tryDelayOutbound(Message msg, Address address) { + public Mono tryDelayOutbound(Message msg, String address) { return Mono.defer( () -> { totalMessageSentCount.incrementAndGet(); @@ -209,7 +208,7 @@ public Mono tryDelayOutbound(Message msg, Address address) { * @param destination address of target endpoint * @return network inbound settings */ - public InboundSettings inboundSettings(Address destination) { + public InboundSettings inboundSettings(String destination) { return inboundSettings.getOrDefault(destination, defaultInboundSettings); } @@ -218,7 +217,7 @@ public InboundSettings inboundSettings(Address destination) { * * @param shallPass shallPass inbound flag */ - public void inboundSettings(Address destination, boolean shallPass) { + public void inboundSettings(String destination, boolean shallPass) { InboundSettings settings = new InboundSettings(shallPass); inboundSettings.put(destination, settings); LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination); @@ -253,7 +252,7 @@ public void unblockAllInbound() { * * @param destinations collection of target endpoints where to apply */ - public void blockInbound(Address... destinations) { + public void blockInbound(String... destinations) { blockInbound(Arrays.asList(destinations)); } @@ -262,8 +261,8 @@ public void blockInbound(Address... destinations) { * * @param destinations collection of target endpoints where to apply */ - public void blockInbound(Collection
destinations) { - for (Address destination : destinations) { + public void blockInbound(Collection destinations) { + for (String destination : destinations) { inboundSettings.put(destination, new InboundSettings(false)); } LOGGER.debug("[{}] Blocked inbound from {}", address, destinations); @@ -274,7 +273,7 @@ public void blockInbound(Collection
destinations) { * * @param destinations collection of target endpoints where to apply */ - public void unblockInbound(Address... destinations) { + public void unblockInbound(String... destinations) { unblockInbound(Arrays.asList(destinations)); } @@ -283,7 +282,7 @@ public void unblockInbound(Address... destinations) { * * @param destinations collection of target endpoints where to apply */ - public void unblockInbound(Collection
destinations) { + public void unblockInbound(Collection destinations) { destinations.forEach(inboundSettings::remove); LOGGER.debug("[{}] Unblocked inbound from {}", address, destinations); } diff --git a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java index 381042c5..dcfbf677 100644 --- a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java +++ b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java @@ -2,7 +2,6 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.net.Address; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -26,7 +25,7 @@ public NetworkEmulator networkEmulator() { } @Override - public Address address() { + public String address() { return transport.address(); } @@ -46,7 +45,7 @@ public boolean isStopped() { } @Override - public Mono send(Address address, Message message) { + public Mono send(String address, Message message) { return Mono.defer( () -> Mono.just(enhanceWithSender(message)) @@ -56,7 +55,7 @@ public Mono send(Address address, Message message) { } @Override - public Mono requestResponse(Address address, Message request) { + public Mono requestResponse(String address, Message request) { return Mono.defer( () -> Mono.just(enhanceWithSender(request)) diff --git a/cluster-testlib/src/test/java/io/scalecube/cluster/utils/NetworkEmulatorTest.java b/cluster-testlib/src/test/java/io/scalecube/cluster/utils/NetworkEmulatorTest.java index 1919bc6f..b65137db 100644 --- a/cluster-testlib/src/test/java/io/scalecube/cluster/utils/NetworkEmulatorTest.java +++ b/cluster-testlib/src/test/java/io/scalecube/cluster/utils/NetworkEmulatorTest.java @@ -1,7 +1,6 @@ package io.scalecube.cluster.utils; import io.scalecube.cluster.utils.NetworkEmulator.OutboundSettings; -import io.scalecube.net.Address; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -10,24 +9,23 @@ public class NetworkEmulatorTest extends BaseTest { @Test public void testResolveLinkSettingsBySocketAddress() { // Init network emulator - Address address = Address.from("localhost:1234"); - NetworkEmulator networkEmulator = new NetworkEmulator(address); - networkEmulator.outboundSettings(Address.create("localhost", 5678), 25, 10); - networkEmulator.outboundSettings(Address.create("192.168.0.1", 8765), 10, 20); + NetworkEmulator networkEmulator = new NetworkEmulator("localhost:1234"); + networkEmulator.outboundSettings("localhost:" + 5678, 25, 10); + networkEmulator.outboundSettings("192.168.0.1:" + 8765, 10, 20); networkEmulator.setDefaultOutboundSettings(0, 2); // Check resolve by hostname:port - OutboundSettings link1 = networkEmulator.outboundSettings(Address.create("localhost", 5678)); + OutboundSettings link1 = networkEmulator.outboundSettings("localhost:" + 5678); Assertions.assertEquals(25, link1.lossPercent()); Assertions.assertEquals(10, link1.meanDelay()); // Check resolve by ipaddr:port - OutboundSettings link2 = networkEmulator.outboundSettings(Address.create("192.168.0.1", 8765)); + OutboundSettings link2 = networkEmulator.outboundSettings("192.168.0.1:" + 8765); Assertions.assertEquals(10, link2.lossPercent()); Assertions.assertEquals(20, link2.meanDelay()); // Check default link settings - OutboundSettings link3 = networkEmulator.outboundSettings(Address.create("localhost", 8765)); + OutboundSettings link3 = networkEmulator.outboundSettings("localhost:" + 8765); Assertions.assertEquals(0, link3.lossPercent()); Assertions.assertEquals(2, link3.meanDelay()); } diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java index 77942be8..ff8a67fb 100644 --- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java @@ -241,7 +241,7 @@ private Mono doStart0() { return Transport.bind(config.transportConfig()) .flatMap( boundTransport -> { - localMember = createLocalMember(boundTransport.address()); + localMember = createLocalMember(Address.from(boundTransport.address())); transport = new SenderAwareTransport(boundTransport, localMember.address()); scheduler = Schedulers.newSingle("sc-cluster-" + localMember.address().port(), true); @@ -506,7 +506,7 @@ private SenderAwareTransport(Transport transport, Address address) { } @Override - public Address address() { + public String address() { return transport.address(); } @@ -526,12 +526,12 @@ public boolean isStopped() { } @Override - public Mono send(Address address, Message message) { + public Mono send(String address, Message message) { return Mono.defer(() -> transport.send(address, enhanceWithSender(message))); } @Override - public Mono requestResponse(Address address, Message request) { + public Mono requestResponse(String address, Message request) { return Mono.defer(() -> transport.requestResponse(address, enhanceWithSender(request))); } @@ -541,7 +541,7 @@ public Flux listen() { } private Message enhanceWithSender(Message message) { - return Message.with(message).sender(address).build(); + return Message.with(message).sender(address.toString()).build(); } } } diff --git a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java index c7540bc0..40860a89 100644 --- a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java @@ -147,7 +147,7 @@ private void doPing() { LOGGER.debug("[{}][{}] Send Ping to {}", localMember, period, pingMember); Address address = pingMember.address(); transport - .requestResponse(address, pingMsg) + .requestResponse(address.toString(), pingMsg) .timeout(Duration.ofMillis(config.pingTimeout()), scheduler) .publishOn(scheduler) .subscribe( @@ -190,7 +190,7 @@ private void doPingReq( pingReqMembers.forEach( member -> transport - .requestResponse(member.address(), pingReqMsg) + .requestResponse(member.address().toString(), pingReqMsg) .timeout(timeout, scheduler) .publishOn(scheduler) .subscribe( @@ -232,7 +232,7 @@ private void onMessage(Message message) { /** Listens to PING message and answers with ACK. */ private void onPing(Message message) { long period = this.currentPeriod; - Address sender = message.sender(); + Address sender = Address.from(message.sender()); LOGGER.debug("[{}][{}] Received Ping from {}", localMember, period, sender); PingData data = message.data(); data = data.withAckType(AckType.DEST_OK); @@ -252,7 +252,7 @@ private void onPing(Message message) { Address address = data.getFrom().address(); LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, address); transport - .send(address, ackMessage) + .send(address.toString(), ackMessage) .subscribe( null, ex -> @@ -278,7 +278,7 @@ private void onPingReq(Message message) { Address address = target.address(); LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, address); transport - .send(address, pingMessage) + .send(address.toString(), pingMessage) .subscribe( null, ex -> @@ -308,7 +308,7 @@ private void onTransitPingAck(Message message) { Address address = target.address(); LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, address); transport - .send(address, originalAckMessage) + .send(address.toString(), originalAckMessage) .subscribe( null, ex -> diff --git a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java index 1acfd060..6e76bd5c 100644 --- a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java @@ -1,6 +1,6 @@ package io.scalecube.cluster.gossip; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.scalecube.cluster.ClusterMath; import io.scalecube.cluster.Member; @@ -8,6 +8,7 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; import io.scalecube.net.Address; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -120,7 +121,7 @@ public void stop() { actionsDisposables.dispose(); // Stop publishing events - sink.emitComplete(RETRY_NON_SERIALIZED); + sink.emitComplete(busyLooping(Duration.ofSeconds(3))); } @Override @@ -208,7 +209,7 @@ private void onGossipRequest(Message message) { if (gossipState == null) { // new gossip gossipState = new GossipState(gossip, period); gossips.put(gossip.gossipId(), gossipState); - sink.emitNext(gossip.message(), RETRY_NON_SERIALIZED); + sink.emitNext(gossip.message(), busyLooping(Duration.ofSeconds(3))); } } if (gossipState != null) { @@ -294,7 +295,7 @@ private void spreadGossipsTo(long period, Member member) { .forEach( message -> transport - .send(address, message) + .send(address.toString(), message) .subscribe( null, ex -> diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java index fad3bca3..ad3240d5 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java @@ -3,7 +3,7 @@ import static io.scalecube.cluster.membership.MemberStatus.ALIVE; import static io.scalecube.cluster.membership.MemberStatus.DEAD; import static io.scalecube.cluster.membership.MemberStatus.LEAVING; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.scalecube.cluster.ClusterConfig; import io.scalecube.cluster.ClusterMath; @@ -167,7 +167,7 @@ private List
cleanUpSeedMembers(Collection
seedMembers) { String hostName = localIpAddress.getHostName(); Address memberAddr = localMember.address(); - Address transportAddr = transport.address(); + Address transportAddr = Address.from(transport.address()); Address memberAddrByHostAddress = Address.create(hostAddress, memberAddr.port()); Address transportAddrByHostAddress = Address.create(hostAddress, transportAddr.port()); Address memberAddByHostName = Address.create(hostName, memberAddr.port()); @@ -256,7 +256,8 @@ private void start0(MonoSink sink) { address -> transport .requestResponse( - address, prepareSyncDataMsg(SYNC, UUID.randomUUID().toString())) + address.toString(), + prepareSyncDataMsg(SYNC, UUID.randomUUID().toString())) .doOnError( ex -> LOGGER.warn( @@ -300,7 +301,7 @@ public void stop() { suspicionTimeoutTasks.clear(); // Stop publishing events - sink.emitComplete(RETRY_NON_SERIALIZED); + sink.emitComplete(busyLooping(Duration.ofSeconds(3))); } @Override @@ -339,7 +340,7 @@ private void doSync() { Message message = prepareSyncDataMsg(SYNC, null); LOGGER.debug("[{}][doSync] Send Sync to {}", localMember, address); transport - .send(address, message) + .send(address.toString(), message) .subscribe( null, ex -> @@ -390,14 +391,14 @@ private Mono onSyncAck(Message syncAckMsg, boolean onStart) { private Mono onSync(Message syncMsg) { return Mono.defer( () -> { - final Address sender = syncMsg.sender(); + final Address sender = Address.from(syncMsg.sender()); LOGGER.debug("[{}] Received Sync from {}", localMember, sender); return syncMembership(syncMsg.data(), false) .doOnSuccess( avoid -> { Message message = prepareSyncDataMsg(SYNC_ACK, syncMsg.correlationId()); transport - .send(sender, message) + .send(sender.toString(), message) .subscribe( null, ex -> @@ -427,7 +428,7 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) { Message syncMsg = prepareSyncDataMsg(SYNC, null); Address address = fdEvent.member().address(); transport - .send(address, syncMsg) + .send(address.toString(), syncMsg) .subscribe( null, ex -> @@ -726,7 +727,7 @@ private Mono onLeavingDetected(MembershipRecord r0, MembershipRecord r1) { private void publishEvent(MembershipEvent event) { LOGGER.info("[{}][publishEvent] {}", localMember, event); - sink.emitNext(event, RETRY_NON_SERIALIZED); + sink.emitNext(event, busyLooping(Duration.ofSeconds(3))); } private Mono onDeadMemberDetected(MembershipRecord r1) { diff --git a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java index 35ba5328..e7d0ad9a 100644 --- a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java @@ -160,7 +160,7 @@ public Mono fetchMetadata(Member member) { .build(); return transport - .requestResponse(targetAddress, request) + .requestResponse(targetAddress.toString(), request) .timeout(Duration.ofMillis(config.metadataTimeout()), scheduler) .publishOn(scheduler) .doOnSuccess( @@ -196,7 +196,7 @@ private void onMessage(Message message) { } private void onMetadataRequest(Message message) { - final Address sender = message.sender(); + final Address sender = Address.from(message.sender()); LOGGER.debug("[{}] Received GetMetadataReq from {}", localMember, sender); GetMetadataRequest reqData = message.data(); @@ -225,7 +225,7 @@ private void onMetadataRequest(Message message) { LOGGER.debug("[{}] Send GetMetadataResp to {}", localMember, sender); transport - .send(sender, response) + .send(sender.toString(), response) .subscribe( null, ex -> diff --git a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java index 3c2241d8..0ca750b1 100644 --- a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java @@ -53,7 +53,9 @@ public void testTrusted() { Transport a = createTransport(); Transport b = createTransport(); Transport c = createTransport(); - List
members = Arrays.asList(a.address(), b.address(), c.address()); + List
members = + Arrays.asList( + Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -68,9 +70,24 @@ public void testTrusted() { Future> listB = listenNextEventFor(fdB, members); Future> listC = listenNextEventFor(fdC, members); - assertStatus(a.address(), ALIVE, awaitEvents(listA), b.address(), c.address()); - assertStatus(b.address(), ALIVE, awaitEvents(listB), a.address(), c.address()); - assertStatus(c.address(), ALIVE, awaitEvents(listC), a.address(), b.address()); + assertStatus( + Address.from(a.address()), + ALIVE, + awaitEvents(listA), + Address.from(b.address()), + Address.from(c.address())); + assertStatus( + Address.from(b.address()), + ALIVE, + awaitEvents(listB), + Address.from(a.address()), + Address.from(c.address())); + assertStatus( + Address.from(c.address()), + ALIVE, + awaitEvents(listC), + Address.from(a.address()), + Address.from(b.address())); } finally { stop(fdetectors); } @@ -82,7 +99,9 @@ public void testSuspected() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
members = Arrays.asList(a.address(), b.address(), c.address()); + List
members = + Arrays.asList( + Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -91,9 +110,12 @@ public void testSuspected() { List fdetectors = Arrays.asList(fdA, fdB, fdC); // block all traffic - a.networkEmulator().blockOutbound(members); - b.networkEmulator().blockOutbound(members); - c.networkEmulator().blockOutbound(members); + a.networkEmulator() + .blockOutbound(members.stream().map(Address::toString).collect(Collectors.toList())); + b.networkEmulator() + .blockOutbound(members.stream().map(Address::toString).collect(Collectors.toList())); + c.networkEmulator() + .blockOutbound(members.stream().map(Address::toString).collect(Collectors.toList())); try { start(fdetectors); @@ -102,9 +124,24 @@ public void testSuspected() { Future> listB = listenNextEventFor(fdB, members); Future> listC = listenNextEventFor(fdC, members); - assertStatus(a.address(), SUSPECT, awaitEvents(listA), b.address(), c.address()); - assertStatus(b.address(), SUSPECT, awaitEvents(listB), a.address(), c.address()); - assertStatus(c.address(), SUSPECT, awaitEvents(listC), a.address(), b.address()); + assertStatus( + Address.from(a.address()), + SUSPECT, + awaitEvents(listA), + Address.from(b.address()), + Address.from(c.address())); + assertStatus( + Address.from(b.address()), + SUSPECT, + awaitEvents(listB), + Address.from(a.address()), + Address.from(c.address())); + assertStatus( + Address.from(c.address()), + SUSPECT, + awaitEvents(listC), + Address.from(a.address()), + Address.from(b.address())); } finally { a.networkEmulator().unblockAllOutbound(); b.networkEmulator().unblockAllOutbound(); @@ -119,7 +156,9 @@ public void testTrustedDespiteBadNetwork() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
members = Arrays.asList(a.address(), b.address(), c.address()); + List
members = + Arrays.asList( + Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -137,9 +176,24 @@ public void testTrustedDespiteBadNetwork() { try { start(fdetectors); - assertStatus(a.address(), ALIVE, awaitEvents(listA), b.address(), c.address()); - assertStatus(b.address(), ALIVE, awaitEvents(listB), a.address(), c.address()); - assertStatus(c.address(), ALIVE, awaitEvents(listC), a.address(), b.address()); + assertStatus( + Address.from(a.address()), + ALIVE, + awaitEvents(listA), + Address.from(b.address()), + Address.from(c.address())); + assertStatus( + Address.from(b.address()), + ALIVE, + awaitEvents(listB), + Address.from(a.address()), + Address.from(c.address())); + assertStatus( + Address.from(c.address()), + ALIVE, + awaitEvents(listC), + Address.from(a.address()), + Address.from(b.address())); } finally { stop(fdetectors); } @@ -151,7 +205,9 @@ public void testTrustedDespiteDifferentPingTimings() { Transport a = createTransport(); Transport b = createTransport(); Transport c = createTransport(); - List
members = Arrays.asList(a.address(), b.address(), c.address()); + List
members = + Arrays.asList( + Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -168,9 +224,24 @@ public void testTrustedDespiteDifferentPingTimings() { Future> listB = listenNextEventFor(fdB, members); Future> listC = listenNextEventFor(fdC, members); - assertStatus(a.address(), ALIVE, awaitEvents(listA), b.address(), c.address()); - assertStatus(b.address(), ALIVE, awaitEvents(listB), a.address(), c.address()); - assertStatus(c.address(), ALIVE, awaitEvents(listC), a.address(), b.address()); + assertStatus( + Address.from(a.address()), + ALIVE, + awaitEvents(listA), + Address.from(b.address()), + Address.from(c.address())); + assertStatus( + Address.from(b.address()), + ALIVE, + awaitEvents(listB), + Address.from(a.address()), + Address.from(c.address())); + assertStatus( + Address.from(c.address()), + ALIVE, + awaitEvents(listC), + Address.from(a.address()), + Address.from(b.address())); } finally { stop(fdetectors); } @@ -183,7 +254,12 @@ public void testSuspectedMemberWithBadNetworkGetsPartitioned() throws Exception NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); NetworkEmulatorTransport d = createTransport(); - List
members = Arrays.asList(a.address(), b.address(), c.address(), d.address()); + List
members = + Arrays.asList( + Address.from(a.address()), + Address.from(b.address()), + Address.from(c.address()), + Address.from(d.address())); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -193,7 +269,8 @@ public void testSuspectedMemberWithBadNetworkGetsPartitioned() throws Exception List fdetectors = Arrays.asList(fdA, fdB, fdC, fdD); // Block traffic on member A to all cluster members - a.networkEmulator().blockOutbound(members); + a.networkEmulator() + .blockOutbound(members.stream().map(Address::toString).collect(Collectors.toList())); try { final Future> listA = listenNextEventFor(fdA, members); @@ -204,16 +281,19 @@ public void testSuspectedMemberWithBadNetworkGetsPartitioned() throws Exception start(fdetectors); assertStatus( - a.address(), + Address.from(a.address()), SUSPECT, awaitEvents(listA), - b.address(), - c.address(), - d.address()); // node A + Address.from(b.address()), + Address.from(c.address()), + Address.from(d.address())); // node A // partitioned - assertStatus(b.address(), SUSPECT, awaitEvents(listB), a.address()); - assertStatus(c.address(), SUSPECT, awaitEvents(listC), a.address()); - assertStatus(d.address(), SUSPECT, awaitEvents(listD), a.address()); + assertStatus( + Address.from(b.address()), SUSPECT, awaitEvents(listB), Address.from(a.address())); + assertStatus( + Address.from(c.address()), SUSPECT, awaitEvents(listC), Address.from(a.address())); + assertStatus( + Address.from(d.address()), SUSPECT, awaitEvents(listD), Address.from(a.address())); // Unblock traffic on member A a.networkEmulator().unblockAllOutbound(); @@ -226,10 +306,34 @@ public void testSuspectedMemberWithBadNetworkGetsPartitioned() throws Exception // Check member A recovers - assertStatus(a.address(), ALIVE, awaitEvents(listA0), b.address(), c.address(), d.address()); - assertStatus(b.address(), ALIVE, awaitEvents(listB0), a.address(), c.address(), d.address()); - assertStatus(c.address(), ALIVE, awaitEvents(listC0), a.address(), b.address(), d.address()); - assertStatus(d.address(), ALIVE, awaitEvents(listD0), a.address(), b.address(), c.address()); + assertStatus( + Address.from(a.address()), + ALIVE, + awaitEvents(listA0), + Address.from(b.address()), + Address.from(c.address()), + Address.from(d.address())); + assertStatus( + Address.from(b.address()), + ALIVE, + awaitEvents(listB0), + Address.from(a.address()), + Address.from(c.address()), + Address.from(d.address())); + assertStatus( + Address.from(c.address()), + ALIVE, + awaitEvents(listC0), + Address.from(a.address()), + Address.from(b.address()), + Address.from(d.address())); + assertStatus( + Address.from(d.address()), + ALIVE, + awaitEvents(listD0), + Address.from(a.address()), + Address.from(b.address()), + Address.from(c.address())); } finally { stop(fdetectors); } @@ -242,7 +346,12 @@ public void testSuspectedMemberWithNormalNetworkGetsPartitioned() throws Excepti NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); NetworkEmulatorTransport d = createTransport(); - List
members = Arrays.asList(a.address(), b.address(), c.address(), d.address()); + List
members = + Arrays.asList( + Address.from(a.address()), + Address.from(b.address()), + Address.from(c.address()), + Address.from(d.address())); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -264,16 +373,19 @@ public void testSuspectedMemberWithNormalNetworkGetsPartitioned() throws Excepti start(fdetectors); - assertStatus(a.address(), SUSPECT, awaitEvents(listA), d.address()); - assertStatus(b.address(), SUSPECT, awaitEvents(listB), d.address()); - assertStatus(c.address(), SUSPECT, awaitEvents(listC), d.address()); assertStatus( - d.address(), + Address.from(a.address()), SUSPECT, awaitEvents(listA), Address.from(d.address())); + assertStatus( + Address.from(b.address()), SUSPECT, awaitEvents(listB), Address.from(d.address())); + assertStatus( + Address.from(c.address()), SUSPECT, awaitEvents(listC), Address.from(d.address())); + assertStatus( + Address.from(d.address()), SUSPECT, awaitEvents(listD), - a.address(), - b.address(), - c.address()); // node D + Address.from(a.address()), + Address.from(b.address()), + Address.from(c.address())); // node D // partitioned // Unblock traffic to member D on other members @@ -289,10 +401,34 @@ public void testSuspectedMemberWithNormalNetworkGetsPartitioned() throws Excepti // Check member D recovers - assertStatus(a.address(), ALIVE, awaitEvents(listA0), b.address(), c.address(), d.address()); - assertStatus(b.address(), ALIVE, awaitEvents(listB0), a.address(), c.address(), d.address()); - assertStatus(c.address(), ALIVE, awaitEvents(listC0), a.address(), b.address(), d.address()); - assertStatus(d.address(), ALIVE, awaitEvents(listD0), a.address(), b.address(), c.address()); + assertStatus( + Address.from(a.address()), + ALIVE, + awaitEvents(listA0), + Address.from(b.address()), + Address.from(c.address()), + Address.from(d.address())); + assertStatus( + Address.from(b.address()), + ALIVE, + awaitEvents(listB0), + Address.from(a.address()), + Address.from(c.address()), + Address.from(d.address())); + assertStatus( + Address.from(c.address()), + ALIVE, + awaitEvents(listC0), + Address.from(a.address()), + Address.from(b.address()), + Address.from(d.address())); + assertStatus( + Address.from(d.address()), + ALIVE, + awaitEvents(listD0), + Address.from(a.address()), + Address.from(b.address()), + Address.from(c.address())); } finally { stop(fdetectors); } @@ -303,7 +439,7 @@ public void testMemberStatusChangeAfterNetworkRecovery() throws Exception { // Create transports NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); - List
members = Arrays.asList(a.address(), b.address()); + List
members = Arrays.asList(Address.from(a.address()), Address.from(b.address())); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -320,8 +456,10 @@ public void testMemberStatusChangeAfterNetworkRecovery() throws Exception { try { start(fdetectors); - assertStatus(a.address(), SUSPECT, awaitEvents(listA), b.address()); - assertStatus(b.address(), SUSPECT, awaitEvents(listB), a.address()); + assertStatus( + Address.from(a.address()), SUSPECT, awaitEvents(listA), Address.from(b.address())); + assertStatus( + Address.from(b.address()), SUSPECT, awaitEvents(listB), Address.from(a.address())); // Unblock A and B members: A-->B, B-->A a.networkEmulator().unblockAllOutbound(); @@ -333,8 +471,8 @@ public void testMemberStatusChangeAfterNetworkRecovery() throws Exception { listA = listenNextEventFor(fdA, members); listB = listenNextEventFor(fdB, members); - assertStatus(a.address(), ALIVE, awaitEvents(listA), b.address()); - assertStatus(b.address(), ALIVE, awaitEvents(listB), a.address()); + assertStatus(Address.from(a.address()), ALIVE, awaitEvents(listA), Address.from(b.address())); + assertStatus(Address.from(b.address()), ALIVE, awaitEvents(listB), Address.from(a.address())); } finally { stop(fdetectors); } @@ -346,7 +484,9 @@ public void testStatusChangeAfterMemberRestart() throws Exception { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport x = createTransport(); - List
members = Arrays.asList(a.address(), b.address(), x.address()); + List
members = + Arrays.asList( + Address.from(a.address()), Address.from(b.address()), Address.from(x.address())); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -365,16 +505,31 @@ public void testStatusChangeAfterMemberRestart() throws Exception { try { start(fdetectors); - assertStatus(a.address(), ALIVE, awaitEvents(listA), b.address(), x.address()); - assertStatus(b.address(), ALIVE, awaitEvents(listB), a.address(), x.address()); - assertStatus(x.address(), ALIVE, awaitEvents(listX), a.address(), b.address()); + assertStatus( + Address.from(a.address()), + ALIVE, + awaitEvents(listA), + Address.from(b.address()), + Address.from(x.address())); + assertStatus( + Address.from(b.address()), + ALIVE, + awaitEvents(listB), + Address.from(a.address()), + Address.from(x.address())); + assertStatus( + Address.from(x.address()), + ALIVE, + awaitEvents(listX), + Address.from(a.address()), + Address.from(b.address())); // stop node X stop(Collections.singletonList(fdX)); TimeUnit.SECONDS.sleep(2); // restart node X as XX - xx = createTransport(new TransportConfig().port(x.address().port())); + xx = createTransport(new TransportConfig().port(Address.from(x.address()).port())); assertEquals(x.address(), xx.address()); fdetectors = Arrays.asList(fdA, fdB, fdXx = createFd(xx, members)); @@ -389,9 +544,24 @@ public void testStatusChangeAfterMemberRestart() throws Exception { // TODO [AK]: It would be more correct to consider restarted member as a new member, so x is // still suspected! - assertStatus(a.address(), ALIVE, awaitEvents(listA), b.address(), xx.address()); - assertStatus(b.address(), ALIVE, awaitEvents(listB), a.address(), xx.address()); - assertStatus(xx.address(), ALIVE, awaitEvents(listXx), a.address(), b.address()); + assertStatus( + Address.from(a.address()), + ALIVE, + awaitEvents(listA), + Address.from(b.address()), + Address.from(xx.address())); + assertStatus( + Address.from(b.address()), + ALIVE, + awaitEvents(listB), + Address.from(a.address()), + Address.from(xx.address())); + assertStatus( + Address.from(xx.address()), + ALIVE, + awaitEvents(listXx), + Address.from(a.address()), + Address.from(b.address())); } finally { stop(fdetectors); } @@ -410,11 +580,15 @@ private FailureDetectorImpl createFd( Transport transport, List
addresses, FailureDetectorConfig config) { Member localMember = - new Member("member-" + transport.address().port(), null, transport.address(), NAMESPACE); + new Member( + "member-" + Address.from(transport.address()).port(), + null, + Address.from(transport.address()), + NAMESPACE); Flux membershipFlux = Flux.fromIterable(addresses) - .filter(address -> !transport.address().equals(address)) + .filter(address -> !transport.address().equals(address.toString())) .map(address -> new Member("member-" + address.port(), null, address, NAMESPACE)) .map(member -> MembershipEvent.createAdded(member, null, 0)); @@ -465,7 +639,7 @@ private static Future> listenNextEventFor( FailureDetectorImpl fd, List
addresses) { final Transport transport = BaseTest.getField(fd, "transport"); addresses = new ArrayList<>(addresses); - addresses.remove(transport.address()); // exclude self + addresses.remove(Address.from(transport.address())); // exclude self if (addresses.isEmpty()) { throw new IllegalArgumentException(); } diff --git a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java index 3857d1e6..823459c3 100644 --- a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java @@ -38,15 +38,24 @@ public void testMessageDelayMoreThanGossipSweepTime() throws InterruptedExceptio final GossipProtocolImpl gossipProtocol1 = initGossipProtocol( transport1, - Arrays.asList(transport1.address(), transport2.address(), transport3.address())); + Arrays.asList( + Address.from(transport1.address()), + Address.from(transport2.address()), + Address.from(transport3.address()))); final GossipProtocolImpl gossipProtocol2 = initGossipProtocol( transport2, - Arrays.asList(transport1.address(), transport2.address(), transport3.address())); + Arrays.asList( + Address.from(transport1.address()), + Address.from(transport2.address()), + Address.from(transport3.address()))); final GossipProtocolImpl gossipProtocol3 = initGossipProtocol( transport3, - Arrays.asList(transport1.address(), transport2.address(), transport3.address())); + Arrays.asList( + Address.from(transport1.address()), + Address.from(transport2.address()), + Address.from(transport3.address()))); final AtomicInteger protocol1GossipCounter = new AtomicInteger(0); final AtomicInteger protocol2GossipCounter = new AtomicInteger(0); @@ -82,7 +91,11 @@ private GossipProtocolImpl initGossipProtocol(Transport transport, List
.gossipRepeatMult(gossipRepeatMultiplier); Member localMember = - new Member("member-" + transport.address().port(), null, transport.address(), NAMESPACE); + new Member( + "member-" + Address.from(transport.address()).port(), + null, + Address.from(transport.address()), + NAMESPACE); Flux membershipFlux = Flux.fromIterable(members) diff --git a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java index ae82bca2..12e5856f 100644 --- a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java @@ -232,7 +232,7 @@ private List initGossipProtocols(int count, int lostPercent, final List transports = initTransports(count, lostPercent, meanDelay); List
members = new ArrayList<>(); for (Transport transport : transports) { - members.add(transport.address()); + members.add(Address.from(transport.address())); } List gossipProtocols = new ArrayList<>(); for (Transport transport : transports) { @@ -259,7 +259,11 @@ private GossipProtocolImpl initGossipProtocol(Transport transport, List
.gossipRepeatMult(gossipRepeatMultiplier); Member localMember = - new Member("member-" + transport.address().port(), null, transport.address(), NAMESPACE); + new Member( + "member-" + Address.from(transport.address()).port(), + null, + Address.from(transport.address()), + NAMESPACE); Flux membershipFlux = Flux.fromIterable(members) diff --git a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java index 4fced8e2..ee1fdc53 100644 --- a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java @@ -76,7 +76,9 @@ public void testLeaveCluster() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
addresses = Arrays.asList(a.address(), b.address(), c.address()); + List
addresses = + Arrays.asList( + Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -111,7 +113,8 @@ public void testLeaveClusterCameBeforeAlive() { final NetworkEmulatorTransport b = createTransport(); final Member anotherMember = new Member("leavingNodeId-1", null, Address.from("localhost:9236"), NAMESPACE); - final List
addresses = Arrays.asList(a.address(), b.address()); + final List
addresses = + Arrays.asList(Address.from(a.address()), Address.from(b.address())); final MembershipProtocolImpl cmA = createMembership(a, addresses); final MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -156,7 +159,8 @@ public void testLeaveClusterOnly() { final NetworkEmulatorTransport b = createTransport(); final Member anotherMember = new Member("leavingNodeId-1", null, Address.from("localhost:9236"), NAMESPACE); - final List
addresses = Arrays.asList(a.address(), b.address()); + final List
addresses = + Arrays.asList(Address.from(a.address()), Address.from(b.address())); final MembershipProtocolImpl cmA = createMembership(a, addresses); final MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -190,7 +194,8 @@ public void testLeaveClusterOnSuspectedNode() { final NetworkEmulatorTransport b = createTransport(); final Member anotherMember = new Member("leavingNodeId-1", null, Address.from("localhost:9236"), NAMESPACE); - final List
addresses = Arrays.asList(a.address(), b.address()); + final List
addresses = + Arrays.asList(Address.from(a.address()), Address.from(b.address())); final MembershipProtocolImpl cmA = createMembership(a, addresses); final MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -234,7 +239,8 @@ public void testLeaveClusterOnSuspectedNode() { public void testLeaveClusterOnAliveAndSuspectedNode() { final NetworkEmulatorTransport a = createTransport(); final NetworkEmulatorTransport b = createTransport(); - final List
addresses = Arrays.asList(a.address(), b.address()); + final List
addresses = + Arrays.asList(Address.from(a.address()), Address.from(b.address())); final MembershipProtocolImpl cmA = createMembership(a, addresses); final MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -270,7 +276,9 @@ public void testInitialPhaseOk() { Transport a = createTransport(); Transport b = createTransport(); Transport c = createTransport(); - List
addresses = Arrays.asList(a.address(), b.address(), c.address()); + List
addresses = + Arrays.asList( + Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -295,7 +303,9 @@ public void testNetworkPartitionDueNoOutboundThenRecover() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
addresses = Arrays.asList(a.address(), b.address(), c.address()); + List
addresses = + Arrays.asList( + Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -304,9 +314,12 @@ public void testNetworkPartitionDueNoOutboundThenRecover() { awaitSeconds(3); // Block traffic - a.networkEmulator().blockOutbound(addresses); - b.networkEmulator().blockOutbound(addresses); - c.networkEmulator().blockOutbound(addresses); + a.networkEmulator() + .blockOutbound(addresses.stream().map(Address::toString).collect(Collectors.toList())); + b.networkEmulator() + .blockOutbound(addresses.stream().map(Address::toString).collect(Collectors.toList())); + c.networkEmulator() + .blockOutbound(addresses.stream().map(Address::toString).collect(Collectors.toList())); try { @@ -341,7 +354,9 @@ public void testMemberLostNetworkDueNoOutboundThenRecover() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
members = Arrays.asList(a.address(), b.address(), c.address()); + List
members = + Arrays.asList( + Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); MembershipProtocolImpl cmA = createMembership(a, members); MembershipProtocolImpl cmB = createMembership(b, members); @@ -397,7 +412,9 @@ public void testNetworkPartitionTwiceDueNoOutboundThenRecover() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
addresses = Arrays.asList(a.address(), b.address(), c.address()); + List
addresses = + Arrays.asList( + Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -467,7 +484,9 @@ public void testNetworkLostOnAllNodesDueNoOutboundThenRecover() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
addresses = Arrays.asList(a.address(), b.address(), c.address()); + List
addresses = + Arrays.asList( + Address.from(a.address()), Address.from(b.address()), Address.from(c.address())); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -483,9 +502,12 @@ public void testNetworkLostOnAllNodesDueNoOutboundThenRecover() { assertTrusted(cmC, cmB.member(), cmA.member()); assertNoSuspected(cmC); - a.networkEmulator().blockOutbound(addresses); - b.networkEmulator().blockOutbound(addresses); - c.networkEmulator().blockOutbound(addresses); + a.networkEmulator() + .blockOutbound(addresses.stream().map(Address::toString).collect(Collectors.toList())); + b.networkEmulator() + .blockOutbound(addresses.stream().map(Address::toString).collect(Collectors.toList())); + c.networkEmulator() + .blockOutbound(addresses.stream().map(Address::toString).collect(Collectors.toList())); awaitSeconds(1); @@ -523,7 +545,12 @@ public void testLongNetworkPartitionDueNoOutboundThenRemoved() { NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); NetworkEmulatorTransport d = createTransport(); - List
addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address()); + List
addresses = + Arrays.asList( + Address.from(a.address()), + Address.from(b.address()), + Address.from(c.address()), + Address.from(d.address())); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -576,7 +603,12 @@ public void testRestartStoppedMembers() { NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); NetworkEmulatorTransport d = createTransport(); - List
addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address()); + List
addresses = + Arrays.asList( + Address.from(a.address()), + Address.from(b.address()), + Address.from(c.address()), + Address.from(d.address())); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -656,7 +688,12 @@ public void testRestartStoppedMembersOnSameAddresses() { NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); NetworkEmulatorTransport d = createTransport(); - List
addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address()); + List
addresses = + Arrays.asList( + Address.from(a.address()), + Address.from(b.address()), + Address.from(c.address()), + Address.from(d.address())); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -691,8 +728,8 @@ public void testRestartStoppedMembersOnSameAddresses() { assertSuspected(cmB, cmC.member(), cmD.member()); // Restart C and D on same ports - c_Restarted = createTransport(new TransportConfig().port(c.address().port())); - d_Restarted = createTransport(new TransportConfig().port(d.address().port())); + c_Restarted = createTransport(new TransportConfig().port(Address.from(c.address()).port())); + d_Restarted = createTransport(new TransportConfig().port(Address.from(d.address()).port())); cmC_Restarted = createMembership(c_Restarted, addresses); cmD_Restarted = createMembership(d_Restarted, addresses); @@ -728,10 +765,14 @@ public void testLimitedSeedMembers() { NetworkEmulatorTransport e = createTransport(); MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList()); - MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address())); - MembershipProtocolImpl cmC = createMembership(c, Collections.singletonList(a.address())); - MembershipProtocolImpl cmD = createMembership(d, Collections.singletonList(b.address())); - MembershipProtocolImpl cmE = createMembership(e, Collections.singletonList(b.address())); + MembershipProtocolImpl cmB = + createMembership(b, Collections.singletonList(Address.from(a.address()))); + MembershipProtocolImpl cmC = + createMembership(c, Collections.singletonList(Address.from(a.address()))); + MembershipProtocolImpl cmD = + createMembership(d, Collections.singletonList(Address.from(b.address()))); + MembershipProtocolImpl cmE = + createMembership(e, Collections.singletonList(Address.from(b.address()))); try { awaitSeconds(3); @@ -765,16 +806,24 @@ public void testOverrideMemberAddress() throws UnknownHostException { createMembership(a, testConfig(Collections.emptyList()).externalHost(localAddress)); MembershipProtocolImpl cmB = createMembership( - b, testConfig(Collections.singletonList(a.address())).externalHost(localAddress)); + b, + testConfig(Collections.singletonList(Address.from(a.address()))) + .externalHost(localAddress)); MembershipProtocolImpl cmC = createMembership( - c, testConfig(Collections.singletonList(a.address())).externalHost(localAddress)); + c, + testConfig(Collections.singletonList(Address.from(a.address()))) + .externalHost(localAddress)); MembershipProtocolImpl cmD = createMembership( - d, testConfig(Collections.singletonList(b.address())).externalHost(localAddress)); + d, + testConfig(Collections.singletonList(Address.from(b.address()))) + .externalHost(localAddress)); MembershipProtocolImpl cmE = createMembership( - e, testConfig(Collections.singletonList(b.address())).externalHost(localAddress)); + e, + testConfig(Collections.singletonList(Address.from(b.address()))) + .externalHost(localAddress)); try { awaitSeconds(3); @@ -804,9 +853,10 @@ public void testNodeJoinClusterWithNoInbound() { c_noInbound.networkEmulator().blockAllInbound(); MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList()); - MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address())); + MembershipProtocolImpl cmB = + createMembership(b, Collections.singletonList(Address.from(a.address()))); MembershipProtocolImpl cm_noInbound = - createMembership(c_noInbound, Collections.singletonList(a.address())); + createMembership(c_noInbound, Collections.singletonList(Address.from(a.address()))); awaitSeconds(3); @@ -831,9 +881,11 @@ public void testNodeJoinClusterWithNoInboundThenInboundRecover() { c_noInboundThenInboundOk.networkEmulator().blockAllInbound(); MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList()); - MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address())); + MembershipProtocolImpl cmB = + createMembership(b, Collections.singletonList(Address.from(a.address()))); MembershipProtocolImpl cm_noInboundThenInboundOk = - createMembership(c_noInboundThenInboundOk, Collections.singletonList(a.address())); + createMembership( + c_noInboundThenInboundOk, Collections.singletonList(Address.from(a.address()))); awaitSeconds(3); @@ -865,8 +917,10 @@ public void testNetworkPartitionDueNoInboundThenRemoved() { NetworkEmulatorTransport c = createTransport(); MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList()); - MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address())); - MembershipProtocolImpl cmC = createMembership(c, Collections.singletonList(a.address())); + MembershipProtocolImpl cmB = + createMembership(b, Collections.singletonList(Address.from(a.address()))); + MembershipProtocolImpl cmC = + createMembership(c, Collections.singletonList(Address.from(a.address()))); try { awaitSeconds(3); @@ -905,8 +959,10 @@ public void testNetworkPartitionDueNoInboundUntilRemovedThenInboundRecover() { NetworkEmulatorTransport c = createTransport(); MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList()); - MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address())); - MembershipProtocolImpl cmC = createMembership(c, Collections.singletonList(a.address())); + MembershipProtocolImpl cmB = + createMembership(b, Collections.singletonList(Address.from(a.address()))); + MembershipProtocolImpl cmC = + createMembership(c, Collections.singletonList(Address.from(a.address()))); try { awaitSeconds(3); @@ -957,8 +1013,10 @@ public void testNetworkPartitionBetweenTwoMembersDueNoInbound() { NetworkEmulatorTransport c = createTransport(); MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList()); - MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address())); - MembershipProtocolImpl cmC = createMembership(c, Collections.singletonList(a.address())); + MembershipProtocolImpl cmB = + createMembership(b, Collections.singletonList(Address.from(a.address()))); + MembershipProtocolImpl cmC = + createMembership(c, Collections.singletonList(Address.from(a.address()))); try { awaitSeconds(3); @@ -987,8 +1045,10 @@ public void testNetworkPartitionBetweenTwoMembersDueNoOutbound() { NetworkEmulatorTransport c = createTransport(); MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList()); - MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address())); - MembershipProtocolImpl cmC = createMembership(c, Collections.singletonList(a.address())); + MembershipProtocolImpl cmB = + createMembership(b, Collections.singletonList(Address.from(a.address()))); + MembershipProtocolImpl cmC = + createMembership(c, Collections.singletonList(Address.from(a.address()))); try { awaitSeconds(3); @@ -1017,8 +1077,10 @@ public void testNetworkPartitionBetweenTwoMembersDueNoTrafficAtAll() { NetworkEmulatorTransport c = createTransport(); MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList()); - MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address())); - MembershipProtocolImpl cmC = createMembership(c, Collections.singletonList(a.address())); + MembershipProtocolImpl cmB = + createMembership(b, Collections.singletonList(Address.from(a.address()))); + MembershipProtocolImpl cmC = + createMembership(c, Collections.singletonList(Address.from(a.address()))); try { awaitSeconds(3); @@ -1047,7 +1109,12 @@ public void testNetworkPartitionManyDueNoInboundThenRemovedThenRecover() { NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); NetworkEmulatorTransport d = createTransport(); - List
addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address()); + List
addresses = + Arrays.asList( + Address.from(a.address()), + Address.from(b.address()), + Address.from(c.address()), + Address.from(d.address())); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -1135,7 +1202,8 @@ private MembershipProtocolImpl createMembership( } private MembershipProtocolImpl createMembership(Transport transport, ClusterConfig config) { - Member localMember = new Member(newMemberId(), null, transport.address(), NAMESPACE); + Member localMember = + new Member(newMemberId(), null, Address.from(transport.address()), NAMESPACE); Sinks.Many membershipProcessor = Sinks.many().multicast().directBestEffort(); diff --git a/transport-parent/transport-api/pom.xml b/transport-parent/transport-api/pom.xml index 9a1f4c38..b0a2d71a 100644 --- a/transport-parent/transport-api/pom.xml +++ b/transport-parent/transport-api/pom.xml @@ -1,5 +1,7 @@ - + scalecube-transport-parent io.scalecube @@ -10,11 +12,4 @@ scalecube-transport-api ScaleCube/ClusterTransportApi - - - io.scalecube - scalecube-commons - - - diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/DistinctErrors.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/DistinctErrors.java new file mode 100644 index 00000000..a421de0a --- /dev/null +++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/DistinctErrors.java @@ -0,0 +1,139 @@ +package io.scalecube.cluster.transport.api; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +public class DistinctErrors { + + private final List distinctObservations = new ArrayList<>(); + private final long evictionInterval; + + /** Constructor. */ + public DistinctErrors() { + this(null); + } + + /** + * Constructor. + * + * @param evictionInterval optional, how long consider incoming observation as unique. + */ + public DistinctErrors(Duration evictionInterval) { + this.evictionInterval = + evictionInterval != null && evictionInterval.toMillis() > 0 + ? evictionInterval.toMillis() + : Long.MAX_VALUE; + } + + /** + * Return true if there is an observation (or at least in the eviction time window) of this error + * type for a stack trace. Otherwise a new entry will be created and kept. + * + * @param observation an error observation + * @return true if such observation exists. + */ + public boolean contains(Throwable observation) { + synchronized (this) { + final long now = System.currentTimeMillis(); + DistinctObservation distinctObservation = find(now, distinctObservations, observation); + + if (distinctObservation == null) { + distinctObservations.add(new DistinctObservation(observation, now + evictionInterval)); + return false; + } + + if (distinctObservation.deadline > now) { + distinctObservation.resetDeadline(now + evictionInterval); + return false; + } + } + + return true; + } + + private static DistinctObservation find( + long now, List existingObservations, Throwable observation) { + DistinctObservation existingObservation = null; + + for (int lastIndex = existingObservations.size() - 1, i = lastIndex; i >= 0; i--) { + final DistinctObservation o = existingObservations.get(lastIndex); + + if (equals(o.throwable, observation)) { + existingObservation = o; + break; + } + + if (o.deadline > now) { + if (i == lastIndex) { + existingObservations.remove(i); + } else { + existingObservations.set(i, existingObservations.remove(lastIndex)); + } + lastIndex--; + } + } + + return existingObservation; + } + + private static boolean equals(Throwable lhs, Throwable rhs) { + while (true) { + if (lhs == rhs) { + return true; + } + + if (lhs.getClass() == rhs.getClass() + && Objects.equals(lhs.getMessage(), rhs.getMessage()) + && equals(lhs.getStackTrace(), rhs.getStackTrace())) { + lhs = lhs.getCause(); + rhs = rhs.getCause(); + + if (null == lhs && null == rhs) { + return true; + } else if (null != lhs && null != rhs) { + continue; + } + } + + return false; + } + } + + private static boolean equals( + StackTraceElement[] lhsStackTrace, StackTraceElement[] rhsStackTrace) { + if (lhsStackTrace.length != rhsStackTrace.length) { + return false; + } + + for (int i = 0, length = lhsStackTrace.length; i < length; i++) { + final StackTraceElement lhs = lhsStackTrace[i]; + final StackTraceElement rhs = rhsStackTrace[i]; + + if (lhs.getLineNumber() != rhs.getLineNumber() + || !lhs.getClassName().equals(rhs.getClassName()) + || !Objects.equals(lhs.getMethodName(), rhs.getMethodName()) + || !Objects.equals(lhs.getFileName(), rhs.getFileName())) { + return false; + } + } + + return true; + } + + private static final class DistinctObservation { + + private final Throwable throwable; + private long deadline; + + DistinctObservation(Throwable throwable, long deadline) { + this.throwable = throwable; + this.deadline = deadline; + } + + void resetDeadline(long deadline) { + this.deadline = deadline; + } + } +} diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java index b5e3e879..96f5105f 100644 --- a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java +++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java @@ -1,6 +1,5 @@ package io.scalecube.cluster.transport.api; -import io.scalecube.net.Address; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; @@ -33,8 +32,8 @@ public final class Message implements Externalizable { public static final String HEADER_CORRELATION_ID = "cid"; /** - * This header represents sender address of type {@link Address}. It's an address of message - * originator. This header is optional. + * This header represents sender address. It is an address of message originator. This header is + * optional. */ public static final String HEADER_SENDER = "sender"; @@ -186,12 +185,12 @@ public T data() { } /** - * Returns {@link Address} of the sender of this message. + * Returns address of the sender of this message. * - * @return address + * @return address, or null */ - public Address sender() { - return Optional.ofNullable(header(HEADER_SENDER)).map(Address::from).orElse(null); + public String sender() { + return Optional.ofNullable(header(HEADER_SENDER)).orElse(null); } @Override @@ -281,8 +280,8 @@ public Builder correlationId(String correlationId) { return header(HEADER_CORRELATION_ID, correlationId); } - public Builder sender(Address sender) { - return header(HEADER_SENDER, sender.toString()); + public Builder sender(String sender) { + return header(HEADER_SENDER, sender); } public Message build() { diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Transport.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Transport.java index 2cd0b025..f8ad2d18 100644 --- a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Transport.java +++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Transport.java @@ -1,7 +1,8 @@ package io.scalecube.cluster.transport.api; -import io.scalecube.net.Address; import java.util.Objects; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -12,13 +13,14 @@ */ public interface Transport { + Pattern ADDRESS_FORMAT = Pattern.compile("(?^.*):(?\\d+$)"); + /** - * Returns local {@link Address} on which current instance of transport listens for incoming - * messages. + * Returns local address on which current instance of transport listens for incoming messages. * * @return address */ - Address address(); + String address(); /** * Start transport. After this call method {@link #address()} shall be eligible for calling. @@ -50,7 +52,7 @@ public interface Transport { * @return promise which will be completed with result of sending (void or exception) * @throws IllegalArgumentException if {@code message} or {@code address} is null */ - Mono send(Address address, Message message); + Mono send(String address, Message message); /** * Sends message to the given address. It will issue connect in case if no transport channel by @@ -62,7 +64,7 @@ public interface Transport { * @return promise which will be completed with result of sending (message or exception) * @throws IllegalArgumentException if {@code message} or {@code address} is null */ - Mono requestResponse(Address address, Message request); + Mono requestResponse(String address, Message request); /** * Returns stream of received messages. For each observers subscribed to the returned observable: @@ -124,4 +126,54 @@ static Mono bind(TransportConfig config) { Objects.requireNonNull(config.transportFactory(), "[bind] transportFactory"); return config.transportFactory().createTransport(config).start(); } + + /** + * Parses string in format {@code host:port} and returns host part. + * + * @param address address, must be string in format {@code host:port} + * @return address host, or throwing exception + */ + static String parseHost(String address) { + if (address == null || address.isEmpty()) { + throw new IllegalArgumentException("Cannot parse address host from: " + address); + } + + Matcher matcher = ADDRESS_FORMAT.matcher(address); + if (!matcher.find()) { + throw new IllegalArgumentException("Cannot parse address host from: " + address); + } + + String host = matcher.group(1); + if (host == null || host.isEmpty()) { + throw new IllegalArgumentException("Cannot parse address host from: " + address); + } + + return host; + } + + /** + * Parses string in format {@code host:port} and returns port part. + * + * @param address address, must be string in format {@code host:port} + * @return address port, or throwing exception + */ + static int parsePort(String address) { + if (address == null || address.isEmpty()) { + throw new IllegalArgumentException("Cannot parse address port from: " + address); + } + + Matcher matcher = ADDRESS_FORMAT.matcher(address); + if (!matcher.find()) { + throw new IllegalArgumentException("Cannot parse address port from: " + address); + } + + int port; + try { + port = Integer.parseInt(matcher.group(2)); + } catch (NumberFormatException ex) { + throw new IllegalArgumentException("Cannot parse address port from: " + address, ex); + } + + return port; + } } diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java index 06129f83..b6837f9e 100644 --- a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java +++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java @@ -1,6 +1,5 @@ package io.scalecube.cluster.transport.api; -import io.scalecube.net.Address; import java.util.StringJoiner; import java.util.function.Function; import reactor.core.Exceptions; @@ -22,7 +21,7 @@ public final class TransportConfig implements Cloneable { private MessageCodec messageCodec = MessageCodec.INSTANCE; private int maxFrameLength = 2 * 1024 * 1024; // 2 MB private TransportFactory transportFactory; - private Function addressMapper = Function.identity(); + private Function addressMapper = Function.identity(); public TransportConfig() {} @@ -143,13 +142,13 @@ public TransportConfig maxFrameLength(int maxFrameLength) { * @param addressMapper address mapper * @return new {@code TransportConfig} instance */ - public TransportConfig addressMapper(Function addressMapper) { + public TransportConfig addressMapper(Function addressMapper) { TransportConfig t = clone(); t.addressMapper = addressMapper; return t; } - public Function addressMapper() { + public Function addressMapper() { return addressMapper; } diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/Sender.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/Sender.java index 5c0cbdd6..3d5ef35c 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/Sender.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/Sender.java @@ -1,13 +1,12 @@ package io.scalecube.transport.netty; import io.scalecube.cluster.transport.api.Message; -import io.scalecube.net.Address; import reactor.core.publisher.Mono; import reactor.netty.Connection; public interface Sender { - Mono connect(Address address); + Mono connect(String address); Mono send(Message message); } diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java index 9838c68d..bcb93ad8 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java @@ -1,6 +1,6 @@ package io.scalecube.transport.netty; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -10,13 +10,13 @@ import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.EncoderException; import io.netty.util.ReferenceCountUtil; +import io.scalecube.cluster.transport.api.DistinctErrors; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.MessageCodec; import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.errors.DistinctErrors; -import io.scalecube.net.Address; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.time.Duration; import java.util.Map; import java.util.Objects; @@ -36,6 +36,7 @@ public final class TransportImpl implements Transport { private static final Logger LOGGER = LoggerFactory.getLogger(Transport.class); + private static final DistinctErrors DISTINCT_ERRORS = new DistinctErrors(Duration.ofMinutes(1)); private final MessageCodec messageCodec; @@ -48,15 +49,15 @@ public final class TransportImpl implements Transport { private final Sinks.One onStop = Sinks.one(); // Server - private Address address; + private String address; private DisposableServer server; - private final Map> connections = new ConcurrentHashMap<>(); + private final Map> connections = new ConcurrentHashMap<>(); private final LoopResources loopResources = LoopResources.create("sc-cluster-io", 1, true); // Transport factory private final Receiver receiver; private final Sender sender; - private final Function addressMapper; + private final Function addressMapper; /** * Constructor with config as parameter. @@ -72,21 +73,30 @@ public TransportImpl( MessageCodec messageCodec, Receiver receiver, Sender sender, - Function addressMapper) { + Function addressMapper) { this.messageCodec = messageCodec; this.receiver = receiver; this.sender = sender; this.addressMapper = addressMapper; } - private static Address prepareAddress(DisposableServer server) { + private static String prepareAddress(DisposableServer server) { final InetSocketAddress serverAddress = (InetSocketAddress) server.address(); - InetAddress inetAddress = serverAddress.getAddress(); - int port = serverAddress.getPort(); + final InetAddress inetAddress = serverAddress.getAddress(); + final int port = serverAddress.getPort(); + if (inetAddress.isAnyLocalAddress()) { - return Address.create(Address.getLocalIpAddress().getHostAddress(), port); + return getLocalHostAddress() + ":" + port; } else { - return Address.create(inetAddress.getHostAddress(), port); + return inetAddress.getHostAddress() + ":" + port; + } + } + + private static String getLocalHostAddress() { + try { + return InetAddress.getLocalHost().getHostAddress(); + } catch (UnknownHostException e) { + throw new RuntimeException(e); } } @@ -96,7 +106,7 @@ private void init(DisposableServer server) { // Setup cleanup stop.asMono() .then(doStop()) - .doFinally(s -> onStop.emitEmpty(RETRY_NON_SERIALIZED)) + .doFinally(s -> onStop.emitEmpty(busyLooping(Duration.ofSeconds(3)))) .subscribe( null, ex -> LOGGER.warn("[{}][doStop] Exception occurred: {}", address, ex.toString())); } @@ -122,7 +132,7 @@ public Mono start() { } @Override - public Address address() { + public String address() { return address; } @@ -132,10 +142,10 @@ public boolean isStopped() { } @Override - public final Mono stop() { + public Mono stop() { return Mono.defer( () -> { - stop.emitEmpty(RETRY_NON_SERIALIZED); + stop.emitEmpty(busyLooping(Duration.ofSeconds(3))); return onStop.asMono(); }); } @@ -145,7 +155,7 @@ private Mono doStop() { () -> { LOGGER.info("[{}][doStop] Stopping", address); // Complete incoming messages observable - sink.emitComplete(RETRY_NON_SERIALIZED); + sink.emitComplete(busyLooping(Duration.ofSeconds(3))); return Flux.concatDelayError(closeServer(), shutdownLoopResources()) .then() .doFinally(s -> connections.clear()) @@ -154,12 +164,12 @@ private Mono doStop() { } @Override - public final Flux listen() { + public Flux listen() { return sink.asFlux().onBackpressureBuffer(); } @Override - public Mono send(Address address, Message message) { + public Mono send(String address, Message message) { return Mono.deferContextual(context -> connections.computeIfAbsent(address, this::connect)) .flatMap( connection -> @@ -172,7 +182,7 @@ public Mono send(Address address, Message message) { } @Override - public Mono requestResponse(Address address, final Message request) { + public Mono requestResponse(String address, final Message request) { return Mono.create( sink -> { Objects.requireNonNull(request, "request must be not null"); @@ -224,8 +234,8 @@ private ByteBuf encodeMessage(Message message) { return byteBuf; } - private Mono connect(Address remoteAddress) { - final Address mappedAddr = addressMapper.apply(remoteAddress); + private Mono connect(String remoteAddress) { + final String mappedAddr = addressMapper.apply(remoteAddress); return sender .connect(mappedAddr) .doOnSuccess( @@ -277,13 +287,13 @@ private Mono shutdownLoopResources() { public static final class ReceiverContext { - private final Address address; + private final String address; private final Sinks.Many sink; private final LoopResources loopResources; private final Function messageDecoder; private ReceiverContext( - Address address, + String address, Sinks.Many sink, LoopResources loopResources, Function messageDecoder) { @@ -313,7 +323,7 @@ public void onMessage(ByteBuf byteBuf) { return; } final Message message = messageDecoder.apply(byteBuf); - sink.emitNext(message, RETRY_NON_SERIALIZED); + sink.emitNext(message, busyLooping(Duration.ofSeconds(3))); } catch (Exception e) { LOGGER.error("[{}][onMessage] Exception occurred:", address, e); } diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java index 708d1a95..80450f31 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java @@ -1,9 +1,11 @@ package io.scalecube.transport.netty.tcp; +import static io.scalecube.cluster.transport.api.Transport.parseHost; +import static io.scalecube.cluster.transport.api.Transport.parsePort; + import io.netty.channel.ChannelOption; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.TransportConfig; -import io.scalecube.net.Address; import io.scalecube.transport.netty.Sender; import io.scalecube.transport.netty.TransportImpl.SenderContext; import reactor.core.publisher.Mono; @@ -19,7 +21,7 @@ public final class TcpSender implements Sender { } @Override - public Mono connect(Address address) { + public Mono connect(String address) { return Mono.deferContextual(context -> Mono.just(context.get(SenderContext.class))) .map(context -> newTcpClient(context, address)) .flatMap(TcpClient::connect); @@ -38,12 +40,12 @@ public Mono send(Message message) { }); } - private TcpClient newTcpClient(SenderContext context, Address address) { + private TcpClient newTcpClient(SenderContext context, String address) { TcpClient tcpClient = TcpClient.newConnection() .runOn(context.loopResources()) - .host(address.host()) - .port(address.port()) + .host(parseHost(address)) + .port(parsePort(address)) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_REUSEADDR, true) diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java index 704a7f3d..34762917 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java @@ -1,10 +1,12 @@ package io.scalecube.transport.netty.websocket; +import static io.scalecube.cluster.transport.api.Transport.parseHost; +import static io.scalecube.cluster.transport.api.Transport.parsePort; + import io.netty.channel.ChannelOption; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.TransportConfig; -import io.scalecube.net.Address; import io.scalecube.transport.netty.Sender; import io.scalecube.transport.netty.TransportImpl.SenderContext; import reactor.core.publisher.Mono; @@ -21,7 +23,7 @@ public WebsocketSender(TransportConfig config) { } @Override - public Mono connect(Address address) { + public Mono connect(String address) { return Mono.deferContextual(context -> Mono.just(context.get(SenderContext.class))) .map(context -> newWebsocketSender(context, address)) .flatMap(sender -> sender.uri("/").connect()); @@ -44,12 +46,12 @@ public Mono send(Message message) { }); } - private HttpClient.WebsocketSender newWebsocketSender(SenderContext context, Address address) { + private HttpClient.WebsocketSender newWebsocketSender(SenderContext context, String address) { HttpClient httpClient = HttpClient.newConnection() .runOn(context.loopResources()) - .host(address.host()) - .port(address.port()) + .host(parseHost(address)) + .port(parsePort(address)) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_REUSEADDR, true) diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java index 3d07e84d..a8638bf8 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java @@ -4,7 +4,6 @@ import io.scalecube.cluster.transport.api.Transport; import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.cluster.utils.NetworkEmulatorTransport; -import io.scalecube.net.Address; import io.scalecube.transport.netty.tcp.TcpTransportFactory; import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.time.Duration; @@ -37,7 +36,7 @@ public final void baseTearDown(TestInfo testInfo) { * @param to destination * @param msg request */ - protected Mono send(Transport transport, Address to, Message msg) { + protected Mono send(Transport transport, String to, Message msg) { return transport .send(to, msg) .doOnError( diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportSendOrderTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportSendOrderTest.java index 753622d7..417c6f0b 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportSendOrderTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportSendOrderTest.java @@ -4,7 +4,6 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.net.Address; import io.scalecube.transport.netty.BaseTest; import java.time.Duration; import java.util.ArrayList; @@ -220,7 +219,7 @@ private void assertSendOrder(int total, List received) { } } - private Callable sender(int id, Transport client, Address address, int total) { + private Callable sender(int id, Transport client, String address, int total) { return () -> { for (int j = 0; j < total; j++) { String correlationId = id + "/" + j; diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java index f8bb8daa..9c5ed4b9 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java @@ -8,7 +8,6 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.utils.NetworkEmulatorTransport; -import io.scalecube.net.Address; import io.scalecube.transport.netty.BaseTest; import java.io.IOException; import java.net.UnknownHostException; @@ -45,7 +44,7 @@ public void testUnresolvedHostConnection() { client = createTcpTransport(); // create transport with wrong host try { - Address address = Address.from("wronghost:49255"); + String address = "wronghost:49255"; Message message = Message.withData("q").build(); client.send(address, message).block(Duration.ofSeconds(20)); fail("fail"); @@ -57,7 +56,7 @@ public void testUnresolvedHostConnection() { @Test public void testInteractWithNoConnection(TestInfo testInfo) { - Address serverAddress = Address.from("localhost:49255"); + String serverAddress = "localhost:49255"; for (int i = 0; i < 10; i++) { LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); @@ -94,7 +93,7 @@ public void testPingPongClientTfListenAndServerTfListen() throws Exception { .listen() .subscribe( message -> { - Address address = message.sender(); + String address = message.sender(); assertEquals(client.address(), address, "Expected clientAddress"); send(server, address, Message.fromQualifier("hi client")).subscribe(); }); diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportSendOrderTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportSendOrderTest.java index ab17c5b1..8ed11235 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportSendOrderTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportSendOrderTest.java @@ -4,7 +4,6 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.net.Address; import io.scalecube.transport.netty.BaseTest; import java.time.Duration; import java.util.ArrayList; @@ -220,7 +219,7 @@ private void assertSendOrder(int total, List received) { } } - private Callable sender(int id, Transport client, Address address, int total) { + private Callable sender(int id, Transport client, String address, int total) { return () -> { for (int j = 0; j < total; j++) { String correlationId = id + "/" + j; diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java index 050474af..9d5867c4 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java @@ -8,7 +8,6 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.utils.NetworkEmulatorTransport; -import io.scalecube.net.Address; import io.scalecube.transport.netty.BaseTest; import java.io.IOException; import java.net.UnknownHostException; @@ -45,7 +44,7 @@ public void testUnresolvedHostConnection() { client = createWebsocketTransport(); // create transport with wrong host try { - Address address = Address.from("wronghost:49255"); + String address = "wronghost:49255"; Message message = Message.withData("q").build(); client.send(address, message).block(Duration.ofSeconds(20)); fail("fail"); @@ -57,7 +56,7 @@ public void testUnresolvedHostConnection() { @Test public void testInteractWithNoConnection(TestInfo testInfo) { - Address serverAddress = Address.from("localhost:49255"); + String serverAddress = "localhost:49255"; for (int i = 0; i < 10; i++) { LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); @@ -94,7 +93,7 @@ public void testPingPongClientTfListenAndServerTfListen() throws Exception { .listen() .subscribe( message -> { - Address address = message.sender(); + String address = message.sender(); assertEquals(client.address(), address, "Expected clientAddress"); send(server, address, Message.fromQualifier("hi client")).subscribe(); });