Asynchronous Machine Learning Inference with Celery, Redis, and Florence 2

A simple tutorial to get you started with asynchronous ML inference

Photo by Fabien BELLANGER on Unsplash

Most machine learning serving tutorials focus on real-time synchronous serving, which allows for immediate responses to prediction requests. However, this approach can struggle with surges in traffic and is not ideal for long-running tasks. It also requires more powerful machines to respond quickly, and if the client or server fails, the prediction result is usually lost.

In this blog post, we will demonstrate how to run a machine learning model as an asynchronous worker using Celery and Redis. We will be using the Florence 2 base model, a vision-language model known for its impressive performance. This tutorial provides a minimal yet functional example that you can adapt and extend for your own use cases.

You can try a demo of the app here: https://caption-app-dfmj3maizq-ew.a.run.app/

The core of our solution is based on Celery, a Python library that implements this client/worker logic for us. It lets us distribute the compute work across many workers, which helps your ML inference use case scale to high and unpredictable loads.

The process works as follows:

  1. The client submits a task with some parameters to a queue managed by the broker (Redis in our example).
  2. A worker (or several) constantly monitors the queue and picks up tasks as they arrive. It then executes them and saves the result to the backend storage.
  3. The client can fetch the result of the task using its id, either by polling the backend or by subscribing to the task’s channel.
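The three steps above can be mimicked in a single process with the standard library, which may help before bringing in Celery. This is only an analogy and not how Celery is implemented: the in-memory queue stands in for the broker, the dictionary stands in for the result backend, and the names submit, worker, and run_demo are hypothetical.

```python
import queue
import threading
import uuid

task_queue = queue.Queue()  # stands in for the broker (Redis in the article)
results = {}                # stands in for the result backend
results_lock = threading.Lock()


def add(x, y):
    return x + y


def submit(func, *args):
    """Client side: enqueue a task and return its id immediately."""
    task_id = str(uuid.uuid4())
    task_queue.put((task_id, func, args))
    return task_id


def worker():
    """Worker side: pull tasks off the queue, run them, store the result."""
    while True:
        item = task_queue.get()
        if item is None:  # sentinel value: stop the worker
            task_queue.task_done()
            break
        task_id, func, args = item
        value = func(*args)
        with results_lock:
            results[task_id] = value
        task_queue.task_done()


def run_demo():
    """Start one worker, submit one task, and fetch its result by id."""
    thread = threading.Thread(target=worker)
    thread.start()
    task_id = submit(add, 4, 6)
    task_queue.join()     # block until the worker has processed the task
    task_queue.put(None)  # ask the worker to stop
    thread.join()
    return results[task_id]
```

Calling run_demo() returns the sum computed by the worker thread. With Celery, the queue and the results store live in Redis instead, so the client and the workers can run on different machines.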

Let’s start with a simplified example:

Image by Author

First, run Redis:

docker run -p 6379:6379 redis

Here is the worker code:

from celery import Celery

# Configure Celery to use Redis as the broker and backend
app = Celery(
    "tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0"
)

# Define a simple task
@app.task
def add(x, y):
    return x + y

if __name__ == "__main__":
    app.worker_main(["worker", "--loglevel=info"])

And the client code:

from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")
# List the tasks currently being executed by the workers
print(f"{app.control.inspect().active()=}")
task_name = "tasks.add"
add = app.signature(task_name)
print("Gotten Task")
# Send a task to the worker
result = add.delay(4, 6)
print("Waiting for Task")
result.wait()
# Get the result
print(f"Result: {result.result}")

This gives the result we expect: “Result: 10”
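The client above blocks on result.wait(). The polling option mentioned earlier could be sketched as below. The helper poll_result and the FakeResult stand-in are hypothetical names used for illustration. The helper only assumes an object with ready() and get() methods, which Celery's AsyncResult provides, so it can be tried without a running broker.

```python
import time


def poll_result(async_result, interval=0.5, timeout=30.0):
    """Poll until the task is ready, then return its value.

    `async_result` must expose ready() and get(), as Celery's AsyncResult
    does. Raises TimeoutError if the deadline passes first.
    """
    deadline = time.monotonic() + timeout
    while not async_result.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError("task did not finish in time")
        time.sleep(interval)
    return async_result.get()


class FakeResult:
    """Stand-in for an AsyncResult that becomes ready after a few polls."""

    def __init__(self, value, polls_until_ready):
        self.value = value
        self.remaining = polls_until_ready

    def ready(self):
        self.remaining -= 1
        return self.remaining < 0

    def get(self):
        return self.value
```

In the client, poll_result(result) would take the place of result.wait(). Between polls the client is free to do other work, such as updating a progress indicator.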

Now, let’s move on to the actual use case: Serving Florence 2.

We are going to build a multi-container image captioning application that uses Redis for task queuing, Celery for task distribution, and a local volume or Google Cloud Storage for potential image storage. The application is designed around a few core components: model inference, task distribution, client interaction, and file storage.

Architecture Overview:

Image by author
  1. Client: Initiates image captioning requests by sending them to the worker (through the broker).
  2. Worker: Receives requests, downloads images, performs inference using the pre-trained model, and returns results.
  3. Redis: Acts as a message broker facilitating communication between the client and worker.
  4. File Storage: Temporary storage for image files.

Component Breakdown:

1. Model Inference (model.py):

  • Dependencies & Initialization:
import os
from io import BytesIO
import requests
from google.cloud import storage
from loguru import logger
from modeling_florence2 import Florence2ForConditionalGeneration
from PIL import Image
from processing_florence2 import Florence2Processor
model = Florence2ForConditionalGeneration.from_pretrained(
    "microsoft/Florence-2-base-ft"
)
processor = Florence2Processor.from_pretrained("microsoft/Florence-2-base-ft")
  • Imports the necessary libraries for image processing, web requests, Google Cloud Storage interaction, and logging.
  • Initializes the pre-trained Florence-2 model and processor for image caption generation.
  • Image Download (download_image):
def download_image(url):
    if url.startswith("http://") or url.startswith("https://"):
        # Handle HTTP/HTTPS URLs
        # ... (code to download image from URL) ...
    elif url.startswith("gs://"):
        # Handle Google Cloud Storage paths
        # ... (code to download image from GCS) ...
    else:
        # Handle local file paths
        # ... (code to open image from local path) ...
  • Downloads the image from the provided URL.
  • Supports HTTP/HTTPS URLs, Google Cloud Storage paths (gs://), and local file paths.
  • Inference Execution (run_inference):
def run_inference(url, task_prompt):
    # ... (code to download image using download_image function) ...
    try:
        # ... (code to open and process the image) ...
        inputs = processor(text=task_prompt, images=image, return_tensors="pt")
    except ValueError:
        # ... (error handling) ...
    # ... (code to generate captions using the model) ...
    generated_ids = model.generate(
        input_ids=inputs["input_ids"],
        pixel_values=inputs["pixel_values"],
        # ... (model generation parameters) ...
    )
    # ... (code to decode generated captions) ...
    generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
    # ... (code to post-process generated captions) ...
    parsed_answer = processor.post_process_generation(
        generated_text, task=task_prompt, image_size=(image.width, image.height)
    )
    return parsed_answer

Orchestrates the image captioning process:

  • Downloads the image using download_image.
  • Prepares the image and task prompt for the model.
  • Generates captions using the loaded Florence-2 model.
  • Decodes and post-processes the generated captions.
  • Returns the final caption.
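The three branches of download_image are elided above. As a rough illustration of the dispatch only, the sketch below classifies an image location and splits a gs:// path into the bucket and object names that a Cloud Storage client would need. The helper names classify_source and split_gcs_uri are hypothetical, and the sketch uses urllib.parse rather than the startswith checks shown in the article. The actual download code is not reproduced here.

```python
from urllib.parse import urlparse


def classify_source(url):
    """Return "http", "gcs", or "local" for an image location."""
    scheme = urlparse(url).scheme
    if scheme in ("http", "https"):
        return "http"
    if scheme == "gs":
        return "gcs"
    # Anything else is treated as a local file path
    return "local"


def split_gcs_uri(uri):
    """Split "gs://bucket/path/to/blob" into (bucket, blob_name)."""
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"not a gs:// URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")
```

For example, split_gcs_uri("gs://my-bucket/images/cat.jpg") yields the bucket name "my-bucket" and the object name "images/cat.jpg", where my-bucket is a made-up name.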

2. Task Distribution (worker.py):

import os
from celery import Celery
# ... other imports ...
# Get Redis URL from environment variable or use default
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# Configure Celery to use Redis as the broker and backend
app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)
# ... (Celery configurations) ...
  • Sets up Celery to use Redis as the message broker for task distribution.
  • Task Definition (inference_task):
@app.task(bind=True, max_retries=3)
def inference_task(self, url, task_prompt):
    # ... (logging and error handling) ...
    return run_inference(url, task_prompt)
  • Defines the inference_task that will be executed by Celery workers.
  • This task calls the run_inference function from model.py.
  • Worker Execution:
if __name__ == "__main__":
    app.worker_main(["worker", "--loglevel=info", "--pool=solo"])
  • Starts a Celery worker that listens for and executes tasks.
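The task is declared with bind=True and max_retries=3, but the error handling that triggers a retry is elided. One common choice, assumed here and not confirmed by the article, is exponential backoff between attempts. The plain-Python sketch below shows that schedule without needing Celery. The names retry_countdown and run_with_retries are hypothetical, and the delays are recorded rather than slept so the example runs instantly.

```python
def retry_countdown(retries, base=2.0, cap=60.0):
    """Seconds to wait before the next attempt, doubling each time.

    `retries` is the number of retries already made (0 on the first failure).
    """
    return min(cap, base * (2 ** retries))


def run_with_retries(func, max_retries=3):
    """Plain-Python analogue of a task with max_retries.

    Returns (result, delays), where delays lists the waits that would have
    been applied. Re-raises the last error once max_retries is exhausted.
    """
    delays = []
    retries = 0
    while True:
        try:
            return func(), delays
        except Exception:
            if retries >= max_retries:
                raise
            delays.append(retry_countdown(retries))
            retries += 1
```

Inside a bound Celery task, the same delay could be passed along as self.retry(exc=exc, countdown=retry_countdown(self.request.retries)), which asks Celery to reschedule the task after that many seconds.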

3. Client Interaction (client.py):

import os
from celery import Celery
# Get Redis URL from environment variable or use default
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# Configure Celery to use Redis as the broker and backend
app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)
  • Establishes a connection to Celery using Redis as the message broker.
  • Task Submission (send_inference_task):
def send_inference_task(url, task_prompt):
    task = inference_task.delay(url, task_prompt)
    print(f"Task sent with ID: {task.id}")
    # Wait for the result
    result = task.get(timeout=120)
    return result
  • Sends an image captioning task (inference_task) to the Celery worker.
  • Waits for the worker to finish the task and retrieves the result.

Docker Integration (docker-compose.yml):

  • Defines a multi-container setup using Docker Compose:
  • redis: Runs the Redis server for message brokering.
  • model: Builds and deploys the model inference worker.
  • app: Builds and deploys the client application.
Flower image by RoonZ nl on Unsplash
  • flower: Runs a web-based Celery task monitoring tool.
Image by author

You can run the full stack using:

docker-compose up

And there you have it! We’ve just walked through a guide to building an asynchronous machine learning inference system using Celery, Redis, and Florence 2. This tutorial demonstrated how to use Celery for task distribution, Redis for message brokering, and Florence 2 for image captioning. By embracing asynchronous workflows, you can handle high volumes of requests, improve performance, and enhance the overall resilience of your ML inference applications. The provided Docker Compose setup allows you to run the entire system on your own with a single command.

Ready for the next step? Deploying this architecture to the cloud comes with its own set of challenges. Let me know in the comments if you’d like to see a follow-up post on cloud deployment!

Code: https://github.com/CVxTz/celery_ml_deploy
Demo: https://caption-app-dfmj3maizq-ew.a.run.app/
