Try fast search NHibernate

16 January 2017

Azure queues by convention

El codigo que voy a mostrar hoy viene corriendo, con pocas variaciones a lo largo del tiempo, desde el 2009 cuando Azure era CTP así que somos ya varios que lo tenemos a mano (algo había compartido en el medio del camino).

Las interfaces

Como siempre es buena norma, por lo menos cuando se trata de servicios, empezar por las interfaces; más aún si terminan involucrados en varios proyectos que trabajan con Azure y/o onPremises o son hibridos en fase de metamorfosis (de monolitica oruga come recursos a espledida mariposa que vuela en las nubes).
La primer interfaz es la que uso normalmente para inicializar culquier cosa que tenga a que ver con lo storage (queues, blobs, tables, servicebus etc.):
  public interface IStorageInitializer
  {
    void Initialize();
    void Drop();
  }
Ya que estamos con queues vamos a tener un par de bichos más:
  public interface IEnqueuer<in TMessage> where TMessage : class
  {
    Task Enqueue(TMessage message);
  }

  public interface IDequeuer<out TMessage> where TMessage : class
  {
    TMessage Dequeue(int timeoutMilliseconds);
  }
En realidad en la mayoría de los casos, dificilmente uso el IDequeuer ya que quien usamos el WebJob SDK aprovechamos todo el sistema de polling que viene OOTB (out-of-the-box).

El template de VisualStudio “Azure WebJob” instiga a usar una clase static con una serie de metodos para consumir queues. Personalmente ese enjendro me gusta muy poco aunque entiendo el esfuerzo de hacer parecer siempre “las cosas” muy simples. Yo prefiero definir clases de Consumers de queues que puedo testear, inyectar, registrar y distribuir en varios workers según la evolución de la app lo necesite en su historia. Por ese motivo esta inteface:
  public interface IQueueMessageConsumer<in TMessage> where TMessage : class
  {
    Task ProcessMessage(TMessage message);
  }

 Convention over configuration

En lugar de lidiar con decenas de parametros de configuración, o las notorias “magic strings”, prefiero una simple convención para nombrar las queues.
  public class QueueNaming
  {
    private static readonly string[] queueMessageTypePostfixSequence = { "Message", "Event" };

    public static string AsQueueStorageName<TMessage>() where TMessage: class
    {
      return AsQueueStorageName(typeof(TMessage));
    }

    public static string AsQueueStorageName(Type messageType)
    {
      var typeName = messageType.Name;
      return ConvertToTableName(queueMessageTypePostfixSequence
        .Select(x => TrucateQueueName(x, typeName))
        .FirstOrDefault(x => x != null) ?? typeName);
    }

    private static string TrucateQueueName(string postfix, string typeName)
    {
      return typeName.EndsWith(postfix) ? typeName.Substring(0, typeName.Length - postfix.Length) : null;
    }

    public static string ConvertToTableName(string candidateName)
    {
      if (string.IsNullOrWhiteSpace(candidateName))
      {
        return null;
      }
      /* QueueName 3-63 lowercase alphanumeric and dash */
      return new string(candidateName.Where(x=> char.IsLetterOrDigit(x) || x == '-').Select(char.ToLowerInvariant).ToArray());
    }
  }
En practica uso el nombre de la clase (purgado si necesario) que representa el mensaje para nombrar la queue. Las limitaciones actuales de los nombres para algunos de los servicios de Azure Storage las pueden encontrar resumidas en este post : https://blogs.msdn.microsoft.com/jmstall/2014/06/12/azure-storage-naming-rules/

Implementaciones para Queue Storage

Vamos con el initializer que, en el caso de las queues, uso en el startup de la app
  /// <summary>
  ///   Initialize a queue storage specific for a message type.
  /// </summary>
  /// <typeparam name="TMessage">The type of the message</typeparam>
  public class QueueStorageInitializer<TMessage> : IStorageInitializer where TMessage : class
  {
    private readonly CloudStorageAccount account;
    private readonly string queueName = GetName();

    public QueueStorageInitializer(CloudStorageAccount account)
    {
      if (account == null)
      {
        throw new ArgumentNullException(nameof(account));
      }
      this.account = account;
    }

    public void Initialize()
    {
      var queueClient = account.CreateCloudQueueClient();
      var queue = queueClient.GetQueueReference(queueName);
      queue.CreateIfNotExists();
    }

    public void Drop()
    {
      var queueClient = account.CreateCloudQueueClient();
      var queue = queueClient.GetQueueReference(queueName);
      queue.DeleteIfExists();
    }

    public static string GetName() => QueueNaming.AsQueueStorageName<TMessage>();
  }
que uso de esta forma en la app Web y/o en los workers
      new QueueStorageInitializer<MyEntityChangedEvent>(account).Initialize();
y el Enqueuer
  public class QueueMessageEnqueuer<TMessage> : IEnqueuer<TMessage> where TMessage : class
  {
    private readonly CloudStorageAccount account;
    private readonly string queueName = QueueNaming.AsQueueStorageName<TMessage>();

    public QueueMessageEnqueuer(CloudStorageAccount account)
    {
      if (account == null)
      {
        throw new ArgumentNullException(nameof(account));
      }
      this.account = account;
    }

    public Task Enqueue(TMessage message)
    {
      if (message == null)
      {
        throw new ArgumentNullException(nameof(message));
      }
      var queueClient = account.CreateCloudQueueClient();
      var queueRef = queueClient.GetQueueReference(queueName);
      var serializedMessage = JsonConvert.SerializeObject(message);
      var qmessage = new CloudQueueMessage(serializedMessage);
      return queueRef.AddMessageAsync(qmessage);
    }
  }

 Implementaciones para Azure ServiceBus

El initializer
  /// <summary>
  ///   Initialize a servicebus queue specific for a message type.
  /// </summary>
  /// <typeparam name="TMessage">The type of the message</typeparam>
  public class ServiceBusQueueInitializer<TMessage> : IStorageInitializer where TMessage : class
  {
    private readonly string queueName = GetName();
    private readonly string serviceBusConnectionString;

    public ServiceBusQueueInitializer(string serviceBusConnectionString)
    {
      if (string.IsNullOrWhiteSpace(serviceBusConnectionString))
      {
        throw new ArgumentNullException(nameof(serviceBusConnectionString));
      }
      this.serviceBusConnectionString = serviceBusConnectionString;
    }

    public void Initialize()
    {
      var namespaceManager = NamespaceManager.CreateFromConnectionString(serviceBusConnectionString);
      if (!namespaceManager.QueueExists(queueName))
      {
        namespaceManager.CreateQueue(queueName);
      }
    }

    public void Drop()
    {
      var namespaceManager = NamespaceManager.CreateFromConnectionString(serviceBusConnectionString);
      if (namespaceManager.QueueExists(queueName))
      {
        namespaceManager.DeleteQueue(queueName);
      }
    }

    public static string GetName() => QueueNaming.AsQueueStorageName<TMessage>();
  }
y el Enqueuer
  public class ServiceBusEnqueuer<TMessage> : IEnqueuer<TMessage> where TMessage : class
  {
    private readonly string queueName = QueueNaming.AsQueueStorageName<TMessage>();
    private readonly string serviceBusConnectionString;

    public ServiceBusEnqueuer(string serviceBusConnectionString)
    {
      if (string.IsNullOrWhiteSpace(serviceBusConnectionString))
      {
        throw new ArgumentNullException(nameof(serviceBusConnectionString));
      }
      this.serviceBusConnectionString = serviceBusConnectionString;
    }

    public async Task Enqueue(TMessage message)
    {
      var client = QueueClient.CreateFromConnectionString(serviceBusConnectionString, queueName);
      var serializedObject = SerializeObjectAsString(message);
      using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(serializedObject), false))
      {
        var bmessage = new BrokeredMessage(stream)
        {
          ContentType = "application/json"
        };
        await client.SendAsync(bmessage);
      }
    }

    protected virtual string SerializeObjectAsString(TMessage message)
    {
      return JsonConvert.SerializeObject(message);
    }
  }

 May the Force be with me

Si este año no caigo en lado oscuro de la fuerza voy a seguir agregando codigo a este proyecto (GIT) con otras utilidades y, tal vez, ejemplos para Azure.

1 comment: