diff --git a/README.md b/README.md
index 53f786e..e19995e 100644
--- a/README.md
+++ b/README.md
@@ -55,21 +55,25 @@ In your application, just before you start your KafkaStreams instance:
 ```java
 import io.factorhouse.kpow.StreamsRegistry;
+import io.factorhouse.kpow.key.ClusterIdKeyStrategy;
+import io.factorhouse.kpow.key.KeyStrategy;
 
 // Your Kafka Streams topology
-Topology topology = createMyTopology();
+Topology topology = createMyTopology();
 
 // Your Kafka Streams config
 Properties props = createMyStreamProperties();
-
+
 // Your Kafka Streams instance
-KafkaStreams streams = new KafkaStreams(topology, props);
+KafkaStreams streams = new KafkaStreams(topology, props);
 
 // Create a Kpow StreamsRegistry
 StreamsRegistry registry = new StreamsRegistry(props);
 
+// Specify the key strategy when writing metrics to the internal Kafka topic
+KeyStrategy keyStrategy = new ClusterIdKeyStrategy(props);
+
 // Register your KafkaStreams and Topology instances with the StreamsRegistry
-registry.register(streams, topology);
+registry.register(streams, topology, keyStrategy);
 
 // Start your Kafka Streams application
 streams.start();
@@ -82,7 +86,39 @@ The StreamsRegistry is a *single-threaded process* that performs these actions
 
 * The StreamsRegistry **does not talk directly to Kpow**. Kpow reads streams data from the snapshot topic.
 
-# Configuration
+# Metric filters
+
+You can configure each streams registry with metric filters, which give you greater control over which metrics Kpow's streams agent will export.
+
+Metric filters can be chained and added programmatically:
+
+```java
+import io.factorhouse.kpow.StreamsRegistry;
+import io.factorhouse.kpow.MetricFilter;
+
+MetricFilter metricFilter = new MetricFilter().deny(); // don't send any streams metrics, just send through the streams topology
+
+// ..
+
+StreamsRegistry registry = new StreamsRegistry(props, metricFilter);
+```
+
+If you pass no metric filter to the `StreamsRegistry` constructor then the default metric filter will be used. The default metric filter **accepts** a curated set of key streams metrics covering latency, throughput, lag, stability and state store health (see `MetricFilter.defaultMetricFilter()`).
+
+### Metric filter usage
+
+Kpow's streams agent metric filters work very similarly to Micrometer's [meter filters](https://github.com/micrometer-metrics/micrometer-docs/blob/main/src/docs/concepts/meter-filters.adoc).
+
+Metric filters can either `ACCEPT` or `DENY` a metric. The filter itself is a Java predicate over the [org.apache.kafka.common.MetricName](https://kafka.apache.org/0110/javadoc/org/apache/kafka/common/MetricName.html#group()) class, which allows you to filter metrics by name, tags or group.
+
+Metric filters are applied sequentially in the order they are configured in the registry. This allows for stacking of deny and accept filters to create more complex rules:
+
+```java
+MetricFilter metricFilter = new MetricFilter().acceptNameStartsWith("rocksdb").deny();
+```
+
+The above example allows all rocksdb-related metrics through and denies all other streams metrics.
+
+# Kafka connection
 
 The `StreamsRegistry` `Properties` contains configuration to create the snapshot producer.
 
@@ -121,6 +157,48 @@ Producer configuration means any of the following fields:
 
 For more details visit the [Producer](https://kafka.apache.org/documentation/#producerconfigs) section of the Apache Kafka documentation.
 
+### Key strategy
+
+The keying strategy for data sent from Kpow's streams agent to its internal Kafka topic is configurable. The key strategy plays an important role in enabling Kpow to align stream metrics with the UI accurately. There are many key strategies available depending on your organisation's deployment, and you can also implement your own, as sketched below.
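+
+Each strategy produces a `Taxon` record key. If none of the built-in strategies fit, the `KeyStrategy` interface can be implemented directly. A minimal, hypothetical sketch (the class name and `team` value are illustrative only; the `Taxon` fields mirror those used by `ClientIdKeyStrategy`):
+
+```java
+import io.factorhouse.kpow.key.KeyStrategy;
+import io.factorhouse.kpow.key.Taxon;
+
+public class TeamKeyStrategy implements KeyStrategy {
+    private final String team;
+
+    public TeamKeyStrategy(String team) {
+        this.team = team;
+    }
+
+    @Override
+    public Taxon getTaxon(String clientId, String applicationId) {
+        // Key snapshots by a team-chosen identifier rather than the client or cluster ID
+        return new Taxon("streams", team, "streams-agent", clientId);
+    }
+}
+```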
+
+#### Cluster ID (recommended key strategy, requires Kpow 94.1+)
+
+The default key strategy uses the cluster ID, obtained via an AdminClient [describeCluster](https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/admin/DescribeClusterResult.html) call. This AdminClient is created once during registry initialization and then closed. If you prefer not to have the streams registry create an AdminClient (because your Kafka variant does not provide a cluster ID, or due to security considerations), you may select an alternative key strategy from the options below.
+
+```java
+// Specify the key strategy when writing metrics to the internal Kafka topic
+// props are java.util.Properties describing the Kafka connection
+KeyStrategy keyStrategy = new ClusterIdKeyStrategy(props);
+// Register your KafkaStreams and Topology instances with the StreamsRegistry
+registry.register(streams, topology, keyStrategy);
+```
+
+#### Client ID (default in 0.2.0 and below)
+
+This key strategy relies on the client ID and application ID from the active KafkaStreams instance, eliminating the need for an AdminClient. However, in a multi-cluster Kpow deployment where the same application ID is used across multiple environments (e.g., staging, dev, prod), Kpow cannot determine which cluster the Kafka Streams instance is associated with.
+
+```java
+import io.factorhouse.kpow.key.ClientIdKeyStrategy;
+
+KeyStrategy keyStrategy = new ClientIdKeyStrategy();
+registry.register(streams, topology, keyStrategy);
+```
+
+#### Environment name (manual, requires Kpow 94.1+)
+
+If you have set a UI-friendly cluster name using the `ENVIRONMENT_NAME` environment variable in Kpow, you can use this environment name as the keying strategy for the streams agent.
+
+```java
+import io.factorhouse.kpow.key.ManualKeyStrategy;
+
+// This sets a manual key of `Trade Book (Staging)`, the environment name of the cluster in Kpow's UI
+KeyStrategy keyStrategy = new ManualKeyStrategy("Trade Book (Staging)");
+registry.register(streams, topology, keyStrategy);
+```
+
 ### Minimum Required ACLs
 
 If you secure your Kafka Cluster with ACLs, the user provided in the Producer configuration must have permission to write to the internal Kpow topic.
 
@@ -147,7 +225,7 @@ Properties streamsProps = new Properties();
 
 KafkaStreams streams = new KafkaStreams(topology, streamsProps);
 StreamsRegistry registry = new StreamsRegistry(streamsProps);
-...
+//...
 ```
 
 ### Multi-Cluster Kpow
 
@@ -160,7 +238,7 @@ KafkaStreams streams = new KafkaStreams(topology, streamsProps);
 Properties primaryProps = createMyPrimaryClusterProducerProperties();
 StreamsRegistry registry = new StreamsRegistry(primaryProps);
 
-...
+//...
 ```
 
 See the [Kpow Multi-Cluster Feature Guide](https://docs.factorhouse.io/kpow-ee/config/multi-cluster/) for more information.
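+
+In a multi-cluster deployment the Client ID key strategy cannot tell Kpow which cluster a streams instance belongs to (see the Key strategy section above). One plausible wiring, assuming `streamsProps` points at the cluster the application runs on as in the example above, is to key snapshots by that cluster's ID:
+
+```java
+import io.factorhouse.kpow.key.ClusterIdKeyStrategy;
+import io.factorhouse.kpow.key.KeyStrategy;
+
+// Resolve the cluster ID from the application's own cluster connection,
+// then register so snapshots are keyed to that cluster in Kpow's UI
+KeyStrategy keyStrategy = new ClusterIdKeyStrategy(streamsProps);
+registry.register(streams, topology, keyStrategy);
+```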
diff --git a/project.clj b/project.clj
index 56be5b8..830876f 100644
--- a/project.clj
+++ b/project.clj
@@ -1,4 +1,4 @@
-(defproject io.factorhouse/kpow-streams-agent "0.2.13"
+(defproject io.factorhouse/kpow-streams-agent "1.0.0-rc6"
   :description "Kpow's Kafka Streams monitoring agent"
   :url "https://github.com/factorhouse/kpow-streams-agent"
   :license {:name "Apache-2.0 License"
@@ -19,7 +19,7 @@
     [:name "Derek Troy-West"]
     [:url "https://factorhouse.io"]
     [:roles
-     [:role "developer"]
+     [:role "developer"]
      [:role "maintainer"]]]])
   :dependencies [[org.clojure/clojure "1.12.0"]
                  [com.cognitect/transit-clj "1.0.333"]
diff --git a/src/clojure/io/factorhouse/kpow/agent.clj b/src/clojure/io/factorhouse/kpow/agent.clj
index 814ee0f..3f0417c 100644
--- a/src/clojure/io/factorhouse/kpow/agent.clj
+++ b/src/clojure/io/factorhouse/kpow/agent.clj
@@ -1,19 +1,28 @@
 (ns io.factorhouse.kpow.agent
-  (:require [clojure.string :as str]
-            [clojure.tools.logging :as log]
-            [clojure.core.protocols :as p])
-  (:import (java.util UUID)
-           (java.util.concurrent Executors TimeUnit ThreadFactory)
+  (:require [clojure.core.protocols :as p]
+            [clojure.string :as str]
+            [clojure.tools.logging :as log])
+  (:import (io.factorhouse.kpow MetricFilter MetricFilter$FilterCriteria)
+           (io.factorhouse.kpow.key KeyStrategy Taxon)
+           (java.util UUID)
+           (java.util.concurrent Executors ThreadFactory TimeUnit)
+           (java.util.concurrent.atomic AtomicInteger)
            (org.apache.kafka.clients.producer Producer ProducerRecord)
-           (org.apache.kafka.streams Topology KeyValue TopologyDescription TopologyDescription$Subtopology
-                                     TopologyDescription$GlobalStore TopologyDescription$Node TopologyDescription$Source
-                                     TopologyDescription$Processor TopologyDescription$Sink)
-           (java.util.concurrent.atomic AtomicInteger)))
+           (org.apache.kafka.common MetricName)
+           (org.apache.kafka.streams KeyValue Topology TopologyDescription TopologyDescription$GlobalStore
+                                     TopologyDescription$Node TopologyDescription$Processor TopologyDescription$Sink
+                                     TopologyDescription$Source TopologyDescription$Subtopology)))
 
 (def kpow-snapshot-topic
   {:topic "__oprtr_snapshot_state"})
 
 (extend-protocol p/Datafiable
+  Taxon
+  (datafy [v]
+    (if-let [object-id (.getObjectId v)]
+      [(keyword (.getDomain v)) (.getDomainId v) (keyword "kafka" (.getObject v)) object-id]
+      [(keyword (.getDomain v)) (.getDomainId v) (keyword "kafka" (.getObject v))]))
+
   KeyValue
   (datafy [kv]
     {:key (.key kv)
@@ -52,12 +61,13 @@
 (defn metrics
   [streams]
   (into [] (map (fn [[_ metric]]
-                  (let [metric-name (.metricName metric)]
+                  (let [metric-name ^MetricName (.metricName metric)]
                     {:value       (.metricValue metric)
                      :description (.description metric-name)
                      :group       (.group metric-name)
                      :name        (.name metric-name)
-                     :tags        (into {} (.tags metric-name))})))
+                     :tags        (into {} (.tags metric-name))
+                     :metric-name metric-name})))
        (.metrics streams)))
 
 (defn application-id
@@ -77,70 +87,97 @@
       (str/split #"-StreamThread-")
       (first))))
 
+(defn apply-metric-filters
+  [^MetricName metric-name filters]
+  (reduce
+   (fn [acc ^MetricFilter$FilterCriteria filter-criteria]
+     (let [metric-filter-type (.getFilterType filter-criteria)
+           predicate          (.getPredicate filter-criteria)]
+       (if (.test predicate metric-name)
+         (reduced
+          (case (.name metric-filter-type)
+            "ACCEPT" true
+            "DENY"   false))
+         acc)))
+   nil
+   filters))
+
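+;; Example of how the criteria chain evaluates (first match wins): with
+;; [(ACCEPT name starts with "rocksdb") (DENY always)], a metric named
+;; "rocksdb.block.cache" matches the first criteria and short-circuits to
+;; true via `reduced`, while "process-rate" falls through to the trailing
+;; DENY and returns false; a metric matching no criteria yields nil and is
+;; dropped by `numeric-metrics` below.
+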
 (defn numeric-metrics
-  [metrics]
-  (->> metrics
-       (filter (comp number? :value))
-       (remove (fn [{:keys [value]}]
-                 (if (double? value)
-                   (Double/isNaN value)
-                   false)))
-       (map #(select-keys % [:name :tags :value]))))
+  [metrics ^MetricFilter metrics-filter]
+  (let [filters (.getFilters metrics-filter)]
+    (into [] (comp (filter (comp number? :value))
+                   (remove (fn [{:keys [value]}]
+                             (if (double? value)
+                               (Double/isNaN value)
+                               false)))
+                   (filter (fn [{:keys [metric-name]}]
+                             (apply-metric-filters metric-name filters)))
+                   (map #(select-keys % [:name :tags :value])))
+          metrics)))
 
 (defn snapshot-send
-  [{:keys [snapshot-topic ^Producer producer snapshot-id application-id job-id client-id captured]} data]
-  (let [snapshot {:type           :kafka/streams-agent
+  [{:keys [snapshot-topic ^Producer producer taxon application-id job-id client-id captured]} data]
+  (let [taxon    (p/datafy taxon)
+        snapshot {:type           :kafka/streams-agent
                   :application-id application-id
                   :client-id      client-id
                   :captured       captured
                   :data           data
                   :job/id         job-id
-                  :snapshot/id    snapshot-id}
-        taxon    [(:domain snapshot-id) (:id snapshot-id) :kafka/streams-agent]
+                  :snapshot/id    {:domain :streams :id taxon}}
         record   (ProducerRecord. (:topic snapshot-topic) taxon snapshot)]
    (.get (.send producer record))))
 
 (defn metrics-send
-  [{:keys [snapshot-topic producer snapshot-id application-id job-id client-id captured]} metrics]
-  (let [taxon [(:domain snapshot-id) (:id snapshot-id) :kafka/streams-agent]]
+  [{:keys [snapshot-topic producer taxon application-id job-id client-id captured]} metrics]
+  (let [taxon (p/datafy taxon)]
    (doseq [data (partition-all 50 metrics)]
-      (let [value  {:type           :kafka/streams-agent-metrics
-                    :application-id application-id
-                    :client-id      client-id
-                    :captured       captured
-                    :data           (vec data)
-                    :job/id         job-id
-                    :snapshot/id    snapshot-id}
-            record (ProducerRecord. (:topic snapshot-topic) taxon value)]
+      (let [value  {:type           :kafka/streams-agent-metrics
+                    :application-id application-id
+                    :client-id      client-id
+                    :captured       captured
+                    :data           (vec data)
+                    :job/id         job-id
+                    :snapshot/id    {:domain :streams :id taxon}}
+            record (ProducerRecord. (:topic snapshot-topic) taxon value)]
        (.get (.send producer record))))
    (log/infof "Kpow: sent [%s] streams metrics for application.id %s" (count metrics) application-id)))
 
 (defn plan-send
-  [{:keys [snapshot-topic producer snapshot-id job-id captured]}]
-  (let [taxon  [(:domain snapshot-id) (:id snapshot-id) :kafka/streams-agent]
-        plan   {:type        :observation/plan
-                :captured    captured
-                :snapshot/id snapshot-id
-                :job/id      job-id
-                :data        {:type :observe/streams-agent}}
-        record (ProducerRecord. (:topic snapshot-topic) taxon plan)]
+  [{:keys [snapshot-topic producer job-id captured taxon metrics-summary agent-id application-id client-id]}]
+  (let [taxon  (p/datafy taxon)
+        plan   {:type           :observation/plan
+                :captured       captured
+                :snapshot/id    {:domain :streams :id taxon}
+                :job/id         job-id
+                :application-id application-id
+                :client-id      client-id
+                :data           {:type :observe/streams-agent
+                                 :agent {:metrics-summary metrics-summary
+                                         :id              agent-id
+                                         :captured        captured
+                                         :version         "1.0.0"}}}
+        record (ProducerRecord. (:topic snapshot-topic) taxon plan)]
    (.get (.send producer record))))
 
 (defn snapshot-telemetry
-  [{:keys [streams ^Topology topology] :as ctx}]
+  [{:keys [streams ^Topology topology ^MetricFilter metrics-filter ^KeyStrategy key-strategy] :as ctx}]
   (let [metrics (metrics streams)]
    (if (empty? metrics)
      (log/warn "KafkaStreams .metrics() method returned an empty collection, no telemetry was sent. Has something mutated the global metrics registry?")
-      (let [topology       (p/datafy (.describe topology))
-            state          (str (.state streams))
-            snapshot       {:topology topology :state state}
-            client-id      (client-id metrics)
-            application-id (application-id metrics)
-            ctx            (assoc ctx
-                                  :captured (System/currentTimeMillis)
-                                  :client-id client-id
-                                  :application-id application-id
-                                  :snapshot-id {:domain :streams :id client-id})]
+      (let [topology         (p/datafy (.describe topology))
+            state            (str (.state streams))
+            snapshot         {:topology topology :state state}
+            client-id        (client-id metrics)
+            application-id   (application-id metrics)
+            taxon            (.getTaxon key-strategy client-id application-id)
+            ctx              (assoc ctx
+                                    :captured (System/currentTimeMillis)
+                                    :client-id client-id
+                                    :application-id application-id
+                                    :taxon taxon)
+            filtered-metrics (numeric-metrics metrics metrics-filter)]
        (when (nil? application-id)
          (throw (Exception. "Cannot infer application id from metrics returned from KafkaStreams instance. Expected metric \"application-id\" in the metrics registry.")))
        (when (nil? client-id)
@@ -149,23 +186,29 @@
                              (client-id-tag metrics)
                              application-id))))
        (snapshot-send ctx snapshot)
-        (metrics-send ctx (numeric-metrics metrics))
-        ctx))))
+        (metrics-send ctx filtered-metrics)
+        (assoc ctx :metrics-summary {:total (count metrics)
+                                     :sent  (count filtered-metrics)
+                                     :id    (some-> metrics-filter .getFilterId)})))))
 
 (defn snapshot-task
-  ^Runnable [snapshot-topic producer registered-topologies latch]
+  ^Runnable [snapshot-topic producer registered-topologies metrics-filter latch]
   (fn []
    (let [job-id (str (UUID/randomUUID))
          ctx    {:job-id         job-id
                  :snapshot-topic snapshot-topic
-                  :producer       producer}]
-
-      (doseq [[id [streams topology]] @registered-topologies]
-        (try (when-let [next-ctx (snapshot-telemetry (assoc ctx :streams streams :topology topology))]
+                  :producer       producer
+                  :metrics-filter metrics-filter}]
+      (doseq [[id [streams topology key-strategy]] @registered-topologies]
+        (try (when-let [next-ctx (snapshot-telemetry (assoc ctx
+                                                            :streams streams
+                                                            :topology topology
+                                                            :key-strategy key-strategy
+                                                            :agent-id id))]
               (Thread/sleep 2000)
               (plan-send next-ctx))
            (catch Throwable e
-              (log/errorf e "Kpow: error sending streams snapshot for agent %s" id))))
+              (log/warnf e "Kpow: error sending streams snapshot for agent %s" id))))
 
      (deliver latch true))))
 
@@ -177,16 +220,17 @@
        (.setName (format "kpow-streams-agent-%d" (.getAndIncrement n))))))))
 
 (defn start-registry
-  [{:keys [snapshot-topic producer]}]
+  [{:keys [snapshot-topic producer metrics-filter]}]
  (log/info "Kpow: starting registry")
  (let [registered-topologies (atom {})
        pool                  (Executors/newSingleThreadScheduledExecutor thread-factory)
-        register-fn           (fn [streams topology]
+        register-fn           (fn [streams topology key-strategy]
                                (let [id (str (UUID/randomUUID))]
-                                  (swap! registered-topologies assoc id [streams topology])
+                                  (log/infof "Kpow: registering new streams application with id %s" id)
+                                  (swap! registered-topologies assoc id [streams topology key-strategy])
                                  id))
        latch                 (promise)
-        task                  (snapshot-task snapshot-topic producer registered-topologies latch)
+        task                  (snapshot-task snapshot-topic producer registered-topologies metrics-filter latch)
        scheduled-future      (.scheduleWithFixedDelay pool task 500 60000 TimeUnit/MILLISECONDS)]
    {:register register-fn
     :pool     pool
@@ -205,13 +249,15 @@
  {})
 
 (defn init-registry
-  [producer]
-  (start-registry {:snapshot-topic kpow-snapshot-topic :producer producer}))
+  [producer metrics-filter]
+  (start-registry {:snapshot-topic kpow-snapshot-topic
+                   :producer       producer
+                   :metrics-filter metrics-filter}))
 
 (defn register
-  [agent streams topology]
+  [agent streams topology key-strategy]
  (when-let [register-fn (:register agent)]
-    (let [id (register-fn streams topology)]
+    (let [id (register-fn streams topology key-strategy)]
      (log/infof "Kpow: registering new streams agent %s" id)
      id)))
diff --git a/src/java/io/factorhouse/kpow/MetricFilter.java b/src/java/io/factorhouse/kpow/MetricFilter.java
new file mode 100644
index 0000000..f90978b
--- /dev/null
+++ b/src/java/io/factorhouse/kpow/MetricFilter.java
@@ -0,0 +1,218 @@
+package io.factorhouse.kpow;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Predicate;
+
+public class MetricFilter {
+
+    private String filterId = null;
+
+    public String getFilterId() {
+        return filterId;
+    }
+
+    public enum FilterType {
+        ACCEPT, DENY
+    }
+
+    public static class FilterCriteria {
+        private final Predicate<MetricName> predicate;
+        private final FilterType filterType;
+
+        // Constructor to initialize both fields
+        private FilterCriteria(Predicate<MetricName> predicate, FilterType filterType) {
+            this.predicate = predicate;
+            this.filterType = filterType;
+        }
+
+        public Predicate<MetricName> getPredicate() {
+            return predicate;
+        }
+
+        public FilterType getFilterType() {
+            return filterType;
+        }
+    }
+
+    private final List<FilterCriteria> filters;
+
+    public MetricFilter() {
+        this.filters = new ArrayList<>();
+        this.filterId = "custom";
+    }
+
+    private MetricFilter(String id) {
+        this.filters = new ArrayList<>();
+        this.filterId = id;
+    }
+
+    /**
+     * Returns a metrics filter that accepts all numeric metrics from the running Streams application.
+     *
+     * @return accept all metric filter
+     */
+    public static MetricFilter acceptAllMetricFilter() {
+        return new MetricFilter("acceptAll").accept();
+    }
+
+    /**
+     * Returns a metrics filter that denies all metrics, only sending across the Kafka Streams topology + state
+     * on every observation.
+     *
+     * @return deny all metric filter
+     */
+    public static MetricFilter denyAllMetricFilter() {
+        return new MetricFilter("denyAll").deny();
+    }
+
+    /**
+     * Returns a metrics filter that includes only state store metrics.
+     *
+     * @return state store metrics only filter
+     */
+    public static MetricFilter stateStoreMetricsOnlyFilter() {
+        Predicate<MetricName> stateStoreMetricsOnly = m ->
+                m.tags().containsKey("store") ||
+                        m.tags().containsKey("in-memory-state-id") ||
+                        m.tags().containsKey("in-memory-window-state-id") ||
+                        m.tags().containsKey("in-memory-session-state-id") ||
+                        m.tags().containsKey("rocksdb-session-state-id") ||
+                        m.tags().containsKey("rocksdb-state-id") ||
+                        m.tags().containsKey("rocksdb-window-state-id");
+        return new MetricFilter("stateStoreMetricsOnly")
+                .accept(stateStoreMetricsOnly);
+    }
+
+    /**
+     * Returns the default metric filter used by the streams agent.
+     * By default, Kpow's streams agent will only send across a few key Kafka Streams metrics:
+     * Latency:
+     * - commit-latency-avg
+     * - process-latency-avg
+     * - poll-latency-avg
+     * Throughput:
+     * - process-rate
+     * - records-processed-rate
+     * Lag:
+     * - commit-rate
+     * - records-lag-max
+     * - records-lag
+     * Stability:
+     * - failed-stream-threads
+     * - rebalances
+     * State store health:
+     * - put-rate
+     * - get-rate
+     * - flush-rate
+     *
+     * @return the default metrics filter
+     */
+    public static MetricFilter defaultMetricFilter() {
+        return new MetricFilter("default")
+                // Latency
+                .acceptNameStartsWith("commit-latency-avg")
+                .acceptNameStartsWith("process-latency-avg")
+                .acceptNameStartsWith("poll-latency-avg")
+                // Throughput
+                .acceptNameStartsWith("process-rate")
+                .acceptNameStartsWith("records-processed-rate")
+                // Lag
+                .acceptNameStartsWith("commit-rate")
+                .acceptNameStartsWith("records-lag-max")
+                .acceptNameStartsWith("records-lag")
+                // Stability
+                .acceptNameStartsWith("failed-stream-threads")
+                .acceptNameStartsWith("rebalances")
+                // State store health
+                .acceptNameStartsWith("put-rate")
+                .acceptNameStartsWith("get-rate")
+                .acceptNameStartsWith("flush-rate");
+    }
+
+    public List<FilterCriteria> getFilters() {
+        return Collections.unmodifiableList(filters);
+    }
+
+    /**
+     * Accepts all metrics.
+     *
+     * @return an updated MetricFilter
+     */
+    public MetricFilter accept() {
+        Predicate<MetricName> acceptPredicate = (_filter) -> {
+            return true;
+        };
+        FilterCriteria criteria = new FilterCriteria(acceptPredicate, FilterType.ACCEPT);
+        this.filters.add(criteria);
+        return this;
+    }
+
+    /**
+     * Accepts a metric based on the specified Predicate.
+     *
+     * @return an updated MetricFilter
+     */
+    public MetricFilter accept(Predicate<MetricName> acceptFilter) {
+        FilterCriteria criteria = new FilterCriteria(acceptFilter, FilterType.ACCEPT);
+        this.filters.add(criteria);
+        return this;
+    }
+
+    /**
+     * Denies all metrics.
+     *
+     * @return an updated MetricFilter
+     */
+    public MetricFilter deny() {
+        Predicate<MetricName> denyFilter = (_filter) -> {
+            return true;
+        };
+        FilterCriteria criteria = new FilterCriteria(denyFilter, FilterType.DENY);
+        this.filters.add(criteria);
+        return this;
+    }
+
+    /**
+     * Denies a metric based on the specified Predicate.
+     *
+     * @return an updated MetricFilter
+     */
+    public MetricFilter deny(Predicate<MetricName> denyFilter) {
+        FilterCriteria criteria = new FilterCriteria(denyFilter, FilterType.DENY);
+        this.filters.add(criteria);
+        return this;
+    }
+
+    /**
+     * Accepts all metrics whose names start with the specified prefix.
+     *
+     * @return an updated MetricFilter
+     */
+    public MetricFilter acceptNameStartsWith(String prefix) {
+        Predicate<MetricName> acceptFilter = (metricName) -> {
+            return metricName.name().startsWith(prefix);
+        };
+        FilterCriteria criteria = new FilterCriteria(acceptFilter, FilterType.ACCEPT);
+        this.filters.add(criteria);
+        return this;
+    }
+
+    /**
+     * Denies all metrics whose names start with the specified prefix.
+     *
+     * @return an updated MetricFilter
+     */
+    public MetricFilter denyNameStartsWith(String prefix) {
+        Predicate<MetricName> denyFilter = (metricName) -> {
+            return metricName.name().startsWith(prefix);
+        };
+        FilterCriteria criteria = new FilterCriteria(denyFilter, FilterType.DENY);
+        this.filters.add(criteria);
+        return this;
+    }
+}
diff --git a/src/java/io/factorhouse/kpow/StreamsRegistry.java b/src/java/io/factorhouse/kpow/StreamsRegistry.java
index c1c6b34..2e211f1 100644
--- a/src/java/io/factorhouse/kpow/StreamsRegistry.java
+++ b/src/java/io/factorhouse/kpow/StreamsRegistry.java
@@ -2,6 +2,7 @@
 
 import clojure.java.api.Clojure;
 import clojure.lang.IFn;
+import io.factorhouse.kpow.key.KeyStrategy;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KafkaStreams;
@@ -73,7 +74,7 @@ public static Properties filterProperties(Properties props) {
         return nextProps;
     }
 
-    public StreamsRegistry(Properties props) {
+    public StreamsRegistry(Properties props, MetricFilter metricsFilter) {
         IFn require = Clojure.var("clojure.core", "require");
         require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
         IFn agentFn = Clojure.var("io.factorhouse.kpow.agent", "init-registry");
@@ -83,14 +84,23 @@ public StreamsRegistry(Properties props) {
         Serializer valSerializer = (Serializer) serdesFn.invoke();
         Properties producerProps = filterProperties(props);
         KafkaProducer producer = new KafkaProducer<>(producerProps, keySerializer, valSerializer);
-        agent = agentFn.invoke(producer);
+        agent = agentFn.invoke(producer, metricsFilter);
+    }
+
+    public StreamsRegistry(Properties props) {
+        this(props, MetricFilter.defaultMetricFilter());
     }
 
-    public StreamsAgent register(KafkaStreams streams, Topology topology) {
+    /**
+     * Registers a Kafka Streams application with Kpow's StreamsRegistry.
+     *
+     * @return a StreamsAgent instance for the registered application (used to unregister).
+     */
+    public StreamsAgent register(KafkaStreams streams, Topology topology, KeyStrategy keyStrategy) {
         IFn require = Clojure.var("clojure.core", "require");
         require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
         IFn registerFn = Clojure.var("io.factorhouse.kpow.agent", "register");
-        String id = (String) registerFn.invoke(agent, streams, topology);
+        String id = (String) registerFn.invoke(agent, streams, topology, keyStrategy);
         if (id != null) {
             return new StreamsAgent(id);
         } else {
@@ -98,6 +108,10 @@ public StreamsAgent register(KafkaStreams streams, Topology topology) {
         }
     }
 
+    /**
+     * Unregisters a Kafka Streams application with Kpow's StreamsRegistry. Use the StreamsAgent instance returned from
+     * the register method.
+     */
     public void unregister(StreamsAgent streamsAgent) {
         if (streamsAgent != null) {
             IFn require = Clojure.var("clojure.core", "require");
@@ -114,4 +128,4 @@ public void close() {
         IFn closeFn = Clojure.var("io.factorhouse.kpow.agent", "close-registry");
         closeFn.invoke(agent);
     }
-}
\ No newline at end of file
+}
diff --git a/src/java/io/factorhouse/kpow/key/ClientIdKeyStrategy.java b/src/java/io/factorhouse/kpow/key/ClientIdKeyStrategy.java
new file mode 100644
index 0000000..32d171d
--- /dev/null
+++ b/src/java/io/factorhouse/kpow/key/ClientIdKeyStrategy.java
@@ -0,0 +1,17 @@
+package io.factorhouse.kpow.key;
+
+/**
+ * This key strategy relies on the client ID and application ID from the active KafkaStreams instance, eliminating the need for an AdminClient.
+ * <p>
+ * However, in a multi-cluster Kpow deployment where the same application ID is used across multiple environments (e.g., staging, dev, prod),
+ * Kpow cannot determine which cluster the Kafka Streams instance is associated with.
+ */
+public class ClientIdKeyStrategy implements KeyStrategy {
+    public ClientIdKeyStrategy() {}
+
+    @Override
+    public Taxon getTaxon(String clientId, String applicationId) {
+        return new Taxon("streams", clientId, "streams-agent", null);
+    }
+}
diff --git a/src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java b/src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java
new file mode 100644
index 0000000..1b8a5ed
--- /dev/null
+++ b/src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java
@@ -0,0 +1,26 @@
+package io.factorhouse.kpow.key;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.DescribeClusterResult;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * The default key strategy uses the cluster ID, obtained via an {@link org.apache.kafka.clients.admin.Admin#describeCluster()} call.
+ * This AdminClient is created once during registry initialization and then closed.
+ */
+public class ClusterIdKeyStrategy implements KeyStrategy {
+    private final String clusterId;
+
+    public ClusterIdKeyStrategy(Properties props) throws InterruptedException, ExecutionException {
+        try (AdminClient adminClient = AdminClient.create(props)) {
+            DescribeClusterResult describeClusterResult = adminClient.describeCluster();
+            this.clusterId = describeClusterResult.clusterId().get();
+        }
+    }
+
+    @Override
+    public Taxon getTaxon(String clientId, String applicationId) {
+        return new Taxon("cluster", clusterId, "streams-agent", clientId);
+    }
+}
diff --git a/src/java/io/factorhouse/kpow/key/KeyStrategy.java b/src/java/io/factorhouse/kpow/key/KeyStrategy.java
new file mode 100644
index 0000000..ab7f582
--- /dev/null
+++ b/src/java/io/factorhouse/kpow/key/KeyStrategy.java
@@ -0,0 +1,5 @@
+package io.factorhouse.kpow.key;
+
+public interface KeyStrategy {
+    Taxon getTaxon(String clientId, String applicationId);
+}
diff --git a/src/java/io/factorhouse/kpow/key/Taxon.java b/src/java/io/factorhouse/kpow/key/Taxon.java
new file mode 100644
index 0000000..0e31a3d
--- /dev/null
+++ b/src/java/io/factorhouse/kpow/key/Taxon.java
@@ -0,0 +1,31 @@
+package io.factorhouse.kpow.key;
+
+public class Taxon {
+    private final String domain;
+    private final String domainId;
+    private final String object;
+    private final String objectId;
+
+    public Taxon(String domain, String domainId, String object, String objectId) {
+        this.domainId = domainId;
+        this.domain = domain;
+        this.object = object;
+        this.objectId = objectId;
+    }
+
+    public String getDomain() {
+        return domain;
+    }
+
+    public String getObject() {
+        return object;
+    }
+
+    public String getObjectId() {
+        return objectId;
+    }
+
+    public String getDomainId() {
+        return domainId;
+    }
+}
diff --git a/src/java/io/operatr/kpow/StreamsRegistry.java b/src/java/io/operatr/kpow/StreamsRegistry.java
deleted file mode 100644
index 9e372b6..0000000
--- a/src/java/io/operatr/kpow/StreamsRegistry.java
+++ /dev/null
@@ -1,117 +0,0 @@
-package io.operatr.kpow;
-
-import clojure.java.api.Clojure;
-import clojure.lang.IFn;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.Topology;
-
-import java.util.ArrayList;
-import java.util.Properties;
-
-public class StreamsRegistry implements AutoCloseable {
-
-    public static class StreamsAgent {
-        private final String _id;
-
-        StreamsAgent(String id) {
-            _id = id;
-        }
-
-        public String getId() {
-            return _id;
-        }
-    }
-
-    private final Object agent;
-
-    public static Properties filterProperties(Properties props) {
-        ArrayList<String> allowedKeys = new ArrayList<>();
-        allowedKeys.add("ssl.enabled.protocols");
-        allowedKeys.add("sasl.client.callback.handler.class");
-        allowedKeys.add("ssl.endpoint.identification.algorithm");
-        allowedKeys.add("ssl.provider");
-        allowedKeys.add("ssl.truststore.location");
-        allowedKeys.add("ssl.keystore.key");
-        allowedKeys.add("ssl.key.password");
-        allowedKeys.add("ssl.protocol");
-        allowedKeys.add("ssl.keystore.password");
-        allowedKeys.add("sasl.login.class");
-        allowedKeys.add("ssl.trustmanager.algorithm");
-        allowedKeys.add("ssl.keystore.location");
-        allowedKeys.add("sasl.login.callback.handler.class");
-        allowedKeys.add("ssl.truststore.certificates");
-        allowedKeys.add("ssl.cipher.suites");
-        allowedKeys.add("ssl.truststore.password");
-        allowedKeys.add("ssl.keymanager.algorithm");
-        allowedKeys.add("ssl.keystore.type");
-        allowedKeys.add("ssl.secure.random.implementation");
-        allowedKeys.add("ssl.truststore.type");
-        allowedKeys.add("sasl.jaas.config");
-        allowedKeys.add("ssl.keystore.certificate.chain");
-        allowedKeys.add("sasl.mechanism");
-        allowedKeys.add("sasl.oauthbearer.jwks.endpoint.url");
-        allowedKeys.add("sasl.oauthbearer.token.endpoint.url");
-        allowedKeys.add("sasl.kerberos.service.name");
-        allowedKeys.add("security.protocol");
-        allowedKeys.add("bootstrap.servers");
-
-        Properties nextProps = new Properties();
-        for (String key : allowedKeys) {
-            if (props.containsKey(key)) {
-                nextProps.setProperty(key, String.valueOf(props.get(key)));
-            }
-        }
-
-        String compressionType = props.getProperty("compression.type", "gzip");
-        nextProps.setProperty("compression.type", compressionType);
-
-        String idempotence = props.getProperty("enable.idempotence", "false");
-        nextProps.setProperty("enable.idempotence", idempotence);
-
-        return nextProps;
-    }
-
-    public StreamsRegistry(Properties props) {
-        IFn require = Clojure.var("clojure.core", "require");
-        require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
-        IFn agentFn = Clojure.var("io.factorhouse.kpow.agent", "init-registry");
-        require.invoke(Clojure.read("io.factorhouse.kpow.serdes"));
-        IFn serdesFn = Clojure.var("io.factorhouse.kpow.serdes", "transit-json-serializer");
-        Serializer keySerializer = (Serializer) serdesFn.invoke();
-        Serializer valSerializer = (Serializer) serdesFn.invoke();
-        Properties producerProps = filterProperties(props);
-        KafkaProducer producer = new KafkaProducer<>(producerProps, keySerializer, valSerializer);
-        agent = agentFn.invoke(producer);
-    }
-
-    public StreamsAgent register(KafkaStreams streams, Topology topology) {
-        IFn require = Clojure.var("clojure.core", "require");
-        require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
-        IFn registerFn = Clojure.var("io.factorhouse.kpow.agent", "register");
-        String id = (String) registerFn.invoke(agent, streams, topology);
-        if (id != null) {
-            return new StreamsAgent(id);
-        } else {
-            return null;
-        }
-    }
-
-    public void unregister(StreamsAgent streamsAgent) {
-        if (streamsAgent != null) {
-            IFn require = Clojure.var("clojure.core", "require");
-            require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
-            IFn unregisterFn = Clojure.var("io.factorhouse.kpow.agent", "unregister");
-            unregisterFn.invoke(agent, streamsAgent.getId());
-        }
-    }
-
-    @Override
-    public void close() {
-        IFn require = Clojure.var("clojure.core", "require");
-        require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
-        IFn closeFn = Clojure.var("io.factorhouse.kpow.agent", "close-registry");
-        closeFn.invoke(agent);
-    }
-}
\ No newline at end of file
diff --git a/test/io/factorhouse/agent_test.clj b/test/io/factorhouse/agent_test.clj
index 9f3188a..4bf65f3 100644
--- a/test/io/factorhouse/agent_test.clj
+++ b/test/io/factorhouse/agent_test.clj
@@ -2,7 +2,8 @@
  (:require [clojure.core.protocols :as p]
            [clojure.test :refer :all]
            [io.factorhouse.kpow.agent :as agent])
-  (:import (io.factorhouse.kpow StreamsRegistry)
+  (:import (io.factorhouse.kpow MetricFilter StreamsRegistry)
+           (io.factorhouse.kpow.key ClientIdKeyStrategy)
           (java.util Properties)
           (org.apache.kafka.clients.producer Producer)
           (org.apache.kafka.common Metric MetricName)
@@ -63,8 +64,10 @@
 (deftest agent-metrics
  (is (= [{:value 1.0, :description "mock metric", :group "first", :name "first.metric", :tags {}}
          {:value 2.0, :description "mock metric", :group "first", :name "second.metric", :tags {}}]
-         (agent/metrics (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0)
-                                       (mock-metric "second.metric" "first" "mock metric" {} 2.0)])))))
+         (into []
+               (map #(dissoc % :metric-name))
+               (agent/metrics (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0)
+                                             (mock-metric "second.metric" "first" "mock metric" {} 2.0)]))))))
 
 (deftest datafy-topo
  (is (= {:sub-topologies #{{:id 0,
@@ -77,46 +80,125 @@
         (p/datafy (.describe (test-topology))))))
 
 (deftest agent-test
-  (let [records  (atom [])
-        registry (agent/init-registry (mock-producer records))
-        agent    (agent/register registry
-                                 (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0)
-                                                (mock-metric "application-id" "first" "mock metric" {"client-id" "abc123"} "xxx")
-                                                (mock-metric "second.metric" "first" "mock metric" {"client-id" "abc123"} 2.0)])
-                                 (test-topology))]
+  (let [records        (atom [])
+        metrics-filter (-> (MetricFilter.) (.accept))
+        registry       (agent/init-registry (mock-producer records) metrics-filter)
+        agent-id       (agent/register registry
+                                       (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0)
+                                                      (mock-metric "application-id" "first" "mock metric" {"client-id" "abc123"} "xxx")
+                                                      (mock-metric "second.metric" "first" "mock metric" {"client-id" "abc123"} 2.0)])
+                                       (test-topology)
+                                       (ClientIdKeyStrategy.))]
+
+    (is agent-id)
+
+    (is (deref (:latch registry) 5000 false))
+
+    (let [captured (into #{} (map (fn [record] (-> record (.value) :captured))) @records)]
+
+      (testing "consistent :captured value"
+        (is (= 1 (count captured))))
+
+      (is (= #{[[:streams "abc123" :kafka/streams-agent]
+                {:type           :kafka/streams-agent,
+                 :application-id "xxx",
+                 :client-id      "abc123",
+                 :data           {:topology {:sub-topologies #{{:id    0,
+                                                                :nodes #{{:name          "KSTREAM-SOURCE-0000000000",
+                                                                          :predecessors  #{},
+                                                                          :successors    #{},
+                                                                          :topic-set     #{"__oprtr_snapshot_state"},
+                                                                          :topic-pattern nil}}}},
+                                             :global-stores  #{}},
+                                  :state    "RUNNING"},
+                 :snapshot/id    {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}]
+               [[:streams "abc123" :kafka/streams-agent]
+                {:type           :observation/plan
+                 :application-id "xxx",
+                 :client-id      "abc123",
+                 :snapshot/id    {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}
+                 :data           {:type  :observe/streams-agent
+                                  :agent {:captured        (first captured)
+                                          :metrics-summary {:id    "custom"
+                                                            :sent  2
+                                                            :total 3}
+                                          :id              agent-id
+                                          :version         "1.0.0"}}}]
+               [[:streams "abc123" :kafka/streams-agent]
+                {:type           :kafka/streams-agent-metrics,
+                 :application-id "xxx",
+                 :client-id      "abc123",
+                 :data           [{:name "first.metric", :tags {}, :value 1.0}
+                                  {:name "second.metric", :tags {"client-id" "abc123"}, :value 2.0}],
+                 :snapshot/id    {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}]}
+             (into #{} (map (fn [record]
+                              [(.key record) (dissoc (.value record) :job/id :captured)]))
+                   @records))))
+
+    (is (agent/unregister registry agent-id))
+
+    (is (empty? (agent/close-registry registry)))))
+
+(deftest agent-test-metric-filters
+  (let [records        (atom [])
+        metrics-filter (-> (MetricFilter.)
+                           (.denyNameStartsWith "second")
+                           (.acceptNameStartsWith "first")
+                           (.acceptNameStartsWith "rocksdb")
+                           (.deny))
+        registry       (agent/init-registry (mock-producer records) metrics-filter)
+        agent-id       (agent/register registry
+                                       (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0)
+                                                      (mock-metric "rocksdb.foo" "first" "mock metric" {"client-id" "abc123"} 3.0)
+                                                      (mock-metric "application-id" "first" "mock metric" {"client-id" "abc123"} "xxx")
+                                                      (mock-metric "second.metric" "first" "mock metric" {"client-id" "abc123"} 2.0)])
+                                       (test-topology)
+                                       (ClientIdKeyStrategy.))]
 
-    (is agent)
+    (is agent-id)
 
    (is (deref (:latch registry) 5000 false))
 
-    (is (= #{[[:streams "abc123" :kafka/streams-agent]
-              {:type           :kafka/streams-agent,
-               :application-id "xxx",
-               :client-id      "abc123",
-               :data           {:topology {:sub-topologies #{{:id    0,
-                                                              :nodes #{{:name          "KSTREAM-SOURCE-0000000000",
-                                                                        :predecessors  #{},
-                                                                        :successors    #{},
-                                                                        :topic-set     #{"__oprtr_snapshot_state"},
-                                                                        :topic-pattern nil}}}},
-                                           :global-stores  #{}},
-                                :state    "RUNNING"},
-               :snapshot/id    {:domain :streams, :id "abc123"}}]
-             [[:streams "abc123" :kafka/streams-agent]
-              {:type :observation/plan, :snapshot/id {:domain :streams, :id "abc123"}, :data {:type :observe/streams-agent}}]
-             [[:streams "abc123" :kafka/streams-agent]
-              {:type           :kafka/streams-agent-metrics,
-               :application-id "xxx",
-               :client-id      "abc123",
-               :data           [{:name "first.metric", :tags {}, :value 1.0}
-                                {:name "second.metric", :tags {"client-id" "abc123"}, :value 2.0}],
-               :snapshot/id    {:domain :streams, :id "abc123"}}]}
-           (into #{} (map (fn [record]
-                            [(.key record) (dissoc (.value record) :job/id :captured)]))
-                 @records)))
-
-    (testing "consistent :captured value"
-      (is (= 1 (count (into #{} (map (fn [record] (-> record (.value) :captured))) @records)))))
+    (let [captured (into #{} (map (fn [record] (-> record (.value) :captured))) @records)]
+
+      (testing "consistent :captured value"
+        (is (= 1 (count captured))))
+
+      (is (= #{[[:streams "abc123" :kafka/streams-agent]
+                {:type           :kafka/streams-agent,
+                 :application-id "xxx",
+                 :client-id      "abc123",
+                 :data           {:topology {:sub-topologies #{{:id    0,
+                                                                :nodes #{{:name          "KSTREAM-SOURCE-0000000000",
+                                                                          :predecessors  #{},
+                                                                          :successors    #{},
+                                                                          :topic-set     #{"__oprtr_snapshot_state"},
+                                                                          :topic-pattern nil}}}},
+                                             :global-stores  #{}},
+                                  :state    "RUNNING"},
+                 :snapshot/id    {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}]
+               [[:streams "abc123" :kafka/streams-agent]
+                {:type           :observation/plan
+                 :application-id "xxx",
+                 :client-id      "abc123",
+                 :snapshot/id    {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}
+                 :data           {:type  :observe/streams-agent
+                                  :agent {:captured        (first captured)
+                                          :metrics-summary {:id    "custom"
+                                                            :sent  2
+                                                            :total 4}
+                                          :id              agent-id
+                                          :version         "1.0.0"}}}]
+               [[:streams "abc123" :kafka/streams-agent]
+                {:type           :kafka/streams-agent-metrics,
+                 :application-id "xxx",
+                 :client-id      "abc123",
+                 :data           [{:name "first.metric", :tags {}, :value 1.0}
+                                  {:name "rocksdb.foo", :tags {"client-id" "abc123"}, :value 3.0}],
+                 :snapshot/id    {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}]}
+             (into #{} (map (fn [record]
+                              [(.key record) (dissoc (.value record) :job/id :captured)]))
+                   @records))))
 
-    (is (agent/unregister registry agent))
+    (is (agent/unregister registry agent-id))