/* * librdkafka - Apache Kafka C library * * Copyright (c) 2019-2022, Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include #include "testcpp.h" extern "C" { #include "rdkafka.h" /* For interceptor interface */ #include "../src/tinycthread.h" /* For mutexes */ } class myThreadCb { public: myThreadCb() : startCnt_(0), exitCnt_(0) { mtx_init(&lock_, mtx_plain); } ~myThreadCb() { mtx_destroy(&lock_); } int startCount() { int cnt; mtx_lock(&lock_); cnt = startCnt_; mtx_unlock(&lock_); return cnt; } int exitCount() { int cnt; mtx_lock(&lock_); cnt = exitCnt_; mtx_unlock(&lock_); return cnt; } virtual void thread_start_cb(const char *threadname) { Test::Say(tostr() << "Started thread: " << threadname << "\n"); mtx_lock(&lock_); startCnt_++; mtx_unlock(&lock_); } virtual void thread_exit_cb(const char *threadname) { Test::Say(tostr() << "Exiting from thread: " << threadname << "\n"); mtx_lock(&lock_); exitCnt_++; mtx_unlock(&lock_); } private: int startCnt_; int exitCnt_; mtx_t lock_; }; /** * @brief C to C++ callback trampoline. */ static rd_kafka_resp_err_t on_thread_start_trampoline( rd_kafka_t *rk, rd_kafka_thread_type_t thread_type, const char *threadname, void *ic_opaque) { myThreadCb *threadcb = (myThreadCb *)ic_opaque; Test::Say(tostr() << "on_thread_start(" << thread_type << ", " << threadname << ") called\n"); threadcb->thread_start_cb(threadname); return RD_KAFKA_RESP_ERR_NO_ERROR; } /** * @brief C to C++ callback trampoline. */ static rd_kafka_resp_err_t on_thread_exit_trampoline( rd_kafka_t *rk, rd_kafka_thread_type_t thread_type, const char *threadname, void *ic_opaque) { myThreadCb *threadcb = (myThreadCb *)ic_opaque; Test::Say(tostr() << "on_thread_exit(" << thread_type << ", " << threadname << ") called\n"); threadcb->thread_exit_cb(threadname); return RD_KAFKA_RESP_ERR_NO_ERROR; } /** * @brief This interceptor is called when a new client instance is created * prior to any threads being created. * We use it to set up the instance's thread interceptors. */ static rd_kafka_resp_err_t on_new(rd_kafka_t *rk, const rd_kafka_conf_t *conf, void *ic_opaque, char *errstr, size_t errstr_size) { Test::Say("on_new() interceptor called\n"); rd_kafka_interceptor_add_on_thread_start( rk, "test:0100", on_thread_start_trampoline, ic_opaque); rd_kafka_interceptor_add_on_thread_exit(rk, "test:0100", on_thread_exit_trampoline, ic_opaque); return RD_KAFKA_RESP_ERR_NO_ERROR; } /** * @brief The on_conf_dup() interceptor let's use add the on_new interceptor * in case the config object is copied, since interceptors are not * automatically copied. */ static rd_kafka_resp_err_t on_conf_dup(rd_kafka_conf_t *new_conf, const rd_kafka_conf_t *old_conf, size_t filter_cnt, const char **filter, void *ic_opaque) { Test::Say("on_conf_dup() interceptor called\n"); return rd_kafka_conf_interceptor_add_on_new(new_conf, "test:0100", on_new, ic_opaque); } static void test_thread_cbs() { RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); std::string errstr; rd_kafka_conf_t *c_conf; myThreadCb my_threads; Test::conf_set(conf, "bootstrap.servers", "127.0.0.1:1"); /* Interceptors are not supported in the C++ API, instead use the C API: * 1. Extract the C conf_t object * 2. Set up an on_new() interceptor * 3. Set up an on_conf_dup() interceptor to add interceptors in the * case the config object is copied (which the C++ Conf always does). * 4. In the on_new() interceptor, add the thread interceptors. */ c_conf = conf->c_ptr_global(); rd_kafka_conf_interceptor_add_on_new(c_conf, "test:0100", on_new, &my_threads); rd_kafka_conf_interceptor_add_on_conf_dup(c_conf, "test:0100", on_conf_dup, &my_threads); RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr); if (!p) Test::Fail("Failed to create Producer: " + errstr); p->poll(500); delete conf; delete p; Test::Say(tostr() << my_threads.startCount() << " thread start calls, " << my_threads.exitCount() << " thread exit calls seen\n"); /* 3 = rdkafka main thread + internal broker + bootstrap broker */ if (my_threads.startCount() < 3) Test::Fail("Did not catch enough thread start callback calls"); if (my_threads.exitCount() < 3) Test::Fail("Did not catch enough thread exit callback calls"); if (my_threads.startCount() != my_threads.exitCount()) Test::Fail("Did not catch same number of start and exit callback calls"); } extern "C" { int main_0100_thread_interceptors(int argc, char **argv) { test_thread_cbs(); return 0; } }