21 Punkte von hiddenest 2020-12-24 | 2 Kommentare | Auf WhatsApp teilen

In einer Umgebung mit monatlich durchschnittlich mehr als 10 Milliarden Events entstand die Anforderung, Daten in kurzer Zeit zu analysieren und Analysen zum Nutzerverhalten (Cohort) bereitzustellen.

(z. B. Frauen in ihren 30ern, die in den letzten 6 Monaten in unserer App monatlich mehr als 100.000 Won ausgegeben haben → deren Wiederkehrrate)

Beschrieben wird die Geschichte, wie ein Datastore, den Entwickler sonst nur genutzt hatten, direkt selbst implementiert wurde.

Um Abfragen zur Analyse des Nutzerverhaltens zu implementieren, braucht es…

  • Es muss möglich sein, Metriken abzufragen, die nicht im Voraus berechnet wurden (+ auch neue Analysearten müssen ohne Re-indexing möglich sein)

  • Beim Group By von Event-Daten nach Nutzer darf der Bottleneck durch High-Cardinality-Shuffle gering sein

Soll eine bestehende Lösung genutzt werden, oder bauen wir selbst?

  • Druid wurde andernorts bereits eingesetzt, war aber wegen der Einschränkungen von Pre-Aggregation (ein Ansatz, bei dem nur berechnete Werte gelesen werden) für die Umsetzung ungeeignet

  • Data Warehouses wie Snowflake oder Redshift lassen sich in großem Maßstab betreiben, sind aufgrund ihrer allgemeinen Auslegung für das Ziel aber zu teuer, weil dafür ein im Verhältnis viel zu großes Cluster betrieben werden müsste

  • Um verschiedene Anforderungen wie Funnel, ID-Matching usw. abzudecken, stößt eine SQL-basierte DB an Grenzen

Am Ende wurde der Datastore selbst gebaut

  • Luft = ein Datastore, der von Grund auf dafür optimiert ist, nach Nutzer-ID gruppierte Abfragen zur Analyse des Nutzerverhaltens schnell auszuführen

  • Entwickelt auf Basis von Golang

  • Analysiert Nutzerdaten im Umfang von mehreren Dutzend TB mit weniger als 5 Nodes im Durchschnitt in 3 Sekunden, maximal in 10 Sekunden

  • Anders als ein typisches RDBMS ist es immutable (bei Bedarf werden Daten für denselben Zeitraum überschrieben) → schlichtes Cluster-Design, hohe Performance ohne komplexe Implementierung eines Page Managers, und ein Datenformat nach Wunsch kann entworfen werden

Ein Blick auf die technische Grundlage

  • TrailDB (Storage Engine) – ein Rowstore zur Speicherung von Time-Series-Events, optimiert für die Partitionierung nach Nutzer-ID

→ Werte werden in ein Dictionary überführt, gespeichert wird nur deren ID

→ Nutzer-Events werden zeitlich sortiert, und es werden nur der gegenüber dem vorherigen Event vergrößerte Zeitwert sowie geänderte Spalten gespeichert (da sich die meisten Nutzereigenschaften nicht ändern)

→ Kein Index. Es ist immer ein Full Scan nötig.

→ Dafür bietet es eine erstaunlich hohe Kompressionsrate (CSV 13GB → ~TrailDB 300mb)

→ Da die Zeitkomplexität O(n) ist, entstand die Überlegung, einfach die Platzkomplexität zu reduzieren

  • LLVM (Query Engine)

→ TrailDB unterstützt allerdings nur Equals im OR-AND-Format, und in Go geparste Queries müssen an C/C++ übergeben werden

→ Dabei wurde entdeckt, dass PostgreSQL Queries per LLVM JiT kompiliert

→ Da Queries funktional häufig erweitert werden, lässt sich vermeiden, dass die Entwicklungskosten steigen, wenn alles in C/C++ geschrieben wird (es reicht, LLVM IR in Golang zu erzeugen und an C/C++ zu übergeben, wo es per JiT kompiliert und ausgeführt wird)

  • Die Compute-Layer selbst bauen

→ MapReduce wird zwar oft genutzt, konnte aber wegen Golang nicht verwendet werden

→ Spark/Hadoop sind für Long-running Jobs optimiert, daher war die Performance auch bei Anbindung nicht gut

→ Also wurde auch das selbst entwickelt → https://github.com/ab180/lrmr

→ Kombination aus gRPC + Protobuf + etcd, mit vielen übernommenen Elementen aus dem vertrauten Spark-Design

→ Verzicht auf Resiliency → wenn die Performance maximal hoch ist, kann man bei Ausfällen einfach von vorn starten und bleibt trotzdem unter 10 Sekunden

→ Da bei der Verarbeitung großer Datenmengen häufig Buffer Overflows (Backpressure) auftraten, wurde auf einen Pull-based Event Stream umgestellt (eingesetzt u. a. bei Kafka und Armeria)

  • Sharding selbst implementieren

→ Shard = Historical Node

→ Was, wenn der Datumsbereich einer Partition als Sharding-Key verwendet wird?

→ Jede Query enthält Zeitangaben → leicht zu filtern

→ Im selben Zeitbereich gibt es Daten mit ähnlichem Volumen → leicht zu verteilen

→ Verteilte Umgebungen sind nicht schön…

→ Was passiert, wenn ein Node ausfällt oder neu hinzukommt?

→ Was, wenn der Speicher voll wird?

→ Was, wenn durch eine Störung alles auf einen Node konzentriert wird?

→ Die Cost Function von Druid wurde angepasst, sodass die Cost steigt, je näher Partitionsdatumsbereiche beieinanderliegen und je stärker sie sich überlappen

→ Für die Verfügbarkeit der Shards wurde Folgendes umgesetzt

→ Auf Shard-Informationen wurde eine TTL gesetzt und sie werden periodisch aktualisiert (etcd)

 → Partitionen werden in S3 gespeichert, die Partitionsliste wird in DynamoDB verwaltet

Aktueller Produktionsstand

  • Mit nur 4 c5.2xlarge-Instanzen werden 500GB Daten in unter 15 Sekunden gescannt

Ziele für die Zukunft (oder was noch zu tun ist)

  • Realtime-Funnel-Analyse soll mit einem Cluster von weniger als 10 Maschinen möglich werden

  • Unterstützung für Spark, um ML-Anbindung usw. zu ermöglichen

  • Es wird ein eigener Column Store als Ersatz für TrailDB entwickelt (Ziegel)

→ SIMD- und Multicore-Optimierung

→ Vorabfilterung anhand von Nutzereigenschaften per Bitmap Index

2 Kommentare

 
gera1d 2020-12-24

traildb ist interessant. https://www.youtube.com/watch?v=-oPFxSwn0lM Es ist sehenswert. Auch wenn das Video schon älter ist, hat sich bei traildb in der Zwischenzeit wahrscheinlich nichts geändert.

 
hiddenest 2020-12-24

Jetzt sehe ich, dass es auch einen Blogbeitrag des Entwicklers dazu gibt,

https://engineering.ab180.co/stories/introducing-luft

Von TrailDB hatte ich vorher noch nie gehört, aber das ist so etwas in der Art ...

https://github.com/traildb/traildb