Skip to content

Commit

Permalink
refactor: manifest error code (apache#1546)
Browse files Browse the repository at this point in the history
  • Loading branch information
zealchen authored and LeslieKid committed Sep 24, 2024
1 parent 23cab9a commit 4f3587a
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 290 deletions.
4 changes: 1 addition & 3 deletions src/analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,7 @@ pub enum Error {
CreateOpenFailedTable { table: String, backtrace: Backtrace },

#[snafu(display("Failed to open manifest, err:{}", source))]
OpenManifest {
source: crate::manifest::details::Error,
},
OpenManifest { source: crate::manifest::Error },

#[snafu(display("Failed to find table, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
TableNotExist { msg: String, backtrace: Backtrace },
Expand Down
130 changes: 19 additions & 111 deletions src/analytic_engine/src/manifest/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,15 @@ use std::{
};

use async_trait::async_trait;
use generic_error::{BoxError, GenericError, GenericResult};
use generic_error::{BoxError, GenericResult};
use horaedbproto::manifest as manifest_pb;
use lazy_static::lazy_static;
use logger::{debug, info, warn};
use macros::define_result;
use object_store::{ObjectStoreRef, Path};
use parquet::data_type::AsBytes;
use prometheus::{exponential_buckets, register_histogram, Histogram};
use prost::Message;
use serde::{Deserialize, Serialize};
use snafu::{Backtrace, ResultExt, Snafu};
use table_engine::table::TableId;
use time_ext::ReadableDuration;
use tokio::sync::Mutex;
Expand All @@ -57,104 +55,12 @@ use crate::{
MetaEdit, MetaEditRequest, MetaUpdate, MetaUpdateDecoder, MetaUpdatePayload, Snapshot,
},
meta_snapshot::{MetaSnapshot, MetaSnapshotBuilder},
LoadRequest, Manifest, SnapshotRequest,
Error, LoadRequest, Manifest, Result, SnapshotRequest,
},
space::SpaceId,
table::data::{TableDataRef, TableShardInfo},
};

#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub enum Error {
#[snafu(display(
"Failed to encode payloads, wal_location:{:?}, err:{}",
wal_location,
source
))]
EncodePayloads {
wal_location: WalLocation,
source: wal::manager::Error,
},

#[snafu(display("Failed to write update to wal, err:{}", source))]
WriteWal { source: wal::manager::Error },

#[snafu(display("Failed to read wal, err:{}", source))]
ReadWal { source: wal::manager::Error },

#[snafu(display("Failed to read log entry, err:{}", source))]
ReadEntry { source: wal::manager::Error },

#[snafu(display("Failed to apply table meta update, err:{}", source))]
ApplyUpdate {
source: crate::manifest::meta_snapshot::Error,
},

#[snafu(display("Failed to clean wal, err:{}", source))]
CleanWal { source: wal::manager::Error },

#[snafu(display(
"Failed to store snapshot, err:{}.\nBacktrace:\n{:?}",
source,
backtrace
))]
StoreSnapshot {
source: object_store::ObjectStoreError,
backtrace: Backtrace,
},

#[snafu(display(
"Failed to fetch snapshot, err:{}.\nBacktrace:\n{:?}",
source,
backtrace
))]
FetchSnapshot {
source: object_store::ObjectStoreError,
backtrace: Backtrace,
},

#[snafu(display(
"Failed to decode snapshot, err:{}.\nBacktrace:\n{:?}",
source,
backtrace
))]
DecodeSnapshot {
source: prost::DecodeError,
backtrace: Backtrace,
},

#[snafu(display("Failed to build snapshot, msg:{}.\nBacktrace:\n{:?}", msg, backtrace))]
BuildSnapshotNoCause { msg: String, backtrace: Backtrace },

#[snafu(display("Failed to build snapshot, msg:{}, err:{}", msg, source))]
BuildSnapshotWithCause { msg: String, source: GenericError },

#[snafu(display(
"Failed to apply edit to table, msg:{}.\nBacktrace:\n{:?}",
msg,
backtrace
))]
ApplyUpdateToTableNoCause { msg: String, backtrace: Backtrace },

#[snafu(display("Failed to apply edit to table, msg:{}, err:{}", msg, source))]
ApplyUpdateToTableWithCause { msg: String, source: GenericError },

#[snafu(display(
"Failed to apply snapshot to table, msg:{}.\nBacktrace:\n{:?}",
msg,
backtrace
))]
ApplySnapshotToTableNoCause { msg: String, backtrace: Backtrace },

#[snafu(display("Failed to apply snapshot to table, msg:{}, err:{}", msg, source))]
ApplySnapshotToTableWithCause { msg: String, source: GenericError },

#[snafu(display("Failed to load snapshot, err:{}", source))]
LoadSnapshot { source: GenericError },
}

define_result!(Error);

lazy_static! {
static ref RECOVER_TABLE_META_FROM_SNAPSHOT_DURATION: Histogram = register_histogram!(
"recover_table_meta_from_snapshot_duration",
Expand Down Expand Up @@ -197,7 +103,7 @@ impl MetaUpdateLogEntryIterator for MetaUpdateReaderImpl {
.iter
.next_log_entries(decoder, |_| true, buffer)
.await
.context(ReadEntry)?;
.map_err(anyhow::Error::new)?;
}

match self.buffer.pop_front() {
Expand Down Expand Up @@ -277,7 +183,7 @@ where
latest_seq = seq;
manifest_data_builder
.apply_update(update)
.context(ApplyUpdate)?;
.map_err(anyhow::Error::new)?;
}
Ok(Snapshot {
end_seq: latest_seq,
Expand All @@ -302,7 +208,7 @@ where
latest_seq = seq;
manifest_data_builder
.apply_update(update)
.context(ApplyUpdate)?;
.map_err(anyhow::Error::new)?;
has_logs = true;
}

Expand Down Expand Up @@ -633,7 +539,7 @@ impl MetaUpdateSnapshotStore for ObjectStoreBasedSnapshotStore {
self.store
.put(&self.snapshot_path, payload.into())
.await
.context(StoreSnapshot)?;
.map_err(anyhow::Error::new)?;

Ok(())
}
Expand Down Expand Up @@ -661,15 +567,13 @@ impl MetaUpdateSnapshotStore for ObjectStoreBasedSnapshotStore {
}

let payload = get_res
.context(FetchSnapshot)?
.map_err(anyhow::Error::new)?
.bytes()
.await
.context(FetchSnapshot)?;
.map_err(anyhow::Error::new)?;
let snapshot_pb =
manifest_pb::Snapshot::decode(payload.as_bytes()).context(DecodeSnapshot)?;
let snapshot = Snapshot::try_from(snapshot_pb)
.box_err()
.context(LoadSnapshot)?;
manifest_pb::Snapshot::decode(payload.as_bytes()).map_err(anyhow::Error::new)?;
let snapshot = Snapshot::try_from(snapshot_pb).map_err(anyhow::Error::new)?;

Ok(Some(snapshot))
}
Expand Down Expand Up @@ -702,7 +606,7 @@ impl MetaUpdateLogStore for WalBasedLogStore {
.wal_manager
.read_batch(&ctx, &read_req)
.await
.context(ReadWal)?;
.map_err(anyhow::Error::new)?;

Ok(MetaUpdateReaderImpl {
iter,
Expand All @@ -714,8 +618,12 @@ impl MetaUpdateLogStore for WalBasedLogStore {
async fn append(&self, meta_update: MetaUpdate) -> Result<SequenceNumber> {
let payload = MetaUpdatePayload::from(meta_update);
let log_batch_encoder = LogBatchEncoder::create(self.location);
let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads {
wal_location: self.location,
let log_batch = log_batch_encoder.encode(&payload).map_err(|e| {
anyhow::anyhow!(
"Failed to encode payloads, wal_location:{:?}, err:{}",
self.location,
e
)
})?;

let write_ctx = WriteContext {
Expand All @@ -725,14 +633,14 @@ impl MetaUpdateLogStore for WalBasedLogStore {
self.wal_manager
.write(&write_ctx, &log_batch)
.await
.context(WriteWal)
.map_err(|e| Error::from(anyhow::Error::new(e)))
}

async fn delete_up_to(&self, inclusive_end: SequenceNumber) -> Result<()> {
self.wal_manager
.mark_delete_entries_up_to(self.location, inclusive_end)
.await
.context(CleanWal)
.map_err(|e| Error::from(anyhow::Error::new(e)))
}
}

Expand Down
47 changes: 47 additions & 0 deletions src/analytic_engine/src/manifest/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use thiserror::Error;

use crate::ErrorKind;

#[derive(Debug, Error)]
#[error(transparent)]
pub struct Error(#[from] InnerError);

impl From<anyhow::Error> for Error {
fn from(source: anyhow::Error) -> Self {
Self(InnerError::Other { source })
}
}

impl Error {
pub fn kind(&self) -> ErrorKind {
match self.0 {
InnerError::Other { .. } => ErrorKind::Internal,
}
}
}

#[derive(Error, Debug)]
pub(crate) enum InnerError {
#[error(transparent)]
Other {
#[from]
source: anyhow::Error,
},
}
Loading

0 comments on commit 4f3587a

Please sign in to comment.