use std::{io, thread, time}; mod utils; fn next_message_starting_at( sc: &stan::Client, subject: &str, start: stan::SubscriptionStart, ) -> io::Result { let sub = sc.subscribe( subject, stan::SubscriptionConfig { start, ..Default::default() }, )?; utils::next_message(&sub) } #[test] fn test_subscription_start() -> io::Result<()> { let server = utils::server()?; let nats_url = &format!("localhost:{}", server.port); let nc = nats::connect(nats_url)?; let subject = "foo"; let sc = stan::connect(nc, "test-cluster", "rust-client-1")?; sc.publish(subject, "hello from rust 1")?; let from_time_start = time::SystemTime::now(); thread::sleep(time::Duration::from_millis(50)); sc.publish(subject, "hello from rust 2")?; thread::sleep(time::Duration::from_millis(50)); sc.publish(subject, "hello from rust 3")?; sc.publish(subject, "hello from rust 4")?; sc.publish(subject, "hello from rust 5")?; assert_eq!( "hello from rust 1", next_message_starting_at(&sc, subject, stan::SubscriptionStart::AllAvailable)?, ); assert_eq!( "hello from rust 2", next_message_starting_at( &sc, subject, stan::SubscriptionStart::FromTimestamp(from_time_start) )? ); assert_eq!( "hello from rust 3", next_message_starting_at( &sc, subject, stan::SubscriptionStart::FromPast(time::Duration::from_millis(50)) )?, ); assert_eq!( "hello from rust 4", next_message_starting_at(&sc, subject, stan::SubscriptionStart::FromSequence(4))?, ); assert_eq!( "hello from rust 5", next_message_starting_at(&sc, subject, stan::SubscriptionStart::LastReceived)? ); let new_only_sub = sc.subscribe( subject, stan::SubscriptionConfig { start: stan::SubscriptionStart::NewOnly, ..Default::default() }, )?; assert_eq!( Err(io::ErrorKind::TimedOut), utils::next_message(&new_only_sub).map_err(|e| e.kind()) ); sc.publish(subject, "hello from rust 6")?; assert_eq!("hello from rust 6", utils::next_message(&new_only_sub)?); Ok(()) }