diff --git a/bench/create_config.py b/bench/create_config.py
new file mode 100644
index 0000000..829ffc9
--- /dev/null
+++ b/bench/create_config.py
@@ -0,0 +1,100 @@
+
+"""
+python create_config.py --out_dir tmp --exp_name test_2_node --tp 2 --cp 2 --pp 2 --dp 2 --model_name HuggingFaceTB/SmolLM-360M-Instruct
+"""
+from copy import deepcopy
+from transformers import AutoConfig
+import os
+import shutil
+import argparse
+import json
+from typing import Optional
+
+def create_single_config(
+    out_dir: str,
+    tp: int,
+    cp: int,
+    pp: int,
+    dp: int,
+    model_name: str,
+    num_hidden_layers: Optional[int],
+    num_attention_heads: Optional[int],
+    num_key_value_heads: Optional[int],
+    grad_acc: int,
+    mbs: int,
+    seq_len: int,
+    exp_name: str,
+):
+    run_path = os.path.join(out_dir, exp_name)
+
+    if not os.path.exists(out_dir):
+        os.makedirs(out_dir)
+
+    with open("template/base_config.json", "r") as f:
+        base_config = json.load(f)
+
+    config_content = deepcopy(base_config)
+    config_content["training"]["seq_length"] = seq_len
+    config_content["checkpoint"]["save_dir"] = run_path
+
+    config_content["model"]["name"] = model_name
+
+    tmp_model_config = AutoConfig.from_pretrained(model_name)
+    config_content["model"]["num_hidden_layers"] = tmp_model_config.num_hidden_layers if num_hidden_layers is None else num_hidden_layers
+    config_content["model"]["num_attention_heads"] = tmp_model_config.num_attention_heads if num_attention_heads is None else num_attention_heads
+    config_content["model"]["num_key_value_heads"] = tmp_model_config.num_key_value_heads if num_key_value_heads is None else num_key_value_heads
+    del tmp_model_config
+
+    config_content['distributed']['tp_size'] = tp
+    config_content['distributed']['cp_size'] = cp
+    config_content['distributed']['pp_size'] = pp
+    config_content['distributed']['dp_size'] = dp
+
+    gbs = dp * mbs * grad_acc
+    gbs_token = gbs * seq_len
+    print(f"Gbs_token: {gbs_token:,}, Gbs: {gbs}, dp: {dp}, seq_len: {seq_len}, grad_acc: {grad_acc}, mbs: {mbs}")
+
+    config_content['training']['gradient_accumulation_steps'] = grad_acc
+    config_content['training']['micro_batch_size'] = mbs
+
+    if os.path.exists(run_path):
+        shutil.rmtree(run_path)
+
+    os.makedirs(run_path)
+    with open(os.path.join(run_path, "config.json"), "w") as new_config:
+        json.dump(config_content, new_config, indent=4)
+    del config_content
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--out_dir", type=str, help="Output directory to store the configs", default="tmp")
+    parser.add_argument("--tp", type=int, help="tensor parallel size", default=1)
+    parser.add_argument("--cp", type=int, help="context parallel size", default=1)
+    parser.add_argument("--pp", type=int, help="pipeline parallel size", default=1)
+    parser.add_argument("--dp", type=int, help="data parallel size", default=1)
+    parser.add_argument("--model_name", type=str, help="Model name to create configs for", default="HuggingFaceTB/SmolLM-360M-Instruct")
+    parser.add_argument("--num_hidden_layers", type=int, help="Number of hidden layers", default=None)
+    parser.add_argument("--num_attention_heads", type=int, help="Number of attention heads", default=None)
+    parser.add_argument("--num_key_value_heads", type=int, help="Number of key-value heads", default=None)
+    parser.add_argument("--grad_acc", type=int, help="gradient accumulation steps", default=1)
+    parser.add_argument("--mbs", type=int, help="micro batch size", default=1)
+    parser.add_argument("--seq_len", type=int, help="Sequence length", default=1024)
help="Sequence length", default=1024) + parser.add_argument("--exp_name", type=str, help="Experiment name", default="dummy_exp") + + args=parser.parse_args() + + create_single_config( + out_dir=args.out_dir, + tp=args.tp, + cp=args.cp, + dp=args.dp, + pp=args.pp, + model_name=args.model_name, + num_hidden_layers=args.num_hidden_layers, + num_attention_heads=args.num_attention_heads, + num_key_value_heads=args.num_key_value_heads, + grad_acc=args.grad_acc, + mbs=args.mbs, + seq_len=args.seq_len, + exp_name=args.exp_name, + ) diff --git a/bench/create_configs.py b/bench/create_configs.py deleted file mode 100644 index 037d1eb..0000000 --- a/bench/create_configs.py +++ /dev/null @@ -1,192 +0,0 @@ -from copy import deepcopy -import numpy as np -from template.template_base_configs import template_base_config -import itertools -import yaml -import os -from transformers import AutoTokenizer -import math -import shutil -import argparse - -def update_config_based_on_model(model: str, config: dict): - - # Setting num_attention_heads = num_key_value_heads for all models <=> using MHA for all layers - - if model == "small-llama": - config["model"]["model_config"]["hidden_size"] = 512 - config["model"]["model_config"]["intermediate_size"] = 1024 - config["model"]["model_config"]["num_attention_heads"] = 16 - config["model"]["model_config"]["num_hidden_layers"] = 10 - config["model"]["model_config"]["num_key_value_heads"] = 16 - config["model"]["model_config"]["max_position_embeddings"] = config["tokens"]["sequence_length"] - elif model == "llama-1M": - config["model"]["model_config"]["hidden_size"] = 768 - config["model"]["model_config"]["intermediate_size"] = 3072 - config["model"]["model_config"]["num_attention_heads"] = 16 - config["model"]["model_config"]["num_hidden_layers"] = 12 - config["model"]["model_config"]["num_key_value_heads"] = 16 - config["model"]["model_config"]["max_position_embeddings"] = config["tokens"]["sequence_length"] - elif model == "llama-1B": - # HuggingFaceFW/ablation-model-fineweb-v1 - config["model"]["model_config"]["hidden_size"] = 2048 - config["model"]["model_config"]["intermediate_size"] = 4096 - config["model"]["model_config"]["num_attention_heads"] = 32 - config["model"]["model_config"]["num_hidden_layers"] = 24 - config["model"]["model_config"]["num_key_value_heads"] = 32 - config["model"]["model_config"]["max_position_embeddings"] = config["tokens"]["sequence_length"] - - tokenizer = AutoTokenizer.from_pretrained(config["tokenizer"]["tokenizer_name_or_path"]) - config["model"]["model_config"]["vocab_size"] = tokenizer.vocab_size - -def create_single_config( - out_dir: str, - model: str, - gpus: int, - dp: int, - tp: int, - pp: int, - bapr: int, - mbs: int, - no_profiler: bool = False, - cluster: str = "hf", - exp_name: str = None, - seq_len: int = 4096, - lighteval: bool = False, - s3: bool = False, - # recompute_layer: bool = False, - dry_run: bool = False -): - - run_path = os.path.join(out_dir, exp_name) - - if not os.path.exists(out_dir): - os.makedirs(out_dir) - - print(f"Creating single config for {model} given {gpus} GPUs") - config_content = deepcopy(base_config) - config_content["tokens"]["sequence_length"] = seq_len - # config_content["parallelism"]["recompute_layer"] = recompute_layer - config_content["checkpoints"]["checkpoints_path"] = run_path - update_config_based_on_model(model, config_content) - - if cluster == "hf": - tp_max_cluster = 8 - elif cluster == "swiss-ai": - tp_max_cluster = 4 # GH200 - - config_content['parallelism']['dp'] = dp - 
-    config_content['parallelism']['tp'] = tp
-    config_content['parallelism']['pp'] = pp
-
-    # Compute global batch_size and print
-    gbs = dp * mbs * bapr
-    gbs_token = gbs * seq_len
-    # Print in human readable format
-    print(f"Gbs_token: {gbs_token:,}, Gbs: {gbs}, dp: {dp}, seq_len: {seq_len}, bapr: {bapr}, mbs: {mbs}")
-
-    config_content['tokens']['batch_accumulation_per_replica'] = bapr
-    config_content['tokens']['micro_batch_size'] = mbs
-
-    # Create a directory for each combination of parallelism
-    # if recompute_layer:
-    #     run_path += "_recompute_layer"
-
-    # Get absoulte path for run_path
-    if no_profiler:
-        config_content['profiler'] = None
-    else:
-        config_content['profiler']['profiler_export_path'] = os.path.abspath(run_path)
-
-    if s3:
-        config_content["general"]["is_s3_available"] = True
-        config_content['s3_upload'] = {
-            "remove_after_upload": True,
-            "s5cmd_concurrency": 5,
-            "s5cmd_numworkers": 16,
-            "s5cmd_path": "/fsx/elie_bakouch/miniconda3/envs/smollm/bin/s5cmd",
-            "upload_s3_path": f"s3://huggingface-brrr-us-east-1/fmom/nanotron_pr/{exp_name}"
-        }
-
-    if lighteval:
-        config_content['lighteval'] = {
-            "batch_size": 16,
-            "generation": None,
-            "logging": {
-                "output_dir": None,
-                "public_run": False,
-                "push_to_hub": True,
-                "push_to_tensorboard": True,
-                "results_org": "HuggingFaceSmol",
-                "save_details": True,
-                "tensorboard_metric_prefix": "eval"
-            },
-            "parallelism": {
-                "dp": dp,
-                "expert_parallel_size": 1,
-                "pp": pp,
-                "pp_engine": "1f1b",
-                "recompute_layer": False,
-                "tp": tp,
-                "tp_linear_async_communication": False,
-                "tp_mode": "ALL_REDUCE",
-                "tp_recompute_allgather": True
-            },
-            "tasks": {
-                "custom_tasks": "nanotron.lighteval.evaluation_tasks",
-                "dataset_loading_processes": 8,
-                "max_samples": 1000,
-                "multichoice_continuations_start_space": None,
-                "num_fewshot_seeds": None,
-                "pair_wise_tokenization": False,
-                "tasks": "early-signal"
-            }
-        }
-
-    if os.path.exists(run_path):
-        shutil.rmtree(run_path)
-
-    if not dry_run:
-        os.makedirs(run_path)
-        with open(os.path.join(run_path, "config.yaml"), "w") as new_config:
-            yaml.dump(config_content, new_config, default_flow_style=False, sort_keys=False)
-
-    del config_content
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser()
-    parser.add_argument("--out_dir", type=str, help="Output directory to store the configs")
-    parser.add_argument("--model", type=str, help="Model to create configs for")
-    parser.add_argument("--gpus", type=int, help="Number of GPUs")
-    parser.add_argument("--dp", type=int, required=True, help="Max number of data parallelism")
-    parser.add_argument("--tp", type=int, required=True, help="Max number of tensor parallelism")
-    parser.add_argument("--pp", type=int, required=True, help="Max number of pipeline parallelism")
-    parser.add_argument("--bapr", type=int, help="Max batch accumulation per replica")
-    parser.add_argument("--mbs", type=int, help="Max micro batch size")
-    parser.add_argument("--seq_len", type=int, help="Sequence length", default=4096)
-    parser.add_argument("--exp_name", type=str, help="Experiment name")
-    parser.add_argument("--recompute_layer", action="store_true", help="Enable recompute allgather for tensor parallelism")
-    parser.add_argument("--use_async", action="store_true", help="Enable async communication for tensor parallelism")
-    parser.add_argument("--lighteval", action="store_true", help="Enable light evaluation")
-    parser.add_argument("--s3", action="store_true", help="Enable light evaluation")
-
-    args=parser.parse_args()
-
-    create_single_config(
-        out_dir=args.out_dir,
-        model=args.model,
-        gpus=args.gpus,
-        dp=args.dp,
-        tp=args.tp,
-        pp=args.pp,
-        bapr=args.bapr,
-        mbs=args.mbs,
-        cluster="hf",
-        exp_name=args.exp_name,
-        seq_len=args.seq_len,
-        # recompute_layer=args.recompute_layer,
-        lighteval=args.lighteval,
-        s3=args.s3,
-        dry_run=False,
-        no_profiler=True
-    )
diff --git a/bench/template/template_base_configs.json b/bench/template/base_config.json
similarity index 93%
rename from bench/template/template_base_configs.json
rename to bench/template/base_config.json
index f00155c..5d86fe3 100644
--- a/bench/template/template_base_configs.json
+++ b/bench/template/base_config.json
@@ -2,8 +2,8 @@
     "distributed": {
         "tp_size": 1,
         "cp_size": 1,
-        "pp_size": 2,
-        "dp_size": 2,
+        "pp_size": 1,
+        "dp_size": 1,
         "master_addr": "localhost",
         "master_port": 29500,
         "backend": "nccl",
@@ -22,7 +22,6 @@
         "learning_rate": 3e-4,
         "total_train_steps": 200,
         "seq_length": 1024,
-        "local_batch_size": 64,
         "micro_batch_size": 32,
         "gradient_accumulation_steps": 1,
         "num_samples": 400000,
diff --git a/train.py b/train.py
index 0540fd7..4bb41c3 100644
--- a/train.py
+++ b/train.py
@@ -76,7 +76,6 @@ if __name__ == "__main__":
 
     # hyperparameters
     SEQ_LEN = config["training"]["seq_length"]
-    LOCAL_BATCH_SIZE = config["training"]["local_batch_size"]
     MICRO_BATCH_SIZE = config["training"]["micro_batch_size"]
     LEARNING_RATE = config["training"]["learning_rate"]
     NUM_SAMPLES = config["training"]["num_samples"]
@@ -116,10 +115,6 @@ if __name__ == "__main__":
 
     setup_process_group_manager(tp_size=TP_SIZE, cp_size=CP_SIZE, pp_size=PP_SIZE, dp_size=DP_SIZE)
     is_wandb_rank = pgm.process_group_manager.tp_rank == 0 and pgm.process_group_manager.dp_rank == 0 and pgm.process_group_manager.cp_rank == 0 and pgm.process_group_manager.pp_is_last_stage
-    tokens_per_step = LOCAL_BATCH_SIZE * SEQ_LEN * GRAD_ACC * DP_SIZE
-    if pgm.process_group_manager.global_rank == 0:
-        print("Tokens per step:", to_readable_format(tokens_per_step), is_print_rank=is_wandb_rank)
-
     set_all_seed(SEED)
     model_config = AutoConfig.from_pretrained(MODEL_NAME)
@@ -130,13 +125,31 @@ if __name__ == "__main__":
 
     start_time = time.time()
     model = Llama(config=model_config)
     print("init model time:", time.time()-start_time, is_print_rank=is_wandb_rank)
+
+    start_time = time.time()
+    data_loader = MicroBatchDataLoader(
+        micro_batch_size=MICRO_BATCH_SIZE,
+        seq_length=SEQ_LEN,
+        dataset_name=DATASET_NAME,
+        tokenizer_name=MODEL_NAME,
+        grad_acc=GRAD_ACC,
+        num_workers=NUM_WORKERS,
+        num_proc=NUM_PROC,
+        num_samples=NUM_SAMPLES
+    )
+    print("init dataloader time:", time.time()-start_time, is_print_rank=is_wandb_rank)
+    tokens_per_step = data_loader.global_batch_size * SEQ_LEN
+    if pgm.process_group_manager.global_rank == 0:
+        print("Tokens per step:", to_readable_format(tokens_per_step), is_print_rank=is_wandb_rank)
+
     if is_wandb_rank and USE_WANDB:
         wandb.init(
             project="picotron",
             name=f"test_convergence_GBS_{tokens_per_step}_{pgm.process_group_manager}",
             config={
                 "tensor_parallel_size": pgm.process_group_manager.tp_size,
+                "context_parallel_size": pgm.process_group_manager.cp_size,
                 "pipeline_parallel_size": pgm.process_group_manager.pp_size,
                 "data_parallel_size": pgm.process_group_manager.dp_size,
                 "model": config["model"]["name"],
@@ -144,8 +157,8 @@ if __name__ == "__main__":
                 "max_tokens": MAX_TOKENS,
                 "learning_rate": LEARNING_RATE,
                 "seed": SEED,
-                "micro_batch_size": MICRO_BATCH_SIZE,
-                "global_batch_size": LOCAL_BATCH_SIZE * pgm.process_group_manager.dp_size * GRAD_ACC,
+                "micro_batch_size": data_loader.micro_batch_size,
+ "global_batch_size": data_loader.global_batch_size, "gradient_accumulation": GRAD_ACC, }, ) @@ -170,19 +183,6 @@ if __name__ == "__main__": model.train() print("model to device time:", time.time()-start_time, is_print_rank=is_wandb_rank) - start_time = time.time() - data_loader = MicroBatchDataLoader( - local_batch_size=LOCAL_BATCH_SIZE, - micro_batch_size=MICRO_BATCH_SIZE, - seq_length=SEQ_LEN, - dataset_name=DATASET_NAME, - tokenizer_name=MODEL_NAME, - grad_acc = GRAD_ACC, - num_workers=NUM_WORKERS, - num_proc=NUM_PROC, - num_samples=NUM_SAMPLES - ) - print("init dataloader time:", time.time()-start_time, is_print_rank=is_wandb_rank) tensor_shapes = (data_loader.micro_batch_size, data_loader.seq_length_per_gpu, model_config.hidden_size) optimizer = AdamW(model.parameters(), lr=LEARNING_RATE) diff --git a/utils.py b/utils.py index 874d6dd..ad4a355 100644 --- a/utils.py +++ b/utils.py @@ -75,14 +75,16 @@ def load_checkpoint(model, optimizer, out_dir): return checkpoint['trained_steps'], checkpoint['trained_tokens'] class MicroBatchDataLoader(DataLoader): - def __init__(self, local_batch_size, micro_batch_size, seq_length, dataset_name, tokenizer_name, num_workers, num_proc, grad_acc=1, split="train", num_samples=None): - self.global_batch_size = local_batch_size * pgm.process_group_manager.dp_world_size + def __init__(self, micro_batch_size, seq_length, dataset_name, tokenizer_name, num_workers, num_proc, grad_acc, split="train", num_samples=None): + self.micro_batch_size = micro_batch_size self.seq_length = seq_length - self.local_batch_size = local_batch_size + self.grad_acc = grad_acc + + self.local_batch_size = micro_batch_size * grad_acc + self.global_batch_size = self.local_batch_size * pgm.process_group_manager.dp_world_size self.num_local_micro_batches = self.local_batch_size // self.micro_batch_size self.num_global_micro_batches = self.global_batch_size // self.micro_batch_size - self.grad_acc = grad_acc self.seq_length_per_gpu = seq_length // pgm.process_group_manager.cp_world_size