Parquet Content-Defined Chunking

By Krisztian Szucs
Reduce Parquet file upload and download times on the Hugging Face Hub by leveraging the new Xet storage layer and Apache Arrow’s Parquet Content-Defined Chunking (CDC) feature, enabling more efficient and scalable data workflows.

TL;DR: Parquet Content-Defined Chunking (CDC) is now available in PyArrow and Pandas, enabling efficient deduplication of Parquet files on content-addressable storage systems like Hugging Face’s Xet storage layer. CDC dramatically reduces data transfer and storage costs by uploading or downloading only the modified data chunks. Enable CDC by passing the use_content_defined_chunking argument:

import pandas as pd
import pyarrow.parquet as pq

df.to_parquet("hf://datasets/{user}/{repo}/path.parquet", use_content_defined_chunking=True)
pq.write_table(table, "hf://datasets/{user}/{repo}/path.parquet", use_content_defined_chunking=True)



Introduction

Apache Parquet is a columnar storage format that is widely used in the data engineering community.

As of today, Hugging Face hosts nearly 21 PB of datasets, with Parquet files alone accounting for over 4 PB of that storage. Optimizing Parquet storage is therefore a high priority.
Hugging Face has introduced a new storage layer called Xet that leverages content-defined chunking to efficiently deduplicate chunks of data, reducing storage costs and improving download/upload speeds.

While Xet is format agnostic, Parquet’s layout and column-chunk (data page) based compression can produce entirely different byte-level representations for data with minor changes, resulting in suboptimal deduplication performance. To address this, Parquet files must be written in a way that minimizes the byte-level differences between similar data, which is where content-defined chunking (CDC) comes into play.

Let’s explore the performance benefits of the new Parquet CDC feature used alongside Hugging Face’s Xet storage layer.



Data Preparation

For demonstration purposes, we will use a manageably sized subset of the OpenOrca dataset.

import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
from huggingface_hub import hf_hub_download


def shuffle_table(table, seed=40):
    rng = np.random.default_rng(seed)
    indices = rng.permutation(len(table))
    return table.take(indices)



path = hf_hub_download(
    repo_id="Open-Orca/OpenOrca", 
    filename="3_5M-GPT3_5-Augmented.parquet", 
    repo_type="dataset"
)


orca = pq.read_table(path, schema=pa.schema([
    pa.field("id", pa.string()),
    pa.field("system_prompt", pa.string()),
    pa.field("question", pa.large_string()),
    pa.field("response", pa.large_string()),
]))


# Insert the length columns right before the columns they describe
orca = orca.add_column(
    orca.schema.get_field_index("question"),
    "question_length",
    pc.utf8_length(orca["question"])
)
orca = orca.add_column(
    orca.schema.get_field_index("response"),
    "response_length",
    pc.utf8_length(orca["response"])
)


orca = shuffle_table(orca)


table = orca[:100_000]


table[:3].to_pandas()
id system_prompt question_length question response_length response
0 cot.64099 You are an AI assistant that helps people find… 241 Consider the question. What is the euphrates l… 1663 The question is asking what the Euphrates Rive…
1 flan.1206442 You are an AI assistant. You will be given a t… 230 Single/multi-select question: Is it possible t… 751 It is not possible to conclude that the cowboy…
2 t0.1170225 You are an AI assistant. User will you give yo… 1484 Q:I’m taking a test and have to guess the righ… 128 The passage mainly tells us what things are im…



Upload the table as a Parquet file to Hugging Face Hub

Since pyarrow>=21.0.0 we can use Hugging Face URIs in pyarrow functions to read and write Parquet (and other file format) files directly on the Hub using the hf:// URI scheme.

>>> pq.write_table(table, "hf://datasets/kszucs/pq/orca.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 96.1MB / 96.1MB, 48.0kB/s
Total Bytes:  96.1M
Total Transfer:  96.1M

We can see that the table has been uploaded in its entirety (total bytes == total transfer) as new data, since it is not yet known to the Xet storage layer. Now read it back as a pyarrow table:

downloaded_table = pq.read_table("hf://datasets/kszucs/pq/orca.parquet")
assert downloaded_table.equals(table)

Note that all pyarrow functions that accept a file path also accept a Hugging Face URI, including pyarrow datasets, CSV functions, the incremental Parquet writer, or reading only the Parquet metadata:

pq.read_metadata("hf://datasets/kszucs/pq/orca.parquet")

  created_by: parquet-cpp-arrow version 21.0.0-SNAPSHOT
  num_columns: 6
  num_rows: 100000
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 4143
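For example, the same hf:// scheme works with those other APIs as well. The following is a minimal sketch: the file names are hypothetical and the repository is the example one used throughout this post, so substitute your own.

import pyarrow as pa
import pyarrow.csv as pacsv
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# A tiny table so the snippet is self-contained
table = pa.table({"id": [1, 2, 3], "value": ["a", "b", "c"]})

# Incremental Parquet writer: append row groups one at a time
with pq.ParquetWriter("hf://datasets/kszucs/pq/incremental.parquet", table.schema) as writer:
    writer.write_table(table)

# CSV functions accept the same URIs
pacsv.write_csv(table, "hf://datasets/kszucs/pq/small.csv")

# Scan the Parquet files of a repository as a pyarrow dataset
dataset = ds.dataset("hf://datasets/kszucs/pq/", format="parquet")
print(dataset.count_rows())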



Different Use Cases for Parquet Deduplication

To demonstrate the effectiveness of the content-defined chunking feature, we will look at how it performs when:

  1. Re-uploading exact copies of the table
  2. Adding/removing columns from the table
  3. Changing column types in the table
  4. Appending new rows and concatenating tables
  5. Inserting / deleting rows in the table
  6. Changing the row-group size of the table
  7. Using different file-level splits



1. Re-uploading Exact Copies of the Table

While this use case sounds trivial, traditional file systems do not deduplicate files, leading to a full re-upload and re-download of the data. In contrast, a system using content-defined chunking can recognize that the file content is identical and avoid unnecessary data transfer.

>>> pq.write_table(table, "hf://datasets/kszucs/pq/orca-copy.parquet")
New Data Upload: |                                                   |  0.00B /  0.00B,  0.00B/s
Total Bytes:  96.1M
Total Transfer:  0.00

We can see that no new data has been uploaded, and the operation was instantaneous. Now let’s look at what happens if we upload the same file again, but to a different repository:

>>> pq.write_table(table, "hf://datasets/kszucs/pq-copy/orca-copy-again.parquet")
New Data Upload: |                                                   |  0.00B /  0.00B,  0.00B/s
Total Bytes:  96.1M
Total Transfer:  0.00

The upload was instantaneous again, since deduplication works across repositories as well. This is a key feature of the Xet storage layer, allowing efficient data sharing and collaboration. You can read more about the details and scaling challenges in the From Chunks to Blocks: Accelerating Uploads and Downloads on the Hub blog post.



2. Adding and Removing Columns from the Table

First write out the original and modified tables to local Parquet files to see their sizes:

table_with_new_columns = table.add_column(
    table.schema.get_field_index("response"),
    "response_short",
    pc.utf8_slice_codeunits(table["response"], 0, 10)
)
table_with_removed_columns = table.drop(["response"])
    
pq.write_table(table, "/tmp/original.parquet")
pq.write_table(table_with_new_columns, "/tmp/with-new-columns.parquet")
pq.write_table(table_with_removed_columns, "/tmp/with-removed-columns.parquet")
!ls -lah /tmp/*.parquet
-rw-r--r--  1 kszucs  wheel    92M Jul 22 14:47 /tmp/original.parquet
-rw-r--r--  1 kszucs  wheel    92M Jul 22 14:47 /tmp/with-new-columns.parquet
-rw-r--r--  1 kszucs  wheel    67M Jul 22 14:47 /tmp/with-removed-columns.parquet

Now upload them to Hugging Face to see how much data is actually transferred:

>>> pq.write_table(table_with_new_columns, "hf://datasets/kszucs/pq/orca-added-columns.parquet")
New Data Upload: 100%|███████████████████████████████████████████████|  575kB /  575kB,  288kB/s
Total Bytes:  96.6M
Total Transfer:  575k

We can see that only the new columns and the new Parquet metadata placed in the file’s footer were uploaded, while the original data was not transferred again. This is a huge advantage of the Xet storage layer, as it allows us to efficiently add new columns without transferring the entire dataset again.

The same applies to removing columns, as we can see below:

>>> pq.write_table(table_with_removed_columns, "hf://datasets/kszucs/pq/orca-removed-columns.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 37.7kB / 37.7kB, 27.0kB/s
Total Bytes:  70.6M
Total Transfer:  37.7k

To better understand what has been uploaded, we can visualize the differences between the two Parquet files using the deduplication estimation tool:

from de import visualize

visualize(table, table_with_new_columns, title="With New Columns", prefix="orca")



With New Columns

Compression   Vanilla Parquet
None          157.4 MB / 313.8 MB = 50%
Snappy        96.7 MB / 192.7 MB = 50%

Adding two new columns means there are unseen data pages which must be transferred (highlighted in red in the heatmap), but the rest of the data remains unchanged (highlighted in green), so it is not transferred again. Note the small red area in the footer metadata, which almost always changes as we modify the Parquet file. The dedup stats show deduplicated size / total size, where smaller ratios mean better deduplication performance.

Let’s also visualize the difference after removing a column:

visualize(table, table_with_removed_columns, title="With Removed Columns", prefix="orca")



With Removed Columns

Compression   Vanilla Parquet
None          156.6 MB / 269.4 MB = 58%
Snappy        96.1 MB / 166.7 MB = 57%

Since we are removing entire columns, we only see changes in the footer metadata; all the other columns remain unchanged and already exist in the storage layer, so they are not transferred again.



3. Changing Column Types in the Table

Another common use case is changing column types in the table, e.g. to reduce the storage size or to optimize the data for specific queries. Let’s change the question_length column from int64 to int32 and see how much data is transferred:



# Drop the long text columns to focus on the column being cast
table_without_text = table_with_new_columns.drop(["question", "response"])

# Downcast question_length from int64 to int32
table_with_casted_column = table_without_text.set_column(
    table_without_text.schema.get_field_index("question_length"),
    "question_length",
    table_without_text["question_length"].cast("int32")
)
>>> pq.write_table(table_with_casted_column, "hf://datasets/kszucs/pq/orca-casted-column.parquet")
New Data Upload: 100%|███████████████████████████████████████████████|  181kB /  181kB,  113kB/s
Total Bytes:  1.80M
Total Transfer:  181k

Again, we can see that only the new column and the updated Parquet metadata were uploaded. Now visualize the deduplication heatmap:

visualize(table_without_text, table_with_casted_column, title="With Casted Column", prefix="orca")



With Casted Column

Compression   Vanilla Parquet
None          2.8 MB / 5.3 MB = 52%
Snappy        1.9 MB / 3.6 MB = 53%

The first red region indicates the new column that was added, while the second red region indicates the updated metadata in the footer. The rest of the data remains unchanged and is not transferred again.



4. Appending New Rows and Concatenating Tables

We are going to append new rows by concatenating another slice of the original dataset to the table.

table = orca[:100_000]
next_10k_rows = orca[100_000:110_000]
table_with_appended_rows = pa.concat_tables([table, next_10k_rows])

assert len(table_with_appended_rows) == 110_000

Now check that only the new rows are uploaded, since the original data is already known to the Xet storage layer:

>>> pq.write_table(table_with_appended_rows, "hf://datasets/kszucs/pq/orca-appended-rows.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 10.3MB / 10.3MB, 1.36MB/s
Total Bytes:  106M
Total Transfer:  10.3M
visualize(table, table_with_appended_rows, title="With Appended Rows", prefix="orca")



With Appended Rows

Compression   Vanilla Parquet
None          173.1 MB / 328.8 MB = 52%
Snappy        106.5 MB / 201.8 MB = 52%

Since each column gets new data, we can see multiple red regions. This is due to the Parquet file layout, where whole columns are laid out one after another (within each row group).
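To see this layout concretely, we can inspect the column-chunk metadata of the locally written /tmp/original.parquet from earlier. This is a small sketch; the printed offsets simply show that the column chunks follow one another within the row group.

import pyarrow.parquet as pq

# Inspect how the column chunks are laid out inside the first row group
meta = pq.ParquetFile("/tmp/original.parquet").metadata
row_group = meta.row_group(0)
for i in range(row_group.num_columns):
    col = row_group.column(i)
    print(
        col.path_in_schema,
        "offset:", col.data_page_offset,
        "compressed size:", col.total_compressed_size,
    )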



5. Inserting / Deleting Rows within the Table

Here comes the tricky part: insertions and deletions shift the existing rows, which leads to different column chunks, or data pages in Parquet nomenclature. Since each data page is compressed individually, even a single row insertion or deletion can result in a completely different byte-level representation from the edited row(s) to the end of the Parquet file.

This Parquet-specific problem cannot be solved by the Xet storage layer alone; the Parquet file itself must be written in a way that minimizes the data page differences even when rows are inserted or deleted.
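To get a feel for how far a single edit reaches at the byte level, here is a small, hypothetical check (the helper below is not part of the post’s tooling): without CDC, the two files typically stop matching around the first affected data page and differ from there to the end of the file.

import pyarrow as pa
import pyarrow.parquet as pq


def common_prefix_length(path_a, path_b):
    """Count how many leading bytes two files share before they diverge."""
    with open(path_a, "rb") as fa, open(path_b, "rb") as fb:
        a, b = fa.read(), fb.read()
    n = 0
    for x, y in zip(a, b):
        if x != y:
            break
        n += 1
    return n


pq.write_table(orca[:100_000], "/tmp/base.parquet")
# Drop a single row (row 10_000) and write the result again
pq.write_table(
    pa.concat_tables([orca[:10_000], orca[10_001:100_000]]),
    "/tmp/one-row-deleted.parquet",
)
print(common_prefix_length("/tmp/base.parquet", "/tmp/one-row-deleted.parquet"))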

Let’s first use the existing mechanism and see how it performs.

table = orca[:100_000]


table_with_deleted_rows = pa.concat_tables([
    orca[:15_000], 
    orca[18_000:60_000],
    orca[61_000:100_000]
])


table_with_inserted_rows = pa.concat_tables([
    orca[:10_000],
    orca[100_000:101_000],
    orca[10_000:50_000],
    orca[101_000:103_000],
    orca[50_000:100_000],
])

assert len(table) == 100_000
assert len(table_with_deleted_rows) == 96_000
assert len(table_with_inserted_rows) == 103_000
>>> pq.write_table(table_with_inserted_rows, "hf://datasets/kszucs/pq/orca-inserted-rows.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 89.8MB / 89.8MB, 42.7kB/s
Total Bytes:  99.1M
Total Transfer:  89.8M
>>> pq.write_table(table_with_deleted_rows, "hf://datasets/kszucs/pq/orca-deleted-rows.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 78.2MB / 78.2MB, 46.5kB/s
Total Bytes:  92.2M
Total Transfer:  78.2M

Let’s also visualize both cases to see the differences:

visualize(table, table_with_deleted_rows, title="Deleted Rows", prefix="orca")
visualize(table, table_with_inserted_rows, title="Inserted Rows", prefix="orca")



Deleted Rows

Compression   Vanilla Parquet
None          185.3 MB / 306.8 MB = 60%
Snappy        174.4 MB / 188.3 MB = 92%



Inserted Rows

Compression   Vanilla Parquet
None          190.1 MB / 318.0 MB = 59%
Snappy        186.2 MB / 195.2 MB = 95%

We can see that the deduplication performance has dropped significantly (a higher ratio), and the deduplication heatmaps show that the compressed Parquet files are quite different from each other. This is due to the fact that the inserted and deleted rows have shifted the existing rows, resulting in different data pages from the edited row(s) to the end of the Parquet file.

We can solve this problem by writing Parquet files with a new PyArrow feature called content-defined chunking (CDC). This feature ensures that columns are consistently chunked into data pages based on their content, similarly to how the Xet storage layer deduplicates data, but applied to the logical values of the columns before any serialization or compression happens.

The feature can be enabled by passing use_content_defined_chunking=True to the Parquet writer functions:

import pyarrow.parquet as pq

pq.write_table(table, "hf://user/repo/filename.parquet", use_content_defined_chunking=True)

Pandas also supports the new feature:

df.to_parquet("hf://user/repo/filename.parquet", use_content_defined_chunking=True)

Let’s visualize the deduplication difference before and after using the Parquet CDC feature:

visualize(table, table_with_deleted_rows, title="With Deleted Rows", prefix="orca", with_cdc=True)
visualize(table, table_with_inserted_rows, title="With Inserted Rows", prefix="orca", with_cdc=True)



Deleted Rows

Compression   Vanilla Parquet             CDC Parquet
None          185.3 MB / 306.8 MB = 60%   162.9 MB / 307.2 MB = 53%
Snappy        174.4 MB / 188.3 MB = 92%   104.3 MB / 188.8 MB = 55%



Inserted Rows

Compression   Vanilla Parquet             CDC Parquet
None          190.1 MB / 318.0 MB = 59%   164.1 MB / 318.4 MB = 51%
Snappy        186.2 MB / 195.2 MB = 95%   102.8 MB / 195.7 MB = 52%

Looks much better! Since the proof of the pudding is in the eating, let’s actually upload the tables using the content-defined chunking feature and see how much data is transferred.

Note that we need to upload the original table first with content-defined chunking enabled:

>>> pq.write_table(table, "hf://datasets/kszucs/pq/orca-cdc.parquet", use_content_defined_chunking=True)
New Data Upload: 100%|███████████████████████████████████████████████| 94.5MB / 94.5MB, 46.5kB/s
Total Bytes:  96.4M
Total Transfer:  94.5M
>>> pq.write_table(
...     table_with_inserted_rows, 
...     "hf://datasets/kszucs/pq/orca-inserted-rows-cdc.parquet", 
...     use_content_defined_chunking=True
... )
New Data Upload: 100%|███████████████████████████████████████████████| 6.00MB / 6.00MB, 1.00MB/s
Total Bytes:  99.3M
Total Transfer:  6.00M
>>> pq.write_table(
...     table_with_deleted_rows, 
...     "hf://datasets/kszucs/pq/orca-deleted-rows-cdc.parquet", 
...     use_content_defined_chunking=True
... )
New Data Upload: 100%|███████████████████████████████████████████████| 7.57MB / 7.57MB, 1.35MB/s
Total Bytes:  92.4M
Total Transfer:  7.57M

The uploaded data is significantly smaller than before, showing much better deduplication performance, as highlighted in the heatmaps above.

It is important to note that the same performance benefits apply to downloads using the huggingface_hub.hf_hub_download() and datasets.load_dataset() functions.
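As a minimal sketch of the download side, using the example repository and the CDC file written above:

from datasets import load_dataset
from huggingface_hub import hf_hub_download

# Download a single Parquet file that was written with CDC enabled
path = hf_hub_download(
    repo_id="kszucs/pq",
    filename="orca-cdc.parquet",
    repo_type="dataset",
)

# Or load it directly as a Hugging Face dataset
ds = load_dataset("kszucs/pq", data_files="orca-cdc.parquet", split="train")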



6. Using Different Row-group Sizes

Depending on reader/writer constraints, there are cases where larger or smaller row-group sizes can be useful. Parquet writer implementations use fixed-size row groups by default; in the case of pyarrow, the default is 1Mi rows. Dataset writers may reduce the row-group size to improve random access performance or to reduce the memory footprint of the reader application.

Changing the row-group size will shift rows between row groups, shifting values between data pages, so we have a similar problem as with inserting or deleting rows. Let’s compare the deduplication performance between different row-group sizes using the Parquet CDC feature:

from de import visualize


table = orca[2_000_000:3_000_000]

visualize(table, (table, {"row_group_size": 128 * 1024}), title="Small Row Groups", with_cdc=True, prefix="orca")
visualize(table, (table, {"row_group_size": 256 * 1024}), title="Medium Row Groups", with_cdc=True, prefix="orca")



Small Row Groups

Compression   Vanilla Parquet           CDC Parquet
None          1.6 GB / 3.1 GB = 52%     1.6 GB / 3.1 GB = 50%
Snappy        1.1 GB / 1.9 GB = 59%     995.0 MB / 1.9 GB = 51%



Medium Row Groups

Compression   Vanilla Parquet           CDC Parquet
None          1.6 GB / 3.1 GB = 51%     1.6 GB / 3.1 GB = 50%
Snappy        1.1 GB / 1.9 GB = 57%     976.5 MB / 1.9 GB = 50%
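To actually write a file that combines a custom row-group size with CDC, both options go to the same writer call. A small sketch, with a hypothetical output file name:

import pyarrow.parquet as pq

# Smaller row groups for random access, CDC for stable data page boundaries
pq.write_table(
    table,
    "hf://datasets/kszucs/pq/orca-small-row-groups-cdc.parquet",
    row_group_size=128 * 1024,
    use_content_defined_chunking=True,
)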



7. Using Various File-Level Splits

Datasets are often split into multiple files to improve parallelism and random access. Parquet CDC combined with the Xet storage layer can efficiently deduplicate data across multiple files even when the data is split at different boundaries.

Let’s write out the dataset with three different file-level splits, then compare the deduplication performance:

from pathlib import Path
from de import estimate


def write_dataset(table, base_dir, num_shards, **kwargs):
    """Simple utility to write a pyarrow table to multiple Parquet files."""
    base_dir = Path(base_dir)
    base_dir.mkdir(parents=True, exist_ok=True)

    # Ceiling division so every row ends up in exactly one of the num_shards files
    rows_per_file = -(-len(table) // num_shards)
    for i in range(num_shards):
        start = min(i * rows_per_file, len(table))
        end = min(start + rows_per_file, len(table))
        shard = table.slice(start, end - start)
        path = base_dir / f"part-{i}.parquet"
        pq.write_table(shard, path, **kwargs)


write_dataset(orca, "orca5-cdc", num_shards=5, use_content_defined_chunking=True)
write_dataset(orca, "orca10-cdc", num_shards=10, use_content_defined_chunking=True)
write_dataset(orca, "orca20-cdc", num_shards=20, use_content_defined_chunking=True)

estimate("orca5-cdc/*.parquet", "orca10-cdc/*.parquet", "orca20-cdc/*.parquet")
Total size: 9.3 GB
Chunk size: 3.2 GB

Although we wrote out the dataset with three different sharding configurations, the overall upload size is only slightly larger than the original dataset size.
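The sharded directories above were written locally; one way to push them to the Hub is huggingface_hub’s upload_folder, sketched below with the example repository from this post as the target:

from huggingface_hub import HfApi

api = HfApi()
for local_dir in ["orca5-cdc", "orca10-cdc", "orca20-cdc"]:
    # Each shard directory becomes a folder in the dataset repository;
    # Xet deduplicates the overlapping chunks across all three uploads.
    api.upload_folder(
        repo_id="kszucs/pq",
        repo_type="dataset",
        folder_path=local_dir,
        path_in_repo=local_dir,
    )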



Using Parquet CDC feature with Pandas

So far we have used PyArrow; let’s explore using the same CDC feature with Pandas by downloading, filtering, then uploading the dataset with content-defined chunking enabled:

import pandas as pd

src = "hf://datasets/teknium/OpenHermes-2.5/openhermes2_5.json"
df = pd.read_json(src)
>>> dst = "hf://datasets/kszucs/pq/hermes-2.5-cdc.parquet"
>>> df.to_parquet(dst, use_content_defined_chunking=True)
New Data Upload: 100%|███████████████████████████████████████████████|  799MB /  799MB,  197kB/s
Total Bytes:  799M
Total Transfer:  799M
>>> short_df = df[[len(c) < 10 for c in df.conversations]]
>>> short_dst = "hf://datasets/kszucs/pq/hermes-2.5-cdc-short.parquet"
>>> short_df.to_parquet(short_dst, use_content_defined_chunking=True)
New Data Upload: 100%|███████████████████████████████████████████████| 21.9MB / 21.9MB, 45.4kB/s
Total Bytes:  801M
Total Transfer:  21.9M
import pyarrow as pa
from de import visualize

visualize(
    pa.Table.from_pandas(df), 
    pa.Table.from_pandas(short_df),
    title="Hermes 2.5 Short Conversations",
    with_cdc=True,
    prefix="hermes"
)



Hermes 2.5 Short Conversations

Compression   Vanilla Parquet           CDC Parquet
None          1.9 GB / 3.2 GB = 58%     1.6 GB / 3.2 GB = 51%
Snappy        1.5 GB / 1.6 GB = 94%     821.1 MB / 1.6 GB = 51%

Since Parquet CDC is applied at the data page level (column chunk level), the deduplication performance depends on the filter’s selectivity, or rather on the distribution of the changes across the dataset. If most of the data pages are affected, the deduplication ratio will drop significantly.






Conclusion

We explored the performance benefits of the new Parquet content-defined chunking feature used alongside Hugging Face’s Xet storage layer. We demonstrated how it can efficiently deduplicate data in various scenarios, making Parquet operations faster and more storage-efficient. Compared to traditional cloud storage solutions, the Xet storage layer with Parquet CDC can significantly reduce data transfer times and costs.

Migrate your Hugging Face repositories from Git LFS to Xet to benefit from these improvements: https://huggingface.co/join/xet


