refactor: use iterators in compaction #24

Draft · wants to merge 1 commit into base: main
src/store.rs (49 additions, 34 deletions)
@@ -2,6 +2,7 @@ use crate::engine::KvsEngine;
 use crate::proto::GetResponse;
 use crate::{KvStoreError, Result};
 use crate::{LOG_PREFIX, MAX_LOG_FILE_SIZE};
+use anyhow::anyhow;
 use dashmap::DashMap;
 use glob::glob;
 use serde::{Deserialize, Serialize};
@@ -393,39 +394,50 @@ impl KvStore {
         let active_files = Arc::new(DashMap::new());

         let mut compact_pos = 0;
-        for entry in self.keydir.as_ref() {
-            let mut file = file_handles
-                .entry(entry.file_id.clone())
-                .or_insert_with(|| {
-                    BufReader::new(
-                        std::fs::File::open(&entry.file_id)
-                            .expect("Log file should exist for compaction"),
-                    )
-                });
+        let compact_results: Vec<Result<()>> = self
+            .keydir
+            .as_ref()
+            .iter()
+            .map(|entry| {
+                let mut file = file_handles
+                    .entry(entry.file_id.clone())
+                    .or_insert_with(|| {
+                        BufReader::new(
+                            std::fs::File::open(&entry.file_id)
+                                .expect("Log file should exist for compaction"),
+                        )
+                    });

-            file.seek(SeekFrom::Start(entry.offset as u64))?;
-            let log_entry: LogEntry = bincode::deserialize_from(&mut *file)?;
+                file.seek(SeekFrom::Start(entry.offset as u64))?;
+                let log_entry: LogEntry = bincode::deserialize_from(&mut *file)?;

-            // Implicitly remove tombstone values by not adding them into the new file
-            // and removing them from the keydir.
-            if log_entry.operation == Operation::Remove {
-                self.keydir.remove(entry.key());
-                continue;
-            }
+                // Implicitly remove tombstone values by not adding them into the new file
+                // and removing them from the keydir.
+                if log_entry.operation == Operation::Remove {
+                    self.keydir.remove(entry.key());
+                    return Ok(());
+                }

-            active_files
-                .entry(entry.file_id.clone())
-                .and_modify(|f: &mut u64| *f += 1)
-                .or_default();
-            let data = bincode::serialize(&log_entry)?;
-            let written = compaction_buf.write(&data)?;
+                active_files
+                    .entry(entry.file_id.clone())
+                    .and_modify(|f: &mut u64| *f += 1)
+                    .or_default();
+                let data = bincode::serialize(&log_entry)?;
+                let written = compaction_buf.write(&data)?;
+                entries.push((log_entry, compact_pos));
+                compact_pos += written as u64;
+                Ok(())
+            })
+            .collect();

-            entries.push((log_entry, compact_pos));
-            compact_pos += written as u64;
-        }
+        if compact_results.iter().any(Result::is_err) {
+            return Err(anyhow!("Compaction error!").into());
+        }

         compaction_buf.flush()?;

-        for (log_entry, offset) in entries {
+        entries.into_iter().for_each(|(log_entry, offset)| {
             self.keydir.entry(log_entry.key).and_modify(|e| {
                 active_files.entry(e.file_id.clone()).and_modify(|f| {
                     if *f > 0 {
@@ -436,15 +448,18 @@ impl KvStore {
                 e.file_id = PathBuf::from(&compacted_filename);
                 e.offset = offset as usize;
             });
-        }
+        });

         self.set_active_log_handle()?;
-        for file in active_files.as_ref() {
-            if *file.value() == 0 {
-                debug!(f = ?file.key(), "Removing file which has no entries");
-                std::fs::remove_file(file.key())?;
-            }
-        }
+        active_files
+            .iter()
+            .filter(|file| *file.value() == 0)
+            .for_each(|file| {
+                debug!(file = ?file.key(), "Removing file which has no entries");
+                if let Err(e) = std::fs::remove_file(file.key()) {
+                    tracing::error!(file = ?file.key(), error=?e, "unable to remove file, will remain");
+                }
+            });

         debug!("Compaction complete, latest entries are available in {compacted_filename}");
         Ok(())
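A possible follow-up on the error handling, sketched here rather than part of this PR: because `Result` implements `FromIterator`, the closure's results can be collected straight into a single `Result<()>`. That short-circuits at the first `Err` and preserves the underlying error, instead of building a `Vec<Result<()>>`, scanning it with `any(Result::is_err)`, and replacing the real cause with a generic `anyhow!("Compaction error!")`. A minimal standalone example (the function and names are illustrative, not from this PR):

    use anyhow::{anyhow, Result};

    // Collecting an iterator of Result<(), E> into Result<(), E> stops at the
    // first Err and returns it, so no separate `any(Result::is_err)` pass is
    // needed and the original error is kept intact.
    fn process_all(items: &[i64]) -> Result<()> {
        items
            .iter()
            .map(|n| {
                if *n < 0 {
                    return Err(anyhow!("negative value: {n}"));
                }
                // ... per-item work would go here ...
                Ok(())
            })
            .collect()
    }

Applied to the diff above, `compact_results` would become a plain `Result<()>`. The trade-off is the short-circuit itself: `entries` and `compact_pos` stop being updated at the first failure, which looks acceptable here since any error aborts the compaction anyway.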