Constructing an Interactive UI for Llamaindex Workflows


When starting the workflow from the terminal, it is easy to see which step it is executing and the log messages we put in those steps.

Terminal log for the workflow execution (Screenshot by author)

We can also enable human-in-the-loop interaction simply by using user_feedback = input() within the workflow. It will pause the workflow and wait for the user input (see the human-in-the-loop example in this official Llamaindex notebook). However, to achieve the same functionality in a user-friendly interface, we need additional modifications to the original workflow.

A workflow can take a long time to execute, so for a better user experience, Llamaindex provides a way to send streaming events to indicate the progress of the workflow, as shown in the notebook here. In my workflow, I define a WorkflowStreamingEvent class to include useful information about the event message, such as the type of the event and which step it is sent from:

from typing import Any, Dict, Literal

from pydantic import BaseModel, Field


class WorkflowStreamingEvent(BaseModel):
    event_type: Literal["server_message", "request_user_input"] = Field(
        ..., description="Type of the event"
    )
    event_sender: str = Field(
        ..., description="Sender (workflow step name) of the event"
    )
    event_content: Dict[str, Any] = Field(..., description="Content of the event")
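For illustration, the payload that ends up on the stream for such an event is a plain JSON-serializable dict. A hypothetical example built with the stdlib only (without the Pydantic model), mirroring the schema above:

```python
import json

# Hypothetical payload mirroring the WorkflowStreamingEvent schema
event = {
    "event_type": "server_message",
    "event_sender": "tavily_query",
    "event_content": {"message": "Querying Tavily..."},
}

# This is roughly what one chunk on the stream looks like once serialized
serialized = json.dumps(event)
print(serialized)
```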

To enable sending streaming events, the workflow step must have access to the shared context, which is done by adding the @step(pass_context=True) decorator to the step definition. Then within the step definition, we can send event messages about the progress through the context. For example, in the tavily_query() step:

@step(pass_context=True)
async def tavily_query(self, ctx: Context, ev: StartEvent) -> TavilyResultsEvent:
    ctx.data["research_topic"] = ev.user_query
    query = f"arxiv papers about the state of the art of {ev.user_query}"
    ctx.write_event_to_stream(
        Event(
            msg=WorkflowStreamingEvent(
                event_type="server_message",
                event_sender=inspect.currentframe().f_code.co_name,
                event_content={"message": f"Querying Tavily with: '{query}'"},
            ).model_dump()
        )
    )

In this example, we set the event_type to "server_message". It means that it is an update message and no user action is required. We have another event type "request_user_input" that indicates a user input is needed. For example, in the gather_feedback_outline() step in the workflow, after generating the slide text outlines from the original paper summary, a message is sent to prompt the user to provide approval and feedback on the outline text:

@step(pass_context=True)
async def gather_feedback_outline(
    self, ctx: Context, ev: OutlineEvent
) -> OutlineFeedbackEvent | OutlineOkEvent:
    """Present the user the original paper summary and the outlines generated, gather feedback from the user"""
    ...

    # Send a special event indicating that user input is required
    ctx.write_event_to_stream(
        Event(
            msg=json.dumps(
                {
                    "event_type": "request_user_input",
                    "event_sender": inspect.currentframe().f_code.co_name,
                    "event_content": {
                        "summary": ev.summary,
                        "outline": ev.outline.dict(),
                        "message": "Do you approve this outline? If not, please provide feedback.",
                    },
                }
            )
        )
    )

...

These events are handled differently in the backend API and the frontend logic, which I will describe in detail in the later sections of this article.

Workflow steps that require user feedback (Image by author)

When sending a "request_user_input" event to the user, we only want to proceed to the next step after we have received the user input. As shown in the workflow diagram above, the workflow either proceeds to the outlines_with_layout() step if the user approves the outline, or back to the summary2outline() step if the user does not approve.

This is achieved using the Future() object from Python's asyncio library. In the SlideGenerationWorkflow class, we set an attribute self.user_input_future = asyncio.Future() that will be awaited in the gather_feedback_outline() step. The subsequent execution of the workflow is conditioned on the content of the user feedback:

@step(pass_context=True)
async def gather_feedback_outline(
    self, ctx: Context, ev: OutlineEvent
) -> OutlineFeedbackEvent | OutlineOkEvent:
    ...

    # Wait for user input
    if not self.user_input_future.done():
        user_response = await self.user_input_future
        logger.info(f"gather_feedback_outline: Got user response: {user_response}")

        # Process user_response, which should be a JSON string
        try:
            response_data = json.loads(user_response)
            approval = response_data.get("approval", "").lower().strip()
            feedback = response_data.get("feedback", "").strip()
        except json.JSONDecodeError:
            # Handle invalid JSON
            logger.error("Invalid user response format")
            raise Exception("Invalid user response format")

        if approval == ":material/thumb_up:":
            return OutlineOkEvent(summary=ev.summary, outline=ev.outline)
        else:
            return OutlineFeedbackEvent(
                summary=ev.summary, outline=ev.outline, feedback=feedback
            )
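The pause-and-resume mechanics can be reproduced with a small stdlib-only sketch (the names here are illustrative, not from the workflow code): one coroutine awaits the Future, while another resolves it later, just as gather_feedback_outline() and the user-input endpoint do:

```python
import asyncio
import json


async def main():
    # The workflow step awaits this Future; the submit endpoint resolves it later
    user_input_future = asyncio.Future()

    async def gather_feedback():
        # Pauses here until set_result() is called somewhere else
        response = json.loads(await user_input_future)
        return "ok" if response["approval"] == "approve" else "revise"

    async def submit_user_input():
        await asyncio.sleep(0.01)  # simulate the user responding via HTTP
        user_input_future.set_result(json.dumps({"approval": "approve"}))

    result, _ = await asyncio.gather(gather_feedback(), submit_user_input())
    return result


print(asyncio.run(main()))  # prints: ok
```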

We set up the backend using FastAPI, expose a POST endpoint to handle requests, and initiate the workflow run. The asynchronous function run_workflow_endpoint() takes ResearchTopic as input. In the function, an asynchronous generator event_generator() is defined, which creates a task to run the workflow and streams the events to the client as the workflow progresses. When the workflow finishes, it will also stream the final file results to the client.


class ResearchTopic(BaseModel):
    query: str = Field(..., example="example query")


@app.post("/run-slide-gen")
async def run_workflow_endpoint(topic: ResearchTopic):
    workflow_id = str(uuid.uuid4())

    wf = SummaryAndSlideGenerationWorkflow(wid=workflow_id, timeout=2000, verbose=True)
    wf.add_workflows(
        summary_gen_wf=SummaryGenerationWorkflow(
            wid=workflow_id, timeout=800, verbose=True
        )
    )
    wf.add_workflows(
        slide_gen_wf=SlideGenerationWorkflow(
            wid=workflow_id, timeout=1200, verbose=True
        )
    )

    async def event_generator():
        loop = asyncio.get_running_loop()
        logger.debug(f"event_generator: loop id {id(loop)}")
        yield f"{json.dumps({'workflow_id': workflow_id})}\n\n"

        task = asyncio.create_task(wf.run(user_query=topic.query))
        logger.debug(f"event_generator: Created task {task}")
        try:
            async for ev in wf.stream_events():
                logger.info(f"Sending message to frontend: {ev.msg}")
                yield f"{ev.msg}\n\n"
                await asyncio.sleep(0.1)  # Small sleep to ensure proper chunking
            final_result = await task

            # Construct the download URLs
            download_pptx_url = f"http://backend:80/download_pptx/{workflow_id}"
            download_pdf_url = f"http://backend:80/download_pdf/{workflow_id}"

            final_result_with_url = {
                "result": final_result,
                "download_pptx_url": download_pptx_url,
                "download_pdf_url": download_pdf_url,
            }

            yield f"{json.dumps({'final_result': final_result_with_url})}\n\n"
        except Exception as e:
            error_message = f"Error in workflow: {str(e)}"
            logger.error(error_message)
            yield f"{json.dumps({'event': 'error', 'message': error_message})}\n\n"
        finally:
            # Clean up
            workflows.pop(workflow_id, None)

    return StreamingResponse(event_generator(), media_type="text/event-stream")
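On the wire, each yielded chunk is therefore a JSON object followed by a blank line. A client that buffers the whole response can recover the events by splitting on the double newline; a hypothetical stdlib-only helper to illustrate the framing:

```python
import json


def parse_stream_chunks(raw: str) -> list[dict]:
    """Split a '\n\n'-delimited stream into a list of JSON events."""
    events = []
    for chunk in raw.split("\n\n"):
        chunk = chunk.strip()
        if chunk:
            events.append(json.loads(chunk))
    return events


raw = '{"workflow_id": "abc"}\n\n{"event_type": "server_message"}\n\n'
print(parse_stream_chunks(raw))
# [{'workflow_id': 'abc'}, {'event_type': 'server_message'}]
```

In practice the frontend consumes the stream line by line as it arrives (shown later with httpx.AsyncClient), but the framing is the same.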

In addition to this endpoint, there are endpoints for receiving user input from the client and handling file download requests. Since each workflow is assigned a unique workflow ID, we can map the user input received from the client to the correct workflow. By calling set_result() on the awaiting Future, the pending workflow can resume execution.

@app.post("/submit_user_input")
async def submit_user_input(data: dict = Body(...)):
    workflow_id = data.get("workflow_id")
    user_input = data.get("user_input")
    wf = workflows.get(workflow_id)
    if wf and wf.user_input_future:
        loop = wf.user_input_future.get_loop()  # Get the loop from the future
        logger.info(f"submit_user_input: wf.user_input_future loop id {id(loop)}")
        if not wf.user_input_future.done():
            loop.call_soon_threadsafe(wf.user_input_future.set_result, user_input)
            logger.info("submit_user_input: set_result called")
        else:
            logger.info("submit_user_input: future already done")
        return {"status": "input received"}
    else:
        raise HTTPException(
            status_code=404, detail="Workflow not found or future not initialized"
        )
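The call_soon_threadsafe() indirection matters here: the endpoint handler may run on a different thread and event loop than the one awaiting the Future, and calling set_result() directly from another thread is not thread-safe. A minimal stdlib-only sketch of the pattern (illustrative names, not the app's code):

```python
import asyncio
import threading


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def from_other_thread():
        # Calling fut.set_result() directly from this thread is not
        # thread-safe; schedule it on the future's own event loop instead
        loop.call_soon_threadsafe(fut.set_result, "user says yes")

    threading.Thread(target=from_other_thread).start()
    return await fut


print(asyncio.run(main()))  # prints: user says yes
```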

The download endpoint also identifies where the final file is located based on the workflow ID.

@app.get("/download_pptx/{workflow_id}")
async def download_pptx(workflow_id: str):
    file_path = (
        Path(settings.WORKFLOW_ARTIFACTS_PATH)
        / "SlideGenerationWorkflow"
        / workflow_id
        / "final.pptx"
    )
    if file_path.exists():
        return FileResponse(
            path=file_path,
            media_type="application/vnd.openxmlformats-officedocument.presentationml.presentation",
            filename="final.pptx",
        )
    else:
        raise HTTPException(status_code=404, detail="File not found")

On the frontend page, after the user submits the research topic through st.text_input(), a long-running task is started in a background thread with a new event loop for receiving the streamed events from the backend, without interfering with the rest of the page:

def start_long_running_task(url, payload, message_queue, user_input_event):
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(
            get_stream_data(url, payload, message_queue, user_input_event)
        )
        loop.close()
    except Exception as e:
        message_queue.put(("error", f"Exception in background thread: {str(e)}"))

...

def main():

    ...

    with st.sidebar:
        with st.form(key="slide_gen_form"):
            query = st.text_input(
                "Enter the topic of your research:",
            )
            submit_button = st.form_submit_button(label="Submit")

        if submit_button:
            # Reset the workflow_complete flag for a new workflow
            st.session_state.workflow_complete = False
            # Start the long-running task in a separate thread
            if (
                st.session_state.workflow_thread is None
                or not st.session_state.workflow_thread.is_alive()
            ):
                st.write("Starting the background thread...")

                st.session_state.workflow_thread = threading.Thread(
                    target=start_long_running_task,
                    args=(
                        "http://backend:80/run-slide-gen",
                        {"query": query},
                        st.session_state.message_queue,
                        st.session_state.user_input_event,
                    ),
                )
                st.session_state.workflow_thread.start()
                st.session_state.received_lines = []
            else:
                st.write("Background thread is already running.")

The event data streamed from the backend is fetched by httpx.AsyncClient and put into a message queue for further processing. Different information is extracted depending on the event type. For the event type "request_user_input", the thread is paused until the user input is provided.

async def fetch_streaming_data(url: str, payload: dict = None):
    async with httpx.AsyncClient(timeout=1200.0) as client:
        async with client.stream("POST", url=url, json=payload) as response:
            async for line in response.aiter_lines():
                if line:
                    yield line


async def get_stream_data(url, payload, message_queue, user_input_event):
    # message_queue.put(("message", "Starting to fetch streaming data..."))
    data_json = None
    async for data in fetch_streaming_data(url, payload):
        if data:
            try:
                data_json = json.loads(data)
                if "workflow_id" in data_json:
                    # Send workflow_id to the main thread
                    message_queue.put(("workflow_id", data_json["workflow_id"]))
                    continue
                elif "final_result" in data_json:
                    # Send final_result to the main thread
                    message_queue.put(("final_result", data_json["final_result"]))
                    continue
                event_type = data_json.get("event_type")
                event_sender = data_json.get("event_sender")
                event_content = data_json.get("event_content")
                if event_type in ["request_user_input"]:
                    # Send the message to the main thread
                    message_queue.put(("user_input_required", data_json))
                    # Wait until user input is provided
                    user_input_event.wait()
                    user_input_event.clear()
                    continue
                else:
                    # Send the line to the main thread
                    message_queue.put(("message", format_workflow_info(data_json)))
            except json.JSONDecodeError:  # todo: is this necessary?
                message_queue.put(("message", data))
        if data_json and "final_result" in data_json or "final_result" in str(data):
            break  # Stop processing after receiving the result
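On the main (Streamlit) thread, these tuples are drained from the queue on each rerun and dispatched by message type. A simplified stdlib-only sketch of that producer/consumer handoff (illustrative values, not the app's exact messages):

```python
import queue
import threading

message_queue = queue.Queue()


def background_worker():
    # Simulates the streaming thread pushing typed messages
    message_queue.put(("workflow_id", "1234"))
    message_queue.put(("message", "Querying Tavily..."))
    message_queue.put(("final_result", {"result": "done"}))


t = threading.Thread(target=background_worker)
t.start()
t.join()  # in the real app the thread keeps running; joined here for determinism

received = []
# Drain the queue without blocking, as a Streamlit rerun would
while not message_queue.empty():
    kind, payload = message_queue.get_nowait()
    received.append(kind)

print(received)  # ['workflow_id', 'message', 'final_result']
```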

We store the messages in st.session_state and use an st.expander() to display and update the streamed data.

if st.session_state.received_lines:
    with expander_placeholder.container():
        # Create or update the expander with the latest truncated line
        expander = st.expander(st.session_state.expander_label)
        for line in st.session_state.received_lines:
            expander.write(line)
            expander.divider()

To ensure the UI stays responsive and displays the event messages while they are being processed in a background thread, we use a custom autorefresh component to refresh the page at a fixed interval:

if not st.session_state.workflow_complete:
    st_autorefresh(interval=2000, limit=None, key="data_refresh")

When the streamed event is of type "request_user_input", we display the relevant information in a separate container and gather user feedback. As there may be multiple events that require user input in one workflow run, we put them in a message queue and make sure to assign a unique key to the st.feedback(), st.text_area() and st.button() widgets that are linked to each event, so that the widgets don't interfere with one another:

def gather_outline_feedback(placeholder):
    container = placeholder.container()
    with container:
        if st.session_state.user_input_required:
            data = st.session_state.user_input_prompt
            event_type = data.get("event_type")
            if event_type == "request_user_input":
                summary = data.get("event_content").get("summary")
                outline = data.get("event_content").get("outline")
                prompt_message = data.get("event_content").get(
                    "message", "Please review the outline."
                )

                # display the content for user input
                st.markdown("## Original Summary:")
                st.text_area("Summary", summary, disabled=True, height=400)
                st.divider()
                st.markdown("## Generated Slide Outline:")
                st.json(outline)
                st.write(prompt_message)

                # Define unique keys for widgets
                current_prompt = st.session_state.prompt_counter
                approval_key = f"approval_state_{current_prompt}"
                feedback_key = f"user_feedback_{current_prompt}"

                # Display the approval feedback widget
                approval = st.feedback("thumbs", key=approval_key)
                st.write(f"Current Approval state is: {approval}")
                logging.info(f"Current Approval state is: {approval}")

                # Display the feedback text area
                feedback = st.text_area(
                    "Please provide feedback if you have any:", key=feedback_key
                )

                # Handle the submission of user response
                if st.button(
                    "Submit Feedback", key=f"submit_response_{current_prompt}"
                ):
                    if not st.session_state.user_response_submitted:
                        # Retrieve approval and feedback using unique keys
                        approval_state = st.session_state.get(approval_key)
                        user_feedback = st.session_state.get(feedback_key, "")

                        # Ensure approval_state is valid
                        if approval_state not in [0, 1]:
                            st.error("Please select an approval option.")
                            return

                        user_response = {
                            "approval": (
                                ":material/thumb_down:"
                                if approval_state == 0
                                else ":material/thumb_up:"
                            ),
                            "feedback": user_feedback,
                        }
                        # Send the user's response to the backend
                        try:
                            response = requests.post(
                                "http://backend:80/submit_user_input",
                                json={
                                    "workflow_id": st.session_state.workflow_id,
                                    "user_input": json.dumps(user_response),
                                },
                            )
                            response.raise_for_status()
                            logging.info(
                                f"Backend response for submitting approval: {response.status_code}"
                            )
                        except requests.RequestException as e:
                            st.error(f"Failed to submit user input: {str(e)}")
                            return

...

Finally, when the workflow run finishes, the frontend client gets a response that contains the paths to the final generated files (the same slide deck in PDF format for rendering in the UI and in PPTX format for downloading as the result). We display the PDF file and create a button for downloading the PPTX file:

if "download_url_pdf" in st.session_state and st.session_state.download_url_pdf:
    download_url_pdf = st.session_state.download_url_pdf
    try:
        # Fetch the PDF content
        pdf_response = requests.get(download_url_pdf)
        pdf_response.raise_for_status()
        st.session_state.pdf_data = pdf_response.content

        st.markdown("### Generated Slide Deck:")
        # Display the PDF using an iframe
        st.markdown(
            f'',
            unsafe_allow_html=True,
        )
    except Exception as e:
        st.error(f"Failed to load the PDF file: {str(e)}")

    # Provide the download button for PPTX if available
    if (
        "download_url_pptx" in st.session_state
        and st.session_state.download_url_pptx
    ):
        download_url_pptx = st.session_state.download_url_pptx
        try:
            # Fetch the PPTX content
            pptx_response = requests.get(download_url_pptx)
            pptx_response.raise_for_status()
            pptx_data = pptx_response.content

            st.download_button(
                label="Download Generated PPTX",
                data=pptx_data,
                file_name="generated_slides.pptx",
                mime="application/vnd.openxmlformats-officedocument.presentationml.presentation",
            )
        except Exception as e:
            st.error(f"Failed to load the PPTX file: {str(e)}")

We’ll create a multi-service Docker application with docker-compose to run the frontend and backend apps.

version: '3.8'

services:
  backend:
    build:
      context: ./backend
      args:
        - --no-cache
    ports:
      - "8000:80"
    networks:
      - app-network
    volumes:
      - .env:/app/.env
      - ./data:/app/data
      - ./workflow_artifacts:/app/workflow_artifacts
      - ~/.azure:/root/.azure

  frontend:
    build:
      context: ./frontend
      args:
        - --no-cache
    ports:
      - "8501:8501"
    networks:
      - app-network

networks:
  app-network:

That's it! Just run docker-compose up, and we have an app that can run a research workflow based on the user's input query, prompt the user for feedback during the execution, and display the result to the user.
