File size: 4,882 Bytes
54ba632 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
from nanotron import logging
from nanotron.config import (
PretrainDatasetsArgs,
)
from nanotron.dataloader import (
clm_process,
dummy_infinite_data_generator,
get_datasets,
get_train_dataloader,
)
from nanotron.logging import log_rank
from nanotron.parallel.pipeline_parallel.utils import get_input_output_pp_ranks
from nanotron.trainer import DistributedTrainer
from nanotron.utils import (
main_rank_first,
)
try:
from huggingface_hub import __version__ as hf_hub_version
from transformers import AutoTokenizer
from transformers import __version__ as tf_version
except ImportError:
hf_hub_version = None
tf_version = None
logger = logging.get_logger(__name__)
def get_dataloader(trainer: DistributedTrainer):
"""Returns a dataloader for training."""
# First, we need to know which ranks to feed the dataloader to
input_pp_rank, output_pp_rank = get_input_output_pp_ranks(model=trainer.model)
# Case 1: Dummy data generator
if trainer.config.data.dataset is None:
log_rank("Using dummy data generator", logger=logger, level=logging.INFO, rank=0)
dataloader = dummy_infinite_data_generator(
micro_batch_size=trainer.micro_batch_size,
sequence_length=trainer.sequence_length,
input_pp_rank=input_pp_rank,
output_pp_rank=output_pp_rank,
vocab_size=trainer.model_config.vocab_size,
seed=trainer.config.data.seed,
parallel_context=trainer.parallel_context,
)()
# Case 2: HuggingFace datasets
elif isinstance(trainer.config.data.dataset, PretrainDatasetsArgs):
log_rank("Using `datasets` library", logger=logger, level=logging.INFO, rank=0)
tokenizer_path = trainer.config.tokenizer.tokenizer_name_or_path
log_rank(
f"Loading tokenizer from {tokenizer_path} and transformers/hf_hub versions {tf_version, hf_hub_version}",
logger=logger,
level=logging.INFO,
rank=0,
)
# We need to the 1st device to process dataset and cache it, then other devices load from cache
with main_rank_first(trainer.parallel_context.world_pg):
# TODO @nouamanetazi: this may timeout before 1st device finishes processing dataset. Can we have a ctxmanager to modify timeout?
# TODO: generalise to include for validation/test splits
# We load the raw dataset
raw_dataset = get_datasets(
hf_dataset_or_datasets=trainer.config.data.dataset.hf_dataset_or_datasets,
splits=trainer.config.data.dataset.hf_dataset_splits,
)["train"]
tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "left"
# We apply the Causal Language Modeling preprocessing
train_dataset = clm_process(
raw_dataset=raw_dataset,
tokenizer=tokenizer,
text_column_name=trainer.config.data.dataset.text_column_name,
dataset_processing_num_proc_per_process=trainer.config.data.dataset.dataset_processing_num_proc_per_process,
dataset_overwrite_cache=trainer.config.data.dataset.dataset_overwrite_cache,
sequence_length=trainer.sequence_length,
)
# We load the processed dataset on the ranks requiring it
dataloader = get_train_dataloader(
train_dataset=train_dataset,
sequence_length=trainer.sequence_length,
parallel_context=trainer.parallel_context,
input_pp_rank=input_pp_rank,
output_pp_rank=output_pp_rank,
micro_batch_size=trainer.micro_batch_size,
consumed_train_samples=trainer.consumed_train_samples,
dataloader_num_workers=trainer.config.data.num_loading_workers,
seed_worker=trainer.config.data.seed,
dataloader_drop_last=True,
)
# Check if we have enough samples for train_steps
assert (
trainer.config.tokens.train_steps - trainer.start_iteration_step
) * trainer.global_batch_size // trainer.parallel_context.dp_pg.size() < len(dataloader), (
f"Dataset is too small for steps ({len(dataloader)} < {(trainer.config.tokens.train_steps - trainer.start_iteration_step) * trainer.global_batch_size // trainer.parallel_context.dp_pg.size()}), "
f"Try train_steps<={len(dataloader) * trainer.parallel_context.dp_pg.size() // trainer.global_batch_size + trainer.start_iteration_step}"
)
else:
raise ValueError(f"Unhandled case of `self.config.data.dataset`. Got: {trainer.config.data.dataset}")
return dataloader
|