Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Control sync/async publish in Spring Cloud Stream binder #2473

Merged
merged 3 commits into from
Jul 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/src/main/asciidoc/spring-stream.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ If you are using Pub/Sub auto-configuration from the Spring Cloud GCP Pub/Sub St

NOTE: To use this binder with a https://cloud.google.com/pubsub/docs/emulator[running emulator], configure its host and port via `spring.cloud.gcp.pubsub.emulator-host`.

==== Producer Synchronous Sending Configuration
By default, this binder will send messages to Cloud Pub/Sub asynchronously.
If synchronous sending is preferred (for example, to allow propagating errors back to the sender), set `spring.cloud.stream.gcp.pubsub.default.producer.sync` property to `true`.

==== Producer Destination Configuration

If automatic resource creation is turned ON and the topic corresponding to the destination name does not exist, it will be created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ protected MessageHandler createProducerMessageHandler(ProducerDestination destin

PubSubMessageHandler messageHandler = new PubSubMessageHandler(this.pubSubTemplate, destination.getName());
messageHandler.setBeanFactory(getBeanFactory());
messageHandler.setSync(producerProperties.getExtension().isSync());
return messageHandler;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@
* @author João André Martins
* @author Daniel Zou
* @author Chengyuan Zhao
* @author Elena Felder
*/
public class PubSubProducerProperties extends PubSubCommonProperties {
private boolean sync = false;

public boolean isSync() {
return sync;
}

public void setSync(boolean sync) {
this.sync = sync;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,30 @@
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.gcp.pubsub.PubSubAdmin;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.stream.binder.pubsub.config.PubSubBinderConfiguration;
import org.springframework.cloud.gcp.stream.binder.pubsub.properties.PubSubConsumerProperties;
import org.springframework.cloud.gcp.stream.binder.pubsub.properties.PubSubExtendedBindingProperties;
import org.springframework.cloud.gcp.stream.binder.pubsub.provisioning.PubSubChannelProvisioner;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.messaging.MessageChannel;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
* Tests for channel binder.
*
* @author Mike Eltsufin
* @author Elena Felder
*
* @since 1.1
*/
Expand All @@ -49,19 +60,36 @@ public class PubSubMessageChannelBinderTests {
@Mock
PubSubTemplate pubSubTemplate;

@Mock
PubSubAdmin pubSubAdmin;

@Mock
PubSubExtendedBindingProperties properties;

@Mock
ConsumerDestination consumerDestination;

@Mock
ProducerDestination producerDestination;

@Mock
ExtendedConsumerProperties<PubSubConsumerProperties> consumerProperties;

@Mock
MessageChannel errorChannel;

ApplicationContextRunner baseContext = new ApplicationContextRunner()
.withBean(PubSubTemplate.class, () -> pubSubTemplate)
.withBean(PubSubAdmin.class, () -> pubSubAdmin)
.withConfiguration(
AutoConfigurations.of(PubSubBinderConfiguration.class, PubSubExtendedBindingProperties.class));

@Before
public void before() {
this.binder = new PubSubMessageChannelBinder(new String[0], this.channelProvisioner, this.pubSubTemplate,
this.properties);

when(producerDestination.getName()).thenReturn("test-topic");
}

@Test
Expand All @@ -71,4 +99,37 @@ public void testAfterUnbindConsumer() {
verify(this.channelProvisioner).afterUnbindConsumer(this.consumerDestination);
}

@Test
public void producerSyncPropertyFalseByDefault() {
Copy link
Contributor

@dzou dzou Jul 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's possible, I think it would be better to set the property via spring.cloud.stream.gcp.pubsub.default.producer.sync rather than set the setting through the property class directly. Mainly to verify that spring.cloud.stream.gcp.pubsub.default.producer.sync is indeed the correct format of the property name (as I am not sure what the correct form is myself).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I've been getting increasingly uncomfortable with the amount of mocking in this test. I'll either move this into an integration test, or turn this test into a light integration test with only a mocked pubsub template.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the best I could do without testing SCS implementation details. There is still one mocked transition from our properties into a SCS wrapper, but at least it tests the property mapping correctly.

baseContext
.run(ctx -> {
PubSubMessageChannelBinder binder = ctx.getBean(PubSubMessageChannelBinder.class);

PubSubExtendedBindingProperties props = ctx.getBean("pubSubExtendedBindingProperties", PubSubExtendedBindingProperties.class);
PubSubMessageHandler messageHandler = (PubSubMessageHandler) binder.createProducerMessageHandler(
producerDestination,
new ExtendedProducerProperties<>(props.getExtendedProducerProperties("test")),
errorChannel
);
assertThat(messageHandler.isSync()).isFalse();
});
}

@Test
public void producerSyncPropertyPropagatesToMessageHandler() {
baseContext
.withPropertyValues("spring.cloud.stream.gcp.pubsub.default.producer.sync=true")
.run(ctx -> {
PubSubMessageChannelBinder binder = ctx.getBean(PubSubMessageChannelBinder.class);

PubSubExtendedBindingProperties props = ctx.getBean("pubSubExtendedBindingProperties", PubSubExtendedBindingProperties.class);
PubSubMessageHandler messageHandler = (PubSubMessageHandler) binder.createProducerMessageHandler(
producerDestination,
new ExtendedProducerProperties<>(props.getExtendedProducerProperties("test")),
errorChannel
);
assertThat(messageHandler.isSync()).isTrue();
});
}

}