#![cfg(feature = "macros")] use std::{ pin::Pin, task::{Context, Poll}, }; use actix::{io::SinkWrite, prelude::*}; use bytes::{Buf, Bytes}; use futures_sink::Sink; use tokio::sync::mpsc; type ByteSender = mpsc::UnboundedSender; struct MySink { sender: ByteSender, queue: Vec, } // simple sink that send one bit at a time // and produce an error on '#' impl Sink for MySink { type Error = (); fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); if !this.queue.is_empty() { let bytes = &mut this.queue[0]; if bytes[0] == b'#' { return Poll::Ready(Err(())); } this.sender.send(bytes[0]).unwrap(); bytes.advance(1); if bytes.is_empty() { this.queue.remove(0); } } if this.queue.is_empty() { Poll::Ready(Ok(())) } else { cx.waker().wake_by_ref(); Poll::Pending } } fn start_send(self: Pin<&mut Self>, bytes: Bytes) -> Result<(), Self::Error> { self.get_mut().queue.push(bytes); Ok(()) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.poll_ready(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { self.poll_ready(cx) } } struct SinkUnboundedSender { tx: mpsc::UnboundedSender, } impl Sink for SinkUnboundedSender { type Error = mpsc::error::SendError; fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn start_send(self: Pin<&mut Self>, t: Bytes) -> Result<(), Self::Error> { mpsc::UnboundedSender::send(&self.get_mut().tx, t) } fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll> { Poll::Ready(Ok(())) } } struct Data { bytes: Bytes, last: bool, } impl Message for Data { type Result = (); } struct MyActor { sink: SinkWrite, } impl Actor for MyActor { type Context = actix::Context; } impl actix::io::WriteHandler<()> for MyActor { fn finished(&mut self, _ctxt: &mut Self::Context) { System::current().stop(); } } impl Handler for MyActor { type Result = (); fn handle(&mut self, data: Data, _ctxt: &mut Self::Context) { let _ = self.sink.write(data.bytes); if data.last { self.sink.close(); } } } #[actix::test] async fn test_send_1() { let (sender, mut receiver) = mpsc::unbounded_channel(); actix_rt::spawn(async move { let addr = MyActor::create(move |ctxt| { let sink = MySink { sender, queue: Vec::new(), }; MyActor { sink: SinkWrite::new(sink, ctxt), } }); let data = Data { bytes: Bytes::from_static(b"Hello"), last: true, }; addr.do_send(data); }); let mut res = Vec::new(); while let Some(r) = receiver.recv().await { res.push(r); } assert_eq!(b"Hello", &res[..]); } #[actix::test] async fn test_send_2() { let (sender, mut receiver) = mpsc::unbounded_channel(); actix_rt::spawn(async move { let addr = MyActor::create(move |ctxt| { let sink = MySink { sender, queue: Vec::new(), }; MyActor { sink: SinkWrite::new(sink, ctxt), } }); let data = Data { bytes: Bytes::from_static(b"Hello"), last: false, }; addr.do_send(data); let data = Data { bytes: Bytes::from_static(b" world"), last: true, }; addr.do_send(data); }); let mut res = Vec::new(); while let Some(r) = receiver.recv().await { res.push(r); } assert_eq!(b"Hello world", &res[..]); } #[actix::test] async fn test_send_error() { let (sender, mut receiver) = mpsc::unbounded_channel(); actix_rt::spawn(async move { let addr = MyActor::create(move |ctxt| { let sink = MySink { sender, queue: Vec::new(), }; MyActor { sink: SinkWrite::new(sink, ctxt), } }); let data = Data { bytes: Bytes::from_static(b"Hello #"), last: false, }; addr.do_send(data); }); let mut res = Vec::new(); while let Some(r) = receiver.recv().await { res.push(r); } assert_eq!(b"Hello ", &res[..]); } struct AnotherActor { sink: SinkWrite, } impl Actor for AnotherActor { type Context = actix::Context; } impl actix::io::WriteHandler> for AnotherActor { fn finished(&mut self, _ctxt: &mut Self::Context) { System::current().stop(); } } impl Handler for AnotherActor { type Result = (); fn handle(&mut self, data: Data, _ctxt: &mut Self::Context) { let _ = self.sink.write(data.bytes); if data.last { self.sink.close(); } } } #[actix::test] async fn test_send_bytes() { let (sender, mut receiver) = mpsc::unbounded_channel(); let bytes = Bytes::from_static(b"Hello"); let expected_bytes = bytes.clone(); actix_rt::spawn(async move { let addr = AnotherActor::create(move |ctxt| AnotherActor { sink: SinkWrite::new(SinkUnboundedSender { tx: sender }, ctxt), }); let data = Data { bytes, last: true }; addr.do_send(data); }); let res = receiver.recv().await.unwrap(); assert_eq!(expected_bytes, res); } #[actix::test] async fn test_send_single_bytes() { let (sender, mut receiver) = mpsc::unbounded_channel(); let addr = MyActor::create(move |ctxt| { let sink = MySink { sender, queue: Vec::new(), }; MyActor { sink: SinkWrite::new(sink, ctxt), } }); let data = Data { bytes: Bytes::from_static(b"h"), last: false, }; addr.do_send(data); let data = Data { bytes: Bytes::from_static(b"i"), last: false, }; addr.do_send(data); let mut res = Vec::new(); res.push(receiver.recv().await.unwrap()); res.push(receiver.recv().await.unwrap()); let data = Data { bytes: Bytes::from_static(b"!"), last: true, }; addr.do_send(data); res.push(receiver.recv().await.unwrap()); assert_eq!(b"hi!", &res[..]); }