/* * Copyright (c) 2013-2015 Intel Corporation. All rights reserved. * Copyright (c) 2016 Cisco Systems, Inc. All rights reserved. * * This software is available to you under the BSD license * below: * * Redistribution and use in source and binary forms, with or * without modification, are permitted provided that the following * conditions are met: * * - Redistributions of source code must retain the above * copyright notice, this list of conditions and the following * disclaimer. * * - Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following * disclaimer in the documentation and/or other materials * provided with the distribution. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ #include #include #include #include #include #include #include #include enum { FT_UNSPEC, FT_EP_CNT, }; enum ft_ep_state { FT_EP_STATE_INIT, FT_EP_CONNECT_RCVD, FT_EP_CONNECTING, FT_EP_CONNECTED, }; struct ep_info { struct fid_ep *ep; struct fi_info *fi; enum ft_ep_state state; }; static struct fi_info *fi_dup; static int ep_cnt = 4; static struct fid_ep **ep_array; static size_t addrlen = 0; static fi_addr_t *addr_array; static int get_dupinfo(void) { struct fi_info *hints_dup; int ret; char *src_addr; char *dest_addr; /* Get a fi_info corresponding to a wild card port. The first endpoint * should use default/given port since that is what is known to both * client and server. For other endpoints we should use addresses with * random ports to avoid collision. fi_getinfo should return a random * port if we don't specify it in the service arg or the hints. This * is used only for non-MSG endpoints. */ hints_dup = fi_dupinfo(hints); if (!hints_dup) return -FI_ENOMEM; /* After calling fi_dupinfo, libfabric owns all the memory for hints_dup, * so we don't want to free it here. Instead we'll save the pointers, * set them to NULL, then restore them just before calling fi_freeinfo. */ src_addr = hints_dup->src_addr; dest_addr = hints_dup->dest_addr; hints_dup->src_addr = NULL; hints_dup->dest_addr = NULL; hints_dup->src_addrlen = 0; hints_dup->dest_addrlen = 0; if (opts.dst_addr) { ret = fi_getinfo(FT_FIVERSION, opts.dst_addr, NULL, 0, hints_dup, &fi_dup); } else { ret = fi_getinfo(FT_FIVERSION, opts.src_addr, NULL, FI_SOURCE, hints_dup, &fi_dup); } if (ret) FT_PRINTERR("fi_getinfo", ret); hints_dup->src_addr = src_addr; hints_dup->dest_addr = dest_addr; fi_freeinfo(hints_dup); return ret; } static int alloc_eps(void) { int i, ret; ep_array = calloc(ep_cnt, sizeof(*ep_array)); if (!ep_array) return -FI_ENOMEM; ret = fi_endpoint(domain, fi, &ep_array[0], NULL); if (ret) { FT_PRINTERR("fi_endpoint", ret); return ret; } for (i = 1; i < ep_cnt; i++) { if (hints->ep_attr->type == FI_EP_MSG) ret = fi_endpoint(domain, fi, &ep_array[i], NULL); else ret = fi_endpoint(domain, fi_dup, &ep_array[i], NULL); if (ret) { FT_PRINTERR("fi_endpoint", ret); return ret; } } return 0; } static int enable_eps(void) { int i, ret; for (i = 0; i < ep_cnt; i++) { ret = ft_enable_ep(ep_array[i], eq, av, txcq, rxcq, txcntr, rxcntr, rma_cntr); if (ret) return ret; } return 0; } static int run_test() { int ret, i; /* Post recvs */ for (i = 0; i < ep_cnt; i++) { if (srx) { fprintf(stdout, "Posting recv #%d for shared rx ctx\n", i); ret = ft_post_rx(srx, rx_size, &rx_ctx_arr[i].context); } else { fprintf(stdout, "Posting recv for endpoint #%d\n", i); ret = ft_post_rx(ep_array[i], rx_size, &rx_ctx_arr[i].context); } if (ret) return ret; } if (opts.dst_addr) { /* Post sends addressed to remote EPs */ for (i = 0; i < ep_cnt; i++) { if (stx) fprintf(stdout, "Posting send #%d to shared tx ctx\n", i); else fprintf(stdout, "Posting send to endpoint #%d\n", i); ret = ft_tx(ep_array[i], addr_array[i], tx_size, &tx_ctx_arr[i].context); if (ret) return ret; } } /* Wait for recv completions */ ret = ft_get_rx_comp(rx_seq - 1); if (ret) return ret; if (!opts.dst_addr) { /* Post sends addressed to remote EPs */ for (i = 0; i < ep_cnt; i++) { if (stx) fprintf(stdout, "Posting send #%d to shared tx ctx\n", i); else fprintf(stdout, "Posting send to endpoint #%d\n", i); ret = ft_tx(ep_array[i], addr_array[i], tx_size, &tx_ctx_arr[i].context); if (ret) return ret; } } return 0; } static int init_av(void) { int ret; int i; if (opts.dst_addr) { ret = ft_av_insert(av, fi->dest_addr, 1, &addr_array[0], 0, NULL); if (ret) return ret; } for (i = 0; i < ep_cnt; i++) { addrlen = tx_size; ret = fi_getname(&ep_array[i]->fid, tx_buf + ft_tx_prefix_size(), &addrlen); if (ret) { FT_PRINTERR("fi_getname", ret); return ret; } if (opts.dst_addr) { ret = ft_tx(ep_array[0], addr_array[0], addrlen, &tx_ctx); if (ret) return ret; if (srx) ret = ft_rx(srx, rx_size); else ret = ft_rx(ep_array[0], rx_size); if (ret) return ret; /* Skip the first address since we already have it in AV */ if (i) { ret = ft_av_insert(av, rx_buf + ft_rx_prefix_size(), 1, &addr_array[i], 0, NULL); if (ret) return ret; } } else { if (srx) ret = ft_rx(srx, rx_size); else ret = ft_rx(ep_array[0], rx_size); if (ret) return ret; ret = ft_av_insert(av, rx_buf + ft_rx_prefix_size(), 1, &addr_array[i], 0, NULL); if (ret) return ret; ret = ft_tx(ep_array[0], addr_array[0], addrlen, &tx_ctx); if (ret) return ret; } } /* ACK */ if (opts.dst_addr) { ret = ft_tx(ep_array[0], addr_array[0], 1, &tx_ctx); } else { if (srx) ret = ft_rx(srx, rx_size); else ret = ft_rx(ep_array[0], rx_size); } return ret; } static int init_fabric(void) { int ret; ret = ft_init(); if (ret) return ret; ret = ft_init_oob(); if (ret) return ret; ret = ft_getinfo(hints, &fi); if (ret) return ret; ret = get_dupinfo(); if (ret) return ret; ret = ft_open_fabric_res(); if (ret) return ret; av_attr.count = ep_cnt; ret = ft_alloc_ep_res(fi, &txcq, &rxcq, &txcntr, &rxcntr, NULL, &av); if (ret) return ret; ret = alloc_eps(); if (ret) return ret; ret = enable_eps(); if (ret) return ret; ret = ft_alloc_msgs(); if (ret) return ret; /* Post recv */ if (srx) ret = ft_post_rx(srx, MAX(rx_size, FT_MAX_CTRL_MSG), &rx_ctx); else ret = ft_post_rx(ep_array[0], MAX(rx_size, FT_MAX_CTRL_MSG), &rx_ctx); if (ret) return ret; ret = init_av(); return ret; } static int client_connect(void) { struct fi_eq_cm_entry entry; uint32_t event; ssize_t rd; int i, ret; ret = ft_init(); if (ret) return ret; ret = ft_init_oob(); if (ret) return ret; ret = ft_getinfo(hints, &fi); if (ret) return ret; ret = get_dupinfo(); if (ret) return ret; ret = ft_open_fabric_res(); if (ret) return ret; ret = ft_alloc_ep_res(fi, &txcq, &rxcq, &txcntr, &rxcntr, NULL, &av); if (ret) return ret; ret = alloc_eps(); if (ret) return ret; ret = enable_eps(); if (ret) return ret; for (i = 0; i < ep_cnt; i++) { ret = fi_connect(ep_array[i], fi->dest_addr, NULL, 0); if (ret) { FT_PRINTERR("fi_connect", ret); return ret; } rd = fi_eq_sread(eq, &event, &entry, sizeof entry, -1, 0); if (rd != sizeof entry) { FT_PROCESS_EQ_ERR(rd, eq, "fi_eq_sread", "connect"); ret = (int) rd; return ret; } if (event != FI_CONNECTED || entry.fid != &ep_array[i]->fid) { fprintf(stderr, "Unexpected CM event %d fid %p (ep %p)\n", event, entry.fid, ep); ret = -FI_EOTHER; return ret; } } /* Post recv */ if (srx) ret = ft_post_rx(srx, MAX(rx_size, FT_MAX_CTRL_MSG), &rx_ctx); else ret = ft_post_rx(ep_array[0], MAX(rx_size, FT_MAX_CTRL_MSG), &rx_ctx); if (ret) return ret; return 0; } static int server_connect(void) { struct fi_eq_cm_entry entry; uint32_t event; ssize_t rd; int ret, k; int num_conn_reqs = 0, num_connected = 0; struct ep_info *ep_state_array = NULL; ep_array = calloc(ep_cnt, sizeof(*ep_array)); if (!ep_array) return -FI_ENOMEM; ep_state_array = calloc(ep_cnt, sizeof(*ep_state_array)); if (!ep_state_array) return -FI_ENOMEM; while (num_connected != ep_cnt) { rd = fi_eq_sread(eq, &event, &entry, sizeof entry, -1, 0); if (rd != sizeof entry) { FT_PROCESS_EQ_ERR(rd, eq, "fi_eq_sread", "cm-event"); ret = (int) rd; goto err; } switch(event) { case FI_CONNREQ: if (num_conn_reqs == ep_cnt) { fprintf(stderr, "Unexpected CM event %d\n", event); ret = -FI_EOTHER; goto err; } fi = ep_state_array[num_conn_reqs].fi = entry.info; ep_state_array[num_conn_reqs].state = FT_EP_CONNECT_RCVD; if (num_conn_reqs == 0) { ret = ft_open_domain_res(); if (ret) goto err; ret = ft_alloc_ep_res(fi, &txcq, &rxcq, &txcntr, &rxcntr, NULL, &av); if (ret) goto err; } ret = fi_endpoint(domain, fi, &ep_array[num_conn_reqs], NULL); if (ret) { FT_PRINTERR("fi_endpoint", ret); goto err; } ep_state_array[num_conn_reqs].ep = ep_array[num_conn_reqs]; ret = ft_enable_ep(ep_array[num_conn_reqs], eq, av, txcq, rxcq, txcntr, rxcntr, rma_cntr); if (ret) goto err; ret = fi_accept(ep_array[num_conn_reqs], NULL, 0); if (ret) { FT_PRINTERR("fi_accept", ret); goto err; } ep_state_array[num_conn_reqs].state = FT_EP_CONNECTING; num_conn_reqs++; break; case FI_CONNECTED: if (num_conn_reqs <= num_connected) { ret = -FI_EOTHER; goto err; } for (k = 0; k < num_conn_reqs; k++) { if (ep_state_array[k].state != FT_EP_CONNECTING) continue; if (&ep_state_array[k].ep->fid == entry.fid) { ep_state_array[k].state = FT_EP_CONNECTED; num_connected++; if (num_connected != ep_cnt) fi_freeinfo(ep_state_array[k].fi); break; } } if (k == num_conn_reqs) { fprintf(stderr, "Unexpected CM event %d fid %p (ep %p)\n", event, entry.fid, ep); ret = -FI_EOTHER; goto err; } break; default: ret = -FI_EOTHER; goto err; } } /* Post recv */ if (srx) ret = ft_post_rx(srx, MAX(rx_size, FT_MAX_CTRL_MSG), &rx_ctx); else ret = ft_post_rx(ep_array[0], MAX(rx_size, FT_MAX_CTRL_MSG), &rx_ctx); if (ret) goto err; free(ep_state_array); return 0; err: for (k = 0; k < ep_cnt; k++) { switch(ep_state_array[k].state) { case FT_EP_CONNECT_RCVD: fi_reject(pep, ep_state_array[k].fi->handle, NULL, 0); break; case FT_EP_CONNECTING: case FT_EP_CONNECTED: fi_shutdown(ep_state_array[k].ep, 0); break; case FT_EP_STATE_INIT: default: break; } } free(ep_state_array); return ret; } static int run(void) { int ret = 0; addr_array = calloc(ep_cnt, sizeof(*addr_array)); if (!addr_array) { perror("malloc"); return -FI_ENOMEM; } if (hints->ep_attr->type == FI_EP_MSG) { if (!opts.dst_addr) { ret = ft_start_server(); if (ret) return ret; } ret = opts.dst_addr ? client_connect() : server_connect(); } else { ret = init_fabric(); } if (ret) return ret; ret = run_test(); /* TODO: Add a local finalize applicable to shared ctx */ //ft_finalize(fi, ep_array[0], txcq, rxcq, addr_array[0]); return ret; } int main(int argc, char **argv) { int op, ret; int option_index = 0; int use_stx = 1, use_srx = 1; struct option long_options[] = { {"no-tx-shared-ctx", no_argument, &use_stx, 0}, {"no-rx-shared-ctx", no_argument, &use_srx, 0}, {"ep-count", required_argument, 0, FT_EP_CNT}, {0, 0, 0, 0}, }; opts = INIT_OPTS; opts.options |= FT_OPT_SIZE; hints = fi_allocinfo(); if (!hints) return EXIT_FAILURE; while ((op = getopt_long(argc, argv, "h" ADDR_OPTS INFO_OPTS API_OPTS, long_options, &option_index)) != -1) { switch (op) { case FT_EP_CNT: ep_cnt = atoi(optarg); if (ep_cnt <= 0) { FT_ERR("ep_count needs to be greater than 0\n"); return EXIT_FAILURE; } hints->domain_attr->ep_cnt = ep_cnt; break; default: ft_parse_addr_opts(op, optarg, &opts); ft_parseinfo(op, optarg, hints, &opts); ft_parse_api_opts(op, optarg, hints, &opts); break; case '?': case 'h': ft_usage(argv[0], "An RDM client-server example that uses" " shared context.\n"); FT_PRINT_OPTS_USAGE("--no-tx-shared-ctx", "Disable shared context for TX"); FT_PRINT_OPTS_USAGE("--no-rx-shared-ctx", "Disable shared context for RX"); FT_PRINT_OPTS_USAGE("--ep-count (default: 4)", "# of endpoints to be opened"); return EXIT_FAILURE; } } if (optind < argc) opts.dst_addr = argv[optind]; if (!(hints->caps & FI_TAGGED)) hints->caps = FI_MSG; hints->mode = FI_CONTEXT; hints->domain_attr->mr_mode = opts.mr_mode; hints->addr_format = opts.address_format; if (use_stx) { opts.options |= FT_OPT_STX; hints->ep_attr->tx_ctx_cnt = FI_SHARED_CONTEXT; } if (use_srx) { opts.options |= FT_OPT_SRX; hints->ep_attr->rx_ctx_cnt = FI_SHARED_CONTEXT; } ret = run(); FT_CLOSEV_FID(ep_array, ep_cnt); ft_free_res(); free(addr_array); free(ep_array); fi_freeinfo(fi_dup); return ft_exit_code(ret); }