FAIR dataset deletion #970

Open · wants to merge 20 commits into main
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

7 changes: 6 additions & 1 deletion operators/src/error.rs
@@ -1,6 +1,6 @@
use crate::util::statistics::StatisticsError;
use bb8_postgres::bb8;
use geoengine_datatypes::dataset::{DataId, NamedData};
use geoengine_datatypes::dataset::{DataId, DatasetId, NamedData};
use geoengine_datatypes::error::ErrorSource;
use geoengine_datatypes::primitives::{FeatureDataType, TimeInterval};
use geoengine_datatypes::raster::RasterDataType;
@@ -472,6 +472,11 @@ pub enum Error {
Bb8Postgres {
source: bb8::RunError<tokio_postgres::Error>,
},

#[snafu(display("Dataset {} cannot be accessed, because it was deleted", id))]
DatasetDeleted {
id: DatasetId,
},
}

impl From<crate::adapters::SparseTilesFillAdapterError> for Error {
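For context, a minimal sketch of how the new variant is raised. The guard function and the `is_deleted` flag below are illustrative assumptions, not part of this diff; only the `Error::DatasetDeleted { id }` constructor comes from the hunk above:

```rust
use geoengine_datatypes::dataset::DatasetId;

// Sketch only: a guard that refuses access to datasets flagged as deleted.
// `Error` is the operators error enum extended in this hunk.
fn ensure_not_deleted(id: DatasetId, is_deleted: bool) -> Result<(), Error> {
    if is_deleted {
        // snafu also generates a context selector (`DatasetDeletedSnafu`),
        // but constructing the variant directly works as well.
        return Err(Error::DatasetDeleted { id });
    }
    Ok(())
}
```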
1 change: 1 addition & 0 deletions services/Cargo.toml
@@ -118,6 +118,7 @@ walkdir = "2.4"
zip = "2.1"
assert-json-diff = "2.0.2"
sha2 = "0.10.8"
strum_macros = "0.26.1"

[target.'cfg(target_os = "linux")'.dependencies]
nix = { version = "0.29", features = ["socket"] }
4 changes: 3 additions & 1 deletion services/src/api/handlers/datasets.rs
@@ -1,3 +1,4 @@
use crate::datasets::storage::{check_reserved_tags, ReservedTags};
use crate::{
api::model::{
operators::{GdalLoadingInfoTemporalSlice, GdalMetaDataList},
@@ -635,7 +636,8 @@ pub async fn create_upload_dataset<C: ApplicationContext>(
let db = app_ctx.session_context(session).db();
let upload = db.load_upload(upload_id).await.context(UploadNotFound)?;

add_tag(&mut definition.properties, "upload".to_owned());
check_reserved_tags(&definition.properties.tags);
add_tag(&mut definition.properties, ReservedTags::Upload.to_string());

adjust_meta_data_path(&mut definition.meta_data, &upload)
.context(CannotResolveUploadFilePath)?;
47 changes: 5 additions & 42 deletions services/src/api/handlers/upload.rs
@@ -1,19 +1,16 @@
use crate::api::model::responses::IdResponse;
use crate::contexts::{ApplicationContext, SessionContext};
use crate::datasets::upload::{FileId, FileUpload, Upload, UploadDb, UploadId, UploadRootPath};
use crate::datasets::upload::{create_upload, Upload, UploadDb, UploadId, UploadRootPath};
use crate::error::Error;
use crate::error::Result;
use crate::error::{self, Error};
use crate::util::path_with_base_path;
use actix_multipart::Multipart;
use actix_web::{web, FromRequest, Responder};
use futures::StreamExt;
use gdal::vector::LayerAccess;
use geoengine_datatypes::util::Identifier;
use geoengine_operators::util::gdal::gdal_open_dataset;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use std::path::Path;
use tokio::{fs, io::AsyncWriteExt};
use tokio::fs;
use utoipa::{ToResponse, ToSchema};

pub(crate) fn init_upload_routes<C>(cfg: &mut web::ServiceConfig)
@@ -70,43 +67,9 @@ impl<'a> ToSchema<'a> for FileUploadRequest {
async fn upload_handler<C: ApplicationContext>(
session: C::Session,
app_ctx: web::Data<C>,
mut body: Multipart,
body: Multipart,
) -> Result<web::Json<IdResponse<UploadId>>> {
let upload_id = UploadId::new();

let root = upload_id.root_path()?;

fs::create_dir_all(&root).await.context(error::Io)?;

let mut files: Vec<FileUpload> = vec![];
while let Some(item) = body.next().await {
let mut field = item?;
let file_name = field
.content_disposition()
.ok_or(error::Error::UploadFieldMissingFileName)?
.get_filename()
.ok_or(error::Error::UploadFieldMissingFileName)?
.to_owned();

let file_id = FileId::new();
let mut file = fs::File::create(root.join(&file_name))
.await
.context(error::Io)?;

let mut byte_size = 0_u64;
while let Some(chunk) = field.next().await {
let bytes = chunk?;
file.write_all(&bytes).await.context(error::Io)?;
byte_size += bytes.len() as u64;
}
file.flush().await.context(error::Io)?;

files.push(FileUpload {
id: file_id,
name: file_name,
byte_size,
});
}
let (upload_id, files) = create_upload(body).await?;

app_ctx
.session_context(session)
24 changes: 24 additions & 0 deletions services/src/contexts/migrations/migration_0012_fair_upload_deletion.rs
@@ -0,0 +1,24 @@
use async_trait::async_trait;
use tokio_postgres::Transaction;

use crate::error::Result;

use super::database_migration::{DatabaseVersion, Migration};

/// This migration adds new delete options for uploaded user datasets
pub struct Migration0012FairUploadDeletion;

#[async_trait]
impl Migration for Migration0012FairUploadDeletion {
fn prev_version(&self) -> Option<DatabaseVersion> {
Some("0011_remove_xgb".into())
}

fn version(&self) -> DatabaseVersion {
"0012_fair_upload_deletion".into()
}

async fn migrate(&self, _tx: &Transaction<'_>) -> Result<()> {
Ok(())
}
}
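The `migrate` body shown here is empty: the version bump alone records that the FAIR-deletion schema step has run. What matters is the version chain, which a test could verify along these lines (a sketch, not part of the PR; it assumes `DatabaseVersion` supports equality comparison):

```rust
use crate::contexts::migrations::all_migrations;

#[test]
fn migration_chain_is_contiguous() {
    let migrations = all_migrations();
    // Every migration must name its predecessor's version as `prev_version`,
    // so 0012_fair_upload_deletion must point back at 0011_remove_xgb.
    for pair in migrations.windows(2) {
        assert_eq!(pair[1].prev_version(), Some(pair[0].version()));
    }
}
```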
3 changes: 3 additions & 0 deletions services/src/contexts/migrations/mod.rs
@@ -11,6 +11,7 @@ pub use crate::contexts::migrations::{
migration_0009_oidc_tokens::Migration0009OidcTokens,
migration_0010_s2_stack_time_buffers::Migration0010S2StacTimeBuffers,
migration_0011_remove_xgb::Migration0011RemoveXgb,
migration_0012_fair_upload_deletion::Migration0012FairUploadDeletion,
};
pub use database_migration::{
initialize_database, migrate_database, DatabaseVersion, Migration, MigrationResult,
@@ -30,6 +31,7 @@ pub mod migration_0008_band_names;
pub mod migration_0009_oidc_tokens;
pub mod migration_0010_s2_stack_time_buffers;
pub mod migration_0011_remove_xgb;
pub mod migration_0012_fair_upload_deletion;

#[cfg(test)]
mod schema_info;
@@ -55,6 +57,7 @@ pub fn all_migrations() -> Vec<Box<dyn Migration>> {
Box::new(Migration0009OidcTokens),
Box::new(Migration0010S2StacTimeBuffers),
Box::new(Migration0011RemoveXgb),
Box::new(Migration0012FairUploadDeletion),
]
}

2 changes: 1 addition & 1 deletion services/src/contexts/mod.rs
@@ -30,7 +30,7 @@ pub use migrations::{
Migration0004DatasetListingProviderPrio, Migration0005GbifColumnSelection,
Migration0006EbvProvider, Migration0007OwnerRole, Migration0008BandNames,
Migration0009OidcTokens, Migration0010S2StacTimeBuffers, Migration0011RemoveXgb,
MigrationResult,
Migration0012FairUploadDeletion, MigrationResult,
};
pub use postgres::{PostgresContext, PostgresDb, PostgresSessionContext};
pub use session::{MockableSession, Session, SessionId, SimpleSession};
24 changes: 24 additions & 0 deletions services/src/datasets/storage.rs
@@ -19,6 +19,9 @@ use geoengine_operators::{mock::MockDatasetDataSourceLoadingInfo, source::GdalMe
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use std::fmt::Debug;
use std::str::FromStr;
use strum_macros::{Display, EnumString};
use utoipa::ToSchema;
use uuid::Uuid;
use validator::{Validate, ValidationError};
@@ -95,6 +98,13 @@ pub struct AutoCreateDataset {
pub tags: Option<Vec<String>>,
}

#[derive(Display, EnumString)]
pub enum ReservedTags {
#[strum(serialize = "upload")]
Upload,
Deleted,
}

fn validate_main_file(main_file: &str) -> Result<(), ValidationError> {
if main_file.is_empty() || main_file.contains('/') || main_file.contains("..") {
return Err(ValidationError::new("Invalid upload file name"));
@@ -114,6 +124,20 @@ pub fn validate_tags(tags: &Vec<String>) -> Result<(), ValidationError> {
Ok(())
}

pub fn check_reserved_tags(tags: &Option<Vec<String>>) {
if let Some(tags) = tags {
for tag in tags {
let conversion = ReservedTags::from_str(tag.as_str());
if let Ok(reserved) = conversion {
log::warn!(
"Adding a new dataset with a reserved tag: {}",
reserved.to_string()
);
}
}
}
}

#[derive(Deserialize, Serialize, Debug, Clone, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct SuggestMetaData {
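The `strum` derives define the string forms that `check_reserved_tags` relies on: `Display` uses the `serialize` attribute where given (falling back to the variant name), and `EnumString` parses exactly those strings back. A quick illustrative test (not part of the diff; "landcover" stands in for an arbitrary user tag):

```rust
use std::str::FromStr;

use crate::datasets::storage::ReservedTags;

#[test]
fn reserved_tags_round_trip() {
    // `#[strum(serialize = "upload")]` lowercases this variant's string form...
    assert_eq!(ReservedTags::Upload.to_string(), "upload");
    // ...while `Deleted` falls back to the variant name, case included.
    assert_eq!(ReservedTags::Deleted.to_string(), "Deleted");

    // `check_reserved_tags` only warns when `from_str` succeeds,
    // so ordinary user tags pass through silently.
    assert!(ReservedTags::from_str("upload").is_ok());
    assert!(ReservedTags::from_str("landcover").is_err());
}
```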
53 changes: 53 additions & 0 deletions services/src/datasets/upload.rs
@@ -1,3 +1,4 @@
use actix_multipart::Multipart;
use std::fmt::{Display, Formatter};
use std::path::{Path, PathBuf};

@@ -9,7 +10,12 @@ use crate::{
util::config::{self, get_config_element},
};
use async_trait::async_trait;
use futures_util::StreamExt;
use geoengine_datatypes::util::Identifier;
use serde::{Deserialize, Deserializer, Serialize};
use snafu::ResultExt;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use utoipa::ToSchema;

identifier!(UploadId);
@@ -118,6 +124,53 @@ pub struct UploadListing {
pub num_files: usize,
}

pub async fn create_upload(mut body: Multipart) -> Result<(UploadId, Vec<FileUpload>)> {
let upload_id = UploadId::new();

let root = upload_id.root_path()?;

fs::create_dir_all(&root).await.context(error::Io)?;

let mut files: Vec<FileUpload> = vec![];
while let Some(item) = body.next().await {
let mut field = item?;
let file_name = field
.content_disposition()
.ok_or(error::Error::UploadFieldMissingFileName)?
.get_filename()
.ok_or(error::Error::UploadFieldMissingFileName)?
.to_owned();

let file_id = FileId::new();
let mut file = fs::File::create(root.join(&file_name))
.await
.context(error::Io)?;

let mut byte_size = 0_u64;
while let Some(chunk) = field.next().await {
let bytes = chunk?;
file.write_all(&bytes).await.context(error::Io)?;
byte_size += bytes.len() as u64;
}
file.flush().await.context(error::Io)?;

files.push(FileUpload {
id: file_id,
name: file_name,
byte_size,
});
}

Ok((upload_id, files))
}

pub async fn delete_upload(upload_id: UploadId) -> Result<()> {
let root = upload_id.root_path()?;
log::debug!("Deleting {upload_id}");
fs::remove_dir_all(&root).await.context(error::Io)?;
Ok(())
}

#[async_trait]
pub trait UploadDb {
async fn load_upload(&self, upload: UploadId) -> Result<Upload>;
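Extracting the multipart loop into `create_upload` gives `delete_upload` a symmetric inverse: all files of one upload live under the id's root path, so removing that directory reclaims everything. A hedged usage sketch (the caller below is illustrative, not part of the diff):

```rust
use actix_multipart::Multipart;

use crate::datasets::upload::{create_upload, delete_upload};
use crate::error::Result;

// Sketch only: roll back an upload that arrived without any files,
// so no orphaned directory remains under the upload root.
async fn upload_or_rollback(body: Multipart) -> Result<()> {
    let (upload_id, files) = create_upload(body).await?;
    if files.is_empty() {
        delete_upload(upload_id).await?; // removes <upload_root>/<upload_id>
    }
    Ok(())
}
```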
15 changes: 15 additions & 0 deletions services/src/error.rs
@@ -488,6 +488,21 @@ pub enum Error {
UnknownVolumeName {
volume_name: String,
},

#[snafu(display("Trying to set an expiration timestamp for Dataset {dataset} in the past"))]
ExpirationTimestampInPast {
dataset: DatasetId,
},
#[snafu(display("Illegal expiration update for Dataset {dataset}: {reason}"))]
IllegalExpirationUpdate {
dataset: DatasetId,
reason: String,
},
#[snafu(display("Illegal status for Dataset {dataset}: {status}"))]
IllegalDatasetStatus {
dataset: DatasetId,
status: String,
},
}

impl actix_web::error::ResponseError for Error {
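These three variants back the dataset-expiration workflow this PR introduces. A sketch of how the first might be raised during validation (the function, the `expires_at` parameter, and the use of `DateTime::now()` are assumptions for illustration, not taken from the diff):

```rust
use geoengine_datatypes::dataset::DatasetId;
use geoengine_datatypes::primitives::DateTime;

use crate::error::{Error, Result};

// Sketch only: refuse to schedule a dataset deletion in the past.
fn validate_expiration_timestamp(dataset: DatasetId, expires_at: DateTime) -> Result<()> {
    if expires_at < DateTime::now() {
        return Err(Error::ExpirationTimestampInPast { dataset });
    }
    Ok(())
}
```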
12 changes: 11 additions & 1 deletion services/src/pro/api/apidoc.rs
@@ -53,6 +53,9 @@ use crate::layers::listing::{
};
use crate::pro;
use crate::pro::api::handlers::users::{Quota, UpdateQuota};
use crate::pro::datasets::{
DatasetAccessStatusResponse, DatasetDeletionType, Expiration, ExpirationChange,
};
use crate::pro::permissions::{
Permission, PermissionListing, ResourceId, Role, RoleDescription, RoleId,
};
@@ -157,7 +160,10 @@ use utoipa::{Modify, OpenApi};
handlers::upload::upload_handler,
pro::api::handlers::permissions::add_permission_handler,
pro::api::handlers::permissions::remove_permission_handler,
pro::api::handlers::permissions::get_resource_permissions_handler
pro::api::handlers::permissions::get_resource_permissions_handler,
pro::api::handlers::datasets::set_dataset_expiration,
pro::api::handlers::datasets::get_dataset_status,
pro::api::handlers::datasets::gc_expired_datasets
),
components(
responses(
@@ -371,6 +377,10 @@ use utoipa::{Modify, OpenApi};
Volume,
VolumeName,
DataPath,
Expiration,
ExpirationChange,
DatasetDeletionType,
DatasetAccessStatusResponse,

PlotOutputFormat,
WrappedPlotOutput,