Javascript Fatigue: HTMX Is All You Need to Build ChatGPT — Part 2


In Part 1, we showed how we could leverage HTMX to add interactivity to our HTML elements. In other words, Javascript without Javascript. To illustrate this, we began building a simple chat that returned a simulated LLM response. In this article, we will extend the capabilities of our chatbot and add several features, among which streaming, a major improvement in user experience compared to the synchronous chat built previously.

  • ✅ Real-time streaming with SSE
  • ✅ Session-based architecture for multiple users
  • ✅ Async coordination with asyncio.Queue
  • ✅ Clean HTMX patterns with dedicated SSE handling
  • ✅ A Google Search Agent to answer queries with fresh data
  • ✅ Almost Zero JavaScript

Here’s what we will build today:

From sync communication to async

What we built previously relied on very basic web functionality: forms. Our communication was synchronous, meaning we don’t get anything until the server is done. We issue a request, we wait for the full response, and we display it. Between the two, we just…wait.

But modern chatbots work differently, by providing asynchronous communication. This is done using streaming: we get updates and partial responses instead of waiting for the full response. This is especially helpful when producing the response takes time, which is often the case for LLMs when the answer is long.

SSE vs Websockets

SSE (Server-Sent Events) and Websockets are two protocols for real-time data exchange between a client and a server.

Websockets allow full-duplex connections: the browser and the server can both send and receive data at the same time. This is typically used in online gaming, chat applications, and collaborative tools (think Google Sheets).

SSE is unidirectional and only allows a one-way conversation, from server to client. This means that the client cannot send anything to the server via this protocol. If Websockets are a two-way phone conversation where people can speak and listen at the same time, SSE is like listening to the radio. SSE is typically used to send notifications, update charts in finance applications, or power newsfeeds.

So why do we choose SSE? Because in our use case we don’t need full duplex, and plain HTTP (which is not how Websockets work) is enough: we send data, we receive data. SSE just means that we will receive the data as a stream, nothing more is required.
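Concretely, an SSE stream is just plain text sent over a regular HTTP response: each event is one or more "data:" lines followed by a blank line, which is all the browser needs in order to parse it. For example:

data: first chunk of the response

data: second chunk of the response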

What we want to do

  1. User inputs a question
  2. Server receives the query and sends it to the LLM
  3. LLM starts producing content
  4. For each piece of content, the server returns it immediately
  5. Browser adds this piece of data to the DOM

We’ll separate our work into backend and frontend sections.

Backend

The backend will proceed in 2 steps:

  • A POST endpoint that receives the message and returns nothing
  • A GET endpoint that reads a queue and produces an output stream.

In our demo, to begin with, we will create a fake LLM response by repeating the user input, meaning that the words in the stream will be exactly the same as the user input.

To keep things clean, we need to separate the message streams (the queues) by user session, otherwise we would end up mixing up conversations. We will therefore create a session dictionary to host our queues.

Next, we need to tell the backend to wait until the queue is filled before streaming our response. If we don’t, we will run into timing issues: SSE starts on the client side, the queue is empty, SSE closes, the user inputs a message but…it’s too late!

The solution: async queues! Using asynchronous queues has several benefits:

  • If queue has data: Returns immediately
  • If queue is empty: Suspends execution until queue.put() is called
  • Multiple consumers: Each gets their own data
  • Thread-safe: No race conditions
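
Before diving into the app code, here is a tiny standalone sketch (not part of the app) of that blocking behavior:

import asyncio

async def consumer(queue: asyncio.Queue):
    # Suspends here until the producer puts something in the queue
    item = await queue.get()
    print(f"Consumed: {item}")
    queue.task_done()

async def main():
    queue = asyncio.Queue()
    task = asyncio.create_task(consumer(queue))
    await asyncio.sleep(1)      # the consumer is now waiting, not polling
    await queue.put("hello")    # this "wakes up" the consumer
    await task

asyncio.run(main())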

I know you are burning to know more, so here is the code:

from fastapi import FastAPI, Request, Form
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, StreamingResponse
import asyncio
import time
import uuid

app = FastAPI()
templates = Jinja2Templates("templates")

# This object maps each session id to its corresponding value, an async queue.
sessions = dict()

@app.get("/")
async def root(request: Request):
    session_id = str(uuid.uuid4())
    sessions[session_id] = asyncio.Queue()
    return templates.TemplateResponse(request, "index.html", context={"session_id": session_id})


@app.post("/chat")
async def chat(request: Request, query: str=Form(...), session_id: str=Form(...)):
    """ Send message to session-based queue """

    # Create the session if it doesn't exist
    if session_id not in sessions:
        sessions[session_id] = asyncio.Queue()

    # Put the message within the queue
    await sessions[session_id].put(query)

    return {"status": "queued", "session_id": session_id}


@app.get("/stream/{session_id}")
async def stream(session_id: str):
    
    async def response_stream():

        if session_id not in sessions:
            print(f"Session {session_id} not found!")
            return

        queue = sessions[session_id]

        # This BLOCKS until data arrives
        print(f"Waiting for message in session {session_id}")
        data = await queue.get()
        print(f"Got message: {data}")

        message = ""
        await asyncio.sleep(1)
        for token in data.replace("\n", " ").split(" "):
            message += token + " "
            data = f"data: <li><b>AI</b><p>{message}</p></li>\n\n"
            yield data
            await asyncio.sleep(0.03)

        queue.task_done()

    return StreamingResponse(response_stream(), media_type="text/event-stream")

    Let’s explain a few key concepts here.

    Session isolation

    It is important that each user gets their own message queue, so as not to mix up conversations. The way to do that is by using the sessions dictionary. In real production apps, we would probably use Redis to store that. In the code below, we see that a new session id is created on page load and stored in the sessions dictionary. Reloading the page will start a new session; we are not persisting the message queues, but we could, via a database for instance. This topic is covered in Part 3.

    # This object maps each session id to its corresponding value, an async queue.
    sessions = dict()
    
    @app.get("/")
    async def root(request: Request):
        session_id = str(uuid.uuid4())
        sessions[session_id] = asyncio.Queue()
        return templates.TemplateResponse(request, "index.html", context={"session_id": session_id})
    

    Blocking coordination

    We need to control the order in which SSE messages are sent and the user query is received. The order, on the backend side, is:

    1. Receive user message
    2. Create a message queue and populate it
    3. Send messages from the queue in a Streaming Response

    Failing to do so may result in unwanted behavior, i.e. first reading the (empty) message queue, then populating it with the user’s query.

    The solution to control the order is to use asyncio.Queue. This object will be used twice:

    • When we insert new messages into the queue. Inserting messages will “wake up” the waiting SSE endpoint:
    await sessions[session_id].put(query)
    • When we pull messages from the queue. On this line, the code blocks until a signal from the queue arrives saying “hey, I have new data!”:
    data = await queue.get()

    This pattern offers several benefits:

    • Each user has their own queue
    • There is no risk of race conditions

    Streaming simulation

    In this article, we will simulate an LLM response by splitting the user’s query into words and returning those words one by one. Later in this article, we will plug an actual LLM into it.

    The streaming is handled via the StreamingResponse object from FastAPI. This object expects an asynchronous generator that yields data until the generator is exhausted. We have to use the yield keyword instead of the return keyword, otherwise our generator would just stop after the first iteration.
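
    As a minimal standalone illustration of this pattern (a toy /count endpoint, not part of our app):

    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse
    import asyncio

    app = FastAPI()

    @app.get("/count")
    async def count():
        async def generator():
            for i in range(5):
                # yield keeps the generator alive; a return here would end the stream
                yield f"data: {i}\n\n"
                await asyncio.sleep(0.5)
        return StreamingResponse(generator(), media_type="text/event-stream")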

    Let’s decompose our streaming function:

    First, we need to make sure we have a queue for the current session from which we will pull messages:

    if session_id not in sessions:
        print(f"Session {session_id} not found!")
        return
    
    queue = sessions[session_id]

    Next, once we have the queue, we pull messages from it if it contains any; otherwise the code pauses and waits for messages to arrive. This is the most important part of our function:

    # This BLOCKS until data arrives
    print(f"Waiting for message in session {session_id}")
    data = await queue.get()
    print(f"Got message: {data}")

    To simulate a stream, we will now chunk the message into words (called tokens here), and add some sleeps to simulate the text generation process of an LLM (the asyncio.sleep parts). Notice how the data we send is actually HTML strings, encapsulated in a string starting with “data:”. This is how SSE messages are sent. You can also decide to flag your messages with the “event:” metadata. An example would be:

    event: my_custom_event
    data: Content to swap into your HTML page.
    Let’s see how we implement it in Python (for the purists: use Jinja templates to render the HTML instead of a string):

    message = ""

    # First pause to let the browser display "Thinking..." while the message is sent
    await asyncio.sleep(1)

    # Simulate streaming by splitting the message into words
    for token in data.replace("\n", " ").split(" "):

        # We append tokens to the message
        message += token + " "

        # We wrap the message in HTML, prefixed with the "data:" metadata
        data = f"data: <li><b>AI</b><p>{message}</p></li>\n\n"
        yield data

        # Pause to simulate the LLM generation process
        await asyncio.sleep(0.03)

    queue.task_done()

    Frontend

    Our frontend has 2 jobs: send user queries to the backend, and listen for SSE messages on a specific channel (the session_id). To do that, we apply a concept called “separation of concerns”, meaning each HTMX element is responsible for a single job only.

    • the form sends the user input
    • the SSE listener handles the streaming
    • the ul chat list displays the messages

    To send messages, we will use a standard textarea input in a form. The HTMX magic is just below:
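
    Here is a minimal sketch of what this form can look like (the chat list id #chat, the placeholder text and the exact markup are illustrative; the HTMX attributes are the ones described below):

    <form hx-post="/chat"
          hx-swap="none"
          hx-trigger="submit"
          hx-on::before-request="
              document.getElementById('chat').insertAdjacentHTML('beforeend',
                  '<li><b>Me</b><p>' + this.query.value + '</p></li><li><b>AI</b><p>Thinking...</p></li>');
          ">
        <input type="hidden" name="session_id" value="{{ session_id }}">
        <textarea name="query" placeholder="Ask me anything"></textarea>
        <button type="submit">Send</button>
    </form>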

    
    

    If you remember the article from Part 1, we have several HTMX attributes which deserve explanations:

    • hx-post: The endpoint the form data will be submitted to.
    • hx-swap: Set to none, because in our case the endpoint doesn’t return any data.
    • hx-trigger: Specifies which event will trigger the request
    • hx-on::before-request: A very light touch of Javascript to add some snappiness to the app. We append the user’s request to the chat list and display a “Thinking” message while we wait for the SSE messages to stream. This is nicer than having to stare at a blank page.

    It’s worth noting that we actually send 2 parameters to the backend: the user’s input and the session id. This way, the message will be inserted into the right queue on the backend side.

    Then, we define another component that is specifically dedicated to listening to SSE messages. It looks something like this:

    <div hx-ext="sse"
         sse-connect="/stream/{{ session_id }}"
         sse-swap="message"
         hx-swap="outerHTML"
         hx-target="#chat li:last-child" style="display: none;" >
    </div>
    This component will listen to the /stream endpoint and pass its session id so it receives messages for this session only. The hx-target tells the browser to put the data into the last li element of the chat. The hx-swap specifies that the data is actually meant to replace the entire current li element. This is how our streaming effect will work: replacing the current message with the latest one.

    Note: other methods could have been used to replace specific elements of the DOM, such as out-of-band (OOB) swaps. They work a little bit differently since they require a specific id to look for in the DOM. In our case, we chose on purpose not to assign ids to each written list element.
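
    For reference, an OOB swap streams a fragment that carries its own target id (the id below is just an example):

    <li id="msg-42" hx-swap-oob="true"><b>AI</b><p>updated content</p></li>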

    A Real Chatbot using Google Agent Development Kit

    Now is the time to replace our dummy streaming endpoint with an actual LLM. To achieve that, we will build an agent using Google ADK, equipped with tools and memory to fetch information and remember conversation details.

    A very short introduction to agents

    You probably already know what an LLM is, at least I assume you do. The main drawback of LLMs as of today is that LLMs alone cannot access real-time information: their knowledge is frozen at the moment they were trained. The other drawback is their inability to access information that is outside their training scope (e.g., your company’s internal data).

    Agents are a type of AI application that can reason, act and observe. The reasoning part is handled by the LLM, the “brain”. The “hands” of the agent are what we call “tools”, and they can take several forms:

    • a Python function, for instance to call an API
    • an MCP server, a standard that allows agents to connect to APIs through a standardized interface (e.g. accessing all the GSuite tools without having to write the API connectors yourself)
    • other agents (in that case, the pattern is called agent delegation, where a router or master agent controls different sub-agents)

    In our demo, to keep things simple, we will use a very simple agent that can use one tool: Google Search. This will allow us to get fresh information and make sure it is reliable (at least we hope the Google Search results are…).

    In the Google ADK world, agents need some basic information:

    • name and description, mostly for documentation purposes
    • instructions: the prompt that defines the behavior of the agent (tool use, output format, steps to follow, etc.)
    • tools: the functions / MCP servers / agents the agent can use to fulfill its objective

    There are also other concepts around memory and session management, but those are out of scope here.

    Without further ado, let’s define our agent!

    A Streaming Google Search Agent

    from google.adk.agents import Agent
    from google.adk.agents.run_config import RunConfig, StreamingMode
    from google.adk.runners import Runner
    from google.adk.sessions import InMemorySessionService
    from google.genai import types
    from google.adk.tools import google_search
    
    # Define constants for the agent
    APP_NAME = "default"  # Application
    USER_ID = "default"  # User
    SESSION = "default"  # Session
    MODEL_NAME = "gemini-2.5-flash-lite"
    
    # Step 1: Create the LLM Agent
    root_agent = Agent(
        model=MODEL_NAME,
        name="text_chat_bot",
        description="A text chatbot",
        instruction="You are a helpful assistant. Your goal is to answer questions based on your knowledge. Use your Google Search tool to provide the latest and most accurate information",
        tools=[google_search]
    )
    
    # Step 2: Set up Session Management
    # InMemorySessionService stores conversations in RAM (temporary)
    session_service = InMemorySessionService()
    
    # Step 3: Create the Runner
    runner = Runner(agent=root_agent, app_name=APP_NAME, session_service=session_service)

    The `Runner` object acts as the orchestrator between you and the agent.

    Next, we (re)define our `/stream` endpoint. We first create the session for the agent, or retrieve it if it already exists:

            # Try to create a new session, or retrieve an existing one
            try:
                session = await session_service.create_session(
                    app_name=APP_NAME, user_id=USER_ID, session_id=session_id
                )
            except:
                session = await session_service.get_session(
                    app_name=APP_NAME, user_id=USER_ID, session_id=session_id
                )

    Then, we take the user query and pass it to the agent asynchronously to get a stream back:

            # Convert the query string to the ADK Content format
            query = types.Content(role="user", parts=[types.Part(text=query)])
    
            # Stream the agent's response asynchronously
            async for event in runner.run_async(
                user_id=USER_ID, session_id=session.id, new_message=query, run_config=RunConfig(streaming_mode=StreamingMode.SSE)
            ):
    

    There is a subtlety next. When generating a response, the agent might output a double linebreak “\n\n”. This is problematic because SSE events end with this exact sequence. Having a double linebreak in your string therefore means:

    • your current message will be truncated
    • your next message will be incorrectly formatted and the SSE stream will stop
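
    For example, if the model emits “Hello\n\nWorld” and we forward it as-is (using the same <li> wrapper as our stream), the wire would carry:

    data: <li><b>AI</b><p>Hello

    World</p></li>

    The blank line ends the event right after “Hello”, so the current message is truncated and the leftover “World</p></li>” is not valid event data.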

    You can try it on your own. To fix this, we will use a little hack, together with another little hack to format list elements (I use Tailwind CSS, which overrides certain CSS rules). The hack, which relies on the markdown package (import markdown), is:

                if event.partial:
                    message += event.content.parts[0].text

                    # Hack here: render the markdown, then replace raw newlines with <br>
                    # so no "\n\n" sequence can terminate the SSE event early, and
                    # re-add list styling that Tailwind resets (assumed utility classes)
                    html_content = (
                        markdown.markdown(message, extensions=['fenced_code'])
                        .replace("\n", "<br>")
                        .replace("<ul>", '<ul class="list-disc list-inside">')
                        .replace("<ol>", '<ol class="list-decimal list-inside">')
                    )

                    full_html = f"data: <li><b>AI</b><p>{html_content}</p></li>\n\n"
                    yield full_html

    This way, we make sure that no double linebreaks will break our SSE stream.

    Full code for the route is below:

    @app.get("/stream/{session_id}")
    async def stream(session_id: str):
    
        async def response_stream():
    
            if session_id not in sessions:
                print(f"Session {session_id} not found!")
                return
    
            # Try to create a new session, or retrieve an existing one
            try:
                session = await session_service.create_session(
                    app_name=APP_NAME, user_id=USER_ID, session_id=session_id
                )
            except:
                session = await session_service.get_session(
                    app_name=APP_NAME, user_id=USER_ID, session_id=session_id
                )
    
            queue = sessions[session_id]
    
            # This BLOCKS until data arrives
            print(f"Waiting for message in session {session_id}")
            query = await queue.get()
            print(f"Got message: {query}")
    
            message = ""
    
            # Convert the query string to the ADK Content format
            query = types.Content(role="user", parts=[types.Part(text=query)])
    
            # Stream the agent's response asynchronously
            async for event in runner.run_async(
                user_id=USER_ID, session_id=session.id, new_message=query, run_config=RunConfig(streaming_mode=StreamingMode.SSE)
            ):
                if event.partial:
                    message += event.content.parts[0].text

                    html_content = (
                        markdown.markdown(message, extensions=['fenced_code'])
                        .replace("\n", "<br>")
                        .replace("<ul>", '<ul class="list-disc list-inside">')
                        .replace("<ol>", '<ol class="list-decimal list-inside">')
                    )

                    full_html = f"data: <li><b>AI</b><p>{html_content}</p></li>\n\n"
                    yield full_html

            queue.task_done()

        return StreamingResponse(response_stream(), media_type="text/event-stream")

    And that’s it! You will be able to converse with your chat!

    I add below a little CSS snippet to format code blocks. Indeed, if you ask your chat to produce code snippets, you want them properly formatted. Here is the CSS:

    pre, code {
        background-color: black;
        color: lightgrey;
        padding: 1%;
        border-radius: 10px;
        white-space: pre-wrap;
        font-size: 0.8rem;
        letter-spacing: -1px;
    }

    You can now also generate code snippets:

    Mind = blown

    Workflow recap

    With less than 200 LoC, we were able to write a chat with the following workflow, streaming a response from the server and displaying it very nicely by fiddling with SSE and HTMX.

    User types "Hello World" → Submit
    ├── 1. Add "Me: Hello World" to chat
    ├── 2. Add "AI: Thinking..." to chat
    ├── 3. POST /chat with message
    ├── 4. Server queues message
    ├── 5. SSE stream produces an LLM response based on the query
    ├── 6. Stream "AI: This" (replaces "Thinking...")
    ├── 7. Stream "AI: This is the answer ..."
    └── 8. Complete
    

    Conclusion

    In this series of articles, we showed how easy it can be to develop a chatbot app with almost no Javascript and no heavy JS framework, just by using Python and HTML. We covered topics such as server-side rendering, Server-Sent Events (SSE), async streaming, and agents, with the help of a magical library, HTMX.

    The main purpose of these articles was to show that web applications are not inaccessible to non-Javascript developers! There are actually strong and valid reasons not to use Javascript every time for web development, and although Javascript is a powerful language, my feeling today is that it is often overused in place of simpler, yet robust approaches. The server-side vs client-side debate is long-standing and not over yet, but I hope these articles were an eye-opener for some of you, and that they taught you something.

    Stay tuned!
