57 return connect(
m_socket, addr, addr_len);
60 int Sock::Bind(
const sockaddr* addr, socklen_t addr_len)
const 62 return bind(
m_socket, addr, addr_len);
70 std::unique_ptr<Sock>
Sock::Accept(sockaddr* addr, socklen_t* addr_len)
const 78 std::unique_ptr<Sock> sock;
80 const auto socket = accept(
m_socket, addr, addr_len);
83 sock = std::make_unique<Sock>(socket);
84 }
catch (
const std::exception&) {
98 return getsockopt(
m_socket, level, opt_name, static_cast<char*>(opt_val), opt_len);
101 int Sock::SetSockOpt(
int level,
int opt_name,
const void* opt_val, socklen_t opt_len)
const 103 return setsockopt(
m_socket, level, opt_name, static_cast<const char*>(opt_val), opt_len);
132 #if defined(USE_POLL) || defined(WIN32) 144 std::shared_ptr<const Sock> shared{
this, [](
const Sock*) {}};
148 if (!
WaitMany(timeout, events_per_sock)) {
152 if (occurred !=
nullptr) {
153 *occurred = events_per_sock.begin()->second.occurred;
162 std::vector<pollfd> pfds;
163 for (
const auto& [sock, events] : events_per_sock) {
165 auto& pfd = pfds.back();
166 pfd.fd = sock->m_socket;
167 if (events.requested &
RECV) {
168 pfd.events |= POLLIN;
170 if (events.requested &
SEND) {
171 pfd.events |= POLLOUT;
179 assert(pfds.size() == events_per_sock.size());
181 for (
auto& [sock, events] : events_per_sock) {
182 assert(sock->m_socket == static_cast<SOCKET>(pfds[i].fd));
184 if (pfds[i].revents & POLLIN) {
185 events.occurred |=
RECV;
187 if (pfds[i].revents & POLLOUT) {
188 events.occurred |=
SEND;
190 if (pfds[i].revents & (POLLERR | POLLHUP)) {
191 events.occurred |=
ERR;
206 for (
const auto& [sock, events] : events_per_sock) {
207 if (!sock->IsSelectable()) {
210 const auto& s = sock->m_socket;
211 if (events.requested &
RECV) {
214 if (events.requested &
SEND) {
218 socket_max = std::max(socket_max, s);
227 for (
auto& [sock, events] : events_per_sock) {
228 const auto& s = sock->m_socket;
230 if (FD_ISSET(s, &recv)) {
231 events.occurred |=
RECV;
233 if (FD_ISSET(s, &
send)) {
234 events.occurred |=
SEND;
236 if (FD_ISSET(s, &err)) {
237 events.occurred |=
ERR;
246 std::chrono::milliseconds timeout,
249 const auto deadline = GetTime<std::chrono::milliseconds>() + timeout;
256 sent +=
static_cast<size_t>(
ret);
257 if (sent == data.
size()) {
267 const auto now = GetTime<std::chrono::milliseconds>();
269 if (now >= deadline) {
271 "Send timeout (sent only %u of %u bytes before that)", sent, data.
size()));
276 "Send interrupted (sent only %u of %u bytes before that)", sent, data.
size()));
281 const auto wait_time = std::min(deadline - now, std::chrono::milliseconds{
MAX_WAIT_FOR_IO});
287 std::chrono::milliseconds timeout,
294 std::chrono::milliseconds timeout,
296 size_t max_data)
const 298 const auto deadline = GetTime<std::chrono::milliseconds>() + timeout;
300 bool terminator_found{
false};
311 if (data.size() >= max_data) {
312 throw std::runtime_error(
313 strprintf(
"Received too many bytes without a terminator (%u)", data.size()));
318 const ssize_t peek_ret{
Recv(buf, std::min(
sizeof(buf), max_data - data.size()), MSG_PEEK)};
329 throw std::runtime_error(
"Connection unexpectedly closed by peer");
331 auto end = buf + peek_ret;
332 auto terminator_pos = std::find(buf, end, terminator);
333 terminator_found = terminator_pos != end;
335 const size_t try_len{terminator_found ? terminator_pos - buf + 1 :
336 static_cast<size_t>(peek_ret)};
338 const ssize_t read_ret{
Recv(buf, try_len, 0)};
340 if (read_ret < 0 || static_cast<size_t>(read_ret) != try_len) {
341 throw std::runtime_error(
342 strprintf(
"recv() returned %u bytes on attempt to read %u bytes but previous " 343 "peek claimed %u bytes are available",
344 read_ret, try_len, peek_ret));
348 const size_t append_len{terminator_found ? try_len - 1 : try_len};
350 data.append(buf, buf + append_len);
352 if (terminator_found) {
357 const auto now = GetTime<std::chrono::milliseconds>();
359 if (now >= deadline) {
361 "Receive timeout (received %u bytes without terminator before that)", data.size()));
366 "Receive interrupted (received %u bytes without terminator before that)",
371 const auto wait_time = std::min(deadline - now, std::chrono::milliseconds{
MAX_WAIT_FOR_IO});
379 errmsg =
"not connected";
384 switch (
Recv(&c,
sizeof(c), MSG_PEEK)) {
425 return Win32ErrorString(err);
virtual bool SetNonBlocking() const
Set the non-blocking option on the socket.
virtual void SendComplete(Span< const unsigned char > data, std::chrono::milliseconds timeout, CThreadInterrupt &interrupt) const
Send the given data, retrying on transient errors.
virtual int Bind(const sockaddr *addr, socklen_t addr_len) const
bind(2) wrapper.
virtual bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const
Same as Wait(), but wait on many sockets within the same timeout.
constexpr std::size_t size() const noexcept
SOCKET m_socket
Contained socket.
virtual std::string RecvUntilTerminator(uint8_t terminator, std::chrono::milliseconds timeout, CThreadInterrupt &interrupt, size_t max_data) const
Read from socket until a terminator character is encountered.
virtual ssize_t Recv(void *buf, size_t len, int flags) const
recv(2) wrapper.
#define WSAGetLastError()
void Close()
Close m_socket if it is not INVALID_SOCKET.
virtual bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const
Wait for readiness for input (recv) or output (send).
static constexpr Event SEND
If passed to Wait(), then it will wait for readiness to send to the socket.
virtual int GetSockOpt(int level, int opt_name, void *opt_val, socklen_t *opt_len) const
getsockopt(2) wrapper.
std::string SysErrorString(int err)
Return system error string from errno value.
virtual std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const
accept(2) wrapper.
std::unordered_map< std::shared_ptr< const Sock >, Events, HashSharedPtrSock, EqualSharedPtrSock > EventsPerSock
On which socket to wait for what events in WaitMany().
constexpr int64_t count_milliseconds(std::chrono::milliseconds t)
virtual ssize_t Send(const void *data, size_t len, int flags) const
send(2) wrapper.
virtual int SetSockOpt(int level, int opt_name, const void *opt_val, socklen_t opt_len) const
setsockopt(2) wrapper.
A helper class for interruptible sleeps.
std::string NetworkErrorString(int err)
Return readable error string for a network error code.
static constexpr Event ERR
Ignored if passed to Wait(), but could be set in the occurred events if an exceptional condition has ...
bool operator==(SOCKET s) const
Check if the internal socket is equal to s.
virtual bool IsSelectable() const
Check if the underlying socket can be used for select(2) (or the Wait() method).
static constexpr Event RECV
If passed to Wait(), then it will wait for readiness to read from the socket.
static bool IOErrorIsPermanent(int err)
virtual int Listen(int backlog) const
listen(2) wrapper.
constexpr C * data() const noexcept
constexpr auto MakeUCharSpan(V &&v) -> decltype(UCharSpanCast(Span
Like the Span constructor, but for (const) unsigned char member types only.
struct timeval MillisToTimeval(int64_t nTimeout)
Convert milliseconds to a struct timeval for e.g.
RAII helper class that manages a socket and closes it automatically when it goes out of scope...
virtual ~Sock()
Destructor, close the socket or do nothing if empty.
virtual bool IsConnected(std::string &errmsg) const
Check if still connected.
virtual int GetSockName(sockaddr *name, socklen_t *name_len) const
getsockname(2) wrapper.
Auxiliary requested/occurred events to wait for in WaitMany().
Sock & operator=(const Sock &)=delete
Copy assignment operator, disabled because closing the same socket twice is undesirable.
virtual int Connect(const sockaddr *addr, socklen_t addr_len) const
connect(2) wrapper.
static constexpr auto MAX_WAIT_FOR_IO
Maximum time to wait for I/O readiness.