# Statistics librdkafka may be configured to emit internal metrics at a fixed interval by setting the `statistics.interval.ms` configuration property to a value > 0 and registering a `stats_cb` (or similar, depending on language). The stats are provided as a JSON object string. **Note**: The metrics returned may not be completely consistent between brokers, toppars and totals, due to the internal asynchronous nature of librdkafka. E.g., the top level `tx` total may be less than the sum of the broker `tx` values which it represents. ## General structure All fields that contain sizes are in bytes unless otherwise noted. ``` { "brokers": { , "toppars": { } }, "topics": { , "partitions": { } } [, "cgrp": { } ] [, "eos": { } ] } ``` ## Field type Fields are represented as follows: * string - UTF8 string. * int - Integer counter (64 bits wide). Ever increasing. * int gauge - Integer gauge (64 bits wide). Will be reset to 0 on each stats emit. * object - Nested JSON object. * bool - `true` or `false`. ## Top-level Field | Type | Example | Description ----- | ---- | ------- | ----------- name | string | `"rdkafka#producer-1"` | Handle instance name client_id | string | `"rdkafka"` | The configured (or default) `client.id` type | string | `"producer"` | Instance type (producer or consumer) ts | int | 12345678912345 | librdkafka's internal monotonic clock (microseconds) time | int | | Wall clock time in seconds since the epoch age | int | | Time since this client instance was created (microseconds) replyq | int gauge | | Number of ops (callbacks, events, etc) waiting in queue for application to serve with rd_kafka_poll() msg_cnt | int gauge | | Current number of messages in producer queues msg_size | int gauge | | Current total size of messages in producer queues msg_max | int | | Threshold: maximum number of messages allowed allowed on the producer queues msg_size_max | int | | Threshold: maximum total size of messages allowed on the producer queues tx | int | | Total number of requests sent to Kafka brokers tx_bytes | int | | Total number of bytes transmitted to Kafka brokers rx | int | | Total number of responses received from Kafka brokers rx_bytes | int | | Total number of bytes received from Kafka brokers txmsgs | int | | Total number of messages transmitted (produced) to Kafka brokers txmsg_bytes | int | | Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers rxmsgs | int | | Total number of messages consumed, not including ignored messages (due to offset, etc), from Kafka brokers. rxmsg_bytes | int | | Total number of message bytes (including framing) received from Kafka brokers simple_cnt | int gauge | | Internal tracking of legacy vs new consumer API state metadata_cache_cnt | int gauge | | Number of topics in the metadata cache. brokers | object | | Dict of brokers, key is broker name, value is object. See **brokers** below topics | object | | Dict of topics, key is topic name, value is object. See **topics** below cgrp | object | | Consumer group metrics. See **cgrp** below eos | object | | EOS / Idempotent producer state and metrics. See **eos** below ## brokers Per broker statistics. Field | Type | Example | Description ----- | ---- | ------- | ----------- name | string | `"example.com:9092/13"` | Broker hostname, port and broker id nodeid | int | 13 | Broker id (-1 for bootstraps) nodename | string | `"example.com:9092"` | Broker hostname source | string | `"configured"` | Broker source (learned, configured, internal, logical) state | string | `"UP"` | Broker state (INIT, DOWN, CONNECT, AUTH, APIVERSION_QUERY, AUTH_HANDSHAKE, UP, UPDATE) stateage | int gauge | | Time since last broker state change (microseconds) outbuf_cnt | int gauge | | Number of requests awaiting transmission to broker outbuf_msg_cnt | int gauge | | Number of messages awaiting transmission to broker waitresp_cnt | int gauge | | Number of requests in-flight to broker awaiting response waitresp_msg_cnt | int gauge | | Number of messages in-flight to broker awaiting response tx | int | | Total number of requests sent txbytes | int | | Total number of bytes sent txerrs | int | | Total number of transmission errors txretries | int | | Total number of request retries txidle | int | | Microseconds since last socket send (or -1 if no sends yet for current connection). req_timeouts | int | | Total number of requests timed out rx | int | | Total number of responses received rxbytes | int | | Total number of bytes received rxerrs | int | | Total number of receive errors rxcorriderrs | int | | Total number of unmatched correlation ids in response (typically for timed out requests) rxpartial | int | | Total number of partial MessageSets received. The broker may return partial responses if the full MessageSet could not fit in the remaining Fetch response size. rxidle | int | | Microseconds since last socket receive (or -1 if no receives yet for current connection). req | object | | Request type counters. Object key is the request name, value is the number of requests sent. zbuf_grow | int | | Total number of decompression buffer size increases buf_grow | int | | Total number of buffer size increases (deprecated, unused) wakeups | int | | Broker thread poll loop wakeups connects | int | | Number of connection attempts, including successful and failed, and name resolution failures. disconnects | int | | Number of disconnects (triggered by broker, network, load-balancer, etc.). int_latency | object | | Internal producer queue latency in microseconds. See *Window stats* below outbuf_latency | object | | Internal request queue latency in microseconds. This is the time between a request is enqueued on the transmit (outbuf) queue and the time the request is written to the TCP socket. Additional buffering and latency may be incurred by the TCP stack and network. See *Window stats* below rtt | object | | Broker latency / round-trip time in microseconds. See *Window stats* below throttle | object | | Broker throttling time in milliseconds. See *Window stats* below toppars | object | | Partitions handled by this broker handle. Key is "topic-partition". See *brokers.toppars* below ## Window stats Rolling window statistics. The values are in microseconds unless otherwise stated. Field | Type | Example | Description ----- | ---- | ------- | ----------- min | int gauge | | Smallest value max | int gauge | | Largest value avg | int gauge | | Average value sum | int gauge | | Sum of values cnt | int gauge | | Number of values sampled stddev | int gauge | | Standard deviation (based on histogram) hdrsize | int gauge | | Memory size of Hdr Histogram p50 | int gauge | | 50th percentile p75 | int gauge | | 75th percentile p90 | int gauge | | 90th percentile p95 | int gauge | | 95th percentile p99 | int gauge | | 99th percentile p99_99 | int gauge | | 99.99th percentile outofrange | int gauge | | Values skipped due to out of histogram range ## brokers.toppars Topic partition assigned to broker. Field | Type | Example | Description ----- | ---- | ------- | ----------- topic | string | `"mytopic"` | Topic name partition | int | 3 | Partition id ## topics Field | Type | Example | Description ----- | ---- | ------- | ----------- topic | string | `"myatopic"` | Topic name age | int gauge | | Age of client's topic object (milliseconds) metadata_age | int gauge | | Age of metadata from broker for this topic (milliseconds) batchsize | object | | Batch sizes in bytes. See *Window stats*· batchcnt | object | | Batch message counts. See *Window stats*· partitions | object | | Partitions dict, key is partition id. See **partitions** below. ## partitions Field | Type | Example | Description ----- | ---- | ------- | ----------- partition | int | 3 | Partition Id (-1 for internal UA/UnAssigned partition) broker | int | | The id of the broker that messages are currently being fetched from leader | int | | Current leader broker id desired | bool | | Partition is explicitly desired by application unknown | bool | | Partition not seen in topic metadata from broker msgq_cnt | int gauge | | Number of messages waiting to be produced in first-level queue msgq_bytes | int gauge | | Number of bytes in msgq_cnt xmit_msgq_cnt | int gauge | | Number of messages ready to be produced in transmit queue xmit_msgq_bytes | int gauge | | Number of bytes in xmit_msgq fetchq_cnt | int gauge | | Number of pre-fetched messages in fetch queue fetchq_size | int gauge | | Bytes in fetchq fetch_state | string | `"active"` | Consumer fetch state for this partition (none, stopping, stopped, offset-query, offset-wait, active). query_offset | int gauge | | Current/Last logical offset query next_offset | int gauge | | Next offset to fetch app_offset | int gauge | | Offset of last message passed to application + 1 stored_offset | int gauge | | Offset to be committed stored_leader_epoch | int | | Partition leader epoch of stored offset committed_offset | int gauge | | Last committed offset committed_leader_epoch | int | | Partition leader epoch of committed offset eof_offset | int gauge | | Last PARTITION_EOF signaled offset lo_offset | int gauge | | Partition's low watermark offset on broker hi_offset | int gauge | | Partition's high watermark offset on broker ls_offset | int gauge | | Partition's last stable offset on broker, or same as hi_offset is broker version is less than 0.11.0.0. consumer_lag | int gauge | | Difference between (hi_offset or ls_offset) and committed_offset). hi_offset is used when isolation.level=read_uncommitted, otherwise ls_offset. consumer_lag_stored | int gauge | | Difference between (hi_offset or ls_offset) and stored_offset. See consumer_lag and stored_offset. leader_epoch | int | | Last known partition leader epoch, or -1 if unknown. txmsgs | int | | Total number of messages transmitted (produced) txbytes | int | | Total number of bytes transmitted for txmsgs rxmsgs | int | | Total number of messages consumed, not including ignored messages (due to offset, etc). rxbytes | int | | Total number of bytes received for rxmsgs msgs | int | | Total number of messages received (consumer, same as rxmsgs), or total number of messages produced (possibly not yet transmitted) (producer). rx_ver_drops | int | | Dropped outdated messages msgs_inflight | int gauge | | Current number of messages in-flight to/from broker next_ack_seq | int gauge | | Next expected acked sequence (idempotent producer) next_err_seq | int gauge | | Next expected errored sequence (idempotent producer) acked_msgid | int | | Last acked internal message id (idempotent producer) ## cgrp Field | Type | Example | Description ----- | ---- | ------- | ----------- state | string | "up" | Local consumer group handler's state. stateage | int gauge | | Time elapsed since last state change (milliseconds). join_state | string | "assigned" | Local consumer group handler's join state. rebalance_age | int gauge | | Time elapsed since last rebalance (assign or revoke) (milliseconds). rebalance_cnt | int | | Total number of rebalances (assign or revoke). rebalance_reason | string | | Last rebalance reason, or empty string. assignment_size | int gauge | | Current assignment's partition count. ## eos Field | Type | Example | Description ----- | ---- | ------- | ----------- idemp_state | string | "Assigned" | Current idempotent producer id state. idemp_stateage | int gauge | | Time elapsed since last idemp_state change (milliseconds). txn_state | string | "InTransaction" | Current transactional producer state. txn_stateage | int gauge | | Time elapsed since last txn_state change (milliseconds). txn_may_enq | bool | | Transactional state allows enqueuing (producing) new messages. producer_id | int gauge | | The currently assigned Producer ID (or -1). producer_epoch | int gauge | | The current epoch (or -1). epoch_cnt | int | | The number of Producer ID assignments since start. # Example output This (prettified) example output is from a short-lived producer using the following command: `rdkafka_performance -b localhost -P -t test -T 1000 -Y 'cat >> stats.json'`. Note: this output is prettified using `jq .`, the JSON object emitted by librdkafka does not contain line breaks. ```json { "name": "rdkafka#producer-1", "client_id": "rdkafka", "type": "producer", "ts": 5016483227792, "time": 1527060869, "replyq": 0, "msg_cnt": 22710, "msg_size": 704010, "msg_max": 500000, "msg_size_max": 1073741824, "simple_cnt": 0, "metadata_cache_cnt": 1, "brokers": { "localhost:9092/2": { "name": "localhost:9092/2", "nodeid": 2, "nodename": "localhost:9092", "source": "learned", "state": "UP", "stateage": 9057234, "outbuf_cnt": 0, "outbuf_msg_cnt": 0, "waitresp_cnt": 0, "waitresp_msg_cnt": 0, "tx": 320, "txbytes": 84283332, "txerrs": 0, "txretries": 0, "req_timeouts": 0, "rx": 320, "rxbytes": 15708, "rxerrs": 0, "rxcorriderrs": 0, "rxpartial": 0, "zbuf_grow": 0, "buf_grow": 0, "wakeups": 591067, "int_latency": { "min": 86, "max": 59375, "avg": 23726, "sum": 5694616664, "stddev": 13982, "p50": 28031, "p75": 36095, "p90": 39679, "p95": 43263, "p99": 48639, "p99_99": 59391, "outofrange": 0, "hdrsize": 11376, "cnt": 240012 }, "rtt": { "min": 1580, "max": 3389, "avg": 2349, "sum": 79868, "stddev": 474, "p50": 2319, "p75": 2543, "p90": 3183, "p95": 3199, "p99": 3391, "p99_99": 3391, "outofrange": 0, "hdrsize": 13424, "cnt": 34 }, "throttle": { "min": 0, "max": 0, "avg": 0, "sum": 0, "stddev": 0, "p50": 0, "p75": 0, "p90": 0, "p95": 0, "p99": 0, "p99_99": 0, "outofrange": 0, "hdrsize": 17520, "cnt": 34 }, "toppars": { "test-1": { "topic": "test", "partition": 1 } } }, "localhost:9093/3": { "name": "localhost:9093/3", "nodeid": 3, "nodename": "localhost:9093", "source": "learned", "state": "UP", "stateage": 9057209, "outbuf_cnt": 0, "outbuf_msg_cnt": 0, "waitresp_cnt": 0, "waitresp_msg_cnt": 0, "tx": 310, "txbytes": 84301122, "txerrs": 0, "txretries": 0, "req_timeouts": 0, "rx": 310, "rxbytes": 15104, "rxerrs": 0, "rxcorriderrs": 0, "rxpartial": 0, "zbuf_grow": 0, "buf_grow": 0, "wakeups": 607956, "int_latency": { "min": 82, "max": 58069, "avg": 23404, "sum": 5617432101, "stddev": 14021, "p50": 27391, "p75": 35839, "p90": 39679, "p95": 42751, "p99": 48639, "p99_99": 58111, "outofrange": 0, "hdrsize": 11376, "cnt": 240016 }, "rtt": { "min": 1704, "max": 3572, "avg": 2493, "sum": 87289, "stddev": 559, "p50": 2447, "p75": 2895, "p90": 3375, "p95": 3407, "p99": 3583, "p99_99": 3583, "outofrange": 0, "hdrsize": 13424, "cnt": 35 }, "throttle": { "min": 0, "max": 0, "avg": 0, "sum": 0, "stddev": 0, "p50": 0, "p75": 0, "p90": 0, "p95": 0, "p99": 0, "p99_99": 0, "outofrange": 0, "hdrsize": 17520, "cnt": 35 }, "toppars": { "test-0": { "topic": "test", "partition": 0 } } }, "localhost:9094/4": { "name": "localhost:9094/4", "nodeid": 4, "nodename": "localhost:9094", "source": "learned", "state": "UP", "stateage": 9057207, "outbuf_cnt": 0, "outbuf_msg_cnt": 0, "waitresp_cnt": 0, "waitresp_msg_cnt": 0, "tx": 1, "txbytes": 25, "txerrs": 0, "txretries": 0, "req_timeouts": 0, "rx": 1, "rxbytes": 272, "rxerrs": 0, "rxcorriderrs": 0, "rxpartial": 0, "zbuf_grow": 0, "buf_grow": 0, "wakeups": 4, "int_latency": { "min": 0, "max": 0, "avg": 0, "sum": 0, "stddev": 0, "p50": 0, "p75": 0, "p90": 0, "p95": 0, "p99": 0, "p99_99": 0, "outofrange": 0, "hdrsize": 11376, "cnt": 0 }, "rtt": { "min": 0, "max": 0, "avg": 0, "sum": 0, "stddev": 0, "p50": 0, "p75": 0, "p90": 0, "p95": 0, "p99": 0, "p99_99": 0, "outofrange": 0, "hdrsize": 13424, "cnt": 0 }, "throttle": { "min": 0, "max": 0, "avg": 0, "sum": 0, "stddev": 0, "p50": 0, "p75": 0, "p90": 0, "p95": 0, "p99": 0, "p99_99": 0, "outofrange": 0, "hdrsize": 17520, "cnt": 0 }, "toppars": {} } }, "topics": { "test": { "topic": "test", "metadata_age": 9060, "batchsize": { "min": 99, "max": 391805, "avg": 272593, "sum": 18808985, "stddev": 180408, "p50": 393215, "p75": 393215, "p90": 393215, "p95": 393215, "p99": 393215, "p99_99": 393215, "outofrange": 0, "hdrsize": 14448, "cnt": 69 }, "batchcnt": { "min": 1, "max": 10000, "avg": 6956, "sum": 480028, "stddev": 4608, "p50": 10047, "p75": 10047, "p90": 10047, "p95": 10047, "p99": 10047, "p99_99": 10047, "outofrange": 0, "hdrsize": 8304, "cnt": 69 }, "partitions": { "0": { "partition": 0, "broker": 3, "leader": 3, "desired": false, "unknown": false, "msgq_cnt": 1, "msgq_bytes": 31, "xmit_msgq_cnt": 0, "xmit_msgq_bytes": 0, "fetchq_cnt": 0, "fetchq_size": 0, "fetch_state": "none", "query_offset": 0, "next_offset": 0, "app_offset": -1001, "stored_offset": -1001, "commited_offset": -1001, "committed_offset": -1001, "eof_offset": -1001, "lo_offset": -1001, "hi_offset": -1001, "consumer_lag": -1, "txmsgs": 2150617, "txbytes": 66669127, "rxmsgs": 0, "rxbytes": 0, "msgs": 2160510, "rx_ver_drops": 0 }, "1": { "partition": 1, "broker": 2, "leader": 2, "desired": false, "unknown": false, "msgq_cnt": 0, "msgq_bytes": 0, "xmit_msgq_cnt": 0, "xmit_msgq_bytes": 0, "fetchq_cnt": 0, "fetchq_size": 0, "fetch_state": "none", "query_offset": 0, "next_offset": 0, "app_offset": -1001, "stored_offset": -1001, "commited_offset": -1001, "committed_offset": -1001, "eof_offset": -1001, "lo_offset": -1001, "hi_offset": -1001, "consumer_lag": -1, "txmsgs": 2150136, "txbytes": 66654216, "rxmsgs": 0, "rxbytes": 0, "msgs": 2159735, "rx_ver_drops": 0 }, "-1": { "partition": -1, "broker": -1, "leader": -1, "desired": false, "unknown": false, "msgq_cnt": 0, "msgq_bytes": 0, "xmit_msgq_cnt": 0, "xmit_msgq_bytes": 0, "fetchq_cnt": 0, "fetchq_size": 0, "fetch_state": "none", "query_offset": 0, "next_offset": 0, "app_offset": -1001, "stored_offset": -1001, "commited_offset": -1001, "committed_offset": -1001, "eof_offset": -1001, "lo_offset": -1001, "hi_offset": -1001, "consumer_lag": -1, "txmsgs": 0, "txbytes": 0, "rxmsgs": 0, "rxbytes": 0, "msgs": 1177, "rx_ver_drops": 0 } } } }, "tx": 631, "tx_bytes": 168584479, "rx": 631, "rx_bytes": 31084, "txmsgs": 4300753, "txmsg_bytes": 133323343, "rxmsgs": 0, "rxmsg_bytes": 0 } ```