Voice SDK reference (v0.3)
Method-by-method reference for the two Python packages shipping in v0.3: memoair-voice and memoair-livekit. The underlying HTTP wire contract (loopback runtime) is documented separately in the HTTP API reference.
Which class do I use?
MemoAirVoiceClient— default choice. High-level async client. Wraps the local runtime + cloud-side index management. Use this in custom Python agents, scripts, and notebooks.MemoAirLiveKitAgent— drop-inlivekit.agents.Agentsubclass shipped bymemoair-livekit. Use when building a LiveKit Agents worker.MemoAirVoiceMemory/MemoAirVoiceMemorySync— raw runtime client with framework-tool adapters. Use when building your own framework adapter or when you need explicit control overstart_session/after_turn.MemoryTool/MemoryToolSync— the registerable callable returned bysearch_memory_tool().
Install
pip install "memoair-voice>=0.3.1" "memoair-livekit>=0.3.1"memoair-livekit depends on memoair-voice. Install only the first if you don't use LiveKit.
MemoAirVoiceClient
High-level async client. Construct it once at process boot, pass a per-call user={id,name,metadata} object to search_memory / save_response, and call aclose() on shutdown. All recall calls are local-loopback; create_index and list_indexes hit MemoAir cloud.
from memoair_voice import MemoAirVoiceClient__init__
MemoAirVoiceClient( *, api_key: str, project_id: str, agent_id: str, cloud_base_url: str = "https://backend.memoair.space", max_concurrent_users: int = 20, runtime_idle_ttl_s: int = 300, runtime_port_range: tuple[int, int] = (7878, 7977),) -> MemoAirVoiceClientParameters
api_keystrRequiredMemoAir account API key (looks like memoair_pk_…). Account-scoped: one key per org, shared across all projects and agents. Used for cloud-side create_index / list_indexes AND forwarded to every spawned runtime so it can bootstrap projections from cloud.
project_idstrRequiredMemoAir project ID (looks like proj_…). Identifies the workspace whose org index, profile, and permanent lanes you want to access.
agent_idstrRequiredMemoAir agent ID (looks like agent_…). Identifies the voice bot inside the project. Sent on every cloud call as the X-Agent-Id header so dashboard analytics, eval traces, and prompt versions stay scoped per agent.
cloud_base_urlstrOptionalDefault: "https://backend.memoair.space"Base URL of MemoAir cloud. Used for create_index / list_indexes and forwarded to every pool-spawned runtime so its sync loops target the same backend as the client.
max_concurrent_usersintOptionalDefault: 20Maximum number of concurrent (project_id, user.id) runtimes the internal pool keeps alive. When the cap is hit and a fresh user arrives, the LRU runtime is flushed and evicted. Hitting the cap with every handle in-flight raises pool.exhausted — bump for high-concurrency bridges.
runtime_idle_ttl_sintOptionalDefault: 300Idle reaper threshold (seconds). Runtimes whose handle has been released for longer than this TTL are flushed and shut down. Lower it on memory-pressured hosts; raise it for chatty users where bootstrap latency matters.
runtime_port_rangetuple[int, int]OptionalDefault: (7878, 7977)Inclusive loopback port range the pool draws from when spawning a new runtime. Each (project_id, user.id) gets its own port. Widen this range if you need more than ~100 concurrent users on a single host; clashes with externally bound ports raise pool.port_exhausted.
Raises ValueError if any of api_key, project_id, or agent_id are empty strings. Construct the client once at boot; pass user={...} per call so the internal runtime pool can fan out across end-users.
search_memory()
Per-turn local recall across up to four lanes. Returns a composed contextText ready to splice into your LLM prompt. Never hits the cloud; never blocks longer than timeout_ms.
async def search_memory( self, query: str, *, user: dict, user_id: str | None = None, # compatibility alias for user["id"] lanes: list[str] | None = None, intent: str = "answer_current_user", top_k: dict[str, int] | None = None, timeout_ms: int = 250,) -> SearchResultParameters
userdictRequiredPreferred end-user object: {"id": "...", "name": "...", "metadata": {...}}. The id routes the call to the runtime for this (project_id, user.id) pair; name + metadata are stored on the durable session_start event for dashboard caller views.
user_idstr | NoneOptionalDefault: NoneCompatibility alias for user.id. Prefer user={...} so display metadata can be stored alongside the identifier.
querystrRequiredThe user's transcribed turn text. Used by the runtime composer to score recall across lanes.
laneslist[str] | NoneOptionalDefault: NoneSubset of ["profile", "working", "permanent", "org"]. Defaults to all four. Set to gate which lanes participate this turn — e.g. ["permanent", "org"] to skip live profile and working brief.
intentstrOptionalDefault: "answer_current_user"Free-form intent hint. The runtime can bias scoring per intent (answer_current_user, summarise_recent, etc.). Reserved for future use; default is safe.
top_kdict[str, int] | NoneOptionalDefault: NonePer-lane top_k override, e.g. {"permanent": 6, "org": 4, "working": 4}. Defaults to the runtime's bootstrap budgets.
timeout_msintOptionalDefault: 250Per-turn deadline in milliseconds. Lanes that respond after the deadline are dropped; the composed contextText still returns with whatever arrived in time (trace.degraded=true).
Returns: SearchResult (see Result Types below) — always returns; falls back to empty contextText on degraded lanes rather than raising.
result = await client.search_memory( "what timezone does the user prefer?", user={ "id": "caller_42", "name": "Alex", "metadata": {"plan": "premium"}, }, lanes=["profile", "working", "permanent", "org"], timeout_ms=250,)print(result.contextText)print("hit sources:", result.sources)save_response()
Persist a completed (user, assistant) turn. Forwards to the local runtime which appends to the session JSONL and enqueues cloud sync. Best-effort: transient runtime errors are swallowed so the audio path is never blocked.
async def save_response( self, *, user_text: str, assistant_text: str, user: dict, user_id: str | None = None, turn_id: str | None = None, tool_calls: list[dict] | None = None, metadata: dict | None = None,) -> str | NoneParameters
userdictRequiredPreferred end-user object: {"id": "...", "name": "...", "metadata": {...}}. Must match the same user object (or at least same id) passed to search_memory for this turn.
user_idstr | NoneOptionalDefault: NoneCompatibility alias for user.id. Prefer user={...} for new code.
user_textstrRequiredThe user's final transcribed turn (post-VAD finalisation).
assistant_textstrRequiredThe LLM's completed reply for the same turn.
turn_idstr | NoneOptionalDefault: NoneOptional caller-supplied turn ID. Auto-generated (uuid4) if omitted. Provide an explicit value if you need to correlate the saved event with your own transcript ledger.
tool_callslist[dict] | NoneOptionalDefault: NoneOptional list of tool-call telemetry, e.g. [{"name": "search_memory", "latencyMs": 8.8, "hits": 3}]. Surfaces in the dashboard replay view.
metadatadict | NoneOptionalDefault: NoneFree-form per-turn metadata, e.g. {"interrupted": false, "framework": "livekit"}.
Returns: the runtime's localEventId on success, or None if the runtime dropped/swallowed the call.
await client.save_response( user_text="What time is standup?", assistant_text="10:30 IST.", user={"id": "caller_42", "name": "Alex"}, metadata={"framework": "livekit", "interrupted": False},)create_index()
Cloud-side: build or append an org index. Build-or-append semantics — the first call with a given name CREATES the index, subsequent calls APPEND documents (or replace by matching id). Refreshes the local runtime's org projection on success.
async def create_index( self, name: str, documents: list[dict], *, metadata: dict | None = None,) -> IndexBuildResultParameters
namestrRequiredStable index name. The first call with a given name creates the index; subsequent calls APPEND documents (or replace any with a matching document id).
documentslist[dict]RequiredList of {"id", "text", "metadata"} dicts. id is a stable per-doc key; text is the searchable content; metadata is free-form tags (string values). Per call cap: 100 docs and 1 MB total — use the dashboard for bulk uploads.
metadatadict | NoneOptionalDefault: NoneOptional index-level metadata, attached to every chunk created by this call.
Returns: IndexBuildResult(index_name, chunk_count, version).
Raises: MemoAirVoiceMemoryError with status_code=413 if the payload exceeds the 100-doc / 1 MB cap — use the MemoAir dashboard for bulk ingest beyond that.
result = await client.create_index( "agent-memory", documents=[ {"id": "returns", "text": "30-day returns with receipt.", "metadata": {"topic": "returns"}}, {"id": "shipping", "text": "Express ships in 1-2 days.", "metadata": {"topic": "shipping"}}, ],)print(f"Indexed {result.chunk_count} chunks, version={result.version}")list_indexes()
Cloud-side: list all org indexes in the current project. No arguments.
async def list_indexes(self) -> list[dict]Returns: raw cloud response list (each item has name, version, chunkCount, createdAt, …).
session() / aclose()
Use session(user=...) when many calls happen back-to-back for the same end-user and you want to pin the runtime handle. Always call aclose() on process shutdown so the pool flushes and terminates every runtime.
session = await client.session(user={"id": "caller_42", "name": "Alex"})async with session as s: result = await s.search_memory("what did we discuss?") await s.save_response(user_text="what did we discuss?", assistant_text="...") await client.aclose()Concurrency model — the runtime pool
A single MemoAirVoiceClient can serve many concurrent end-users. Behind the scenes the client owns a RuntimePool that spawns one voice-runtime process per (project_id, user.id) on a free port from runtime_port_range. Subsequent calls for the same user_id reuse that runtime; calls for a new user_id either reuse a warm idle handle or spawn a fresh one.
- Bounded LRU. The pool never exceeds
max_concurrent_users(default 20). When the cap is reached and a fresh user arrives, the LRU handle is flushed (POST /v1/runtime/sync/flush) and shut down before the new runtime spawns — no event loss. - Idle reaper. Background task evicts handles whose last release was longer than
runtime_idle_ttl_sseconds ago (default 300). Trades a bootstrap-latency hit on the next call for the ~30 MB / runtime memory savings. - Per-call routing. Every
search_memory/save_responsetakes a requireduser={...}. The pool resolvesuser.idto a runtime handle for the duration of the call, then releases. - Identity headers. The cloud base URL receives
Authorization: Bearer memoair_pk_…,X-Project-Id,X-Agent-Id, andX-User-Idon every request — see concepts for the four-level identity model.
Pool error codes
pool.exhausted— every handle in the pool is in-flight; LRU eviction has nothing to drop. Bumpmax_concurrent_usersor wait for an in-flight call to release.pool.port_exhausted— every port insideruntime_port_rangeis bound by an existing pool handle or an external process. Widen the range or lowermax_concurrent_users.pool.closed—aclose()already drained the pool. Construct a fresh client.
MemoAirLiveKitAgent
Drop-in livekit.agents.Agent subclass shipped by memoair-livekit. Wires every memory lifecycle hook for you. LiveKit dispatch is 1:1 (one process per call), so this class keeps user_id at construction time and pins its internal pool to that single end-user. See LiveKit integration for an end-to-end agent.
from memoair_livekit import MemoAirLiveKitAgent__init__
MemoAirLiveKitAgent( *, api_key: str, project_id: str, agent_id: str, user_id: str, instructions: str | None = None, search_timeout_ms: int = 250, cloud_base_url: str = "https://backend.memoair.space", **agent_kwargs,) -> MemoAirLiveKitAgentParameters
api_keystrRequiredMemoAir account API key (memoair_pk_…). Same value as the cloud-side dashboard.
project_idstrRequiredMemoAir project ID (proj_…).
agent_idstrRequiredMemoAir agent ID (agent_…). Sent as X-Agent-Id on every cloud call.
user_idstrRequiredEnd-user identity for THIS LiveKit job. LiveKit dispatch is 1:1 (one process per call), so the agent surface keeps user_id at construction time and pins the internal pool to it. Use the LiveKit participant identity in production; fall back to a stable console-mode default for local dev.
instructionsstr | NoneOptionalDefault: NoneSystem prompt forwarded to LiveKit's Agent base class. None is allowed but most agents pass a real value here.
search_timeout_msintOptionalDefault: 250Per-turn deadline for the in-flight search_memory call inside on_user_turn_completed. Search failures are swallowed and logged at WARNING — the LLM is never blocked on memory I/O.
cloud_base_urlstrOptionalDefault: "https://backend.memoair.space"MemoAir cloud base URL forwarded to the spawned runtime.
**agent_kwargsAnyOptionalAny other kwarg LiveKit's Agent.__init__ accepts (tools, chat_ctx, stt, llm, tts, vad, …) is forwarded as-is. Keeps the subclass forward-compatible with LiveKit Agent surface additions.
Lifecycle hooks
Called automatically by LiveKit's pipeline. You don't invoke these yourself — they're documented so you know what the subclass does.
on_enter()— opens the MemoAir runtime session (load_indexequivalent).on_user_turn_completed(turn_ctx, new_message)— runssearch_memorywith a timeout, then splices a SYSTEM message intoturn_ctx.itemsbefore the user's message (LiveKit #5053 ordering workaround). Search failures are logged at WARNING and swallowed — the LLM never blocks on memory I/O.on_agent_response_completed(chat_ctx, new_message)— persists the completed turn viasave_response. Required wiring: LiveKit's baseAgentdoes not call this; forward LiveKit'sconversation_item_addedevent for assistant items into this method (see the LiveKit page for the exact handler).on_exit()— idempotent close of the runtime session.
MemoAirVoiceMemory (low-level)
Raw async wrapper around the loopback runtime. Use when you want explicit control over session lifecycle, or when building a framework adapter MemoAir doesn't ship yet. Most code should use MemoAirVoiceClient instead.
from memoair_voice import MemoAirVoiceMemoryConstructor
MemoAirVoiceMemory( *, project_id: str, user_id: str, api_key: str | None = None, internal_token: str | None = None, runtime_url: str = "http://127.0.0.1:7878", runtime_token: str | None = None, session_id: str | None = None, satham: dict | None = None, auto_start_runtime: bool = True, runtime_binary: str | None = None, cloud_base_url: str = "https://backend.memoair.space", storage_root: str | None = None, # plus runtime_version, runtime_repo, runtime_cache_dir, client, runtime_manager)Either api_key or internal_token must be set if the runtime needs to bootstrap from cloud. The runtime binary itself is auto-resolved (downloaded from ghcr.io/memoair/memoair-runtime on first use) when auto_start_runtime=True.
Methods
await memory.start_session(metadata=None, timeout_s=5.0) -> dict— open a voice session against the runtime. Returns the raw runtime body ({ sessionId, bootstrap, paths }).await memory.end_session(reason="completed", timeout_s=5.0) -> dict— close the session and flush.memory.search_memory_tool(*, search_memory_timeout_ms=None) -> MemoryTool— return a registerable LLM tool.await memory.after_turn(user_text, assistant_text, turn_id, tool_calls=None, metadata=None, timeout_s=1.0) -> str | None— persist a completed turn. Note this is the raw formMemoAirVoiceClient.save_responsewraps; hereturn_idis required.memory.to_openai_function() -> dict— OpenAI Chat Completions / Responses tool schema.memory.to_livekit_tool() -> FunctionTool— requireslivekit-agentsinstalled.memory.to_pipecat_tool() -> dict—{ name, schema, handler }. Requirespipecat-aiinstalled.
MemoAirVoiceMemorySync (sync mirror)
Synchronous variant of MemoAirVoiceMemory with identical method names — drop the async/await keywords. Use with non-async agent frameworks. Same to_openai_function / to_livekit_tool / to_pipecat_tool adapters.
MemoryTool
Async callable returned by search_memory_tool(). Calling it forwards the query to the local runtime and returns the composed contextText string. Register tool.schema() with your LLM framework, or call await tool(query) directly.
class MemoryTool: name: str # always "search_memory" last_trace: dict | None # most recent runtime trace async def __call__( self, query: str, *, intent: str = "answer_current_user", ) -> str: ... # returns the composed contextText def schema(self) -> dict: ... # OpenAI-style function-tool JSON schematool = client.search_memory_tool() # Option 1 — register the schema with OpenAI tools surfaceopenai_tools = [tool.schema()] # Option 2 — call directly inside your own handlercontext = await tool("what does the user prefer for follow-ups?")print(context)print("trace:", tool.last_trace)MemoryToolSync is the sync mirror (same shape, no await).
Result types
All result dataclasses are frozen and importable from memoair_voice.
SearchResult
@dataclass(frozen=True)class SearchResult: contextText: str # composed system-message string ready to splice profile: dict | None # current profile snapshot, or None working: list[dict] # working-brief hits permanent: list[dict] # per-user permanent hits org: list[dict] # workspace-shared org hits sources: list[dict] # unioned source metadata for citations trace: dict # per-lane latency + degraded flagIndexBuildResult
@dataclass(frozen=True)class IndexBuildResult: index_name: str chunk_count: int # number of embedding chunks created/appended version: int | None # monotonic version, bumped per callIndexLoadResult
@dataclass(frozen=True)class IndexLoadResult: session_id: str profile_version: int | None permanent_manifest_version: int | None org_manifest_version: int | None scopes_loaded: list[str]Errors
MemoAirVoiceMemoryError
Raised on hard runtime errors (non-transient 4xx, timeouts, unparseable responses) and on cloud-side index failures. The runtime's { "error": { code, message, details } } envelope is parsed into the exception attributes.
class MemoAirVoiceMemoryError(Exception): code: str # e.g. "runtime.identity_mismatch" message: str status_code: int | None # HTTP status if applicable details: dict # extra context from the envelopeCommon codes: runtime.bad_request, runtime.session_not_found, runtime.bootstrap_failed, runtime.projection_unavailable, runtime.identity_mismatch, runtime.timeout, runtime.internal.
End-to-end example
Seed an org index, then run a single turn against it.
import asyncioimport osfrom memoair_voice import MemoAirVoiceClient async def main() -> None: client = MemoAirVoiceClient( api_key=os.environ["MEMOAIR_API_KEY"], project_id=os.environ["MEMOAIR_PROJECT_ID"], agent_id=os.environ["MEMOAIR_AGENT_ID"], ) try: # Seed (idempotent — re-running appends / replaces by id) await client.create_index( "agent-memory", documents=[ {"id": "returns", "text": "Returns accepted within 30 days with a receipt.", "metadata": {"topic": "returns"}}, {"id": "shipping", "text": "Express ships in 1-2 business days.", "metadata": {"topic": "shipping"}}, ], ) # Pretend a user just spoke this: user_text = "How long do I have to return something?" user = {"id": "caller_42", "name": "Alex"} recall = await client.search_memory( user_text, user=user, timeout_ms=250, ) print("context:", recall.contextText) assistant_text = "You have 30 days from purchase to return with a receipt." await client.save_response( user_text=user_text, assistant_text=assistant_text, user=user, ) finally: await client.aclose() asyncio.run(main())