
Utilizing PyArrow to Improve pandas and Dask Workflows


Get the most out of PyArrow support in pandas and Dask right now

All images were created by the author

Introduction

This post investigates where we can use PyArrow to improve our pandas and Dask workflows right now. General support for PyArrow dtypes was added to pandas and Dask with pandas 2.0. This solves a number of long-standing pains for users of both libraries. pandas users often complain to me that pandas doesn’t support missing values in arbitrary dtypes or that non-standard dtypes are not very well supported. A particularly annoying problem for Dask users is running out of memory with large datasets. PyArrow backed string columns consume up to 70% less memory compared to NumPy object columns and thus have the potential to mitigate this problem in addition to providing a huge performance improvement.

Support for PyArrow dtypes in pandas, and by extension Dask, is still relatively new. I’d recommend caution when opting into the PyArrow dtype_backend until at least pandas 2.1 is released. Not every part of both APIs is optimized yet. You should be able to get a big improvement in certain workflows though. This post will go over a couple of examples where I’d recommend switching to PyArrow right away, since it already provides huge benefits.

Dask itself can benefit in various ways from PyArrow dtypes. We will investigate how PyArrow backed strings can easily mitigate the pain point of running out of memory on Dask clusters and how we can improve performance by using PyArrow.

I’m part of the pandas core team and was heavily involved in implementing and improving PyArrow support in pandas. I’ve recently joined Coiled where I’m working on Dask. One of my tasks is improving the PyArrow integration in Dask.

General overview of PyArrow support

PyArrow dtypes were initially introduced in pandas 1.5. The implementation was experimental and I wouldn’t recommend using it on pandas 1.5.x. Support for them is still relatively new. pandas 2.0 provides a huge improvement, including making it easy to opt into PyArrow backed DataFrames. We’re still working on supporting them properly everywhere, and thus they should be used with caution until at least pandas 2.1 is released. Both projects work continuously to improve support throughout Dask and pandas.

We encourage users to try them out! This will help us to get a better idea of what is still lacking support or is not fast enough. Giving feedback helps us improve support and will drastically reduce the time that is necessary to create a smooth user experience.

Dataset

We will use the taxi dataset from New York City that contains all Uber and Lyft rides. It has some interesting attributes like price, tips, driver pay and many more. The dataset can be found here (see terms of service) and is stored in parquet files. When analyzing Dask queries, we will use a publicly available S3 bucket to simplify our queries: s3://coiled-datasets/uber-lyft-tlc/. We will use the dataset from December 2022 for our pandas queries, since this is the maximum that fits comfortably into memory on my machine (24GB of RAM). We have to avoid stressing our RAM usage, since this might introduce side effects when analyzing performance.

We will also investigate the performance of read_csv. We will use the Crimes in Chicago dataset that can be found here.

Dask cluster

There are many different options to set up a Dask cluster; see the Dask documentation for a non-exhaustive list of deployment options. I will use Coiled to create a cluster on AWS with 30 machines through:

import coiled

cluster = coiled.Cluster(
    n_workers=30,
    name="dask-performance-comparisons",
    region="us-east-2",  # this is the region of our dataset
    worker_vm_type="m6i.large",
)

Coiled is connected to my AWS account. It creates the cluster inside my account and manages all resources for me. 30 machines are enough to operate on our dataset comfortably. We will investigate how we can reduce the required number of workers to 15 through some small modifications.

pandas StringDtype backed by PyArrow

We start with a feature that was originally introduced over 3 years ago in pandas 1.0. Setting the dtype in pandas or Dask to string returns an object with StringDtype. This feature is relatively mature and should provide a smooth user experience.

Historically, pandas represented string data through NumPy arrays with dtype object. NumPy object data is stored as an array of pointers pointing to the actual data in memory. This makes iterating over an array containing strings very slow. pandas 1.0 initially introduced said StringDtype, which allowed easier and more consistent operations on strings. This dtype was still backed by Python strings and thus wasn’t very performant either. Rather, it provided a clear abstraction of string data.

pandas 1.3 finally introduced an enhancement to create an efficient string dtype. This datatype is backed by PyArrow arrays. PyArrow provides a data structure that enables performant and memory efficient string operations. From that point on, users could use a string dtype that was contiguous in memory and thus very fast. This dtype can be requested through string[pyarrow]. Alternatively, we can request it by specifying string as the dtype and setting:

pd.options.mode.string_storage = "pyarrow"

Since Dask builds on top of pandas, this string dtype is available here as well. On top of that, Dask offers a convenient option that automatically converts all string data to string[pyarrow].

dask.config.set({"dataframe.convert-string": True})

This is a convenient way of avoiding NumPy object dtype for string columns. Additionally, it has the advantage that it creates PyArrow arrays natively for I/O methods that operate with Arrow objects. On top of providing huge performance improvements, PyArrow strings consume significantly less memory. An average Dask DataFrame with PyArrow strings consumes around 33–50% of the original memory compared to NumPy object. This solves the biggest pain point for Dask users, which is running out of memory when operating on large datasets. The option also enables global testing in Dask’s test suite, which ensures that PyArrow backed strings are mature enough to provide a smooth user experience.

Let’s look at a few operations that represent typical string operations. We will start with a couple of pandas examples before switching over to operations on our Dask cluster.

We will use df.convert_dtypes to convert our object columns to PyArrow string arrays. There are more efficient ways of getting PyArrow dtypes in pandas that we will explore later. We will use the Uber-Lyft dataset from December 2022; this file fits comfortably into memory on my machine.

import pandas as pd

pd.options.mode.string_storage = "pyarrow"

df = pd.read_parquet(
    "fhvhv_tripdata_2022-10.parquet",
    columns=[
        "tips",
        "hvfhs_license_num",
        "driver_pay",
        "base_passenger_fare",
        "dispatching_base_num",
    ],
)
df = df.convert_dtypes(
    convert_boolean=False,
    convert_floating=False,
    convert_integer=False,
)

Our DataFrame has NumPy dtypes for all non-string columns in this example. Let’s start with filtering for all rides that were operated by Uber.

df[df["hvfhs_license_num"] == "HV0003"]

This operation creates a mask with True/False values that specify whether Uber operated a ride. This doesn’t utilize any special string methods, but the equality comparison dispatches to PyArrow. Next, we will use the String accessor that is implemented in pandas and gives you access to all kinds of string operations on a per-element basis. We want to find all rides that were dispatched from a base starting with "B028".

df[df["dispatching_base_num"].str.startswith("B028")]

startswith iterates over our array and checks whether every string starts with the specified substring. The advantage of PyArrow is easy to see. The data are contiguous in memory, which means that we can efficiently iterate over them. Additionally, these arrays have a second array with pointers that point to the first memory address of every string, which makes computing the starting sequence even faster.

Finally, we look at a GroupBy operation that groups over PyArrow string columns. The calculation of the groups can dispatch to PyArrow as well, which is more efficient than factorizing over a NumPy object array.

df.groupby(
    ["hvfhs_license_num", "dispatching_base_num"]
).mean(numeric_only=True)

Let’s look at how these operations stack up against DataFrames where string columns are represented by NumPy object dtype.

The results are more or less as we expected. The string based comparisons are significantly faster when performed on PyArrow strings. Most string accessors should provide a huge performance improvement. Another interesting observation is memory usage: it is reduced by roughly 50% compared to NumPy object dtype. We will take a closer look at this with Dask.

Dask mirrors the pandas API and dispatches to pandas for many operations. Consequently, we can use the same API to access PyArrow strings. A convenient way to request these globally is the option mentioned above, which is what we will use here:

dask.config.set({"dataframe.convert-string": True})

One of the biggest advantages of this option during development is that it enables easy testing of PyArrow strings globally in Dask to make sure that everything works smoothly. We will utilize the Uber-Lyft dataset for our explorations. The dataset takes up around 240GB of memory on our cluster. Our initial cluster has 30 machines, which is enough to perform our computations comfortably.

import dask
import dask.dataframe as dd
from distributed import wait

dask.config.set({"dataframe.convert-string": True})

df = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
    storage_options={"anon": True},
)
df = df.persist()
wait(df) # Wait till the computation is finished

We persist the data in memory so that I/O performance doesn’t influence our performance measurements. Our data is now available in memory, which makes access fast. We will perform computations that are similar to our pandas computations. One of the main goals is to show that the benefits from pandas will translate to computations in a distributed environment with Dask.

One of the first observations is that the DataFrame with PyArrow backed string columns consumes only 130GB of memory, roughly half of what it consumed with NumPy object columns. We have only a few string columns in our DataFrame, which means that the memory savings for the string columns themselves are actually higher than 50% when switching to PyArrow strings. Consequently, we will reduce the size of our cluster to 15 workers when performing our operations on PyArrow string columns.

cluster.scale(15)

We measure the performance of the mask operation and one of the String accessors together through subsequent filtering of the DataFrame.

df = df[df["hvfhs_license_num"] == "HV0003"]
df = df[df["dispatching_base_num"].str.startswith("B028")]
df = df.persist()
wait(df)

We see that we can use the same methods as in our previous example. This makes transitioning from pandas to Dask relatively easy.

Additionally, we will again compute a GroupBy operation on our data. This is significantly harder in a distributed environment, which makes the results more interesting. The previous operations parallelize relatively easily onto a large cluster, while this is harder with GroupBy.

df = df.groupby(
    ["hvfhs_license_num", "dispatching_base_num"]
).mean(numeric_only=True)

df = df.persist()
wait(df)

We get nice improvements by factors of 2 and 3. This is especially intriguing since we reduced the size of our cluster from 30 machines to 15, reducing the cost by 50%. Consequently, we also reduced our computational resources by a factor of 2, which makes our performance improvement even more impressive. Thus, relative to the resources used, the performance improved by a factor of 4 and 6 respectively. We can perform the same computations on a smaller cluster, which saves money and is more efficient in general, and still get a performance boost out of it.

Summarizing, we saw that PyArrow string columns are a huge improvement when comparing them to NumPy object columns in DataFrames. Switching to PyArrow strings is a relatively small change that can immensely improve the performance and efficiency of an average workflow that depends on string data. These improvements are equally visible in pandas and Dask!
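The arithmetic behind the factors of 4 and 6 is simply the observed speedup multiplied by the reduction in machines. A minimal sketch, using only the numbers quoted in this section (the variable names are illustrative):

```python
machines_before = 30
machines_after = 15

# Wall-clock speedups reported above for the two measured workloads.
observed_speedups = [2, 3]

# Half the machines did the work, so each speedup counts double
# when measured per unit of compute.
resource_factor = machines_before / machines_after
resource_adjusted = [speedup * resource_factor for speedup in observed_speedups]

print(resource_adjusted)
```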

Engine keyword in I/O methods

We will now take a look at I/O functions in pandas and Dask. Some functions have custom implementations, like read_csv, while others dispatch to another library, like read_excel to openpyxl. Some of these functions gained a new engine keyword that enables us to dispatch to PyArrow. The PyArrow parsers are multithreaded by default and hence can provide a significant performance improvement.

pd.read_csv("Crimes_-_2001_to_Present.csv", engine="pyarrow")

This configuration will return the same results as the other engines. The only difference is that PyArrow is used to read the data. The same option is available for read_json. The PyArrow engines were added to provide a faster way of reading data. The improved speed is only one of the advantages. The PyArrow parsers return the data as a PyArrow Table. A PyArrow Table provides built-in functionality to convert to a pandas DataFrame. Depending on the data, this might require a copy while casting to NumPy (strings, integers with missing values, …), which brings an unnecessary slowdown. This is where the PyArrow dtype_backend comes in. It is implemented as an ArrowExtensionArray class in pandas, which is backed by a PyArrow ChunkedArray. As a direct consequence, the conversion from a PyArrow Table to pandas is extremely cheap since it doesn’t require any copies.

pd.read_csv(
    "Crimes_-_2001_to_Present.csv",
    engine="pyarrow",
    dtype_backend="pyarrow",
)

This returns a DataFrame that is backed by PyArrow arrays. pandas isn’t optimized everywhere yet, so this can give you a slowdown in follow-up operations. It might be worth it if the workload is particularly I/O heavy. Let’s look at a direct comparison:

We can see that the PyArrow engine and PyArrow dtypes provide a 15x speedup compared to the C engine.

The same benefits apply to Dask. Dask wraps the pandas csv reader and hence gets the same features for free.

The comparison for Dask is a bit more complicated. Firstly, my pandas example reads the data from my local machine while our Dask examples will read the data from an S3 bucket. Network speed will be a relevant component. Also, distributed computations have some overhead that we have to account for.

We are purely looking for speed here, so we will read some timeseries data from a public S3 bucket.

import dask.dataframe as dd
from distributed import wait

df = dd.read_csv(
    "s3://coiled-datasets/timeseries/20-years/csv/",
    storage_options={"anon": True},
    engine="pyarrow",
    parse_dates=["timestamp"],
)
df = df.persist()
wait(df)

We will execute this code snippet for engine="c", engine="pyarrow" and additionally engine="pyarrow" with dtype_backend="pyarrow". Let’s look at some performance comparisons. All examples were executed with 30 machines on the cluster.

The PyArrow engine runs around 2 times as fast as the C engine. Both implementations used the same number of machines. The memory usage was reduced by 50% with the PyArrow dtype_backend. The same reduction is available if only object columns are converted to PyArrow strings, which gives a better experience in follow-up operations.

We have seen that the Arrow engines provide significant speedups over the custom C implementations. They don’t support all features of the custom implementations yet, but if your use case is compatible with the supported options, you should get a significant speedup for free.

The case with the PyArrow dtype_backend is a bit more complicated. Not all areas of the API are optimized yet. If you spend a lot of time processing your data outside I/O functions, then this might not give you what you need. It will speed up your processing if your workflow spends a lot of time reading the data.

dtype_backend in PyArrow-native I/O readers

Some other I/O methods have an engine keyword as well. read_parquet is the most popular example. The situation is a bit different here though. These I/O methods were already using the PyArrow engine by default, so the parsing is as efficient as possible. Another potential performance benefit is the usage of the dtype_backend keyword. Normally, PyArrow will return a PyArrow Table which is then converted to a pandas DataFrame. The PyArrow dtypes are converted to their NumPy equivalent. Setting dtype_backend="pyarrow" avoids this conversion. This gives a decent performance improvement and saves a lot of memory.

Let’s look at one pandas performance comparison. We read the Uber-Lyft taxi data from December 2022.

pd.read_parquet("fhvhv_tripdata_2022-10.parquet")

We read the info with and without dtype_backend="pyarrow".

We can easily see that most of the time is taken up by the conversion after the reading of the Parquet file was finished. The function runs 3 times as fast when avoiding the conversion to NumPy dtypes.

Dask has a specialized implementation for read_parquet that has some advantages tailored to distributed workloads compared to the pandas implementation. The common denominator is that both functions dispatch to PyArrow to read the parquet file. Both also convert the data to NumPy dtypes after successfully reading the file. We are reading the whole Uber-Lyft dataset, which consumes around 240GB of memory on our cluster.

import dask.dataframe as dd
from distributed import wait

df = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
    storage_options={"anon": True},
)
df = df.persist()
wait(df)

We read the dataset in 3 different configurations. First with the default NumPy dtypes, then with the PyArrow string option turned on:

dask.config.set({"dataframe.convert-string": True})

And lastly with dtype_backend="pyarrow". Let’s look at what this means performance-wise:

Similar to our pandas example, we can see that converting to NumPy dtypes takes up a huge chunk of our runtime. The PyArrow dtypes give us a nice performance improvement. Both PyArrow configurations use half of the memory that the NumPy dtypes are using.

PyArrow strings are a lot more mature than the general PyArrow dtype_backend. Based on the performance chart we got, we get roughly the same performance improvement when using PyArrow strings together with NumPy dtypes for all other dtypes. If a workflow doesn’t work well enough on PyArrow dtypes yet, I’d recommend enabling PyArrow strings only.

Conclusion

We have seen how we can leverage PyArrow in pandas and Dask right now. PyArrow backed string columns have the potential to impact most workflows in a positive way and provide a smooth user experience with pandas 2.0. Dask has a convenient option to globally avoid NumPy object dtype when possible, which makes opting into PyArrow backed strings even easier. PyArrow also provides huge speedups in other areas where available. The PyArrow dtype_backend is still pretty new and has the potential to cut I/O times significantly right now. It is certainly worth exploring whether it can solve performance bottlenecks. There is a lot of work going on to improve support for general PyArrow dtypes, with the potential to speed up an average workflow in the near future.

There is a current proposal in pandas to start inferring strings as PyArrow backed strings by default starting from pandas 3.0. Additionally, it includes many more areas where leaning more on PyArrow makes a lot of sense (e.g. Decimals, structured data, …). You can read up on the proposal here.

Thanks for reading. Feel free to reach out in the comments to share your thoughts and feedback about PyArrow support in both libraries. I will write follow-up posts focused on this topic and pandas in general. Follow me on Medium if you would like to read more about pandas and Dask.
