a real issue when dealing with very large datasets. What I mean by "very large" is data that exceeds the capacity of a single machine's RAM.
A few of the key friction points Pandas users face include:
In-Memory Constraints
Pandas requires the whole dataset it's processing to fit within the machine's Random Access Memory (RAM). It can't easily process data stored on a hard disk unless that data is first loaded, and if the data is too big for your memory, you hit problems.
For instance, if you try to load a 100GB CSV file into Pandas on a typical laptop with 16GB of RAM, the code will crash.
And it isn't just a 1:1 ratio. Because of data types and object overhead, Pandas usually needs several multiples of the file's on-disk size in RAM. With 16GB of RAM, your practical file size limit may be as little as 3-4 GB.
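You can see this overhead for yourself with DataFrame.memory_usage(deep=True). Here is a minimal sketch on synthetic data (the column names just echo the sales schema used later in this article); it also shows one partial mitigation, converting a repetitive string column to a categorical dtype:

```python
import numpy as np
import pandas as pd

# 100,000 rows with a low-cardinality string column -- the classic
# source of object overhead in Pandas
rng = np.random.default_rng(42)
df = pd.DataFrame({
    "category": rng.choice(["Electronics", "Office"], size=100_000),
    "price": rng.uniform(1.99, 999.99, size=100_000),
})

# deep=True counts the actual Python string objects, not just pointers
naive_mb = df.memory_usage(deep=True).sum() / 1e6

# storing the repetitive strings as a categorical dtype shrinks them
df["category"] = df["category"].astype("category")
lean_mb = df.memory_usage(deep=True).sum() / 1e6

print(f"object dtype: {naive_mb:.1f} MB, category dtype: {lean_mb:.1f} MB")
```

Tricks like this buy you headroom, but they don't remove the fundamental in-memory constraint.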
Single-Threaded Execution
Pandas was designed for convenience and analysis, not raw performance at scale. By default, Pandas executes operations on a single CPU core. Even if you run your code on a powerful server with 64 cores, Pandas will largely use just one, leaving the others idle.
Eager Execution vs. Lazy Evaluation
Pandas uses eager execution, meaning it performs calculations as soon as the code is run. Big Data tools (like Apache Spark) use lazy evaluation. The latter is often more performant because, when a task involves a series of steps, a lazy engine can look at all of the steps and the required final result and optimise accordingly. Eager execution can't do this: it blindly executes each step in turn, no matter what.
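To make that concrete, here is a deliberately tiny, toy illustration in plain Python (not a real engine). Both functions answer "which rows have a total over 100, sorted by total?", but the eager version sorts every row before discarding most of them, while the lazy-style version, which knows the whole plan up front, filters first and sorts only the survivors:

```python
rows = [{"order_id": i, "total": float(i % 500)} for i in range(10_000)]

def eager(rows):
    # executes each step in turn: sorts ALL 10,000 rows...
    s = sorted(rows, key=lambda r: r["total"])
    # ...then throws most of them away
    return [r for r in s if r["total"] > 100]

def lazy(rows):
    # an optimiser that sees the whole plan can push the cheap filter
    # ahead of the expensive sort, so the sort only touches survivors
    kept = [r for r in rows if r["total"] > 100]
    return sorted(kept, key=lambda r: r["total"])

# same answer, different amount of work
assert eager(rows) == lazy(rows)
```

This filter-pushdown is exactly the kind of rewrite Spark's query optimiser performs automatically across a whole chain of transformations.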
Vertical Scaling Limits
To make Pandas work with larger datasets, you have to rely on Vertical Scaling (buying a more expensive computer with more RAM and a faster CPU). But this can only take you so far. For example, Pandas has no native ability to "talk" to a cluster; it cannot distribute a dataframe across multiple machines.
So what to do?
As always in the IT world, several solutions present themselves. Three of the most popular alternatives are:
1/ Dask or Ray
These are third-party libraries that help you write distributed code that can run across clusters of computers. While they try to mimic the Pandas API, they still have subtle differences and limitations that may require code refactoring.
2/ Spark: Another distributed compute engine. It requires a different syntax and a different mental model.
3/ RDBMS: Requires moving your data into a database and learning SQL.
All of the above options require quite a bit of work to implement, but for the rest of this article, I'll focus on option 2.
So, let's say I've convinced you, or at least piqued your interest, and you're considering moving some or all of your existing Pandas-based processing to PySpark. What should your next move be? Well, you'll need to start converting some or all of your codebase. That can be daunting, but don't worry, I've got you covered.
Read on as I take you through a set of example code snippets that showcase typical data processing operations, from simple to more complex. I'm sure you'll recognise some of these patterns in your own code. I'll show you the Pandas way of doing things and replicate it in PySpark, providing output and timing comparisons between the two.
Setting up the dev environment
I'm running Ubuntu on WSL2. First, we'll set up a separate development environment for this work, ensuring our projects are siloed and don't interfere with one another. I'm using Conda for this part, but feel free to use whichever method you're comfortable with.
Install PySpark, Pandas, etc.
(base) $ conda create -n pandas_to_pyspark python=3.11 -y
(base) $ conda activate pandas_to_pyspark
(pandas_to_pyspark) $ conda install jupyter polars pyarrow pandas -y
(pandas_to_pyspark) $ conda install -c conda-forge pyspark
To check that PySpark has been installed correctly, type the pyspark command into a terminal window.
(pandas_to_pyspark) $ pyspark
Python 3.11.14 | packaged by conda-forge | (main, Oct 22 2025, 22:46:25) [GCC 14.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
WARNING: Using incubator modules: jdk.incubator.vector
WARNING: package sun.security.action not in java.base
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/15 16:15:21 WARN Utils: Your hostname, tpr-desktop, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/01/15 16:15:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/15 16:15:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARNING: A terminally deprecated method in sun.misc.Unsafe has been called
WARNING: sun.misc.Unsafe::arrayBaseOffset has been called by org.apache.spark.unsafe.Platform (file:/home/tom/miniconda3/envs/pandas_to_pyspark/lib/python3.11/site-packages/pyspark/jars/spark-unsafe_2.13-4.1.1.jar)
WARNING: Please consider reporting this to the maintainers of class org.apache.spark.unsafe.Platform
WARNING: sun.misc.Unsafe::arrayBaseOffset will be removed in a future release
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 4.1.1
      /_/
Using Python version 3.11.14 (main, Oct 22 2025 22:46:25)
Spark context Web UI available at http://10.255.255.254:4040
Spark context available as 'sc' (master = local[*], app id = local-1768493723158).
SparkSession available as 'spark'.
>>>
If you don't see the Spark welcome banner, then something has gone wrong, and you should double-check your installation.
Getting our sample data set
We don't need a complicated data set for our purposes. A set of synthetic sales data with the following schema will suffice:
- order_id (int)
- order_date (date)
- customer_id (int)
- customer_name (str)
- product_id (int)
- product_name (str)
- category (str)
- quantity (int)
- price (float)
- total (float)
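As an aside, on the Pandas side this schema can be declared up front as a dtype mapping, which avoids slow type inference at read time. Here is a sketch (the inline one-row sample is just for illustration):

```python
import io
import pandas as pd

# the sales schema expressed as read_csv dtypes; order_date is handled
# by parse_dates rather than the dtype dict
sales_dtypes = {
    "order_id": "int64", "customer_id": "int64", "product_id": "int64",
    "customer_name": "string", "product_name": "string",
    "category": "string", "quantity": "int64",
    "price": "float64", "total": "float64",
}

# a tiny inline sample just to show the mapping in action
sample = io.StringIO(
    "order_id,order_date,customer_id,customer_name,product_id,"
    "product_name,category,quantity,price,total\n"
    "0,2023-11-24,434,Customer_46318,201,Notebook,Office,6,925.68,5554.08\n"
)
df = pd.read_csv(sample, dtype=sales_dtypes, parse_dates=["order_date"])
print(df.dtypes)
```

We'll do the equivalent thing on the Spark side later with an explicit StructType schema.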
Our input data will be a 30-million-record CSV file. Here's a Python program to generate the test data:
import polars as pl
import random
from datetime import datetime, timedelta
# Generate fake data
def generate_fake_data(num_records):
random.seed(42)
product_names = ['Laptop', 'Smartphone', 'Desk', 'Chair', 'Monitor',
'Printer', 'Paper', 'Pen', 'Notebook', 'Coffee Maker']
categories = ['Electronics', 'Electronics', 'Office', 'Office', 'Electronics',
'Electronics', 'Office', 'Office', 'Office', 'Electronics']
data = {
'order_id': range(num_records),
'order_date': [datetime(2023, 1, 1) + timedelta(days=random.randint(0, 364))
for _ in range(num_records)],
'customer_id': [random.randint(100, 999) for _ in range(num_records)],
'customer_name': [f'Customer_{random.randint(0, 99999)}' for _ in range(num_records)],
'product_id': [random.randint(200, 209) for _ in range(num_records)],
'product_name': [random.choice(product_names) for _ in range(num_records)],
'category': [random.choice(categories) for _ in range(num_records)],
'quantity': [random.randint(1, 10) for _ in range(num_records)],
'price': [round(random.uniform(1.99, 999.99), 2) for _ in range(num_records)]
}
df = pl.DataFrame(data)
df = df.with_columns((pl.col('price') * pl.col('quantity')).alias('total'))
return df
# Generate 30 million records
num_records = 30000000
df = generate_fake_data(num_records)
# Save to CSV
df.write_csv('/mnt/d/sales_data/sales_data_30m.csv')
print('CSV file with fake sales data has been created.')
Here's what the first few rows of my test data file looked like.
order_id,order_date,customer_id,customer_name,product_id,product_name,category,quantity,price,total
0,2023-11-24T00:00:00.000000,434,Customer_46318,201,Notebook,Office,6,925.68,5554.08
1,2023-02-27T00:00:00.000000,495,Customer_26514,203,Coffee Maker,Office,3,676.44,2029.3200000000002
2,2023-01-13T00:00:00.000000,377,Customer_56676,204,Pen,Electronics,10,533.2,5332.0
3,2023-05-21T00:00:00.000000,272,Customer_13772,209,Notebook,Electronics,5,752.0,3760.0
4,2023-05-06T00:00:00.000000,490,Customer_23118,206,Coffee Maker,Electronics,3,747.46,2242.38
5,2023-04-25T00:00:00.000000,515,Customer_88284,202,Desk,Electronics,10,886.22,8862.2
6,2023-03-13T00:00:00.000000,885,Customer_47303,200,Desk,Electronics,1,38.97,38.97
7,2023-02-22T00:00:00.000000,598,Customer_90712,203,Desk,Electronics,5,956.31,4781.549999999999
8,2023-12-13T00:00:00.000000,781,Customer_32943,205,Coffee Maker,Electronics,7,258.25,1807.75
9,2023-10-07T00:00:00.000000,797,Customer_40215,208,Pen,Electronics,8,464.81,3718.48
10,2023-02-14T00:00:00.000000,333,Customer_18388,209,Monitor,Electronics,1,478.95,478.95
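Before pointing either library at a 30-million-row file, a cheap sanity check on the record count never hurts. Here's a minimal sketch in plain Python (the helper name is mine):

```python
def count_csv_rows(path: str) -> int:
    """Count data rows in a CSV without loading it into memory."""
    with open(path, "rb") as f:
        # stream line by line; subtract 1 for the header row
        return sum(1 for _ in f) - 1

# e.g. count_csv_rows("/mnt/d/sales_data/sales_data_30m.csv") should
# report 30,000,000 for the file generated above
```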
Code Examples
Start a Jupyter Notebook:
(pandas_to_pyspark) $ jupyter notebook
The data and the two code sets I'll be running are on my desktop PC. I'll show the outputs from both code runs so you can confirm they perform the same task, and I'll include timings (in seconds) so you can compare performance. The Pandas code and output come first, then the Spark code and output.
The code snippets are short and well commented, so if you're already a Pandas programmer, it should be fairly easy to follow what's happening in the PySpark code, even if you're not yet familiar with it.
To be clear, since the input data set I'll be using is NOT "big data", the timings should be viewed as being of secondary importance.
Example 1 — Loading data from a CSV
We'll start with a straightforward operation: simply reading our input CSV data file and sorting it by the order_date and order_id columns before displaying the first and last five records.
Here’s the Pandas code.
import pandas as pd
import time
# 1. Define Path (WSL format)
file_path = "/mnt/d/sales_data/sales_data_30m.csv"
print(f"Starting process for {file_path}...")
# --- LOAD PHASE ---
start_load = time.time()
df = pd.read_csv(file_path)
end_load = time.time()
print(f"Loading complete. Time taken: {end_load - start_load:.2f} seconds")
# --- SORT PHASE ---
start_sort = time.time()
# Note: sorting by two columns at once
df_sorted = df.sort_values(by=['order_date', 'order_id'])
end_sort = time.time()
print(f"Sorting complete. Time taken: {end_sort - start_sort:.2f} seconds")
# --- DISPLAY ---
print("n" + "="*30)
print("TOP 5 RECORDS")
print(df_sorted.head(5))
print("nBOTTOM 5 RECORDS")
print(df_sorted.tail(5))
print("="*30)
total_time = end_sort - start_load
print(f"nTotal Execution Time: {total_time:.2f} seconds")
Here is the output.
(pandas_to_pyspark) $ python ex1_pandas.py
Starting process for /mnt/d/sales_data/sales_data_30m.csv...
Loading complete. Time taken: 34.02 seconds
Sorting complete. Time taken: 7.00 seconds
==============================
TOP 5 RECORDS
order_id order_date customer_id customer_name ... category quantity price total
179 179 2023-01-01T00:00:00.000000 350 Customer_93033 ... Office 5 640.16 3200.80
520 520 2023-01-01T00:00:00.000000 858 Customer_31280 ... Electronics 3 841.21 2523.63
557 557 2023-01-01T00:00:00.000000 651 Customer_95137 ... Office 7 75.66 529.62
1080 1080 2023-01-01T00:00:00.000000 303 Customer_87422 ... Electronics 10 98.34 983.40
2023 2023 2023-01-01T00:00:00.000000 838 Customer_95193 ... Office 4 427.96 1711.84
[5 rows x 10 columns]
BOTTOM 5 RECORDS
order_id order_date customer_id customer_name ... category quantity price total
29997832 29997832 2023-12-31T00:00:00.000000 831 Customer_49372 ... Electronics 6 418.86 2513.16
29997903 29997903 2023-12-31T00:00:00.000000 449 Customer_17384 ... Office 3 494.29 1482.87
29998337 29998337 2023-12-31T00:00:00.000000 649 Customer_24018 ... Electronics 5 241.71 1208.55
29999674 29999674 2023-12-31T00:00:00.000000 105 Customer_39890 ... Office 1 94.97 94.97
29999933 29999933 2023-12-31T00:00:00.000000 572 Customer_38794 ... Office 8 375.36 3002.88
[5 rows x 10 columns]
==============================
Total Execution Time: 41.03 seconds
Here’s the equivalent Spark code and processing output.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType
import time
import pandas as pd
start_overall = time.time()
# 1. Initialize with explicit Memory and Shuffle tuning
spark = SparkSession.builder \
    .appName("OptimizedSpark") \
    .config("spark.sql.shuffle.partitions", "16") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# 2. Define Manual Schema (Skips the double-read of inferSchema)
schema = StructType([
StructField("order_id", IntegerType(), True),
StructField("order_date", DateType(), True),
StructField("customer_id", IntegerType(), True),
StructField("customer_name", StringType(), True),
StructField("product_id", IntegerType(), True),
StructField("product_name", StringType(), True),
StructField("category", StringType(), True),
StructField("quantity", IntegerType(), True),
StructField("price", DoubleType(), True),
StructField("total", DoubleType(), True)
])
file_path = "/mnt/d/sales_data/sales_data_30m.csv"
print(f"Processing {file_path} with Optimized Spark...")
# --- LOAD ---
start_load = time.time()
# No inferSchema!
df = spark.read.csv(file_path, header=True, schema=schema)
print(f"LOAD INITIATED. (Time taken: {time.time() - start_load:.2f}s)")
# --- SORT ---
start_sort = time.time()
# Sorting 30M rows
df_sorted = df.orderBy(["order_date", "order_id"])
# Force the sort with a lightweight action (NOT cache)
row_count = df_sorted.count()
end_sort = time.time()
print(f"SORT COMPLETE. Rows: {row_count}")
print(f" Time taken: {end_sort - start_sort:.2f} seconds")
# --- DISPLAY ---
print("n" + "="*80)
print("TOP 5 RECORDS")
print(df_sorted.limit(5).toPandas().to_string(index=False))
print("nBOTTOM 5 RECORDS")
tail_data = df_sorted.tail(5)
print(pd.DataFrame(tail_data, columns=df.columns).to_string(index=False))
print("="*80)
print(f"nTotal Execution Time: {time.time() - start_overall:.2f} seconds")
spark.stop()
And the output.
(pandas_to_pyspark) $ spark-submit ex1_spark.py 2> /dev/null
Processing /mnt/d/sales_data/sales_data_30m.csv with Optimized Spark...
LOAD INITIATED. (Time taken: 0.72s)
SORT COMPLETE. Rows: 30000000
Time taken: 5.65 seconds
================================================================================
TOP 5 RECORDS
order_id order_date customer_id customer_name product_id product_name category quantity price total
179 2023-01-01 350 Customer_93033 207 Desk Office 5 640.16 3200.80
520 2023-01-01 858 Customer_31280 201 Pen Electronics 3 841.21 2523.63
557 2023-01-01 651 Customer_95137 209 Printer Office 7 75.66 529.62
1080 2023-01-01 303 Customer_87422 204 Smartphone Electronics 10 98.34 983.40
2023 2023-01-01 838 Customer_95193 201 Paper Office 4 427.96 1711.84
BOTTOM 5 RECORDS
order_id order_date customer_id customer_name product_id product_name category quantity price total
29997832 2023-12-31 831 Customer_49372 201 Chair Electronics 6 418.86 2513.16
29997903 2023-12-31 449 Customer_17384 205 Desk Office 3 494.29 1482.87
29998337 2023-12-31 649 Customer_24018 201 Smartphone Electronics 5 241.71 1208.55
29999674 2023-12-31 105 Customer_39890 203 Chair Office 1 94.97 94.97
29999933 2023-12-31 572 Customer_38794 201 Desk Office 8 375.36 3002.88
================================================================================
Total Execution Time: 36.12 seconds
Example 2 — Converting a CSV file to Parquet
In this example, we'll read the same 30M-record input CSV file, then write it out again as a Parquet file.
As before, we’ll start with the pandas code and output.
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
import time
csv_file = "/mnt/d/sales_data/sales_data_30m.csv"
parquet_file = "/mnt/d/sales_data/sales_data_pandas_30m.parquet"
chunk_size = 1_000_000 # Process 1 million rows at a time
print(f"Starting memory-efficient conversion...")
start_total = time.time()
# 1. Create a CSV reader object (this does not load data yet)
reader = pd.read_csv(csv_file, chunksize=chunk_size)
parquet_writer = None
for i, chunk in enumerate(reader):
start_chunk = time.time()
# Convert Pandas chunk to PyArrow Table
table = pa.Table.from_pandas(chunk)
# Initialize the writer on the first chunk
if parquet_writer is None:
parquet_writer = pq.ParquetWriter(parquet_file, table.schema, compression='snappy')
# Write this chunk to the file
parquet_writer.write_table(table)
print(f"Processed chunk {i+1} (Rows {i*chunk_size} to {(i+1)*chunk_size}) in {time.time() - start_chunk:.2f}s")
# 2. Close the writer
if parquet_writer:
parquet_writer.close()
print("n" + "="*40)
print(f"Conversion Complete!")
print(f"Total Time: {time.time() - start_total:.2f} seconds")
print("="*40)
The output.
(pandas_to_pyspark) $ python ex2_pandas.py
Starting memory-efficient conversion...
Processed chunk 1 (Rows 0 to 1000000) in 4.82s
Processed chunk 2 (Rows 1000000 to 2000000) in 0.40s
Processed chunk 3 (Rows 2000000 to 3000000) in 0.39s
Processed chunk 4 (Rows 3000000 to 4000000) in 0.36s
Processed chunk 5 (Rows 4000000 to 5000000) in 0.43s
Processed chunk 6 (Rows 5000000 to 6000000) in 0.45s
Processed chunk 7 (Rows 6000000 to 7000000) in 0.35s
Processed chunk 8 (Rows 7000000 to 8000000) in 0.34s
Processed chunk 9 (Rows 8000000 to 9000000) in 0.36s
Processed chunk 10 (Rows 9000000 to 10000000) in 0.36s
Processed chunk 11 (Rows 10000000 to 11000000) in 0.37s
Processed chunk 12 (Rows 11000000 to 12000000) in 0.41s
Processed chunk 13 (Rows 12000000 to 13000000) in 0.48s
Processed chunk 14 (Rows 13000000 to 14000000) in 0.43s
Processed chunk 15 (Rows 14000000 to 15000000) in 0.38s
Processed chunk 16 (Rows 15000000 to 16000000) in 0.35s
Processed chunk 17 (Rows 16000000 to 17000000) in 0.34s
Processed chunk 18 (Rows 17000000 to 18000000) in 0.35s
Processed chunk 19 (Rows 18000000 to 19000000) in 0.36s
Processed chunk 20 (Rows 19000000 to 20000000) in 0.35s
Processed chunk 21 (Rows 20000000 to 21000000) in 0.34s
Processed chunk 22 (Rows 21000000 to 22000000) in 0.34s
Processed chunk 23 (Rows 22000000 to 23000000) in 0.34s
Processed chunk 24 (Rows 23000000 to 24000000) in 0.36s
Processed chunk 25 (Rows 24000000 to 25000000) in 0.36s
Processed chunk 26 (Rows 25000000 to 26000000) in 0.35s
Processed chunk 27 (Rows 26000000 to 27000000) in 0.36s
Processed chunk 28 (Rows 27000000 to 28000000) in 0.35s
Processed chunk 29 (Rows 28000000 to 29000000) in 0.35s
Processed chunk 30 (Rows 29000000 to 30000000) in 0.34s
========================================
Conversion Complete!
Total Time: 43.30 seconds
========================================
And now for PySpark.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType
import time
# Start the overall timer immediately
start_overall = time.time()
# 1. Initialize Spark with high memory configuration
spark = SparkSession.builder \
    .appName("EfficientParquetConversion") \
    .config("spark.driver.memory", "8g") \
    .master("local[*]") \
    .getOrCreate()
# Silence logs
spark.sparkContext.setLogLevel("ERROR")
# 2. Explicitly define the Schema (Best for CSV)
schema = StructType([
StructField("order_id", IntegerType(), True),
StructField("order_date", DateType(), True),
StructField("customer_id", IntegerType(), True),
StructField("customer_name", StringType(), True),
StructField("product_id", IntegerType(), True),
StructField("product_name", StringType(), True),
StructField("category", StringType(), True),
StructField("quantity", IntegerType(), True),
StructField("price", DoubleType(), True),
StructField("total", DoubleType(), True)
])
csv_path = "/mnt/d/sales_data/sales_data_30m.csv"
parquet_path = "/mnt/d/sales_data/sales_data_parquet"
print(f"Starting Spark conversion to {parquet_path}...")
# 3. Read the CSV using the defined schema
start_proc = time.time()
df = spark.read.csv(csv_path, header=True, schema=schema)
# 4. Write to Parquet (Overwrite if exists)
df.write.mode("overwrite").parquet(parquet_path)
end_proc = time.time()
print("-" * 40)
print(f"CONVERSION COMPLETE")
print(f"Processing Time (Read + Write): {end_proc - start_proc:.2f} seconds")
print(f"Total Execution Time (incl. Spark startup): {time.time() - start_overall:.2f} seconds")
print("-" * 40)
spark.stop()
I can confirm that the contents of the Parquet files created by Pandas and PySpark were identical.
(pandas_to_pyspark) $ spark-submit --driver-memory 8g ex2_spark.py 2> /dev/null
Starting Spark conversion to /mnt/d/sales_data/sales_data_parquet...
----------------------------------------
CONVERSION COMPLETE
Processing Time (Read + Write): 21.62 seconds
Total Execution Time (incl. Spark startup): 23.26 seconds
----------------------------------------
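If you want to run the same check yourself, one approach is to load each side with pd.read_parquet and compare the frames after sorting, since Spark writes its partitions in no particular order. A sketch of a comparison helper (the helper name is mine):

```python
import pandas as pd
from pandas.testing import assert_frame_equal

def frames_match(a: pd.DataFrame, b: pd.DataFrame) -> bool:
    """Compare two frames ignoring row order (Spark's output is
    partitioned and unordered)."""
    key = ["order_id"] if "order_id" in a.columns else list(a.columns)
    a = a.sort_values(key).reset_index(drop=True)
    b = b.sort_values(key).reset_index(drop=True)
    try:
        # check_like=True also tolerates differing column order
        assert_frame_equal(a, b, check_like=True)
        return True
    except AssertionError:
        return False

# in practice you'd load both outputs first, e.g.:
#   a = pd.read_parquet("/mnt/d/sales_data/sales_data_pandas_30m.parquet")
#   b = pd.read_parquet("/mnt/d/sales_data/sales_data_parquet")
#   print(frames_match(a, b))
```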
Example 3 — Data pivoting
Read the Parquet files we just created and calculate the total sales per product_name per order_date.
Pandas.
import pandas as pd
from timeit import default_timer as timer
parquet_path = r'/mnt/d/sales_data/sales_data_pandas_30m.parquet'
start = timer()
# Read the Parquet file
df = pd.read_parquet(parquet_path)
# 1) Make order_date a proper date
# Convert to datetime then extract the date component
df["order_date"] = pd.to_datetime(df["order_date"]).dt.date
# 2) Pivot (sum)
# Pandas pivot_table handles the aggregation (sum) and the reshaping simultaneously
pivot = df.pivot_table(
values="total",
index="order_date",
columns="product_name",
aggfunc="sum"
)
# 3) Sort rows by date (Pandas index)
pivot = pivot.sort_index()
# 4) Enforce a consistent column order (alphabetical product columns)
# pivot_table already sorts columns by default, but we can be explicit
pivot = pivot.reindex(sorted(pivot.columns), axis=1)
# 5) (Optional) Replace nulls with 0
# pivot = pivot.fillna(0)
end = timer()
print(f"Pandas: read + standardized pivot took {end - start:.2f} seconds")
print(pivot.head(5))
Pandas Output.
(pandas_to_pyspark) $ python ex3_pandas.py
Pandas: read + standardized pivot took 9.98 seconds
product_name Chair Coffee Maker Desk Laptop ... Paper Pen Printer Smartphone
order_date ...
2023-01-01 22041864.51 22596967.46 22228235.43 22319250.97 ... 22778128.78 22690394.34 22747419.90 22848102.42
2023-01-02 22702337.42 21960074.98 23539803.82 23332945.56 ... 22414013.44 22378123.52 22494364.89 22321919.79
2023-01-03 22626028.85 22651440.10 22930421.42 22938328.34 ... 22880161.09 21607713.73 22937117.72 22262604.28
2023-01-04 22605466.70 22652219.77 22463371.43 22506729.47 ... 23097987.72 22327386.63 22922449.38 22673066.75
2023-01-05 22581240.40 23004302.70 22511769.34 22882968.52 ... 22058769.99 22379327.80 22946133.94 22988219.48
[5 rows x 10 columns]
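A quick way to gain confidence in a pivot like this: it's only a reshape plus aggregation, so the grand total of the pivot must equal the sum of the original total column. A tiny self-contained check with synthetic rows using the same column names:

```python
import pandas as pd

df = pd.DataFrame({
    "order_date": ["2023-01-01", "2023-01-01", "2023-01-02"],
    "product_name": ["Chair", "Desk", "Chair"],
    "total": [100.0, 200.0, 50.0],
})
pivot = df.pivot_table(values="total", index="order_date",
                       columns="product_name", aggfunc="sum")

# the reshape must preserve the grand total (fillna covers the holes
# where a product sold nothing on a given day)
grand = pivot.fillna(0).to_numpy().sum()
print(grand, df["total"].sum())  # 350.0 350.0
```

On the real 30M-row data the same comparison holds up to floating-point rounding, so compare with a small tolerance rather than exact equality.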
PySpark.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from timeit import default_timer as timer
# Initialize Spark
spark = SparkSession.builder \
    .appName("SparkPivotBenchmark") \
    .config("spark.driver.memory", "8g") \
    .master("local[*]") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
parquet_path = '/mnt/d/sales_data/sales_data_parquet'
start = timer()
# 1. Read the Parquet file
df = spark.read.parquet(parquet_path)
# 2. Make order_date a proper date
# We cast the column to DateType
df = df.withColumn("order_date", F.col("order_date").cast("date"))
# 3. Pivot (sum)
# Spark's pivot is much faster if you provide the distinct values (product_names),
# but it can also infer them automatically, as shown below
pivot_df = df.groupBy("order_date") \
    .pivot("product_name") \
    .agg(F.sum("total"))
# 4. Sort rows by date
pivot_df = pivot_df.orderBy("order_date")
# 5. Enforce consistent column order (alphabetical product columns)
# The first column is 'order_date'; the rest are the pivoted products
columns = pivot_df.columns
product_cols = sorted([c for c in columns if c != "order_date"])
pivot_df = pivot_df.select(["order_date"] + product_cols)
# 6. Replace nulls with 0
pivot_df = pivot_df.na.fill(0)
# Trigger an action to measure actual performance (count of pivoted days)
row_count = pivot_df.count()
end = timer()
print(f"PySpark: read + standardized pivot took {end - start:.2f} seconds")
print(f"Total days processed: {row_count}")
# 7. Display top 5
pivot_df.show(5)
spark.stop()
PySpark Output.
(pandas_to_pyspark) $ spark-submit --driver-memory 8g ex3_spark.py 2> /dev/null
PySpark: read + standardized pivot took 3.54 seconds
Total days processed: 365
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|order_date| Chair| Coffee Maker| Desk| Laptop| Monitor| Notebook| Paper| Pen| Printer| Smartphone|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|2023-01-01|2.2041864510000005E7|2.2596967459999997E7| 2.222823543E7|2.2319250969999995E7| 2.309861159E7|2.2687765309999995E7|2.2778128780000005E7|2.2690394339999996E7| 2.27474199E7|2.2848102419999998E7|
|2023-01-02| 2.270233742E7|2.1960074980000004E7|2.3539803819999993E7|2.3332945560000006E7|2.2441403840000004E7| 2.282151253E7| 2.241401344E7|2.2378123520000003E7| 2.249436489E7| 2.232191979E7|
|2023-01-03|2.2626028849999998E7| 2.26514401E7| 2.293042142E7| 2.293832834E7| 2.290862974E7|2.2432433990000006E7|2.2880161090000004E7|2.1607713730000008E7| 2.293711772E7| 2.226260428E7|
|2023-01-04|2.2605466699999996E7|2.2652219770000003E7| 2.246337143E7| 2.250672947000001E7|2.1930874809999995E7|2.3261865149999995E7| 2.309798772E7|2.2327386629999995E7|2.2922449380000003E7|2.2673066749999996E7|
|2023-01-05|2.2581240400000002E7|2.3004302700000003E7| 2.251176934E7|2.2882968520000003E7| 2.284090005E7| 2.272256243E7|2.2058769990000002E7|2.2379327800000004E7|2.2946133940000005E7| 2.298821948E7|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows
Example 4 — Windowing analytics with LAG/LEAD
For my final example, we'll calculate the SUM of all orders per order_date, then use LAG/LEAD functionality to calculate the percentage change in total orders over consecutive order dates.
Pandas.
import pandas as pd
from timeit import default_timer as timer
parquet_path = '/mnt/d/sales_data/sales_data_pandas_30m.parquet'
start = timer()
# 1. Read the Parquet file
df = pd.read_parquet(parquet_path)
# 2. Normalize order_date
# Pandas to_datetime is generally flexible enough to handle multiple formats
# automatically, which replaces the manual pl.coalesce logic.
df['order_date'] = pd.to_datetime(df['order_date'], errors='coerce').dt.date
# 3. Group by date and aggregate
result_pandas = df.groupby("order_date")["total"].sum().reset_index()
# 4. Sort by date
result_pandas = result_pandas.sort_values("order_date")
# 5. Analytic functions (Lag and Lead)
# In Pandas, shift(1) is lag, shift(-1) is lead
result_pandas["total_lag"] = result_pandas["total"].shift(1)
result_pandas["total_lead"] = result_pandas["total"].shift(-1)
# 6. Calculate Percent Changes
# We use Series operations, which handle the 'None/NaN' and 'divide by zero'
# logic similarly to pl.when().otherwise()
result_pandas["percent_change_from_lag"] = (
(result_pandas["total"] - result_pandas["total_lag"]) * 100 / result_pandas["total_lag"]
)
result_pandas["percent_change_from_lead"] = (
(result_pandas["total"] - result_pandas["total_lead"]) * 100 / result_pandas["total_lead"]
)
end = timer()
print(f"Pandas: read + analytic (lag/lead) took {end - start:.2f} seconds")
print(result_pandas.head(10).to_string(index=False))
Pandas Output.
(pandas_to_pyspark) $ python ex4_pandas.py
Pandas: read + analytic (lag/lead) took 8.99 seconds
order_date total total_lag total_lead percent_change_from_lag percent_change_from_lead
2023-01-01 226036740.71 NaN 226406499.79 NaN -0.163316
2023-01-02 226406499.79 226036740.71 226174879.26 0.163584 0.102408
2023-01-03 226174879.26 226406499.79 226441417.81 -0.102303 -0.117708
2023-01-04 226441417.81 226174879.26 226916194.65 0.117846 -0.209230
2023-01-05 226916194.65 226441417.81 226990804.43 0.209669 -0.032869
2023-01-06 226990804.43 226916194.65 225973424.85 0.032880 0.450221
2023-01-07 225973424.85 226990804.43 227894370.99 -0.448203 -0.842911
2023-01-08 227894370.99 225973424.85 227111347.09 0.850076 0.344775
2023-01-09 227111347.09 227894370.99 226271884.19 -0.343591 0.370997
2023-01-10 226271884.19 227111347.09 226635543.97 -0.369626 -0.160460
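Worth knowing: for the Pandas version, steps 5 and 6 can be collapsed into a single call, since Series.pct_change() computes exactly the shift-and-divide shown above, and periods=-1 gives the lead-based variant. A tiny sketch:

```python
import pandas as pd

s = pd.Series([100.0, 110.0, 99.0])

# pct_change() is shift(1)-based, i.e. the "percent_change_from_lag" above
lag_pct = s.pct_change() * 100
print(lag_pct.round(2).tolist())   # [nan, 10.0, -10.0]

# periods=-1 compares each value with the NEXT row instead,
# matching "percent_change_from_lead"
lead_pct = s.pct_change(periods=-1) * 100
```

The explicit shift() version in the main listing maps more directly onto Spark's lag/lead, which is why I kept it.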
PySpark.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from timeit import default_timer as timer
# Initialize Spark
spark = SparkSession.builder \
    .appName("SparkAnalyticBenchmark") \
    .config("spark.driver.memory", "8g") \
    .master("local[*]") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# Path to the Parquet file
parquet_path = '/mnt/d/sales_data/sales_data_parquet'
start = timer()
# 1. Read the Parquet file
df = spark.read.parquet(parquet_path)
# 2. Normalize order_date
# Spark's to_date is efficient; coalesce handles multiple potential formats if needed
df = df.withColumn("order_date", F.to_date(F.col("order_date")))
# 3. Group by date and aggregate
daily_revenue = df.groupBy("order_date").agg(F.sum("total").alias("total"))
# 4. Define the Window for Analytic functions
# We must order by date for lag/lead to make sense
window_spec = Window.orderBy("order_date")
# 5. Apply Lag and Lead
# lag(col, 1) = previous row; lead(col, 1) = next row
daily_revenue = daily_revenue.withColumn("total_lag", F.lag("total", 1).over(window_spec))
daily_revenue = daily_revenue.withColumn("total_lead", F.lead("total", 1).over(window_spec))
# 6. Calculate Percent Changes
# We use F.when() to handle nulls and avoid division by zero
daily_revenue = daily_revenue.withColumn(
"percent_change_from_lag",
F.when((F.col("total_lag").isNotNull()) & (F.col("total_lag") != 0),
(F.col("total") - F.col("total_lag")) * 100 / F.col("total_lag"))
.otherwise(None)
)
daily_revenue = daily_revenue.withColumn(
"percent_change_from_lead",
F.when((F.col("total_lead").isNotNull()) & (F.col("total_lead") != 0),
(F.col("total") - F.col("total_lead")) * 100 / F.col("total_lead"))
.otherwise(None)
)
# 7. Final Sort and Action
result_spark = daily_revenue.orderBy("order_date")
# Trigger an action to measure performance
row_count = result_spark.count()
end = timer()
print(f"PySpark: read + analytic (lag/lead) took {end - start:.2f} seconds")
print(f"Total days processed: {row_count}")
# Display top 10
result_spark.show(10)
spark.stop()
PySpark Output.
(pandas_to_pyspark) $ spark-submit --driver-memory 8g ex4_spark.py 2> /dev/null
PySpark: read + analytic (lag/lead) took 4.05 seconds
Total days processed: 365
+----------+--------------------+--------------------+--------------------+-----------------------+------------------------+
|order_date| total| total_lag| total_lead|percent_change_from_lag|percent_change_from_lead|
+----------+--------------------+--------------------+--------------------+-----------------------+------------------------+
|2023-01-01| 2.2603674071E8| NULL|2.2640649979000002E8| NULL| -0.16331645970543143|
|2023-01-02|2.2640649979000002E8| 2.2603674071E8| 2.2617487926E8| 0.16358361868011784| 0.10240771687724477|
|2023-01-03| 2.2617487926E8|2.2640649979000002E8|2.2644141781000003E8| -0.1023029507610723| -0.11770750800707579|
|2023-01-04|2.2644141781000003E8| 2.2617487926E8|2.2691619464999998E8| 0.11784622185810545| -0.2092300378702583|
|2023-01-05|2.2691619464999998E8|2.2644141781000003E8|2.2699080442999995E8| 0.20966872782889678| -0.03286907599068832|
|2023-01-06|2.2699080442999995E8|2.2691619464999998E8| 2.259734248499999E8| 0.032879883304517334| 0.45022089684898775|
|2023-01-07| 2.259734248499999E8|2.2699080442999995E8|2.2789437099000004E8| -0.4482029933127909| -0.8429107448575048|
|2023-01-08|2.2789437099000004E8| 2.259734248499999E8|2.2711134708999988E8| 0.8500761278788644| 0.344775331586518|
|2023-01-09|2.2711134708999988E8|2.2789437099000004E8|2.2627188419000003E8| -0.34359071555765364| 0.37099744097899573|
|2023-01-10|2.2627188419000003E8|2.2711134708999988E8|2.2663554396999997E8| -0.3696261374678007| -0.1604601703817825|
+----------+--------------------+--------------------+--------------------+-----------------------+------------------------+
only showing top 10 rows
Summary
In this article, I explained that there are many paths to upgrading your systems if the data you're dealing with starts to encroach on "big data" territory, such that it becomes difficult (or impossible) to process using your existing Pandas code base.
I cited three common alternatives: distributed libraries such as Dask or Ray, moving your data to an RDBMS and interrogating it with SQL, or using the distributed compute engine Spark.
Focusing on the latter, I outlined the case for PySpark, then used four real-world examples of typical data processing tasks for which Pandas is regularly used, together with the equivalent PySpark code for each.
While the timing benchmarks showed some improvement in PySpark run times compared to Pandas, these weren't the primary focus. After all, with even larger datasets, Pandas would simply not be able to process them at all, never mind within a particular timeframe.
Instead, the main aim of this article was to show you how relatively straightforward it is to:
- Get a Spark environment up and running quickly.
- Replicate common Pandas data operations in PySpark, giving you the assurance that big data needn't limit your processing abilities.
By bridging the gap between single-threaded analysis and scalable big-data processing, you can confidently transition your workflows as your data outgrows your local hardware.
