Documents
The Documents resource handles file upload and processing. Upload PDFs, text files, and other documents to extract knowledge and add it to your graph.
client.documents.upload()POST /v1/documents/uploadUpload a document for processing. Returns a job ID to track progress.
Parameters
filestr | Path | bytes | BinaryIORequiredThe file to upload. Can be a file path, bytes, or file-like object.
group_idstrRequiredThe group ID to associate the document with.
filenamestrOptionalOverride filename (useful when passing bytes or file objects).
Returns
UploadDocumentResponse with job_id, filename, status, and message.
Example
from memoair import MemoAir client = MemoAir() # Upload a PDFjob = client.documents.upload( file="company_handbook.pdf", group_id="org:acme",) print(f"Job ID: {job.job_id}")print(f"Status: {job.status}")client.documents.get_status()GET /v1/documents/jobs/{job_id}Check the processing status of an uploaded document.
Returns
JobStatusResponse with:
job_id,filenamestatus: JobStatus- Current processing stageprogress: int- Percentage complete (0-100)episode_count- Number of chunks createdis_complete(),is_success()- Helper methods
Job Status Values
QUEUEDWaiting to be processedPARSINGExtracting text from documentENRICHINGExtracting facts and entitiesINGESTINGAdding to knowledge graphCOMPLETESuccessfully processedERRORProcessing failedstatus = client.documents.get_status(job_id="job-123") print(f"Status: {status.status}")print(f"Progress: {status.progress}%") if status.is_complete(): if status.is_success(): print(f"Created {status.episode_count} chunks") else: print(f"Error: {status.error_message}")client.documents.wait_for_completion()GET /v1/documents/jobs/{job_id}/waitBlock until a document finishes processing. Polls status internally.
Parameters
job_idstrRequiredThe job ID returned from upload().
poll_intervalfloatOptionalDefault: 1.0Seconds between status checks.
timeoutfloatOptionalMaximum seconds to wait. None = wait forever.
Example
from memoair import MemoAir client = MemoAir() # Upload and wait for completionjob = client.documents.upload( file="handbook.pdf", group_id="org:acme",) # Block until done (with timeout)try: status = client.documents.wait_for_completion( job_id=job.job_id, timeout=300, # 5 minutes max ) if status.is_success(): print(f"Document processed: {status.episode_count} chunks") else: print(f"Processing failed: {status.error_message}") except TimeoutError: print("Document processing timed out")client.documents.list_jobs()GET /v1/documents/jobsList all document processing jobs, optionally filtered by status.
# List all jobsjobs = client.documents.list_jobs(limit=20) # Filter by statusprocessing = client.documents.list_jobs(status="PARSING")completed = client.documents.list_jobs(status="COMPLETE") for job in jobs.jobs: print(f"{job.filename}: {job.status}")client.documents.history()GET /v1/documents/history/{group_id}List all processed documents for a group.
# Get document history for an organizationdocs = client.documents.history( group_id="org:acme", limit=50,) print(f"Total documents: {docs.total}")for doc in docs.documents: print(f"- {doc.filename} ({doc.created_at})")