broadcast tokenizer to every rank as well

This commit is contained in:
ferdinand.mom 2024-12-03 14:20:44 +00:00
parent 00ddbd9d2e
commit 09dfd1676f
3 changed files with 18 additions and 4 deletions

View File

@ -56,7 +56,7 @@ def init_model_with_materialized_weights(model, model_config, hf_hub_safetensors
initialization_manager = InitializationManager(model, model_config)
layer_names = initialization_manager.get_layer_names_in_sft_format()
print(f"Rank {pgm.process_group_manager.pp_rank} responsible for layers: {len(layer_names)}")
print(f"Rank {pgm.process_group_manager.pp_rank} responsible for {len(layer_names)} layers")
if len(layer_names) == 0:
raise Exception("Some ranks has no layers. There are too many ranks and not enough layers to distribute.")

View File

@ -1,14 +1,16 @@
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader, DistributedSampler
import numpy as np
from functools import partial
from datasets import Features, Sequence, Value, load_dataset
from transformers import AutoTokenizer
from picotron.utils import print
import picotron.process_group_manager as pgm
class MicroBatchDataLoader(DataLoader):
def __init__(self, micro_batch_size, seq_length, dataset_name, tokenizer_name, num_workers, num_proc, grad_acc_steps, split="train", num_samples=None):
def __init__(self, micro_batch_size, seq_length, dataset_name, tokenizer_name, num_workers, num_proc, grad_acc_steps, device, split="train", num_samples=None):
self.micro_batch_size = micro_batch_size
self.seq_length = seq_length
@ -17,9 +19,20 @@ class MicroBatchDataLoader(DataLoader):
self.num_global_micro_batches = self.global_batch_size // self.micro_batch_size
self.seq_length_per_gpu = seq_length // pgm.process_group_manager.cp_world_size
self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)
self.dataset = load_dataset(dataset_name, split=split)
if pgm.process_group_manager.global_rank == 0:
print(f"rank: {pgm.process_group_manager.global_rank}: Creating tokenizer")
self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)
objects = [self.tokenizer]
else:
print(f"rank: {pgm.process_group_manager.global_rank}: Initialized tokenizer to None")
objects = [None]
print(f"rank: {pgm.process_group_manager.global_rank}: Broadcasting tokenizer to all ranks", is_print_rank=pgm.process_group_manager.global_rank==0)
dist.broadcast_object_list(objects, src=0, device=device)
self.tokenizer = objects[0]
if num_samples:
self.dataset = self.dataset.select(range(min(num_samples, len(self.dataset))))

View File

@ -108,6 +108,7 @@ if __name__ == "__main__":
dataset_name=config["dataset"]["name"],
tokenizer_name=config["model"]["name"],
grad_acc_steps=config["training"]["gradient_accumulation_steps"],
device=device,
num_workers=config["dataset"]["num_workers"],
num_proc=config["dataset"]["num_proc"],
num_samples=config["training"]["num_samples"]