/* * librdkafka - Apache Kafka C library * * Copyright (c) 2012-2022, Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include "test.h" /* Typical include path would be , but this program * is built from within the librdkafka source tree and thus differs. */ #include "rdkafka.h" /* for Kafka driver */ /** * Basic compression tests, with rather lacking verification. */ int main_0017_compression(int argc, char **argv) { rd_kafka_t *rk_p, *rk_c; const int msg_cnt = 1000; int msg_base = 0; uint64_t testid; #define CODEC_CNT 5 const char *codecs[CODEC_CNT + 1] = { "none", #if WITH_ZLIB "gzip", #endif #if WITH_SNAPPY "snappy", #endif #if WITH_ZSTD "zstd", #endif "lz4", NULL }; char *topics[CODEC_CNT]; const int32_t partition = 0; int i; int crc; testid = test_id_generate(); /* Produce messages */ rk_p = test_create_producer(); for (i = 0; codecs[i] != NULL; i++) { rd_kafka_topic_t *rkt_p; topics[i] = rd_strdup(test_mk_topic_name(codecs[i], 1)); TEST_SAY( "Produce %d messages with %s compression to " "topic %s\n", msg_cnt, codecs[i], topics[i]); rkt_p = test_create_producer_topic( rk_p, topics[i], "compression.codec", codecs[i], NULL); /* Produce small message that will not decrease with * compression (issue #781) */ test_produce_msgs(rk_p, rkt_p, testid, partition, msg_base + (partition * msg_cnt), 1, NULL, 5); /* Produce standard sized messages */ test_produce_msgs(rk_p, rkt_p, testid, partition, msg_base + (partition * msg_cnt) + 1, msg_cnt - 1, NULL, 512); rd_kafka_topic_destroy(rkt_p); } rd_kafka_destroy(rk_p); /* restart timeout (mainly for helgrind use since it is very slow) */ test_timeout_set(30); /* Consume messages: Without and with CRC checking */ for (crc = 0; crc < 2; crc++) { const char *crc_tof = crc ? "true" : "false"; rd_kafka_conf_t *conf; test_conf_init(&conf, NULL, 0); test_conf_set(conf, "check.crcs", crc_tof); rk_c = test_create_consumer(NULL, NULL, conf, NULL); for (i = 0; codecs[i] != NULL; i++) { rd_kafka_topic_t *rkt_c = rd_kafka_topic_new(rk_c, topics[i], NULL); TEST_SAY("Consume %d messages from topic %s (crc=%s)\n", msg_cnt, topics[i], crc_tof); /* Start consuming */ test_consumer_start(codecs[i], rkt_c, partition, RD_KAFKA_OFFSET_BEGINNING); /* Consume messages */ test_consume_msgs( codecs[i], rkt_c, testid, partition, /* Use offset 0 here, which is wrong, should * be TEST_NO_SEEK, but it exposed a bug * where the Offset query was postponed * till after the seek, causing messages * to be replayed. */ 0, msg_base, msg_cnt, 1 /* parse format */); test_consumer_stop(codecs[i], rkt_c, partition); rd_kafka_topic_destroy(rkt_c); } rd_kafka_destroy(rk_c); } for (i = 0; codecs[i] != NULL; i++) rd_free(topics[i]); return 0; }