Training a Machine Learning Model on a Kafka Stream
Running Kafka with Docker
A Kafka producer for training data
A Kafka consumer for training an ML model
Conclusion

Updating a machine learning model online and in near real-time using training data generated by a Kafka producer

Photo by Jonathan Borba on Unsplash

Recently, I’ve become increasingly interested in online machine learning: the ability to update an ML model’s weights in a production setting. Besides the topic providing fun architectural challenges for me, the approach boasts massive potential gains. This study from Grubhub in 2021 demonstrated a +20% increase in metrics and 45x cost savings by leveraging online learning, and I’m all about saving money to make money.

Stateful retraining (image by Chip Huyen, used with permission)
Online learning (image by author)

From a practical perspective, however, working with data streams and streaming architectures is still pretty new to ML practitioners. Creating a real-time stream of training data aside, there are relatively few resources on consuming such a data source to update a model in an online setting. In this article, I’ll demonstrate:

  • Setting up a Kafka instance
  • Creating a producer that generates training data
  • Creating a consumer that uses that training data to update an ML model

My preferred approach to working with Kafka locally is via docker-compose. If it is not already installed in your environment, you can follow the instructions here.

Shuyi Yang’s article on the subject provides a high-level overview of this approach, and we will use an identical docker-compose.yaml file that creates local Kafka and Zookeeper instances and exposes Kafka on port 9092:

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    depends_on:
      - zookeeper
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "ml_training_data:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

It also creates a Kafka topic called ml_training_data that we’ll use later. You can run the file by changing into the directory that contains the file above and running:

docker-compose up
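
Before moving on, it can help to confirm that something is actually listening on the exposed port. The snippet below is a minimal sketch using only the Python standard library; the helper name port_is_open is hypothetical, and an open TCP port only suggests the broker container is up, not that Kafka has finished starting or that the topic exists.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures
        return False


if __name__ == "__main__":
    # 9092 is the port exposed for Kafka in the docker-compose file above
    status = "reachable" if port_is_open("localhost", 9092) else "not reachable"
    print(f"Kafka port 9092 is {status}")
```

If the port is reported as not reachable, checking the docker-compose logs for the kafka service is a reasonable next step.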

First, let’s install the Python libraries we’ll need:

python -m pip install kafka-python river  

Next, we need to create a synthetic source of training data that’s written to our Kafka topic. For this, we’ll use the River Python library, which has easy-to-use APIs for streaming data:

from time import sleep
from json import dumps
import random

from river import datasets
from kafka import KafkaProducer

# Create a Kafka producer that connects to Kafka on port 9092
# and serializes each message as UTF-8 encoded JSON
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda x: dumps(x).encode("utf-8"),
)

# Initialize the River phishing dataset.
# This dataset contains features from web pages
# that are classified as phishing or not.
dataset = datasets.Phishing()

# Send observations to the Kafka topic one at a time with a random sleep
for x, y in dataset:
    print(f"Sending: {x, y}")
    data = {"x": x, "y": y}
    producer.send("ml_training_data", value=data)
    sleep(random.random())

# send() is asynchronous, so wait for buffered messages before exiting
producer.flush()

The code above uses the toy River Phishing dataset (CC BY 4.0) and sends labeled observations to our Kafka topic one at a time. This dataset contains features from web pages that are classified as phishing or not. The samples in the dataset are tuples that look like this:


[({'empty_server_form_handler': 0.0,
   'popup_window': 0.0,
   'https': 0.0,
   'request_from_other_domain': 0.0,
   'anchor_from_other_domain': 0.0,
   'is_popular': 0.5,
   'long_url': 1.0,
   'age_of_domain': 1,
   'ip_in_url': 1},
  True),
 ({'empty_server_form_handler': 1.0,
   'popup_window': 0.0,
   'https': 0.5,
   'request_from_other_domain': 0.5,
   'anchor_from_other_domain': 0.0,
   'is_popular': 0.5,
   'long_url': 0.0,
   'age_of_domain': 1,
   'ip_in_url': 0},
  True)]
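
To make the wire format concrete, here is a small sketch of what happens to one such sample on its way through Kafka. It uses only the standard library, so no broker is needed. The serialize function mirrors the value_serializer lambda in the producer; deserialize is the assumed inverse that a consumer would apply. The shortened feature dict is illustrative, not a real row from the dataset.

```python
from json import dumps, loads


def serialize(value) -> bytes:
    """Same logic as the producer's value_serializer: object -> JSON -> UTF-8 bytes."""
    return dumps(value).encode("utf-8")


def deserialize(raw: bytes):
    """Inverse step for the consuming side: UTF-8 bytes -> JSON -> object."""
    return loads(raw.decode("utf-8"))


# Illustrative observation shaped like the samples above (features shortened)
x = {"empty_server_form_handler": 0.0, "https": 0.5, "ip_in_url": 1}
y = True

payload = serialize({"x": x, "y": y})  # what is written to the topic
event = deserialize(payload)           # what a consumer gets back

print(payload)
print(event == {"x": x, "y": y})
```

Because the features are plain floats and ints and the label is a boolean, the sample survives the JSON round trip unchanged.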

First, run the producer:

python producer.py

You should then see the following in the console:


Sending: ({'empty_server_form_handler': 1.0, 'popup_window': 0.5, 'https': 1.0, 'request_from_other_domain': 1.0, 'anchor_from_other_domain': 0.5, 'is_popular': 0.5, 'long_url': 0.0, 'age_of_domain': 1, 'ip_in_url': 1}, False)
Sending: ({'empty_server_form_handler': 0.0, 'popup_window': 0.5, 'https': 0.0, 'request_from_other_domain': 0.0, 'anchor_from_other_domain': 0.0, 'is_popular': 0.5, 'long_url': 0.0, 'age_of_domain': 1, 'ip_in_url': 0}, True)
Sending: ({'empty_server_form_handler': 1.0, 'popup_window': 1.0, 'https': 1.0, 'request_from_other_domain': 0.0, 'anchor_from_other_domain': 1.0, 'is_popular': 0.0, 'long_url': 0.5, 'age_of_domain': 1, 'ip_in_url': 0}, False)
Sending: ({'empty_server_form_handler': 0.5, 'popup_window': 0.0, 'https': 0.0, 'request_from_other_domain': 0.5, 'anchor_from_other_domain': 1.0, 'is_popular': 0.5, 'long_url': 1.0, 'age_of_domain': 0, 'ip_in_url': 0}, True)
Sending: ({'empty_server_form_handler': 0.0, 'popup_window': 0.0, 'https': 1.0, 'request_from_other_domain': 1.0, 'anchor_from_other_domain': 0.0, 'is_popular': 1.0, 'long_url': 0.0, 'age_of_domain': 0, 'ip_in_url': 0}, True)
Sending: ({'empty_server_form_handler': 1.0, 'popup_window': 1.0, 'https': 1.0, 'request_from_other_domain': 0.5, 'anchor_from_other_domain': 0.0, 'is_popular': 1.0, 'long_url': 1.0, 'age_of_domain': 0, 'ip_in_url': 0}, False)

Writing a simple Kafka consumer will allow us to read the data from the stream as it comes in and use it to update the weights of our model.


from json import loads

from kafka import KafkaConsumer

from river import linear_model
from river import compose
from river import preprocessing
from river import metrics

# Use ROC AUC as the metric for evaluation
metric = metrics.ROCAUC()

# Create a simple logistic regression model with a scaler
model = compose.Pipeline(
    preprocessing.StandardScaler(), linear_model.LogisticRegression()
)

# Create our Kafka consumer
consumer = KafkaConsumer(
    "ml_training_data",
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    group_id="my-group-id",
    value_deserializer=lambda x: loads(x.decode("utf-8")),
)

# Use each event to update our model and print the metric
for event in consumer:
    event_data = event.value
    try:
        x = event_data["x"]
        y = event_data["y"]
        # Predict before learning so the metric reflects unseen data
        y_pred = model.predict_proba_one(x)
        model.learn_one(x, y)
        metric.update(y, y_pred)
        print(metric)
    except Exception as exc:
        # A bare except would also swallow Ctrl+C; catch only regular errors
        print(f"Processing bad data... ({exc!r})")

The code above initializes a simple ML model using River’s LogisticRegression class. Then, we continuously process events and use them to update our ML model, printing the ROCAUC metric for every sample added.
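
To build intuition for what a predict-then-learn loop does, here is a conceptual sketch in pure Python. It is not River’s implementation: the OnlineLogisticRegression class, the learning rate, and the synthetic stream are all assumptions made for illustration, and unlike the pipeline above it does no feature scaling. It keeps the same order as the consumer, predicting first and learning second, so every prediction is scored on an observation the model has not yet seen.

```python
import math
import random


class OnlineLogisticRegression:
    """Toy logistic regression that takes one SGD step per observation."""

    def __init__(self, lr=0.1):
        self.lr = lr
        self.weights = {}  # feature name -> weight, created lazily
        self.bias = 0.0

    def predict_proba_one(self, x):
        z = self.bias + sum(self.weights.get(k, 0.0) * v for k, v in x.items())
        z = max(-30.0, min(30.0, z))  # clamp so exp() stays well-behaved
        return 1.0 / (1.0 + math.exp(-z))

    def learn_one(self, x, y):
        # Gradient of the log loss with respect to the linear output z
        error = self.predict_proba_one(x) - float(y)
        for k, v in x.items():
            self.weights[k] = self.weights.get(k, 0.0) - self.lr * error * v
        self.bias -= self.lr * error


rng = random.Random(42)
model = OnlineLogisticRegression()
correct = 0
n = 2000
for _ in range(n):
    # Hypothetical stream: the label is True when feature "a" exceeds "b"
    x = {"a": rng.random(), "b": rng.random()}
    y = x["a"] > x["b"]
    p = model.predict_proba_one(x)  # predict first...
    correct += (p > 0.5) == y
    model.learn_one(x, y)           # ...then learn from the same observation

print(f"progressive accuracy: {correct / n:.2%}")
```

The model is never retrained from scratch. Each event nudges the existing weights, which is what makes this approach a natural fit for a stream.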

To begin training, run:

python consumer.py

You should see something like the following in the console as the model learns observation by observation!

ROCAUC: 87.12%
ROCAUC: 87.29%
ROCAUC: 87.42%
ROCAUC: 87.29%
ROCAUC: 87.42%
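
For readers less familiar with the metric, ROC AUC can be read as the probability that a randomly chosen positive example is scored higher than a randomly chosen negative one. The sketch below computes that pairwise definition directly on a handful of made-up labels and scores. It is for intuition only; River’s streaming ROCAUC is computed incrementally and may not match this exact calculation.

```python
def roc_auc(y_true, y_score):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    pos = [s for y, s in zip(y_true, y_score) if y]
    neg = [s for y, s in zip(y_true, y_score) if not y]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = sum((p > q) + 0.5 * (p == q) for p in pos for q in neg)
    return wins / (len(pos) * len(neg))


# Made-up labels and predicted probabilities of the positive class
labels = [True, False, True, False, True]
scores = [0.9, 0.3, 0.6, 0.7, 0.8]

print(f"ROCAUC: {roc_auc(labels, scores):.2%}")  # ROCAUC: 83.33%
```

Five of the six positive/negative pairs are ordered correctly here, which gives 5/6. A value of 50% would mean the scores are no better than random ranking.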

Both continuous training and online learning have huge potential in areas where real-time or near real-time labeled data can be made available to models making real-time decisions. All code and instructions can be found in this GitHub repo. More to come soon!
