/* * librdkafka - Apache Kafka C library * * Copyright (c) 2020-2022, Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include "testcpp.h" /** * Test consumer allow.auto.create.topics by subscribing to a mix * of available, unauthorized and non-existent topics. * * The same test is run with and without allow.auto.create.topics * and with and without wildcard subscribes. * */ static void do_test_consumer(bool allow_auto_create_topics, bool with_wildcards) { Test::Say(tostr() << _C_MAG << "[ Test allow.auto.create.topics=" << (allow_auto_create_topics ? "true" : "false") << " with_wildcards=" << (with_wildcards ? "true" : "false") << " ]\n"); bool has_acl_cli = test_broker_version >= TEST_BRKVER(2, 1, 0, 0) && !test_needs_auth(); /* We can't bother passing Java * security config to kafka-acls.sh */ bool supports_allow = test_broker_version >= TEST_BRKVER(0, 11, 0, 0); std::string topic_exists = Test::mk_topic_name("0109-exists", 1); std::string topic_notexists = Test::mk_topic_name("0109-notexists", 1); std::string topic_unauth = Test::mk_topic_name("0109-unauthorized", 1); /* Create consumer */ RdKafka::Conf *conf; Test::conf_init(&conf, NULL, 20); Test::conf_set(conf, "group.id", topic_exists); Test::conf_set(conf, "enable.partition.eof", "true"); /* Quickly refresh metadata on topic auto-creation since the first * metadata after auto-create hides the topic due to 0 partition count. */ Test::conf_set(conf, "topic.metadata.refresh.interval.ms", "1000"); if (allow_auto_create_topics) Test::conf_set(conf, "allow.auto.create.topics", "true"); std::string bootstraps; if (conf->get("bootstrap.servers", bootstraps) != RdKafka::Conf::CONF_OK) Test::Fail("Failed to retrieve bootstrap.servers"); std::string errstr; RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr); if (!c) Test::Fail("Failed to create KafkaConsumer: " + errstr); delete conf; /* Create topics */ Test::create_topic(c, topic_exists.c_str(), 1, 1); if (has_acl_cli) { Test::create_topic(c, topic_unauth.c_str(), 1, 1); /* Add denying ACL for unauth topic */ test_kafka_cmd( "kafka-acls.sh --bootstrap-server %s " "--add --deny-principal 'User:*' " "--operation All --deny-host '*' " "--topic '%s'", bootstraps.c_str(), topic_unauth.c_str()); } /* Wait for topic to be fully created */ test_wait_topic_exists(NULL, topic_exists.c_str(), 10 * 1000); /* * Subscribe */ std::vector topics; std::map exp_errors; topics.push_back(topic_notexists); if (has_acl_cli) topics.push_back(topic_unauth); if (with_wildcards) { topics.push_back("^" + topic_exists); topics.push_back("^" + topic_notexists); /* If the subscription contains at least one wildcard/regex * then no auto topic creation will take place (since the consumer * requests all topics in metadata, and not specific ones, thus * not triggering topic auto creation). * We need to handle the expected error cases accordingly. */ exp_errors["^" + topic_notexists] = RdKafka::ERR_UNKNOWN_TOPIC_OR_PART; exp_errors[topic_notexists] = RdKafka::ERR_UNKNOWN_TOPIC_OR_PART; if (has_acl_cli) { /* Unauthorized topics are not included in list-all-topics Metadata, * which we use for wildcards, so in this case the error code for * unauthorixed topics show up as unknown topic. */ exp_errors[topic_unauth] = RdKafka::ERR_UNKNOWN_TOPIC_OR_PART; } } else { topics.push_back(topic_exists); if (has_acl_cli) exp_errors[topic_unauth] = RdKafka::ERR_TOPIC_AUTHORIZATION_FAILED; } if (supports_allow && !allow_auto_create_topics) exp_errors[topic_notexists] = RdKafka::ERR_UNKNOWN_TOPIC_OR_PART; RdKafka::ErrorCode err; if ((err = c->subscribe(topics))) Test::Fail("subscribe failed: " + RdKafka::err2str(err)); /* Start consuming until EOF is reached, which indicates that we have an * assignment and any errors should have been reported. */ bool run = true; while (run) { RdKafka::Message *msg = c->consume(tmout_multip(1000)); switch (msg->err()) { case RdKafka::ERR__TIMED_OUT: case RdKafka::ERR_NO_ERROR: break; case RdKafka::ERR__PARTITION_EOF: run = false; break; default: Test::Say("Consume error on " + msg->topic_name() + ": " + msg->errstr() + "\n"); std::map::iterator it = exp_errors.find(msg->topic_name()); /* Temporary unknown-topic errors are okay for auto-created topics. */ bool unknown_is_ok = allow_auto_create_topics && !with_wildcards && msg->err() == RdKafka::ERR_UNKNOWN_TOPIC_OR_PART && msg->topic_name() == topic_notexists; if (it == exp_errors.end()) { if (unknown_is_ok) Test::Say("Ignoring temporary auto-create error for topic " + msg->topic_name() + ": " + RdKafka::err2str(msg->err()) + "\n"); else Test::Fail("Did not expect error for " + msg->topic_name() + ": got: " + RdKafka::err2str(msg->err())); } else if (msg->err() != it->second) { if (unknown_is_ok) Test::Say("Ignoring temporary auto-create error for topic " + msg->topic_name() + ": " + RdKafka::err2str(msg->err()) + "\n"); else Test::Fail("Expected '" + RdKafka::err2str(it->second) + "' for " + msg->topic_name() + ", got " + RdKafka::err2str(msg->err())); } else { exp_errors.erase(msg->topic_name()); } break; } delete msg; } /* Fail if not all expected errors were seen. */ if (!exp_errors.empty()) Test::Fail(tostr() << "Expecting " << exp_errors.size() << " more errors"); c->close(); delete c; } extern "C" { int main_0109_auto_create_topics(int argc, char **argv) { /* Parameters: * allow auto create, with wildcards */ do_test_consumer(true, true); do_test_consumer(true, false); do_test_consumer(false, true); do_test_consumer(false, false); return 0; } }