Azure Event Hubs – Partitions und Retention Time

 

Competing Consumer vs. Partitioned Consumer

Betrachtet man rein den Funktionsumfang den Azure Service Bus Event Hubs und Azure Service Bus Topics/Subscription zur Verfügung stellen, erscheint im ersten Augenblick kein großer Unterschied. Beide Dienste können über verschiedene Protokolle (http(s), AMQP) Daten bzw. Telemetrie von mehreren Sendern entgegennehmen und an multiple Empfänger zur weiteren Bearbeitung weiterleiten.

Einer der großen Unterschiede findet sich in der Möglichkeit Daten mit geringer Latenz und hoher Zuverlässigkeit entgegenzunehmen und dabei zusätzlich eine hohe Bandbreite (aktuell bis zu 1 GB (!!) pro Sekunde) Daten an einen Event Hub zu senden. Dies wird unter anderem dadurch erreicht, dass ein Event Hub ein “Partitioned Consumer” Modell verfolgt. Topic/Subscriptions arbeiten dagegen nach einem “Competing Consumer” Modell. Vereinfacht ausgedrückt verwendet ein EventHub einen “client-seitigen” Cursor. D. h. wenn ein Client von Event Hubs Daten anfordert, so muss dieser dem Event Hub die Stelle bzw. ein Offset mitteilen, ab dem er Daten empfangen möchte. Im Gegensatz dazu verwendet Topic/Subscriptions einen “server-seitigen” Cursor bei dem der Client keine weiteren Informationen welche Daten er abfordern möchte senden muss.

IEventProcessor / Cursorverwaltung

Um die Verwaltung des “client-seitigen” Cursors so einfach wie möglich zu gestalten, stellt Microsoft ein Nuget Package zur Verfügung, welches in der Package Manager Console dem Projekt hinzugefügt werden kann

PM> install-package Microsoft.Azure.ServiceBus.EventProcessorHost

Nach Implementierung des Interface IEventProcessor in einer Klasse müssen die Funktionen CloseAsync(), OpenAsync() und ProcessEventsAsync() implementiert werden. Besonderes Augenmerk liegt hierbei auf OpenAsync(). Die Library startet für jede Partition die bei der Anlage des EventHub definiert wurde eine eigene Instanz der Klasse. Die Referenz auf die jeweilige Partition wird dabei im Parameter PartitionContext übergeben. Dieser PartitionContext kann nun wiederum verwendet werden um die Informationen für den “client-seitigen” Cursor zu speichern. Mit Aufruf der Methode CheckpointAsync() wird die letzte verarbeitete Nachricht gespeichert und bei einem evtl. Neustart der Applikation nicht wieder vom Event Hub abgerufen. 

image

Intern speichert die Library die Informationen über den “client-seitigen” Cursor in einer JSON formatierten Datei  pro Partition in einem Azure Storage Account ab.

{
    „PartitionId“:“0″,
    „Owner“:“MSTechDemoWorker“,
    „Token“:“7886f96a-990c-4049-93a3-432dc159a4fa“,
    „Epoch“:1,
    „Offset“:““,
    „SequenceNumber“:0
}

Retention Time

Bei der Anlage eines Event Hub muss neben der Anzahl der gewünschten Partitionen ebenfalls eine sog. Retention Time (in Tagen) angegeben werden. Der minimale Wert beträgt hierbei ein Tag und der maximale Wert aktuell 7 Tage. Die Angabe der Retention Time legt fest, wie lange der jeweilige Event Hub die eingehenden Daten, für die weitere Verarbeitung durch Konsumenten speichert.

Häufig wird jedoch z. B. bei einer Retention Time von einem Tag davon ausgegangen, dass alle eingegangenen Daten auch automatisch nach Ablauf des einen Tages wieder aus dem Event Hub gelöscht werden. Dies ist nicht der Fall! Die Angabe der Retention Time ist dahingehend zu verstehen, dass die eingegangenen Daten mindestens für diesen Zeitraum zur Verfügung stehen. Gerade während der Entwicklung von Event Hub Consumern muss man häufig die “client-seitigen” Cursor Informationen löschen um wieder mit den “letzten” Test Daten arbeiten zu können. Sehr häufig stellt man dabei fest, dass der Event Hub auch ältere Daten (z. B. Daten welche vor mehreren Tagen an den Event Hub gesendet wurden) zurück meldet, auch wenn man diese aufgrund der Retention Time (von z. B. einem Tag) nicht mehr erwartet.

Um zu verstehen, warum diese Daten trotzdem noch zur Verfügung stehen muss man sich die interne Arbeitsweise des Event Hub näher betrachten. Um die hohe Skalierungsrate zu erreichen, speichert der Event Hub eingehende Daten in Blöcken von definierter Größe pro Partition. Diese Blöcke werden nun wiederum nur gelöscht, wenn diese eine gewisse Größe und die Retention Time überschritten haben. Dadurch erklärt sich auch, warum gerade in der Entwicklung häufig Daten aus einem EventHub zurück geliefert werden, welche die Retention Time bereits überschritten haben. 

Möchte man dieses Verhalten umgehen, ist es möglich, beim Start des EventHubProcessor eine Lambda Funktion für den InitialOffsetProvider zu definieren. Diese Lambda wird wiederum für jede Partition aufgerufen.

image

Ein einfaches Beispiel mit Code Snippets findet sich auf meinem GitHub Account.

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind markiert *