Chapter 4. Dask DataFrame

Pandas DataFrames, while popular, quickly run into memory constraints as data sizes grow, since they store the entirety of the data in memory. Pandas DataFrames have a robust API for all kinds of data manipulation and are frequently the starting point for many analytics and machine learning projects. While pandas itself does not have machine learning built in, data scientists often use it as part of data and feature preparation during the exploratory phase of new projects. As such, scaling pandas DataFrames to be able to handle large datasets is of vital importance to many data scientists. Most data scientists are already familiar with the pandas libraries, and Dask’s DataFrame implements much of the pandas API while adding the ability to scale.

Dask is one of the first to implement a usable subset of the pandas APIs, but other projects such as Spark have added their approaches. This chapter assumes you have a good understanding of the pandas DataFrame APIs; if not, you should check out Python for Data Analysis.

You can often use Dask DataFrames as a replacement for pandas DataFrames with minor changes, thanks to duck-typing. However, this approach can have performance drawbacks, and some functions are not present. These drawbacks come from the distributed parallel nature of Dask, which adds communication costs for certain types of operations. In this chapter, you will learn how to minimize these performance drawbacks and work around any missing functionality.

Dask DataFrames require that your data and your computation are well suited to pandas DataFrames. Dask has bags for unstructured data, arrays for array-structured data, the Dask delayed interface for arbitrary functions, and actors for stateful operations. If even at a small scale you wouldn’t consider using pandas for your problem, Dask DataFrames are probably not the right solution.

How Dask DataFrames Are Built

Dask DataFrames build on top of pandas DataFrames. Each partition is stored as a pandas DataFrame.1 Using pandas DataFrames for the partitions simplifies the implementation of much of the APIs. This is especially true for row-based operations, where Dask passes the function call down to each pandas DataFrame.

Most of the distributed components of Dask DataFrames use the three core building blocks map_partitions, reduction, and rolling. You mostly won’t need to call these functions directly; you will use higher-level APIs instead. But understanding these functions and how they work is important to understanding how Dask works. shuffle is a critical building block of distributed DataFrames for reorganizing your data. Unlike the other building blocks, you may use it directly more frequently, as Dask is unable to abstract away partitioning.

Loading and Writing

Data analytics is only as valuable as the data it has access to, and our insights are helpful only if they result in action. Since not all of our data is in Dask, it’s essential to read and write data from the rest of the world. So far, the examples in this book have mainly used local collections, but you have many more options.

Dask supports reading and writing many standard file formats and filesystems. These formats include CSV, HDF, fixed-width, Parquet, and ORC. Dask supports many of the standard distributed filesystems, from HDFS to S3, and reading from regular filesystems.

Most important for Dask, distributed filesystems allow multiple computers to read and write to the same set of files. Distributed filesystems often store data on multiple computers, which allows for storing more data than a single computer can hold. Often, but not always, distributed filesystems are also fault tolerant (which they achieve through replication). Distributed filesystems can have important performance differences from what you are used to working with, so it’s important to skim the user documentation for the filesystems you are using. Some things to look for are block sizes (you often don’t want to write files smaller than these, as the rest is wasted space), latency, and consistency guarantees.

Tip

Reading from regular local files can be complicated in Dask, as the files need to exist on all workers. If a file exists only on the head node, consider copying it to a distributed filesystem like S3 or NFS, or load it locally and use Dask’s client.scatter function to distribute the data if it’s small enough. Sufficiently small files may be a sign that you don’t yet need Dask, unless the processing on them is complex or slow.

Formats

Dask’s DataFrame loading and writing functions start with to_ or read_ as the prefixes. Each format has its own configuration, but in general, the first positional argument is the location of the data to be read. The location can be a wildcard path of files (e.g., s3://test-bucket/magic/*), a list of files, or a regular file location.

Note

Wildcard paths work only with filesystems that support directory listing. For example, they do not work on HTTP.

When loading data, having the right number of partitions will speed up all of your operations. Sometimes it’s not possible to load the data with the right number of partitions, and in those cases you can repartition your data after the load. As discussed, more partitions allow for more parallelism but have a non-zero overhead. The different formats have slightly different ways to control this. HDF takes chunksize, indicating the number of rows per partition. Parquet takes split_row_groups, which takes an integer of the desired logical partitioning out of the Parquet file, and Dask will split the whole set into those chunks, or fewer. If not given, the default behavior is for each partition to correspond to a Parquet file. The text-based formats (CSV, fixed-width, etc.) take a blocksize parameter that plays the same role as HDF’s chunksize but is measured in bytes rather than rows; its default is computed for you, up to a maximum of 64 MB. You can verify this by loading a dataset and seeing the number of tasks and partitions increase with smaller target sizes, as in Example 4-1.

Example 4-1. Dask DataFrame loading CSV with 1 KB chunks
many_chunks = dd.read_csv(url, blocksize="1kb")
many_chunks.index

Loading CSV and JSON files can be more complicated than loading Parquet and other self-describing formats, since CSV and JSON don’t have any schema information encoded. Dask DataFrames need to know the types of the different columns to serialize the data correctly. By default, Dask will automatically look at the first few records and guess the data types for each column. This process is known as schema inference, and it can be quite slow.

Unfortunately, schema inference does not always work. For example, if you try to load the UK’s gender pay gap disparity data from https://gender-pay-gap.service.gov.uk/viewing/download-data/2021, when you access the data, as in Example 4-2, you will get an error of “Mismatched dtypes found in pd.read_csv/pd.read_table.” When Dask’s column type inference is incorrect, you can override it (per column) by specifying the dtype parameter, as shown in Example 4-3.

Example 4-2. Dask DataFrame loading CSV, depending entirely on inference
df = dd.read_csv(
    "https://gender-pay-gap.service.gov.uk/viewing/download-data/2021")

Example 4-3. Dask DataFrame loading CSV and specifying data type
df = dd.read_csv(
    "https://gender-pay-gap.service.gov.uk/viewing/download-data/2021",
    dtype={'CompanyNumber': 'str', 'DiffMeanHourlyPercent': 'float64'})

Note

In theory, you can have Dask sample more records by specifying more bytes with the sample parameter, but this does not currently fix the problem. The current sampling code does not strictly respect the number of bytes requested.

Even when schema inference does not return an error, depending on it has a number of drawbacks. Schema inference involves sampling data, and its results are therefore both probabilistic and slow. When you can, you should use self-describing formats or otherwise avoid schema inference; your data loading will be faster and more reliable. Some common self-describing formats you may encounter include Parquet, Avro, and ORC.
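
To see why sampling-based inference is fragile, it can help to mimic it in plain Python. The sketch below is only an illustration of the idea and is not how Dask or pandas implement inference; the infer_dtype helper and the company numbers are made up for the example.

```python
def infer_dtype(values):
    """Guess the narrowest type that fits every value that was looked at."""
    for name, cast in (("int64", int), ("float64", float)):
        try:
            for value in values:
                cast(value)
            return name
        except ValueError:
            continue
    return "object"


# Hypothetical column: mostly digits, with one value that has a letter prefix.
company_numbers = ["00012345", "00067890", "00054321", "SC123456"]

# Looking only at the first few records suggests an integer column...
sample_guess = infer_dtype(company_numbers[:3])
# ...but the full column only fits in a string (object) column.
full_guess = infer_dtype(company_numbers)

print(sample_guess, full_guess)
```

When the guess from the sample disagrees with data seen later, the load fails partway through, which is the situation that specifying dtype up front avoids.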

Reading and writing from/to new file formats is a lot of work, especially if there are no existing Python libraries. If there are existing libraries, you might find it easier to read the raw data into a bag and parse it with a map function, which we will explore further in the next chapter.

Tip

Dask does not detect sorted data on load. Instead, if you have presorted data, add the sorted=True parameter when setting an index to take advantage of your already sorted data, a step you will learn about in the next section. If you specify this when the data is not sorted, however, you may get silent data corruption.

You can also connect Dask to databases or microservices. Relational databases are a fantastic tool and are often quite performant at both simple reads and writes. Often relational databases support distributed deployment whereby the data is split up on multiple nodes, and this is mostly used with large datasets. Relational databases tend to be great at handling transactions at scale, but running analytic capabilities on the same node can encounter issues. Dask can be used to efficiently read and compute over SQL databases.

You can use Dask’s built-in support for loading SQL databases using SQLAlchemy. For Dask to split up the query on multiple machines, you need to give it an index key. Often SQL databases will have a primary key or numerical index key that you can use for this purpose (e.g., read_sql_table("customers", index_col="customer_id")). An example of this is shown in Example 4-4.

Example 4-4. Reading from and writing to SQL with Dask DataFrame
from sqlalchemy import sql
import dask.dataframe as dd

# SQLAlchemy-style URI for a local SQLite file (note the three slashes);
# Dask takes the URI string itself, not an open connection
db_conn = "sqlite:///fake_school.sql"

col_student_num = sql.column("student_number")
col_grade = sql.column("grade")
tbl_transcript = sql.table("transcripts")

# SQLAlchemy 1.4+ takes the columns positionally rather than as a list
select_statement = sql.select(col_student_num,
                              col_grade
                              ).select_from(tbl_transcript)

#read from sql db
ddf = dd.read_sql_query(select_statement,
                        npartitions=4,
                        index_col=col_student_num,
                        con=db_conn)

#alternatively, read whole table
ddf = dd.read_sql_table("transcripts",
                        db_conn,
                        index_col="student_number",
                        npartitions=4
                        )

#do_some_ETL...

#save to db
ddf.to_sql("transcript_analytics",
           uri=db_conn,
           if_exists='replace',
           schema=None,
           index=False
           )

More advanced connections to databases or microservices are best made using the bag interface and writing your custom load code, which you will learn more about in the next chapter.

Filesystems

Loading data can be a substantial amount of work and a bottleneck, so Dask distributes this like most other tasks. If you are using Dask distributed, each worker must have access to the files to parallelize the loading. Rather than copying the file to each worker, network filesystems allow everyone to access the files. Dask’s file access layer uses the FSSPEC library (from the intake project) to access the different filesystems. Since FSSPEC supports a range of filesystems, it does not install the requirements for every supported filesystem. Use the code in Example 4-5 to see which filesystems are supported and which ones need additional packages.

Example 4-5. Getting a list of FSSPEC-supported filesystems
from fsspec.registry import known_implementations
known_implementations

Many filesystems require some kind of configuration, be it endpoint or credentials. Often new filesystems, like MinIO, offer S3-compatible APIs but overload the endpoint and require some extra configuration to function. With Dask you specify the configuration parameters to the read/write function with storage_options. Everyone’s configuration here will likely be a bit different.2 Dask will use your storage_options dict as the keyword arguments to the underlying FSSPEC implementation. For example, my storage_options for MinIO are shown in Example 4-6.

Example 4-6. Configuring Dask to talk to MinIO
minio_storage_options = {
    "key": "YOURACCESSKEY",
    "secret": "YOURSECRETKEY",
    "client_kwargs": {
        "endpoint_url": "http://minio-1602984784.minio.svc.cluster.local:9000",
        "region_name": 'us-east-1'
    },
    "config_kwargs": {"s3": {"signature_version": 's3v4'}},
}

Indexing

Indexing into a DataFrame is one of the powerful features of pandas, but it comes with some restrictions when moving into a distributed system like Dask. Since Dask does not track the size of each partition, positional indexing by row is not supported. You can use positional indexing for columns, as well as label indexing for columns or rows.

Indexing is frequently used to filter the data to have only the components you need. We did this for the San Francisco COVID-19 data by looking at just the case rates for people of all vaccine statuses, as shown in Example 4-7.

Example 4-7. Dask DataFrame indexing
mini_sf_covid_df = (sf_covid_df
                    [sf_covid_df['vaccination_status'] == 'All']
                    [['specimen_collection_date', 'new_cases']])

If you truly need positional indexing by row, you can implement your own by computing the size of each partition and using this to select the desired partition subsets. This is very inefficient, so Dask avoids implementing it directly; make an intentional choice before doing this.
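
As a rough illustration of what such a do-it-yourself approach involves, the sketch below maps a global row position to a partition number and an offset inside that partition. The locate_row helper is hypothetical and written in plain Python; in Dask, the partition lengths could be obtained with something like ddf.map_partitions(len).compute(), which has to touch every partition and is the expensive step.

```python
from bisect import bisect_right
from itertools import accumulate


def locate_row(partition_lengths, row):
    """Map a global row position to (partition number, position inside it)."""
    total = sum(partition_lengths)
    if not 0 <= row < total:
        raise IndexError(f"row {row} is out of range for {total} rows")
    # ends[i] is the number of rows in partitions 0..i combined.
    ends = list(accumulate(partition_lengths))
    # The first partition whose cumulative end is past `row` holds the row;
    # empty partitions are skipped automatically.
    part = bisect_right(ends, row)
    start = ends[part - 1] if part > 0 else 0
    return part, row - start


# Hypothetical partition sizes, including an empty partition.
lengths = [3, 0, 4, 2]
print(locate_row(lengths, 0))
print(locate_row(lengths, 3))
print(locate_row(lengths, 8))
```

With the partition number and offset in hand, you would fetch just that partition (for example, with get_partition), compute it, and use pandas positional indexing on the result.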

Shuffles

As mentioned in the previous chapter, shuffles are expensive. The primary causes of the expensive nature of shuffles are the serialization overhead in moving data between processes and the comparative slowness of networks relative to reading data from memory. These costs scale as the amount of data being shuffled increases, so Dask has techniques to reduce the amount of data being shuffled. These techniques depend on certain data properties or the operation being performed.

Rolling Windows and map_overlap

One situation that can trigger the need for a shuffle is a rolling window, where at the edges of a partition your function needs some records from its neighbors. Dask DataFrame has a special map_overlap function in which you can specify a look-after window (also called a look-ahead window) and look-behind window (also called a look-back window) of rows to transfer (either an integer or a time delta). The simplest example taking advantage of this is a rolling average, shown in Example 4-8.

Example 4-8. Dask DataFrame rolling average
def process_overlap_window(df):
    return df.rolling('5D').mean()


rolling_avg = partitioned_df.map_overlap(
    process_overlap_window,
    pd.Timedelta('5D'),
    0)

Using map_overlap allows Dask to transfer only the data needed. For this implementation to work correctly, your minimum partition size needs to be larger than your largest window.

Warning

Dask’s rolling windows will not cross multiple partitions. If your DataFrame is partitioned so that the look-after or look-back is greater than the length of the neighbor’s partition, the results will either fail or be incorrect. Dask validates this for time delta look-after, but no such checks are performed for look-backs or integer look-after.

An effective but expensive technique for working around the single-partition look-ahead/look-behind of Dask is to repartition your Dask DataFrames.
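
The single-neighbor restriction is easier to see in a small model. The following plain-Python sketch imitates the idea behind map_overlap with lists standing in for partitions; it is an illustration under simplifying assumptions (integer windows, look-behind only), not Dask’s implementation, and both helper functions are made up for the example.

```python
def rolling_mean(values, window):
    """Trailing mean; positions without a full window yield None."""
    out = []
    for i in range(len(values)):
        if i + 1 < window:
            out.append(None)
        else:
            out.append(sum(values[i + 1 - window:i + 1]) / window)
    return out


def rolling_mean_with_overlap(partitions, window):
    """Apply rolling_mean per partition, borrowing rows from one neighbor."""
    before = window - 1
    results = []
    for i, part in enumerate(partitions):
        # Borrow at most `before` rows, and only from the previous partition.
        borrowed = partitions[i - 1][-before:] if i > 0 and before > 0 else []
        padded = borrowed + part
        # Drop the outputs that belong to the borrowed rows.
        results.extend(rolling_mean(padded, window)[len(borrowed):])
    return results


data = list(range(10))
big_enough = [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
too_small = [[0, 1, 2, 3], [4], [5, 6, 7, 8, 9]]

matches = rolling_mean_with_overlap(big_enough, 3) == rolling_mean(data, 3)
mismatch = rolling_mean_with_overlap(too_small, 3) != rolling_mean(data, 3)
print(matches, mismatch)
```

In the second layout the middle partition holds a single row, so the last partition cannot borrow the two rows it needs and its first result is wrong. Repartitioning so that every partition is at least as large as the window avoids this.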

Aggregations

Aggregations are another special case that can reduce the amount of data that needs to be transferred over the network. Aggregations are functions that combine records. If you are coming from a map/reduce or Spark background, reduceByKey is the classic aggregation. Aggregations can either be “by key” or be global across an entire DataFrame.

To aggregate by key, you first need to call groupby with the column(s) representing the key, or the keying function to aggregate on. For example, calling df.groupby("PostCode") groups your DataFrame by postal code, or calling df.groupby(["PostCode", "SicCodes"]) uses a combination of columns for grouping. Function-wise, many of the same pandas aggregates are available, but the performance of aggregates in Dask is very different from that of local pandas DataFrames.

Tip

If you’re aggregating by partition key, Dask can compute the aggregation without needing a shuffle.

The first way to speed up your aggregations is to reduce the columns that you are aggregating on, since the fastest data to process is no data. The second is to do multiple aggregations at the same time when possible, which reduces the number of times the same data needs to be shuffled. Therefore, if you need to compute the average and the max, you should compute both at the same time (see Example 4-9).

Example 4-9. Dask DataFrame max and mean
dask.compute(
    raw_grouped[["new_cases"]].max(),
    raw_grouped[["new_cases"]].mean())

For distributed systems like Dask, if an aggregation can be partially evaluated and then merged, you can potentially combine some records pre-shuffle. Not all partial aggregations are created equal. What matters with partial aggregations is that the amount of data is reduced when merging values with the same key compared to the original multiple values.

The most efficient aggregations take a sublinear amount of space regardless of the number of records. Some of these, such as sum, count, first, minimum, maximum, mean, and standard deviation, can take constant space. More complicated tasks, like quantiles and distinct counts, also have sublinear approximation options. These approximation options can be great, as exact answers can require linear growth in storage.3

Some aggregation functions are not sublinear in growth, but in practice tend not to grow too quickly. Counting the distinct values is in this group, but if all your values are unique there is no space saving.

To take advantage of efficient aggregations, you need to use a built-in aggregation from Dask, or write your own using Dask’s aggregation class. Whenever you can, use a built-in. Built-ins not only require less effort but also are often faster. Not all of the pandas aggregates are directly supported in Dask, so sometimes your only choice is to write your own aggregate.

If you choose to write your own aggregate, you have three functions to define: chunk for handling each group-partition/chunk, agg to combine the results of chunk between partitions, and (optionally) finalize to take the result of agg and produce a final value.

The fastest way to understand how to use partial aggregation is by looking at an example that uses all three functions. Using the weighted average in Example 4-10 can help you think of what is needed for each function. The first function needs to compute the weighted values and the weights. The agg function combines these by summing each part of the tuple. Finally, the finalize function divides the total by the weights.

Example 4-10. Dask custom aggregate
# Write a custom weighted mean, we get either a DataFrameGroupBy
# with multiple columns or SeriesGroupBy for each chunk
def process_chunk(chunk):
    def weighted_func(df):
        return (df["EmployerSize"] * df["DiffMeanHourlyPercent"]).sum()
    return (chunk.apply(weighted_func), chunk.sum()["EmployerSize"])


def agg(total, weights):
    return (total.sum(), weights.sum())


def finalize(total, weights):
    return total / weights


weighted_mean = dd.Aggregation(
    name='weighted_mean',
    chunk=process_chunk,
    agg=agg,
    finalize=finalize)

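# Select multiple columns with a list (double brackets); recent pandas
# versions reject a bare tuple of column names here
aggregated = (df_diff_with_emp_size.groupby("PostCode")
              [["EmployerSize", "DiffMeanHourlyPercent"]].agg(weighted_mean))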

In some cases, such as with a pure summation, you don’t need to do any post-processing on agg’s output, so you can skip the finalize function.
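
To make the division of labor between the three functions concrete without a cluster, here is a plain-Python model of the same weighted mean, with lists of tuples standing in for partitions. It is a sketch of the chunk/agg/finalize pattern only and does not use Dask’s Aggregation class; the postal codes and numbers are made up.

```python
from collections import defaultdict


def chunk(rows):
    """Per partition: partial (weighted total, weight total) for each key."""
    partial = defaultdict(lambda: [0.0, 0.0])
    for key, weight, value in rows:
        partial[key][0] += weight * value
        partial[key][1] += weight
    return dict(partial)


def agg(partials):
    """Across partitions: add up the partial pairs key by key."""
    merged = defaultdict(lambda: [0.0, 0.0])
    for partial in partials:
        for key, (total, weights) in partial.items():
            merged[key][0] += total
            merged[key][1] += weights
    return dict(merged)


def finalize(merged):
    """Turn each (weighted total, weight total) pair into a weighted mean."""
    return {key: total / weights for key, (total, weights) in merged.items()}


# Hypothetical rows of (postal code, employer size, pay difference).
partitions = [
    [("AB1", 10, 2.0), ("CD2", 5, 4.0)],
    [("AB1", 30, 6.0)],
]
weighted_means = finalize(agg(chunk(p) for p in partitions))
print(weighted_means)
```

Only one small pair per key leaves each partition, no matter how many rows the partition holds, which is what keeps the amount of shuffled data low.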

Not all aggregations must be by key; you can also compute aggregations across all rows. Dask’s custom aggregation interface, however, is exposed only with by-key operations.

Dask’s built-in full DataFrame aggregations use a lower-level interface called apply_concat_apply for partial aggregations. Rather than learn two different APIs for partial aggregations, we prefer to do a static groupby by providing a constant grouping function. This way, we only have to know one interface for aggregations. You can use this to find the aggregate COVID-19 numbers across the DataFrame, as shown in Example 4-11.

Example 4-11. Aggregating across the entire DataFrame
raw_grouped = sf_covid_df.groupby(lambda x: 0)

When a built-in aggregation exists, it will likely be better than anything we would write. Sometimes a partial aggregation is partially implemented, as in the case of Dask’s HyperLogLog: it is implemented only for full DataFrames. You can often translate simple aggregations that use apply_concat_apply (or aca) by copying the chunk function, using the combine parameter for agg, and using the aggregate parameter for finalize. This is shown via porting Dask’s HyperLogLog implementation in Example 4-12.

Example 4-12. Wrapping Dask’s HyperLogLog in dd.Aggregation
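# Wrap Dask's hyperloglog in dd.Aggregation
# Note: the hyperloglog helpers also take a precision argument (b), which
# Dask's own nunique_approx passes along; you may need to bind it yourself
# (e.g., with functools.partial) for this wrapper to run.

from dask.dataframe import hyperloglog

approx_unique = dd.Aggregation(
    name='approx_unique',
    chunk=hyperloglog.compute_hll_array,
    agg=hyperloglog.reduce_state,
    finalize=hyperloglog.estimate_count)

# Use the aggregation defined above (not weighted_mean from Example 4-10)
aggregated = (df_diff_with_emp_size.groupby("PostCode")
              [["EmployerSize", "DiffMeanHourlyPercent"]].agg(approx_unique))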

Slow/inefficient aggregations (or those very likely to cause an out-of-memory exception) use storage proportional to the records being aggregated. Examples from this slow group include making a list and naively computing exact quantiles.4 With these slow aggregates, using Dask’s aggregation class has no benefit over the apply API, which you may wish to use for simplicity. For example, if you just wanted a list of employer IDs by postal code, rather than having to write three functions you could use a one-liner like df.groupby("PostCode")["EmployerId"].apply(lambda g: list(g)). Dask implements the apply function as a full shuffle, which is covered in the next section.

Warning

Dask is unable to apply partial aggregations when you use the apply function.

Full Shuffles and Partitioning

If an operation seems to be slower inside of Dask than you would expect from working in local DataFrames, it might be because it requires a full shuffle. An example of this is sorting, which is inherently expensive in distributed systems because it most often requires a shuffle. Full shuffles are sometimes an unavoidable part of working in Dask. Counterintuitively, while full shuffles are themselves slow, you can use them to speed up future operations that are all happening on the same grouping key(s). As mentioned in the aggregation section, one of the ways a full shuffle is triggered is by using the apply method when partitioning is not aligned.

Partitioning

You will most commonly use full shuffles to repartition your data. It’s important to have the right partitioning when dealing with aggregations, rolling windows, or look-ups/indexing. As discussed in the rolling window section, Dask cannot do more than one partition’s worth of look-ahead or look-behind, so having the right partitioning is required to get the correct results. For most other operations, having incorrect partitioning will slow down your job.

Dask has three primary methods for controlling the partitioning of a DataFrame: set_index, repartition, and shuffle (see Table 4-1). You use set_index when changing the partitioning to a new key/index. repartition keeps the same key/index but changes the splits. repartition and set_index take similar parameters, with repartition not taking an index key name. In general, if you are not changing the column used for the index, you should use repartition. shuffle is a bit different since it does not produce a known partitioning scheme that operations like groupby can take advantage of.

Table 4-1. Functions to control partitioning
| Method | Changes index key | Sets number of partitions | Results in a known partitioning scheme | Ideal use case |
|---|---|---|---|---|
| set_index | Yes | Yes | Yes | Changing the index key |
| repartition | No | Yes | Yes | Increasing/decreasing number of partitions |
| shuffle | No | Yes | No | Skewed distribution of key (a) |

(a) Hashes the key for distribution, which can help randomly distribute skewed data if the keys are unique (but clustered).

The first step in getting the right partitioning for your DataFrame is to decide whether you want an index. Indexes are useful when filtering data by an indexed value, indexing, grouping, and for almost any other by-key operation. One such by-key operation would be a groupby, in which the column being grouped on could be a good candidate for the key. If you use a rolling window over a column, that column must be the key, which makes choosing the key relatively easy. Once you’ve decided on an index, you can call set_index with the column name of the index (e.g., set_index("PostCode")). This will, under most circumstances, result in a shuffle, so it’s a good time to size your partitions.

Tip

If you’re unsure what the current key used for partitioning is, you can check the index property to see the partitioning key.

Once you’ve chosen your key, the next question is how to size your partitions. The advice in “Partitioning/Chunking Collections” generally applies here: shoot for enough partitions to keep each machine busy, but keep in mind the general sweet spot of 100 MB to 1 GB. Dask generally computes pretty even splits if you give it a target number of partitions.5 Thankfully, set_index will also take npartitions. To repartition the data by postal code, with 10 partitions, you would add set_index("PostCode", npartitions=10); otherwise Dask will default to the number of input partitions.

If you plan to use rolling windows, you will likely need to ensure that you have the right size (in terms of key range) covered in each partition. To do this as part of set_index, you would need to compute your own divisions to ensure each partition has the right range of records present. Divisions are specified as a list starting from the minimal value of the first partition up to the maximum value of the last partition. Each value in between is a “cut” point between the pandas DataFrames that make up the Dask DataFrame. To make a DataFrame with partitions [0, 100), [100, 200), [200, 300), and [300, 500] (the last partition includes its upper bound), you would write df.set_index("NumEmployees", divisions=[0, 100, 200, 300, 500]). Similarly, for the date range to support a rolling window of up to seven days from around the start of the COVID-19 pandemic to today, see Example 4-13.

Example 4-13. Dask DataFrame rolling window with set_index
import pandas as pd
from datetime import datetime

divisions = pd.date_range(
    start="2021-01-01",
    end=datetime.today(),
    freq='7D').tolist()
partitioned_df_as_part_of_set_index = mini_sf_covid_df.set_index(
    'specimen_collection_date', divisions=divisions)

Warning

Dask, including for rolling time windows, assumes that your partition index is monotonically increasing.6
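
One way to read a divisions list is as a lookup table from key to partition. The plain-Python sketch below illustrates that reading; partition_for is a hypothetical helper written for this example and is not a Dask function.

```python
from bisect import bisect_right


def partition_for(divisions, key):
    """Return the number of the partition whose key range contains `key`."""
    if list(divisions) != sorted(divisions):
        raise ValueError("divisions must be in increasing order")
    if not divisions[0] <= key <= divisions[-1]:
        raise KeyError(f"{key} is outside the divisions")
    # n + 1 division values describe n partitions. Every partition includes
    # its lower bound, and only the last one also includes its upper bound.
    return min(bisect_right(divisions, key) - 1, len(divisions) - 2)


divisions = [0, 100, 200, 300, 500]
print(partition_for(divisions, 99))
print(partition_for(divisions, 100))
print(partition_for(divisions, 500))
```

The lookup only makes sense when the divisions increase from left to right, which is one reason the monotonic assumption matters.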

So far, you’ve had to specify the number of partitions, or the specific divisions, but you might be wondering if Dask can just figure that out itself. Thankfully, Dask’s repartition function has the ability to pick divisions for a given target size, as shown in Example 4-14. However, doing this has a non-trivial cost, as Dask must evaluate the DataFrame as well as the repartition itself.

Example 4-14. Dask DataFrame automatic partitioning
reparted = indexed.repartition(partition_size="20kb")

Warning

Dask’s set_index has a similar partition_size parameter but, as of this writing, works only to reduce the number of partitions.

As you saw at the start of this chapter, when writing a DataFrame, each partition is given its own file, but sometimes this can result in files that are too big or too small. Some tools can accept only one file as input, so you need to repartition everything into a single partition. At other times, the data storage system is optimized for a certain file size, like the HDFS default block size of 128 MB. The good news is that techniques such as repartition and set_index solve these problems for you.

Embarrassingly Parallel Operations

Dask’s map_partitions function applies a function to each of the partitions’ underlying pandas DataFrames, and the result is also a pandas DataFrame. Functions implemented with map_partitions are embarrassingly parallel since they don’t require any inter-worker transfer of data.7 Dask implements map with map_partitions, as well as many row-wise operations. If you want to use a row-wise operation that you find missing, you can implement it yourself, as shown in Example 4-15.

Example 4-15. Dask DataFrame fillna
def fillna(df):
    return df.fillna(value={"PostCode": "UNKNOWN"}).fillna(value=0)


new_df = df.map_partitions(fillna)
# Since there could be an NA in the index clear the partition / division
# information
new_df.clear_divisions()

You aren’t limited to calling pandas built-ins. Provided that your function takes and returns a DataFrame, you can do pretty much anything you want inside map_partitions.

The full pandas API is too long to cover in this chapter, but if a function can operate on a row-by-row basis without any knowledge of the rows before or after, it may already be implemented in Dask DataFrames using map_partitions.

When using map_partitions on a DataFrame, you can change anything about each row, including the key that it is partitioned on. If you are changing the values in the partition key, you must either clear the partitioning information on the resulting DataFrame with clear_divisions() or specify the correct indexing with set_index, which is covered in “Partitioning.”

Warning

Incorrect partitioning information can result in incorrect results, not just exceptions, as Dask may miss relevant data.

Working with Multiple DataFrames

Pandas and Dask have four common functions for combining DataFrames. At the root is the concat function, which allows you to join DataFrames on any axis. Concatenating DataFrames is generally slower in Dask since it involves inter-worker communication. The other three functions are join, merge, and append, all of which implement special cases for common situations on top of concat and have slightly different performance considerations. Having good divisions/partitioning, in terms of key selection and number of partitions, makes a huge difference when working on multiple DataFrames.

Dask’s join and merge functions take most of the standard pandas arguments along with an extra optional one, npartitions. npartitions specifies a target number of output partitions, but it is used only for hash joins (which you’ll learn about in “Multi-DataFrame Internals”). Both functions automatically repartition your input DataFrames if needed. This is great, as you might not know the partitioning, but since repartitioning can be slow, explicitly using the lower-level concat function when you don’t expect any partitioning changes to be needed can help catch performance problems early. Dask’s join can take more than two DataFrames at a time only when doing a left or outer join type.

Tip

Dask has special logic to speed up multi-DataFrame joins, so in most cases, rather than doing a.join(b).join(c).join(d).join(e), you will benefit from doing a.join([b, c, d, e]). However, if you are performing a left join with a small dataset, then the first syntax may be more efficient.

When you combine or concat DataFrames by row (similar to a SQL UNION), the performance depends on whether divisions of the DataFrames being combined are well ordered. We call the divisions of a series of DataFrames well ordered if all the divisions are known and the highest division of the previous DataFrame is below the lowest division of the next. If any input has an unknown division, Dask will produce an output without known partitioning. With all known partitions, Dask treats row-based concatenations as a metadata-only change and will not perform any shuffle. This requires that there is no overlap between the divisions. There is also an extra interleave_partitions parameter, which will change the join type for row-based combinations to one without the input partitioning restriction and result in a known partitioner. Dask DataFrames with known partitioners can support faster look-ups and operations by key.
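
The well-ordered condition can be written down as a short check. The sketch below is a plain-Python restatement of the definition above, not Dask’s internal code; it assumes each DataFrame’s divisions are given as a tuple, with None entries marking unknown divisions (which is how Dask reports them).

```python
def well_ordered(all_divisions):
    """Check whether a sequence of divisions tuples is well ordered."""
    previous_max = None
    for divisions in all_divisions:
        # Any unknown division rules out the metadata-only fast path.
        if divisions is None or any(d is None for d in divisions):
            return False
        # Each DataFrame must start strictly above where the last one ended.
        if previous_max is not None and not previous_max < divisions[0]:
            return False
        previous_max = divisions[-1]
    return True


print(well_ordered([(0, 50, 99), (100, 150, 199)]))
print(well_ordered([(0, 50, 100), (100, 150, 199)]))
print(well_ordered([(0, 50, 99), (None, None, None)]))
```

Only the first call describes inputs that can be stacked without moving data; the second has touching ranges and the third has unknown divisions.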

Dask’s column-based concat (similar to a SQL JOIN) also has restrictions around the divisions/partitions of the DataFrames it is combining. Dask’s version of concat supports only inner or full outer join, not left or right. Column-based joins require that all inputs have known partitioners and also result in a DataFrame with known partitioning. Having a known partitioner can be useful for subsequent joins.

Warning

Don’t use Dask’s concat when operating by row on a DataFrame with unknown divisions, as it will likely return incorrect results.8

Multi-DataFrame Internals

Dask uses four techniques (hash, broadcast, partitioned, and stack_partitions) to combine DataFrames, and each has very different performance. These four techniques do not map 1:1 with the join functions you choose from. Rather, Dask chooses the technique based on the indexes, divisions, and requested join type (e.g., outer/left/inner). The three column-based join techniques are hash joins, broadcast joins, and partitioned joins. When doing row-based combinations (e.g., append), Dask has a special technique called stack_partitions that is extra fast. It’s important that you understand the performance of each of these techniques and the conditions that will cause Dask to pick each approach:

Hash joins

The default that Dask uses when no other join technique is suitable. Hash joins shuffle the data for all the input DataFrames to partition on the target key. They use the hash values of keys, which results in a DataFrame that is not in any particular order. As such, the result of a hash join does not have any known divisions.

Broadcast joins

Ideal for joining large DataFrames with small DataFrames. In a broadcast join, Dask takes the smaller DataFrame and distributes it to all the workers. This means that the smaller DataFrame must be able to fit in memory. To tell Dask that a DataFrame is a good candidate for broadcasting, you make sure it is all stored in one partition, such as by calling repartition(npartitions=1).

Partitioned joins

Occur when combining DataFrames along an index where the partitions/divisions are known for all the DataFrames. Since the input partitions are known, Dask is able to align the partitions between the DataFrames, involving less data transfer, as each output partition has less than a full set of inputs.

Since partitioned and broadcast joins are faster, doing some work to help Dask can be worth it. For example, concatenating several DataFrames with known and aligned partitions/divisions and one unaligned DataFrame will result in an expensive hash join. Instead, try to either set the index and partition on the remaining DataFrame or join the less expensive DataFrames first and then perform the expensive join after.

The fourth technique, stack_partitions, is different from the other options since it doesn’t involve any movement of data. Instead, the resulting DataFrame partitions list is a union of the upstream partitions from the input DataFrames. Dask uses stack_partitions for most row-based combinations except when all of the input DataFrame divisions are known, they are not well ordered, and you ask Dask to interleave_partitions. The stack_partitions technique is able to provide known partitioning in its output only when the input divisions are known and well ordered. If all of the divisions are known but not well ordered and you set interleave_partitions, Dask will use a partitioned join instead. While this approach is comparatively inexpensive, it is not free, and it can result in an excessively large number of partitions, requiring you to repartition anyway.

Missing Functionality

Not all multi-DataFrame operations are implemented, like compare, which leads us into the next section about the limitations of Dask DataFrames.

What Does Not Work

Dask’s DataFrame implements most, but not all, of the pandas DataFrame API. Some of the pandas API is not implemented in Dask because of the development time involved. Other parts are left out deliberately to avoid exposing an API that would be unexpectedly slow.

Sometimes the API is just missing small parts, as both pandas and Dask are under active development. An example is the split function from Example 2-10. In local pandas, instead of doing split().explode(), you could have called split(expand=True). Some of these missing parts can be excellent places for you to get involved and contribute to the Dask project if you are interested.

Some libraries do not parallelize as well as others. In these cases, a common approach is to try to filter or aggregate the data down enough that it can be represented locally and then apply the local libraries to the data. For example, with graphing, it’s common to pre-aggregate the counts or take a random sample and graph the result.

While much of the pandas DataFrame API will work, before you swap in Dask DataFrame, it’s important to make sure you have good test coverage to catch the situations where it does not.

What’s Slower

Usually, using Dask DataFrames will improve performance, but not always. Generally, smaller datasets will perform better in local pandas. As discussed, anything involving shuffles is generally slower in a distributed system than in a local one. Iterative algorithms can also produce large graphs of operations, which are slow to evaluate in Dask compared to traditional eager evaluation.

Some problems are generally unsuitable for data-parallel computing. For example, when you write to a data store that has a single lock, adding more parallel writers increases lock contention and may make the write slower than if a single thread were doing it. In these situations, you can sometimes repartition your data or write individual partitions to avoid lock contention.

Handling Recursive Algorithms

Dask’s lazy evaluation, powered by its lineage graph, is normally beneficial, allowing it to combine steps automatically. However, when the graph gets too large, Dask can struggle to manage it, which often shows up as a slow driver process or notebook, and sometimes as an out-of-memory exception. Thankfully, you can work around this by writing out your DataFrame and reading it back in. Generally, Parquet is the best format for doing this as it is space-efficient and self-describing, so no schema inference is required.

Re-computed Data

Another challenge of lazy evaluation is if you want to reuse an element multiple times. For example, say you want to load a few DataFrames and then compute multiple pieces of information. You can ask Dask to keep a collection (including DataFrame, series, etc.) in memory by running client.persist(collection). Not all re-computed data needs to be avoided; for example, if loading the DataFrames is fast enough, it might be fine not to persist them.

Warning

Like other functions in Dask, persist() does not modify the DataFrame in place; it returns a new, persisted collection. If you keep calling functions on the original DataFrame instead of the returned one, your data will still be re-computed. This is notably different from Apache Spark.

How Other Functions Are Different

For performance reasons, various parts of Dask DataFrames behave a little differently than local DataFrames:

reset_index

The index will start back over at zero on each partition.

kurtosis

This function does not filter out NaNs and uses SciPy defaults.

concat

Instead of coercing category types, each category type is expanded to the union of all the categories it is concatenated with.

sort_values

Dask supports only single-column sorts.

Joining multiple DataFrames

When joining more than two DataFrames at the same time, the join type must be either outer or left.

When porting your code to use Dask DataFrames, you should be especially mindful anytime you use these functions, as they might not exactly work in the axis you intended. Work small first and test the correctness of the numbers, as issues can often be tricky to track down.

When porting existing pandas code to Dask, consider using the local single-machine version to produce test datasets to compare the results with, to ensure that all changes are intentional.

Data Science with Dask DataFrame: Putting It Together

Dask DataFrame has already proven to be a popular framework for big data uses, so we wanted to highlight a common use case and considerations. Here, we use a canonical data science challenge dataset, the New York City yellow taxicab dataset, and walk through what a data engineer working with this dataset might consider. In the subsequent chapters covering ML workloads, we will build on many of these DataFrame tools.

Deciding to Use Dask

As discussed earlier, Dask excels in data-parallel tasks. A particularly good fit is a dataset that may already be available in columnar format, like Parquet. We also assess where the data lives, such as in S3 or in other remote storage options. Many data scientists and engineers would probably have a dataset that cannot be contained on a single machine or cannot be stored locally due to compliance constraints. Dask’s design lends itself well to these use cases.

Our NYC taxi data fits all these criteria: the data is stored in S3 by the City of New York in Parquet format, and it is easily scalable up and down, as it is partitioned by dates. Additionally, we evaluate that the data is structured already, so we can use Dask DataFrame. Since Dask DataFrames and pandas DataFrames are similar, we can also use a lot of existing workflows for pandas. We can sample a few of these, do our exploratory data analysis in a smaller dev environment, and then scale it up to the full dataset, all with the same code. Note that for Example 4-16, we use row groups to specify chunking behavior.

Example 4-16. Dask DataFrame loading multiple Parquet files
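import dask.dataframe as dd

# Glob pattern matching every Parquet file in the local directory
filename = './nyc_taxi/*.parquet'
df_x = dd.read_parquet(
    filename,
    split_row_groups=2  # up to two Parquet row groups per Dask partition
)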

Exploratory Data Analysis with Dask

The first step of data science often consists of exploratory data analysis (EDA), or understanding the dataset and plotting its shape. Here, we use Dask DataFrames to walk through the process and examine the common troubleshooting issues that arise from nuanced differences between pandas DataFrame and Dask DataFrame.

Loading Data

The first time you load the data into your dev environment, you might encounter block size issues or schema issues. While Dask tries to infer both, at times it cannot. Block size issues will often show up when you call .compute() on trivial code and see one worker hitting the memory ceiling. In that case, some manual work is involved in determining the right chunk size. Schema issues show up as an error or a warning as you read the data, or in subtle ways later on, such as mismatched float32 and float64 columns. If you know the schema already, it’s a good idea to enforce it by specifying dtypes when reading.

As you further explore a dataset, you might encounter data printed by default in a format that you don’t like, for example, scientific notation. The control for that is through pandas, not Dask itself. Dask implicitly calls pandas, so you want to explicitly set your preferred format using pandas.

Summary statistics on the data work just like .describe() from pandas, along with specified percentiles or .quantile(). If you are running several of these, remember to combine them into a single compute call rather than computing each one separately, which saves Dask from repeating shared work. Using Dask DataFrame describe is shown in Example 4-17.

Example 4-17. Dask DataFrame describing percentiles with pretty formatting
import pandas as pd

pd.set_option('display.float_format', lambda x: '%.5f' % x)
df.describe(percentiles=[.25, .5, .75]).compute()

Plotting Data

Plotting data is often an important step in getting to know your dataset, but plotting big data is a tricky subject. We as data engineers often get around that issue by first working with a smaller sampled dataset. For that, Dask works alongside a Python plotting library such as matplotlib or seaborn, just like pandas. The advantage of Dask DataFrame is that we are now able to plot the entire dataset, if desired. Here, Dask does the filtering and aggregation on the distributed workers and then collects the result into a single process to hand to a non-distributed library like matplotlib to render. Plotting a Dask DataFrame is shown in Example 4-18.

Example 4-18. Dask DataFrame plotting trip distance
import matplotlib.pyplot as plt
import seaborn as sns 
import numpy as np

get_ipython().run_line_magic('matplotlib', 'inline')
sns.set(style="white", palette="muted", color_codes=True)
f, axes = plt.subplots(1, 1, figsize=(11, 7), sharex=True)
sns.despine(left=True)
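# Pass the Dask series itself rather than df[col].values (see the Tip below).
# Note: distplot is deprecated in recent seaborn releases.
sns.distplot(
    np.log(df['trip_distance'] + 1),
    axlabel='Log(trip_distance)',
    label='log(trip_distance)',
    bins=50,
    color="r")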
plt.setp(axes, yticks=[])
plt.tight_layout()
plt.show()

Tip

Note that if you’re used to NumPy logic, you will have to think about the Dask DataFrame layer when plotting. For example, NumPy users will be familiar with the df[col].values syntax for defining plotting variables. In Dask, .values means something different (it returns a lazy Dask array rather than a NumPy array), so what we pass is df[col] instead.

Inspecting Data

Pandas DataFrame users will be familiar with .loc[] and .iloc[] for inspecting data at a particular row or column. This logic translates to Dask DataFrame, with important differences in .iloc[] behavior.

A sufficiently large Dask DataFrame will contain multiple pandas DataFrames. This changes the way we should think about numbering and addressing indices. For example, .iloc[] (a way to access rows and columns by integer position) doesn’t work the same way in Dask, since each smaller DataFrame has its own positions, and Dask does not track the size of each smaller DataFrame. In other words, a global position is hard for Dask to figure out, since Dask would have to count through each DataFrame in turn to get to it. In practice, Dask’s .iloc[] supports selecting only columns by position, so check any .iloc[] calls in your code and make sure they return the values you expect.

Tip

Be aware that calling methods like .reset_index() resets the index in each of the smaller DataFrames, so the same index value can appear once per partition, and a label lookup such as .loc[] may return multiple rows.

Conclusion

In this chapter, you’ve learned what may cause your Dask DataFrames to behave differently or more slowly than you might expect, along with a number of techniques for dealing with the performance differences between pandas DataFrames and Dask DataFrames. This same understanding of how Dask DataFrames are implemented can help you decide whether distributed DataFrames are well suited to your problem. So that you can put this all together, you’ve also seen how to get datasets larger than a single machine can handle into and out of Dask’s DataFrames. From here you will go on to learn more about Dask’s other collections and then how to move beyond collections.

1 See “Partitioning/Chunking Collections” for a review of partitioning.

2 FSSPEC documentation includes the specifics for configuring each of the backends.

3 This can lead to out-of-memory exceptions while executing the aggregation. The linear growth in storage requires that (within a constant factor) all the data must be able to fit on a single process, which limits how effective Dask can be.

4 Alternate algorithms for exact quantiles depend on more shuffles to reduce the space overhead.

5 Key-skew can make this impossible for a known partitioner.

6 Strictly increasing with no repeated values (e.g., 1, 4, 7 is monotonically increasing, but 1, 4, 4, 7 is not).

7 Embarrassingly parallel problems are ones in which the overhead of distributed computing and communication is low.

8 Dask assumes the indices are aligned when there are no indices present.
