/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * Apache Kafka consumer & producer performance tester
 * using the Kafka driver from librdkafka
 * (https://github.com/edenhill/librdkafka)
 */

#define _GNU_SOURCE /* for strndup() */
#include <ctype.h>   /* for isspace() */
#include <signal.h>  /* for signal() */
#include <string.h>  /* for strlen(), memcpy(), strstr() */
#include <unistd.h>  /* for getopt(), usleep() */

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" /* for Kafka driver */
/* Do not include these defines from your program, they will not be
 * provided by librdkafka.
*/ #include "rd.h" #include "rdtime.h" static int run = 1; static int forever = 1; static int dispintvl = 1000; static int do_seq = 0; static int exit_after = 0; static int exit_eof = 0; static FILE *stats_fp; static int dr_disp_div; static int verbosity = 1; static int latency_mode = 0; static int report_offset = 0; static FILE *latency_fp = NULL; static int msgcnt = -1; static int incremental_mode = 0; static int partition_cnt = 0; static int eof_cnt = 0; static int with_dr = 1; static void stop (int sig) { if (!run) exit(0); run = 0; } static long int msgs_wait_cnt = 0; static long int msgs_wait_produce_cnt = 0; static rd_ts_t t_end; static rd_kafka_t *global_rk; struct avg { int64_t val; int cnt; uint64_t ts_start; }; static struct { rd_ts_t t_start; rd_ts_t t_end; rd_ts_t t_end_send; uint64_t msgs; uint64_t msgs_last; uint64_t msgs_dr_ok; uint64_t msgs_dr_err; uint64_t bytes_dr_ok; uint64_t bytes; uint64_t bytes_last; uint64_t tx; uint64_t tx_err; uint64_t avg_rtt; uint64_t offset; rd_ts_t t_fetch_latency; rd_ts_t t_last; rd_ts_t t_enobufs_last; rd_ts_t t_total; rd_ts_t latency_last; rd_ts_t latency_lo; rd_ts_t latency_hi; rd_ts_t latency_sum; int latency_cnt; int64_t last_offset; } cnt = {}; /* Returns wall clock time in microseconds */ uint64_t wall_clock (void) { struct timeval tv; gettimeofday(&tv, NULL); return ((uint64_t)tv.tv_sec * 1000000LLU) + ((uint64_t)tv.tv_usec); } static void err_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) { printf("%% ERROR CALLBACK: %s: %s: %s\n", rd_kafka_name(rk), rd_kafka_err2str(err), reason); } static void throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque) { printf("%% THROTTLED %dms by %s (%"PRId32")\n", throttle_time_ms, broker_name, broker_id); } static void msg_delivered (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { static rd_ts_t last; rd_ts_t now = rd_clock(); static int msgs; msgs++; msgs_wait_cnt--; if (rkmessage->err) cnt.msgs_dr_err++; else { cnt.msgs_dr_ok++; cnt.bytes_dr_ok += rkmessage->len; } if ((rkmessage->err && (cnt.msgs_dr_err < 50 || !(cnt.msgs_dr_err % (dispintvl / 1000)))) || !last || msgs_wait_cnt < 5 || !(msgs_wait_cnt % dr_disp_div) || (int)(now - last) >= dispintvl * 1000 || verbosity >= 3) { if (rkmessage->err && verbosity >= 2) printf("%% Message delivery failed: %s [%"PRId32"]: " "%s (%li remain)\n", rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, rd_kafka_err2str(rkmessage->err), msgs_wait_cnt); else if (verbosity > 2) printf("%% Message delivered (offset %"PRId64"): " "%li remain\n", rkmessage->offset, msgs_wait_cnt); if (verbosity >= 3 && do_seq) printf(" --> \"%.*s\"\n", (int)rkmessage->len, (const char *)rkmessage->payload); last = now; } if (report_offset) cnt.last_offset = rkmessage->offset; if (msgs_wait_produce_cnt == 0 && msgs_wait_cnt == 0 && !forever) { if (verbosity >= 2) printf("All messages delivered!\n"); t_end = rd_clock(); run = 0; } if (exit_after && exit_after <= msgs) { printf("%% Hard exit after %i messages, as requested\n", exit_after); exit(0); } } static void msg_consume (rd_kafka_message_t *rkmessage, void *opaque) { if (rkmessage->err) { if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { cnt.offset = rkmessage->offset; if (verbosity >= 1) printf("%% Consumer reached end of " "%s [%"PRId32"] " "message queue at offset %"PRId64"\n", rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, rkmessage->offset); if (exit_eof && ++eof_cnt == partition_cnt) run = 0; return; } 
printf("%% Consume error for topic \"%s\" [%"PRId32"] " "offset %"PRId64": %s\n", rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt):"", rkmessage->partition, rkmessage->offset, rd_kafka_message_errstr(rkmessage)); if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION || rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC) run = 0; cnt.msgs_dr_err++; return; } /* Start measuring from first message received */ if (!cnt.t_start) cnt.t_start = cnt.t_last = rd_clock(); cnt.offset = rkmessage->offset; cnt.msgs++; cnt.bytes += rkmessage->len; if (verbosity >= 3 || (verbosity >= 2 && !(cnt.msgs % 1000000))) printf("@%"PRId64": %.*s: %.*s\n", rkmessage->offset, (int)rkmessage->key_len, (char *)rkmessage->key, (int)rkmessage->len, (char *)rkmessage->payload); if (latency_mode) { int64_t remote_ts, ts; if (rkmessage->len > 8 && !memcmp(rkmessage->payload, "LATENCY:", 8) && sscanf(rkmessage->payload, "LATENCY:%"SCNd64, &remote_ts) == 1) { ts = wall_clock() - remote_ts; if (ts > 0 && ts < (1000000 * 60 * 5)) { if (ts > cnt.latency_hi) cnt.latency_hi = ts; if (!cnt.latency_lo || ts < cnt.latency_lo) cnt.latency_lo = ts; cnt.latency_last = ts; cnt.latency_cnt++; cnt.latency_sum += ts; if (latency_fp) fprintf(latency_fp, "%"PRIu64"\n", ts); } else { if (verbosity >= 1) printf("Received latency timestamp is too far off: %"PRId64"us (message offset %"PRId64"): ignored\n", ts, rkmessage->offset); } } else if (verbosity > 1) printf("not a LATENCY payload: %.*s\n", (int)rkmessage->len, (char *)rkmessage->payload); } if (msgcnt != -1 && (int)cnt.msgs >= msgcnt) run = 0; } static void rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque) { switch (err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: fprintf(stderr, "%% Group rebalanced: %d partition(s) assigned\n", partitions->cnt); eof_cnt = 0; partition_cnt = partitions->cnt; rd_kafka_assign(rk, partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: fprintf(stderr, "%% Group rebalanced: %d partition(s) revoked\n", partitions->cnt); eof_cnt = 0; partition_cnt = 0; rd_kafka_assign(rk, NULL); break; default: break; } } /** * Find and extract single value from a two-level search. * First find 'field1', then find 'field2' and extract its value. * Returns 0 on miss else the value. 
*/ static uint64_t json_parse_fields (const char *json, const char **end, const char *field1, const char *field2) { const char *t = json; const char *t2; int len1 = strlen(field1); int len2 = strlen(field2); while ((t2 = strstr(t, field1))) { uint64_t v; t = t2; t += len1; /* Find field */ if (!(t2 = strstr(t, field2))) continue; t2 += len2; while (isspace((int)*t2)) t2++; v = strtoull(t2, (char **)&t, 10); if (t2 == t) continue; *end = t; return v; } *end = t + strlen(t); return 0; } /** * Parse various values from rdkafka stats */ static void json_parse_stats (const char *json) { const char *t; const int max_avgs = 100; /* max number of brokers to scan for rtt */ uint64_t avg_rtt[max_avgs+1]; int avg_rtt_i = 0; /* Store totals at end of array */ avg_rtt[max_avgs] = 0; /* Extract all broker RTTs */ t = json; while (avg_rtt_i < max_avgs && *t) { avg_rtt[avg_rtt_i] = json_parse_fields(t, &t, "\"rtt\":", "\"avg\":"); /* Skip low RTT values, means no messages are passing */ if (avg_rtt[avg_rtt_i] < 100 /*0.1ms*/) continue; avg_rtt[max_avgs] += avg_rtt[avg_rtt_i]; avg_rtt_i++; } if (avg_rtt_i > 0) avg_rtt[max_avgs] /= avg_rtt_i; cnt.avg_rtt = avg_rtt[max_avgs]; } static int stats_cb (rd_kafka_t *rk, char *json, size_t json_len, void *opaque) { /* Extract values for our own stats */ json_parse_stats(json); if (stats_fp) fprintf(stats_fp, "%s\n", json); return 0; } #define _OTYPE_TAB 0x1 /* tabular format */ #define _OTYPE_SUMMARY 0x2 /* summary format */ #define _OTYPE_FORCE 0x4 /* force output regardless of interval timing */ static void print_stats (rd_kafka_t *rk, int mode, int otype, const char *compression) { rd_ts_t now = rd_clock(); rd_ts_t t_total; static int rows_written = 0; int print_header; char extra[512]; int extra_of = 0; *extra = '\0'; #define EXTRA_PRINTF(fmt...) do { \ if (extra_of < sizeof(extra)) \ extra_of += snprintf(extra+extra_of, \ sizeof(extra)-extra_of, fmt); \ } while (0) if (!(otype & _OTYPE_FORCE) && (((otype & _OTYPE_SUMMARY) && verbosity == 0) || cnt.t_last + dispintvl > now)) return; print_header = !rows_written ||(verbosity > 0 && !(rows_written % 20)); if (cnt.t_end_send) t_total = cnt.t_end_send - cnt.t_start; else if (cnt.t_end) t_total = cnt.t_end - cnt.t_start; else if (cnt.t_start) t_total = now - cnt.t_start; else t_total = 1; if (mode == 'P') { if (otype & _OTYPE_TAB) { #define ROW_START() do {} while (0) #define COL_HDR(NAME) printf("| %10.10s ", (NAME)) #define COL_PR64(NAME,VAL) printf("| %10"PRIu64" ", (VAL)) #define COL_PRF(NAME,VAL) printf("| %10.2f ", (VAL)) #define ROW_END() do { \ printf("\n"); \ rows_written++; \ } while (0) if (print_header) { /* First time, print header */ ROW_START(); COL_HDR("elapsed"); COL_HDR("msgs"); COL_HDR("bytes"); COL_HDR("rtt"); COL_HDR("dr"); COL_HDR("dr_m/s"); COL_HDR("dr_MB/s"); COL_HDR("dr_err"); COL_HDR("tx_err"); COL_HDR("outq"); if (report_offset) COL_HDR("offset"); ROW_END(); } ROW_START(); COL_PR64("elapsed", t_total / 1000); COL_PR64("msgs", cnt.msgs); COL_PR64("bytes", cnt.bytes); COL_PR64("rtt", cnt.avg_rtt / 1000); COL_PR64("dr", cnt.msgs_dr_ok); COL_PR64("dr_m/s", ((cnt.msgs_dr_ok * 1000000) / t_total)); COL_PRF("dr_MB/s", (float)((cnt.bytes_dr_ok) / (float)t_total)); COL_PR64("dr_err", cnt.msgs_dr_err); COL_PR64("tx_err", cnt.tx_err); COL_PR64("outq", rk ? 
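                        /* outq: messages still in librdkafka's output
                         * queue, i.e. queued or in-flight but not yet
                         * delivery-reported */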
(uint64_t)rd_kafka_outq_len(rk) : 0); if (report_offset) COL_PR64("offset", (uint64_t)cnt.last_offset); ROW_END(); } if (otype & _OTYPE_SUMMARY) { printf("%% %"PRIu64" messages produced " "(%"PRIu64" bytes), " "%"PRIu64" delivered " "(offset %"PRId64", %"PRIu64" failed) " "in %"PRIu64"ms: %"PRIu64" msgs/s and " "%.02f Mb/s, " "%"PRIu64" produce failures, %i in queue, " "%s compression\n", cnt.msgs, cnt.bytes, cnt.msgs_dr_ok, cnt.last_offset, cnt.msgs_dr_err, t_total / 1000, ((cnt.msgs_dr_ok * 1000000) / t_total), (float)((cnt.bytes_dr_ok) / (float)t_total), cnt.tx_err, rk ? rd_kafka_outq_len(rk) : 0, compression); } } else { float latency_avg = 0.0f; if (latency_mode && cnt.latency_cnt) latency_avg = (double)cnt.latency_sum / (float)cnt.latency_cnt; if (otype & _OTYPE_TAB) { if (print_header) { /* First time, print header */ ROW_START(); COL_HDR("elapsed"); COL_HDR("msgs"); COL_HDR("bytes"); COL_HDR("rtt"); COL_HDR("m/s"); COL_HDR("MB/s"); COL_HDR("rx_err"); COL_HDR("offset"); if (latency_mode) { COL_HDR("lat_curr"); COL_HDR("lat_avg"); COL_HDR("lat_lo"); COL_HDR("lat_hi"); } ROW_END(); } ROW_START(); COL_PR64("elapsed", t_total / 1000); COL_PR64("msgs", cnt.msgs); COL_PR64("bytes", cnt.bytes); COL_PR64("rtt", cnt.avg_rtt / 1000); COL_PR64("m/s", ((cnt.msgs * 1000000) / t_total)); COL_PRF("MB/s", (float)((cnt.bytes) / (float)t_total)); COL_PR64("rx_err", cnt.msgs_dr_err); COL_PR64("offset", cnt.offset); if (latency_mode) { COL_PRF("lat_curr", cnt.latency_last / 1000.0f); COL_PRF("lat_avg", latency_avg / 1000.0f); COL_PRF("lat_lo", cnt.latency_lo / 1000.0f); COL_PRF("lat_hi", cnt.latency_hi / 1000.0f); } ROW_END(); } if (otype & _OTYPE_SUMMARY) { if (latency_avg >= 1.0f) extra_of += snprintf(extra+extra_of, sizeof(extra)-extra_of, ", latency " "curr/avg/lo/hi " "%.2f/%.2f/%.2f/%.2fms", cnt.latency_last / 1000.0f, latency_avg / 1000.0f, cnt.latency_lo / 1000.0f, cnt.latency_hi / 1000.0f) ; printf("%% %"PRIu64" messages (%"PRIu64" bytes) " "consumed in %"PRIu64"ms: %"PRIu64" msgs/s " "(%.02f Mb/s)" "%s\n", cnt.msgs, cnt.bytes, t_total / 1000, ((cnt.msgs * 1000000) / t_total), (float)((cnt.bytes) / (float)t_total), extra); } if (incremental_mode && now > cnt.t_last) { uint64_t i_msgs = cnt.msgs - cnt.msgs_last; uint64_t i_bytes = cnt.bytes - cnt.bytes_last; uint64_t i_time = cnt.t_last ? 
now - cnt.t_last : 0; printf("%% INTERVAL: %"PRIu64" messages " "(%"PRIu64" bytes) " "consumed in %"PRIu64"ms: %"PRIu64" msgs/s " "(%.02f Mb/s)" "%s\n", i_msgs, i_bytes, i_time / 1000, ((i_msgs * 1000000) / i_time), (float)((i_bytes) / (float)i_time), extra); } } cnt.t_last = now; cnt.msgs_last = cnt.msgs; cnt.bytes_last = cnt.bytes; } static void sig_usr1 (int sig) { rd_kafka_dump(stdout, global_rk); } int main (int argc, char **argv) { char *brokers = NULL; char mode = 'C'; char *topic = NULL; const char *key = NULL; int *partitions = NULL; int opt; int sendflags = 0; char *msgpattern = "librdkafka_performance testing!"; int msgsize = strlen(msgpattern); const char *debug = NULL; rd_ts_t now; char errstr[512]; uint64_t seq = 0; int seed = time(NULL); rd_kafka_t *rk; rd_kafka_topic_t *rkt; rd_kafka_conf_t *conf; rd_kafka_topic_conf_t *topic_conf; rd_kafka_queue_t *rkqu = NULL; const char *compression = "no"; int64_t start_offset = 0; int batch_size = 0; int idle = 0; const char *stats_cmd = NULL; char *stats_intvlstr = NULL; char tmp[128]; char *tmp2; int otype = _OTYPE_SUMMARY; double dtmp; int rate_sleep = 0; rd_kafka_topic_partition_list_t *topics; /* Kafka configuration */ conf = rd_kafka_conf_new(); rd_kafka_conf_set_error_cb(conf, err_cb); rd_kafka_conf_set_throttle_cb(conf, throttle_cb); /* Quick termination */ snprintf(tmp, sizeof(tmp), "%i", SIGIO); rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); /* Producer config */ rd_kafka_conf_set(conf, "queue.buffering.max.messages", "500000", NULL, 0); rd_kafka_conf_set(conf, "message.send.max.retries", "3", NULL, 0); rd_kafka_conf_set(conf, "retry.backoff.ms", "500", NULL, 0); /* Consumer config */ /* Tell rdkafka to (try to) maintain 1M messages * in its internal receive buffers. This is to avoid * application -> rdkafka -> broker per-message ping-pong * latency. * The larger the local queue, the higher the performance. * Try other values with: ... 
-X queued.min.messages=1000
 */
        rd_kafka_conf_set(conf, "queued.min.messages", "1000000", NULL, 0);
        rd_kafka_conf_set(conf, "session.timeout.ms", "6000", NULL, 0);

        /* Kafka topic configuration */
        topic_conf = rd_kafka_topic_conf_new();
        rd_kafka_topic_conf_set(topic_conf, "auto.offset.reset", "earliest",
                                NULL, 0);

        topics = rd_kafka_topic_partition_list_new(1);

        while ((opt = getopt(argc, argv,
                             "PCG:t:p:b:s:k:c:fi:MDd:m:S:x:"
                             "R:a:z:o:X:B:eT:Y:qvIur:lA:OwN")) != -1) {
                switch (opt) {
                case 'G':
                        if (rd_kafka_conf_set(conf, "group.id", optarg,
                                              errstr, sizeof(errstr)) !=
                            RD_KAFKA_CONF_OK) {
                                fprintf(stderr, "%% %s\n", errstr);
                                exit(1);
                        }
                        /* FALLTHRU */
                case 'P':
                case 'C':
                        mode = opt;
                        break;
                case 't':
                        rd_kafka_topic_partition_list_add(
                                topics, optarg, RD_KAFKA_PARTITION_UA);
                        break;
                case 'p':
                        /* Grow the partition array by one element and
                         * append the requested partition. */
                        partitions = realloc(partitions,
                                             sizeof(*partitions) *
                                             ++partition_cnt);
                        partitions[partition_cnt-1] = atoi(optarg);
                        break;
                case 'b':
                        brokers = optarg;
                        break;
                case 's':
                        msgsize = atoi(optarg);
                        break;
                case 'k':
                        key = optarg;
                        break;
                case 'c':
                        msgcnt = atoi(optarg);
                        break;
                case 'D':
                        sendflags |= RD_KAFKA_MSG_F_FREE;
                        break;
                case 'i':
                        dispintvl = atoi(optarg);
                        break;
                case 'm':
                        msgpattern = optarg;
                        break;
                case 'S':
                        seq = strtoull(optarg, NULL, 10);
                        do_seq = 1;
                        break;
                case 'x':
                        exit_after = atoi(optarg);
                        break;
                case 'R':
                        seed = atoi(optarg);
                        break;
                case 'a':
                        if (rd_kafka_topic_conf_set(topic_conf,
                                                    "request.required.acks",
                                                    optarg,
                                                    errstr, sizeof(errstr)) !=
                            RD_KAFKA_CONF_OK) {
                                fprintf(stderr, "%% %s\n", errstr);
                                exit(1);
                        }
                        break;
                case 'B':
                        batch_size = atoi(optarg);
                        break;
                case 'z':
                        if (rd_kafka_conf_set(conf, "compression.codec",
                                              optarg,
                                              errstr, sizeof(errstr)) !=
                            RD_KAFKA_CONF_OK) {
                                fprintf(stderr, "%% %s\n", errstr);
                                exit(1);
                        }
                        compression = optarg;
                        break;
                case 'o':
                        if (!strcmp(optarg, "end"))
                                start_offset = RD_KAFKA_OFFSET_END;
                        else if (!strcmp(optarg, "beginning"))
                                start_offset = RD_KAFKA_OFFSET_BEGINNING;
                        else if (!strcmp(optarg, "stored"))
                                start_offset = RD_KAFKA_OFFSET_STORED;
                        else {
                                start_offset = strtoll(optarg, NULL, 10);

                                if (start_offset < 0)
                                        start_offset = RD_KAFKA_OFFSET_TAIL(
                                                -start_offset);
                        }
                        break;
                case 'e':
                        exit_eof = 1;
                        break;
                case 'd':
                        debug = optarg;
                        break;
                case 'X':
                {
                        char *name, *val;
                        rd_kafka_conf_res_t res;

                        if (!strcmp(optarg, "list") ||
                            !strcmp(optarg, "help")) {
                                rd_kafka_conf_properties_show(stdout);
                                exit(0);
                        }

                        name = optarg;
                        if (!(val = strchr(name, '='))) {
                                fprintf(stderr, "%% Expected "
                                        "-X property=value, not %s\n", name);
                                exit(1);
                        }

                        *val = '\0';
                        val++;

                        res = RD_KAFKA_CONF_UNKNOWN;
                        /* Try "topic." prefixed properties on topic
                         * conf first, and then fall through to global if
                         * it didn't match a topic configuration property.
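                         *
                         * E.g. "-X topic.request.required.acks=1" is applied
                         * to topic_conf, while "-X queue.buffering.max.ms=100"
                         * falls through to the global conf.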
                         */
                        if (!strncmp(name, "topic.", strlen("topic.")))
                                res = rd_kafka_topic_conf_set(
                                        topic_conf,
                                        name + strlen("topic."),
                                        val, errstr, sizeof(errstr));

                        if (res == RD_KAFKA_CONF_UNKNOWN)
                                res = rd_kafka_conf_set(conf, name, val,
                                                        errstr,
                                                        sizeof(errstr));

                        if (res != RD_KAFKA_CONF_OK) {
                                fprintf(stderr, "%% %s\n", errstr);
                                exit(1);
                        }
                }
                break;

                case 'T':
                        stats_intvlstr = optarg;
                        break;
                case 'Y':
                        stats_cmd = optarg;
                        break;
                case 'q':
                        verbosity--;
                        break;
                case 'v':
                        verbosity++;
                        break;
                case 'I':
                        idle = 1;
                        break;
                case 'u':
                        otype = _OTYPE_TAB;
                        verbosity--; /* remove some fluff */
                        break;
                case 'r':
                        dtmp = strtod(optarg, &tmp2);
                        if (tmp2 == optarg ||
                            (dtmp >= -0.001 && dtmp <= 0.001)) {
                                fprintf(stderr, "%% Invalid rate: %s\n",
                                        optarg);
                                exit(1);
                        }
                        rate_sleep = (int)(1000000.0 / dtmp);
                        break;
                case 'l':
                        latency_mode = 1;
                        break;
                case 'A':
                        if (!(latency_fp = fopen(optarg, "w"))) {
                                fprintf(stderr, "%% Can't open %s: %s\n",
                                        optarg, strerror(errno));
                                exit(1);
                        }
                        break;
                case 'O':
                        if (rd_kafka_topic_conf_set(topic_conf,
                                                    "produce.offset.report",
                                                    "true",
                                                    errstr, sizeof(errstr)) !=
                            RD_KAFKA_CONF_OK) {
                                fprintf(stderr, "%% %s\n", errstr);
                                exit(1);
                        }
                        report_offset = 1;
                        break;
                case 'M':
                        incremental_mode = 1;
                        break;
                case 'N':
                        with_dr = 0;
                        break;
                default:
                        fprintf(stderr, "Unknown option: %c\n", opt);
                        goto usage;
                }
        }

        if (topics->cnt == 0 || optind != argc) {
                if (optind < argc)
                        fprintf(stderr, "Unknown argument: %s\n",
                                argv[optind]);
        usage:
                fprintf(stderr,
                        "Usage: %s [-C|-P] -t <topic> "
                        "[-p <partition>] [-b <broker,broker..>] [options..]\n"
                        "\n"
                        "librdkafka version %s (0x%08x)\n"
                        "\n"
                        " Options:\n"
                        "  -C | -P |    Consumer or Producer mode\n"
                        "  -G <groupid> High-level Kafka Consumer mode\n"
                        "  -t <topic>   Topic to consume / produce\n"
                        "  -p <num>     Partition (defaults to random). "
                        "Multiple partitions are allowed in -C consumer mode.\n"
                        "  -M           Print consumer interval stats\n"
                        "  -b <brokers> Broker address list (host[:port],..)\n"
                        "  -s <size>    Message size (producer)\n"
                        "  -k <key>     Message key (producer)\n"
                        "  -c <cnt>     Messages to transmit/receive\n"
                        "  -D           Copy/Duplicate data buffer (producer)\n"
                        "  -i <ms>      Display interval\n"
                        "  -m <msg>     Message payload pattern\n"
                        "  -S <start>   Send a sequence number starting at "
                        "<start> as payload\n"
                        "  -R <seed>    Random seed value (defaults to time)\n"
                        "  -a <acks>    Required acks (producer): "
                        "-1, 0, 1, >1\n"
                        "  -B <size>    Consume batch size (# of msgs)\n"
                        "  -z <codec>   Enable compression:\n"
                        "               none|gzip|snappy\n"
                        "  -o <offset>  Start offset (consumer)\n"
                        "               beginning, end, NNNNN or -NNNNN\n"
                        "  -d [facs..]  Enable debugging contexts:\n"
                        "               %s\n"
                        "  -X <prop=name> Set arbitrary librdkafka "
                        "configuration property\n"
                        "               Properties prefixed with \"topic.\" "
                        "will be set on topic object.\n"
                        "               Use '-X list' to see the full list\n"
                        "               of supported properties.\n"
                        "  -T <intvl>   Enable statistics from librdkafka at "
                        "specified interval (ms)\n"
                        "  -Y <command> Pipe statistics to <command>\n"
                        "  -I           Idle: don't produce any messages\n"
                        "  -q           Decrease verbosity\n"
                        "  -v           Increase verbosity (default 1)\n"
                        "  -u           Output stats in table format\n"
                        "  -r <rate>    Producer msg/s limit\n"
                        "  -l           Latency measurement.\n"
                        "               Needs two matching instances, one\n"
                        "               consumer and one producer, both\n"
                        "               running with the -l switch.\n"
                        "  -A <file>    Write per-message latency stats to "
                        "<file>. 
Requires -l\n" " -O Report produced offset (producer)\n" " -N No delivery reports (producer)\n" "\n" " In Consumer mode:\n" " consumes messages and prints thruput\n" " If -B <..> is supplied the batch consumer\n" " mode is used, else the callback mode is used.\n" "\n" " In Producer mode:\n" " writes messages of size -s <..> and prints thruput\n" "\n", argv[0], rd_kafka_version_str(), rd_kafka_version(), RD_KAFKA_DEBUG_CONTEXTS); exit(1); } dispintvl *= 1000; /* us */ if (verbosity > 1) printf("%% Using random seed %i, verbosity level %i\n", seed, verbosity); srand(seed); signal(SIGINT, stop); signal(SIGUSR1, sig_usr1); if (debug && rd_kafka_conf_set(conf, "debug", debug, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { printf("%% Debug configuration failed: %s: %s\n", errstr, debug); exit(1); } /* Always enable stats (for RTT extraction), and if user supplied * the -T option we let her take part of the stats aswell. */ rd_kafka_conf_set_stats_cb(conf, stats_cb); if (!stats_intvlstr) { /* if no user-desired stats, adjust stats interval * to the display interval. */ snprintf(tmp, sizeof(tmp), "%i", dispintvl / 1000); } if (rd_kafka_conf_set(conf, "statistics.interval.ms", stats_intvlstr ? stats_intvlstr : tmp, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%% %s\n", errstr); exit(1); } if (latency_mode) do_seq = 0; if (stats_intvlstr) { /* User enabled stats (-T) */ if (stats_cmd) { if (!(stats_fp = popen(stats_cmd, "we"))) { fprintf(stderr, "%% Failed to start stats command: " "%s: %s", stats_cmd, strerror(errno)); exit(1); } } else stats_fp = stdout; } if (msgcnt != -1) forever = 0; topic = topics->elems[0].topic; if (mode == 'P') { /* * Producer */ char *sbuf; char *pbuf; int outq; int keylen = key ? strlen(key) : 0; off_t rof = 0; size_t plen = strlen(msgpattern); int partition = partitions ? partitions[0] : RD_KAFKA_PARTITION_UA; if (latency_mode) { msgsize = strlen("LATENCY:") + strlen("18446744073709551615 ")+1; sendflags |= RD_KAFKA_MSG_F_COPY; } else if (do_seq) { int minlen = strlen("18446744073709551615 ")+1; if (msgsize < minlen) msgsize = minlen; /* Force duplication of payload */ sendflags |= RD_KAFKA_MSG_F_FREE; } sbuf = malloc(msgsize); /* Copy payload content to new buffer */ while (rof < msgsize) { size_t xlen = RD_MIN((size_t)msgsize-rof, plen); memcpy(sbuf+rof, msgpattern, xlen); rof += xlen; } if (msgcnt == -1) printf("%% Sending messages of size %i bytes\n", msgsize); else printf("%% Sending %i messages of size %i bytes\n", msgcnt, msgsize); if (with_dr) rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered); /* Create Kafka handle */ if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)))) { fprintf(stderr, "%% Failed to create Kafka producer: %s\n", errstr); exit(1); } global_rk = rk; if (debug) rd_kafka_set_log_level(rk, 7); /* Add broker(s) */ if (brokers && rd_kafka_brokers_add(rk, brokers) < 1) { fprintf(stderr, "%% No valid brokers specified\n"); exit(1); } /* Explicitly create topic to avoid per-msg lookups. */ rkt = rd_kafka_topic_new(rk, topic, topic_conf); if (rate_sleep && verbosity >= 2) fprintf(stderr, "%% Inter message rate limiter sleep %ius\n", rate_sleep); dr_disp_div = msgcnt / 50; if (dr_disp_div == 0) dr_disp_div = 10; cnt.t_start = cnt.t_last = rd_clock(); msgs_wait_produce_cnt = msgcnt; while (run && (msgcnt == -1 || (int)cnt.msgs < msgcnt)) { /* Send/Produce message. 
*/ if (idle) { rd_kafka_poll(rk, 1000); continue; } if (latency_mode) { snprintf(sbuf, msgsize-1, "LATENCY:%"PRIu64, wall_clock()); } else if (do_seq) { snprintf(sbuf, msgsize-1, "%"PRIu64": ", seq); seq++; } if (sendflags & RD_KAFKA_MSG_F_FREE) { /* Duplicate memory */ pbuf = malloc(msgsize); memcpy(pbuf, sbuf, msgsize); } else pbuf = sbuf; if (msgsize == 0) pbuf = NULL; cnt.tx++; while (run && rd_kafka_produce(rkt, partition, sendflags, pbuf, msgsize, key, keylen, NULL) == -1) { if (errno == ESRCH) printf("%% No such partition: " "%"PRId32"\n", partition); else if ((errno != ENOBUFS && verbosity >= 1) || verbosity >= 3) printf("%% produce error: %s%s\n", rd_kafka_err2str( rd_kafka_errno2err( errno)), errno == ENOBUFS ? " (backpressure)":""); cnt.tx_err++; if (errno != ENOBUFS) { run = 0; break; } now = rd_clock(); if (verbosity >= 2 && cnt.t_enobufs_last + dispintvl <= now) { printf("%% Backpressure %i " "(tx %"PRIu64", " "txerr %"PRIu64")\n", rd_kafka_outq_len(rk), cnt.tx, cnt.tx_err); cnt.t_enobufs_last = now; } /* Poll to handle delivery reports */ rd_kafka_poll(rk, 10); print_stats(rk, mode, otype, compression); } msgs_wait_cnt++; if (msgs_wait_produce_cnt != -1) msgs_wait_produce_cnt--; cnt.msgs++; cnt.bytes += msgsize; if (rate_sleep) usleep(rate_sleep); /* Must poll to handle delivery reports */ rd_kafka_poll(rk, 0); print_stats(rk, mode, otype, compression); } forever = 0; if (verbosity >= 2) printf("%% All messages produced, " "now waiting for %li deliveries\n", msgs_wait_cnt); if (debug) rd_kafka_dump(stdout, rk); /* Wait for messages to be delivered */ while (run && rd_kafka_poll(rk, 1000) != -1) print_stats(rk, mode, otype, compression); outq = rd_kafka_outq_len(rk); if (verbosity >= 2) printf("%% %i messages in outq\n", outq); cnt.msgs -= outq; cnt.bytes -= msgsize * outq; cnt.t_end = t_end; if (cnt.tx_err > 0) printf("%% %"PRIu64" backpressures for %"PRIu64 " produce calls: %.3f%% backpressure rate\n", cnt.tx_err, cnt.tx, ((double)cnt.tx_err / (double)cnt.tx) * 100.0); if (debug) rd_kafka_dump(stdout, rk); /* Destroy topic */ rd_kafka_topic_destroy(rkt); /* Destroy the handle */ rd_kafka_destroy(rk); global_rk = rk = NULL; free(sbuf); } else if (mode == 'C') { /* * Consumer */ rd_kafka_message_t **rkmessages = NULL; size_t i = 0; /* Create Kafka handle */ if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)))) { fprintf(stderr, "%% Failed to create Kafka consumer: %s\n", errstr); exit(1); } global_rk = rk; if (debug) rd_kafka_set_log_level(rk, 7); /* Add broker(s) */ if (brokers && rd_kafka_brokers_add(rk, brokers) < 1) { fprintf(stderr, "%% No valid brokers specified\n"); exit(1); } /* Create topic to consume from */ rkt = rd_kafka_topic_new(rk, topic, topic_conf); /* Batch consumer */ if (batch_size) rkmessages = malloc(sizeof(*rkmessages) * batch_size); /* Start consuming */ rkqu = rd_kafka_queue_new(rk); for (i=0 ; i<(size_t)partition_cnt ; ++i) { const int r = rd_kafka_consume_start_queue(rkt, partitions[i], start_offset, rkqu); if (r == -1) { fprintf(stderr, "%% Error creating queue: %s\n", rd_kafka_err2str( rd_kafka_errno2err(errno))); exit(1); } } while (run && (msgcnt == -1 || msgcnt > (int)cnt.msgs)) { /* Consume messages. * A message may either be a real message, or * an error signaling (if rkmessage->err is set). */ uint64_t fetch_latency; int r; fetch_latency = rd_clock(); if (batch_size) { int i; int partition = partitions ? 
partitions[0] : RD_KAFKA_PARTITION_UA;

                                /* Batch fetch mode */
                                r = rd_kafka_consume_batch(rkt, partition,
                                                           1000,
                                                           rkmessages,
                                                           batch_size);
                                if (r != -1) {
                                        for (i = 0 ; i < r ; i++) {
                                                msg_consume(rkmessages[i],
                                                            NULL);
                                                rd_kafka_message_destroy(
                                                        rkmessages[i]);
                                        }
                                }
                        } else {
                                /* Queue mode */
                                r = rd_kafka_consume_callback_queue(
                                        rkqu, 1000, msg_consume, NULL);
                        }

                        cnt.t_fetch_latency += rd_clock() - fetch_latency;
                        if (r == -1)
                                fprintf(stderr, "%% Error: %s\n",
                                        rd_kafka_err2str(
                                                rd_kafka_errno2err(errno)));

                        print_stats(rk, mode, otype, compression);

                        /* Poll to handle stats callbacks */
                        rd_kafka_poll(rk, 0);
                }
                cnt.t_end = rd_clock();

                /* Stop consuming each partition that was started above */
                for (i=0 ; i<(size_t)partition_cnt ; ++i) {
                        int r = rd_kafka_consume_stop(rkt, partitions[i]);
                        if (r == -1) {
                                fprintf(stderr,
                                        "%% Error in consume_stop: %s\n",
                                        rd_kafka_err2str(
                                                rd_kafka_errno2err(errno)));
                        }
                }
                rd_kafka_queue_destroy(rkqu);

                /* Destroy topic */
                rd_kafka_topic_destroy(rkt);

                if (batch_size)
                        free(rkmessages);

                /* Destroy the handle */
                rd_kafka_destroy(rk);
                global_rk = rk = NULL;

        } else if (mode == 'G') {
                /*
                 * High-level balanced Consumer
                 */
                rd_kafka_resp_err_t err;

                rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
                rd_kafka_conf_set_default_topic_conf(conf, topic_conf);

                /* Create Kafka handle */
                if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                                        errstr, sizeof(errstr)))) {
                        fprintf(stderr,
                                "%% Failed to create Kafka consumer: %s\n",
                                errstr);
                        exit(1);
                }

                /* Forward all events to consumer queue */
                rd_kafka_poll_set_consumer(rk);

                global_rk = rk;

                if (debug)
                        rd_kafka_set_log_level(rk, 7);

                /* Add broker(s) */
                if (brokers && rd_kafka_brokers_add(rk, brokers) < 1) {
                        fprintf(stderr, "%% No valid brokers specified\n");
                        exit(1);
                }

                err = rd_kafka_subscribe(rk, topics);
                if (err) {
                        fprintf(stderr, "%% Subscribe failed: %s\n",
                                rd_kafka_err2str(err));
                        exit(1);
                }
                fprintf(stderr, "%% Waiting for group rebalance..\n");

                while (run && (msgcnt == -1 || msgcnt > (int)cnt.msgs)) {
                        /* Consume messages.
                         * A message may either be a real message, or
                         * an event (if rkmessage->err is set).
                         */
                        rd_kafka_message_t *rkmessage;
                        uint64_t fetch_latency;

                        fetch_latency = rd_clock();

                        rkmessage = rd_kafka_consumer_poll(rk, 1000);
                        if (rkmessage) {
                                msg_consume(rkmessage, NULL);
                                rd_kafka_message_destroy(rkmessage);
                        }

                        cnt.t_fetch_latency += rd_clock() - fetch_latency;

                        print_stats(rk, mode, otype, compression);
                }
                cnt.t_end = rd_clock();

                err = rd_kafka_consumer_close(rk);
                if (err)
                        fprintf(stderr, "%% Failed to close consumer: %s\n",
                                rd_kafka_err2str(err));

                rd_kafka_destroy(rk);
        }

        print_stats(NULL, mode, otype|_OTYPE_FORCE, compression);

        if (cnt.t_fetch_latency && cnt.msgs)
                printf("%% Average application fetch latency: %"PRIu64"us\n",
                       cnt.t_fetch_latency / cnt.msgs);

        if (latency_fp)
                fclose(latency_fp);

        if (stats_fp) {
                pclose(stats_fp);
                stats_fp = NULL;
        }

        rd_kafka_topic_partition_list_destroy(topics);

        /* Let background threads clean up and terminate cleanly. */
        rd_kafka_wait_destroyed(2000);

        return 0;
}
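
/*
 * Example invocations (a sketch: the binary path, broker address and topic
 * name are placeholders; all flags are documented in the usage text above):
 *
 *   Produce one million 100-byte messages:
 *     ./rdkafka_performance -P -t mytopic -b localhost:9092 -s 100 -c 1000000
 *
 *   Consume them back, exiting at end of partition:
 *     ./rdkafka_performance -C -t mytopic -b localhost:9092 -p 0 -o beginning -e
 *
 *   End-to-end latency measurement (run both instances concurrently,
 *   rate-limited to 100 msgs/s on the producer side):
 *     ./rdkafka_performance -C -t mytopic -b localhost:9092 -l
 *     ./rdkafka_performance -P -t mytopic -b localhost:9092 -l -r 100
 */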