diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py
index bc7ef9cc..6e13264a 100644
--- a/vllm/executor/ray_gpu_executor.py
+++ b/vllm/executor/ray_gpu_executor.py
@@ -134,11 +134,32 @@ class RayGPUExecutor(DistributedGPUExecutor):
         worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids",
                                                     use_dummy_driver=True)
 
-        node_workers = defaultdict(list)
-        node_gpus = defaultdict(list)
+        # the order in `worker_node_and_gpu_ids` does not necessarily match
+        # the machine boundaries. We need to make sure that workers in the
+        # same node are assigned consecutive ranks.
+        # examples:
+        # [('852a09a13c7503ef126d7c828454c741494b1be33a8627a5206604d9', [0]), ('dfaad7adfdae57a694cc74490db45bd112c9f31243523e43ddc2e7f0', [0]), ('dfaad7adfdae57a694cc74490db45bd112c9f31243523e43ddc2e7f0', [1]), ('dfaad7adfdae57a694cc74490db45bd112c9f31243523e43ddc2e7f0', [2]), ('dfaad7adfdae57a694cc74490db45bd112c9f31243523e43ddc2e7f0', [3]), ('852a09a13c7503ef126d7c828454c741494b1be33a8627a5206604d9', [1]), ('852a09a13c7503ef126d7c828454c741494b1be33a8627a5206604d9', [2]), ('852a09a13c7503ef126d7c828454c741494b1be33a8627a5206604d9', [3])] # noqa
 
-        for i, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids):
-            node_workers[node_id].append(i)
+        # initialize worker ranks with -1 (unassigned)
+        worker_ranks = [-1 for x in worker_node_and_gpu_ids]
+        current_rank = 0
+        while -1 in worker_ranks:
+            # whenever we find an unassigned worker, find the node
+            index = worker_ranks.index(-1)
+            current_node_id = worker_node_and_gpu_ids[index][0]
+            # assign ranks to all workers in the same node
+            for i, (node_id, _) in enumerate(worker_node_and_gpu_ids):
+                if node_id == current_node_id:
+                    worker_ranks[i] = current_rank
+                    current_rank += 1
+        # with the above example, worker_ranks will be [0, 4, 5, 6, 7, 1, 2, 3]
+
+        node_workers = defaultdict(list)  # node id -> list of worker ranks
+        node_gpus = defaultdict(list)  # node id -> list of gpu ids
+
+        for worker_rank, (node_id, gpu_ids) in zip(worker_ranks,
+                                                   worker_node_and_gpu_ids):
+            node_workers[node_id].append(worker_rank)
             # `gpu_ids` can be a list of strings or integers.
             # convert them to integers for consistency.
             # NOTE: gpu_ids can be larger than 9 (e.g. 16 GPUs),
@@ -184,7 +205,8 @@ class RayGPUExecutor(DistributedGPUExecutor):
                 local_rank=node_workers[node_id].index(rank),
                 rank=rank,
                 distributed_init_method=distributed_init_method,
-            ) for rank, (node_id, _) in enumerate(worker_node_and_gpu_ids)
+            ) for rank, (node_id,
+                         _) in zip(worker_ranks, worker_node_and_gpu_ids)
         ]
         self._run_workers("init_worker", all_kwargs=init_worker_all_kwargs)
 
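
For reference, here is a minimal standalone sketch of the rank-assignment pass from the first hunk. The `worker_node_and_gpu_ids` list below is a stub standing in for the output of `_run_workers("get_node_and_gpu_ids", use_dummy_driver=True)`, with the two node-ID hashes from the example comment shortened for readability:

```python
from collections import defaultdict

# Stub for the result of _run_workers("get_node_and_gpu_ids", ...):
# Ray may return workers from different nodes interleaved, as in the
# example comment in the diff (node IDs shortened, hypothetical values).
worker_node_and_gpu_ids = [
    ("852a...", [0]),
    ("dfaa...", [0]),
    ("dfaa...", [1]),
    ("dfaa...", [2]),
    ("dfaa...", [3]),
    ("852a...", [1]),
    ("852a...", [2]),
    ("852a...", [3]),
]

# Same pass as the diff: start with every worker unassigned (-1) ...
worker_ranks = [-1 for _ in worker_node_and_gpu_ids]
current_rank = 0
while -1 in worker_ranks:
    # ... pick the first unassigned worker, look up its node ...
    index = worker_ranks.index(-1)
    current_node_id = worker_node_and_gpu_ids[index][0]
    # ... and hand out consecutive ranks to every worker on that node.
    for i, (node_id, _) in enumerate(worker_node_and_gpu_ids):
        if node_id == current_node_id:
            worker_ranks[i] = current_rank
            current_rank += 1

print(worker_ranks)  # [0, 4, 5, 6, 7, 1, 2, 3]

# Grouping by node now yields one consecutive block of ranks per machine,
# so node_workers[node_id].index(rank) (used for local_rank in the second
# hunk) recovers a dense 0..n-1 local rank on every node.
node_workers = defaultdict(list)  # node id -> list of worker ranks
for rank, (node_id, _) in zip(worker_ranks, worker_node_and_gpu_ids):
    node_workers[node_id].append(rank)

print(dict(node_workers))
# {'852a...': [0, 1, 2, 3], 'dfaa...': [4, 5, 6, 7]}
```

The diff's comment only states the invariant being enforced (workers on the same node get consecutive ranks); a plausible motivation, not spelled out in the patch itself, is that downstream parallel groups are formed from consecutive ranks, so the interleaved ordering Ray can return would otherwise let a group straddle machine boundaries.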