// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0 OR BSD-3-Clause

#![cfg(feature = "remote_endpoint")]

use std::any::Any;
use std::result;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use event_manager::utilities::subscribers::CounterSubscriber;
use event_manager::{self, EventManager, MutEventSubscriber, SubscriberId};

trait GenericSubscriber: MutEventSubscriber {
    fn as_mut_any(&mut self) -> &mut dyn Any;
}

impl GenericSubscriber for CounterSubscriber {
    fn as_mut_any(&mut self) -> &mut dyn Any {
        self
    }
}

// This test showcases the use of the three `RemoteEndpoint` methods: `call_blocking`, `fire`,
// and `kick`. The setting is we're moving an `EventManager` to a separate thread, and we want
// to register subscribers that remain fully owned by the manager (so, for example, it's not
// necessary to wrap them in a `Mutex` if event handling needs mutable access to their inner
// state). We also use the `GenericSubscriber` trait defined above to show how we can still
// obtain and interact with references to the actual types when event subscribers are registered
// as trait objects.
#[test]
fn test_endpoint() {
    let mut event_manager = EventManager::<Box<dyn GenericSubscriber>>::new().unwrap();
    let endpoint = event_manager.remote_endpoint();

    let sub = Box::<CounterSubscriber>::default();

    let run_count = Arc::new(AtomicU64::new(0));
    let keep_running = Arc::new(AtomicBool::new(true));

    // These get moved into the following closure.
    let run_count_clone = run_count.clone();
    let keep_running_clone = keep_running.clone();

    // We spawn the thread that runs the event loop.
    let thread_handle = thread::spawn(move || {
        loop {
            // The manager gets moved to the new thread here.
            event_manager.run().unwrap();

            // Increment this counter every time the `run` method call above returns. Ordering
            // is not that important here because the other thread only reads the value.
            run_count_clone.fetch_add(1, Ordering::Relaxed);

            // When `keep_running` is no longer `true`, we break the loop and the
            // thread will exit.
            if !keep_running_clone.load(Ordering::Acquire) {
                break;
            }
        }
    });

    // We're back to the main program thread from now on.

    // `run_count` should not change for now as there's no possible activity for the manager.
    thread::sleep(Duration::from_millis(100));
    assert_eq!(run_count.load(Ordering::Relaxed), 0);

    // Use `call_blocking` to register a subscriber (and move it to the event loop thread under
    // the ownership of the manager). `call_blocking` also allows us to inspect the result of the
    // operation, but it blocks waiting for it and cannot be invoked from the same thread as
    // the event loop.
    let sub_id = endpoint
        .call_blocking(
            |sub_ops| -> result::Result<SubscriberId, event_manager::Error> {
                Ok(sub_ops.add_subscriber(sub))
            },
        )
        .unwrap();

    // We've added a subscriber. No subscriber events are fired yet, but the manager run
    // loop went through one iteration when the endpoint message was received, so `run_count`
    // has been incremented.
    thread::sleep(Duration::from_millis(100));
    assert_eq!(run_count.load(Ordering::Relaxed), 1);

    // Now let's activate the subscriber event. It's going to generate continuous activity until
    // we explicitly clear it. We use `endpoint` to interact with the subscriber, because the
    // latter is fully owned by the manager. Also, we make use of the `as_mut_any` method
    // from our `GenericSubscriber` trait to get a reference to the actual subscriber type
    // (which has been erased as a trait object from the manager's perspective).
    // We use `fire` here, so we don't get a result.
    //
    // `fire` can also be used from the same thread as the `event_manager` runs on without causing
    // a deadlock, because it doesn't get a result from the closure. For example, we can pass an
    // endpoint to a subscriber, and use `fire` as part of `process` if it's helpful for that
    // particular use case.
    //
    // Not getting a result from the closure means we have to deal with error conditions within.
    // We use `unwrap` here, but that's ok because if the subscriber associated with `sub_id` is
    // not present, then we have a serious error in our program logic.
    endpoint
        .fire(move |sub_ops| {
            let sub = sub_ops.subscriber_mut(sub_id).unwrap();
            // The following `unwrap` cannot fail because we know the type is `CounterSubscriber`.
            sub.as_mut_any()
                .downcast_mut::<CounterSubscriber>()
                .unwrap()
                .trigger_event()
        })
        .unwrap();

    // The event will start triggering at this point, so `run_count` will increase.
    thread::sleep(Duration::from_millis(100));
    assert!(run_count.load(Ordering::Relaxed) > 1);

    // Let's clear the subscriber event, using `fire` again.
    endpoint
        .fire(move |sub_ops| {
            let sub = sub_ops.subscriber_mut(sub_id).unwrap();
            // The following `unwrap` cannot fail because we know the actual type
            // is `CounterSubscriber`.
            sub.as_mut_any()
                .downcast_mut::<CounterSubscriber>()
                .unwrap()
                .clear_event()
        })
        .unwrap();

    // We wait a bit more. The manager will once more become blocked waiting for events.
    thread::sleep(Duration::from_millis(100));
    keep_running.store(false, Ordering::Release);

    // Trying to `join` the manager here would lead to a deadlock, because it will never read
    // the value of `keep_running` to break the loop while stuck waiting for events. We use the
    // `kick` endpoint method to force `EventManager::run()` to return.
    endpoint.kick().unwrap();

    // We can `join` the manager thread and finalize now.
    thread_handle.join().unwrap();
}