use std::{ fs::{create_dir_all, remove_dir_all}, path::PathBuf, time::Duration, }; use bevy::{ app::AppExit, input::{ gamepad::{ GamepadAxisChangedEvent, GamepadButtonChangedEvent, GamepadConnection, GamepadConnectionEvent, GamepadEvent, GamepadInfo, }, mouse::{MouseMotion, MouseWheel}, InputSystem, }, prelude::*, render::view::screenshot::ScreenshotManager, utils::HashSet, window::PrimaryWindow, }; use crate::{PlaybackTestingOptions, TestWrangler}; use super::{artefact_paths::ArtefactPaths, StartTime, TestQuitEvent, TestScript, UserInput}; #[derive(Debug, Clone, Copy, Event)] struct StartAsserting; pub(crate) struct PlaybackPlugin { pub(crate) script: TestScript, pub(crate) artefact_path: PathBuf, } impl Plugin for PlaybackPlugin { fn build(&self, app: &mut App) { // This is a bit wonky, as it depends on the order the plugins get added let running_headless = app .world .query::<&PrimaryWindow>() .iter(&app.world) .next() .is_none(); app.insert_resource(self.script.clone()) .add_systems( PreUpdate, (connect_pads, script_player).chain().after(InputSystem), ) .insert_resource(ArtefactPaths { base: self.artefact_path.clone(), running_headless, }) .add_event::() .add_event::() .add_systems( Update, ( create_artefact_dir.run_if(on_event::()), pre_assert_screenshot.run_if(on_event::()), post_assert_screenshot.run_if(on_event::()), run_asserts, delayed_exit, ) .chain(), ); } } // The point of this is to fake that the pads being used by the inputs are connected. fn connect_pads( script: Res, mut events: EventWriter, mut done: Local, ) { if *done { return; } for pad in script .events .iter() .filter_map(|(_, input)| match input { UserInput::ControllerAxisChange(axis, _) => Some(axis.gamepad), UserInput::ControllerButtonPress(button) => Some(button.gamepad), UserInput::ControllerButtonRelease(button) => Some(button.gamepad), _ => None, }) .collect::>() .into_iter() { events.send(GamepadEvent::Connection(GamepadConnectionEvent { gamepad: pad, connection: GamepadConnection::Connected(GamepadInfo { name: "Test Pad".to_string(), // TODO: Save the name in the script if it turns out to be relevant }), })); } *done = true; } #[allow(clippy::too_many_arguments)] fn script_player( mut last_run: Local, time: Res>, mut window_query: Query<&mut Window, With>, script: Res, mut quit_events: EventWriter, mut kb_input: ResMut>, mut mouse_buttons: ResMut>, mut pad_buttons: ResMut>, mut axis: ResMut>, mut mouse_scroll: EventWriter, mut mouse_movements: EventWriter, first_update: Option>, mut gamepad_event_writer: EventWriter, ) { let Some(start_time) = first_update else { return; }; for ev in script .events .iter() .skip_while(|(event_time, _)| *event_time + start_time.0 <= *last_run) .take_while(|(event_time, _)| *event_time + start_time.0 <= time.elapsed()) .map(|(_, input)| input) { match ev { UserInput::KeyPress(key) => kb_input.press(*key), UserInput::KeyRelese(key) => kb_input.release(*key), UserInput::MouseButtonPress(button) => mouse_buttons.press(*button), UserInput::MouseButtonRelease(button) => mouse_buttons.release(*button), UserInput::ControllerButtonPress(button) => { pad_buttons.press(*button); gamepad_event_writer.send(GamepadEvent::Button(GamepadButtonChangedEvent { value: 1.0, button_type: button.button_type, gamepad: button.gamepad, })); } UserInput::ControllerButtonRelease(button) => { pad_buttons.release(*button); gamepad_event_writer.send(GamepadEvent::Button(GamepadButtonChangedEvent { value: 0.0, button_type: button.button_type, gamepad: button.gamepad, })); } UserInput::ControllerAxisChange(key, value) => { axis.set(*key, *value); gamepad_event_writer.send(GamepadEvent::Axis(GamepadAxisChangedEvent { gamepad: key.gamepad, value: *value, axis_type: key.axis_type, })); } UserInput::MouseScroll(scroll) => { mouse_scroll.send(*scroll); } UserInput::MouseMove(delta, position) => { mouse_movements.send(MouseMotion { delta: *delta }); if let Ok(ref mut window) = window_query.get_single_mut() { window.set_cursor_position(*position); } } UserInput::Quit => { quit_events.send(StartAsserting); } } } *last_run = time.elapsed(); } fn create_artefact_dir(path: Res, mut has_ran: Local) { if *has_ran { return; } if path.base.exists() { remove_dir_all(path.base.clone()).unwrap(); } create_dir_all(path.base.clone()).unwrap(); *has_ran = true; } fn pre_assert_screenshot( main_window: Query>, mut screenshot_manager: ResMut, path: Res, mut has_ran: Local, ) { if *has_ran { return; } if let Ok(win) = main_window.get_single() { screenshot_manager .save_screenshot_to_disk(win, path.pre_assert_screenshot()) .unwrap(); } *has_ran = true; } fn post_assert_screenshot( main_window: Query>, mut screenshot_manager: ResMut, path: Res, mut has_ran: Local, ) { if *has_ran { return; } if let Ok(win) = main_window.get_single() { screenshot_manager .save_screenshot_to_disk(win, path.post_assert_screenshot()) .unwrap(); } *has_ran = true; } fn run_asserts( mut start_events: EventReader, mut result_writer: EventWriter, time: Res>, asserter: Res, options: Res, mut started: Local>, ) { if let Some(ref mut start_time) = *started { if asserter.outcome == Some(true) { result_writer.send(TestQuitEvent(true)); *started = None; } else if asserter.outcome == Some(false) || start_time.tick(time.delta()).just_finished() { result_writer.send(TestQuitEvent(false)); *started = None; } } else if start_events.read().next().is_some() { *started = Some(Timer::from_seconds(options.assert_window, TimerMode::Once)); } } #[allow(clippy::too_many_arguments)] fn delayed_exit( mut quit_events: ResMut>, mut custom_quit_events: EventReader, mut result: Local>, artefacts: Res, ) { if let Some(passed) = *result { if artefacts.saved() { if passed { println!("Test passed"); quit_events.send(AppExit); } else { // TODO: figure out a nicer way to fail the test panic!("Test failed"); } } } else if !custom_quit_events.is_empty() { *result = Some(custom_quit_events.read().next().unwrap().0); } }