//! This script is used to investigate issue #541. //! It creates and cancels a lot of transcoding sessions, to crash the Plex server. use async_std::task::sleep; use plex_api::{ device::DeviceConnection, library::{MediaItem, Movie}, media_container::server::library::{AudioCodec, VideoCodec}, transcode::{TranscodeSession, VideoTranscodeOptions}, HttpClientBuilder, MyPlex, MyPlexBuilder, Server, }; use std::{error::Error, time::Duration}; use tracing::{debug, error, info, trace}; struct AppArgs { pub server: Option, pub username: Option, pub password: Option, pub token: Option, pub client_id: Option, pub item_id: Option, pub delay_after_transcoding_start: u8, pub delay_after_transcoding_cancel: u8, } const HELP: &str = "\ Transcoding DoS USAGE: cargo run --example transcoding-dos -- [OPTIONS] FLAGS: -h, --help Prints help information OPTIONS: --server URL Sets the server's url --token STRING Sets the authentication token (preferred over username/password) --username STRING Sets the username for MyPlex --password STRING Sets the password for MyPlex --client-id STRING Sets the client id to use --item-id STRING Sets the item id from the server to use --delay-after-transcoding-start Sets the delay after starting the transcoding, seconds (default 0) --delay-after-transcoding-cancel Sets the delay after cancelling the transcoding, seconds (default 0) "; fn parse_args() -> Result { let mut pargs = pico_args::Arguments::from_env(); // Help has a higher priority and should be handled separately. if pargs.contains(["-h", "--help"]) { print!("{}", HELP); std::process::exit(0); } let args = AppArgs { server: pargs.opt_value_from_str("--server")?, username: pargs.opt_value_from_str("--username")?, password: pargs.opt_value_from_str("--password")?, token: pargs.opt_value_from_str("--token")?, client_id: pargs.opt_value_from_str("--client-id")?, item_id: pargs.opt_value_from_str("--item-id")?, delay_after_transcoding_start: pargs .opt_value_from_str("--delay-after-transcoding-start")? .unwrap_or(0), delay_after_transcoding_cancel: pargs .opt_value_from_str("--delay-after-transcoding-cancel")? .unwrap_or(0), }; // It's up to the caller what to do with the remaining arguments. let remaining = pargs.finish(); if !remaining.is_empty() { panic!("Warning: unused arguments left: {:?}.", remaining); } Ok(args) } async fn find_server(myplex: &MyPlex) -> Result, Box> { for device in myplex.device_manager()?.devices().await? { if !device.is_server() { continue; } info!("Connecting to {}...", device.name()); match device.connect().await { Ok(DeviceConnection::Server(server)) => { info!("Connected via {}", server.client().api_url); return Ok(Some(*server)); } Err(e) => { error!("Error connecting: {e}"); } _ => {} } } Ok(None) } async fn cancel_transcodes(server: &Server) -> Result<(), Box> { for session in server.transcode_sessions().await? { info!("Cancelling {}", session.session_id()); if let Err(e) = session.cancel().await { error!("{e}"); } } Ok(()) } async fn do_transcode(id: &str, server: Server) -> Option { trace!("Starting transcode {id}"); let item: Movie = server.item_by_id(id).await.unwrap().try_into().unwrap(); let media = &item.media()[0]; let part = &media.parts()[0]; trace!("Got item {id}"); let session = part .create_download_session(VideoTranscodeOptions { bitrate: 110, video_codecs: vec![VideoCodec::H264], audio_codecs: vec![AudioCodec::Mp3], ..Default::default() }) .await .unwrap(); debug!("Got session {id} {}", session.session_id()); // Wait until the transcode session has started. let mut count = 0; loop { match session.stats().await { Ok(_) => { info!("Session started {id}"); break; } Err(plex_api::Error::ItemNotFound) => { count += 1; if count > 10 { error!("Transcode failed to start"); return None; } } Err(e) => panic!("Transcode failed: {e}"), } sleep(Duration::from_millis(200)).await; } Some(session) } async fn punish(server: &Server, args: AppArgs) -> Result<(), Box> { let item_id = args.item_id.as_ref().expect("item-id must be provided"); loop { for id in item_id.split(',') { if let Some(session) = do_transcode(id, server.clone()).await { sleep(Duration::from_secs( args.delay_after_transcoding_start as u64, )) .await; info!("Cancelling {}", session.session_id()); session.cancel().await.unwrap(); } sleep(Duration::from_secs( args.delay_after_transcoding_cancel as u64, )) .await; } } } #[async_std::main] async fn main() -> Result<(), Box> { tracing_subscriber::fmt::init(); let args = parse_args()?; let mut client_builder = HttpClientBuilder::generic(); if let Some(token) = &args.token { client_builder = client_builder.set_x_plex_token(token.to_owned()); } if let Some(client_id) = &args.client_id { client_builder = client_builder.set_x_plex_client_identifier(client_id); } let client = client_builder.build()?; let server = if let Some(server) = &args.server { Server::new(server, client).await? } else if let (Some(username), Some(password)) = (&args.username, &args.password) { let myplex = MyPlexBuilder::default() .set_client(client) .set_username_and_password(username, password.to_owned()) .build() .await?; debug!("Token: {}", myplex.client().x_plex_token()); find_server(&myplex) .await? .expect("unable to find a server") } else if client.is_authenticated() { let myplex = MyPlexBuilder::default().set_client(client).build().await?; find_server(&myplex) .await? .expect("unable to find a server") } else { panic!("Must provide a server URL or plex username and password."); }; cancel_transcodes(&server).await?; punish(&server, args).await?; Ok(()) }