use ntex::service::{fn_factory_with_config, fn_service}; use ntex::util::Ready; use ntex_mqtt::v5::codec::PublishAckReason; use ntex_mqtt::{v3, v5, MqttServer}; #[derive(Clone, Debug)] struct MySession { // our custom session information client_id: String, } #[derive(Debug)] struct MyServerError; impl From<()> for MyServerError { fn from(_: ()) -> Self { MyServerError } } impl std::convert::TryFrom for v5::PublishAck { type Error = MyServerError; fn try_from(err: MyServerError) -> Result { Err(err) } } async fn handshake_v3( handshake: v3::Handshake, ) -> Result, MyServerError> { log::info!("new connection: {:?}", handshake); let session = MySession { client_id: handshake.packet().client_id.to_string() }; Ok(handshake.ack(session, false)) } async fn publish_v3( session: v3::Session, publish: v3::Publish, ) -> Result<(), MyServerError> { log::info!( "incoming publish ({:?}): {:?} -> {:?}", session.state(), publish.id(), publish.topic() ); // example: only "my-client-id" may publish if session.state().client_id == "my-client-id" { Ok(()) } else { // with MQTTv3 we can only close the connection Err(MyServerError) } } async fn handshake_v5( handshake: v5::Handshake, ) -> Result, MyServerError> { log::info!("new connection: {:?}", handshake); let session = MySession { client_id: handshake.packet().client_id.to_string() }; Ok(handshake.ack(session)) } async fn publish_v5( session: v5::Session, publish: v5::Publish, ) -> Result { log::info!( "incoming publish ({:?}) : {:?} -> {:?}", session.state(), publish.id(), publish.topic() ); // example: only "my-client-id" may publish if session.state().client_id == "my-client-id" { Ok(publish.ack()) } else { Ok(publish.ack().reason_code(PublishAckReason::NotAuthorized)) } } #[ntex::main] async fn main() -> std::io::Result<()> { std::env::set_var("RUST_LOG", "session=trace,ntex=trace,ntex_mqtt=trace,basic=trace"); env_logger::init(); log::info!("Hello"); ntex::server::build() .bind("mqtt", "127.0.0.1:1883", |_| { MqttServer::new() .v3(v3::MqttServer::new(handshake_v3) .publish(fn_factory_with_config(|session: v3::Session| { Ready::Ok::<_, MyServerError>(fn_service(move |req| { publish_v3(session.clone(), req) })) })) .finish()) .v5(v5::MqttServer::new(handshake_v5) .publish(fn_factory_with_config(|session: v5::Session| { Ready::Ok::<_, MyServerError>(fn_service(move |req| { publish_v5(session.clone(), req) })) })) .finish()) })? .workers(1) .run() .await }