The number of output files saved to disk equals the number of partitions in the Spark executors when the write operation is performed. However, gauging the number of partitions before performing the write operation can be tricky.
When reading a table, Spark defaults to reading blocks with a maximum size of 128 MB (though you can change this with spark.sql.files.maxPartitionBytes). Thus, the number of partitions depends on the size of the input. In practice, however, the number of partitions will most likely equal the spark.sql.shuffle.partitions parameter. This number defaults to 200, but for larger workloads it is rarely enough. Check out this video to learn how to set the ideal number of shuffle partitions.
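To get a feel for how the input size drives the number of read partitions, here is a rough back-of-the-envelope sketch. The helper name estimate_read_partitions is hypothetical, and the model is deliberately simplified: it assumes one partition per block of up to 128 MB and ignores file boundaries, file open costs and cluster parallelism, so a real job may end up with a different count.

```python
import math

# 128 MB, the default block size mentioned above
DEFAULT_MAX_PARTITION_BYTES = 128 * 1024 * 1024


def estimate_read_partitions(input_bytes: int,
                             max_partition_bytes: int = DEFAULT_MAX_PARTITION_BYTES) -> int:
    """Rough estimate: one read partition per block of max_partition_bytes."""
    if input_bytes <= 0:
        return 0
    return math.ceil(input_bytes / max_partition_bytes)


ten_gb = 10 * 1024 ** 3
print(estimate_read_partitions(ten_gb))                    # 80
print(estimate_read_partitions(ten_gb, 64 * 1024 * 1024))  # 160
```

Halving the block size doubles the estimate, which is the only lever this setting gives you at read time.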
The number of partitions in the Spark executors equals spark.sql.shuffle.partitions if there is at least one wide transformation in the ETL. If only narrow transformations are applied, the number of partitions matches the number created when reading the file.
Setting the number of shuffle partitions gives us high-level control of the total number of partitions only when dealing with non-partitioned tables. Once we enter the territory of partitioned tables, changing the spark.sql.shuffle.partitions parameter won’t easily steer the size of each data file.
We have two main ways to manage the number of partitions at runtime: repartition() and coalesce(). Here’s a quick breakdown:
Repartition: repartition(n_partitions, partitionCols) is a lazy transformation with two parameters: the number of partitions and the partitioning column(s). Note that the number of partitions comes first. When executed, Spark shuffles the data across the cluster according to the partitioning column. However, once the table is saved, the information about the repartitioning is lost, so this useful piece of information won’t be available when reading the file.
df = df.repartition(n_partitions, "column_name")
Coalesce: coalesce(num_partitions) is also a lazy transformation, but it takes only one argument: the number of partitions. Importantly, the coalesce operation doesn’t shuffle data across the cluster, so it’s faster than repartition. Also, coalesce can only reduce the number of partitions; asking for more partitions than currently exist leaves the count unchanged.
df = df.coalesce(num_partitions)
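The difference between the two is easier to see on a toy model. The sketch below is plain Python, not Spark: partitions are lists, toy_repartition and toy_coalesce are hypothetical helpers, and merging neighbouring partitions is an assumption of the toy (Spark chooses which partitions to merge itself). It only illustrates the two behaviours described above: a full reshuffle versus gathering whole partitions together.

```python
from typing import List

Partition = List[int]


def toy_repartition(partitions: List[Partition], n: int) -> List[Partition]:
    """Full shuffle: every row is re-assigned individually by hashing it."""
    out: List[Partition] = [[] for _ in range(n)]
    for part in partitions:
        for row in part:
            out[hash(row) % n].append(row)
    return out


def toy_coalesce(partitions: List[Partition], n: int) -> List[Partition]:
    """No shuffle: whole partitions are merged, so the count can only go down."""
    if n >= len(partitions):
        # Asking for more partitions than exist changes nothing.
        return [list(p) for p in partitions]
    out: List[Partition] = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        out[i * n // len(partitions)].extend(part)
    return out


parts = [[1, 2], [3, 4], [5, 6], [7, 8]]
print(toy_coalesce(parts, 2))       # [[1, 2, 3, 4], [5, 6, 7, 8]]
print(toy_repartition(parts, 2))    # [[2, 4, 6, 8], [1, 3, 5, 7]]
print(len(toy_coalesce(parts, 8)))  # 4
```

In the toy, coalescing keeps the rows in their original order, while repartitioning scatters them according to the hash.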
The main takeaway here is that the coalesce method is generally the more beneficial choice. That’s not to say that repartitioning isn’t useful; it certainly is, particularly when we need to adjust the number of partitions in a dataframe at runtime.
In my experience with ETL processes, where I deal with multiple tables of varying sizes and perform complex transformations and joins, I’ve found that spark.sql.shuffle.partitions doesn’t offer the precise control I need. For example, using the same number of shuffle partitions for joining two small tables and two large tables in the same ETL would be inefficient, leading to an overabundance of small partitions for the small tables or insufficient partitions for the large tables. Repartitioning also has the added benefit of helping me sidestep issues with skewed joins and skewed data [2].
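To make the mismatch concrete, here is a small sizing sketch. Everything in it is an assumption for illustration: partitions_for is a hypothetical helper, the 128 MB target partition size is a common rule of thumb rather than a Spark requirement, and the 500 MB and 200 GB join sizes are made-up numbers.

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3


def partitions_for(size_bytes: int, target_partition_bytes: int = 128 * MB) -> int:
    """Number of partitions needed to keep each one near the target size."""
    return max(1, math.ceil(size_bytes / target_partition_bytes))


small_join_bytes = 500 * MB  # assumed size of the small join
large_join_bytes = 200 * GB  # assumed size of the large join

# One global setting of 200 shuffle partitions for both joins:
print(small_join_bytes / 200 / MB)  # 2.5 MB per partition: far too small
print(large_join_bytes / 200 / MB)  # 1024.0 MB per partition: far too large

# Sizing each dataframe on its own instead:
print(partitions_for(small_join_bytes))  # 4
print(partitions_for(large_join_bytes))  # 1600

# The result could then feed a per-dataframe call such as
# df = df.repartition(partitions_for(estimated_bytes), "join_key"),
# where estimated_bytes and "join_key" are placeholders.
```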
That being said, repartitioning is less suitable right before writing the table to disk, and in most cases it can be replaced with coalesce. Coalesce has the upper hand over repartition before writing to disk for a couple of reasons:
- It prevents an unnecessary reshuffling of data across the cluster.
- It allows the data to be ordered according to a logical heuristic. When using the repartition method before writing, data is reshuffled across the cluster, causing it to lose its order. Using coalesce, on the other hand, retains the order, as data is gathered together rather than redistributed.
Let’s see why ordering the data is crucial.
We mentioned above that when we apply the repartition method, Spark won’t save the partitioning information in the metadata of the table. However, when dealing with big data, this is a crucial piece of information for two reasons:
- It allows scanning through the table much more quickly at query time.
- It allows better compression, if dealing with a compressible format (such as Parquet, CSV, JSON, etc.). This is a great article to understand why.
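The compression point can be checked with a quick experiment. The sketch below uses zlib from the Python standard library as a stand-in compressor; that is an assumption made to keep the example self-contained, since the actual file formats use their own encodings and codecs. The column values are randomly generated for illustration.

```python
import random
import zlib

rng = random.Random(0)
countries = ["AR", "BR", "CL", "DE", "FR", "IT", "JP", "US"]

# A low-cardinality column in arbitrary (unsorted) order.
column = [rng.choice(countries) for _ in range(100_000)]


def compressed_size(values):
    """Size in bytes of the column after zlib compression."""
    return len(zlib.compress("\n".join(values).encode("utf-8")))


unsorted_size = compressed_size(column)
sorted_size = compressed_size(sorted(column))

# Same values, different order: the sorted column compresses far better
# because equal values sit next to each other in long runs.
print(unsorted_size, sorted_size)
```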
The key takeaway is to order the data before saving. The information will be retained in the metadata, and it will be used at query time, making the query much faster.
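Here is a toy model of why ordering speeds up queries. It assumes the file format keeps a minimum and maximum value for each chunk of rows (Parquet, for example, stores such statistics per row group) and that a reader can skip any chunk whose range cannot contain the value it is looking for. The helpers row_group_stats and groups_to_scan are hypothetical names, not part of any library.

```python
import random


def row_group_stats(values, rows_per_group):
    """Return (min, max) for each consecutive chunk of rows_per_group values."""
    chunks = (values[i:i + rows_per_group]
              for i in range(0, len(values), rows_per_group))
    return [(min(chunk), max(chunk)) for chunk in chunks]


def groups_to_scan(stats, target):
    """Count the chunks whose [min, max] range could contain the target."""
    return sum(1 for lo, hi in stats if lo <= target <= hi)


rng = random.Random(0)
ordered = list(range(10_000))
shuffled = ordered[:]
rng.shuffle(shuffled)

ordered_scans = groups_to_scan(row_group_stats(ordered, 1_000), 4_242)
shuffled_scans = groups_to_scan(row_group_stats(shuffled, 1_000), 4_242)

print(ordered_scans)   # 1: only one chunk can contain the value
print(shuffled_scans)  # many more chunks overlap the value when unordered
```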
Let’s now explore the differences between saving to a non-partitioned table and a partitioned table and why saving to a partitioned table requires some extra adjustments.
When it comes to non-partitioned tables, managing the number of files during the save operation is a straightforward process. Using the coalesce method before saving will accomplish the task, regardless of whether the data is sorted or not.
# Example of using coalesce method before saving a non-partitioned table
df.coalesce(10).write.format("parquet").save("/path/to/output")
However, this method isn’t effective when handling partitioned tables, unless the data is sorted prior to coalescing. To understand why this happens, we need to look at what happens inside the Spark executors when the data is ordered versus when it isn’t [fig.2].
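As a rough preview, the effect can be simulated in plain Python. The model rests on one assumption: when writing a partitioned table, each task writes one file for every distinct partition-column value it holds. The helpers split_into and files_written are hypothetical, and the month column is randomly generated for illustration.

```python
import random


def split_into(rows, n):
    """Split rows into n contiguous, roughly equal partitions."""
    size = -(-len(rows) // n)  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]


def files_written(partitions):
    """Assumed model: one file per distinct partition value held by each task."""
    return sum(len(set(part)) for part in partitions)


rng = random.Random(0)
# One entry per row: the value of the table's partition column (12 months).
months = [rng.randrange(1, 13) for _ in range(12_000)]

unsorted_files = files_written(split_into(months, 10))
sorted_files = files_written(split_into(sorted(months), 10))

# Unsorted: nearly every task holds every month, so files multiply.
# Sorted: each task holds only a few neighbouring months.
print(unsorted_files, sorted_files)
```

With sorted data, 10 tasks and 12 months can produce at most 10 + 12 - 1 = 21 files in this model, because a new file starts only at a task boundary or when the month changes.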