Skip to content

Commit

Permalink
Merge pull request #167 from dkl/fix-op-leak
Browse files Browse the repository at this point in the history
Fix leak of remaining pending operations during io_service destruction
  • Loading branch information
aboseley authored Dec 5, 2021
2 parents 27740d7 + 15a4f45 commit e0058a3
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 170 deletions.
25 changes: 4 additions & 21 deletions azmq/detail/reactor_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,45 +14,28 @@

#include <boost/optional.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/intrusive/list.hpp>

namespace azmq {
namespace detail {
class reactor_op {
public:
using socket_type = socket_ops::socket_type;
using flags_type = socket_ops::flags_type;
boost::intrusive::list_member_hook<> member_hook_;
boost::system::error_code ec_;
size_t bytes_transferred_;
size_t bytes_transferred_ = 0;

bool do_perform(socket_type & socket) { return perform_func_(this, socket); }
static void do_complete(reactor_op * op) {
op->complete_func_(op, op->ec_, op->bytes_transferred_);
}
virtual ~reactor_op() = default;
virtual bool do_perform(socket_type& socket) = 0;
virtual void do_complete() = 0;

static boost::system::error_code canceled() { return boost::asio::error::operation_aborted; }

protected:
typedef bool (*perform_func_type)(reactor_op*, socket_type &);
typedef void (*complete_func_type)(reactor_op* op, boost::system::error_code const&, size_t);

perform_func_type perform_func_;
complete_func_type complete_func_;

bool try_again() const {
return ec_.value() == boost::system::errc::resource_unavailable_try_again;
}

bool is_canceled() const { return ec_ == canceled(); }

reactor_op(perform_func_type perform_func,
complete_func_type complete_func)
: bytes_transferred_(0)
, perform_func_(perform_func)
, complete_func_(complete_func)
{ }

};

} // namespace detail
Expand Down
80 changes: 23 additions & 57 deletions azmq/detail/receive_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,16 @@ namespace detail {
template<typename MutableBufferSequence>
class receive_buffer_op_base : public reactor_op {
public:
receive_buffer_op_base(MutableBufferSequence const& buffers,
flags_type flags,
complete_func_type complete_func)
: reactor_op(&receive_buffer_op_base::do_perform, complete_func)
, buffers_(buffers)
receive_buffer_op_base(MutableBufferSequence const& buffers, flags_type flags)
: buffers_(buffers)
, flags_(flags)
{ }

static bool do_perform(reactor_op* base, socket_type & socket) {
auto o = static_cast<receive_buffer_op_base*>(base);
o->ec_ = boost::system::error_code();

o->bytes_transferred_ += socket_ops::receive(o->buffers_, socket, o->flags_ | ZMQ_DONTWAIT, o->ec_);
if (o->ec_)
return !o->try_again();
virtual bool do_perform(socket_type& socket) override {
ec_ = boost::system::error_code();
bytes_transferred_ += socket_ops::receive(buffers_, socket, flags_ | ZMQ_DONTWAIT, ec_);
if (ec_)
return !try_again();
return true;
}

Expand All @@ -60,20 +55,12 @@ class receive_buffer_op : public receive_buffer_op_base<MutableBufferSequence> {
receive_buffer_op(MutableBufferSequence const& buffers,
Handler handler,
socket_ops::flags_type flags)
: receive_buffer_op_base<MutableBufferSequence>(buffers, flags,
&receive_buffer_op::do_complete)
: receive_buffer_op_base<MutableBufferSequence>(buffers, flags)
, handler_(std::move(handler))
{ }

static void do_complete(reactor_op* base,
const boost::system::error_code &,
size_t) {
auto o = static_cast<receive_buffer_op*>(base);
auto h = std::move(o->handler_);
auto ec = o->ec_;
auto bt = o->bytes_transferred_;
delete o;
h(ec, bt);
virtual void do_complete() override {
handler_(this->ec_, this->bytes_transferred_);
}

private:
Expand All @@ -87,21 +74,12 @@ class receive_more_buffer_op : public receive_buffer_op_base<MutableBufferSequen
receive_more_buffer_op(MutableBufferSequence const& buffers,
Handler handler,
socket_ops::flags_type flags)
: receive_buffer_op_base<MutableBufferSequence>(buffers, flags,
&receive_more_buffer_op::do_complete)
: receive_buffer_op_base<MutableBufferSequence>(buffers, flags)
, handler_(std::move(handler))
{ }

static void do_complete(reactor_op* base,
const boost::system::error_code &,
size_t) {
auto o = static_cast<receive_more_buffer_op*>(base);
auto h = std::move(o->handler_);
auto ec = o->ec_;
auto bt = o->bytes_transferred_;
auto m = o->more();
delete o;
h(ec, std::make_pair(bt, m));
virtual void do_complete() override {
handler_(this->ec_, std::make_pair(this->bytes_transferred_, this->more()));
}

private:
Expand All @@ -110,19 +88,15 @@ class receive_more_buffer_op : public receive_buffer_op_base<MutableBufferSequen

class receive_op_base : public reactor_op {
public:
receive_op_base(socket_ops::flags_type flags,
complete_func_type complete_func)
: reactor_op(&receive_op_base::do_perform, complete_func)
, flags_(flags)
receive_op_base(socket_ops::flags_type flags)
: flags_(flags)
{ }

static bool do_perform(reactor_op* base, socket_type & socket) {
auto o = static_cast<receive_op_base*>(base);
o->ec_ = boost::system::error_code();

o->bytes_transferred_ = socket_ops::receive(o->msg_, socket, o->flags_ | ZMQ_DONTWAIT, o->ec_);
if (o->ec_)
return !o->try_again();
virtual bool do_perform(socket_type& socket) override {
ec_ = boost::system::error_code();
bytes_transferred_ = socket_ops::receive(msg_, socket, flags_ | ZMQ_DONTWAIT, ec_);
if (ec_)
return !try_again();
return true;
}

Expand All @@ -136,20 +110,12 @@ class receive_op : public receive_op_base {
public:
receive_op(Handler handler,
socket_ops::flags_type flags)
: receive_op_base(flags, &receive_op::do_complete)
: receive_op_base(flags)
, handler_(std::move(handler))
{ }

static void do_complete(reactor_op* base,
const boost::system::error_code &,
size_t) {
auto o = static_cast<receive_op*>(base);
auto h = std::move(o->handler_);
auto m = std::move(o->msg_);
auto ec = o->ec_;
auto bt = o->bytes_transferred_;
delete o;
h(ec, m, bt);
virtual void do_complete() override {
handler_(ec_, msg_, bytes_transferred_);
}

private:
Expand Down
65 changes: 20 additions & 45 deletions azmq/detail/send_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,16 @@ namespace detail {
template<typename ConstBufferSequence>
class send_buffer_op_base : public reactor_op {
public:
send_buffer_op_base(ConstBufferSequence const& buffers,
flags_type flags,
complete_func_type complete_func)
: reactor_op(&send_buffer_op_base::do_perform, complete_func)
, buffers_(buffers)
send_buffer_op_base(ConstBufferSequence const& buffers, flags_type flags)
: buffers_(buffers)
, flags_(flags)
{ }

static bool do_perform(reactor_op* base, socket_type & socket) {
auto o = static_cast<send_buffer_op_base*>(base);
o->ec_ = boost::system::error_code();
o->bytes_transferred_ += socket_ops::send(o->buffers_, socket, o->flags_ | ZMQ_DONTWAIT, o->ec_);
if (o->ec_) {
return !o->try_again();
virtual bool do_perform(socket_type& socket) override {
ec_ = boost::system::error_code();
bytes_transferred_ += socket_ops::send(buffers_, socket, flags_ | ZMQ_DONTWAIT, ec_);
if (ec_) {
return !try_again();
}
return true;
}
Expand All @@ -54,21 +50,12 @@ class send_buffer_op : public send_buffer_op_base<ConstBufferSequence> {
send_buffer_op(ConstBufferSequence const& buffers,
Handler handler,
reactor_op::flags_type flags)
: send_buffer_op_base<ConstBufferSequence>(buffers, flags,
&send_buffer_op::do_complete)
: send_buffer_op_base<ConstBufferSequence>(buffers, flags)
, handler_(std::move(handler))
{ }

static void do_complete(reactor_op* base,
const boost::system::error_code &,
size_t) {
auto o = static_cast<send_buffer_op*>(base);
auto h = std::move(o->handler_);
auto ec = o->ec_;
auto bt = o->bytes_transferred_;
delete o;

h(ec, bt);
virtual void do_complete() override {
handler_(this->ec_, this->bytes_transferred_);
}

private:
Expand All @@ -77,21 +64,16 @@ class send_buffer_op : public send_buffer_op_base<ConstBufferSequence> {

class send_op_base : public reactor_op {
public:
send_op_base(message msg,
flags_type flags,
complete_func_type complete_func)
: reactor_op(&send_op_base::do_perform, complete_func)
, msg_(std::move(msg))
send_op_base(message msg, flags_type flags)
: msg_(std::move(msg))
, flags_(flags)
{ }

static bool do_perform(reactor_op* base, socket_type & socket) {
auto o = static_cast<send_op_base*>(base);
o->ec_ = boost::system::error_code();
o->bytes_transferred_ = socket_ops::send(o->msg_, socket, o->flags_ | ZMQ_DONTWAIT, o->ec_);

if (o->ec_)
return !o->try_again(); // some other error
virtual bool do_perform(socket_type & socket) override {
ec_ = boost::system::error_code();
bytes_transferred_ = socket_ops::send(msg_, socket, flags_ | ZMQ_DONTWAIT, ec_);
if (ec_)
return !try_again(); // some other error
return true;
};

Expand All @@ -106,19 +88,12 @@ class send_op : public send_op_base {
send_op(message msg,
Handler handler,
flags_type flags)
: send_op_base(std::move(msg), flags, &send_op::do_complete)
: send_op_base(std::move(msg), flags)
, handler_(std::move(handler))
{ }

static void do_complete(reactor_op* base,
const boost::system::error_code &,
size_t) {
auto o = static_cast<send_op*>(base);
auto h = std::move(o->handler_);
auto ec = o->ec_;
auto bt = o->bytes_transferred_;
delete o;
h(ec, bt);
virtual void do_complete() override {
handler_(ec_, bytes_transferred_);
}

private:
Expand Down
Loading

0 comments on commit e0058a3

Please sign in to comment.