/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2022, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"
#include "rdstring.h"

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" /* for Kafka driver */


/**
 * KafkaConsumer balanced group testing: termination
 *
 * Runs two consumers subscribing to the same topics, waits for both to
 * get an assignment and then closes one of them.
*/ static int assign_cnt = 0; static int consumed_msg_cnt = 0; static void rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque) { char *memberid = rd_kafka_memberid(rk); TEST_SAY("%s: MemberId \"%s\": Consumer group rebalanced: %s\n", rd_kafka_name(rk), memberid, rd_kafka_err2str(err)); if (memberid) free(memberid); test_print_partition_list(partitions); switch (err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: assign_cnt++; rd_kafka_assign(rk, partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: if (assign_cnt == 0) TEST_FAIL("asymetric rebalance_cb\n"); assign_cnt--; rd_kafka_assign(rk, NULL); break; default: TEST_FAIL("rebalance failed: %s\n", rd_kafka_err2str(err)); break; } } static void consume_all(rd_kafka_t **rk_c, int rk_cnt, int exp_msg_cnt, int max_time /*ms*/) { int64_t ts_start = test_clock(); int i; max_time *= 1000; while (ts_start + max_time > test_clock()) { for (i = 0; i < rk_cnt; i++) { rd_kafka_message_t *rkmsg; if (!rk_c[i]) continue; rkmsg = rd_kafka_consumer_poll(rk_c[i], 500); if (!rkmsg) continue; else if (rkmsg->err) TEST_SAY( "Message error " "(at offset %" PRId64 " after " "%d/%d messages and %dms): %s\n", rkmsg->offset, consumed_msg_cnt, exp_msg_cnt, (int)(test_clock() - ts_start) / 1000, rd_kafka_message_errstr(rkmsg)); else consumed_msg_cnt++; rd_kafka_message_destroy(rkmsg); if (consumed_msg_cnt >= exp_msg_cnt) { static int once = 0; if (!once++) TEST_SAY("All messages consumed\n"); return; } } } } struct args { rd_kafka_t *c; rd_kafka_queue_t *queue; }; static int poller_thread_main(void *p) { struct args *args = (struct args *)p; while (!rd_kafka_consumer_closed(args->c)) { rd_kafka_message_t *rkm; /* Using a long timeout (1 minute) to verify that the * queue is woken when close is done. */ rkm = rd_kafka_consume_queue(args->queue, 60 * 1000); if (rkm) rd_kafka_message_destroy(rkm); } return 0; } /** * @brief Close consumer using async queue. 
*/ static void consumer_close_queue(rd_kafka_t *c) { /* Use the standard consumer queue rather than a temporary queue, * the latter is covered by test 0116. */ rd_kafka_queue_t *queue = rd_kafka_queue_get_consumer(c); struct args args = {c, queue}; thrd_t thrd; int ret; /* Spin up poller thread */ if (thrd_create(&thrd, poller_thread_main, (void *)&args) != thrd_success) TEST_FAIL("Failed to create thread"); TEST_SAY("Closing consumer %s using queue\n", rd_kafka_name(c)); TEST_CALL_ERROR__(rd_kafka_consumer_close_queue(c, queue)); if (thrd_join(thrd, &ret) != thrd_success) TEST_FAIL("thrd_join failed"); rd_kafka_queue_destroy(queue); } static void do_test(rd_bool_t with_queue) { const char *topic = test_mk_topic_name(__FUNCTION__, 1); #define _CONS_CNT 2 rd_kafka_t *rk_p, *rk_c[_CONS_CNT]; rd_kafka_topic_t *rkt_p; int msg_cnt = test_quick ? 100 : 1000; int msg_base = 0; int partition_cnt = 2; int partition; uint64_t testid; rd_kafka_topic_conf_t *default_topic_conf; rd_kafka_topic_partition_list_t *topics; rd_kafka_resp_err_t err; test_timing_t t_assign, t_consume; char errstr[512]; int i; SUB_TEST("with_queue=%s", RD_STR_ToF(with_queue)); testid = test_id_generate(); /* Produce messages */ rk_p = test_create_producer(); rkt_p = test_create_producer_topic(rk_p, topic, NULL); for (partition = 0; partition < partition_cnt; partition++) { test_produce_msgs(rk_p, rkt_p, testid, partition, msg_base + (partition * msg_cnt), msg_cnt, NULL, 0); } rd_kafka_topic_destroy(rkt_p); rd_kafka_destroy(rk_p); test_conf_init(NULL, &default_topic_conf, 5 + ((test_session_timeout_ms * 3 * 2) / 1000)); if (rd_kafka_topic_conf_set(default_topic_conf, "auto.offset.reset", "smallest", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) TEST_FAIL("%s\n", errstr); /* Fill in topic subscription set */ topics = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(topics, topic, -1); /* Create consumers and start subscription */ for (i = 0; i < _CONS_CNT; i++) { rk_c[i] = 
test_create_consumer( topic /*group_id*/, rebalance_cb, NULL, rd_kafka_topic_conf_dup(default_topic_conf)); err = rd_kafka_poll_set_consumer(rk_c[i]); if (err) TEST_FAIL("poll_set_consumer: %s\n", rd_kafka_err2str(err)); err = rd_kafka_subscribe(rk_c[i], topics); if (err) TEST_FAIL("subscribe: %s\n", rd_kafka_err2str(err)); } rd_kafka_topic_conf_destroy(default_topic_conf); rd_kafka_topic_partition_list_destroy(topics); /* Wait for both consumers to get an assignment */ TEST_SAY("Awaiting assignments for %d consumer(s)\n", _CONS_CNT); TIMING_START(&t_assign, "WAIT.ASSIGN"); while (assign_cnt < _CONS_CNT) consume_all(rk_c, _CONS_CNT, msg_cnt, test_session_timeout_ms + 3000); TIMING_STOP(&t_assign); /* Now close one of the consumers, this will cause a rebalance. */ TEST_SAY("Closing down 1/%d consumer(s): %s\n", _CONS_CNT, rd_kafka_name(rk_c[0])); if (with_queue) consumer_close_queue(rk_c[0]); else TEST_CALL_ERR__(rd_kafka_consumer_close(rk_c[0])); rd_kafka_destroy(rk_c[0]); rk_c[0] = NULL; /* Let remaining consumers run for a while to take over the now * lost partitions. */ if (assign_cnt != _CONS_CNT - 1) TEST_FAIL("assign_cnt %d, should be %d\n", assign_cnt, _CONS_CNT - 1); TIMING_START(&t_consume, "CONSUME.WAIT"); consume_all(rk_c, _CONS_CNT, msg_cnt, test_session_timeout_ms + 3000); TIMING_STOP(&t_consume); TEST_SAY("Closing remaining consumers\n"); for (i = 0; i < _CONS_CNT; i++) { test_timing_t t_close; rd_kafka_topic_partition_list_t *sub; int j; if (!rk_c[i]) continue; /* Query subscription */ err = rd_kafka_subscription(rk_c[i], &sub); if (err) TEST_FAIL("%s: subscription() failed: %s\n", rd_kafka_name(rk_c[i]), rd_kafka_err2str(err)); TEST_SAY("%s: subscription (%d):\n", rd_kafka_name(rk_c[i]), sub->cnt); for (j = 0; j < sub->cnt; j++) TEST_SAY(" %s\n", sub->elems[j].topic); rd_kafka_topic_partition_list_destroy(sub); /* Run an explicit unsubscribe() (async) prior to close() * to trigger race condition issues on termination. 
*/ TEST_SAY("Unsubscribing instance %s\n", rd_kafka_name(rk_c[i])); err = rd_kafka_unsubscribe(rk_c[i]); if (err) TEST_FAIL("%s: unsubscribe failed: %s\n", rd_kafka_name(rk_c[i]), rd_kafka_err2str(err)); TEST_SAY("Closing %s\n", rd_kafka_name(rk_c[i])); TIMING_START(&t_close, "CONSUMER.CLOSE"); if (with_queue) consumer_close_queue(rk_c[i]); else TEST_CALL_ERR__(rd_kafka_consumer_close(rk_c[i])); TIMING_STOP(&t_close); rd_kafka_destroy(rk_c[i]); rk_c[i] = NULL; } TEST_SAY("%d/%d messages consumed\n", consumed_msg_cnt, msg_cnt); if (consumed_msg_cnt < msg_cnt) TEST_FAIL("Only %d/%d messages were consumed\n", consumed_msg_cnt, msg_cnt); else if (consumed_msg_cnt > msg_cnt) TEST_SAY( "At least %d/%d messages were consumed " "multiple times\n", consumed_msg_cnt - msg_cnt, msg_cnt); SUB_TEST_PASS(); } int main_0018_cgrp_term(int argc, char **argv) { do_test(rd_false /* rd_kafka_consumer_close() */); do_test(rd_true /* rd_kafka_consumer_close_queue() */); return 0; }