Eventverarbeitung mit Riemann
Riemann ist ein Stream-Processing-System, das sich hervorragend zum Sammeln und Verarbeiten von Events und Logs von Servern und Systemen eignet. Wir verwenden Riemann erfolgreich produktiv in sehr großen IT-Systemen als Kernstück für die Log- und Metrikverarbeitung und für das Monitoring.
Dabei nutzen wir Riemann zur Aufbereitung von Events und zum Weiterleiten an Langzeitspeichersysteme wie Elasticsearch, in denen diese Events wiederum durch Benutzeroberflächen wie Kibana komfortabel durchforstet werden können. Außerdem nutzen wir Riemann, um Metriken in Zeitreihendatenbanken wie InfluxDB zu schreiben; diese Zeitreihen können dann durch Benutzeroberflächen wie Grafana visualisiert werden oder können auch benutzt werden, um bei Fehlern und Problemen zu alarmieren.
Wir zeigen heute an einem Anwendungsbeispiel, wie man aus Logs mit Hilfe von Riemann Metriken extrahieren kann.
Riemann
Riemann selbst ist in Clojure geschrieben und kann auch in Clojure konfiguriert und programmiert werden. Tatsächlich ist die Konfiguration einer Riemann-Instanz ein Clojure-Programm, das mit Hilfe einer mächtigen und effizienten Stream-Processing-Sprache die Eventverarbeitung steuert.
Riemann ermöglicht es, Events durch Streams zu schicken, die diese Events filtern, verändern, kombinieren, aggregieren und projizieren können. Es gibt dutzende eingebaute Streams und es ist sehr einfach, eigene Streams zu schreiben, da ein Stream lediglich eine Funktion ist, die ein Event akzeptiert.
Events
Events sind in Riemann als Maps repräsentiert, also als eine Menge von Schlüssel-Wert-Paaren, wie zum Beispiel dieses Event, das eine Logzeile einer Anfrage an einen Webserver repräsentiert:
(def request-event
{:timestamp 1663577804126
:method "GET"
:request "/index.html"
:requestor "192.168.1.23"
:transaction "uid-82a9dda829"
:service "webserver-request"
:host "192.168.1.1"
:time 1})
Wir binden das Event an den Namen request-event
, um es im Folgenden für unsere
Tests verwenden zu können. Riemann definiert einige besondere Felder wie zum
Beispiel :host
und :service
, die in allen Events vorkommen; ansonsten gibt
es keine Einschränkung, welche Felder und Werte ein Event enthalten kann.
Streams
Wie bereits erwähnt, ist ein Stream in Riemann eine Funktion, die ein Event akzeptiert. In Riemann bedeutet „Stream“ also nicht eine Sequenz von Datenelementen, was man unter diesen Begriff üblicherweise in den meisten Kontexten versteht. Vielleicht wäre „Event-Prozessor“ ein besserer Name für das, was ein Stream in Riemann tatsächlich ist: Ein Callback, der Events akzeptiert und diese Events wiederum an andere Event-Prozessor-Callbacks weitergibt (und keinen Rückgabewert hat). Wir bleiben im Folgenden aber beim Begriff Stream, da dies die gebräuchliche Bezeichnung in Riemann ist.
Ein einfacher Stream ist zum Beispiel ein where
-Stream, der in Riemann
eingebaut ist:
(where <condition> <stream>)
Ein where
-Stream ist selbst ein Event-Prozessor, der ein Event annimmt,
überprüft ob die angegebene Bedingung stimmt und dann den Event-Prozessor
<stream>
mit dem Event aufruft.
Damit bauen wir einen Stream, der Logevents von Metriken trennt: Logevents sollen zur Langzeitspeicherung an einen Elasticsearch-Server und Metriken an eine InfluxDB-Zeitreihendatenbank weitergeleitet werden.
(streams
(where
(metric nil)
elasticsearch-stream
(else
influxdb-stream)))
Der where
-Stream überprüft, ob durchfließende Events auf das Prädikat (metric
nil)
passen. Das Prädikat (metric nil)
bedeutet, dass ein Event kein Feld
:metric
enthält, was in unserem Beispiel wiederum besagt, dass ein Event keine
Metrik ist (die Event-Taxonomie von Riemann nimmt an, dass sich Metriken durch
die Existenz des Felds :metric
von anderen Eventtypen unterscheiden). In
diesem Fall fließt das Event an einen elasticsearch-stream
weiter, der es
schließlich an den Elasticsearch-Server schickt. Im else
-Zweig finden sich
nur noch Metriken, die Riemann dann über den influxdb-stream
an die InfluxDB
weiterleitet.
Die Definitionen von elasticsearch-stream
und influxdb-stream
sehen so aus:
(def elasticsearch-stream
(tap :elasticsearch-stream elasticsearch))
(def influxdb-stream
(tap :influxdb-stream influxdb))
Das tap
in jeder dieser Definitionen ist ein Konstrukt, um Events für
Testfälle abzugreifen. Davon werden wir gleich Gebrauch machen.
Die tatsächlichen Definitionen von elasticsearch
und influxdb
würden diesen
Blogartikel sprengen, daher verlassen wir uns einfach auf Wunschdenken und gehen
davon aus, dass sie das machen, was sie sollen: die ankommenden Events nach
jeweils Elasticsearch beziehungsweise InfluxDB weiterleiten. Für besonders
Interessierte sei gesagt, dass wir für diese Streams die Implementierung aus
unserer Riemann-Bibliothek
active-riemann
benutzen.
Tests
Unit-Tests sind unverzichtbar; auch unsere Stream-Definition müssen wir testen.
Riemann bringt Unterstützung für Unit-Tests mit, wir können Tests mit riemann
test
auf der Kommandozeile laufen lassen. Die Tests müssen wir aber erstmal
schreiben. Der erste Test stellt sicher, dass Events, die keine Metriken sind,
nur im Elasticsearch landen:
(deftest event-test
(let [actual (inject! [request-event])]
(is (= [request-event] (:elasticsearch-stream actual)))
(is (empty? (:influxdb-stream actual)))))
Die Funktion inject!
gibt es nur, um die Streams im Rahmen von Unit-Tests zu
füttern. Sie schickt eine Liste von Events in die definierten Streams. Hier
ist in der Liste nur ein Event, das weiter oben definierte request-event
. Der
Rückgabewert von inject!
enthält alle Events, die an den Stellen, die wir mit
tap
markiert haben, vorbeigekommen sind. Da wir unseren Stream nach
Elasticsearch mit :elasticsearch-stream
belauschen, erwarten wir, dass wir
genau dieses eine Event dort finden, weil unsere Streamdefinition es dort
vorbeigeleitet hat. Und da wir keine Metrik injiziert haben, erwarten wir, dass
an :influxdb-stream
keine Events vorbeigekommen sind, daher muss diese Liste
leer sein, also auf das Prädikat empty?
passen.
Für den nächsten Test wollen wir eine Metrik injizieren, also erstellen wir eine
Beispielmetrik. Dafür bauen wir uns zuerst eine Hilfsfunktion, in der wir das
Aussehen unserer Metrik abstrahieren. Die Metrik hat ein Feld :label
für den
Bezeichner und ein Feld :metric
für den metrischen Wert, den wir der
Hilfsfunktion übergeben können:
(defn make-webserver-transaction-metric
[duration]
{:label "webserver-transaction-duration-milliseconds"
:metric duration})
Dann erzeugen wir unsere Beispielmetrik mit dem Wert 116:
(def example-metric (make-webserver-transaction-metric 116))
Für den nächsten Testfall injizieren wir diese Beispielmetrik:
(deftest metric-test
(let [actual (inject! [example-metric])]
(is (empty? (:elasticsearch-stream actual)))
(is (= [example-metric] (:influxdb-stream actual)))))
Hier erwarten wir, dass an :elasticsearch-stream
kein Event vorbeigekommen
ist, dafür aber die Beispielmetrik bei :influxdb-stream
auftaucht. Alle Tests
laufen durch.
Webserver-Logs
Für unser Anwendungsbeispiel nehmen wir an, dass auf einem ausgedachten Webserver Logs anfallen. Die Anfragen schaffen es in dem oben beschriebenen Format in unser Riemann-System.1 Außer den Anfragen loggt der Webserver auch noch die Auslieferung der Antwort. Ein Event dafür sieht so aus:
(def reply-event
{:timestamp 1663577804242
:transaction "uid-82a9dda829"
:service "webserver-reply"
:host "192.168.1.1"})
Der Webserver verknüpft Anfrage und Antwort mit einer eindeutigen
Transaktions-ID im Feld :transaction
.
Transaktionszeit bestimmen
Für unseren Anwendungsfall interessiert uns, wie lange der Webserver braucht, um
eine Anfrage zu beantworten. Diese Metrik wollen wir messen und festhalten, um
die Performance des Webservers überwachen zu können. Das bedeutet, dass wir die
verstrichene Zeit zwischen den Events der Anfrage und der Antwort einer
Transaktion ermitteln müssen und zur Aufbewahrung, späteren Visualisierung oder
gegebenenfalls auch für Alarme in unsere InfluxDB schreiben. Wenn wir die
Zeitstempel aus den :timestamp
-Feldern voneinander abziehen, erhalten wir die
Transaktionszeit in Millisekunden. Für unser Beispiel ergibt sich eine
Transaktionszeit von 116 Millisekunden.
Entscheidend dabei ist, dass wir nur den Abstand der zusammengehörenden Logeinträge berechnen, also die Logeinträge finden müssen, die dieselbe Transaktions-ID haben. Konkret bedeutet das, dass wir uns die Anfragen so lange merken müssen, bis wir die passende Antwort sehen, sprich die Antwort mit derselben Transaktions-ID wie die Anfrage. Sich etwas zu merken bedeutet Zustand. Riemann bringt in Form eines Index Unterstützung für das Speichern von Zuständen mit.
Index
Der Index ist ein in Riemann eingebauter Speicher für Events. Riemann speichert
für jedes Tupel aus :host
und :service
das jeweils letzte gesehene Event.
Diese Events können wir vom Index abfragen, außerdem haben die Events im Index
ein Verfallsdatum: Das Feld :ttl
legt die „time-to-live“ für ein Event im
Index in Sekunden fest. Und wir können festlegen, in welchen Abständen
gestorbene Events gelöscht werden. Riemann löscht diese Events nicht nur,
sondern informiert auch noch über diese abgelaufenen Events – diese Information
können wir für unser Vorhaben, die Transaktionszeit zu bestimmen, auch sinnvoll
nutzen.
Wir initialisieren unseren Index wie folgt:
(def default-ttl 60)
(def index
(do
(periodically-expire (/ default-ttl 10))
(index)))
(def index-stream
(sdo
(tap :index identity)
(default :ttl default-ttl index)))
Zunächst definieren wir unsere Standard-Verfallszeit default-ttl
auf 60
Sekunden; einen neuen Index binden wir an den Namen index
und definieren mit
periodically-expire
, dass Riemann alle sechs Sekunden nach abgelaufenen Events
schauen soll. Dann erzeugen wir mit dem Aufruf der Riemann-Funktion (index)
einen neuen Index. Den neuen Index verpacken wir in einen Stream mit dem Namen
index-stream
. Genaugenommen ist unser index-stream
eine sdo
-Komposition
von zwei Streams: sdo
ist do
für Streams, also fließen Events in alle
Streams im Rumpf. Hier durchläuft ein ankommendes Event die Streams (tap
:index identity)
und (default :ttl default-ttl index)
:
-
Der Stream
(tap :index identity)
ist nur dazu da, um unseren Index-Stream für unsere Unit-Tests belauschen zu können. -
Der Stream
(default :ttl default-ttl index)
versieht jedes Event, das noch kein Feld:ttl
enthält, mit einem Standardwert für das Feld:ttl
, und zwar mit dem Wert unserer definierten Standard-Lebenszeit. Anschließend werden diese Events dann an den Index weitergereicht, um sie dort abzuspeichern.
In den Index schreiben
Wir brauchen einen Stream, der ein Anfrage-Event in den Index schreibt. Wir
nennen den Stream store-requests-stream
:
(def store-requests-stream
(smap (fn [request]
(clojure.set/rename-keys request {:transaction :service}))
index-stream))
Der Stream benutzt einen smap
-Stream, um die angegebene Funktion auf jedes
Event – sprich auf jede Anfrage – anzuwenden und das Ergebnis dann in den
index-stream
weiterzuschicken. Die Funktion, die smap
anwendet, baut eine
Anfrage so um, dass der Index sie auch richtig speichern kann: Wie oben erwähnt,
speichert der Index ein Event pro Tupel aus :host
und :service
. Damit wir
wirklich alle Events speichern können, benennen wir in der Anfrage einfach das
Feld :transaction
in :service
um und haben so die eindeutige Transaktions-ID
mit als Teil des Schlüssels im Index. Der ursprüngliche Wert von :service
interessiert uns nicht weiter. Ein indiziertes Event sieht zum Beispiel so aus:
(def request-event
{:timestamp 1663577804126
:method "GET"
:request "/index.html"
:requestor "192.168.1.23"
:service "uid-82a9dda829"
:host "192.168.1.1"
:time 1})
Aus dem Index lesen
Kommen Antwort-Events an, müssen wir zugehörige Anfragen aus dem Index lesen.
Diesen Stream nennen wir lookup-requests-and-calculate-metric-stream
; der
Stream erwartet Antwort-Events:
(def lookup-requests-and-calculate-metric-stream
(smap (fn [reply]
(when-let [request (riemann.index/lookup index
(:host reply)
(:transaction reply))]
(riemann.index/delete index request)
(make-webserver-transaction-metric (- (:timestamp reply)
(:timestamp request)))))
reinject))
Dieser Stream benutzt ebenfalls einen smap
-Stream, um eine Funktion auf jedes
Antwort-Event aufzurufen und dann mittels des reinject
-Streams das Ergebnis
davon wieder in unsere Streams zu reinjizieren, also wieder vorne in unsere
Streams hineinzuschieben.
Im Rumpf dieser Funktion passiert einiges: Eine möglicherweise gespeicherte
Anfrage versucht riemann.index/lookup
anhand der Felder :host
und
:transaction
der Antwort zu finden. Wenn das glückt, entfernt die Funktion
das gefundene Event aus dem Index, damit es später nicht ablaufen kann – es ist
ja schließlich schon verarbeitet. Das Ergebnis dieser Funktion ist eine Metrik,
die wir mit make-webserver-transaction-metric
erzeugen und die als Wert die
Differenz der Zeitstempel zwischen Anfrage und Antwort berechnet. Ist eine
passende Anfrage nicht im Index, zum Beispiel weil wir unser Programm gestartet
haben, nachdem diese Anfrage gekommen wäre, gibt die Funktion nil
zurück. Das
hat dann den beabsichtigten Effekt, dass dieses nil
eben nicht reinjiziert
wird, da smap
den Wert nil
nicht an die Streams weitergibt.
Den Index sauberhalten
Damit sich keine Anfragen ansammeln, zu denen der Webserver – aus welchen
Gründen auch immer – nie eine Antwort rausgeschickt hat, haben wir jede Anfrage
im Index mit einer maximalen Lebenszeit versehen. Wenn Riemann solche
abgelaufenen Anfragen aus dem Index entfernt, reinjiziert es eine Information
darüber in die Streams, in Form der Anfrage, aber zusätzlich mit einem Feld
:state
mit dem Wert expired
markiert. Mit dem Prädikat (state "expired")
kann Riemann diese Events filtern – und wir können diese Information nutzen.
Im Falle eines solchen Timeouts wollen wir eine Metrik ausgeben, deren Wert sehr
groß sein soll, sprich also Double/MAX_VALUE
in unserem Zahlenraum. Die
Metrik nennen wir timeout-metric
:
(def timeout-metric
(make-webserver-transaction-metric Double/MAX_VALUE))
Den Stream dafür nennen wir emit-timeout-metric-stream
:
(def emit-timeout-metric-stream
(smap (constantly timeout-metric) reinject))
Der smap
-Stream reinjiziert für jeden eingehenden Wert, unabhängig von der Art
des Werts, eine Metrik mit Double/MAX_VALUE
in die Streams.
Nicht die Ströme kreuzen
Damit haben wir alle Bestandteile zusammen; nun können wir unseren ursprünglichen Stream um ein paar Weichen ergänzen, um auf unsere gewünschte Funktionalität zu kommen:
(streams
(where
(metric nil)
(sdo
(where (not (state "expired")) elasticsearch-stream)
(split
(service "webserver-request") store-requests-stream
(service "webserver-reply") lookup-replies-and-calculate-metric-stream
(state "expired") emit-timeout-metric-stream))
(else
influxdb-stream)))
Der Fall für Metriken bleibt unverändert. Der Fall von Events, die keine Metrik
sind, wird komplizierter: Das sdo
markiert, dass die Events an mehrere Streams
weitergegeben werden sollen. Zum einen an den Stream, der ans Elasticsearch
weiterschickt, allerdings wollen wir die Informationen über die abgelaufenen
Anfragen nicht dort sehen, daher filtern wir die mit (not (state "expired"))
aus. Ansonsten sollen alle Anfragen und alle Antworten im Elasticsearch landen.
Der nächste Stream implementiert die Berechnung der Transaktions-Metriken. Wir
spalten in einem split
-Stream den Eventstrom in Anfragen mit dem Prädikat
(service "webserver-request")
, in Antworten mit dem Prädikat (service
"webserver-reply")
und in abgelaufene Events mit dem Prädikat (state
"expired")
auf:
-
Die Anfragen landen über den oben definierten
store-requests-stream
im Index. -
Die Antworten führen via
lookup-replies-and-calculate-metric-stream
und die zuvor im Index gespeicherten Anfragen zu Metriken. -
Die abgelaufenen Anfragen führen zu Metriken, die den Timeout markieren.
„Wieso nicht?“ – „Das wäre schlecht!“
Zwei Unit-Tests sollen unsere Implementierung absichern. Der Erste testet eine komplette Transaktion bestehend aus Anfrage und Antwort:
(deftest transaction-test
(let [actual (inject! [request-event reply-event])]
(is (= [request-event reply-event]
(:elasticsearch-stream actual)))
(is (= [indexed-event]
(:index actual)))
(is (= [example-metric]
(:influxdb-stream actual)))))
Der Test injiziert unsere Beispielevents für Anfrage request-event
und Antwort
reply-event
. Unsere Erwartungen:
- Beide Events werden an Elasticsearch geschickt.
- Die Anfrage landet in abgewandelter Form als indiziertes Event
indexed-event
im Index. - Die berechnete Metrik
example-metric
erreicht die InfluxDB.
Der zweite Test überprüft das Verhalten im Timeout-Fall:
(deftest transaction-expired-test
(let [actual (do
(inject! [request-event])
(riemann.time.controlled/advance! (* 2 default-ttl))
(inject! [reply-event]))]
(is (= [request-event reply-event]
(:elasticsearch-stream actual)))
(is (= [indexed-event]
(:index actual)))
(is (= [timeout-metric]
(:influxdb-stream actual)))))
Hier injizieren wir zunächst nur die Anfrage request-event
und stellen dann
für diesen Test die Uhr mit riemann.time.controlled/advance!
so weit vor, dass
die Anfrage im Index auf jeden Fall abgelaufen ist. Erst dann schicken wir die
Antwort reply-event
, die dann aber keine passende Anfrage mehr findet. Daher
sehen wir zwar im Elasticsearch sowohl Anfrage- als auch Antwort-Event, und auch
im Index kam mal das indizierte Anfrage-Event vorbei, aber die einzige Metrik,
die es in die InfluxDB geschafft hat, ist die Timeout-Metrik timeout-metric
.
Zusätzlich zum Verhalten im Timeout-Fall deckt dieser Unit-Test sogar das
Verhalten in dem Fall ab, dass für ein Antwort-Event keine Anfrage im Index
vorhanden ist.
Damit haben wir alle Zweige unserer Streams abgetestet und das Ziel erreicht: Wir haben mit Riemann Metriken aus eingehenden Events abgeleitet.
Fazit
Riemann ist performant und effizient in der Ausführung – unsere produktiv laufenden Systeme verarbeiten trotz großer und komplizierter Stream-Logik mehrere tausend Events pro Sekunde.
Und Riemann ist elegant zu programmieren: Mit der umfangreichen Stream-Processing-Sprache lassen sich die typischen Aufgaben in der Eventverarbeitung wie Filtern, Anreichern, Kombinieren, Aggregieren und Projizieren einfach erledigen. Und für alle weiteren Aufgaben können wir die Stream-Processing-Sprache flexibel erweitern.