use std::{time::{Instant, Duration}, sync::{RwLock, Arc, Mutex}, collections::HashSet, cell::{RefCell, Cell}, thread}; use indexmap::IndexMap; use ringbuffer::{ConstGenericRingBuffer, RingBufferWrite, RingBufferRead}; use rosc::{OscPacket}; use crate::{osc_receiver::{UdpReceiver, RoscReceiver}, cursor::{Cursor}, object::Object, blob::Blob, errors::{TuioError, OscReceiverError}, osc_encode_decode::{OscDecoder, DecodeOsc, self, Set}}; #[derive(Default)] pub struct TuioEvents { pub cursor_events: Vec, pub object_events: Vec, pub blob_events: Vec, } pub struct CursorData { pub source_name: String, pub cursor: Cursor } pub struct ObjectData { pub source_name: String, pub object: Object } pub struct BlobData { pub source_name: String, pub blob: Blob } pub enum CursorEvent { New(CursorData), Update(CursorData), Remove(CursorData), } pub enum ObjectEvent { New(ObjectData), Update(ObjectData), Remove(ObjectData), } pub enum BlobEvent { New(BlobData), Update(BlobData), Remove(BlobData), } #[derive(Default)] pub struct SourceCollection { pub object_map: IndexMap, pub blob_map: IndexMap, pub cursor_map: IndexMap } pub struct Client { current_frame: Cell, instant: Instant, current_time: Cell, pub source_list: RefCell>, osc_receivers: Vec>, packet_buffer: Arc>>, local_receiver: bool, listen: Arc> } /// Keeps the entries whose keys are contained in a [HashSet] /// /// Returns a [Vec] of removed ids /// /// # Arguments /// * `index_map` - an [IndexMap] to filter /// * `to_keep` - an [HashSet] containing the keys to retain fn retain_by_ids(index_map: &mut IndexMap, to_keep: HashSet) -> Vec { let mut removed: Vec = Vec::with_capacity(index_map.len()); let mut to_remove: Vec = Vec::with_capacity(index_map.len()); for id in index_map.keys() { if !to_keep.contains(id) { to_remove.push(*id); } } for id in to_remove { removed.push(index_map.remove(&id).unwrap()); } removed } impl Client { pub fn new() -> Result { Self::from_port(3333) } pub fn from_port(port: u16) -> Result { Ok(Self { instant: Instant::now(), osc_receivers: vec![Arc::new(UdpReceiver::from_port(port)?)], current_frame: (-1).into(), current_time: Cell::new(Duration::default()), source_list: RefCell::new(IndexMap::new()), local_receiver: true, listen: Arc::new(RwLock::new(false)), packet_buffer: Default::default() }) } pub fn connect(&self) -> Result<(), OscReceiverError> { if *self.listen.read().unwrap() { return Err(OscReceiverError::AlreadyConnected()); } *self.listen.write().unwrap() = true; for receiver in &self.osc_receivers { receiver.connect().map_err(OscReceiverError::Connect)?; let listen = Arc::clone(&self.listen); let receiver = Arc::clone(receiver); let buffer = Arc::clone(&self.packet_buffer); thread::spawn(move || loop { if !*listen.read().unwrap() { break; } match receiver.recv() { Ok(packet) => { buffer.lock().unwrap().push(packet); } Err(err) => { match err { OscReceiverError::Receive(err) => if err.raw_os_error().unwrap() != 10004 { println!("Error receiving from socket: {}", err); }, _ => println!("Error receiving from socket: {}", err) } break; } } }); } Ok(()) } pub fn disconnect(&self) { for receiver in &self.osc_receivers { receiver.disconnect(); } } /// Refreshes the client's state /// /// On success, returns an [Option] containing the evnts of all new, updated and removed TUIO inputs pub fn refresh(&self) -> Result, TuioError> { let mut updated = false; let mut events = TuioEvents::default(); for packet in self.packet_buffer.lock().unwrap().drain() { if self.process_osc_packet(packet, &mut events)? { updated = true; } }; if updated { Ok(Some(events)) } else { Ok(None) } } /// Update frame parameters based on a frame number /// /// Returns true if the frame is a new frame /// # Argument /// * `frame` - the new frame number fn update_frame(&self, frame: i32) -> bool { if frame >= 0 { let current_frame = self.current_frame.get(); if frame > current_frame { self.current_time.set(self.instant.elapsed()); } if frame >= current_frame || current_frame - frame > 100 { self.current_frame.set(frame); return true; } else if self.instant.elapsed() - self.current_time.get() > Duration::from_millis(100){ self.current_time.set(self.instant.elapsed()); return false; } } false } fn process_osc_packet(&self, packet: OscPacket, events: &mut TuioEvents) -> Result { if let OscPacket::Bundle(bundle) = packet { let decoded_bundle = OscDecoder::decode_bundle(bundle)?; let to_keep: HashSet = HashSet::from_iter(decoded_bundle.alive); if self.update_frame(decoded_bundle.fseq) { let mut source_list = self.source_list.borrow_mut(); let source_collection = source_list.entry(decoded_bundle.source.clone()).or_default(); match decoded_bundle.tuio_type { osc_encode_decode::TuioBundleType::Cursor => { let cursor_map = &mut source_collection.cursor_map; for cursor in retain_by_ids(cursor_map, to_keep).into_iter() { events.cursor_events.push(CursorEvent::Remove(CursorData{source_name: decoded_bundle.source.clone(), cursor: cursor.clone()})); } if let Some(Set::Cursor(cursor_collection)) = decoded_bundle.set { for cursor in cursor_collection { match cursor_map.entry(cursor.get_session_id()) { indexmap::map::Entry::Occupied(mut entry) => { events.cursor_events.push(CursorEvent::Update(CursorData{source_name: decoded_bundle.source.clone(), cursor: cursor.clone()})); entry.insert(cursor); }, indexmap::map::Entry::Vacant(entry) => { events.cursor_events.push(CursorEvent::New(CursorData{source_name: decoded_bundle.source.clone(), cursor: cursor.clone()})); entry.insert(cursor); }, } } } }, osc_encode_decode::TuioBundleType::Object => { let object_map = &mut source_collection.object_map; for object in retain_by_ids(object_map, to_keep).into_iter() { events.object_events.push(ObjectEvent::Remove(ObjectData { source_name: decoded_bundle.source.clone(), object: object.clone() })); } if let Some(Set::Object(object_collection)) = decoded_bundle.set { for object in object_collection { match object_map.entry(object.get_session_id()) { indexmap::map::Entry::Occupied(mut entry) => { events.object_events.push(ObjectEvent::Update(ObjectData { source_name: decoded_bundle.source.clone(), object: object.clone() })); entry.insert(object); }, indexmap::map::Entry::Vacant(entry) => { events.object_events.push(ObjectEvent::New(ObjectData { source_name: decoded_bundle.source.clone(), object: object.clone() })); entry.insert(object); }, } } } }, osc_encode_decode::TuioBundleType::Blob => { let blob_map = &mut source_collection.blob_map; for blob in retain_by_ids(blob_map, to_keep).into_iter() { events.blob_events.push(BlobEvent::Remove(BlobData { source_name: decoded_bundle.source.clone(), blob: blob.clone() })); } if let Some(Set::Blob(blob_collection)) = decoded_bundle.set { for blob in blob_collection { match blob_map.entry(blob.get_session_id()) { indexmap::map::Entry::Occupied(mut entry) => { events.blob_events.push(BlobEvent::Update(BlobData { source_name: decoded_bundle.source.clone(), blob: blob.clone() })); entry.insert(blob); }, indexmap::map::Entry::Vacant(entry) => { events.blob_events.push(BlobEvent::New(BlobData { source_name: decoded_bundle.source.clone(), blob: blob.clone() })); entry.insert(blob); }, } } } }, osc_encode_decode::TuioBundleType::Unknown => (), } Ok(true) } else { Ok(false) } } else { Err(TuioError::NotABundle(packet)) } } pub fn local_receiver(&self) -> bool { self.local_receiver } }