/* * librdkafka - Apache Kafka C library * * Copyright (c) 2012-2013, Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ /** * Tests that producing to unknown partitions fails. * Issue #39 */ #include "test.h" /* Typical include path would be , but this program * is built from within the librdkafka source tree and thus differs. */ #include "rdkafka.h" /* for Kafka driver */ static int msgs_wait = 0; /* bitmask */ /** * Delivery report callback. * Called for each message once to signal its delivery status. */ static void dr_cb (rd_kafka_t *rk, void *payload, size_t len, rd_kafka_resp_err_t err, void *opaque, void *msg_opaque) { int msgid = *(int *)msg_opaque; free(msg_opaque); if (!(msgs_wait & (1 << msgid))) TEST_FAIL("Unwanted delivery report for message #%i " "(waiting for 0x%x)\n", msgid, msgs_wait); TEST_SAY("Delivery report for message #%i: %s\n", msgid, rd_kafka_err2str(err)); msgs_wait &= ~(1 << msgid); if (err != RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION) TEST_FAIL("Message #%i failed with unexpected error %s\n", msgid, rd_kafka_err2str(err)); } int main_0002_unkpart (int argc, char **argv) { int partition = 99; /* non-existent */ int r; rd_kafka_t *rk; rd_kafka_topic_t *rkt; rd_kafka_conf_t *conf; rd_kafka_topic_conf_t *topic_conf; char msg[128]; int msgcnt = 10; int i; int fails = 0; const struct rd_kafka_metadata *metadata; test_conf_init(&conf, &topic_conf, 10); /* Set delivery report callback */ rd_kafka_conf_set_dr_cb(conf, dr_cb); /* Create kafka instance */ rk = test_create_handle(RD_KAFKA_PRODUCER, conf); rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0002", 0), topic_conf); if (!rkt) TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno)); /* Request metadata so that we know the cluster is up before producing * messages, otherwise erroneous partitions will not fail immediately.*/ if ((r = rd_kafka_metadata(rk, 0, rkt, &metadata, tmout_multip(15000))) != RD_KAFKA_RESP_ERR_NO_ERROR) TEST_FAIL("Failed to acquire metadata: %s\n", rd_kafka_err2str(r)); rd_kafka_metadata_destroy(metadata); /* Produce a message */ for (i = 0 ; i < msgcnt ; i++) { int *msgidp = malloc(sizeof(*msgidp)); *msgidp = i; rd_snprintf(msg, sizeof(msg), "%s test message #%i", argv[0], i); r = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, msg, strlen(msg), NULL, 0, msgidp); if (r == -1) { if (errno == ESRCH) TEST_SAY("Failed to produce message #%i: " "unknown partition: good!\n", i); else TEST_FAIL("Failed to produce message #%i: %s\n", i, rd_kafka_err2str( rd_kafka_errno2err(errno))); free(msgidp); } else { if (i > 5) { fails++; TEST_SAY("Message #%i produced: " "should've failed\n", i); } msgs_wait |= (1 << i); } /* After half the messages: sleep to allow the metadata * to be fetched from broker and update the actual partition * count: this will make subsequent produce() calls fail * immediately. */ if (i == 5) rd_sleep(2); } /* Wait for messages to time out */ rd_kafka_flush(rk, -1); if (msgs_wait != 0) TEST_FAIL("Still waiting for messages: 0x%x\n", msgs_wait); if (fails > 0) TEST_FAIL("See previous error(s)\n"); /* Destroy topic */ rd_kafka_topic_destroy(rkt); /* Destroy rdkafka instance */ TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); rd_kafka_destroy(rk); return 0; }