"""Traffic tests that use CQs created together with a completion channel."""
import errno
import unittest

from pyverbs.pyverbs_error import PyverbsRDMAError
from tests.base import RCResources, UDResources
from tests.base import RDMATestCase
from tests.utils import traffic

from pyverbs.cq import CQ, CompChannel


def create_cq_with_comp_channel(agr_obj):
    """
    Create a completion channel and a CQ bound to it, and store both on
    the given object.

    :param agr_obj: Object exposing ``ctx`` and ``num_msgs``; on return it
                    also carries ``comp_channel`` and ``cq`` attributes.
    :return: None
    """
    channel = CompChannel(agr_obj.ctx)
    agr_obj.comp_channel = channel
    # The third positional argument is deliberately left as None
    # (presumably the CQ context - confirm against the pyverbs CQ signature).
    completion_queue = CQ(agr_obj.ctx, agr_obj.num_msgs, None, channel)
    agr_obj.cq = completion_queue
    # Request a notification on the freshly created CQ.
    completion_queue.req_notify()


class CqEventsUD(UDResources):
    """UD resources whose CQ is created along with a completion channel."""

    def create_cq(self):
        """Create the CQ through create_cq_with_comp_channel()."""
        create_cq_with_comp_channel(self)


class CqEventsRC(RCResources):
    """RC resources whose CQ is created along with a completion channel."""

    def create_cq(self):
        """Create the CQ through create_cq_with_comp_channel()."""
        create_cq_with_comp_channel(self)


class CqEventsTestCase(RDMATestCase):
    """Run traffic over the completion-channel backed resources above."""

    def setUp(self):
        """Run the base setUp, then set the iteration count to 100."""
        super().setUp()
        self.iters = 100

    def _run_cq_events_traffic(self, resources_cls):
        """
        Create the players from the given resources class and run traffic.

        :param resources_cls: Resources class handed to create_players().
        """
        self.create_players(resources_cls)
        # NOTE(review): traffic_args is not set in this file; presumably it
        # is provided by RDMATestCase/create_players - confirm in tests.base.
        traffic(**self.traffic_args)

    def test_cq_events_ud(self):
        """Run traffic using the CqEventsUD resources."""
        self._run_cq_events_traffic(CqEventsUD)

    def test_cq_events_rc(self):
        """Run traffic using the CqEventsRC resources."""
        self._run_cq_events_traffic(CqEventsRC)