use specs::{ReadExpect, System, WriteExpect}; use crate::{ common::ClientFilter, server::encode_message, world::{profiler::Profiler, Clients, MessageQueue}, EncodedMessage, EncodedMessageQueue, MessageType, Transports, }; pub struct BroadcastSystem; impl<'a> System<'a> for BroadcastSystem { type SystemData = ( ReadExpect<'a, Transports>, ReadExpect<'a, Clients>, WriteExpect<'a, MessageQueue>, WriteExpect<'a, EncodedMessageQueue>, WriteExpect<'a, Profiler>, ); fn run(&mut self, data: Self::SystemData) { let (transports, clients, mut queue, mut encoded_queue, mut profiler) = data; encoded_queue.append(queue.drain(..).collect()); encoded_queue.process(); let done_messages = encoded_queue.receive(); if done_messages.is_empty() { return; } for (encoded, filter) in done_messages { transports.values().for_each(|recipient| { recipient.do_send(encoded.to_owned()); }); if let ClientFilter::Direct(id) = &filter { if let Some(client) = clients.get(id) { client.addr.do_send(encoded); } continue; } clients.iter().for_each(|(id, client)| { match &filter { ClientFilter::All => {} ClientFilter::Include(ids) => { if !ids.iter().any(|i| *i == *id) { return; } } ClientFilter::Exclude(ids) => { if ids.iter().any(|i| *i == *id) { return; } } _ => {} }; client.addr.do_send(encoded.to_owned()); }) } } } // use log::info; // use specs::{ReadExpect, System, WriteExpect}; // use crate::{ // common::ClientFilter, // server::encode_message, // world::{profiler::Profiler, Clients, MessageQueue}, // EncodedMessage, EncodedMessageQueue, MessageType, Transports, // }; // pub struct BroadcastSystem; // impl<'a> System<'a> for BroadcastSystem { // type SystemData = ( // ReadExpect<'a, Transports>, // ReadExpect<'a, Clients>, // WriteExpect<'a, MessageQueue>, // WriteExpect<'a, EncodedMessageQueue>, // WriteExpect<'a, Profiler>, // ); // fn run(&mut self, data: Self::SystemData) { // let (transports, clients, mut queue, mut encoded_queue, mut profiler) = data; // profiler.time("broadcast"); // let all_messages: Vec<_> = queue.drain(..).collect(); // let (chunk_messages, other_messages): (Vec<_>, Vec<_>) = all_messages // .into_iter() // .partition(|message| !message.0.chunks.is_empty()); // encoded_queue.append(chunk_messages); // encoded_queue.process(); // let done_messages = encoded_queue.receive(); // let other_messages_encoded: Vec<_> = other_messages // .into_iter() // .map(|(message, filter)| (EncodedMessage(encode_message(&message)), filter)) // .collect(); // let all_messages: Vec<_> = done_messages // .into_iter() // .chain(other_messages_encoded.into_iter()) // .collect(); // if all_messages.is_empty() { // return; // } // for (encoded, filter) in all_messages { // transports.values().for_each(|recipient| { // recipient.do_send(encoded.to_owned()); // }); // match &filter { // ClientFilter::Direct(id) => { // if let Some(client) = clients.get(id) { // client.addr.do_send(encoded); // } // } // ClientFilter::All => { // clients.values().for_each(|client| { // client.addr.do_send(encoded.to_owned()); // }); // } // ClientFilter::Include(ids) => { // clients.iter().for_each(|(id, client)| { // if ids.contains(id) { // client.addr.do_send(encoded.to_owned()); // } // }); // } // ClientFilter::Exclude(ids) => { // clients.iter().for_each(|(id, client)| { // if !ids.contains(id) { // client.addr.do_send(encoded.to_owned()); // } // }); // } // } // } // profiler.time_end("broadcast"); // } // }