From 566eb9e92217ecda8a23dce83bd292ae692f7a32 Mon Sep 17 00:00:00 2001 From: Elena Felder <41136058+elefeint@users.noreply.github.com> Date: Tue, 25 Aug 2020 22:15:21 -0400 Subject: [PATCH 1/2] WIP binder producer error propagation --- .../pubsub/PubSubMessageChannelBinder.java | 17 ++++ .../binder/pubsub/TempClientLibraryTest.java | 81 +++++++++++++++++++ .../src/main/resources/application.properties | 2 + 3 files changed, 100 insertions(+) create mode 100644 spring-cloud-gcp-pubsub-stream-binder/src/test/java/org/springframework/cloud/gcp/stream/binder/pubsub/TempClientLibraryTest.java diff --git a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/org/springframework/cloud/gcp/stream/binder/pubsub/PubSubMessageChannelBinder.java b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/org/springframework/cloud/gcp/stream/binder/pubsub/PubSubMessageChannelBinder.java index 1c6aaae706..1a5af4a1b6 100644 --- a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/org/springframework/cloud/gcp/stream/binder/pubsub/PubSubMessageChannelBinder.java +++ b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/org/springframework/cloud/gcp/stream/binder/pubsub/PubSubMessageChannelBinder.java @@ -34,6 +34,8 @@ import org.springframework.integration.core.MessageProducer; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.support.ErrorMessage; +import org.springframework.util.concurrent.ListenableFutureCallback; /** * Message channel binder for Pub/Sub. @@ -74,6 +76,21 @@ protected MessageHandler createProducerMessageHandler(ProducerDestination destin PubSubMessageHandler messageHandler = new PubSubMessageHandler(this.pubSubTemplate, destination.getName()); messageHandler.setBeanFactory(getBeanFactory()); messageHandler.setSync(producerProperties.getExtension().isSync()); + if (errorChannel != null) { + messageHandler.setPublishCallback(new ListenableFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + System.out.println("Sending throwable to error channel: " + throwable); + errorChannel.send(new ErrorMessage(throwable)); + } + + @Override + public void onSuccess(String messageId) { + // no-op + System.out.println("Successfully published message " + messageId); + } + }); + } return messageHandler; } diff --git a/spring-cloud-gcp-pubsub-stream-binder/src/test/java/org/springframework/cloud/gcp/stream/binder/pubsub/TempClientLibraryTest.java b/spring-cloud-gcp-pubsub-stream-binder/src/test/java/org/springframework/cloud/gcp/stream/binder/pubsub/TempClientLibraryTest.java new file mode 100644 index 0000000000..c3de40663b --- /dev/null +++ b/spring-cloud-gcp-pubsub-stream-binder/src/test/java/org/springframework/cloud/gcp/stream/binder/pubsub/TempClientLibraryTest.java @@ -0,0 +1,81 @@ +package org.springframework.cloud.gcp.stream.binder.pubsub; + +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import org.awaitility.Awaitility; +import org.awaitility.Duration; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +public class TempClientLibraryTest { + + @Test + public void failedPublishCallsListener() throws IOException, InterruptedException { + Publisher publisher = Publisher.newBuilder("projects/elfel-spring/topics/exampleTopic") + .setEndpoint("cecicestnespasunendpoint:443") + .build(); + + PubsubMessage message = PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("test message")) + .build(); + + AtomicBoolean errorSet = new AtomicBoolean(false); + + ApiFutures.addCallback(publisher.publish(message), + new ApiFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + System.out.println("Sending throwable to error channel: " + throwable); + errorSet.set(true); + } + + @Override + public void onSuccess(String messageId) { + System.out.println("Successfully published message " + messageId); + } + }); + + Awaitility.await() + .atMost(Duration.TWO_MINUTES) + .until(() -> errorSet.get()); + + } + + + @Test + public void successfulPublishCallsListener() throws IOException, InterruptedException { + Publisher publisher = Publisher.newBuilder("projects/elfel-spring/topics/exampleTopic") + .build(); + + PubsubMessage message = PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("test successful message")) + .build(); + + AtomicBoolean successSet = new AtomicBoolean(false); + + ApiFutures.addCallback(publisher.publish(message), + new ApiFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + System.out.println("publish errored out: " + throwable); + } + + @Override + public void onSuccess(String messageId) { + System.out.println("Successfully published message " + messageId); + successSet.set(true); + } + }); + + Awaitility.await() + .atMost(Duration.FIVE_SECONDS) + .until(() -> successSet.get()); + + } +} diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-binder-sample/src/main/resources/application.properties b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-binder-sample/src/main/resources/application.properties index dd7aaadca0..8d250a993a 100644 --- a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-binder-sample/src/main/resources/application.properties +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-binder-sample/src/main/resources/application.properties @@ -1,6 +1,8 @@ spring.cloud.stream.bindings.input.destination=my-topic spring.cloud.stream.bindings.output.destination=my-topic +spring.cloud.stream.bindings.output.producer.errorChannelEnabled=true + # If group is specified, the Pub/Sub subscription name will be [PUBSUB_TOPIC_NAME].[PUBSUB_GROUP_NAME] spring.cloud.stream.bindings.input.group=my-group From 03e97375a463599e7e06ea8f5c9db5d51aa800b1 Mon Sep 17 00:00:00 2001 From: Elena Felder <41136058+elefeint@users.noreply.github.com> Date: Thu, 27 Aug 2020 19:52:50 -0400 Subject: [PATCH 2/2] override retry settings in test copy --- .../binder/pubsub/TempClientLibraryTest.java | 51 +++++++++++++++++-- 1 file changed, 47 insertions(+), 4 deletions(-) diff --git a/spring-cloud-gcp-pubsub-stream-binder/src/test/java/org/springframework/cloud/gcp/stream/binder/pubsub/TempClientLibraryTest.java b/spring-cloud-gcp-pubsub-stream-binder/src/test/java/org/springframework/cloud/gcp/stream/binder/pubsub/TempClientLibraryTest.java index c3de40663b..fb4f8ae093 100644 --- a/spring-cloud-gcp-pubsub-stream-binder/src/test/java/org/springframework/cloud/gcp/stream/binder/pubsub/TempClientLibraryTest.java +++ b/spring-cloud-gcp-pubsub-stream-binder/src/test/java/org/springframework/cloud/gcp/stream/binder/pubsub/TempClientLibraryTest.java @@ -1,7 +1,11 @@ package org.springframework.cloud.gcp.stream.binder.pubsub; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.pubsub.v1.Publisher; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; @@ -9,12 +13,10 @@ import org.awaitility.Duration; import org.junit.Test; -import java.io.IOException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; - public class TempClientLibraryTest { + /* This test fails on timeout: because client library retries indefinitely, the failed publish never calls + * the onFailure callback. */ @Test public void failedPublishCallsListener() throws IOException, InterruptedException { Publisher publisher = Publisher.newBuilder("projects/elfel-spring/topics/exampleTopic") @@ -48,6 +50,47 @@ public void onSuccess(String messageId) { } + /* This test is identical to the one above, but with overridden retry settings. + Therefore, it works as expected, invoking the onFailure callback. */ + @Test + public void failedPublishCallsListener_overriddenRetrySettingsAllowsAsyncPublishToProperlyFail() throws IOException, InterruptedException { + Publisher publisher = Publisher.newBuilder("projects/elfel-spring/topics/exampleTopic") + .setEndpoint("cecicestnespasunendpoint:443") + .setRetrySettings( + RetrySettings.newBuilder() + .setMaxAttempts(3) + .setTotalTimeout(org.threeten.bp.Duration.ofSeconds(10)) + .setInitialRpcTimeout(org.threeten.bp.Duration.ofSeconds(10)) + .setMaxRpcTimeout(org.threeten.bp.Duration.ofSeconds(20)) // TODO: bad message; try 9; longer not shorter + .build()) + .build(); + + PubsubMessage message = PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("test message")) + .build(); + + AtomicBoolean errorSet = new AtomicBoolean(false); + + ApiFutures.addCallback(publisher.publish(message), + new ApiFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + System.out.println("Sending throwable to error channel: " + throwable); + errorSet.set(true); + } + + @Override + public void onSuccess(String messageId) { + System.out.println("Successfully published message " + messageId); + } + }); + + Awaitility.await() + .atMost(Duration.TWO_MINUTES) + .until(() -> errorSet.get()); + + } + @Test public void successfulPublishCallsListener() throws IOException, InterruptedException { Publisher publisher = Publisher.newBuilder("projects/elfel-spring/topics/exampleTopic")