Skip to main content

Parallelism in Ray with Time Series Data

The most basic use of ray is to parallelize workloads. You'll have to consider how to issue tasks to Ray depending on the size and speed of your workloads. This document has a few patterns for approaching parallelism -- how Python looks when written as a single thread, and then two approachs to delegating parallel tasks with Ray Core.

Naive Approach -- Single Threaded

Naive Parallelism

The code block below will generate a Prophet model and fit against the dataset 1 by 1, single threaded.

loc_list = df["PULocationID"].unique()

# vanilla impl without ray
result = {}
for i in loc_list:
m = Prophet()
m.fit(df[df["PULocationID"]==i])
result[i]=m

Connecing to Anyscale

The archtitectures repository contains the code snippets on this page. To run them in Anyscale, this connection call makes sure to exclude the training data and also installs prophet as part of a runtime environment.

ray.init(
"anyscale://parallel",
log_to_driver=False,
runtime_env={"pip":["prophet"],"excludes":["yellow*"]}
)

Firehose Approach

Firehose Approach

The code block below will launch Ray task for every fit_prophet invocation, and the task will be executed asynchronously on remote Ray workers. For-loop, also called the driver script, will be launching the tasks as fast as possible.

This approach delivers the highest throughput, shortest execution time from start to finish. But it is also the least efficient given that the cluster is trying to scale out a large number of worker nodes to meet the demand. Anyscale's hosted Ray cluster can scale out very fast, but it still take some time. In the scenario where each individual tasks are very short, it could be that when the new worker nodes is ready, there are some earlier tasks are finished and those Ray worker process can be reused for task execution, resulting the now worker nodes no longer needed.

@ray.remote
def fit_prophet(i):
m = Prophet()
m.fit(df[df["PULocationID"]==i])
return m

## Fire Hose Approach -- fire as fast as you can and wait for the result
result = []
for i in loc_list:
result.append(fit_prophet.remote(i))
ray.get(result)

Ray Task Launching with Back Pressure

Ray Task Launching with Back Pressure

The below code block will launch Ray tasks as fast as possible while trying to maintain the max number of tasks in-flight to be within a threshold. This is done via implemented the back pressure pattern. There is a specific check before the actual launch of the task, checking if the number of tasks in flight / not-yet-returned is greater than the threshold. If so, enter in a blocking execution to wait for the threshold to be satisfied and then proceed to launch new tasks.

## back pressure to limit the # of tasks in flight
result = []
max_tasks = 10 # specifying the max number of results
for i in loc_list:
if len(result) > max_tasks:
# calculating how many results should be available
num_ready = len(result)-max_tasks
# wait for num_returns to be equal to num_ready, ensuring the amount of task in flight is checked
ray.wait(result, num_returns=num_ready)
result.append(fit_prophet.remote(i))
ray.get(result)