Scaling AI-based Data Processing with Hugging Face + Dask


The Hugging Face platform has many datasets and pre-trained models that make using and training state-of-the-art machine learning models increasingly accessible. However, it can be hard to scale AI tasks because AI datasets are often large (100s of GBs to TBs) and using Hugging Face transformers for model inference can be computationally expensive.

Dask, a Python library for distributed computing, can handle out-of-core computing (processing data that doesn’t fit in memory) by breaking datasets into manageable chunks. This makes it easy to do things like:

  • Efficient data loading and preprocessing of TB-scale datasets with an easy-to-use API that mimics pandas (a short sketch follows this list)
  • Parallel model inference (with the option of multi-node GPU inference)
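
As a rough illustration of that pandas-like API (the file path and column name below are hypothetical placeholders, not part of this workflow), a Dask DataFrame splits the data into partitions and evaluates operations lazily across them:

import dask.dataframe as dd

# Lazily point at many Parquet files; each file becomes one or more partitions
df = dd.read_parquet("s3://my-bucket/my-dataset/*.parquet")

# Familiar pandas-style operations, applied partition by partition
mean_length = df[df.language == "en"].text.str.len().mean()

# Nothing is loaded into memory until we ask for the result
print(mean_length.compute())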

In this post we show an example of data processing on the FineWeb dataset, using the FineWeb-Edu classifier to identify web pages with high educational value. We’ll show:

  • How to process 100 rows locally with pandas
  • How to scale to 211 million rows with Dask across multiple GPUs on the cloud



Processing 100 Rows with Pandas

The FineWeb dataset consists of 15 trillion tokens of English web data from Common Crawl, a non-profit that hosts a public web crawl dataset updated monthly. This dataset is commonly used for a variety of tasks such as large language model training, classification, content filtering, and information retrieval across many sectors.

It can take over a minute to download and read in a single file with pandas on a laptop.

import pandas as pd

df = pd.read_parquet(
    "hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/000_00000.parquet"
)

Next, we’ll use the HF FineWeb-Edu classifier to evaluate the educational value of the web pages in our dataset. Web pages are ranked on a scale from 0 to 5, with 0 being not educational and 5 being highly educational. We can use pandas to do this on a smaller, 100-row subset of the data, which takes ~10 seconds on an M1 Mac with a GPU.

from transformers import pipeline

def compute_scores(texts):
    import torch

    # Check which hardware is available: NVIDIA GPU, Apple silicon GPU, or CPU
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    # Load the FineWeb-Edu classifier on the detected device
    pipe = pipeline(
        "text-classification",
        model="HuggingFaceFW/fineweb-edu-classifier",
        device=device,
    )
    # function_to_apply="none" returns the raw regression score (0-5 scale)
    results = pipe(
        texts.to_list(),
        batch_size=25,
        padding="longest",
        truncation=True,
        function_to_apply="none",
    )

    return pd.Series([r["score"] for r in results])

df = df[:100]
min_edu_score = 3
df["edu-classifier-score"] = compute_scores(df.text)
df = df[df["edu-classifier-score"] >= min_edu_score]

Note that we also added a step to check the available hardware inside the compute_scores function, since it will be distributed when we scale with Dask in the next step. This makes it easy to go from testing locally on a single machine (either on a CPU, or perhaps you have a MacBook with an Apple silicon GPU) to distributing across multiple machines (like NVIDIA GPUs).



Scaling to 211 Million Rows with Dask

The entire 2024 February/March crawl is 432 GB on disk, or ~715 GB in memory, split across 250 Parquet files. Even on a machine with enough memory for the full dataset, processing it serially would be prohibitively slow.

To scale up, we can use Dask DataFrame, which helps you process large tabular data by parallelizing pandas. It closely resembles the pandas API, making it easy to go from testing on a single dataset to scaling out to the full dataset. Dask works well with Parquet, the default format on Hugging Face datasets, enabling rich data types, efficient columnar filtering, and compression.

import dask.dataframe as dd

df = dd.read_parquet(
    # Lazily read all 250 Parquet files from the crawl
    "hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/*.parquet"
)
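
As an aside, because the data is stored in Parquet, you can also push column selection and simple row filters down into the read itself. The column names and the threshold below are illustrative and assume you only need a subset of the data:

df_subset = dd.read_parquet(
    "hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/*.parquet",
    columns=["id", "url", "text", "token_count"],  # only read the columns you need
    filters=[("token_count", ">", 100)],           # skip short documents at read time
)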

We’ll apply the compute_scores function for text classification in parallel on our Dask DataFrame using map_partitions, which applies our function in parallel to each pandas DataFrame within the larger Dask DataFrame. The meta argument is specific to Dask, and indicates the data structure (column names and data types) of the output.

from transformers import pipeline

def compute_scores(texts):
    import torch

    
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    pipe = pipeline(
        "text-classification",
        model="HuggingFaceFW/fineweb-edu-classifier",
        device=device,
    )
    results = pipe(
        texts.to_list(),
        batch_size=768,
        padding="longest",
        truncation=True,
        function_to_apply="none",
    )

    return pd.Series([r["score"] for r in results])

min_edu_score = 3
df["edu-classifier-score"] = df.text.map_partitions(compute_scores, meta=pd.Series([0]))
df = df[df["edu-classifier-score"] >= min_edu_score]

Note that we’ve picked a batch_size that works well for this example, but you’ll likely want to customize it depending on the hardware, data, and model you’re using in your own workflows (see the HF docs on pipeline batching).
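
If you’re unsure which batch size to pick, one pragmatic approach is to time the classifier on a small sample at a few candidate sizes before launching the full run. This is a rough sketch; the sample size, candidate batch sizes, and device=0 (first GPU) are assumptions:

import time
from transformers import pipeline

# Pull a small pandas sample from the first file of the crawl
sample_texts = dd.read_parquet(
    "hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/000_00000.parquet"
).head(1000).text.to_list()

# Build the same classifier pipeline once, on the first GPU
pipe = pipeline(
    "text-classification",
    model="HuggingFaceFW/fineweb-edu-classifier",
    device=0,
)

for batch_size in [64, 256, 768]:
    start = time.time()
    pipe(
        sample_texts,
        batch_size=batch_size,
        padding="longest",
        truncation=True,
        function_to_apply="none",
    )
    print(f"batch_size={batch_size}: {time.time() - start:.1f} s")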

Now that we’ve identified the rows of the dataset we’re interested in, we can save the result for other downstream analyses. Dask DataFrames automatically support distributed writing to Parquet. Hugging Face uses commits to track dataset changes and allows writing a Dask DataFrame in parallel.

repo_id = "<username>/<dataset-name>"  # replace with your own Hugging Face dataset repo
df.to_parquet(f"hf://datasets/{repo_id}")

Since this creates one commit per file, it’s recommended to squash the history after the upload:

from huggingface_hub import HfApi

HfApi().super_squash_history(repo_id=repo_id, repo_type="dataset")

Alternatively, you can use a custom function that uploads multiple files per commit.
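
That custom function isn’t reproduced here, but a minimal sketch of the idea using huggingface_hub’s commit API might look like the following; the local output directory, file layout, and the choice of 50 files per commit are assumptions:

import os
from huggingface_hub import HfApi, CommitOperationAdd

api = HfApi()
local_dir = "output"   # assume the Dask DataFrame was first written to local Parquet files here
files = sorted(f for f in os.listdir(local_dir) if f.endswith(".parquet"))
files_per_commit = 50  # arbitrary; far fewer commits than one per file

for i in range(0, len(files), files_per_commit):
    # Bundle a batch of files into a single commit
    operations = [
        CommitOperationAdd(
            path_in_repo=f"data/{name}",
            path_or_fileobj=os.path.join(local_dir, name),
        )
        for name in files[i : i + files_per_commit]
    ]
    api.create_commit(
        repo_id=repo_id,
        repo_type="dataset",
        operations=operations,
        commit_message=f"Upload files {i} to {i + len(operations)}",
    )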



Multi-GPU Parallel Model Inference

There are many ways to deploy Dask on a variety of hardware. Here, we’ll use Coiled to deploy Dask on the cloud so we can spin up VMs as needed, and then clean them up when we’re done.

cluster = coiled.Cluster(
    region="us-east-1",                 # run close to where the data is hosted
    n_workers=100,                      # scale out to 100 VMs
    spot_policy="spot_with_fallback",   # use cheaper spot instances, fall back to on-demand
    worker_vm_types="g5.xlarge",        # NVIDIA A10G GPU instances on AWS
    worker_options={"nthreads": 1},     # one task per worker so tasks don't contend for the GPU
)
client = cluster.get_client()

Under the hood Coiled handles:

  • Provisioning cloud VMs with GPU hardware. In this case, g5.xlarge instances on AWS.
  • Setting up the appropriate NVIDIA drivers, CUDA runtime, etc.
  • Automatically installing the same packages you have locally on the cloud VMs with package sync. This includes Python files in your working directory.

The workflow took about 5 hours to complete, and we had good GPU hardware utilization.

Median GPU utilization was 100% and median memory usage was 21.5 GB, just under the 24 GB available on each GPU. With both metrics near their maximum, we were making good use of the available hardware.

Putting it all together, here is the complete workflow:

import dask.dataframe as dd
import pandas as pd
from transformers import pipeline
from huggingface_hub import HfApi
import os
import coiled

cluster = coiled.Cluster(
    region="us-east-1",
    n_workers=100,
    spot_policy="spot_with_fallback",
    worker_vm_types="g5.xlarge",
    worker_options={"nthreads": 1},
)
client = cluster.get_client()
cluster.send_private_envs(
    {"HF_TOKEN": ""}             # fill in your Hugging Face token so workers can write to your dataset
)

df = dd.read_parquet(
    "hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/*.parquet"
)

def compute_scores(texts):
    import torch

    
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    pipe = pipeline(
        "text-classification",
        model="HuggingFaceFW/fineweb-edu-classifier",
        device=device
    )
    results = pipe(
        texts.to_list(),
        batch_size=768,
        padding="longest",
        truncation=True,
        function_to_apply="none"
    )

    return pd.Series([r["score"] for r in results])

min_edu_score = 3
df["edu-classifier-score"] = df.text.map_partitions(compute_scores, meta=pd.Series([0]))
df = df[df["edu-classifier-score"] >= min_edu_score]

repo_id = "<username>/<dataset-name>"  # replace with your own Hugging Face dataset repo
df.to_parquet(f"hf://datasets/{repo_id}")

HfApi().super_squash_history(repo_id=repo_id, repo_type="dataset")  



Conclusion

Hugging Face + Dask is a powerful combination. In this example, we scaled our classification task from 100 rows to 211 million rows by using Dask + Coiled to run the workflow in parallel across multiple GPUs on the cloud.

This same type of workflow can be used for other use cases like:

  • Filtering genomic data to select genes of interest
  • Extracting information from unstructured text and turning it into structured datasets
  • Cleaning text data scraped from the internet or Common Crawl
  • Running multimodal model inference to analyze large audio, image, or video datasets


