diff --git a/Cargo.lock b/Cargo.lock index 5069f7ae..5b1c6916 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,18 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -26,6 +38,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "anstream" version = "0.6.13" @@ -302,6 +320,21 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +[[package]] +name = "cassowary" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df8670b8c7b9dae1793364eafadf7239c40d669904660c5960d74cfd80b46a53" + +[[package]] +name = "castaway" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a17ed5635fc8536268e5d4de1e22e81ac34419e5f052d4d51f4e01dcc263fcc" +dependencies = [ + "rustversion", +] + [[package]] name = "cc" version = "1.0.90" @@ -421,6 +454,19 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "compact_str" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f86b9c4c00838774a6d902ef931eff7470720c51d90c2e32cfe15dc304737b3f" +dependencies = [ + "castaway", + "cfg-if", + "itoa", + "ryu", + "static_assertions", +] + [[package]] name = "console" version = "0.15.8" @@ -491,6 +537,7 @@ checksum = 
"f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df" dependencies = [ "bitflags 2.5.0", "crossterm_winapi", + "futures-core", "libc", "mio", "parking_lot", @@ -999,6 +1046,10 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +dependencies = [ + "ahash", + "allocator-api2", +] [[package]] name = "heapless" @@ -1255,6 +1306,12 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "indoc" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b248f5224d1d606005e02c97f5aa4e88eeb230488bcc03bc9ca4d7991399f2b5" + [[package]] name = "instant" version = "0.1.12" @@ -1367,6 +1424,7 @@ dependencies = [ "log", "prost-reflect", "prost-types", + "ratatui", "serde_json", "serde_yaml", "termtree", @@ -1638,6 +1696,15 @@ dependencies = [ "libc", ] +[[package]] +name = "lru" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" +dependencies = [ + "hashbrown 0.14.3", +] + [[package]] name = "lzma-sys" version = "0.1.20" @@ -2193,6 +2260,26 @@ dependencies = [ "getrandom", ] +[[package]] +name = "ratatui" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcb12f8fbf6c62614b0d56eb352af54f6a22410c3b079eb53ee93c7b97dd31d8" +dependencies = [ + "bitflags 2.5.0", + "cassowary", + "compact_str", + "crossterm", + "indoc", + "itertools", + "lru", + "paste", + "stability", + "strum", + "unicode-segmentation", + "unicode-width", +] + [[package]] name = "rayon" version = "1.10.0" @@ -2626,12 +2713,28 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "stability" +version = "0.1.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebd1b177894da2a2d9120208c3386066af06a488255caabc5de8ddca22dbc3ce" +dependencies = [ + "quote", + "syn 1.0.109", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "strsim" version = "0.10.0" @@ -2649,6 +2752,9 @@ name = "strum" version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29" +dependencies = [ + "strum_macros", +] [[package]] name = "strum_macros" @@ -3040,6 +3146,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" + [[package]] name = "unicode-width" version = "0.1.11" @@ -3430,6 +3542,26 @@ dependencies = [ "lzma-sys", ] +[[package]] +name = "zerocopy" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.57", +] + [[package]] name = "zeroize" version = "1.7.0" diff --git a/Cargo.toml b/Cargo.toml index c75d3afb..c81bca5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,7 @@ prost-build = "0.12.4" 
prost-reflect-build = "0.13.0" prost-types = "0.12.4" rand = "0.8.5" +ratatui = "0.26.1" redb = "2.0.0" rtnetlink = "0.14.1" scopeguard = "1.2.0" diff --git a/crates/ctl/Cargo.toml b/crates/ctl/Cargo.toml index fd7533d8..50898b46 100644 --- a/crates/ctl/Cargo.toml +++ b/crates/ctl/Cargo.toml @@ -13,7 +13,7 @@ anyhow = { workspace = true } async-stream = { workspace = true } clap = { workspace = true } comfy-table = { workspace = true } -crossterm = { workspace = true } +crossterm = { workspace = true, features = ["event-stream"] } ctrlc = { workspace = true, features = ["termination"] } env_logger = { workspace = true } fancy-duration = { workspace = true } @@ -23,6 +23,7 @@ krata = { path = "../krata", version = "^0.0.8" } log = { workspace = true } prost-reflect = { workspace = true, features = ["serde"] } prost-types = { workspace = true } +ratatui = { workspace = true } serde_json = { workspace = true } serde_yaml = { workspace = true } termtree = { workspace = true } diff --git a/crates/ctl/src/cli/mod.rs b/crates/ctl/src/cli/mod.rs index 1926567f..aeaaeaf4 100644 --- a/crates/ctl/src/cli/mod.rs +++ b/crates/ctl/src/cli/mod.rs @@ -6,6 +6,7 @@ pub mod list; pub mod logs; pub mod metrics; pub mod resolve; +pub mod top; pub mod watch; use anyhow::{anyhow, Result}; @@ -20,7 +21,7 @@ use tonic::{transport::Channel, Request}; use self::{ attach::AttachCommand, destroy::DestroyCommand, idm_snoop::IdmSnoopCommand, launch::LauchCommand, list::ListCommand, logs::LogsCommand, metrics::MetricsCommand, - resolve::ResolveCommand, watch::WatchCommand, + resolve::ResolveCommand, top::TopCommand, watch::WatchCommand, }; #[derive(Parser)] @@ -52,6 +53,7 @@ pub enum Commands { Resolve(ResolveCommand), Metrics(MetricsCommand), IdmSnoop(IdmSnoopCommand), + Top(TopCommand), } impl ControlCommand { @@ -95,6 +97,10 @@ impl ControlCommand { Commands::IdmSnoop(snoop) => { snoop.run(client, events).await?; } + + Commands::Top(top) => { + top.run(client, events).await?; + } } Ok(()) } 
diff --git a/crates/ctl/src/cli/top.rs b/crates/ctl/src/cli/top.rs
new file mode 100644
index 00000000..e5d87836
--- /dev/null
+++ b/crates/ctl/src/cli/top.rs
@@ -0,0 +1,232 @@
+use anyhow::Result;
+use clap::Parser;
+use krata::{events::EventStream, v1::control::control_service_client::ControlServiceClient};
+use std::{
+    io::{self, stdout, Stdout},
+    time::Duration,
+};
+use tokio::select;
+use tokio_stream::StreamExt;
+use tonic::transport::Channel;
+
+use crossterm::{
+    event::{Event, KeyCode, KeyEvent, KeyEventKind},
+    execute,
+    terminal::*,
+};
+use ratatui::{
+    prelude::*,
+    symbols::border,
+    widgets::{
+        block::{Position, Title},
+        Block, Borders, Row, Table, TableState,
+    },
+};
+
+use crate::{
+    format::guest_status_text,
+    metrics::{
+        lookup_metric_value, MultiMetricCollector, MultiMetricCollectorHandle, MultiMetricState,
+    },
+};
+
+/// `top` subcommand: a full-screen dashboard of guests and their memory metrics.
+#[derive(Parser)]
+#[command(about = "Dashboard for running guests")]
+pub struct TopCommand {}
+
+/// Terminal type used by the dashboard (ratatui on the crossterm backend over stdout).
+pub type Tui = Terminal<CrosstermBackend<Stdout>>;
+
+impl TopCommand {
+    /// Starts the metric collector, runs the dashboard until it exits, and
+    /// restores the terminal afterwards.
+    pub async fn run(
+        self,
+        client: ControlServiceClient<Channel>,
+        events: EventStream,
+    ) -> Result<()> {
+        let collector = MultiMetricCollector::new(client, events, Duration::from_millis(200))?;
+        let collector = collector.launch().await?;
+        let mut tui = TopCommand::init()?;
+        let mut app = TopApp {
+            metrics: MultiMetricState { guests: vec![] },
+            exit: false,
+            table: TableState::new(),
+        };
+        // Restore the terminal even when the dashboard loop fails; otherwise the
+        // shell is left in raw mode on the alternate screen. The loop's error is
+        // reported in preference to a restore error.
+        let outcome = app.run(collector, &mut tui).await;
+        let restored = TopCommand::restore();
+        outcome?;
+        restored?;
+        Ok(())
+    }
+
+    /// Switches stdout to the alternate screen, enables raw mode and builds the terminal.
+    pub fn init() -> io::Result<Tui> {
+        execute!(stdout(), EnterAlternateScreen)?;
+        enable_raw_mode()?;
+        Terminal::new(CrosstermBackend::new(stdout()))
+    }
+
+    /// Leaves the alternate screen and disables raw mode.
+    ///
+    /// Both steps are attempted even if the first one fails.
+    pub fn restore() -> io::Result<()> {
+        let left = execute!(stdout(), LeaveAlternateScreen);
+        disable_raw_mode()?;
+        left
+    }
+}
+
+/// State of the running dashboard.
+pub struct TopApp {
+    table: TableState,
+    metrics: MultiMetricState,
+    exit: bool,
+}
+
+impl TopApp {
+    /// Redraws, then waits for either a new metrics snapshot or a terminal
+    /// event; returns when `q` is pressed or either source ends.
+    pub async fn run(
+        &mut self,
+        mut collector: MultiMetricCollectorHandle,
+        terminal: &mut Tui,
+    ) -> Result<()> {
+        let mut events = crossterm::event::EventStream::new();
+
+        while !self.exit {
+            terminal.draw(|frame| self.render_frame(frame))?;
+
+            select! {
+                x = collector.receiver.recv() => match x {
+                    Some(state) => {
+                        self.metrics = state;
+                    },
+
+                    None => {
+                        break;
+                    }
+                },
+
+                x = events.next() => match x {
+                    Some(event) => {
+                        let event = event?;
+                        self.handle_event(event)?;
+                    },
+
+                    None => {
+                        break;
+                    }
+                }
+            };
+        }
+        Ok(())
+    }
+
+    fn render_frame(&mut self, frame: &mut Frame) {
+        frame.render_widget(self, frame.size());
+    }
+
+    fn handle_event(&mut self, event: Event) -> io::Result<()> {
+        match event {
+            Event::Key(key_event) if key_event.kind == KeyEventKind::Press => {
+                self.handle_key_event(key_event)
+            }
+            _ => {}
+        };
+        Ok(())
+    }
+
+    fn exit(&mut self) {
+        self.exit = true;
+    }
+
+    fn handle_key_event(&mut self, key_event: KeyEvent) {
+        if let KeyCode::Char('q') = key_event.code {
+            self.exit()
+        }
+    }
+}
+
+impl Widget for &mut TopApp {
+    /// Renders one table row per guest that has both a spec and a state.
+    fn render(self, area: Rect, buf: &mut Buffer) {
+        let title = Title::from(" krata hypervisor ".bold());
+        let instructions = Title::from(vec![" Quit ".into(), "<Q> ".blue().bold()]);
+        let block = Block::default()
+            .title(title.alignment(Alignment::Center))
+            .title(
+                instructions
+                    .alignment(Alignment::Center)
+                    .position(Position::Bottom),
+            )
+            .borders(Borders::ALL)
+            .border_set(border::THICK);
+
+        let mut rows = vec![];
+
+        for ms in &self.metrics.guests {
+            let Some(ref spec) = ms.guest.spec else {
+                continue;
+            };
+
+            let Some(ref state) = ms.guest.state else {
+                continue;
+            };
+
+            let memory_total = ms
+                .root
+                .as_ref()
+                .and_then(|root| lookup_metric_value(root, "system/memory/total"));
+            let memory_used = ms
+                .root
+                .as_ref()
+                .and_then(|root| lookup_metric_value(root, "system/memory/used"));
+            let memory_free = ms
+                .root
+                .as_ref()
+                .and_then(|root| lookup_metric_value(root, "system/memory/free"));
+
+            let row = Row::new(vec![
+                spec.name.clone(),
+                ms.guest.id.clone(),
+                guest_status_text(state.status()),
+                memory_total.unwrap_or_default(),
+                memory_used.unwrap_or_default(),
+                memory_free.unwrap_or_default(),
+            ]);
+            rows.push(row);
+        }
+
+        let widths = [
+            Constraint::Min(8),
+            Constraint::Min(8),
+            Constraint::Min(8),
+            Constraint::Min(8),
+            Constraint::Min(8),
+            Constraint::Min(8),
+        ];
+
+        let table = Table::new(rows, widths)
+            .header(
+                Row::new(vec![
+                    "name",
+                    "id",
+                    "status",
+                    "total memory",
+                    "used memory",
+                    "free memory",
+                ])
+                .style(Style::new().bold())
+                .bottom_margin(1),
+            )
+            .column_spacing(1)
+            .block(block);
+
+        StatefulWidget::render(table, area, buf, &mut self.table);
+    }
+}
diff --git a/crates/ctl/src/format.rs b/crates/ctl/src/format.rs
index 0fa0851a..9f7a2f4c 100644
--- a/crates/ctl/src/format.rs
+++ b/crates/ctl/src/format.rs
@@ -121,7 +121,7 @@ fn metrics_value_numeric(value: Value) -> f64 {
     string.parse::<f64>().ok().unwrap_or(f64::NAN)
 }
 
-fn metrics_value_pretty(value: Value, format: GuestMetricFormat) -> String {
+pub fn metrics_value_pretty(value: Value, format: GuestMetricFormat) -> String {
     match format {
         GuestMetricFormat::Bytes => human_bytes(metrics_value_numeric(value)),
         GuestMetricFormat::Integer => (metrics_value_numeric(value) as u64).to_string(),
diff --git a/crates/ctl/src/lib.rs b/crates/ctl/src/lib.rs
index 1dde7a03..c3aeefea 100644
--- a/crates/ctl/src/lib.rs
+++ b/crates/ctl/src/lib.rs
@@ -1,3 +1,4 @@
 pub mod cli;
 pub mod console;
 pub mod format;
+pub mod metrics;
diff --git a/crates/ctl/src/metrics.rs b/crates/ctl/src/metrics.rs
new file mode 100644
index 00000000..1cef438e
--- /dev/null
+++ b/crates/ctl/src/metrics.rs
@@ -0,0 +1,171 @@
+use anyhow::Result;
+use krata::{
+    events::EventStream,
+    v1::{
+        common::{Guest, GuestMetricNode, GuestStatus},
+        control::{
+            control_service_client::ControlServiceClient, watch_events_reply::Event,
+            ListGuestsRequest, ReadGuestMetricsRequest,
+        },
+    },
+};
+use log::error;
+use std::time::Duration;
+use tokio::{
+    select,
+    sync::mpsc::{channel, Receiver, Sender},
+    task::JoinHandle,
+    time::{sleep, timeout},
+};
+use tonic::transport::Channel;
+
+use crate::format::metrics_value_pretty;
+
+/// A guest together with the root of its metric tree, if one was read.
+pub struct MetricState {
+    pub guest: Guest,
+    pub root: Option<GuestMetricNode>,
+}
+
+/// One snapshot of metrics for every guest that was polled.
+pub struct MultiMetricState {
+    pub guests: Vec<MetricState>,
+}
+
+/// Polls guest metrics every `period` and tracks guest changes from the event stream.
+pub struct MultiMetricCollector {
+    client: ControlServiceClient<Channel>,
+    events: EventStream,
+    period: Duration,
+}
+
+/// Receiving end of a launched collector; dropping it aborts the background task.
+pub struct MultiMetricCollectorHandle {
+    pub receiver: Receiver<MultiMetricState>,
+    task: JoinHandle<()>,
+}
+
+impl Drop for MultiMetricCollectorHandle {
+    fn drop(&mut self) {
+        self.task.abort();
+    }
+}
+
+impl MultiMetricCollector {
+    pub fn new(
+        client: ControlServiceClient<Channel>,
+        events: EventStream,
+        period: Duration,
+    ) -> Result<MultiMetricCollector> {
+        Ok(MultiMetricCollector {
+            client,
+            events,
+            period,
+        })
+    }
+
+    /// Spawns [`Self::process`] on a background task and returns the handle to its output.
+    pub async fn launch(mut self) -> Result<MultiMetricCollectorHandle> {
+        let (sender, receiver) = channel::<MultiMetricState>(100);
+        let task = tokio::task::spawn(async move {
+            if let Err(error) = self.process(sender).await {
+                error!("failed to process multi metric collector: {}", error);
+            }
+        });
+        Ok(MultiMetricCollectorHandle { receiver, task })
+    }
+
+    /// Collector loop: keeps the guest list current from events and, each time the
+    /// period elapses, reads metrics for every started guest and sends a snapshot.
+    pub async fn process(&mut self, sender: Sender<MultiMetricState>) -> Result<()> {
+        let mut events = self.events.subscribe();
+        let mut guests: Vec<Guest> = self
+            .client
+            .list_guests(ListGuestsRequest {})
+            .await?
+            .into_inner()
+            .guests;
+        loop {
+            let collect = select! {
+                x = events.recv() => match x {
+                    Ok(event) => {
+                        if let Event::GuestChanged(changed) = event {
+                            let Some(guest) = changed.guest else {
+                                continue;
+                            };
+                            let Some(ref state) = guest.state else {
+                                continue;
+                            };
+                            guests.retain(|x| x.id != guest.id);
+                            if state.status() != GuestStatus::Destroying {
+                                guests.push(guest);
+                            }
+                        }
+                        false
+                    },
+
+                    Err(error) => {
+                        return Err(error.into());
+                    }
+                },
+
+                // NOTE(review): this timer is re-created on every loop iteration, so an
+                // event that arrives before it fires restarts the wait.
+                _ = sleep(self.period) => {
+                    true
+                }
+            };
+
+            if !collect {
+                continue;
+            }
+
+            let mut metrics = Vec::new();
+            for guest in &guests {
+                let Some(ref state) = guest.state else {
+                    continue;
+                };
+
+                if state.status() != GuestStatus::Started {
+                    continue;
+                }
+
+                // A read that fails or takes longer than five seconds leaves `root` as `None`.
+                let root = timeout(
+                    Duration::from_secs(5),
+                    self.client.read_guest_metrics(ReadGuestMetricsRequest {
+                        guest_id: guest.id.clone(),
+                    }),
+                )
+                .await
+                .ok()
+                .and_then(|x| x.ok())
+                .map(|x| x.into_inner())
+                .and_then(|x| x.root);
+                metrics.push(MetricState {
+                    guest: guest.clone(),
+                    root,
+                });
+            }
+            sender.send(MultiMetricState { guests: metrics }).await?;
+        }
+    }
+}
+
+/// Walks `path` (child names separated by `/`) down from `node`.
+pub fn lookup<'a>(node: &'a GuestMetricNode, path: &str) -> Option<&'a GuestMetricNode> {
+    let Some((what, b)) = path.split_once('/') else {
+        return node.children.iter().find(|x| x.name == path);
+    };
+    let next = node.children.iter().find(|x| x.name == what)?;
+    lookup(next, b)
+}
+
+/// Looks up `path` under `node` and formats its value, if the node has one.
+pub fn lookup_metric_value(node: &GuestMetricNode, path: &str) -> Option<String> {
+    lookup(node, path).and_then(|x| {
+        x.value
+            .as_ref()
+            .map(|v| metrics_value_pretty(v.clone(), x.format()))
+    })
+}
diff --git a/crates/daemon/src/idm.rs b/crates/daemon/src/idm.rs
index 7525defd..ee788b9b 100644
--- a/crates/daemon/src/idm.rs
+++ b/crates/daemon/src/idm.rs
@@ -120,16 +120,22 @@ impl DaemonIdm {
                         if let Some(data) = data {
                             let buffer = buffers.entry(domid).or_insert_with_key(|_| BytesMut::new());
                             buffer.extend_from_slice(&data);
-                            if buffer.len() < 4 {
+                            if buffer.len() < 6 {
                                 continue;
                             }
-                            let size = (buffer[0] as u32 | (buffer[1] as u32) << 8 | (buffer[2] as u32) << 16 | (buffer[3] as u32) << 24) as usize;
-                            let needed = size + 4;
+
+                            if buffer[0] != 0xff || buffer[1] != 0xff {
+                                buffer.clear();
+                                continue;
+                            }
+
+                            let size = (buffer[2] as u32 | (buffer[3] as u32) << 8 | (buffer[4] as u32) << 16 | (buffer[5] as u32) << 24) as usize;
+                            let needed = size + 6;
                             if buffer.len() < needed {
                                 continue;
                             }
                             let mut packet = buffer.split_to(needed);
-                            packet.advance(4);
+                            packet.advance(6);
                             match IdmPacket::decode(packet) {
                                 Ok(packet) => {
                                     let _ = client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await?;
@@ -159,12 +165,15 @@ impl DaemonIdm {
                     x = self.tx_receiver.recv() => match x {
                         Some((domid, packet)) => {
                             let data = packet.encode_to_vec();
-                            let mut buffer = vec![0u8; 4];
+                            let mut buffer = vec![0u8; 6];
                             let length = data.len() as u32;
-                            buffer[0] = length as u8;
-                            buffer[1] = (length << 8) as u8;
-                            buffer[2] = (length << 16) as u8;
-                            buffer[3] = (length << 24) as u8;
+                            buffer[0] = 0xff;
+                            buffer[1] = 0xff;
+                            // Little-endian length: shift right so each higher byte lands in the low 8 bits.
+                            buffer[2] = length as u8;
+                            buffer[3] = (length >> 8) as u8;
+                            buffer[4] = (length >> 16) as u8;
+                            buffer[5] = (length >> 24) as u8;
                             buffer.extend_from_slice(&data);
                             self.tx_raw_sender.send((domid, buffer)).await?;
                             let _ = self.snoop_sender.send(DaemonIdmSnoopPacket { from: 0, to: domid, packet });
diff --git a/crates/krata/src/idm/client.rs b/crates/krata/src/idm/client.rs
index c9ac7141..3c0181b0 100644
--- a/crates/krata/src/idm/client.rs
+++ b/crates/krata/src/idm/client.rs
@@ -69,6 +69,14 @@ impl IdmBackend for IdmFileBackend {
     async fn recv(&mut self) -> Result<IdmPacket> {
         let mut fd = self.read_fd.lock().await;
         let mut guard = fd.readable_mut().await?;
+        let b1 = guard.get_inner_mut().read_u8().await?;
+        if b1 != 0xff {
+            return Ok(IdmPacket::default());
+        }
+        let b2 = guard.get_inner_mut().read_u8().await?;
+        if b2 != 0xff {
+            return Ok(IdmPacket::default());
+        }
         let size = guard.get_inner_mut().read_u32_le().await?;
         if size == 0 {
             return Ok(IdmPacket::default());
@@ -84,6 +92,7 @@ impl IdmBackend for IdmFileBackend {
     async fn send(&mut self, packet: IdmPacket) -> Result<()> {
         let mut file = self.write.lock().await;
         let data = packet.encode_to_vec();
+        file.write_all(&[0xff, 0xff]).await?;
         file.write_u32_le(data.len() as u32).await?;
         file.write_all(&data).await?;
         Ok(())
diff --git a/crates/runtime/src/channel.rs b/crates/runtime/src/channel.rs
index 86ec0c7b..7aeaf360 100644
--- a/crates/runtime/src/channel.rs
+++ b/crates/runtime/src/channel.rs
@@ -448,6 +448,10 @@ impl KrataChannelBackendProcessor {
                             error!("channel for domid {} has an invalid input space of {}", self.domid, space);
                         }
                         let free = XenConsoleInterface::INPUT_SIZE.wrapping_sub(space);
+                        if free == 0 {
+                            sleep(Duration::from_micros(100)).await;
+                            continue;
+                        }
                         let want = data.len().min(free);
                         let buffer = &data[index..want];
                         for b in buffer {