import unittest import datetime import errno from pyverbs.enums import IBV_WC_EX_WITH_COMPLETION_TIMESTAMP as FREE_RUNNING, \ IBV_WC_EX_WITH_COMPLETION_TIMESTAMP_WALLCLOCK as REAL_TIME from tests.base import RCResources, RDMATestCase, PyverbsAPITestCase from pyverbs.providers.mlx5.mlx5dv import Mlx5Context from pyverbs.pyverbs_error import PyverbsRDMAError from pyverbs.cq import CqInitAttrEx, CQEX from tests.test_flow import FlowRes from pyverbs.qp import QPInitAttr from pyverbs.cq import PollCqAttr import pyverbs.enums as e import tests.utils as u GIGA = 1000000000 def convert_ts_to_ns(ctx, device_ts): """ Convert device timestamp from HCA core clock units to corresponding nanosecond counts. :param ctx: The context that gets this timestamp. :param device_ts: The device timestamp to translate. :return: Timestamp in nanoseconds """ try: timestamp_in_ns = Mlx5Context.device_timestamp_to_ns(ctx, device_ts) except PyverbsRDMAError as ex: if ex.error_code == errno.EOPNOTSUPP: raise unittest.SkipTest('Converting timestamp to nanoseconds is not supported') raise ex return timestamp_in_ns def timestamp_res_cls(base_class): """ This is a factory function which creates a class that inherits base_class of any BaseResources type. :param base_class: The base resources class to inherit from. :return: TimeStampRes class. """ class TimeStampRes(base_class): def __init__(self, dev_name, ib_port, gid_index, qp_type, send_ts=None, recv_ts=None): self.qp_type = qp_type self.send_ts = send_ts self.recv_ts = recv_ts self.timestamp = None self.scq = None self.rcq = None super().__init__(dev_name=dev_name, ib_port=ib_port, gid_index=gid_index) def create_cq(self): self.scq = self._create_ex_cq(self.send_ts) self.rcq = self._create_ex_cq(self.recv_ts) def _create_ex_cq(self, timestamp=None): """ Create an Extended CQ. :param timestamp: If set, the timestamp type to use. """ wc_flags = e.IBV_WC_STANDARD_FLAGS if timestamp: wc_flags |= timestamp cia = CqInitAttrEx(cqe=self.num_msgs, wc_flags=wc_flags) try: cq = CQEX(self.ctx, cia) except PyverbsRDMAError as ex: if ex.error_code == errno.EOPNOTSUPP: raise unittest.SkipTest('Create Extended CQ is not supported') raise ex return cq def create_qp_init_attr(self): return QPInitAttr(qp_type=self.qp_type, scq=self.scq, rcq=self.rcq, srq=self.srq, cap=self.create_qp_cap()) return TimeStampRes class TimeStampTest(RDMATestCase): """ Test various types of timestamping formats. """ def setUp(self): super().setUp() self.send_ts = None self.recv_ts = None self.qp_type = None @property def resource_arg(self): return {'send_ts': self.send_ts, 'recv_ts': self.recv_ts, 'qp_type': self.qp_type} def test_timestamp_free_running_rc_traffic(self): """ Test free running timestamp on RC traffic. """ self.qp_type = e.IBV_QPT_RC self.send_ts = self.recv_ts = FREE_RUNNING self.create_players(timestamp_res_cls(RCResources), **self.resource_arg) self.ts_traffic() timestamp = convert_ts_to_ns(self.client.ctx, self.client.timestamp) self.verify_ts(timestamp) def test_timestamp_real_time_rc_traffic(self): """ Test real time timestamp on RC traffic. """ self.qp_type = e.IBV_QPT_RC self.send_ts = self.recv_ts = REAL_TIME self.create_players(timestamp_res_cls(RCResources), **self.resource_arg) self.ts_traffic() self.verify_ts(self.client.timestamp) def test_timestamp_free_running_send_raw_traffic(self): """ Test timestamping on RAW traffic only on the send completions. """ self.qp_type = e.IBV_QPT_RAW_PACKET self.send_ts = FREE_RUNNING self.create_players(timestamp_res_cls(FlowRes), **self.resource_arg) self.flow = self.server.create_flow([self.server.create_eth_spec()]) self.ts_traffic() timestamp = convert_ts_to_ns(self.client.ctx, self.client.timestamp) self.verify_ts(timestamp) def test_timestamp_free_running_recv_raw_traffic(self): """ Test timestamping on RAW traffic only on the recv completions. """ self.qp_type = e.IBV_QPT_RAW_PACKET self.recv_ts = FREE_RUNNING self.create_players(timestamp_res_cls(FlowRes), **self.resource_arg) self.flow = self.server.create_flow([self.server.create_eth_spec()]) self.ts_traffic() timestamp = convert_ts_to_ns(self.server.ctx, self.server.timestamp) self.verify_ts(timestamp) def test_timestamp_real_time_raw_traffic(self): """ Test real time timestamp on RAW traffic. """ self.qp_type = e.IBV_QPT_RAW_PACKET self.send_ts = self.recv_ts = REAL_TIME self.create_players(timestamp_res_cls(FlowRes), **self.resource_arg) self.flow = self.server.create_flow([self.server.create_eth_spec()]) self.ts_traffic() self.verify_ts(self.client.timestamp) @staticmethod def verify_ts(timestamp): """ Verify that the timestamp is a valid value of time. """ datetime.datetime.fromtimestamp(timestamp/GIGA) @staticmethod def poll_cq_ex_ts(cqex, ts_type=None): """ Poll completion from the extended CQ. :param cqex: CQEX to poll from :param ts_type: If set, read the CQE timestamp in this format :return: The CQE timestamp if it requested. """ polling_timeout = 10 start = datetime.datetime.now() ts = 0 poll_attr = PollCqAttr() ret = cqex.start_poll(poll_attr) while ret == 2 and (datetime.datetime.now() - start).seconds < polling_timeout: ret = cqex.start_poll(poll_attr) if ret == 2: raise PyverbsRDMAError('Failed to poll CQEX - Got timeout') if ret != 0: raise PyverbsRDMAError('Failed to poll CQEX') if cqex.status != e.IBV_WC_SUCCESS: raise PyverbsRDMAError('Completion status is {cqex.status}') if ts_type == FREE_RUNNING: ts = cqex.read_timestamp() if ts_type == REAL_TIME: ts = cqex.read_completion_wallclock_ns() cqex.end_poll() return ts def ts_traffic(self): """ Run RDMA traffic and read the completions timestamps. """ s_recv_wr = u.get_recv_wr(self.server) u.post_recv(self.server, s_recv_wr) if self.qp_type == e.IBV_QPT_RAW_PACKET: c_send_wr, _, _ = u.get_send_elements_raw_qp(self.client) else: c_send_wr, _ = u.get_send_elements(self.client, False) u.send(self.client, c_send_wr, e.IBV_WR_SEND, False, 0) self.client.timestamp = self.poll_cq_ex_ts(self.client.scq, ts_type=self.send_ts) self.server.timestamp = self.poll_cq_ex_ts(self.server.rcq, ts_type=self.recv_ts) class TimeAPITest(PyverbsAPITestCase): def test_query_rt_values(self): """ Test the ibv_query_rt_values_ex API. Query the device real-time values, convert them to ns and verify that the timestamp is a valid value of time.. """ try: _, hw_time = self.ctx.query_rt_values_ex() time_in_ns = convert_ts_to_ns(self.ctx, hw_time) datetime.datetime.fromtimestamp(time_in_ns/GIGA) except PyverbsRDMAError as ex: if ex.error_code == errno.EOPNOTSUPP: raise unittest.SkipTest('Query device real time is not supported') raise ex