Skip to content

Commit

Permalink
feat: when the task is downloading, it is not allowed to delete the task (#828)
Browse files Browse the repository at this point in the history

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Nov 8, 2024
1 parent 6e0a859 commit ee21989
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 78 deletions.
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.1.115"
version = "0.1.116"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand All @@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.115" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.115" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.115" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.115" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.115" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.115" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.115" }
dragonfly-client = { path = "dragonfly-client", version = "0.1.116" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.116" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.116" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.116" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.116" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.116" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.116" }
thiserror = "1.0"
dragonfly-api = "=2.0.169"
reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }
Expand Down
81 changes: 20 additions & 61 deletions dragonfly-client-storage/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl Task {
self.finished_at.is_none()
}

/// is_uploading reports whether the task currently has at least one
/// active (in-flight) upload.
pub fn is_uploading(&self) -> bool {
    // Any positive in-flight counter means an upload is in progress.
    let active_uploads = self.uploading_count;
    active_uploads > 0
}
Expand Down Expand Up @@ -184,7 +184,7 @@ impl PersistentCacheTask {
self.finished_at.is_none()
}

/// is_uploading reports whether the persistent cache task currently has
/// at least one active (in-flight) upload.
pub fn is_uploading(&self) -> bool {
    // Any positive in-flight counter means an upload is in progress.
    let active_uploads = self.uploading_count;
    active_uploads > 0
}
Expand Down Expand Up @@ -947,60 +947,43 @@ mod tests {
assert!(task.response_header.is_empty());
assert_eq!(task.uploading_count, 0);
assert_eq!(task.uploaded_count, 0);
assert!(!task.is_finished());

// Test download_task_finished.
metadata.download_task_finished(task_id).unwrap();
let task = metadata.get_task(task_id).unwrap().unwrap();
assert!(
task.is_finished(),
"task should be finished after download_task_finished"
);
assert!(task.is_finished());

// Test upload_task_started.
metadata.upload_task_started(task_id).unwrap();
let task = metadata.get_task(task_id).unwrap().unwrap();
assert_eq!(
task.uploading_count, 1,
"uploading_count should be increased by 1 after upload_task_started"
);
assert_eq!(task.uploading_count, 1,);

// Test upload_task_finished.
metadata.upload_task_finished(task_id).unwrap();
let task = metadata.get_task(task_id).unwrap().unwrap();
assert_eq!(
task.uploading_count, 0,
"uploading_count should be decreased by 1 after upload_task_finished"
);
assert_eq!(
task.uploaded_count, 1,
"uploaded_count should be increased by 1 after upload_task_finished"
);
assert_eq!(task.uploading_count, 0,);
assert_eq!(task.uploaded_count, 1,);

// Test upload_task_failed.
let task = metadata.upload_task_started(task_id).unwrap();
assert_eq!(task.uploading_count, 1);
let task = metadata.upload_task_failed(task_id).unwrap();
assert_eq!(
task.uploading_count, 0,
"uploading_count should be decreased by 1 after upload_task_failed"
);
assert_eq!(
task.uploaded_count, 1,
"uploaded_count should not be changed after upload_task_failed"
);
assert_eq!(task.uploading_count, 0,);
assert_eq!(task.uploaded_count, 1,);

// Test get_tasks.
let task_id = "a535b115f18d96870f0422ac891f91dd162f2f391e4778fb84279701fcd02dd1";
metadata
.download_task_started(task_id, Some(1024), None, None)
.unwrap();
let tasks = metadata.get_tasks().unwrap();
assert_eq!(tasks.len(), 2, "should get 2 tasks in total");
assert_eq!(tasks.len(), 2);

// Test delete_task.
metadata.delete_task(task_id).unwrap();
let task = metadata.get_task(task_id).unwrap();
assert!(task.is_none(), "task should be deleted after delete_task");
assert!(task.is_none());
}

#[test]
Expand All @@ -1013,24 +996,15 @@ mod tests {
// Test download_piece_started.
metadata.download_piece_started(task_id, 1).unwrap();
let piece = metadata.get_piece(task_id, 1).unwrap().unwrap();
assert_eq!(
piece.number, 1,
"should get newly created piece with number specified"
);
assert_eq!(piece.number, 1,);

// Test download_piece_finished.
metadata
.download_piece_finished(task_id, 1, 0, 1024, "digest1", None)
.unwrap();
let piece = metadata.get_piece(task_id, 1).unwrap().unwrap();
assert_eq!(
piece.length, 1024,
"piece should be updated after download_piece_finished"
);
assert_eq!(
piece.digest, "digest1",
"piece should be updated after download_piece_finished"
);
assert_eq!(piece.length, 1024,);
assert_eq!(piece.digest, "digest1",);

// Test get_pieces.
metadata.download_piece_started(task_id, 2).unwrap();
Expand All @@ -1043,43 +1017,28 @@ mod tests {
metadata.download_piece_started(task_id, 3).unwrap();
metadata.download_piece_failed(task_id, 2).unwrap();
let piece = metadata.get_piece(task_id, 2).unwrap();
assert!(
piece.is_none(),
"piece should be deleted after download_piece_failed"
);
assert!(piece.is_none());

// Test upload_piece_started.
metadata.upload_piece_started(task_id, 3).unwrap();
let piece = metadata.get_piece(task_id, 3).unwrap().unwrap();
assert_eq!(
piece.uploading_count, 1,
"piece should be updated after upload_piece_started"
);
assert_eq!(piece.uploading_count, 1,);

// Test upload_piece_finished.
metadata.upload_piece_finished(task_id, 3).unwrap();
let piece = metadata.get_piece(task_id, 3).unwrap().unwrap();
assert_eq!(
piece.uploading_count, 0,
"piece should be updated after upload_piece_finished"
);
assert_eq!(
piece.uploaded_count, 1,
"piece should be updated after upload_piece_finished"
);
assert_eq!(piece.uploading_count, 0,);
assert_eq!(piece.uploaded_count, 1,);

// Test upload_piece_failed.
metadata.upload_piece_started(task_id, 3).unwrap();
metadata.upload_piece_failed(task_id, 3).unwrap();
let piece = metadata.get_piece(task_id, 3).unwrap().unwrap();
assert_eq!(
piece.uploading_count, 0,
"piece should be updated after upload_piece_failed"
);
assert_eq!(piece.uploading_count, 0,);

// Test delete_pieces.
metadata.delete_pieces(task_id).unwrap();
let pieces = metadata.get_pieces(task_id).unwrap();
assert!(pieces.is_empty(), "should get 0 pieces after delete_pieces");
assert!(pieces.is_empty());
}
}
32 changes: 31 additions & 1 deletion dragonfly-client/src/gc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use crate::grpc::scheduler::SchedulerClient;
use crate::shutdown;
use chrono::Utc;
use dragonfly_api::scheduler::v2::{DeletePersistentCacheTaskRequest, DeleteTaskRequest};
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::Result;
Expand All @@ -24,9 +25,14 @@ use dragonfly_client_storage::{
metadata, Storage,
};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{error, info, instrument};

/// DOWNLOAD_TASK_TIMEOUT is the timeout for downloading a task, default 2 hours.
/// If a task's download exceeds this duration without finishing, the task becomes
/// eligible for garbage collection by the disk-usage eviction pass.
pub const DOWNLOAD_TASK_TIMEOUT: Duration = Duration::from_secs(2 * 60 * 60);

/// GC is the garbage collector of dfdaemon.
pub struct GC {
/// config is the configuration of the dfdaemon.
Expand Down Expand Up @@ -185,6 +191,17 @@ impl GC {
}
};

// If the task is started and not finished, and the task download is not timeout,
// skip it.
if task.is_started()
&& !task.is_finished()
&& !task.is_failed()
&& (task.created_at + DOWNLOAD_TASK_TIMEOUT > Utc::now().naive_utc())
{
info!("task {} is started and not finished, skip it", task.id);
continue;
}

// Evict the task.
self.storage.delete_task(&task.id).await;

Expand Down Expand Up @@ -290,12 +307,25 @@ impl GC {
continue;
}

let task_space = task.content_length();
// If the task is started and not finished, and the task download is not timeout,
// skip it.
if task.is_started()
&& !task.is_finished()
&& !task.is_failed()
&& (task.created_at + DOWNLOAD_TASK_TIMEOUT > Utc::now().naive_utc())
{
info!(
"persistent cache task {} is started and not finished, skip it",
task.id
);
continue;
}

// Evict the task.
self.storage.delete_task(&task.id).await;

// Update the evicted space.
let task_space = task.content_length();
evicted_space += task_space;
info!(
"evict persistent cache task {} size {}",
Expand Down

0 comments on commit ee21989

Please sign in to comment.