From 16b11d8f77ae69aca3e43c862b07dfff6c265ac1 Mon Sep 17 00:00:00 2001
From: Jack Dockerty
Date: Thu, 30 May 2024 22:13:09 +0100
Subject: [PATCH] refactor: use iterators with compaction

First pass at using iterators as the zero-cost abstraction in the
compaction process. This is largely for learning purposes and to see
whether the code becomes more readable with this style.
---
 src/store.rs | 83 +++++++++++++++++++++++++++++++---------------------
 1 file changed, 49 insertions(+), 34 deletions(-)

diff --git a/src/store.rs b/src/store.rs
index a351dde..d3637da 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -2,6 +2,7 @@ use crate::engine::KvsEngine;
 use crate::proto::GetResponse;
 use crate::{KvStoreError, Result};
 use crate::{LOG_PREFIX, MAX_LOG_FILE_SIZE};
+use anyhow::anyhow;
 use dashmap::DashMap;
 use glob::glob;
 use serde::{Deserialize, Serialize};
@@ -393,39 +394,50 @@ impl KvStore {
         let active_files = Arc::new(DashMap::new());
         let mut compact_pos = 0;
 
-        for entry in self.keydir.as_ref() {
-            let mut file = file_handles
-                .entry(entry.file_id.clone())
-                .or_insert_with(|| {
-                    BufReader::new(
-                        std::fs::File::open(&entry.file_id)
-                            .expect("Log file should exist for compaction"),
-                    )
-                });
+        let compact_results: Vec<Result<()>> = self
+            .keydir
+            .as_ref()
+            .iter()
+            .map(|entry| {
+                let mut file = file_handles
+                    .entry(entry.file_id.clone())
+                    .or_insert_with(|| {
+                        BufReader::new(
+                            std::fs::File::open(&entry.file_id)
+                                .expect("Log file should exist for compaction"),
+                        )
+                    });
+
+                file.seek(SeekFrom::Start(entry.offset as u64))?;
+                let log_entry: LogEntry = bincode::deserialize_from(&mut *file)?;
+
+                // Implicitly remove tombstone values by not adding them into the new file
+                // and removing them from the keydir.
+                if log_entry.operation == Operation::Remove {
+                    self.keydir.remove(entry.key());
+                    return Ok(());
+                }
 
-            file.seek(SeekFrom::Start(entry.offset as u64))?;
-            let log_entry: LogEntry = bincode::deserialize_from(&mut *file)?;
+                active_files
+                    .entry(entry.file_id.clone())
+                    .and_modify(|f: &mut u64| *f += 1)
+                    .or_default();
+                let data = bincode::serialize(&log_entry)?;
+                let written = compaction_buf.write(&data)?;
 
-            // Implicitly remove tombstone values by not adding them into the new file
-            // and removing them from the keydir.
-            if log_entry.operation == Operation::Remove {
-                self.keydir.remove(entry.key());
-                continue;
-            }
-
-            active_files
-                .entry(entry.file_id.clone())
-                .and_modify(|f: &mut u64| *f += 1)
-                .or_default();
-            let data = bincode::serialize(&log_entry)?;
-            let written = compaction_buf.write(&data)?;
+                entries.push((log_entry, compact_pos));
+                compact_pos += written as u64;
+                Ok(())
+            })
+            .collect();
 
-            entries.push((log_entry, compact_pos));
-            compact_pos += written as u64;
+        if compact_results.iter().any(Result::is_err) {
+            return Err(anyhow!("Compaction error!").into());
         }
+
         compaction_buf.flush()?;
 
-        for (log_entry, offset) in entries {
+        entries.into_iter().for_each(|(log_entry, offset)| {
             self.keydir.entry(log_entry.key).and_modify(|e| {
                 active_files.entry(e.file_id.clone()).and_modify(|f| {
                     if *f > 0 {
@@ -436,15 +448,18 @@ impl KvStore {
                 e.file_id = PathBuf::from(&compacted_filename);
                 e.offset = offset as usize;
             });
-        }
+        });
 
         self.set_active_log_handle()?;
-        for file in active_files.as_ref() {
-            if *file.value() == 0 {
-                debug!(f = ?file.key(), "Removing file which has no entries");
-                std::fs::remove_file(file.key())?;
-            }
-        }
+        active_files
+            .iter()
+            .filter(|file| *file.value() == 0)
+            .for_each(|file| {
+                debug!(file = ?file.key(), "Removing file which has no entries");
+                if let Err(e) = std::fs::remove_file(file.key()) {
+                    tracing::error!(file = ?file.key(), error=?e, "unable to remove file, will remain");
+                }
+            });
 
         debug!("Compaction complete, latest entries are available in {compacted_filename}");
         Ok(())
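
For context on the pattern the patch adopts: each keydir entry is mapped through a
fallible closure, the results are collected into a Vec<Result<()>>, and the vector is
then checked for failures. The short sketch below shows that pattern in isolation,
alongside the alternative of driving the iterator with try_for_each (or collecting
straight into a single Result), which stops at the first error and returns the
underlying error value rather than a blanket "Compaction error!". This is only a
minimal, standalone illustration; the Item struct and process function are
hypothetical stand-ins for the keydir entry and the per-entry compaction work, not
code from src/store.rs.

use std::io::{self, Write};

// Hypothetical stand-in for a keydir entry.
struct Item {
    key: String,
    value: Vec<u8>,
}

// Fallible per-item work, analogous to the closure passed to `.map()` in the patch.
fn process(item: &Item, out: &mut impl Write) -> io::Result<()> {
    out.write_all(item.key.as_bytes())?;
    out.write_all(&item.value)?;
    Ok(())
}

fn main() -> io::Result<()> {
    let items = vec![
        Item { key: "a".into(), value: b"1".to_vec() },
        Item { key: "b".into(), value: b"2".to_vec() },
    ];
    let mut buf: Vec<u8> = Vec::new();

    // Pattern used in the patch: collect every result, then test whether any failed.
    let results: Vec<io::Result<()>> = items.iter().map(|i| process(i, &mut buf)).collect();
    if results.iter().any(Result::is_err) {
        return Err(io::Error::new(io::ErrorKind::Other, "compaction pass failed"));
    }

    // Alternative: try_for_each (or collect::<io::Result<()>>()) short-circuits on the
    // first error and propagates it unchanged, avoiding the intermediate Vec.
    items.iter().try_for_each(|i| process(i, &mut buf))?;

    Ok(())
}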