// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. use std::sync::Arc; use crate::args::Flags; use crate::args::JupyterFlags; use crate::cdp; use crate::lsp::ReplCompletionItem; use crate::ops; use crate::tools::repl; use crate::tools::test::create_single_test_event_channel; use crate::tools::test::reporters::PrettyTestReporter; use crate::tools::test::TestEventWorkerSender; use crate::tools::test::TestFailureFormatOptions; use crate::CliFactory; use deno_core::anyhow::bail; use deno_core::anyhow::Context; use deno_core::error::generic_error; use deno_core::error::AnyError; use deno_core::futures::FutureExt; use deno_core::located_script_name; use deno_core::resolve_url_or_path; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::url::Url; use deno_runtime::deno_io::Stdio; use deno_runtime::deno_io::StdioPipe; use deno_runtime::deno_permissions::PermissionsContainer; use deno_runtime::WorkerExecutionMode; use deno_terminal::colors; use jupyter_runtime::jupyter::ConnectionInfo; use jupyter_runtime::messaging::StreamContent; use tokio::sync::mpsc; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot; mod install; pub mod server; pub async fn kernel( flags: Arc<Flags>, jupyter_flags: JupyterFlags, ) -> Result<(), AnyError> { log::info!( "{} \"deno jupyter\" is unstable and might change in the future.", colors::yellow("Warning"), ); if !jupyter_flags.install && !jupyter_flags.kernel { install::status()?; return Ok(()); } if jupyter_flags.install { install::install()?; return Ok(()); } let connection_filepath = jupyter_flags.conn_file.unwrap(); let factory = CliFactory::from_flags(flags); let cli_options = factory.cli_options()?; let main_module = resolve_url_or_path("./$deno$jupyter.mts", cli_options.initial_cwd()) .unwrap(); // TODO(bartlomieju): should we run with all permissions? let permissions = PermissionsContainer::allow_all(factory.permission_desc_parser()?.clone()); let npm_resolver = factory.npm_resolver().await?.clone(); let resolver = factory.resolver().await?.clone(); let worker_factory = factory.create_cli_main_worker_factory(None).await?; let (stdio_tx, stdio_rx) = mpsc::unbounded_channel(); let conn_file = std::fs::read_to_string(&connection_filepath).with_context(|| { format!("Couldn't read connection file: {:?}", connection_filepath) })?; let spec: ConnectionInfo = serde_json::from_str(&conn_file).with_context(|| { format!( "Connection file is not a valid JSON: {:?}", connection_filepath ) })?; let (worker, test_event_receiver) = create_single_test_event_channel(); let TestEventWorkerSender { sender: test_event_sender, stdout, stderr, } = worker; let mut worker = worker_factory .create_custom_worker( WorkerExecutionMode::Jupyter, main_module.clone(), permissions, vec![ ops::jupyter::deno_jupyter::init_ops(stdio_tx.clone()), ops::testing::deno_test::init_ops(test_event_sender), ], // FIXME(nayeemrmn): Test output capturing currently doesn't work. Stdio { stdin: StdioPipe::inherit(), stdout: StdioPipe::file(stdout), stderr: StdioPipe::file(stderr), }, ) .await?; worker.setup_repl().await?; worker.execute_script_static( located_script_name!(), "Deno[Deno.internal].enableJupyter();", )?; let worker = worker.into_main_worker(); let mut repl_session = repl::ReplSession::initialize( cli_options, npm_resolver, resolver, worker, main_module, test_event_receiver, ) .await?; struct TestWriter(UnboundedSender<StreamContent>); impl std::io::Write for TestWriter { fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { self .0 .send(StreamContent::stdout(&String::from_utf8_lossy(buf))) .ok(); Ok(buf.len()) } fn flush(&mut self) -> std::io::Result<()> { Ok(()) } } let cwd_url = Url::from_directory_path(cli_options.initial_cwd()).map_err(|_| { generic_error(format!( "Unable to construct URL from the path of cwd: {}", cli_options.initial_cwd().to_string_lossy(), )) })?; repl_session.set_test_reporter_factory(Box::new(move || { Box::new( PrettyTestReporter::new( false, true, false, true, cwd_url.clone(), TestFailureFormatOptions::default(), ) .with_writer(Box::new(TestWriter(stdio_tx.clone()))), ) })); let (tx1, rx1) = mpsc::unbounded_channel(); let (tx2, rx2) = mpsc::unbounded_channel(); let (startup_data_tx, startup_data_rx) = oneshot::channel::<server::StartupData>(); let mut repl_session_proxy = JupyterReplSession { repl_session, rx: rx1, tx: tx2, }; let repl_session_proxy_channels = JupyterReplProxy { tx: tx1, rx: rx2 }; let join_handle = std::thread::spawn(move || { let fut = server::JupyterServer::start( spec, stdio_rx, repl_session_proxy_channels, startup_data_tx, ) .boxed_local(); deno_runtime::tokio_util::create_and_run_current_thread(fut) }); let Ok(startup_data) = startup_data_rx.await else { bail!("Failed to acquire startup data"); }; { let op_state_rc = repl_session_proxy.repl_session.worker.js_runtime.op_state(); let mut op_state = op_state_rc.borrow_mut(); op_state.put(startup_data.iopub_connection.clone()); op_state.put(startup_data.last_execution_request.clone()); op_state.put(startup_data.stdin_connection_proxy.clone()); } repl_session_proxy.start().await; let server_result = join_handle.join(); match server_result { Ok(result) => { result?; } Err(e) => { bail!("Jupyter kernel error: {:?}", e); } }; Ok(()) } pub enum JupyterReplRequest { LspCompletions { line_text: String, position: usize, }, JsGetProperties { object_id: String, }, JsEvaluate { expr: String, }, JsGlobalLexicalScopeNames, JsEvaluateLineWithObjectWrapping { line: String, }, JsCallFunctionOnArgs { function_declaration: String, args: Vec<cdp::RemoteObject>, }, JsCallFunctionOn { arg0: cdp::CallArgument, arg1: cdp::CallArgument, }, } pub enum JupyterReplResponse { LspCompletions(Vec<ReplCompletionItem>), JsGetProperties(Option<cdp::GetPropertiesResponse>), JsEvaluate(Option<cdp::EvaluateResponse>), JsGlobalLexicalScopeNames(cdp::GlobalLexicalScopeNamesResponse), JsEvaluateLineWithObjectWrapping(Result<repl::TsEvaluateResponse, AnyError>), JsCallFunctionOnArgs(Result<cdp::CallFunctionOnResponse, AnyError>), JsCallFunctionOn(Option<cdp::CallFunctionOnResponse>), } pub struct JupyterReplProxy { tx: mpsc::UnboundedSender<JupyterReplRequest>, rx: mpsc::UnboundedReceiver<JupyterReplResponse>, } impl JupyterReplProxy { pub async fn lsp_completions( &mut self, line_text: String, position: usize, ) -> Vec<ReplCompletionItem> { let _ = self.tx.send(JupyterReplRequest::LspCompletions { line_text, position, }); let Some(JupyterReplResponse::LspCompletions(resp)) = self.rx.recv().await else { unreachable!() }; resp } pub async fn get_properties( &mut self, object_id: String, ) -> Option<cdp::GetPropertiesResponse> { let _ = self .tx .send(JupyterReplRequest::JsGetProperties { object_id }); let Some(JupyterReplResponse::JsGetProperties(resp)) = self.rx.recv().await else { unreachable!() }; resp } pub async fn evaluate( &mut self, expr: String, ) -> Option<cdp::EvaluateResponse> { let _ = self.tx.send(JupyterReplRequest::JsEvaluate { expr }); let Some(JupyterReplResponse::JsEvaluate(resp)) = self.rx.recv().await else { unreachable!() }; resp } pub async fn global_lexical_scope_names( &mut self, ) -> cdp::GlobalLexicalScopeNamesResponse { let _ = self.tx.send(JupyterReplRequest::JsGlobalLexicalScopeNames); let Some(JupyterReplResponse::JsGlobalLexicalScopeNames(resp)) = self.rx.recv().await else { unreachable!() }; resp } pub async fn evaluate_line_with_object_wrapping( &mut self, line: String, ) -> Result<repl::TsEvaluateResponse, AnyError> { let _ = self .tx .send(JupyterReplRequest::JsEvaluateLineWithObjectWrapping { line }); let Some(JupyterReplResponse::JsEvaluateLineWithObjectWrapping(resp)) = self.rx.recv().await else { unreachable!() }; resp } pub async fn call_function_on_args( &mut self, function_declaration: String, args: Vec<cdp::RemoteObject>, ) -> Result<cdp::CallFunctionOnResponse, AnyError> { let _ = self.tx.send(JupyterReplRequest::JsCallFunctionOnArgs { function_declaration, args, }); let Some(JupyterReplResponse::JsCallFunctionOnArgs(resp)) = self.rx.recv().await else { unreachable!() }; resp } // TODO(bartlomieju): rename to "broadcast_result"? pub async fn call_function_on( &mut self, arg0: cdp::CallArgument, arg1: cdp::CallArgument, ) -> Option<cdp::CallFunctionOnResponse> { let _ = self .tx .send(JupyterReplRequest::JsCallFunctionOn { arg0, arg1 }); let Some(JupyterReplResponse::JsCallFunctionOn(resp)) = self.rx.recv().await else { unreachable!() }; resp } } pub struct JupyterReplSession { repl_session: repl::ReplSession, rx: mpsc::UnboundedReceiver<JupyterReplRequest>, tx: mpsc::UnboundedSender<JupyterReplResponse>, } impl JupyterReplSession { pub async fn start(&mut self) { let mut poll_worker = true; loop { tokio::select! { biased; maybe_message = self.rx.recv() => { let Some(msg) = maybe_message else { break; }; if self.handle_message(msg).await.is_err() { break; } poll_worker = true; }, _ = self.repl_session.run_event_loop(), if poll_worker => { poll_worker = false; } } } } async fn handle_message( &mut self, msg: JupyterReplRequest, ) -> Result<(), AnyError> { let resp = match msg { JupyterReplRequest::LspCompletions { line_text, position, } => JupyterReplResponse::LspCompletions( self.lsp_completions(&line_text, position).await, ), JupyterReplRequest::JsGetProperties { object_id } => { JupyterReplResponse::JsGetProperties( self.get_properties(object_id).await, ) } JupyterReplRequest::JsEvaluate { expr } => { JupyterReplResponse::JsEvaluate(self.evaluate(expr).await) } JupyterReplRequest::JsGlobalLexicalScopeNames => { JupyterReplResponse::JsGlobalLexicalScopeNames( self.global_lexical_scope_names().await, ) } JupyterReplRequest::JsEvaluateLineWithObjectWrapping { line } => { JupyterReplResponse::JsEvaluateLineWithObjectWrapping( self.evaluate_line_with_object_wrapping(&line).await, ) } JupyterReplRequest::JsCallFunctionOnArgs { function_declaration, args, } => JupyterReplResponse::JsCallFunctionOnArgs( self .call_function_on_args(function_declaration, &args) .await, ), JupyterReplRequest::JsCallFunctionOn { arg0, arg1 } => { JupyterReplResponse::JsCallFunctionOn( self.call_function_on(arg0, arg1).await, ) } }; self.tx.send(resp).map_err(|e| e.into()) } pub async fn lsp_completions( &mut self, line_text: &str, position: usize, ) -> Vec<ReplCompletionItem> { self .repl_session .language_server .completions(line_text, position) .await } pub async fn get_properties( &mut self, object_id: String, ) -> Option<cdp::GetPropertiesResponse> { let get_properties_response = self .repl_session .post_message_with_event_loop( "Runtime.getProperties", Some(cdp::GetPropertiesArgs { object_id, own_properties: None, accessor_properties_only: None, generate_preview: None, non_indexed_properties_only: Some(true), }), ) .await .ok()?; serde_json::from_value(get_properties_response).ok() } pub async fn evaluate( &mut self, expr: String, ) -> Option<cdp::EvaluateResponse> { let evaluate_response: serde_json::Value = self .repl_session .post_message_with_event_loop( "Runtime.evaluate", Some(cdp::EvaluateArgs { expression: expr, object_group: None, include_command_line_api: None, silent: None, context_id: Some(self.repl_session.context_id), return_by_value: None, generate_preview: None, user_gesture: None, await_promise: None, throw_on_side_effect: Some(true), timeout: Some(200), disable_breaks: None, repl_mode: None, allow_unsafe_eval_blocked_by_csp: None, unique_context_id: None, }), ) .await .ok()?; serde_json::from_value(evaluate_response).ok() } pub async fn global_lexical_scope_names( &mut self, ) -> cdp::GlobalLexicalScopeNamesResponse { let evaluate_response = self .repl_session .post_message_with_event_loop( "Runtime.globalLexicalScopeNames", Some(cdp::GlobalLexicalScopeNamesArgs { execution_context_id: Some(self.repl_session.context_id), }), ) .await .unwrap(); serde_json::from_value(evaluate_response).unwrap() } pub async fn evaluate_line_with_object_wrapping( &mut self, line: &str, ) -> Result<repl::TsEvaluateResponse, AnyError> { self .repl_session .evaluate_line_with_object_wrapping(line) .await } pub async fn call_function_on_args( &mut self, function_declaration: String, args: &[cdp::RemoteObject], ) -> Result<cdp::CallFunctionOnResponse, AnyError> { self .repl_session .call_function_on_args(function_declaration, args) .await } // TODO(bartlomieju): rename to "broadcast_result"? pub async fn call_function_on( &mut self, arg0: cdp::CallArgument, arg1: cdp::CallArgument, ) -> Option<cdp::CallFunctionOnResponse> { let response = self.repl_session .post_message_with_event_loop( "Runtime.callFunctionOn", Some(json!({ "functionDeclaration": r#"async function (execution_count, result) { await Deno[Deno.internal].jupyter.broadcastResult(execution_count, result); }"#, "arguments": [arg0, arg1], "executionContextId": self.repl_session.context_id, "awaitPromise": true, })), ) .await.ok()?; serde_json::from_value(response).ok() } }