Kapitel 4. Datenbeobachtungen generieren

Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com

Wie in Kapitel 3 erläutert, kombiniert die Datenbeobachtung Technologie und Menschen, um Informationen über den Zustand eines Systems aus der Datenperspektive und die Erwartungen an diesen Zustand zu sammeln. Diese Informationen werden dann genutzt, um das System anpassungsfähiger oder widerstandsfähiger zu machen.

In diesem Kapitel wird erklärt, wie du die Praxis der Datenbeobachtung anwenden kannst. Ich beginne mit der "Datenbeobachtung an der Quelle", einer Methode zur Einführung von Erfassungsstrategien in deine tägliche Datenarbeit, und zeige dir, wie du ihre Auswirkungen auf die Effizienz minimieren kannst. Dann geht das Kapitel auf die Umsetzung von Erwartungen ein, die sich an den Lebenszyklus der Softwarebereitstellung anschließen, z. B. kontinuierliche Integration und kontinuierliches Deployment (CI/CD).

Wie bei jeder neuen Praxis und Technologie musst du die Eintrittsbarriere senken, um die Akzeptanz der Datenbeobachtung zu erhöhen; so haben die Menschen weniger Grund, gegen die Veränderung zu argumentieren. Aber auch die Menschen sind Teil der Lösung, denn ihre Beteiligung an dem Prozess ist entscheidend, um ihre Erwartungen zu ermitteln und die Regeln festzulegen. Zu diesem Zweck lernst du verschiedene Möglichkeiten kennen, wie du den Aufwand für die Erstellung von Beobachtungen verringern kannst und wie du sie zum richtigen Zeitpunkt im Entwicklungszyklus einführen kannst.

An der Quelle

In Kapitel 2 wurden die Quellen und Arten von Informationen erläutert, die dem Beobachter helfen. Aber wie generierst und sammelst du die Informationen aus diesen Quellen?

Es beginnt mit der Beobachtbarkeit der Daten an der Quelle. Der Begriff "Quelle" bezieht sich auf die Anwendungen, die für das Lesen, Umwandeln und Schreiben von Daten verantwortlich sind. Wie in Kapitel 3 erläutert, können diese Anwendungen entweder die Ursache von Problemen sein oder die Mittel, um sie zu lösen. Außerdem liegen die Anwendungen im Gegensatz zu den Daten selbst in unserer Kontrolle als Ingenieure und Organisationen.

Die Methode der Datenbeobachtung an der Quelle dreht sich also um die Fähigkeit der Anwendungen, Datenbeobachtungen nach dem in Kapitel 2 behandelten Modell zu generieren; mit anderen Worten, die Anwendungen werden datenbeobachtbar gemacht.

Reine Daten- und Analysebeobachtungen - wie z. B. Datenquellen, Schemata, Linien und Metriken - stehen im Zusammenhang mit Lese-, Transformations- und Schreibaktivitäten. Die in diesem Abschnitt beschriebenen Strategien gehen auf diese Aktivitäten ein und erklären, was bei ihrer Durchführung zu beachten ist.

Erzeugen von Datenbeobachtungen an der Quelle

Die Datenbeobachtung an der Quelle beginnt mit der Erstellung zusätzlicher Informationen, die das Verhalten bestimmter Aktivitäten mit Daten erfassen: Lesen, Umwandeln und Schreiben. Entwickler können zum Beispiel Zeilen von Protokollen hinzufügen, die die Informationen enthalten, die sie benötigen, um Einblicke in die Aktivitäten ihrer Anwendung zu erhalten. In der Anleitung werden die in Kapitel 2 beschriebenen Kanäle - also die Logs, Metriken und Traces - verwendet, um die Beobachtungen zu vermitteln, die in einem Logging-System zentralisiert werden können.

Im nächsten Abschnitt lernst du, wie du Datenbeobachtungen im JSON-Format erstellst, die in lokalen Dateien, einem lokalen Dienst, einem entfernten (Web-)Dienst oder ähnlichen Zielen gesammelt (veröffentlicht) werden können. Nach dem Kernmodell der Datenbeobachtung in Kapitel 2 würden die Datenquelle und die Schema-Entität einer Postgres-Tabelle zum Beispiel wie in Beispiel 4-1 aussehen.

Beispiel 4-1. Beispiel für als JSON kodierte Datenbeobachtungen
{
  "id": "f1813697-339f-5576-a7ce-6eff6eb63249",
  "name": "gold.crm.customer",
  "location": "main-pg:5432/gold/crm/table",
  "format": "postgres"
}
{
  "id": "f1813697-339f-5576-a7ce-6eff6eb63249",
  "data_source_ref": {"by_id": "e21ce0a8-a01e-5225-8a30-5dd809c0952e"},
  "fields": [
  { "name": "lastname", "type": "string", "nullable": true }, 
  { "name": "id", "type": "int", "nullable": false }
  ]
}

Die Möglichkeit, Datenbeobachtungen in demselben Modell auf einer zentralen Plattform zu sammeln, ist der Schlüssel, um den Wert der Datenbeobachtung im großen Maßstab zu generieren, z. B. über Anwendungen, Teams und Abteilungen hinweg. Deshalb ist die Verwendung des Kernmodells der Datenbeobachtung wichtig, um Datenbeobachtungen einfach zu aggregieren, insbesondere entlang von Linien.

Sehen wir uns an, wie man Datenbeobachtungen in Python mithilfe einer Low-Level-API erzeugt, die zur Einführung von Abstraktionen auf höherer Ebene verwendet wird (die in den nächsten Kapiteln behandelt werden).

Low-Level-API in Python

Diese Strategie, eine Low-Level-API zu verwenden, erfordert viel Zeit und Engagement von dir, da du jede Beobachtung explizit erstellen musst. Allerdings bietet dir diese Strategie auch die größte Flexibilität, da sie keine Abstraktionen auf höherer Ebene beinhaltet.

Auf der anderen Seite erfordert die Unterstützung der Datenbeobachtbarkeit auf dieser Ebene, insbesondere während der Erkundung und Wartung, dass der Entwickler konsequent ist und immer daran denkt, was er in der Produktion beobachten möchte (zum Beispiel sollte jeder erfahrene Entwickler so viele Zeilen für Protokolle und Prüfungen produzieren wie für die Geschäftslogik).1 Während der Entwicklung müssen die Entwickler dann die Änderungen an der Logik oder dem Verhalten der Anwendung durch die Erstellung der zugehörigen Beobachtungen sichtbar machen. Beispiele für solche Beobachtungen sind eine Verbindung zu einer neuen Tabelle, die Erstellung einer neuen Datei oder die Änderung einer Struktur mit einem neuen Feld.

In den folgenden Abschnitten wirst du ein komplettes Beispiel für eine in Python geschriebene Datenanwendung durchgehen, die neben der Datennutzung auch Datenbeobachtungen erzeugt, indem du Folgendes tust:

  • Verstehen von Anwendungen ohne Datenbeobachtungsmöglichkeit

  • Hinzufügen von Anweisungen zur Erzeugung von Datenbeobachtungen und deren Zweck

  • Erkenntnisse über die Vor- und Nachteile dieser Strategie gewinnen

Beschreibung der Datenpipeline

Im weiteren Verlauf des Kapitels werde ich eine in Python geschriebene Datenpipeline verwenden, mit der wir Daten beobachtbar machen. Der Code der Pipeline auf GitHub ermöglicht es dir, die Beispiele in diesem Kapitel auszuführen. Sie nutzt die Pandas-Bibliothek, um CSV-Dateien zu verarbeiten, und besteht aus zwei Anwendungen: Ingestion und Reporting, wie in Abbildung 4-1 dargestellt.

Structure of the example pipeline
Abbildung 4-1. Struktur der Beispielpipeline

Beide Anwendungen (ingestion und reporting) verwenden Python und Pandas und nutzen Daten aus der Datenquelle "Tägliche Aktienkurse", um zwei nachgelagerte Berichte (BuzzFeed-Aktien und AppTech) zu erstellen.

Die Anwendung ingestion liest die CSV-Dateien mit den täglichen Aktienkursen, die das Börsenteam jeden Monat bereitstellt. Das Team hat die Dateien nach Jahr und Monat unterteilt und dann die Preise in monatlichen Ansichten zusammengeführt, die in einer separaten Datei gespeichert werden, wie in ingestion in Beispiel 4-2.

Beispiel 4-2. Anwendung zur Datenübernahme
import pandas as pd

AppTech = pd.read_csv( 1
   "data/AppTech.csv",
   parse_dates=["Date"], 2
   dtype={"Symbol": "category"}, 3
)
Buzzfeed = pd.read_csv(
   "data/Buzzfeed.csv",
   parse_dates=["Date"],
   dtype={"Symbol": "category"},
)

monthly_assets = pd.concat([AppTech, Buzzfeed]) \ 4
   .astype({"Symbol": "category"})
monthly_assets.to_csv(
   "data/monthly_assets.csv", index=False
)

1 Die Anwendung ingestion verarbeitet zwei CSV-Dateien für die folgenden Bestände:

  • BuzzFeed

  • AppTech

Die Anwendung liest die Dateien aus dem Ordner "../data", als Referenz; eine Zeile in diesen Dateien sieht so aus:

1 Date Symbol Open High Low Close AdjClose Volume
2 2022-01-03 APCX 14.725 15.200 14.25 14.25 14.25 1600

Während der Erkundung stellte unser Dateningenieur fest, dass die Dateien zwei Felder enthalten, die vorverarbeitet werden müssen. 2 Das Feld Date muss als Datum geparst werden, 3 und das Feld Symbol müssen als Kategorie behandelt werden.

Die Anwendung liest und verarbeitet die täglichen Bestände, bevor sie sie im Speicher zur Verfügung stellt. Dann fügt die Anwendung sie in einer einzigen Variablen zusammen, die in diesem Fall ein neuer pandas DataFrame ist, der alle Kategorien in Symbol enthält.

4 Das Skript schreibt das Ergebnis in denselben Ordner als neue Datei namens monthly_assets.csv. Auf diese Weise können andere Analysten oder Datentechniker dort beginnen, wenn sie alle täglichen Aktienkurse für weitere Analysen benötigen.

Nachdem dieses erste Skript ausgeführt wurde und die Datei verfügbar ist, können die übrigen Pipeline-Skripte ausgeführt werden, um die BuzzFeed- und AppTech-Aktienberichte zu erstellen. In diesem Beispiel gibt es nur ein Skript, die Berichts-Python-Datei, die in Beispiel 4-3 gezeigt wird.

Beispiel 4-3. Anwendung zur Datenmeldung
import pandas as pd

all_assets = pd.read_csv("data/monthly_assets.csv", parse_dates=['Date']) 1

apptech = all_assets[all_assets['Symbol'] == 'APCX'] 2
buzzfeed = all_assets[all_assets['Symbol'] == 'BZFD']

buzzfeed['Intraday_Delta'] = buzzfeed['Adj Close'] - buzzfeed['Open'] 3
apptech['Intraday_Delta'] = apptech['Adj Close'] - apptech['Open']

kept_values = ['Open', 'Adj Close', 'Intraday_Delta']

buzzfeed[kept_values].to_csv("data/report_buzzfeed.csv", index=False) 4
apptech[kept_values].to_csv("data/report_appTech.csv", index=False)

Diese Anwendung führt die folgenden Aktionen durch:

1 Liest die von der vorherigen Anwendung erstellte Datenquelle

2 Filtert die Daten für die beiden Berichte heraus, die er erstellt: BuzzFeed und AppTech.

3 Berechnet die Intraday_Delta Scores, also die täglichen Bestandsentwicklungen, die zu melden sind.

4 Meldet die Open, Adj Close, und die berechneten Werte Intraday_Delta in den entsprechenden Dateien für jeden Bestand.

Um die Pipeline ordnungsgemäß auszuführen, ist es wichtig, die Abhängigkeiten zwischen den Reporting- und Ingestion-Anwendungen zu beachten. Das heißt, du musst die ingestion Anwendung erfolgreich ausgeführt werden, bevor du die reporting Anwendung ausführst. Andernfalls wird die Pipeline fehlschlagen. Eine Lösung, um dies zu erreichen, ist die Verwendung eines Orchestrators, der ingestion vor reporting ausführt, wie z. B. Dagster und Airflow. Der Orchestrator ist eigentlich eine weitere Anwendung, die man konfigurieren kann, indem man die Abhängigkeiten zwischen den Anwendungen auf der Datenebene explizit festschreibt. Die Anwendungen selbst wissen aber immer noch nichts über ihre nachgelagerten Abhängigkeiten.

Die Kehrseite der Hardcodierung der Abhängigkeiten in einem Orchestrator ist, dass es sich um ein neues Asset handelt, das gepflegt werden muss (z. B. die Genauigkeit der expliziten Abhängigkeiten). Außerdem gibt es zusätzliche Einschränkungen bei der Erstellung einer Pipeline, da die Teams unabhängig bleiben müssen und daher nicht einfach eine bestehende Pipeline aktualisieren können, um ihre eigenen Bedürfnisse zu erfüllen. Daher muss eine Erweiterung auf einer höheren Ebene erstellt werden, indem eine neue, separate DAG hinzugefügt wird, die die expliziten Abhängigkeiten aufheben könnte.

Kehren wir zu unserer Pipeline zurück und besprechen wir die funktionalen Abhängigkeiten zwischen den Anwendungen. Das heißt, dass ingestion erfolgreich ausgeführt werden muss, bevor reporting ausgeführt werden kann. Aber was bedeutet erfolgreich?

Definition des Status der Datenpipeline

Um festzustellen, ob eine Ausführung einer Datenpipeline erfolgreich ist, werde ich die entgegengesetzte Frage analysieren: Welche Arten von Fehlern können in der ingestion auftreten?

Explizites Scheitern
Dies führt zu einem Absturz der Anwendung. Diese Art von Fehler ist für den Orchestrator einfach zu handhaben, wenn du einen verwendest, denn es ist ein Flag, das die nächste Anwendung nicht auslöst: in unserem Beispiel die Anwendung reporting.
Stiller Ausfall
In diesem Fall wird die Anwendung beendet, ohne dass z. B. ein Fehlercode oder ein Protokoll erscheint. Da sie nicht wie erwartet läuft, musst du den Begriff der Erwartungen berücksichtigen, der in Kapitel 2 eingeführt wurde.

Ein Beobachter der ingestion Anwendung würde expliziten Fehlern wie diesen begegnen:

Datei nicht gefunden Fehler
Tritt auf, wenn eine der Datendateien, wie z. B. Buzzfeed.csv, im Ordner nicht verfügbar ist, weil sie umbenannt oder in Kleinbuchstaben geändert wurde oder die Datei nicht erstellt wurde, bevor die Anwendung ingestion ausgeführt wurde.
Tippfehler (TypeError)
Tritt auf, wenn einige Werte nicht mit den Typen übereinstimmen, die der Funktion read_csv zur Verfügung gestellt werden, z. B. wenn das Symbol eine Kategorie sein sollte.
Fehler bei hartkodierten Namen
Treten auf, wenn eines der Felder, die explizit in dem Code verwendet werden, um auf die Werte zuzugreifen, wie z.B. der Spaltenname Date in Beispiel 4-3 1 und Symbol in diesem Fall, nicht vorhanden ist oder der Name geändert wurde.
Dateisystem-Fehler
Passiert, wenn die Dateien nicht lesbar sind oder der Ordner für den Benutzer, der die Anwendung ausführt, nicht beschreibbar ist.
Speicherfehler
Passiert, wenn die Dateien so groß werden, dass der der Anwendung zugewiesene Speicher nicht mehr ausreicht.
Systemfehler
Wird ausgelöst, wenn die Festplatte keinen Platz mehr hat, um das aggregierte Ergebnis zu schreiben.

Aber aus der Perspektive des Ingenieurs, der es beobachtet, zeigen die folgenden Beispiele stille Fehler:

  • Die Spalte Date kann nicht als Datum geparst werden, weil sie falsch formatiert ist, das Muster geändert wurde oder die Zeitzone nicht konsistent ist. In diesem Fall ist die Spalte nicht mehr eine datetime, sondern eine object.

  • Die Spalte Date enthält Werte, aber nicht für den aktuellen Monat. Alle Werte sind vergangene oder zukünftige Daten.

  • Die Spalte Date enthält Werte in der Zukunft, weil der Generator möglicherweise später ausgeführt wurde und Informationen über die Zukunft generiert hat, um sie mit dem Monat zu vergleichen, in dem er verarbeitet wird. Dies kann später zu Duplikaten oder inkonsistenten Werten für dieselben Daten führen und einige Aggregationen fehlschlagen lassen.

  • Die Kategorie Symbol ändert sich aus verschiedenen Gründen, z. B. wegen eines Tippfehlers oder einer Änderung der Großschreibung, der Länge oder der Referenz. Diese Kategorie wird in allen Dateien verwendet und als Kategorie in die Ausgabedatei geschrieben.

Außerdem könnte die Anwendung reporting die Anwendung ingestion als fehlgeschlagen betrachten, weil sie die folgenden Fehlschläge in reporting provoziert hat:

  • Die monatliche Aggregatdatei wurde nicht geschrieben, als die Berichterstattung begann, so dass sie nicht verfügbar ist, wenn das Berichtstool in einem bestimmten Intervall ausgeführt werden soll.

  • Eines der Felder, die zum Filtern in der Arithmetik verwendet werden, ist nicht verfügbar oder sein Name wurde geändert.

  • Die gleichen Fehler treten beim Lese-/Schreibzugriff, der Größe und dem Speicherplatz auf.

Außerdem kann reporting lautlos fehlschlagen, weil die stillen Ausfälle der ingestion

  • Die Symbole APCX und ENFA sind in der bereitgestellten Datei nicht verfügbar.

  • Adj Close oder Open fehlende Werte sind, was zu Müll in der Intraday_Delta Ausgabe führt. Dieses Problem kann auch ein expliziter Fehler sein. In diesem Fall werden die Werte zu NA von Pandas.

  • Die aggregierte Datei enthält keine Informationen über den aktuellen Monat, aber die Daten liegen in der Vergangenheit.

Da jeder dieser Fehler auftreten kann, musst du wissen, wann er auftritt, und - noch besser - du musst ihn frühzeitig erkennen, um zu verhindern, dass er von der Anwendung ingestion weitergegeben wird (siehe "Fail Fast and Fail Safe").

Die expliziten Fehler sollten als Entwicklungspraxis bereits sichtbar gemacht worden sein, um diese Fehler explizit abzufangen (try…except in Python). Damit ein Beobachter jedoch stille Fehler erkennen und entdecken kann, muss die Anwendung die entsprechenden Beobachtungen liefern.

Datenbeobachtungen für die Datenpipeline

In diesem Abschnitt werde ich einen Überblick über die Datenbeobachtungen geben, die die Datenpipeline erzeugen muss. Dazu werfen wir einen kurzen Blick auf Abbildung 4-2, die zeigt, wie eine Low-Level-API das in Kapitel 2 vorgestellte Modell umsetzt. Es ist interessant festzustellen, dass sie eine ähnliche Struktur und sogar einige Entitäten (beschriftet) haben; in den nächsten Abschnitten werde ich auf jeden Teil einzeln eingehen, um diese Fakten hervorzuheben.

The observations for ingestion and reporting and their similarities
Abbildung 4-2. Die Beobachtungen zur Aufnahme und Meldung und ihre Ähnlichkeiten (eine größere Version dieser Abbildung ist unter https://oreil.ly/TaQGV verfügbar)

In diesem Diagramm siehst du, dass die Entitäten mit den Großbuchstaben A, B, C und D in Kreisen gekennzeichnet sind. Die "A"-Datenquellen heben die Beobachtungen hervor, die von der Anwendung ingestion über die von ihr erzeugten Daten und von reporting beim Lesen der Daten gemacht werden, wodurch die implizite Abhängigkeit deutlich wird.

In der Tat erzeugen beide Anwendungen mehrere ähnliche Beobachtungen, die alle Abhängigkeiten darstellen, die sie miteinander verbinden. In Abbildung 4-2 sind die folgenden ähnlichen Beobachtungen ebenfalls hervorgehoben:

  • Die "B"-Entitäten beobachten den Server, von dem die Daten abgerufen wurden.

  • Die "C"-Entitäten beobachten den Benutzer beim Ausführen von Befehlen.

  • Die "D"-Entitäten beachten das Schema der Daten, die durch die Ingestion erzeugt werden, so wie sie vom Reporting gelesen werden.

Schauen wir uns nun an, was wir zum Code der Anwendung hinzufügen müssen, um die in Abbildung 4-2 gezeigten Beobachtungen zu erzeugen. Da der Code in Python geschrieben ist, verwenden wir das Modul logging, um die in JSON kodierten Beobachtungen zu drucken.

Erzeugen von kontextbezogenen Daten Beobachtungen

In diesem Abschnitt behandle ich den Code, den benötigt, um Beobachtungen über den Ausführungskontext der in Abbildung 4-3 gezeigten ingestion Anwendung zu generieren (beachte, dass reporting denselben Code wiederverwenden kann).

Data observations about the execution context of the ingestion application
Abbildung 4-3. Datenbeobachtungen über den Ausführungskontext der Anwendung ingestion

Füge den Code in Beispiel 4-4 am Anfang der Datei ein, um die Beobachtungen für die Anwendung ingestion zu erzeugen.

Beispiel 4-4. Erzeugen von Datenbeobachtungen über die Ingestion-Anwendung
app_user = getpass.getuser() 1
repo = git.Repo(os.getcwd(), search_parent_directories=True) 2
code_repo = repo.remote().url
commit = repo.head.commit
code_version = commit.hexsha
code_author = commit.author.name
application_name = os.path.basename(os.path.realpath(__file__)) 3
application_start_time = datetime.datetime.now().isoformat()

1 Abrufen von Informationen zum Benutzernamen.

2 Abrufen von Git-Informationen.

3 Abrufen von Informationen über die Ausführung der Anwendung.

Die zusätzlichen Anweisungen in Beispiel 4-5 erstellen Variablen für die Beobachtungen, aber es wird noch nichts mit ihnen gemacht. Um die Informationen zu protokollieren, wie bereits erwähnt hat, verwenden wir eine JSON-Darstellung des Informationsmodells, die wie in Beispiel 4-5 kodiert ist.

Beispiel 4-5. Modellierung von Datenbeobachtungen über die Ingestion-Laufzeit
application_observations = { 
   "name": application_name,
   "code": {
       "repo": code_repo,
       "version": code_version,
       "author": code_author
   },
   "execution": {
       "start": application_start_time,
       "user": app_user
   }
}

Dieser Code erstellt das JSON, das aus allen bisher erstellten Beobachtungen besteht. In diesem Abschnitt geht es jedoch um die Verwendung einer Low-Level-API für die Beobachtung von Daten. Im weiteren Verlauf werden wir auf ein ähnliches Muster stoßen, was uns die Möglichkeit gibt, Funktionen zu erstellen, um den Code zu vereinfachen und ihn in den Anwendungen ingestion und reporting oder in anderen Python-Anwendungen gemeinsam zu nutzen.

Um eine API zu erstellen, erstellen wir ein Modell, das das Beobachtungskernmodell in JSON nachahmt, jede Entität in eine Klasse verwandelt und Beziehungen in Referenzen umwandelt (siehe Beispiel 4-6 ).

Beispiel 4-6. Modellierung der Anwendungsdatenbeobachtungen mit speziellen Klassen
    class Application: 1
       name: str
    
       def __init__(self, name: str, repository: ApplicationRepository) -> None:
           self.name = name
           self.repository = repository
    
       def to_json(self):
           return {"name": self.name, "repository": self.repository.to_json()}
    
    class ApplicationRepository: 2
       location: str
    
       def __init__(self, location: str) -> None:
           self.location = location
    
       def to_json(self):
           return {"location": self.location}
    
    app_repo = ApplicationRepository(code_repo)
    app = Application(application_name, app_repo)

Das bedeutet, dass die Anwendungsentität eine Application 1 Klasse mit einem Eigenschaftsnamen, der den Namen der Datei als application_name Variable enthalten kann, und einen Verweis auf eine ApplicationRepository Instanz haben muss. Diese ApplicationRepository Entität wird als ApplicationRepository 2 Klasse kodiert, deren Eigenschaft location als Git-Remote-Speicherort festgelegt ist. Diese Struktur erleichtert den Aufbau des Modells und die Erstellung einer JSON-Darstellung, die wiederverwendbar ist und zu einer Standardisierung führen kann.

Ein zusätzlicher Vorteil der Kodierung von Konzepten in API-Klassen ist, dass sie die Verantwortung haben, Helfer vorzuschlagen, um zugehörige Beobachtungen zu extrahieren, wie in Beispiel 4-7 1.

Beispiel 4-7. Nutzung von Klassen zur Definition von Helfern für modellierte Entitäten
class ApplicationRepository:
   location: str

   # [...]

   @staticmethod
   def fetch_git_location(): 1
       import git
       code_repo = git.Repo(os.getcwd(), search_parent_directories=True).remote().url
       return code_repo


class Application:
   name: str

   # [...]
   @staticmethod
   def fetch_file_name(): 1
       import os
       application_name = os.path.basename(os.path.realpath(__file__))
       return application_name


app_repo = ApplicationRepository(ApplicationRepository.fetch_git_location())
app = Application(Application.fetch_file_name(), app_repo)

Diese Strategie könnte ein einfacher Weg sein, um das Modell umzusetzen. Wir bevorzugen jedoch einen anderen Ansatz, der die Verbindungen zwischen den Entitäten schwächt. In Beispiel 4-8 werden alle Informationen in einer JSON-Datei protokolliert, wobei die Entitäten in einem Informationsbaum mit Application an der Wurzel angeordnet sind. Diese Kodierung zwingt uns dazu, alle Beobachtungen zu erstellen, bevor wir die Wurzel, also die Application Instanz, protokollieren. Der Application Konstruktor würde dann so aussehen wie in Beispiel 4-8.

Beispiel 4-8. Aufgeblähter Konstruktor für die Anwendung ohne Trennung von Belangen
class Application:
    name: str
    def __init__(self, name: str, version: ApplicationVersion, 
                 repo: ApplicationRepository, execution: ApplicationExecution, 
                 server: Server, author: User) -> None:
        pass

Um diese Komplexität und Einschränkung zu vermeiden, ist es besser, die Abhängigkeiten zwischen den Entitäten umzukehren. Anstatt die Application mit ihren ApplicationVersion oder ApplicationRepository enthält, erstellen wir Application allein und fügen dann einen schwachen Verweis auf sie aus ApplicationVersion und ApplicationRepository. Beispiel 4-9 zeigt, wie dieses Modell aussehen würde.

Beispiel 4-9. Umkehrung der Abhängigkeiten zwischen Entitäten und Einführung id
class ApplicationRepository:
   location: str
   application: Application
   id: str

   def __init__(self, location: str, application: Application) -> None:
       self.location = location
       self.application = application
       id_content = ",".join([self.location, self.application.id])
       self.id = md5(content.encode("utf-8")).hexdigest() 1

   def to_json(self):
       return { "id": self.id, 
                "location": self.location, 
                "application": self.application.id }

   @staticmethod
   def fetch_git_location():
       import git
       code_repo = git.Repo(os.getcwd(), 
                            search_parent_directories=True).remote().url
       return code_repo

Mit diesem Modell können wir die Beobachtungen einzeln protokollieren - zwei Aufrufe an logging.info-reduzieren die Menge der zu speichernden Informationen. Da wir die Beziehungen zwischen den Entitäten neu zusammensetzen müssen, führen wir die Variable id ein, um die Menge der zu protokollierenden Informationen und der zu verknüpfenden Beobachtungen zu reduzieren. Anhand der Protokolle kann id die Verknüpfungen im Modell rekonstruieren, wie z. B. die Abhängigkeit zwischen ApplicationRepository und Application, da sie bereits protokolliert wurden ( id).

In diesem Beispiel hat die Anwendung id lokal erstellt, was zu einem schlechten Design führt, das über mehrere Ausführungen hinweg inkonsistent sein wird. Um dieses Problem zu umgehen, müssen wir einen funktionalen id definieren, der die Entitäten über verschiedene Ausführungen, Einsätze und Anwendungen hinweg identifizieren kann. Dieser Begriff ist in der Modellierung als Primärschlüssel bekannt. Du kannst einen Primärschlüssel z. B. als Eingabe für einen Hash-Algorithmus verwenden, der die id auf deterministische Weise generiert, in diesem Fall mit hashlib.

Beispiel 4-9 zeigt, wie man den Primärschlüssel verwendet, um id konsistent zu generieren, zum Beispiel mit md5 1 . Diese Strategie werden wir in diesem Kapitel immer wieder anwenden, um Entitäten zu erzeugen.

Wrap-Up: Die Daten-Beobachtungs-Daten-Pipeline

Bevor wir analysieren, wie die Beobachtungen bei den zu Beginn dieses Abschnitts vorgestellten expliziten und stillen Fehlern helfen , wenden wir an, was wir bisher für die Anwendung ingestion getan haben, um die Anwendungsdaten reporting beobachtbar zu machen. Siehe Beispiel 4-19.

Beispiel 4-19. Berichtsanwendung mit ausführlichen, aus dem Code generierten Datenbeobachtungen
import ApplicationRepository.fetch_git_location
import ApplicationVersion.fetch_git_version
app = Application(Application.fetch_file_name())
app_repo = ApplicationRepository(fetch_git_location(), app)
git_user = User(ApplicationVersion.fetch_git_author())
app_version = ApplicationVersion(fetch_git_version(), git_user, app_repo)
current_user = User("Emanuele Lucchini")
app_exe = ApplicationExecution(app_version, current_user)

all_assets = pd.read_csv("data/monthly_assets.csv", parse_dates=['Date'])

apptech = all_assets[all_assets['Symbol'] == 'APCX']
buzzfeed = all_assets[all_assets['Symbol'] == 'BZFD']

buzzfeed['Intraday_Delta'] = buzzfeed['Adj Close'] - buzzfeed['Open']
apptech['Intraday_Delta'] = apptech['Adj Close'] - apptech['Open']

kept_values = ['Open', 'Adj Close', 'Intraday_Delta']

buzzfeed[kept_values].to_csv("data/report_buzzfeed.csv", index=False)
apptech[kept_values].to_csv("data/report_appTech.csv", index=False)

all_assets_ds = DataSource("data/monthly_assets.csv", "csv")
all_assets_sc = Schema(Schema.extract_fields_from_dataframe(all_assets), 
                 all_assets_ds)
buzzfeed_ds = DataSource("data/report_buzzfeed.csv", "csv")
buzzfeed_sc = Schema(Schema.extract_fields_from_dataframe(buzzfeed), buzzfeed_ds)
apptech_ds = DataSource("data/report_appTech.csv", "csv")
apptech_sc = Schema(Schema.extract_fields_from_dataframe(apptech), apptech_ds)
# First lineage
lineage_buzzfeed = OutputDataLineage(buzzfeed_sc,
                                    OutputDataLineage.generate_direct_mapping
                                    (buzzfeed_sc, [all_assets_sc]))
lineage_buzzfeed_exe = DataLineageExecution(lineage_buzzfeed, app_exe)
all_assets_ms_1 = DataMetrics(DataMetrics.extract_metrics_from_df(all_assets), 
                   1 all_assets_sc, lineage_buzzfeed_exe)
buzzfeed_ms = DataMetrics(DataMetrics.extract_metrics_from_df(buzzfeed), buzzfeed_sc, 
               lineage_buzzfeed_exe)
# Second lineage
lineage_apptech = OutputDataLineage(apptech_sc,
                                    OutputDataLineage.generate_direct_mapping
                                    (apptech_sc, [all_assets_sc]))
lineage_apptech_exe = DataLineageExecution(lineage_apptech, app_exe)
all_assets_ms_2 = DataMetrics(DataMetrics.extract_metrics_from_df(all_assets), 
                   1 all_assets_sc, lineage_apptech_exe)
apptech_ms = DataMetrics(DataMetrics.extract_metrics_from_df(apptech), apptech_sc, 
              lineage_apptech_exe)

Indem wir die Beobachtungen auf diese Weise hinzufügen, bleiben die Änderungen ähnlich wie bei der Anwendung ingestion. Dieser Ansatz ermöglicht es uns, Gewohnheiten und Abstraktionen zu entwickeln, wie z. B. ein Framework, das die Anzahl der erforderlichen Änderungen reduziert - fast ein implizites Gesetz in der Entwicklung.

In Beispiel 4-19 siehst du, dass die Beobachtungen, die für die Eingänge generiert werden, an das Ende 1 verschoben wurden. Wir haben uns aus Gründen der Einfachheit des Beispiels für diese Implementierung entschieden. Ein Vorteil ist, dass die zusätzlichen Berechnungen am Ende durchgeführt werden, ohne den Geschäftsablauf zu beeinträchtigen. Der Nachteil ist, dass, wenn zwischendurch etwas fehlschlägt, keine Beobachtungen über die Datenquellen und ihr Schema gesendet werden. Natürlich ist es möglich, diese Situation mit einigen Anpassungen am Code zu vermeiden. Außerdem müssen wir für diese Einführung in eine Low-Level-API einige Boilerplate hinzufügen, um die richtigen Informationen zu generieren, was im Verhältnis zum Geschäftscode wie Lärm klingen mag. Behalte jedoch im Hinterkopf, dass das Skript ursprünglich keine Protokolle enthielt. Im Allgemeinen werden Protokolle sporadisch hinzugefügt, um Informationen über das Verhalten des Skripts zu erhalten, was wir auch getan haben, allerdings für die Daten.

Behalte auch diese Punkte im Hinterkopf:

  1. Wir haben den OutputDataLineage.generate_direct_mapping Helper so verwendet, wie er ist, um ein Lineage Mapping zwischen den Outputs und Inputs zu erstellen. Das wird jedoch nicht funktionieren, weil wir Adj Close und Open aus der Datei monthly_assets.csv in der neuen Spalte Intraday_Delta zusammengefasst haben. Da die Felder nicht denselben Namen haben, wird die "direkte" Heuristik diese Abhängigkeit nicht erkennen.

  2. Es wird eine Warnmeldung angezeigt, wenn dieselben Beobachtungen ein zweites Mal für die monthly_assets gemeldet werden. Das haben wir getan, weil wir Lineage pro Ausgabe kodiert haben. Wir haben jetzt zwei Lineages, die jeweils Ausgaben für die Dateien report_buzzfeed.csv und report_AppTech.csv erzeugen. Da die Ausgabe dieselben Daten wie die (gefilterte) Eingabe wiederverwendet, müssen wir für jede Ausgabe melden, wie die Eingabe aussieht, damit sie nicht als Duplikate angezeigt werden. Alternativ könnten wir die Beobachtungen wiederverwenden oder das Modell anpassen, um diese Doppelung zu vermeiden. Du könntest stattdessen die folgenden Optionen in Betracht ziehen:

    • Wenn wir unsere Strategie dahingehend ändern, dass wir jedes Mal lesen, wenn auf die Daten zugegriffen wird, anstatt sie in den Speicher zu laden, dann sind die Beobachtungen nicht mehr identisch, wenn sich die Daten zwischen den beiden Schreibvorgängen ändern. Wenn eine Ausgabe Probleme hat, ziehen wir es vor, die Beobachtungen der Eingabe mit dieser Linie zu synchronisieren. Die Wahrscheinlichkeit dieser Situation steigt, wenn du bedenkst, dass jeder Schreibvorgang Stunden und nicht nur Sekunden dauern kann.

    • Die Anwendung reporting erzeugt bei jedem Durchlauf alle Ausgaben, aber durch späteres Refactoring kann dies geändert und parametrisierbar gemacht werden. So kann zum Beispiel nur eine Ausgabe erstellt werden, wie die von BuzzFeed. Daher wird jeder reporting Datensatz durch unabhängige Läufe erzeugt. In diesem Fall bilden die Beobachtungen dies bereits angemessen ab, sodass wir die Logik nicht anpassen müssen. Mit anderen Worten: Wenn wir die Beobachtungen eines bestimmten Inputs so oft senden, wie er verwendet wird, um einen Output zu erzeugen, entspricht das der Realität, anstatt zu versuchen, Daten zu optimieren, die wie Duplikate aussehen könnten.

Kümmern wir uns um den ersten Punkt und stellen sicher, dass die Lineage die tatsächlichen Verbindungen zwischen den Datenquellen darstellen kann. Um dies auf vereinfachte Weise zu tun, werden wir die in Beispiel 4-19 eingeführte Hilfsfunktion mit zusätzlichen Informationen aktualisieren. Beispiel 4-20 1 zeigt die neue Version dieser Hilfsfunktion, die jetzt ein Argument enthält, um eine nicht-direkte Zuordnung für jede Eingabe zu liefern. In späteren Abschnitten werden Strategien vorgestellt, mit denen dieser häufige Anwendungsfall viel einfacher, effizienter, wartbarer und genauer behandelt werden kann, z. B. durch die Verwendung von Monkey Patching.

Beispiel 4-20. Generierte Abstammung auf Feldebene basierend auf übereinstimmenden Feldnamen
@staticmethod
def generate_direct_mapping(output_schema: Schema, 
                            input_schemas: list[tuple[Schema, dict]]):
   input_schemas_mapping = []
   output_schema_field_names = [f[0] for f in output_schema.fields]
   for (schema, extra_mapping) in input_schemas:
       mapping = {}
       for field in schema.fields:
           if field[0] in output_schema_field_names:
               mapping[field[0]] = [field[0]]
       mapping.update(extra_mapping) 1
       if len(mapping):
           input_schemas_mapping.append((schema, mapping))
   return input_schemas_mapping

Beispiel 4-21 zeigt den letzten Teil der Beobachtungen der reporting Anwendung.

Beispiel 4-21. Berichtsanwendung macht Daten mit sauberem Code beobachtbar
# First lineage
intraday_delta_mapping = {"Intraday_Delta": ['Adj Close', 'Open']}
a = (all_assets_sc, intraday_delta_mapping)
lineage_buzzfeed = OutputDataLineage(buzzfeed_sc,
                                    OutputDataLineage.generate_direct_mapping(
                                    buzzfeed_sc, [(all_assets_sc, 
                                    intraday_delta_mapping)]))
lineage_buzzfeed_exe = DataLineageExecution(lineage_buzzfeed, app_exe)
all_assets_ms_1 = DataMetrics(DataMetrics.extract_metrics_from_df(all_assets), 
                   all_assets_sc, lineage_buzzfeed_exe)
buzzfeed_ms = DataMetrics(DataMetrics.extract_metrics_from_df(buzzfeed), 
               buzzfeed_sc, lineage_buzzfeed_exe)
# Second lineage
lineage_apptech = OutputDataLineage(apptech_sc,
                                   OutputDataLineage.generate_direct_mapping(
                                    apptech_sc, [(all_assets_sc, 
                                                 intraday_delta_mapping)]))
lineage_apptech_exe = DataLineageExecution(lineage_apptech, app_exe)
all_assets_ms_2 = DataMetrics(DataMetrics.extract_metrics_from_df(all_assets), 
                   all_assets_sc, lineage_apptech_exe)
apptech_ms = DataMetrics(DataMetrics.extract_metrics_from_df(apptech), apptech_sc, 
              lineage_apptech_exe)

Datenbeobachtungen zur Behebung von Fehlern in der Datenpipeline nutzen

Jetzt können wir unsere Pipeline bereitstellen, ausführen und überwachen, indem wir die Beobachtungen nutzen, die sie bei jedem Durchlauf generiert. Die Low-Level-API erforderte einiges an zusätzlichem Aufwand und Überzeugung. Dennoch sind wir mit den Ergebnissen zufrieden. Jede Minute, die wir für diese Aufgaben aufwenden, bringt uns den 100-fachen Nutzen in Form von vermiedenen Umsatzeinbußen - gemäß der Qualitätskostenregel 1-10-100 -, wenn in der Produktion Probleme auftreten.

Schauen wir uns die Probleme an, die in diesem Abschnitt auf erwähnt wurden, beginnend mit den ingestion Bewerbungsfehlern, die auftreten können:

Eingabedateien nicht gefunden
Die DataSource Beobachtungen werden bei jedem Lauf gesendet. Nachdem sie gelesen wurden, senden die fehlgeschlagenen Läufe also keine von ihnen. Selbst für jemanden, der die Anwendungslogik nicht kennt, ist es klar, dass die bisher verwendeten Dateien fehlen.
Tippfehler beim Lesen
Die Schema Beobachtungen werden gesendet und enthalten die Feldnamen, die mit ihrem Typ verbunden sind. Daher ist der erwartete Typ für den Beobachter klar, ohne dass er die Anwendung oder die Dateien der Vormonate aufrufen muss.
Fehler aufgrund von fehlenden Feldern
Die gleichen Beobachtungen wie bei "Typfehler beim Lesen" helfen dem Beobachter, schnell zu erkennen, welche Felder in den vorherigen Durchläufen vorhanden waren, die im aktuellen Durchlauf fehlen.
Dateisystem-Fehler
Die von der Pandas-Bibliothek ausgelöste Ausnahme liefert normalerweise den Pfad, der zu einem Fehler geführt hat. Die verbleibende Information, die dem Beobachter zur Verfügung steht, um das Problem zu identifizieren, ist der Server, den dieser Pfad verwendet hat. Die IP, die in den Serverbeobachtungen auf DataSource angegeben ist, gibt sofort Aufschluss darüber, welcher Server mit diesem Pfad verbunden war.
Speicherfehler

Dieses Problem tritt vor allem dann auf, wenn die Datenmenge plötzlich ansteigt. Es können viele Fälle in Betracht gezogen werden, aber sie werden meist intuitiv vom Beobachter gehandhabt, indem er DataMetrics Beobachtungen verwendet, die die Anzahl der Zeilen, das Schema mit der Anzahl der Felder oder die Anzahl von DataSources enthalten. Trotzdem kann es erforderlich sein, dass die Beobachtungen früher als am Ende des Skripts gesendet werden, wie in den folgenden beiden Fällen:

  • Eine der Eingabedateien hat eine größere Größe als zuvor. Die Datei wird leicht erkannt, weil keine DataMetrics für sie verfügbar ist.

  • Der Output ist stark gewachsen, weil alle Dateien gewachsen sind. Der Größenunterschied wird erkannt, weil die DataMetrics für die Ausgaben fehlen. Auch die DataMetrics für die Eingaben zeigen einen Anstieg der Anzahl der Zeilen.

Speicherplatzfehler im Dateisystem

Diese Fehler treten höchstwahrscheinlich beim Schreiben der Ausgabe auf, wenn man bedenkt, dass es hier um die Beobachtbarkeit von Daten geht. Die gleichen Informationen wie bei "Speicherfehler" geben dem Beobachter sofort Aufschluss darüber, warum der verfügbare Speicherplatz nicht mehr ausreicht und welche Dateien nicht geschrieben werden konnten.

Die Date ist nicht als Datum parsbar

  • In diesem Fall hat das Schema die Typbeobachtung für das Datumsfeld von date auf etwas anderes geändert, z.B. str oder object.

Date Spalte enthält keine Werte für das aktuelle Jahr/Monat

  • Die DataMetrics Beobachtungen beinhalten den minimalen und maximalen Zeitstempel, der einen sofortigen Einblick in den Unterschied zwischen der Ausführungszeit und den verfügbaren Daten gibt. Nehmen wir zum Beispiel an, der maximale Zeitstempel liegt zwei Tage vor dem Zeitpunkt, an dem die Datenquelle gelesen wird, dann können die Daten als zu alt angesehen werden, wenn der akzeptable Zeitraum nur einen Tag beträgt.

Date Spalte enthält Werte in der Zukunft

  • Das ist ganz einfach, denn die gleiche Art von Beobachtungen wie beim vorherigen Ausfall aufgrund fehlender Werte für den aktuellen Monat gibt dir diese Sichtbarkeit.

Symbol Kategorien geändert

  • Wenn wir nur numerische DataMetrics betrachten, können wir diesen Fall schnell anhand der Anzahl der Kategorien erkennen, die in der Ausgabedatei wachsen würden. Eine oder einige der Dateien wären nicht mehr konsistent, da sie sich auf verschiedene Kategorien beziehen würden.

Dann müssen wir uns überlegen, unter welchen Umständen die reporting Anwendung die Ingestion-Anwendung als fehlgeschlagen betrachten könnte, und gegebenenfalls, welche reporting Anwendung ein Beobachter verwenden könnte. Zu diesen Situationen gehören die folgenden:

Die monatliche aggregierte Datei ist nicht verfügbar

  • Die DataSource-Beobachtung oder DataMetrics wurde von der Anwendung nicht gesendet. ingestion Anwendung gesendet.

Die Aggregation verwendet fehlende Felder, wie z. B. Close

  • Im Schema der monatlichen Daten, die von der Anwendung ingestion gesendet werden, fehlen diese Felder ebenfalls.

Fehler bei Lese-/Schreibzugriff, Größe und Platz

  • Für den reporting Beobachter gelten die gleichen Lösungen wie für den ingestion Beobachter. Es gibt keine Verzerrung der Informationen zwischen Teams oder Teammitgliedern.

APCX und ENFA Symbole

  • Die Anzahl der Kategorien, die der ingestion Beobachter meldet, hat sich geändert, was in einigen Fällen einen Hinweis darauf gibt, was passiert ist. Wir können DataMetrics jedoch erweitern, um auch nicht-numerische Beobachtungen zu melden und die Kategorien zu melden.

Fehlende Werte in Adj Close oder Open führen zu abnormalen Zahlen

  • Die DataMetrics "Anzahl der Nullen" deckt diesen Fall ab, denn wenn die Anzahl der Nullen größer als Null ist, wird die Berechnung der Intraday_Delta NA s zurückgeben.

Falsches Datum im monatlichen Vermögen

  • Die gleichen Lösungen, die der ingestion Beobachter bei Datumsfehlern anwendet, gelten auch hier. Die Anwendung könnte zum Beispiel den Mindest- und Höchstwert für die Spalte Date im Vergleich zum aktuell gemeldeten Monat verwenden.

Wir sind jetzt in der Lage, mit verschiedenen Situationen umzugehen, in denen die Daten die Ursache für ein Problem sind. Ohne dieses Wissen wären in solchen Situationen lange, stressige Meetings nötig, um sie zu verstehen, und anstrengende Stunden oder Tage für die Fehlersuche, die sich zu Monaten ausweiten könnten, weil wir in der Produktion nicht auf die Daten zugreifen können.

Die bisher diskutierten Probleme sind solche, von denen wir wissen, dass sie auftreten können. Es gibt jedoch noch viele andere Probleme, die sich in der Pipeline ausbreiten können und über die wir noch wenig wissen - die unbekannten Unbekannten. Zum Beispiel sind die Werte eines der Bestände für einen vergangenen Monat falsch, weil beim CSV-Export Fehler in die Daten eingefügt wurden. Dieses Problem wird bei mindestens einer deiner Anwendungen auftreten, und bei anderen ähnlichen.

Aufgrund dieser Unbekannten dürfen die Datenbeobachtungen nicht nur auf vordefinierte Fälle beschränkt werden, sondern die Anwendungen müssen so viele Beobachtungen wie möglich melden - unter Berücksichtigung möglicher Einschränkungen bei den Rechenressourcen und der Zeit -, um einen Überblick über die erwartete oder nicht erfüllte Situation zu erhalten. In diesem Beispiel wäre die Verteilung der monatlichen Bestandswerte später für den Vergleich mit anderen Monaten nützlich, und sie könnten Hinweise darauf geben, ob die Werte gleich oder ähnlich sind.

Der Vorteil der Low-Level-Protokollierung besteht darin, dass du völlig flexibel entscheiden kannst, was du sichtbar machen möchtest. Beispiele dafür sind benutzerdefinierte Metriken und Key Performance Indicators (KPIs).

Nicht alle Aufträge sind gleich; jedes Unternehmen, jedes Projekt und jede Anwendung hat ihre eigenen Besonderheiten. Du wirst wahrscheinlich bestimmte Kennzahlen kontrollieren, egal ob sie mit verbrauchten oder produzierten Daten verknüpft sind. Eine solche Kennzahl für eine Tabelle könnte die Summe der Anzahl der Artikel multipliziert mit den Kosten pro Einheit abzüglich des von einem Webservice erhaltenen Betrags sein, count(items) * cost_per_unit. Das Ergebnis muss immer größer als Null sein. Dies kann einfach in den Quellcode eingefügt werden, muss aber vom Ingenieur hinzugefügt werden, da es sich um spezifische Metriken handelt, die mit der Geschäftslogik (und der Semantik der Spalten) verbunden sind.

Ein weiterer Grund für die Anpassung von Beobachtungen sind KPIs - Zahlen, die von den Stakeholdern angefordert werden und für das zugrunde liegende Geschäft wichtig sind. KPIs werden oft in regelmäßigen Abständen gemeldet oder auf Anfrage berechnet und in zufälligen oder festen Abständen verwendet. Ihre Bedeutung ist jedoch so groß, dass die Stakeholder hohe Erwartungen an sie stellen und wenig bis keine Zeit haben, auf Korrekturen zu warten. Deshalb musst du auch Transparenz darüber schaffen, wie sich die KPIs im Laufe der Zeit entwickeln, denn wenn ein Stakeholder Zweifel daran hat, beginnt die Zeit in dem Moment zu ticken, in dem er dich nach ihrer Korrektheit fragt. Um die Reaktionsfähigkeit zu erhöhen, musst du generell wissen, wie sich die KPIs entwickeln. Du musst erkennen, wie sie sich verändern, bevor sie es tun, und verstehen, warum sie sich aufgrund ihrer historischen Werte und ihrer Herkunft verändern.

Wie du bei der Aktualisierung der Anwendung reporting vielleicht schon vermutet hast, ist die Definition der API - das Modell, die Kodierung und die Funktionen - keine Aufgabe für jede einzelne Anwendung. Vielmehr musst du sie standardisieren und anwendungsübergreifend wiederverwenden. Durch die Standardisierung wird der Arbeitsaufwand pro Anwendung reduziert. Noch wichtiger ist, dass die Beobachtungen unabhängig von der Anwendung einheitlich sind, um den Beobachtern die Arbeit zu erleichtern, das Verhalten der anderen Anwendungen, die an der zu analysierenden Pipeline beteiligt sind, abzugleichen.

Die Standardisierung ist auch hilfreich, um Entitäten anwendungsübergreifend wiederzuverwenden, wie z. B. assets_monthly DataSource, das die Ausgabe der Anwendung ingestion und die Eingabe der Anwendung reporting ist. Durch die einheitliche Darstellung der Beobachtungen kannst du den Status der gesamten Pipeline konsolidieren, indem du die Entitäten anwendungsübergreifend wiederverwendest.

Ein Teil der Architektur zur Unterstützung der Datenbeobachtung muss ein externes System umfassen, das die Beobachtungen zusammenfasst, um systematisch einen globalen Überblick zu schaffen. Durch Beobachter, die sich auf diese aggregierte Sicht verlassen und auf dieser Grundlage handeln, kann das System einige der Aktionen ausführen, die derzeit von den Beobachtern durchgeführt werden - hier kommt das maschinelle Lernen ins Spiel.

Fazit

In diesem Kapitel haben wir uns umfassend mit der Beobachtbarkeit von Daten an der Quelle und ihrer Bedeutung für die Verbesserung der Datenqualität und der operativen Exzellenz befasst. Wir haben uns mit dem Konzept der Erzeugung von Datenbeobachtungen im Code von Datenanwendungen befasst und dabei hervorgehoben, wie wichtig es ist, den Code zur Erzeugung von Beobachtungen in verschiedene Komponenten der Anwendung einzubauen. Zu diesen Komponenten gehören die Anwendung selbst, die genutzten Daten, ihre Beziehungen und ihr Inhalt.

Außerdem haben wir die Entwicklung einer Python-API zur Datenbeobachtung auf niedriger Ebene besprochen, die Entwicklern ein leistungsfähiges Toolset zur nahtlosen Integration von Datenbeobachtungsfunktionen in ihre Anwendungen bietet. Mit dieser API können Praktiker/innen Datenbeobachtungen erstellen, Datenflüsse verfolgen und die Zuverlässigkeit und Genauigkeit ihrer Daten sicherstellen.

Um diese Konzepte zu untermauern, haben wir ein voll funktionsfähiges Beispiel vorgestellt, das die Umwandlung einer in Python geschriebenen, nicht datenbeobachtbaren Datenpipeline in eine robuste und datenbeobachtungsgesteuerte Pipeline zeigt. Durch die Nutzung der speziellen Python-API für Datenbeobachtung haben wir gezeigt, wie Datenbeobachtungen generiert, erfasst und genutzt werden können, um die Transparenz zu erhöhen, Probleme zu erkennen und kontinuierliche Verbesserungen voranzutreiben.

Die in diesem Kapitel vorgestellten Prinzipien und Strategien dienen als Grundlage für die Integration von Datenbeobachtung in die Struktur von Datenanwendungen. Wenn Unternehmen diese Praktiken anwenden, können sie sicherstellen, dass ihre Datenpipelines robust und zuverlässig sind und wertvolle Erkenntnisse mit einem hohen Maß an Vertrauen liefern können.

Trotz der hohen Anpassungsfähigkeit und Flexibilität von Low-Level-Logging kann der anfängliche Aufwand die Einführung behindern. Dieses Argument gilt auch für die Einführung von Tests. Deshalb ist es wichtig, die Komplexität der Nutzung auf dieser Ebene zu vereinfachen. Außerdem müssen wir nach alternativen Ansätzen suchen, die das Low-Level-Logging ergänzen und gleichzeitig die breite Akzeptanz der Datenbeobachtung in Teams und bei Einzelpersonen fördern. In den folgenden Abschnitten werden wir uns mit diesem Thema befassen und mit der Erforschung ereignisbasierter Systeme beginnen.

1 Das hat mich dazu gebracht, über die datenbeobachtungsbasierte Entwicklungsmethode nachzudenken.

Get Grundlagen der Beobachtbarkeit von Daten now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.