use crate::task::JPollResult; use ::jni::{ errors::{Error, Result}, objects::{GlobalRef, JMethodID, JObject, JClass}, signature::JavaType, JNIEnv, JavaVM, }; use static_assertions::assert_impl_all; use std::{ convert::TryFrom, future::Future, pin::Pin, task::{Context, Poll}, }; /// Wrapper for [`JObject`]s that implement /// `io.github.gedgygedgy.rust.future.Future`. Implements /// [`Future`](std::future::Future) to allow asynchronous Rust code to wait for /// a result from Java code. /// /// Looks up the class and method IDs on creation rather than for every method /// call. /// /// For a [`Send`] version of this, use [`JSendFuture`]. pub struct JFuture<'a: 'b, 'b> { internal: JObject<'a>, poll: JMethodID<'a>, env: &'b JNIEnv<'a>, } impl<'a: 'b, 'b> JFuture<'a, 'b> { /// Create a [`JFuture`] from the environment and an object. This looks up /// the necessary class and method IDs to call all of the methods on it so /// that extra work doesn't need to be done on every method call. /// /// # Arguments /// /// * `env` - Java environment to use. /// * `obj` - Object to wrap. pub fn from_env(env: &'b JNIEnv<'a>, obj: JObject<'a>) -> Result { let poll = env.get_method_id( JClass::from(crate::classcache::get_class("io/github/gedgygedgy/rust/future/Future").unwrap().as_obj()), "poll", "(Lio/github/gedgygedgy/rust/task/Waker;)Lio/github/gedgygedgy/rust/task/PollResult;", )?; Ok(Self { internal: obj, poll, env, }) } /// Get the `io.github.gedgygedgy.rust.task.PollResult` from this future. /// Returns `null` if the future is not ready yet. /// /// # Arguments /// /// * `waker` - Waker object to wake later on if the result is not ready. pub fn poll(&self, waker: JObject<'a>) -> Result> { let result = self .env .call_method_unchecked( self.internal, self.poll, JavaType::Object("io/github/gedgygedgy/rust/task/PollResult".into()), &[waker.into()], )? .l()?; JPollResult::from_env(self.env, result) } /// Turn the [`JFuture`] into a [`Future`] that can be `await`ed on. pub fn into_future(self) -> JFutureIntoFuture<'a, 'b> { JFutureIntoFuture(self) } } impl<'a: 'b, 'b> ::std::ops::Deref for JFuture<'a, 'b> { type Target = JObject<'a>; fn deref(&self) -> &Self::Target { &self.internal } } impl<'a: 'b, 'b> From> for JObject<'a> { fn from(other: JFuture<'a, 'b>) -> JObject<'a> { other.internal } } /// Result of calling [`JFuture::into_future`]. This object can be `await`ed /// to get a [`JPollResult`]. pub struct JFutureIntoFuture<'a: 'b, 'b>(JFuture<'a, 'b>); impl<'a: 'b, 'b> JFutureIntoFuture<'a, 'b> { // Switch the Result and Poll return value to make this easier to implement using ?. fn poll_internal(&self, context: &mut Context<'_>) -> Result>> { use crate::task::waker; let result = self.0.poll(waker(self.0.env, context.waker().clone())?)?; Ok( if self.0.env.is_same_object(result.clone(), JObject::null())? { Poll::Pending } else { Poll::Ready(result) }, ) } } impl<'a: 'b, 'b> Future for JFutureIntoFuture<'a, 'b> { type Output = Result>; fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll { match self.poll_internal(context) { Ok(Poll::Ready(result)) => Poll::Ready(Ok(result)), Ok(Poll::Pending) => Poll::Pending, Err(err) => Poll::Ready(Err(err)), } } } impl<'a: 'b, 'b> From> for JFuture<'a, 'b> { fn from(fut: JFutureIntoFuture<'a, 'b>) -> Self { fut.0 } } impl<'a: 'b, 'b> std::ops::Deref for JFutureIntoFuture<'a, 'b> { type Target = JFuture<'a, 'b>; fn deref(&self) -> &Self::Target { &self.0 } } /// [`Send`] version of [`JFuture`]. Instead of storing a [`JNIEnv`], it stores /// a [`JavaVM`] and calls [`JavaVM::get_env`] when [`Future::poll`] is called. pub struct JSendFuture { internal: GlobalRef, vm: JavaVM, } impl<'a: 'b, 'b> TryFrom> for JSendFuture { type Error = Error; fn try_from(future: JFuture<'a, 'b>) -> Result { Ok(Self { internal: future.env.new_global_ref(future.internal)?, vm: future.env.get_java_vm()?, }) } } impl ::std::ops::Deref for JSendFuture { type Target = GlobalRef; fn deref(&self) -> &Self::Target { &self.internal } } impl JSendFuture { fn poll_internal(&self, context: &mut Context<'_>) -> Result>> { let env = self.vm.get_env()?; let jfuture = JFuture::from_env(&env, self.internal.as_obj())?.into_future(); jfuture .poll_internal(context) .map(|result| result.map(|result| Ok(env.new_global_ref(result)?))) } } impl Future for JSendFuture { type Output = Result; fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll { match self.poll_internal(context) { Ok(result) => result, Err(err) => Poll::Ready(Err(err)), } } } assert_impl_all!(JSendFuture: Send); #[cfg(test)] mod test { use super::{JFuture, JSendFuture}; use crate::{task::JPollResult, test_utils}; use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; #[test] fn test_jfuture() { use std::sync::Arc; test_utils::JVM_ENV.with(|env| { let data = Arc::new(test_utils::TestWakerData::new()); assert_eq!(Arc::strong_count(&data), 1); assert_eq!(data.value(), false); let waker = test_utils::test_waker(&data); assert_eq!(Arc::strong_count(&data), 2); assert_eq!(data.value(), false); let future_obj = env .new_object("io/github/gedgygedgy/rust/future/SimpleFuture", "()V", &[]) .unwrap(); let mut future = JFuture::from_env(env, future_obj).unwrap().into_future(); assert!( Future::poll(Pin::new(&mut future), &mut Context::from_waker(&waker)).is_pending() ); assert_eq!(Arc::strong_count(&data), 3); assert_eq!(data.value(), false); assert!( Future::poll(Pin::new(&mut future), &mut Context::from_waker(&waker)).is_pending() ); assert_eq!(Arc::strong_count(&data), 3); assert_eq!(data.value(), false); let obj = env.new_object("java/lang/Object", "()V", &[]).unwrap(); env.call_method(future_obj, "wake", "(Ljava/lang/Object;)V", &[obj.into()]) .unwrap(); assert_eq!(Arc::strong_count(&data), 2); assert_eq!(data.value(), true); let poll = Future::poll(Pin::new(&mut future), &mut Context::from_waker(&waker)); if let Poll::Ready(result) = poll { assert!(env .is_same_object(result.unwrap().get().unwrap(), obj) .unwrap()); } else { panic!("Poll result should be ready"); } assert_eq!(Arc::strong_count(&data), 2); assert_eq!(data.value(), true); let poll = Future::poll(Pin::new(&mut future), &mut Context::from_waker(&waker)); if let Poll::Ready(result) = poll { assert!(env .is_same_object(result.unwrap().get().unwrap(), obj) .unwrap()); } else { panic!("Poll result should be ready"); } assert_eq!(Arc::strong_count(&data), 2); assert_eq!(data.value(), true); }); } #[test] fn test_jfuture_await() { use futures::{executor::block_on, join}; test_utils::JVM_ENV.with(|env| { let future_obj = env .new_object("io/github/gedgygedgy/rust/future/SimpleFuture", "()V", &[]) .unwrap(); let future = JFuture::from_env(env, future_obj).unwrap(); let obj = env.new_object("java/lang/Object", "()V", &[]).unwrap(); block_on(async { join!( async { env.call_method(future_obj, "wake", "(Ljava/lang/Object;)V", &[obj.into()]) .unwrap(); }, async { assert!(env .is_same_object(future.into_future().await.unwrap().get().unwrap(), obj) .unwrap()); } ); }); }); } #[test] fn test_jfuture_await_throw() { use futures::{executor::block_on, join}; test_utils::JVM_ENV.with(|env| { let future_obj = env .new_object("io/github/gedgygedgy/rust/future/SimpleFuture", "()V", &[]) .unwrap(); let future = JFuture::from_env(env, future_obj).unwrap(); let ex = env.new_object("java/lang/Exception", "()V", &[]).unwrap(); block_on(async { join!( async { env.call_method( future_obj, "wakeWithThrowable", "(Ljava/lang/Throwable;)V", &[ex.into()], ) .unwrap(); }, async { future.into_future().await.unwrap().get().unwrap_err(); let future_ex = env.exception_occurred().unwrap(); env.exception_clear().unwrap(); let actual_ex = env .call_method(future_ex, "getCause", "()Ljava/lang/Throwable;", &[]) .unwrap() .l() .unwrap(); assert!(env.is_same_object(actual_ex, ex).unwrap()); } ); }); }); } #[test] fn test_jsendfuture_await() { use futures::{executor::block_on, join}; use std::convert::TryInto; test_utils::JVM_ENV.with(|env| { let future_obj = env .new_object("io/github/gedgygedgy/rust/future/SimpleFuture", "()V", &[]) .unwrap(); let future = JFuture::from_env(env, future_obj).unwrap(); let future: JSendFuture = future.try_into().unwrap(); let obj = env.new_object("java/lang/Object", "()V", &[]).unwrap(); block_on(async { join!( async { env.call_method(future_obj, "wake", "(Ljava/lang/Object;)V", &[obj.into()]) .unwrap(); }, async { let global_ref = future.await.unwrap(); let jpoll = JPollResult::from_env(env, global_ref.as_obj()).unwrap(); assert!(env.is_same_object(jpoll.get().unwrap(), obj).unwrap()); } ); }); }); } }