use std::{
    future::Future,
    pin::Pin,
    task::Context,
    time::{Duration, Instant},
};

use futures::{future::join_all, FutureExt};
use loole::{bounded, RecvError, SendError};

fn ms(ms: u64) -> Duration {
    Duration::from_millis(ms)
}

async fn async_sleep(ms: u64) {
    tokio::time::sleep(Duration::from_millis(ms)).await
}

#[test]
fn ordered_deques() {
    let (tx, rx) = bounded(0);

    let mut send_future_1 = tx.send_async(1);
    let mut send_future_2 = tx.send_async(2);

    let mut cx = Context::from_waker(futures::task::noop_waker_ref());
    let cx = &mut cx;

    assert!(Pin::new(&mut send_future_1).poll(cx).is_pending());
    assert!(Pin::new(&mut send_future_2).poll(cx).is_pending());

    drop(rx);

    assert_eq!(
        Pin::new(&mut send_future_2).poll(cx),
        std::task::Poll::Ready(Err(SendError(2)))
    );
    assert_eq!(
        Pin::new(&mut send_future_1).poll(cx),
        std::task::Poll::Ready(Err(SendError(1)))
    );
}

#[tokio::test]
async fn async_send_before_recv_buffer_0() {
    let (tx, rx) = bounded(0);
    tokio::spawn(tx.send_async(1));
    async_sleep(100).await;
    assert_eq!(rx.recv_async().await, Ok(1));
}

#[tokio::test]
async fn async_recv_before_send_buffer_0() {
    let (tx, rx) = bounded(0);
    let h = tokio::spawn(rx.recv_async());
    async_sleep(100).await;
    let f = h.map(|x| x.unwrap());
    assert_eq!(tx.send_async(1).await, Ok(()));
    assert_eq!(f.await, Ok(1));
}

#[tokio::test]
async fn async_close_before_send_buffer_0() {
    let (tx, rx) = bounded::<()>(0);
    drop(rx);
    assert_eq!(tx.send_async(()).await, Err(SendError(())));
}

#[tokio::test]
async fn async_send_before_recv_buffer_1() {
    let (tx, rx) = bounded(1);
    assert_eq!(tx.send_async(1).await, Ok(()));
    assert_eq!(rx.recv_async().await, Ok(1));
}

#[tokio::test]
async fn async_recv_before_send_buffer_1() {
    let (tx, rx) = bounded(1);
    let h = tokio::spawn(rx.recv_async());
    async_sleep(100).await;
    assert_eq!(tx.send_async(1).await, Ok(()));
    assert_eq!(h.await.unwrap(), Ok(1));
}

#[tokio::test]
async fn async_recv_after_manually_closed_sender() {
    let (tx, rx) = bounded(1);
    assert_eq!(tx.send_async(1).await, Ok(()));
    assert!(tx.close());
    assert_eq!(rx.recv_async().await, Ok(1));
    assert_eq!(rx.recv_async().await, Err(RecvError::Disconnected));
}

#[tokio::test]
async fn async_recv_after_manually_closeed_receiver() {
    let (tx, rx) = bounded(1);
    assert_eq!(tx.send_async(1).await, Ok(()));
    assert!(rx.close());
    assert_eq!(rx.recv_async().await, Ok(1));
    assert_eq!(rx.recv_async().await, Err(RecvError::Disconnected));
}

#[tokio::test]
async fn async_2_sends_before_2_recvs_buffer_1() {
    let (tx, rx) = bounded(1);
    assert_eq!(tx.capacity(), Some(1));
    assert_eq!(tx.len(), 0);
    tx.send_async(1).await.unwrap();
    assert_eq!(tx.len(), 1);
    tokio::spawn(tx.send_async(2));
    async_sleep(100).await;
    assert_eq!(rx.len(), 1);
    let r1r = rx.recv_async().await;
    assert_eq!(r1r, Ok(1));
    assert_eq!(rx.len(), 1);
    let r2r = rx.recv_async().await;
    assert_eq!(r2r, Ok(2));
    assert_eq!(rx.len(), 0);
}

#[tokio::test]
async fn async_close_before_recv_buffer_0() {
    let (tx, rx) = bounded::<()>(0);
    drop(tx);
    assert_eq!(rx.recv_async().await, Err(RecvError::Disconnected));
}

#[tokio::test]
async fn async_close_before_recv_buffer_1() {
    let (tx, rx) = bounded(1);
    assert_eq!(tx.send_async(1).await, Ok(()));
    drop(tx);
    assert_eq!(rx.recv_async().await, Ok(1));
    assert_eq!(rx.recv_async().await, Err(RecvError::Disconnected));
}

#[tokio::test]
async fn async_concurrent_writes_and_reads_buffer_0() {
    let (tx, rx) = bounded(0);
    let _sends = tokio::spawn(join_all([
        tx.send_async(1),
        tx.send_async(2),
        tx.send_async(3),
    ]));
    let recvs = tokio::spawn(join_all([
        rx.recv_async(),
        rx.recv_async(),
        rx.recv_async(),
    ]))
    .await
    .unwrap();
    assert_eq!(recvs, vec![Ok(1), Ok(2), Ok(3)]);
}

#[tokio::test]
async fn async_shift_pending_send_buffer_0() {
    let (tx, rx) = bounded(0);
    let tx_clone = tx.clone();
    let h1 = tokio::spawn(async move {
        let start = Instant::now();
        assert_eq!(tx.send_async(1).await, Ok(()));
        start.elapsed()
    });
    let h2 = tokio::spawn(async move {
        let start = Instant::now();
        async_sleep(100).await;
        assert_eq!(tx_clone.send_async(2).await, Ok(()));
        start.elapsed()
    });
    tokio::spawn(async move {
        async_sleep(1000).await;
        assert_eq!(rx.recv(), Ok(1));
        async_sleep(1000).await;
        assert_eq!(rx.recv(), Ok(2));
    });
    let elapsed1 = h1.await.unwrap();
    assert!(elapsed1 >= ms(900));
    assert!(elapsed1 < ms(1100));
    let elapsed2 = h2.await.unwrap();
    assert!(elapsed2 >= ms(1900));
    assert!(elapsed2 < ms(2100));
}

#[tokio::test]
async fn async_shift_pending_send_buffer_2() {
    let (tx, rx) = bounded(2);
    assert_eq!(tx.send_async(1).await, Ok(()));
    assert_eq!(tx.send_async(2).await, Ok(()));
    let h = tokio::spawn(async move {
        let start = Instant::now();
        assert_eq!(tx.send_async(3).await, Ok(()));
        start.elapsed()
    });
    tokio::spawn(async move {
        async_sleep(1000).await;
        assert_eq!(rx.recv_async().await, Ok(1));
    })
    .await
    .unwrap();

    let elapsed = h.await.unwrap();
    assert!(elapsed >= ms(900));
    assert!(elapsed < ms(1100));
}