Our project is to build a recommendation system for Netflix users based on their viewing history. We’ll use Apache Kafka to stream each user’s viewing history to a machine learning model that makes recommendations based on what they have watched before.
Here are the steps we’ll follow:
- Install Kafka and ZooKeeper on your machine
- Download and extract the Netflix dataset
- Create a Kafka producer to stream the data
- Create a Kafka consumer to receive the streaming data
- Implement a machine learning algorithm to make recommendations
- Build a Flask web application to display recommendations
Let’s start!
First, we need to install Kafka and ZooKeeper on our local machine. To do this, we can follow the instructions in the Apache Kafka documentation.
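For reference, with a standard Kafka binary download this usually comes down to two commands run from the Kafka directory (paths may differ depending on how you installed it):
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties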
Next, we need to download and extract the Netflix dataset. You can download the dataset from Kaggle.
Once you have downloaded the dataset, extract it to a folder on your local machine.
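As an optional sanity check on the download, you can peek at the file with pandas (this assumes the file is named netflix_titles.csv, as the Kaggle version usually is):

import pandas as pd

df = pd.read_csv('netflix_titles.csv')
print(df.shape)
print(df.columns.tolist())  # the recommender below relies on 'title' and 'description'
print(df[['title', 'description']].head())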
Now we’re ready to create a Kafka producer to stream the data from the dataset. We’ll use the kafka-python library to create the producer.
First, let’s install the library:
!pip install kafka-python
Now let’s write the code for the Kafka producer:
from kafka import KafkaProducer
import json
import time

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

with open('netflix_titles.csv') as f:
    next(f)  # Skip the header row
    for line in f:
        producer.send('netflix', line)
        time.sleep(0.1)  # Add a delay to simulate streaming
This code reads in the Netflix dataset file and sends each line to the Kafka topic ‘netflix’. We also add a delay of 0.1 seconds between messages to simulate streaming.
Next, we need to create a Kafka consumer to receive the streaming data from the producer. We’ll use the kafka-python library again, which we already installed, to create the consumer.
Now let’s write the code for the Kafka consumer:
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'netflix',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
    print(message.value)
This code creates a Kafka consumer that listens to the ‘netflix’ topic and prints each received message to the console.
Now that we’re receiving streaming data from the Kafka producer, we can implement a machine learning algorithm to make recommendations based on the user’s viewing history.
For this project, we’ll use a content-based recommendation system. This means we’ll recommend titles to the user based on their previous viewing history, using the plot description of each title to measure how similar two titles are.
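To make the idea concrete before we build the full pipeline, here is a tiny illustration (the two plot snippets are made up, and it assumes scikit-learn is available, which we install next): descriptions are turned into TF-IDF vectors, and the cosine similarity of those vectors tells us how alike two descriptions are.

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Two made-up plot descriptions
docs = ['a scrappy crew plans one last heist',
        'a crew pulls off a daring heist']
vectors = TfidfVectorizer().fit_transform(docs)
# A value near 1.0 means the descriptions are very similar
print(cosine_similarity(vectors[0], vectors[1]))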
First, let’s install the necessary libraries:
!pip install pandas numpy scikit-learn nltk
Now let’s write the code for the machine learning algorithm:
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

nltk.download('stopwords')
nltk.download('punkt')

# Load the Netflix dataset
netflix_df = pd.read_csv('netflix_titles.csv')

# Replace missing values with empty strings
netflix_df = netflix_df.fillna('')

# Combine the 'title' and 'description' columns into a single column
netflix_df['combined'] = netflix_df['title'] + ' ' + netflix_df['description']

# Tokenize the text and remove stopwords
stop_words = set(stopwords.words('english'))
netflix_df['combined'] = netflix_df['combined'].apply(
    lambda x: ' '.join([word.lower() for word in word_tokenize(x)
                        if word.lower() not in stop_words]))

# Compute TF-IDF vectors for each title
tfidf = TfidfVectorizer()
tfidf_matrix = tfidf.fit_transform(netflix_df['combined'])

# Compute cosine similarity between titles
cosine_sim = cosine_similarity(tfidf_matrix, tfidf_matrix)

# Define a function to get recommendations for a given title
def get_recommendations(title, cosine_sim=cosine_sim, df=netflix_df):
    # Get the index of the title in the dataframe
    idx = df[df['title'] == title].index[0]
    # Pair every title index with its similarity to the given title
    sim_scores = list(enumerate(cosine_sim[idx]))
    # Sort the titles by similarity score
    sim_scores = sorted(sim_scores, key=lambda x: x[1], reverse=True)
    # Take the top 10 most similar titles (index 0 is the title itself)
    sim_scores = sim_scores[1:11]
    # Get the title indices
    title_indices = [i[0] for i in sim_scores]
    # Return the top 10 most similar titles
    return df.iloc[title_indices]['title'].tolist()
This code loads the Netflix dataset and pre-processes the text data by tokenizing it and removing stop words. We then compute TF-IDF vectors for each title and the cosine similarity between every pair of titles. Finally, we define a function that returns recommendations for a given title based on cosine similarity.
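As a quick check, you can call the function with any title that appears in the dataset (the title below is just an example; substitute one from your copy of the data):

print(get_recommendations('Stranger Things'))
# Prints the 10 titles whose descriptions are most similar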
Now that we have a machine learning algorithm to make recommendations, we can build a Flask web application to display the recommendations to the user.
First, let’s install the necessary library:
!pip install flask
Now let’s write the code for the Flask web application:
from flask import Flask, render_template, request

app = Flask(__name__)

# Home page
@app.route('/')
def home():
    return render_template('index.html')

# Recommendation page
@app.route('/recommend', methods=['POST'])
def recommend():
    # Get the user's viewing history
    history = request.form['history']
    # Make recommendations based on the user's viewing history
    recommendations = get_recommendations(history)
    # Render the recommendations page with the recommendations
    return render_template('recommendations.html', recommendations=recommendations)

if __name__ == '__main__':
    app.run(debug=True)
This code defines a Flask web application with two routes: the home page and the recommendation page. The home page displays a form where the user can enter their viewing history. When the user submits the form, the application makes recommendations based on the user’s viewing history using the get_recommendations() function we defined earlier. The recommendations are then displayed on the recommendations page.
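The app expects two templates in a templates/ folder, which aren’t shown above. Here is a minimal sketch of what they might look like (the form field name 'history' must match what the /recommend route reads):

<!-- templates/index.html (minimal sketch) -->
<form action="/recommend" method="post">
  <input type="text" name="history" placeholder="A title you've watched">
  <input type="submit" value="Get recommendations">
</form>

<!-- templates/recommendations.html (minimal sketch) -->
<ul>
  {% for title in recommendations %}
    <li>{{ title }}</li>
  {% endfor %}
</ul>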
Now that we have a working Flask web application, we can connect it to Apache Kafka for real-time message processing using the confluent-kafka library.
First, let’s install the necessary library:
!pip install confluent-kafka
Now let’s write the code to publish messages to a Kafka topic:
from confluent_kafka import Producer

def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

p = Producer({'bootstrap.servers': 'localhost:9092'})

def publish_message(topic_name, value):
    try:
        p.produce(topic_name, value.encode('utf-8'), callback=delivery_report)
        p.flush()
    except Exception as e:
        print('Exception in publishing message')
        print(str(e))
This code defines a Producer object that publishes messages to a Kafka topic using the publish_message() function.
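For example, publishing a user’s viewing history (the title here is just a placeholder) would look like:

publish_message('viewing-history', 'Stranger Things')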
Next, let’s write the code to consume messages from the Kafka topic and make recommendations based on the user’s viewing history:
from confluent_kafka import Consumer, KafkaError

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['viewing-history'])

while True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            print('End of partition reached {}'.format(msg.topic()))
        else:
            print('Error while consuming message: {}'.format(msg.error()))
    else:
        print('Received message: {}'.format(msg.value().decode('utf-8')))
        history = msg.value().decode('utf-8')
        recommendations = get_recommendations(history)
        publish_message('recommendations', ', '.join(recommendations))
This code defines a Consumer object that subscribes to the viewing-history topic and consumes messages. When a message is received, the application makes recommendations based on the user’s viewing history using the get_recommendations() function, and publishes the recommendations to the recommendations topic using the publish_message() function.
Now that we have connected the Flask web application to Apache Kafka, we can test the application by sending messages to the viewing-history topic and receiving recommendations from the recommendations topic.
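If your broker is not configured to auto-create topics, create them first with the kafka-topics tool (one command per topic; adjust partitions and replication to taste):

$ kafka-topics --bootstrap-server localhost:9092 --create --topic viewing-history --partitions 1 --replication-factor 1
$ kafka-topics --bootstrap-server localhost:9092 --create --topic recommendations --partitions 1 --replication-factor 1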
To send messages to the viewing-history topic, we can use the kafka-console-producer command:
$ kafka-console-producer --broker-list localhost:9092 --topic viewing-history
This command opens a console where we can enter messages to send to the viewing-history topic.
To receive recommendations from the recommendations topic, we can use the kafka-console-consumer command:
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic recommendations --from-beginning
This command opens a console where we can see the recommendations sent by the application.
In this tutorial, we built a machine learning project using Apache Kafka and Python to make personalized recommendations to users based on their viewing history.
We began by cleaning and pre-processing the Netflix dataset, then built a machine learning algorithm to compute cosine similarity between titles and make recommendations based on a user’s viewing history.
We then built a Flask web application to display the recommendations to the user, and connected the application to Apache Kafka to enable real-time message processing. We also discussed how to test the application by sending messages to Kafka topics.
This project demonstrates how Apache Kafka can be used to build real-time machine learning applications that handle large volumes of data and make personalized recommendations in real time. The project can be extended to handle more complex datasets and algorithms, and can be deployed on cloud platforms for scalable and reliable performance.
I hope you found this tutorial helpful in building your own machine learning projects using Apache Kafka and Python. Happy coding!