Skip to content

Commit

Permalink
Some polishing
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Dec 19, 2023
1 parent 0e825b8 commit a4f52b7
Show file tree
Hide file tree
Showing 7 changed files with 10 additions and 86 deletions.
19 changes: 4 additions & 15 deletions src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use datafusion::datasource::TableProvider;
use datafusion::error::DataFusionError;
use deltalake::DeltaTable;
use itertools::Itertools;
#[cfg(test)]
use mockall::automock;
use parking_lot::RwLock;
use uuid::Uuid;

Expand Down Expand Up @@ -143,28 +141,28 @@ impl From<Error> for DataFusionError {
}
}

// This is the main entrypoint to all individual catalogs for various object types.
// The intention is to make it extensible and de-coupled from the underlying metastore
// persistence mechanism (such as the presently used `Repository`).
#[derive(Clone)]
pub struct Metastore {
// Catalog (database)-level operations.
pub catalogs: Arc<dyn CatalogStore>,
// Schema (collection)-level operations.
pub schemas: Arc<dyn SchemaStore>,
// Table-level operations.
pub tables: Arc<dyn TableStore>,
// User-defined-function operations.
pub functions: Arc<dyn FunctionStore>,
}

#[cfg_attr(test, automock)]
/// Async store of catalog (database) metadata, decoupled from the concrete
/// persistence backend.
#[async_trait]
pub trait CatalogStore: Sync + Send {
/// Create a new catalog with the given name, returning its id.
async fn create(&self, catalog_name: &str) -> Result<DatabaseId>;

/// Load the full in-memory representation of the named database.
async fn load_database(&self, name: &str) -> Result<SeafowlDatabase>;

/// Return the records of all known databases.
async fn list(&self) -> Result<Vec<DatabaseRecord>, Error>;

/// Look up a single database record by name.
async fn get(&self, name: &str) -> Result<DatabaseRecord, Error>;

/// Delete the named catalog.
async fn delete(&self, name: &str) -> Result<()>;
}

#[cfg_attr(test, automock)]
#[async_trait]
pub trait SchemaStore: Sync + Send {
async fn create(&self, catalog_name: &str, schema_name: &str)
Expand All @@ -179,7 +177,6 @@ pub trait SchemaStore: Sync + Send {
async fn delete(&self, catalog_name: &str, schema_name: &str) -> Result<()>;
}

#[cfg_attr(test, automock)]
#[async_trait]
pub trait TableStore: Sync + Send {
async fn create(
Expand Down Expand Up @@ -248,7 +245,6 @@ pub trait TableStore: Sync + Send {
async fn delete_dropped_table(&self, uuid: Uuid) -> Result<()>;
}

#[cfg_attr(test, automock)]
#[async_trait]
pub trait FunctionStore: Sync + Send {
async fn create(
Expand Down Expand Up @@ -394,13 +390,6 @@ impl CatalogStore for DefaultCatalog {
})
}

/// Return the records of all databases known to the underlying repository,
/// converting any repository error into a catalog `Error`.
async fn list(&self) -> Result<Vec<DatabaseRecord>, Error> {
    // A `match` that forwards `Ok` unchanged and only re-wraps the error
    // side is exactly `map_err`; propagate the success value directly.
    self.repository
        .list_databases()
        .await
        .map_err(Self::to_sqlx_error)
}

async fn get(&self, name: &str) -> Result<DatabaseRecord> {
match self.repository.get_database(name).await {
Ok(database) => Ok(database),
Expand Down
11 changes: 0 additions & 11 deletions src/config/context.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::HashSet;
use std::sync::Arc;

use crate::{
Expand Down Expand Up @@ -26,7 +25,6 @@ use datafusion_remote_tables::factory::RemoteTableFactory;
#[cfg(feature = "object-store-s3")]
use object_store::aws::AmazonS3Builder;
use object_store::gcp::GoogleCloudStorageBuilder;
use parking_lot::lock_api::RwLock;

use super::schema::{self, GCS, MEBIBYTES, MEMORY_FRACTION, S3};

Expand Down Expand Up @@ -199,14 +197,6 @@ pub async fn build_context(cfg: &schema::SeafowlConfig) -> Result<SeafowlContext
metastore.schemas.create(DEFAULT_DB, DEFAULT_SCHEMA).await?;
}

let all_databases: HashSet<String> = metastore
.catalogs
.list()
.await?
.iter()
.map(|db| db.name.clone())
.collect();

// Convergence doesn't support connecting to different DB names. We are supposed
// to do one context per query (as we need to load the schema before executing every
// query) and per database (since the context is supposed to be limited to the database
Expand All @@ -218,7 +208,6 @@ pub async fn build_context(cfg: &schema::SeafowlConfig) -> Result<SeafowlContext
metastore: Arc::new(metastore),
internal_object_store,
database: DEFAULT_DB.to_string(),
all_databases: Arc::from(RwLock::new(all_databases)),
max_partition_size: cfg.misc.max_partition_size,
})
}
Expand Down
20 changes: 1 addition & 19 deletions src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ use datafusion::{error::DataFusionError, prelude::SessionContext, sql::TableRefe
use datafusion_common::OwnedTableReference;
use deltalake::DeltaTable;
use object_store::path::Path;
use parking_lot::RwLock;
use std::collections::HashSet;
use std::sync::Arc;
use uuid::Uuid;

Expand All @@ -27,28 +25,13 @@ pub struct SeafowlContext {
pub metastore: Arc<Metastore>,
pub internal_object_store: Arc<InternalObjectStore>,
pub database: String,
pub all_databases: Arc<RwLock<HashSet<String>>>,
pub max_partition_size: u32,
}

impl SeafowlContext {
// Create a new `SeafowlContext` with a new inner context scoped to a different default DB
pub async fn scope_to_database(&self, name: String) -> Result<Arc<SeafowlContext>> {
// TODO: do we need this? The only goal here is to keep the list of databases in memory and
// fail early if one is missing, but we can defer that to resolution time.
let maybe_database = self.all_databases.read().get(&name).cloned();
let all_databases = self.all_databases.clone();
if maybe_database.is_none() {
// Perhaps the db was created on another node; try to reload from catalog
let _ = self.metastore.catalogs.get(&name).await.map_err(|_| {
DataFusionError::Plan(format!(
"Unknown database {name}; try creating one with CREATE DATABASE first"
))
})?;
all_databases.write().insert(name.clone());
};

// Swap the default database in the new internal context's session config
// Swap the default catalog in the new internal context's session config
let session_config = self
.inner()
.copied_config()
Expand All @@ -62,7 +45,6 @@ impl SeafowlContext {
metastore: self.metastore.clone(),
internal_object_store: self.internal_object_store.clone(),
database: name,
all_databases,
max_partition_size: self.max_partition_size,
}))
}
Expand Down
3 changes: 0 additions & 3 deletions src/context/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,6 @@ impl SeafowlContext {
.create(catalog_name, DEFAULT_SCHEMA)
.await?;

// Update the shared in-memory map of DB names -> ids
self.all_databases.write().insert(catalog_name.clone());

Ok(make_dummy_exec())
}
LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ pub mod tests {
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
assert_eq!(
resp.body(),
"Error during planning: Unknown database missing_db; try creating one with CREATE DATABASE first"
"Error during planning: Database \"missing_db\" doesn't exist"
);
}

Expand Down
19 changes: 0 additions & 19 deletions src/repository/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,6 @@ impl Repository for $repo {
.expect("error running migrations");
}

// Fetch the names of all collections (schemas) that belong to the given
// database. NOTE: this lives inside a repository macro expansion, so `$repo`
// resolves to the concrete repository type being generated.
async fn get_collections_in_database(
&self,
database_id: DatabaseId,
) -> Result<Vec<String>, Error> {
// Stream the matching rows and collect just the `name` column,
// mapping any sqlx error through the repository's error translator.
let names = sqlx::query("SELECT name FROM collection WHERE database_id = $1")
.bind(database_id)
.fetch(&self.executor)
.map_ok(|row| row.get("name"))
.try_collect()
.await.map_err($repo::interpret_error)?;
Ok(names)
}
async fn get_all_columns_in_database(
&self,
name: &str,
Expand Down Expand Up @@ -181,13 +169,6 @@ impl Repository for $repo {
Ok(table)
}

async fn list_databases(&self) -> Result<Vec<DatabaseRecord>> {
sqlx::query_as(r#"SELECT name, id FROM database"#)
.fetch_all(&self.executor)
.await.map_err($repo::interpret_error)

}

async fn create_collection(
&self,
database_id: DatabaseId,
Expand Down
22 changes: 4 additions & 18 deletions src/repository/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,6 @@ pub struct TableRecord {
pub name: String,
}

/// A single persisted version of a table, as stored in the metadata store.
#[derive(sqlx::FromRow, Default, Debug, PartialEq, Eq)]
pub struct TableVersion {
/// Unique id of this table version.
pub id: TableVersionId,
/// Id of the table this version belongs to.
pub table_id: TableId,
/// Creation time of this version (units/epoch defined by `Timestamp`).
pub creation_time: Timestamp,
}

#[derive(sqlx::FromRow, Default, Debug, PartialEq, Eq)]
pub struct AllDatabaseColumnsResult {
pub database_name: String,
Expand Down Expand Up @@ -120,18 +113,11 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
pub trait Repository: Send + Sync + Debug {
async fn setup(&self);

async fn get_collections_in_database(
&self,
database_id: DatabaseId,
) -> Result<Vec<String>, Error>;

async fn get_all_columns_in_database(
&self,
name: &str,
) -> Result<Vec<AllDatabaseColumnsResult>, Error>;

async fn list_databases(&self) -> Result<Vec<DatabaseRecord>, Error>;

async fn get_database(&self, name: &str) -> Result<DatabaseRecord, Error>;

async fn get_collection(
Expand Down Expand Up @@ -289,7 +275,7 @@ pub mod tests {
}

pub async fn run_generic_repository_tests(repository: Arc<dyn Repository>) {
test_get_collections_empty(repository.clone()).await;
test_get_tables_empty(repository.clone()).await;
let (database_id, table_id, table_version_id) =
test_create_database_collection_table(repository.clone()).await;
test_create_functions(repository.clone(), database_id).await;
Expand All @@ -303,13 +289,13 @@ pub mod tests {
test_error_propagation(repository, table_id).await;
}

async fn test_get_collections_empty(repository: Arc<dyn Repository>) {
async fn test_get_tables_empty(repository: Arc<dyn Repository>) {
assert_eq!(
repository
.get_collections_in_database(0)
.get_all_columns_in_database(TEST_DB)
.await
.expect("error getting collections"),
Vec::<String>::new()
Vec::<AllDatabaseColumnsResult>::new()
);
}

Expand Down

0 comments on commit a4f52b7

Please sign in to comment.