Capítulo 4. Ingestión de Datos: Extracción de datos

Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com

Como se explica en el Capítulo 3, el patrón ELT es el diseño ideal para las canalizaciones de datos creadas para el análisis de datos, la ciencia de datos y los productos de datos. Los dos primeros pasos del patrón ELT, extracción y carga, se denominan colectivamente ingestión de datos. Este capítulo trata de cómo configurar tu entorno de desarrollo y tu infraestructura para ambas cosas, y repasa los detalles de la extracción de datos de varios sistemas fuente. El Capítulo 5 trata de la carga de los conjuntos de datos resultantes en un almacén de datos.

Nota

Los ejemplos de código de extracción y carga de este capítulo están totalmente desacoplados entre sí. Coordinar los dos pasos para completar una ingesta de datos es un tema que se trata en el Capítulo 7.

Como se explica en el Capítulo 2, existen numerosos tipos de sistemas fuente de los que extraer datos, así como numerosos destinos en los que cargarlos. Además, los datos se presentan en muchas formas, y todas ellas plantean distintos retos a la hora de ingerirlos.

Este capítulo y el siguiente incluyen ejemplos de código para exportar e ingerir datos desde y hacia sistemas comunes. El código está muy simplificado y sólo contiene una mínima gestión de errores. Cada ejemplo pretende ser un punto de partida fácil de entender para las ingestas de datos, pero es totalmente funcional y ampliable a soluciones más escalables.

Nota

Los ejemplos de código de este capítulo escriben los datos extraídos en archivos CSV para cargarlos en el almacén de datos de destino. Hay ocasiones en las que tiene más sentido almacenar los datos extraídos en otro formato, como JSON, antes de cargarlos. Donde proceda, señalo dónde podrías considerar hacer tal ajuste.

El Capítulo 5 también analiza algunos marcos de código abierto en los que puedes basarte, y alternativas comerciales que ofrecen a los ingenieros y analistas de datos opciones de "bajo código" para la ingesta de datos.

Configurar tu entorno Python

Todos los ejemplos de código que siguen están escritos en Python y SQL y utilizan marcos de código abierto que son comunes en el campo de la ingeniería de datos hoy en día. Por simplicidad, el número de fuentes y destinos es limitado. Sin embargo, cuando procede, proporciono notas sobre cómo modificar para sistemas similares.

Para ejecutar el código de ejemplo, necesitarás una máquina física o virtual que ejecute Python 3.x. También tendrás que instalar e importar algunas bibliotecas.

Si no tienes Python instalado en tu máquina, puedes obtener la distribución y el instalador para tu sistema operativo directamente de ellos.

Nota

Los siguientes comandos están escritos para una línea de comandos de Linux o Macintosh. En Windows, puede que necesites añadir el ejecutable Python 3 a tu PATH.

Antes de instalar las bibliotecas utilizadas en este capítulo, es mejor crear un entorno virtual en el que instalarlas. Para ello, puedes utilizar una herramienta llamada virtualenv. virtualenv es útil para gestionar bibliotecas Python para diferentes proyectos y aplicaciones. Te permite instalar bibliotecas Python dentro de un ámbito específico de tu proyecto en lugar de hacerlo globalmente. En primer lugar, crea un entorno virtual llamado env.

$ python -m venv env

Ahora que tu entorno virtual está creado, actívalo con el siguiente comando:

$ source env/bin/activate

Puedes comprobar que tu entorno virtual está activado de dos formas. En primer lugar, observarás que tu símbolo del sistema lleva ahora el prefijo del nombre del entorno:

(env) $

También puedes utilizar el comando which python para verificar dónde busca Python las bibliotecas. Deberías ver algo como esto, que muestra la ruta del directorio del entorno virtual:

(env) $ which python
env/bin/python

Ahora ya puedes instalar las bibliotecas que necesitas para los ejemplos de código que siguen.

Nota

En algunos sistemas operativos (SO), debes utilizar python3 en lugar de python para ejecutar el ejecutable de Python 3.x. Las versiones de SO más antiguas pueden utilizar por defecto Python 2.x. Puedes averiguar qué versión de Python utiliza tu SO escribiendo python --version.

A lo largo de este capítulo, utilizarás pip para instalar las bibliotecas utilizadas en los ejemplos de código. pip es una herramienta que viene con la mayoría de las distribuciones de Python.

La primera biblioteca que instalarás utilizando pip es configparser, que se utilizará para leer la información de configuración que añadirás a un archivo más adelante.

(env) $ pip install configparser

A continuación, crea un archivo llamado pipeline.conf en el mismo directorio que los scripts de Python que crearás en las secciones siguientes. Deja el archivo vacío por ahora. Los ejemplos de código de este capítulo te pedirán que lo añadas. En los sistemas operativos Linux y Mac, puedes crear el archivo vacío en la línea de comandos con el siguiente comando:

(env) $ touch pipeline.conf

Configurar el almacenamiento de archivos en la nube

En cada ejemplo de este capítulo, utilizarás un bucket de Amazon Simple Storage Service (Amazon S3 o simplemente S3) para almacenar archivos. S3 está alojado en AWS, y como su nombre indica, S3 es una forma sencilla de almacenar archivos y acceder a ellos. También es muy rentable. En el momento de escribir esto, AWS ofrece 5 GB de almacenamiento S3 gratuito durante 12 meses con una nueva cuenta AWS y cobra menos de 3 céntimos de USD al mes por gigabyte para la clase estándar de almacenamiento S3 después de eso. Dada la simplicidad de los ejemplos de este capítulo, podrás almacenar los datos necesarios en S3 de forma gratuita si aún estás en los primeros 12 meses de creación de una cuenta de AWS, o por menos de 1 dólar al mes después de eso.

Para ejecutar los ejemplos de este capítulo, necesitarás un bucket S3. Afortunadamente, crear un bucket S3 es sencillo, y puedes encontrar las instrucciones más recientes en la documentación de AWS. Configurar el control de acceso adecuado al bucket S3 depende del almacén de datos que estés utilizando. En general, lo mejor es utilizar los roles de AWS Identity and Access Management (IAM) para las políticas de gestión del acceso. Las instrucciones detalladas para configurar dicho acceso tanto para un almacén de datos de Amazon Redshift como de Snowflake se encuentran en las secciones siguientes, pero por ahora, sigue las instrucciones para crear un nuevo bucket. Ponle el nombre que quieras; te sugiero que utilices la configuración predeterminada, que incluye mantener el bucket como privado.

Cada ejemplo de extracción extrae datos del sistema de origen dado y almacena el resultado en el cubo de S3. Cada ejemplo de carga del Capítulo 5 carga esos datos del cubo de S3 en el destino. Este es un patrón común en las canalizaciones de datos. Todos los principales proveedores de nubes públicas tienen un servicio similar a S3. Los equivalentes en otras nubes públicas son Azure Storage en Microsoft Azure y Google Cloud Storage (GCS) en GCP.

También es posible modificar cada ejemplo para utilizar almacenamiento local o local. Sin embargo, se requiere trabajo adicional para cargar datos en tu almacén de datos desde un almacenamiento fuera de su proveedor de nube específico. En cualquier caso, los patrones descritos en este capítulo son válidos independientemente del proveedor de nube que utilices, o si decides alojar tu infraestructura de datos en las instalaciones.

Antes de pasar a cada ejemplo, hay otra biblioteca de Python que tendrás que instalar para que tus scripts de extracción y carga puedan interactuar con tu bucket de S3. Boto3 es el SDK de AWS para Python. Asegúrate de que el entorno virtual que configuraste en la sección anterior está activo y utiliza pip para instalarlo:

(env) $ pip install boto3

En los ejemplos que siguen, se te pedirá que importes boto3 en tus scripts de Python de la siguiente manera:

import boto3

Como vas a utilizar la biblioteca boto3 Python para interactuar con tu cubo de S3, también tendrás que crear un usuario IAM, generar claves de acceso para ese usuario y almacenar las claves en un archivo de configuración que tus scripts de Python puedan leer. Todo esto es necesario para que tus scripts tengan permisos para leer y escribir archivos en tu cubo de S3.

Primero, crea el usuario IAM:

  1. En el menú Servicios de la consola de AWS (o barra de navegación superior), navega hasta IAM.

  2. En el panel de navegación, haz clic en Usuarios y, a continuación, en "Añadir usuario". Escribe el nombre de usuario para el nuevo usuario. En este ejemplo, nombra al usuario data_pipeline_readwrite.

  3. Haz clic en el tipo de acceso para este usuario IAM. Haz clic en "acceso programático", ya que este usuario no necesitará iniciar sesión en la consola de AWS, sino acceder a los recursos de AWS mediante programación a través de scripts de Python.

  4. Haz clic en Siguiente: Permisos.

  5. En la página "Establecer permisos", haz clic en la opción "Adjuntar políticas existentes al usuario directamente". Añade la política AmazonS3FullAccess.

  6. Haz clic en Siguiente: Etiquetas. Es una buena práctica en AWS añadir etiquetas a varios objetos y servicios para que puedas encontrarlos más tarde. Sin embargo, esto es opcional.

  7. Pulsa Siguiente: Revisar para verificar tu configuración. Si todo parece correcto, haz clic en "Crear usuario".

  8. Querrás guardar el ID de la clave de acceso y la clave de acceso secreta del nuevo usuario IAM. Para ello, haz clic en Descargar.csv y, a continuación, guarda el archivo en una ubicación segura para poder utilizarlo en un momento.

Por último, añade una sección al archivo pipeline. conf llamada [aws_boto_credentials] para almacenar las credenciales del usuario IAM y la información del bucket de S3. Puedes encontrar el ID de tu cuenta de AWS haciendo clic en el nombre de tu cuenta en la parte superior derecha de cualquier página cuando hayas iniciado sesión en el sitio de AWS. Utiliza el nombre del bucket de S3 que creaste anteriormente para el valor bucket_name. La nueva sección en pipline. conf tendrá el siguiente aspecto:

[aws_boto_credentials]
access_key = ijfiojr54rg8er8erg8erg8
secret_key = 5r4f84er4ghrg484eg84re84ger84
bucket_name = pipeline-bucket
account_id = 4515465518

Extraer datos de una base de datos MySQL

Extraer datos de una base de datos MySQL puede hacerse de dos formas:

  • Extracción completa o incremental mediante SQL

  • Replicación del registro binario (binlog)

La extracción completa o incremental mediante SQL es mucho más sencilla de implementar, pero también menos escalable para grandes conjuntos de datos con cambios frecuentes. También hay ventajas y desventajas entre las extracciones completas e incrementales que analizo en la sección siguiente.

La replicación del registro binario, aunque es más compleja de implementar, es más adecuada para los casos en los que el volumen de datos de los cambios en las tablas de origen es alto, o existe la necesidad de ingestas de datos más frecuentes desde la fuente MySQL.

Nota

La replicación binlog también es una vía para crear una ingestión de datos en flujo. Consulta la sección "Ingestión por lotes frente a ingestión por flujo" de este capítulo para saber más sobre la distinción entre ambos enfoques, así como sobre los patrones de implementación.

Esta sección es relevante para aquellos lectores que tengan una fuente de datos MySQL de la que necesiten extraer datos. Sin embargo, si quieres configurar una base de datos sencilla para poder probar los ejemplos de código, tienes dos opciones. En primer lugar, puedes instalar MySQL en tu máquina local o virtual de forma gratuita. Puedes encontrar un instalador para tu sistema operativo en la página de descargas de MySQL.

Como alternativa, puedes crear una instancia totalmente gestionada de Amazon RDS para MySQL en AWS. Este método me parece más sencillo, ¡y es agradable no crear un desorden innecesario en mi máquina local!

Advertencia

Cuando sigas las instrucciones enlazadas para configurar una instancia de base de datos MySQL RDS, se te pedirá que configures tu base de datos como de acceso público. Eso está muy bien para aprender y trabajar con datos de muestra. De hecho, facilita mucho la conexión desde cualquier máquina en la que estés ejecutando los ejemplos de esta sección. Sin embargo, para una seguridad más robusta en un entorno de producción, te sugiero que sigas las buenas prácticas de seguridad de Amazon RDS.

Ten en cuenta que, al igual que ocurre con los precios de S3 indicados anteriormente, si ya no puedes optar a la capa gratuita de AWS, ello conlleva un coste. Por lo demás, ¡es gratis configurarlo y ejecutarlo! Sólo recuerda eliminar tu instancia RDS cuando hayas terminado para que no se te olvide e incurras en cargos cuando caduque tu capa gratuita.

Los ejemplos de código de esta sección son bastante sencillos y se refieren a una tabla llamada Orders en una base de datos MySQL. Una vez que tengas una instancia MySQL con la que trabajar, puedes crear la tabla e insertar algunas filas de ejemplo ejecutando los siguientes comandos SQL:

CREATE TABLE Orders (
  OrderId int,
  OrderStatus varchar(30),
  LastUpdated timestamp
);

INSERT INTO Orders
  VALUES(1,'Backordered', '2020-06-01 12:00:00');
INSERT INTO Orders
  VALUES(1,'Shipped', '2020-06-09 12:00:25');
INSERT INTO Orders
  VALUES(2,'Shipped', '2020-07-11 3:05:00');
INSERT INTO Orders
  VALUES(1,'Shipped', '2020-06-09 11:50:00');
INSERT INTO Orders
  VALUES(3,'Shipped', '2020-07-12 12:00:00');

Extracción completa o incremental de tablas MySQL

Cuando necesites ingerir todas o un subconjunto de columnas de una tabla MySQL en un almacén o lago de datos, puedes hacerlo utilizando la extracción completa o la extracción incremental.

En una extracción completa, cada registro de la tabla se extrae en cada ejecución del trabajo de extracción. Éste es el método menos complejo, pero para tablas de gran volumen puede tardar mucho tiempo en ejecutarse. Por ejemplo, si quieres ejecutar una extracción completa en una tabla llamada Orders, el SQL ejecutado en la base de datos MySQL de origen tendrá el siguiente aspecto:

SELECT *
FROM Orders;

En una extracción incremental, sólo se extraen los registros de la tabla de origen que han cambiado o se han añadido desde la última ejecución del trabajo. La marca de tiempo de la última extracción puede almacenarse en una tabla de registro del trabajo de extracción en el almacén de datos o recuperarse consultando la marca de tiempo máxima en una columna LastUpdated de la tabla de destino en el almacén. Utilizando la tabla ficticia Orders como ejemplo, la consulta SQL ejecutada en la base de datos MySQL de origen tendrá el siguiente aspecto:

SELECT *
FROM Orders
WHERE LastUpdated > {{ last_extraction_run} };
Nota

Para las tablas que contienen datos inmutables (lo que significa que los registros se pueden insertar, pero no actualizar), puedes utilizar la marca de tiempo de cuando se creó el registro en lugar de una columna LastUpdated.

La variable {{ last_extraction_run }} es una marca de tiempo que representa la ejecución más reciente del trabajo de extracción. Lo más habitual es que se consulte desde la tabla de destino en el almacén de datos. En ese caso, se ejecutaría el siguiente SQL en el almacén de datos, y el valor resultante se utilizaría para {{ last_extraction_run }}:

SELECT MAX(LastUpdated)
FROM warehouse.Orders;

Aunque la extracción incremental es ideal para un rendimiento óptimo, hay algunas desventajas y razones por las que puede no ser posible para una tabla determinada.

En primer lugar, con este método eliminado, las filas no se capturan. Si se borra una fila de la tabla MySQL de origen, no lo sabrás, y permanecerá en la tabla de destino como si nada hubiera cambiado.

En segundo lugar, la tabla fuente debe tener una marca de tiempo fiable de cuándo se actualizó por última vez (la columna LastUpdated del ejemplo anterior). No es infrecuente que las tablas del sistema fuente carezcan de dicha columna o tengan una que no se actualiza de forma fiable. No hay nada que impida a los desarrolladores actualizar registros en la tabla de origen y olvidarse de actualizar la marca de tiempo LastUpdated.

Sin embargo, la extracción incremental facilita la captura de filas actualizadas. En los próximos ejemplos de código, si se actualiza una fila concreta de la tabla Orders, tanto la extracción completa como la incremental recuperarán la última versión de la fila. En la extracción completa, eso es cierto para todas las filas de la tabla, ya que la extracción recupera una copia completa de la tabla. En la extracción incremental, sólo se recuperan las filas que han cambiado.

Cuando llega el momento del paso de carga, los extractos completos suelen cargarse truncando primero la tabla de destino y cargando los datos recién extraídos. En ese caso, sólo te queda la última versión de la fila en el almacén de datos.

Al cargar datos de una extracción incremental, los datos resultantes se añaden a los datos de la tabla de destino. En ese caso, tienes tanto el registro original como la versión actualizada. Disponer de ambos puede ser valioso a la hora de transformar y analizar los datos, como se explica en el Capítulo 6.

Por ejemplo, la Tabla 4-1 muestra el registro original de OrderId 1 en la base de datos MySQL. Cuando el cliente hizo el pedido, estaba en pedido pendiente. La Tabla 4-2 muestra el registro actualizado en la base de datos MySQL. Como puedes ver, el pedido se actualizó porque se envió el 2020-06-09.

Tabla 4-1. Estado original de OrderId 1
OrderId Estado del pedido ÚltimaActualización

1

Pedidos pendientes

2020-06-01 12:00:00

Tabla 4-2. Estado actualizado de OrderId 1
OrderId Estado del pedido ÚltimaActualización

1

Enviado

2020-06-09 12:00:25

Cuando se ejecuta una extracción completa, primero se trunca la tabla de destino del almacén de datos y luego se carga con el resultado de la extracción. El resultado para OrderId 1 es el único registro que aparece en la Tabla 4-2. Sin embargo, en una extracción incremental, el resultado de la extracción simplemente se añade a la tabla de destino del almacén de datos. El resultado es que tanto el registro original como el actualizado de OrderId 1 están en el almacén de datos, como se muestra en la Tabla 4-3.

Tabla 4-3. Todas las versiones de OrderId 1 en el almacén de datos
OrderId Estado del pedido ÚltimaActualización

1

Pedidos pendientes

2020-06-01 12:00:00

1

Enviado

2020-06-09 12:00:25

Puedes obtener más información sobre la carga de extracciones completas e incrementales en las secciones del Capítulo 5, incluida "Carga de datos en un almacén Redshift".

Advertencia

Nunca des por supuesto que una columna LastUpdated de un sistema fuente está actualizada de forma fiable. Compruébalo con el propietario del sistema fuente y confírmalo antes de confiar en él para una extracción incremental.

Tanto las extracciones completas como las incrementales de una base de datos MySQL pueden implementarse utilizando consultas SQL ejecutadas en la base de datos pero activadas por scripts Python. Además de las bibliotecas Python instaladas en secciones anteriores, necesitarás instalar la biblioteca PyMySQL utilizando pip:

(env) $ pip install pymysql

También tendrás que añadir una nueva sección al archivo pipeline.conf para almacenar la información de conexión a la base de datos MySQL:

[mysql_config]
hostname = my_host.com
port = 3306
username = my_user_name
password = my_password
database = db_name

Ahora crea un nuevo script Python llamado extracto_mysql_full.py. Necesitarás importar varias bibliotecas, como pymysql, que conecta con la base de datos MySQL, y la biblioteca csv para que puedas estructurar y escribir los datos extraídos en un archivo plano que sea fácil de importar a un almacén de datos en el paso de carga de la ingesta. Además, importa boto3 para que puedas subir el archivo CSV resultante a tu cubo de S3 para cargarlo posteriormente en el almacén de datos:

import pymysql
import csv
import boto3
import configparser

Ahora puedes inicializar una conexión con la base de datos MySQL:

parser = configparser.ConfigParser()
parser.read("pipeline.conf")
hostname = parser.get("mysql_config", "hostname")
port = parser.get("mysql_config", "port")
username = parser.get("mysql_config", "username")
dbname = parser.get("mysql_config", "database")
password = parser.get("mysql_config", "password")

conn = pymysql.connect(host=hostname,
        user=username,
        password=password,
        db=dbname,
        port=int(port))

if conn is None:
  print("Error connecting to the MySQL database")
else:
  print("MySQL connection established!")

Ejecuta una extracción completa de la tabla Orders del ejemplo anterior. El siguiente código extraerá todo el contenido de la tabla y lo escribirá en un archivo CSV delimitado por tuberías. Para realizar la extracción, utiliza un objeto cursor de la biblioteca pymysql para ejecutar la consulta SELECT:

m_query = "SELECT * FROM Orders;"
local_filename = "order_extract.csv"

m_cursor = conn.cursor()
m_cursor.execute(m_query)
results = m_cursor.fetchall()

with open(local_filename, 'w') as fp:
  csv_w = csv.writer(fp, delimiter='|')
  csv_w.writerows(results)

fp.close()
m_cursor.close()
conn.close()

Ahora que el archivo CSV está escrito localmente, hay que subirlo al bucket de S3 para cargarlo posteriormente en el almacén de datos u otro destino. Recuerda que en "Configuración del almacenamiento de archivos en la nube" configuraste un usuario IAM para que la biblioteca Boto3 utilizara la autenticación en el cubo de S3. También almacenaste las credenciales en la sección aws_boto_credentials del archivo pipeline.conf. Aquí tienes el código para subir el archivo CSV a tu cubo de S3:

# load the aws_boto_credentials values
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
access_key = parser.get("aws_boto_credentials", "access_key")
secret_key = parser.get("aws_boto_credentials", "secret_key")
bucket_name = parser.get("aws_boto_credentials", "bucket_name")

s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

s3_file = local_filename

s3.upload_file(local_filename, bucket_name, s3_file)

Puedes ejecutar el script de la siguiente manera:

(env) $ python extract_mysql_full.py

Cuando se ejecute el script, todo el contenido de la tabla Orders estará ahora en un archivo CSV que estará en el cubo de S3 esperando a ser cargado en el almacén de datos o en otro almacén de datos. Consulta el Capítulo 5 para obtener más información sobre la carga en el almacén de datos de tu elección.

Si quieres extraer datos de forma incremental, tendrás que hacer algunos cambios en el script. Te sugiero que crees una copia de extract_mysql_full.py con el nombre extract_mysql_incremental.py como punto de partida.

En primer lugar, busca la marca de tiempo del último registro que se extrajo de la tabla Orders de origen. Para ello, consulta el valor MAX(LastUpdated) de la tabla Orders del almacén de datos. En este ejemplo, utilizaré un almacén de datos de Redshift (consulta "Configurar un almacén de Amazon Redshift como destino"), pero puedes utilizar la misma lógica con el almacén de tu elección.

Para interactuar con tu clúster Redshift, instala la biblioteca psycopg2, si aún no lo has hecho.

(env) $ pip install psycopg2

Este es el código para conectar y consultar el cluster Redshift para obtener el valor MAX(LastUpdated) de la tabla Orders:

import psycopg2

# get db Redshift connection info
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
dbname = parser.get("aws_creds", "database")
user = parser.get("aws_creds", "username")
password = parser.get("aws_creds", "password")
host = parser.get("aws_creds", "host")
port = parser.get("aws_creds", "port")

# connect to the redshift cluster
rs_conn = psycopg2.connect(
    "dbname=" + dbname
    + " user=" + user
    + " password=" + password
    + " host=" + host
    + " port=" + port)

rs_sql = """SELECT COALESCE(MAX(LastUpdated),
        '1900-01-01')
        FROM Orders;"""
rs_cursor = rs_conn.cursor()
rs_cursor.execute(rs_sql)
result = rs_cursor.fetchone()

# there's only one row and column returned
last_updated_warehouse = result[0]

rs_cursor.close()
rs_conn.commit()

Utilizando el valor almacenado en last_updated_warehouse, modifica la consulta de extracción ejecutada en la base de datos MySQL para extraer sólo los registros de la tabla Orders que se hayan actualizado desde la ejecución anterior del trabajo de extracción. La nueva consulta contiene un marcador de posición, representado por %s para el valor last_updated_warehouse. A continuación, el valor se pasa a la función .execute() del cursor como una tupla (un tipo de dato utilizado para almacenar colecciones de datos). Esta es la forma correcta y segura de añadir parámetros a una consulta SQL para evitar posibles inyecciones SQL. Aquí tienes el bloque de código actualizado para ejecutar la consulta SQL en la base de datos MySQL:

m_query = """SELECT *
    FROM Orders
    WHERE LastUpdated > %s;"""
local_filename = "order_extract.csv"

m_cursor = conn.cursor()
m_cursor.execute(m_query, (last_updated_warehouse,))

El script entire extract_mysql_incremental.py para la extracción incremental (utilizando un clúster Redshift para el valor last_updated ) tiene el siguiente aspecto:

import pymysql
import csv
import boto3
import configparser
import psycopg2

# get db Redshift connection info
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
dbname = parser.get("aws_creds", "database")
user = parser.get("aws_creds", "username")
password = parser.get("aws_creds", "password")
host = parser.get("aws_creds", "host")
port = parser.get("aws_creds", "port")

# connect to the redshift cluster
rs_conn = psycopg2.connect(
    "dbname=" + dbname
    + " user=" + user
    + " password=" + password
    + " host=" + host
    + " port=" + port)

rs_sql = """SELECT COALESCE(MAX(LastUpdated),
        '1900-01-01')
        FROM Orders;"""
rs_cursor = rs_conn.cursor()
rs_cursor.execute(rs_sql)
result = rs_cursor.fetchone()

# there's only one row and column returned
last_updated_warehouse = result[0]

rs_cursor.close()
rs_conn.commit()

# get the MySQL connection info and connect
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
hostname = parser.get("mysql_config", "hostname")
port = parser.get("mysql_config", "port")
username = parser.get("mysql_config", "username")
dbname = parser.get("mysql_config", "database")
password = parser.get("mysql_config", "password")

conn = pymysql.connect(host=hostname,
        user=username,
        password=password,
        db=dbname,
        port=int(port))

if conn is None:
  print("Error connecting to the MySQL database")
else:
  print("MySQL connection established!")

m_query = """SELECT *
      FROM Orders
      WHERE LastUpdated > %s;"""
local_filename = "order_extract.csv"

m_cursor = conn.cursor()
m_cursor.execute(m_query, (last_updated_warehouse,))
results = m_cursor.fetchall()

with open(local_filename, 'w') as fp:
  csv_w = csv.writer(fp, delimiter='|')
  csv_w.writerows(results)

fp.close()
m_cursor.close()
conn.close()

# load the aws_boto_credentials values
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
access_key = parser.get(
    "aws_boto_credentials",
    "access_key")
secret_key = parser.get(
    "aws_boto_credentials",
    "secret_key")
bucket_name = parser.get(
    "aws_boto_credentials",
    "bucket_name")

s3 = boto3.client(
    's3',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key)

s3_file = local_filename

s3.upload_file(
    local_filename,
    bucket_name,
    s3_file)
Advertencia

Ten cuidado con los grandes trabajos de extracción -ya sean completos o incrementales- que sobrecargan la base de datos MySQL de origen, e incluso bloquean la ejecución de las consultas de producción. Consulta con el propietario de la base de datos y considera la posibilidad de crear una réplica desde la que realizar la extracción, en lugar de hacerlo desde la base de datos fuente primaria.

Replicación binaria del registro de datos MySQL

Aunque es más compleja de implementar, la ingesta de datos desde una base de datos MySQL utilizando el contenido del binlog MySQL para replicar los cambios es eficiente en casos de necesidades de ingesta de gran volumen.

Nota

La replicación Binlog es una forma de captura de datos de cambio (CDC). Muchos almacenes de datos fuente tienen alguna forma de CDC que puedes utilizar.

El binlog de MySQL es un registro que lleva un registro de cada operación realizada en la base de datos. Por ejemplo, dependiendo de cómo esté configurado, registrará los detalles de cada creación o modificación de una tabla, así como cada operación de INSERT, UPDATE y DELETE. Aunque originalmente estaba pensado para replicar datos a otras instancias de MySQL, no es difícil ver por qué el contenido del binlog es tan atractivo para los ingenieros de datos que quieren ingerir datos en un almacén de datos.

Como es probable que tu almacén de datos no sea una base de datos MySQL, no es posible utilizar simplemente las funciones de replicación MySQL incorporadas. Para utilizar el binlog para la ingesta de datos a una fuente no MySQL, hay que seguir una serie de pasos:

  1. Activa y configura el binlog en el servidor MySQL.

  2. Realiza una extracción y carga inicial de la tabla completa.

  3. Extraer del binlog de forma continua.

  4. Traduce y carga extractos de binlog en el almacén de datos.

Nota

El paso 3 no se trata en detalle, pero para utilizar el binlog para la ingesta, primero debes rellenar las tablas del almacén de datos con el estado actual de la base de datos MySQL y luego utilizar el binlog para ingerir los cambios posteriores. Hacerlo suele implicar poner un LOCK en las tablas que quieres extraer, ejecutar un mysqldump de esas tablas y luego cargar el resultado del mysqldump en el almacén antes de activar la ingestión del binlog.

Aunque lo mejor es consultar la documentación más reciente sobre binlog de MySQL para obtener instrucciones sobre cómo activar y configurar el registro binario, yo te explicaré los valores de configuración clave.

Hay dos ajustes clave que debes asegurar en la base de datos MySQL con respecto a la configuración de binlog.

En primer lugar, asegúrate de que el registro binario está activado. Normalmente está activado por defecto, pero puedes comprobarlo ejecutando la siguiente consulta SQL en la base de datos (la sintaxis exacta puede variar según la distribución de MySQL):

SELECT variable_value as bin_log_status
FROM performance_schema.global_variables
WHERE variable_name='log_bin';

Si el registro binario está activado, verás lo siguiente. Si el estado devuelto es OFF, tendrás que consultar la documentación de MySQL de la versión correspondiente para activarlo.

+ — — — — — — — — — — — — — — — — — — -+
| bin_log_status :: |
+ — — — — — — — — — — — — — — — — — — -+
| ON |
+ — — — — — — — — — — — — — — — — — — -+
1 row in set (0.00 sec)

A continuación, asegúrate de que el formato binario de registro está configurado correctamente. Hay tres formatos admitidos en la versión reciente de MySQL:

  • STATEMENT

  • ROW

  • MIXED

El formato STATEMENT registra cada sentencia SQL que inserta o modifica una fila en el binlog. Si quisieras replicar datos de una base de datos MySQL a otra, este formato es útil. Para replicar los datos, podrías simplemente ejecutar todas las sentencias para reproducir el estado de la base de datos. Sin embargo, como es probable que los datos extraídos estén destinados a un almacén de datos que se ejecuta en una plataforma diferente, las sentencias SQL producidas en la base de datos MySQL pueden no ser compatibles con tu almacén de datos.

Con el formato ROW, cada cambio en una fila de una tabla se representa en una línea del binlog, no como una sentencia SQL, sino como los datos de la propia fila. Este es el formato preferido.

El formato MIXED registra tanto los registros formateados en STATEMENT como en ROW en el binlog. Aunque es posible filtrar posteriormente sólo los datos de ROW, a menos que el binlog se utilice para otro fin, no es necesario activar MIXED, dado el espacio adicional en disco que ocupa.

Puedes verificar el formato binlog actual ejecutando la siguiente consulta SQL:

SELECT variable_value as bin_log_format
FROM performance_schema.global_variables
WHERE variable_name='binlog_format';

La declaración devolverá el formato que esté activo en ese momento:

+ — — — — — — — — — — — — — — — — — — — -+
| bin_log_format :: |
+ — — — — — — — — — — — — — — — — — — — -+
| ROW |
+ — — — — — — — — — — — — — — — — — — — -+
1 row in set (0.00 sec)

El formato binlog, así como otros ajustes de configuración, suelen establecerse en el archivo my.cnf específico de la instancia de base de datos MySQL. Si abres el archivo, verás incluida una fila como la siguiente:

[mysqld]
binlog_format=row
........

De nuevo, es mejor consultar con el propietario de la base de datos MySQL o con la documentación más reciente de MySQL antes de modificar cualquier configuración.

Ahora que el registro binario está habilitado en un formato ROW, puedes construir un proceso para extraer de él la información relevante y almacenarla en un archivo para cargarlo en tu almacén de datos.

Hay tres tipos diferentes de eventos con formato ROW que querrás extraer del binlog. Por el bien de este ejemplo de ingestión, puedes ignorar otros eventos que encuentres en el registro, pero en estrategias de replicación más avanzadas, extraer eventos que modifiquen la estructura de una tabla también tiene su valor. Los eventos con los que trabajarás son los siguientes:

  • WRITE_ROWS_EVENT

  • UPDATE_ROWS_EVENT

  • DELETE_ROWS_EVENT

A continuación, es hora de obtener los eventos del binlog. Afortunadamente, existen algunas bibliotecas Python de código abierto para que puedas empezar. Una de las más populares es el proyecto python-mysql-replication, que puedes encontrar en GitHub. Para empezar, instálala utilizando pip:

(env) $ pip install mysql-replication

Para hacerte una idea de cómo es la salida del binlog, puedes conectarte a la base de datos y leer el binlog. En este ejemplo, utilizaré la información de conexión MySQL añadida al archivo pipeline. conf para el ejemplo de ingestión completa e incremental anterior en esta sección.

Nota

El siguiente ejemplo lee del archivo binlog por defecto del servidor MySQL. El nombre y la ruta del archivo binlog por defecto se establecen en la variable log_bin, que se almacena en el archivo my.cnf de la base de datos MySQL. En algunos casos, los binlogs se rotan a lo largo del tiempo (quizás diariamente o cada hora). Si es así, tendrás que determinar la ruta del archivo basándote en el método de rotación de logs y el esquema de nomenclatura de archivos elegido por el administrador de MySQL y pasarlo como valor al parámetro log_file al crear la instancia BinLogStreamReader. Para más información, consulta la documentación de la clase BinLogStreamReader .

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication import row_event
import configparser
import pymysqlreplication

# get the MySQL connection info
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
hostname = parser.get("mysql_config", "hostname")
port = parser.get("mysql_config", "port")
username = parser.get("mysql_config", "username")
password = parser.get("mysql_config", "password")

mysql_settings = {
    "host": hostname,
    "port": int(port),
    "user": username,
    "passwd": password
}

b_stream = BinLogStreamReader(
            connection_settings = mysql_settings,
            server_id=100,
            only_events=[row_event.DeleteRowsEvent,
                        row_event.WriteRowsEvent,
                        row_event.UpdateRowsEvent]
            )

for event in b_stream:
    event.dump()

b_stream.close()

Hay algunas cosas que señalar sobre el objeto BinLogStreamReader que se instancia en el ejemplo de código. En primer lugar, se conecta a la base de datos MySQL especificada en el archivo pipeline. conf y lee de un archivo binlog específico. A continuación, la combinación del ajuste resume_stream=True y el valor log_pos le indica que empiece a leer el binlog en un punto determinado. En este caso, es la posición 1400. Por último, le digo a BinLogStreamReader que sólo lea los eventos DeleteRowsEvent, WriteRowsEvent, y UpdateRowsEvent, utilizando el parámetro only_events.

A continuación, el script recorre todos los eventos y los imprime en un formato legible por humanos. Para tu base de datos con la tabla Orders, verás algo como esto:

=== WriteRowsEvent ===
Date: 2020-06-01 12:00:00
Log position: 1400
Event size: 30
Read bytes: 20
Table: orders
Affected columns: 3
Changed rows: 1
Values:
--
* OrderId : 1
* OrderStatus : Backordered
* LastUpdated : 2020-06-01 12:00:00

=== UpdateRowsEvent ===
Date: 2020-06-09 12:00:25
Log position: 1401
Event size: 56
Read bytes: 15
Table: orders
Affected columns: 3
Changed rows: 1
Affected columns: 3
Values:
--
* OrderId : 1 => 1
* OrderStatus : Backordered => Shipped
* LastUpdated : 2020-06-01 12:00:00 => 2020-06-09 12:00:25

Como puedes ver, hay dos eventos que representan las páginas INSERT y UPDATE de OrderId 1, que se mostraban en la Tabla 4-3. En este ejemplo ficticio, los dos eventos binlog secuenciales están separados por días, pero en la realidad habría numerosos eventos entre ellos, que representarían todos los cambios realizados en la base de datos.

Nota

El valor de log_pos, que indica a BinLogStreamReader dónde debe empezar, es un valor que tendrás que almacenar en algún lugar de una tabla propia para saber dónde debe recogerse cuando se ejecute la siguiente extracción. A mí me parece que lo mejor es almacenar el valor en una tabla de registro en el almacén de datos desde la que se pueda leer cuando se inicie la extracción y en la que se pueda escribir, con el valor de posición del evento final cuando termine.

Aunque el ejemplo de código muestra el aspecto de los eventos en un formato legible para el ser humano, para que la salida sea fácil de cargar en el almacén de datos, es necesario hacer un par de cosas más:

  • Analiza y escribe los datos en un formato diferente. Para simplificar la carga, el siguiente ejemplo de código escribirá cada evento en una fila de un archivo CSV.

  • Escribe un archivo por cada tabla que quieras extraer y cargar. Aunque el binlog de ejemplo sólo contiene eventos relacionados con la tabla Orders, es muy probable que en un binlog real también se incluyan eventos relacionados con otras tablas.

Para el primer cambio, en lugar de utilizar la función .dump(), analizaré los atributos de los eventos y los escribiré en un archivo CSV. Para el segundo, en lugar de escribir un archivo para cada tabla, por simplicidad sólo escribiré los eventos relacionados con la tabla Orders en un archivo llamado pedidos_extraer.csv. En una extracción totalmente implementada, modifica este ejemplo de código para agrupar los eventos por tabla y escribir varios archivos, uno para cada tabla de la que quieras ingerir cambios. El último paso del ejemplo de código final carga el archivo CSV en el bucket de S3 para que pueda cargarse en el almacén de datos, como se describe detalladamente en el Capítulo 5:

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication import row_event
import configparser
import pymysqlreplication
import csv
import boto3

# get the MySQL connection info
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
hostname = parser.get("mysql_config", "hostname")
port = parser.get("mysql_config", "port")
username = parser.get("mysql_config", "username")
password = parser.get("mysql_config", "password")

mysql_settings = {
    "host": hostname,
    "port": int(port),
    "user": username,
    "passwd": password
}

b_stream = BinLogStreamReader(
            connection_settings = mysql_settings,
            server_id=100,
            only_events=[row_event.DeleteRowsEvent,
                        row_event.WriteRowsEvent,
                        row_event.UpdateRowsEvent]
            )

order_events = []

for binlogevent in b_stream:
  for row in binlogevent.rows:
    if binlogevent.table == 'orders':
      event = {}
      if isinstance(
            binlogevent,row_event.DeleteRowsEvent
        ):
        event["action"] = "delete"
        event.update(row["values"].items())
      elif isinstance(
            binlogevent,row_event.UpdateRowsEvent
        ):
        event["action"] = "update"
        event.update(row["after_values"].items())
      elif isinstance(
            binlogevent,row_event.WriteRowsEvent
        ):
        event["action"] = "insert"
        event.update(row["values"].items())

      order_events.append(event)

b_stream.close()

keys = order_events[0].keys()
local_filename = 'orders_extract.csv'
with open(
        local_filename,
        'w',
        newline='') as output_file:
    dict_writer = csv.DictWriter(
                output_file, keys,delimiter='|')
    dict_writer.writerows(order_events)

# load the aws_boto_credentials values
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
access_key = parser.get(
                "aws_boto_credentials",
                "access_key")
secret_key = parser.get(
                "aws_boto_credentials",
                "secret_key")
bucket_name = parser.get(
                "aws_boto_credentials",
                "bucket_name")

s3 = boto3.client(
    's3',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key)

s3_file = local_filename

s3.upload_file(
    local_filename,
    bucket_name,
    s3_file)

Tras la ejecución, orders_extract.csv tendrá el siguiente aspecto:

insert|1|Backordered|2020-06-01 12:00:00
update|1|Shipped|2020-06-09 12:00:25

Como comento en el Capítulo 5, el formato del archivo CSV resultante está optimizado para una carga rápida. Dar sentido a los datos que se han extraído es un trabajo para el paso de transformación en una tubería, revisado en detalle en el Capítulo 6.

Extraer datos de una base de datos PostgreSQL

Al igual que MySQL, la ingesta de datos de una base de datos PostgreSQL (comúnmente conocida como Postgres) puede hacerse en de dos maneras: con extracciones completas o incrementales utilizando SQL o aprovechando las características de la base de datos destinadas a soportar la replicación a otros nodos. En el caso de Postgres, hay varias formas de hacerlo, pero este capítulo se centrará en un método: convertir el registro de escritura anticipada (WAL) de Postgres en un flujo de datos.

Al igual que la sección anterior, ésta está pensada para quienes necesiten ingerir datos de una base de datos Postgres existente. Sin embargo, si sólo quieres probar los ejemplos de código, puedes configurar Postgres instalándolo en tu máquina local o en AWS utilizando una instancia RDS, que es lo que recomiendo. Consulta la sección anterior para obtener información sobre las buenas prácticas relacionadas con los precios y la seguridad para RDS MySQL, ya que también se aplican a RDS Postgres.

Los ejemplos de código de esta sección son bastante sencillos y se refieren a una tabla llamada Orders en una base de datos Postgres. Una vez que tengas una instancia de Postgres con la que trabajar, puedes crear la tabla e insertar algunas filas de ejemplo ejecutando los siguientes comandos SQL:

CREATE TABLE Orders (
  OrderId int,
  OrderStatus varchar(30),
  LastUpdated timestamp
);

INSERT INTO Orders
  VALUES(1,'Backordered', '2020-06-01 12:00:00');
INSERT INTO Orders
  VALUES(1,'Shipped', '2020-06-09 12:00:25');
INSERT INTO Orders
  VALUES(2,'Shipped', '2020-07-11 3:05:00');
INSERT INTO Orders
  VALUES(1,'Shipped', '2020-06-09 11:50:00');
INSERT INTO Orders
  VALUES(3,'Shipped', '2020-07-12 12:00:00');

Extracción completa o incremental de tablas Postgres

Este método es similar a las extracciones completa e incremental y completa demostradas en "Extraer datos de una base de datos MySQL". Es tan similar que no entraré en detalles aquí más allá de una diferencia en el código. Al igual que el ejemplo de esa sección, éste extraerá datos de una tabla llamada Orders en una base de datos fuente, los escribirá en un archivo CSV y luego los subirá a un bucket de S3.

La única diferencia en esta sección es la biblioteca Python que utilizaré para extraer los datos. En lugar de PyMySQL, utilizaré pyscopg2 para conectarme a una base de datos Postgres. Si aún no la has instalado, puedes hacerlo utilizando pip:

(env) $ pip install pyscopg2

También tendrás que añadir una nueva sección al archivo pipeline. conf con la información de conexión para la base de datos Postgres:

[postgres_config]
host = myhost.com
port = 5432
username = my_username
password = my_password
database = db_name

El código para ejecutar la extracción completa de la tabla Orders es casi idéntico al ejemplo de la sección MySQL, pero como puedes ver, utiliza pyscopg2 para conectar con la base de datos de origen y ejecutar la consulta. Aquí está en su totalidad:

import psycopg2
import csv
import boto3
import configparser

parser = configparser.ConfigParser()
parser.read("pipeline.conf")
dbname = parser.get("postgres_config", "database")
user = parser.get("postgres_config", "username")
password = parser.get("postgres_config",
    "password")
host = parser.get("postgres_config", "host")
port = parser.get("postgres_config", "port")

conn = psycopg2.connect(
        "dbname=" + dbname
        + " user=" + user
        + " password=" + password
        + " host=" + host,
        port = port)

m_query = "SELECT * FROM Orders;"
local_filename = "order_extract.csv"

m_cursor = conn.cursor()
m_cursor.execute(m_query)
results = m_cursor.fetchall()

with open(local_filename, 'w') as fp:
  csv_w = csv.writer(fp, delimiter='|')
  csv_w.writerows(results)

fp.close()
m_cursor.close()
conn.close()

# load the aws_boto_credentials values
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
access_key = parser.get(
                "aws_boto_credentials",
                "access_key")
secret_key = parser.get(
                "aws_boto_credentials",
                "secret_key")
bucket_name = parser.get(
                "aws_boto_credentials",
                "bucket_name")

s3 = boto3.client(
      's3',
      aws_access_key_id = access_key,
      aws_secret_access_key = secret_key)

s3_file = local_filename

s3.upload_file(
    local_filename,
    bucket_name,
    s3_file)

Modificar la versión incremental mostrada en la sección MySQL es igual de sencillo. Lo único que tienes que hacer es utilizar psycopg2 en lugar de PyMySQL.

Replicar datos utilizando el registro de escritura anticipada

Al igual que el binlog de MySQL (del que hablamos en la sección anterior), la WAL de Postgres puede utilizarse como método de CDC para la extracción. También como el binlog de MySQL, utilizar la WAL para la ingestión de datos en una canalización es bastante complejo.

Aunque puedes adoptar un enfoque similar y simplificado al utilizado como ejemplo con el binlog MySQL, te sugiero que utilices una plataforma distribuida de código abierto llamada Debezium para transmitir el contenido de la WAL de Postgres a un bucket S3 o a un almacén de datos.

Aunque los detalles de la configuración y ejecución de los servicios Debezium son un tema al que merece la pena dedicar un libro entero, en "Streaming Data Ingestions with Kafka and Debezium" ofrezco una visión general de Debezium y de cómo puede utilizarse para las ingestas de datos . Puedes obtener más información sobre cómo puede utilizarse para Postgres CDC allí.

Extraer datos de MongoDB

Este ejemplo ilustra cómo extraer un subconjunto de documentos MongoDB de una colección. En esta colección MongoDB de ejemplo, los documentos representan eventos registrados desde algún sistema, como un servidor web. Cada documento tiene una marca de tiempo que indica cuándo se creó, así como una serie de propiedades de las que el código de ejemplo extrae un subconjunto. Una vez finalizada la extracción, los datos se escriben en un archivo CSV y se almacenan en un bucket S3 para que puedan cargarse en un almacén de datos en un paso futuro (consulta el Capítulo 5).

Para conectarte a la base de datos MongoDB, primero tendrás que instalar la biblioteca PyMongo. Como con otras bibliotecas de Python, puedes instalarla utilizando pip:

(env) $ pip install pymongo

Por supuesto, puedes modificar el siguiente código de ejemplo para conectarte a tu propia instancia de MongoDB y extraer datos de tus documentos. Sin embargo, si quieres ejecutar el ejemplo tal cual, puedes hacerlo creando un clúster MongoDB gratuito con MongoDB Atlas. Atlas es un servicio MongoDB totalmente gestionado e incluye un nivel gratuito de por vida con abundante almacenamiento y potencia de cálculo para aprender y ejecutar muestras como la que yo proporciono. Puedes pasar a un plan de pago para implementaciones de producción.

Puedes aprender a crear un clúster MongoDB gratuito en Atlas, crear una base de datos y configurarla para que puedas conectarte mediante un script de Python que se ejecute en tu máquina local siguiendo estas instrucciones.

Necesitarás instalar una biblioteca Python más llamada dnspython para soportar pymongo en la conexión a tu clúster alojado en MongoDB Atlas. Puedes instalarla utilizando pip:

(env) $ pip install dnspython

A continuación, añade una nueva sección al archivo pipeline. conf con la información de conexión para la instancia de MongoDB de la que extraerás los datos. Rellena cada línea con tus propios detalles de conexión. Si utilizas MongoDB Atlas y no recuerdas estos valores de cuando configuraste tu clúster, puedes aprender a encontrarlos leyendo la documentación de Atlas.

[mongo_config]
hostname = my_host.com
username = mongo_user
password = mongo_password
database = my_database
collection = my_collection

Antes de crear y ejecutar el script de extracción, puedes insertar algunos datos de muestra con los que trabajar. Crea un archivo llamado muestra_mongodb.py con el siguiente código:

from pymongo import MongoClient
import datetime
import configparser

# load the mongo_config values
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
hostname = parser.get("mongo_config", "hostname")
username = parser.get("mongo_config", "username")
password = parser.get("mongo_config", "password")
database_name = parser.get("mongo_config",
                    "database")
collection_name = parser.get("mongo_config",
                    "collection")

mongo_client = MongoClient(
                "mongodb+srv://" + username
                + ":" + password
                + "@" + hostname
                + "/" + database_name
                + "?retryWrites=true&"
                + "w=majority&ssl=true&"
                + "ssl_cert_reqs=CERT_NONE")

# connect to the db where the collection resides
mongo_db = mongo_client[database_name]

# choose the collection to query documents from
mongo_collection = mongo_db[collection_name]

event_1 = {
  "event_id": 1,
  "event_timestamp": datetime.datetime.today(),
  "event_name": "signup"
}

event_2 = {
  "event_id": 2,
  "event_timestamp": datetime.datetime.today(),
  "event_name": "pageview"
}

event_3 = {
  "event_id": 3,
  "event_timestamp": datetime.datetime.today(),
  "event_name": "login"
}

# insert the 3 documents
mongo_collection.insert_one(event_1)
mongo_collection.insert_one(event_2)
mongo_collection.insert_one(event_3)

Cuando lo ejecutes, los tres documentos se insertarán en tu colección MongoDB:

(env) $ python sample_mongodb.py

Ahora crea un nuevo script Python llamado mongo_extract.py para que puedas añadirle los siguientes bloques de código.

En primer lugar, importa PyMongo y Boto3 para que puedas extraer datos de la base de datos MongoDB y almacenar los resultados en un bucket S3. Importa también la biblioteca csv para que puedas estructurar y escribir los datos extraídos en un archivo plano fácil de importar a un almacén de datos en el paso de carga de la ingesta. Por último, necesitarás algunas funciones datetime para este ejemplo, de modo que puedas iterar a través de los datos de eventos de muestra en la colección MongoDB:

from pymongo import MongoClient
import csv
import boto3
import datetime
from datetime import timedelta
import configparser

A continuación, conéctate a la instancia de MongoDB especificada en el archivo pipelines.conf, y crea un objeto collection donde se almacenen los documentos que quieres extraer:

# load the mongo_config values
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
hostname = parser.get("mongo_config", "hostname")
username = parser.get("mongo_config", "username")
password = parser.get("mongo_config", "password")
database_name = parser.get("mongo_config",
                    "database")
collection_name = parser.get("mongo_config",
                    "collection")

mongo_client = MongoClient(
                "mongodb+srv://" + username
                + ":" + password
                + "@" + hostname
                + "/" + database_name
                + "?retryWrites=true&"
                + "w=majority&ssl=true&"
                + "ssl_cert_reqs=CERT_NONE")

# connect to the db where the collection resides
mongo_db = mongo_client[database_name]

# choose the collection to query documents from
mongo_collection = mongo_db[collection_name]

Ahora es el momento de consultar los documentos a extraer. Puedes hacerlo llamando a la función .find() en mongo_collection para consultar los documentos que buscas. En el siguiente ejemplo, cogerás todos los documentos con un valor del campo event_timestamp entre dos fechas definidas en el script.

Nota

Extraer datos inmutables, como registros de registro o registros genéricos de "eventos" de un almacén de datos por intervalo de fechas, es un caso de uso común. Aunque el código de ejemplo utiliza un intervalo de fechas definido en el script, es más probable que le pases un intervalo de fechas, o que hagas que el script consulte tu almacén de datos para obtener la fecha del último evento cargado, y extraiga los registros posteriores del almacén de datos de origen. Consulta "Extraer datos de una base de datos MySQL" para ver un ejemplo de cómo hacerlo.

start_date = datetime.datetime.today() + timedelta(days = -1)
end_date = start_date + timedelta(days = 1 )

mongo_query = { "$and":[{"event_timestamp" : { "$gte": start_date }}, {"event_timestamp" : { "$lt": end_date }}] }

event_docs = mongo_collection.find(mongo_query, batch_size=3000)
Nota

El parámetro batch_size en este ejemplo se establece en 3000. PyMongo realiza un viaje de ida y vuelta al host MongoDB por cada lote. Por ejemplo, si el result_docs Cursor tiene 6.000 resultados, tardará dos viajes al host MongoDB para bajar todos los documentos a la máquina donde se está ejecutando tu script Python. Lo que establezcas como valor del tamaño del lote depende de ti, y dependerá del equilibrio entre almacenar más documentos en la memoria del sistema que ejecuta el extracto y realizar muchos viajes de ida y vuelta a la instancia de MongoDB.

El resultado del código anterior es un Cursor llamado event_docs que utilizaré para iterar por los documentos resultantes. Recuerda que en este ejemplo simplificado, cada documento representa un evento generado desde un sistema como un servidor web. Un evento puede representar algo así como un usuario que se registra, ve una página o envía un formulario de respuesta. Aunque los documentos podrían tener docenas de campos para representar cosas como el navegador con el que se conectó el usuario, en este ejemplo sólo tomaré unos pocos campos:

# create a blank list to store the results
all_events = []

# iterate through the cursor
for doc in event_docs:
    # Include default values
    event_id = str(doc.get("event_id", -1))
    event_timestamp = doc.get(
                        "event_timestamp", None)
    event_name = doc.get("event_name", None)

    # add all the event properties into a list
    current_event = []
    current_event.append(event_id)
    current_event.append(event_timestamp)
    current_event.append(event_name)

    # add the event to the final list of events
    all_events.append(current_event)

Estoy incluyendo un valor por defecto en la llamada a la función doc.get() (-1 o Ninguno). ¿Por qué? La naturaleza de los datos de documentos no estructurados implica que es posible que falten campos en un documento. En otras palabras, no puedes suponer que cada uno de los documentos por los que estás iterando tiene un "nombre_evento" o cualquier otro campo. En esos casos, dile a doc.get() que devuelva un valor None en lugar de lanzar un error.

Después de iterar por todos los eventos en event_docs, la lista all_events está lista para ser escrita en un archivo CSV. Para ello, utilizarás el módulo csv, incluido en la distribución estándar de Python e importado anteriormente en este ejemplo:

export_file = "export_file.csv"

with open(export_file, 'w') as fp:
	csvw = csv.writer(fp, delimiter='|')
	csvw.writerows(all_events)

fp.close()

Ahora, sube el archivo CSV al bucket S3 que configuraste en "Configurar el almacenamiento de archivos en la nube". Para hacerlo en , utiliza la biblioteca Boto3:

# load the aws_boto_credentials values
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
access_key = parser.get("aws_boto_credentials",
                "access_key")
secret_key = parser.get("aws_boto_credentials",
                "secret_key")
bucket_name = parser.get("aws_boto_credentials",
                "bucket_name")

s3 = boto3.client('s3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key)

s3_file = export_file

s3.upload_file(export_file, bucket_name, s3_file)

Ya está. Los datos que has extraído de la colección de MongoDB están ahora en el cubo de S3 esperando a ser cargados en el almacén de datos u otro almacén de datos. Si has utilizado los datos de ejemplo proporcionados, el contenido de export_file.csv tendrá este aspecto:

1|2020-12-13 11:01:37.942000|signup
2|2020-12-13 11:01:37.942000|pageview
3|2020-12-13 11:01:37.942000|login

Consulta el Capítulo 5 para saber más sobre cómo cargar los datos en el almacén de datos que elijas.

Extraer datos de una API REST

Las API REST son una fuente habitual de la que extraer datos. Puede que necesites ingerir datos de una API que tu organización haya creado y mantenga, o de una API de un servicio o proveedor externo que tu organización utilice, como Salesforce, HubSpot o Twitter. Independientemente de la API, hay un patrón común para la extracción de datos que utilizaré en el sencillo ejemplo que sigue:

  1. Envía una solicitud HTTP GET al punto final de la API.

  2. Acepta la respuesta, que lo más probable es que esté formateada en JSON.

  3. Analiza la respuesta y "aplánala" en un archivo CSV que puedas cargar posteriormente en el almacén de datos.

Nota

Aunque estoy analizando la respuesta JSON y almacenándola en un archivo plano (CSV), también puedes guardar los datos en formato JSON para cargarlos en tu almacén de datos. En aras de la simplicidad, me atengo al patrón de este capítulo y utilizo archivos CSV. Consulta el Capítulo 5 o la documentación de tu almacén de datos para obtener más información sobre la carga de datos en un formato distinto al CSV.

En este ejemplo, me conectaré a una API llamada Open Notify. La API tiene varios puntos finales, cada uno de los cuales devuelve datos de la NASA sobre lo que ocurre en el espacio. Consultaré el punto final que devuelve las próximas cinco veces que la Estación Espacial Internacional (ISS) pasará por encima de la ubicación dada en la Tierra.

Antes de compartir el código Python para consultar el punto final, puedes ver cómo es la salida de una consulta sencilla escribiendo la siguiente URL en tu navegador:

http://api.open-notify.org/iss-pass.json?lat=42.36&lon=71.05

El JSON resultante tiene este aspecto:

{
  "message": "success",
  "request": {
    "altitude": 100,
    "datetime": 1596384217,
    "latitude": 42.36,
    "longitude": 71.05,
    "passes": 5
  },
  "response": [
    {
      "duration": 623,
      "risetime": 1596384449
    },
    {
      "duration": 169,
      "risetime": 1596390428
    },
    {
      "duration": 482,
      "risetime": 1596438949
    },
    {
      "duration": 652,
      "risetime": 1596444637
    },
    {
      "duration": 624,
      "risetime": 1596450474
    }
  ]
}

El objetivo de esta extracción es recuperar los datos de la respuesta y formatearlos en un archivo CSV con una línea por cada hora y duración de cada pasada que la ISS haga sobre el par lat/long. Por ejemplo, las dos primeras líneas del archivo CSV serán las siguientes

42.36,|71.05|623|1596384449
42.36,|71.05|169|1596390428

Para consultar la API y gestionar la respuesta en Python, necesitarás instalar la biblioteca requests. requests facilita el trabajo con solicitudes HTTP y respuestas en Python. Puedes instalarla con pip:

(env) $ pip install requests

Ahora puedes utilizar requests para consultar el punto final de la API, obtener la respuesta e imprimir el JSON resultante, que se parecerá a lo que veías en tu navegador:

import requests

lat = 42.36
lon = 71.05
lat_log_params = {"lat": lat, "lon": lon}

api_response = requests.get(
    "http://api.open-notify.org/iss-pass.json", params=lat_log_params)

print(api_response.content)

En lugar de imprimir el JSON, iteraré por la respuesta, analizaré los valores de duración y tiempo de subida, escribiré los resultados en un archivo CSV y subiré el archivo al bucket de S3.

Para analizar la respuesta JSON, importaré la biblioteca Python json. No es necesario instalarla, ya que viene con la instalación estándar de Python. A continuación, importaré la biblioteca csv, que también se incluye en la distribución estándar de Python para escribir el archivo CSV. Por último, utilizaré la biblioteca configparser para obtener las credenciales que necesita la biblioteca Boto3 para subir el archivo CSV al bucket de S3:

import requests
import json
import configparser
import csv
import boto3

A continuación, consulta la API igual que hiciste antes:

lat = 42.36
lon = 71.05
lat_log_params = {"lat": lat, "lon": lon}

api_response = requests.get(
    "http://api.open-notify.org/iss-pass.json", params=lat_log_params)

Ahora, es el momento de iterar por la respuesta, almacenar los resultados en un archivo Python list llamado all_passes, y guardar los resultados en un archivo CSV. Observa que también almaceno la latitud y la longitud de la solicitud, aunque no estén incluidas en la respuesta. Son necesarias en cada línea del archivo CSV para que los tiempos de paso se asocien a la latitud y longitud correctas cuando se carguen en el almacén de datos:

# create a json object from the response content
response_json = json.loads(api_response.content)

all_passes = []
for response in response_json['response']:
    current_pass = []

    #store the lat/log from the request
    current_pass.append(lat)
    current_pass.append(lon)

    # store the duration and risetime of the pass
    current_pass.append(response['duration'])
    current_pass.append(response['risetime'])

    all_passes.append(current_pass)

export_file = "export_file.csv"

with open(export_file, 'w') as fp:
	csvw = csv.writer(fp, delimiter='|')
	csvw.writerows(all_passes)

fp.close()

Por último, sube el archivo CSV al bucket S3 utilizando la biblioteca Boto3:

# load the aws_boto_credentials values
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
access_key = parser.get("aws_boto_credentials",
                "access_key")
secret_key = parser.get("aws_boto_credentials",
                "secret_key")
bucket_name = parser.get("aws_boto_credentials",
                "bucket_name")

s3 = boto3.client(
    's3',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key)

s3.upload_file(
    export_file,
    bucket_name,
    export_file)

Ingesta de datos en streaming con Kafka y Debezium

Cuando se trata de a la ingesta de datos de un sistema CDC, como los binlogs de MySQL o los WALs de Postgres, no hay una solución sencilla sin la ayuda de un gran framework.

Debezium es un sistema distribuido formado por varios servicios de código abierto que capturan los cambios a nivel de fila de los sistemas CDC comunes y luego los transmiten como eventos consumibles por otros sistemas. Hay tres componentes principales en una instalación de Debezium:

  • Apache Zookeeper gestiona el entorno distribuido y se encarga de la configuración de cada servicio.

  • Apache Kafka es una plataforma de streaming distribuido que se utiliza habitualmente para construir canalizaciones de datos altamente escalables.

  • Apache Kafka Connect es una herramienta para conectar Kafka con otros sistemas, de forma que los datos se puedan transmitir fácilmente a través de Kafka. Los conectores están construidos para sistemas como MySQL y Postgres y convierten los datos de sus sistemas CDC (binlogs y WAL) en temas Kakfa.

Kafka intercambia mensajes organizados por temas. Un sistema puede publicar en un tema, mientras que otro u otros pueden consumirlo o suscribirse a él.

Debezium une estos sistemas e incluye conectores para implementaciones comunes de CDC. Por ejemplo, hablé de los retos para CDC en " Extraer datos de una base de datos MySQL" y "Extraer datos de una base de datos PostgreSQL". Afortunadamente, ya existen conectores para "escuchar" el binlog de MySQL y el WAL de Postgres. A continuación, los datos se enrutan a través de Kakfa como registros en un tema y se consumen en un destino como un bucket S3, Snowflake o almacén de datos Redshift utilizando otro conector. La Figura 4-1 ilustra un ejemplo de uso de Debezium, y sus componentes individuales, para enviar los eventos creados por un binlog MySQL a un almacén de datos Snowflake.

dppr 0401
Figura 4-1. Uso de componentes de Debezium para CDC de MySQL a Snowflake.

En el momento de escribir esto, hay una serie de conectores Debezium ya construidos para sistemas fuente de los que puede que necesites ingerir:

  • MongoDB

  • MySQL

  • PostgreSQL

  • Microsoft SQL Server

  • Oracle

  • Db2

  • Cassandra

También hay conectores Kafka Connect para los almacenes de datos y sistemas de almacenamiento más comunes, como S3 y Snowflake.

Aunque Debezium, y el propio Kafka, es un tema que justifica su propio libro, quiero señalar su valor si decides que CDC es un método que quieres utilizar para la ingestión de datos. El sencillo ejemplo que utilicé en la sección de extracción de MySQL de este capítulo es funcional; sin embargo, si quieres utilizar CDC a escala, ¡te sugiero encarecidamente que utilices algo como Debezium en lugar de construir una plataforma como Debezium por tu cuenta!

Consejo

La documentación de Debezium es excelente y un gran punto de partida para conocer el sistema.

Get Referencia de bolsillo sobre canalizaciones de datos now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.