Kapitel 4. Datenbeobachtungen generieren
Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com
Wie in Kapitel 3 erläutert, kombiniert die Datenbeobachtung Technologie und Menschen, um Informationen über den Zustand eines Systems aus der Datenperspektive und die Erwartungen an diesen Zustand zu sammeln. Diese Informationen werden dann genutzt, um das System anpassungsfähiger oder widerstandsfähiger zu machen.
In diesem Kapitel wird erklärt, wie du die Praxis der Datenbeobachtung anwenden kannst. Ich beginne mit der "Datenbeobachtung an der Quelle", einer Methode zur Einführung von Erfassungsstrategien in deine tägliche Datenarbeit, und zeige dir, wie du ihre Auswirkungen auf die Effizienz minimieren kannst. Dann geht das Kapitel auf die Umsetzung von Erwartungen ein, die sich an den Lebenszyklus der Softwarebereitstellung anschließen, z. B. kontinuierliche Integration und kontinuierliches Deployment (CI/CD).
Wie bei jeder neuen Praxis und Technologie musst du die Eintrittsbarriere senken, um die Akzeptanz der Datenbeobachtung zu erhöhen; so haben die Menschen weniger Grund, gegen die Veränderung zu argumentieren. Aber auch die Menschen sind Teil der Lösung, denn ihre Beteiligung an dem Prozess ist entscheidend, um ihre Erwartungen zu ermitteln und die Regeln festzulegen. Zu diesem Zweck lernst du verschiedene Möglichkeiten kennen, wie du den Aufwand für die Erstellung von Beobachtungen verringern kannst und wie du sie zum richtigen Zeitpunkt im Entwicklungszyklus einführen kannst.
An der Quelle
In Kapitel 2 wurden die Quellen und Arten von Informationen erläutert, die dem Beobachter helfen. Aber wie generierst und sammelst du die Informationen aus diesen Quellen?
Es beginnt mit der Beobachtbarkeit der Daten an der Quelle. Der Begriff "Quelle" bezieht sich auf die Anwendungen, die für das Lesen, Umwandeln und Schreiben von Daten verantwortlich sind. Wie in Kapitel 3 erläutert, können diese Anwendungen entweder die Ursache von Problemen sein oder die Mittel, um sie zu lösen. Außerdem liegen die Anwendungen im Gegensatz zu den Daten selbst in unserer Kontrolle als Ingenieure und Organisationen.
Die Methode der Datenbeobachtung an der Quelle dreht sich also um die Fähigkeit der Anwendungen, Datenbeobachtungen nach dem in Kapitel 2 behandelten Modell zu generieren; mit anderen Worten, die Anwendungen werden datenbeobachtbar gemacht.
Reine Daten- und Analysebeobachtungen - wie z. B. Datenquellen, Schemata, Linien und Metriken - stehen im Zusammenhang mit Lese-, Transformations- und Schreibaktivitäten. Die in diesem Abschnitt beschriebenen Strategien gehen auf diese Aktivitäten ein und erklären, was bei ihrer Durchführung zu beachten ist.
Erzeugen von Datenbeobachtungen an der Quelle
Die Datenbeobachtung an der Quelle beginnt mit der Erstellung zusätzlicher Informationen, die das Verhalten bestimmter Aktivitäten mit Daten erfassen: Lesen, Umwandeln und Schreiben. Entwickler können zum Beispiel Zeilen von Protokollen hinzufügen, die die Informationen enthalten, die sie benötigen, um Einblicke in die Aktivitäten ihrer Anwendung zu erhalten. In der Anleitung werden die in Kapitel 2 beschriebenen Kanäle - also die Logs, Metriken und Traces - verwendet, um die Beobachtungen zu vermitteln, die in einem Logging-System zentralisiert werden können.
Im nächsten Abschnitt lernst du, wie du Datenbeobachtungen im JSON-Format erstellst, die in lokalen Dateien, einem lokalen Dienst, einem entfernten (Web-)Dienst oder ähnlichen Zielen gesammelt (veröffentlicht) werden können. Nach dem Kernmodell der Datenbeobachtung in Kapitel 2 würden die Datenquelle und die Schema-Entität einer Postgres-Tabelle zum Beispiel wie in Beispiel 4-1 aussehen.
Beispiel 4-1. Beispiel für als JSON kodierte Datenbeobachtungen
{
"id"
:
"f1813697-339f-5576-a7ce-6eff6eb63249"
,
"name"
:
"gold.crm.customer"
,
"location"
:
"main-pg:5432/gold/crm/table"
,
"format"
:
"postgres"
}
{
"id"
:
"f1813697-339f-5576-a7ce-6eff6eb63249"
,
"data_source_ref"
:
{
"by_id"
:
"e21ce0a8-a01e-5225-8a30-5dd809c0952e"
},
"fields"
:
[
{
"name"
:
"lastname"
,
"type"
:
"string"
,
"nullable"
:
true
},
{
"name"
:
"id"
,
"type"
:
"int"
,
"nullable"
:
false
}
]
}
Die Möglichkeit, Datenbeobachtungen in demselben Modell auf einer zentralen Plattform zu sammeln, ist der Schlüssel, um den Wert der Datenbeobachtung im großen Maßstab zu generieren, z. B. über Anwendungen, Teams und Abteilungen hinweg. Deshalb ist die Verwendung des Kernmodells der Datenbeobachtung wichtig, um Datenbeobachtungen einfach zu aggregieren, insbesondere entlang von Linien.
Sehen wir uns an, wie man Datenbeobachtungen in Python mithilfe einer Low-Level-API erzeugt, die zur Einführung von Abstraktionen auf höherer Ebene verwendet wird (die in den nächsten Kapiteln behandelt werden).
Low-Level-API in Python
Diese Strategie, eine Low-Level-API zu verwenden, erfordert viel Zeit und Engagement von dir, da du jede Beobachtung explizit erstellen musst. Allerdings bietet dir diese Strategie auch die größte Flexibilität, da sie keine Abstraktionen auf höherer Ebene beinhaltet.
Auf der anderen Seite erfordert die Unterstützung der Datenbeobachtbarkeit auf dieser Ebene, insbesondere während der Erkundung und Wartung, dass der Entwickler konsequent ist und immer daran denkt, was er in der Produktion beobachten möchte (zum Beispiel sollte jeder erfahrene Entwickler so viele Zeilen für Protokolle und Prüfungen produzieren wie für die Geschäftslogik).1 Während der Entwicklung müssen die Entwickler dann die Änderungen an der Logik oder dem Verhalten der Anwendung durch die Erstellung der zugehörigen Beobachtungen sichtbar machen. Beispiele für solche Beobachtungen sind eine Verbindung zu einer neuen Tabelle, die Erstellung einer neuen Datei oder die Änderung einer Struktur mit einem neuen Feld.
In den folgenden Abschnitten wirst du ein komplettes Beispiel für eine in Python geschriebene Datenanwendung durchgehen, die neben der Datennutzung auch Datenbeobachtungen erzeugt, indem du Folgendes tust:
-
Verstehen von Anwendungen ohne Datenbeobachtungsmöglichkeit
-
Hinzufügen von Anweisungen zur Erzeugung von Datenbeobachtungen und deren Zweck
-
Erkenntnisse über die Vor- und Nachteile dieser Strategie gewinnen
Beschreibung der Datenpipeline
Im weiteren Verlauf des Kapitels werde ich eine in Python geschriebene Datenpipeline verwenden, mit der wir Daten beobachtbar machen. Der Code der Pipeline auf GitHub ermöglicht es dir, die Beispiele in diesem Kapitel auszuführen. Sie nutzt die Pandas-Bibliothek, um CSV-Dateien zu verarbeiten, und besteht aus zwei Anwendungen: Ingestion und Reporting, wie in Abbildung 4-1 dargestellt.
Beide Anwendungen (ingestion
und reporting
) verwenden Python und Pandas und nutzen Daten aus der Datenquelle "Tägliche Aktienkurse", um zwei nachgelagerte Berichte (BuzzFeed-Aktien und AppTech) zu erstellen.
Die Anwendung ingestion
liest die CSV-Dateien mit den täglichen Aktienkursen, die das Börsenteam jeden Monat bereitstellt. Das Team hat die Dateien nach Jahr und Monat unterteilt und dann die Preise in monatlichen Ansichten zusammengeführt, die in einer separaten Datei gespeichert werden, wie in ingestion
in Beispiel 4-2.
Beispiel 4-2. Anwendung zur Datenübernahme
import
pandas
as
pd
AppTech
=
pd
.
read_csv
(
"
data/AppTech.csv
"
,
parse_dates
=
[
"
Date
"
]
,
dtype
=
{
"
Symbol
"
:
"
category
"
}
,
)
Buzzfeed
=
pd
.
read_csv
(
"
data/Buzzfeed.csv
"
,
parse_dates
=
[
"
Date
"
]
,
dtype
=
{
"
Symbol
"
:
"
category
"
}
,
)
monthly_assets
=
pd
.
concat
(
[
AppTech
,
Buzzfeed
]
)
\
.
astype
(
{
"
Symbol
"
:
"
category
"
}
)
monthly_assets
.
to_csv
(
"
data/monthly_assets.csv
"
,
index
=
False
)
Die Anwendung ingestion
verarbeitet zwei CSV-Dateien für die folgenden Bestände:
-
BuzzFeed
-
AppTech
Die Anwendung liest die Dateien aus dem Ordner "../data", als Referenz; eine Zeile in diesen Dateien sieht so aus:
1 |
Date |
Symbol |
Open |
High |
Low |
Close |
AdjClose |
Volume |
2 |
2022-01-03 |
APCX |
14.725 |
15.200 |
14.25 |
14.25 |
14.25 |
1600 |
Während der Erkundung stellte unser Dateningenieur fest, dass die Dateien zwei Felder enthalten, die vorverarbeitet werden müssen. Das Feld Date
muss als Datum geparst werden, und das Feld Symbol
müssen als Kategorie behandelt werden.
Die Anwendung liest und verarbeitet die täglichen Bestände, bevor sie sie im Speicher zur Verfügung stellt. Dann fügt die Anwendung sie in einer einzigen Variablen zusammen, die in diesem Fall ein neuer pandas DataFrame ist, der alle Kategorien in Symbol enthält.
Das Skript schreibt das Ergebnis in denselben Ordner als neue Datei namens monthly_assets.csv. Auf diese Weise können andere Analysten oder Datentechniker dort beginnen, wenn sie alle täglichen Aktienkurse für weitere Analysen benötigen.
Nachdem dieses erste Skript ausgeführt wurde und die Datei verfügbar ist, können die übrigen Pipeline-Skripte ausgeführt werden, um die BuzzFeed- und AppTech-Aktienberichte zu erstellen. In diesem Beispiel gibt es nur ein Skript, die Berichts-Python-Datei, die in Beispiel 4-3 gezeigt wird.
Beispiel 4-3. Anwendung zur Datenmeldung
import
pandas
as
pd
all_assets
=
pd
.
read_csv
(
"
data/monthly_assets.csv
"
,
parse_dates
=
[
'
Date
'
]
)
apptech
=
all_assets
[
all_assets
[
'
Symbol
'
]
==
'
APCX
'
]
buzzfeed
=
all_assets
[
all_assets
[
'
Symbol
'
]
==
'
BZFD
'
]
buzzfeed
[
'
Intraday_Delta
'
]
=
buzzfeed
[
'
Adj Close
'
]
-
buzzfeed
[
'
Open
'
]
apptech
[
'
Intraday_Delta
'
]
=
apptech
[
'
Adj Close
'
]
-
apptech
[
'
Open
'
]
kept_values
=
[
'
Open
'
,
'
Adj Close
'
,
'
Intraday_Delta
'
]
buzzfeed
[
kept_values
]
.
to_csv
(
"
data/report_buzzfeed.csv
"
,
index
=
False
)
apptech
[
kept_values
]
.
to_csv
(
"
data/report_appTech.csv
"
,
index
=
False
)
Diese Anwendung führt die folgenden Aktionen durch:
Liest die von der vorherigen Anwendung erstellte Datenquelle
Filtert die Daten für die beiden Berichte heraus, die er erstellt: BuzzFeed und AppTech.
Berechnet die Intraday_Delta
Scores, also die täglichen Bestandsentwicklungen, die zu melden sind.
Meldet die Open, Adj Close
, und die berechneten Werte Intraday_Delta
in den entsprechenden Dateien für jeden Bestand.
Um die Pipeline ordnungsgemäß auszuführen, ist es wichtig, die Abhängigkeiten zwischen den Reporting- und Ingestion-Anwendungen zu beachten. Das heißt, du musst die ingestion
Anwendung erfolgreich ausgeführt werden, bevor du die reporting
Anwendung ausführst. Andernfalls wird die Pipeline fehlschlagen. Eine Lösung, um dies zu erreichen, ist die Verwendung eines Orchestrators, der ingestion
vor reporting
ausführt, wie z. B. Dagster und Airflow. Der Orchestrator ist eigentlich eine weitere Anwendung, die man konfigurieren kann, indem man die Abhängigkeiten zwischen den Anwendungen auf der Datenebene explizit festschreibt. Die Anwendungen selbst wissen aber immer noch nichts über ihre nachgelagerten Abhängigkeiten.
Die Kehrseite der Hardcodierung der Abhängigkeiten in einem Orchestrator ist, dass es sich um ein neues Asset handelt, das gepflegt werden muss (z. B. die Genauigkeit der expliziten Abhängigkeiten). Außerdem gibt es zusätzliche Einschränkungen bei der Erstellung einer Pipeline, da die Teams unabhängig bleiben müssen und daher nicht einfach eine bestehende Pipeline aktualisieren können, um ihre eigenen Bedürfnisse zu erfüllen. Daher muss eine Erweiterung auf einer höheren Ebene erstellt werden, indem eine neue, separate DAG hinzugefügt wird, die die expliziten Abhängigkeiten aufheben könnte.
Kehren wir zu unserer Pipeline zurück und besprechen wir die funktionalen Abhängigkeiten zwischen den Anwendungen. Das heißt, dass ingestion
erfolgreich ausgeführt werden muss, bevor reporting
ausgeführt werden kann. Aber was bedeutet erfolgreich?
Definition des Status der Datenpipeline
Um festzustellen, ob eine Ausführung einer Datenpipeline erfolgreich ist, werde ich die entgegengesetzte Frage analysieren: Welche Arten von Fehlern können in der ingestion
auftreten?
- Explizites Scheitern
- Dies führt zu einem Absturz der Anwendung. Diese Art von Fehler ist für den Orchestrator einfach zu handhaben, wenn du einen verwendest, denn es ist ein Flag, das die nächste Anwendung nicht auslöst: in unserem Beispiel die Anwendung
reporting
. - Stiller Ausfall
- In diesem Fall wird die Anwendung beendet, ohne dass z. B. ein Fehlercode oder ein Protokoll erscheint. Da sie nicht wie erwartet läuft, musst du den Begriff der Erwartungen berücksichtigen, der in Kapitel 2 eingeführt wurde.
Ein Beobachter der ingestion
Anwendung würde expliziten Fehlern wie diesen begegnen:
- Datei nicht gefunden Fehler
- Tritt auf, wenn eine der Datendateien, wie z. B. Buzzfeed.csv, im Ordner nicht verfügbar ist, weil sie umbenannt oder in Kleinbuchstaben geändert wurde oder die Datei nicht erstellt wurde, bevor die Anwendung
ingestion
ausgeführt wurde. - Tippfehler (
TypeError
) - Tritt auf, wenn einige Werte nicht mit den Typen übereinstimmen, die der Funktion
read_csv
zur Verfügung gestellt werden, z. B. wenn das Symbol eine Kategorie sein sollte. - Fehler bei hartkodierten Namen
- Treten auf, wenn eines der Felder, die explizit in dem Code verwendet werden, um auf die Werte zuzugreifen, wie z.B. der Spaltenname
Date
in Beispiel 4-3 undSymbol
in diesem Fall, nicht vorhanden ist oder der Name geändert wurde. - Dateisystem-Fehler
- Passiert, wenn die Dateien nicht lesbar sind oder der Ordner für den Benutzer, der die Anwendung ausführt, nicht beschreibbar ist.
- Speicherfehler
- Passiert, wenn die Dateien so groß werden, dass der der Anwendung zugewiesene Speicher nicht mehr ausreicht.
- Systemfehler
- Wird ausgelöst, wenn die Festplatte keinen Platz mehr hat, um das aggregierte Ergebnis zu schreiben.
Aber aus der Perspektive des Ingenieurs, der es beobachtet, zeigen die folgenden Beispiele stille Fehler:
-
Die Spalte
Date
kann nicht als Datum geparst werden, weil sie falsch formatiert ist, das Muster geändert wurde oder die Zeitzone nicht konsistent ist. In diesem Fall ist die Spalte nicht mehr einedatetime
, sondern eineobject
. -
Die Spalte
Date
enthält Werte, aber nicht für den aktuellen Monat. Alle Werte sind vergangene oder zukünftige Daten. -
Die Spalte
Date
enthält Werte in der Zukunft, weil der Generator möglicherweise später ausgeführt wurde und Informationen über die Zukunft generiert hat, um sie mit dem Monat zu vergleichen, in dem er verarbeitet wird. Dies kann später zu Duplikaten oder inkonsistenten Werten für dieselben Daten führen und einige Aggregationen fehlschlagen lassen. -
Die Kategorie
Symbol
ändert sich aus verschiedenen Gründen, z. B. wegen eines Tippfehlers oder einer Änderung der Großschreibung, der Länge oder der Referenz. Diese Kategorie wird in allen Dateien verwendet und als Kategorie in die Ausgabedatei geschrieben.
Außerdem könnte die Anwendung reporting
die Anwendung ingestion
als fehlgeschlagen betrachten, weil sie die folgenden Fehlschläge in reporting
provoziert hat:
-
Die monatliche Aggregatdatei wurde nicht geschrieben, als die Berichterstattung begann, so dass sie nicht verfügbar ist, wenn das Berichtstool in einem bestimmten Intervall ausgeführt werden soll.
-
Eines der Felder, die zum Filtern in der Arithmetik verwendet werden, ist nicht verfügbar oder sein Name wurde geändert.
-
Die gleichen Fehler treten beim Lese-/Schreibzugriff, der Größe und dem Speicherplatz auf.
Außerdem kann reporting
lautlos fehlschlagen, weil die stillen Ausfälle der ingestion
-
Die Symbole
APCX
undENFA
sind in der bereitgestellten Datei nicht verfügbar. -
Adj Close
oderOpen
fehlende Werte sind, was zu Müll in derIntraday_Delta
Ausgabe führt. Dieses Problem kann auch ein expliziter Fehler sein. In diesem Fall werden die Werte zuNA
von Pandas. -
Die aggregierte Datei enthält keine Informationen über den aktuellen Monat, aber die Daten liegen in der Vergangenheit.
Da jeder dieser Fehler auftreten kann, musst du wissen, wann er auftritt, und - noch besser - du musst ihn frühzeitig erkennen, um zu verhindern, dass er von der Anwendung ingestion
weitergegeben wird (siehe "Fail Fast and Fail Safe").
Die expliziten Fehler sollten als Entwicklungspraxis bereits sichtbar gemacht worden sein, um diese Fehler explizit abzufangen (try…except
in Python). Damit ein Beobachter jedoch stille Fehler erkennen und entdecken kann, muss die Anwendung die entsprechenden Beobachtungen liefern.
Datenbeobachtungen für die Datenpipeline
In diesem Abschnitt werde ich einen Überblick über die Datenbeobachtungen geben, die die Datenpipeline erzeugen muss. Dazu werfen wir einen kurzen Blick auf Abbildung 4-2, die zeigt, wie eine Low-Level-API das in Kapitel 2 vorgestellte Modell umsetzt. Es ist interessant festzustellen, dass sie eine ähnliche Struktur und sogar einige Entitäten (beschriftet) haben; in den nächsten Abschnitten werde ich auf jeden Teil einzeln eingehen, um diese Fakten hervorzuheben.
In diesem Diagramm siehst du, dass die Entitäten mit den Großbuchstaben A, B, C und D in Kreisen gekennzeichnet sind. Die "A"-Datenquellen heben die Beobachtungen hervor, die von der Anwendung ingestion
über die von ihr erzeugten Daten und von reporting
beim Lesen der Daten gemacht werden, wodurch die implizite Abhängigkeit deutlich wird.
In der Tat erzeugen beide Anwendungen mehrere ähnliche Beobachtungen, die alle Abhängigkeiten darstellen, die sie miteinander verbinden. In Abbildung 4-2 sind die folgenden ähnlichen Beobachtungen ebenfalls hervorgehoben:
-
Die "B"-Entitäten beobachten den Server, von dem die Daten abgerufen wurden.
-
Die "C"-Entitäten beobachten den Benutzer beim Ausführen von Befehlen.
-
Die "D"-Entitäten beachten das Schema der Daten, die durch die Ingestion erzeugt werden, so wie sie vom Reporting gelesen werden.
Schauen wir uns nun an, was wir zum Code der Anwendung hinzufügen müssen, um die in Abbildung 4-2 gezeigten Beobachtungen zu erzeugen. Da der Code in Python geschrieben ist, verwenden wir das Modul logging
, um die in JSON kodierten Beobachtungen zu drucken.
Erzeugen von kontextbezogenen Daten Beobachtungen
In diesem Abschnitt behandle ich den Code, den benötigt, um Beobachtungen über den Ausführungskontext der in Abbildung 4-3 gezeigten ingestion
Anwendung zu generieren (beachte, dass reporting
denselben Code wiederverwenden kann).
Füge den Code in Beispiel 4-4 am Anfang der Datei ein, um die Beobachtungen für die Anwendung ingestion
zu erzeugen.
Beispiel 4-4. Erzeugen von Datenbeobachtungen über die Ingestion-Anwendung
app_user
=
getpass
.
getuser
(
)
repo
=
git
.
Repo
(
os
.
getcwd
(
)
,
search_parent_directories
=
True
)
code_repo
=
repo
.
remote
(
)
.
url
commit
=
repo
.
head
.
commit
code_version
=
commit
.
hexsha
code_author
=
commit
.
author
.
name
application_name
=
os
.
path
.
basename
(
os
.
path
.
realpath
(
__file__
)
)
application_start_time
=
datetime
.
datetime
.
now
(
)
.
isoformat
(
)
Abrufen von Informationen zum Benutzernamen.
Abrufen von Git-Informationen.
Abrufen von Informationen über die Ausführung der Anwendung.
Die zusätzlichen Anweisungen in Beispiel 4-5 erstellen Variablen für die Beobachtungen, aber es wird noch nichts mit ihnen gemacht. Um die Informationen zu protokollieren, wie bereits erwähnt hat, verwenden wir eine JSON-Darstellung des Informationsmodells, die wie in Beispiel 4-5 kodiert ist.
Beispiel 4-5. Modellierung von Datenbeobachtungen über die Ingestion-Laufzeit
application_observations
=
{
"name"
:
application_name
,
"code"
:
{
"repo"
:
code_repo
,
"version"
:
code_version
,
"author"
:
code_author
},
"execution"
:
{
"start"
:
application_start_time
,
"user"
:
app_user
}
}
Dieser Code erstellt das JSON, das aus allen bisher erstellten Beobachtungen besteht. In diesem Abschnitt geht es jedoch um die Verwendung einer Low-Level-API für die Beobachtung von Daten. Im weiteren Verlauf werden wir auf ein ähnliches Muster stoßen, was uns die Möglichkeit gibt, Funktionen zu erstellen, um den Code zu vereinfachen und ihn in den Anwendungen ingestion
und reporting
oder in anderen Python-Anwendungen gemeinsam zu nutzen.
Um eine API zu erstellen, erstellen wir ein Modell, das das Beobachtungskernmodell in JSON nachahmt, jede Entität in eine Klasse verwandelt und Beziehungen in Referenzen umwandelt (siehe Beispiel 4-6 ).
Beispiel 4-6. Modellierung der Anwendungsdatenbeobachtungen mit speziellen Klassen
class
Application
:
name
:
str
def
__init__
(
self
,
name
:
str
,
repository
:
ApplicationRepository
)
-
>
None
:
self
.
name
=
name
self
.
repository
=
repository
def
to_json
(
self
)
:
return
{
"
name
"
:
self
.
name
,
"
repository
"
:
self
.
repository
.
to_json
(
)
}
class
ApplicationRepository
:
location
:
str
def
__init__
(
self
,
location
:
str
)
-
>
None
:
self
.
location
=
location
def
to_json
(
self
)
:
return
{
"
location
"
:
self
.
location
}
app_repo
=
ApplicationRepository
(
code_repo
)
app
=
Application
(
application_name
,
app_repo
)
Das bedeutet, dass die Anwendungsentität eine Application
Klasse mit einem Eigenschaftsnamen, der den Namen der Datei als application_name
Variable enthalten kann, und einen Verweis auf eine ApplicationRepository
Instanz haben muss. Diese ApplicationRepository
Entität wird als ApplicationRepository
Klasse kodiert, deren Eigenschaft location
als Git-Remote-Speicherort festgelegt ist. Diese Struktur erleichtert den Aufbau des Modells und die Erstellung einer JSON-Darstellung, die wiederverwendbar ist und zu einer Standardisierung führen kann.
Ein zusätzlicher Vorteil der Kodierung von Konzepten in API-Klassen ist, dass sie die Verantwortung haben, Helfer vorzuschlagen, um zugehörige Beobachtungen zu extrahieren, wie in Beispiel 4-7 .
Beispiel 4-7. Nutzung von Klassen zur Definition von Helfern für modellierte Entitäten
class
ApplicationRepository
:
location
:
str
# [...]
@staticmethod
def
fetch_git_location
(
)
:
import
git
code_repo
=
git
.
Repo
(
os
.
getcwd
(
)
,
search_parent_directories
=
True
)
.
remote
(
)
.
url
return
code_repo
class
Application
:
name
:
str
# [...]
@staticmethod
def
fetch_file_name
(
)
:
import
os
application_name
=
os
.
path
.
basename
(
os
.
path
.
realpath
(
__file__
)
)
return
application_name
app_repo
=
ApplicationRepository
(
ApplicationRepository
.
fetch_git_location
(
)
)
app
=
Application
(
Application
.
fetch_file_name
(
)
,
app_repo
)
Diese Strategie könnte ein einfacher Weg sein, um das Modell umzusetzen. Wir bevorzugen jedoch einen anderen Ansatz, der die Verbindungen zwischen den Entitäten schwächt. In Beispiel 4-8 werden alle Informationen in einer JSON-Datei protokolliert, wobei die Entitäten in einem Informationsbaum mit Application
an der Wurzel angeordnet sind. Diese Kodierung zwingt uns dazu, alle Beobachtungen zu erstellen, bevor wir die Wurzel, also die Application
Instanz, protokollieren. Der Application
Konstruktor würde dann so aussehen wie in Beispiel 4-8.
Beispiel 4-8. Aufgeblähter Konstruktor für die Anwendung ohne Trennung von Belangen
class
Application
:
name
:
str
def
__init__
(
self
,
name
:
str
,
version
:
ApplicationVersion
,
repo
:
ApplicationRepository
,
execution
:
ApplicationExecution
,
server
:
Server
,
author
:
User
)
->
None
:
pass
Um diese Komplexität und Einschränkung zu vermeiden, ist es besser, die Abhängigkeiten zwischen den Entitäten umzukehren. Anstatt die Application
mit ihren ApplicationVersion
oder ApplicationRepository
enthält, erstellen wir Application
allein und fügen dann einen schwachen Verweis auf sie aus ApplicationVersion
und ApplicationRepository
. Beispiel 4-9 zeigt, wie dieses Modell aussehen würde.
Beispiel 4-9. Umkehrung der Abhängigkeiten zwischen Entitäten und Einführung id
class
ApplicationRepository
:
location
:
str
application
:
Application
id
:
str
def
__init__
(
self
,
location
:
str
,
application
:
Application
)
-
>
None
:
self
.
location
=
location
self
.
application
=
application
id_content
=
"
,
"
.
join
(
[
self
.
location
,
self
.
application
.
id
]
)
self
.
id
=
md5
(
content
.
encode
(
"
utf-8
"
)
)
.
hexdigest
(
)
def
to_json
(
self
)
:
return
{
"
id
"
:
self
.
id
,
"
location
"
:
self
.
location
,
"
application
"
:
self
.
application
.
id
}
@staticmethod
def
fetch_git_location
(
)
:
import
git
code_repo
=
git
.
Repo
(
os
.
getcwd
(
)
,
search_parent_directories
=
True
)
.
remote
(
)
.
url
return
code_repo
Mit diesem Modell können wir die Beobachtungen einzeln protokollieren - zwei Aufrufe an logging.info
-reduzieren die Menge der zu speichernden Informationen. Da wir die Beziehungen zwischen den Entitäten neu zusammensetzen müssen, führen wir die Variable id
ein, um die Menge der zu protokollierenden Informationen und der zu verknüpfenden Beobachtungen zu reduzieren. Anhand der Protokolle kann id
die Verknüpfungen im Modell rekonstruieren, wie z. B. die Abhängigkeit zwischen ApplicationRepository
und Application
, da sie bereits protokolliert wurden ( id
).
In diesem Beispiel hat die Anwendung id
lokal erstellt, was zu einem schlechten Design führt, das über mehrere Ausführungen hinweg inkonsistent sein wird. Um dieses Problem zu umgehen, müssen wir einen funktionalen id
definieren, der die Entitäten über verschiedene Ausführungen, Einsätze und Anwendungen hinweg identifizieren kann. Dieser Begriff ist in der Modellierung als Primärschlüssel bekannt. Du kannst einen Primärschlüssel z. B. als Eingabe für einen Hash-Algorithmus verwenden, der die id
auf deterministische Weise generiert, in diesem Fall mit hashlib
.
Beispiel 4-9 zeigt, wie man den Primärschlüssel verwendet, um id
konsistent zu generieren, zum Beispiel mit md5
. Diese Strategie werden wir in diesem Kapitel immer wieder anwenden, um Entitäten zu erzeugen.
Wrap-Up: Die Daten-Beobachtungs-Daten-Pipeline
Bevor wir analysieren, wie die Beobachtungen bei den zu Beginn dieses Abschnitts vorgestellten expliziten und stillen Fehlern helfen , wenden wir an, was wir bisher für die Anwendung ingestion
getan haben, um die Anwendungsdaten reporting
beobachtbar zu machen. Siehe Beispiel 4-19.
Beispiel 4-19. Berichtsanwendung mit ausführlichen, aus dem Code generierten Datenbeobachtungen
import
ApplicationRepository
.
fetch_git_location
import
ApplicationVersion
.
fetch_git_version
app
=
Application
(
Application
.
fetch_file_name
(
)
)
app_repo
=
ApplicationRepository
(
fetch_git_location
(
)
,
app
)
git_user
=
User
(
ApplicationVersion
.
fetch_git_author
(
)
)
app_version
=
ApplicationVersion
(
fetch_git_version
(
)
,
git_user
,
app_repo
)
current_user
=
User
(
"
Emanuele Lucchini
"
)
app_exe
=
ApplicationExecution
(
app_version
,
current_user
)
all_assets
=
pd
.
read_csv
(
"
data/monthly_assets.csv
"
,
parse_dates
=
[
'
Date
'
]
)
apptech
=
all_assets
[
all_assets
[
'
Symbol
'
]
==
'
APCX
'
]
buzzfeed
=
all_assets
[
all_assets
[
'
Symbol
'
]
==
'
BZFD
'
]
buzzfeed
[
'
Intraday_Delta
'
]
=
buzzfeed
[
'
Adj Close
'
]
-
buzzfeed
[
'
Open
'
]
apptech
[
'
Intraday_Delta
'
]
=
apptech
[
'
Adj Close
'
]
-
apptech
[
'
Open
'
]
kept_values
=
[
'
Open
'
,
'
Adj Close
'
,
'
Intraday_Delta
'
]
buzzfeed
[
kept_values
]
.
to_csv
(
"
data/report_buzzfeed.csv
"
,
index
=
False
)
apptech
[
kept_values
]
.
to_csv
(
"
data/report_appTech.csv
"
,
index
=
False
)
all_assets_ds
=
DataSource
(
"
data/monthly_assets.csv
"
,
"
csv
"
)
all_assets_sc
=
Schema
(
Schema
.
extract_fields_from_dataframe
(
all_assets
)
,
all_assets_ds
)
buzzfeed_ds
=
DataSource
(
"
data/report_buzzfeed.csv
"
,
"
csv
"
)
buzzfeed_sc
=
Schema
(
Schema
.
extract_fields_from_dataframe
(
buzzfeed
)
,
buzzfeed_ds
)
apptech_ds
=
DataSource
(
"
data/report_appTech.csv
"
,
"
csv
"
)
apptech_sc
=
Schema
(
Schema
.
extract_fields_from_dataframe
(
apptech
)
,
apptech_ds
)
# First lineage
lineage_buzzfeed
=
OutputDataLineage
(
buzzfeed_sc
,
OutputDataLineage
.
generate_direct_mapping
(
buzzfeed_sc
,
[
all_assets_sc
]
)
)
lineage_buzzfeed_exe
=
DataLineageExecution
(
lineage_buzzfeed
,
app_exe
)
all_assets_ms_1
=
DataMetrics
(
DataMetrics
.
extract_metrics_from_df
(
all_assets
)
,
all_assets_sc
,
lineage_buzzfeed_exe
)
buzzfeed_ms
=
DataMetrics
(
DataMetrics
.
extract_metrics_from_df
(
buzzfeed
)
,
buzzfeed_sc
,
lineage_buzzfeed_exe
)
# Second lineage
lineage_apptech
=
OutputDataLineage
(
apptech_sc
,
OutputDataLineage
.
generate_direct_mapping
(
apptech_sc
,
[
all_assets_sc
]
)
)
lineage_apptech_exe
=
DataLineageExecution
(
lineage_apptech
,
app_exe
)
all_assets_ms_2
=
DataMetrics
(
DataMetrics
.
extract_metrics_from_df
(
all_assets
)
,
all_assets_sc
,
lineage_apptech_exe
)
apptech_ms
=
DataMetrics
(
DataMetrics
.
extract_metrics_from_df
(
apptech
)
,
apptech_sc
,
lineage_apptech_exe
)
Indem wir die Beobachtungen auf diese Weise hinzufügen, bleiben die Änderungen ähnlich wie bei der Anwendung ingestion
. Dieser Ansatz ermöglicht es uns, Gewohnheiten und Abstraktionen zu entwickeln, wie z. B. ein Framework, das die Anzahl der erforderlichen Änderungen reduziert - fast ein implizites Gesetz in der Entwicklung.
In Beispiel 4-19 siehst du, dass die Beobachtungen, die für die Eingänge generiert werden, an das Ende verschoben wurden. Wir haben uns aus Gründen der Einfachheit des Beispiels für diese Implementierung entschieden. Ein Vorteil ist, dass die zusätzlichen Berechnungen am Ende durchgeführt werden, ohne den Geschäftsablauf zu beeinträchtigen. Der Nachteil ist, dass, wenn zwischendurch etwas fehlschlägt, keine Beobachtungen über die Datenquellen und ihr Schema gesendet werden. Natürlich ist es möglich, diese Situation mit einigen Anpassungen am Code zu vermeiden. Außerdem müssen wir für diese Einführung in eine Low-Level-API einige Boilerplate hinzufügen, um die richtigen Informationen zu generieren, was im Verhältnis zum Geschäftscode wie Lärm klingen mag. Behalte jedoch im Hinterkopf, dass das Skript ursprünglich keine Protokolle enthielt. Im Allgemeinen werden Protokolle sporadisch hinzugefügt, um Informationen über das Verhalten des Skripts zu erhalten, was wir auch getan haben, allerdings für die Daten.
Behalte auch diese Punkte im Hinterkopf:
-
Wir haben den
OutputDataLineage.generate_direct_mapping
Helper so verwendet, wie er ist, um ein Lineage Mapping zwischen den Outputs und Inputs zu erstellen. Das wird jedoch nicht funktionieren, weil wirAdj Close
undOpen
aus der Datei monthly_assets.csv in der neuen SpalteIntraday_Delta
zusammengefasst haben. Da die Felder nicht denselben Namen haben, wird die "direkte" Heuristik diese Abhängigkeit nicht erkennen. -
Es wird eine Warnmeldung angezeigt, wenn dieselben Beobachtungen ein zweites Mal für die monthly_assets gemeldet werden. Das haben wir getan, weil wir Lineage pro Ausgabe kodiert haben. Wir haben jetzt zwei Lineages, die jeweils Ausgaben für die Dateien report_buzzfeed.csv und report_AppTech.csv erzeugen. Da die Ausgabe dieselben Daten wie die (gefilterte) Eingabe wiederverwendet, müssen wir für jede Ausgabe melden, wie die Eingabe aussieht, damit sie nicht als Duplikate angezeigt werden. Alternativ könnten wir die Beobachtungen wiederverwenden oder das Modell anpassen, um diese Doppelung zu vermeiden. Du könntest stattdessen die folgenden Optionen in Betracht ziehen:
-
Wenn wir unsere Strategie dahingehend ändern, dass wir jedes Mal lesen, wenn auf die Daten zugegriffen wird, anstatt sie in den Speicher zu laden, dann sind die Beobachtungen nicht mehr identisch, wenn sich die Daten zwischen den beiden Schreibvorgängen ändern. Wenn eine Ausgabe Probleme hat, ziehen wir es vor, die Beobachtungen der Eingabe mit dieser Linie zu synchronisieren. Die Wahrscheinlichkeit dieser Situation steigt, wenn du bedenkst, dass jeder Schreibvorgang Stunden und nicht nur Sekunden dauern kann.
-
Die Anwendung
reporting
erzeugt bei jedem Durchlauf alle Ausgaben, aber durch späteres Refactoring kann dies geändert und parametrisierbar gemacht werden. So kann zum Beispiel nur eine Ausgabe erstellt werden, wie die von BuzzFeed. Daher wird jederreporting
Datensatz durch unabhängige Läufe erzeugt. In diesem Fall bilden die Beobachtungen dies bereits angemessen ab, sodass wir die Logik nicht anpassen müssen. Mit anderen Worten: Wenn wir die Beobachtungen eines bestimmten Inputs so oft senden, wie er verwendet wird, um einen Output zu erzeugen, entspricht das der Realität, anstatt zu versuchen, Daten zu optimieren, die wie Duplikate aussehen könnten.
-
Kümmern wir uns um den ersten Punkt und stellen sicher, dass die Lineage die tatsächlichen Verbindungen zwischen den Datenquellen darstellen kann. Um dies auf vereinfachte Weise zu tun, werden wir die in Beispiel 4-19 eingeführte Hilfsfunktion mit zusätzlichen Informationen aktualisieren. Beispiel 4-20 zeigt die neue Version dieser Hilfsfunktion, die jetzt ein Argument enthält, um eine nicht-direkte Zuordnung für jede Eingabe zu liefern. In späteren Abschnitten werden Strategien vorgestellt, mit denen dieser häufige Anwendungsfall viel einfacher, effizienter, wartbarer und genauer behandelt werden kann, z. B. durch die Verwendung von Monkey Patching.
Beispiel 4-20. Generierte Abstammung auf Feldebene basierend auf übereinstimmenden Feldnamen
@staticmethod
def
generate_direct_mapping
(
output_schema
:
Schema
,
input_schemas
:
list
[
tuple
[
Schema
,
dict
]
]
)
:
input_schemas_mapping
=
[
]
output_schema_field_names
=
[
f
[
0
]
for
f
in
output_schema
.
fields
]
for
(
schema
,
extra_mapping
)
in
input_schemas
:
mapping
=
{
}
for
field
in
schema
.
fields
:
if
field
[
0
]
in
output_schema_field_names
:
mapping
[
field
[
0
]
]
=
[
field
[
0
]
]
mapping
.
update
(
extra_mapping
)
if
len
(
mapping
)
:
input_schemas_mapping
.
append
(
(
schema
,
mapping
)
)
return
input_schemas_mapping
Beispiel 4-21 zeigt den letzten Teil der Beobachtungen der reporting
Anwendung.
Beispiel 4-21. Berichtsanwendung macht Daten mit sauberem Code beobachtbar
# First lineage
intraday_delta_mapping
=
{
"
Intraday_Delta
"
:
[
'
Adj Close
'
,
'
Open
'
]
}
a
=
(
all_assets_sc
,
intraday_delta_mapping
)
lineage_buzzfeed
=
OutputDataLineage
(
buzzfeed_sc
,
OutputDataLineage
.
generate_direct_mapping
(
buzzfeed_sc
,
[
(
all_assets_sc
,
intraday_delta_mapping
)
]
)
)
lineage_buzzfeed_exe
=
DataLineageExecution
(
lineage_buzzfeed
,
app_exe
)
all_assets_ms_1
=
DataMetrics
(
DataMetrics
.
extract_metrics_from_df
(
all_assets
)
,
all_assets_sc
,
lineage_buzzfeed_exe
)
buzzfeed_ms
=
DataMetrics
(
DataMetrics
.
extract_metrics_from_df
(
buzzfeed
)
,
buzzfeed_sc
,
lineage_buzzfeed_exe
)
# Second lineage
lineage_apptech
=
OutputDataLineage
(
apptech_sc
,
OutputDataLineage
.
generate_direct_mapping
(
apptech_sc
,
[
(
all_assets_sc
,
intraday_delta_mapping
)
]
)
)
lineage_apptech_exe
=
DataLineageExecution
(
lineage_apptech
,
app_exe
)
all_assets_ms_2
=
DataMetrics
(
DataMetrics
.
extract_metrics_from_df
(
all_assets
)
,
all_assets_sc
,
lineage_apptech_exe
)
apptech_ms
=
DataMetrics
(
DataMetrics
.
extract_metrics_from_df
(
apptech
)
,
apptech_sc
,
lineage_apptech_exe
)
Datenbeobachtungen zur Behebung von Fehlern in der Datenpipeline nutzen
Jetzt können wir unsere Pipeline bereitstellen, ausführen und überwachen, indem wir die Beobachtungen nutzen, die sie bei jedem Durchlauf generiert. Die Low-Level-API erforderte einiges an zusätzlichem Aufwand und Überzeugung. Dennoch sind wir mit den Ergebnissen zufrieden. Jede Minute, die wir für diese Aufgaben aufwenden, bringt uns den 100-fachen Nutzen in Form von vermiedenen Umsatzeinbußen - gemäß der Qualitätskostenregel 1-10-100 -, wenn in der Produktion Probleme auftreten.
Schauen wir uns die Probleme an, die in diesem Abschnitt auf erwähnt wurden, beginnend mit den ingestion
Bewerbungsfehlern, die auftreten können:
- Eingabedateien nicht gefunden
- Die
DataSource
Beobachtungen werden bei jedem Lauf gesendet. Nachdem sie gelesen wurden, senden die fehlgeschlagenen Läufe also keine von ihnen. Selbst für jemanden, der die Anwendungslogik nicht kennt, ist es klar, dass die bisher verwendeten Dateien fehlen. - Tippfehler beim Lesen
- Die
Schema
Beobachtungen werden gesendet und enthalten die Feldnamen, die mit ihrem Typ verbunden sind. Daher ist der erwartete Typ für den Beobachter klar, ohne dass er die Anwendung oder die Dateien der Vormonate aufrufen muss. - Fehler aufgrund von fehlenden Feldern
- Die gleichen Beobachtungen wie bei "Typfehler beim Lesen" helfen dem Beobachter, schnell zu erkennen, welche Felder in den vorherigen Durchläufen vorhanden waren, die im aktuellen Durchlauf fehlen.
- Dateisystem-Fehler
- Die von der Pandas-Bibliothek ausgelöste Ausnahme liefert normalerweise den Pfad, der zu einem Fehler geführt hat. Die verbleibende Information, die dem Beobachter zur Verfügung steht, um das Problem zu identifizieren, ist der Server, den dieser Pfad verwendet hat. Die IP, die in den Serverbeobachtungen auf
DataSource
angegeben ist, gibt sofort Aufschluss darüber, welcher Server mit diesem Pfad verbunden war. - Speicherfehler
-
Dieses Problem tritt vor allem dann auf, wenn die Datenmenge plötzlich ansteigt. Es können viele Fälle in Betracht gezogen werden, aber sie werden meist intuitiv vom Beobachter gehandhabt, indem er
DataMetrics
Beobachtungen verwendet, die die Anzahl der Zeilen, das Schema mit der Anzahl der Felder oder die Anzahl vonDataSources
enthalten. Trotzdem kann es erforderlich sein, dass die Beobachtungen früher als am Ende des Skripts gesendet werden, wie in den folgenden beiden Fällen:-
Eine der Eingabedateien hat eine größere Größe als zuvor. Die Datei wird leicht erkannt, weil keine
DataMetrics
für sie verfügbar ist. -
Der Output ist stark gewachsen, weil alle Dateien gewachsen sind. Der Größenunterschied wird erkannt, weil die
DataMetrics
für die Ausgaben fehlen. Auch dieDataMetrics
für die Eingaben zeigen einen Anstieg der Anzahl der Zeilen.
-
- Speicherplatzfehler im Dateisystem
-
Diese Fehler treten höchstwahrscheinlich beim Schreiben der Ausgabe auf, wenn man bedenkt, dass es hier um die Beobachtbarkeit von Daten geht. Die gleichen Informationen wie bei "Speicherfehler" geben dem Beobachter sofort Aufschluss darüber, warum der verfügbare Speicherplatz nicht mehr ausreicht und welche Dateien nicht geschrieben werden konnten.
Die
Date
ist nicht als Datum parsbar-
In diesem Fall hat das Schema die Typbeobachtung für das Datumsfeld von
date
auf etwas anderes geändert, z.B.str
oderobject
.
Date
Spalte enthält keine Werte für das aktuelle Jahr/Monat-
Die
DataMetrics
Beobachtungen beinhalten den minimalen und maximalen Zeitstempel, der einen sofortigen Einblick in den Unterschied zwischen der Ausführungszeit und den verfügbaren Daten gibt. Nehmen wir zum Beispiel an, der maximale Zeitstempel liegt zwei Tage vor dem Zeitpunkt, an dem die Datenquelle gelesen wird, dann können die Daten als zu alt angesehen werden, wenn der akzeptable Zeitraum nur einen Tag beträgt.
Date
Spalte enthält Werte in der Zukunft-
Das ist ganz einfach, denn die gleiche Art von Beobachtungen wie beim vorherigen Ausfall aufgrund fehlender Werte für den aktuellen Monat gibt dir diese Sichtbarkeit.
Symbol
Kategorien geändert-
Wenn wir nur numerische
DataMetrics
betrachten, können wir diesen Fall schnell anhand der Anzahl der Kategorien erkennen, die in der Ausgabedatei wachsen würden. Eine oder einige der Dateien wären nicht mehr konsistent, da sie sich auf verschiedene Kategorien beziehen würden.
Dann müssen wir uns überlegen, unter welchen Umständen die
reporting
Anwendung die Ingestion-Anwendung als fehlgeschlagen betrachten könnte, und gegebenenfalls, welchereporting
Anwendung ein Beobachter verwenden könnte. Zu diesen Situationen gehören die folgenden:Die monatliche aggregierte Datei ist nicht verfügbar
-
Die DataSource-Beobachtung oder
DataMetrics
wurde von der Anwendung nicht gesendet.ingestion
Anwendung gesendet.
Die Aggregation verwendet fehlende Felder, wie z. B.
Close
-
Im Schema der monatlichen Daten, die von der Anwendung
ingestion
gesendet werden, fehlen diese Felder ebenfalls.
Fehler bei Lese-/Schreibzugriff, Größe und Platz
-
Für den
reporting
Beobachter gelten die gleichen Lösungen wie für deningestion
Beobachter. Es gibt keine Verzerrung der Informationen zwischen Teams oder Teammitgliedern.
APCX
undENFA
Symbole-
Die Anzahl der Kategorien, die der
ingestion
Beobachter meldet, hat sich geändert, was in einigen Fällen einen Hinweis darauf gibt, was passiert ist. Wir könnenDataMetrics
jedoch erweitern, um auch nicht-numerische Beobachtungen zu melden und die Kategorien zu melden.
Fehlende Werte in
Adj Close
oderOpen
führen zu abnormalen Zahlen-
Die
DataMetrics
"Anzahl der Nullen" deckt diesen Fall ab, denn wenn die Anzahl der Nullen größer als Null ist, wird die Berechnung derIntraday_Delta
NA
s zurückgeben.
Falsches Datum im monatlichen Vermögen
-
Die gleichen Lösungen, die der
ingestion
Beobachter bei Datumsfehlern anwendet, gelten auch hier. Die Anwendung könnte zum Beispiel den Mindest- und Höchstwert für die SpalteDate
im Vergleich zum aktuell gemeldeten Monat verwenden.
-
Wir sind jetzt in der Lage, mit verschiedenen Situationen umzugehen, in denen die Daten die Ursache für ein Problem sind. Ohne dieses Wissen wären in solchen Situationen lange, stressige Meetings nötig, um sie zu verstehen, und anstrengende Stunden oder Tage für die Fehlersuche, die sich zu Monaten ausweiten könnten, weil wir in der Produktion nicht auf die Daten zugreifen können.
Die bisher diskutierten Probleme sind solche, von denen wir wissen, dass sie auftreten können. Es gibt jedoch noch viele andere Probleme, die sich in der Pipeline ausbreiten können und über die wir noch wenig wissen - die unbekannten Unbekannten. Zum Beispiel sind die Werte eines der Bestände für einen vergangenen Monat falsch, weil beim CSV-Export Fehler in die Daten eingefügt wurden. Dieses Problem wird bei mindestens einer deiner Anwendungen auftreten, und bei anderen ähnlichen.
Aufgrund dieser Unbekannten dürfen die Datenbeobachtungen nicht nur auf vordefinierte Fälle beschränkt werden, sondern die Anwendungen müssen so viele Beobachtungen wie möglich melden - unter Berücksichtigung möglicher Einschränkungen bei den Rechenressourcen und der Zeit -, um einen Überblick über die erwartete oder nicht erfüllte Situation zu erhalten. In diesem Beispiel wäre die Verteilung der monatlichen Bestandswerte später für den Vergleich mit anderen Monaten nützlich, und sie könnten Hinweise darauf geben, ob die Werte gleich oder ähnlich sind.
Der Vorteil der Low-Level-Protokollierung besteht darin, dass du völlig flexibel entscheiden kannst, was du sichtbar machen möchtest. Beispiele dafür sind benutzerdefinierte Metriken und Key Performance Indicators (KPIs).
Nicht alle Aufträge sind gleich; jedes Unternehmen, jedes Projekt und jede Anwendung hat ihre eigenen Besonderheiten. Du wirst wahrscheinlich bestimmte Kennzahlen kontrollieren, egal ob sie mit verbrauchten oder produzierten Daten verknüpft sind. Eine solche Kennzahl für eine Tabelle könnte die Summe der Anzahl der Artikel multipliziert mit den Kosten pro Einheit abzüglich des von einem Webservice erhaltenen Betrags sein, count(items) * cost_per_unit
. Das Ergebnis muss immer größer als Null sein. Dies kann einfach in den Quellcode eingefügt werden, muss aber vom Ingenieur hinzugefügt werden, da es sich um spezifische Metriken handelt, die mit der Geschäftslogik (und der Semantik der Spalten) verbunden sind.
Ein weiterer Grund für die Anpassung von Beobachtungen sind KPIs - Zahlen, die von den Stakeholdern angefordert werden und für das zugrunde liegende Geschäft wichtig sind. KPIs werden oft in regelmäßigen Abständen gemeldet oder auf Anfrage berechnet und in zufälligen oder festen Abständen verwendet. Ihre Bedeutung ist jedoch so groß, dass die Stakeholder hohe Erwartungen an sie stellen und wenig bis keine Zeit haben, auf Korrekturen zu warten. Deshalb musst du auch Transparenz darüber schaffen, wie sich die KPIs im Laufe der Zeit entwickeln, denn wenn ein Stakeholder Zweifel daran hat, beginnt die Zeit in dem Moment zu ticken, in dem er dich nach ihrer Korrektheit fragt. Um die Reaktionsfähigkeit zu erhöhen, musst du generell wissen, wie sich die KPIs entwickeln. Du musst erkennen, wie sie sich verändern, bevor sie es tun, und verstehen, warum sie sich aufgrund ihrer historischen Werte und ihrer Herkunft verändern.
Wie du bei der Aktualisierung der Anwendung reporting
vielleicht schon vermutet hast, ist die Definition der API - das Modell, die Kodierung und die Funktionen - keine Aufgabe für jede einzelne Anwendung. Vielmehr musst du sie standardisieren und anwendungsübergreifend wiederverwenden. Durch die Standardisierung wird der Arbeitsaufwand pro Anwendung reduziert. Noch wichtiger ist, dass die Beobachtungen unabhängig von der Anwendung einheitlich sind, um den Beobachtern die Arbeit zu erleichtern, das Verhalten der anderen Anwendungen, die an der zu analysierenden Pipeline beteiligt sind, abzugleichen.
Die Standardisierung ist auch hilfreich, um Entitäten anwendungsübergreifend wiederzuverwenden, wie z. B. assets_monthly DataSource
, das die Ausgabe der Anwendung ingestion
und die Eingabe der Anwendung reporting
ist. Durch die einheitliche Darstellung der Beobachtungen kannst du den Status der gesamten Pipeline konsolidieren, indem du die Entitäten anwendungsübergreifend wiederverwendest.
Ein Teil der Architektur zur Unterstützung der Datenbeobachtung muss ein externes System umfassen, das die Beobachtungen zusammenfasst, um systematisch einen globalen Überblick zu schaffen. Durch Beobachter, die sich auf diese aggregierte Sicht verlassen und auf dieser Grundlage handeln, kann das System einige der Aktionen ausführen, die derzeit von den Beobachtern durchgeführt werden - hier kommt das maschinelle Lernen ins Spiel.
Fazit
In diesem Kapitel haben wir uns umfassend mit der Beobachtbarkeit von Daten an der Quelle und ihrer Bedeutung für die Verbesserung der Datenqualität und der operativen Exzellenz befasst. Wir haben uns mit dem Konzept der Erzeugung von Datenbeobachtungen im Code von Datenanwendungen befasst und dabei hervorgehoben, wie wichtig es ist, den Code zur Erzeugung von Beobachtungen in verschiedene Komponenten der Anwendung einzubauen. Zu diesen Komponenten gehören die Anwendung selbst, die genutzten Daten, ihre Beziehungen und ihr Inhalt.
Außerdem haben wir die Entwicklung einer Python-API zur Datenbeobachtung auf niedriger Ebene besprochen, die Entwicklern ein leistungsfähiges Toolset zur nahtlosen Integration von Datenbeobachtungsfunktionen in ihre Anwendungen bietet. Mit dieser API können Praktiker/innen Datenbeobachtungen erstellen, Datenflüsse verfolgen und die Zuverlässigkeit und Genauigkeit ihrer Daten sicherstellen.
Um diese Konzepte zu untermauern, haben wir ein voll funktionsfähiges Beispiel vorgestellt, das die Umwandlung einer in Python geschriebenen, nicht datenbeobachtbaren Datenpipeline in eine robuste und datenbeobachtungsgesteuerte Pipeline zeigt. Durch die Nutzung der speziellen Python-API für Datenbeobachtung haben wir gezeigt, wie Datenbeobachtungen generiert, erfasst und genutzt werden können, um die Transparenz zu erhöhen, Probleme zu erkennen und kontinuierliche Verbesserungen voranzutreiben.
Die in diesem Kapitel vorgestellten Prinzipien und Strategien dienen als Grundlage für die Integration von Datenbeobachtung in die Struktur von Datenanwendungen. Wenn Unternehmen diese Praktiken anwenden, können sie sicherstellen, dass ihre Datenpipelines robust und zuverlässig sind und wertvolle Erkenntnisse mit einem hohen Maß an Vertrauen liefern können.
Trotz der hohen Anpassungsfähigkeit und Flexibilität von Low-Level-Logging kann der anfängliche Aufwand die Einführung behindern. Dieses Argument gilt auch für die Einführung von Tests. Deshalb ist es wichtig, die Komplexität der Nutzung auf dieser Ebene zu vereinfachen. Außerdem müssen wir nach alternativen Ansätzen suchen, die das Low-Level-Logging ergänzen und gleichzeitig die breite Akzeptanz der Datenbeobachtung in Teams und bei Einzelpersonen fördern. In den folgenden Abschnitten werden wir uns mit diesem Thema befassen und mit der Erforschung ereignisbasierter Systeme beginnen.
1 Das hat mich dazu gebracht, über die datenbeobachtungsbasierte Entwicklungsmethode nachzudenken.
Get Grundlagen der Beobachtbarkeit von Daten now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.