Auto merge of #716 - Mark-Simulacrum:tmpfile-backing, r=Mark-Simulacrum
Back the largest archive with a temporary file

This hopefully resolves the OOMs we see in production each time we try to write a report, by keeping the archive on disk rather than in memory while we write it. In the mid to long term I'd like to drop the mmap and the temporary file and stream the writes to S3 instead, but this is what I had time to throw together today. I think it should work, though I'm a bit uncertain about the disk backing for temporary files.
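
For context, the core of the change is small: build the gzip-compressed tarball into an anonymous temporary file instead of an in-memory Vec<u8>, and only then hand the finished bytes to the uploader. A minimal sketch of that flow, using the same tar, flate2, and tempfile crates (the build_archive_on_disk helper and its entry format are illustrative, not the code in this commit):

use std::io::{Read, Seek};

use flate2::{write::GzEncoder, Compression};
use tar::{Builder as TarBuilder, Header as TarHeader};
use tempfile::tempfile;

fn build_archive_on_disk(entries: &[(&str, &[u8])]) -> std::io::Result<Vec<u8>> {
    // tempfile() returns an already-unlinked file, so the backing storage is
    // reclaimed automatically once the File is dropped.
    let backing = tempfile()?;
    let mut tar = TarBuilder::new(GzEncoder::new(backing, Compression::default()));

    for (path, bytes) in entries {
        let mut header = TarHeader::new_gnu();
        header.set_size(bytes.len() as u64);
        header.set_mode(0o644);
        header.set_cksum();
        // append_data fills in the path (and re-checksums the header) for us.
        tar.append_data(&mut header, path, *bytes)?;
    }

    // Finish the tar stream, then the gzip stream; we get the tempfile back.
    let mut file = tar.into_inner()?.finish()?;

    // Portable fallback: rewind and read the archive back into memory.
    // The commit instead mmaps the file on Unix, so the bytes are never
    // copied into an owned buffer before the upload.
    file.rewind()?;
    let mut out = Vec::new();
    file.read_to_end(&mut out)?;
    Ok(out)
}

Keeping the File alive next to the mapping, as the commit's TempfileBackedBuffer does, guarantees the descriptor outlives the mmap; and because the file is unlinked, nothing is ever left behind on disk.
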
bors committed Dec 14, 2023
2 parents 434a549 + ff33518 commit 76cd44d
Showing 9 changed files with 451 additions and 156 deletions.
440 changes: 335 additions & 105 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions Cargo.toml
@@ -56,11 +56,10 @@ prometheus = "0.13.3"
cargo_metadata = "0.18.1"
indexmap = { version = "2.0.2", features = ["serde"] }
tokio = "1.24"
aws-types = "0.56.1"
aws-credential-types = "0.56.1"
aws-smithy-async = "0.56.1"
aws-sdk-s3 = "0.34"
aws-sdk-s3 = "1.7"
aws-config = { version = "1", features = ["behavior-version-latest"] }
thiserror = "1.0.38"
nix = { version = "0.27.1", features = ["mman"] }

[dev-dependencies]
assert_cmd = "2.0.4"
85 changes: 79 additions & 6 deletions src/report/archives.rs
@@ -1,3 +1,7 @@
use std::fs::File;
use std::num::NonZeroUsize;
use std::ptr::NonNull;

use crate::config::Config;
use crate::crates::Crate;
use crate::experiments::Experiment;
@@ -7,6 +11,52 @@ use crate::results::{EncodedLog, EncodingType, ReadResults};
use flate2::{write::GzEncoder, Compression};
use indexmap::IndexMap;
use tar::{Builder as TarBuilder, Header as TarHeader};
use tempfile::tempfile;

#[cfg(unix)]
struct TempfileBackedBuffer {
_file: File,
mmap: NonNull<[u8]>,
}

#[cfg(unix)]
impl TempfileBackedBuffer {
fn new(file: File) -> Fallible<TempfileBackedBuffer> {
let len = file.metadata()?.len().try_into().unwrap();
unsafe {
let base = nix::sys::mman::mmap(
None,
NonZeroUsize::new(len).unwrap(),
nix::sys::mman::ProtFlags::PROT_READ,
nix::sys::mman::MapFlags::MAP_PRIVATE,
Some(&file),
0,
)?;
let Some(base) = NonNull::new(base as *mut u8) else {
panic!("Failed to map file");
};
Ok(TempfileBackedBuffer {
_file: file,
mmap: NonNull::slice_from_raw_parts(base, len),
})
}
}

fn buffer(&self) -> &[u8] {
unsafe { self.mmap.as_ref() }
}
}

#[cfg(unix)]
impl Drop for TempfileBackedBuffer {
fn drop(&mut self) {
unsafe {
if let Err(e) = nix::sys::mman::munmap(self.mmap.as_ptr() as *mut _, self.mmap.len()) {
eprintln!("Failed to unmap temporary file: {:?}", e);
}
}
}
}

#[derive(Serialize)]
pub struct Archive {
@@ -92,6 +142,7 @@ fn iterate<'a, DB: ReadResults + 'a>(
})
}

#[allow(unused_mut)]
fn write_all_archive<DB: ReadResults, W: ReportWriter>(
db: &DB,
ex: &Experiment,
@@ -100,18 +151,37 @@ fn write_all_archive<DB: ReadResults, W: ReportWriter>(
config: &Config,
) -> Fallible<Archive> {
for i in 1..=RETRIES {
let mut all = TarBuilder::new(GzEncoder::new(Vec::new(), Compression::default()));
// We write this large-ish tarball into a tempfile, which moves the I/O to disk operations
// rather than keeping it in memory. This avoids complicating the code by doing incremental
// writes to S3 (requiring buffer management etc) while avoiding keeping the blob entirely
// in memory.
let backing = tempfile()?;
let mut all = TarBuilder::new(GzEncoder::new(backing, Compression::default()));
for entry in iterate(db, ex, crates, config) {
let entry = entry?;
let mut header = entry.header();
all.append_data(&mut header, &entry.path, &entry.log_bytes[..])?;
}

let data = all.into_inner()?.finish()?;
let len = data.len();
let mut data = all.into_inner()?.finish()?;
let mut buffer;
let view;
#[cfg(unix)]
{
buffer = TempfileBackedBuffer::new(data)?;
view = buffer.buffer();
}
#[cfg(not(unix))]
{
use std::io::{Read, Seek};
data.rewind()?;
buffer = Vec::new();
data.read_to_end(&mut buffer)?;
view = &buffer[..];
}
match dest.write_bytes(
"logs-archives/all.tar.gz",
data,
view,
&"application/gzip".parse().unwrap(),
EncodingType::Plain,
) {
@@ -123,7 +193,10 @@ fn write_all_archive<DB: ReadResults, W: ReportWriter>(
std::thread::sleep(std::time::Duration::from_secs(2));
warn!(
"retry ({}/{}) writing logs-archives/all.tar.gz ({} bytes) (error: {:?})",
i, RETRIES, len, e,
i,
RETRIES,
view.len(),
e,
);
continue;
}
@@ -164,7 +237,7 @@ pub fn write_logs_archives<DB: ReadResults, W: ReportWriter>(
let data = archive.into_inner()?.finish()?;
dest.write_bytes(
format!("logs-archives/{comparison}.tar.gz"),
data,
&data,
&"application/gzip".parse().unwrap(),
EncodingType::Plain,
)?;
4 changes: 2 additions & 2 deletions src/report/html.rs
@@ -298,13 +298,13 @@ pub fn write_html_report<W: ReportWriter>(
info!("copying static assets");
dest.write_bytes(
"report.js",
js_in.content()?.into_owned(),
&js_in.content()?,
js_in.mime(),
EncodingType::Plain,
)?;
dest.write_bytes(
"report.css",
css_in.content()?.into_owned(),
&css_in.content()?,
css_in.mime(),
EncodingType::Plain,
)?;
10 changes: 5 additions & 5 deletions src/report/mod.rs
@@ -293,7 +293,7 @@ fn write_logs<DB: ReadResults, W: ReportWriter>(
s.spawn(move || {
while let Ok((log_path, data, encoding)) = rx.recv() {
if let Err(e) =
dest.write_bytes(log_path, data, &mime::TEXT_PLAIN_UTF_8, encoding)
dest.write_bytes(log_path, &data, &mime::TEXT_PLAIN_UTF_8, encoding)
{
errors.lock().unwrap().push(e);
}
@@ -548,7 +548,7 @@ pub trait ReportWriter: Send + Sync {
fn write_bytes<P: AsRef<Path>>(
&self,
path: P,
b: Vec<u8>,
b: &[u8],
mime: &Mime,
encoding_type: EncodingType,
) -> Fallible<()>;
@@ -574,7 +574,7 @@ impl ReportWriter for FileWriter {
fn write_bytes<P: AsRef<Path>>(
&self,
path: P,
b: Vec<u8>,
b: &[u8],
_: &Mime,
_: EncodingType,
) -> Fallible<()> {
@@ -619,14 +619,14 @@ impl ReportWriter for DummyWriter {
fn write_bytes<P: AsRef<Path>>(
&self,
path: P,
b: Vec<u8>,
b: &[u8],
mime: &Mime,
_: EncodingType,
) -> Fallible<()> {
self.results
.lock()
.unwrap()
.insert((path.as_ref().to_path_buf(), mime.clone()), b);
.insert((path.as_ref().to_path_buf(), mime.clone()), b.to_vec());
Ok(())
}

16 changes: 9 additions & 7 deletions src/report/s3.rs
@@ -77,12 +77,12 @@ impl ReportWriter for S3Writer {
fn write_bytes<P: AsRef<Path>>(
&self,
path: P,
s: Vec<u8>,
body: &[u8],
mime: &Mime,
encoding_type: EncodingType,
) -> Fallible<()> {
// At least 50 MB, then use a multipart upload...
if s.len() >= 50 * 1024 * 1024 {
if body.len() >= 50 * 1024 * 1024 {
let mut request = self
.client
.create_multipart_upload()
@@ -108,12 +108,12 @@
};

let chunk_size = 20 * 1024 * 1024;
let bytes = bytes::Bytes::from(s);
let mut part = 1;
let mut start = 0;
let mut parts = aws_sdk_s3::types::CompletedMultipartUpload::builder();
while start < bytes.len() {
let chunk = bytes.slice(start..std::cmp::min(start + chunk_size, bytes.len()));
while start < body.len() {
let chunk = &body[start..std::cmp::min(start + chunk_size, body.len())];
let chunk = bytes::Bytes::copy_from_slice(chunk);

let request = self
.client
@@ -160,7 +160,7 @@ impl ReportWriter for S3Writer {
let mut request = self
.client
.put_object()
.body(aws_sdk_s3::primitives::ByteStream::from(s))
.body(aws_sdk_s3::primitives::ByteStream::from(
bytes::Bytes::copy_from_slice(body),
))
.acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
.key(format!(
"{}/{}",
@@ -185,7 +187,7 @@
}

fn write_string<P: AsRef<Path>>(&self, path: P, s: Cow<str>, mime: &Mime) -> Fallible<()> {
self.write_bytes(path, s.into_owned().into_bytes(), mime, EncodingType::Plain)
self.write_bytes(path, s.as_bytes(), mime, EncodingType::Plain)
}
}

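The multipart branch above is spread across several hunks; pieced together, the flow against the aws-sdk-s3 1.x async API looks roughly like the sketch below (bucket, key, and the error handling are illustrative, not crater's exact code):

async fn multipart_upload(
    client: &aws_sdk_s3::Client,
    bucket: &str,
    key: &str,
    body: &[u8],
) -> Result<(), aws_sdk_s3::Error> {
    // Start the multipart upload and remember its id.
    let created = client
        .create_multipart_upload()
        .bucket(bucket)
        .key(key)
        .send()
        .await?;
    let upload_id = created.upload_id().expect("S3 returned no upload id").to_string();

    // Push 20 MB parts, collecting each part number and ETag.
    let chunk_size = 20 * 1024 * 1024;
    let mut completed = aws_sdk_s3::types::CompletedMultipartUpload::builder();
    for (idx, chunk) in body.chunks(chunk_size).enumerate() {
        let part_number = (idx + 1) as i32; // part numbers are 1-based
        let part = client
            .upload_part()
            .bucket(bucket)
            .key(key)
            .upload_id(upload_id.clone())
            .part_number(part_number)
            .body(aws_sdk_s3::primitives::ByteStream::from(
                bytes::Bytes::copy_from_slice(chunk),
            ))
            .send()
            .await?;
        completed = completed.parts(
            aws_sdk_s3::types::CompletedPart::builder()
                .part_number(part_number)
                .e_tag(part.e_tag().unwrap_or_default())
                .build(),
        );
    }

    // Tell S3 the upload is done so it assembles the parts into one object.
    client
        .complete_multipart_upload()
        .bucket(bucket)
        .key(key)
        .upload_id(upload_id)
        .multipart_upload(completed.build())
        .send()
        .await?;
    Ok(())
}

S3 requires every part except the last to be at least 5 MB, so the 20 MB chunk size used here (and in the diff) stays well clear of that limit.
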
4 changes: 2 additions & 2 deletions src/results/dummy.rs
@@ -27,7 +27,7 @@ impl DummyDB {
pub fn add_dummy_log(&mut self, ex: &Experiment, krate: Crate, tc: Toolchain, log: EncodedLog) {
self.experiments
.entry(ex.name.to_string())
.or_insert_with(DummyData::default)
.or_default()
.logs
.insert((krate, tc), log);
}
@@ -41,7 +41,7 @@ impl DummyDB {
) {
self.experiments
.entry(ex.name.to_string())
.or_insert_with(DummyData::default)
.or_default()
.results
.insert((krate, tc), res);
}
23 changes: 9 additions & 14 deletions src/server/reports.rs
@@ -1,5 +1,3 @@
use aws_sdk_s3::config::retry::RetryConfig;

use crate::experiments::{Experiment, Status};
use crate::prelude::*;
use crate::report::{self, Comparison, TestResults};
@@ -17,24 +15,21 @@ use super::tokens::BucketRegion;
const AUTOMATIC_THREAD_WAKEUP: u64 = 600;

fn generate_report(data: &Data, ex: &Experiment, results: &DatabaseDB) -> Fallible<TestResults> {
let mut config = aws_types::SdkConfig::builder();
let mut config = aws_config::from_env();
match &data.tokens.reports_bucket.region {
BucketRegion::S3 { region } => {
config.set_region(Some(aws_types::region::Region::new(region.to_owned())));
config = config.region(aws_sdk_s3::config::Region::new(region.to_owned()));
}
BucketRegion::Custom { url } => {
config.set_region(Some(aws_types::region::Region::from_static("us-east-1")));
config.set_endpoint_url(Some(url.clone()));
config = config.region(aws_sdk_s3::config::Region::from_static("us-east-1"));
config = config.endpoint_url(url.clone());
}
}
config.set_credentials_provider(Some(data.tokens.reports_bucket.to_aws_credentials()));
// https://github.com/awslabs/aws-sdk-rust/issues/586 -- without this, the
// SDK will just completely not retry requests.
config.set_sleep_impl(Some(aws_sdk_s3::config::SharedAsyncSleep::new(
aws_smithy_async::rt::sleep::TokioSleep::new(),
)));
config.set_retry_config(Some(RetryConfig::standard()));
let config = config.build();
config = config.credentials_provider(data.tokens.reports_bucket.to_aws_credentials());
let config = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?
.block_on(config.load());
let client = aws_sdk_s3::Client::new(&config);
let writer = report::S3Writer::create(
client,
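
The SDK setup in generate_report is likewise split across hunks; a condensed sketch of the new pattern, with a hypothetical make_s3_client helper and parameters standing in for the inline code:

fn make_s3_client(
    region: Option<String>,
    endpoint: Option<String>,
    credentials: aws_sdk_s3::config::Credentials,
) -> std::io::Result<aws_sdk_s3::Client> {
    let mut loader = aws_config::from_env();
    if let Some(region) = region {
        loader = loader.region(aws_sdk_s3::config::Region::new(region));
    }
    if let Some(url) = endpoint {
        // Custom endpoints (e.g. an S3-compatible store) still need a region set.
        loader = loader
            .region(aws_sdk_s3::config::Region::from_static("us-east-1"))
            .endpoint_url(url);
    }
    loader = loader.credentials_provider(credentials);

    // The loader is async; drive it with a one-off current-thread runtime
    // since the caller here is synchronous.
    let config = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()?
        .block_on(loader.load());
    Ok(aws_sdk_s3::Client::new(&config))
}

Because the config now comes from aws-config with behavior-version-latest, the SDK's default retry and sleep implementations are wired up automatically, which is what allows the hand-rolled RetryConfig and TokioSleep setup above to be deleted.
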
18 changes: 7 additions & 11 deletions src/server/tokens.rs
@@ -30,17 +30,13 @@ pub struct ReportsBucket {
}

impl ReportsBucket {
pub(crate) fn to_aws_credentials(
&self,
) -> aws_credential_types::provider::SharedCredentialsProvider {
aws_credential_types::provider::SharedCredentialsProvider::new(
aws_sdk_s3::config::Credentials::new(
self.access_key.clone(),
self.secret_key.clone(),
None,
None,
"crater-credentials",
),
pub(crate) fn to_aws_credentials(&self) -> aws_sdk_s3::config::Credentials {
aws_sdk_s3::config::Credentials::new(
self.access_key.clone(),
self.secret_key.clone(),
None,
None,
"crater-credentials",
)
}
}