Reduce Parquet file upload and download times on the Hugging Face Hub by leveraging the new Xet storage layer and Apache Arrow’s Parquet Content-Defined Chunking (CDC) feature, enabling more efficient and scalable data workflows.
TL;DR: Parquet Content-Defined Chunking (CDC) is now available in PyArrow and Pandas, enabling efficient deduplication of Parquet files on content-addressable storage systems like Hugging Face’s Xet storage layer. CDC dramatically reduces data transfer and storage costs by uploading or downloading only the modified data chunks. Enable CDC by passing the use_content_defined_chunking argument:
import pandas as pd
import pyarrow.parquet as pq
df.to_parquet("hf://datasets/{user}/{repo}/path.parquet", use_content_defined_chunking=True)
pq.write_table(table, "hf://datasets/{user}/{repo}/path.parquet", use_content_defined_chunking=True)
Introduction
Apache Parquet is a columnar storage format that is widely used in the data engineering community.
As of today, Hugging Face hosts nearly 21 PB of datasets, with Parquet files alone accounting for over 4 PB of that storage. Optimizing Parquet storage is therefore a high priority.
Hugging Face has introduced a new storage layer called Xet that leverages content-defined chunking to efficiently deduplicate chunks of data, reducing storage costs and improving download/upload speeds.
While Xet is format agnostic, Parquet’s layout and column-chunk (data page) based compression can produce entirely different byte-level representations for data with minor changes, resulting in suboptimal deduplication performance. To handle this, the Parquet files must be written in a way that minimizes the byte-level differences between similar data, which is where content-defined chunking (CDC) comes into play.
Let’s explore the performance benefits of the new Parquet CDC feature used alongside Hugging Face’s Xet storage layer.
Data Preparation
For demonstration purposes, we will use a manageably sized subset of the OpenOrca dataset.
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
from huggingface_hub import hf_hub_download
def shuffle_table(table, seed=40):
    rng = np.random.default_rng(seed)
    indices = rng.permutation(len(table))
    return table.take(indices)
path = hf_hub_download(
    repo_id="Open-Orca/OpenOrca",
    filename="3_5M-GPT3_5-Augmented.parquet",
    repo_type="dataset"
)
orca = pq.read_table(path, schema=pa.schema([
    pa.field("id", pa.string()),
    pa.field("system_prompt", pa.string()),
    pa.field("question", pa.large_string()),
    pa.field("response", pa.large_string()),
]))
orca = orca.add_column(
    orca.schema.get_field_index("question"),
    "question_length",
    pc.utf8_length(orca["question"])
)
orca = orca.add_column(
    orca.schema.get_field_index("response"),
    "response_length",
    pc.utf8_length(orca["response"])
)
orca = shuffle_table(orca)
table = orca[:100_000]
table[:3].to_pandas()
| | id | system_prompt | question_length | question | response_length | response |
|---|---|---|---|---|---|---|
| 0 | cot.64099 | You are an AI assistant that helps people find… | 241 | Consider the question. What is the euphrates l… | 1663 | The question is asking what the Euphrates Rive… |
| 1 | flan.1206442 | You are an AI assistant. You will be given a t… | 230 | Single/multi-select question: Is it possible t… | 751 | It is not possible to conclude that the cowboy… |
| 2 | t0.1170225 | You are an AI assistant. User will you give yo… | 1484 | Q:I’m taking a test and have to guess the righ… | 128 | The passage mainly tells us what things are im… |
Upload the table as a Parquet file to Hugging Face Hub
Since pyarrow>=21.0.0 we can use Hugging Face URIs in the pyarrow functions to directly read and write Parquet files (and other file formats) on the Hub using the hf:// URI scheme.
>>> pq.write_table(table, "hf://datasets/kszucs/pq/orca.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 96.1MB / 96.1MB, 48.0kB/s
Total Bytes: 96.1M
Total Transfer: 96.1M
We can see that the table has been uploaded in full (total bytes == total transfer) as new data, since it is not yet known to the Xet storage layer. Now read it back as a pyarrow table:
downloaded_table = pq.read_table("hf://datasets/kszucs/pq/orca.parquet")
assert downloaded_table.equals(table)
Note that all pyarrow functions that accept a file path also accept a Hugging Face URI, such as pyarrow datasets, the CSV functions, the incremental Parquet writer, or reading only the Parquet metadata:
pq.read_metadata("hf://datasets/kszucs/pq/orca.parquet")
created_by: parquet-cpp-arrow version 21.0.0-SNAPSHOT
num_columns: 6
num_rows: 100000
num_row_groups: 1
format_version: 2.6
serialized_size: 4143
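For illustration, here is a minimal sketch of two of the other entry points mentioned above, scanning the file as a pyarrow dataset and writing a file incrementally with a ParquetWriter. It reuses the repository from the previous examples; the orca-incremental.parquet target path is hypothetical:

import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Scan the uploaded file as a pyarrow dataset straight from the Hub.
dataset = ds.dataset("hf://datasets/kszucs/pq/orca.parquet", format="parquet")
print(dataset.count_rows())

# Write a file incrementally, batch by batch, with a ParquetWriter bound to a Hugging Face URI.
with pq.ParquetWriter("hf://datasets/kszucs/pq/orca-incremental.parquet", table.schema) as writer:
    for batch in table.to_batches(max_chunksize=50_000):
        writer.write_batch(batch)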
Different Use Cases for Parquet Deduplication
To demonstrate the effectiveness of the content-defined chunking feature, we will take a look at how it performs in the following cases:
- Re-uploading exact copies of the table
- Adding/removing columns from the table
- Changing column types in the table
- Appending new rows and concatenating tables
- Inserting / deleting rows in the table
- Changing the row-group size of the table
- Using various file-level splits
1. Re-uploading Exact Copies of the Table
While this use case sounds trivial, traditional file systems don’t deduplicate files, leading to a full re-upload and re-download of the data. In contrast, a system using content-defined chunking can recognize that the file content is identical and avoid unnecessary data transfer.
>>> pq.write_table(table, "hf://datasets/kszucs/pq/orca-copy.parquet")
New Data Upload: | | 0.00B / 0.00B, 0.00B/s
Total Bytes: 96.1M
Total Transfer: 0.00
We can see that no new data has been uploaded and the operation was instantaneous. Now let’s take a look at what happens if we upload the same file again, but to a different repository:
>>> pq.write_table(table, "hf://datasets/kszucs/pq-copy/orca-copy-again.parquet")
New Data Upload: | | 0.00B / 0.00B, 0.00B/s
Total Bytes: 96.1M
Total Transfer: 0.00
The upload was instantaneous again, since deduplication works across repositories as well. This is a key feature of the Xet storage layer, allowing efficient data sharing and collaboration. You can read more about the details and scaling challenges in the From Chunks to Blocks: Accelerating Uploads and Downloads on the Hub blog post.
2. Adding and Removing Columns from the Table
First write out the original and modified tables to local parquet files to see their sizes:
table_with_new_columns = table.add_column(
    table.schema.get_field_index("response"),
    "response_short",
    pc.utf8_slice_codeunits(table["response"], 0, 10)
)
table_with_removed_columns = table.drop(["response"])
pq.write_table(table, "/tmp/original.parquet")
pq.write_table(table_with_new_columns, "/tmp/with-new-columns.parquet")
pq.write_table(table_with_removed_columns, "/tmp/with-removed-columns.parquet")
!ls -lah /tmp/*.parquet
-rw-r--r-- 1 kszucs wheel 92M Jul 22 14:47 /tmp/original.parquet
-rw-r--r-- 1 kszucs wheel 92M Jul 22 14:47 /tmp/with-new-columns.parquet
-rw-r--r-- 1 kszucs wheel 67M Jul 22 14:47 /tmp/with-removed-columns.parquet
Now upload them to Hugging Face to see how much data is actually transferred:
>>> pq.write_table(table_with_new_columns, "hf://datasets/kszucs/pq/orca-added-columns.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 575kB / 575kB, 288kB/s
Total Bytes: 96.6M
Total Transfer: 575k
We can see that only the new columns and the new parquet metadata placed in the file’s footer were uploaded, while the original data was not transferred again. This is a huge advantage of the Xet storage layer, as it allows us to efficiently add new columns without transferring the entire dataset again.
The same applies to removing columns, as we can see below:
>>> pq.write_table(table_with_removed_columns, "hf://datasets/kszucs/pq/orca-removed-columns.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 37.7kB / 37.7kB, 27.0kB/s
Total Bytes: 70.6M
Total Transfer: 37.7k
To get a better understanding of what has been uploaded, we can visualize the differences between the two parquet files using the deduplication estimation tool:
from de import visualize
visualize(table, table_with_new_columns, title="With New Columns", prefix="orca")
With New Columns
| Compression | Vanilla Parquet |
|---|---|
| None | ![]() |
| Dedup Stats | 157.4 MB / 313.8 MB = 50% |
| Snappy | ![]() |
| Dedup Stats | 96.7 MB / 192.7 MB = 50% |
Adding two new columns means that we have unseen data pages which must be transferred (highlighted in red), while the rest of the data remains unchanged (highlighted in green) and is not transferred again. Note the small red area in the footer metadata, which almost always changes as we modify the parquet file. The dedup stats show the deduplicated size versus the total size, where smaller ratios mean better deduplication performance.
Also visualize the difference after removing a column:
visualize(table, table_with_removed_columns, title="With Removed Columns", prefix="orca")
With Removed Columns
| Compression | Vanilla Parquet |
|---|---|
| None | ![]() |
| Dedup Stats | 156.6 MB / 269.4 MB = 58% |
| Snappy | ![]() |
| Dedup Stats | 96.1 MB / 166.7 MB = 57% |
Since we are removing entire columns, we can only see changes in the footer metadata; all the other columns remain unchanged and already exist in the storage layer, so they are not transferred again.
3. Changing Column Types in the Table
Another common use case is changing the column types in the table, e.g. to reduce the storage size or to optimize the data for specific queries. Let’s change the question_length column from the int64 data type to int32 and see how much data is transferred:
table_without_text = table_with_new_columns.drop(["question", "response"])
table_with_casted_column = table_without_text.set_column(
    table_without_text.schema.get_field_index("question_length"),
    "question_length",
    table_without_text["question_length"].cast("int32")
)
>>> pq.write_table(table_with_casted_column, "hf://datasets/kszucs/pq/orca-casted-column.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 181kB / 181kB, 113kB/s
Total Bytes: 1.80M
Total Transfer: 181k
Again, we can see that only the new column and the updated parquet metadata were uploaded. Now visualize the deduplication heatmap:
visualize(table_without_text, table_with_casted_column, title="With Casted Column", prefix="orca")
With Casted Column
| Compression | Vanilla Parquet |
|---|---|
| None | ![]() |
| Dedup Stats | 2.8 MB / 5.3 MB = 52% |
| Snappy | ![]() |
| Dedup Stats | 1.9 MB / 3.6 MB = 53% |
The first red region indicates the new column that was added, while the second red region indicates the updated metadata in the footer. The rest of the data remains unchanged and is not transferred again.
4. Appending New Rows and Concatenating Tables
We are going to append new rows by concatenating another slice of the original dataset to the table.
table = orca[:100_000]
next_10k_rows = orca[100_000:110_000]
table_with_appended_rows = pa.concat_tables([table, next_10k_rows])
assert len(table_with_appended_rows) == 110_000
Now check that only the new rows are being uploaded, since the original data is already known to the Xet storage layer:
>>> pq.write_table(table_with_appended_rows, "hf://datasets/kszucs/pq/orca-appended-rows.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 10.3MB / 10.3MB, 1.36MB/s
Total Bytes: 106M
Total Transfer: 10.3M
visualize(table, table_with_appended_rows, title="With Appended Rows", prefix="orca")
With Appended Rows
| Compression | Vanilla Parquet |
|---|---|
| None | ![]() |
| Dedup Stats | 173.1 MB / 328.8 MB = 52% |
| Snappy | ![]() |
| Dedup Stats | 106.5 MB / 201.8 MB = 52% |
Since each column receives new data, we can see multiple red regions. This is due to the parquet file specification, where whole columns are laid out one after another (within each row group).
5. Inserting / Deleting Rows in the Table
Here comes the tricky part: insertions and deletions shift the existing rows, which results in different column chunks, or data pages in parquet nomenclature. Since each data page is compressed individually, even a single row insertion or deletion can result in a completely different byte-level representation from the edited row(s) to the end of the parquet file.
This parquet-specific problem cannot be solved by the Xet storage layer alone; the parquet file itself must be written in a way that minimizes the data page differences even when rows are inserted or deleted.
Let’s first try the existing mechanism and see how it performs.
table = orca[:100_000]
table_with_deleted_rows = pa.concat_tables([
    orca[:15_000],
    orca[18_000:60_000],
    orca[61_000:100_000]
])
table_with_inserted_rows = pa.concat_tables([
    orca[:10_000],
    orca[100_000:101_000],
    orca[10_000:50_000],
    orca[101_000:103_000],
    orca[50_000:100_000],
])
assert len(table) == 100_000
assert len(table_with_deleted_rows) == 96_000
assert len(table_with_inserted_rows) == 103_000
>>> pq.write_table(table_with_inserted_rows, "hf://datasets/kszucs/pq/orca-inserted-rows.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 89.8MB / 89.8MB, 42.7kB/s
Total Bytes: 99.1M
Total Transfer: 89.8M
>>> pq.write_table(table_with_deleted_rows, "hf://datasets/kszucs/pq/orca-deleted-rows.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 78.2MB / 78.2MB, 46.5kB/s
Total Bytes: 92.2M
Total Transfer: 78.2M
Also visualize both cases to see the differences:
visualize(table, table_with_deleted_rows, title="Deleted Rows", prefix="orca")
visualize(table, table_with_inserted_rows, title="Inserted Rows", prefix="orca")
Deleted Rows
| Compression | Vanilla Parquet |
|---|---|
| None | ![]() |
| Dedup Stats | 185.3 MB / 306.8 MB = 60% |
| Snappy | ![]() |
| Dedup Stats | 174.4 MB / 188.3 MB = 92% |
Inserted Rows
| Compression | Vanilla Parquet |
|---|---|
| None | ![]() |
| Dedup Stats | 190.1 MB / 318.0 MB = 59% |
| Snappy | ![]() |
| Dedup Stats | 186.2 MB / 195.2 MB = 95% |
We can see that the deduplication performance has dropped significantly (higher ratio), and the deduplication heatmaps show that the compressed parquet files are quite different from each other. This is due to the fact that the inserted and deleted rows have shifted the existing rows, resulting in different data pages from the edited row(s) to the end of the parquet file.
We can solve this problem by writing parquet files with a new pyarrow feature called content-defined chunking (CDC). This feature ensures that the columns are consistently chunked into data pages based on their content, similarly to how the Xet storage layer deduplicates data, but applied to the logical values of the columns before any serialization or compression happens.
The feature can be enabled by passing use_content_defined_chunking=True to the Parquet writer functions:
import pyarrow.parquet as pq
pq.write_table(table, "hf://user/repo/filename.parquet", use_content_defined_chunking=True)
Pandas also supports the new feature:
df.to_parquet("hf://user/repo/filename.parquet", use_content_defined_chunking=True)
Let’s visualize the deduplication difference before and after using the Parquet CDC feature:
visualize(table, table_with_deleted_rows, title="With Deleted Rows", prefix="orca", with_cdc=True)
visualize(table, table_with_inserted_rows, title="With Inserted Rows", prefix="orca", with_cdc=True)
Deleted Rows
| Compression | Vanilla Parquet | CDC Parquet |
|---|---|---|
| None | ![]() | ![]() |
| Dedup Stats | 185.3 MB / 306.8 MB = 60% | 162.9 MB / 307.2 MB = 53% |
| Snappy | ![]() | ![]() |
| Dedup Stats | 174.4 MB / 188.3 MB = 92% | 104.3 MB / 188.8 MB = 55% |
Inserted Rows
| Compression | Vanilla Parquet | CDC Parquet |
|---|---|---|
| None | ![]() | ![]() |
| Dedup Stats | 190.1 MB / 318.0 MB = 59% | 164.1 MB / 318.4 MB = 51% |
| Snappy | ![]() | ![]() |
| Dedup Stats | 186.2 MB / 195.2 MB = 95% | 102.8 MB / 195.7 MB = 52% |
Looks much better! Since the proof of the pudding is in the eating, let’s actually upload the tables using the content-defined chunking parquet feature and see how much data is transferred.
Note that we need to upload the original table first with content-defined chunking enabled:
>>> pq.write_table(table, "hf://datasets/kszucs/pq/orca-cdc.parquet", use_content_defined_chunking=True)
New Data Upload: 100%|███████████████████████████████████████████████| 94.5MB / 94.5MB, 46.5kB/s
Total Bytes: 96.4M
Total Transfer: 94.5M
>>> pq.write_table(
... table_with_inserted_rows,
... "hf://datasets/kszucs/pq/orca-inserted-rows-cdc.parquet",
... use_content_defined_chunking=True
... )
New Data Upload: 100%|███████████████████████████████████████████████| 6.00MB / 6.00MB, 1.00MB/s
Total Bytes: 99.3M
Total Transfer: 6.00M
>>> pq.write_table(
... table_with_deleted_rows,
... "hf://datasets/kszucs/pq/orca-deleted-rows-cdc.parquet",
... use_content_defined_chunking=True
... )
New Data Upload: 100%|███████████████████████████████████████████████| 7.57MB / 7.57MB, 1.35MB/s
Total Bytes: 92.4M
Total Transfer: 7.57M
The uploaded data is significantly smaller than before, showing much better deduplication performance, as highlighted in the heatmaps above.
It is important to note that the same performance benefits apply to downloads using the huggingface_hub.hf_hub_download() and datasets.load_dataset() functions, as sketched below.
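As a quick sketch of the download side, reusing the repository and file name from the examples above (the local variable names are arbitrary):

from datasets import load_dataset
from huggingface_hub import hf_hub_download

# Download the CDC-written parquet file; per the note above, the download
# benefits from the same chunk-level deduplication as the upload.
local_path = hf_hub_download(
    repo_id="kszucs/pq",
    filename="orca-cdc.parquet",
    repo_type="dataset",
)

# Or load it directly as a Hugging Face dataset.
dataset = load_dataset("parquet", data_files=local_path, split="train")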
6. Using Different Row-group Sizes
There are cases, depending on reader/writer constraints, where larger or smaller row-group sizes may be beneficial. Parquet writer implementations use fixed-size row groups by default; in the case of pyarrow, the default is 1Mi rows. Dataset authors may reduce the row-group size in order to improve random access performance or to reduce the memory footprint of the reader application, as sketched below.
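For instance, a smaller row-group size can be requested directly in the writer call; a minimal sketch with a hypothetical local path:

import pyarrow.parquet as pq

# Write 128Ki-row row groups instead of the default 1Mi rows, keeping
# content-defined chunking enabled for the data pages.
pq.write_table(
    table,
    "/tmp/orca-small-row-groups.parquet",
    row_group_size=128 * 1024,
    use_content_defined_chunking=True,
)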
Changing the row-group size will shift rows between row groups, shifting values between data pages, so we face a similar problem as with inserting or deleting rows. Let’s compare the deduplication performance between different row-group sizes using the parquet CDC feature:
from de import visualize
table = orca[2_000_000:3_000_000]
visualize(table, (table, {"row_group_size": 128 * 1024}), title="Small Row Groups", with_cdc=True, prefix="orca")
visualize(table, (table, {"row_group_size": 256 * 1024}), title="Medium Row Groups", with_cdc=True, prefix="orca")
Small Row Groups
| Compression | Vanilla Parquet | CDC Parquet |
|---|---|---|
| None | ![]() | ![]() |
| Dedup Stats | 1.6 GB / 3.1 GB = 52% | 1.6 GB / 3.1 GB = 50% |
| Snappy | ![]() | ![]() |
| Dedup Stats | 1.1 GB / 1.9 GB = 59% | 995.0 MB / 1.9 GB = 51% |
Medium Row Groups
| Compression | Vanilla Parquet | CDC Parquet |
|---|---|---|
| None | ![]() | ![]() |
| Dedup Stats | 1.6 GB / 3.1 GB = 51% | 1.6 GB / 3.1 GB = 50% |
| Snappy | ![]() | ![]() |
| Dedup Stats | 1.1 GB / 1.9 GB = 57% | 976.5 MB / 1.9 GB = 50% |
7. Using Various File-Level Splits
Datasets are sometimes split into multiple files to enhance parallelism and random access. Parquet CDC combined with the Xet storage layer can efficiently deduplicate data across multiple files even when the information is split at different boundaries.
Let’s write out the dataset with three different file-level splits, then compare the deduplication performance:
from pathlib import Path
from de import estimate
def write_dataset(table, base_dir, num_shards, **kwargs):
    """Simple utility to write a pyarrow table to multiple Parquet files."""
    base_dir = Path(base_dir)
    base_dir.mkdir(parents=True, exist_ok=True)
    # ceiling division so that all rows are covered by the shards
    rows_per_file = -(-len(table) // num_shards)
    for i in range(num_shards):
        start = i * rows_per_file
        end = min((i + 1) * rows_per_file, len(table))
        shard = table.slice(start, end - start)
        path = base_dir / f"part-{i}.parquet"
        pq.write_table(shard, path, **kwargs)
write_dataset(orca, "orca5-cdc", num_shards=5, use_content_defined_chunking=True)
write_dataset(orca, "orca10-cdc", num_shards=10, use_content_defined_chunking=True)
write_dataset(orca, "orca20-cdc", num_shards=20, use_content_defined_chunking=True)
estimate("orca5-cdc/*.parquet", "orca10-cdc/*.parquet", "orca20-cdc/*.parquet")
Total size: 9.3 GB
Chunk size: 3.2 GB
Even though we wrote the dataset out with three different sharding configurations, the overall upload size would be only slightly larger than the original dataset size.
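For completeness, here is a sketch of how the three sharded directories could be pushed to the Hub, assuming a hypothetical kszucs/pq-shards dataset repository:

from huggingface_hub import HfApi

api = HfApi()
for folder in ["orca5-cdc", "orca10-cdc", "orca20-cdc"]:
    # Upload each sharded layout into its own directory in the repo; the Xet
    # storage layer deduplicates the chunks shared across the three layouts.
    api.upload_folder(
        folder_path=folder,
        path_in_repo=folder,
        repo_id="kszucs/pq-shards",
        repo_type="dataset",
    )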
Using Parquet CDC feature with Pandas
So far we have used PyArrow; let’s explore using the same CDC feature with Pandas by downloading, filtering, then uploading the dataset with the content-defined chunking feature enabled:
import pandas as pd
src = "hf://datasets/teknium/OpenHermes-2.5/openhermes2_5.json"
df = pd.read_json(src)
>>> dst = "hf://datasets/kszucs/pq/hermes-2.5-cdc.parquet"
>>> df.to_parquet(dst, use_content_defined_chunking=True)
New Data Upload: 100%|███████████████████████████████████████████████| 799MB / 799MB, 197kB/s
Total Bytes: 799M
Total Transfer: 799M
>>> short_df = df[[len(c) < 10 for c in df.conversations]]
>>> short_dst = "hf://datasets/kszucs/pq/hermes-2.5-cdc-short.parquet"
>>> short_df.to_parquet(short_dst, use_content_defined_chunking=True)
New Data Upload: 100%|███████████████████████████████████████████████| 21.9MB / 21.9MB, 45.4kB/s
Total Bytes: 801M
Total Transfer: 21.9M
import pyarrow as pa
from de import visualize
visualize(
    pa.Table.from_pandas(df),
    pa.Table.from_pandas(short_df),
    title="Hermes 2.5 Short Conversations",
    with_cdc=True,
    prefix="hermes"
)
Hermes 2.5 Short Conversations
| Compression | Vanilla Parquet | CDC Parquet |
|---|---|---|
| None | ![]() | ![]() |
| Dedup Stats | 1.9 GB / 3.2 GB = 58% | 1.6 GB / 3.2 GB = 51% |
| Snappy | ![]() | ![]() |
| Dedup Stats | 1.5 GB / 1.6 GB = 94% | 821.1 MB / 1.6 GB = 51% |
Since Parquet CDC is applied at the parquet data page level (column chunk level), the deduplication performance depends on the filter’s selectivity, or rather on the distribution of the changes across the dataset. If most of the data pages are affected, the deduplication ratio will drop significantly.
Conclusion
We explored the performance benefits of the new Parquet content-defined chunking feature used alongside Hugging Face’s Xet storage layer. We demonstrated how it can efficiently deduplicate data in various scenarios, making parquet operations faster and more storage-efficient. Compared to traditional cloud storage solutions, the Xet storage layer with Parquet CDC can significantly reduce data transfer times and costs.
Migrate your Hugging Face repositories from Git LFS to Xet to benefit from this: https://huggingface.co/join/xet