Handling files and packages on your cluster with Ray runtime environments

By Archit Kulkarni and Edward Oakes   

Ray makes scaling up to a cluster seamless. 

But how do you get your files onto the cluster, and make sure all the machines have the right Python packages? And what if you need to run multiple jobs on the same cluster with different, possibly conflicting package dependencies? 

You could bake all your files and packages into a container image, but this comes with one major drawback when doing iterative development: it's slow. It can take several minutes to rebuild a container image, push it to the cluster, and then wait for your cluster to restart.

For rapid iterative development, Ray’s runtime environments feature provides a simple answer to all these questions. A runtime environment is specified in Python, and is most easily described with an example:

runtime_env = {
    "working_dir": "/code/my_project",
    "pip": ["pendulum", "requests==2.27.1"],
    "env_vars": {"TF_WARNINGS": "none"},
}

(Note: Ray provides a typed API for runtime_env as well.)  

Pass this into ray.init(runtime_env=…) when connecting to your cluster using Ray Client, or supply it in a standalone JSON file when using Ray Job Submission. The environment will be automatically installed on all nodes in the long-running Ray cluster and used by all workers for that particular Ray job. Note that multiple jobs with different runtime environments can run concurrently!
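For the Job Submission route, the dictionary has to end up in a file. Here is a minimal sketch of one way to produce it using only the standard library. The helper name write_runtime_env and the file name runtime_env.json are our own choices for illustration, not part of Ray's API, and how the file is handed to the job submission tooling is left to the Ray docs.

```python
import json
import tempfile
from pathlib import Path

# Same example environment as above.
runtime_env = {
    "working_dir": "/code/my_project",
    "pip": ["pendulum", "requests==2.27.1"],
    "env_vars": {"TF_WARNINGS": "none"},
}


def write_runtime_env(env: dict, path: Path) -> Path:
    """Serialize a runtime_env dict to a JSON file and return its path.

    Hypothetical helper for illustration; not a Ray function.
    """
    path.write_text(json.dumps(env, indent=2, sort_keys=True))
    return path


# A temporary directory keeps the example free of side effects;
# "runtime_env.json" is an assumed file name.
out_path = write_runtime_env(runtime_env, Path(tempfile.mkdtemp()) / "runtime_env.json")

# Reading the file back yields the original dictionary.
loaded = json.loads(out_path.read_text())
```

Keeping the environment in a file like this also makes it easy to check into version control next to the code it describes.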

Concretely, “installing a runtime environment” means:

  • The working_dir directory on your laptop will be uploaded to all the cluster machines, and the Ray worker processes will be started in their node’s copy of that directory.

  • The environment variables specified by env_vars will be set for all the Ray workers.

  • The packages specified in the pip field will be installed into a virtualenv via pip install and will be importable by the Ray workers.

There are more runtime_env fields besides those mentioned above; these can be found in the API reference.

The workflow now looks like this: update the runtime_env Python object along with your other code updates, rerun your script, and repeat. You don’t have to restart your Ray cluster, and you don’t have to rebuild any container image.

[Figure: runtime environment dependency workflow]

You can even specify different runtime environments per-task or per-actor by passing them into the @ray.remote decorator:

@ray.remote(runtime_env={…})
class MyActor:
    # …

This is useful for applications that require running concurrent workloads with different, possibly conflicting package dependencies (for example, one workload that needs TensorFlow 1 and another that needs TensorFlow 2).

Caching for speed

Files and packages, once downloaded or installed for the first time, are cached on the cluster for quick reuse. For example, if you change your pip requirements but don’t change anything in your working_dir, the working_dir won’t be re-uploaded. Conversely, if you edit a file in your working_dir but don’t change your pip requirements, the working_dir will be re-uploaded but the Python packages won’t be reinstalled. If you later start up more tasks and actors whose runtime environment needs any of these existing resources, those resources won’t be re-downloaded or re-installed.
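To make the per-field behavior concrete, here is a toy model of the idea, not Ray's actual implementation. It assumes each field gets its own cache key derived from its content, so changing one field leaves the others' cache entries valid. The functions field_key and plan_install are hypothetical, and the working_dir value is a stand-in for a digest of the directory's contents.

```python
import hashlib
import json


def field_key(field: str, value) -> str:
    """Derive a cache key from a single runtime_env field's content."""
    payload = json.dumps(value, sort_keys=True).encode("utf-8")
    return f"{field}-{hashlib.sha1(payload).hexdigest()[:12]}"


def plan_install(runtime_env: dict, cache: set) -> list:
    """Return the fields that are not cached yet, then mark them as cached."""
    todo = []
    for field, value in runtime_env.items():
        key = field_key(field, value)
        if key not in cache:
            todo.append(field)
            cache.add(key)
    return todo


cache = set()

# The working_dir strings below stand in for a hash of the directory contents.
env_v1 = {"working_dir": "digest-abc123", "pip": ["requests==2.27.1"]}
env_v2 = {"working_dir": "digest-abc123", "pip": ["requests==2.27.1", "pendulum"]}
env_v3 = {"working_dir": "digest-def456", "pip": ["requests==2.27.1", "pendulum"]}

first = plan_install(env_v1, cache)   # nothing cached yet: both fields need work
second = plan_install(env_v2, cache)  # only the pip list changed
third = plan_install(env_v3, cache)   # only the working_dir contents changed
```

In this model the second run only touches pip and the third only touches working_dir, which mirrors the two scenarios described above.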

Architecture

[Figure: runtime environment architecture]

The Raylet is a process that runs on each node in a Ray cluster. Also running on each node is a RuntimeEnvAgent process that handles all runtime environment installation and deletion. When a task is invoked or an actor is instantiated that needs a runtime environment, the Raylet sends a GetOrCreateRuntimeEnv gRPC request to the agent. The agent checks its cache to see which of the runtime_env fields have already been installed on the node, and installs whatever is missing. After the agent replies to the Raylet, the Raylet can schedule the task or actor.

When this actor or task exits, the underlying Ray worker exits and the Raylet issues a request to the agent to delete the runtime environment resources if possible. If the runtime environment isn’t referenced by any running tasks, actors, or jobs on the cluster, its local resources are marked unused, ready to be garbage collected whenever the cache size limit is exceeded.

The caching is achieved by using a reference counting mechanism to keep track of which Ray workers depend on which runtime_env resources (files and packages). The cache size limit is configurable. See the docs for more information.
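The lifecycle described in this section can be sketched as a small reference-counted cache. This is a simplified model under our own assumptions, not the RuntimeEnvAgent's code: the class RuntimeEnvCache, its method names, the resource identifiers, and the byte sizes are all hypothetical. It shows the two properties the text relies on: a resource in use is never deleted, and an unused resource stays cached until the size limit is exceeded.

```python
class RuntimeEnvCache:
    """Toy model of reference-counted resources with a cache size limit."""

    def __init__(self, size_limit: int):
        self.size_limit = size_limit
        self.sizes = {}    # resource id -> size in bytes
        self.refs = {}     # resource id -> number of workers using it
        self.installs = 0  # how many times something was actually installed

    def get_or_create(self, uri: str, size: int) -> None:
        """Install the resource if missing, then take a reference to it."""
        if uri not in self.sizes:
            self.sizes[uri] = size
            self.refs[uri] = 0
            self.installs += 1
        self.refs[uri] += 1
        self._evict_unused()

    def release(self, uri: str) -> None:
        """Drop one reference, e.g. when a worker exits."""
        self.refs[uri] -= 1
        self._evict_unused()

    def _evict_unused(self) -> None:
        # Unused resources are deleted only while the cache is over its limit.
        for uri in [u for u, count in self.refs.items() if count == 0]:
            if sum(self.sizes.values()) <= self.size_limit:
                break
            del self.sizes[uri]
            del self.refs[uri]


cache = RuntimeEnvCache(size_limit=100)

# Two workers need the same (hypothetical) pip resource: one install, two refs.
cache.get_or_create("pip-env-a", 60)
cache.get_or_create("pip-env-a", 60)
installs_after_sharing = cache.installs

# Both workers exit. The resource is unused but stays cached (60 <= 100).
cache.release("pip-env-a")
cache.release("pip-env-a")
still_cached = "pip-env-a" in cache.sizes

# A new resource pushes the total to 130, so the unused one is collected.
cache.get_or_create("working-dir-b", 70)
evicted = "pip-env-a" not in cache.sizes
```

Deferring deletion until the limit is hit is what lets a later task with the same environment start quickly even after the earlier workers have exited.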

Future work

Stay tuned for the following features, and please reach out to us on Slack if these features would be useful to you — we’d love to learn more about your use case!

  • Better support for specifying a Docker image to use in a runtime environment: If you already have container images ready to go, this feature would allow you to run jobs, tasks, and actors on a single cluster using your images — useful for heavyweight dependencies that you don’t want to install at runtime.

  • Cross-language support: Ray runtime environments can currently only be used with Python, but Ray Core supports Java as well. Here runtime environments would be able to specify JAR files and other Java-specific dependencies for your tasks and actors.

  • A plugin API to enable users to add new fields to runtime_env for customized setup, beyond the existing fields such as pip, conda, and working_dir: This would allow you to run a special setup hook for tasks and actors without having to maintain your own fork of Ray.
