import argparse

import deepspeed

parser = argparse.ArgumentParser(description='sp')
parser.add_argument('--basepath', type=str, default='/home/lyh/weights/hf/llama31chat/8B/')
parser.add_argument('--trainpath', type=str,
                    default="/home/lyh/code/nlp/developing/vllmbase/vllm/gedata/l318b.jsonl")
parser.add_argument('--testpath', type=str,
                    default="/home/lyh/code/nlp/developing/vllmbase/vllm/gedata/0318.json")
parser.add_argument('--savedir', type=str, default='0')
parser.add_argument('--model_type', type=str, default='llama', choices=['llama', 'qwen3'],
                    help="Model architecture type: 'llama' or 'qwen3'")
parser.add_argument("--local_rank", type=int, default=-1,
                    help="local_rank for distributed training on gpus")
parser = deepspeed.add_config_arguments(parser)
args = parser.parse_args()

import json
import re

deepspeed_config = args.deepspeed_config
with open(deepspeed_config) as f:
    ds_config = json.load(f)

# [MODIFIED] Select config path based on model_type
config_path_map = {
    'llama': 'config.json',
    'qwen3': 'config_qwen3.json'
}
config_path = config_path_map.get(args.model_type, 'config.json')

train_config = {
    "bs": ds_config["train_micro_batch_size_per_gpu"],
    "num_epochs": 15,
    "num_workers": 16,
    "max_len": 1536,
    "config_path": config_path,
    "gradient_checkpointing": False
}
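# The batch size above is read from the DeepSpeed JSON passed via --deepspeed_config,
# so that file must define at least "train_micro_batch_size_per_gpu". Since
# deepspeed.initialize() is later called without an explicit optimizer, the optimizer
# (and any LR scheduler) is also expected to come from the same JSON. A minimal
# illustrative config (the values here are assumptions, not the settings used for the
# original runs):
#
#   {
#     "train_micro_batch_size_per_gpu": 4,
#     "gradient_accumulation_steps": 1,
#     "optimizer": {"type": "AdamW", "params": {"lr": 3e-5}},
#     "bf16": {"enabled": true},
#     "zero_optimization": {"stage": 2}
#   }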
from safetensors import safe_open
from transformers import AutoModelForCausalLM, AutoTokenizer
import os

# os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"
import torch

from cnets import padding

torch.backends.cuda.matmul.allow_tf32 = True
from accelerate.utils import set_seed

set_seed(0)
from cnets import Model
from configs import EConfig
from datasets import load_dataset
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader, DistributedSampler
from tqdm import tqdm
# import accelerate
import numpy as np
from transformers import PreTrainedTokenizerBase, get_linear_schedule_with_warmup


def build_dataset_rank(
        tokenizer, datapath, model_type='llama'
):
    ds = load_dataset('json', data_files=datapath)
    ds = ds['train']
    ds = ds.shuffle(seed=42)
    ds1 = ds
    original_columns1 = ds1.column_names
    num_proc = 1  # Changed from 8 to avoid DeepSpeed pickle issues

    # [MODIFIED] Auto-detect the chat format from the rendered conversation string;
    # the separators are chosen dynamically in preprocess_function based on the actual format.
    def preprocess_function(examples):
        new_examples = {
            "attention_mask": [],
            "input_ids": [],
            "loss_mask": []
        }
        for i in range(len(examples['id'])):
            messages = [
                {"role": "system",
                 "content": "You are a helpful, respectful and honest assistant. Always answer as helpfully as possible, while being safe. Your answers should not include any harmful, unethical, racist, sexist, toxic, dangerous, or illegal content. Please ensure that your responses are socially unbiased and positive in nature.\n\nIf a question does not make any sense, or is not factually coherent, explain why instead of answering something not correct. If you don't know the answer to a question, please don't share false information."},
            ]
            convroles = ["user", "assistant"]
            roles = {"human": "user", "gpt": "assistant"}
            source = examples['conversations'][i]
            if not source:
                continue
            if roles[source[0]["from"]] != "user":
                # Skip the first turn if it is not from the human
                source = source[1:]
            for j, sentence in enumerate(source):
                role = roles[sentence["from"]]
                assert role == convroles[j % 2], f"{i}"
                # if sentence["from"]=="gpt":
                #     sentence["value"]=" "+sentence["value"]
                messages.append(
                    {"role": role, "content": sentence["value"]}
                )

            # Try the tokenizer's chat template; fall back to manual ChatML formatting
            try:
                conversation = tokenizer.apply_chat_template(
                    messages,
                    tokenize=False,
                    add_generation_prompt=False,
                )
            except (ValueError, AttributeError):
                # Manually format as ChatML (used by Qwen and many others)
                conversation = ""
                for msg in messages:
                    role = msg["role"]
                    content = msg["content"]
                    conversation += f"<|im_start|>{role}\n{content}<|im_end|>\n"

            if not tokenizer.pad_token_id:
                tokenizer.pad_token_id = tokenizer.unk_token_id

            input_ids = tokenizer(
                conversation,
                return_tensors="pt",
                add_special_tokens=False,
            ).input_ids[0]

            # Filter out samples that are longer than max_len
            if len(input_ids) > train_config["max_len"]:
                continue
            loss_mask = torch.ones_like(input_ids)
            # print(i)

            total_len = len(input_ids)

            # Auto-detect the format and set the separators
            if "<|im_start|>" in conversation and "<|im_end|>" in conversation:
                # ChatML format (Qwen, default fallback)
                sep = "<|im_end|>\n<|im_start|>assistant\n"
                sep2 = "<|im_end|>\n<|im_start|>user\n"
            elif "<|eot_id|>" in conversation:
                # LLaMA-3 format
                sep = "<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n"
                sep2 = "<|eot_id|><|start_header_id|>user<|end_header_id|>"
            else:
                # Unknown format, skip this sample
                continue

            turns = conversation.split(sep2)
            # [MODIFIED] Skip samples with invalid conversation structure
            if len(turns) < 2:
                continue
            turns[1] = turns[0] + sep2 + turns[1]
            turns = turns[1:]

            cur_len = 1
            loss_mask[:cur_len] = 0
            for i, turn in enumerate(turns):
                if turn == "":
                    break
                turn_len = len(tokenizer(turn).input_ids)

                parts = turn.split(sep)
                if len(parts) != 2:
                    break
                parts[0] += sep
                # "-2" is hardcoded for the Llama tokenizer to make the offset correct.
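                # Note (assumption, not verified here): the -1 / -2 / -3 / +1 offsets below
                # appear tuned for the Llama-3 tokenizer and chat template, compensating for
                # the BOS token and the header tokens around each turn. If the manual ChatML
                # branch above is taken instead (e.g. for Qwen), these offsets may need to be
                # re-checked against the actual tokenization of sep and sep2.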
                instruction_len = len(tokenizer(parts[0]).input_ids) - 1

                # Ignore the user instructions
                if i == 0:
                    loss_mask[cur_len: cur_len + instruction_len - 2] = 0
                else:
                    loss_mask[cur_len - 3: cur_len + instruction_len + 1] = 0
                cur_len += turn_len

                if i != 0:
                    cur_len += 3
                # cur_len+=2

                # if i != 0 and not tokenizer.legacy:
                #     # The legacy and non-legacy modes handle special tokens differently
                #     cur_len -= 1

            loss_mask[cur_len:] = 0

            attention_mask = torch.ones_like(loss_mask)

            # new_examples["conversation"].append(conversation)
            new_examples["input_ids"].append(input_ids[None, :])
            new_examples["loss_mask"].append(loss_mask[None, :])
            new_examples["attention_mask"].append(attention_mask[None, :])

        return new_examples

    ds1 = ds1.map(
        preprocess_function,
        batched=True,
        num_proc=num_proc,
        remove_columns=original_columns1,
        load_from_cache_file=False
    )

    ds1.set_format(type="torch")
    return ds1


class DataCollatorWithPadding:

    def paddingtensor(self, intensors, N):
        B, n, S = intensors.shape
        # padding_tensor = torch.zeros(B, N - n, S,dtype=intensors.dtype)
        padding_tensor = torch.zeros(B, N - n, S, dtype=intensors.dtype)
        outtensors = torch.cat((intensors, padding_tensor), dim=1)
        return outtensors

    def paddingtensor2D(self, intensors, N):
        B, n = intensors.shape
        padding_tensor = torch.zeros(B, N - n, dtype=intensors.dtype)
        outtensors = torch.cat((intensors, padding_tensor), dim=1)
        return outtensors

    def __call__(self, features: List[Dict[str, Any]]) -> Dict[str, Any]:
        max_length = max(item['input_ids'].shape[1] for item in features)
        batch_input_ids = torch.cat(
            [self.paddingtensor2D(item['input_ids'], max_length) for item in features])
        batch_attention_mask = torch.cat(
            [self.paddingtensor2D(item['attention_mask'], max_length) for item in features])
        batch_loss_mask = torch.cat(
            [self.paddingtensor2D(item['loss_mask'], max_length) for item in features])
        batch = {
            "input_ids": batch_input_ids,
            "attention_mask": batch_attention_mask,
            "loss_mask": batch_loss_mask,
        }
        return batch


tokenizer = AutoTokenizer.from_pretrained(args.basepath)
# [MODIFIED] Pass model_type to build_dataset_rank
traindataset = build_dataset_rank(tokenizer, args.trainpath, model_type=args.model_type)
testdataset = build_dataset_rank(tokenizer, args.testpath, model_type=args.model_type)

config = EConfig.from_pretrained(train_config["config_path"])
# [MODIFIED] Pass model_type to Model
model = Model(config, ds_config, train_config, path=args.basepath,
              load_emb=True, load_head=True, model_type=args.model_type)
model.scandata(args.trainpath, args.basepath)

criterion = nn.SmoothL1Loss(reduction="none")
num_epochs = train_config["num_epochs"]

# Only pass trainable parameters to DeepSpeed (frozen params cause grad tracking errors)
trainable_params = [p for p in model.parameters() if p.requires_grad]
model_engine, optimizer, _, _ = deepspeed.initialize(args=args,
                                                     model=model,
                                                     model_parameters=trainable_params,
                                                     )

global_rank = deepspeed.comm.get_rank()
rank = deepspeed.comm.get_local_rank()
world_size = deepspeed.comm.get_world_size()

if global_rank == 0:
    import wandb

    wandb.login(key="dcac9b6b99c4203de2a920453357bc8ed55a5baf")
    wandb.init(project="qwen3", entity="model-acceleration", config=ds_config)

os.makedirs(args.savedir, exist_ok=True)

sampler = DistributedSampler(testdataset, num_replicas=world_size, rank=global_rank, shuffle=False)
test_loader = DataLoader(testdataset, batch_size=train_config["bs"], sampler=sampler,
                         num_workers=0, pin_memory=True,
                         collate_fn=DataCollatorWithPadding())

train_sampler = DistributedSampler(traindataset, num_replicas=world_size,
                                   rank=global_rank, shuffle=True)
train_loader = DataLoader(traindataset, batch_size=train_config["bs"], sampler=train_sampler,
                          num_workers=0, pin_memory=True,
                          collate_fn=DataCollatorWithPadding())


def find_max_state_with_file(directory, filename="zero_to_fp32.py"):
    max_a = -1
    for subdir in os.listdir(directory):
        match = re.match(r"state_(\d+)", subdir)
        if match:
            a_value = int(match.group(1))
            subdir_path = os.path.join(directory, subdir)
            file_path = os.path.join(subdir_path, filename)
            if os.path.isdir(subdir_path) and os.path.exists(file_path):
                max_a = max(max_a, a_value)
    if max_a == -1:
        return None, 0
    return f"{directory}/state_{max_a}", max_a + 1


checkpoint_path, start_epoch = find_max_state_with_file(args.savedir)
if checkpoint_path:
    print(f"load from {checkpoint_path}")
    model_engine.load_checkpoint(checkpoint_path)
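# Note: resuming relies on the full DeepSpeed checkpoint written by save_checkpoint at
# the end of each epoch below (those state_{epoch} directories are where
# find_max_state_with_file looks for the zero_to_fp32.py marker). The save_16bit_model
# export stores weights only and is not, on its own, enough to resume optimizer state from.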
for epoch in range(start_epoch, num_epochs):
    train_sampler.set_epoch(epoch + 1)
    print(f"Now training epoch {epoch}")
    model.train()

    epoch_acces = [[] for _ in range(model.length)]
    epoch_plosses = [[] for _ in range(model.length)]

    for batch_idx, data in enumerate(tqdm(train_loader)):
        model.zero_grad()

        plosses, vlosses, acces = model_engine(input_ids=data["input_ids"].to(rank),
                                               attention_mask=data["attention_mask"].to(rank),
                                               loss_mask=data["loss_mask"],
                                               )
        # Geometrically down-weight the losses from later prediction positions
        ploss_weight = [0.8 ** i for i in range(len(plosses))]
        ploss = sum([ploss_weight[i] * plosses[i] for i in range(len(plosses))])
        loss = ploss

        model_engine.backward(loss)
        model_engine.step()

        if global_rank == 0:
            logdict = {"train/lr": optimizer.optimizer.param_groups[0]["lr"]}
            for i in range(len(plosses)):
                logdict[f"train/ploss_{i}"] = plosses[i].item()
            for i in range(len(acces)):
                logdict[f"train/acc_{i}"] = acces[i]
            wandb.log(logdict)

        epoch_acces = [epoch_acces[i] + [acces[i]] for i in range(len(acces))]
        epoch_plosses = [epoch_plosses[i] + [plosses[i].item()] for i in range(len(plosses))]

    for i in range(len(epoch_acces)):
        acc_i = torch.tensor(epoch_acces[i]).cuda().mean()
        deepspeed.comm.all_reduce(acc_i, op=deepspeed.comm.ReduceOp.AVG)
        acc_i = acc_i.item()
        if global_rank == 0:
            wandb.log({f"train/epochacc_{i}": acc_i})
            print(f"Train Epoch [{epoch + 1}/{num_epochs}], position {i}, Acc: {acc_i:.2f}")

    for i in range(len(epoch_plosses)):
        loss_i = torch.tensor(epoch_plosses[i]).cuda().mean()
        deepspeed.comm.all_reduce(loss_i, op=deepspeed.comm.ReduceOp.AVG)
        loss_i = loss_i.item()
        if global_rank == 0:
            wandb.log({f"train/epochploss_{i}": loss_i})
            print(f"Train Epoch [{epoch + 1}/{num_epochs}], position {i}, pLoss: {loss_i:.2f}")

    epoch_acces = [[] for _ in range(model.length)]
    epoch_plosses = [[] for _ in range(model.length)]

    for batch_idx, data in enumerate(tqdm(test_loader)):
        with torch.no_grad():
            plosses, vlosses, acces = model_engine(input_ids=data["input_ids"].to(rank),
                                                   attention_mask=data["attention_mask"].to(rank),
                                                   loss_mask=data["loss_mask"],
                                                   )
            epoch_acces = [epoch_acces[i] + [acces[i]] for i in range(len(acces))]
            epoch_plosses = [epoch_plosses[i] + [plosses[i].item()] for i in range(len(plosses))]

    for i in range(len(epoch_acces)):
        acc_i = torch.tensor(epoch_acces[i]).cuda().mean()
        deepspeed.comm.all_reduce(acc_i, op=deepspeed.comm.ReduceOp.AVG)
        acc_i = acc_i.item()
        if global_rank == 0:
            wandb.log({f"test/epochacc_{i}": acc_i})
            print(f"Test Epoch [{epoch + 1}/{num_epochs}], position {i}, Acc: {acc_i:.2f}")

    for i in range(len(epoch_plosses)):
        loss_i = torch.tensor(epoch_plosses[i]).cuda().mean()
        deepspeed.comm.all_reduce(loss_i, op=deepspeed.comm.ReduceOp.AVG)
        loss_i = loss_i.item()
        if global_rank == 0:
            wandb.log({f"test/epochploss_{i}": loss_i})
            print(f"Test Epoch [{epoch + 1}/{num_epochs}], position {i}, pLoss: {loss_i:.2f}")

    # Clear out the redundant cache after each epoch
    torch.cuda.empty_cache()

    # Save a checkpoint every epoch (so that training can be resumed)
    model_engine.save_16bit_model(f"{args.savedir}/state_{epoch}", exclude_frozen_parameters=True)
    model_engine.save_checkpoint(save_dir=f"{args.savedir}/state_{epoch}")

    # Save disk space: delete old checkpoints (keep only the three most recent)
    if global_rank == 0 and epoch > 2:
        old_checkpoint = f"{args.savedir}/state_{epoch - 3}"
        if os.path.exists(old_checkpoint):
            import shutil

            shutil.rmtree(old_checkpoint)
            print(f"Removed old checkpoint: {old_checkpoint}")
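# Example launch (illustrative; the script name, GPU count, and config filename are
# assumptions, not fixed by this file):
#
#   deepspeed --num_gpus=8 main.py \
#       --basepath /path/to/base/model \
#       --trainpath /path/to/train.jsonl \
#       --testpath /path/to/test.jsonl \
#       --savedir ./checkpoints \
#       --model_type qwen3 \
#       --deepspeed_config ds_config.json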