#![allow(dead_code)] #![allow(unused_imports)] #![allow(unused_variables)] use std::{ io, sync::{Arc, Mutex}, }; use log::info; use wampire::{ client::{Client, Connection, Subscription}, MatchingPolicy, Value, URI, }; enum Command { Sub, Pub, Unsub, List, Help, Quit, NoOp, Invalid(String), } fn print_prompt() { println!("Enter a command (or type \"help\")"); } fn get_input_from_user() -> String { let mut input = String::new(); io::stdin().read_line(&mut input).unwrap(); input } fn process_input(input: &str) -> (Command, Vec) { let mut i_iter = input.splitn(2, ' '); let command = match i_iter.next() { Some(command) => command.trim().to_lowercase(), None => return (Command::NoOp, Vec::new()), }; let command = match command.as_str() { "pub" => Command::Pub, "sub" => Command::Sub, "unsub" => Command::Unsub, "list" => Command::List, "help" => Command::Help, "quit" => Command::Quit, "" => Command::NoOp, x => Command::Invalid(x.to_string()), }; let args = match i_iter.next() { Some(args_string) => args_string .split(',') .map(|s| s.trim().to_string()) .collect(), None => Vec::new(), }; (command, args) } async fn subscribe( client: &mut Client, subscriptions: &mut Arc>>, args: &[String], ) { if args.len() > 2 { println!("Too many arguments to subscribe. Ignoring"); } else if args.is_empty() { println!("Please specify the topic to subscribe to"); return; } let topic = args[0].clone(); let policy = if args.len() > 1 { match args[1].as_str() { "prefix" => MatchingPolicy::Prefix, "wild" => MatchingPolicy::Wildcard, "strict" => MatchingPolicy::Strict, _ => { println!("Invalid matching type, should be 'prefix', 'wild' or 'strict'"); return; } } } else { MatchingPolicy::Strict }; let subscriptions = Arc::clone(subscriptions); client .subscribe_with_pattern( URI::new(&topic), Box::new(move |args, kwargs| { println!( "Received message on topic {} with args {:?} and kwargs {:?}", topic, args, kwargs ); }), policy, ) .await .and_then(move |subscription| { println!("Subscribed to topic {}", subscription.topic.uri); subscriptions.lock().unwrap().push(subscription); Ok(()) }) .unwrap(); } async fn unsubscribe( client: &mut Client, subscriptions: &mut Arc>>, args: &[String], ) { if args.len() > 1 { println!("Too many arguments to subscribe. Ignoring"); } else if args.is_empty() { println!("Please specify the topic to subscribe to"); return; } match args[0].parse::() { Ok(i) => { let mut subscriptions = subscriptions.lock().unwrap(); if i >= subscriptions.len() { println!("Invalid subscription index: {}", i); return; } let subscription = subscriptions.remove(i); let topic = subscription.topic.uri.clone(); client .unsubscribe(subscription) .await .and_then(move |()| { println!("Successfully unsubscribed from {}", topic); Ok(()) }) .unwrap(); } Err(_) => { println!("Invalid subscription index: {}", args[0]); } } } fn list(subscriptions: &Arc>>) { let subscriptions = subscriptions.lock().unwrap(); for (index, subscription) in subscriptions.iter().enumerate() { println!("{} {}", index, subscription.topic.uri); } } async fn publish(client: &mut Client, args: &[String]) { if args.is_empty() { println!("Please specify a topic to publish to"); } let uri = &args[0]; let args = args[1..] .iter() .map(|arg| match arg.parse::() { Ok(i) => Value::Integer(i), Err(_) => Value::String(arg.clone()), }) .collect(); client .publish_and_acknowledge(URI::new(uri), Some(args), None) .await .unwrap(); } fn help() { println!("The following commands are supported:"); println!(" sub , ?",); println!(" Subscribes to the topic specified by the uri "); println!(" specifies the type of patten matching used",); println!( " should be one of 'strict' (the default), 'wild' or 'prefix'", ); println!(" pub , *",); println!(" Publishes to the topic specified by uri "); println!(" is an optinal, comma separated list of arguments"); println!(" list"); println!(" Lists all of the current subscriptions, along with their index"); println!(" unsub "); println!(" Unsubscribes from the topic subscription specified by the given index"); println!(" quit"); println!(" Sends a goodbye message and quits the program"); } async fn event_loop(mut client: Client) { let mut subscriptions = Arc::new(Mutex::new(Vec::new())); loop { print_prompt(); let input = get_input_from_user(); let (command, args) = process_input(&input); match command { Command::Sub => subscribe(&mut client, &mut subscriptions, &args).await, Command::Pub => publish(&mut client, &args).await, Command::Unsub => unsubscribe(&mut client, &mut subscriptions, &args).await, Command::List => list(&subscriptions), Command::Help => help(), Command::Quit => break, Command::NoOp => {} Command::Invalid(bad_command) => print!("Invalid command: {}", bad_command), } } client.shutdown().await.unwrap(); } #[tokio::main] async fn main() { env_logger::init(); let connection = Connection::new("ws://127.0.0.1:8090/ws", "wampire_realm"); info!("Connecting"); let client = connection.connect().unwrap(); info!("Connected"); event_loop(client).await }