/* * librdkafka - Apache Kafka C library * * Copyright (c) 2012-2022, Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include #include "testcpp.h" /** * @brief Test log callbacks and log queues */ class myLogCb : public RdKafka::EventCb { private: enum { _EXP_NONE, _EXP_LOG } state_; int cnt_; public: myLogCb() : state_(_EXP_NONE), cnt_(0) { } void expecting(bool b) { state_ = b ? _EXP_LOG : _EXP_NONE; } int count() { return cnt_; } void event_cb(RdKafka::Event &event) { switch (event.type()) { case RdKafka::Event::EVENT_LOG: cnt_++; Test::Say(tostr() << "Log: " << "level " << event.severity() << ", facility " << event.fac() << ", str " << event.str() << "\n"); if (state_ != _EXP_LOG) Test::Fail( "Received unexpected " "log message"); break; default: break; } } }; static void test_log(std::string what, bool main_queue) { RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); myLogCb my_log; std::string errstr; Test::conf_set(conf, "client.id", test_curr_name()); Test::conf_set(conf, "debug", "generic"); // generate some logs Test::conf_set(conf, "log.queue", "true"); if (conf->set("event_cb", &my_log, errstr) != RdKafka::Conf::CONF_OK) Test::Fail(errstr); Test::Say(what + "Creating producer, not expecting any log messages\n"); my_log.expecting(false); RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr); if (!p) Test::Fail(what + "Failed to create Producer: " + errstr); delete conf; RdKafka::Queue *queue = NULL; if (!main_queue) { queue = RdKafka::Queue::create(p); queue->poll(1000); } else { p->poll(1000); } Test::Say(what + "Setting log queue\n"); p->set_log_queue(queue); /* Redirect logs to main queue */ Test::Say(what + "Expecting at least one log message\n"); my_log.expecting(true); if (queue) queue->poll(1000); else p->poll(1000); /* Should not spontaneously call logs */ Test::Say(tostr() << what << "Saw " << my_log.count() << " logs\n"); if (my_log.count() < 1) Test::Fail(what + "No logs seen: expected at least one broker " "failure"); if (queue) delete queue; delete (p); } extern "C" { int main_0058_log(int argc, char **argv) { test_log("main.queue: ", true); test_log("local.queue: ", false); return 0; } }