Skip to content

Commit

Permalink
refactor: removing duplicated subscriber logic (#186)
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez authored Oct 31, 2024
1 parent 91cc1b8 commit 4144457
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 71 deletions.
28 changes: 9 additions & 19 deletions integrationos-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ use anyhow::Result;
use dotenvy::dotenv;
use envconfig::Envconfig;
use integrationos_api::{domain::config::ConnectionsConfig, server::Server};
use integrationos_domain::telemetry::{
get_subscriber, get_subscriber_with_trace, init_subscriber, OtelGuard,
};
use integrationos_domain::telemetry::{get_subscriber, init_subscriber, OtelGuard};
use tracing::info;

fn main() -> Result<()> {
Expand All @@ -20,22 +18,14 @@ fn main() -> Result<()> {
.build()?;

runtime.block_on(async move {
match config.otlp_endpoint {
Some(ref otlp_url) => {
let subscriber = get_subscriber_with_trace(
"connections-api".into(),
"info".into(),
std::io::stdout,
otlp_url.into(),
);
init_subscriber(subscriber)
}
None => {
let subscriber =
get_subscriber("connections-api".into(), "info".into(), std::io::stdout);
init_subscriber(subscriber)
}
};
let subscriber = get_subscriber(
"connections-api".into(),
"info".into(),
std::io::stdout,
config.otlp_endpoint.clone(),
);

init_subscriber(subscriber);

info!("Starting API with config:\n{config}");

Expand Down
2 changes: 1 addition & 1 deletion integrationos-archiver/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn main() -> Result<Unit> {
StorageProvider::GoogleCloud => GoogleCloudStorage::new(&config).await?,
});

let subscriber = get_subscriber("archiver".into(), "info".into(), std::io::stdout);
let subscriber = get_subscriber("archiver".into(), "info".into(), std::io::stdout, None);
init_subscriber(subscriber);

tracing::info!("Starting archiver with config:\n{config}");
Expand Down
2 changes: 1 addition & 1 deletion integrationos-database/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fn main() -> Result<()> {
dotenv().ok();
let config = DatabaseConnectionConfig::init_from_env()?;

let subscriber = get_subscriber("storage".into(), "info".into(), std::io::stdout);
let subscriber = get_subscriber("storage".into(), "info".into(), std::io::stdout, None);
init_subscriber(subscriber);

info!("Starting Storage API with config:\n{config}");
Expand Down
94 changes: 46 additions & 48 deletions integrationos-domain/src/service/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,53 +22,11 @@ where
pub subscriber: T,
}

pub fn get_subscriber_with_trace<Sink>(
name: String,
env_filter: String,
sink: Sink,
otlp_url: String,
) -> Telemetry<impl SubscriberExt + Send + Sync + 'static>
where
Sink: for<'a> MakeWriter<'a> + Send + Sync + 'static,
{
let formatting_layer: BunyanFormattingLayer<Sink> =
BunyanFormattingLayer::new(name.clone(), sink);

let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(otlp_url.clone());

let provider = opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(
Config::default().with_resource(opentelemetry_sdk::Resource::new(vec![
opentelemetry::KeyValue::new("service.name", name),
])),
)
.with_batch_config(BatchConfig::default())
.with_exporter(exporter)
.install_batch(Tokio)
.expect("Failed to install tracing pipeline");

global::set_tracer_provider(provider.clone());
let tracer = provider.tracer("tracing-otel-subscriber");

let filter_layer =
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(env_filter));

Telemetry {
subscriber: Registry::default()
.with(filter_layer)
.with(JsonStorageLayer)
.with(OpenTelemetryLayer::new(tracer))
.with(formatting_layer),
}
}

pub fn get_subscriber<Sink>(
name: String,
env_filter: String,
sink: Sink,
otlp_url: Option<String>,
) -> Telemetry<impl SubscriberExt + Send + Sync + 'static>
where
Sink: for<'a> MakeWriter<'a> + Send + Sync + 'static,
Expand All @@ -79,11 +37,51 @@ where
let filter_layer =
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(env_filter));

Telemetry {
subscriber: Registry::default()
.with(filter_layer)
.with(JsonStorageLayer)
.with(formatting_layer),
match otlp_url {
Some(otlp_url) => {
let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(otlp_url.clone());

let provider = opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(Config::default().with_resource(
opentelemetry_sdk::Resource::new(vec![opentelemetry::KeyValue::new(
"service.name",
name,
)]),
))
.with_batch_config(BatchConfig::default())
.with_exporter(exporter)
.install_batch(Tokio)
.expect("Failed to install tracing pipeline");

global::set_tracer_provider(provider.clone());
let tracer = provider.tracer("tracing-otel-subscriber");

let telemetry: Telemetry<Box<dyn SubscriberExt + Send + Sync>> = Telemetry {
subscriber: Box::new(
Registry::default()
.with(OpenTelemetryLayer::new(tracer))
.with(filter_layer)
.with(JsonStorageLayer)
.with(formatting_layer),
),
};

telemetry
}
None => {
let telemetry: Telemetry<Box<dyn SubscriberExt + Send + Sync>> = Telemetry {
subscriber: Box::new(
Registry::default()
.with(filter_layer)
.with(JsonStorageLayer)
.with(formatting_layer),
),
};
telemetry
}
}
}

Expand Down
7 changes: 6 additions & 1 deletion integrationos-event/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ use tracing::{error, info, warn};
async fn main() -> Result<()> {
dotenv().ok();

let suscriber = get_subscriber("integrationos-event".into(), "info".into(), std::io::stdout);
let suscriber = get_subscriber(
"integrationos-event".into(),
"info".into(),
std::io::stdout,
None,
);
init_subscriber(suscriber);

let config = EventCoreConfig::init_from_env()?;
Expand Down
1 change: 1 addition & 0 deletions integrationos-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ async fn main() -> Result<()> {
"integrationos-gateway".into(),
"info".into(),
std::io::stdout,
None,
);
init_subscriber(suscriber);

Expand Down
2 changes: 1 addition & 1 deletion integrationos-watchdog/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tracing::info;
async fn main() -> Result<()> {
dotenv().ok();

let suscriber = get_subscriber("watchdog".into(), "info".into(), std::io::stdout);
let suscriber = get_subscriber("watchdog".into(), "info".into(), std::io::stdout, None);
init_subscriber(suscriber);

let watchdog_config = WatchdogConfig::init_from_env().context("Could not load config")?;
Expand Down

0 comments on commit 4144457

Please sign in to comment.