/**
 * Copyright (C) Mellanox Technologies Ltd. 2018. ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

#include "sa_tcp.h"

/*
 * NOTE(review): the names of the system headers were missing from the text
 * under review. The list below is reconstructed from the symbols this file
 * uses (socket calls, fcntl, EPOLL* flags, errno, std::make_shared,
 * std::vector) and must be confirmed against the original include list.
 */
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <cerrno>
#include <memory>
#include <vector>


/**
 * Create a new TCP socket and pass its descriptor to file_desc.
 *
 * @throws sys_error if the socket cannot be created.
 */
tcp_socket::tcp_socket() : file_desc(create_socket()) {
}

/**
 * Wrap an already-open descriptor.
 *
 * @param fd  Descriptor passed on to file_desc.
 */
tcp_socket::tcp_socket(int fd) : file_desc(fd) {
}

tcp_socket::~tcp_socket() {
}

/**
 * Open an IPv4 stream socket using the TCP protocol.
 *
 * @return The new socket descriptor (never negative).
 * @throws sys_error if ::socket() fails.
 */
int tcp_socket::create_socket() {
    int fd = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (fd < 0) {
        throw sys_error("failed to create tcp socket", errno);
    }
    return fd;
}

/**
 * Start an outgoing connection to a remote address.
 *
 * m_socket is not named in the initializer list, so it is default-constructed
 * (presumably creating a fresh socket - confirm against sa_tcp.h).
 *
 * @param addr     Remote address to connect to.
 * @param addrlen  Size of the structure pointed to by addr.
 * @throws sys_error if the socket cannot be made non-blocking or if
 *         ::connect() fails with anything other than EINPROGRESS.
 */
tcp_connection::tcp_connection(const struct sockaddr *addr, socklen_t addrlen) :
    m_is_closed(false)
{
    /* Must run before connect(): the socket is switched to non-blocking mode
     * here, which is why EINPROGRESS is accepted below. */
    initialize();

    int ret = ::connect(m_socket, addr, addrlen);
    if ((ret < 0) && (errno != EINPROGRESS)) {
        throw sys_error("failed to connect tcp socket", errno);
    }
}

/**
 * Build a connection object around an existing socket descriptor.
 *
 * @param fd  Socket descriptor used to construct m_socket.
 * @throws sys_error if the socket cannot be made non-blocking.
 */
tcp_connection::tcp_connection(int fd) : m_socket(fd), m_is_closed(false) {
    initialize();
}

/**
 * Switch the socket to non-blocking mode and use its descriptor as the
 * connection id.
 *
 * @throws sys_error if the file status flags cannot be read or updated.
 */
void tcp_connection::initialize() {
    /* Read the flags on their own and check the result: a failed F_GETFL
     * returns -1, and OR-ing that with O_NONBLOCK would hand an all-ones
     * flag word to F_SETFL. */
    int flags = fcntl(m_socket, F_GETFL);
    if (flags < 0) {
        throw sys_error("failed to set tcp socket to nonblocking", errno);
    }

    int ret = fcntl(m_socket, F_SETFL, flags | O_NONBLOCK);
    if (ret < 0) {
        throw sys_error("failed to set tcp socket to nonblocking", errno);
    }

    set_id(m_socket);
}

/**
 * Register the connection socket for read, write and error events in
 * edge-triggered mode.
 *
 * @param evpoll  Event set to add the socket to.
 */
void tcp_connection::add_to_evpoll(evpoll_set& evpoll) {
    evpoll.add(m_socket, EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLET);
}

/**
 * Send as much of the buffer as the socket accepts right now.
 *
 * @param buffer  Data to send.
 * @param size    Number of bytes available in buffer.
 * @return Number of bytes actually sent; 0 if the call failed with EAGAIN.
 * @throws sys_error on any other ::send() failure.
 */
size_t tcp_connection::send(const char *buffer, size_t size) {
    ssize_t ret = ::send(m_socket, buffer, size, 0);
    if (ret < 0) {
        if (errno != EAGAIN) {
            throw sys_error("failed to send on tcp socket", errno);
        }
        return 0;
    }
    return static_cast<size_t>(ret);
}

/**
 * Receive whatever data is currently available on the socket.
 *
 * A return of 0 for a non-empty request means the peer closed the stream and
 * is recorded in m_is_closed.
 *
 * @param buffer  Destination for received data.
 * @param size    Capacity of buffer, in bytes.
 * @return Number of bytes received; 0 if the call failed with EAGAIN, if the
 *         peer closed the connection, or if size is 0.
 * @throws sys_error on any other ::recv() failure.
 */
size_t tcp_connection::recv(char *buffer, size_t size) {
    ssize_t ret = ::recv(m_socket, buffer, size, 0);
    if (ret < 0) {
        if (errno != EAGAIN) {
            throw sys_error("failed to receive from tcp socket", errno);
        }
        return 0;
    }

    /* A zero-length request also yields 0 and says nothing about the peer, so
     * only a non-empty request may mark the connection as closed. */
    if ((ret == 0) && (size > 0)) {
        m_is_closed = true;
    }
    return static_cast<size_t>(ret);
}

/**
 * @return true once recv() has observed the peer closing the connection.
 */
bool tcp_connection::is_closed() const {
    return m_is_closed;
}

/**
 * Bind the server socket to the given address and start listening on it.
 *
 * @param listen_addr  Local address to bind to.
 * @param addrlen      Size of the structure pointed to by listen_addr.
 * @throws sys_error if ::bind() or ::listen() fails.
 */
tcp_worker::tcp_worker(const struct sockaddr *listen_addr, socklen_t addrlen) {
    /* Backlog value passed to ::listen() */
    const int listen_backlog = 1024;

    int retb = ::bind(m_server_socket, listen_addr, addrlen);
    if (retb != 0) {
        throw sys_error("failed to bind tcp socket", errno);
    }

    int retl = ::listen(m_server_socket, listen_backlog);
    if (retl != 0) {
        throw sys_error("failed to listen on tcp socket", errno);
    }
}

/**
 * Register the listening socket for read and error events.
 *
 * @param evpoll  Event set to add the socket to.
 */
void tcp_worker::add_to_evpoll(evpoll_set& evpoll) {
    evpoll.add(m_server_socket, EPOLLIN | EPOLLERR);
}

/**
 * Wait for events and dispatch them.
 *
 * An event on the listening socket accepts one connection and passes it to
 * conn_handler. Any other event is forwarded to data_handler with its
 * descriptor and event flags.
 *
 * @param evpoll        Event set to wait on.
 * @param conn_handler  Called with each newly accepted connection.
 * @param data_handler  Called with (fd, event flags) for all other events.
 * @param timeout_ms    Timeout forwarded to evpoll.wait().
 * @throws sys_error if ::accept() fails with a non-transient error, or if the
 *         accepted socket cannot be made non-blocking.
 */
void tcp_worker::wait(const evpoll_set& evpoll, conn_handler_t conn_handler,
                      data_handler_t data_handler, int timeout_ms) {
    /* NOTE(review): the vector's element type was missing from the text under
     * review; evpoll_set::event is assumed - confirm against the declaration
     * of evpoll_set::wait(). */
    std::vector<evpoll_set::event> events;
    evpoll.wait(events, timeout_ms);

    for (const auto& ev : events) {
        if (ev.fd == m_server_socket) {
            int fd = ::accept(m_server_socket, nullptr, nullptr);
            if (fd < 0) {
                /* The pending connection may be gone by the time accept()
                 * runs, or the call may be interrupted. These are not worker
                 * failures, so skip the event. */
                if ((errno == EINTR) || (errno == EAGAIN) ||
                    (errno == ECONNABORTED)) {
                    continue;
                }
                throw sys_error("failed to accept", errno);
            }

            auto conn = std::make_shared<tcp_connection>(fd);
            conn_handler(conn);
        } else {
            data_handler(ev.fd, ev.ev_flags);
        }
    }
}

/**
 * Create a new outgoing connection.
 *
 * NOTE(review): the template argument of the return type was missing from the
 * text under review; std::shared_ptr<connection> is assumed - confirm against
 * the declaration in sa_tcp.h.
 *
 * @param addr     Remote address to connect to.
 * @param addrlen  Size of the structure pointed to by addr.
 * @return Shared pointer to the new tcp_connection.
 * @throws sys_error if the connection cannot be set up.
 */
std::shared_ptr<connection> tcp_worker::connect(const struct sockaddr *addr,
                                                socklen_t addrlen) {
    return std::make_shared<tcp_connection>(addr, addrlen);
}