diff --git a/Cargo.lock b/Cargo.lock index f0cc1638..d49f1f27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -410,7 +410,7 @@ dependencies = [ "anstyle", "bstr", "doc-comment", - "predicates 3.0.4", + "predicates", "predicates-core", "predicates-tree", "wait-timeout", @@ -1994,12 +1994,6 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" -[[package]] -name = "downcast" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" - [[package]] name = "dynamodb_lock" version = "0.6.1" @@ -2253,15 +2247,6 @@ dependencies = [ "miniz_oxide", ] -[[package]] -name = "float-cmp" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" -dependencies = [ - "num-traits", -] - [[package]] name = "flume" version = "0.11.0" @@ -2303,12 +2288,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "fragile" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" - [[package]] name = "frunk" version = "0.4.2" @@ -3433,33 +3412,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "mockall" -version = "0.11.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c84490118f2ee2d74570d114f3d0493cbf02790df303d2707606c3e14e07c96" -dependencies = [ - "cfg-if", - "downcast", - "fragile", - "lazy_static", - "mockall_derive", - "predicates 2.1.5", - "predicates-tree", -] - -[[package]] -name = "mockall_derive" -version = "0.11.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ce75669015c4f47b289fd4d4f56e894e4c96003ffdf3ac51313126f94c6cbb" -dependencies = [ - "cfg-if", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "moka" version = "0.11.3" @@ -3630,12 +3582,6 @@ dependencies = [ "minimal-lexical", ] -[[package]] -name = "normalize-line-endings" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" - [[package]] name = "ntapi" version = "0.4.1" @@ -4248,20 +4194,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" -[[package]] -name = "predicates" -version = "2.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd" -dependencies = [ - "difflib", - "float-cmp", - "itertools 0.10.5", - "normalize-line-endings", - "predicates-core", - "regex", -] - [[package]] name = "predicates" version = "3.0.4" @@ -5294,7 +5226,6 @@ dependencies = [ "itertools 0.11.0", "lazy_static", "log", - "mockall", "moka", "object_store", "parking_lot", diff --git a/Cargo.toml b/Cargo.toml index c0d6709a..adecb626 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,7 +104,6 @@ wasmtime-wasi = "14.0.0" [dev-dependencies] assert_cmd = "2" assert_unordered = "0.3" -mockall = "0.11.1" rstest = "*" serial_test = "2" wiremock = "0.5" diff --git a/src/catalog.rs b/src/catalog.rs index f8262e6b..56b4a243 100644 --- a/src/catalog.rs +++ b/src/catalog.rs @@ -8,25 +8,25 @@ use datafusion::datasource::TableProvider; use 
datafusion::error::DataFusionError; use deltalake::DeltaTable; use itertools::Itertools; -#[cfg(test)] -use mockall::automock; use parking_lot::RwLock; use uuid::Uuid; use crate::object_store::wrapped::InternalObjectStore; use crate::provider::SeafowlFunction; -use crate::repository::interface::{DroppedTableDeletionStatus, DroppedTablesResult}; +use crate::repository::interface::{ + DatabaseRecord, DroppedTableDeletionStatus, DroppedTablesResult, TableRecord, +}; use crate::system_tables::SystemSchemaProvider; use crate::wasm_udf::data_types::{ CreateFunctionDataType, CreateFunctionDetails, CreateFunctionLanguage, CreateFunctionVolatility, }; use crate::{ - data_types::{CollectionId, DatabaseId, FunctionId, TableId, TableVersionId}, - provider::{SeafowlCollection, SeafowlDatabase}, + provider::{SeafowlDatabase, SeafowlSchema}, repository::interface::{ - AllDatabaseColumnsResult, AllDatabaseFunctionsResult, Error as RepositoryError, - Repository, TableVersionsResult, + AllDatabaseColumnsResult, AllDatabaseFunctionsResult, CollectionId, + CollectionRecord, DatabaseId, Error as RepositoryError, FunctionId, Repository, + TableId, TableVersionId, TableVersionsResult, }, }; @@ -36,13 +36,13 @@ pub const STAGING_SCHEMA: &str = "staging"; #[derive(Debug)] pub enum Error { - DatabaseDoesNotExist { id: DatabaseId }, - CollectionDoesNotExist { id: CollectionId }, - TableDoesNotExist { id: TableId }, + CatalogDoesNotExist { name: String }, + SchemaDoesNotExist { name: String }, + TableDoesNotExist { name: String }, TableUuidDoesNotExist { uuid: Uuid }, TableAlreadyExists { name: String }, - DatabaseAlreadyExists { name: String }, - CollectionAlreadyExists { name: String }, + CatalogAlreadyExists { name: String }, + SchemaAlreadyExists { name: String }, FunctionAlreadyExists { name: String }, FunctionDeserializationError { reason: String }, FunctionNotFound { names: String }, @@ -93,20 +93,17 @@ pub type Result = std::result::Result; impl From for DataFusionError { fn from(val: Error) -> Self { match val { - // These errors are raised by routines that already take an ID instead of - // a database/schema/table name and so the ID is supposed to be valid. An error - // in this case is an internal consistency issue. 
- Error::DatabaseDoesNotExist { id } => { - DataFusionError::Internal(format!("Database with ID {id} doesn't exist")) + Error::CatalogDoesNotExist { name } => { + DataFusionError::Plan(format!("Database {name:?} doesn't exist")) } - Error::CollectionDoesNotExist { id } => { - DataFusionError::Internal(format!("Schema with ID {id} doesn't exist")) + Error::SchemaDoesNotExist { name } => { + DataFusionError::Plan(format!("Schema {name:?} doesn't exist")) } - Error::TableDoesNotExist { id } => { - DataFusionError::Internal(format!("Table with ID {id} doesn't exist")) + Error::TableDoesNotExist { name } => { + DataFusionError::Plan(format!("Table {name:?} doesn't exist")) } Error::TableUuidDoesNotExist { uuid } => { - DataFusionError::Internal(format!("Table with UUID {uuid} doesn't exist")) + DataFusionError::Plan(format!("Table with UUID {uuid} doesn't exist")) } Error::FunctionDeserializationError { reason } => DataFusionError::Internal( format!("Error deserializing function: {reason:?}"), @@ -120,10 +117,10 @@ impl From for DataFusionError { Error::TableAlreadyExists { name } => { DataFusionError::Plan(format!("Table {name:?} already exists")) } - Error::DatabaseAlreadyExists { name } => { + Error::CatalogAlreadyExists { name } => { DataFusionError::Plan(format!("Database {name:?} already exists")) } - Error::CollectionAlreadyExists { name } => { + Error::SchemaAlreadyExists { name } => { DataFusionError::Plan(format!("Schema {name:?} already exists")) } Error::FunctionAlreadyExists { name } => { @@ -137,81 +134,254 @@ impl From for DataFusionError { .to_string(), ), // Miscellaneous sqlx error. We want to log it but it's not worth showing to the user. - Error::SqlxError(e) => DataFusionError::Internal(format!( - "Internal SQL error: {:?}", - e.to_string() - )), + Error::SqlxError(e) => { + DataFusionError::Plan(format!("Internal SQL error: {:?}", e.to_string())) + } } } } -#[cfg_attr(test, automock)] -#[async_trait] -pub trait TableCatalog: Sync + Send { - async fn load_database(&self, id: DatabaseId) -> Result; - async fn load_database_ids(&self) -> Result>; - async fn get_database_id_by_name( - &self, - database_name: &str, - ) -> Result>; - async fn get_collection_id_by_name( +// This is the main entrypoint to all individual catalogs for various objects types. +// The intention is to make it extensible and de-coupled from the underlying metastore +// persistence mechanism (such as the presently used `Repository`). +#[derive(Clone)] +pub struct Metastore { + pub catalogs: Arc, + pub schemas: Arc, + pub tables: Arc, + pub functions: Arc, + staging_schema: Arc, + object_store: Arc, +} + +pub struct RepositoryStore { + pub repository: Arc, +} + +impl Metastore { + pub fn new_from_repository( + repository: Arc, + object_store: Arc, + ) -> Self { + let repository_store = Arc::new(RepositoryStore { repository }); + + let staging_schema = Arc::new(MemorySchemaProvider::new()); + Self { + catalogs: repository_store.clone(), + schemas: repository_store.clone(), + tables: repository_store.clone(), + functions: repository_store, + staging_schema, + object_store, + } + } + + pub async fn build_catalog(&self, catalog_name: &str) -> Result { + let all_columns = self.schemas.list(catalog_name).await?; + + // NB we can't distinguish between a database without tables and a database + // that doesn't exist at all due to our query. + + // Turn the list of all collections, tables and their columns into a nested map. 
+ + let schemas: HashMap, Arc> = all_columns + .iter() + .group_by(|col| &col.collection_name) + .into_iter() + .map(|(cn, cc)| self.build_schema(cn, cc)) + .collect(); + + let name: Arc = Arc::from(catalog_name); + + Ok(SeafowlDatabase { + name: name.clone(), + schemas, + staging_schema: self.staging_schema.clone(), + system_schema: Arc::new(SystemSchemaProvider::new(name, self.tables.clone())), + }) + } + + fn build_schema<'a, I>( &self, - database_name: &str, collection_name: &str, - ) -> Result>; - async fn get_table_id_by_name( + collection_columns: I, + ) -> (Arc, Arc) + where + I: Iterator, + { + let tables = collection_columns + .filter_map(|col| { + if let Some(table_name) = &col.table_name + && let Some(table_uuid) = col.table_uuid + { + Some(self.build_table(table_name, table_uuid)) + } else { + None + } + }) + .collect::>(); + + ( + Arc::from(collection_name.to_string()), + Arc::new(SeafowlSchema { + name: Arc::from(collection_name.to_string()), + tables: RwLock::new(tables), + }), + ) + } + + fn build_table( &self, - database_name: &str, - collection_name: &str, table_name: &str, - ) -> Result>; + table_uuid: Uuid, + ) -> (Arc, Arc) { + // Build a delta table but don't load it yet; we'll do that only for tables that are + // actually referenced in a statement, via the async `table` method of the schema provider. + // TODO: this means that any `information_schema.columns` query will serially load all + // delta tables present in the database. The real fix for this is to make DF use `TableSource` + // for the information schema, and then implement `TableSource` for `DeltaTable` in delta-rs. + let table_log_store = self.object_store.get_log_store(table_uuid); - async fn create_database(&self, database_name: &str) -> Result; + let table = DeltaTable::new(table_log_store, Default::default()); + (Arc::from(table_name.to_string()), Arc::new(table) as _) + } - async fn create_collection( + pub async fn build_functions( &self, - database_id: DatabaseId, - collection_name: &str, - ) -> Result; + catalog_name: &str, + ) -> Result> { + let functions = self.functions.list(catalog_name).await?; + + functions + .iter() + .map(|item| { + Self::parse_create_function_details(item) + .map(|details| SeafowlFunction { + function_id: item.id, + name: item.name.to_owned(), + details, + }) + .map_err(|e| Error::FunctionDeserializationError { + reason: e.message, + }) + }) + .collect::>>() + } + + fn parse_create_function_details( + item: &AllDatabaseFunctionsResult, + ) -> std::result::Result { + let AllDatabaseFunctionsResult { + id: _, + name: _, + entrypoint, + language, + input_types, + return_type, + data, + volatility, + } = item; - async fn create_table( + Ok(CreateFunctionDetails { + entrypoint: entrypoint.to_string(), + language: CreateFunctionLanguage::from_str(language.as_str())?, + input_types: serde_json::from_str::>( + input_types, + )?, + return_type: CreateFunctionDataType::from_str( + &return_type.as_str().to_ascii_uppercase(), + )?, + data: data.to_string(), + volatility: CreateFunctionVolatility::from_str(volatility.as_str())?, + }) + } +} + +#[async_trait] +pub trait CatalogStore: Sync + Send { + async fn create(&self, name: &str) -> Result; + + async fn get(&self, name: &str) -> Result; + + async fn delete(&self, name: &str) -> Result<()>; +} + +#[async_trait] +pub trait SchemaStore: Sync + Send { + async fn create(&self, catalog_name: &str, schema_name: &str) + -> Result; + + async fn list( + &self, + catalog_name: &str, + ) -> Result, Error>; + + async fn get( + &self, + 
catalog_name: &str, + schema_name: &str, + ) -> Result; + + async fn delete(&self, catalog_name: &str, schema_name: &str) -> Result<()>; +} + +#[async_trait] +pub trait TableStore: Sync + Send { + async fn create( &self, - collection_id: CollectionId, + catalog_name: &str, + schema_name: &str, table_name: &str, schema: &Schema, uuid: Uuid, ) -> Result<(TableId, TableVersionId)>; - async fn delete_old_table_versions(&self, table_id: TableId) -> Result; + async fn get( + &self, + catalog_name: &str, + schema_name: &str, + table_name: &str, + ) -> Result; - async fn create_new_table_version( + async fn create_new_version( &self, uuid: Uuid, version: i64, ) -> Result; - async fn get_all_table_versions( + async fn delete_old_versions( + &self, + catalog_name: &str, + schema_name: &str, + table_name: &str, + ) -> Result; + + async fn get_all_versions( &self, - database_name: &str, + catalog_name: &str, table_names: Option>, ) -> Result>; - async fn move_table( + async fn update( &self, - table_id: TableId, + old_catalog_name: &str, + old_schema_name: &str, + old_table_name: &str, + new_catalog_name: &str, + new_schema_name: &str, new_table_name: &str, - new_collection_id: Option, ) -> Result<()>; - async fn drop_table(&self, table_id: TableId) -> Result<()>; - - async fn drop_collection(&self, collection_id: CollectionId) -> Result<()>; - - async fn drop_database(&self, database_id: DatabaseId) -> Result<()>; + async fn delete( + &self, + catalog_name: &str, + schema_name: &str, + table_name: &str, + ) -> Result<()>; async fn get_dropped_tables( &self, - database_name: Option, + catalog_name: Option, ) -> Result>; async fn update_dropped_table( @@ -223,310 +393,274 @@ pub trait TableCatalog: Sync + Send { async fn delete_dropped_table(&self, uuid: Uuid) -> Result<()>; } -#[cfg_attr(test, automock)] #[async_trait] -pub trait FunctionCatalog: Sync + Send { - async fn create_function( +pub trait FunctionStore: Sync + Send { + async fn create( &self, - database_id: DatabaseId, + catalog_name: &str, function_name: &str, or_replace: bool, details: &CreateFunctionDetails, ) -> Result; - async fn get_all_functions_in_database( - &self, - database_id: DatabaseId, - ) -> Result>; + async fn list(&self, catalog_name: &str) -> Result>; - async fn drop_function( + async fn delete( &self, - database_id: DatabaseId, + catalog_name: &str, if_exists: bool, func_names: &[String], ) -> Result<()>; } -#[derive(Clone)] -pub struct DefaultCatalog { - repository: Arc, - - // DataFusion's in-memory schema provider for staging external tables - staging_schema: Arc, - object_store: Arc, -} - -impl DefaultCatalog { - pub fn new( - repository: Arc, - object_store: Arc, - ) -> Self { - let staging_schema = Arc::new(MemorySchemaProvider::new()); - Self { - repository, - staging_schema, - object_store, - } - } - - fn to_sqlx_error(error: RepositoryError) -> Error { - Error::SqlxError(match error { +impl From for Error { + fn from(err: RepositoryError) -> Error { + Error::SqlxError(match err { RepositoryError::UniqueConstraintViolation(e) => e, RepositoryError::FKConstraintViolation(e) => e, RepositoryError::SqlxError(e) => e, }) } - - fn build_table( - &self, - table_name: &str, - table_uuid: Uuid, - ) -> (Arc, Arc) { - // Build a delta table but don't load it yet; we'll do that only for tables that are - // actually referenced in a statement, via the async `table` method of the schema provider. - // TODO: this means that any `information_schema.columns` query will serially load all - // delta tables present in the database. 
The real fix for this is to make DF use `TableSource` - // for the information schema, and then implement `TableSource` for `DeltaTable` in delta-rs. - let table_log_store = self.object_store.get_log_store(table_uuid); - - let table = DeltaTable::new(table_log_store, Default::default()); - (Arc::from(table_name.to_string()), Arc::new(table) as _) - } - - fn build_collection<'a, I>( - &self, - collection_name: &str, - collection_columns: I, - ) -> (Arc, Arc) - where - I: Iterator, - { - let tables = collection_columns - .filter_map(|col| { - if let Some(table_name) = &col.table_name - && let Some(table_uuid) = col.table_uuid - { - Some(self.build_table(table_name, table_uuid)) - } else { - None - } - }) - .collect::>(); - - ( - Arc::from(collection_name.to_string()), - Arc::new(SeafowlCollection { - name: Arc::from(collection_name.to_string()), - tables: RwLock::new(tables), - }), - ) - } } #[async_trait] -impl TableCatalog for DefaultCatalog { - async fn load_database(&self, database_id: DatabaseId) -> Result { - let all_columns = self - .repository - .get_all_columns_in_database(database_id) - .await - .map_err(Self::to_sqlx_error)?; - - // NB we can't distinguish between a database without tables and a database - // that doesn't exist at all due to our query. - - // Turn the list of all collections, tables and their columns into a nested map. - - let collections: HashMap, Arc> = all_columns - .iter() - .group_by(|col| &col.collection_name) - .into_iter() - .map(|(cn, cc)| self.build_collection(cn, cc)) - .collect(); - - // TODO load the database name too - let name: Arc = Arc::from(DEFAULT_DB); - Ok(SeafowlDatabase { - name: name.clone(), - collections, - staging_schema: self.staging_schema.clone(), - system_schema: Arc::new(SystemSchemaProvider::new( - name, - Arc::new(self.clone()), - )), - }) - } - - async fn load_database_ids(&self) -> Result> { - let all_db_ids = self - .repository - .get_all_database_ids() +impl CatalogStore for RepositoryStore { + async fn create(&self, name: &str) -> Result { + self.repository + .create_database(name) .await - .map_err(Self::to_sqlx_error)?; - - Ok(HashMap::from_iter(all_db_ids)) + .map_err(|e| match e { + RepositoryError::UniqueConstraintViolation(_) => { + Error::CatalogAlreadyExists { + name: name.to_string(), + } + } + e => e.into(), + }) } - async fn create_table( - &self, - collection_id: CollectionId, - table_name: &str, - schema: &Schema, - uuid: Uuid, - ) -> Result<(TableId, TableVersionId)> { + async fn get(&self, name: &str) -> Result { self.repository - .create_table(collection_id, table_name, schema, uuid) + .get_database(name) .await .map_err(|e| match e { - RepositoryError::UniqueConstraintViolation(_) => { - Error::TableAlreadyExists { - name: table_name.to_string(), + RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { + Error::CatalogDoesNotExist { + name: name.to_string(), } } - RepositoryError::FKConstraintViolation(_) => { - Error::CollectionDoesNotExist { id: collection_id } - } - RepositoryError::SqlxError(e) => Error::SqlxError(e), + e => e.into(), }) } - async fn delete_old_table_versions(&self, table_id: TableId) -> Result { + async fn delete(&self, name: &str) -> Result<()> { + let database = CatalogStore::get(self, name).await?; + self.repository - .delete_old_table_versions(table_id) + .delete_database(database.id) .await - .map_err(Self::to_sqlx_error) + .map_err(|e| match e { + RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { + Error::CatalogDoesNotExist { + name: name.to_string(), + } + 
} + e => e.into(), + }) } +} - async fn get_collection_id_by_name( +#[async_trait] +impl SchemaStore for RepositoryStore { + async fn create( &self, - database_name: &str, - collection_name: &str, - ) -> Result> { - if collection_name == STAGING_SCHEMA { + catalog_name: &str, + schema_name: &str, + ) -> Result { + if schema_name == STAGING_SCHEMA { return Err(Error::UsedStagingSchema); } - match self - .repository - .get_collection_id_by_name(database_name, collection_name) + let database = CatalogStore::get(self, catalog_name).await?; + + self.repository + .create_collection(database.id, schema_name) .await - { - Ok(id) => Ok(Some(id)), - Err(RepositoryError::SqlxError(sqlx::error::Error::RowNotFound)) => Ok(None), - Err(e) => Err(Self::to_sqlx_error(e)), - } + .map_err(|e| match e { + RepositoryError::UniqueConstraintViolation(_) => { + Error::SchemaAlreadyExists { + name: schema_name.to_string(), + } + } + e => e.into(), + }) } - async fn get_database_id_by_name( + async fn list( &self, - database_name: &str, - ) -> Result> { - match self.repository.get_database_id_by_name(database_name).await { - Ok(id) => Ok(Some(id)), - Err(RepositoryError::SqlxError(sqlx::error::Error::RowNotFound)) => Ok(None), - Err(e) => Err(Self::to_sqlx_error(e)), - } + catalog_name: &str, + ) -> Result, Error> { + Ok(self.repository.list_collections(catalog_name).await?) } - async fn get_table_id_by_name( + async fn get( &self, - database_name: &str, - collection_name: &str, - table_name: &str, - ) -> Result> { - if collection_name == STAGING_SCHEMA { + catalog_name: &str, + schema_name: &str, + ) -> Result { + if schema_name == STAGING_SCHEMA { return Err(Error::UsedStagingSchema); } - match self - .repository - .get_table_id_by_name(database_name, collection_name, table_name) + self.repository + .get_collection(catalog_name, schema_name) .await - { - Ok(id) => Ok(Some(id)), - Err(RepositoryError::SqlxError(sqlx::error::Error::RowNotFound)) => Ok(None), - Err(e) => Err(Self::to_sqlx_error(e)), - } + .map_err(|e| match e { + RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { + Error::SchemaDoesNotExist { + name: schema_name.to_string(), + } + } + e => e.into(), + }) } - async fn create_database(&self, database_name: &str) -> Result { + async fn delete(&self, catalog_name: &str, schema_name: &str) -> Result<()> { + let schema = SchemaStore::get(self, catalog_name, schema_name).await?; + self.repository - .create_database(database_name) + .drop_collection(schema.id) .await .map_err(|e| match e { - RepositoryError::UniqueConstraintViolation(_) => { - Error::DatabaseAlreadyExists { - name: database_name.to_string(), + RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { + Error::SchemaDoesNotExist { + name: schema_name.to_string(), } } - _ => Self::to_sqlx_error(e), + e => e.into(), }) } +} - async fn create_collection( +#[async_trait] +impl TableStore for RepositoryStore { + async fn create( &self, - database_id: DatabaseId, - collection_name: &str, - ) -> Result { - if collection_name == STAGING_SCHEMA { - return Err(Error::UsedStagingSchema); - } + catalog_name: &str, + schema_name: &str, + table_name: &str, + schema: &Schema, + uuid: Uuid, + ) -> Result<(TableId, TableVersionId)> { + let collection = SchemaStore::get(self, catalog_name, schema_name).await?; self.repository - .create_collection(database_id, collection_name) + .create_table(collection.id, table_name, schema, uuid) .await .map_err(|e| match e { RepositoryError::UniqueConstraintViolation(_) => { - 
Error::CollectionAlreadyExists { - name: collection_name.to_string(), + Error::TableAlreadyExists { + name: table_name.to_string(), } } - _ => Self::to_sqlx_error(e), + RepositoryError::FKConstraintViolation(_) => Error::SchemaDoesNotExist { + name: schema_name.to_string(), + }, + RepositoryError::SqlxError(e) => Error::SqlxError(e), }) } - async fn create_new_table_version( + async fn get( + &self, + catalog_name: &str, + schema_name: &str, + table_name: &str, + ) -> Result { + self.repository + .get_table(catalog_name, schema_name, table_name) + .await + .map_err(|e| match e { + RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { + Error::TableDoesNotExist { + name: table_name.to_string(), + } + } + e => e.into(), + }) + } + + async fn create_new_version( &self, uuid: Uuid, version: i64, ) -> Result { self.repository - .create_new_table_version(uuid, version) + .create_new_version(uuid, version) .await .map_err(|e| match e { RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { Error::TableUuidDoesNotExist { uuid } } - _ => Self::to_sqlx_error(e), + e => e.into(), }) } - async fn get_all_table_versions( + async fn delete_old_versions( &self, - database_name: &str, + catalog_name: &str, + schema_name: &str, + table_name: &str, + ) -> Result { + let table = TableStore::get(self, catalog_name, schema_name, table_name).await?; + + Ok(self.repository.delete_old_versions(table.id).await?) + } + + async fn get_all_versions( + &self, + catalog_name: &str, table_names: Option>, ) -> Result> { - self.repository - .get_all_table_versions(database_name, table_names) - .await - .map_err(Self::to_sqlx_error) + Ok(self + .repository + .get_all_versions(catalog_name, table_names) + .await?) } - async fn move_table( + async fn update( &self, - table_id: TableId, + old_catalog_name: &str, + old_schema_name: &str, + old_table_name: &str, + new_catalog_name: &str, + new_schema_name: &str, new_table_name: &str, - new_collection_id: Option, ) -> Result<()> { + assert_eq!( + old_catalog_name, new_catalog_name, + "Moving across catalogs not yet supported" + ); + + let table = + TableStore::get(self, old_catalog_name, old_schema_name, old_table_name) + .await?; + let new_schema_id = if new_schema_name != old_schema_name { + let schema = + SchemaStore::get(self, old_catalog_name, new_schema_name).await?; + Some(schema.id) + } else { + None + }; + self.repository - .move_table(table_id, new_table_name, new_collection_id) + .rename_table(table.id, new_table_name, new_schema_id) .await .map_err(|e| match e { RepositoryError::FKConstraintViolation(_) => { // We only FK on collection_id, so this will be Some - Error::CollectionDoesNotExist { - id: new_collection_id.unwrap(), + Error::SchemaDoesNotExist { + name: new_schema_name.to_string(), } } RepositoryError::UniqueConstraintViolation(_) => { @@ -535,56 +669,40 @@ impl TableCatalog for DefaultCatalog { } } RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { - Error::TableDoesNotExist { id: table_id } - } - _ => Self::to_sqlx_error(e), - }) - } - - async fn drop_table(&self, table_id: TableId) -> Result<()> { - self.repository - .drop_table(table_id) - .await - .map_err(|e| match e { - RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { - Error::TableDoesNotExist { id: table_id } + Error::TableDoesNotExist { + name: old_table_name.to_string(), + } } - _ => Self::to_sqlx_error(e), + e => e.into(), }) } - async fn drop_collection(&self, collection_id: CollectionId) -> Result<()> { - self.repository - 
.drop_collection(collection_id) - .await - .map_err(|e| match e { - RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { - Error::CollectionDoesNotExist { id: collection_id } - } - _ => Self::to_sqlx_error(e), - }) - } + async fn delete( + &self, + catalog_name: &str, + schema_name: &str, + table_name: &str, + ) -> Result<()> { + let table = TableStore::get(self, catalog_name, schema_name, table_name).await?; - async fn drop_database(&self, database_id: DatabaseId) -> Result<()> { self.repository - .drop_database(database_id) + .drop_table(table.id) .await .map_err(|e| match e { RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { - Error::DatabaseDoesNotExist { id: database_id } + Error::TableDoesNotExist { + name: table_name.to_string(), + } } - _ => Self::to_sqlx_error(e), + e => e.into(), }) } async fn get_dropped_tables( &self, - database_name: Option, + catalog_name: Option, ) -> Result> { - self.repository - .get_dropped_tables(database_name) - .await - .map_err(Self::to_sqlx_error) + Ok(self.repository.get_dropped_tables(catalog_name).await?) } async fn update_dropped_table( @@ -599,7 +717,7 @@ impl TableCatalog for DefaultCatalog { RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { Error::TableUuidDoesNotExist { uuid } } - _ => Self::to_sqlx_error(e), + e => e.into(), }) } @@ -611,102 +729,62 @@ impl TableCatalog for DefaultCatalog { RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { Error::TableUuidDoesNotExist { uuid } } - _ => Self::to_sqlx_error(e), + e => e.into(), }) } } -impl DefaultCatalog { - fn parse_create_function_details( - item: &AllDatabaseFunctionsResult, - ) -> std::result::Result { - let AllDatabaseFunctionsResult { - id: _, - name: _, - entrypoint, - language, - input_types, - return_type, - data, - volatility, - } = item; - - Ok(CreateFunctionDetails { - entrypoint: entrypoint.to_string(), - language: CreateFunctionLanguage::from_str(language.as_str())?, - input_types: serde_json::from_str::>( - input_types, - )?, - return_type: CreateFunctionDataType::from_str( - &return_type.as_str().to_ascii_uppercase(), - )?, - data: data.to_string(), - volatility: CreateFunctionVolatility::from_str(volatility.as_str())?, - }) - } -} - #[async_trait] -impl FunctionCatalog for DefaultCatalog { - async fn create_function( +impl FunctionStore for RepositoryStore { + async fn create( &self, - database_id: DatabaseId, + catalog_name: &str, function_name: &str, or_replace: bool, details: &CreateFunctionDetails, ) -> Result { + let database = CatalogStore::get(self, catalog_name).await?; + self.repository - .create_function(database_id, function_name, or_replace, details) + .create_function(database.id, function_name, or_replace, details) .await .map_err(|e| match e { - RepositoryError::FKConstraintViolation(_) => { - Error::DatabaseDoesNotExist { id: database_id } - } + RepositoryError::FKConstraintViolation(_) => Error::CatalogDoesNotExist { + name: catalog_name.to_string(), + }, RepositoryError::UniqueConstraintViolation(_) => { Error::FunctionAlreadyExists { name: function_name.to_string(), } } - _ => Self::to_sqlx_error(e), + e => e.into(), }) } - async fn get_all_functions_in_database( - &self, - database_id: DatabaseId, - ) -> Result> { - let all_functions = self - .repository - .get_all_functions_in_database(database_id) - .await - .map_err(Self::to_sqlx_error)?; + async fn list(&self, catalog_name: &str) -> Result> { + let database = CatalogStore::get(self, catalog_name).await?; - all_functions - .iter() - .map(|item| { 
- Self::parse_create_function_details(item) - .map(|details| SeafowlFunction { - function_id: item.id, - name: item.name.to_owned(), - details, - }) - .map_err(|e| Error::FunctionDeserializationError { - reason: e.message, - }) - }) - .collect::>>() + Ok(self + .repository + .get_all_functions_in_database(database.id) + .await?) } - async fn drop_function( + async fn delete( &self, - database_id: DatabaseId, + catalog_name: &str, if_exists: bool, + func_names: &[String], ) -> Result<()> { - match self.repository.drop_function(database_id, func_names).await { + let database = CatalogStore::get(self, catalog_name).await?; + + match self.repository.drop_function(database.id, func_names).await { Ok(id) => Ok(id), Err(RepositoryError::FKConstraintViolation(_)) => { - Err(Error::DatabaseDoesNotExist { id: database_id }) + Err(Error::CatalogDoesNotExist { + name: catalog_name.to_string(), + }) } Err(RepositoryError::SqlxError(sqlx::error::Error::RowNotFound)) => { if if_exists { @@ -717,7 +795,7 @@ impl FunctionCatalog for DefaultCatalog { }) } } - Err(e) => Err(Self::to_sqlx_error(e)), + Err(e) => Err(e.into()), } } } diff --git a/src/config/context.rs b/src/config/context.rs index e2942fc0..e43bde77 100644 --- a/src/config/context.rs +++ b/src/config/context.rs @@ -1,9 +1,7 @@ use std::sync::Arc; use crate::{ - catalog::{ - DefaultCatalog, FunctionCatalog, TableCatalog, DEFAULT_DB, DEFAULT_SCHEMA, - }, + catalog::{DEFAULT_DB, DEFAULT_SCHEMA}, context::SeafowlContext, repository::{interface::Repository, sqlite::SqliteRepository}, }; @@ -19,6 +17,7 @@ use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore}; #[cfg(feature = "catalog-postgres")] use crate::repository::postgres::PostgresRepository; +use crate::catalog::{Error, Metastore}; use crate::object_store::http::add_http_object_store; use crate::object_store::wrapped::InternalObjectStore; #[cfg(feature = "remote-tables")] @@ -26,14 +25,13 @@ use datafusion_remote_tables::factory::RemoteTableFactory; #[cfg(feature = "object-store-s3")] use object_store::aws::AmazonS3Builder; use object_store::gcp::GoogleCloudStorageBuilder; -use parking_lot::lock_api::RwLock; use super::schema::{self, GCS, MEBIBYTES, MEMORY_FRACTION, S3}; -async fn build_catalog( +async fn build_metastore( config: &schema::SeafowlConfig, object_store: Arc, -) -> (Arc, Arc) { +) -> Metastore { // Initialize the repository let repository: Arc = match &config.catalog { #[cfg(feature = "catalog-postgres")] @@ -62,9 +60,7 @@ async fn build_catalog( ), }; - let catalog = Arc::new(DefaultCatalog::new(repository, object_store)); - - (catalog.clone(), catalog) + Metastore::new_from_repository(repository, object_store) } pub fn build_object_store( @@ -179,23 +175,20 @@ pub async fn build_context(cfg: &schema::SeafowlConfig) -> Result id, - None => tables.create_database(DEFAULT_DB).await.unwrap(), - }; - - match tables - .get_collection_id_by_name(DEFAULT_DB, DEFAULT_SCHEMA) - .await? + if let Err(Error::CatalogDoesNotExist { .. }) = + metastore.catalogs.get(DEFAULT_DB).await { - Some(id) => id, - None => tables.create_collection(default_db, DEFAULT_SCHEMA).await?, - }; + metastore.catalogs.create(DEFAULT_DB).await.unwrap(); + } - let all_database_ids = tables.load_database_ids().await?; + if let Err(Error::SchemaDoesNotExist { .. }) = + metastore.schemas.get(DEFAULT_DB, DEFAULT_SCHEMA).await + { + metastore.schemas.create(DEFAULT_DB, DEFAULT_SCHEMA).await?; + } // Convergence doesn't support connecting to different DB names. 
We are supposed // to do one context per query (as we need to load the schema before executing every @@ -205,12 +198,9 @@ pub async fn build_context(cfg: &schema::SeafowlConfig) -> Result, - pub function_catalog: Arc, + pub metastore: Arc, pub internal_object_store: Arc, pub database: String, - pub database_id: DatabaseId, - pub all_database_ids: Arc>>, pub max_partition_size: u32, } impl SeafowlContext { // Create a new `SeafowlContext` with a new inner context scoped to a different default DB - pub async fn scope_to_database(&self, name: String) -> Result> { - let maybe_database_id = self.all_database_ids.read().get(name.as_str()).cloned(); - let database_id = match maybe_database_id { - Some(db_id) => db_id, - None => { - // Perhaps the db was created on another node; try to reload from catalog - let new_db_ids = self.table_catalog.load_database_ids().await?; - new_db_ids - .get(name.as_str()) - .cloned() - .map(|db_id| { - self.all_database_ids.write().insert(name.clone(), db_id); - db_id - }) - .ok_or_else(|| { - DataFusionError::Plan(format!( - "Unknown database {name}; try creating one with CREATE DATABASE first" - )) - })? - } - }; - - // Swap the default database in the new internal context's session config + pub fn scope_to_database(&self, name: String) -> Arc { + // Swap the default catalog in the new internal context's session config let session_config = self .inner() .copied_config() @@ -69,16 +40,13 @@ impl SeafowlContext { let state = build_state_with_table_factories(session_config, self.inner().runtime_env()); - Ok(Arc::from(SeafowlContext { + Arc::from(SeafowlContext { inner: SessionContext::new_with_state(state), - table_catalog: self.table_catalog.clone(), - function_catalog: self.function_catalog.clone(), + metastore: self.metastore.clone(), internal_object_store: self.internal_object_store.clone(), database: name, - database_id, - all_database_ids: self.all_database_ids.clone(), max_partition_size: self.max_partition_size, - })) + }) } pub fn inner(&self) -> &SessionContext { @@ -100,12 +68,12 @@ impl SeafowlContext { self.inner.register_catalog( &self.database, - Arc::new(self.table_catalog.load_database(self.database_id).await?), + Arc::new(self.metastore.build_catalog(&self.database).await?), ); // Register all functions in the database - self.function_catalog - .get_all_functions_in_database(self.database_id) + self.metastore + .build_functions(&self.database) .await? 
.iter() .try_for_each(|f| self.register_function(&f.name, &f.details)) @@ -251,16 +219,9 @@ pub mod test_utils { // Create new non-default database; we're doing this in catalog only to simulate it taking // place on another node - context - .table_catalog - .create_database("testdb") - .await - .unwrap(); + context.metastore.catalogs.create("testdb").await.unwrap(); - let context = context - .scope_to_database("testdb".to_string()) - .await - .expect("'testdb' should exist"); + let context = context.scope_to_database("testdb".to_string()); // Create new non-default collection context.plan_query("CREATE SCHEMA testcol").await.unwrap(); diff --git a/src/context/physical.rs b/src/context/physical.rs index 96fe633e..5ff5bfb6 100644 --- a/src/context/physical.rs +++ b/src/context/physical.rs @@ -172,8 +172,9 @@ impl SeafowlContext { )) => { // CREATE SCHEMA // Create a schema and register it - self.table_catalog - .create_collection(self.database_id, schema_name) + self.metastore + .schemas + .create(&self.database, schema_name) .await?; Ok(make_dummy_exec()) } @@ -182,12 +183,7 @@ impl SeafowlContext { if_not_exists, .. })) => { - if self - .table_catalog - .get_database_id_by_name(catalog_name) - .await? - .is_some() - { + if self.metastore.catalogs.get(catalog_name).await.is_ok() { if !*if_not_exists { return Err(DataFusionError::Plan(format!( "Database {catalog_name} already exists" @@ -198,19 +194,14 @@ impl SeafowlContext { } // Persist DB into metadata catalog - let database_id = - self.table_catalog.create_database(catalog_name).await?; + self.metastore.catalogs.create(catalog_name).await?; // Create the corresponding default schema as well - self.table_catalog - .create_collection(database_id, DEFAULT_SCHEMA) + self.metastore + .schemas + .create(catalog_name, DEFAULT_SCHEMA) .await?; - // Update the shared in-memory map of DB names -> ids - self.all_database_ids - .write() - .insert(catalog_name.clone(), database_id); - Ok(make_dummy_exec()) } LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable { @@ -382,8 +373,9 @@ impl SeafowlContext { None, ) .await?; - self.table_catalog - .create_new_table_version(uuid, version) + self.metastore + .tables + .create_new_version(uuid, version) .await?; Ok(make_dummy_exec()) @@ -507,8 +499,9 @@ impl SeafowlContext { None, ) .await?; - self.table_catalog - .create_new_table_version(uuid, version) + self.metastore + .tables + .create_new_version(uuid, version) .await?; Ok(make_dummy_exec()) @@ -527,19 +520,14 @@ impl SeafowlContext { return Ok(make_dummy_exec()); } - let table_id = self - .table_catalog - .get_table_id_by_name( + self.metastore + .tables + .delete( &resolved_ref.catalog, &resolved_ref.schema, &resolved_ref.table, ) - .await? 
- .ok_or_else(|| { - DataFusionError::Execution(format!("Table {name} not found")) - })?; - - self.table_catalog.drop_table(table_id).await?; + .await?; Ok(make_dummy_exec()) } LogicalPlan::Ddl(DdlStatement::CreateView(_)) => Err(Error::Plan( @@ -586,13 +574,9 @@ impl SeafowlContext { self.register_function(name, details)?; // Persist the function in the metadata storage - self.function_catalog - .create_function( - self.database_id, - name, - *or_replace, - details, - ) + self.metastore + .functions + .create(&self.database, name, *or_replace, details) .await?; Ok(make_dummy_exec()) @@ -602,8 +586,9 @@ impl SeafowlContext { func_names, output_schema: _, }) => { - self.function_catalog - .drop_function(self.database_id, *if_exists, func_names) + self.metastore + .functions + .delete(&self.database, *if_exists, func_names) .await?; Ok(make_dummy_exec()) } @@ -623,65 +608,27 @@ impl SeafowlContext { )); } - // Resolve old table reference and fetch the table id + // Resolve old table reference let old_table_ref = TableReference::from(old_name.as_str()); let resolved_old_ref = old_table_ref.resolve(&self.database, DEFAULT_SCHEMA); - let table_id = self - .table_catalog - .get_table_id_by_name( + // Finally update our catalog entry + self.metastore + .tables + .update( &resolved_old_ref.catalog, &resolved_old_ref.schema, &resolved_old_ref.table, - ) - .await? - .ok_or_else(|| { - DataFusionError::Execution(format!( - "Table {old_name} not found" - )) - })?; - - // If the old and new table schema is different check that the - // corresponding collection already exists - let new_schema_id = - if resolved_new_ref.schema != resolved_old_ref.schema { - let collection_id = self - .table_catalog - .get_collection_id_by_name( - &self.database, - &resolved_new_ref.schema, - ) - .await? - .ok_or_else(|| { - Error::Plan(format!( - "Schema \"{}\" does not exist!", - &resolved_new_ref.schema, - )) - })?; - Some(collection_id) - } else { - None - }; - - // Finally update our catalog entry - self.table_catalog - .move_table( - table_id, + &resolved_new_ref.catalog, + &resolved_new_ref.schema, &resolved_new_ref.table, - new_schema_id, ) .await?; Ok(make_dummy_exec()) } SeafowlExtensionNode::DropSchema(DropSchema { name, .. }) => { - if let Some(collection_id) = self - .table_catalog - .get_collection_id_by_name(&self.database, name) - .await? - { - self.table_catalog.drop_collection(collection_id).await? - }; + self.metastore.schemas.delete(&self.database, name).await?; Ok(make_dummy_exec()) } @@ -697,22 +644,8 @@ impl SeafowlContext { let resolved_ref = table_ref.resolve(&self.database, DEFAULT_SCHEMA); - let table_id = self - .table_catalog - .get_table_id_by_name( - &resolved_ref.catalog, - &resolved_ref.schema, - &resolved_ref.table, - ) - .await? - .ok_or_else(|| { - DataFusionError::Execution( - "Table {table_name} not found".to_string(), - ) - })?; - if let Ok(mut delta_table) = - self.try_get_delta_table(resolved_ref).await + self.try_get_delta_table(resolved_ref.clone()).await { // TODO: The Delta protocol doesn't vacuum old table versions per se, but only files no longer tied to the latest table version. // This means that the VACUUM could be a no-op, for instance, in the case when append-only writes have been performed. 
@@ -736,8 +669,13 @@ impl SeafowlContext { } match self - .table_catalog - .delete_old_table_versions(table_id) + .metastore + .tables + .delete_old_versions( + &resolved_ref.catalog, + &resolved_ref.schema, + &resolved_ref.table, + ) .await { Ok(row_count) => { @@ -846,8 +784,9 @@ impl SeafowlContext { } None => { // Schema doesn't exist; create one first, and then reload to pick it up - self.table_catalog - .create_collection(self.database_id, &schema_name) + self.metastore + .schemas + .create(&self.database, &schema_name) .await?; self.reload_schema().await?; false diff --git a/src/data_types.rs b/src/data_types.rs deleted file mode 100644 index bb54ab7f..00000000 --- a/src/data_types.rs +++ /dev/null @@ -1,44 +0,0 @@ -pub type DatabaseId = i64; -pub type CollectionId = i64; -pub type TableId = i64; -pub type TableVersionId = i64; -pub type Timestamp = i64; -pub type TableColumnId = i64; -pub type FunctionId = i64; - -// TODO: most of these structs currently aren't used (we use versions -// without IDs since they can be passed to db-writing routines before -// we know the ID) - -pub struct Database { - pub id: DatabaseId, - pub name: String, -} - -pub struct Collection { - pub id: CollectionId, - pub database_id: DatabaseId, - pub name: String, -} - -pub struct Table { - pub id: TableId, - pub collection_id: CollectionId, - pub name: String, -} - -pub struct TableVersion { - pub id: TableVersionId, - pub table_id: TableId, - pub creation_time: Timestamp, - // TODO is_deleted flag? -} - -pub struct TableColumn { - pub id: TableColumnId, - pub table_version_id: TableVersionId, - pub name: String, - // TODO enum? - pub r#type: String, - // TODO ordinal? -} diff --git a/src/frontend/http.rs b/src/frontend/http.rs index cd1564af..65939903 100644 --- a/src/frontend/http.rs +++ b/src/frontend/http.rs @@ -160,7 +160,7 @@ pub async fn uncached_read_write_query( // If a specific DB name was used as a parameter in the route, scope the context to it, // effectively making it the default DB for the duration of the session. if database_name != context.database { - context = context.scope_to_database(database_name).await?; + context = context.scope_to_database(database_name); } let statements = context.parse_query(&query).await?; @@ -325,7 +325,7 @@ pub async fn cached_read_query( // If a specific DB name was used as a parameter in the route, scope the context to it, // effectively making it the default DB for the duration of the session. 
if database_name != context.database { - context = context.scope_to_database(database_name).await?; + context = context.scope_to_database(database_name); } // Plan the query @@ -383,7 +383,7 @@ pub async fn upload( }; if database_name != context.database { - context = context.scope_to_database(database_name.clone()).await?; + context = context.scope_to_database(database_name.clone()); } let mut has_header = true; @@ -661,10 +661,7 @@ pub mod tests { .await .unwrap(); - context = context - .scope_to_database(db_name.to_string()) - .await - .unwrap(); + context = context.scope_to_database(db_name.to_string()); } context @@ -679,10 +676,7 @@ pub mod tests { if new_db.is_some() { // Re-scope to the original DB - return context - .scope_to_database(DEFAULT_DB.to_string()) - .await - .unwrap(); + return context.scope_to_database(DEFAULT_DB.to_string()); } context @@ -694,10 +688,7 @@ pub mod tests { let mut context = in_memory_context_with_single_table(new_db).await; if let Some(db_name) = new_db { - context = context - .scope_to_database(db_name.to_string()) - .await - .unwrap(); + context = context.scope_to_database(db_name.to_string()); } context @@ -707,10 +698,7 @@ pub mod tests { if new_db.is_some() { // Re-scope to the original DB - return context - .scope_to_database(DEFAULT_DB.to_string()) - .await - .unwrap(); + return context.scope_to_database(DEFAULT_DB.to_string()); } context @@ -1023,7 +1011,7 @@ pub mod tests { assert_eq!(resp.status(), StatusCode::BAD_REQUEST); assert_eq!( resp.body(), - "Error during planning: Unknown database missing_db; try creating one with CREATE DATABASE first" + "Error during planning: Database \"missing_db\" doesn't exist" ); } diff --git a/src/lib.rs b/src/lib.rs index 7123f8f1..1b8c4544 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,6 @@ pub mod catalog; pub mod cli; pub mod config; pub mod context; -pub mod data_types; pub mod datafusion; pub mod delta_rs; pub mod frontend; diff --git a/src/provider.rs b/src/provider.rs index c3f0c724..abe61f2a 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -21,13 +21,13 @@ use deltalake::DeltaTable; use log::warn; use parking_lot::RwLock; -use crate::data_types::FunctionId; +use crate::repository::interface::FunctionId; use crate::system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA}; use crate::{catalog::STAGING_SCHEMA, wasm_udf::data_types::CreateFunctionDetails}; pub struct SeafowlDatabase { pub name: Arc, - pub collections: HashMap, Arc>, + pub schemas: HashMap, Arc>, pub staging_schema: Arc, pub system_schema: Arc, } @@ -38,7 +38,7 @@ impl CatalogProvider for SeafowlDatabase { } fn schema_names(&self) -> Vec { - self.collections + self.schemas .keys() .map(|s| s.to_string()) .chain([STAGING_SCHEMA.to_string(), SYSTEM_SCHEMA.to_string()]) @@ -51,19 +51,19 @@ impl CatalogProvider for SeafowlDatabase { } else if name == SYSTEM_SCHEMA { Some(self.system_schema.clone()) } else { - self.collections.get(name).map(|c| Arc::clone(c) as _) + self.schemas.get(name).map(|c| Arc::clone(c) as _) } } } -pub struct SeafowlCollection { +pub struct SeafowlSchema { pub name: Arc, // TODO: consider using DashMap instead of RwLock>: https://github.com/xacrimon/conc-map-bench pub tables: RwLock, Arc>>, } #[async_trait] -impl SchemaProvider for SeafowlCollection { +impl SchemaProvider for SeafowlSchema { fn as_any(&self) -> &dyn Any { self } diff --git a/src/repository/default.rs b/src/repository/default.rs index 794b0f6a..439848ba 100644 --- a/src/repository/default.rs +++ b/src/repository/default.rs @@ -65,21 +65,9 @@ impl 
Repository for $repo { .expect("error running migrations"); } - async fn get_collections_in_database( + async fn list_collections( &self, - database_id: DatabaseId, - ) -> Result, Error> { - let names = sqlx::query("SELECT name FROM collection WHERE database_id = $1") - .bind(database_id) - .fetch(&self.executor) - .map_ok(|row| row.get("name")) - .try_collect() - .await.map_err($repo::interpret_error)?; - Ok(names) - } - async fn get_all_columns_in_database( - &self, - database_id: DatabaseId, + database_name: &str, ) -> Result, Error> { let mut builder: QueryBuilder<_> = QueryBuilder::new($repo::QUERIES.latest_table_versions); @@ -98,8 +86,8 @@ impl Repository for $repo { LEFT JOIN "table" ON collection.id = "table".collection_id LEFT JOIN desired_table_versions ON "table".id = desired_table_versions.table_id LEFT JOIN table_column ON table_column.table_version_id = desired_table_versions.id - WHERE database.id = "#); - builder.push_bind(database_id); + WHERE database.name = "#); + builder.push_bind(database_name); builder.push(r#" ORDER BY collection_name, table_name, table_version_id, column_name @@ -125,14 +113,26 @@ impl Repository for $repo { Ok(id) } - async fn get_collection_id_by_name( + async fn get_database( + &self, + name: &str, + ) -> Result { + let database = sqlx::query_as(r#"SELECT id, name FROM database WHERE database.name = $1"#) + .bind(name) + .fetch_one(&self.executor) + .await.map_err($repo::interpret_error)?; + + Ok(database) + } + + async fn get_collection( &self, database_name: &str, collection_name: &str, - ) -> Result { - let id = sqlx::query( + ) -> Result { + let collection = sqlx::query_as( r#" - SELECT collection.id + SELECT collection.id, database.id AS database_id, collection.name FROM collection JOIN database ON collection.database_id = database.id WHERE database.name = $1 AND collection.name = $2 "#, @@ -140,34 +140,20 @@ impl Repository for $repo { .bind(database_name) .bind(collection_name) .fetch_one(&self.executor) - .await.map_err($repo::interpret_error)? - .try_get("id").map_err($repo::interpret_error)?; - - Ok(id) - } - - async fn get_database_id_by_name( - &self, - database_name: &str, - ) -> Result { - let id = sqlx::query(r#"SELECT id FROM database WHERE database.name = $1"#) - .bind(database_name) - .fetch_one(&self.executor) - .await.map_err($repo::interpret_error)? - .try_get("id").map_err($repo::interpret_error)?; + .await.map_err($repo::interpret_error)?; - Ok(id) + Ok(collection) } - async fn get_table_id_by_name( + async fn get_table( &self, database_name: &str, collection_name: &str, table_name: &str, - ) -> Result { - let id = sqlx::query( + ) -> Result { + let table = sqlx::query_as( r#" - SELECT "table".id + SELECT "table".id, collection.id as collection_id, "table".name FROM "table" JOIN collection ON "table".collection_id = collection.id JOIN database ON collection.database_id = database.id @@ -178,21 +164,9 @@ impl Repository for $repo { .bind(collection_name) .bind(table_name) .fetch_one(&self.executor) - .await.map_err($repo::interpret_error)? - .try_get("id").map_err($repo::interpret_error)?; - - Ok(id) - } - - async fn get_all_database_ids(&self) -> Result> { - let all_db_ids = sqlx::query(r#"SELECT name, id FROM database"#) - .fetch_all(&self.executor) - .await.map_err($repo::interpret_error)? 
- .iter() - .map(|row| (row.get("name"), row.get("id"))) - .collect(); + .await.map_err($repo::interpret_error)?; - Ok(all_db_ids) + Ok(table) } async fn create_collection( @@ -261,7 +235,7 @@ impl Repository for $repo { Ok((new_table_id, new_version_id)) } - async fn delete_old_table_versions( + async fn delete_old_versions( &self, table_id: TableId, ) -> Result { @@ -277,7 +251,7 @@ impl Repository for $repo { Ok(delete_result.rows_affected()) } - async fn create_new_table_version( + async fn create_new_version( &self, uuid: Uuid, version: i64, @@ -315,7 +289,7 @@ impl Repository for $repo { Ok(new_version_id) } - async fn get_all_table_versions( + async fn get_all_versions( &self, database_name: &str, table_names: Option>, @@ -361,7 +335,7 @@ impl Repository for $repo { Ok(table_versions) } - async fn move_table( + async fn rename_table( &self, table_id: TableId, new_table_name: &str, @@ -436,6 +410,7 @@ impl Repository for $repo { data, volatility FROM function + WHERE database_id = $1; "#) .bind(database_id) @@ -497,7 +472,7 @@ impl Repository for $repo { Ok(()) } - async fn drop_database(&self, database_id: DatabaseId) -> Result<(), Error> { + async fn delete_database(&self, database_id: DatabaseId) -> Result<(), Error> { self.insert_dropped_tables(None, None, Some(database_id)).await?; sqlx::query("DELETE FROM database WHERE id = $1 RETURNING id") diff --git a/src/repository/interface.rs b/src/repository/interface.rs index ce846402..d6f02bf3 100644 --- a/src/repository/interface.rs +++ b/src/repository/interface.rs @@ -7,11 +7,35 @@ use strum::ParseError; use strum_macros::{Display, EnumString}; use uuid::Uuid; -use crate::data_types::{ - CollectionId, DatabaseId, FunctionId, TableId, TableVersionId, Timestamp, -}; use crate::wasm_udf::data_types::CreateFunctionDetails; +pub type DatabaseId = i64; +pub type CollectionId = i64; +pub type TableId = i64; +pub type TableVersionId = i64; +pub type Timestamp = i64; +pub type FunctionId = i64; + +#[derive(sqlx::FromRow, Default, Debug, PartialEq, Eq)] +pub struct DatabaseRecord { + pub id: DatabaseId, + pub name: String, +} + +#[derive(sqlx::FromRow, Default, Debug, PartialEq, Eq)] +pub struct CollectionRecord { + pub id: CollectionId, + pub database_id: DatabaseId, + pub name: String, +} + +#[derive(sqlx::FromRow, Default, Debug, PartialEq, Eq)] +pub struct TableRecord { + pub id: TableId, + pub collection_id: CollectionId, + pub name: String, +} + #[derive(sqlx::FromRow, Default, Debug, PartialEq, Eq)] pub struct AllDatabaseColumnsResult { pub database_name: String, @@ -89,35 +113,25 @@ pub type Result = std::result::Result; pub trait Repository: Send + Sync + Debug { async fn setup(&self); - async fn get_collections_in_database( - &self, - database_id: DatabaseId, - ) -> Result, Error>; - - async fn get_all_columns_in_database( + async fn list_collections( &self, - database_id: DatabaseId, + database_name: &str, ) -> Result, Error>; - async fn get_collection_id_by_name( - &self, - database_name: &str, - collection_name: &str, - ) -> Result; + async fn get_database(&self, name: &str) -> Result; - async fn get_database_id_by_name( + async fn get_collection( &self, database_name: &str, - ) -> Result; + collection_name: &str, + ) -> Result; - async fn get_table_id_by_name( + async fn get_table( &self, database_name: &str, collection_name: &str, table_name: &str, - ) -> Result; - - async fn get_all_database_ids(&self) -> Result, Error>; + ) -> Result; async fn create_database(&self, database_name: &str) -> Result; @@ -135,21 +149,21 @@ pub 
trait Repository: Send + Sync + Debug { uuid: Uuid, ) -> Result<(TableId, TableVersionId), Error>; - async fn delete_old_table_versions(&self, table_id: TableId) -> Result; + async fn delete_old_versions(&self, table_id: TableId) -> Result; - async fn create_new_table_version( + async fn create_new_version( &self, uuid: Uuid, version: i64, ) -> Result; - async fn get_all_table_versions( + async fn get_all_versions( &self, database_name: &str, table_names: Option>, ) -> Result>; - async fn move_table( + async fn rename_table( &self, table_id: TableId, new_table_name: &str, @@ -179,7 +193,7 @@ pub trait Repository: Send + Sync + Debug { async fn drop_collection(&self, collection_id: CollectionId) -> Result<(), Error>; - async fn drop_database(&self, database_id: DatabaseId) -> Result<(), Error>; + async fn delete_database(&self, database_id: DatabaseId) -> Result<(), Error>; async fn insert_dropped_tables( &self, @@ -217,11 +231,13 @@ pub mod tests { use super::*; + static TEST_DB: &str = "testdb"; + async fn make_database_with_single_table( repository: Arc, ) -> (DatabaseId, CollectionId, TableId, TableVersionId) { let database_id = repository - .create_database("testdb") + .create_database(TEST_DB) .await .expect("Error creating database"); @@ -259,7 +275,7 @@ pub mod tests { } pub async fn run_generic_repository_tests(repository: Arc) { - test_get_collections_empty(repository.clone()).await; + test_get_tables_empty(repository.clone()).await; let (database_id, table_id, table_version_id) = test_create_database_collection_table(repository.clone()).await; test_create_functions(repository.clone(), database_id).await; @@ -273,13 +289,13 @@ pub mod tests { test_error_propagation(repository, table_id).await; } - async fn test_get_collections_empty(repository: Arc) { + async fn test_get_tables_empty(repository: Arc) { assert_eq!( repository - .get_collections_in_database(0) + .list_collections(TEST_DB) .await .expect("error getting collections"), - Vec::::new() + Vec::::new() ); } @@ -330,17 +346,10 @@ pub mod tests { let (database_id, _, table_id, table_version_id) = make_database_with_single_table(repository.clone()).await; - let all_database_ids = repository - .get_all_database_ids() - .await - .expect("Error getting all database ids"); - - assert_eq!(all_database_ids, vec![("testdb".to_string(), database_id)]); - // Test loading all columns let all_columns = repository - .get_all_columns_in_database(database_id) + .list_collections(TEST_DB) .await .expect("Error getting all columns"); @@ -356,13 +365,13 @@ pub mod tests { // Duplicate the table let new_version_id = repository - .create_new_table_version(Uuid::default(), 1) + .create_new_version(Uuid::default(), 1) .await .unwrap(); // Test all columns again: we should have the schema for the latest table version let all_columns = repository - .get_all_columns_in_database(database_id) + .list_collections(TEST_DB) .await .expect("Error getting all columns"); @@ -378,7 +387,7 @@ pub mod tests { // Check the existing table versions let all_table_versions: Vec = repository - .get_all_table_versions("testdb", Some(vec!["testtable".to_string()])) + .get_all_versions("testdb", Some(vec!["testtable".to_string()])) .await .expect("Error getting all columns") .iter() @@ -484,12 +493,12 @@ pub mod tests { ) { // Rename the table to something else repository - .move_table(table_id, "testtable2", None) + .rename_table(table_id, "testtable2", None) .await .unwrap(); let all_columns = repository - .get_all_columns_in_database(database_id) + 
             .await
             .expect("Error getting all columns");
 
@@ -509,12 +518,12 @@
             .await
             .unwrap();
         repository
-            .move_table(table_id, "testtable2", Some(collection_id))
+            .rename_table(table_id, "testtable2", Some(collection_id))
             .await
             .unwrap();
 
         let mut all_columns = repository
-            .get_all_columns_in_database(database_id)
+            .list_collections(TEST_DB)
             .await
             .expect("Error getting all columns");
         all_columns.sort_by_key(|c| c.collection_name.clone());
@@ -540,7 +549,7 @@
         // Nonexistent table ID
         assert!(matches!(
             repository
-                .move_table(-1, "doesntmatter", None)
+                .rename_table(-1, "doesntmatter", None)
                 .await
                 .unwrap_err(),
             Error::SqlxError(sqlx::Error::RowNotFound)
@@ -549,26 +558,26 @@
         // Existing table ID, moved to a nonexistent collection (FK violation)
         assert!(matches!(
             repository
-                .move_table(table_id, "doesntmatter", Some(-1))
+                .rename_table(table_id, "doesntmatter", Some(-1))
                 .await
                 .unwrap_err(),
             Error::FKConstraintViolation(_)
         ));
 
         // Make a new table in the existing collection with the same name
-        let collection_id_1 = repository
-            .get_collection_id_by_name("testdb", "testcol")
+        let collection_1 = repository
+            .get_collection("testdb", "testcol")
             .await
             .unwrap();
-        let collection_id_2 = repository
-            .get_collection_id_by_name("testdb", "testcol2")
+        let collection_2 = repository
+            .get_collection("testdb", "testcol2")
             .await
             .unwrap();
 
         assert!(matches!(
             repository
                 .create_table(
-                    collection_id_2,
+                    collection_2.id,
                     "testtable2",
                     &ArrowSchema::empty(),
                     Uuid::default()
@@ -581,7 +590,7 @@
         // Make a new table in the previous collection, try renaming
         let (new_table_id, _) = repository
             .create_table(
-                collection_id_1,
+                collection_1.id,
                 "testtable2",
                 &ArrowSchema::empty(),
                 Uuid::default(),
@@ -591,7 +600,7 @@
 
         assert!(matches!(
             repository
-                .move_table(new_table_id, "testtable2", Some(collection_id_2))
+                .rename_table(new_table_id, "testtable2", Some(collection_2.id))
                 .await
                 .unwrap_err(),
             Error::UniqueConstraintViolation(_)
diff --git a/src/repository/postgres.rs b/src/repository/postgres.rs
index 3dede0d0..26d73c90 100644
--- a/src/repository/postgres.rs
+++ b/src/repository/postgres.rs
@@ -11,17 +11,15 @@ use sqlx::{
 };
 use uuid::Uuid;
 
-use crate::{
-    data_types::{CollectionId, DatabaseId, FunctionId, TableId, TableVersionId},
-    implement_repository,
-    wasm_udf::data_types::CreateFunctionDetails,
-};
+use crate::{implement_repository, wasm_udf::data_types::CreateFunctionDetails};
 
 use super::{
     default::RepositoryQueries,
     interface::{
-        AllDatabaseColumnsResult, AllDatabaseFunctionsResult, DroppedTableDeletionStatus,
-        DroppedTablesResult, Error, Repository, Result, TableVersionsResult,
+        AllDatabaseColumnsResult, AllDatabaseFunctionsResult, CollectionId,
+        CollectionRecord, DatabaseId, DatabaseRecord, DroppedTableDeletionStatus,
+        DroppedTablesResult, Error, FunctionId, Repository, Result, TableId, TableRecord,
+        TableVersionId, TableVersionsResult,
     },
 };
 
diff --git a/src/repository/sqlite.rs b/src/repository/sqlite.rs
index 8645d223..13ea413d 100644
--- a/src/repository/sqlite.rs
+++ b/src/repository/sqlite.rs
@@ -12,18 +12,17 @@ use sqlx::{
 };
 use uuid::Uuid;
 
-use crate::{
-    data_types::{CollectionId, DatabaseId, FunctionId, TableId, TableVersionId},
-    wasm_udf::data_types::CreateFunctionDetails,
-};
+use crate::wasm_udf::data_types::CreateFunctionDetails;
 
 use crate::implement_repository;
 
 use super::{
     default::RepositoryQueries,
     interface::{
-        AllDatabaseColumnsResult, AllDatabaseFunctionsResult, DroppedTableDeletionStatus,
-        DroppedTablesResult, Error, Repository, Result, TableVersionsResult,
+        AllDatabaseColumnsResult, AllDatabaseFunctionsResult, CollectionId,
+        CollectionRecord, DatabaseId, DatabaseRecord, DroppedTableDeletionStatus,
+        DroppedTablesResult, Error, FunctionId, Repository, Result, TableId, TableRecord,
+        TableVersionId, TableVersionsResult,
     },
 };
 
@@ -155,10 +154,7 @@ mod tests {
             .unwrap();
 
         assert_eq!(
-            ro_repository
-                .get_database_id_by_name("testdb")
-                .await
-                .unwrap(),
+            ro_repository.get_database("testdb").await.unwrap().id,
             db_id
         );
     }
diff --git a/src/system_tables.rs b/src/system_tables.rs
index 6e2ceeb2..c3e36716 100644
--- a/src/system_tables.rs
+++ b/src/system_tables.rs
@@ -1,7 +1,7 @@
 //! Mechanism for creating virtual Seafowl system tables, inspired by influxdb_iox system tables
 //! and datafusion's information_schema.
 
-use crate::catalog::TableCatalog;
+use crate::catalog::TableStore;
 use crate::repository::interface::DroppedTablesResult;
 use arrow::array::{Int64Builder, StringBuilder, StructBuilder, TimestampSecondBuilder};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
@@ -24,11 +24,11 @@ const DROPPED_TABLES: &str = "dropped_tables";
 
 pub struct SystemSchemaProvider {
     database: Arc<str>,
-    table_catalog: Arc<dyn TableCatalog>,
+    table_catalog: Arc<dyn TableStore>,
 }
 
 impl SystemSchemaProvider {
-    pub fn new(database: Arc<str>, table_catalog: Arc<dyn TableCatalog>) -> Self {
+    pub fn new(database: Arc<str>, table_catalog: Arc<dyn TableStore>) -> Self {
         Self {
             database,
             table_catalog,
@@ -131,13 +131,13 @@ where
 struct TableVersionsTable {
     database: Arc<str>,
     schema: SchemaRef,
-    table_catalog: Arc<dyn TableCatalog>,
+    table_catalog: Arc<dyn TableStore>,
 }
 
 impl TableVersionsTable {
-    fn new(database: Arc<str>, table_catalog: Arc<dyn TableCatalog>) -> Self {
+    fn new(database: Arc<str>, table_catalog: Arc<dyn TableStore>) -> Self {
         Self {
-            // This is dictated by the output of `get_all_table_versions`, except that we omit the
+            // This is dictated by the output of `get_all_versions`, except that we omit the
             // database_name field, since we scope down to the database at hand.
             database,
             schema: Arc::new(Schema::new(vec![
@@ -165,7 +165,7 @@ impl SeafowlSystemTable for TableVersionsTable {
     async fn load_record_batch(&self) -> Result<RecordBatch> {
         let table_versions = self
             .table_catalog
-            .get_all_table_versions(&self.database, None)
+            .get_all_versions(&self.database, None)
             .await?;
 
         let mut builder = StructBuilder::from_fields(
@@ -210,11 +210,11 @@ impl SeafowlSystemTable for TableVersionsTable {
 struct DroppedTablesTable {
     database: Arc<str>,
     schema: SchemaRef,
-    table_catalog: Arc<dyn TableCatalog>,
+    table_catalog: Arc<dyn TableStore>,
 }
 
 impl DroppedTablesTable {
-    fn new(database: Arc<str>, table_catalog: Arc<dyn TableCatalog>) -> Self {
+    fn new(database: Arc<str>, table_catalog: Arc<dyn TableStore>) -> Self {
         Self {
             // This is dictated by the output of `get_dropped_tables`, except that we omit the
             // database_name field, since we scope down to the database at hand.
diff --git a/src/utils.rs b/src/utils.rs
index 6234e887..b75479d7 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -46,7 +46,8 @@ pub async fn run_one_off_command(
 // Physically delete dropped tables for a given context
 pub async fn gc_databases(context: &SeafowlContext, database_name: Option<String>) {
     let mut dropped_tables = context
-        .table_catalog
+        .metastore
+        .tables
         .get_dropped_tables(database_name)
         .await
         .unwrap_or_else(|err| {
@@ -82,7 +83,8 @@ pub async fn gc_databases(context: &SeafowlContext, database_name: Option<String>) {