Skip to content

Commit

Permalink
feat: implementing otlp exporter (#180)
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez authored Oct 22, 2024
1 parent ab6a1fa commit 02ee568
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 21 deletions.
111 changes: 107 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions integrationos-api/src/domain/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ pub struct ConnectionsConfig {
pub database_connection_docker_image: String,
#[envconfig(from = "K8S_MODE", default = "logger")]
pub k8s_mode: K8sMode,
#[envconfig(from = "OTLP_ENDPOINT")]
pub otlp_endpoint: Option<String>,
}

impl Display for ConnectionsConfig {
Expand Down Expand Up @@ -142,6 +144,7 @@ impl Display for ConnectionsConfig {
"METRIC_SAVE_CHANNEL_SIZE: {}",
self.metric_save_channel_size
)?;
writeln!(f, "OTLP_ENDPOINT: ***")?;
writeln!(f, "METRIC_SYSTEM_ID: {}", self.metric_system_id)?;
writeln!(f, "SEGMENT_WRITE_KEY: ***")?;
writeln!(f, "EMIT_URL: {}", self.emit_url)?;
Expand Down
1 change: 1 addition & 0 deletions integrationos-api/src/logic/schema_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ async fn generate_all_types(
Ok(output_types)
}

#[tracing::instrument(skip(state))]
pub async fn get_common_models_projections(
state: State<Arc<AppState>>,
) -> Result<Json<ReadResponse<Document>>, IntegrationOSError> {
Expand Down
46 changes: 34 additions & 12 deletions integrationos-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,47 @@ use anyhow::Result;
use dotenvy::dotenv;
use envconfig::Envconfig;
use integrationos_api::{domain::config::ConnectionsConfig, server::Server};
use integrationos_domain::telemetry::{get_subscriber, init_subscriber};
use integrationos_domain::telemetry::{
get_subscriber, get_subscriber_with_trace, init_subscriber, OtelGuard,
};
use tracing::info;

fn main() -> Result<()> {
dotenv().ok();
let config = ConnectionsConfig::init_from_env()?;
let _guard = OtelGuard {
otlp_url: config.otlp_endpoint.clone(),
};

let subscriber = get_subscriber("connections-api".into(), "info".into(), std::io::stdout);
init_subscriber(subscriber);

info!("Starting API with config:\n{config}");

tokio::runtime::Builder::new_multi_thread()
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(config.worker_threads.unwrap_or(num_cpus::get()))
.enable_all()
.build()?
.block_on(async move {
let server: Server = Server::init(config).await?;
.build()?;

runtime.block_on(async move {
match config.otlp_endpoint {
Some(ref otlp_url) => {
let subscriber = get_subscriber_with_trace(
"connections-api".into(),
"info".into(),
std::io::stdout,
otlp_url.into(),
);
init_subscriber(subscriber)
}
None => {
let subscriber =
get_subscriber("connections-api".into(), "info".into(), std::io::stdout);
init_subscriber(subscriber)
}
};

info!("Starting API with config:\n{config}");

let server: Server = Server::init(config).await?;

server.run().await
})?;

server.run().await
})
Ok(())
}
6 changes: 5 additions & 1 deletion integrationos-domain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,16 @@ tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tracing-bunyan-formatter = "0.3.9"
tracing-log = "0.2.0"
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tracing.workspace = true
tracing = { workspace = true, features = ["attributes"] }
uuid = { workspace = true, features = ["v4"] }
crc32fast = "1.4.2"
secrecy = { version = "0.8.0", features = ["serde"] }
chacha20poly1305 = "0.10.1"
hex = { version = "0.4.3", features = ["serde"] }
opentelemetry = { version = "0.26", features = ["trace"] }
opentelemetry-otlp = "0.26"
tracing-opentelemetry = "0.27"
opentelemetry_sdk = { version = "0.26", features = ["rt-tokio", "trace"] }

[dev-dependencies]
once_cell = "1.19.0"
Expand Down
2 changes: 1 addition & 1 deletion integrationos-domain/src/domain/schema/common_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Hash for CommonModel {
self.id.hash(state);
}
}

#[derive(Debug)]
pub enum TypeGenerationStrategy<'a> {
/// Generates the type in a cumulative way, meaning that it will only generate the types for the
/// models and enums that have not been generated before keeping track of the already generated
Expand Down
68 changes: 65 additions & 3 deletions integrationos-domain/src/service/telemetry/mod.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,70 @@
use crate::TimedExt;
use axum::body::Body;
use axum::extract::Request;
use axum::middleware::Next;
use axum::response::IntoResponse;
use http::StatusCode;
use opentelemetry::{global, trace::TracerProvider};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::runtime::Tokio;
use opentelemetry_sdk::trace::{BatchConfig, Config};
use tracing::subscriber::set_global_default;
use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer};
use tracing_log::LogTracer;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::fmt::MakeWriter;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Registry};

use crate::TimedExt;

pub struct Telemetry<T>
where
T: SubscriberExt + Send + Sync + 'static,
{
pub subscriber: T,
}

pub fn get_subscriber_with_trace<Sink>(
name: String,
env_filter: String,
sink: Sink,
otlp_url: String,
) -> Telemetry<impl SubscriberExt + Send + Sync + 'static>
where
Sink: for<'a> MakeWriter<'a> + Send + Sync + 'static,
{
let formatting_layer: BunyanFormattingLayer<Sink> =
BunyanFormattingLayer::new(name.clone(), sink);

let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(otlp_url.clone());

let provider = opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(
Config::default().with_resource(opentelemetry_sdk::Resource::new(vec![
opentelemetry::KeyValue::new("service.name", name),
])),
)
.with_batch_config(BatchConfig::default())
.with_exporter(exporter)
.install_batch(Tokio)
.expect("Failed to install tracing pipeline");

global::set_tracer_provider(provider.clone());
let tracer = provider.tracer("tracing-otel-subscriber");

let filter_layer =
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(env_filter));

Telemetry {
subscriber: Registry::default()
.with(filter_layer)
.with(JsonStorageLayer)
.with(OpenTelemetryLayer::new(tracer))
.with(formatting_layer),
}
}

pub fn get_subscriber<Sink>(
name: String,
env_filter: String,
Expand All @@ -26,7 +73,8 @@ pub fn get_subscriber<Sink>(
where
Sink: for<'a> MakeWriter<'a> + Send + Sync + 'static,
{
let formatting_layer: BunyanFormattingLayer<Sink> = BunyanFormattingLayer::new(name, sink);
let formatting_layer: BunyanFormattingLayer<Sink> =
BunyanFormattingLayer::new(name.clone(), sink);

let filter_layer =
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(env_filter));
Expand Down Expand Up @@ -76,3 +124,17 @@ pub async fn log_request_middleware(

Ok(res)
}

#[derive(Debug)]
pub struct OtelGuard {
pub otlp_url: Option<String>,
}

impl Drop for OtelGuard {
fn drop(&mut self) {
if self.otlp_url.is_some() {
tracing::info!("Shutting down OpenTelemetry");
opentelemetry::global::shutdown_tracer_provider();
}
}
}

0 comments on commit 02ee568

Please sign in to comment.