Chapter 4. Diving into the Delta Lake Ecosystem

Over the last few chapters, we’ve explored Delta Lake from the comfort of the Spark ecosystem. The Delta protocol, however, offers rich interoperability not only across the underlying table format but across computing environments as well. This opens the doors to an expansive universe of possibilities for powering our lakehouse applications from a single source of table truth. It’s time to break out of the box and look at the connector ecosystem.

The connector ecosystem is an ever-expanding set of frameworks, services, and community-driven integrations that enable Delta to be used from just about anywhere. This commitment to interoperability lets us take full advantage of the hard work of the growing open source community without sacrificing the years we’ve collectively poured into technologies outside the Spark ecosystem.

In this chapter, we’ll discover some of the more popular Delta connectors while learning to pilot our Delta-based data applications from outside the traditional Spark ecosystem. For those of you who haven’t done much work with Apache Spark, you’re in luck, since this chapter is a love song to Delta Lake without Apache Spark and a closer look at how the connector ecosystem works.

We will be covering the following integrations:1

  • Flink DataStream Connector

  • Kafka Delta Ingest

  • Trino Connector

In addition to the three core connectors covered in this chapter, support for Apache Pulsar, ClickHouse, FINOS Legend, Hopsworks, Delta Rust, Presto, StarRocks, and general SQL import to Delta is also available at the time of writing.

What are connectors, you ask? We will learn all about them next.

Connectors

As people, we don’t like to set limits for ourselves. Some of us are more adventurous and love to think about the unlimited possibilities of the future. Others take a more narrow, straight-ahead approach to life. Regardless of our respective attitudes, we are bound together by our pursuit of adventure, search for novelty, and desire to make decisions for ourselves. Nothing is worse than being locked in, trapped, with no way out. From the perspective of the data practitioner, it is also nice to know that what we rely on today can be used tomorrow without the dread of contract renegotiations! While Delta Lake is not a person, the open source community has responded to the various wants and needs of the community at large, and a healthy ecosystem has risen up to ensure that no one will have to be tied directly to the Apache Spark ecosystem, the JVM, or even the traditional set of data-focused programming languages like Python, Scala, and Java.

The mission of the connector ecosystem is to ensure frictionless interoperability with the Delta protocol. Over time, however, fragmentation across the earlier (Delta < 3.0) connector ecosystem led to multiple independent implementations of the Delta protocol and divergence across connectors. To streamline support for the future of the Delta ecosystem, Delta Kernel was introduced to provide a common interface and set of expectations that simplify true interoperability within the Delta ecosystem.

Note

Kernel provides a seamless set of read- and write-level APIs that ensures correctness of operation and freedom of expression for the connector API implementation. This means that the behavior across all connectors will leverage the same set of operations, with the same inputs and outputs, while ensuring each connector can quickly implement new features without lengthy lead times or divergent handling of the underlying Delta protocol. Delta Kernel is introduced in Chapter 1.

There are a healthy number of connectors and integrations that enable interoperability with the Delta table format and protocol, no matter where we trigger operations from. Interoperability and unification are core tenets of the Delta project and helped drive the push toward UniForm (introduced along with Delta 3.0), which lets a single Delta table also be read as an Iceberg or Hudi table.

In the sections that follow, we’ll take a look at the most popular connectors, including Apache Flink, Trino, and Kafka Delta Ingest. Learning to utilize Delta from your favorite framework is just a few steps away.

Kafka Delta Ingest

The connector name sums up exactly what this powerful little library does. It reads a stream of records from a Kafka topic, optionally transforms each record—for example, from raw bytes to a deserialized JSON or Avro payload—and then writes the data into a Delta table. Behind the scenes, a minimal amount of user-provided configuration molds the connector to fit each specific use case. Because of the simplicity of the kafka-delta-ingest client, we reduce the level of effort required for one of the most critical phases of the data engineering life cycle—initial data ingestion into the lakehouse via Delta Lake.

The kafka-delta-ingest connector provides a daemon that simplifies the common step of streaming Kafka data into our Delta Lake tables. Getting started takes just four easy steps:

  1. Install Rust.

  2. Build the project.

  3. Create your Delta table.

  4. Run the ingestion flow.

Install Rust

This can be done using the rustup toolchain:

% curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

Once rustup is installed, running rustup update will ensure we are on the latest stable version of Rust available.
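
For example, we can refresh the toolchain and confirm which compiler version is on our path; both commands ship with the standard toolchain:

% rustup update
% rustc --version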

Build the Project

This step ensures we have access to the source code.

Using git on the command line, simply clone the connector:

% git clone git@github.com:delta-io/kafka-delta-ingest.git \
  && cd kafka-delta-ingest

Set up your local environment

From the root of the project directory, run the Docker setup utility:

% docker compose up setup

After the setup flow completes, we have LocalStack (which emulates Amazon Web Services [AWS] locally), Kafka (via Redpanda), and the Confluent Schema Registry, as well as Azurite for local Azure Storage. Being able to run our cloud-based workflows locally greatly reduces the pain of moving from the design phase of our applications into production.

Build the connector

Rust uses cargo for dependency management and to build your project. The cargo utility is installed for us by the rustup toolchain. From the project root, execute the following command:

% cargo build

At this point we’ll have the connector built and the Rust dependencies installed, and we can choose to either run the examples or connect to our own Kafka brokers and get started. The last section on using kafka-delta-ingest will cover running the end-to-end ingestion.2

Run the Ingestion Flow

For the ingestion application to function, we need two things—a source Kafka topic and a destination Delta table. There is a caveat around the destination table, especially if you are used to Apache Spark–based Delta workflows, where a table can be created on first write: kafka-delta-ingest does not create the table for us, so we must create the destination Delta table before running the ingestion flow.
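
As a minimal sketch of step 3, we can lay down an empty destination table with the Python deltalake package (the delta-rs bindings). The path and columns below are illustrative (match them to the records you plan to ingest), and the DeltaTable.create call assumes a recent version of the package:

# Minimal sketch: create the empty destination Delta table before ingestion starts.
# Assumes `pip install deltalake pyarrow`; the path and columns are illustrative only.
import pyarrow as pa
from deltalake import DeltaTable

schema = pa.schema([
    ("event_date", pa.string()),
    ("event_type", pa.string()),
    ("product_id", pa.int64()),
    ("price", pa.float64()),
])

# Writes only the _delta_log metadata for the new table; no data files yet.
DeltaTable.create("file:///dldg/ecomm-ingest/", schema=schema, mode="error")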

There are a handful of variables that can modify the kafka-delta-ingest application. We will begin with a tour of the basic environment variables in Table 4-1, and then Table 4-2 will provide us with some of the runtime variables (args) that are available to us when using this connector.

Table 4-1. Using environment variables

Environment variable | Description | Default
KAFKA_BROKERS | The Kafka broker string; can be used to overwrite the location of the brokers for local testing, or for triage and recovery applications | localhost:9092
AWS_ENDPOINT_URL | Used to run local tests via LocalStack | none
AWS_ACCESS_KEY_ID | Used to provide the application identity | test
AWS_SECRET_ACCESS_KEY | Used to authenticate the application identity | test
AWS_DEFAULT_REGION | Can be useful for running LocalStack or for bootstrapping separate S3 bucket locations | none

Table 4-2. Using command-line arguments

Argument | Description | Example
allowed_latency | Specifies how long to fill the buffer and await new data before processing | --allowed_latency 60
app_id | Provides a unique identifier for the ingestion application, used to track ingestion progress (per-partition offsets) in the Delta transaction log | --app_id ingest-app
auto_offset_reset | Can be earliest or latest; this determines whether a consumer with no committed offsets starts reading from the beginning or the end of the Kafka topic | --auto_offset_reset earliest
checkpoints | Records the Kafka metadata for each processed ingestion batch; this allows you to stop the application and start it back up again without data loss (unless Kafka deletes the data between runs, which can be checked in the delete policy for the topic) | --checkpoints
consumer_group_id | Provides a unique consumer group name for the Kafka brokers; using the group ID, the brokers can distribute the processing of a large topic among multiple consumer applications without duplication | --consumer_group_id ecomm-ingest-app
max_messages_per_batch | Throttles the number of messages per application tick (loop); this can help keep your application from running out of memory if there is an unexpected increase in the volume of records being written to the topic | --max_messages_per_batch 1600
min_bytes_per_file | Ensures that the underlying Delta table doesn’t become riddled with small files | --min_bytes_per_file 64000000
kafka | Passes the Kafka broker string to the ingest application | --kafka 127.0.0.1:29092

Now all that is left to do is to run the ingestion application. If we are running the application using our environment variables, then the simplest command would provide the Kafka topic and the Delta table location. The command signature is as follows:

% cargo run ingest <topic> <delta_table_location>

Next, we’ll see a complete example:

% cargo run \
  ingest ecomm.v1.clickstream file:///dldg/ecomm-ingest/ \
  --allowed_latency 120 \
  --app_id clickstream_ecomm \
  --auto_offset_reset earliest \
  --checkpoints \
  --kafka 'localhost:9092' \
  --max_messages_per_batch 2000 \
  --transform 'date: substr(meta.producer.timestamp, `0`, `10`)' \
  --transform 'meta.kafka.offset: kafka.offset' \
  --transform 'meta.kafka.partition: kafka.partition' \
  --transform 'meta.kafka.topic: kafka.topic'
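
To exercise the pipeline end to end, we can publish a few test records to the source topic before (or while) the ingest daemon runs. The sketch below assumes kcat (formerly kafkacat) is installed and that a broker is reachable on localhost:9092; the topic name matches the example above:

% echo '{"event_time": "2023-10-01T19:10:05.704396Z", "event_type": "view", "product_id": 44600062, "brand": "nars", "price": 35.79}' | \
  kcat -b localhost:9092 -t ecomm.v1.clickstream -P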

With the simple steps we’ve explored together, we can now easily ingest data from our Kafka topics. We have set ourselves up for success by ensuring that the folks consuming our data do so with a high level of reliability. The more we can automate, the lower the chance of human error getting in the way and resulting in incidents or in the dreaded data loss.

In the next section, we are going to explore Trino. Both prior examples play nice alongside the Trino ecosystem, as they reduce the level of effort to ingest and transform data prior to writing solid tables that can be analyzed through more traditional SQL tooling.

Trino

Trino is a distributed SQL query engine designed to seamlessly connect to and interoperate with a myriad of data sources. It provides a connector ecosystem that supports Delta Lake natively.

Note

Trino is the community-supported fork of the Presto project and was initially designed and developed in-house at Facebook. Trino was known as PrestoSQL before it was given its present name in 2020.

To learn more about Trino, check out Trino: The Definitive Guide (O’Reilly).

Getting Started

All we need to get started with Trino and Delta Lake is a version of Trino newer than 373; at the time of writing, Trino is at version 459.

Connector requirements

While the Delta connector is natively included in the Trino distribution, there are still additional things we need to consider to ensure a frictionless experience.

Connecting to OSS or Databricks Delta Lake:

  • Delta tables written by Databricks Runtime 7.3 LTS, 9.1 LTS, 10.4 LTS, 11.3 LTS, and 12.2 LTS or above are supported.

  • Deployments using AWS, HDFS, Azure Storage, and Google Cloud Storage (GCS) are fully supported.

  • Network access from the coordinator and workers to the Delta Lake storage.

  • Access to the Hive Metastore (HMS).

  • Network access to HMS from the coordinator and workers. Port 9083 is the default port for the Thrift protocol used by HMS.

Working locally with Docker:

  • Trino Image

  • Hive Metastore (HMS) service (standalone)

  • Postgres or supported relational database management system (RDBMS) to store the HMS table properties, columns, databases, and other configurations (can point to managed RDBMS like RDS for simplicity)

  • Amazon S3 or MinIO (for object storage for our managed data warehouse)

The Docker Compose configuration in Example 4-10 shows how to configure a simple Trino container for local testing.

Example 4-10. Basic Trino Docker Compose
services:
  trinodb:
    image: trinodb/trino:426-arm64
    platform: linux/arm64
    hostname: trinodb
    container_name: trinodb
    volumes:
      - $PWD/etc/catalog/delta.properties:/etc/trino/catalog/delta.properties
      - $PWD/conf:/etc/hadoop/conf/
    ports:
      - target: 8080
        published: 9090
        protocol: tcp
        mode: host
    environment:
      - AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID
      - AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY
      - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION:-us-west-1}
    networks:
      - dldg
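
With the container up, the Trino CLI bundled inside the image gives us a quick sanity check that the server is healthy before we turn to the Delta catalog. This assumes the service and container names from Example 4-10:

% docker compose up -d trinodb
% docker exec -it trinodb trino --execute 'show catalogs'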

The example in the next section assumes we have the following resources available to us:

  • Amazon S3 or MinIO (bucket provisioned, with a user, and roles set to allow read, write, and delete access). Using local MinIO to mock S3 is a simple way to try things out without any upfront costs. See the docker compose examples in the book’s GitHub repository under ch04/trinodb/.

  • MySQL or PostgreSQL. This can run locally, or we can set it up on our favorite cloud provider; for example, AWS RDS is a simple way to get started.

  • Hive Metastore (HMS) or Amazon Glue Data Catalog.

Next, we’ll learn how to configure the Delta Lake connector so that we can create a Delta catalog in Trino. If you want to learn more about using the Hive Metastore (HMS), including how to configure the hive-site.xml, how to include the required JARs for S3, and how to run HMS, you can read through “Running the Hive Metastore”. Otherwise, skip ahead to “Configuring and Using the Trino Connector”.

Running the Hive Metastore

If you already have a reliable metastore instance set up, you can modify the connection properties to use that instead. If you want a local setup, we can begin by creating the hive-site.xml shown in Example 4-11, which is required to connect to both MySQL and Amazon S3.

Example 4-11. hive-site.xml for HMS
<configuration>
  <property>
    <name>hive.metastore.version</name>
    <value>3.1.0</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://RDBMS_REMOTE_HOSTNAME:3306/metastore</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.cj.jdbc.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>RDBMS_USERNAME</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>RDBMS_PASSWORD</value>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>s3a://dldgv2/delta/</value>
  </property>
   <property>
      <name>fs.s3a.access.key</name>
      <value>S3_ACCESS_KEY</value>
   </property>
   <property>
      <name>fs.s3a.secret.key</name>
      <value>S3_SECRET_KEY</value>
   </property>
  <property>
    <name>fs.s3.path-style-access</name>
    <value>true</value>
  </property>
  <property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
</configuration>

The configuration provides the nuts and bolts we need to access the metadata database, using the JDBC connection URL, username, and password properties, as well as the data warehouse, using the hive.metastore.warehouse.dir and the properties prefixed with fs.s3a.

Next, we need to create a Docker Compose file to run the metastore, which we do in Example 4-12.

Example 4-12. Docker Compose for the Hive Metastore
version: "3.7"

services:
  metastore:
    image: apache/hive:3.1.3
    platform: linux/amd64
    hostname: metastore
    container_name: metastore
    volumes:
      - ${PWD}/jars/hadoop-aws-3.2.0.jar:/opt/hive/lib/hadoop-aws-3.2.0.jar
      - ${PWD}/jars/mysql-connector-java-8.0.23.jar:/opt/hive/lib/mysql-connector-java-8.0.23.jar
      - ${PWD}/jars/aws-java-sdk-bundle-1.11.375.jar:/opt/hive/lib/aws-java-sdk-bundle-1.11.375.jar
      - ${PWD}/conf:/opt/hive/conf
    environment:
      - SERVICE_NAME=metastore
      - DB_DRIVER=mysql
      - IS_RESUME="true"
    expose:
      - 9083
    ports:
      - target: 9083
        published: 9083
        protocol: tcp
        mode: host
    networks:
      - dldg
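
A quick way to bring the metastore up and confirm that the Thrift endpoint is listening, assuming the backing RDBMS from hive-site.xml is reachable and using the service name and port from Example 4-12 (nc is used here if available):

% docker compose up -d metastore
% docker compose logs --tail=20 metastore
% nc -z localhost 9083 && echo 'HMS thrift endpoint is up'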

With the metastore running, we are in the driver’s seat, ready to take advantage of the Trino connector for Delta Lake.

Configuring and Using the Trino Connector

Trino uses configuration files called catalogs. They describe the catalog type (delta_lake, hive, and many more), and they enable us to tune a given catalog to optimize for reads and writes and to manage additional connector configurations. The minimum configuration for the Delta connector requires an addressable Hive Metastore location, thrift://hostname:port (if using HMS). The other supported catalog at the time of writing is AWS Glue.

The code in Example 4-13 configures the connector pointing to the Hive Metastore.

Example 4-13. The Delta Lake connector properties
connector.name=delta_lake
hive.metastore=thrift
hive.metastore.uri=thrift://metastore:9083
delta.hive-catalog-name=metastore
delta.compression-codec=SNAPPY
delta.enable-non-concurrent-writes=true
delta.target-max-file-size=512MB
delta.unique-table-location=true
delta.vacuum.min-retention=7d
Warning

The property delta.enable-non-concurrent-writes must be set to true if there is a chance of multiple writers making nonatomic changes to a table. This is most often the case with Amazon S3; setting the property to true ensures that the table remains consistent.

The property file from Example 4-13 can be saved as delta.properties. Once the file is copied into the Trino catalog directory (/etc/trino/catalog/), we’ll be able to read, write, and delete from the underlying hive.metastore.warehouse.dir, and do a whole lot more.
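
If we are running the Compose service from Example 4-10, the volume mount already places the file where it needs to go; a quick check that Trino can see the catalog file (container name as configured earlier):

% docker exec trinodb ls /etc/trino/catalog/ | grep delta
delta.properties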

Let’s look at what’s possible.

Using Show Catalogs

Using show catalogs is a simple first step to ensure that the Delta connector has been configured correctly and shows up as a resource:

trino> show catalogs;
Catalog
---------
 delta
 ...
(6 rows)

As long as we see delta in the list, our catalog is configured correctly, and we can move on to creating a schema.

Creating a Schema

The notion of a schema is a bit overloaded. We have schemas that represent the structured data describing the columns of our tables, but we also have schemas representing traditional databases. Using create schema enables us to generate a managed location within our data warehouse that can act as a boundary for access and governance, as well as to separate the physical table data among bronze, silver, and golden tables. We’ll learn more about the medallion architecture in Chapter 9, but for now let’s create a bronze_schema to store some raw tables:

trino> create schema delta.bronze_schema;
CREATE SCHEMA
Tip

If we are greeted by an exception rather than seeing CREATE SCHEMA returned, then it’s likely due to permissions issues writing to the physical warehouse. The following is an example of such an exception:

Query 20231001_182856_00004_zjwqg failed: Got exception: java.nio.file.AccessDeniedException s3a://com.newfront.dldgv2/delta/bronze_schema.db: getFileStatus on s3a://com.newfront.dldgv2/delta/bronze_schema.db: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403;

We can fix the problem by modifying our identity and access management (IAM) permissions or by ensuring we are using the correct IAM roles.
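
If we would rather pin a schema to an explicit object store path instead of relying on hive.metastore.warehouse.dir, the connector also accepts a location property on create schema. Here is a sketch using a hypothetical silver_schema and the bucket from our hive-site.xml:

trino> create schema delta.silver_schema
    -> WITH (location = 's3a://dldgv2/delta/silver_schema.db');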

Show Schemas

This command allows us to query a catalog to view available schemas:

trino> show schemas from delta;
Schema
--------------------
 default
 information_schema
 bronze_schema
(3 rows)

If the schema we are looking for exists, then we are ready to move on to creating some tables.

Working with Tables

Table compatibility between the Trino and Delta ecosystems requires that we follow some guidelines. We’ll look at data type interoperability and then create a table, add some rows, and view the Delta metadata, including the transaction history and tracking changes for Change Data Feed (CDF)–enabled tables. We’ll conclude by looking at table optimization and vacuuming.

Data types

There are a few caveats to creating tables using Trino, especially when it comes to type mapping differences between Trino and Delta Lake. The table shown in Table 4-3 can be used to ensure that the appropriate types are used and to steer clear of incompatibility if our aim is interoperability.

Table 4-3. Delta to Trino type mapping

Delta data type | Trino data type
BOOLEAN | BOOLEAN
INTEGER | INTEGER
BYTE | TINYINT
SHORT | SMALLINT
LONG | BIGINT
FLOAT | REAL
DOUBLE | DOUBLE
DECIMAL(p,s) | DECIMAL(p,s)
STRING | VARCHAR
BINARY | VARBINARY
DATE | DATE
TIMESTAMPNTZ (TIMESTAMP_NTZ) | TIMESTAMP(6)
TIMESTAMP | TIMESTAMP(3) WITH TIME ZONE
ARRAY | ARRAY
MAP | MAP
STRUCT(...) | ROW(...)

CREATE TABLE options

The supported table options (shown in Table 4-4) can be applied to our table using the WITH clause of the CREATE TABLE operation. This lets us specify Delta-specific options that plain SQL DDL doesn’t otherwise cover. Partitioning is a good example: Trino won’t infer partitions on its own, so if we don’t declare partitioned_by, the table is created unpartitioned, which can hurt the performance of later SQL queries.

Table 4-4. CREATE TABLE options

Property name | Description | Default
location | Filesystem location uniform resource identifier (URI) for the table; this option is deprecated | A managed table mapped to the location of hive.metastore.warehouse.dir, or the Glue Catalog equivalent
partitioned_by | Columns to partition the table by | No partitions
checkpoint_interval | How many commits to allow between Delta checkpoint files | Every 10 commits for open source software (OSS), and every 100 for Databricks (DBR)
change_data_feed_enabled | Track changes made to the table for use in change data capture (CDC)/Change Data Feed (CDF) applications | false
column_mapping_mode | How to map the underlying Parquet columns; options are id, name, and none | none

Creating tables

We can create tables using the long-form <catalog>.<schema>.<table> syntax, or the short-form <table> syntax after calling use delta.<schema>. Example 4-14 provides an example using the short-form create.

Example 4-14. Creating a Delta table with Trino
trino> use delta.bronze_schema;
CREATE TABLE ecomm_v1_clickstream (
  event_date DATE,
  event_time VARCHAR(255),
  event_type VARCHAR(255),
  product_id INTEGER,
  category_id BIGINT,
  category_code VARCHAR(255),
  brand VARCHAR(255),
  price DECIMAL(5,2),
  user_id INTEGER,
  user_session VARCHAR(255)
)
WITH (
    partitioned_by = ARRAY['event_date'],
    checkpoint_interval = 30,
    change_data_feed_enabled = false,
    column_mapping_mode = 'name'
);

The DDL statement in Example 4-14 creates a managed table in our data warehouse that is partitioned by day. The table structure represents the ecommerce data from the “Apache Flink” section earlier in this chapter.

Listing tables

Using show tables will allow us to view the collection of tables within a given schema in the Delta catalog:

trino:bronze_schema> show tables;
Table
----------------------
 ecomm_v1_clickstream
(1 row)

Inspecting tables

If we are not the owners of a given table, we can use describe to learn about the table through its metadata:

trino> describe delta.bronze_schema."ecomm_v1_clickstream";
    Column     |     Type     | Extra | Comment
---------------+--------------+-------+---------
 event_date    | date         |       |
 event_time    | varchar      |       |
 event_type    | varchar      |       |
 product_id    | integer      |       |
 category_id   | bigint       |       |
 category_code | varchar      |       |
 brand         | varchar      |       |
 price         | decimal(5,2) |       |
 user_id       | integer      |       |
 user_session  | varchar      |       |
(10 rows)

Using INSERT

Rows can be inserted directly from the Trino CLI, or through any of the Trino client libraries:

trino> INSERT INTO delta.bronze_schema."ecomm_v1_clickstream"
    VALUES
        (DATE '2023-10-01', '2023-10-01T19:10:05.704396Z', 'view', ...),
        (DATE('2023-10-01'), '2023-10-01T19:20:05.704396Z', 'view', ...);
INSERT: 2 rows

Querying Delta tables

A standard select statement lets you query your Delta tables:

trino> select event_date, product_id, brand, price 
  -> from delta.bronze_schema."ecomm_v1_clickstream";
 event_date | product_id |  brand  | price
------------+------------+---------+--------
 2023-10-01 |   44600062 | nars    |  35.79
 2023-10-01 |   54600062 | lancome | 122.79
(2 rows)

Updating rows

The standard update operator is available:

trino> UPDATE delta.bronze_schema."ecomm_v1_clickstream"
    -> SET category_code = 'health.beauty.products'
    -> where category_id = 2103807459595387724;

Creating tables with selection

We can create a table using another table. This is referred to as CREATE TABLE AS, and it allows us to create a new physical Delta table by referencing another table:

trino> CREATE TABLE delta.bronze_schema."ecomm_lite" 
    AS select event_date, product_id, brand, price 
    FROM delta.bronze_schema."ecomm_v1_clickstream";

Table Operations

There are many table operations to consider for optimal performance, and for decluttering the physical filesystem in which our Delta tables live. Chapter 5 covers the common maintenance and table utility functions, and the following section covers what functions are available within the Trino connector.

Vacuum

The vacuum operation will clean up files that are no longer required in the current version of a given Delta table. In Chapter 5 we go into more detail about why vacuuming is required, as well as the caveats to keep in mind to support table recovery and rolling back to prior versions with time travel.

With respect to Trino, the Delta catalog property delta.vacuum.min-retention provides a gating mechanism to protect a table in case of an arbitrary call to vacuum with a low number of days or hours:

trino> CALL delta.system.vacuum('bronze_schema', 'ecomm_v1_clickstream', '1d');
Retention specified (1.00d) is shorter than the minimum retention configured in the system (7.00d). Minimum retention can be changed with delta.vacuum.min-retention configuration property or delta.vacuum_min_retention session property

Otherwise, the vacuum operation will delete the physical files that are no longer needed by the table.
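
If we genuinely need a shorter retention for a one-off cleanup, the error message above points at the escape hatch: lower the session property for the current session and rerun the call. The 1d value here is illustrative:

trino> SET SESSION delta.vacuum_min_retention = '1d';
trino> CALL delta.system.vacuum('bronze_schema', 'ecomm_v1_clickstream', '1d');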

Table optimization

As we make modifications to our tables with Trino, depending on the size of the table parts each change creates, we run the risk of accumulating too many small files. A simple technique to combine the small files into larger files is bin-packing optimize (which we cover in Chapter 5 and in the performance-tuning deep dive in Chapter 10). To trigger compaction, we can call ALTER TABLE with EXECUTE:

trino> ALTER TABLE delta.bronze_schema."ecomm_v1_clickstream" EXECUTE optimize;

We can also provide more hints to change the behavior of the optimize operation. The following will ignore files greater than 10 MB:

trino> ALTER TABLE delta.bronze_schema."ecomm_v1_clickstream" 
    -> EXECUTE optimize(file_size_threshold => '10MB')

The following will only attempt to compact table files within the partition where event_date is 2023-10-01:

trino> ALTER TABLE delta.bronze_schema."ecomm_v1_clickstream" EXECUTE optimize
WHERE event_date = DATE '2023-10-01'

Metadata tables

The connector exposes several metadata tables for each Delta Lake table that contain information about their internal structure. We can query these tables to learn more about our tables and to inspect changes and recent history.

Table history

Each transaction is recorded in the <table>$history metadata table:

trino> describe delta.bronze_schema."ecomm_v1_clickstream$history";
        Column        |            Type             | Extra | Comment
----------------------+-----------------------------+-------+---------
 version              | bigint                      |       |
 timestamp            | timestamp(3) with time zone |       |
 user_id              | varchar                     |       |
 user_name            | varchar                     |       |
 operation            | varchar                     |       |
 operation_parameters | map(varchar, varchar)       |       |
 cluster_id           | varchar                     |       |
 read_version         | bigint                      |       |
 isolation_level      | varchar                     |       |
 is_blind_append      | boolean                     |       |

We can query the metadata table. Let’s look at the transaction history for our ecomm_v1_clickstream table, which so far contains three commits:

trino> select version, timestamp, operation 
    -> from delta.bronze_schema."ecomm_v1_clickstream$history";
 version |          timestamp          |  operation
---------+-----------------------------+--------------
       0 | 2023-10-01 19:47:35.618 UTC | CREATE TABLE
       1 | 2023-10-01 19:48:41.212 UTC | WRITE
       2 | 2023-10-01 23:01:13.141 UTC | OPTIMIZE
(3 rows)

Change Data Feed

The Trino connector provides functionality for reading Change Data Feed (CDF) entries to expose row-level changes between two versions of a Delta Lake table. When the change_data_feed_enabled table property is set to true on a specific Delta Lake table, the connector records change events for all data changes on the table:

trino> use delta.bronze_schema;
CREATE TABLE ecomm_v1_clickstream (
  ...
)
WITH (
    change_data_feed_enabled = true
);

Now each row of each transaction is recorded (with the operation type), enabling us to rebuild the state of a table or to walk through the changes to a table after a specific point in time.

For example, if we’d like to view all changes since version 0 of a table, we could execute the following:

trino> select event_date, _change_type, _commit_version, _commit_timestamp 
from TABLE(
  delta.system.table_changes(
    schema_name => 'bronze_schema',
    table_name => 'ecomm_v1_clickstream',
    since_version => 0
  )
);

and view the changes made. In the example use case, we’ve simply inserted two rows:

 event_date | _change_type | _commit_version |      _commit_timestamp
------------+--------------+-----------------+-----------------------------
 2023-10-01 | insert       |               1 | 2023-10-01 19:48:41.212 UTC
 2023-10-01 | insert       |               1 | 2023-10-01 19:48:41.212 UTC
(2 rows)

Viewing table properties

It is useful to be able to view the table properties associated with our tables. We can use the metadata table <table>$properties to view the associated Delta TBLPROPERTIES:

trino> select * from  delta.bronze_schema."ecomm_v1_clickstream$properties";
               key               | value
---------------------------------+-------
 delta.enableChangeDataFeed      | true
 delta.columnMapping.maxColumnId | 10
 delta.columnMapping.mode        | name
 delta.checkpointInterval        | 30
 delta.minReaderVersion          | 2
 delta.minWriterVersion          | 5

Modifying table properties

If we want to modify the underlying table properties of our Delta table, we’ll need to use the Delta connector’s alias for the supported table properties. For example, change_data_feed_enabled maps to the delta.enableChangeDataFeed property:

trino> ALTER TABLE delta.bronze_schema."ecomm_v1_clickstream"
SET PROPERTIES "change_data_feed_enabled" = false;
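
We can confirm the change took effect by rereading the $properties metadata table, selecting only the relevant key:

trino> select * from delta.bronze_schema."ecomm_v1_clickstream$properties"
    -> where "key" = 'delta.enableChangeDataFeed';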

Deleting tables

Using the DROP TABLE operation, we can permanently remove a table that is no longer needed:

trino> DROP TABLE delta.bronze_schema."ecomm_lite";

There is a lot more that we can do with the Trino connector that is out of scope for this book; for now we will say goodbye to Trino and conclude this chapter.

Conclusion

During the time we spent together in this chapter, we learned how simple it can be to connect our Delta tables as either the source or the sink for our Flink applications. We then learned to use the Rust-based kafka-delta-ingest application to simplify the data ingestion process that is the bread and butter of most data engineers working with high-throughput streaming data. By reducing the level of effort required to simply read a stream of data and write it into our Delta tables, we end up in a much better place in terms of cognitive burden. When we start to think about all data in terms of tables—bounded or unbounded—the mental model can be applied to tame even the most wildly data-intensive problems. On that note, we concluded the chapter by exploring the native Trino connector for Delta. We discovered how simple configuration opens up the doors to analytics and insights, all while ensuring we continue to have a single source of data truth residing in our Delta tables.

1 For the full list of evolving integrations, see “Delta Lake Integrations” on the Delta Lake website.

2 The full ingestion flow application is available in the book’s GitHub repository under ch04/rust/kafka-delta-ingest.
