better config creation

This commit is contained in:
ferdinand.mom 2024-10-30 13:53:50 +00:00
parent 402aa4ccfc
commit 1dbe034d57
5 changed files with 128 additions and 219 deletions

100
bench/create_config.py Normal file
View File

@ -0,0 +1,100 @@
"""
python create_config.py --out_dir tmp --exp_name test_2_node --tp 2 --cp 2 --pp 2 --dp 2 --model_name HuggingFaceTB/SmolLM-360M-Instruct
"""
from copy import deepcopy
from transformers import AutoConfig
import os
import shutil
import argparse
import json
from typing import Optional
def create_single_config(
out_dir: str,
tp: int,
cp: int,
pp: int,
dp: int,
model_name: str,
num_hidden_layers: Optional[int],
num_attention_heads: Optional[int],
num_key_value_heads: Optional[int],
grad_acc: int,
mbs: int,
seq_len: int,
exp_name: str,
):
run_path = os.path.join(out_dir, exp_name)
if not os.path.exists(out_dir):
os.makedirs(out_dir)
with open("template/base_config.json", "r") as f:
base_config = json.load(f)
config_content = deepcopy(base_config)
config_content["training"]["seq_length"] = seq_len
config_content["checkpoint"]["save_dir"] = run_path
config_content["model"]["name"] = model_name
tmp_model_config = AutoConfig.from_pretrained(model_name)
config_content["model"]["num_hidden_layers"] = tmp_model_config.num_hidden_layers if num_hidden_layers is None else num_hidden_layers
config_content["model"]["num_attention_heads"] = tmp_model_config.num_attention_heads if num_attention_heads is None else num_attention_heads
config_content["model"]["num_key_value_heads"] = tmp_model_config.num_key_value_heads if num_key_value_heads is None else num_key_value_heads
del tmp_model_config
config_content['distributed']['tp_size'] = tp
config_content['distributed']['cp_size'] = cp
config_content['distributed']['pp_size'] = pp
config_content['distributed']['dp_size'] = dp
gbs = dp * mbs * grad_acc
gbs_token = gbs * seq_len
print(f"Gbs_token: {gbs_token:,}, Gbs: {gbs}, dp: {dp}, seq_len: {seq_len}, grad_acc: {grad_acc}, mbs: {mbs}")
config_content['training']['gradient_accumulation_steps'] = grad_acc
config_content['training']['micro_batch_size'] = mbs
if os.path.exists(run_path):
shutil.rmtree(run_path)
os.makedirs(run_path)
with open(os.path.join(run_path, "config.json"), "w") as new_config:
json.dump(config_content, new_config, indent=4)
del config_content
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--out_dir", type=str, help="Output directory to store the configs", default="tmp")
parser.add_argument("--tp", type=int, help="number of tensor parallelism", default=1)
parser.add_argument("--cp", type=int, help="number of context parallelism", default=1)
parser.add_argument("--pp", type=int, help="number of pipeline parallelism", default=1)
parser.add_argument("--dp", type=int, help="number of data parallelism", default=1)
parser.add_argument("--model_name", type=str, help="Model name to create configs for", default="HuggingFaceTB/SmolLM-360M-Instruct")
parser.add_argument("--num_hidden_layers", type=int, help="Number of hidden layers", default=None)
parser.add_argument("--num_attention_heads", type=int, help="Number of attention heads", default=None)
parser.add_argument("--num_key_value_heads", type=int, help="Number of key value heads", default=None)
parser.add_argument("--grad_acc", type=int, help="grad accumulation", default=1)
parser.add_argument("--mbs", type=int, help="micro batch size", default=1)
parser.add_argument("--seq_len", type=int, help="Sequence length", default=1024)
parser.add_argument("--exp_name", type=str, help="Experiment name", default="dummy_exp")
args=parser.parse_args()
create_single_config(
out_dir=args.out_dir,
tp=args.tp,
cp=args.cp,
dp=args.dp,
pp=args.pp,
model_name=args.model_name,
num_hidden_layers=args.num_hidden_layers,
num_attention_heads=args.num_attention_heads,
num_key_value_heads=args.num_key_value_heads,
grad_acc=args.grad_acc,
mbs=args.mbs,
seq_len=args.seq_len,
exp_name=args.exp_name,
)

View File

@ -1,192 +0,0 @@
from copy import deepcopy
import numpy as np
from template.template_base_configs import template_base_config
import itertools
import yaml
import os
from transformers import AutoTokenizer
import math
import shutil
import argparse
def update_config_based_on_model(model: str, config: dict):
# Setting num_attention_heads = num_key_value_heads for all models <=> using MHA for all layers
if model == "small-llama":
config["model"]["model_config"]["hidden_size"] = 512
config["model"]["model_config"]["intermediate_size"] = 1024
config["model"]["model_config"]["num_attention_heads"] = 16
config["model"]["model_config"]["num_hidden_layers"] = 10
config["model"]["model_config"]["num_key_value_heads"] = 16
config["model"]["model_config"]["max_position_embeddings"] = config["tokens"]["sequence_length"]
elif model == "llama-1M":
config["model"]["model_config"]["hidden_size"] = 768
config["model"]["model_config"]["intermediate_size"] = 3072
config["model"]["model_config"]["num_attention_heads"] = 16
config["model"]["model_config"]["num_hidden_layers"] = 12
config["model"]["model_config"]["num_key_value_heads"] = 16
config["model"]["model_config"]["max_position_embeddings"] = config["tokens"]["sequence_length"]
elif model == "llama-1B":
# HuggingFaceFW/ablation-model-fineweb-v1
config["model"]["model_config"]["hidden_size"] = 2048
config["model"]["model_config"]["intermediate_size"] = 4096
config["model"]["model_config"]["num_attention_heads"] = 32
config["model"]["model_config"]["num_hidden_layers"] = 24
config["model"]["model_config"]["num_key_value_heads"] = 32
config["model"]["model_config"]["max_position_embeddings"] = config["tokens"]["sequence_length"]
tokenizer = AutoTokenizer.from_pretrained(config["tokenizer"]["tokenizer_name_or_path"])
config["model"]["model_config"]["vocab_size"] = tokenizer.vocab_size
def create_single_config(
out_dir: str,
model: str,
gpus: int,
dp: int,
tp: int,
pp: int,
bapr: int,
mbs: int,
no_profiler: bool = False,
cluster: str = "hf",
exp_name: str = None,
seq_len: int = 4096,
lighteval: bool = False,
s3: bool = False,
# recompute_layer: bool = False,
dry_run: bool = False
):
run_path = os.path.join(out_dir, exp_name)
if not os.path.exists(out_dir):
os.makedirs(out_dir)
print(f"Creating single config for {model} given {gpus} GPUs")
config_content = deepcopy(base_config)
config_content["tokens"]["sequence_length"] = seq_len
# config_content["parallelism"]["recompute_layer"] = recompute_layer
config_content["checkpoints"]["checkpoints_path"] = run_path
update_config_based_on_model(model, config_content)
if cluster == "hf":
tp_max_cluster = 8
elif cluster == "swiss-ai":
tp_max_cluster = 4 # GH200
config_content['parallelism']['dp'] = dp
config_content['parallelism']['tp'] = tp
config_content['parallelism']['pp'] = pp
# Compute global batch_size and print
gbs = dp * mbs * bapr
gbs_token = gbs * seq_len
# Print in human readable format
print(f"Gbs_token: {gbs_token:,}, Gbs: {gbs}, dp: {dp}, seq_len: {seq_len}, bapr: {bapr}, mbs: {mbs}")
config_content['tokens']['batch_accumulation_per_replica'] = bapr
config_content['tokens']['micro_batch_size'] = mbs
# Create a directory for each combination of parallelism
# if recompute_layer:
# run_path += "_recompute_layer"
# Get absoulte path for run_path
if no_profiler:
config_content['profiler'] = None
else:
config_content['profiler']['profiler_export_path'] = os.path.abspath(run_path)
if s3:
config_content["general"]["is_s3_available"] = True
config_content['s3_upload'] = {
"remove_after_upload": True,
"s5cmd_concurrency": 5,
"s5cmd_numworkers": 16,
"s5cmd_path": "/fsx/elie_bakouch/miniconda3/envs/smollm/bin/s5cmd",
"upload_s3_path": f"s3://huggingface-brrr-us-east-1/fmom/nanotron_pr/{exp_name}"
}
if lighteval:
config_content['lighteval'] = {
"batch_size": 16,
"generation": None,
"logging": {
"output_dir": None,
"public_run": False,
"push_to_hub": True,
"push_to_tensorboard": True,
"results_org": "HuggingFaceSmol",
"save_details": True,
"tensorboard_metric_prefix": "eval"
},
"parallelism": {
"dp": dp,
"expert_parallel_size": 1,
"pp": pp,
"pp_engine": "1f1b",
"recompute_layer": False,
"tp": tp,
"tp_linear_async_communication": False,
"tp_mode": "ALL_REDUCE",
"tp_recompute_allgather": True
},
"tasks": {
"custom_tasks": "nanotron.lighteval.evaluation_tasks",
"dataset_loading_processes": 8,
"max_samples": 1000,
"multichoice_continuations_start_space": None,
"num_fewshot_seeds": None,
"pair_wise_tokenization": False,
"tasks": "early-signal"
}
}
if os.path.exists(run_path):
shutil.rmtree(run_path)
if not dry_run:
os.makedirs(run_path)
with open(os.path.join(run_path, "config.yaml"), "w") as new_config:
yaml.dump(config_content, new_config, default_flow_style=False, sort_keys=False)
del config_content
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--out_dir", type=str, help="Output directory to store the configs")
parser.add_argument("--model", type=str, help="Model to create configs for")
parser.add_argument("--gpus", type=int, help="Number of GPUs")
parser.add_argument("--dp", type=int, required=True, help="Max number of data parallelism")
parser.add_argument("--tp", type=int, required=True, help="Max number of tensor parallelism")
parser.add_argument("--pp", type=int, required=True, help="Max number of pipeline parallelism")
parser.add_argument("--bapr", type=int, help="Max batch accumulation per replica")
parser.add_argument("--mbs", type=int, help="Max micro batch size")
parser.add_argument("--seq_len", type=int, help="Sequence length", default=4096)
parser.add_argument("--exp_name", type=str, help="Experiment name")
parser.add_argument("--recompute_layer", action="store_true", help="Enable recompute allgather for tensor parallelism")
parser.add_argument("--use_async", action="store_true", help="Enable async communication for tensor parallelism")
parser.add_argument("--lighteval", action="store_true", help="Enable light evaluation")
parser.add_argument("--s3", action="store_true", help="Enable light evaluation")
args=parser.parse_args()
create_single_config(
out_dir=args.out_dir,
model=args.model,
gpus=args.gpus,
dp=args.dp,
tp=args.tp,
pp=args.pp,
bapr=args.bapr,
mbs=args.mbs,
cluster="hf",
exp_name=args.exp_name,
seq_len=args.seq_len,
# recompute_layer=args.recompute_layer,
lighteval=args.lighteval,
s3=args.s3,
dry_run=False,
no_profiler=True
)

View File

@ -2,8 +2,8 @@
"distributed": {
"tp_size": 1,
"cp_size": 1,
"pp_size": 2,
"dp_size": 2,
"pp_size": 1,
"dp_size": 1,
"master_addr": "localhost",
"master_port": 29500,
"backend": "nccl",
@ -22,7 +22,6 @@
"learning_rate": 3e-4,
"total_train_steps": 200,
"seq_length": 1024,
"local_batch_size": 64,
"micro_batch_size": 32,
"gradient_accumulation_steps": 1,
"num_samples": 400000,

View File

@ -76,7 +76,6 @@ if __name__ == "__main__":
# hyperparameters
SEQ_LEN = config["training"]["seq_length"]
LOCAL_BATCH_SIZE = config["training"]["local_batch_size"]
MICRO_BATCH_SIZE = config["training"]["micro_batch_size"]
LEARNING_RATE = config["training"]["learning_rate"]
NUM_SAMPLES = config["training"]["num_samples"]
@ -116,10 +115,6 @@ if __name__ == "__main__":
setup_process_group_manager(tp_size=TP_SIZE, cp_size=CP_SIZE, pp_size=PP_SIZE, dp_size=DP_SIZE)
is_wandb_rank = pgm.process_group_manager.tp_rank == 0 and pgm.process_group_manager.dp_rank == 0 and pgm.process_group_manager.cp_rank == 0 and pgm.process_group_manager.pp_is_last_stage
tokens_per_step = LOCAL_BATCH_SIZE * SEQ_LEN * GRAD_ACC * DP_SIZE
if pgm.process_group_manager.global_rank == 0:
print("Tokens per step:", to_readable_format(tokens_per_step), is_print_rank=is_wandb_rank)
set_all_seed(SEED)
model_config = AutoConfig.from_pretrained(MODEL_NAME)
@ -130,13 +125,31 @@ if __name__ == "__main__":
start_time = time.time()
model = Llama(config=model_config)
print("init model time:", time.time()-start_time, is_print_rank=is_wandb_rank)
start_time = time.time()
data_loader = MicroBatchDataLoader(
micro_batch_size=MICRO_BATCH_SIZE,
seq_length=SEQ_LEN,
dataset_name=DATASET_NAME,
tokenizer_name=MODEL_NAME,
grad_acc=GRAD_ACC,
num_workers=NUM_WORKERS,
num_proc=NUM_PROC,
num_samples=NUM_SAMPLES
)
print("init dataloader time:", time.time()-start_time, is_print_rank=is_wandb_rank)
tokens_per_step = data_loader.global_batch_size * SEQ_LEN
if pgm.process_group_manager.global_rank == 0:
print("Tokens per step:", to_readable_format(tokens_per_step), is_print_rank=is_wandb_rank)
if is_wandb_rank and USE_WANDB:
wandb.init(
project="picotron",
name=f"test_convergence_GBS_{tokens_per_step}_{pgm.process_group_manager}",
config={
"tensor_parallel_size": pgm.process_group_manager.tp_size,
"context_parallel_size": pgm.process_group_manager.cp_size,
"pipeline_parallel_size": pgm.process_group_manager.pp_size,
"data_parallel_size": pgm.process_group_manager.dp_size,
"model": config["model"]["name"],
@ -144,8 +157,8 @@ if __name__ == "__main__":
"max_tokens": MAX_TOKENS,
"learning_rate": LEARNING_RATE,
"seed": SEED,
"micro_batch_size": MICRO_BATCH_SIZE,
"global_batch_size": LOCAL_BATCH_SIZE * pgm.process_group_manager.dp_size * GRAD_ACC,
"micro_batch_size": data_loader.micro_batch_size,
"global_batch_size": data_loader.global_batch_size,
"gradient_accumulation": GRAD_ACC,
},
)
@ -170,19 +183,6 @@ if __name__ == "__main__":
model.train()
print("model to device time:", time.time()-start_time, is_print_rank=is_wandb_rank)
start_time = time.time()
data_loader = MicroBatchDataLoader(
local_batch_size=LOCAL_BATCH_SIZE,
micro_batch_size=MICRO_BATCH_SIZE,
seq_length=SEQ_LEN,
dataset_name=DATASET_NAME,
tokenizer_name=MODEL_NAME,
grad_acc = GRAD_ACC,
num_workers=NUM_WORKERS,
num_proc=NUM_PROC,
num_samples=NUM_SAMPLES
)
print("init dataloader time:", time.time()-start_time, is_print_rank=is_wandb_rank)
tensor_shapes = (data_loader.micro_batch_size, data_loader.seq_length_per_gpu, model_config.hidden_size)
optimizer = AdamW(model.parameters(), lr=LEARNING_RATE)

View File

@ -75,14 +75,16 @@ def load_checkpoint(model, optimizer, out_dir):
return checkpoint['trained_steps'], checkpoint['trained_tokens']
class MicroBatchDataLoader(DataLoader):
def __init__(self, local_batch_size, micro_batch_size, seq_length, dataset_name, tokenizer_name, num_workers, num_proc, grad_acc=1, split="train", num_samples=None):
self.global_batch_size = local_batch_size * pgm.process_group_manager.dp_world_size
def __init__(self, micro_batch_size, seq_length, dataset_name, tokenizer_name, num_workers, num_proc, grad_acc, split="train", num_samples=None):
self.micro_batch_size = micro_batch_size
self.seq_length = seq_length
self.local_batch_size = local_batch_size
self.grad_acc = grad_acc
self.local_batch_size = micro_batch_size * grad_acc
self.global_batch_size = self.local_batch_size * pgm.process_group_manager.dp_world_size
self.num_local_micro_batches = self.local_batch_size // self.micro_batch_size
self.num_global_micro_batches = self.global_batch_size // self.micro_batch_size
self.grad_acc = grad_acc
self.seq_length_per_gpu = seq_length // pgm.process_group_manager.cp_world_size