Distributed Training
Interactive model training can be done within Notebooks. Note that you only have access to the resources you requested when creating the notebook, and that a notebook is limited to a single node with one or multiple GPUs. If you need more resources, e.g. for data-parallel distributed training, you can use one of several methods based on Kubeflow and Ray. Depending on the ML framework you use, the need for interactive access, and reproducibility requirements, you can choose between the methods described below. If you need access to storage, for example for reading training data, see the storage documentation on how to access different storage backends.
Method | Supported Framework | How do you interact with the service? |
---|---|---|
Applying PyTorchJob Manifest | PyTorch, PyTorch Lightning | Submission via kubectl |
Applying MPIJob Manifest | Horovod (PyTorch, TensorFlow, Keras, MXNet), MPI | Submission via kubectl |
Applying TFJob Manifest | TensorFlow (tf.distribute strategies) | Submission via kubectl |
Applying RayJob Manifest | PyTorch, PyTorch Lightning, TensorFlow, Horovod, XGBoost, LightGBM, HuggingFace Transformers & Accelerate, DeepSpeed | Submission via kubectl |
Connecting to RayCluster | PyTorch, PyTorch Lightning, TensorFlow, Horovod, XGBoost, LightGBM, HuggingFace Transformers & Accelerate, DeepSpeed | Programmatically, can be run from interactive sessions within notebooks, VS Code, pipelines |
PyTorchJob Manifest
Using the Kubeflow PyTorchJob, you can submit a distributed training job that sets up the configuration PyTorch needs for the DistributedDataParallel strategy. You will have to make changes to your code to make it compatible with the torchrun CLI. See the PyTorch documentation for more information on how to transition from single-node to multi-node training.
- Prepare your code
import os

import torch
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group


def ddp_setup():
    # torchrun / PyTorchJob set LOCAL_RANK, RANK, WORLD_SIZE, MASTER_ADDR and MASTER_PORT
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    init_process_group(backend="nccl")


def prepare_dataloader(dataset: Dataset, batch_size: int):
    # DistributedSampler splits the dataset across the participating processes
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=False,
        sampler=DistributedSampler(dataset),
    )


def main(dataset: Dataset, total_epochs: int, batch_size: int):
    ddp_setup()
    train_data_loader = prepare_dataloader(dataset, batch_size)
    for epoch in range(total_epochs):
        train_data_loader.sampler.set_epoch(epoch)
        for source, targets in train_data_loader:
            run_batch(source, targets)  # your forward/backward/optimizer step, with the model wrapped in DDP
    destroy_process_group()
- Build and push your image. Replace the placeholder with your username/project and image name.
- Adjust the manifest to your needs
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: job
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      restartPolicy: OnFailure
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
            - name: pytorch
              image: REPLACE_WITH_IMAGE_URL # registry.cern.ch/user/image
              command:
                - "REPLACE_WITH_COMMAND" # python3 train.py
    Worker:
      replicas: REPLACE_WITH_NUMBER_OF_WORKERS # e.g. 1
      restartPolicy: OnFailure
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
          labels:
            mount-eos: "true"
            inject-oauth2-token: "true"
        spec:
          containers:
            - name: pytorch
              image: REPLACE_WITH_IMAGE_URL
              command:
                - "REPLACE_WITH_COMMAND" # python3 train.py
              resources:
                limits:
                  nvidia.com/gpu: REPLACE_WITH_NUMBER_OF_GPUS_PER_WORKER # e.g. 1
- Run kubectl apply -f job.yaml in a terminal, either in a Jupyter notebook or a VS Code Server
- Run kubectl get pytorchjobs to see the status of your job
MPIJob Manifest
You can use the MPIJob to run any MPI-based workload. Typically this is needed if you use the Horovod framework for distributed training. Alternatively, you can also run your custom mpirun workloads.
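For step 1 below, preparing a PyTorch script for Horovod typically means initialising Horovod, pinning each process to its local GPU, wrapping the optimizer, and broadcasting the initial model state. The following is a minimal sketch with a toy model and random data; adapt the dataset, model and hyperparameters to your own code:

# Minimal Horovod + PyTorch sketch; the dataset and model are toy placeholders
import horovod.torch as hvd
import torch
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler

hvd.init()                               # one process per GPU, launched via mpirun
torch.cuda.set_device(hvd.local_rank())  # pin each process to its local GPU

dataset = TensorDataset(torch.randn(1024, 16), torch.randn(1024, 1))
sampler = DistributedSampler(dataset, num_replicas=hvd.size(), rank=hvd.rank())
loader = DataLoader(dataset, batch_size=32, sampler=sampler)

model = torch.nn.Linear(16, 1).cuda()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01 * hvd.size())
# Average gradients across workers after every backward pass
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
# Start all workers from the same initial state
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

for epoch in range(5):
    sampler.set_epoch(epoch)
    for source, targets in loader:
        optimizer.zero_grad()
        loss = torch.nn.functional.mse_loss(model(source.cuda()), targets.cuda())
        loss.backward()
        optimizer.step()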
1. Prepare your code
2. Build and push your image. Replace the placeholder with your username/project and image name.
3. Adjust the manifest to your needs
apiVersion: kubeflow.org/v1
kind: MPIJob
metadata:
name: job
spec:
slotsPerWorker: 1
runPolicy:
cleanPodPolicy: Running
mpiReplicaSpecs:
Launcher:
replicas: 1
template:
metadata:
annotations:
sidecar.istio.io/inject: 'false'
spec:
containers:
- image: REPLACE_WITH_IMAGE_URL
name: worker
command:
- "REPLACE_WITH_COMMAND" # python ./train.py --learning-rate=0.01
Worker:
replicas: REPLACE_WITH_NUMBER_OF_WORKERS
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
labels:
mount-eos: "true"
inject-oauth2-token: "true"
spec:
containers:
- image: REPLACE_WITH_IMAGE_URL
name: worker
resources:
limits:
nvidia.com/gpu: REPLACE_WITH_NUMBER_OF_GPUS_PER_WORKER # e.g. 2
4. Run kubectl apply -f job.yaml in a terminal, either in a Jupyter notebook or a VS Code Server
5. Run kubectl get mpijobs to see the status of your job
TFJob Manifest
You can use the TFJob to run TensorFlow-based workloads. See the TensorFlow documentation for the different distribution strategies and how to implement them.
- Prepare your code. For example, use the ParameterServerStrategy for a data-parallel method to scale up model training on multiple machines. A parameter server training cluster consists of workers and parameter servers: variables are created on the parameter servers and are read and updated by the workers in each step. A minimal sketch is shown after this list.
- Build and push your image. Replace the placeholder with your username/project and image name.
- Adjust the manifest to your needs
apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
  name: job
spec:
  tfReplicaSpecs:
    PS:
      replicas: 1
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
            - image: REPLACE_WITH_IMAGE_URL
              name: tensorflow
              command:
                - "REPLACE_WITH_COMMAND" # python ./train.py --learning-rate=0.01
    Worker:
      replicas: REPLACE_WITH_NUMBER_OF_WORKERS
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
          labels:
            mount-eos: "true"
            inject-oauth2-token: "true"
        spec:
          containers:
            - image: REPLACE_WITH_IMAGE_URL
              name: tensorflow
              resources:
                limits:
                  nvidia.com/gpu: REPLACE_WITH_NUMBER_OF_GPUS_PER_WORKER # e.g. 2
- Run kubectl apply -f job.yaml in a terminal, either in a Jupyter notebook or a VS Code Server
- Run kubectl get tfjobs to see the status of your job
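Referenced from step 1 above, here is a minimal sketch of what a ParameterServerStrategy script can look like. It assumes the TFJob also defines a Chief replica that acts as the coordinator (not shown in the manifest above) and uses a toy model and random data; adapt it to your own code:

# Minimal ParameterServerStrategy sketch; assumes a Chief replica acts as coordinator
import tensorflow as tf

# TFJob sets TF_CONFIG for every replica; the resolver reads it
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

if cluster_resolver.task_type in ("worker", "ps"):
    # Workers and parameter servers start a server and wait for the coordinator
    server = tf.distribute.Server(
        cluster_resolver.cluster_spec(),
        job_name=cluster_resolver.task_type,
        task_index=cluster_resolver.task_id,
        protocol=cluster_resolver.rpc_layer or "grpc",
        start=True,
    )
    server.join()
else:
    # The coordinator (chief) creates the strategy and drives the training loop
    strategy = tf.distribute.ParameterServerStrategy(cluster_resolver)
    with strategy.scope():
        model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(16,))])
        model.compile(optimizer="adam", loss="mse")

    dataset = tf.data.Dataset.from_tensor_slices(
        (tf.random.normal((1024, 16)), tf.random.normal((1024, 1)))
    ).batch(32).repeat()
    model.fit(dataset, epochs=5, steps_per_epoch=32)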
Ray
Ray Train is a scalable machine learning library for distributed training and fine-tuning.
It supports many common ML frameworks. The examples below use PyTorch for demonstration; other frameworks require different changes to your code.
Ray Images
Ray provides base images for many Python versions. Remember to use the registry proxy cache to prevent rate limiting. For example, use registry.cern.ch/docker.io/rayproject/ray:2.43.0-py39-gpu to get Ray 2.43.0 with Python 3.9.
Remember to:
- Set the rayVersion in the rayClusterSpec to the Ray version used in your custom Docker image.
- Set the ray-head container's image to the name of your custom image in your registry.
- Set the ray-worker container's image to the name of your custom image in your registry.
RayJob Manifest
A RayJob automatically creates a RayCluster and submits a job when the cluster is ready. You can also configure RayJob to automatically delete the RayCluster once the Ray job finishes.
- Adjust your code to interact with Ray:
import ray
import ray.train.torch
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
from torch.utils.data import DataLoader


def train(config):
    # Ray passes train_loop_config to each worker as `config`;
    # dataset, Model, num_epochs and run_batch are placeholders for your own code
    train_loader = DataLoader(dataset, batch_size=int(config["batch_size"]), shuffle=True)
    train_loader = ray.train.torch.prepare_data_loader(train_loader)
    model = Model()
    model = ray.train.torch.prepare_model(model)
    for epoch in range(num_epochs):
        if ray.train.get_context().get_world_size() > 1:
            train_loader.sampler.set_epoch(epoch)
        for source, targets in train_loader:
            run_batch(source, targets)  # your forward/backward/optimizer step


if __name__ == "__main__":
    ray.init()
    print(ray.cluster_resources())
    scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
    trainer = TorchTrainer(
        train,
        scaling_config=scaling_config,
        train_loop_config={"batch_size": 32},
    )
    trainer.fit()
- Prepare your image. You have to use a Ray base image
- Build and push your image. Replace the placeholder with your username/project and image name.
- Adjust the manifest to your needs
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: job
spec:
  entrypoint: REPLACE_WITH_ENTRYPOINT_COMMAND # python3 train.py --batch-size=10
  rayClusterSpec:
    rayVersion: '2.43.0' # should match the Ray version in the image of the containers!!
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
          labels:
            mount-eos: "true"
            inject-oauth2-token: "true"
        spec:
          containers:
            - name: ray-head
              image: REPLACE_WITH_IMAGE_URL
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                requests:
                  cpu: "200m"
    workerGroupSpecs:
      - replicas: REPLACE_WITH_NUMBER_OF_WORKERS
        minReplicas: 1
        maxReplicas: 2
        groupName: small-group
        rayStartParams: {}
        template:
          metadata:
            annotations:
              sidecar.istio.io/inject: "false"
            labels:
              mount-eos: "true"
              inject-oauth2-token: "true"
          spec:
            containers:
              - name: ray-worker
                image: REPLACE_WITH_IMAGE_URL
                lifecycle:
                  preStop:
                    exec:
                      command: ["/bin/sh", "-c", "ray stop"]
                resources:
                  limits:
                    cpu: "1"
                    nvidia.com/gpu: 1
                  requests:
                    cpu: "200m"
  submitterPodTemplate:
    metadata:
      annotations:
        sidecar.istio.io/inject: "false"
    spec:
      restartPolicy: Never
      containers:
        - name: rayjob-submitter
          image: rayproject/ray:2.9.0
- Run kubectl apply -f job.yaml in a terminal, either in a Jupyter notebook or a VS Code Server
- Run kubectl get rayjobs to see the status of your job
- Run kubectl get pods to see the worker pods. After a while, the submitter pod will be created
- Run kubectl logs -f REPLACE_WITH_SUBMITTER_POD_NAME, e.g. kubectl logs -f job-m44ns, to follow the logs of your job
Connecting to RayCluster
If you want to leverage Ray for your distributed training but want the ability to iterate on your code, you can set up a RayCluster beforehand and connect to it from within an interactive session, e.g. inside a Jupyter notebook or a VS Code development environment.
- Adjust the manifest to your needs.
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: raycluster
spec:
  rayVersion: '2.23.0'
  headGroupSpec:
    rayStartParams:
      num-cpus: '1'
    template:
      metadata:
        labels:
          sidecar.istio.io/inject: "false"
      spec:
        serviceAccountName: default-editor
        containers:
          - name: ray-head
            image: REPLACE_WITH_RAY_IMAGE # e.g. registry.cern.ch/ngt-usecases/ray-torch-2.43.py312
            lifecycle:
              preStop:
                exec:
                  command: ["/bin/sh", "-c", "ray stop"]
            volumeMounts:
              - mountPath: /tmp/ray
                name: ray-logs
            resources:
              limits:
                cpu: "1"
                memory: "2G"
              requests:
                cpu: "100m"
                memory: "2G"
        volumes:
          - name: ray-logs
            emptyDir: {}
  workerGroupSpecs:
    - replicas: 10
      minReplicas: 1
      maxReplicas: 10
      groupName: small-group
      rayStartParams:
        num-cpus: '1'
        node-manager-port: '6380'
        object-manager-port: '6381'
        runtime-env-agent-port: '6382'
        dashboard-agent-grpc-port: '6383'
        dashboard-agent-listen-port: '52365'
        metrics-export-port: '8080'
        max-worker-port: '10012'
      template:
        metadata:
          labels:
            sidecar.istio.io/inject: "false"
        spec:
          serviceAccountName: default-editor
          containers:
            - name: ray-worker
              image: REPLACE_WITH_RAY_IMAGE # e.g. registry.cern.ch/ngt-usecases/ray-torch-2.43.py312
              lifecycle:
                preStop:
                  exec:
                    command: ["/bin/sh", "-c", "ray stop"]
              volumeMounts:
                - mountPath: /tmp/ray
                  name: ray-logs
              resources:
                limits:
                  cpu: "1"
                  memory: "500Mi"
                requests:
                  cpu: "300m"
                  memory: "500Mi"
          volumes:
            - name: ray-logs
              emptyDir: {}
- Run kubectl apply -f raycluster.yaml (the file containing the manifest above) in a terminal
- Run kubectl get rayclusters to check the cluster status
- Once your cluster is running, you can submit your workload from either a Notebook or VS Code:
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# The head service is named <cluster-name>-head-svc; "raycluster" matches the manifest above
ray.init(address="ray://raycluster-head-svc:10001")
print(ray.cluster_resources())

NUM_WORKERS = 2
scaling_config = ScalingConfig(num_workers=NUM_WORKERS, use_gpu=True)
trainer = TorchTrainer(
    train,  # your training function, e.g. as defined in the RayJob example above
    scaling_config=scaling_config,
    train_loop_config={"batch_size": 32},
)
trainer.fit()
Warning
Make sure to use the same Ray version and the same Python version in your RayCluster and your local environment. Otherwise you might encounter errors like:
2024-11-28 14:35:57,994 WARNING utils.py:1591 -- Python patch version mismatch: The cluster was started with:
Ray: 2.23.0
Python: 3.11.9
This process on Ray Client was started with:
Ray: 2.23.0
Python: 3.11.10
RuntimeError: Python minor versions differ between client and server: client is 3.9.18, server is 3.11.9
For example, if you use Ray v2.43.0 and Python v3.12 in your Ray cluster, e.g. by using the image registry.cern.ch/docker.io/rayproject/ray:2.43.0-py312-cpu, you must also use the matching Ray client via pip install "ray[client]==2.43.0" and use Python v3.12 in your local environment, e.g. by basing your notebook image on the python:3.12 image.
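To catch such a mismatch early, you can print the client-side versions in your local environment before calling ray.init() and compare them with the versions in your RayCluster image; a minimal check:

# Print the local Ray and Python versions to compare with the RayCluster image
import sys
import ray

print("Ray:", ray.__version__)
print("Python:", sys.version.split()[0])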