use futures::TryStreamExt; use rand::{Rng, SeedableRng}; use rand_xoshiro::Xoshiro256PlusPlus; use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; use sqlx::{ query, sqlite::Sqlite, sqlite::SqliteRow, Column, ConnectOptions, Connection, Executor, Row, SqliteConnection, SqlitePool, Statement, TypeInfo, }; use sqlx_test::new; use std::sync::Arc; #[sqlx_macros::test] async fn it_connects() -> anyhow::Result<()> { Ok(new::().await?.ping().await?) } #[sqlx_macros::test] async fn it_fetches_and_inflates_row() -> anyhow::Result<()> { let mut conn = new::().await?; // process rows, one-at-a-time // this reuses the memory of the row { let expected = [15, 39, 51]; let mut i = 0; let mut s = conn.fetch("SELECT 15 UNION SELECT 51 UNION SELECT 39"); while let Some(row) = s.try_next().await? { let v1 = row.get::(0); assert_eq!(expected[i], v1); i += 1; } } // same query, but fetch all rows at once // this triggers the internal inflation let rows = conn .fetch_all("SELECT 15 UNION SELECT 51 UNION SELECT 39") .await?; assert_eq!(rows.len(), 3); assert_eq!(rows[0].get::(0), 15); assert_eq!(rows[1].get::(0), 39); assert_eq!(rows[2].get::(0), 51); // same query but fetch the first row a few times from a non-persistent query // these rows should be immediately inflated let row1 = conn .fetch_one("SELECT 15 UNION SELECT 51 UNION SELECT 39") .await?; assert_eq!(row1.get::(0), 15); let row2 = conn .fetch_one("SELECT 15 UNION SELECT 51 UNION SELECT 39") .await?; assert_eq!(row1.get::(0), 15); assert_eq!(row2.get::(0), 15); // same query (again) but make it persistent // and fetch the first row a few times let row1 = conn .fetch_one(query("SELECT 15 UNION SELECT 51 UNION SELECT 39")) .await?; assert_eq!(row1.get::(0), 15); let row2 = conn .fetch_one(query("SELECT 15 UNION SELECT 51 UNION SELECT 39")) .await?; assert_eq!(row1.get::(0), 15); assert_eq!(row2.get::(0), 15); Ok(()) } #[sqlx_macros::test] async fn it_maths() -> anyhow::Result<()> { let mut conn = new::().await?; let value = sqlx::query("select 1 + ?1") .bind(5_i32) .try_map(|row: SqliteRow| row.try_get::(0)) .fetch_one(&mut conn) .await?; assert_eq!(6i32, value); Ok(()) } #[sqlx_macros::test] async fn test_bind_multiple_statements_multiple_values() -> anyhow::Result<()> { let mut conn = new::().await?; let values: Vec = sqlx::query_scalar::<_, i32>("select ?; select ?") .bind(5_i32) .bind(15_i32) .fetch_all(&mut conn) .await?; assert_eq!(values.len(), 2); assert_eq!(values[0], 5); assert_eq!(values[1], 15); Ok(()) } #[sqlx_macros::test] async fn test_bind_multiple_statements_same_value() -> anyhow::Result<()> { let mut conn = new::().await?; let values: Vec = sqlx::query_scalar::<_, i32>("select ?1; select ?1") .bind(25_i32) .fetch_all(&mut conn) .await?; assert_eq!(values.len(), 2); assert_eq!(values[0], 25); assert_eq!(values[1], 25); Ok(()) } #[sqlx_macros::test] async fn it_can_describe_with_pragma() -> anyhow::Result<()> { use sqlx::{Decode, TypeInfo, ValueRef}; let mut conn = new::().await?; let defaults = sqlx::query("pragma table_info (tweet)") .try_map(|row: SqliteRow| { let val = row.try_get_raw("dflt_value")?; let ty = val.type_info().clone().into_owned(); let val: Option = Decode::::decode(val).map_err(sqlx::Error::Decode)?; if val.is_some() { assert_eq!(ty.name(), "TEXT"); } Ok(val) }) .fetch_all(&mut conn) .await?; assert_eq!(defaults[0], None); assert_eq!(defaults[2], Some(0)); Ok(()) } #[sqlx_macros::test] async fn it_binds_positional_parameters_issue_467() -> anyhow::Result<()> { let mut conn = new::().await?; let row: (i32, i32, i32, i32) = sqlx::query_as("select ?1, ?1, ?3, ?2") .bind(5_i32) .bind(500_i32) .bind(1020_i32) .fetch_one(&mut conn) .await?; assert_eq!(row.0, 5); assert_eq!(row.1, 5); assert_eq!(row.2, 1020); assert_eq!(row.3, 500); Ok(()) } #[sqlx_macros::test] async fn it_fetches_in_loop() -> anyhow::Result<()> { // this is trying to check for any data races // there were a few that triggered *sometimes* while building out StatementWorker for _ in 0..1000_usize { let mut conn = new::().await?; let v: Vec<(i32,)> = sqlx::query_as("SELECT 1").fetch_all(&mut conn).await?; assert_eq!(v[0].0, 1); } Ok(()) } #[sqlx_macros::test] async fn it_executes_with_pool() -> anyhow::Result<()> { let pool: SqlitePool = SqlitePoolOptions::new() .min_connections(2) .max_connections(2) .test_before_acquire(false) .connect(&dotenvy::var("DATABASE_URL")?) .await?; let rows = pool.fetch_all("SELECT 1; SElECT 2").await?; assert_eq!(rows.len(), 2); Ok(()) } #[cfg(sqlite_ipaddr)] #[sqlx_macros::test] async fn it_opens_with_extension() -> anyhow::Result<()> { use std::str::FromStr; let opts = SqliteConnectOptions::from_str(&dotenvy::var("DATABASE_URL")?)?.extension("ipaddr"); let mut conn = SqliteConnection::connect_with(&opts).await?; conn.execute("SELECT ipmasklen('192.168.16.12/24');") .await?; conn.close().await?; Ok(()) } #[sqlx_macros::test] async fn it_opens_in_memory() -> anyhow::Result<()> { // If the filename is ":memory:", then a private, temporary in-memory database // is created for the connection. let conn = SqliteConnection::connect(":memory:").await?; conn.close().await?; Ok(()) } #[sqlx_macros::test] async fn it_opens_temp_on_disk() -> anyhow::Result<()> { // If the filename is an empty string, then a private, temporary on-disk database will // be created. let conn = SqliteConnection::connect("").await?; conn.close().await?; Ok(()) } #[sqlx_macros::test] async fn it_fails_to_parse() -> anyhow::Result<()> { let mut conn = new::().await?; let res = sqlx::raw_sql("SEELCT 1").execute(&mut conn).await; assert!(res.is_err()); let err = res.unwrap_err().to_string(); assert_eq!( "error returned from database: (code: 1) near \"SEELCT\": syntax error", err ); Ok(()) } #[sqlx_macros::test] async fn it_handles_empty_queries() -> anyhow::Result<()> { let mut conn = new::().await?; let done = conn.execute("").await?; assert_eq!(done.rows_affected(), 0); Ok(()) } #[sqlx_macros::test] fn it_binds_parameters() -> anyhow::Result<()> { let mut conn = new::().await?; let v: i32 = sqlx::query_scalar("SELECT ?") .bind(10_i32) .fetch_one(&mut conn) .await?; assert_eq!(v, 10); let v: (i32, i32) = sqlx::query_as("SELECT ?1, ?") .bind(10_i32) .fetch_one(&mut conn) .await?; assert_eq!(v.0, 10); assert_eq!(v.1, 10); Ok(()) } #[sqlx_macros::test] fn it_binds_dollar_parameters() -> anyhow::Result<()> { let mut conn = new::().await?; let v: (i32, i32) = sqlx::query_as("SELECT $1, $2") .bind(10_i32) .bind(11_i32) .fetch_one(&mut conn) .await?; assert_eq!(v.0, 10); assert_eq!(v.1, 11); Ok(()) } #[sqlx_macros::test] async fn it_executes_queries() -> anyhow::Result<()> { let mut conn = new::().await?; let _ = conn .execute( r#" CREATE TEMPORARY TABLE users (id INTEGER PRIMARY KEY) "#, ) .await?; for index in 1..=10_i32 { let done = sqlx::query("INSERT INTO users (id) VALUES (?)") .bind(index * 2) .execute(&mut conn) .await?; assert_eq!(done.rows_affected(), 1); } let sum: i32 = sqlx::query_as("SELECT id FROM users") .fetch(&mut conn) .try_fold(0_i32, |acc, (x,): (i32,)| async move { Ok(acc + x) }) .await?; assert_eq!(sum, 110); Ok(()) } #[sqlx_macros::test] async fn it_can_execute_multiple_statements() -> anyhow::Result<()> { let mut conn = new::().await?; let done = conn .execute( r#" CREATE TEMPORARY TABLE users (id INTEGER PRIMARY KEY, other INTEGER); INSERT INTO users DEFAULT VALUES; "#, ) .await?; assert_eq!(done.rows_affected(), 1); for index in 2..5_i32 { let (id, other): (i32, i32) = sqlx::query_as( r#" INSERT INTO users (other) VALUES (?); SELECT id, other FROM users WHERE id = last_insert_rowid(); "#, ) .bind(index) .fetch_one(&mut conn) .await?; assert_eq!(id, index); assert_eq!(other, index); } Ok(()) } #[sqlx_macros::test] async fn it_interleaves_reads_and_writes() -> anyhow::Result<()> { let mut conn = new::().await?; let mut cursor = conn.fetch( " CREATE TABLE IF NOT EXISTS _sqlx_test ( id INT PRIMARY KEY, text TEXT NOT NULL ); SELECT 'Hello World' as _1; INSERT INTO _sqlx_test (text) VALUES ('this is a test'); SELECT id, text FROM _sqlx_test; ", ); let row = cursor.try_next().await?.unwrap(); assert!("Hello World" == row.try_get::<&str, _>("_1")?); let row = cursor.try_next().await?.unwrap(); let id: i64 = row.try_get("id")?; let text: &str = row.try_get("text")?; assert_eq!(0, id); assert_eq!("this is a test", text); Ok(()) } #[sqlx_macros::test] async fn it_supports_collations() -> anyhow::Result<()> { let mut conn = new::().await?; // also tests `.lock_handle()` conn.lock_handle() .await? .create_collation("test_collation", |l, r| l.cmp(r).reverse())?; let _ = conn .execute( r#" CREATE TEMPORARY TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL COLLATE test_collation) "#, ) .await?; sqlx::query("INSERT INTO users (name) VALUES (?)") .bind("a") .execute(&mut conn) .await?; sqlx::query("INSERT INTO users (name) VALUES (?)") .bind("b") .execute(&mut conn) .await?; let row: SqliteRow = conn .fetch_one("SELECT name FROM users ORDER BY name ASC") .await?; let name: &str = row.try_get(0)?; assert_eq!(name, "b"); Ok(()) } #[sqlx_macros::test] async fn it_caches_statements() -> anyhow::Result<()> { let mut conn = new::().await?; // Initial PRAGMAs are not cached as we are not going to execute // them more than once. assert_eq!(0, conn.cached_statements_size()); // `&str` queries are not persistent. let row = conn.fetch_one("SELECT 100 AS val").await?; let val: i32 = row.get("val"); assert_eq!(val, 100); assert_eq!(0, conn.cached_statements_size()); // `Query` is persistent by default. let mut conn = new::().await?; for i in 0..2 { let row = sqlx::query("SELECT ? AS val") .bind(i) .fetch_one(&mut conn) .await?; let val: i32 = row.get("val"); assert_eq!(i, val); } assert_eq!(1, conn.cached_statements_size()); // Cache can be cleared. conn.clear_cached_statements().await?; assert_eq!(0, conn.cached_statements_size()); // `Query` is not persistent if `.persistent(false)` is used // explicitly. let mut conn = new::().await?; for i in 0..2 { let row = sqlx::query("SELECT ? AS val") .bind(i) .persistent(false) .fetch_one(&mut conn) .await?; let val: i32 = row.get("val"); assert_eq!(i, val); } assert_eq!(0, conn.cached_statements_size()); Ok(()) } #[sqlx_macros::test] async fn it_can_prepare_then_execute() -> anyhow::Result<()> { let mut conn = new::().await?; let mut tx = conn.begin().await?; let _ = sqlx::query("INSERT INTO tweet ( id, text ) VALUES ( 2, 'Hello, World' )") .execute(&mut *tx) .await?; let tweet_id: i32 = 2; let statement = tx.prepare("SELECT * FROM tweet WHERE id = ?1").await?; assert_eq!(statement.column(0).name(), "id"); assert_eq!(statement.column(1).name(), "text"); assert_eq!(statement.column(2).name(), "is_sent"); assert_eq!(statement.column(3).name(), "owner_id"); assert_eq!(statement.column(0).type_info().name(), "INTEGER"); assert_eq!(statement.column(1).type_info().name(), "TEXT"); assert_eq!(statement.column(2).type_info().name(), "BOOLEAN"); assert_eq!(statement.column(3).type_info().name(), "INTEGER"); let row = statement.query().bind(tweet_id).fetch_one(&mut *tx).await?; let tweet_text: &str = row.try_get("text")?; assert_eq!(tweet_text, "Hello, World"); Ok(()) } #[sqlx_macros::test] async fn it_resets_prepared_statement_after_fetch_one() -> anyhow::Result<()> { let mut conn = new::().await?; conn.execute("CREATE TEMPORARY TABLE foobar (id INTEGER)") .await?; conn.execute("INSERT INTO foobar VALUES (42)").await?; let r = sqlx::query("SELECT id FROM foobar") .fetch_one(&mut conn) .await?; let x: i32 = r.try_get("id")?; assert_eq!(x, 42); conn.execute("DROP TABLE foobar").await?; Ok(()) } #[sqlx_macros::test] async fn it_resets_prepared_statement_after_fetch_many() -> anyhow::Result<()> { let mut conn = new::().await?; conn.execute("CREATE TEMPORARY TABLE foobar (id INTEGER)") .await?; conn.execute("INSERT INTO foobar VALUES (42)").await?; conn.execute("INSERT INTO foobar VALUES (43)").await?; let mut rows = sqlx::query("SELECT id FROM foobar").fetch(&mut conn); let row = rows.try_next().await?.unwrap(); let x: i32 = row.try_get("id")?; assert_eq!(x, 42); drop(rows); conn.execute("DROP TABLE foobar").await?; Ok(()) } // https://github.com/launchbadge/sqlx/issues/1300 #[sqlx_macros::test] async fn concurrent_resets_dont_segfault() { use sqlx::{sqlite::SqliteConnectOptions, ConnectOptions}; use std::{str::FromStr, time::Duration}; let mut conn = SqliteConnectOptions::from_str(":memory:") .unwrap() .connect() .await .unwrap(); sqlx::query("CREATE TABLE stuff (name INTEGER, value INTEGER)") .execute(&mut conn) .await .unwrap(); sqlx_core::rt::spawn(async move { for i in 0..1000 { sqlx::query("INSERT INTO stuff (name, value) VALUES (?, ?)") .bind(i) .bind(0) .execute(&mut conn) .await .unwrap(); } }); sqlx_core::rt::sleep(Duration::from_millis(1)).await; } // https://github.com/launchbadge/sqlx/issues/1419 // note: this passes before and after the fix; you need to run it with `--nocapture` // to see the panic from the worker thread, which doesn't happen after the fix #[sqlx_macros::test] async fn row_dropped_after_connection_doesnt_panic() { let mut conn = SqliteConnection::connect(":memory:").await.unwrap(); let books = sqlx::query("SELECT 'hello' AS title") .fetch_all(&mut conn) .await .unwrap(); for book in &books { // force the row to be inflated let _title: String = book.get("title"); } // hold `books` past the lifetime of `conn` drop(conn); sqlx_core::rt::sleep(std::time::Duration::from_secs(1)).await; drop(books); } // note: to repro issue #1467 this should be run in release mode // May spuriously fail with UNIQUE constraint failures (which aren't relevant to the original issue) // which I have tried to reproduce using the same seed as printed from CI but to no avail. // It may be due to some nondeterminism in SQLite itself for all I know. #[sqlx_macros::test] #[ignore] async fn issue_1467() -> anyhow::Result<()> { let mut conn = SqliteConnectOptions::new() .filename(":memory:") .connect() .await?; sqlx::query( r#" CREATE TABLE kv (k PRIMARY KEY, v); CREATE INDEX idx_kv ON kv (v); "#, ) .execute(&mut conn) .await?; // Random seed: let seed: [u8; 32] = rand::random(); println!("RNG seed: {}", hex::encode(&seed)); // Pre-determined seed: // let mut seed: [u8; 32] = [0u8; 32]; // hex::decode_to_slice( // "135234871d03fc0479e22f2f06395b6074761bac5fe7dcf205dbe01eef9f7794", // &mut seed, // )?; // reproducible RNG for testing let mut rng = Xoshiro256PlusPlus::from_seed(seed); for i in 0..1_000_000 { if i % 1_000 == 0 { println!("{i}"); } let key = rng.gen_range(0..1_000); let value = rng.gen_range(0..1_000); let mut tx = conn.begin().await?; let exists = sqlx::query("SELECT 1 FROM kv WHERE k = ?") .bind(key) .fetch_optional(&mut *tx) .await?; if exists.is_some() { sqlx::query("UPDATE kv SET v = ? WHERE k = ?") .bind(value) .bind(key) .execute(&mut *tx) .await?; } else { sqlx::query("INSERT INTO kv(k, v) VALUES (?, ?)") .bind(key) .bind(value) .execute(&mut *tx) .await?; } tx.commit().await?; } Ok(()) } #[sqlx_macros::test] async fn concurrent_read_and_write() { let pool: SqlitePool = SqlitePoolOptions::new() .min_connections(2) .connect(":memory:") .await .unwrap(); sqlx::query("CREATE TABLE kv (k PRIMARY KEY, v)") .execute(&pool) .await .unwrap(); let n = 100; let read = sqlx_core::rt::spawn({ let mut conn = pool.acquire().await.unwrap(); async move { for i in 0u32..n { sqlx::query("SELECT v FROM kv") .bind(i) .fetch_all(&mut *conn) .await .unwrap(); } } }); let write = sqlx_core::rt::spawn({ let mut conn = pool.acquire().await.unwrap(); async move { for i in 0u32..n { sqlx::query("INSERT INTO kv (k, v) VALUES (?, ?)") .bind(i) .bind(i * i) .execute(&mut *conn) .await .unwrap(); } } }); read.await; write.await; } #[sqlx_macros::test] async fn test_query_with_progress_handler() -> anyhow::Result<()> { let mut conn = new::().await?; // Using this string as a canary to ensure the callback doesn't get called with the wrong data pointer. let state = format!("test"); conn.lock_handle().await?.set_progress_handler(1, move || { assert_eq!(state, "test"); false }); match sqlx::query("SELECT 'hello' AS title") .fetch_all(&mut conn) .await { Err(sqlx::Error::Database(err)) => assert_eq!(err.message(), String::from("interrupted")), _ => panic!("expected an interrupt"), } Ok(()) } #[sqlx_macros::test] async fn test_multiple_set_progress_handler_calls_drop_old_handler() -> anyhow::Result<()> { let ref_counted_object = Arc::new(0); assert_eq!(1, Arc::strong_count(&ref_counted_object)); { let mut conn = new::().await?; let o = ref_counted_object.clone(); conn.lock_handle().await?.set_progress_handler(1, move || { println!("{o:?}"); false }); assert_eq!(2, Arc::strong_count(&ref_counted_object)); let o = ref_counted_object.clone(); conn.lock_handle().await?.set_progress_handler(1, move || { println!("{o:?}"); false }); assert_eq!(2, Arc::strong_count(&ref_counted_object)); let o = ref_counted_object.clone(); conn.lock_handle().await?.set_progress_handler(1, move || { println!("{o:?}"); false }); assert_eq!(2, Arc::strong_count(&ref_counted_object)); match sqlx::query("SELECT 'hello' AS title") .fetch_all(&mut conn) .await { Err(sqlx::Error::Database(err)) => { assert_eq!(err.message(), String::from("interrupted")) } _ => panic!("expected an interrupt"), } conn.lock_handle().await?.remove_progress_handler(); } assert_eq!(1, Arc::strong_count(&ref_counted_object)); Ok(()) }