/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2020-2022, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * Example utility that shows how to use DeleteRecords (AdminAPI)
 * to delete all messages/records up to (but not including) a specific offset
 * from one or more topic partitions.
 */

#include <errno.h>
#include <inttypes.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is builtin from within the librdkafka source tree and thus differs. */
#include "rdkafka.h"

static rd_kafka_queue_t *queue; /** Admin result queue.
* This is a global so we can * yield in stop() */ static volatile sig_atomic_t run = 1; /** * @brief Signal termination of program */ static void stop(int sig) { if (!run) { fprintf(stderr, "%% Forced termination\n"); exit(2); } run = 0; rd_kafka_queue_yield(queue); } /** * @brief Parse an integer or fail. */ int64_t parse_int(const char *what, const char *str) { char *end; unsigned long n = strtoull(str, &end, 0); if (end != str + strlen(str)) { fprintf(stderr, "%% Invalid input for %s: %s: not an integer\n", what, str); exit(1); } return (int64_t)n; } int main(int argc, char **argv) { rd_kafka_conf_t *conf; /* Temporary configuration object */ char errstr[512]; /* librdkafka API error reporting buffer */ const char *brokers; /* Argument: broker list */ rd_kafka_t *rk; /* Admin client instance */ rd_kafka_topic_partition_list_t *offsets_before; /* Delete messages up * to but not * including these * offsets */ rd_kafka_DeleteRecords_t *del_records; /* Container for offsets_before*/ rd_kafka_AdminOptions_t *options; /* (Optional) Options for * DeleteRecords() */ rd_kafka_event_t *event; /* DeleteRecords result event */ int exitcode = 0; int i; /* * Argument validation */ if (argc < 5 || (argc - 2) % 3 != 0) { fprintf(stderr, "%% Usage: %s " " " " ...\n" "\n" "Delete all messages up to but not including the " "specified offset(s).\n" "\n", argv[0]); return 1; } brokers = argv[1]; /* Parse topic partition offset tuples and add to offsets list */ offsets_before = rd_kafka_topic_partition_list_new((argc - 2) / 3); for (i = 2; i < argc; i += 3) { const char *topic = argv[i]; int partition = parse_int("partition", argv[i + 1]); int64_t offset = parse_int("offset_before", argv[i + 2]); rd_kafka_topic_partition_list_add(offsets_before, topic, partition) ->offset = offset; } /* * Create Kafka client configuration place-holder */ conf = rd_kafka_conf_new(); /* Set bootstrap broker(s) as a comma-separated list of * host or host:port (default port 9092). 
* librdkafka will use the bootstrap brokers to acquire the full * set of brokers from the cluster. */ if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%s\n", errstr); return 1; } rd_kafka_conf_set(conf, "debug", "admin,topic,metadata", NULL, 0); /* * Create an admin client, it can be created using any client type, * so we choose producer since it requires no extra configuration * and is more light-weight than the consumer. * * NOTE: rd_kafka_new() takes ownership of the conf object * and the application must not reference it again after * this call. */ rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (!rk) { fprintf(stderr, "%% Failed to create new producer: %s\n", errstr); return 1; } /* The Admin API is completely asynchronous, results are emitted * on the result queue that is passed to DeleteRecords() */ queue = rd_kafka_queue_new(rk); /* Signal handler for clean shutdown */ signal(SIGINT, stop); /* Set timeout (optional) */ options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETERECORDS); if (rd_kafka_AdminOptions_set_request_timeout( options, 30 * 1000 /* 30s */, errstr, sizeof(errstr))) { fprintf(stderr, "%% Failed to set timeout: %s\n", errstr); return 1; } /* Create argument */ del_records = rd_kafka_DeleteRecords_new(offsets_before); /* We're now done with offsets_before */ rd_kafka_topic_partition_list_destroy(offsets_before); /* Call DeleteRecords */ rd_kafka_DeleteRecords(rk, &del_records, 1, options, queue); /* Clean up input arguments */ rd_kafka_DeleteRecords_destroy(del_records); rd_kafka_AdminOptions_destroy(options); /* Wait for results */ event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/); if (!event) { /* User hit Ctrl-C */ fprintf(stderr, "%% Cancelled by user\n"); } else if (rd_kafka_event_error(event)) { /* DeleteRecords request failed */ fprintf(stderr, "%% DeleteRecords failed: %s\n", rd_kafka_event_error_string(event)); exitcode = 2; 
} else { /* DeleteRecords request succeeded, but individual * partitions may have errors. */ const rd_kafka_DeleteRecords_result_t *result; const rd_kafka_topic_partition_list_t *offsets; int i; result = rd_kafka_event_DeleteRecords_result(event); offsets = rd_kafka_DeleteRecords_result_offsets(result); printf("DeleteRecords results:\n"); for (i = 0; i < offsets->cnt; i++) printf(" %s [%" PRId32 "] offset %" PRId64 ": %s\n", offsets->elems[i].topic, offsets->elems[i].partition, offsets->elems[i].offset, rd_kafka_err2str(offsets->elems[i].err)); } /* Destroy event object when we're done with it. * Note: rd_kafka_event_destroy() allows a NULL event. */ rd_kafka_event_destroy(event); signal(SIGINT, SIG_DFL); /* Destroy queue */ rd_kafka_queue_destroy(queue); /* Destroy the producer instance */ rd_kafka_destroy(rk); return exitcode; }