From 70c232f85a9e83421a4d9ca95e6384364271f2bc Mon Sep 17 00:00:00 2001
From: youkaichao
Date: Mon, 8 Jul 2024 21:31:44 -0700
Subject: [PATCH] [core][distributed] fix ray worker rank assignment (#6235)

---
 vllm/executor/ray_gpu_executor.py | 32 +++++++++++++++++++++++++++++++++-----
 1 file changed, 27 insertions(+), 5 deletions(-)

diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py
index bc7ef9cc..6e13264a 100644
--- a/vllm/executor/ray_gpu_executor.py
+++ b/vllm/executor/ray_gpu_executor.py
@@ -134,11 +134,32 @@ class RayGPUExecutor(DistributedGPUExecutor):
         worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids",
                                                      use_dummy_driver=True)
 
-        node_workers = defaultdict(list)
-        node_gpus = defaultdict(list)
+        # the order in `worker_node_and_gpu_ids` does not necessarily match
+        # the machine boundaries. We need to make sure that workers in the
+        # same node are assigned consecutive ranks.
+        # examples:
+        # [('852a09a13c7503ef126d7c828454c741494b1be33a8627a5206604d9', [0]), ('dfaad7adfdae57a694cc74490db45bd112c9f31243523e43ddc2e7f0', [0]), ('dfaad7adfdae57a694cc74490db45bd112c9f31243523e43ddc2e7f0', [1]), ('dfaad7adfdae57a694cc74490db45bd112c9f31243523e43ddc2e7f0', [2]), ('dfaad7adfdae57a694cc74490db45bd112c9f31243523e43ddc2e7f0', [3]), ('852a09a13c7503ef126d7c828454c741494b1be33a8627a5206604d9', [1]), ('852a09a13c7503ef126d7c828454c741494b1be33a8627a5206604d9', [2]), ('852a09a13c7503ef126d7c828454c741494b1be33a8627a5206604d9', [3])] # noqa
 
-        for i, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids):
-            node_workers[node_id].append(i)
+        # initialize worker ranks with -1 (unassigned)
+        worker_ranks = [-1 for x in worker_node_and_gpu_ids]
+        current_rank = 0
+        while -1 in worker_ranks:
+            # whenever we find an unassigned worker, find the node
+            index = worker_ranks.index(-1)
+            current_node_id = worker_node_and_gpu_ids[index][0]
+            # assign ranks to all workers in the same node
+            for i, (node_id, _) in enumerate(worker_node_and_gpu_ids):
+                if node_id == current_node_id:
+                    worker_ranks[i] = current_rank
+                    current_rank += 1
+        # with the above example, worker_ranks will be [0, 4, 5, 6, 7, 1, 2, 3]
+
+        node_workers = defaultdict(list)  # node id -> list of worker ranks
+        node_gpus = defaultdict(list)  # node id -> list of gpu ids
+
+        for worker_rank, (node_id, gpu_ids) in zip(worker_ranks,
+                                                   worker_node_and_gpu_ids):
+            node_workers[node_id].append(worker_rank)
             # `gpu_ids` can be a list of strings or integers.
             # convert them to integers for consistency.
             # NOTE: gpu_ids can be larger than 9 (e.g. 16 GPUs),
@@ -184,7 +205,8 @@ class RayGPUExecutor(DistributedGPUExecutor):
                 local_rank=node_workers[node_id].index(rank),
                 rank=rank,
                 distributed_init_method=distributed_init_method,
-            ) for rank, (node_id, _) in enumerate(worker_node_and_gpu_ids)
+            ) for rank, (node_id,
+                         _) in zip(worker_ranks, worker_node_and_gpu_ids)
         ]
         self._run_workers("init_worker", all_kwargs=init_worker_all_kwargs)
 
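Note: the rank-assignment pass added above can be read in isolation from the rest of the
executor. Below is a minimal, self-contained Python sketch of the same loop, using
hypothetical short node IDs ("node-a", "node-b") in place of the real Ray node IDs shown
in the patch comment; it is an illustration of the technique, not vLLM code.

from collections import defaultdict

# Hypothetical sample: 8 workers on two nodes, returned in an interleaved order,
# mirroring the example in the patch comment.
worker_node_and_gpu_ids = [
    ("node-a", [0]), ("node-b", [0]), ("node-b", [1]), ("node-b", [2]),
    ("node-b", [3]), ("node-a", [1]), ("node-a", [2]), ("node-a", [3]),
]

# Same algorithm as the patch: start every worker at -1 (unassigned) and, whenever an
# unassigned worker is found, hand out consecutive ranks to all workers on its node.
worker_ranks = [-1 for _ in worker_node_and_gpu_ids]
current_rank = 0
while -1 in worker_ranks:
    index = worker_ranks.index(-1)
    current_node_id = worker_node_and_gpu_ids[index][0]
    for i, (node_id, _) in enumerate(worker_node_and_gpu_ids):
        if node_id == current_node_id:
            worker_ranks[i] = current_rank
            current_rank += 1

print(worker_ranks)  # [0, 4, 5, 6, 7, 1, 2, 3]

# Group ranks by node, as node_workers does in the patch; each node now owns a
# consecutive block of ranks, so local_rank = node_workers[node_id].index(rank)
# is contiguous per node.
node_workers = defaultdict(list)
for rank, (node_id, _) in zip(worker_ranks, worker_node_and_gpu_ids):
    node_workers[node_id].append(rank)

print(dict(node_workers))  # {'node-a': [0, 1, 2, 3], 'node-b': [4, 5, 6, 7]}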