Skip to main content

gRPC Services

note

Support for gRPC services requires Ray 2.7+ and Anyscale CLI version 0.5.129+.

This tutorial walks you through how to deploy gRPC Ray Serve applications in an Anyscale Service.

For more information on gRPC Ray Serve applications, see the Ray Serve gRPC service docs.

All code used for this tutorial can be found in Running gRPC services on Anyscale.

Getting started

Build an image containing compiled protobufs and servicer functions

Ray Serve running gRPC services require to have importable gRPC servicer functions to start. This means in order to use custom defined gRPC service and methods on Anyscale Services, you would need to start with BYOD with the Python code artifact compiled from a .proto file.

We will start with a quickstart example. We define our user_defined_protos.proto like the following

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.ray.examples.user_defined_protos";
option java_outer_classname = "UserDefinedProtos";

package userdefinedprotos;

message UserDefinedMessage {
string name = 1;
}

message UserDefinedResponse {
string greeting = 1;
}

service UserDefinedService {
rpc __call__(UserDefinedMessage) returns (UserDefinedResponse);
}

And our deployment.py like the following

from ray import serve
from user_defined_protos_pb2 import UserDefinedMessage, UserDefinedResponse


@serve.deployment
class GrpcDeployment:
def __call__(self, user_message: UserDefinedMessage) -> UserDefinedResponse:
greeting = f"Hello {user_message.name}!"
user_response = UserDefinedResponse(greeting=greeting)
return user_response


grpc_app = GrpcDeployment.options(name="grpc-deployment").bind()

And our Dockerfile like the following

# Use Anyscale base image
FROM anyscale/ray:2.9.0-py310

# Install dependencies
RUN pip install --upgrade pip && pip install -U torch==2.0.1 torchvision==0.15.2

WORKDIR /home/ray

# Copy protobuf and deployment definitions into the docker image
COPY user_defined_protos.proto /home/ray/user_defined_protos.proto
COPY deployment.py /home/ray/deployment.py

# Add working directory into python path so they are importable
ENV PYTHONPATH=/home/ray

# Build python code from .proto file
RUN python -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. ./user_defined_protos.proto

Once we have all three of those files created locally, we can build and push the docker image with the following command

# build the docker image
docker build . -t my-registry/my-image:tag

# push the docker image to your registry
docker push my-registry/my-image:tag

If you want to learn more about docker registry and options, you can read more here

Deploy a service using the built environment

First thing we need is a cluster environment built with the docker image we just created in the previous step. Create a cluster_env.yaml file with the following content

docker_image: my-registry/my-image:tag  # <--- replace with your image
ray_version: 2.9.0 # <--- replace if your image is based off different Ray version

Create a cluster environment using the Anyscale CLI.

anyscale cluster-env build cluster_env.yaml --name grpc-cluster-env

Next, we need to create a service definition file. Create a service.yaml file with the following content

name: grpc-service
cluster_env: grpc-cluster-env:1
cloud: anyscale_v2_default_cloud
config:
protocols:
grpc:
enabled: true # Enable gRPC protocol.
service_names:
- userdefinedprotos # The name of the gRPC service in your .proto file.
ray_serve_config:
grpc_options:
port: 9000
grpc_servicer_functions:
- user_defined_protos_pb2_grpc.add_UserDefinedServiceServicer_to_server

applications:
- name: grpc_app
route_prefix: /grpc_app
import_path: deployment:grpc_app
runtime_env: { }

The only thing to note here is that we need to ensure the gRPC protocol is enabled by setting config.protocols.grpc.enabled to true and config.protocols.grpc.service_names to match with the name of the services in the .proto file.

To roll out the service, we can run the following command

anyscale service rollout -f service.yaml

You should see the following output with the URL taking you to the Services UI

% anyscale service rollout -f service.yaml
Authenticating

(anyscale +3.2s) Using default compute config for specified cloud anyscale_v2_default_cloud: cpt_mkrmyaw6z3lcxu84ch7zr4i1j2.
(anyscale +3.6s) No project specified. Continuing without a project.
(anyscale +4.6s) Service service2_sfvfqms9txim3zbfsfxibi5fwv has been deployed. Service is transitioning towards: RUNNING.
(anyscale +4.6s) View the service in the UI at https://console.anyscale.com/services/service2_sfvfqms9txim3zbfsfxibi5fwv

Send test requests to the service

Once the service is in running state, you can find the token and domain name in the Anyscale UI by clicking on the Query button in the upper right corner.

import grpc
from user_defined_protos_pb2_grpc import UserDefinedServiceStub
from user_defined_protos_pb2 import UserDefinedMessage


# Replace url and token with your own.
url = "grpc-service-bxauk.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com"
token = "ABNM_uL1LdlNhqB-jy_h0Jmb5JmocVHPwfZOL7iyTe4"

credentials = grpc.ssl_channel_credentials()
channel = grpc.secure_channel(url, credentials)
stub = UserDefinedServiceStub(channel)
request = UserDefinedMessage(name="Ray")
auth_token_metadata = ("authorization", f"bearer {token}")
metadata = (
("application", "grpc_app"),
auth_token_metadata,
)
response, call = stub.__call__.with_call(request=request, metadata=metadata)
print(call.trailing_metadata()) # Request id is returned in the trailing metadata
print("Output type:", type(response)) # Response is a type of UserDefinedMessage
print("Full output:", response)

Real-world example: image classification service

In the previous example, we used a simple example to demonstrate how to deploy a gRPC service on Anyscale. In this example, we will use an image classification service to demonstrate a real world use case.

Rebuild Docker image

We define our user_defined_protos.proto like the following

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.ray.examples.user_defined_protos";
option java_outer_classname = "UserDefinedProtos";

package userdefinedprotos;

message ImageData {
string url = 1;
string filename = 2;
}
message ImageClass {
repeated string classes = 1;
repeated float probabilities = 2;
}
service ImageClassificationService {
rpc Predict(ImageData) returns (ImageClass);
}

And our deployment.py like the following

import requests
import torch
from PIL import Image
from io import BytesIO
from ray import serve
from ray.serve.handle import RayServeDeploymentHandle
from torchvision import transforms
from typing import List
from user_defined_protos_pb2 import ImageData, ImageClass


@serve.deployment
class ImageClassifier:
def __init__(
self,
_image_downloader: RayServeDeploymentHandle,
_data_preprocessor: RayServeDeploymentHandle,
):
self._image_downloader = _image_downloader.options(use_new_handle_api=True)
self._data_preprocessor = _data_preprocessor.options(use_new_handle_api=True)
self.model = torch.hub.load(
"pytorch/vision:v0.10.0", "resnet18", pretrained=True
)
self.model.eval()
self.categories = self._image_labels()

def _image_labels(self) -> List[str]:
categories = []
url = (
"https://raw.githubusercontent.com/pytorch/hub/master/imagenet_classes.txt"
)
labels = requests.get(url).text
for label in labels.split("\n"):
categories.append(label.strip())
return categories

async def Predict(self, image_data: ImageData) -> ImageClass:
# Download image
image = await self._image_downloader.remote(image_data.url)

# Preprocess image
input_batch = await self._data_preprocessor.remote(image)
# Predict image
with torch.no_grad():
output = self.model(input_batch)

probabilities = torch.nn.functional.softmax(output[0], dim=0)
return self.process_model_outputs(probabilities)

def process_model_outputs(self, probabilities: torch.Tensor) -> ImageClass:
image_classes = []
image_probabilities = []
# Show top categories per image
top5_prob, top5_catid = torch.topk(probabilities, 5)
for i in range(top5_prob.size(0)):
image_classes.append(self.categories[top5_catid[i]])
image_probabilities.append(top5_prob[i].item())

return ImageClass(
classes=image_classes,
probabilities=image_probabilities,
)


@serve.deployment
class ImageDownloader:
def __call__(self, image_url: str):
image_bytes = requests.get(image_url).content
return Image.open(BytesIO(image_bytes)).convert("RGB")


@serve.deployment
class DataPreprocessor:
def __init__(self):
self.preprocess = transforms.Compose(
[
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(
mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
),
]
)

def __call__(self, image: Image):
input_tensor = self.preprocess(image)
return input_tensor.unsqueeze(0) # create a mini-batch as expected by the model


image_downloader = ImageDownloader.bind()
data_preprocessor = DataPreprocessor.bind()
grpc_image_classifier = ImageClassifier.options(name="grpc-image-classifier").bind(
image_downloader, data_preprocessor
)

We can reuse the same Dockerfile to build and push the docker image containing the new code.

Roll out new services

Once we pushed the new docker image, make sure to change the docker image tag in cluster_env.yaml (if you used a different tag) and rerun the command to create a new cluster environment.

Next, we need to update a service definition file to deploys the grpc_image_classifier application. Create a service.yaml file with the following content

name: grpc-service
cluster_env: grpc-cluster-env:2 # rebuild the cluster env will automatically increment the version
cloud: anyscale_v2_default_cloud
config:
protocols:
grpc:
enabled: true
service_names:
- userdefinedprotos
# Ray Serve config now deploys the grpc_image_classifier application
ray_serve_config:
grpc_options:
port: 9000
grpc_servicer_functions:
- user_defined_protos_pb2_grpc.add_ImageClassificationServiceServicer_to_server

applications:
- name: grpc_image_classifier
route_prefix: /grpc_image_classifier
import_path: deployment:grpc_image_classifier
runtime_env: { }

Just like before we can run anyscale service rollout -f service.yaml to roll out this image classification service.

Send image classification request

Once the new service is running, you can find the token and domain in the upper right corner of the Anyscale UI by clicking "Query" button. For sending request through gRPC using Python, you can use the following example code

import grpc
from user_defined_protos_pb2_grpc import ImageClassificationServiceStub
from user_defined_protos_pb2 import ImageData


# Replace url and token with your own.
url = "grpc-service-bxauk.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com"
token = "ABNM_uL1LdlNhqB-jy_h0Jmb5JmocVHPwfZOL7iyTe4"

credentials = grpc.ssl_channel_credentials()
channel = grpc.secure_channel(url, credentials)
stub = ImageClassificationServiceStub(channel)
test_in = ImageData(
url="https://github.com/pytorch/hub/raw/master/images/dog.jpg",
)
auth_token_metadata = ("authorization", f"bearer {token}")
metadata = (
("application", "grpc_image_classifier"),
auth_token_metadata,
)
response, call = stub.Predict.with_call(request=test_in, metadata=metadata)
print(call.trailing_metadata()) # Request id is returned in the trailing metadata
print("Output type:", type(response)) # Response is a type of ImageClass
print("Full output:", response)
print("Output classes field:", response.classes)
print("Output probabilities field:", response.probabilities)

This code will create a secured channel with the service and pass the auth token as a metadata in the request. The service will ran some image processing steps to download, resize, and normalize the image. Then run the model inference image classification. The output should consist of top 5 image classes and their probabilities.