From 5900d587310f5c2fb698a8352a438757e5a3c11c Mon Sep 17 00:00:00 2001
From: Marko Grujic
Date: Tue, 19 Dec 2023 12:20:54 +0100
Subject: [PATCH 1/7] Separate catalog and repository logic

---
 src/catalog.rs              | 372 +++++++++++++++++++++---------------
 src/config/context.rs       |  29 +--
 src/context/delta.rs        |  14 +-
 src/context/mod.rs          |  50 ++---
 src/context/physical.rs     | 113 +++--------
 src/data_types.rs           |  44 -----
 src/lib.rs                  |   1 -
 src/provider.rs             |  12 +-
 src/repository/default.rs   |  70 ++++---
 src/repository/interface.rs |  93 +++++----
 src/repository/postgres.rs  |  12 +-
 src/repository/sqlite.rs    |  16 +-
 tests/statements/ddl.rs     |   2 +-
 tests/statements/mod.rs     |   2 +-
 14 files changed, 397 insertions(+), 433 deletions(-)
 delete mode 100644 src/data_types.rs

diff --git a/src/catalog.rs b/src/catalog.rs
index f8262e6b..57ac294d 100644
--- a/src/catalog.rs
+++ b/src/catalog.rs
@@ -15,18 +15,20 @@ use uuid::Uuid;
 
 use crate::object_store::wrapped::InternalObjectStore;
 use crate::provider::SeafowlFunction;
-use crate::repository::interface::{DroppedTableDeletionStatus, DroppedTablesResult};
+use crate::repository::interface::{
+    DatabaseRecord, DroppedTableDeletionStatus, DroppedTablesResult, TableRecord,
+};
 use crate::system_tables::SystemSchemaProvider;
 use crate::wasm_udf::data_types::{
     CreateFunctionDataType, CreateFunctionDetails, CreateFunctionLanguage,
     CreateFunctionVolatility,
 };
 use crate::{
-    data_types::{CollectionId, DatabaseId, FunctionId, TableId, TableVersionId},
-    provider::{SeafowlCollection, SeafowlDatabase},
+    provider::{SeafowlDatabase, SeafowlSchema},
     repository::interface::{
-        AllDatabaseColumnsResult, AllDatabaseFunctionsResult, Error as RepositoryError,
-        Repository, TableVersionsResult,
+        AllDatabaseColumnsResult, AllDatabaseFunctionsResult, CollectionId,
+        CollectionRecord, DatabaseId, Error as RepositoryError, FunctionId, Repository,
+        TableId, TableVersionId, TableVersionsResult,
     },
 };
 
@@ -36,13 +38,13 @@ pub const STAGING_SCHEMA: &str = "staging";
 
 #[derive(Debug)]
 pub enum Error {
-    DatabaseDoesNotExist { id: DatabaseId },
-    CollectionDoesNotExist { id: CollectionId },
-    TableDoesNotExist { id: TableId },
+    CatalogDoesNotExist { name: String },
+    SchemaDoesNotExist { name: String },
+    TableDoesNotExist { name: String },
     TableUuidDoesNotExist { uuid: Uuid },
     TableAlreadyExists { name: String },
-    DatabaseAlreadyExists { name: String },
-    CollectionAlreadyExists { name: String },
+    CatalogAlreadyExists { name: String },
+    SchemaAlreadyExists { name: String },
     FunctionAlreadyExists { name: String },
     FunctionDeserializationError { reason: String },
     FunctionNotFound { names: String },
@@ -93,20 +95,17 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 
 impl From<Error> for DataFusionError {
     fn from(val: Error) -> Self {
         match val {
-            // These errors are raised by routines that already take an ID instead of
-            // a database/schema/table name and so the ID is supposed to be valid. An error
-            // in this case is an internal consistency issue.
-            Error::DatabaseDoesNotExist { id } => {
-                DataFusionError::Internal(format!("Database with ID {id} doesn't exist"))
+            Error::CatalogDoesNotExist { name } => {
+                DataFusionError::Plan(format!("Database {name:?} doesn't exist"))
             }
-            Error::CollectionDoesNotExist { id } => {
-                DataFusionError::Internal(format!("Schema with ID {id} doesn't exist"))
+            Error::SchemaDoesNotExist { name } => {
+                DataFusionError::Plan(format!("Schema {name:?} doesn't exist"))
             }
-            Error::TableDoesNotExist { id } => {
-                DataFusionError::Internal(format!("Table with ID {id} doesn't exist"))
+            Error::TableDoesNotExist { name } => {
+                DataFusionError::Plan(format!("Table {name:?} doesn't exist"))
             }
             Error::TableUuidDoesNotExist { uuid } => {
-                DataFusionError::Internal(format!("Table with UUID {uuid} doesn't exist"))
+                DataFusionError::Plan(format!("Table with UUID {uuid} doesn't exist"))
             }
             Error::FunctionDeserializationError { reason } => DataFusionError::Internal(
                 format!("Error deserializing function: {reason:?}"),
@@ -120,10 +119,10 @@ impl From<Error> for DataFusionError {
             Error::TableAlreadyExists { name } => {
                 DataFusionError::Plan(format!("Table {name:?} already exists"))
             }
-            Error::DatabaseAlreadyExists { name } => {
+            Error::CatalogAlreadyExists { name } => {
                 DataFusionError::Plan(format!("Database {name:?} already exists"))
             }
-            Error::CollectionAlreadyExists { name } => {
+            Error::SchemaAlreadyExists { name } => {
                 DataFusionError::Plan(format!("Schema {name:?} already exists"))
             }
             Error::FunctionAlreadyExists { name } => {
@@ -137,10 +136,9 @@ impl From<Error> for DataFusionError {
                     .to_string(),
             ),
             // Miscellaneous sqlx error. We want to log it but it's not worth showing to the user.
-            Error::SqlxError(e) => DataFusionError::Internal(format!(
-                "Internal SQL error: {:?}",
-                e.to_string()
-            )),
+            Error::SqlxError(e) => {
+                DataFusionError::Plan(format!("Internal SQL error: {:?}", e.to_string()))
+            }
         }
     }
 }
@@ -148,41 +146,47 @@ impl From<Error> for DataFusionError {
 #[cfg_attr(test, automock)]
 #[async_trait]
 pub trait TableCatalog: Sync + Send {
-    async fn load_database(&self, id: DatabaseId) -> Result<SeafowlDatabase>;
-    async fn load_database_ids(&self) -> Result<HashMap<String, DatabaseId>>;
-    async fn get_database_id_by_name(
-        &self,
-        database_name: &str,
-    ) -> Result<Option<DatabaseId>>;
-    async fn get_collection_id_by_name(
+    async fn load_database(&self, name: &str) -> Result<SeafowlDatabase>;
+    async fn get_catalog(&self, name: &str) -> Result<DatabaseRecord>;
+
+    async fn list_catalogs(&self) -> Result<Vec<DatabaseRecord>, Error>;
+
+    async fn get_schema(
         &self,
-        database_name: &str,
-        collection_name: &str,
-    ) -> Result<Option<CollectionId>>;
-    async fn get_table_id_by_name(
+        catalog_name: &str,
+        schema_name: &str,
+    ) -> Result<CollectionRecord>;
+
+    async fn get_table(
         &self,
-        database_name: &str,
-        collection_name: &str,
+        catalog_name: &str,
+        schema_name: &str,
         table_name: &str,
-    ) -> Result<Option<TableId>>;
+    ) -> Result<TableRecord>;
 
-    async fn create_database(&self, database_name: &str) -> Result<DatabaseId>;
+    async fn create_catalog(&self, catalog_name: &str) -> Result<DatabaseId>;
 
-    async fn create_collection(
+    async fn create_schema(
         &self,
-        database_id: DatabaseId,
-        collection_name: &str,
+        catalog_name: &str,
+        schema_name: &str,
     ) -> Result<CollectionId>;
 
     async fn create_table(
         &self,
-        collection_id: CollectionId,
+        catalog_name: &str,
+        schema_name: &str,
         table_name: &str,
         schema: &Schema,
         uuid: Uuid,
     ) -> Result<(TableId, TableVersionId)>;
 
-    async fn delete_old_table_versions(&self, table_id: TableId) -> Result<u64>;
+    async fn delete_old_table_versions(
+        &self,
+        catalog_name: &str,
+        collection_name: &str,
+        table_name: &str,
+    ) -> Result<u64>;
 
     async fn create_new_table_version(
         &self,
@@ -192,26 +196,34 @@ pub 
trait TableCatalog: Sync + Send { async fn get_all_table_versions( &self, - database_name: &str, + catalog_name: &str, table_names: Option>, ) -> Result>; async fn move_table( &self, - table_id: TableId, + old_catalog_name: &str, + old_schema_name: &str, + old_table_name: &str, + new_catalog_name: &str, + new_schema_name: &str, new_table_name: &str, - new_collection_id: Option, ) -> Result<()>; - async fn drop_table(&self, table_id: TableId) -> Result<()>; + async fn drop_table( + &self, + catalog_name: &str, + schema_name: &str, + table_name: &str, + ) -> Result<()>; - async fn drop_collection(&self, collection_id: CollectionId) -> Result<()>; + async fn delete_schema(&self, catalog_name: &str, schema_name: &str) -> Result<()>; - async fn drop_database(&self, database_id: DatabaseId) -> Result<()>; + async fn delete_catalog(&self, name: &str) -> Result<()>; async fn get_dropped_tables( &self, - database_name: Option, + catalog_name: Option, ) -> Result>; async fn update_dropped_table( @@ -228,7 +240,7 @@ pub trait TableCatalog: Sync + Send { pub trait FunctionCatalog: Sync + Send { async fn create_function( &self, - database_id: DatabaseId, + catalog_name: &str, function_name: &str, or_replace: bool, details: &CreateFunctionDetails, @@ -236,12 +248,12 @@ pub trait FunctionCatalog: Sync + Send { async fn get_all_functions_in_database( &self, - database_id: DatabaseId, + catalog_name: &str, ) -> Result>; async fn drop_function( &self, - database_id: DatabaseId, + catalog_name: &str, if_exists: bool, func_names: &[String], ) -> Result<()>; @@ -293,11 +305,11 @@ impl DefaultCatalog { (Arc::from(table_name.to_string()), Arc::new(table) as _) } - fn build_collection<'a, I>( + fn build_schema<'a, I>( &self, collection_name: &str, collection_columns: I, - ) -> (Arc, Arc) + ) -> (Arc, Arc) where I: Iterator, { @@ -315,7 +327,7 @@ impl DefaultCatalog { ( Arc::from(collection_name.to_string()), - Arc::new(SeafowlCollection { + Arc::new(SeafowlSchema { name: Arc::from(collection_name.to_string()), tables: RwLock::new(tables), }), @@ -325,10 +337,10 @@ impl DefaultCatalog { #[async_trait] impl TableCatalog for DefaultCatalog { - async fn load_database(&self, database_id: DatabaseId) -> Result { + async fn load_database(&self, name: &str) -> Result { let all_columns = self .repository - .get_all_columns_in_database(database_id) + .get_all_columns_in_database(name) .await .map_err(Self::to_sqlx_error)?; @@ -337,11 +349,11 @@ impl TableCatalog for DefaultCatalog { // Turn the list of all collections, tables and their columns into a nested map. 
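        // A standalone sketch of this flat-rows-to-nested-map step, with
        // illustrative tuples standing in for `AllDatabaseColumnsResult`:
        //
        //     let rows = vec![("s1", "t1"), ("s1", "t2"), ("s2", "t3")];
        //     let by_schema: HashMap<_, Vec<_>> = rows
        //         .iter()
        //         .group_by(|(schema, _)| *schema)
        //         .into_iter()
        //         .map(|(s, g)| (s, g.map(|(_, t)| *t).collect()))
        //         .collect();
        //
        // Note that itertools' `group_by` only groups *consecutive* elements,
        // which is why the repository query orders rows by collection name first.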
- let collections: HashMap, Arc> = all_columns + let schemas: HashMap, Arc> = all_columns .iter() .group_by(|col| &col.collection_name) .into_iter() - .map(|(cn, cc)| self.build_collection(cn, cc)) + .map(|(cn, cc)| self.build_schema(cn, cc)) .collect(); // TODO load the database name too @@ -349,7 +361,7 @@ impl TableCatalog for DefaultCatalog { Ok(SeafowlDatabase { name: name.clone(), - collections, + schemas, staging_schema: self.staging_schema.clone(), system_schema: Arc::new(SystemSchemaProvider::new( name, @@ -358,25 +370,18 @@ impl TableCatalog for DefaultCatalog { }) } - async fn load_database_ids(&self) -> Result> { - let all_db_ids = self - .repository - .get_all_database_ids() - .await - .map_err(Self::to_sqlx_error)?; - - Ok(HashMap::from_iter(all_db_ids)) - } - async fn create_table( &self, - collection_id: CollectionId, + catalog_name: &str, + schema_name: &str, table_name: &str, schema: &Schema, uuid: Uuid, ) -> Result<(TableId, TableVersionId)> { + let collection = self.get_schema(catalog_name, schema_name).await?; + self.repository - .create_table(collection_id, table_name, schema, uuid) + .create_table(collection.id, table_name, schema, uuid) .await .map_err(|e| match e { RepositoryError::UniqueConstraintViolation(_) => { @@ -384,102 +389,125 @@ impl TableCatalog for DefaultCatalog { name: table_name.to_string(), } } - RepositoryError::FKConstraintViolation(_) => { - Error::CollectionDoesNotExist { id: collection_id } - } + RepositoryError::FKConstraintViolation(_) => Error::SchemaDoesNotExist { + name: schema_name.to_string(), + }, RepositoryError::SqlxError(e) => Error::SqlxError(e), }) } - async fn delete_old_table_versions(&self, table_id: TableId) -> Result { + async fn delete_old_table_versions( + &self, + catalog_name: &str, + collection_name: &str, + table_name: &str, + ) -> Result { + let table = self + .get_table(catalog_name, collection_name, table_name) + .await?; + self.repository - .delete_old_table_versions(table_id) + .delete_old_table_versions(table.id) .await .map_err(Self::to_sqlx_error) } - async fn get_collection_id_by_name( + async fn get_catalog(&self, name: &str) -> Result { + match self.repository.get_database(name).await { + Ok(database) => Ok(database), + Err(RepositoryError::SqlxError(sqlx::error::Error::RowNotFound)) => { + Err(Error::CatalogDoesNotExist { + name: name.to_string(), + }) + } + Err(e) => Err(Self::to_sqlx_error(e)), + } + } + + async fn list_catalogs(&self) -> Result, Error> { + match self.repository.list_databases().await { + Ok(databases) => Ok(databases), + Err(e) => Err(Self::to_sqlx_error(e)), + } + } + + async fn get_schema( &self, - database_name: &str, - collection_name: &str, - ) -> Result> { - if collection_name == STAGING_SCHEMA { + catalog_name: &str, + schema_name: &str, + ) -> Result { + if schema_name == STAGING_SCHEMA { return Err(Error::UsedStagingSchema); } match self .repository - .get_collection_id_by_name(database_name, collection_name) + .get_collection(catalog_name, schema_name) .await { - Ok(id) => Ok(Some(id)), - Err(RepositoryError::SqlxError(sqlx::error::Error::RowNotFound)) => Ok(None), - Err(e) => Err(Self::to_sqlx_error(e)), - } - } - - async fn get_database_id_by_name( - &self, - database_name: &str, - ) -> Result> { - match self.repository.get_database_id_by_name(database_name).await { - Ok(id) => Ok(Some(id)), - Err(RepositoryError::SqlxError(sqlx::error::Error::RowNotFound)) => Ok(None), + Ok(schema) => Ok(schema), + Err(RepositoryError::SqlxError(sqlx::error::Error::RowNotFound)) => { + 
Err(Error::SchemaDoesNotExist { + name: schema_name.to_string(), + }) + } Err(e) => Err(Self::to_sqlx_error(e)), } } - async fn get_table_id_by_name( + async fn get_table( &self, - database_name: &str, - collection_name: &str, + catalog_name: &str, + schema_name: &str, table_name: &str, - ) -> Result> { - if collection_name == STAGING_SCHEMA { - return Err(Error::UsedStagingSchema); - } - + ) -> Result { match self .repository - .get_table_id_by_name(database_name, collection_name, table_name) + .get_table(catalog_name, schema_name, table_name) .await { - Ok(id) => Ok(Some(id)), - Err(RepositoryError::SqlxError(sqlx::error::Error::RowNotFound)) => Ok(None), + Ok(table) => Ok(table), + Err(RepositoryError::SqlxError(sqlx::error::Error::RowNotFound)) => { + Err(Error::TableDoesNotExist { + name: table_name.to_string(), + }) + } Err(e) => Err(Self::to_sqlx_error(e)), } } - async fn create_database(&self, database_name: &str) -> Result { + async fn create_catalog(&self, catalog_name: &str) -> Result { self.repository - .create_database(database_name) + .create_database(catalog_name) .await .map_err(|e| match e { RepositoryError::UniqueConstraintViolation(_) => { - Error::DatabaseAlreadyExists { - name: database_name.to_string(), + Error::CatalogAlreadyExists { + name: catalog_name.to_string(), } } _ => Self::to_sqlx_error(e), }) } - async fn create_collection( + async fn create_schema( &self, - database_id: DatabaseId, - collection_name: &str, + catalog_name: &str, + schema_name: &str, ) -> Result { - if collection_name == STAGING_SCHEMA { + if schema_name == STAGING_SCHEMA { return Err(Error::UsedStagingSchema); } + let database = self.get_catalog(catalog_name).await?; + self.repository - .create_collection(database_id, collection_name) + .create_collection(database.id, schema_name) .await .map_err(|e| match e { RepositoryError::UniqueConstraintViolation(_) => { - Error::CollectionAlreadyExists { - name: collection_name.to_string(), + Error::SchemaAlreadyExists { + name: schema_name.to_string(), } } _ => Self::to_sqlx_error(e), @@ -504,29 +532,42 @@ impl TableCatalog for DefaultCatalog { async fn get_all_table_versions( &self, - database_name: &str, + catalog_name: &str, table_names: Option>, ) -> Result> { self.repository - .get_all_table_versions(database_name, table_names) + .get_all_table_versions(catalog_name, table_names) .await .map_err(Self::to_sqlx_error) } async fn move_table( &self, - table_id: TableId, + old_catalog_name: &str, + old_schema_name: &str, + old_table_name: &str, + _new_catalog_name: &str, // For now we don't support moving across catalogs + new_schema_name: &str, new_table_name: &str, - new_collection_id: Option, ) -> Result<()> { + let table = self + .get_table(old_catalog_name, old_schema_name, old_table_name) + .await?; + let new_schema_id = if new_schema_name != old_schema_name { + let schema = self.get_schema(old_catalog_name, new_schema_name).await?; + Some(schema.id) + } else { + None + }; + self.repository - .move_table(table_id, new_table_name, new_collection_id) + .move_table(table.id, new_table_name, new_schema_id) .await .map_err(|e| match e { RepositoryError::FKConstraintViolation(_) => { // We only FK on collection_id, so this will be Some - Error::CollectionDoesNotExist { - id: new_collection_id.unwrap(), + Error::SchemaDoesNotExist { + name: new_schema_name.to_string(), } } RepositoryError::UniqueConstraintViolation(_) => { @@ -535,43 +576,64 @@ impl TableCatalog for DefaultCatalog { } } RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { 
- Error::TableDoesNotExist { id: table_id } + Error::TableDoesNotExist { + name: old_table_name.to_string(), + } } _ => Self::to_sqlx_error(e), }) } - async fn drop_table(&self, table_id: TableId) -> Result<()> { + async fn drop_table( + &self, + catalog_name: &str, + schema_name: &str, + table_name: &str, + ) -> Result<()> { + let table = self + .get_table(catalog_name, schema_name, table_name) + .await?; + self.repository - .drop_table(table_id) + .drop_table(table.id) .await .map_err(|e| match e { RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { - Error::TableDoesNotExist { id: table_id } + Error::TableDoesNotExist { + name: table_name.to_string(), + } } _ => Self::to_sqlx_error(e), }) } - async fn drop_collection(&self, collection_id: CollectionId) -> Result<()> { + async fn delete_schema(&self, catalog_name: &str, schema_name: &str) -> Result<()> { + let schema = self.get_schema(catalog_name, schema_name).await?; + self.repository - .drop_collection(collection_id) + .drop_collection(schema.id) .await .map_err(|e| match e { RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { - Error::CollectionDoesNotExist { id: collection_id } + Error::SchemaDoesNotExist { + name: schema_name.to_string(), + } } _ => Self::to_sqlx_error(e), }) } - async fn drop_database(&self, database_id: DatabaseId) -> Result<()> { + async fn delete_catalog(&self, name: &str) -> Result<()> { + let database = self.get_catalog(name).await?; + self.repository - .drop_database(database_id) + .delete_database(database.id) .await .map_err(|e| match e { RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { - Error::DatabaseDoesNotExist { id: database_id } + Error::CatalogDoesNotExist { + name: name.to_string(), + } } _ => Self::to_sqlx_error(e), }) @@ -579,10 +641,10 @@ impl TableCatalog for DefaultCatalog { async fn get_dropped_tables( &self, - database_name: Option, + catalog_name: Option, ) -> Result> { self.repository - .get_dropped_tables(database_name) + .get_dropped_tables(catalog_name) .await .map_err(Self::to_sqlx_error) } @@ -650,18 +712,20 @@ impl DefaultCatalog { impl FunctionCatalog for DefaultCatalog { async fn create_function( &self, - database_id: DatabaseId, + catalog_name: &str, function_name: &str, or_replace: bool, details: &CreateFunctionDetails, ) -> Result { + let database = self.get_catalog(catalog_name).await?; + self.repository - .create_function(database_id, function_name, or_replace, details) + .create_function(database.id, function_name, or_replace, details) .await .map_err(|e| match e { - RepositoryError::FKConstraintViolation(_) => { - Error::DatabaseDoesNotExist { id: database_id } - } + RepositoryError::FKConstraintViolation(_) => Error::CatalogDoesNotExist { + name: catalog_name.to_string(), + }, RepositoryError::UniqueConstraintViolation(_) => { Error::FunctionAlreadyExists { name: function_name.to_string(), @@ -673,11 +737,13 @@ impl FunctionCatalog for DefaultCatalog { async fn get_all_functions_in_database( &self, - database_id: DatabaseId, + catalog_name: &str, ) -> Result> { + let database = self.get_catalog(catalog_name).await?; + let all_functions = self .repository - .get_all_functions_in_database(database_id) + .get_all_functions_in_database(database.id) .await .map_err(Self::to_sqlx_error)?; @@ -699,14 +765,18 @@ impl FunctionCatalog for DefaultCatalog { async fn drop_function( &self, - database_id: DatabaseId, + catalog_name: &str, if_exists: bool, func_names: &[String], ) -> Result<()> { - match self.repository.drop_function(database_id, 
func_names).await { + let database = self.get_catalog(catalog_name).await?; + + match self.repository.drop_function(database.id, func_names).await { Ok(id) => Ok(id), Err(RepositoryError::FKConstraintViolation(_)) => { - Err(Error::DatabaseDoesNotExist { id: database_id }) + Err(Error::CatalogDoesNotExist { + name: catalog_name.to_string(), + }) } Err(RepositoryError::SqlxError(sqlx::error::Error::RowNotFound)) => { if if_exists { diff --git a/src/config/context.rs b/src/config/context.rs index e2942fc0..d62d0a0f 100644 --- a/src/config/context.rs +++ b/src/config/context.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::sync::Arc; use crate::{ @@ -19,6 +20,7 @@ use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore}; #[cfg(feature = "catalog-postgres")] use crate::repository::postgres::PostgresRepository; +use crate::catalog::Error; use crate::object_store::http::add_http_object_store; use crate::object_store::wrapped::InternalObjectStore; #[cfg(feature = "remote-tables")] @@ -182,20 +184,22 @@ pub async fn build_context(cfg: &schema::SeafowlConfig) -> Result id, - None => tables.create_database(DEFAULT_DB).await.unwrap(), - }; + if let Err(Error::CatalogDoesNotExist { .. }) = tables.get_catalog(DEFAULT_DB).await { + tables.create_catalog(DEFAULT_DB).await.unwrap(); + } - match tables - .get_collection_id_by_name(DEFAULT_DB, DEFAULT_SCHEMA) - .await? + if let Err(Error::SchemaDoesNotExist { .. }) = + tables.get_schema(DEFAULT_DB, DEFAULT_SCHEMA).await { - Some(id) => id, - None => tables.create_collection(default_db, DEFAULT_SCHEMA).await?, - }; + tables.create_schema(DEFAULT_DB, DEFAULT_SCHEMA).await?; + } - let all_database_ids = tables.load_database_ids().await?; + let all_databases: HashSet = tables + .list_catalogs() + .await? + .iter() + .map(|db| db.name.clone()) + .collect(); // Convergence doesn't support connecting to different DB names. We are supposed // to do one context per query (as we need to load the schema before executing every @@ -209,8 +213,7 @@ pub async fn build_context(cfg: &schema::SeafowlConfig) -> Result, pub internal_object_store: Arc, pub database: String, - pub database_id: DatabaseId, - pub all_database_ids: Arc>>, + pub all_databases: Arc>>, pub max_partition_size: u32, } impl SeafowlContext { // Create a new `SeafowlContext` with a new inner context scoped to a different default DB pub async fn scope_to_database(&self, name: String) -> Result> { - let maybe_database_id = self.all_database_ids.read().get(name.as_str()).cloned(); - let database_id = match maybe_database_id { - Some(db_id) => db_id, - None => { - // Perhaps the db was created on another node; try to reload from catalog - let new_db_ids = self.table_catalog.load_database_ids().await?; - new_db_ids - .get(name.as_str()) - .cloned() - .map(|db_id| { - self.all_database_ids.write().insert(name.clone(), db_id); - db_id - }) - .ok_or_else(|| { - DataFusionError::Plan(format!( - "Unknown database {name}; try creating one with CREATE DATABASE first" - )) - })? - } + // TODO: do we need this? The only goal here is to keep the list of databases in memory and + // fail early if one is missing, but we can defer that to resolution time. 
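+        // For reference, the intended call pattern (as exercised in the tests
+        // below) is:
+        //
+        //     let ctx = context.scope_to_database("testdb".to_string()).await?;
+        //     assert_eq!(ctx.database, "testdb");
+        //
+        // where a miss in `all_databases` falls through to a catalog lookup and
+        // memoizes the name on success.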
+ let maybe_database = self.all_databases.read().get(&name).cloned(); + let all_databases = self.all_databases.clone(); + if maybe_database.is_none() { + // Perhaps the db was created on another node; try to reload from catalog + let _ = self.table_catalog.get_catalog(&name).await.map_err(|_| { + DataFusionError::Plan(format!( + "Unknown database {name}; try creating one with CREATE DATABASE first" + )) + })?; + all_databases.write().insert(name.clone()); }; // Swap the default database in the new internal context's session config @@ -75,8 +64,7 @@ impl SeafowlContext { function_catalog: self.function_catalog.clone(), internal_object_store: self.internal_object_store.clone(), database: name, - database_id, - all_database_ids: self.all_database_ids.clone(), + all_databases, max_partition_size: self.max_partition_size, })) } @@ -100,12 +88,12 @@ impl SeafowlContext { self.inner.register_catalog( &self.database, - Arc::new(self.table_catalog.load_database(self.database_id).await?), + Arc::new(self.table_catalog.load_database(&self.database).await?), ); // Register all functions in the database self.function_catalog - .get_all_functions_in_database(self.database_id) + .get_all_functions_in_database(&self.database) .await? .iter() .try_for_each(|f| self.register_function(&f.name, &f.details)) @@ -253,7 +241,7 @@ pub mod test_utils { // place on another node context .table_catalog - .create_database("testdb") + .create_catalog("testdb") .await .unwrap(); diff --git a/src/context/physical.rs b/src/context/physical.rs index 96fe633e..c63d9b67 100644 --- a/src/context/physical.rs +++ b/src/context/physical.rs @@ -173,7 +173,7 @@ impl SeafowlContext { // CREATE SCHEMA // Create a schema and register it self.table_catalog - .create_collection(self.database_id, schema_name) + .create_schema(&self.database, schema_name) .await?; Ok(make_dummy_exec()) } @@ -182,12 +182,7 @@ impl SeafowlContext { if_not_exists, .. })) => { - if self - .table_catalog - .get_database_id_by_name(catalog_name) - .await? - .is_some() - { + if self.table_catalog.get_catalog(catalog_name).await.is_ok() { if !*if_not_exists { return Err(DataFusionError::Plan(format!( "Database {catalog_name} already exists" @@ -198,18 +193,15 @@ impl SeafowlContext { } // Persist DB into metadata catalog - let database_id = - self.table_catalog.create_database(catalog_name).await?; + self.table_catalog.create_catalog(catalog_name).await?; // Create the corresponding default schema as well self.table_catalog - .create_collection(database_id, DEFAULT_SCHEMA) + .create_schema(catalog_name, DEFAULT_SCHEMA) .await?; // Update the shared in-memory map of DB names -> ids - self.all_database_ids - .write() - .insert(catalog_name.clone(), database_id); + self.all_databases.write().insert(catalog_name.clone()); Ok(make_dummy_exec()) } @@ -527,19 +519,13 @@ impl SeafowlContext { return Ok(make_dummy_exec()); } - let table_id = self - .table_catalog - .get_table_id_by_name( + self.table_catalog + .drop_table( &resolved_ref.catalog, &resolved_ref.schema, &resolved_ref.table, ) - .await? 
- .ok_or_else(|| { - DataFusionError::Execution(format!("Table {name} not found")) - })?; - - self.table_catalog.drop_table(table_id).await?; + .await?; Ok(make_dummy_exec()) } LogicalPlan::Ddl(DdlStatement::CreateView(_)) => Err(Error::Plan( @@ -588,7 +574,7 @@ impl SeafowlContext { // Persist the function in the metadata storage self.function_catalog .create_function( - self.database_id, + &self.database, name, *or_replace, details, @@ -603,7 +589,7 @@ impl SeafowlContext { output_schema: _, }) => { self.function_catalog - .drop_function(self.database_id, *if_exists, func_names) + .drop_function(&self.database, *if_exists, func_names) .await?; Ok(make_dummy_exec()) } @@ -623,65 +609,28 @@ impl SeafowlContext { )); } - // Resolve old table reference and fetch the table id + // Resolve old table reference let old_table_ref = TableReference::from(old_name.as_str()); let resolved_old_ref = old_table_ref.resolve(&self.database, DEFAULT_SCHEMA); - let table_id = self - .table_catalog - .get_table_id_by_name( - &resolved_old_ref.catalog, - &resolved_old_ref.schema, - &resolved_old_ref.table, - ) - .await? - .ok_or_else(|| { - DataFusionError::Execution(format!( - "Table {old_name} not found" - )) - })?; - - // If the old and new table schema is different check that the - // corresponding collection already exists - let new_schema_id = - if resolved_new_ref.schema != resolved_old_ref.schema { - let collection_id = self - .table_catalog - .get_collection_id_by_name( - &self.database, - &resolved_new_ref.schema, - ) - .await? - .ok_or_else(|| { - Error::Plan(format!( - "Schema \"{}\" does not exist!", - &resolved_new_ref.schema, - )) - })?; - Some(collection_id) - } else { - None - }; - // Finally update our catalog entry self.table_catalog .move_table( - table_id, + &resolved_old_ref.catalog, + &resolved_old_ref.schema, + &resolved_old_ref.table, + &resolved_new_ref.catalog, + &resolved_new_ref.schema, &resolved_new_ref.table, - new_schema_id, ) .await?; Ok(make_dummy_exec()) } SeafowlExtensionNode::DropSchema(DropSchema { name, .. }) => { - if let Some(collection_id) = self - .table_catalog - .get_collection_id_by_name(&self.database, name) - .await? - { - self.table_catalog.drop_collection(collection_id).await? - }; + self.table_catalog + .delete_schema(&self.database, name) + .await?; Ok(make_dummy_exec()) } @@ -697,22 +646,8 @@ impl SeafowlContext { let resolved_ref = table_ref.resolve(&self.database, DEFAULT_SCHEMA); - let table_id = self - .table_catalog - .get_table_id_by_name( - &resolved_ref.catalog, - &resolved_ref.schema, - &resolved_ref.table, - ) - .await? - .ok_or_else(|| { - DataFusionError::Execution( - "Table {table_name} not found".to_string(), - ) - })?; - if let Ok(mut delta_table) = - self.try_get_delta_table(resolved_ref).await + self.try_get_delta_table(resolved_ref.clone()).await { // TODO: The Delta protocol doesn't vacuum old table versions per se, but only files no longer tied to the latest table version. // This means that the VACUUM could be a no-op, for instance, in the case when append-only writes have been performed. 
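                // In set terms, the reclaimable files are roughly (illustrative
                // pseudo-Rust, not the delta-rs API):
                //
                //     let reclaimable: Vec<_> = all_data_files
                //         .difference(&files_in_latest_version)
                //         .collect();
                //
                // so with append-only writes every file is still referenced by
                // the latest version and the set difference is empty.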
@@ -737,7 +672,11 @@ impl SeafowlContext { match self .table_catalog - .delete_old_table_versions(table_id) + .delete_old_table_versions( + &resolved_ref.catalog, + &resolved_ref.schema, + &resolved_ref.table, + ) .await { Ok(row_count) => { @@ -847,7 +786,7 @@ impl SeafowlContext { None => { // Schema doesn't exist; create one first, and then reload to pick it up self.table_catalog - .create_collection(self.database_id, &schema_name) + .create_schema(&self.database, &schema_name) .await?; self.reload_schema().await?; false diff --git a/src/data_types.rs b/src/data_types.rs deleted file mode 100644 index bb54ab7f..00000000 --- a/src/data_types.rs +++ /dev/null @@ -1,44 +0,0 @@ -pub type DatabaseId = i64; -pub type CollectionId = i64; -pub type TableId = i64; -pub type TableVersionId = i64; -pub type Timestamp = i64; -pub type TableColumnId = i64; -pub type FunctionId = i64; - -// TODO: most of these structs currently aren't used (we use versions -// without IDs since they can be passed to db-writing routines before -// we know the ID) - -pub struct Database { - pub id: DatabaseId, - pub name: String, -} - -pub struct Collection { - pub id: CollectionId, - pub database_id: DatabaseId, - pub name: String, -} - -pub struct Table { - pub id: TableId, - pub collection_id: CollectionId, - pub name: String, -} - -pub struct TableVersion { - pub id: TableVersionId, - pub table_id: TableId, - pub creation_time: Timestamp, - // TODO is_deleted flag? -} - -pub struct TableColumn { - pub id: TableColumnId, - pub table_version_id: TableVersionId, - pub name: String, - // TODO enum? - pub r#type: String, - // TODO ordinal? -} diff --git a/src/lib.rs b/src/lib.rs index 7123f8f1..1b8c4544 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,6 @@ pub mod catalog; pub mod cli; pub mod config; pub mod context; -pub mod data_types; pub mod datafusion; pub mod delta_rs; pub mod frontend; diff --git a/src/provider.rs b/src/provider.rs index c3f0c724..abe61f2a 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -21,13 +21,13 @@ use deltalake::DeltaTable; use log::warn; use parking_lot::RwLock; -use crate::data_types::FunctionId; +use crate::repository::interface::FunctionId; use crate::system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA}; use crate::{catalog::STAGING_SCHEMA, wasm_udf::data_types::CreateFunctionDetails}; pub struct SeafowlDatabase { pub name: Arc, - pub collections: HashMap, Arc>, + pub schemas: HashMap, Arc>, pub staging_schema: Arc, pub system_schema: Arc, } @@ -38,7 +38,7 @@ impl CatalogProvider for SeafowlDatabase { } fn schema_names(&self) -> Vec { - self.collections + self.schemas .keys() .map(|s| s.to_string()) .chain([STAGING_SCHEMA.to_string(), SYSTEM_SCHEMA.to_string()]) @@ -51,19 +51,19 @@ impl CatalogProvider for SeafowlDatabase { } else if name == SYSTEM_SCHEMA { Some(self.system_schema.clone()) } else { - self.collections.get(name).map(|c| Arc::clone(c) as _) + self.schemas.get(name).map(|c| Arc::clone(c) as _) } } } -pub struct SeafowlCollection { +pub struct SeafowlSchema { pub name: Arc, // TODO: consider using DashMap instead of RwLock>: https://github.com/xacrimon/conc-map-bench pub tables: RwLock, Arc>>, } #[async_trait] -impl SchemaProvider for SeafowlCollection { +impl SchemaProvider for SeafowlSchema { fn as_any(&self) -> &dyn Any { self } diff --git a/src/repository/default.rs b/src/repository/default.rs index 794b0f6a..8c32983d 100644 --- a/src/repository/default.rs +++ b/src/repository/default.rs @@ -79,7 +79,7 @@ impl Repository for $repo { } async fn 
get_all_columns_in_database( &self, - database_id: DatabaseId, + name: &str, ) -> Result, Error> { let mut builder: QueryBuilder<_> = QueryBuilder::new($repo::QUERIES.latest_table_versions); @@ -98,8 +98,8 @@ impl Repository for $repo { LEFT JOIN "table" ON collection.id = "table".collection_id LEFT JOIN desired_table_versions ON "table".id = desired_table_versions.table_id LEFT JOIN table_column ON table_column.table_version_id = desired_table_versions.id - WHERE database.id = "#); - builder.push_bind(database_id); + WHERE database.name = "#); + builder.push_bind(name); builder.push(r#" ORDER BY collection_name, table_name, table_version_id, column_name @@ -125,14 +125,26 @@ impl Repository for $repo { Ok(id) } - async fn get_collection_id_by_name( + async fn get_database( + &self, + name: &str, + ) -> Result { + let database = sqlx::query_as(r#"SELECT id, name FROM database WHERE database.name = $1"#) + .bind(name) + .fetch_one(&self.executor) + .await.map_err($repo::interpret_error)?; + + Ok(database) + } + + async fn get_collection( &self, database_name: &str, collection_name: &str, - ) -> Result { - let id = sqlx::query( + ) -> Result { + let collection = sqlx::query_as( r#" - SELECT collection.id + SELECT collection.id, database.id AS database_id, collection.name FROM collection JOIN database ON collection.database_id = database.id WHERE database.name = $1 AND collection.name = $2 "#, @@ -140,34 +152,20 @@ impl Repository for $repo { .bind(database_name) .bind(collection_name) .fetch_one(&self.executor) - .await.map_err($repo::interpret_error)? - .try_get("id").map_err($repo::interpret_error)?; - - Ok(id) - } - - async fn get_database_id_by_name( - &self, - database_name: &str, - ) -> Result { - let id = sqlx::query(r#"SELECT id FROM database WHERE database.name = $1"#) - .bind(database_name) - .fetch_one(&self.executor) - .await.map_err($repo::interpret_error)? - .try_get("id").map_err($repo::interpret_error)?; + .await.map_err($repo::interpret_error)?; - Ok(id) + Ok(collection) } - async fn get_table_id_by_name( + async fn get_table( &self, database_name: &str, collection_name: &str, table_name: &str, - ) -> Result { - let id = sqlx::query( + ) -> Result { + let table = sqlx::query_as( r#" - SELECT "table".id + SELECT "table".id, collection.id as collection_id, "table".name FROM "table" JOIN collection ON "table".collection_id = collection.id JOIN database ON collection.database_id = database.id @@ -178,21 +176,16 @@ impl Repository for $repo { .bind(collection_name) .bind(table_name) .fetch_one(&self.executor) - .await.map_err($repo::interpret_error)? - .try_get("id").map_err($repo::interpret_error)?; + .await.map_err($repo::interpret_error)?; - Ok(id) + Ok(table) } - async fn get_all_database_ids(&self) -> Result> { - let all_db_ids = sqlx::query(r#"SELECT name, id FROM database"#) + async fn list_databases(&self) -> Result> { + sqlx::query_as(r#"SELECT name, id FROM database"#) .fetch_all(&self.executor) - .await.map_err($repo::interpret_error)? 
-            .iter()
-            .map(|row| (row.get("name"), row.get("id")))
-            .collect();
+            .await.map_err($repo::interpret_error)
 
-        Ok(all_db_ids)
     }
 
     async fn create_collection(
@@ -436,6 +429,7 @@ impl Repository for $repo {
                 data,
                 volatility
             FROM function
+            WHERE database_id = $1;
             "#)
             .bind(database_id)
@@ -497,7 +491,7 @@ impl Repository for $repo {
         Ok(())
     }
 
-    async fn drop_database(&self, database_id: DatabaseId) -> Result<(), Error> {
+    async fn delete_database(&self, database_id: DatabaseId) -> Result<(), Error> {
         self.insert_dropped_tables(None, None, Some(database_id)).await?;
 
         sqlx::query("DELETE FROM database WHERE id = $1 RETURNING id")
diff --git a/src/repository/interface.rs b/src/repository/interface.rs
index ce846402..a98ba1aa 100644
--- a/src/repository/interface.rs
+++ b/src/repository/interface.rs
@@ -7,11 +7,42 @@ use strum::ParseError;
 use strum_macros::{Display, EnumString};
 use uuid::Uuid;
 
-use crate::data_types::{
-    CollectionId, DatabaseId, FunctionId, TableId, TableVersionId, Timestamp,
-};
 use crate::wasm_udf::data_types::CreateFunctionDetails;
 
+pub type DatabaseId = i64;
+pub type CollectionId = i64;
+pub type TableId = i64;
+pub type TableVersionId = i64;
+pub type Timestamp = i64;
+pub type FunctionId = i64;
+
+#[derive(sqlx::FromRow, Default, Debug, PartialEq, Eq)]
+pub struct DatabaseRecord {
+    pub id: DatabaseId,
+    pub name: String,
+}
+
+#[derive(sqlx::FromRow, Default, Debug, PartialEq, Eq)]
+pub struct CollectionRecord {
+    pub id: CollectionId,
+    pub database_id: DatabaseId,
+    pub name: String,
+}
+
+#[derive(sqlx::FromRow, Default, Debug, PartialEq, Eq)]
+pub struct TableRecord {
+    pub id: TableId,
+    pub collection_id: CollectionId,
+    pub name: String,
+}
+
+#[derive(sqlx::FromRow, Default, Debug, PartialEq, Eq)]
+pub struct TableVersion {
+    pub id: TableVersionId,
+    pub table_id: TableId,
+    pub creation_time: Timestamp,
+}
+
 #[derive(sqlx::FromRow, Default, Debug, PartialEq, Eq)]
 pub struct AllDatabaseColumnsResult {
     pub database_name: String,
@@ -96,28 +127,25 @@ pub trait Repository: Send + Sync + Debug {
 
     async fn get_all_columns_in_database(
         &self,
-        database_id: DatabaseId,
+        name: &str,
     ) -> Result<Vec<AllDatabaseColumnsResult>, Error>;
 
-    async fn get_collection_id_by_name(
-        &self,
-        database_name: &str,
-        collection_name: &str,
-    ) -> Result<CollectionId, Error>;
+    async fn list_databases(&self) -> Result<Vec<DatabaseRecord>, Error>;
 
-    async fn get_database_id_by_name(
+    async fn get_database(&self, name: &str) -> Result<DatabaseRecord, Error>;
+
+    async fn get_collection(
         &self,
         database_name: &str,
-    ) -> Result<DatabaseId, Error>;
+        collection_name: &str,
+    ) -> Result<CollectionRecord, Error>;
 
-    async fn get_table_id_by_name(
+    async fn get_table(
         &self,
         database_name: &str,
         collection_name: &str,
         table_name: &str,
-    ) -> Result<TableId, Error>;
-
-    async fn get_all_database_ids(&self) -> Result<Vec<(String, DatabaseId)>, Error>;
+    ) -> Result<TableRecord, Error>;
 
     async fn create_database(&self, database_name: &str) -> Result<DatabaseId, Error>;
 
@@ -179,7 +207,7 @@ pub trait Repository: Send + Sync + Debug {
 
     async fn drop_collection(&self, collection_id: CollectionId) -> Result<(), Error>;
 
-    async fn drop_database(&self, database_id: DatabaseId) -> Result<(), Error>;
+    async fn delete_database(&self, database_id: DatabaseId) -> Result<(), Error>;
 
     async fn insert_dropped_tables(
         &self,
@@ -217,11 +245,13 @@ pub mod tests {
 
     use super::*;
 
+    static TEST_DB: &str = "testdb";
+
     async fn make_database_with_single_table(
        repository: Arc<impl Repository>,
     ) -> (DatabaseId, CollectionId, TableId, TableVersionId) {
         let database_id = repository
-            .create_database("testdb")
+            .create_database(TEST_DB)
             .await
             .expect("Error creating database");
 
@@ -330,17 +360,10 @@
         let (database_id, _,
table_id, table_version_id) = make_database_with_single_table(repository.clone()).await; - let all_database_ids = repository - .get_all_database_ids() - .await - .expect("Error getting all database ids"); - - assert_eq!(all_database_ids, vec![("testdb".to_string(), database_id)]); - // Test loading all columns let all_columns = repository - .get_all_columns_in_database(database_id) + .get_all_columns_in_database(TEST_DB) .await .expect("Error getting all columns"); @@ -362,7 +385,7 @@ pub mod tests { // Test all columns again: we should have the schema for the latest table version let all_columns = repository - .get_all_columns_in_database(database_id) + .get_all_columns_in_database(TEST_DB) .await .expect("Error getting all columns"); @@ -489,7 +512,7 @@ pub mod tests { .unwrap(); let all_columns = repository - .get_all_columns_in_database(database_id) + .get_all_columns_in_database(TEST_DB) .await .expect("Error getting all columns"); @@ -514,7 +537,7 @@ pub mod tests { .unwrap(); let mut all_columns = repository - .get_all_columns_in_database(database_id) + .get_all_columns_in_database(TEST_DB) .await .expect("Error getting all columns"); all_columns.sort_by_key(|c| c.collection_name.clone()); @@ -556,19 +579,19 @@ pub mod tests { )); // Make a new table in the existing collection with the same name - let collection_id_1 = repository - .get_collection_id_by_name("testdb", "testcol") + let collection_1 = repository + .get_collection("testdb", "testcol") .await .unwrap(); - let collection_id_2 = repository - .get_collection_id_by_name("testdb", "testcol2") + let collection_2 = repository + .get_collection("testdb", "testcol2") .await .unwrap(); assert!(matches!( repository .create_table( - collection_id_2, + collection_2.id, "testtable2", &ArrowSchema::empty(), Uuid::default() @@ -581,7 +604,7 @@ pub mod tests { // Make a new table in the previous collection, try renaming let (new_table_id, _) = repository .create_table( - collection_id_1, + collection_1.id, "testtable2", &ArrowSchema::empty(), Uuid::default(), @@ -591,7 +614,7 @@ pub mod tests { assert!(matches!( repository - .move_table(new_table_id, "testtable2", Some(collection_id_2)) + .move_table(new_table_id, "testtable2", Some(collection_2.id)) .await .unwrap_err(), Error::UniqueConstraintViolation(_) diff --git a/src/repository/postgres.rs b/src/repository/postgres.rs index 3dede0d0..26d73c90 100644 --- a/src/repository/postgres.rs +++ b/src/repository/postgres.rs @@ -11,17 +11,15 @@ use sqlx::{ }; use uuid::Uuid; -use crate::{ - data_types::{CollectionId, DatabaseId, FunctionId, TableId, TableVersionId}, - implement_repository, - wasm_udf::data_types::CreateFunctionDetails, -}; +use crate::{implement_repository, wasm_udf::data_types::CreateFunctionDetails}; use super::{ default::RepositoryQueries, interface::{ - AllDatabaseColumnsResult, AllDatabaseFunctionsResult, DroppedTableDeletionStatus, - DroppedTablesResult, Error, Repository, Result, TableVersionsResult, + AllDatabaseColumnsResult, AllDatabaseFunctionsResult, CollectionId, + CollectionRecord, DatabaseId, DatabaseRecord, DroppedTableDeletionStatus, + DroppedTablesResult, Error, FunctionId, Repository, Result, TableId, TableRecord, + TableVersionId, TableVersionsResult, }, }; diff --git a/src/repository/sqlite.rs b/src/repository/sqlite.rs index 8645d223..13ea413d 100644 --- a/src/repository/sqlite.rs +++ b/src/repository/sqlite.rs @@ -12,18 +12,17 @@ use sqlx::{ }; use uuid::Uuid; -use crate::{ - data_types::{CollectionId, DatabaseId, FunctionId, TableId, TableVersionId}, 
- wasm_udf::data_types::CreateFunctionDetails, -}; +use crate::wasm_udf::data_types::CreateFunctionDetails; use crate::implement_repository; use super::{ default::RepositoryQueries, interface::{ - AllDatabaseColumnsResult, AllDatabaseFunctionsResult, DroppedTableDeletionStatus, - DroppedTablesResult, Error, Repository, Result, TableVersionsResult, + AllDatabaseColumnsResult, AllDatabaseFunctionsResult, CollectionId, + CollectionRecord, DatabaseId, DatabaseRecord, DroppedTableDeletionStatus, + DroppedTablesResult, Error, FunctionId, Repository, Result, TableId, TableRecord, + TableVersionId, TableVersionsResult, }, }; @@ -155,10 +154,7 @@ mod tests { .unwrap(); assert_eq!( - ro_repository - .get_database_id_by_name("testdb") - .await - .unwrap(), + ro_repository.get_database("testdb").await.unwrap().id, db_id ); } diff --git a/tests/statements/ddl.rs b/tests/statements/ddl.rs index f8ccae91..29b35d86 100644 --- a/tests/statements/ddl.rs +++ b/tests/statements/ddl.rs @@ -205,7 +205,7 @@ async fn test_create_table_move_and_drop( .await .unwrap_err() .to_string() - .contains("Schema \"new_./-~:schema\" does not exist!")); + .contains("Schema \"new_./-~:schema\" doesn't exist")); // Create a schema and move the table to it context diff --git a/tests/statements/mod.rs b/tests/statements/mod.rs index 7e0f956f..35b2f446 100644 --- a/tests/statements/mod.rs +++ b/tests/statements/mod.rs @@ -27,7 +27,7 @@ use tempfile::TempDir; use seafowl::config::context::build_context; use seafowl::config::schema::load_config_from_string; use seafowl::context::SeafowlContext; -use seafowl::data_types::Timestamp; +use seafowl::repository::interface::Timestamp; use seafowl::repository::postgres::testutils::get_random_schema; use seafowl::system_tables::SYSTEM_SCHEMA; From 0e825b86784bd83da4caae459aad9957ff5a1736 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Tue, 19 Dec 2023 14:16:47 +0100 Subject: [PATCH 2/7] Separate catalog into separate traits and implement a basic metastore --- src/catalog.rs | 362 +++++++++++++++++++----------------- src/config/context.rs | 37 ++-- src/context/delta.rs | 15 +- src/context/mod.rs | 28 +-- src/context/physical.rs | 63 ++++--- src/repository/default.rs | 8 +- src/repository/interface.rs | 22 +-- src/system_tables.rs | 18 +- src/utils.rs | 9 +- 9 files changed, 294 insertions(+), 268 deletions(-) diff --git a/src/catalog.rs b/src/catalog.rs index 57ac294d..46bc10da 100644 --- a/src/catalog.rs +++ b/src/catalog.rs @@ -143,36 +143,46 @@ impl From for DataFusionError { } } +pub struct Metastore { + pub catalogs: Arc, + pub schemas: Arc, + pub tables: Arc, + pub functions: Arc, +} + #[cfg_attr(test, automock)] #[async_trait] -pub trait TableCatalog: Sync + Send { +pub trait CatalogStore: Sync + Send { + async fn create(&self, catalog_name: &str) -> Result; + async fn load_database(&self, name: &str) -> Result; - async fn get_catalog(&self, name: &str) -> Result; - async fn list_catalogs(&self) -> Result, Error>; + async fn list(&self) -> Result, Error>; - async fn get_schema( - &self, - catalog_name: &str, - schema_name: &str, - ) -> Result; + async fn get(&self, name: &str) -> Result; - async fn get_table( - &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, - ) -> Result; + async fn delete(&self, name: &str) -> Result<()>; +} - async fn create_catalog(&self, catalog_name: &str) -> Result; +#[cfg_attr(test, automock)] +#[async_trait] +pub trait SchemaStore: Sync + Send { + async fn create(&self, catalog_name: &str, schema_name: &str) + -> Result; - async fn 
create_schema( + async fn get( &self, catalog_name: &str, schema_name: &str, - ) -> Result; + ) -> Result; + + async fn delete(&self, catalog_name: &str, schema_name: &str) -> Result<()>; +} - async fn create_table( +#[cfg_attr(test, automock)] +#[async_trait] +pub trait TableStore: Sync + Send { + async fn create( &self, catalog_name: &str, schema_name: &str, @@ -181,26 +191,33 @@ pub trait TableCatalog: Sync + Send { uuid: Uuid, ) -> Result<(TableId, TableVersionId)>; - async fn delete_old_table_versions( + async fn get( &self, catalog_name: &str, - collection_name: &str, + schema_name: &str, table_name: &str, - ) -> Result; + ) -> Result; - async fn create_new_table_version( + async fn create_new_version( &self, uuid: Uuid, version: i64, ) -> Result; - async fn get_all_table_versions( + async fn delete_old_versions( + &self, + catalog_name: &str, + collection_name: &str, + table_name: &str, + ) -> Result; + + async fn get_all_versions( &self, catalog_name: &str, table_names: Option>, ) -> Result>; - async fn move_table( + async fn update( &self, old_catalog_name: &str, old_schema_name: &str, @@ -210,17 +227,13 @@ pub trait TableCatalog: Sync + Send { new_table_name: &str, ) -> Result<()>; - async fn drop_table( + async fn delete( &self, catalog_name: &str, schema_name: &str, table_name: &str, ) -> Result<()>; - async fn delete_schema(&self, catalog_name: &str, schema_name: &str) -> Result<()>; - - async fn delete_catalog(&self, name: &str) -> Result<()>; - async fn get_dropped_tables( &self, catalog_name: Option, @@ -237,8 +250,8 @@ pub trait TableCatalog: Sync + Send { #[cfg_attr(test, automock)] #[async_trait] -pub trait FunctionCatalog: Sync + Send { - async fn create_function( +pub trait FunctionStore: Sync + Send { + async fn create( &self, catalog_name: &str, function_name: &str, @@ -246,12 +259,9 @@ pub trait FunctionCatalog: Sync + Send { details: &CreateFunctionDetails, ) -> Result; - async fn get_all_functions_in_database( - &self, - catalog_name: &str, - ) -> Result>; + async fn list(&self, catalog_name: &str) -> Result>; - async fn drop_function( + async fn delete( &self, catalog_name: &str, if_exists: bool, @@ -336,7 +346,21 @@ impl DefaultCatalog { } #[async_trait] -impl TableCatalog for DefaultCatalog { +impl CatalogStore for DefaultCatalog { + async fn create(&self, catalog_name: &str) -> Result { + self.repository + .create_database(catalog_name) + .await + .map_err(|e| match e { + RepositoryError::UniqueConstraintViolation(_) => { + Error::CatalogAlreadyExists { + name: catalog_name.to_string(), + } + } + _ => Self::to_sqlx_error(e), + }) + } + async fn load_database(&self, name: &str) -> Result { let all_columns = self .repository @@ -370,49 +394,14 @@ impl TableCatalog for DefaultCatalog { }) } - async fn create_table( - &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, - schema: &Schema, - uuid: Uuid, - ) -> Result<(TableId, TableVersionId)> { - let collection = self.get_schema(catalog_name, schema_name).await?; - - self.repository - .create_table(collection.id, table_name, schema, uuid) - .await - .map_err(|e| match e { - RepositoryError::UniqueConstraintViolation(_) => { - Error::TableAlreadyExists { - name: table_name.to_string(), - } - } - RepositoryError::FKConstraintViolation(_) => Error::SchemaDoesNotExist { - name: schema_name.to_string(), - }, - RepositoryError::SqlxError(e) => Error::SqlxError(e), - }) - } - - async fn delete_old_table_versions( - &self, - catalog_name: &str, - collection_name: &str, - table_name: &str, - ) -> Result { 
- let table = self - .get_table(catalog_name, collection_name, table_name) - .await?; - - self.repository - .delete_old_table_versions(table.id) - .await - .map_err(Self::to_sqlx_error) + async fn list(&self) -> Result, Error> { + match self.repository.list_databases().await { + Ok(databases) => Ok(databases), + Err(e) => Err(Self::to_sqlx_error(e)), + } } - async fn get_catalog(&self, name: &str) -> Result { + async fn get(&self, name: &str) -> Result { match self.repository.get_database(name).await { Ok(database) => Ok(database), Err(RepositoryError::SqlxError(sqlx::error::Error::RowNotFound)) => { @@ -424,14 +413,50 @@ impl TableCatalog for DefaultCatalog { } } - async fn list_catalogs(&self) -> Result, Error> { - match self.repository.list_databases().await { - Ok(databases) => Ok(databases), - Err(e) => Err(Self::to_sqlx_error(e)), + async fn delete(&self, name: &str) -> Result<()> { + let database = CatalogStore::get(self, name).await?; + + self.repository + .delete_database(database.id) + .await + .map_err(|e| match e { + RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { + Error::CatalogDoesNotExist { + name: name.to_string(), + } + } + _ => Self::to_sqlx_error(e), + }) + } +} + +#[async_trait] +impl SchemaStore for DefaultCatalog { + async fn create( + &self, + catalog_name: &str, + schema_name: &str, + ) -> Result { + if schema_name == STAGING_SCHEMA { + return Err(Error::UsedStagingSchema); } + + let database = CatalogStore::get(self, catalog_name).await?; + + self.repository + .create_collection(database.id, schema_name) + .await + .map_err(|e| match e { + RepositoryError::UniqueConstraintViolation(_) => { + Error::SchemaAlreadyExists { + name: schema_name.to_string(), + } + } + _ => Self::to_sqlx_error(e), + }) } - async fn get_schema( + async fn get( &self, catalog_name: &str, schema_name: &str, @@ -455,72 +480,79 @@ impl TableCatalog for DefaultCatalog { } } - async fn get_table( - &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, - ) -> Result { - match self - .repository - .get_table(catalog_name, schema_name, table_name) - .await - { - Ok(table) => Ok(table), - Err(RepositoryError::SqlxError(sqlx::error::Error::RowNotFound)) => { - Err(Error::TableDoesNotExist { - name: table_name.to_string(), - }) - } - Err(e) => Err(Self::to_sqlx_error(e)), - } - } + async fn delete(&self, catalog_name: &str, schema_name: &str) -> Result<()> { + let schema = SchemaStore::get(self, catalog_name, schema_name).await?; - async fn create_catalog(&self, catalog_name: &str) -> Result { self.repository - .create_database(catalog_name) + .drop_collection(schema.id) .await .map_err(|e| match e { - RepositoryError::UniqueConstraintViolation(_) => { - Error::CatalogAlreadyExists { - name: catalog_name.to_string(), + RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { + Error::SchemaDoesNotExist { + name: schema_name.to_string(), } } _ => Self::to_sqlx_error(e), }) } +} - async fn create_schema( +#[async_trait] +impl TableStore for DefaultCatalog { + async fn create( &self, catalog_name: &str, schema_name: &str, - ) -> Result { - if schema_name == STAGING_SCHEMA { - return Err(Error::UsedStagingSchema); - } - - let database = self.get_catalog(catalog_name).await?; + table_name: &str, + schema: &Schema, + uuid: Uuid, + ) -> Result<(TableId, TableVersionId)> { + let collection = SchemaStore::get(self, catalog_name, schema_name).await?; self.repository - .create_collection(database.id, schema_name) + .create_table(collection.id, table_name, schema, uuid) 
            .await
            .map_err(|e| match e {
                RepositoryError::UniqueConstraintViolation(_) => {
-                    Error::SchemaAlreadyExists {
-                        name: schema_name.to_string(),
+                    Error::TableAlreadyExists {
+                        name: table_name.to_string(),
                     }
                 }
-                _ => Self::to_sqlx_error(e),
+                RepositoryError::FKConstraintViolation(_) => Error::SchemaDoesNotExist {
+                    name: schema_name.to_string(),
+                },
+                RepositoryError::SqlxError(e) => Error::SqlxError(e),
             })
     }
 
-    async fn create_new_table_version(
+    async fn get(
+        &self,
+        catalog_name: &str,
+        schema_name: &str,
+        table_name: &str,
+    ) -> Result<TableRecord> {
+        match self
+            .repository
+            .get_table(catalog_name, schema_name, table_name)
+            .await
+        {
+            Ok(table) => Ok(table),
+            Err(RepositoryError::SqlxError(sqlx::error::Error::RowNotFound)) => {
+                Err(Error::TableDoesNotExist {
+                    name: table_name.to_string(),
+                })
+            }
+            Err(e) => Err(Self::to_sqlx_error(e)),
+        }
+    }
+
+    async fn create_new_version(
         &self,
         uuid: Uuid,
         version: i64,
     ) -> Result<TableVersionId> {
         self.repository
-            .create_new_table_version(uuid, version)
+            .create_new_version(uuid, version)
             .await
             .map_err(|e| match e {
                 RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => {
@@ -530,18 +562,33 @@ impl TableCatalog for DefaultCatalog {
             })
     }
 
-    async fn get_all_table_versions(
+    async fn delete_old_versions(
+        &self,
+        catalog_name: &str,
+        collection_name: &str,
+        table_name: &str,
+    ) -> Result<u64> {
+        let table =
+            TableStore::get(self, catalog_name, collection_name, table_name).await?;
+
+        self.repository
+            .delete_old_versions(table.id)
+            .await
+            .map_err(Self::to_sqlx_error)
+    }
+
+    async fn get_all_versions(
         &self,
         catalog_name: &str,
         table_names: Option<Vec<String>>,
     ) -> Result<Vec<TableVersionsResult>> {
         self.repository
-            .get_all_table_versions(catalog_name, table_names)
+            .get_all_versions(catalog_name, table_names)
             .await
             .map_err(Self::to_sqlx_error)
     }
 
-    async fn move_table(
+    async fn update(
         &self,
         old_catalog_name: &str,
         old_schema_name: &str,
@@ -550,18 +597,19 @@ impl TableCatalog for DefaultCatalog {
         new_schema_name: &str,
         new_table_name: &str,
     ) -> Result<()> {
-        let table = self
-            .get_table(old_catalog_name, old_schema_name, old_table_name)
-            .await?;
+        let table =
+            TableStore::get(self, old_catalog_name, old_schema_name, old_table_name)
+                .await?;
 
         let new_schema_id = if new_schema_name != old_schema_name {
-            let schema = self.get_schema(old_catalog_name, new_schema_name).await?;
+            let schema =
+                SchemaStore::get(self, old_catalog_name, new_schema_name).await?;
             Some(schema.id)
         } else {
             None
         };
 
         self.repository
-            .move_table(table.id, new_table_name, new_schema_id)
+            .rename_table(table.id, new_table_name, new_schema_id)
             .await
             .map_err(|e| match e {
                 RepositoryError::FKConstraintViolation(_) => {
@@ -584,15 +632,13 @@ impl TableCatalog for DefaultCatalog {
             })
     }
 
-    async fn drop_table(
+    async fn delete(
         &self,
         catalog_name: &str,
         schema_name: &str,
         table_name: &str,
     ) -> Result<()> {
-        let table = self
-            .get_table(catalog_name, schema_name, table_name)
-            .await?;
+        let table = TableStore::get(self, catalog_name, schema_name, table_name).await?;
 
         self.repository
             .drop_table(table.id)
@@ -607,38 +653,6 @@ impl TableCatalog for DefaultCatalog {
             })
     }
 
-    async fn delete_schema(&self, catalog_name: &str, schema_name: &str) -> Result<()> {
-        let schema = self.get_schema(catalog_name, schema_name).await?;
-
-        self.repository
-            .drop_collection(schema.id)
-            .await
-            .map_err(|e| match e {
-                RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => {
-                    Error::SchemaDoesNotExist {
-                        name: schema_name.to_string(),
-                    }
-                }
-                _ => Self::to_sqlx_error(e),
-            })
-    }
-
-    async fn delete_catalog(&self, name: &str) -> Result<()> {
-        let database = self.get_catalog(name).await?;
-
-        self.repository
-            .delete_database(database.id)
-            .await
-            .map_err(|e| match e {
-                RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => {
-                    Error::CatalogDoesNotExist {
-                        name: name.to_string(),
-                    }
-                }
-                _ => Self::to_sqlx_error(e),
-            })
-    }
-
     async fn get_dropped_tables(
         &self,
         catalog_name: Option<String>,
@@ -709,15 +723,15 @@ impl DefaultCatalog {
 }
 
 #[async_trait]
-impl FunctionCatalog for DefaultCatalog {
-    async fn create_function(
+impl FunctionStore for DefaultCatalog {
+    async fn create(
         &self,
         catalog_name: &str,
         function_name: &str,
         or_replace: bool,
         details: &CreateFunctionDetails,
     ) -> Result<FunctionId> {
-        let database = self.get_catalog(catalog_name).await?;
+        let database = CatalogStore::get(self, catalog_name).await?;
 
         self.repository
             .create_function(database.id, function_name, or_replace, details)
@@ -735,11 +749,8 @@ impl FunctionCatalog for DefaultCatalog {
             })
     }
 
-    async fn get_all_functions_in_database(
-        &self,
-        catalog_name: &str,
-    ) -> Result<Vec<SeafowlFunction>> {
-        let database = self.get_catalog(catalog_name).await?;
+    async fn list(&self, catalog_name: &str) -> Result<Vec<SeafowlFunction>> {
+        let database = CatalogStore::get(self, catalog_name).await?;
 
         let all_functions = self
             .repository
@@ -763,13 +774,14 @@ impl FunctionCatalog for DefaultCatalog {
             .collect::<Result<Vec<SeafowlFunction>>>()
     }
 
-    async fn drop_function(
+    async fn delete(
         &self,
         catalog_name: &str,
         if_exists: bool,
+        func_names: &[String],
     ) -> Result<()> {
-        let database = self.get_catalog(catalog_name).await?;
+        let database = CatalogStore::get(self, catalog_name).await?;
 
         match self.repository.drop_function(database.id, func_names).await {
             Ok(id) => Ok(id),
diff --git a/src/config/context.rs b/src/config/context.rs
index d62d0a0f..e4d2930c 100644
--- a/src/config/context.rs
+++ b/src/config/context.rs
@@ -2,9 +2,7 @@ use std::collections::HashSet;
 use std::sync::Arc;
 
 use crate::{
-    catalog::{
-        DefaultCatalog, FunctionCatalog, TableCatalog, DEFAULT_DB, DEFAULT_SCHEMA,
-    },
+    catalog::{DefaultCatalog, DEFAULT_DB, DEFAULT_SCHEMA},
     context::SeafowlContext,
     repository::{interface::Repository, sqlite::SqliteRepository},
 };
@@ -20,7 +18,7 @@ use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};
 #[cfg(feature = "catalog-postgres")]
 use crate::repository::postgres::PostgresRepository;
 
-use crate::catalog::Error;
+use crate::catalog::{Error, Metastore};
 use crate::object_store::http::add_http_object_store;
 use crate::object_store::wrapped::InternalObjectStore;
 #[cfg(feature = "remote-tables")]
@@ -32,10 +30,10 @@ use parking_lot::lock_api::RwLock;
 
 use super::schema::{self, GCS, MEBIBYTES, MEMORY_FRACTION, S3};
 
-async fn build_catalog(
+async fn build_metastore(
     config: &schema::SeafowlConfig,
     object_store: Arc<InternalObjectStore>,
-) -> (Arc<dyn TableCatalog>, Arc<dyn FunctionCatalog>) {
+) -> Metastore {
     // Initialize the repository
     let repository: Arc<dyn Repository> = match &config.catalog {
         #[cfg(feature = "catalog-postgres")]
@@ -66,7 +64,12 @@ async fn build_catalog(
 
     let catalog = Arc::new(DefaultCatalog::new(repository, object_store));
 
-    (catalog.clone(), catalog)
+    Metastore {
+        catalogs: catalog.clone(),
+        schemas: catalog.clone(),
+        tables: catalog.clone(),
+        functions: catalog,
+    }
 }
 
 pub fn build_object_store(
@@ -181,21 +184,24 @@ pub async fn build_context(cfg: &schema::SeafowlConfig) -> Result<SeafowlContext, Error> {
-    let all_databases: HashSet<String> = tables
-        .list_catalogs()
+    let all_databases: HashSet<String> = metastore
+        .catalogs
+        .list()
         .await?
         .iter()
         .map(|db| db.name.clone())
@@ -209,8 +215,7 @@ pub async fn build_context(cfg: &schema::SeafowlConfig) -> Result<SeafowlContext, Error> {
diff --git a/src/context/mod.rs b/src/context/mod.rs
--- a/src/context/mod.rs
+++ b/src/context/mod.rs
@@ ... @@ pub struct SeafowlContext {
-    pub table_catalog: Arc<dyn TableCatalog>,
-    pub function_catalog: Arc<dyn FunctionCatalog>,
+    pub metastore: Arc<Metastore>,
     pub internal_object_store: Arc<InternalObjectStore>,
     pub database: String,
     pub all_databases: Arc<RwLock<HashSet<String>>>,
@@ -41,7 +40,7 @@ impl SeafowlContext {
         let all_databases = self.all_databases.clone();
         if maybe_database.is_none() {
             // Perhaps the db was created on another node; try to reload from catalog
-            let _ = self.table_catalog.get_catalog(&name).await.map_err(|_| {
+            let _ = self.metastore.catalogs.get(&name).await.map_err(|_| {
                 DataFusionError::Plan(format!(
                     "Unknown database {name}; try creating one with CREATE DATABASE first"
                 ))
@@ -60,8 +59,7 @@ impl SeafowlContext {
         Ok(Arc::from(SeafowlContext {
             inner: SessionContext::new_with_state(state),
-            table_catalog: self.table_catalog.clone(),
-            function_catalog: self.function_catalog.clone(),
+            metastore: self.metastore.clone(),
             internal_object_store: self.internal_object_store.clone(),
             database: name,
             all_databases,
@@ -88,12 +86,18 @@ impl SeafowlContext {
         self.inner.register_catalog(
             &self.database,
-            Arc::new(self.table_catalog.load_database(&self.database).await?),
+            Arc::new(
+                self.metastore
+                    .catalogs
+                    .load_database(&self.database)
+                    .await?,
+            ),
         );
 
         // Register all functions in the database
-        self.function_catalog
-            .get_all_functions_in_database(&self.database)
+        self.metastore
+            .functions
+            .list(&self.database)
             .await?
             .iter()
             .try_for_each(|f| self.register_function(&f.name, &f.details))
@@ -239,11 +243,7 @@ pub mod test_utils {
         // Create new non-default database; we're doing this in catalog only to simulate it taking
         // place on another node
-        context
-            .table_catalog
-            .create_catalog("testdb")
-            .await
-            .unwrap();
+        context.metastore.catalogs.create("testdb").await.unwrap();
 
         let context = context
             .scope_to_database("testdb".to_string())
diff --git a/src/context/physical.rs b/src/context/physical.rs
index c63d9b67..c612ab17 100644
--- a/src/context/physical.rs
+++ b/src/context/physical.rs
@@ -172,8 +172,9 @@ impl SeafowlContext {
             )) => {
                 // CREATE SCHEMA
                 // Create a schema and register it
-                self.table_catalog
-                    .create_schema(&self.database, schema_name)
+                self.metastore
+                    .schemas
+                    .create(&self.database, schema_name)
                     .await?;
                 Ok(make_dummy_exec())
             }
@@ -182,7 +183,7 @@ impl SeafowlContext {
             LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
                 catalog_name,
                 if_not_exists,
                 ..
             })) => {
-                if self.table_catalog.get_catalog(catalog_name).await.is_ok() {
+                if self.metastore.catalogs.get(catalog_name).await.is_ok() {
                     if !*if_not_exists {
                         return Err(DataFusionError::Plan(format!(
                             "Database {catalog_name} already exists"
@@ -193,11 +194,12 @@ impl SeafowlContext {
                 }
 
                 // Persist DB into metadata catalog
-                self.table_catalog.create_catalog(catalog_name).await?;
+                self.metastore.catalogs.create(catalog_name).await?;
 
                 // Create the corresponding default schema as well
-                self.table_catalog
-                    .create_schema(catalog_name, DEFAULT_SCHEMA)
+                self.metastore
+                    .schemas
+                    .create(catalog_name, DEFAULT_SCHEMA)
                     .await?;
 
                 // Update the shared in-memory map of DB names -> ids
@@ -374,8 +376,9 @@ impl SeafowlContext {
                     None,
                 )
                 .await?;
-                self.table_catalog
-                    .create_new_table_version(uuid, version)
+                self.metastore
+                    .tables
+                    .create_new_version(uuid, version)
                     .await?;
 
                 Ok(make_dummy_exec())
@@ -499,8 +502,9 @@ impl SeafowlContext {
                     None,
                 )
                 .await?;
-                self.table_catalog
-                    .create_new_table_version(uuid, version)
+                self.metastore
+                    .tables
+                    .create_new_version(uuid, version)
                     .await?;
 
                 Ok(make_dummy_exec())
@@ -519,8 +523,9 @@ impl SeafowlContext {
                     return Ok(make_dummy_exec());
                 }
 
-                self.table_catalog
-                    .drop_table(
+                self.metastore
+                    .tables
+                    .delete(
                         &resolved_ref.catalog,
                         &resolved_ref.schema,
                         &resolved_ref.table,
@@ -572,13 +577,9 @@ impl SeafowlContext {
                     self.register_function(name, details)?;
 
                     // Persist the function in the metadata storage
-                    self.function_catalog
-                        .create_function(
-                            &self.database,
-                            name,
-                            *or_replace,
-                            details,
-                        )
+                    self.metastore
+                        .functions
+                        .create(&self.database, name, *or_replace, details)
                         .await?;
 
                     Ok(make_dummy_exec())
@@ -588,8 +589,9 @@ impl SeafowlContext {
                     func_names,
                     output_schema: _,
                 }) => {
-                    self.function_catalog
-                        .drop_function(&self.database, *if_exists, func_names)
+                    self.metastore
+                        .functions
+                        .delete(&self.database, *if_exists, func_names)
                         .await?;
                     Ok(make_dummy_exec())
                 }
@@ -615,8 +617,9 @@ impl SeafowlContext {
                     old_table_ref.resolve(&self.database, DEFAULT_SCHEMA);
 
                     // Finally update our catalog entry
-                    self.table_catalog
-                        .move_table(
+                    self.metastore
+                        .tables
+                        .update(
                             &resolved_old_ref.catalog,
                             &resolved_old_ref.schema,
                             &resolved_old_ref.table,
@@ -628,9 +631,7 @@ impl SeafowlContext {
                     Ok(make_dummy_exec())
                 }
                 SeafowlExtensionNode::DropSchema(DropSchema { name, .. }) => {
-                    self.table_catalog
-                        .delete_schema(&self.database, name)
-                        .await?;
+                    self.metastore.schemas.delete(&self.database, name).await?;
 
                     Ok(make_dummy_exec())
                 }
@@ -671,8 +672,9 @@ impl SeafowlContext {
                 }
 
                 match self
-                    .table_catalog
-                    .delete_old_table_versions(
+                    .metastore
+                    .tables
+                    .delete_old_versions(
                         &resolved_ref.catalog,
                         &resolved_ref.schema,
                         &resolved_ref.table,
@@ -785,8 +787,9 @@ impl SeafowlContext {
             }
             None => {
                 // Schema doesn't exist; create one first, and then reload to pick it up
-                self.table_catalog
-                    .create_schema(&self.database, &schema_name)
+                self.metastore
+                    .schemas
+                    .create(&self.database, &schema_name)
                     .await?;
                 self.reload_schema().await?;
                 false
diff --git a/src/repository/default.rs b/src/repository/default.rs
index 8c32983d..57c4c49d 100644
--- a/src/repository/default.rs
+++ b/src/repository/default.rs
@@ -254,7 +254,7 @@ impl Repository for $repo {
         Ok((new_table_id, new_version_id))
     }
 
-    async fn delete_old_table_versions(
+    async fn delete_old_versions(
         &self,
         table_id: TableId,
     ) -> Result<u64, Error> {
@@ -270,7 +270,7 @@ impl Repository for $repo {
         Ok(delete_result.rows_affected())
     }
 
-    async fn create_new_table_version(
+    async fn create_new_version(
         &self,
         uuid: Uuid,
         version: i64,
@@ -308,7 +308,7 @@ impl Repository for $repo {
         Ok(new_version_id)
     }
 
-    async fn get_all_table_versions(
+    async fn get_all_versions(
         &self,
         database_name: &str,
         table_names: Option<Vec<String>>,
@@ -354,7 +354,7 @@ impl Repository for $repo {
         Ok(table_versions)
     }
 
-    async fn move_table(
+    async fn rename_table(
         &self,
         table_id: TableId,
         new_table_name: &str,
diff --git a/src/repository/interface.rs b/src/repository/interface.rs
index a98ba1aa..d14c529a 100644
--- a/src/repository/interface.rs
+++ b/src/repository/interface.rs
@@ -163,21 +163,21 @@ pub trait Repository: Send + Sync + Debug {
         uuid: Uuid,
     ) -> Result<(TableId, TableVersionId), Error>;
 
-    async fn delete_old_table_versions(&self, table_id: TableId) -> Result<u64, Error>;
+    async fn delete_old_versions(&self, table_id: TableId) -> Result<u64, Error>;
 
-    async fn create_new_table_version(
+    async fn create_new_version(
         &self,
         uuid: Uuid,
         version: i64,
     ) -> Result<TableVersionId, Error>;
 
-    async fn get_all_table_versions(
+    async fn get_all_versions(
         &self,
         database_name: &str,
         table_names: Option<Vec<String>>,
     ) -> Result<Vec<TableVersionsResult>, Error>;
 
-    async fn move_table(
+    async fn rename_table(
         &self,
         table_id: TableId,
         new_table_name: &str,
@@ -379,7 +379,7 @@ pub mod tests {
 
         // Duplicate the table
         let new_version_id = repository
-            .create_new_table_version(Uuid::default(), 1)
+            .create_new_version(Uuid::default(), 1)
             .await
             .unwrap();
@@ -401,7 +401,7 @@ pub mod tests {
 
         // Check the existing table versions
         let all_table_versions: Vec<TableVersionId> = repository
-            .get_all_table_versions("testdb", Some(vec!["testtable".to_string()]))
+            .get_all_versions("testdb", Some(vec!["testtable".to_string()]))
             .await
             .expect("Error getting all columns")
             .iter()
@@ -507,7 +507,7 @@ pub mod tests {
     ) {
         // Rename the table to something else
         repository
-            .move_table(table_id, "testtable2", None)
+            .rename_table(table_id, "testtable2", None)
             .await
             .unwrap();
@@ -532,7 +532,7 @@ pub mod tests {
             .await
             .unwrap();
         repository
-            .move_table(table_id, "testtable2", Some(collection_id))
+            .rename_table(table_id, "testtable2", Some(collection_id))
             .await
             .unwrap();
@@ -563,7 +563,7 @@ pub mod tests {
         // Nonexistent table ID
         assert!(matches!(
             repository
-                .move_table(-1, "doesntmatter", None)
+                .rename_table(-1, "doesntmatter", None)
                 .await
                 .unwrap_err(),
             Error::SqlxError(sqlx::Error::RowNotFound)
@@ -572,7 +572,7 @@ pub mod tests {
         // Existing table ID, moved to a nonexistent collection (FK violation)
         assert!(matches!(
             repository
-                .move_table(table_id, "doesntmatter", Some(-1))
+                .rename_table(table_id, "doesntmatter", Some(-1))
                 .await
                 .unwrap_err(),
             Error::FKConstraintViolation(_)
@@ -614,7 +614,7 @@ pub mod tests {
         assert!(matches!(
             repository
-                .move_table(new_table_id, "testtable2", Some(collection_2.id))
+                .rename_table(new_table_id, "testtable2", Some(collection_2.id))
                 .await
                 .unwrap_err(),
             Error::UniqueConstraintViolation(_)
diff --git a/src/system_tables.rs b/src/system_tables.rs
index 6e2ceeb2..c3e36716 100644
--- a/src/system_tables.rs
+++ b/src/system_tables.rs
@@ -1,7 +1,7 @@
 //! Mechanism for creating virtual Seafowl system tables, inspired by influxdb_iox system tables
 //! and datafusion's information_schema.
 
-use crate::catalog::TableCatalog;
+use crate::catalog::TableStore;
 use crate::repository::interface::DroppedTablesResult;
 use arrow::array::{Int64Builder, StringBuilder, StructBuilder, TimestampSecondBuilder};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
@@ -24,11 +24,11 @@ const DROPPED_TABLES: &str = "dropped_tables";
 
 pub struct SystemSchemaProvider {
     database: Arc<str>,
-    table_catalog: Arc<dyn TableCatalog>,
+    table_catalog: Arc<dyn TableStore>,
 }
 
 impl SystemSchemaProvider {
-    pub fn new(database: Arc<str>, table_catalog: Arc<dyn TableCatalog>) -> Self {
+    pub fn new(database: Arc<str>, table_catalog: Arc<dyn TableStore>) -> Self {
         Self {
             database,
             table_catalog,
@@ -131,13 +131,13 @@ where
 struct TableVersionsTable {
     database: Arc<str>,
     schema: SchemaRef,
-    table_catalog: Arc<dyn TableCatalog>,
+    table_catalog: Arc<dyn TableStore>,
 }
 
 impl TableVersionsTable {
-    fn new(database: Arc<str>, table_catalog: Arc<dyn TableCatalog>) -> Self {
+    fn new(database: Arc<str>, table_catalog: Arc<dyn TableStore>) -> Self {
         Self {
-            // This is dictated by the output of `get_all_table_versions`, except that we omit the
+            // This is dictated by the output of `get_all_versions`, except that we omit the
             // database_name field, since we scope down to the database at hand.
             database,
             schema: Arc::new(Schema::new(vec![
@@ -165,7 +165,7 @@ impl SeafowlSystemTable for TableVersionsTable {
     async fn load_record_batch(&self) -> Result<RecordBatch> {
         let table_versions = self
             .table_catalog
-            .get_all_table_versions(&self.database, None)
+            .get_all_versions(&self.database, None)
             .await?;
 
         let mut builder = StructBuilder::from_fields(
@@ -210,11 +210,11 @@ impl SeafowlSystemTable for TableVersionsTable {
 struct DroppedTablesTable {
     database: Arc<str>,
     schema: SchemaRef,
-    table_catalog: Arc<dyn TableCatalog>,
+    table_catalog: Arc<dyn TableStore>,
 }
 
 impl DroppedTablesTable {
-    fn new(database: Arc<str>, table_catalog: Arc<dyn TableCatalog>) -> Self {
+    fn new(database: Arc<str>, table_catalog: Arc<dyn TableStore>) -> Self {
         Self {
             // This is dictated by the output of `get_dropped_tables`, except that we omit the
             // database_name field, since we scope down to the database at hand.
diff --git a/src/utils.rs b/src/utils.rs
index 6234e887..b75479d7 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -46,7 +46,8 @@ pub async fn run_one_off_command(
 // Physically delete dropped tables for a given context
 pub async fn gc_databases(context: &SeafowlContext, database_name: Option<String>) {
     let mut dropped_tables = context
-        .table_catalog
+        .metastore
+        .tables
         .get_dropped_tables(database_name)
         .await
         .unwrap_or_else(|err| {
@@ -82,7 +83,8 @@ pub async fn gc_databases(context: &SeafowlContext, database_name: Option<String>) {
-        .table_catalog
+        .metastore
+        .tables
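A note on the shape of the refactor so far: `build_metastore` above hands the same `Arc<DefaultCatalog>` out four times, once per store trait, so the `Metastore` fields are four handles onto one shared backing implementation. Below is a minimal, self-contained Rust sketch of that pattern; the trait and struct names mirror the patch, but the method signatures and bodies are invented purely for illustration:

```rust
use std::sync::Arc;

// Narrow traits, one per object type, like `CatalogStore`/`TableStore` above.
trait CatalogStore: Send + Sync {
    fn get(&self, name: &str) -> Option<String>;
}

trait TableStore: Send + Sync {
    fn delete(&self, catalog: &str, schema: &str, table: &str);
}

// Facade over the individual stores, in the spirit of `Metastore`.
struct Metastore {
    catalogs: Arc<dyn CatalogStore>,
    tables: Arc<dyn TableStore>,
}

// One backing type implements every trait, like `DefaultCatalog`.
struct InMemory;

impl CatalogStore for InMemory {
    fn get(&self, name: &str) -> Option<String> {
        Some(name.to_string())
    }
}

impl TableStore for InMemory {
    fn delete(&self, catalog: &str, schema: &str, table: &str) {
        println!("dropping {catalog}.{schema}.{table}");
    }
}

fn main() {
    let backing = Arc::new(InMemory);
    // The same `Arc` is cloned once per trait, so all stores share state;
    // each field coerces to its own trait object.
    let metastore = Metastore {
        catalogs: backing.clone(),
        tables: backing,
    };
    assert_eq!(metastore.catalogs.get("default"), Some("default".into()));
    metastore.tables.delete("default", "public", "t");
}
```

Splitting the facade along trait lines keeps call sites explicit (`metastore.tables.delete(...)`) while leaving room to back each store with a different implementation later.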
From: Marko Grujic
Date: Tue, 19 Dec 2023 17:44:41 +0100
Subject: [PATCH 3/7] Some polishing
---
 src/catalog.rs              | 19 ++++---------------
 src/config/context.rs       | 11 -----------
 src/context/mod.rs          | 20 +-------------------
 src/context/physical.rs     |  3 ---
 src/frontend/http.rs        |  2 +-
 src/repository/default.rs   | 19 -------------------
 src/repository/interface.rs | 22 ++++------------------
 7 files changed, 10 insertions(+), 86 deletions(-)

diff --git a/src/catalog.rs b/src/catalog.rs
index 46bc10da..6ff98467 100644
--- a/src/catalog.rs
+++ b/src/catalog.rs
@@ -8,8 +8,6 @@ use datafusion::datasource::TableProvider;
 use datafusion::error::DataFusionError;
 use deltalake::DeltaTable;
 use itertools::Itertools;
-#[cfg(test)]
-use mockall::automock;
 use parking_lot::RwLock;
 use uuid::Uuid;
 
@@ -143,6 +141,10 @@ impl From<Error> for DataFusionError {
     }
 }
 
+// This is the main entrypoint to all individual catalogs for the various object types.
+// The intention is to make it extensible and decoupled from the underlying metastore
+// persistence mechanism (such as the presently used `Repository`).
+#[derive(Clone)]
 pub struct Metastore {
     pub catalogs: Arc<dyn CatalogStore>,
     pub schemas: Arc<dyn SchemaStore>,
@@ -150,21 +152,17 @@ pub struct Metastore {
     pub tables: Arc<dyn TableStore>,
     pub functions: Arc<dyn FunctionStore>,
 }
 
-#[cfg_attr(test, automock)]
 #[async_trait]
 pub trait CatalogStore: Sync + Send {
     async fn create(&self, catalog_name: &str) -> Result<DatabaseId>;
 
     async fn load_database(&self, name: &str) -> Result<SeafowlDatabase>;
 
-    async fn list(&self) -> Result<Vec<DatabaseRecord>, Error>;
-
     async fn get(&self, name: &str) -> Result<DatabaseRecord>;
 
     async fn delete(&self, name: &str) -> Result<()>;
 }
 
-#[cfg_attr(test, automock)]
 #[async_trait]
 pub trait SchemaStore: Sync + Send {
     async fn create(&self, catalog_name: &str, schema_name: &str)
@@ -179,7 +177,6 @@ pub trait SchemaStore: Sync + Send {
     async fn delete(&self, catalog_name: &str, schema_name: &str) -> Result<()>;
 }
 
-#[cfg_attr(test, automock)]
 #[async_trait]
 pub trait TableStore: Sync + Send {
     async fn create(
@@ -248,7 +245,6 @@ pub trait TableStore: Sync + Send {
     async fn delete_dropped_table(&self, uuid: Uuid) -> Result<()>;
 }
 
-#[cfg_attr(test, automock)]
 #[async_trait]
 pub trait FunctionStore: Sync + Send {
     async fn create(
@@ -394,13 +390,6 @@ impl CatalogStore for DefaultCatalog {
         })
     }
 
-    async fn list(&self) -> Result<Vec<DatabaseRecord>, Error> {
-        match self.repository.list_databases().await {
-            Ok(databases) => Ok(databases),
-            Err(e) => Err(Self::to_sqlx_error(e)),
-        }
-    }
-
     async fn get(&self, name: &str) -> Result<DatabaseRecord> {
         match self.repository.get_database(name).await {
             Ok(database) => Ok(database),
diff --git a/src/config/context.rs b/src/config/context.rs
index e4d2930c..7fdaf99e 100644
--- a/src/config/context.rs
+++ b/src/config/context.rs
@@ -1,4 +1,3 @@
-use std::collections::HashSet;
 use std::sync::Arc;
 
 use crate::{
@@ -26,7 +25,6 @@ use datafusion_remote_tables::factory::RemoteTableFactory;
 #[cfg(feature = "object-store-s3")]
 use object_store::aws::AmazonS3Builder;
 use object_store::gcp::GoogleCloudStorageBuilder;
-use parking_lot::lock_api::RwLock;
 
 use super::schema::{self, GCS, MEBIBYTES, MEMORY_FRACTION, S3};
 
@@ -199,14 +197,6 @@ pub async fn build_context(cfg: &schema::SeafowlConfig) -> Result<SeafowlContext, Error> {
-    let all_databases: HashSet<String> = metastore
-        .catalogs
-        .list()
-        .await?
-        .iter()
-        .map(|db| db.name.clone())
-        .collect();
-
     // Convergence doesn't support connecting to different DB names. We are supposed
     // to do one context per query (as we need to load the schema before executing every
     // query) and per database (since the context is supposed to be limited to the database
@@ -218,7 +208,6 @@ pub async fn build_context(cfg: &schema::SeafowlConfig) -> Result<SeafowlContext, Error> {
diff --git a/src/context/mod.rs b/src/context/mod.rs
--- a/src/context/mod.rs
+++ b/src/context/mod.rs
@@ ... @@ pub struct SeafowlContext {
     pub metastore: Arc<Metastore>,
     pub internal_object_store: Arc<InternalObjectStore>,
     pub database: String,
-    pub all_databases: Arc<RwLock<HashSet<String>>>,
     pub max_partition_size: u32,
 }
 
 impl SeafowlContext {
     // Create a new `SeafowlContext` with a new inner context scoped to a different default DB
     pub async fn scope_to_database(&self, name: String) -> Result<Arc<SeafowlContext>> {
-        // TODO: do we need this? The only goal here is to keep the list of databases in memory and
-        // fail early if one is missing, but we can defer that to resolution time.
-        let maybe_database = self.all_databases.read().get(&name).cloned();
-        let all_databases = self.all_databases.clone();
-        if maybe_database.is_none() {
-            // Perhaps the db was created on another node; try to reload from catalog
-            let _ = self.metastore.catalogs.get(&name).await.map_err(|_| {
-                DataFusionError::Plan(format!(
-                    "Unknown database {name}; try creating one with CREATE DATABASE first"
-                ))
-            })?;
-            all_databases.write().insert(name.clone());
-        };
-
-        // Swap the default database in the new internal context's session config
+        // Swap the default catalog in the new internal context's session config
         let session_config = self
             .inner()
             .copied_config()
@@ -62,7 +45,6 @@ impl SeafowlContext {
             metastore: self.metastore.clone(),
             internal_object_store: self.internal_object_store.clone(),
             database: name,
-            all_databases,
             max_partition_size: self.max_partition_size,
         }))
     }
diff --git a/src/context/physical.rs b/src/context/physical.rs
index c612ab17..5ff5bfb6 100644
--- a/src/context/physical.rs
+++ b/src/context/physical.rs
@@ -202,9 +202,6 @@ impl SeafowlContext {
                     .create(catalog_name, DEFAULT_SCHEMA)
                     .await?;
 
-                // Update the shared in-memory map of DB names -> ids
-                self.all_databases.write().insert(catalog_name.clone());
-
                 Ok(make_dummy_exec())
             }
             LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
diff --git a/src/frontend/http.rs b/src/frontend/http.rs
index cd1564af..c55e4b21 100644
--- a/src/frontend/http.rs
+++ b/src/frontend/http.rs
@@ -1023,7 +1023,7 @@ pub mod tests {
         assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
         assert_eq!(
             resp.body(),
-            "Error during planning: Unknown database missing_db; try creating one with CREATE DATABASE first"
+            "Error during planning: Database \"missing_db\" doesn't exist"
         );
     }
 
diff --git a/src/repository/default.rs b/src/repository/default.rs
index 57c4c49d..4d3e705d 100644
--- a/src/repository/default.rs
+++ b/src/repository/default.rs
@@ -65,18 +65,6 @@ impl Repository for $repo {
             .expect("error running migrations");
     }
 
-    async fn get_collections_in_database(
-        &self,
-        database_id: DatabaseId,
-    ) -> Result<Vec<String>, Error> {
-        let names = sqlx::query("SELECT name FROM collection WHERE database_id = $1")
-            .bind(database_id)
-            .fetch(&self.executor)
-            .map_ok(|row| row.get("name"))
-            .try_collect()
-            .await.map_err($repo::interpret_error)?;
-        Ok(names)
-    }
     async fn get_all_columns_in_database(
         &self,
         name: &str,
@@ -181,13 +169,6 @@ impl Repository for $repo {
         Ok(table)
     }
 
-    async fn list_databases(&self) -> Result<Vec<DatabaseRecord>> {
-        sqlx::query_as(r#"SELECT name, id FROM database"#)
-            .fetch_all(&self.executor)
-            .await.map_err($repo::interpret_error)
-
-    }
-
     async fn create_collection(
         &self,
         database_id: DatabaseId,
diff --git a/src/repository/interface.rs b/src/repository/interface.rs
index d14c529a..35984d3a 100644
--- a/src/repository/interface.rs
+++ b/src/repository/interface.rs
@@ -36,13 +36,6 @@ pub struct TableRecord {
     pub name: String,
 }
 
-#[derive(sqlx::FromRow, Default, Debug, PartialEq, Eq)]
-pub struct TableVersion {
-    pub id: TableVersionId,
-    pub table_id: TableId,
-    pub creation_time: Timestamp,
-}
-
 #[derive(sqlx::FromRow, Default, Debug, PartialEq, Eq)]
 pub struct AllDatabaseColumnsResult {
     pub database_name: String,
@@ -120,18 +113,11 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 pub trait Repository: Send + Sync + Debug {
     async fn setup(&self);
 
-    async fn get_collections_in_database(
-        &self,
-        database_id: DatabaseId,
-    ) -> Result<Vec<String>, Error>;
-
     async fn get_all_columns_in_database(
         &self,
         name: &str,
     ) -> Result<Vec<AllDatabaseColumnsResult>, Error>;
 
-    async fn list_databases(&self) -> Result<Vec<DatabaseRecord>, Error>;
-
     async fn get_database(&self, name: &str) -> Result<DatabaseRecord>;
 
     async fn get_collection(
@@ -289,7 +275,7 @@ pub mod tests {
     }
 
     pub async fn run_generic_repository_tests(repository: Arc<dyn Repository>) {
-        test_get_collections_empty(repository.clone()).await;
+        test_get_tables_empty(repository.clone()).await;
         let (database_id, table_id, table_version_id) =
             test_create_database_collection_table(repository.clone()).await;
         test_create_functions(repository.clone(), database_id).await;
@@ -303,13 +289,13 @@ pub mod tests {
         test_error_propagation(repository, table_id).await;
     }
 
-    async fn test_get_collections_empty(repository: Arc<dyn Repository>) {
+    async fn test_get_tables_empty(repository: Arc<dyn Repository>) {
         assert_eq!(
             repository
-                .get_collections_in_database(0)
+                .get_all_columns_in_database(TEST_DB)
                 .await
                 .expect("error getting collections"),
-            Vec::<String>::new()
+            Vec::<AllDatabaseColumnsResult>::new()
         );
     }
"6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" - [[package]] name = "frunk" version = "0.4.2" @@ -3433,33 +3412,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "mockall" -version = "0.11.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c84490118f2ee2d74570d114f3d0493cbf02790df303d2707606c3e14e07c96" -dependencies = [ - "cfg-if", - "downcast", - "fragile", - "lazy_static", - "mockall_derive", - "predicates 2.1.5", - "predicates-tree", -] - -[[package]] -name = "mockall_derive" -version = "0.11.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ce75669015c4f47b289fd4d4f56e894e4c96003ffdf3ac51313126f94c6cbb" -dependencies = [ - "cfg-if", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "moka" version = "0.11.3" @@ -3630,12 +3582,6 @@ dependencies = [ "minimal-lexical", ] -[[package]] -name = "normalize-line-endings" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" - [[package]] name = "ntapi" version = "0.4.1" @@ -4248,20 +4194,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" -[[package]] -name = "predicates" -version = "2.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd" -dependencies = [ - "difflib", - "float-cmp", - "itertools 0.10.5", - "normalize-line-endings", - "predicates-core", - "regex", -] - [[package]] name = "predicates" version = "3.0.4" @@ -5294,7 +5226,6 @@ dependencies = [ "itertools 0.11.0", "lazy_static", "log", - "mockall", "moka", "object_store", "parking_lot", diff --git a/Cargo.toml b/Cargo.toml index c0d6709a..adecb626 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,7 +104,6 @@ wasmtime-wasi = "14.0.0" [dev-dependencies] assert_cmd = "2" assert_unordered = "0.3" -mockall = "0.11.1" rstest = "*" serial_test = "2" wiremock = "0.5" diff --git a/src/catalog.rs b/src/catalog.rs index 6ff98467..8419903e 100644 --- a/src/catalog.rs +++ b/src/catalog.rs @@ -287,14 +287,6 @@ impl DefaultCatalog { } } - fn to_sqlx_error(error: RepositoryError) -> Error { - Error::SqlxError(match error { - RepositoryError::UniqueConstraintViolation(e) => e, - RepositoryError::FKConstraintViolation(e) => e, - RepositoryError::SqlxError(e) => e, - }) - } - fn build_table( &self, table_name: &str, @@ -341,6 +333,16 @@ impl DefaultCatalog { } } +impl From for Error { + fn from(err: RepositoryError) -> Error { + Error::SqlxError(match err { + RepositoryError::UniqueConstraintViolation(e) => e, + RepositoryError::FKConstraintViolation(e) => e, + RepositoryError::SqlxError(e) => e, + }) + } +} + #[async_trait] impl CatalogStore for DefaultCatalog { async fn create(&self, catalog_name: &str) -> Result { @@ -353,16 +355,12 @@ impl CatalogStore for DefaultCatalog { name: catalog_name.to_string(), } } - _ => Self::to_sqlx_error(e), + e => e.into(), }) } async fn load_database(&self, name: &str) -> Result { - let all_columns = self - .repository - .get_all_columns_in_database(name) - .await - .map_err(Self::to_sqlx_error)?; + let all_columns = self.repository.get_all_columns_in_database(name).await?; // NB we can't distinguish between a database without tables and a database // that doesn't exist at all due to our query. 
@@ -398,7 +396,7 @@ impl CatalogStore for DefaultCatalog { name: name.to_string(), }) } - Err(e) => Err(Self::to_sqlx_error(e)), + Err(e) => Err(e.into()), } } @@ -414,7 +412,7 @@ impl CatalogStore for DefaultCatalog { name: name.to_string(), } } - _ => Self::to_sqlx_error(e), + e => e.into(), }) } } @@ -441,7 +439,7 @@ impl SchemaStore for DefaultCatalog { name: schema_name.to_string(), } } - _ => Self::to_sqlx_error(e), + e => e.into(), }) } @@ -465,7 +463,7 @@ impl SchemaStore for DefaultCatalog { name: schema_name.to_string(), }) } - Err(e) => Err(Self::to_sqlx_error(e)), + Err(e) => Err(e.into()), } } @@ -481,7 +479,7 @@ impl SchemaStore for DefaultCatalog { name: schema_name.to_string(), } } - _ => Self::to_sqlx_error(e), + e => e.into(), }) } } @@ -531,7 +529,7 @@ impl TableStore for DefaultCatalog { name: table_name.to_string(), }) } - Err(e) => Err(Self::to_sqlx_error(e)), + Err(e) => Err(e.into()), } } @@ -547,7 +545,7 @@ impl TableStore for DefaultCatalog { RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { Error::TableUuidDoesNotExist { uuid } } - _ => Self::to_sqlx_error(e), + e => e.into(), }) } @@ -560,10 +558,7 @@ impl TableStore for DefaultCatalog { let table = TableStore::get(self, catalog_name, collection_name, table_name).await?; - self.repository - .delete_old_versions(table.id) - .await - .map_err(Self::to_sqlx_error) + Ok(self.repository.delete_old_versions(table.id).await?) } async fn get_all_versions( @@ -571,10 +566,10 @@ impl TableStore for DefaultCatalog { catalog_name: &str, table_names: Option>, ) -> Result> { - self.repository + Ok(self + .repository .get_all_versions(catalog_name, table_names) - .await - .map_err(Self::to_sqlx_error) + .await?) } async fn update( @@ -617,7 +612,7 @@ impl TableStore for DefaultCatalog { name: old_table_name.to_string(), } } - _ => Self::to_sqlx_error(e), + e => e.into(), }) } @@ -638,7 +633,7 @@ impl TableStore for DefaultCatalog { name: table_name.to_string(), } } - _ => Self::to_sqlx_error(e), + e => e.into(), }) } @@ -646,10 +641,7 @@ impl TableStore for DefaultCatalog { &self, catalog_name: Option, ) -> Result> { - self.repository - .get_dropped_tables(catalog_name) - .await - .map_err(Self::to_sqlx_error) + Ok(self.repository.get_dropped_tables(catalog_name).await?) 
} async fn update_dropped_table( @@ -664,7 +656,7 @@ impl TableStore for DefaultCatalog { RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { Error::TableUuidDoesNotExist { uuid } } - _ => Self::to_sqlx_error(e), + e => e.into(), }) } @@ -676,7 +668,7 @@ impl TableStore for DefaultCatalog { RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { Error::TableUuidDoesNotExist { uuid } } - _ => Self::to_sqlx_error(e), + e => e.into(), }) } } @@ -734,7 +726,7 @@ impl FunctionStore for DefaultCatalog { name: function_name.to_string(), } } - _ => Self::to_sqlx_error(e), + e => e.into(), }) } @@ -744,8 +736,7 @@ impl FunctionStore for DefaultCatalog { let all_functions = self .repository .get_all_functions_in_database(database.id) - .await - .map_err(Self::to_sqlx_error)?; + .await?; all_functions .iter() @@ -788,7 +779,7 @@ impl FunctionStore for DefaultCatalog { }) } } - Err(e) => Err(Self::to_sqlx_error(e)), + Err(e) => Err(e.into()), } } } From 995facc500cf427f7e3f0ae599e389330528b5e2 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Wed, 20 Dec 2023 11:22:52 +0100 Subject: [PATCH 5/7] Move building of catalog and functions into the new Metastore --- src/catalog.rs | 333 +++++++++++++++++++----------------- src/config/context.rs | 11 +- src/context/mod.rs | 10 +- src/repository/default.rs | 6 +- src/repository/interface.rs | 14 +- 5 files changed, 187 insertions(+), 187 deletions(-) diff --git a/src/catalog.rs b/src/catalog.rs index 8419903e..3d90e4f1 100644 --- a/src/catalog.rs +++ b/src/catalog.rs @@ -150,13 +150,156 @@ pub struct Metastore { pub schemas: Arc, pub tables: Arc, pub functions: Arc, + staging_schema: Arc, + object_store: Arc, +} + +pub struct RepositoryStore { + pub repository: Arc, +} + +impl Metastore { + pub fn new_from_repository( + repository: Arc, + object_store: Arc, + ) -> Self { + let repository_store = Arc::new(RepositoryStore { repository }); + + let staging_schema = Arc::new(MemorySchemaProvider::new()); + Self { + catalogs: repository_store.clone(), + schemas: repository_store.clone(), + tables: repository_store.clone(), + functions: repository_store, + staging_schema, + object_store, + } + } + + pub async fn build_catalog(&self, catalog_name: &str) -> Result { + let all_columns = self.schemas.list(catalog_name).await?; + + // NB we can't distinguish between a database without tables and a database + // that doesn't exist at all due to our query. + + // Turn the list of all collections, tables and their columns into a nested map. 
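PATCH 4/7 above replaces the private `to_sqlx_error` helper with a standard `From<RepositoryError> for Error` impl, so `?` converts automatically and the remaining match arms can fall through with `e => e.into()`. A compilable miniature of the same pattern, with the error payloads simplified to `String` stand-ins:

```rust
#[derive(Debug)]
enum RepositoryError {
    UniqueConstraintViolation(String),
    FKConstraintViolation(String),
    SqlxError(String),
}

#[derive(Debug)]
enum Error {
    TableAlreadyExists { name: String },
    SqlxError(String),
}

// The same shape as the `From` impl the patch introduces: every repository
// error collapses into the catalog's generic SQL error by default.
impl From<RepositoryError> for Error {
    fn from(err: RepositoryError) -> Error {
        Error::SqlxError(match err {
            RepositoryError::UniqueConstraintViolation(e) => e,
            RepositoryError::FKConstraintViolation(e) => e,
            RepositoryError::SqlxError(e) => e,
        })
    }
}

// A fake repository call that always hits a unique-constraint violation.
fn create_table(name: &str) -> Result<(), RepositoryError> {
    Err(RepositoryError::UniqueConstraintViolation(format!(
        "duplicate key: {name}"
    )))
}

// Special-case one variant, let everything else fall through via `From`.
fn catalog_create_table(name: &str) -> Result<(), Error> {
    create_table(name).map_err(|e| match e {
        RepositoryError::UniqueConstraintViolation(_) => Error::TableAlreadyExists {
            name: name.to_string(),
        },
        e => e.into(),
    })
}

fn main() {
    println!("{:?}", catalog_create_table("t1"));
}
```

The payoff shows in hunks like `delete_old_versions` above, where four lines of `map_err(Self::to_sqlx_error)` plumbing shrink to a single `Ok(... .await?)`.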
From 995facc500cf427f7e3f0ae599e389330528b5e2 Mon Sep 17 00:00:00 2001
From: Marko Grujic
Date: Wed, 20 Dec 2023 11:22:52 +0100
Subject: [PATCH 5/7] Move building of catalog and functions into the new Metastore
---
 src/catalog.rs              | 333 +++++++++++++++++++-----------------
 src/config/context.rs       |  11 +-
 src/context/mod.rs          |  10 +-
 src/repository/default.rs   |   6 +-
 src/repository/interface.rs |  14 +-
 5 files changed, 187 insertions(+), 187 deletions(-)

diff --git a/src/catalog.rs b/src/catalog.rs
index 8419903e..3d90e4f1 100644
--- a/src/catalog.rs
+++ b/src/catalog.rs
@@ -150,13 +150,156 @@ pub struct Metastore {
     pub schemas: Arc<dyn SchemaStore>,
     pub tables: Arc<dyn TableStore>,
     pub functions: Arc<dyn FunctionStore>,
+    staging_schema: Arc<MemorySchemaProvider>,
+    object_store: Arc<InternalObjectStore>,
+}
+
+pub struct RepositoryStore {
+    pub repository: Arc<dyn Repository>,
+}
+
+impl Metastore {
+    pub fn new_from_repository(
+        repository: Arc<dyn Repository>,
+        object_store: Arc<InternalObjectStore>,
+    ) -> Self {
+        let repository_store = Arc::new(RepositoryStore { repository });
+
+        let staging_schema = Arc::new(MemorySchemaProvider::new());
+        Self {
+            catalogs: repository_store.clone(),
+            schemas: repository_store.clone(),
+            tables: repository_store.clone(),
+            functions: repository_store,
+            staging_schema,
+            object_store,
+        }
+    }
+
+    pub async fn build_catalog(&self, catalog_name: &str) -> Result<SeafowlDatabase> {
+        let all_columns = self.schemas.list(catalog_name).await?;
+
+        // NB we can't distinguish between a database without tables and a database
+        // that doesn't exist at all due to our query.
+
+        // Turn the list of all collections, tables and their columns into a nested map.
+        let schemas: HashMap<Arc<str>, Arc<SeafowlSchema>> = all_columns
+            .iter()
+            .group_by(|col| &col.collection_name)
+            .into_iter()
+            .map(|(cn, cc)| self.build_schema(cn, cc))
+            .collect();
+
+        let name: Arc<str> = Arc::from(catalog_name);
+
+        Ok(SeafowlDatabase {
+            name: name.clone(),
+            schemas,
+            staging_schema: self.staging_schema.clone(),
+            system_schema: Arc::new(SystemSchemaProvider::new(name, self.tables.clone())),
+        })
+    }
+
+    fn build_schema<'a, I>(
+        &self,
+        collection_name: &str,
+        collection_columns: I,
+    ) -> (Arc<str>, Arc<SeafowlSchema>)
+    where
+        I: Iterator<Item = &'a AllDatabaseColumnsResult>,
+    {
+        let tables = collection_columns
+            .filter_map(|col| {
+                if let Some(table_name) = &col.table_name
+                    && let Some(table_uuid) = col.table_uuid
+                {
+                    Some(self.build_table(table_name, table_uuid))
+                } else {
+                    None
+                }
+            })
+            .collect::<HashMap<_, _>>();
+
+        (
+            Arc::from(collection_name.to_string()),
+            Arc::new(SeafowlSchema {
+                name: Arc::from(collection_name.to_string()),
+                tables: RwLock::new(tables),
+            }),
+        )
+    }
+
+    fn build_table(
+        &self,
+        table_name: &str,
+        table_uuid: Uuid,
+    ) -> (Arc<str>, Arc<dyn TableProvider>) {
+        // Build a delta table but don't load it yet; we'll do that only for tables that are
+        // actually referenced in a statement, via the async `table` method of the schema provider.
+        // TODO: this means that any `information_schema.columns` query will serially load all
+        // delta tables present in the database. The real fix for this is to make DF use `TableSource`
+        // for the information schema, and then implement `TableSource` for `DeltaTable` in delta-rs.
+        let table_log_store = self.object_store.get_log_store(table_uuid);
+
+        let table = DeltaTable::new(table_log_store, Default::default());
+        (Arc::from(table_name.to_string()), Arc::new(table) as _)
+    }
+
+    pub async fn build_functions(
+        &self,
+        catalog_name: &str,
+    ) -> Result<Vec<SeafowlFunction>> {
+        let functions = self.functions.list(catalog_name).await?;
+
+        functions
+            .iter()
+            .map(|item| {
+                Self::parse_create_function_details(item)
+                    .map(|details| SeafowlFunction {
+                        function_id: item.id,
+                        name: item.name.to_owned(),
+                        details,
+                    })
+                    .map_err(|e| Error::FunctionDeserializationError {
+                        reason: e.message,
+                    })
+            })
+            .collect::<Result<Vec<SeafowlFunction>>>()
+    }
+
+    fn parse_create_function_details(
+        item: &AllDatabaseFunctionsResult,
+    ) -> std::result::Result {
+        let AllDatabaseFunctionsResult {
+            id: _,
+            name: _,
+            entrypoint,
+            language,
+            input_types,
+            return_type,
+            data,
+            volatility,
+        } = item;
+
+        Ok(CreateFunctionDetails {
+            entrypoint: entrypoint.to_string(),
+            language: CreateFunctionLanguage::from_str(language.as_str())?,
+            input_types: serde_json::from_str::<Vec<CreateFunctionDataType>>(
+                input_types,
+            )?,
+            return_type: CreateFunctionDataType::from_str(
+                &return_type.as_str().to_ascii_uppercase(),
+            )?,
+            data: data.to_string(),
+            volatility: CreateFunctionVolatility::from_str(volatility.as_str())?,
+        })
+    }
 }
 
 #[async_trait]
 pub trait CatalogStore: Sync + Send {
-    async fn create(&self, catalog_name: &str) -> Result<DatabaseId>;
-
-    async fn load_database(&self, name: &str) -> Result<SeafowlDatabase>;
+    async fn create(&self, name: &str) -> Result<DatabaseId>;
 
     async fn get(&self, name: &str) -> Result<DatabaseRecord>;
 
@@ -168,6 +311,11 @@ pub trait SchemaStore: Sync + Send {
     async fn create(&self, catalog_name: &str, schema_name: &str)
         -> Result<CollectionId>;
 
+    async fn list(
+        &self,
+        catalog_name: &str,
+    ) -> Result<Vec<AllDatabaseColumnsResult>, Error>;
+
     async fn get(
         &self,
         catalog_name: &str,
@@ -204,7 +352,7 @@ pub trait TableStore: Sync + Send {
     async fn delete_old_versions(
         &self,
         catalog_name: &str,
-        collection_name: &str,
+        schema_name: &str,
         table_name: &str,
     ) -> Result<u64>;
 
@@ -255,7 +403,7 @@ pub trait FunctionStore: Sync + Send {
         details: &CreateFunctionDetails,
     ) -> Result<FunctionId>;
 
-    async fn list(&self, catalog_name: &str) -> Result<Vec<SeafowlFunction>>;
+    async fn list(&self, catalog_name: &str) -> Result<Vec<AllDatabaseFunctionsResult>>;
 
     async fn delete(
         &self,
@@ -265,74 +413,6 @@ pub trait FunctionStore: Sync + Send {
     ) -> Result<()>;
 }
 
-#[derive(Clone)]
-pub struct DefaultCatalog {
-    repository: Arc<dyn Repository>,
-
-    // DataFusion's in-memory schema provider for staging external tables
-    staging_schema: Arc<MemorySchemaProvider>,
-    object_store: Arc<InternalObjectStore>,
-}
-
-impl DefaultCatalog {
-    pub fn new(
-        repository: Arc<dyn Repository>,
-        object_store: Arc<InternalObjectStore>,
-    ) -> Self {
-        let staging_schema = Arc::new(MemorySchemaProvider::new());
-        Self {
-            repository,
-            staging_schema,
-            object_store,
-        }
-    }
-
-    fn build_table(
-        &self,
-        table_name: &str,
-        table_uuid: Uuid,
-    ) -> (Arc<str>, Arc<dyn TableProvider>) {
-        // Build a delta table but don't load it yet; we'll do that only for tables that are
-        // actually referenced in a statement, via the async `table` method of the schema provider.
-        // TODO: this means that any `information_schema.columns` query will serially load all
-        // delta tables present in the database. The real fix for this is to make DF use `TableSource`
-        // for the information schema, and then implement `TableSource` for `DeltaTable` in delta-rs.
-        let table_log_store = self.object_store.get_log_store(table_uuid);
-
-        let table = DeltaTable::new(table_log_store, Default::default());
-        (Arc::from(table_name.to_string()), Arc::new(table) as _)
-    }
-
-    fn build_schema<'a, I>(
-        &self,
-        collection_name: &str,
-        collection_columns: I,
-    ) -> (Arc<str>, Arc<SeafowlSchema>)
-    where
-        I: Iterator<Item = &'a AllDatabaseColumnsResult>,
-    {
-        let tables = collection_columns
-            .filter_map(|col| {
-                if let Some(table_name) = &col.table_name
-                    && let Some(table_uuid) = col.table_uuid
-                {
-                    Some(self.build_table(table_name, table_uuid))
-                } else {
-                    None
-                }
-            })
-            .collect::<HashMap<_, _>>();
-
-        (
-            Arc::from(collection_name.to_string()),
-            Arc::new(SeafowlSchema {
-                name: Arc::from(collection_name.to_string()),
-                tables: RwLock::new(tables),
-            }),
-        )
-    }
-}
-
 impl From<RepositoryError> for Error {
     fn from(err: RepositoryError) -> Error {
         Error::SqlxError(match err {
@@ -344,50 +424,22 @@ impl From<RepositoryError> for Error {
 }
 
 #[async_trait]
-impl CatalogStore for DefaultCatalog {
-    async fn create(&self, catalog_name: &str) -> Result<DatabaseId> {
+
+impl CatalogStore for RepositoryStore {
+    async fn create(&self, name: &str) -> Result<DatabaseId> {
         self.repository
-            .create_database(catalog_name)
+            .create_database(name)
             .await
             .map_err(|e| match e {
                 RepositoryError::UniqueConstraintViolation(_) => {
                     Error::CatalogAlreadyExists {
-                        name: catalog_name.to_string(),
+                        name: name.to_string(),
                     }
                 }
                 e => e.into(),
             })
     }
 
-    async fn load_database(&self, name: &str) -> Result<SeafowlDatabase> {
-        let all_columns = self.repository.get_all_columns_in_database(name).await?;
-
-        // NB we can't distinguish between a database without tables and a database
-        // that doesn't exist at all due to our query.
-
-        // Turn the list of all collections, tables and their columns into a nested map.
-        let schemas: HashMap<Arc<str>, Arc<SeafowlSchema>> = all_columns
-            .iter()
-            .group_by(|col| &col.collection_name)
-            .into_iter()
-            .map(|(cn, cc)| self.build_schema(cn, cc))
-            .collect();
-
-        // TODO load the database name too
-        let name: Arc<str> = Arc::from(DEFAULT_DB);
-
-        Ok(SeafowlDatabase {
-            name: name.clone(),
-            schemas,
-            staging_schema: self.staging_schema.clone(),
-            system_schema: Arc::new(SystemSchemaProvider::new(
-                name,
-                Arc::new(self.clone()),
-            )),
-        })
-    }
-
     async fn get(&self, name: &str) -> Result<DatabaseRecord> {
         match self.repository.get_database(name).await {
             Ok(database) => Ok(database),
@@ -418,7 +470,7 @@ impl CatalogStore for RepositoryStore {
 }
 
 #[async_trait]
-impl SchemaStore for DefaultCatalog {
+impl SchemaStore for RepositoryStore {
     async fn create(
         &self,
         catalog_name: &str,
@@ -443,6 +495,13 @@ impl SchemaStore for RepositoryStore {
             })
     }
 
+    async fn list(
+        &self,
+        catalog_name: &str,
+    ) -> Result<Vec<AllDatabaseColumnsResult>, Error> {
+        Ok(self.repository.list_collections(catalog_name).await?)
+    }
+
     async fn get(
         &self,
         catalog_name: &str,
@@ -544,7 +544,7 @@ impl SchemaStore for RepositoryStore {
 }
 
 #[async_trait]
-impl TableStore for DefaultCatalog {
+impl TableStore for RepositoryStore {
     async fn create(
         &self,
         catalog_name: &str,
@@ -611,11 +670,10 @@ impl TableStore for RepositoryStore {
     async fn delete_old_versions(
         &self,
         catalog_name: &str,
-        collection_name: &str,
+        schema_name: &str,
         table_name: &str,
     ) -> Result<u64> {
-        let table =
-            TableStore::get(self, catalog_name, collection_name, table_name).await?;
+        let table = TableStore::get(self, catalog_name, schema_name, table_name).await?;
 
         Ok(self.repository.delete_old_versions(table.id).await?)
     }
@@ -673,38 +731,8 @@ impl TableStore for RepositoryStore {
     }
 }
 
-impl DefaultCatalog {
-    fn parse_create_function_details(
-        item: &AllDatabaseFunctionsResult,
-    ) -> std::result::Result {
-        let AllDatabaseFunctionsResult {
-            id: _,
-            name: _,
-            entrypoint,
-            language,
-            input_types,
-            return_type,
-            data,
-            volatility,
-        } = item;
-
-        Ok(CreateFunctionDetails {
-            entrypoint: entrypoint.to_string(),
-            language: CreateFunctionLanguage::from_str(language.as_str())?,
-            input_types: serde_json::from_str::<Vec<CreateFunctionDataType>>(
-                input_types,
-            )?,
-            return_type: CreateFunctionDataType::from_str(
-                &return_type.as_str().to_ascii_uppercase(),
-            )?,
-            data: data.to_string(),
-            volatility: CreateFunctionVolatility::from_str(volatility.as_str())?,
-        })
-    }
-}
-
 #[async_trait]
-impl FunctionStore for DefaultCatalog {
+impl FunctionStore for RepositoryStore {
     async fn create(
         &self,
         catalog_name: &str,
@@ -730,28 +758,13 @@ impl FunctionStore for RepositoryStore {
             })
     }
 
-    async fn list(&self, catalog_name: &str) -> Result<Vec<SeafowlFunction>> {
+    async fn list(&self, catalog_name: &str) -> Result<Vec<AllDatabaseFunctionsResult>> {
         let database = CatalogStore::get(self, catalog_name).await?;
 
-        let all_functions = self
+        Ok(self
             .repository
             .get_all_functions_in_database(database.id)
-            .await?;
-
-        all_functions
-            .iter()
-            .map(|item| {
-                Self::parse_create_function_details(item)
-                    .map(|details| SeafowlFunction {
-                        function_id: item.id,
-                        name: item.name.to_owned(),
-                        details,
-                    })
-                    .map_err(|e| Error::FunctionDeserializationError {
-                        reason: e.message,
-                    })
-            })
-            .collect::<Result<Vec<SeafowlFunction>>>()
+            .await?)
     }
 
     async fn delete(
diff --git a/src/config/context.rs b/src/config/context.rs
index 7fdaf99e..e43bde77 100644
--- a/src/config/context.rs
+++ b/src/config/context.rs
@@ -1,7 +1,7 @@
 use std::sync::Arc;
 
 use crate::{
-    catalog::{DefaultCatalog, DEFAULT_DB, DEFAULT_SCHEMA},
+    catalog::{DEFAULT_DB, DEFAULT_SCHEMA},
     context::SeafowlContext,
     repository::{interface::Repository, sqlite::SqliteRepository},
 };
@@ -60,14 +60,7 @@ async fn build_metastore(
         ),
     };
 
-    let catalog = Arc::new(DefaultCatalog::new(repository, object_store));
-
-    Metastore {
-        catalogs: catalog.clone(),
-        schemas: catalog.clone(),
-        tables: catalog.clone(),
-        functions: catalog,
-    }
+    Metastore::new_from_repository(repository, object_store)
 }
 
 pub fn build_object_store(
diff --git a/src/context/mod.rs b/src/context/mod.rs
index 53280cfe..99f1ff33 100644
--- a/src/context/mod.rs
+++ b/src/context/mod.rs
@@ -68,18 +68,12 @@ impl SeafowlContext {
         self.inner.register_catalog(
             &self.database,
-            Arc::new(
-                self.metastore
-                    .catalogs
-                    .load_database(&self.database)
-                    .await?,
-            ),
+            Arc::new(self.metastore.build_catalog(&self.database).await?),
         );
 
         // Register all functions in the database
         self.metastore
-            .functions
-            .list(&self.database)
+            .build_functions(&self.database)
             .await?
             .iter()
             .try_for_each(|f| self.register_function(&f.name, &f.details))
diff --git a/src/repository/default.rs b/src/repository/default.rs
index 4d3e705d..439848ba 100644
--- a/src/repository/default.rs
+++ b/src/repository/default.rs
@@ -65,9 +65,9 @@ impl Repository for $repo {
             .expect("error running migrations");
     }
 
-    async fn get_all_columns_in_database(
+    async fn list_collections(
         &self,
-        name: &str,
+        database_name: &str,
     ) -> Result<Vec<AllDatabaseColumnsResult>, Error> {
         let mut builder: QueryBuilder<_> =
             QueryBuilder::new($repo::QUERIES.latest_table_versions);
@@ -87,7 +87,7 @@ impl Repository for $repo {
         LEFT JOIN desired_table_versions ON "table".id = desired_table_versions.table_id
         LEFT JOIN table_column ON table_column.table_version_id = desired_table_versions.id
         WHERE database.name = "#);
-        builder.push_bind(name);
+        builder.push_bind(database_name);
         builder.push(r#"
         ORDER BY collection_name, table_name, table_version_id, column_name
diff --git a/src/repository/interface.rs b/src/repository/interface.rs
index 35984d3a..d6f02bf3 100644
--- a/src/repository/interface.rs
+++ b/src/repository/interface.rs
@@ -113,9 +113,9 @@ pub trait Repository: Send + Sync + Debug {
     async fn setup(&self);
 
-    async fn get_all_columns_in_database(
+    async fn list_collections(
         &self,
-        name: &str,
+        database_name: &str,
     ) -> Result<Vec<AllDatabaseColumnsResult>, Error>;
 
     async fn get_database(&self, name: &str) -> Result<DatabaseRecord>;
@@ -292,7 +292,7 @@ pub mod tests {
     async fn test_get_tables_empty(repository: Arc<dyn Repository>) {
         assert_eq!(
             repository
-                .get_all_columns_in_database(TEST_DB)
+                .list_collections(TEST_DB)
                 .await
                 .expect("error getting collections"),
             Vec::<AllDatabaseColumnsResult>::new()
         );
     }
@@ -349,7 +349,7 @@ pub mod tests {
         // Test loading all columns
         let all_columns = repository
-            .get_all_columns_in_database(TEST_DB)
+            .list_collections(TEST_DB)
             .await
             .expect("Error getting all columns");
@@ -371,7 +371,7 @@ pub mod tests {
         // Test all columns again: we should have the schema for the latest table version
         let all_columns = repository
-            .get_all_columns_in_database(TEST_DB)
+            .list_collections(TEST_DB)
             .await
             .expect("Error getting all columns");
@@ -498,7 +498,7 @@ pub mod tests {
             .unwrap();
 
         let all_columns = repository
-            .get_all_columns_in_database(TEST_DB)
+            .list_collections(TEST_DB)
             .await
             .expect("Error getting all columns");
@@ -523,7 +523,7 @@ pub mod tests {
             .unwrap();
 
         let mut all_columns = repository
-            .get_all_columns_in_database(TEST_DB)
+            .list_collections(TEST_DB)
             .await
             .expect("Error getting all columns");
         all_columns.sort_by_key(|c| c.collection_name.clone());
From c1f667e761e2d07b67ab602688b9e71ee5894712 Mon Sep 17 00:00:00 2001
From: Marko Grujic
Date: Wed, 20 Dec 2023 11:39:01 +0100
Subject: [PATCH 6/7] Simplify scope_to_database function
---
 src/context/mod.rs   | 11 ++++-------
 src/frontend/http.rs | 26 +++++++-------------------
 tests/http/upload.rs |  5 +----
 3 files changed, 12 insertions(+), 30 deletions(-)

diff --git a/src/context/mod.rs b/src/context/mod.rs
index 99f1ff33..b9fa2815 100644
--- a/src/context/mod.rs
+++ b/src/context/mod.rs
@@ -30,7 +30,7 @@ pub struct SeafowlContext {
 
 impl SeafowlContext {
     // Create a new `SeafowlContext` with a new inner context scoped to a different default DB
-    pub async fn scope_to_database(&self, name: String) -> Result<Arc<SeafowlContext>> {
+    pub fn scope_to_database(&self, name: String) -> Arc<SeafowlContext> {
         // Swap the default catalog in the new internal context's session config
         let session_config = self
             .inner()
             .copied_config()
@@ -40,13 +40,13 @@ impl SeafowlContext {
         let state =
             build_state_with_table_factories(session_config, self.inner().runtime_env());
 
-        Ok(Arc::from(SeafowlContext {
+        Arc::from(SeafowlContext {
             inner: SessionContext::new_with_state(state),
             metastore: self.metastore.clone(),
             internal_object_store: self.internal_object_store.clone(),
             database: name,
             max_partition_size: self.max_partition_size,
-        }))
+        })
     }
 
     pub fn inner(&self) -> &SessionContext {
@@ -221,10 +221,7 @@ pub mod test_utils {
         // place on another node
         context.metastore.catalogs.create("testdb").await.unwrap();
 
-        let context = context
-            .scope_to_database("testdb".to_string())
-            .await
-            .expect("'testdb' should exist");
+        let context = context.scope_to_database("testdb".to_string());
 
         // Create new non-default collection
         context.plan_query("CREATE SCHEMA testcol").await.unwrap();
diff --git a/src/frontend/http.rs b/src/frontend/http.rs
index c55e4b21..65939903 100644
--- a/src/frontend/http.rs
+++ b/src/frontend/http.rs
@@ -160,7 +160,7 @@ pub async fn uncached_read_write_query(
     // If a specific DB name was used as a parameter in the route, scope the context to it,
     // effectively making it the default DB for the duration of the session.
     if database_name != context.database {
-        context = context.scope_to_database(database_name).await?;
+        context = context.scope_to_database(database_name);
     }
 
     let statements = context.parse_query(&query).await?;
@@ -325,7 +325,7 @@ pub async fn cached_read_query(
     // If a specific DB name was used as a parameter in the route, scope the context to it,
     // effectively making it the default DB for the duration of the session.
     if database_name != context.database {
-        context = context.scope_to_database(database_name).await?;
+        context = context.scope_to_database(database_name);
     }
 
     // Plan the query
@@ -383,7 +383,7 @@ pub async fn upload(
     };
 
     if database_name != context.database {
-        context = context.scope_to_database(database_name.clone()).await?;
+        context = context.scope_to_database(database_name.clone());
     }
 
     let mut has_header = true;
@@ -661,10 +661,7 @@ pub mod tests {
             .await
             .unwrap();
 
-        context = context
-            .scope_to_database(db_name.to_string())
-            .await
-            .unwrap();
+        context = context.scope_to_database(db_name.to_string());
     }
 
     context
@@ -679,10 +676,7 @@ pub mod tests {
 
     if new_db.is_some() {
         // Re-scope to the original DB
-        return context
-            .scope_to_database(DEFAULT_DB.to_string())
-            .await
-            .unwrap();
+        return context.scope_to_database(DEFAULT_DB.to_string());
     }
 
     context
@@ -694,10 +688,7 @@ pub mod tests {
     let mut context = in_memory_context_with_single_table(new_db).await;
 
     if let Some(db_name) = new_db {
-        context = context
-            .scope_to_database(db_name.to_string())
-            .await
-            .unwrap();
+        context = context.scope_to_database(db_name.to_string());
     }
 
     context
@@ -707,10 +698,7 @@ pub mod tests {
 
     if new_db.is_some() {
         // Re-scope to the original DB
-        return context
-            .scope_to_database(DEFAULT_DB.to_string())
-            .await
-            .unwrap();
+        return context.scope_to_database(DEFAULT_DB.to_string());
    }
 
     context
diff --git a/tests/http/upload.rs b/tests/http/upload.rs
index 1ab099e5..d2616a88 100644
--- a/tests/http/upload.rs
+++ b/tests/http/upload.rs
@@ -113,10 +113,7 @@ async fn test_upload_base(
     // Verify the newly created table contents
     if let Some(db_name) = db_prefix {
-        context = context
-            .scope_to_database(db_name.to_string())
-            .await
-            .unwrap();
+        context = context.scope_to_database(db_name.to_string());
     }
 
     let plan = context
         .plan_query(format!("SELECT * FROM test_upload.{table_name}").as_str())
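After PATCH 6/7, `scope_to_database` performs no catalog lookup at all, which is why it can drop both `async` and the `Result`: it only clones shared handles and swaps the default database name, deferring missing-database errors to name-resolution time. A stand-in sketch of that shape (`Ctx` is a simplified placeholder for `SeafowlContext`, not the real type):

```rust
use std::sync::Arc;

struct Ctx {
    // Stand-in for Arc<Metastore>; cloning an Arc is a cheap refcount bump.
    metastore: Arc<String>,
    database: String,
}

impl Ctx {
    // No `async`, no `Result`: nothing here can fail or block. A database
    // that doesn't exist will only be noticed later, when a query against
    // the scoped context tries to resolve names in it.
    fn scope_to_database(&self, name: String) -> Arc<Ctx> {
        Arc::new(Ctx {
            metastore: self.metastore.clone(),
            database: name,
        })
    }
}

fn main() {
    let ctx = Ctx {
        metastore: Arc::new("repo".to_string()),
        database: "default".to_string(),
    };
    let scoped = ctx.scope_to_database("testdb".to_string());
    assert_eq!(scoped.database, "testdb");
    // The original context is untouched.
    assert_eq!(ctx.database, "default");
}
```

This is also what lets all the HTTP handlers and tests above shed their `.await?`/`.unwrap()` plumbing around every re-scope.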
From fb31b764b7ee2d19d3f6296bf62cbc7181436d0a Mon Sep 17 00:00:00 2001
From: Marko Grujic
Date: Wed, 20 Dec 2023 13:38:17 +0100
Subject: [PATCH 7/7] Use map_err to simplify some get catalog calls
---
 src/catalog.rs | 67 ++++++++++++++++++++++++++------------------------
 1 file changed, 35 insertions(+), 32 deletions(-)

diff --git a/src/catalog.rs b/src/catalog.rs
index 3d90e4f1..56b4a243 100644
--- a/src/catalog.rs
+++ b/src/catalog.rs
@@ -441,15 +441,17 @@ impl CatalogStore for RepositoryStore {
     }
 
     async fn get(&self, name: &str) -> Result<DatabaseRecord> {
-        match self.repository.get_database(name).await {
-            Ok(database) => Ok(database),
-            Err(RepositoryError::SqlxError(sqlx::error::Error::RowNotFound)) => {
-                Err(Error::CatalogDoesNotExist {
-                    name: name.to_string(),
-                })
-            }
-            Err(e) => Err(e.into()),
-        }
+        self.repository
+            .get_database(name)
+            .await
+            .map_err(|e| match e {
+                RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => {
+                    Error::CatalogDoesNotExist {
+                        name: name.to_string(),
+                    }
+                }
+                e => e.into(),
+            })
     }
 
     async fn delete(&self, name: &str) -> Result<()> {
@@ -511,19 +513,17 @@ impl SchemaStore for RepositoryStore {
             return Err(Error::UsedStagingSchema);
         }
 
-        match self
-            .repository
+        self.repository
             .get_collection(catalog_name, schema_name)
             .await
-        {
-            Ok(schema) => Ok(schema),
-            Err(RepositoryError::SqlxError(sqlx::error::Error::RowNotFound)) => {
-                Err(Error::SchemaDoesNotExist {
-                    name: schema_name.to_string(),
-                })
-            }
-            Err(e) => Err(e.into()),
-        }
+            .map_err(|e| match e {
+                RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => {
+                    Error::SchemaDoesNotExist {
+                        name: schema_name.to_string(),
+                    }
+                }
+                e => e.into(),
+            })
     }
 
     async fn delete(&self, catalog_name: &str, schema_name: &str) -> Result<()> {
@@ -577,19 +577,17 @@ impl TableStore for RepositoryStore {
         schema_name: &str,
         table_name: &str,
     ) -> Result<TableRecord> {
-        match self
-            .repository
+        self.repository
             .get_table(catalog_name, schema_name, table_name)
             .await
-        {
-            Ok(table) => Ok(table),
-            Err(RepositoryError::SqlxError(sqlx::error::Error::RowNotFound)) => {
-                Err(Error::TableDoesNotExist {
-                    name: table_name.to_string(),
-                })
-            }
-            Err(e) => Err(e.into()),
-        }
+            .map_err(|e| match e {
+                RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => {
+                    Error::TableDoesNotExist {
+                        name: table_name.to_string(),
+                    }
+                }
+                e => e.into(),
+            })
     }
 
     async fn create_new_version(
@@ -635,10 +633,15 @@ impl TableStore for RepositoryStore {
         old_catalog_name: &str,
         old_schema_name: &str,
         old_table_name: &str,
-        _new_catalog_name: &str, // For now we don't support moving across catalogs
+        new_catalog_name: &str,
         new_schema_name: &str,
         new_table_name: &str,
     ) -> Result<()> {
+        assert_eq!(
+            old_catalog_name, new_catalog_name,
+            "Moving across catalogs not yet supported"
+        );
+
         let table =
             TableStore::get(self, old_catalog_name, old_schema_name, old_table_name)
                 .await?;