diff --git a/docs/src/main/asciidoc/spring-stream.adoc b/docs/src/main/asciidoc/spring-stream.adoc index f0f719782b..3f54bc9cbd 100644 --- a/docs/src/main/asciidoc/spring-stream.adoc +++ b/docs/src/main/asciidoc/spring-stream.adoc @@ -40,6 +40,10 @@ If you are using Pub/Sub auto-configuration from the Spring Cloud GCP Pub/Sub St NOTE: To use this binder with a https://cloud.google.com/pubsub/docs/emulator[running emulator], configure its host and port via `spring.cloud.gcp.pubsub.emulator-host`. +==== Producer Synchronous Sending Configuration +By default, this binder will send messages to Cloud Pub/Sub asynchronously. +If synchronous sending is preferred (for example, to allow propagating errors back to the sender), set `spring.cloud.stream.gcp.pubsub.default.producer.sync` property to `true`. + ==== Producer Destination Configuration If automatic resource creation is turned ON and the topic corresponding to the destination name does not exist, it will be created. diff --git a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/org/springframework/cloud/gcp/stream/binder/pubsub/PubSubMessageChannelBinder.java b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/org/springframework/cloud/gcp/stream/binder/pubsub/PubSubMessageChannelBinder.java index 8a11664934..1c6aaae706 100644 --- a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/org/springframework/cloud/gcp/stream/binder/pubsub/PubSubMessageChannelBinder.java +++ b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/org/springframework/cloud/gcp/stream/binder/pubsub/PubSubMessageChannelBinder.java @@ -73,6 +73,7 @@ protected MessageHandler createProducerMessageHandler(ProducerDestination destin PubSubMessageHandler messageHandler = new PubSubMessageHandler(this.pubSubTemplate, destination.getName()); messageHandler.setBeanFactory(getBeanFactory()); + messageHandler.setSync(producerProperties.getExtension().isSync()); return messageHandler; } diff --git a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/org/springframework/cloud/gcp/stream/binder/pubsub/properties/PubSubProducerProperties.java b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/org/springframework/cloud/gcp/stream/binder/pubsub/properties/PubSubProducerProperties.java index 0d70c9fe13..48bf89bd83 100644 --- a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/org/springframework/cloud/gcp/stream/binder/pubsub/properties/PubSubProducerProperties.java +++ b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/org/springframework/cloud/gcp/stream/binder/pubsub/properties/PubSubProducerProperties.java @@ -22,6 +22,16 @@ * @author João André Martins * @author Daniel Zou * @author Chengyuan Zhao + * @author Elena Felder */ public class PubSubProducerProperties extends PubSubCommonProperties { + private boolean sync = false; + + public boolean isSync() { + return sync; + } + + public void setSync(boolean sync) { + this.sync = sync; + } } diff --git a/spring-cloud-gcp-pubsub-stream-binder/src/test/java/org/springframework/cloud/gcp/stream/binder/pubsub/PubSubMessageChannelBinderTests.java b/spring-cloud-gcp-pubsub-stream-binder/src/test/java/org/springframework/cloud/gcp/stream/binder/pubsub/PubSubMessageChannelBinderTests.java index d32bcc2d87..21c3935aab 100644 --- a/spring-cloud-gcp-pubsub-stream-binder/src/test/java/org/springframework/cloud/gcp/stream/binder/pubsub/PubSubMessageChannelBinderTests.java +++ b/spring-cloud-gcp-pubsub-stream-binder/src/test/java/org/springframework/cloud/gcp/stream/binder/pubsub/PubSubMessageChannelBinderTests.java @@ -22,19 +22,30 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.cloud.gcp.pubsub.PubSubAdmin; import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate; +import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler; +import org.springframework.cloud.gcp.stream.binder.pubsub.config.PubSubBinderConfiguration; import org.springframework.cloud.gcp.stream.binder.pubsub.properties.PubSubConsumerProperties; import org.springframework.cloud.gcp.stream.binder.pubsub.properties.PubSubExtendedBindingProperties; import org.springframework.cloud.gcp.stream.binder.pubsub.provisioning.PubSubChannelProvisioner; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.provisioning.ConsumerDestination; +import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.messaging.MessageChannel; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests for channel binder. * * @author Mike Eltsufin + * @author Elena Felder * * @since 1.1 */ @@ -49,19 +60,36 @@ public class PubSubMessageChannelBinderTests { @Mock PubSubTemplate pubSubTemplate; + @Mock + PubSubAdmin pubSubAdmin; + @Mock PubSubExtendedBindingProperties properties; @Mock ConsumerDestination consumerDestination; + @Mock + ProducerDestination producerDestination; + @Mock ExtendedConsumerProperties consumerProperties; + @Mock + MessageChannel errorChannel; + + ApplicationContextRunner baseContext = new ApplicationContextRunner() + .withBean(PubSubTemplate.class, () -> pubSubTemplate) + .withBean(PubSubAdmin.class, () -> pubSubAdmin) + .withConfiguration( + AutoConfigurations.of(PubSubBinderConfiguration.class, PubSubExtendedBindingProperties.class)); + @Before public void before() { this.binder = new PubSubMessageChannelBinder(new String[0], this.channelProvisioner, this.pubSubTemplate, this.properties); + + when(producerDestination.getName()).thenReturn("test-topic"); } @Test @@ -71,4 +99,37 @@ public void testAfterUnbindConsumer() { verify(this.channelProvisioner).afterUnbindConsumer(this.consumerDestination); } + @Test + public void producerSyncPropertyFalseByDefault() { + baseContext + .run(ctx -> { + PubSubMessageChannelBinder binder = ctx.getBean(PubSubMessageChannelBinder.class); + + PubSubExtendedBindingProperties props = ctx.getBean("pubSubExtendedBindingProperties", PubSubExtendedBindingProperties.class); + PubSubMessageHandler messageHandler = (PubSubMessageHandler) binder.createProducerMessageHandler( + producerDestination, + new ExtendedProducerProperties<>(props.getExtendedProducerProperties("test")), + errorChannel + ); + assertThat(messageHandler.isSync()).isFalse(); + }); + } + + @Test + public void producerSyncPropertyPropagatesToMessageHandler() { + baseContext + .withPropertyValues("spring.cloud.stream.gcp.pubsub.default.producer.sync=true") + .run(ctx -> { + PubSubMessageChannelBinder binder = ctx.getBean(PubSubMessageChannelBinder.class); + + PubSubExtendedBindingProperties props = ctx.getBean("pubSubExtendedBindingProperties", PubSubExtendedBindingProperties.class); + PubSubMessageHandler messageHandler = (PubSubMessageHandler) binder.createProducerMessageHandler( + producerDestination, + new ExtendedProducerProperties<>(props.getExtendedProducerProperties("test")), + errorChannel + ); + assertThat(messageHandler.isSync()).isTrue(); + }); + } + }