Building Intelligent Agent Workflows with LlamaIndex: From Basics to Advanced Patterns

In this blog, we’ll explore how to design powerful and flexible multi-agent workflows using the llama_index framework. From basic sequential flows to advanced branching, loops, parallelism, and LLM-powered agents, you’ll learn how to model real-world software development pipelines as executable, traceable workflows.

Let’s start with a “Hello World” workflow.

A very basic workflow class that receives a StartEvent and emits a StopEvent:

from llama_index.core.workflow import StartEvent, StopEvent, Workflow, step


class AgentDocWorkflow(Workflow):
    @step
    async def my_step(self, ev: StartEvent) -> StopEvent:
        """
        This is a sample step in the workflow.
        """
        return StopEvent(result="Hello World")

Now instantiate the workflow and run it, waiting for it to complete with the await keyword.

basic_workflow = AgentDocWorkflow(timeout=10, verbose=False)
result = await basic_workflow.run()
print(result)
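
Running this prints the result carried by the StopEvent:

Hello World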

Sequential Multi-Step Workflow

Let’s build a linear sequence: developer → tester → deployer.
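
The Code and Test events passed between the steps are plain Event subclasses. A minimal sketch, with field names taken from how the steps below construct them:

from llama_index.core.workflow import Event


class Code(Event):
    code_output: str


class Test(Event):
    test_output: str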

class DeveloperWorkflow(Workflow):
    @step
    async def developer(self, ev: StartEvent) -> Code:
        return Code(code_output="Code completed successfully")

    @step
    async def tester(self, ev: Code) -> Test:
        return Test(test_output="Test completed successfully")

    @step
    async def deployer(self, ev: Test) -> StopEvent:
        return StopEvent(result="Deployment completed successfully")

Visualization:
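
The workflow diagrams shown in this post can be generated with LlamaIndex’s drawing utility (shipped in the optional llama-index-utils-workflow package); a minimal sketch:

from llama_index.utils.workflow import draw_all_possible_flows

# Writes an interactive HTML diagram of every possible path through the workflow
draw_all_possible_flows(DeveloperWorkflow, filename="developer_workflow.html")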

Iterative Feedback Loop (e.g., Code Review Cycles)

First, create the event classes that the steps accept and emit.
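
Alongside the Code and Test events from the previous section, the loop introduces a Review event; a minimal sketch:

class Review(Event):
    review_output: str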

class AgentLoopWorkflow(Workflow):
    def __init__(self, timeout=10, verbose=False):
        # Pass the settings through to the parent Workflow constructor
        super().__init__(timeout=timeout, verbose=verbose)
        self.iteration_flag = 0

    @step
    async def developer(self, ev: StartEvent | Review) -> Code:
        # Write the code (or rework it after review) and count the pass
        self.iteration_flag += 1
        return Code(code_output="Code changed as per review comments")

    @step
    async def reviewer(self, ev: Code) -> Review | Test:
        print(ev.code_output)
        if self.iteration_flag == 1:
            # First pass: send the code back to the developer with comments
            return Review(review_output="Address the review comments")
        else:
            # Later passes: the review is clean, hand off to testing
            return Test(test_output="Review completed successfully")

    @step
    async def tester(self, ev: Test) -> StopEvent:
        return StopEvent(result="Test completed successfully")

Don’t get perplexed by the iteration_flag; it is only introduced to create the review loop.

Depending on the iteration, the loop either continues with feedback or progresses to testing.

Visualization:

Branching Workflows: Parallel Architect Paths

What if your process starts with either a software architect or test architect?
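
The architect- and role-specific events used below are again plain Event subclasses. A minimal sketch, with field names taken from how the steps construct them:

class SoftwareArchitect(Event):
    design_output: str


class TestArchitect(Event):
    test_plan: str


class Developer(Event):
    code_output: str


class Tester(Event):
    test_result: str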

import random


class AgentBranchWorkflow(Workflow):

    @step
    async def start(self, ev: StartEvent) -> SoftwareArchitect | TestArchitect:
        # Randomly pick a branch (stand-in for whatever condition drives the choice)
        if random.choice([True, False]):
            return SoftwareArchitect(design_output="Software architecture designed")
        else:
            return TestArchitect(test_plan="Test plan created")

    @step
    async def software_architect(self, ev: SoftwareArchitect) -> Developer:
        return Developer(code_output="Get the Software architecture")

    @step
    async def developer(self, ev: Developer) -> StopEvent:
        return StopEvent(result="Code developed based on architecture")

    @step
    async def test_architect(self, ev: TestArchitect) -> Tester:
        return Tester(test_result="Get the test plan")

    @step
    async def tester(self, ev: Tester) -> StopEvent:
        return StopEvent(result="Testing completed successfully")

Visualization:

Parallel Processing with Context.send_event

For concurrent thread-like execution:

import asyncio
import random

from llama_index.core.workflow import Context, Event


class Thread(Event):
    query: str

class ParallelWorkflow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> Thread:
        ctx.send_event(Thread(query="Query for parallel processing 1"))
        ctx.send_event(Thread(query="Query for parallel processing 2"))
        ctx.send_event(Thread(query="Query for parallel processing 3"))

Workers process them concurrently


    @step(num_workers=4)
    async def process_thread(self, ctx: Context, ev: Thread) -> StopEvent:
        await asyncio.sleep(random.randint(1,5))  # Simulate some processing time
        return StopEvent(result=f"Processed thread with query: {ev.query}")

Visualization:

The same pattern can be extended with ctx.collect_events, which waits for all the events to arrive before ending the workflow.
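
For that variant, the worker step emits a custom collector event instead of a StopEvent. A minimal sketch, assuming a CollectorThread event that mirrors Thread:

class CollectorThread(Event):
    query: str

Inside the workflow class, the worker then forwards each query on as a CollectorThread:

    @step(num_workers=4)
    async def process_thread(self, ctx: Context, ev: Thread) -> CollectorThread:
        await asyncio.sleep(random.randint(1, 5))  # Simulate some processing time
        return CollectorThread(query=ev.query)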

    @step
    async def collect_results(self, ctx: Context, ev: CollectorThread) -> StopEvent:
        #wait for all events to be collected
        result = ctx.collect_events(ev, [CollectorThread] * 3)
        if result is None:
            print("Not all events collected yet")
            return None
        
        print(result)
        return StopEvent(result="Done")

The collection of parallel output guarantees that the workflow only proceeds once all threads complete.

Concurrent Workflows with Multiple Event Types

Use case: development, testing, and certification occur independently but must finish before delivery.
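
The events used below are plain Event subclasses, one per stage plus a matching completion event. A minimal sketch, with field names taken from how the steps construct them:

class Development(Event):
    query: str


class Testing(Event):
    query: str


class Certification(Event):
    query: str


class DevelopmentComplete(Event):
    result: str


class TestingComplete(Event):
    result: str


class CertificationComplete(Event):
    result: str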

class ConcurrentWorkflow_DiffEventTypes(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> Development | Testing | Certification:
        ctx.send_event(Development(query="Query for development"))
        ctx.send_event(Testing(query="Query for testing"))
        ctx.send_event(Certification(query="Query for certification"))

    @step
    async def process_development(self, ctx: Context, ev: Development) -> DevelopmentComplete:
        return DevelopmentComplete(result=ev.query)

    @step
    async def process_testing(self, ctx: Context, ev: Testing) -> TestingComplete:
        return TestingComplete(result=ev.query)

    @step
    async def process_certification(self, ctx: Context, ev: Certification) -> CertificationComplete:
        return CertificationComplete(result=ev.query)

    @step
    async def Event_Collector(
        self,
        ctx: Context,
        ev: DevelopmentComplete | TestingComplete | CertificationComplete,
    ) -> StopEvent:
        events = ctx.collect_events(ev, [CertificationComplete, TestingComplete, DevelopmentComplete])
        if events is None:
            print("Not all events collected yet")
            return None

        print("All events collected:", events)
        return StopEvent(result="Done")

This showcases dependency resolution across heterogeneous paths: collect_events returns the events in the order listed in its expected-events argument, regardless of the order in which they were emitted, and the workflow only proceeds once all of them have arrived.

Visualization:

OpenAI Integration: LLM Inside a Workflow

The provided code defines a three-step asynchronous workflow using the llama_index framework, where each step represents a stage in a process.

It begins by signaling progress with a ProgressEvent, then uses OpenAI’s gpt-4o-mini model to stream a response about the Taj Mahal, emitting each token as a TextEvent in real-time.

Finally, it concludes the workflow with a completion message. The workflow supports event streaming, allowing live feedback and progress tracking, making it suitable for interactive or UI-driven applications involving LLMs.

It also shows the user that the LLM is working by streaming intermediate output, which improves the overall experience.
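
The events and the LLM used below would be declared along these lines. A minimal sketch: the field names follow how the steps construct the events, and the OpenAI class comes from the llama-index-llms-openai integration:

from llama_index.llms.openai import OpenAI


class FirstEvent(Event):
    first_output: str


class SecondEvent(Event):
    second_output: str
    response: str


class ProgressEvent(Event):
    msg: str


class TextEvent(Event):
    delta: str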

class MyWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
        llm = OpenAI(model="gpt-4o-mini", api_key=api_key) 
        generator = await llm.astream_complete(
            "Please give me the first 50 words about Taj Mahal, a monument in India."  # Example prompt
        )
        async for response in generator:
            ctx.write_event_to_stream(TextEvent(delta=response.delta))
        return SecondEvent(
            second_output="Second step complete, full response attached",
            response=str(response),
        )

    @step
    async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening"))
        return StopEvent(result="Workflow complete.")

workflow = MyWorkflow(timeout=30, verbose=False)
handler = workflow.run(first_input="Start the workflow.")

async for ev in handler.stream_events():
    if isinstance(ev, ProgressEvent):
        print(ev.msg)
    if isinstance(ev, TextEvent):
        print(ev.delta, end="")

final_result = await handler
print("Final result = ", final_result)

Output:

Step one is happening
The Taj Mahal, located in Agra, India, is an iconic mausoleum built by Mughal Emperor Shah Jahan in memory of his beloved wife, Mumtaz Mahal. Completed in 1653, it showcases exquisite white marble architecture, intricate carvings, and beautiful gardens, symbolizing love and devotion. It is a UNESCO World Heritage Site.Step three is happening
Final result =  Workflow complete.

This simulates a live LLM-driven response stream during the workflow.

Conclusion

Workflows built with llama_index offer modular, async, and declarative structures to model almost any multi-agent process. Whether you’re creating a dev-test-deploy pipeline or orchestrating LLM interactions, this approach gives you full control and visibility into every step.

Stay tuned for future deep dives on memory integration, agent coordination, and RAG-enhanced workflows!

Links

For the full code, check the Jupyter notebook below:

https://github.com/juggarnautss/Event_Driven_Agent_Doc_Workflow/blob/main/agent_doc_workflow.ipynb

Author Profile

Sirin Shaikh
AI | Amplifying Impact
Talks about AI | GenAI | Machine Learning | Cloud | Kubernetes
