Skip to content

Commit

Permalink
Abstract out BGP session stream type
Browse files Browse the repository at this point in the history
  • Loading branch information
hack3ric committed Nov 7, 2024
1 parent 66689cf commit 463f960
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 29 deletions.
45 changes: 22 additions & 23 deletions src/bgp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::net::{IpAddr, SocketAddr};
use std::rc::Rc;
use strum::EnumDiscriminants;
use thiserror::Error;
use tokio::io::{AsyncWrite, BufReader};
use tokio::io::{AsyncRead, AsyncWrite, BufReader};
use tokio::net::TcpStream;
use tokio::select;
use tokio::time::{interval, Duration, Instant, Interval};
Expand All @@ -51,13 +51,13 @@ use State::*;
/// - RFC 8955: Dissemination of Flow Specification Rules
/// - RFC 8956: Dissemination of Flow Specification Rules for IPv6
#[derive(Debug)]
pub struct Session {
pub struct Session<S: AsyncRead + AsyncWrite + Unpin> {
config: RunArgs,
state: State,
state: State<S>,
routes: Routes,
}

impl Session {
impl<S: AsyncRead + AsyncWrite + Unpin> Session<S> {
pub fn new(c: RunArgs) -> Result<Self> {
let nft = (!c.dry_run)
.then(|| Nft::new(c.table.clone(), c.chain.clone(), c.hooked, c.priority))
Expand All @@ -83,14 +83,14 @@ impl Session {
pub fn config(&self) -> &RunArgs {
&self.config
}
pub fn state(&self) -> &State {
pub fn state(&self) -> &State<S> {
&self.state
}
pub fn routes(&self) -> &Routes {
&self.routes
}

pub async fn accept(&mut self, mut stream: TcpStream, addr: SocketAddr) -> Result<()> {
pub async fn accept(&mut self, mut stream: S, addr: SocketAddr) -> Result<()> {
let ip = addr.ip();
if !self.config.allowed_ips.iter().any(|x| x.contains(ip)) {
return Err(Error::UnacceptableAddr(ip));
Expand All @@ -103,7 +103,7 @@ impl Session {
self.config.router_id.to_bits(),
);
open.send(&mut stream).await?;
replace_with_or_abort(&mut self.state, |_| OpenSent { stream: BufReader::new(stream) });
replace_with_or_abort(&mut self.state, |_| OpenSent { stream });
info!("accepting BGP connection from {addr}");
Ok(())
}
Expand Down Expand Up @@ -212,30 +212,29 @@ impl Session {
}

#[derive(Debug)]
pub enum State {
pub enum State<S: AsyncRead + AsyncWrite + Unpin> {
Idle,
Connect, // never used in passive mode
Active,
OpenSent {
stream: BufReader<TcpStream>,
},
OpenConfirm {
stream: BufReader<TcpStream>,
remote_open: OpenMessage<'static>,
timers: Option<Timers>,
},
Established {
stream: BufReader<TcpStream>,
remote_open: OpenMessage<'static>,
timers: Option<Timers>,
},
OpenSent { stream: S },
OpenConfirm { stream: S, remote_open: OpenMessage<'static>, timers: Option<Timers> },
Established { stream: S, remote_open: OpenMessage<'static>, timers: Option<Timers> },
}

impl State {
impl<S: AsyncRead + AsyncWrite + Unpin> State<S> {
pub fn kind(&self) -> StateKind {
self.view().into()
match self {
Idle => StateKind::Idle,
Connect => StateKind::Connect,
Active => StateKind::Active,
OpenSent { .. } => StateKind::OpenSent,
OpenConfirm { .. } => StateKind::OpenConfirm,
Established { .. } => StateKind::Established,
}
}
}

impl State<BufReader<TcpStream>> {
pub fn view(&self) -> StateView {
match self {
Idle => StateView::Idle,
Expand Down
6 changes: 3 additions & 3 deletions src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::ffi::CStr;
use std::io;
use std::mem::MaybeUninit;
use std::path::{Path, PathBuf};
use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::{UnixListener, UnixStream};
use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::net::{TcpStream, UnixListener, UnixStream};

pub struct IpcServer {
path: PathBuf,
Expand All @@ -31,7 +31,7 @@ impl Drop for IpcServer {
}
}

impl Session {
impl Session<BufReader<TcpStream>> {
pub async fn write_states(&self, writer: &mut (impl AsyncWrite + Unpin)) -> anyhow::Result<()> {
writer.write_all(&postcard::to_allocvec_cobs(self.config())?).await?;
writer.write_all(&postcard::to_allocvec_cobs(&self.state().view())?).await?;
Expand Down
7 changes: 4 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ use itertools::Itertools;
use log::{error, info, warn, Level, LevelFilter, Record};
use std::fs::{create_dir_all, File};
use std::io::ErrorKind::UnexpectedEof;
use std::io::{self, BufRead, BufReader, Write};
use std::io::{self, BufRead, Write};
use std::net::Ipv4Addr;
use std::path::Path;
use std::process::ExitCode;
use tokio::io::BufReader;
use tokio::net::TcpListener;
use tokio::signal::unix::{signal, SignalKind};
use tokio::{pin, select};
Expand All @@ -34,7 +35,7 @@ async fn run(mut args: RunArgs, sock_path: &str) -> anyhow::Result<ExitCode> {
args = RunArgs::parse_from(
Some(Ok(format!("{cmd} run")))
.into_iter()
.chain(BufReader::new(File::open(file)?).lines())
.chain(std::io::BufReader::new(File::open(file)?).lines())
.filter(|x| !x.as_ref().is_ok_and(|x| x.is_empty() || x.chars().next().unwrap() == '#'))
.map_ok(|x| "--".to_string() + &x)
.collect::<Result<Vec<_>, _>>()?,
Expand Down Expand Up @@ -72,7 +73,7 @@ async fn run(mut args: RunArgs, sock_path: &str) -> anyhow::Result<ExitCode> {
result = listener.accept(), if matches!(bgp.state(), bgp::State::Active) => {
let (stream, mut addr) = result.context("failed to accept TCP connection")?;
addr.set_ip(addr.ip().to_canonical());
bgp.accept(stream, addr).await.context("failed to accept BGP connection")?;
bgp.accept(BufReader::new(stream), addr).await.context("failed to accept BGP connection")?;
}
result = bgp.process() => match result {
Ok(()) => {}
Expand Down

0 comments on commit 463f960

Please sign in to comment.