Skip to content

Commit

Permalink
Misc logging minor improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Nov 13, 2024
1 parent 3dd1f5a commit b30370b
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 18 deletions.
18 changes: 16 additions & 2 deletions crates/bifrost/src/loglet/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,33 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_core::ShutdownError;
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;

use restate_core::ShutdownError;
use restate_types::logs::TailState;

use super::LogletOffset;

#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct TailOffsetWatch {
sender: watch::Sender<TailState<LogletOffset>>,
}

// Passthrough Debug to TailState
impl std::fmt::Debug for TailOffsetWatch {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Debug::fmt(&*self.sender.borrow(), f)
}
}

// Passthrough Display to TailState
impl std::fmt::Display for TailOffsetWatch {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&*self.sender.borrow(), f)
}
}

impl TailOffsetWatch {
pub fn new(tail: TailState<LogletOffset>) -> Self {
let sender = watch::Sender::new(tail);
Expand Down
11 changes: 4 additions & 7 deletions crates/bifrost/src/loglet_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ enum LogletWrapperError {
}

/// Wraps loglets with the base LSN of the segment
#[derive(Clone, Debug)]
#[derive(Clone, derive_more::Debug)]
pub struct LogletWrapper {
segment_index: SegmentIndex,
/// The offset of the first record in the segment (if exists).
Expand All @@ -42,6 +42,7 @@ pub struct LogletWrapper {
pub(crate) base_lsn: Lsn,
/// If set, it points to the first first LSN outside the boundary of this loglet (bifrost's tail semantics)
pub(crate) tail_lsn: Option<Lsn>,
#[debug(skip)]
pub(crate) config: LogletConfig,
loglet: Arc<dyn Loglet>,
}
Expand Down Expand Up @@ -156,13 +157,9 @@ impl LogletWrapper {

#[instrument(
level = "trace",
skip(self, payloads),
skip(payloads),
ret,
fields(
segment_index = %self.segment_index,
loglet = ?self.loglet,
count = payloads.len(),
)
fields(batch_size = payloads.len())
)]
pub async fn append_batch(&self, payloads: Arc<[Record]>) -> Result<Lsn, AppendError> {
self.enqueue_batch(payloads).await?.await
Expand Down
2 changes: 2 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ use super::tasks::{CheckSealOutcome, CheckSealTask, FindTailResult};

#[derive(derive_more::Debug)]
pub(super) struct ReplicatedLoglet<T> {
#[debug(skip)]
log_id: LogId,
#[debug(skip)]
segment_index: SegmentIndex,
my_params: ReplicatedLogletParams,
#[debug(skip)]
Expand Down
8 changes: 4 additions & 4 deletions crates/bifrost/src/providers/replicated_loglet/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::time::Duration;

use async_trait::async_trait;
use dashmap::DashMap;
use tracing::trace;
use tracing::debug;

use restate_core::network::{MessageRouterBuilder, Networking, TransportConnect};
use restate_core::{TaskCenter, TaskKind};
Expand Down Expand Up @@ -163,13 +163,13 @@ impl<T: TransportConnect> ReplicatedLogletProvider<T> {
ReplicatedLogletError::LogletParamsParsingError(log_id, segment_index, e)
})?;

trace!(
debug!(
log_id = %log_id,
segment_index = %segment_index,
loglet_id = %params.loglet_id,
nodeset = ?params.nodeset,
nodeset = %params.nodeset,
sequencer = %params.sequencer,
replication = ?params.replication,
replication = %params.replication,
"Creating a replicated loglet client"
);

Expand Down
15 changes: 14 additions & 1 deletion crates/types/src/logs/tail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
/// Represents the state of the tail of the loglet.

use std::fmt::Display;

use super::{Lsn, SequenceNumber};

/// Represents the state of the tail of the loglet.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TailState<Offset = Lsn> {
/// Loglet is open for appends
Expand All @@ -18,6 +21,16 @@ pub enum TailState<Offset = Lsn> {
Sealed(Offset),
}

/// "(S)" denotes that tail is sealed
impl<O: Display> Display for TailState<O> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Open(n) => write!(f, "{}", n),
Self::Sealed(n) => write!(f, "{} (S)", n),
}
}
}

impl<Offset: SequenceNumber> TailState<Offset> {
pub fn new(sealed: bool, offset: Offset) -> Self {
if sealed {
Expand Down
8 changes: 4 additions & 4 deletions crates/worker/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,13 @@ where

debug!(
last_applied_lsn = %last_applied_lsn,
current_log_tail = ?current_tail,
current_log_tail = %current_tail,
"PartitionProcessor creating log reader",
);
if current_tail.offset() == last_applied_lsn.next() {
if self.status.replay_status != ReplayStatus::Active {
debug!(
?last_applied_lsn,
%last_applied_lsn,
"Processor has caught up with the log tail."
);
self.status.replay_status = ReplayStatus::Active;
Expand All @@ -324,8 +324,8 @@ where
// that the log has reverted backwards.
if last_applied_lsn >= current_tail.offset() {
error!(
?last_applied_lsn,
log_tail_lsn = ?current_tail.offset(),
%last_applied_lsn,
log_tail_lsn = %current_tail.offset(),
"Processor has applied log entries beyond the log tail. This indicates data-loss in the log!"
);
// todo: declare unhealthy state to cluster controller, or raise a flare.
Expand Down

0 comments on commit b30370b

Please sign in to comment.