Asynchronous LLM API Calls in Python: A Comprehensive Guide


As developers and data scientists, we often need to interact with large language models (LLMs) through APIs. As our applications grow in complexity and scale, efficient and performant API interactions become crucial. This is where asynchronous programming shines, allowing us to maximize throughput and minimize latency when working with LLM APIs.

In this comprehensive guide, we’ll explore the world of asynchronous LLM API calls in Python. We’ll cover everything from the fundamentals of asynchronous programming to advanced techniques for handling complex workflows. By the end of this article, you’ll have a solid understanding of how to leverage asynchronous programming to supercharge your LLM-powered applications.

Before we dive into the specifics of async LLM API calls, let’s establish a solid foundation in asynchronous programming concepts.

Asynchronous programming allows multiple operations to be executed concurrently without blocking the main thread of execution. In Python, this is primarily achieved through the asyncio module, which provides a framework for writing concurrent code using coroutines, event loops, and futures.

Key concepts:

  • Coroutines: Functions defined with async def that can be paused and resumed.
  • Event Loop: The central execution mechanism that manages and runs asynchronous tasks.
  • Awaitables: Objects that can be used with the await keyword (coroutines, tasks, futures).

Here’s a simple example to illustrate these concepts:

import asyncio
async def greet(name):
    await asyncio.sleep(1)  # Simulate an I/O operation
    print(f"Hello, {name}!")
async def main():
    # Run all three greetings concurrently
    await asyncio.gather(
        greet("Alice"),
        greet("Bob"),
        greet("Charlie")
    )
asyncio.run(main())

In this example, we define an asynchronous function greet that simulates an I/O operation with asyncio.sleep(). The main function uses asyncio.gather() to run multiple greetings concurrently. Despite the sleep delay, all three greetings are printed after roughly 1 second in total rather than 3, demonstrating the power of asynchronous execution.
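
To make the difference between a coroutine, a task, and an awaitable more concrete, here is a minimal sketch. The function name fetch_value and the short delays are illustrative assumptions, not part of any library.

```python
import asyncio

async def fetch_value(delay, value):
    # A coroutine function: calling it only creates a coroutine object.
    await asyncio.sleep(delay)  # Hand control back to the event loop while "waiting"
    return value

async def main():
    # A bare coroutine object does not start running until it is awaited.
    coro = fetch_value(0.01, "from coroutine")
    # create_task() schedules the coroutine on the event loop right away.
    task = asyncio.create_task(fetch_value(0.01, "from task"))
    # Both objects are awaitables, so both work with the await keyword.
    first = await coro
    second = await task
    return first, second

print(asyncio.run(main()))
```

The practical difference is timing: the task starts making progress as soon as the event loop gets control, while the bare coroutine only runs once it is awaited.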

The Need for Async in LLM API Calls

When working with LLM APIs, we often encounter scenarios where we need to make multiple API calls, either in sequence or in parallel. Traditional synchronous code can lead to significant performance bottlenecks, especially when dealing with high-latency operations like network requests to LLM services.

Consider a scenario where we need to generate summaries for 100 different articles using an LLM API. With a synchronous approach, each API call would block until it receives a response, potentially taking several minutes to finish all requests. An asynchronous approach, on the other hand, allows us to initiate multiple API calls concurrently, dramatically reducing the overall execution time.
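
The 100-article scenario can be simulated without touching a real API. In the sketch below, fake_summarize is a hypothetical stand-in for an LLM request, and the 10 ms latency is an assumption chosen to keep the demo fast; real calls are typically far slower, which makes the gap larger.

```python
import asyncio
import time

SIMULATED_LATENCY = 0.01  # seconds; assumed stand-in for a much slower real API call

async def fake_summarize(article_id):
    # Hypothetical replacement for an LLM API request.
    await asyncio.sleep(SIMULATED_LATENCY)
    return f"summary of article {article_id}"

async def run_sequentially(n):
    start = time.perf_counter()
    # Each call finishes before the next one starts.
    results = [await fake_summarize(i) for i in range(n)]
    return results, time.perf_counter() - start

async def run_concurrently(n):
    start = time.perf_counter()
    # All calls are started together and awaited as a group.
    results = await asyncio.gather(*(fake_summarize(i) for i in range(n)))
    return list(results), time.perf_counter() - start

async def main():
    seq_results, seq_time = await run_sequentially(100)
    con_results, con_time = await run_concurrently(100)
    print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
    return seq_results, seq_time, con_results, con_time

asyncio.run(main())
```

The sequential version pays the latency 100 times in a row, while the concurrent version pays it roughly once, because the waits overlap.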

Setting Up Your Environment

To get started with async LLM API calls, you'll need to set up your Python environment with the necessary libraries. Here’s what you'll need:

  • Python 3.7 or higher (for native asyncio support)
  • aiohttp: An asynchronous HTTP client library
  • openai: The official OpenAI Python client (if you’re using OpenAI’s GPT models)
  • langchain: A framework for building applications with LLMs (optional, but recommended for complex workflows)

You can install these dependencies using pip:

pip install aiohttp openai langchain

Basic Async LLM API Calls with asyncio

Let's start by making a simple asynchronous call to an LLM API. We'll use OpenAI's GPT-3.5 API through the AsyncOpenAI client as an example, but the concepts apply to other LLM APIs as well.

import asyncio
from openai import AsyncOpenAI
async def generate_text(prompt, client):
    response = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content
async def main():
    prompts = [
        "Explain quantum computing in simple terms.",
        "Write a haiku about artificial intelligence.",
        "Describe the process of photosynthesis."
    ]
    
    # The client reads the API key from the OPENAI_API_KEY environment variable
    async with AsyncOpenAI() as client:
        tasks = [generate_text(prompt, client) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())

In this example, we define an asynchronous function generate_text that makes a call to the OpenAI API using the AsyncOpenAI client. The main function creates one coroutine per prompt and uses asyncio.gather() to run them concurrently.

This approach allows us to send multiple requests to the LLM API concurrently, significantly reducing the total time required to process all prompts.
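
Pairing prompts with results through zip() relies on asyncio.gather() returning results in the order the awaitables were passed in, not the order in which they finish. The sketch below checks this with a hypothetical fake_generate function whose latency is randomized.

```python
import asyncio
import random

async def fake_generate(prompt):
    # Hypothetical stand-in for an API call with unpredictable latency.
    await asyncio.sleep(random.uniform(0.001, 0.02))
    return f"response to: {prompt}"

async def main():
    prompts = ["first", "second", "third", "fourth"]
    # Calls may complete in any order, but gather() keeps the input order.
    results = await asyncio.gather(*(fake_generate(p) for p in prompts))
    for prompt, result in zip(prompts, results):
        print(f"{prompt!r} -> {result!r}")
    return prompts, results

asyncio.run(main())
```

Each prompt lines up with its own response on every run, regardless of which simulated call happened to finish first.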

Advanced Techniques: Batching and Concurrency Control

While the previous example demonstrates the fundamentals of async LLM API calls, real-world applications often require more sophisticated approaches. Let's explore two important techniques: batching requests and controlling concurrency.

Batching Requests: When dealing with a large number of prompts, it's often more practical to process them in groups rather than launching every request at once. In the example below, each prompt is still sent as its own API call; batching caps how many calls are in flight at a time, which keeps resource usage predictable and can lead to better overall performance.

import asyncio
from openai import AsyncOpenAI
async def process_batch(batch, client):
    responses = await asyncio.gather(*[
        client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        ) for prompt in batch
    ])
    return [response.choices[0].message.content for response in responses]
async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(100)]
    batch_size = 10
    
    async with AsyncOpenAI() as client:
        results = []
        # Process the prompts ten at a time; each batch finishes before the next starts
        for i in range(0, len(prompts), batch_size):
            batch = prompts[i:i+batch_size]
            batch_results = await process_batch(batch, client)
            results.extend(batch_results)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())
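
The slicing logic can be pulled out into a small reusable helper. This is a sketch only: chunked, fake_generate, and process_in_batches are hypothetical names, and fake_generate stands in for the real API call.

```python
import asyncio

def chunked(items, size):
    """Yield consecutive slices of items with at most size elements each."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

async def fake_generate(prompt):
    # Hypothetical stand-in for an LLM API call.
    await asyncio.sleep(0.001)
    return prompt.upper()

async def process_in_batches(prompts, batch_size):
    results = []
    for batch in chunked(prompts, batch_size):
        # Calls within a batch run concurrently; the next batch starts
        # only after every call in the current batch has finished.
        results.extend(await asyncio.gather(*(fake_generate(p) for p in batch)))
    return results

print(list(chunked(list(range(7)), 3)))
print(asyncio.run(process_in_batches(["a", "b", "c"], 2)))
```

One trade-off of this pattern is that a single slow call holds up its whole batch, which is part of the motivation for the semaphore-based approach.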

Concurrency Control: While asynchronous programming allows for concurrent execution, it's important to control the level of concurrency to avoid overwhelming the API server or exceeding rate limits. We can use asyncio.Semaphore for this purpose.

import asyncio
from openai import AsyncOpenAI
async def generate_text(prompt, client, semaphore):
    async with semaphore:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(100)]
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as client:
        tasks = [generate_text(prompt, client, semaphore) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())

In this example, we use a semaphore to limit the number of concurrent requests to 5, ensuring we don't overwhelm the API server.
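
To check that the semaphore really caps concurrency, the following sketch counts how many simulated calls are in flight at once. The worker function and the stats dictionary are illustrative assumptions; no real API is involved.

```python
import asyncio

async def worker(i, semaphore, stats):
    async with semaphore:
        # Inside the semaphore: count this call as in flight.
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        await asyncio.sleep(0.005)  # Simulated API latency
        stats["in_flight"] -= 1
        return i

async def main(n_tasks=30, limit=5):
    semaphore = asyncio.Semaphore(limit)
    stats = {"in_flight": 0, "peak": 0}
    results = await asyncio.gather(
        *(worker(i, semaphore, stats) for i in range(n_tasks))
    )
    print(f"peak concurrency: {stats['peak']} (limit {limit})")
    return results, stats["peak"]

asyncio.run(main())
```

Unlike fixed batches, a semaphore lets a new call start as soon as any running call finishes, so one slow request does not stall the others.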

Error Handling and Retries in Async LLM Calls

When working with external APIs, it's crucial to implement robust error handling and retry mechanisms. Let's enhance our code to handle common errors and implement exponential backoff for retries.

import asyncio
from openai import AsyncOpenAI
# Requires the tenacity package: pip install tenacity
from tenacity import retry, stop_after_attempt, wait_exponential
class APIError(Exception):
    pass
# reraise=True makes tenacity re-raise the last APIError once attempts run out,
# instead of wrapping it in tenacity.RetryError
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10), reraise=True)
async def generate_text_with_retry(prompt, client):
    try:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Error occurred: {e}")
        raise APIError("Failed to generate text") from e
async def process_prompt(prompt, client, semaphore):
    async with semaphore:
        try:
            result = await generate_text_with_retry(prompt, client)
            return prompt, result
        except APIError:
            return prompt, "Failed to generate response after multiple attempts."
async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(20)]
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as client:
        tasks = [process_prompt(prompt, client, semaphore) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in results:
        print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())

This enhanced version includes:

  • A custom APIError exception for API-related errors.
  • A generate_text_with_retry function decorated with @retry from the tenacity library, implementing exponential backoff.
  • Error handling within the process_prompt function to catch and report failures.
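
To show what exponential backoff does, here is a hand-rolled sketch using only the standard library. It is not how tenacity is implemented; retry_with_backoff and FlakyService are hypothetical names, and the delays are kept tiny so the demo runs quickly.

```python
import asyncio
import random

async def retry_with_backoff(make_call, max_attempts=3, base_delay=0.01, max_delay=1.0):
    """Await make_call(); on failure, wait and retry with a doubling delay."""
    for attempt in range(max_attempts):
        try:
            return await make_call()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # Out of attempts: surface the last error
            # The delay doubles on each attempt, capped at max_delay.
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Random jitter keeps many clients from retrying in lockstep.
            await asyncio.sleep(delay + random.uniform(0, delay / 2))

class FlakyService:
    """Hypothetical service that fails a fixed number of times, then succeeds."""
    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    async def call(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("simulated transient failure")
        return "ok"

async def main():
    service = FlakyService(failures=2)
    result = await retry_with_backoff(service.call)
    print(result, "after", service.calls, "calls")
    return result, service.calls

asyncio.run(main())
```

In production code, it is usually better to retry only on errors that are likely to be transient (timeouts, rate limits) rather than on every exception, as this sketch does for brevity.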

Optimizing Performance: Streaming Responses

For long-form content generation, streaming responses can significantly improve the perceived performance of your application. Instead of waiting for the complete response, you can process and display chunks of text as they become available.

import asyncio
from openai import AsyncOpenAI
async def stream_text(prompt, client):
    stream = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    full_response = ""
    async for chunk in stream:
        # Some chunks carry no text (for example, the final one), so check first
        if chunk.choices and chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            full_response += content
            print(content, end='', flush=True)
    
    print("\n")
    return full_response
async def main():
    prompt = "Write a short story about a time-traveling scientist."
    
    async with AsyncOpenAI() as client:
        result = await stream_text(prompt, client)
    
    print(f"Full response:\n{result}")
asyncio.run(main())

This example demonstrates how to stream the response from the API, printing each chunk as it arrives. This approach is especially useful for chat applications or any scenario where you want to provide real-time feedback to the user.
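
The consumption pattern itself (async for over a stream of chunks) can be tried offline. In this sketch, fake_stream is a hypothetical async generator that stands in for a streamed API response, yielding one word at a time.

```python
import asyncio

async def fake_stream(text, delay=0.001):
    # Hypothetical stand-in for a streamed response: one word per chunk.
    for word in text.split(" "):
        await asyncio.sleep(delay)  # Simulated gap between chunks
        yield word + " "

async def consume(stream):
    parts = []
    async for chunk in stream:
        print(chunk, end="", flush=True)  # Show each piece as soon as it arrives
        parts.append(chunk)
    print()
    # Joining a list once is cheaper than repeated string concatenation.
    return "".join(parts).strip()

async def main():
    return await consume(fake_stream("streams arrive piece by piece"))

asyncio.run(main())
```

Because consume only depends on async iteration, the same function could in principle wrap any source of text chunks, provided each item is a plain string.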

Building Async Workflows with LangChain

For more complex LLM-powered applications, the LangChain framework provides a high-level abstraction that simplifies the process of chaining multiple LLM calls and integrating other tools. Let's take a look at an example of using LangChain with async capabilities:

The example below shows how LangChain can be used to create more complex workflows with streaming and asynchronous execution. The AsyncCallbackManager and StreamingStdOutCallbackHandler enable real-time streaming of the generated content.

import asyncio
# Note: these import paths match older LangChain releases; newer versions
# have moved some of them into separate packages.
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.callbacks.manager import AsyncCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
async def generate_story(topic):
    llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager([StreamingStdOutCallbackHandler()]))
    prompt = PromptTemplate(
        input_variables=["topic"],
        template="Write a short story about {topic}."
    )
    chain = LLMChain(llm=llm, prompt=prompt)
    return await chain.arun(topic=topic)
async def main():
    topics = ["a magical forest", "a futuristic city", "an underwater civilization"]
    tasks = [generate_story(topic) for topic in topics]
    stories = await asyncio.gather(*tasks)
    
    for topic, story in zip(topics, stories):
        print(f"\nTopic: {topic}\nStory: {story}\n{'='*50}\n")
asyncio.run(main())

Serving Async LLM Applications with FastAPI

To make your async LLM application available as a web service, FastAPI is an excellent choice due to its native support for asynchronous operations. Here's an example of how to create a simple API endpoint for text generation:

import asyncio
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from openai import AsyncOpenAI
app = FastAPI()
client = AsyncOpenAI()
class GenerationRequest(BaseModel):
    prompt: str
class GenerationResponse(BaseModel):
    generated_text: str
@app.post("/generate", response_model=GenerationResponse)
async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks):
    response = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": request.prompt}]
    )
    generated_text = response.choices[0].message.content
    
    # Simulate some post-processing in the background (runs after the response is sent)
    background_tasks.add_task(log_generation, request.prompt, generated_text)
    
    return GenerationResponse(generated_text=generated_text)
async def log_generation(prompt: str, generated_text: str):
    # Simulate logging or additional processing
    await asyncio.sleep(2)
    print(f"Logged: Prompt '{prompt}' generated text of length {len(generated_text)}")
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

This FastAPI application creates an endpoint /generate that accepts a prompt and returns generated text. It also demonstrates how to use background tasks for additional processing without blocking the response.

Best Practices and Common Pitfalls

As you work with async LLM APIs, keep these best practices in mind:

  1. Use connection pooling: When making multiple requests, reuse connections to reduce overhead.
  2. Implement proper error handling: Always account for network issues, API errors, and unexpected responses.
  3. Respect rate limits: Use semaphores or other concurrency control mechanisms to avoid overwhelming the API.
  4. Monitor and log: Implement comprehensive logging to track performance and identify issues.
  5. Use streaming for long-form content: It improves user experience and allows for early processing of partial results.
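
As one way to approach the "monitor and log" practice, the sketch below wraps any awaitable in a timing helper built on the standard logging module. The names timed_call and fake_generate, and the logger name llm_calls, are illustrative assumptions.

```python
import asyncio
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("llm_calls")  # Hypothetical logger name

async def timed_call(label, awaitable):
    """Await the given awaitable, logging its duration and any failure."""
    start = time.perf_counter()
    try:
        result = await awaitable
    except Exception:
        # logger.exception records the traceback along with the message.
        logger.exception("%s failed after %.3fs", label, time.perf_counter() - start)
        raise
    elapsed = time.perf_counter() - start
    logger.info("%s succeeded in %.3fs", label, elapsed)
    return result, elapsed

async def fake_generate(prompt):
    # Hypothetical stand-in for an LLM API call.
    await asyncio.sleep(0.01)
    return f"response to: {prompt}"

async def main():
    return await timed_call("demo-prompt", fake_generate("hello"))

asyncio.run(main())
```

Per-call latency logs like these make it easier to spot slow prompts and to tune the concurrency limit used with the semaphore.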

