diff --git a/Cargo.lock b/Cargo.lock index 003e68604..b3dbdc32b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2873,6 +2873,7 @@ dependencies = [ "snafu", "stream-cancel", "strum", + "strum_macros", "tempfile", "time", "tokio", diff --git a/operators/src/error.rs b/operators/src/error.rs index e263c6f89..f5060b2d7 100644 --- a/operators/src/error.rs +++ b/operators/src/error.rs @@ -1,6 +1,6 @@ use crate::util::statistics::StatisticsError; use bb8_postgres::bb8; -use geoengine_datatypes::dataset::{DataId, NamedData}; +use geoengine_datatypes::dataset::{DataId, DatasetId, NamedData}; use geoengine_datatypes::error::ErrorSource; use geoengine_datatypes::primitives::{FeatureDataType, TimeInterval}; use geoengine_datatypes::raster::RasterDataType; @@ -472,6 +472,11 @@ pub enum Error { Bb8Postgres { source: bb8::RunError, }, + + #[snafu(display("Dataset {} cannot be accessed, because it was deleted", id))] + DatasetDeleted { + id: DatasetId, + }, } impl From for Error { diff --git a/services/Cargo.toml b/services/Cargo.toml index 5322ed49e..87c880d7e 100644 --- a/services/Cargo.toml +++ b/services/Cargo.toml @@ -118,6 +118,7 @@ walkdir = "2.4" zip = "2.1" assert-json-diff = "2.0.2" sha2 = "0.10.8" +strum_macros = "0.26.1" [target.'cfg(target_os = "linux")'.dependencies] nix = { version = "0.29", features = ["socket"] } diff --git a/services/src/api/handlers/datasets.rs b/services/src/api/handlers/datasets.rs index 35b115b4f..c96deed86 100755 --- a/services/src/api/handlers/datasets.rs +++ b/services/src/api/handlers/datasets.rs @@ -1,3 +1,4 @@ +use crate::datasets::storage::{check_reserved_tags, ReservedTags}; use crate::{ api::model::{ operators::{GdalLoadingInfoTemporalSlice, GdalMetaDataList}, @@ -635,7 +636,8 @@ pub async fn create_upload_dataset( let db = app_ctx.session_context(session).db(); let upload = db.load_upload(upload_id).await.context(UploadNotFound)?; - add_tag(&mut definition.properties, "upload".to_owned()); + check_reserved_tags(&definition.properties.tags); + add_tag(&mut definition.properties, ReservedTags::Upload.to_string()); adjust_meta_data_path(&mut definition.meta_data, &upload) .context(CannotResolveUploadFilePath)?; diff --git a/services/src/api/handlers/upload.rs b/services/src/api/handlers/upload.rs index 3d4e6a51b..60d6ad5a4 100644 --- a/services/src/api/handlers/upload.rs +++ b/services/src/api/handlers/upload.rs @@ -1,19 +1,16 @@ use crate::api::model::responses::IdResponse; use crate::contexts::{ApplicationContext, SessionContext}; -use crate::datasets::upload::{FileId, FileUpload, Upload, UploadDb, UploadId, UploadRootPath}; +use crate::datasets::upload::{create_upload, Upload, UploadDb, UploadId, UploadRootPath}; +use crate::error::Error; use crate::error::Result; -use crate::error::{self, Error}; use crate::util::path_with_base_path; use actix_multipart::Multipart; use actix_web::{web, FromRequest, Responder}; -use futures::StreamExt; use gdal::vector::LayerAccess; -use geoengine_datatypes::util::Identifier; use geoengine_operators::util::gdal::gdal_open_dataset; use serde::{Deserialize, Serialize}; -use snafu::ResultExt; use std::path::Path; -use tokio::{fs, io::AsyncWriteExt}; +use tokio::fs; use utoipa::{ToResponse, ToSchema}; pub(crate) fn init_upload_routes(cfg: &mut web::ServiceConfig) @@ -70,43 +67,9 @@ impl<'a> ToSchema<'a> for FileUploadRequest { async fn upload_handler( session: C::Session, app_ctx: web::Data, - mut body: Multipart, + body: Multipart, ) -> Result>> { - let upload_id = UploadId::new(); - - let root = upload_id.root_path()?; - - 
fs::create_dir_all(&root).await.context(error::Io)?; - - let mut files: Vec = vec![]; - while let Some(item) = body.next().await { - let mut field = item?; - let file_name = field - .content_disposition() - .ok_or(error::Error::UploadFieldMissingFileName)? - .get_filename() - .ok_or(error::Error::UploadFieldMissingFileName)? - .to_owned(); - - let file_id = FileId::new(); - let mut file = fs::File::create(root.join(&file_name)) - .await - .context(error::Io)?; - - let mut byte_size = 0_u64; - while let Some(chunk) = field.next().await { - let bytes = chunk?; - file.write_all(&bytes).await.context(error::Io)?; - byte_size += bytes.len() as u64; - } - file.flush().await.context(error::Io)?; - - files.push(FileUpload { - id: file_id, - name: file_name, - byte_size, - }); - } + let (upload_id, files) = create_upload(body).await?; app_ctx .session_context(session) diff --git a/services/src/contexts/migrations/migration_0012_fair_upload_deletion.rs b/services/src/contexts/migrations/migration_0012_fair_upload_deletion.rs new file mode 100644 index 000000000..7232cf07d --- /dev/null +++ b/services/src/contexts/migrations/migration_0012_fair_upload_deletion.rs @@ -0,0 +1,24 @@ +use async_trait::async_trait; +use tokio_postgres::Transaction; + +use crate::error::Result; + +use super::database_migration::{DatabaseVersion, Migration}; + +/// This migration adds new delete options for uploaded user datasets +pub struct Migration0012FairUploadDeletion; + +#[async_trait] +impl Migration for Migration0012FairUploadDeletion { + fn prev_version(&self) -> Option { + Some("0011_remove_xgb".into()) + } + + fn version(&self) -> DatabaseVersion { + "0012_fair_upload_deletion".into() + } + + async fn migrate(&self, _tx: &Transaction<'_>) -> Result<()> { + Ok(()) + } +} diff --git a/services/src/contexts/migrations/mod.rs b/services/src/contexts/migrations/mod.rs index 3959bc34a..98212b2cb 100644 --- a/services/src/contexts/migrations/mod.rs +++ b/services/src/contexts/migrations/mod.rs @@ -11,6 +11,7 @@ pub use crate::contexts::migrations::{ migration_0009_oidc_tokens::Migration0009OidcTokens, migration_0010_s2_stack_time_buffers::Migration0010S2StacTimeBuffers, migration_0011_remove_xgb::Migration0011RemoveXgb, + migration_0012_fair_upload_deletion::Migration0012FairUploadDeletion, }; pub use database_migration::{ initialize_database, migrate_database, DatabaseVersion, Migration, MigrationResult, @@ -30,6 +31,7 @@ pub mod migration_0008_band_names; pub mod migration_0009_oidc_tokens; pub mod migration_0010_s2_stack_time_buffers; pub mod migration_0011_remove_xgb; +pub mod migration_0012_fair_upload_deletion; #[cfg(test)] mod schema_info; @@ -55,6 +57,7 @@ pub fn all_migrations() -> Vec> { Box::new(Migration0009OidcTokens), Box::new(Migration0010S2StacTimeBuffers), Box::new(Migration0011RemoveXgb), + Box::new(Migration0012FairUploadDeletion), ] } diff --git a/services/src/contexts/mod.rs b/services/src/contexts/mod.rs index 7c50dbf57..096811168 100644 --- a/services/src/contexts/mod.rs +++ b/services/src/contexts/mod.rs @@ -30,7 +30,7 @@ pub use migrations::{ Migration0004DatasetListingProviderPrio, Migration0005GbifColumnSelection, Migration0006EbvProvider, Migration0007OwnerRole, Migration0008BandNames, Migration0009OidcTokens, Migration0010S2StacTimeBuffers, Migration0011RemoveXgb, - MigrationResult, + Migration0012FairUploadDeletion, MigrationResult, }; pub use postgres::{PostgresContext, PostgresDb, PostgresSessionContext}; pub use session::{MockableSession, Session, SessionId, SimpleSession}; diff --git 
a/services/src/datasets/storage.rs b/services/src/datasets/storage.rs index f391cf680..f58acc1d1 100755 --- a/services/src/datasets/storage.rs +++ b/services/src/datasets/storage.rs @@ -19,6 +19,9 @@ use geoengine_operators::{mock::MockDatasetDataSourceLoadingInfo, source::GdalMe use serde::{Deserialize, Serialize}; use snafu::ResultExt; use std::fmt::Debug; +use std::str::FromStr; +use strum_macros; +use strum_macros::{Display, EnumString}; use utoipa::ToSchema; use uuid::Uuid; use validator::{Validate, ValidationError}; @@ -95,6 +98,13 @@ pub struct AutoCreateDataset { pub tags: Option>, } +#[derive(Display, EnumString)] +pub enum ReservedTags { + #[strum(serialize = "upload")] + Upload, + Deleted, +} + fn validate_main_file(main_file: &str) -> Result<(), ValidationError> { if main_file.is_empty() || main_file.contains('/') || main_file.contains("..") { return Err(ValidationError::new("Invalid upload file name")); @@ -114,6 +124,20 @@ pub fn validate_tags(tags: &Vec) -> Result<(), ValidationError> { Ok(()) } +pub fn check_reserved_tags(tags: &Option>) { + if let Some(tags) = tags { + for tag in tags { + let conversion = ReservedTags::from_str(tag.as_str()); + if let Ok(reserved) = conversion { + log::warn!( + "Adding a new dataset with a reserved tag: {}", + reserved.to_string() + ); + } + } + } +} + #[derive(Deserialize, Serialize, Debug, Clone, ToSchema)] #[serde(rename_all = "camelCase")] pub struct SuggestMetaData { diff --git a/services/src/datasets/upload.rs b/services/src/datasets/upload.rs index cc7b0dc2b..45dbbf55a 100644 --- a/services/src/datasets/upload.rs +++ b/services/src/datasets/upload.rs @@ -1,3 +1,4 @@ +use actix_multipart::Multipart; use std::fmt::{Display, Formatter}; use std::path::{Path, PathBuf}; @@ -9,7 +10,12 @@ use crate::{ util::config::{self, get_config_element}, }; use async_trait::async_trait; +use futures_util::StreamExt; +use geoengine_datatypes::util::Identifier; use serde::{Deserialize, Deserializer, Serialize}; +use snafu::ResultExt; +use tokio::fs; +use tokio::io::AsyncWriteExt; use utoipa::ToSchema; identifier!(UploadId); @@ -118,6 +124,53 @@ pub struct UploadListing { pub num_files: usize, } +pub async fn create_upload(mut body: Multipart) -> Result<(UploadId, Vec)> { + let upload_id = UploadId::new(); + + let root = upload_id.root_path()?; + + fs::create_dir_all(&root).await.context(error::Io)?; + + let mut files: Vec = vec![]; + while let Some(item) = body.next().await { + let mut field = item?; + let file_name = field + .content_disposition() + .ok_or(error::Error::UploadFieldMissingFileName)? + .get_filename() + .ok_or(error::Error::UploadFieldMissingFileName)? 
+ .to_owned(); + + let file_id = FileId::new(); + let mut file = fs::File::create(root.join(&file_name)) + .await + .context(error::Io)?; + + let mut byte_size = 0_u64; + while let Some(chunk) = field.next().await { + let bytes = chunk?; + file.write_all(&bytes).await.context(error::Io)?; + byte_size += bytes.len() as u64; + } + file.flush().await.context(error::Io)?; + + files.push(FileUpload { + id: file_id, + name: file_name, + byte_size, + }); + } + + Ok((upload_id, files)) +} + +pub async fn delete_upload(upload_id: UploadId) -> Result<()> { + let root = upload_id.root_path()?; + log::debug!("Deleting {upload_id}"); + fs::remove_dir_all(&root).await.context(error::Io)?; + Ok(()) +} + #[async_trait] pub trait UploadDb { async fn load_upload(&self, upload: UploadId) -> Result; diff --git a/services/src/error.rs b/services/src/error.rs index c259d92f5..ac9e0bd2d 100644 --- a/services/src/error.rs +++ b/services/src/error.rs @@ -488,6 +488,21 @@ pub enum Error { UnknownVolumeName { volume_name: String, }, + + #[snafu(display("Trying to set an expiration timestamp for Dataset {dataset} in the past"))] + ExpirationTimestampInPast { + dataset: DatasetId, + }, + #[snafu(display("Illegal expiration update for Dataset {dataset}: {reason}"))] + IllegalExpirationUpdate { + dataset: DatasetId, + reason: String, + }, + #[snafu(display("Illegal status for Dataset {dataset}: {status}"))] + IllegalDatasetStatus { + dataset: DatasetId, + status: String, + }, } impl actix_web::error::ResponseError for Error { diff --git a/services/src/pro/api/apidoc.rs b/services/src/pro/api/apidoc.rs index 3418651d3..a79db0220 100644 --- a/services/src/pro/api/apidoc.rs +++ b/services/src/pro/api/apidoc.rs @@ -53,6 +53,9 @@ use crate::layers::listing::{ }; use crate::pro; use crate::pro::api::handlers::users::{Quota, UpdateQuota}; +use crate::pro::datasets::{ + DatasetAccessStatusResponse, DatasetDeletionType, Expiration, ExpirationChange, +}; use crate::pro::permissions::{ Permission, PermissionListing, ResourceId, Role, RoleDescription, RoleId, }; @@ -157,7 +160,10 @@ use utoipa::{Modify, OpenApi}; handlers::upload::upload_handler, pro::api::handlers::permissions::add_permission_handler, pro::api::handlers::permissions::remove_permission_handler, - pro::api::handlers::permissions::get_resource_permissions_handler + pro::api::handlers::permissions::get_resource_permissions_handler, + pro::api::handlers::datasets::set_dataset_expiration, + pro::api::handlers::datasets::get_dataset_status, + pro::api::handlers::datasets::gc_expired_datasets ), components( responses( @@ -371,6 +377,10 @@ use utoipa::{Modify, OpenApi}; Volume, VolumeName, DataPath, + Expiration, + ExpirationChange, + DatasetDeletionType, + DatasetAccessStatusResponse, PlotOutputFormat, WrappedPlotOutput, diff --git a/services/src/pro/api/handlers/datasets.rs b/services/src/pro/api/handlers/datasets.rs index 98e6b7a14..4fd40ad09 100644 --- a/services/src/pro/api/handlers/datasets.rs +++ b/services/src/pro/api/handlers/datasets.rs @@ -1,11 +1,20 @@ +use crate::api::handlers::datasets::add_tag; +use crate::datasets::listing::DatasetProvider; +use crate::datasets::storage::{check_reserved_tags, ReservedTags}; +use crate::datasets::upload::{UploadDb, UploadId}; +use crate::datasets::DatasetName; +use crate::pro::datasets::{ + ChangeDatasetExpiration, DatasetAccessStatusResponse, ExpirationChange, + UploadedUserDatasetStore, +}; use crate::{ api::{ handlers::datasets::{ - adjust_meta_data_path, auto_create_dataset_handler, create_upload_dataset, - 
delete_dataset_handler, get_dataset_handler, get_loading_info_handler, - list_datasets_handler, list_volumes_handler, suggest_meta_data_handler, - update_dataset_handler, update_dataset_provenance_handler, - update_dataset_symbology_handler, update_loading_info_handler, + adjust_meta_data_path, auto_create_dataset_handler, delete_dataset_handler, + get_dataset_handler, get_loading_info_handler, list_datasets_handler, + list_volumes_handler, suggest_meta_data_handler, update_dataset_handler, + update_dataset_provenance_handler, update_dataset_symbology_handler, + update_loading_info_handler, }, model::{ responses::datasets::{errors::*, DatasetNameResponse}, @@ -17,6 +26,7 @@ use crate::{ storage::DatasetStore, upload::{Volume, VolumeName}, }, + error, error::Result, pro::{ contexts::{ProApplicationContext, ProGeoEngineDb}, @@ -24,7 +34,7 @@ use crate::{ }, util::config::{get_config_element, Data}, }; -use actix_web::{web, FromRequest}; +use actix_web::{web, FromRequest, HttpResponse, HttpResponseBuilder, Responder}; use geoengine_datatypes::error::BoxedResultExt; use snafu::ResultExt; @@ -41,6 +51,7 @@ where ) .service(web::resource("/auto").route(web::post().to(auto_create_dataset_handler::))) .service(web::resource("/volumes").route(web::get().to(list_volumes_handler::))) + .service(web::resource("/gc").route(web::post().to(gc_expired_datasets::))) .service( web::resource("/{dataset}/loadingInfo") .route(web::get().to(get_loading_info_handler::)) @@ -54,6 +65,13 @@ where web::resource("/{dataset}/provenance") .route(web::put().to(update_dataset_provenance_handler::)), ) + .service( + web::resource("/{dataset}/expiration") + .route(web::put().to(set_dataset_expiration::)), + ) + .service( + web::resource("/{dataset}/status").route(web::get().to(get_dataset_status::)), + ) .service( web::resource("/{dataset}") .route(web::get().to(get_dataset_handler::)) @@ -91,13 +109,13 @@ where let create = create.into_inner(); match create { CreateDataset { - data_path: DataPath::Volume(upload), + data_path: DataPath::Volume(volume), definition, - } => create_system_dataset(session, app_ctx, upload, definition).await, + } => create_system_dataset(session, app_ctx, volume, definition).await, CreateDataset { - data_path: DataPath::Upload(volume), + data_path: DataPath::Upload(upload), definition, - } => create_upload_dataset(session, app_ctx, volume, definition).await, + } => create_upload_dataset(session, app_ctx, upload, definition).await, } } @@ -148,13 +166,184 @@ where Ok(web::Json(dataset.name.into())) } +pub async fn create_upload_dataset( + session: C::Session, + app_ctx: web::Data, + upload_id: UploadId, + mut definition: DatasetDefinition, +) -> Result, CreateDatasetError> +where + <::SessionContext as SessionContext>::GeoEngineDB: ProGeoEngineDb, +{ + let db = app_ctx.session_context(session).db(); + let upload = db.load_upload(upload_id).await.context(UploadNotFound)?; + + check_reserved_tags(&definition.properties.tags); + add_tag(&mut definition.properties, ReservedTags::Upload.to_string()); + + adjust_meta_data_path(&mut definition.meta_data, &upload) + .context(CannotResolveUploadFilePath)?; + + let result = db + .add_uploaded_dataset( + upload_id, + definition.properties.into(), + definition.meta_data.into(), + ) + .await + .context(CannotCreateDataset)?; + + Ok(web::Json(result.name.into())) +} + +/// Sets an expiration date for the dataset with the given name. +/// Will expire immediately if no timestamp is provided. 
+#[utoipa::path( + tag = "Datasets", + put, + path = "/dataset/{dataset}/expiration", + request_body(content = ExpirationChange, examples( + ("Delete" = (value = json!({ + "type": "setExpire", + "deletionType": "DeleteData" + }))), + ("Expire" = (value = json!({ + "type": "setExpire", + "deletionType": "DeleteRecordAndData", + "deletionTimestamp": "2024-06-28T14:52:39.655Z" + }))), + ("Undo Expire" = (value = json!({ + "type": "unsetExpire" + }))) + )), + responses( + (status = 200, description = "OK"), + (status = 400, description = "Bad request", body = ErrorResponse), + (status = 401, response = crate::api::model::responses::UnauthorizedUserResponse) + ), + params( + ("dataset" = DatasetName, description = "Dataset Name"), + ), + security( + ("session_token" = []) + ) +)] +async fn set_dataset_expiration( + session: C::Session, + app_ctx: web::Data, + dataset: web::Path, + expiration: web::Json, +) -> Result +where + <::SessionContext as SessionContext>::GeoEngineDB: ProGeoEngineDb, +{ + let db = app_ctx.session_context(session).db(); + + let dataset_name = dataset.into_inner(); + + let dataset_id = db.resolve_dataset_name_to_id(&dataset_name).await?; + + let dataset_id = dataset_id.ok_or(error::Error::UnknownDatasetName { + dataset_name: dataset_name.to_string(), + })?; + + let expire_dataset = ChangeDatasetExpiration { + dataset_id, + expiration_change: expiration.into_inner(), + }; + + db.expire_uploaded_dataset(expire_dataset).await?; + + Ok(HttpResponse::Ok()) +} + +/// Clears expired datasets. +/// Requires an admin session. +#[utoipa::path( + tag = "Datasets", + post, + path = "/dataset/gc", + responses( + (status = 200, description = "OK"), + (status = 400, description = "Bad request", body = ErrorResponse), + (status = 401, response = crate::api::model::responses::UnauthorizedUserResponse) + ), + security( + ("session_token" = []) + ) +)] +async fn gc_expired_datasets( + session: C::Session, + app_ctx: web::Data, +) -> Result +where + <::SessionContext as SessionContext>::GeoEngineDB: ProGeoEngineDb, +{ + if !session.is_admin() { + return Err(error::Error::Unauthorized { + source: Box::new(error::Error::OperationRequiresAdminPrivilige), + }); + } + + app_ctx + .session_context(session) + .db() + .clear_expired_datasets() + .await?; + + Ok(HttpResponse::Ok()) +} + +/// Returns the access status of the current user for the dataset with the given name. 
+#[utoipa::path( + tag = "Datasets", + get, + path = "/dataset/{dataset}/status", + responses( + (status = 200, description = "OK", body = DatasetAccessStatusResponse), + (status = 400, description = "Bad request", body = ErrorResponse), + (status = 401, response = crate::api::model::responses::UnauthorizedUserResponse) + ), + params( + ("dataset" = DatasetName, description = "Dataset Name"), + ), + security( + ("session_token" = []) + ) +)] +async fn get_dataset_status( + session: C::Session, + app_ctx: web::Data, + dataset: web::Path, +) -> Result +where + <::SessionContext as SessionContext>::GeoEngineDB: ProGeoEngineDb, +{ + let db = app_ctx.session_context(session).db(); + + let dataset_name = dataset.into_inner(); + + let dataset_id = db.resolve_dataset_name_to_id(&dataset_name).await?; + + let dataset_id = dataset_id.ok_or(error::Error::UnknownDatasetName { + dataset_name: dataset_name.to_string(), + })?; + + let status: DatasetAccessStatusResponse = + db.get_dataset_access_status(&dataset_id).await?.into(); + + Ok(web::Json(status)) +} + #[cfg(test)] mod tests { use super::*; use crate::api::model::responses::IdResponse; use crate::datasets::DatasetName; use crate::pro::contexts::ProPostgresContext; + use crate::pro::datasets::DatasetAccessStatusResponse; use crate::pro::ge_context; + use crate::pro::util::tests::get_db_timestamp; use crate::{ api::model::services::{AddDataset, DataPath, DatasetDefinition, MetaDataDefinition}, contexts::{Session, SessionContext, SessionId}, @@ -172,7 +361,7 @@ mod tests { use actix_web_httpauth::headers::authorization::Bearer; use futures::TryStreamExt; use geoengine_datatypes::dataset::NamedData; - use geoengine_datatypes::primitives::ColumnSelection; + use geoengine_datatypes::primitives::{ColumnSelection, Duration}; use geoengine_datatypes::{ collections::{GeometryCollection, MultiPointCollection}, primitives::{BoundingBox2D, SpatialResolution, VectorQueryRectangle}, @@ -188,6 +377,7 @@ mod tests { util::gdal::create_ndvi_meta_data, }; use serde_json::json; + use std::ops::Add; use tokio_postgres::NoTls; pub async fn upload_ne_10m_ports_files( @@ -545,4 +735,81 @@ mod tests { Ok(()) } + + #[ge_context::test] + async fn it_expires_dataset_and_checks_status( + app_ctx: ProPostgresContext, + ) -> Result<()> { + let mut test_data = TestDataUploads::default(); + + let session = app_ctx.create_anonymous_session().await.unwrap(); + let session_id = session.id(); + let ctx = app_ctx.session_context(session); + + let upload_id = upload_ne_10m_ports_files(app_ctx.clone(), session_id).await?; + test_data.uploads.push(upload_id); + + let dataset_name = + construct_dataset_from_upload(app_ctx.clone(), upload_id, session_id).await; + + let db = ctx.db(); + let dataset_id = db + .resolve_dataset_name_to_id(&dataset_name) + .await + .unwrap() + .unwrap(); + + assert!(db.load_dataset(&dataset_id).await.is_ok()); + + let req = actix_web::test::TestRequest::get() + .uri(&format!("/dataset/{dataset_name}/status")) + .append_header((header::AUTHORIZATION, Bearer::new(session_id.to_string()))); + let res = send_pro_test_request(req, app_ctx.clone()).await; + assert_eq!(res.status(), 200, "response: {res:?}"); + let status: DatasetAccessStatusResponse = actix_web::test::read_body_json(res).await; + assert!(status.is_user_upload); + assert!(status.is_available); + assert!(status.expiration.is_none()); + + let current_time = get_db_timestamp(&app_ctx).await; + let future_time = current_time.add(Duration::seconds(5)); + + let expiration = + 
ChangeDatasetExpiration::expire_full(dataset_id, future_time).expiration_change; + + let req = actix_web::test::TestRequest::put() + .uri(&format!("/dataset/{dataset_name}/expiration")) + .set_json(expiration) + .append_header((header::CONTENT_LENGTH, 0)) + .append_header((header::AUTHORIZATION, Bearer::new(session_id.to_string()))) + .append_header((header::CONTENT_TYPE, "application/json")); + + let res = send_pro_test_request(req, app_ctx.clone()).await; + + assert_eq!(res.status(), 200, "response: {res:?}"); + + assert!(db.load_dataset(&dataset_id).await.is_ok()); + + let req = actix_web::test::TestRequest::get() + .uri(&format!("/dataset/{dataset_name}/status")) + .append_header((header::AUTHORIZATION, Bearer::new(session_id.to_string()))); + let res = send_pro_test_request(req, app_ctx.clone()).await; + assert_eq!(res.status(), 200, "response: {res:?}"); + let status: DatasetAccessStatusResponse = actix_web::test::read_body_json(res).await; + assert!(status.is_user_upload); + assert!(status.is_available); + assert!(status.expiration.is_some()); + + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + assert!(db.load_dataset(&dataset_id).await.is_err()); + + let req = actix_web::test::TestRequest::get() + .uri(&format!("/dataset/{dataset_name}/status")) + .append_header((header::AUTHORIZATION, Bearer::new(session_id.to_string()))); + let res = send_pro_test_request(req, app_ctx.clone()).await; + assert_eq!(res.status(), 400, "response: {res:?}"); + + Ok(()) + } } diff --git a/services/src/pro/contexts/migrations/current_schema.sql b/services/src/pro/contexts/migrations/current_schema.sql index 4193275f8..8ad5a3d9f 100644 --- a/services/src/pro/contexts/migrations/current_schema.sql +++ b/services/src/pro/contexts/migrations/current_schema.sql @@ -228,3 +228,42 @@ CREATE TABLE oidc_session_tokens ( refresh_token bytea, refresh_token_encryption_nonce bytea ); + +CREATE TYPE "InternalUploadedDatasetStatus" AS ENUM ( + 'Available', + 'Expires', + 'Expired', + 'UpdateExpired', + 'Deleted', + 'DeletedWithError' +); + +CREATE TYPE "DatasetDeletionType" AS ENUM ( + 'DeleteRecordAndData', + 'DeleteData' +); + +CREATE TABLE uploaded_user_datasets ( + user_id uuid, + upload_id uuid, + dataset_id uuid, + status "InternalUploadedDatasetStatus" NOT NULL, + created timestamp with time zone NOT NULL, + expiration timestamp with time zone, + deleted timestamp with time zone, + deletion_type "DatasetDeletionType", + PRIMARY KEY (user_id, dataset_id, upload_id) +); + +CREATE VIEW updatable_uploaded_user_datasets AS +SELECT + u.dataset_id, + u.user_id, + u.status, + u.deletion_type +FROM + uploaded_user_datasets AS u INNER JOIN + user_permitted_datasets AS p ON (u.user_id = p.user_id) +WHERE + u.expiration <= CURRENT_TIMESTAMP + AND (u.status = 'Expires' OR u.status = 'UpdateExpired'); diff --git a/services/src/pro/contexts/migrations/migration_0012_fair_upload_deletion.rs b/services/src/pro/contexts/migrations/migration_0012_fair_upload_deletion.rs new file mode 100644 index 000000000..4f49b4b83 --- /dev/null +++ b/services/src/pro/contexts/migrations/migration_0012_fair_upload_deletion.rs @@ -0,0 +1,56 @@ +use async_trait::async_trait; +use tokio_postgres::Transaction; + +use super::database_migration::{ProMigration, ProMigrationImpl}; +use crate::{contexts::Migration0012FairUploadDeletion, error::Result}; + +#[async_trait] +impl ProMigration for ProMigrationImpl { + async fn pro_migrate(&self, tx: &Transaction<'_>) -> Result<()> { + tx.batch_execute( + r#" + CREATE TYPE 
"InternalUploadedDatasetStatus" AS ENUM ( + 'Available', + 'Expires', + 'Expired', + 'UpdateExpired', + 'Deleted', + 'DeletedWithError' + ); + + CREATE TYPE "DatasetDeletionType" AS ENUM ( + 'DeleteRecordAndData', + 'DeleteData' + ); + + CREATE TABLE uploaded_user_datasets ( + user_id uuid, + upload_id uuid, + dataset_id uuid, + status "InternalUploadedDatasetStatus" NOT NULL, + created timestamp with time zone NOT NULL, + expiration timestamp with time zone, + deleted timestamp with time zone, + deletion_type "DatasetDeletionType", + PRIMARY KEY (user_id, dataset_id, upload_id) + ); + + CREATE VIEW updatable_uploaded_user_datasets AS + SELECT + u.dataset_id, + u.user_id, + u.status, + u.deletion_type + FROM + uploaded_user_datasets AS u INNER JOIN + user_permitted_datasets AS p ON (u.user_id = p.user_id) + WHERE + u.expiration <= CURRENT_TIMESTAMP + AND (u.status = 'Expires' OR u.status = 'UpdateExpired'); + "#, + ) + .await?; + + Ok(()) + } +} diff --git a/services/src/pro/contexts/migrations/mod.rs b/services/src/pro/contexts/migrations/mod.rs index bb3b23633..9cf47ef82 100644 --- a/services/src/pro/contexts/migrations/mod.rs +++ b/services/src/pro/contexts/migrations/mod.rs @@ -4,7 +4,7 @@ use crate::contexts::{ Migration0003GbifConfig, Migration0004DatasetListingProviderPrio, Migration0005GbifColumnSelection, Migration0006EbvProvider, Migration0007OwnerRole, Migration0008BandNames, Migration0009OidcTokens, Migration0010S2StacTimeBuffers, - Migration0011RemoveXgb, + Migration0011RemoveXgb, Migration0012FairUploadDeletion, }; use crate::pro::contexts::migrations::database_migration::NoProMigrationImpl; @@ -18,6 +18,7 @@ mod migration_0007_owner_role; mod migration_0009_oidc_tokens; mod migration_0010_s2_stack_time_buffers; mod migration_0011_remove_xgb; +mod migration_0012_fair_upload_deletion; /// Get all regular and pro migrations. This function wraps all regular migrations into a pro migration. pub fn pro_migrations() -> Vec> @@ -40,6 +41,7 @@ where Box::new(ProMigrationImpl::from(Migration0009OidcTokens)), Box::new(ProMigrationImpl::from(Migration0010S2StacTimeBuffers)), Box::new(ProMigrationImpl::from(Migration0011RemoveXgb)), + Box::new(ProMigrationImpl::from(Migration0012FairUploadDeletion)), ] } diff --git a/services/src/pro/contexts/mod.rs b/services/src/pro/contexts/mod.rs index a42f22ac2..ba5d98029 100644 --- a/services/src/pro/contexts/mod.rs +++ b/services/src/pro/contexts/mod.rs @@ -36,6 +36,7 @@ use super::users::{RoleDb, UserAuth, UserSession}; use super::util::config::{Cache, QuotaTrackingMode}; use crate::util::config::get_config_element; +use crate::pro::datasets::UploadedUserDatasetStore; pub use postgres::ProPostgresDb; /// A pro application contexts that extends the default context. 
@@ -43,7 +44,10 @@ pub trait ProApplicationContext: ApplicationContext + Use fn oidc_manager(&self) -> &OidcManager; } -pub trait ProGeoEngineDb: GeoEngineDb + UserDb + PermissionDb + RoleDb {} +pub trait ProGeoEngineDb: + GeoEngineDb + UserDb + PermissionDb + RoleDb + UploadedUserDatasetStore +{ +} pub struct ExecutionContextImpl where diff --git a/services/src/pro/datasets/mod.rs b/services/src/pro/datasets/mod.rs index fc8a18794..416eb3be3 100644 --- a/services/src/pro/datasets/mod.rs +++ b/services/src/pro/datasets/mod.rs @@ -1,7 +1,13 @@ mod external; mod postgres; +mod storage; pub use external::{ GdalRetries, SentinelS2L2ACogsProviderDefinition, StacApiRetries, StacBand, StacQueryBuffer, StacZone, TypedProDataProviderDefinition, }; + +pub use storage::{ + ChangeDatasetExpiration, DatasetAccessStatus, DatasetAccessStatusResponse, DatasetDeletionType, + Expiration, ExpirationChange, UploadedUserDatasetStore, +}; diff --git a/services/src/pro/datasets/postgres.rs b/services/src/pro/datasets/postgres.rs index abc178b98..2e468acb2 100644 --- a/services/src/pro/datasets/postgres.rs +++ b/services/src/pro/datasets/postgres.rs @@ -3,19 +3,35 @@ use crate::datasets::listing::Provenance; use crate::datasets::listing::{DatasetListOptions, DatasetListing, DatasetProvider}; use crate::datasets::listing::{OrderBy, ProvenanceOutput}; use crate::datasets::postgres::resolve_dataset_name_to_id; -use crate::datasets::storage::{Dataset, DatasetDb, DatasetStore, MetaDataDefinition}; -use crate::datasets::upload::FileId; +use crate::datasets::storage::{ + Dataset, DatasetDb, DatasetStore, MetaDataDefinition, ReservedTags, +}; +use crate::datasets::upload::{delete_upload, FileId}; use crate::datasets::upload::{Upload, UploadDb, UploadId}; use crate::datasets::{AddDataset, DatasetIdAndName, DatasetName}; +use crate::error::Error::{ + ExpirationTimestampInPast, IllegalDatasetStatus, IllegalExpirationUpdate, UnknownDatasetId, +}; use crate::error::{self, Error, Result}; use crate::pro::contexts::ProPostgresDb; +use crate::pro::datasets::storage::DatasetDeletionType::{DeleteData, DeleteRecordAndData}; +use crate::pro::datasets::storage::InternalUploadedDatasetStatus::{Deleted, DeletedWithError}; +use crate::pro::datasets::storage::{ + ChangeDatasetExpiration, DatasetAccessStatus, DatasetDeletionType, DatasetType, + InternalUploadedDatasetStatus, TxUploadedUserDatasetStore, UploadedDatasetStatus, + UploadedUserDatasetStore, +}; +use crate::pro::datasets::{Expiration, ExpirationChange}; use crate::pro::permissions::postgres_permissiondb::TxPermissionDb; use crate::pro::permissions::{Permission, RoleId}; +use crate::pro::users::{UserId, UserSession}; use crate::projects::Symbology; use crate::util::postgres::PostgresErrorExt; use async_trait::async_trait; +use bb8_postgres::bb8::PooledConnection; use bb8_postgres::tokio_postgres::tls::{MakeTlsConnect, TlsConnect}; use bb8_postgres::tokio_postgres::Socket; +use bb8_postgres::PostgresConnectionManager; use geoengine_datatypes::dataset::{DataId, DatasetId}; use geoengine_datatypes::error::BoxedResultExt; use geoengine_datatypes::primitives::RasterQueryRectangle; @@ -27,6 +43,9 @@ use geoengine_operators::engine::{ use geoengine_operators::mock::MockDatasetDataSourceLoadingInfo; use geoengine_operators::source::{GdalLoadingInfo, OgrSourceDataset}; use postgres_types::{FromSql, ToSql}; +use snafu::ensure; +use tokio_postgres::Transaction; +use InternalUploadedDatasetStatus::{Available, Expired, Expires, UpdateExpired}; impl DatasetDb for ProPostgresDb where @@ -47,7 
+66,8 @@ where <>::TlsConnect as TlsConnect>::Future: Send, { async fn list_datasets(&self, options: DatasetListOptions) -> Result> { - let conn = self.conn_pool.get().await?; + let mut conn = self.conn_pool.get().await?; + self.lazy_dataset_store_updates(&mut conn, None).await?; let mut pos = 3; let order_sql = if options.order == OrderBy::NameAsc { @@ -67,7 +87,13 @@ where pos += 1; (format!("AND d.tags @> ${pos}::text[]"), filter_tags.clone()) } else { - (String::new(), vec![]) + ( + format!( + "AND (d.tags IS NULL OR NOT d.tags @> '{{{}}}'::text[])", + ReservedTags::Deleted + ), + vec![], + ) }; let stmt = conn @@ -166,7 +192,10 @@ where } async fn load_dataset(&self, dataset: &DatasetId) -> Result { - let conn = self.conn_pool.get().await?; + let mut conn = self.conn_pool.get().await?; + self.lazy_dataset_store_updates(&mut conn, Some(dataset)) + .await?; + let stmt = conn .prepare( " @@ -210,7 +239,9 @@ where } async fn load_provenance(&self, dataset: &DatasetId) -> Result { - let conn = self.conn_pool.get().await?; + let mut conn = self.conn_pool.get().await?; + self.lazy_dataset_store_updates(&mut conn, Some(dataset)) + .await?; let stmt = conn .prepare( @@ -238,7 +269,9 @@ where } async fn load_loading_info(&self, dataset: &DatasetId) -> Result { - let conn = self.conn_pool.get().await?; + let mut conn = self.conn_pool.get().await?; + self.lazy_dataset_store_updates(&mut conn, Some(dataset)) + .await?; let stmt = conn .prepare( @@ -264,7 +297,8 @@ where &self, dataset_name: &DatasetName, ) -> Result> { - let conn = self.conn_pool.get().await?; + let mut conn = self.conn_pool.get().await?; + self.lazy_dataset_store_updates(&mut conn, None).await?; resolve_dataset_name_to_id(&conn, dataset_name).await } @@ -275,7 +309,8 @@ where limit: u32, offset: u32, ) -> Result> { - let connection = self.conn_pool.get().await?; + let mut conn = self.conn_pool.get().await?; + self.lazy_dataset_store_updates(&mut conn, None).await?; let limit = i64::from(limit); let offset = i64::from(offset); @@ -294,7 +329,7 @@ where String::new() }; - let stmt = connection + let stmt = conn .prepare(&format!( " SELECT @@ -311,7 +346,7 @@ where )) .await?; - let rows = connection.query(&stmt, &query_params).await?; + let rows = conn.query(&stmt, &query_params).await?; Ok(rows.iter().map(|row| row.get(0)).collect()) } @@ -383,6 +418,13 @@ where return Err(geoengine_operators::error::Error::PermissionDenied); }; + let uploaded_status = self.uploaded_dataset_status_in_tx(&id, &tx).await; + if let Ok(status) = uploaded_status { + if matches!(status, UploadedDatasetStatus::Deleted { .. }) { + return Err(geoengine_operators::error::Error::DatasetDeleted { id }); + } + } + let stmt = tx .prepare( " @@ -467,6 +509,13 @@ where return Err(geoengine_operators::error::Error::PermissionDenied); }; + let uploaded_status = self.uploaded_dataset_status_in_tx(&id, &tx).await; + if let Ok(status) = uploaded_status { + if matches!(status, UploadedDatasetStatus::Deleted { .. 
}) { + return Err(geoengine_operators::error::Error::DatasetDeleted { id }); + } + } + let stmt = tx .prepare( " @@ -691,36 +740,44 @@ where let mut conn = self.conn_pool.get().await?; let tx = conn.build_transaction().start().await?; - self.ensure_permission_in_tx(dataset_id.into(), Permission::Owner, &tx) - .await - .boxed_context(crate::error::PermissionDb)?; + let is_user_upload = self.is_user_upload_in_tx(&dataset_id, &tx).await?; + if !is_user_upload { + self.ensure_permission_in_tx(dataset_id.into(), Permission::Owner, &tx) + .await + .boxed_context(crate::error::PermissionDb)?; - let stmt = tx - .prepare( - " - SELECT - TRUE - FROM - user_permitted_datasets p JOIN datasets d - ON (p.dataset_id = d.id) - WHERE - d.id = $1 AND p.user_id = $2 AND p.permission = 'Owner';", - ) - .await?; + let stmt = tx + .prepare( + " + SELECT + TRUE + FROM + user_permitted_datasets p JOIN datasets d + ON (p.dataset_id = d.id) + WHERE + d.id = $1 AND p.user_id = $2 AND p.permission = 'Owner';", + ) + .await?; - let rows = tx - .query(&stmt, &[&dataset_id, &self.session.user.id]) - .await?; + let rows = tx + .query(&stmt, &[&dataset_id, &self.session.user.id]) + .await?; - if rows.is_empty() { - return Err(Error::OperationRequiresOwnerPermission); - } + if rows.is_empty() { + return Err(Error::OperationRequiresOwnerPermission); + } - let stmt = tx.prepare("DELETE FROM datasets WHERE id = $1;").await?; + let stmt = tx.prepare("DELETE FROM datasets WHERE id = $1;").await?; - tx.execute(&stmt, &[&dataset_id]).await?; + tx.execute(&stmt, &[&dataset_id]).await?; - tx.commit().await?; + tx.commit().await?; + + return Ok(()); + } + + self.expire_uploaded_dataset(ChangeDatasetExpiration::delete_full(dataset_id)) + .await?; Ok(()) } @@ -831,146 +888,1570 @@ impl From for crate::datasets::upload::FileUpload { } } -#[cfg(test)] -mod tests { - use std::path::PathBuf; - - use super::*; - use crate::{ - contexts::{ApplicationContext, SessionContext}, - pro::{ - contexts::ProPostgresContext, - ge_context, - users::{UserAuth, UserSession}, - }, - }; - use geoengine_datatypes::{ - collections::VectorDataType, - primitives::{CacheTtlSeconds, FeatureDataType, Measurement}, - spatial_reference::SpatialReference, - }; - use geoengine_operators::{ - engine::{StaticMetaData, VectorColumnInfo}, - source::{ - CsvHeader, FormatSpecifics, OgrSourceColumnSpec, OgrSourceDatasetTimeType, - OgrSourceDurationSpec, OgrSourceErrorSpec, OgrSourceTimeFormat, - }, - }; - use tokio_postgres::NoTls; +#[async_trait] +impl TxUploadedUserDatasetStore> for ProPostgresDb +where + Tls: MakeTlsConnect + Clone + Send + Sync + 'static + std::fmt::Debug, + >::Stream: Send + Sync, + >::TlsConnect: Send, + <>::TlsConnect as TlsConnect>::Future: Send, +{ + async fn is_user_upload_in_tx(&self, dataset_id: &DatasetId, tx: &Transaction) -> Result { + self.ensure_permission_in_tx((*dataset_id).into(), Permission::Read, tx) + .await + .boxed_context(crate::error::PermissionDb)?; - #[ge_context::test] - async fn it_autocompletes_datasets(app_ctx: ProPostgresContext) { - let session_a = app_ctx.create_anonymous_session().await.unwrap(); - let session_b = app_ctx.create_anonymous_session().await.unwrap(); + let stmt = tx + .prepare( + " + SELECT + TRUE + FROM + uploaded_user_datasets + WHERE + dataset_id = $1;", + ) + .await?; - let db_a = app_ctx.session_context(session_a.clone()).db(); - let db_b = app_ctx.session_context(session_b.clone()).db(); + let result = tx.query_opt(&stmt, &[&dataset_id]).await?; - add_single_dataset(&db_a, &session_a).await; + return 
Ok(result.is_some()); + } - assert_eq!( - db_a.dataset_autocomplete_search(None, "Ogr".to_owned(), 10, 0) - .await - .unwrap(), - vec!["Ogr Test"] - ); - assert_eq!( - db_a.dataset_autocomplete_search( - Some(vec!["upload".to_string()]), - "Ogr".to_owned(), - 10, - 0 - ) + async fn get_dataset_access_status_in_tx( + &self, + dataset_id: &DatasetId, + tx: &Transaction, + ) -> Result { + let permissions = self + .get_user_permissions_in_tx(*dataset_id, tx) .await - .unwrap(), - vec!["Ogr Test"] - ); + .boxed_context(crate::error::PermissionDb)?; + let uploaded = self.uploaded_dataset_status_in_tx(dataset_id, tx).await; + let access_status = if let Ok(user_upload) = uploaded { + if let UploadedDatasetStatus::Deleted(expiration) = &user_upload { + if matches!(expiration.deletion_type, DeleteRecordAndData) { + return Err(UnknownDatasetId); + } + } + DatasetAccessStatus { + id: *dataset_id, + dataset_type: DatasetType::UserUpload(user_upload), + permissions, + } + } else { + let stmt = tx + .prepare( + " + SELECT + TRUE + FROM + user_permitted_datasets p JOIN datasets d + ON (p.dataset_id = d.id) + WHERE + d.id = $1 AND p.user_id = $2;", + ) + .await?; - // check that other user B cannot access datasets of user A + let rows = tx + .query(&stmt, &[&dataset_id, &self.session.user.id]) + .await?; - assert!(db_b - .dataset_autocomplete_search(None, "Ogr".to_owned(), 10, 0) - .await - .unwrap() - .is_empty()); - assert!(db_b - .dataset_autocomplete_search(Some(vec!["upload".to_string()]), "Ogr".to_owned(), 10, 0) - .await - .unwrap() - .is_empty()); + if rows.is_empty() { + return Err(UnknownDatasetId); + } + + DatasetAccessStatus { + id: *dataset_id, + dataset_type: DatasetType::NonUserUpload, + permissions, + } + }; + + Ok(access_status) } - async fn add_single_dataset(db: &ProPostgresDb, session: &UserSession) { - let loading_info = OgrSourceDataset { - file_name: PathBuf::from("test.csv"), - layer_name: "test.csv".to_owned(), - data_type: Some(VectorDataType::MultiPoint), - time: OgrSourceDatasetTimeType::Start { - start_field: "start".to_owned(), - start_format: OgrSourceTimeFormat::Auto, - duration: OgrSourceDurationSpec::Zero, - }, - default_geometry: None, - columns: Some(OgrSourceColumnSpec { - format_specifics: Some(FormatSpecifics::Csv { - header: CsvHeader::Auto, - }), - x: "x".to_owned(), - y: None, - int: vec![], - float: vec![], - text: vec![], - bool: vec![], - datetime: vec![], - rename: None, - }), - force_ogr_time_filter: false, - force_ogr_spatial_filter: false, - on_error: OgrSourceErrorSpec::Ignore, - sql_query: None, - attribute_query: None, - cache_ttl: CacheTtlSeconds::default(), + async fn validate_expiration_request_in_tx( + &self, + dataset_id: &DatasetId, + expiration: &Expiration, + tx: &Transaction, + ) -> Result<()> { + let (status, deletion_type, legal_expiration): ( + InternalUploadedDatasetStatus, + Option, + bool, + ) = if let Some(timestamp) = expiration.deletion_timestamp { + let stmt = tx + .prepare( + " + SELECT + status, + deletion_type, + $2 >= CURRENT_TIMESTAMP as legal_expiration + FROM + uploaded_user_datasets + WHERE + dataset_id = $1;", + ) + .await?; + let row = tx + .query_opt(&stmt, &[&dataset_id, ×tamp]) + .await? + .ok_or(UnknownDatasetId)?; + (row.get(0), row.get(1), row.get::(2)) + } else { + let stmt = tx + .prepare( + " + SELECT + status, + deletion_type, + TRUE as legal_expiration + FROM + uploaded_user_datasets + WHERE + dataset_id = $1;", + ) + .await?; + let row = tx + .query_opt(&stmt, &[&dataset_id]) + .await? 
+ .ok_or(UnknownDatasetId)?; + (row.get(0), row.get(1), row.get::(2)) }; - let meta_data = MetaDataDefinition::OgrMetaData(StaticMetaData::< - OgrSourceDataset, - VectorResultDescriptor, - VectorQueryRectangle, - > { - loading_info: loading_info.clone(), - result_descriptor: VectorResultDescriptor { - data_type: VectorDataType::MultiPoint, - spatial_reference: SpatialReference::epsg_4326().into(), - columns: [( - "foo".to_owned(), - VectorColumnInfo { - data_type: FeatureDataType::Float, - measurement: Measurement::Unitless, - }, - )] - .into_iter() - .collect(), - time: None, - bbox: None, - }, - phantom: Default::default(), - }); + match status { + Available | Expires => { + if !legal_expiration { + return Err(ExpirationTimestampInPast { + dataset: (*dataset_id).into(), + }); + } + } + Expired | UpdateExpired | Deleted => { + if matches!(expiration.deletion_type, DeleteData) + && matches!(deletion_type, Some(DeleteRecordAndData)) + { + return Err(IllegalExpirationUpdate { + dataset: (*dataset_id).into(), + reason: "Prior expiration already deleted data and record".to_string(), + }); + } + if expiration.deletion_timestamp.is_some() { + return Err(IllegalExpirationUpdate { + dataset: (*dataset_id).into(), + reason: "Setting expiration after deletion".to_string(), + }); + } + } + DeletedWithError => { + return Err(IllegalDatasetStatus { + dataset: (*dataset_id).into(), + status: "Dataset was deleted, but an error occurred during deletion" + .to_string(), + }); + } + } + Ok(()) + } - let dataset_name = DatasetName::new(Some(session.user.id.to_string()), "my_dataset"); + async fn uploaded_dataset_status_in_tx( + &self, + dataset_id: &DatasetId, + tx: &Transaction, + ) -> Result { + self.ensure_permission_in_tx((*dataset_id).into(), Permission::Read, tx) + .await + .boxed_context(crate::error::PermissionDb)?; - db.add_dataset( - AddDataset { - name: Some(dataset_name.clone()), - display_name: "Ogr Test".to_owned(), - description: "desc".to_owned(), - source_operator: "OgrSource".to_owned(), - symbology: None, - provenance: None, - tags: Some(vec!["upload".to_owned(), "test".to_owned()]), - }, - meta_data, - ) - .await - .unwrap(); + self.update_uploaded_datasets_status_in_tx(Some(dataset_id), tx) + .await?; + + let stmt = tx + .prepare( + " + SELECT + status, expiration, deletion_type + FROM + uploaded_user_datasets + WHERE + dataset_id = $1;", + ) + .await?; + + let result = tx + .query_opt(&stmt, &[&dataset_id]) + .await? 
+ .ok_or(error::Error::UnknownDatasetId)?; + + let internal_status: InternalUploadedDatasetStatus = result.get(0); + let expiration_timestamp = result.get(1); + let dataset_deletion_type = result.get(2); + + let status = internal_status.convert_to_uploaded_dataset_status( + dataset_id, + expiration_timestamp, + dataset_deletion_type, + )?; + + Ok(status) + } + + async fn lazy_dataset_store_updates( + &self, + conn: &mut PooledConnection>, + dataset_id: Option<&DatasetId>, + ) -> Result<()> { + let tx = conn.build_transaction().start().await?; + self.update_uploaded_datasets_status_in_tx(dataset_id, &tx) + .await?; + tx.commit().await?; + + Ok(()) + } + + async fn expire_uploaded_dataset_in_tx( + &self, + expire_dataset: ChangeDatasetExpiration, + tx: &Transaction, + ) -> Result<()> { + self.ensure_permission_in_tx(expire_dataset.dataset_id.into(), Permission::Owner, tx) + .await + .boxed_context(error::PermissionDb)?; + + self.update_uploaded_datasets_status_in_tx(Some(&expire_dataset.dataset_id), tx) + .await?; + + match expire_dataset.expiration_change { + ExpirationChange::SetExpire(expiration) => { + self.set_expire_for_uploaded_dataset(&expire_dataset.dataset_id, &expiration, tx) + .await?; + } + ExpirationChange::UnsetExpire => { + self.unset_expire_for_uploaded_dataset(&expire_dataset.dataset_id, tx) + .await?; + } + } + self.update_uploaded_datasets_status_in_tx(Some(&expire_dataset.dataset_id), tx) + .await?; + + Ok(()) + } + + #[allow(clippy::too_many_lines)] + async fn update_uploaded_datasets_status_in_tx( + &self, + dataset_id: Option<&DatasetId>, + tx: &Transaction, + ) -> Result<()> { + fn create_filter( + session: &UserSession, + dataset_id: Option<&DatasetId>, + mut param_size: usize, + ) -> (String, Option, String, Option) { + let (user_filter, user_param) = if session.is_admin() { + (String::new(), None) + } else { + param_size += 1; + let filter = format!("AND up.user_id = ${param_size}").to_string(); + (filter, Some(session.user.id)) + }; + + let (dataset_filter, dataset_param) = if let Some(dataset_id) = dataset_id { + param_size += 1; + let filter = format!("AND up.dataset_id = ${param_size}").to_string(); + (filter, Some(*dataset_id)) + } else { + (String::new(), None) + }; + + (user_filter, user_param, dataset_filter, dataset_param) + } + + fn create_filter_params<'a>( + filter_params: &'a mut Vec<&'a (dyn ToSql + Sync)>, + user_id: Option<&'a UserId>, + dataset_id: Option<&'a DatasetId>, + ) -> &'a [&'a (dyn ToSql + Sync)] { + if let Some(user_id) = user_id { + filter_params.push(user_id); + } + if let Some(dataset_id) = dataset_id { + filter_params.push(dataset_id); + } + filter_params.as_slice() + } + + let (user_filter, user_param, dataset_filter, dataset_param) = + create_filter(&self.session, dataset_id, 1); + let tag_deletion = tx + .prepare( + format!(" + UPDATE + datasets + SET + tags = tags || '{{{}}}' + FROM + updatable_uploaded_user_datasets up + WHERE + datasets.id = up.dataset_id AND up.deletion_type = $1 {user_filter} {dataset_filter};", + ReservedTags::Deleted + ) + .as_str(), + ) + .await?; + let mut tag_deletion_params: Vec<&(dyn ToSql + Sync)> = vec![&DeleteData]; + tx.execute( + &tag_deletion, + create_filter_params( + &mut tag_deletion_params, + user_param.as_ref(), + dataset_param.as_ref(), + ), + ) + .await?; + + let mark_deletion = tx + .prepare( + format!(" + UPDATE + uploaded_user_datasets + SET + status = $1 + FROM + updatable_uploaded_user_datasets up + WHERE + uploaded_user_datasets.dataset_id = up.dataset_id {user_filter} 
{dataset_filter};" + ) + .as_str(), + ) + .await?; + let mut mark_deletion_params: Vec<&(dyn ToSql + Sync)> = vec![&Expired]; + tx.execute( + &mark_deletion, + create_filter_params( + &mut mark_deletion_params, + user_param.as_ref(), + dataset_param.as_ref(), + ), + ) + .await?; + + let (user_filter, user_param, dataset_filter, dataset_param) = + create_filter(&self.session, dataset_id, 2); + let delete_records = tx + .prepare( + format!(" + DELETE FROM + datasets + USING + uploaded_user_datasets up + WHERE + datasets.id = up.dataset_id AND up.status = $1 AND up.deletion_type = $2 {user_filter} {dataset_filter};").as_str(), + ) + .await?; + let mut delete_records_params: Vec<&(dyn ToSql + Sync)> = + vec![&Expired, &DeleteRecordAndData]; + tx.execute( + &delete_records, + create_filter_params( + &mut delete_records_params, + user_param.as_ref(), + dataset_param.as_ref(), + ), + ) + .await?; + Ok(()) + } + + async fn set_expire_for_uploaded_dataset( + &self, + dataset_id: &DatasetId, + expiration: &Expiration, + tx: &Transaction, + ) -> Result<()> { + let num_changes = if let Some(delete_timestamp) = expiration.deletion_timestamp { + let stmt = tx + .prepare(" + UPDATE uploaded_user_datasets + SET status = $2, expiration = $3, deletion_type = $4 + WHERE dataset_id = $1 AND $3 >= CURRENT_TIMESTAMP AND (status = $5 OR status = $6);", + ).await?; + tx.execute( + &stmt, + &[ + &dataset_id, + &Expires, + &delete_timestamp, + &expiration.deletion_type, + &Available, + &Expires, + ], + ) + .await? + } else { + let stmt = tx + .prepare( + " + UPDATE uploaded_user_datasets + SET status = $2, expiration = CURRENT_TIMESTAMP, deletion_type = $3 + WHERE dataset_id = $1 AND (status = $4 OR status = $5);", + ) + .await?; + let num_expired = tx + .execute( + &stmt, + &[ + &dataset_id, + &Expires, + &expiration.deletion_type, + &Available, + &Expires, + ], + ) + .await?; + + if num_expired == 0 && matches!(expiration.deletion_type, DeleteRecordAndData) { + let stmt = tx + .prepare( + " + UPDATE uploaded_user_datasets + SET deletion_type = $2, + status = $3 + WHERE dataset_id = $1 AND (status = $4 OR status = $5) AND deletion_type = $6;", + ) + .await?; + tx.execute( + &stmt, + &[ + &dataset_id, + &expiration.deletion_type, + &UpdateExpired, + &Expired, + &Deleted, + &DeleteData, + ], + ) + .await? 
+ } else { + num_expired + } + }; + + if num_changes == 0 { + self.validate_expiration_request_in_tx(dataset_id, expiration, tx) + .await?; + }; + + Ok(()) + } + + async fn unset_expire_for_uploaded_dataset( + &self, + dataset_id: &DatasetId, + tx: &Transaction, + ) -> Result<()> { + let stmt = tx + .prepare( + " + UPDATE uploaded_user_datasets + SET status = $2, expiration = NULL, deletion_type = NULL + WHERE dataset_id = $1 AND status = $3;", + ) + .await?; + let set_changes = tx + .execute(&stmt, &[&dataset_id, &Available, &Expires]) + .await?; + if set_changes == 0 { + return Err(IllegalDatasetStatus { + dataset: (*dataset_id).into(), + status: "Requested dataset does not exist or does not have an expiration" + .to_string(), + }); + } + Ok(()) + } +} + +#[async_trait] +impl UploadedUserDatasetStore for ProPostgresDb +where + Tls: MakeTlsConnect + Clone + Send + Sync + 'static + std::fmt::Debug, + >::Stream: Send + Sync, + >::TlsConnect: Send, + <>::TlsConnect as TlsConnect>::Future: Send, +{ + async fn add_uploaded_dataset( + &self, + upload_id: UploadId, + dataset: AddDataset, + meta_data: MetaDataDefinition, + ) -> Result { + let id = DatasetId::new(); + let name = dataset.name.unwrap_or_else(|| DatasetName { + namespace: Some(self.session.user.id.to_string()), + name: id.to_string(), + }); + + log::info!( + "Adding dataset with name: {:?}, tags: {:?}", + name, + dataset.tags + ); + + self.check_namespace(&name)?; + + let typed_meta_data = meta_data.to_typed_metadata(); + + let mut conn = self.conn_pool.get().await?; + + let tx = conn.build_transaction().start().await?; + + tx.execute( + " + INSERT INTO datasets ( + id, + name, + display_name, + description, + source_operator, + result_descriptor, + meta_data, + symbology, + provenance, + tags + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::text[])", + &[ + &id, + &name, + &dataset.display_name, + &dataset.description, + &dataset.source_operator, + &typed_meta_data.result_descriptor, + typed_meta_data.meta_data, + &dataset.symbology, + &dataset.provenance, + &dataset.tags, + ], + ) + .await + .map_unique_violation("datasets", "name", || error::Error::InvalidDatasetName)?; + + let stmt = tx + .prepare( + " + INSERT INTO permissions ( + role_id, + dataset_id, + permission + ) + VALUES ($1, $2, $3)", + ) + .await?; + + tx.execute( + &stmt, + &[&RoleId::from(self.session.user.id), &id, &Permission::Owner], + ) + .await?; + + let stmt = tx + .prepare( + " + INSERT INTO uploaded_user_datasets ( + user_id, + upload_id, + dataset_id, + status, + created, + expiration, + deleted, + deletion_type + ) + VALUES ($1, $2, $3, 'Available', CURRENT_TIMESTAMP, NULL, NULL, NULL)", + ) + .await?; + + tx.execute( + &stmt, + &[&RoleId::from(self.session.user.id), &upload_id, &id], + ) + .await?; + + tx.commit().await?; + + Ok(DatasetIdAndName { id, name }) + } + + async fn expire_uploaded_dataset(&self, expire_dataset: ChangeDatasetExpiration) -> Result<()> { + let mut conn = self.conn_pool.get().await?; + let tx = conn.build_transaction().start().await?; + + self.expire_uploaded_dataset_in_tx(expire_dataset, &tx) + .await?; + + tx.commit().await?; + + Ok(()) + } + + async fn get_dataset_access_status( + &self, + dataset_id: &DatasetId, + ) -> Result { + let mut conn = self.conn_pool.get().await?; + self.lazy_dataset_store_updates(&mut conn, Some(dataset_id)) + .await?; + + let tx = conn.build_transaction().start().await?; + + let result = self.get_dataset_access_status_in_tx(dataset_id, &tx).await; + + tx.commit().await?; + + result + } + + async 
fn clear_expired_datasets(&self) -> Result { + ensure!(self.session.is_admin(), error::PermissionDenied); + + let mut conn = self.conn_pool.get().await?; + let tx = conn.build_transaction().start().await?; + + self.update_uploaded_datasets_status_in_tx(None, &tx) + .await?; + + let update_expired = tx + .prepare( + " + UPDATE + uploaded_user_datasets + SET + status = $1 + WHERE + status = $2 AND deleted IS NOT NULL;", + ) + .await?; + let mut updated = tx.execute(&update_expired, &[&Deleted, &Expired]).await?; + + let marked_datasets = tx + .prepare( + " + SELECT + dataset_id, upload_id + FROM + uploaded_user_datasets + WHERE + status = $1 AND deleted IS NULL;", + ) + .await?; + + let rows = tx.query(&marked_datasets, &[&Expired]).await?; + + let mut deleted = vec![]; + let mut deleted_with_error = vec![]; + + for row in rows { + let dataset_id: DatasetId = row.get(0); + let upload_id = row.get(1); + let res = delete_upload(upload_id).await; + if let Err(error) = res { + log::error!("Error during deletion of upload {upload_id} from dataset {dataset_id}: {error}, marking as DeletedWithError"); + deleted_with_error.push(upload_id); + } else { + deleted.push(upload_id); + } + updated += 1; //Could hypothetically overflow + } + + let mark_deletion = tx + .prepare( + " + UPDATE + uploaded_user_datasets + SET + status = $1, deleted = CURRENT_TIMESTAMP + WHERE + status = $2 AND upload_id = ANY($3);", + ) + .await?; + + if !deleted.is_empty() { + tx.execute(&mark_deletion, &[&Deleted, &Expired, &deleted]) + .await?; + } + + if !deleted_with_error.is_empty() { + tx.execute( + &mark_deletion, + &[&DeletedWithError, &Expired, &deleted_with_error], + ) + .await?; + } + + tx.commit().await?; + + Ok(updated) + } +} + +#[cfg(test)] +mod tests { + use std::fs; + use std::ops::{Add, Sub}; + use std::path::PathBuf; + + use super::*; + use crate::api::model::responses::IdResponse; + use crate::contexts::SessionId; + use crate::datasets::upload::UploadRootPath; + use crate::error::Error::PermissionDenied; + use crate::pro::permissions::{PermissionDb, Role}; + use crate::pro::users::{UserCredentials, UserRegistration}; + use crate::pro::util::tests::{admin_login, send_pro_test_request}; + use crate::pro::util::tests::{get_db_timestamp, get_db_timestamp_in_tx}; + use crate::util::tests::{SetMultipartBody, TestDataUploads}; + use crate::{ + contexts::{ApplicationContext, SessionContext}, + pro::{ + contexts::ProPostgresContext, + ge_context, + users::{UserAuth, UserSession}, + }, + }; + use actix_web::http::header; + use actix_web::test; + use actix_web_httpauth::headers::authorization::Bearer; + use geoengine_datatypes::primitives::{DateTime, Duration}; + use geoengine_datatypes::{ + collections::VectorDataType, + primitives::{CacheTtlSeconds, FeatureDataType, Measurement}, + spatial_reference::SpatialReference, + }; + use geoengine_operators::error::Error::DatasetDeleted; + use geoengine_operators::{ + engine::{StaticMetaData, VectorColumnInfo}, + source::{ + CsvHeader, FormatSpecifics, OgrSourceColumnSpec, OgrSourceDatasetTimeType, + OgrSourceDurationSpec, OgrSourceErrorSpec, OgrSourceTimeFormat, + }, + }; + use tokio_postgres::NoTls; + + #[ge_context::test] + async fn it_autocompletes_datasets(app_ctx: ProPostgresContext) { + let session_a = app_ctx.create_anonymous_session().await.unwrap(); + let session_b = app_ctx.create_anonymous_session().await.unwrap(); + + let db_a = app_ctx.session_context(session_a.clone()).db(); + let db_b = app_ctx.session_context(session_b.clone()).db(); + + 
add_single_dataset(&db_a, &session_a).await; + + assert_eq!( + db_a.dataset_autocomplete_search(None, "Ogr".to_owned(), 10, 0) + .await + .unwrap(), + vec!["Ogr Test"] + ); + assert_eq!( + db_a.dataset_autocomplete_search( + Some(vec!["upload".to_string()]), + "Ogr".to_owned(), + 10, + 0 + ) + .await + .unwrap(), + vec!["Ogr Test"] + ); + + // check that other user B cannot access datasets of user A + + assert!(db_b + .dataset_autocomplete_search(None, "Ogr".to_owned(), 10, 0) + .await + .unwrap() + .is_empty()); + assert!(db_b + .dataset_autocomplete_search(Some(vec!["upload".to_string()]), "Ogr".to_owned(), 10, 0) + .await + .unwrap() + .is_empty()); + } + + async fn add_single_dataset(db: &ProPostgresDb, session: &UserSession) -> DatasetName { + let loading_info = OgrSourceDataset { + file_name: PathBuf::from("test.csv"), + layer_name: "test.csv".to_owned(), + data_type: Some(VectorDataType::MultiPoint), + time: OgrSourceDatasetTimeType::Start { + start_field: "start".to_owned(), + start_format: OgrSourceTimeFormat::Auto, + duration: OgrSourceDurationSpec::Zero, + }, + default_geometry: None, + columns: Some(OgrSourceColumnSpec { + format_specifics: Some(FormatSpecifics::Csv { + header: CsvHeader::Auto, + }), + x: "x".to_owned(), + y: None, + int: vec![], + float: vec![], + text: vec![], + bool: vec![], + datetime: vec![], + rename: None, + }), + force_ogr_time_filter: false, + force_ogr_spatial_filter: false, + on_error: OgrSourceErrorSpec::Ignore, + sql_query: None, + attribute_query: None, + cache_ttl: CacheTtlSeconds::default(), + }; + + let meta_data = MetaDataDefinition::OgrMetaData(StaticMetaData::< + OgrSourceDataset, + VectorResultDescriptor, + VectorQueryRectangle, + > { + loading_info: loading_info.clone(), + result_descriptor: VectorResultDescriptor { + data_type: VectorDataType::MultiPoint, + spatial_reference: SpatialReference::epsg_4326().into(), + columns: [( + "foo".to_owned(), + VectorColumnInfo { + data_type: FeatureDataType::Float, + measurement: Measurement::Unitless, + }, + )] + .into_iter() + .collect(), + time: None, + bbox: None, + }, + phantom: Default::default(), + }); + + let dataset_name = DatasetName::new(Some(session.user.id.to_string()), "my_dataset"); + + db.add_dataset( + AddDataset { + name: Some(dataset_name.clone()), + display_name: "Ogr Test".to_owned(), + description: "desc".to_owned(), + source_operator: "OgrSource".to_owned(), + symbology: None, + provenance: None, + tags: Some(vec!["upload".to_owned(), "test".to_owned()]), + }, + meta_data, + ) + .await + .unwrap(); + + dataset_name + } + + const TEST_POINT_DATASET_SOURCE_PATH: &str = "vector/data/points.fgb"; + + struct TestDatasetDefinition { + meta_data: MetaDataDefinition, + dataset_name: DatasetName, + } + + struct UploadedTestDataset { + dataset_name: DatasetName, + dataset_id: DatasetId, + upload_id: UploadId, + } + + fn test_point_dataset(name_space: Option, name: &str) -> TestDatasetDefinition { + let local_path = PathBuf::from(TEST_POINT_DATASET_SOURCE_PATH); + let file_name = local_path.file_name().unwrap().to_str().unwrap(); + let loading_info = OgrSourceDataset { + file_name: PathBuf::from(file_name), + layer_name: file_name.to_owned(), + data_type: Some(VectorDataType::MultiPoint), + time: OgrSourceDatasetTimeType::None, + default_geometry: None, + columns: Some(OgrSourceColumnSpec { + format_specifics: None, + x: "x".to_owned(), + y: Some("y".to_owned()), + int: vec!["num".to_owned()], + float: vec![], + text: vec!["txt".to_owned()], + bool: vec![], + datetime: vec![], + rename: 
None, + }), + force_ogr_time_filter: false, + force_ogr_spatial_filter: false, + on_error: OgrSourceErrorSpec::Ignore, + sql_query: None, + attribute_query: None, + cache_ttl: CacheTtlSeconds::default(), + }; + + let meta_data = MetaDataDefinition::OgrMetaData(StaticMetaData::< + OgrSourceDataset, + VectorResultDescriptor, + VectorQueryRectangle, + > { + loading_info: loading_info.clone(), + result_descriptor: VectorResultDescriptor { + data_type: VectorDataType::MultiPoint, + spatial_reference: SpatialReference::epsg_4326().into(), + columns: [ + ( + "num".to_owned(), + VectorColumnInfo { + data_type: FeatureDataType::Int, + measurement: Measurement::Unitless, + }, + ), + ( + "txt".to_owned(), + VectorColumnInfo { + data_type: FeatureDataType::Text, + measurement: Measurement::Unitless, + }, + ), + ] + .into_iter() + .collect(), + time: None, + bbox: None, + }, + phantom: Default::default(), + }); + + let dataset_name = DatasetName::new(name_space, name); + + TestDatasetDefinition { + meta_data, + dataset_name, + } + } + + async fn upload_point_dataset( + app_ctx: &ProPostgresContext, + session_id: SessionId, + ) -> UploadId { + let files = + vec![geoengine_datatypes::test_data!(TEST_POINT_DATASET_SOURCE_PATH).to_path_buf()]; + + let req = actix_web::test::TestRequest::post() + .uri("/upload") + .append_header((header::AUTHORIZATION, Bearer::new(session_id.to_string()))) + .set_multipart_files(&files); + + let res = send_pro_test_request(req, app_ctx.clone()).await; + assert_eq!(res.status(), 200); + let upload: IdResponse = test::read_body_json(res).await; + + upload.id + } + + async fn upload_and_add_point_dataset( + app_ctx: &ProPostgresContext, + user_session: &UserSession, + name: &str, + upload_dir: &mut TestDataUploads, + ) -> UploadedTestDataset { + let test_dataset = test_point_dataset(Some(user_session.user.id.to_string()), name); + let upload_id = upload_point_dataset(app_ctx, user_session.id).await; + + let res = app_ctx + .session_context(user_session.clone()) + .db() + .add_uploaded_dataset( + upload_id, + AddDataset { + name: Some(test_dataset.dataset_name.clone()), + display_name: "Ogr Test".to_owned(), + description: "desc".to_owned(), + source_operator: "OgrSource".to_owned(), + symbology: None, + provenance: None, + tags: Some(vec!["upload".to_owned(), "test".to_owned()]), + }, + test_dataset.meta_data.clone(), + ) + .await + .unwrap(); + + upload_dir.uploads.push(upload_id); + + UploadedTestDataset { + dataset_name: test_dataset.dataset_name, + dataset_id: res.id, + upload_id, + } + } + + async fn add_test_volume_dataset(app_ctx: &ProPostgresContext) -> DatasetId { + let admin_session = admin_login(app_ctx).await; + let admin_ctx = app_ctx.session_context(admin_session.clone()); + let db = admin_ctx.db(); + let dataset_name = add_single_dataset(&db, &admin_session).await; + let dataset_id = db + .resolve_dataset_name_to_id(&dataset_name) + .await + .unwrap() + .unwrap(); + + db.add_permission( + Role::registered_user_role_id(), + dataset_id, + Permission::Read, + ) + .await + .unwrap(); + + db.add_permission(Role::anonymous_role_id(), dataset_id, Permission::Read) + .await + .unwrap(); + + dataset_id + } + + fn listing_not_deleted(dataset: &DatasetListing, origin: &UploadedTestDataset) -> bool { + dataset.name == origin.dataset_name + && !dataset.tags.contains(&ReservedTags::Deleted.to_string()) + } + + fn dataset_deleted(dataset: &Dataset, origin: &UploadedTestDataset) -> bool { + let tags = dataset.tags.clone().unwrap(); + let mut num_deleted = 0; + for tag in tags { 
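+            // count how often the reserved `Deleted` tag occurs; a deleted dataset must carry it exactly once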
+ if tag == ReservedTags::Deleted.to_string() { + num_deleted += 1; + } + } + dataset.name == origin.dataset_name && num_deleted == 1 + } + + fn dir_exists(origin: &UploadedTestDataset) -> bool { + let path = origin.upload_id.root_path().unwrap(); + fs::read_dir(path).is_ok() + } + + fn has_read_and_owner_permissions(permissions: &[Permission]) { + assert_eq!(permissions.len(), 2); + assert!(permissions.contains(&Permission::Read)); + assert!(permissions.contains(&Permission::Owner)); + } + + async fn register_test_user(app_ctx: &ProPostgresContext) -> UserSession { + let _user_id = app_ctx + .register_user(UserRegistration { + email: "test@localhost".to_string(), + real_name: "Foo Bar".to_string(), + password: "test".to_string(), + }) + .await + .unwrap(); + + app_ctx + .login(UserCredentials { + email: "test@localhost".to_string(), + password: "test".to_string(), + }) + .await + .unwrap() + } + + async fn expire_in_tx_time_duration( + app_ctx: &ProPostgresContext, + user_session: &UserSession, + dataset_id: DatasetId, + fair: bool, + duration: Duration, + ) -> DateTime { + let mut conn = app_ctx.pool.get().await.unwrap(); + let tx = conn.build_transaction().start().await.unwrap(); + + let db = app_ctx.session_context(user_session.clone()).db(); + + let current_time = get_db_timestamp_in_tx(&tx).await; + let future_time = current_time.add(duration); + + let change_dataset_expiration = if fair { + ChangeDatasetExpiration::expire_fair(dataset_id, future_time) + } else { + ChangeDatasetExpiration::expire_full(dataset_id, future_time) + }; + + db.expire_uploaded_dataset_in_tx(change_dataset_expiration, &tx) + .await + .unwrap(); + + tx.commit().await.unwrap(); + + future_time + } + + #[ge_context::test] + async fn it_lists_datasets_without_tags(app_ctx: ProPostgresContext) { + let admin_session = admin_login(&app_ctx).await; + let admin_ctx = app_ctx.session_context(admin_session.clone()); + let db = admin_ctx.db(); + let test_dataset = test_point_dataset(None, "test_data"); + + let ds = AddDataset { + name: None, + display_name: "TestData".to_string(), + description: "TestData without tags".to_string(), + source_operator: "OgrSource".to_string(), + symbology: None, + provenance: None, + tags: None, + }; + + db.add_dataset(ds, test_dataset.meta_data).await.unwrap(); + + let default_list_options = DatasetListOptions { + filter: None, + order: OrderBy::NameAsc, + offset: 0, + limit: 10, + tags: None, + }; + + let listing = db + .list_datasets(default_list_options.clone()) + .await + .unwrap(); + + assert_eq!(listing.len(), 1); + } + + #[ge_context::test] + async fn it_deletes_datasets(app_ctx: ProPostgresContext) { + let mut test_data = TestDataUploads::default(); + let user_session = register_test_user(&app_ctx).await; + + let available = + upload_and_add_point_dataset(&app_ctx, &user_session, "available", &mut test_data) + .await; + let fair = + upload_and_add_point_dataset(&app_ctx, &user_session, "fair", &mut test_data).await; + let full = + upload_and_add_point_dataset(&app_ctx, &user_session, "full", &mut test_data).await; + + let db = app_ctx.session_context(user_session.clone()).db(); + + let default_list_options = DatasetListOptions { + filter: None, + order: OrderBy::NameAsc, + offset: 0, + limit: 10, + tags: None, + }; + + let listing = db + .list_datasets(default_list_options.clone()) + .await + .unwrap(); + + assert_eq!(listing.len(), 3); + assert!(listing_not_deleted(listing.first().unwrap(), &available)); + assert!(listing_not_deleted(listing.get(1).unwrap(), &fair)); + 
assert!(listing_not_deleted(listing.get(2).unwrap(), &full)); + + db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_fair(fair.dataset_id)) + .await + .unwrap(); + db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_full(full.dataset_id)) + .await + .unwrap(); + + let listing = db + .list_datasets(default_list_options.clone()) + .await + .unwrap(); + + assert_eq!(listing.len(), 1); + assert!(listing_not_deleted(listing.first().unwrap(), &available)); + assert!(dataset_deleted( + &db.load_dataset(&fair.dataset_id).await.unwrap(), + &fair + )); + assert!(matches!( + db.load_dataset(&full.dataset_id).await.unwrap_err(), + UnknownDatasetId + )); + + assert!(dir_exists(&available)); + assert!(dir_exists(&fair)); + assert!(dir_exists(&full)); + + let admin_session = admin_login(&app_ctx).await; + let admin_ctx = app_ctx.session_context(admin_session.clone()); + let deleted = admin_ctx.db().clear_expired_datasets().await.unwrap(); + + assert_eq!(deleted, 2); + assert!(dir_exists(&available)); + assert!(!dir_exists(&fair)); + assert!(!dir_exists(&full)); + + let deleted = admin_ctx.db().clear_expired_datasets().await.unwrap(); + assert_eq!(deleted, 0); + } + + #[ge_context::test] + async fn it_expires_dataset(app_ctx: ProPostgresContext) { + let mut test_data = TestDataUploads::default(); + let user_session = register_test_user(&app_ctx).await; + + let fair = + upload_and_add_point_dataset(&app_ctx, &user_session, "fair", &mut test_data).await; + + let db = app_ctx.session_context(user_session.clone()).db(); + + let default_list_options = DatasetListOptions { + filter: None, + order: OrderBy::NameAsc, + offset: 0, + limit: 10, + tags: None, + }; + + expire_in_tx_time_duration( + &app_ctx, + &user_session, + fair.dataset_id, + true, + Duration::seconds(3), + ) + .await; + + let listing = db + .list_datasets(default_list_options.clone()) + .await + .unwrap(); + + assert_eq!(listing.len(), 1); + assert!(listing_not_deleted(listing.first().unwrap(), &fair)); + + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + let listing = db + .list_datasets(default_list_options.clone()) + .await + .unwrap(); + + assert_eq!(listing.len(), 0); + assert!(dataset_deleted( + &db.load_dataset(&fair.dataset_id).await.unwrap(), + &fair + )); + } + + #[ge_context::test] + async fn it_updates_expiring_dataset(app_ctx: ProPostgresContext) { + let mut test_data = TestDataUploads::default(); + let user_session = register_test_user(&app_ctx).await; + + let fair = + upload_and_add_point_dataset(&app_ctx, &user_session, "fair", &mut test_data).await; + let fair2full = + upload_and_add_point_dataset(&app_ctx, &user_session, "fair2full", &mut test_data) + .await; + + let db = app_ctx.session_context(user_session.clone()).db(); + + let default_list_options = DatasetListOptions { + filter: None, + order: OrderBy::NameAsc, + offset: 0, + limit: 10, + tags: None, + }; + + expire_in_tx_time_duration( + &app_ctx, + &user_session, + fair.dataset_id, + true, + Duration::seconds(5), + ) + .await; + expire_in_tx_time_duration( + &app_ctx, + &user_session, + fair.dataset_id, + true, + Duration::seconds(10), + ) + .await; + expire_in_tx_time_duration( + &app_ctx, + &user_session, + fair2full.dataset_id, + true, + Duration::seconds(5), + ) + .await; + expire_in_tx_time_duration( + &app_ctx, + &user_session, + fair2full.dataset_id, + false, + Duration::seconds(5), + ) + .await; + + let listing = db + .list_datasets(default_list_options.clone()) + .await + .unwrap(); + + assert_eq!(listing.len(), 2); + 
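+        // both datasets are still listed: neither of the updated expirations has elapsed yet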
assert!(listing_not_deleted(listing.first().unwrap(), &fair)); + assert!(listing_not_deleted(listing.get(1).unwrap(), &fair2full)); + + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + let listing = db + .list_datasets(default_list_options.clone()) + .await + .unwrap(); + + assert_eq!(listing.len(), 1); + assert!(listing_not_deleted(listing.first().unwrap(), &fair)); + assert!(matches!( + db.load_dataset(&fair2full.dataset_id).await.unwrap_err(), + UnknownDatasetId + )); + + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + + let listing = db + .list_datasets(default_list_options.clone()) + .await + .unwrap(); + assert_eq!(listing.len(), 0); + assert!(dataset_deleted( + &db.load_dataset(&fair.dataset_id).await.unwrap(), + &fair + )); + } + + #[allow(clippy::too_many_lines)] + #[ge_context::test] + async fn it_updates_expired_dataset(app_ctx: ProPostgresContext) { + let mut test_data = TestDataUploads::default(); + let user_session = register_test_user(&app_ctx).await; + + let db = app_ctx.session_context(user_session.clone()).db(); + let default_list_options = DatasetListOptions { + filter: None, + order: OrderBy::NameAsc, + offset: 0, + limit: 10, + tags: None, + }; + + let fair2full = + upload_and_add_point_dataset(&app_ctx, &user_session, "fair2full", &mut test_data) + .await; + db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_fair(fair2full.dataset_id)) + .await + .unwrap(); + assert!(dataset_deleted( + &db.load_dataset(&fair2full.dataset_id).await.unwrap(), + &fair2full + )); + + let admin_session = admin_login(&app_ctx).await; + let admin_ctx = app_ctx.session_context(admin_session.clone()); + let deleted = admin_ctx.db().clear_expired_datasets().await.unwrap(); + assert_eq!(deleted, 1); + + db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_full(fair2full.dataset_id)) + .await + .unwrap(); + assert!(matches!( + db.load_dataset(&fair2full.dataset_id).await.unwrap_err(), + UnknownDatasetId + )); + + let deleted = admin_ctx.db().clear_expired_datasets().await.unwrap(); + assert_eq!(deleted, 1); + + assert!(db + .expire_uploaded_dataset(ChangeDatasetExpiration::delete_fair(fair2full.dataset_id)) + .await + .is_err()); + + let fair2available = + upload_and_add_point_dataset(&app_ctx, &user_session, "fair2available", &mut test_data) + .await; + + expire_in_tx_time_duration( + &app_ctx, + &user_session, + fair2available.dataset_id, + true, + Duration::seconds(3), + ) + .await; + db.expire_uploaded_dataset(ChangeDatasetExpiration::unset_expire( + fair2available.dataset_id, + )) + .await + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + let deleted = admin_ctx.db().clear_expired_datasets().await.unwrap(); + assert_eq!(deleted, 0); + + let listing = db + .list_datasets(default_list_options.clone()) + .await + .unwrap(); + assert_eq!(listing.len(), 1); + assert!(listing_not_deleted( + listing.first().unwrap(), + &fair2available + )); + + assert!(dir_exists(&fair2available)); + assert!(!dir_exists(&fair2full)); + } + + #[ge_context::test] + async fn it_handles_dataset_status(app_ctx: ProPostgresContext) { + let mut test_data = TestDataUploads::default(); + + let volume_dataset = add_test_volume_dataset(&app_ctx).await; + + let user_session = register_test_user(&app_ctx).await; + let db = app_ctx.session_context(user_session.clone()).db(); + + let access_status = db.get_dataset_access_status(&volume_dataset).await.unwrap(); + assert!(matches!( + access_status.dataset_type, + DatasetType::NonUserUpload + )); + 
assert_eq!(access_status.permissions, vec![Permission::Read]); + + let user_dataset = + upload_and_add_point_dataset(&app_ctx, &user_session, "user_dataset", &mut test_data) + .await + .dataset_id; + + let access_status = db.get_dataset_access_status(&user_dataset).await.unwrap(); + assert!(matches!( + access_status.dataset_type, + DatasetType::UserUpload(UploadedDatasetStatus::Available) + )); + has_read_and_owner_permissions(&access_status.permissions); + + let future_time = expire_in_tx_time_duration( + &app_ctx, + &user_session, + user_dataset, + true, + Duration::seconds(3), + ) + .await; + let access_status = db.get_dataset_access_status(&user_dataset).await.unwrap(); + assert!(matches!( + access_status.dataset_type, + DatasetType::UserUpload(UploadedDatasetStatus::Expires(ex)) if ex.deletion_timestamp.unwrap() == future_time + )); + has_read_and_owner_permissions(&access_status.permissions); + + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + let access_status = db.get_dataset_access_status(&user_dataset).await.unwrap(); + assert!(matches!( + access_status.dataset_type, + DatasetType::UserUpload(UploadedDatasetStatus::Deleted(ex)) if ex.deletion_timestamp.unwrap() == future_time + )); + has_read_and_owner_permissions(&access_status.permissions); + + db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_full(user_dataset)) + .await + .unwrap(); + + let access_status = db.get_dataset_access_status(&user_dataset).await; + assert!(matches!(access_status, Err(UnknownDatasetId))); + } + + #[ge_context::test] + async fn it_handles_expiration_errors(app_ctx: ProPostgresContext) { + let mut test_data = TestDataUploads::default(); + let user_session = register_test_user(&app_ctx).await; + + let current_time = get_db_timestamp(&app_ctx).await; + let future_time = current_time.add(Duration::hours(1)); + let past_time = current_time.sub(Duration::hours(1)); + + let db = app_ctx.session_context(user_session.clone()).db(); + + //Expire before current time + let test_dataset = + upload_and_add_point_dataset(&app_ctx, &user_session, "fair2full", &mut test_data) + .await; + let err = db + .expire_uploaded_dataset(ChangeDatasetExpiration::expire_fair( + test_dataset.dataset_id, + past_time, + )) + .await; + assert!(err.is_err()); + assert!(matches!(err.unwrap_err(), ExpirationTimestampInPast { .. })); + + //Unset expire for non-expiring dataset + let err = db + .expire_uploaded_dataset(ChangeDatasetExpiration::unset_expire( + test_dataset.dataset_id, + )) + .await; + assert!(err.is_err()); + assert!(matches!(err.unwrap_err(), IllegalDatasetStatus { .. })); + + //Expire already deleted + db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_fair( + test_dataset.dataset_id, + )) + .await + .unwrap(); + let err = db + .expire_uploaded_dataset(ChangeDatasetExpiration::expire_fair( + test_dataset.dataset_id, + future_time, + )) + .await; + assert!(err.is_err()); + assert!(matches!(err.unwrap_err(), IllegalExpirationUpdate { .. })); + + // Call meta data for deleted + let err: std::result::Result< + Box>, + geoengine_operators::error::Error, + > = db + .meta_data(&DataId::Internal { + dataset_id: test_dataset.dataset_id, + }) + .await; + assert!(err.is_err()); + assert!(matches!(err.unwrap_err(), DatasetDeleted { .. 
})); + + //Clear without admin permission + let err = db.clear_expired_datasets().await; + assert!(err.is_err()); + assert!(matches!(err.unwrap_err(), PermissionDenied)); } } diff --git a/services/src/pro/datasets/storage.rs b/services/src/pro/datasets/storage.rs new file mode 100644 index 000000000..bdeeaf224 --- /dev/null +++ b/services/src/pro/datasets/storage.rs @@ -0,0 +1,273 @@ +use async_trait::async_trait; +use bb8_postgres::bb8::{ManageConnection, PooledConnection}; +use postgres_types::{FromSql, ToSql}; +use serde::{Deserialize, Serialize}; +use tokio_postgres::Transaction; +use utoipa::ToSchema; + +use geoengine_datatypes::dataset::DatasetId; +use geoengine_datatypes::primitives::DateTime; + +use crate::datasets::storage::MetaDataDefinition; +use crate::datasets::upload::UploadId; +use crate::datasets::{AddDataset, DatasetIdAndName}; +use crate::error::Error::IllegalDatasetStatus; +use crate::error::Result; +use crate::pro::datasets::storage::DatasetDeletionType::{DeleteData, DeleteRecordAndData}; +use crate::pro::permissions::Permission; + +#[derive(Deserialize, Serialize, Debug, Clone, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct Expiration { + pub deletion_timestamp: Option, + pub deletion_type: DatasetDeletionType, +} + +#[derive(Deserialize, Serialize, Debug, Clone, ToSchema, FromSql, ToSql)] +pub enum DatasetDeletionType { + DeleteRecordAndData, + DeleteData, +} + +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct ChangeDatasetExpiration { + pub dataset_id: DatasetId, + pub expiration_change: ExpirationChange, +} + +impl ChangeDatasetExpiration { + pub fn delete_fair(dataset_id: DatasetId) -> Self { + ChangeDatasetExpiration { + dataset_id, + expiration_change: ExpirationChange::SetExpire(Expiration { + deletion_timestamp: None, + deletion_type: DeleteData, + }), + } + } + + pub fn delete_full(dataset_id: DatasetId) -> Self { + ChangeDatasetExpiration { + dataset_id, + expiration_change: ExpirationChange::SetExpire(Expiration { + deletion_timestamp: None, + deletion_type: DeleteRecordAndData, + }), + } + } + + pub fn expire_fair(dataset_id: DatasetId, timestamp: DateTime) -> Self { + ChangeDatasetExpiration { + dataset_id, + expiration_change: ExpirationChange::SetExpire(Expiration { + deletion_timestamp: Some(timestamp), + deletion_type: DeleteData, + }), + } + } + + pub fn expire_full(dataset_id: DatasetId, timestamp: DateTime) -> Self { + ChangeDatasetExpiration { + dataset_id, + expiration_change: ExpirationChange::SetExpire(Expiration { + deletion_timestamp: Some(timestamp), + deletion_type: DeleteRecordAndData, + }), + } + } + + pub fn unset_expire(dataset_id: DatasetId) -> Self { + ChangeDatasetExpiration { + dataset_id, + expiration_change: ExpirationChange::UnsetExpire, + } + } +} + +#[derive(Deserialize, Serialize, Debug, Clone, ToSchema)] +#[serde(rename_all = "camelCase", tag = "type")] +pub enum ExpirationChange { + SetExpire(Expiration), + UnsetExpire, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum UploadedDatasetStatus { + Available, + Expires(Expiration), + Deleted(Expiration), +} + +#[derive(Debug, FromSql, ToSql)] +pub enum InternalUploadedDatasetStatus { + Available, + Expires, + Expired, + UpdateExpired, + Deleted, + DeletedWithError, +} + +impl InternalUploadedDatasetStatus { + pub fn convert_to_uploaded_dataset_status( + &self, + dataset_id: &DatasetId, + expiration_timestamp: Option, + dataset_deletion_type: Option, + ) -> Result { + if matches!(self, InternalUploadedDatasetStatus::Available) + && 
expiration_timestamp.is_none()
+            && dataset_deletion_type.is_none()
+        {
+            return Ok(UploadedDatasetStatus::Available);
+        } else if let Some(deletion_type) = dataset_deletion_type {
+            let expiration = Expiration {
+                deletion_timestamp: expiration_timestamp,
+                deletion_type,
+            };
+            match self {
+                InternalUploadedDatasetStatus::Available => {}
+                InternalUploadedDatasetStatus::Expires => {
+                    if expiration_timestamp.is_some() {
+                        return Ok(UploadedDatasetStatus::Expires(expiration));
+                    }
+                }
+                InternalUploadedDatasetStatus::Expired
+                | InternalUploadedDatasetStatus::UpdateExpired
+                | InternalUploadedDatasetStatus::Deleted
+                | InternalUploadedDatasetStatus::DeletedWithError => {
+                    return Ok(UploadedDatasetStatus::Deleted(expiration));
+                }
+            }
+        }
+        Err(IllegalDatasetStatus {
+            dataset: (*dataset_id).into(),
+            status: "Status, expiration timestamp, and deletion type are not a legal combination".to_string(),
+        })
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub enum DatasetType {
+    UserUpload(UploadedDatasetStatus),
+    NonUserUpload,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct DatasetAccessStatus {
+    pub id: DatasetId,
+    pub dataset_type: DatasetType,
+    pub permissions: Vec<Permission>,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct DatasetAccessStatusResponse {
+    pub id: DatasetId,
+    pub is_user_upload: bool,
+    pub is_available: bool,
+    pub expiration: Option<Expiration>,
+    pub permissions: Vec<Permission>,
+}
+
+impl From<DatasetAccessStatus> for DatasetAccessStatusResponse {
+    fn from(value: DatasetAccessStatus) -> Self {
+        let (is_user_upload, is_available, expiration) = match value.dataset_type {
+            DatasetType::UserUpload(upload) => match upload {
+                UploadedDatasetStatus::Available => (true, true, None),
+                UploadedDatasetStatus::Expires(expiration) => (true, true, Some(expiration)),
+                UploadedDatasetStatus::Deleted(expiration) => (true, false, Some(expiration)),
+            },
+            DatasetType::NonUserUpload => (false, true, None),
+        };
+
+        DatasetAccessStatusResponse {
+            id: value.id,
+            is_user_upload,
+            is_available,
+            expiration,
+            permissions: value.permissions,
+        }
+    }
+}
+
+/// Internal functionality for transactional control of the user-uploaded datasets database.
+///
+/// In contrast to the `UploadedUserDatasetStore`, this trait is not meant to be used by services,
+/// but only by the `ProPostgresDb` internally, because services do not know about database transactions.
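+///
+/// An uploaded dataset starts out as `Available` in the `uploaded_user_datasets` table and may be given an
+/// expiration. Once the expiration timestamp has passed, the dataset is treated as expired; the admin-only
+/// `clear_expired_datasets` then removes the upload directory and marks the record as `Deleted`
+/// (or `DeletedWithError` if removing the uploaded files fails).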
+#[async_trait] +pub trait TxUploadedUserDatasetStore { + async fn is_user_upload_in_tx(&self, dataset_id: &DatasetId, tx: &Transaction) -> Result; + + async fn get_dataset_access_status_in_tx( + &self, + dataset_id: &DatasetId, + tx: &Transaction, + ) -> Result; + + async fn validate_expiration_request_in_tx( + &self, + dataset_id: &DatasetId, + expiration: &Expiration, + tx: &Transaction, + ) -> Result<()>; + + async fn uploaded_dataset_status_in_tx( + &self, + dataset_id: &DatasetId, + tx: &Transaction, + ) -> Result; + + /// Updates the status of datasets, because some datasets might have reached expiration + //TODO: Add some sort of periodic update for the status of datasets + async fn lazy_dataset_store_updates( + &self, + conn: &mut PooledConnection, + dataset_id: Option<&DatasetId>, + ) -> Result<()>; + + async fn expire_uploaded_dataset_in_tx( + &self, + expire_dataset: ChangeDatasetExpiration, + tx: &Transaction, + ) -> Result<()>; + + async fn update_uploaded_datasets_status_in_tx( + &self, + dataset_id: Option<&DatasetId>, + tx: &Transaction, + ) -> Result<()>; + + async fn set_expire_for_uploaded_dataset( + &self, + dataset_id: &DatasetId, + expiration: &Expiration, + tx: &Transaction, + ) -> Result<()>; + + async fn unset_expire_for_uploaded_dataset( + &self, + dataset_id: &DatasetId, + tx: &Transaction, + ) -> Result<()>; +} + +/// Storage of user-uploaded datasets +#[async_trait] +pub trait UploadedUserDatasetStore { + async fn add_uploaded_dataset( + &self, + upload_id: UploadId, + dataset: AddDataset, + meta_data: MetaDataDefinition, + ) -> Result; + + async fn expire_uploaded_dataset(&self, expire_dataset: ChangeDatasetExpiration) -> Result<()>; + + async fn get_dataset_access_status( + &self, + dataset_id: &DatasetId, + ) -> Result; + + async fn clear_expired_datasets(&self) -> Result; +} diff --git a/services/src/pro/permissions/postgres_permissiondb.rs b/services/src/pro/permissions/postgres_permissiondb.rs index 61b9e8107..fa79c09f6 100644 --- a/services/src/pro/permissions/postgres_permissiondb.rs +++ b/services/src/pro/permissions/postgres_permissiondb.rs @@ -10,9 +10,10 @@ use crate::pro::permissions::{ }; use async_trait::async_trait; use snafu::{ensure, ResultExt}; +use std::collections::HashSet; use tokio_postgres::{ tls::{MakeTlsConnect, TlsConnect}, - Socket, + Socket, Transaction, }; use uuid::Uuid; @@ -123,6 +124,13 @@ pub trait TxPermissionDb { limit: u32, tx: &tokio_postgres::Transaction<'_>, ) -> Result, PermissionDbError>; + + /// Get all permissions the current user has for this `resource`. 
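+    ///
+    /// Implied permissions are included (e.g. `Owner` implies `Read`) and duplicates are removed.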
+    async fn get_user_permissions_in_tx<R: Into<ResourceId> + Send + Sync>(
+        &self,
+        resource: R,
+        tx: &tokio_postgres::Transaction<'_>,
+    ) -> Result<Vec<Permission>, PermissionDbError>;
 }
 
 #[async_trait]
@@ -370,6 +378,44 @@ where
 
         Ok(permissions)
     }
+
+    async fn get_user_permissions_in_tx<R: Into<ResourceId> + Send + Sync>(
+        &self,
+        resource: R,
+        tx: &Transaction<'_>,
+    ) -> Result<Vec<Permission>, PermissionDbError> {
+        let resource: ResourceId = resource.into();
+
+        let stmt = tx
+            .prepare(&format!(
+                "
+            SELECT
+                p.permission
+            FROM
+                permissions p JOIN user_roles r ON (p.role_id = r.role_id)
+            WHERE
+                {resource_type} = $1 AND r.user_id = $2
+            ;",
+                resource_type = resource.resource_type_name()
+            ))
+            .await
+            .context(PostgresPermissionDbError)?;
+
+        let rows = tx
+            .query(&stmt, &[&resource.uuid()?, &self.session.user.id])
+            .await
+            .context(PostgresPermissionDbError)?;
+
+        let mut all_permissions = HashSet::new();
+        for row in rows {
+            let permission: Permission = row.get(0);
+            for implied_permission in permission.implied_permissions() {
+                all_permissions.insert(implied_permission);
+            }
+        }
+
+        Ok(Vec::from_iter(all_permissions))
+    }
 }
 
 #[async_trait]
diff --git a/services/src/pro/util/tests.rs b/services/src/pro/util/tests.rs
index 2f4d8dab0..107244d9c 100644
--- a/services/src/pro/util/tests.rs
+++ b/services/src/pro/util/tests.rs
@@ -42,6 +42,8 @@ use geoengine_operators::{
 };
 use tokio::runtime::Handle;
 use tokio_postgres::NoTls;
+#[cfg(test)]
+use tokio_postgres::Transaction;
 
 use super::config::{Cache, Quota};
 
@@ -449,6 +451,21 @@ where
     }
 }
 
+#[cfg(test)]
+#[allow(clippy::missing_panics_doc)]
+pub async fn get_db_timestamp(app_ctx: &ProPostgresContext<NoTls>) -> DateTime {
+    let conn = app_ctx.pool.get().await.unwrap();
+    let get_time_stmt = conn.prepare("SELECT CURRENT_TIMESTAMP;").await.unwrap();
+    conn.query_one(&get_time_stmt, &[]).await.unwrap().get(0)
+}
+
+#[cfg(test)]
+#[allow(clippy::missing_panics_doc)]
+pub async fn get_db_timestamp_in_tx(tx: &Transaction<'_>) -> DateTime {
+    let get_time_stmt = tx.prepare("SELECT CURRENT_TIMESTAMP;").await.unwrap();
+    tx.query_one(&get_time_stmt, &[]).await.unwrap().get(0)
+}
+
 #[cfg(test)]
 pub(in crate::pro) mod mock_oidc {
     use crate::pro::users::{DefaultJsonWebKeySet, DefaultProviderMetadata};