Skip to content

Commit

Permalink
Extracted per process operations into a function to better handle unknown errors
Browse files Browse the repository at this point in the history

Signed-off-by: Caleb Metz <[email protected]>
  • Loading branch information
cmetz100 committed Dec 19, 2024
1 parent 0148bf5 commit 394095d
Showing 1 changed file with 147 additions and 130 deletions.
277 changes: 147 additions & 130 deletions lading/src/observer/linux/procfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,158 +118,175 @@ impl Sampler {
}
}
}
// Update the process_info map to only hold processes seen by the current poll call.
self.process_info.retain(|pid, _| pids.contains(pid));

let pid = process.pid();
if let Err(e) = self.handle_process(process, &mut aggr).await {
warn!("Encountered uncaught error when handling `/proc/{pid}/`: {e}");
}
}
// END pid loop

// `/proc/{pid}/status`
let status = match process.status() {
Ok(status) => status,
Err(e) => {
warn!("Couldn't read status: {:?}", e);
// The pid may have exited since we scanned it or we may not
// have sufficient permission.
continue;
}
};
if status.tgid != pid {
// This is a thread, not a process and we do not wish to scan it.
continue;
gauge!("total_rss_bytes").set(aggr.rss as f64);
gauge!("total_pss_bytes").set(aggr.pss as f64);

Ok(())
}

async fn handle_process(

Check failure on line 137 in lading/src/observer/linux/procfs.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest)

this function has too many lines (121/100)
&mut self,
process: Process,
aggr: &mut memory::smaps_rollup::Aggregator,
) -> Result<(), Error> {
let pid = process.pid();

// `/proc/{pid}/status`
let status = match process.status() {
Ok(status) => status,
Err(e) => {
warn!("Couldn't read status: {:?}", e);
// The pid may have exited since we scanned it or we may not
// have sufficient permission.
return Ok(());
}
};
if status.tgid != pid {
// This is a thread, not a process and we do not wish to scan it.
return Ok(());
}

// If we haven't seen this process before, initialize its ProcessInfo.
match self.process_info.entry(pid) {
Entry::Occupied(_) => { /* Already initialized */ }
Entry::Vacant(entry) => {
let exe = proc_exe(pid).await?;
let comm = proc_comm(pid).await?;
let cmdline = proc_cmdline(pid).await?;
let pid_s = format!("{pid}");
let stat_sampler = stat::Sampler::new();
// If we haven't seen this process before, initialize its ProcessInfo.
match self.process_info.entry(pid) {
Entry::Occupied(_) => { /* Already initialized */ }
Entry::Vacant(entry) => {
let exe = proc_exe(pid).await?;
let comm = proc_comm(pid).await?;
let cmdline = proc_cmdline(pid).await?;
let pid_s = format!("{pid}");
let stat_sampler = stat::Sampler::new();

entry.insert(ProcessInfo {
cmdline,
exe,
comm,
pid_s,
stat_sampler,
});
}
entry.insert(ProcessInfo {
cmdline,
exe,
comm,
pid_s,
stat_sampler,
});
}
}

// SAFETY: We've just inserted this pid into the map.
let pinfo = self
.process_info
.get_mut(&pid)
.expect("catastrophic programming error");
// SAFETY: We've just inserted this pid into the map.
let pinfo = self
.process_info
.get_mut(&pid)
.expect("catastrophic programming error");

let labels: [(&'static str, String); 4] = [
("pid", pinfo.pid_s.clone()),
("exe", pinfo.exe.clone()),
("cmdline", pinfo.cmdline.clone()),
("comm", pinfo.comm.clone()),
];
let labels: [(&'static str, String); 4] = [
("pid", pinfo.pid_s.clone()),
("exe", pinfo.exe.clone()),
("cmdline", pinfo.cmdline.clone()),
("comm", pinfo.comm.clone()),
];

// `/proc/{pid}/status`
report_status_field!(status, labels, vmrss);
report_status_field!(status, labels, rssanon);
report_status_field!(status, labels, rssfile);
report_status_field!(status, labels, rssshmem);
report_status_field!(status, labels, vmdata);
report_status_field!(status, labels, vmstk);
report_status_field!(status, labels, vmexe);
report_status_field!(status, labels, vmlib);
// `/proc/{pid}/status`
report_status_field!(status, labels, vmrss);
report_status_field!(status, labels, rssanon);
report_status_field!(status, labels, rssfile);
report_status_field!(status, labels, rssshmem);
report_status_field!(status, labels, vmdata);
report_status_field!(status, labels, vmstk);
report_status_field!(status, labels, vmexe);
report_status_field!(status, labels, vmlib);

let uptime = uptime::poll().await?;
let uptime = uptime::poll().await?;

// `/proc/{pid}/stat`, most especially per-process CPU data.
if let Err(e) = pinfo.stat_sampler.poll(pid, uptime, &labels).await {
// We don't want to bail out entirely if we can't read stats
// which will happen if we don't have permissions or, more
// likely, the process has exited.
warn!("Couldn't process `/proc/{pid}/stat`: {e}");
continue;
}
// `/proc/{pid}/stat`, most especially per-process CPU data.
if let Err(e) = pinfo.stat_sampler.poll(pid, uptime, &labels).await {
// We don't want to bail out entirely if we can't read stats
// which will happen if we don't have permissions or, more
// likely, the process has exited.
warn!("Couldn't process `/proc/{pid}/stat`: {e}");
return Ok(());
}

// `/proc/{pid}/smaps`
match memory::smaps::Regions::from_pid(pid) {
Ok(memory_regions) => {
for (pathname, measures) in memory_regions.aggregate_by_pathname() {
let labels: [(&'static str, String); 5] = [
("pid", pinfo.pid_s.clone()),
("exe", pinfo.exe.clone()),
("cmdline", pinfo.cmdline.clone()),
("comm", pinfo.comm.clone()),
("pathname", pathname),
];
gauge!("smaps.rss.by_pathname", &labels).set(measures.rss as f64);
gauge!("smaps.pss.by_pathname", &labels).set(measures.pss as f64);
gauge!("smaps.swap.by_pathname", &labels).set(measures.swap as f64);
gauge!("smaps.size.by_pathname", &labels).set(measures.size as f64);
// `/proc/{pid}/smaps`
match memory::smaps::Regions::from_pid(pid) {
Ok(memory_regions) => {
for (pathname, measures) in memory_regions.aggregate_by_pathname() {
let labels: [(&'static str, String); 5] = [
("pid", pinfo.pid_s.clone()),
("exe", pinfo.exe.clone()),
("cmdline", pinfo.cmdline.clone()),
("comm", pinfo.comm.clone()),
("pathname", pathname),
];
gauge!("smaps.rss.by_pathname", &labels).set(measures.rss as f64);
gauge!("smaps.pss.by_pathname", &labels).set(measures.pss as f64);
gauge!("smaps.swap.by_pathname", &labels).set(measures.swap as f64);
gauge!("smaps.size.by_pathname", &labels).set(measures.size as f64);

if let Some(m) = measures.private_clean {
gauge!("smaps.private_clean.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.private_dirty {
gauge!("smaps.private_dirty.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.shared_clean {
gauge!("smaps.shared_clean.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.shared_dirty {
gauge!("smaps.shared_dirty.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.referenced {
gauge!("smaps.referenced.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.anonymous {
gauge!("smaps.anonymous.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.lazy_free {
gauge!("smaps.lazy_free.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.anon_huge_pages {
gauge!("smaps.anon_huge_pages.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.shmem_pmd_mapped {
gauge!("smaps.shmem_pmd_mapped.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.shared_hugetlb {
gauge!("smaps.shared_hugetlb.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.private_hugetlb {
gauge!("smaps.private_hugetlb.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.file_pmd_mapped {
gauge!("smaps.file_pmd_mapped.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.locked {
gauge!("smaps.locked.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.swap_pss {
gauge!("smaps.swap_pss.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.private_clean {
gauge!("smaps.private_clean.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.private_dirty {
gauge!("smaps.private_dirty.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.shared_clean {
gauge!("smaps.shared_clean.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.shared_dirty {
gauge!("smaps.shared_dirty.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.referenced {
gauge!("smaps.referenced.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.anonymous {
gauge!("smaps.anonymous.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.lazy_free {
gauge!("smaps.lazy_free.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.anon_huge_pages {
gauge!("smaps.anon_huge_pages.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.shmem_pmd_mapped {
gauge!("smaps.shmem_pmd_mapped.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.shared_hugetlb {
gauge!("smaps.shared_hugetlb.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.private_hugetlb {
gauge!("smaps.private_hugetlb.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.file_pmd_mapped {
gauge!("smaps.file_pmd_mapped.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.locked {
gauge!("smaps.locked.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.swap_pss {
gauge!("smaps.swap_pss.by_pathname", &labels).set(m as f64);
}
}
Err(err) => {
// We don't want to bail out entirely if we can't read stats
// which will happen if we don't have permissions or, more
// likely, the process has exited.
warn!("Couldn't process `/proc/{pid}/smaps`: {err}");
}
}

// `/proc/{pid}/smaps_rollup`
if let Err(err) = memory::smaps_rollup::poll(pid, &labels, &mut aggr).await {
// We don't want to bail out entirely if we can't read smap rollup
Err(err) => {
// We don't want to bail out entirely if we can't read stats
// which will happen if we don't have permissions or, more
// likely, the process has exited.
warn!("Couldn't process `/proc/{pid}/smaps_rollup`: {err}");
warn!("Couldn't process `/proc/{pid}/smaps`: {err}");
return Ok(());
}
}
// END pid loop

gauge!("total_rss_bytes").set(aggr.rss as f64);
gauge!("total_pss_bytes").set(aggr.pss as f64);
// `/proc/{pid}/smaps_rollup`
if let Err(err) = memory::smaps_rollup::poll(pid, &labels, aggr).await {
// We don't want to bail out entirely if we can't read smap rollup
// which will happen if we don't have permissions or, more
// likely, the process has exited.
warn!("Couldn't process `/proc/{pid}/smaps_rollup`: {err}");
return Ok(());
}

Ok(())
}
Expand Down

0 comments on commit 394095d

Please sign in to comment.