
Azure Service Bus

Microsoft Azure's Service Bus is a cloud service that provides the ability to share data among decoupled systems. In this article, we will learn how to leverage Azure's Service Bus with Brokered Messaging to distribute data among systems. However, if you are at all familiar with Azure services that provide support for distributed systems, you'll know that Service Bus is not the only service of its type.

Azure's Queue Storage service provides similar functionality for sharing data among distributed systems. So which queue service is right for you? That question, along with the following topics, is what we will be covering.

  • Service Buses

  • Service Bus vs. Queue Storage

  • Brokered vs. Relay Messaging

  • Queues vs. Topics and Subscriptions

  • Getting Started with the Building Blocks

  • Asynchronous Approach

  • Service Bus Queues

  • Sending Messages

  • What Kind of Message are You Sending

  • Receiving Messages

  • Topics and Subscriptions

  • Dead Lettering

  • Auto Forwarding

  • Transactions

  • Batch Processing

  • Security

  • Best Practices


Service Buses

Simply put, Service Bus is the second message queuing platform built by Azure, and it provides Relay and Brokered Messaging capabilities. It is a feature-rich, mature service that gives decoupled systems a way to exchange information independently. Azure's Service Bus is one of many Platform as a Service (PaaS) offerings and can be as simple as a single queue or as complex as a message workflow with a near-infinite number of interrelated queues, topics and subscriptions.

Service Bus vs. Queue Storage

As I have already pointed out, Service Bus is not the only message queue service Azure offers, nor was it the first. But there are significant differentiating features between Service Bus and Queue Storage, Azure's first message queuing service. We will be diving deeper into the features of Service Bus with the Brokered Messaging offering, but it's important that you are aware of when to use which service. Microsoft has provided a compare-and-contrast document to help you make that decision. However, given the limited reasons for using the Queue Storage service, we can quickly summarize when Queue Storage would be the best choice. If you need the least complex approach and your application has the following needs, Queue Storage would be the best choice:

  • Needs to retain more than 80 GB of data in queue

  • Message time to live less than 7 days

  • The ability to track message processing within a queue*

As you can see, the message storage size and time to live are the two key differentiating points of an Azure Storage Queue service. Naturally, the data retention aspect of the Storage Queue service is due to the underlying Storage service. So, unless one of these points is a critical requirement, the Service Bus service will be a more feature-rich and versatile option.

However, the Service Bus service offers more than one messaging capability. This article is focused on Service Bus with Brokered Messaging, but Relay is another option that you will read about when investigating Azure's Service Bus, so let's take a quick moment to distinguish the two.

Brokered vs. Relay Messaging

So far we have only mentioned Brokered Messaging with Azure Service Bus. Instead of the pattern of queuing messages that we have been alluding to so far, Relay Messaging provides the ability to “bounce” a message off of a service to a connected receiver. It requires that the receiver expecting the message is online and available. A strong point of Relay Messaging is the ability to expose the service's endpoint without the typical network firewall and infrastructure configuration hoop-jumping to make it available to external clients. However, durability is not as much of a strong point of Relay Messaging as it is with Brokered Messaging.

Brokered Messaging supports the scenario of truly temporally decoupled systems, where the availability of either the message producer or consumer is not guaranteed. Therefore, messages that are not immediately delivered must live somewhere and that is where the “broker” comes into play. With Brokered Messaging, the queue is the broker that retains a message created by a producer and from which the consumer can retrieve the message when ready. So, whereas the exposure of service endpoints is the strong point of Relay Messaging and durable queues are the strong point of Brokered Messaging, queues exist in more than one flavor. Therefore, we need to look at the various queue options before we get to the implementation of Service Bus Brokered Messaging.

Queues vs. Topics and Subscriptions

This can be confusing for someone just getting an introduction to Service Bus Brokered Messaging, so I want to try and make this as clear as possible. First, don't lose sight of the fact that at the end of the day we are always talking about queues. A Service Bus Queue provides the simplest message delivery option. Messages in a Queue are organized First In, First Out (FIFO) and each message is expected to be processed by a single consumer. I like to visualize a Queue as a single tube where a message is fed into one end and is consumed by a single consumer on the other end.


However, Topics and Subscriptions constitute a publish/subscribe pattern that allows the same message to be processed by any number of consumers. Subscription Rules and features like Auto-Forwarding allow for a tree-like visualization of how this process works.
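To make the fan-out idea concrete, here is a minimal in-memory sketch. It does not use the Service Bus SDK: the InMemoryTopic class, its method names and the Genre property used in the usage note are all hypothetical, and a subscription rule is modeled as a plain predicate over the message's properties.

```csharp
using System;
using System.Collections.Generic;

// In-memory illustration only; hypothetical type, not the Service Bus SDK.
public sealed class InMemoryTopic
{
    private readonly Dictionary<string, Func<IDictionary<string, object>, bool>> _rules =
        new Dictionary<string, Func<IDictionary<string, object>, bool>>();

    private readonly Dictionary<string, Queue<IDictionary<string, object>>> _subscriptions =
        new Dictionary<string, Queue<IDictionary<string, object>>>();

    // Registers a subscription together with the rule that decides which messages it gets.
    public void AddSubscription(string name, Func<IDictionary<string, object>, bool> rule)
    {
        _rules[name] = rule;
        _subscriptions[name] = new Queue<IDictionary<string, object>>();
    }

    // Places a copy of the message on every subscription whose rule is satisfied.
    public void Publish(IDictionary<string, object> messageProperties)
    {
        foreach (KeyValuePair<string, Func<IDictionary<string, object>, bool>> entry in _rules)
        {
            if (entry.Value(messageProperties))
            {
                _subscriptions[entry.Key].Enqueue(
                    new Dictionary<string, object>(messageProperties));
            }
        }
    }

    // Number of messages waiting on the named subscription.
    public int PendingCount(string subscriptionName)
    {
        return _subscriptions[subscriptionName].Count;
    }
}
```

With an "all" subscription whose rule always returns true and a "rock" subscription whose rule checks for Genre equal to "Rock", publishing one Rock message and one Jazz message leaves two messages pending on "all" and one on "rock". Each subscription then behaves like its own queue.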

We will get into the details later, but at a high level, a single message can be added to a topic and, for every subscription rule that is satisfied, a copy of the message will be added to that subscription. In this case, each subscription becomes the queue, where consumers can process the messages on a subscription individually.

Getting Started with the Building Blocks

The end result that we are aiming for is to produce messages that consumers can consume. But to get there, we need to start with the building blocks. One of the main building blocks that you will use for the bulk of your direct and indirect interactions with Service Bus is the NamespaceManager object. The NamespaceManager lets you create and manage core entities such as queues, topics and subscriptions through factory methods, just to name a few. However, NamespaceManager has a couple of dependencies that we need to provide, so let's discuss those first. The dependencies that we need to materialize are the following:

  • Service Bus Namespace (used by the Service Bus Service URI)

  • Token Provider

  • Service Bus Service URI

A Service Bus namespace is exactly what it sounds like and ultimately defines our own personal namespace within our Service Bus service. This first dependency must be created from within the Azure portal under “Service Bus”.

You'll need to specify the namespace (make note of this) by which your Service Bus service endpoint will be known, along with information such as the region where it will be hosted. Finally, there is the important choice between the Basic and Standard Messaging Tiers. For an understanding of each you can check out the Azure documentation; the tier you choose can impact the ability to host Topics and Subscriptions.

With the namespace created, we can focus on the last two dependencies, the Token Provider and the Service Bus Service URI. Token Providers provide the authentication mechanism that the NamespaceManager will use. There are a few out-of-the-box providers that can be used and we will be looking at some of these in more detail when we get to security. For now we will use the Shared Access Signature Token Provider and specify the default Shared Access Signature (SAS) policy name and key. These two pieces of information can be found under the “Configure” section of your newly created Service Bus Namespace in the Azure Portal. With the policy name and key we can compose a Token Provider as in the following:

TokenProvider tokenProvider =  
TokenProvider.CreateSharedAccessSignatureTokenProvider(AccountInfo.PolicyName,  
AccountInfo.Key); 


NOTE: AccountInfo simply supplies the proper Shared Access Signature policy name and key.

The Service Bus URI provides the NamespaceManager with the service endpoint that operations will run against. This is where our earlier created namespace will be utilized. A service endpoint such as mynamespace.servicebus.windows.net can be created using one of the static methods from the ServiceBusEnvironment class and specifying our namespace and the scheme.

//"sb" defines the scheme of the service Uri 
Uri serviceBusUri = ServiceBusEnvironment.CreateServiceUri("sb", "shopit", string.Empty); 

With the Visual Studio Azure Tools, many of the Azure Service Bus project templates use the Azure Service Bus Configuration NuGet package. But understanding these underlying dependencies will help facilitate your own custom implementation when needed.

Creating the Namespace Manager

With the dependencies in hand, we can now generate our NamespaceManager as in the following:

NamespaceManager namespaceManager = new NamespaceManager(serviceBusUri, tokenProvider); 

Optionally, there is also an overload that accepts a NamespaceManagerSettings object, which allows us to specify additional settings such as the operation timeout and a retry policy (for example, an exponential retry policy or a custom retry policy).

NamespaceManagerSettings settings = new NamespaceManagerSettings {OperationTimeout = new TimeSpan(0, 1, 0), TokenProvider = tokenProvider};  
NamespaceManager namespaceManager = new NamespaceManager(serviceBusUri, settings); 

I discussed implementing a custom retry policy to track when retries actually occur in another article that utilizes the Enterprise Transient Fault Application Block. We'll look at Retry policies later.

Asynchronous Approach

One of the most frequently cited best practices is to take an asynchronous approach to your Service Bus operations. This can easily be done by taking advantage of the asynchronous methods provided for the majority of operations. You will find that the library provides two different asynchronous patterns to choose from: the Asynchronous Programming Model and the Task-Based Asynchronous Pattern. We'll talk more about this when we get to Best Practices. To remain true to the best practices, and since most demonstrations elsewhere are done synchronously, most examples here will be delivered asynchronously (pun intended)! In order to intercept and handle exceptions thrown by asynchronous operations, you would, at a minimum, wrap your calls in a try/catch. However, unless demonstrating error handling, we will exclude that code to focus on the demonstrated operation.

Service Bus Queues

With our NamespaceManager created we have laid the foundation for the bulk of operations we will look at regarding Service Bus Brokered Messaging. I have already explained the two message delivery options that Service Bus offers and we will start off with the simplest route by looking at Service Bus Queues. Though Queues don't support the message filtering that makes Topics and Subscriptions so unique, they do support sessions, duplicate detection, message deferral, dead lettering and message expiry, just to name a few. Even though many of these features are also supported by Topics and Subscriptions, I have decided to review Queues and Topics/Subscriptions separately.

Creating Queues

Creating a queue is the first step to allowing us to store and retrieve messages. Queue names are always created in lowercase regardless of the casing you provide for your queue name.

QueueDescription queueDescription = await namespaceManager.CreateQueueAsync(queueName); 

To demonstrate a few other utility methods, we can first attempt to verify if the queue already exists and return the existing queue information if it does:

QueueDescription newQueueDescription = null;  
if (!await _namespaceManager.QueueExistsAsync(queueName))  
{  
    newQueueDescription = await _namespaceManager.CreateQueueAsync(queueName);  
}  
 
QueueDescription queueDescription = newQueueDescription ?? await _namespaceManager.GetQueueAsync(queueName); 

An overload of CreateQueueAsync takes a QueueDescription object that allows you to specify values other than the defaults that a queue is constructed with. I was reluctant to discuss the many different queue properties this early on, but since a number of them can't be updated after the creation of the queue, we might as well discuss them now.

Queue Properties

Queues contain a number of properties; some directly affect the queue, whereas others pertain to the Brokered Messages it hosts. Not all properties can be changed after a queue is created, so if an update is required, be mindful of these restrictions. The following list is not exhaustive, but it covers some of the more common properties that you will likely be interested in:

  1. DefaultMessageTimeToLive: This is an interesting one. It dictates the time to live of a message in two scenarios: 1) if the message does not have its own TimeToLive property value set, or 2) when the message's TimeToLive is greater than the queue's DefaultMessageTimeToLive property. However, if the message's TimeToLive value is lower, the message's TimeToLive determines when the message will expire.

  2. RequiresDuplicateDetection: Allows you to turn on duplicate message detection. This works in conjunction with the DuplicateDetectionHistoryTimeWindow. This is one of those values that must be set at creation and cannot be updated afterwards.

  3. DuplicateDetectionHistoryTimeWindow: You can specify a time period for which the queue will retain message IDs to carry out duplicate message detection. The window can be no greater than 7 days. Be aware that there is an overhead with this feature. Each message ID that must be retained by the queue has an overhead of 64 bytes. Therefore, a queue that processes 1000 messages within the window would require at least 64,000 bytes of overhead. This might not sound like much, but crunch the numbers if you were sending 30 messages a second and set the DuplicateDetectionHistoryTimeWindow to the maximum of 7 days: 30 messages × 604,800 seconds × 64 bytes comes to roughly 1.16 GB of overhead.

  4. EnableDeadLetteringOnMessageExpiration: Allows expired messages to be moved to the dead-letter queue. A message expires when the queue's DefaultMessageTimeToLive or the message's TimeToLive is reached, whichever comes first.

  5. LockDuration: When we get to message handling, we'll talk more about PeekLock, but this is the setting that sets how long a message is locked for handling before it is released back onto the queue.

  6. MaxSizeInMegabytes: The total size of the queue. The default is 1 GB. The available space is reduced by the overhead produced by the DuplicateDetectionHistoryTimeWindow.

  7. AutoDeleteOnIdle: This is actually a timespan that denotes how long a queue can stay alive once it is idle before it is automatically deleted. The minimum time is 5 minutes.
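The time-to-live rule and the duplicate detection overhead described in the list above are simple arithmetic, sketched below. QueueSettingsMath is a hypothetical helper, not part of the Service Bus SDK, and the 64 bytes per retained message ID is the figure quoted in this article rather than a value read from the service.

```csharp
using System;

// Hypothetical helper for illustration; not part of the Service Bus SDK.
public static class QueueSettingsMath
{
    // Per-message-ID overhead as quoted in the article (an assumption here).
    public const long BytesPerRetainedMessageId = 64;

    // The effective time to live is the message's own value when it is set and
    // lower than the queue default; otherwise the queue default applies.
    public static TimeSpan EffectiveTimeToLive(TimeSpan? messageTtl, TimeSpan queueDefaultTtl)
    {
        if (messageTtl == null || messageTtl.Value > queueDefaultTtl)
        {
            return queueDefaultTtl;
        }
        return messageTtl.Value;
    }

    // Estimated bytes needed to retain message IDs for the whole history window.
    public static long DuplicateDetectionOverheadBytes(double messagesPerSecond, TimeSpan historyWindow)
    {
        double retainedIds = messagesPerSecond * historyWindow.TotalSeconds;
        return (long)(retainedIds * BytesPerRetainedMessageId);
    }
}
```

For example, DuplicateDetectionOverheadBytes(30, TimeSpan.FromDays(7)) multiplies 30 messages per second by 604,800 seconds and by 64 bytes, which is the calculation behind the estimate in item 3.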

Update Queues

Certain queue properties can be updated after creation, whereas others cannot. In the following we can see how to utilize a QueueDescription object to update certain properties of an existing queue:

QueueDescription newQueueDescription = new QueueDescription("TestOperationQueue")  
{  
    DefaultMessageTimeToLive = TimeSpan.FromDays(3),  
    AutoDeleteOnIdle = TimeSpan.FromHours(1),  
    MaxSizeInMegabytes = 2048  
};  
 
QueueDescription  queueDescription = await _namespaceManager.UpdateQueueAsync(newQueueDescription ); 

Here we have updated the queue properties so that a message's time to live cannot exceed 3 days, set the queue's own time to live when idle to 1 hour and set the maximum queue size to 2 GB. Note that a queue deleted through AutoDeleteOnIdle is deleted even if it still has messages in it; a queue's idle time starts at the last enqueuing or dequeuing of a message.

Delete Queues

Delete does not throw an exception when the queue does not exist.

await _namespaceManager.DeleteQueueAsync(queuePath); 

Alternatively, we can set the AutoDeleteOnIdle queue property so that the queue deletes itself after it has gone idle for the set time period.

Sending Messages

Now that our queue is in place, we can exercise sending messages to the queue. In order for that to be possible we need to generate a facilitator for sending messages. When discussing strictly Service Bus Queues as we are, there are only 2 options: QueueClient and MessageSender. It is important to understand that the MessageSender is an abstraction over the QueueClient and should be used unless it is absolutely necessary to get to functionality specific to a QueueClient. We will see the benefits of utilizing the MessageSender over a QueueClient when we get to the subject of Topics.

We can create a MessageSender from one of the factory methods of a MessagingFactory. However, the queue author is not always the same entity that sends messages to the queue. Therefore, our MessagingFactory also requires a proper Service Bus endpoint and token provider just as our NamespaceManager did. Earlier, when we generated our NamespaceManager, we could have created it with a connection string instead of manually generating the two dependencies, the Service Bus URI and the Token Provider. Therefore, for demonstration purposes, we'll look at generating a MessagingFactory using a connection string that we can acquire from our Azure portal. You can acquire your Service Bus namespace's Connection String by clicking on “Connection Information” at the bottom of the Azure portal after you have selected the “Service Bus” service icon.

The connection string is parsed to supply the Service Bus URI (endpoint) and Token Provider (shared access signature) for the MessagingFactory when we utilize the CreateFromConnectionString method as in the following:

MessagingFactory factory = MessagingFactory.CreateFromConnectionString(AccountInfo.ConnectionString);  
MessageSender messageSender = factory.CreateMessageSender(QueueName); 

With a MessageSender we are prepared to send messages as in the following:

//Some object 
SingingTelegram singingTelegram = new SingingTelegram  
{  
    RecipientFirstName = "Elvis",  
    RecipientLastName = "Presley",  
    SongName = "Won't you come back again?" 
};  
 
BrokeredMessage message = new BrokeredMessage(singingTelegram);  
 
await messageSender.SendAsync(message); 


But hold on!

What Kind of Message Are You Sending?

Before we can exercise the “distribute” in Distributed System and start flinging messages at our queue, we need to have an understanding of what constitutes a message. In our case, it's simple. We're sending a BrokeredMessage. But there is much to understand about a BrokeredMessage, which can be broken down into characteristics and features. Many of the features will be discussed under specific areas, but for now I want to stop and discuss some of the characteristics you need to be aware of when it comes to BrokeredMessages.

First off, a message at its heart is made up of a body and properties. The total maximum size of a message is 256 KB. The maximum size of all properties is 64 KB. The maximum size of the body is whatever remains of the maximum message size after subtracting the current size of the properties. There are ways to get past the maximum size (namely sessions) but we're not discussing that at the moment. Due to certain protocol limitations such as HTTP header sizes, it is recommended that you keep your custom properties between 2 and 4 KB.

Body

The body is simply any serializable object and constitutes the payload of the message. Whether you need to use the body of the message is up to you. You will see that there are a number of different BrokeredMessage constructors. These generally all set the body of the message, as in:

BrokeredMessage msg1 = new BrokeredMessage("My Message Body");  
BrokeredMessage msg2 = new BrokeredMessage(new SingingTelegram()); 


Properties

Properties are made up of two parts, system and custom properties. The custom properties are simply a key/value pair collection of string keys and object values. They allow us to transport specific custom data at the header level of our message. Custom properties really come into play with subscription rules, which we will discuss when we get to Topics and Subscriptions. Another way to think about message properties in general is in terms of a message flowing among various protocols (HTTP/S, AMQP, SBMP). A message originating from an HTTP/S request would end up mapping request headers to message properties. When we want to promote data to the header level of a message, we can utilize the custom Properties property as easily as:

BrokeredMessage message = new BrokeredMessage(singingTelegram);  
 
//importantInformation is assumed to be an IDictionary<string, object> 
foreach (KeyValuePair<string, object> info in importantInformation)  
{  
    message.Properties.Add(info.Key, info.Value);  
} 
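The size limits described earlier (256 KB per message, 64 KB for all properties, and the body taking whatever remains) can be checked with a few lines of arithmetic. MessageSizeBudget is a hypothetical helper, not an SDK type, and it assumes you already know the serialized sizes of your body and properties in bytes.

```csharp
using System;

// Hypothetical helper for illustration; not part of the Service Bus SDK.
public static class MessageSizeBudget
{
    public const int MaxMessageBytes = 256 * 1024;   // total message limit quoted in the article
    public const int MaxPropertiesBytes = 64 * 1024; // limit for all properties combined

    // Bytes left for the body once the properties have taken their share.
    public static int RemainingBodyBytes(int propertiesBytes)
    {
        if (propertiesBytes < 0 || propertiesBytes > MaxPropertiesBytes)
        {
            throw new ArgumentOutOfRangeException(nameof(propertiesBytes));
        }
        return MaxMessageBytes - propertiesBytes;
    }

    // True when a body of the given size fits alongside the given properties.
    public static bool Fits(int bodyBytes, int propertiesBytes)
    {
        return bodyBytes >= 0 && bodyBytes <= RemainingBodyBytes(propertiesBytes);
    }
}
```

With 4 KB of custom properties, for instance, RemainingBodyBytes(4096) leaves 258,048 bytes for the body.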


There are a considerable number of important system properties on a BrokeredMessage, many of which you can adjust. We'll touch on many of these under specific features. However, a few worth mentioning are:

  1. ScheduledEnqueueTimeUtc: If you want to delay the visibility of a message on a queue, you can set a future UTC time; the message will be unavailable to receivers until then.

  2. ExpiresAtUtc: The specific UTC time at which the message will expire. This value is read-only and is derived from the enqueue time plus TimeToLive.

  3. TimeToLive: Similar to ExpiresAtUtc, but expressed as the amount of time that must pass after the message has been enqueued before the message expires. Both this property and ExpiresAtUtc describe when a message expires, one as a set time and the other as a duration.

Again, this is not an exhaustive list and it would benefit you to be aware of some of the other influential properties, such as how MessageId is used for duplicate message detection, or how optional properties such as Label and ContentType can be used.

Receiving Messages

Namespace established. Check. Queue created. Check. Messages sent. Check. Obviously the last piece of this simple scenario is the other side of a distributed system: doing something with the messages we have enqueued. As with a MessageSender, we also want to use the MessageReceiver abstraction for getting messages. To generate a MessageReceiver, we need a MessagingFactory; you can refer back to Sending Messages for reference.

MessageReceiver messageReceiver =  
await _messagingFactory.CreateMessageReceiverAsync(queueName); 

From here we can start to receive messages. We can utilize the TimeSpan parameter of ReceiveAsync to specify how long to wait for a message before giving up. In addition, the following example will loop continuously until the cancellation token is canceled.

while (!cancellationToken.IsCancellationRequested)  
{  
    //Wait up to 1 minute for a message; ReceiveAsync returns null on timeout 
    var msg = await messageReceiver.ReceiveAsync(TimeSpan.FromMinutes(1)).ConfigureAwait(false);  
    if (msg != null)  
    {  
        await ProcessAndReleaseMessageAsync(msg);  
    }  
    await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken);  
}  
 
private async Task ProcessAndReleaseMessageAsync(BrokeredMessage message)  
{  
    //Default to Abandon so a failure leaves the message on the queue 
    MessageProcessingAction action = MessageProcessingAction.Abandon;  
    try  
    {  
        //Process message 
        action = MessageProcessingAction.Complete;  
    }  
    catch (Exception ex)  
    {  
        //log 
    }  
    finally  
    {  
        //C# 6.0 allows await calls in catch and finally blocks 
        //Settle the message exactly once, whatever the outcome 
        await UpdateMessageState(message, action);  
    }  
}  
 
private async Task UpdateMessageState(BrokeredMessage message, MessageProcessingAction action)  
{  
    switch (action)  
    {  
        case MessageProcessingAction.Complete:  
            await message.CompleteAsync();  
            break;  
        case MessageProcessingAction.Abandon:  
            await message.AbandonAsync();  
            break;  
        case MessageProcessingAction.Deadletter:  
            await message.DeadLetterAsync();  
            break;  
        default:  
            await message.AbandonAsync();  
            break;  
    }  
} 

Simply put, we wait up to 1 minute for a message to be pulled from the queue. Upon successfully pulling a message, the message is processed and then we explicitly notify the Azure Service Bus of the state of the message by calling CompleteAsync or AbandonAsync. This is because, by default, messages are received from a Queue in PeekLock mode. PeekLock specifies that a message is only temporarily checked out for some duration of time to be processed. Only when the message is checked in as complete will it be removed from the queue. There are other factors that directly affect a message being removed from the queue, such as the time to live setting of the queue or message. But as far as message handling by our receiver is concerned, we will need to complete, abandon or dead-letter the message. We'll talk more about the dead-letter queue and moving messages there later.

As far as the example above, there are a couple of points. Obviously, the switch statement could be refactored more elegantly, but that would also obscure what I wanted to demonstrate about updating the message state so that the queue can properly handle it, whether that means leaving it on the queue when our attempt to process it fails, or marking it complete so the queue removes it. PeekLock is not the only mode in which messages can be received. Let's look at message receive modes as well as other Queue behaviors and features I have withheld up until now.

Dealing with Queues, Again!

I have intentionally tried not to over-complicate queues with all of their many features. But before closing out Part 1 on Service Bus, I did want to return to a few important points that are worth noting. As I just hinted, there are 2 modes of message retrieval, PeekLock and ReceiveAndDelete (technically there is a third, Session + Peek, but it is not in the scope of this discussion). We have already discussed PeekLock. ReceiveAndDelete is a less durable option, as it removes the message from the queue as soon as it is retrieved. Therefore, as you might expect, any exception in your message processing that you have not accounted for can cause the loss of that message. We can change the mode when creating our MessageReceiver:

MessageReceiver messageReceiver =  
await _messagingFactory.CreateMessageReceiverAsync(queueName, ReceiveMode.ReceiveAndDelete); 
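The difference in durability between the two receive modes can be illustrated with a small in-memory model. This is only a sketch of the semantics and not how Service Bus is implemented: InMemoryQueueSketch and SketchReceiveMode are hypothetical types, message bodies are plain strings, and only one message can be locked at a time to keep the example short.

```csharp
using System;
using System.Collections.Generic;

// In-memory illustration only; hypothetical types, not the Service Bus SDK.
public enum SketchReceiveMode { PeekLock, ReceiveAndDelete }

public sealed class InMemoryQueueSketch
{
    private readonly LinkedList<string> _available = new LinkedList<string>();
    private string _locked; // at most one locked message, to keep the sketch small

    // Messages the queue still holds, including a locked one.
    public int Count
    {
        get { return _available.Count + (_locked != null ? 1 : 0); }
    }

    public void Send(string body)
    {
        _available.AddLast(body);
    }

    // Returns the next message, or null when the queue is empty.
    public string Receive(SketchReceiveMode mode)
    {
        if (_locked != null)
        {
            throw new InvalidOperationException("Settle the locked message first.");
        }
        if (_available.Count == 0)
        {
            return null;
        }
        string body = _available.First.Value;
        _available.RemoveFirst();
        if (mode == SketchReceiveMode.PeekLock)
        {
            _locked = body; // the queue keeps the message until it is settled
        }
        return body;
    }

    // Settles the locked message as processed, removing it for good.
    public void Complete()
    {
        _locked = null;
    }

    // Releases the locked message back to the front of the queue.
    public void Abandon()
    {
        if (_locked != null)
        {
            _available.AddFirst(_locked);
            _locked = null;
        }
    }
}
```

In this model a message received under PeekLock still counts toward the queue until Complete is called, and Abandon puts it back for another attempt. A message received under ReceiveAndDelete is gone immediately, so a failure while processing it cannot be recovered from the queue.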

As mentioned under the queue properties, we can set the LockDuration, which dictates how long a message received under the PeekLock mode can be retained by a receiver before being released back onto the queue to be processed. The default is 1 minute, but it can be set to a maximum of 5 minutes. When a message's LockedUntilUtc property shows the lock is about to expire, we can request a renewal of the message lock as in the following:

await msg.RenewLockAsync(); 
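Deciding when the lock is "about to expire" is a judgment call. One possible approach, sketched below, is to renew once the remaining lock time drops under a safety margin. LockRenewal and its margin parameter are hypothetical and not part of the SDK; the method only does the date arithmetic and leaves the actual renewal call to you.

```csharp
using System;

// Hypothetical helper for illustration; not part of the Service Bus SDK.
public static class LockRenewal
{
    // True when the lock is still held but will expire within the safety margin.
    // A lock that has already expired cannot be renewed, so that case returns false.
    public static bool ShouldRenew(DateTime lockedUntilUtc, DateTime nowUtc, TimeSpan safetyMargin)
    {
        TimeSpan remaining = lockedUntilUtc - nowUtc;
        return remaining > TimeSpan.Zero && remaining <= safetyMargin;
    }
}
```

During long-running processing, a receiver could call ShouldRenew(msg.LockedUntilUtc, DateTime.UtcNow, TimeSpan.FromSeconds(15)) and await msg.RenewLockAsync() whenever it returns true. The 15 second margin is only an example value.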

Queues have two modes of delivery, Pull and Forward. So far we have seen Pull, where our MessageReceiver requests a message from the queue. But we can also forward messages by setting the ForwardTo property of our queue so that any messages delivered will automatically be forwarded on to another queue.

QueueDescription queueDescription = await namespaceManager.CreateQueueAsync(new QueueDescription(QueueName)  
{  
    ForwardTo = destinationQueueName  
}); 






Source: C# Corner


