diff --git a/src/session.rs b/src/session.rs index 8efd302..f065af5 100644 --- a/src/session.rs +++ b/src/session.rs @@ -112,7 +112,8 @@ impl Session { let send_side_bwe = SendSideBandwithEstimator::new(rate); let bwe = Bwe { bwe: send_side_bwe, - desired_bitrate: Bitrate::ZERO, + local_desired_bitrate: Bitrate::ZERO, + remote_desired_bitrate: None, current_bitrate: rate, last_emitted_estimate: Bitrate::ZERO, @@ -548,6 +549,13 @@ impl Session { return Some(Event::KeyframeRequest(req)); } + if let Some(bitrate) = self.streams.poll_remb_request() { + if let Some(bwe) = self.bwe.as_mut() { + bwe.remote_desired_bitrate = Some(bitrate); + self.configure_pacer(); + } + } + for media in &mut self.medias { if media.need_open_event { media.need_open_event = false; @@ -781,7 +789,7 @@ impl Session { pub fn set_bwe_desired_bitrate(&mut self, desired_bitrate: Bitrate) { if let Some(bwe) = self.bwe.as_mut() { - bwe.desired_bitrate = desired_bitrate; + bwe.local_desired_bitrate = desired_bitrate; self.configure_pacer(); } } @@ -810,7 +818,7 @@ impl Session { let padding_rate = bwe .last_estimate() - .map(|estimate| estimate.min(bwe.desired_bitrate)) + .map(|estimate| estimate.min(bwe.desired_bitrate())) .unwrap_or(Bitrate::ZERO); self.pacer.set_padding_rate(padding_rate); @@ -877,7 +885,8 @@ impl Session { struct Bwe { bwe: SendSideBandwithEstimator, - desired_bitrate: Bitrate, + local_desired_bitrate: Bitrate, + remote_desired_bitrate: Option, current_bitrate: Bitrate, last_emitted_estimate: Bitrate, @@ -918,6 +927,14 @@ impl Bwe { fn last_estimate(&self) -> Option { self.bwe.last_estimate() } + + fn desired_bitrate(&self) -> Bitrate { + if let Some(remote) = self.remote_desired_bitrate { + remote.min(self.local_desired_bitrate) + } else { + self.local_desired_bitrate + } + } } pub struct PacketReceipt { diff --git a/src/streams/mod.rs b/src/streams/mod.rs index ccae4f4..c4095a7 100644 --- a/src/streams/mod.rs +++ b/src/streams/mod.rs @@ -6,8 +6,8 @@ use std::time::Instant; use crate::format::CodecConfig; use crate::format::PayloadParams; use crate::media::{KeyframeRequest, Media}; -use crate::rtp_::Pt; use crate::rtp_::Ssrc; +use crate::rtp_::{Bitrate, Pt}; use crate::rtp_::{MediaTime, SenderInfo}; use crate::rtp_::{Mid, Rid, SeqNo}; use crate::rtp_::{Rtcp, RtpHeader}; @@ -461,6 +461,12 @@ impl Streams { }) } + pub(crate) fn poll_remb_request(&mut self) -> Option { + self.streams_tx + .values_mut() + .find_map(|s| s.poll_remb_request()) + } + pub(crate) fn poll_stream_paused(&mut self) -> Option { self.streams_rx.values_mut().find_map(|s| s.poll_paused()) } diff --git a/src/streams/send.rs b/src/streams/send.rs index 38df82c..b92fe9f 100644 --- a/src/streams/send.rs +++ b/src/streams/send.rs @@ -13,6 +13,7 @@ use crate::media::MediaKind; use crate::packet::QueuePriority; use crate::packet::QueueSnapshot; use crate::packet::QueueState; +use crate::rtp_::Bitrate; use crate::rtp_::{extend_u16, Descriptions, ReportList, Rtcp}; use crate::rtp_::{ExtensionMap, ReceptionReport, RtpHeader}; use crate::rtp_::{ExtensionValues, MediaTime, Mid, NackEntry}; @@ -107,6 +108,9 @@ pub struct StreamTx { /// If we have a pending incoming keyframe request. pending_request_keyframe: Option, + /// If we have a pending incoming remb request. + pending_request_remb: Option, + /// Statistics of outgoing data. stats: StreamTxStats, @@ -172,6 +176,7 @@ impl StreamTx { rtx_cache: RtxCache::new(2000, DEFAULT_RTX_CACHE_DURATION), last_sender_report: already_happened(), pending_request_keyframe: None, + pending_request_remb: None, stats: StreamTxStats::default(), rtx_ratio: (0.0, already_happened()), } @@ -644,6 +649,10 @@ impl StreamTx { self.pending_request_keyframe.take() } + pub(crate) fn poll_remb_request(&mut self) -> Option { + self.pending_request_remb.take() + } + pub(crate) fn handle_rtcp(&mut self, now: Instant, fb: RtcpFb) { use RtcpFb::*; match fb { @@ -661,6 +670,9 @@ impl StreamTx { self.stats.increase_firs(); self.pending_request_keyframe = Some(KeyframeRequestKind::Fir); } + Remb(r) => { + self.pending_request_remb = Some(Bitrate::from(r.bitrate as f64)); + } Twcc(_) => unreachable!("TWCC should be handled on session level"), _ => {} }