diff --git a/bench/check_status.sh b/bench/check_status.sh new file mode 100755 index 0000000..bfaa826 --- /dev/null +++ b/bench/check_status.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +# Initialize counters +declare -A counts +statuses=("init" "pending" "running" "fail" "oom" "timeout" "completed") +for status in "${statuses[@]}"; do + counts[$status]=0 +done + +# Find and process all status.txt files +while IFS= read -r -d '' file; do + status=$(cat "$file" | tr -d '[:space:]') + + if [[ " ${statuses[@]} " =~ " ${status} " ]]; then + ((counts[$status]++)) + fi +done < <(find "$1" -name "status.txt" -print0) + +# Calculate total +total=0 +for count in "${counts[@]}"; do + ((total += count)) +done + +# Print the results +echo "Status | Count" +echo "-----------|---------" +for status in "${statuses[@]}"; do + printf "%-10s | %d\n" "$status" "${counts[$status]}" +done +echo "-----------|---------" +echo "Total | $total" \ No newline at end of file diff --git a/bench/create_configs.py b/bench/create_configs.py new file mode 100644 index 0000000..037d1eb --- /dev/null +++ b/bench/create_configs.py @@ -0,0 +1,192 @@ +from copy import deepcopy +import numpy as np +from template.template_base_configs import template_base_config +import itertools +import yaml +import os +from transformers import AutoTokenizer +import math +import shutil +import argparse + +def update_config_based_on_model(model: str, config: dict): + + # Setting num_attention_heads = num_key_value_heads for all models <=> using MHA for all layers + + if model == "small-llama": + config["model"]["model_config"]["hidden_size"] = 512 + config["model"]["model_config"]["intermediate_size"] = 1024 + config["model"]["model_config"]["num_attention_heads"] = 16 + config["model"]["model_config"]["num_hidden_layers"] = 10 + config["model"]["model_config"]["num_key_value_heads"] = 16 + config["model"]["model_config"]["max_position_embeddings"] = config["tokens"]["sequence_length"] + elif model == "llama-1M": + config["model"]["model_config"]["hidden_size"] = 768 + config["model"]["model_config"]["intermediate_size"] = 3072 + config["model"]["model_config"]["num_attention_heads"] = 16 + config["model"]["model_config"]["num_hidden_layers"] = 12 + config["model"]["model_config"]["num_key_value_heads"] = 16 + config["model"]["model_config"]["max_position_embeddings"] = config["tokens"]["sequence_length"] + elif model == "llama-1B": + # HuggingFaceFW/ablation-model-fineweb-v1 + config["model"]["model_config"]["hidden_size"] = 2048 + config["model"]["model_config"]["intermediate_size"] = 4096 + config["model"]["model_config"]["num_attention_heads"] = 32 + config["model"]["model_config"]["num_hidden_layers"] = 24 + config["model"]["model_config"]["num_key_value_heads"] = 32 + config["model"]["model_config"]["max_position_embeddings"] = config["tokens"]["sequence_length"] + + tokenizer = AutoTokenizer.from_pretrained(config["tokenizer"]["tokenizer_name_or_path"]) + config["model"]["model_config"]["vocab_size"] = tokenizer.vocab_size + +def create_single_config( + out_dir: str, + model: str, + gpus: int, + dp: int, + tp: int, + pp: int, + bapr: int, + mbs: int, + no_profiler: bool = False, + cluster: str = "hf", + exp_name: str = None, + seq_len: int = 4096, + lighteval: bool = False, + s3: bool = False, + # recompute_layer: bool = False, + dry_run: bool = False +): + + run_path = os.path.join(out_dir, exp_name) + + if not os.path.exists(out_dir): + os.makedirs(out_dir) + + print(f"Creating single config for {model} given {gpus} GPUs") + config_content = 
deepcopy(template_base_config)
+    config_content["tokens"]["sequence_length"] = seq_len
+    # config_content["parallelism"]["recompute_layer"] = recompute_layer
+    config_content["checkpoints"]["checkpoints_path"] = run_path
+    update_config_based_on_model(model, config_content)
+
+    if cluster == "hf":
+        tp_max_cluster = 8
+    elif cluster == "swiss-ai":
+        tp_max_cluster = 4 # GH200
+
+    config_content['parallelism']['dp'] = dp
+    config_content['parallelism']['tp'] = tp
+    config_content['parallelism']['pp'] = pp
+
+    # Compute global batch size and print it
+    gbs = dp * mbs * bapr
+    gbs_token = gbs * seq_len
+    # Print in human-readable format
+    print(f"Gbs_token: {gbs_token:,}, Gbs: {gbs}, dp: {dp}, seq_len: {seq_len}, bapr: {bapr}, mbs: {mbs}")
+
+    config_content['tokens']['batch_accumulation_per_replica'] = bapr
+    config_content['tokens']['micro_batch_size'] = mbs
+
+    # Create a directory for each combination of parallelism
+    # if recompute_layer:
+    #     run_path += "_recompute_layer"
+
+    # Get absolute path for run_path
+    if no_profiler:
+        config_content['profiler'] = None
+    else:
+        config_content['profiler']['profiler_export_path'] = os.path.abspath(run_path)
+
+    if s3:
+        config_content["general"]["is_s3_available"] = True
+        config_content['s3_upload'] = {
+            "remove_after_upload": True,
+            "s5cmd_concurrency": 5,
+            "s5cmd_numworkers": 16,
+            "s5cmd_path": "/fsx/elie_bakouch/miniconda3/envs/smollm/bin/s5cmd",
+            "upload_s3_path": f"s3://huggingface-brrr-us-east-1/fmom/nanotron_pr/{exp_name}"
+        }
+
+    if lighteval:
+        config_content['lighteval'] = {
+            "batch_size": 16,
+            "generation": None,
+            "logging": {
+                "output_dir": None,
+                "public_run": False,
+                "push_to_hub": True,
+                "push_to_tensorboard": True,
+                "results_org": "HuggingFaceSmol",
+                "save_details": True,
+                "tensorboard_metric_prefix": "eval"
+            },
+            "parallelism": {
+                "dp": dp,
+                "expert_parallel_size": 1,
+                "pp": pp,
+                "pp_engine": "1f1b",
+                "recompute_layer": False,
+                "tp": tp,
+                "tp_linear_async_communication": False,
+                "tp_mode": "ALL_REDUCE",
+                "tp_recompute_allgather": True
+            },
+            "tasks": {
+                "custom_tasks": "nanotron.lighteval.evaluation_tasks",
+                "dataset_loading_processes": 8,
+                "max_samples": 1000,
+                "multichoice_continuations_start_space": None,
+                "num_fewshot_seeds": None,
+                "pair_wise_tokenization": False,
+                "tasks": "early-signal"
+            }
+        }
+
+    if os.path.exists(run_path):
+        shutil.rmtree(run_path)
+
+    if not dry_run:
+        os.makedirs(run_path)
+        with open(os.path.join(run_path, "config.yaml"), "w") as new_config:
+            yaml.dump(config_content, new_config, default_flow_style=False, sort_keys=False)
+
+    del config_content
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--out_dir", type=str, help="Output directory to store the configs")
+    parser.add_argument("--model", type=str, help="Model to create configs for")
+    parser.add_argument("--gpus", type=int, help="Number of GPUs")
+    parser.add_argument("--dp", type=int, required=True, help="Data parallelism degree")
+    parser.add_argument("--tp", type=int, required=True, help="Tensor parallelism degree")
+    parser.add_argument("--pp", type=int, required=True, help="Pipeline parallelism degree")
+    parser.add_argument("--bapr", type=int, help="Batch accumulation per replica")
+    parser.add_argument("--mbs", type=int, help="Micro batch size")
+    parser.add_argument("--seq_len", type=int, help="Sequence length", default=4096)
+    parser.add_argument("--exp_name", type=str, help="Experiment name")
+    parser.add_argument("--recompute_layer", 
action="store_true", help="Enable recompute allgather for tensor parallelism") + parser.add_argument("--use_async", action="store_true", help="Enable async communication for tensor parallelism") + parser.add_argument("--lighteval", action="store_true", help="Enable light evaluation") + parser.add_argument("--s3", action="store_true", help="Enable light evaluation") + + args=parser.parse_args() + + create_single_config( + out_dir=args.out_dir, + model=args.model, + gpus=args.gpus, + dp=args.dp, + tp=args.tp, + pp=args.pp, + bapr=args.bapr, + mbs=args.mbs, + cluster="hf", + exp_name=args.exp_name, + seq_len=args.seq_len, + # recompute_layer=args.recompute_layer, + lighteval=args.lighteval, + s3=args.s3, + dry_run=False, + no_profiler=True + ) diff --git a/bench/submit_jobs.py b/bench/submit_jobs.py new file mode 100644 index 0000000..586833c --- /dev/null +++ b/bench/submit_jobs.py @@ -0,0 +1,232 @@ +from enum import Enum +import os +from jinja2 import Template +import subprocess +import yaml +from typing import List + +class Status(Enum): + # INIT -> PENDING -> [RUNNING | FAIL | TIMEOUT OOM] -> COMPLETED + INIT = "init" # Job is created + PENDING = "pending" # Job is waiting for ressources + RUNNING = "running" # Job is running + FAIL = "fail" # Job failed + OOM = "oom" # Job failed due to out of memory (expected behavior) + TIMEOUT = "timeout" # Job failed due to timeout + COMPLETED = "completed" # Job is completed + +class Job: + def __init__(self, root_path: str, qos: str) -> None: + self.root_path = root_path + self.name = os.path.basename(root_path) + self.config = os.path.join(root_path, "config.yaml") + self.qos = qos + + # Check if the status.txt file exists + status_file_path = os.path.join(self.root_path, "status.txt") + if not os.path.exists(status_file_path): + # Create the status.txt file with INIT status + with open(status_file_path, 'w') as f: + f.write(Status.INIT.value) + self.status = self.get_status() + + def get_status(self) -> Status: + """ + Read the status of the job from `status.txt` and return it + """ + is_existing = lambda value_to_check: any(value.value == value_to_check for value in Status.__members__.values()) + + status_file_path = os.path.join(self.root_path, "status.txt") + with open(status_file_path, 'r') as f: + status = f.read() + if not is_existing(status): + raise ValueError("Invalid status") + return Status(status) + + def set_status(self, status: Status) -> Status: + """ + Update the status of the job in `status.txt` and return the new status + """ + status_file_path = os.path.join(self.root_path, "status.txt") + with open(status_file_path, 'w') as f: + f.write(status.value) + return status + +class Scheduler: + + def __init__(self, inp_dir: str, qos: str) -> None: + jobs_directory_paths = [os.path.abspath(root) for root, dirs, _ in os.walk(inp_dir) if not dirs] + jobs_directory_paths = [job_path.replace("/profiler", "") if "profiler" in job_path else job_path for job_path in jobs_directory_paths] + self.job_lists = [Job(job_path, qos) for job_path in jobs_directory_paths] + + def keep_only_jobs(self, status: Status): + return [job for job in self.job_lists if job.status == status] + + def filter_out_jobs(self, status: Status): + return [job for job in self.job_lists if job.status != status] + + def create_slurm_script(self, job: Job, cluster: str): + # Submit job to the cluster (edit jinja) + # load yaml config.yaml + with open(job.config, 'r') as file: + config = yaml.load(file, Loader=yaml.FullLoader) + + if cluster == "hf": + max_nodes = 8 + elif 
cluster == "swiss-ai": + max_nodes = 4 + else: + raise ValueError("Invalid cluster") + + # Pick the right number of nodes and n_proc_per_node + world_size = config['parallelism']['pp'] * config['parallelism']['dp'] * config['parallelism']['tp'] + assert world_size <= max_nodes or world_size % max_nodes == 0 + nodes = max(1, world_size // max_nodes) + n_proc_per_node = min(8, world_size // nodes) + assert nodes * n_proc_per_node == world_size + + target_path_hf_hub = os.path.join(os.path.basename(os.path.dirname(os.path.dirname(job.root_path))), os.path.basename(os.path.dirname(job.root_path)), os.path.basename(job.root_path)) + + context_bench = { + 'nodes': nodes, + 'n_proc_per_node': n_proc_per_node, + 'root_path': job.root_path, + 'target_path_hf_hub': target_path_hf_hub, + "config": job.config, + "qos": job.qos, + } + + #TODO: don't hardcode the base_bench.slurm path. Should be #HOME/bench_cluster/template/base_bench.slurm + if cluster == "hf": + base_path = "/fsx/ferdinandmom/ferdinand-hf/nanotron/debug/template/base_bench.slurm" + else: + raise ValueError("Invalid cluster") + + with open(base_path, 'r') as file: + base_bench_file = file.read() + + base_bench_template = Template(base_bench_file) + + # Write the rendered script to a new file located at the job root_path + output_file_path = os.path.join(job.root_path, "bench.slurm") + with open(output_file_path, 'w') as file: + file.write(base_bench_template.render(context_bench)) + + print(f"Slurm script created at {output_file_path}") + + def launch_dependency(self, job_array: List[Job], env_vars): + + prev_job_id = None + for job in job_array: + if prev_job_id is None: + result = subprocess.run(["sbatch", '--parsable', os.path.join(job.root_path, "bench.slurm")], env=env_vars, capture_output=True, text=True) + else: + result = subprocess.run(["sbatch", '--parsable', '--dependency=afterany:'+prev_job_id, os.path.join(job.root_path, "bench.slurm")], env=env_vars, capture_output=True, text=True) + job.set_status(Status.PENDING) + prev_job_id = result.stdout.strip() + + + def check_status(self): + # find all status files using self.jobs_directory_paths + status_files = [os.path.join(job.root_path, "status.txt") for job in self.job_lists] + + status_counts = { + "init": 0, + "pending": 0, + "running": 0, + "fail": 0, + "oom": 0, + "timeout": 0, + "completed": 0 + } + + for status_file in status_files: + with open(status_file, 'r') as f: + status = f.read().strip() + if status in status_counts: + status_counts[status] += 1 + else: + raise ValueError(f"Invalid status: {status}") + + total = sum(status_counts.values()) + + # Print the status counts in a formatted table + print(f"{'Status':<10} | {'Count':<6}") + print(f"{'-'*10}-|-{'-'*6}") + for status, count in status_counts.items(): + print(f"{status.capitalize():<10} | {count:<6}") + + print(f"{'-'*10}-|-{'-'*6}") + print(f"{'Total':<10} | {total:<6}") + +def submit_jobs(inp_dir, qos, nb_slurm_array, cluster: str, only: str = None): + scheduler = Scheduler(inp_dir, qos) + + #TODO: batch into job arrays + env_vars = os.environ.copy() + total_jobs = len(scheduler.job_lists) + + if only == "fail": + scheduler.job_lists = scheduler.keep_only_jobs(Status.FAIL) + elif only == "pending": + scheduler.job_lists = scheduler.keep_only_jobs(Status.PENDING) + elif only == "timeout": + scheduler.job_lists = scheduler.keep_only_jobs(Status.TIMEOUT) + elif only == "running": + scheduler.job_lists = scheduler.keep_only_jobs(Status.RUNNING) + + if only is not None: + filtered_jobs = 
len(scheduler.job_lists) + if filtered_jobs == 0: + print(f"No '{only}' jobs to resubmit") + return + print(f"Only {filtered_jobs}/{total_jobs} jobs with status '{only}' will be resubmitted") + + scheduler.job_lists = scheduler.filter_out_jobs(Status.COMPLETED) + + if nb_slurm_array > 0: + # Use job dependecies + + # Distribute the jobs into the arrays + base_jobs_per_array = len(scheduler.job_lists) // nb_slurm_array + extra_jobs = len(scheduler.job_lists) % nb_slurm_array + distribution = [base_jobs_per_array] * nb_slurm_array + for i in range(extra_jobs): + distribution[i] += 1 + + start = 0 + + for i, nb_jobs in enumerate(distribution): + previous_job_id = None + end = start + nb_jobs + job_array = scheduler.job_lists[start:end] + + print(f"Launching job Dependency array {i+1} with {nb_jobs} jobs") + + for job in job_array: + scheduler.create_slurm_script(job, cluster) + + scheduler.launch_dependency(job_array, env_vars) + + start = end + else: + # Don't use job dependecies + for job in scheduler.job_lists: + scheduler.create_slurm_script(job, cluster) + print(os.path.join(job.root_path, "bench.slurm")) + subprocess.run(["sbatch", os.path.join(job.root_path, "bench.slurm")], env=env_vars) + job.set_status(Status.PENDING) + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description='Submit jobs to the cluster') + parser.add_argument('--inp_dir', type=str, help='Input directory containing the jobs') + parser.add_argument('--qos', type=str, help='QOS of the jobs') + parser.add_argument('--nb_slurm_array', type=int, default=0, help='Number of slurm arrays') + parser.add_argument('--cluster', type=str, default='hf', help='Cluster to submit the jobs') + parser.add_argument('--only', type=str, default=None, help='Filter the jobs to submit') + + args = parser.parse_args() + + submit_jobs(args.inp_dir, args.qos, args.nb_slurm_array, cluster=args.cluster, only=args.only) diff --git a/bench/template/base_bench.slurm b/bench/template/base_bench.slurm new file mode 100644 index 0000000..918902c --- /dev/null +++ b/bench/template/base_bench.slurm @@ -0,0 +1,91 @@ +#!/bin/bash + +#SBATCH --job-name=bench-picotron +#SBATCH --time=00:30:00 +#SBATCH --partition=hopper-prod +#SBATCH --nodes={{ nodes }} +#SBATCH --gres=gpu:{{ n_proc_per_node }} +#SBATCH --qos={{ qos }} +#SBATCH --ntasks-per-node=1 +#SBATCH --cpus-per-task=96 +#SBATCH --exclusive +#SBATCH --output={{ root_path }}/log_%j.out +#SBATCH --error={{ root_path }}/log_%j.out + +# Function to update status based on squeue output +update_status() { + job_id=$1 + status_file=$2 + # For unknown reasons, it doenst update status for pending. It only works for running + while true; do + job_status=$(squeue --job $job_id --noheader --format=%T) + echo "Job status: $job_status" + if [ -z "$job_status" ]; then + # Job has finished or is not found + break + elif [ "$job_status" = "RUNNING" ]; then + printf "running" > $status_file + break + fi + sleep 10 + done +} + +# Misc initializations. 
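+# (Status protocol, for reference: this script, bench/submit_jobs.py and
+#  bench/check_status.sh share the same states: init -> pending -> running ->
+#  completed | fail | oom | timeout. update_status above only ever writes
+#  "running"; the terminal state is written by the exit-status block at the
+#  bottom of this script once `srun` returns.)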
+echo "========================" +echo "START TIME: $(date)" +source /etc/profile.d/modules.sh +source /fsx/ferdinandmom/miniforge3/etc/profile.d/conda.sh +conda activate /fsx/ferdinandmom/miniforge3/envs/env-picotron +echo python3 version = $(python3 --version) +echo "========================" + +# Slurm stuff +export HOSTNAMES=$(scontrol show hostnames "$SLURM_JOB_NODELIST") +export MASTER_ADDR=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1) +export MASTER_PORT=$((1024 + RANDOM % 64511)) + +export TMPDIR=/scratch +export TORCH_HOME=/fsx/$USER/.cache/torch +export HF_HOME=/fsx/$USER/.cache/huggingface +export WANDB_DIR=/fsx/$USER/.cache/wandb +export CUBLAS_WORKSPACE_CONFIG=":4096:8" +export CUDA_DEVICE_MAX_CONNECTIONS="1" + +module load cuda/12.1 +huggingface-cli login --token $HUGGINGFACE_TOKEN + +GIT_REPO="/fsx/ferdinandmom/ferdinand-hf/picotron/" +CMD="$GIT_REPO/run_train.py --config-path {{ config }} --logs-path {{ root_path }} --run output --slurm --nodes {{ nodes }}" + +LAUNCHER="python" + +# Checkout the bench_cluster branch +cd $GIT_REPO +# Get the current job ID +job_id=${SLURM_JOB_ID} + +# Update status to "pending" or "running" in the background +update_status $job_id {{ root_path }}/status.txt & + +# Run the main command +echo "Running command: $CMD" +srun -u $LAUNCHER $CMD +exit_status=$? + +job_id=$SLURM_JOB_ID + +# Update status based on the exit status of `srun` +if [ $exit_status -eq 0 ]; then + printf "completed" > {{ root_path }}/status.txt +else + if grep -q "OutOfMemoryError" {{ root_path }}/log_${job_id}.out; then + printf "oom" > {{ root_path }}/status.txt + elif grep -q " CUDA error: an illegal memory access" {{ root_path }}/log_${job_id}.out; then + printf "oom" > {{ root_path }}/status.txt + elif grep -q "Timeout" {{ root_path }}/log_${job_id}.out; then + printf "timeout" > {{ root_path }}/status.txt + else + printf "fail" > {{ root_path }}/status.txt + fi +fi \ No newline at end of file diff --git a/bench/template/template_base_configs.json b/bench/template/template_base_configs.json new file mode 100644 index 0000000..f00155c --- /dev/null +++ b/bench/template/template_base_configs.json @@ -0,0 +1,51 @@ +{ + "distributed": { + "tp_size": 1, + "cp_size": 1, + "pp_size": 2, + "dp_size": 2, + "master_addr": "localhost", + "master_port": 29500, + "backend": "nccl", + "use_cpu": false + }, + "model": { + "name": "HuggingFaceTB/SmolLM-360M-Instruct", + "num_hidden_layers": 16, + "num_attention_heads": 16, + "num_key_value_heads": 4, + "dtype": "bfloat16", + "use_flash_attention": true + }, + "training": { + "seed": 42, + "learning_rate": 3e-4, + "total_train_steps": 200, + "seq_length": 1024, + "local_batch_size": 64, + "micro_batch_size": 32, + "gradient_accumulation_steps": 1, + "num_samples": 400000, + "max_tokens": null + }, + "dataset": { + "name": "roneneldan/TinyStories", + "num_workers": 4, + "num_proc": 4 + }, + "checkpoint": { + "save_dir": "ckpt", + "save_frequency": 300, + "load_path": "" + }, + "logging": { + "use_wandb": false, + "project_name": "picotron", + "run_name": null + }, + "environment": { + "OMP_NUM_THREADS": "1", + "TOKENIZERS_PARALLELISM": "false", + "FLASH_ATTEN": "1" + } +} diff --git a/train.py b/train.py index db5f701..a93185f 100644 --- a/train.py +++ b/train.py @@ -10,6 +10,7 @@ CUDA_DEVICE_MAX_CONNECTIONS=1 torchrun --nproc_per_node=4 --nnodes=1 -- """ import os +import json import time import argparse from src.parallel.context_parallel import parallel_input @@ -57,76 +58,80 @@ def train_step(model, data_loader, 
device): return acc_loss -if __name__ == "__main__": +if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument("--tp_size", type=int, default=1) - parser.add_argument("--cp_size", type=int, default=1) - parser.add_argument("--pp_size", type=int, default=1) - parser.add_argument("--dp_size", type=int, default=1) - parser.add_argument("--use_wandb", action="store_true", default=False) - parser.add_argument("--use_cpu", action="store_true", default=False) - parser.add_argument("--master_addr", type=str, default="localhost") - parser.add_argument("--master_port", type=int, default=29500) - parser.add_argument("--load_path", type=str, default="", help="Path to load the model from") - parser.add_argument("--ckpt_dir", type=str, default="ckpt", help="Directory to save checkpoints") - parser.add_argument("--ckpt_freq", type=int, default=300, help="Frequency to save checkpoints") - + parser.add_argument("--config", type=str, default="", help="Path to config file") args = parser.parse_args() - - os.environ["OMP_NUM_THREADS"] = "1" - os.environ["TOKENIZERS_PARALLELISM"] = "false" - os.environ["FLASH_ATTEN"] = "1" # Use cuda kernels from flash attention repo to accelerate the training. Model dtype should be torch.float16! - os.environ["DEVICE"] = "cuda" if not args.use_cpu else "cpu" - - dtype = torch.bfloat16 if torch.cuda.is_available() and torch.cuda.is_bf16_supported() and not args.use_cpu else torch.float32 # if GPU is not available or not supported, use torch.float32 - assert (dtype == torch.bfloat16 and os.getenv("FLASH_ATTEN") == "1") or os.getenv("FLASH_ATTEN") != "1", "Kernel operations requires dtype=torch.bfloat16" + with open(args.config, "r") as f: + config = json.load(f) + + os.environ["OMP_NUM_THREADS"] = config["environment"]["OMP_NUM_THREADS"] + os.environ["TOKENIZERS_PARALLELISM"] = config["environment"]["TOKENIZERS_PARALLELISM"] + os.environ["FLASH_ATTEN"] = config["environment"]["FLASH_ATTEN"] # Use cuda kernels from flash attention repo to accelerate the training. Model dtype should be torch.float16! 
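+    # Assumed config schema: the JSON passed via --config is expected to mirror
+    # bench/template/template_base_configs.json (top-level sections "distributed",
+    # "model", "training", "dataset", "checkpoint", "logging", and "environment"),
+    # all of which are consumed in this __main__ block.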
+ os.environ["DEVICE"] = "cpu" if config["distributed"]["use_cpu"] else "cuda" + + dtype = torch.bfloat16 if torch.cuda.is_available() and torch.cuda.is_bf16_supported() and not config["distributed"]["use_cpu"] else torch.float32 # if GPU is not available or not supported, use torch.float32 + assert (dtype == torch.bfloat16 and os.getenv("FLASH_ATTEN") == "1") or os.getenv("FLASH_ATTEN") != "1", "Kernel operations requires dtype=torch.bfloat16" + + # hyperparameters + SEQ_LEN = config["training"]["seq_length"] + LOCAL_BATCH_SIZE = config["training"]["local_batch_size"] + MICRO_BATCH_SIZE = config["training"]["micro_batch_size"] + LEARNING_RATE = config["training"]["learning_rate"] + NUM_SAMPLES = config["training"]["num_samples"] + MAX_TOKENS = config["training"]["max_tokens"] + SEED = config["training"]["seed"] + TOTAL_TRAIN_STEPS = config["training"]["total_train_steps"] + GRAD_ACC = config["training"]["gradient_accumulation_steps"] + MODEL_NAME = config["model"]["name"] + DATASET_NAME = config["dataset"]["name"] + NUM_WORKERS = config["dataset"]["num_workers"] + NUM_PROC = config["dataset"]["num_proc"] + USE_WANDB = config["logging"]["use_wandb"] + TP_SIZE = config["distributed"]["tp_size"] + PP_SIZE = config["distributed"]["pp_size"] + DP_SIZE = config["distributed"]["dp_size"] + CP_SIZE = config["distributed"]["cp_size"] + LOAD_PATH = config["checkpoint"]["load_path"] + CHECKPOINT_DIR = config["checkpoint"]["save_dir"] + CHECKPOINT_FREQ = config["checkpoint"]["save_frequency"] + local_rank = int(os.environ["LOCAL_RANK"]) world_size = int(os.environ["WORLD_SIZE"]) host = os.environ["MASTER_ADDR"] port = int(os.environ["MASTER_PORT"]) + backend = "gloo" if config["distributed"]["use_cpu"] else "nccl" - ## hyperparameters - SEQ_LEN, LOCAL_BATCH_SIZE, MICRO_BATCH_SIZE, LEARNING_RATE, NUM_SAMPLES, MAX_TOKENS, SEED = 1024, 64, 32, 3e-4, 400000, None, 42 - total_train_steps = 200 - grad_acc = 1 - - assert SEQ_LEN % args.cp_size == 0, "SEQ_LEN must be divisible by cp_size for Context Parallelism" + assert SEQ_LEN % CP_SIZE == 0, "SEQ_LEN must be divisible by cp_size for Context Parallelism" + assert world_size == TP_SIZE * PP_SIZE * DP_SIZE * CP_SIZE, "world_size must be equal to tp_size * pp_size * dp_size * cp_size" - backend = "gloo" if args.use_cpu else "nccl" - if backend == "nccl": torch.cuda.set_device(local_rank) device = torch.device("cuda", local_rank) else: device = torch.device("cpu") - + dist.init_process_group(rank=local_rank, world_size=world_size, backend=backend, init_method=f"tcp://{host}:{port}") - - setup_process_group_manager(tp_size=args.tp_size, cp_size=args.cp_size, pp_size=args.pp_size, dp_size=args.dp_size) + setup_process_group_manager(tp_size=TP_SIZE, cp_size=CP_SIZE, pp_size=PP_SIZE, dp_size=DP_SIZE) is_wandb_rank = pgm.process_group_manager.tp_rank == 0 and pgm.process_group_manager.dp_rank == 0 and pgm.process_group_manager.cp_rank == 0 and pgm.process_group_manager.pp_is_last_stage - # if pgm.process_group_manager.global_rank == 0: - # display_4D_parallelism_grid() - - tokens_per_step = LOCAL_BATCH_SIZE * SEQ_LEN * grad_acc * args.dp_size + tokens_per_step = LOCAL_BATCH_SIZE * SEQ_LEN * GRAD_ACC * DP_SIZE if pgm.process_group_manager.global_rank == 0: print("Tokens per step:", to_readable_format(tokens_per_step), is_print_rank=is_wandb_rank) + set_all_seed(SEED) - dataset_name = "roneneldan/TinyStories" - model_name = "HuggingFaceTB/SmolLM-360M-Instruct" - # model_name = "meta-llama/Llama-2-7b-hf" - config = AutoConfig.from_pretrained(model_name) - 
config.num_hidden_layers = 16 - config.num_attention_heads = 16 - config.num_key_value_heads = 4 + model_config = AutoConfig.from_pretrained(MODEL_NAME) + model_config.num_hidden_layers = config["model"]["num_hidden_layers"] + model_config.num_attention_heads = config["model"]["num_attention_heads"] + model_config.num_key_value_heads = config["model"]["num_key_value_heads"] start_time = time.time() - model = Llama(config=config) + model = Llama(config=model_config) print("init model time:", time.time()-start_time, is_print_rank=is_wandb_rank) - if is_wandb_rank and args.use_wandb: + if is_wandb_rank and USE_WANDB: wandb.init( project="picotron", name=f"test_convergence_GBS_{tokens_per_step}_{pgm.process_group_manager}", @@ -134,26 +139,26 @@ if __name__ == "__main__": "tensor_parallel_size": pgm.process_group_manager.tp_size, "pipeline_parallel_size": pgm.process_group_manager.pp_size, "data_parallel_size": pgm.process_group_manager.dp_size, - "model": model_name, - "dataset": dataset_name, + "model": config["model"]["name"], + "dataset": config["dataset"]["name"], "max_tokens": MAX_TOKENS, "learning_rate": LEARNING_RATE, "seed": SEED, "micro_batch_size": MICRO_BATCH_SIZE, - "global_batch_size": LOCAL_BATCH_SIZE * args.dp_size * grad_acc, - "gradient_accumulation": grad_acc, + "global_batch_size": LOCAL_BATCH_SIZE * pgm.process_group_manager.dp_size * GRAD_ACC, + "gradient_accumulation": GRAD_ACC, }, ) start_time = time.time() + model.to(dtype).to(device) + if pgm.process_group_manager.tp_world_size > 1: TensorParallel(model) if pgm.process_group_manager.pp_world_size > 1: - model = PipelineParallel(model, config) - - model.to(dtype).to(device) - + model = PipelineParallel(model, model_config) + # Context parallel and Data parallel both need gradient synchronization if pgm.process_group_manager.cp_dp_world_size > 1: model = DataParallel(model) @@ -165,27 +170,34 @@ if __name__ == "__main__": print("model to device time:", time.time()-start_time, is_print_rank=is_wandb_rank) start_time = time.time() - data_loader = MicroBatchDataLoader(local_batch_size=LOCAL_BATCH_SIZE, micro_batch_size=MICRO_BATCH_SIZE, seq_length=SEQ_LEN, dataset_name=dataset_name, tokenizer_name=model_name, grad_acc = grad_acc,num_workers=4, num_proc=4, num_samples=NUM_SAMPLES) + data_loader = MicroBatchDataLoader( + local_batch_size=LOCAL_BATCH_SIZE, + micro_batch_size=MICRO_BATCH_SIZE, + seq_length=SEQ_LEN, + dataset_name=DATASET_NAME, + tokenizer_name=MODEL_NAME, + grad_acc = GRAD_ACC, + num_workers=NUM_WORKERS, + num_proc=NUM_PROC, + num_samples=NUM_SAMPLES + ) print("init dataloader time:", time.time()-start_time, is_print_rank=is_wandb_rank) - tensor_shapes = (data_loader.micro_batch_size, data_loader.seq_length_per_gpu, config.hidden_size) + tensor_shapes = (data_loader.micro_batch_size, data_loader.seq_length_per_gpu, model_config.hidden_size) optimizer = AdamW(model.parameters(), lr=LEARNING_RATE) trained_tokens, step = 0, 0 - if args.load_path: - step, trained_tokens = load_checkpoint(model, optimizer, args.load_path) + if LOAD_PATH: + step, trained_tokens = load_checkpoint(model, optimizer, LOAD_PATH) - checkpoint_dir = args.ckpt_dir - checkpoint_freq = args.ckpt_freq - dist.barrier() - #TODO: Double-check consumed tokens after each steps (for example, MICRO_BATCH_SIZE=2 and using only dp_size=4, num_local_micro_batches=0 => division by 0) - #TODO: Check convergence - #TODO: Try multi-nodes - #TODO: Add activation checkpointing - #TODO: add gradient accumulation + # #TODO: Double-check consumed tokens after 
each steps (for example, MICRO_BATCH_SIZE=2 and using only dp_size=4, num_local_micro_batches=0 => division by 0) + # #TODO: Check convergence + # #TODO: Try multi-nodes + # #TODO: Add activation checkpointing + # #TODO: add gradient accumulation - while trained_tokens < MAX_TOKENS: + while MAX_TOKENS is None or trained_tokens < MAX_TOKENS: #TODO: Add epoch support # data_loader.set_epoch(step) step_start_time = time.time() @@ -217,17 +229,17 @@ if __name__ == "__main__": f"Memory usage: {torch.cuda.memory_reserved() / 1e9:.2f}GB" , is_print_rank=is_wandb_rank) - if args.use_wandb: + if USE_WANDB: wandb.log({"loss": loss, "tokens_per_step": tokens_per_step, "tokens_per_second": tokens_per_step / step_duration,\ "memory_usage": torch.cuda.memory_reserved() / 1e9, "trained_tokens": trained_tokens}) - if step % checkpoint_freq == 0: - save_checkpoint(model, optimizer, step, trained_tokens, checkpoint_dir+f"/{step}") + if step % CHECKPOINT_FREQ == 0: + save_checkpoint(model, optimizer, step, trained_tokens, CHECKPOINT_DIR+f"/{step}") - if step >= total_train_steps: + if step >= TOTAL_TRAIN_STEPS: break - if is_wandb_rank and args.use_wandb: + if is_wandb_rank and USE_WANDB: wandb.finish() dist.destroy_process_group() diff --git a/utils.py b/utils.py index a52cef0..874d6dd 100644 --- a/utils.py +++ b/utils.py @@ -190,64 +190,4 @@ class MicroBatchDataLoader(DataLoader): except StopIteration: self._iterator = None raise StopIteration - return batch - -## def display_4D_parallelism_grid(): -# #TODO(fmom): fix me -# #TODO(fmom): add color to distinguish between different parallelism groups -# def create_gpu_box(gpu_num, tp, cp, pp): -# return [ -# f"+------+", -# f"|GPU:{gpu_num:<2d}|", -# f"| TP:{tp:d} |", -# f"| CP:{cp:d} |", -# f"| PP:{pp:d} |", -# f"+------+" -# ] -# -# def create_node(start_gpu, tp_size, cp_size, pp_size, node_index): -# boxes = [] -# for i in range(8): # 8 GPUs per node -# gpu_num = start_gpu + i -# tp = gpu_num % tp_size -# cp = (gpu_num // tp_size) % cp_size -# pp = (gpu_num // (tp_size * cp_size)) % pp_size -# boxes.append(create_gpu_box(gpu_num, tp, cp, pp)) -# return [' '.join(row) for row in zip(*boxes)] -# -# def create_dp_box(replica_output): -# width = len(replica_output[0]) + 4 -# top_bottom = f"+{'-' * (width - 2)}+" -# return [top_bottom] + [f"| {line} |" for line in replica_output] + [top_bottom] -# -# tp_size = pgm.process_group_manager.tp_size -# cp_size = pgm.process_group_manager.cp_size -# pp_size = pgm.process_group_manager.pp_size -# dp_size = pgm.process_group_manager.dp_size -# total_gpus_per_replica = tp_size * cp_size * pp_size -# num_nodes_per_replica = (total_gpus_per_replica + 7) // 8 # Round up to nearest whole node -# -# output = [] -# output.append("=== Simplified Parallelism Configuration ===") -# output.append(f"TP Size: {tp_size}, CP Size: {cp_size}, PP Size: {pp_size}, DP Size: {dp_size}") -# output.append(f"Total GPUs for one replica: {total_gpus_per_replica}") -# output.append(f"Number of nodes per replica: {num_nodes_per_replica} (8 GPUs per node)") -# output.append(f"Total GPUs: {total_gpus_per_replica * dp_size}") -# output.append(f"Total nodes: {num_nodes_per_replica * dp_size}") -# output.append("") -# -# for dp in range(dp_size): -# replica_output = [] -# for node in range(num_nodes_per_replica): -# start_gpu = (dp * total_gpus_per_replica) + (node * 8) -# node_output = create_node(start_gpu, tp_size, cp_size, pp_size, node) -# replica_output.append(f"Node {dp * num_nodes_per_replica + node}:") -# replica_output.extend(node_output) -# 
replica_output.append("") -# -# dp_box = create_dp_box(replica_output) -# output.append(f"Data Parallel Group {dp}:") -# output.extend(dp_box) -# output.append("") -# -# print("\n".join(output)) + return batch \ No newline at end of file