use barter::{ data::live, engine::{trader::Trader, Engine}, event::{Event, EventTx}, execution::{ simulated::{Config as ExecutionConfig, SimulatedExecution}, Fees, }, portfolio::{ allocator::DefaultAllocator, portfolio::MetaPortfolio, repository::in_memory::InMemoryRepository, risk::DefaultRisk, }, statistic::summary::{ trading::{Config as StatisticConfig, TradingSummary}, Initialiser, }, strategy::example::{Config as StrategyConfig, RSIStrategy}, }; use barter_data::event::MarketEvent; use barter_data::streams::consumer::MarketStreamEvent; use barter_data::streams::reconnect::stream::ReconnectingStream; use barter_data::{ event::DataKind, exchange::binance::spot::BinanceSpot, streams::Streams, subscription::trade::PublicTrades, }; use barter_integration::model::{ instrument::{kind::InstrumentKind, Instrument}, Market, }; use futures::Stream; use parking_lot::Mutex; use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::sync::mpsc; use tokio_stream::StreamExt; use tracing::warn; use uuid::Uuid; const ENGINE_RUN_TIMEOUT: Duration = Duration::from_secs(5); #[tokio::main] async fn main() { // Create channel to distribute Commands to the Engine & it's Traders (eg/ Command::Terminate) let (_command_tx, command_rx) = mpsc::channel(20); // Create Event channel to listen to all Engine Events in real-time let (event_tx, event_rx) = mpsc::unbounded_channel(); let event_tx = EventTx::new(event_tx); // Generate unique identifier to associate an Engine's components let engine_id = Uuid::new_v4(); // Create the Market(s) to be traded on (1-to-1 relationship with a Trader) let market = Market::new("binance", ("btc", "usdt", InstrumentKind::Spot)); // Build global shared-state MetaPortfolio (1-to-1 relationship with an Engine) let portfolio = Arc::new(Mutex::new( MetaPortfolio::builder() .engine_id(engine_id) .markets(vec![market.clone()]) .starting_cash(10_000.0) .repository(InMemoryRepository::new()) .allocation_manager(DefaultAllocator { default_order_value: 100.0, }) .risk_manager(DefaultRisk {}) .statistic_config(StatisticConfig { starting_equity: 10_000.0, trading_days_per_year: 365, risk_free_return: 0.0, }) .build_and_init() .expect("failed to build & initialise MetaPortfolio"), )); // Build Trader(s) let mut traders = Vec::new(); // Create channel for each Trader so the Engine can distribute Commands to it let (trader_command_tx, trader_command_rx) = mpsc::channel(10); traders.push( Trader::builder() .engine_id(engine_id) .market(market.clone()) .command_rx(trader_command_rx) .event_tx(event_tx.clone()) .portfolio(Arc::clone(&portfolio)) .data(live::ReconnectingMarketFeed::new( stream_market_event_trades().await, )) .strategy(RSIStrategy::new(StrategyConfig { rsi_period: 14 })) .execution(SimulatedExecution::new(ExecutionConfig { simulated_fees_pct: Fees { exchange: 0.1, slippage: 0.05, network: 0.0, }, })) .build() .expect("failed to build trader"), ); // Build Engine (1-to-many relationship with Traders) // Create HashMap so Engine can route Commands to Traders let trader_command_txs = HashMap::from([(market, trader_command_tx)]); let engine = Engine::builder() .engine_id(engine_id) .command_rx(command_rx) .portfolio(portfolio) .traders(traders) .trader_command_txs(trader_command_txs) .statistics_summary(TradingSummary::init(StatisticConfig { starting_equity: 1000.0, trading_days_per_year: 365, risk_free_return: 0.0, })) .build() .expect("failed to build engine"); // Run Engine trading & listen to Events it produces tokio::spawn(listen_to_engine_events(event_rx)); let _ = tokio::time::timeout(ENGINE_RUN_TIMEOUT, engine.run()).await; } async fn stream_market_event_trades() -> impl Stream> { // Initialise PublicTrades Streams for BinanceSpot // '--> each call to StreamBuilder::subscribe() creates a separate WebSocket connection let streams = Streams::::builder() // Separate WebSocket connection for BTC_USDT stream since it's very high volume .subscribe([( BinanceSpot::default(), "btc", "usdt", InstrumentKind::Spot, PublicTrades, )]) // Separate WebSocket connection for ETH_USDT stream since it's very high volume .subscribe([( BinanceSpot::default(), "eth", "usdt", InstrumentKind::Spot, PublicTrades, )]) .init() .await .unwrap(); // Select and merge every exchange Stream using futures_util::stream::select_all // Note: use `Streams.select(ExchangeId)` to interact with individual exchange streams! streams .select_all() .with_error_handler(|error| warn!(?error, "MarketStream generated error")) .map(|event| event.map(MarketEvent::from)) } // Listen to Events that occur in the Engine. These can be used for updating event-sourcing, // updating dashboard, etc etc. async fn listen_to_engine_events(mut event_rx: mpsc::UnboundedReceiver) { while let Some(event) = event_rx.recv().await { match event { Event::Market(market) => { // Market Event occurred in Engine println!("{market:?}"); } Event::Signal(signal) => { // Signal Event occurred in Engine println!("{signal:?}"); } Event::SignalForceExit(_) => { // SignalForceExit Event occurred in Engine } Event::OrderNew(new_order) => { // OrderNew Event occurred in Engine println!("{new_order:?}"); } Event::OrderUpdate => { // OrderUpdate Event occurred in Engine } Event::Fill(fill_event) => { // Fill Event occurred in Engine println!("{fill_event:?}"); } Event::PositionNew(new_position) => { // PositionNew Event occurred in Engine println!("{new_position:?}"); } Event::PositionUpdate(updated_position) => { // PositionUpdate Event occurred in Engine println!("{updated_position:?}"); } Event::PositionExit(exited_position) => { // PositionExit Event occurred in Engine println!("{exited_position:?}"); } Event::Balance(balance_update) => { // Balance update Event occurred in Engine println!("{balance_update:?}"); } } } }