use ntex::service::chain_factory; use ntex_mqtt::{v3, v5, MqttError, MqttServer}; use ntex_tls::openssl::SslAcceptor; use openssl::ssl::{self, SslFiletype, SslMethod}; #[derive(Clone)] struct Session; #[derive(Debug)] struct ServerError; impl From<()> for ServerError { fn from(_: ()) -> Self { ServerError } } impl std::convert::TryFrom for v5::PublishAck { type Error = ServerError; fn try_from(err: ServerError) -> Result { Err(err) } } async fn handshake_v3( handshake: v3::Handshake, ) -> Result, ServerError> { log::info!("new connection: {:?}", handshake); Ok(handshake.ack(Session, false)) } async fn publish_v3(publish: v3::Publish) -> Result<(), ServerError> { log::info!("incoming publish: {:?} -> {:?}", publish.id(), publish.topic()); Ok(()) } async fn handshake_v5( handshake: v5::Handshake, ) -> Result, ServerError> { log::info!("new connection: {:?}", handshake); Ok(handshake.ack(Session)) } async fn publish_v5(publish: v5::Publish) -> Result { log::info!("incoming publish: {:?} -> {:?}", publish.id(), publish.topic()); Ok(publish.ack()) } #[ntex::main] async fn main() -> std::io::Result<()> { std::env::set_var("RUST_LOG", "ntex=info,ntex_io=trace,ntex_mqtt=trace,openssl=trace"); env_logger::init(); // create self-signed certificates using: // openssl req -x509 -nodes -subj '/CN=localhost' -newkey rsa:4096 -keyout examples/key8.pem -out examples/cert.pem -days 365 -keyform PEM // openssl rsa -in examples/key8.pem -out examples/key.pem let mut builder = ssl::SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); builder.set_private_key_file("./tests/key.pem", SslFiletype::PEM).unwrap(); builder.set_certificate_chain_file("./tests/cert.pem").unwrap(); let acceptor = builder.build(); ntex::server::build() .bind("mqtt", "127.0.0.1:8883", move |_| { chain_factory(SslAcceptor::new(acceptor.clone())) .map_err(|_err| MqttError::Service(ServerError {})) .and_then( MqttServer::new() .v3(v3::MqttServer::new(handshake_v3).publish(publish_v3).finish()) .v5(v5::MqttServer::new(handshake_v5).publish(publish_v5).finish()), ) })? .workers(1) .run() .await }