/* * librdkafka - Apache Kafka C/C++ library * * Copyright (c) 2014 Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include "rdkafkacpp_int.h" void RdKafka::log_cb_trampoline (const rd_kafka_t *rk, int level, const char *fac, const char *buf) { if (!rk) { rd_kafka_log_print(rk, level, fac, buf); return; } void *opaque = rd_kafka_opaque(rk); RdKafka::HandleImpl *handle = static_cast(opaque); if (!handle->event_cb_) { rd_kafka_log_print(rk, level, fac, buf); return; } RdKafka::EventImpl event(RdKafka::Event::EVENT_LOG, RdKafka::ERR_NO_ERROR, static_cast(level), fac, buf); handle->event_cb_->event_cb(event); } void RdKafka::error_cb_trampoline (rd_kafka_t *rk, int err, const char *reason, void *opaque) { RdKafka::HandleImpl *handle = static_cast(opaque); RdKafka::EventImpl event(RdKafka::Event::EVENT_ERROR, static_cast(err), RdKafka::Event::EVENT_SEVERITY_ERROR, NULL, reason); handle->event_cb_->event_cb(event); } void RdKafka::throttle_cb_trampoline (rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque) { RdKafka::HandleImpl *handle = static_cast(opaque); RdKafka::EventImpl event(RdKafka::Event::EVENT_THROTTLE); event.str_ = broker_name; event.id_ = broker_id; event.throttle_time_ = throttle_time_ms; handle->event_cb_->event_cb(event); } int RdKafka::stats_cb_trampoline (rd_kafka_t *rk, char *json, size_t json_len, void *opaque) { RdKafka::HandleImpl *handle = static_cast(opaque); RdKafka::EventImpl event(RdKafka::Event::EVENT_STATS, RdKafka::ERR_NO_ERROR, RdKafka::Event::EVENT_SEVERITY_INFO, NULL, json); handle->event_cb_->event_cb(event); return 0; } int RdKafka::socket_cb_trampoline (int domain, int type, int protocol, void *opaque) { RdKafka::HandleImpl *handle = static_cast(opaque); return handle->socket_cb_->socket_cb(domain, type, protocol); } int RdKafka::open_cb_trampoline (const char *pathname, int flags, mode_t mode, void *opaque) { RdKafka::HandleImpl *handle = static_cast(opaque); return handle->open_cb_->open_cb(pathname, flags, static_cast(mode)); } RdKafka::ErrorCode RdKafka::HandleImpl::metadata (bool all_topics, const Topic *only_rkt, Metadata **metadatap, int timeout_ms) { const rd_kafka_metadata_t *cmetadatap=NULL; rd_kafka_topic_t *topic = only_rkt ? static_cast(only_rkt)->rkt_ : NULL; const rd_kafka_resp_err_t rc = rd_kafka_metadata(rk_, all_topics, topic, &cmetadatap,timeout_ms); *metadatap = (rc == RD_KAFKA_RESP_ERR_NO_ERROR) ? new RdKafka::MetadataImpl(cmetadatap) : NULL; return static_cast(rc); } /** * Convert a list of C partitions to C++ partitions */ static void c_parts_to_partitions (const rd_kafka_topic_partition_list_t *c_parts, std::vector &partitions) { partitions.resize(c_parts->cnt); for (int i = 0 ; i < c_parts->cnt ; i++) partitions[i] = new RdKafka::TopicPartitionImpl(&c_parts->elems[i]); } static void free_partition_vector (std::vector &v) { for (unsigned int i = 0 ; i < v.size() ; i++) delete v[i]; v.clear(); } void RdKafka::rebalance_cb_trampoline (rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *c_partitions, void *opaque) { RdKafka::HandleImpl *handle = static_cast(opaque); std::vector partitions; c_parts_to_partitions(c_partitions, partitions); handle->rebalance_cb_->rebalance_cb( dynamic_cast(handle), static_cast(err), partitions); free_partition_vector(partitions); } void RdKafka::offset_commit_cb_trampoline ( rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *c_offsets, void *opaque) { RdKafka::HandleImpl *handle = static_cast(opaque); std::vector offsets; if (c_offsets) c_parts_to_partitions(c_offsets, offsets); handle->offset_commit_cb_-> offset_commit_cb(static_cast(err), offsets); free_partition_vector(offsets); } void RdKafka::HandleImpl::set_common_config (RdKafka::ConfImpl *confimpl) { rd_kafka_conf_set_opaque(confimpl->rk_conf_, this); if (confimpl->event_cb_) { rd_kafka_conf_set_log_cb(confimpl->rk_conf_, RdKafka::log_cb_trampoline); rd_kafka_conf_set_error_cb(confimpl->rk_conf_, RdKafka::error_cb_trampoline); rd_kafka_conf_set_throttle_cb(confimpl->rk_conf_, RdKafka::throttle_cb_trampoline); rd_kafka_conf_set_stats_cb(confimpl->rk_conf_, RdKafka::stats_cb_trampoline); event_cb_ = confimpl->event_cb_; } if (confimpl->socket_cb_) { rd_kafka_conf_set_socket_cb(confimpl->rk_conf_, RdKafka::socket_cb_trampoline); socket_cb_ = confimpl->socket_cb_; } if (confimpl->open_cb_) { #ifndef _MSC_VER rd_kafka_conf_set_open_cb(confimpl->rk_conf_, RdKafka::open_cb_trampoline); open_cb_ = confimpl->open_cb_; #endif } if (confimpl->rebalance_cb_) { rd_kafka_conf_set_rebalance_cb(confimpl->rk_conf_, RdKafka::rebalance_cb_trampoline); rebalance_cb_ = confimpl->rebalance_cb_; } if (confimpl->offset_commit_cb_) { rd_kafka_conf_set_offset_commit_cb(confimpl->rk_conf_, RdKafka::offset_commit_cb_trampoline); offset_commit_cb_ = confimpl->offset_commit_cb_; } } RdKafka::ErrorCode RdKafka::HandleImpl::pause (std::vector &partitions) { rd_kafka_topic_partition_list_t *c_parts; rd_kafka_resp_err_t err; c_parts = partitions_to_c_parts(partitions); err = rd_kafka_pause_partitions(rk_, c_parts); if (!err) update_partitions_from_c_parts(partitions, c_parts); rd_kafka_topic_partition_list_destroy(c_parts); return static_cast(err); } RdKafka::ErrorCode RdKafka::HandleImpl::resume (std::vector &partitions) { rd_kafka_topic_partition_list_t *c_parts; rd_kafka_resp_err_t err; c_parts = partitions_to_c_parts(partitions); err = rd_kafka_resume_partitions(rk_, c_parts); if (!err) update_partitions_from_c_parts(partitions, c_parts); rd_kafka_topic_partition_list_destroy(c_parts); return static_cast(err); } namespace RdKafka { rd_kafka_topic_partition_list_t * partitions_to_c_parts (const std::vector &partitions){ rd_kafka_topic_partition_list_t *c_parts; c_parts = rd_kafka_topic_partition_list_new(partitions.size()); for (unsigned int i = 0 ; i < partitions.size() ; i++) { const RdKafka::TopicPartitionImpl *tpi = dynamic_cast(partitions[i]); rd_kafka_topic_partition_t *rktpar = rd_kafka_topic_partition_list_add(c_parts, tpi->topic_.c_str(), tpi->partition_); rktpar->offset = tpi->offset_; } return c_parts; } /** * @brief Update the application provided 'partitions' with info from 'c_parts' */ void update_partitions_from_c_parts (std::vector &partitions, const rd_kafka_topic_partition_list_t *c_parts) { for (int i = 0 ; i < c_parts->cnt ; i++) { rd_kafka_topic_partition_t *p = &c_parts->elems[i]; /* Find corresponding C++ entry */ for (unsigned int j = 0 ; j < partitions.size() ; j++) { RdKafka::TopicPartitionImpl *pp = dynamic_cast(partitions[j]); if (!strcmp(p->topic, pp->topic_.c_str()) && p->partition == pp->partition_) { pp->offset_ = p->offset; pp->err_ = static_cast(p->err); } } } } };