/** * Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED. * * See file LICENSE for terms. */ #include "uct_test.h" extern "C" { #include } class test_many2one_am : public uct_test { public: static const uint8_t AM_ID = 15; static const uint64_t MAGIC_DESC = 0xdeadbeef12345678ul; static const uint64_t MAGIC_ALLOC = 0xbaadf00d12345678ul; typedef struct { uint64_t magic; unsigned length; } receive_desc_t; test_many2one_am() : m_am_count(0), m_receiver(NULL) { } void init() { std::string val = "16k"; std::string tx_name, rx_name; if (has_ib()) { tx_name = "IB_SEG_SIZE"; } else if (has_transport("tcp")) { tx_name = "TX_SEG_SIZE"; rx_name = "RX_SEG_SIZE"; } else if (has_transport("mm") || has_transport("self")) { tx_name = "SEG_SIZE"; } if (!tx_name.empty()) { modify_config(tx_name, val); } if (!rx_name.empty()) { modify_config(rx_name, val); } uct_test::init(); m_receiver = create_entity(sizeof(receive_desc_t)); m_entities.push_back(m_receiver); check_skip_test(); } static ucs_status_t am_handler(void *arg, void *data, size_t length, unsigned flags) { test_many2one_am *self = reinterpret_cast(arg); return self->am_handler(data, length, flags); } ucs_status_t am_handler(void *data, size_t length, unsigned flags) { if (ucs::rand() % 4 == 0) { receive_desc_t *my_desc; if (flags & UCT_CB_PARAM_FLAG_DESC) { my_desc = (receive_desc_t *)data - 1; my_desc->magic = MAGIC_DESC; } else { my_desc = (receive_desc_t *)ucs_malloc(sizeof(*my_desc) + length, "TODO: remove allocation"); my_desc->magic = MAGIC_ALLOC; } my_desc->length = length; if (data != my_desc + 1) { memcpy(my_desc + 1, data, length); } m_backlog.push_back(my_desc); ucs_atomic_add32(&m_am_count, 1); return (flags & UCT_CB_PARAM_FLAG_DESC) ? UCS_INPROGRESS : UCS_OK; } mem_buffer::pattern_check(data, length); ucs_atomic_add32(&m_am_count, 1); return UCS_OK; } void check_backlog() { while (!m_backlog.empty()) { receive_desc_t *my_desc = m_backlog.back(); m_backlog.pop_back(); mem_buffer::pattern_check(my_desc + 1, my_desc->length); if (my_desc->magic == MAGIC_DESC) { uct_iface_release_desc(my_desc); } else { EXPECT_EQ(uint64_t(MAGIC_ALLOC), my_desc->magic); ucs_free(my_desc); } } } static const size_t NUM_SENDERS = 10; protected: volatile uint32_t m_am_count; std::vector m_backlog; entity *m_receiver; }; UCS_TEST_SKIP_COND_P(test_many2one_am, am_bcopy, !check_caps(UCT_IFACE_FLAG_AM_BCOPY | UCT_IFACE_FLAG_CB_SYNC)) { const unsigned num_sends = 1000 / ucs::test_time_multiplier(); ucs_status_t status; ucs::ptr_vector buffers; for (unsigned i = 0; i < NUM_SENDERS; ++i) { entity *sender = create_entity(0); mapped_buffer *buffer = new mapped_buffer( sender->iface_attr().cap.am.max_bcopy, 0, *sender); sender->connect(0, *m_receiver, i); m_entities.push_back(sender); buffers.push_back(buffer); } m_am_count = 0; status = uct_iface_set_am_handler(m_receiver->iface(), AM_ID, am_handler, (void*)this, 0); ASSERT_UCS_OK(status); for (unsigned i = 0; i < num_sends; ++i) { unsigned sender_num = ucs::rand() % NUM_SENDERS; mapped_buffer& buffer = buffers.at(sender_num); buffer.pattern_fill(i); ssize_t packed_len; for (;;) { const entity& sender = ent(sender_num + 1); packed_len = uct_ep_am_bcopy(sender.ep(0), AM_ID, mapped_buffer::pack, (void*)&buffer, 0); if (packed_len != UCS_ERR_NO_RESOURCE) { break; } sender.progress(); m_receiver->progress(); } if (packed_len < 0) { ASSERT_UCS_OK((ucs_status_t)packed_len); } } while (m_am_count < num_sends) { progress(); } status = uct_iface_set_am_handler(m_receiver->iface(), AM_ID, NULL, NULL, 0); ASSERT_UCS_OK(status); check_backlog(); for (unsigned i = 0; i < NUM_SENDERS; ++i) { ent(i + 1).flush(); } buffers.clear(); } UCT_INSTANTIATE_NO_SELF_TEST_CASE(test_many2one_am)