Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse Rebuild IO handles #1755

Merged
merged 3 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[submodule "spdk-rs"]
path = spdk-rs
url = https://github.com/openebs/spdk-rs
url = ../spdk-rs.git
branch = develop
[submodule "utils/dependencies"]
path = utils/dependencies
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions io-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ libc = "0.2.149"
log = "0.4.20"
md5 = "0.7.0"
merge = "0.1.0"
nix = { version = "0.27.1", default-features = false, features = [ "hostname", "net", "socket", "ioctl" ] }
nix = { version = "0.27.1", default-features = false, features = ["hostname", "net", "socket", "ioctl"] }
once_cell = "1.18.0"
parking_lot = "0.12.1"
pin-utils = "0.1.0"
Expand Down Expand Up @@ -102,9 +102,10 @@ async-process = { version = "1.8.1" }
rstack = { version = "0.3.3" }
tokio-stream = "0.1.14"
rustls = "0.21.12"
either = "1.9.0"

devinfo = { path = "../utils/dependencies/devinfo" }
jsonrpc = { path = "../jsonrpc"}
jsonrpc = { path = "../jsonrpc" }
io-engine-api = { path = "../utils/dependencies/apis/io-engine" }
spdk-rs = { path = "../spdk-rs" }
sysfs = { path = "../sysfs" }
Expand Down
9 changes: 5 additions & 4 deletions io-engine/src/bdev/nexus/nexus_bdev_children.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1152,10 +1152,11 @@ impl<'n> Nexus<'n> {
// Cancel rebuild job for this child, if any.
if let Some(job) = child.rebuild_job() {
debug!("{self:?}: retire: stopping rebuild job...");
let terminated = job.force_fail();
Reactors::master().send_future(async move {
terminated.await.ok();
});
if let either::Either::Left(terminated) = job.force_fail() {
Reactors::master().send_future(async move {
terminated.await.ok();
});
}
}

debug!("{child:?}: retire: enqueuing device '{dev}' to retire");
Expand Down
22 changes: 14 additions & 8 deletions io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,15 +247,18 @@ impl<'n> Nexus<'n> {
async fn terminate_rebuild(&self, child_uri: &str) {
// If a rebuild job is not found that's ok
// as we were just going to remove it anyway.
if let Ok(rj) = self.rebuild_job_mut(child_uri) {
let ch = rj.force_stop();
if let Err(e) = ch.await {
error!(
"Failed to wait on rebuild job for child {child_uri} \
let Ok(rj) = self.rebuild_job_mut(child_uri) else {
return;
};
let either::Either::Left(ch) = rj.force_stop() else {
return;
};
if let Err(e) = ch.await {
error!(
"Failed to wait on rebuild job for child {child_uri} \
to terminate with error {}",
e.verbose()
);
}
e.verbose()
);
}
}

Expand Down Expand Up @@ -355,6 +358,9 @@ impl<'n> Nexus<'n> {

// wait for the jobs to complete terminating
for job in terminated_jobs {
let either::Either::Left(job) = job else {
continue;
};
if let Err(e) = job.await {
error!(
"{:?}: error when waiting for the rebuild job \
Expand Down
5 changes: 4 additions & 1 deletion io-engine/src/grpc/v1/snapshot_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ impl SnapshotRebuildRpc for SnapshotRebuildService {
let Ok(job) = SnapshotRebuildJob::lookup(&args.uuid) else {
return Err(tonic::Status::not_found(""));
};
let rx = job.force_stop().await.ok();
let rx = match job.force_stop() {
either::Either::Left(chan) => chan.await,
either::Either::Right(stopped) => Ok(stopped),
};
info!("Snapshot Rebuild stopped: {rx:?}");
job.destroy();
Ok(())
Expand Down
5 changes: 4 additions & 1 deletion io-engine/src/rebuild/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ impl WithinRange<u64> for std::ops::Range<u64> {
/// Shutdown all pending snapshot rebuilds.
pub(crate) async fn shutdown_snapshot_rebuilds() {
let jobs = SnapshotRebuildJob::list().into_iter();
for recv in jobs.map(|job| job.force_stop()).collect::<Vec<_>>() {
for recv in jobs
.flat_map(|job| job.force_stop().left())
.collect::<Vec<_>>()
{
recv.await.ok();
}
}
Expand Down
29 changes: 12 additions & 17 deletions io-engine/src/rebuild/rebuild_descriptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ pub(super) struct RebuildDescriptor {
/// Pre-opened descriptor for the source block device.
#[allow(clippy::non_send_fields_in_send_ty)]
pub(super) src_descriptor: Box<dyn BlockDeviceDescriptor>,
pub(super) src_handle: Box<dyn BlockDeviceHandle>,
/// Pre-opened descriptor for destination block device.
#[allow(clippy::non_send_fields_in_send_ty)]
pub(super) dst_descriptor: Box<dyn BlockDeviceDescriptor>,
pub(super) dst_handle: Box<dyn BlockDeviceHandle>,
/// Start time of this rebuild.
pub(super) start_time: DateTime<Utc>,
}
Expand Down Expand Up @@ -90,9 +92,8 @@ impl RebuildDescriptor {
});
}

let source_hdl = RebuildDescriptor::io_handle(&*src_descriptor).await?;
let destination_hdl =
RebuildDescriptor::io_handle(&*dst_descriptor).await?;
let src_handle = RebuildDescriptor::io_handle(&*src_descriptor).await?;
let dst_handle = RebuildDescriptor::io_handle(&*dst_descriptor).await?;

let range = match range {
None => {
Expand All @@ -105,8 +106,8 @@ impl RebuildDescriptor {
};

if !Self::validate(
source_hdl.get_device(),
destination_hdl.get_device(),
src_handle.get_device(),
dst_handle.get_device(),
&range,
) {
return Err(RebuildError::InvalidSrcDstRange {});
Expand All @@ -123,7 +124,9 @@ impl RebuildDescriptor {
block_size,
segment_size_blks,
src_descriptor,
src_handle,
dst_descriptor,
dst_handle,
start_time: Utc::now(),
})
}
Expand Down Expand Up @@ -173,18 +176,14 @@ impl RebuildDescriptor {

/// Get a `BlockDeviceHandle` for the source.
#[inline(always)]
pub(super) async fn src_io_handle(
&self,
) -> Result<Box<dyn BlockDeviceHandle>, RebuildError> {
Self::io_handle(&*self.src_descriptor).await
pub(super) fn src_io_handle(&self) -> &dyn BlockDeviceHandle {
self.src_handle.as_ref()
}

/// Get a `BlockDeviceHandle` for the destination.
#[inline(always)]
pub(super) async fn dst_io_handle(
&self,
) -> Result<Box<dyn BlockDeviceHandle>, RebuildError> {
Self::io_handle(&*self.dst_descriptor).await
pub(super) fn dst_io_handle(&self) -> &dyn BlockDeviceHandle {
self.dst_handle.as_ref()
}

/// Get a `BlockDeviceHandle` for the given block device descriptor.
Expand Down Expand Up @@ -231,7 +230,6 @@ impl RebuildDescriptor {
) -> Result<bool, RebuildError> {
match self
.src_io_handle()
.await?
.readv_blocks_async(
iovs,
offset_blk,
Expand Down Expand Up @@ -269,7 +267,6 @@ impl RebuildDescriptor {
iovs: &[IoVec],
) -> Result<(), RebuildError> {
self.dst_io_handle()
.await?
.writev_blocks_async(
iovs,
offset_blk,
Expand All @@ -291,7 +288,6 @@ impl RebuildDescriptor {
) -> Result<(), RebuildError> {
// Read the source again.
self.src_io_handle()
.await?
.readv_blocks_async(
iovs,
offset_blk,
Expand All @@ -306,7 +302,6 @@ impl RebuildDescriptor {

match self
.dst_io_handle()
.await?
.comparev_blocks_async(
iovs,
offset_blk,
Expand Down
17 changes: 12 additions & 5 deletions io-engine/src/rebuild/rebuild_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,17 @@ impl RebuildJob {

/// Forcefully stops the job, overriding any pending client operation
/// returns an async channel which can be used to await for termination.
pub(crate) fn force_stop(&self) -> oneshot::Receiver<RebuildState> {
pub(crate) fn force_stop(
&self,
) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
self.force_terminate(RebuildOperation::Stop)
}

/// Forcefully fails the job, overriding any pending client operation
/// returns an async channel which can be used to await for termination.
pub(crate) fn force_fail(&self) -> oneshot::Receiver<RebuildState> {
pub(crate) fn force_fail(
&self,
) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
self.force_terminate(RebuildOperation::Fail)
}

Expand All @@ -179,10 +183,13 @@ impl RebuildJob {
fn force_terminate(
&self,
op: RebuildOperation,
) -> oneshot::Receiver<RebuildState> {
) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
self.exec_internal_op(op).ok();
self.add_completion_listener()
.unwrap_or_else(|_| oneshot::channel().1)

match self.add_completion_listener() {
Ok(chan) => either::Either::Left(chan),
Err(_) => either::Either::Right(self.state()),
}
}

/// Get the rebuild stats.
Expand Down
2 changes: 1 addition & 1 deletion spdk-rs