Skip to content

Commit

Permalink
feat: implement metrics for event emitter (#202)
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez authored Dec 9, 2024
1 parent b541c98 commit f0d56c7
Show file tree
Hide file tree
Showing 33 changed files with 721 additions and 289 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ jobs:
- name: Install fluvio CLI
run: curl -fsS https://hub.infinyon.cloud/install/install.sh | bash
- name: Create fluvio topic
run: ~/.fluvio/bin/fluvio profile add docker 127.0.0.1:9103 docker && ~/.fluvio/bin/fluvio topic create -p 2 events && ~/.fluvio/bin/fluvio topic create -p 2 dlq
run: ~/.fluvio/bin/fluvio profile add docker 127.0.0.1:9103 docker && ~/.fluvio/bin/fluvio topic create events && ~/.fluvio/bin/fluvio topic create dlq
- name: Install protoc
run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
- uses: dtolnay/rust-toolchain@stable
Expand Down
70 changes: 69 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions integrationos-api/src/logic/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ pub async fn create_connection(
group,
identity: Some(identity.to_owned()),
name: payload.name,
has_error: false,
error: None,
identity_type: payload.identity_type,
platform: connection_config.platform.into(),
environment: event_access.environment,
Expand Down
7 changes: 5 additions & 2 deletions integrationos-api/src/logic/event_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use axum::{
Json, Router,
};
use bson::doc;
use integrationos_domain::{ApplicationError, Connection, Id, IntegrationOSError};
use integrationos_domain::{
emitted_events::ConnectionLostReason, ApplicationError, Connection, Id, IntegrationOSError,
};
use std::sync::Arc;

pub fn get_router() -> Router<Arc<AppState>> {
Expand All @@ -19,10 +21,10 @@ pub fn get_router() -> Router<Arc<AppState>> {
)
}

// TODO: Write tests for this endpoint
async fn database_connection_lost_callback(
State(state): State<Arc<AppState>>,
Path(connection_id): Path<Id>,
Json(reason): Json<ConnectionLostReason>,
) -> Result<Json<Connection>, IntegrationOSError> {
// Instead of direcly updating we're getting the record first so that we can
// modify the active and deprecated fields from the record metadata
Expand All @@ -41,6 +43,7 @@ async fn database_connection_lost_callback(
)),
Some(mut conn) => {
if conn.record_metadata.active {
conn.mark_error(reason.reason.as_str());
conn.record_metadata.mark_deprecated("system");
conn.record_metadata.mark_inactive("system");
conn.record_metadata.mark_updated("system");
Expand Down
2 changes: 2 additions & 0 deletions integrationos-api/src/logic/oauth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ async fn oauth_handler(
identity: Some(identity),
identity_type: payload.identity_type,
settings: conn_definition.settings,
has_error: false,
error: None,
throughput: Throughput {
key,
limit: throughput,
Expand Down
5 changes: 2 additions & 3 deletions integrationos-api/tests/checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::{
};

use serde::{de::DeserializeOwned, Serialize};
use serde_json::Value;

pub enum CheckType {
Json,
Expand Down Expand Up @@ -87,10 +86,10 @@ impl JsonChecker for JsonCheckerImpl {
file.read_to_string(&mut contents)
.expect("Failed to read file contents");

let expected = serde_json::from_str::<Value>(&contents)
let expected = serde_json::from_str::<T>(&contents)
.expect("Failed to deserialize expect value");

let actual = serde_json::from_str::<Value>(&serialized)
let actual = serde_json::from_str::<T>(&serialized)
.expect("Failed to deserialize actual value");

expected == actual
Expand Down
22 changes: 11 additions & 11 deletions integrationos-api/tests/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ impl SecretExt for MockSecretsClient {
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ApiResponse<T: DeserializeOwned = Value> {
pub struct ApiResponse<Data: DeserializeOwned = Value> {
pub code: StatusCode,
pub data: T,
pub data: Data,
}

impl TestServer {
Expand Down Expand Up @@ -268,13 +268,13 @@ impl TestServer {
}
}

pub async fn send_request<T: Serialize, U: DeserializeOwned>(
pub async fn send_request<Payload: Serialize, Response: DeserializeOwned>(
&self,
path: &str,
method: http::Method,
key: Option<&str>,
payload: Option<&T>,
) -> Result<ApiResponse<U>> {
payload: Option<&Payload>,
) -> Result<ApiResponse<Response>> {
self.send_request_with_auth_headers(
path,
method,
Expand All @@ -288,14 +288,14 @@ impl TestServer {
.await
}

pub async fn send_request_with_headers<T: Serialize, U: DeserializeOwned>(
pub async fn send_request_with_headers<Payload: Serialize, Response: DeserializeOwned>(
&self,
path: &str,
method: http::Method,
key: Option<&str>,
payload: Option<&T>,
payload: Option<&Payload>,
headers: Option<BTreeMap<String, String>>,
) -> Result<ApiResponse<U>> {
) -> Result<ApiResponse<Response>> {
let mut req = self
.client
.request(method, format!("http://localhost:{}/{path}", self.port));
Expand All @@ -319,14 +319,14 @@ impl TestServer {
})
}

async fn send_request_with_auth_headers<T: Serialize, U: DeserializeOwned>(
async fn send_request_with_auth_headers<Payload: Serialize, Response: DeserializeOwned>(
&self,
path: &str,
method: http::Method,
key: Option<&str>,
payload: Option<&T>,
payload: Option<&Payload>,
headers: Option<BTreeMap<String, String>>,
) -> Result<ApiResponse<U>> {
) -> Result<ApiResponse<Response>> {
let headers = match headers {
Some(h) => h
.into_iter()
Expand Down
15 changes: 12 additions & 3 deletions integrationos-api/tests/http/callback.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::context::TestServer;
use http::{Method, StatusCode};
use integrationos_domain::{environment::Environment, prefix::IdPrefix, Connection, Id};
use integrationos_domain::{
emitted_events::ConnectionLostReason, environment::Environment, prefix::IdPrefix, Connection,
Id,
};
use serde_json::Value;

#[tokio::test]
Expand All @@ -13,9 +16,12 @@ async fn test_database_connection_lost_callback() {
let connection_id = connection.id.to_string();

let path = format!("v1/event-callbacks/database-connection-lost/{connection_id}");
let reason = ConnectionLostReason {
reason: "database-connection-lost".to_string(),
};

let request = server
.send_request::<Value, Connection>(&path, Method::POST, None, None)
.send_request::<ConnectionLostReason, Connection>(&path, Method::POST, None, Some(&reason))
.await
.expect("Failed to send request");

Expand All @@ -31,9 +37,12 @@ async fn test_database_connection_lost_callback_404() {

let connection_id = Id::now(IdPrefix::Connection).to_string();
let path = format!("v1/event-callbacks/database-connection-lost/{connection_id}");
let reason = ConnectionLostReason {
reason: "database-connection-lost".to_string(),
};

let request = server
.send_request::<Value, Value>(&path, Method::POST, None, None)
.send_request::<ConnectionLostReason, Value>(&path, Method::POST, None, Some(&reason))
.await
.expect("Failed to send request");

Expand Down
2 changes: 2 additions & 0 deletions integrationos-api/tests/standard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ fn test_json_connection() {
key: "throughput-key".to_string(),
limit: 100,
},
has_error: false,
error: None,
ownership: Ownership {
id: "owner-id".to_string().into(),
client_id: "client-id".to_string(),
Expand Down
1 change: 1 addition & 0 deletions integrationos-database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ tracing-subscriber.workspace = true
tracing.workspace = true

[dev-dependencies]
mockito.workspace = true
testcontainers-modules = { workspace = true, features = ["postgres"] }
21 changes: 6 additions & 15 deletions integrationos-database/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,14 @@ fn main() -> Result<()> {
.block_on(async move {
match config.database_connection_type {
DatabaseConnectionType::PostgreSql => {
match PostgresDatabaseConnection::init(&config).await {
Ok(server) => {
if let Err(e) = server.run().await {
PostgresDatabaseConnection::kill(&config).await?;
return Err(anyhow::anyhow!("Could not run server: {e}"));
}
let server = PostgresDatabaseConnection::init(&config).await?;

Ok(())
}
Err(e) => {
tracing::error!("Could not initialize storage: {e}");

PostgresDatabaseConnection::kill(&config).await?;

Err(anyhow::anyhow!("Could not initialize storage: {e}"))
}
if let Err(e) = server.run().await {
PostgresDatabaseConnection::kill(&config, e.to_string()).await?;
return Err(e);
}

Ok(())
}
}
})
Expand Down
Loading

0 comments on commit f0d56c7

Please sign in to comment.