diff --git a/components/object_store/src/disk_cache.rs b/components/object_store/src/disk_cache.rs index d1deb623ea..cbdf11683d 100644 --- a/components/object_store/src/disk_cache.rs +++ b/components/object_store/src/disk_cache.rs @@ -167,32 +167,59 @@ impl Manifest { } } -/// The encoder of the page file in the disk cache. +/// The writer of the page file in the disk cache. /// /// Following the payload, a footer [`PageFileEncoder::MAGIC_FOOTER`] is /// appended. -struct PageFileEncoder { - payload: Bytes, +struct PageFileWriter { + output: String, } -impl PageFileEncoder { +impl PageFileWriter { const MAGIC_FOOTER: [u8; 8] = [0, 0, 0, 0, b'c', b'e', b'r', b'e']; - async fn encode_and_persist(&self, writer: &mut W, name: &str) -> Result<()> - where - W: AsyncWrite + std::marker::Unpin, - { + fn new(output: String) -> Self { + Self { output } + } + + fn tmp_file(input: &str) -> String { + format!("{}.tmp", input) + } + + async fn write_inner(&self, tmp_file: &str, bytes: Bytes) -> Result<()> { + let mut writer = File::create(&tmp_file) + .await + .context(Io { file: tmp_file })?; writer - .write_all(&self.payload[..]) + .write_all(&bytes) .await - .context(Io { file: name })?; + .context(Io { file: tmp_file })?; writer .write_all(&Self::MAGIC_FOOTER) .await - .context(Io { file: name })?; + .context(Io { file: tmp_file })?; + + writer.flush().await.context(Io { file: tmp_file })?; + + tokio::fs::rename(tmp_file, &self.output) + .await + .context(Io { file: &self.output })?; + + Ok(()) + } - writer.flush().await.context(Io { file: name })?; + // When write bytes to file, the cache lock is released, so when one thread is + // reading, another thread may update it, so we write to tmp file first, + // then rename to expected filename to avoid other threads see partial + // content. + async fn write_and_flush(self, bytes: Bytes) -> Result<()> { + let tmp_file = Self::tmp_file(&self.output); + let write_result = self.write_inner(&tmp_file, bytes).await; + if write_result.is_err() { + // we don't care this result. + _ = tokio::fs::remove_file(&tmp_file).await; + } Ok(()) } @@ -262,7 +289,7 @@ impl DiskCache { async fn insert_data(&self, filename: String, value: Bytes) { let page_meta = { - let file_size = PageFileEncoder::encoded_size(value.len()); + let file_size = PageFileWriter::encoded_size(value.len()); PageMeta { file_size } }; let evicted_file = self.insert_page_meta(filename.clone(), page_meta); @@ -357,34 +384,14 @@ impl DiskCache { } async fn persist_bytes(&self, filename: &str, payload: Bytes) -> Result<()> { - // When write payload to file, the cache lock is released, so when one thread is - // reading, another thread may update it, so we write to tmp file first, - // then rename to expected filename to avoid other threads see partial - // content. - let tmp_filename = format!("{filename}.tmp"); - let tmp_filepath = std::path::Path::new(&self.root_dir) - .join(&tmp_filename) - .into_os_string() - .into_string() - .unwrap(); - - let mut file = File::create(&tmp_filepath).await.context(Io { - file: &tmp_filepath, - })?; - - let encoding = PageFileEncoder { payload }; - encoding - .encode_and_persist(&mut file, &tmp_filename) - .await?; - let dest_filepath = std::path::Path::new(&self.root_dir) .join(filename) .into_os_string() .into_string() .unwrap(); - tokio::fs::rename(tmp_filepath, dest_filepath) - .await - .context(Io { file: filename })?; + + let writer = PageFileWriter::new(dest_filepath); + writer.write_and_flush(payload).await?; Ok(()) } @@ -399,7 +406,7 @@ impl DiskCache { range: &Range, expect_file_size: usize, ) -> std::io::Result { - if PageFileEncoder::encoded_size(range.len()) > expect_file_size { + if PageFileWriter::encoded_size(range.len()) > expect_file_size { return Ok(ReadBytesResult::OutOfRange); }