//! SPDX-License-Identifier: MIT //! //! Copyright (c) 2023, eunomia-bpf //! All rights reserved. //! //! Here tests whether the apis of server & client can work peoperly use std::{convert::Infallible, net::SocketAddr, str::FromStr, time::Duration}; use ecli_lib::{ config::ProgramType, runner::{ client::{http::EcliHttpClient, AbstractClient, ProgramStatus}, server_http::{EcliHttpServerAPI, HttpServerState}, LogType, }, }; use hyper::{ server::conn::AddrStream, service::{make_service_fn, service_fn, Service}, Body, Request, Server, }; use std::path::PathBuf; use tokio::sync::mpsc::Sender; fn get_local_dir() -> PathBuf { PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("tests") } async fn prepare_server_and_client(port: u16) -> (EcliHttpClient, Sender<()>) { let app_state = HttpServerState::default(); let addr = SocketAddr::from_str(&format!("127.0.0.1:{}", port)).unwrap(); let server = Server::bind(&addr); let server = server.serve(make_service_fn(move |_socket: &AddrStream| { let app_state = app_state.clone(); let mut service = ecli_server_codegen::Service::new(EcliHttpServerAPI {}); async move { Ok::<_, Infallible>(service_fn(move |req: Request
| { let app_state = app_state.clone(); service.call((req, app_state)) })) } })); let (stop_tx, mut stop_rx) = tokio::sync::mpsc::channel::<()>(1); tokio::spawn(async move { let server = server.with_graceful_shutdown(async move { stop_rx.recv().await; }); server.await.unwrap(); }); let client = EcliHttpClient::new(format!("http://127.0.0.1:{}", port)).unwrap(); (client, stop_tx) } #[tokio::test(flavor = "multi_thread")] async fn test_load_wasm_prog() { let (client, stop_tx) = prepare_server_and_client(8562).await; // Let's load a wasm program let wasm_handle = { let buf = std::fs::read(get_local_dir().join("bootstrap.wasm")).unwrap(); client .start_program( Some("my-wasm-program".to_string()), &buf, ProgramType::WasmModule, false, &[], None, ) .await .unwrap() }; let progs = client.get_program_list().await.unwrap(); assert_eq!(progs.len(), 1); assert_eq!(progs[0].id, wasm_handle); assert_eq!(progs[0].name, "my-wasm-program"); assert_eq!(progs[0].status, ProgramStatus::Running); tokio::time::sleep(Duration::from_secs(5)).await; // Let's see the logs let mut out_buf = String::default(); let mut cursor = None; for _ in 0..5 { let logs = client.fetch_logs(wasm_handle, cursor, None).await.unwrap(); for (cursor1, log) in logs { cursor = Some(cursor1 + 1); assert!(!matches!(log.log_type, LogType::Plain)); out_buf.push_str(log.log.as_str()); } tokio::time::sleep(Duration::from_millis(50)).await; } println!("{}", out_buf); stop_tx.send(()).await.unwrap(); } #[tokio::test(flavor = "multi_thread")] async fn test_load_json_prog() { let (client, stop_tx) = prepare_server_and_client(8563).await; let json_handle = { // Let's load a json program, but let it prints plain text let buf = std::fs::read(get_local_dir().join("bootstrap.json")).unwrap(); client .start_program( Some("my-json-program".to_string()), &buf, ProgramType::JsonEunomia, false, &[], None, ) .await .unwrap() }; let progs = client.get_program_list().await.unwrap(); assert_eq!(progs.len(), 1); assert_eq!(progs[0].id, json_handle); assert_eq!(progs[0].name, "my-json-program"); assert_eq!(progs[0].status, ProgramStatus::Running); tokio::time::sleep(Duration::from_secs(5)).await; // Let's see the logs let mut out_buf = String::default(); let mut cursor = None; for _ in 0..5 { let logs = client.fetch_logs(json_handle, cursor, None).await.unwrap(); for (cursor1, log) in logs { cursor = Some(cursor1 + 1); assert!(matches!(log.log_type, LogType::Plain)); out_buf.push_str(log.log.as_str()); } tokio::time::sleep(Duration::from_millis(50)).await; } println!("{}", out_buf); stop_tx.send(()).await.unwrap(); } #[tokio::test(flavor = "multi_thread")] async fn test_load_json_prog_and_out_json() { let (client, stop_tx) = prepare_server_and_client(8564).await; let json_handle = { // Let's load a json program, but let it prints json let buf = std::fs::read(get_local_dir().join("bootstrap.json")).unwrap(); client .start_program( Some("my-json-program".to_string()), &buf, ProgramType::JsonEunomia, true, &[], None, ) .await .unwrap() }; let progs = client.get_program_list().await.unwrap(); assert_eq!(progs.len(), 1); assert_eq!(progs[0].id, json_handle); assert_eq!(progs[0].name, "my-json-program"); assert_eq!(progs[0].status, ProgramStatus::Running); tokio::time::sleep(Duration::from_secs(5)).await; // Let's see the logs let mut cursor = None; for _ in 0..5 { let logs = client.fetch_logs(json_handle, cursor, None).await.unwrap(); for (cursor1, log) in logs { cursor = Some(cursor1 + 1); assert!(matches!(log.log_type, LogType::Plain)); serde_json::from_str::