diff --git a/cluster-api/pom.xml b/cluster-api/pom.xml index c81d3a51..16ff3dcf 100644 --- a/cluster-api/pom.xml +++ b/cluster-api/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 @@ -12,10 +14,6 @@ ScaleCube/ClusterApi - - io.scalecube - scalecube-commons - ${project.groupId} scalecube-transport-api diff --git a/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java b/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java index c0df446b..6c949224 100644 --- a/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java +++ b/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java @@ -1,7 +1,6 @@ package io.scalecube.cluster; import io.scalecube.cluster.transport.api.Message; -import io.scalecube.net.Address; import java.util.Collection; import java.util.Optional; import reactor.core.publisher.Mono; @@ -10,11 +9,11 @@ public interface Cluster { /** - * Returns {@link Address} of this cluster instance. + * Returns address of this cluster instance. * * @return cluster address */ - Address address(); + String address(); /** * Spreads given message between cluster members using gossiping protocol. @@ -52,7 +51,7 @@ public interface Cluster { * * @return member by id */ - Optional member(String id); + Optional memberById(String id); /** * Returns cluster member by given address or null if no member with such address exists at joined @@ -60,7 +59,7 @@ public interface Cluster { * * @return member by address */ - Optional member(Address address); + Optional memberByAddress(String address); /** * Returns list of all members of the joined cluster. This will include all cluster members diff --git a/cluster-api/src/main/java/io/scalecube/cluster/Member.java b/cluster-api/src/main/java/io/scalecube/cluster/Member.java index 65dc7835..8963930f 100644 --- a/cluster-api/src/main/java/io/scalecube/cluster/Member.java +++ b/cluster-api/src/main/java/io/scalecube/cluster/Member.java @@ -1,7 +1,6 @@ package io.scalecube.cluster; import io.scalecube.cluster.membership.MembershipConfig; -import io.scalecube.net.Address; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; @@ -20,7 +19,7 @@ public final class Member implements Externalizable { private String id; private String alias; - private Address address; + private String address; private String namespace; public Member() {} @@ -33,7 +32,7 @@ public Member() {} * @param address member address; not null * @param namespace namespace; not null */ - public Member(String id, String alias, Address address, String namespace) { + public Member(String id, String alias, String address, String namespace) { this.id = Objects.requireNonNull(id, "member id"); this.alias = alias; // optional this.address = Objects.requireNonNull(address, "member address"); @@ -76,7 +75,7 @@ public String namespace() { * @see io.scalecube.cluster.transport.api.TransportConfig#port(int) * @return member address */ - public Address address() { + public String address() { return address; } @@ -110,7 +109,7 @@ public void writeExternal(ObjectOutput out) throws IOException { out.writeUTF(alias); } // address - out.writeUTF(address.toString()); + out.writeUTF(address); // namespace out.writeUTF(namespace); } @@ -125,7 +124,7 @@ public void readExternal(ObjectInput in) throws IOException { alias = in.readUTF(); } // address - address = Address.from(in.readUTF()); + address = in.readUTF(); // namespace this.namespace = in.readUTF(); } diff --git a/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java b/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java index 01b8114c..630210e3 100644 --- a/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java +++ b/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java @@ -1,6 +1,5 @@ package io.scalecube.cluster.membership; -import io.scalecube.net.Address; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -24,7 +23,7 @@ public final class MembershipConfig implements Cloneable { public static final int DEFAULT_LOCAL_SUSPICION_MULT = 3; public static final int DEFAULT_LOCAL_SYNC_INTERVAL = 15_000; - private List
seedMembers = Collections.emptyList(); + private List seedMembers = Collections.emptyList(); private int syncInterval = DEFAULT_SYNC_INTERVAL; private int syncTimeout = DEFAULT_SYNC_TIMEOUT; private int suspicionMult = DEFAULT_SUSPICION_MULT; @@ -67,7 +66,7 @@ public static MembershipConfig defaultLocalConfig() { .syncInterval(DEFAULT_LOCAL_SYNC_INTERVAL); } - public List
seedMembers() { + public List seedMembers() { return seedMembers; } @@ -77,7 +76,7 @@ public List
seedMembers() { * @param seedMembers seed members * @return new {@code MembershipConfig} instance */ - public MembershipConfig seedMembers(Address... seedMembers) { + public MembershipConfig seedMembers(String... seedMembers) { return seedMembers(Arrays.asList(seedMembers)); } @@ -87,7 +86,7 @@ public MembershipConfig seedMembers(Address... seedMembers) { * @param seedMembers seed members * @return new {@code MembershipConfig} instance */ - public MembershipConfig seedMembers(List
seedMembers) { + public MembershipConfig seedMembers(List seedMembers) { MembershipConfig m = clone(); m.seedMembers = Collections.unmodifiableList(new ArrayList<>(seedMembers)); return m; diff --git a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java index d51a7526..552c10bc 100644 --- a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java +++ b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java @@ -1,7 +1,6 @@ package io.scalecube.cluster.utils; import io.scalecube.cluster.transport.api.Message; -import io.scalecube.net.Address; import java.time.Duration; import java.util.Arrays; import java.util.Collection; @@ -30,21 +29,21 @@ public final class NetworkEmulator { private volatile OutboundSettings defaultOutboundSettings = new OutboundSettings(0, 0); private volatile InboundSettings defaultInboundSettings = new InboundSettings(true); - private final Map outboundSettings = new ConcurrentHashMap<>(); - private final Map inboundSettings = new ConcurrentHashMap<>(); + private final Map outboundSettings = new ConcurrentHashMap<>(); + private final Map inboundSettings = new ConcurrentHashMap<>(); private final AtomicLong totalMessageSentCount = new AtomicLong(); private final AtomicLong totalOutboundMessageLostCount = new AtomicLong(); private final AtomicLong totalInboundMessageLostCount = new AtomicLong(); - private final Address address; + private final String address; /** * Creates new instance of network emulator. * * @param address local address */ - NetworkEmulator(Address address) { + NetworkEmulator(String address) { this.address = address; } @@ -56,7 +55,7 @@ public final class NetworkEmulator { * @param destination address of target endpoint * @return network outbound settings */ - public OutboundSettings outboundSettings(Address destination) { + public OutboundSettings outboundSettings(String destination) { return outboundSettings.getOrDefault(destination, defaultOutboundSettings); } @@ -67,7 +66,7 @@ public OutboundSettings outboundSettings(Address destination) { * @param lossPercent loss in percents * @param meanDelay mean delay */ - public void outboundSettings(Address destination, int lossPercent, int meanDelay) { + public void outboundSettings(String destination, int lossPercent, int meanDelay) { OutboundSettings settings = new OutboundSettings(lossPercent, meanDelay); outboundSettings.put(destination, settings); LOGGER.debug("[{}] Set outbound settings {} to {}", address, settings, destination); @@ -103,7 +102,7 @@ public void unblockAllOutbound() { * * @param destinations collection of target endpoints where to apply */ - public void blockOutbound(Address... destinations) { + public void blockOutbound(String... destinations) { blockOutbound(Arrays.asList(destinations)); } @@ -112,8 +111,8 @@ public void blockOutbound(Address... destinations) { * * @param destinations collection of target endpoints where to apply */ - public void blockOutbound(Collection
destinations) { - for (Address destination : destinations) { + public void blockOutbound(Collection destinations) { + for (String destination : destinations) { outboundSettings.put(destination, new OutboundSettings(100, 0)); } LOGGER.debug("[{}] Blocked outbound to {}", address, destinations); @@ -124,7 +123,7 @@ public void blockOutbound(Collection
destinations) { * * @param destinations collection of target endpoints where to apply */ - public void unblockOutbound(Address... destinations) { + public void unblockOutbound(String... destinations) { unblockOutbound(Arrays.asList(destinations)); } @@ -133,7 +132,7 @@ public void unblockOutbound(Address... destinations) { * * @param destinations collection of target endpoints where to apply */ - public void unblockOutbound(Collection
destinations) { + public void unblockOutbound(Collection destinations) { destinations.forEach(outboundSettings::remove); LOGGER.debug("[{}] Unblocked outbound {}", address, destinations); } @@ -164,7 +163,7 @@ public long totalOutboundMessageLostCount() { * @param address target address * @return mono message */ - public Mono tryFailOutbound(Message msg, Address address) { + public Mono tryFailOutbound(Message msg, String address) { return Mono.defer( () -> { totalMessageSentCount.incrementAndGet(); @@ -187,7 +186,7 @@ public Mono tryFailOutbound(Message msg, Address address) { * @param address target address * @return mono message */ - public Mono tryDelayOutbound(Message msg, Address address) { + public Mono tryDelayOutbound(Message msg, String address) { return Mono.defer( () -> { totalMessageSentCount.incrementAndGet(); @@ -209,7 +208,7 @@ public Mono tryDelayOutbound(Message msg, Address address) { * @param destination address of target endpoint * @return network inbound settings */ - public InboundSettings inboundSettings(Address destination) { + public InboundSettings inboundSettings(String destination) { return inboundSettings.getOrDefault(destination, defaultInboundSettings); } @@ -218,7 +217,7 @@ public InboundSettings inboundSettings(Address destination) { * * @param shallPass shallPass inbound flag */ - public void inboundSettings(Address destination, boolean shallPass) { + public void inboundSettings(String destination, boolean shallPass) { InboundSettings settings = new InboundSettings(shallPass); inboundSettings.put(destination, settings); LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination); @@ -253,7 +252,7 @@ public void unblockAllInbound() { * * @param destinations collection of target endpoints where to apply */ - public void blockInbound(Address... destinations) { + public void blockInbound(String... destinations) { blockInbound(Arrays.asList(destinations)); } @@ -262,8 +261,8 @@ public void blockInbound(Address... destinations) { * * @param destinations collection of target endpoints where to apply */ - public void blockInbound(Collection
destinations) { - for (Address destination : destinations) { + public void blockInbound(Collection destinations) { + for (String destination : destinations) { inboundSettings.put(destination, new InboundSettings(false)); } LOGGER.debug("[{}] Blocked inbound from {}", address, destinations); @@ -274,7 +273,7 @@ public void blockInbound(Collection
destinations) { * * @param destinations collection of target endpoints where to apply */ - public void unblockInbound(Address... destinations) { + public void unblockInbound(String... destinations) { unblockInbound(Arrays.asList(destinations)); } @@ -283,7 +282,7 @@ public void unblockInbound(Address... destinations) { * * @param destinations collection of target endpoints where to apply */ - public void unblockInbound(Collection
destinations) { + public void unblockInbound(Collection destinations) { destinations.forEach(inboundSettings::remove); LOGGER.debug("[{}] Unblocked inbound from {}", address, destinations); } diff --git a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java index 381042c5..dcfbf677 100644 --- a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java +++ b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java @@ -2,7 +2,6 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.net.Address; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -26,7 +25,7 @@ public NetworkEmulator networkEmulator() { } @Override - public Address address() { + public String address() { return transport.address(); } @@ -46,7 +45,7 @@ public boolean isStopped() { } @Override - public Mono send(Address address, Message message) { + public Mono send(String address, Message message) { return Mono.defer( () -> Mono.just(enhanceWithSender(message)) @@ -56,7 +55,7 @@ public Mono send(Address address, Message message) { } @Override - public Mono requestResponse(Address address, Message request) { + public Mono requestResponse(String address, Message request) { return Mono.defer( () -> Mono.just(enhanceWithSender(request)) diff --git a/cluster-testlib/src/test/java/io/scalecube/cluster/utils/NetworkEmulatorTest.java b/cluster-testlib/src/test/java/io/scalecube/cluster/utils/NetworkEmulatorTest.java index 1919bc6f..b65137db 100644 --- a/cluster-testlib/src/test/java/io/scalecube/cluster/utils/NetworkEmulatorTest.java +++ b/cluster-testlib/src/test/java/io/scalecube/cluster/utils/NetworkEmulatorTest.java @@ -1,7 +1,6 @@ package io.scalecube.cluster.utils; import io.scalecube.cluster.utils.NetworkEmulator.OutboundSettings; -import io.scalecube.net.Address; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -10,24 +9,23 @@ public class NetworkEmulatorTest extends BaseTest { @Test public void testResolveLinkSettingsBySocketAddress() { // Init network emulator - Address address = Address.from("localhost:1234"); - NetworkEmulator networkEmulator = new NetworkEmulator(address); - networkEmulator.outboundSettings(Address.create("localhost", 5678), 25, 10); - networkEmulator.outboundSettings(Address.create("192.168.0.1", 8765), 10, 20); + NetworkEmulator networkEmulator = new NetworkEmulator("localhost:1234"); + networkEmulator.outboundSettings("localhost:" + 5678, 25, 10); + networkEmulator.outboundSettings("192.168.0.1:" + 8765, 10, 20); networkEmulator.setDefaultOutboundSettings(0, 2); // Check resolve by hostname:port - OutboundSettings link1 = networkEmulator.outboundSettings(Address.create("localhost", 5678)); + OutboundSettings link1 = networkEmulator.outboundSettings("localhost:" + 5678); Assertions.assertEquals(25, link1.lossPercent()); Assertions.assertEquals(10, link1.meanDelay()); // Check resolve by ipaddr:port - OutboundSettings link2 = networkEmulator.outboundSettings(Address.create("192.168.0.1", 8765)); + OutboundSettings link2 = networkEmulator.outboundSettings("192.168.0.1:" + 8765); Assertions.assertEquals(10, link2.lossPercent()); Assertions.assertEquals(20, link2.meanDelay()); // Check default link settings - OutboundSettings link3 = networkEmulator.outboundSettings(Address.create("localhost", 8765)); + OutboundSettings link3 = networkEmulator.outboundSettings("localhost:" + 8765); Assertions.assertEquals(0, link3.lossPercent()); Assertions.assertEquals(2, link3.meanDelay()); } diff --git a/cluster/pom.xml b/cluster/pom.xml index 1fd9ae60..8b9287ed 100644 --- a/cluster/pom.xml +++ b/cluster/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 @@ -12,10 +14,6 @@ ScaleCube/Cluster - - io.scalecube - scalecube-commons - ${project.groupId} scalecube-transport-api diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java index 77942be8..27309586 100644 --- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java @@ -1,6 +1,8 @@ package io.scalecube.cluster; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; +import static io.scalecube.cluster.transport.api.Transport.parseHost; +import static io.scalecube.cluster.transport.api.Transport.parsePort; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.scalecube.cluster.fdetector.FailureDetectorConfig; import io.scalecube.cluster.fdetector.FailureDetectorImpl; @@ -16,9 +18,9 @@ import io.scalecube.cluster.transport.api.Transport; import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.cluster.transport.api.TransportFactory; -import io.scalecube.net.Address; import java.io.Serializable; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.Objects; @@ -111,14 +113,14 @@ private void initLifecycle() { start .asMono() .then(doStart()) - .doOnSuccess(avoid -> onStart.emitEmpty(RETRY_NON_SERIALIZED)) - .doOnError(th -> onStart.emitError(th, RETRY_NON_SERIALIZED)) + .doOnSuccess(avoid -> onStart.emitEmpty(busyLooping(Duration.ofSeconds(3)))) + .doOnError(th -> onStart.emitError(th, busyLooping(Duration.ofSeconds(3)))) .subscribe(null, th -> LOGGER.error("[{}][doStart] Exception occurred:", localMember, th)); shutdown .asMono() .then(doShutdown()) - .doFinally(s -> onShutdown.emitEmpty(RETRY_NON_SERIALIZED)) + .doFinally(s -> onShutdown.emitEmpty(busyLooping(Duration.ofSeconds(3)))) .subscribe( null, th -> @@ -224,7 +226,7 @@ public ClusterImpl handler(Function handler) { public Mono start() { return Mono.defer( () -> { - start.emitEmpty(RETRY_NON_SERIALIZED); + start.emitEmpty(busyLooping(Duration.ofSeconds(3))); return onStart.asMono().thenReturn(this); }); } @@ -244,7 +246,7 @@ private Mono doStart0() { localMember = createLocalMember(boundTransport.address()); transport = new SenderAwareTransport(boundTransport, localMember.address()); - scheduler = Schedulers.newSingle("sc-cluster-" + localMember.address().port(), true); + scheduler = Schedulers.newSingle("sc-cluster-" + localMember.address(), true); failureDetector = new FailureDetectorImpl( @@ -283,9 +285,10 @@ private Mono doStart0() { /*.publishOn(scheduler)*/ // Dont uncomment, already beign executed inside scalecube-cluster thread .subscribe( - event -> membershipSink.emitNext(event, RETRY_NON_SERIALIZED), + event -> + membershipSink.emitNext(event, busyLooping(Duration.ofSeconds(3))), ex -> LOGGER.error("[{}][membership][error] cause:", localMember, ex), - () -> membershipSink.emitComplete(RETRY_NON_SERIALIZED))); + () -> membershipSink.emitComplete(busyLooping(Duration.ofSeconds(3))))); return Mono.fromRunnable(() -> failureDetector.start()) .then(Mono.fromRunnable(() -> gossip.start())) @@ -371,14 +374,13 @@ private Flux listenMembership() { * @param address transport address * @return local cluster member with cluster address and cluster member id */ - private Member createLocalMember(Address address) { - int port = Optional.ofNullable(config.externalPort()).orElse(address.port()); + private Member createLocalMember(String address) { + final String finalHost = + Optional.ofNullable(config.externalHost()).orElseGet(() -> parseHost(address)); + final int finalPort = + Optional.ofNullable(config.externalPort()).orElseGet(() -> parsePort(address)); - // calculate local member cluster address - Address memberAddress = - Optional.ofNullable(config.externalHost()) - .map(host -> Address.create(host, port)) - .orElseGet(() -> Address.create(address.host(), port)); + final String memberAddress = finalHost + ":" + finalPort; return new Member( config.memberId() != null ? config.memberId() : UUID.randomUUID().toString(), @@ -388,7 +390,7 @@ private Member createLocalMember(Address address) { } @Override - public Address address() { + public String address() { return member().address(); } @@ -431,13 +433,13 @@ public Member member() { } @Override - public Optional member(String id) { - return membership.member(id); + public Optional memberById(String id) { + return membership.memberById(id); } @Override - public Optional member(Address address) { - return membership.member(address); + public Optional memberByAddress(String address) { + return membership.memberByAddress(address); } @Override @@ -449,7 +451,7 @@ public Mono updateMetadata(T metadata) { @Override public void shutdown() { - shutdown.emitEmpty(RETRY_NON_SERIALIZED); + shutdown.emitEmpty(busyLooping(Duration.ofSeconds(3))); } private Mono doShutdown() { @@ -498,15 +500,15 @@ public Mono onShutdown() { private static class SenderAwareTransport implements Transport { private final Transport transport; - private final Address address; + private final String address; - private SenderAwareTransport(Transport transport, Address address) { + private SenderAwareTransport(Transport transport, String address) { this.transport = Objects.requireNonNull(transport); this.address = Objects.requireNonNull(address); } @Override - public Address address() { + public String address() { return transport.address(); } @@ -526,12 +528,12 @@ public boolean isStopped() { } @Override - public Mono send(Address address, Message message) { + public Mono send(String address, Message message) { return Mono.defer(() -> transport.send(address, enhanceWithSender(message))); } @Override - public Mono requestResponse(Address address, Message request) { + public Mono requestResponse(String address, Message request) { return Mono.defer(() -> transport.requestResponse(address, enhanceWithSender(request))); } diff --git a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java index c7540bc0..274f5a08 100644 --- a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java @@ -1,6 +1,6 @@ package io.scalecube.cluster.fdetector; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.scalecube.cluster.Member; import io.scalecube.cluster.fdetector.PingData.AckType; @@ -8,7 +8,6 @@ import io.scalecube.cluster.membership.MembershipEvent; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.net.Address; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -117,7 +116,7 @@ public void stop() { actionsDisposables.dispose(); // Stop publishing events - sink.emitComplete(RETRY_NON_SERIALIZED); + sink.emitComplete(busyLooping(Duration.ofSeconds(3))); } @Override @@ -145,7 +144,7 @@ private void doPing() { Message pingMsg = Message.withData(pingData).qualifier(PING).correlationId(cid).build(); LOGGER.debug("[{}][{}] Send Ping to {}", localMember, period, pingMember); - Address address = pingMember.address(); + String address = pingMember.address(); transport .requestResponse(address, pingMsg) .timeout(Duration.ofMillis(config.pingTimeout()), scheduler) @@ -232,7 +231,7 @@ private void onMessage(Message message) { /** Listens to PING message and answers with ACK. */ private void onPing(Message message) { long period = this.currentPeriod; - Address sender = message.sender(); + String sender = message.sender(); LOGGER.debug("[{}][{}] Received Ping from {}", localMember, period, sender); PingData data = message.data(); data = data.withAckType(AckType.DEST_OK); @@ -249,7 +248,7 @@ private void onPing(Message message) { String correlationId = message.correlationId(); Message ackMessage = Message.withData(data).qualifier(PING_ACK).correlationId(correlationId).build(); - Address address = data.getFrom().address(); + String address = data.getFrom().address(); LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, address); transport .send(address, ackMessage) @@ -275,7 +274,7 @@ private void onPingReq(Message message) { PingData pingReqData = new PingData(localMember, target, originalIssuer); Message pingMessage = Message.withData(pingReqData).qualifier(PING).correlationId(correlationId).build(); - Address address = target.address(); + String address = target.address(); LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, address); transport .send(address, pingMessage) @@ -305,7 +304,7 @@ private void onTransitPingAck(Message message) { PingData originalAckData = new PingData(target, data.getTo()).withAckType(ackType); Message originalAckMessage = Message.withData(originalAckData).qualifier(PING_ACK).correlationId(correlationId).build(); - Address address = target.address(); + String address = target.address(); LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, address); transport .send(address, originalAckMessage) @@ -378,7 +377,7 @@ private List selectPingReqMembers(Member pingMember) { private void publishPingResult(long period, Member member, MemberStatus status) { LOGGER.debug("[{}][{}] Member {} detected as {}", localMember, period, member, status); - sink.emitNext(new FailureDetectorEvent(member, status), RETRY_NON_SERIALIZED); + sink.emitNext(new FailureDetectorEvent(member, status), busyLooping(Duration.ofSeconds(3))); } private MemberStatus computeMemberStatus(Message message, long period) { diff --git a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java index 1acfd060..3657c06b 100644 --- a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java @@ -1,13 +1,13 @@ package io.scalecube.cluster.gossip; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.scalecube.cluster.ClusterMath; import io.scalecube.cluster.Member; import io.scalecube.cluster.membership.MembershipEvent; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.net.Address; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -120,7 +120,7 @@ public void stop() { actionsDisposables.dispose(); // Stop publishing events - sink.emitComplete(RETRY_NON_SERIALIZED); + sink.emitComplete(busyLooping(Duration.ofSeconds(3))); } @Override @@ -208,7 +208,7 @@ private void onGossipRequest(Message message) { if (gossipState == null) { // new gossip gossipState = new GossipState(gossip, period); gossips.put(gossip.gossipId(), gossipState); - sink.emitNext(gossip.message(), RETRY_NON_SERIALIZED); + sink.emitNext(gossip.message(), busyLooping(Duration.ofSeconds(3))); } } if (gossipState != null) { @@ -287,7 +287,7 @@ private void spreadGossipsTo(long period, Member member) { } // Send gossip request - Address address = member.address(); + String address = member.address(); gossips.stream() .map(this::buildGossipRequestMessage) diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocol.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocol.java index 08907e21..d127a097 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocol.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocol.java @@ -1,7 +1,6 @@ package io.scalecube.cluster.membership; import io.scalecube.cluster.Member; -import io.scalecube.net.Address; import java.util.Collection; import java.util.Optional; import reactor.core.publisher.Flux; @@ -53,7 +52,7 @@ public interface MembershipProtocol { * * @return member by id */ - Optional member(String id); + Optional memberById(String id); /** * Returns cluster member by given address or null if no member with such address exists at joined @@ -61,5 +60,5 @@ public interface MembershipProtocol { * * @return member by address */ - Optional member(Address address); + Optional memberByAddress(String address); } diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java index fad3bca3..5c73af81 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java @@ -3,7 +3,8 @@ import static io.scalecube.cluster.membership.MemberStatus.ALIVE; import static io.scalecube.cluster.membership.MemberStatus.DEAD; import static io.scalecube.cluster.membership.MemberStatus.LEAVING; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; +import static io.scalecube.cluster.transport.api.Transport.parsePort; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.scalecube.cluster.ClusterConfig; import io.scalecube.cluster.ClusterMath; @@ -15,8 +16,8 @@ import io.scalecube.cluster.metadata.MetadataStore; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.net.Address; import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.file.Path; import java.nio.file.Paths; @@ -74,7 +75,7 @@ private enum MembershipUpdateReason { private final Transport transport; private final MembershipConfig membershipConfig; private final FailureDetectorConfig failureDetectorConfig; - private final List
seedMembers; + private final List seedMembers; private final FailureDetector failureDetector; private final GossipProtocol gossipProtocol; private final MetadataStore metadataStore; @@ -160,18 +161,18 @@ public MembershipProtocolImpl( } // Remove duplicates and local address(es) - private List
cleanUpSeedMembers(Collection
seedMembers) { - InetAddress localIpAddress = Address.getLocalIpAddress(); + private List cleanUpSeedMembers(Collection seedMembers) { + InetAddress localIpAddress = getLocalIpAddress(); String hostAddress = localIpAddress.getHostAddress(); String hostName = localIpAddress.getHostName(); - Address memberAddr = localMember.address(); - Address transportAddr = transport.address(); - Address memberAddrByHostAddress = Address.create(hostAddress, memberAddr.port()); - Address transportAddrByHostAddress = Address.create(hostAddress, transportAddr.port()); - Address memberAddByHostName = Address.create(hostName, memberAddr.port()); - Address transportAddrByHostName = Address.create(hostName, transportAddr.port()); + String memberAddr = localMember.address(); + String transportAddr = transport.address(); + String memberAddrByHostAddress = hostAddress + ":" + parsePort(memberAddr); + String transportAddrByHostAddress = hostAddress + ":" + parsePort(transportAddr); + String memberAddByHostName = hostName + ":" + parsePort(memberAddr); + String transportAddrByHostName = hostName + ":" + parsePort(transportAddr); return new LinkedHashSet<>(seedMembers) .stream() @@ -184,7 +185,15 @@ private List
cleanUpSeedMembers(Collection
seedMembers) { .collect(Collectors.toList()); } - private boolean checkAddressesNotEqual(Address address0, Address address1) { + private static InetAddress getLocalIpAddress() { + try { + return InetAddress.getLocalHost(); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + + private boolean checkAddressesNotEqual(String address0, String address1) { if (!address0.equals(address1)) { return true; } else { @@ -300,7 +309,7 @@ public void stop() { suspicionTimeoutTasks.clear(); // Stop publishing events - sink.emitComplete(RETRY_NON_SERIALIZED); + sink.emitComplete(busyLooping(Duration.ofSeconds(3))); } @Override @@ -320,18 +329,18 @@ public Member member() { } @Override - public Optional member(String id) { + public Optional memberById(String id) { return Optional.ofNullable(members.get(id)); } @Override - public Optional member(Address address) { + public Optional memberByAddress(String address) { return new ArrayList<>(members.values()) .stream().filter(member -> member.address().equals(address)).findFirst(); } private void doSync() { - Address address = selectSyncAddress().orElse(null); + String address = selectSyncAddress().orElse(null); if (address == null) { return; } @@ -390,7 +399,7 @@ private Mono onSyncAck(Message syncAckMsg, boolean onStart) { private Mono onSync(Message syncMsg) { return Mono.defer( () -> { - final Address sender = syncMsg.sender(); + final String sender = syncMsg.sender(); LOGGER.debug("[{}] Received Sync from {}", localMember, sender); return syncMembership(syncMsg.data(), false) .doOnSuccess( @@ -425,7 +434,7 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) { // Alive won't override SUSPECT so issue instead extra sync with member to force it spread // alive with inc + 1 Message syncMsg = prepareSyncDataMsg(SYNC, null); - Address address = fdEvent.member().address(); + String address = fdEvent.member().address(); transport .send(address, syncMsg) .subscribe( @@ -464,8 +473,8 @@ private void onMembershipGossip(Message message) { } } - private Optional
selectSyncAddress() { - List
addresses = + private Optional selectSyncAddress() { + List addresses = Stream.concat(seedMembers.stream(), otherMembers().stream().map(Member::address)) .collect(Collectors.collectingAndThen(Collectors.toSet(), ArrayList::new)); Collections.shuffle(addresses); @@ -726,7 +735,7 @@ private Mono onLeavingDetected(MembershipRecord r0, MembershipRecord r1) { private void publishEvent(MembershipEvent event) { LOGGER.info("[{}][publishEvent] {}", localMember, event); - sink.emitNext(event, RETRY_NON_SERIALIZED); + sink.emitNext(event, busyLooping(Duration.ofSeconds(3))); } private Mono onDeadMemberDetected(MembershipRecord r1) { diff --git a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java index 35ba5328..f86680cf 100644 --- a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java @@ -4,7 +4,6 @@ import io.scalecube.cluster.Member; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.net.Address; import java.nio.ByteBuffer; import java.time.Duration; import java.util.HashMap; @@ -148,7 +147,7 @@ public Mono fetchMetadata(Member member) { return Mono.defer( () -> { final String cid = UUID.randomUUID().toString(); - final Address targetAddress = member.address(); + final String targetAddress = member.address(); LOGGER.debug("[{}][{}] Getting metadata for member {}", localMember, cid, member); @@ -196,7 +195,7 @@ private void onMessage(Message message) { } private void onMetadataRequest(Message message) { - final Address sender = message.sender(); + final String sender = message.sender(); LOGGER.debug("[{}] Received GetMetadataReq from {}", localMember, sender); GetMetadataRequest reqData = message.data(); diff --git a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java index dd104ade..f8ac8eda 100644 --- a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java @@ -1,5 +1,6 @@ package io.scalecube.cluster; +import static io.scalecube.cluster.transport.api.Transport.parsePort; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -8,9 +9,9 @@ import io.scalecube.cluster.membership.MembershipEvent; import io.scalecube.cluster.membership.MembershipEvent.Type; import io.scalecube.cluster.metadata.MetadataCodec; -import io.scalecube.net.Address; import io.scalecube.transport.netty.tcp.TcpTransportFactory; import java.net.InetAddress; +import java.net.UnknownHostException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -37,7 +38,7 @@ public class ClusterTest extends BaseTest { @Test public void testStartStopRepeatedly() throws Exception { - Address address = Address.from("localhost:4848"); + String address = "localhost:4848"; // Start seed node Cluster seedNode = @@ -45,7 +46,7 @@ public void testStartStopRepeatedly() throws Exception { .gossip(opts -> opts.gossipInterval(100)) .failureDetector(opts -> opts.pingInterval(100)) .membership(opts -> opts.syncInterval(100)) - .transport(opts -> opts.port(address.port())) + .transport(opts -> opts.port(parsePort(address))) .transport(opts -> opts.connectTimeout(CONNECT_TIMEOUT)) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -72,7 +73,7 @@ public void testStartStopRepeatedly() throws Exception { .gossip(opts -> opts.gossipInterval(100)) .failureDetector(opts -> opts.pingInterval(100)) .membership(opts -> opts.syncInterval(100)) - .transport(opts -> opts.port(address.port())) + .transport(opts -> opts.port(parsePort(address))) .transport(opts -> opts.connectTimeout(CONNECT_TIMEOUT)) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -101,8 +102,8 @@ public void testMembersAccessFromScheduler() { // Members by address - Optional otherNodeOnSeedNode = seedNode.member(otherNode.address()); - Optional seedNodeOnOtherNode = otherNode.member(seedNode.address()); + Optional otherNodeOnSeedNode = seedNode.memberByAddress(otherNode.address()); + Optional seedNodeOnOtherNode = otherNode.memberByAddress(seedNode.address()); assertEquals(otherNode.member(), otherNodeOnSeedNode.orElse(null)); assertEquals(seedNode.member(), seedNodeOnOtherNode.orElse(null)); @@ -112,15 +113,11 @@ public void testMembersAccessFromScheduler() { @Test public void testJoinLocalhostIgnored() throws InterruptedException { - InetAddress localIpAddress = Address.getLocalIpAddress(); - Address localAddressByHostname = Address.create(localIpAddress.getHostName(), 4801); - Address localAddressByIp = Address.create(localIpAddress.getHostAddress(), 4801); - Address[] addresses = { - Address.from("localhost:4801"), - Address.from("127.0.0.1:4801"), - Address.from("127.0.1.1:4801"), - localAddressByHostname, - localAddressByIp + InetAddress localIpAddress = getLocalIpAddress(); + String localAddressByHostname = localIpAddress.getHostName() + ":" + 4801; + String localAddressByIp = localIpAddress.getHostAddress() + ":" + 4801; + String[] addresses = { + "localhost:4801", "127.0.0.1:4801", "127.0.1.1:4801", localAddressByHostname, localAddressByIp }; // Start seed node @@ -141,15 +138,11 @@ public void testJoinLocalhostIgnored() throws InterruptedException { @Test public void testJoinLocalhostIgnoredWithOverride() throws InterruptedException { - InetAddress localIpAddress = Address.getLocalIpAddress(); - Address localAddressByHostname = Address.create(localIpAddress.getHostName(), 7878); - Address localAddressByIp = Address.create(localIpAddress.getHostAddress(), 7878); - Address[] addresses = { - Address.from("localhost:7878"), - Address.from("127.0.0.1:7878"), - Address.from("127.0.1.1:7878"), - localAddressByHostname, - localAddressByIp + InetAddress localIpAddress = getLocalIpAddress(); + String localAddressByHostname = localIpAddress.getHostName() + ":" + 7878; + String localAddressByIp = localIpAddress.getHostAddress() + ":" + 7878; + String[] addresses = { + "localhost:7878", "127.0.0.1:7878", "127.0.1.1:7878", localAddressByHostname, localAddressByIp }; // Start seed node @@ -242,7 +235,7 @@ public void onMembershipEvent(MembershipEvent event) { // Check all test members know valid metadata for (Cluster node : otherNodes) { - Optional memberOptional = node.member(metadataNode.member().id()); + Optional memberOptional = node.memberById(metadataNode.member().id()); assertTrue(memberOptional.isPresent()); Member member = memberOptional.get(); assertEquals(metadata, node.metadata(member).orElse(null)); @@ -254,7 +247,7 @@ public void onMembershipEvent(MembershipEvent event) { // Check all nodes had updated metadata member for (Cluster node : otherNodes) { - Optional memberOptional = node.member(metadataNode.member().id()); + Optional memberOptional = node.memberById(metadataNode.member().id()); assertTrue(memberOptional.isPresent()); Member member = memberOptional.get(); assertEquals(updatedMetadata, node.metadata(member).orElse(null)); @@ -315,7 +308,7 @@ public void onMembershipEvent(MembershipEvent event) { // Check all test members know valid metadata for (Cluster node : otherNodes) { - Optional memberOptional = node.member(metadataNode.member().id()); + Optional memberOptional = node.memberById(metadataNode.member().id()); assertTrue(memberOptional.isPresent()); Member member = memberOptional.get(); assertEquals(metadata, node.metadata(member).orElse(null)); @@ -328,7 +321,7 @@ public void onMembershipEvent(MembershipEvent event) { // Check all nodes had updated metadata member for (Cluster node : otherNodes) { - Optional memberOptional = node.member(metadataNode.member().id()); + Optional memberOptional = node.memberById(metadataNode.member().id()); assertTrue(memberOptional.isPresent()); Member member = memberOptional.get(); //noinspection unchecked,OptionalGetWithoutIsPresent @@ -393,7 +386,7 @@ public void onMembershipEvent(MembershipEvent event) { // Check all test members know valid metadata for (Cluster node : otherNodes) { - Optional memberOptional = node.member(metadataNode.member().id()); + Optional memberOptional = node.memberById(metadataNode.member().id()); assertTrue(memberOptional.isPresent()); Member member = memberOptional.get(); assertEquals(metadata, node.metadata(member).orElse(null)); @@ -407,7 +400,7 @@ public void onMembershipEvent(MembershipEvent event) { // Check all nodes had updated metadata member for (Cluster node : otherNodes) { - Optional memberOptional = node.member(metadataNode.member().id()); + Optional memberOptional = node.memberById(metadataNode.member().id()); assertTrue(memberOptional.isPresent()); Member member = memberOptional.get(); //noinspection unchecked,OptionalGetWithoutIsPresent @@ -559,9 +552,9 @@ public void testJoinSeedClusterWithNoExistingSeedMember() { // Start seed node Cluster seedNode = new ClusterImpl().transportFactory(TcpTransportFactory::new).startAwait(); - Address nonExistingSeed1 = Address.from("localhost:1234"); - Address nonExistingSeed2 = Address.from("localhost:5678"); - Address[] seeds = new Address[] {nonExistingSeed1, nonExistingSeed2, seedNode.address()}; + String nonExistingSeed1 = "localhost:1234"; + String nonExistingSeed2 = "localhost:5678"; + String[] seeds = new String[] {nonExistingSeed1, nonExistingSeed2, seedNode.address()}; Cluster otherNode = new ClusterImpl() @@ -575,29 +568,15 @@ public void testJoinSeedClusterWithNoExistingSeedMember() { shutdown(Arrays.asList(seedNode, otherNode)); } - private void shutdown(List nodes) { - try { - Mono.whenDelayError( - nodes.stream() - .peek(Cluster::shutdown) - .map(Cluster::onShutdown) - .collect(Collectors.toList())) - .block(TIMEOUT); - } catch (Exception ex) { - LOGGER.error("Exception on cluster shutdown", ex); - } - } - @Test public void testExplicitLocalMemberId() { - ClusterConfig config = ClusterConfig.defaultConfig() - .memberId("test-member"); + ClusterConfig config = ClusterConfig.defaultConfig().memberId("test-member"); ClusterImpl cluster = null; try { - cluster = (ClusterImpl) new ClusterImpl(config) - .transportFactory(TcpTransportFactory::new) - .startAwait(); + cluster = + (ClusterImpl) + new ClusterImpl(config).transportFactory(TcpTransportFactory::new).startAwait(); assertEquals("test-member", cluster.member().id()); } finally { @@ -606,4 +585,25 @@ public void testExplicitLocalMemberId() { } } } + + private static InetAddress getLocalIpAddress() { + try { + return InetAddress.getLocalHost(); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + + private void shutdown(List nodes) { + try { + Mono.whenDelayError( + nodes.stream() + .peek(Cluster::shutdown) + .map(Cluster::onShutdown) + .collect(Collectors.toList())) + .block(TIMEOUT); + } catch (Exception ex) { + LOGGER.error("Exception on cluster shutdown", ex); + } + } } diff --git a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java index 3c2241d8..bcb89765 100644 --- a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java @@ -2,6 +2,7 @@ import static io.scalecube.cluster.membership.MemberStatus.ALIVE; import static io.scalecube.cluster.membership.MemberStatus.SUSPECT; +import static io.scalecube.cluster.transport.api.Transport.parsePort; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -12,7 +13,6 @@ import io.scalecube.cluster.transport.api.Transport; import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.cluster.utils.NetworkEmulatorTransport; -import io.scalecube.net.Address; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -53,7 +53,7 @@ public void testTrusted() { Transport a = createTransport(); Transport b = createTransport(); Transport c = createTransport(); - List
members = Arrays.asList(a.address(), b.address(), c.address()); + List members = Arrays.asList(a.address(), b.address(), c.address()); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -82,7 +82,7 @@ public void testSuspected() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
members = Arrays.asList(a.address(), b.address(), c.address()); + List members = Arrays.asList(a.address(), b.address(), c.address()); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -119,7 +119,7 @@ public void testTrustedDespiteBadNetwork() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
members = Arrays.asList(a.address(), b.address(), c.address()); + List members = Arrays.asList(a.address(), b.address(), c.address()); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -151,7 +151,7 @@ public void testTrustedDespiteDifferentPingTimings() { Transport a = createTransport(); Transport b = createTransport(); Transport c = createTransport(); - List
members = Arrays.asList(a.address(), b.address(), c.address()); + List members = Arrays.asList(a.address(), b.address(), c.address()); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -183,7 +183,7 @@ public void testSuspectedMemberWithBadNetworkGetsPartitioned() throws Exception NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); NetworkEmulatorTransport d = createTransport(); - List
members = Arrays.asList(a.address(), b.address(), c.address(), d.address()); + List members = Arrays.asList(a.address(), b.address(), c.address(), d.address()); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -242,7 +242,7 @@ public void testSuspectedMemberWithNormalNetworkGetsPartitioned() throws Excepti NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); NetworkEmulatorTransport d = createTransport(); - List
members = Arrays.asList(a.address(), b.address(), c.address(), d.address()); + List members = Arrays.asList(a.address(), b.address(), c.address(), d.address()); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -303,7 +303,7 @@ public void testMemberStatusChangeAfterNetworkRecovery() throws Exception { // Create transports NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); - List
members = Arrays.asList(a.address(), b.address()); + List members = Arrays.asList(a.address(), b.address()); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -346,7 +346,7 @@ public void testStatusChangeAfterMemberRestart() throws Exception { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport x = createTransport(); - List
members = Arrays.asList(a.address(), b.address(), x.address()); + List members = Arrays.asList(a.address(), b.address(), x.address()); // Create failure detectors FailureDetectorImpl fdA = createFd(a, members); @@ -374,7 +374,7 @@ public void testStatusChangeAfterMemberRestart() throws Exception { TimeUnit.SECONDS.sleep(2); // restart node X as XX - xx = createTransport(new TransportConfig().port(x.address().port())); + xx = createTransport(new TransportConfig().port(parsePort(x.address()))); assertEquals(x.address(), xx.address()); fdetectors = Arrays.asList(fdA, fdB, fdXx = createFd(xx, members)); @@ -397,7 +397,7 @@ public void testStatusChangeAfterMemberRestart() throws Exception { } } - private FailureDetectorImpl createFd(Transport transport, List
members) { + private FailureDetectorImpl createFd(Transport transport, List members) { FailureDetectorConfig failureDetectorConfig = FailureDetectorConfig.defaultLocalConfig() // faster config for local testing .pingTimeout(100) @@ -407,15 +407,16 @@ private FailureDetectorImpl createFd(Transport transport, List
members) } private FailureDetectorImpl createFd( - Transport transport, List
addresses, FailureDetectorConfig config) { + Transport transport, List addresses, FailureDetectorConfig config) { Member localMember = - new Member("member-" + transport.address().port(), null, transport.address(), NAMESPACE); + new Member( + "member-" + parsePort(transport.address()), null, transport.address(), NAMESPACE); Flux membershipFlux = Flux.fromIterable(addresses) .filter(address -> !transport.address().equals(address)) - .map(address -> new Member("member-" + address.port(), null, address, NAMESPACE)) + .map(address -> new Member("member-" + parsePort(address), null, address, NAMESPACE)) .map(member -> MembershipEvent.createAdded(member, null, 0)); return new FailureDetectorImpl(localMember, transport, membershipFlux, config, scheduler); @@ -437,11 +438,11 @@ private static void stop(List fdetectors) { } private static void assertStatus( - Address address, + String address, MemberStatus status, Collection events, - Address... expected) { - List
actual = + String... expected) { + List actual = events.stream() .filter(event -> event.status() == status) .map(FailureDetectorEvent::member) @@ -454,7 +455,7 @@ private static void assertStatus( address, expected.length, status, Arrays.toString(expected), events); assertEquals(expected.length, actual.size(), msg1); - for (Address member : expected) { + for (String member : expected) { String msg2 = String.format("Node %s expected as %s %s, but was: %s", address, status, member, events); assertTrue(actual.contains(member), msg2); @@ -462,7 +463,7 @@ private static void assertStatus( } private static Future> listenNextEventFor( - FailureDetectorImpl fd, List
addresses) { + FailureDetectorImpl fd, List addresses) { final Transport transport = BaseTest.getField(fd, "transport"); addresses = new ArrayList<>(addresses); addresses.remove(transport.address()); // exclude self @@ -471,7 +472,7 @@ private static Future> listenNextEventFor( } List> resultFuture = new ArrayList<>(); - for (final Address member : addresses) { + for (final String member : addresses) { final CompletableFuture future = new CompletableFuture<>(); fd.listen().filter(event -> event.member().address() == member).subscribe(future::complete); resultFuture.add(future); diff --git a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java index 3857d1e6..1ea1e583 100644 --- a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java @@ -1,5 +1,6 @@ package io.scalecube.cluster.gossip; +import static io.scalecube.cluster.transport.api.Transport.parsePort; import static org.junit.jupiter.api.Assertions.assertEquals; import io.scalecube.cluster.BaseTest; @@ -9,7 +10,6 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; import io.scalecube.cluster.utils.NetworkEmulatorTransport; -import io.scalecube.net.Address; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; @@ -74,7 +74,7 @@ private NetworkEmulatorTransport getNetworkEmulatorTransport(int lostPercent, in return transport; } - private GossipProtocolImpl initGossipProtocol(Transport transport, List
members) { + private GossipProtocolImpl initGossipProtocol(Transport transport, List members) { GossipConfig gossipConfig = new GossipConfig() .gossipFanout(gossipFanout) @@ -82,12 +82,13 @@ private GossipProtocolImpl initGossipProtocol(Transport transport, List
.gossipRepeatMult(gossipRepeatMultiplier); Member localMember = - new Member("member-" + transport.address().port(), null, transport.address(), NAMESPACE); + new Member( + "member-" + parsePort(transport.address()), null, transport.address(), NAMESPACE); Flux membershipFlux = Flux.fromIterable(members) .filter(address -> !transport.address().equals(address)) - .map(address -> new Member("member-" + address.port(), null, address, NAMESPACE)) + .map(address -> new Member("member-" + parsePort(address), null, address, NAMESPACE)) .map(member -> MembershipEvent.createAdded(member, null, 0)); GossipProtocolImpl gossipProtocol = diff --git a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java index ae82bca2..9842c07c 100644 --- a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java @@ -4,6 +4,7 @@ import static io.scalecube.cluster.ClusterMath.gossipDisseminationTime; import static io.scalecube.cluster.ClusterMath.maxMessagesPerGossipPerNode; import static io.scalecube.cluster.ClusterMath.maxMessagesPerGossipTotal; +import static io.scalecube.cluster.transport.api.Transport.parsePort; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -15,7 +16,6 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; import io.scalecube.cluster.utils.NetworkEmulatorTransport; -import io.scalecube.net.Address; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -230,7 +230,7 @@ private LongSummaryStatistics computeMessageLostStats(List g private List initGossipProtocols(int count, int lostPercent, int meanDelay) { final List transports = initTransports(count, lostPercent, meanDelay); - List
members = new ArrayList<>(); + List members = new ArrayList<>(); for (Transport transport : transports) { members.add(transport.address()); } @@ -251,7 +251,7 @@ private List initTransports(int count, int lostPercent, int meanDelay return transports; } - private GossipProtocolImpl initGossipProtocol(Transport transport, List
members) { + private GossipProtocolImpl initGossipProtocol(Transport transport, List members) { GossipConfig gossipConfig = new GossipConfig() .gossipFanout(gossipFanout) @@ -259,12 +259,13 @@ private GossipProtocolImpl initGossipProtocol(Transport transport, List
.gossipRepeatMult(gossipRepeatMultiplier); Member localMember = - new Member("member-" + transport.address().port(), null, transport.address(), NAMESPACE); + new Member( + "member-" + parsePort(transport.address()), null, transport.address(), NAMESPACE); Flux membershipFlux = Flux.fromIterable(members) .filter(address -> !transport.address().equals(address)) - .map(address -> new Member("member-" + address.port(), null, address, NAMESPACE)) + .map(address -> new Member("member-" + parsePort(address), null, address, NAMESPACE)) .map(member -> MembershipEvent.createAdded(member, null, 0)); GossipProtocolImpl gossipProtocol = diff --git a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipRequestTest.java b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipRequestTest.java index e2dfa1fc..37a3e86d 100644 --- a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipRequestTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipRequestTest.java @@ -8,7 +8,6 @@ import io.scalecube.cluster.Member; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.MessageCodec; -import io.scalecube.net.Address; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.Serializable; @@ -39,7 +38,7 @@ public void init() { @Test public void testSerializationAndDeserialization() throws Exception { - Member from = new Member("0", null, Address.from("localhost:1234"), NAMESPACE); + Member from = new Member("0", null, "localhost:1234", NAMESPACE); List gossips = getGossips(); Message message = Message.withData(new GossipRequest(gossips, from.id())).correlationId("CORR_ID").build(); diff --git a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java index 4fced8e2..310e947f 100644 --- a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java @@ -1,5 +1,6 @@ package io.scalecube.cluster.membership; +import static io.scalecube.cluster.transport.api.Transport.parsePort; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -18,7 +19,6 @@ import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.cluster.utils.NetworkEmulator; import io.scalecube.cluster.utils.NetworkEmulatorTransport; -import io.scalecube.net.Address; import java.net.InetAddress; import java.net.UnknownHostException; import java.time.Duration; @@ -76,7 +76,7 @@ public void testLeaveCluster() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
addresses = Arrays.asList(a.address(), b.address(), c.address()); + List addresses = Arrays.asList(a.address(), b.address(), c.address()); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -109,9 +109,8 @@ public void testLeaveCluster() { public void testLeaveClusterCameBeforeAlive() { final NetworkEmulatorTransport a = createTransport(); final NetworkEmulatorTransport b = createTransport(); - final Member anotherMember = - new Member("leavingNodeId-1", null, Address.from("localhost:9236"), NAMESPACE); - final List
addresses = Arrays.asList(a.address(), b.address()); + final Member anotherMember = new Member("leavingNodeId-1", null, "localhost:9236", NAMESPACE); + final List addresses = Arrays.asList(a.address(), b.address()); final MembershipProtocolImpl cmA = createMembership(a, addresses); final MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -154,9 +153,8 @@ public void testLeaveClusterCameBeforeAlive() { public void testLeaveClusterOnly() { final NetworkEmulatorTransport a = createTransport(); final NetworkEmulatorTransport b = createTransport(); - final Member anotherMember = - new Member("leavingNodeId-1", null, Address.from("localhost:9236"), NAMESPACE); - final List
addresses = Arrays.asList(a.address(), b.address()); + final Member anotherMember = new Member("leavingNodeId-1", null, "localhost:9236", NAMESPACE); + final List addresses = Arrays.asList(a.address(), b.address()); final MembershipProtocolImpl cmA = createMembership(a, addresses); final MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -188,9 +186,8 @@ public void testLeaveClusterOnly() { public void testLeaveClusterOnSuspectedNode() { final NetworkEmulatorTransport a = createTransport(); final NetworkEmulatorTransport b = createTransport(); - final Member anotherMember = - new Member("leavingNodeId-1", null, Address.from("localhost:9236"), NAMESPACE); - final List
addresses = Arrays.asList(a.address(), b.address()); + final Member anotherMember = new Member("leavingNodeId-1", null, "localhost:9236", NAMESPACE); + final List addresses = Arrays.asList(a.address(), b.address()); final MembershipProtocolImpl cmA = createMembership(a, addresses); final MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -234,7 +231,7 @@ public void testLeaveClusterOnSuspectedNode() { public void testLeaveClusterOnAliveAndSuspectedNode() { final NetworkEmulatorTransport a = createTransport(); final NetworkEmulatorTransport b = createTransport(); - final List
addresses = Arrays.asList(a.address(), b.address()); + final List addresses = Arrays.asList(a.address(), b.address()); final MembershipProtocolImpl cmA = createMembership(a, addresses); final MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -270,7 +267,7 @@ public void testInitialPhaseOk() { Transport a = createTransport(); Transport b = createTransport(); Transport c = createTransport(); - List
addresses = Arrays.asList(a.address(), b.address(), c.address()); + List addresses = Arrays.asList(a.address(), b.address(), c.address()); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -295,7 +292,7 @@ public void testNetworkPartitionDueNoOutboundThenRecover() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
addresses = Arrays.asList(a.address(), b.address(), c.address()); + List addresses = Arrays.asList(a.address(), b.address(), c.address()); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -341,7 +338,7 @@ public void testMemberLostNetworkDueNoOutboundThenRecover() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
members = Arrays.asList(a.address(), b.address(), c.address()); + List members = Arrays.asList(a.address(), b.address(), c.address()); MembershipProtocolImpl cmA = createMembership(a, members); MembershipProtocolImpl cmB = createMembership(b, members); @@ -397,7 +394,7 @@ public void testNetworkPartitionTwiceDueNoOutboundThenRecover() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
addresses = Arrays.asList(a.address(), b.address(), c.address()); + List addresses = Arrays.asList(a.address(), b.address(), c.address()); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -467,7 +464,7 @@ public void testNetworkLostOnAllNodesDueNoOutboundThenRecover() { NetworkEmulatorTransport a = createTransport(); NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); - List
addresses = Arrays.asList(a.address(), b.address(), c.address()); + List addresses = Arrays.asList(a.address(), b.address(), c.address()); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -523,7 +520,7 @@ public void testLongNetworkPartitionDueNoOutboundThenRemoved() { NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); NetworkEmulatorTransport d = createTransport(); - List
addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address()); + List addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address()); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -576,7 +573,7 @@ public void testRestartStoppedMembers() { NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); NetworkEmulatorTransport d = createTransport(); - List
addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address()); + List addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address()); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -656,7 +653,7 @@ public void testRestartStoppedMembersOnSameAddresses() { NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); NetworkEmulatorTransport d = createTransport(); - List
addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address()); + List addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address()); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -691,8 +688,8 @@ public void testRestartStoppedMembersOnSameAddresses() { assertSuspected(cmB, cmC.member(), cmD.member()); // Restart C and D on same ports - c_Restarted = createTransport(new TransportConfig().port(c.address().port())); - d_Restarted = createTransport(new TransportConfig().port(d.address().port())); + c_Restarted = createTransport(new TransportConfig().port(parsePort(c.address()))); + d_Restarted = createTransport(new TransportConfig().port(parsePort(d.address()))); cmC_Restarted = createMembership(c_Restarted, addresses); cmD_Restarted = createMembership(d_Restarted, addresses); @@ -1047,7 +1044,7 @@ public void testNetworkPartitionManyDueNoInboundThenRemovedThenRecover() { NetworkEmulatorTransport b = createTransport(); NetworkEmulatorTransport c = createTransport(); NetworkEmulatorTransport d = createTransport(); - List
addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address()); + List addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address()); MembershipProtocolImpl cmA = createMembership(a, addresses); MembershipProtocolImpl cmB = createMembership(b, addresses); @@ -1117,7 +1114,7 @@ public void testNetworkPartitionManyDueNoInboundThenRemovedThenRecover() { } } - private static ClusterConfig testConfig(List
seedAddresses) { + private static ClusterConfig testConfig(List seedAddresses) { // Create faster config for local testing return new ClusterConfig() .membership(opts -> opts.seedMembers(seedAddresses)) @@ -1129,8 +1126,7 @@ private static ClusterConfig testConfig(List
seedAddresses) { .metadataTimeout(100); } - private MembershipProtocolImpl createMembership( - Transport transport, List
seedAddresses) { + private MembershipProtocolImpl createMembership(Transport transport, List seedAddresses) { return createMembership(transport, testConfig(seedAddresses)); } diff --git a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipRecordTest.java b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipRecordTest.java index 0a5028cb..54a9c869 100644 --- a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipRecordTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipRecordTest.java @@ -9,13 +9,12 @@ import io.scalecube.cluster.BaseTest; import io.scalecube.cluster.Member; -import io.scalecube.net.Address; import org.junit.jupiter.api.Test; public class MembershipRecordTest extends BaseTest { - private final Member member = new Member("0", "0", Address.from("localhost:1234"), "ns-0"); - private final Member anotherMember = new Member("1", "1", Address.from("localhost:4567"), "ns-1"); + private final Member member = new Member("0", "0", "localhost:1234", "ns-0"); + private final Member anotherMember = new Member("1", "1", "localhost:4567", "ns-1"); private final MembershipRecord r0Null = null; diff --git a/pom.xml b/pom.xml index e21c8b82..584671d8 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 @@ -33,8 +35,6 @@ - 1.0.24 - 1.7.36 2.17.2 2020.0.32 @@ -117,13 +117,6 @@ import - - - io.scalecube - scalecube-commons - ${scalecube-commons.version} - - com.fasterxml.jackson diff --git a/transport-parent/transport-api/pom.xml b/transport-parent/transport-api/pom.xml index 9a1f4c38..b0a2d71a 100644 --- a/transport-parent/transport-api/pom.xml +++ b/transport-parent/transport-api/pom.xml @@ -1,5 +1,7 @@ - + scalecube-transport-parent io.scalecube @@ -10,11 +12,4 @@ scalecube-transport-api ScaleCube/ClusterTransportApi - - - io.scalecube - scalecube-commons - - - diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/DistinctErrors.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/DistinctErrors.java new file mode 100644 index 00000000..a421de0a --- /dev/null +++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/DistinctErrors.java @@ -0,0 +1,139 @@ +package io.scalecube.cluster.transport.api; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +public class DistinctErrors { + + private final List distinctObservations = new ArrayList<>(); + private final long evictionInterval; + + /** Constructor. */ + public DistinctErrors() { + this(null); + } + + /** + * Constructor. + * + * @param evictionInterval optional, how long consider incoming observation as unique. + */ + public DistinctErrors(Duration evictionInterval) { + this.evictionInterval = + evictionInterval != null && evictionInterval.toMillis() > 0 + ? evictionInterval.toMillis() + : Long.MAX_VALUE; + } + + /** + * Return true if there is an observation (or at least in the eviction time window) of this error + * type for a stack trace. Otherwise a new entry will be created and kept. + * + * @param observation an error observation + * @return true if such observation exists. + */ + public boolean contains(Throwable observation) { + synchronized (this) { + final long now = System.currentTimeMillis(); + DistinctObservation distinctObservation = find(now, distinctObservations, observation); + + if (distinctObservation == null) { + distinctObservations.add(new DistinctObservation(observation, now + evictionInterval)); + return false; + } + + if (distinctObservation.deadline > now) { + distinctObservation.resetDeadline(now + evictionInterval); + return false; + } + } + + return true; + } + + private static DistinctObservation find( + long now, List existingObservations, Throwable observation) { + DistinctObservation existingObservation = null; + + for (int lastIndex = existingObservations.size() - 1, i = lastIndex; i >= 0; i--) { + final DistinctObservation o = existingObservations.get(lastIndex); + + if (equals(o.throwable, observation)) { + existingObservation = o; + break; + } + + if (o.deadline > now) { + if (i == lastIndex) { + existingObservations.remove(i); + } else { + existingObservations.set(i, existingObservations.remove(lastIndex)); + } + lastIndex--; + } + } + + return existingObservation; + } + + private static boolean equals(Throwable lhs, Throwable rhs) { + while (true) { + if (lhs == rhs) { + return true; + } + + if (lhs.getClass() == rhs.getClass() + && Objects.equals(lhs.getMessage(), rhs.getMessage()) + && equals(lhs.getStackTrace(), rhs.getStackTrace())) { + lhs = lhs.getCause(); + rhs = rhs.getCause(); + + if (null == lhs && null == rhs) { + return true; + } else if (null != lhs && null != rhs) { + continue; + } + } + + return false; + } + } + + private static boolean equals( + StackTraceElement[] lhsStackTrace, StackTraceElement[] rhsStackTrace) { + if (lhsStackTrace.length != rhsStackTrace.length) { + return false; + } + + for (int i = 0, length = lhsStackTrace.length; i < length; i++) { + final StackTraceElement lhs = lhsStackTrace[i]; + final StackTraceElement rhs = rhsStackTrace[i]; + + if (lhs.getLineNumber() != rhs.getLineNumber() + || !lhs.getClassName().equals(rhs.getClassName()) + || !Objects.equals(lhs.getMethodName(), rhs.getMethodName()) + || !Objects.equals(lhs.getFileName(), rhs.getFileName())) { + return false; + } + } + + return true; + } + + private static final class DistinctObservation { + + private final Throwable throwable; + private long deadline; + + DistinctObservation(Throwable throwable, long deadline) { + this.throwable = throwable; + this.deadline = deadline; + } + + void resetDeadline(long deadline) { + this.deadline = deadline; + } + } +} diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java index b5e3e879..96f5105f 100644 --- a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java +++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java @@ -1,6 +1,5 @@ package io.scalecube.cluster.transport.api; -import io.scalecube.net.Address; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; @@ -33,8 +32,8 @@ public final class Message implements Externalizable { public static final String HEADER_CORRELATION_ID = "cid"; /** - * This header represents sender address of type {@link Address}. It's an address of message - * originator. This header is optional. + * This header represents sender address. It is an address of message originator. This header is + * optional. */ public static final String HEADER_SENDER = "sender"; @@ -186,12 +185,12 @@ public T data() { } /** - * Returns {@link Address} of the sender of this message. + * Returns address of the sender of this message. * - * @return address + * @return address, or null */ - public Address sender() { - return Optional.ofNullable(header(HEADER_SENDER)).map(Address::from).orElse(null); + public String sender() { + return Optional.ofNullable(header(HEADER_SENDER)).orElse(null); } @Override @@ -281,8 +280,8 @@ public Builder correlationId(String correlationId) { return header(HEADER_CORRELATION_ID, correlationId); } - public Builder sender(Address sender) { - return header(HEADER_SENDER, sender.toString()); + public Builder sender(String sender) { + return header(HEADER_SENDER, sender); } public Message build() { diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Transport.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Transport.java index 2cd0b025..f8ad2d18 100644 --- a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Transport.java +++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Transport.java @@ -1,7 +1,8 @@ package io.scalecube.cluster.transport.api; -import io.scalecube.net.Address; import java.util.Objects; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -12,13 +13,14 @@ */ public interface Transport { + Pattern ADDRESS_FORMAT = Pattern.compile("(?^.*):(?\\d+$)"); + /** - * Returns local {@link Address} on which current instance of transport listens for incoming - * messages. + * Returns local address on which current instance of transport listens for incoming messages. * * @return address */ - Address address(); + String address(); /** * Start transport. After this call method {@link #address()} shall be eligible for calling. @@ -50,7 +52,7 @@ public interface Transport { * @return promise which will be completed with result of sending (void or exception) * @throws IllegalArgumentException if {@code message} or {@code address} is null */ - Mono send(Address address, Message message); + Mono send(String address, Message message); /** * Sends message to the given address. It will issue connect in case if no transport channel by @@ -62,7 +64,7 @@ public interface Transport { * @return promise which will be completed with result of sending (message or exception) * @throws IllegalArgumentException if {@code message} or {@code address} is null */ - Mono requestResponse(Address address, Message request); + Mono requestResponse(String address, Message request); /** * Returns stream of received messages. For each observers subscribed to the returned observable: @@ -124,4 +126,54 @@ static Mono bind(TransportConfig config) { Objects.requireNonNull(config.transportFactory(), "[bind] transportFactory"); return config.transportFactory().createTransport(config).start(); } + + /** + * Parses string in format {@code host:port} and returns host part. + * + * @param address address, must be string in format {@code host:port} + * @return address host, or throwing exception + */ + static String parseHost(String address) { + if (address == null || address.isEmpty()) { + throw new IllegalArgumentException("Cannot parse address host from: " + address); + } + + Matcher matcher = ADDRESS_FORMAT.matcher(address); + if (!matcher.find()) { + throw new IllegalArgumentException("Cannot parse address host from: " + address); + } + + String host = matcher.group(1); + if (host == null || host.isEmpty()) { + throw new IllegalArgumentException("Cannot parse address host from: " + address); + } + + return host; + } + + /** + * Parses string in format {@code host:port} and returns port part. + * + * @param address address, must be string in format {@code host:port} + * @return address port, or throwing exception + */ + static int parsePort(String address) { + if (address == null || address.isEmpty()) { + throw new IllegalArgumentException("Cannot parse address port from: " + address); + } + + Matcher matcher = ADDRESS_FORMAT.matcher(address); + if (!matcher.find()) { + throw new IllegalArgumentException("Cannot parse address port from: " + address); + } + + int port; + try { + port = Integer.parseInt(matcher.group(2)); + } catch (NumberFormatException ex) { + throw new IllegalArgumentException("Cannot parse address port from: " + address, ex); + } + + return port; + } } diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java index 06129f83..b6837f9e 100644 --- a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java +++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java @@ -1,6 +1,5 @@ package io.scalecube.cluster.transport.api; -import io.scalecube.net.Address; import java.util.StringJoiner; import java.util.function.Function; import reactor.core.Exceptions; @@ -22,7 +21,7 @@ public final class TransportConfig implements Cloneable { private MessageCodec messageCodec = MessageCodec.INSTANCE; private int maxFrameLength = 2 * 1024 * 1024; // 2 MB private TransportFactory transportFactory; - private Function addressMapper = Function.identity(); + private Function addressMapper = Function.identity(); public TransportConfig() {} @@ -143,13 +142,13 @@ public TransportConfig maxFrameLength(int maxFrameLength) { * @param addressMapper address mapper * @return new {@code TransportConfig} instance */ - public TransportConfig addressMapper(Function addressMapper) { + public TransportConfig addressMapper(Function addressMapper) { TransportConfig t = clone(); t.addressMapper = addressMapper; return t; } - public Function addressMapper() { + public Function addressMapper() { return addressMapper; } diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/Sender.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/Sender.java index 5c0cbdd6..3d5ef35c 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/Sender.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/Sender.java @@ -1,13 +1,12 @@ package io.scalecube.transport.netty; import io.scalecube.cluster.transport.api.Message; -import io.scalecube.net.Address; import reactor.core.publisher.Mono; import reactor.netty.Connection; public interface Sender { - Mono connect(Address address); + Mono connect(String address); Mono send(Message message); } diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java index 9838c68d..bcb93ad8 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java @@ -1,6 +1,6 @@ package io.scalecube.transport.netty; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -10,13 +10,13 @@ import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.EncoderException; import io.netty.util.ReferenceCountUtil; +import io.scalecube.cluster.transport.api.DistinctErrors; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.MessageCodec; import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.errors.DistinctErrors; -import io.scalecube.net.Address; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.time.Duration; import java.util.Map; import java.util.Objects; @@ -36,6 +36,7 @@ public final class TransportImpl implements Transport { private static final Logger LOGGER = LoggerFactory.getLogger(Transport.class); + private static final DistinctErrors DISTINCT_ERRORS = new DistinctErrors(Duration.ofMinutes(1)); private final MessageCodec messageCodec; @@ -48,15 +49,15 @@ public final class TransportImpl implements Transport { private final Sinks.One onStop = Sinks.one(); // Server - private Address address; + private String address; private DisposableServer server; - private final Map> connections = new ConcurrentHashMap<>(); + private final Map> connections = new ConcurrentHashMap<>(); private final LoopResources loopResources = LoopResources.create("sc-cluster-io", 1, true); // Transport factory private final Receiver receiver; private final Sender sender; - private final Function addressMapper; + private final Function addressMapper; /** * Constructor with config as parameter. @@ -72,21 +73,30 @@ public TransportImpl( MessageCodec messageCodec, Receiver receiver, Sender sender, - Function addressMapper) { + Function addressMapper) { this.messageCodec = messageCodec; this.receiver = receiver; this.sender = sender; this.addressMapper = addressMapper; } - private static Address prepareAddress(DisposableServer server) { + private static String prepareAddress(DisposableServer server) { final InetSocketAddress serverAddress = (InetSocketAddress) server.address(); - InetAddress inetAddress = serverAddress.getAddress(); - int port = serverAddress.getPort(); + final InetAddress inetAddress = serverAddress.getAddress(); + final int port = serverAddress.getPort(); + if (inetAddress.isAnyLocalAddress()) { - return Address.create(Address.getLocalIpAddress().getHostAddress(), port); + return getLocalHostAddress() + ":" + port; } else { - return Address.create(inetAddress.getHostAddress(), port); + return inetAddress.getHostAddress() + ":" + port; + } + } + + private static String getLocalHostAddress() { + try { + return InetAddress.getLocalHost().getHostAddress(); + } catch (UnknownHostException e) { + throw new RuntimeException(e); } } @@ -96,7 +106,7 @@ private void init(DisposableServer server) { // Setup cleanup stop.asMono() .then(doStop()) - .doFinally(s -> onStop.emitEmpty(RETRY_NON_SERIALIZED)) + .doFinally(s -> onStop.emitEmpty(busyLooping(Duration.ofSeconds(3)))) .subscribe( null, ex -> LOGGER.warn("[{}][doStop] Exception occurred: {}", address, ex.toString())); } @@ -122,7 +132,7 @@ public Mono start() { } @Override - public Address address() { + public String address() { return address; } @@ -132,10 +142,10 @@ public boolean isStopped() { } @Override - public final Mono stop() { + public Mono stop() { return Mono.defer( () -> { - stop.emitEmpty(RETRY_NON_SERIALIZED); + stop.emitEmpty(busyLooping(Duration.ofSeconds(3))); return onStop.asMono(); }); } @@ -145,7 +155,7 @@ private Mono doStop() { () -> { LOGGER.info("[{}][doStop] Stopping", address); // Complete incoming messages observable - sink.emitComplete(RETRY_NON_SERIALIZED); + sink.emitComplete(busyLooping(Duration.ofSeconds(3))); return Flux.concatDelayError(closeServer(), shutdownLoopResources()) .then() .doFinally(s -> connections.clear()) @@ -154,12 +164,12 @@ private Mono doStop() { } @Override - public final Flux listen() { + public Flux listen() { return sink.asFlux().onBackpressureBuffer(); } @Override - public Mono send(Address address, Message message) { + public Mono send(String address, Message message) { return Mono.deferContextual(context -> connections.computeIfAbsent(address, this::connect)) .flatMap( connection -> @@ -172,7 +182,7 @@ public Mono send(Address address, Message message) { } @Override - public Mono requestResponse(Address address, final Message request) { + public Mono requestResponse(String address, final Message request) { return Mono.create( sink -> { Objects.requireNonNull(request, "request must be not null"); @@ -224,8 +234,8 @@ private ByteBuf encodeMessage(Message message) { return byteBuf; } - private Mono connect(Address remoteAddress) { - final Address mappedAddr = addressMapper.apply(remoteAddress); + private Mono connect(String remoteAddress) { + final String mappedAddr = addressMapper.apply(remoteAddress); return sender .connect(mappedAddr) .doOnSuccess( @@ -277,13 +287,13 @@ private Mono shutdownLoopResources() { public static final class ReceiverContext { - private final Address address; + private final String address; private final Sinks.Many sink; private final LoopResources loopResources; private final Function messageDecoder; private ReceiverContext( - Address address, + String address, Sinks.Many sink, LoopResources loopResources, Function messageDecoder) { @@ -313,7 +323,7 @@ public void onMessage(ByteBuf byteBuf) { return; } final Message message = messageDecoder.apply(byteBuf); - sink.emitNext(message, RETRY_NON_SERIALIZED); + sink.emitNext(message, busyLooping(Duration.ofSeconds(3))); } catch (Exception e) { LOGGER.error("[{}][onMessage] Exception occurred:", address, e); } diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java index 708d1a95..80450f31 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java @@ -1,9 +1,11 @@ package io.scalecube.transport.netty.tcp; +import static io.scalecube.cluster.transport.api.Transport.parseHost; +import static io.scalecube.cluster.transport.api.Transport.parsePort; + import io.netty.channel.ChannelOption; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.TransportConfig; -import io.scalecube.net.Address; import io.scalecube.transport.netty.Sender; import io.scalecube.transport.netty.TransportImpl.SenderContext; import reactor.core.publisher.Mono; @@ -19,7 +21,7 @@ public final class TcpSender implements Sender { } @Override - public Mono connect(Address address) { + public Mono connect(String address) { return Mono.deferContextual(context -> Mono.just(context.get(SenderContext.class))) .map(context -> newTcpClient(context, address)) .flatMap(TcpClient::connect); @@ -38,12 +40,12 @@ public Mono send(Message message) { }); } - private TcpClient newTcpClient(SenderContext context, Address address) { + private TcpClient newTcpClient(SenderContext context, String address) { TcpClient tcpClient = TcpClient.newConnection() .runOn(context.loopResources()) - .host(address.host()) - .port(address.port()) + .host(parseHost(address)) + .port(parsePort(address)) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_REUSEADDR, true) diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java index 704a7f3d..34762917 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java @@ -1,10 +1,12 @@ package io.scalecube.transport.netty.websocket; +import static io.scalecube.cluster.transport.api.Transport.parseHost; +import static io.scalecube.cluster.transport.api.Transport.parsePort; + import io.netty.channel.ChannelOption; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.TransportConfig; -import io.scalecube.net.Address; import io.scalecube.transport.netty.Sender; import io.scalecube.transport.netty.TransportImpl.SenderContext; import reactor.core.publisher.Mono; @@ -21,7 +23,7 @@ public WebsocketSender(TransportConfig config) { } @Override - public Mono connect(Address address) { + public Mono connect(String address) { return Mono.deferContextual(context -> Mono.just(context.get(SenderContext.class))) .map(context -> newWebsocketSender(context, address)) .flatMap(sender -> sender.uri("/").connect()); @@ -44,12 +46,12 @@ public Mono send(Message message) { }); } - private HttpClient.WebsocketSender newWebsocketSender(SenderContext context, Address address) { + private HttpClient.WebsocketSender newWebsocketSender(SenderContext context, String address) { HttpClient httpClient = HttpClient.newConnection() .runOn(context.loopResources()) - .host(address.host()) - .port(address.port()) + .host(parseHost(address)) + .port(parsePort(address)) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_REUSEADDR, true) diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java index 3d07e84d..a8638bf8 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java @@ -4,7 +4,6 @@ import io.scalecube.cluster.transport.api.Transport; import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.cluster.utils.NetworkEmulatorTransport; -import io.scalecube.net.Address; import io.scalecube.transport.netty.tcp.TcpTransportFactory; import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.time.Duration; @@ -37,7 +36,7 @@ public final void baseTearDown(TestInfo testInfo) { * @param to destination * @param msg request */ - protected Mono send(Transport transport, Address to, Message msg) { + protected Mono send(Transport transport, String to, Message msg) { return transport .send(to, msg) .doOnError( diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportSendOrderTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportSendOrderTest.java index 753622d7..417c6f0b 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportSendOrderTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportSendOrderTest.java @@ -4,7 +4,6 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.net.Address; import io.scalecube.transport.netty.BaseTest; import java.time.Duration; import java.util.ArrayList; @@ -220,7 +219,7 @@ private void assertSendOrder(int total, List received) { } } - private Callable sender(int id, Transport client, Address address, int total) { + private Callable sender(int id, Transport client, String address, int total) { return () -> { for (int j = 0; j < total; j++) { String correlationId = id + "/" + j; diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java index f8bb8daa..9c5ed4b9 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java @@ -8,7 +8,6 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.utils.NetworkEmulatorTransport; -import io.scalecube.net.Address; import io.scalecube.transport.netty.BaseTest; import java.io.IOException; import java.net.UnknownHostException; @@ -45,7 +44,7 @@ public void testUnresolvedHostConnection() { client = createTcpTransport(); // create transport with wrong host try { - Address address = Address.from("wronghost:49255"); + String address = "wronghost:49255"; Message message = Message.withData("q").build(); client.send(address, message).block(Duration.ofSeconds(20)); fail("fail"); @@ -57,7 +56,7 @@ public void testUnresolvedHostConnection() { @Test public void testInteractWithNoConnection(TestInfo testInfo) { - Address serverAddress = Address.from("localhost:49255"); + String serverAddress = "localhost:49255"; for (int i = 0; i < 10; i++) { LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); @@ -94,7 +93,7 @@ public void testPingPongClientTfListenAndServerTfListen() throws Exception { .listen() .subscribe( message -> { - Address address = message.sender(); + String address = message.sender(); assertEquals(client.address(), address, "Expected clientAddress"); send(server, address, Message.fromQualifier("hi client")).subscribe(); }); diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportSendOrderTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportSendOrderTest.java index ab17c5b1..8ed11235 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportSendOrderTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportSendOrderTest.java @@ -4,7 +4,6 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.net.Address; import io.scalecube.transport.netty.BaseTest; import java.time.Duration; import java.util.ArrayList; @@ -220,7 +219,7 @@ private void assertSendOrder(int total, List received) { } } - private Callable sender(int id, Transport client, Address address, int total) { + private Callable sender(int id, Transport client, String address, int total) { return () -> { for (int j = 0; j < total; j++) { String correlationId = id + "/" + j; diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java index 050474af..9d5867c4 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java @@ -8,7 +8,6 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.utils.NetworkEmulatorTransport; -import io.scalecube.net.Address; import io.scalecube.transport.netty.BaseTest; import java.io.IOException; import java.net.UnknownHostException; @@ -45,7 +44,7 @@ public void testUnresolvedHostConnection() { client = createWebsocketTransport(); // create transport with wrong host try { - Address address = Address.from("wronghost:49255"); + String address = "wronghost:49255"; Message message = Message.withData("q").build(); client.send(address, message).block(Duration.ofSeconds(20)); fail("fail"); @@ -57,7 +56,7 @@ public void testUnresolvedHostConnection() { @Test public void testInteractWithNoConnection(TestInfo testInfo) { - Address serverAddress = Address.from("localhost:49255"); + String serverAddress = "localhost:49255"; for (int i = 0; i < 10; i++) { LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); @@ -94,7 +93,7 @@ public void testPingPongClientTfListenAndServerTfListen() throws Exception { .listen() .subscribe( message -> { - Address address = message.sender(); + String address = message.sender(); assertEquals(client.address(), address, "Expected clientAddress"); send(server, address, Message.fromQualifier("hi client")).subscribe(); });