This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

PubSubReactiveFactory.poll doesn't handle exceptions thrown by the PubSubSubscriberOperations #2229

Closed
mzeijen opened this issue Mar 3, 2020 · 1 comment · Fixed by #2230


@mzeijen
Contributor

mzeijen commented Mar 3, 2020

Describe the bug
If you use PubSubReactiveFactory.poll to get a Flux of messages, and the underlying PubSubSubscriberOperations.pull call fails with an unexpected runtime exception, the factory doesn't handle that exception correctly.

With both the NonBlockingUnlimitedDemandPullTask and the BlockingLimitedDemandPullTask, the exceptions aren't passed to the Flux sink. This means the subscriber of the Flux can't decide how to deal with the error: it might want to retry immediately, retry with an incremental backoff, or cancel the subscription.
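If the error did reach the subscriber, an "incremental backoff" policy could be as simple as the following JDK-only sketch. `BackoffRetry`, `retryWithBackoff`, and the `Supplier` standing in for a single pull are hypothetical names for illustration, not part of Spring Cloud GCP; with Reactor itself, such a policy would normally be expressed with an operator such as `retryWhen`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

// Hypothetical helper illustrating a subscriber-chosen retry policy with incremental backoff.
final class BackoffRetry {

	// Calls pull until it succeeds or maxAttempts is reached, doubling the delay after each failure.
	// The sleeper is injected so the delays can be observed (or skipped) instead of really sleeping.
	static <T> T retryWithBackoff(Supplier<T> pull, int maxAttempts, long initialDelayMillis, LongConsumer sleeper) {
		if (maxAttempts < 1) {
			throw new IllegalArgumentException("maxAttempts must be at least 1");
		}
		long delay = initialDelayMillis;
		RuntimeException last = null;
		for (int attempt = 1; attempt <= maxAttempts; attempt++) {
			try {
				return pull.get();
			}
			catch (RuntimeException e) {
				last = e;
				if (attempt < maxAttempts) {
					sleeper.accept(delay);
					delay *= 2;
				}
			}
		}
		// All attempts failed: surface the last error so the caller can decide what to do next.
		throw last;
	}

	public static void main(String[] args) {
		int[] calls = {0};
		List<Long> delays = new ArrayList<>();
		String result = retryWithBackoff(() -> {
			if (++calls[0] < 3) {
				throw new RuntimeException("exception during pull of messages");
			}
			return "msg1";
		}, 5, 100, delays::add);
		// prints "msg1 after 3 calls, delays [100, 200]"
		System.out.println(result + " after " + calls[0] + " calls, delays " + delays);
	}
}
```

Injecting the sleeper keeps the policy testable; a real subscriber would pass something like `Thread::sleep` wrapped to handle `InterruptedException`.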

Alternatively, it could be a deliberate design choice that subscribers don't need to deal with any Pub/Sub client exceptions, but then I would at least expect the exceptions to be logged. Even in that case, the client should preferably be able to configure that it wants to handle the errors itself.
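A plain-Java sketch of what "log by default, but let the client opt in" might look like. `PullErrorPolicy` and its methods are hypothetical names, not an existing Spring Cloud GCP API; the default handler simply logs through `java.util.logging`.

```java
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical error-handling hook for a polling loop: log by default, or use a client-supplied handler.
final class PullErrorPolicy {

	private static final Logger LOGGER = Logger.getLogger(PullErrorPolicy.class.getName());

	// Default when the client has not opted in: record the failure instead of swallowing it silently.
	static final Consumer<Throwable> LOG_AND_CONTINUE =
			error -> LOGGER.log(Level.WARNING, "Pub/Sub pull failed; continuing to poll", error);

	private final Consumer<Throwable> handler;

	// Passing null selects the logging default.
	PullErrorPolicy(Consumer<Throwable> handler) {
		this.handler = (handler == null) ? LOG_AND_CONTINUE : handler;
	}

	// Runs one pull attempt; any runtime exception is routed to the configured handler.
	// Returns true if the pull completed without an exception.
	boolean runPull(Runnable pull) {
		try {
			pull.run();
			return true;
		}
		catch (RuntimeException e) {
			handler.accept(e);
			return false;
		}
	}
}
```

For example, `new PullErrorPolicy(null)` logs and carries on, while `new PullErrorPolicy(errors::add)` hands every failure to the client.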

With the backpressure-based pull, there is an additional bug in the exception handling: an exception results in a "dead" Flux that never receives any more messages. The exception kills the BlockingLimitedDemandPullTask, and the task isn't restarted. The Flux doesn't know it is dead and keeps waiting for messages that will never arrive.
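The "dead" task can be reproduced with nothing but the JDK. This is an analogy, not a description of the actual BlockingLimitedDemandPullTask code: it assumes the pull runs as a periodic task on a `ScheduledExecutorService`. Per the `scheduleAtFixedRate` contract, once an execution throws, subsequent executions are suppressed, and the exception only lands in the returned future, so nothing tells a consumer that is waiting for messages. `DeadPeriodicTask` is a hypothetical name.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Analogy for the "dead" pull task: a periodic task that throws is never run again.
final class DeadPeriodicTask {

	// Schedules a pull that throws on its first run, waits for the task to terminate,
	// and returns how many times the pull was attempted.
	static int runUntilDead() {
		ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
		AtomicInteger attempts = new AtomicInteger();
		ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(() -> {
			attempts.incrementAndGet();
			throw new RuntimeException("exception during pull of messages");
		}, 0, 10, TimeUnit.MILLISECONDS);
		try {
			// get() only returns (exceptionally) once the periodic task has terminated.
			task.get();
		}
		catch (ExecutionException e) {
			// The exception ends up in the future, which nobody downstream is watching.
			System.out.println("task died: " + e.getCause().getMessage());
		}
		catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
		finally {
			scheduler.shutdownNow();
		}
		return attempts.get();
	}

	public static void main(String[] args) {
		System.out.println("pull attempts: " + runUntilDead());
	}
}
```

Running it prints the exception message once and reports a single pull attempt, even though the task was scheduled to repeat every 10 ms.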

Sample
The following test case shows the problem:


package org.springframework.cloud.gcp.pubsub.reactive;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.Status;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberOperations;
import org.springframework.cloud.gcp.pubsub.support.AcknowledgeablePubsubMessage;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class PubSubReactiveFactoryExceptionHandlingTests {

	@Mock
	PubSubSubscriberOperations subscriberOperations;

	PubSubReactiveFactory factory;

	@Before
	public void setUp() {
		// Note: This doesn't use any virtual time scheduler because that doesn't realistically deal with exceptions
		factory = new PubSubReactiveFactory(subscriberOperations, Schedulers.elastic());
	}

	@Test(timeout = 1000)
	public void exceptionInBackpressureBasedPullShouldResultInAnErrorInFlux() {
		setUpMessages("msg1", "msg2", "throw");

		StepVerifier.create(factory.poll("sub1", 10).map(this::messageToString), 2)
				.expectSubscription()
				.expectNext("msg1", "msg2")
				.thenRequest(2)
				.expectError(RuntimeException.class)
				.verify(Duration.ofSeconds(1));

		InOrder methodOrder = Mockito.inOrder(this.subscriberOperations);
		methodOrder.verify(this.subscriberOperations, times(2)).pull("sub1", 2, false);
		methodOrder.verifyNoMoreInteractions();
	}

	@Test
	public void exceptionInUnlimitedPullShouldResultInAnErrorInFlux() {
		setUpMessages("msg1", "msg2", "stop", "throw");

		StepVerifier.create(factory.poll("sub1", 10).map(this::messageToString))
				.expectSubscription()
				.expectNext("msg1", "msg2")
				.expectError(RuntimeException.class)
				.verify(Duration.ofSeconds(1));

		InOrder methodOrder = Mockito.inOrder(this.subscriberOperations);
		methodOrder.verify(this.subscriberOperations, times(2)).pull("sub1", Integer.MAX_VALUE, true);
		methodOrder.verifyNoMoreInteractions();
	}

	// This test currently passes, because the unlimited pull task is rescheduled after the exception.
	@Test
	public void exceptionInUnlimitedPullCurrentlyIsSimplyIgnoredAndFluxKeepsProvidingMessages() {
		setUpMessages("msg1", "msg2", "stop", "throw", "msg3", "msg4", "stop");

		StepVerifier.create(factory.poll("sub1", 100).map(this::messageToString))
				.expectSubscription()
				.expectNext("msg1", "msg2")
				.thenAwait(Duration.ofMillis(200))
				.expectNext("msg3", "msg4")
				.thenCancel()
				.verify(Duration.ofSeconds(1));

		InOrder methodOrder = Mockito.inOrder(this.subscriberOperations);
		methodOrder.verify(this.subscriberOperations, times(3)).pull("sub1", Integer.MAX_VALUE, true);
		methodOrder.verifyNoMoreInteractions();
	}

	private String messageToString(AcknowledgeablePubsubMessage message) {
		return new String(message.getPubsubMessage().getData().toByteArray(), Charset.defaultCharset());
	}

	/**
	 * Replays provided messages.
	 * If a synthetic message "stop" is encountered, immediately returns previously collected messages.
	 * If a synthetic message "timeout" is encountered, throws a {@link DeadlineExceededException}; it must be the first event in a batch.
	 * If a synthetic message "throw" is encountered, throws a {@link RuntimeException}.
	 * Fails the calling test if there are not enough messages to fulfill demand from cumulative calls to {@code pull()}.
	 * @param messages messages to replay
	 */
	private void setUpMessages(String... messages) {
		List<String> msgList = new ArrayList<>(Arrays.asList(messages));

		when(subscriberOperations.pull(eq("sub1"), any(Integer.class), any(Boolean.class))).then(invocationOnMock -> {
			List<AcknowledgeablePubsubMessage> result = new ArrayList<>();
			for (int i = 0; i < (Integer) invocationOnMock.getArgument(1); i++) {
				if (msgList.isEmpty()) {
					fail("Ran out of provided messages.");
				}

				String nextPayload = msgList.remove(0);
				switch (nextPayload) {
					case "stop":
						return result;
					case "timeout":
						if (!result.isEmpty()) {
							fail("Bad setup -- 'timeout' should be the first event in batch");
						}
						throw new DeadlineExceededException("this is a noop", null, GrpcStatusCode.of(Status.Code.DEADLINE_EXCEEDED), true);
					case "throw":
						throw new RuntimeException("exception during pull of messages");
				}

				AcknowledgeablePubsubMessage msg = mock(AcknowledgeablePubsubMessage.class);
				PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
						.setData(ByteString.copyFrom(nextPayload.getBytes(Charset.defaultCharset())))
						.build();
				when(msg.getPubsubMessage()).thenReturn(pubsubMessage);
				result.add(msg);
			}
			return result;
		});
	}
}
@mzeijen
Contributor Author

mzeijen commented Mar 3, 2020

I created pull request #2230, which fixes these issues by passing the exception to the Flux sink.

elefeint pushed a commit that referenced this issue Mar 3, 2020
…tion is published to the Flux sink (#2230)

Fixes #2229 by publishing the exception to the Flux sink.

This may partially be seen as a breaking change, because it changes the behaviour of the unlimited poll: previously, the unlimited poll ignored exceptions and kept being rescheduled.
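The change can be sketched without Reactor. `MessageSink` and `PollLoop` are hypothetical stand-ins for Reactor's `FluxSink` and the factory's pull task, and the `propagateErrors` flag exists only here, to contrast the new behaviour (publish the error and stop) with the old unlimited-poll behaviour (ignore the error and poll again).

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for FluxSink: receives messages, or a single terminal error.
interface MessageSink {
	void next(String message);

	void error(Throwable error);
}

// Hypothetical pull loop contrasting the old and new exception handling.
final class PollLoop {

	// Runs up to maxPulls pulls. With propagateErrors=true, the first exception is published
	// to the sink and polling stops; with false, the exception is ignored and the next pull runs.
	// Returns the number of pulls performed.
	static int run(Supplier<List<String>> pull, MessageSink sink, int maxPulls, boolean propagateErrors) {
		int pulls = 0;
		while (pulls < maxPulls) {
			pulls++;
			try {
				for (String message : pull.get()) {
					sink.next(message);
				}
			}
			catch (RuntimeException e) {
				if (propagateErrors) {
					// New behaviour: terminal error, the subscriber decides what happens next.
					sink.error(e);
					return pulls;
				}
				// Old behaviour: swallow the exception and simply pull again.
			}
		}
		return pulls;
	}
}
```

With a real `FluxSink`, the error would then reach the subscriber's own operators (for example `retryWhen` or `onErrorResume`), which is what lets the Flux decide how to react.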