Introduction
a continuous variable for 4 different products. The machine learning pipeline was built in Databricks and has two major components:
- Feature preparation in SQL with serverless compute.
- Inference on an ensemble of several hundred models using job clusters to have control over compute power.
The goal is to tune the data flow to maximize cluster usage and ensure scalability. Inference runs on 4 sets of ML models, one set per product. However, we will focus on how the data is stored, since that determines how much parallelism we can leverage for inference. We will not cover the inner workings of the inference itself.
If there are too few file partitions, the cluster spends a long time scanning large files, and unless you repartition (which adds network latency and data shuffling), each partition may also hold a very large set of rows for inference, again leading to long run times.
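To make that tradeoff concrete, here is a back-of-the-envelope sketch in plain Python. The numbers are illustrative only; the point is that the file partition count caps how few rows a single task can be given:

```python
# Illustrative only: average rows a single Spark task must process when
# parallelism is capped by the number of file partitions.
def rows_per_task(total_rows: int, file_partitions: int) -> int:
    return total_rows // file_partitions

# ~354M rows spread over only 18 file partitions: ~19.7M rows per task,
# no matter how many cores the cluster has.
coarse = rows_per_task(354_000_000, 18)

# The same data in ~860 partitions keeps each task near ~400K rows.
fine = rows_per_task(354_000_000, 860)
```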
However, the business has limited patience before shipping ML pipelines with a direct impact on the org, so tests are limited.
In this article, we will review our feature data landscape, provide an overview of the ML inference, and present the results and discussion of inference performance under 4 dataset treatment scenarios:
- Partitioned table, no salt, no row limit in partitions
- Partitioned table, salted, with 1M row limit
- Liquid-clustered table, no salt, no row limit in partitions
- Liquid-clustered table, salted, with 1M row limit
Data Landscape
The dataset contains the features that the set of ML models uses for inference. It has ~550M rows and covers 4 products identified by the attribute ProductLine:
- Product A: ~10.45M (1.9%)
- Product B: ~4.4M (0.8%)
- Product C: ~100M (17.6%)
- Product D: ~354M (79.7%)
It also has another low-cardinality attribute, AttrB, which contains only two distinct values and is used as a filter to extract subsets of the dataset for each part of the ML system.
Furthermore, RunDate logs the date when the features were generated; records are append-only. Finally, the dataset is read using the following query:
SELECT
    Id,
    ProductLine,
    AttrB,
    AttrC,
    RunDate,
    {model_features}
FROM
    catalog.schema.FeatureStore
WHERE
    ProductLine = :product AND
    AttrB = :attributeB AND
    RunDate = :RunDate
Salt Implementation
The salting here is generated dynamically. Its purpose is to distribute the data according to product volumes: large products receive more buckets and smaller products receive fewer. For instance, Product D should receive around 80% of the buckets, given the proportions in the data landscape.
We do this so we can have predictable inference run times and maximize cluster utilization.
# Calculate the percentage of each (ProductLine, AttrB) pair based on row counts
brand_cat_counts = df_demand_price_grid_load.groupBy(
    "ProductLine", "AttrB"
).count()
total_count = df_demand_price_grid_load.count()
brand_cat_percents = brand_cat_counts.withColumn(
    "percent", F.col("count") / F.lit(total_count)
)
# Collect percentages as a dict with string keys (this will later determine
# the number of salt buckets each product receives)
brand_cat_percent_dict = {
    f"{row['ProductLine']}|{row['AttrB']}": row['percent']
    for row in brand_cat_percents.collect()
}
# Collect counts as a dict with string keys (this will help
# add an extra bucket when the count is not divisible by the number of
# buckets for the product)
brand_cat_count_dict = {
    f"{row['ProductLine']}|{row['AttrB']}": row['count']
    for row in brand_cat_percents.collect()
}
# Helper to flatten key-value pairs for create_map
def dict_to_map_expr(d):
    expr = []
    for k, v in d.items():
        expr.append(F.lit(k))
        expr.append(F.lit(v))
    return expr
percent_case = F.create_map(*dict_to_map_expr(brand_cat_percent_dict))
count_case = F.create_map(*dict_to_map_expr(brand_cat_count_dict))
# Add the string key column in PySpark
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
    "product_cat_key",
    F.concat_ws("|", F.col("ProductLine"), F.col("AttrB"))
)
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
    "percent", percent_case.getItem(F.col("product_cat_key"))
).withColumn(
    "product_count", count_case.getItem(F.col("product_cat_key"))
)
# Set min/max buckets
min_buckets = 10
max_buckets = 1160
# Calculate buckets per row based on the (ProductLine, AttrB) percentage
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
    "buckets_base",
    (F.lit(min_buckets) + (F.col("percent") * (max_buckets - min_buckets))).cast("int")
)
# Add an extra bucket if product_count is not divisible by buckets_base
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
    "buckets",
    F.when(
        (F.col("product_count") % F.col("buckets_base")) != 0,
        F.col("buckets_base") + 1
    ).otherwise(F.col("buckets_base"))
)
# Generate a salt per row based on the (ProductLine, AttrB) bucket count
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
    "salt",
    (F.rand(seed=42) * F.col("buckets")).cast("int")
)
# Repartition using the core attributes and the salt column
df_demand_price_grid_load = df_demand_price_grid_load.repartition(
    1200, "AttrB", "ProductLine", "salt"
).drop("product_cat_key", "percent", "product_count", "buckets_base", "buckets", "salt")
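As a sanity check, the bucket sizing logic can be reproduced in plain Python, using the approximate percentages from the data landscape section:

```python
# Reproduce the bucket sizing logic in plain Python as a sanity check.
min_buckets, max_buckets = 10, 1160

def buckets_for(percent: float, row_count: int) -> int:
    # Base allocation scales linearly with the product's share of rows.
    base = int(min_buckets + percent * (max_buckets - min_buckets))
    # Add one bucket when the rows do not divide evenly, mirroring the
    # F.when(...) branch in the PySpark code above.
    return base + 1 if row_count % base != 0 else base

# Product D holds ~79.7% of the rows, so it takes most of the buckets.
d_buckets = buckets_for(0.797, 354_000_000)

# Product B holds ~0.8%, so it stays close to the minimum.
b_buckets = buckets_for(0.008, 4_400_000)
```

This confirms the intuition: Product D ends up with over 900 buckets while Product B gets about 20.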
Finally, we save our dataset to the feature table with a maximum number of rows per file. This prevents Spark from generating files with too many rows, which it can do even after we have computed the salt.
Why do we enforce 1M rows? The primary focus is model inference time, not file size per se. After a few tests with 1M, 1.5M, and 2M, the first yielded the best performance in our case. Again, this project was very budget- and time-constrained, so we have to make the most of our resources.
(
    df_demand_price_grid_load.write
    .mode("overwrite")
    .option("replaceWhere", f"RunDate = '{params['RunDate']}'")
    .option("maxRecordsPerFile", 1_000_000)
    .partitionBy("RunDate", "AttrB", "ProductLine")
    .saveAsTable(f"{params['catalog_revauto']}.{params['schema_revenueautomation']}.demand_features_price_grid")
)
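As a quick sanity check on the 1M cap, the minimum number of files written per product follows directly from the row counts (plain Python; actual file counts also depend on the shuffle partitioning):

```python
import math

# Lower bound on files per product when each file holds at most
# `max_records` rows; real counts also depend on the partition layout.
def min_files(rows: int, max_records: int = 1_000_000) -> int:
    return math.ceil(rows / max_records)

product_d_files = min_files(354_000_000)  # at least 354 files
product_b_files = min_files(4_400_000)    # at least 5 files
```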
Why not just rely on Spark's Adaptive Query Execution (AQE)?
Recall that the primary focus is on inference times, not on metrics typically tuned for standard Spark SQL queries, such as file size. Using only AQE was in fact our initial attempt. As you will see in the results, the run times were very poor and did not maximize cluster utilization given our data proportions.
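For context, these are the AQE settings in play during that initial attempt. The values shown are Spark's documented defaults, not something we tuned, so treat this as a reference sketch rather than a recommendation:

```python
# Spark AQE knobs relevant to partition sizing (defaults shown; we did
# not override these in the initial, AQE-only attempt).
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```

AQE coalesces and splits shuffle partitions toward a byte-size target, which is exactly why it does not guarantee a row count per task that suits model inference.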
Machine Learning Inference
There is a pipeline with 4 tasks, one per product. Each task performs the following general steps:
- Loads the features for the corresponding product
- Loads the subset of ML models for the corresponding product
- Performs inference on half of the subset, sliced by AttrB
- Performs inference on the other half, sliced by AttrB
- Saves data to the results table
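The steps above can be sketched as a plain-Python outline (the callables are hypothetical placeholders for the real model loading, inference, and save logic, not the actual pipeline code):

```python
# Hypothetical outline of one per-product pipeline task. `features` is a
# list of row dicts carrying an 'AttrB' key; the callables stand in for
# the real inference and save implementations.
def run_product_task(features, models, run_inference, save_results):
    results = []
    # One inference pass per AttrB slice (the attribute has two values).
    for attr_b in sorted({row["AttrB"] for row in features}):
        subset = [row for row in features if row["AttrB"] == attr_b]
        results.extend(run_inference(models, subset))
    save_results(results)
    return results
```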
We will focus on one of the inference stages so as not to overwhelm this article with numbers; the other stage is very similar in structure and results. You can also see the DAG for the inference in Fig. 2.

It seems very straightforward, but the run times can vary depending on how your data is stored and the size of your cluster.
Cluster configuration
For the inference stage we are analyzing, there is one cluster per product, tuned for the infrastructure limitations of the project and for the distribution of data:
- Product A: 35 workers (Standard_DS14v2, 420 cores)
- Product B: 5 workers (Standard_DS14v2, 70 cores)
- Product C: 1 worker (Standard_DS14v2, 14 cores)
- Product D: 1 worker (Standard_DS14v2, 14 cores)
In addition, Adaptive Query Execution is enabled by default, which lets Spark decide how best to handle the data given the context you provide.
Results and discussion
For each scenario, you will see an overview of the number of file partitions per product and the average number of rows per partition, to give an indication of how many rows the ML system infers per Spark task. Additionally, we present Spark UI metrics to monitor run-time performance and inspect the distribution of data at inference time. We show the Spark UI portion only for Product D, the largest, to avoid an excess of data. In addition, depending on the scenario, inference on Product D becomes the run-time bottleneck, another reason it was the primary focus of the results.
Non-Salted and Partitioned
You can see in Fig. 3 that the average file partition has tens of millions of rows, which implies considerable run time for a single executor. The largest on average is Product C, with more than 45M rows in a single partition. The smallest is Product B, with roughly 12M rows on average.

Fig. 4 depicts the number of partitions per product, 26 in total. Looking at Product D, 18 partitions fall far short of the 420 cores we have available, and on average each partition will undergo inference on ~40M rows.

Take a look at Fig. 5. In total, the cluster spent 9.9 hours and still wasn't done; we had to kill the job because it was becoming expensive and blocking other people's tests.

From the summary statistics in Fig. 6 for the tasks that did finish, we can see heavy skew in the partitions for Product D. The maximum input size was ~56M rows and the runtime was 7.8h.

Non-Salted and Liquid-Clustered
In this scenario, we observe very similar results in terms of the average number of rows per file partition and the number of partitions per product, as seen in Fig. 7 and Fig. 8, respectively.

Product D has 19 file partitions, still far short of the 420 available cores.

We could already anticipate that this experiment would be very expensive, so I decided to skip the inference test for this scenario. In an ideal situation we would carry it out, but there is a backlog of tickets on my board.
Salted and Partitioned
After applying the salting and repartitioning process, we end up with ~2.5M average records per partition for Products A and B, and ~1M for Products C and D, as depicted in Fig. 9.

Furthermore, we can see in Fig. 10 that the number of file partitions increased to roughly 860 for Product D, which gives 430 for each inference stage.

This results in a run time of 3h for inference on Product D with 360 tasks, as seen in Fig. 11.

Checking the summary statistics in Fig. 12, the distribution looks balanced, with run times around 1.7h but a maximum task taking 3h, which is worth investigating further in the future.

One great benefit is that the salt distributes the data according to the proportions of the products. If more resources were available, we could increase the number of shuffle partitions in repartition() and add workers in proportion to the data. This ensures that our process scales predictably.
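That scaling idea can be sketched with a small sizing helper, using the approximate row counts from the data landscape section. This is hypothetical, not something we deployed:

```python
# Hypothetical sizing helper: split a worker budget across products in
# proportion to their row counts, guaranteeing at least one worker each.
# Rounding can slightly over- or under-shoot the budget.
def allocate_workers(row_counts: dict, total_workers: int) -> dict:
    total_rows = sum(row_counts.values())
    return {
        product: max(1, round(total_workers * rows / total_rows))
        for product, rows in row_counts.items()
    }

counts = {"A": 10_450_000, "B": 4_400_000, "C": 100_000_000, "D": 354_000_000}
plan = allocate_workers(counts, 42)  # Product D takes the lion's share
```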
Salted and Liquid-Clustered
This scenario combines the two strongest levers we have explored so far: salting to control file size and parallelism, and liquid clustering to keep related data colocated without rigid partition boundaries.
After applying the same salting strategy and a 1M row limit per file, the liquid-clustered table shows a very similar average partition size to the salted and partitioned case, as shown in Fig. 13. Products C and D remain near the 1M-row target, while Products A and B settle slightly above that threshold.

However, the main difference appears in how these partitions are distributed and consumed by Spark. As shown in Fig. 14, Product D again reaches a high number of file partitions, providing enough parallelism to saturate the available cores during inference.

Unlike the partitioned counterpart, liquid clustering allows Spark to adapt the file layout over time while still benefiting from the salt. This results in a more even distribution of work across executors, with fewer extreme outliers in both input size and task duration.
From the summary statistics in Fig. 15, we observe that the vast majority of tasks complete within a tight runtime window, and the maximum task duration is lower than in the salted and partitioned scenario. This indicates reduced skew and better load balancing across the cluster.


An important side effect is that liquid clustering preserves data locality for the filtered columns without enforcing strict partition boundaries. This allows Spark to still benefit from data skipping, while the salt ensures that no single executor is overwhelmed with tens of millions of rows.
Overall, salted and liquid-clustered emerges as the most robust setup: it maximizes parallelism, minimizes skew, and reduces operational risk as inference workloads grow or cluster configurations change.
Key Takeaways
- Inference scalability is often limited by data layout, not model complexity. Poorly sized file partitions can leave hundreds of cores idle while a few executors process tens of millions of rows.
- Partitioning alone is not enough for large-scale inference. Without controlling file size, partitioned tables can still produce massive partitions that result in long-running, skewed tasks.
- Salting is an effective tool to unlock parallelism. Introducing a salt key and enforcing a row limit per file dramatically increases the number of runnable tasks and stabilizes runtimes.
- Liquid clustering complements salting by reducing skew without rigid boundaries. It allows Spark to adapt the file layout over time, making the system more resilient as data grows.
