Skip to content

Commit

Permalink
Merge branch 'main' into available-parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
partim committed Jan 24, 2024
2 parents ea94916 + b92ee96 commit a09771e
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 36 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ on:
push:
branches:
- main
- series-*
pull_request:
branches:
- main
- series-*
jobs:
test:
name: test
Expand Down
2 changes: 0 additions & 2 deletions .github/workflows/pkg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ name: Packaging

on:
push:
branches:
- main
tags:
- v*

Expand Down
8 changes: 4 additions & 4 deletions doc/manual/source/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
sphinx==4.3.2
sphinx==7.2
sphinx-version-warning==1.1.2
sphinx-tabs==3.2.0
sphinx-copybutton==0.4.0
sphinx-tabs==3.4.4
sphinx-copybutton==0.5.2
sphinx-notfound-page
sphinx_rtd_theme
sphinx_rtd_theme==2.0
toml
1 change: 1 addition & 0 deletions src/collector/rrdp/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ impl Collector {
repos: DumpRegistry,
states: HashMap<uri::Https, RepositoryState>,
) -> Result<(), Fatal> {
fatal::create_dir_all(repos.base_dir())?;
let path = repos.base_dir().join("repositories.json");
if let Err(err) = fs::write(
&path,
Expand Down
43 changes: 35 additions & 8 deletions src/collector/rsync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use futures::{FutureExt, TryFutureExt};
use futures::future::Either;
use log::{debug, error, info, warn};
use rpki::uri;
use tokio::io::AsyncBufReadExt;
use tokio::process::Command as AsyncCommand;
use crate::config::Config;
use crate::error::{Failed, Fatal};
Expand Down Expand Up @@ -525,9 +526,10 @@ impl RsyncCommand {
command.kill_on_drop(true);
let mut child = command.spawn()?;
let stdout_pipe = child.stdout.take();
let stderr_pipe = child.stderr.take();
let stderr_pipe = child.stderr.take().map(
tokio::io::BufReader::new
);
let mut stdout = Vec::new();
let mut stderr = Vec::new();
let res = tokio::try_join!(
match self.timeout {
None => Either::Left(child.wait().map(Ok)),
Expand All @@ -552,7 +554,10 @@ impl RsyncCommand {
},
async {
if let Some(mut pipe) = stderr_pipe {
tokio::io::copy(&mut pipe, &mut stderr).await?;
let mut line = Vec::new();
while pipe.read_until(b'\n', &mut line).await? != 0 {
Self::log_err_line(source, &mut line);
}
}
Ok(())
},
Expand All @@ -576,11 +581,6 @@ impl RsyncCommand {
Err(err)
}
};
if !stderr.is_empty() {
String::from_utf8_lossy(&stderr).lines().for_each(|l| {
warn!("{}: {}", source, l);
})
}
if !stdout.is_empty() {
String::from_utf8_lossy(&stdout).lines().for_each(|l| {
info!("{}: {}", source, l);
Expand Down Expand Up @@ -694,6 +694,33 @@ impl RsyncCommand {
}
Ok(destination)
}

/// Logs the line in the buffer.
fn log_err_line(
source: &Module,
line: &mut Vec<u8>,
) {
let mut len = line.len();
if len > 0 && line[len - 1] == b'\n' {
len -= 1;
}

// On Windows, we may now have a \r at the end.
#[cfg(windows)]
if len > 0 && line[len - 1] == b'\r' {
len -= 1;
}

if len > 0 {
warn!(
"{}: {}",
source,
String::from_utf8_lossy(&line[..len])
);
}

line.clear();
}
}


Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ impl Config {
///
/// Uses default values for everything except for the cache directory
/// which needs to be provided.
fn default_with_paths(cache_dir: PathBuf) -> Self {
pub fn default_with_paths(cache_dir: PathBuf) -> Self {
Config {
cache_dir,
no_rir_tals: false,
Expand Down
57 changes: 42 additions & 15 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,9 +818,12 @@ impl<'a, P: ProcessRun> PubPoint<'a, P> {
manifest_bytes.clone(), self.run.validation.strict
) {
Ok(manifest) => manifest,
Err(err) => {
Err(_) => {
self.metrics.invalid_manifests += 1;
warn!("{}: {}.", self.cert.rpki_manifest(), err);
warn!(
"{}: failed to decode manifest.",
self.cert.rpki_manifest()
);
return Ok(None)
}
};
Expand Down Expand Up @@ -945,9 +948,9 @@ impl<'a, P: ProcessRun> PubPoint<'a, P> {
// Decode and validate the CRL.
let mut crl = match Crl::decode(crl_bytes.clone()) {
Ok(crl) => crl,
Err(err) => {
Err(_) => {
self.metrics.invalid_crls += 1;
warn!("{}: {}.", crl_uri, err);
warn!("{}: failed to decode CRL.", crl_uri);
return Ok(None)
}
};
Expand Down Expand Up @@ -1077,9 +1080,12 @@ impl<'a, P: ProcessRun> PubPoint<'a, P> {
stored_manifest.manifest().clone(), self.run.validation.strict
) {
Ok(manifest) => manifest,
Err(err) => {
warn!("{}: {}.", self.cert.rpki_manifest(), err);
Err(_) => {
self.metrics.invalid_manifests += 1;
warn!(
"{}: failed to decode manifest.",
self.cert.rpki_manifest(),
);
return Err(Failed);
}
};
Expand Down Expand Up @@ -1125,10 +1131,10 @@ impl<'a, P: ProcessRun> PubPoint<'a, P> {
// Decode and validate the CRL.
let mut crl = match Crl::decode(stored_manifest.crl().clone()) {
Ok(crl) => crl,
Err(err) => {
warn!("{}: {}.", crl_uri, err);
Err(_) => {
self.metrics.invalid_manifests += 1;
self.metrics.invalid_crls += 1;
warn!("{}: failed to decode CRL.", crl_uri);
return Err(Failed)
}
};
Expand Down Expand Up @@ -1271,9 +1277,9 @@ impl<'a, P: ProcessRun> PubPoint<'a, P> {
) -> Result<(), Failed> {
let cert = match Cert::decode(content) {
Ok(cert) => cert,
Err(err) => {
warn!("{}: {}.", uri, err);
Err(_) => {
manifest.metrics.invalid_certs += 1;
warn!("{}: failed to decode certificate.", uri);
return Ok(())
}
};
Expand Down Expand Up @@ -1386,9 +1392,9 @@ impl<'a, P: ProcessRun> PubPoint<'a, P> {
content, self.run.validation.strict
) {
Ok(roa) => roa,
Err(err) => {
warn!("{}: {}.", uri, err);
Err(_) => {
manifest.metrics.invalid_roas += 1;
warn!("{}: failed to decode ROA.", uri);
return Ok(())
}
};
Expand Down Expand Up @@ -1421,8 +1427,8 @@ impl<'a, P: ProcessRun> PubPoint<'a, P> {
) {
Ok(aspa) => aspa,
Err(err) => {
warn!("{}: {}.", uri, err);
manifest.metrics.invalid_aspas += 1;
warn!("{}: failed to decode ASPA.", uri);
return Ok(())
}
};
Expand Down Expand Up @@ -1453,9 +1459,9 @@ impl<'a, P: ProcessRun> PubPoint<'a, P> {
content, self.run.validation.strict
) {
Ok(obj) => obj,
Err(err) => {
warn!("{}: {}.", uri, err);
Err(_) => {
manifest.metrics.invalid_gbrs += 1;
warn!("{}: failed to decode GBR.", uri);
return Ok(())
}
};
Expand Down Expand Up @@ -2049,3 +2055,24 @@ pub trait ProcessPubPoint: Sized + Send + Sync {
}
}


//============ Tests =========================================================

#[cfg(test)]
mod test {
use super::*;

#[test]
fn dump_empty_cache() {
let _ = crate::process::Process::init(); // May be inited already.
let src = tempfile::tempdir().unwrap();
let target = tempfile::tempdir().unwrap();
let target = target.path().join("dump");
let mut config = Config::default_with_paths(src.path().into());
config.rsync_command = "echo".into();
config.rsync_args = Some(vec!["some".into()]);
let engine = Engine::new(&config, true).unwrap();
engine.dump(&target).unwrap();
}
}

9 changes: 7 additions & 2 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl Store {
let target = target_base.join("ta");
debug!("Dumping trust anchor certificates to {}", target.display());
fatal::remove_dir_all(&target)?;
fatal::copy_dir_all(&source, &target)?;
fatal::copy_existing_dir_all(&source, &target)?;
debug!("Trust anchor certificate dump complete.");
Ok(())
}
Expand All @@ -195,7 +195,11 @@ impl Store {
path: &Path,
repos: &mut DumpRegistry,
) -> Result<(), Failed> {
for entry in fatal::read_dir(path)? {
let dir = match fatal::read_existing_dir(path)? {
Some(dir) => dir,
None => return Ok(())
};
for entry in dir {
let entry = entry?;
if entry.is_dir() {
self.dump_tree(entry.path(), repos)?;
Expand Down Expand Up @@ -302,6 +306,7 @@ impl Store {
&self,
repos: DumpRegistry,
) -> Result<(), Failed> {
fatal::create_dir_all(repos.base_dir())?;
let path = repos.base_dir().join("repositories.json");
fatal::write_file(
&path,
Expand Down
16 changes: 12 additions & 4 deletions src/utils/fatal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,13 +318,21 @@ pub fn write_file(path: &Path, contents: &[u8]) -> Result<(), Failed> {

//------------ copy_dir_all --------------------------------------------------

/// Copies the content of a directory.
/// Copies the content of a directory if it exists.
///
/// Creates the target directory with `create_dir_all`. Errors out if
/// anything goes wrong.
pub fn copy_dir_all(source: &Path, target: &Path) -> Result<(), Failed> {
///
/// If the source directory does not exist, does nothing.
pub fn copy_existing_dir_all(
source: &Path, target: &Path
) -> Result<(), Failed> {
let source_dir = match read_existing_dir(source)? {
Some(entry) => entry,
None => return Ok(()),
};
create_dir_all(target)?;
for entry in read_dir(source)? {
for entry in source_dir {
let entry = entry?;
if entry.is_file() {
if let Err(err) = fs::copy(
Expand All @@ -338,7 +346,7 @@ pub fn copy_dir_all(source: &Path, target: &Path) -> Result<(), Failed> {
}
}
else if entry.is_dir() {
copy_dir_all(
copy_existing_dir_all(
entry.path(),
&target.join(entry.file_name())
)?;
Expand Down

0 comments on commit a09771e

Please sign in to comment.