Skip to content

Commit

Permalink
Merge pull request ClickHouse#65182 from ClickHouse/s3-streams-scheduler
Browse files Browse the repository at this point in the history
IO scheduling on HTTP session level
  • Loading branch information
serxa authored Sep 2, 2024
2 parents af9291e + c6aa12f commit 1f5082e
Show file tree
Hide file tree
Showing 27 changed files with 573 additions and 168 deletions.
43 changes: 43 additions & 0 deletions base/poco/Net/include/Poco/Net/HTTPSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@


#include <ios>
#include <memory>
#include <functional>
#include "Poco/Any.h"
#include "Poco/Buffer.h"
#include "Poco/Exception.h"
Expand All @@ -33,6 +35,27 @@ namespace Net
{


class IHTTPSessionDataHooks
/// Interface to control stream of data bytes being sent or received though socket by HTTPSession
/// It allows to monitor, throttle and schedule data streams with syscall granulatrity
{
public:
virtual ~IHTTPSessionDataHooks() = default;

virtual void atStart(int bytes) = 0;
/// Called before sending/receiving data `bytes` to/from socket.

virtual void atFinish(int bytes) = 0;
/// Called when sending/receiving of data `bytes` is successfully finished.

virtual void atFail() = 0;
/// If an error occurred during send/receive `fail()` is called instead of `finish()`.
};


using HTTPSessionDataHooksPtr = std::shared_ptr<IHTTPSessionDataHooks>;


class Net_API HTTPSession
/// HTTPSession implements basic HTTP session management
/// for both HTTP clients and HTTP servers.
Expand Down Expand Up @@ -73,6 +96,12 @@ namespace Net
Poco::Timespan getReceiveTimeout() const;
/// Returns receive timeout for the HTTP session.

void setSendDataHooks(const HTTPSessionDataHooksPtr & sendDataHooks = {});
/// Sets data hooks that will be called on every sent to the socket.

void setReceiveDataHooks(const HTTPSessionDataHooksPtr & receiveDataHooks = {});
/// Sets data hooks that will be called on every receive from the socket.

bool connected() const;
/// Returns true if the underlying socket is connected.

Expand Down Expand Up @@ -211,6 +240,10 @@ namespace Net
Poco::Exception * _pException;
Poco::Any _data;

// Data hooks
HTTPSessionDataHooksPtr _sendDataHooks;
HTTPSessionDataHooksPtr _receiveDataHooks;

friend class HTTPStreamBuf;
friend class HTTPHeaderStreamBuf;
friend class HTTPFixedLengthStreamBuf;
Expand Down Expand Up @@ -246,6 +279,16 @@ namespace Net
return _receiveTimeout;
}

inline void HTTPSession::setSendDataHooks(const HTTPSessionDataHooksPtr & sendDataHooks)
{
_sendDataHooks = sendDataHooks;
}

inline void HTTPSession::setReceiveDataHooks(const HTTPSessionDataHooksPtr & receiveDataHooks)
{
_receiveDataHooks = receiveDataHooks;
}

inline StreamSocket & HTTPSession::socket()
{
return _socket;
Expand Down
24 changes: 19 additions & 5 deletions base/poco/Net/src/HTTPSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,14 @@ int HTTPSession::get()
{
if (_pCurrent == _pEnd)
refill();

if (_pCurrent < _pEnd)
return *_pCurrent++;
else
return std::char_traits<char>::eof();
}


int HTTPSession::peek()
{
if (_pCurrent == _pEnd)
Expand All @@ -147,7 +147,7 @@ int HTTPSession::peek()
return std::char_traits<char>::eof();
}


int HTTPSession::read(char* buffer, std::streamsize length)
{
if (_pCurrent < _pEnd)
Expand All @@ -166,10 +166,17 @@ int HTTPSession::write(const char* buffer, std::streamsize length)
{
try
{
return _socket.sendBytes(buffer, (int) length);
if (_sendDataHooks)
_sendDataHooks->atStart((int) length);
int result = _socket.sendBytes(buffer, (int) length);
if (_sendDataHooks)
_sendDataHooks->atFinish(result);
return result;
}
catch (Poco::Exception& exc)
{
if (_sendDataHooks)
_sendDataHooks->atFail();
setException(exc);
throw;
}
Expand All @@ -180,10 +187,17 @@ int HTTPSession::receive(char* buffer, int length)
{
try
{
return _socket.receiveBytes(buffer, length);
if (_receiveDataHooks)
_receiveDataHooks->atStart(length);
int result = _socket.receiveBytes(buffer, length);
if (_receiveDataHooks)
_receiveDataHooks->atFinish(result);
return result;
}
catch (Poco::Exception& exc)
{
if (_receiveDataHooks)
_receiveDataHooks->atFail();
setException(exc);
throw;
}
Expand Down
36 changes: 18 additions & 18 deletions base/poco/Net/src/SocketImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ bool checkIsBrokenTimeout()

SocketImpl::SocketImpl():
_sockfd(POCO_INVALID_SOCKET),
_blocking(true),
_blocking(true),
_isBrokenTimeout(checkIsBrokenTimeout())
{
}
Expand All @@ -82,7 +82,7 @@ SocketImpl::~SocketImpl()
close();
}


SocketImpl* SocketImpl::acceptConnection(SocketAddress& clientAddr)
{
if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
Expand Down Expand Up @@ -118,7 +118,7 @@ void SocketImpl::connect(const SocketAddress& address)
rc = ::connect(_sockfd, address.addr(), address.length());
}
while (rc != 0 && lastError() == POCO_EINTR);
if (rc != 0)
if (rc != 0)
{
int err = lastError();
error(err, address.toString());
Expand Down Expand Up @@ -205,7 +205,7 @@ void SocketImpl::bind6(const SocketAddress& address, bool reuseAddress, bool reu
#if defined(POCO_HAVE_IPv6)
if (address.family() != SocketAddress::IPv6)
throw Poco::InvalidArgumentException("SocketAddress must be an IPv6 address");

if (_sockfd == POCO_INVALID_SOCKET)
{
init(address.af());
Expand All @@ -226,11 +226,11 @@ void SocketImpl::bind6(const SocketAddress& address, bool reuseAddress, bool reu
#endif
}


void SocketImpl::listen(int backlog)
{
if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();

int rc = ::listen(_sockfd, backlog);
if (rc != 0) error();
}
Expand All @@ -254,7 +254,7 @@ void SocketImpl::shutdownReceive()
if (rc != 0) error();
}


void SocketImpl::shutdownSend()
{
if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
Expand All @@ -263,7 +263,7 @@ void SocketImpl::shutdownSend()
if (rc != 0) error();
}


void SocketImpl::shutdown()
{
if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
Expand Down Expand Up @@ -318,15 +318,15 @@ int SocketImpl::receiveBytes(void* buffer, int length, int flags)
throw TimeoutException();
}
}

int rc;
do
{
if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
rc = ::recv(_sockfd, reinterpret_cast<char*>(buffer), length, flags);
}
while (blocking && rc < 0 && lastError() == POCO_EINTR);
if (rc < 0)
if (rc < 0)
{
int err = lastError();
if ((err == POCO_EAGAIN || err == POCO_EWOULDBLOCK) && !blocking)
Expand Down Expand Up @@ -364,7 +364,7 @@ int SocketImpl::receiveFrom(void* buffer, int length, SocketAddress& address, in
throw TimeoutException();
}
}

sockaddr_storage abuffer;
struct sockaddr* pSA = reinterpret_cast<struct sockaddr*>(&abuffer);
poco_socklen_t saLen = sizeof(abuffer);
Expand Down Expand Up @@ -451,7 +451,7 @@ bool SocketImpl::pollImpl(Poco::Timespan& remainingTime, int mode)
}
while (rc < 0 && lastError() == POCO_EINTR);
if (rc < 0) error();
return rc > 0;
return rc > 0;

#else

Expand Down Expand Up @@ -494,7 +494,7 @@ bool SocketImpl::pollImpl(Poco::Timespan& remainingTime, int mode)
}
while (rc < 0 && errorCode == POCO_EINTR);
if (rc < 0) error(errorCode);
return rc > 0;
return rc > 0;

#endif // POCO_HAVE_FD_POLL
}
Expand All @@ -504,13 +504,13 @@ bool SocketImpl::poll(const Poco::Timespan& timeout, int mode)
Poco::Timespan remainingTime(timeout);
return pollImpl(remainingTime, mode);
}

void SocketImpl::setSendBufferSize(int size)
{
setOption(SOL_SOCKET, SO_SNDBUF, size);
}


int SocketImpl::getSendBufferSize()
{
int result;
Expand All @@ -524,7 +524,7 @@ void SocketImpl::setReceiveBufferSize(int size)
setOption(SOL_SOCKET, SO_RCVBUF, size);
}


int SocketImpl::getReceiveBufferSize()
{
int result;
Expand Down Expand Up @@ -570,7 +570,7 @@ Poco::Timespan SocketImpl::getReceiveTimeout()
return result;
}


SocketAddress SocketImpl::address()
{
if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
Expand All @@ -581,7 +581,7 @@ SocketAddress SocketImpl::address()
int rc = ::getsockname(_sockfd, pSA, &saLen);
if (rc == 0)
return SocketAddress(pSA, saLen);
else
else
error();
return SocketAddress();
}
Expand Down
3 changes: 3 additions & 0 deletions src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@
M(DistrCacheWriteRequests, "Number of executed Write requests to Distributed Cache") \
M(DistrCacheServerConnections, "Number of open connections to ClickHouse server from Distributed Cache") \
\
M(SchedulerIOReadScheduled, "Number of IO reads are being scheduled currently") \
M(SchedulerIOWriteScheduled, "Number of IO writes are being scheduled currently") \
\
M(StorageConnectionsStored, "Total count of sessions stored in the session pool for storages") \
M(StorageConnectionsTotal, "Total count of all sessions: stored in the pool and actively used right now for storages") \
\
Expand Down
50 changes: 50 additions & 0 deletions src/Common/CurrentThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,56 @@ std::string_view CurrentThread::getQueryId()
return current_thread->getQueryId();
}

void CurrentThread::attachReadResource(ResourceLink link)
{
if (unlikely(!current_thread))
return;
if (current_thread->read_resource_link)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has been already attached to read resource", std::to_string(getThreadId()));
current_thread->read_resource_link = link;
}

void CurrentThread::detachReadResource()
{
if (unlikely(!current_thread))
return;
if (!current_thread->read_resource_link)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has not been attached to read resource", std::to_string(getThreadId()));
current_thread->read_resource_link.reset();
}

ResourceLink CurrentThread::getReadResourceLink()
{
if (unlikely(!current_thread))
return {};
return current_thread->read_resource_link;
}

void CurrentThread::attachWriteResource(ResourceLink link)
{
if (unlikely(!current_thread))
return;
if (current_thread->write_resource_link)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has been already attached to write resource", std::to_string(getThreadId()));
current_thread->write_resource_link = link;
}

void CurrentThread::detachWriteResource()
{
if (unlikely(!current_thread))
return;
if (!current_thread->write_resource_link)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has not been attached to write resource", std::to_string(getThreadId()));
current_thread->write_resource_link.reset();
}

ResourceLink CurrentThread::getWriteResourceLink()
{
if (unlikely(!current_thread))
return {};
return current_thread->write_resource_link;
}

MemoryTracker * CurrentThread::getUserMemoryTracker()
{
if (unlikely(!current_thread))
Expand Down
Loading

0 comments on commit 1f5082e

Please sign in to comment.