Frequently Asked Questions

How do I leverage Ray AIR fault tolerance with Anyscale Jobs?

You can configure fault tolerance for training and tuning workloads by using Ray Tune with Anyscale Jobs.

To enable this fault tolerance, Tune experiment state needs to be saved to a persistent storage location. Below are two ways of configuring storage for Tune:

Network Filesystem (NFS)

Anyscale Jobs provides a few mounted network filesystems out of the box. Using one of these provided NFS mounts is the recommended way to use Tune with Anyscale Jobs. See this section on storage options for the full list.

For the following example, we'll use /mnt/user_storage as the location where all nodes save their experiment outputs (such as model checkpoints and reported results). Because this storage persists across cluster restarts, the experiment data survives when the Anyscale Job retries on a failure.
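As a quick, optional illustration that every node sees this mount, you can run a trivial Ray task from the cluster (for example, from a workspace). This is a minimal sketch and not part of the example below; the filename is illustrative:

check_shared_mount.py
import os

import ray

ray.init(address="auto")

@ray.remote
def can_see_shared_storage() -> bool:
    # Runs on whichever node Ray schedules it; /mnt/user_storage is mounted cluster-wide.
    return os.path.isdir("/mnt/user_storage")

# Each task should return True regardless of which node it lands on.
print(ray.get([can_see_shared_storage.remote() for _ in range(4)]))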

tune_nfs_example.py
import argparse
import os

import anyscale
from ray import tune
from ray.air import CheckpointConfig, RunConfig
from ray.rllib.algorithms.ppo import PPO

parser = argparse.ArgumentParser()
parser.add_argument(
    "--experiment-name",
    type=str,
    default="anyscale_job_tune_nfs_example",
    help=(
        "Tune experiment name. "
        "Experiment outputs will be saved to: "
        "`/mnt/user_storage/ray_results/<experiment_name>`. "
        "Be sure to specify a unique experiment name for each new job. "
        "Otherwise, this script will try to restore from an existing experiment."
    ),
)
args = parser.parse_args()

local_dir = "/mnt/user_storage/ray_results"
experiment_dir = os.path.join(local_dir, args.experiment_name)

if os.path.exists(experiment_dir):
    # Anyscale Job retry
    # The experiment directory exists, and we want to resume from where we left off
    tuner = tune.Tuner.restore(experiment_dir, resume_errored=True)
else:
    # Initial launch of the Anyscale Job
    # We start a new Tune experiment from scratch
    tuner = tune.Tuner(
        PPO,
        param_space={
            "env": "Humanoid-v2",
            "num_sgd_iter": tune.choice([10, 20, 30]),
        },
        tune_config=tune.TuneConfig(
            num_samples=2, metric="episode_reward_mean", mode="max"
        ),
        run_config=RunConfig(
            name=args.experiment_name,
            local_dir=local_dir,
            # Disable Tune's default syncing, since the NFS
            # will already be synchronized for all nodes.
            sync_config=tune.SyncConfig(syncer=None),
            stop={"training_iteration": 100},
            checkpoint_config=CheckpointConfig(num_to_keep=1),
        ),
    )

result_grid = tuner.fit()
best_result = result_grid.get_best_result()

# Output some information about the best result
anyscale.job.output(
    {
        "best_result_log_dir": str(best_result.log_dir),
        "best_result_checkpoint_uri": best_result.checkpoint.uri,
        "best_result_config": best_result.config,
        "best_result_metrics": best_result.metrics,
    }
)

Launch this script from a workspace with: anyscale job submit -- python tune_nfs_example.py --experiment-name="<some-unique-name>"

For more details, see the Ray docs on configuring Tune experiments with NFS.

When your Anyscale Job retries, it automatically resumes each trial from its latest checkpoint. When your Anyscale Job completes, you can view information about the best result in the job logs in the Anyscale UI.
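After the job completes, you can also re-attach to the finished experiment from any cluster that mounts the same NFS path (for example, a workspace) and inspect the results programmatically. Below is a minimal sketch, assuming the default experiment name from the example above; the filename is illustrative:

inspect_nfs_results.py
from ray import tune

# Same directory the Anyscale Job wrote to on the shared NFS mount.
experiment_dir = "/mnt/user_storage/ray_results/anyscale_job_tune_nfs_example"

# Re-attach to the experiment and load its results.
tuner = tune.Tuner.restore(experiment_dir)
result_grid = tuner.get_results()

best_result = result_grid.get_best_result(metric="episode_reward_mean", mode="max")
print(best_result.metrics)
print(best_result.checkpoint)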

Cloud Storage (Amazon S3, Google Cloud Storage)

Another option is to use cloud storage accessible by all nodes in the cluster.

  1. Make sure you can access an S3 bucket (S3 guide) or GCS bucket (GCS guide) from your cluster. A quick way to verify access is sketched after this list.
  2. Set the upload_dir configuration in the Tune SyncConfig to point to your bucket. Use an s3:// or gs:// URI.
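Before launching the job, you can quickly confirm that the bucket is reachable from the cluster. This is a minimal sketch using the same list_at_uri helper that the example below imports; the bucket name tune-bucket is a placeholder:

check_bucket_access.py
from ray.air._internal.remote_storage import list_at_uri

# Placeholder bucket; replace with your own s3:// or gs:// URI.
upload_dir = "s3://tune-bucket"

# If credentials or network access are missing, this call should fail here
# rather than partway through training. Otherwise it prints any existing
# experiment directories in the bucket.
print(list_at_uri(upload_dir))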
tune_cloud_example.py
import argparse
import os

import anyscale
from ray import tune
from ray.air import CheckpointConfig, RunConfig
from ray.air._internal.remote_storage import list_at_uri
from ray.rllib.algorithms.ppo import PPO

parser = argparse.ArgumentParser()
parser.add_argument(
    "--experiment-name",
    type=str,
    default="anyscale_job_tune_cloud_example",
    help=(
        "Tune experiment name. "
        "Experiment outputs will be saved to: "
        "`s3://tune-bucket/<experiment_name>`. "
        "Be sure to specify a unique experiment name for each new job. "
        "Otherwise, this script will try to restore from an existing experiment."
    ),
)
args = parser.parse_args()

upload_dir = "s3://tune-bucket"
experiment_dir = os.path.join(upload_dir, args.experiment_name)

if args.experiment_name in list_at_uri(upload_dir):
    # Anyscale Job retry
    # The experiment directory exists, and we want to resume from where we left off
    tuner = tune.Tuner.restore(experiment_dir, resume_errored=True)
else:
    # Initial launch of the Anyscale Job
    # We start a new Tune experiment from scratch
    tuner = tune.Tuner(
        PPO,
        param_space={
            "env": "Humanoid-v2",
            "num_sgd_iter": tune.choice([10, 20, 30]),
        },
        tune_config=tune.TuneConfig(
            num_samples=2, metric="episode_reward_mean", mode="max"
        ),
        run_config=RunConfig(
            name=args.experiment_name,
            sync_config=tune.SyncConfig(upload_dir=upload_dir),
            stop={"training_iteration": 100},
            checkpoint_config=CheckpointConfig(num_to_keep=1),
        ),
    )

result_grid = tuner.fit()
best_result = result_grid.get_best_result()

# Output some information about the best result
anyscale.job.output(
    {
        "best_result_log_dir": str(best_result.log_dir),
        "best_result_checkpoint_uri": best_result.checkpoint.uri,
        "best_result_config": best_result.config,
        "best_result_metrics": best_result.metrics,
    }
)

For more details, see the Ray docs on configuring Tune experiments with cloud storage.
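Because checkpoints are uploaded to the bucket, you can also download the best checkpoint to another machine after the job finishes. Below is a minimal sketch, assuming the best_result_checkpoint_uri value reported by anyscale.job.output above and that ray.air.Checkpoint.from_uri is available in your Ray version; the URI shown is a placeholder:

download_best_checkpoint.py
from ray.air import Checkpoint

# Placeholder; use the `best_result_checkpoint_uri` value from the job output.
checkpoint_uri = "s3://tune-bucket/anyscale_job_tune_cloud_example/<trial_dir>/checkpoint_000100"

# Download the checkpoint contents to a local directory for inspection or serving.
local_path = Checkpoint.from_uri(checkpoint_uri).to_directory("/tmp/best_checkpoint")
print(f"Checkpoint downloaded to: {local_path}")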

How do I make Anyscale Jobs wait for a cluster to obtain min_nodes before executing my workload?

In some scenarios, you may want your job to wait for a minimum number of nodes to spin up before launching your workload. If those nodes are not ready within a certain period of time, the cluster is terminated and the job fails.

We are working on building this in as a supported feature on the platform. Until then, here is a simple script that you can use to enable this behavior.

Instructions

1. Download the wait-for-nodes.py script

Add the wait-for-nodes.py script to your working directory:

import argparse
import os
import time

import ray
from anyscale import AnyscaleSDK

# Connect to the Ray cluster this script is running on.
ray.init(address="auto")

parser = argparse.ArgumentParser()
parser.add_argument("max_time_s", type=int, help="Wait for this number of seconds")
parser.add_argument(
    "num_nodes",
    type=int,
    nargs="?",
    help="Wait for this number of nodes (includes head)",
    default=-1,
)

args = parser.parse_args()

curr_nodes = 0
start = time.time()
next_feedback = start
max_time = start + args.max_time_s

cluster_id = os.environ.get("ANYSCALE_CLUSTER_ID") or os.environ.get("ANYSCALE_SESSION_ID")

sdk = None

if args.num_nodes == -1:
    if not cluster_id:
        raise RuntimeError(
            "Could not find cluster id and num_nodes argument not provided. "
            "Cannot determine number of nodes needed to wait for. Aborting now..."
        )
    print("num_nodes argument not provided. Will try to detect min nodes requested in compute config.")
    sdk = AnyscaleSDK()
    cluster = sdk.get_cluster(cluster_id).result
    compute_config = sdk.get_cluster_compute(cluster.cluster_compute_id).result.config
    # Calculate number of nodes to wait for: 1 for head node + sum of all worker nodes
    worker_node_types = compute_config.worker_node_types or []
    num_nodes = 1 + sum(worker_node.min_workers or 0 for worker_node in worker_node_types)
    print(f"Detected a total of {num_nodes} minimum nodes.")
else:
    num_nodes = args.num_nodes

if not cluster_id:
    print(
        "Warning: Could not find cluster id... "
        "This script will fail but will not attempt to terminate the cluster for you. "
    )

# Poll until the requested number of nodes is alive or the timeout expires.
while not curr_nodes >= num_nodes:
    now = time.time()

    if now >= max_time:
        print(
            f"Maximum wait time reached, but only "
            f"{curr_nodes}/{num_nodes} nodes came up. Aborting."
        )
        if not sdk:
            sdk = AnyscaleSDK()
        if cluster_id:
            sdk.terminate_cluster(cluster_id, {})
            time.sleep(5)
        else:
            raise RuntimeError("Could not find cluster id... Terminating app to let autosuspend kick in.")

    if now >= next_feedback:
        passed = now - start
        print(
            f"Waiting for more nodes to come up: "
            f"{curr_nodes}/{num_nodes} "
            f"({passed:.0f} seconds passed)"
        )
        next_feedback = now + 10

    time.sleep(5)
    curr_nodes = sum(1 for node in ray.nodes() if node["Alive"])

passed = time.time() - start
print(
    f"Cluster is up: {curr_nodes}/{num_nodes} nodes online after "
    f"{passed:.0f} seconds"
)
  1. The first argument is the maximum number of seconds to wait before terminating the cluster.
  2. The second argument is optional and is the minimum number of nodes to wait for before proceeding. If it is not provided, the script calculates the minimum number of nodes as the min_nodes requested for each worker node type plus 1 for the head node. For example, python wait-for-nodes.py 300 4 waits up to 300 seconds for 4 nodes (the head node plus 3 workers).

2. Update the entry point to use this script

Update the entry point for your Anyscale Job so that the wait script runs before your application. Because the two commands are chained with &&, your application only starts if wait-for-nodes.py succeeds; if the timeout is reached and the script terminates the cluster (or raises), my-app.py never runs.

job.yaml
entrypoint: python wait-for-nodes.py 300 && python my-app.py
working_dir: .
upload_path: s3://my-upload-bucket