Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCP/PROTO: Fix RNDV_SCHEME logic #10230

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 8 additions & 18 deletions src/ucp/rndv/proto_rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -395,24 +395,14 @@ static void ucp_proto_rndv_ctrl_variant_probe(
/* Set priority and threshold for this variant */
cfg_thresh = params->super.cfg_thresh;
cfg_priority = params->super.cfg_priority;
if ((remote_proto->cfg_thresh != UCS_MEMUNITS_INF) &&
(remote_proto->cfg_thresh != UCS_MEMUNITS_AUTO)) {
/* Consider remote priority and threshold only if RNDV_SCHEME or
* BCOPY/ZCOPY thresh are set to force these settings */
cfg_priority = remote_proto->cfg_priority;
cfg_thresh = (cfg_thresh == UCS_MEMUNITS_AUTO) ?
remote_proto->cfg_thresh :
ucs_max(cfg_thresh, remote_proto->cfg_thresh);
}

/* Remote variants priorities are used to respect RNDV_SCHEME setting
* so they should contain value greater than CTRL message `cfg_thresh`.
* Equality is allowed for RTR remote variants.
*/
ucs_assertv(params->super.cfg_priority <= remote_proto->cfg_priority,
"remote_proto=%s params->super.cfg_priority=%u "
"remote_proto->cfg_priority=%u", variant_name,
params->super.cfg_priority, remote_proto->cfg_priority);
if (remote_proto->cfg_thresh != UCS_MEMUNITS_AUTO) {
/* If RNDV_SCHEME is set, all protocols except forced one report INF */
ucs_assertv(remote_proto->cfg_thresh == UCS_MEMUNITS_INF,
"variant_name=%s remote_proto->cfg_thresh=%zu",
variant_name, remote_proto->cfg_thresh);
cfg_thresh = remote_proto->cfg_thresh;
}

ucp_proto_select_add_proto(&params->super.super, cfg_thresh, cfg_priority,
perf, rpriv, priv_size);

Expand Down
10 changes: 4 additions & 6 deletions src/ucp/rndv/proto_rndv.inl
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@
static UCS_F_ALWAYS_INLINE size_t
ucp_proto_rndv_cfg_thresh(ucp_context_h context, uint64_t rndv_modes)
{
ucp_rndv_mode_t mode = context->config.ext.rndv_mode;
ucs_assert(!(rndv_modes & UCS_BIT(UCP_RNDV_MODE_AUTO)));

if (context->config.ext.rndv_mode == UCP_RNDV_MODE_AUTO) {
return UCS_MEMUNITS_AUTO; /* automatic threshold */
} else if (rndv_modes & UCS_BIT(context->config.ext.rndv_mode)) {
return 0; /* enabled by default */
} else {
return UCS_MEMUNITS_INF; /* used only as last resort */
if ((mode == UCP_RNDV_MODE_AUTO) || (rndv_modes & UCS_BIT(mode))) {
return UCS_MEMUNITS_AUTO;
}
return UCS_MEMUNITS_INF; /* used only as last resort */
}

static UCS_F_ALWAYS_INLINE ucs_status_t
Expand Down
23 changes: 20 additions & 3 deletions test/gtest/ucp/test_ucp_am.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1627,15 +1627,19 @@ class test_ucp_am_nbx_rndv : public test_ucp_am_nbx_prereg {
void *data, size_t length,
const ucp_am_recv_param_t *rx_param)
{
EXPECT_TRUE(rx_param->recv_attr & UCP_AM_RECV_ATTR_FLAG_RNDV);
EXPECT_FALSE(rx_param->recv_attr & UCP_AM_RECV_ATTR_FLAG_DATA);

ucs_status_t status = test_ucp_am_nbx::am_data_handler(header,
header_length,
data, length,
rx_param);
EXPECT_FALSE(UCS_STATUS_IS_ERR(status));

if (!m_check_recv_rndv_flags) {
return status;
}

EXPECT_TRUE(rx_param->recv_attr & UCP_AM_RECV_ATTR_FLAG_RNDV);
EXPECT_FALSE(rx_param->recv_attr & UCP_AM_RECV_ATTR_FLAG_DATA);

return UCS_INPROGRESS;
}

Expand Down Expand Up @@ -1732,8 +1736,17 @@ class test_ucp_am_nbx_rndv : public test_ucp_am_nbx_prereg {
return cfg->rndv_frag_size[mem_type];
}

void check_rma_support()
{
if (!sender().is_rndv_supported()) {
UCS_TEST_MESSAGE << "RNDV is not supported";
m_check_recv_rndv_flags = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why? what if this is AM-based rndv?
also if rndv is not supported this test suite should be skipped

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe message confused you. That case is created to handler AM-based RNDV case. AFAIR AM RNDV doesn't set recv_attr flags.

I can change message here to "RMA is not supported" would it be better from your point of view?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what flag do you mean? AM-based RNDV is using generic flow RTS->RTR->data chunks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, you are right. The thing here is that due to new logic if RNDV_SCHEME=get_zcopy and rndv/get/zcopy is unavailable eager can be selected instead of RNDV.

So we cannot rely on fact that UCP_AM_RECV_ATTR_FLAG_RNDV and UCP_AM_RECV_ATTR_FLAG_DATA will be set in am_data_handler

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can skip these tests if RNDV not supported but I don't think that it is important

}
}

protected:
static constexpr unsigned RNDV_THRESH = 128;
bool m_check_recv_rndv_flags = true;
ucs_status_t m_status;
bool m_am_recv_cb_invoked;
};
Expand All @@ -1745,11 +1758,13 @@ UCS_TEST_P(test_ucp_am_nbx_rndv, rndv_auto, "RNDV_SCHEME=auto")

UCS_TEST_P(test_ucp_am_nbx_rndv, rndv_get, "RNDV_SCHEME=get_zcopy")
{
check_rma_support();
test_am_send_recv(64 * UCS_KBYTE);
}

UCS_TEST_P(test_ucp_am_nbx_rndv, rndv_put, "RNDV_SCHEME=put_zcopy")
{
check_rma_support();
test_am_send_recv(64 * UCS_KBYTE);
}

Expand Down Expand Up @@ -2084,6 +2099,8 @@ UCS_TEST_P(test_ucp_am_nbx_rndv_ppln, host_buff_host_frag,
"RNDV_FRAG_MEM_TYPE=host")
{
// Host memory should not be pipelined thru host staging buffers
m_check_recv_rndv_flags = false;

test_ppln_send(UCS_MEMORY_TYPE_HOST, 2, 0);
}

Expand Down
28 changes: 20 additions & 8 deletions test/gtest/ucp/test_ucp_sockaddr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -899,13 +899,18 @@ class test_ucp_sockaddr : public ucp_test {
return get_variant_value() & TEST_MODIFIER_SA_DATA_V1;
}

bool has_rndv_lanes(ucp_ep_h ep)
bool has_rndv_lanes(const entity &entity)
{
const auto *config = &entity.worker()->context->config.ext;
const auto &ep = entity.ep();
uint64_t iface_flags = (config->rndv_mode == UCP_RNDV_MODE_GET_ZCOPY) ?
UCT_IFACE_FLAG_GET_ZCOPY :
UCT_IFACE_FLAG_PUT_ZCOPY;

for (ucp_lane_index_t lane_idx = 0;
lane_idx < ucp_ep_num_lanes(ep); ++lane_idx) {
if ((lane_idx != ucp_ep_get_cm_lane(ep)) &&
(ucp_ep_get_iface_attr(ep, lane_idx)->cap.flags &
(UCT_IFACE_FLAG_GET_ZCOPY | UCT_IFACE_FLAG_PUT_ZCOPY)) &&
(ucp_ep_get_iface_attr(ep, lane_idx)->cap.flags & iface_flags) &&
/* RNDV lanes should be selected if transport supports GET/PUT
* Zcopy and: */
(/* - either memory invalidation can be done on its MD */
Expand Down Expand Up @@ -939,6 +944,10 @@ class test_ucp_sockaddr : public ucp_test {

listen_and_communicate(false, SEND_DIRECTION_BIDI);

if (!has_rndv_lanes(sender())) {
UCS_TEST_SKIP_R("no RNDV lanes");
}

mem_buffer send_buffer(length, UCS_MEMORY_TYPE_HOST);
send_buffer.pattern_fill(1, length);
void *sreq = send(sender(), send_buffer.ptr(), length,
Expand Down Expand Up @@ -1386,7 +1395,7 @@ UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_wireup, compare_cm_and_wireup_configs,
cm_ep_cfg_key = &ucp_ep_config(sender().ep())->key;
/* Don't check RNDV lanes, because CM prefers p2p connection mode for RNDV
* lanes and they don't support memory invalidation on MD */
should_check_rndv_lanes = !has_rndv_lanes(sender().ep());
should_check_rndv_lanes = !has_rndv_lanes(sender());
EXPECT_NE(UCP_NULL_LANE, ucp_ep_get_cm_lane(sender().ep()));
disconnect(sender());
disconnect(receiver());
Expand Down Expand Up @@ -1996,8 +2005,7 @@ UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_check_lanes, check_rndv_lanes,
{
listen_and_communicate(false, SEND_DIRECTION_BIDI);

EXPECT_EQ(has_rndv_lanes(sender().ep()),
has_rndv_lanes(receiver().ep()));
EXPECT_EQ(has_rndv_lanes(sender()), has_rndv_lanes(receiver()));

concurrent_disconnect();
}
Expand Down Expand Up @@ -3169,6 +3177,10 @@ class test_ucp_sockaddr_protocols_err_sender
send_recv(sender(), receiver(), send_recv_type(), false, cb_type(),
sender_idx);

if (!has_rndv_lanes(sender())) {
UCS_TEST_SKIP_R("no RNDV lanes");
}

for (size_t i = 0; i < num_sends; ++i) {
void *sreq = send(sender(), send_buf.ptr(), size,
SEND_RECV_TAG, send_cb, NULL, sender_idx);
Expand Down Expand Up @@ -3248,7 +3260,7 @@ UCS_TEST_P(test_ucp_sockaddr_protocols_err_sender,
{
size_t num_sends = ucs_max(100, 100000 / ucs::test_time_multiplier() /
ucs::test_time_multiplier());
do_tag_rndv_killed_sender_test(1, 128, num_sends);
do_tag_rndv_killed_sender_test(1, 1024, num_sends);
}

UCS_TEST_P(test_ucp_sockaddr_protocols_err_sender,
Expand All @@ -3257,7 +3269,7 @@ UCS_TEST_P(test_ucp_sockaddr_protocols_err_sender,
{
size_t num_sends = ucs_max(100, 100000 / ucs::test_time_multiplier() /
ucs::test_time_multiplier());
do_tag_rndv_killed_sender_test(4, 128, num_sends);
do_tag_rndv_killed_sender_test(4, 1024, num_sends);
}

UCP_INSTANTIATE_CM_TEST_CASE(test_ucp_sockaddr_protocols_err_sender)
Expand Down
6 changes: 6 additions & 0 deletions test/gtest/ucp/ucp_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1208,6 +1208,12 @@ bool ucp_test_base::entity::is_rndv_put_ppln_supported() const
return false;
}

bool ucp_test_base::entity::is_rndv_supported() const
{
const auto config = ucp_ep_config(ep());
return config->key.rma_bw_lanes[0] != UCP_NULL_LANE;
}

bool ucp_test_base::entity::is_conn_reqs_queue_empty() const
{
return m_conn_reqs.empty();
Expand Down
2 changes: 2 additions & 0 deletions test/gtest/ucp/ucp_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ class ucp_test_base : public ucs::test_base {

bool is_rndv_put_ppln_supported() const;

bool is_rndv_supported() const;

bool is_conn_reqs_queue_empty() const;

protected:
Expand Down
Loading