Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for binder customizers #186

Merged
merged 2 commits into from
Jan 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import com.google.cloud.spring.pubsub.PubSubAdmin;
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler;
import com.google.cloud.spring.stream.binder.pubsub.PubSubMessageChannelBinder;
import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubExtendedBindingProperties;
import com.google.cloud.spring.stream.binder.pubsub.provisioning.PubSubChannelProvisioner;
Expand All @@ -29,14 +31,18 @@
import org.springframework.boot.context.properties.source.ConfigurationPropertyName;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.config.BindingHandlerAdvise.MappingsProvider;
import org.springframework.cloud.stream.config.ConsumerEndpointCustomizer;
import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable;

/**
* Pub/Sub binder configuration.
*
* @author João André Martins
* @author Daniel Zou
* @author Mike Eltsufin
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(Binder.class)
Expand All @@ -52,10 +58,15 @@ public PubSubChannelProvisioner pubSubChannelProvisioner(PubSubAdmin pubSubAdmin
public PubSubMessageChannelBinder pubSubBinder(
PubSubChannelProvisioner pubSubChannelProvisioner,
PubSubTemplate pubSubTemplate,
PubSubExtendedBindingProperties pubSubExtendedBindingProperties) {

return new PubSubMessageChannelBinder(null, pubSubChannelProvisioner, pubSubTemplate,
PubSubExtendedBindingProperties pubSubExtendedBindingProperties,
@Nullable ProducerMessageHandlerCustomizer<PubSubMessageHandler> producerCustomizer,
@Nullable ConsumerEndpointCustomizer<PubSubInboundChannelAdapter> consumerCustomizer
) {
PubSubMessageChannelBinder binder = new PubSubMessageChannelBinder(null, pubSubChannelProvisioner, pubSubTemplate,
pubSubExtendedBindingProperties);
binder.setProducerMessageHandlerCustomizer(producerCustomizer);
binder.setConsumerEndpointCustomizer(consumerCustomizer);
return binder;
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,52 @@

package com.google.cloud.spring.stream.binder.pubsub;

import java.util.List;
import java.util.Map;

import com.google.api.gax.core.CredentialsProvider;
import com.google.auth.Credentials;
import com.google.cloud.spring.core.GcpProjectIdProvider;
import com.google.cloud.spring.pubsub.PubSubAdmin;
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubMessageSource;
import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler;
import com.google.cloud.spring.stream.binder.pubsub.config.PubSubBinderConfiguration;
import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubConsumerProperties;
import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubExtendedBindingProperties;
import com.google.cloud.spring.stream.binder.pubsub.provisioning.PubSubChannelProvisioner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.config.ConsumerEndpointCustomizer;
import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.MessageChannel;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -52,6 +75,7 @@
*/
@RunWith(MockitoJUnitRunner.class)
public class PubSubMessageChannelBinderTests {
private static final Log LOGGER = LogFactory.getLog(PubSubMessageChannelBinderTests.class);

PubSubMessageChannelBinder binder;

Expand Down Expand Up @@ -148,4 +172,67 @@ public void consumerMaxFetchPropertyPropagatesToMessageSource() {
});
}

@Test
public void testProducerAndConsumerCustomizers() {
baseContext.withUserConfiguration(PubSubBinderTestConfig.class)
.withPropertyValues("spring.cloud.stream.bindings.input.group=testGroup")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test relies on a lot of internal Spring Cloud Stream implementation details. Would it be easier to retrieve the adapter/message source by name? Or customize ACK method and verify that instead of verifying the name?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be possible, but I simply followed the test design for Kafka binder.

.run(context -> {

DirectFieldAccessor channelBindingServiceAccessor = new DirectFieldAccessor(
context.getBean(BindingService.class));
@SuppressWarnings("unchecked")
Map<String, List<Binding<MessageChannel>>> consumerBindings =
(Map<String, List<Binding<MessageChannel>>>) channelBindingServiceAccessor
.getPropertyValue("consumerBindings");
assertThat(new DirectFieldAccessor(
consumerBindings.get("input").get(0)).getPropertyValue(
"lifecycle.beanName"))
.isEqualTo("setByCustomizer:input");

@SuppressWarnings("unchecked")
Map<String, Binding<MessageChannel>> producerBindings =
(Map<String, Binding<MessageChannel>>) channelBindingServiceAccessor
.getPropertyValue("producerBindings");
assertThat(new DirectFieldAccessor(
producerBindings.get("output")).getPropertyValue(
"val$producerMessageHandler.beanName"))
.isEqualTo("setByCustomizer:output");
});
}

public interface PMS {
@Input
PollableMessageSource source();
}

@EnableBinding({ Processor.class, PMS.class })
@EnableAutoConfiguration
public static class PubSubBinderTestConfig {

@Bean
public ConsumerEndpointCustomizer<PubSubInboundChannelAdapter> consumerCustomizer() {
return (p, q, g) -> p.setBeanName("setByCustomizer:" + q);
}

@Bean
public ProducerMessageHandlerCustomizer<PubSubMessageHandler> handlerCustomizer() {
return (handler, destinationName) -> handler.setBeanName("setByCustomizer:" + destinationName);
}

@StreamListener(Sink.INPUT)
public void process(String payload) throws InterruptedException {
LOGGER.info("received: " + payload);
}

@Bean
public GcpProjectIdProvider projectIdProvider() {
return () -> "fake project";
}

@Bean
public CredentialsProvider googleCredentials() {
return () -> mock(Credentials.class);
}

}
}