Frequently Asked Questions

How do I leverage Ray AIR fault tolerance with Anyscale Jobs?

You can configure fault tolerance for training and tuning workloads by using Ray Tune with Anyscale Jobs.

To enable this fault tolerance, Tune experiment state needs to be saved to a persistent storage location. We show two ways of configuring storage for Tune:

Network Filesystem (NFS)

Anyscale Jobs provides a few mounted network filesystems out of the box. Using one of these provided NFS mounts is the recommended way to use Tune with Anyscale Jobs. See this section on storage options for the full list.

For the following example, we'll use /mnt/user_storage as the location where all nodes save their experiment outputs (such as model checkpoints and reported results). Using this storage location persists experiment data across cluster restarts when the Anyscale Job retries after a failure.

tune_nfs_example.py
import argparse
import os

import anyscale
from ray import tune
from ray.air import CheckpointConfig, RunConfig
from ray.rllib.algorithms.ppo import PPO

parser = argparse.ArgumentParser()
parser.add_argument(
    "--experiment-name",
    type=str,
    default="anyscale_job_tune_nfs_example",
    help=(
        "Tune experiment name. "
        "Experiment outputs will be saved to: "
        "`/mnt/user_storage/ray_results/<experiment_name>`. "
        "Be sure to specify a unique experiment name for each new job. "
        "Otherwise, this script will try to restore from an existing experiment."
    ),
)
args = parser.parse_args()

local_dir = "/mnt/user_storage/ray_results"
experiment_dir = os.path.join(local_dir, args.experiment_name)

if os.path.exists(experiment_dir):
    # Anyscale Job retry:
    # The experiment directory exists, and we want to resume from where we left off.
    tuner = tune.Tuner.restore(experiment_dir, resume_errored=True)
else:
    # Initial launch of the Anyscale Job:
    # We start a new Tune experiment from scratch.
    tuner = tune.Tuner(
        PPO,
        param_space={
            "env": "Humanoid-v2",
            "num_sgd_iter": tune.choice([10, 20, 30]),
        },
        tune_config=tune.TuneConfig(
            num_samples=2, metric="episode_reward_mean", mode="max"
        ),
        run_config=RunConfig(
            name=args.experiment_name,
            local_dir=local_dir,
            # Disable Tune's default syncing, since the NFS
            # will already be synchronized for all nodes.
            sync_config=tune.SyncConfig(syncer=None),
            stop={"training_iteration": 100},
            checkpoint_config=CheckpointConfig(num_to_keep=1),
        ),
    )

result_grid = tuner.fit()
best_result = result_grid.get_best_result()

# Output some information about the best result
anyscale.job.output(
    {
        "best_result_log_dir": str(best_result.log_dir),
        "best_result_checkpoint_uri": best_result.checkpoint.uri,
        "best_result_config": best_result.config,
        "best_result_metrics": best_result.metrics,
    }
)

Launch this script from a workspace with: anyscale job submit -- python tune_nfs_example.py --experiment-name="<some-unique-name>"

For more details, see the Ray docs on configuring Tune experiments with NFS.

When your Anyscale Job retries, it automatically resumes each trial from its latest checkpoint. When your Anyscale Job completes, you can view information about the best result in the Job logs in the Anyscale UI.

Cloud Storage (Amazon S3, Google Cloud Storage)

Another option is to use cloud storage accessible by all nodes in the cluster.

  1. Make sure you can access an S3 bucket (S3 guide) or GCS bucket (GCS guide) from your cluster (see the quick access check sketched after this list).
  2. Set the upload_dir configuration in the Tune SyncConfig to point to your bucket. Use an s3:// or gs:// URL.
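
Before launching, you may want to confirm that the bucket is actually reachable from the cluster. Below is a minimal sketch of such a check, assuming pyarrow (which ships with Ray) can reach the bucket and reusing the placeholder bucket name s3://tune-bucket from the example that follows; adjust the URI to your own bucket.

check_bucket_access.py
import pyarrow.fs as pafs

# Resolve a filesystem handle and path from the bucket URI.
# "s3://tune-bucket" is the placeholder bucket used in tune_cloud_example.py below.
fs, path = pafs.FileSystem.from_uri("s3://tune-bucket")

# Listing the bucket root raises an error if credentials or permissions are missing.
print(fs.get_file_info(pafs.FileSelector(path, recursive=False)))
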
tune_cloud_example.py
import argparse
import os

import anyscale
from ray import tune
from ray.air import CheckpointConfig, RunConfig
from ray.air._internal.remote_storage import list_at_uri
from ray.rllib.algorithms.ppo import PPO

parser = argparse.ArgumentParser()
parser.add_argument(
    "--experiment-name",
    type=str,
    default="anyscale_job_tune_cloud_example",
    help=(
        "Tune experiment name. "
        "Experiment outputs will be saved to: "
        "`s3://tune-bucket/<experiment_name>`. "
        "Be sure to specify a unique experiment name for each new job. "
        "Otherwise, this script will try to restore from an existing experiment."
    ),
)
args = parser.parse_args()

upload_dir = "s3://tune-bucket"
experiment_dir = os.path.join(upload_dir, args.experiment_name)

if args.experiment_name in list_at_uri(upload_dir):
    # Anyscale Job retry:
    # The experiment directory exists, and we want to resume from where we left off.
    tuner = tune.Tuner.restore(experiment_dir, resume_errored=True)
else:
    # Initial launch of the Anyscale Job:
    # We start a new Tune experiment from scratch.
    tuner = tune.Tuner(
        PPO,
        param_space={
            "env": "Humanoid-v2",
            "num_sgd_iter": tune.choice([10, 20, 30]),
        },
        tune_config=tune.TuneConfig(
            num_samples=2, metric="episode_reward_mean", mode="max"
        ),
        run_config=RunConfig(
            name=args.experiment_name,
            sync_config=tune.SyncConfig(upload_dir=upload_dir),
            stop={"training_iteration": 100},
            checkpoint_config=CheckpointConfig(num_to_keep=1),
        ),
    )

result_grid = tuner.fit()
best_result = result_grid.get_best_result()

# Output some information about the best result
anyscale.job.output(
    {
        "best_result_log_dir": str(best_result.log_dir),
        "best_result_checkpoint_uri": best_result.checkpoint.uri,
        "best_result_config": best_result.config,
        "best_result_metrics": best_result.metrics,
    }
)

For more details, see the Ray docs on configuring Tune experiments with cloud storage.

How do I make Anyscale Jobs wait for a cluster to obtain min_nodes before executing my workload?

In some scenarios, you may want your job to wait for a minimum number of nodes to spin up before launching your workload. If those nodes are not ready after a certain period of time, the cluster or job is terminated.

We are working on building this in as a supported feature on the platform. Until then, here is a simple script that you can use to enable this behavior.

Instructions

1. Download wait-for-nodes.py script

Download the wait-for-nodes.py script to your working directory. This script takes two arguments:

  1. The first argument is the maximum number of seconds to wait before terminating the cluster.
  2. The second argument is optional and specifies the minimum number of nodes to wait for before proceeding. If it is omitted, the script computes the minimum from the min_nodes requested for each node type, plus 1 for the head node. A minimal sketch of this script appears after this list.
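
The downloadable script is not reproduced here, but a minimal sketch of the same idea is shown below. Assumptions in this sketch: it runs as part of the job entrypoint on a cluster where Ray is already running, the fallback minimum of 2 nodes is only a placeholder, and it signals a timeout by exiting with a non-zero status rather than terminating the cluster directly.

wait_for_nodes_sketch.py
import sys
import time

import ray

# Usage: python wait_for_nodes_sketch.py <max_seconds> [<min_nodes>]
max_seconds = int(sys.argv[1])
# If no minimum is given, the real script derives it from the cluster config
# (the min_nodes requested for each node type, plus 1 for the head node).
# Here we fall back to a placeholder of 2 (head node + 1 worker).
min_nodes = int(sys.argv[2]) if len(sys.argv) > 2 else 2

# Connect to the already-running cluster.
ray.init(address="auto")

deadline = time.time() + max_seconds
while time.time() < deadline:
    alive = [node for node in ray.nodes() if node["Alive"]]
    if len(alive) >= min_nodes:
        print(f"{len(alive)} nodes are up; proceeding.")
        sys.exit(0)
    time.sleep(10)

print(f"Timed out waiting for {min_nodes} nodes after {max_seconds} seconds.")
sys.exit(1)

Because the sketch exits with a non-zero status on timeout, chaining it with && in the job entrypoint (as in step 2 below) prevents the application from starting when the minimum node count is not reached.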

2. Update entry point to use this script.

Update the entry point for your Anyscale Job.

job.yaml
entrypoint: python wait-for-nodes.py 300 && python my-app.py
working_dir: .
upload_path: s3://my-upload-bucket