Las interfaces
Como siempre es buena norma, por lo menos cuando se trata de servicios, empezar por las interfaces; más aún si terminan involucrados en varios proyectos que trabajan con Azure y/o onPremises o son hibridos en fase de metamorfosis (de monolitica oruga come recursos a espledida mariposa que vuela en las nubes).La primer interfaz es la que uso normalmente para inicializar culquier cosa que tenga a que ver con lo storage (queues, blobs, tables, servicebus etc.):
public interface IStorageInitializer
{
void Initialize();
void Drop();
}
{
void Initialize();
void Drop();
}
public interface IEnqueuer<in TMessage> where TMessage : class
{
Task Enqueue(TMessage message);
}
public interface IDequeuer<out TMessage> where TMessage : class
{
TMessage Dequeue(int timeoutMilliseconds);
}
{
Task Enqueue(TMessage message);
}
public interface IDequeuer<out TMessage> where TMessage : class
{
TMessage Dequeue(int timeoutMilliseconds);
}
El template de VisualStudio “Azure WebJob” instiga a usar una clase static con una serie de metodos para consumir queues. Personalmente ese enjendro me gusta muy poco aunque entiendo el esfuerzo de hacer parecer siempre “las cosas” muy simples. Yo prefiero definir clases de Consumers de queues que puedo testear, inyectar, registrar y distribuir en varios workers según la evolución de la app lo necesite en su historia. Por ese motivo esta inteface:
public interface IQueueMessageConsumer<in TMessage> where TMessage : class
{
Task ProcessMessage(TMessage message);
}
{
Task ProcessMessage(TMessage message);
}
Convention over configuration
En lugar de lidiar con decenas de parametros de configuración, o las notorias “magic strings”, prefiero una simple convención para nombrar las queues.
public class QueueNaming
{
private static readonly string[] queueMessageTypePostfixSequence = { "Message", "Event" };
public static string AsQueueStorageName<TMessage>() where TMessage: class
{
return AsQueueStorageName(typeof(TMessage));
}
public static string AsQueueStorageName(Type messageType)
{
var typeName = messageType.Name;
return ConvertToTableName(queueMessageTypePostfixSequence
.Select(x => TrucateQueueName(x, typeName))
.FirstOrDefault(x => x != null) ?? typeName);
}
private static string TrucateQueueName(string postfix, string typeName)
{
return typeName.EndsWith(postfix) ? typeName.Substring(0, typeName.Length - postfix.Length) : null;
}
public static string ConvertToTableName(string candidateName)
{
if (string.IsNullOrWhiteSpace(candidateName))
{
return null;
}
/* QueueName 3-63 lowercase alphanumeric and dash */
return new string(candidateName.Where(x=> char.IsLetterOrDigit(x) || x == '-').Select(char.ToLowerInvariant).ToArray());
}
}
{
private static readonly string[] queueMessageTypePostfixSequence = { "Message", "Event" };
public static string AsQueueStorageName<TMessage>() where TMessage: class
{
return AsQueueStorageName(typeof(TMessage));
}
public static string AsQueueStorageName(Type messageType)
{
var typeName = messageType.Name;
return ConvertToTableName(queueMessageTypePostfixSequence
.Select(x => TrucateQueueName(x, typeName))
.FirstOrDefault(x => x != null) ?? typeName);
}
private static string TrucateQueueName(string postfix, string typeName)
{
return typeName.EndsWith(postfix) ? typeName.Substring(0, typeName.Length - postfix.Length) : null;
}
public static string ConvertToTableName(string candidateName)
{
if (string.IsNullOrWhiteSpace(candidateName))
{
return null;
}
/* QueueName 3-63 lowercase alphanumeric and dash */
return new string(candidateName.Where(x=> char.IsLetterOrDigit(x) || x == '-').Select(char.ToLowerInvariant).ToArray());
}
}
Implementaciones para Queue Storage
Vamos con el initializer que, en el caso de las queues, uso en el startup de la app
/// <summary>
/// Initialize a queue storage specific for a message type.
/// </summary>
/// <typeparam name="TMessage">The type of the message</typeparam>
public class QueueStorageInitializer<TMessage> : IStorageInitializer where TMessage : class
{
private readonly CloudStorageAccount account;
private readonly string queueName = GetName();
public QueueStorageInitializer(CloudStorageAccount account)
{
if (account == null)
{
throw new ArgumentNullException(nameof(account));
}
this.account = account;
}
public void Initialize()
{
var queueClient = account.CreateCloudQueueClient();
var queue = queueClient.GetQueueReference(queueName);
queue.CreateIfNotExists();
}
public void Drop()
{
var queueClient = account.CreateCloudQueueClient();
var queue = queueClient.GetQueueReference(queueName);
queue.DeleteIfExists();
}
public static string GetName() => QueueNaming.AsQueueStorageName<TMessage>();
}
/// Initialize a queue storage specific for a message type.
/// </summary>
/// <typeparam name="TMessage">The type of the message</typeparam>
public class QueueStorageInitializer<TMessage> : IStorageInitializer where TMessage : class
{
private readonly CloudStorageAccount account;
private readonly string queueName = GetName();
public QueueStorageInitializer(CloudStorageAccount account)
{
if (account == null)
{
throw new ArgumentNullException(nameof(account));
}
this.account = account;
}
public void Initialize()
{
var queueClient = account.CreateCloudQueueClient();
var queue = queueClient.GetQueueReference(queueName);
queue.CreateIfNotExists();
}
public void Drop()
{
var queueClient = account.CreateCloudQueueClient();
var queue = queueClient.GetQueueReference(queueName);
queue.DeleteIfExists();
}
public static string GetName() => QueueNaming.AsQueueStorageName<TMessage>();
}
new QueueStorageInitializer<MyEntityChangedEvent>(account).Initialize();
y el Enqueuer
public class QueueMessageEnqueuer<TMessage> : IEnqueuer<TMessage> where TMessage : class
{
private readonly CloudStorageAccount account;
private readonly string queueName = QueueNaming.AsQueueStorageName<TMessage>();
public QueueMessageEnqueuer(CloudStorageAccount account)
{
if (account == null)
{
throw new ArgumentNullException(nameof(account));
}
this.account = account;
}
public Task Enqueue(TMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var queueClient = account.CreateCloudQueueClient();
var queueRef = queueClient.GetQueueReference(queueName);
var serializedMessage = JsonConvert.SerializeObject(message);
var qmessage = new CloudQueueMessage(serializedMessage);
return queueRef.AddMessageAsync(qmessage);
}
}
{
private readonly CloudStorageAccount account;
private readonly string queueName = QueueNaming.AsQueueStorageName<TMessage>();
public QueueMessageEnqueuer(CloudStorageAccount account)
{
if (account == null)
{
throw new ArgumentNullException(nameof(account));
}
this.account = account;
}
public Task Enqueue(TMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var queueClient = account.CreateCloudQueueClient();
var queueRef = queueClient.GetQueueReference(queueName);
var serializedMessage = JsonConvert.SerializeObject(message);
var qmessage = new CloudQueueMessage(serializedMessage);
return queueRef.AddMessageAsync(qmessage);
}
}
Implementaciones para Azure ServiceBus
El initializer
/// <summary>
/// Initialize a servicebus queue specific for a message type.
/// </summary>
/// <typeparam name="TMessage">The type of the message</typeparam>
public class ServiceBusQueueInitializer<TMessage> : IStorageInitializer where TMessage : class
{
private readonly string queueName = GetName();
private readonly string serviceBusConnectionString;
public ServiceBusQueueInitializer(string serviceBusConnectionString)
{
if (string.IsNullOrWhiteSpace(serviceBusConnectionString))
{
throw new ArgumentNullException(nameof(serviceBusConnectionString));
}
this.serviceBusConnectionString = serviceBusConnectionString;
}
public void Initialize()
{
var namespaceManager = NamespaceManager.CreateFromConnectionString(serviceBusConnectionString);
if (!namespaceManager.QueueExists(queueName))
{
namespaceManager.CreateQueue(queueName);
}
}
public void Drop()
{
var namespaceManager = NamespaceManager.CreateFromConnectionString(serviceBusConnectionString);
if (namespaceManager.QueueExists(queueName))
{
namespaceManager.DeleteQueue(queueName);
}
}
public static string GetName() => QueueNaming.AsQueueStorageName<TMessage>();
}
/// Initialize a servicebus queue specific for a message type.
/// </summary>
/// <typeparam name="TMessage">The type of the message</typeparam>
public class ServiceBusQueueInitializer<TMessage> : IStorageInitializer where TMessage : class
{
private readonly string queueName = GetName();
private readonly string serviceBusConnectionString;
public ServiceBusQueueInitializer(string serviceBusConnectionString)
{
if (string.IsNullOrWhiteSpace(serviceBusConnectionString))
{
throw new ArgumentNullException(nameof(serviceBusConnectionString));
}
this.serviceBusConnectionString = serviceBusConnectionString;
}
public void Initialize()
{
var namespaceManager = NamespaceManager.CreateFromConnectionString(serviceBusConnectionString);
if (!namespaceManager.QueueExists(queueName))
{
namespaceManager.CreateQueue(queueName);
}
}
public void Drop()
{
var namespaceManager = NamespaceManager.CreateFromConnectionString(serviceBusConnectionString);
if (namespaceManager.QueueExists(queueName))
{
namespaceManager.DeleteQueue(queueName);
}
}
public static string GetName() => QueueNaming.AsQueueStorageName<TMessage>();
}
public class ServiceBusEnqueuer<TMessage> : IEnqueuer<TMessage> where TMessage : class
{
private readonly string queueName = QueueNaming.AsQueueStorageName<TMessage>();
private readonly string serviceBusConnectionString;
public ServiceBusEnqueuer(string serviceBusConnectionString)
{
if (string.IsNullOrWhiteSpace(serviceBusConnectionString))
{
throw new ArgumentNullException(nameof(serviceBusConnectionString));
}
this.serviceBusConnectionString = serviceBusConnectionString;
}
public async Task Enqueue(TMessage message)
{
var client = QueueClient.CreateFromConnectionString(serviceBusConnectionString, queueName);
var serializedObject = SerializeObjectAsString(message);
using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(serializedObject), false))
{
var bmessage = new BrokeredMessage(stream)
{
ContentType = "application/json"
};
await client.SendAsync(bmessage);
}
}
protected virtual string SerializeObjectAsString(TMessage message)
{
return JsonConvert.SerializeObject(message);
}
}
{
private readonly string queueName = QueueNaming.AsQueueStorageName<TMessage>();
private readonly string serviceBusConnectionString;
public ServiceBusEnqueuer(string serviceBusConnectionString)
{
if (string.IsNullOrWhiteSpace(serviceBusConnectionString))
{
throw new ArgumentNullException(nameof(serviceBusConnectionString));
}
this.serviceBusConnectionString = serviceBusConnectionString;
}
public async Task Enqueue(TMessage message)
{
var client = QueueClient.CreateFromConnectionString(serviceBusConnectionString, queueName);
var serializedObject = SerializeObjectAsString(message);
using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(serializedObject), false))
{
var bmessage = new BrokeredMessage(stream)
{
ContentType = "application/json"
};
await client.SendAsync(bmessage);
}
}
protected virtual string SerializeObjectAsString(TMessage message)
{
return JsonConvert.SerializeObject(message);
}
}