Skip to content

Commit

Permalink
Add support for binder customizers (#186)
Browse files Browse the repository at this point in the history
`ConsumerEndpointCustomizer<PubSubInboundChannelAdapter>` and `ProducerMessageHandlerCustomizer<PubSubMessageHandler>` user beans can now be provided to customize the Pub/Sub binder.

Fixes: #182.
  • Loading branch information
meltsufin authored Jan 5, 2021
1 parent 2af4c27 commit b9af4f7
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import com.google.cloud.spring.pubsub.PubSubAdmin;
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler;
import com.google.cloud.spring.stream.binder.pubsub.PubSubMessageChannelBinder;
import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubExtendedBindingProperties;
import com.google.cloud.spring.stream.binder.pubsub.provisioning.PubSubChannelProvisioner;
Expand All @@ -29,14 +31,18 @@
import org.springframework.boot.context.properties.source.ConfigurationPropertyName;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.config.BindingHandlerAdvise.MappingsProvider;
import org.springframework.cloud.stream.config.ConsumerEndpointCustomizer;
import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable;

/**
* Pub/Sub binder configuration.
*
* @author João André Martins
* @author Daniel Zou
* @author Mike Eltsufin
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(Binder.class)
Expand All @@ -52,10 +58,15 @@ public PubSubChannelProvisioner pubSubChannelProvisioner(PubSubAdmin pubSubAdmin
public PubSubMessageChannelBinder pubSubBinder(
PubSubChannelProvisioner pubSubChannelProvisioner,
PubSubTemplate pubSubTemplate,
PubSubExtendedBindingProperties pubSubExtendedBindingProperties) {

return new PubSubMessageChannelBinder(null, pubSubChannelProvisioner, pubSubTemplate,
PubSubExtendedBindingProperties pubSubExtendedBindingProperties,
@Nullable ProducerMessageHandlerCustomizer<PubSubMessageHandler> producerCustomizer,
@Nullable ConsumerEndpointCustomizer<PubSubInboundChannelAdapter> consumerCustomizer
) {
PubSubMessageChannelBinder binder = new PubSubMessageChannelBinder(null, pubSubChannelProvisioner, pubSubTemplate,
pubSubExtendedBindingProperties);
binder.setProducerMessageHandlerCustomizer(producerCustomizer);
binder.setConsumerEndpointCustomizer(consumerCustomizer);
return binder;
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,52 @@

package com.google.cloud.spring.stream.binder.pubsub;

import java.util.List;
import java.util.Map;

import com.google.api.gax.core.CredentialsProvider;
import com.google.auth.Credentials;
import com.google.cloud.spring.core.GcpProjectIdProvider;
import com.google.cloud.spring.pubsub.PubSubAdmin;
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubMessageSource;
import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler;
import com.google.cloud.spring.stream.binder.pubsub.config.PubSubBinderConfiguration;
import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubConsumerProperties;
import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubExtendedBindingProperties;
import com.google.cloud.spring.stream.binder.pubsub.provisioning.PubSubChannelProvisioner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.config.ConsumerEndpointCustomizer;
import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.MessageChannel;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -52,6 +75,7 @@
*/
@RunWith(MockitoJUnitRunner.class)
public class PubSubMessageChannelBinderTests {
private static final Log LOGGER = LogFactory.getLog(PubSubMessageChannelBinderTests.class);

PubSubMessageChannelBinder binder;

Expand Down Expand Up @@ -148,4 +172,67 @@ public void consumerMaxFetchPropertyPropagatesToMessageSource() {
});
}

@Test
public void testProducerAndConsumerCustomizers() {
baseContext.withUserConfiguration(PubSubBinderTestConfig.class)
.withPropertyValues("spring.cloud.stream.bindings.input.group=testGroup")
.run(context -> {

DirectFieldAccessor channelBindingServiceAccessor = new DirectFieldAccessor(
context.getBean(BindingService.class));
@SuppressWarnings("unchecked")
Map<String, List<Binding<MessageChannel>>> consumerBindings =
(Map<String, List<Binding<MessageChannel>>>) channelBindingServiceAccessor
.getPropertyValue("consumerBindings");
assertThat(new DirectFieldAccessor(
consumerBindings.get("input").get(0)).getPropertyValue(
"lifecycle.beanName"))
.isEqualTo("setByCustomizer:input");

@SuppressWarnings("unchecked")
Map<String, Binding<MessageChannel>> producerBindings =
(Map<String, Binding<MessageChannel>>) channelBindingServiceAccessor
.getPropertyValue("producerBindings");
assertThat(new DirectFieldAccessor(
producerBindings.get("output")).getPropertyValue(
"val$producerMessageHandler.beanName"))
.isEqualTo("setByCustomizer:output");
});
}

public interface PMS {
@Input
PollableMessageSource source();
}

@EnableBinding({ Processor.class, PMS.class })
@EnableAutoConfiguration
public static class PubSubBinderTestConfig {

@Bean
public ConsumerEndpointCustomizer<PubSubInboundChannelAdapter> consumerCustomizer() {
return (p, q, g) -> p.setBeanName("setByCustomizer:" + q);
}

@Bean
public ProducerMessageHandlerCustomizer<PubSubMessageHandler> handlerCustomizer() {
return (handler, destinationName) -> handler.setBeanName("setByCustomizer:" + destinationName);
}

@StreamListener(Sink.INPUT)
public void process(String payload) throws InterruptedException {
LOGGER.info("received: " + payload);
}

@Bean
public GcpProjectIdProvider projectIdProvider() {
return () -> "fake project";
}

@Bean
public CredentialsProvider googleCredentials() {
return () -> mock(Credentials.class);
}

}
}

0 comments on commit b9af4f7

Please sign in to comment.