from pyverbs.mem_alloc import mmap, munmap, madvise, MAP_ANONYMOUS_, MAP_PRIVATE_, \
    MAP_HUGETLB_
from tests.base import RCResources, UDResources, XRCResources
from pyverbs.qp import QPCap, QPAttr, QPInitAttr
from pyverbs.wr import SGE, SendWR, RecvWR
from tests.base import RDMATestCase
from pyverbs.mr import MR
import pyverbs.enums as e
import tests.utils as u

# Length (2 MiB) of the mapping created when a huge-page user buffer is
# requested.
HUGE_PAGE_SIZE = 0x200000


class OdpUD(UDResources):
    """UD resources using an ODP send MR and a regular (non-ODP) recv MR."""

    def __init__(self, request_user_addr=False, **kwargs):
        """
        Initialize an OdpUD object.
        :param request_user_addr: If True, the send MR is registered on a
                                  buffer mapped by this class instead of a
                                  buffer allocated by pyverbs.
        :param kwargs: Forwarded unchanged to UDResources.
        """
        self.request_user_addr = request_user_addr
        self.user_addr = None
        # Length of the mapping behind user_addr; 0 while nothing is mapped.
        self.user_addr_len = 0
        super().__init__(**kwargs)

    @u.requires_odp('ud', e.IBV_ODP_SUPPORT_SEND)
    def create_mr(self):
        """
        Create the send MR (ODP) and the recv MR (non-ODP), both of
        msg_size + GRH_SIZE bytes.
        """
        mr_len = self.msg_size + u.GRH_SIZE
        if self.request_user_addr:
            # Map the full MR length: the send MR is registered over
            # msg_size + GRH_SIZE bytes starting at this address.
            self.user_addr = mmap(length=mr_len,
                                  flags=MAP_ANONYMOUS_ | MAP_PRIVATE_)
            self.user_addr_len = mr_len
        self.send_mr = MR(self.pd, mr_len,
                          e.IBV_ACCESS_LOCAL_WRITE | e.IBV_ACCESS_ON_DEMAND,
                          address=self.user_addr)
        self.recv_mr = MR(self.pd, mr_len, e.IBV_ACCESS_LOCAL_WRITE)


class OdpRC(RCResources):
    """RC resources using an ODP MR, optionally next to a non-ODP MR."""

    def __init__(self, dev_name, ib_port, gid_index, is_huge=False,
                 request_user_addr=False, use_mr_prefetch=None,
                 is_implicit=False,
                 prefetch_advice=e._IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE,
                 msg_size=1024,
                 odp_caps=e.IBV_ODP_SUPPORT_SEND | e.IBV_ODP_SUPPORT_RECV,
                 use_mixed_mr=False):
        """
        Initialize an OdpRC object.
        :param dev_name: Device name to be used
        :param ib_port: IB port of the device to use
        :param gid_index: Which GID index to use
        :param is_huge: If True, use huge pages for MR registration
        :param request_user_addr: Request to provide the MR's buffer address.
                                  If False, the buffer will be allocated by
                                  pyverbs.
        :param use_mr_prefetch: Describes the properties of the prefetch
                                operation. The options are 'sync', 'async'
                                and None to skip the prefetch operation.
        :param is_implicit: If True, register implicit MR.
        :param prefetch_advice: The advice of the prefetch request (ignored
                                if use_mr_prefetch is None).
        :param msg_size: Message size stored on the resource.
        :param odp_caps: ODP capabilities passed to u.odp_supported() for
                         the 'rc' transport in create_mr().
        :param use_mixed_mr: If True, create a non-ODP MR in addition to the
                             ODP MR.
        """
        self.is_huge = is_huge
        self.request_user_addr = request_user_addr
        self.is_implicit = is_implicit
        self.odp_caps = odp_caps
        self.access = e.IBV_ACCESS_LOCAL_WRITE | e.IBV_ACCESS_ON_DEMAND | \
            e.IBV_ACCESS_REMOTE_ATOMIC | e.IBV_ACCESS_REMOTE_READ | \
            e.IBV_ACCESS_REMOTE_WRITE
        self.user_addr = None
        # Length of the mapping behind user_addr; 0 while nothing is mapped.
        self.user_addr_len = 0
        self.use_mixed_mr = use_mixed_mr
        self.non_odp_mr = None
        super().__init__(dev_name=dev_name, ib_port=ib_port,
                         gid_index=gid_index)
        self.use_mr_prefetch = use_mr_prefetch
        self.prefetch_advice = prefetch_advice
        # NOTE(review): msg_size is assigned only after the base initialiser
        # has run, so anything the base class builds during initialisation
        # presumably uses its own msg_size - confirm this is intended.
        self.msg_size = msg_size

    @u.requires_odp('rc', e.IBV_ODP_SUPPORT_SEND | e.IBV_ODP_SUPPORT_RECV)
    def create_mr(self):
        """
        Create the ODP MR and, when use_mixed_mr is set, a non-ODP MR too.
        """
        u.odp_supported(self.ctx, 'rc', self.odp_caps)
        if self.request_user_addr:
            mmap_flags = MAP_ANONYMOUS_ | MAP_PRIVATE_
            length = self.msg_size
            if self.is_huge:
                mmap_flags |= MAP_HUGETLB_
                length = HUGE_PAGE_SIZE
            self.user_addr = mmap(length=length, flags=mmap_flags)
            # Remember the exact mapped length so it can be unmapped later
            # even if msg_size changes in the meantime.
            self.user_addr_len = length
        access = self.access
        if self.is_huge:
            access |= e.IBV_ACCESS_HUGETLB
        self.mr = MR(self.pd, self.msg_size, access, address=self.user_addr,
                     implicit=self.is_implicit)
        if self.use_mixed_mr:
            self.non_odp_mr = MR(self.pd, self.msg_size,
                                 e.IBV_ACCESS_LOCAL_WRITE)

    def create_qp_init_attr(self):
        """Return RC QP init attributes sharing one CQ for send and recv."""
        return QPInitAttr(qp_type=e.IBV_QPT_RC, scq=self.cq, sq_sig_all=0,
                          rcq=self.cq, srq=self.srq,
                          cap=self.create_qp_cap())

    def create_qp_attr(self):
        """Return QP attributes whose access flags match the MR access."""
        qp_attr = QPAttr(port_num=self.ib_port)
        qp_attr.qp_access_flags = self.access
        return qp_attr

    def create_qp_cap(self):
        """
        Return the QP capabilities. With use_mixed_mr, two SGEs are allowed
        per send and per recv WR; otherwise the base class caps are used.
        """
        if self.use_mixed_mr:
            return QPCap(max_recv_wr=self.num_msgs, max_send_sge=2,
                         max_recv_sge=2)
        return super().create_qp_cap()


class OdpXRC(XRCResources):
    """XRC resources using an ODP MR."""

    def __init__(self, request_user_addr=False, **kwargs):
        """
        Initialize an OdpXRC object.
        :param request_user_addr: If True, the MR is registered on a buffer
                                  mapped by this class.
        :param kwargs: Forwarded unchanged to XRCResources.
        """
        self.request_user_addr = request_user_addr
        self.user_addr = None
        # Length of the mapping behind user_addr; 0 while nothing is mapped.
        self.user_addr_len = 0
        super().__init__(**kwargs)

    @u.requires_odp('xrc', e.IBV_ODP_SUPPORT_SEND | e.IBV_ODP_SUPPORT_SRQ_RECV)
    def create_mr(self):
        """Create the ODP MR through u.create_custom_mr()."""
        if self.request_user_addr:
            self.user_addr = mmap(length=self.msg_size,
                                  flags=MAP_ANONYMOUS_ | MAP_PRIVATE_)
            self.user_addr_len = self.msg_size
        self.mr = u.create_custom_mr(self, e.IBV_ACCESS_ON_DEMAND,
                                     user_addr=self.user_addr)


class OdpTestCase(RDMATestCase):
    """ODP traffic tests over RC, UD and XRC resources."""

    def setUp(self):
        super().setUp()
        self.iters = 100
        self.force_page_faults = True
        self.is_huge = False

    def _unmap_user_buffers(self):
        """
        Unmap the user buffers mapped by the current server and client, if
        any, and clear their user_addr so a buffer is never unmapped twice.
        """
        players = (getattr(self, 'server', None),
                   getattr(self, 'client', None))
        for player in players:
            addr = getattr(player, 'user_addr', None)
            if not addr:
                continue
            length = getattr(player, 'user_addr_len', 0)
            if not length:
                # Resource did not record the mapped length; derive it the
                # way this test case always has.
                length = HUGE_PAGE_SIZE if self.is_huge else player.msg_size
            munmap(addr, length)
            player.user_addr = None

    def create_players(self, resource, **resource_arg):
        """
        Init odp tests resources.
        :param resource: The RDMA resources to use. A class of type
                         BaseResources.
        :param resource_arg: Dict of args that specify the resource specific
                             attributes.
        """
        # Some tests create players more than once; release the buffers of
        # the players about to be replaced so they are not leaked.
        self._unmap_user_buffers()
        sync_attrs = resource != OdpUD
        super().create_players(resource, sync_attrs, **resource_arg)
        self.traffic_args['force_page_faults'] = self.force_page_faults

    def tearDown(self):
        # Unmap before the base class tears the resources down, keeping the
        # original order of operations.
        self._unmap_user_buffers()
        super().tearDown()

    def test_odp_rc_traffic(self):
        self.create_players(OdpRC, request_user_addr=self.force_page_faults)
        u.traffic(**self.traffic_args)

    def test_odp_rc_mixed_mr(self):
        self.create_players(OdpRC, request_user_addr=self.force_page_faults,
                            use_mixed_mr=True)
        u.traffic(**self.traffic_args)

    def test_odp_rc_atomic_cmp_and_swp(self):
        self.force_page_faults = False
        self.create_players(OdpRC, request_user_addr=self.force_page_faults,
                            msg_size=8, odp_caps=e.IBV_ODP_SUPPORT_ATOMIC)
        u.atomic_traffic(**self.traffic_args,
                         send_op=e.IBV_WR_ATOMIC_CMP_AND_SWP)
        u.atomic_traffic(**self.traffic_args, receiver_val=1, sender_val=1,
                         send_op=e.IBV_WR_ATOMIC_CMP_AND_SWP)

    def test_odp_rc_atomic_fetch_and_add(self):
        self.force_page_faults = False
        self.create_players(OdpRC, request_user_addr=self.force_page_faults,
                            msg_size=8, odp_caps=e.IBV_ODP_SUPPORT_ATOMIC)
        u.atomic_traffic(**self.traffic_args,
                         send_op=e.IBV_WR_ATOMIC_FETCH_AND_ADD)

    def test_odp_rc_rdma_read(self):
        self.create_players(OdpRC, request_user_addr=self.force_page_faults,
                            odp_caps=e.IBV_ODP_SUPPORT_READ)
        self.server.mr.write('s' * self.server.msg_size,
                             self.server.msg_size)
        u.rdma_traffic(**self.traffic_args, send_op=e.IBV_WR_RDMA_READ)

    def test_odp_rc_rdma_write(self):
        self.create_players(OdpRC, request_user_addr=self.force_page_faults,
                            odp_caps=e.IBV_ODP_SUPPORT_WRITE)
        u.rdma_traffic(**self.traffic_args, send_op=e.IBV_WR_RDMA_WRITE)

    def test_odp_implicit_rc_traffic(self):
        self.create_players(OdpRC, request_user_addr=self.force_page_faults,
                            is_implicit=True)
        u.traffic(**self.traffic_args)

    def test_odp_ud_traffic(self):
        self.create_players(OdpUD, request_user_addr=self.force_page_faults)
        # Implement the traffic here because OdpUD uses two different MRs for
        # send and recv.
        ah_client = u.get_global_ah(self.client, self.gid_index, self.ib_port)
        recv_sge = SGE(self.server.recv_mr.buf,
                       self.server.msg_size + u.GRH_SIZE,
                       self.server.recv_mr.lkey)
        server_recv_wr = RecvWR(sg=[recv_sge], num_sge=1)
        send_sge = SGE(self.client.send_mr.buf + u.GRH_SIZE,
                       self.client.msg_size, self.client.send_mr.lkey)
        client_send_wr = SendWR(num_sge=1, sg=[send_sge])
        for _ in range(self.iters):
            # NOTE(review): presumably releases the send buffer's pages so
            # the next send triggers a page fault - confirm against
            # pyverbs.mem_alloc.madvise.
            madvise(self.client.send_mr.buf, self.client.msg_size)
            self.server.qp.post_recv(server_recv_wr)
            u.post_send(self.client, client_send_wr, ah=ah_client)
            u.poll_cq(self.client.cq)
            u.poll_cq(self.server.cq)

    def test_odp_xrc_traffic(self):
        self.create_players(OdpXRC, request_user_addr=self.force_page_faults)
        u.xrc_traffic(self.client, self.server)

    @u.requires_huge_pages()
    def test_odp_rc_huge_traffic(self):
        self.force_page_faults = False
        self.create_players(OdpRC, request_user_addr=self.force_page_faults,
                            is_huge=True)
        u.traffic(**self.traffic_args)

    @u.requires_huge_pages()
    def test_odp_rc_huge_user_addr_traffic(self):
        self.is_huge = True
        self.create_players(OdpRC, request_user_addr=self.force_page_faults,
                            is_huge=True)
        u.traffic(**self.traffic_args)

    def test_odp_sync_prefetch_rc_traffic(self):
        for advice in [e._IBV_ADVISE_MR_ADVICE_PREFETCH,
                       e._IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE]:
            self.create_players(OdpRC,
                                request_user_addr=self.force_page_faults,
                                use_mr_prefetch='sync',
                                prefetch_advice=advice)
            u.traffic(**self.traffic_args)

    def test_odp_async_prefetch_rc_traffic(self):
        for advice in [e._IBV_ADVISE_MR_ADVICE_PREFETCH,
                       e._IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE]:
            self.create_players(OdpRC,
                                request_user_addr=self.force_page_faults,
                                use_mr_prefetch='async',
                                prefetch_advice=advice)
            u.traffic(**self.traffic_args)

    def test_odp_implicit_sync_prefetch_rc_traffic(self):
        self.create_players(OdpRC, request_user_addr=self.force_page_faults,
                            use_mr_prefetch='sync', is_implicit=True)
        u.traffic(**self.traffic_args)

    def test_odp_implicit_async_prefetch_rc_traffic(self):
        self.create_players(OdpRC, request_user_addr=self.force_page_faults,
                            use_mr_prefetch='async', is_implicit=True)
        u.traffic(**self.traffic_args)

    def test_odp_prefetch_sync_no_page_fault_rc_traffic(self):
        prefetch_advice = e._IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
        self.create_players(OdpRC,
                            request_user_addr=self.force_page_faults,
                            use_mr_prefetch='sync',
                            prefetch_advice=prefetch_advice)
        u.traffic(**self.traffic_args)

    def test_odp_prefetch_async_no_page_fault_rc_traffic(self):
        prefetch_advice = e._IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
        self.create_players(OdpRC,
                            request_user_addr=self.force_page_faults,
                            use_mr_prefetch='async',
                            prefetch_advice=prefetch_advice)
        u.traffic(**self.traffic_args)