//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use std::{
    sync::mpsc::{channel, Sender},
    time::Duration,
};

use ntex::{
    service::fn_service,
    time::{sleep, Millis},
    util::Ready,
};
use ntex_mqtt::v5;
use zenoh::{
    config::Config,
    internal::{plugins::PluginsManager, runtime::RuntimeBuilder},
    Wait,
};
use zenoh_config::ModeDependentValue;

// The test topic
const TEST_TOPIC: &str = "test-topic";
// The test payload
const TEST_PAYLOAD: &str = "Hello World";

#[derive(Debug)]
struct Error;

impl std::convert::TryFrom<Error> for v5::PublishAck {
    type Error = Error;

    fn try_from(err: Error) -> Result<Self, Self::Error> {
        Err(err)
    }
}

async fn create_mqtt_server() {
    let mut plugins_mgr = PluginsManager::static_plugins_only();
    plugins_mgr.declare_static_plugin::<zenoh_plugin_mqtt::MqttPlugin, &str>("mqtt", true);
    let mut config = Config::default();
    config.insert_json5("plugins/mqtt", "{}").unwrap();
    config
        .timestamping
        .set_enabled(Some(ModeDependentValue::Unique(true)))
        .unwrap();
    config.adminspace.set_enabled(true).unwrap();
    config.plugins_loading.set_enabled(true).unwrap();
    let mut runtime = RuntimeBuilder::new(config)
        .plugins_manager(plugins_mgr)
        .build()
        .await
        .unwrap();
    runtime.start().await.unwrap();
}

async fn create_mqtt_subscriber(tx: Sender<String>) {
    let client = v5::client::MqttConnector::new("127.0.0.1:1883")
        .client_id("mqtt-sub-id")
        .connect()
        .await
        .unwrap();

    let sink = client.sink();

    // handle incoming publishes
    ntex::rt::spawn(client.start(fn_service(
        move |control: v5::client::Control<Error>| match control {
            v5::client::Control::Publish(publish) => {
                println!(
                    "incoming publish: {:?} -> {:?} payload {:?}",
                    publish.packet().packet_id,
                    publish.packet().topic,
                    publish.packet().payload
                );
                let payload = std::str::from_utf8(&publish.packet().payload)
                    .unwrap()
                    .to_owned();
                tx.send(payload).unwrap();
                Ready::Ok(publish.ack(v5::codec::PublishAckReason::Success))
            }
            v5::client::Control::Disconnect(msg) => {
                println!("Server disconnecting: {:?}", msg);
                Ready::Ok(msg.ack())
            }
            v5::client::Control::Error(msg) => {
                println!("Codec error: {:?}", msg);
                Ready::Ok(msg.ack(v5::codec::DisconnectReasonCode::UnspecifiedError))
            }
            v5::client::Control::ProtocolError(msg) => {
                println!("Protocol error: {:?}", msg);
                Ready::Ok(msg.ack())
            }
            v5::client::Control::PeerGone(msg) => {
                println!("Peer closed connection: {:?}", msg.error());
                Ready::Ok(msg.ack())
            }
            v5::client::Control::Closed(msg) => {
                println!("Server closed connection: {:?}", msg);
                Ready::Ok(msg.ack())
            }
        },
    )));

    // subscribe to topic
    sink.subscribe(None)
        .topic_filter(
            TEST_TOPIC.into(),
            v5::codec::SubscriptionOptions {
                qos: v5::codec::QoS::AtLeastOnce,
                no_local: false,
                retain_as_published: false,
                retain_handling: v5::codec::RetainHandling::AtSubscribe,
            },
        )
        .send()
        .await
        .unwrap();
    // Ensure the data is received
    sleep(Millis(3_000)).await;
}

async fn create_mqtt_publisher() {
    let client = v5::client::MqttConnector::new("127.0.0.1:1883")
        .client_id("mqtt-pub-id")
        .connect()
        .await
        .unwrap();

    let sink = client.sink();

    // handle incoming publishes
    ntex::rt::spawn(client.start(fn_service(
        |control: v5::client::Control<Error>| match control {
            v5::client::Control::Publish(publish) => {
                println!(
                    "incoming publish: {:?} -> {:?} payload {:?}",
                    publish.packet().packet_id,
                    publish.packet().topic,
                    publish.packet().payload
                );
                Ready::Ok(publish.ack(v5::codec::PublishAckReason::Success))
            }
            v5::client::Control::Disconnect(msg) => {
                println!("Server disconnecting: {:?}", msg);
                Ready::Ok(msg.ack())
            }
            v5::client::Control::Error(msg) => {
                println!("Codec error: {:?}", msg);
                Ready::Ok(msg.ack(v5::codec::DisconnectReasonCode::UnspecifiedError))
            }
            v5::client::Control::ProtocolError(msg) => {
                println!("Protocol error: {:?}", msg);
                Ready::Ok(msg.ack())
            }
            v5::client::Control::PeerGone(msg) => {
                println!("Peer closed connection: {:?}", msg.error());
                Ready::Ok(msg.ack())
            }
            v5::client::Control::Closed(msg) => {
                println!("Server closed connection: {:?}", msg);
                Ready::Ok(msg.ack())
            }
        },
    )));

    // send client publish
    let ack = sink
        .publish(TEST_TOPIC, TEST_PAYLOAD.into())
        .send_at_least_once()
        .await
        .unwrap();
    // Ensure the data is sent
    println!("ack received: {:?}", ack);
}

#[test]
fn test_mqtt_pub_mqtt_sub() {
    // Run the bridge for MQTT and Zenoh
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.spawn(create_mqtt_server());
    // Wait for the bridge to be ready
    std::thread::sleep(Duration::from_secs(2));

    // MQTT subscriber
    let (tx, rx) = channel();
    rt.spawn_blocking(move || {
        ntex::rt::System::new("mqtt_sub").block_on(create_mqtt_subscriber(tx))
    });
    std::thread::sleep(Duration::from_secs(1));

    // MQTT publisher
    rt.spawn_blocking(|| ntex::rt::System::new("mqtt_pub").block_on(create_mqtt_publisher()));

    // Wait for the test to complete
    let result = rx.recv_timeout(Duration::from_secs(3));

    // Stop the tokio runtime
    // Since ntex server is running in blocking thread, we need to force shutdown the runtime while completing the test
    // Note that we should shutdown the runtime before doing any check that might panic the test.
    // Otherwise, there is no way to shutdown ntex server
    rt.shutdown_background();

    let payload = result.expect("Receiver timeout");
    assert_eq!(payload, TEST_PAYLOAD);
}

#[test]
fn test_mqtt_pub_zenoh_sub() {
    // Run the bridge for MQTT and Zenoh
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.spawn(create_mqtt_server());
    // Wait for the bridge to be ready
    std::thread::sleep(Duration::from_secs(2));

    // Zenoh subscriber
    let (tx, rx) = channel();
    let session = zenoh::open(zenoh::Config::default()).wait().unwrap();
    let _subscriber = session
        .declare_subscriber(TEST_TOPIC)
        .callback_mut(move |sample| {
            let data = sample
                .payload()
                .try_to_string()
                .to_owned()
                .unwrap()
                .into_owned();
            tx.send(data).unwrap();
        })
        .wait()
        .unwrap();
    std::thread::sleep(Duration::from_secs(1));

    // MQTT publisher
    rt.spawn_blocking(|| ntex::rt::System::new("mqtt_pub").block_on(create_mqtt_publisher()));

    // Wait for the test to complete
    let result = rx.recv_timeout(Duration::from_secs(3));

    // Stop the tokio runtime
    // Since ntex server is running in blocking thread, we need to force shutdown the runtime while completing the test
    // Note that we should shutdown the runtime before doing any check that might panic the test.
    // Otherwise, there is no way to shutdown ntex server
    rt.shutdown_background();

    let payload = result.expect("Receiver timeout");
    assert_eq!(payload, TEST_PAYLOAD);
}

#[test]
fn test_zenoh_pub_mqtt_sub() {
    // Run the bridge for MQTT and Zenoh
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.spawn(create_mqtt_server());
    // Wait for the bridge to be ready
    std::thread::sleep(Duration::from_secs(2));

    // MQTT subscriber
    let (tx, rx) = channel();
    rt.spawn_blocking(move || {
        ntex::rt::System::new("mqtt_sub").block_on(create_mqtt_subscriber(tx))
    });
    std::thread::sleep(Duration::from_secs(1));

    // Zenoh publisher
    let session = zenoh::open(zenoh::Config::default()).wait().unwrap();
    let publisher = session.declare_publisher(TEST_TOPIC).wait().unwrap();
    publisher.put(TEST_PAYLOAD).wait().unwrap();

    // Wait for the test to complete
    let result = rx.recv_timeout(Duration::from_secs(3));

    // Stop the tokio runtime
    // Since ntex server is running in blocking thread, we need to force shutdown the runtime while completing the test
    // Note that we should shutdown the runtime before doing any check that might panic the test.
    // Otherwise, there is no way to shutdown ntex server
    rt.shutdown_background();

    let payload = result.expect("Receiver timeout");
    assert_eq!(payload, TEST_PAYLOAD);
}