/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* * Copyright 2011-2020 Couchbase, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #undef NDEBUG #include "socktest.h" #include using std::list; extern "C" { static void ctx_error(lcbio_CTX *ctx, lcb_STATUS err) { ESocket *s = (ESocket *)lcbio_ctx_data(ctx); s->lasterr = err; s->actions->onError(s); s->parent->stop(); } static void ctx_read(lcbio_CTX *ctx, unsigned nr) { ESocket *s = (ESocket *)lcbio_ctx_data(ctx); s->actions->onRead(s, nr); } static void conn_cb(lcbio_SOCKET *sock, void *data, lcb_STATUS err, lcbio_OSERR oserr) { ESocket *mysock = (ESocket *)data; mysock->assign(sock, err); mysock->parent->stop(); mysock->syserr = oserr; mysock->callCount++; } static void ctx_flush_ready(lcbio_CTX *ctx) { ESocket *s = (ESocket *)lcbio_ctx_data(ctx); s->actions->onFlushReady(s); } static void ctx_flush_done(lcbio_CTX *ctx, unsigned expected, unsigned nr) { ESocket *s = (ESocket *)lcbio_ctx_data(ctx); s->actions->onFlushDone(s, expected, nr); } } void IOActions::onError(ESocket *) { // noop for now } void IOActions::onRead(ESocket *s, size_t nr) { lcbio_CTXRDITER iter; LCBIO_CTX_ITERFOR(s->ctx, &iter, nr) { char *curbuf = (char *)lcbio_ctx_ribuf(&iter); unsigned nbuf = lcbio_ctx_risize(&iter); s->readbuf.insert(s->readbuf.end(), curbuf, curbuf + nbuf); } } IOActions ESocket::defaultActions; void ESocket::assign(lcbio_SOCKET *s, lcb_STATUS err) { creq = NULL; if (s == NULL) { lasterr = err; return; } err = lcbio_sslify_if_needed(s, s->settings); if (err != LCB_SUCCESS) { lasterr = err; return; } lcbio_CTXPROCS procs{}; procs.cb_err = ctx_error; procs.cb_read = ctx_read; procs.cb_flush_done = ctx_flush_done; procs.cb_flush_ready = ctx_flush_ready; sock = s; ctx = lcbio_ctx_new(s, this, &procs); } extern "C" { static void close_cb(lcbio_SOCKET *s, int reusable, void *arg) { (void)arg; if (reusable) { lcbio_ref(s); lcb::io::Pool::put(s); } } } void ESocket::close() { if (!ctx) { return; } if (lcb::io::Pool::is_from_pool(ctx->sock)) { lcbio_ctx_close(ctx, close_cb, NULL); } else { lcbio_ctx_close(ctx, NULL, NULL); } sock = NULL; ctx = NULL; } class BreakTimer : public Timer { public: BreakTimer(Loop *l) : Timer(l->iot) { this->loop = l; } void expired() { if (!loop->bcond) { return; } if (loop->bcond->shouldBreak()) { loop->stop(); } else { loop->scheduleBreak(); } } virtual ~BreakTimer() {} private: Loop *loop; }; Loop::Loop() { io = NULL; lcb_STATUS rc = lcb_create_io_ops(&io, NULL); assert(rc == LCB_SUCCESS); assert(io != NULL); iot = lcbio_table_new(io); settings = lcb_settings_new(); settings->logger = lcb_init_console_logger(); server = new TestServer(); breakTimer = new BreakTimer(this); bcond = NULL; sockpool = new lcb::io::Pool(settings, iot); } Loop::~Loop() { delete breakTimer; delete server; sockpool->shutdown(); lcbio_table_unref(iot); lcb_destroy_io_ops(io); lcb_settings_unref(settings); } void Loop::scheduleBreak() { breakTimer->schedule(2); // 2ms } void Loop::cancelBreak() { breakTimer->cancel(); } void Loop::start() { if (bcond) { scheduleBreak(); } IOT_START(iot); cancelBreak(); bcond = NULL; } void Loop::stop() { cancelBreak(); IOT_STOP(iot); } void Loop::initSockCommon(ESocket *sock) { // Find the peer.. struct sockaddr_in *addr = (struct sockaddr_in *)&sock->sock->info->sa_local; uint16_t port = ntohs(addr->sin_port); sock->conn = server->findConnection(port); } void Loop::connectPooled(ESocket *sock, lcb_host_t *host, unsigned mstmo) { lcb_host_t tmphost = {0}; sock->parent = this; if (!host) { populateHost(&tmphost); host = &tmphost; } sock->creq = sockpool->get(*host, LCB_MS2US(mstmo), conn_cb, sock); start(); if (sock->sock) { initSockCommon(sock); } } void Loop::connect(ESocket *sock, lcb_host_t *host, unsigned mstmo) { lcb_host_t tmphost = {0}; if (host == NULL) { populateHost(&tmphost); host = &tmphost; } sock->parent = this; sock->creq = lcbio_connect(iot, settings, host, LCB_MS2US(mstmo), conn_cb, sock); start(); if (sock->sock) { initSockCommon(sock); } } void Loop::populateHost(lcb_host_t *host) { strcpy(host->host, server->getHostString().c_str()); strcpy(host->port, server->getPortString().c_str()); } extern "C" { static void timerCallback(void *arg) { Timer *tm = (Timer *)arg; tm->expired(); } } Timer::Timer(lcbio_TABLE *iot) { timer = lcbio_timer_new(iot, this, timerCallback); } Timer::~Timer() { destroy(); } void Timer::destroy() { lcbio_timer_destroy(timer); timer = NULL; } void Timer::cancel() { lcbio_timer_disarm(timer); } void Timer::schedule(unsigned ms) { lcbio_timer_rearm(timer, LCB_MS2US(ms)); } void Timer::signal() { schedule(0); } // Conditions BreakCondition::BreakCondition() { broke = false; } bool BreakCondition::shouldBreak() { if (shouldBreakImpl()) { broke = true; return true; } return false; } bool FlushedBreakCondition::shouldBreakImpl() { lcbio_CTX *ctx = sock->ctx; if (ctx->npending) { return false; } if (ctx->output && ctx->output->rb.nbytes == 0) { return true; } return false; } bool ReadBreakCondition::shouldBreakImpl() { lcbio_CTX *ctx = sock->ctx; unsigned unread = rdb_get_nused(&ctx->ior); return unread + sock->getReceived().size() >= expected; } extern "C" { static void dtor_cb(lcbio_CTX *ctx) { CtxCloseBreakCondition *bc = (CtxCloseBreakCondition *)lcbio_ctx_data(ctx); bc->gotDtor(); } } void CtxCloseBreakCondition::closeCtx() { lcbio_ctx_close_ex(s->ctx, NULL, NULL, dtor_cb, this); s->ctx = NULL; }