refine the implementation of megatron and mocked dataset
zigzagcai committed Sep 25, 2024
1 parent 9d2b679 commit 7af2347
Showing 5 changed files with 52 additions and 113 deletions.
15 changes: 11 additions & 4 deletions internlm/data/build_dataloader.py
@@ -155,6 +155,9 @@ def get_streaming_train_loader_items(data_cfg):
 
 
 def get_megatron_train_loader_items(data_cfg):
+    assert data_cfg.get(
+        "pack_sample_into_one", False
+    ), "megatron dataloader currently only supports pack_sample_into_one=True"
     try:
         from internlm.data.megatron import helpers  # noqa # pylint: disable=W0611
     except ImportError:
@@ -174,14 +177,14 @@ def get_megatron_train_loader_items(data_cfg):
                "internlm/data/megatron/helpers.so",
            ]
        )
 
+    # NOTICE: Currently we only support a single megatron dataset, i.e., a single .bin and .idx pair.
+    # The Megatron dataset (.bin and .idx) should be generated by Megatron-LM tools/preprocess_data.py
+    # https://github.com/NVIDIA/Megatron-LM/blob/main/tools/preprocess_data.py
     train_ds = build_megatron_dataset(
         data_prefix=data_cfg.train_folder,
         data_impl=data_cfg.get("data_impl", "infer"),
-        splits_string="1.0, 0.0, 0.0",
-        train_valid_test_num_samples=[9600000, 0, 0],
         seq_len=data_cfg.seq_len,
         seed=data_cfg.get("seed", 1024),
-        skip_warmup=True,
     )
 
     train_sampler = MegatronBatchSampler(
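Note: the megatron loader above pulls everything it needs from the data section of the training config, and train_folder is really the Megatron dataset prefix (the path shared by the .bin and .idx files). The snippet below is only a hedged sketch of such a config section; the example path and any field not referenced in this diff are assumptions, not the definitive InternEvo schema.

# Hypothetical config sketch; only the keys actually read in this diff are certain.
data = dict(
    train_folder="/path/to/my_corpus_text_document",  # example prefix of <prefix>.bin / <prefix>.idx
    seq_len=4096,
    micro_bsz=2,
    micro_num=4,
    pack_sample_into_one=True,  # required by the megatron (and mocked) dataloader
    data_impl="infer",  # optional, defaults to "infer"
    seed=1024,  # optional, defaults to 1024
    total_steps=20000,  # used by build_megatron_dataset to size num_samples
)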
@@ -202,14 +205,18 @@ def get_mock_train_loader_items(data_cfg):
+    assert data_cfg.get(
+        "pack_sample_into_one", False
+    ), "mocked dataloader currently only supports pack_sample_into_one=True"
+
     train_ds = MockedDataset(
         train_folder=data_cfg.train_folder,
         micro_bsz=data_cfg.micro_bsz,
         micro_num=data_cfg.micro_num,
         seq_len=data_cfg.seq_len,
     )
 
     train_sampler = MockedSequentialBatchSampler(train_ds, data_cfg.micro_num)
 
     train_collate_fn = partial(packed_collate_fn, packed_length=data_cfg.seq_len * data_cfg.micro_bsz)
 
     return train_ds, train_sampler, train_collate_fn
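Both helper functions return the same (dataset, batch sampler, collate function) triplet that the other loader paths in build_dataloader.py produce. As a rough, hedged sketch of how a caller might wire that triplet into a PyTorch DataLoader (the keyword arguments below are illustrative assumptions, not the exact ones InternEvo passes):

from torch.utils.data import DataLoader

# Illustrative wiring only; the surrounding build_dataloader.py code decides the real DataLoader arguments.
train_ds, train_sampler, train_collate_fn = get_megatron_train_loader_items(data_cfg)

train_loader = DataLoader(
    dataset=train_ds,
    batch_sampler=train_sampler,  # yields one list of sample indices per training step
    collate_fn=train_collate_fn,  # packs the fetched samples into per-micro-batch tensors
    num_workers=4,  # assumption: any reasonable worker count
    pin_memory=True,
)

for batch, labels in train_loader:
    # For the megatron path, batch["input_ids"] is packed to shape (micro_num, micro_bsz * seq_len).
    break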


2 changes: 1 addition & 1 deletion internlm/data/megatron/batch_sampler.py
@@ -57,6 +57,6 @@ def __iter__(self):
             start_idx, end_idx = self.get_start_end_idx()
             yield batch[start_idx:end_idx]
 
-    # TODO: implement copy method that is compatible with InternEvo trainstate
+    # TODO: implement copy method that is compatible with InternEvo trainstate ckpt save and load.
     def copy(self):
         return copy.deepcopy(self)
56 changes: 22 additions & 34 deletions internlm/data/megatron/collaters.py
@@ -2,48 +2,36 @@
 
 
 def megatron_collate_fn(batch, micro_num, micro_bsz, seq_len):
-
-    input_ids_result = [[] for _ in range(micro_num)]
-    labels_result = [[] for _ in range(micro_num)]
-    cu_seqlens = []
+    input_ids_list = [[] for _ in range(micro_num)]
+    labels_list = [[] for _ in range(micro_num)]
     cu_seqlens_list = []
-    indexes = []
     indexes_list = []
 
-    for i, item in enumerate(batch):
-        assert i < micro_num * micro_bsz
-        seq_len_list = item["text"]
-        assert len(seq_len_list) == seq_len + 1
-
-        micro_bsz_index = i % micro_bsz
-        micro_num_index = i // micro_bsz
-
-        input_ids_result[micro_num_index].append(seq_len_list[:-1])
-        labels_result[micro_num_index].append(seq_len_list[1:])
-
-        cu_seqlens.append(seq_len * micro_bsz_index)
-        indexes = indexes + list(range(seq_len))
+    assert len(batch) == micro_bsz * micro_num
+    for idx, b in enumerate(batch):
+        tokens = b["text"]
+        # The length of megatron preprocessed data samples is (seq_len + 1),
+        # so we use the first seq_len tokens as input and the last seq_len tokens as shifted labels.
+        assert len(tokens) == seq_len + 1
+        micro_bsz_index = idx % micro_bsz
+        micro_num_index = idx // micro_bsz
+        input_ids_list[micro_num_index].append(tokens[:-1])
+        labels_list[micro_num_index].append(tokens[1:])
 
         if micro_bsz_index == micro_bsz - 1:
-            input_ids_result[micro_num_index] = torch.cat(
-                [torch.from_numpy(arr).long() for arr in input_ids_result[micro_num_index]], dim=0
+            # Since megatron data samples are in numpy format, we need to convert them to tensors and concatenate within the micro batch.
+            input_ids_list[micro_num_index] = torch.cat(
+                [torch.from_numpy(arr) for arr in input_ids_list[micro_num_index]], dim=0
             )
-            labels_result[micro_num_index] = torch.cat(
-                [torch.from_numpy(arr).long() for arr in labels_result[micro_num_index]], dim=0
+            labels_list[micro_num_index] = torch.cat(
+                [torch.from_numpy(arr) for arr in labels_list[micro_num_index]], dim=0
             )
-            cu_seqlens.append(seq_len * micro_bsz)
-            cu_seqlens_list.append(torch.IntTensor(cu_seqlens))
-            cu_seqlens = []
-            indexes_list.append(torch.IntTensor(indexes))
-            indexes = []
-
-    input_ids = torch.stack(input_ids_result)
-    labels = torch.stack(labels_result)
-    indexes = torch.stack(indexes_list)
+            cu_seqlens_list.append(torch.IntTensor([i * seq_len for i in range(micro_bsz + 1)]))
+            indexes_list.append(torch.IntTensor(list(range(seq_len)) * micro_bsz))
 
     return {
-        "input_ids": input_ids,
+        "input_ids": torch.stack(input_ids_list),
         "cu_seqlens": cu_seqlens_list,
-        "indexes": indexes,
+        "indexes": torch.stack(indexes_list),
         "type_ids": torch.zeros(micro_num, micro_bsz * seq_len, dtype=torch.int64),
-    }, labels
+    }, torch.stack(labels_list)
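To make the packing behaviour of the rewritten collate function concrete, here is a small self-contained sketch (the token values are fake; only the shapes and boundaries matter):

import numpy as np

from internlm.data.megatron.collaters import megatron_collate_fn

micro_num, micro_bsz, seq_len = 2, 2, 8

# Each element mimics a Megatron-preprocessed sample: seq_len + 1 token ids under the "text" key.
fake_batch = [{"text": np.arange(seq_len + 1, dtype=np.int64)} for _ in range(micro_num * micro_bsz)]

packed, labels = megatron_collate_fn(fake_batch, micro_num=micro_num, micro_bsz=micro_bsz, seq_len=seq_len)

assert packed["input_ids"].shape == (micro_num, micro_bsz * seq_len)
assert labels.shape == (micro_num, micro_bsz * seq_len)
assert len(packed["cu_seqlens"]) == micro_num  # one cumulative-length tensor per micro batch
assert packed["cu_seqlens"][0].tolist() == [0, 8, 16]  # packed sequence boundaries within a micro batch
assert packed["indexes"].shape == (micro_num, micro_bsz * seq_len)  # per-token positions, restarting each sample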
90 changes: 17 additions & 73 deletions internlm/data/megatron/dataset.py
@@ -1,5 +1,6 @@
 # adapted from https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/core/datasets/gpt_dataset.py
+# adapted from https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/core/datasets/indexed_dataset.py
 
 import hashlib
 import os
 import struct
@@ -764,82 +765,25 @@ def get_indexed_dataset_(data_prefix, data_impl, skip_warmup):
     return indexed_dataset
 
 
-def get_train_valid_test_split_(splits_string, size):
-    """Get dataset splits from comma or '/' separated string list."""
-
-    splits = []
-    if splits_string.find(",") != -1:
-        splits = [float(s) for s in splits_string.split(",")]
-    elif splits_string.find("/") != -1:
-        splits = [float(s) for s in splits_string.split("/")]
-    else:
-        splits = [float(splits_string)]
-    while len(splits) < 3:
-        splits.append(0.0)
-    splits = splits[:3]
-    splits_sum = sum(splits)
-    assert splits_sum > 0.0
-    splits = [split / splits_sum for split in splits]
-    splits_index = [0]
-    for index, split in enumerate(splits):
-        splits_index.append(splits_index[index] + int(round(split * float(size))))
-    diff = splits_index[-1] - size
-    for index in range(1, len(splits_index)):
-        splits_index[index] -= diff
-    assert len(splits_index) == 4
-    assert splits_index[-1] == size
-    return splits_index
-
-
 def build_megatron_dataset(
     data_prefix,
     data_impl,
-    splits_string,
-    train_valid_test_num_samples,
     seq_len,
     seed,
-    skip_warmup,
-    return_doc_ids=False,
-    *,
-    data_cache_path=None,
 ):
-
-    # Indexed dataset.
-    indexed_dataset = get_indexed_dataset_(data_prefix, data_impl, skip_warmup)
-
-    total_num_of_documents = indexed_dataset.sizes.shape[0]
-    splits = get_train_valid_test_split_(splits_string, total_num_of_documents)
-
-    # Print stats about the splits.
-    print_rank_0(" > dataset split:")
-
-    def print_split_stats(index, name):
-        print_rank_0(" {}:".format(name))
-        print_rank_0(
-            " document indices in [{}, {}) total of {} "
-            "documents".format(splits[index], splits[index + 1], splits[index + 1] - splits[index])
-        )
-
-    print_split_stats(0, "train")
-
-    def build_dataset(index, name):
-        dataset = None
-        if splits[index + 1] > splits[index]:
-            documents = np.arange(start=splits[index], stop=splits[index + 1], step=1, dtype=np.int32)
-            dataset = GPTDataset(
-                name,
-                data_prefix,
-                documents,
-                indexed_dataset,
-                splits_string,
-                train_valid_test_num_samples[index],
-                seq_len,
-                seed,
-                return_doc_ids,
-                data_cache_path=data_cache_path,
-            )
-        return dataset
-
-    train_dataset = build_dataset(0, "train")
-
-    return train_dataset
+    indexed_dataset = get_indexed_dataset_(data_prefix, data_impl="infer", skip_warmup=True)
+
+    # GPT dataset.
+    return GPTDataset(
+        name="train",
+        data_prefix=data_prefix,
+        documents=np.arange(start=0, stop=indexed_dataset.sizes.shape[0], step=1, dtype=np.int32),
+        indexed_dataset=indexed_dataset,
+        splits_string="1.0, 0.0, 0.0",  # proportion of dataset for train/valid/test, we set 1.0 for train only
+        num_samples=gpc.config.data.micro_bsz
+        * gpc.config.data.micro_num
+        * gpc.get_world_size(ParallelMode.DATA)
+        * gpc.config.data.total_steps,  # total number of train samples
+        seq_length=seq_len,
+        seed=seed,
+    )
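For reference, a hedged usage sketch of the simplified builder: it assumes a .bin/.idx pair produced beforehand with Megatron-LM's tools/preprocess_data.py (the file prefix below is only an example), and it assumes the InternEvo global context (gpc) is already initialised, since num_samples is derived from gpc.config.data and the data-parallel world size.

from internlm.data.megatron.dataset import build_megatron_dataset

# Example only; the data prefix is hypothetical and must point at an existing <prefix>.bin / <prefix>.idx pair.
train_ds = build_megatron_dataset(
    data_prefix="/data/my_corpus_text_document",
    data_impl="infer",
    seq_len=4096,
    seed=1024,
)

# Each sample carries seq_len + 1 token ids: the first seq_len are inputs, the shifted copy forms the labels
# (see megatron_collate_fn above).
sample = train_ds[0]
assert len(sample["text"]) == 4096 + 1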
2 changes: 1 addition & 1 deletion internlm/data/mocked/batch_sampler.py
@@ -19,6 +19,6 @@ def __iter__(self):
     def __len__(self):
         return (len(self.train_ds) + self.micro_num - 1) // self.micro_num
 
-    # TODO: implement copy method that is compatible with InternEvo trainstate
+    # TODO: implement copy method that is compatible with InternEvo trainstate ckpt save and load.
     def copy(self):
         return copy.deepcopy(self)
