// MIT License // // Copyright (c) 2019-2021 Alessandro Cresto Miseroglio // Copyright (c) 2019-2021 Tangram Technologies S.R.L. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. use std::io; use std::collections::VecDeque; use actix::actors::resolver::{Connect, Resolver}; use actix::prelude::*; use futures::unsync::oneshot; use futures::Future; use backoff::backoff::Backoff; use backoff::ExponentialBackoff; use log::{error, info, debug}; use serde_json; use tokio_codec::FramedRead; use tokio_io::io::WriteHalf; use tokio_io::AsyncRead; use tokio_tcp::TcpStream; //use bytes::BytesMut; use crate::codec::{NsqCodec, Cmd}; use crate::commands::{identify, nop, auth, sub, rdy, publish, VERSION}; use crate::config::{Config, NsqdConfig}; use crate::error::Error; use crate::msgs::{Auth, Pub, Sub, Ready}; use crate::conn::ConnState; pub struct Producer { topic: String, channel: String, addr: String, config: Config, backoff: ExponentialBackoff, cell: Option, NsqCodec>>, state: ConnState, queue: VecDeque>>, auth: String, // rdy: u32, } impl Default for Producer { fn default() -> Producer { Producer { topic: String::new(), channel: String::new(), addr: String::new(), config: Config::default(), backoff: ExponentialBackoff::default(), cell: None, state: ConnState::Neg, queue: VecDeque::new(), auth: String::new(), // rdy: 0, } } } impl Producer { pub fn new>( topic: S, channel: S, addr: S, config: Option, auth: S, // rdy: Option, ) -> Producer { let mut backoff = ExponentialBackoff::default(); let mut _rdy = 0; //if rdy.is_some() { _rdy = rdy.unwrap() }; let cfg = match config { Some(cfg) => cfg, None => Config::default(), }; backoff.max_elapsed_time = None; Producer { addr: addr.into(), config: cfg, backoff, cell: None, topic: topic.into(), channel: channel.into(), state: ConnState::Neg, queue: VecDeque::new(), auth: auth.into(), // rdy: _rdy, } } } impl Actor for Producer { type Context = Context; fn started(&mut self, ctx: &mut Context) { Resolver::from_registry() .send(Connect::host(self.addr.as_str())) .into_actor(self) .map(|res, act, ctx| match res { Ok(stream) => { info!("Connected to nsqd: {}", act.addr); let (r, w) = stream.split(); // write connection let mut framed = actix::io::FramedWrite::new(w, NsqCodec{}, ctx); let mut rx = FramedRead::new(r, NsqCodec{}); // send magic version framed.write(Cmd::Magic(VERSION)); // send configuration to nsqd let json = match serde_json::to_string(&act.config) { Ok(s) => s, Err(e) => { error!("Config cannot be formatted as json string: {}", e); return ctx.stop(); } }; // read connection ctx.add_stream(rx); framed.write(identify(json)); act.cell = Some(framed); // reset backoff act.backoff.reset(); } Err(err) => { error!("Can not connect to nsqd: {}", err); if let Some(timeout) = act.backoff.next_backoff() { ctx.run_later(timeout, |_, ctx| ctx.stop()); } } }) .map_err(|err, act, ctx| { error!("Can not connect to nsqd: {}", err); if let Some(timeout) = act.backoff.next_backoff() { ctx.run_later(timeout, |_, ctx| ctx.stop()); } }) .wait(ctx); } } impl actix::io::WriteHandler for Producer { fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running { error!("nsqd connection dropped: {} error: {}", self.addr, err); Running::Stop } } // TODO: Implement error impl StreamHandler for Producer { fn error(&mut self, err: Error, _ctx: &mut Self::Context) -> Running { match err { Error::Remote(err) => { if let Some(tx) = self.queue.pop_front() { let _ = tx.send(Err(Error::Remote(err))); } return Running::Continue }, _ => { error!("Something goes wrong decoding message"); }, }; Running::Stop } fn handle(&mut self, msg: Cmd, ctx: &mut Self::Context) { match msg { Cmd::Heartbeat => { debug!("received heartbeat"); if let Some(ref mut cell) = self.cell { cell.write(nop()); } else { error!("Nsqd connection dropped. trying reconnecting"); ctx.stop(); } } Cmd::Response(s) => { match self.state { ConnState::Neg => { let config: NsqdConfig = match serde_json::from_str(s.as_str()) { Ok(s) => s, Err(err) => { error!("Negotiating json response invalid: {:?}", err); return ctx.stop(); } }; debug!("json response: {:?}", config); if config.auth_required { ctx.notify(Auth); } else { //ctx.notify(Sub); self.state = ConnState::Started; } }, ConnState::Sub => { ctx.notify(Sub); }, ConnState::Ready => { debug!("sub response: {}", s); ctx.notify(Ready(0)); } _ => { debug!("response: {}", s); if let Some(tx) = self.queue.pop_front() { let _ = tx.send(Ok(Cmd::Response(s))); } }, } }, _ => {}, } } } impl Handler for Producer { type Result = (); fn handle(&mut self, _msg: Auth, ctx: &mut Self::Context) { if let Some(ref mut cell) = self.cell { cell.write(auth(self.auth.clone())); } else { error!("Unable to identify nsqd connection dropped"); ctx.stop(); } self.state = ConnState::Sub; } } impl Handler for Producer { type Result = (); fn handle(&mut self, _msg: Sub, _ctx: &mut Self::Context) { if let Some(ref mut cell) = self.cell { let topic = self.topic.clone(); let channel = self.channel.clone(); cell.write(sub(topic.as_str(), channel.as_str())); } self.state = ConnState::Ready } } impl Handler for Producer { type Result = (); fn handle(&mut self, msg: Ready, _ctx: &mut Self::Context) { if let Some(ref mut cell) = self.cell { cell.write(rdy(msg.0)); } if self.state != ConnState::Started { self.state = ConnState::Started } } } impl Handler for Producer { type Result = ResponseFuture; fn handle(&mut self, msg: Pub, _ctx: &mut Self::Context) -> Self::Result { let (tx, rx) = oneshot::channel(); if let Some(ref mut cell) = self.cell { self.queue.push_back(tx); let topic = self.topic.clone(); let m = &msg.0.clone(); println!("publish: {}", m); cell.write(publish(topic.as_str(), &msg.0)); } else { let _ = tx.send(Err(Error::NotConnected)); } Box::new(rx.map_err(|_| Error::Disconnected).and_then(|res| res)) } } impl Supervised for Producer { fn restarting(&mut self, _: &mut Self::Context) {} }