Skip to content

Commit

Permalink
Reinstate master-replica wording #1518
Browse files Browse the repository at this point in the history
After community feedback, we're getting back to Master/Replica from Upstream/Replica to not introduce confusion with the term Upstream.
  • Loading branch information
mp911de committed Jan 22, 2021
1 parent 61e486f commit dcccba7
Show file tree
Hide file tree
Showing 24 changed files with 181 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class PubSubClusterEndpoint<K, V> extends PubSubEndpoint<K, V> {

private final NotifyingMessageListener multicast = new NotifyingMessageListener();

private final UpstreamMessageListener upstream = new UpstreamMessageListener();
private final MasterMessageListener upstream = new MasterMessageListener();

private volatile boolean nodeMessagePropagation = false;

Expand Down Expand Up @@ -108,7 +108,7 @@ protected void notifyListeners(PubSubMessage<K, V> output) {
}
}

private class UpstreamMessageListener extends NotifyingMessageListener {
private class MasterMessageListener extends NotifyingMessageListener {

@Override
public void message(RedisClusterNode node, K channel, V message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@
import io.lettuce.core.models.role.RedisNodeDescription;

/**
* {@link UpstreamReplicaConnector} to connect unmanaged Redis Master/Replica with auto-discovering master and replica nodes
* from a single {@link RedisURI}.
* {@link MasterReplicaConnector} to connect unmanaged Redis Master/Replica with auto-discovering master and replica nodes from
* a single {@link RedisURI}.
*
* @author Mark Paluch
* @since 5.1
*/
class AutodiscoveryConnector<K, V> implements UpstreamReplicaConnector<K, V> {
class AutodiscoveryConnector<K, V> implements MasterReplicaConnector<K, V> {

private final RedisClient redisClient;

Expand Down Expand Up @@ -112,8 +112,8 @@ private Mono<StatefulRedisMasterReplicaConnection<K, V>> initializeConnection(Re
ReplicaTopologyProvider topologyProvider = new ReplicaTopologyProvider(connectionAndUri.getT2(),
connectionAndUri.getT1());

UpstreamReplicaTopologyRefresh refresh = new UpstreamReplicaTopologyRefresh(redisClient, topologyProvider);
UpstreamReplicaConnectionProvider<K, V> connectionProvider = new UpstreamReplicaConnectionProvider<>(redisClient, codec,
MasterReplicaTopologyRefresh refresh = new MasterReplicaTopologyRefresh(redisClient, topologyProvider);
MasterReplicaConnectionProvider<K, V> connectionProvider = new MasterReplicaConnectionProvider<>(redisClient, codec,
redisURI, (Map) initialConnections);

Mono<List<RedisNodeDescription>> refreshFuture = refresh.getNodes(redisURI);
Expand All @@ -122,10 +122,10 @@ private Mono<StatefulRedisMasterReplicaConnection<K, V>> initializeConnection(Re

connectionProvider.setKnownNodes(nodes);

UpstreamReplicaChannelWriter channelWriter = new UpstreamReplicaChannelWriter(connectionProvider,
MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider,
redisClient.getResources());

StatefulRedisUpstreamReplicaConnectionImpl<K, V> connection = new StatefulRedisUpstreamReplicaConnectionImpl<>(
StatefulRedisMasterReplicaConnectionImpl<K, V> connection = new StatefulRedisMasterReplicaConnectionImpl<>(
channelWriter, codec, redisURI.getTimeout());

connection.setOptions(redisClient.getOptions());
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/io/lettuce/core/masterreplica/MasterReplica.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@
* <p />
* Master-Replica topologies are either static or semi-static. Redis Standalone instances with attached replicas provide no
* failover/HA mechanism. Redis Sentinel managed instances are controlled by Redis Sentinel and allow failover (which include
* upstream promotion). The {@link MasterReplica} API supports both mechanisms. The topology is provided by a
* master promotion). The {@link MasterReplica} API supports both mechanisms. The topology is provided by a
* {@link TopologyProvider}:
*
* <ul>
* <li>{@link ReplicaTopologyProvider}: Dynamic topology lookup using the {@code INFO REPLICATION} output. Replicas are listed
* as {@code replicaN=...} entries. The initial connection can either point to a upstream or a replica and the topology provider
* as {@code replicaN=...} entries. The initial connection can either point to a master or a replica and the topology provider
* will discover nodes. The connection needs to be re-established outside of lettuce in a case of Master/Replica failover or
* topology changes.</li>
* <li>{@link StaticMasterReplicaTopologyProvider}: Topology is defined by the list of {@link RedisURI URIs} and the
Expand All @@ -83,7 +83,7 @@
*
* <ul>
* <li>Redis Sentinel: At least one Sentinel must be reachable, the masterId must be registered and at least one host must be
* available (upstream or replica). Allows for runtime-recovery based on Sentinel Events.</li>
* available (master or replica). Allows for runtime-recovery based on Sentinel Events.</li>
* <li>Static Setup (auto-discovery): The initial endpoint must be reachable. No recovery/reconfiguration during runtime.</li>
* <li>Static Setup (provided hosts): All endpoints must be reachable. No recovery/reconfiguration during runtime.</li>
* </ul>
Expand All @@ -98,7 +98,7 @@ public class MasterReplica {
* {@link RedisCodec codec} to encode/decode keys.
* <p>
* This {@link MasterReplica} performs auto-discovery of nodes using either Redis Sentinel or Master/Replica. A
* {@link RedisURI} can point to either a upstream or a replica host.
* {@link RedisURI} can point to either a master or a replica host.
* </p>
*
* @param redisClient the Redis client.
Expand All @@ -118,7 +118,7 @@ public static <K, V> StatefulRedisMasterReplicaConnection<K, V> connect(RedisCli
* supplied {@link RedisCodec codec} to encode/decode keys.
* <p>
* This {@link MasterReplica} performs auto-discovery of nodes using either Redis Sentinel or Master/Replica. A
* {@link RedisURI} can point to either a upstream or a replica host.
* {@link RedisURI} can point to either a master or a replica host.
* </p>
*
* @param redisClient the Redis client.
Expand Down Expand Up @@ -224,7 +224,7 @@ private static <K, V> CompletableFuture<StatefulRedisMasterReplicaConnection<K,
return new SentinelConnector<>(redisClient, codec, first).connectAsync();
}

return new StaticUpstreamReplicaConnector<>(redisClient, codec, uriList).connectAsync();
return new StaticMasterReplicaConnector<>(redisClient, codec, uriList).connectAsync();
}

private static boolean isSentinel(RedisURI redisURI) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.lettuce.core.RedisException;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.masterreplica.UpstreamReplicaConnectionProvider.Intent;
import io.lettuce.core.masterreplica.MasterReplicaConnectionProvider.Intent;
import io.lettuce.core.protocol.ConnectionFacade;
import io.lettuce.core.protocol.ProtocolKeyword;
import io.lettuce.core.protocol.RedisCommand;
Expand All @@ -34,19 +34,19 @@
*
* @author Mark Paluch
*/
class UpstreamReplicaChannelWriter implements RedisChannelWriter {
class MasterReplicaChannelWriter implements RedisChannelWriter {

private UpstreamReplicaConnectionProvider<?, ?> upstreamReplicaConnectionProvider;
private MasterReplicaConnectionProvider<?, ?> masterReplicaConnectionProvider;

private final ClientResources clientResources;

private boolean closed = false;

private boolean inTransaction;

UpstreamReplicaChannelWriter(UpstreamReplicaConnectionProvider<?, ?> upstreamReplicaConnectionProvider,
MasterReplicaChannelWriter(MasterReplicaConnectionProvider<?, ?> masterReplicaConnectionProvider,
ClientResources clientResources) {
this.upstreamReplicaConnectionProvider = upstreamReplicaConnectionProvider;
this.masterReplicaConnectionProvider = masterReplicaConnectionProvider;
this.clientResources = clientResources;
}

Expand All @@ -65,7 +65,7 @@ public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
}

Intent intent = inTransaction ? Intent.WRITE : getIntent(command.getType());
CompletableFuture<StatefulRedisConnection<K, V>> future = (CompletableFuture) upstreamReplicaConnectionProvider
CompletableFuture<StatefulRedisConnection<K, V>> future = (CompletableFuture) masterReplicaConnectionProvider
.getConnectionAsync(intent);

if (isEndTransaction(command.getType())) {
Expand Down Expand Up @@ -118,7 +118,7 @@ private static <K, V> void writeCommand(RedisCommand<K, V, ?> command, StatefulR
// Currently: Retain order
Intent intent = inTransaction ? Intent.WRITE : getIntent(commands);

CompletableFuture<StatefulRedisConnection<K, V>> future = (CompletableFuture) upstreamReplicaConnectionProvider
CompletableFuture<StatefulRedisConnection<K, V>> future = (CompletableFuture) masterReplicaConnectionProvider
.getConnectionAsync(intent);

for (RedisCommand<K, V, ?> command : commands) {
Expand Down Expand Up @@ -207,9 +207,9 @@ public CompletableFuture<Void> closeAsync() {

CompletableFuture<Void> future = null;

if (upstreamReplicaConnectionProvider != null) {
future = upstreamReplicaConnectionProvider.closeAsync();
upstreamReplicaConnectionProvider = null;
if (masterReplicaConnectionProvider != null) {
future = masterReplicaConnectionProvider.closeAsync();
masterReplicaConnectionProvider = null;
}

if (future == null) {
Expand All @@ -219,8 +219,8 @@ public CompletableFuture<Void> closeAsync() {
return future;
}

UpstreamReplicaConnectionProvider<?, ?> getUpstreamReplicaConnectionProvider() {
return upstreamReplicaConnectionProvider;
MasterReplicaConnectionProvider<?, ?> getUpstreamReplicaConnectionProvider() {
return masterReplicaConnectionProvider;
}

@Override
Expand All @@ -234,17 +234,17 @@ public ClientResources getClientResources() {

@Override
public void setAutoFlushCommands(boolean autoFlush) {
upstreamReplicaConnectionProvider.setAutoFlushCommands(autoFlush);
masterReplicaConnectionProvider.setAutoFlushCommands(autoFlush);
}

@Override
public void flushCommands() {
upstreamReplicaConnectionProvider.flushCommands();
masterReplicaConnectionProvider.flushCommands();
}

@Override
public void reset() {
upstreamReplicaConnectionProvider.reset();
masterReplicaConnectionProvider.reset();
}

/**
Expand All @@ -254,7 +254,7 @@ public void reset() {
* @param readFrom the read from setting, must not be {@code null}
*/
public void setReadFrom(ReadFrom readFrom) {
upstreamReplicaConnectionProvider.setReadFrom(readFrom);
masterReplicaConnectionProvider.setReadFrom(readFrom);
}

/**
Expand All @@ -263,7 +263,7 @@ public void setReadFrom(ReadFrom readFrom) {
* @return the read from setting
*/
public ReadFrom getReadFrom() {
return upstreamReplicaConnectionProvider.getReadFrom();
return masterReplicaConnectionProvider.getReadFrom();
}

private static boolean isSuccessfullyCompleted(CompletableFuture<?> connectFuture) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@
* @author Mark Paluch
* @since 4.1
*/
class UpstreamReplicaConnectionProvider<K, V> {
class MasterReplicaConnectionProvider<K, V> {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(UpstreamReplicaConnectionProvider.class);
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MasterReplicaConnectionProvider.class);

private final boolean debugEnabled = logger.isDebugEnabled();

Expand All @@ -61,7 +61,7 @@ class UpstreamReplicaConnectionProvider<K, V> {

private ReadFrom readFrom;

UpstreamReplicaConnectionProvider(RedisClient redisClient, RedisCodec<K, V> redisCodec, RedisURI initialRedisUri,
MasterReplicaConnectionProvider(RedisClient redisClient, RedisCodec<K, V> redisCodec, RedisURI initialRedisUri,
Map<RedisURI, StatefulRedisConnection<K, V>> initialConnections) {

this.initialRedisUri = initialRedisUri;
Expand All @@ -77,9 +77,9 @@ class UpstreamReplicaConnectionProvider<K, V> {
}

/**
* Retrieve a {@link StatefulRedisConnection} by the intent. {@link UpstreamReplicaConnectionProvider.Intent#WRITE}
* intentions use the master connection, {@link UpstreamReplicaConnectionProvider.Intent#READ} intentions lookup one or more
* read candidates using the {@link ReadFrom} setting.
* Retrieve a {@link StatefulRedisConnection} by the intent. {@link MasterReplicaConnectionProvider.Intent#WRITE} intentions
* use the master connection, {@link MasterReplicaConnectionProvider.Intent#READ} intentions lookup one or more read
* candidates using the {@link ReadFrom} setting.
*
* @param intent command intent
* @return the connection.
Expand All @@ -98,9 +98,9 @@ public StatefulRedisConnection<K, V> getConnection(Intent intent) {
}

/**
* Retrieve a {@link StatefulRedisConnection} by the intent. {@link UpstreamReplicaConnectionProvider.Intent#WRITE}
* intentions use the master connection, {@link UpstreamReplicaConnectionProvider.Intent#READ} intentions lookup one or more
* read candidates using the {@link ReadFrom} setting.
* Retrieve a {@link StatefulRedisConnection} by the intent. {@link MasterReplicaConnectionProvider.Intent#WRITE} intentions
* use the master connection, {@link MasterReplicaConnectionProvider.Intent#READ} intentions lookup one or more read
* candidates using the {@link ReadFrom} setting.
*
* @param intent command intent
* @return the connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
import io.lettuce.core.codec.RedisCodec;

/**
* Interface declaring an asynchronous connect method to connect a Upstream/Replica setup.
* Interface declaring an asynchronous connect method to connect a Master/Replica setup.
*
* @author Mark Paluch
* @since 5.1
*/
interface UpstreamReplicaConnector<K, V> {
interface MasterReplicaConnector<K, V> {

/**
* Asynchronously connect to a Upstream/Replica setup given {@link RedisCodec}.
* Asynchronously connect to a Master/Replica setup given {@link RedisCodec}.
*
* @return Future that is notified about the connection progress.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
*
* @author Mark Paluch
*/
class UpstreamReplicaTopologyRefresh {
class MasterReplicaTopologyRefresh {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(UpstreamReplicaTopologyRefresh.class);
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MasterReplicaTopologyRefresh.class);

private static final StringCodec CODEC = StringCodec.UTF8;

Expand All @@ -49,11 +49,11 @@ class UpstreamReplicaTopologyRefresh {

private final ScheduledExecutorService eventExecutors;

UpstreamReplicaTopologyRefresh(RedisClient client, TopologyProvider topologyProvider) {
MasterReplicaTopologyRefresh(RedisClient client, TopologyProvider topologyProvider) {
this(new RedisClientNodeConnectionFactory(client), client.getResources().eventExecutorGroup(), topologyProvider);
}

UpstreamReplicaTopologyRefresh(NodeConnectionFactory nodeConnectionFactory, ScheduledExecutorService eventExecutors,
MasterReplicaTopologyRefresh(NodeConnectionFactory nodeConnectionFactory, ScheduledExecutorService eventExecutors,
TopologyProvider topologyProvider) {

this.nodeConnectionFactory = nodeConnectionFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
* @author Mark Paluch
* @author Adam McElwee
*/
class RedisUpstreamReplicaNode implements RedisNodeDescription {
class RedisMasterReplicaNode implements RedisNodeDescription {

private final RedisURI redisURI;

private final Role role;

RedisUpstreamReplicaNode(String host, int port, RedisURI seed, Role role) {
RedisMasterReplicaNode(String host, int port, RedisURI seed, Role role) {

this.redisURI = RedisURI.builder(seed).withHost(host).withPort(port).build();
this.role = role;
Expand All @@ -50,10 +50,10 @@ public Role getRole() {
public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof RedisUpstreamReplicaNode))
if (!(o instanceof RedisMasterReplicaNode))
return false;

RedisUpstreamReplicaNode that = (RedisUpstreamReplicaNode) o;
RedisMasterReplicaNode that = (RedisMasterReplicaNode) o;

if (!redisURI.equals(that.redisURI))
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private List<RedisNodeDescription> getReplicasFromInfo(String info) {
String ip = getNested(IP_PATTERN, group, 1);
String port = getNested(PORT_PATTERN, group, 1);

replicas.add(new RedisUpstreamReplicaNode(ip, Integer.parseInt(port), redisURI, RedisInstance.Role.SLAVE));
replicas.add(new RedisMasterReplicaNode(ip, Integer.parseInt(port), redisURI, RedisInstance.Role.SLAVE));
}

return replicas;
Expand All @@ -163,7 +163,7 @@ private RedisNodeDescription getMasterFromInfo(String info) {
String host = masterHostMatcher.group(1);
int port = Integer.parseInt(masterPortMatcher.group(1));

return new RedisUpstreamReplicaNode(host, port, redisURI, RedisInstance.Role.UPSTREAM);
return new RedisMasterReplicaNode(host, port, redisURI, RedisInstance.Role.UPSTREAM);
}

private String getNested(Pattern pattern, String string, int group) {
Expand Down Expand Up @@ -196,7 +196,7 @@ private RedisNodeDescription getRedisNodeDescription(Matcher matcher) {
+ RedisInstance.Role.REPLICA);
}

return new RedisUpstreamReplicaNode(redisURI.getHost(), redisURI.getPort(), redisURI, role);
return new RedisMasterReplicaNode(redisURI.getHost(), redisURI.getPort(), redisURI, role);
}

}
Loading

0 comments on commit dcccba7

Please sign in to comment.