diff --git a/picotron/checkpoint.py b/picotron/checkpoint.py index 87d8851..9481d88 100644 --- a/picotron/checkpoint.py +++ b/picotron/checkpoint.py @@ -56,7 +56,7 @@ def init_model_with_materialized_weights(model, model_config, hf_hub_safetensors initialization_manager = InitializationManager(model, model_config) layer_names = initialization_manager.get_layer_names_in_sft_format() - print(f"Rank {pgm.process_group_manager.pp_rank} responsible for layers: {len(layer_names)}") + print(f"Rank {pgm.process_group_manager.pp_rank} responsible for {len(layer_names)} layers") if len(layer_names) == 0: raise Exception("Some ranks has no layers. There are too many ranks and not enough layers to distribute.") diff --git a/picotron/data.py b/picotron/data.py index c6d4392..959fa6d 100644 --- a/picotron/data.py +++ b/picotron/data.py @@ -1,14 +1,16 @@ import torch +import torch.distributed as dist from torch.utils.data import DataLoader, DistributedSampler import numpy as np from functools import partial from datasets import Features, Sequence, Value, load_dataset from transformers import AutoTokenizer +from picotron.utils import print import picotron.process_group_manager as pgm class MicroBatchDataLoader(DataLoader): - def __init__(self, micro_batch_size, seq_length, dataset_name, tokenizer_name, num_workers, num_proc, grad_acc_steps, split="train", num_samples=None): + def __init__(self, micro_batch_size, seq_length, dataset_name, tokenizer_name, num_workers, num_proc, grad_acc_steps, device, split="train", num_samples=None): self.micro_batch_size = micro_batch_size self.seq_length = seq_length @@ -17,9 +19,20 @@ class MicroBatchDataLoader(DataLoader): self.num_global_micro_batches = self.global_batch_size // self.micro_batch_size self.seq_length_per_gpu = seq_length // pgm.process_group_manager.cp_world_size - - self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_name) self.dataset = load_dataset(dataset_name, split=split) + + if pgm.process_group_manager.global_rank == 0: + print(f"rank: {pgm.process_group_manager.global_rank}: Creating tokenizer") + self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_name) + objects = [self.tokenizer] + else: + print(f"rank: {pgm.process_group_manager.global_rank}: Initialized tokenizer to None") + objects = [None] + + print(f"rank: {pgm.process_group_manager.global_rank}: Broadcasting tokenizer to all ranks", is_print_rank=pgm.process_group_manager.global_rank==0) + dist.broadcast_object_list(objects, src=0, device=device) + self.tokenizer = objects[0] + if num_samples: self.dataset = self.dataset.select(range(min(num_samples, len(self.dataset)))) diff --git a/train.py b/train.py index 334a13e..bb83b98 100644 --- a/train.py +++ b/train.py @@ -108,6 +108,7 @@ if __name__ == "__main__": dataset_name=config["dataset"]["name"], tokenizer_name=config["model"]["name"], grad_acc_steps=config["training"]["gradient_accumulation_steps"], + device=device, num_workers=config["dataset"]["num_workers"], num_proc=config["dataset"]["num_proc"], num_samples=config["training"]["num_samples"]