Python SDK¶
The official Python SDK for Tilde provides a high-level client for managing repositories, sessions, objects, and more.
Installation¶
Requires Python 3.10 or later. The SDK uses httpx for HTTP transport.
Configuration¶
The SDK authenticates with API keys (static, long-lived tokens) or short-lived sandbox tokens issued automatically when running inside a Tilde sandbox. Create an API key in the Tilde web UI or via the REST API -- the token is shown only once.
Credentials are resolved in this order (highest priority first):
1. Explicit parameter: Client(api_key=...) or tilde.configure(api_key=...)
2. TILDE_API_KEY environment variable
3. ~/.tilde/config.yaml written by tilde auth login
4. TILDE_SANDBOX_CREDENTIALS_URI -- auto-detected when running inside a Tilde sandbox (see Running inside a sandbox)
Static credentials from steps 1-3 always suppress auto-detection.
The API endpoint URL is resolved separately:
1. Client(endpoint_url=...) or tilde.configure(endpoint_url=...)
2. TILDE_ENDPOINT_URL environment variable
3. TILDE_API_URL environment variable (injected by the sandbox runtime alongside TILDE_SANDBOX_CREDENTIALS_URI)
4. ~/.tilde/config.yaml endpoint_url field
5. Default: https://tilde.run
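The two resolution orders above can be modeled in a few lines. This is only a sketch of the documented priority rules, not the SDK's implementation: the function names and the `api_key` config-file key are assumptions made for illustration, and `env` and `config` are plain mappings standing in for the process environment and the parsed ~/.tilde/config.yaml.

```python
from typing import Mapping, Optional, Tuple

DEFAULT_ENDPOINT = "https://tilde.run"


def resolve_credentials(
    explicit_key: Optional[str],
    env: Mapping[str, str],
    config: Mapping[str, str],
) -> Tuple[str, Optional[str]]:
    """Return (source, value) for the highest-priority credential available."""
    if explicit_key:
        return ("explicit", explicit_key)
    if env.get("TILDE_API_KEY"):
        return ("env", env["TILDE_API_KEY"])
    if config.get("api_key"):  # hypothetical key name inside config.yaml
        return ("config_file", config["api_key"])
    # Sandbox auto-detection only applies when no static key was found.
    if env.get("TILDE_SANDBOX_CREDENTIALS_URI"):
        return ("sandbox", env["TILDE_SANDBOX_CREDENTIALS_URI"])
    return ("none", None)


def resolve_endpoint(
    explicit_url: Optional[str],
    env: Mapping[str, str],
    config: Mapping[str, str],
) -> str:
    """Return the endpoint URL using the documented five-step order."""
    return (
        explicit_url
        or env.get("TILDE_ENDPOINT_URL")
        or env.get("TILDE_API_URL")
        or config.get("endpoint_url")
        or DEFAULT_ENDPOINT
    )
```

Note that the credential and endpoint lookups are independent: a static key from TILDE_API_KEY can be combined with an endpoint taken from TILDE_API_URL.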
Missing API key is not an error at construction time
Client(api_key=None) and a bare tilde.configure() call succeed. ConfigurationError is raised only on the first request if no key has been resolved. This means unit tests that construct Client() without credentials will only fail when they actually call the API.
Using the CLI's saved credentials¶
If you've already authenticated with the Tilde CLI via tilde auth login, the SDK picks up the credentials it wrote to ~/.tilde/config.yaml -- no env var or in-code configuration needed:
This is the fastest way to get started on a local machine. The SDK also reads endpoint_url from the same file, and picks up TILDE_ENDPOINT_URL / TILDE_API_URL env vars, so if the CLI or sandbox runtime configured a custom endpoint the SDK follows automatically.
Environment variables¶
Set TILDE_API_KEY (and optionally TILDE_ENDPOINT_URL) to use the SDK without any configuration in code. This is the right approach for CI/CD pipelines and agent workflows where the CLI hasn't been run:
Module-level configuration¶
Explicit client¶
Use an explicit client when you need multiple configurations or want full control over the HTTP lifecycle:
from tilde import Client
client = Client(api_key="YOUR_API_TOKEN")
repo = client.repository("my-team/my-data")
client.close()
The client supports context managers:
Custom endpoint¶
By default the SDK connects to https://tilde.run. To point it at a different endpoint, set TILDE_ENDPOINT_URL or pass endpoint_url:
Running inside a sandbox¶
When your code runs inside a Tilde sandbox, the SDK automatically authenticates as the sandbox's target principal -- no API key or config file needed. The sandbox runtime injects two environment variables:
- TILDE_SANDBOX_CREDENTIALS_URI -- a link-local metadata endpoint that issues short-lived bearer tokens scoped to the sandbox principal. Tokens are refreshed automatically before expiry.
- TILDE_API_URL -- the API endpoint URL to use with those tokens.
The SDK detects these variables at construction time when no static key is configured:
import tilde
# Inside a sandbox -- credentials and endpoint are auto-detected.
repo = tilde.repository("my-team/my-data")
If you set TILDE_API_KEY or pass api_key= explicitly, those static credentials win and the sandbox metadata endpoint is not used. This lets you deliberately target a different principal (e.g., your own user key) while your code runs inside a sandbox.
Default sandbox image¶
The SDK uses ubuntu:22.04 as the default sandbox image when no image is passed to repo.shell() or repo.execute(). Override it with default_sandbox_image (or the TILDE_DEFAULT_SANDBOX_IMAGE env var):
Sandbox Execution¶
The fastest way to run code against your repository data. Sandboxes execute inside isolated containers with your data mounted as a volume -- every change is captured as a transaction. For direct async lifecycle control (create, poll, cancel), see Sandboxes (Low-Level).
Transactional by default
All filesystem modifications made in a sandbox happen in the context of a transactional session. If anything fails midway or is aborted, changes don't take effect. Only successful sandboxes' changes are committed atomically to the repository -- so your data is always in a consistent state.
Interactive shell¶
Use repo.shell() to run multiple commands in a single sandbox session:
import tilde
repo = tilde.repository("my-team/my-data")
with repo.shell(image="python:3.12") as sh:
    sh.run("pip install pandas")
    result = sh.run("python train.py")
    print(result.stdout.text())
    # Stream output line by line
    result = sh.run("cat /sandbox/results.csv")
    for line in result.stdout.iter_lines():
        print(line)
The shell is a context manager -- when the block exits, the sandbox is closed and changes are committed automatically. If an exception occurs, changes are rolled back.
shell.run() returns a RunResult with stdout, stderr, and exit_code. stdout and stderr are independent OutputStream objects captured on separate channels. Pass check=True to raise CommandError on non-zero exit codes.
shell.run() also accepts per-call keyword arguments:
| Argument | Type | Description |
|---|---|---|
| env | dict[str, str] | Additional environment variables merged into the container environment for this call only |
| cwd | str | Working directory for this exec |
| stdin | bytes | Bytes to feed to the child's stdin |
| check | bool | Raise CommandError on non-zero exit (default False) |
One-shot execution¶
For a single command that doesn't need an interactive session:
result = repo.execute("python train.py", image="python:3.12")
print(result.stdout.text())
# check=False to handle errors yourself
result = repo.execute("might-fail", image="python:3.12", check=False)
if result.exit_code != 0:
    print(result.stdout.text())
execute() raises CommandError on non-zero exit codes by default. Pass check=False to inspect the result directly.
Output streams¶
Both execute() and shell.run() return a RunResult with stdout, stderr, and exit_code fields. stdout and stderr are both OutputStream objects with the same interface.
Behavioral difference:
- shell.run() -- stdout and stderr are captured on separate channels over the /exec WebSocket. result.stderr contains only stderr bytes; result.stdout contains only stdout bytes.
- execute() -- the server merges stdout and stderr into a single stream. result.stdout holds the merged output; result.stderr is always empty bytes.
OutputStream methods:
| Method | Returns | Description |
|---|---|---|
| .read() | bytes | Full output as raw bytes |
| .text(encoding='utf-8') | str | Full output decoded as a string. Pass encoding= to override UTF-8. |
| .iter_bytes(chunk_size) | Iterator[bytes] | Yield byte chunks |
| .iter_text(chunk_size) | Iterator[str] | Yield text chunks |
| .iter_lines() | Iterator[str] | Yield lines (no trailing newlines) |
Pagination¶
Every .list() (and uncommitted() / diff()) call returns a PaginatedIterator that fetches pages lazily as you iterate. By default it walks the entire collection -- no manual cursor handling required:
Two keyword arguments tune the iteration:
| Argument | Effect |
|---|---|
| amount | Cap on the total number of results yielded across all pages. Stops iteration early once reached. |
| page_size | Number of results requested per HTTP page. Defaults to 100; the server maximum is 1000. |
Use amount when you only want the first N items, regardless of how the server pages them:
# Yield at most 25 commits, then stop.
for commit in repo.commits.list(amount=25):
    print(commit.id, commit.message)
Use page_size to tune the round-trip count when iterating long lists:
# Fetch 500 entries per HTTP request while walking every object in the snapshot.
for entry in commit.objects.list(page_size=500):
    process(entry)
The two are independent: list(amount=10, page_size=500) makes a single 500-item request and yields the first 10 items. Note that when amount < page_size, the extra results fetched in the first page are discarded -- page_size does not cap total results, only per-request count.
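To make the interaction between amount and page_size concrete, here is a minimal in-memory sketch. It is not the SDK's PaginatedIterator: FakeServer and paginate are hypothetical names, and the sketch pages by offset for simplicity, whereas the real API may use cursors.

```python
from typing import Iterator, List, Optional


class FakeServer:
    """In-memory stand-in for a paged list endpoint; counts requests."""

    def __init__(self, items: List[str]) -> None:
        self.items = items
        self.requests = 0

    def fetch_page(self, offset: int, page_size: int) -> List[str]:
        self.requests += 1
        return self.items[offset : offset + page_size]


def paginate(
    server: FakeServer,
    amount: Optional[int] = None,
    page_size: int = 100,
) -> Iterator[str]:
    """Lazily yield items page by page, stopping once `amount` items were yielded."""
    yielded = 0
    offset = 0
    while True:
        page = server.fetch_page(offset, page_size)
        for item in page:
            if amount is not None and yielded >= amount:
                return  # remaining items in this page are discarded
            yield item
            yielded += 1
        if amount is not None and yielded >= amount:
            return
        if len(page) < page_size:
            return  # short page: the collection is exhausted
        offset += page_size


server = FakeServer([f"item-{i}" for i in range(2000)])
first_ten = list(paginate(server, amount=10, page_size=500))
print(len(first_ten), server.requests)
```

In this model, amount=10 with page_size=500 costs one request and discards the other 490 fetched items, which is why a smaller page_size is usually the better fit when amount is small.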
Resuming with after=¶
after= is a resume cursor: pass the last key you processed to pick up where you left off. This is the right pattern for bookkeeping across restarts or batched processing:
# Resume a long object walk from where a previous run stopped.
last_seen = load_checkpoint() # e.g. "data/file100.csv"
for entry in commit.objects.list(prefix="data/", after=last_seen):
    process(entry)
    save_checkpoint(entry.path)
after= works on all .list() calls and on session.uncommitted() / commit.diff().
Repositories¶
Create a repository through the organization:
import tilde
org = tilde.organizations.get("my-team")
repo = org.repositories.create(
    "my-data",
    description="My dataset",
    retention_days=90,
)
# List repositories in an organization
for r in org.repositories.list():
    print(r.name, r.description)
Access a repository by its organization/repository path:
repo = tilde.repository("my-team/my-data")
# Lazy-loaded properties
print(repo.id)
print(repo.description)
print(repo.retention_days)
print(repo.created_by_type)
print(repo.created_by)
print(repo.created_at)
Update or delete:
Sessions (Advanced)¶
Sessions provide direct transactional access to objects in a repository. They act like transactions: stage changes, then commit or rollback. Most users should prefer sandbox execution for running code -- sessions are useful when you need fine-grained control over individual object operations from your own process.
Use the context manager for automatic rollback on error:
repo = tilde.repository("my-team/my-data")
with repo.session() as session:
    session.objects.put("data/train.csv", open("train.csv", "rb"))
    session.objects.put("data/test.csv", open("test.csv", "rb"))
    session.objects.delete("data/old.csv")
    session.commit("Add new dataset")
    # rolls back automatically on exception
For explicit control:
session = repo.session()
session.objects.put("data/file.csv", b"col1,col2\na,b\n")
# List uncommitted changes
for entry in session.uncommitted():
    print(entry.path, entry.entry.size)
# commit() returns the commit ID on success, or "" / None when approval is required.
result = session.commit("Add CSV data", metadata={"source": "ingest"})
if result:
    print(f"Committed: {result}")
To discard all staged changes:
Attaching to an existing session¶
repo.attach(session_id) creates a Session handle pointing at a session that already exists on the server -- no new session is created. Use this when you need to resume work across process boundaries or machines: one process uploads objects and writes the session_id to a queue or database; another process picks it up and commits.
# Process A: upload objects, hand off the session ID
session = repo.session()
session.objects.put("data/part1.csv", open("part1.csv", "rb"))
print(session.session_id) # pass this to process B
# Process B: attach and commit
session = repo.attach("existing-session-uuid")
session.objects.put("data/part2.csv", open("part2.csv", "rb"))
session.commit("Add dataset parts 1 and 2")
Objects¶
Writing objects¶
Objects are written within a session. The put() method accepts bytes, file-like objects, pathlib.Path, or any iterable of bytes chunks:
import pathlib
with repo.session() as session:
    # From bytes
    session.objects.put("data/hello.txt", b"Hello, Tilde!")
    # From a file
    with open("dataset.parquet", "rb") as f:
        session.objects.put("data/dataset.parquet", f)
    # From a pathlib.Path (opened and closed automatically)
    session.objects.put("data/model.bin", pathlib.Path("model.bin"))
    # Copy an object within the session
    session.objects.copy("data/hello.txt", "data/hello-backup.txt")
    # Delete an object
    session.objects.delete("data/old-file.csv")
    # Bulk delete
    deleted = session.objects.delete_many(["data/a.csv", "data/b.csv", "data/c.csv"])
    print(f"Deleted {deleted} objects")
    session.commit("Upload files")
The put() method returns a PutObjectResult with path and etag fields:
with repo.session() as session:
    result = session.objects.put("data/file.csv", b"col1,col2\na,b\n")
    print(result.path, result.etag)
    session.commit("Add CSV")
Checking object metadata in a session¶
session.objects.head() checks existence and retrieves metadata without downloading content. It includes staged (not yet committed) objects:
session = repo.session()
session.objects.put("data/report.csv", b"col1,col2\na,b\n")
meta = session.objects.head("data/report.csv")
print(meta.etag, meta.content_length, meta.content_type)
Returns ObjectMetadata with etag, content_type, content_length, and reproducible fields. Raises NotFoundError if the path does not exist.
Copying objects¶
Copy an object to a new path within a session. The copy is server-side -- no data is downloaded or re-uploaded:
with repo.session() as session:
    result = session.objects.copy("data/original.csv", "data/copy.csv")
    print(result.source_path, result.destination_path)
    session.commit("Copy CSV file")
The copy() method returns a CopyObjectResult with source_path and destination_path fields.
Large file uploads¶
For files of 64 MB or more, the SDK automatically uses multipart upload -- splitting the file into parts and uploading them in parallel. No code changes are needed:
with repo.session() as session:
    # Files under 64 MB use a single upload
    # Files 64 MB and above use multipart upload automatically
    session.objects.put("data/large-dataset.parquet", open("large-dataset.parquet", "rb"))
    session.commit("Add large dataset")
If the server does not support multipart uploads, the SDK falls back to single upload transparently.
Reading objects¶
Read objects from a committed snapshot via the commit log:
# Get the latest commit
for commit in repo.commits.list():
    with commit.objects.get("data/hello.txt") as f:
        print(f.read().decode())
    # Get metadata without downloading
    meta = commit.objects.head("data/hello.txt")
    print(meta.etag, meta.content_type, meta.content_length)
    break
Read objects within a session (includes uncommitted changes):
session = repo.session()
with session.objects.get("data/hello.txt") as f:
    content = f.read()
session.rollback()
Listing objects¶
for commit in repo.commits.list():
    # List all objects (auto-paginating)
    for entry in commit.objects.list():
        print(entry.path, entry.type, entry.entry.size)
    # Filter by prefix
    for entry in commit.objects.list(prefix="data/"):
        print(entry.path)
    # Directory grouping with delimiter
    for entry in commit.objects.list(prefix="data/", delimiter="/"):
        if entry.type == "prefix":
            print(f"Directory: {entry.path}")
        else:
            print(f"Object: {entry.path}")
    break
Streaming large objects¶
For large files, disable caching and stream in chunks:
for commit in repo.commits.list():
    with commit.objects.get("data/large-file.parquet", cache=False) as f:
        with open("local-copy.parquet", "wb") as out:
            for chunk in f.iter_bytes(chunk_size=8192):
                out.write(chunk)
    break
Byte range reads¶
Pass byte_range=(start, end) to read only a portion of an object without downloading the full content. Useful for file headers, tailing logs, or columnar formats like Parquet:
# Read the first 4 bytes
with commit.objects.get("data/file.parquet", byte_range=(0, 3)) as f:
    magic = f.read()
# Read from offset 1024 to end of file
with commit.objects.get("data/file.parquet", byte_range=(1024, None)) as f:
    tail = f.read()
    print(f.content_range) # e.g. "bytes 1024-49151/49152"
    print(f.content_length) # bytes returned
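The examples above imply that the end offset is inclusive, as in HTTP Range requests: (0, 3) returns four bytes. The following sketch works through that arithmetic. The helper name content_range is hypothetical, and clamping an overlong end offset to the last byte is standard HTTP behavior that is assumed here, not confirmed for the SDK.

```python
from typing import Optional, Tuple


def content_range(
    byte_range: Tuple[int, Optional[int]],
    total_size: int,
) -> Tuple[str, int]:
    """Return the Content-Range value and byte count for an inclusive range."""
    start, end = byte_range
    if end is None or end >= total_size:
        end = total_size - 1  # open-ended or overlong ranges stop at the last byte
    if start < 0 or start > end:
        raise ValueError(f"unsatisfiable range {byte_range!r} for size {total_size}")
    return f"bytes {start}-{end}/{total_size}", end - start + 1


print(content_range((0, 3), 49152))       # ('bytes 0-3/49152', 4)
print(content_range((1024, None), 49152)) # ('bytes 1024-49151/49152', 48128)
```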
Commit Log & Diffs¶
Browse commit history and inspect changes:
repo = tilde.repository("my-team/my-data")
# List commits (newest first, auto-paginating)
for commit in repo.commits.list():
    print(f"{commit.id[:12]} {commit.committer}: {commit.message}")
Pass ref= (a commit ID) to list commits starting from that commit and walking backward through history:
Look up a single commit by ID:
View the changes introduced by a specific commit:
for commit in repo.commits.list():
    for entry in commit.diff():
        print(f" {entry.status}: {entry.path}")
    break # just the latest commit
Reverting a commit¶
Create a new commit that undoes the changes from a previous commit:
for commit in repo.commits.list():
    revert_commit = commit.revert(message="Undo last change")
    print(f"Created revert commit: {revert_commit.id}") # revert_commit is a Commit
    break
Organizations¶
import tilde
# Create
org = tilde.organizations.create("my-team", display_name="My Team")
# List
for org in tilde.organizations.list():
    print(org.name, org.display_name)
# Get
org = tilde.organizations.get("my-team")
# Delete
tilde.organizations.delete("my-team")
The Organization returned by get() exposes all org-scoped sub-resources:
org = tilde.organizations.get("my-team")
for repo in org.repositories.list():
    print(repo.name)
for member in org.members.list():
    print(member.username)
# Also provides: org.groups, org.policies, org.connectors, org.roles, org.agents
Members¶
org = tilde.organizations.get("my-team")
for m in org.members.list():
    print(m.username)
org.members.create("alice") # add by username
org.members.delete("user-uuid") # remove by user_id
Groups¶
org = tilde.organizations.get("my-team")
groups = org.groups
# Create
group = groups.create("data-engineers", description="Data engineering team")
# List
for g in groups.list():
    print(g.name)
# Get details (includes members and policy attachments)
group = groups.get(group.id)
print(group.name, len(list(group.members.list())))
# Policy attachments on this group (loaded by groups.get(id))
for att in group.attachments:
    print(att.policy_name, att.attached_at)
# Manage members
group.members.create(subject_type="user", subject_id="user-uuid")
group.members.delete(subject_type="user", subject_id="user-uuid")
# Update
group = group.update(name="new-name", description="Updated description")
# Delete
group.delete()
# View effective groups for a user
for g in groups.effective(principal_type="user", principal_id="user-uuid"):
    print(g.group_name, g.source)
Policies¶
org = tilde.organizations.get("my-team")
policies = org.policies
# Validate before creating
result = policies.validate("""
GetObject()
""")
print(result.valid, result.errors)
# Create
policy = policies.create(
    name="read-only",
    policy_text="""
ListRepositories()
GetRepository()
ListObjects()
GetObject()
LogCommits()
""",
    description="Read-only access",
)
# Generate a policy from natural language (LLM-backed, non-deterministic output)
policy_text = policies.generate("Allow read-only access to all repositories")
print(policy_text)
# Attach to a group
policy.attach(principal_type="group", principal_id="group-uuid")
# Detach
policy.detach(principal_type="group", principal_id="group-uuid")
# Update a policy
policy = policy.update(description="Updated description", policy_text="""
GetObject()
""")
# List all attachments across the organization
for att in policies.attachments():
    print(att.policy_name, att.principal_type, att.principal_name)
# List attachments for a single policy (loaded by policies.get or policies.create)
for att in policy.attachments:
    print(att.principal_type, att.principal_name, att.attached_at)
# View effective policies for a principal (user, agent, or role)
for ep in policies.effective(principal_type="user", principal_id="user-uuid"):
    print(ep.policy_name, ep.source)
# Also works for agents
for ep in policies.effective(principal_type="agent", principal_id="agent-uuid"):
    print(ep.policy_name, ep.source)
Roles vs Agents¶
Both Roles and Agents are non-human identities. The key differences:
| | Roles | Agents |
|---|---|---|
| API key prefix | trk- | tak- |
| Metadata | No | Yes (metadata dict) |
| Inline policy | No | Yes (inline_policy) |
| Designed for | CI/CD pipelines, automation | Autonomous tools, AI assistants |
Use Roles when you need a simple service account with no extra state. Use Agents when the identity needs metadata (e.g., environment, version) or a directly attached policy without creating a standalone policy object.
Roles¶
Roles are non-human identities for CI/CD pipelines and automation. They authenticate via API keys (prefixed trk-).
org = tilde.organizations.get("my-team")
# Create
role = org.roles.create("ci-deployer", description="CI/CD pipeline")
print(role.name, role.id)
# List
for role in org.roles.list():
    print(role.name, role.description)
# Get
role = org.roles.get("ci-deployer")
# Delete (revokes all API keys, removes from groups and policies)
org.roles.delete("ci-deployer")
Role API keys¶
role = org.roles.get("ci-deployer")
# Create a key (token is only shown once)
created = role.api_keys.create("github-actions-key")
print(created.token) # trk-... full token
# List keys
for key in role.api_keys.list():
    print(key.name, key.token_hint)
# Revoke a key
key = role.api_keys.get(key_id)
key.revoke()
Agents¶
Agents are non-human identities that carry metadata, designed for automated tools and AI assistants. They authenticate via API keys (prefixed tak-).
org = tilde.organizations.get("my-team")
# Create
agent = org.agents.create("data-pipeline", description="Nightly data pipeline", metadata={"env": "prod"})
print(agent.name, agent.id)
# List
for agent in org.agents.list():
    print(agent.name, agent.metadata)
# Get
agent = org.agents.get("data-pipeline")
# Update (description, metadata, and inline_policy are independently optional)
agent = agent.update(metadata={"env": "staging"})
# Delete (revokes all API keys, removes from groups and policies)
org.agents.delete("data-pipeline")
Agent inline policy¶
Agents can have an inline policy attached directly, without creating a standalone policy object:
agent = org.agents.get("data-pipeline")
# Set an inline policy
agent.update(inline_policy="""
ListRepositories()
GetRepository()
ListObjects()
GetObject()
""")
# Inspect the current inline policy
agent = org.agents.get("data-pipeline")
print(agent.inline_policy)
print(agent.inline_policy_updated_at)
Agent API keys¶
agent = org.agents.get("data-pipeline")
# Create a key (token is only shown once)
created = agent.api_keys.create("pipeline-key")
print(created.token) # tak-... full token
# List keys
for key in agent.api_keys.list():
    print(key.name, key.token_hint)
# Revoke a key
key = agent.api_keys.get(key_id)
key.revoke()
Connectors & Imports¶
Creating connectors¶
org = tilde.organizations.get("my-team")
connectors = org.connectors
# S3 connector
connector = connectors.create(
    name="production-s3",
    type="s3",
    source_uri="s3://my-bucket/datasets/",
    config={
        "access_key_id": "AKIAIOSFODNN7EXAMPLE",
        "secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
        "region": "us-west-2",
    },
)
# GCS connector
connector = connectors.create(
    name="production-gcs",
    type="gcs",
    source_uri="gs://my-bucket/datasets/",
    config={
        "credentials_json": open("service-account-key.json").read(),
        "project_id": "my-project",
    },
)
Attaching connectors to repositories¶
repo = tilde.repository("my-team/my-data")
repo.connectors.attach(connector.id)
for c in repo.connectors.list():
    print(c.name, c.type)
repo.connectors.detach(connector.id)
Importing from a connector¶
Use create_from_connector() to import objects from an external source (S3, GCS) into a repository:
repo = tilde.repository("my-team/my-data")
# Start an import job from a connector
job = repo.imports.create_from_connector(
    connector_id=connector.id,
    destination_path="imported/",
    source_prefix="datasets/",
    commit_message="Import production datasets",
)
# Poll for completion
import time
while True:
    job.refresh()
    print(f"Status: {job.status}, Objects: {job.objects_imported}")
    if job.status in ("completed", "failed"):
        break
    time.sleep(2)
if job.status == "completed":
    print(f"Import done! Commit: {job.commit_id}")
elif job.status == "failed":
    print(f"Import failed: {job.error}")
create_from_connector() returns a fully populated ImportJob. Call job.refresh() to fetch the latest status, or repo.imports.get(job_id) to look up a job by ID.
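A polling loop like the one above runs forever if a job never reaches a terminal status. One way to bound it is a small helper with a deadline, sketched here. wait_for_job and FakeJob are hypothetical names, not part of the SDK; the helper only relies on an object exposing refresh() and status as shown above, and the non-terminal status strings used by FakeJob are invented for the demonstration.

```python
import time
from typing import Callable

TERMINAL_STATUSES = ("completed", "failed")


def wait_for_job(
    job,
    timeout: float = 600.0,
    interval: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
):
    """Refresh `job` until it reaches a terminal status or `timeout` seconds pass."""
    deadline = clock() + timeout
    while True:
        job.refresh()
        if job.status in TERMINAL_STATUSES:
            return job
        if clock() >= deadline:
            raise TimeoutError(f"job still {job.status!r} after {timeout} seconds")
        sleep(interval)


class FakeJob:
    """Stand-in for an import job: reports 'running' twice, then 'completed'."""

    def __init__(self) -> None:
        self._statuses = iter(["running", "running", "completed"])
        self.status = "pending"

    def refresh(self) -> None:
        self.status = next(self._statuses)


job = wait_for_job(FakeJob(), sleep=lambda seconds: None)
print(job.status)
```

Passing sleep and clock as parameters keeps the helper easy to test without real delays.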
Cross-repository imports¶
Use create_from_repository() to import objects from another Tilde repository. This copies data from the source repository into the destination, creating a new commit:
repo = tilde.repository("my-team/my-data")
# Import from another repository
job = repo.imports.create_from_repository(
    repo_path="other-team/source-data",
    destination_path="external/",
    source_prefix="datasets/train/",
    commit_message="Import training data from source-data",
)
# Poll for completion
import time
while True:
    job.refresh()
    print(f"Status: {job.status}, Objects: {job.objects_imported}")
    if job.status in ("completed", "failed"):
        break
    time.sleep(2)
if job.status == "completed":
    print(f"Import done! Commit: {job.commit_id}")
The source repository path uses the "organization/repository" format. You must have read access to the source repository to import from it.
Agent Approval Workflow¶
When a repository has Require approval for agent commits enabled, commits made by agent API keys return HTTP 202 instead of completing immediately. The server responds with a URL to an approval page in the Tilde UI where a human reviewer can inspect the staged changes and approve or roll back the session.
The SDK handles this transparently. By default, commit() emits a UserWarning containing the approval URL and blocks until a human approves or rolls back the session, polling every 2 seconds:
import tilde
repo = tilde.repository("my-team/my-data")
with repo.session() as session:
    session.objects.put("data/results.csv", b"col1,col2\na,b\n")
    session.commit("Add results")
    # When approval is required, the SDK:
    # 1. Emits a UserWarning with the approval URL
    # 2. Blocks until the session is approved or rolled back
When this runs and approval is required, you will see a warning like:
UserWarning: Approval required - review and approve at: https://tilde.run/console/organizations/my-team/repositories/my-data/sessions/abc-123/approve
Non-blocking mode¶
If you don't want the SDK to wait for approval, pass block_for_approval=False. The call returns None immediately after emitting the warning:
with repo.session() as session:
    session.objects.put("data/results.csv", b"col1,col2\na,b\n")
    result = session.commit("Add results", block_for_approval=False)
    # result is None - approval is pending
    # The session will NOT be auto-rolled-back on exit because
    # the SDK considers it submitted for review.
Structured result¶
Use commit_result() instead of commit() to get a structured CommitResult object without blocking or emitting warnings:
with repo.session() as session:
    session.objects.put("data/results.csv", b"col1,col2\na,b\n")
    result = session.commit_result("Add results")
    if result.status == "committed":
        print(f"Committed: {result.commit_id}")
    elif result.status == "approval_required":
        print(f"Awaiting approval: {result.web_url}")
Return values¶
commit() returns different types depending on the scenario:
| Scenario | Return value |
|---|---|
| No approval required | str -- the new commit ID |
| Approval required, block_for_approval=True (default) | "" -- empty string; polling blocked until the session resolved (committed or rolled back), but the commit ID is not returned on this path |
| Approval required, block_for_approval=False | None -- returned immediately; the session is pending review |
Use commit_result() instead of commit() when you need to know the commit ID after an approval-gated commit, or want a structured result without blocking or side-channel warnings. commit_result() always returns a CommitResult with status, commit_id, and web_url fields.
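Because both "" and None are falsy, a plain `if result:` check cannot tell the two approval paths apart. The sketch below shows one way to distinguish the three documented return values; the helper name and the labels it returns are made up for illustration and are not SDK constants.

```python
from typing import Optional


def describe_commit_return(value: Optional[str]) -> str:
    """Map a commit() return value to the scenario in the table above."""
    if value is None:
        # block_for_approval=False: returned immediately, review still pending
        return "pending_approval"
    if value == "":
        # block_for_approval=True: the session resolved, but no commit ID is returned
        return "resolved_after_approval"
    return "committed"  # non-empty string: the new commit ID


print(describe_commit_return("abc123"))
print(describe_commit_return(""))
print(describe_commit_return(None))
```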
Enabling the setting¶
Toggle approval gating in the Tilde web UI under Repository Settings.
Sandboxes (Low-Level)¶
For most use cases, prefer Sandbox Execution (repo.shell() and repo.execute()), which manages the sandbox lifecycle for you and returns output synchronously.
The low-level sandbox API gives you direct control over async sandbox lifecycle: create, poll, stream logs, and cancel independently. See the Sandboxes guide for concepts, triggers, and more.
Creating a sandbox¶
import tilde
repo = tilde.repository("my-team/my-data")
sandbox = repo.sandboxes.create(
    image="python:3.11",
    command=["python", "process.py"],
    env={"OUTPUT_FORMAT": "json"},
    mountpoint="/data",
    path_prefix="datasets/",
    timeout_seconds=600,
)
print(sandbox.id)
Checking status and streaming logs¶
sandbox.status() returns a SandboxStatus with the following fields:
| Field | Type | Description |
|---|---|---|
| state | str | Current state (see table below) |
| status_reason | str | Machine-readable qualifier for state (see table below). Empty while running. |
| exit_code | int \| None | Process exit code (set once terminal) |
| commit_id | str | Commit ID if the sandbox committed changes |
| web_url | str | Approval URL if the sandbox is awaiting approval |
| error_message | str | Error description if state is failed or errored |
Sandbox states:
| State | Terminal? | Meaning | status_reason values |
|---|---|---|---|
| starting | No | Container is being prepared. Do not attach. | created, initializing |
| running | No | Container is executing. Attach the terminal or stream logs. | (empty) |
| done | Yes | Sandbox finished its work. | committed (changes landed; read commit_id), no_changes (nothing to commit), awaiting_approval (changes need human review; surface web_url) |
| errored | Yes | The sandbox's command exited non-zero. Read exit_code. | exit_non_zero |
| failed | Yes | Tilde-side infrastructure failure. Read error_message. | internal_error, image_not_found, sandbox_lost, timeout, commit_failed, policy_violation, storage_not_ready |
| cancelled | Yes | Cancelled by user or operator. | cancelled_by_user, user_initiated_cancel |
The full outcome of a sandbox is the (state, status_reason) pair — read both. Convenience properties on SandboxStatus cover the common branches:
| Property | Returns True when |
|---|---|
| is_active | state is starting or running |
| is_terminal | state is done, errored, failed, or cancelled |
| is_committed | state == "done" and status_reason == "committed" |
| is_awaiting_approval | state == "done" and status_reason == "awaiting_approval" |
| has_no_changes | state == "done" and status_reason == "no_changes" |
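The property definitions in the table above can be expressed as a small dataclass. StatusSketch is a hypothetical stand-in written only to illustrate the (state, status_reason) logic; it is not the SDK's SandboxStatus class and omits the other fields.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StatusSketch:
    """Illustrative model of the documented convenience properties."""

    state: str
    status_reason: str = ""

    @property
    def is_active(self) -> bool:
        return self.state in ("starting", "running")

    @property
    def is_terminal(self) -> bool:
        return self.state in ("done", "errored", "failed", "cancelled")

    @property
    def is_committed(self) -> bool:
        return self.state == "done" and self.status_reason == "committed"

    @property
    def is_awaiting_approval(self) -> bool:
        return self.state == "done" and self.status_reason == "awaiting_approval"

    @property
    def has_no_changes(self) -> bool:
        return self.state == "done" and self.status_reason == "no_changes"


status = StatusSketch(state="done", status_reason="awaiting_approval")
print(status.is_terminal, status.is_committed, status.is_awaiting_approval)
```

A sandbox in state done is terminal even when it is awaiting approval, so is_terminal alone does not mean the changes have landed.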
# Poll for status
status = sandbox.status()
print(status.state, status.status_reason, status.exit_code, status.commit_id)
if status.is_awaiting_approval:
    print(f"Approve at: {status.web_url}")
elif status.state == "errored":
    print(f"Command failed with exit code {status.exit_code}")
elif status.state == "failed":
    print(f"Sandbox infrastructure error ({status.status_reason}): {status.error_message}")
# Stream merged stdout+stderr
with status.stdout() as stream:
    for line in stream:
        print(line, end="")
# Stream outbound network request logs (NDJSON, one JSON object per line)
with status.network() as stream:
    for line in stream:
        print(line)
Cancelling a sandbox¶
Listing sandboxes¶
for sb in repo.sandboxes.list():
    print(sb.id, sb.status().state)
# Look up a single sandbox by ID
sb = repo.sandboxes.get("sandbox-uuid")
Delegation¶
Run a sandbox as an agent instead of yourself:
sandbox = repo.sandboxes.create(
    image="python:3.11",
    command=["python", "pipeline.py"],
    run_as={"type": "agent", "id": "AGENT_UUID"},
)
Sandbox Triggers¶
Triggers automatically create sandboxes when specific files change in a commit. See the Triggers Reference for concepts, condition types, and examples.
Creating a trigger¶
import tilde
repo = tilde.repository("my-team/my-data")
trigger = repo.sandbox_triggers.create(
    name="Summarize new documents",
    description="Generate summaries when documents are uploaded",
    conditions=[
        {"type": "prefix", "prefix": "documents/"},
    ],
    sandbox_config={
        "image": "my-org/doc-summarizer:latest",
        "command": ["python", "summarize.py"],
        "mountpoint": "/repo",
        "timeout_seconds": 300,
    },
    run_as={"type": "agent", "id": "AGENT_UUID"}, # optional
)
print(trigger.id)
Listing triggers¶
for trigger in repo.sandbox_triggers.list():
    print(trigger.name, trigger.enabled)
# Look up a single trigger by ID
trigger = repo.sandbox_triggers.get("trigger-uuid")
Updating a trigger¶
trigger = trigger.update(
    name="Summarize new documents",
    conditions=[{"type": "prefix", "prefix": "documents/v2/"}],
    sandbox_config={
        "image": "python:3.13",
        "command": ["python", "summarize.py", "--strict"],
    },
)
Enabling and disabling¶
Deleting a trigger¶
Viewing trigger runs¶
Each time a trigger fires on a commit, a run is recorded:
Repository Secrets¶
Repository secrets are encrypted key-value pairs injected as environment variables into sandboxes. They are scoped to a repository and apply to all sandboxes running in it.
repo = tilde.repository("my-team/my-data")
# Create or update a secret
repo.secrets.create("API_KEY", "sk-abc123...")
# Get the decrypted value
secret = repo.secrets.get("API_KEY")
print(secret.value)
# List secrets (metadata only, no values)
for entry in repo.secrets.list():
    print(entry.name, entry.created_at)
# Delete a secret
repo.secrets.delete("API_KEY")
Agent Secrets¶
Agent secrets are scoped to an agent and injected into sandboxes running as that agent. They override repository secrets with the same key.
org = tilde.organizations.get("my-team")
agent = org.agents.get("data-pipeline")
# Create or update a secret
agent.secrets.create("OPENAI_KEY", "sk-abc123...")
# Get the decrypted value
secret = agent.secrets.get("OPENAI_KEY")
print(secret.value)
# List secrets (metadata only, no values)
for entry in agent.secrets.list():
    print(entry.name, entry.created_at)
# Delete a secret
agent.secrets.delete("OPENAI_KEY")
Secret precedence in sandboxes¶
When a sandbox is created, environment variables are resolved in this order (highest to lowest):
1. env passed in the sandbox request
2. Agent secrets (if running as an agent)
3. Repository secrets
A key from a higher-precedence source is never overwritten by a lower one.
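The precedence rule amounts to a layered dictionary merge in which later layers win. The sketch below models it with plain dicts; resolve_sandbox_env is a hypothetical name and the merge happens server-side in practice, so this only illustrates the documented outcome.

```python
from typing import Dict, Mapping


def resolve_sandbox_env(
    request_env: Mapping[str, str],
    agent_secrets: Mapping[str, str],
    repo_secrets: Mapping[str, str],
) -> Dict[str, str]:
    """Merge the three sources so higher-precedence keys are never overwritten."""
    merged: Dict[str, str] = dict(repo_secrets)  # lowest precedence first
    merged.update(agent_secrets)                 # agent secrets override repository secrets
    merged.update(request_env)                   # request env overrides everything
    return merged


env = resolve_sandbox_env(
    request_env={"MODE": "debug"},
    agent_secrets={"API_KEY": "agent-key", "MODE": "agent"},
    repo_secrets={"API_KEY": "repo-key", "REGION": "us-west-2"},
)
print(env)  # {'API_KEY': 'agent-key', 'REGION': 'us-west-2', 'MODE': 'debug'}
```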
Error Handling¶
The SDK raises typed exceptions for all API errors:
import tilde
from tilde.exceptions import NotFoundError, AuthenticationError, APIError
try:
    repo = tilde.repository("my-team/nonexistent")
    _ = repo.id # triggers the API call
except NotFoundError as e:
    print(f"Not found: {e.message}")
except AuthenticationError:
    print("Invalid API key")
except APIError as e:
    print(f"API error {e.status_code}: {e.message} (request_id: {e.request_id})")
| Exception | HTTP Status | When |
|---|---|---|
| BadRequestError | 400 | Invalid input |
| AuthenticationError | 401 | Invalid or missing API key |
| ForbiddenError | 403 | Insufficient permissions |
| NotFoundError | 404 | Resource not found |
| ConflictError | 409 | Duplicate resource |
| GoneError | 410 | Connector deleted |
| PreconditionFailedError | 412 | Source object changed |
| LockedError | 423 | Connector disabled |
| ServerError | 5xx | Server error |
Non-HTTP errors:
| Exception | When |
|---|---|
| ConfigurationError | Missing API key |
| TransportError | Network failures (DNS, timeout) |
| SerializationError | Invalid JSON in response |
| SandboxError | Sandbox lifecycle failure |
| CommandError | Non-zero exit from repo.execute() or shell.run(check=True) |
All exceptions inherit from tilde.exceptions.TildeError.
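For quick reference, the HTTP status table above can be written as a lookup. This sketch returns exception names as strings so it runs without the SDK installed; it mirrors the table rather than the SDK's internal dispatch, and the fallback to APIError for unlisted statuses is an assumption.

```python
HTTP_STATUS_TO_EXCEPTION = {
    400: "BadRequestError",
    401: "AuthenticationError",
    403: "ForbiddenError",
    404: "NotFoundError",
    409: "ConflictError",
    410: "GoneError",
    412: "PreconditionFailedError",
    423: "LockedError",
}


def exception_name_for_status(status_code: int) -> str:
    """Return the documented exception name for an HTTP status code."""
    if status_code in HTTP_STATUS_TO_EXCEPTION:
        return HTTP_STATUS_TO_EXCEPTION[status_code]
    if 500 <= status_code <= 599:
        return "ServerError"
    # Assumption: statuses not listed in the table surface as the generic APIError.
    return "APIError"


print(exception_name_for_status(409))
print(exception_name_for_status(503))
```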