// Copyright 2020-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Checks that a subscription which has been drained still yields every
/// message published before the drain, and yields nothing for a message
/// published afterwards.
#[test]
fn drain() {
    // Number of messages published before the subscription is drained.
    const PUBLISHED: usize = 10000;

    let server = nats_server::run_basic_server();
    let client = nats::connect(server.client_url()).unwrap();
    let subscription = client.subscribe("test").unwrap();

    (0..PUBLISHED).for_each(|_| client.publish("test", b"foo").unwrap());

    // Drain first, and only then start consuming: the iteration below is
    // expected to terminate on its own once the pending messages are read.
    subscription.drain().unwrap();
    let received = subscription.iter().count();
    assert_eq!(PUBLISHED, received);

    // A message published to the already-drained subscription must not be
    // delivered.
    client.publish("test", b"ipsum").unwrap();
    assert!(subscription.next().is_none());

    subscription.unsubscribe().unwrap();
}