UCT/IB/MLX5/DC: hybrid dcs fixes - pending queue
roiedanino committed Oct 21, 2024
1 parent 6aacca7 commit 72788d5
Showing 4 changed files with 67 additions and 45 deletions.
7 changes: 4 additions & 3 deletions src/uct/ib/mlx5/dc/dc_mlx5.c
@@ -294,8 +294,8 @@ uct_dc_mlx5_poll_tx(uct_dc_mlx5_iface_t *iface, int poll_flags)
     txwq  = &dci->txwq;
     hw_ci = ntohs(cqe->wqe_counter);
 
-    ucs_trace_poll("dc iface %p tx_cqe: dci[%d] txqp %p hw_ci %d",
-                   iface, dci_index, txqp, hw_ci);
+    ucs_trace_poll("dc iface %p tx_cqe: dci[%d] txqp %p hw_ci %d", iface,
+                   dci_index, txqp, hw_ci);
 
     uct_rc_mlx5_txqp_process_tx_cqe(txqp, cqe, hw_ci);
     uct_dc_mlx5_update_tx_res(iface, txwq, txqp, hw_ci);
@@ -1180,7 +1180,8 @@ static void uct_dc_mlx5_iface_cleanup_fc_ep(uct_dc_mlx5_iface_t *iface)
         goto out;
     }
 
-    if (uct_dc_mlx5_iface_is_policy_shared(iface)) {
+    if (uct_dc_mlx5_iface_is_policy_shared(iface) ||
+        uct_dc_mlx5_is_hw_dci(iface, fc_ep->dci)) {
         txqp = &fc_dci->txqp;
         ucs_queue_for_each_safe(op, iter, &txqp->outstanding, queue) {
             if (op->handler == uct_dc_mlx5_ep_fc_pure_grant_send_completion) {
7 changes: 3 additions & 4 deletions src/uct/ib/mlx5/dc/dc_mlx5.inl
@@ -65,13 +65,12 @@ uct_dc_mlx5_ep_pending_common(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep,
     UCS_STATIC_ASSERT(sizeof(uct_dc_mlx5_pending_req_priv) <=
                       UCT_PENDING_REQ_PRIV_LEN);
 
-    if (uct_dc_mlx5_iface_is_policy_shared(iface)) {
+    if (ep->dci != UCT_DC_MLX5_EP_NO_DCI) {
         uct_dc_mlx5_pending_req_priv(r)->ep = ep;
-        group = uct_dc_mlx5_ep_rand_arb_group(iface, ep);
-    } else {
-        group = &ep->arb_group;
     }
 
+    group = uct_dc_mlx5_ep_arb_group(iface, ep);
+
     if (push_to_head) {
         uct_pending_req_arb_group_push_head(group, r);
     } else {
34 changes: 24 additions & 10 deletions src/uct/ib/mlx5/dc/dc_mlx5_ep.c
@@ -1460,7 +1460,8 @@ unsigned uct_dc_mlx5_ep_dci_release_progress(void *arg)
         dci_pool = &iface->tx.dci_pool[pool_index];
         while (dci_pool->release_stack_top >= 0) {
             dci = ucs_array_elem(&dci_pool->stack, dci_pool->release_stack_top--);
-            ucs_assert(dci < iface->tx.ndci * iface->tx.num_dci_pools);
+            ucs_assert((dci - uct_dc_mlx5_iface_is_hybrid(iface)) <
+                       iface->tx.ndci * iface->tx.num_dci_pools);
             uct_dc_mlx5_iface_dci_release(iface, dci);
         }

@@ -1476,6 +1477,21 @@ unsigned uct_dc_mlx5_ep_dci_release_progress(void *arg)
     return 1;
 }
 
+static ucs_arbiter_cb_result_t
+uct_dc_mlx5_iface_shared_dci_pending_resched(ucs_arbiter_cb_result_t res,
+                                             uct_dc_mlx5_iface_t *iface,
+                                             uct_dc_mlx5_ep_t *ep)
+{
+    if ((res == UCS_ARBITER_CB_RESULT_DESCHED_GROUP) &&
+        uct_rc_fc_has_resources(&iface->super.super, &ep->fc)) {
+        /* We can't desched group with a shared dci if non FC resources are
+         * missing, since it's never scheduled again. */
+        res = UCS_ARBITER_CB_RESULT_RESCHED_GROUP;
+    }
+
+    return res;
+}
+
 /**
  * dispatch requests waiting for tx resources (dcs* DCI policies)
  */
@@ -1494,6 +1510,11 @@
     ucs_arbiter_cb_result_t res;
 
     res = uct_dc_mlx5_iface_dci_do_common_pending_tx(ep, elem);
+
+    if (uct_dc_mlx5_is_hw_dci(iface, ep->dci)) {
+        return uct_dc_mlx5_iface_shared_dci_pending_resched(res, iface, ep);
+    }
+
     if ((res != UCS_ARBITER_CB_RESULT_REMOVE_ELEM) || !is_only) {
         return res;
     }
@@ -1522,14 +1543,7 @@ uct_dc_mlx5_iface_dci_do_rand_pending_tx(ucs_arbiter_t *arbiter,
     ucs_arbiter_cb_result_t res;
 
     res = uct_dc_mlx5_iface_dci_do_common_pending_tx(ep, elem);
-    if ((res == UCS_ARBITER_CB_RESULT_DESCHED_GROUP) &&
-        uct_rc_fc_has_resources(&iface->super.super, &ep->fc)) {
-        /* We can't desched group with rand policy if non FC resources are
-         * missing, since it's never scheduled again. */
-        res = UCS_ARBITER_CB_RESULT_RESCHED_GROUP;
-    }
-
-    return res;
+    return uct_dc_mlx5_iface_shared_dci_pending_resched(res, iface, ep);
 }
 
 static ucs_arbiter_cb_result_t
@@ -1545,7 +1559,7 @@ uct_dc_mlx5_ep_arbiter_purge_cb(ucs_arbiter_t *arbiter, ucs_arbiter_group_t *gro
                                                        priv);
     uct_rc_pending_req_t *freq;
 
-    if (uct_dc_mlx5_iface_is_policy_shared(iface) &&
+    if ((uct_dc_mlx5_is_dci_shared(iface, ep->dci)) &&
         (uct_dc_mlx5_pending_req_priv(req)->ep != ep)) {
         /* Element belongs to another ep - do not remove it */
         return UCS_ARBITER_CB_RESULT_NEXT_GROUP;
64 changes: 36 additions & 28 deletions src/uct/ib/mlx5/dc/dc_mlx5_ep.h
@@ -280,6 +280,18 @@ uct_dc_mlx5_iface_is_hybrid(const uct_dc_mlx5_iface_t *iface)
     return iface->tx.policy == UCT_DC_TX_POLICY_DCS_HYBRID;
 }
 
+static UCS_F_ALWAYS_INLINE int
+uct_dc_mlx5_is_hw_dci(const uct_dc_mlx5_iface_t *iface, uint8_t dci)
+{
+    return dci == iface->tx.hybrid_hw_dci;
+}
+
+static UCS_F_ALWAYS_INLINE int
+uct_dc_mlx5_is_dci_shared(uct_dc_mlx5_iface_t *iface, uint8_t dci)
+{
+    return uct_dc_mlx5_iface_dci(iface, dci)->flags & UCT_DC_DCI_FLAG_SHARED;
+}
+
 static UCS_F_ALWAYS_INLINE ucs_arbiter_group_t*
 uct_dc_mlx5_ep_rand_arb_group(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep)
 {
@@ -292,19 +304,18 @@ uct_dc_mlx5_ep_rand_arb_group(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep)
 static UCS_F_ALWAYS_INLINE ucs_arbiter_group_t*
 uct_dc_mlx5_ep_arb_group(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep)
 {
-    return (uct_dc_mlx5_iface_is_policy_shared(iface)) ?
-           uct_dc_mlx5_ep_rand_arb_group(iface, ep) : &ep->arb_group;
+    return uct_dc_mlx5_iface_is_policy_shared(iface) ?
+                   uct_dc_mlx5_ep_rand_arb_group(iface, ep) :
+                   &ep->arb_group;
 }
 
 static UCS_F_ALWAYS_INLINE void
 uct_dc_mlx5_iface_dci_sched_tx(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep)
 {
-    if (uct_dc_mlx5_iface_is_policy_shared(iface)) {
-        ucs_arbiter_group_schedule(uct_dc_mlx5_iface_tx_waitq(iface),
-                                   uct_dc_mlx5_ep_rand_arb_group(iface, ep));
-    } else if (uct_dc_mlx5_iface_dci_has_tx_resources(iface, ep->dci)) {
+    if (uct_dc_mlx5_iface_dci_has_tx_resources(iface, ep->dci) ||
+        uct_dc_mlx5_is_dci_shared(iface, ep->dci)) {
         ucs_arbiter_group_schedule(uct_dc_mlx5_iface_tx_waitq(iface),
-                                   &ep->arb_group);
+                                   uct_dc_mlx5_ep_arb_group(iface, ep));
     }
 }

@@ -313,7 +324,7 @@
 uct_dc_mlx5_ep_from_dci(uct_dc_mlx5_iface_t *iface, uint8_t dci_index)
 {
     /* Can be used with dcs* policies only, with rand policy every dci may
      * be used by many eps */
-    ucs_assert(!uct_dc_mlx5_iface_is_policy_shared(iface));
+    ucs_assert(!uct_dc_mlx5_is_dci_shared(iface, dci_index));
     return uct_dc_mlx5_iface_dci(iface, dci_index)->ep;
 }
@@ -329,18 +340,6 @@ uct_dc_mlx5_init_dci_config(uct_dc_mlx5_dci_config_t *dci_config,
     dci_config->max_rd_atomic = max_rd_atomic;
 }
 
-static UCS_F_ALWAYS_INLINE int
-uct_dc_mlx5_is_hw_dci(const uct_dc_mlx5_iface_t *iface, uint8_t dci)
-{
-    return dci == iface->tx.hybrid_hw_dci;
-}
-
-static UCS_F_ALWAYS_INLINE int
-uct_dc_mlx5_is_dci_shared(uct_dc_mlx5_iface_t *iface, uint8_t dci)
-{
-    return uct_dc_mlx5_iface_dci(iface, dci)->flags & UCT_DC_DCI_FLAG_SHARED;
-}
-
 ucs_status_t static UCS_F_ALWAYS_INLINE
 uct_dc_mlx5_dci_pool_init_dci(uct_dc_mlx5_iface_t *iface, uint8_t pool_index,
                               uint8_t dci_index)
@@ -564,7 +563,7 @@ uct_dc_mlx5_iface_dci_release(uct_dc_mlx5_iface_t *iface, uint8_t dci_index)
 
     ucs_trace_data("iface %p: release dci %d from ep %p", iface, dci_index,
                    uct_dc_mlx5_ep_from_dci(iface, dci_index));
-
+    ucs_assert(!uct_dc_mlx5_is_hw_dci(iface, dci_index));
     uct_dc_mlx5_iface_dci(iface, dci_index)->ep = NULL;
     pool->stack_top--;
     ucs_assertv(pool->stack_top >= 0, "dci pool underflow, stack_top=%d",
@@ -584,7 +583,8 @@ uct_dc_mlx5_iface_dci_put(uct_dc_mlx5_iface_t *iface, uint8_t dci_index)
 
     ucs_assert(dci_index != UCT_DC_MLX5_EP_NO_DCI);
 
-    if (uct_dc_mlx5_iface_is_policy_shared(iface)) {
+    if (uct_dc_mlx5_iface_is_policy_shared(iface) ||
+        uct_dc_mlx5_is_hw_dci(iface, dci_index)) {
         return;
     }
 
@@ -615,7 +615,8 @@ uct_dc_mlx5_iface_dci_put(uct_dc_mlx5_iface_t *iface, uint8_t dci_index)
             ep->flags &= ~UCT_DC_MLX5_EP_FLAG_TX_WAIT;
         }
     }
-    ucs_arbiter_group_schedule(uct_dc_mlx5_iface_tx_waitq(iface), &ep->arb_group);
+    ucs_arbiter_group_schedule(uct_dc_mlx5_iface_tx_waitq(iface),
+                               &ep->arb_group);
     return;
 }

@@ -690,13 +691,20 @@ uct_dc_mlx5_iface_dci_detach(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep)
 
     ucs_assert(!uct_dc_mlx5_iface_is_policy_shared(iface));
     ucs_assert(dci_index != UCT_DC_MLX5_EP_NO_DCI);
-    ucs_assert(iface->tx.dci_pool[uct_dc_mlx5_ep_pool_index(ep)].stack_top > 0);
 
-    if (uct_dc_mlx5_iface_dci_has_outstanding(iface, dci_index) ||
-        uct_dc_mlx5_is_hw_dci(iface, dci_index)) {
+
+    if (uct_dc_mlx5_is_hw_dci(iface, dci_index)) {
+        ep->dci = UCT_DC_MLX5_EP_NO_DCI;
+        ep->flags &= ~UCT_DC_MLX5_EP_FLAG_TX_WAIT;
+        return 1;
+    }
+
+    if (uct_dc_mlx5_iface_dci_has_outstanding(iface, dci_index)) {
         return 0;
     }
 
+    ucs_assert(iface->tx.dci_pool[uct_dc_mlx5_ep_pool_index(ep)].stack_top > 0);
+
     ep->dci = UCT_DC_MLX5_EP_NO_DCI;
     ep->flags &= ~UCT_DC_MLX5_EP_FLAG_TX_WAIT;
 
@@ -756,8 +764,8 @@ uct_dc_mlx5_iface_dci_get(uct_dc_mlx5_iface_t *iface, uct_dc_mlx5_ep_t *ep)
     }
 
     /* if dci has sent more than quota, and there are eps waiting for dci
-    * allocation ep goes into tx_wait state.
-    */
+     * allocation ep goes into tx_wait state.
+     */
     txqp = &dci->txqp;
     available = uct_rc_txqp_available(txqp);
     waitq = uct_dc_mlx5_iface_dci_waitq(iface, pool_index);
