use crate::session::session::SessionError; use crate::session::RUNTIME; use antimatter_api::apis::configuration; use antimatter_api::apis::internal_api::{self as api}; use antimatter_api::models::*; use lru::LruCache; use std::cmp::max; use std::mem; use std::num::NonZeroUsize; use std::sync::{Arc, Mutex}; use tokio::task::JoinHandle; type InvokeFunc = Arc< dyn Fn( configuration::Configuration, String, String, String, CapsuleSealRequest, ) -> Result<(), SessionError> + Send + Sync, >; // invoke_request is a wrapper for the domain seal capsule. By returning a // boxed future, we can store a reference to it that can be checked later for // completion. fn invoke_request( configuration: configuration::Configuration, domain_id: String, id: String, create_token: String, seal_request: CapsuleSealRequest, ) -> Result<(), SessionError> { let c = configuration.clone(); RUNTIME .block_on(api::domain_seal_capsule( &c, &domain_id, &id, &create_token, seal_request, )) .map_err(|e| SessionError::APIError(format!("invoke seal error: {:?}", e))) } #[derive(Hash, Eq, PartialEq, Clone, Debug)] struct SealKey { domain_id: String, id: String, create_token: String, } pub struct SealCache { cache: LruCache, seal_invoker: InvokeFunc, enabled: bool, size: usize, } impl SealCache { pub fn new(size: usize, is_async: bool) -> Self { let cache_size = NonZeroUsize::new(size).unwrap_or_else(|| NonZeroUsize::new(100).unwrap()); Self { cache: LruCache::new(cache_size), seal_invoker: Arc::new(invoke_request), enabled: is_async, size, } } pub fn seal( &mut self, configuration: &configuration::Configuration, domain_id: String, id: String, create_token: String, request: &mut CapsuleSealRequest, async_seal: bool, ) -> Result<(), SessionError> { let key = SealKey { domain_id: domain_id.to_string(), id: id.to_string(), create_token: create_token.to_string(), }; let buff = match self.cache.get_mut(&key) { None => { let new_buff = BufferedSealInternal::new( configuration.clone(), domain_id.clone(), id.clone(), create_token.clone(), self.seal_invoker.clone(), ); self.cache.push(key.clone(), new_buff); self.cache.get_mut(&key).unwrap() } Some(v) => v, }; if !self.enabled || !async_seal { buff.seal( &mut request.span_tags.unique_tags, &mut request.span_tags.elided_tags, &mut request.capsule_tags, request.size, request.rows, ) } else { buff.seal_async( &mut request.span_tags.unique_tags, &mut request.span_tags.elided_tags, &mut request.capsule_tags, request.size, request.rows, ) } } pub fn update_tags_for_seal( &mut self, configuration: &configuration::Configuration, domain_id: String, id: String, create_token: String, request: &mut CapsuleSealRequest, ) -> Result<(), SessionError> { let key = SealKey { domain_id: domain_id.to_string(), id: id.to_string(), create_token: create_token.to_string(), }; let buff = match self.cache.get_mut(&key) { None => { let new_buff = BufferedSealInternal::new( configuration.clone(), domain_id.clone(), id.clone(), create_token.clone(), self.seal_invoker.clone(), ); self.cache.push(key.clone(), new_buff); self.cache.get_mut(&key).unwrap() } Some(v) => v, }; buff.update_tags_for_seal( &mut request.span_tags.unique_tags, &mut request.span_tags.elided_tags, &mut request.capsule_tags, request.size, request.rows, ); Ok(()) } pub fn size(&self) -> usize { self.size } pub fn enabled(&self) -> bool { self.enabled } } struct BufferedSealInternal { configuration: configuration::Configuration, domain_id: String, id: String, create_token: String, active_request: Option>>, pub seal_invoker: InvokeFunc, request: Arc>>, first: bool, } impl Drop for BufferedSealInternal { fn drop(&mut self) { match self.complete() { Ok(_) => {} // TODO: this happens during the cleanup phase, i dont know how to propagate the error out Err(e) => { println!("error handling tail seal event: {:}", e) } } } } impl BufferedSealInternal { pub fn new( configuration: configuration::Configuration, domain_id: String, id: String, create_token: String, seal_invoker: InvokeFunc, ) -> Self { Self { configuration, domain_id, id, create_token, active_request: None, seal_invoker, request: Arc::new(Mutex::new(None)), first: true, } } fn request_in_flight(&mut self) -> Result { if self.active_request.is_some() { if self.active_request.as_ref().unwrap().is_finished() { RUNTIME.block_on(async { match self.active_request.as_mut().unwrap().await { Ok(Ok(())) => { self.active_request = None; Ok(false) } Ok(Err(e)) => Err(e), Err(e) => Err(SessionError::Error(format!( "error joining active request task: {}", e ))), } }) } else { Ok(true) } } else { Ok(false) } } // Update adds the supplied tags to the current capsule request body. pub fn update_tags_for_seal( &mut self, unique: &mut Vec, elided: &mut Vec, capsule_tags: &mut Vec, size: i64, rows: i64, ) { let mut request = self.request.lock().unwrap(); if request.is_none() { *request = Some(CapsuleSealRequest { capsule_tags: mem::take(&mut *capsule_tags), span_tags: Box::new(TagSummary { unique_tags: mem::take(&mut *unique), elided_tags: mem::take(&mut *elided), }), size, rows, }) } else { let req = request.as_mut().unwrap(); merge_unique_tags(&mut req.span_tags.unique_tags, unique); merge_elided_tags(&mut req.span_tags.elided_tags, elided); req.capsule_tags.append(capsule_tags); req.size += size; req.rows += rows; } } // Invoke seal for the current request as a blocking event pub fn seal( &mut self, unique: &mut Vec, elided: &mut Vec, capsule_tags: &mut Vec, size: i64, rows: i64, ) -> Result<(), SessionError> { self.update_tags_for_seal(unique, elided, capsule_tags, size, rows); let request = self.request.lock().unwrap().take().unwrap(); (self.seal_invoker)( self.configuration.clone(), self.domain_id.clone(), self.id.clone(), self.create_token.clone(), request, ) } // seal_async will issue a seal request for the provided tags if no request is in flight. // If a request is in flight, these tags are added to the next request to be sent. pub fn seal_async( &mut self, unique: &mut Vec, elided: &mut Vec, capsule_tags: &mut Vec, size: i64, rows: i64, ) -> Result<(), SessionError> { self.update_tags_for_seal(unique, elided, capsule_tags, size, rows); if !self.request_in_flight()? { let request = self.request.lock().unwrap().take().unwrap(); if self.first { // first seal is sync (self.seal_invoker)( self.configuration.clone(), self.domain_id.clone(), self.id.clone(), self.create_token.clone(), request, )?; } else { // subsequent seals are async let config = self.configuration.clone(); let domain_id = self.domain_id.clone(); let id = self.id.clone(); let create_token = self.create_token.clone(); let invoker = self.seal_invoker.clone(); let request_clone = self.request.clone(); self.active_request = Some(tokio::spawn(async move { // send the current request (invoker)( config.clone(), domain_id.clone(), id.clone(), create_token.clone(), request, )?; // if a new request is ready, send that too if let Some(next_request) = request_clone.lock().unwrap().take() { (invoker)(config, domain_id, id, create_token, next_request)?; } Ok(()) })); } } Ok(()) } pub fn complete(&mut self) -> Result<(), SessionError> { if self.active_request.is_some() { RUNTIME.block_on(async { self.active_request.as_mut().unwrap().await.unwrap() })?; } self.active_request = None; let request = self.request.lock().unwrap().take(); if request.is_some() { (self.seal_invoker)( self.configuration.clone(), self.domain_id.clone(), self.id.clone(), self.create_token.clone(), request.unwrap(), )?; } Ok(()) } } // merge_unique_tags merges b into a fn merge_unique_tags(a: &mut Vec, b: &Vec) { for bi in b { if let Some(ai) = a.iter_mut().find(|ai| ai.tag == bi.tag) { ai.occurrences += bi.occurrences; } else { a.push(bi.clone()); } } } // merge_elided_tags merges b into a fn merge_elided_tags(a: &mut Vec, b: &Vec) { for bi in b { if let Some(ai) = a.iter_mut().find(|ai| ai.tag_name == bi.tag_name) { // This is wrong but we no longer have information on the unique tags. // The best we can do is report the largest set. ai.num_unique_tags = max(ai.num_unique_tags, bi.num_unique_tags); ai.total_occurrences += bi.total_occurrences; } else { a.push(bi.clone()); } } } #[cfg(test)] mod tests { use super::*; // Mock implementation of `invoke_request` function for testing. fn mock_invoke_request( _configuration: configuration::Configuration, _domain_id: String, _id: String, _create_token: String, _seal_request: CapsuleSealRequest, ) -> Result<(), SessionError> { Ok(()) } fn dummy_tag() -> TagSummaryUniqueTagsInner { TagSummaryUniqueTagsInner { tag: Box::new(Tag { name: "dummy_tag".to_string(), value: "value".to_string(), r#type: Default::default(), source: "source".to_string(), hook_version: Some("1.0.0".to_string()), }), occurrences: 10, } } fn seal_request() -> CapsuleSealRequest { CapsuleSealRequest { capsule_tags: vec![], span_tags: Box::new(TagSummary { unique_tags: vec![], elided_tags: vec![], }), size: 0, rows: 0, } } #[test] fn test_buffered_seal() { let configuration = configuration::Configuration::default(); let domain_id = "test_domain".to_string(); let id = "test_id".to_string(); let create_token = "test_token".to_string(); let mut buffered_seal = BufferedSealInternal::new( configuration, domain_id, id, create_token, Arc::new(mock_invoke_request), ); // Simulate adding data to be sealed let mut unique_tags = vec![dummy_tag()]; let mut elided_tags = vec![]; let mut capsule_tags: Vec = vec![]; let size = 100; let rows = 10; buffered_seal .seal_async( &mut unique_tags, &mut elided_tags, &mut capsule_tags, size, rows, ) .unwrap(); let complete = buffered_seal.complete(); assert!(complete.is_ok()); assert!(buffered_seal.active_request.is_none()); } #[test] fn test_caching_enabled() { let configuration = configuration::Configuration::default(); let domain_id = "domain1".to_string(); let id = "id1".to_string(); let create_token = "token1".to_string(); let mut request = seal_request(); let mut seal_cache = SealCache::new(10, true); seal_cache.seal_invoker = Arc::new(mock_invoke_request); assert!(seal_cache .seal( &configuration, domain_id.clone(), id.clone(), create_token.clone(), &mut request, true ) .is_ok()); assert!(seal_cache .seal( &configuration, domain_id, id, create_token, &mut request, true ) .is_ok()); assert_eq!(seal_cache.cache.len(), 1); } #[test] fn test_caching_disabled() { let configuration = configuration::Configuration::default(); let domain_id = "domain2".to_string(); let id = "id2".to_string(); let create_token = "token2".to_string(); let mut request = seal_request(); let mut seal_cache = SealCache::new(0, false); seal_cache.seal_invoker = Arc::new(mock_invoke_request); assert!(seal_cache .seal( &configuration, domain_id, id, create_token, &mut request, true ) .is_ok()); assert_eq!(seal_cache.enabled, false); } #[test] fn merge_into_empty() { let mut a: Vec = vec![]; let mut b = vec![TagSummaryUniqueTagsInner { tag: Box::new(Tag { name: "tag1".to_string(), value: "value1".to_string(), r#type: Default::default(), source: "source1".to_string(), hook_version: Some("1.0.0".to_string()), }), occurrences: 2, }]; merge_unique_tags(&mut a, &mut b); assert_eq!(a.len(), 1); assert_eq!(a[0].occurrences, 2); assert_eq!(a[0].tag.name, "tag1"); } #[test] fn merge_with_no_overlap() { let mut a = vec![TagSummaryUniqueTagsInner { tag: Box::new(Tag { name: "tag1".to_string(), value: "value1".to_string(), r#type: Default::default(), source: "source1".to_string(), hook_version: Some("1.0.0".to_string()), }), occurrences: 2, }]; let mut b = vec![ TagSummaryUniqueTagsInner { // tag has same name but different source so should be treated as a separate tag tag: Box::new(Tag { name: "tag1".to_string(), value: "value1".to_string(), r#type: Default::default(), source: "source2".to_string(), hook_version: Some("1.0.0".to_string()), }), occurrences: 3, }, TagSummaryUniqueTagsInner { tag: Box::new(Tag { name: "tag2".to_string(), value: "value2".to_string(), r#type: Default::default(), source: "source2".to_string(), hook_version: Some("1.0.0".to_string()), }), occurrences: 3, }, ]; merge_unique_tags(&mut a, &mut b); assert_eq!(a.len(), 3); assert_eq!(a[1].occurrences, 3); assert_eq!(a[1].tag.name, "tag1"); } #[test] fn merge_with_overlap() { let mut a = vec![TagSummaryUniqueTagsInner { tag: Box::new(Tag { name: "tag1".to_string(), value: "value1".to_string(), r#type: Default::default(), source: "source1".to_string(), hook_version: Some("1.0.0".to_string()), }), occurrences: 2, }]; let mut b = vec![ TagSummaryUniqueTagsInner { tag: Box::new(Tag { name: "tag1".to_string(), value: "value1_different".to_string(), r#type: Default::default(), source: "source1_different".to_string(), hook_version: Some("1.0.1".to_string()), }), occurrences: 3, }, TagSummaryUniqueTagsInner { tag: Box::new(Tag { name: "tag1".to_string(), value: "value1".to_string(), r#type: Default::default(), source: "source1".to_string(), hook_version: Some("1.0.0".to_string()), }), occurrences: 5, }, ]; merge_unique_tags(&mut a, &mut b); assert_eq!(a.len(), 2, "One tag should be merged, the other appended"); assert_eq!(a[0].occurrences, 7, "Occurrences should be summed"); } #[test] fn merge_elided_into_empty() { let mut a: Vec = vec![]; let mut b = vec![TagSummaryElidedTagsInner { tag_name: "tag1".to_string(), num_unique_tags: 2, total_occurrences: 5, }]; merge_elided_tags(&mut a, &mut b); assert_eq!(a.len(), 1); assert_eq!(a[0].num_unique_tags, 2); assert_eq!(a[0].total_occurrences, 5); assert_eq!(a[0].tag_name, "tag1"); } #[test] fn merge_elided_with_no_overlap() { let mut a = vec![TagSummaryElidedTagsInner { tag_name: "tag1".to_string(), num_unique_tags: 2, total_occurrences: 5, }]; let mut b = vec![TagSummaryElidedTagsInner { tag_name: "tag2".to_string(), num_unique_tags: 3, total_occurrences: 7, }]; merge_elided_tags(&mut a, &mut b); assert_eq!(a.len(), 2); assert_eq!(a[1].num_unique_tags, 3); assert_eq!(a[1].total_occurrences, 7); assert_eq!(a[1].tag_name, "tag2"); } #[test] fn merge_elided_with_overlap() { let mut a = vec![TagSummaryElidedTagsInner { tag_name: "tag1".to_string(), num_unique_tags: 2, total_occurrences: 5, }]; let mut b = vec![TagSummaryElidedTagsInner { tag_name: "tag1".to_string(), num_unique_tags: 1, // Lower number of unique tags total_occurrences: 3, }]; merge_elided_tags(&mut a, &mut b); assert_eq!(a.len(), 1, "Tags should be merged, not appended"); assert_eq!( a[0].num_unique_tags, 2, "Num unique tags should report the largest set" ); assert_eq!( a[0].total_occurrences, 8, "Total occurrences should be summed" ); } }