/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2016-2022, Magnus Edenhill
 *               2023, Confluent Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" /* for Kafka driver */


/**
 * KafkaConsumer: regex topic subscriptions
 */


/**
 * @brief Description and running state of one subscription sub-test.
 *
 * \c sub and \c exp are NULL-terminated arrays: every loop over them in
 * this file stops at the first NULL entry.
 */
struct expect {
        char *name;         /* sub-test name */
        const char *sub[4]; /* subscriptions */
        const char *exp[4]; /* expected topics */
        int exp_err;        /* expected error from subscribe() */
        int stat[4];        /* per exp status */
        int fails;          /* accumulated failure count */
        enum { _EXP_NONE,     /* no rebalance is expected */
               _EXP_FAIL,     /* expect_match() found mismatches */
               _EXP_OK,       /* expect_match() found no mismatches */
               _EXP_ASSIGN,   /* waiting for an assign rebalance */
               _EXP_REVOKE,   /* waiting for a revoke rebalance */
               _EXP_ASSIGNED, /* assign rebalance has been handled */
               _EXP_REVOKED,  /* revoke rebalance has been handled */
        } result;
};

/* Sub-test currently being run; read by rebalance_cb(). */
static struct expect *exp_curr;

static uint64_t testid;

/**
 * @brief Compare the topics in \p parts against the expected topics
 *        of \p exp.
 *
 * Each partition whose topic is not in \c exp->exp, and each expected topic
 * that has no partition in \p parts, counts as one failure. The failures are
 * added to \c exp->fails and \c exp->result is set to \c _EXP_FAIL or
 * \c _EXP_OK accordingly.
 */
static void expect_match(struct expect *exp,
                         const rd_kafka_topic_partition_list_t *parts) {
        int i;
        int e     = 0;
        int fails = 0;

        /* Reset the per-expected-topic hit counters. */
        memset(exp->stat, 0, sizeof(exp->stat));

        /* Pass 1: every assigned partition must belong to an expected
         * topic. */
        for (i = 0; i < parts->cnt; i++) {
                int found = 0;
                e         = 0;
                while (exp->exp[e]) {
                        if (!strcmp(parts->elems[i].topic, exp->exp[e])) {
                                exp->stat[e]++;
                                found++;
                        }
                        e++;
                }

                if (!found) {
                        TEST_WARN("%s: got unexpected topic match: %s\n",
                                  exp->name, parts->elems[i].topic);
                        fails++;
                }
        }

        /* Pass 2: every expected topic must have been seen at least once. */
        e = 0;
        while (exp->exp[e]) {
                if (!exp->stat[e]) {
                        TEST_WARN(
                            "%s: expected topic not "
                            "found in assignment: %s\n",
                            exp->name, exp->exp[e]);
                        fails++;
                } else {
                        TEST_SAY("%s: expected topic %s seen in assignment\n",
                                 exp->name, exp->exp[e]);
                }
                e++;
        }

        exp->fails += fails;
        if (fails) {
                TEST_WARN("%s: see %d previous failures\n", exp->name, fails);
                exp->result = _EXP_FAIL;
        } else {
                TEST_SAY(_C_MAG "[ %s: assignment matched ]\n", exp->name);
                exp->result = _EXP_OK;
        }
}

/**
 * @brief Rebalance callback: verifies the rebalance against the current
 *        sub-test (\c exp_curr) and applies the assignment or revocation.
 *
 * On assign the partitions are checked with expect_match() and the result
 * is then set to \c _EXP_ASSIGNED (overwriting the \c _EXP_OK / \c _EXP_FAIL
 * value set by expect_match(); mismatches remain recorded in \c fails).
 * On revoke the result is set to \c _EXP_REVOKED.
 * Any other \p err fails the test.
 *
 * \p opaque is not used.
 */
static void rebalance_cb(rd_kafka_t *rk,
                         rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *parts,
                         void *opaque) {
        struct expect *exp = exp_curr;

        TEST_ASSERT(exp_curr, "exp_curr not set");

        TEST_SAY("rebalance_cb: %s with %d partition(s)\n",
                 rd_kafka_err2str(err), parts->cnt);
        test_print_partition_list(parts);

        switch (err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                /* Check that provided partitions match our expectations */
                if (exp->result != _EXP_ASSIGN) {
                        TEST_WARN(
                            "%s: rebalance called while expecting %d: "
                            "too many or undesired assignment(s?\n",
                            exp->name, exp->result);
                }
                expect_match(exp, parts);
                test_consumer_assign("rebalance", rk, parts);
                exp->result = _EXP_ASSIGNED;
                break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                if (exp->result != _EXP_REVOKE) {
                        TEST_WARN(
                            "%s: rebalance called while expecting %d: "
                            "too many or undesired revoke(s)?\n",
                            exp->name, exp->result);
                }

                test_consumer_unassign("rebalance", rk);
                exp->result = _EXP_REVOKED;
                break;

        default:
                TEST_FAIL("rebalance_cb: error: %s", rd_kafka_err2str(err));
        }
}


/**
 * @brief Poll the consumer once.
 *
 * Calls rd_kafka_consumer_poll() with a timeout argument of 1000 and
 * destroys the returned message, if any. Partition EOF is logged.
 * An UNKNOWN_TOPIC_OR_PART error is only tolerated for topics whose name
 * contains "NONEXIST"; for any other topic it fails the test.
 * All other messages and errors are ignored.
 */
static void consumer_poll_once(rd_kafka_t *rk) {
        rd_kafka_message_t *rkmessage;
        const char *topic;

        rkmessage = rd_kafka_consumer_poll(rk, 1000);
        if (!rkmessage)
                return;

        /* Resolve the topic name once, guarding against a message without
         * a topic handle so that none of the branches below dereferences
         * a NULL rkt. */
        topic = rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt)
                               : "(no-topic)";

        if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                TEST_SAY("%s [%" PRId32
                         "] reached EOF at "
                         "offset %" PRId64 "\n",
                         topic, rkmessage->partition, rkmessage->offset);

        } else if (rkmessage->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) {
                /* Test segfault associated with this call is solved */
                int32_t leader_epoch = rd_kafka_message_leader_epoch(rkmessage);
                TEST_ASSERT(leader_epoch == -1,
                            "rd_kafka_message_leader_epoch should be -1"
                            ", got %" PRId32,
                            leader_epoch);

                if (strstr(topic, "NONEXIST"))
                        TEST_SAY("%s: %s: error is expected for this topic\n",
                                 topic, rd_kafka_message_errstr(rkmessage));
                else
                        TEST_FAIL(
                            "%s [%" PRId32 "] error (offset %" PRId64 "): %s",
                            topic, rkmessage->partition, rkmessage->offset,
                            rd_kafka_message_errstr(rkmessage));
        }

        rd_kafka_message_destroy(rkmessage);
}


/**
 * @brief Run one subscription sub-test on consumer \p rk.
 *
 * Subscribes to \c exp->sub and asserts that subscribe() returns
 * \c exp->exp_err. If at least one topic is expected, polls until the
 * assignment has been handled by rebalance_cb(); otherwise polls for a
 * grace period and asserts that no rebalance happened. The same is then
 * repeated for unsubscribe() / revoke.
 *
 * @returns the accumulated number of failures in \c exp->fails.
 */
static int test_subscribe(rd_kafka_t *rk, struct expect *exp) {
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *tlist;
        int i;
        test_timing_t t_sub, t_assign, t_unsub;

        /* Make this sub-test visible to rebalance_cb(). */
        exp_curr = exp;

        test_timeout_set((test_session_timeout_ms / 1000) * 3);

        tlist = rd_kafka_topic_partition_list_new(4);
        TEST_SAY(_C_MAG "[ %s: begin ]\n", exp->name);
        i = 0;
        TEST_SAY("Topic subscription:\n");
        while (exp->sub[i]) {
                TEST_SAY("%s:  %s\n", exp->name, exp->sub[i]);
                rd_kafka_topic_partition_list_add(tlist, exp->sub[i],
                                                  RD_KAFKA_PARTITION_UA);
                i++;
        }

        /* Subscribe */
        TIMING_START(&t_sub, "subscribe");
        err = rd_kafka_subscribe(rk, tlist);
        TIMING_STOP(&t_sub);
        TEST_ASSERT(err == exp->exp_err, "subscribe() failed: %s (expected %s)",
                    rd_kafka_err2str(err), rd_kafka_err2str(exp->exp_err));

        if (exp->exp[0]) {
                /* Wait for assignment, actual messages are ignored. */
                exp->result = _EXP_ASSIGN;
                TEST_SAY("%s: waiting for assignment\n", exp->name);
                TIMING_START(&t_assign, "assignment");
                while (exp->result == _EXP_ASSIGN)
                        consumer_poll_once(rk);
                TIMING_STOP(&t_assign);
                TEST_ASSERT(exp->result == _EXP_ASSIGNED,
                            "got %d instead of assignment", exp->result);

        } else {
                /* Not expecting any assignment */
                /* NOTE(review): the grace period is test_clock() + 5000;
                 * confirm the clock unit gives the intended duration. */
                int64_t ts_end = test_clock() + 5000;
                exp->result    = _EXP_NONE; /* Not expecting a rebalance */
                while (exp->result == _EXP_NONE && test_clock() < ts_end)
                        consumer_poll_once(rk);
                TEST_ASSERT(exp->result == _EXP_NONE);
        }

        /* Unsubscribe */
        TIMING_START(&t_unsub, "unsubscribe");
        err = rd_kafka_unsubscribe(rk);
        TIMING_STOP(&t_unsub);
        TEST_ASSERT(!err, "unsubscribe() failed: %s", rd_kafka_err2str(err));

        rd_kafka_topic_partition_list_destroy(tlist);

        if (exp->exp[0]) {
                /* Wait for revoke, actual messages are ignored. */
                TEST_SAY("%s: waiting for revoke\n", exp->name);
                exp->result = _EXP_REVOKE;
                TIMING_START(&t_assign, "revoke");
                while (exp->result != _EXP_REVOKED)
                        consumer_poll_once(rk);
                TIMING_STOP(&t_assign);
                TEST_ASSERT(exp->result == _EXP_REVOKED,
                            "got %d instead of revoke", exp->result);
        } else {
                /* Not expecting any revoke */
                /* NOTE(review): same grace period as above; confirm the
                 * unit of test_clock(). */
                int64_t ts_end = test_clock() + 5000;
                exp->result    = _EXP_NONE; /* Not expecting a rebalance */
                while (exp->result == _EXP_NONE && test_clock() < ts_end)
                        consumer_poll_once(rk);
                TEST_ASSERT(exp->result == _EXP_NONE);
        }

        TEST_SAY(_C_MAG "[ %s: done with %d failures ]\n", exp->name,
                 exp->fails);

        return exp->fails;
}


/**
 * @brief Run all subscription sub-tests with the given partition
 *        \p assignor (the "partition.assignment.strategy" value).
 *
 * Produces to three topics so that they exist, then runs literal and regex
 * subscriptions through a single consumer. Skipped when regex support is
 * not built in.
 *
 * @returns 0; sub-test failures are reported through TEST_FAIL().
 */
static int do_test(const char *assignor) {
        static char topics[3][128];
        static char nonexist_topic[128];
        const int topic_cnt = 3;
        rd_kafka_t *rk;
        const int msgcnt = 10;
        int i;
        char groupid[64];
        int fails = 0;
        rd_kafka_conf_t *conf;

        if (!test_check_builtin("regex")) {
                TEST_SKIP("regex support not built in\n");
                return 0;
        }

        testid = test_id_generate();
        test_str_id_generate(groupid, sizeof(groupid));

        /* Topic names are suffixed with the group id, which the regex
         * patterns below also end with. */
        rd_snprintf(topics[0], sizeof(topics[0]), "%s_%s",
                    test_mk_topic_name("regex_subscribe_TOPIC_0001_UNO", 0),
                    groupid);
        rd_snprintf(topics[1], sizeof(topics[1]), "%s_%s",
                    test_mk_topic_name("regex_subscribe_topic_0002_dup", 0),
                    groupid);
        rd_snprintf(topics[2], sizeof(topics[2]), "%s_%s",
                    test_mk_topic_name("regex_subscribe_TOOTHPIC_0003_3", 0),
                    groupid);

        /* To avoid auto topic creation to kick in we use
         * an invalid topic name. */
        rd_snprintf(
            nonexist_topic, sizeof(nonexist_topic), "%s_%s",
            test_mk_topic_name("regex_subscribe_NONEXISTENT_0004_IV#!", 0),
            groupid);

        /* Produce messages to topics to ensure creation. */
        for (i = 0; i < topic_cnt; i++)
                test_produce_msgs_easy(topics[i], testid, RD_KAFKA_PARTITION_UA,
                                       msgcnt);

        test_conf_init(&conf, NULL, 20);
        test_conf_set(conf, "partition.assignment.strategy", assignor);
        /* Speed up propagation of new topics */
        test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000");
        test_conf_set(conf, "allow.auto.create.topics", "true");

        /* Create a single consumer to handle all subscriptions.
         * Has the nice side effect of testing multiple subscriptions. */
        rk = test_create_consumer(groupid, rebalance_cb, conf, NULL);

        /*
         * Test cases
         */
        {
                struct expect expect = {.name = rd_strdup(tsprintf(
                                            "%s: no regexps (0&1)", assignor)),
                                        .sub  = {topics[0], topics[1], NULL},
                                        .exp  = {topics[0], topics[1], NULL}};

                fails += test_subscribe(rk, &expect);
                rd_free(expect.name);
        }

        {
                struct expect expect = {.name =
                                            rd_strdup(tsprintf("%s: no regexps "
                                                               "(no matches)",
                                                               assignor)),
                                        .sub = {nonexist_topic, NULL},
                                        .exp = {NULL}};

                fails += test_subscribe(rk, &expect);
                rd_free(expect.name);
        }

        {
                struct expect expect = {
                    .name = rd_strdup(tsprintf("%s: regex all", assignor)),
                    .sub  = {rd_strdup(tsprintf("^.*_%s", groupid)), NULL},
                    .exp  = {topics[0], topics[1], topics[2], NULL}};

                fails += test_subscribe(rk, &expect);
                rd_free(expect.name);
                /* sub[0] was rd_strdup()ed above; cast away const to free. */
                rd_free((void *)expect.sub[0]);
        }

        {
                struct expect expect = {
                    .name = rd_strdup(tsprintf("%s: regex 0&1", assignor)),
                    .sub  = {rd_strdup(tsprintf(
                                "^.*[tToOpPiIcC]_0+[12]_[^_]+_%s", groupid)),
                            NULL},
                    .exp  = {topics[0], topics[1], NULL}};

                fails += test_subscribe(rk, &expect);
                rd_free(expect.name);
                rd_free((void *)expect.sub[0]);
        }

        {
                struct expect expect = {
                    .name = rd_strdup(tsprintf("%s: regex 2", assignor)),
                    .sub  = {rd_strdup(
                                tsprintf("^.*TOOTHPIC_000._._%s", groupid)),
                            NULL},
                    .exp  = {topics[2], NULL}};

                fails += test_subscribe(rk, &expect);
                rd_free(expect.name);
                rd_free((void *)expect.sub[0]);
        }

        {
                struct expect expect = {
                    .name = rd_strdup(tsprintf("%s: regex 2 and "
                                               "nonexistent(not seen)",
                                               assignor)),
                    .sub  = {rd_strdup(tsprintf("^.*_000[34]_..?_%s", groupid)),
                            NULL},
                    .exp  = {topics[2], NULL}};

                fails += test_subscribe(rk, &expect);
                rd_free(expect.name);
                rd_free((void *)expect.sub[0]);
        }

        {
                struct expect expect = {
                    .name = rd_strdup(
                        tsprintf("%s: broken regex (no matches)", assignor)),
                    .sub     = {"^.*[0", NULL},
                    .exp     = {NULL},
                    .exp_err = RD_KAFKA_RESP_ERR__INVALID_ARG};

                fails += test_subscribe(rk, &expect);
                rd_free(expect.name);
        }


        test_consumer_close(rk);

        rd_kafka_destroy(rk);

        if (fails)
                TEST_FAIL("See %d previous failures", fails);

        return 0;
}


/**
 * @brief Test entry point: runs do_test() for the "range" and
 *        "roundrobin" assignors. \p argc and \p argv are not used.
 */
int main_0033_regex_subscribe(int argc, char **argv) {
        do_test("range");
        do_test("roundrobin");
        return 0;
}


/**
 * @brief Subscription API tests that don't require a broker
 *
 * Checks the return value of rd_kafka_subscribe() for:
 *  - valids:   literal and well-formed regex topics (expects success),
 *  - invalids: a list containing a malformed regex (expects INVALID_ARG),
 *  - none:     a list with no topics added (expects INVALID_ARG),
 *  - empty:    a list containing an empty topic string (expects INVALID_ARG),
 *  - alot:     10000 regex topics (expects success).
 */
int main_0033_regex_subscribe_local(int argc, char **argv) {
        rd_kafka_topic_partition_list_t *valids, *invalids, *none, *empty,
            *alot;
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        rd_kafka_resp_err_t err;
        char errstr[256];
        int i;

        /* The size arguments differ from the number of entries added
         * below (e.g. 0 for valids, 1 for alot). */
        valids   = rd_kafka_topic_partition_list_new(0);
        invalids = rd_kafka_topic_partition_list_new(100);
        none     = rd_kafka_topic_partition_list_new(1000);
        empty    = rd_kafka_topic_partition_list_new(5);
        alot     = rd_kafka_topic_partition_list_new(1);

        rd_kafka_topic_partition_list_add(valids, "not_a_regex", 0);
        rd_kafka_topic_partition_list_add(valids, "^My[vV]alid..regex+", 0);
        rd_kafka_topic_partition_list_add(valids, "^another_one$", 55);

        rd_kafka_topic_partition_list_add(invalids, "not_a_regex", 0);
        rd_kafka_topic_partition_list_add(invalids, "^My[vV]alid..regex+", 0);
        rd_kafka_topic_partition_list_add(invalids, "^a[b", 99);

        rd_kafka_topic_partition_list_add(empty, "not_a_regex", 0);
        rd_kafka_topic_partition_list_add(empty, "", 0);
        rd_kafka_topic_partition_list_add(empty, "^ok", 0);

        for (i = 0; i < 10000; i++) {
                char topic[32];
                rd_snprintf(topic, sizeof(topic), "^Va[lLid]_regex_%d$", i);
                rd_kafka_topic_partition_list_add(alot, topic, i);
        }

        conf = rd_kafka_conf_new();
        test_conf_set(conf, "group.id", "group");
        test_conf_set(conf, "client.id", test_curr->name);

        rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
        if (!rk)
                TEST_FAIL("Failed to create consumer: %s", errstr);

        err = rd_kafka_subscribe(rk, valids);
        TEST_ASSERT(!err, "valids failed: %s", rd_kafka_err2str(err));

        err = rd_kafka_subscribe(rk, invalids);
        TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
                    "invalids failed with wrong return: %s",
                    rd_kafka_err2str(err));

        err = rd_kafka_subscribe(rk, none);
        TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
                    "none failed with wrong return: %s", rd_kafka_err2str(err));

        err = rd_kafka_subscribe(rk, empty);
        TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
                    "empty failed with wrong return: %s",
                    rd_kafka_err2str(err));

        err = rd_kafka_subscribe(rk, alot);
        TEST_ASSERT(!err, "alot failed: %s", rd_kafka_err2str(err));

        rd_kafka_consumer_close(rk);
        rd_kafka_destroy(rk);

        rd_kafka_topic_partition_list_destroy(valids);
        rd_kafka_topic_partition_list_destroy(invalids);
        rd_kafka_topic_partition_list_destroy(none);
        rd_kafka_topic_partition_list_destroy(empty);
        rd_kafka_topic_partition_list_destroy(alot);

        return 0;
}