Skip to content

Commit

Permalink
Make db tests explicitly close the pool
Browse files Browse the repository at this point in the history
This should stop the madness to attempt to close the pool on db drop,
which has never worked correctly and I haven't been able to fix.
  • Loading branch information
jmmv committed Dec 3, 2023
1 parent dfb0d4b commit c1d46df
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 40 deletions.
12 changes: 10 additions & 2 deletions authn/src/db/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ mod postgres {
use crate::db::init_schema;
use iii_iv_core::db::postgres::PostgresDb;
use iii_iv_core::db::Db;
use std::sync::Arc;

async fn setup() -> PostgresDb {
let db = iii_iv_core::db::postgres::testutils::setup().await;
Expand All @@ -230,7 +231,10 @@ mod postgres {
}

generate_db_tests!(
&mut setup().await.ex().await.unwrap(),
{
let db = Arc::from(setup().await);
(db.clone(), &mut db.ex().await.unwrap())
},
#[ignore = "Requires environment configuration and is expensive"]
);
}
Expand All @@ -240,12 +244,16 @@ mod sqlite {
use crate::db::init_schema;
use iii_iv_core::db::sqlite::SqliteDb;
use iii_iv_core::db::Db;
use std::sync::Arc;

async fn setup() -> SqliteDb {
let db = iii_iv_core::db::sqlite::testutils::setup().await;
init_schema(&mut db.ex().await.unwrap()).await.unwrap();
db
}

generate_db_tests!(&mut setup().await.ex().await.unwrap());
generate_db_tests!({
let db = Arc::from(setup().await);
(db.clone(), &mut db.ex().await.unwrap())
});
}
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ postgres = [
]
sqlite = [
"dep:futures",
"dep:log",
"dep:tokio",
"sqlx/sqlite",
"sqlx/runtime-tokio-rustls",
Expand Down
37 changes: 24 additions & 13 deletions core/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ pub trait Db {
/// It is the responsibility of the caller to call `commit` on the returned executor. Otherwise
/// the transaction is rolled back on drop.
async fn begin(&self) -> DbResult<TxExecutor>;

/// Closes the connection to the pool.
///
/// The caller can never do anything useful on error, so this doesn't return them.
async fn close(&self);
}

/// Parses a `COUNT` result as a `usize`.
Expand Down Expand Up @@ -153,7 +158,14 @@ pub mod testutils {
$(#[$extra])?
async fn $name() {
$crate::db::testutils::paste! {
$module :: [< $name >]($setup).await;
let db = {
let (db, arg) = $setup;
$module :: [< $name >](arg).await;
db
};
// arg must be dropped at this point to not hog a potential connection to the
// database, which would deadlock the close await.
db.close().await;
}
}
}
Expand All @@ -164,9 +176,10 @@ pub mod testutils {
/// Instantiates a collection of tests for a specific database system.
///
/// The database implementation to run the tests against is determined by the `setup`
/// expression, which needs to return a database object parameterized with the desired
/// transaction type. The returned database should also have been initialized with the
/// desired schema.
/// expression, which needs to return a database object and the argument to pass to the
/// tests. The first database object might be the same as the argument, but the duplication
/// is necessary to allow closing the pool. The returned database should also have been
/// initialized with the desired schema.
///
/// The `extra` metadata parameter can be used to tag the generated tests.
#[macro_export]
Expand Down Expand Up @@ -228,14 +241,14 @@ mod tests {
}
}

pub(super) async fn test_direct_execution(db: Box<dyn Db>) {
pub(super) async fn test_direct_execution(db: Arc<dyn Db>) {
let mut ex = db.ex().await.unwrap();
exec(&mut ex, "CREATE TABLE test (i INTEGER)").await.unwrap();
exec(&mut ex, "INSERT INTO test (i) VALUES (3)").await.unwrap();
assert_eq!(1, query_i64(&mut ex, "count", "SELECT COUNT(*) AS count FROM test").await);
}

pub(super) async fn test_tx_commit(db: Box<dyn Db>) {
pub(super) async fn test_tx_commit(db: Arc<dyn Db>) {
exec(&mut db.ex().await.unwrap(), "CREATE TABLE test (i INTEGER)").await.unwrap();

let mut tx = db.begin().await.unwrap();
Expand All @@ -249,7 +262,7 @@ mod tests {
);
}

pub(super) async fn test_tx_rollback_on_drop(db: Box<dyn Db>) {
pub(super) async fn test_tx_rollback_on_drop(db: Arc<dyn Db>) {
exec(&mut db.ex().await.unwrap(), "CREATE TABLE test (i INTEGER)").await.unwrap();

{
Expand All @@ -264,17 +277,15 @@ mod tests {
);
}

pub(super) async fn test_multiple_txs(db: Box<dyn Db>) {
pub(super) async fn test_multiple_txs(db: Arc<dyn Db>) {
let tx1 = db.begin().await.unwrap();
let tx2 = db.begin().await.unwrap();
tx1.commit().await.unwrap();
tx2.commit().await.unwrap();
}

pub(super) async fn test_begin_tx_after_drop(db: Box<dyn Db>) {
let db: Arc<dyn Db> = Arc::from(db);

let tx1 = db.clone().begin().await.unwrap();
pub(super) async fn test_begin_tx_after_commit(db: Arc<dyn Db>) {
let tx1 = db.begin().await.unwrap();
tx1.commit().await.unwrap();

let tx2 = db.begin().await.unwrap();
Expand All @@ -291,7 +302,7 @@ mod tests {
$setup,
$crate::db::tests,
test_multiple_txs,
test_begin_tx_after_drop
test_begin_tx_after_commit
);
}
];
Expand Down
28 changes: 18 additions & 10 deletions core/src/db/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,13 +328,13 @@ pub struct PostgresDb {

impl Drop for PostgresDb {
fn drop(&mut self) {
let pool = self.pool.clone();
tokio::task::spawn_blocking(move || {
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
rt.block_on(async move {
pool.close().await;
});
});
if !self.pool.is_closed() {
if cfg!(debug_assertions) {
panic!("Dropping connection without having called close() first");
} else {
warn!("Dropping connection without having called close() first");
}
}
}
}

Expand Down Expand Up @@ -381,6 +381,10 @@ impl Db for PostgresDb {
let tx = retry(|| self.pool.begin(), self.max_retries).await?;
Ok(TxExecutor(Executor::Postgres(PostgresExecutor::TxExec(tx))))
}

async fn close(&self) {
self.pool.close().await;
}
}

/// Helper function to initialize the database with a schema.
Expand Down Expand Up @@ -428,6 +432,7 @@ mod tests {
use super::*;
use crate::db::tests::{generate_db_ro_concurrent_tests, generate_db_rw_tests};
use std::env;
use std::sync::Arc;

generate_db_ro_concurrent_tests!(
{
Expand All @@ -437,14 +442,17 @@ mod tests {
// connections to 1 but we need at least 2 for the concurrent tests to succeed.
// This means that the tests cannot write to the database because we did not set
// up the `search_path`.
let db = PostgresDb::connect(PostgresOptions::from_env("PGSQL_TEST").unwrap()).unwrap();
Box::from(db)
let db = Arc::from(PostgresDb::connect(PostgresOptions::from_env("PGSQL_TEST").unwrap()).unwrap());
(db.clone(), db)
},
#[ignore = "Requires environment configuration and is expensive"]
);

generate_db_rw_tests!(
Box::from(setup().await),
{
let db = Arc::from(setup().await);
(db.clone(), db)
},
#[ignore = "Requires environment configuration and is expensive"]
);

Expand Down
26 changes: 17 additions & 9 deletions core/src/db/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::db::{Db, DbError, DbResult, Executor, TxExecutor};
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::TryStreamExt;
use log::warn;
use sqlx::pool::PoolConnection;
use sqlx::sqlite::{Sqlite, SqlitePool};
use sqlx::Transaction;
Expand Down Expand Up @@ -242,13 +243,9 @@ impl SqliteDb {

impl Drop for SqliteDb {
fn drop(&mut self) {
let pool = self.pool.clone();
tokio::task::spawn_blocking(move || {
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
rt.block_on(async move {
pool.close().await;
});
});
if !self.pool.is_closed() {
warn!("Dropping connection without having called close() first");
}
}
}

Expand All @@ -263,6 +260,10 @@ impl Db for SqliteDb {
let tx = self.pool.begin().await.map_err(map_sqlx_error)?;
Ok(TxExecutor(Executor::Sqlite(SqliteExecutor::TxExec(tx))))
}

async fn close(&self) {
self.pool.close().await;
}
}

/// Helper function to initialize the database with a schema.
Expand Down Expand Up @@ -354,10 +355,17 @@ mod tests {
use super::testutils::*;
use super::*;
use crate::db::tests::{generate_db_ro_concurrent_tests, generate_db_rw_tests};
use std::sync::Arc;

generate_db_ro_concurrent_tests!(Box::from(setup().await));
generate_db_ro_concurrent_tests!({
let db = Arc::from(setup().await);
(db.clone(), db)
});

generate_db_rw_tests!(Box::from(setup().await));
generate_db_rw_tests!({
let db = Arc::from(setup().await);
(db.clone(), db)
});

#[test]
fn test_build_unpack_duration_zero() {
Expand Down
12 changes: 10 additions & 2 deletions example/src/db/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ mod postgres {
use crate::db::init_schema;
use iii_iv_core::db::postgres::PostgresDb;
use iii_iv_core::db::Db;
use std::sync::Arc;

async fn setup() -> PostgresDb {
let db = iii_iv_core::db::postgres::testutils::setup().await;
Expand All @@ -92,7 +93,10 @@ mod postgres {
}

generate_db_tests!(
&mut setup().await.ex().await.unwrap(),
{
let db = Arc::from(setup().await);
(db.clone(), &mut db.ex().await.unwrap())
},
#[ignore = "Requires environment configuration and is expensive"]
);
}
Expand All @@ -102,12 +106,16 @@ mod sqlite {
use crate::db::init_schema;
use iii_iv_core::db::sqlite::SqliteDb;
use iii_iv_core::db::Db;
use std::sync::Arc;

async fn setup() -> SqliteDb {
let db = iii_iv_core::db::sqlite::testutils::setup().await;
init_schema(&mut db.ex().await.unwrap()).await.unwrap();
db
}

generate_db_tests!(&mut setup().await.ex().await.unwrap());
generate_db_tests!({
let db = Arc::from(setup().await);
(db.clone(), &mut db.ex().await.unwrap())
});
}
12 changes: 10 additions & 2 deletions queue/src/db/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ mod postgres {
use crate::db::init_schema;
use iii_iv_core::db::postgres::PostgresDb;
use iii_iv_core::db::Db;
use std::sync::Arc;

async fn setup() -> PostgresDb {
let db = iii_iv_core::db::postgres::testutils::setup().await;
Expand All @@ -496,7 +497,10 @@ mod postgres {
}

generate_db_tests!(
&mut setup().await.ex().await.unwrap(),
{
let db = Arc::from(setup().await);
(db.clone(), &mut db.ex().await.unwrap())
},
#[ignore = "Requires environment configuration and is expensive"]
);
}
Expand All @@ -506,12 +510,16 @@ mod sqlite {
use crate::db::init_schema;
use iii_iv_core::db::sqlite::SqliteDb;
use iii_iv_core::db::Db;
use std::sync::Arc;

async fn setup() -> SqliteDb {
let db = iii_iv_core::db::sqlite::testutils::setup().await;
init_schema(&mut db.ex().await.unwrap()).await.unwrap();
db
}

generate_db_tests!(&mut setup().await.ex().await.unwrap());
generate_db_tests!({
let db = Arc::from(setup().await);
(db.clone(), &mut db.ex().await.unwrap())
});
}
12 changes: 10 additions & 2 deletions smtp/src/db/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ mod postgres {
use crate::db::init_schema;
use iii_iv_core::db::postgres::PostgresDb;
use iii_iv_core::db::Db;
use std::sync::Arc;

async fn setup() -> PostgresDb {
let db = iii_iv_core::db::postgres::testutils::setup().await;
Expand All @@ -66,7 +67,10 @@ mod postgres {
}

generate_db_tests!(
&mut setup().await.ex().await.unwrap(),
{
let db = Arc::from(setup().await);
(db.clone(), &mut db.ex().await.unwrap())
},
#[ignore = "Requires environment configuration and is expensive"]
);
}
Expand All @@ -76,12 +80,16 @@ mod sqlite {
use crate::db::init_schema;
use iii_iv_core::db::sqlite::SqliteDb;
use iii_iv_core::db::Db;
use std::sync::Arc;

async fn setup() -> SqliteDb {
let db = iii_iv_core::db::sqlite::testutils::setup().await;
init_schema(&mut db.ex().await.unwrap()).await.unwrap();
db
}

generate_db_tests!(&mut setup().await.ex().await.unwrap());
generate_db_tests!({
let db = Arc::from(setup().await);
(db.clone(), &mut db.ex().await.unwrap())
});
}

0 comments on commit c1d46df

Please sign in to comment.