Skip to content

Commit

Permalink
Get rid of Address
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Sep 22, 2024
1 parent 5ce00b1 commit 204d309
Show file tree
Hide file tree
Showing 19 changed files with 290 additions and 560 deletions.
8 changes: 3 additions & 5 deletions cluster-api/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand All @@ -12,10 +14,6 @@
<name>ScaleCube/ClusterApi</name>

<dependencies>
<dependency>
<groupId>io.scalecube</groupId>
<artifactId>scalecube-commons</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>scalecube-transport-api</artifactId>
Expand Down
9 changes: 4 additions & 5 deletions cluster-api/src/main/java/io/scalecube/cluster/Cluster.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.scalecube.cluster;

import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
import java.util.Collection;
import java.util.Optional;
import reactor.core.publisher.Mono;
Expand All @@ -10,11 +9,11 @@
public interface Cluster {

/**
* Returns {@link Address} of this cluster instance.
* Returns address of this cluster instance.
*
* @return cluster address
*/
Address address();
String address();

/**
* Spreads given message between cluster members using gossiping protocol.
Expand Down Expand Up @@ -52,15 +51,15 @@ public interface Cluster {
*
* @return member by id
*/
Optional<Member> member(String id);
Optional<Member> memberById(String id);

/**
* Returns cluster member by given address or null if no member with such address exists at joined
* cluster.
*
* @return member by address
*/
Optional<Member> member(Address address);
Optional<Member> memberByAddress(String address);

/**
* Returns list of all members of the joined cluster. This will include all cluster members
Expand Down
11 changes: 5 additions & 6 deletions cluster-api/src/main/java/io/scalecube/cluster/Member.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.scalecube.cluster;

import io.scalecube.cluster.membership.MembershipConfig;
import io.scalecube.net.Address;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
Expand All @@ -20,7 +19,7 @@ public final class Member implements Externalizable {

private String id;
private String alias;
private Address address;
private String address;
private String namespace;

public Member() {}
Expand All @@ -33,7 +32,7 @@ public Member() {}
* @param address member address; not null
* @param namespace namespace; not null
*/
public Member(String id, String alias, Address address, String namespace) {
public Member(String id, String alias, String address, String namespace) {
this.id = Objects.requireNonNull(id, "member id");
this.alias = alias; // optional
this.address = Objects.requireNonNull(address, "member address");
Expand Down Expand Up @@ -76,7 +75,7 @@ public String namespace() {
* @see io.scalecube.cluster.transport.api.TransportConfig#port(int)
* @return member address
*/
public Address address() {
public String address() {
return address;
}

Expand Down Expand Up @@ -110,7 +109,7 @@ public void writeExternal(ObjectOutput out) throws IOException {
out.writeUTF(alias);
}
// address
out.writeUTF(address.toString());
out.writeUTF(address);
// namespace
out.writeUTF(namespace);
}
Expand All @@ -125,7 +124,7 @@ public void readExternal(ObjectInput in) throws IOException {
alias = in.readUTF();
}
// address
address = Address.from(in.readUTF());
address = in.readUTF();
// namespace
this.namespace = in.readUTF();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.scalecube.cluster.membership;

import io.scalecube.net.Address;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -24,7 +23,7 @@ public final class MembershipConfig implements Cloneable {
public static final int DEFAULT_LOCAL_SUSPICION_MULT = 3;
public static final int DEFAULT_LOCAL_SYNC_INTERVAL = 15_000;

private List<Address> seedMembers = Collections.emptyList();
private List<String> seedMembers = Collections.emptyList();
private int syncInterval = DEFAULT_SYNC_INTERVAL;
private int syncTimeout = DEFAULT_SYNC_TIMEOUT;
private int suspicionMult = DEFAULT_SUSPICION_MULT;
Expand Down Expand Up @@ -67,7 +66,7 @@ public static MembershipConfig defaultLocalConfig() {
.syncInterval(DEFAULT_LOCAL_SYNC_INTERVAL);
}

public List<Address> seedMembers() {
public List<String> seedMembers() {
return seedMembers;
}

Expand All @@ -77,7 +76,7 @@ public List<Address> seedMembers() {
* @param seedMembers seed members
* @return new {@code MembershipConfig} instance
*/
public MembershipConfig seedMembers(Address... seedMembers) {
public MembershipConfig seedMembers(String... seedMembers) {
return seedMembers(Arrays.asList(seedMembers));
}

Expand All @@ -87,7 +86,7 @@ public MembershipConfig seedMembers(Address... seedMembers) {
* @param seedMembers seed members
* @return new {@code MembershipConfig} instance
*/
public MembershipConfig seedMembers(List<Address> seedMembers) {
public MembershipConfig seedMembers(List<String> seedMembers) {
MembershipConfig m = clone();
m.seedMembers = Collections.unmodifiableList(new ArrayList<>(seedMembers));
return m;
Expand Down
8 changes: 3 additions & 5 deletions cluster/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand All @@ -12,10 +14,6 @@
<name>ScaleCube/Cluster</name>

<dependencies>
<dependency>
<groupId>io.scalecube</groupId>
<artifactId>scalecube-commons</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>scalecube-transport-api</artifactId>
Expand Down
54 changes: 28 additions & 26 deletions cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.scalecube.cluster;

import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED;
import static io.scalecube.cluster.transport.api.Transport.parseHost;
import static io.scalecube.cluster.transport.api.Transport.parsePort;
import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping;

import io.scalecube.cluster.fdetector.FailureDetectorConfig;
import io.scalecube.cluster.fdetector.FailureDetectorImpl;
Expand All @@ -16,9 +18,9 @@
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.cluster.transport.api.TransportFactory;
import io.scalecube.net.Address;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
Expand Down Expand Up @@ -111,14 +113,14 @@ private void initLifecycle() {
start
.asMono()
.then(doStart())
.doOnSuccess(avoid -> onStart.emitEmpty(RETRY_NON_SERIALIZED))
.doOnError(th -> onStart.emitError(th, RETRY_NON_SERIALIZED))
.doOnSuccess(avoid -> onStart.emitEmpty(busyLooping(Duration.ofSeconds(3))))
.doOnError(th -> onStart.emitError(th, busyLooping(Duration.ofSeconds(3))))
.subscribe(null, th -> LOGGER.error("[{}][doStart] Exception occurred:", localMember, th));

shutdown
.asMono()
.then(doShutdown())
.doFinally(s -> onShutdown.emitEmpty(RETRY_NON_SERIALIZED))
.doFinally(s -> onShutdown.emitEmpty(busyLooping(Duration.ofSeconds(3))))
.subscribe(
null,
th ->
Expand Down Expand Up @@ -224,7 +226,7 @@ public ClusterImpl handler(Function<Cluster, ClusterMessageHandler> handler) {
public Mono<Cluster> start() {
return Mono.defer(
() -> {
start.emitEmpty(RETRY_NON_SERIALIZED);
start.emitEmpty(busyLooping(Duration.ofSeconds(3)));
return onStart.asMono().thenReturn(this);
});
}
Expand All @@ -241,10 +243,10 @@ private Mono<Cluster> doStart0() {
return Transport.bind(config.transportConfig())
.flatMap(
boundTransport -> {
localMember = createLocalMember(Address.from(boundTransport.address()));
localMember = createLocalMember(boundTransport.address());
transport = new SenderAwareTransport(boundTransport, localMember.address());

scheduler = Schedulers.newSingle("sc-cluster-" + localMember.address().port(), true);
scheduler = Schedulers.newSingle("sc-cluster-" + localMember.address(), true);

failureDetector =
new FailureDetectorImpl(
Expand Down Expand Up @@ -283,9 +285,10 @@ private Mono<Cluster> doStart0() {
/*.publishOn(scheduler)*/
// Dont uncomment, already beign executed inside scalecube-cluster thread
.subscribe(
event -> membershipSink.emitNext(event, RETRY_NON_SERIALIZED),
event ->
membershipSink.emitNext(event, busyLooping(Duration.ofSeconds(3))),
ex -> LOGGER.error("[{}][membership][error] cause:", localMember, ex),
() -> membershipSink.emitComplete(RETRY_NON_SERIALIZED)));
() -> membershipSink.emitComplete(busyLooping(Duration.ofSeconds(3)))));

return Mono.fromRunnable(() -> failureDetector.start())
.then(Mono.fromRunnable(() -> gossip.start()))
Expand Down Expand Up @@ -371,14 +374,13 @@ private Flux<MembershipEvent> listenMembership() {
* @param address transport address
* @return local cluster member with cluster address and cluster member id
*/
private Member createLocalMember(Address address) {
int port = Optional.ofNullable(config.externalPort()).orElse(address.port());
private Member createLocalMember(String address) {
final String finalHost =
Optional.ofNullable(config.externalHost()).orElseGet(() -> parseHost(address));
final int finalPort =
Optional.ofNullable(config.externalPort()).orElseGet(() -> parsePort(address));

// calculate local member cluster address
Address memberAddress =
Optional.ofNullable(config.externalHost())
.map(host -> Address.create(host, port))
.orElseGet(() -> Address.create(address.host(), port));
final String memberAddress = finalHost + ":" + finalPort;

return new Member(
config.memberId() != null ? config.memberId() : UUID.randomUUID().toString(),
Expand All @@ -388,7 +390,7 @@ private Member createLocalMember(Address address) {
}

@Override
public Address address() {
public String address() {
return member().address();
}

Expand Down Expand Up @@ -431,13 +433,13 @@ public Member member() {
}

@Override
public Optional<Member> member(String id) {
return membership.member(id);
public Optional<Member> memberById(String id) {
return membership.memberById(id);
}

@Override
public Optional<Member> member(Address address) {
return membership.member(address);
public Optional<Member> memberByAddress(String address) {
return membership.memberByAddress(address);
}

@Override
Expand All @@ -449,7 +451,7 @@ public <T> Mono<Void> updateMetadata(T metadata) {

@Override
public void shutdown() {
shutdown.emitEmpty(RETRY_NON_SERIALIZED);
shutdown.emitEmpty(busyLooping(Duration.ofSeconds(3)));
}

private Mono<Void> doShutdown() {
Expand Down Expand Up @@ -498,9 +500,9 @@ public Mono<Void> onShutdown() {
private static class SenderAwareTransport implements Transport {

private final Transport transport;
private final Address address;
private final String address;

private SenderAwareTransport(Transport transport, Address address) {
private SenderAwareTransport(Transport transport, String address) {
this.transport = Objects.requireNonNull(transport);
this.address = Objects.requireNonNull(address);
}
Expand Down Expand Up @@ -541,7 +543,7 @@ public Flux<Message> listen() {
}

private Message enhanceWithSender(Message message) {
return Message.with(message).sender(address.toString()).build();
return Message.with(message).sender(address).build();
}
}
}
Loading

0 comments on commit 204d309

Please sign in to comment.