Let’s be honest: we’ve all been there.
It’s Friday afternoon. You’ve trained a model, validated it, and deployed the inference pipeline. The metrics look green. You close your laptop for the weekend and enjoy the break.
Monday morning, you’re greeted with the message “Pipeline failed” when you log in. What happened? Everything was working when you deployed the inference pipeline.
The reality is that the problem could be any number of things. Perhaps the upstream engineering team changed the user_id column from an integer to a string. Or the price column suddenly contains negative numbers. Or my personal favorite: the column name changed from created_at to createdAt (camelCase strikes again!).
The industry calls this Schema Drift. I call it a headache.
These days, people talk a lot about Data Contracts. Often, that talk involves selling you an expensive SaaS platform or a fancy microservices architecture. But if you’re just a Data Scientist or Engineer trying to keep your Python pipelines from exploding, you don’t necessarily need enterprise bloat.
The Tool: Pandera
Let’s walk through how to create a simple data contract in Python using the library Pandera. It’s an open-source Python library that lets you define schemas as class objects. It feels a lot like Pydantic (if you’ve used FastAPI), but it is built specifically for DataFrames.
To get started, install pandera with pip:
pip install pandera
A Real-Life Example: The Marketing Leads Feed
Let’s take a look at a classic scenario. You are ingesting a CSV file of marketing leads from a third-party vendor.
Here’s what we expect the data to look like:
- id: An integer (must be unique).
- email: A string (must actually look like an email).
- signup_date: A valid datetime object.
- lead_score: A float between 0.0 and 1.0.
Here is the messy reality of the raw data we receive:
import pandas as pd

# Simulating incoming data that MIGHT break our pipeline
data = {
    "id": [101, 102, 103, 104],
    "email": ["alice@example.com", "bob@example.com", "INVALID_EMAIL", "dana@example.com"],
    "signup_date": ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04"],
    "lead_score": [0.5, 0.8, 1.5, -0.1],  # Note: 1.5 and -0.1 are out of bounds!
}
df = pd.DataFrame(data)
If you fed this dataframe into a model expecting a score between 0 and 1, your predictions would be garbage. If you tried to join on id and there were duplicates, your row counts would explode. Messy data leads to messy data science!
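For contrast, here is roughly what the ad-hoc alternative looks like: one if statement per rule, each with its own hand-written error message. (This is an illustrative sketch using the sample data above, not code from a real pipeline.)

```python
import pandas as pd

data = {
    "id": [101, 102, 103, 104],
    "email": ["alice@example.com", "bob@example.com", "INVALID_EMAIL", "dana@example.com"],
    "lead_score": [0.5, 0.8, 1.5, -0.1],
}
df = pd.DataFrame(data)

# The ad-hoc way: a growing pile of manual checks
problems = []
if df["id"].duplicated().any():
    problems.append("id column contains duplicates")
if not df["email"].str.contains("@", regex=False).all():
    problems.append("email column contains values without an '@'")
if not df["lead_score"].between(0.0, 1.0).all():
    problems.append("lead_score column has values outside [0.0, 1.0]")

print(problems)
```

Two checks fire on this sample (the bad email and the out-of-bounds scores), and we have already written a dozen lines that say nothing about which rows are broken. A contract expresses the same rules declaratively and reports per-row failures for free.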
Step 1: Define The Contract
Instead of writing a dozen if statements to check data quality, we define a SchemaModel. This is our contract.
import pandas as pd
import pandera as pa
from pandera.typing import Series

class LeadsContract(pa.SchemaModel):
    # 1. Check data types and existence
    id: Series[int] = pa.Field(unique=True, ge=0)

    # 2. Check formatting using a regex
    email: Series[str] = pa.Field(str_matches=r"[^@]+@[^@]+\.[^@]+")

    # 3. Coerce types (convert string dates to datetime objects automatically)
    signup_date: Series[pd.Timestamp] = pa.Field(coerce=True)

    # 4. Check business logic (bounds)
    lead_score: Series[float] = pa.Field(ge=0.0, le=1.0)

    class Config:
        # Strictness: if an extra column appears, or one is missing, raise an error
        strict = True
Look over the code above to get a general feel for how Pandera sets up a contract. You can worry about the details later when you leaf through the Pandera documentation.
Step 2: Enforce The Contract
Now we need to apply the contract to our data. The naive way to do this is to run LeadsContract.validate(df). This works, but it crashes on the first error it finds. In production, you usually want to know everything that’s wrong with the file, not just the first row.
We can enable “lazy” validation to catch all errors at once.
try:
    # lazy=True means "find all errors before crashing"
    validated_df = LeadsContract.validate(df, lazy=True)
    print("Data passed validation! Proceeding to ETL...")
except pa.errors.SchemaErrors as err:
    print("⚠️ Data Contract Breached!")
    print(f"Total errors found: {len(err.failure_cases)}")

    # Let's look at the exact failures
    print("\nFailure Report:")
    print(err.failure_cases[['column', 'check', 'failure_case']])
The Output
If you run the code above, you won’t get a generic KeyError. You’ll get a specific report detailing exactly why the contract was breached:
⚠️ Data Contract Breached!
Total errors found: 3
Failure Report:
column check failure_case
0 email str_matches INVALID_EMAIL
1 lead_score less_than_or_equal_to 1.5
2 lead_score greater_than_or_equal_to -0.1
In a more realistic scenario, you’d probably log the output to a file and set up alerts so that you get notified when something breaks.
Why This Matters
This approach shifts the dynamic of your work.
Without a contract, your code fails deep inside the transformation logic (or worse, it doesn’t fail, and you write bad data to the warehouse). You spend hours debugging NaN values.
With a contract:
- Fail Fast: The pipeline stops at the door. Bad data never enters your core logic.
- Clear Blame: You can send that Failure Report to the data provider and say, “Rows 3 and 4 violated the schema. Please fix.”
- Documentation: The LeadsContract class serves as living documentation. New joiners to the project don’t have to guess what the columns represent; they can just read the code. You also avoid maintaining a separate data contract in SharePoint, Confluence, or wherever, which quickly gets outdated.
The “Good Enough” Solution
You can definitely go deeper. You can integrate this with Airflow, push metrics to a dashboard, or use tools like great_expectations for more complex statistical profiling.
But for 90% of the use cases I see, a simple validation step at the start of your Python script is enough to let you sleep soundly on a Friday night.
Start small. Define a schema for your messiest dataset, wrap it in a try/except block, and see how many headaches it saves you this week. When this simple approach is no longer enough, THEN I would consider more elaborate tooling for data contracts.
If you are interested in AI, data science, or data engineering, please follow me or connect on LinkedIn.
