Distributed Training

Interactive model training can be done within Notebooks. Note that you only have access to the resources you requested when creating the notebook, and that these are limited to a single node with one or multiple GPUs. If you need more resources, e.g. for data-distributed training, you can use one of several methods based on Kubeflow and Ray. Depending on the ML framework you use, the need for interactive access and reproducibility, you can choose between the methods described below. If you need access to storage, for example for reading training data, see the storage documentation on how to access the different storage backends.

Method | Supported frameworks | How do you interact with the service?
------ | -------------------- | --------------------------------------
Applying PyTorchJob Manifest | PyTorch, PyTorch Lightning | Submission via kubectl
Applying MPIJob Manifest | Horovod (PyTorch, TensorFlow, Keras, MXNet), MPI | Submission via kubectl
Applying TFJob Manifest | TensorFlow | Submission via kubectl
Applying RayJob Manifest | PyTorch, PyTorch Lightning, TensorFlow, Horovod, XGBoost, LightGBM, HuggingFace Transformers & Accelerate, DeepSpeed | Submission via kubectl
Connecting to RayCluster | PyTorch, PyTorch Lightning, TensorFlow, Horovod, XGBoost, LightGBM, HuggingFace Transformers & Accelerate, DeepSpeed | Programmatically; can be run from interactive sessions within notebooks, VS Code, pipelines

PyTorchJob Manifest

Using the Kubeflow PyTorchJob, you can submit a distributed training job; it sets the configuration required by PyTorch to enable the DistributedDataParallel strategy. You will have to make changes to your code to make it compatible with the torchrun CLI. See the PyTorch documentation for more information on how to transition from single-node to multi-node training.

  1. Prepare your code
    import os

    import torch
    from torch.utils.data import DataLoader, Dataset
    from torch.utils.data.distributed import DistributedSampler
    from torch.nn.parallel import DistributedDataParallel as DDP
    from torch.distributed import init_process_group, destroy_process_group

    def ddp_setup():
        # torchrun (and the PyTorchJob operator) provide LOCAL_RANK, RANK, WORLD_SIZE, ...
        torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
        init_process_group(backend="nccl")

    def prepare_dataloader(dataset: Dataset, batch_size: int):
        return DataLoader(
            dataset,
            batch_size=batch_size,
            pin_memory=True,
            shuffle=False,  # the DistributedSampler takes care of shuffling
            sampler=DistributedSampler(dataset)
        )

    def main(dataset: Dataset, total_epochs: int, batch_size: int):
        ddp_setup()
        train_data_loader = prepare_dataloader(dataset, batch_size)
        # wrap your model with DDP(model, device_ids=[int(os.environ["LOCAL_RANK"])]) before training

        for epoch in range(total_epochs):
            train_data_loader.sampler.set_epoch(epoch)  # reshuffle the shards every epoch
            for source, targets in train_data_loader:
                run_batch(source, targets)  # your forward/backward/optimizer step

        destroy_process_group()
    
  2. Build and push your image. Replace the placeholder with your username/project and image name (example commands after this list).
  3. Adjust the manifest to your needs
    apiVersion: kubeflow.org/v1
    kind: PyTorchJob
    metadata:
      name: job
    spec:
      pytorchReplicaSpecs:
        Master:
          replicas: 1
          restartPolicy: OnFailure
          template:
            metadata:
              annotations:
                sidecar.istio.io/inject: "false"
            spec:
              containers:
                - name: pytorch
                  image: REPLACE_WITH_IMAGE_URL # registry.cern.ch/user/image
                  command:
                  - "REPLACE_WITH_COMMAND" # python3 train.py
        Worker:
          replicas: REPLACE_WITH_NUMBER_OF_WORKERS # e.g. 1
          restartPolicy: OnFailure
          template:
            metadata:
              annotations:
                sidecar.istio.io/inject: "false"
              labels:
                mount-eos: "true"
                inject-oauth2-token: "true"
            spec:
              containers:
              - name: pytorch
                image: REPLACE_WITH_IMAGE_URL
                command:
                  - "REPLACE_WITH_COMMAND" # python3 train.py
                resources:
                  limits:
                    nvidia.com/gpu: REPLACE_WITH_NUMBER_OF_GPUS_PER_WORKER # e.g. 1 
    
  4. Run kubectl apply -f job.yaml in a terminal, either in a Jupyter notebook or a VS Code server
  5. Run kubectl get pytorchjobs to see the status of your job
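
For step 2, the build and push commands are the same as for the Ray images further down; a sketch, assuming you push to registry.cern.ch with placeholder names:

    docker login registry.cern.ch
    docker build -t registry.cern.ch/REPLACE_WITH_YOUR_USERNAME/REPLACE_WITH_YOUR_IMAGE .
    docker push registry.cern.ch/REPLACE_WITH_YOUR_USERNAME/REPLACE_WITH_YOUR_IMAGE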

MPIJob Manifest

You can use the MPIJob to run any MPI-based workload. Typically this is needed if you use the Horovod framework for distributed training. Alternatively, you can also run your own custom mpirun workloads.

  1. Prepare your code (a minimal Horovod sketch is shown after these steps)
  2. Build and push your image. Replace the placeholder with your username/project and image name.
  3. Adjust the manifest to your needs

apiVersion: kubeflow.org/v1
kind: MPIJob
metadata:
  name: job
spec:
  slotsPerWorker: 1
  runPolicy:
    cleanPodPolicy: Running
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: 'false'
        spec:
          containers:
          - image: REPLACE_WITH_IMAGE_URL
            name: worker
            command:
            - "REPLACE_WITH_COMMAND" # python ./train.py --learning-rate=0.01
    Worker:
      replicas: REPLACE_WITH_NUMBER_OF_WORKERS
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
          labels:
            mount-eos: "true"
            inject-oauth2-token: "true"
        spec:
          containers:
          - image: REPLACE_WITH_IMAGE_URL
            name: worker
            resources:
              limits:
                nvidia.com/gpu: REPLACE_WITH_NUMBER_OF_GPUS_PER_WORKER # e.g. 2 
  4. Run kubectl apply -f job.yaml in a terminal, either in a Jupyter notebook or a VS Code server
  5. Run kubectl get mpijobs to see the status of your job
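
For step 1, the required code changes depend on your framework. As a rough sketch of the typical Horovod additions for PyTorch (the linear model and random tensors are stand-ins for your own model and dataset):

    import torch
    import horovod.torch as hvd
    from torch.utils.data import DataLoader, TensorDataset
    from torch.utils.data.distributed import DistributedSampler

    hvd.init()                                   # one Horovod process per MPI slot
    if torch.cuda.is_available():
        torch.cuda.set_device(hvd.local_rank())  # pin each process to one GPU

    # toy model and dataset as stand-ins for your own code
    dataset = TensorDataset(torch.randn(1024, 10), torch.randn(1024, 1))
    model = torch.nn.Linear(10, 1)
    if torch.cuda.is_available():
        model = model.cuda()

    # shard the data across all Horovod processes
    sampler = DistributedSampler(dataset, num_replicas=hvd.size(), rank=hvd.rank())
    loader = DataLoader(dataset, batch_size=32, sampler=sampler)

    optimizer = torch.optim.SGD(model.parameters(), lr=0.01 * hvd.size())
    # average gradients across workers and keep them in sync
    optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(optimizer, root_rank=0)

    loss_fn = torch.nn.MSELoss()
    for epoch in range(2):
        sampler.set_epoch(epoch)
        for x, y in loader:
            if torch.cuda.is_available():
                x, y = x.cuda(), y.cuda()
            optimizer.zero_grad()
            loss = loss_fn(model(x), y)
            loss.backward()
            optimizer.step()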

TFJob Manifest

You can use the TFJob to run TensorFlow-based workloads. See the TensorFlow documentation about the different distribution strategies and how to implement them.

  1. Prepare your code. For example, use the ParameterServerStrategy for a data-parallel method to scale up model training on multiple machines. A parameter server training cluster consists of workers and parameter servers: variables are created on the parameter servers and are read and updated by the workers in each step. A sketch of how the resulting coordinator is used follows after this list.
    import tensorflow as tf

    # variable_partitioner is optional: pass None, or a partitioner to shard large variables
    strategy = tf.distribute.experimental.ParameterServerStrategy(
        tf.distribute.cluster_resolver.TFConfigClusterResolver(),
        variable_partitioner=variable_partitioner)
    coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
    
  2. Build and push your image. Replace the placeholder with your username/project and image name.
  3. Adjust the manifest to your needs
    apiVersion: kubeflow.org/v1
    kind: TFJob
    metadata:
      name: job
    spec:
      tfReplicaSpecs:
        PS:
          replicas: 1
          template:
            metadata:
              annotations:
                sidecar.istio.io/inject: 'false'
            spec:
              containers:
              - image: REPLACE_WITH_IMAGE_URL
                name: tensorflow
                command:
                - "REPLACE_WITH_COMMAND" # python ./train.py --learning-rate=0.01
        Worker:
          replicas: REPLACE_WITH_NUMBER_OF_WORKERS
          template:
            metadata:
              annotations:
                sidecar.istio.io/inject: "false"
              labels:
                mount-eos: "true"
                inject-oauth2-token: "true"
            spec:
              containers:
              - image: REPLACE_WITH_IMAGE_URL
                name: tensorflow
                resources:
                  limits:
                    nvidia.com/gpu: REPLACE_WITH_NUMBER_OF_GPUS_PER_WORKER # e.g. 2
    
  4. Run kubectl apply -f job.yaml in a terminal, either in a Jupyter notebook or a VS Code server.
  5. Run kubectl get tfjobs to see the status of your job
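
For step 1, the strategy and coordinator created above are then used to dispatch training steps to the workers. A rough sketch of a custom training loop, assuming the strategy and coordinator objects from the snippet above and a toy Keras model and dataset as stand-ins for your own code:

    import tensorflow as tf

    def dataset_fn(input_context):
        # toy data as a stand-in for your own input pipeline
        x = tf.random.normal([1024, 10])
        y = tf.random.normal([1024, 1])
        ds = tf.data.Dataset.from_tensor_slices((x, y))
        ds = ds.shard(input_context.num_input_pipelines, input_context.input_pipeline_id)
        return ds.repeat().batch(32)

    @tf.function
    def per_worker_dataset_fn():
        return strategy.distribute_datasets_from_function(dataset_fn)

    with strategy.scope():
        # variables created here are placed on the parameter servers
        model = tf.keras.Sequential([tf.keras.layers.Dense(1)])
        optimizer = tf.keras.optimizers.SGD(0.01)

    @tf.function
    def step_fn(iterator):
        def replica_fn(x, y):
            with tf.GradientTape() as tape:
                loss = tf.reduce_mean(tf.square(model(x) - y))
            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))
            return loss
        x, y = next(iterator)
        return strategy.run(replica_fn, args=(x, y))

    per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
    per_worker_iterator = iter(per_worker_dataset)
    for _ in range(100):
        coordinator.schedule(step_fn, args=(per_worker_iterator,))
    coordinator.join()  # block until all scheduled steps have finished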

Ray

Ray Train is a scalable machine learning library for distributed training and fine-tuning. It supports many common ML frameworks. The examples below use PyTorch for demonstration; other frameworks require different changes to your code.

Ray Images

Ray provides base images for many Python versions. Remember to use the registry proxy cache to prevent rate limiting. For example, use registry.cern.ch/docker.io/rayproject/ray:2.43.0-py39-gpu to get Ray 2.43 and Python 3.9. Remember to:

  • Set the rayVersion in the rayClusterSpec to the Ray version used in your custom Docker image.
  • Set the ray-head container's image to the full name of your custom image, e.g. on registry.cern.ch.
  • Set the ray-worker container's image to the full name of your custom image, e.g. on registry.cern.ch.

RayJob Manifest

A RayJob automatically creates a RayCluster and submits a job when the cluster is ready. You can also configure RayJob to automatically delete the RayCluster once the Ray job finishes.
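
For example, the cleanup behaviour is controlled by the following fields on the RayJob spec (the values shown here are illustrative):

    spec:
      shutdownAfterJobFinishes: true # delete the RayCluster once the job has finished
      ttlSecondsAfterFinished: 60    # grace period in seconds before the cluster is deleted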

  1. Adjust your code to interact with Ray:
    import ray
    import ray.train.torch
    from ray.train import ScalingConfig
    from ray.train.torch import TorchTrainer
    from torch.utils.data import DataLoader

    def train(config):
        # `dataset`, `Model`, `num_epochs` and `run_batch` are placeholders for your own code
        train_loader = DataLoader(dataset, batch_size=int(config["batch_size"]), shuffle=True)
        train_loader = ray.train.torch.prepare_data_loader(train_loader)
        model = Model()
        model = ray.train.torch.prepare_model(model)

        for epoch in range(num_epochs):
            if ray.train.get_context().get_world_size() > 1:
                train_loader.sampler.set_epoch(epoch)  # reshuffle the shards every epoch

            for source, targets in train_loader:
                run_batch(source, targets)  # your forward/backward/optimizer step


    if __name__ == "__main__":
        ray.init()
        print(ray.cluster_resources())

        scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
        trainer = TorchTrainer(train, scaling_config=scaling_config, train_loop_config={"batch_size": batch_size})
        trainer.fit()
    
  2. Prepare your image. You have to use a Ray base image.
    FROM registry.cern.ch/docker.io/rayproject/ray:2.43.0-py312-cpu
    USER root
    RUN pip install torch torchvision
    USER ray
    
  3. Build and push your image. Replace the placeholder with your username/project and image name.
    docker login registry.cern.ch
    docker build -t registry.cern.ch/REPLACE_WITH_YOUR_USERNAME/REPLACE_WITH_YOUR_IMAGE .
    docker push registry.cern.ch/REPLACE_WITH_YOUR_USERNAME/REPLACE_WITH_YOUR_IMAGE
    
  4. Adjust the manifest to your needs
    apiVersion: ray.io/v1
    kind: RayJob
    metadata:
      name: job
    spec:
      entrypoint: REPLACE_WITH_ENTRYPOINT_COMMAND # python3 train.py --batch-size=10
      rayClusterSpec:
        rayVersion: '2.43.0' # should match the Ray version in the image of the containers!!
        headGroupSpec:
          rayStartParams:
            dashboard-host: '0.0.0.0'
          template:
            metadata:
              annotations:
                sidecar.istio.io/inject: "false"
              labels:
                mount-eos: "true"
                inject-oauth2-token: "true"
            spec:
              containers:
              - name: ray-head
                image: REPLACE_WITH_IMAGE_URL 
                ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
                resources:
                  limits:
                    cpu: "1"
                  requests:
                    cpu: "200m"
        workerGroupSpecs:
            - replicas: REPLACE_WITH_NUMBER_OF_WORKERS
              minReplicas: 1
              maxReplicas: 2
              groupName: small-group
              rayStartParams: {}
              template:
                metadata:
                  annotations:
                    sidecar.istio.io/inject: "false"
                  labels:
                    mount-eos: "true"
                    inject-oauth2-token: "true"
                spec:
                    containers:
                    - name: ray-worker
                      image: REPLACE_WITH_IMAGE_URL
                      lifecycle:
                        preStop:
                          exec:
                            command: [ "/bin/sh","-c","ray stop" ]
                      resources:
                        limits:
                          cpu: "1"
                          nvidia.com/gpu: 1
                        requests:
                          cpu: "200m"    
      submitterPodTemplate:
          metadata:
            annotations:
                sidecar.istio.io/inject: "false"
          spec:
            restartPolicy: Never
            containers:
              - name: rayjob-submitter
                image: registry.cern.ch/docker.io/rayproject/ray:2.43.0 # match the rayVersion of the cluster
    
  5. Run kubectl apply -f job.yaml in a terminal, either in a Jupyter notebook or a VS Code server
  6. Run kubectl get rayjobs to see the status of your job
    NAME                  JOB STATUS   DEPLOYMENT STATUS   START TIME             END TIME               AGE
    job                                Initializing        2025-03-03T11:35:32Z                          11s
    
  7. Run kubectl get pods to see the head and worker pods. After a while, the submitter pod will be created
    NAME                                                      READY   STATUS         RESTARTS   AGE
    job-raycluster-7gx7p-head-hrsjk                           2/2     Running        0          104s
    job-raycluster-7gx7p-worker-small-group-dgzhp             2/2     Running        0          104s
    job-m44ns                                                 1/1     Running        0          4m17s
    
  8. Run kubectl logs -f REPLACE_WITH_SUBMITTER_POD_NAME, e.g. kubectl logs -f job-m44ns, to follow the logs of your job

Connecting to RayCluster

If you want to leverage Ray for your distributed training but also want the ability to iterate on your code, you can set up a RayCluster beforehand and connect to it from within an interactive session, e.g. inside a Jupyter notebook or a VS Code development environment.

  1. Adjust the manifest to your needs and apply it with kubectl apply -f raycluster.yaml.
    apiVersion: ray.io/v1
    kind: RayCluster
    metadata:
      name: kubeflow-raycluster # the head service will be called <name>-head-svc
    spec:
      rayVersion: '2.23.0'
      headGroupSpec:
        rayStartParams:
          num-cpus: '1'
        template:
          metadata:
            labels:
              sidecar.istio.io/inject: "false"
          spec:
            serviceAccountName: default-editor
            containers:
            - name: ray-head
              image: REPLACE_WITH_RAY_IMAGE # e.g. registry.cern.ch/ngt-usecases/ray-torch-2.43.py312
              lifecycle:
                preStop:
                  exec:
                    command: ["/bin/sh","-c","ray stop"]
              volumeMounts:
              - mountPath: /tmp/ray
                name: ray-logs
              resources:
                limits:
                  cpu: "1"
                  memory: "2G"
                requests:
                  cpu: "100m"
                  memory: "2G"
            volumes:
            - name: ray-logs
              emptyDir: {}
      workerGroupSpecs:
      - replicas: 10
        minReplicas: 1
        maxReplicas: 10
        groupName: small-group
        rayStartParams:
          num-cpus: '1'
          node-manager-port: '6380'
          object-manager-port: '6381'
          runtime-env-agent-port: '6382'
          dashboard-agent-grpc-port: '6383'
          dashboard-agent-listen-port: '52365'
          metrics-export-port: '8080'
          max-worker-port: '10012'
        template:
          metadata:
            labels:
              sidecar.istio.io/inject: "false"
          spec:
            serviceAccountName: default-editor
            containers:
            - name: ray-worker
              image: REPLACE_WITH_RAY_IMAGE # e.g. registry.cern.ch/ngt-usecases/ray-torch-2.43.py312
              lifecycle:
                preStop:
                  exec:
                    command: ["/bin/sh","-c","ray stop"]
              volumeMounts:
              - mountPath: /tmp/ray
                name: ray-logs
              resources:
                limits:
                  cpu: "1"
                  memory: "500Mi"
                requests:
                  cpu: "300m"
                  memory: "500Mi"
            volumes:
            - name: ray-logs
              emptyDir: {}
    
  2. Run kubectl get rayclusters to check the cluster status
    NAME                  DESIRED WORKERS   AVAILABLE WORKERS   CPUS   MEMORY   GPUS   STATUS   AGE
    kubeflow-raycluster   1                 1                   400m   3G       0      ready    2m9s
    
  3. Once your cluster is running, you can submit your workload from either a notebook or VS Code.
    import ray
    from ray.train import ScalingConfig
    from ray.train.torch import TorchTrainer

    # connect to the head node of the RayCluster via the Ray Client port
    ray.init(address="ray://kubeflow-raycluster-head-svc:10001")
    print(ray.cluster_resources())

    # `train` and `batch_size` are defined as in the RayJob example above
    NUM_WORKERS = 2
    scaling_config = ScalingConfig(num_workers=NUM_WORKERS, use_gpu=True)
    trainer = TorchTrainer(train, scaling_config=scaling_config, train_loop_config={"batch_size": batch_size})
    trainer.fit()
    
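When you are done iterating, you can disconnect from the cluster and remove it again; a sketch, assuming the cluster name kubeflow-raycluster from the manifest above:

    import ray
    ray.shutdown()  # disconnect the Ray Client session opened by ray.init(address="ray://...")

and, in a terminal, kubectl delete raycluster kubeflow-raycluster (or kubectl delete -f raycluster.yaml, assuming that is the file you applied).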

Warning

Make sure to use the same Ray versions and the same Python versions in your RayCluster and your local environment. Otherwise you might encounter errors like:

2024-11-28 14:35:57,994 WARNING utils.py:1591 -- Python patch version mismatch: The cluster was started with:
Ray: 2.23.0
Python: 3.11.9
This process on Ray Client was started with:
Ray: 2.23.0
Python: 3.11.10
RuntimeError: Python minor versions differ between client and server: client is 3.9.18, server is 3.11.9

For example, if you use Ray v2.43.0 and Python v3.12 in your RayCluster, e.g. by using the image registry.cern.ch/docker.io/rayproject/ray:2.43.0-py312-cpu, you must also use the matching Ray client via pip install "ray[client]==2.43.0" and Python v3.12 in your local environment, e.g. by basing your notebook image on the python:3.12 image.