Job SDK reference
Customer-hosted cloud features
Some features are only available on customer-hosted clouds. Reach out to support@anyscale.com for info.
Job SDK
anyscale.job.submit
Submit a job.
Returns the id of the submitted job.
Arguments
config(JobConfig): The config options defining the job.
Returns: str
Examples
- Python
import anyscale
from anyscale.job.models import JobConfig, ConnectionConfig, ConnectionType

# Basic job submission
anyscale.job.submit(
    JobConfig(
        name="my-job",
        entrypoint="python main.py",
        working_dir=".",
    ),
)

# Job with Databricks connection for credential injection
anyscale.job.submit(
    JobConfig(
        name="my-job-with-databricks",
        entrypoint="python main.py",
        working_dir=".",
        connections=[
            ConnectionConfig(
                type=ConnectionType.DATABRICKS,
                name="my-databricks-connection",
            )
        ],
    ),
)
anyscale.job.status
Get the status of a job.
Arguments
name (str | None) = None: Name of the job.
id (str | None) = None: Unique ID of the job.
cloud (str | None) = None: The Anyscale Cloud to run this workload on. If not provided, the organization default will be used (or, if running in a workspace, the cloud of the workspace).
project (str | None) = None: Named project to use for the job. If not provided, the default project for the cloud will be used (or, if running in a workspace, the project of the workspace).
include_archived (bool) = False: Include archived jobs when searching by name. Ignored when using id.
Returns: JobStatus
Examples
- Python
import anyscale
from anyscale.job.models import JobStatus
status: JobStatus = anyscale.job.status(name="my-job")
anyscale.job.terminate
Terminate a job.
This command is asynchronous, so it always returns immediately.
Returns the id of the terminated job.
Arguments
name (str | None) = None: Name of the job.
id (str | None) = None: Unique ID of the job.
cloud (str | None) = None: The Anyscale Cloud to run this workload on. If not provided, the organization default will be used (or, if running in a workspace, the cloud of the workspace).
project (str | None) = None: Named project to use for the job. If not provided, the default project for the cloud will be used (or, if running in a workspace, the project of the workspace).
include_archived (bool) = False: Include archived jobs when searching by name. Ignored when using id.
Returns: str
Examples
- Python
import anyscale
anyscale.job.terminate(name="my-job")
anyscale.job.archive
Archive a job.
This command is asynchronous, so it always returns immediately.
Returns the id of the archived job.
Arguments
name (str | None) = None: Name of the job.
id (str | None) = None: Unique ID of the job.
cloud (str | None) = None: The Anyscale Cloud to run this workload on. If not provided, the organization default will be used (or, if running in a workspace, the cloud of the workspace).
project (str | None) = None: Named project to use for the job. If not provided, the default project for the cloud will be used (or, if running in a workspace, the project of the workspace).
include_archived (bool) = False: Include archived jobs when searching by name. Ignored when using id.
Returns: str
Examples
- Python
import anyscale
anyscale.job.archive(name="my-job")
anyscale.job.delete
Delete a job and all associated job runs.
The job must be in a terminal state (SUCCEEDED, FAILED). This action permanently removes the job and cannot be undone.
Returns the id of the deleted job.
Arguments
name (str | None) = None: Name of the job.
id (str | None) = None: Unique ID of the job.
cloud (str | None) = None: The Anyscale Cloud to run this workload on. If not provided, the organization default will be used (or, if running in a workspace, the cloud of the workspace).
project (str | None) = None: Named project to use for the job. If not provided, the default project for the cloud will be used (or, if running in a workspace, the project of the workspace).
include_archived (bool) = False: Include archived jobs when searching by name. Ignored when using id.
Returns: str
Examples
- Python
import anyscale
anyscale.job.delete(name="my-job")
anyscale.job.get_logs
Get the logs for a job.
Arguments
id (str | None) = None: Unique ID of the job.
name (str | None) = None: Name of the job.
cloud (str | None) = None: The Anyscale Cloud to run this workload on. If not provided, the organization default will be used (or, if running in a workspace, the cloud of the workspace).
project (str | None) = None: Named project to use for the job. If not provided, the default project for the cloud will be used (or, if running in a workspace, the project of the workspace).
run (str | None) = None: The name of the run to query. Names can be found in the JobStatus. If not provided, the last job run will be used.
mode (str | JobLogMode) = TAIL: The mode of log fetching to be used. Supported modes can be found in JobLogMode. If not provided, JobLogMode.TAIL will be used.
max_lines (int | None) = None: The number of log lines to be fetched. If not provided, the complete log will be fetched.
include_archived (bool) = False: Include archived jobs when searching by name. Ignored when using id.
Returns: str
Examples
- Python
import anyscale
anyscale.job.get_logs(name="my-job", run="job-run-name")
anyscale.job.wait
Wait for a job to enter a specific state.
Arguments
name (str | None) = None: Name of the job.
id (str | None) = None: Unique ID of the job.
cloud (str | None) = None: The Anyscale Cloud to run this workload on. If not provided, the organization default will be used (or, if running in a workspace, the cloud of the workspace).
project (str | None) = None: Named project to use for the job. If not provided, the default project for the cloud will be used (or, if running in a workspace, the project of the workspace).
state (JobState | str) = SUCCEEDED: Target state of the job.
timeout_s (float) = 1800: Number of seconds to wait before timing out. This timeout does not affect job execution.
follow (bool) = False: Whether to follow the logs of the job. If True, the logs will be streamed to the console.
include_archived (bool) = False: Include archived jobs when searching by name. Ignored when using id.
Examples
- Python
import anyscale
anyscale.job.wait(name="my-job", timeout_s=180)
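When more control is needed than a single blocking call offers, the same idea can be sketched as a polling loop. The helper below is illustrative only and is not how anyscale.job.wait is implemented: wait_for_state, get_state, and poll_interval_s are hypothetical names, and the state source is injected as a callable so the sketch runs without the SDK.

```python
import time
from typing import Callable


def wait_for_state(
    get_state: Callable[[], str],
    target: str = "SUCCEEDED",
    timeout_s: float = 1800,
    poll_interval_s: float = 5.0,
) -> str:
    """Poll get_state() until it returns target or timeout_s elapses.

    The timeout only bounds how long this loop waits; it does nothing to
    the job being observed.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        state = get_state()
        if state == target:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"State was {state!r} after {timeout_s}s; wanted {target!r}"
            )
        time.sleep(poll_interval_s)


# Stand-in for a real status call: a scripted sequence of states.
states = iter(["STARTING", "RUNNING", "SUCCEEDED"])
result = wait_for_state(lambda: next(states), timeout_s=5, poll_interval_s=0.01)
print(result)  # SUCCEEDED
```

With the SDK, get_state could wrap the state field of anyscale.job.status(name="my-job"). This sketch keeps polling even if the job reaches a different final state, so a production loop would likely also stop early on states such as FAILED or TERMINATED.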
anyscale.job.list
List jobs with filtering and pagination.
Returns a ResultIterator that lazily fetches pages of jobs.
Arguments
name (str | None) = None: Filter by job name.
job_id (str | None) = None: Fetch a specific job by ID.
project (str | None) = None: Filter by project name.
cloud (str | None) = None: Filter by cloud name.
include_all_users (bool) = False: Include jobs from all users.
include_archived (bool) = False: Include archived jobs.
state_filter (List[JobState | str] | None) = None: Filter by job states (list of JobState or str).
tags_filter (Dict[str, List[str]] | None) = None: Filter by tags (dict of key to list of values).
page_size (int | None) = None: Number of items per page.
max_items (int | None) = None: Maximum total items to return.
sort_field (str | None) = None: Field to sort by (CREATED_AT, NAME, STATUS, etc.).
sort_order (str | None) = None: Sort order (ASC or DESC).
created_at_from (str | None) = None: Filter for jobs created at or after this time (e.g. 2026-01-01T00:00:00Z).
created_at_to (str | None) = None: Filter for jobs created at or before this time (e.g. 2026-12-31T23:59:59Z).
updated_at_from (str | None) = None: Filter for jobs updated at or after this time (e.g. 2026-01-01T00:00:00Z).
updated_at_to (str | None) = None: Filter for jobs updated at or before this time (e.g. 2026-12-31T23:59:59Z).
status_updated_at_from (str | None) = None: Filter for jobs whose status was last updated at or after this time (e.g. 2026-01-01T00:00:00Z).
status_updated_at_to (str | None) = None: Filter for jobs whose status was last updated at or before this time (e.g. 2026-12-31T23:59:59Z).
Returns: ResultIterator[JobStatus]
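To make the lazy-fetching behavior concrete, here is a toy model of how page_size and max_items might interact. It is not the SDK's ResultIterator: iterate_pages and fetch_page are hypothetical names, and offset-based paging is an assumption made only to keep the sketch short.

```python
from typing import Callable, Iterator, List, Optional


def iterate_pages(
    fetch_page: Callable[[int, int], List[str]],
    page_size: int = 2,
    max_items: Optional[int] = None,
) -> Iterator[str]:
    """Yield items page by page, fetching the next page only when needed."""
    offset = 0
    yielded = 0
    while max_items is None or yielded < max_items:
        page = fetch_page(offset, page_size)
        if not page:
            return  # no more results
        for item in page:
            if max_items is not None and yielded >= max_items:
                return  # cap reached mid-page
            yield item
            yielded += 1
        offset += len(page)


all_jobs = [f"job-{i}" for i in range(7)]
fetch_calls: List[int] = []


def fetch_page(offset: int, limit: int) -> List[str]:
    """Hypothetical backend call that records each request it receives."""
    fetch_calls.append(offset)
    return all_jobs[offset:offset + limit]


first_three = list(iterate_pages(fetch_page, page_size=2, max_items=3))
print(first_three)  # ['job-0', 'job-1', 'job-2']
print(fetch_calls)  # [0, 2]
```

Only two of the four available pages are requested, which is the practical benefit of combining a lazy iterator with max_items.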
Examples
- Python
import anyscale
from anyscale.job.models import JobStatus

# List all jobs
for job in anyscale.job.list(max_items=10):
    print(f"{job.name}: {job.state}")

# Filter by project
jobs = list(anyscale.job.list(project="my-project"))
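The time filters take timestamp strings in the form shown in the argument descriptions (e.g. 2026-01-01T00:00:00Z). A small helper can build such strings from datetime objects. The function names below are hypothetical, and the sketch assumes the documented example format is the one to produce.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict


def to_utc_timestamp(moment: datetime) -> str:
    """Format a timezone-aware datetime as e.g. 2026-01-01T00:00:00Z."""
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def last_n_days_window(now: datetime, days: int) -> Dict[str, str]:
    """Build created_at_from / created_at_to values covering the last `days` days."""
    return {
        "created_at_from": to_utc_timestamp(now - timedelta(days=days)),
        "created_at_to": to_utc_timestamp(now),
    }


window = last_n_days_window(
    datetime(2026, 1, 8, 12, 0, 0, tzinfo=timezone.utc), days=7
)
print(window)
# {'created_at_from': '2026-01-01T12:00:00Z', 'created_at_to': '2026-01-08T12:00:00Z'}
```

The resulting dictionary could then be unpacked into the call, as in anyscale.job.list(**window). Pass timezone-aware datetimes; a naive datetime would be interpreted as local time by astimezone.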
anyscale.job.add_tags
Upsert (add/update) tag key/value pairs for a job.
Arguments
job_id (str | None) = None: ID of the job. Provide either job_id or name.
name (str | None) = None: Name of the job. Provide either job_id or name.
cloud (str | None) = None: Cloud name (used when resolving by name).
project (str | None) = None: Project name (used when resolving by name).
tags (Dict[str, str]): Key/value tags to upsert as a map {key: value}.
include_archived (bool) = False: Include archived jobs when searching by name. Ignored when using job_id.
Examples
- Python
import anyscale
anyscale.job.add_tags(job_id="job_123", tags={"team": "mlops", "env": "prod"})
anyscale.job.remove_tags
Remove tags by key from a job.
Arguments
job_id (str | None) = None: ID of the job. Provide either job_id or name.
name (str | None) = None: Name of the job. Provide either job_id or name.
cloud (str | None) = None: Cloud name (used when resolving by name).
project (str | None) = None: Project name (used when resolving by name).
keys (List[str]): List of tag keys to remove.
include_archived (bool) = False: Include archived jobs when searching by name. Ignored when using job_id.
Examples
- Python
import anyscale
anyscale.job.remove_tags(job_id="job_123", keys=["team", "env"])
anyscale.job.list_tags
List tags for a job as a key/value mapping.
Arguments
job_id (str | None) = None: ID of the job. Provide either job_id or name.
name (str | None) = None: Name of the job. Provide either job_id or name.
cloud (str | None) = None: Cloud name (used when resolving by name).
project (str | None) = None: Project name (used when resolving by name).
include_archived (bool) = False: Include archived jobs when searching by name. Ignored when using job_id.
Returns: Dict[str, str]
Examples
- Python
import anyscale
tags: dict[str, str] = anyscale.job.list_tags(name="my-job")
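The tag functions above can be thought of as operations on a plain key/value mapping. The sketch below models that mental picture locally, without calling the SDK. upsert_tags and remove_tag_keys are hypothetical helpers, and treating removal of a missing key as a no-op is an assumption.

```python
from typing import Dict, List


def upsert_tags(existing: Dict[str, str], tags: Dict[str, str]) -> Dict[str, str]:
    """Add new keys and overwrite the values of keys that already exist."""
    return {**existing, **tags}


def remove_tag_keys(existing: Dict[str, str], keys: List[str]) -> Dict[str, str]:
    """Drop the given keys; keys that are absent are ignored (assumption)."""
    to_remove = set(keys)
    return {k: v for k, v in existing.items() if k not in to_remove}


current = {"team": "mlops"}
current = upsert_tags(current, {"team": "ml-platform", "env": "prod"})
print(current)  # {'team': 'ml-platform', 'env': 'prod'}

current = remove_tag_keys(current, ["env", "not-a-tag"])
print(current)  # {'team': 'ml-platform'}
```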
Job models
JobQueueSpec
Options defining a job queue.
When the first job with a given job queue spec is submitted, the job queue will be created. Subsequent jobs with the same job queue spec will reuse the queue instead of creating another one.
Jobs can also target an existing queue using the name parameter.
Fields
idle_timeout_s (int): Time in seconds that the job queue cluster is kept running while no jobs are running.
name (str | None): Name of the job queue that can be used to target it when submitting future jobs. The name of a job queue must be unique within a project.
execution_mode (JobQueueExecutionMode): Execution mode of the jobs submitted into the queue (one of: FIFO, LIFO, PRIORITY).
compute_config (str | None): The name of an existing compute config that will be used to create the job queue cluster. If not specified, the compute config of the associated job will be used.
max_concurrency (int): Max number of jobs that can run concurrently. Defaults to 1, meaning only one job can run at a given time.
auto_termination_threshold_job_count (int | None): Maximum number of jobs the cluster can run before it becomes eligible for termination.
Python Methods
def to_dict(self) -> Dict[str, Any]
"""Return a dictionary representation of the model."""
Examples
- YAML
job_queue_spec:
  # Unique name that can be used to target this queue by other jobs.
  name: my-job-queue
  execution_mode: FIFO
  # Name of a compute config that will be used to create a cluster to execute jobs in this queue.
  # Must match the compute config of the job if specified.
  compute_config: my-compute-config:1
  max_concurrency: 5
  idle_timeout_s: 3600
- Python
import anyscale
from anyscale.job.models import JobQueueSpec, JobQueueExecutionMode

job_queue_spec = JobQueueSpec(
    # Unique name that can be used to target this queue by other jobs.
    name="my-job-queue",
    execution_mode=JobQueueExecutionMode.FIFO,
    # Name of a compute config that will be used to create a cluster to execute jobs in this queue.
    # Must match the compute config of the job if specified.
    compute_config="my-compute-config:1",
    max_concurrency=5,
    idle_timeout_s=3600,
)
JobQueueConfig
Configuration options for a job related to using a job queue for scheduling and execution.
Fields
priority (int | None): Job's relative priority (only relevant for Job Queues of type PRIORITY). Valid values range from 0 (highest) to +inf (lowest). Default value is None.
target_job_queue_name (str | None): The name of an existing job queue to schedule this job in. If this is provided, job_queue_spec cannot be.
job_queue_spec (JobQueueSpec | None): Configuration options defining a job queue to be created for the job if needed. If this is provided, target_job_queue_name cannot be.
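Because target_job_queue_name and job_queue_spec are mutually exclusive, it can help to check the combination before building a config. The function below is a hypothetical pre-check, not the SDK's own validation; the error type and messages are assumptions.

```python
from typing import Any, Dict, Optional


def check_queue_options(
    target_job_queue_name: Optional[str] = None,
    job_queue_spec: Optional[Dict[str, Any]] = None,
) -> str:
    """Describe how a job would be routed, rejecting contradictory options."""
    if target_job_queue_name is not None and job_queue_spec is not None:
        raise ValueError(
            "Provide either target_job_queue_name or job_queue_spec, not both."
        )
    if target_job_queue_name is not None:
        return f"use existing queue {target_job_queue_name!r}"
    if job_queue_spec is not None:
        return "create or reuse the queue described by job_queue_spec"
    return "no job queue requested"


print(check_queue_options(target_job_queue_name="my-new-queue"))
# use existing queue 'my-new-queue'
print(check_queue_options(job_queue_spec={"name": "my-job-queue"}))
# create or reuse the queue described by job_queue_spec
```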
Python Methods
def __init__(self, **fields) -> JobQueueConfig
"""Construct a model with the provided field values set."""
def options(self, **fields) -> JobQueueConfig
"""Return a copy of the model with the provided field values overwritten."""
def to_dict(self) -> Dict[str, Any]
"""Return a dictionary representation of the model."""
Examples
- YAML
# An example configuration that creates a job queue if one does not exist with the provided options.
job_queue_config:
  # Priority of the job (only relevant if the execution_mode is "PRIORITY").
  priority: 100
  # Specification of the target job queue (created if it does not exist).
  job_queue_spec:
    name: my-job-queue
    compute_config: my-compute-config:1
    idle_timeout_s: 3600

# An example config that targets an existing job queue by name.
job_queue_config:
  # Priority of the job (only relevant if the execution_mode is "PRIORITY").
  priority: 100
  # Name of the job queue this job should be added to (specified in `JobQueueSpec.name`
  # when the queue was created).
  target_job_queue_name: my-new-queue
- Python
import anyscale
from anyscale.job.models import JobQueueConfig, JobQueueSpec

# An example configuration that creates a job queue if one does not exist with the provided options.
job_queue_config = JobQueueConfig(
    # Priority of the job (only relevant if the execution_mode is "PRIORITY").
    priority=100,
    # Specification of the target job queue (created if it does not exist).
    job_queue_spec=JobQueueSpec(
        name="my-job-queue",
        compute_config="my-compute-config:1",
        idle_timeout_s=3600,
    ),
)

# An example config that targets an existing job queue by name.
job_queue_config = JobQueueConfig(
    # Priority of the job (only relevant for priority queues).
    priority=100,
    # Name of the job queue this job should be added to (specified in `JobQueueSpec.name`
    # when the queue was created).
    target_job_queue_name="my-new-queue",
)
JobQueueExecutionMode
Execution mode for job queues.
Values
FIFO: Executes jobs in chronological order ('first in, first out').
LIFO: Executes jobs in reversed chronological order ('last in, first out').
PRIORITY: Executes jobs in the order induced by ordering their priorities in ascending order, with 0 being the highest priority.
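The three modes can be illustrated with a small ordering function. This is a toy model rather than the scheduler's implementation: QueuedJob and execution_order are hypothetical names, and breaking PRIORITY ties by submission time is an assumption.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class QueuedJob:
    """Hypothetical stand-in for a job waiting in a queue (not an SDK type)."""
    name: str
    submitted_at: int  # submission order; smaller means earlier
    priority: int = 0  # only meaningful in PRIORITY mode; 0 is highest


def execution_order(jobs: List[QueuedJob], mode: str) -> List[str]:
    """Return job names in the order a queue in the given mode would run them."""
    if mode == "FIFO":
        ordered = sorted(jobs, key=lambda j: j.submitted_at)
    elif mode == "LIFO":
        ordered = sorted(jobs, key=lambda j: j.submitted_at, reverse=True)
    elif mode == "PRIORITY":
        # Ascending priority value; ties fall back to submission time (assumption).
        ordered = sorted(jobs, key=lambda j: (j.priority, j.submitted_at))
    else:
        raise ValueError(f"Unknown execution mode: {mode}")
    return [j.name for j in ordered]


jobs = [
    QueuedJob("a", submitted_at=1, priority=100),
    QueuedJob("b", submitted_at=2, priority=0),
    QueuedJob("c", submitted_at=3, priority=50),
]
print(execution_order(jobs, "FIFO"))      # ['a', 'b', 'c']
print(execution_order(jobs, "LIFO"))      # ['c', 'b', 'a']
print(execution_order(jobs, "PRIORITY"))  # ['b', 'c', 'a']
```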
JobConfig
Configuration options for a job.
Fields
name (str | None): Name of the job. Multiple jobs can be submitted with the same name.
image_uri (str | None): URI of an existing image. Exclusive with containerfile.
containerfile (str | None): The file path to a containerfile that will be built into an image before running the workload. Exclusive with image_uri.
compute_config (ComputeConfig | MultiResourceComputeConfig | Dict | str | None): The name of an existing registered compute config or an inlined ComputeConfig or MultiResourceComputeConfig object.
working_dir (str | None): Directory that will be used as the working directory for the application. If a local directory is provided, it will be uploaded to cloud storage automatically. When running inside a workspace, this defaults to the current working directory ('.').
excludes (List[str] | None): A list of file path globs that will be excluded when uploading local files for working_dir.
requirements (str | List[str] | None): A list of pip requirements or a path to a requirements.txt file for the workload. Anyscale installs these dependencies on top of the image. If you run a workload from a workspace, the default is to use the workspace dependencies, but specifying this option overrides them.
env_vars (Dict[str, str] | None): A dictionary of environment variables that will be set for the workload.
py_modules (List[str] | None): A list of local directories or remote URIs that will be uploaded and added to the Python path.
py_executable (str | None): Specifies the executable used for running the Ray workers. It can include arguments as well.
cloud (str | None): The Anyscale Cloud to run this workload on. If not provided, the organization default will be used (or, if running in a workspace, the cloud of the workspace).
project (str | None): The project for the workload. If not provided, the default project for the cloud will be used (or, if running in a workspace, the project of the workspace).
registry_login_secret (str | None): A name or identifier of the secret containing credentials to authenticate to the docker registry hosting the image. This can only be used when 'image_uri' is specified and the image is not hosted on Anyscale.
ray_version (str | None): The Ray version (X.Y.Z) of the image specified by either an image URI or a containerfile. If you don't specify a Ray version, Anyscale defaults to the latest Ray version available at the time of the Anyscale CLI/SDK release.
connections (List[ConnectionConfig | Dict] | None): Connections to third-party integrations (e.g., Databricks) to associate with the workload. This feature is in beta preview. Contact Anyscale support to request enablement.
entrypoint (str): Command that will be run to execute the job, e.g., python main.py.
max_retries (int | None): Maximum number of times the job will be retried before being marked failed. Defaults to 1.
job_queue_config (JobQueueConfig | None): Job's configuration related to scheduling and execution using job queues.
timeout_s (int | None): The timeout in seconds for each job run. Set to None for no limit.
tags (Dict[str, str] | None): Tags to associate with the job.
Python Methods
def __init__(self, **fields) -> JobConfig
"""Construct a model with the provided field values set."""
def options(self, **fields) -> JobConfig
"""Return a copy of the model with the provided field values overwritten."""
def to_dict(self) -> Dict[str, Any]
"""Return a dictionary representation of the model."""
Examples
- YAML
name: my-job
entrypoint: python main.py
image_uri: anyscale/image/my-image:1 # (Optional) Exclusive with `containerfile`.
containerfile: /path/to/Dockerfile # (Optional) Exclusive with `image_uri`.
compute_config: my-compute-config:1 # (Optional) An inline dictionary can also be provided.
working_dir: /path/to/working_dir # (Optional) Defaults to `.`.
excludes: # (Optional) List of files to exclude from being packaged up for the job.
  - .git
  - .env
  - .DS_Store
  - __pycache__
requirements: # (Optional) List of pip requirements to install. Can also be a path to a requirements.txt.
  - emoji==1.2.0
  - numpy==1.19.5
env_vars: # (Optional) Dictionary of environment variables to set in the job.
  MY_ENV_VAR: my_value
  ANOTHER_ENV_VAR: another_value
py_modules: # (Optional) A list of local directories or remote URIs that will be added to the Python path.
  - /path/to/my_module
  - s3://my_bucket/my_module
cloud: anyscale-prod # (Optional) The name of the Anyscale Cloud.
project: my-project # (Optional) The name of the Anyscale Project.
max_retries: 3 # (Optional) Maximum number of times the job will be retried before being marked failed. Defaults to `1`.
tags:
  team: mlops
  purpose: training
connections: # (Optional) List of third-party connections for credential injection.
  - type: databricks
    name: my-databricks-connection
- Python
from anyscale.job.models import JobConfig

config = JobConfig(
    name="my-job",
    entrypoint="python main.py",
    max_retries=1,
    # An inline `ComputeConfig` or `MultiResourceComputeConfig` can also be provided.
    compute_config="my-compute-config:1",
    # A containerfile path can also be provided.
    image_uri="anyscale/image/my-image:1",
)
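The options method listed under Python Methods returns a copy with selected fields overwritten, leaving the original untouched. The sketch below models that copy-on-override behavior with a dataclass. ToyJobConfig is a hypothetical stand-in with only three of the real fields, and it makes no claim about how JobConfig is implemented.

```python
from dataclasses import asdict, dataclass, replace
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class ToyJobConfig:
    """Hypothetical stand-in for JobConfig with a few of its fields."""
    entrypoint: str
    name: Optional[str] = None
    max_retries: int = 1

    def options(self, **fields: Any) -> "ToyJobConfig":
        """Return a copy with the provided field values overwritten."""
        return replace(self, **fields)

    def to_dict(self) -> Dict[str, Any]:
        """Return a dictionary representation of the model."""
        return asdict(self)


base = ToyJobConfig(entrypoint="python main.py", name="my-job")
retrying = base.options(max_retries=3)

print(base.max_retries)    # 1 (the original is unchanged)
print(retrying.to_dict())
# {'entrypoint': 'python main.py', 'name': 'my-job', 'max_retries': 3}
```

This pattern is convenient for deriving several variants of a job, for example per environment, from one shared base config.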
ConnectionConfig
Configuration for a third-party connection.
Connections allow workloads (jobs, workspaces, etc.) to access external services like Databricks Unity Catalog. Each connection is identified by its type and name.
This feature is in beta preview. Contact Anyscale support to request enablement.
Fields
type (ConnectionType): The type of connection (e.g., DATABRICKS).
name (str): The name of the connection as registered in the organization settings.
Python Methods
def __init__(self, **fields) -> ConnectionConfig
"""Construct a model with the provided field values set."""
def options(self, **fields) -> ConnectionConfig
"""Return a copy of the model with the provided field values overwritten."""
def to_dict(self) -> Dict[str, Any]
"""Return a dictionary representation of the model."""
Examples
- YAML
connections:
  - type: databricks
    name: my-databricks-connection
- Python
from anyscale.job.models import ConnectionConfig, ConnectionType

connection = ConnectionConfig(
    type=ConnectionType.DATABRICKS,
    name="my-databricks-connection",
)
ConnectionType
Type of third-party connection.
Values
DATABRICKS: Databricks connection for Unity Catalog access
JobState
Current state of a job.
Values
STARTING: The job is being started and is not yet running.
RUNNING: The job is running. A job will have state RUNNING if a job run fails and there are remaining retries.
FAILED: The job did not finish running or the entrypoint returned an exit code other than 0 after retrying up to max_retries times.
SUCCEEDED: The job finished running and its entrypoint returned exit code 0.
TERMINATED: The job was terminated before completion.
UNKNOWN: The CLI/SDK received an unexpected state from the API server. In most cases, this means you need to update the CLI.
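The relationship between failed runs, max_retries, and the RUNNING and FAILED states can be sketched as a small function. This is an interpretation of the descriptions above, not SDK code: job_state_after_failures is a hypothetical name, and the sketch assumes a job gets one initial attempt plus up to max_retries retries.

```python
def job_state_after_failures(failed_runs: int, max_retries: int = 1) -> str:
    """Return the job-level state once `failed_runs` runs have failed.

    Assumption: total attempts = 1 initial run + max_retries retries, so the
    job is only FAILED when failures exceed max_retries.
    """
    if failed_runs < 0 or max_retries < 0:
        raise ValueError("failed_runs and max_retries must be non-negative")
    if failed_runs <= max_retries:
        return "RUNNING"  # retries remain (or nothing has failed yet)
    return "FAILED"


print(job_state_after_failures(failed_runs=1, max_retries=1))  # RUNNING
print(job_state_after_failures(failed_runs=2, max_retries=1))  # FAILED
print(job_state_after_failures(failed_runs=1, max_retries=0))  # FAILED
```

STARTING, SUCCEEDED, TERMINATED, and UNKNOWN are deliberately left out of this model, which covers the retry path only.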
JobStatus
Current status of a job.
Fields
id (str): Unique ID of the job (generated when the job is first submitted).
name (str): Name of the job. Multiple jobs can be submitted with the same name.
state (str | JobState): Current state of the job.
config (JobConfig): Configuration of the job.
runs (List[JobRunStatus]): List of job run states.
creator_id (str): ID of the user who created the job.
created_at (datetime | None): Timestamp when the job was created.
updated_at (datetime | None): Timestamp when the job was last updated.
status_updated_at (datetime | None): Timestamp when the job's status was last updated.
Python Methods
def to_dict(self) -> Dict[str, Any]
"""Return a dictionary representation of the model."""
Examples
- Python
import anyscale
from anyscale.job.models import JobStatus

status: JobStatus = anyscale.job.status(name="my-job")
- CLI
$ anyscale job status -n my-job
id: prodjob_3suiybn8r7dhz92yv63jqzm473
name: my-job
state: STARTING
JobRunStatus
Current status of an individual job run.
Fields
name (str): Name of the job run.
state (str | JobRunState): Current state of the job run.
Python Methods
def to_dict(self) -> Dict[str, Any]
"""Return a dictionary representation of the model."""
Examples
- Python
from typing import List

import anyscale
from anyscale.job.models import JobRunStatus

run_statuses: List[JobRunStatus] = anyscale.job.status(name="my-job").runs
- CLI
$ anyscale job status -n my-job
id: prodjob_6ntzknwk1i9b1uw1zk1gp9dbhe
name: my-job
state: STARTING
runs:
- name: raysubmit_ynxBVGT1SmzndiXL
  state: SUCCEEDED
JobRunState
Current state of an individual job run.
Values
STARTING: The job run is being started and is not yet running.
RUNNING: The job run is running.
FAILED: The job run did not finish running or the entrypoint returned an exit code other than 0.
SUCCEEDED: The job run finished running and its entrypoint returned exit code 0.
UNKNOWN: The CLI/SDK received an unexpected state from the API server. In most cases, this means you need to update the CLI.
JobLogMode
Mode to use for getting job logs.
Values
HEAD: Fetch logs from the start of the job's log.
TAIL: Fetch logs from the end of the job's log.
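The difference between the two modes matters mainly when a line limit such as max_lines in anyscale.job.get_logs is set. The function below mimics that selection on a local string. select_log_lines is a hypothetical helper that illustrates the semantics only and does not fetch anything from Anyscale.

```python
from typing import Optional


def select_log_lines(
    log: str, mode: str = "TAIL", max_lines: Optional[int] = None
) -> str:
    """Return the first (HEAD) or last (TAIL) max_lines lines of a log."""
    lines = log.splitlines()
    if max_lines is None:
        picked = lines  # no limit: the complete log
    elif max_lines <= 0:
        picked = []
    elif mode == "HEAD":
        picked = lines[:max_lines]
    elif mode == "TAIL":
        picked = lines[-max_lines:]
    else:
        raise ValueError(f"Unknown log mode: {mode}")
    return "\n".join(picked)


log = "\n".join(f"line {i}" for i in range(1, 6))
print(select_log_lines(log, mode="HEAD", max_lines=2))
# line 1
# line 2
print(select_log_lines(log, mode="TAIL", max_lines=2))
# line 4
# line 5
```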
JobSortField
Fields available for sorting jobs.
Values
STATUS: Sort by job status.
CREATED_AT: Sort by creation timestamp.
LAST_UPDATED_AT: Sort by last update timestamp.
LAST_STATUS_UPDATED_AT: Sort by last status update timestamp.
ENDED_AT: Sort by end timestamp.
NAME: Sort by job name.
ID: Sort by job ID.
JobSortOrder
Enum for sort order directions.
Values
ASC: Sort in ascending order.
DESC: Sort in descending order.