Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add log messages to recovery test #665

Merged
merged 8 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/docs/asciidoc/api.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ Used as a prefix for connection names.
|Contract to change resolved node address to connect to.
|Pass-through (no-op)

|`locatorConnectionCount`
|Number of locator connections to maintain (for metadata search)
|The smaller of the number of URIs and 3.

|`tls`
|Configuration helper for TLS.
|TLS is enabled if a `rabbitmq-stream+tls` URI is provided.
Expand Down Expand Up @@ -293,8 +297,12 @@ include::{test-examples}/EnvironmentUsage.java[tag=address-resolver]
<1> Set the load balancer address
<2> Use load balancer address for initial connection
<3> Ignore metadata hints, always use load balancer
<4> Set the number of locator connections to maintain

The blog post covers the https://www.rabbitmq.com/blog/2021/07/23/connecting-to-streams/#client-workaround-with-a-load-balancer[underlying details of this workaround].
Note the example above sets the number of locator connections the environment maintains.
Locator connections are used to perform infrastructure-related operations (e.g. looking up the topology of a stream to find an appropriate node to connect to).
The environment uses the number of passed-in URIs to choose an appropriate default number and will pick 1 in this case, which may be too low for a cluster deployment.
This is why it is recommended to set the value explicitly, 3 being a good default.

==== Managing Streams

Expand Down
30 changes: 27 additions & 3 deletions src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.rabbitmq.stream.compression.Compression;
import com.rabbitmq.stream.compression.CompressionCodecFactory;
import com.rabbitmq.stream.impl.StreamEnvironmentBuilder;
import com.rabbitmq.stream.metrics.MetricsCollector;
import com.rabbitmq.stream.sasl.CredentialsProvider;
import com.rabbitmq.stream.sasl.SaslConfiguration;
Expand Down Expand Up @@ -62,14 +63,15 @@ public interface EnvironmentBuilder {
* An {@link AddressResolver} to potentially change resolved node address to connect to.
*
* <p>Applications can use this abstraction to make sure connection attempts ignore metadata hints
* and always go to a single point like a load balancer.
* and always go to a single point like a load balancer. Consider setting {@link
* #locatorConnectionCount(int)} when using a load balancer.
*
* <p>The default implementation does not perform any logic, it just returns the passed-in
* address.
*
* <p><i>The default implementation is overridden automatically if the following conditions are
* met: the host to connect to is <code>localhost</code>, the user is <code>guest</code>, and no
* address resolver has been provided. The client will then always tries to connect to <code>
* address resolver has been provided. The client will then always try to connect to <code>
* localhost</code> to facilitate local development. Just provide a pass-through address resolver
* to avoid this behavior, e.g.:</i>
*
Expand All @@ -79,10 +81,11 @@ public interface EnvironmentBuilder {
* .build();
* </pre>
*
* @param addressResolver
* @param addressResolver the address resolver
* @return this builder instance
* @see <a href="https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/">"Connecting to
* Streams" blog post</a>
* @see #locatorConnectionCount(int)
*/
EnvironmentBuilder addressResolver(AddressResolver addressResolver);

Expand Down Expand Up @@ -395,6 +398,27 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy(
*/
EnvironmentBuilder forceLeaderForProducers(boolean forceLeader);

/**
* Set the expected number of "locator" connections to maintain.
*
* <p>Locator connections are used to perform infrastructure-related operations (e.g. looking up
* the topology of a stream to find an appropriate node to connect to).
*
* <p>It is recommended to maintain 2 to 3 locator connections. The environment uses the smaller
* of the number of passed-in URIs and 3 by default (see {@link #uris(List)}).
*
* <p>The number of locator connections should be explicitly set when a load balancer is used, as
* the environment cannot know the number of cluster nodes in this case (the only URI set is the
* one of the load balancer).
*
* @param locatorConnectionCount number of expected locator connections
* @return this builder instance
* @see #uris(List)
* @see #addressResolver(AddressResolver)
* @since 0.21.0
*/
StreamEnvironmentBuilder locatorConnectionCount(int locatorConnectionCount);

/**
* Create the {@link Environment} instance.
*
Expand Down
14 changes: 11 additions & 3 deletions src/main/java/com/rabbitmq/stream/Message.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
Expand Down Expand Up @@ -32,18 +32,26 @@ public interface Message {
/**
* Does this message has a publishing ID?
*
* <p>Publishing IDs are used for de-duplication of outbound messages. They are not persisted.
* <p>Publishing IDs are used for deduplication of outbound messages. They are not persisted.
*
* @return true if the message has a publishing ID, false otherwise
* @see ProducerBuilder#name(String)
* @see <a
* href="https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#outbound-message-deduplication">Deduplication
* documentation</a>
*/
boolean hasPublishingId();

/**
* Get the publishing ID for the message.
*
* <p>Publishing IDs are used for de-duplication of outbound messages. They are not persisted.
* <p>Publishing IDs are used for deduplication of outbound messages. They are not persisted.
*
* @return the publishing ID of the message
* @see ProducerBuilder#name(String)
* @see <a
* href="https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#outbound-message-deduplication">Deduplication
* documentation</a>
*/
long getPublishingId();

Expand Down
8 changes: 6 additions & 2 deletions src/main/java/com/rabbitmq/stream/MessageBuilder.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
Expand Down Expand Up @@ -38,12 +38,16 @@ public interface MessageBuilder {
Message build();

/**
* Set the publishing ID (for de-duplication).
* Set the publishing ID (for deduplication).
*
* <p>This is value is used only for outbound messages and is not persisted.
*
* @param publishingId
* @return this builder instance
* @see ProducerBuilder#name(String)
* @see <a
* href="https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#outbound-message-deduplication">Deduplication
* documentation</a>
*/
MessageBuilder publishingId(long publishingId);

Expand Down
10 changes: 8 additions & 2 deletions src/main/java/com/rabbitmq/stream/ProducerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,18 @@
public interface ProducerBuilder {

/**
* The logical name of the producer.
* The producer name for deduplication (<b>read the <a
* href="https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#outbound-message-deduplication">documentation</a>
* before use</b>).
*
* <p>Set a value to enable de-duplication.
* <p>There must be only one producer instance at the same time using a given name.
*
* @param name
* @return this builder instance
* @see MessageBuilder#publishingId(long)
* @see <a
* href="https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#outbound-message-deduplication">Deduplication
* documentation</a>
*/
ProducerBuilder name(String name);

Expand Down
42 changes: 32 additions & 10 deletions src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static com.rabbitmq.stream.impl.Utils.*;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;

import com.rabbitmq.stream.*;
import com.rabbitmq.stream.MessageHandler.Context;
Expand Down Expand Up @@ -53,6 +54,7 @@
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.net.ssl.SSLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -81,7 +83,7 @@ class StreamEnvironment implements Environment {
private final ByteBufAllocator byteBufAllocator;
private final AtomicBoolean locatorsInitialized = new AtomicBoolean(false);
private final Runnable locatorInitializationSequence;
private final List<Locator> locators = new CopyOnWriteArrayList<>();
private final List<Locator> locators;
private final ExecutorServiceFactory executorServiceFactory;
private final ObservationCollector<?> observationCollector;

Expand All @@ -105,7 +107,8 @@ class StreamEnvironment implements Environment {
boolean forceReplicaForConsumers,
boolean forceLeaderForProducers,
Duration producerNodeRetryDelay,
Duration consumerNodeRetryDelay) {
Duration consumerNodeRetryDelay,
int expectedLocatorCount) {
this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy;
this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy;
this.byteBufAllocator = byteBufAllocator;
Expand Down Expand Up @@ -147,7 +150,7 @@ class StreamEnvironment implements Environment {
new Address(
uriItem.getHost() == null ? "localhost" : uriItem.getHost(),
uriItem.getPort() == -1 ? defaultPort : uriItem.getPort()))
.collect(Collectors.toList());
.collect(toList());
}

AddressResolver addressResolverToUse = addressResolver;
Expand Down Expand Up @@ -179,7 +182,24 @@ class StreamEnvironment implements Environment {

this.addressResolver = addressResolverToUse;

this.addresses.forEach(address -> this.locators.add(new Locator(address)));
int locatorCount;
if (expectedLocatorCount > 0) {
locatorCount = expectedLocatorCount;
} else {
locatorCount = Math.min(this.addresses.size(), 3);
}
LOGGER.debug("Using {} locator connection(s)", locatorCount);

List<Locator> lctrs =
IntStream.range(0, locatorCount)
.mapToObj(
i -> {
Address addr = this.addresses.get(i % this.addresses.size());
return new Locator(addr);
})
.collect(toList());
this.locators = List.copyOf(lctrs);

this.executorServiceFactory =
new DefaultExecutorServiceFactory(
this.addresses.size(), 1, "rabbitmq-stream-locator-connection-");
Expand Down Expand Up @@ -230,8 +250,8 @@ class StreamEnvironment implements Environment {
Runnable locatorInitSequence =
() -> {
RuntimeException lastException = null;
for (int i = 0; i < addresses.size(); i++) {
Address address = addresses.get(i);
for (int i = 0; i < locators.size(); i++) {
Address address = addresses.get(i % addresses.size());
Locator locator = locator(i);
address = addressResolver.resolve(address);
String connectionName = connectionNamingStrategy.apply(ClientConnectionType.LOCATOR);
Expand Down Expand Up @@ -290,10 +310,10 @@ private ShutdownListener shutdownListener(
Client.ShutdownListener shutdownListener =
shutdownContext -> {
if (shutdownContext.isShutdownUnexpected()) {
String label = locator.label();
locator.client(null);
LOGGER.debug(
"Unexpected locator disconnection for locator on '{}', trying to reconnect",
locator.label());
"Unexpected locator disconnection for locator on '{}', trying to reconnect", label);
try {
Client.ClientParameters newLocatorParameters =
this.locatorParametersCopy().shutdownListener(shutdownListenerReference.get());
Expand Down Expand Up @@ -742,7 +762,7 @@ static <T> T locatorOperation(
Function<Client, T> operation,
Supplier<Client> clientSupplier,
BackOffDelayPolicy backOffDelayPolicy) {
int maxAttempt = 5;
int maxAttempt = 3;
int attempt = 0;
boolean executed = false;
Exception lastException = null;
Expand Down Expand Up @@ -991,7 +1011,9 @@ private String label() {
if (c == null) {
return address.host() + ":" + address.port();
} else {
return c.getHost() + ":" + c.getPort();
return String.format(
"%s:%d [advertised %s:%d]",
c.getHost(), c.getPort(), c.serverAdvertisedHost(), c.serverAdvertisedPort());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
private ObservationCollector<?> observationCollector = ObservationCollector.NO_OP;
private Duration producerNodeRetryDelay = Duration.ofMillis(500);
private Duration consumerNodeRetryDelay = Duration.ofMillis(1000);
private int locatorConnectionCount = -1;

public StreamEnvironmentBuilder() {}

Expand Down Expand Up @@ -315,6 +316,12 @@ StreamEnvironmentBuilder consumerNodeRetryDelay(Duration consumerNodeRetryDelay)
return this;
}

@Override
public StreamEnvironmentBuilder locatorConnectionCount(int locatorCount) {
this.locatorConnectionCount = locatorCount;
return this;
}

@Override
public Environment build() {
if (this.compressionCodecFactory == null) {
Expand Down Expand Up @@ -349,7 +356,8 @@ public Environment build() {
this.forceReplicaForConsumers,
this.forceLeaderForProducers,
this.producerNodeRetryDelay,
this.consumerNodeRetryDelay);
this.consumerNodeRetryDelay,
this.locatorConnectionCount);
}

static final class DefaultTlsConfiguration implements TlsConfiguration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ void addressResolver() throws Exception {
.host(entryPoint.host()) // <2>
.port(entryPoint.port()) // <2>
.addressResolver(address -> entryPoint) // <3>
.locatorConnectionCount(3) // <4>
.build();
// end::address-resolver[]
}
Expand Down
Loading