Train, Tune, and Serve an RL model
The task at hand: a mysterious corridor
Our agent can start on either (1) or (6) and must find its way to the middle, (4).
A seemingly simple task, but (3) appears twice, once on each side of (4). From (3) the agent can't tell which side of the goal it's on, so any deterministic policy will fail to reach the goal state 50% of the time. Our RL agent must learn a stochastic (probabilistic) policy.
We'll train, tune, and deploy this RL model to an HTTPS endpoint using Ray on Anyscale.
info: Just want to see the code? Check out this GitHub repo.
Prerequisites
- Have an Anyscale account and have set up an AWS or GCP cloud
- Have your credentials in `~/.anyscale/credentials.json`
- AWS or GCP credentials locally for uploading files to cloud storage
Overview: steps involved
- Get the code and run it!
- Structure of the project
- Code walkthrough
  - Define our `gym` environment in `env.py`
  - Define our RLlib PyTorch model in `model.py`
  - Define our training function with Ray Tune in `train.py`
  - Deploying to Ray Serve in `deploy.py`
  - Training & deploying on an Anyscale cluster with `jobs.py`
- Where to go from here
Let's start!
Get the code and run it
Because launching the clusters and training the model takes around 20 minutes for this example, it's a good idea to kick off these jobs before diving into the code. Let's do that now.
# download the code
git clone https://github.com/anyscale/rl-end-to-end-tutorial.git ./tutorial
# install necessary Python packages
pip install -r tutorial/requirements.txt
Great, you've got the code.
Our final step is to upload the code and run the `jobs.py` Anyscale SDK script provided in the repo. Depending on your cloud provider type, you'll want to run this script with slightly different arguments.
Some tips for finding these input arguments:
- `--cloud-name`: find one through the Anyscale UI or with the CLI: `anyscale cloud list`
- `--gcp-proj-id`: only for clouds on GCP; find it in the GCP console under Settings > Project Details
- `--checkpoints-dir`: either an S3 or GCS directory, including the scheme prefix (i.e. `s3://` or `gs://`)
- `--code-path`: where to upload the code to. If you're running the code without modifications, this is simply a zipped archive of the repo. Any time you make a code change and run again, you'll need to re-zip the repo folder and re-upload.
- `--aws-role-arn`: for AWS-specific clouds only. Follow the Quick & easy method for adding an S3 bucket guide, copy the ARN, and use it here. If you already have an IAM role or are a more advanced user, see Bring your own AWS IAM role or Accessing your private S3 bucket; you can craft your own fine-grained IAM role and use that ARN here.
Then all that's left is to run it!
- Deploy in Anyscale Managed Cloud (AWS)
- Deploy in your GCP account
- Deploy in your AWS account
#########################
# FILL IN THESE DETAILS #
#########################
export S3_BUCKET=s3://<your bucket here>
export CLOUD_NAME=<your Anyscale AWS cloud name>
# Assemble paths and copy up working directory to S3
zip -r job_working_dir.zip ./tutorial
aws s3 cp ./job_working_dir.zip $S3_BUCKET/my_code/code.zip
cd ./tutorial
# finally, run the Anyscale SDK script!
python jobs.py \
--cloud-name $CLOUD_NAME \
--checkpoints-dir $S3_BUCKET/my_checkpoints/ \
--code-path $S3_BUCKET/my_code/code.zip
# Install gcloud: https://cloud.google.com/sdk/docs/install
# then get GCP credentials locally
gcloud auth login
# next, fill in the variables in this shell script, and run it
cd tutorial
sh setup_gcp_bucket.sh
#########################
# FILL IN THESE DETAILS #
#########################
export GCP_BUCKET=gs://<your-bucket>
export GCP_PROJ_ID=<your-GCP-project-id>
export CLOUD_NAME=<your Anyscale GCP cloud name>
# Assemble paths and copy up working directory to GCS
cd ..
zip -r job_working_dir.zip ./tutorial
gsutil cp ./job_working_dir.zip $GCP_BUCKET/my_code/code.zip
cd ./tutorial
# finally, run the Anyscale SDK script!
python jobs.py \
--cloud-name $CLOUD_NAME \
--checkpoints-dir $GCP_BUCKET/my_checkpoints/ \
--code-path $GCP_BUCKET/my_code/code.zip \
--gcp-proj-id $GCP_PROJ_ID
#########################
# FILL IN THESE DETAILS #
#########################
export S3_BUCKET=s3://<your bucket here>
export CLOUD_NAME=<your Anyscale AWS cloud name>
export AWS_ARN_ROLE_STRING=<your AWS ARN role from `--aws-role-arn` guidance above>
# Assemble paths and copy up working directory to S3
cd ..
zip -r job_working_dir.zip ./tutorial
aws s3 cp ./job_working_dir.zip $S3_BUCKET/my_code/code.zip
cd ./tutorial
# finally, run the Anyscale SDK script!
python jobs.py \
--cloud-name $CLOUD_NAME \
--checkpoints-dir $S3_BUCKET/my_checkpoints/ \
--code-path $S3_BUCKET/my_code/code.zip \
--aws-role-arn $AWS_ARN_ROLE_STRING
You should see something like the following:
(anyscale +3.6s) Loaded Anyscale authentication token from ~/.anyscale/credentials.json
Building cluster env ... rl-cuj-cluster-env-20211203-001228
Find your new build here: https://console.anyscale.com/configurations/?state=CreatedByMe&tab=cluster-env
Booting up these two clusters, training the RL model, and deploying to a service should take around 15-20 minutes. While we're waiting, let's understand how all the pieces fit together.
Structure of our project repo
% tree
.
├── README.md
├── deploy.py
├── env.py
├── jobs.py
├── model.py
├── requirements.txt
├── setup_conda_env.sh
├── setup_gcp_bucket.sh
└── train.py
We have a few important files:
- `model.py`: contains our RLlib PyTorch model definition
- `env.py`: holds our agent's RL gym environment setup code
- `train.py`: runs our Ray RLlib training loop and tunes hyperparameters with Ray Tune, saving checkpoints to cloud storage
- `deploy.py`: reads our best model checkpoints from cloud storage and deploys them to an HTTPS endpoint on Anyscale
- `jobs.py`: glues together all of our code and runs it using the Anyscale SDK
- `requirements.txt`: holds the Python dependencies to run
In particular, `jobs.py` handles all the packaging necessary to deploy our two units of work (`train.py` and `deploy.py`) as Anyscale Jobs.
Anyscale Jobs handles all of the boilerplate around launching clusters, applying dependencies, retrying in case of failures, and tearing down the cluster after the job finishes. Read more here if you're curious.
Let's start walking through the code!
Step 1: Defining our `gym` environment in `env.py`
In RLlib, we need to define an environment for our ML model that determines which actions and states are legal and how rewards are allocated.
import gym
from gym.spaces import Discrete, Box
import numpy as np
import random

from ray.rllib.env.env_context import EnvContext


class MysteriousCorridor(gym.Env):
    """Example of a custom env in which you walk down a mysterious corridor.

    You can configure the reward of the destination state via the env config.

    A mysterious corridor has 7 cells and looks like this:
    -------------------------------------------
    | 1 | 2 | 3 | 4 | 3 | 5 | 6 |
    -------------------------------------------
    You always start from state 1 (left most) or 6 (right most).
    The goal is to get to the destination state 4 (in the middle).
    There are only 2 actions, 0 means go left, 1 means go right.
    """

    def __init__(self, config: EnvContext):
        self.seed(random.randint(0, 1000))

        self.action_space = Discrete(2)
        self.observation_space = Box(0.0, 6.0, shape=(1, ), dtype=np.float32)
        self.reward = config["reward"]

        self.reset()

    def reset(self):
        # cur_pos is the actual position of the player, not the state a
        # player sees from outside of the environment.
        # E.g., when cur_pos is 2 or 4, the returned state is 3.
        # Start from either end of the corridor: position 0 or 6.
        self.cur_pos = random.choice([0, 6])
        return [self._pos_to_state(self.cur_pos)]

    def _pos_to_state(self, pos):
        ptos = [1, 2, 3, 4, 3, 5, 6]
        return ptos[pos]

    def step(self, action):
        assert action in [0, 1], action

        if action == 0:
            self.cur_pos = max(0, self.cur_pos - 1)
        if action == 1:
            self.cur_pos = min(6, self.cur_pos + 1)

        # Position 3 is the middle of the corridor, i.e. state 4, the goal.
        done = (self.cur_pos == 3)
        reward = self.reward if done else -0.1

        return [self._pos_to_state(self.cur_pos)], reward, done, {}

    def seed(self, seed=None):
        random.seed(seed)

    def render(self):
        def symbol(i):
            if i == self.cur_pos:
                return "o"
            elif i == 3:
                return "x"
            elif i == 2 or i == 4:
                return "_"
            else:
                return " "
        return "| " + " | ".join([symbol(i) for i in range(7)]) + " |"
To learn more about defining your own custom envs in RLlib, see the guide or this code example.
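Before moving on, it can help to sanity-check the environment by hand. Here's a minimal sketch (not part of the repo) that rolls out a random policy locally, assuming `env.py` is on your Python path:

# Quick local sanity check: roll out a random policy in the corridor.
from env import MysteriousCorridor

corridor = MysteriousCorridor({"reward": 10.0})
obs = corridor.reset()
done = False
total_reward = 0.0
while not done:
    # Sample a random action: 0 = go left, 1 = go right.
    obs, reward, done, _ = corridor.step(corridor.action_space.sample())
    total_reward += reward
    print(corridor.render())
print("Total reward:", total_reward)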
Step 2: Define our RLlib PyTorch model
RLlib requires that your model subclass TorchModelV2 and implement a few functions.
If this step seems a little arcane, check out the RLlib documentation on defining custom deep learning models.
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.torch.fcnet import FullyConnectedNetwork as TorchFC
from ray.rllib.utils.framework import try_import_torch

torch, nn = try_import_torch()


class TorchCustomModel(TorchModelV2, nn.Module):
    """Example of a PyTorch custom model that just delegates to a fc-net."""

    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name):
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs,
                              model_config, name)
        nn.Module.__init__(self)

        self.torch_sub_model = TorchFC(obs_space, action_space, num_outputs,
                                       model_config, name)

    def forward(self, input_dict, state, seq_lens):
        input_dict["obs"] = input_dict["obs"].float()
        fc_out, _ = self.torch_sub_model(input_dict, state, seq_lens)
        return fc_out, []

    def value_function(self):
        return torch.reshape(self.torch_sub_model.value_function(), [-1])
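Note that `train.py` (coming up next) passes this model class directly in the trainer config. Alternatively, RLlib lets you register a custom model under a string name via `ModelCatalog` and reference it by name; a brief sketch (the name `corridor_model` is just illustrative):

from ray.rllib.models import ModelCatalog

# Register the custom model under a string name...
ModelCatalog.register_custom_model("corridor_model", TorchCustomModel)

# ...then reference it by that name in the trainer config:
# "model": {"custom_model": "corridor_model", "fcnet_hiddens": [20, 20]}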
Step 3: Define our training function in `train.py`
As most RL practitioners know, RL training can be very sensitive to initial conditions. In the Ray ecosystem, it's quite easy to tame this using Ray Tune, a hyperparameter tuning framework.
Ray Tune will not only help to intelligently search the hyperparameter space, but also regularly save model snapshots ("checkpoints") and Tensorboard results into durable cloud storage such as S3 or GCS. These can be used to restart runs or survive a run failure.
A few notes on the code below:
- `TRAINER_CFG` is an RLlib configuration dict
- `CUJ-RL` determines the path in the cloud storage bucket location (i.e. an S3/GCS bucket) that will be used to store model checkpoints as well as Tensorboard results, metrics, and much more
- `tune.run()` can be used to optimize any model/function given a loss criterion (API reference)
Here's the code:
import argparse
from datetime import datetime
import os
import subprocess
import time
from typing import Any, Dict, Tuple

import ray
from ray import tune
from ray.rllib.agents.ppo import ppo

import env
import model

TRAINER_CFG = {
    "env": env.MysteriousCorridor,
    "env_config": {
        "reward": 10.0,
    },
    # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
    "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
    "model": {
        "custom_model": model.TorchCustomModel,
        "fcnet_hiddens": [20, 20],
        "vf_share_layers": True,
    },
    "num_workers": 1,  # parallelism
    "framework": "torch",
    "rollout_fragment_length": 10,
    "lr": 0.01,
}

RUN_PREFIX = "CUJ-RL"


def train(ts: str, mode: str, upload_dir: str) -> str:
    print("Training & tuning automatically with Ray Tune...")

    local = mode == "local"
    run_name = f"{RUN_PREFIX}-{ts}"
    results = tune.run(
        ppo.PPOTrainer,
        name=run_name,
        config=TRAINER_CFG,
        checkpoint_freq=5,
        checkpoint_at_end=True,
        sync_config=None if local else tune.SyncConfig(upload_dir=upload_dir),
        stop={"training_iteration": 10},
        num_samples=1 if local else 10,
        metric="episode_reward_mean",
        mode="max")

    print("Best checkpoint: ")
    print(results.best_checkpoint)

    if upload_dir:
        tmp_file = "/tmp/best_checkpoint.txt"
        with open(tmp_file, "w") as f:
            f.write(results.best_checkpoint)

        best_checkpoint_file = os.path.join(
            upload_dir, run_name, "best_checkpoint.txt")
        print("Saving best checkpoint in: ", best_checkpoint_file)

        if upload_dir.startswith("gs://"):
            subprocess.run(["gsutil", "cp", tmp_file, best_checkpoint_file],
                           check=True)
        elif upload_dir.startswith("s3://"):
            subprocess.run(["aws", "s3", "cp", tmp_file, best_checkpoint_file],
                           check=True)
        else:
            raise ValueError("Unknown upload dir type: ", upload_dir)

    return results.best_checkpoint
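Further down, `train.py` wires this function up to a command line interface. The exact flags live in the repo; here's a rough, illustrative sketch of what that entrypoint looks like, continuing the file above:

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # "local" runs on your machine; other modes target a cluster or an Anyscale Job.
    parser.add_argument("--mode", default="local")
    parser.add_argument("--checkpoints-dir", required=True)
    parser.add_argument("--address", default=None)
    args = parser.parse_args()

    if args.address:
        ray.init(address=args.address)  # connect to an existing cluster
    else:
        ray.init()  # start Ray locally

    ts = datetime.now().strftime("%Y%m%d-%H%M")
    train(ts, args.mode, args.checkpoints_dir)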
With this, we have a training loop that runs equally well on our laptop as on the cloud. You'll notice further down in the file that you can run `train.py` in three different modes using the `--mode` flag:
- Run on laptop
- Run on an Anyscale cluster
- Run as an Anyscale Job in production
# (re)start your local Ray cluster head node
ray stop && ray start --head
# run on your local machine, good for quick iteration
python train.py --mode local --checkpoints-dir=<s3://...|gs://...>
########################################################
# then, wait until the train script finishes running...
# check the checkpoints directory
ls <path printed out from train.py stdout>
# run on an OSS Ray cluster
python train.py --mode client --address 145.34.44.233:1234 --checkpoints-dir s3://<checkpoints-dir>
# run on an Anyscale cluster
python train.py --mode client --address anyscale://my_dev_cluster --checkpoints-dir s3://<checkpoints-dir>
# check where the checkpoint results landed
aws s3 ls s3://<checkpoints-dir>/...
gsutil ls gs://<checkpoints-dir>/...
# this will handle starting the cluster, retrying failures, and tearing down the cluster afterwards
python train.py --mode job --checkpoints-dir s3://<checkpoints-dir>
# check where the checkpoint results landed
aws s3 ls s3://<checkpoints-dir>/...
One of the primary goals of Ray has always been to allow fast iteration and a clear path to production. You should be able to run the same code on your laptop as you do in the cloud!
Now that we've covered training a model, let's move on to actually deploying it!
Step 4: Deploying to Ray Serve with `deploy.py`
As any ML practitioner knows, a model isn't useful unless we have a way to deploy it.
Ray Serve is a model-agnostic, scalable, GPU-ready, low latency (<2 ms request overhead) serving framework that runs on Ray, and can run in an HA (highly available) way on Anyscale.
Here we'll show an easy way to deploy our trained policy to a Ray Serve HTTPS endpoint, and do so in a way where we can easily move to production with Anyscale later.
We'll demonstrate how to encapsulate state on your endpoint with a request counter as well.
A few notes:
- `@serve.deployment(name="corridor", num_replicas=2)` is a decorator that transforms this Python class into an endpoint with two replicas. See more on Serve Deployments here.
- We can specify that our endpoint uses GPUs or other custom resources with `num_gpus=4`. See an example here.
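Depending on your Ray version, per-replica resources are typically requested through the deployment's `ray_actor_options`; a hedged sketch (check the Serve docs for your release):

from ray import serve

# Each of the two replicas would be scheduled onto its own GPU.
@serve.deployment(name="corridor_gpu", num_replicas=2,
                  ray_actor_options={"num_gpus": 1})
class CorridorOnGPU:
    ...

With that noted, here's the deployment code: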
import os
import argparse
from datetime import datetime
import json
import logging
import requests
import subprocess
import time
from urllib import parse

import ray
from ray.rllib.agents import ppo
from ray.rllib.env.env_context import EnvContext
from ray import serve

import env
import train

RUN_PREFIX = "CUJ-RL"


def sync_chkpt(ckpt_path, bucket):
    print("Starting checkpoint sync...")
    if not bucket.endswith("/"):
        bucket += "/"
    ts = datetime.now().strftime('%Y%m%d-%H%M')

    # Tune returns a local checkpoint path at this point.
    # TODO: we won't need to do this hacky conversion once we
    # get the cloud checkpoint path directly.
    ckpt_name = os.path.basename(ckpt_path)
    storage_path = os.path.dirname(
        bucket + os.path.relpath(ckpt_path, "/home/ray/ray_results"))

    model_path = os.path.join(os.getcwd(), f"ray_model_{ts}")

    if bucket.startswith("gs"):
        sync = ["gsutil", "rsync"]
    elif bucket.startswith("s3"):
        sync = ["aws", "s3", "sync"]
    else:
        raise ValueError("Unknown bucket type: " + bucket)

    subprocess.run(["mkdir", "-p", model_path], check=True)
    subprocess.run(sync + [storage_path, model_path], check=True)
    print("Synced", storage_path, "to", model_path)

    return os.path.join(model_path, ckpt_name)


@serve.deployment(name="corridor", num_replicas=2)
class Corridor(object):
    def __init__(self, ckpt_path, local, bucket):
        try:
            print("Deployment initializing.", locals())

            config = ppo.DEFAULT_CONFIG.copy()
            config.update(train.TRAINER_CFG)
            config['num_workers'] = 0

            if local:
                model_path = ckpt_path
            else:
                assert bucket, "Bucket must be provided if not local."
                model_path = sync_chkpt(ckpt_path, bucket)

            agent = ppo.PPOTrainer(config=config, env=env.MysteriousCorridor)
            agent.restore(model_path)
            print("Agent restored")

            self._policy = agent.workers.local_worker().get_policy()
            self._count = 0
        except Exception as e:
            print(e)
            import traceback
            traceback.print_exc()

    def __action(self, state):
        action = self._policy.compute_single_action(state)
        # NumPy int64 is not JSON serializable. Convert to a native int.
        return int(action[0])

    async def __call__(self, request):
        self._count += 1

        body = await request.body()
        try:
            data = json.loads(body.decode("utf-8"))
        except ValueError as e:
            # Can't parse body as json data.
            return "can't decode: " + body.decode("utf-8")

        try:
            return {
                "count": self._count,
                "action": self.__action(data["state"]),
            }
        except Exception as e:
            return str(e)


def deploy(ckpt_path: str, local: bool, bucket: str = None):
    serve.start(detached=True)
    Corridor.deploy(ckpt_path, local=local, bucket=bucket)

    print("Corridor service deployed!")
    print(f"You can query the model at: {Corridor.url}")

    return Corridor.url
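Once deployed, the endpoint accepts a JSON body with a `state` key (see `__call__` above). A minimal sketch of querying it, assuming a local Serve instance on the default port (an Anyscale deployment gives you an HTTPS URL instead):

import requests

# "state" is the observation: the corridor state the agent currently sees.
resp = requests.post("http://127.0.0.1:8000/corridor", json={"state": [3.0]})
print(resp.json())  # e.g. {"count": 1, "action": 1}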
Once again, `deploy.py` has been lovingly crafted with `--mode` flags to show you how to deploy your Ray Serve endpoint in local, cluster, and job modes:
- Deploy on laptop
- Deploy on an Anyscale cluster
- Deploy as an Anyscale Job in production
python deploy.py --ckpt-path /home/ray/ray_results/CUJ-RL-xxxx/path/to/checkpoint/here
python deploy.py --mode client ...
python deploy.py --mode job --checkpoints-dir s3://your/path/here \
    --ckpt-path /home/ray/ray_results/CUJ-RL-xxxx/path/to/checkpoint/here
We've seen all the pieces. Let's glue it all together using the Anyscale SDK.
Step 5: Training & deploying on Anyscale with `jobs.py`
Let's break it down. `jobs.py` does the following:
- (not pictured) Uses the SDK to create compute configs and cluster environments for both the training and the deploy task.
- Kicks off `train.py` with the Anyscale API using the SDK. This:
  - Starts a Ray cluster in the Data Plane (either the customer's account or Anyscale's, if using our Managed Cloud)
  - Runs the command specified
  - Retries if needed
  - Terminates the cluster
- Does the same, but for `deploy.py`:
  - Starts a Ray cluster in the Data Plane (either the customer's account or Anyscale's, if using our Managed Cloud)
  - Runs the command specified (long-lived service)
  - Tests the HTTPS endpoint deployed by querying it
We've wisely chosen to use cloud storage (i.e. S3 or GCS) as our state between runs, which allows our model deployment to refer to the checkpoints generated by the previously run training job, as sketched below. Read more about model checkpointing and tuning with Ray Tune here.
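Concretely, `train.py` uploaded the best checkpoint path to `best_checkpoint.txt` in the run's upload directory, so a later job can look it up. A hedged sketch of that lookup (the repo's actual plumbing lives in `jobs.py` and `deploy.py`):

import subprocess

def read_best_checkpoint(upload_dir: str, run_name: str) -> str:
    """Fetch the best-checkpoint pointer that train.py uploaded."""
    remote = f"{upload_dir.rstrip('/')}/{run_name}/best_checkpoint.txt"
    local = "/tmp/best_checkpoint.txt"
    # Pick the right CLI for the bucket's scheme (gs:// vs s3://).
    cp = ["gsutil", "cp"] if remote.startswith("gs://") else ["aws", "s3", "cp"]
    subprocess.run(cp + [remote, local], check=True)
    with open(local) as f:
        return f.read().strip()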
And in case you're wondering: the deploy job could have been kicked off by the training job itself! We'll leave that as an exercise to the reader :)
Where to go from here
Congratulations. You've trained, tuned, and deployed a scalable RL model without any complex devops or management of a Ray cluster yourself.
More Anyscale tutorials
- CI/CD
- Parallelism in Ray with time series data
- Serving a PyTorch model
- Developing Ray Serve applications
- Weights & Biases on Anyscale
- MLflow on Anyscale