Try fast search NHibernate

16 January 2017

Azure queues by convention

El codigo que voy a mostrar hoy viene corriendo, con pocas variaciones a lo largo del tiempo, desde el 2009 cuando Azure era CTP así que somos ya varios que lo tenemos a mano (algo había compartido en el medio del camino).

Las interfaces

Como siempre es buena norma, por lo menos cuando se trata de servicios, empezar por las interfaces; más aún si terminan involucrados en varios proyectos que trabajan con Azure y/o onPremises o son hibridos en fase de metamorfosis (de monolitica oruga come recursos a espledida mariposa que vuela en las nubes).
La primer interfaz es la que uso normalmente para inicializar culquier cosa que tenga a que ver con lo storage (queues, blobs, tables, servicebus etc.):
  public interface IStorageInitializer
  {
    void Initialize();
    void Drop();
  }
Ya que estamos con queues vamos a tener un par de bichos más:
  public interface IEnqueuer<in TMessage> where TMessage : class
  {
    Task Enqueue(TMessage message);
  }

  public interface IDequeuer<out TMessage> where TMessage : class
  {
    TMessage Dequeue(int timeoutMilliseconds);
  }
En realidad en la mayoría de los casos, dificilmente uso el IDequeuer ya que quien usamos el WebJob SDK aprovechamos todo el sistema de polling que viene OOTB (out-of-the-box).

El template de VisualStudio “Azure WebJob” instiga a usar una clase static con una serie de metodos para consumir queues. Personalmente ese enjendro me gusta muy poco aunque entiendo el esfuerzo de hacer parecer siempre “las cosas” muy simples. Yo prefiero definir clases de Consumers de queues que puedo testear, inyectar, registrar y distribuir en varios workers según la evolución de la app lo necesite en su historia. Por ese motivo esta inteface:
  public interface IQueueMessageConsumer<in TMessage> where TMessage : class
  {
    Task ProcessMessage(TMessage message);
  }

 Convention over configuration

En lugar de lidiar con decenas de parametros de configuración, o las notorias “magic strings”, prefiero una simple convención para nombrar las queues.
  public class QueueNaming
  {
    private static readonly string[] queueMessageTypePostfixSequence = { "Message", "Event" };

    public static string AsQueueStorageName<TMessage>() where TMessage: class
    {
      return AsQueueStorageName(typeof(TMessage));
    }

    public static string AsQueueStorageName(Type messageType)
    {
      var typeName = messageType.Name;
      return ConvertToTableName(queueMessageTypePostfixSequence
        .Select(x => TrucateQueueName(x, typeName))
        .FirstOrDefault(x => x != null) ?? typeName);
    }

    private static string TrucateQueueName(string postfix, string typeName)
    {
      return typeName.EndsWith(postfix) ? typeName.Substring(0, typeName.Length - postfix.Length) : null;
    }

    public static string ConvertToTableName(string candidateName)
    {
      if (string.IsNullOrWhiteSpace(candidateName))
      {
        return null;
      }
      /* QueueName 3-63 lowercase alphanumeric and dash */
      return new string(candidateName.Where(x=> char.IsLetterOrDigit(x) || x == '-').Select(char.ToLowerInvariant).ToArray());
    }
  }
En practica uso el nombre de la clase (purgado si necesario) que representa el mensaje para nombrar la queue. Las limitaciones actuales de los nombres para algunos de los servicios de Azure Storage las pueden encontrar resumidas en este post : https://blogs.msdn.microsoft.com/jmstall/2014/06/12/azure-storage-naming-rules/

Implementaciones para Queue Storage

Vamos con el initializer que, en el caso de las queues, uso en el startup de la app
  /// <summary>
  ///   Initialize a queue storage specific for a message type.
  /// </summary>
  /// <typeparam name="TMessage">The type of the message</typeparam>
  public class QueueStorageInitializer<TMessage> : IStorageInitializer where TMessage : class
  {
    private readonly CloudStorageAccount account;
    private readonly string queueName = GetName();

    public QueueStorageInitializer(CloudStorageAccount account)
    {
      if (account == null)
      {
        throw new ArgumentNullException(nameof(account));
      }
      this.account = account;
    }

    public void Initialize()
    {
      var queueClient = account.CreateCloudQueueClient();
      var queue = queueClient.GetQueueReference(queueName);
      queue.CreateIfNotExists();
    }

    public void Drop()
    {
      var queueClient = account.CreateCloudQueueClient();
      var queue = queueClient.GetQueueReference(queueName);
      queue.DeleteIfExists();
    }

    public static string GetName() => QueueNaming.AsQueueStorageName<TMessage>();
  }
que uso de esta forma en la app Web y/o en los workers
      new QueueStorageInitializer<MyEntityChangedEvent>(account).Initialize();
y el Enqueuer
  public class QueueMessageEnqueuer<TMessage> : IEnqueuer<TMessage> where TMessage : class
  {
    private readonly CloudStorageAccount account;
    private readonly string queueName = QueueNaming.AsQueueStorageName<TMessage>();

    public QueueMessageEnqueuer(CloudStorageAccount account)
    {
      if (account == null)
      {
        throw new ArgumentNullException(nameof(account));
      }
      this.account = account;
    }

    public Task Enqueue(TMessage message)
    {
      if (message == null)
      {
        throw new ArgumentNullException(nameof(message));
      }
      var queueClient = account.CreateCloudQueueClient();
      var queueRef = queueClient.GetQueueReference(queueName);
      var serializedMessage = JsonConvert.SerializeObject(message);
      var qmessage = new CloudQueueMessage(serializedMessage);
      return queueRef.AddMessageAsync(qmessage);
    }
  }

 Implementaciones para Azure ServiceBus

El initializer
  /// <summary>
  ///   Initialize a servicebus queue specific for a message type.
  /// </summary>
  /// <typeparam name="TMessage">The type of the message</typeparam>
  public class ServiceBusQueueInitializer<TMessage> : IStorageInitializer where TMessage : class
  {
    private readonly string queueName = GetName();
    private readonly string serviceBusConnectionString;

    public ServiceBusQueueInitializer(string serviceBusConnectionString)
    {
      if (string.IsNullOrWhiteSpace(serviceBusConnectionString))
      {
        throw new ArgumentNullException(nameof(serviceBusConnectionString));
      }
      this.serviceBusConnectionString = serviceBusConnectionString;
    }

    public void Initialize()
    {
      var namespaceManager = NamespaceManager.CreateFromConnectionString(serviceBusConnectionString);
      if (!namespaceManager.QueueExists(queueName))
      {
        namespaceManager.CreateQueue(queueName);
      }
    }

    public void Drop()
    {
      var namespaceManager = NamespaceManager.CreateFromConnectionString(serviceBusConnectionString);
      if (namespaceManager.QueueExists(queueName))
      {
        namespaceManager.DeleteQueue(queueName);
      }
    }

    public static string GetName() => QueueNaming.AsQueueStorageName<TMessage>();
  }
y el Enqueuer
  public class ServiceBusEnqueuer<TMessage> : IEnqueuer<TMessage> where TMessage : class
  {
    private readonly string queueName = QueueNaming.AsQueueStorageName<TMessage>();
    private readonly string serviceBusConnectionString;

    public ServiceBusEnqueuer(string serviceBusConnectionString)
    {
      if (string.IsNullOrWhiteSpace(serviceBusConnectionString))
      {
        throw new ArgumentNullException(nameof(serviceBusConnectionString));
      }
      this.serviceBusConnectionString = serviceBusConnectionString;
    }

    public async Task Enqueue(TMessage message)
    {
      var client = QueueClient.CreateFromConnectionString(serviceBusConnectionString, queueName);
      var serializedObject = SerializeObjectAsString(message);
      using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(serializedObject), false))
      {
        var bmessage = new BrokeredMessage(stream)
        {
          ContentType = "application/json"
        };
        await client.SendAsync(bmessage);
      }
    }

    protected virtual string SerializeObjectAsString(TMessage message)
    {
      return JsonConvert.SerializeObject(message);
    }
  }

 May the Force be with me

Si este año no caigo en lado oscuro de la fuerza voy a seguir agregando codigo a este proyecto (GIT) con otras utilidades y, tal vez, ejemplos para Azure.

10 January 2017

Azure IoT Hub a puro REST

El año pasado, en una reunión con el colorado el ruso y el pibe, me pidieron un ejemplo de uso de Azure IoT Hub sin usar el SDK. Yo pensaba que con un poco de GoogleFu algo iba a encontrar pero… nada.

En lugar de seguir buscando codigo y ya que todo, o casi, en Azure tiene API REST empecé a leer la documentación de la API y a codear.

Si quieren probar el codigo de este post, a parte un account de Azure, necesitan crear un IoTHub y tener a mano tres parametros:
1) el host del IoTHub que creaste
HostName
2) El nombre de la policy. En este caso, aunque ya tienen policies definida por default, le conviene crear una policy nueva.
PolicyName
3) la key de la policy. Como key pueden usar la primary o la secondary; en muchos servicios en Azure siempre tienen dos keys para cambiar/regenerar la key sin sufrir downtime.
SAS

Ya tenemos los ingredientes arriba la mesada y podemos empezar a prepararlos para la cocción.

Ya hace un tiempito que en Azure muchos servicios gozan de la fantastica SAS (Shared Access Signature) que, basicamente, nos permite compartir recursos y/o servicios sin por eso tener que hacer “viajar” las keys o utilizar procesos de auth que pueden ser más complejos como OAuth2. La SAS viaja plácidamente en el query string o en los headers de un request REST. En este caso usaremos la SAS para evitar el obscuro MitM (man in the middle) creando una SAS, usable por un tiempo relativamente corto (TTL time to live), para enviar mensajes al Azure IoT Hub. Una SAS para, condimentar un request al IoT Hub, se prepara de esta forma:

    private static readonly DateTime epochTime = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);

    public static string SharedAccessSignature(string hostUrl, string policyName, string policyAccessKey, TimeSpan timeToLive)
    {
      if (string.IsNullOrWhiteSpace(hostUrl))
      {
        throw new ArgumentNullException(nameof(hostUrl));
      }

      var expires = Convert.ToInt64(DateTime.UtcNow.Add(timeToLive).Subtract(epochTime).TotalSeconds).ToString(CultureInfo.InvariantCulture);
      var resourceUri = WebUtility.UrlEncode(hostUrl.ToLowerInvariant());
      var toSign = string.Concat(resourceUri, "\n", expires);
      var signed = Sign(toSign, policyAccessKey);

      var sb = new StringBuilder();
      sb.Append("sr=").Append(resourceUri)
        .Append("&sig=").Append(WebUtility.UrlEncode(signed))
        .Append("&se=").Append(expires);
      if (!string.IsNullOrEmpty(policyName))
      {
        sb.Append("&skn=").Append(WebUtility.UrlEncode(policyName));
      }
      return sb.ToString();
    }

    private static string Sign(string requestString, string key)
    {
      using (var hmacshA256 = new HMACSHA256(Convert.FromBase64String(key)))
      {
        var hash = hmacshA256.ComputeHash(Encoding.UTF8.GetBytes(requestString));
        return Convert.ToBase64String(hash);
      }
    }
Ingredientes y condimento preparados podemos empezar cocinando la entrada o sea el check de existencia y la registración de un device en el IoT Hub… el device se puede registrar en el IoT Hub a mano (desde el portal de Azure) pero va a resultar medio engorroso si planean agregar una estación de monitoreo sin mucha cerimonia.
    public static async Task CreateIfNotExists(HttpClient httpClient, string deviceId)
    {
      if (await Exists(httpClient, deviceId))
      {
        return;
      }
      var jsonMessage =
        $"{{\"deviceId\": \"{deviceId}\", \"status\": \"enabled\", \"statusReason\": \"Listo para enviar info\"}}";

      var request = new HttpRequestMessage(HttpMethod.Put, $"devices/{deviceId}?api-version=2016-02-03")
      {
        Content = new StringContent(jsonMessage, Encoding.ASCII, "application/json")
      };
      var response = await httpClient.SendAsync(request);
      if (!response.IsSuccessStatusCode)
      {
        throw new IOException("No fue posible registrar la estación de medición.");
      }
    }

    public static async Task<bool> Exists(HttpClient httpClient, string deviceId)
    {
      var request = new HttpRequestMessage(HttpMethod.Get, $"devices/{deviceId}?api-version=2016-02-03");
      request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
      var response = await httpClient.SendAsync(request);
      return response.IsSuccessStatusCode;
    }
Como notarán, hasta ahora, son todo metodos de una pequeña utility (todo los metodos son static) que llamé, con muchisima fantasia: DeviceRest . Otra peculiaridad de estos métodos es el hecho que reciben un HttpClient ; el motivo será más claro cocinando el plato principal.
  public class MeditionInfoSender
  {
    private readonly string stationId;
    private HttpClient currentHttpClient;
    private readonly string iotHubHost;
    private readonly string iotHubPolicyName;
    private readonly string iotHubPolicyKey;

    public MeditionInfoSender(string iotHubHost, string iotHubPolicyName, string iotHubPolicyKey, string stationId)
    {
      if (string.IsNullOrWhiteSpace(iotHubHost))
      {
        throw new ArgumentNullException(nameof(iotHubHost));
      }
      if (string.IsNullOrWhiteSpace(iotHubPolicyName))
      {
        throw new ArgumentNullException(nameof(iotHubPolicyName));
      }
      if (string.IsNullOrWhiteSpace(iotHubPolicyKey))
      {
        throw new ArgumentNullException(nameof(iotHubPolicyKey));
      }
      if (string.IsNullOrWhiteSpace(stationId))
      {
        throw new ArgumentNullException(nameof(stationId));
      }
      this.stationId = stationId;
      this.iotHubHost = iotHubHost;
      this.iotHubPolicyName = iotHubPolicyName;
      this.iotHubPolicyKey = iotHubPolicyKey;
    }

    public void InitializeStation()
    {
      using (var httpClient = new HttpClient())
      {
        httpClient.BaseAddress = new UriBuilder {Scheme = "https", Host = iotHubHost}.Uri;
        var hubSharedAccessSignature = DeviceRest.SharedAccessSignature(iotHubHost
          , iotHubPolicyName
          , iotHubPolicyKey
          , TimeSpan.FromMinutes(1));
        httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("SharedAccessSignature", hubSharedAccessSignature);
        DeviceRest.CreateIfNotExists(httpClient, stationId).Wait();
      }
      currentHttpClient = CreateHttpClient();
    }

    public async Task<bool> Send(string jsonMessage)
    {
      if (string.IsNullOrWhiteSpace(jsonMessage))
      {
        return false;
      }
      var request = new HttpRequestMessage(HttpMethod.Post, $"devices/{stationId}/messages/events?api-version=2016-02-03")
      {
        Content = new StringContent(jsonMessage, Encoding.ASCII, "application/json")
      };
      try
      {
        var httpClient = GetHttpClient();
        var response = await httpClient.SendAsync(request);
        return response.IsSuccessStatusCode;
      }
      catch (Exception)
      {
        return false;
      }
    }

    private HttpClient GetHttpClient()=> currentHttpClient ?? (currentHttpClient= CreateHttpClient());

    private HttpClient CreateHttpClient()
    {
      var httpClient = new HttpClient(new SharedAccessSignatureAuthHandler(iotHubHost, iotHubPolicyName, iotHubPolicyKey))
      {
        BaseAddress = new UriBuilder {Scheme = "https", Host = iotHubHost}.Uri,
      };
      return httpClient;
    }

    private class SharedAccessSignatureAuthHandler : HttpClientHandler
    {
      private AuthenticationHeaderValue currentSas;
      private readonly TimeSpan maxSasTtl = TimeSpan.FromMinutes(23);
      private readonly Stopwatch timer = new Stopwatch();
      private readonly string iotHubHost;
      private readonly string iotHubPolicyName;
      private readonly string iotHubPolicyKey;

      public SharedAccessSignatureAuthHandler(string iotHubHost, string iotHubPolicyName, string iotHubPolicyKey)
      {
        if (string.IsNullOrWhiteSpace(iotHubHost))
        {
          throw new ArgumentNullException(nameof(iotHubHost));
        }
        if (string.IsNullOrWhiteSpace(iotHubPolicyName))
        {
          throw new ArgumentNullException(nameof(iotHubPolicyName));
        }
        if (string.IsNullOrWhiteSpace(iotHubPolicyKey))
        {
          throw new ArgumentNullException(nameof(iotHubPolicyKey));
        }
        this.iotHubHost = iotHubHost;
        this.iotHubPolicyName = iotHubPolicyName;
        this.iotHubPolicyKey = iotHubPolicyKey;
      }

      protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
      {
        if (timer.Elapsed >= maxSasTtl || currentSas == null)
        {
          SetCurrentAuth();
        }
        request.Headers.Authorization = currentSas;
        return base.SendAsync(request, cancellationToken);
      }

      private void SetCurrentAuth()
      {
        timer.Reset();
        var hubSharedAccessSignature = DeviceRest.SharedAccessSignature(iotHubHost
          , iotHubPolicyName
          , iotHubPolicyKey
          , maxSasTtl.Add(TimeSpan.FromMinutes(3)));
        currentSas = new AuthenticationHeaderValue("SharedAccessSignature", hubSharedAccessSignature);
        timer.Start();
      }
    }
  }
Esta clase se ocupa de toda (lo que necesité) la comunicación con el IoT Hub usando su API REST. La parte más compleja (bueno… ponele…) es la gestión de la instancia de HttpClient que se usa a cada request.En el pipeline del HttpClient se usa una instancia de HttpClientHandler encargada de generar una SAS, con un TTL de 3 minutos, y adornar el request agregando el Authorization Header (el handler es la clase SharedAccessSignatureAuthHandler ).

Ya está el plato cocinado falta solo ver como se come…

Considerando que ustedes sabrán como obtener el stationId (que para el IoT Hub es el Device-ID), el primer bocón sería mas o menos así:
      sender = new MeditionInfoSender(iotHubHost, iotHubPolicyName, iotHubPolicyKey, stationId);
      Console.WriteLine($"Inicializando estación de medición '{stationId}'...");
      sender.InitializeStation();
      Console.WriteLine($"Estación '{stationId}' inicializada.");
Considerando que ustedes sabrán como obtener/construir el iotJsonMessage , el envío de un mensaje al IoT Hub (todos los otros bocones) se reduce a lo siguiente:
        var sent = await sender.Send(iotJsonMessage);

Esta receta es para la versión ligth/zero que pueden consumir quienes están a dieta o le resulta pesado el SDK o simplemente se divierte cocinando comida casera (pueden usarlo para traducirlo en el lenguaje de su device). Los que pueden ir al fast-food sería mejor que usen el SDK que le corresponda; le paso la que ya es solo la landing page del SDK (en el texto de la pagina encontrarán en link al repo de cada lenguaje): https://github.com/Azure/azure-iot-sdks

Como postre: el codigo de este post, as is, se usó en un par de proyectos .NET Core para los siguientes runtimes:
  "runtimes": {
    "win10-x64": {},
    "ubuntu.14.04-x64": {},
    "debian.8-x64": {} 
  }
o sea que funcionó en windows, ubuntu y docker (meterlo en un RaspberryPi, con UWP, no cuesta mucho)… y si! .NET, al fin, es multiplataforma y OSS.