Documentation Index
Fetch the complete documentation index at: https://e2b.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
AsyncSandbox
class AsyncSandbox(SandboxApi)
E2B cloud sandbox is a secure and isolated cloud environment.
The sandbox allows you to:
- Access Linux OS
- Create, list, and delete files and directories
- Run commands
- Run isolated code
- Access the internet
Check docs here.
Use the AsyncSandbox.create() to create a new sandbox.
Example:
from e2b import AsyncSandbox
sandbox = await AsyncSandbox.create()
files
@property
def files() -> Filesystem
Module for interacting with the sandbox filesystem.
commands
@property
def commands() -> Commands
Module for running commands in the sandbox.
pty
@property
def pty() -> Pty
Module for interacting with the sandbox pseudo-terminal.
git
@property
def git() -> Git
Module for running git operations in the sandbox.
__init__
def __init__(**opts: Unpack[SandboxOpts])
Use AsyncSandbox.create() to create a new sandbox instead.
is_running
async def is_running(request_timeout: Optional[float] = None) -> bool
Check if the sandbox is running.
Arguments:
request_timeout: Timeout for the request in seconds
Returns:
True if the sandbox is running, False otherwise
Example
sandbox = await AsyncSandbox.create()
await sandbox.is_running() # Returns True
await sandbox.kill()
await sandbox.is_running() # Returns False
create
@classmethod
async def create(cls,
template: Optional[str] = None,
timeout: Optional[int] = None,
metadata: Optional[Dict[str, str]] = None,
envs: Optional[Dict[str, str]] = None,
secure: bool = True,
allow_internet_access: bool = True,
mcp: Optional[McpServer] = None,
network: Optional[SandboxNetworkOpts] = None,
**opts: Unpack[ApiParams]) -> Self
Create a new sandbox.
By default, the sandbox is created from the default base sandbox template.
Arguments:
template: Sandbox template name or ID
timeout: Timeout for the sandbox in seconds, default to 300 seconds. The maximum time a sandbox can be kept alive is 24 hours (86_400 seconds) for Pro users and 1 hour (3_600 seconds) for Hobby users.
metadata: Custom metadata for the sandbox
envs: Custom environment variables for the sandbox
secure: Envd is secured with access token and cannot be used without it, defaults to True.
allow_internet_access: Allow sandbox to access the internet, defaults to True. If set to False, it works the same as setting network deny_out to [0.0.0.0/0].
mcp: MCP server to enable in the sandbox
network: Sandbox network configuration
Returns:
A Sandbox instance for the new sandbox
Use this method instead of using the constructor to create a new sandbox.
connect
@overload
async def connect(timeout: Optional[int] = None,
**opts: Unpack[ApiParams]) -> Self
Connect to a sandbox. If the sandbox is paused, it will be automatically resumed.
Sandbox must be either running or be paused.
With sandbox ID you can connect to the same sandbox from different places or environments (serverless functions, etc).
Arguments:
timeout: Timeout for the sandbox in seconds
For running sandboxes, the timeout will update only if the new timeout is longer than the existing one.
Returns:
A running sandbox instance
@example
sandbox = await AsyncSandbox.create()
await sandbox.beta_pause()
same_sandbox = await sandbox.connect()
connect
@overload
@staticmethod
async def connect(sandbox_id: str,
timeout: Optional[int] = None,
**opts: Unpack[ApiParams]) -> "AsyncSandbox"
Connect to a sandbox. If the sandbox is paused, it will be automatically resumed.
Sandbox must be either running or be paused.
With sandbox ID you can connect to the same sandbox from different places or environments (serverless functions, etc).
Arguments:
sandbox_id: Sandbox ID
timeout: Timeout for the sandbox in seconds
For running sandboxes, the timeout will update only if the new timeout is longer than the existing one.
Returns:
A running sandbox instance
@example
sandbox = await AsyncSandbox.create()
await AsyncSandbox.beta_pause(sandbox.sandbox_id)
same_sandbox = await AsyncSandbox.connect(sandbox.sandbox_id))
connect
@class_method_variant("_cls_connect_sandbox")
async def connect(timeout: Optional[int] = None,
**opts: Unpack[ApiParams]) -> Self
Connect to a sandbox. If the sandbox is paused, it will be automatically resumed.
Sandbox must be either running or be paused.
With sandbox ID you can connect to the same sandbox from different places or environments (serverless functions, etc).
Arguments:
timeout: Timeout for the sandbox in seconds
For running sandboxes, the timeout will update only if the new timeout is longer than the existing one.
Returns:
A running sandbox instance
@example
sandbox = await AsyncSandbox.create()
await sandbox.beta_pause()
same_sandbox = await sandbox.connect()
kill
@overload
async def kill(**opts: Unpack[ApiParams]) -> bool
Kill the sandbox.
Returns:
True if the sandbox was killed, False if the sandbox was not found
kill
@overload
@staticmethod
async def kill(sandbox_id: str, **opts: Unpack[ApiParams]) -> bool
Kill the sandbox specified by sandbox ID.
Arguments:
Returns:
True if the sandbox was killed, False if the sandbox was not found
kill
@class_method_variant("_cls_kill")
async def kill(**opts: Unpack[ApiParams]) -> bool
Kill the sandbox specified by sandbox ID.
Returns:
True if the sandbox was killed, False if the sandbox was not found
set_timeout
@overload
async def set_timeout(timeout: int, **opts: Unpack[ApiParams]) -> None
Set the timeout of the sandbox.
This method can extend or reduce the sandbox timeout set when creating the sandbox or from the last call to .set_timeout.
The maximum time a sandbox can be kept alive is 24 hours (86_400 seconds) for Pro users and 1 hour (3_600 seconds) for Hobby users.
Arguments:
timeout: Timeout for the sandbox in seconds
set_timeout
@overload
@staticmethod
async def set_timeout(sandbox_id: str, timeout: int,
**opts: Unpack[ApiParams]) -> None
Set the timeout of the specified sandbox.
This method can extend or reduce the sandbox timeout set when creating the sandbox or from the last call to .set_timeout.
The maximum time a sandbox can be kept alive is 24 hours (86_400 seconds) for Pro users and 1 hour (3_600 seconds) for Hobby users.
Arguments:
sandbox_id: Sandbox ID
timeout: Timeout for the sandbox in seconds
set_timeout
@class_method_variant("_cls_set_timeout")
async def set_timeout(timeout: int, **opts: Unpack[ApiParams]) -> None
Set the timeout of the specified sandbox.
This method can extend or reduce the sandbox timeout set when creating the sandbox or from the last call to .set_timeout.
The maximum time a sandbox can be kept alive is 24 hours (86_400 seconds) for Pro users and 1 hour (3_600 seconds) for Hobby users.
Arguments:
timeout: Timeout for the sandbox in seconds
get_info
@overload
async def get_info(**opts: Unpack[ApiParams]) -> SandboxInfo
Get sandbox information like sandbox ID, template, metadata, started at/end at date.
Returns:
Sandbox info
get_info
@overload
@staticmethod
async def get_info(sandbox_id: str, **opts: Unpack[ApiParams]) -> SandboxInfo
Get sandbox information like sandbox ID, template, metadata, started at/end at date.
Arguments:
Returns:
Sandbox info
get_info
@class_method_variant("_cls_get_info")
async def get_info(**opts: Unpack[ApiParams]) -> SandboxInfo
Get sandbox information like sandbox ID, template, metadata, started at/end at date.
Returns:
Sandbox info
get_metrics
@overload
async def get_metrics(start: Optional[datetime.datetime] = None,
end: Optional[datetime.datetime] = None,
**opts: Unpack[ApiParams]) -> List[SandboxMetrics]
Get the metrics of the current sandbox.
Arguments:
start: Start time for the metrics, defaults to the start of the sandbox
end: End time for the metrics, defaults to the current time
Returns:
List of sandbox metrics containing CPU, memory and disk usage information
get_metrics
@overload
@staticmethod
async def get_metrics(sandbox_id: str,
start: Optional[datetime.datetime] = None,
end: Optional[datetime.datetime] = None,
**opts: Unpack[ApiParams]) -> List[SandboxMetrics]
Get the metrics of the sandbox specified by sandbox ID.
Arguments:
sandbox_id: Sandbox ID
start: Start time for the metrics, defaults to the start of the sandbox
end: End time for the metrics, defaults to the current time
Returns:
List of sandbox metrics containing CPU, memory and disk usage information
get_metrics
@class_method_variant("_cls_get_metrics")
async def get_metrics(start: Optional[datetime.datetime] = None,
end: Optional[datetime.datetime] = None,
**opts: Unpack[ApiParams]) -> List[SandboxMetrics]
Get the metrics of the current sandbox.
Arguments:
start: Start time for the metrics, defaults to the start of the sandbox
end: End time for the metrics, defaults to the current time
Returns:
List of sandbox metrics containing CPU, memory and disk usage information
beta_create
@classmethod
async def beta_create(cls,
template: Optional[str] = None,
timeout: Optional[int] = None,
auto_pause: bool = False,
metadata: Optional[Dict[str, str]] = None,
envs: Optional[Dict[str, str]] = None,
secure: bool = True,
allow_internet_access: bool = True,
mcp: Optional[McpServer] = None,
**opts: Unpack[ApiParams]) -> Self
[BETA] This feature is in beta and may change in the future.
Create a new sandbox.
By default, the sandbox is created from the default base sandbox template.
Arguments:
template: Sandbox template name or ID
timeout: Timeout for the sandbox in seconds, default to 300 seconds. The maximum time a sandbox can be kept alive is 24 hours (86_400 seconds) for Pro users and 1 hour (3_600 seconds) for Hobby users.
auto_pause: Automatically pause the sandbox after the timeout expires. Defaults to False.
metadata: Custom metadata for the sandbox
envs: Custom environment variables for the sandbox
secure: Envd is secured with access token and cannot be used without it, defaults to True.
allow_internet_access: Allow sandbox to access the internet, defaults to True.
mcp: MCP server to enable in the sandbox
Returns:
A Sandbox instance for the new sandbox
Use this method instead of using the constructor to create a new sandbox.
beta_pause
@overload
async def beta_pause(**opts: Unpack[ApiParams]) -> None
[BETA] This feature is in beta and may change in the future.
Pause the sandbox.
Returns:
Sandbox ID that can be used to resume the sandbox
beta_pause
@overload
@staticmethod
async def beta_pause(sandbox_id: str, **opts: Unpack[ApiParams]) -> None
[BETA] This feature is in beta and may change in the future.
Pause the sandbox specified by sandbox ID.
Arguments:
Returns:
Sandbox ID that can be used to resume the sandbox
beta_pause
@class_method_variant("_cls_pause")
async def beta_pause(**opts: Unpack[ApiParams]) -> None
[BETA] This feature is in beta and may change in the future.
Pause the sandbox.
Returns:
Sandbox ID that can be used to resume the sandbox
create_snapshot
@overload
async def create_snapshot(**opts: Unpack[ApiParams]) -> SnapshotInfo
Create a snapshot of the sandbox’s current state.
The sandbox will be paused while the snapshot is being created.
The snapshot can be used to create new sandboxes with the same filesystem and state.
Snapshots are persistent and survive sandbox deletion.
Use the returned snapshot_id with AsyncSandbox.create(snapshot_id) to create a new sandbox from the snapshot.
Returns:
Snapshot information including the snapshot ID
create_snapshot
@overload
@staticmethod
async def create_snapshot(sandbox_id: str,
**opts: Unpack[ApiParams]) -> SnapshotInfo
Create a snapshot from the sandbox specified by sandbox ID.
The sandbox will be paused while the snapshot is being created.
Arguments:
Returns:
Snapshot information including the snapshot ID
create_snapshot
@class_method_variant("_cls_create_snapshot")
async def create_snapshot(**opts: Unpack[ApiParams]) -> SnapshotInfo
Create a snapshot of the sandbox’s current state.
The sandbox will be paused while the snapshot is being created.
The snapshot can be used to create new sandboxes with the same filesystem and state.
Snapshots are persistent and survive sandbox deletion.
Use the returned snapshot_id with AsyncSandbox.create(snapshot_id) to create a new sandbox from the snapshot.
Returns:
Snapshot information including the snapshot ID
list_snapshots
@overload
def list_snapshots(limit: Optional[int] = None,
next_token: Optional[str] = None,
**opts: Unpack[ApiParams]) -> AsyncSnapshotPaginator
List snapshots for this sandbox.
Arguments:
limit: Maximum number of snapshots to return per page
next_token: Token for pagination
Returns:
Paginator for listing snapshots
list_snapshots
@overload
@staticmethod
def list_snapshots(sandbox_id: Optional[str] = None,
limit: Optional[int] = None,
next_token: Optional[str] = None,
**opts: Unpack[ApiParams]) -> AsyncSnapshotPaginator
List all snapshots.
Arguments:
sandbox_id: Filter snapshots by source sandbox ID
limit: Maximum number of snapshots to return per page
next_token: Token for pagination
Returns:
Paginator for listing snapshots
list_snapshots
@class_method_variant("_cls_list_snapshots")
def list_snapshots(limit: Optional[int] = None,
next_token: Optional[str] = None,
**opts: Unpack[ApiParams]) -> AsyncSnapshotPaginator
List snapshots for this sandbox.
Arguments:
limit: Maximum number of snapshots to return per page
next_token: Token for pagination
Returns:
Paginator for listing snapshots
delete_snapshot
@staticmethod
async def delete_snapshot(snapshot_id: str, **opts: Unpack[ApiParams]) -> bool
Delete a snapshot.
Arguments:
Returns:
True if the snapshot was deleted, False if it was not found
get_mcp_token
async def get_mcp_token() -> Optional[str]
Get the MCP token for the sandbox.
Returns:
MCP token for the sandbox, or None if MCP is not enabled.
AsyncWatchHandle
Handle for watching a directory in the sandbox filesystem.
Use .stop() to stop watching the directory.
stop
Stop watching the directory.
Filesystem
Module for interacting with the filesystem in the sandbox.
read
@overload
async def read(path: str,
format: Literal["text"] = "text",
user: Optional[Username] = None,
request_timeout: Optional[float] = None) -> str
Read file content as a str.
Arguments:
path: Path to the file
user: Run the operation as this user
format: Format of the file content—text by default
request_timeout: Timeout for the request in seconds
Returns:
File content as a str
read
@overload
async def read(path: str,
format: Literal["bytes"],
user: Optional[Username] = None,
request_timeout: Optional[float] = None) -> bytearray
Read file content as a bytearray.
Arguments:
path: Path to the file
user: Run the operation as this user
format: Format of the file content—bytes
request_timeout: Timeout for the request in seconds
Returns:
File content as a bytearray
read
@overload
async def read(
path: str,
format: Literal["stream"],
user: Optional[Username] = None,
request_timeout: Optional[float] = None) -> AsyncIterator[bytes]
Read file content as a AsyncIterator[bytes].
Arguments:
path: Path to the file
user: Run the operation as this user
format: Format of the file content—stream
request_timeout: Timeout for the request in seconds
Returns:
File content as an AsyncIterator[bytes]
write
async def write(path: str,
data: Union[str, bytes, IO],
user: Optional[Username] = None,
request_timeout: Optional[float] = None) -> WriteInfo
Write content to a file on the path.
Writing to a file that doesn’t exist creates the file.
Writing to a file that already exists overwrites the file.
Writing to a file at path that doesn’t exist creates the necessary directories.
Arguments:
path: Path to the file
data: Data to write to the file, can be a str, bytes, or IO.
user: Run the operation as this user
request_timeout: Timeout for the request in seconds
Returns:
Information about the written file
write_files
async def write_files(
files: List[WriteEntry],
user: Optional[Username] = None,
request_timeout: Optional[float] = None) -> List[WriteInfo]
Writes multiple files.
Writes a list of files to the filesystem.
When writing to a file that doesn’t exist, the file will get created.
When writing to a file that already exists, the file will get overwritten.
When writing to a file that’s in a directory that doesn’t exist, you’ll get an error.
Arguments:
files: list of files to write as WriteEntry objects, each containing path and data
user: Run the operation as this user
request_timeout: Timeout for the request
Returns:
Information about the written files
list
async def list(path: str,
depth: Optional[int] = 1,
user: Optional[Username] = None,
request_timeout: Optional[float] = None) -> List[EntryInfo]
List entries in a directory.
Arguments:
path: Path to the directory
depth: Depth of the directory to list
user: Run the operation as this user
request_timeout: Timeout for the request in seconds
Returns:
List of entries in the directory
exists
async def exists(path: str,
user: Optional[Username] = None,
request_timeout: Optional[float] = None) -> bool
Check if a file or a directory exists.
Arguments:
path: Path to a file or a directory
user: Run the operation as this user
request_timeout: Timeout for the request in seconds
Returns:
True if the file or directory exists, False otherwise
get_info
async def get_info(path: str,
user: Optional[Username] = None,
request_timeout: Optional[float] = None) -> EntryInfo
Get information about a file or directory.
Arguments:
path: Path to a file or a directory
user: Run the operation as this user
request_timeout: Timeout for the request in seconds
Returns:
Information about the file or directory like name, type, and path
remove
async def remove(path: str,
user: Optional[Username] = None,
request_timeout: Optional[float] = None) -> None
Remove a file or a directory.
Arguments:
path: Path to a file or a directory
user: Run the operation as this user
request_timeout: Timeout for the request in seconds
rename
async def rename(old_path: str,
new_path: str,
user: Optional[Username] = None,
request_timeout: Optional[float] = None) -> EntryInfo
Rename a file or directory.
Arguments:
old_path: Path to the file or directory to rename
new_path: New path to the file or directory
user: Run the operation as this user
request_timeout: Timeout for the request in seconds
Returns:
Information about the renamed file or directory
make_dir
async def make_dir(path: str,
user: Optional[Username] = None,
request_timeout: Optional[float] = None) -> bool
Create a new directory and all directories along the way if needed on the specified path.
Arguments:
path: Path to a new directory. For example ‘/dirA/dirB’ when creating ‘dirB’.
user: Run the operation as this user
request_timeout: Timeout for the request in seconds
Returns:
True if the directory was created, False if the directory already exists
watch_dir
async def watch_dir(path: str,
on_event: OutputHandler[FilesystemEvent],
on_exit: Optional[OutputHandler[Exception]] = None,
user: Optional[Username] = None,
request_timeout: Optional[float] = None,
timeout: Optional[float] = 60,
recursive: bool = False) -> AsyncWatchHandle
Watch directory for filesystem events.
Arguments:
path: Path to a directory to watch
on_event: Callback to call on each event in the directory
on_exit: Callback to call when the watching ends
user: Run the operation as this user
request_timeout: Timeout for the request in seconds
timeout: Timeout for the watch operation in seconds. Using 0 will not limit the watch time
recursive: Watch directory recursively
Returns:
AsyncWatchHandle object for stopping watching directory
Git
Async module for running git operations in the sandbox.
__init__
def __init__(commands: Commands) -> None
Create a Git helper bound to the sandbox command runner.
Arguments:
commands: Command runner used to execute git commands
clone
async def clone(url: str,
path: Optional[str] = None,
branch: Optional[str] = None,
depth: Optional[int] = None,
username: Optional[str] = None,
password: Optional[str] = None,
envs: Optional[Dict[str, str]] = None,
user: Optional[str] = None,
cwd: Optional[str] = None,
timeout: Optional[float] = None,
request_timeout: Optional[float] = None,
dangerously_store_credentials: bool = False)
Clone a git repository into the sandbox.
Arguments:
url: Git repository URL
path: Destination path for the clone
branch: Branch to check out
depth: If set, perform a shallow clone with this depth
username: Username for HTTP(S) authentication
password: Password or token for HTTP(S) authentication
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
timeout: Timeout for the command connection in seconds
request_timeout: Timeout for the request in seconds
dangerously_store_credentials: Store credentials in the cloned repository when True
Returns:
Command result from the command runner
init
async def init(path: str,
bare: bool = False,
initial_branch: Optional[str] = None,
envs: Optional[Dict[str, str]] = None,
user: Optional[str] = None,
cwd: Optional[str] = None,
timeout: Optional[float] = None,
request_timeout: Optional[float] = None)
Initialize a new git repository.
Arguments:
path: Destination path for the repository
bare: Create a bare repository when True
initial_branch: Initial branch name (for example, “main”)
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
timeout: Timeout for the command connection in seconds
request_timeout: Timeout for the request in seconds
Returns:
Command result from the command runner
remote_add
async def remote_add(path: str,
name: str,
url: str,
fetch: bool = False,
overwrite: bool = False,
envs: Optional[Dict[str, str]] = None,
user: Optional[str] = None,
cwd: Optional[str] = None,
timeout: Optional[float] = None,
request_timeout: Optional[float] = None)
Add (or update) a remote for a repository.
Arguments:
path: Repository path
name: Remote name (for example, “origin”)
url: Remote URL
fetch: Fetch the remote after adding it when True
overwrite: Overwrite the remote URL if it already exists when True
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
timeout: Timeout for the command connection in seconds
request_timeout: Timeout for the request in seconds
Returns:
Command result from the command runner
remote_get
async def remote_get(path: str,
name: str,
envs: Optional[Dict[str, str]] = None,
user: Optional[str] = None,
cwd: Optional[str] = None,
timeout: Optional[float] = None,
request_timeout: Optional[float] = None) -> Optional[str]
Get the URL for a git remote.
Returns None when the remote does not exist.
Arguments:
path: Repository path
name: Remote name (for example, “origin”)
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
timeout: Timeout for the command connection in seconds
request_timeout: Timeout for the request in seconds
Returns:
Remote URL if present, otherwise None
status
async def status(path: str,
envs: Optional[Dict[str, str]] = None,
user: Optional[str] = None,
cwd: Optional[str] = None,
timeout: Optional[float] = None,
request_timeout: Optional[float] = None) -> GitStatus
Get repository status information.
Arguments:
path: Repository path
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
timeout: Timeout for the command connection in seconds
request_timeout: Timeout for the request in seconds
Returns:
Parsed git status
branches
async def branches(path: str,
envs: Optional[Dict[str, str]] = None,
user: Optional[str] = None,
cwd: Optional[str] = None,
timeout: Optional[float] = None,
request_timeout: Optional[float] = None) -> GitBranches
List branches in a repository.
Arguments:
path: Repository path
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
timeout: Timeout for the command connection in seconds
request_timeout: Timeout for the request in seconds
Returns:
Parsed branch list
create_branch
async def create_branch(path: str,
branch: str,
envs: Optional[Dict[str, str]] = None,
user: Optional[str] = None,
cwd: Optional[str] = None,
timeout: Optional[float] = None,
request_timeout: Optional[float] = None)
Create and check out a new branch.
Arguments:
path: Repository path
branch: Branch name to create
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
timeout: Timeout for the command connection in seconds
request_timeout: Timeout for the request in seconds
Returns:
Command result from the command runner
checkout_branch
async def checkout_branch(path: str,
branch: str,
envs: Optional[Dict[str, str]] = None,
user: Optional[str] = None,
cwd: Optional[str] = None,
timeout: Optional[float] = None,
request_timeout: Optional[float] = None)
Check out an existing branch.
Arguments:
path: Repository path
branch: Branch name to check out
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
timeout: Timeout for the command connection in seconds
request_timeout: Timeout for the request in seconds
Returns:
Command result from the command runner
delete_branch
async def delete_branch(path: str,
branch: str,
force: bool = False,
envs: Optional[Dict[str, str]] = None,
user: Optional[str] = None,
cwd: Optional[str] = None,
timeout: Optional[float] = None,
request_timeout: Optional[float] = None)
Delete a branch.
Arguments:
path: Repository path
branch: Branch name to delete
force: Force deletion with -D when True
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
timeout: Timeout for the command connection in seconds
request_timeout: Timeout for the request in seconds
Returns:
Command result from the command runner
add
async def add(path: str,
files: Optional[List[str]] = None,
all: bool = True,
envs: Optional[Dict[str, str]] = None,
user: Optional[str] = None,
cwd: Optional[str] = None,
timeout: Optional[float] = None,
request_timeout: Optional[float] = None)
Stage files for commit.
Arguments:
path: Repository path
files: Files to add; when omitted, adds the current directory
all: When True and files is omitted, stage all changes
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
timeout: Timeout for the command connection in seconds
request_timeout: Timeout for the request in seconds
Returns:
Command result from the command runner
commit
async def commit(path: str,
message: str,
author_name: Optional[str] = None,
author_email: Optional[str] = None,
allow_empty: bool = False,
envs: Optional[Dict[str, str]] = None,
user: Optional[str] = None,
cwd: Optional[str] = None,
timeout: Optional[float] = None,
request_timeout: Optional[float] = None)
Create a commit in the repository.
Arguments:
path: Repository path
message: Commit message
author_name: Commit author name
author_email: Commit author email
allow_empty: Allow empty commits when True
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
timeout: Timeout for the command connection in seconds
request_timeout: Timeout for the request in seconds
Returns:
Command result from the command runner
reset
async def reset(path: str,
mode: Optional[str] = None,
target: Optional[str] = None,
paths: Optional[List[str]] = None,
envs: Optional[Dict[str, str]] = None,
user: Optional[str] = None,
cwd: Optional[str] = None,
timeout: Optional[float] = None,
request_timeout: Optional[float] = None)
Reset the current HEAD to a specified state.
Arguments:
path: Repository path
mode: Reset mode (soft, mixed, hard, merge, keep)
target: Commit, branch, or ref to reset to (defaults to HEAD)
paths: Paths to reset
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
timeout: Timeout for the command connection in seconds
request_timeout: Timeout for the request in seconds
Returns:
Command result from the command runner
restore
async def restore(path: str,
paths: List[str],
staged: Optional[bool] = None,
worktree: Optional[bool] = None,
source: Optional[str] = None,
envs: Optional[Dict[str, str]] = None,
user: Optional[str] = None,
cwd: Optional[str] = None,
timeout: Optional[float] = None,
request_timeout: Optional[float] = None)
Restore working tree files or unstage changes.
Arguments:
path: Repository path
paths: Paths to restore (use [”.”] for all)
staged: When True, restore the index (unstage)
worktree: When True, restore working tree files
source: Restore from the given source (commit, branch, or ref)
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
timeout: Timeout for the command connection in seconds
request_timeout: Timeout for the request in seconds
Returns:
Command result from the command runner
push
async def push(path: str,
remote: Optional[str] = None,
branch: Optional[str] = None,
set_upstream: bool = True,
username: Optional[str] = None,
password: Optional[str] = None,
envs: Optional[Dict[str, str]] = None,
user: Optional[str] = None,
cwd: Optional[str] = None,
timeout: Optional[float] = None,
request_timeout: Optional[float] = None)
Push commits to a remote.
Arguments:
path: Repository path
remote: Remote name, e.g. origin
branch: Branch name to push
set_upstream: Set upstream tracking when True
username: Username for HTTP(S) authentication
password: Password or token for HTTP(S) authentication
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
timeout: Timeout for the command connection in seconds
request_timeout: Timeout for the request in seconds
Returns:
Command result from the command runner
pull
async def pull(path: str,
remote: Optional[str] = None,
branch: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
envs: Optional[Dict[str, str]] = None,
user: Optional[str] = None,
cwd: Optional[str] = None,
timeout: Optional[float] = None,
request_timeout: Optional[float] = None)
Pull changes from a remote.
Arguments:
path: Repository path
remote: Remote name, e.g. origin
branch: Branch name to pull
username: Username for HTTP(S) authentication
password: Password or token for HTTP(S) authentication
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
timeout: Timeout for the command connection in seconds
request_timeout: Timeout for the request in seconds
Returns:
Command result from the command runner
set_config
async def set_config(key: str,
value: str,
scope: str = "global",
path: Optional[str] = None,
envs: Optional[Dict[str, str]] = None,
user: Optional[str] = None,
cwd: Optional[str] = None,
timeout: Optional[float] = None,
request_timeout: Optional[float] = None)
Set a git config value.
Use scope="local" together with path to configure a specific repository.
Arguments:
key: Git config key (e.g. pull.rebase)
value: Git config value
scope: Config scope: global, local, or system
path: Repository path required when scope is local
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
timeout: Timeout for the command connection in seconds
request_timeout: Timeout for the request in seconds
Returns:
Command result from the command runner
get_config
async def get_config(key: str,
scope: str = "global",
path: Optional[str] = None,
envs: Optional[Dict[str, str]] = None,
user: Optional[str] = None,
cwd: Optional[str] = None,
timeout: Optional[float] = None,
request_timeout: Optional[float] = None) -> Optional[str]
Get a git config value.
Returns None when the key is not set in the requested scope.
Arguments:
key: Git config key (e.g. pull.rebase)
scope: Config scope: global, local, or system
path: Repository path required when scope is local
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
timeout: Timeout for the command connection in seconds
request_timeout: Timeout for the request in seconds
Returns:
Config value if present, otherwise None
dangerously_authenticate
async def dangerously_authenticate(username: str,
password: str,
host: str = "github.com",
protocol: str = "https",
envs: Optional[Dict[str, str]] = None,
user: Optional[str] = None,
cwd: Optional[str] = None,
timeout: Optional[float] = None,
request_timeout: Optional[float] = None)
Dangerously authenticate git globally via the credential helper.
This persists credentials in the credential store and may be accessable to agents running on the sandbox.
Prefer short-lived credentials when possible.
Arguments:
username: Username for HTTP(S) authentication
password: Password or token for HTTP(S) authentication
host: Host to authenticate for, defaults to github.com
protocol: Protocol to authenticate for, defaults to https
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
timeout: Timeout for the command connection in seconds
request_timeout: Timeout for the request in seconds
Returns:
Command result from the command runner
async def configure_user(name: str,
email: str,
scope: str = "global",
path: Optional[str] = None,
envs: Optional[Dict[str, str]] = None,
user: Optional[str] = None,
cwd: Optional[str] = None,
timeout: Optional[float] = None,
request_timeout: Optional[float] = None)
Configure git user name and email.
Arguments:
name: Git user name
email: Git user email
scope: Config scope: global, local, or system
path: Repository path required when scope is local
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
timeout: Timeout for the command connection in seconds
request_timeout: Timeout for the request in seconds
Returns:
Command result from the command runner
Commands
Module for executing commands in the sandbox.
list
async def list(request_timeout: Optional[float] = None) -> List[ProcessInfo]
Lists all running commands and PTY sessions.
Arguments:
request_timeout: Timeout for the request in seconds
Returns:
List of running commands and PTY sessions
kill
async def kill(pid: int, request_timeout: Optional[float] = None) -> bool
Kill a running command specified by its process ID.
It uses SIGKILL signal to kill the command.
Arguments:
pid: Process ID of the command. You can get the list of processes using sandbox.commands.list()
request_timeout: Timeout for the request in seconds
Returns:
True if the command was killed, False if the command was not found
send_stdin
async def send_stdin(pid: int,
data: str,
request_timeout: Optional[float] = None) -> None
Send data to command stdin.
:param pid Process ID of the command. You can get the list of processes using sandbox.commands.list().
:param data: Data to send to the command
:param request_timeout: Timeout for the request in seconds
run
@overload
async def run(cmd: str,
background: Union[Literal[False], None] = None,
envs: Optional[Dict[str, str]] = None,
user: Optional[Username] = None,
cwd: Optional[str] = None,
on_stdout: Optional[OutputHandler[Stdout]] = None,
on_stderr: Optional[OutputHandler[Stderr]] = None,
stdin: Optional[bool] = None,
timeout: Optional[float] = 60,
request_timeout: Optional[float] = None) -> CommandResult
Start a new command and wait until it finishes executing.
Arguments:
cmd: Command to execute
background: False if the command should be executed in the foreground, True if the command should be executed in the background
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
on_stdout: Callback for command stdout output
on_stderr: Callback for command stderr output
stdin: If True, the command will have a stdin stream that you can send data to using sandbox.commands.send_stdin()
timeout: Timeout for the command connection in seconds. Using 0 will not limit the command connection time
request_timeout: Timeout for the request in seconds
Returns:
CommandResult result of the command execution
run
@overload
async def run(cmd: str,
background: Literal[True],
envs: Optional[Dict[str, str]] = None,
user: Optional[Username] = None,
cwd: Optional[str] = None,
on_stdout: Optional[OutputHandler[Stdout]] = None,
on_stderr: Optional[OutputHandler[Stderr]] = None,
stdin: Optional[bool] = None,
timeout: Optional[float] = 60,
request_timeout: Optional[float] = None) -> AsyncCommandHandle
Start a new command and return a handle to interact with it.
Arguments:
cmd: Command to execute
background: False if the command should be executed in the foreground, True if the command should be executed in the background
envs: Environment variables used for the command
user: User to run the command as
cwd: Working directory to run the command
on_stdout: Callback for command stdout output
on_stderr: Callback for command stderr output
stdin: If True, the command will have a stdin stream that you can send data to using sandbox.commands.send_stdin()
timeout: Timeout for the command connection in seconds. Using 0 will not limit the command connection time
request_timeout: Timeout for the request in seconds
Returns:
AsyncCommandHandle handle to interact with the running command
connect
async def connect(
pid: int,
timeout: Optional[float] = 60,
request_timeout: Optional[float] = None,
on_stdout: Optional[OutputHandler[Stdout]] = None,
on_stderr: Optional[OutputHandler[Stderr]] = None
) -> AsyncCommandHandle
Connects to a running command.
You can use AsyncCommandHandle.wait() to wait for the command to finish and get execution results.
Arguments:
pid: Process ID of the command to connect to. You can get the list of processes using sandbox.commands.list()
request_timeout: Request timeout in seconds
timeout: Timeout for the command connection in seconds. Using 0 will not limit the command connection time
on_stdout: Callback for command stdout output
on_stderr: Callback for command stderr output
Returns:
AsyncCommandHandle handle to interact with the running command
Pty
Module for interacting with PTYs (pseudo-terminals) in the sandbox.
kill
async def kill(pid: int, request_timeout: Optional[float] = None) -> bool
Kill PTY.
Arguments:
pid: Process ID of the PTY
request_timeout: Timeout for the request in seconds
Returns:
true if the PTY was killed, false if the PTY was not found
send_stdin
async def send_stdin(pid: int,
data: bytes,
request_timeout: Optional[float] = None) -> None
Send input to a PTY.
Arguments:
pid: Process ID of the PTY
data: Input data to send
request_timeout: Timeout for the request in seconds
create
async def create(
size: PtySize,
on_data: OutputHandler[PtyOutput],
user: Optional[Username] = None,
cwd: Optional[str] = None,
envs: Optional[Dict[str, str]] = None,
timeout: Optional[float] = 60,
request_timeout: Optional[float] = None) -> AsyncCommandHandle
Start a new PTY (pseudo-terminal).
Arguments:
size: Size of the PTY
on_data: Callback to handle PTY data
user: User to use for the PTY
cwd: Working directory for the PTY
envs: Environment variables for the PTY
timeout: Timeout for the PTY in seconds
request_timeout: Timeout for the request in seconds
Returns:
Handle to interact with the PTY
connect
async def connect(
pid: int,
on_data: OutputHandler[PtyOutput],
timeout: Optional[float] = 60,
request_timeout: Optional[float] = None) -> AsyncCommandHandle
Connect to a running PTY.
Arguments:
pid: Process ID of the PTY to connect to. You can get the list of running PTYs using sandbox.pty.list().
on_data: Callback to handle PTY data
timeout: Timeout for the PTY connection in seconds. Using 0 will not limit the connection time
request_timeout: Timeout for the request in seconds
Returns:
Handle to interact with the PTY
resize
async def resize(pid: int,
size: PtySize,
request_timeout: Optional[float] = None)
Resize PTY.
Call this when the terminal window is resized and the number of columns and rows has changed.
Arguments:
pid: Process ID of the PTY
size: New size of the PTY
request_timeout: Timeout for the request in seconds
AsyncCommandHandle
class AsyncCommandHandle()
Command execution handle.
It provides methods for waiting for the command to finish, retrieving stdout/stderr, and killing the command.
pid
Command process ID.
stdout
Command stdout output.
stderr
Command stderr output.
error
Command execution error message.
exit_code
@property
def exit_code()
Command execution exit code.
0 if the command finished successfully.
It is None if the command is still running.
disconnect
async def disconnect() -> None
Disconnects from the command.
The command is not killed, but SDK stops receiving events from the command.
You can reconnect to the command using sandbox.commands.connect method.
wait
async def wait() -> CommandResult
Wait for the command to finish and return the result.
If the command exits with a non-zero exit code, it throws a CommandExitException.
Returns:
CommandResult result of command execution
kill
Kills the command.
It uses SIGKILL signal to kill the command
Returns:
True if the command was killed successfully, False if the command was not found
SandboxApi
class SandboxApi(SandboxBase)
list
@staticmethod
def list(query: Optional[SandboxQuery] = None,
limit: Optional[int] = None,
next_token: Optional[str] = None,
**opts: Unpack[ApiParams]) -> AsyncSandboxPaginator
List all running sandboxes.
Arguments:
query: Filter the list of sandboxes by metadata or state, e.g. SandboxListQuery(metadata={"key": "value"}) or SandboxListQuery(state=[SandboxState.RUNNING])
limit: Maximum number of sandboxes to return per page
next_token: Token for pagination
Returns:
List of running sandboxes
AsyncSandboxPaginator
class AsyncSandboxPaginator(SandboxPaginatorBase)
Paginator for listing sandboxes.
Example:
paginator = AsyncSandbox.list()
while paginator.has_next:
sandboxes = await paginator.next_items()
print(sandboxes)
next_items
async def next_items() -> List[SandboxInfo]
Returns the next page of sandboxes.
Call this method only if has_next is True, otherwise it will raise an exception.
Returns:
List of sandboxes
AsyncSnapshotPaginator
class AsyncSnapshotPaginator(SnapshotPaginatorBase)
Paginator for listing snapshots.
Example:
paginator = AsyncSandbox.list_snapshots()
while paginator.has_next:
snapshots = await paginator.next_items()
print(snapshots)
next_items
async def next_items() -> List[SnapshotInfo]
Returns the next page of snapshots.
Call this method only if has_next is True, otherwise it will raise an exception.
Returns:
List of snapshots