feat(gpc): remove unit-test code
mwiacx committed Oct 29, 2024
1 parent 8c850e4 · commit c87dc8f
Showing 2 changed files with 19 additions and 98 deletions.
internlm/core/context/parallel_context.py (2 additions, 95 deletions)
@@ -4,7 +4,6 @@
 # adopted from https://github.com/hpcaitech/ColossalAI/blob/main/colossalai/context
 
 import inspect
-import math
 import random
 import socket
 import sys
@@ -22,7 +21,6 @@
 from internlm.utils.timeout import LLM_NCCL_TIMEOUT
 from internlm.utils.utils import TensorParallelMode
 
-from . import process_group_initializer as pgroup_initializer
 from .process_group_initializer import (
     GroupConfig,
     ParallelMode,
@@ -629,24 +627,6 @@ def init_parallel_groups(self):
 
         self.check_sanity()
 
-        initializer_args = [
-            rank,
-            world_size,
-            self.weight_parallel_size,
-            self.weight_data_parallel_size,
-            self.sequence_parallel_size,
-            self.data_parallel_size,
-            self.pipeline_parallel_size,
-            self.tensor_parallel_size,
-            self.zero1_parallel_size,
-            self.nettest_parallel_size,
-            self.expert_parallel_size,
-            self.expert_tensor_parallel_size,
-            self.expert_weight_parallel_size,
-            self.expert_data_parallel_size,
-            parallel_config.sequence_2D,
-        ]
-
         parallel_sizes = {
             ParallelMode.TENSOR: self.tensor_parallel_size,
             ParallelMode.SEQUENCE: self.sequence_parallel_size,
@@ -655,8 +635,8 @@
             ParallelMode.ZERO1: self.zero1_parallel_size,
             ParallelMode.WEIGHT: self.weight_parallel_size,
             ParallelMode.WEIGHT_DATA: self.weight_data_parallel_size,
-            ParallelMode.NETTEST: self.nettest_parallel_size,  # FIXME: impl it.
-            ParallelMode.EXPERT: self.expert_parallel_size,  # FIXME: anonymous?
+            ParallelMode.NETTEST: self.nettest_parallel_size,
+            ParallelMode.EXPERT: self.expert_parallel_size,
             ParallelMode.EXPERT_WEIGHT: self.expert_weight_parallel_size,
             ParallelMode.EXPERT_TENSOR: self.expert_tensor_parallel_size,
             ParallelMode.EXPERT_DATA: self.expert_data_parallel_size,
@@ -696,83 +676,10 @@ def init_parallel_groups(self):
                 generate_2d_attn_process_group(world_size, rank, parallel_config.sequence_2D, parallel_sizes)
             )
 
-        # print(f"rank{rank}: group_results={group_results}", flush=True)
-
-        def _check_helper(result) -> bool:
-            try:
-                _idx = [_r[-1] for _r in group_results].index(result[-1])
-            except ValueError:
-                if rank == 0:
-                    print(f"WARN: {result[-1]} not exist in new code", flush=True)
-                return
-
-            new_res = group_results[_idx]
-
-            for _idx in range(len(new_res)):
-                if isinstance(new_res[_idx], dist.ProcessGroup):
-                    assert (
-                        new_res[_idx].size() == result[_idx].size()
-                    ), f"rank{rank}: assert failed, mode={new_res[-1]}, {new_res[_idx]} != {result[_idx]}"
-                else:
-                    assert (
-                        new_res[_idx] == result[_idx]
-                    ), f"rank{rank}: assert failed, mode={new_res[-1]}, idx={_idx}, {new_res} != {result}"
-
-            if rank == 0:
-                print(f"{result[-1]} assert passed", flush=True)
-
-        old_group_results = self._old_process_group_allocation(parallel_config, initializer_args)
-        for res in old_group_results:
-            _check_helper(res)
-
         # register process groups
         for result in group_results:
             self._register_dist(*result)
 
-        # FIXME: remove it after unit-test.
-        exit(-1)
-
-    # FIXME: remove it after unit-test.
-    def _old_process_group_allocation(self, parallel_config, initializer_args):
-        # run initialization of different process groups
-        initializers, group_results = [], []
-
-        if "gqa" in parallel_config and parallel_config["gqa"] is True:
-            initializers.append(pgroup_initializer.Initializer_GQA(*initializer_args))
-        initializers.append(pgroup_initializer.Initializer_Weight(*initializer_args))
-        initializers.append(pgroup_initializer.Initializer_Weight_Data(*initializer_args))
-        initializers.append(pgroup_initializer.Initializer_Tensor(*initializer_args))
-        initializers.append(pgroup_initializer.Initializer_Data(*initializer_args))
-        initializers.append(pgroup_initializer.Initializer_ISP_Data(*initializer_args))  # FIXME: ???
-        if (
-            isinstance(parallel_config["tensor"], dict)
-            and parallel_config["tensor"]["mode"] == TensorParallelMode.isp.name
-        ):
-            initializers.append(pgroup_initializer.Initializer_Zero1_ISP(*initializer_args))
-        else:
-            initializers.append(pgroup_initializer.Initializer_Zero1(*initializer_args))
-        if isinstance(parallel_config["zero1"], dict) and parallel_config["zero1"].get("fsdp", False):
-            initializers.append(pgroup_initializer.Initializer_Zero3_dp(*initializer_args))
-        initializers.append(pgroup_initializer.Initializer_Nettest(*initializer_args))
-        if self.pipeline_parallel_size > 1:
-            initializers.append(pgroup_initializer.Initializer_Pipeline(*initializer_args))
-        if self.config.model.get("num_experts", 1) > 1:
-            if isinstance(parallel_config["tensor"], dict) and parallel_config["tensor"]["mode"] == "isp":
-                initializers.append(pgroup_initializer.Initializer_Expert_Weight_Data(*initializer_args))
-            else:
-                initializers.append(pgroup_initializer.Initializer_Expert_Data(*initializer_args))
-        if parallel_config.sequence_2D.get("enable", False) is True:
-            initializers.append(pgroup_initializer.Initializer_2D_SEQUENCE_PARALLEL(*initializer_args))
-
-        for initializer in initializers:
-            parallel_setting = initializer.init_dist_group()
-            if isinstance(parallel_setting, list):
-                group_results.extend(parallel_setting)
-            else:
-                group_results.append(parallel_setting)
-
-        return group_results
-
     def is_initialized(self, parallel_mode: ParallelMode):
         """Returns a boolean value indicating whether `parallel_mode` is initialized
         in the current system.
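
For context on what the deleted scaffolding verified: it allocated process groups through the legacy per-mode initializers, matched each legacy result to the new GroupConfig-based result carrying the same ParallelMode tag, and asserted that matching entries agreed, comparing process-group handles by size. A compact, self-contained distillation of that check (illustrative helper, not InternLM code; names and the zip-based pairing are assumptions) might look like:

```python
# Illustrative distillation of the removed cross-check (hypothetical helper,
# not part of InternLM). old_results / new_results are lists of tuples whose
# last element is the ParallelMode tag, mirroring what the initializers return.
import torch.distributed as dist


def check_allocations_match(old_results, new_results, rank=0):
    new_by_mode = {res[-1]: res for res in new_results}
    for old in old_results:
        new = new_by_mode.get(old[-1])
        if new is None:
            if rank == 0:
                print(f"WARN: {old[-1]} not produced by the new allocation path", flush=True)
            continue
        for old_item, new_item in zip(old, new):
            if isinstance(new_item, dist.ProcessGroup):
                # process-group handles cannot be compared directly; compare sizes
                assert new_item.size() == old_item.size(), f"mode={new[-1]}"
            else:
                assert new_item == old_item, f"mode={new[-1]}, {new} != {old}"
        if rank == 0:
            print(f"{old[-1]} assert passed", flush=True)
```

Removing the harness also removes the forced `exit(-1)` that prevented `init_parallel_groups` from continuing past the comparison.
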
internlm/core/context/process_group_initializer.py (17 additions, 3 deletions)
@@ -11,8 +11,11 @@
 
 import torch.distributed as dist
 
+from internlm.utils.logger import get_logger
 from internlm.utils.timeout import LLM_NCCL_TIMEOUT
 
+logger = get_logger(__file__)
+
 
 # parallel modes
 class ParallelMode(Enum):
Expand Down Expand Up @@ -84,6 +87,7 @@ class ParallelMode(Enum):


class GroupConfig:
"""config for initialze a process group"""

def __init__(
self,
Expand Down Expand Up @@ -175,10 +179,16 @@ def _create_parallel_process_groups(
pre_group_size = pre_group_size * group.size
continue

group_ranks, accelerator_group = None, None
all_group_ranks = get_group_ranks(global_ranks_or_sizes, group.size, pre_group_size, group.allow_partial_group)
group_ranks = [_gr for _gr in all_group_ranks if self_rank in _gr][0]

accelerator_group = dist.new_group(group_ranks, timeout=LLM_NCCL_TIMEOUT)
for idx, ranks in enumerate(all_group_ranks):
_pg = dist.new_group(ranks, timeout=LLM_NCCL_TIMEOUT)
if self_rank in ranks:
group_ranks, accelerator_group = all_group_ranks[idx], _pg
else:
dist.destroy_process_group(_pg)

cpu_group = init_cpu_group(accelerator_group, group_ranks, with_cpu_group)

group_results.append(
@@ -327,7 +337,11 @@ def generate_2d_attn_process_group(
    assert world_size % parallel_sizes[ParallelMode.SEQUENCE] == 0
 
    if config.window_size >= 8 or config.window_size == config.context_size:
-        # TODO: warning
+        logger.warning("interleaved is forced False when window size > 8 or equals context size.")
        config.interleaved = False
+
+    if config.device_placement_strategy.head_first and config.head_size > 1:
+        logger.warning("interleaved is forced False when head_first is True and head size > 1.")
+        config.interleaved = False
 
    group_results = []
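
The key behavioral change in `_create_parallel_process_groups` is that every rank now walks all candidate rank lists, creates each subgroup, keeps the one it belongs to, and destroys the rest. This lines up with `torch.distributed.new_group` being a collective call: every process in the default group must enter it with the same arguments, even for subgroups it will not join. A minimal standalone sketch of that pattern (illustrative script, not the repository's code; the gloo backend, pairing scheme, and helper name are assumptions):

```python
# Standalone sketch of "create every subgroup, keep yours, drop the rest".
import torch.distributed as dist


def create_my_subgroup(all_group_ranks, self_rank):
    """Return (ranks, group) for the subgroup that contains self_rank."""
    my_ranks, my_group = None, None
    for ranks in all_group_ranks:
        pg = dist.new_group(ranks)  # collective: every rank enters every call
        if self_rank in ranks:
            my_ranks, my_group = ranks, pg  # keep the group we belong to
        else:
            dist.destroy_process_group(pg)  # release groups we are not part of
    return my_ranks, my_group


if __name__ == "__main__":
    # Launch with e.g. `torchrun --nproc_per_node=4 sketch.py`.
    dist.init_process_group(backend="gloo")
    rank, world_size = dist.get_rank(), dist.get_world_size()
    pairs = [list(range(i, min(i + 2, world_size))) for i in range(0, world_size, 2)]
    my_ranks, my_group = create_my_subgroup(pairs, rank)
    print(f"rank {rank}: member of subgroup {my_ranks} (size {dist.get_world_size(my_group)})")
    dist.destroy_process_group()
```

The `else` branch matters because the loop cannot simply skip rank lists the current process is not part of; destroying the handles it does not keep avoids accumulating unused groups.
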
