diff --git a/include/uxr/client/core/session/session.h b/include/uxr/client/core/session/session.h index dfffe6d7..4112c5c4 100644 --- a/include/uxr/client/core/session/session.h +++ b/include/uxr/client/core/session/session.h @@ -399,6 +399,15 @@ UXRDLLAPI uxrStreamId uxr_create_input_reliable_stream( UXRDLLAPI void uxr_flash_output_streams( uxrSession* session); +/** + * @brief Flashes one output stream seding the data through the transport. + * @param session A uxrSession structure previously initialized. + * @param stream_id A uxrStreamId structure previously initialized. + */ +UXRDLLAPI void uxr_flash_one_output_stream( + uxrSession* session, + const uxrStreamId stream_id); + /** * @brief Keeps communication between the Client and the Agent. * This function involves the following actions: @@ -466,12 +475,29 @@ UXRDLLAPI bool uxr_run_session_until_timeout( * The aforementioned actions will be performed in a loop until a the `timeout` is exceeded * or the output reliable streams confirm the delivery of all their messages. * @param session A uxrSession structure previously initialized. - * @param timeout The waiting time in milliseconds. + * @param timeout_ms The waiting time in milliseconds. * @return `true` if all output reliable streams confirm the delivery of their messages. `false` in other case. */ UXRDLLAPI bool uxr_run_session_until_confirm_delivery( uxrSession* session, - int timeout); + int timeout_ms); + +/** + * @brief Keeps communication between the Client and the Agent. + * This function involves the following actions: + * 1. flushing one output streams sending the data through the transport, + * 2. listening messages from the Agent calling the associated callback (topic and status). + * The aforementioned actions will be performed in a loop until a the `timeout` is exceeded + * or the output reliable streams confirm the delivery of all their messages. + * @param session A uxrSession structure previously initialized. + * @param stream A uxrStreamId previously initialized. + * @param timeout_ms The waiting time in milliseconds. + * @return `true` if given output reliable stream confirms the delivery of his messages. `false` in other case. + */ +UXRDLLAPI bool uxr_run_session_until_confirm_delivery_one_stream( + uxrSession* session, + const uxrStreamId stream, + int timeout_ms); /** * @brief Keeps communication between the Client and the Agent. diff --git a/src/c/core/session/session.c b/src/c/core/session/session.c index 1df93894..20572c28 100644 --- a/src/c/core/session/session.c +++ b/src/c/core/session/session.c @@ -389,11 +389,15 @@ bool uxr_run_session_until_confirm_delivery( uxr_flash_output_streams(session); - bool timeout = false; - while (!uxr_output_streams_confirmed(&session->streams) && !timeout) + int64_t start_timestamp = uxr_millis(); + int remaining_time = timeout_ms; + + do { - timeout = !listen_message_reliably(session, timeout_ms); + listen_message_reliably(session, remaining_time); + remaining_time = timeout_ms - (int)(uxr_millis() - start_timestamp); } + while (remaining_time > 0 && !uxr_output_streams_confirmed(&session->streams)); bool ret = uxr_output_streams_confirmed(&session->streams); @@ -401,6 +405,41 @@ bool uxr_run_session_until_confirm_delivery( return ret; } +bool uxr_run_session_until_confirm_delivery_one_stream( + uxrSession* session, + const uxrStreamId stream_id, + int timeout_ms) +{ + if (stream_id.direction != UXR_OUTPUT_STREAM || + stream_id.type != UXR_RELIABLE_STREAM || + stream_id.index >= session->streams.output_reliable_size) + { + return false; + } + + UXR_LOCK_SESSION(session); + + const uxrOutputReliableStream* stream = + &session->streams.output_reliable[stream_id.index]; + + uxr_flash_one_output_stream(session, stream_id); + + int64_t start_timestamp = uxr_millis(); + int remaining_time = timeout_ms; + + do + { + listen_message_reliably(session, remaining_time); + remaining_time = timeout_ms - (int)(uxr_millis() - start_timestamp); + } + while (remaining_time > 0 && !uxr_output_one_stream_confirmed(stream)); + + bool ret = uxr_output_one_stream_confirmed(stream); + + UXR_UNLOCK_SESSION(session); + return ret; +} + bool uxr_run_session_until_all_status( uxrSession* session, int timeout_ms, @@ -604,6 +643,49 @@ void uxr_flash_output_streams( } } +void uxr_flash_one_output_stream( + uxrSession* session, + const uxrStreamId stream_id) +{ + UXR_HANDLE_SHARED_MEMORY(); + + if (stream_id.direction == UXR_OUTPUT_STREAM) + { + if (stream_id.type == UXR_BEST_EFFORT_STREAM) + { + uxrOutputBestEffortStream* stream = &session->streams.output_best_effort[stream_id.index]; + + uint8_t* buffer; size_t length; uxrSeqNum seq_num; + + UXR_LOCK_STREAM_ID(session, stream_id); + + if (uxr_prepare_best_effort_buffer_to_send(stream, &buffer, &length, &seq_num)) + { + uxr_stamp_session_header(&session->info, stream_id.raw, seq_num, buffer); + send_message(session, buffer, length); + } + + UXR_UNLOCK_STREAM_ID(session, stream_id); + } + else if (stream_id.type == UXR_RELIABLE_STREAM) + { + uxrOutputReliableStream* stream = &session->streams.output_reliable[stream_id.index]; + + uint8_t* buffer; size_t length; uxrSeqNum seq_num; + + UXR_LOCK_STREAM_ID(session, stream_id); + + while (uxr_prepare_next_reliable_buffer_to_send(stream, &buffer, &length, &seq_num)) + { + uxr_stamp_session_header(&session->info, stream_id.raw, seq_num, buffer); + send_message(session, buffer, length); + } + + UXR_UNLOCK_STREAM_ID(session, stream_id); + } + } +} + //================================================================== // PRIVATE //================================================================== diff --git a/src/c/core/session/stream/stream_storage.c b/src/c/core/session/stream/stream_storage.c index 1d9bcdc3..5d44fbbb 100644 --- a/src/c/core/session/stream/stream_storage.c +++ b/src/c/core/session/stream/stream_storage.c @@ -148,3 +148,13 @@ bool uxr_output_streams_confirmed( } return up_to_date; } + +bool uxr_output_one_stream_confirmed( + const uxrOutputReliableStream* stream) +{ + bool up_to_date = true; + UXR_LOCK((uxrMutex*) &stream->mutex); + up_to_date = uxr_is_output_up_to_date(stream); + UXR_UNLOCK((uxrMutex*) &stream->mutex); + return up_to_date; +} diff --git a/src/c/core/session/stream/stream_storage_internal.h b/src/c/core/session/stream/stream_storage_internal.h index 647b1266..96e05ca9 100644 --- a/src/c/core/session/stream/stream_storage_internal.h +++ b/src/c/core/session/stream/stream_storage_internal.h @@ -69,6 +69,9 @@ uxrInputReliableStream* uxr_get_input_reliable_stream( bool uxr_output_streams_confirmed( const uxrStreamStorage* storage); +bool uxr_output_one_stream_confirmed( + const uxrOutputReliableStream* storage); + #ifdef __cplusplus } #endif // ifdef __cplusplus