Implementing a Machine Learning-Based Netflix Recommendation System using Apache Kafka and Python Step 1: Install Kafka and ZooKeeper on your machine Step 2: Download and extract the Netflix dataset Step 3: Create a Kafka producer to stream the data Step 4: Create a Kafka consumer to receive the streaming data Step 5: Implement a machine learning algorithm to make recommendations Step 6: Build a Flask web application to display recommendations Step 7: Connect the Flask web application to Apache Kafka Step 8: Test the application That’s it!


!pip install kafka-python
from kafka import KafkaProducer
import json
import time

# Producer that JSON-encodes every value so it matches the consumer's
# json.loads(x.decode('utf-8')) deserializer.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),
)

# The dataset contains non-ASCII titles, so do not rely on the platform's
# default encoding when reading it.
with open('netflix_titles.csv', encoding='utf-8') as f:
    next(f)  # Skip the header row
    for line in f:
        producer.send('netflix', line)
        time.sleep(0.1)  # Add a delay to simulate streaming

# send() is asynchronous; block until every buffered record has been
# delivered so nothing is lost when the script exits.
producer.flush()

!pip install kafka-python
from kafka import KafkaConsumer
import json

# Consumer for the 'netflix' topic; values are decoded from UTF-8 JSON.
consumer = KafkaConsumer(
    'netflix',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)

# Iterating the consumer blocks and yields records as they arrive.
for message in consumer:
    print(message.value)

!pip install pandas numpy scikit-learn nltk
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

# Fetch the NLTK resources needed by stopwords.words() and word_tokenize().
nltk.download('stopwords')
nltk.download('punkt')

# Load the Netflix dataset
netflix_df = pd.read_csv('netflix_titles.csv')

# Replace missing values with empty strings so string concatenation works
netflix_df = netflix_df.fillna('')

# Combine the 'title' and 'description' columns into a single column
netflix_df['combined'] = netflix_df['title'] + ' ' + netflix_df['description']

# Tokenize the text and remove stopwords
stop_words = set(stopwords.words('english'))


def _normalize(text):
    """Lower-case the tokens of *text* and drop English stopwords."""
    tokens = (word.lower() for word in word_tokenize(text))
    return ' '.join(token for token in tokens if token not in stop_words)


netflix_df['combined'] = netflix_df['combined'].apply(_normalize)

# Compute TF-IDF vectors for every title
tfidf = TfidfVectorizer()
tfidf_matrix = tfidf.fit_transform(netflix_df['combined'])

# Compute pairwise cosine similarity between all titles
cosine_sim = cosine_similarity(tfidf_matrix, tfidf_matrix)

# Define a function to get recommendations for a given title
def get_recommendations(title, cosine_sim=cosine_sim, df=netflix_df):
    """Return up to 10 titles most similar to *title*.

    Args:
        title: Exact value to look up in the ``'title'`` column of *df*.
        cosine_sim: Square similarity matrix whose row/column ``i``
            corresponds to the ``i``-th row of *df*.
        df: DataFrame with a ``'title'`` column.

    Returns:
        A list of titles ordered from most to least similar. The list is
        empty when *title* does not occur in *df*.
    """
    # Work with row positions rather than index labels so the lookup into
    # cosine_sim and df.iloc stays correct even if df has a non-default index.
    positions = np.flatnonzero((df['title'] == title).to_numpy())
    if positions.size == 0:
        return []
    idx = positions[0]

    # Exclude the queried row explicitly instead of assuming it sorts first:
    # another row with identical text also scores 1.0 and may precede it.
    sim_scores = [
        (i, score) for i, score in enumerate(cosine_sim[idx]) if i != idx
    ]

    # Sort the titles by similarity score, best first
    sim_scores.sort(key=lambda pair: pair[1], reverse=True)

    # Keep the positions of the 10 most similar titles
    titles_indices = [i for i, _ in sim_scores[:10]]

    return df.iloc[titles_indices]['title'].tolist()

!pip install flask
from flask import Flask, render_template, request
app = Flask(__name__)


# Home page
@app.route('/')
def home():
    """Render the ``index.html`` template for the site root."""
    return render_template('index.html')

# Recommendation page
@app.route('/recommend', methods=['POST'])
def recommend():
    """Handle the POSTed form and render recommendations for it.

    Reads the ``history`` form field, passes it to ``get_recommendations``
    and renders ``recommendations.html`` with the result.
    """
    # Get the user's viewing history
    history = request.form['history']

    # Make recommendations based on the user's viewing history
    recommendations = get_recommendations(history)

    # Render the recommendations page with the recommendations
    return render_template('recommendations.html', recommendations=recommendations)

if __name__ == '__main__':
    # Start Flask's built-in development server only when run as a script.
    app.run()

!pip install confluent-kafka
from confluent_kafka import Producer

def delivery_report(err, msg):
    """Print the outcome of a produce request.

    Passed as the ``callback`` argument to ``Producer.produce``.

    Args:
        err: Delivery error, or ``None`` when delivery succeeded.
        msg: The message object; only ``topic()`` and ``partition()`` are
            read, and only on success.
    """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

p = Producer({'bootstrap.servers': 'localhost:9092'})


def publish_message(topic_name, value):
    """Publish *value* to *topic_name* through the shared producer ``p``.

    Args:
        topic_name: Name of the Kafka topic to produce to.
        value: Text payload; it is UTF-8 encoded before being sent.

    Failures raised while enqueuing are reported on stdout rather than
    propagated, so a single bad message does not stop the caller.
    """
    try:
        p.produce(topic_name, value.encode('utf-8'), callback=delivery_report)
        # Serve delivery callbacks for previously produced messages;
        # delivery_report only runs from within poll() or flush().
        p.poll(0)
    except Exception as e:
        print('Exception in publishing message: {}'.format(e))

from confluent_kafka import Consumer, KafkaError

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest',
})

# Viewing history is produced to this topic (see the kafka-console-producer
# command used for testing).
c.subscribe(['viewing-history'])

try:
    while True:
        msg = c.poll(1.0)

        # poll() returns None when no message arrived within the timeout.
        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print('End of partition reached {}'.format(msg.topic()))
            else:
                print('Error while consuming message: {}'.format(msg.error()))
            continue

        print('Received message: {}'.format(msg.value().decode('utf-8')))
        history = msg.value().decode('utf-8')
        recommendations = get_recommendations(history)
        publish_message('recommendations', ', '.join(recommendations))
except KeyboardInterrupt:
    # Allow Ctrl+C to stop the otherwise endless loop.
    pass
finally:
    # Leave the consumer group cleanly and deliver any queued messages.
    c.close()
    p.flush()

$ kafka-console-producer --broker-list localhost:9092 --topic viewing-history
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic recommendations --from-beginning

Machine Learning & AI


What are your thoughts on this topic?
Let us know in the comments below.


0 0 votes
Article Rating
Newest Most Voted
Inline Feedbacks
View all comments

Share this article

Recent posts

Would love your thoughts, please comment.x