use dust_dds::{ domain::domain_participant_factory::DomainParticipantFactory, infrastructure::{ error::DdsError, instance::InstanceHandle, qos::{DataReaderQos, DataWriterQos, QosKind, TopicQos}, qos_policy::{ DeadlineQosPolicy, DestinationOrderQosPolicy, DestinationOrderQosPolicyKind, DurabilityQosPolicy, DurabilityQosPolicyKind, HistoryQosPolicy, HistoryQosPolicyKind, Length, LifespanQosPolicy, OwnershipQosPolicy, OwnershipQosPolicyKind, OwnershipStrengthQosPolicy, ReliabilityQosPolicy, ReliabilityQosPolicyKind, ResourceLimitsQosPolicy, TimeBasedFilterQosPolicy, WriterDataLifecycleQosPolicy, }, status::{StatusKind, NO_STATUS}, time::{Duration, DurationKind, Time}, wait_set::{Condition, WaitSet}, }, subscription::sample_info::{ InstanceStateKind, SampleStateKind, ViewStateKind, ANY_INSTANCE_STATE, ANY_SAMPLE_STATE, ANY_VIEW_STATE, }, topic_definition::type_support::DdsType, }; mod utils; use crate::utils::domain_id_generator::TEST_DOMAIN_ID_GENERATOR; #[derive(Debug, PartialEq, DdsType)] struct UserData(u8); #[derive(Clone, Debug, PartialEq, DdsType)] struct KeyedData { #[dust_dds(key)] id: u8, value: u32, } #[derive(Debug, PartialEq, DdsType)] struct LargeData { #[dust_dds(key)] id: u8, value: Vec, } #[test] fn large_data_should_be_fragmented() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::( "LargeDataTopic", "LargeData", QosKind::Default, None, NO_STATUS, ) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::BestEffort, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::BestEffort, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); let even_data = LargeData { id: 1, value: vec![8; 15000], }; writer.write(&even_data, None).unwrap(); let cond = reader.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::DataAvailable]) .unwrap(); let mut reader_wait_set = WaitSet::new(); reader_wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); reader_wait_set.wait(Duration::new(10, 0)).unwrap(); let samples = reader .take(3, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 1); assert_eq!(samples[0].data().unwrap(), even_data); let odd_data = LargeData { id: 1, value: vec![8; 15001], }; writer.write(&odd_data, None).unwrap(); let cond = reader.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::DataAvailable]) .unwrap(); let mut reader_wait_set = WaitSet::new(); reader_wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); reader_wait_set.wait(Duration::new(10, 0)).unwrap(); let samples = reader .take(3, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 1); assert_eq!(samples[0].data().unwrap(), odd_data); } #[test] fn large_data_should_be_fragmented_reliable() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::( "LargeDataTopic", "LargeData", QosKind::Default, None, NO_STATUS, ) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(5, 0)).unwrap(); let data = LargeData { id: 1, value: vec![8; 15000], }; writer.write(&data, None).unwrap(); writer .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let samples = reader .take(3, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 1); assert_eq!(samples[0].data().unwrap(), data); } #[test] fn writer_with_keep_last_1_should_send_only_last_sample_to_reader() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, durability: DurabilityQosPolicy { kind: DurabilityQosPolicyKind::TransientLocal, }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepLast(1), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let data1 = KeyedData { id: 1, value: 1 }; let data2 = KeyedData { id: 1, value: 2 }; let data3 = KeyedData { id: 1, value: 3 }; let data4 = KeyedData { id: 1, value: 4 }; let data5 = KeyedData { id: 1, value: 5 }; writer.write(&data1, None).unwrap(); writer.write(&data2, None).unwrap(); writer.write(&data3, None).unwrap(); writer.write(&data4, None).unwrap(); writer.write(&data5, None).unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, durability: DurabilityQosPolicy { kind: DurabilityQosPolicyKind::TransientLocal, }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = reader.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::SubscriptionMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); reader .wait_for_historical_data(Duration::new(10, 0)) .unwrap(); let samples = reader .take(5, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 1); assert_eq!(samples[0].data().unwrap(), data5); } #[test] fn writer_with_keep_last_3_should_send_last_3_samples_to_reader() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, durability: DurabilityQosPolicy { kind: DurabilityQosPolicyKind::TransientLocal, }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepLast(3), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let data1 = KeyedData { id: 1, value: 1 }; let data2 = KeyedData { id: 1, value: 2 }; let data3 = KeyedData { id: 1, value: 3 }; let data4 = KeyedData { id: 1, value: 4 }; let data5 = KeyedData { id: 1, value: 5 }; writer.write(&data1, None).unwrap(); writer.write(&data2, None).unwrap(); writer.write(&data3, None).unwrap(); writer.write(&data4, None).unwrap(); writer.write(&data5, None).unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, durability: DurabilityQosPolicy { kind: DurabilityQosPolicyKind::TransientLocal, }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = reader.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::SubscriptionMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); reader .wait_for_historical_data(Duration::new(10, 0)) .unwrap(); let samples = reader .take(5, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!( samples.len(), 3, "Received wrong number of samples. Received samples: {:?}", samples ); assert_eq!(samples[0].data().unwrap(), data3); assert_eq!(samples[1].data().unwrap(), data4); assert_eq!(samples[2].data().unwrap(), data5); } #[test] fn samples_are_taken() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); let data1 = KeyedData { id: 1, value: 1 }; let data2 = KeyedData { id: 2, value: 10 }; let data3 = KeyedData { id: 3, value: 20 }; let data4 = KeyedData { id: 4, value: 30 }; let data5 = KeyedData { id: 5, value: 40 }; writer.write(&data1, None).unwrap(); writer.write(&data2, None).unwrap(); writer.write(&data3, None).unwrap(); writer.write(&data4, None).unwrap(); writer.write(&data5, None).unwrap(); writer .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let samples1 = reader .take(3, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); let samples2 = reader .take(3, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples1.len(), 3); assert_eq!(samples1[0].data().unwrap(), data1); assert_eq!(samples1[1].data().unwrap(), data2); assert_eq!(samples1[2].data().unwrap(), data3); assert_eq!(samples2.len(), 2); assert_eq!(samples2[0].data().unwrap(), data4); assert_eq!(samples2[1].data().unwrap(), data5); } #[test] fn wait_for_samples_to_be_taken_best_effort() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::BestEffort, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::BestEffort, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); let data1 = KeyedData { id: 1, value: 1 }; let data2 = KeyedData { id: 2, value: 10 }; let data3 = KeyedData { id: 3, value: 20 }; let data4 = KeyedData { id: 4, value: 30 }; let data5 = KeyedData { id: 5, value: 40 }; writer.write(&data1, None).unwrap(); writer.write(&data2, None).unwrap(); writer.write(&data3, None).unwrap(); writer.write(&data4, None).unwrap(); writer.write(&data5, None).unwrap(); let start_time = std::time::Instant::now(); let mut samples = Vec::new(); while start_time.elapsed() < std::time::Duration::from_secs(10) { if let Ok(sample) = reader.take(1, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) { samples.push(sample[0].data().unwrap()) } if samples.len() >= 5 { break; } } assert_eq!(samples.len(), 5); assert_eq!(samples[0], data1); assert_eq!(samples[1], data2); assert_eq!(samples[2], data3); assert_eq!(samples[3], data4); assert_eq!(samples[4], data5); } #[test] fn read_only_unread_samples() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); let data1 = KeyedData { id: 1, value: 1 }; let data2 = KeyedData { id: 2, value: 10 }; let data3 = KeyedData { id: 3, value: 20 }; let data4 = KeyedData { id: 4, value: 30 }; let data5 = KeyedData { id: 5, value: 40 }; writer.write(&data1, None).unwrap(); writer.write(&data2, None).unwrap(); writer.write(&data3, None).unwrap(); writer.write(&data4, None).unwrap(); writer.write(&data5, None).unwrap(); writer .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let samples1 = reader .read( 3, &[SampleStateKind::NotRead], ANY_VIEW_STATE, ANY_INSTANCE_STATE, ) .unwrap(); let samples2 = reader .read( 3, &[SampleStateKind::NotRead], ANY_VIEW_STATE, ANY_INSTANCE_STATE, ) .unwrap(); let samples3 = reader.read( 3, &[SampleStateKind::NotRead], ANY_VIEW_STATE, ANY_INSTANCE_STATE, ); assert_eq!(samples1.len(), 3); assert_eq!(samples1[0].data().unwrap(), data1); assert_eq!(samples1[1].data().unwrap(), data2); assert_eq!(samples1[2].data().unwrap(), data3); assert_eq!(samples2.len(), 2); assert_eq!(samples2[0].data().unwrap(), data4); assert_eq!(samples2[1].data().unwrap(), data5); assert_eq!(samples3, Err(DdsError::NoData)); } #[test] fn read_next_sample() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); let data1 = KeyedData { id: 1, value: 1 }; let data2 = KeyedData { id: 2, value: 10 }; let data3 = KeyedData { id: 3, value: 20 }; writer.write(&data1, None).unwrap(); writer.write(&data2, None).unwrap(); writer.write(&data3, None).unwrap(); writer .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); assert_eq!(reader.read_next_sample().unwrap().data().unwrap(), data1); assert_eq!(reader.read_next_sample().unwrap().data().unwrap(), data2); assert_eq!(reader.read_next_sample().unwrap().data().unwrap(), data3); assert_eq!(reader.read_next_sample(), Err(DdsError::NoData)); } #[test] fn take_next_sample() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(5, 0)).unwrap(); let data1 = KeyedData { id: 1, value: 1 }; let data2 = KeyedData { id: 2, value: 10 }; let data3 = KeyedData { id: 3, value: 20 }; writer.write(&data1, None).unwrap(); writer.write(&data2, None).unwrap(); writer.write(&data3, None).unwrap(); writer .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); assert_eq!(reader.take_next_sample().unwrap().data().unwrap(), data1); assert_eq!(reader.take_next_sample().unwrap().data().unwrap(), data2); assert_eq!(reader.take_next_sample().unwrap().data().unwrap(), data3); assert_eq!(reader.take_next_sample(), Err(DdsError::NoData)); } #[test] fn each_key_sample_is_read() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); let data1 = KeyedData { id: 1, value: 1 }; let data2 = KeyedData { id: 2, value: 10 }; let data3 = KeyedData { id: 3, value: 20 }; writer.write(&data1, None).unwrap(); writer.write(&data2, None).unwrap(); writer.write(&data3, None).unwrap(); let data1_handle = writer.lookup_instance(&data1).unwrap(); let data2_handle = writer.lookup_instance(&data2).unwrap(); let data3_handle = writer.lookup_instance(&data3).unwrap(); writer .wait_for_acknowledgments(Duration::new(1, 0)) .unwrap(); let samples = reader .read(3, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 3); assert_eq!(samples[0].data().unwrap(), data1); assert_eq!( samples[0].sample_info().instance_handle, data1_handle.unwrap(), ); assert_eq!(samples[1].data().unwrap(), data2); assert_eq!( samples[1].sample_info().instance_handle, data2_handle.unwrap(), ); assert_eq!(samples[2].data().unwrap(), data3); assert_eq!( samples[2].sample_info().instance_handle, data3_handle.unwrap(), ); } #[test] fn read_specific_instance() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); let data1 = KeyedData { id: 1, value: 1 }; let data2 = KeyedData { id: 2, value: 10 }; let data3 = KeyedData { id: 3, value: 20 }; writer.write(&data1, None).unwrap(); writer.write(&data2, None).unwrap(); writer.write(&data3, None).unwrap(); let data1_handle = writer.lookup_instance(&data1).unwrap(); writer .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let samples = reader .read_instance( 3, data1_handle.unwrap(), ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE, ) .unwrap(); assert_eq!(samples.len(), 1); assert_eq!(samples[0].data().unwrap(), data1); } #[test] fn read_next_instance() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(5, 0)).unwrap(); let data1 = KeyedData { id: 1, value: 1 }; let data2 = KeyedData { id: 2, value: 10 }; let data3 = KeyedData { id: 3, value: 20 }; writer.write(&data1, None).unwrap(); writer.write(&data2, None).unwrap(); writer.write(&data3, None).unwrap(); writer .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let samples1 = reader .read_next_instance( 3, None, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE, ) .unwrap(); let samples2 = reader .read_next_instance( 3, Some(samples1[0].sample_info().instance_handle), ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE, ) .unwrap(); let samples3 = reader .read_next_instance( 3, Some(samples2[0].sample_info().instance_handle), ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE, ) .unwrap(); let samples4 = reader.read_next_instance( 3, Some(samples3[0].sample_info().instance_handle), ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE, ); assert_eq!(samples1[0].data().unwrap(), data1); assert_eq!(samples2[0].data().unwrap(), data2); assert_eq!(samples3[0].data().unwrap(), data3); assert_eq!(samples4, Err(DdsError::NoData)); } #[test] fn take_next_instance() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); let data1 = KeyedData { id: 1, value: 1 }; let data2 = KeyedData { id: 2, value: 10 }; let data3 = KeyedData { id: 3, value: 20 }; writer.write(&data1, None).unwrap(); writer.write(&data2, None).unwrap(); writer.write(&data3, None).unwrap(); writer .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let samples1 = reader .take_next_instance( 3, None, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE, ) .unwrap(); let samples2 = reader .take_next_instance( 3, Some(samples1[0].sample_info().instance_handle), ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE, ) .unwrap(); let samples3 = reader .take_next_instance( 3, Some(samples2[0].sample_info().instance_handle), ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE, ) .unwrap(); let samples4 = reader.take_next_instance( 3, Some(samples3[0].sample_info().instance_handle), ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE, ); assert_eq!(samples1[0].data().unwrap(), data1); assert_eq!(samples2[0].data().unwrap(), data2); assert_eq!(samples3[0].data().unwrap(), data3); assert_eq!(samples4, Err(DdsError::NoData)); } #[test] fn take_specific_instance() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); let data1 = KeyedData { id: 1, value: 1 }; let data2 = KeyedData { id: 2, value: 10 }; let data3 = KeyedData { id: 3, value: 20 }; writer.write(&data1, None).unwrap(); writer.write(&data2, None).unwrap(); writer.write(&data3, None).unwrap(); let data1_handle = writer.lookup_instance(&data1).unwrap(); writer .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let samples = reader .take_instance( 3, data1_handle.unwrap(), ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE, ) .unwrap(); assert_eq!(samples.len(), 1); assert_eq!(samples[0].data().unwrap(), data1); } #[test] fn take_specific_unknown_instance() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); let data1 = KeyedData { id: 1, value: 1 }; let data2 = KeyedData { id: 2, value: 10 }; let data3 = KeyedData { id: 3, value: 20 }; writer.write(&data1, None).unwrap(); writer.write(&data2, None).unwrap(); writer.write(&data3, None).unwrap(); writer .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); assert_eq!( reader.take_instance( 3, InstanceHandle::new([99; 16]), ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE, ), Err(DdsError::BadParameter) ); } #[test] fn write_read_disposed_samples() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant_factory = DomainParticipantFactory::get_instance(); let participant = participant_factory .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); let data1 = KeyedData { id: 1, value: 1 }; writer.write(&data1, None).unwrap(); writer.dispose(&data1, None).unwrap(); writer .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let samples = reader .read(2, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 2); assert_eq!( samples[0].sample_info().instance_state, InstanceStateKind::NotAliveDisposed ); assert_eq!( samples[1].sample_info().instance_state, InstanceStateKind::NotAliveDisposed ); } #[test] #[ignore = "Dispose message not always sent after timing changes"] fn write_read_disposed_samples_when_writer_is_immediately_deleted() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant_factory = DomainParticipantFactory::get_instance(); let participant1 = participant_factory .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let participant2 = participant_factory .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic1 = participant1 .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant1 .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, ..Default::default() }; let writer = publisher .create_datawriter(&topic1, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let topic2 = participant2 .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let subscriber = participant2 .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic2, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); let data1 = KeyedData { id: 1, value: 1 }; writer.write(&data1, None).unwrap(); writer.dispose(&data1, None).unwrap(); publisher.delete_datawriter(&writer).unwrap(); let start_loop_time = std::time::Instant::now(); let wait_for_disposed_timeout = std::time::Duration::from_secs(10); let samples = loop { if let Ok(samples) = reader.read(2, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) { if samples.len() == 2 { break samples; } } if start_loop_time.elapsed() > wait_for_disposed_timeout { panic!("Disposed sample not received within expected time.") } }; assert_eq!(samples.len(), 2); assert_eq!( samples[0].sample_info().instance_state, InstanceStateKind::NotAliveDisposed ); assert_eq!( samples[1].sample_info().instance_state, InstanceStateKind::NotAliveDisposed ); } #[test] fn write_read_sample_view_state() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant_factory = DomainParticipantFactory::get_instance(); let participant = participant_factory .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("OtherTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(5, 0)).unwrap(); let data1 = KeyedData { id: 1, value: 1 }; writer.write(&data1, None).unwrap(); writer .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); reader .read(1, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); let data1_2 = KeyedData { id: 1, value: 2 }; let data2 = KeyedData { id: 2, value: 1 }; writer.write(&data1_2, None).unwrap(); writer.write(&data2, None).unwrap(); writer .wait_for_acknowledgments(Duration::new(1, 0)) .unwrap(); let samples = reader .read(2, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 2); assert_eq!(samples[0].sample_info().view_state, ViewStateKind::NotNew); assert_eq!(samples[1].sample_info().view_state, ViewStateKind::New); } #[test] fn inconsistent_topic_status_condition() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant_factory = DomainParticipantFactory::get_instance(); let participant = participant_factory .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let best_effort_topic_qos = TopicQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::BestEffort, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let topic_best_effort = participant .create_topic::( "Topic", "KeyedData", QosKind::Specific(best_effort_topic_qos), None, NO_STATUS, ) .unwrap(); let status_cond = topic_best_effort.get_statuscondition(); status_cond .set_enabled_statuses(&[StatusKind::InconsistentTopic]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(status_cond)) .unwrap(); let participant2 = participant_factory .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let reliable_topic_qos = TopicQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let _topic_reliable = participant2 .create_topic::( "Topic", "KeyedData", QosKind::Specific(reliable_topic_qos), None, NO_STATUS, ) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); assert!( topic_best_effort .get_inconsistent_topic_status() .unwrap() .total_count > 0 ); } #[test] fn reader_with_minimum_time_separation_qos() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, time_based_filter: TimeBasedFilterQosPolicy { minimum_separation: DurationKind::Finite(Duration::new(2, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); let data1_1 = KeyedData { id: 1, value: 1 }; let data1_2 = KeyedData { id: 1, value: 2 }; let data1_3 = KeyedData { id: 1, value: 3 }; let data2_1 = KeyedData { id: 2, value: 10 }; let data2_2 = KeyedData { id: 2, value: 20 }; let data2_3 = KeyedData { id: 2, value: 30 }; writer .write_w_timestamp(&data1_1, None, Time::new(1, 0)) .unwrap(); writer .write_w_timestamp(&data1_2, None, Time::new(2, 0)) .unwrap(); writer .write_w_timestamp(&data1_3, None, Time::new(3, 0)) .unwrap(); writer .write_w_timestamp(&data2_1, None, Time::new(4, 0)) .unwrap(); writer .write_w_timestamp(&data2_2, None, Time::new(5, 0)) .unwrap(); writer .write_w_timestamp(&data2_3, None, Time::new(6, 0)) .unwrap(); writer .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let samples = reader .read(10, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 4); assert_eq!(samples[0].data().unwrap(), data1_1); assert_eq!(samples[1].data().unwrap(), data1_3); assert_eq!(samples[2].data().unwrap(), data2_1); assert_eq!(samples[3].data().unwrap(), data2_3); } #[test] fn transient_local_writer_reader_wait_for_historical_data() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { durability: DurabilityQosPolicy { kind: DurabilityQosPolicyKind::TransientLocal, }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let data1 = KeyedData { id: 1, value: 1 }; let data2 = KeyedData { id: 2, value: 2 }; writer.write(&data1, None).unwrap(); writer.write(&data2, None).unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { durability: DurabilityQosPolicy { kind: DurabilityQosPolicyKind::TransientLocal, }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, time_based_filter: TimeBasedFilterQosPolicy { minimum_separation: DurationKind::Finite(Duration::new(2, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = reader.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::SubscriptionMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(5, 0)).unwrap(); reader .wait_for_historical_data(Duration::new(10, 0)) .unwrap(); let samples = reader .read(10, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 2); assert_eq!(samples[0].data().unwrap(), data1); assert_eq!(samples[1].data().unwrap(), data2); } #[test] fn volatile_writer_reader_receives_only_new_samples() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { durability: DurabilityQosPolicy { kind: DurabilityQosPolicyKind::Volatile, }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let data1 = KeyedData { id: 1, value: 1 }; writer.write(&data1, None).unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { durability: DurabilityQosPolicy { kind: DurabilityQosPolicyKind::Volatile, }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(5, 0)).unwrap(); let data2 = KeyedData { id: 2, value: 10 }; writer.write(&data2, None).unwrap(); writer .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let samples = reader .read(10, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 1); assert_eq!(samples[0].data().unwrap(), data2); } #[test] fn write_read_unkeyed_topic() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant_factory = DomainParticipantFactory::get_instance(); let participant = participant_factory .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::( "write_read_unkeyed_topic", "UserData", QosKind::Default, None, NO_STATUS, ) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); writer.write(&UserData(8), None).unwrap(); writer .wait_for_acknowledgments(Duration::new(5, 0)) .unwrap(); let samples = reader.read(1, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE); assert_eq!(samples.unwrap()[0].data().unwrap(), UserData(8)); } #[test] fn data_reader_resource_limits() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant_factory = DomainParticipantFactory::get_instance(); let participant = participant_factory .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::( "data_reader_resource_limits", "UserData", QosKind::Default, None, NO_STATUS, ) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let data_writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, ..Default::default() }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(data_writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, ..Default::default() }, resource_limits: ResourceLimitsQosPolicy { max_samples: Length::Limited(2), max_instances: Length::Unlimited, max_samples_per_instance: Length::Limited(2), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); writer.write(&UserData(1), None).unwrap(); writer.write(&UserData(2), None).unwrap(); writer.write(&UserData(3), None).unwrap(); let reader_cond = reader.get_statuscondition(); reader_cond .set_enabled_statuses(&[StatusKind::SampleRejected]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(reader_cond)) .unwrap(); wait_set.wait(Duration::new(5, 0)).unwrap(); let samples = reader .read(3, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 2); } #[test] fn data_reader_order_by_source_timestamp() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant_factory = DomainParticipantFactory::get_instance(); let participant = participant_factory .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "UserData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let data_writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, destination_order: DestinationOrderQosPolicy { kind: DestinationOrderQosPolicyKind::BySourceTimestamp, }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, ..Default::default() }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(data_writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, destination_order: DestinationOrderQosPolicy { kind: DestinationOrderQosPolicyKind::BySourceTimestamp, }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); writer .write_w_timestamp(&UserData(1), None, Time::new(30, 0)) .unwrap(); writer .write_w_timestamp(&UserData(2), None, Time::new(20, 0)) .unwrap(); writer .write_w_timestamp(&UserData(3), None, Time::new(10, 0)) .unwrap(); writer .wait_for_acknowledgments(Duration::new(1, 0)) .unwrap(); let samples = reader .read(3, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 3); assert_eq!(samples[0].data().unwrap(), UserData(3)); assert_eq!(samples[1].data().unwrap(), UserData(2)); assert_eq!(samples[2].data().unwrap(), UserData(1)); } #[test] fn data_reader_publication_handle_sample_info() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant_factory = DomainParticipantFactory::get_instance(); let participant = participant_factory .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "UserData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); writer.write(&UserData(1), None).unwrap(); writer .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let samples = reader .read(1, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 1); assert!(reader .get_matched_publication_data(samples[0].sample_info().publication_handle) .is_ok()); } #[test] fn volatile_writer_with_reader_new_reader_receives_only_new_samples() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { durability: DurabilityQosPolicy { kind: DurabilityQosPolicyKind::Volatile, }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { durability: DurabilityQosPolicy { kind: DurabilityQosPolicyKind::Volatile, }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let _reader = subscriber .create_datareader::( &topic, QosKind::Specific(reader_qos.clone()), None, NO_STATUS, ) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); writer.get_publication_matched_status().unwrap(); // To reset wait_set for subsequent calls let data1 = KeyedData { id: 1, value: 1 }; writer.write(&data1, None).unwrap(); writer .wait_for_acknowledgments(Duration::new(3, 0)) .unwrap(); let reader_new = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); // Wait for writer to match reader wait_set.wait(Duration::new(10, 0)).unwrap(); std::thread::sleep(std::time::Duration::from_secs(1)); let data2 = KeyedData { id: 2, value: 10 }; writer.write(&data2, None).unwrap(); writer .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let samples = reader_new .read(10, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 1); assert_eq!(samples[0].data().unwrap(), data2); } #[test] fn write_read_unregistered_samples_are_also_disposed() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant_factory = DomainParticipantFactory::get_instance(); let participant = participant_factory .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, writer_data_lifecycle: WriterDataLifecycleQosPolicy { autodispose_unregistered_instances: true, }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); let data1 = KeyedData { id: 1, value: 1 }; writer.write(&data1, None).unwrap(); writer.unregister_instance(&data1, None).unwrap(); writer .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let samples = reader .read(2, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 2); assert_eq!( samples[0].sample_info().instance_state, InstanceStateKind::NotAliveDisposed ); assert_eq!( samples[1].sample_info().instance_state, InstanceStateKind::NotAliveDisposed ); } #[test] fn transient_local_writer_does_not_deliver_lifespan_expired_data_at_write() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { durability: DurabilityQosPolicy { kind: DurabilityQosPolicyKind::TransientLocal, }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, lifespan: LifespanQosPolicy { duration: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let data1 = KeyedData { id: 1, value: 1 }; let data2 = KeyedData { id: 2, value: 2 }; writer .write_w_timestamp(&data1, None, Time::new(0, 0)) .unwrap(); writer .write_w_timestamp(&data2, None, Time::new(i32::MAX, 0)) .unwrap(); // Never stale sample let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { durability: DurabilityQosPolicy { kind: DurabilityQosPolicyKind::TransientLocal, }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = reader.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::SubscriptionMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(5, 0)).unwrap(); reader .wait_for_historical_data(Duration::new(10, 0)) .unwrap(); let samples = reader .read(10, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 1); assert_eq!(samples[0].data().unwrap(), data2); } #[test] fn transient_local_writer_does_not_deliver_lifespan_expired_data_after_write() { const LIFESPAN_MS: u32 = 100; let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { durability: DurabilityQosPolicy { kind: DurabilityQosPolicyKind::TransientLocal, }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, lifespan: LifespanQosPolicy { duration: DurationKind::Finite(Duration::new(0, LIFESPAN_MS * 1_000_000)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let data1 = KeyedData { id: 1, value: 1 }; let data2 = KeyedData { id: 2, value: 2 }; writer.write(&data1, None).unwrap(); writer .write_w_timestamp(&data2, None, Time::new(i32::MAX, 0)) .unwrap(); // Never stale sample std::thread::sleep(std::time::Duration::from_millis(LIFESPAN_MS as u64 * 2)); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { durability: DurabilityQosPolicy { kind: DurabilityQosPolicyKind::TransientLocal, }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = reader.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::SubscriptionMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(5, 0)).unwrap(); reader .wait_for_historical_data(Duration::new(10, 0)) .unwrap(); let samples = reader .read(10, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 1); assert_eq!(samples[0].data().unwrap(), data2); } #[test] fn reader_joining_after_writer_writes_many_samples() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer = publisher .create_datawriter(&topic, QosKind::Specific(writer_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); for value in 0..500 { let data = KeyedData { id: 1, value }; writer.write(&data, None).unwrap(); } let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let cond = writer.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(10, 0)).unwrap(); let new_data = KeyedData { id: 1, value: 1000 }; writer.write(&new_data, None).unwrap(); writer .wait_for_acknowledgments(Duration::new(20, 0)) .unwrap(); let samples = reader .take(5, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 1); assert_eq!(samples[0].data().unwrap(), new_data); } #[test] fn reader_with_exclusive_ownership_should_not_read_samples_from_second_weaker_writer() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer1_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ownership: OwnershipQosPolicy { kind: OwnershipQosPolicyKind::Exclusive, }, ownership_strength: OwnershipStrengthQosPolicy { value: 10 }, ..Default::default() }; let writer1 = publisher .create_datawriter(&topic, QosKind::Specific(writer1_qos), None, NO_STATUS) .unwrap(); let writer2_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ownership: OwnershipQosPolicy { kind: OwnershipQosPolicyKind::Exclusive, }, ownership_strength: OwnershipStrengthQosPolicy { value: 1 }, ..Default::default() }; let writer2 = publisher .create_datawriter(&topic, QosKind::Specific(writer2_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, ownership: OwnershipQosPolicy { kind: OwnershipQosPolicyKind::Exclusive, }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let start_time = std::time::Instant::now(); while std::time::Instant::now().duration_since(start_time) < std::time::Duration::from_secs(10) { if reader.get_matched_publications().unwrap().len() >= 2 { break; } } assert_eq!( reader.get_matched_publications().unwrap().len(), 2, "Reader must have 2 matched writers" ); let cond = writer1.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(5, 0)).unwrap(); let cond = writer2.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(5, 0)).unwrap(); let data1 = KeyedData { id: 1, value: 10 }; writer1.write(&data1, None).unwrap(); writer1 .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let data2 = KeyedData { id: 1, value: 20 }; writer2.write(&data2, None).unwrap(); writer2 .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let samples = reader .read(10, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 1); assert_eq!(samples[0].data().unwrap(), data1); } #[test] fn reader_with_exclusive_ownership_should_read_samples_from_second_writer_with_higher_ownership_strength( ) { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer1_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ownership: OwnershipQosPolicy { kind: OwnershipQosPolicyKind::Exclusive, }, ownership_strength: OwnershipStrengthQosPolicy { value: 1 }, ..Default::default() }; let writer1 = publisher .create_datawriter(&topic, QosKind::Specific(writer1_qos), None, NO_STATUS) .unwrap(); let writer2_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ownership: OwnershipQosPolicy { kind: OwnershipQosPolicyKind::Exclusive, }, ownership_strength: OwnershipStrengthQosPolicy { value: 10 }, ..Default::default() }; let writer2 = publisher .create_datawriter(&topic, QosKind::Specific(writer2_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, ownership: OwnershipQosPolicy { kind: OwnershipQosPolicyKind::Exclusive, }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let start_time = std::time::Instant::now(); while std::time::Instant::now().duration_since(start_time) < std::time::Duration::from_secs(10) { if reader.get_matched_publications().unwrap().len() >= 2 { break; } } assert_eq!( reader.get_matched_publications().unwrap().len(), 2, "Reader must have 2 matched writers" ); let cond = writer1.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(5, 0)).unwrap(); let cond = writer2.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(5, 0)).unwrap(); let data1 = KeyedData { id: 1, value: 10 }; writer1.write(&data1, None).unwrap(); writer1 .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let data2 = KeyedData { id: 1, value: 20 }; writer2.write(&data2, None).unwrap(); writer2 .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let samples = reader .read(10, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 2); assert_eq!(samples[0].data().unwrap(), data1); assert_eq!(samples[1].data().unwrap(), data2); } #[test] fn reader_with_exclusive_ownership_should_read_samples_from_second_writer_after_deadline_missed() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer1_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ownership: OwnershipQosPolicy { kind: OwnershipQosPolicyKind::Exclusive, }, ownership_strength: OwnershipStrengthQosPolicy { value: 10 }, deadline: DeadlineQosPolicy { period: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer1 = publisher .create_datawriter(&topic, QosKind::Specific(writer1_qos), None, NO_STATUS) .unwrap(); let writer2_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ownership: OwnershipQosPolicy { kind: OwnershipQosPolicyKind::Exclusive, }, ownership_strength: OwnershipStrengthQosPolicy { value: 1 }, deadline: DeadlineQosPolicy { period: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let writer2 = publisher .create_datawriter(&topic, QosKind::Specific(writer2_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, ownership: OwnershipQosPolicy { kind: OwnershipQosPolicyKind::Exclusive, }, deadline: DeadlineQosPolicy { period: DurationKind::Finite(Duration::new(1, 0)), }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let start_time = std::time::Instant::now(); while std::time::Instant::now().duration_since(start_time) < std::time::Duration::from_secs(10) { if reader.get_matched_publications().unwrap().len() >= 2 { break; } } assert_eq!( reader.get_matched_publications().unwrap().len(), 2, "Reader must have 2 matched writers" ); let cond = writer1.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(5, 0)).unwrap(); let cond = writer2.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(5, 0)).unwrap(); let data1 = KeyedData { id: 1, value: 10 }; writer1.write(&data1, None).unwrap(); writer1 .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let cond = reader.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::RequestedDeadlineMissed]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(5, 0)).unwrap(); let data2 = KeyedData { id: 1, value: 20 }; writer2.write(&data2, None).unwrap(); writer2 .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let samples = reader .read(10, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 2); assert_eq!(samples[0].data().unwrap(), data1); assert_eq!(samples[1].data().unwrap(), data2); } #[test] fn reader_with_exclusive_ownership_should_read_samples_from_second_weaker_writer_after_unregister() { let domain_id = TEST_DOMAIN_ID_GENERATOR.generate_unique_domain_id(); let participant = DomainParticipantFactory::get_instance() .create_participant(domain_id, QosKind::Default, None, NO_STATUS) .unwrap(); let topic = participant .create_topic::("MyTopic", "KeyedData", QosKind::Default, None, NO_STATUS) .unwrap(); let publisher = participant .create_publisher(QosKind::Default, None, NO_STATUS) .unwrap(); let writer1_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ownership: OwnershipQosPolicy { kind: OwnershipQosPolicyKind::Exclusive, }, ownership_strength: OwnershipStrengthQosPolicy { value: 10 }, ..Default::default() }; let writer1 = publisher .create_datawriter(&topic, QosKind::Specific(writer1_qos), None, NO_STATUS) .unwrap(); let writer2_qos = DataWriterQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, ownership: OwnershipQosPolicy { kind: OwnershipQosPolicyKind::Exclusive, }, ownership_strength: OwnershipStrengthQosPolicy { value: 1 }, ..Default::default() }; let writer2 = publisher .create_datawriter(&topic, QosKind::Specific(writer2_qos), None, NO_STATUS) .unwrap(); let subscriber = participant .create_subscriber(QosKind::Default, None, NO_STATUS) .unwrap(); let reader_qos = DataReaderQos { reliability: ReliabilityQosPolicy { kind: ReliabilityQosPolicyKind::Reliable, max_blocking_time: DurationKind::Finite(Duration::new(1, 0)), }, history: HistoryQosPolicy { kind: HistoryQosPolicyKind::KeepAll, }, ownership: OwnershipQosPolicy { kind: OwnershipQosPolicyKind::Exclusive, }, ..Default::default() }; let reader = subscriber .create_datareader::(&topic, QosKind::Specific(reader_qos), None, NO_STATUS) .unwrap(); let start_time = std::time::Instant::now(); while std::time::Instant::now().duration_since(start_time) < std::time::Duration::from_secs(10) { if reader.get_matched_publications().unwrap().len() >= 2 { break; } } assert_eq!( reader.get_matched_publications().unwrap().len(), 2, "Reader must have 2 matched writers" ); let cond = writer1.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(5, 0)).unwrap(); let cond = writer2.get_statuscondition(); cond.set_enabled_statuses(&[StatusKind::PublicationMatched]) .unwrap(); let mut wait_set = WaitSet::new(); wait_set .attach_condition(Condition::StatusCondition(cond)) .unwrap(); wait_set.wait(Duration::new(5, 0)).unwrap(); let data1 = KeyedData { id: 1, value: 10 }; writer1.write(&data1, None).unwrap(); writer1 .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); writer1.unregister_instance(&data1, None).unwrap(); writer1 .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let data2 = KeyedData { id: 1, value: 20 }; writer2.write(&data2, None).unwrap(); writer2 .wait_for_acknowledgments(Duration::new(10, 0)) .unwrap(); let samples = reader .read(10, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE) .unwrap(); assert_eq!(samples.len(), 3); assert_eq!(samples[0].data().unwrap(), data1); assert_eq!(samples[2].data().unwrap(), data2); }