Voice SDK reference (v0.3)

Method-by-method reference for the two Python packages shipping in v0.3: memoair-voice and memoair-livekit. The underlying HTTP wire contract (loopback runtime) is documented separately in the HTTP API reference.

Which class do I use?

  • MemoAirVoiceClient (default choice): high-level async client that wraps the local runtime and cloud-side index management. Use this in custom Python agents, scripts, and notebooks.
  • MemoAirLiveKitAgent — drop-in livekit.agents.Agent subclass shipped by memoair-livekit. Use when building a LiveKit Agents worker.
  • MemoAirVoiceMemory / MemoAirVoiceMemorySync — raw runtime client with framework-tool adapters. Use when building your own framework adapter or when you need explicit control over start_session / after_turn.
  • MemoryTool / MemoryToolSync — the registerable callable returned by search_memory_tool().

Install

terminal
BASH
pip install "memoair-voice>=0.3.1" "memoair-livekit>=0.3.1"

memoair-livekit depends on memoair-voice. If you don't use LiveKit, install only memoair-voice.

MemoAirVoiceClient

High-level async client. Construct it once at process boot, pass a per-call user={id,name,metadata} object to search_memory / save_response, and call aclose() on shutdown. All recall calls are local-loopback; create_index and list_indexes hit MemoAir cloud.

import
PYTHON
from memoair_voice import MemoAirVoiceClient

__init__

signature.py
PYTHON
MemoAirVoiceClient(
    *,
    api_key: str,
    project_id: str,
    agent_id: str,
    cloud_base_url: str = "https://backend.memoair.space",
    max_concurrent_users: int = 20,
    runtime_idle_ttl_s: int = 300,
    runtime_port_range: tuple[int, int] = (7878, 7977),
) -> MemoAirVoiceClient

Parameters

api_key · str · Required

MemoAir account API key (looks like memoair_pk_…). Account-scoped: one key per org, shared across all projects and agents. Used for cloud-side create_index / list_indexes AND forwarded to every spawned runtime so it can bootstrap projections from cloud.

project_id · str · Required

MemoAir project ID (looks like proj_…). Identifies the workspace whose org index, profile, and permanent lanes you want to access.

agent_id · str · Required

MemoAir agent ID (looks like agent_…). Identifies the voice bot inside the project. Sent on every cloud call as the X-Agent-Id header so dashboard analytics, eval traces, and prompt versions stay scoped per agent.

cloud_base_url · str · Optional · Default: "https://backend.memoair.space"

Base URL of MemoAir cloud. Used for create_index / list_indexes and forwarded to every pool-spawned runtime so its sync loops target the same backend as the client.

max_concurrent_users · int · Optional · Default: 20

Maximum number of concurrent (project_id, user.id) runtimes the internal pool keeps alive. When the cap is hit and a fresh user arrives, the least recently used (LRU) runtime is flushed and evicted. If the cap is hit while every handle is in flight, the call raises pool.exhausted; raise this value for high-concurrency bridges.

runtime_idle_ttl_s · int · Optional · Default: 300

Idle reaper threshold (seconds). Runtimes whose handle has been released for longer than this TTL are flushed and shut down. Lower it on memory-pressured hosts; raise it for chatty users where bootstrap latency matters.

runtime_port_range · tuple[int, int] · Optional · Default: (7878, 7977)

Inclusive loopback port range the pool draws from when spawning a new runtime. Each (project_id, user.id) gets its own port, so the default range holds 100 runtimes. Widen the range if you need more concurrent users on a single host; if every port in the range is already bound, whether by the pool or by an external process, the pool raises pool.port_exhausted.

Raises ValueError if api_key, project_id, or agent_id is an empty string. Construct the client once at boot; pass user={...} per call so the internal runtime pool can fan out across end-users.
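
The constructor rules above can be checked before the client is built. The following is a minimal sketch, not part of the SDK: build_client_kwargs is a hypothetical helper, the environment-variable names are borrowed from the end-to-end example on this page, and the check that the port range can hold max_concurrent_users runtimes is an assumption derived from the one-port-per-runtime rule.

```python
from collections.abc import Mapping


def build_client_kwargs(
    env: Mapping[str, str],
    *,
    max_concurrent_users: int = 20,
    runtime_port_range: tuple[int, int] = (7878, 7977),
) -> dict:
    """Hypothetical helper: collect and sanity-check constructor kwargs."""
    required = {
        "api_key": env.get("MEMOAIR_API_KEY", ""),
        "project_id": env.get("MEMOAIR_PROJECT_ID", ""),
        "agent_id": env.get("MEMOAIR_AGENT_ID", ""),
    }
    # Stricter than the documented rule: whitespace-only values count as empty.
    missing = [name for name, value in required.items() if not value.strip()]
    if missing:
        raise ValueError(f"empty required setting(s): {', '.join(missing)}")

    low, high = runtime_port_range
    capacity = high - low + 1  # the range is inclusive on both ends
    if capacity < max_concurrent_users:
        raise ValueError(
            f"port range holds {capacity} runtimes, "
            f"but max_concurrent_users={max_concurrent_users}"
        )
    return {
        **required,
        "max_concurrent_users": max_concurrent_users,
        "runtime_port_range": runtime_port_range,
    }
```

The returned dict could then be unpacked into the constructor, for example MemoAirVoiceClient(**build_client_kwargs(os.environ)).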

search_memory()

Per-turn local recall across up to four lanes. Returns a composed contextText ready to splice into your LLM prompt. Never hits the cloud; never blocks longer than timeout_ms.

signature.py
PYTHON
async def search_memory(
    self,
    query: str,
    *,
    user: dict,
    user_id: str | None = None,  # compatibility alias for user["id"]
    lanes: list[str] | None = None,
    intent: str = "answer_current_user",
    top_k: dict[str, int] | None = None,
    timeout_ms: int = 250,
) -> SearchResult

Parameters

user · dict · Required

Preferred end-user object: {"id": "...", "name": "...", "metadata": {...}}. The id routes the call to the runtime for this (project_id, user.id) pair; name + metadata are stored on the durable session_start event for dashboard caller views.

user_id · str | None · Optional · Default: None

Compatibility alias for user.id. Prefer user={...} so display metadata can be stored alongside the identifier.

query · str · Required

The user's transcribed turn text. Used by the runtime composer to score recall across lanes.

lanes · list[str] | None · Optional · Default: None

Subset of ["profile", "working", "permanent", "org"]. Defaults to all four. Set it to restrict which lanes participate in this turn, e.g. ["permanent", "org"] to skip the live profile and the working brief.

intent · str · Optional · Default: "answer_current_user"

Free-form intent hint that lets the runtime bias scoring per intent (answer_current_user, summarise_recent, etc.). Reserved for future use; the default is safe.

top_k · dict[str, int] | None · Optional · Default: None

Per-lane top_k override, e.g. {"permanent": 6, "org": 4, "working": 4}. Defaults to the runtime's bootstrap budgets.

timeout_ms · int · Optional · Default: 250

Per-turn deadline in milliseconds. Lanes that respond after the deadline are dropped; the composed contextText still returns with whatever arrived in time (trace.degraded=true).

Returns: SearchResult (see Result types below). The call always returns; on degraded lanes it falls back to an empty contextText rather than raising.
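
The deadline behaviour described for timeout_ms can be pictured with plain asyncio. This is an illustrative sketch only, not the runtime's implementation: gather_lanes, fake_lane, and demo are hypothetical names, and the returned dict merely mimics the idea of a degraded flag.

```python
import asyncio


async def gather_lanes(lane_calls: dict, timeout_ms: int) -> dict:
    """Run all lane lookups concurrently; keep only those done by the deadline."""
    tasks = {name: asyncio.create_task(coro) for name, coro in lane_calls.items()}
    done, pending = await asyncio.wait(set(tasks.values()), timeout=timeout_ms / 1000)
    for task in pending:
        task.cancel()  # late lanes are dropped, not awaited
    hits = {
        name: task.result()
        for name, task in tasks.items()
        if task in done and task.exception() is None
    }
    # Degraded means at least one lane failed or missed the deadline.
    return {"hits": hits, "degraded": len(hits) < len(tasks)}


async def fake_lane(delay_s: float, value: str) -> str:
    # Stand-in for one lane lookup; sleeps to simulate latency.
    await asyncio.sleep(delay_s)
    return value


async def demo() -> dict:
    return await gather_lanes(
        {"profile": fake_lane(0.0, "prefers IST"), "org": fake_lane(1.0, "slow hit")},
        timeout_ms=100,
    )
```

Running asyncio.run(demo()) keeps the fast profile lane, drops the slow org lane, and marks the result as degraded.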

example.py
PYTHON
result = await client.search_memory(
    "what timezone does the user prefer?",
    user={
        "id": "caller_42",
        "name": "Alex",
        "metadata": {"plan": "premium"},
    },
    lanes=["profile", "working", "permanent", "org"],
    timeout_ms=250,
)
print(result.contextText)
print("hit sources:", result.sources)
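
Because search_memory always returns and may hand back an empty contextText, the prompt-assembly step should tolerate both cases. A minimal sketch, assuming an OpenAI-style list of role/content messages; build_messages is a hypothetical helper, not an SDK function.

```python
def build_messages(system_prompt: str, context_text: str, user_text: str) -> list[dict]:
    """Assemble chat messages, adding recalled context only when present."""
    messages = [{"role": "system", "content": system_prompt}]
    if context_text.strip():
        # Recall arrived in time: add it as a second system message.
        messages.append({"role": "system", "content": context_text})
    messages.append({"role": "user", "content": user_text})
    return messages
```

In an agent loop this would be called as build_messages(prompt, result.contextText, user_text) right after the recall.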

save_response()

Persist a completed (user, assistant) turn. Forwards to the local runtime which appends to the session JSONL and enqueues cloud sync. Best-effort: transient runtime errors are swallowed so the audio path is never blocked.

signature.py
PYTHON
async def save_response(
    self,
    *,
    user_text: str,
    assistant_text: str,
    user: dict,
    user_id: str | None = None,
    turn_id: str | None = None,
    tool_calls: list[dict] | None = None,
    metadata: dict | None = None,
) -> str | None

Parameters

user · dict · Required

Preferred end-user object: {"id": "...", "name": "...", "metadata": {...}}. Must be the same user object (or at least carry the same id) that was passed to search_memory for this turn.

user_id · str | None · Optional · Default: None

Compatibility alias for user.id. Prefer user={...} for new code.

user_text · str · Required

The user's final transcribed turn (post-VAD finalisation).

assistant_text · str · Required

The LLM's completed reply for the same turn.

turn_id · str | None · Optional · Default: None

Optional caller-supplied turn ID. Auto-generated (uuid4) if omitted. Provide an explicit value if you need to correlate the saved event with your own transcript ledger.

tool_calls · list[dict] | None · Optional · Default: None

Optional list of tool-call telemetry, e.g. [{"name": "search_memory", "latencyMs": 8.8, "hits": 3}]. Surfaces in the dashboard replay view.

metadata · dict | None · Optional · Default: None

Free-form per-turn metadata, e.g. {"interrupted": False, "framework": "livekit"}.

Returns: the runtime's localEventId on success, or None if the runtime dropped/swallowed the call.

example.py
PYTHON
await client.save_response(
    user_text="What time is standup?",
    assistant_text="10:30 IST.",
    user={"id": "caller_42", "name": "Alex"},
    metadata={"framework": "livekit", "interrupted": False},
)
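
The tool_calls telemetry shown in the parameter list can be produced by timing the recall yourself. A rough sketch under stated assumptions: timed_recall and fake_search are hypothetical names, fake_search stands in for client.search_memory, and counting hits as len(result.sources) is a guess at a sensible value rather than a documented rule.

```python
import asyncio
import time
from types import SimpleNamespace


async def timed_recall(search, query: str) -> tuple[str, dict]:
    """Await a recall coroutine function and build one telemetry entry."""
    started = time.perf_counter()
    result = await search(query)
    latency_ms = (time.perf_counter() - started) * 1000
    entry = {
        "name": "search_memory",
        "latencyMs": round(latency_ms, 1),
        "hits": len(result.sources),  # assumption: one hit per source
    }
    return result.contextText, entry


async def fake_search(query: str) -> SimpleNamespace:
    # Stand-in for client.search_memory; exposes the two attributes used above.
    await asyncio.sleep(0.01)
    return SimpleNamespace(contextText="Standup is at 10:30 IST.", sources=[{"id": "cal"}])
```

The resulting entry can be passed along as tool_calls=[entry] when the turn is saved.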

create_index()

Cloud-side: build or append an org index. Build-or-append semantics — the first call with a given name CREATES the index, subsequent calls APPEND documents (or replace by matching id). Refreshes the local runtime's org projection on success.

signature.py
PYTHON
async def create_index(
    self,
    name: str,
    documents: list[dict],
    *,
    metadata: dict | None = None,
) -> IndexBuildResult

Parameters

name · str · Required

Stable index name. The first call with a given name creates the index; subsequent calls APPEND documents (or replace any with a matching document id).

documents · list[dict] · Required

List of {"id", "text", "metadata"} dicts. id is a stable per-doc key; text is the searchable content; metadata is free-form tags (string values). Per call cap: 100 docs and 1 MB total — use the dashboard for bulk uploads.

metadata · dict | None · Optional · Default: None

Optional index-level metadata, attached to every chunk created by this call.

Returns: IndexBuildResult(index_name, chunk_count, version).

Raises: MemoAirVoiceMemoryError with status_code=413 if the payload exceeds the 100-doc / 1 MB cap — use the MemoAir dashboard for bulk ingest beyond that.
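
For uploads slightly over the cap, documents can be split into several create_index calls, since later calls append. The sketch below is one possible batching approach; batch_documents is a hypothetical helper, and measuring size as the JSON-encoded UTF-8 length of each document with 1 MB taken as 1,000,000 bytes is an assumption, because the page does not say how the server measures the payload.

```python
import json


def batch_documents(
    documents: list[dict],
    max_docs: int = 100,
    max_bytes: int = 1_000_000,
) -> list[list[dict]]:
    """Greedily group documents so each batch respects both caps."""
    batches: list[list[dict]] = []
    current: list[dict] = []
    current_bytes = 0
    for doc in documents:
        # Assumption: size is approximated by the JSON-encoded document.
        size = len(json.dumps(doc).encode("utf-8"))
        if size > max_bytes:
            raise ValueError(f"document {doc.get('id')!r} alone exceeds {max_bytes} bytes")
        if current and (len(current) >= max_docs or current_bytes + size > max_bytes):
            batches.append(current)
            current, current_bytes = [], 0
        current.append(doc)
        current_bytes += size
    if current:
        batches.append(current)
    return batches
```

Each batch would then be sent with its own await client.create_index(name, documents=batch) call; leaving some headroom below the byte cap is prudent because the request envelope adds overhead.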

example.py
PYTHON
result = await client.create_index(
    "agent-memory",
    documents=[
        {"id": "returns", "text": "30-day returns with receipt.", "metadata": {"topic": "returns"}},
        {"id": "shipping", "text": "Express ships in 1-2 days.", "metadata": {"topic": "shipping"}},
    ],
)
print(f"Indexed {result.chunk_count} chunks, version={result.version}")
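
The build-or-append semantics can be modelled in a few lines. This toy in-memory model is keyed by document id and only illustrates the append-or-replace rule; apply_create_index is a hypothetical name, and the model says nothing about how the cloud chunks or versions documents.

```python
def apply_create_index(index: dict, documents: list[dict]) -> dict:
    """Toy model: append new ids, replace documents whose id already exists."""
    merged = dict(index)  # copy so the previous state is left untouched
    for doc in documents:
        merged[doc["id"]] = doc
    return merged
```

Calling it twice with an overlapping id leaves one entry per id, with the later text winning.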

list_indexes()

Cloud-side: list all org indexes in the current project. No arguments.

signature.py
PYTHON
async def list_indexes(self) -> list[dict]

Returns: raw cloud response list (each item has name, version, chunkCount, createdAt, …).

session() / aclose()

Use session(user=...) when many calls happen back-to-back for the same end-user and you want to pin the runtime handle. Always call aclose() on process shutdown so the pool flushes and terminates every runtime.

lifecycle.py
PYTHON
session = await client.session(user={"id": "caller_42", "name": "Alex"})
async with session as s:
    result = await s.search_memory("what did we discuss?")
    await s.save_response(user_text="what did we discuss?", assistant_text="...")

await client.aclose()

Concurrency model — the runtime pool

A single MemoAirVoiceClient can serve many concurrent end-users. Behind the scenes the client owns a RuntimePool that spawns one voice-runtime process per (project_id, user.id) on a free port from runtime_port_range. Subsequent calls for the same user_id reuse that runtime; calls for a new user_id either reuse a warm idle handle or spawn a fresh one.

  • Bounded LRU. The pool never exceeds max_concurrent_users (default 20). When the cap is reached and a fresh user arrives, the LRU handle is flushed (POST /v1/runtime/sync/flush) and shut down before the new runtime spawns — no event loss.
  • Idle reaper. A background task evicts handles whose last release was more than runtime_idle_ttl_s seconds ago (default 300). This trades a bootstrap-latency hit on the next call for roughly 30 MB of memory saved per runtime.
  • Per-call routing. Every search_memory / save_response takes a required user={...}. The pool resolves user.id to a runtime handle for the duration of the call, then releases.
  • Identity headers. Every request to the cloud base URL carries Authorization: Bearer memoair_pk_…, X-Project-Id, X-Agent-Id, and X-User-Id; see concepts for the four-level identity model.
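
The bounded-LRU behaviour in the list above can be sketched with an OrderedDict. This is a toy model for intuition only, not the SDK's RuntimePool: ToyRuntimePool is a hypothetical class, it tracks in-flight counts instead of real processes, and it records evictions where a real pool would flush and shut the runtime down.

```python
from collections import OrderedDict


class ToyRuntimePool:
    """Illustrative bounded LRU keyed by (project_id, user_id)."""

    def __init__(self, max_size: int) -> None:
        self.max_size = max_size
        # key -> number of in-flight calls; order runs from least to most recently used
        self._handles: OrderedDict = OrderedDict()
        self.evicted: list = []

    def acquire(self, project_id: str, user_id: str) -> tuple:
        key = (project_id, user_id)
        if key not in self._handles:
            if len(self._handles) >= self.max_size:
                self._evict_lru()
            self._handles[key] = 0
        self._handles[key] += 1
        self._handles.move_to_end(key)  # mark as most recently used
        return key

    def release(self, key: tuple) -> None:
        self._handles[key] -= 1

    def _evict_lru(self) -> None:
        # Oldest-first scan; only idle handles (count 0) may be dropped.
        victim = next((k for k, n in self._handles.items() if n == 0), None)
        if victim is None:
            raise RuntimeError("pool.exhausted")
        del self._handles[victim]
        self.evicted.append(victim)  # a real pool would flush before shutdown
```

With max_size=2, acquiring and releasing alice, then bob, then alice again, and finally acquiring carol evicts bob, because alice was used more recently.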

Pool error codes

  • pool.exhausted — every handle in the pool is in-flight; LRU eviction has nothing to drop. Bump max_concurrent_users or wait for an in-flight call to release.
  • pool.port_exhausted — every port inside runtime_port_range is bound by an existing pool handle or an external process. Widen the range or lower max_concurrent_users.
  • pool.closed: aclose() already drained the pool. Construct a fresh client.
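
Since pool.exhausted clears once an in-flight call releases, a short retry with backoff is one way to handle it. The sketch below is an assumption-heavy illustration: PoolError is a hypothetical stand-in exception carrying a code attribute (this page does not say which exception class surfaces pool codes), and call_with_pool_retry is not an SDK function.

```python
import asyncio


class PoolError(Exception):
    """Hypothetical stand-in: an exception carrying a pool error code."""

    def __init__(self, code: str) -> None:
        super().__init__(code)
        self.code = code


async def call_with_pool_retry(make_call, attempts: int = 3, base_delay_s: float = 0.05):
    """Retry only on pool.exhausted, doubling the delay between attempts."""
    for attempt in range(attempts):
        try:
            return await make_call()
        except PoolError as exc:
            last_try = attempt == attempts - 1
            if exc.code != "pool.exhausted" or last_try:
                raise  # other codes (e.g. pool.closed) are not retryable
            await asyncio.sleep(base_delay_s * 2**attempt)
```

Here make_call is any zero-argument coroutine function, for example a lambda wrapping a search_memory call. Keep the total retry budget well under the turn deadline so the audio path is not stalled.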

MemoAirLiveKitAgent

Drop-in livekit.agents.Agent subclass shipped by memoair-livekit. Wires every memory lifecycle hook for you. LiveKit dispatch is 1:1 (one process per call), so this class keeps user_id at construction time and pins its internal pool to that single end-user. See LiveKit integration for an end-to-end agent.

import
PYTHON
from memoair_livekit import MemoAirLiveKitAgent

__init__

signature.py
PYTHON
MemoAirLiveKitAgent(
    *,
    api_key: str,
    project_id: str,
    agent_id: str,
    user_id: str,
    instructions: str | None = None,
    search_timeout_ms: int = 250,
    cloud_base_url: str = "https://backend.memoair.space",
    **agent_kwargs,
) -> MemoAirLiveKitAgent

Parameters

api_key · str · Required

MemoAir account API key (memoair_pk_…). Use the same value shown in the MemoAir cloud dashboard.

project_id · str · Required

MemoAir project ID (proj_…).

agent_id · str · Required

MemoAir agent ID (agent_…). Sent as X-Agent-Id on every cloud call.

user_id · str · Required

End-user identity for THIS LiveKit job. LiveKit dispatch is 1:1 (one process per call), so the agent surface keeps user_id at construction time and pins the internal pool to it. Use the LiveKit participant identity in production; fall back to a stable console-mode default for local dev.

instructions · str | None · Optional · Default: None

System prompt forwarded to LiveKit's Agent base class. None is allowed but most agents pass a real value here.

search_timeout_ms · int · Optional · Default: 250

Per-turn deadline for the in-flight search_memory call inside on_user_turn_completed. Search failures are swallowed and logged at WARNING — the LLM is never blocked on memory I/O.

cloud_base_url · str · Optional · Default: "https://backend.memoair.space"

MemoAir cloud base URL forwarded to the spawned runtime.

**agent_kwargs · Any · Optional

Any other kwarg LiveKit's Agent.__init__ accepts (tools, chat_ctx, stt, llm, tts, vad, …) is forwarded as-is. Keeps the subclass forward-compatible with LiveKit Agent surface additions.

Lifecycle hooks

Called automatically by LiveKit's pipeline. You don't invoke these yourself — they're documented so you know what the subclass does.

  • on_enter() — opens the MemoAir runtime session (load_index equivalent).
  • on_user_turn_completed(turn_ctx, new_message) — runs search_memory with a timeout, then splices a SYSTEM message into turn_ctx.items before the user's message (LiveKit #5053 ordering workaround). Search failures are logged at WARNING and swallowed — the LLM never blocks on memory I/O.
  • on_agent_response_completed(chat_ctx, new_message) — persists the completed turn via save_response. Required wiring: LiveKit's base Agent does not call this; forward LiveKit's conversation_item_added event for assistant items into this method (see the LiveKit page for the exact handler).
  • on_exit() — idempotent close of the runtime session.
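
The ordering step in on_user_turn_completed, where recalled context lands before the user's message, can be pictured on a plain list. This is a simplified model, not the subclass's code: splice_context is a hypothetical function, and dicts with role/content keys stand in for LiveKit chat items.

```python
def splice_context(items: list[dict], context_text: str) -> list[dict]:
    """Return a copy of items with a system message before the last user message."""
    if not context_text.strip():
        return list(items)  # nothing recalled: leave the turn untouched
    memory_item = {"role": "system", "content": context_text}
    # Find the most recent user message, scanning from the end.
    for index in range(len(items) - 1, -1, -1):
        if items[index].get("role") == "user":
            return items[:index] + [memory_item] + items[index:]
    # No user message yet: append so the context precedes whatever comes next.
    return list(items) + [memory_item]
```

Returning a new list rather than mutating in place keeps the sketch easy to test; the real hook edits turn_ctx.items directly, as described above.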

MemoAirVoiceMemory (low-level)

Raw async wrapper around the loopback runtime. Use when you want explicit control over session lifecycle, or when building a framework adapter MemoAir doesn't ship yet. Most code should use MemoAirVoiceClient instead.

import
PYTHON
from memoair_voice import MemoAirVoiceMemory

Constructor

signature.py
PYTHON
MemoAirVoiceMemory(
    *,
    project_id: str,
    user_id: str,
    api_key: str | None = None,
    internal_token: str | None = None,
    runtime_url: str = "http://127.0.0.1:7878",
    runtime_token: str | None = None,
    session_id: str | None = None,
    satham: dict | None = None,
    auto_start_runtime: bool = True,
    runtime_binary: str | None = None,
    cloud_base_url: str = "https://backend.memoair.space",
    storage_root: str | None = None,
    # plus runtime_version, runtime_repo, runtime_cache_dir, client, runtime_manager
)

Either api_key or internal_token must be set if the runtime needs to bootstrap from cloud. The runtime binary itself is auto-resolved (downloaded from ghcr.io/memoair/memoair-runtime on first use) when auto_start_runtime=True.

Methods

  • await memory.start_session(metadata=None, timeout_s=5.0) -> dict — open a voice session against the runtime. Returns the raw runtime body ({ sessionId, bootstrap, paths }).
  • await memory.end_session(reason="completed", timeout_s=5.0) -> dict — close the session and flush.
  • memory.search_memory_tool(*, search_memory_timeout_ms=None) -> MemoryTool — return a registerable LLM tool.
  • await memory.after_turn(user_text, assistant_text, turn_id, tool_calls=None, metadata=None, timeout_s=1.0) -> str | None — persist a completed turn. Note this is the raw form MemoAirVoiceClient.save_response wraps; here turn_id is required.
  • memory.to_openai_function() -> dict — OpenAI Chat Completions / Responses tool schema.
  • memory.to_livekit_tool() -> FunctionTool — requires livekit-agents installed.
  • memory.to_pipecat_tool() -> dict: returns { name, schema, handler }; requires pipecat-ai installed.
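
The explicit lifecycle these methods imply (start_session, after_turn with a caller-supplied turn_id, end_session) can be sketched against any object exposing them. The code below is an assumption-labelled sketch: run_single_turn is a hypothetical helper, and FakeMemory is a stand-in used so the example runs without the SDK or a runtime.

```python
import asyncio
import uuid


async def run_single_turn(memory, user_text: str, assistant_text: str):
    """Open a session, persist one turn with an explicit turn_id, then close."""
    await memory.start_session(metadata={"source": "sketch"})
    try:
        turn_id = str(uuid.uuid4())  # after_turn requires a caller-supplied id
        event_id = await memory.after_turn(user_text, assistant_text, turn_id)
        return turn_id, event_id
    finally:
        # Always close, even if persisting the turn raised.
        await memory.end_session(reason="completed")


class FakeMemory:
    """Stand-in exposing the three methods used above; records call order."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def start_session(self, metadata=None, timeout_s=5.0) -> dict:
        self.calls.append("start_session")
        return {"sessionId": "fake"}

    async def after_turn(self, user_text, assistant_text, turn_id, **kwargs):
        self.calls.append("after_turn")
        return "evt_fake"

    async def end_session(self, reason="completed", timeout_s=5.0) -> dict:
        self.calls.append("end_session")
        return {}
```

With a real MemoAirVoiceMemory instance in place of FakeMemory, the same function would drive the loopback runtime.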

MemoAirVoiceMemorySync (sync mirror)

Synchronous variant of MemoAirVoiceMemory with identical method names — drop the async/await keywords. Use with non-async agent frameworks. Same to_openai_function / to_livekit_tool / to_pipecat_tool adapters.

MemoryTool

Async callable returned by search_memory_tool(). Calling it forwards the query to the local runtime and returns the composed contextText string. Register tool.schema() with your LLM framework, or call await tool(query) directly.

MemoryTool.py
PYTHON
class MemoryTool:
    name: str  # always "search_memory"
    last_trace: dict | None  # most recent runtime trace

    async def __call__(
        self,
        query: str,
        *,
        intent: str = "answer_current_user",
    ) -> str: ...
    # returns the composed contextText

    def schema(self) -> dict: ...
    # OpenAI-style function-tool JSON schema

example.py
PYTHON
# `memory` is a MemoAirVoiceMemory instance (see the Methods list above)
tool = memory.search_memory_tool()

# Option 1: register the schema with the OpenAI tools surface
openai_tools = [tool.schema()]

# Option 2: call directly inside your own handler
context = await tool("what does the user prefer for follow-ups?")
print(context)
print("trace:", tool.last_trace)

MemoryToolSync is the sync mirror (same shape, no await).

Result types

All result dataclasses are frozen and importable from memoair_voice.

SearchResult

SearchResult.py
PYTHON
@dataclass(frozen=True)
class SearchResult:
    contextText: str  # composed system-message string ready to splice
    profile: dict | None  # current profile snapshot, or None
    working: list[dict]  # working-brief hits
    permanent: list[dict]  # per-user permanent hits
    org: list[dict]  # workspace-shared org hits
    sources: list[dict]  # unioned source metadata for citations
    trace: dict  # per-lane latency + degraded flag
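
For logging, it can help to reduce a result to per-lane hit counts plus the degraded flag. A small sketch under stated assumptions: SearchResultLike is a local mirror of the fields above so the example runs without the SDK, summarize is a hypothetical helper, and the "degraded" key inside trace is assumed from the trace.degraded wording on this page.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class SearchResultLike:
    """Local mirror of the documented fields; not the SDK class."""

    contextText: str = ""
    profile: Optional[dict] = None
    working: list = field(default_factory=list)
    permanent: list = field(default_factory=list)
    org: list = field(default_factory=list)
    sources: list = field(default_factory=list)
    trace: dict = field(default_factory=dict)


def summarize(result) -> dict:
    """Count hits per lane and surface the degraded flag for logging."""
    return {
        "profile": result.profile is not None,
        "working": len(result.working),
        "permanent": len(result.permanent),
        "org": len(result.org),
        "degraded": bool(result.trace.get("degraded", False)),  # assumed key name
        "has_context": bool(result.contextText.strip()),
    }
```

Because summarize only reads attributes, it should work equally on a real SearchResult if the field names match the listing above.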

IndexBuildResult

IndexBuildResult.py
PYTHON
@dataclass(frozen=True)
class IndexBuildResult:
    index_name: str
    chunk_count: int  # number of embedding chunks created/appended
    version: int | None  # monotonic version, bumped per call

IndexLoadResult

IndexLoadResult.py
PYTHON
@dataclass(frozen=True)
class IndexLoadResult:
    session_id: str
    profile_version: int | None
    permanent_manifest_version: int | None
    org_manifest_version: int | None
    scopes_loaded: list[str]

Errors

MemoAirVoiceMemoryError

Raised on hard runtime errors (non-transient 4xx, timeouts, unparseable responses) and on cloud-side index failures. The runtime's { "error": { code, message, details } } envelope is parsed into the exception attributes.

MemoAirVoiceMemoryError.py
PYTHON
class MemoAirVoiceMemoryError(Exception):
    code: str  # e.g. "runtime.identity_mismatch"
    message: str
    status_code: int | None  # HTTP status if applicable
    details: dict  # extra context from the envelope

Common codes: runtime.bad_request, runtime.session_not_found, runtime.bootstrap_failed, runtime.projection_unavailable, runtime.identity_mismatch, runtime.timeout, runtime.internal.
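
The envelope-to-attributes mapping described above might look roughly like the following. This is a sketch, not the SDK's parser: RuntimeEnvelopeError is a stand-in with the same four attributes as the documented exception, error_from_response is a hypothetical function, and falling back to runtime.internal for unparseable bodies is an assumption.

```python
import json


class RuntimeEnvelopeError(Exception):
    """Stand-in with the same four attributes as the documented exception."""

    def __init__(self, code, message, status_code=None, details=None):
        super().__init__(f"{code}: {message}")
        self.code = code
        self.message = message
        self.status_code = status_code
        self.details = details or {}


def error_from_response(status_code: int, body: str) -> RuntimeEnvelopeError:
    """Map an error envelope onto exception attributes, tolerating bad JSON."""
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        parsed = None
    envelope = parsed.get("error") if isinstance(parsed, dict) else None
    if not isinstance(envelope, dict):
        envelope = {}  # missing or malformed envelope: fall back to defaults
    return RuntimeEnvelopeError(
        code=envelope.get("code", "runtime.internal"),  # assumed fallback code
        message=envelope.get("message", "unparseable runtime response"),
        status_code=status_code,
        details=envelope.get("details"),
    )
```

Callers can then branch on exc.code, for example treating runtime.timeout as retryable and runtime.identity_mismatch as a configuration error.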

End-to-end example

Seed an org index, then run a single turn against it.

end_to_end.py
PYTHON
import asyncio
import os
from memoair_voice import MemoAirVoiceClient


async def main() -> None:
    client = MemoAirVoiceClient(
        api_key=os.environ["MEMOAIR_API_KEY"],
        project_id=os.environ["MEMOAIR_PROJECT_ID"],
        agent_id=os.environ["MEMOAIR_AGENT_ID"],
    )
    try:
        # Seed (idempotent: re-running appends or replaces by id)
        await client.create_index(
            "agent-memory",
            documents=[
                {"id": "returns", "text": "Returns accepted within 30 days with a receipt.", "metadata": {"topic": "returns"}},
                {"id": "shipping", "text": "Express ships in 1-2 business days.", "metadata": {"topic": "shipping"}},
            ],
        )

        # Pretend a user just spoke this:
        user_text = "How long do I have to return something?"

        user = {"id": "caller_42", "name": "Alex"}
        recall = await client.search_memory(
            user_text,
            user=user,
            timeout_ms=250,
        )
        print("context:", recall.contextText)

        assistant_text = "You have 30 days from purchase to return with a receipt."
        await client.save_response(
            user_text=user_text,
            assistant_text=assistant_text,
            user=user,
        )
    finally:
        await client.aclose()


asyncio.run(main())