AsyncSandbox

class AsyncSandbox(SandboxSetup, SandboxApi)

An E2B cloud sandbox is a secure, isolated cloud environment.

The sandbox allows you to:

  • Access Linux OS
  • Create, list, and delete files and directories
  • Run commands
  • Run isolated code
  • Access the internet

Check docs here.

Use AsyncSandbox.create() to create a new sandbox.

Example:

from e2b import AsyncSandbox

sandbox = await AsyncSandbox.create()

files

@property
def files() -> Filesystem

Module for interacting with the sandbox filesystem.

commands

@property
def commands() -> Commands

Module for running commands in the sandbox.

pty

@property
def pty() -> Pty

Module for interacting with the sandbox pseudo-terminal.

sandbox_id

@property
def sandbox_id() -> str

Unique identifier of the sandbox.

__init__

def __init__(**opts: Unpack[AsyncSandboxOpts])

Use AsyncSandbox.create() to create a new sandbox instead.

is_running

async def is_running(request_timeout: Optional[float] = None) -> bool

Check if the sandbox is running.

Arguments:

  • request_timeout: Timeout for the request in seconds

Returns:

True if the sandbox is running, False otherwise

Example:

sandbox = await AsyncSandbox.create()
await sandbox.is_running() # Returns True

await sandbox.kill()
await sandbox.is_running() # Returns False

create

@classmethod
async def create(cls,
                 template: Optional[str] = None,
                 timeout: Optional[int] = None,
                 metadata: Optional[Dict[str, str]] = None,
                 envs: Optional[Dict[str, str]] = None,
                 api_key: Optional[str] = None,
                 domain: Optional[str] = None,
                 debug: Optional[bool] = None,
                 request_timeout: Optional[float] = None)

Create a new sandbox.

By default, the sandbox is created from the default base sandbox template.

Arguments:

  • template: Sandbox template name or ID
  • timeout: Timeout for the sandbox in seconds, defaults to 300 seconds. The maximum time a sandbox can be kept alive is 24 hours (86_400 seconds) for Pro users and 1 hour (3_600 seconds) for Hobby users.
  • metadata: Custom metadata for the sandbox
  • envs: Custom environment variables for the sandbox
  • api_key: E2B API Key to use for authentication, defaults to E2B_API_KEY environment variable
  • request_timeout: Timeout for the request in seconds

Returns:

Sandbox instance for the new sandbox.

Use this method instead of the constructor to create a new sandbox.
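
Since create() and kill() come in pairs, one way to keep them together is a small async context manager. The following is only a sketch: temporary_sandbox is a hypothetical helper, not part of the SDK, and the metadata and envs values in main() are made up for illustration.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def temporary_sandbox(sandbox_cls, **create_opts):
    """Create a sandbox, yield it, and always kill it afterwards.

    `sandbox_cls` is expected to be AsyncSandbox; it is passed in so this
    hypothetical helper has no import-time dependency on the SDK.
    """
    sandbox = await sandbox_cls.create(**create_opts)
    try:
        yield sandbox
    finally:
        # kill() returns False if the sandbox is already gone; that is fine here.
        await sandbox.kill()


async def main():
    from e2b import AsyncSandbox  # requires E2B_API_KEY in the environment

    async with temporary_sandbox(
        AsyncSandbox,
        timeout=120,                      # seconds
        metadata={"purpose": "demo"},     # illustrative values
        envs={"GREETING": "hello"},
    ) as sandbox:
        print(sandbox.sandbox_id, await sandbox.is_running())
```

To try it against a real sandbox, call asyncio.run(main()).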

connect

@classmethod
async def connect(cls,
                  sandbox_id: str,
                  api_key: Optional[str] = None,
                  domain: Optional[str] = None,
                  debug: Optional[bool] = None)

Connect to an existing sandbox.

With sandbox ID you can connect to the same sandbox from different places or environments (serverless functions, etc).

Arguments:

  • sandbox_id: Sandbox ID
  • api_key: E2B API Key to use for authentication, defaults to E2B_API_KEY environment variable

Returns:

Sandbox instance for the existing sandbox.

Example:

sandbox = await AsyncSandbox.create()
sandbox_id = sandbox.sandbox_id

same_sandbox = await AsyncSandbox.connect(sandbox_id)


kill

@overload
async def kill(request_timeout: Optional[float] = None) -> bool

Kill the sandbox.

Arguments:

  • request_timeout: Timeout for the request in seconds

Returns:

True if the sandbox was killed, False if the sandbox was not found

kill

@overload
@staticmethod
async def kill(sandbox_id: str,
               api_key: Optional[str] = None,
               domain: Optional[str] = None,
               debug: Optional[bool] = None,
               request_timeout: Optional[float] = None) -> bool

Kill the sandbox specified by sandbox ID.

Arguments:

  • sandbox_id: Sandbox ID
  • api_key: E2B API Key to use for authentication, defaults to E2B_API_KEY environment variable
  • request_timeout: Timeout for the request in seconds

Returns:

True if the sandbox was killed, False if the sandbox was not found

set_timeout

@overload
async def set_timeout(timeout: int,
                      request_timeout: Optional[float] = None) -> None

Set the timeout of the sandbox.

After the timeout expires the sandbox will be automatically killed. This method can extend or reduce the sandbox timeout set when creating the sandbox or from the last call to .set_timeout.

Maximum time a sandbox can be kept alive is 24 hours (86_400 seconds) for Pro users and 1 hour (3_600 seconds) for Hobby users.

Arguments:

  • timeout: Timeout for the sandbox in seconds
  • request_timeout: Timeout for the request in seconds
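
Because the maximum lifetime depends on the account tier, it can help to clamp a requested value before calling set_timeout. This is a minimal sketch: the tier keys "pro" and "hobby" and both helper functions are assumptions made for illustration, and only the two limits come from the text above.

```python
# Limits quoted in this reference, in seconds. The tier keys are hypothetical.
MAX_TIMEOUT_SECONDS = {"pro": 86_400, "hobby": 3_600}


def clamp_timeout(requested: int, tier: str) -> int:
    """Clamp a requested sandbox timeout to the tier's documented maximum."""
    if requested <= 0:
        raise ValueError("timeout must be a positive number of seconds")
    return min(requested, MAX_TIMEOUT_SECONDS[tier])


async def extend_timeout(sandbox, requested: int, tier: str = "hobby") -> int:
    """Apply a clamped timeout to `sandbox` and return the value actually used."""
    timeout = clamp_timeout(requested, tier)
    await sandbox.set_timeout(timeout)
    return timeout
```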

set_timeout

@overload
@staticmethod
async def set_timeout(sandbox_id: str,
                      timeout: int,
                      api_key: Optional[str] = None,
                      domain: Optional[str] = None,
                      debug: Optional[bool] = None,
                      request_timeout: Optional[float] = None) -> None

Set the timeout of the specified sandbox.

After the timeout expires the sandbox will be automatically killed. This method can extend or reduce the sandbox timeout set when creating the sandbox or from the last call to .set_timeout.

Maximum time a sandbox can be kept alive is 24 hours (86_400 seconds) for Pro users and 1 hour (3_600 seconds) for Hobby users.

Arguments:

  • sandbox_id: Sandbox ID
  • timeout: Timeout for the sandbox in seconds
  • request_timeout: Timeout for the request in seconds

Filesystem

class Filesystem()

Module for interacting with the filesystem in the sandbox.

read

@overload
async def read(path: str,
               format: Literal["text"] = "text",
               user: Username = "user",
               request_timeout: Optional[float] = None) -> str

Read file content as a str.

Arguments:

  • path: Path to the file
  • user: Run the operation as this user
  • format: Format of the file content—text by default
  • request_timeout: Timeout for the request in seconds

Returns:

File content as a str

read

@overload
async def read(path: str,
               format: Literal["bytes"],
               user: Username = "user",
               request_timeout: Optional[float] = None) -> bytearray

Read file content as a bytearray.

Arguments:

  • path: Path to the file
  • user: Run the operation as this user
  • format: Format of the file content—bytes
  • request_timeout: Timeout for the request in seconds

Returns:

File content as a bytearray

read

@overload
async def read(
        path: str,
        format: Literal["stream"],
        user: Username = "user",
        request_timeout: Optional[float] = None) -> AsyncIterator[bytes]

Read file content as an AsyncIterator[bytes].

Arguments:

  • path: Path to the file
  • user: Run the operation as this user
  • format: Format of the file content—stream
  • request_timeout: Timeout for the request in seconds

Returns:

File content as an AsyncIterator[bytes]
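
The stream format is useful when a file should be processed chunk by chunk instead of being loaded whole. As a sketch, the hypothetical helper below hashes a file this way; it assumes the awaited read() call returns the iterator, as the signature above suggests.

```python
import hashlib


async def sha256_of_file(files, path: str) -> str:
    """Hash a sandbox file without holding its full content in memory.

    `files` is expected to be `sandbox.files`.
    """
    digest = hashlib.sha256()
    stream = await files.read(path, format="stream")
    async for chunk in stream:
        digest.update(chunk)
    return digest.hexdigest()
```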

write

async def write(path: str,
                data: Union[str, bytes, IO],
                user: Username = "user",
                request_timeout: Optional[float] = None) -> EntryInfo

Write content to a file on the path.

Writing to a file that doesn't exist creates the file.

Writing to a file that already exists overwrites the file.

Writing to a file whose parent directories don't exist creates the necessary directories.

Arguments:

  • path: Path to the file
  • data: Data to write to the file, can be a str, bytes, or IO.
  • user: Run the operation as this user
  • request_timeout: Timeout for the request in seconds

Returns:

Information about the written file
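
A short sketch of how write and read fit together, including several writes issued concurrently with asyncio.gather. Both helper names are hypothetical, and `files` stands for `sandbox.files`.

```python
import asyncio


async def write_then_read(files, path: str, text: str) -> str:
    """Write `text` to `path` and read it back as a str."""
    await files.write(path, text)   # creates missing directories, overwrites existing files
    return await files.read(path)   # format defaults to "text"


async def write_many(files, contents: dict) -> list:
    """Write several files concurrently; results come back in input order."""
    return await asyncio.gather(
        *(files.write(path, data) for path, data in contents.items())
    )
```

With a live sandbox this could be called as `await write_then_read(sandbox.files, "/home/user/notes/hello.txt", "hello")`, where the path is only an example.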

list

async def list(path: str,
               user: Username = "user",
               request_timeout: Optional[float] = None) -> List[EntryInfo]

List entries in a directory.

Arguments:

  • path: Path to the directory
  • user: Run the operation as this user
  • request_timeout: Timeout for the request in seconds

Returns:

List of entries in the directory

exists

async def exists(path: str,
                 user: Username = "user",
                 request_timeout: Optional[float] = None) -> bool

Check if a file or a directory exists.

Arguments:

  • path: Path to a file or a directory
  • user: Run the operation as this user
  • request_timeout: Timeout for the request in seconds

Returns:

True if the file or directory exists, False otherwise

remove

async def remove(path: str,
                 user: Username = "user",
                 request_timeout: Optional[float] = None) -> None

Remove a file or a directory.

Arguments:

  • path: Path to a file or a directory
  • user: Run the operation as this user
  • request_timeout: Timeout for the request in seconds

rename

async def rename(old_path: str,
                 new_path: str,
                 user: Username = "user",
                 request_timeout: Optional[float] = None) -> EntryInfo

Rename a file or directory.

Arguments:

  • old_path: Path to the file or directory to rename
  • new_path: New path to the file or directory
  • user: Run the operation as this user
  • request_timeout: Timeout for the request in seconds

Returns:

Information about the renamed file or directory

make_dir

async def make_dir(path: str,
                   user: Username = "user",
                   request_timeout: Optional[float] = None) -> bool

Create a new directory at the specified path, along with any missing parent directories.

Arguments:

  • path: Path to a new directory. For example '/dirA/dirB' when creating 'dirB'.
  • user: Run the operation as this user
  • request_timeout: Timeout for the request in seconds

Returns:

True if the directory was created, False if the directory already exists
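
The directory methods compose naturally. The sketch below, with a hypothetical helper name, uses exists, remove, make_dir, and list to reset a working directory to an empty state.

```python
async def recreate_dir(files, path: str):
    """Remove `path` if it exists, create it fresh, and return (created, entries).

    `files` is expected to be `sandbox.files`.
    """
    if await files.exists(path):
        await files.remove(path)
    created = await files.make_dir(path)   # True, since the directory was just removed
    entries = await files.list(path)       # expected to be empty at this point
    return created, entries
```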

watch_dir

async def watch_dir(path: str,
                    on_event: OutputHandler[FilesystemEvent],
                    on_exit: Optional[OutputHandler[Exception]] = None,
                    user: Username = "user",
                    request_timeout: Optional[float] = None,
                    timeout: Optional[float] = 60) -> AsyncWatchHandle

Watch directory for filesystem events.

Arguments:

  • path: Path to a directory to watch
  • on_event: Callback to call on each event in the directory
  • on_exit: Callback to call when the watching ends
  • user: Run the operation as this user
  • request_timeout: Timeout for the request in seconds
  • timeout: Timeout for the watch operation in seconds. Using 0 will not limit the watch time

Returns:

AsyncWatchHandle object for stopping watching directory
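
As an illustration of the watch lifecycle, the sketch below collects events while some action runs and always stops the watcher afterwards. The helper and its `trigger` and `settle_seconds` parameters are hypothetical, and it assumes on_event accepts a plain synchronous callable.

```python
import asyncio


async def collect_events(files, path: str, trigger, settle_seconds: float = 1.0) -> list:
    """Watch `path` while `trigger()` runs and return the events that arrived.

    `trigger` is an async callable that performs the filesystem changes.
    """
    events = []
    handle = await files.watch_dir(path, on_event=events.append)
    try:
        await trigger()
        await asyncio.sleep(settle_seconds)  # give pending events time to arrive
    finally:
        await handle.stop()
    return events
```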

AsyncWatchHandle

class AsyncWatchHandle()

Handle for watching a directory in the sandbox filesystem.

Use .stop() to stop watching the directory.

stop

async def stop()

Stop watching the directory.

Commands

class Commands()

Module for executing commands in the sandbox.

list

async def list(request_timeout: Optional[float] = None) -> List[ProcessInfo]

Lists all running commands and PTY sessions.

Arguments:

  • request_timeout: Timeout for the request in seconds

Returns:

List of running commands and PTY sessions

kill

async def kill(pid: int, request_timeout: Optional[float] = None) -> bool

Kill a running command specified by its process ID.

It uses the SIGKILL signal to kill the command.

Arguments:

  • pid: Process ID of the command. You can get the list of processes using sandbox.commands.list()
  • request_timeout: Timeout for the request in seconds

Returns:

True if the command was killed, False if the command was not found

send_stdin

async def send_stdin(pid: int,
                     data: str,
                     request_timeout: Optional[float] = None) -> None

Send data to command stdin.

Arguments:

  • pid: Process ID of the command. You can get the list of processes using sandbox.commands.list()
  • data: Data to send to the command
  • request_timeout: Timeout for the request in seconds

run

@overload
async def run(cmd: str,
              background: Union[Literal[False], None] = None,
              envs: Optional[Dict[str, str]] = None,
              user: Username = "user",
              cwd: Optional[str] = None,
              on_stdout: Optional[OutputHandler[Stdout]] = None,
              on_stderr: Optional[OutputHandler[Stderr]] = None,
              timeout: Optional[float] = 60,
              request_timeout: Optional[float] = None) -> CommandResult

Start a new command and wait until it finishes executing.

Arguments:

  • cmd: Command to execute
  • background: False if the command should be executed in the foreground, True if the command should be executed in the background
  • envs: Environment variables used for the command
  • user: User to run the command as
  • cwd: Working directory to run the command
  • on_stdout: Callback for command stdout output
  • on_stderr: Callback for command stderr output
  • timeout: Timeout for the command connection in seconds. Using 0 will not limit the command connection time
  • request_timeout: Timeout for the request in seconds

Returns:

CommandResult result of the command execution
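
A sketch of a foreground run that also captures output as it streams in. The helper name and the 30-second timeout are illustrative choices, and the callbacks are assumed to accept plain synchronous callables.

```python
async def run_and_capture(commands, cmd: str, cwd=None, envs=None):
    """Run `cmd` in the foreground and return (result, stdout_chunks, stderr_chunks).

    `commands` is expected to be `sandbox.commands`.
    """
    stdout_chunks, stderr_chunks = [], []
    result = await commands.run(
        cmd,
        envs=envs,
        cwd=cwd,
        on_stdout=stdout_chunks.append,  # called for each piece of stdout
        on_stderr=stderr_chunks.append,  # called for each piece of stderr
        timeout=30,                      # command connection timeout in seconds
    )
    return result, stdout_chunks, stderr_chunks
```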

run

@overload
async def run(cmd: str,
              background: Literal[True],
              envs: Optional[Dict[str, str]] = None,
              user: Username = "user",
              cwd: Optional[str] = None,
              on_stdout: Optional[OutputHandler[Stdout]] = None,
              on_stderr: Optional[OutputHandler[Stderr]] = None,
              timeout: Optional[float] = 60,
              request_timeout: Optional[float] = None) -> AsyncCommandHandle

Start a new command and return a handle to interact with it.

Arguments:

  • cmd: Command to execute
  • background: False if the command should be executed in the foreground, True if the command should be executed in the background
  • envs: Environment variables used for the command
  • user: User to run the command as
  • cwd: Working directory to run the command
  • on_stdout: Callback for command stdout output
  • on_stderr: Callback for command stderr output
  • timeout: Timeout for the command connection in seconds. Using 0 will not limit the command connection time
  • request_timeout: Timeout for the request in seconds

Returns:

AsyncCommandHandle handle to interact with the running command
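
A background handle makes it possible to enforce a client-side deadline. The following sketch starts a command with background=True, waits for it with asyncio.wait_for, and kills it if the deadline passes. The helper name is hypothetical; timeout=0 is used so that the connection itself is not time-limited.

```python
import asyncio


async def run_with_deadline(commands, cmd: str, deadline_seconds: float):
    """Run `cmd` in the background; return its result, or None if it was killed.

    `commands` is expected to be `sandbox.commands`.
    """
    handle = await commands.run(cmd, background=True, timeout=0)
    try:
        return await asyncio.wait_for(handle.wait(), timeout=deadline_seconds)
    except asyncio.TimeoutError:
        # Deadline passed: stop the command with SIGKILL via the handle.
        await handle.kill()
        return None
```

Note that wait() may raise CommandExitException for a non-zero exit code; this sketch lets that propagate to the caller.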

connect

async def connect(
        pid: int,
        timeout: Optional[float] = 60,
        request_timeout: Optional[float] = None,
        on_stdout: Optional[OutputHandler[Stdout]] = None,
        on_stderr: Optional[OutputHandler[Stderr]] = None
) -> AsyncCommandHandle

Connects to a running command.

You can use AsyncCommandHandle.wait() to wait for the command to finish and get execution results.

Arguments:

  • pid: Process ID of the command to connect to. You can get the list of processes using sandbox.commands.list()
  • request_timeout: Request timeout in seconds
  • timeout: Timeout for the command connection in seconds. Using 0 will not limit the command connection time
  • on_stdout: Callback for command stdout output
  • on_stderr: Callback for command stderr output

Returns:

AsyncCommandHandle handle to interact with the running command
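
To illustrate reconnecting by process ID, the sketch below starts a background command, disconnects from it, and then attaches again with connect. The helper name is hypothetical.

```python
async def detach_and_reattach(commands, cmd: str):
    """Start `cmd`, disconnect, reconnect by PID, and wait for the result.

    `commands` is expected to be `sandbox.commands`.
    """
    handle = await commands.run(cmd, background=True)
    pid = handle.pid
    await handle.disconnect()  # the command keeps running in the sandbox

    output = []
    new_handle = await commands.connect(pid, on_stdout=output.append)
    result = await new_handle.wait()
    return pid, result, output
```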

Pty

class Pty()

Module for interacting with PTYs (pseudo-terminals) in the sandbox.

kill

async def kill(pid: int, request_timeout: Optional[float] = None) -> bool

Kill PTY.

Arguments:

  • pid: Process ID of the PTY
  • request_timeout: Timeout for the request in seconds

Returns:

True if the PTY was killed, False if the PTY was not found

send_stdin

async def send_stdin(pid: int,
                     data: bytes,
                     request_timeout: Optional[float] = None) -> None

Send input to a PTY.

Arguments:

  • pid: Process ID of the PTY
  • data: Input data to send
  • request_timeout: Timeout for the request in seconds

create

async def create(
        size: PtySize,
        on_data: OutputHandler[PtyOutput],
        user: Username = "user",
        cwd: Optional[str] = None,
        envs: Optional[Dict[str, str]] = None,
        timeout: Optional[float] = 60,
        request_timeout: Optional[float] = None) -> AsyncCommandHandle

Start a new PTY (pseudo-terminal).

Arguments:

  • size: Size of the PTY
  • on_data: Callback to handle PTY data
  • user: User to use for the PTY
  • cwd: Working directory for the PTY
  • envs: Environment variables for the PTY
  • timeout: Timeout for the PTY in seconds
  • request_timeout: Timeout for the request in seconds

Returns:

Handle to interact with the PTY
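
A rough sketch of a PTY round trip using create, send_stdin, and kill. The helper is hypothetical; it takes an already constructed PtySize as `size`, and it assumes that on_data receives bytes and accepts a plain synchronous callable.

```python
import asyncio


async def run_in_pty(pty, size, command: bytes, settle_seconds: float = 1.0) -> bytes:
    """Send one command line to a fresh PTY and return the output collected so far.

    `pty` is expected to be `sandbox.pty`.
    """
    chunks = []
    handle = await pty.create(size, on_data=chunks.append)
    try:
        await pty.send_stdin(handle.pid, command + b"\n")
        await asyncio.sleep(settle_seconds)  # give the shell time to respond
    finally:
        await pty.kill(handle.pid)
    return b"".join(chunks)
```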

resize

async def resize(pid: int,
                 size: PtySize,
                 request_timeout: Optional[float] = None)

Resize PTY.

Call this when the terminal window is resized and the number of columns and rows has changed.

Arguments:

  • pid: Process ID of the PTY
  • size: New size of the PTY
  • request_timeout: Timeout for the request in seconds

AsyncCommandHandle

class AsyncCommandHandle()

Command execution handle.

It provides methods for waiting for the command to finish, retrieving stdout/stderr, and killing the command.

pid

@property
def pid()

Command process ID.

stdout

@property
def stdout()

Command stdout output.

stderr

@property
def stderr()

Command stderr output.

error

@property
def error()

Command execution error message.

exit_code

@property
def exit_code()

Command execution exit code.

0 if the command finished successfully.

It is None if the command is still running.

disconnect

async def disconnect() -> None

Disconnects from the command.

The command is not killed, but the SDK stops receiving events from the command. You can reconnect to the command using the sandbox.commands.connect method.

wait

async def wait() -> CommandResult

Wait for the command to finish and return the result.

If the command exits with a non-zero exit code, it throws a CommandExitException.

Returns:

CommandResult result of command execution

kill

async def kill() -> bool

Kills the command.

It uses the SIGKILL signal to kill the command.

Returns:

True if the command was killed successfully, False if the command was not found

SandboxApi

class SandboxApi(SandboxApiBase)

list

@classmethod
async def list(cls,
               api_key: Optional[str] = None,
               domain: Optional[str] = None,
               debug: Optional[bool] = None,
               request_timeout: Optional[float] = None) -> List[SandboxInfo]

List all running sandboxes.

Arguments:

  • api_key: API key to use for authentication, defaults to E2B_API_KEY environment variable
  • request_timeout: Timeout for the request in seconds

Returns:

List of running sandboxes
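
Combining list with the static kill overload allows cleanup of leftover sandboxes. The sketch below is hypothetical: it assumes each SandboxInfo exposes `sandbox_id` and `metadata` attributes, which this reference does not spell out.

```python
async def kill_by_metadata(sandbox_cls, key: str, value: str) -> list:
    """Kill every running sandbox whose metadata has `key` equal to `value`.

    `sandbox_cls` is expected to be AsyncSandbox. Returns the killed sandbox IDs.
    """
    killed = []
    for info in await sandbox_cls.list():
        metadata = info.metadata or {}  # assumed attribute; may be missing or None
        if metadata.get(key) == value:
            if await sandbox_cls.kill(info.sandbox_id):
                killed.append(info.sandbox_id)
    return killed
```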