about examining crime trends in your local area. You know that relevant data exists, and you have some basic analytical skills you can use to explore it. However, this data changes frequently, and you want to keep your analysis up to date with the most recent crime incidents without redoing the analysis from scratch. How can we automate this process?
Well, if you've stumbled upon this article, you're in luck! Together, we'll walk through how to create a data pipeline to extract local police log data and connect it to a visualization platform to examine local crime trends over time. For this article, we'll extract data on incidents reported to the Cambridge (MA) Police Department (CPD), and then visualize this data as a dashboard in Metabase.
Moreover, this article can serve as a general template for anyone looking to write ETL pipelines orchestrated in Prefect, and/or anyone who wants to connect Metabase to their data stores to create insightful analyses/reports.
Contents:
Background Knowledge
Before we dive into the pipeline, it'll be helpful to review the following concepts, or keep these links handy as a reference as you read.
Data of Interest
The data we'll be working with contains a set of police log entries, where each entry is a single incident reported to/by the CPD. Each entry contains comprehensive information describing the incident, including but not limited to:
- Date & time of the incident
- Type of incident that occurred
- The street where the incident took place
- A plaintext description of what happened

Check out the portal for more information about the data.
For monitoring crime trends in Cambridge, MA, creating a data pipeline to extract this data is suitable, as the data is updated daily (according to their website). If the data were updated less frequently (e.g. annually), then creating a data pipeline to automate this process wouldn't save us much effort. We could simply revisit the data portal at the end of each year, download the .csv, and complete our analysis.
Now that we’ve found the suitable dataset, let’s walk through the implementation.
ETL Pipeline
To go from raw CPD log data to a Metabase dashboard, our project will consist of the following major steps:
- Extracting the data using its corresponding API.
- Transforming it to prepare it for storage.
- Loading it into a PostgreSQL database.
- Visualizing the data in Metabase.
The data flow of our system will look like the following:

Our pipeline follows an ETL workflow, which means that we'll transform the data before importing it into PostgreSQL. This requires loading data into memory while executing data transformations, which may be problematic for large datasets that are too big to fit in memory. In that case, we may consider an ELT workflow, where we transform the data in the same infrastructure where it's stored. Since our dataset is small (<10k rows), this shouldn't be an issue, and we'll take advantage of the fact that pandas makes data transformation easy.
ETL: Extract
We'll extract the CPD log data by requesting the dataset from the Socrata Open Data API. We'll use sodapy — a Python client for the API — to make the request.
We’ll encapsulate this extraction code in its own file — extract.py.
import pandas as pd
from sodapy import Socrata
from dotenv import load_dotenv
import os
from prefect import task

@task(retries=3, retry_delay_seconds=[10, 10, 10])  # retry the API request in case of failure
def extract_data():
    '''
    Extract incident data reported to the Cambridge Police Department using the Socrata Open Data API.
    Return the incident data as a pandas DataFrame.
    '''
    # fetch the Socrata app token from .env
    # include this app token when interacting with the Socrata API to avoid request throttling, so we can fetch all of the incidents
    load_dotenv()
    APP_TOKEN = os.getenv("SOCRATA_APP_TOKEN")
    # create a Socrata client to interact with the Socrata API (https://github.com/afeld/sodapy)
    client = Socrata(
        "data.cambridgema.gov",
        APP_TOKEN,
        timeout=30  # increase the timeout from the 10s default - sometimes it takes longer to fetch all of the results
    )
    # fetch all data, paginating over results
    DATASET_ID = "3gki-wyrb"  # unique identifier for the Cambridge Daily Police Log data (https://data.cambridgema.gov/Public-Safety/Daily-Police-Log/3gki-wyrb/about_data)
    results = client.get_all(DATASET_ID)
    # convert to a pandas DataFrame
    results_df = pd.DataFrame.from_records(results)
    return results_df
Notes about the code:
- Socrata throttles requests if you don't include an app token that uniquely identifies your application. To fetch all of the results, we'll include this token in our request and store it in a .env file to keep it out of our source code.
- We'll specify a 30-second timeout (instead of the 10-second default) when making our request to the Socrata API. From experience using the API, fetching all of the results can sometimes take longer than 10 seconds, and 30 seconds was typically enough to avoid timeout errors.
- We'll load the fetched results into a pandas DataFrame, since we'll validate and transform this data using pandas.
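If you want to sanity-check the extraction before wiring it into the full pipeline, a quick preview request can help. The snippet below is a minimal sketch (assuming the same .env setup and dataset ID as above, and the date_time column name used later in this article) that fetches a handful of records with client.get instead of client.get_all:
import os
import pandas as pd
from dotenv import load_dotenv
from sodapy import Socrata

load_dotenv()
client = Socrata("data.cambridgema.gov", os.getenv("SOCRATA_APP_TOKEN"), timeout=30)
# fetch only the 5 most recent incidents as a quick preview
preview = client.get("3gki-wyrb", limit=5, order="date_time DESC")
print(pd.DataFrame.from_records(preview))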
ETL: Validate
Now, we'll do some basic data quality checks on the data.
The data is already fairly clean (which makes sense, since it's provided by the Cambridge Police Department). So, our data quality checks will act more as a "sanity check" that we didn't ingest anything unexpected.
We'll validate the following:
- All of the expected columns (as specified here) are present.
- All IDs are numeric.
- Datetimes follow ISO 8601 format.
- There are no missing values in columns that should contain data. Specifically, each incident must have a Datetime, ID, Type, and Location.
We’ll put this validation code in its own file — validate.py.
from datetime import datetime
from collections import Counter
import pandas as pd
from prefect import task

### UTILITIES

def check_valid_schema(df):
    '''
    Check whether the DataFrame contains the expected columns for the Cambridge Police dataset.
    Otherwise, raise an error.
    '''
    SCHEMA_COLS = ['date_time', 'id', 'type', 'subtype', 'location', 'last_updated', 'description']
    if Counter(df.columns) != Counter(SCHEMA_COLS):
        raise ValueError("Schema does not match the expected schema.")

def check_numeric_id(df):
    '''
    Convert 'id' values to numeric.
    If any 'id' values are non-numeric, replace them with NaN so they can be removed downstream in the data transformations.
    '''
    df['id'] = pd.to_numeric(df['id'], errors='coerce')
    return df

def verify_datetime(df):
    '''
    Verify that 'date_time' values follow ISO 8601 format (https://www.iso.org/iso-8601-date-and-time-format.html).
    Raise a ValueError if any of the 'date_time' values are invalid.
    '''
    df.apply(lambda row: datetime.fromisoformat(row['date_time']), axis=1)

def check_missing_values(df):
    '''
    Check whether there are any missing values in columns that require data.
    For police logs, each incident must have a datetime, ID, incident type, and location.
    '''
    REQUIRED_COLS = ['date_time', 'id', 'type', 'location']
    for col in REQUIRED_COLS:
        if df[col].isnull().sum() > 0:
            raise ValueError(f"Missing values are present in the '{col}' attribute.")

### VALIDATION LOGIC

@task
def validate_data(df):
    '''
    Check that the data satisfies the following data quality checks:
    - schema is valid
    - IDs are numeric
    - datetimes follow ISO 8601 format
    - no missing values in columns that require data
    '''
    check_valid_schema(df)
    df = check_numeric_id(df)
    verify_datetime(df)
    check_missing_values(df)
    return df
When implementing these data quality checks, it's important to consider how to handle checks that fail.
- Do we want our pipeline to fail loudly (e.g. raise an error/crash)?
- Should our pipeline handle failures silently? For instance, should it mark data identified as invalid so that it can be removed downstream?
We’ll raise an error if:
- The ingested data doesn't follow the expected schema. It doesn't make sense to process the data if it doesn't contain what we expect.
- Datetimes don't follow ISO 8601 format. There's no standard way to convert an incorrect datetime value to its correct counterpart.
- An incident contains missing values for any one of datetime, ID, type, and location. Without these values, the incident can't be described comprehensively.
For records that have non-numeric IDs, we'll fill the IDs with NaN placeholders and then remove those records downstream in the transformation step. These records don't break our analysis if we simply remove them.
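If you'd like those silent failures to at least be visible, one lightweight option is to log how many rows were flagged before they're removed. The sketch below is a hypothetical variation on check_numeric_id (the helper name report_invalid_ids is ours, not part of the pipeline above); since our flow will use log_prints=True, the message would show up in the Prefect logs:
import pandas as pd

def report_invalid_ids(df):
    '''
    Coerce 'id' to numeric and report how many rows were flagged as invalid,
    so the silent failure is visible in the logs.
    '''
    coerced = pd.to_numeric(df['id'], errors='coerce')
    n_invalid = int(coerced.isna().sum()) - int(df['id'].isna().sum())
    if n_invalid > 0:
        print(f"{n_invalid} rows have non-numeric IDs and will be dropped downstream.")
    df['id'] = coerced
    return df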
ETL: Transform
Now, we'll do some transformations on our data to prepare it for storage in PostgreSQL.
We'll apply the following transformations:
- Remove duplicate rows — we'll use the 'ID' column to identify duplicates.
- Remove invalid rows — some of the rows that failed the data quality checks were marked with a NaN 'ID', so we'll remove these.
- Split the datetime column into separate year, month, day, and time columns. In our final analysis, we may want to analyze crime trends across these different time intervals, so we'll create these additional columns here to simplify our queries downstream.
We’ll put this transformation code in its own file — transform.py.
import pandas as pd
from prefect import task

### UTILITIES

def remove_duplicates(df):
    '''
    Remove duplicate rows from the dataframe based on the 'id' column. Keep the first occurrence.
    '''
    return df.drop_duplicates(subset=["id"], keep='first')

def remove_invalid_rows(df):
    '''
    Remove rows where the 'id' is NaN, as these IDs were identified as non-numeric.
    '''
    return df.dropna(subset='id')

def split_datetime(df):
    '''
    Split the date_time column into separate year, month, day, and time columns.
    '''
    # convert to datetime
    df['date_time'] = pd.to_datetime(df['date_time'])
    # extract year/month/day/time
    df['year'] = df['date_time'].dt.year
    df['month'] = df['date_time'].dt.month
    df['day'] = df['date_time'].dt.day
    df['hour'] = df['date_time'].dt.hour
    df['minute'] = df['date_time'].dt.minute
    df['second'] = df['date_time'].dt.second
    return df

### TRANSFORMATION LOGIC

@task
def transform_data(df):
    '''
    Apply the following transformations to the passed-in dataframe:
    - deduplicate records (keep the first occurrence)
    - remove invalid rows
    - split datetime into year, month, day, and time columns
    '''
    df = remove_duplicates(df)
    df = remove_invalid_rows(df)
    df = split_datetime(df)
    return df
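As a quick illustration of what split_datetime produces, here's a toy example on a single made-up record (the values are illustrative only, not real CPD data):
import pandas as pd
from transform import split_datetime

# one made-up incident, purely for illustration
toy = pd.DataFrame({'id': [1], 'date_time': ['2025-01-15T14:30:00']})
print(split_datetime(toy)[['year', 'month', 'day', 'hour', 'minute', 'second']])
# expect: year=2025, month=1, day=15, hour=14, minute=30, second=0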
ETL: Load
Now our data is ready to import into PostgreSQL.
Before we can import our data, we need to create our PostgreSQL instance. We'll create one locally using a compose file. This file allows us to define & configure all of the services that our application needs.
services:
  postgres_cpd: # postgres instance for the CPD ETL pipeline
    image: postgres:16
    container_name: postgres_cpd_dev
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: cpd_db
    ports:
      - "5433:5432" # Postgres is already on port 5432 on my local machine
    volumes:
      - pgdata_cpd:/var/lib/postgresql/data
    restart: unless-stopped
  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin_dev
    environment:
      PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL}
      PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD}
    ports:
      - "8081:80"
    depends_on: # don't start pgadmin until our postgres instance is running
      - postgres_cpd
volumes:
  pgdata_cpd: # all data for our postgres_cpd service will be stored here
There are two main services defined here:
- postgres_cpd — This is our PostgreSQL instance, where we'll store our data.
- pgadmin — a DB admin platform that provides a GUI we can use to query data in our PostgreSQL database. Not functionally required, but useful for inspecting the data in our database. For more information on connecting to your PostgreSQL database in pgAdmin, click here.
Let's highlight some important configuration for our postgres_cpd service:
- container_name: postgres_cpd_dev -> Our service will run in a container (i.e. an isolated process) named postgres_cpd_dev. Docker generates random container names if you don't specify this, so assigning a name makes it more straightforward to interact with the container.
- environment: -> We create a Postgres user from credentials stored in our .env file. Additionally, we create a default database, cpd_db.
- ports: -> Our PostgreSQL service will listen on port 5432 inside the container. However, we'll map port 5433 on the host machine to port 5432 in the container, allowing us to connect to PostgreSQL from our host machine via port 5433 (a quick connectivity check is sketched after this list).
- volumes: -> Our service will store all its data (e.g. configuration, data files) under the following directory inside the container: /var/lib/postgresql/data. We'll mount this container directory to a named Docker volume stored on our local machine, pgdata_cpd. This allows us to persist the database data beyond the lifetime of the container.
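To confirm the port mapping works before running the full load step, you can try a minimal connection check from the host. This sketch assumes the same .env credentials used in the compose file and that the postgres_cpd container is running:
import os
import psycopg2
from dotenv import load_dotenv

load_dotenv()
# connect from the host machine via the mapped port 5433
conn = psycopg2.connect(
    host="localhost",
    port="5433",
    database="cpd_db",
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD")
)
print(conn.server_version)  # prints the server version number if the connection succeeded
conn.close()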
Now that we've created our PostgreSQL instance, we can execute queries against it. Importing our data into PostgreSQL requires executing two queries against the database:
- Creating the table that will store the data.
- Loading our transformed data into that table.
Every time we execute a query against our PostgreSQL instance, we need to do the following:
- Establish our connection to PostgreSQL.
- Execute the query.
- Commit the changes & close the connection.
We'll put this loading code in its own file — load.py.
from prefect import task
from sqlalchemy import create_engine
import psycopg2
from dotenv import load_dotenv
import os

# read content from .env, which contains our Postgres credentials
load_dotenv()

def create_postgres_table():
    '''
    Create the cpd_incidents table in the Postgres DB (cpd_db) if it doesn't exist.
    '''
    # establish a connection to the DB
    conn = psycopg2.connect(
        host="localhost",
        port="5433",
        database="cpd_db",
        user=os.getenv("POSTGRES_USER"),
        password=os.getenv("POSTGRES_PASSWORD")
    )
    # create a cursor object to execute SQL
    cur = conn.cursor()
    # execute the query to create the table
    create_table_query = '''
        CREATE TABLE IF NOT EXISTS cpd_incidents (
            date_time TIMESTAMP,
            id INTEGER PRIMARY KEY,
            type TEXT,
            subtype TEXT,
            location TEXT,
            description TEXT,
            last_updated TIMESTAMP,
            year INTEGER,
            month INTEGER,
            day INTEGER,
            hour INTEGER,
            minute INTEGER,
            second INTEGER
        )
    '''
    cur.execute(create_table_query)
    # commit changes
    conn.commit()
    # close cursor and connection
    cur.close()
    conn.close()

@task
def load_into_postgres(df):
    '''
    Loads the transformed data passed in as a DataFrame
    into the 'cpd_incidents' table in our Postgres instance.
    '''
    # create the table to insert data into, as needed
    create_postgres_table()
    # create an Engine object to connect to the DB
    engine = create_engine(f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@localhost:5433/cpd_db")
    # insert data into the 'cpd_incidents' table in the Postgres DB
    df.to_sql('cpd_incidents', engine, if_exists='replace')
Things to note about the code above:
- Similar to how we fetched our app token for extracting our data, we'll fetch our Postgres credentials from a .env file.
- To load the DataFrame containing our transformed data into Postgres, we'll use pandas.DataFrame.to_sql(). It's a straightforward way to insert DataFrame data into any database supported by SQLAlchemy (see the note and sketch after this list).
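Note that if_exists='replace' drops and recreates the table on every run, which is fine for a dataset this small. If you'd rather append only incidents that aren't already stored, one hypothetical alternative looks like the sketch below (append_new_incidents is our own illustrative helper, not part of the repo):
import pandas as pd
from sqlalchemy import text

def append_new_incidents(df, engine):
    '''
    Insert only the rows whose 'id' is not already in cpd_incidents,
    instead of replacing the whole table.
    '''
    with engine.connect() as conn:
        existing_ids = pd.read_sql(text("SELECT id FROM cpd_incidents"), conn)['id']
    new_rows = df[~df['id'].isin(existing_ids)]
    new_rows.to_sql('cpd_incidents', engine, if_exists='append', index=False)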
Defining the Data Pipeline
We've implemented the individual components of the ETL process. Now, we're ready to encapsulate these components into a pipeline.
There are many tools available for orchestrating pipelines defined in Python. Two popular options are Apache Airflow and Prefect.
For its simplicity, we'll define our pipeline using Prefect. We need to do the following to get started:
- Install Prefect in our development environment.
- Get access to a Prefect API server. Since we don't want to manage our own infrastructure to run Prefect, we'll sign up for Prefect Cloud.
For more information on Prefect setup, check out the docs.
Next, we must add the following decorators to our code:
- @task -> Add this to each function that implements a component of our ETL pipeline (i.e. our extract, validate, transform, and load functions).
- @flow -> Add this decorator to the function that encapsulates the ETL components into an executable pipeline.
If you look back at our extract, validate, transform, and load code, you'll see that we've already added the @task decorator to those functions.
Now, let's define the ETL pipeline that executes these tasks. We'll put the following in a separate file — etl_pipeline.py.
from extract import extract_data
from validate import validate_data
from transform import transform_data
from load import load_into_postgres
from prefect import flow

@flow(name="cpd_incident_etl", log_prints=True)  # our pipeline will appear as 'cpd_incident_etl' in the Prefect UI; all print output will be displayed in Prefect
def etl():
    '''
    Execute the ETL pipeline:
    - Extract CPD incident data from the Socrata API
    - Validate and transform the extracted data to prepare it for storage
    - Import the transformed data into Postgres
    '''
    print("Extracting data...")
    extracted_df = extract_data()
    print("Performing data quality checks...")
    validated_df = validate_data(extracted_df)
    print("Performing data transformations...")
    transformed_df = transform_data(validated_df)
    print("Importing data into Postgres...")
    load_into_postgres(transformed_df)
    print("ETL complete!")

if __name__ == "__main__":
    # CPD data is expected to be updated daily (https://data.cambridgema.gov/Public-Safety/Daily-Police-Log/3gki-wyrb/about_data)
    # Thus, we'll execute our pipeline daily (at midnight)
    etl.serve(name="cpd-pipeline-deployment", cron="0 0 * * *")
Things to note about the code:
- @flow(name="cpd_incident_etl", log_prints=True) -> this names our pipeline "cpd_incident_etl", which will be reflected in the Prefect UI. The output of all our print statements will be logged in Prefect.
- etl.serve(name="cpd-pipeline-deployment", cron="0 0 * * *") -> this creates a deployment of our pipeline, named "cpd-pipeline-deployment", that runs every day at midnight.
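Before scheduling the deployment, it can be handy to run the flow once by hand to confirm everything works end to end. A function decorated with @flow can simply be called directly; here's a sketch of a one-off local run:
from etl_pipeline import etl

# run the flow once, immediately, instead of serving it on a schedule
# (useful while developing; the run is still tracked by Prefect)
if __name__ == "__main__":
    etl()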


Visualizing the Data in Metabase
Now that we've created our pipeline to load our data into PostgreSQL, it's time to visualize it.
There are many approaches we could take to visualize our data. Some notable options include Metabase and Apache Superset.
Both are good options. Without going into too much detail about each BI tool, we'll use Metabase to make a simple dashboard.
- Metabase is an open-source BI and embedded analytics tool that makes data visualization and analysis easy.
- Connecting Metabase to our data sources and deploying it is simple compared to other BI tools (e.g. Apache Superset).
In the future, if we want more customization over our visuals/reports, we can consider using other tools. For now, Metabase will do for creating a POC.
Metabase lets you choose between using its cloud version or managing a self-hosted instance. Metabase Cloud offers several payment plans, but you can create a self-hosted instance of Metabase for free using Docker. We'll define our Metabase instance in our compose file.
- Since we're self-hosting, we also have to define the Metabase application database, which contains the metadata that Metabase needs to query your data sources (in our case, postgres_cpd).
services:
  postgres_cpd: # postgres instance for the CPD ETL pipeline
    image: postgres:16
    container_name: postgres_cpd_dev
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: cpd_db
    ports:
      - "5433:5432" # Postgres is already on port 5432 on my local machine
    volumes:
      - pgdata_cpd:/var/lib/postgresql/data
    restart: unless-stopped
    networks:
      - metanet1
  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin_dev
    environment:
      PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL}
      PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD}
    ports:
      - "8081:80"
    depends_on:
      - postgres_cpd
    networks:
      - metanet1
  metabase: # taken from https://www.metabase.com/docs/latest/installation-and-operation/running-metabase-on-docker
    image: metabase/metabase:latest
    container_name: metabase
    hostname: metabase
    volumes:
      - /dev/urandom:/dev/random:ro
    ports:
      - "3000:3000"
    environment:
      MB_DB_TYPE: postgres
      MB_DB_DBNAME: metabaseappdb
      MB_DB_PORT: 5432
      MB_DB_USER: ${METABASE_DB_USER}
      MB_DB_PASS: ${METABASE_DB_PASSWORD}
      MB_DB_HOST: postgres_metabase # must match the container name of postgres_mb (the Metabase Postgres instance)
    networks:
      - metanet1
    healthcheck:
      test: curl --fail -I http://localhost:3000/api/health || exit 1
      interval: 15s
      timeout: 5s
      retries: 5
  postgres_mb: # postgres instance for managing the Metabase instance
    image: postgres:16
    container_name: postgres_metabase # other services must use this name to communicate with this container
    hostname: postgres_metabase # internal identifier; doesn't impact communication with other services (helpful for logs)
    environment:
      POSTGRES_USER: ${METABASE_DB_USER}
      POSTGRES_DB: metabaseappdb
      POSTGRES_PASSWORD: ${METABASE_DB_PASSWORD}
    ports:
      - "5434:5432"
    volumes:
      - pgdata_mb:/var/lib/postgresql/data
    networks:
      - metanet1
# Here, we define separate volumes to isolate DB configuration & data files for each Postgres database.
# Our application's Postgres DB should store its config/data separately from the Postgres DB our Metabase service relies on.
volumes:
  pgdata_cpd:
  pgdata_mb:
# define the network over which all of the services will communicate
networks:
  metanet1:
    driver: bridge # 'bridge' is the default driver - services will be able to communicate with each other using their service names
To create our Metabase instance, we made the following changes to our compose file:
- Added two services: metabase (our Metabase instance) and postgres_mb (our Metabase instance’s application database).
- Defined an additional volume, pgdata_mb. This will store the data for the Metabase application database (postgres_mb).
- Defined the network over which the services will communicate, metanet1.
Without going into too much detail, let's break down the metabase and postgres_mb services.
Our Metabase instance (metabase):
- This service will be exposed on port 3000 on the host machine and inside the container. If we're running this service on our local machine, we'll be able to access it at localhost:3000.
- We connect Metabase to its application database by ensuring that the MB_DB_HOST, MB_DB_PORT, and MB_DB_DBNAME environment variables match the container name, port, and database name listed under the postgres_mb service.
For more information on how to run Metabase in Docker, check out the docs.
After setting up Metabase, you'll be prompted to connect Metabase to your data source.

After choosing a PostgreSQL data source, we can specify the following connection string to connect Metabase to our PostgreSQL instance, substituting your credentials as needed:
postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@postgres_cpd:5432/cpd_db

After establishing the connection, we can create our dashboard. You can create a wide range of visuals in Metabase, so we won't go into the specifics here.
Let's revisit the example dashboard that we displayed at the beginning of this post. This dashboard nicely summarizes recent and historical trends in reported CPD incidents.

From this dashboard, we can see the following:
- Most incidents are reported to the CPD in the mid-to-late afternoon.
- The vast majority of reported incidents are of the "INCIDENT" type.
- The number of reported incidents peaked around August-October of 2025 and has been decreasing steadily since.
Luckily for us, Metabase will query our database whenever we load this dashboard, so we won't have to worry about the dashboard displaying stale data.
Check out the Git repo here if you want to dive deeper into the implementation.
Wrap-up and Future Work
Thanks for reading! Let’s briefly recap what we built:
- We built a data pipeline to extract, transform, and load Cambridge Police Log data into a self-hosted PostgreSQL database.
- We deployed this pipeline using Prefect and scheduled it to run daily.
- We created a self-hosted instance of Metabase, connected it to our PostgreSQL database, and created a dashboard to visualise recent and historical crime trends in Cambridge, MA.
There are many ways to build upon this project, including but not limited to:
- Creating additional visualizations (e.g. a geospatial heatmap) to visualize crime frequencies in different areas within Cambridge. This would require transforming our street location data into latitude/longitude coordinates.
- Deploying our self-hosted pipeline and services off of our local machine.
- Joining this data with other datasets for insightful cross-domain analysis. For instance, we could join this dataset to demographic/census data (using street location) to see whether areas of different demographic makeup within Cambridge have different incident rates.
If you have any other ideas for how to extend this project, or you would've built things differently, I'd love to hear about it in the comments!
Sources & GitHub
Prefect:
Metabase:
Docker:
GitHub Repo:
CPD Daily Police Log Dataset:
