Scaling Statistics: Incremental Standard Deviation in SQL with dbt


Why scan yesterday’s data when you can increment today’s?

Image by the author

SQL aggregation functions can be computationally expensive when applied to large datasets. As datasets grow, recalculating metrics over the entire dataset again and again becomes inefficient. To address this challenge, incremental aggregation is often employed, a technique that maintains a previous state and updates it with new incoming data. While this approach is straightforward for aggregations like COUNT or SUM, the question arises: how can it be applied to more complex metrics like standard deviation?

Standard deviation is a statistical metric that measures the extent of variation or dispersion in a variable’s values relative to its mean.
It’s derived by taking the square root of the variance.
The formula for calculating the variance of a sample is as follows:

Sample variance formula
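Written out, for a sample of n values x_1, …, x_n with mean x̄, the sample variance is:

$$
S^{2} = \frac{1}{n-1}\sum_{i=1}^{n}\left(x_{i}-\bar{x}\right)^{2}
$$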

Calculating standard deviation can be complex, because it involves updating both the mean and the sum of squared differences across all data points. However, with some algebraic manipulation we can derive a formula for incremental computation, one that updates an existing dataset's statistics and incorporates new data seamlessly. This approach avoids recalculating from scratch whenever new data is added, making the process much more efficient (a detailed derivation is available on my GitHub).

Derived sample variance formula
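In symbols, letting n, µ_n, and S_n² be the count, mean, and sample variance of the existing set and k, µ_k, and S_k² those of the new set (the same n/k convention used in the SQL implementation below), the combined sample variance is:

$$
S_{n+k}^{2} = \frac{(n-1)\,S_{n}^{2}}{n+k-1} + \frac{(k-1)\,S_{k}^{2}}{n+k-1} + \frac{n\,k\,\left(\mu_{n}-\mu_{k}\right)^{2}}{(n+k)(n+k-1)}
$$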

The formula is essentially broken into three parts:
1. The existing set’s weighted variance
2. The new set’s weighted variance
3. The mean difference variance, accounting for between-group variance.

This method enables incremental variance computation by retaining the COUNT (n), AVG (µn), and VAR (Sn) of the existing set, and combining them with the COUNT (k), AVG (µk), and VAR (Sk) of the new set. As a result, the updated standard deviation can be calculated efficiently without rescanning the entire dataset.
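As a quick sanity check, here is a minimal sketch in Snowflake syntax (the same warehouse dialect the model below assumes); the sample values are made up for illustration. It combines the per-set COUNT, AVG, and VARIANCE using the formula above and compares the result with a plain STDDEV over the union of both sets; the two output columns should match.

-- Combine two sets' statistics incrementally and compare with a full-scan STDDEV.
WITH EXISTING_SET AS (
    SELECT COUNT(*) AS N, AVG(V) AS MU_N, VARIANCE(V) AS VAR_N
    FROM (VALUES (10), (12), (14)) AS T(V)
),
NEW_SET AS (
    SELECT COUNT(*) AS K, AVG(V) AS MU_K, VARIANCE(V) AS VAR_K
    FROM (VALUES (20), (22)) AS T(V)
)
SELECT
    SQRT(
        ((N - 1) / (N + K - 1)) * VAR_N                              -- existing set's weighted variance
      + ((K - 1) / (N + K - 1)) * VAR_K                              -- new set's weighted variance
      + ((N * K) / ((N + K) * (N + K - 1))) * POWER(MU_N - MU_K, 2)  -- mean difference variance
    ) AS INCREMENTAL_STDDEV,
    (SELECT STDDEV(V) FROM (VALUES (10), (12), (14), (20), (22)) AS U(V)) AS FULL_SCAN_STDDEV
FROM EXISTING_SET, NEW_SET;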

Now that we’ve wrapped our heads around the math behind incremental standard deviation (or at least caught the gist of it), let’s dive into the dbt SQL implementation. In the following example, we’ll walk through how to set up an incremental model to calculate and update these statistics for a user’s transaction data.

Consider a transactions table named stg__transactions, which tracks user transactions (events). Our goal is to create a time-static table, int__user_tx_state, that aggregates the ‘state’ of user transactions. The column details for both tables are provided in the image below.

Image by the author
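For reference, here is a minimal DDL sketch of the two tables. The column names match the model code that follows, while the types are assumptions for illustration only (in dbt you would not create these tables by hand; the model materializes int__user_tx_state itself).

-- Sketch of the source and target tables; types are assumed, not taken from the original schema.
CREATE TABLE STG__TRANSACTIONS (
    USER_ID       VARCHAR,        -- the user who performed the transaction
    TX_ID         VARCHAR,        -- unique transaction identifier
    TX_TIMESTAMP  TIMESTAMP_NTZ,  -- when the transaction occurred
    TX_VALUE      NUMBER(38, 2)   -- transaction amount
);

CREATE TABLE INT__USER_TX_STATE (
    USER_ID     VARCHAR,          -- one row per user (the model's unique_key)
    UPDATED_AT  TIMESTAMP_NTZ,    -- timestamp of the latest transaction seen for the user
    COUNT_TX    NUMBER,           -- running count of the user's transactions
    SUM_TX      NUMBER(38, 2),    -- running sum of TX_VALUE
    AVG_TX      FLOAT,            -- running average of TX_VALUE
    STDDEV_TX   FLOAT             -- running sample standard deviation of TX_VALUE
);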

To make the process efficient, we aim to update the state table incrementally by combining the new incoming transactions data with the existing aggregated data (i.e., the current user state). This approach allows us to calculate the updated user state without scanning all the historical data.

Image by the author

The code below assumes familiarity with some dbt concepts. If you’re unfamiliar with them, you may still be able to follow the code, though I strongly encourage going through dbt’s incremental guide or reading this awesome post first.

We’ll construct the full dbt SQL model step by step, aiming to calculate incremental aggregations efficiently without repeatedly scanning the entire table. The process begins by defining the model as incremental in dbt and using unique_key to update existing rows rather than inserting new ones.

-- depends_on: {{ ref('stg__transactions') }}
{{ config(materialized='incremental', unique_key=['USER_ID'], incremental_strategy='merge') }}

Next, we fetch records from the stg__transactions table.
The is_incremental block filters transactions with timestamps later than the latest user update, effectively including only new transactions.

WITH NEW_USER_TX_DATA AS (
    SELECT
        USER_ID,
        TX_ID,
        TX_TIMESTAMP,
        TX_VALUE
    FROM {{ ref('stg__transactions') }}
    {% if is_incremental() %}
    WHERE TX_TIMESTAMP > COALESCE((SELECT MAX(UPDATED_AT) FROM {{ this }}), 0::TIMESTAMP_NTZ)
    {% endif %}
)

After retrieving the new transaction records, we aggregate them by user, allowing us to incrementally update each user’s state in the subsequent CTEs.

INCREMENTAL_USER_TX_DATA AS (
    SELECT
        USER_ID,
        MAX(TX_TIMESTAMP) AS UPDATED_AT,
        COUNT(TX_VALUE) AS INCREMENTAL_COUNT,
        AVG(TX_VALUE) AS INCREMENTAL_AVG,
        SUM(TX_VALUE) AS INCREMENTAL_SUM,
        COALESCE(STDDEV(TX_VALUE), 0) AS INCREMENTAL_STDDEV
    FROM
        NEW_USER_TX_DATA
    GROUP BY
        USER_ID
)

Now we get to the heavy lifting, where we actually need to calculate the aggregations. When we’re not in incremental mode (i.e., we don’t have any “state” rows yet), we simply select the new aggregations:

NEW_USER_CUMULATIVE_DATA AS (
    SELECT
        NEW_DATA.USER_ID,
        {% if not is_incremental() %}
        NEW_DATA.UPDATED_AT AS UPDATED_AT,
        NEW_DATA.INCREMENTAL_COUNT AS COUNT_TX,
        NEW_DATA.INCREMENTAL_AVG AS AVG_TX,
        NEW_DATA.INCREMENTAL_SUM AS SUM_TX,
        NEW_DATA.INCREMENTAL_STDDEV AS STDDEV_TX
        {% else %}
        ...

But when we’re in incremental mode, we need to join the past data and combine it with the new data we created in the INCREMENTAL_USER_TX_DATA CTE, based on the formula described above.
We start by calculating the new SUM, COUNT, and AVG:

        ...
        {% else %}
        COALESCE(EXISTING_USER_DATA.COUNT_TX, 0) AS _n,  -- this is n (existing count)
        NEW_DATA.INCREMENTAL_COUNT AS _k,  -- this is k (new count)
        COALESCE(EXISTING_USER_DATA.SUM_TX, 0) + NEW_DATA.INCREMENTAL_SUM AS NEW_SUM_TX,  -- new sum
        COALESCE(EXISTING_USER_DATA.COUNT_TX, 0) + NEW_DATA.INCREMENTAL_COUNT AS NEW_COUNT_TX,  -- new count
        NEW_SUM_TX / NEW_COUNT_TX AS AVG_TX,  -- new avg
        ...
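In terms of the formula’s symbols, this is simply the pooled mean, computed here from the running sums rather than from the two averages:

$$
\mu_{n+k} = \frac{n\,\mu_{n} + k\,\mu_{k}}{n+k} = \frac{\mathrm{NEW\_SUM\_TX}}{\mathrm{NEW\_COUNT\_TX}}
$$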

We then calculate the variance formula’s three parts:

1. The existing weighted variance, which is truncated to 0 if the previous set consists of one item or fewer:

        ...
        CASE
            WHEN _n > 1 THEN (((_n - 1) / (NEW_COUNT_TX - 1)) * POWER(COALESCE(EXISTING_USER_DATA.STDDEV_TX, 0), 2))
            ELSE 0
        END AS EXISTING_WEIGHTED_VARIANCE,  -- existing weighted variance
        ...

2. The incremental weighted variance, handled in the same way:

        ...
        CASE
            WHEN _k > 1 THEN (((_k - 1) / (NEW_COUNT_TX - 1)) * POWER(NEW_DATA.INCREMENTAL_STDDEV, 2))
            ELSE 0
        END AS INCREMENTAL_WEIGHTED_VARIANCE,  -- incremental weighted variance
        ...

3. The mean difference variance, as outlined earlier, along with the SQL join terms needed to incorporate the past data:

        ...
        POWER((COALESCE(EXISTING_USER_DATA.AVG_TX, 0) - NEW_DATA.INCREMENTAL_AVG), 2) AS MEAN_DIFF_SQUARED,
        CASE
            WHEN NEW_COUNT_TX = 1 THEN 0
            ELSE (_n * _k) / (NEW_COUNT_TX * (NEW_COUNT_TX - 1))
        END AS BETWEEN_GROUP_WEIGHT,  -- between-group weight
        BETWEEN_GROUP_WEIGHT * MEAN_DIFF_SQUARED AS MEAN_DIFF_VARIANCE,  -- mean diff variance
        EXISTING_WEIGHTED_VARIANCE + INCREMENTAL_WEIGHTED_VARIANCE + MEAN_DIFF_VARIANCE AS VARIANCE_TX,
        CASE
            WHEN _n = 0 THEN NEW_DATA.INCREMENTAL_STDDEV   -- no "past" data
            WHEN _k = 0 THEN EXISTING_USER_DATA.STDDEV_TX  -- no "new" data
            ELSE SQRT(VARIANCE_TX)                         -- stddev (the square root of the variance)
        END AS STDDEV_TX,
        NEW_DATA.UPDATED_AT AS UPDATED_AT,
        NEW_SUM_TX AS SUM_TX,
        NEW_COUNT_TX AS COUNT_TX
        {% endif %}
    FROM
        INCREMENTAL_USER_TX_DATA NEW_DATA
    {% if is_incremental() %}
    LEFT JOIN
        {{ this }} EXISTING_USER_DATA
    ON
        NEW_DATA.USER_ID = EXISTING_USER_DATA.USER_ID
    {% endif %}
)
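Putting the three parts together, the computed columns map onto the combined-variance formula term by term (with n the existing count and k the new count, as above):

$$
\mathrm{VARIANCE\_TX} =
\underbrace{\frac{(n-1)\,S_{n}^{2}}{n+k-1}}_{\mathrm{EXISTING\_WEIGHTED\_VARIANCE}} +
\underbrace{\frac{(k-1)\,S_{k}^{2}}{n+k-1}}_{\mathrm{INCREMENTAL\_WEIGHTED\_VARIANCE}} +
\underbrace{\frac{n\,k\,(\mu_{n}-\mu_{k})^{2}}{(n+k)(n+k-1)}}_{\mathrm{MEAN\_DIFF\_VARIANCE}}
$$

STDDEV_TX is then simply the square root of VARIANCE_TX, with the CASE expression falling back to the new set’s or the existing set’s standard deviation when the other set is empty.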

Finally, we select the table’s columns, accounting for both incremental and non-incremental cases:

SELECT
    USER_ID,
    UPDATED_AT,
    COUNT_TX,
    SUM_TX,
    AVG_TX,
    STDDEV_TX
FROM NEW_USER_CUMULATIVE_DATA

By combining all these steps, we arrive at the final SQL model:

-- depends_on: {{ ref('stg__transactions') }}
{{ config(materialized='incremental', unique_key=['USER_ID'], incremental_strategy='merge') }}

WITH NEW_USER_TX_DATA AS (
    SELECT
        USER_ID,
        TX_ID,
        TX_TIMESTAMP,
        TX_VALUE
    FROM {{ ref('stg__transactions') }}
    {% if is_incremental() %}
    WHERE TX_TIMESTAMP > COALESCE((SELECT MAX(UPDATED_AT) FROM {{ this }}), 0::TIMESTAMP_NTZ)
    {% endif %}
),

INCREMENTAL_USER_TX_DATA AS (
    SELECT
        USER_ID,
        MAX(TX_TIMESTAMP) AS UPDATED_AT,
        COUNT(TX_VALUE) AS INCREMENTAL_COUNT,
        AVG(TX_VALUE) AS INCREMENTAL_AVG,
        SUM(TX_VALUE) AS INCREMENTAL_SUM,
        COALESCE(STDDEV(TX_VALUE), 0) AS INCREMENTAL_STDDEV
    FROM
        NEW_USER_TX_DATA
    GROUP BY
        USER_ID
),

NEW_USER_CUMULATIVE_DATA AS (
    SELECT
        NEW_DATA.USER_ID,
        {% if not is_incremental() %}
        NEW_DATA.UPDATED_AT AS UPDATED_AT,
        NEW_DATA.INCREMENTAL_COUNT AS COUNT_TX,
        NEW_DATA.INCREMENTAL_AVG AS AVG_TX,
        NEW_DATA.INCREMENTAL_SUM AS SUM_TX,
        NEW_DATA.INCREMENTAL_STDDEV AS STDDEV_TX
        {% else %}
        COALESCE(EXISTING_USER_DATA.COUNT_TX, 0) AS _n,  -- this is n (existing count)
        NEW_DATA.INCREMENTAL_COUNT AS _k,  -- this is k (new count)
        COALESCE(EXISTING_USER_DATA.SUM_TX, 0) + NEW_DATA.INCREMENTAL_SUM AS NEW_SUM_TX,  -- new sum
        COALESCE(EXISTING_USER_DATA.COUNT_TX, 0) + NEW_DATA.INCREMENTAL_COUNT AS NEW_COUNT_TX,  -- new count
        NEW_SUM_TX / NEW_COUNT_TX AS AVG_TX,  -- new avg
        CASE
            WHEN _n > 1 THEN (((_n - 1) / (NEW_COUNT_TX - 1)) * POWER(COALESCE(EXISTING_USER_DATA.STDDEV_TX, 0), 2))
            ELSE 0
        END AS EXISTING_WEIGHTED_VARIANCE,  -- existing weighted variance
        CASE
            WHEN _k > 1 THEN (((_k - 1) / (NEW_COUNT_TX - 1)) * POWER(NEW_DATA.INCREMENTAL_STDDEV, 2))
            ELSE 0
        END AS INCREMENTAL_WEIGHTED_VARIANCE,  -- incremental weighted variance
        POWER((COALESCE(EXISTING_USER_DATA.AVG_TX, 0) - NEW_DATA.INCREMENTAL_AVG), 2) AS MEAN_DIFF_SQUARED,
        CASE
            WHEN NEW_COUNT_TX = 1 THEN 0
            ELSE (_n * _k) / (NEW_COUNT_TX * (NEW_COUNT_TX - 1))
        END AS BETWEEN_GROUP_WEIGHT,  -- between-group weight
        BETWEEN_GROUP_WEIGHT * MEAN_DIFF_SQUARED AS MEAN_DIFF_VARIANCE,  -- mean diff variance
        EXISTING_WEIGHTED_VARIANCE + INCREMENTAL_WEIGHTED_VARIANCE + MEAN_DIFF_VARIANCE AS VARIANCE_TX,
        CASE
            WHEN _n = 0 THEN NEW_DATA.INCREMENTAL_STDDEV   -- no "past" data
            WHEN _k = 0 THEN EXISTING_USER_DATA.STDDEV_TX  -- no "new" data
            ELSE SQRT(VARIANCE_TX)                         -- stddev (the square root of the variance)
        END AS STDDEV_TX,
        NEW_DATA.UPDATED_AT AS UPDATED_AT,
        NEW_SUM_TX AS SUM_TX,
        NEW_COUNT_TX AS COUNT_TX
        {% endif %}
    FROM
        INCREMENTAL_USER_TX_DATA NEW_DATA
    {% if is_incremental() %}
    LEFT JOIN
        {{ this }} EXISTING_USER_DATA
    ON
        NEW_DATA.USER_ID = EXISTING_USER_DATA.USER_ID
    {% endif %}
)

SELECT
    USER_ID,
    UPDATED_AT,
    COUNT_TX,
    SUM_TX,
    AVG_TX,
    STDDEV_TX
FROM NEW_USER_CUMULATIVE_DATA

Throughout this process, we demonstrated how to handle both non-incremental and incremental modes effectively, leveraging mathematical techniques to update metrics like variance and standard deviation efficiently. By combining historical and new data seamlessly, we achieved an optimized, scalable approach for real-time data aggregation.

In this article, we explored the mathematical technique for incrementally calculating standard deviation and how to implement it using dbt’s incremental models. This approach proves to be highly efficient, enabling the processing of large datasets without the need to re-scan the entire dataset. In practice, this leads to faster, more scalable systems that can handle real-time updates efficiently. If you’d like to discuss this further or share your thoughts, feel free to reach out; I’d love to hear them!
