/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* * Copyright 2013-2020 Couchbase, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "plugin-internal.h" #include static my_uvreq_t *alloc_uvreq(my_sockdata_t *sock, generic_callback_t callback); static void set_last_error(my_iops_t *io, int error); static void socket_closed_callback(uv_handle_t *handle); static void wire_iops2(int version, lcb_loop_procs *loop, lcb_timer_procs *timer, lcb_bsd_procs *bsd, lcb_ev_procs *ev, lcb_completion_procs *iocp, lcb_iomodel_t *model); static void decref_iops(my_iops_t *io) { lcb_assert(io->iops_refcount); if (--io->iops_refcount) { return; } memset(io, 0xff, sizeof(*io)); free(io); } static void iops_lcb_dtor(lcb_io_opt_t iobase) { my_iops_t *io = (my_iops_t *)iobase; if (io->startstop_noop) { decref_iops(io); return; } while (io->iops_refcount > 1) { UVC_RUN_ONCE(io->loop); } if (io->external_loop == 0) { uv_loop_delete(io->loop); } decref_iops(io); } /****************************************************************************** ****************************************************************************** ** Event Loop Functions ** ****************************************************************************** ******************************************************************************/ #if UV_VERSION < 0x000900 static void do_run_loop(my_iops_t *io) { while (uv_run_once(io->loop) && io->do_stop == 0) { /* nothing */ } io->do_stop = 0; } static void do_stop_loop(my_iops_t *io) { io->do_stop = 1; } #else static void do_run_loop(my_iops_t *io) { uv_run(io->loop, UV_RUN_DEFAULT); } static void do_stop_loop(my_iops_t *io) { uv_stop(io->loop); } #endif static void run_event_loop(lcb_io_opt_t iobase) { my_iops_t *io = (my_iops_t *)iobase; if (!io->startstop_noop) { do_run_loop(io); } } static void tick_event_loop(lcb_io_opt_t iobase) { my_iops_t *io = (my_iops_t *)iobase; if (!io->startstop_noop) { #if UV_VERSION < 0x000900 uv_run_once(io->loop); io->do_stop = 0; #else uv_run(io->loop, UV_RUN_NOWAIT); #endif } } static void stop_event_loop(lcb_io_opt_t iobase) { my_iops_t *io = (my_iops_t *)iobase; if (!io->startstop_noop) { do_stop_loop(io); } } LCBUV_API lcb_STATUS lcb_create_libuv_io_opts(int version, lcb_io_opt_t *io, lcbuv_options_t *options) { lcb_io_opt_t iop; uv_loop_t *loop = NULL; my_iops_t *ret; if (version != 0) { return LCB_ERR_PLUGIN_VERSION_MISMATCH; } #ifdef _WIN32 { /** UV unloading on Windows doesn't work well */ HMODULE module; /* We need to provide a symbol */ static int dummy; BOOL result; result = GetModuleHandleEx(GET_MODULE_HANDLE_EX_FLAG_FROM_ADDRESS | GET_MODULE_HANDLE_EX_FLAG_PIN, (LPCSTR)&dummy, &module); if (!result) { return LCB_ERR_SDK_INTERNAL; } } #endif ret = (my_iops_t *)calloc(1, sizeof(*ret)); if (!ret) { return LCB_ERR_NO_MEMORY; } iop = &ret->base; iop->version = 2; iop->destructor = iops_lcb_dtor; iop->v.v2.get_procs = wire_iops2; ret->iops_refcount = 1; *io = iop; if (options) { if (options->v.v0.loop) { ret->external_loop = 1; loop = options->v.v0.loop; } ret->startstop_noop = options->v.v0.startsop_noop; } if (!loop) { loop = uv_loop_new(); } ret->loop = loop; return LCB_SUCCESS; } #define SOCK_INCR_PENDING(s, fld) (s)->pending.fld++ #define SOCK_DECR_PENDING(s, fld) (s)->pending.fld-- #ifdef DEBUG static void sock_dump_pending(my_sockdata_t *sock) { printf("Socket %p:\n", (void *)sock); printf("\tRead: %d\n", sock->pending.read); printf("\tWrite: %d\n", sock->pending.write); } #endif static void sock_do_uv_close(my_sockdata_t *sock) { if (!sock->uv_close_called) { sock->uv_close_called = 1; uv_close((uv_handle_t *)&sock->tcp, socket_closed_callback); } } static void decref_sock(my_sockdata_t *sock) { lcb_assert(sock->refcount); if (--sock->refcount) { return; } sock_do_uv_close(sock); } #define incref_sock(sd) (sd)->refcount++ /****************************************************************************** ****************************************************************************** ** Socket Functions ** ****************************************************************************** ******************************************************************************/ static lcb_sockdata_t *create_socket(lcb_io_opt_t iobase, int domain, int type, int protocol) { my_sockdata_t *ret; my_iops_t *io = (my_iops_t *)iobase; ret = (my_sockdata_t *)calloc(1, sizeof(*ret)); if (!ret) { return NULL; } uv_tcp_init(io->loop, &ret->tcp.t); ret->base.socket = INVALID_SOCKET; incref_iops(io); incref_sock(ret); set_last_error(io, 0); (void)domain; (void)type; (void)protocol; return (lcb_sockdata_t *)ret; } /** * This one is called from uv_close */ static void socket_closed_callback(uv_handle_t *handle) { my_sockdata_t *sock = PTR_FROM_FIELD(my_sockdata_t, handle, tcp); my_iops_t *io = (my_iops_t *)sock->base.parent; if (sock->pending.read) { CbREQ (&sock->tcp)(&sock->base, -1, sock->rdarg); } memset(sock, 0xEE, sizeof(*sock)); free(sock); decref_iops(io); } static unsigned int close_socket(lcb_io_opt_t iobase, lcb_sockdata_t *sockbase) { my_sockdata_t *sock = (my_sockdata_t *)sockbase; sock->uv_close_called = 1; uv_close((uv_handle_t *)&sock->tcp, socket_closed_callback); (void)iobase; return 0; } static int cntl_socket(lcb_io_opt_t iobase, lcb_sockdata_t *sockbase, int mode, int option, void *arg) { my_sockdata_t *sd = (my_sockdata_t *)sockbase; int rv; switch (option) { case LCB_IO_CNTL_TCP_NODELAY: if (mode == LCB_IO_CNTL_SET) { rv = uv_tcp_nodelay(&sd->tcp.t, *(int *)arg); if (rv != 0) { set_last_error((my_iops_t *)iobase, rv); } return rv; } else { LCB_IOPS_ERRNO(iobase) = ENOTSUP; return -1; } default: LCB_IOPS_ERRNO(iobase) = ENOTSUP; return -1; } } /****************************************************************************** ****************************************************************************** ** Connection Functions ** ****************************************************************************** ******************************************************************************/ static void connect_callback(uv_connect_t *req, int status) { my_uvreq_t *uvr = (my_uvreq_t *)req; set_last_error((my_iops_t *)uvr->socket->base.parent, status); if (uvr->cb.conn) { uvr->cb.conn(&uvr->socket->base, status); } decref_sock(uvr->socket); free(uvr); } static int start_connect(lcb_io_opt_t iobase, lcb_sockdata_t *sockbase, const struct sockaddr *name, unsigned int namelen, lcb_io_connect_cb callback) { my_sockdata_t *sock = (my_sockdata_t *)sockbase; my_iops_t *io = (my_iops_t *)iobase; my_uvreq_t *uvr; int ret; int err_is_set = 0; uvr = alloc_uvreq(sock, (generic_callback_t)callback); if (!uvr) { return -1; } if (namelen == sizeof(struct sockaddr_in)) { ret = UVC_TCP_CONNECT(&uvr->uvreq.conn, &sock->tcp.t, name, connect_callback); } else if (namelen == sizeof(struct sockaddr_in6)) { ret = UVC_TCP_CONNECT6(&uvr->uvreq.conn, &sock->tcp.t, name, connect_callback); } else { io->base.v.v1.error = EINVAL; ret = -1; err_is_set = 1; } if (ret) { if (!err_is_set) { set_last_error(io, ret); } free(uvr); } else { incref_sock(sock); } #if UV_VERSION_HEX >= 0x010000 { uv_os_fd_t fd = (uv_os_fd_t)INVALID_SOCKET; /* Fetch socket descriptor for internal usage. * For example to detect dead sockets. */ ret = uv_fileno((uv_handle_t *)&sock->tcp, &fd); if (ret == 0) { sock->base.socket = (lcb_socket_t)fd; } } #endif return ret; } /****************************************************************************** ****************************************************************************** ** Write Functions ** ****************************************************************************** ******************************************************************************/ static void write2_callback(uv_write_t *req, int status) { my_write_t *mw = (my_write_t *)req; my_sockdata_t *sock = mw->sock; if (status != 0) { set_last_error((my_iops_t *)sock->base.parent, status); } mw->callback(&sock->base, status, mw->w.data); free(mw); } static int start_write2(lcb_io_opt_t iobase, lcb_sockdata_t *sockbase, struct lcb_iovec_st *iov, lcb_size_t niov, void *uarg, lcb_ioC_write2_callback callback) { my_write_t *w; my_sockdata_t *sd = (my_sockdata_t *)sockbase; int ret; w = (my_write_t *)calloc(1, sizeof(*w)); w->w.data = uarg; w->callback = callback; w->sock = sd; ret = uv_write(&w->w, (uv_stream_t *)&sd->tcp, (uv_buf_t *)iov, niov, write2_callback); if (ret != 0) { free(w); set_last_error((my_iops_t *)iobase, -1); } return ret; } /****************************************************************************** ****************************************************************************** ** Read Functions ** ****************************************************************************** ******************************************************************************/ /** * Currently we support a single IOV. In theory while we could support * multiple IOVs, two problems arise: * * (1) Because UV does not guarantee that it'll utilize the first IOV completely * we may end up having a gap of unused space between IOVs. This may be * resolved by keeping an offset into the last-returned IOV and then * determining how much of this data was actually populated by UV itself. * * (2) In the event of an error, UV gives us "Undefined" behavior if we try * to utilize the socket again. The IOPS policy dictates that we deliver * any outstanding data to libcouchbase and _then_ deliver the pending * error. If we are forced to do this all in a single go, we'd be forced * to set up an 'async handle' to deliver the pending error, complicating * our code paths. */ static UVC_ALLOC_CB(alloc_cb) { UVC_ALLOC_CB_VARS() my_sockdata_t *sock = PTR_FROM_FIELD(my_sockdata_t, handle, tcp); buf->base = (char *)sock->iov.iov_base; buf->len = sock->iov.iov_len; (void)suggested_size; UVC_ALLOC_CB_RETURN(); } static UVC_READ_CB(read_cb) { UVC_READ_CB_VARS() my_tcp_t *mt = (my_tcp_t *)stream; my_sockdata_t *sock = PTR_FROM_FIELD(my_sockdata_t, mt, tcp); my_iops_t *io = (my_iops_t *)sock->base.parent; lcb_ioC_read2_callback callback = CbREQ(mt); if (nread == 0) { /* we have a fixed IOV between requests, so just retry again */ return; } /** * XXX: * For multi-IOV support, we would require a counter to determine if this * EAGAIN is spurious (i.e. no previous data in buffer), or actual. In * the case of the former, we'd retry -- but in the latter it is a signal * that there is no more pending data within the socket buffer AND we have * outstanding data to deliver back to the caller. */ SOCK_DECR_PENDING(sock, read); uv_read_stop(stream); CbREQ(mt) = NULL; if (nread < 0) { set_last_error(io, uvc_last_errno(io->loop, nread)); if (uvc_is_eof(io->loop, nread)) { nread = 0; } } callback(&sock->base, nread, sock->rdarg); decref_sock(sock); (void)buf; } static int start_read(lcb_io_opt_t iobase, lcb_sockdata_t *sockbase, lcb_IOV *iov, lcb_size_t niov, void *uarg, lcb_ioC_read2_callback callback) { my_sockdata_t *sock = (my_sockdata_t *)sockbase; my_iops_t *io = (my_iops_t *)iobase; int ret; sock->iov = *iov; sock->rdarg = uarg; sock->tcp.callback = callback; ret = uv_read_start((uv_stream_t *)&sock->tcp.t, alloc_cb, read_cb); set_last_error(io, ret); if (ret == 0) { SOCK_INCR_PENDING(sock, read); incref_sock(sock); } return ret; } static int get_nameinfo(lcb_io_opt_t iobase, lcb_sockdata_t *sockbase, struct lcb_nameinfo_st *ni) { my_sockdata_t *sock = (my_sockdata_t *)sockbase; my_iops_t *io = (my_iops_t *)iobase; uv_tcp_getpeername(&sock->tcp.t, ni->remote.name, ni->remote.len); uv_tcp_getsockname(&sock->tcp.t, ni->local.name, ni->local.len); (void)io; return 0; } /****************************************************************************** ****************************************************************************** ** Timer Functions ** ** There are just copied from the old couchnode I/O code ** ****************************************************************************** ******************************************************************************/ static UVC_TIMER_CB(timer_cb) { my_timer_t *mytimer = (my_timer_t *)timer; if (mytimer->callback) { mytimer->callback(-1, 0, mytimer->cb_arg); } } static void *create_timer(lcb_io_opt_t iobase) { my_iops_t *io = (my_iops_t *)iobase; my_timer_t *timer = (my_timer_t *)calloc(1, sizeof(*timer)); if (!timer) { return NULL; } timer->parent = io; incref_iops(io); uv_timer_init(io->loop, &timer->uvt); return timer; } static int update_timer(lcb_io_opt_t iobase, void *timer_opaque, lcb_uint32_t usec, void *cbdata, v0_callback_t callback) { my_timer_t *timer = (my_timer_t *)timer_opaque; timer->callback = callback; timer->cb_arg = cbdata; (void)iobase; return uv_timer_start(&timer->uvt, timer_cb, usec / 1000, 0); } static void delete_timer(lcb_io_opt_t iobase, void *timer_opaque) { my_timer_t *timer = (my_timer_t *)timer_opaque; uv_timer_stop(&timer->uvt); timer->callback = NULL; (void)iobase; } static void timer_close_cb(uv_handle_t *handle) { my_timer_t *timer = (my_timer_t *)handle; decref_iops(timer->parent); memset(timer, 0xff, sizeof(*timer)); free(timer); } static void destroy_timer(lcb_io_opt_t io, void *timer_opaque) { delete_timer(io, timer_opaque); uv_close((uv_handle_t *)timer_opaque, timer_close_cb); } static my_uvreq_t *alloc_uvreq(my_sockdata_t *sock, generic_callback_t callback) { my_uvreq_t *ret = (my_uvreq_t *)calloc(1, sizeof(*ret)); if (!ret) { sock->base.parent->v.v1.error = ENOMEM; return NULL; } ret->socket = sock; ret->cb.cb_ = callback; return ret; } static void set_last_error(my_iops_t *io, int error) { io->base.v.v1.error = uvc_last_errno(io->loop, error); } #if UV_VERSION_HEX >= 0x010000 static int check_closed(lcb_io_opt_t io, lcb_sockdata_t *sockbase, int flags) { my_sockdata_t *sd = (my_sockdata_t *)sockbase; char buf = 0; int rv = 0; lcb_socket_t sock = sd->base.socket; if (sock == INVALID_SOCKET) { return LCB_IO_SOCKCHECK_STATUS_UNKNOWN; } GT_RETRY: /* We can ignore flags for now, since both Windows and POSIX support MSG_PEEK */ rv = recv(sock, &buf, 1, MSG_PEEK); if (rv == 1) { if (flags & LCB_IO_SOCKCHECK_PEND_IS_ERROR) { return LCB_IO_SOCKCHECK_STATUS_CLOSED; } else { return LCB_IO_SOCKCHECK_STATUS_OK; } } else if (rv == 0) { /* Really closed! */ return LCB_IO_SOCKCHECK_STATUS_CLOSED; } else { int last_err; #ifdef _WIN32 last_err = get_wserr(sock); #else last_err = errno; #endif if (last_err == EINTR) { goto GT_RETRY; } else if (last_err == EWOULDBLOCK || last_err == EAGAIN) { return LCB_IO_SOCKCHECK_STATUS_OK; /* Nothing to report. So we're good */ } else { return LCB_IO_SOCKCHECK_STATUS_CLOSED; } } } #endif static void wire_iops2(int version, lcb_loop_procs *loop, lcb_timer_procs *timer, lcb_bsd_procs *bsd, lcb_ev_procs *ev, lcb_completion_procs *iocp, lcb_iomodel_t *model) { *model = LCB_IOMODEL_COMPLETION; loop->start = run_event_loop; loop->stop = stop_event_loop; loop->tick = tick_event_loop; timer->create = create_timer; timer->cancel = delete_timer; timer->schedule = update_timer; timer->destroy = destroy_timer; iocp->close = close_socket; iocp->socket = create_socket; iocp->connect = start_connect; iocp->nameinfo = get_nameinfo; iocp->read2 = start_read; iocp->write2 = start_write2; iocp->cntl = cntl_socket; #if UV_VERSION_HEX >= 0x010000 iocp->is_closed = check_closed; #endif /** Stuff we don't use */ iocp->write = NULL; iocp->wballoc = NULL; iocp->wbfree = NULL; iocp->serve = NULL; (void)bsd; (void)version; (void)ev; }