Skip to content

Session

class Session(Node, Relational)

Manages multiple Branch instances with inter-branch messaging and DAG execution.

Constructor

session = Session(
    branches=None,
    exchange=None,
    default_branch=None,
    name="Session",
    user=None,
)
Param Type Default Notes
branches Pile[Branch] \| None None Pre-existing branches
exchange Exchange \| None None Custom exchange instance
default_branch Branch \| None None Default for delegated operations
name str "Session" Human label
user SenderRecipient \| None None Session owner identity

Branch management

new_branch()

branch = session.new_branch(
    system="You are a research assistant.",
    name="researcher",
    imodel=li.iModel(model="gpt-4o"),
    tools=[search_fn],
    as_default_branch=True,
)
Param Type Default Notes
system System \| JsonValue None System prompt
user SenderRecipient None Branch owner
name str \| None None Human label
imodel iModel \| None None Chat model (deprecated alias for chat_model)
messages Pile[RoledMessage] None Restore prior history
tools list[Callable] None Pre-registered tools
as_default_branch bool False Set as session default

Returns: Branch

get_branch()

branch = session.get_branch(branch_id)
branch = session.get_branch("researcher")  # by name

Returns: Branch. Raises KeyError if not found and no default given.

Other branch operations

session.include_branches([b1, b2])          # add existing branches
session.remove_branch(branch_id)            # remove (keeps object)
session.remove_branch(branch_id, delete=True)  # remove + delete
session.change_default_branch(branch_id)    # swap default
b2 = session.split(branch_id)              # sync clone with same history
b2 = await session.asplit(branch_id)       # async clone

DAG execution

flow()

from lionagi import Builder

builder = Builder()
n1 = builder.add_operation("communicate", instruction="Research X")
n2 = builder.add_operation("communicate", instruction="Summarize", depends_on=[n1])
graph = builder.get_graph()

results = await session.flow(
    graph,
    context={"topic": "quantum computing"},
    parallel=True,
    max_concurrent=5,
)
# results: dict[node_id, Any]
Param Type Default Notes
graph Graph required Built via OperationGraphBuilder
context dict \| None None Shared context injected into all nodes
parallel bool True Run dependency-free nodes concurrently
max_concurrent int 5 Concurrency cap
verbose bool False Print progress
default_branch Branch \| ID.Ref \| None None Branch for unassigned nodes

Returns: dict[str, Any] — keyed by node ID.

For building graph, see flow.md.

Message exchange

For inter-branch messaging, see team.md.

Brief reference:

# queue a message
session.send(sender_id, recipient_id, "analysis complete")

# route all pending messages
await session.sync()

# read messages for a branch
msgs = session.receive(branch_id)

Data aggregation

# combined DataFrame of all branches
df = session.to_df()

# messages from specific branches only
df = session.to_df(branches=[b1.ln_id, b2.ln_id])

# merged Pile
pile = session.concat_messages()

Example: two-branch research + synthesis

import asyncio
import lionagi as li

async def main():
    session = li.Session()
    researcher = session.new_branch(
        system="You are a research specialist.",
        name="researcher",
        imodel=li.iModel(model="gpt-4o"),
    )
    writer = session.new_branch(
        system="You are a technical writer.",
        name="writer",
        imodel=li.iModel(model="gpt-4o"),
    )

    findings = await researcher.communicate("Summarize key advances in RAG architectures.")

    session.send(researcher.ln_id, writer.ln_id, findings)
    await session.sync()

    msgs = session.receive(writer.ln_id)
    report = await writer.communicate(
        f"Write a user-friendly guide based on: {msgs[0].content}"
    )
    print(report)

asyncio.run(main())

Next: DAG pipeline API — build typed DAG pipelines