Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get rid of address #394

Merged
merged 2 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions cluster-api/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand All @@ -12,10 +14,6 @@
<name>ScaleCube/ClusterApi</name>

<dependencies>
<dependency>
<groupId>io.scalecube</groupId>
<artifactId>scalecube-commons</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>scalecube-transport-api</artifactId>
Expand Down
9 changes: 4 additions & 5 deletions cluster-api/src/main/java/io/scalecube/cluster/Cluster.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.scalecube.cluster;

import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
import java.util.Collection;
import java.util.Optional;
import reactor.core.publisher.Mono;
Expand All @@ -10,11 +9,11 @@
public interface Cluster {

/**
* Returns {@link Address} of this cluster instance.
* Returns address of this cluster instance.
*
* @return cluster address
*/
Address address();
String address();

/**
* Spreads given message between cluster members using gossiping protocol.
Expand Down Expand Up @@ -52,15 +51,15 @@ public interface Cluster {
*
* @return member by id
*/
Optional<Member> member(String id);
Optional<Member> memberById(String id);

/**
* Returns cluster member by given address or null if no member with such address exists at joined
* cluster.
*
* @return member by address
*/
Optional<Member> member(Address address);
Optional<Member> memberByAddress(String address);

/**
* Returns list of all members of the joined cluster. This will include all cluster members
Expand Down
11 changes: 5 additions & 6 deletions cluster-api/src/main/java/io/scalecube/cluster/Member.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.scalecube.cluster;

import io.scalecube.cluster.membership.MembershipConfig;
import io.scalecube.net.Address;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
Expand All @@ -20,7 +19,7 @@ public final class Member implements Externalizable {

private String id;
private String alias;
private Address address;
private String address;
private String namespace;

public Member() {}
Expand All @@ -33,7 +32,7 @@ public Member() {}
* @param address member address; not null
* @param namespace namespace; not null
*/
public Member(String id, String alias, Address address, String namespace) {
public Member(String id, String alias, String address, String namespace) {
this.id = Objects.requireNonNull(id, "member id");
this.alias = alias; // optional
this.address = Objects.requireNonNull(address, "member address");
Expand Down Expand Up @@ -76,7 +75,7 @@ public String namespace() {
* @see io.scalecube.cluster.transport.api.TransportConfig#port(int)
* @return member address
*/
public Address address() {
public String address() {
return address;
}

Expand Down Expand Up @@ -110,7 +109,7 @@ public void writeExternal(ObjectOutput out) throws IOException {
out.writeUTF(alias);
}
// address
out.writeUTF(address.toString());
out.writeUTF(address);
// namespace
out.writeUTF(namespace);
}
Expand All @@ -125,7 +124,7 @@ public void readExternal(ObjectInput in) throws IOException {
alias = in.readUTF();
}
// address
address = Address.from(in.readUTF());
address = in.readUTF();
// namespace
this.namespace = in.readUTF();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.scalecube.cluster.membership;

import io.scalecube.net.Address;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -24,7 +23,7 @@ public final class MembershipConfig implements Cloneable {
public static final int DEFAULT_LOCAL_SUSPICION_MULT = 3;
public static final int DEFAULT_LOCAL_SYNC_INTERVAL = 15_000;

private List<Address> seedMembers = Collections.emptyList();
private List<String> seedMembers = Collections.emptyList();
private int syncInterval = DEFAULT_SYNC_INTERVAL;
private int syncTimeout = DEFAULT_SYNC_TIMEOUT;
private int suspicionMult = DEFAULT_SUSPICION_MULT;
Expand Down Expand Up @@ -67,7 +66,7 @@ public static MembershipConfig defaultLocalConfig() {
.syncInterval(DEFAULT_LOCAL_SYNC_INTERVAL);
}

public List<Address> seedMembers() {
public List<String> seedMembers() {
return seedMembers;
}

Expand All @@ -77,7 +76,7 @@ public List<Address> seedMembers() {
* @param seedMembers seed members
* @return new {@code MembershipConfig} instance
*/
public MembershipConfig seedMembers(Address... seedMembers) {
public MembershipConfig seedMembers(String... seedMembers) {
return seedMembers(Arrays.asList(seedMembers));
}

Expand All @@ -87,7 +86,7 @@ public MembershipConfig seedMembers(Address... seedMembers) {
* @param seedMembers seed members
* @return new {@code MembershipConfig} instance
*/
public MembershipConfig seedMembers(List<Address> seedMembers) {
public MembershipConfig seedMembers(List<String> seedMembers) {
MembershipConfig m = clone();
m.seedMembers = Collections.unmodifiableList(new ArrayList<>(seedMembers));
return m;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.scalecube.cluster.utils;

import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -30,21 +29,21 @@ public final class NetworkEmulator {
private volatile OutboundSettings defaultOutboundSettings = new OutboundSettings(0, 0);
private volatile InboundSettings defaultInboundSettings = new InboundSettings(true);

private final Map<Address, OutboundSettings> outboundSettings = new ConcurrentHashMap<>();
private final Map<Address, InboundSettings> inboundSettings = new ConcurrentHashMap<>();
private final Map<String, OutboundSettings> outboundSettings = new ConcurrentHashMap<>();
private final Map<String, InboundSettings> inboundSettings = new ConcurrentHashMap<>();

private final AtomicLong totalMessageSentCount = new AtomicLong();
private final AtomicLong totalOutboundMessageLostCount = new AtomicLong();
private final AtomicLong totalInboundMessageLostCount = new AtomicLong();

private final Address address;
private final String address;

/**
* Creates new instance of network emulator.
*
* @param address local address
*/
NetworkEmulator(Address address) {
NetworkEmulator(String address) {
this.address = address;
}

Expand All @@ -56,7 +55,7 @@ public final class NetworkEmulator {
* @param destination address of target endpoint
* @return network outbound settings
*/
public OutboundSettings outboundSettings(Address destination) {
public OutboundSettings outboundSettings(String destination) {
return outboundSettings.getOrDefault(destination, defaultOutboundSettings);
}

Expand All @@ -67,7 +66,7 @@ public OutboundSettings outboundSettings(Address destination) {
* @param lossPercent loss in percents
* @param meanDelay mean delay
*/
public void outboundSettings(Address destination, int lossPercent, int meanDelay) {
public void outboundSettings(String destination, int lossPercent, int meanDelay) {
OutboundSettings settings = new OutboundSettings(lossPercent, meanDelay);
outboundSettings.put(destination, settings);
LOGGER.debug("[{}] Set outbound settings {} to {}", address, settings, destination);
Expand Down Expand Up @@ -103,7 +102,7 @@ public void unblockAllOutbound() {
*
* @param destinations collection of target endpoints where to apply
*/
public void blockOutbound(Address... destinations) {
public void blockOutbound(String... destinations) {
blockOutbound(Arrays.asList(destinations));
}

Expand All @@ -112,8 +111,8 @@ public void blockOutbound(Address... destinations) {
*
* @param destinations collection of target endpoints where to apply
*/
public void blockOutbound(Collection<Address> destinations) {
for (Address destination : destinations) {
public void blockOutbound(Collection<String> destinations) {
for (String destination : destinations) {
outboundSettings.put(destination, new OutboundSettings(100, 0));
}
LOGGER.debug("[{}] Blocked outbound to {}", address, destinations);
Expand All @@ -124,7 +123,7 @@ public void blockOutbound(Collection<Address> destinations) {
*
* @param destinations collection of target endpoints where to apply
*/
public void unblockOutbound(Address... destinations) {
public void unblockOutbound(String... destinations) {
unblockOutbound(Arrays.asList(destinations));
}

Expand All @@ -133,7 +132,7 @@ public void unblockOutbound(Address... destinations) {
*
* @param destinations collection of target endpoints where to apply
*/
public void unblockOutbound(Collection<Address> destinations) {
public void unblockOutbound(Collection<String> destinations) {
destinations.forEach(outboundSettings::remove);
LOGGER.debug("[{}] Unblocked outbound {}", address, destinations);
}
Expand Down Expand Up @@ -164,7 +163,7 @@ public long totalOutboundMessageLostCount() {
* @param address target address
* @return mono message
*/
public Mono<Message> tryFailOutbound(Message msg, Address address) {
public Mono<Message> tryFailOutbound(Message msg, String address) {
return Mono.defer(
() -> {
totalMessageSentCount.incrementAndGet();
Expand All @@ -187,7 +186,7 @@ public Mono<Message> tryFailOutbound(Message msg, Address address) {
* @param address target address
* @return mono message
*/
public Mono<Message> tryDelayOutbound(Message msg, Address address) {
public Mono<Message> tryDelayOutbound(Message msg, String address) {
return Mono.defer(
() -> {
totalMessageSentCount.incrementAndGet();
Expand All @@ -209,7 +208,7 @@ public Mono<Message> tryDelayOutbound(Message msg, Address address) {
* @param destination address of target endpoint
* @return network inbound settings
*/
public InboundSettings inboundSettings(Address destination) {
public InboundSettings inboundSettings(String destination) {
return inboundSettings.getOrDefault(destination, defaultInboundSettings);
}

Expand All @@ -218,7 +217,7 @@ public InboundSettings inboundSettings(Address destination) {
*
* @param shallPass shallPass inbound flag
*/
public void inboundSettings(Address destination, boolean shallPass) {
public void inboundSettings(String destination, boolean shallPass) {
InboundSettings settings = new InboundSettings(shallPass);
inboundSettings.put(destination, settings);
LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination);
Expand Down Expand Up @@ -253,7 +252,7 @@ public void unblockAllInbound() {
*
* @param destinations collection of target endpoints where to apply
*/
public void blockInbound(Address... destinations) {
public void blockInbound(String... destinations) {
blockInbound(Arrays.asList(destinations));
}

Expand All @@ -262,8 +261,8 @@ public void blockInbound(Address... destinations) {
*
* @param destinations collection of target endpoints where to apply
*/
public void blockInbound(Collection<Address> destinations) {
for (Address destination : destinations) {
public void blockInbound(Collection<String> destinations) {
for (String destination : destinations) {
inboundSettings.put(destination, new InboundSettings(false));
}
LOGGER.debug("[{}] Blocked inbound from {}", address, destinations);
Expand All @@ -274,7 +273,7 @@ public void blockInbound(Collection<Address> destinations) {
*
* @param destinations collection of target endpoints where to apply
*/
public void unblockInbound(Address... destinations) {
public void unblockInbound(String... destinations) {
unblockInbound(Arrays.asList(destinations));
}

Expand All @@ -283,7 +282,7 @@ public void unblockInbound(Address... destinations) {
*
* @param destinations collection of target endpoints where to apply
*/
public void unblockInbound(Collection<Address> destinations) {
public void unblockInbound(Collection<String> destinations) {
destinations.forEach(inboundSettings::remove);
LOGGER.debug("[{}] Unblocked inbound from {}", address, destinations);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -26,7 +25,7 @@ public NetworkEmulator networkEmulator() {
}

@Override
public Address address() {
public String address() {
return transport.address();
}

Expand All @@ -46,7 +45,7 @@ public boolean isStopped() {
}

@Override
public Mono<Void> send(Address address, Message message) {
public Mono<Void> send(String address, Message message) {
return Mono.defer(
() ->
Mono.just(enhanceWithSender(message))
Expand All @@ -56,7 +55,7 @@ public Mono<Void> send(Address address, Message message) {
}

@Override
public Mono<Message> requestResponse(Address address, Message request) {
public Mono<Message> requestResponse(String address, Message request) {
return Mono.defer(
() ->
Mono.just(enhanceWithSender(request))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.scalecube.cluster.utils;

import io.scalecube.cluster.utils.NetworkEmulator.OutboundSettings;
import io.scalecube.net.Address;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand All @@ -10,24 +9,23 @@ public class NetworkEmulatorTest extends BaseTest {
@Test
public void testResolveLinkSettingsBySocketAddress() {
// Init network emulator
Address address = Address.from("localhost:1234");
NetworkEmulator networkEmulator = new NetworkEmulator(address);
networkEmulator.outboundSettings(Address.create("localhost", 5678), 25, 10);
networkEmulator.outboundSettings(Address.create("192.168.0.1", 8765), 10, 20);
NetworkEmulator networkEmulator = new NetworkEmulator("localhost:1234");
networkEmulator.outboundSettings("localhost:" + 5678, 25, 10);
networkEmulator.outboundSettings("192.168.0.1:" + 8765, 10, 20);
networkEmulator.setDefaultOutboundSettings(0, 2);

// Check resolve by hostname:port
OutboundSettings link1 = networkEmulator.outboundSettings(Address.create("localhost", 5678));
OutboundSettings link1 = networkEmulator.outboundSettings("localhost:" + 5678);
Assertions.assertEquals(25, link1.lossPercent());
Assertions.assertEquals(10, link1.meanDelay());

// Check resolve by ipaddr:port
OutboundSettings link2 = networkEmulator.outboundSettings(Address.create("192.168.0.1", 8765));
OutboundSettings link2 = networkEmulator.outboundSettings("192.168.0.1:" + 8765);
Assertions.assertEquals(10, link2.lossPercent());
Assertions.assertEquals(20, link2.meanDelay());

// Check default link settings
OutboundSettings link3 = networkEmulator.outboundSettings(Address.create("localhost", 8765));
OutboundSettings link3 = networkEmulator.outboundSettings("localhost:" + 8765);
Assertions.assertEquals(0, link3.lossPercent());
Assertions.assertEquals(2, link3.meanDelay());
}
Expand Down
Loading
Loading