use std::ptr; use std::ptr::NonNull; use rrddmma::rdma::qp::ExtCompareSwapParams; #[cfg(mlnx5)] fn main() { eprintln!("DC is not yet implemented on MLNX v5.x"); } #[cfg(mlnx4)] fn main() -> anyhow::Result<()> { use rrddmma::{prelude::*, wrap::RegisteredMem}; use std::thread; const LEN: usize = 16; fn client(ep: QpEndpoint, remote: MrRemote) -> anyhow::Result<()> { fn make_dci(dev: &str) -> anyhow::Result { use rrddmma::rdma::qp::ExpFeature::*; let Nic { context, ports } = Nic::finder().dev_name(dev).probe()?; let pd = Pd::new(&context)?; let cq = Cq::new(&context, Cq::DEFAULT_CQ_DEPTH)?; let mut qp = Qp::builder() .qp_type(QpType::DcIni) .caps(QpCaps::for_dc_ini()) .send_cq(&cq) .recv_cq(&cq) .sq_sig_all(true) .enable_feature(ExtendedAtomics, 16) .build(&pd)?; qp.bind_local_port(&ports[0], None)?; Ok(qp) } let mut qp = make_dci("mlx5_0").inspect_err(|e| eprintln!("Err in creating DCI QP: {:?}", e))?; let peer = qp.make_peer(&ep)?; qp.set_dc_peer(&peer); // Issue a CAS. fn ptr_to(val: &[u64; 2]) -> NonNull { NonNull::new(val.as_ptr() as *mut u64).unwrap() } let cmp = [0x0123456789abcdefu64, 0x1145141919810abcu64]; let cmp_mask = [0xffffffffffffffffu64, 0xffffffffffffffffu64]; let swap = [0xdeadbeefdeadbeefu64, 0x8badf00d8badf00du64]; let swap_mask = [0xffffffffffffffffu64, 0xffffffffffffffffu64]; let mut mem = RegisteredMem::new(qp.pd(), 4096)?; unsafe { ptr::write_bytes(mem.as_mut_ptr(), 0, LEN); let params = ExtCompareSwapParams { compare: ptr_to(&cmp), swap: ptr_to(&swap), compare_mask: ptr_to(&cmp_mask), swap_mask: ptr_to(&swap_mask), }; qp.ext_compare_swap::(&mem.slice(0, LEN).unwrap(), &remote, ¶ms, 0, true)?; } qp.scq().poll_one_blocking_consumed(); unsafe { println!( "client: {:#x} {:#x}", ptr::read::(mem.as_ptr() as _).swap_bytes(), ptr::read::(mem.as_ptr().add(8) as _).swap_bytes() ) }; Ok(()) } fn make_dct(dev: &str) -> anyhow::Result { let Nic { context, ports } = Nic::finder().dev_name(dev).probe()?; let pd = Pd::new(&context)?; let cq = Cq::new(&context, Cq::DEFAULT_CQ_DEPTH)?; let dct = Dct::builder() .pd(&pd) .cq(&cq) .port(&ports[0], None) .inline_size(64) .build(&context)?; Ok(dct) } let dct = make_dct("mlx5_0")?; let mut mem = RegisteredMem::new(dct.pd(), 4096)?; unsafe { ptr::write_volatile(mem.as_mut_ptr() as *mut u64, 0x0123456789abcdefu64); ptr::write_volatile(mem.as_mut_ptr().add(8) as *mut u64, 0x1145141919810abcu64); } let slice = MrRemote::from(mem.slice(0, LEN).unwrap()); let ep = dct.endpoint(); let cli = thread::spawn(move || client(ep, slice)); cli.join().unwrap()?; unsafe { println!( "server: {:#x} {:#x}", ptr::read::(mem.as_ptr() as _), ptr::read::(mem.as_ptr().add(8) as _) ) }; Ok(()) }