/* * librdkafka - Apache Kafka C/C++ library * * Copyright (c) 2014-2022, Magnus Edenhill * 2023, Confluent Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include "rdkafkacpp_int.h" void RdKafka::consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque) { RdKafka::HandleImpl *handle = static_cast(opaque); RdKafka::Topic *topic = static_cast(rd_kafka_topic_opaque(msg->rkt)); RdKafka::MessageImpl message(RD_KAFKA_CONSUMER, topic, msg, false /*don't free*/); handle->consume_cb_->consume_cb(message, opaque); } void RdKafka::log_cb_trampoline(const rd_kafka_t *rk, int level, const char *fac, const char *buf) { if (!rk) { rd_kafka_log_print(rk, level, fac, buf); return; } void *opaque = rd_kafka_opaque(rk); RdKafka::HandleImpl *handle = static_cast(opaque); if (!handle->event_cb_) { rd_kafka_log_print(rk, level, fac, buf); return; } RdKafka::EventImpl event(RdKafka::Event::EVENT_LOG, RdKafka::ERR_NO_ERROR, static_cast(level), fac, buf); handle->event_cb_->event_cb(event); } void RdKafka::error_cb_trampoline(rd_kafka_t *rk, int err, const char *reason, void *opaque) { RdKafka::HandleImpl *handle = static_cast(opaque); char errstr[512]; bool is_fatal = false; if (err == RD_KAFKA_RESP_ERR__FATAL) { /* Translate to underlying fatal error code and string */ err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr)); if (err) reason = errstr; is_fatal = true; } RdKafka::EventImpl event(RdKafka::Event::EVENT_ERROR, static_cast(err), RdKafka::Event::EVENT_SEVERITY_ERROR, NULL, reason); event.fatal_ = is_fatal; handle->event_cb_->event_cb(event); } void RdKafka::throttle_cb_trampoline(rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque) { RdKafka::HandleImpl *handle = static_cast(opaque); RdKafka::EventImpl event(RdKafka::Event::EVENT_THROTTLE); event.str_ = broker_name; event.id_ = broker_id; event.throttle_time_ = throttle_time_ms; handle->event_cb_->event_cb(event); } int RdKafka::stats_cb_trampoline(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) { RdKafka::HandleImpl *handle = static_cast(opaque); RdKafka::EventImpl event(RdKafka::Event::EVENT_STATS, RdKafka::ERR_NO_ERROR, RdKafka::Event::EVENT_SEVERITY_INFO, NULL, json); handle->event_cb_->event_cb(event); return 0; } int RdKafka::socket_cb_trampoline(int domain, int type, int protocol, void *opaque) { RdKafka::HandleImpl *handle = static_cast(opaque); return handle->socket_cb_->socket_cb(domain, type, protocol); } int RdKafka::open_cb_trampoline(const char *pathname, int flags, mode_t mode, void *opaque) { RdKafka::HandleImpl *handle = static_cast(opaque); return handle->open_cb_->open_cb(pathname, flags, static_cast(mode)); } void RdKafka::oauthbearer_token_refresh_cb_trampoline( rd_kafka_t *rk, const char *oauthbearer_config, void *opaque) { RdKafka::HandleImpl *handle = static_cast(opaque); handle->oauthbearer_token_refresh_cb_->oauthbearer_token_refresh_cb( handle, std::string(oauthbearer_config ? oauthbearer_config : "")); } int RdKafka::ssl_cert_verify_cb_trampoline(rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int *x509_error, int depth, const char *buf, size_t size, char *errstr, size_t errstr_size, void *opaque) { RdKafka::HandleImpl *handle = static_cast(opaque); std::string errbuf; bool res = 0 != handle->ssl_cert_verify_cb_->ssl_cert_verify_cb( std::string(broker_name), broker_id, x509_error, depth, buf, size, errbuf); if (res) return (int)res; size_t errlen = errbuf.size() > errstr_size - 1 ? errstr_size - 1 : errbuf.size(); memcpy(errstr, errbuf.c_str(), errlen); if (errstr_size > 0) errstr[errlen] = '\0'; return (int)res; } RdKafka::ErrorCode RdKafka::HandleImpl::metadata(bool all_topics, const Topic *only_rkt, Metadata **metadatap, int timeout_ms) { const rd_kafka_metadata_t *cmetadatap = NULL; rd_kafka_topic_t *topic = only_rkt ? static_cast(only_rkt)->rkt_ : NULL; const rd_kafka_resp_err_t rc = rd_kafka_metadata(rk_, all_topics, topic, &cmetadatap, timeout_ms); *metadatap = (rc == RD_KAFKA_RESP_ERR_NO_ERROR) ? new RdKafka::MetadataImpl(cmetadatap) : NULL; return static_cast(rc); } /** * Convert a list of C partitions to C++ partitions */ static void c_parts_to_partitions( const rd_kafka_topic_partition_list_t *c_parts, std::vector &partitions) { partitions.resize(c_parts->cnt); for (int i = 0; i < c_parts->cnt; i++) partitions[i] = new RdKafka::TopicPartitionImpl(&c_parts->elems[i]); } static void free_partition_vector(std::vector &v) { for (unsigned int i = 0; i < v.size(); i++) delete v[i]; v.clear(); } void RdKafka::rebalance_cb_trampoline( rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *c_partitions, void *opaque) { RdKafka::HandleImpl *handle = static_cast(opaque); std::vector partitions; c_parts_to_partitions(c_partitions, partitions); handle->rebalance_cb_->rebalance_cb( dynamic_cast(handle), static_cast(err), partitions); free_partition_vector(partitions); } void RdKafka::offset_commit_cb_trampoline0( rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *c_offsets, void *opaque) { OffsetCommitCb *cb = static_cast(opaque); std::vector offsets; if (c_offsets) c_parts_to_partitions(c_offsets, offsets); cb->offset_commit_cb(static_cast(err), offsets); free_partition_vector(offsets); } static void offset_commit_cb_trampoline( rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *c_offsets, void *opaque) { RdKafka::HandleImpl *handle = static_cast(opaque); RdKafka::offset_commit_cb_trampoline0(rk, err, c_offsets, handle->offset_commit_cb_); } void RdKafka::HandleImpl::set_common_config(const RdKafka::ConfImpl *confimpl) { rd_kafka_conf_set_opaque(confimpl->rk_conf_, this); if (confimpl->event_cb_) { rd_kafka_conf_set_log_cb(confimpl->rk_conf_, RdKafka::log_cb_trampoline); rd_kafka_conf_set_error_cb(confimpl->rk_conf_, RdKafka::error_cb_trampoline); rd_kafka_conf_set_throttle_cb(confimpl->rk_conf_, RdKafka::throttle_cb_trampoline); rd_kafka_conf_set_stats_cb(confimpl->rk_conf_, RdKafka::stats_cb_trampoline); event_cb_ = confimpl->event_cb_; } if (confimpl->oauthbearer_token_refresh_cb_) { rd_kafka_conf_set_oauthbearer_token_refresh_cb( confimpl->rk_conf_, RdKafka::oauthbearer_token_refresh_cb_trampoline); oauthbearer_token_refresh_cb_ = confimpl->oauthbearer_token_refresh_cb_; } if (confimpl->socket_cb_) { rd_kafka_conf_set_socket_cb(confimpl->rk_conf_, RdKafka::socket_cb_trampoline); socket_cb_ = confimpl->socket_cb_; } if (confimpl->ssl_cert_verify_cb_) { rd_kafka_conf_set_ssl_cert_verify_cb( confimpl->rk_conf_, RdKafka::ssl_cert_verify_cb_trampoline); ssl_cert_verify_cb_ = confimpl->ssl_cert_verify_cb_; } if (confimpl->open_cb_) { #ifndef _WIN32 rd_kafka_conf_set_open_cb(confimpl->rk_conf_, RdKafka::open_cb_trampoline); open_cb_ = confimpl->open_cb_; #endif } if (confimpl->rebalance_cb_) { rd_kafka_conf_set_rebalance_cb(confimpl->rk_conf_, RdKafka::rebalance_cb_trampoline); rebalance_cb_ = confimpl->rebalance_cb_; } if (confimpl->offset_commit_cb_) { rd_kafka_conf_set_offset_commit_cb(confimpl->rk_conf_, offset_commit_cb_trampoline); offset_commit_cb_ = confimpl->offset_commit_cb_; } if (confimpl->consume_cb_) { rd_kafka_conf_set_consume_cb(confimpl->rk_conf_, RdKafka::consume_cb_trampoline); consume_cb_ = confimpl->consume_cb_; } } RdKafka::ErrorCode RdKafka::HandleImpl::pause( std::vector &partitions) { rd_kafka_topic_partition_list_t *c_parts; rd_kafka_resp_err_t err; c_parts = partitions_to_c_parts(partitions); err = rd_kafka_pause_partitions(rk_, c_parts); if (!err) update_partitions_from_c_parts(partitions, c_parts); rd_kafka_topic_partition_list_destroy(c_parts); return static_cast(err); } RdKafka::ErrorCode RdKafka::HandleImpl::resume( std::vector &partitions) { rd_kafka_topic_partition_list_t *c_parts; rd_kafka_resp_err_t err; c_parts = partitions_to_c_parts(partitions); err = rd_kafka_resume_partitions(rk_, c_parts); if (!err) update_partitions_from_c_parts(partitions, c_parts); rd_kafka_topic_partition_list_destroy(c_parts); return static_cast(err); } RdKafka::Queue *RdKafka::HandleImpl::get_partition_queue( const TopicPartition *part) { rd_kafka_queue_t *rkqu; rkqu = rd_kafka_queue_get_partition(rk_, part->topic().c_str(), part->partition()); if (rkqu == NULL) return NULL; return new QueueImpl(rkqu); } RdKafka::ErrorCode RdKafka::HandleImpl::set_log_queue(RdKafka::Queue *queue) { rd_kafka_queue_t *rkqu = NULL; if (queue) { QueueImpl *queueimpl = dynamic_cast(queue); rkqu = queueimpl->queue_; } return static_cast(rd_kafka_set_log_queue(rk_, rkqu)); } namespace RdKafka { rd_kafka_topic_partition_list_t *partitions_to_c_parts( const std::vector &partitions) { rd_kafka_topic_partition_list_t *c_parts; c_parts = rd_kafka_topic_partition_list_new((int)partitions.size()); for (unsigned int i = 0; i < partitions.size(); i++) { const RdKafka::TopicPartitionImpl *tpi = dynamic_cast(partitions[i]); rd_kafka_topic_partition_t *rktpar = rd_kafka_topic_partition_list_add( c_parts, tpi->topic_.c_str(), tpi->partition_); rktpar->offset = tpi->offset_; if (tpi->metadata_.size()) { void *metadata_p = mem_malloc(tpi->metadata_.size()); memcpy(metadata_p, tpi->metadata_.data(), tpi->metadata_.size()); rktpar->metadata = metadata_p; rktpar->metadata_size = tpi->metadata_.size(); } if (tpi->leader_epoch_ != -1) rd_kafka_topic_partition_set_leader_epoch(rktpar, tpi->leader_epoch_); } return c_parts; } /** * @brief Update the application provided 'partitions' with info from 'c_parts' */ void update_partitions_from_c_parts( std::vector &partitions, const rd_kafka_topic_partition_list_t *c_parts) { for (int i = 0; i < c_parts->cnt; i++) { rd_kafka_topic_partition_t *p = &c_parts->elems[i]; /* Find corresponding C++ entry */ for (unsigned int j = 0; j < partitions.size(); j++) { RdKafka::TopicPartitionImpl *pp = dynamic_cast(partitions[j]); if (!strcmp(p->topic, pp->topic_.c_str()) && p->partition == pp->partition_) { pp->offset_ = p->offset; pp->err_ = static_cast(p->err); pp->leader_epoch_ = rd_kafka_topic_partition_get_leader_epoch(p); if (p->metadata_size) { unsigned char *metadata = (unsigned char *)p->metadata; pp->metadata_.assign(metadata, metadata + p->metadata_size); } } } } } } // namespace RdKafka