Train, Tune, and Serve an RL model

The task at hand: a mysterious corridor

A simple-looking (yet deceptively difficult to learn) RL environment

Our agent can start on either (1) or (6) and must find its way to the middle, (4).

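The task seems simple, but observation (3) appears on both the left and the right of (4), so the agent cannot tell which side of the goal it is on. Any deterministic policy must pick a single direction at (3), so it fails to reach the goal from one of the two starting positions, that is, 50% of the time. Our RL agent must learn a stochastic (probabilistic) policy.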
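To make this concrete, here is a minimal, standard-library-only sketch (not part of the tutorial repo) that enumerates every deterministic policy over the observable states and checks whether any of them reaches the goal from both starting positions. The helper names (`run_episode`, `deterministic_policies`, `stochastic_policy`) and the 200-step cutoff are assumptions made for illustration.

```python
import itertools
import random

STATES = [1, 2, 3, 4, 3, 5, 6]  # observation seen at each of the 7 positions
GOAL_POS = 3                    # position of state 4


def run_episode(policy, start_pos, rng, max_steps=200):
    """Return True if the policy reaches the goal within max_steps."""
    pos = start_pos
    for _ in range(max_steps):
        action = policy(STATES[pos], rng)  # 0 = left, 1 = right
        pos = max(0, pos - 1) if action == 0 else min(6, pos + 1)
        if pos == GOAL_POS:
            return True
    return False


def deterministic_policies():
    """Yield every fixed mapping from observation to action (2**5 of them)."""
    observed = [1, 2, 3, 5, 6]
    for actions in itertools.product([0, 1], repeat=len(observed)):
        table = dict(zip(observed, actions))
        yield lambda obs, rng, table=table: table[obs]


def stochastic_policy(obs, rng):
    """Flip a coin at the ambiguous state 3; otherwise head toward the middle."""
    if obs == 3:
        return rng.choice([0, 1])
    return 1 if obs in (1, 2) else 0


rng = random.Random(0)
solves_both = sum(
    all(run_episode(p, start, rng) for start in (0, 6))
    for p in deterministic_policies()
)
stochastic_ok = all(run_episode(stochastic_policy, start, rng) for start in (0, 6))
print(solves_both, stochastic_ok)  # prints: 0 True
```

No deterministic policy solves both starts because the left start needs "go right" at (3) while the right start needs "go left" at (3). The coin-flip policy is only one possible stochastic solution; the trained PPO policy may learn different probabilities.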

We'll train, tune, and deploy this RL model to an HTTPS endpoint using Ray on Anyscale.

Just want to see the code? Check out this GitHub repo.

Prerequisites

  1. Have an Anyscale account and have set up an AWS or GCP cloud
  2. Have your credentials in ~/.anyscale/credentials.json
  3. Have AWS or GCP credentials available locally for uploading files to cloud storage

Overview: steps involved

  1. Get the code and run it!
  2. Structure of the project
  3. Code walkthrough
    1. Define our gym environment in env.py
    2. Define our RLlib PyTorch model in model.py
    3. Define our training function with Ray Tune in train.py
    4. Deploying to Ray Serve in deploy.py
    5. Training & deploying on Anyscale cluster with jobs.py
  4. Where to go from here

Let's start!

Get the code and run it

Because launching the clusters and training the model take around 20 minutes for this example, it's a good idea to kick off these jobs before diving into the code. Let's do that now.

# download the code
git clone https://github.com/anyscale/rl-end-to-end-tutorial.git ./tutorial

# install necessary Python packages
pip install -r tutorial/requirements.txt

Great, you've got the code.

Our final step is to upload the code and run the jobs.py Anyscale SDK script provided in the repo. Depending on your cloud provider, you'll want to run this script with slightly different arguments.

Some tips for finding these input arguments:

  • --cloud-name: Find one through the Anyscale UI or with the CLI: anyscale cloud list
  • --gcp-proj-id: only for clouds on GCP; find it in the GCP console under Settings > Project Details
  • --checkpoints-dir: either an S3 or GCS directory, including the scheme prefix (e.g., s3://)
  • --code-path: the cloud storage location of the code to run. If you're running the code without modifications, this is simply a zipped archive of the repo. Any time you want to make a code change and run again, you'll need to re-zip the repo folder and re-upload it.
  • --aws-role-arn: only for clouds on AWS; follow the Quick & easy method for adding an S3 bucket guide, copy the ARN, and use it here. If you already have an IAM role or are a more advanced user, see Bring your own AWS IAM role or Accessing your private S3 bucket. You can craft your own fine-grained IAM role and use that ARN here.

Then all that's left is to run it!

#########################
# FILL IN THESE DETAILS #
#########################
export S3_BUCKET=s3://<your bucket here>
export CLOUD_NAME=<your Anyscale AWS cloud name>

# Assemble paths and copy up working directory to S3
zip -r job_working_dir.zip ./tutorial
aws s3 cp ./job_working_dir.zip $S3_BUCKET/my_code/code.zip
cd ./tutorial

# finally, run the Anyscale SDK script!
python jobs.py \
--cloud-name $CLOUD_NAME \
--checkpoints-dir $S3_BUCKET/my_checkpoints/ \
--code-path $S3_BUCKET/my_code/code.zip

You should see something like the following:

(anyscale +3.6s) Loaded Anyscale authentication token from ~/.anyscale/credentials.json
Building cluster env ... rl-cuj-cluster-env-20211203-001228
Find your new build here: https://console.anyscale.com/configurations/?state=CreatedByMe&tab=cluster-env

Booting up these two clusters, training the RL model, and deploying to a service should take around 15-20 minutes. While we're waiting, let's understand how all the pieces fit together.

Structure of our project repo

% tree
.
├── README.md
├── deploy.py
├── env.py
├── jobs.py
├── model.py
├── requirements.txt
├── setup_conda_env.sh
├── setup_gcp_bucket.sh
└── train.py

We have a few important files:

  • model.py: contains our RLlib PyTorch model definition
  • env.py: holds our agent's RL gym environment setup code
  • train.py: runs our Ray RLlib training loop and tunes hyperparameters with Ray Tune, saving checkpoints to cloud storage
  • deploy.py: reads our best model checkpoint from cloud storage and deploys it to an HTTPS endpoint on Anyscale
  • jobs.py: glues together all of our code and runs it using the Anyscale SDK
  • requirements.txt: holds the Python dependencies needed to run the code

In particular, jobs.py handles all the packaging necessary to deploy our two units of work (train.py and deploy.py) as Anyscale Jobs.

Anyscale Jobs handles all of the boilerplate around launching clusters, applying dependencies, retrying in case of failures, and tearing down the cluster after the job finishes. Read more here if you're curious.

Let's start walking through the code!

Step 1: Defining our gym environment in env.py

RLlib needs an environment definition that specifies which actions and states are legal and how rewards are allocated.

env.py
import gym
from gym.spaces import Discrete, Box
import numpy as np
import random

from ray.rllib.env.env_context import EnvContext


class MysteriousCorridor(gym.Env):
    """Example of a custom env in which you walk down a mysterious corridor.

    You can configure the reward of the destination state via the env config.

    A mysterious corridor has 7 cells and looks like this:
    -------------------------------------------
    |  1  |  2  |  3  |  4  |  3  |  5  |  6  |
    -------------------------------------------
    You always start from state 1 (left most) or 6 (right most).
    The goal is to get to the destination state 4 (in the middle).
    There are only 2 actions, 0 means go left, 1 means go right.
    """

    def __init__(self, config: EnvContext):
        self.seed(random.randint(0, 1000))

        self.action_space = Discrete(2)
        self.observation_space = Box(0.0, 6.0, shape=(1, ), dtype=np.float32)
        self.reward = config["reward"]

        self.reset()

    def reset(self):
        # cur_pos is the actual position of the player, not the state the
        # player sees from outside of the environment.
        # E.g., when cur_pos is 4, the returned state is 3.
        # Start from either end of the corridor, position 0 or 6.
        self.cur_pos = random.choice([0, 6])
        # Return the observed state (not the raw position), as step() does.
        return [self._pos_to_state(self.cur_pos)]

    def _pos_to_state(self, pos):
        # Positions 2 and 4 both map to state 3, which makes them
        # indistinguishable to the agent.
        ptos = [1, 2, 3, 4, 3, 5, 6]
        return ptos[pos]

    def step(self, action):
        assert action in [0, 1], action

        # Move left or right, staying inside the corridor.
        if action == 0:
            self.cur_pos = max(0, self.cur_pos - 1)
        if action == 1:
            self.cur_pos = min(6, self.cur_pos + 1)

        # The episode ends at the middle cell (position 3, state 4).
        done = (self.cur_pos == 3)
        # Every non-terminal step costs a small penalty.
        reward = self.reward if done else -0.1

        return [self._pos_to_state(self.cur_pos)], reward, done, {}

    def seed(self, seed=None):
        random.seed(seed)

    def render(self):
        def symbol(i):
            if i == self.cur_pos:
                return "o"
            elif i == 3:
                return "x"
            elif i == 2 or i == 4:
                return "_"
            else:
                return " "
        return "| " + " | ".join([symbol(i) for i in range(7)]) + " |"

To learn more about defining your own custom envs in RLlib, see the guide or this code example.
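To see how observations and rewards add up over an episode, the following sketch replays a fixed action sequence using the same transition and reward rules as env.py. It is a standalone approximation that avoids importing gym or Ray; the `replay` helper and its constants are hypothetical names, not part of the repo.

```python
POS_TO_STATE = [1, 2, 3, 4, 3, 5, 6]  # same mapping as _pos_to_state
GOAL_POS = 3
STEP_PENALTY = -0.1


def replay(actions, start_pos, goal_reward=10.0):
    """Replay actions and return (observations, total_reward, done)."""
    pos = start_pos
    observations = [POS_TO_STATE[pos]]
    total = 0.0
    done = False
    for action in actions:
        # 0 moves left, 1 moves right, clamped to the corridor.
        pos = max(0, pos - 1) if action == 0 else min(6, pos + 1)
        done = pos == GOAL_POS
        total += goal_reward if done else STEP_PENALTY
        observations.append(POS_TO_STATE[pos])
        if done:
            break
    return observations, total, done


# Shortest path from the left end: right, right, right.
left_obs, left_total, left_done = replay([1, 1, 1], start_pos=0)
# Shortest path from the right end: left, left, left.
right_obs, right_total, right_done = replay([0, 0, 0], start_pos=6)
print(left_obs, right_obs)  # prints: [1, 2, 3, 4] [6, 5, 3, 4]
```

Both shortest paths pass through observation 3 just before the goal, but they need opposite actions there. With the configured reward of 10.0, each shortest path earns roughly 9.8 (two step penalties plus the goal reward).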

Step 2: Define our RLlib PyTorch model

RLlib requires that your model subclass TorchModelV2 and implement a few functions.

If this step seems a little arcane, check out the RLlib documentation on defining custom deep learning models.

model.py
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.torch.fcnet import FullyConnectedNetwork as TorchFC
from ray.rllib.utils.framework import try_import_torch

torch, nn = try_import_torch()


class TorchCustomModel(TorchModelV2, nn.Module):
    """Example of a PyTorch custom model that just delegates to a fc-net."""

    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name):
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs,
                              model_config, name)
        nn.Module.__init__(self)

        self.torch_sub_model = TorchFC(obs_space, action_space, num_outputs,
                                       model_config, name)

    def forward(self, input_dict, state, seq_lens):
        input_dict["obs"] = input_dict["obs"].float()
        fc_out, _ = self.torch_sub_model(input_dict, state, seq_lens)
        return fc_out, []

    def value_function(self):
        return torch.reshape(self.torch_sub_model.value_function(), [-1])

Step 3: Define our training function in train.py

As most RL practitioners know, RL training can be very sensitive to initial conditions. In the Ray ecosystem, it's quite easy to tame this using Ray Tune, a hyperparameter tuning framework.

Ray Tune will not only help to intelligently search the hyperparameter space, but also regularly save model snapshots ("checkpoints") and TensorBoard results to durable cloud storage such as S3 or GCS. These can be used to restart runs or survive a run failure.

A few notes on the code below:

  • TRAINER_CFG is an RLlib configuration dict
  • RUN_PREFIX ("CUJ-RL") determines the path within the cloud storage bucket (i.e., the S3/GCS bucket) that will be used to store model checkpoints as well as TensorBoard results, metrics, and much more
  • tune.run() can be used to optimize any model/function given a metric to minimize or maximize (API reference)

Here's the code:

import argparse
from datetime import datetime
import os
import subprocess
import time
from typing import Any, Dict, Tuple

import ray
from ray import tune
from ray.rllib.agents.ppo import ppo

import env
import model


TRAINER_CFG = {
    "env": env.MysteriousCorridor,
    "env_config": {
        "reward": 10.0,
    },
    # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
    "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
    "model": {
        "custom_model": model.TorchCustomModel,
        "fcnet_hiddens": [20, 20],
        "vf_share_layers": True,
    },
    "num_workers": 1,  # parallelism
    "framework": "torch",
    "rollout_fragment_length": 10,
    "lr": 0.01,
}

RUN_PREFIX = "CUJ-RL"


def train(ts: str, mode: str, upload_dir: str) -> str:
    print("Training & tuning automatically with Ray Tune...")
    local = mode == "local"

    run_name = f"{RUN_PREFIX}-{ts}"
    results = tune.run(
        ppo.PPOTrainer,
        name=run_name,
        config=TRAINER_CFG,
        checkpoint_freq=5,
        checkpoint_at_end=True,
        # Only sync results to cloud storage when not running locally.
        sync_config=None if local else tune.SyncConfig(upload_dir=upload_dir),
        stop={"training_iteration": 10},
        num_samples=1 if local else 10,
        metric="episode_reward_mean",
        mode="max")

    print("Best checkpoint: ")
    print(results.best_checkpoint)

    if upload_dir:
        # Write the best checkpoint path to a small pointer file so that
        # the deploy step can find it later.
        tmp_file = "/tmp/best_checkpoint.txt"
        with open(tmp_file, "w") as f:
            f.write(results.best_checkpoint)
        best_checkpoint_file = os.path.join(
            upload_dir, run_name, "best_checkpoint.txt")
        print("Saving best checkpoint in: ", best_checkpoint_file)

        if upload_dir.startswith("gs://"):
            subprocess.run(["gsutil", "cp", tmp_file, best_checkpoint_file],
                           check=True)
        elif upload_dir.startswith("s3://"):
            subprocess.run(["aws", "s3", "cp", tmp_file, best_checkpoint_file],
                           check=True)
        else:
            raise ValueError(f"Unknown upload dir type: {upload_dir}")

    return results.best_checkpoint

With this, we have a training loop that runs equally well on our laptop and in the cloud. You'll notice further down in the file that you can run train.py in three different modes using the --mode flag:

# restart your local Ray cluster as a head node
ray stop && ray start --head

# run on your local machine, good for quick iteration
python train.py --mode local --checkpoints-dir=<s3://...|gs://...>

########################################################
# then, wait until the train script finishes running...

# check the checkpoints directory
ls <path printed out from train.py stdout>
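When an upload directory is given, train.py also writes a small best_checkpoint.txt pointer file under the run's folder in cloud storage. The sketch below shows how that location is assembled from the upload directory, RUN_PREFIX, and the timestamp. The bucket name and the timestamp format are assumptions for illustration, and `posixpath` is used here instead of `os.path` so the result is the same on every platform.

```python
import posixpath
from datetime import datetime

RUN_PREFIX = "CUJ-RL"  # same constant as in train.py


def best_checkpoint_pointer(upload_dir: str, ts: str) -> str:
    """Build the cloud path of the best_checkpoint.txt pointer file."""
    run_name = f"{RUN_PREFIX}-{ts}"
    return posixpath.join(upload_dir, run_name, "best_checkpoint.txt")


# Assumed timestamp format; the value train.py receives may differ.
ts = datetime(2021, 12, 3, 0, 12).strftime("%Y%m%d-%H%M")
pointer = best_checkpoint_pointer("s3://my-bucket/my_checkpoints/", ts)
print(pointer)
# prints: s3://my-bucket/my_checkpoints/CUJ-RL-20211203-0012/best_checkpoint.txt
```

Reading that one small file is enough for a later step to locate the best checkpoint without listing the whole results directory.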

One of the primary goals of Ray has always been to allow fast iteration and a clear path to production. You should be able to run the same code on your laptop as you do in the cloud!

Now that we've covered training a model, let's move on to actually deploying it!

Step 4: Deploying to Ray Serve with deploy.py

As any ML practitioner knows, a model isn't useful unless we have a way to deploy it.

Ray Serve is a model-agnostic, scalable, GPU-ready, low-latency (<2 ms request overhead) serving framework that runs on Ray and can run in an HA (highly available) way on Anyscale.

Here we'll show an easy way to deploy our trained policy to a Ray Serve HTTPS endpoint, and do so in a way where we can easily move to production with Anyscale later.

We'll demonstrate how to encapsulate state on your endpoint with a request counter as well.

A few notes:

  • @serve.deployment(name="corridor", num_replicas=2) is a decorator that transforms this Python class into an endpoint with two replicas. See more on Serve Deployments here.
  • We can specify that our endpoints use GPUs or other custom resources, for example by passing ray_actor_options={"num_gpus": 1} to the decorator. See an example here.
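One consequence of combining per-instance state with multiple replicas: each replica is a separate object with its own counter, so the count in a response reflects the requests handled by that replica, not a global total. The sketch below imitates this with plain Python objects. The `CounterReplica` class and the round-robin routing are assumptions made for illustration and do not describe how Ray Serve actually routes requests.

```python
import itertools


class CounterReplica:
    """Stand-in for one deployment replica holding its own request count."""

    def __init__(self):
        self._count = 0

    def handle(self):
        self._count += 1
        return self._count


# Two replicas, as with num_replicas=2.
replicas = [CounterReplica() for _ in range(2)]
# Assumption: requests alternate between replicas (simple round-robin).
router = itertools.cycle(replicas)

counts = [next(router).handle() for _ in range(6)]
print(counts)  # prints: [1, 1, 2, 2, 3, 3]
```

If a single global counter is needed, the state would have to live outside the replicas, for example in an external store.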
deploy.py
import os
import argparse
from datetime import datetime
import json
import logging
import requests
import subprocess
import time
from urllib import parse

import ray
from ray.rllib.agents import ppo
from ray.rllib.env.env_context import EnvContext
from ray import serve

import env
import train


RUN_PREFIX = "CUJ-RL"


def sync_chkpt(ckpt_path, bucket):
    print("Starting checkpoint sync...")

    if not bucket.endswith("/"):
        bucket += "/"

    ts = datetime.now().strftime('%Y%m%d-%H%M')
    # Tune returns a local checkpoint path at this point.
    # TODO: we won't need to do this hacky conversion once we
    # get the cloud checkpoint path directly.
    ckpt_name = os.path.basename(ckpt_path)
    # Drop the local results prefix. str.strip() would remove a set of
    # characters from both ends rather than this prefix, so slice instead.
    local_root = "/home/ray/ray_results"
    rel_path = ckpt_path
    if rel_path.startswith(local_root):
        rel_path = rel_path[len(local_root):]
    storage_path = os.path.dirname(bucket + rel_path.strip("/"))

    model_path = os.path.join(os.getcwd(), f"ray_model_{ts}")
    if bucket.startswith("gs"):
        sync = ["gsutil", "rsync"]
    elif bucket.startswith("s3"):
        sync = ["aws", "s3", "sync"]
    else:
        raise ValueError(f"Unknown bucket type: {bucket}")
    subprocess.run(["mkdir", "-p", model_path], check=True)
    subprocess.run(sync + [storage_path, model_path], check=True)

    print("Synced", storage_path, "to", model_path)
    return os.path.join(model_path, ckpt_name)


@serve.deployment(name="corridor", num_replicas=2)
class Corridor(object):
    def __init__(self, ckpt_path, local, bucket):
        try:
            print("Deployment initializing.", locals())
            config = ppo.DEFAULT_CONFIG.copy()
            config.update(train.TRAINER_CFG)
            # No rollout workers are needed for inference.
            config['num_workers'] = 0

            if local:
                model_path = ckpt_path
            else:
                assert bucket, "Bucket must be provided if not local."
                model_path = sync_chkpt(ckpt_path, bucket)
            agent = ppo.PPOTrainer(config=config, env=env.MysteriousCorridor)
            agent.restore(model_path)
            print("Agent restored")

            self._policy = agent.workers.local_worker().get_policy()
            self._count = 0
        except Exception as e:
            print(e)
            import traceback
            traceback.print_exc()

    def __action(self, state):
        action = self._policy.compute_single_action(state)
        # JSON can't handle NumPy int64. Convert to a plain Python int.
        return int(action[0])

    async def __call__(self, request):
        self._count += 1

        body = await request.body()
        try:
            data = json.loads(body.decode("utf-8"))
        except ValueError:
            # Can't parse body as json data.
            return "can't decode: " + body.decode("utf-8")

        try:
            return {
                "count": self._count,
                "action": self.__action(data["state"]),
            }
        except Exception as e:
            return str(e)


def deploy(ckpt_path: str, local: bool, bucket: str = None):
    serve.start(detached=True)
    Corridor.deploy(ckpt_path, local=local, bucket=bucket)

    print("Corridor service deployed!")
    print(f"You can query the model at: {Corridor.url}")
    return Corridor.url

Once again, deploy.py has been lovingly crafted with --mode flags to show you how to deploy your Ray Serve endpoint in local, cluster, and job modes:

python deploy.py --ckpt-path /home/ray/ray_results/CUJ-RL-xxxx/path/to/checkpoint/here
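Once the endpoint is up, a client sends a JSON body with a "state" key and gets back the "count" and "action" fields shown in Corridor.__call__. Below is a minimal client sketch using only the standard library. The helper names are hypothetical, and the URL used in the example assumes a local Serve instance on its default port with the deployment name as the route; in practice, use the URL printed by deploy.py.

```python
import json
from urllib import request


def build_request(url: str, state) -> request.Request:
    """Build a POST request whose JSON body matches what Corridor expects."""
    body = json.dumps({"state": state}).encode("utf-8")
    return request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def parse_action(response_text: str) -> int:
    """Extract the action (0 = left, 1 = right) from a JSON response."""
    payload = json.loads(response_text)
    return int(payload["action"])


def query(url: str, state, timeout: float = 10.0) -> int:
    """Send one state to the endpoint and return the chosen action."""
    with request.urlopen(build_request(url, state), timeout=timeout) as resp:
        return parse_action(resp.read().decode("utf-8"))


# Assumed local URL, for illustration only.
example = build_request("http://127.0.0.1:8000/corridor", [1])
print(example.get_method(), example.data.decode("utf-8"))
```

Calling `query(url, [1])` against a live endpoint would return the action for state 1. Note that the deployment returns a plain string on errors, so a production client would also need to handle responses that are not valid JSON.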

We've seen all the pieces. Let's glue it all together using the Anyscale SDK.

Step 5: Training & deploying on Anyscale with jobs.py

Let's break it down.

jobs.py at work

jobs.py does the following:

  1. (not pictured) Uses the SDK to create compute configs and cluster environments for both the training and the deploy task.
  2. Kicks off train.py with the Anyscale API using the SDK. This:
    1. Starts a Ray cluster in the Data Plane (either customer's account or Anyscale's, if using our Managed Cloud)
    2. Runs the command specified
    3. Retries if needed
    4. Terminates the cluster
  3. Does the same for deploy.py:
    1. Starts a Ray cluster in the Data Plane (either customer's account or Anyscale's, if using our Managed Cloud)
    2. Runs the command specified (long lived service)
  4. Tests the HTTPS endpoint deployed by querying it
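The start, run, retry, and terminate lifecycle described above can be pictured with a small generic sketch. This is not the Anyscale SDK and does not reflect how Anyscale Jobs is implemented; `run_job` and its callback parameters are hypothetical stand-ins that only illustrate the control flow.

```python
def run_job(start_cluster, run_command, terminate_cluster, max_retries=2):
    """Start a cluster, run a command with retries, and always tear down."""
    cluster = start_cluster()
    try:
        for attempt in range(1, max_retries + 2):
            try:
                return run_command(cluster), attempt
            except Exception:
                # Re-raise once the retry budget is used up.
                if attempt == max_retries + 1:
                    raise
    finally:
        # Runs on success and on failure, like the cluster teardown step.
        terminate_cluster(cluster)


events = []
outcomes = iter([False, False, True])  # fail twice, then succeed


def start_cluster():
    events.append("start")
    return "cluster-1"


def flaky_command(cluster):
    if not next(outcomes):
        raise RuntimeError("transient failure")
    return "ok"


def terminate_cluster(cluster):
    events.append(f"terminate {cluster}")


result, attempts = run_job(start_cluster, flaky_command, terminate_cluster)
print(result, attempts, events)
# prints: ok 3 ['start', 'terminate cluster-1']
```

For the long-lived deploy service, the teardown step would not apply, since the cluster is meant to keep running.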

We've wisely chosen to use cloud storage (e.g., S3) to hold state between runs, which allows our model deployment to refer to the checkpoints generated by the previously run training job. Read more about model checkpointing and tuning with Ray Tune here.

And in case you're wondering: the deploy job could have been kicked off by the training job itself! We'll leave that as an exercise to the reader :)

Where to go from here

Congratulations. You've trained, tuned, and deployed a scalable RL model without any complex DevOps work or managing a Ray cluster yourself.

More Anyscale tutorials

  • CI/CD
  • Parallelism in Ray with time series data
  • Serving a PyTorch model
  • Developing Ray Serve applications
  • Weights & Biases on Anyscale
  • MLflow on Anyscale