Skip to content

Communicate to Azure Service Fabric Services using Service Bus Queues and Topics

License

Notifications You must be signed in to change notification settings

loekd/ServiceFabric.ServiceBus

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Netstandard 2.0

Check out the Netstandard 2.0 services package and provide feedback! https://www.nuget.org/packages/ServiceFabric.ServiceBus.Services.Netstd/6.0.3

Demo Project

Need some help to get started? Have a look at: 'https://github.com/loekd/ServiceFabric.ServiceBus/tree/master/ServiceFabric.ServiceBus.Demo'

Nuget Packages:

Three packages, one for Receiving Brokered Messsages on the full .NET framework, one for Netstandard 2.0. One (optional) package for Sending them.

ServiceFabric.ServiceBus.Services

https://www.nuget.org/packages/ServiceFabric.ServiceBus.Services Contains implementations of ICommunicationListener that receive messages from Azure Service Bus (Queue/Subscription).

Receive from Queues:

  • ServiceBusQueueBatchCommunicationListener
  • ServiceBusQueueCommunicationListener

Receive from Subscriptions:

  • ServiceBusSubscriptionBatchCommunicationListener
  • ServiceBusSubscriptionCommunicationListener

Note: Session support is available too, in the package for Full Fx.

ServiceFabric.ServiceBus.Clients

https://www.nuget.org/packages/ServiceFabric.ServiceBus.Clients (Optional package) Receive BrokeredMessages in Service Fabric Reliable Services using the Communication Listener from the package 'ServiceFabric.ServiceBus.Services'. Provides a ServiceBusTopicCommunicationClient to be used with 'ServicePartitionClient'.

If you post messages to Service Bus in a different way, you won't need the client package.

Contribute!

Contributions are welcome. Please upgrade the package version with a minor tick if there are no breaking changes. And add a line to the readme.md, stating the changes, e.g. 'upgraded to SF version x.y.z'. Doing so will allow me to simply accept the PR, which will automatically trigger the release of a new package. Please also make sure all feature additions have a corresponding unit test.

Release notes:

v7.0.4 (Netstd only)

  • Opt out of autocomplete by default.

v7.0.3 (Netstd only)

  • Abandon received messages while closing.

v7.0.2 (Netstd only)

  • add CompleteMessage to DefaultServiceBusMessageReceiver to allow easy message completion.

v7.0.1 (Netstd only)

  • fix bug that completes messages twice when auto complete is true

v7.0.0 (Netstd only)

  • fix bug that would cause the ServiceBusCommunicationListener to close before all messages are processed, when MaxConcurrentCalls is greater than 1.

v6.0.3 (Netstd only)

  • add IServiceBusCommunicationListener to DefaultServiceBusMessageReceiver

v6.0.2

  • Netstandard package is no longer in preview

v6.0.0-preview (netstandard version)

  • allow factory to provide IServiceBusMessageReceiver instance to communication client.

v6.0.0-preview (netstandard version)

  • add netstandard 2.0 version. Not fully backwards compatible due to different Service Bus package. Supports Queues and Subscriptions. No sessions yet, no batches.

v5.1.8

  • add strong name to assemblies

v5.1.7

  • Upgraded nuget packages (SF 3.1.301)

v5.1.6

  • Upgraded nuget packages (SF 3.0.456)
  • Upgraded lowest netfx version to net452

v5.1.5

  • Upgraded nuget packages (SF 2.8.232)

v5.1.4

  • Upgraded nuget packages (SF 2.8.219)

v5.1.3

  • Upgraded nuget packages (SF 2.8.211)

v5.1.2

  • Upgraded nuget packages (SF 2.7.198)

v5.1.1

  • Fixed optional string not being optional issue reported by jfloodnet.

v5.1.0

  • Upgraded nuget packages (SF 2.6.220)

v5.0.0

  • Upgraded nuget packages (SF 2.6.210)
  • Upgraded sln to VS2017

v4.6.0

  • Upgraded nuget packages (SF 2.5.216)

v4.5.1

  • upgraded ASB Nuget package
  • fixed throwing of an ArgumentNullException when not passsing optional ServiceContext.

v4.5.0

  • upgraded to new SDK and packages (2.4.164)

v4.4.0

  • upgraded to new SDK and packages (2.4.145)

v4.3.1

  • clarify documentation

v4.3.0

  • upgraded to new SDK and packages (2.3.311)

v4.1.0

  • upgraded to new SDK and packages (2.3.301)

v4.0.0

  • Some breaking changes in order to have full batch and session support.
  • For batch support use: ServiceBusQueueBatchCommunicationListener and ServiceBusSubscriptionBatchCommunicationListener, combined with IServiceBusMessageBatchReceiver.
  • For single message receive use: ServiceBusQueueCommunicationListener and ServiceBusSubscriptionCommunicationListener, combined with IServiceBusMessageReceiver.
  • Message handlers now provide the MessageSession if available.
  • Some properties are now obsolete to make naming more consistent.
  • Single messages are now received using the message pump.
  • Support entity path in connection strings, or explicit entitypath argument

v3.6.0

  • Updated to new SDK, no other changes

v3.5.0

  • Added logging support.

  • Added automatic BrokeredMessage lock renewal option.

    How to use:

    Set the properties 'LogAction' and 'MessageLockRenewTimeSpan' on the Communication Listeners to enable:

    [..]
    Action<string> logAction = log => ServiceEventSource.Current.ServiceMessage(this, log);
    new ServiceInstanceListener(context => new ServiceBusQueueCommunicationListener(
                  new Handler(logAction)
                  , context
                  , serviceBusQueueName
                  , requireSessions: false)
              {
                  MessageLockRenewTimeSpan = TimeSpan.FromSeconds(50),  //auto renew every 50s, so processing can take longer than 60s (default lock duration).
                  LogAction = logAction
              }, "StatelessService-ServiceBusQueueListener");

[..]


v3.3.0
- Fixed a permissions issue that was introduced with session support. (Communication Listeners required 'manage' permissions.) (Found by Denis.)
The option to use sessions is now a constructor argument for the Communication Listeners.
- Upgraded all nuget packages.

v3.2.2
- Merged pull request by cpletz that addresses an await issue.
- Upgraded all nuget packages.

v3.2.1	
- Added Session support and demo code that uses it. (requested by Aaron) 
- Upgraded all nuget packages.

v3.1.0	
- Upgraded all nuget packages. (Newer SDK version)
- Fixed async issue in AutoCompleteServiceBusMessageReceiver

v3.0.0	
- Handlers are now async to allow await in handling code. 
- Upgraded all nuget packages.
- Dispose BrokeredMessage after handling completes.

v2.0.0
- Upgraded to GA

v1.0.0
- First commit

### If you want to integrate this into an existing project, read on...
----------------------------------------------

# Server Package
Make sure your projects are configured to build as 64 bit programs!
----------------------------------------------

## To create a message handler:

*You can also use the ```AutoCompleteServiceBusMessageReceiver``` and ```AutoCompleteBatchServiceBusMessageReceiver``` handlers supplied by the package.*

```javascript
internal sealed class Handler : IServiceBusMessageReceiver
{
  private readonly StatefulService _service;

  public Handler(StatefulService service)
  {
      _service = service;
  }
  
  public Task ReceiveMessageAsync(BrokeredMessage message, MessageSession session, CancellationToken cancellationToken)
      {
          ServiceEventSource.Current.ServiceMessage(_service, $"Handling subscription message {message.MessageId}");
          return Task.FromResult(true);
      }
}

To create a Stateful Service that can be accessed through a Service Bus Queue:

internal sealed class SampleQueueListeningStatefulService : StatefulService
{
    protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
    {
        // In the configuration file, define connection strings: 
        // "Microsoft.ServiceBus.ConnectionString.Receive"
        // and "Microsoft.ServiceBus.ConnectionString.Send"
            
        // Also, define a QueueName:
        string serviceBusQueueName = CloudConfigurationManager.GetSetting("QueueName");

        yield return new ServiceReplicaListener(context => new ServiceBusQueueCommunicationListener(
            new Handler(this)
            , context				
            , serviceBusQueueName), "StatefulService-ServiceBusQueueListener");
    }
}

To create a Stateful Service that can be accessed through a Service Bus Subscription:

internal sealed class SampleSubscriptionListeningStatefulService : StatefulService
{
    protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
    {
        // In the configuration file, define connection strings: 
        // "Microsoft.ServiceBus.ConnectionString.Receive"
        // and "Microsoft.ServiceBus.ConnectionString.Send"
            
        // Also, define Topic & Subscription Names:
        string serviceBusTopicName = CloudConfigurationManager.GetSetting("TopicName");
        string serviceBusSubscriptionName = CloudConfigurationManager.GetSetting("SubscriptionName");

        yield return new ServiceReplicaListener(context => new ServiceBusSubscriptionCommunicationListener(
            new Handler(this)
            , context			
            , serviceBusTopicName
            , serviceBusSubscriptionName), "StatefulService-ServiceBusSubscriptionListener");
    }
}

To create a Stateless Service that can be accessed through a Service Bus Queue:

internal sealed class SampleQueueListeningStatelessService : StatelessService
{
    protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
    {
        // In the configuration file, define connection strings: 
        // "Microsoft.ServiceBus.ConnectionString.Receive"
        // and "Microsoft.ServiceBus.ConnectionString.Send"
            
        // Also, define a QueueName:
        string serviceBusQueueName = CloudConfigurationManager.GetSetting("QueueName");
        yield return new ServiceInstanceListener(context => new ServiceBusQueueCommunicationListener(
            new Handler(this)
            , context			
            , serviceBusQueueName), "StatelessService-ServiceBusQueueListener");
    }
}

To create a Stateless Service that can be accessed through a Service Bus Subscription:

internal sealed class SampleSubscriptionListeningStatelessService : StatelessService
{
    protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
    {
        // In the configuration file, define connection strings: 
        // "Microsoft.ServiceBus.ConnectionString.Receive"
        // and "Microsoft.ServiceBus.ConnectionString.Send"
            
        // Also, define Topic & Subscription Names:
        string serviceBusTopicName = CloudConfigurationManager.GetSetting("TopicName");
        string serviceBusSubscriptionName = CloudConfigurationManager.GetSetting("SubscriptionName");

        yield return new ServiceInstanceListener(context => new ServiceBusSubscriptionCommunicationListener(
            new Handler(this)
            , context
            , serviceBusTopicName
            , serviceBusSubscriptionName), "StatelessService-ServiceBusSubscriptionListener");
    }
}

Client Package

Make sure your projects are configured to build as 64 bit programs!

To communicate to a ServiceFabric Service through a Topic (Queues are also supported): (add the Microsoft.ServiceFabric.Services nuget package)

//the name of your application and the name of the Service, the default partition resolver and the topic name
//to create a communication client factory:
var uri = new Uri("fabric:/[ServiceFabric App]/[ServiceFabric Service]");
var resolver = ServicePartitionResolver.GetDefault();
string serviceBusTopicName = CloudConfigurationManager.GetSetting("TopicName");
var factory = new ServiceBusTopicCommunicationClientFactory(resolver, serviceBusTopicName);

//determine the partition and create a communication proxy
var partitionKey = new ServicePartitionKey(0L);
var servicePartitionClient = new ServicePartitionClient<ServiceBusTopicCommunicationClient>(factory, uri, partitionKey);

//use the proxy to send a message to the Service
servicePartitionClient.InvokeWithRetry(c => c.SendMessage(new BrokeredMessage()
{
    Properties =
    {
        { "TestKey", "TestValue" }
    }
}));

About

Communicate to Azure Service Fabric Services using Service Bus Queues and Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published