/* * librdkafka - Apache Kafka C library * * Copyright (c) 2016, Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include "test.h" /* Typical include path would be , but this program * is built from within the librdkafka source tree and thus differs. */ #include "rdkafka.h" /* for Kafka driver */ /** * Issue #559: make sure auto.offset.reset works with invalid offsets. */ static void do_test_reset (const char *topic, int partition, const char *reset, int64_t initial_offset, int exp_eofcnt, int exp_msgcnt, int exp_errcnt) { rd_kafka_t *rk; rd_kafka_topic_t *rkt; int eofcnt = 0, msgcnt = 0, errcnt = 0; TEST_SAY("Test auto.offset.reset=%s, " "expect %d msgs, %d EOFs, %d errors\n", reset, exp_msgcnt, exp_eofcnt, exp_errcnt); rk = test_create_consumer(NULL, NULL, NULL, NULL, NULL); rkt = test_create_topic_object(rk, topic, "auto.offset.reset", reset, NULL); test_consumer_start(reset, rkt, partition, initial_offset); while (1) { rd_kafka_message_t *rkm; rkm = rd_kafka_consume(rkt, partition, 1000*10); if (!rkm) TEST_FAIL("%s: no message for 10s: " "%d/%d messages, %d/%d EOFs, %d/%d errors\n", reset, msgcnt, exp_msgcnt, eofcnt, exp_eofcnt, errcnt, exp_errcnt); if (rkm->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { TEST_SAY("%s: received EOF at offset %"PRId64"\n", reset, rkm->offset); eofcnt++; } else if (rkm->err) { TEST_SAY("%s: consume error at offset %"PRId64": %s\n", reset, rkm->offset, rd_kafka_message_errstr(rkm)); errcnt++; } else { msgcnt++; } rd_kafka_message_destroy(rkm); if (eofcnt == exp_eofcnt && errcnt == exp_errcnt && msgcnt == exp_msgcnt) break; else if (eofcnt > exp_eofcnt || errcnt > exp_errcnt || msgcnt > exp_msgcnt) TEST_FAIL("%s: unexpected: " "%d/%d messages, %d/%d EOFs, %d/%d errors\n", reset, msgcnt, exp_msgcnt, eofcnt, exp_eofcnt, errcnt, exp_errcnt); } TEST_SAY("%s: Done: " "%d/%d messages, %d/%d EOFs, %d/%d errors\n", reset, msgcnt, exp_msgcnt, eofcnt, exp_eofcnt, errcnt, exp_errcnt); test_consumer_stop(reset, rkt, partition); rd_kafka_topic_destroy(rkt); rd_kafka_destroy(rk); } int main_0034_offset_reset (int argc, char **argv) { const char *topic = test_mk_topic_name(__FUNCTION__, 1); const int partition = 0; const int msgcnt = 100; /* Produce messages */ test_produce_msgs_easy(topic, 0, partition, msgcnt); /* auto.offset.reset=latest: Consume messages from invalid offset: * Should return EOF. */ do_test_reset(topic, partition, "latest", msgcnt+5, 1, 0, 0); /* auto.offset.reset=earliest: Consume messages from invalid offset: * Should return messages from beginning. */ do_test_reset(topic, partition, "earliest", msgcnt+5, 1, msgcnt, 0); /* auto.offset.reset=error: Consume messages from invalid offset: * Should return error. */ do_test_reset(topic, partition, "error", msgcnt+5, 0, 0, 1); return 0; }