diff --git a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/org/springframework/cloud/gcp/stream/binder/pubsub/PubSubMessageChannelBinder.java b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/org/springframework/cloud/gcp/stream/binder/pubsub/PubSubMessageChannelBinder.java index 1c6aaae706..1a5af4a1b6 100644 --- a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/org/springframework/cloud/gcp/stream/binder/pubsub/PubSubMessageChannelBinder.java +++ b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/org/springframework/cloud/gcp/stream/binder/pubsub/PubSubMessageChannelBinder.java @@ -34,6 +34,8 @@ import org.springframework.integration.core.MessageProducer; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.support.ErrorMessage; +import org.springframework.util.concurrent.ListenableFutureCallback; /** * Message channel binder for Pub/Sub. @@ -74,6 +76,21 @@ protected MessageHandler createProducerMessageHandler(ProducerDestination destin PubSubMessageHandler messageHandler = new PubSubMessageHandler(this.pubSubTemplate, destination.getName()); messageHandler.setBeanFactory(getBeanFactory()); messageHandler.setSync(producerProperties.getExtension().isSync()); + if (errorChannel != null) { + messageHandler.setPublishCallback(new ListenableFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + System.out.println("Sending throwable to error channel: " + throwable); + errorChannel.send(new ErrorMessage(throwable)); + } + + @Override + public void onSuccess(String messageId) { + // no-op + System.out.println("Successfully published message " + messageId); + } + }); + } return messageHandler; } diff --git a/spring-cloud-gcp-pubsub-stream-binder/src/test/java/org/springframework/cloud/gcp/stream/binder/pubsub/TempClientLibraryTest.java b/spring-cloud-gcp-pubsub-stream-binder/src/test/java/org/springframework/cloud/gcp/stream/binder/pubsub/TempClientLibraryTest.java new file mode 100644 index 0000000000..fb4f8ae093 --- /dev/null +++ b/spring-cloud-gcp-pubsub-stream-binder/src/test/java/org/springframework/cloud/gcp/stream/binder/pubsub/TempClientLibraryTest.java @@ -0,0 +1,124 @@ +package org.springframework.cloud.gcp.stream.binder.pubsub; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import org.awaitility.Awaitility; +import org.awaitility.Duration; +import org.junit.Test; + +public class TempClientLibraryTest { + + /* This test fails on timeout: because client library retries indefinitely, the failed publish never calls + * the onFailure callback. */ + @Test + public void failedPublishCallsListener() throws IOException, InterruptedException { + Publisher publisher = Publisher.newBuilder("projects/elfel-spring/topics/exampleTopic") + .setEndpoint("cecicestnespasunendpoint:443") + .build(); + + PubsubMessage message = PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("test message")) + .build(); + + AtomicBoolean errorSet = new AtomicBoolean(false); + + ApiFutures.addCallback(publisher.publish(message), + new ApiFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + System.out.println("Sending throwable to error channel: " + throwable); + errorSet.set(true); + } + + @Override + public void onSuccess(String messageId) { + System.out.println("Successfully published message " + messageId); + } + }); + + Awaitility.await() + .atMost(Duration.TWO_MINUTES) + .until(() -> errorSet.get()); + + } + + + /* This test is identical to the one above, but with overridden retry settings. + Therefore, it works as expected, invoking the onFailure callback. */ + @Test + public void failedPublishCallsListener_overriddenRetrySettingsAllowsAsyncPublishToProperlyFail() throws IOException, InterruptedException { + Publisher publisher = Publisher.newBuilder("projects/elfel-spring/topics/exampleTopic") + .setEndpoint("cecicestnespasunendpoint:443") + .setRetrySettings( + RetrySettings.newBuilder() + .setMaxAttempts(3) + .setTotalTimeout(org.threeten.bp.Duration.ofSeconds(10)) + .setInitialRpcTimeout(org.threeten.bp.Duration.ofSeconds(10)) + .setMaxRpcTimeout(org.threeten.bp.Duration.ofSeconds(20)) // TODO: bad message; try 9; longer not shorter + .build()) + .build(); + + PubsubMessage message = PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("test message")) + .build(); + + AtomicBoolean errorSet = new AtomicBoolean(false); + + ApiFutures.addCallback(publisher.publish(message), + new ApiFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + System.out.println("Sending throwable to error channel: " + throwable); + errorSet.set(true); + } + + @Override + public void onSuccess(String messageId) { + System.out.println("Successfully published message " + messageId); + } + }); + + Awaitility.await() + .atMost(Duration.TWO_MINUTES) + .until(() -> errorSet.get()); + + } + + @Test + public void successfulPublishCallsListener() throws IOException, InterruptedException { + Publisher publisher = Publisher.newBuilder("projects/elfel-spring/topics/exampleTopic") + .build(); + + PubsubMessage message = PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("test successful message")) + .build(); + + AtomicBoolean successSet = new AtomicBoolean(false); + + ApiFutures.addCallback(publisher.publish(message), + new ApiFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + System.out.println("publish errored out: " + throwable); + } + + @Override + public void onSuccess(String messageId) { + System.out.println("Successfully published message " + messageId); + successSet.set(true); + } + }); + + Awaitility.await() + .atMost(Duration.FIVE_SECONDS) + .until(() -> successSet.get()); + + } +} diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-binder-sample/src/main/resources/application.properties b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-binder-sample/src/main/resources/application.properties index dd7aaadca0..8d250a993a 100644 --- a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-binder-sample/src/main/resources/application.properties +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-binder-sample/src/main/resources/application.properties @@ -1,6 +1,8 @@ spring.cloud.stream.bindings.input.destination=my-topic spring.cloud.stream.bindings.output.destination=my-topic +spring.cloud.stream.bindings.output.producer.errorChannelEnabled=true + # If group is specified, the Pub/Sub subscription name will be [PUBSUB_TOPIC_NAME].[PUBSUB_GROUP_NAME] spring.cloud.stream.bindings.input.group=my-group