Snowflake : Parquet Files - ein Stresstest
Wie lade ich parquet - Files am schnellsten in die Snowflake Datenbank?
Wie lade ich die Strukturen in Einzelattribute, wenn ich gar nicht weiß, welche Attribute erwartet werden können?
Wie werden in den semistrukturierten Daten neu auftauchende Attribute erkannt und gespeichert?
Wie halte ich den Overhead gering, wenn die Daten minütlich eintreffen können?
Wie lade ich Datenmengen mit mehreren hundert Attributen und bis zu 10-stellig Datensätzen je Tag in Rekordzeit?
Wie halte ich bei diesen Anforderungen die Kosten unten?
Was mache ich, wenn ich nicht mit einer dieser Anforderungen zu tun habe, sondern allen zugleich?
Die alte Methode
Die bis dato im Einsatz befindlichen Techniken umfassten die Nutzung von
von COPY INTO Skripten mit fest definierten Spalten.
Unten ein Beispiel, wie dies aussah:

Diese Variant erlaubte es jedoch nicht, die Struktur dynamisch an den Inhalt des Parquet - Files anzupassen.
Aus diesem Grund bot Snowflake eine Alternative an, mit der einerseits das DDL für eine Tabelle aus Parquet-Files als Input abgeleitet werden konnte.

Daten wurden dann per COPY INTO eingefügt

Diese Methode hatte viele Nachteile:
- Es konnten keine technischen Spalten mitgegeben werden.
- Das Infer_Schema ist vergleichsweise langsam, da die Inputwerte erst Row by Row mit den Output-Werten / Table-Werten abgeglichen werden müssen.
- Es ist auf diese Weise nicht möglich, Milliarden Datensätze in Rekordzeit zu verarbeiten.
- Zusätzlich muss das Skript getriggert werden. Dabei ging wertvolle Zeit verloren.
Die Lösung:
Um Daten so schnell wie möglich zu laden, sobald sie in S3 verfügbar sind, setzten wir Snowpipes ein.
Snowpipe ist der kontinuierliche Dateningestionsdienst von Snowflake. Snowpipe lädt die Daten innerhalb von Minuten, nachdem die Dateien zu einer Stage hinzugefügt und zur Aufnahme übermittelt wurden.
Mit dem serverlosen Rechenmodell von Snowpipe verwaltet Snowflake die Lastkapazität und sorgt für optimale Rechenressourcen, um die Nachfrage zu decken.
Kurz gesagt: Snowpipe bietet eine "Pipeline" zum Laden frischer Daten in Microbatches, sobald sie verfügbar sind.
Vollständigkeithalber hier die Lektüre, die uns bei der Lösung des Problems geholfen hat:
- Grundlegenede Informationen
- https://docs.snowflake.com/en/user-guide/data-load-snowpipe-intro
- https://docs.snowflake.com/de/user-guide/data-load-snowpipe-auto-s3.html#prerequisite-create-an-amazon-sns-topic-and-subscription
- https://docs.snowflake.com/de/user-guide/data-load-snowpipe-auto-s3.html#step-1-subscribe-the-snowflake-sqs-queue-to-the-sns-topic
-
Drei Wege, um den SnowPipe Load von AWS auszulösen
- A) S3 Bucket : Event Notifications + Rule triggers SQS triggers SnowPipe
- B) S3 Bucket : Event Notifications + Rule triggers SNS Topic triggers SQS triggers SnowPipe
- C) S3 Bucket : Event Bridge triggers Event Bridge triggers SNS Topic triggers SQS triggers SnowPipe
- Links zu diesen Möglichkeiten
- Die in AWS notwendig-einzurichtenden Trigger findet man im
- S3 > Bucket Name > Eigenschaften / Properties > Event Notification
- S3 > Bucket Name > Eigenschaften / Properties > EventBridge

Warum diese Variante sinnvoll ist
Der Charme dieser Variante ist, dass AWS S3 über Eventbridge oder Event Notifications an die Snowpipe versendet, dass neue Dateien zum Laden bereitstehen.
So geht also sehr wenig Zeit zwischen dem Eintreffen der Dateien und dem Einlesen der Dateien verloren.
Die Kehrseite davon ist, dass diese Methode keinen Gebrauch von Scaling Up macht, so dass sich das Warehouse nicht vergrößern / skalieren lässt.
Im Beispiel werden die Daten als Variant-Datentyp abgespeichert. Es gibt aber auch Varianten, die mit Infer_Schema funktionieren, also direkt in einzelne Attribute aufspalten. Dort wiederum funktioniert nicht das Setzen von technischen Attributen wie metadata$filename. Ein KO - Kriterium.
Hier eine kurze Übersicht über die Varianten COPY Command und Snowpipe.

An diesem Punkt waren jedoch alle bisherigen Wege als unzureichend abgearbeitet.
Alex 2015 hätte jetzt versucht im Team verschiedene Varianten auszuloten.
Alex 2023 fragt den Hersteller, wie es sein kann, dass ein Fremdsystem Daten innerhalb von 10 Minuten exportiert, die wir in Snowflake nicht innerhalb von mehreren Stunden importieren können.
Was machen wir falsch?
Und so wurde ein Meeting mit Snowflake abgehalten.
"Das Quellsystem hatte die Dateien innerhalb 20 Minuten (Daten für einen ganzen Tag) exportiert und Snowflake soll nun mehr als 24h brauchen, um diese einzulesen? Das konnten und wollten wir nicht glauben."
Der sportliche Ehrgeiz von unseren Snowflake Ansprechpartnern war geweckt. "20 Minuten ist die Vorgabe? Wir machen es in 2!"
Innerlich freute ich mich, Menschen mit Anspruch an sich sind mir die liebsten. Vor allem, wenn sie auch liefern. Und liefern kann Snowflake.
Snowflake und wir (Tandem mit Luca Yael Pomer, wo ich das Vergnügen hatte als Senior eine erstklassige Nachwuchs-Datenwissenschaftlerin auszubilden) vertesten alle möglichen Wege um die Geschwindigkeit des Einlesens zu erhöhen.
Snowflake fasste ihre Ergebnisse eigens in folgendem Artikel zusammen
Was fällt uns auf? Die Optionen von Snowflake sind zwar schnell und machen von Scaling Up (Warehouse Size gebrauch). Aber auch diese Optionen erfüllen nicht die Anforderung, mit sich ändernden Strukturen klarzukommen. Dafür braucht es "INFER_SCHEMA" und das taucht in den Tests gar nicht auf!
"Creativity is intelligence having fun." Albert Einstein
Und so stelle ich mit diebischer Freude vor, was Snowflake so nicht kann:
Idee:
Was wäre, wenn man die Daten in einem zweistufigen Verfahren laden würde:
Schritt 1: Wir laden die Daten in eine erste Tabelle mit technischen Spalten und die Nutzdaten als Variant - Datentyp nach Verfahren C aus. Diese Tabelle enthält die Roh-Daten als Variant und kann deshalb auch beliebigen Datenschrott annehmen
Schritt 2: Aus den Input-Daten erstellen wir über Infer_Schema eine Temporäre Tabelle, die genau passend ist, für die eintreffenden Daten (diese Tabelle hat eine Spalte für jedes Key-Value Pair)
Schritt 3: Eine Prozedur vergleicht nun die Temporäre Tabelle mit der eigentlichen Zieltabelle. Im Gegensatz zur Tabelle mit den Roh-Daten und Variant - Datentyp Spalte enthält diese Tabelle für jeden jemals gelieferten Key eine Spalte (und ist deshalb auch eher etwas "breit"). Eine Prozedur gleicht die Temporäre Tabelle mit der eigentlichen Zieltabelle ab und fügt Attribute hinzu, falls in der temporären Tabelle Attribute auftauchen, die so in der Zieltabelle bisher fehlten.
Schritt 4: Eine weitere Prozedur liest nun aus dem INFORMATION_SCHEMA alle Attribute aus, die in der Zieltabelle existieren und baut nun ein passendes SQL Statement, welches von der RAW - Tabelle den VARIANT Datentyp ausliest und die Inhalte auf die hunderte Ziel-Attribute verteilt.
Damit das Ganze möglichst effizient geschieht, wäre es wichtig, vom Warehouse Size (über External Table) gebrauch zu machen, als auch nur die hinzugekommen Daten zu laden (über Streams)
Jetzt ist wirklich eine Menge Text. Gibt es das auch in grafisch? Aber sicher doch.

Resources und Beispielcode unter:
snowflake/Snowflake_Blueprints/Parquet/Parquet_Stream_Dynamic_Content/
Erklärungen / Appendix
External Table
-
tracking of new files due to aws sns integration
-
make use of partitioning
-
read directly from s3 files
-
make use of warehouse size

INFER_SCHEMA
-
Aus JSON / PARQUET DDL für Staging Tabelle erstellen lassen
-
kann für INSERT/ COPY INTO genutzt werden
-
INFER_SCHEMA ist langsam, wenn für INSERT genutzt
CREATE OR replace table STG.TMP_TRACKING_DATA_DETAIL_DDL using template (select array_agg(object_construct(*)) from table( infer_schema( location=>'" + latest_path_string + " ', file_format => 'ADOBE_PARQUET' ) ) );"
Automatisch INSERT Erstellen
-
Aus dem Information Schema Attributliste der Zieltabelle ziehen
-
aus VARIANT dann Value zu jeweiligen Key auslesen und in das passende Einzelattribut der Zieltabelle schreiben

Das ist ein typischer Alex: Es wird SQL genutzt, um damit SQL zu generieren. Immerhin kein SQL Generator Generator. Das hätte Spaß gemacht, aber die Anwendung wäre hier nicht zielführend.
INFER_SCHEMA
-
temporär eine Tabelle erstellen, die in der Struktur geeignet ist, die aktuellen Daten aufzunehmen.
-
das erstellen dieser Tabelle in Funktion kapseln
-
diese Tabelle enthält dann alle Attribute, die in den aktuellsten Parquet Daten zu finden sind.
-
Die Struktur dieser Tabelle kann man dann mit der eigentlichen Ziel-Tabelle vergleichen um dort neue Attribute bei Bedarf automatisiert hinzuzufügen
