properties) {
+ this.eventHubOperation.setStartPosition(properties.getExtension().getStartPosition());
+ CheckpointConfig checkpointConfig =
+ CheckpointConfig.builder().checkpointMode(properties.getExtension().getCheckpointMode())
+ .checkpointCount(properties.getExtension().getCheckpointCount())
+ .checkpointInterval(properties.getExtension().getCheckpointInterval())
+ .build();
+ this.eventHubOperation.setCheckpointConfig(checkpointConfig);
+
+ boolean anonymous = !StringUtils.hasText(group);
+ if (anonymous) {
+ group = "anonymous." + UUID.randomUUID().toString();
+ this.eventHubOperation.setStartPosition(StartPosition.LATEST);
+ }
+ EventHubInboundChannelAdapter inboundAdapter =
+ new EventHubInboundChannelAdapter(destination.getName(), this.eventHubOperation, group);
+ inboundAdapter.setBeanFactory(getBeanFactory());
+ ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, group, properties);
+ inboundAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
+ return inboundAdapter;
+ }
+
+ @Override
+ public EventHubConsumerProperties getExtendedConsumerProperties(String destination) {
+ return this.bindingProperties.getExtendedConsumerProperties(destination);
+ }
+
+ @Override
+ public EventHubProducerProperties getExtendedProducerProperties(String destination) {
+ return this.bindingProperties.getExtendedProducerProperties(destination);
+ }
+
+ @Override
+ public String getDefaultsPrefix() {
+ return this.bindingProperties.getDefaultsPrefix();
+ }
+
+ @Override
+ public Class extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
+ return this.bindingProperties.getExtendedPropertiesEntryClass();
+ }
+
+ public void setBindingProperties(EventHubExtendedBindingProperties bindingProperties) {
+ this.bindingProperties = bindingProperties;
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/config/EventHubBinderConfiguration.java b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/config/EventHubBinderConfiguration.java
new file mode 100644
index 0000000000000..fa76bfeeb7d6b
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/config/EventHubBinderConfiguration.java
@@ -0,0 +1,70 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.eventhub.stream.binder.config;
+
+import com.microsoft.azure.eventhub.stream.binder.EventHubMessageChannelBinder;
+import com.microsoft.azure.eventhub.stream.binder.properties.EventHubExtendedBindingProperties;
+import com.microsoft.azure.eventhub.stream.binder.provisioning.EventHubChannelProvisioner;
+import com.microsoft.azure.eventhub.stream.binder.provisioning.EventHubChannelResourceManagerProvisioner;
+import com.microsoft.azure.spring.cloud.autoconfigure.context.AzureEnvironmentAutoConfiguration;
+import com.microsoft.azure.spring.cloud.autoconfigure.eventhub.AzureEventHubAutoConfiguration;
+import com.microsoft.azure.spring.cloud.autoconfigure.eventhub.AzureEventHubProperties;
+import com.microsoft.azure.spring.cloud.autoconfigure.eventhub.EventHubUtils;
+import com.microsoft.azure.spring.cloud.context.core.api.ResourceManagerProvider;
+import com.microsoft.azure.spring.cloud.telemetry.TelemetryCollector;
+import com.microsoft.azure.spring.integration.eventhub.api.EventHubOperation;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.cloud.stream.binder.Binder;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * @author Warren Zhu
+ */
+@Configuration
+@ConditionalOnMissingBean(Binder.class)
+@Import({AzureEventHubAutoConfiguration.class, AzureEnvironmentAutoConfiguration.class})
+@EnableConfigurationProperties({AzureEventHubProperties.class, EventHubExtendedBindingProperties.class})
+public class EventHubBinderConfiguration {
+
+ private static final String EVENT_HUB_BINDER = "EventHubBinder";
+ private static final String NAMESPACE = "Namespace";
+
+ @Autowired(required = false)
+ private ResourceManagerProvider resourceManagerProvider;
+
+ @PostConstruct
+ public void collectTelemetry() {
+ TelemetryCollector.getInstance().addService(EVENT_HUB_BINDER);
+ }
+
+ @Bean
+ @ConditionalOnMissingBean
+ public EventHubChannelProvisioner eventHubChannelProvisioner(AzureEventHubProperties eventHubProperties) {
+ if (resourceManagerProvider != null) {
+ return new EventHubChannelResourceManagerProvisioner(resourceManagerProvider,
+ eventHubProperties.getNamespace());
+ } else {
+ TelemetryCollector.getInstance().addProperty(EVENT_HUB_BINDER, NAMESPACE,
+ EventHubUtils.getNamespace(eventHubProperties.getConnectionString()));
+ }
+
+ return new EventHubChannelProvisioner();
+ }
+
+ @Bean
+ @ConditionalOnMissingBean
+ public EventHubMessageChannelBinder eventHubBinder(EventHubChannelProvisioner eventHubChannelProvisioner,
+ EventHubOperation eventHubOperation, EventHubExtendedBindingProperties bindingProperties) {
+ EventHubMessageChannelBinder binder =
+ new EventHubMessageChannelBinder(null, eventHubChannelProvisioner, eventHubOperation);
+ binder.setBindingProperties(bindingProperties);
+ return binder;
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubBindingProperties.java b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubBindingProperties.java
new file mode 100644
index 0000000000000..8106ef9a9f359
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubBindingProperties.java
@@ -0,0 +1,30 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.eventhub.stream.binder.properties;
+
+import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
+
+/**
+ * @author Warren Zhu
+ */
+public class EventHubBindingProperties implements BinderSpecificPropertiesProvider {
+ private EventHubConsumerProperties consumer = new EventHubConsumerProperties();
+ private EventHubProducerProperties producer = new EventHubProducerProperties();
+
+ public EventHubConsumerProperties getConsumer() {
+ return consumer;
+ }
+
+ public void setConsumer(EventHubConsumerProperties consumer) {
+ this.consumer = consumer;
+ }
+
+ public EventHubProducerProperties getProducer() {
+ return producer;
+ }
+
+ public void setProducer(EventHubProducerProperties producer) {
+ this.producer = producer;
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubConsumerProperties.java b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubConsumerProperties.java
new file mode 100644
index 0000000000000..88379d07cdc8e
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubConsumerProperties.java
@@ -0,0 +1,79 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.eventhub.stream.binder.properties;
+
+import com.microsoft.azure.spring.integration.core.api.CheckpointMode;
+import com.microsoft.azure.spring.integration.core.api.StartPosition;
+
+import java.time.Duration;
+
+/**
+ * @author Warren Zhu
+ */
+public class EventHubConsumerProperties {
+ /**
+ * Whether the consumer receives messages from the beginning or end of event hub.
+ * If {@link StartPosition#EARLIEST}, from beginning. If {@link StartPosition#LATEST}, from end.
+ *
+ * Default: {@link StartPosition#LATEST}
+ */
+ private StartPosition startPosition = StartPosition.LATEST;
+
+ /**
+ * Checkpoint mode used when consumer decide how to checkpoint message
+ *
+ * Default: {@link CheckpointMode#BATCH}
+ */
+ private CheckpointMode checkpointMode = CheckpointMode.BATCH;
+
+ /**
+ * Effectively only when {@link CheckpointMode#PARTITION_COUNT}.
+ * Decides the amount of message for each partition to do one checkpoint
+ *
+ *
+ * Default : 10
+ */
+ private int checkpointCount = 10;
+
+ /**
+ * Effectively only when {@link CheckpointMode#TIME}.
+ * Decides the time interval to do one checkpoint
+ *
+ *
+ * Default : 5s
+ */
+ private Duration checkpointInterval = Duration.ofSeconds(5);
+
+ public StartPosition getStartPosition() {
+ return startPosition;
+ }
+
+ public void setStartPosition(StartPosition startPosition) {
+ this.startPosition = startPosition;
+ }
+
+ public CheckpointMode getCheckpointMode() {
+ return checkpointMode;
+ }
+
+ public void setCheckpointMode(CheckpointMode checkpointMode) {
+ this.checkpointMode = checkpointMode;
+ }
+
+ public int getCheckpointCount() {
+ return checkpointCount;
+ }
+
+ public void setCheckpointCount(int checkpointCount) {
+ this.checkpointCount = checkpointCount;
+ }
+
+ public Duration getCheckpointInterval() {
+ return checkpointInterval;
+ }
+
+ public void setCheckpointInterval(Duration checkpointInterval) {
+ this.checkpointInterval = checkpointInterval;
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubExtendedBindingProperties.java b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubExtendedBindingProperties.java
new file mode 100644
index 0000000000000..d842d126239ac
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubExtendedBindingProperties.java
@@ -0,0 +1,58 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.eventhub.stream.binder.properties;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
+import org.springframework.cloud.stream.binder.ExtendedBindingProperties;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author Warren Zhu
+ */
+@ConfigurationProperties("spring.cloud.stream.eventhub")
+public class EventHubExtendedBindingProperties
+ implements ExtendedBindingProperties {
+ private static final String DEFAULTS_PREFIX = "spring.cloud.stream.eventhub.default";
+ private Map bindings = new ConcurrentHashMap<>();
+ private String checkpointStorageAccount;
+
+ @Override
+ public EventHubConsumerProperties getExtendedConsumerProperties(String channelName) {
+ return this.bindings.computeIfAbsent(channelName, key -> new EventHubBindingProperties()).getConsumer();
+ }
+
+ @Override
+ public EventHubProducerProperties getExtendedProducerProperties(String channelName) {
+ return this.bindings.computeIfAbsent(channelName, key -> new EventHubBindingProperties()).getProducer();
+ }
+
+ @Override
+ public String getDefaultsPrefix() {
+ return DEFAULTS_PREFIX;
+ }
+
+ @Override
+ public Class extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
+ return EventHubBindingProperties.class;
+ }
+
+ public String getCheckpointStorageAccount() {
+ return checkpointStorageAccount;
+ }
+
+ public void setCheckpointStorageAccount(String checkpointStorageAccount) {
+ this.checkpointStorageAccount = checkpointStorageAccount;
+ }
+
+ public Map getBindings() {
+ return bindings;
+ }
+
+ public void setBindings(Map bindings) {
+ this.bindings = bindings;
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubProducerProperties.java b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubProducerProperties.java
new file mode 100644
index 0000000000000..22a1e641ccfb0
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/properties/EventHubProducerProperties.java
@@ -0,0 +1,42 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.eventhub.stream.binder.properties;
+
+/**
+ * @author Warren Zhu
+ */
+public class EventHubProducerProperties {
+ /**
+ * Whether the producer should act in a synchronous manner with respect to sending messages into destination.
+ * If true, the producer will wait for a response from Event Hub after a send operation before sending next message.
+ * If false, the producer will keep sending without waiting response
+ *
+ * Default: true
+ */
+ private boolean sync;
+
+ /**
+ * Effective only if sync is set to true.
+ * The amount of time to wait for a response from Event Hub after a send operation, in milliseconds.
+ *
+ * Default: 10000
+ */
+ private long sendTimeout = 10000;
+
+ public boolean isSync() {
+ return sync;
+ }
+
+ public void setSync(boolean sync) {
+ this.sync = sync;
+ }
+
+ public long getSendTimeout() {
+ return sendTimeout;
+ }
+
+ public void setSendTimeout(long sendTimeout) {
+ this.sendTimeout = sendTimeout;
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubChannelProvisioner.java b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubChannelProvisioner.java
new file mode 100644
index 0000000000000..92bb8574b40b4
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubChannelProvisioner.java
@@ -0,0 +1,43 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.eventhub.stream.binder.provisioning;
+
+import com.microsoft.azure.eventhub.stream.binder.properties.EventHubConsumerProperties;
+import com.microsoft.azure.eventhub.stream.binder.properties.EventHubProducerProperties;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
+import org.springframework.cloud.stream.provisioning.ConsumerDestination;
+import org.springframework.cloud.stream.provisioning.ProducerDestination;
+import org.springframework.cloud.stream.provisioning.ProvisioningException;
+import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
+
+/**
+ * @author Warren Zhu
+ */
+public class EventHubChannelProvisioner implements
+ ProvisioningProvider,
+ ExtendedProducerProperties> {
+
+ @Override
+ public ProducerDestination provisionProducerDestination(String name,
+ ExtendedProducerProperties properties) throws ProvisioningException {
+ validateOrCreateForProducer(name);
+ return new EventHubProducerDestination(name);
+ }
+
+ @Override
+ public ConsumerDestination provisionConsumerDestination(String name, String group,
+ ExtendedConsumerProperties properties) throws ProvisioningException {
+ validateOrCreateForConsumer(name, group);
+ return new EventHubConsumerDestination(name);
+ }
+
+ protected void validateOrCreateForConsumer(String name, String group) {
+ // no-op
+ }
+
+ protected void validateOrCreateForProducer(String name) {
+ // no-op
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubChannelResourceManagerProvisioner.java b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubChannelResourceManagerProvisioner.java
new file mode 100644
index 0000000000000..1e68dedf6bacc
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubChannelResourceManagerProvisioner.java
@@ -0,0 +1,50 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.eventhub.stream.binder.provisioning;
+
+import com.microsoft.azure.management.eventhub.EventHub;
+import com.microsoft.azure.management.eventhub.EventHubNamespace;
+import com.microsoft.azure.spring.cloud.context.core.api.ResourceManagerProvider;
+import com.microsoft.azure.spring.cloud.context.core.util.Tuple;
+import org.springframework.cloud.stream.provisioning.ProvisioningException;
+import org.springframework.lang.NonNull;
+import org.springframework.util.Assert;
+
+/**
+ * @author Warren Zhu
+ */
+public class EventHubChannelResourceManagerProvisioner extends EventHubChannelProvisioner {
+ private final String namespace;
+ private final ResourceManagerProvider resourceManagerProvider;
+
+ public EventHubChannelResourceManagerProvisioner(@NonNull ResourceManagerProvider resourceManagerProvider,
+ @NonNull String namespace) {
+ Assert.hasText(namespace, "The namespace can't be null or empty");
+ this.namespace = namespace;
+ this.resourceManagerProvider = resourceManagerProvider;
+ }
+
+ @Override
+ protected void validateOrCreateForConsumer(String name, String group) {
+ EventHubNamespace eventHubNamespace =
+ this.resourceManagerProvider.getEventHubNamespaceManager().getOrCreate(namespace);
+ EventHub eventHub = this.resourceManagerProvider.getEventHubManager().get(Tuple.of(eventHubNamespace, name));
+ if (eventHub == null) {
+ throw new ProvisioningException(
+ String.format("Event hub with name '%s' in namespace '%s' not existed", name, namespace));
+ }
+
+ this.resourceManagerProvider.getEventHubConsumerGroupManager().getOrCreate(Tuple.of(eventHub, group));
+ }
+
+ @Override
+ protected void validateOrCreateForProducer(String name) {
+ if (resourceManagerProvider == null) {
+ return;
+ }
+ EventHubNamespace eventHubNamespace =
+ this.resourceManagerProvider.getEventHubNamespaceManager().getOrCreate(namespace);
+ this.resourceManagerProvider.getEventHubManager().getOrCreate(Tuple.of(eventHubNamespace, name));
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubConsumerDestination.java b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubConsumerDestination.java
new file mode 100644
index 0000000000000..b85d6d5e62fc2
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubConsumerDestination.java
@@ -0,0 +1,23 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.eventhub.stream.binder.provisioning;
+
+import org.springframework.cloud.stream.provisioning.ConsumerDestination;
+
+/**
+ * @author Warren Zhu
+ */
+public class EventHubConsumerDestination implements ConsumerDestination {
+
+ private String name;
+
+ public EventHubConsumerDestination(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubProducerDestination.java b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubProducerDestination.java
new file mode 100644
index 0000000000000..8e9590dabfe12
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/java/com/microsoft/azure/eventhub/stream/binder/provisioning/EventHubProducerDestination.java
@@ -0,0 +1,28 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.eventhub.stream.binder.provisioning;
+
+import org.springframework.cloud.stream.provisioning.ProducerDestination;
+
+/**
+ * @author Warren Zhu
+ */
+public class EventHubProducerDestination implements ProducerDestination {
+
+ private String name;
+
+ public EventHubProducerDestination(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public String getNameForPartition(int partition) {
+ return this.name + "-" + partition;
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/resources/META-INF/spring.binders b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/resources/META-INF/spring.binders
new file mode 100644
index 0000000000000..4f388e15a99df
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/main/resources/META-INF/spring.binders
@@ -0,0 +1 @@
+eventhub: com.microsoft.azure.eventhub.stream.binder.config.EventHubBinderConfiguration
diff --git a/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/java/com/microsoft/azure/eventhub/stream/binder/EventHubPartitionBinderTests.java b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/java/com/microsoft/azure/eventhub/stream/binder/EventHubPartitionBinderTests.java
new file mode 100644
index 0000000000000..5f7a7ea5423c5
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/java/com/microsoft/azure/eventhub/stream/binder/EventHubPartitionBinderTests.java
@@ -0,0 +1,86 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.eventhub.stream.binder;
+
+import com.azure.messaging.eventhubs.models.EventContext;
+import com.azure.messaging.eventhubs.models.PartitionContext;
+import com.microsoft.azure.eventhub.stream.binder.properties.EventHubConsumerProperties;
+import com.microsoft.azure.eventhub.stream.binder.properties.EventHubProducerProperties;
+import com.microsoft.azure.servicebus.stream.binder.test.AzurePartitionBinderTests;
+import com.microsoft.azure.spring.integration.core.api.StartPosition;
+import com.microsoft.azure.spring.integration.eventhub.api.EventHubClientFactory;
+import com.microsoft.azure.spring.integration.eventhub.support.EventHubTestOperation;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
+import org.springframework.cloud.stream.binder.HeaderMode;
+import reactor.core.publisher.Mono;
+
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases are defined in super class
+ *
+ * @author Warren Zhu
+ */
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*"})
+@PrepareForTest(EventContext.class)
+public class EventHubPartitionBinderTests extends
+ AzurePartitionBinderTests,
+ ExtendedProducerProperties> {
+
+ @Mock
+ EventHubClientFactory clientFactory;
+
+ @Mock
+ EventContext eventContext;
+
+ @Mock
+ PartitionContext partitionContext;
+
+
+ private EventHubTestBinder binder;
+
+ @Before
+ public void setUp() {
+ when(this.eventContext.updateCheckpointAsync()).thenReturn(Mono.empty());
+ when(this.eventContext.getPartitionContext()).thenReturn(this.partitionContext);
+ when(this.partitionContext.getPartitionId()).thenReturn("1");
+
+ this.binder = new EventHubTestBinder(new EventHubTestOperation(clientFactory, () -> eventContext));
+ }
+
+ @Override
+ protected String getClassUnderTestName() {
+ return EventHubTestBinder.class.getSimpleName();
+ }
+
+ @Override
+ protected EventHubTestBinder getBinder() throws Exception {
+ return this.binder;
+ }
+
+ @Override
+ protected ExtendedConsumerProperties createConsumerProperties() {
+ ExtendedConsumerProperties properties =
+ new ExtendedConsumerProperties<>(new EventHubConsumerProperties());
+ properties.setHeaderMode(HeaderMode.embeddedHeaders);
+ properties.getExtension().setStartPosition(StartPosition.EARLIEST);
+ return properties;
+ }
+
+ @Override
+ protected ExtendedProducerProperties createProducerProperties() {
+ ExtendedProducerProperties properties =
+ new ExtendedProducerProperties<>(new EventHubProducerProperties());
+ properties.setHeaderMode(HeaderMode.embeddedHeaders);
+ return properties;
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/java/com/microsoft/azure/eventhub/stream/binder/EventHubTestBinder.java b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/java/com/microsoft/azure/eventhub/stream/binder/EventHubTestBinder.java
new file mode 100644
index 0000000000000..579303274ea0a
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/java/com/microsoft/azure/eventhub/stream/binder/EventHubTestBinder.java
@@ -0,0 +1,38 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.eventhub.stream.binder;
+
+import com.microsoft.azure.eventhub.stream.binder.properties.EventHubConsumerProperties;
+import com.microsoft.azure.eventhub.stream.binder.properties.EventHubProducerProperties;
+import com.microsoft.azure.eventhub.stream.binder.provisioning.EventHubChannelProvisioner;
+import com.microsoft.azure.spring.integration.eventhub.api.EventHubOperation;
+import org.springframework.cloud.stream.binder.AbstractTestBinder;
+import org.springframework.cloud.stream.binder.BinderHeaders;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
+import org.springframework.context.support.GenericApplicationContext;
+
+/**
+ * @author Warren Zhu
+ */
+
+public class EventHubTestBinder extends
+ AbstractTestBinder,
+ ExtendedProducerProperties> {
+
+ EventHubTestBinder(EventHubOperation eventHubOperation) {
+ EventHubMessageChannelBinder binder =
+ new EventHubMessageChannelBinder(BinderHeaders.STANDARD_HEADERS, new EventHubChannelProvisioner(),
+ eventHubOperation);
+ GenericApplicationContext context = new GenericApplicationContext();
+ binder.setApplicationContext(context);
+ this.setBinder(binder);
+ }
+
+ @Override
+ public void cleanup() {
+ // No-op
+ }
+
+}
diff --git a/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/java/com/microsoft/azure/eventhub/stream/binder/integration/EventHubBinderBatchModeIT.java b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/java/com/microsoft/azure/eventhub/stream/binder/integration/EventHubBinderBatchModeIT.java
new file mode 100644
index 0000000000000..9f5e84b0596b4
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/java/com/microsoft/azure/eventhub/stream/binder/integration/EventHubBinderBatchModeIT.java
@@ -0,0 +1,49 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.eventhub.stream.binder.integration;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.cloud.stream.messaging.Sink;
+import org.springframework.cloud.stream.messaging.Source;
+import org.springframework.messaging.support.GenericMessage;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = EventHubBinderBatchModeIT.TestConfig.class)
+@TestPropertySource(locations = "classpath:application-test.properties",
+ properties = "spring.cloud.stream.eventhub.bindings.input.consumer.checkpoint-mode=BATCH")
+public class EventHubBinderBatchModeIT {
+
+ private static String message = UUID.randomUUID().toString();
+
+ @Autowired
+ Source source;
+
+ @Test
+ public void testSendAndReceiveMessage() {
+ this.source.output().send(new GenericMessage<>(message));
+ }
+
+ @EnableBinding({Source.class, Sink.class})
+ @EnableAutoConfiguration
+ public static class TestConfig {
+
+ @StreamListener(Sink.INPUT)
+ public void handleMessage(String message) {
+ assertThat(message.equals(EventHubBinderBatchModeIT.message)).isTrue();
+ }
+
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/java/com/microsoft/azure/eventhub/stream/binder/integration/EventHubBinderManualModeIT.java b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/java/com/microsoft/azure/eventhub/stream/binder/integration/EventHubBinderManualModeIT.java
new file mode 100644
index 0000000000000..a6236e040724a
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/java/com/microsoft/azure/eventhub/stream/binder/integration/EventHubBinderManualModeIT.java
@@ -0,0 +1,55 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.eventhub.stream.binder.integration;
+
+import com.microsoft.azure.spring.integration.core.AzureHeaders;
+import com.microsoft.azure.spring.integration.core.api.Checkpointer;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.cloud.stream.messaging.Sink;
+import org.springframework.cloud.stream.messaging.Source;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.support.GenericMessage;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = EventHubBinderManualModeIT.TestConfig.class)
+@TestPropertySource(locations = "classpath:application-test.properties",
+ properties = "spring.cloud.stream.eventhub.bindings.input.consumer.checkpoint-mode=MANUAL")
+public class EventHubBinderManualModeIT {
+
+ private static String message = UUID.randomUUID().toString();
+ @Autowired
+ Source source;
+
+ @Test
+ public void testSendAndReceiveMessage() {
+ this.source.output().send(new GenericMessage<>(message));
+ }
+
+ @EnableBinding({Source.class, Sink.class})
+ @EnableAutoConfiguration
+ public static class TestConfig {
+
+ @StreamListener(Sink.INPUT)
+ public void handleMessage(String message, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
+ assertThat(message.equals(EventHubBinderManualModeIT.message)).isTrue();
+ checkpointer.success().handle((r, ex) -> {
+ assertThat(ex == null).isTrue();
+ return null;
+ });
+ }
+
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/java/com/microsoft/azure/eventhub/stream/binder/integration/EventHubBinderRecordModeIT.java b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/java/com/microsoft/azure/eventhub/stream/binder/integration/EventHubBinderRecordModeIT.java
new file mode 100644
index 0000000000000..f5f1523164101
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/java/com/microsoft/azure/eventhub/stream/binder/integration/EventHubBinderRecordModeIT.java
@@ -0,0 +1,48 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.eventhub.stream.binder.integration;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.cloud.stream.messaging.Sink;
+import org.springframework.cloud.stream.messaging.Source;
+import org.springframework.messaging.support.GenericMessage;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = EventHubBinderRecordModeIT.TestConfig.class)
+@TestPropertySource(locations = "classpath:application-test.properties",
+ properties = "spring.cloud.stream.eventhub.bindings.input.consumer.checkpoint-mode=RECORD")
+public class EventHubBinderRecordModeIT {
+
+ private static String message = UUID.randomUUID().toString();
+ @Autowired
+ Source source;
+
+ @Test
+ public void testSendAndReceiveMessage() {
+ this.source.output().send(new GenericMessage<>(message));
+ }
+
+ @EnableBinding({Source.class, Sink.class})
+ @EnableAutoConfiguration
+ public static class TestConfig {
+
+ @StreamListener(Sink.INPUT)
+ public void handleMessage(String message) {
+ assertThat(message.equals(EventHubBinderRecordModeIT.message)).isTrue();
+ }
+
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/java/com/microsoft/azure/eventhub/stream/binder/integration/EventHubBinderSyncModeIT.java b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/java/com/microsoft/azure/eventhub/stream/binder/integration/EventHubBinderSyncModeIT.java
new file mode 100644
index 0000000000000..639f52b19caf9
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/java/com/microsoft/azure/eventhub/stream/binder/integration/EventHubBinderSyncModeIT.java
@@ -0,0 +1,48 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.eventhub.stream.binder.integration;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.cloud.stream.messaging.Sink;
+import org.springframework.cloud.stream.messaging.Source;
+import org.springframework.messaging.support.GenericMessage;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = EventHubBinderSyncModeIT.TestConfig.class)
+@TestPropertySource(locations = "classpath:application-test.properties",
+ properties = "spring.cloud.stream.eventhub.bindings.input.producer.sync=true")
+public class EventHubBinderSyncModeIT {
+
+ private static String message = UUID.randomUUID().toString();
+ @Autowired
+ Source source;
+
+ @Test
+ public void testSendAndReceiveMessage() {
+ this.source.output().send(new GenericMessage<>(message));
+ }
+
+ @EnableBinding({Source.class, Sink.class})
+ @EnableAutoConfiguration
+ public static class TestConfig {
+
+ @StreamListener(Sink.INPUT)
+ public void handleMessage(String message) {
+ assertThat(message.equals(EventHubBinderSyncModeIT.message)).isTrue();
+ }
+
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/resources/application-test.properties b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/resources/application-test.properties
new file mode 100644
index 0000000000000..b3fd93bb52274
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/resources/application-test.properties
@@ -0,0 +1,14 @@
+spring.cloud.azure.credential-file-path=file:@credential@
+spring.cloud.azure.resource-group=spring-cloud
+spring.cloud.azure.eventhub.namespace=spring-cloud-azure
+
+spring.cloud.azure.eventhub.checkpoint-storage-account=springcloudcheckpoint
+
+spring.cloud.azure.region=westUS
+spring.cloud.azure.auto-create-resources=true
+
+spring.cloud.stream.bindings.input.destination=eventhub1
+spring.cloud.stream.bindings.input.group=cg1
+spring.cloud.stream.bindings.output.destination=eventhub1
+
+spring.main.banner-mode=off
diff --git a/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/resources/logback-test.xml b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/resources/logback-test.xml
new file mode 100644
index 0000000000000..300f34b41c279
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-eventhubs-stream-binder/src/test/resources/logback-test.xml
@@ -0,0 +1,12 @@
+
+
+
+
+ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+
+
+
+
+
+
+
diff --git a/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/CHANGELOG.md b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/CHANGELOG.md
new file mode 100644
index 0000000000000..43b25529c46ce
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/CHANGELOG.md
@@ -0,0 +1,3 @@
+# Release History
+
+## 1.2.8-beta.1 (Unreleased)
diff --git a/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/README.md b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/README.md
new file mode 100644
index 0000000000000..564f480e47ffa
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/README.md
@@ -0,0 +1,118 @@
+# Spring Cloud Azure Service Bus Queue Stream Binder client library for Java
+
+The project provides **Spring Cloud Stream Binder for Azure Service Bus Queue** which allows you to build message-driven
+microservice using **Spring Cloud Stream** based on [Azure Service Bus Queue](https://azure.microsoft.com/en-us/services/service-bus/) service.
+
+## Key concepts
+
+### Service Bus Queue Binder Overview
+
+The Spring Cloud Stream Binder for Azure Service Bus Queue provides the binding implementation for the Spring Cloud Stream.
+This implementation uses Spring Integration Service Bus Queue Channel Adapters at its foundation.
+
+#### Scheduled Message
+
+This binder supports submitting messages to a queue for delayed processing. Users can send scheduled messages with header `x-delay`
+expressing in milliseconds a delay time for the message. The message will be delivered to the respective queues after `x-delay` milliseconds.
+#### Consumer Group
+
+This binder has no consumer group support since all consumers share one queue.
+
+#### Partitioning Support
+
+This binder has no partition support even service bus queue supports partition.
+
+## Getting started
+
+## Examples
+
+Please use this [sample](../../spring-cloud-azure-samples/servicebus-queue-binder-sample/) as a reference
+for how to use this binder in your projects.
+
+### Feature List
+
+- [Dependency Management](#dependency-management)
+- [Configuration Options](#configuration-options)
+
+#### Dependency Management
+
+**Maven Coordinates**
+```
+
+ com.microsoft.azure
+ spring-cloud-azure-servicebus-queue-stream-binder
+
+
+```
+**Gradle Coordinates**
+```
+dependencies {
+ compile group: 'com.microsoft.azure', name: 'spring-cloud-azure-servicebus-queue-stream-binder'
+}
+```
+
+#### Configuration Options
+
+The binder provides the following configuration options in `application.properties`.
+
+##### Spring Cloud Azure Properties #####
+
+Name | Description | Required | Default
+---|---|---|---
+spring.cloud.azure.credential-file-path | Location of azure credential file | Yes |
+spring.cloud.azure.resource-group | Name of Azure resource group | Yes |
+spring.cloud.azure.region | Region name of the Azure resource group, e.g. westus | Yes |
+spring.cloud.azure.servicebus.namespace | Service Bus Namespace. Auto creating if missing | Yes |
+
+##### Serivce Bus Queue Producer Properties #####
+
+It supports the following configurations with the format of `spring.cloud.stream.servicebus.queue.bindings..producer`.
+
+**_sync_**
+
+Whether the producer should act in a synchronous manner with respect to writing messages into a stream. If true, the
+producer will wait for a response after a send operation.
+
+Default: `false`
+
+**_send-timeout_**
+
+Effective only if `sync` is set to true. The amount of time to wait for a response after a send operation, in milliseconds.
+
+Default: `10000`
+
+##### Service Bus Queue Consumer Properties #####
+
+It supports the following configurations with the format of `spring.cloud.stream.servicebus.queue.bindings..consumer`.
+
+**_checkpoint-mode_**
+
+The mode in which checkpoints are updated.
+
+`RECORD`, checkpoints occur after each record successfully processed by user-defined message handler without any exception.
+
+`MANUAL`, checkpoints occur on demand by the user via the `Checkpointer`. You can get `Checkpointer` by `Message.getHeaders.get(AzureHeaders.CHECKPOINTER)`callback.
+
+Default: `RECORD`
+
+**_prefetch-count_**
+
+Prefetch count of underlying service bus client.
+
+Default: `1`
+
+**_concurrency_**
+
+Controls the max concurrent calls of service bus message handler and the size of fixed thread pool that handles user's business logic
+
+Default: `1`
+
+**_sessionsEnabled_**
+
+Controls if is a session aware consumer. Set it to `true` if is a queue with sessions enabled.
+
+Default: `false`
+
+## Troubleshooting
+## Next steps
+## Contributing
diff --git a/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/pom.xml b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/pom.xml
new file mode 100644
index 0000000000000..0ca61155429bc
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/pom.xml
@@ -0,0 +1,68 @@
+
+
+
+ com.azure
+ azure-client-sdk-parent
+ 1.7.0
+ ../../parents/azure-client-sdk-parent
+
+ 4.0.0
+
+ com.microsoft.azure
+ spring-cloud-azure-servicebus-queue-stream-binder
+ 1.2.8-beta.1
+
+ Azure Spring Cloud Stream Service Bus Queue Binder
+ Azure Service Bus Queue binder for Spring Cloud Stream
+ https://github.com/Azure/azure-sdk-for-java
+
+
+ 0.10
+ 0.00
+
+
+
+
+ com.microsoft.azure
+ spring-cloud-azure-servicebus-stream-binder-core
+ 1.2.8-beta.1
+
+
+
+ com.microsoft.azure
+ spring-cloud-azure-stream-binder-test
+ 1.2.8-beta.1
+ test
+
+
+
+ com.google.code.findbugs
+ jsr305
+ 3.0.2
+ provided
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+ 3.0.0-M3
+
+
+
+
+ com.microsoft.azure:spring-cloud-azure-servicebus-stream-binder-core:[1.2.8-beta.1]
+
+
+
+
+
+
+
+
diff --git a/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusQueueMessageChannelBinder.java b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusQueueMessageChannelBinder.java
new file mode 100644
index 0000000000000..b8da42f06f44b
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusQueueMessageChannelBinder.java
@@ -0,0 +1,51 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder;
+
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusConsumerProperties;
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusQueueExtendedBindingProperties;
+import com.microsoft.azure.servicebus.stream.binder.provisioning.ServiceBusChannelProvisioner;
+import com.microsoft.azure.spring.integration.core.api.SendOperation;
+import com.microsoft.azure.spring.integration.servicebus.inbound.ServiceBusQueueInboundChannelAdapter;
+import com.microsoft.azure.spring.integration.servicebus.queue.ServiceBusQueueOperation;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.provisioning.ConsumerDestination;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.lang.NonNull;
+
+/**
+ * @author Warren Zhu
+ */
+public class ServiceBusQueueMessageChannelBinder extends
+ ServiceBusMessageChannelBinder {
+
+ private final ServiceBusQueueOperation serviceBusQueueOperation;
+
+ public ServiceBusQueueMessageChannelBinder(String[] headersToEmbed,
+ @NonNull ServiceBusChannelProvisioner provisioningProvider,
+ @NonNull ServiceBusQueueOperation serviceBusQueueOperation) {
+ super(headersToEmbed, provisioningProvider);
+ this.serviceBusQueueOperation = serviceBusQueueOperation;
+ this.bindingProperties = new ServiceBusQueueExtendedBindingProperties();
+ }
+
+ @Override
+ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
+ ExtendedConsumerProperties properties) {
+
+ this.serviceBusQueueOperation.setCheckpointConfig(buildCheckpointConfig(properties));
+ this.serviceBusQueueOperation.setClientConfig(buildClientConfig(properties));
+ ServiceBusQueueInboundChannelAdapter inboundAdapter =
+ new ServiceBusQueueInboundChannelAdapter(destination.getName(), this.serviceBusQueueOperation);
+ inboundAdapter.setBeanFactory(getBeanFactory());
+ ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, group, properties);
+ inboundAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
+ return inboundAdapter;
+ }
+
+ @Override
+ SendOperation getSendOperation() {
+ return this.serviceBusQueueOperation;
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/config/ServiceBusQueueBinderConfiguration.java b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/config/ServiceBusQueueBinderConfiguration.java
new file mode 100644
index 0000000000000..2ca0b56dcb748
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/config/ServiceBusQueueBinderConfiguration.java
@@ -0,0 +1,77 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.config;
+
+import com.microsoft.azure.servicebus.stream.binder.ServiceBusQueueMessageChannelBinder;
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusQueueExtendedBindingProperties;
+import com.microsoft.azure.servicebus.stream.binder.provisioning.ServiceBusChannelProvisioner;
+import com.microsoft.azure.servicebus.stream.binder.provisioning.ServiceBusQueueChannelResourceManagerProvisioner;
+import com.microsoft.azure.spring.cloud.autoconfigure.context.AzureEnvironmentAutoConfiguration;
+import com.microsoft.azure.spring.cloud.autoconfigure.servicebus.AzureServiceBusProperties;
+import com.microsoft.azure.spring.cloud.autoconfigure.servicebus.AzureServiceBusQueueAutoConfiguration;
+import com.microsoft.azure.spring.cloud.autoconfigure.servicebus.ServiceBusUtils;
+import com.microsoft.azure.spring.cloud.context.core.api.ResourceManagerProvider;
+import com.microsoft.azure.spring.cloud.telemetry.TelemetryCollector;
+import com.microsoft.azure.spring.integration.servicebus.queue.ServiceBusQueueOperation;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.cloud.stream.binder.Binder;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * @author Warren Zhu
+ */
+@Configuration
+@ConditionalOnMissingBean(Binder.class)
+@Import({AzureServiceBusQueueAutoConfiguration.class, AzureEnvironmentAutoConfiguration.class})
+@EnableConfigurationProperties({AzureServiceBusProperties.class, ServiceBusQueueExtendedBindingProperties.class})
+public class ServiceBusQueueBinderConfiguration {
+
+ private static final String SERVICE_BUS_QUEUE_BINDER = "ServiceBusQueueBinder";
+ private static final String NAMESPACE = "Namespace";
+
+ @Autowired(required = false)
+ private ResourceManagerProvider resourceManagerProvider;
+
+ @PostConstruct
+ public void collectTelemetry() {
+ TelemetryCollector.getInstance().addService(SERVICE_BUS_QUEUE_BINDER);
+ }
+
+ @Bean
+ @ConditionalOnBean(ResourceManagerProvider.class)
+ @ConditionalOnMissingBean
+ public ServiceBusChannelProvisioner serviceBusChannelProvisioner(AzureServiceBusProperties serviceBusProperties) {
+ if (this.resourceManagerProvider != null) {
+ return new ServiceBusQueueChannelResourceManagerProvisioner(resourceManagerProvider,
+ serviceBusProperties.getNamespace());
+ } else {
+ TelemetryCollector.getInstance().addProperty(SERVICE_BUS_QUEUE_BINDER, NAMESPACE,
+ ServiceBusUtils.getNamespace(serviceBusProperties.getConnectionString()));
+ }
+ return new ServiceBusChannelProvisioner();
+ }
+
+ @Bean
+ @ConditionalOnMissingBean({ResourceManagerProvider.class, ServiceBusChannelProvisioner.class})
+ public ServiceBusChannelProvisioner serviceBusChannelProvisionerWithResourceManagerProvider() {
+ return new ServiceBusChannelProvisioner();
+ }
+
+ @Bean
+ public ServiceBusQueueMessageChannelBinder serviceBusQueueBinder(
+ ServiceBusChannelProvisioner queueChannelProvisioner, ServiceBusQueueOperation serviceBusQueueOperation,
+ ServiceBusQueueExtendedBindingProperties bindingProperties) {
+ ServiceBusQueueMessageChannelBinder binder =
+ new ServiceBusQueueMessageChannelBinder(null, queueChannelProvisioner, serviceBusQueueOperation);
+ binder.setBindingProperties(bindingProperties);
+ return binder;
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/properties/ServiceBusQueueExtendedBindingProperties.java b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/properties/ServiceBusQueueExtendedBindingProperties.java
new file mode 100644
index 0000000000000..76061921f1fe7
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/properties/ServiceBusQueueExtendedBindingProperties.java
@@ -0,0 +1,19 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.properties;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * @author Warren Zhu
+ */
+@ConfigurationProperties("spring.cloud.stream.servicebus.queue")
+public class ServiceBusQueueExtendedBindingProperties extends ServiceBusExtendedBindingProperties {
+ private static final String DEFAULTS_PREFIX = "spring.cloud.stream.servicebus.queue.default";
+
+ @Override
+ public String getDefaultsPrefix() {
+ return DEFAULTS_PREFIX;
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/provisioning/ServiceBusQueueChannelResourceManagerProvisioner.java b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/provisioning/ServiceBusQueueChannelResourceManagerProvisioner.java
new file mode 100644
index 0000000000000..ed60343c779a1
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/provisioning/ServiceBusQueueChannelResourceManagerProvisioner.java
@@ -0,0 +1,46 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.provisioning;
+
+import com.microsoft.azure.management.servicebus.Queue;
+import com.microsoft.azure.management.servicebus.ServiceBusNamespace;
+import com.microsoft.azure.spring.cloud.context.core.api.ResourceManagerProvider;
+import com.microsoft.azure.spring.cloud.context.core.util.Tuple;
+import org.springframework.cloud.stream.provisioning.ProvisioningException;
+import org.springframework.lang.NonNull;
+import org.springframework.util.Assert;
+
+/**
+ * @author Warren Zhu
+ */
+public class ServiceBusQueueChannelResourceManagerProvisioner extends ServiceBusChannelProvisioner {
+
+ private final ResourceManagerProvider resourceManagerProvider;
+ private final String namespace;
+
+ public ServiceBusQueueChannelResourceManagerProvisioner(@NonNull ResourceManagerProvider resourceManagerProvider,
+ @NonNull String namespace) {
+ Assert.hasText(namespace, "The namespace can't be null or empty");
+ this.resourceManagerProvider = resourceManagerProvider;
+ this.namespace = namespace;
+ }
+
+ @Override
+ protected void validateOrCreateForConsumer(String name, String group) {
+ ServiceBusNamespace namespace =
+ this.resourceManagerProvider.getServiceBusNamespaceManager().getOrCreate(this.namespace);
+ Queue queue = this.resourceManagerProvider.getServiceBusQueueManager().getOrCreate(Tuple.of(namespace, name));
+ if (queue == null) {
+ throw new ProvisioningException(
+ String.format("Event hub with name '%s' in namespace '%s' not existed", name, namespace));
+ }
+ }
+
+ @Override
+ protected void validateOrCreateForProducer(String name) {
+ ServiceBusNamespace namespace =
+ this.resourceManagerProvider.getServiceBusNamespaceManager().getOrCreate(this.namespace);
+ this.resourceManagerProvider.getServiceBusQueueManager().getOrCreate(Tuple.of(namespace, name));
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/main/resources/META-INF/spring.binders b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/main/resources/META-INF/spring.binders
new file mode 100644
index 0000000000000..4147b21a4aa2d
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/main/resources/META-INF/spring.binders
@@ -0,0 +1 @@
+servicebus-queue: com.microsoft.azure.servicebus.stream.binder.config.ServiceBusQueueBinderConfiguration
diff --git a/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/test/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusQueuePartitionBinderTests.java b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/test/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusQueuePartitionBinderTests.java
new file mode 100644
index 0000000000000..921ce60e05d4a
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/test/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusQueuePartitionBinderTests.java
@@ -0,0 +1,87 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder;
+
+import com.microsoft.azure.servicebus.QueueClient;
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusConsumerProperties;
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusProducerProperties;
+import com.microsoft.azure.servicebus.stream.binder.test.AzurePartitionBinderTests;
+import com.microsoft.azure.spring.integration.servicebus.factory.ServiceBusQueueClientFactory;
+import com.microsoft.azure.spring.integration.servicebus.queue.support.ServiceBusQueueTestOperation;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
+import org.springframework.cloud.stream.binder.HeaderMode;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases are defined in super class
+ *
+ * @author Warren Zhu
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ServiceBusQueuePartitionBinderTests extends
+ AzurePartitionBinderTests,
+ ExtendedProducerProperties> {
+ @Mock
+ ServiceBusQueueClientFactory clientFactory;
+
+ @Mock
+ QueueClient queueClient;
+
+ private ServiceBusQueueTestBinder binder;
+
+ @Before
+ public void setUp() {
+ when(this.clientFactory.getOrCreateClient(anyString())).thenReturn(this.queueClient);
+ CompletableFuture future = new CompletableFuture<>();
+ future.complete(null);
+ when(this.queueClient.completeAsync(any())).thenReturn(future);
+ this.binder = new ServiceBusQueueTestBinder(new ServiceBusQueueTestOperation(this.clientFactory));
+ }
+
+ @Override
+ protected String getClassUnderTestName() {
+ return ServiceBusQueueTestBinder.class.getSimpleName();
+ }
+
+ @Override
+ protected ServiceBusQueueTestBinder getBinder() throws Exception {
+ return this.binder;
+ }
+
+ @Override
+ protected ExtendedConsumerProperties createConsumerProperties() {
+ ExtendedConsumerProperties properties =
+ new ExtendedConsumerProperties<>(new ServiceBusConsumerProperties());
+ properties.setHeaderMode(HeaderMode.embeddedHeaders);
+ return properties;
+ }
+
+ @Override
+ protected ExtendedProducerProperties createProducerProperties() {
+ ExtendedProducerProperties properties =
+ new ExtendedProducerProperties<>(new ServiceBusProducerProperties());
+ properties.setHeaderMode(HeaderMode.embeddedHeaders);
+ return properties;
+ }
+
+ @Override
+ public void testOneRequiredGroup() {
+ // Required group test rely on unsupported start position of consumer properties
+ }
+
+ @Override
+ public void testTwoRequiredGroups() {
+ // Required group test rely on unsupported start position of consumer properties
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/test/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusQueueSessionsBinderTests.java b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/test/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusQueueSessionsBinderTests.java
new file mode 100644
index 0000000000000..d3e27e711bd20
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/test/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusQueueSessionsBinderTests.java
@@ -0,0 +1,34 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder;
+
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusConsumerProperties;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.binder.HeaderMode;
+
+/**
+ * Test cases are defined in super class
+ *
+ * @author Eduardo Sciullo
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ServiceBusQueueSessionsBinderTests extends
+ ServiceBusQueuePartitionBinderTests {
+
+ @Override
+ protected ExtendedConsumerProperties createConsumerProperties() {
+
+ ServiceBusConsumerProperties serviceBusConsumerProperties = new ServiceBusConsumerProperties();
+ serviceBusConsumerProperties.setSessionsEnabled(true);
+
+ ExtendedConsumerProperties properties = new ExtendedConsumerProperties<>(
+ serviceBusConsumerProperties);
+ properties.setHeaderMode(HeaderMode.embeddedHeaders);
+ return properties;
+ }
+
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/test/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusQueueTestBinder.java b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/test/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusQueueTestBinder.java
new file mode 100644
index 0000000000000..135fa4310b4c2
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/test/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusQueueTestBinder.java
@@ -0,0 +1,39 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder;
+
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusConsumerProperties;
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusProducerProperties;
+import com.microsoft.azure.servicebus.stream.binder.provisioning.ServiceBusChannelProvisioner;
+import com.microsoft.azure.spring.integration.servicebus.queue.ServiceBusQueueOperation;
+import org.springframework.cloud.stream.binder.AbstractTestBinder;
+import org.springframework.cloud.stream.binder.BinderHeaders;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
+import org.springframework.context.support.GenericApplicationContext;
+
+/**
+ * @author Warren Zhu
+ */
+public class ServiceBusQueueTestBinder extends
+ AbstractTestBinder,
+ ExtendedProducerProperties> {
+
+ ServiceBusQueueTestBinder(ServiceBusQueueOperation operation) {
+
+ ServiceBusQueueMessageChannelBinder binder =
+ new ServiceBusQueueMessageChannelBinder(BinderHeaders.STANDARD_HEADERS,
+ new ServiceBusChannelProvisioner(), operation);
+ GenericApplicationContext context = new GenericApplicationContext();
+ binder.setApplicationContext(context);
+ this.setBinder(binder);
+ }
+
+ @Override
+ public void cleanup() {
+ // No-op
+ }
+
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/test/resources/logback-test.xml b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/test/resources/logback-test.xml
new file mode 100644
index 0000000000000..300f34b41c279
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/test/resources/logback-test.xml
@@ -0,0 +1,12 @@
+
+
+
+
+ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+
+
+
+
+
+
+
diff --git a/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000000000..ca6ee9cea8ec1
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-queue-stream-binder/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline
\ No newline at end of file
diff --git a/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/CHANGELOG.md b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/CHANGELOG.md
new file mode 100644
index 0000000000000..43b25529c46ce
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/CHANGELOG.md
@@ -0,0 +1,3 @@
+# Release History
+
+## 1.2.8-beta.1 (Unreleased)
diff --git a/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/README.md b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/README.md
new file mode 100644
index 0000000000000..67a033d554ec7
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/README.md
@@ -0,0 +1,94 @@
+# Azure Spring Cloud Service Bus Queue Stream Binder client library for Java
+
+The project provides **Spring Cloud Stream Binder for Azure Service Bus Queue** which allows you to build message-driven
+microservice using **Spring Cloud Stream** based on [Azure Service Bus Queue](https://azure.microsoft.com/en-us/services/service-bus/) service.
+
+## Key concepts
+
+### Service Bus Queue Binder Overview
+
+The Spring Cloud Stream Binder for Azure Service Bus Queue provides the binding implementation for the Spring Cloud Stream.
+This implementation uses Spring Integration Service Bus Queue Channel Adapters at its foundation.
+
+#### Consumer Group
+
+This binder has no consumer group support since all consumers share one queue.
+
+#### Partitioning Support
+
+This binder has no partition support even service bus queue supports partition.
+
+## Getting started
+
+## Examples
+
+Please use this [sample](../../spring-cloud-azure-samples/servicebus-queue-binder-sample/) as a reference
+for how to use this binder in your projects.
+
+### Feature List
+
+- [Dependency Management](#dependency-management)
+- [Configuration Options](#configuration-options)
+
+#### Dependency Management
+
+**Maven Coordinates**
+```
+
+ com.microsoft.azure
+ spring-cloud-azure-servicebus-queue-stream-binder
+
+
+```
+**Gradle Coordinates**
+```
+dependencies {
+ compile group: 'com.microsoft.azure', name: 'spring-cloud-azure-servicebus-queue-stream-binder'
+}
+```
+
+#### Configuration Options
+
+The binder provides the following configuration options in `application.properties`.
+
+##### Spring Cloud Azure Properties ####
+
+Name | Description | Required | Default
+---|---|---|---
+spring.cloud.azure.credential-file-path | Location of azure credential file | Yes |
+spring.cloud.azure.resource-group | Name of Azure resource group | Yes |
+spring.cloud.azure.region | Region name of the Azure resource group, e.g. westus | Yes |
+spring.cloud.azure.servicebus.namespace | Service Bus Namespace. Auto creating if missing | Yes |
+
+##### Serivce Bus Queue Producer Properties #####
+
+It supports the following configurations with the format of `spring.cloud.stream.servicebus.queue.bindings..producer`.
+
+**_sync_**
+
+Whether the producer should act in a synchronous manner with respect to writing records into a stream. If true, the
+producer will wait for a response after a send operation.
+
+Default: `false`
+
+**_sendTimeout_**
+
+Effective only if `sync` is set to true. The amount of time to wait for a response after a send operation, in milliseconds.
+
+Default: `10000`
+
+##### Service Bus Queue Consumer Properties #####
+
+It supports the following configurations with the format of `spring.cloud.stream.servicebus.queue.bindings..consumer`.
+
+**_checkpointMode_**
+
+The mode in which checkpoints are updated.
+If `RECORD`, checkpoints occur after each record is received by Spring Channel.
+If `MANUAL`, checkpoints occur on demand by the user via the `Checkpointer`. You can get `Checkpointer` by `Message.getHeaders.get(AzureHeaders.CHECKPOINTER)`callback.
+
+Default: `RECORD`
+
+## Troubleshooting
+## Next steps
+## Contributing
diff --git a/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/pom.xml b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/pom.xml
new file mode 100644
index 0000000000000..1b27dc37d1dc5
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/pom.xml
@@ -0,0 +1,77 @@
+
+
+
+ com.azure
+ azure-client-sdk-parent
+ 1.7.0
+ ../../parents/azure-client-sdk-parent
+
+ 4.0.0
+
+ com.microsoft.azure
+ spring-cloud-azure-servicebus-stream-binder-core
+ 1.2.8-beta.1
+
+ Azure Spring Cloud Stream Service Bus Binder Core
+ https://github.com/Azure/azure-sdk-for-java
+
+
+ 0.10
+ 0.15
+
+
+
+ org.springframework.cloud
+ spring-cloud-stream
+ 3.0.0.RELEASE
+
+
+ com.microsoft.azure
+ spring-cloud-azure-autoconfigure
+ 1.2.8-beta.1
+
+
+ com.microsoft.azure
+ spring-cloud-starter-azure-servicebus
+ 1.2.8-beta.1
+
+
+ com.microsoft.azure
+ azure-servicebus
+ 3.4.0
+
+
+
+ com.google.code.findbugs
+ jsr305
+ 3.0.2
+ provided
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+ 3.0.0-M3
+
+
+
+
+ com.microsoft.azure:azure-servicebus:[3.4.0]
+ com.microsoft.azure:spring-cloud-azure-autoconfigure:[1.2.8-beta.1]
+ com.microsoft.azure:spring-cloud-starter-azure-servicebus:[1.2.8-beta.1]
+ org.springframework.cloud:spring-cloud-stream:[3.0.0.RELEASE]
+
+
+
+
+
+
+
+
diff --git a/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusMessageChannelBinder.java b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusMessageChannelBinder.java
new file mode 100644
index 0000000000000..bfc393272782d
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusMessageChannelBinder.java
@@ -0,0 +1,99 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder;
+
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusConsumerProperties;
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusExtendedBindingProperties;
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusProducerProperties;
+import com.microsoft.azure.servicebus.stream.binder.provisioning.ServiceBusChannelProvisioner;
+import com.microsoft.azure.spring.integration.core.DefaultMessageHandler;
+import com.microsoft.azure.spring.integration.core.api.CheckpointConfig;
+import com.microsoft.azure.spring.integration.core.api.SendOperation;
+import com.microsoft.azure.spring.integration.servicebus.ServiceBusClientConfig;
+import org.springframework.cloud.stream.binder.BinderHeaders;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
+import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
+import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
+import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
+import org.springframework.cloud.stream.provisioning.ProducerDestination;
+import org.springframework.integration.expression.FunctionExpression;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+
+/**
+ * @author Warren Zhu
+ * @author Eduardo Sciullo
+ */
+public abstract class ServiceBusMessageChannelBinder extends
+ AbstractMessageChannelBinder,
+ ExtendedProducerProperties,
+ ServiceBusChannelProvisioner>
+ implements
+ ExtendedPropertiesBinder {
+
+ protected T bindingProperties;
+
+ public ServiceBusMessageChannelBinder(String[] headersToEmbed, ServiceBusChannelProvisioner provisioningProvider) {
+ super(headersToEmbed, provisioningProvider);
+ }
+
+ @Override
+ protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
+ ExtendedProducerProperties producerProperties, MessageChannel errorChannel) {
+ DefaultMessageHandler handler = new DefaultMessageHandler(destination.getName(), getSendOperation());
+ handler.setBeanFactory(getBeanFactory());
+ handler.setSync(producerProperties.getExtension().isSync());
+ handler.setSendTimeout(producerProperties.getExtension().getSendTimeout());
+ handler.setSendFailureChannel(errorChannel);
+ if (producerProperties.isPartitioned()) {
+ handler.setPartitionKeyExpressionString(
+ "'partitionKey-' + headers['" + BinderHeaders.PARTITION_HEADER + "']");
+ } else {
+ handler.setPartitionKeyExpression(new FunctionExpression>(m -> m.getPayload().hashCode()));
+ }
+
+ return handler;
+ }
+
+ @Override
+ public ServiceBusConsumerProperties getExtendedConsumerProperties(String channelName) {
+ return this.bindingProperties.getExtendedConsumerProperties(channelName);
+ }
+
+ @Override
+ public ServiceBusProducerProperties getExtendedProducerProperties(String channelName) {
+ return this.bindingProperties.getExtendedProducerProperties(channelName);
+ }
+
+ @Override
+ public String getDefaultsPrefix() {
+ return this.bindingProperties.getDefaultsPrefix();
+ }
+
+ @Override
+ public Class extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
+ return this.bindingProperties.getExtendedPropertiesEntryClass();
+ }
+
+ public void setBindingProperties(T bindingProperties) {
+ this.bindingProperties = bindingProperties;
+ }
+
+ protected CheckpointConfig buildCheckpointConfig(
+ ExtendedConsumerProperties properties) {
+ return CheckpointConfig.builder().checkpointMode(properties.getExtension().getCheckpointMode()).build();
+ }
+
+ protected ServiceBusClientConfig buildClientConfig(
+ ExtendedConsumerProperties properties) {
+ ServiceBusConsumerProperties consumerProperties = properties.getExtension();
+ return ServiceBusClientConfig.builder().setPrefetchCount(consumerProperties.getPrefetchCount())
+ .setConcurrency(consumerProperties.getConcurrency())
+ .setSessionsEnabled(consumerProperties.isSessionsEnabled()).build();
+ }
+
+ abstract SendOperation getSendOperation();
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/properties/ServiceBusBindingProperties.java b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/properties/ServiceBusBindingProperties.java
new file mode 100644
index 0000000000000..c4bd503e33c43
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/properties/ServiceBusBindingProperties.java
@@ -0,0 +1,30 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.properties;
+
+import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
+
+/**
+ * @author Warren Zhu
+ */
+public class ServiceBusBindingProperties implements BinderSpecificPropertiesProvider {
+ private ServiceBusConsumerProperties consumer = new ServiceBusConsumerProperties();
+ private ServiceBusProducerProperties producer = new ServiceBusProducerProperties();
+
+ public ServiceBusConsumerProperties getConsumer() {
+ return consumer;
+ }
+
+ public void setConsumer(ServiceBusConsumerProperties consumer) {
+ this.consumer = consumer;
+ }
+
+ public ServiceBusProducerProperties getProducer() {
+ return producer;
+ }
+
+ public void setProducer(ServiceBusProducerProperties producer) {
+ this.producer = producer;
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/properties/ServiceBusConsumerProperties.java b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/properties/ServiceBusConsumerProperties.java
new file mode 100644
index 0000000000000..1d5e3426ab310
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/properties/ServiceBusConsumerProperties.java
@@ -0,0 +1,72 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.properties;
+
+import com.microsoft.azure.spring.integration.core.api.CheckpointMode;
+
+/**
+ * @author Warren Zhu
+ * @author Eduardo Sciullo
+ */
+public class ServiceBusConsumerProperties {
+ private int prefetchCount = 1;
+ private int concurrency = 1;
+ private boolean sessionsEnabled = false;
+
+ private CheckpointMode checkpointMode = CheckpointMode.RECORD;
+
+ public CheckpointMode getCheckpointMode() {
+ return checkpointMode;
+ }
+
+ public void setCheckpointMode(CheckpointMode checkpointMode) {
+ this.checkpointMode = checkpointMode;
+ }
+
+ /**
+ * Prefetch count of underlying service bus client.
+ *
+ *
+ *
+ * @return int, default : 1
+ */
+ public int getPrefetchCount() {
+ return prefetchCount;
+ }
+
+ public void setPrefetchCount(int prefetchCount) {
+ this.prefetchCount = prefetchCount;
+ }
+
+ /**
+ * Controls the max concurrent calls of service bus message handler and
+ * the size of fixed thread pool that handles user's business logic
+ *
+ *
+ * @return int, default : 1
+ */
+ public int getConcurrency() {
+ return concurrency;
+ }
+
+ public void setConcurrency(int concurrency) {
+ this.concurrency = concurrency;
+ }
+
+ /**
+ * Controls if is session aware
+ *
+ *
+ * @return boolean, default : false
+ */
+ public boolean isSessionsEnabled() {
+ return sessionsEnabled;
+ }
+
+ public void setSessionsEnabled(boolean sessionsEnabled) {
+ this.sessionsEnabled = sessionsEnabled;
+ }
+
+
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/properties/ServiceBusExtendedBindingProperties.java b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/properties/ServiceBusExtendedBindingProperties.java
new file mode 100644
index 0000000000000..c2fef1b872b97
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/properties/ServiceBusExtendedBindingProperties.java
@@ -0,0 +1,37 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.properties;
+
+import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
+import org.springframework.cloud.stream.binder.ExtendedBindingProperties;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author Warren Zhu
+ */
+public abstract class ServiceBusExtendedBindingProperties
+ implements ExtendedBindingProperties {
+ private final Map bindings = new ConcurrentHashMap<>();
+
+ @Override
+ public ServiceBusConsumerProperties getExtendedConsumerProperties(String channelName) {
+ return this.bindings.computeIfAbsent(channelName, key -> new ServiceBusBindingProperties()).getConsumer();
+ }
+
+ @Override
+ public ServiceBusProducerProperties getExtendedProducerProperties(String channelName) {
+ return this.bindings.computeIfAbsent(channelName, key -> new ServiceBusBindingProperties()).getProducer();
+ }
+
+ @Override
+ public Class extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
+ return ServiceBusBindingProperties.class;
+ }
+
+ public Map getBindings() {
+ return bindings;
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/properties/ServiceBusProducerProperties.java b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/properties/ServiceBusProducerProperties.java
new file mode 100644
index 0000000000000..100afdd510e2d
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/properties/ServiceBusProducerProperties.java
@@ -0,0 +1,28 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.properties;
+
+/**
+ * @author Warren Zhu
+ */
+public class ServiceBusProducerProperties {
+ private boolean sync = false;
+ private long sendTimeout = 10000;
+
+ public boolean isSync() {
+ return sync;
+ }
+
+ public void setSync(boolean sync) {
+ this.sync = sync;
+ }
+
+ public long getSendTimeout() {
+ return sendTimeout;
+ }
+
+ public void setSendTimeout(long sendTimeout) {
+ this.sendTimeout = sendTimeout;
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/provisioning/ServiceBusChannelProvisioner.java b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/provisioning/ServiceBusChannelProvisioner.java
new file mode 100644
index 0000000000000..dc8e7e41442cf
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/provisioning/ServiceBusChannelProvisioner.java
@@ -0,0 +1,43 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.provisioning;
+
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusConsumerProperties;
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusProducerProperties;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
+import org.springframework.cloud.stream.provisioning.ConsumerDestination;
+import org.springframework.cloud.stream.provisioning.ProducerDestination;
+import org.springframework.cloud.stream.provisioning.ProvisioningException;
+import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
+
+/**
+ * @author Warren Zhu
+ */
+public class ServiceBusChannelProvisioner implements
+ ProvisioningProvider,
+ ExtendedProducerProperties> {
+
+ @Override
+ public ProducerDestination provisionProducerDestination(String name,
+ ExtendedProducerProperties properties) throws ProvisioningException {
+ validateOrCreateForProducer(name);
+ return new ServiceBusProducerDestination(name);
+ }
+
+ @Override
+ public ConsumerDestination provisionConsumerDestination(String name, String group,
+ ExtendedConsumerProperties properties) throws ProvisioningException {
+ validateOrCreateForConsumer(name, group);
+ return new ServiceBusConsumerDestination(name);
+ }
+
+ protected void validateOrCreateForConsumer(String name, String group) {
+ // no-op
+ }
+
+ protected void validateOrCreateForProducer(String name) {
+ // no-op
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/provisioning/ServiceBusConsumerDestination.java b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/provisioning/ServiceBusConsumerDestination.java
new file mode 100644
index 0000000000000..31b0fa6e0f7b6
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/provisioning/ServiceBusConsumerDestination.java
@@ -0,0 +1,23 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.provisioning;
+
+import org.springframework.cloud.stream.provisioning.ConsumerDestination;
+
+/**
+ * @author Warren Zhu
+ */
+public class ServiceBusConsumerDestination implements ConsumerDestination {
+
+ private String name;
+
+ public ServiceBusConsumerDestination(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/provisioning/ServiceBusProducerDestination.java b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/provisioning/ServiceBusProducerDestination.java
new file mode 100644
index 0000000000000..6cbdbd780298c
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-stream-binder-core/src/main/java/com/microsoft/azure/servicebus/stream/binder/provisioning/ServiceBusProducerDestination.java
@@ -0,0 +1,28 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.provisioning;
+
+import org.springframework.cloud.stream.provisioning.ProducerDestination;
+
+/**
+ * @author Warren Zhu
+ */
+public class ServiceBusProducerDestination implements ProducerDestination {
+
+ private String name;
+
+ public ServiceBusProducerDestination(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public String getNameForPartition(int partition) {
+ return this.name + "-" + partition;
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/CHANGELOG.md b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/CHANGELOG.md
new file mode 100644
index 0000000000000..43b25529c46ce
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/CHANGELOG.md
@@ -0,0 +1,3 @@
+# Release History
+
+## 1.2.8-beta.1 (Unreleased)
diff --git a/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/README.md b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/README.md
new file mode 100644
index 0000000000000..50bbd710dd66a
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/README.md
@@ -0,0 +1,120 @@
+# Spring Cloud Azure Service Bus Topic Stream Binder client library for Java
+
+The project provides **Spring Cloud Stream Binder for Azure Service Bus Topic** which allows you to build message-driven
+microservice using **Spring Cloud Stream** based on [Azure Service Bus Topic](https://azure.microsoft.com/en-us/services/service-bus/) service.
+
+## Key concepts
+
+### Service Bus Topic Binder Overview
+
+The Spring Cloud Stream Binder for Azure Service Bus Topic provides the binding implementation for the Spring Cloud Stream.
+This implementation uses Spring Integration Service Bus Topic Channel Adapters at its foundation.
+
+#### Scheduled Message
+
+This binder supports submitting messages to a topic for delayed processing. Users can send scheduled messages with header `x-delay`
+expressing in milliseconds a delay time for the message. The message will be delivered to the respective topics after `x-delay` milliseconds.
+
+#### Consumer Group
+
+Service Bus Topic provides similar support of consumer group as Apache Kafka, but with slight different logic.
+This binder rely on `Subscription` of a topic to act as a consumer group.
+
+#### Partitioning Support
+
+## Getting started
+
+This binder implementation has no partition support even service bus topic supports partition.
+
+## Examples
+
+Please use this [sample](../../spring-cloud-azure-samples/servicebus-topic-binder-sample/) as a reference
+for how to use this binder in your projects.
+
+### Feature List
+
+- [Dependency Management](#dependency-management)
+- [Configuration Options](#configuration-options)
+
+#### Dependency Management
+
+**Maven Coordinates**
+```
+
+ com.microsoft.azure
+ spring-cloud-azure-servicebus-topic-stream-binder
+
+
+```
+**Gradle Coordinates**
+```
+dependencies {
+ compile group: 'com.microsoft.azure', name: 'spring-cloud-azure-servicebus-topic-stream-binder'
+}
+```
+
+#### Configuration Options
+
+The binder provides the following configuration options in `application.properties`.
+
+##### Spring Cloud Azure Properties #####
+
+Name | Description | Required | Default
+---|---|---|---
+spring.cloud.azure.credential-file-path | Location of azure credential file | Yes |
+spring.cloud.azure.resource-group | Name of Azure resource group | Yes |
+spring.cloud.azure.region | Region name of the Azure resource group, e.g. westus | Yes |
+spring.cloud.azure.servicebus.namespace | Service Bus Namespace. Auto creating if missing | Yes |
+
+##### Serivce Bus Topic Producer Properties #####
+
+It supports the following configurations with the format of `spring.cloud.stream.servicebus.topic.bindings..producer`.
+
+**_sync_**
+
+Whether the producer should act in a synchronous manner with respect to writing messages into a stream. If true, the
+producer will wait for a response after a send operation.
+
+Default: `false`
+
+**_send-timeout_**
+
+Effective only if `sync` is set to true. The amount of time to wait for a response after a send operation, in milliseconds.
+
+Default: `10000`
+
+##### Service Bus Topic Consumer Properties #####
+
+It supports the following configurations with the format of `spring.cloud.stream.servicebus.topic.bindings..consumer`.
+
+**_checkpoint-mode_**
+
+The mode in which checkpoints are updated.
+
+`RECORD`, checkpoints occur after each record successfully processed by user-defined message handler without any exception.
+
+`MANUAL`, checkpoints occur on demand by the user via the `Checkpointer`. You can get `Checkpointer` by `Message.getHeaders.get(AzureHeaders.CHECKPOINTER)`callback.
+
+Default: `RECORD`
+
+**_prefetch-count_**
+
+Prefetch count of underlying service bus client.
+
+Default: `1`
+
+**_concurrency_**
+
+Controls the max concurrent calls of service bus message handler and the size of fixed thread pool that handles user's business logic
+
+Default: `1`
+
+**_sessionsEnabled_**
+
+Controls if is a session aware consumer. Set it to `true` if is a topic with sessions enabled.
+
+Default: `false`
+
+## Troubleshooting
+## Next steps
+## Contributing
diff --git a/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/pom.xml b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/pom.xml
new file mode 100644
index 0000000000000..d147ccba8339b
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/pom.xml
@@ -0,0 +1,68 @@
+
+
+
+ com.azure
+ azure-client-sdk-parent
+ 1.7.0
+ ../../parents/azure-client-sdk-parent
+
+ 4.0.0
+
+ com.microsoft.azure
+ spring-cloud-azure-servicebus-topic-stream-binder
+ 1.2.8-beta.1
+
+ Azure Spring Cloud Stream Service Bus Topic Binder
+ Azure Service Bus binder for Spring Cloud Stream
+ https://github.com/Azure/azure-sdk-for-java
+
+
+ 0.10
+ 0.15
+
+
+
+
+ com.microsoft.azure
+ spring-cloud-azure-servicebus-stream-binder-core
+ 1.2.8-beta.1
+
+
+
+ com.microsoft.azure
+ spring-cloud-azure-stream-binder-test
+ 1.2.8-beta.1
+ test
+
+
+
+ com.google.code.findbugs
+ jsr305
+ 3.0.2
+ provided
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+ 3.0.0-M3
+
+
+
+
+ com.microsoft.azure:spring-cloud-azure-servicebus-stream-binder-core:[1.2.8-beta.1]
+
+
+
+
+
+
+
+
diff --git a/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusTopicMessageChannelBinder.java b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusTopicMessageChannelBinder.java
new file mode 100644
index 0000000000000..ab6b08f408fc2
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusTopicMessageChannelBinder.java
@@ -0,0 +1,58 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder;
+
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusConsumerProperties;
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusTopicExtendedBindingProperties;
+import com.microsoft.azure.servicebus.stream.binder.provisioning.ServiceBusChannelProvisioner;
+import com.microsoft.azure.spring.integration.core.api.SendOperation;
+import com.microsoft.azure.spring.integration.servicebus.inbound.ServiceBusTopicInboundChannelAdapter;
+import com.microsoft.azure.spring.integration.servicebus.topic.ServiceBusTopicOperation;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.provisioning.ConsumerDestination;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.lang.NonNull;
+import org.springframework.util.StringUtils;
+
+import java.util.UUID;
+
+/**
+ * @author Warren Zhu
+ */
+public class ServiceBusTopicMessageChannelBinder extends
+ ServiceBusMessageChannelBinder {
+
+ private final ServiceBusTopicOperation serviceBusTopicOperation;
+
+ public ServiceBusTopicMessageChannelBinder(String[] headersToEmbed,
+ @NonNull ServiceBusChannelProvisioner provisioningProvider,
+ @NonNull ServiceBusTopicOperation serviceBusTopicOperation) {
+ super(headersToEmbed, provisioningProvider);
+ this.serviceBusTopicOperation = serviceBusTopicOperation;
+ this.bindingProperties = new ServiceBusTopicExtendedBindingProperties();
+ }
+
+ @Override
+ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
+ ExtendedConsumerProperties properties) {
+
+ this.serviceBusTopicOperation.setCheckpointConfig(buildCheckpointConfig(properties));
+ this.serviceBusTopicOperation.setClientConfig(buildClientConfig(properties));
+ boolean anonymous = !StringUtils.hasText(group);
+ if (anonymous) {
+ group = "anonymous." + UUID.randomUUID().toString();
+ }
+ ServiceBusTopicInboundChannelAdapter inboundAdapter =
+ new ServiceBusTopicInboundChannelAdapter(destination.getName(), this.serviceBusTopicOperation, group);
+ inboundAdapter.setBeanFactory(getBeanFactory());
+ ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, group, properties);
+ inboundAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
+ return inboundAdapter;
+ }
+
+ @Override
+ SendOperation getSendOperation() {
+ return this.serviceBusTopicOperation;
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/config/ServiceBusTopicBinderConfiguration.java b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/config/ServiceBusTopicBinderConfiguration.java
new file mode 100644
index 0000000000000..cf4c1c8f9c4cb
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/config/ServiceBusTopicBinderConfiguration.java
@@ -0,0 +1,77 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.config;
+
+import com.microsoft.azure.servicebus.stream.binder.ServiceBusTopicMessageChannelBinder;
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusTopicExtendedBindingProperties;
+import com.microsoft.azure.servicebus.stream.binder.provisioning.ServiceBusChannelProvisioner;
+import com.microsoft.azure.servicebus.stream.binder.provisioning.ServiceBusTopicChannelResourceManagerProvisioner;
+import com.microsoft.azure.spring.cloud.autoconfigure.context.AzureEnvironmentAutoConfiguration;
+import com.microsoft.azure.spring.cloud.autoconfigure.servicebus.AzureServiceBusProperties;
+import com.microsoft.azure.spring.cloud.autoconfigure.servicebus.AzureServiceBusTopicAutoConfiguration;
+import com.microsoft.azure.spring.cloud.autoconfigure.servicebus.ServiceBusUtils;
+import com.microsoft.azure.spring.cloud.context.core.api.ResourceManagerProvider;
+import com.microsoft.azure.spring.cloud.telemetry.TelemetryCollector;
+import com.microsoft.azure.spring.integration.servicebus.topic.ServiceBusTopicOperation;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.cloud.stream.binder.Binder;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * @author Warren Zhu
+ */
+@Configuration
+@ConditionalOnMissingBean(Binder.class)
+@Import({AzureServiceBusTopicAutoConfiguration.class, AzureEnvironmentAutoConfiguration.class})
+@EnableConfigurationProperties({AzureServiceBusProperties.class, ServiceBusTopicExtendedBindingProperties.class})
+public class ServiceBusTopicBinderConfiguration {
+
+ private static final String SERVICE_BUS_TOPIC_BINDER = "ServiceBusTopicBinder";
+ private static final String NAMESPACE = "Namespace";
+
+ @Autowired(required = false)
+ private ResourceManagerProvider resourceManagerProvider;
+
+ @PostConstruct
+ public void collectTelemetry() {
+ TelemetryCollector.getInstance().addService(SERVICE_BUS_TOPIC_BINDER);
+ }
+
+ @Bean
+ @ConditionalOnBean(ResourceManagerProvider.class)
+ @ConditionalOnMissingBean
+ public ServiceBusChannelProvisioner serviceBusChannelProvisioner(AzureServiceBusProperties serviceBusProperties) {
+ if (this.resourceManagerProvider != null) {
+ return new ServiceBusTopicChannelResourceManagerProvisioner(resourceManagerProvider,
+ serviceBusProperties.getNamespace());
+ } else {
+ TelemetryCollector.getInstance().addProperty(SERVICE_BUS_TOPIC_BINDER, NAMESPACE,
+ ServiceBusUtils.getNamespace(serviceBusProperties.getConnectionString()));
+ }
+ return new ServiceBusChannelProvisioner();
+ }
+
+ @Bean
+ @ConditionalOnMissingBean({ResourceManagerProvider.class, ServiceBusChannelProvisioner.class})
+ public ServiceBusChannelProvisioner serviceBusChannelProvisionerWithResourceManagerProvider() {
+ return new ServiceBusChannelProvisioner();
+ }
+
+ @Bean
+ public ServiceBusTopicMessageChannelBinder serviceBusTopicBinder(
+ ServiceBusChannelProvisioner topicChannelProvisioner, ServiceBusTopicOperation serviceBusTopicOperation,
+ ServiceBusTopicExtendedBindingProperties bindingProperties) {
+ ServiceBusTopicMessageChannelBinder binder =
+ new ServiceBusTopicMessageChannelBinder(null, topicChannelProvisioner, serviceBusTopicOperation);
+ binder.setBindingProperties(bindingProperties);
+ return binder;
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/properties/ServiceBusTopicExtendedBindingProperties.java b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/properties/ServiceBusTopicExtendedBindingProperties.java
new file mode 100644
index 0000000000000..19f3029b7cf0d
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/properties/ServiceBusTopicExtendedBindingProperties.java
@@ -0,0 +1,20 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.properties;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * @author Warren Zhu
+ */
+@ConfigurationProperties("spring.cloud.stream.servicebus.topic")
+public class ServiceBusTopicExtendedBindingProperties extends ServiceBusExtendedBindingProperties {
+ private static final String DEFAULTS_PREFIX = "spring.cloud.stream.servicebus.topic.default";
+
+ @Override
+ public String getDefaultsPrefix() {
+ return DEFAULTS_PREFIX;
+ }
+
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/provisioning/ServiceBusTopicChannelResourceManagerProvisioner.java b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/provisioning/ServiceBusTopicChannelResourceManagerProvisioner.java
new file mode 100644
index 0000000000000..8c15c34d27ebd
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/main/java/com/microsoft/azure/servicebus/stream/binder/provisioning/ServiceBusTopicChannelResourceManagerProvisioner.java
@@ -0,0 +1,48 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.provisioning;
+
+import com.microsoft.azure.management.servicebus.ServiceBusNamespace;
+import com.microsoft.azure.management.servicebus.Topic;
+import com.microsoft.azure.spring.cloud.context.core.api.ResourceManagerProvider;
+import com.microsoft.azure.spring.cloud.context.core.util.Tuple;
+import org.springframework.cloud.stream.provisioning.ProvisioningException;
+import org.springframework.lang.NonNull;
+import org.springframework.util.Assert;
+
+/**
+ * @author Warren Zhu
+ */
+public class ServiceBusTopicChannelResourceManagerProvisioner extends ServiceBusChannelProvisioner {
+
+ private final ResourceManagerProvider resourceManagerProvider;
+ private final String namespace;
+
+ public ServiceBusTopicChannelResourceManagerProvisioner(@NonNull ResourceManagerProvider resourceManagerProvider,
+ @NonNull String namespace) {
+ Assert.hasText(namespace, "The namespace can't be null or empty");
+ this.resourceManagerProvider = resourceManagerProvider;
+ this.namespace = namespace;
+ }
+
+ @Override
+ protected void validateOrCreateForConsumer(String name, String group) {
+ ServiceBusNamespace namespace =
+ this.resourceManagerProvider.getServiceBusNamespaceManager().getOrCreate(this.namespace);
+ Topic topic = this.resourceManagerProvider.getServiceBusTopicManager().getOrCreate(Tuple.of(namespace, name));
+ if (topic == null) {
+ throw new ProvisioningException(
+ String.format("Event hub with name '%s' in namespace '%s' not existed", name, namespace));
+ }
+
+ this.resourceManagerProvider.getServiceBusTopicSubscriptionManager().getOrCreate(Tuple.of(topic, group));
+ }
+
+ @Override
+ protected void validateOrCreateForProducer(String name) {
+ ServiceBusNamespace namespace =
+ this.resourceManagerProvider.getServiceBusNamespaceManager().getOrCreate(this.namespace);
+ this.resourceManagerProvider.getServiceBusTopicManager().getOrCreate(Tuple.of(namespace, name));
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/main/resources/META-INF/spring.binders b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/main/resources/META-INF/spring.binders
new file mode 100644
index 0000000000000..da79f1167f203
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/main/resources/META-INF/spring.binders
@@ -0,0 +1 @@
+servicebus-topic: com.microsoft.azure.servicebus.stream.binder.config.ServiceBusTopicBinderConfiguration
diff --git a/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/test/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusTopicPartitionBinderTests.java b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/test/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusTopicPartitionBinderTests.java
new file mode 100644
index 0000000000000..afeaa9495e32c
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/test/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusTopicPartitionBinderTests.java
@@ -0,0 +1,88 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder;
+
+import com.microsoft.azure.servicebus.SubscriptionClient;
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusConsumerProperties;
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusProducerProperties;
+import com.microsoft.azure.servicebus.stream.binder.test.AzurePartitionBinderTests;
+import com.microsoft.azure.spring.integration.servicebus.factory.ServiceBusTopicClientFactory;
+import com.microsoft.azure.spring.integration.servicebus.topic.support.ServiceBusTopicTestOperation;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
+import org.springframework.cloud.stream.binder.HeaderMode;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases are defined in super class
+ *
+ * @author Warren Zhu
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ServiceBusTopicPartitionBinderTests extends
+ AzurePartitionBinderTests,
+ ExtendedProducerProperties> {
+ @Mock
+ ServiceBusTopicClientFactory clientFactory;
+
+ @Mock
+ SubscriptionClient subscriptionClient;
+
+ private ServiceBusTopicTestBinder binder;
+
+ @Before
+ public void setUp() {
+ when(this.clientFactory.getOrCreateSubscriptionClient(anyString(), anyString()))
+ .thenReturn(this.subscriptionClient);
+ CompletableFuture future = new CompletableFuture<>();
+ future.complete(null);
+ when(this.subscriptionClient.completeAsync(any())).thenReturn(future);
+ this.binder = new ServiceBusTopicTestBinder(new ServiceBusTopicTestOperation(this.clientFactory));
+ }
+
+ @Override
+ protected String getClassUnderTestName() {
+ return ServiceBusTopicTestBinder.class.getSimpleName();
+ }
+
+ @Override
+ protected ServiceBusTopicTestBinder getBinder() throws Exception {
+ return this.binder;
+ }
+
+ @Override
+ protected ExtendedConsumerProperties createConsumerProperties() {
+ ExtendedConsumerProperties properties =
+ new ExtendedConsumerProperties<>(new ServiceBusConsumerProperties());
+ properties.setHeaderMode(HeaderMode.embeddedHeaders);
+ return properties;
+ }
+
+ @Override
+ protected ExtendedProducerProperties createProducerProperties() {
+ ExtendedProducerProperties properties =
+ new ExtendedProducerProperties<>(new ServiceBusProducerProperties());
+ properties.setHeaderMode(HeaderMode.embeddedHeaders);
+ return properties;
+ }
+
+ @Override
+ public void testOneRequiredGroup() {
+ // Required group test rely on unsupported start position of consumer properties
+ }
+
+ @Override
+ public void testTwoRequiredGroups() {
+ // Required group test rely on unsupported start position of consumer properties
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/test/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusTopicSessionsBinderTests.java b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/test/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusTopicSessionsBinderTests.java
new file mode 100644
index 0000000000000..6dcc5793c8c77
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/test/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusTopicSessionsBinderTests.java
@@ -0,0 +1,33 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder;
+
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusConsumerProperties;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.binder.HeaderMode;
+
+/**
+ * Test cases are defined in super class
+ *
+ * @author Eduardo Sciullo
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ServiceBusTopicSessionsBinderTests extends
+ ServiceBusTopicPartitionBinderTests {
+
+ @Override
+ protected ExtendedConsumerProperties createConsumerProperties() {
+
+ ServiceBusConsumerProperties serviceBusConsumerProperties = new ServiceBusConsumerProperties();
+ serviceBusConsumerProperties.setSessionsEnabled(true);
+
+ ExtendedConsumerProperties properties = new ExtendedConsumerProperties<>(
+ serviceBusConsumerProperties);
+ properties.setHeaderMode(HeaderMode.embeddedHeaders);
+ return properties;
+ }
+
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/test/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusTopicTestBinder.java b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/test/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusTopicTestBinder.java
new file mode 100644
index 0000000000000..9cb22e15d3343
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/test/java/com/microsoft/azure/servicebus/stream/binder/ServiceBusTopicTestBinder.java
@@ -0,0 +1,39 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder;
+
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusConsumerProperties;
+import com.microsoft.azure.servicebus.stream.binder.properties.ServiceBusProducerProperties;
+import com.microsoft.azure.servicebus.stream.binder.provisioning.ServiceBusChannelProvisioner;
+import com.microsoft.azure.spring.integration.servicebus.topic.ServiceBusTopicOperation;
+import org.springframework.cloud.stream.binder.AbstractTestBinder;
+import org.springframework.cloud.stream.binder.BinderHeaders;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
+import org.springframework.context.support.GenericApplicationContext;
+
+/**
+ * @author Warren Zhu
+ */
+public class ServiceBusTopicTestBinder extends
+ AbstractTestBinder,
+ ExtendedProducerProperties> {
+
+ ServiceBusTopicTestBinder(ServiceBusTopicOperation operation) {
+
+ ServiceBusTopicMessageChannelBinder binder =
+ new ServiceBusTopicMessageChannelBinder(BinderHeaders.STANDARD_HEADERS,
+ new ServiceBusChannelProvisioner(), operation);
+ GenericApplicationContext context = new GenericApplicationContext();
+ binder.setApplicationContext(context);
+ this.setBinder(binder);
+ }
+
+ @Override
+ public void cleanup() {
+ // No-op
+ }
+
+}
diff --git a/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/test/resources/logback-test.xml b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/test/resources/logback-test.xml
new file mode 100644
index 0000000000000..300f34b41c279
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/test/resources/logback-test.xml
@@ -0,0 +1,12 @@
+
+
+
+
+ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+
+
+
+
+
+
+
diff --git a/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000000000..ca6ee9cea8ec1
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-servicebus-topic-stream-binder/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline
\ No newline at end of file
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-test/CHANGELOG.md b/sdk/spring/azure-spring-cloud-stream-binder-test/CHANGELOG.md
new file mode 100644
index 0000000000000..43b25529c46ce
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-stream-binder-test/CHANGELOG.md
@@ -0,0 +1,3 @@
+# Release History
+
+## 1.2.8-beta.1 (Unreleased)
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-test/README.md b/sdk/spring/azure-spring-cloud-stream-binder-test/README.md
new file mode 100644
index 0000000000000..f8bbbf381daad
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-stream-binder-test/README.md
@@ -0,0 +1,8 @@
+# Azure Spring Cloud Stream Binder Test client library for Java
+
+## Key concepts
+## Getting started
+## Examples
+## Troubleshooting
+## Next steps
+## Contributing
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-test/pom.xml b/sdk/spring/azure-spring-cloud-stream-binder-test/pom.xml
new file mode 100644
index 0000000000000..4b477aebb7f5b
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-stream-binder-test/pom.xml
@@ -0,0 +1,95 @@
+
+
+
+ com.azure
+ azure-client-sdk-parent
+ 1.7.0
+ ../../parents/azure-client-sdk-parent
+
+ 4.0.0
+
+ com.microsoft.azure
+ spring-cloud-azure-stream-binder-test
+ 1.2.8-beta.1
+
+ Azure Spring Cloud Stream Binder Test
+ https://github.com/Azure/azure-sdk-for-java
+
+
+
+ org.springframework.cloud
+ spring-cloud-stream-binder-test
+ 3.0.0.RELEASE
+
+
+ junit
+ junit
+ 4.13
+
+
+ org.mockito
+ mockito-core
+ 3.3.3
+
+
+ org.powermock
+ powermock-api-mockito2
+ 2.0.2
+
+
+ org.powermock
+ powermock-module-junit4
+ 2.0.2
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ 2.3.2.RELEASE
+
+
+ org.hibernate.validator
+ hibernate-validator
+ 6.1.5.Final
+
+
+ org.springframework.cloud
+ spring-cloud-stream
+ 3.0.0.RELEASE
+
+
+ com.microsoft.azure
+ spring-cloud-azure-autoconfigure
+ 1.2.8-beta.1
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+ 3.0.0-M3
+
+
+
+
+ com.microsoft.azure:spring-cloud-azure-autoconfigure:[1.2.8-beta.1]
+ com.microsoft.azure:spring-integration-azure-core:[1.2.8-beta.1]
+ junit:junit:[4.13]
+ org.hibernate.validator:hibernate-validator:[6.1.5.Final]
+ org.mockito:mockito-core:[3.3.3]
+ org.powermock:powermock-api-mockito2:[2.0.2]
+ org.powermock:powermock-module-junit4:[2.0.2]
+ org.springframework.boot:spring-boot-starter-test:[2.3.2.RELEASE]
+ org.springframework.cloud:spring-cloud-stream-binder-test:[3.0.0.RELEASE]
+ org.springframework.cloud:spring-cloud-stream:[3.0.0.RELEASE]
+
+
+
+
+
+
+
+
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/AbstractStatistics.java b/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/AbstractStatistics.java
new file mode 100644
index 0000000000000..78224bb65e663
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/AbstractStatistics.java
@@ -0,0 +1,90 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.test;
+
+import net.jcip.annotations.ThreadSafe;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+
+@ThreadSafe
+public abstract class AbstractStatistics {
+ private final int size;
+ private final Statistics throughput;
+ private final Statistics numberMessagePerSec;
+ private final long reportingInterval;
+ private final LongAdder totalMessages;
+ private final LongAdder totalBytes;
+ private final ScheduledExecutorService scheduler;
+
+ public AbstractStatistics(int size, long reportingInterval, String labelPrefix) {
+ this.size = size;
+ this.throughput = new Statistics(labelPrefix + "Throughput KB/sec");
+ this.numberMessagePerSec = new Statistics(labelPrefix + "nMsg/sec");
+ this.reportingInterval = reportingInterval;
+ this.totalMessages = new LongAdder();
+ this.totalBytes = new LongAdder();
+ this.scheduler = Executors.newSingleThreadScheduledExecutor();
+ this.scheduler.scheduleAtFixedRate(new ReportingTask(), this.reportingInterval, this.reportingInterval,
+ TimeUnit.MILLISECONDS);
+ }
+
+ public void record(long messageSize) {
+ totalMessages.increment();
+ totalBytes.add(messageSize);
+ }
+
+ public void printSummary() {
+ this.throughput.printSummary();
+ this.numberMessagePerSec.printSummary();
+ }
+
+ protected void complete() {
+ }
+
+ class ReportingTask implements Runnable {
+ private final AtomicLong lastReportingTime;
+ private final AtomicLong lastMessages;
+ private final AtomicLong lastBytes;
+
+ ReportingTask() {
+ this.lastReportingTime = new AtomicLong(System.currentTimeMillis());
+ this.lastMessages = new AtomicLong(0);
+ this.lastBytes = new AtomicLong(0);
+ }
+
+ @Override
+ public void run() {
+ long now = System.currentTimeMillis();
+ long messages = totalMessages.longValue();
+ long bytes = totalBytes.longValue();
+
+ long elapsedMs = now - lastReportingTime.get();
+ double windowKbRead = ((bytes - lastBytes.get()) * 1.0) / (1024);
+ double windowKbPerSec = 1000.0 * windowKbRead / elapsedMs;
+ long windowMessage = messages - lastMessages.get();
+ double windowMessagesPerSec = (windowMessage * 1.0 / elapsedMs) * 1000.0;
+
+ if (windowMessage > 0) {
+ throughput.record(windowKbPerSec);
+ numberMessagePerSec.record(windowMessagesPerSec);
+ }
+
+ lastReportingTime.set(now);
+ lastBytes.set(bytes);
+ lastMessages.set(messages);
+
+ System.out.printf("Total %d records, %.1f records/sec (%.4f KB/sec).%n", messages, windowMessagesPerSec,
+ windowKbPerSec);
+
+ if (messages >= size) {
+ printSummary();
+ complete();
+ }
+ }
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/AzurePartitionBinderTests.java b/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/AzurePartitionBinderTests.java
new file mode 100644
index 0000000000000..11c262a3c2ac9
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/AzurePartitionBinderTests.java
@@ -0,0 +1,118 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.test;
+
+import org.assertj.core.api.Assertions;
+import org.junit.BeforeClass;
+import org.springframework.cloud.stream.binder.AbstractBinder;
+import org.springframework.cloud.stream.binder.ConsumerProperties;
+import org.springframework.cloud.stream.binder.PartitionCapableBinderTests;
+import org.springframework.cloud.stream.binder.ProducerProperties;
+import org.springframework.cloud.stream.binder.AbstractTestBinder;
+import org.springframework.cloud.stream.binder.Spy;
+import org.springframework.cloud.stream.binder.Binder;
+import org.springframework.cloud.stream.binder.Binding;
+import org.springframework.cloud.stream.config.BindingProperties;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.support.MessageBuilder;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.util.Assert;
+import org.springframework.util.MimeTypeUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Test cases are defined in super class
+ *
+ * @author Warren Zhu
+ */
+public abstract class AzurePartitionBinderTests, CP, PP>,
+ CP extends ConsumerProperties, PP extends ProducerProperties>
+ extends PartitionCapableBinderTests {
+
+ @BeforeClass
+ public static void enableTests() {
+ }
+
+ @Override
+ protected boolean usesExplicitRouting() {
+ return false;
+ }
+
+ @Override
+ public Spy spyOn(String name) {
+ return null;
+ }
+
+ @Override
+ public void testClean() throws Exception {
+ // No-op
+ }
+
+ @Override
+ public void testPartitionedModuleJava() {
+ // Partitioned consumer mode unsupported yet
+ }
+
+ @Override
+ public void testPartitionedModuleSpEL() {
+ // Partitioned consumer mode unsupported
+ }
+
+ @Override
+ public void testAnonymousGroup() {
+ // azure binder not support anonymous group
+ }
+
+ // Same logic as super.testSendAndReceiveNoOriginalContentType() except one line commented below
+ @Override
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testSendAndReceiveNoOriginalContentType() throws Exception {
+ Binder binder = getBinder();
+
+ BindingProperties producerBindingProperties = createProducerBindingProperties(createProducerProperties());
+ DirectChannel moduleOutputChannel = createBindableChannel("output", producerBindingProperties);
+ BindingProperties inputBindingProperties = createConsumerBindingProperties(createConsumerProperties());
+ DirectChannel moduleInputChannel = createBindableChannel("input", inputBindingProperties);
+ Binding producerBinding =
+ binder.bindProducer(String.format("bar%s0", getDestinationNameDelimiter()), moduleOutputChannel,
+ producerBindingProperties.getProducer());
+ Binding consumerBinding =
+ binder.bindConsumer(String.format("bar%s0", getDestinationNameDelimiter()),
+ "testSendAndReceiveNoOriginalContentType", moduleInputChannel, createConsumerProperties());
+ binderBindUnbindLatency();
+
+ Message> message =
+ MessageBuilder.withPayload("foo").setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
+ .build();
+
+ // Comment line below since service bus topic operation is event driven mode
+ // but subscriber is not ready in the downstream
+ //moduleOutputChannel.send(message);
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference> inboundMessageRef = new AtomicReference<>();
+ moduleInputChannel.subscribe(message1 -> {
+ try {
+ inboundMessageRef.set((Message) message1);
+ } finally {
+ latch.countDown();
+ }
+ });
+
+ moduleOutputChannel.send(message);
+ Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");
+ Assertions.assertThat(inboundMessageRef.get()).isNotNull();
+ Assertions.assertThat(inboundMessageRef.get().getPayload()).isEqualTo("foo".getBytes(StandardCharsets.UTF_8));
+ Assertions.assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE).toString())
+ .isEqualTo(MimeTypeUtils.TEXT_PLAIN_VALUE);
+ producerBinding.unbind();
+ consumerBinding.unbind();
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/ConsumerStatistics.java b/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/ConsumerStatistics.java
new file mode 100644
index 0000000000000..f1296192822f9
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/ConsumerStatistics.java
@@ -0,0 +1,24 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.test;
+
+import net.jcip.annotations.ThreadSafe;
+
+import java.util.concurrent.CountDownLatch;
+
+@ThreadSafe
+public class ConsumerStatistics extends AbstractStatistics {
+
+ private final CountDownLatch testCompleted;
+
+ public ConsumerStatistics(int size, long reportingInterval, CountDownLatch testCompleted) {
+ super(size, reportingInterval, "Consume ");
+ this.testCompleted = testCompleted;
+ }
+
+ @Override
+ protected void complete() {
+ this.testCompleted.countDown();
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/ProducerPerformance.java b/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/ProducerPerformance.java
new file mode 100644
index 0000000000000..08ad74131f8ce
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/ProducerPerformance.java
@@ -0,0 +1,56 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.test;
+
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.support.GenericMessage;
+
+import java.util.Random;
+
+/**
+ * Producer perf test via {@link MessageChannel}
+ *
+ * @author Warren Zhu
+ */
+public class ProducerPerformance {
+
+ public static void startPerfTest(MessageChannel channel, int recordSize, int numRecords, int throughput) {
+
+ byte[] payload = new byte[recordSize];
+ Random random = new Random(0);
+
+ for (int i = 0; i < payload.length; ++i) {
+ payload[i] = (byte) (random.nextInt(26) + 65);
+ }
+
+ ProducerStatistics statistics = new ProducerStatistics(numRecords, 5000);
+ long startMs = System.currentTimeMillis();
+
+ ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
+
+ int failedMessage = 0;
+
+ for (int i = 0; i < numRecords; i++) {
+
+ long sendStartMs = System.currentTimeMillis();
+
+ boolean succeed = channel.send(new GenericMessage<>(payload));
+
+ if (succeed) {
+ long now = System.currentTimeMillis();
+ statistics.record(payload.length, now - sendStartMs);
+ } else {
+ failedMessage++;
+ }
+
+ if (throttler.shouldThrottle(i, sendStartMs)) {
+ throttler.throttle();
+ }
+ }
+
+ System.out.println("Failed message count: " + failedMessage);
+ statistics.printSummary();
+ }
+
+}
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/ProducerStatistics.java b/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/ProducerStatistics.java
new file mode 100644
index 0000000000000..8e3d74aafd65d
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/ProducerStatistics.java
@@ -0,0 +1,26 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.test;
+
+import net.jcip.annotations.ThreadSafe;
+
+@ThreadSafe
+public class ProducerStatistics extends AbstractStatistics {
+ private final Statistics sendLatency;
+
+ ProducerStatistics(int size, long reportingInterval) {
+ super(size, reportingInterval, "Produce ");
+ this.sendLatency = new Statistics("Produce Latency (ms)");
+ }
+
+ public void record(long messageSize, long latency) {
+ super.record(messageSize);
+ this.sendLatency.record(latency);
+ }
+
+ public void printSummary() {
+ super.printSummary();
+ this.sendLatency.printSummary();
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/Statistics.java b/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/Statistics.java
new file mode 100644
index 0000000000000..2a9a9c326ce0e
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/Statistics.java
@@ -0,0 +1,53 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.test;
+
+import com.google.common.math.Quantiles;
+
+import java.util.ArrayList;
+import java.util.DoubleSummaryStatistics;
+import java.util.List;
+import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class Statistics {
+ private final List stats;
+ private final String label;
+
+ public Statistics(String label) {
+ this.stats = new ArrayList<>();
+ this.label = label;
+ }
+
+ public void record(double data) {
+ stats.add(data);
+ }
+
+ public void printSummary() {
+ Map statByColumn = new LinkedHashMap<>();
+
+ DoubleSummaryStatistics summaryStatistics = stats.stream().collect(Collectors.summarizingDouble(i -> i));
+
+ statByColumn.put("Average", summaryStatistics.getAverage());
+ statByColumn.put("Min", summaryStatistics.getMin());
+ statByColumn.put("Max", summaryStatistics.getMax());
+
+ int[] perc = {50, 90, 95, 99};
+ Map percentiles = Quantiles.percentiles().indexes(perc).compute(stats);
+
+ percentiles.forEach((key, value) -> statByColumn.put(key.toString(), value));
+
+ System.out.println(this.label);
+ System.out.println("=====================");
+
+ System.out.println(statByColumn.keySet().stream().collect(Collectors.joining("|", "|", "|")));
+ System.out.println(IntStream.range(1, statByColumn.size() + 1).mapToObj(i -> " -- ")
+ .collect(Collectors.joining("|", "|", "|")));
+ System.out.println(statByColumn.values().stream().map(i -> String.format("%.4f", i))
+ .collect(Collectors.joining("|", "|", "|")));
+ System.out.println();
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/ThroughputThrottler.java b/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/ThroughputThrottler.java
new file mode 100644
index 0000000000000..2cf0c74f24d07
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-stream-binder-test/src/main/java/com/microsoft/azure/servicebus/stream/binder/test/ThroughputThrottler.java
@@ -0,0 +1,122 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.microsoft.azure.servicebus.stream.binder.test;
+
+/**
+ * This class helps producers throttle throughput.
+ *
+ * If targetThroughput >= 0, the resulting average throughput will be approximately
+ * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
+ * no throttling will occur.
+ *
+ * To use, do this between successive send attempts:
+ *
+ * {@code
+ * if (throttler.shouldThrottle(...)) {
+ * throttler.throttle();
+ * }
+ * }
+ *
+ *
+ * Note that this can be used to throttle message throughput or data throughput.
+ */
+public class ThroughputThrottler {
+
+ private static final long NS_PER_MS = 1000000L;
+ private static final long NS_PER_SEC = 1000 * NS_PER_MS;
+ private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
+
+ private final long startMs;
+ private final long sleepTimeNs;
+ private final long targetThroughput;
+
+ private long sleepDeficitNs = 0;
+ private boolean wakeup = false;
+
+ /**
+ * @param targetThroughput Can be messages/sec or bytes/sec
+ * @param startMs When the very first message is sent
+ */
+ public ThroughputThrottler(long targetThroughput, long startMs) {
+ this.startMs = startMs;
+ this.targetThroughput = targetThroughput;
+ this.sleepTimeNs = targetThroughput > 0 ? NS_PER_SEC / targetThroughput : Long.MAX_VALUE;
+ }
+
+ /**
+ * @param amountSoFar bytes produced so far if you want to throttle data throughput, or
+ * messages produced so far if you want to throttle message throughput.
+ * @param sendStartMs timestamp of the most recently sent message
+ * @return boolean
+ */
+ public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
+ if (this.targetThroughput < 0) {
+ // No throttling in this case
+ return false;
+ }
+
+ float elapsedSec = (sendStartMs - startMs) / 1000.f;
+ return elapsedSec > 0 && (amountSoFar / elapsedSec) > this.targetThroughput;
+ }
+
+ /**
+ * Occasionally blocks for small amounts of time to achieve targetThroughput.
+ *
+ * Note that if targetThroughput is 0, this will block extremely aggressively.
+ */
+ public void throttle() {
+ if (targetThroughput == 0) {
+ try {
+ synchronized (this) {
+ while (!wakeup) {
+ this.wait();
+ }
+ }
+ } catch (InterruptedException e) {
+ // do nothing
+ }
+ return;
+ }
+
+ // throttle throughput by sleeping, on average,
+ // (1 / this.throughput) seconds between "things sent"
+ sleepDeficitNs += sleepTimeNs;
+
+ // If enough sleep deficit has accumulated, sleep a little
+ if (sleepDeficitNs >= MIN_SLEEP_NS) {
+ long sleepStartNs = System.nanoTime();
+ try {
+ synchronized (this) {
+ long remaining = sleepDeficitNs;
+ while (!wakeup && remaining > 0) {
+ long sleepMs = remaining / 1000000;
+ long sleepNs = remaining - sleepMs * 1000000;
+ this.wait(sleepMs, (int) sleepNs);
+ long elapsed = System.nanoTime() - sleepStartNs;
+ remaining = sleepDeficitNs - elapsed;
+ }
+ wakeup = false;
+ }
+ sleepDeficitNs = 0;
+ } catch (InterruptedException e) {
+ // If sleep is cut short, reduce deficit by the amount of
+ // time we actually spent sleeping
+ long sleepElapsedNs = System.nanoTime() - sleepStartNs;
+ if (sleepElapsedNs <= sleepDeficitNs) {
+ sleepDeficitNs -= sleepElapsedNs;
+ }
+ }
+ }
+ }
+
+ /**
+ * Wakeup the throttler if its sleeping.
+ */
+ public void wakeup() {
+ synchronized (this) {
+ wakeup = true;
+ this.notifyAll();
+ }
+ }
+}
diff --git a/sdk/spring/ci.yml b/sdk/spring/ci.yml
index 62cb0fa419fed..1bc9a08b0feda 100644
--- a/sdk/spring/ci.yml
+++ b/sdk/spring/ci.yml
@@ -109,6 +109,21 @@ extends:
- name: spring-cloud-starter-azure-storage-queue
groupId: com.microsoft.azure
safeName: springcloudstarterazurestoragequeue
+ - name: spring-cloud-azure-servicebus-stream-binder-core
+ groupId: com.microsoft.azure
+ safeName: springcloudazureservicebusstreambindercore
+ - name: spring-cloud-azure-stream-binder-test
+ groupId: com.microsoft.azure
+ safeName: springcloudazurestreambindertest
+ - name: spring-cloud-azure-servicebus-topic-stream-binder
+ groupId: com.microsoft.azure
+ safeName: springcloudazureservicebustopicstreambinder
+ - name: spring-cloud-azure-servicebus-queue-stream-binder
+ groupId: com.microsoft.azure
+ safeName: springcloudazureservicebusqueuestreambinder
+ - name: spring-cloud-azure-eventhubs-stream-binder
+ groupId: com.microsoft.azure
+ safeName: springcloudazureeventhubsstreambinder
AdditionalModules:
- name: azure-spring-boot-sample-active-directory
groupId: com.azure
diff --git a/sdk/spring/pom.xml b/sdk/spring/pom.xml
index db47ae38e126a..550bc1853b3dd 100644
--- a/sdk/spring/pom.xml
+++ b/sdk/spring/pom.xml
@@ -57,6 +57,11 @@
azure-spring-cloud-starter-servicebus
azure-spring-cloud-starter-storage
azure-spring-cloud-starter-storage-queue
+ azure-spring-cloud-servicebus-stream-binder-core
+ azure-spring-cloud-stream-binder-test
+ azure-spring-cloud-servicebus-queue-stream-binder
+ azure-spring-cloud-servicebus-topic-stream-binder
+ azure-spring-cloud-eventhubs-stream-binder