/* * librdkafka - Apache Kafka C library * * Copyright (c) 2016-2022, Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include "testcpp.h" #if WITH_RAPIDJSON #include #include #include #include #include #include #include #include #include #include #include #include #include /** * @name Consumer Transactions. * * - Uses the TransactionProducerCli Java application to produce messages * that are part of abort and commit transactions in various combinations * and tests that librdkafka consumes them as expected. Refer to * TransactionProducerCli.java for scenarios covered. */ class TestEventCb : public RdKafka::EventCb { public: static bool should_capture_stats; static bool has_captured_stats; static int64_t partition_0_hi_offset; static int64_t partition_0_ls_offset; static std::string topic; void event_cb(RdKafka::Event &event) { switch (event.type()) { case RdKafka::Event::EVENT_STATS: if (should_capture_stats) { partition_0_hi_offset = -1; partition_0_ls_offset = -1; has_captured_stats = true; should_capture_stats = false; char path[256]; /* Parse JSON to validate */ rapidjson::Document d; if (d.Parse(event.str().c_str()).HasParseError()) Test::Fail(tostr() << "Failed to parse stats JSON: " << rapidjson::GetParseError_En(d.GetParseError()) << " at " << d.GetErrorOffset()); rd_snprintf(path, sizeof(path), "/topics/%s/partitions/0", topic.c_str()); rapidjson::Pointer jpath((const char *)path); rapidjson::Value *pp = rapidjson::GetValueByPointer(d, jpath); if (pp == NULL) return; TEST_ASSERT(pp->HasMember("hi_offset"), "hi_offset not found in stats"); TEST_ASSERT(pp->HasMember("ls_offset"), "ls_offset not found in stats"); partition_0_hi_offset = (*pp)["hi_offset"].GetInt(); partition_0_ls_offset = (*pp)["ls_offset"].GetInt(); } break; case RdKafka::Event::EVENT_LOG: std::cerr << event.str() << "\n"; break; default: break; } } }; bool TestEventCb::should_capture_stats; bool TestEventCb::has_captured_stats; int64_t TestEventCb::partition_0_hi_offset; int64_t TestEventCb::partition_0_ls_offset; std::string TestEventCb::topic; static TestEventCb ex_event_cb; static void execute_java_produce_cli(std::string &bootstrapServers, const std::string &topic, const std::string &testidstr, const char **cmds, size_t cmd_cnt) { const std::string topicCmd = "topic," + topic; const std::string testidCmd = "testid," + testidstr; const char **argv; size_t i = 0; argv = (const char **)rd_alloca(sizeof(*argv) * (1 + 1 + 1 + cmd_cnt + 1)); argv[i++] = bootstrapServers.c_str(); argv[i++] = topicCmd.c_str(); argv[i++] = testidCmd.c_str(); for (size_t j = 0; j < cmd_cnt; j++) argv[i++] = cmds[j]; argv[i] = NULL; int pid = test_run_java("TransactionProducerCli", (const char **)argv); test_waitpid(pid); } static std::vector consume_messages(RdKafka::KafkaConsumer *c, std::string topic, int partition) { RdKafka::ErrorCode err; /* Assign partitions */ std::vector parts; parts.push_back(RdKafka::TopicPartition::create(topic, partition)); if ((err = c->assign(parts))) Test::Fail("assign failed: " + RdKafka::err2str(err)); RdKafka::TopicPartition::destroy(parts); Test::Say(tostr() << "Consuming from topic " << topic << " partition " << partition << "\n"); std::vector result = std::vector(); while (true) { RdKafka::Message *msg = c->consume(tmout_multip(1000)); switch (msg->err()) { case RdKafka::ERR__TIMED_OUT: delete msg; continue; case RdKafka::ERR__PARTITION_EOF: delete msg; break; case RdKafka::ERR_NO_ERROR: result.push_back(msg); continue; default: Test::Fail("Error consuming from topic " + topic + ": " + msg->errstr()); delete msg; break; } break; } Test::Say("Read all messages from topic: " + topic + "\n"); TestEventCb::should_capture_stats = true; /* rely on the test timeout to prevent an infinite loop in * the (unlikely) event that the statistics callback isn't * called. */ while (!TestEventCb::has_captured_stats) { RdKafka::Message *msg = c->consume(tmout_multip(500)); delete msg; } Test::Say("Captured consumer statistics event\n"); return result; } static void delete_messages(std::vector &messages) { for (size_t i = 0; i < messages.size(); ++i) delete messages[i]; } static std::string get_bootstrap_servers() { RdKafka::Conf *conf; std::string bootstrap_servers; Test::conf_init(&conf, NULL, 40); conf->get("bootstrap.servers", bootstrap_servers); delete conf; return bootstrap_servers; } static RdKafka::KafkaConsumer *create_consumer(std::string &topic_name, const char *isolation_level) { RdKafka::Conf *conf; std::string errstr; Test::conf_init(&conf, NULL, 40); Test::conf_set(conf, "group.id", topic_name); Test::conf_set(conf, "enable.auto.commit", "false"); Test::conf_set(conf, "auto.offset.reset", "earliest"); Test::conf_set(conf, "enable.partition.eof", "true"); Test::conf_set(conf, "isolation.level", isolation_level); Test::conf_set(conf, "statistics.interval.ms", "1000"); conf->set("event_cb", &ex_event_cb, errstr); TestEventCb::should_capture_stats = false; TestEventCb::has_captured_stats = false; RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr); if (!c) Test::Fail("Failed to create KafkaConsumer: " + errstr); delete conf; return c; } static std::vector csv_split(const std::string &input) { std::stringstream ss(input); std::vector res; while (ss.good()) { std::string substr; std::getline(ss, substr, ','); /* Trim */ substr.erase(0, substr.find_first_not_of(' ')); substr.erase(substr.find_last_not_of(' ') + 1); res.push_back(substr); } return res; } enum TransactionType { TransactionType_None, TransactionType_BeginAbort, TransactionType_BeginCommit, TransactionType_BeginOpen, TransactionType_ContinueAbort, TransactionType_ContinueCommit, TransactionType_ContinueOpen }; static TransactionType TransactionType_from_string(std::string str) { #define _CHKRET(NAME) \ if (!str.compare(#NAME)) \ return TransactionType_##NAME _CHKRET(None); _CHKRET(BeginAbort); _CHKRET(BeginCommit); _CHKRET(BeginOpen); _CHKRET(ContinueAbort); _CHKRET(ContinueCommit); _CHKRET(ContinueOpen); Test::Fail("Unknown TransactionType: " + str); return TransactionType_None; /* NOTREACHED */ } static void txn_producer_makeTestMessages(RdKafka::Producer *producer, const std::string &topic, const std::string &testidstr, int partition, int idStart, int msgcount, TransactionType tt, bool do_flush) { RdKafka::Error *error; if (tt != TransactionType_None && tt != TransactionType_ContinueOpen && tt != TransactionType_ContinueCommit && tt != TransactionType_ContinueAbort) { error = producer->begin_transaction(); if (error) { Test::Fail("begin_transaction() failed: " + error->str()); delete error; } } for (int i = 0; i < msgcount; i++) { char key[] = {(char)((i + idStart) & 0xff)}; char payload[] = {0x10, 0x20, 0x30, 0x40}; RdKafka::ErrorCode err; err = producer->produce(topic, partition, producer->RK_MSG_COPY, payload, sizeof(payload), key, sizeof(key), 0, NULL); if (err) Test::Fail("produce() failed: " + RdKafka::err2str(err)); } if (do_flush) producer->flush(-1); switch (tt) { case TransactionType_BeginAbort: case TransactionType_ContinueAbort: error = producer->abort_transaction(30 * 1000); if (error) { Test::Fail("abort_transaction() failed: " + error->str()); delete error; } break; case TransactionType_BeginCommit: case TransactionType_ContinueCommit: error = producer->commit_transaction(30 * 1000); if (error) { Test::Fail("commit_transaction() failed: " + error->str()); delete error; } break; default: break; } } class txnDeliveryReportCb : public RdKafka::DeliveryReportCb { public: void dr_cb(RdKafka::Message &msg) { switch (msg.err()) { case RdKafka::ERR__PURGE_QUEUE: case RdKafka::ERR__PURGE_INFLIGHT: /* These are expected when transactions are aborted */ break; case RdKafka::ERR_NO_ERROR: break; default: Test::Fail("Delivery failed: " + msg.errstr()); break; } } }; /** * @brief Transactional producer, performing the commands in \p cmds. * This is the librdkafka counterpart of * java/TransactionProducerCli.java */ static void txn_producer(const std::string &brokers, const std::string &topic, const std::string &testidstr, const char **cmds, size_t cmd_cnt) { RdKafka::Conf *conf; txnDeliveryReportCb txn_dr; Test::conf_init(&conf, NULL, 0); Test::conf_set(conf, "bootstrap.servers", brokers); std::map producers; for (size_t i = 0; i < cmd_cnt; i++) { std::string cmdstr = std::string(cmds[i]); Test::Say(_C_CLR "rdkafka txn producer command: " + cmdstr + "\n"); std::vector cmd = csv_split(cmdstr); if (!cmd[0].compare("sleep")) { rd_usleep(atoi(cmd[1].c_str()) * 1000, NULL); } else if (!cmd[0].compare("exit")) { break; /* We can't really simulate the Java exit behaviour * from in-process. */ } else if (cmd[0].find("producer") == 0) { TransactionType txntype = TransactionType_from_string(cmd[4]); std::map::iterator it = producers.find(cmd[0]); RdKafka::Producer *producer; if (it == producers.end()) { /* Create producer if it doesn't exist */ std::string errstr; Test::Say(tostr() << "Creating producer " << cmd[0] << " with transactiontype " << txntype << " '" << cmd[4] << "'\n"); /* Config */ Test::conf_set(conf, "enable.idempotence", "true"); if (txntype != TransactionType_None) Test::conf_set(conf, "transactional.id", "test-transactional-id-c-" + testidstr + "-" + cmd[0]); else Test::conf_set(conf, "transactional.id", ""); Test::conf_set(conf, "linger.ms", "5"); /* ensure batching */ conf->set("dr_cb", &txn_dr, errstr); /* Create producer */ producer = RdKafka::Producer::create(conf, errstr); if (!producer) Test::Fail("Failed to create producer " + cmd[0] + ": " + errstr); /* Init transactions if producer is transactional */ if (txntype != TransactionType_None) { RdKafka::Error *error = producer->init_transactions(20 * 1000); if (error) { Test::Fail("init_transactions() failed: " + error->str()); delete error; } } producers[cmd[0]] = producer; } else { producer = it->second; } txn_producer_makeTestMessages( producer, /* producer */ topic, /* topic */ testidstr, /* testid */ atoi(cmd[1].c_str()), /* partition */ (int)strtol(cmd[2].c_str(), NULL, 0), /* idStart */ atoi(cmd[3].c_str()), /* msg count */ txntype, /* TransactionType */ !cmd[5].compare("DoFlush") /* Flush */); } else { Test::Fail("Unknown command: " + cmd[0]); } } delete conf; for (std::map::iterator it = producers.begin(); it != producers.end(); it++) delete it->second; } static void do_test_consumer_txn_test(bool use_java_producer) { std::string errstr; std::string topic_name; RdKafka::KafkaConsumer *c; std::vector msgs; std::string testidstr = test_str_id_generate_tmp(); std::string bootstrap_servers = get_bootstrap_servers(); Test::Say(tostr() << _C_BLU "[ Consumer transaction tests using " << (use_java_producer ? "java" : "librdkafka") << " producer with testid " << testidstr << "]\n" _C_CLR); #define run_producer(CMDS...) \ do { \ const char *_cmds[] = {CMDS}; \ size_t _cmd_cnt = sizeof(_cmds) / sizeof(*_cmds); \ if (use_java_producer) \ execute_java_produce_cli(bootstrap_servers, topic_name, testidstr, \ _cmds, _cmd_cnt); \ else \ txn_producer(bootstrap_servers, topic_name, testidstr, _cmds, _cmd_cnt); \ } while (0) if (test_quick) { Test::Say("Skipping consumer_txn tests 0->4 due to quick mode\n"); goto test5; } Test::Say(_C_BLU "Test 0 - basic commit + abort\n" _C_CLR); topic_name = Test::mk_topic_name("0098-consumer_txn-0", 1); c = create_consumer(topic_name, "READ_COMMITTED"); Test::create_topic(c, topic_name.c_str(), 1, 3); run_producer("producer1, -1, 0x0, 5, BeginCommit, DoFlush", "producer1, -1, 0x10, 5, BeginAbort, DoFlush"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 5, "Consumed unexpected number of messages. " "Expected 5, got: %d", (int)msgs.size()); TEST_ASSERT(msgs[0]->key_len() >= 1 && 0 == msgs[0]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[4]->key_len() >= 1 && 4 == msgs[4]->key()->c_str()[0], "Unexpected key"); delete_messages(msgs); c->close(); delete c; #define expect_msgcnt(msgcnt) \ TEST_ASSERT(msgs.size() == msgcnt, "Expected %d messages, got %d", \ (int)msgs.size(), msgcnt) #define expect_key(msgidx, value) \ do { \ TEST_ASSERT(msgs.size() > msgidx, \ "Expected at least %d message(s), only got %d", msgidx + 1, \ (int)msgs.size()); \ TEST_ASSERT(msgs[msgidx]->key_len() == 1, \ "Expected msg #%d key to be of size 1, not %d\n", msgidx, \ (int)msgs[msgidx]->key_len()); \ TEST_ASSERT(value == (int)msgs[msgidx]->key()->c_str()[0], \ "Expected msg #%d key 0x%x, not 0x%x", msgidx, value, \ (int)msgs[msgidx]->key()->c_str()[0]); \ } while (0) c = create_consumer(topic_name, "READ_UNCOMMITTED"); msgs = consume_messages(c, topic_name, 0); expect_msgcnt(10); expect_key(0, 0x0); expect_key(4, 0x4); expect_key(5, 0x10); expect_key(9, 0x14); delete_messages(msgs); Test::delete_topic(c, topic_name.c_str()); c->close(); delete c; Test::Say(_C_BLU "Test 0.1\n" _C_CLR); topic_name = Test::mk_topic_name("0098-consumer_txn-0.1", 1); c = create_consumer(topic_name, "READ_COMMITTED"); Test::create_topic(c, topic_name.c_str(), 1, 3); run_producer("producer1, -1, 0x0, 5, BeginCommit, DontFlush", "producer1, -1, 0x10, 5, BeginAbort, DoFlush"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 5, "Consumed unexpected number of messages. " "Expected 5, got: %d", (int)msgs.size()); TEST_ASSERT(msgs[0]->key_len() >= 1 && 0 == msgs[0]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[4]->key_len() >= 1 && 4 == msgs[4]->key()->c_str()[0], "Unexpected key"); delete_messages(msgs); c->close(); delete c; c = create_consumer(topic_name, "READ_UNCOMMITTED"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 10, "Consumed unexpected number of messages. " "Expected 10, got: %d", (int)msgs.size()); TEST_ASSERT(msgs[0]->key_len() >= 1 && 0 == msgs[0]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[4]->key_len() >= 1 && 4 == msgs[4]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x10 == msgs[5]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x14 == msgs[9]->key()->c_str()[0], "Unexpected key"); delete_messages(msgs); Test::delete_topic(c, topic_name.c_str()); c->close(); delete c; Test::Say(_C_BLU "Test 0.2\n" _C_CLR); topic_name = Test::mk_topic_name("0098-consumer_txn-0.2", 1); c = create_consumer(topic_name, "READ_COMMITTED"); Test::create_topic(c, topic_name.c_str(), 1, 3); run_producer("producer1, -1, 0x10, 5, BeginAbort, DoFlush", "producer1, -1, 0x30, 5, BeginCommit, DoFlush"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 5, "Consumed unexpected number of messages. " "Expected 5, got: %d", (int)msgs.size()); TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x30 == msgs[0]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x34 == msgs[4]->key()->c_str()[0], "Unexpected key"); delete_messages(msgs); c->close(); delete c; c = create_consumer(topic_name, "READ_UNCOMMITTED"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 10, "Consumed unexpected number of messages. " "Expected 10, got: %d", (int)msgs.size()); TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x10 == msgs[0]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x14 == msgs[4]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x30 == msgs[5]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x34 == msgs[9]->key()->c_str()[0], "Unexpected key"); delete_messages(msgs); Test::delete_topic(c, topic_name.c_str()); c->close(); delete c; Test::Say(_C_BLU "Test 1 - mixed with non-transactional.\n" _C_CLR); topic_name = Test::mk_topic_name("0098-consumer_txn-1", 1); c = create_consumer(topic_name, "READ_COMMITTED"); Test::create_topic(c, topic_name.c_str(), 1, 3); TestEventCb::topic = topic_name; run_producer("producer3, -1, 0x10, 5, None, DoFlush", "producer1, -1, 0x50, 5, BeginCommit, DoFlush", "producer1, -1, 0x80, 5, BeginAbort, DoFlush"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(TestEventCb::partition_0_ls_offset != -1 && TestEventCb::partition_0_ls_offset == TestEventCb::partition_0_hi_offset, "Expected hi_offset to equal ls_offset but " "got hi_offset: %" PRId64 ", ls_offset: %" PRId64, TestEventCb::partition_0_hi_offset, TestEventCb::partition_0_ls_offset); TEST_ASSERT(msgs.size() == 10, "Consumed unexpected number of messages. " "Expected 10, got: %d", (int)msgs.size()); TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x10 == msgs[0]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x14 == msgs[4]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x50 == msgs[5]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x54 == msgs[9]->key()->c_str()[0], "Unexpected key"); delete_messages(msgs); Test::delete_topic(c, topic_name.c_str()); c->close(); delete c; Test::Say(_C_BLU "Test 1.1\n" _C_CLR); topic_name = Test::mk_topic_name("0098-consumer_txn-1.1", 1); c = create_consumer(topic_name, "READ_COMMITTED"); Test::create_topic(c, topic_name.c_str(), 1, 3); run_producer("producer1, -1, 0x30, 5, BeginAbort, DoFlush", "producer3, -1, 0x40, 5, None, DoFlush", "producer1, -1, 0x60, 5, BeginCommit, DoFlush"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 10, "Consumed unexpected number of messages. " "Expected 10, got: %d", (int)msgs.size()); TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x40 == msgs[0]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x44 == msgs[4]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x60 == msgs[5]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x64 == msgs[9]->key()->c_str()[0], "Unexpected key"); delete_messages(msgs); Test::delete_topic(c, topic_name.c_str()); c->close(); delete c; Test::Say(_C_BLU "Test 1.2\n" _C_CLR); topic_name = Test::mk_topic_name("0098-consumer_txn-1.2", 1); c = create_consumer(topic_name, "READ_COMMITTED"); Test::create_topic(c, topic_name.c_str(), 1, 3); run_producer("producer1, -1, 0x10, 5, BeginCommit, DoFlush", "producer1, -1, 0x20, 5, BeginAbort, DoFlush", "producer3, -1, 0x30, 5, None, DoFlush"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 10, "Consumed unexpected number of messages. " "Expected 10, got: %d", (int)msgs.size()); TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x10 == msgs[0]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x14 == msgs[4]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x30 == msgs[5]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x34 == msgs[9]->key()->c_str()[0], "Unexpected key"); delete_messages(msgs); Test::delete_topic(c, topic_name.c_str()); c->close(); delete c; Test::Say(_C_BLU "Test 2 - rapid abort / committing.\n" _C_CLR); // note: aborted records never seem to make it to the broker when not flushed. topic_name = Test::mk_topic_name("0098-consumer_txn-2", 1); c = create_consumer(topic_name, "READ_COMMITTED"); Test::create_topic(c, topic_name.c_str(), 1, 3); run_producer("producer1, -1, 0x10, 1, BeginAbort, DontFlush", "producer1, -1, 0x20, 1, BeginCommit, DontFlush", "producer1, -1, 0x30, 1, BeginAbort, DontFlush", "producer1, -1, 0x40, 1, BeginCommit, DontFlush", "producer1, -1, 0x50, 1, BeginAbort, DontFlush", "producer1, -1, 0x60, 1, BeginCommit, DontFlush", "producer1, -1, 0x70, 1, BeginAbort, DontFlush", "producer1, -1, 0x80, 1, BeginCommit, DontFlush", "producer1, -1, 0x90, 1, BeginAbort, DontFlush", "producer1, -1, 0xa0, 1, BeginCommit, DoFlush", "producer3, -1, 0xb0, 1, None, DontFlush", "producer3, -1, 0xc0, 1, None, DoFlush"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 7, "Consumed unexpected number of messages. " "Expected 7, got: %d", (int)msgs.size()); TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x20 == (unsigned char)msgs[0]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[1]->key_len() >= 1 && 0x40 == (unsigned char)msgs[1]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[2]->key_len() >= 1 && 0x60 == (unsigned char)msgs[2]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[3]->key_len() >= 1 && 0x80 == (unsigned char)msgs[3]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[4]->key_len() >= 1 && 0xa0 == (unsigned char)msgs[4]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[5]->key_len() >= 1 && 0xb0 == (unsigned char)msgs[5]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[6]->key_len() >= 1 && 0xc0 == (unsigned char)msgs[6]->key()->c_str()[0], "Unexpected key"); delete_messages(msgs); Test::delete_topic(c, topic_name.c_str()); c->close(); delete c; Test::Say(_C_BLU "Test 2.1\n" _C_CLR); topic_name = Test::mk_topic_name("0098-consumer_txn-2.1", 1); c = create_consumer(topic_name, "READ_COMMITTED"); Test::create_topic(c, topic_name.c_str(), 1, 3); run_producer("producer1, -1, 0x10, 1, BeginAbort, DoFlush", "producer1, -1, 0x20, 1, BeginCommit, DoFlush", "producer1, -1, 0x30, 1, BeginAbort, DoFlush", "producer1, -1, 0x40, 1, BeginCommit, DoFlush", "producer1, -1, 0x50, 1, BeginAbort, DoFlush", "producer1, -1, 0x60, 1, BeginCommit, DoFlush", "producer1, -1, 0x70, 1, BeginAbort, DoFlush", "producer1, -1, 0x80, 1, BeginCommit, DoFlush", "producer1, -1, 0x90, 1, BeginAbort, DoFlush", "producer1, -1, 0xa0, 1, BeginCommit, DoFlush", "producer3, -1, 0xb0, 1, None, DoFlush", "producer3, -1, 0xc0, 1, None, DoFlush"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 7, "Consumed unexpected number of messages. " "Expected 7, got: %d", (int)msgs.size()); TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x20 == (unsigned char)msgs[0]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[1]->key_len() >= 1 && 0x40 == (unsigned char)msgs[1]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[2]->key_len() >= 1 && 0x60 == (unsigned char)msgs[2]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[3]->key_len() >= 1 && 0x80 == (unsigned char)msgs[3]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[4]->key_len() >= 1 && 0xa0 == (unsigned char)msgs[4]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[5]->key_len() >= 1 && 0xb0 == (unsigned char)msgs[5]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[6]->key_len() >= 1 && 0xc0 == (unsigned char)msgs[6]->key()->c_str()[0], "Unexpected key"); delete_messages(msgs); c->close(); delete c; c = create_consumer(topic_name, "READ_UNCOMMITTED"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 12, "Consumed unexpected number of messages. " "Expected 12, got: %d", (int)msgs.size()); TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x10 == (unsigned char)msgs[0]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[1]->key_len() >= 1 && 0x20 == (unsigned char)msgs[1]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[2]->key_len() >= 1 && 0x30 == (unsigned char)msgs[2]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[3]->key_len() >= 1 && 0x40 == (unsigned char)msgs[3]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x50 == (unsigned char)msgs[4]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x60 == (unsigned char)msgs[5]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[6]->key_len() >= 1 && 0x70 == (unsigned char)msgs[6]->key()->c_str()[0], "Unexpected key"); delete_messages(msgs); Test::delete_topic(c, topic_name.c_str()); c->close(); delete c; Test::Say(_C_BLU "Test 3 - cross partition (simple).\n" _C_CLR); topic_name = Test::mk_topic_name("0098-consumer_txn-3", 1); c = create_consumer(topic_name, "READ_COMMITTED"); Test::create_topic(c, topic_name.c_str(), 2, 3); run_producer("producer1, 0, 0x10, 3, BeginOpen, DoFlush", "producer1, 1, 0x20, 3, ContinueOpen, DoFlush", "producer1, 0, 0x30, 3, ContinueCommit, DoFlush"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 6, "Consumed unexpected number of messages. " "Expected 6, got: %d", (int)msgs.size()); delete_messages(msgs); msgs = consume_messages(c, topic_name, 1); TEST_ASSERT(msgs.size() == 3, "Consumed unexpected number of messages. " "Expected 3, got: %d", (int)msgs.size()); delete_messages(msgs); c->close(); delete c; c = create_consumer(topic_name, "READ_UNCOMMITTED"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 6, "Consumed unexpected number of messages. " "Expected 6, got: %d", (int)msgs.size()); delete_messages(msgs); msgs = consume_messages(c, topic_name, 1); TEST_ASSERT(msgs.size() == 3, "Consumed unexpected number of messages. " "Expected 3, got: %d", (int)msgs.size()); delete_messages(msgs); Test::delete_topic(c, topic_name.c_str()); c->close(); delete c; Test::Say(_C_BLU "Test 3.1\n" _C_CLR); topic_name = Test::mk_topic_name("0098-consumer_txn-3.1", 1); c = create_consumer(topic_name, "READ_COMMITTED"); Test::create_topic(c, topic_name.c_str(), 2, 3); run_producer("producer1, 0, 0x55, 1, BeginCommit, DoFlush", "producer1, 0, 0x10, 3, BeginOpen, DoFlush", "producer1, 1, 0x20, 3, ContinueOpen, DoFlush", "producer1, 0, 0x30, 3, ContinueAbort, DoFlush", "producer3, 0, 0x00, 1, None, DoFlush", "producer1, 1, 0x44, 1, BeginCommit, DoFlush"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 2, "Consumed unexpected number of messages. " "Expected 2, got: %d", (int)msgs.size()); TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x55 == (unsigned char)msgs[0]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[1]->key_len() >= 1 && 0x00 == (unsigned char)msgs[1]->key()->c_str()[0], "Unexpected key"); delete_messages(msgs); msgs = consume_messages(c, topic_name, 1); TEST_ASSERT(msgs.size() == 1, "Consumed unexpected number of messages. " "Expected 1, got: %d", (int)msgs.size()); TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x44 == (unsigned char)msgs[0]->key()->c_str()[0], "Unexpected key"); delete_messages(msgs); Test::delete_topic(c, topic_name.c_str()); c->close(); delete c; Test::Say(_C_BLU "Test 4 - simultaneous transactions (simple).\n" _C_CLR); topic_name = Test::mk_topic_name("0098-consumer_txn-4", 1); c = create_consumer(topic_name, "READ_COMMITTED"); Test::create_topic(c, topic_name.c_str(), 1, 3); run_producer("producer3, 0, 0x10, 1, None, DoFlush", "producer1, 0, 0x20, 3, BeginOpen, DoFlush", "producer2, 0, 0x30, 3, BeginOpen, DoFlush", "producer1, 0, 0x40, 3, ContinueCommit, DoFlush", "producer2, 0, 0x50, 3, ContinueAbort, DoFlush"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 7, "Consumed unexpected number of messages. " "Expected 7, got: %d", (int)msgs.size()); delete_messages(msgs); c->close(); delete c; c = create_consumer(topic_name, "READ_UNCOMMITTED"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 13, "Consumed unexpected number of messages. " "Expected 13, got: %d", (int)msgs.size()); delete_messages(msgs); Test::delete_topic(c, topic_name.c_str()); c->close(); delete c; Test::Say(_C_BLU "Test 4.1\n" _C_CLR); topic_name = Test::mk_topic_name("0098-consumer_txn-4.1", 1); c = create_consumer(topic_name, "READ_COMMITTED"); Test::create_topic(c, topic_name.c_str(), 1, 3); run_producer("producer3, 0, 0x10, 1, None, DoFlush", "producer1, 0, 0x20, 3, BeginOpen, DoFlush", "producer2, 0, 0x30, 3, BeginOpen, DoFlush", "producer1, 0, 0x40, 3, ContinueAbort, DoFlush", "producer2, 0, 0x50, 3, ContinueCommit, DoFlush"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 7, "Consumed unexpected number of messages. " "Expected 7, got: %d", (int)msgs.size()); delete_messages(msgs); c->close(); delete c; c = create_consumer(topic_name, "READ_UNCOMMITTED"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 13, "Consumed unexpected number of messages. " "Expected 13, got: %d", (int)msgs.size()); delete_messages(msgs); Test::delete_topic(c, topic_name.c_str()); c->close(); delete c; Test::Say(_C_BLU "Test 4.2\n" _C_CLR); topic_name = Test::mk_topic_name("0098-consumer_txn-4.2", 1); c = create_consumer(topic_name, "READ_COMMITTED"); Test::create_topic(c, topic_name.c_str(), 1, 3); run_producer("producer3, 0, 0x10, 1, None, DoFlush", "producer1, 0, 0x20, 3, BeginOpen, DoFlush", "producer2, 0, 0x30, 3, BeginOpen, DoFlush", "producer1, 0, 0x40, 3, ContinueCommit, DoFlush", "producer2, 0, 0x50, 3, ContinueCommit, DoFlush"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 13, "Consumed unexpected number of messages. " "Expected 7, got: %d", (int)msgs.size()); delete_messages(msgs); c->close(); delete c; c = create_consumer(topic_name, "READ_UNCOMMITTED"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 13, "Consumed unexpected number of messages. " "Expected 13, got: %d", (int)msgs.size()); delete_messages(msgs); Test::delete_topic(c, topic_name.c_str()); c->close(); delete c; Test::Say(_C_BLU "Test 4.3\n" _C_CLR); topic_name = Test::mk_topic_name("0098-consumer_txn-4.3", 1); c = create_consumer(topic_name, "READ_COMMITTED"); Test::create_topic(c, topic_name.c_str(), 1, 3); run_producer("producer3, 0, 0x10, 1, None, DoFlush", "producer1, 0, 0x20, 3, BeginOpen, DoFlush", "producer2, 0, 0x30, 3, BeginOpen, DoFlush", "producer1, 0, 0x40, 3, ContinueAbort, DoFlush", "producer2, 0, 0x50, 3, ContinueAbort, DoFlush"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 1, "Consumed unexpected number of messages. " "Expected 7, got: %d", (int)msgs.size()); delete_messages(msgs); c->close(); delete c; c = create_consumer(topic_name, "READ_UNCOMMITTED"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 13, "Consumed unexpected number of messages. " "Expected 13, got: %d", (int)msgs.size()); delete_messages(msgs); Test::delete_topic(c, topic_name.c_str()); c->close(); delete c; Test::Say(_C_BLU "Test 5 - split transaction across message sets.\n" _C_CLR); test5: topic_name = Test::mk_topic_name("0098-consumer_txn-5", 1); c = create_consumer(topic_name, "READ_COMMITTED"); Test::create_topic(c, topic_name.c_str(), 1, 3); run_producer("producer1, 0, 0x10, 2, BeginOpen, DontFlush", "sleep,200", "producer1, 0, 0x20, 2, ContinueAbort, DontFlush", "producer1, 0, 0x30, 2, BeginOpen, DontFlush", "sleep,200", "producer1, 0, 0x40, 2, ContinueCommit, DontFlush", "producer1, 0, 0x50, 2, BeginOpen, DontFlush", "sleep,200", "producer1, 0, 0x60, 2, ContinueAbort, DontFlush", "producer1, 0, 0xa0, 2, BeginOpen, DontFlush", "sleep,200", "producer1, 0, 0xb0, 2, ContinueCommit, DontFlush", "producer3, 0, 0x70, 1, None, DoFlush"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 9, "Consumed unexpected number of messages. " "Expected 9, got: %d", (int)msgs.size()); TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x30 == (unsigned char)msgs[0]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[1]->key_len() >= 1 && 0x31 == (unsigned char)msgs[1]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[2]->key_len() >= 1 && 0x40 == (unsigned char)msgs[2]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[3]->key_len() >= 1 && 0x41 == (unsigned char)msgs[3]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[4]->key_len() >= 1 && 0xa0 == (unsigned char)msgs[4]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[5]->key_len() >= 1 && 0xa1 == (unsigned char)msgs[5]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[6]->key_len() >= 1 && 0xb0 == (unsigned char)msgs[6]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[7]->key_len() >= 1 && 0xb1 == (unsigned char)msgs[7]->key()->c_str()[0], "Unexpected key"); TEST_ASSERT(msgs[8]->key_len() >= 1 && 0x70 == (unsigned char)msgs[8]->key()->c_str()[0], "Unexpected key"); delete_messages(msgs); Test::delete_topic(c, topic_name.c_str()); c->close(); delete c; Test::Say(_C_BLU "Test 6 - transaction left open\n" _C_CLR); topic_name = Test::mk_topic_name("0098-consumer_txn-0", 1); c = create_consumer(topic_name, "READ_COMMITTED"); Test::create_topic(c, topic_name.c_str(), 1, 3); TestEventCb::topic = topic_name; run_producer("producer3, 0, 0x10, 1, None, DoFlush", "producer1, 0, 0x20, 3, BeginOpen, DoFlush", // prevent abort control message from being written. "exit,0"); msgs = consume_messages(c, topic_name, 0); TEST_ASSERT(msgs.size() == 1, "Consumed unexpected number of messages. " "Expected 1, got: %d", (int)msgs.size()); TEST_ASSERT(TestEventCb::partition_0_ls_offset + 3 == TestEventCb::partition_0_hi_offset, "Expected hi_offset to be 3 greater than ls_offset " "but got hi_offset: %" PRId64 ", ls_offset: %" PRId64, TestEventCb::partition_0_hi_offset, TestEventCb::partition_0_ls_offset); delete_messages(msgs); Test::delete_topic(c, topic_name.c_str()); c->close(); delete c; } #endif extern "C" { int main_0098_consumer_txn(int argc, char **argv) { if (test_needs_auth()) { Test::Skip( "Authentication or security configuration " "required on client: not supported in " "Java transactional producer: skipping tests\n"); return 0; } #if WITH_RAPIDJSON do_test_consumer_txn_test(true /* with java producer */); do_test_consumer_txn_test(false /* with librdkafka producer */); #else Test::Skip("RapidJSON >=1.1.0 not available\n"); #endif return 0; } }