Skip to content

Commit

Permalink
Merge pull request #437 from rabbitmq/do-not-unsubscribe-with-closed-…
Browse files Browse the repository at this point in the history
…connection

Do not unsubscribe with closed client
  • Loading branch information
acogoluegnes authored Oct 4, 2023
2 parents 3f51f48 + 3fa5a50 commit 20282d9
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 8 deletions.
6 changes: 6 additions & 0 deletions src/main/java/com/rabbitmq/stream/impl/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -2029,8 +2029,14 @@ public void done() {
}
}

static Response responseOk() {
return Response.OK;
}

public static class Response {

private static final Response OK = new Response(RESPONSE_CODE_OK);

private final short responseCode;

public Response(short responseCode) {
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,10 @@ private void assignConsumersToStream(
boolean maybeCloseClient) {
Runnable consumersClosingCallback =
() -> {
LOGGER.debug(
"Running consumer closing callback after recovery failure, "
+ "closing {} subscription(s)",
subscriptions.size());
for (SubscriptionTracker affectedSubscription : subscriptions) {
try {
affectedSubscription.consumer.closeAfterStreamDeletion();
Expand Down Expand Up @@ -1078,7 +1082,13 @@ synchronized void remove(SubscriptionTracker subscriptionTracker) {
try {
Client.Response unsubscribeResponse =
Utils.callAndMaybeRetry(
() -> client.unsubscribe(subscriptionIdInClient),
() -> {
if (client.isOpen()) {
return client.unsubscribe(subscriptionIdInClient);
} else {
return Client.responseOk();
}
},
RETRY_ON_TIMEOUT,
"Unsubscribe request for consumer %d on stream '%s'",
subscriptionTracker.consumer.id(),
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/rabbitmq/stream/impl/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ static <T> T callAndMaybeRetry(
while (keepTrying) {
try {
attempt++;
LOGGER.debug("Starting attempt #{} for operation '{}'", attempt, description);
T result = operation.call();
Duration operationDuration = Duration.ofNanos(System.nanoTime() - startTime);
LOGGER.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

import com.rabbitmq.stream.*;
import com.rabbitmq.stream.codec.WrapperMessageBuilder;
Expand Down Expand Up @@ -459,6 +456,42 @@ void subscribeShouldSubscribeToStreamAndDispatchMessage_UnsubscribeShouldUnsubsc
assertThat(messageHandlerCalls.get()).isEqualTo(1);
}

@Test
void shouldNotUnsubscribeIfClientIsClosed() {
when(locator.metadata("stream")).thenReturn(metadata(null, replicas()));

when(clientFactory.client(any())).thenReturn(client);
when(client.subscribe(
subscriptionIdCaptor.capture(),
anyString(),
any(OffsetSpecification.class),
anyInt(),
anyMap()))
.thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK));

Runnable closingRunnable =
coordinator.subscribe(
consumer,
"stream",
OffsetSpecification.first(),
null,
NO_OP_SUBSCRIPTION_LISTENER,
() -> {},
(offset, message) -> {},
Collections.emptyMap(),
flowStrategy());
verify(clientFactory, times(1)).client(any());
verify(client, times(1))
.subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());

when(client.isOpen()).thenReturn(false);
when(client.unsubscribe(subscriptionIdCaptor.getValue()))
.thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK));

closingRunnable.run();
verify(client, never()).unsubscribe(subscriptionIdCaptor.getValue());
}

@Test
void subscribeShouldSubscribeToStreamAndDispatchMessageWithManySubscriptions() {
when(locator.metadata("stream")).thenReturn(metadata(leader(), null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,6 @@ void consumerShouldBeClosedWhenStreamGetsDeleted(TestInfo info) throws Exception
environment.deleteStream(s);

TestUtils.waitAtMost(10, () -> !consumer.isOpen());

assertThat(consumer.isOpen()).isFalse();
}

Expand Down
2 changes: 0 additions & 2 deletions src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
</appender>

<logger name="com.rabbitmq.stream" level="warn" />
<logger name="com.rabbitmq.stream.perf" level="info" />

<logger name="com.rabbitmq.stream.perf.Version" level="error" />
<logger name="org.eclipse.jetty" level="warn" />

<root level="info">
<appender-ref ref="STDOUT" />
Expand Down

0 comments on commit 20282d9

Please sign in to comment.