DAG Pipeline API

Two ways to run DAG pipelines:

  • CLI (li o flow): the orchestrator plans the DAG automatically from a prompt
  • Python API (Builder + Session.flow()): you construct the DAG explicitly

This page covers the Python API path.

Quick example

import asyncio
import lionagi as li
from lionagi import Builder

async def main():
    session = li.Session()
    builder = Builder()

    n1 = builder.add_operation(
        "communicate",
        instruction="Research quantum error correction techniques.",
    )
    n2 = builder.add_operation(
        "communicate",
        instruction="Write an executive summary of the research.",
        depends_on=[n1],
    )

    results = await session.flow(builder.get_graph(), parallel=True)
    print(results[n2])

asyncio.run(main())
# Example output:
# Quantum error correction uses...
# Run completed: 2 nodes, 1 parallel wave

Builder (OperationGraphBuilder)

from lionagi import Builder

builder = Builder(name="MyGraph")

add_operation()

node_id = builder.add_operation(
    operation="communicate",
    node_id=None,              # optional reference label
    depends_on=None,           # list of node_ids this depends on
    inherit_context=False,     # inherit conversation history from dependency
    branch=None,               # assign to a specific Branch
    instruction="...",         # passed to branch.operate() or branch.communicate()
    **parameters,              # any Branch operation kwargs
)

Returns: str — node ID used in depends_on lists.

operation must be the name of a Branch method: "communicate", "operate", "ReAct", "parse", etc.

add_aggregation()

agg_id = builder.add_aggregation(
    operation="communicate",
    source_node_ids=[n1, n2, n3],  # defaults to current graph heads
    inherit_context=False,
    inherit_from_source=0,
    instruction="Synthesize the above findings.",
)

Adds a node that depends on multiple sources — useful for fan-in synthesis.

expand_from_result()

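# ExpansionStrategy must be imported before use; in recent lionagi releases
# it is defined in lionagi.operations.builder (verify against your version).
from lionagi.operations.builder import ExpansionStrategy

new_ids = builder.expand_from_result(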
    items=results[n1],          # list of items from a prior execution result
    source_node_id=n1,
    operation="communicate",
    strategy=ExpansionStrategy.CONCURRENT,
    inherit_context=False,
    instruction="Analyze this item: {item}",
)

Expands the graph dynamically after partial execution — useful for iterative pipelines.

ExpansionStrategy values:

Value                       | Behavior
CONCURRENT                  | All expanded nodes run in parallel
SEQUENTIAL                  | Expanded nodes chain one after another
SEQUENTIAL_CONCURRENT_CHUNK | Sequential chunks, concurrent within each chunk
CONCURRENT_SEQUENTIAL_CHUNK | Concurrent chunks, sequential within each chunk
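
The four behaviors in the table can be pictured as different dependency layouts over the expanded nodes. The sketch below is a plain-Python illustration of one reading of that table, not lionagi's implementation: the Strategy enum, plan_dependencies(), chunked(), and the chunk_size parameter are all hypothetical names introduced here, and how the library actually sizes chunks is not stated on this page.

```python
from enum import Enum


class Strategy(Enum):
    # Hypothetical stand-in that mirrors the names in the table above.
    CONCURRENT = "concurrent"
    SEQUENTIAL = "sequential"
    SEQUENTIAL_CONCURRENT_CHUNK = "sequential_concurrent_chunk"
    CONCURRENT_SEQUENTIAL_CHUNK = "concurrent_sequential_chunk"


def chunked(items, size):
    """Split items into consecutive chunks of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def plan_dependencies(source, items, strategy, chunk_size=2):
    """Return {node: [nodes it waits for]} for one expansion from `source`."""
    deps = {}
    if strategy is Strategy.CONCURRENT:
        # Every expanded node depends only on the source.
        for item in items:
            deps[item] = [source]
    elif strategy is Strategy.SEQUENTIAL:
        # Each node waits for the one before it.
        prev = source
        for item in items:
            deps[item] = [prev]
            prev = item
    elif strategy is Strategy.SEQUENTIAL_CONCURRENT_CHUNK:
        # Chunks run one after another; nodes inside a chunk run together.
        prev_chunk = [source]
        for chunk in chunked(items, chunk_size):
            for item in chunk:
                deps[item] = list(prev_chunk)
            prev_chunk = chunk
    elif strategy is Strategy.CONCURRENT_SEQUENTIAL_CHUNK:
        # Chunks start together; nodes inside a chunk form a chain.
        for chunk in chunked(items, chunk_size):
            prev = source
            for item in chunk:
                deps[item] = [prev]
                prev = item
    return deps


items = ["a", "b", "c", "d"]
for strategy in Strategy:
    print(strategy.name, plan_dependencies("src", items, strategy))
```

Reading the printed dictionaries side by side shows why the two chunked strategies differ: in one the chunk boundary is a barrier, in the other it separates independent chains.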

Other builder methods

# add a conditional branch structure
ids = builder.add_conditional_branch(
    condition_check_op="communicate",
    true_op="communicate",
    false_op="communicate",
    instruction="Is this claim factual? Answer YES or NO.",
)
# ids: {"check": id, "true": id, "false": id}

# mark nodes as already executed (for incremental builds)
builder.mark_executed([n1, n2])

# get unexecuted nodes
pending = builder.get_unexecuted_nodes()

# get node by reference label
node = builder.get_node_by_reference("my_label")

# inspect graph state
state = builder.visualize_state()
# {"total_nodes": 4, "executed": 2, "pending": 2, ...}

# get the Graph object for session.flow()
graph = builder.get_graph()

Execution via Session.flow()

results = await session.flow(
    graph,
    context={"domain": "finance"},  # injected into all nodes
    parallel=True,
    max_concurrent=5,
    verbose=True,
)

See session.md#flow for full parameter reference.

Parallel execution semantics

Nodes without depends_on run concurrently (up to max_concurrent). Nodes with depends_on wait for all dependencies to complete. Nodes sharing a branch= reference run sequentially within that branch's message history — this lets one agent accumulate context across multiple DAG nodes.

branch_a = session.new_branch(name="analyst")

n1 = builder.add_operation("communicate", branch=branch_a, instruction="Step 1")
n2 = builder.add_operation("communicate", branch=branch_a, instruction="Step 2", depends_on=[n1])
n3 = builder.add_operation("communicate", branch=branch_a, instruction="Step 3", depends_on=[n2])
# n1 → n2 → n3 run sequentially on branch_a, accumulating history
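
The first two rules above (independent nodes run concurrently up to a cap, dependent nodes wait for every dependency) can be sketched with the standard library alone. This is a minimal illustration of the scheduling idea, assuming a semaphore-based cap; it is not how Session.flow() is implemented, run_dag() is a hypothetical helper, and per-branch sequencing is not modeled.

```python
import asyncio


async def run_dag(deps, max_concurrent=2):
    """Run every node in `deps` ({node: [dependencies]}).

    Returns the finish order and the peak number of nodes running at once.
    """
    semaphore = asyncio.Semaphore(max_concurrent)
    done = {node: asyncio.Event() for node in deps}
    finished = []
    running = 0
    peak = 0

    async def run_node(node):
        nonlocal running, peak
        # Wait for all dependencies to complete before starting.
        for dep in deps[node]:
            await done[dep].wait()
        # The semaphore caps how many nodes execute at the same time.
        async with semaphore:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for the real operation
            running -= 1
        finished.append(node)
        done[node].set()

    await asyncio.gather(*(run_node(node) for node in deps))
    return finished, peak


# Three independent nodes fan in to one dependent node.
graph = {"a": [], "b": [], "c": [], "d": ["a", "b", "c"]}
order, peak = asyncio.run(run_dag(graph, max_concurrent=2))
print(order, peak)
```

With max_concurrent=2, only two of the three independent nodes run at once, and "d" starts only after all three have finished.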

CLI flow vs Python builder

Aspect           | li o flow                | Python Builder
DAG construction | LLM plans it             | You define explicitly
Flexibility      | High (natural language)  | Total (programmatic)
Re-planning      | Built-in (control nodes) | Manual via expand_from_result
Typing           | FlowPlan schema          | Operation objects
Best for         | Ad-hoc orchestration     | Application embedding

Full example: fan-out + synthesis

import asyncio
import lionagi as li
from lionagi import Builder

async def analyze(topic: str) -> str:
    session = li.Session()
    builder = Builder()

    aspects = ["technical feasibility", "market impact", "regulatory risk"]
    worker_ids = []
    for aspect in aspects:
        nid = builder.add_operation(
            "communicate",
            instruction=f"Analyze the {aspect} of: {topic}",
        )
        worker_ids.append(nid)

    synthesis_id = builder.add_aggregation(
        operation="communicate",
        source_node_ids=worker_ids,
        instruction="Synthesize these three analyses into an executive brief.",
    )

    results = await session.flow(builder.get_graph(), parallel=True, max_concurrent=3)
    return results[synthesis_id]

# Print the brief; the original call discarded the return value.
print(asyncio.run(analyze("open-source LLM deployment in regulated industries")))

Note context accumulation

Flow operations accumulate cross-node context in a Note object passed through session.flow(). Each operation can read from and write to this shared note.

  • deep_update() merges nested dicts across operations — keys are merged recursively.
  • List values are replaced (last writer wins), not concatenated.
  • The Note is available as results.context after session.flow() completes.

results = await session.flow(builder.get_graph(), parallel=True)
accumulated = results.context  # Note object with merged state
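
The merge rules listed above (nested dicts merge recursively, lists are replaced by the last writer) can be illustrated on plain dictionaries. The deep_update() function below is a hypothetical stand-in written for this example; it follows the described semantics but is not the Note method itself, and the sample keys are made up.

```python
def deep_update(target: dict, updates: dict) -> dict:
    """Recursively merge `updates` into `target` in place and return it."""
    for key, value in updates.items():
        if isinstance(value, dict) and isinstance(target.get(key), dict):
            # Both sides are dicts: merge key by key.
            deep_update(target[key], value)
        else:
            # Scalars and lists are overwritten (last writer wins).
            target[key] = value
    return target


# State written by a first operation.
shared = {"research": {"sources": ["paper A"], "status": "draft"}}

# A later operation writes overlapping keys.
deep_update(shared, {"research": {"sources": ["paper B"], "reviewed": True}})
print(shared)
```

After the second write, "status" survives from the first operation, "reviewed" is added, and "sources" holds only the later list. An operation that needs to append to a list therefore has to read the current value and write back the combined list.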

→ Note API: note.md