use barter::{ data::historical, engine::{trader::Trader, Engine}, event::EventTx, execution::{ simulated::{Config as ExecutionConfig, SimulatedExecution}, Fees, }, portfolio::{ allocator::DefaultAllocator, portfolio::MetaPortfolio, repository::in_memory::InMemoryRepository, risk::DefaultRisk, }, statistic::summary::{ trading::{Config as StatisticConfig, TradingSummary}, Initialiser, }, strategy::example::{Config as StrategyConfig, RSIStrategy}, test_util::market_event_trade, }; use barter_integration::model::{instrument::kind::InstrumentKind, Market, Side}; use parking_lot::Mutex; use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::sync::mpsc; use uuid::Uuid; #[tokio::test] async fn engine_with_historic_data_stops_after_candles_finished() { // Create channel to distribute Commands to the Engine & it's Traders (eg/ Command::Terminate) let (_command_tx, command_rx) = mpsc::channel(20); // Create Event channel to listen to all Engine Events in real-time let (event_tx, _event_rx) = mpsc::unbounded_channel(); let event_tx = EventTx::new(event_tx); // Generate unique identifier to associate an Engine's components let engine_id = Uuid::new_v4(); // Create the Market(s) to be traded on (1-to-1 relationship with a Trader) let market = Market::new("binance", ("btc", "usdt", InstrumentKind::Spot)); // Build global shared-state MetaPortfolio (1-to-1 relationship with an Engine) let portfolio = Arc::new(Mutex::new( MetaPortfolio::builder() .engine_id(engine_id) .markets(vec![market.clone()]) .starting_cash(10_000.0) .repository(InMemoryRepository::new()) .allocation_manager(DefaultAllocator { default_order_value: 100.0, }) .risk_manager(DefaultRisk {}) .statistic_config(StatisticConfig { starting_equity: 10_000.0, trading_days_per_year: 365, risk_free_return: 0.0, }) .build_and_init() .expect("failed to build & initialise MetaPortfolio"), )); // Build Trader(s) let mut traders = Vec::new(); // Create channel for each Trader so the Engine can distribute Commands to it let (trader_command_tx, trader_command_rx) = mpsc::channel(10); traders.push( Trader::builder() .engine_id(engine_id) .market(market.clone()) .command_rx(trader_command_rx) .event_tx(event_tx.clone()) .portfolio(Arc::clone(&portfolio)) .data(historical::MarketFeed::new( [market_event_trade(Side::Buy)].into_iter(), )) .strategy(RSIStrategy::new(StrategyConfig { rsi_period: 14 })) .execution(SimulatedExecution::new(ExecutionConfig { simulated_fees_pct: Fees { exchange: 0.1, slippage: 0.05, network: 0.0, }, })) .build() .expect("failed to build trader"), ); // Build Engine (1-to-many relationship with Traders) // Create HashMap so Engine can route Commands to Traders let trader_command_txs = HashMap::from_iter([(market, trader_command_tx)]); let engine = Engine::builder() .engine_id(engine_id) .command_rx(command_rx) .portfolio(portfolio) .traders(traders) .trader_command_txs(trader_command_txs) .statistics_summary(TradingSummary::init(StatisticConfig { starting_equity: 1000.0, trading_days_per_year: 365, risk_free_return: 0.0, })) .build() .expect("failed to build engine"); // Run Engine trading with timeout: // If timeout before engine stops, Engine command_rx.await is incorrectly blocking the // Engine from stopping even though the Traders have no more historical data to process let timeout = Duration::from_millis(10); let engine_run_future = engine.run(); let actual = tokio::time::timeout(timeout, engine_run_future).await; assert!( actual.is_ok(), "failed because Engine's command_rx.await is blocking the Engine from stopping" ) }