Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Sep 22, 2024
1 parent a2812cf commit 5ce00b1
Show file tree
Hide file tree
Showing 26 changed files with 698 additions and 249 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.scalecube.cluster.utils;

import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -30,21 +29,21 @@ public final class NetworkEmulator {
private volatile OutboundSettings defaultOutboundSettings = new OutboundSettings(0, 0);
private volatile InboundSettings defaultInboundSettings = new InboundSettings(true);

private final Map<Address, OutboundSettings> outboundSettings = new ConcurrentHashMap<>();
private final Map<Address, InboundSettings> inboundSettings = new ConcurrentHashMap<>();
private final Map<String, OutboundSettings> outboundSettings = new ConcurrentHashMap<>();
private final Map<String, InboundSettings> inboundSettings = new ConcurrentHashMap<>();

private final AtomicLong totalMessageSentCount = new AtomicLong();
private final AtomicLong totalOutboundMessageLostCount = new AtomicLong();
private final AtomicLong totalInboundMessageLostCount = new AtomicLong();

private final Address address;
private final String address;

/**
* Creates new instance of network emulator.
*
* @param address local address
*/
NetworkEmulator(Address address) {
NetworkEmulator(String address) {
this.address = address;
}

Expand All @@ -56,7 +55,7 @@ public final class NetworkEmulator {
* @param destination address of target endpoint
* @return network outbound settings
*/
public OutboundSettings outboundSettings(Address destination) {
public OutboundSettings outboundSettings(String destination) {
return outboundSettings.getOrDefault(destination, defaultOutboundSettings);
}

Expand All @@ -67,7 +66,7 @@ public OutboundSettings outboundSettings(Address destination) {
* @param lossPercent loss in percents
* @param meanDelay mean delay
*/
public void outboundSettings(Address destination, int lossPercent, int meanDelay) {
public void outboundSettings(String destination, int lossPercent, int meanDelay) {
OutboundSettings settings = new OutboundSettings(lossPercent, meanDelay);
outboundSettings.put(destination, settings);
LOGGER.debug("[{}] Set outbound settings {} to {}", address, settings, destination);
Expand Down Expand Up @@ -103,7 +102,7 @@ public void unblockAllOutbound() {
*
* @param destinations collection of target endpoints where to apply
*/
public void blockOutbound(Address... destinations) {
public void blockOutbound(String... destinations) {
blockOutbound(Arrays.asList(destinations));
}

Expand All @@ -112,8 +111,8 @@ public void blockOutbound(Address... destinations) {
*
* @param destinations collection of target endpoints where to apply
*/
public void blockOutbound(Collection<Address> destinations) {
for (Address destination : destinations) {
public void blockOutbound(Collection<String> destinations) {
for (String destination : destinations) {
outboundSettings.put(destination, new OutboundSettings(100, 0));
}
LOGGER.debug("[{}] Blocked outbound to {}", address, destinations);
Expand All @@ -124,7 +123,7 @@ public void blockOutbound(Collection<Address> destinations) {
*
* @param destinations collection of target endpoints where to apply
*/
public void unblockOutbound(Address... destinations) {
public void unblockOutbound(String... destinations) {
unblockOutbound(Arrays.asList(destinations));
}

Expand All @@ -133,7 +132,7 @@ public void unblockOutbound(Address... destinations) {
*
* @param destinations collection of target endpoints where to apply
*/
public void unblockOutbound(Collection<Address> destinations) {
public void unblockOutbound(Collection<String> destinations) {
destinations.forEach(outboundSettings::remove);
LOGGER.debug("[{}] Unblocked outbound {}", address, destinations);
}
Expand Down Expand Up @@ -164,7 +163,7 @@ public long totalOutboundMessageLostCount() {
* @param address target address
* @return mono message
*/
public Mono<Message> tryFailOutbound(Message msg, Address address) {
public Mono<Message> tryFailOutbound(Message msg, String address) {
return Mono.defer(
() -> {
totalMessageSentCount.incrementAndGet();
Expand All @@ -187,7 +186,7 @@ public Mono<Message> tryFailOutbound(Message msg, Address address) {
* @param address target address
* @return mono message
*/
public Mono<Message> tryDelayOutbound(Message msg, Address address) {
public Mono<Message> tryDelayOutbound(Message msg, String address) {
return Mono.defer(
() -> {
totalMessageSentCount.incrementAndGet();
Expand All @@ -209,7 +208,7 @@ public Mono<Message> tryDelayOutbound(Message msg, Address address) {
* @param destination address of target endpoint
* @return network inbound settings
*/
public InboundSettings inboundSettings(Address destination) {
public InboundSettings inboundSettings(String destination) {
return inboundSettings.getOrDefault(destination, defaultInboundSettings);
}

Expand All @@ -218,7 +217,7 @@ public InboundSettings inboundSettings(Address destination) {
*
* @param shallPass shallPass inbound flag
*/
public void inboundSettings(Address destination, boolean shallPass) {
public void inboundSettings(String destination, boolean shallPass) {
InboundSettings settings = new InboundSettings(shallPass);
inboundSettings.put(destination, settings);
LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination);
Expand Down Expand Up @@ -253,7 +252,7 @@ public void unblockAllInbound() {
*
* @param destinations collection of target endpoints where to apply
*/
public void blockInbound(Address... destinations) {
public void blockInbound(String... destinations) {
blockInbound(Arrays.asList(destinations));
}

Expand All @@ -262,8 +261,8 @@ public void blockInbound(Address... destinations) {
*
* @param destinations collection of target endpoints where to apply
*/
public void blockInbound(Collection<Address> destinations) {
for (Address destination : destinations) {
public void blockInbound(Collection<String> destinations) {
for (String destination : destinations) {
inboundSettings.put(destination, new InboundSettings(false));
}
LOGGER.debug("[{}] Blocked inbound from {}", address, destinations);
Expand All @@ -274,7 +273,7 @@ public void blockInbound(Collection<Address> destinations) {
*
* @param destinations collection of target endpoints where to apply
*/
public void unblockInbound(Address... destinations) {
public void unblockInbound(String... destinations) {
unblockInbound(Arrays.asList(destinations));
}

Expand All @@ -283,7 +282,7 @@ public void unblockInbound(Address... destinations) {
*
* @param destinations collection of target endpoints where to apply
*/
public void unblockInbound(Collection<Address> destinations) {
public void unblockInbound(Collection<String> destinations) {
destinations.forEach(inboundSettings::remove);
LOGGER.debug("[{}] Unblocked inbound from {}", address, destinations);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -26,7 +25,7 @@ public NetworkEmulator networkEmulator() {
}

@Override
public Address address() {
public String address() {
return transport.address();
}

Expand All @@ -46,7 +45,7 @@ public boolean isStopped() {
}

@Override
public Mono<Void> send(Address address, Message message) {
public Mono<Void> send(String address, Message message) {
return Mono.defer(
() ->
Mono.just(enhanceWithSender(message))
Expand All @@ -56,7 +55,7 @@ public Mono<Void> send(Address address, Message message) {
}

@Override
public Mono<Message> requestResponse(Address address, Message request) {
public Mono<Message> requestResponse(String address, Message request) {
return Mono.defer(
() ->
Mono.just(enhanceWithSender(request))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.scalecube.cluster.utils;

import io.scalecube.cluster.utils.NetworkEmulator.OutboundSettings;
import io.scalecube.net.Address;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand All @@ -10,24 +9,23 @@ public class NetworkEmulatorTest extends BaseTest {
@Test
public void testResolveLinkSettingsBySocketAddress() {
// Init network emulator
Address address = Address.from("localhost:1234");
NetworkEmulator networkEmulator = new NetworkEmulator(address);
networkEmulator.outboundSettings(Address.create("localhost", 5678), 25, 10);
networkEmulator.outboundSettings(Address.create("192.168.0.1", 8765), 10, 20);
NetworkEmulator networkEmulator = new NetworkEmulator("localhost:1234");
networkEmulator.outboundSettings("localhost:" + 5678, 25, 10);
networkEmulator.outboundSettings("192.168.0.1:" + 8765, 10, 20);
networkEmulator.setDefaultOutboundSettings(0, 2);

// Check resolve by hostname:port
OutboundSettings link1 = networkEmulator.outboundSettings(Address.create("localhost", 5678));
OutboundSettings link1 = networkEmulator.outboundSettings("localhost:" + 5678);
Assertions.assertEquals(25, link1.lossPercent());
Assertions.assertEquals(10, link1.meanDelay());

// Check resolve by ipaddr:port
OutboundSettings link2 = networkEmulator.outboundSettings(Address.create("192.168.0.1", 8765));
OutboundSettings link2 = networkEmulator.outboundSettings("192.168.0.1:" + 8765);
Assertions.assertEquals(10, link2.lossPercent());
Assertions.assertEquals(20, link2.meanDelay());

// Check default link settings
OutboundSettings link3 = networkEmulator.outboundSettings(Address.create("localhost", 8765));
OutboundSettings link3 = networkEmulator.outboundSettings("localhost:" + 8765);
Assertions.assertEquals(0, link3.lossPercent());
Assertions.assertEquals(2, link3.meanDelay());
}
Expand Down
10 changes: 5 additions & 5 deletions cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ private Mono<Cluster> doStart0() {
return Transport.bind(config.transportConfig())
.flatMap(
boundTransport -> {
localMember = createLocalMember(boundTransport.address());
localMember = createLocalMember(Address.from(boundTransport.address()));
transport = new SenderAwareTransport(boundTransport, localMember.address());

scheduler = Schedulers.newSingle("sc-cluster-" + localMember.address().port(), true);
Expand Down Expand Up @@ -506,7 +506,7 @@ private SenderAwareTransport(Transport transport, Address address) {
}

@Override
public Address address() {
public String address() {
return transport.address();
}

Expand All @@ -526,12 +526,12 @@ public boolean isStopped() {
}

@Override
public Mono<Void> send(Address address, Message message) {
public Mono<Void> send(String address, Message message) {
return Mono.defer(() -> transport.send(address, enhanceWithSender(message)));
}

@Override
public Mono<Message> requestResponse(Address address, Message request) {
public Mono<Message> requestResponse(String address, Message request) {
return Mono.defer(() -> transport.requestResponse(address, enhanceWithSender(request)));
}

Expand All @@ -541,7 +541,7 @@ public Flux<Message> listen() {
}

private Message enhanceWithSender(Message message) {
return Message.with(message).sender(address).build();
return Message.with(message).sender(address.toString()).build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private void doPing() {
LOGGER.debug("[{}][{}] Send Ping to {}", localMember, period, pingMember);
Address address = pingMember.address();
transport
.requestResponse(address, pingMsg)
.requestResponse(address.toString(), pingMsg)
.timeout(Duration.ofMillis(config.pingTimeout()), scheduler)
.publishOn(scheduler)
.subscribe(
Expand Down Expand Up @@ -190,7 +190,7 @@ private void doPingReq(
pingReqMembers.forEach(
member ->
transport
.requestResponse(member.address(), pingReqMsg)
.requestResponse(member.address().toString(), pingReqMsg)
.timeout(timeout, scheduler)
.publishOn(scheduler)
.subscribe(
Expand Down Expand Up @@ -232,7 +232,7 @@ private void onMessage(Message message) {
/** Listens to PING message and answers with ACK. */
private void onPing(Message message) {
long period = this.currentPeriod;
Address sender = message.sender();
Address sender = Address.from(message.sender());
LOGGER.debug("[{}][{}] Received Ping from {}", localMember, period, sender);
PingData data = message.data();
data = data.withAckType(AckType.DEST_OK);
Expand All @@ -252,7 +252,7 @@ private void onPing(Message message) {
Address address = data.getFrom().address();
LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, address);
transport
.send(address, ackMessage)
.send(address.toString(), ackMessage)
.subscribe(
null,
ex ->
Expand All @@ -278,7 +278,7 @@ private void onPingReq(Message message) {
Address address = target.address();
LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, address);
transport
.send(address, pingMessage)
.send(address.toString(), pingMessage)
.subscribe(
null,
ex ->
Expand Down Expand Up @@ -308,7 +308,7 @@ private void onTransitPingAck(Message message) {
Address address = target.address();
LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, address);
transport
.send(address, originalAckMessage)
.send(address.toString(), originalAckMessage)
.subscribe(
null,
ex ->
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package io.scalecube.cluster.gossip;

import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED;
import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping;

import io.scalecube.cluster.ClusterMath;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -120,7 +121,7 @@ public void stop() {
actionsDisposables.dispose();

// Stop publishing events
sink.emitComplete(RETRY_NON_SERIALIZED);
sink.emitComplete(busyLooping(Duration.ofSeconds(3)));
}

@Override
Expand Down Expand Up @@ -208,7 +209,7 @@ private void onGossipRequest(Message message) {
if (gossipState == null) { // new gossip
gossipState = new GossipState(gossip, period);
gossips.put(gossip.gossipId(), gossipState);
sink.emitNext(gossip.message(), RETRY_NON_SERIALIZED);
sink.emitNext(gossip.message(), busyLooping(Duration.ofSeconds(3)));
}
}
if (gossipState != null) {
Expand Down Expand Up @@ -294,7 +295,7 @@ private void spreadGossipsTo(long period, Member member) {
.forEach(
message ->
transport
.send(address, message)
.send(address.toString(), message)
.subscribe(
null,
ex ->
Expand Down
Loading

0 comments on commit 5ce00b1

Please sign in to comment.