/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include "./pn_test_proactor.hpp" #include "./thread.h" #include #include #include #include #include #include #include #include #include #include #include namespace pn_test { std::string listening_port(pn_listener_t *l) { const pn_netaddr_t *na = pn_listener_addr(l); char port[PN_MAX_ADDR]; pn_netaddr_host_port(na, NULL, 0, port, sizeof(port)); return port; } proactor::proactor(struct handler *h) : auto_free(pn_proactor()), handler(h) {} bool proactor::dispatch(pn_event_t *e) { void *ctx = NULL; if (pn_event_listener(e)) ctx = pn_listener_get_context(pn_event_listener(e)); else if (pn_event_connection(e)) ctx = pn_connection_get_context(pn_event_connection(e)); struct handler *h = ctx ? reinterpret_cast(ctx) : handler; bool ret = h ? h->dispatch(e) : false; // pn_test::handler doesn't know about listeners so save listener condition // here. if (pn_event_listener(e)) { pn_condition_copy(h->last_condition, pn_listener_condition(pn_event_listener(e))); } return ret; } // RAII for event batches class auto_batch { pn_proactor_t *p_; pn_event_batch_t *b_; public: auto_batch(pn_proactor_t *p, pn_event_batch_t *b) : p_(p), b_(b) {} ~auto_batch() { if (b_) pn_proactor_done(p_, b_); } pn_event_t *next() { return b_ ? pn_event_batch_next(b_) : NULL; } pn_event_batch_t *get() { return b_; } }; pn_event_type_t proactor::run(pn_event_type_t stop) { // This will hang in the underlying poll if test expectations are never met. // Not ideal, but easier to debug than race conditions caused by buggy // test-harness code that attempts to detect the problem and recover early. // The larger test or CI harness will kill us after some time limit while (true) { auto_batch b(*this, pn_proactor_wait(*this)); if (b.get()) { pn_event_t *e; while ((e = b.next())) { pn_event_type_t et = pn_event_type(e); if (dispatch(e) || et == stop) return et; } } } } std::pair proactor::flush(pn_event_type_t stop) { auto_batch b(*this, pn_proactor_get(*this)); int n = 0; if (b.get()) { pn_event_t *e; while ((e = b.next())) { pn_event_type_t et = pn_event_type(e); if (dispatch(e) || et == stop) return std::make_pair(n, et); } } return std::make_pair(n, PN_EVENT_NONE); } pn_event_type_t proactor::corun(proactor &other, pn_event_type_t stop) { // We can't wait() on either proactor as it might be idle. // We can't give up immediately if both proactors are idle // something happens on the other, so spin between the two for a limited // number of attempts that should be large enough if the test is going to // pass. int spin_limit = 1000; do { std::pair n_et = flush(stop); if (n_et.second) return n_et.second; if (n_et.first + other.flush().first == 0) { // Both idle, sleep and retry in case network traffic is in flight. millisleep(1); --spin_limit; } } while (spin_limit); return PN_EVENT_NONE; } pn_event_type_t proactor::wait_next() { // pn_proactor_wait() should never return an empty batch, so we shouldn't need // a loop here. Due to bug https://issues.apache.org/jira/browse/PROTON-1964 // we need to re-wait if we get an empty batch. // // To reproduce PROTON-1964 remove the loop below and run // TEST_CASE("proactor_proton_1586") from proactor_test.cpp // // You will pn_proactor_wait() return a non-NULL batch, but the // first call to pn_event_batch_next() returns a NULL event. // while (true) { auto_batch b(*this, pn_proactor_wait(*this)); pn_event_t *e = b.next(); if (e) { dispatch(e); return pn_event_type(e); } // Try again on an empty batch. } } pn_listener_t *proactor::listen(const std::string &addr, struct handler *handler) { pn_listener_t *l = pn_listener(); pn_listener_set_context(l, handler); pn_proactor_listen(*this, l, addr.c_str(), 4); return l; } pn_connection_t *proactor::connect(const std::string &addr, struct handler *h, pn_connection_t *c) { if (!c) c = pn_connection(); if (h) pn_connection_set_context(c, h); pn_proactor_connect2(*this, c, NULL, addr.c_str()); return c; } } // namespace pn_test