Support pinning to local rank GPU index in Spark estimators by thinkall · Pull Request #3737 · horovod/horovod · GitHub

Support pinning to local rank GPU index in Spark estimators #3737


Merged: 10 commits merged into horovod:master on Jan 31, 2023

Conversation

@thinkall (Contributor) commented Oct 11, 2022

Signed-off-by: Li Jiang <bnujli@gmail.com>

Checklist before submitting

  • Did you read the contributor guide?
  • Did you update the docs?
  • Did you write any tests to validate this change?
  • Did you update the CHANGELOG, if this change affects users?

Description

When using Spark estimators (torch/lightning/keras) on GPU clusters, get_available_devices will be called

def get_available_devices():
    if 'gpu' not in _info.resources:
        return []
    return _info.resources['gpu'].addresses

in _get_assigned_gpu_or_default.

def _get_assigned_gpu_or_default(default):
    from horovod.spark.task import get_available_devices
    available_devices = get_available_devices()
    if available_devices:
        # if GPU-aware scheduling is available, pin to the assigned GPU index
        return int(available_devices[0])
    else:
        # pin to default GPU index (local rank)
        return default

And get_available_devices obtains the devices by calling _get_resources.

def _get_resources(self):
    if version.parse(pyspark.__version__) >= version.parse('3.0.0'):
        task_context = pyspark.TaskContext.get()
        if task_context:
            return task_context.resources()
        else:
            # task_context is None when not run on Spark worker
            # this only happens while running test_spark.test_task_fn_run_gloo_exec()
            print("Not running inside Spark worker, no resources available")
    return dict()
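For context, on Spark 3.x the task context exposes assigned resources as a dict of ResourceInformation objects. A minimal sketch (not part of this PR) of reading the assigned GPU addresses directly with pyspark:

from pyspark import TaskContext

def assigned_gpu_addresses():
    # TaskContext.get() returns None outside a Spark task;
    # resources() maps resource name -> ResourceInformation, whose
    # .addresses holds the GPU indices as strings, e.g. ["0", "1"]
    ctx = TaskContext.get()
    if ctx is None or 'gpu' not in ctx.resources():
        return []
    return ctx.resources()['gpu'].addresses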

This works well on clusters with a single GPU per node. However, on clusters with multiple GPUs per node, a wrong GPU index may be returned.

For Spark clusters, getGpusResources.sh is usually used to discover GPU resources.
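For reference, a discovery script such as the getGpusResources.sh example shipped with Spark simply prints a JSON object naming the resource and its addresses. A hypothetical Python equivalent (assuming nvidia-smi is available on the host) could look like:

import json
import subprocess

def discover_gpus():
    # list the GPU indices visible on this host (assumes nvidia-smi is installed)
    out = subprocess.check_output(
        ["nvidia-smi", "--query-gpu=index", "--format=csv,noheader"], text=True)
    addresses = [line.strip() for line in out.splitlines() if line.strip()]
    return {"name": "gpu", "addresses": addresses}

if __name__ == "__main__":
    # Spark expects the discovery script to print JSON like
    # {"name": "gpu", "addresses": ["0", "1", ...]}
    print(json.dumps(discover_gpus()))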

For clusters with multiple GPUs per node, if multiple GPUs are assigned to one executor, the resources will look like this:

[image: executor resources showing multiple GPU addresses]

In this case, _get_resources will return {"name": "gpu", "addresses": ["0", "1"]}, so _get_assigned_gpu_or_default will always return 0, which is then pinned by every Horovod process. As a result, only one GPU (always index 0) on each node can be used by the Horovod Spark estimators.

If one GPU is assigned to one executor, the resources will look like this:

[image: executor resources showing a single GPU address]

In this case, still only one GPU (always index 0) on each node can be used by the Horovod Spark estimators.

Only when each node has multiple executors and each executor has been assigned a different GPU will all the GPUs be available to the Horovod Spark estimators, like below:

[image: executor resources with each executor assigned a different GPU]

However, in this case, the GPUs must be configured in EXCLUSIVE_PROCESS mode. Unfortunately, Spark clusters with GPUs usually also need spark-rapids, which means that if we configure GPUs in EXCLUSIVE_PROCESS mode, we can use only spark-rapids or Horovod, but not both.

Solving this issue, so that all GPUs on all nodes can be used and both spark-rapids and Horovod can be leveraged, therefore seems worthwhile.

In fact, it is quite easy to handle: adding an environment variable USE_DEFAULT_GPU_INDEX to manually pin to the default GPU index (local rank) does the trick.

import os
def get_available_devices():
    if 'gpu' not in _info.resources or os.getenv('USE_DEFAULT_GPU_INDEX') == '1':
        return []
    return _info.resources['gpu'].addresses

With this small change, users can manually pin to the default GPU index (local rank), just like they do for Horovod runners.
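A hedged sketch of how a user might enable this from the Spark side; spark.executorEnv.* is the standard Spark config for setting executor environment variables, and the variable name here follows this description (it was later renamed, see the review below):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         # set the proposed env var on the executors so the estimator falls back
         # to pinning each process to its local rank
         .config("spark.executorEnv.USE_DEFAULT_GPU_INDEX", "1")
         .getOrCreate())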

Review process to land

  1. All tests and other checks must succeed.
  2. At least one member of the technical steering committee must review and approve.
  3. If any member of the technical steering committee requests changes, they must be addressed.

@github-actions (bot) commented Oct 11, 2022

Unit Test Results

1 155 files ±0 · 1 155 suites ±0 · 12h 57m 8s ⏱️ +4m 28s
841 tests ±0 · 788 ✔️ ±0 · 53 💤 ±0 · 0 ±0
23 809 runs ±0 · 17 023 ✔️ ±0 · 6 786 💤 ±0 · 0 ±0

Results for commit 6f8491b. ± Comparison against base commit bfe96f8.

♻️ This comment has been updated with latest results.

@github-actions (bot) commented Oct 11, 2022

Unit Test Results (with flaky tests)

1 282 files -13 · 1 282 suites -13 · 13h 30m 55s ⏱️ -6s
841 tests ±0 · 787 ✔️ ±0 · 53 💤 ±0 · 1 ±0
26 600 runs -169 · 18 740 ✔️ -161 · 7 859 💤 -7 · 1 -1

For more details on these failures, see this check.

Results for commit 6f8491b. ± Comparison against base commit bfe96f8.

♻️ This comment has been updated with latest results.

@Tixxx (Collaborator) commented Nov 16, 2022

in general LGTM, just a minor comment

@thinkall requested a review from Tixxx on November 17, 2022 08:07
@thinkall force-pushed the spark-estimator-multi-gpus-per-node branch 2 times, most recently from 0b451d3 to c84b240 on November 17, 2022 08:17
@EnricoMi (Collaborator)

I'd say the problem is here:

 if available_devices: 
     # if GPU-aware scheduling is available, pin to the assigned GPU index 
     return int(available_devices[0]) 

This should read

 if available_devices:
    # if GPU-aware scheduling is available, pin to one of the assigned GPUs
    if len(available_devices) >= hvd.local_size():
        # pin to GPU based on local rank index
        return int(available_devices[hvd.local_rank()])
    else:
        # pin to first available GPU
        return int(available_devices[0])

@EnricoMi (Collaborator)

With

--conf spark.executor.resource.gpu.amount=2

your executors get two GPUs assigned.

With

--conf spark.task.resource.gpu.amount=1

each task running on those multi-GPU executors gets a single GPU assigned.

Shouldn't available_devices look like ['0'] for one task and ['1'] for the other task on the same executor? Shouldn't available_devices always have a length equal to spark.task.resource.gpu.amount, even if spark.task.resource.gpu.amount is larger than one?
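For illustration, a minimal PySpark sketch of the configuration being discussed (values and the discovery script path are examples, not from this PR):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         # each executor is assigned two GPUs ...
         .config("spark.executor.resource.gpu.amount", "2")
         # ... and each task running on it gets one of them
         .config("spark.task.resource.gpu.amount", "1")
         # placeholder path to a GPU discovery script
         .config("spark.executor.resource.gpu.discoveryScript", "/path/to/getGpusResources.sh")
         .getOrCreate())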

@thinkall (Contributor, Author)

This works only if GPUs are configured in EXCLUSIVE_PROCESS mode. Unfortunately, in many clusters, GPUs are too expensive to dedicate to a single process.

With

--conf spark.executor.resource.gpu.amount=2

your executors get two GPUs assigned.

With

--conf spark.task.resource.gpu.amount=1

each task running on those multi-GPU executors gets a single GPU assigned.

Shouldn't available_devices look like ['0'] for one task and ['1'] for the other task on the same executor? Shouldn't available_devices always have a length equal to spark.task.resource.gpu.amount, even if spark.task.resource.gpu.amount is larger than one?

@thinkall (Contributor, Author)

I'd say the problem is here:

 if available_devices: 
     # if GPU-aware scheduling is available, pin to the assigned GPU index 
     return int(available_devices[0]) 

This should read

 if available_devices:
    # if GPU-aware scheduling is available, pin to one of the assigned GPUs
    if len(available_devices) >= hvd.local_size():
        # pin to GPU based on local rank index
        return int(available_devices[hvd.local_rank()])
    else:
        # pin to first available GPU
        return int(available_devices[0])

I think this solves most issues. In the beginning, I was wondering why Horovod didn't consider that available_devices could hold more than one device, which is why I suggested the env var in this PR. Now it looks like combining the two updates makes Horovod work better.

Say we have 3 VMs, each with 4 GPUs. There are three kinds of Spark settings.

Case 1. Each VM has 1 Spark executor, and all 4 GPUs are assigned to this executor.

Now, we want to start 4 Horovod processes on this VM since it has 4 GPUs.

if len(available_devices) >= hvd.local_size():
    # pin to GPU based on local rank index
    return int(available_devices[hvd.local_rank()])

This works, as available_devices will always be like ['0', '1', '2', '3'].

Case 2. Each VM has 4 Spark executors, and each executor has been assigned 1 GPU.

Now, we still want to start 4 Horovod processes on each VM.

else:
    # pin to first available GPU
    return int(available_devices[0])

This will work.

Case 3. Each VM has 1 Spark executor, but only 1 or even 0 GPUs are assigned to it. (In this case, the configuration should be improved on the Spark side, but it can happen.)

Now, we still want to start 4 Horovod processes on each VM.

os.getenv('HOROVOD_USE_DEFAULT_GPU_INDEX') == '1'

This will do the trick.

@thinkall (Contributor, Author) commented Nov 18, 2022

I'd say the problem is here:

 if available_devices: 
     # if GPU-aware scheduling is available, pin to the assigned GPU index 
     return int(available_devices[0]) 

This should read

 if available_devices:
    # if GPU-aware scheduling is available, pin to one of the assigned GPUs
    if len(available_devices) >= hvd.local_size():
        # pin to GPU based on local rank index
        return int(available_devices[hvd.local_rank()])
    else:
        # pin to first available GPU
        return int(available_devices[0])

One thing I'd like to mention is that to use hvd.local_size() in _get_assigned_gpu_or_default, we'll need to update all the related code, as in the figure below.

[image: related code locations that need updating to pass hvd through]

And _get_assigned_gpu_or_default will look like this:

def _get_assigned_gpu_or_default(default, hvd=None):
    from horovod.spark.task import get_available_devices
    available_devices = get_available_devices()
    if available_devices:
        if hvd and len(available_devices) >= hvd.local_size():
            # if GPU-aware scheduling is available, pin to one of the assigned GPUs
            # pin to GPU based on local rank index
            return int(available_devices[hvd.local_rank()])
        else:
            # pin to first assigned GPU
            return int(available_devices[0])
    else:
        # pin to default GPU index (local rank)
        return default
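For context, a hedged sketch of how a remote training function might call this helper to pin a GPU (illustrative only, not the exact Horovod code):

import torch
import horovod.torch as hvd

hvd.init()
if torch.cuda.is_available():
    # pin this process to the GPU index chosen by the helper above
    torch.cuda.set_device(_get_assigned_gpu_or_default(hvd.local_rank(), hvd=hvd))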

@thinkall requested review from EnricoMi and removed request for Tixxx on November 18, 2022 04:01
@EnricoMi (Collaborator) commented Nov 18, 2022

Case 1 and 2 are understood. Case 3 looks like two cases:

Case 3.1: There is one GPU assigned to the task but you want the local rank GPU to be used, which could be a completely different GPU (not visible in Spark resources).
Case 3.2: There is no GPU assigned and you want the local rank GPU to be used.

Case 3.2 is the current behaviour, right?
Case 3.1 looks fishy, see #3737 (comment).

@thinkall (Contributor, Author)

Case 1 and 2 are understood. Case 3 looks like two cases:

Case 3.1: There is one GPU assigned to the task but you want the local rank GPU to be used, which could be a completely different GPU (not visible in Spark resources). Case 3.2: There is no GPU assigned and you want the local rank GPU to be used.

Case 3.2 is the current behaviour, right? Case 3.1 looks fishy, see #3737 (comment).

Yes, Case 3.2 is the current behaviour. Case 3.1 could happen too; I agree it should be handled first on the Spark cluster side. The good thing is that our solution for Case 3.2 would also benefit Case 3.1.

@thinkall requested a review from EnricoMi on November 20, 2022 05:02
@EnricoMi (Collaborator)

I am still not convinced that a wrong GPU configuration provided to Horovod on Spark should be worked around by Horovod.

@thinkall (Contributor, Author) commented Nov 21, 2022

I am still not convinced that a wrong GPU configuration provided to Horovod on Spark should be worked around by Horovod.

I agree with you, @EnricoMi, Horovod has no responsibility to work around others' issues. In this PR, the changes to _get_assigned_gpu_or_default are needed to improve Horovod, so I guess your concerns are mainly about adding HOROVOD_USE_DEFAULT_GPU_INDEX. In my view, providing this option makes Horovod more robust and flexible. Overall, I believe this one-line code change is worth it. But it's your call, so if you still insist, I'll remove the env var and keep the other changes.

What do you think?

Thanks :-)

@thinkall (Contributor, Author) commented Dec 8, 2022

Hi @EnricoMi , may I ask for your decision again? Thanks.

@thinkall requested review from Tixxx and EnricoMi and removed request for EnricoMi and Tixxx on December 30, 2022 05:16
@EnricoMi force-pushed the spark-estimator-multi-gpus-per-node branch from 7e338b9 to 3933d69 on January 12, 2023 07:27
@EnricoMi (Collaborator) commented Jan 12, 2023

@thinkall sorry for the long delay. I have reviewed the PR and made the following changes:

  • reverted logic changes in _get_assigned_gpu_or_default
  • renamed HOROVOD_USE_DEFAULT_GPU_INDEX to HOROVOD_SPARK_USE_LOCAL_RANK_GPU_INDEX
  • renamed _get_assigned_gpu_or_default to _get_assigned_gpu_or_local_rank
  • moved usage of HOROVOD_SPARK_USE_LOCAL_RANK_GPU_INDEX into _get_assigned_gpu_or_local_rank
  • updated docs

Let me know what you think.
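A hedged sketch (an assumption, not the verbatim merged code) of how the renamed helper could look after these changes, with the env var check moved into the function:

import os

def _get_assigned_gpu_or_local_rank(local_rank):
    from horovod.spark.task import get_available_devices
    if os.getenv('HOROVOD_SPARK_USE_LOCAL_RANK_GPU_INDEX', '0') == '1':
        # explicitly requested: pin to the local rank GPU index
        return local_rank
    available_devices = get_available_devices()
    if available_devices:
        # GPU-aware scheduling is available: pin to the first assigned GPU
        return int(available_devices[0])
    # no GPUs assigned by Spark: fall back to the local rank index
    return local_rank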

@thinkall (Contributor, Author)

@thinkall sorry for the long delay. I have reviewed the PR and made the following changes:

  • reverted logic changes in _get_assigned_gpu_or_default
  • renamed HOROVOD_USE_DEFAULT_GPU_INDEX to HOROVOD_SPARK_USE_LOCAL_RANK_GPU_INDEX
  • renamed _get_assigned_gpu_or_default to _get_assigned_gpu_or_local_rank
  • moved usage of HOROVOD_SPARK_USE_LOCAL_RANK_GPU_INDEX into _get_assigned_gpu_or_local_rank
  • updated docs

Let me know what you think.

Hi @EnricoMi, no problem about the delay :-) Your changes sound great to me! Thanks.

thinkall and others added 10 commits January 13, 2023 09:41
…local rank)

Signed-off-by: Li Jiang <bnujli@gmail.com>
Signed-off-by: Li Jiang <bnujli@gmail.com>
Signed-off-by: Li Jiang <bnujli@gmail.com>
Signed-off-by: Li Jiang <bnujli@gmail.com>
…ling it"

This reverts commit 75b59c6.

Signed-off-by: Enrico Minack <github@enrico.minack.dev>
…ling it"

This reverts commit d9584b2.

Signed-off-by: Enrico Minack <github@enrico.minack.dev>
…method

Signed-off-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Enrico Minack <github@enrico.minack.dev>
@thinkall force-pushed the spark-estimator-multi-gpus-per-node branch from dfe245c to 6f8491b on January 13, 2023 01:42
@thinkall (Contributor, Author) commented Jan 13, 2023

Fixed DCO failure and rebased on master. @EnricoMi

@EnricoMi changed the title from "feat: support manually pin to default GPU index in spark estimators" to "Support pinning to local rank GPU index in Spark estimators" on Jan 31, 2023
@EnricoMi merged commit ede98b8 into horovod:master on Jan 31, 2023
@thinkall deleted the spark-estimator-multi-gpus-per-node branch on February 2, 2023 07:05