# -*- coding: utf-8 -*- # Copyright 2023 XuMing(xuming624@qq.com) and The HuggingFace Inc. team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Fine-tuning the library models for causal language modeling (GPT, GPT-2, CTRL, ...) on a text file or a dataset. part of this code is adapted from https://github.com/huggingface/transformers/blob/main/examples/pytorch/language-modeling/run_clm.py """ import math import os from dataclasses import dataclass, field from glob import glob from itertools import chain from typing import Optional, List, Dict, Any, Mapping import numpy as np import torch from datasets import load_dataset from loguru import logger from peft import LoraConfig, TaskType, get_peft_model, PeftModel, prepare_model_for_int8_training from sklearn.metrics import accuracy_score from transformers import ( AutoConfig, BloomForCausalLM, AutoModelForCausalLM, AutoModel, LlamaTokenizer, LlamaForCausalLM, BloomTokenizerFast, AutoTokenizer, HfArgumentParser, Trainer, TrainingArguments, is_torch_tpu_available, set_seed, ) from transformers.trainer import TRAINING_ARGS_NAME from transformers.utils.versions import require_version MODEL_CLASSES = { "bloom": (AutoConfig, BloomForCausalLM, BloomTokenizerFast), "chatglm": (AutoConfig, AutoModel, AutoTokenizer), "llama": (AutoConfig, LlamaForCausalLM, LlamaTokenizer), "baichuan": (AutoConfig, AutoModelForCausalLM, AutoTokenizer), "auto": (AutoConfig, AutoModelForCausalLM, AutoTokenizer), } @dataclass class ModelArguments: """ Arguments pertaining to which model/config/tokenizer we are going to fine-tune, or train from scratch. """ model_type: str = field( default=None, metadata={"help": "Model type selected in the list: " + ", ".join(MODEL_CLASSES.keys())} ) model_name_or_path: Optional[str] = field( default=None, metadata={ "help": ( "The model checkpoint for weights initialization.Don't set if you want to train a model from scratch." ) }, ) tokenizer_name_or_path: Optional[str] = field( default=None, metadata={ "help": ( "The tokenizer for weights initialization.Don't set if you want to train a model from scratch." ) }, ) load_in_8bit: bool = field(default=False, metadata={"help": "Whether to load the model in 8bit mode or not."}) cache_dir: Optional[str] = field( default=None, metadata={"help": "Where do you want to store the pretrained models downloaded from huggingface.co"}, ) use_fast_tokenizer: bool = field( default=False, metadata={"help": "Whether to use one of the fast tokenizer (backed by the tokenizers library) or not."}, ) torch_dtype: Optional[str] = field( default=None, metadata={ "help": ( "Override the default `torch.dtype` and load the model under this dtype. If `auto` is passed, the " "dtype will be automatically derived from the model's weights." ), "choices": ["auto", "bfloat16", "float16", "float32"], }, ) device_map: Optional[str] = field( default="auto", metadata={"help": "Device to map model to. If `auto` is passed, the device will be selected automatically. "}, ) trust_remote_code: bool = field( default=True, metadata={"help": "Whether to trust remote code when loading a model from a remote checkpoint."}, ) def __post_init__(self): if self.model_type is None: raise ValueError( "You must specify a valid model_type to run training. Available model types are " + ", ".join( MODEL_CLASSES.keys())) if self.model_name_or_path is None: raise ValueError("You must specify a valid model_name_or_path to run training.") @dataclass class DataTrainingArguments: """ Arguments pertaining to what data we are going to input our model for training and eval. """ dataset_name: Optional[str] = field( default=None, metadata={"help": "The name of the dataset to use (via the datasets library)."} ) dataset_config_name: Optional[str] = field( default=None, metadata={"help": "The configuration name of the dataset to use (via the datasets library)."} ) train_file_dir: Optional[str] = field(default=None, metadata={"help": "The train text data file folder."}) validation_file_dir: Optional[str] = field( default=None, metadata={"help": "An optional input evaluation data file to evaluate the perplexity on text file folder."}, ) max_train_samples: Optional[int] = field( default=None, metadata={ "help": ( "For debugging purposes or quicker training, truncate the number of training examples to this " "value if set." ) }, ) max_eval_samples: Optional[int] = field( default=None, metadata={ "help": ( "For debugging purposes or quicker training, truncate the number of evaluation examples to this " "value if set." ) }, ) streaming: bool = field(default=False, metadata={"help": "Enable streaming mode"}) block_size: Optional[int] = field( default=1024, metadata={ "help": ( "Optional input sequence length after tokenization. " "The training dataset will be truncated in block of this size for training. " "Default to the model max input length for single sentence inputs (take into account special tokens)." ) }, ) overwrite_cache: bool = field( default=False, metadata={"help": "Overwrite the cached training and evaluation sets"} ) validation_split_percentage: Optional[int] = field( default=1, metadata={ "help": "The percentage of the train set used as validation set in case there's no validation split" }, ) preprocessing_num_workers: Optional[int] = field( default=None, metadata={"help": "The number of processes to use for the preprocessing."}, ) keep_linebreaks: bool = field( default=True, metadata={"help": "Whether to keep line breaks when using TXT files or not."} ) def __post_init__(self): if self.streaming: require_version("datasets>=2.0.0", "The streaming feature requires `datasets>=2.0.0`") @dataclass class PeftArguments(TrainingArguments): use_peft: bool = field(default=True, metadata={"help": "Whether to use peft"}) target_modules: Optional[str] = field(default="all") lora_rank: Optional[int] = field(default=8) lora_dropout: Optional[float] = field(default=0.05) lora_alpha: Optional[float] = field(default=32.0) modules_to_save: Optional[str] = field(default=None) peft_path: Optional[str] = field(default=None) def accuracy(predictions, references, normalize=True, sample_weight=None): return { "accuracy": float(accuracy_score(references, predictions, normalize=normalize, sample_weight=sample_weight)) } def compute_metrics(eval_preds): preds, labels = eval_preds # preds have the same shape as the labels, after the argmax(-1) has been calculated # by preprocess_logits_for_metrics, we need to shift the labels labels = labels[:, 1:].reshape(-1) preds = preds[:, :-1].reshape(-1) return accuracy(predictions=preds, references=labels) def preprocess_logits_for_metrics(logits, labels): if isinstance(logits, tuple): # Depending on the model and config, logits may contain extra tensors, # like past_key_values, but logits always come first logits = logits[0] return logits.argmax(dim=-1) def fault_tolerance_data_collator(features: List) -> Dict[str, Any]: if not isinstance(features[0], Mapping): features = [vars(f) for f in features] first = features[0] batch = {} # Special handling for labels. # Ensure that tensor is created with the correct type if "label" in first and first["label"] is not None: label = first["label"].item() if isinstance(first["label"], torch.Tensor) else first["label"] dtype = torch.long if isinstance(label, int) else torch.float batch["labels"] = torch.tensor([f["label"] for f in features], dtype=dtype) elif "label_ids" in first and first["label_ids"] is not None: if isinstance(first["label_ids"], torch.Tensor): batch["labels"] = torch.stack([f["label_ids"] for f in features]) else: dtype = torch.long if type(first["label_ids"][0]) is int else torch.float batch["labels"] = torch.tensor([f["label_ids"] for f in features], dtype=dtype) # Handling of all other possible keys. # Again, we will use the first element to figure out which key/values are not None for this model. try: for k, v in first.items(): if k not in ("label", "label_ids") and v is not None and not isinstance(v, str): if isinstance(v, torch.Tensor): batch[k] = torch.stack([f[k] for f in features]) elif isinstance(v, np.ndarray): batch[k] = torch.tensor(np.stack([f[k] for f in features])) else: batch[k] = torch.tensor([f[k] for f in features]) except ValueError: # quick fix by simply take the first example for k, v in first.items(): if k not in ("label", "label_ids") and v is not None and not isinstance(v, str): if isinstance(v, torch.Tensor): batch[k] = torch.stack([features[0][k]] * len(features)) elif isinstance(v, np.ndarray): batch[k] = torch.tensor(np.stack([features[0][k]] * len(features))) else: batch[k] = torch.tensor([features[0][k]] * len(features)) return batch class GroupTextsBuilder: def __init__(self, max_seq_length): self.max_seq_length = max_seq_length def __call__(self, examples): # Concatenate all texts. firsts = {k: examples[k][0][0] for k in examples.keys()} lasts = {k: examples[k][0][-1] for k in examples.keys()} contents = {k: sum([vi[1:-1] for vi in v], []) for k, v in examples.items()} total_length = len(contents[list(examples.keys())[0]]) content_length = self.max_seq_length - 2 if total_length >= content_length: total_length = (total_length // content_length) * content_length # Split by chunks of max_len. result = { k: [[firsts[k]] + t[i: i + content_length] + [lasts[k]] for i in range(0, total_length, content_length)] for k, t in contents.items()} return result class SavePeftModelTrainer(Trainer): """ Trainer for lora models """ def save_model(self, output_dir=None, _internal_call=False): """Save the LoRA model.""" os.makedirs(output_dir, exist_ok=True) torch.save(self.args, os.path.join(output_dir, TRAINING_ARGS_NAME)) self.model.save_pretrained(output_dir) def save_model(output_dir, model, tokenizer, args): """Save the model and the tokenizer.""" os.makedirs(output_dir, exist_ok=True) # Take care of distributed/parallel training model_to_save = model.module if hasattr(model, "module") else model model_to_save.save_pretrained(output_dir) tokenizer.save_pretrained(output_dir) torch.save(args, os.path.join(output_dir, TRAINING_ARGS_NAME)) def print_trainable_parameters(model): """ Prints the number of trainable parameters in the model. """ trainable_params = 0 all_param = 0 for _, param in model.named_parameters(): all_param += param.numel() if param.requires_grad: trainable_params += param.numel() print( f"trainable params: {trainable_params} || all params: {all_param} || trainable%: {100 * trainable_params / all_param}" ) def find_all_linear_names(peft_model, int4=False, int8=False): """Find all linear layer names in the model. reference from qlora paper.""" cls = torch.nn.Linear if int4 or int8: import bitsandbytes as bnb if int4: cls = bnb.nn.Linear4bit elif int8: cls = bnb.nn.Linear8bitLt lora_module_names = set() for name, module in peft_model.named_modules(): if isinstance(module, cls): # last layer is not add to lora_module_names if 'lm_head' in name: continue names = name.split('.') lora_module_names.add(names[0] if len(names) == 1 else names[-1]) return sorted(lora_module_names) def main(): parser = HfArgumentParser((ModelArguments, DataTrainingArguments, PeftArguments)) model_args, data_args, training_args = parser.parse_args_into_dataclasses() logger.info(f"Model args: {model_args}") logger.info(f"Data args: {data_args}") logger.info(f"Training args: {training_args}") logger.info( f"Process rank: {training_args.local_rank}, device: {training_args.device}, n_gpu: {training_args.n_gpu}" + f" distributed training: {bool(training_args.local_rank != -1)}, 16-bits training: {training_args.fp16}" ) # Set seed before initializing model. set_seed(training_args.seed) # Load tokenizer if not model_args.model_type: raise ValueError("Please specify a model_type, e.g. llama, chatglm, bloom, etc.") config_class, model_class, tokenizer_class = MODEL_CLASSES[model_args.model_type] tokenizer_kwargs = { "cache_dir": model_args.cache_dir, "use_fast": model_args.use_fast_tokenizer, "trust_remote_code": model_args.trust_remote_code, } tokenizer_name_or_path = model_args.tokenizer_name_or_path if not tokenizer_name_or_path: tokenizer_name_or_path = model_args.model_name_or_path tokenizer = tokenizer_class.from_pretrained(tokenizer_name_or_path, **tokenizer_kwargs) # Preprocessing the datasets. def tokenize_function(examples): return tokenizer(examples["text"]) if data_args.block_size is None: block_size = tokenizer.model_max_length if block_size > 2048: logger.warning( "The chosen tokenizer supports a `model_max_length` that is longer than the default `block_size` value" " of 2048. If you would like to use a longer `block_size` up to `tokenizer.model_max_length` you can" " override this default with `--block_size xxx`." ) else: if data_args.block_size > tokenizer.model_max_length: logger.warning( f"The block_size passed ({data_args.block_size}) is larger than the maximum length for the model" f"({tokenizer.model_max_length}). Using block_size={tokenizer.model_max_length}." ) block_size = min(data_args.block_size, tokenizer.model_max_length) # Main data processing function that will concatenate all texts from our dataset and generate chunks of block_size. def group_texts(examples): # Concatenate all texts. concatenated_examples = {k: list(chain(*examples[k])) for k in examples.keys()} total_length = len(concatenated_examples[list(examples.keys())[0]]) # We drop the small remainder, we could add padding if the model supported it instead of this drop, you can # customize this part to your needs. if total_length >= block_size: total_length = (total_length // block_size) * block_size # Split by chunks of max_len. result = { k: [t[i: i + block_size] for i in range(0, total_length, block_size)] for k, t in concatenated_examples.items() } result["labels"] = result["input_ids"].copy() return result # Get the datasets: you can either provide your own CSV/JSON/TXT training and evaluation files (see below) # or just provide the name of one of the public datasets available on the hub at https://huggingface.co/datasets/ # (the dataset will be downloaded automatically from the datasets Hub). # # For CSV/JSON files, this script will use the column called 'text' or the first column if no column called # 'text' is found. You can easily tweak this behavior (see below). # # In distributed training, the load_dataset function guarantee that only one local process can concurrently # download the dataset. if data_args.dataset_name is not None: # Downloading and loading a dataset from the hub. raw_datasets = load_dataset( data_args.dataset_name, data_args.dataset_config_name, cache_dir=model_args.cache_dir, streaming=data_args.streaming, ) if "validation" not in raw_datasets.keys(): raw_datasets["validation"] = load_dataset( data_args.dataset_name, data_args.dataset_config_name, split=f"train[:{data_args.validation_split_percentage}%]", cache_dir=model_args.cache_dir, streaming=data_args.streaming, ) raw_datasets["train"] = load_dataset( data_args.dataset_name, data_args.dataset_config_name, split=f"train[{data_args.validation_split_percentage}%:]", cache_dir=model_args.cache_dir, streaming=data_args.streaming, ) else: data_files = {} dataset_args = {} if data_args.train_file_dir is not None and os.path.exists(data_args.train_file_dir): train_data_files = glob(f'{data_args.train_file_dir}/**/*.txt', recursive=True) + glob( f'{data_args.train_file_dir}/**/*.json', recursive=True) + glob( f'{data_args.train_file_dir}/**/*.jsonl', recursive=True) logger.info(f"train files: {train_data_files}") # Train data files must be same type, e.g. all txt or all jsonl types = [f.split('.')[-1] for f in train_data_files] if len(set(types)) > 1: raise ValueError(f"train files must be same type, e.g. all txt or all jsonl, but got {types}") data_files["train"] = train_data_files if data_args.validation_file_dir is not None and os.path.exists(data_args.validation_file_dir): eval_data_files = glob(f'{data_args.validation_file_dir}/**/*.txt', recursive=True) + glob( f'{data_args.train_file_dir}/**/*.json', recursive=True) + glob( f'{data_args.train_file_dir}/**/*.jsonl', recursive=True) logger.info(f"eval files: {eval_data_files}") data_files["validation"] = eval_data_files # Train data files must be same type, e.g. all txt or all jsonl types = [f.split('.')[-1] for f in eval_data_files] if len(set(types)) > 1: raise ValueError(f"train files must be same type, e.g. all txt or all jsonl, but got {types}") extension = "text" if data_files["train"][0].endswith('txt') else 'json' if extension == "text": dataset_args["keep_linebreaks"] = data_args.keep_linebreaks raw_datasets = load_dataset( extension, data_files=data_files, cache_dir=model_args.cache_dir, **dataset_args, ) # If no validation data is there, validation_split_percentage will be used to divide the dataset. if "validation" not in raw_datasets.keys(): raw_datasets["validation"] = load_dataset( extension, data_files=data_files, split=f"train[:{data_args.validation_split_percentage}%]", cache_dir=model_args.cache_dir, **dataset_args, ) raw_datasets["train"] = load_dataset( extension, data_files=data_files, split=f"train[{data_args.validation_split_percentage}%:]", cache_dir=model_args.cache_dir, **dataset_args, ) logger.info(f"Raw datasets: {raw_datasets}") # Preprocessing the datasets. if training_args.do_train: column_names = list(raw_datasets["train"].features) else: column_names = list(raw_datasets["validation"].features) with training_args.main_process_first(desc="Dataset tokenization and grouping"): if not data_args.streaming: tokenized_datasets = raw_datasets.map( tokenize_function, batched=True, num_proc=data_args.preprocessing_num_workers, remove_columns=column_names, load_from_cache_file=not data_args.overwrite_cache, desc="Running tokenizer on dataset", ) lm_datasets = tokenized_datasets.map( group_texts, batched=True, num_proc=data_args.preprocessing_num_workers, load_from_cache_file=not data_args.overwrite_cache, desc=f"Grouping texts in chunks of {block_size}", ) else: tokenized_datasets = raw_datasets.map( tokenize_function, batched=True, remove_columns=column_names, ) lm_datasets = tokenized_datasets.map( group_texts, batched=True, ) train_dataset = None max_train_samples = 0 if training_args.do_train: if "train" not in tokenized_datasets: raise ValueError("--do_train requires a train dataset") train_dataset = lm_datasets['train'] max_train_samples = len(train_dataset) if data_args.max_train_samples is not None and data_args.max_train_samples > 0: max_train_samples = min(len(train_dataset), data_args.max_train_samples) train_dataset = train_dataset.select(range(max_train_samples)) logger.debug(f"Num train_samples: {len(train_dataset)}") logger.debug("Tokenized training example:") logger.debug(tokenizer.decode(train_dataset[0]['input_ids'])) eval_dataset = None max_eval_samples = 0 if training_args.do_eval: if "validation" not in tokenized_datasets: raise ValueError("--do_eval requires a validation dataset") eval_dataset = lm_datasets["validation"] max_eval_samples = len(eval_dataset) if data_args.max_eval_samples is not None and data_args.max_eval_samples > 0: max_eval_samples = min(len(eval_dataset), data_args.max_eval_samples) eval_dataset = eval_dataset.select(range(max_eval_samples)) logger.debug(f"Num eval_samples: {len(eval_dataset)}") logger.debug("Tokenized eval example:") logger.debug(tokenizer.decode(eval_dataset[0]['input_ids'])) # Load model if model_args.model_type and model_args.model_name_or_path: torch_dtype = ( model_args.torch_dtype if model_args.torch_dtype in ["auto", None] else getattr(torch, model_args.torch_dtype) ) world_size = int(os.environ.get("WORLD_SIZE", 1)) ddp = world_size != 1 if ddp: model_args.device_map = {"": int(os.environ["LOCAL_RANK"]) or 0} config = config_class.from_pretrained( model_args.model_name_or_path, torch_dtype=torch_dtype, trust_remote_code=model_args.trust_remote_code, cache_dir=model_args.cache_dir ) model = model_class.from_pretrained( model_args.model_name_or_path, config=config, load_in_8bit=model_args.load_in_8bit, device_map=model_args.device_map, trust_remote_code=model_args.trust_remote_code, ) else: raise ValueError(f"Error, model_name_or_path is None, Continue PT must be loaded from a pre-trained model") if training_args.use_peft: if training_args.peft_path is not None: logger.info(f"Peft from pre-trained model: {training_args.peft_path}") model = PeftModel.from_pretrained(model, training_args.peft_path, is_trainable=True) else: logger.info("Init new peft model") target_modules = training_args.target_modules.split(',') if training_args.target_modules else None if target_modules and 'all' in target_modules: target_modules = find_all_linear_names(model, int4=False, int8=model_args.load_in_8bit) modules_to_save = training_args.modules_to_save if modules_to_save is not None: modules_to_save = modules_to_save.split(',') logger.info(f"Peft target_modules: {target_modules}") logger.info(f"Peft lora_rank: {training_args.lora_rank}") peft_config = LoraConfig( task_type=TaskType.CAUSAL_LM, target_modules=target_modules, inference_mode=False, r=training_args.lora_rank, lora_alpha=training_args.lora_alpha, lora_dropout=training_args.lora_dropout, modules_to_save=modules_to_save) model = get_peft_model(model, peft_config) if model_args.load_in_8bit: model = prepare_model_for_int8_training(model) model.print_trainable_parameters() else: logger.info("Full parameters training") model = model.float() print_trainable_parameters(model) # Initialize our Trainer if training_args.gradient_checkpointing: model.gradient_checkpointing_enable() model.config.use_cache = False else: model.config.use_cache = True model.enable_input_require_grads() if not ddp and torch.cuda.device_count() > 1: # Keeps Trainer from trying its own DataParallelism when more than 1 gpu is available model.is_parallelizable = True model.model_parallel = True trainer = SavePeftModelTrainer( model=model, args=training_args, train_dataset=train_dataset if training_args.do_train else None, eval_dataset=eval_dataset if training_args.do_eval else None, tokenizer=tokenizer, data_collator=fault_tolerance_data_collator, compute_metrics=compute_metrics if training_args.do_eval and not is_torch_tpu_available() else None, preprocess_logits_for_metrics=preprocess_logits_for_metrics if training_args.do_eval and not is_torch_tpu_available() else None, ) # Training if training_args.do_train: logger.info("*** Train ***") logger.debug(f"Train dataloader example: {next(iter(trainer.get_train_dataloader()))}") checkpoint = None if training_args.resume_from_checkpoint is not None: checkpoint = training_args.resume_from_checkpoint train_result = trainer.train(resume_from_checkpoint=checkpoint) metrics = train_result.metrics metrics["train_samples"] = max_train_samples logger.debug(f"Training metrics: {metrics}") trainer.log_metrics("train", metrics) trainer.save_metrics("train", metrics) trainer.save_state() logger.info(f"Saving model checkpoint to {training_args.output_dir}") save_model(training_args.output_dir, model, tokenizer, training_args) # Evaluation if training_args.do_eval and trainer.is_world_process_zero(): logger.info("*** Evaluate ***") metrics = trainer.evaluate() metrics["eval_samples"] = max_eval_samples try: perplexity = math.exp(metrics["eval_loss"]) except OverflowError: perplexity = float("inf") metrics["perplexity"] = perplexity logger.debug(f"Eval metrics: {metrics}") trainer.log_metrics("eval", metrics) trainer.save_metrics("eval", metrics) if __name__ == "__main__": main()