Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCT/IB/MLX5/DC: hybrid dcs fixes - pending queue #10243

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/uct/ib/mlx5/dc/dc_mlx5.c
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ static ucs_status_t uct_dc_mlx5_iface_query(uct_iface_h tl_iface, uct_iface_attr

/* Error handling is not supported with shared (random) or hybrid dci
* policies. TODO: Fix */
if (uct_dc_mlx5_iface_is_policy_shared(iface)) {
if (uct_dc_mlx5_iface_is_policy_shared(iface) ||
uct_dc_mlx5_iface_is_hybrid(iface)) {
iface_attr->cap.flags &= ~(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE |
UCT_IFACE_FLAG_ERRHANDLE_ZCOPY_BUF |
UCT_IFACE_FLAG_ERRHANDLE_REMOTE_MEM);
Expand Down Expand Up @@ -1180,7 +1181,8 @@ static void uct_dc_mlx5_iface_cleanup_fc_ep(uct_dc_mlx5_iface_t *iface)
goto out;
}

if (uct_dc_mlx5_iface_is_policy_shared(iface)) {
if (uct_dc_mlx5_iface_is_policy_shared(iface) ||
uct_dc_mlx5_is_hw_dci(iface, fc_ep->dci)) {
txqp = &fc_dci->txqp;
ucs_queue_for_each_safe(op, iter, &txqp->outstanding, queue) {
if (op->handler == uct_dc_mlx5_ep_fc_pure_grant_send_completion) {
Expand Down
7 changes: 3 additions & 4 deletions src/uct/ib/mlx5/dc/dc_mlx5.inl
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,12 @@ uct_dc_mlx5_ep_pending_common(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep,
UCS_STATIC_ASSERT(sizeof(uct_dc_mlx5_pending_req_priv) <=
UCT_PENDING_REQ_PRIV_LEN);

if (uct_dc_mlx5_iface_is_policy_shared(iface)) {
if (ep->dci != UCT_DC_MLX5_EP_NO_DCI) {
uct_dc_mlx5_pending_req_priv(r)->ep = ep;
group = uct_dc_mlx5_ep_rand_arb_group(iface, ep);
} else {
group = &ep->arb_group;
}

group = uct_dc_mlx5_ep_arb_group(iface, ep);

if (push_to_head) {
uct_pending_req_arb_group_push_head(group, r);
} else {
Expand Down
32 changes: 23 additions & 9 deletions src/uct/ib/mlx5/dc/dc_mlx5_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -1460,7 +1460,8 @@ unsigned uct_dc_mlx5_ep_dci_release_progress(void *arg)
dci_pool = &iface->tx.dci_pool[pool_index];
while (dci_pool->release_stack_top >= 0) {
dci = ucs_array_elem(&dci_pool->stack, dci_pool->release_stack_top--);
ucs_assert(dci < iface->tx.ndci * iface->tx.num_dci_pools);
ucs_assert((dci - uct_dc_mlx5_iface_is_hybrid(iface)) <
iface->tx.ndci * iface->tx.num_dci_pools);
uct_dc_mlx5_iface_dci_release(iface, dci);
}

Expand All @@ -1476,6 +1477,21 @@ unsigned uct_dc_mlx5_ep_dci_release_progress(void *arg)
return 1;
}

/**
 * Adjust an arbiter callback result for an endpoint that sends on a shared
 * DCI.
 *
 * @param res    Result produced by the common pending-tx dispatch.
 * @param iface  DC interface owning the endpoint.
 * @param ep     Endpoint whose pending request was just processed.
 *
 * @return UCS_ARBITER_CB_RESULT_RESCHED_GROUP when @a res asks to deschedule
 *         the group while the endpoint still has flow-control resources;
 *         otherwise @a res unchanged.
 */
static ucs_arbiter_cb_result_t
uct_dc_mlx5_iface_shared_dci_pending_resched(ucs_arbiter_cb_result_t res,
                                             uct_dc_mlx5_iface_t *iface,
                                             uct_dc_mlx5_ep_t *ep)
{
    /* Only a deschedule request is ever rewritten */
    if (res != UCS_ARBITER_CB_RESULT_DESCHED_GROUP) {
        return res;
    }

    /* Out of FC resources: keep the deschedule request as is */
    if (!uct_rc_fc_has_resources(&iface->super.super, &ep->fc)) {
        return res;
    }

    /* A group on a shared dci must not be descheduled when it is non FC
     * resources that are missing, since it would never be scheduled again. */
    return UCS_ARBITER_CB_RESULT_RESCHED_GROUP;
}

/**
* dispatch requests waiting for tx resources (dcs* DCI policies)
*/
Expand All @@ -1494,6 +1510,11 @@ uct_dc_mlx5_iface_dci_do_dcs_pending_tx(ucs_arbiter_t *arbiter,
ucs_arbiter_cb_result_t res;

res = uct_dc_mlx5_iface_dci_do_common_pending_tx(ep, elem);

if (uct_dc_mlx5_is_hw_dci(iface, ep->dci)) {
return uct_dc_mlx5_iface_shared_dci_pending_resched(res, iface, ep);
}

if ((res != UCS_ARBITER_CB_RESULT_REMOVE_ELEM) || !is_only) {
return res;
}
Expand Down Expand Up @@ -1522,14 +1543,7 @@ uct_dc_mlx5_iface_dci_do_rand_pending_tx(ucs_arbiter_t *arbiter,
ucs_arbiter_cb_result_t res;

res = uct_dc_mlx5_iface_dci_do_common_pending_tx(ep, elem);
if ((res == UCS_ARBITER_CB_RESULT_DESCHED_GROUP) &&
uct_rc_fc_has_resources(&iface->super.super, &ep->fc)) {
/* We can't desched group with rand policy if non FC resources are
* missing, since it's never scheduled again. */
res = UCS_ARBITER_CB_RESULT_RESCHED_GROUP;
}

return res;
return uct_dc_mlx5_iface_shared_dci_pending_resched(res, iface, ep);
}

static ucs_arbiter_cb_result_t
Expand Down
66 changes: 37 additions & 29 deletions src/uct/ib/mlx5/dc/dc_mlx5_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,18 @@ uct_dc_mlx5_iface_is_hybrid(const uct_dc_mlx5_iface_t *iface)
return iface->tx.policy == UCT_DC_TX_POLICY_DCS_HYBRID;
}

/**
 * Check whether a DCI index is the one stored in the interface's
 * tx.hybrid_hw_dci field.
 *
 * @param iface  DC interface to query.
 * @param dci    DCI index to test.
 *
 * @return Nonzero if @a dci equals iface->tx.hybrid_hw_dci, zero otherwise.
 */
static UCS_F_ALWAYS_INLINE int
uct_dc_mlx5_is_hw_dci(const uct_dc_mlx5_iface_t *iface, uint8_t dci)
{
    return iface->tx.hybrid_hw_dci == dci;
}

/**
 * Test the UCT_DC_DCI_FLAG_SHARED bit of a DCI.
 *
 * @param iface  DC interface owning the DCI.
 * @param dci    Index of the DCI to inspect.
 *
 * @return The masked flag value: nonzero when UCT_DC_DCI_FLAG_SHARED is set
 *         on the DCI, zero otherwise (the value is not normalized to 0/1).
 */
static UCS_F_ALWAYS_INLINE int
uct_dc_mlx5_is_dci_shared(uct_dc_mlx5_iface_t *iface, uint8_t dci)
{
    return UCT_DC_DCI_FLAG_SHARED & uct_dc_mlx5_iface_dci(iface, dci)->flags;
}

static UCS_F_ALWAYS_INLINE ucs_arbiter_group_t*
uct_dc_mlx5_ep_rand_arb_group(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep)
{
Expand All @@ -292,19 +304,18 @@ uct_dc_mlx5_ep_rand_arb_group(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep)
static UCS_F_ALWAYS_INLINE ucs_arbiter_group_t*
uct_dc_mlx5_ep_arb_group(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep)
{
return (uct_dc_mlx5_iface_is_policy_shared(iface)) ?
uct_dc_mlx5_ep_rand_arb_group(iface, ep) : &ep->arb_group;
return uct_dc_mlx5_iface_is_policy_shared(iface) ?
uct_dc_mlx5_ep_rand_arb_group(iface, ep) :
&ep->arb_group;
}

static UCS_F_ALWAYS_INLINE void
uct_dc_mlx5_iface_dci_sched_tx(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep)
{
if (uct_dc_mlx5_iface_is_policy_shared(iface)) {
ucs_arbiter_group_schedule(uct_dc_mlx5_iface_tx_waitq(iface),
uct_dc_mlx5_ep_rand_arb_group(iface, ep));
} else if (uct_dc_mlx5_iface_dci_has_tx_resources(iface, ep->dci)) {
if (uct_dc_mlx5_iface_dci_has_tx_resources(iface, ep->dci) ||
uct_dc_mlx5_is_dci_shared(iface, ep->dci)) {
ucs_arbiter_group_schedule(uct_dc_mlx5_iface_tx_waitq(iface),
&ep->arb_group);
uct_dc_mlx5_ep_arb_group(iface, ep));
}
}

Expand All @@ -313,7 +324,7 @@ uct_dc_mlx5_ep_from_dci(uct_dc_mlx5_iface_t *iface, uint8_t dci_index)
{
/* Can be used with dcs* policies only, with rand policy every dci may
* be used by many eps */
ucs_assert(!uct_dc_mlx5_iface_is_policy_shared(iface));
ucs_assert(!uct_dc_mlx5_is_dci_shared(iface, dci_index));
return uct_dc_mlx5_iface_dci(iface, dci_index)->ep;
}

Expand All @@ -329,18 +340,6 @@ uct_dc_mlx5_init_dci_config(uct_dc_mlx5_dci_config_t *dci_config,
dci_config->max_rd_atomic = max_rd_atomic;
}

/**
 * Check whether a DCI index is the one stored in the interface's
 * tx.hybrid_hw_dci field.
 *
 * @param iface  DC interface to query.
 * @param dci    DCI index to test.
 *
 * @return Nonzero if @a dci equals iface->tx.hybrid_hw_dci, zero otherwise.
 */
static UCS_F_ALWAYS_INLINE int
uct_dc_mlx5_is_hw_dci(const uct_dc_mlx5_iface_t *iface, uint8_t dci)
{
    return iface->tx.hybrid_hw_dci == dci;
}

/**
 * Test the UCT_DC_DCI_FLAG_SHARED bit of a DCI.
 *
 * @param iface  DC interface owning the DCI.
 * @param dci    Index of the DCI to inspect.
 *
 * @return The masked flag value: nonzero when UCT_DC_DCI_FLAG_SHARED is set
 *         on the DCI, zero otherwise (the value is not normalized to 0/1).
 */
static UCS_F_ALWAYS_INLINE int
uct_dc_mlx5_is_dci_shared(uct_dc_mlx5_iface_t *iface, uint8_t dci)
{
    return UCT_DC_DCI_FLAG_SHARED & uct_dc_mlx5_iface_dci(iface, dci)->flags;
}

ucs_status_t static UCS_F_ALWAYS_INLINE
uct_dc_mlx5_dci_pool_init_dci(uct_dc_mlx5_iface_t *iface, uint8_t pool_index,
uint8_t dci_index)
Expand Down Expand Up @@ -564,7 +563,7 @@ uct_dc_mlx5_iface_dci_release(uct_dc_mlx5_iface_t *iface, uint8_t dci_index)

ucs_trace_data("iface %p: release dci %d from ep %p", iface, dci_index,
uct_dc_mlx5_ep_from_dci(iface, dci_index));

ucs_assert(!uct_dc_mlx5_is_hw_dci(iface, dci_index));
uct_dc_mlx5_iface_dci(iface, dci_index)->ep = NULL;
pool->stack_top--;
ucs_assertv(pool->stack_top >= 0, "dci pool underflow, stack_top=%d",
Expand All @@ -584,7 +583,8 @@ uct_dc_mlx5_iface_dci_put(uct_dc_mlx5_iface_t *iface, uint8_t dci_index)

ucs_assert(dci_index != UCT_DC_MLX5_EP_NO_DCI);

if (uct_dc_mlx5_iface_is_policy_shared(iface)) {
if (uct_dc_mlx5_iface_is_policy_shared(iface) ||
uct_dc_mlx5_is_hw_dci(iface, dci_index)) {
return;
}

Expand Down Expand Up @@ -615,7 +615,8 @@ uct_dc_mlx5_iface_dci_put(uct_dc_mlx5_iface_t *iface, uint8_t dci_index)
ep->flags &= ~UCT_DC_MLX5_EP_FLAG_TX_WAIT;
}
}
ucs_arbiter_group_schedule(uct_dc_mlx5_iface_tx_waitq(iface), &ep->arb_group);
ucs_arbiter_group_schedule(uct_dc_mlx5_iface_tx_waitq(iface),
&ep->arb_group);
return;
}

Expand Down Expand Up @@ -690,13 +691,20 @@ uct_dc_mlx5_iface_dci_detach(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep)

ucs_assert(!uct_dc_mlx5_iface_is_policy_shared(iface));
ucs_assert(dci_index != UCT_DC_MLX5_EP_NO_DCI);
ucs_assert(iface->tx.dci_pool[uct_dc_mlx5_ep_pool_index(ep)].stack_top > 0);

if (uct_dc_mlx5_iface_dci_has_outstanding(iface, dci_index) ||
uct_dc_mlx5_is_hw_dci(iface, dci_index)) {

if (uct_dc_mlx5_is_hw_dci(iface, dci_index)) {
ep->dci = UCT_DC_MLX5_EP_NO_DCI;
ep->flags &= ~UCT_DC_MLX5_EP_FLAG_TX_WAIT;
return 1;
}

if (uct_dc_mlx5_iface_dci_has_outstanding(iface, dci_index)) {
return 0;
}

ucs_assert(iface->tx.dci_pool[uct_dc_mlx5_ep_pool_index(ep)].stack_top > 0);

ep->dci = UCT_DC_MLX5_EP_NO_DCI;
ep->flags &= ~UCT_DC_MLX5_EP_FLAG_TX_WAIT;

Expand Down Expand Up @@ -756,8 +764,8 @@ uct_dc_mlx5_iface_dci_get(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep)
}

/* if dci has sent more than quota, and there are eps waiting for dci
* allocation ep goes into tx_wait state.
*/
* allocation ep goes into tx_wait state.
*/
txqp = &dci->txqp;
available = uct_rc_txqp_available(txqp);
waitq = uct_dc_mlx5_iface_dci_waitq(iface, pool_index);
Expand Down Expand Up @@ -857,7 +865,7 @@ static inline struct mlx5_grh_av *uct_dc_mlx5_ep_get_grh(uct_dc_mlx5_ep_t *ep)
return _status; \
} \
} \
if (!uct_dc_mlx5_iface_is_policy_shared(_iface)) { \
if (!uct_dc_mlx5_is_dci_shared(_iface, _ep->dci)) { \
uct_rc_iface_check_pending(&(_iface)->super.super, \
&(_ep)->arb_group); \
} \
Expand Down
25 changes: 20 additions & 5 deletions test/gtest/uct/ib/test_dc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,14 @@ UCS_TEST_P(test_dc, dcs_single) {
ucs_status_t status;
uct_dc_mlx5_ep_t *ep;
uct_dc_mlx5_iface_t *iface;
uint16_t first_allocated_dci;

m_e1->connect_to_iface(0, *m_e2);
ep = dc_ep(m_e1, 0);
iface = dc_iface(m_e1);
first_allocated_dci = uct_dc_mlx5_iface_is_hybrid(dc_iface(m_e1)) ? 1 : 0;


EXPECT_EQ(UCT_DC_MLX5_EP_NO_DCI, ep->dci);
status = uct_ep_am_short(m_e1->ep(0), 0, 0, NULL, 0);
EXPECT_UCS_OK(status);
Expand All @@ -182,7 +186,8 @@ UCS_TEST_P(test_dc, dcs_single) {
/* after the flush dci must be released */
EXPECT_EQ(UCT_DC_MLX5_EP_NO_DCI, ep->dci);
EXPECT_EQ(0, iface->tx.dci_pool[0].stack_top);
EXPECT_EQ(0, ucs_array_elem(&iface->tx.dci_pool[0].stack, 0));
EXPECT_EQ(first_allocated_dci,
ucs_array_elem(&iface->tx.dci_pool[0].stack, 0));
}

UCS_TEST_P(test_dc, dcs_multi) {
Expand All @@ -209,8 +214,10 @@ UCS_TEST_P(test_dc, dcs_multi) {
}

/* this should fail because there are no free dci */
status = uct_ep_am_short(m_e1->ep(i), 0, 0, NULL, 0);
EXPECT_EQ(UCS_ERR_NO_RESOURCE, status);
if (!uct_dc_mlx5_iface_is_hybrid(iface)) {
status = uct_ep_am_short(m_e1->ep(i), 0, 0, NULL, 0);
EXPECT_EQ(UCS_ERR_NO_RESOURCE, status);
}

flush();

Expand Down Expand Up @@ -296,6 +303,10 @@ UCS_TEST_P(test_dc, dcs_ep_flush_pending, "DC_NUM_DCI=1") {

iface = dc_iface(m_e1);

if (uct_dc_mlx5_iface_is_hybrid(iface)) {
UCS_TEST_SKIP_R("hybrid DCI policy does not support pending flush requests while ndci = 1");
}

/* shorten test time by reducing dci QP resources */
ucs_array_elem(&iface->tx.dcis, 0).txqp.available = 8;
do {
Expand Down Expand Up @@ -345,6 +356,10 @@ UCS_TEST_P(test_dc, dcs_ep_purge_pending, "DC_NUM_DCI=1") {
m_e1->connect_to_iface(0, *m_e2);

iface = dc_iface(m_e1);
if (uct_dc_mlx5_iface_is_hybrid(iface)) {
UCS_TEST_SKIP_R("hybrid DCI policy does not support pending purge requests while ndci = 1");
}

ep = dc_ep(m_e1, 0);
ucs_array_elem(&iface->tx.dcis, 0).txqp.available = 8;

Expand Down Expand Up @@ -499,8 +514,8 @@ class test_dc_flow_control : public test_rc_flow_control {

if (all_resources.empty()) {
std::vector<uct_dc_tx_policy_t> policies =
{UCT_DC_TX_POLICY_DCS_QUOTA, UCT_DC_TX_POLICY_RAND,
UCT_DC_TX_POLICY_HW_DCS};
{UCT_DC_TX_POLICY_DCS_QUOTA, UCT_DC_TX_POLICY_DCS_HYBRID,
UCT_DC_TX_POLICY_RAND, UCT_DC_TX_POLICY_HW_DCS};
for (auto policy : policies) {
for (const auto &elem : resources) {
struct resource rsc = *elem;
Expand Down
Loading