Commit 91cc1b8
chore: adding debug log for events size (#185)
sagojez authored Oct 29, 2024
1 parent 4719277 commit 91cc1b8
Showing 15 changed files with 111 additions and 56 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions integrationos-archiver/Cargo.toml
@@ -7,6 +7,7 @@ edition = "2021"
anyhow.workspace = true
bson.workspace = true
chrono.workspace = true
dotenvy.workspace = true
envconfig.workspace = true
futures.workspace = true
google-cloud-storage = "0.22.1"
integrationos-archiver/src/domain/config.rs
@@ -16,7 +16,7 @@ pub enum Mode {
pub struct ArchiverConfig {
#[envconfig(nested = true)]
pub db_config: DatabaseConfig,
#[envconfig(from = "EVENT_COLLECTION_NAME", default = "clients")]
#[envconfig(from = "EVENT_COLLECTION_NAME", default = "external-events")]
pub event_collection_name: String,
#[envconfig(from = "GS_STORAGE_BUCKET", default = "integrationos-zsk")]
pub gs_storage_bucket: String,
@@ -32,7 +32,7 @@ pub struct ArchiverConfig {
pub processing_chunk_timeout_secs: u64,
#[envconfig(from = "MIN_DATE_DAYS", default = "30")]
pub min_date_days: i64,
#[envconfig(from = "CHUNK_SIZE_MINUTES", default = "20")]
#[envconfig(from = "CHUNK_SIZE_MINUTES", default = "5")]
pub chunk_size_minutes: i64,
#[envconfig(from = "CONCURRENT_CHUNKS", default = "10")]
pub concurrent_chunks: usize,
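Reviewer note on the two default changes above: a minimal, self-contained sketch (stand-in struct, not the crate's real config) of how envconfig resolves these attributes. The environment variable wins when set; otherwise the default string is parsed into the field type. With dotenvy now wired into main, values loaded from a .env file count as environment variables here.

use envconfig::Envconfig;

#[derive(Envconfig)]
struct DemoConfig {
    #[envconfig(from = "EVENT_COLLECTION_NAME", default = "external-events")]
    event_collection_name: String,
    #[envconfig(from = "CHUNK_SIZE_MINUTES", default = "5")]
    chunk_size_minutes: i64,
}

fn main() {
    // Override one value the way a deployment would; leave the other unset.
    std::env::set_var("CHUNK_SIZE_MINUTES", "20");

    let cfg = DemoConfig::init_from_env().unwrap();
    assert_eq!(cfg.event_collection_name, "external-events"); // default applies
    assert_eq!(cfg.chunk_size_minutes, 20); // the env var wins over the default
}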
1 change: 1 addition & 0 deletions integrationos-archiver/src/domain/mod.rs
@@ -0,0 +1 @@
pub mod config;
9 changes: 6 additions & 3 deletions integrationos-archiver/src/event/completed.rs
@@ -1,12 +1,14 @@
use super::EventMetadata;
use chrono::{DateTime, Utc};
use integrationos_domain::Id;
use integrationos_domain::{prefix::IdPrefix, Id};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Completed {
#[serde(rename = "_id")]
id: Id,
reference: Id,
path: String,
completed_at: DateTime<Utc>,
start_time: i64,
@@ -16,8 +18,9 @@ pub struct Completed {
impl Completed {
pub fn new(path: String, id: Id, start_time: DateTime<Utc>, end_time: DateTime<Utc>) -> Self {
Self {
id: Id::now(IdPrefix::Archive),
path,
id,
reference: id,
completed_at: Utc::now(),
start_time: start_time.timestamp_millis(),
end_time: end_time.timestamp_millis(),
@@ -27,6 +30,6 @@ impl Completed {

impl EventMetadata for Completed {
fn reference(&self) -> Id {
self.id
self.reference
}
}
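Reviewer note: the same id/reference split repeats in dumped.rs, failed.rs, finished.rs, and uploaded.rs below, so it is worth spelling out once. A self-contained sketch with a stand-in id type (the real code uses integrationos_domain::Id and IdPrefix::Archive): each event document now mints its own _id and keeps the originating Started event's id in a separate reference field, so reference() no longer doubles as the document id and two events about the same archive cannot collide on _id.

// Stand-in types; the real code uses integrationos_domain::Id and IdPrefix::Archive.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Id(u64);

impl Id {
    fn now() -> Self {
        Id(42) // stand-in for Id::now(IdPrefix::Archive), which mints a fresh id
    }
}

trait EventMetadata {
    fn reference(&self) -> Id;
}

struct Completed {
    id: Id,        // this event's own document _id
    reference: Id, // id of the Started event being completed
}

impl Completed {
    fn new(reference: Id) -> Self {
        Self { id: Id::now(), reference }
    }
}

impl EventMetadata for Completed {
    fn reference(&self) -> Id {
        self.reference // previously this returned self.id
    }
}

fn main() {
    let started = Id(7);
    let completed = Completed::new(started);
    assert_eq!(completed.reference(), started);
    assert!(completed.id != completed.reference); // distinct document id
}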
9 changes: 6 additions & 3 deletions integrationos-archiver/src/event/dumped.rs
@@ -1,12 +1,14 @@
use super::EventMetadata;
use chrono::{DateTime, Utc};
use integrationos_domain::Id;
use integrationos_domain::{prefix::IdPrefix, Id};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Dumped {
#[serde(rename = "_id")]
id: Id,
reference: Id,
dumped_at: DateTime<Utc>,
start_time: i64,
end_time: i64,
@@ -15,7 +17,8 @@ pub struct Dumped {
impl Dumped {
pub fn new(id: Id, start_time: DateTime<Utc>, end_time: DateTime<Utc>) -> Self {
Self {
id,
id: Id::now(IdPrefix::Archive),
reference: id,
dumped_at: Utc::now(),
start_time: start_time.timestamp_millis(),
end_time: end_time.timestamp_millis(),
@@ -25,6 +28,6 @@ impl Dumped {

impl EventMetadata for Dumped {
fn reference(&self) -> Id {
self.id
self.reference
}
}
9 changes: 6 additions & 3 deletions integrationos-archiver/src/event/failed.rs
@@ -1,12 +1,14 @@
use super::EventMetadata;
use chrono::{DateTime, Utc};
use integrationos_domain::Id;
use integrationos_domain::{prefix::IdPrefix, Id};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Failed {
#[serde(rename = "_id")]
id: Id,
reference: Id,
failed_at: DateTime<Utc>,
start_time: i64,
end_time: i64,
@@ -16,7 +18,8 @@ pub struct Failed {
impl Failed {
pub fn new(reason: String, id: Id, start_time: DateTime<Utc>, end_time: DateTime<Utc>) -> Self {
Self {
id,
id: Id::now(IdPrefix::Archive),
reference: id,
reason,
start_time: start_time.timestamp_millis(),
end_time: end_time.timestamp_millis(),
@@ -27,6 +30,6 @@ impl Failed {

impl EventMetadata for Failed {
fn reference(&self) -> Id {
self.id
self.reference
}
}
16 changes: 9 additions & 7 deletions integrationos-archiver/src/event/finished.rs
@@ -1,27 +1,29 @@
use super::EventMetadata;
use anyhow::Result;
use chrono::{DateTime, Utc};
use integrationos_domain::Id;
use integrationos_domain::{prefix::IdPrefix, Id};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Finished {
#[serde(rename = "_id")]
id: Id,
reference: Id,
finished_at: DateTime<Utc>,
}

impl Finished {
pub fn new(id: Id) -> Result<Self> {
Ok(Self {
id,
pub fn new(id: Id) -> Self {
Self {
id: Id::now(IdPrefix::Archive),
reference: id,
finished_at: Utc::now(),
})
}
}
}

impl EventMetadata for Finished {
fn reference(&self) -> Id {
self.id
self.reference
}
}
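Side note on the constructor above: new performs no fallible work anymore, so the Result return type was pure ceremony. A minimal sketch of the simplification (stand-in field type):

struct Finished {
    reference: u64,
}

impl Finished {
    // Before: pub fn new(id: Id) -> Result<Self> forced callers to write `?`
    // even though construction could never fail.
    fn new(reference: u64) -> Self {
        Self { reference } // infallible, so return Self directly
    }
}

fn main() {
    let f = Finished::new(7); // no `?` or unwrap needed at call sites
    assert_eq!(f.reference, 7);
}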
6 changes: 6 additions & 0 deletions integrationos-archiver/src/event/mod.rs
@@ -21,10 +21,16 @@ pub trait EventMetadata {
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type")]
pub enum Event {
/// Archive process started event. Emitted when the archive process is started.
Started(Started),
/// Archive process dumped event. Emitted when mongodump finishes dumping the database.
Dumped(Dumped),
/// Archive process failed event. Emitted when the archive process fails in some way.
Failed(Failed),
/// Archive process uploaded event. Emitted after the selected storage provider uploads any file (by default, the archive file and metadata file).
Uploaded(Uploaded),
/// Archive process completed event. Emitted when all dumped files are uploaded.
Completed(Completed),
/// Archive process finished event. Emitted when the archive process is finished.
Finished(Finished),
}
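For context on the enum itself: #[serde(tag = "type")] makes it internally tagged, so each archive event is stored as a document whose type field names the variant. A self-contained sketch with made-up payload fields (requires serde with the derive feature, plus serde_json):

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
enum Event {
    // Illustrative payloads only; the real variants wrap the structs above.
    Started { started_at: String },
    Finished { reference: String },
}

fn main() {
    let event = Event::Finished { reference: "arch_123".into() }; // made-up id
    let json = serde_json::to_string(&event).unwrap();
    // The variant name lands in the "type" field alongside the payload.
    assert_eq!(json, r#"{"type":"Finished","reference":"arch_123"}"#);
}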
4 changes: 4 additions & 0 deletions integrationos-archiver/src/event/started.rs
@@ -24,6 +24,10 @@ impl Started {
})
}

pub fn started_at(&self) -> DateTime<Utc> {
self.started_at
}

pub fn collection(&self) -> &Store {
&self.collection
}
9 changes: 6 additions & 3 deletions integrationos-archiver/src/event/uploaded.rs
@@ -1,12 +1,14 @@
use super::EventMetadata;
use chrono::{DateTime, Utc};
use integrationos_domain::Id;
use integrationos_domain::{prefix::IdPrefix, Id};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Uploaded {
#[serde(rename = "_id")]
id: Id,
reference: Id,
uploaded_at: DateTime<Utc>,
start_time: i64,
end_time: i64,
@@ -15,7 +17,8 @@ pub struct Uploaded {
impl Uploaded {
pub fn new(id: Id, start_time: DateTime<Utc>, end_time: DateTime<Utc>) -> Self {
Self {
id,
id: Id::now(IdPrefix::Archive),
reference: id,
uploaded_at: Utc::now(),
start_time: start_time.timestamp_millis(),
end_time: end_time.timestamp_millis(),
@@ -25,6 +28,6 @@ impl Uploaded {

impl EventMetadata for Uploaded {
fn reference(&self) -> Id {
self.id
self.reference
}
}
59 changes: 43 additions & 16 deletions integrationos-archiver/src/main.rs
@@ -1,13 +1,14 @@
mod config;
mod domain;
mod event;
mod storage;

use crate::domain::config::{ArchiverConfig, Mode};
use crate::event::finished::Finished;
use anyhow::{anyhow, Result};
use bson::{doc, Document};
use chrono::offset::LocalResult;
use chrono::{DateTime, Duration as CDuration, TimeZone, Utc};
use config::{ArchiverConfig, Mode};
use dotenvy::dotenv;
use envconfig::Envconfig;
use event::completed::Completed;
use event::dumped::Dumped;
@@ -17,7 +18,7 @@ use event::uploaded::Uploaded;
use event::{Event, EventMetadata};
use futures::future::ready;
use futures::stream::{self, Stream};
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use integrationos_domain::telemetry::{get_subscriber, init_subscriber};
use integrationos_domain::{MongoStore, Store, Unit};
use mongodb::Client;
@@ -30,6 +31,7 @@ use tempfile::TempDir;

#[tokio::main]
async fn main() -> Result<Unit> {
dotenv().ok();
let config = Arc::new(ArchiverConfig::init_from_env()?);
let storage = Arc::new(match config.storage_provider {
StorageProvider::GoogleCloud => GoogleCloudStorage::new(&config).await?,
@@ -53,17 +55,28 @@ async fn main() -> Result<Unit> {
.await?;

loop {
let _ = match config.mode {
let res = match config.mode {
Mode::Dump => dump(&config, &archives, &started, &storage, &target_store, false).await,
Mode::DumpDelete => {
dump(&config, &archives, &started, &storage, &target_store, true).await
}
Mode::NoOp => Ok(()),
}
.map_err(|e| {
.inspect_err(|e| {
tracing::error!("Error in archiver: {e}");
});

if let Err(e) = res {
archives
.create_one(&Event::Failed(Failed::new(
e.to_string(),
started.reference(),
started.started_at(),
Utc::now(),
)))
.await?;
}

tracing::info!("Sleeping for {} seconds", config.sleep_after_finish);
tokio::time::sleep(Duration::from_secs(config.sleep_after_finish)).await;
}
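The switch from map_err to inspect_err in the loop above is what allows logging the error while keeping res intact for the new Failed-event branch; the old map_err closure returned (), discarding the error. A minimal sketch of the distinction (plain std, with a stand-in for the actual persistence call):

fn main() {
    let res: Result<(), String> = Err("boom".into());

    // inspect_err only observes the error; the Result passes through intact,
    // so it can still be matched afterwards to record a Failed event.
    let res = res.inspect_err(|e| eprintln!("Error in archiver: {e}"));

    if let Err(e) = res {
        // Stand-in for archives.create_one(&Event::Failed(...)).await
        println!("would record Failed event: {e}");
    }
}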
@@ -100,7 +113,18 @@ async fn dump(
Some(document) => document
.get_i64("createdAt")
.map_err(|e| anyhow!("Failed to get createdAt from document: {e}"))?,
None => return Err(anyhow!("No events found in collection")),
None => {
tracing::info!(
"No events found in collection {}",
target_store.collection.name()
);
// Create a Finished event so the run is recorded even when there is nothing to dump
archives
.create_one(&Event::Finished(Finished::new(started.reference())))
.await?;

return Ok(());
}
};

let start = match Utc.timestamp_millis_opt(start) {
@@ -145,15 +169,6 @@ async fn dump(
tracing::info!("Archive saved successfully, saved {} events", count);
}
Err(e) => {
archives
.create_one(&Event::Failed(Failed::new(
e.to_string(),
started.reference(),
start_time,
end_time,
)))
.await?;

tracing::error!("Failed to save archive: {e}");

return Err(e);
@@ -195,7 +210,7 @@ async fn dump(
}

archives
.create_one(&Event::Finished(Finished::new(started.reference())?))
.create_one(&Event::Finished(Finished::new(started.reference())))
.await?;

Ok(())
@@ -235,6 +250,18 @@ async fn save(
return Ok(0);
}

// Run this only in debug mode; cfg!(debug_assertions) is false in release builds
if cfg!(debug_assertions) {
let events = target_store.collection.find(filter.clone(), None).await?;

let events = events.try_collect::<Vec<_>>().await?;

let mem_size = std::mem::size_of::<Vec<Document>>()
+ events.capacity() * std::mem::size_of::<Document>();

tracing::info!("Total size of all the events is {}", mem_size);
}

let command = Command::new("mongodump")
.arg("--uri")
.arg(&config.db_config.event_db_url)
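On the debug block above, which is the change this commit is named for: the logged figure is a shallow estimate, the Vec header plus capacity times size_of::<Document>(), and it does not count the heap data each BSON document owns, so it understates real memory use. A self-contained sketch of the same arithmetic:

use std::mem;

// Same arithmetic as the debug block above: Vec header + reserved slots.
// Note it counts capacity, not len, and ignores per-element heap allocations.
fn shallow_size<T>(v: &Vec<T>) -> usize {
    mem::size_of::<Vec<T>>() + v.capacity() * mem::size_of::<T>()
}

fn main() {
    let events: Vec<[u8; 48]> = vec![[0u8; 48]; 1_000]; // stand-in for Vec<Document>
    println!("Total size of all the events is {}", shallow_size(&events));
}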
2 changes: 1 addition & 1 deletion integrationos-archiver/src/storage/google_cloud.rs
@@ -1,5 +1,5 @@
use super::Storage;
use crate::config::ArchiverConfig;
use crate::domain::config::ArchiverConfig;
use crate::storage::Chunk;
use crate::Extension;
use anyhow::{Context, Result};
2 changes: 1 addition & 1 deletion integrationos-archiver/src/storage/mod.rs
@@ -1,6 +1,6 @@
pub mod google_cloud;

use crate::config::ArchiverConfig;
use crate::domain::config::ArchiverConfig;
use anyhow::Result;
use std::{future::Future, ops::Deref, path::Path};
use strum::{AsRefStr, EnumString};
Expand Down