diff --git a/src/catalog.rs b/src/catalog.rs
index 46bc10da..6ff98467 100644
--- a/src/catalog.rs
+++ b/src/catalog.rs
@@ -8,8 +8,6 @@ use datafusion::datasource::TableProvider;
 use datafusion::error::DataFusionError;
 use deltalake::DeltaTable;
 use itertools::Itertools;
-#[cfg(test)]
-use mockall::automock;
 use parking_lot::RwLock;
 use uuid::Uuid;
 
@@ -143,6 +141,10 @@ impl From<Error> for DataFusionError {
     }
 }
 
+// This is the main entrypoint to all individual catalogs for various objects types.
+// The intention is to make it extensible and de-coupled from the underlying metastore
+// persistence mechanism (such as the presently used `Repository`).
+#[derive(Clone)]
 pub struct Metastore {
     pub catalogs: Arc<dyn CatalogStore>,
     pub schemas: Arc<dyn SchemaStore>,
@@ -150,21 +152,17 @@ pub struct Metastore {
     pub functions: Arc<dyn FunctionStore>,
 }
 
-#[cfg_attr(test, automock)]
 #[async_trait]
 pub trait CatalogStore: Sync + Send {
     async fn create(&self, catalog_name: &str) -> Result<DatabaseId>;
 
     async fn load_database(&self, name: &str) -> Result<SeafowlDatabase>;
 
-    async fn list(&self) -> Result<Vec<DatabaseRecord>, Error>;
-
     async fn get(&self, name: &str) -> Result<DatabaseRecord>;
 
     async fn delete(&self, name: &str) -> Result<()>;
 }
 
-#[cfg_attr(test, automock)]
 #[async_trait]
 pub trait SchemaStore: Sync + Send {
     async fn create(&self, catalog_name: &str, schema_name: &str)
@@ -179,7 +177,6 @@ pub trait SchemaStore: Sync + Send {
     async fn delete(&self, catalog_name: &str, schema_name: &str) -> Result<()>;
 }
 
-#[cfg_attr(test, automock)]
 #[async_trait]
 pub trait TableStore: Sync + Send {
     async fn create(
@@ -248,7 +245,6 @@ pub trait TableStore: Sync + Send {
     async fn delete_dropped_table(&self, uuid: Uuid) -> Result<()>;
 }
 
-#[cfg_attr(test, automock)]
 #[async_trait]
 pub trait FunctionStore: Sync + Send {
     async fn create(
@@ -394,13 +390,6 @@ impl CatalogStore for DefaultCatalog {
         })
     }
 
-    async fn list(&self) -> Result<Vec<DatabaseRecord>, Error> {
-        match self.repository.list_databases().await {
-            Ok(databases) => Ok(databases),
-            Err(e) => Err(Self::to_sqlx_error(e)),
-        }
-    }
-
     async fn get(&self, name: &str) -> Result<DatabaseRecord> {
         match self.repository.get_database(name).await {
             Ok(database) => Ok(database),
diff --git a/src/config/context.rs b/src/config/context.rs
index e4d2930c..7fdaf99e 100644
--- a/src/config/context.rs
+++ b/src/config/context.rs
@@ -1,4 +1,3 @@
-use std::collections::HashSet;
 use std::sync::Arc;
 
 use crate::{
@@ -26,7 +25,6 @@ use datafusion_remote_tables::factory::RemoteTableFactory;
 #[cfg(feature = "object-store-s3")]
 use object_store::aws::AmazonS3Builder;
 use object_store::gcp::GoogleCloudStorageBuilder;
-use parking_lot::lock_api::RwLock;
 
 use super::schema::{self, GCS, MEBIBYTES, MEMORY_FRACTION, S3};
 
@@ -199,14 +197,6 @@ pub async fn build_context(cfg: &schema::SeafowlConfig) -> Result<SeafowlContext> {
 
-    let all_databases: HashSet<String> = metastore
-        .catalogs
-        .list()
-        .await?
-        .iter()
-        .map(|db| db.name.clone())
-        .collect();
-
     // Convergence doesn't support connecting to different DB names. We are supposed
     // to do one context per query (as we need to load the schema before executing every
     // query) and per database (since the context is supposed to be limited to the database
@@ -218,7 +208,6 @@ pub async fn build_context(cfg: &schema::SeafowlConfig) -> Result<SeafowlContext> {
-        all_databases: Arc::new(RwLock::new(all_databases)),
diff --git a/src/context/mod.rs b/src/context/mod.rs
--- a/src/context/mod.rs
+++ b/src/context/mod.rs
@@ ... @@ pub struct SeafowlContext {
     pub metastore: Arc<Metastore>,
     pub internal_object_store: Arc<InternalObjectStore>,
     pub database: String,
-    pub all_databases: Arc<RwLock<HashSet<String>>>,
     pub max_partition_size: u32,
 }
 
 impl SeafowlContext {
     // Create a new `SeafowlContext` with a new inner context scoped to a different default DB
     pub async fn scope_to_database(&self, name: String) -> Result<Arc<SeafowlContext>> {
-        // TODO: do we need this? The only goal here is to keep the list of databases in memory and
-        // fail early if one is missing, but we can defer that to resolution time.
-        let maybe_database = self.all_databases.read().get(&name).cloned();
-        let all_databases = self.all_databases.clone();
-        if maybe_database.is_none() {
-            // Perhaps the db was created on another node; try to reload from catalog
-            let _ = self.metastore.catalogs.get(&name).await.map_err(|_| {
-                DataFusionError::Plan(format!(
-                    "Unknown database {name}; try creating one with CREATE DATABASE first"
-                ))
-            })?;
-            all_databases.write().insert(name.clone());
-        };
-
-        // Swap the default database in the new internal context's session config
+        // Swap the default catalog in the new internal context's session config
         let session_config = self
             .inner()
             .copied_config()
@@ -62,7 +45,6 @@ impl SeafowlContext {
             metastore: self.metastore.clone(),
             internal_object_store: self.internal_object_store.clone(),
             database: name,
-            all_databases,
             max_partition_size: self.max_partition_size,
         }))
     }
diff --git a/src/context/physical.rs b/src/context/physical.rs
index c612ab17..5ff5bfb6 100644
--- a/src/context/physical.rs
+++ b/src/context/physical.rs
@@ -202,9 +202,6 @@ impl SeafowlContext {
                     .create(catalog_name, DEFAULT_SCHEMA)
                     .await?;
 
-                // Update the shared in-memory map of DB names -> ids
-                self.all_databases.write().insert(catalog_name.clone());
-
                 Ok(make_dummy_exec())
             }
             LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
diff --git a/src/frontend/http.rs b/src/frontend/http.rs
index cd1564af..c55e4b21 100644
--- a/src/frontend/http.rs
+++ b/src/frontend/http.rs
@@ -1023,7 +1023,7 @@ pub mod tests {
         assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
         assert_eq!(
             resp.body(),
-            "Error during planning: Unknown database missing_db; try creating one with CREATE DATABASE first"
+            "Error during planning: Database \"missing_db\" doesn't exist"
         );
     }
 
diff --git a/src/repository/default.rs b/src/repository/default.rs
index 57c4c49d..4d3e705d 100644
--- a/src/repository/default.rs
+++ b/src/repository/default.rs
@@ -65,18 +65,6 @@ impl Repository for $repo {
             .expect("error running migrations");
     }
 
-    async fn get_collections_in_database(
-        &self,
-        database_id: DatabaseId,
-    ) -> Result<Vec<String>, Error> {
-        let names = sqlx::query("SELECT name FROM collection WHERE database_id = $1")
-            .bind(database_id)
-            .fetch(&self.executor)
-            .map_ok(|row| row.get("name"))
-            .try_collect()
-            .await.map_err($repo::interpret_error)?;
-        Ok(names)
-    }
     async fn get_all_columns_in_database(
         &self,
         name: &str,
@@ -181,13 +169,6 @@ impl Repository for $repo {
         Ok(table)
     }
 
-    async fn list_databases(&self) -> Result<Vec<DatabaseRecord>> {
-        sqlx::query_as(r#"SELECT name, id FROM database"#)
-            .fetch_all(&self.executor)
-            .await.map_err($repo::interpret_error)
-
-    }
-
     async fn create_collection(
         &self,
         database_id: DatabaseId,
diff --git a/src/repository/interface.rs b/src/repository/interface.rs
index d14c529a..35984d3a 100644
--- a/src/repository/interface.rs
+++ b/src/repository/interface.rs
@@ -36,13 +36,6 @@ pub struct TableRecord {
     pub name: String,
 }
 
-#[derive(sqlx::FromRow, Default, Debug, PartialEq, Eq)]
-pub struct TableVersion {
-    pub id: TableVersionId,
-    pub table_id: TableId,
-    pub creation_time: Timestamp,
-}
-
 #[derive(sqlx::FromRow, Default, Debug, PartialEq, Eq)]
 pub struct AllDatabaseColumnsResult {
     pub database_name: String,
@@ -120,18 +113,11 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 pub trait Repository: Send + Sync + Debug {
     async fn setup(&self);
 
-    async fn get_collections_in_database(
-        &self,
-        database_id: DatabaseId,
-    ) -> Result<Vec<String>, Error>;
-
     async fn get_all_columns_in_database(
         &self,
         name: &str,
     ) -> Result<Vec<AllDatabaseColumnsResult>, Error>;
 
-    async fn list_databases(&self) -> Result<Vec<DatabaseRecord>, Error>;
-
     async fn get_database(&self, name: &str) -> Result<DatabaseRecord, Error>;
 
     async fn get_collection(
@@ -289,7 +275,7 @@ pub mod tests {
     }
 
     pub async fn run_generic_repository_tests(repository: Arc<dyn Repository>) {
-        test_get_collections_empty(repository.clone()).await;
+        test_get_tables_empty(repository.clone()).await;
         let (database_id, table_id, table_version_id) =
             test_create_database_collection_table(repository.clone()).await;
         test_create_functions(repository.clone(), database_id).await;
@@ -303,13 +289,13 @@ pub mod tests {
         test_error_propagation(repository, table_id).await;
     }
 
-    async fn test_get_collections_empty(repository: Arc<dyn Repository>) {
+    async fn test_get_tables_empty(repository: Arc<dyn Repository>) {
         assert_eq!(
             repository
-                .get_collections_in_database(0)
+                .get_all_columns_in_database(TEST_DB)
                 .await
                 .expect("error getting collections"),
-            Vec::<String>::new()
+            Vec::<AllDatabaseColumnsResult>::new()
         );
     }
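
A note on the `Metastore` change above: because every field is an `Arc`-wrapped trait object, `#[derive(Clone)]` makes cloning a cheap reference-count bump, and any store implementation (including a hand-rolled test stub, now that the `mockall` automocks are gone) can be slotted in behind the same type. What follows is a minimal, self-contained sketch of that pattern, not Seafowl's actual code: the trait is simplified to a synchronous method (the real traits are async via `#[async_trait]`), and `StubCatalog` is a hypothetical stand-in.

use std::sync::Arc;

// Synchronous, simplified stand-in for the async `CatalogStore` trait.
pub trait CatalogStore: Sync + Send {
    fn get(&self, name: &str) -> Result<String, String>;
}

// As in the diff: a struct of trait objects, decoupled from any concrete
// persistence mechanism; `Clone` only bumps the `Arc` refcounts.
#[derive(Clone)]
pub struct Metastore {
    pub catalogs: Arc<dyn CatalogStore>,
}

// Hand-rolled stub of the kind that can replace the removed mockall mocks.
struct StubCatalog;

impl CatalogStore for StubCatalog {
    fn get(&self, name: &str) -> Result<String, String> {
        // Missing databases now surface at resolution time, mirroring the
        // error asserted in the updated http.rs test.
        Err(format!("Database \"{name}\" doesn't exist"))
    }
}

fn main() {
    let metastore = Metastore {
        catalogs: Arc::new(StubCatalog),
    };
    // Cloning shares the same underlying store rather than copying it.
    let scoped = metastore.clone();
    assert!(Arc::ptr_eq(&metastore.catalogs, &scoped.catalogs));
    println!("{:?}", scoped.catalogs.get("missing_db"));
}

This is also what lets the diff drop the eager `all_databases` cache: with existence checks deferred to resolution time, a database created on another node becomes visible as soon as the catalog is queried, with no shared `RwLock<HashSet<String>>` to keep in sync across nodes.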