Advanced Usage

HookRegistry

Attach callbacks that fire before or after every model invocation on an iModel.

from lionagi.service.hooks import HookRegistry, HookEventTypes
import lionagi as li

async def log_pre(event, **kw):
    print(f"→ invoking: {type(event).__name__}")

async def log_post(event, **kw):
    print(f"← done: {type(event).__name__}")

hooks = HookRegistry(
    hooks={
        HookEventTypes.PreInvocation: log_pre,
        HookEventTypes.PostInvocation: log_post,
    }
)

model = li.iModel(model="gpt-4o", hook_registry=hooks)

PreInvocation fires when the event is dequeued, just before the API call. PostInvocation fires after the response arrives. Both receive the Event instance.
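Because both hooks receive the Event instance, a pre/post pair can be used to measure latency per call. The following is a minimal sketch, not part of lionagi: `make_timing_hooks`, `FakeEvent`, and `timing_demo` are hypothetical names, and it assumes the same event object is passed to both hooks so that `id(event)` can pair them.

```python
import asyncio
import time


def make_timing_hooks(clock=time.perf_counter):
    """Build a (pre, post, durations) triple that records per-event latency.

    Assumption: the pre and post hooks are called with the same event object,
    so id(event) can be used to pair them up.
    """
    started = {}    # id(event) -> start timestamp
    durations = []  # (event type name, elapsed seconds)

    async def time_pre(event, **kw):
        started[id(event)] = clock()

    async def time_post(event, **kw):
        start = started.pop(id(event), None)
        if start is not None:  # ignore a post hook with no matching pre hook
            durations.append((type(event).__name__, clock() - start))

    return time_pre, time_post, durations


class FakeEvent:
    """Stand-in for a real invocation event, used only for this demo."""


async def timing_demo():
    pre, post, durations = make_timing_hooks()
    event = FakeEvent()
    await pre(event)
    await asyncio.sleep(0.01)  # pretend the API call happens here
    await post(event)
    return durations


if __name__ == "__main__":
    print(asyncio.run(timing_demo()))
```

To use it with a real model, `time_pre` and `time_post` would take the place of `log_pre` and `log_post` in the `HookRegistry` mapping above.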

Sharing hooks across models:

chat = li.iModel(model="gpt-4o", hook_registry=hooks)
parse = li.iModel(model="gpt-4o-mini", hook_registry=hooks)
branch = li.Branch(chat_model=chat, parse_model=parse)

Rate limiting

Control throughput with limit_requests and limit_tokens per rate-limit window:

model = li.iModel(
    model="gpt-4o",
    limit_requests=500,          # max requests per window
    limit_tokens=100_000,        # max tokens per window
    capacity_refresh_time=60,    # window duration in seconds
    queue_capacity=200,          # max queued requests (backpressure)
    concurrency_limit=10,        # max concurrent in-flight requests
)

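When the queue is full, new requests block until capacity frees up. capacity_refresh_time sets the length, in seconds, of the sliding window over which limit_requests and limit_tokens are counted; with the value 60 shown here, they act as RPM/TPM limits.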
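To make the interaction between the request limit, the token limit, and the refresh time concrete, here is a toy budget tracker. It is an illustration only and not lionagi's implementation: `WindowBudget` and `try_acquire` are hypothetical names, and for simplicity it refills the whole budget at once when the window elapses rather than sliding continuously.

```python
import time


class WindowBudget:
    """Toy request/token budget that refills every `refresh_time` seconds."""

    def __init__(self, limit_requests, limit_tokens, refresh_time,
                 clock=time.monotonic):
        self.limit_requests = limit_requests
        self.limit_tokens = limit_tokens
        self.refresh_time = refresh_time
        self._clock = clock
        self._window_start = clock()
        self._requests_left = limit_requests
        self._tokens_left = limit_tokens

    def _maybe_refresh(self):
        # Reset both counters once the current window has elapsed.
        now = self._clock()
        if now - self._window_start >= self.refresh_time:
            self._window_start = now
            self._requests_left = self.limit_requests
            self._tokens_left = self.limit_tokens

    def try_acquire(self, tokens):
        """Spend budget and return True if the request fits, else False."""
        self._maybe_refresh()
        if self._requests_left < 1 or self._tokens_left < tokens:
            return False
        self._requests_left -= 1
        self._tokens_left -= tokens
        return True


# Demo with a fake clock so the window can be advanced instantly.
now = [0.0]
budget = WindowBudget(limit_requests=2, limit_tokens=100, refresh_time=60,
                      clock=lambda: now[0])
first = budget.try_acquire(60)    # fits: 100 tokens available
second = budget.try_acquire(60)   # rejected: only 40 tokens left
now[0] = 60.0                     # one full window later
third = budget.try_acquire(60)    # fits again after the refresh
```

Whichever limit is exhausted first gates the request, which is why a small number of very large prompts can stall a model well below its request limit.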

To give parallel workflows independent rate-limit buckets, use model.copy() so that each worker has its own counters:

workers = [model.copy(share_session=False) for _ in range(5)]
branches = [li.Branch(chat_model=w) for w in workers]

Custom middle

The Middle protocol is a callable that advances the branch by one assistant turn. Inject via branch.operate(middle=...).
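The call signature used by the examples on this page can be written down as a structural type. This is a sketch inferred from those examples, not the library's own definition, which may differ: `MiddleLike` and `EchoMiddle` are hypothetical names, and every parameter is typed as `Any` because the concrete types are not assumed here.

```python
import asyncio
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class MiddleLike(Protocol):
    """Hypothetical structural type for a middle, inferred from this page."""

    async def __call__(
        self,
        branch: Any,
        instruction: Any,
        chat_param: Any,
        parse_param: Optional[Any] = None,
        clear_messages: bool = False,
        skip_validation: bool = False,
    ) -> Any: ...


class EchoMiddle:
    """Trivial middle that returns the instruction without calling a model."""

    async def __call__(self, branch, instruction, chat_param, parse_param=None,
                       clear_messages=False, skip_validation=False):
        return instruction


echo = EchoMiddle()
```

Note that an `isinstance` check against a runtime-checkable protocol only confirms that `__call__` exists; it does not verify the parameter list, so a static type checker is the better tool for catching signature mismatches.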

Retry-wrapping example:

from lionagi.operations.types import ChatParam, ParseParam
from lionagi.operations.communicate.communicate import communicate

class RetryMiddle:
    def __init__(self, max_retries: int = 3):
        self._max = max_retries

    async def __call__(
        self, branch, instruction, chat_param: ChatParam,
        parse_param: ParseParam | None = None,
        clear_messages: bool = False,
        skip_validation: bool = False,
    ):
        # Try up to self._max times; re-raise the last error if every attempt fails.
        for attempt in range(self._max):
            try:
                return await communicate(
                    branch, instruction, chat_param, parse_param,
                    clear_messages, skip_validation,
                )
            except Exception:
                if attempt == self._max - 1:
                    raise

retry = RetryMiddle(max_retries=3)
result = await branch.operate(instruction="...", middle=retry)
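RetryMiddle retries immediately, which tends to hit the same rate limit or outage again. A common refinement is to wait between attempts with exponential backoff. The helper below is a generic sketch under that assumption: `retry_async`, `base_delay`, and `retry_demo` are hypothetical names and nothing here is lionagi API.

```python
import asyncio


async def retry_async(call, max_retries=3, base_delay=0.5):
    """Await `call()` up to `max_retries` times with exponential backoff.

    `call` is a zero-argument function that returns a fresh awaitable on
    each invocation, since an awaitable cannot be awaited twice.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            return await call()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            await asyncio.sleep(base_delay * 2 ** attempt)


async def retry_demo():
    calls = {"n": 0}

    async def flaky():
        # Fails twice, then succeeds on the third call.
        calls["n"] += 1
        if calls["n"] < 3:
            raise RuntimeError("transient failure")
        return "ok"

    result = await retry_async(flaky, max_retries=3, base_delay=0)
    return result, calls["n"]


if __name__ == "__main__":
    print(asyncio.run(retry_demo()))
```

Inside a middle, the `communicate(...)` call would be wrapped in a lambda and passed as `call`. Catching a narrower exception type than `Exception` is usually preferable, so that programming errors are not retried.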

Deterministic replay (testing):

class ReplayMiddle:
    def __init__(self, responses: list[str]):
        self._queue = list(responses)

    async def __call__(self, branch, instruction, chat_param, parse_param=None, **_):
        return self._queue.pop(0)

replay = ReplayMiddle(["Paris", "42"])
result = await branch.operate(instruction="Capital of France?", middle=replay)
# No API call made — returns "Paris" from queue

Cache middle:

from typing import Any

# communicate is imported in the retry example above.
class CacheMiddle:
    def __init__(self):
        self._cache: dict[str, Any] = {}

    async def __call__(self, branch, instruction, chat_param, parse_param=None,
                       clear_messages=False, skip_validation=False):
        key = str(instruction)
        if key in self._cache:
            return self._cache[key]
        result = await communicate(
            branch, instruction, chat_param, parse_param,
            clear_messages, skip_validation,
        )
        self._cache[key] = result
        return result
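CacheMiddle keys on `str(instruction)` alone, so two calls with the same instruction but different parameters would share one cache entry. One way around that is to hash the instruction together with the other arguments. The sketch below assumes that `repr()` of each argument is stable and reflects its contents, which this page does not confirm for ChatParam or ParseParam; `cache_key` is a hypothetical helper.

```python
import hashlib


def cache_key(instruction, *params):
    """Hash the instruction together with the repr of each extra parameter."""
    digest = hashlib.sha256()
    for part in (instruction, *params):
        digest.update(repr(part).encode("utf-8"))
        digest.update(b"\x00")  # separator between parts
    return digest.hexdigest()


same = cache_key("Summarize", {"temperature": 0}) == cache_key("Summarize", {"temperature": 0})
different = cache_key("Summarize", {"temperature": 0}) != cache_key("Summarize", {"temperature": 1})
```

A fixed-length digest also keeps the cache dictionary's keys small when instructions are long.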

Structured output edge cases

handle_validation Modes

When the model returns output that does not parse into your response_format schema:

| Mode | Behavior |
| --- | --- |
| "raise" | Raise ValueError immediately |
| "return_value" | Return the raw string (default for operate()) |
| "return_none" | Return None |

# Strict: raises on any parse failure
result = await branch.operate(
    instruction="Extract entity",
    response_format=EntityModel,
    handle_validation="raise",
)

# Lenient: raw string on failure — check type before using
result = await branch.operate(
    instruction="Extract entity",
    response_format=EntityModel,
    handle_validation="return_value",
)
if not isinstance(result, EntityModel):
    print("Parse failed:", result)
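The three modes can be pictured as three branches of one failure handler. The function below is a stand-alone illustration, not lionagi code: `parse_with_mode` is a hypothetical name, and `json.loads` stands in for validation against a response_format schema.

```python
import json


def parse_with_mode(raw, handle_validation="return_value"):
    """Parse `raw` as a JSON object, handling failure per `handle_validation`."""
    try:
        data = json.loads(raw)
        if not isinstance(data, dict):
            raise ValueError("expected a JSON object")
        return data
    except ValueError as exc:  # json.JSONDecodeError is a ValueError subclass
        if handle_validation == "raise":
            raise ValueError(f"could not parse model output: {exc}") from exc
        if handle_validation == "return_none":
            return None
        if handle_validation == "return_value":
            return raw  # hand the unparsed text back to the caller
        raise ValueError(
            f"unknown handle_validation mode: {handle_validation!r}"
        ) from exc


parsed = parse_with_mode('{"name": "Ada"}')
fallback = parse_with_mode("not json")
nothing = parse_with_mode("not json", handle_validation="return_none")
```

With "return_value" the caller receives either a parsed object or the raw string, which is why the lenient example above checks the type before using the result.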

Fuzzy Key Matching

branch.parse() enables fuzzy key matching by default, which tolerates minor variations in the key names the model returns (e.g., "key_points" vs "keyPoints"):

verdict = await branch.parse(
    text=raw_llm_output,
    response_format=VerdictModel,
    fuzzy_match=True,
    similarity_threshold=0.85,  # lower = more tolerant
    handle_validation="raise",
)

Disable with fuzzy_match=False when you need exact key matching.
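To get a feel for what a threshold such as 0.85 admits, the sketch below renames keys using the standard library's `difflib.SequenceMatcher`. The similarity measure lionagi actually uses may differ, so the scores are indicative only; `fuzzy_rename` is a hypothetical helper, and dropping unmatched keys is a choice made for this example.

```python
from difflib import SequenceMatcher


def fuzzy_rename(data, expected_keys, similarity_threshold=0.85):
    """Map each key in `data` to its closest expected key, if similar enough."""
    result = {}
    for key, value in data.items():
        best, best_score = None, 0.0
        for candidate in expected_keys:
            # Case-insensitive similarity ratio in the range [0, 1].
            score = SequenceMatcher(None, key.lower(), candidate.lower()).ratio()
            if score > best_score:
                best, best_score = candidate, score
        if best is not None and best_score >= similarity_threshold:
            result[best] = value
        # Keys with no sufficiently similar match are dropped.
    return result


raw = {"keyPoints": ["a", "b"], "sumary": "short", "extra": 1}
fixed = fuzzy_rename(raw, ["key_points", "summary"])
strict = fuzzy_rename(raw, ["key_points", "summary"], similarity_threshold=0.99)
```

Here "keyPoints" and "sumary" score above 0.9 against their targets and are renamed, while "extra" matches nothing. Raising the threshold to 0.99 rejects all three.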

Streaming + Structured Output

With CLI endpoints and stream_persist=True, chunks are written to a JSONL file as they arrive:

result = await branch.operate(
    instruction="Generate a detailed report on...",
    response_format=ReportModel,
    stream_persist=True,
    persist_dir="./logs/streams",
    chat_model=li.iModel(provider="claude_code", model="sonnet"),
)
# chunks → ./logs/streams/{branch_id}.buffer.jsonl
# return value → ReportModel (parsed from accumulated text)
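Since the buffer is a JSONL file, it can be inspected afterwards with a few lines of standard-library code. The layout of each chunk record is not described on this page, so the sketch below makes no assumption about field names and simply returns whatever each line contains; `read_jsonl` is a hypothetical helper.

```python
import json
from pathlib import Path


def read_jsonl(path):
    """Return one parsed JSON value per non-empty line of `path`."""
    records = []
    with Path(path).open(encoding="utf-8") as handle:
        for line_number, line in enumerate(handle, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError:
                # A partially written last line is possible mid-stream.
                print(f"skipping malformed line {line_number}")
    return records
```

Skipping malformed lines rather than raising makes the helper safe to run while a stream is still being written.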

FieldModel Dynamic Extensions

Add fields to any operate() call without modifying your base schema:

from lionagi import FieldModel

result = await branch.operate(
    instruction="Analyze this article",
    response_format=ArticleAnalysis,
    field_models=[
        FieldModel(
            name="confidence",
            annotation=float,
            default=0.0,
            description="Confidence score 0–1",
        ),
    ],
)
