Implementing a Machine Learning-Based Netflix Recommendation System using Apache Kafka and Python Step 1: Install Kafka and ZooKeeper on your machine Step 2: Download and extract the Netflix dataset Step 3: Create a Kafka producer to stream the data Step 4: Create a Kafka consumer to receive the streaming data Step 5: Implement a machine learning algorithm to make recommendations Step 6: Build a Flask web application to display recommendations Step 7: Deploy the Flask web application with Apache Kafka Step 8: Test the application That’s it!

-

!pip install kafka-python
from kafka import KafkaProducer
import json
import time

# Producer for the local broker; every value is serialized as UTF-8 encoded
# JSON before it is handed to Kafka.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),
)

# Stream the dataset to the 'netflix' topic one raw CSV line at a time.
with open('netflix_titles.csv', encoding='utf-8') as f:
    next(f, None)  # Skip the header row (tolerates an empty file)
    for line in f:
        producer.send('netflix', line)
        time.sleep(0.1)  # Add a delay to simulate streaming

# send() only queues the record; flush so nothing still buffered is lost
# when the script exits.
producer.flush()

!pip install kafka-python
from kafka import KafkaConsumer
import json

# Consumer for the 'netflix' topic on the local broker. Values are decoded
# from UTF-8 JSON, mirroring the producer's serializer, and reading starts
# from the earliest available offset.
consumer = KafkaConsumer(
    'netflix',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)

# Iterating the consumer blocks waiting for new records; print each payload.
for message in consumer:
    print(message.value)

!pip install pandas numpy scikit-learn nltk
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
# Fetch the NLTK resources used below (stopword list and tokenizer data).
for resource in ('stopwords', 'punkt'):
    nltk.download(resource)

# Load the Netflix dataset, replacing missing values with empty strings so
# the string concatenation below never meets NaN.
netflix_df = pd.read_csv('netflix_titles.csv').fillna('')

# Combine the 'title' and 'description' columns into a single text column.
netflix_df['combined'] = netflix_df['title'] + ' ' + netflix_df['description']

stop_words = set(stopwords.words('english'))


def _strip_stopwords(text):
    """Tokenize *text*, lowercase each token and drop English stopwords."""
    lowered = (token.lower() for token in word_tokenize(text))
    return ' '.join([token for token in lowered if token not in stop_words])


# Tokenize the text and remove stopwords.
netflix_df['combined'] = netflix_df['combined'].apply(_strip_stopwords)

# Compute TF-IDF vectors for every title.
tfidf = TfidfVectorizer()
tfidf_matrix = tfidf.fit_transform(netflix_df['combined'])

# Compute the pairwise cosine similarity between all titles.
cosine_sim = cosine_similarity(tfidf_matrix, tfidf_matrix)

# Define a function to get recommendations for a given title
def get_recommendations(title, cosine_sim=cosine_sim, df=netflix_df):
    """Return up to 10 titles most similar to *title*.

    Args:
        title: Exact value to look up in ``df['title']``.
        cosine_sim: Similarity matrix indexed by row position; row ``i`` is
            expected to hold the similarity of row ``i`` of ``df`` to every
            other row.
        df: DataFrame with a ``'title'`` column.

    Returns:
        A list of titles ordered from most to least similar, excluding the
        queried row itself. Empty when *title* is not present in ``df``.
    """
    # Look the title up by position (not index label) so the row number
    # agrees with both cosine_sim and the df.iloc call below.
    positions = (df['title'] == title).to_numpy().nonzero()[0]
    if len(positions) == 0:
        return []
    idx = int(positions[0])

    # Pair every other row with its similarity to the queried row. The
    # queried row is excluded explicitly rather than assumed to sort first,
    # which would be wrong when another row ties with it.
    sim_scores = [
        (i, score) for i, score in enumerate(cosine_sim[idx]) if i != idx
    ]

    # Sort the titles by similarity score, highest first.
    sim_scores.sort(key=lambda pair: pair[1], reverse=True)

    # Keep the row positions of the top 10 most similar titles.
    titles_indices = [i for i, _ in sim_scores[:10]]

    return df.iloc[titles_indices]['title'].tolist()

!pip install flask
from flask import Flask, render_template, request
app = Flask(__name__)


# Home page
@app.route('/')
def home():
    """Render the ``index.html`` template for the site root."""
    return render_template('index.html')

# Recommendation page
@app.route('/recommend', methods=['POST'])
def recommend():
    """Render recommendations for the title posted in the ``history`` field.

    The ``history`` form value is passed to ``get_recommendations`` as the
    title to look up; the resulting list is handed to the
    ``recommendations.html`` template as ``recommendations``.
    """
    # Get the user's viewing history
    history = request.form['history']

    # Make recommendations based on the user's viewing history
    try:
        recommendations = get_recommendations(history)
    except IndexError:
        # The lookup found no row for this title; show an empty result
        # instead of failing the request with a server error.
        recommendations = []

    # Render the recommendations page with the recommendations
    return render_template('recommendations.html', recommendations=recommendations)

if __name__ == '__main__':
    # WARNING: debug=True enables the interactive debugger and reloader.
    # It is meant for local development only; do not expose it publicly.
    app.run(debug=True)

!pip install confluent-kafka
from confluent_kafka import Producer

def delivery_report(err, msg):
    """Print the outcome of a produce request.

    Passed as the ``callback`` to ``produce``.

    Args:
        err: ``None`` on success, otherwise the delivery error.
        msg: The message; its ``topic()`` and ``partition()`` are printed on
            success.
    """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

# Shared confluent-kafka producer pointed at the local broker.
p = Producer({'bootstrap.servers': 'localhost:9092'})


def publish_message(topic_name, value):
    """Publish *value* to *topic_name* and wait for it to be sent.

    Args:
        topic_name: Name of the destination topic.
        value: Text payload; it is encoded as UTF-8 before being produced.

    Failures are reported on stdout rather than raised, so a single bad
    message does not stop the caller.
    """
    try:
        p.produce(topic_name, value.encode('utf-8'), callback=delivery_report)
        # Block until outstanding messages are sent so the delivery callback
        # runs before this function returns.
        p.flush()
    except Exception as e:
        print('Exception in publishing message')
        print(str(e))

from confluent_kafka import Consumer, KafkaError

# Consumer for the 'viewing-history' topic on the local broker, reading from
# the earliest offset when the group has no committed position.
c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['viewing-history'])

try:
    while True:
        # Wait up to one second for the next message.
        msg = c.poll(1.0)

        if msg is None:
            # Nothing arrived within the timeout; poll again.
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print('End of partition reached {}'.format(msg.topic()))
            else:
                print('Error while consuming message: {}'.format(msg.error()))
            continue

        # Each message value is treated as a title to recommend for.
        history = msg.value().decode('utf-8')
        print('Received message: {}'.format(history))
        try:
            recommendations = get_recommendations(history)
        except IndexError:
            # The lookup found no row for this title; skip the message
            # rather than letting one bad input stop the consumer.
            print('No title matching {!r}; skipping'.format(history))
            continue
        publish_message('recommendations', ', '.join(recommendations))
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this otherwise endless loop.
    pass
finally:
    # Always release the consumer, even when the loop exits on an error.
    c.close()

$ kafka-console-producer --broker-list localhost:9092 --topic viewing-history
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic recommendations --from-beginning

Machine Learning & AI

ASK DUKE

What are your thoughts on this topic?
Let us know in the comments below.

6 COMMENTS

0 0 votes
Article Rating
guest
6 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments

Share this article

Recent posts

6
0
Would love your thoughts, please comment.x
()
x