//! SUBSCRIBE Actions requirements
use sage_broker::BrokerSettings;
use sage_mqtt::{Packet, ReasonCode, Subscribe, Topic};
pub mod utils;

use utils::client::Response;
pub use utils::*;

////////////////////////////////////////////////////////////////////////////////
/// MQTT-3.8.1-1: Bits 3,2,1 and 0 of the Fixed Header of the SUBSCRIBE packet are reserved and
/// MUST be set to 0,0,1 and 0 respectively. The Server MUST treat any other value as malformed and
/// close the Network Connection.
///
/// Expands every `test_name: (first_byte, expect_success),` entry into its own `#[tokio::test]`
/// which unpacks the pair and forwards it to the `mqtt_3_8_1_1` helper function.
macro_rules! mqtt_3_8_1_1 {
    ($($test_name:ident: $case:expr,)*) => {
        $(
            #[tokio::test]
            async fn $test_name() {
                let (first_byte, should_succeed) = $case;
                mqtt_3_8_1_1(first_byte, should_succeed).await
            }
        )*
    };
}

// Each entry is `test_name: (first_byte, expect_success)`. The high nibble of the byte stays at
// 0b1000 while the low nibble walks through all sixteen flag combinations; the suffix of the test
// name mirrors that low nibble.
mqtt_3_8_1_1! {
    // The only combination expected to be accepted (flags 0,0,1,0).
    mqtt_3_8_1_1_0010: (0b1000_0010, true),
    // Every other combination is expected to be rejected.
    mqtt_3_8_1_1_0000: (0b1000_0000, false),
    mqtt_3_8_1_1_0001: (0b1000_0001, false),
    mqtt_3_8_1_1_0011: (0b1000_0011, false),
    mqtt_3_8_1_1_0100: (0b1000_0100, false),
    mqtt_3_8_1_1_0101: (0b1000_0101, false),
    mqtt_3_8_1_1_0110: (0b1000_0110, false),
    mqtt_3_8_1_1_0111: (0b1000_0111, false),
    mqtt_3_8_1_1_1000: (0b1000_1000, false),
    mqtt_3_8_1_1_1001: (0b1000_1001, false),
    mqtt_3_8_1_1_1010: (0b1000_1010, false),
    mqtt_3_8_1_1_1011: (0b1000_1011, false),
    mqtt_3_8_1_1_1100: (0b1000_1100, false),
    mqtt_3_8_1_1_1101: (0b1000_1101, false),
    mqtt_3_8_1_1_1110: (0b1000_1110, false),
    mqtt_3_8_1_1_1111: (0b1000_1111, false),
}

/// Sends a SUBSCRIBE whose first encoded byte is overwritten with `fixed_header` and checks the
/// broker's reply.
///
/// When `expect_success` is `true` the reply must be a SUBACK. Otherwise the reply must be a
/// DISCONNECT carrying `ReasonCode::MalformedPacket`; anything else makes the test panic.
async fn mqtt_3_8_1_1(fixed_header: u8, expect_success: bool) {
    let settings = BrokerSettings {
        keep_alive: TIMEOUT_DELAY,
        ..BrokerSettings::valid_default()
    };
    let (_, server, local_addr, shutdown) = server::spawn(settings).await;
    let (mut stream, _) = client::connect(&local_addr, Default::default()).await;

    // Encode a regular SUBSCRIBE first, then tamper with its first byte only.
    let subscribe = Subscribe {
        subscriptions: vec![(Topic::from("hello/world"), Default::default())],
        ..Default::default()
    };
    let mut raw = Vec::new();
    Packet::Subscribe(subscribe)
        .encode(&mut raw)
        .await
        .unwrap();
    raw[0] = fixed_header;

    let response = client::send_waitback_data(&mut stream, raw).await;
    match (expect_success, response) {
        // The single valid header must be acknowledged.
        (true, reply) => assert!(matches!(reply, Response::Packet(Packet::SubAck(_)))),
        // Any invalid header must be answered by a DISCONNECT flagged as malformed.
        (false, Response::Packet(Packet::Disconnect(disconnect))) => {
            assert_eq!(disconnect.reason_code, ReasonCode::MalformedPacket)
        }
        (false, Response::Packet(_)) => panic!("Expected DISCONNECT after malformed SUBSCRIBE"),
        (false, _) => panic!("Expected packet after malformed SUBSCRIBE"),
    }

    server::stop(shutdown, server).await;
}

////////////////////////////////////////////////////////////////////////////////
/// MQTT-3.8.3-1: The Topic Filters MUST be a UTF-8 Encoded String.
#[tokio::test]
async fn mqtt_3_8_3_1() {
    // TODO: placeholder — the body is empty, so this test passes without checking anything.
}

////////////////////////////////////////////////////////////////////////////////
/// MQTT-3.8.3-2: The Payload MUST contain at least one Topic Filter and Subscription Options pair.
/// A SUBSCRIBE packet with no Payload is a Protocol Error
#[tokio::test]
async fn mqtt_3_8_3_2() {
    let settings = BrokerSettings {
        keep_alive: TIMEOUT_DELAY,
        ..BrokerSettings::valid_default()
    };
    let (_, server, local_addr, shutdown) = server::spawn(settings).await;
    let (mut stream, _) = client::connect(&local_addr, Default::default()).await;

    // A default SUBSCRIBE is used as the packet with no Subscription.
    let mut raw = Vec::new();
    Packet::Subscribe(Default::default())
        .encode(&mut raw)
        .await
        .unwrap();

    // The broker is expected to answer with a DISCONNECT reporting a protocol error.
    match client::send_waitback_data(&mut stream, raw).await {
        Response::Packet(Packet::Disconnect(disconnect)) => {
            assert_eq!(disconnect.reason_code, ReasonCode::ProtocolError)
        }
        Response::Packet(_) => panic!("Expected DISCONNECT after empty SUBSCRIBE"),
        _ => panic!("Expected packet after empty SUBSCRIBE"),
    }

    server::stop(shutdown, server).await;
}

////////////////////////////////////////////////////////////////////////////////
/// MQTT-3.8.3-3: Bit 2 of the Subscription Options represents the No Local option. If the value is
/// 1, Application Messages MUST NOT be forwarded to a connection with a ClientID equal to the
/// ClientID of the publishing connection.
#[tokio::test]
async fn mqtt_3_8_3_3() {
    // TODO: placeholder — the body is empty, so this test passes without checking anything.
}

////////////////////////////////////////////////////////////////////////////////
/// MQTT-3.8.3-4: It is a Protocol Error to set the No Local bit to 1 on a Shared Subscription.
#[tokio::test]
async fn mqtt_3_8_3_4() {
    // TODO: placeholder — the body is empty, so this test passes without checking anything.
}

////////////////////////////////////////////////////////////////////////////////
/// MQTT-3.8.3-5: The Server MUST treat a SUBSCRIBE packet as malformed if any of Reserved bits in
/// the Payload are non-zero.
#[tokio::test]
async fn mqtt_3_8_3_5_0001() {
    // Only bit 6 of the tampered byte is raised.
    let reserved_bits: u8 = 0b0100_0000;
    mqtt_3_8_3_5(reserved_bits).await
}

#[tokio::test]
async fn mqtt_3_8_3_5_0010() {
    // Only bit 7 of the tampered byte is raised.
    let reserved_bits: u8 = 0b1000_0000;
    mqtt_3_8_3_5(reserved_bits).await
}

#[tokio::test]
async fn mqtt_3_8_3_5_0011() {
    // Bits 7 and 6 of the tampered byte are both raised.
    let reserved_bits: u8 = 0b1100_0000;
    mqtt_3_8_3_5(reserved_bits).await
}

/// Sends a single-topic SUBSCRIBE whose last encoded byte has the bits of `sub_payload` OR-ed in,
/// and expects the broker to answer with a DISCONNECT carrying `ReasonCode::MalformedPacket`.
async fn mqtt_3_8_3_5(sub_payload: u8) {
    let settings = BrokerSettings {
        keep_alive: TIMEOUT_DELAY,
        ..BrokerSettings::valid_default()
    };
    let (_, server, local_addr, shutdown) = server::spawn(settings).await;
    let (mut stream, _) = client::connect(&local_addr, Default::default()).await;

    // Encode a SUBSCRIBE holding exactly one topic.
    let request = Subscribe {
        subscriptions: vec![("Cat".into(), Default::default())],
        ..Default::default()
    };
    let mut raw = Vec::new();
    Packet::Subscribe(request).encode(&mut raw).await.unwrap();

    // Raise the requested bits in the last byte of the packet; with a single subscription this
    // is its Subscription Options byte (this is not the Fixed Header).
    let options_byte = raw.last_mut().unwrap();
    *options_byte |= sub_payload;

    match client::send_waitback_data(&mut stream, raw).await {
        Response::Packet(Packet::Disconnect(disconnect)) => {
            assert_eq!(disconnect.reason_code, ReasonCode::MalformedPacket)
        }
        Response::Packet(other) => panic!(
            "Expected DISCONNECT after invalid SUBSCRIBE, got {:?}",
            other
        ),
        _ => panic!("Expected packet after invalid SUBSCRIBE"),
    }

    server::stop(shutdown, server).await;
}

////////////////////////////////////////////////////////////////////////////////
/// MQTT-3.8.4-1: When the Server receives a SUBSCRIBE packet from a Client, the Server MUST
/// respond with a SUBACK packet.
#[tokio::test]
async fn mqtt_3_8_4_1() {
    let settings = BrokerSettings {
        keep_alive: TIMEOUT_DELAY,
        ..BrokerSettings::valid_default()
    };
    let (_, server, local_addr, shutdown) = server::spawn(settings).await;
    let (mut stream, _) = client::connect(&local_addr, Default::default()).await;

    // A plain single-topic SUBSCRIBE must be answered by a SUBACK.
    let request = Subscribe {
        subscriptions: vec![(Topic::from("hello/world"), Default::default())],
        ..Default::default()
    };
    let reply = client::send_waitback(&mut stream, request.into()).await;
    assert!(matches!(reply, Response::Packet(Packet::SubAck(_))));

    server::stop(shutdown, server).await;
}

////////////////////////////////////////////////////////////////////////////////
/// MQTT-3.8.4-2: The SUBACK packet MUST have the same Packet Identifier as the SUBSCRIBE packet
/// that it is acknowledging.
#[tokio::test]
async fn mqtt_3_8_4_2() {
    let settings = BrokerSettings {
        keep_alive: TIMEOUT_DELAY,
        ..BrokerSettings::valid_default()
    };
    let (_, server, local_addr, shutdown) = server::spawn(settings).await;
    let (mut stream, _) = client::connect(&local_addr, Default::default()).await;

    // Subscribe 100 times with a random packet identifier; every SUBACK must echo it back.
    for _ in 0..100 {
        let expected_id = rand::random();
        let request = Subscribe {
            packet_identifier: expected_id,
            subscriptions: vec![(Topic::from("hello/world"), Default::default())],
            ..Default::default()
        };

        match client::send_waitback(&mut stream, request.into()).await {
            Response::Packet(Packet::SubAck(suback)) => {
                assert_eq!(suback.packet_identifier, expected_id)
            }
            _ => panic!("Expected SUBACK after SUBSCRIBE"),
        }
    }

    server::stop(shutdown, server).await;
}

////////////////////////////////////////////////////////////////////////////////
/// MQTT-3.8.4-3: If a Server receives a SUBSCRIBE packet containing a Topic Filter that is
/// identical to a Non‑shared Subscription’s Topic Filter for the current Session then it MUST
/// replace that existing Subscription with a new Subscription.
#[tokio::test]
async fn mqtt_3_8_4_3() {
    let settings = BrokerSettings {
        keep_alive: TIMEOUT_DELAY,
        ..BrokerSettings::valid_default()
    };
    let (sessions, server, local_addr, shutdown) = server::spawn(settings).await;
    let (mut stream, client_id) = client::connect(&local_addr, Default::default()).await;
    let session = sessions.read().unwrap().get(&client_id.unwrap()).unwrap();

    let topic = Topic::from("Topic1");

    // Subscribe to the very same filter two times in a row. After each round the session must
    // hold exactly one subscription, and it must be that filter.
    let mut round = 0;
    while round < 2 {
        let request = Subscribe {
            subscriptions: vec![(topic.clone(), Default::default())],
            ..Default::default()
        };
        let reply = client::send_waitback(&mut stream, request.into()).await;
        assert!(matches!(reply, Response::Packet(Packet::SubAck(_))));

        {
            // The read guard lives only inside this scope and is released before the next await.
            let subs = session.subs().read().unwrap();
            assert_eq!(subs.len(), 1);
            assert!(subs.has_filter(&topic));
        }

        round += 1;
    }

    server::stop(shutdown, server).await;
}

////////////////////////////////////////////////////////////////////////////////
/// MQTT-3.8.4-4: If the Retain Handling option is 0, any existing retained messages matching the
/// Topic Filter MUST be re-sent, but Application Messages MUST NOT be lost due to replacing the
/// Subscription.
#[tokio::test]
async fn mqtt_3_8_4_4() {
    // TODO: placeholder — the body is empty, so this test passes without checking anything.
}

////////////////////////////////////////////////////////////////////////////////
/// MQTT-3.8.4-5: If a Server receives a SUBSCRIBE packet that contains multiple Topic Filters it
/// MUST handle that packet as if it had received a sequence of multiple SUBSCRIBE packets, except
/// that it combines their responses into a single SUBACK response.
#[tokio::test]
async fn mqtt_3_8_4_5() {
    // The three filters used throughout the test.
    let topics = vec!["topic1", "topic2", "topic3"];

    let settings = BrokerSettings {
        keep_alive: TIMEOUT_DELAY,
        ..BrokerSettings::valid_default()
    };
    let (sessions, server, local_addr, shutdown) = server::spawn(settings).await;
    let (mut stream, client_id) = client::connect(&local_addr, Default::default()).await;
    let session = sessions.read().unwrap().get(&client_id.unwrap()).unwrap();

    // Checks that the session holds exactly one subscription per entry of `topics`. The read
    // guard is taken and released inside the closure, so it is never held across an await.
    let assert_all_subscribed = || {
        let subs = session.subs().read().unwrap();
        assert_eq!(subs.len(), topics.len());
        for name in &topics {
            assert!(subs.has_filter(&Topic::from(*name)));
        }
    };

    // First, one SUBSCRIBE carrying all the filters at once.
    let mut combined = Vec::new();
    for &name in &topics {
        combined.push((Topic::from(name), Default::default()));
    }
    let request = Subscribe {
        subscriptions: combined,
        ..Default::default()
    };
    let reply = client::send_waitback(&mut stream, request.into()).await;
    assert!(matches!(reply, Response::Packet(Packet::SubAck(_))));
    assert_all_subscribed();

    // Then one SUBSCRIBE per filter; the resulting session state must be the same.
    for &name in &topics {
        let request = Subscribe {
            subscriptions: vec![(Topic::from(name), Default::default())],
            ..Default::default()
        };
        let reply = client::send_waitback(&mut stream, request.into()).await;
        assert!(matches!(reply, Response::Packet(Packet::SubAck(_))));
    }
    assert_all_subscribed();

    server::stop(shutdown, server).await;
}

////////////////////////////////////////////////////////////////////////////////
/// MQTT-3.8.4-6: The SUBACK packet sent by the Server to the Client MUST contain a Reason Code for
/// each Topic Filter/Subscription Option pair.
#[tokio::test]
async fn mqtt_3_8_4_6() {
    // Send a sub with three topics
    let topics = vec!["topic1", "topic2", "topic3"];

    let (_, server, local_addr, shutdown) = server::spawn(BrokerSettings {
        keep_alive: TIMEOUT_DELAY,
        ..BrokerSettings::valid_default()
    })
    .await;
    let (mut stream, _) = client::connect(&local_addr, Default::default()).await;

    let subscribe = Subscribe {
        subscriptions: topics
            .iter()
            .map(|&t| (Topic::from(t), Default::default()))
            .collect(),
        ..Default::default()
    };

    let response = client::send_waitback(&mut stream, subscribe.into()).await;

    // The SUBACK must carry one reason code per requested filter. Any other response is a
    // failure: without the `else` branch the test would pass without asserting anything.
    if let Response::Packet(Packet::SubAck(suback)) = response {
        assert_eq!(suback.reason_codes.len(), topics.len());
    } else {
        panic!("Expected SUBACK after SUBSCRIBE");
    }

    server::stop(shutdown, server).await;
}

////////////////////////////////////////////////////////////////////////////////
/// MQTT-3.8.4-7: This Reason Code MUST either show the maximum QoS that was granted for that
/// Subscription or indicate that the subscription failed.
#[tokio::test]
async fn mqtt_3_8_4_7() {
    // TODO: placeholder — the body is empty, so this test passes without checking anything.
}

////////////////////////////////////////////////////////////////////////////////
/// MQTT-3.8.4-8: The QoS of Payload Messages sent in response to a Subscription MUST be the
/// minimum of the QoS of the originally published message and the Maximum QoS granted by the
/// Server.
#[tokio::test]
async fn mqtt_3_8_4_8() {
    // TODO: placeholder — the body is empty, so this test passes without checking anything.
}