Try fast search NHibernate

19 November 2011

Azure queues: Producer

If you have read something about Azure’s queue you have probably seen some picture like this:

AzureQ_PC

A producer is anything (a class) that, in some moment, enqueue a message; more exactly this “anything” is something that take the role of message producer. For example it can be a MVC controller, more usually it can be an application-service or a domain-event but it can be even a view.

To simplify the work with messages, to our team, I have created two very simple classes on top of Microsoft.WindowsAzure.StorageClient. The main target of these two classes is establish some conventions and make the message strongly typed (it represent a limitation of the capability of Azure’s queues but for our works it is ok).

The basic

Giving a message like this:

public class NewsViewed
{
    public int CountryId { get; set; }
    public Guid NewsId { get; set; }
    public Uri Referrer { get; set; }
}

I would enqueue a new message with a line like this:

new MessageQueue<NewsViewed>().Enqueue(new NewsViewed
{
    CountryId = args.CountryId,
    NewsId = args.NewsId,
    Referrer = args.Referrer
});

When you want work with a message you will probably need some more information than the pure message data so a message in a queue is represented by:

public class QueueMessage<TMessage>
{
    public string Id { get; internal set; }
    public string PopReceipt { get; internal set; }
    public DateTime? InsertionTime { get; internal set; }
    public DateTime? ExpirationTime { get; internal set; }
    public DateTime? NextVisibleTime { get; internal set; }
    public int DequeueCount { get; internal set; }
    public TMessage Data { get; internal set; }
}

The conventions

The base convention is about the name of the queue which is the name of the class representing the message. For the example above the name of the queue is: newsviewed. The second convention is about the format of the message-content: ours Azure-queues will contains just strings, more exactly the JSON serialization of the class representing the message.

The MessageQueue implementation

Currently the max message size is 8KB, perhaps, hopefully NO, we will need more space so the MessageQueue<TMessage> class have to be extensible to allow a gzip/de-gzip of the content or it needs take the real content from a blob… so far it is far away of our needs and I hope we will never need it.

The implementation is:

/// <summary>
/// Generic base class for messages.
/// </summary>
/// <typeparam name="TMessage">The type of the message.</typeparam>
/// <remarks>
/// The <typeparamref name="TMessage"/> have to be JSON serializable.
/// </remarks>
public class MessageQueue<TMessage> where TMessage : class
{
    private const int MaxMessageBlockAllowedByAzure = 32;
    private readonly CloudQueueClient queueClient;
    private readonly string queueName = typeof (TMessage).Name.ToLowerInvariant();

    public MessageQueue() : this(AzureAccount.DefaultAccount()) {}

    public MessageQueue(CloudStorageAccount account)
    {
        if (account == null)
        {
            throw new ArgumentNullException("account");
        }
        queueClient = account.CreateCloudQueueClient();
    }

    public int ApproximateMessageCount
    {
        get
        {
            CloudQueue queueRef = queueClient.GetQueueReference(queueName);
            queueRef.RetrieveApproximateMessageCount();
            if (queueRef.ApproximateMessageCount.HasValue)
            {
                return queueRef.ApproximateMessageCount.Value;
            }
            return 0;
        }
    }

    public void Enqueue(TMessage messageContent)
    {
        if (messageContent == null)
        {
            throw new ArgumentNullException("messageContent");
        }
        CloudQueue queueRef = queueClient.GetQueueReference(queueName);
        var message = new CloudQueueMessage(SerializeObjectAsString(messageContent));
        queueRef.AddMessage(message);
    }

    public QueueMessage<TMessage> Dequeue()
    {
        CloudQueue queueRef = queueClient.GetQueueReference(queueName);
        CloudQueueMessage message = queueRef.GetMessage();
        return ConvertToQueueMessage(message);
    }

    public IEnumerable<QueueMessage<TMessage>> Dequeue(int messagesCount)
    {
        CloudQueue queueRef = queueClient.GetQueueReference(queueName);
        IEnumerable<CloudQueueMessage> messages = queueRef.GetMessages(messagesCount < MaxMessageBlockAllowedByAzure ? messagesCount : MaxMessageBlockAllowedByAzure);
        return messages.Select(x => ConvertToQueueMessage(x));
    }

    public QueueMessage<TMessage> Dequeue(TimeSpan timeout)
    {
        CloudQueue queueRef = queueClient.GetQueueReference(queueName);
        CloudQueueMessage message = queueRef.GetMessage(timeout);
        return ConvertToQueueMessage(message);
    }

    public IEnumerable<QueueMessage<TMessage>> Dequeue(int messagesCount, TimeSpan timeout)
    {
        CloudQueue queueRef = queueClient.GetQueueReference(queueName);
        IEnumerable<CloudQueueMessage> messages = queueRef.GetMessages(messagesCount < MaxMessageBlockAllowedByAzure ? messagesCount:MaxMessageBlockAllowedByAzure, timeout);
        return messages.Select(x => ConvertToQueueMessage(x));
    }

    public QueueMessage<TMessage> Peek()
    {
        CloudQueue queueRef = queueClient.GetQueueReference(queueName);
        CloudQueueMessage message = queueRef.PeekMessage();
        return ConvertToQueueMessage(message);
    }

    public IEnumerable<QueueMessage<TMessage>> Peek(int messagesCount)
    {
        CloudQueue queueRef = queueClient.GetQueueReference(queueName);
        IEnumerable<CloudQueueMessage> messages = queueRef.PeekMessages(messagesCount < MaxMessageBlockAllowedByAzure ? messagesCount : MaxMessageBlockAllowedByAzure);
        return messages.Select(x => ConvertToQueueMessage(x));
    }

    public void Remove(QueueMessage<TMessage> queueMessage)
    {
        if (queueMessage == null)
        {
            throw new ArgumentNullException("queueMessage");
        }
        CloudQueue queueRef = queueClient.GetQueueReference(queueName);
        queueRef.DeleteMessage(queueMessage.Id, queueMessage.PopReceipt);
    }

    public void Remove(IEnumerable<QueueMessage<TMessage>> queueMessages)
    {
        if (queueMessages == null)
        {
            throw new ArgumentNullException("queueMessages");
        }
        CloudQueue queueRef = queueClient.GetQueueReference(queueName);
        foreach (var queueMessage in queueMessages)
        {
            queueRef.DeleteMessage(queueMessage.Id, queueMessage.PopReceipt);
        }
    }

    protected virtual string SerializeObjectAsString(TMessage messageContent)
    {
        // a subclass can gzipr the message (GZipStream) where the serialized TMessage is > 8KB
        return JsonConvert.SerializeObject(messageContent);
    }

    protected virtual TMessage DeserializeObjectFromString(string messageContent)
    {
        // a subclass can de-gzip the message
        return JsonConvert.DeserializeObject<TMessage>(messageContent);
    }

    protected virtual QueueMessage<TMessage> ConvertToQueueMessage(CloudQueueMessage message)
    {
        if (message == null)
        {
            return null;
        }
        string messageContent = message.AsString;
        return new QueueMessage<TMessage>
               {
                   Id = message.Id,
                   PopReceipt = message.PopReceipt,
                   DequeueCount = message.DequeueCount,
                   InsertionTime = message.InsertionTime,
                   ExpirationTime = message.ExpirationTime,
                   NextVisibleTime = message.NextVisibleTime,
                   Data = DeserializeObjectFromString(messageContent)
               };
    }
}

as you can see it is basically a wrapper.

The AzureAccount class, present in the parameters-less constructor, is our static class to access all our storages accounts.

The JsonConverter is the class of the famous Newtonsoft.Json.

The more simple step is done, the next will be about the three parts of the consumer.

3 comments:

  1. This comment has been removed by the author.

    ReplyDelete
  2. Hi Fabio, thanks for the post. About the limit on the size of queue messages, for what I read here http://msdn.microsoft.com/en-us/library/windowsazure/gg441573.aspx, on the November release it has been increased to 64K. Looking forward to reading the next post!

    ReplyDelete
  3. Fabio, just want to say thanks for all the great work with NHibernate!

    ReplyDelete