From 169d88f2ee33990b8ecf8ca047d17b08784272dc Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Tue, 12 Nov 2024 11:33:03 +1100 Subject: [PATCH 01/29] Add io.factorhouse.kpow.MetricsFilter class --- src/clojure/io/factorhouse/kpow/agent.clj | 80 +++++++++++-------- .../io/factorhouse/kpow/MetricsFilter.java | 40 ++++++++++ .../io/factorhouse/kpow/StreamsRegistry.java | 9 ++- src/java/io/operatr/kpow/StreamsRegistry.java | 10 ++- test/io/factorhouse/agent_test.clj | 23 +++--- 5 files changed, 114 insertions(+), 48 deletions(-) create mode 100644 src/java/io/factorhouse/kpow/MetricsFilter.java diff --git a/src/clojure/io/factorhouse/kpow/agent.clj b/src/clojure/io/factorhouse/kpow/agent.clj index 814ee0f..dd7bd8d 100644 --- a/src/clojure/io/factorhouse/kpow/agent.clj +++ b/src/clojure/io/factorhouse/kpow/agent.clj @@ -4,10 +4,13 @@ [clojure.core.protocols :as p]) (:import (java.util UUID) (java.util.concurrent Executors TimeUnit ThreadFactory) + (java.util.function Predicate) (org.apache.kafka.clients.producer Producer ProducerRecord) + (org.apache.kafka.common MetricName) (org.apache.kafka.streams Topology KeyValue TopologyDescription TopologyDescription$Subtopology TopologyDescription$GlobalStore TopologyDescription$Node TopologyDescription$Source TopologyDescription$Processor TopologyDescription$Sink) + (io.factorhouse.kpow MetricsFilter) (java.util.concurrent.atomic AtomicInteger))) (def kpow-snapshot-topic @@ -52,12 +55,13 @@ (defn metrics [streams] (into [] (map (fn [[_ metric]] - (let [metric-name (.metricName metric)] + (let [metric-name ^MetricName (.metricName metric)] {:value (.metricValue metric) :description (.description metric-name) :group (.group metric-name) :name (.name metric-name) - :tags (into {} (.tags metric-name))}))) + :tags (into {} (.tags metric-name)) + :metric-name metric-name}))) (.metrics streams))) (defn application-id @@ -78,14 +82,22 @@ (first)))) (defn numeric-metrics - [metrics] - (->> metrics - (filter (comp number? :value)) - (remove (fn [{:keys [value]}] - (if (double? value) - (Double/isNaN value) - false))) - (map #(select-keys % [:name :tags :value])))) + [metrics ^MetricsFilter metrics-filter] + (let [filters (.getFilters metrics-filter) + filter-fn (if (seq filters) + (fn [{:keys [metric-name]}] + (some (fn [^Predicate predicate] + (.test predicate ^MetricName metric-name)) + filters)) + (constantly identity))] + (into [] (comp (filter (comp number? :value)) + (remove (fn [{:keys [value]}] + (if (double? value) + (Double/isNaN value) + false))) + (map #(select-keys % [:name :tags :value])) + (filter filter-fn)) + metrics))) (defn snapshot-send [{:keys [snapshot-topic ^Producer producer snapshot-id application-id job-id client-id captured]} data] @@ -104,30 +116,30 @@ [{:keys [snapshot-topic producer snapshot-id application-id job-id client-id captured]} metrics] (let [taxon [(:domain snapshot-id) (:id snapshot-id) :kafka/streams-agent]] (doseq [data (partition-all 50 metrics)] - (let [value {:type :kafka/streams-agent-metrics - :application-id application-id - :client-id client-id - :captured captured - :data (vec data) - :job/id job-id - :snapshot/id snapshot-id} - record (ProducerRecord. (:topic snapshot-topic) taxon value)] + (let [value {:type :kafka/streams-agent-metrics + :application-id application-id + :client-id client-id + :captured captured + :data (vec data) + :job/id job-id + :snapshot/id snapshot-id} + record (ProducerRecord. (:topic snapshot-topic) taxon value)] (.get (.send producer record)))) (log/infof "Kpow: sent [%s] streams metrics for application.id %s" (count metrics) application-id))) (defn plan-send [{:keys [snapshot-topic producer snapshot-id job-id captured]}] - (let [taxon [(:domain snapshot-id) (:id snapshot-id) :kafka/streams-agent] - plan {:type :observation/plan - :captured captured - :snapshot/id snapshot-id - :job/id job-id - :data {:type :observe/streams-agent}} - record (ProducerRecord. (:topic snapshot-topic) taxon plan)] + (let [taxon [(:domain snapshot-id) (:id snapshot-id) :kafka/streams-agent] + plan {:type :observation/plan + :captured captured + :snapshot/id snapshot-id + :job/id job-id + :data {:type :observe/streams-agent}} + record (ProducerRecord. (:topic snapshot-topic) taxon plan)] (.get (.send producer record)))) (defn snapshot-telemetry - [{:keys [streams ^Topology topology] :as ctx}] + [{:keys [streams ^Topology topology metrics-filter] :as ctx}] (let [metrics (metrics streams)] (if (empty? metrics) (log/warn "KafkStreams .metrics() method returned an empty collection, no telemetry was sent. Has something mutated the global metrics registry?") @@ -149,17 +161,17 @@ (client-id-tag metrics) application-id)))) (snapshot-send ctx snapshot) - (metrics-send ctx (numeric-metrics metrics)) + (metrics-send ctx (numeric-metrics metrics metrics-filter)) ctx)))) (defn snapshot-task - ^Runnable [snapshot-topic producer registered-topologies latch] + ^Runnable [snapshot-topic producer registered-topologies metrics-filter latch] (fn [] (let [job-id (str (UUID/randomUUID)) ctx {:job-id job-id :snapshot-topic snapshot-topic - :producer producer}] - + :producer producer + :metrics-filter metrics-filter}] (doseq [[id [streams topology]] @registered-topologies] (try (when-let [next-ctx (snapshot-telemetry (assoc ctx :streams streams :topology topology))] (Thread/sleep 2000) @@ -177,7 +189,7 @@ (.setName (format "kpow-streams-agent-%d" (.getAndIncrement n)))))))) (defn start-registry - [{:keys [snapshot-topic producer]}] + [{:keys [snapshot-topic producer metrics-filter]}] (log/info "Kpow: starting registry") (let [registered-topologies (atom {}) pool (Executors/newSingleThreadScheduledExecutor thread-factory) @@ -186,7 +198,7 @@ (swap! registered-topologies assoc id [streams topology]) id)) latch (promise) - task (snapshot-task snapshot-topic producer registered-topologies latch) + task (snapshot-task snapshot-topic producer registered-topologies metrics-filter latch) scheduled-future (.scheduleWithFixedDelay pool task 500 60000 TimeUnit/MILLISECONDS)] {:register register-fn :pool pool @@ -205,8 +217,8 @@ {}) (defn init-registry - [producer] - (start-registry {:snapshot-topic kpow-snapshot-topic :producer producer})) + [producer metrics-filter] + (start-registry {:snapshot-topic kpow-snapshot-topic :producer producer :metrics-filter metrics-filter})) (defn register [agent streams topology] diff --git a/src/java/io/factorhouse/kpow/MetricsFilter.java b/src/java/io/factorhouse/kpow/MetricsFilter.java new file mode 100644 index 0000000..9660d3d --- /dev/null +++ b/src/java/io/factorhouse/kpow/MetricsFilter.java @@ -0,0 +1,40 @@ +package io.factorhouse.kpow; + +import org.apache.kafka.common.MetricName; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Predicate; + +public class MetricsFilter { + private final List> filters; + + private MetricsFilter(List> filters) { + this.filters = new ArrayList<>(filters); + } + + public List> getFilters() { + return Collections.unmodifiableList(filters); + } + + public static Builder defaultMetricsFilter() { + return new Builder().addFilter(metricName -> "streams.state".equals(metricName.name())); + } + + public static Builder emptyMetricsFilter() { + return new Builder(); + } + + public static class Builder { + private final List> filters = new ArrayList<>(); + + public Builder addFilter(Predicate filter) { + filters.add(filter); + return this; + } + + public MetricsFilter build() { + return new MetricsFilter(filters); + } + } +} \ No newline at end of file diff --git a/src/java/io/factorhouse/kpow/StreamsRegistry.java b/src/java/io/factorhouse/kpow/StreamsRegistry.java index c1c6b34..6d59b8e 100644 --- a/src/java/io/factorhouse/kpow/StreamsRegistry.java +++ b/src/java/io/factorhouse/kpow/StreamsRegistry.java @@ -3,6 +3,7 @@ import clojure.java.api.Clojure; import clojure.lang.IFn; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.Metric; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.Topology; @@ -73,7 +74,7 @@ public static Properties filterProperties(Properties props) { return nextProps; } - public StreamsRegistry(Properties props) { + public StreamsRegistry(Properties props, MetricsFilter metricsFilter) { IFn require = Clojure.var("clojure.core", "require"); require.invoke(Clojure.read("io.factorhouse.kpow.agent")); IFn agentFn = Clojure.var("io.factorhouse.kpow.agent", "init-registry"); @@ -83,7 +84,11 @@ public StreamsRegistry(Properties props) { Serializer valSerializer = (Serializer) serdesFn.invoke(); Properties producerProps = filterProperties(props); KafkaProducer producer = new KafkaProducer<>(producerProps, keySerializer, valSerializer); - agent = agentFn.invoke(producer); + agent = agentFn.invoke(producer, metricsFilter); + } + + public StreamsRegistry(Properties props) { + this(props, MetricsFilter.defaultMetricsFilter().build()); } public StreamsAgent register(KafkaStreams streams, Topology topology) { diff --git a/src/java/io/operatr/kpow/StreamsRegistry.java b/src/java/io/operatr/kpow/StreamsRegistry.java index 9e372b6..fd71562 100644 --- a/src/java/io/operatr/kpow/StreamsRegistry.java +++ b/src/java/io/operatr/kpow/StreamsRegistry.java @@ -2,7 +2,9 @@ import clojure.java.api.Clojure; import clojure.lang.IFn; +import io.factorhouse.kpow.MetricsFilter; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.Metric; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.Topology; @@ -73,7 +75,7 @@ public static Properties filterProperties(Properties props) { return nextProps; } - public StreamsRegistry(Properties props) { + public StreamsRegistry(Properties props, MetricsFilter metricsFilter) { IFn require = Clojure.var("clojure.core", "require"); require.invoke(Clojure.read("io.factorhouse.kpow.agent")); IFn agentFn = Clojure.var("io.factorhouse.kpow.agent", "init-registry"); @@ -83,7 +85,11 @@ public StreamsRegistry(Properties props) { Serializer valSerializer = (Serializer) serdesFn.invoke(); Properties producerProps = filterProperties(props); KafkaProducer producer = new KafkaProducer<>(producerProps, keySerializer, valSerializer); - agent = agentFn.invoke(producer); + agent = agentFn.invoke(producer, metricsFilter); + } + + public StreamsRegistry(Properties props) { + this(props, MetricsFilter.defaultMetricsFilter().build()); } public StreamsAgent register(KafkaStreams streams, Topology topology) { diff --git a/test/io/factorhouse/agent_test.clj b/test/io/factorhouse/agent_test.clj index 9f3188a..52e8b16 100644 --- a/test/io/factorhouse/agent_test.clj +++ b/test/io/factorhouse/agent_test.clj @@ -2,7 +2,7 @@ (:require [clojure.core.protocols :as p] [clojure.test :refer :all] [io.factorhouse.kpow.agent :as agent]) - (:import (io.factorhouse.kpow StreamsRegistry) + (:import (io.factorhouse.kpow MetricsFilter StreamsRegistry) (java.util Properties) (org.apache.kafka.clients.producer Producer) (org.apache.kafka.common Metric MetricName) @@ -63,8 +63,10 @@ (deftest agent-metrics (is (= [{:value 1.0, :description "mock metric", :group "first", :name "first.metric", :tags {}} {:value 2.0, :description "mock metric", :group "first", :name "second.metric", :tags {}}] - (agent/metrics (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0) - (mock-metric "second.metric" "first" "mock metric" {} 2.0)]))))) + (into [] + (map #(dissoc % :metric-name)) + (agent/metrics (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0) + (mock-metric "second.metric" "first" "mock metric" {} 2.0)])))))) (deftest datafy-topo (is (= {:sub-topologies #{{:id 0, @@ -77,13 +79,14 @@ (p/datafy (.describe (test-topology)))))) (deftest agent-test - (let [records (atom []) - registry (agent/init-registry (mock-producer records)) - agent (agent/register registry - (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0) - (mock-metric "application-id" "first" "mock metric" {"client-id" "abc123"} "xxx") - (mock-metric "second.metric" "first" "mock metric" {"client-id" "abc123"} 2.0)]) - (test-topology))] + (let [records (atom []) + metrics-filter (.build (MetricsFilter/emptyMetricsFilter)) + registry (agent/init-registry (mock-producer records) metrics-filter) + agent (agent/register registry + (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0) + (mock-metric "application-id" "first" "mock metric" {"client-id" "abc123"} "xxx") + (mock-metric "second.metric" "first" "mock metric" {"client-id" "abc123"} 2.0)]) + (test-topology))] (is agent) From 15b57e14a6b32c055336dc2a46970350046d6788 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Tue, 12 Nov 2024 17:16:37 +1100 Subject: [PATCH 02/29] Add allow/deny predicates to MetricFilter --- src/clojure/io/factorhouse/kpow/agent.clj | 33 +++++--- .../io/factorhouse/kpow/MetricFilter.java | 83 +++++++++++++++++++ .../io/factorhouse/kpow/MetricsFilter.java | 40 --------- .../io/factorhouse/kpow/StreamsRegistry.java | 11 ++- src/java/io/operatr/kpow/StreamsRegistry.java | 13 ++- test/io/factorhouse/agent_test.clj | 4 +- 6 files changed, 123 insertions(+), 61 deletions(-) create mode 100644 src/java/io/factorhouse/kpow/MetricFilter.java delete mode 100644 src/java/io/factorhouse/kpow/MetricsFilter.java diff --git a/src/clojure/io/factorhouse/kpow/agent.clj b/src/clojure/io/factorhouse/kpow/agent.clj index dd7bd8d..a4a725c 100644 --- a/src/clojure/io/factorhouse/kpow/agent.clj +++ b/src/clojure/io/factorhouse/kpow/agent.clj @@ -4,13 +4,12 @@ [clojure.core.protocols :as p]) (:import (java.util UUID) (java.util.concurrent Executors TimeUnit ThreadFactory) - (java.util.function Predicate) (org.apache.kafka.clients.producer Producer ProducerRecord) (org.apache.kafka.common MetricName) (org.apache.kafka.streams Topology KeyValue TopologyDescription TopologyDescription$Subtopology TopologyDescription$GlobalStore TopologyDescription$Node TopologyDescription$Source TopologyDescription$Processor TopologyDescription$Sink) - (io.factorhouse.kpow MetricsFilter) + (io.factorhouse.kpow MetricFilter MetricFilter$FilterCriteria) (java.util.concurrent.atomic AtomicInteger))) (def kpow-snapshot-topic @@ -81,22 +80,32 @@ (str/split #"-StreamThread-") (first)))) +(defn apply-metric-filters + [^MetricName metric-name filters] + (reduce + (fn [acc ^MetricFilter$FilterCriteria filter-criteria] + (let [metric-filter-type (.getFilterType filter-criteria) + predicate (.getPredicate filter-criteria)] + (if (.test predicate metric-name) + (reduced + (case (.name metric-filter-type) + "ACCEPT" true + "DENY" false)) + acc))) + nil + filters)) + (defn numeric-metrics - [metrics ^MetricsFilter metrics-filter] - (let [filters (.getFilters metrics-filter) - filter-fn (if (seq filters) - (fn [{:keys [metric-name]}] - (some (fn [^Predicate predicate] - (.test predicate ^MetricName metric-name)) - filters)) - (constantly identity))] + [metrics ^MetricFilter metrics-filter] + (let [filters (.getFilters metrics-filter)] (into [] (comp (filter (comp number? :value)) (remove (fn [{:keys [value]}] (if (double? value) (Double/isNaN value) false))) - (map #(select-keys % [:name :tags :value])) - (filter filter-fn)) + (filter (fn [{:keys [metric-name]}] + (apply-metric-filters metric-name filters))) + (map #(select-keys % [:name :tags :value]))) metrics))) (defn snapshot-send diff --git a/src/java/io/factorhouse/kpow/MetricFilter.java b/src/java/io/factorhouse/kpow/MetricFilter.java new file mode 100644 index 0000000..58cefc2 --- /dev/null +++ b/src/java/io/factorhouse/kpow/MetricFilter.java @@ -0,0 +1,83 @@ +package io.factorhouse.kpow; + +import org.apache.kafka.common.MetricName; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Predicate; + +public class MetricFilter { + public enum FilterType { + ACCEPT, DENY + } + + public static class FilterCriteria { + private final Predicate predicate; + private final FilterType filterType; + + // Constructor to initialize both fields + private FilterCriteria(Predicate predicate, FilterType filterType) { + this.predicate = predicate; + this.filterType = filterType; + } + + public Predicate getPredicate() { + return predicate; + } + + public FilterType getFilterType() { + return filterType; + } + } + + private final List filters; + + public MetricFilter() { + this.filters = new ArrayList<>(); + } + + public List getFilters() { + return Collections.unmodifiableList(filters); + } + + public MetricFilter accept() { + Predicate acceptPredicate = (_filter) -> { return true; }; + FilterCriteria criteria = new FilterCriteria(acceptPredicate, FilterType.ACCEPT); + this.filters.add(criteria); + return this; + } + + public MetricFilter accept(Predicate acceptFilter) { + FilterCriteria criteria = new FilterCriteria(acceptFilter, FilterType.ACCEPT); + this.filters.add(criteria); + return this; + } + + public MetricFilter deny() { + Predicate denyFilter = (_filter) -> { return true; }; + FilterCriteria criteria = new FilterCriteria(denyFilter, FilterType.DENY); + this.filters.add(criteria); + return this; + } + + public MetricFilter deny(Predicate denyFilter) { + FilterCriteria criteria = new FilterCriteria(denyFilter, FilterType.DENY); + this.filters.add(criteria); + return this; + } + + public MetricFilter acceptNameStartsWith(String prefix) { + Predicate acceptFilter = (metricName) -> { return metricName.name().startsWith(prefix); }; + FilterCriteria criteria = new FilterCriteria(acceptFilter, FilterType.ACCEPT); + this.filters.add(criteria); + return this; + } + + public MetricFilter denyNameStartsWith(String prefix) { + Predicate denyFilter = (metricName) -> { return metricName.name().startsWith(prefix); }; + FilterCriteria criteria = new FilterCriteria(denyFilter, FilterType.DENY); + this.filters.add(criteria); + return this; + } +} \ No newline at end of file diff --git a/src/java/io/factorhouse/kpow/MetricsFilter.java b/src/java/io/factorhouse/kpow/MetricsFilter.java deleted file mode 100644 index 9660d3d..0000000 --- a/src/java/io/factorhouse/kpow/MetricsFilter.java +++ /dev/null @@ -1,40 +0,0 @@ -package io.factorhouse.kpow; - -import org.apache.kafka.common.MetricName; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.function.Predicate; - -public class MetricsFilter { - private final List> filters; - - private MetricsFilter(List> filters) { - this.filters = new ArrayList<>(filters); - } - - public List> getFilters() { - return Collections.unmodifiableList(filters); - } - - public static Builder defaultMetricsFilter() { - return new Builder().addFilter(metricName -> "streams.state".equals(metricName.name())); - } - - public static Builder emptyMetricsFilter() { - return new Builder(); - } - - public static class Builder { - private final List> filters = new ArrayList<>(); - - public Builder addFilter(Predicate filter) { - filters.add(filter); - return this; - } - - public MetricsFilter build() { - return new MetricsFilter(filters); - } - } -} \ No newline at end of file diff --git a/src/java/io/factorhouse/kpow/StreamsRegistry.java b/src/java/io/factorhouse/kpow/StreamsRegistry.java index 6d59b8e..9a910e4 100644 --- a/src/java/io/factorhouse/kpow/StreamsRegistry.java +++ b/src/java/io/factorhouse/kpow/StreamsRegistry.java @@ -3,7 +3,6 @@ import clojure.java.api.Clojure; import clojure.lang.IFn; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.Metric; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.Topology; @@ -74,7 +73,7 @@ public static Properties filterProperties(Properties props) { return nextProps; } - public StreamsRegistry(Properties props, MetricsFilter metricsFilter) { + public StreamsRegistry(Properties props, MetricFilter metricsFilter) { IFn require = Clojure.var("clojure.core", "require"); require.invoke(Clojure.read("io.factorhouse.kpow.agent")); IFn agentFn = Clojure.var("io.factorhouse.kpow.agent", "init-registry"); @@ -87,8 +86,14 @@ public StreamsRegistry(Properties props, MetricsFilter metricsFilter) { agent = agentFn.invoke(producer, metricsFilter); } + public static MetricFilter defaultMetricFilter() { + return new MetricFilter() + .acceptNameStartsWith("foo") + .deny(); + } + public StreamsRegistry(Properties props) { - this(props, MetricsFilter.defaultMetricsFilter().build()); + this(props, StreamsRegistry.defaultMetricFilter()); } public StreamsAgent register(KafkaStreams streams, Topology topology) { diff --git a/src/java/io/operatr/kpow/StreamsRegistry.java b/src/java/io/operatr/kpow/StreamsRegistry.java index fd71562..cb9b75f 100644 --- a/src/java/io/operatr/kpow/StreamsRegistry.java +++ b/src/java/io/operatr/kpow/StreamsRegistry.java @@ -2,9 +2,8 @@ import clojure.java.api.Clojure; import clojure.lang.IFn; -import io.factorhouse.kpow.MetricsFilter; +import io.factorhouse.kpow.MetricFilter; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.Metric; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.Topology; @@ -75,7 +74,7 @@ public static Properties filterProperties(Properties props) { return nextProps; } - public StreamsRegistry(Properties props, MetricsFilter metricsFilter) { + public StreamsRegistry(Properties props, MetricFilter metricsFilter) { IFn require = Clojure.var("clojure.core", "require"); require.invoke(Clojure.read("io.factorhouse.kpow.agent")); IFn agentFn = Clojure.var("io.factorhouse.kpow.agent", "init-registry"); @@ -88,8 +87,14 @@ public StreamsRegistry(Properties props, MetricsFilter metricsFilter) { agent = agentFn.invoke(producer, metricsFilter); } + public static MetricFilter defaultMetricFilter() { + return new MetricFilter() + .acceptNameStartsWith("foo") + .deny(); + } + public StreamsRegistry(Properties props) { - this(props, MetricsFilter.defaultMetricsFilter().build()); + this(props, StreamsRegistry.defaultMetricFilter()); } public StreamsAgent register(KafkaStreams streams, Topology topology) { diff --git a/test/io/factorhouse/agent_test.clj b/test/io/factorhouse/agent_test.clj index 52e8b16..25b4fdc 100644 --- a/test/io/factorhouse/agent_test.clj +++ b/test/io/factorhouse/agent_test.clj @@ -2,7 +2,7 @@ (:require [clojure.core.protocols :as p] [clojure.test :refer :all] [io.factorhouse.kpow.agent :as agent]) - (:import (io.factorhouse.kpow MetricsFilter StreamsRegistry) + (:import (io.factorhouse.kpow MetricFilter StreamsRegistry) (java.util Properties) (org.apache.kafka.clients.producer Producer) (org.apache.kafka.common Metric MetricName) @@ -80,7 +80,7 @@ (deftest agent-test (let [records (atom []) - metrics-filter (.build (MetricsFilter/emptyMetricsFilter)) + metrics-filter (-> (MetricFilter.) (.accept)) registry (agent/init-registry (mock-producer records) metrics-filter) agent (agent/register registry (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0) From 6aa9f9038e9de065e59f28075230d352e192dae0 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Tue, 12 Nov 2024 17:34:06 +1100 Subject: [PATCH 03/29] Document the MetricFilter class --- README.md | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 53f786e..55869a0 100644 --- a/README.md +++ b/README.md @@ -82,7 +82,39 @@ The StreamsRegistry is a *single-threaded process* that performs these actions * The StreamsRegistry **does not talk directly to Kpow**. Kpow reads streams data from the snapshot topic. -# Configuration +# Metric filters + +You can configure each streams registry with metric filters, which give you greater control over which metrics Kpow's streams agent will export. + +Metric filters can be chained and added programmatically: + +```java +import io.factorhouse.kpow.StreamsRegistry; +import io.factorhouse.kpow.MetricFilter; + +MetricFilter metricFilter = MetricFilter().deny(); // don't send any streams metrics, just send through the Streams Topology + +// .. + +StreamsRegistry registry = new StreamsRegistry(props, metricFilter); +``` + +If you pass no metric filters to the `StreamsRegistry` constructor then the default metric filter will be used. The default metric filter will **accept** all metrics to be exported. + +### Metric filter usage + +Kpow's streams agent metric filters work very similar to Micrometer's [meter filters](https://github.com/micrometer-metrics/micrometer-docs/blob/main/src/docs/concepts/meter-filters.adoc). + +Metric filters can either `ACCEPT` or `DENY` a metric. The filter itself is a Java predicate which takes in the [org.apache.common.MetricName](https://kafka.apache.org/0110/javadoc/org/apache/kafka/common/MetricName.html#group()) class. This allows you to filter metrics by name, tags or group. + +Metric filters are applied sequentially in the order they are configured in the registry. This allows for stacking of deny and accept filters to create more complex rules: + +```java +MetricFilter metricFilter = MetricFilter().acceptNameStartsWith("rocksdb").deny(); +``` +The above example allows all rocksdb related metrics through and denies all other types of streams metrics. + +# Kafka connection The `StreamsRegistry` `Properties` contains configuration to create the snapshot producer. From ca8cb950ab7b20e5985a5cb2d5acd149bb78baaa Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Tue, 12 Nov 2024 19:04:29 +1100 Subject: [PATCH 04/29] Document key strategy --- README.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/README.md b/README.md index 55869a0..758239b 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,22 @@ Producer configuration means any of the following fields: For more details visit the [Producer](https://kafka.apache.org/documentation/#producerconfigs) section of the Apache Kafka documentation. +### Key strategy + +The keying strategy for data sent from Kpow's streams agent to its internal Kafka topic is configurable. The key strategy plays an important role in enabling Kpow to align stream metrics with the UI accurately. There are many key strategies available depending on your organisation's deployment. + +#### Cluster ID (default in 1.0.0+, recommended) + +The default key strategy uses the cluster ID, obtained via an AdminClient [describeClusters](https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/admin/DescribeClusterResult.html) call. This AdminClient is created once during registry initialization and then closed. If you prefer not to have the streams registry create an AdminClient—either because your Kafka variant does not provide a cluster ID or due to security considerations—you may select an alternative key strategy from the options below. + +#### Client ID (default in 0.2.0 and below) + +This key strategy relies on the client ID and application ID from the active KafkaStreams instance, eliminating the need for an AdminClient. However, in a multi-cluster Kpow deployment where the same application ID is used across multiple environments (e.g., staging, dev, prod), Kpow cannot determine which cluster the Kafka Streams instance is associated with. + +#### Environment name (manual) + +If you have set a UI-friendly cluster name using the `ENVIRONMENT_NAME` environment variable in Kpow, you can use this environment name as the keying strategy for the streams agent. + ### Minimum Required ACLs If you secure your Kafka Cluster with ACLs, the user provided in the Producer configuration must have permission to write to the internal Kpow topic. From dd4dc7b351e941a424d079a3282ef64cafe2897b Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Wed, 13 Nov 2024 10:58:08 +1100 Subject: [PATCH 05/29] Implement KeyStrategy interface --- README.md | 12 +++-- project.clj | 1 + src/clojure/io/factorhouse/kpow/agent.clj | 53 ++++++++++++------- .../io/factorhouse/kpow/StreamsRegistry.java | 15 +++--- .../key_strategies/ClientIDKeyStrategy.java | 10 ++++ .../key_strategies/ClusterIDKeyStrategy.java | 22 ++++++++ .../kpow/key_strategies/KeyStrategy.java | 5 ++ .../key_strategies/ManualKeyStrategy.java | 14 +++++ .../kpow/key_strategies/Taxon.java | 32 +++++++++++ test/io/factorhouse/agent_test.clj | 8 +-- 10 files changed, 138 insertions(+), 34 deletions(-) create mode 100644 src/java/io/factorhouse/kpow/key_strategies/ClientIDKeyStrategy.java create mode 100644 src/java/io/factorhouse/kpow/key_strategies/ClusterIDKeyStrategy.java create mode 100644 src/java/io/factorhouse/kpow/key_strategies/KeyStrategy.java create mode 100644 src/java/io/factorhouse/kpow/key_strategies/ManualKeyStrategy.java create mode 100644 src/java/io/factorhouse/kpow/key_strategies/Taxon.java diff --git a/README.md b/README.md index 758239b..5149e13 100644 --- a/README.md +++ b/README.md @@ -55,21 +55,25 @@ In your application, just before you start your KafkaStreams instance: ```java import io.factorhouse.kpow.StreamsRegistry; +import io.factorhouse.kpow.key_strategies.ClusterIDKeyStrategy; // Your Kafka Streams topology -Topology topology = createMyTopology(); +Topology topology = createMyTopology(); // Your Kafka Streams config Properties props = new createMyStreamProperties(); - + // Your Kafka Streams instance -KafkaStreams streams = new KafkaStreams(topology, props); +KafkaStreams streams = new KafkaStreams(topology, props); // Create a Kpow StreamsRegistry StreamsRegistry registry = new StreamsRegistry(props); +// Specify the key strategy when writing metrics to the internal Kafka topic +KeyStrategy keyStrategy = new ClusterIDKeyStrategy(props); + // Register your KafkaStreams and Topology instances with the StreamsRegistry -registry.register(streams, topology); +registry.register(streams, topology, keyStrategy); // Start your Kafka Streams application streams.start(); diff --git a/project.clj b/project.clj index 8424b7d..0b78053 100644 --- a/project.clj +++ b/project.clj @@ -24,6 +24,7 @@ :dependencies [[org.clojure/clojure "1.11.3"] [com.cognitect/transit-clj "1.0.333"] [org.clojure/tools.logging "1.3.0"] + [org.apache.kafka/kafka-clients "3.6.1" :scope "provided"] [org.apache.kafka/kafka-streams "3.6.1" :scope "provided"]] :uberjar {:prep-tasks ["clean" "javac" "compile"] :aot :all} diff --git a/src/clojure/io/factorhouse/kpow/agent.clj b/src/clojure/io/factorhouse/kpow/agent.clj index a4a725c..df03d8e 100644 --- a/src/clojure/io/factorhouse/kpow/agent.clj +++ b/src/clojure/io/factorhouse/kpow/agent.clj @@ -2,7 +2,8 @@ (:require [clojure.string :as str] [clojure.tools.logging :as log] [clojure.core.protocols :as p]) - (:import (java.util UUID) + (:import (io.factorhouse.kpow.key_strategies KeyStrategy Taxon) + (java.util UUID) (java.util.concurrent Executors TimeUnit ThreadFactory) (org.apache.kafka.clients.producer Producer ProducerRecord) (org.apache.kafka.common MetricName) @@ -16,6 +17,15 @@ {:topic "__oprtr_snapshot_state"}) (extend-protocol p/Datafiable + Taxon + (datafy [v] + (into [] + (filter identity) + [(keyword (.getDomain v)) + (.getId v) + (keyword "kafka" (.getObject v)) + (.getObjectId v)])) + KeyValue (datafy [kv] {:key (.key kv) @@ -109,21 +119,21 @@ metrics))) (defn snapshot-send - [{:keys [snapshot-topic ^Producer producer snapshot-id application-id job-id client-id captured]} data] - (let [snapshot {:type :kafka/streams-agent + [{:keys [snapshot-topic ^Producer producer taxon application-id job-id client-id captured]} data] + (let [taxon (p/datafy taxon) + snapshot {:type :kafka/streams-agent :application-id application-id :client-id client-id :captured captured :data data :job/id job-id - :snapshot/id snapshot-id} - taxon [(:domain snapshot-id) (:id snapshot-id) :kafka/streams-agent] + :snapshot/id {:domain (first taxon) :id (second taxon)}} record (ProducerRecord. (:topic snapshot-topic) taxon snapshot)] (.get (.send producer record)))) (defn metrics-send - [{:keys [snapshot-topic producer snapshot-id application-id job-id client-id captured]} metrics] - (let [taxon [(:domain snapshot-id) (:id snapshot-id) :kafka/streams-agent]] + [{:keys [snapshot-topic producer taxon application-id job-id client-id captured]} metrics] + (let [taxon (p/datafy taxon)] (doseq [data (partition-all 50 metrics)] (let [value {:type :kafka/streams-agent-metrics :application-id application-id @@ -131,24 +141,24 @@ :captured captured :data (vec data) :job/id job-id - :snapshot/id snapshot-id} + :snapshot/id {:domain (first taxon) :id (second taxon)}} record (ProducerRecord. (:topic snapshot-topic) taxon value)] (.get (.send producer record)))) (log/infof "Kpow: sent [%s] streams metrics for application.id %s" (count metrics) application-id))) (defn plan-send - [{:keys [snapshot-topic producer snapshot-id job-id captured]}] - (let [taxon [(:domain snapshot-id) (:id snapshot-id) :kafka/streams-agent] + [{:keys [snapshot-topic producer job-id captured taxon]}] + (let [taxon (p/datafy taxon) plan {:type :observation/plan :captured captured - :snapshot/id snapshot-id + :snapshot/id {:domain (first taxon) :id (second taxon)} :job/id job-id :data {:type :observe/streams-agent}} record (ProducerRecord. (:topic snapshot-topic) taxon plan)] (.get (.send producer record)))) (defn snapshot-telemetry - [{:keys [streams ^Topology topology metrics-filter] :as ctx}] + [{:keys [streams ^Topology topology metrics-filter ^KeyStrategy key-strategy] :as ctx}] (let [metrics (metrics streams)] (if (empty? metrics) (log/warn "KafkStreams .metrics() method returned an empty collection, no telemetry was sent. Has something mutated the global metrics registry?") @@ -157,11 +167,12 @@ snapshot {:topology topology :state state} client-id (client-id metrics) application-id (application-id metrics) + taxon (.getTaxon key-strategy client-id application-id) ctx (assoc ctx :captured (System/currentTimeMillis) :client-id client-id :application-id application-id - :snapshot-id {:domain :streams :id client-id})] + :taxon taxon)] (when (nil? application-id) (throw (Exception. "Cannot infer application id from metrics returned from KafkaStreams instance. Expected metric \"application-id\" in the metrics registry."))) (when (nil? client-id) @@ -181,8 +192,8 @@ :snapshot-topic snapshot-topic :producer producer :metrics-filter metrics-filter}] - (doseq [[id [streams topology]] @registered-topologies] - (try (when-let [next-ctx (snapshot-telemetry (assoc ctx :streams streams :topology topology))] + (doseq [[id [streams topology key-strategy]] @registered-topologies] + (try (when-let [next-ctx (snapshot-telemetry (assoc ctx :streams streams :topology topology :key-strategy key-strategy))] (Thread/sleep 2000) (plan-send next-ctx)) (catch Throwable e @@ -202,9 +213,9 @@ (log/info "Kpow: starting registry") (let [registered-topologies (atom {}) pool (Executors/newSingleThreadScheduledExecutor thread-factory) - register-fn (fn [streams topology] + register-fn (fn [streams topology key-strategy] (let [id (str (UUID/randomUUID))] - (swap! registered-topologies assoc id [streams topology]) + (swap! registered-topologies assoc id [streams topology key-strategy]) id)) latch (promise) task (snapshot-task snapshot-topic producer registered-topologies metrics-filter latch) @@ -227,12 +238,14 @@ (defn init-registry [producer metrics-filter] - (start-registry {:snapshot-topic kpow-snapshot-topic :producer producer :metrics-filter metrics-filter})) + (start-registry {:snapshot-topic kpow-snapshot-topic + :producer producer + :metrics-filter metrics-filter})) (defn register - [agent streams topology] + [agent streams topology key-strategy] (when-let [register-fn (:register agent)] - (let [id (register-fn streams topology)] + (let [id (register-fn streams topology key-strategy)] (log/infof "Kpow: registring new streams agent %s" id) id))) diff --git a/src/java/io/factorhouse/kpow/StreamsRegistry.java b/src/java/io/factorhouse/kpow/StreamsRegistry.java index 9a910e4..38397d5 100644 --- a/src/java/io/factorhouse/kpow/StreamsRegistry.java +++ b/src/java/io/factorhouse/kpow/StreamsRegistry.java @@ -2,6 +2,7 @@ import clojure.java.api.Clojure; import clojure.lang.IFn; +import io.factorhouse.kpow.key_strategies.KeyStrategy; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; @@ -86,21 +87,21 @@ public StreamsRegistry(Properties props, MetricFilter metricsFilter) { agent = agentFn.invoke(producer, metricsFilter); } + public StreamsRegistry(Properties props) { + this(props, StreamsRegistry.defaultMetricFilter()); + } + public static MetricFilter defaultMetricFilter() { return new MetricFilter() .acceptNameStartsWith("foo") .deny(); } - public StreamsRegistry(Properties props) { - this(props, StreamsRegistry.defaultMetricFilter()); - } - - public StreamsAgent register(KafkaStreams streams, Topology topology) { + public StreamsAgent register(KafkaStreams streams, Topology topology, KeyStrategy keyStrategy) { IFn require = Clojure.var("clojure.core", "require"); require.invoke(Clojure.read("io.factorhouse.kpow.agent")); IFn registerFn = Clojure.var("io.factorhouse.kpow.agent", "register"); - String id = (String) registerFn.invoke(agent, streams, topology); + String id = (String) registerFn.invoke(agent, streams, topology, keyStrategy); if (id != null) { return new StreamsAgent(id); } else { @@ -124,4 +125,4 @@ public void close() { IFn closeFn = Clojure.var("io.factorhouse.kpow.agent", "close-registry"); closeFn.invoke(agent); } -} \ No newline at end of file +} diff --git a/src/java/io/factorhouse/kpow/key_strategies/ClientIDKeyStrategy.java b/src/java/io/factorhouse/kpow/key_strategies/ClientIDKeyStrategy.java new file mode 100644 index 0000000..24bf29f --- /dev/null +++ b/src/java/io/factorhouse/kpow/key_strategies/ClientIDKeyStrategy.java @@ -0,0 +1,10 @@ +package io.factorhouse.kpow.key_strategies; + +public class ClientIDKeyStrategy implements KeyStrategy { + public ClientIDKeyStrategy() {} + + @Override + public Taxon getTaxon(String clientId, String applicationId) { + return new Taxon("streams", clientId, "streams-agent", null); + } +} diff --git a/src/java/io/factorhouse/kpow/key_strategies/ClusterIDKeyStrategy.java b/src/java/io/factorhouse/kpow/key_strategies/ClusterIDKeyStrategy.java new file mode 100644 index 0000000..c223d1f --- /dev/null +++ b/src/java/io/factorhouse/kpow/key_strategies/ClusterIDKeyStrategy.java @@ -0,0 +1,22 @@ +package io.factorhouse.kpow.key_strategies; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeClusterResult; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +public class ClusterIDKeyStrategy implements KeyStrategy { + private final String clusterID; + + public ClusterIDKeyStrategy(Properties props) throws InterruptedException, ExecutionException { + try (AdminClient adminClient = AdminClient.create(props)) { + DescribeClusterResult describeClusterResult = adminClient.describeCluster(); + this.clusterID = describeClusterResult.clusterId().get(); + } + } + + @Override + public Taxon getTaxon(String clientId, String applicationId) { + return new Taxon("streams", clusterID, "streams-agent-cid", clientId); + } +} diff --git a/src/java/io/factorhouse/kpow/key_strategies/KeyStrategy.java b/src/java/io/factorhouse/kpow/key_strategies/KeyStrategy.java new file mode 100644 index 0000000..447b984 --- /dev/null +++ b/src/java/io/factorhouse/kpow/key_strategies/KeyStrategy.java @@ -0,0 +1,5 @@ +package io.factorhouse.kpow.key_strategies; + +public interface KeyStrategy { + Taxon getTaxon(String clientId, String applicationId); +} diff --git a/src/java/io/factorhouse/kpow/key_strategies/ManualKeyStrategy.java b/src/java/io/factorhouse/kpow/key_strategies/ManualKeyStrategy.java new file mode 100644 index 0000000..463ccb9 --- /dev/null +++ b/src/java/io/factorhouse/kpow/key_strategies/ManualKeyStrategy.java @@ -0,0 +1,14 @@ +package io.factorhouse.kpow.key_strategies; + +public class ManualKeyStrategy implements KeyStrategy { + private final String envName; + + public ManualKeyStrategy(String envName) { + this.envName = envName; + } + + @Override + public Taxon getTaxon(String clientId, String applicationId) { + return new Taxon("streams", envName, "streams-agent-m", clientId); + } +} diff --git a/src/java/io/factorhouse/kpow/key_strategies/Taxon.java b/src/java/io/factorhouse/kpow/key_strategies/Taxon.java new file mode 100644 index 0000000..a220a44 --- /dev/null +++ b/src/java/io/factorhouse/kpow/key_strategies/Taxon.java @@ -0,0 +1,32 @@ +package io.factorhouse.kpow.key_strategies; + +public class Taxon { + private final String domain; + private final String id; + + private final String object; + private final String objectId; + + public Taxon(String domain, String id, String object, String objectId) { + this.id = id; + this.domain = domain; + this.object = object; + this.objectId = objectId; + } + + public String getDomain() { + return domain; + } + + public String getObject() { + return object; + } + + public String getObjectId() { + return objectId; + } + + public String getId() { + return id; + } +} diff --git a/test/io/factorhouse/agent_test.clj b/test/io/factorhouse/agent_test.clj index 25b4fdc..91a3bc0 100644 --- a/test/io/factorhouse/agent_test.clj +++ b/test/io/factorhouse/agent_test.clj @@ -6,7 +6,8 @@ (java.util Properties) (org.apache.kafka.clients.producer Producer) (org.apache.kafka.common Metric MetricName) - (org.apache.kafka.streams KafkaStreams$State StreamsBuilder Topology))) + (org.apache.kafka.streams KafkaStreams$State StreamsBuilder Topology) + (io.factorhouse.kpow.key_strategies ClientIDKeyStrategy))) (defn ^Properties ->props [m] (let [props (Properties.)] @@ -86,7 +87,8 @@ (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0) (mock-metric "application-id" "first" "mock metric" {"client-id" "abc123"} "xxx") (mock-metric "second.metric" "first" "mock metric" {"client-id" "abc123"} 2.0)]) - (test-topology))] + (test-topology) + (ClientIDKeyStrategy.))] (is agent) @@ -123,4 +125,4 @@ (is (agent/unregister registry agent)) - (is (empty? (agent/close-registry registry))))) \ No newline at end of file + (is (empty? (agent/close-registry registry))))) From 4bd2d2e6b0033bf65d9f82649535a38a8b11e618 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Wed, 13 Nov 2024 11:10:17 +1100 Subject: [PATCH 06/29] Better document KeyStrategy usage --- README.md | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/README.md b/README.md index 5149e13..310a852 100644 --- a/README.md +++ b/README.md @@ -165,14 +165,39 @@ The keying strategy for data sent from Kpow's streams agent to its internal Kafk The default key strategy uses the cluster ID, obtained via an AdminClient [describeClusters](https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/admin/DescribeClusterResult.html) call. This AdminClient is created once during registry initialization and then closed. If you prefer not to have the streams registry create an AdminClient—either because your Kafka variant does not provide a cluster ID or due to security considerations—you may select an alternative key strategy from the options below. +```java +// Specify the key strategy when writing metrics to the internal Kafka topic +// props are java.util.Properties describing the Kafka Connection +KeyStrategy keyStrategy = new ClusterIDKeyStrategy(props); +// Register your KafkaStreams and Topology instances with the StreamsRegistry +registry.register(streams, topology, keyStrategy); +``` + #### Client ID (default in 0.2.0 and below) This key strategy relies on the client ID and application ID from the active KafkaStreams instance, eliminating the need for an AdminClient. However, in a multi-cluster Kpow deployment where the same application ID is used across multiple environments (e.g., staging, dev, prod), Kpow cannot determine which cluster the Kafka Streams instance is associated with. +```java +import io.factorhouse.kpow.StreamsRegistry; +import io.factorhouse.kpow.key_strategies.ClientIDKeyStrategy; + +KeyStrategy keyStrategy = new ClientIDKeyStrategy(); +registry.register(streams, topology, keyStrategy); +``` + #### Environment name (manual) If you have set a UI-friendly cluster name using the `ENVIRONMENT_NAME` environment variable in Kpow, you can use this environment name as the keying strategy for the streams agent. +```java +import io.factorhouse.kpow.StreamsRegistry; +import io.factorhouse.kpow.key_strategies.ManualKeyStrategy; + +// This sets a manual key of `Trade Book (Staging)`, the name of the clusters environment name in Kpow's UI. +KeyStrategy keyStrategy = new ManualKeyStrategy("Trade Book (Staging)"); +registry.register(streams, topology, keyStrategy); +``` + ### Minimum Required ACLs If you secure your Kafka Cluster with ACLs, the user provided in the Producer configuration must have permission to write to the internal Kpow topic. From 4a8708e68d3697aa72acb2b904ffec3ca91499b6 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Wed, 13 Nov 2024 15:23:36 +1100 Subject: [PATCH 07/29] Key strategy package structure --- README.md | 22 +++---- src/clojure/io/factorhouse/kpow/agent.clj | 65 +++++++++---------- .../io/factorhouse/kpow/StreamsRegistry.java | 2 +- .../ClientIdKeyStrategy.java} | 6 +- .../ClusterIdKeyStrategy.java} | 12 ++-- .../{key_strategies => key}/KeyStrategy.java | 2 +- .../ManualKeyStrategy.java | 2 +- .../kpow/{key_strategies => key}/Taxon.java | 13 ++-- test/io/factorhouse/agent_test.clj | 6 +- 9 files changed, 63 insertions(+), 67 deletions(-) rename src/java/io/factorhouse/kpow/{key_strategies/ClientIDKeyStrategy.java => key/ClientIdKeyStrategy.java} (53%) rename src/java/io/factorhouse/kpow/{key_strategies/ClusterIDKeyStrategy.java => key/ClusterIdKeyStrategy.java} (60%) rename src/java/io/factorhouse/kpow/{key_strategies => key}/KeyStrategy.java (67%) rename src/java/io/factorhouse/kpow/{key_strategies => key}/ManualKeyStrategy.java (88%) rename src/java/io/factorhouse/kpow/{key_strategies => key}/Taxon.java (63%) diff --git a/README.md b/README.md index 310a852..4259603 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,7 @@ In your application, just before you start your KafkaStreams instance: ```java import io.factorhouse.kpow.StreamsRegistry; -import io.factorhouse.kpow.key_strategies.ClusterIDKeyStrategy; +import io.factorhouse.kpow.key.ClusterIdKeyStrategy; // Your Kafka Streams topology Topology topology = createMyTopology(); @@ -70,7 +70,7 @@ KafkaStreams streams = new KafkaStreams(topology, props); StreamsRegistry registry = new StreamsRegistry(props); // Specify the key strategy when writing metrics to the internal Kafka topic -KeyStrategy keyStrategy = new ClusterIDKeyStrategy(props); +KeyStrategy keyStrategy = new ClusterIdKeyStrategy(props); // Register your KafkaStreams and Topology instances with the StreamsRegistry registry.register(streams, topology, keyStrategy); @@ -161,7 +161,7 @@ For more details visit the [Producer](https://kafka.apache.org/documentation/#pr The keying strategy for data sent from Kpow's streams agent to its internal Kafka topic is configurable. The key strategy plays an important role in enabling Kpow to align stream metrics with the UI accurately. There are many key strategies available depending on your organisation's deployment. -#### Cluster ID (default in 1.0.0+, recommended) +#### Cluster ID (recommended key strategy, requires Kpow 94.1+) The default key strategy uses the cluster ID, obtained via an AdminClient [describeClusters](https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/admin/DescribeClusterResult.html) call. This AdminClient is created once during registry initialization and then closed. If you prefer not to have the streams registry create an AdminClient—either because your Kafka variant does not provide a cluster ID or due to security considerations—you may select an alternative key strategy from the options below. @@ -178,20 +178,20 @@ registry.register(streams, topology, keyStrategy); This key strategy relies on the client ID and application ID from the active KafkaStreams instance, eliminating the need for an AdminClient. However, in a multi-cluster Kpow deployment where the same application ID is used across multiple environments (e.g., staging, dev, prod), Kpow cannot determine which cluster the Kafka Streams instance is associated with. ```java -import io.factorhouse.kpow.StreamsRegistry; -import io.factorhouse.kpow.key_strategies.ClientIDKeyStrategy; -KeyStrategy keyStrategy = new ClientIDKeyStrategy(); +import io.factorhouse.kpow.key.ClientIdKeyStrategy; + +KeyStrategy keyStrategy = new ClientIdKeyStrategy(); registry.register(streams, topology, keyStrategy); ``` -#### Environment name (manual) +#### Environment name (manual, requires Kpow 94.1+) If you have set a UI-friendly cluster name using the `ENVIRONMENT_NAME` environment variable in Kpow, you can use this environment name as the keying strategy for the streams agent. ```java -import io.factorhouse.kpow.StreamsRegistry; -import io.factorhouse.kpow.key_strategies.ManualKeyStrategy; + +import io.factorhouse.kpow.key.ManualKeyStrategy; // This sets a manual key of `Trade Book (Staging)`, the name of the clusters environment name in Kpow's UI. KeyStrategy keyStrategy = new ManualKeyStrategy("Trade Book (Staging)"); @@ -224,7 +224,7 @@ Properties streamsProps = new Properties(); KafkaStreams streams = new KafkaStreams(topology, streamsProps); StreamsRegistry registry = new StreamsRegistry(streamsProps); -... +//... ``` ### Multi-Cluster Kpow @@ -237,7 +237,7 @@ KafkaStreams streams = new KafkaStreams(topology, streamsProps); Properties primaryProps = createMyPrimaryClusterProducerProperties(); StreamsRegistry registry = new StreamsRegistry(primaryProps); -... +//... ``` See the [Kpow Multi-Cluster Feature Guide](https://docs.factorhouse.io/kpow-ee/config/multi-cluster/) for more information. diff --git a/src/clojure/io/factorhouse/kpow/agent.clj b/src/clojure/io/factorhouse/kpow/agent.clj index df03d8e..10e675f 100644 --- a/src/clojure/io/factorhouse/kpow/agent.clj +++ b/src/clojure/io/factorhouse/kpow/agent.clj @@ -1,17 +1,17 @@ (ns io.factorhouse.kpow.agent - (:require [clojure.string :as str] - [clojure.tools.logging :as log] - [clojure.core.protocols :as p]) - (:import (io.factorhouse.kpow.key_strategies KeyStrategy Taxon) + (:require [clojure.core.protocols :as p] + [clojure.string :as str] + [clojure.tools.logging :as log]) + (:import (io.factorhouse.kpow MetricFilter MetricFilter$FilterCriteria) + (io.factorhouse.kpow.key KeyStrategy Taxon) (java.util UUID) - (java.util.concurrent Executors TimeUnit ThreadFactory) + (java.util.concurrent Executors ThreadFactory TimeUnit) + (java.util.concurrent.atomic AtomicInteger) (org.apache.kafka.clients.producer Producer ProducerRecord) (org.apache.kafka.common MetricName) - (org.apache.kafka.streams Topology KeyValue TopologyDescription TopologyDescription$Subtopology - TopologyDescription$GlobalStore TopologyDescription$Node TopologyDescription$Source - TopologyDescription$Processor TopologyDescription$Sink) - (io.factorhouse.kpow MetricFilter MetricFilter$FilterCriteria) - (java.util.concurrent.atomic AtomicInteger))) + (org.apache.kafka.streams KeyValue Topology TopologyDescription TopologyDescription$GlobalStore + TopologyDescription$Node TopologyDescription$Processor TopologyDescription$Sink + TopologyDescription$Source TopologyDescription$Subtopology))) (def kpow-snapshot-topic {:topic "__oprtr_snapshot_state"}) @@ -19,12 +19,9 @@ (extend-protocol p/Datafiable Taxon (datafy [v] - (into [] - (filter identity) - [(keyword (.getDomain v)) - (.getId v) - (keyword "kafka" (.getObject v)) - (.getObjectId v)])) + (if-let [object-id (.getObjectId v)] + [(keyword (.getDomain v)) (.getDomainId v) (keyword "kafka" (.getObject v)) object-id] + [(keyword (.getDomain v)) (.getDomainId v) (keyword "kafka" (.getObject v))])) KeyValue (datafy [kv] @@ -93,17 +90,17 @@ (defn apply-metric-filters [^MetricName metric-name filters] (reduce - (fn [acc ^MetricFilter$FilterCriteria filter-criteria] - (let [metric-filter-type (.getFilterType filter-criteria) - predicate (.getPredicate filter-criteria)] - (if (.test predicate metric-name) - (reduced - (case (.name metric-filter-type) - "ACCEPT" true - "DENY" false)) - acc))) - nil - filters)) + (fn [acc ^MetricFilter$FilterCriteria filter-criteria] + (let [metric-filter-type (.getFilterType filter-criteria) + predicate (.getPredicate filter-criteria)] + (if (.test predicate metric-name) + (reduced + (case (.name metric-filter-type) + "ACCEPT" true + "DENY" false)) + acc))) + nil + filters)) (defn numeric-metrics [metrics ^MetricFilter metrics-filter] @@ -169,17 +166,17 @@ application-id (application-id metrics) taxon (.getTaxon key-strategy client-id application-id) ctx (assoc ctx - :captured (System/currentTimeMillis) - :client-id client-id - :application-id application-id - :taxon taxon)] + :captured (System/currentTimeMillis) + :client-id client-id + :application-id application-id + :taxon taxon)] (when (nil? application-id) (throw (Exception. "Cannot infer application id from metrics returned from KafkaStreams instance. Expected metric \"application-id\" in the metrics registry."))) (when (nil? client-id) (throw (Exception. - (format "Cannot infer client id from metrics returned from KafkaStreams instance. Got: client-id %s and application-id %s" - (client-id-tag metrics) - application-id)))) + (format "Cannot infer client id from metrics returned from KafkaStreams instance. Got: client-id %s and application-id %s" + (client-id-tag metrics) + application-id)))) (snapshot-send ctx snapshot) (metrics-send ctx (numeric-metrics metrics metrics-filter)) ctx)))) diff --git a/src/java/io/factorhouse/kpow/StreamsRegistry.java b/src/java/io/factorhouse/kpow/StreamsRegistry.java index 38397d5..5fc9d11 100644 --- a/src/java/io/factorhouse/kpow/StreamsRegistry.java +++ b/src/java/io/factorhouse/kpow/StreamsRegistry.java @@ -2,7 +2,7 @@ import clojure.java.api.Clojure; import clojure.lang.IFn; -import io.factorhouse.kpow.key_strategies.KeyStrategy; +import io.factorhouse.kpow.key.KeyStrategy; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; diff --git a/src/java/io/factorhouse/kpow/key_strategies/ClientIDKeyStrategy.java b/src/java/io/factorhouse/kpow/key/ClientIdKeyStrategy.java similarity index 53% rename from src/java/io/factorhouse/kpow/key_strategies/ClientIDKeyStrategy.java rename to src/java/io/factorhouse/kpow/key/ClientIdKeyStrategy.java index 24bf29f..9fe2602 100644 --- a/src/java/io/factorhouse/kpow/key_strategies/ClientIDKeyStrategy.java +++ b/src/java/io/factorhouse/kpow/key/ClientIdKeyStrategy.java @@ -1,7 +1,7 @@ -package io.factorhouse.kpow.key_strategies; +package io.factorhouse.kpow.key; -public class ClientIDKeyStrategy implements KeyStrategy { - public ClientIDKeyStrategy() {} +public class ClientIdKeyStrategy implements KeyStrategy { + public ClientIdKeyStrategy() {} @Override public Taxon getTaxon(String clientId, String applicationId) { diff --git a/src/java/io/factorhouse/kpow/key_strategies/ClusterIDKeyStrategy.java b/src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java similarity index 60% rename from src/java/io/factorhouse/kpow/key_strategies/ClusterIDKeyStrategy.java rename to src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java index c223d1f..e0965e3 100644 --- a/src/java/io/factorhouse/kpow/key_strategies/ClusterIDKeyStrategy.java +++ b/src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java @@ -1,22 +1,22 @@ -package io.factorhouse.kpow.key_strategies; +package io.factorhouse.kpow.key; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeClusterResult; import java.util.Properties; import java.util.concurrent.ExecutionException; -public class ClusterIDKeyStrategy implements KeyStrategy { - private final String clusterID; +public class ClusterIdKeyStrategy implements KeyStrategy { + private final String clusterId; - public ClusterIDKeyStrategy(Properties props) throws InterruptedException, ExecutionException { + public ClusterIdKeyStrategy(Properties props) throws InterruptedException, ExecutionException { try (AdminClient adminClient = AdminClient.create(props)) { DescribeClusterResult describeClusterResult = adminClient.describeCluster(); - this.clusterID = describeClusterResult.clusterId().get(); + this.clusterId = describeClusterResult.clusterId().get(); } } @Override public Taxon getTaxon(String clientId, String applicationId) { - return new Taxon("streams", clusterID, "streams-agent-cid", clientId); + return new Taxon("streams", clusterId, "streams-agent-cid", clientId); } } diff --git a/src/java/io/factorhouse/kpow/key_strategies/KeyStrategy.java b/src/java/io/factorhouse/kpow/key/KeyStrategy.java similarity index 67% rename from src/java/io/factorhouse/kpow/key_strategies/KeyStrategy.java rename to src/java/io/factorhouse/kpow/key/KeyStrategy.java index 447b984..ab7f582 100644 --- a/src/java/io/factorhouse/kpow/key_strategies/KeyStrategy.java +++ b/src/java/io/factorhouse/kpow/key/KeyStrategy.java @@ -1,4 +1,4 @@ -package io.factorhouse.kpow.key_strategies; +package io.factorhouse.kpow.key; public interface KeyStrategy { Taxon getTaxon(String clientId, String applicationId); diff --git a/src/java/io/factorhouse/kpow/key_strategies/ManualKeyStrategy.java b/src/java/io/factorhouse/kpow/key/ManualKeyStrategy.java similarity index 88% rename from src/java/io/factorhouse/kpow/key_strategies/ManualKeyStrategy.java rename to src/java/io/factorhouse/kpow/key/ManualKeyStrategy.java index 463ccb9..f37d28d 100644 --- a/src/java/io/factorhouse/kpow/key_strategies/ManualKeyStrategy.java +++ b/src/java/io/factorhouse/kpow/key/ManualKeyStrategy.java @@ -1,4 +1,4 @@ -package io.factorhouse.kpow.key_strategies; +package io.factorhouse.kpow.key; public class ManualKeyStrategy implements KeyStrategy { private final String envName; diff --git a/src/java/io/factorhouse/kpow/key_strategies/Taxon.java b/src/java/io/factorhouse/kpow/key/Taxon.java similarity index 63% rename from src/java/io/factorhouse/kpow/key_strategies/Taxon.java rename to src/java/io/factorhouse/kpow/key/Taxon.java index a220a44..0e31a3d 100644 --- a/src/java/io/factorhouse/kpow/key_strategies/Taxon.java +++ b/src/java/io/factorhouse/kpow/key/Taxon.java @@ -1,14 +1,13 @@ -package io.factorhouse.kpow.key_strategies; +package io.factorhouse.kpow.key; public class Taxon { private final String domain; - private final String id; - + private final String domainId; private final String object; private final String objectId; - public Taxon(String domain, String id, String object, String objectId) { - this.id = id; + public Taxon(String domain, String domainId, String object, String objectId) { + this.domainId = domainId; this.domain = domain; this.object = object; this.objectId = objectId; @@ -26,7 +25,7 @@ public String getObjectId() { return objectId; } - public String getId() { - return id; + public String getDomainId() { + return domainId; } } diff --git a/test/io/factorhouse/agent_test.clj b/test/io/factorhouse/agent_test.clj index 91a3bc0..44cb69e 100644 --- a/test/io/factorhouse/agent_test.clj +++ b/test/io/factorhouse/agent_test.clj @@ -3,11 +3,11 @@ [clojure.test :refer :all] [io.factorhouse.kpow.agent :as agent]) (:import (io.factorhouse.kpow MetricFilter StreamsRegistry) + (io.factorhouse.kpow.key ClientIdKeyStrategy) (java.util Properties) (org.apache.kafka.clients.producer Producer) (org.apache.kafka.common Metric MetricName) - (org.apache.kafka.streams KafkaStreams$State StreamsBuilder Topology) - (io.factorhouse.kpow.key_strategies ClientIDKeyStrategy))) + (org.apache.kafka.streams KafkaStreams$State StreamsBuilder Topology))) (defn ^Properties ->props [m] (let [props (Properties.)] @@ -88,7 +88,7 @@ (mock-metric "application-id" "first" "mock metric" {"client-id" "abc123"} "xxx") (mock-metric "second.metric" "first" "mock metric" {"client-id" "abc123"} 2.0)]) (test-topology) - (ClientIDKeyStrategy.))] + (ClientIdKeyStrategy.))] (is agent) From df2ecb3f6b6b13effe795458502e623872031ed0 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Wed, 13 Nov 2024 15:33:19 +1100 Subject: [PATCH 08/29] Add unit tests for key strat + metric filters --- test/io/factorhouse/agent_test.clj | 108 ++++++++++++++++++++++++++++- 1 file changed, 107 insertions(+), 1 deletion(-) diff --git a/test/io/factorhouse/agent_test.clj b/test/io/factorhouse/agent_test.clj index 44cb69e..72d4b72 100644 --- a/test/io/factorhouse/agent_test.clj +++ b/test/io/factorhouse/agent_test.clj @@ -3,7 +3,7 @@ [clojure.test :refer :all] [io.factorhouse.kpow.agent :as agent]) (:import (io.factorhouse.kpow MetricFilter StreamsRegistry) - (io.factorhouse.kpow.key ClientIdKeyStrategy) + (io.factorhouse.kpow.key ClientIdKeyStrategy ManualKeyStrategy) (java.util Properties) (org.apache.kafka.clients.producer Producer) (org.apache.kafka.common Metric MetricName) @@ -126,3 +126,109 @@ (is (agent/unregister registry agent)) (is (empty? (agent/close-registry registry))))) + +(deftest agent-test-metric-filters + (let [records (atom []) + metrics-filter (-> (MetricFilter.) + (.denyNameStartsWith "second") + (.acceptNameStartsWith "first") + (.acceptNameStartsWith "rocksdb") + (.deny)) + registry (agent/init-registry (mock-producer records) metrics-filter) + agent (agent/register registry + (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0) + (mock-metric "rocksdb.foo" "first" "mock metric" {"client-id" "abc123"} 3.0) + (mock-metric "application-id" "first" "mock metric" {"client-id" "abc123"} "xxx") + (mock-metric "second.metric" "first" "mock metric" {"client-id" "abc123"} 2.0)]) + (test-topology) + (ClientIdKeyStrategy.))] + + (is agent) + + (is (deref (:latch registry) 5000 false)) + + (is (= #{[[:streams "abc123" :kafka/streams-agent] + {:type :kafka/streams-agent, + :application-id "xxx", + :client-id "abc123", + :data {:topology {:sub-topologies #{{:id 0, + :nodes #{{:name "KSTREAM-SOURCE-0000000000", + :predecessors #{}, + :successors #{}, + :topic-set #{"__oprtr_snapshot_state"}, + :topic-pattern nil}}}}, + :global-stores #{}}, + :state "RUNNING"}, + :snapshot/id {:domain :streams, :id "abc123"}}] + [[:streams "abc123" :kafka/streams-agent] + {:type :observation/plan, :snapshot/id {:domain :streams, :id "abc123"}, :data {:type :observe/streams-agent}}] + [[:streams "abc123" :kafka/streams-agent] + {:type :kafka/streams-agent-metrics, + :application-id "xxx", + :client-id "abc123", + :data [{:name "first.metric", :tags {}, :value 1.0} + {:name "rocksdb.foo", :tags {"client-id" "abc123"}, :value 3.0}], + :snapshot/id {:domain :streams, :id "abc123"}}]} + (into #{} (map (fn [record] + [(.key record) (dissoc (.value record) :job/id :captured)])) + @records))) + + (testing "consistent :captured value" + (is (= 1 (count (into #{} (map (fn [record] (-> record (.value) :captured))) @records))))) + + (is (agent/unregister registry agent)) + + (is (empty? (agent/close-registry registry))))) + +(deftest agent-test-manual-key-strategy + (let [records (atom []) + metrics-filter (-> (MetricFilter.) + (.denyNameStartsWith "second") + (.acceptNameStartsWith "first") + (.acceptNameStartsWith "rocksdb") + (.deny)) + registry (agent/init-registry (mock-producer records) metrics-filter) + agent (agent/register registry + (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0) + (mock-metric "rocksdb.foo" "first" "mock metric" {"client-id" "abc123"} 3.0) + (mock-metric "application-id" "first" "mock metric" {"client-id" "abc123"} "xxx") + (mock-metric "second.metric" "first" "mock metric" {"client-id" "abc123"} 2.0)]) + (test-topology) + (ManualKeyStrategy. "Trade Book (Staging)"))] + + (is agent) + + (is (deref (:latch registry) 5000 false)) + + (is (= #{[[:streams "Trade Book (Staging)" :kafka/streams-agent-m "abc123"] + {:type :kafka/streams-agent, + :application-id "xxx", + :client-id "abc123", + :data {:topology {:sub-topologies #{{:id 0, + :nodes #{{:name "KSTREAM-SOURCE-0000000000", + :predecessors #{}, + :successors #{}, + :topic-set #{"__oprtr_snapshot_state"}, + :topic-pattern nil}}}}, + :global-stores #{}}, + :state "RUNNING"}, + :snapshot/id {:domain :streams, :id "Trade Book (Staging)"}}] + [[:streams "Trade Book (Staging)" :kafka/streams-agent-m "abc123"] + {:type :observation/plan, :snapshot/id {:domain :streams, :id "Trade Book (Staging)"}, :data {:type :observe/streams-agent}}] + [[:streams "Trade Book (Staging)" :kafka/streams-agent-m "abc123"] + {:type :kafka/streams-agent-metrics, + :application-id "xxx", + :client-id "abc123", + :data [{:name "first.metric", :tags {}, :value 1.0} + {:name "rocksdb.foo", :tags {"client-id" "abc123"}, :value 3.0}], + :snapshot/id {:domain :streams, :id "Trade Book (Staging)"}}]} + (into #{} (map (fn [record] + [(.key record) (dissoc (.value record) :job/id :captured)])) + @records))) + + (testing "consistent :captured value" + (is (= 1 (count (into #{} (map (fn [record] (-> record (.value) :captured))) @records))))) + + (is (agent/unregister registry agent)) + + (is (empty? (agent/close-registry registry))))) From 380a59acfc04f486eed3ef2eb7a836689a6f0644 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Wed, 13 Nov 2024 15:35:13 +1100 Subject: [PATCH 09/29] Test deny metric filter --- test/io/factorhouse/agent_test.clj | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/test/io/factorhouse/agent_test.clj b/test/io/factorhouse/agent_test.clj index 72d4b72..9aa88ff 100644 --- a/test/io/factorhouse/agent_test.clj +++ b/test/io/factorhouse/agent_test.clj @@ -182,11 +182,7 @@ (deftest agent-test-manual-key-strategy (let [records (atom []) - metrics-filter (-> (MetricFilter.) - (.denyNameStartsWith "second") - (.acceptNameStartsWith "first") - (.acceptNameStartsWith "rocksdb") - (.deny)) + metrics-filter (-> (MetricFilter.) (.deny)) registry (agent/init-registry (mock-producer records) metrics-filter) agent (agent/register registry (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0) @@ -214,14 +210,7 @@ :state "RUNNING"}, :snapshot/id {:domain :streams, :id "Trade Book (Staging)"}}] [[:streams "Trade Book (Staging)" :kafka/streams-agent-m "abc123"] - {:type :observation/plan, :snapshot/id {:domain :streams, :id "Trade Book (Staging)"}, :data {:type :observe/streams-agent}}] - [[:streams "Trade Book (Staging)" :kafka/streams-agent-m "abc123"] - {:type :kafka/streams-agent-metrics, - :application-id "xxx", - :client-id "abc123", - :data [{:name "first.metric", :tags {}, :value 1.0} - {:name "rocksdb.foo", :tags {"client-id" "abc123"}, :value 3.0}], - :snapshot/id {:domain :streams, :id "Trade Book (Staging)"}}]} + {:type :observation/plan, :snapshot/id {:domain :streams, :id "Trade Book (Staging)"}, :data {:type :observe/streams-agent}}]} (into #{} (map (fn [record] [(.key record) (dissoc (.value record) :job/id :captured)])) @records))) From d3af2cc54e26ee29ff4fba84fec8c08db764ff05 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Wed, 13 Nov 2024 15:48:07 +1100 Subject: [PATCH 10/29] fmtfix --- src/clojure/io/factorhouse/kpow/agent.clj | 36 +++++++++++------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/clojure/io/factorhouse/kpow/agent.clj b/src/clojure/io/factorhouse/kpow/agent.clj index 10e675f..374dbe2 100644 --- a/src/clojure/io/factorhouse/kpow/agent.clj +++ b/src/clojure/io/factorhouse/kpow/agent.clj @@ -90,17 +90,17 @@ (defn apply-metric-filters [^MetricName metric-name filters] (reduce - (fn [acc ^MetricFilter$FilterCriteria filter-criteria] - (let [metric-filter-type (.getFilterType filter-criteria) - predicate (.getPredicate filter-criteria)] - (if (.test predicate metric-name) - (reduced - (case (.name metric-filter-type) - "ACCEPT" true - "DENY" false)) - acc))) - nil - filters)) + (fn [acc ^MetricFilter$FilterCriteria filter-criteria] + (let [metric-filter-type (.getFilterType filter-criteria) + predicate (.getPredicate filter-criteria)] + (if (.test predicate metric-name) + (reduced + (case (.name metric-filter-type) + "ACCEPT" true + "DENY" false)) + acc))) + nil + filters)) (defn numeric-metrics [metrics ^MetricFilter metrics-filter] @@ -166,17 +166,17 @@ application-id (application-id metrics) taxon (.getTaxon key-strategy client-id application-id) ctx (assoc ctx - :captured (System/currentTimeMillis) - :client-id client-id - :application-id application-id - :taxon taxon)] + :captured (System/currentTimeMillis) + :client-id client-id + :application-id application-id + :taxon taxon)] (when (nil? application-id) (throw (Exception. "Cannot infer application id from metrics returned from KafkaStreams instance. Expected metric \"application-id\" in the metrics registry."))) (when (nil? client-id) (throw (Exception. - (format "Cannot infer client id from metrics returned from KafkaStreams instance. Got: client-id %s and application-id %s" - (client-id-tag metrics) - application-id)))) + (format "Cannot infer client id from metrics returned from KafkaStreams instance. Got: client-id %s and application-id %s" + (client-id-tag metrics) + application-id)))) (snapshot-send ctx snapshot) (metrics-send ctx (numeric-metrics metrics metrics-filter)) ctx)))) From b822adc4adcc71bca00e84a267c0119c306915f0 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Mon, 18 Nov 2024 11:40:54 +1100 Subject: [PATCH 11/29] Add agent metadata to observation snapshot --- src/clojure/io/factorhouse/kpow/agent.clj | 73 ++++++++++--------- .../io/factorhouse/kpow/MetricFilter.java | 21 +++++- .../io/factorhouse/kpow/StreamsRegistry.java | 8 +- .../kpow/key/ClusterIdKeyStrategy.java | 2 +- .../kpow/key/ManualKeyStrategy.java | 2 +- src/java/io/operatr/kpow/StreamsRegistry.java | 10 +-- test/io/factorhouse/agent_test.clj | 38 +++++++--- 7 files changed, 92 insertions(+), 62 deletions(-) diff --git a/src/clojure/io/factorhouse/kpow/agent.clj b/src/clojure/io/factorhouse/kpow/agent.clj index 374dbe2..d7c2a2c 100644 --- a/src/clojure/io/factorhouse/kpow/agent.clj +++ b/src/clojure/io/factorhouse/kpow/agent.clj @@ -90,17 +90,17 @@ (defn apply-metric-filters [^MetricName metric-name filters] (reduce - (fn [acc ^MetricFilter$FilterCriteria filter-criteria] - (let [metric-filter-type (.getFilterType filter-criteria) - predicate (.getPredicate filter-criteria)] - (if (.test predicate metric-name) - (reduced - (case (.name metric-filter-type) - "ACCEPT" true - "DENY" false)) - acc))) - nil - filters)) + (fn [acc ^MetricFilter$FilterCriteria filter-criteria] + (let [metric-filter-type (.getFilterType filter-criteria) + predicate (.getPredicate filter-criteria)] + (if (.test predicate metric-name) + (reduced + (case (.name metric-filter-type) + "ACCEPT" true + "DENY" false)) + acc))) + nil + filters)) (defn numeric-metrics [metrics ^MetricFilter metrics-filter] @@ -124,7 +124,7 @@ :captured captured :data data :job/id job-id - :snapshot/id {:domain (first taxon) :id (second taxon)}} + :snapshot/id {:domain :streams :id taxon}} record (ProducerRecord. (:topic snapshot-topic) taxon snapshot)] (.get (.send producer record)))) @@ -138,48 +138,53 @@ :captured captured :data (vec data) :job/id job-id - :snapshot/id {:domain (first taxon) :id (second taxon)}} + :snapshot/id {:domain :streams :id taxon}} record (ProducerRecord. (:topic snapshot-topic) taxon value)] (.get (.send producer record)))) (log/infof "Kpow: sent [%s] streams metrics for application.id %s" (count metrics) application-id))) (defn plan-send - [{:keys [snapshot-topic producer job-id captured taxon]}] + [{:keys [snapshot-topic producer job-id captured taxon metrics-summary]}] (let [taxon (p/datafy taxon) plan {:type :observation/plan :captured captured - :snapshot/id {:domain (first taxon) :id (second taxon)} + :snapshot/id {:domain :streams :id taxon} :job/id job-id - :data {:type :observe/streams-agent}} + :data {:type :observe/streams-agent + :agent {:metrics-summary metrics-summary + :version "1.0.0"}}} record (ProducerRecord. (:topic snapshot-topic) taxon plan)] (.get (.send producer record)))) (defn snapshot-telemetry - [{:keys [streams ^Topology topology metrics-filter ^KeyStrategy key-strategy] :as ctx}] + [{:keys [streams ^Topology topology ^MetricFilter metrics-filter ^KeyStrategy key-strategy] :as ctx}] (let [metrics (metrics streams)] (if (empty? metrics) (log/warn "KafkStreams .metrics() method returned an empty collection, no telemetry was sent. Has something mutated the global metrics registry?") - (let [topology (p/datafy (.describe topology)) - state (str (.state streams)) - snapshot {:topology topology :state state} - client-id (client-id metrics) - application-id (application-id metrics) - taxon (.getTaxon key-strategy client-id application-id) - ctx (assoc ctx - :captured (System/currentTimeMillis) - :client-id client-id - :application-id application-id - :taxon taxon)] + (let [topology (p/datafy (.describe topology)) + state (str (.state streams)) + snapshot {:topology topology :state state} + client-id (client-id metrics) + application-id (application-id metrics) + taxon (.getTaxon key-strategy client-id application-id) + ctx (assoc ctx + :captured (System/currentTimeMillis) + :client-id client-id + :application-id application-id + :taxon taxon) + filtered-metrics (numeric-metrics metrics metrics-filter)] (when (nil? application-id) (throw (Exception. "Cannot infer application id from metrics returned from KafkaStreams instance. Expected metric \"application-id\" in the metrics registry."))) (when (nil? client-id) (throw (Exception. - (format "Cannot infer client id from metrics returned from KafkaStreams instance. Got: client-id %s and application-id %s" - (client-id-tag metrics) - application-id)))) + (format "Cannot infer client id from metrics returned from KafkaStreams instance. Got: client-id %s and application-id %s" + (client-id-tag metrics) + application-id)))) (snapshot-send ctx snapshot) - (metrics-send ctx (numeric-metrics metrics metrics-filter)) - ctx)))) + (metrics-send ctx filtered-metrics) + (assoc ctx :metrics-summary {:total (count metrics) + :sent (count filtered-metrics) + :id (some-> metrics-filter .getFilterId)}))))) (defn snapshot-task ^Runnable [snapshot-topic producer registered-topologies metrics-filter latch] @@ -194,7 +199,7 @@ (Thread/sleep 2000) (plan-send next-ctx)) (catch Throwable e - (log/errorf e "Kpow: error sending streams snapshot for agent %s" id)))) + (log/warnf e "Kpow: error sending streams snapshot for agent %s" id)))) (deliver latch true)))) diff --git a/src/java/io/factorhouse/kpow/MetricFilter.java b/src/java/io/factorhouse/kpow/MetricFilter.java index 58cefc2..8d946ae 100644 --- a/src/java/io/factorhouse/kpow/MetricFilter.java +++ b/src/java/io/factorhouse/kpow/MetricFilter.java @@ -8,6 +8,13 @@ import java.util.function.Predicate; public class MetricFilter { + + private String filterId = null; + + public String getFilterId() { + return filterId; + } + public enum FilterType { ACCEPT, DENY } @@ -35,6 +42,18 @@ public FilterType getFilterType() { public MetricFilter() { this.filters = new ArrayList<>(); + this.filterId = "custom"; + } + + private MetricFilter(String id) { + this.filters = new ArrayList<>(); + this.filterId = id; + } + + public static MetricFilter defaultMetricFilter() { + return new MetricFilter("default") + .acceptNameStartsWith("streams.state") + .deny(); } public List getFilters() { @@ -80,4 +99,4 @@ public MetricFilter denyNameStartsWith(String prefix) { this.filters.add(criteria); return this; } -} \ No newline at end of file +} diff --git a/src/java/io/factorhouse/kpow/StreamsRegistry.java b/src/java/io/factorhouse/kpow/StreamsRegistry.java index 5fc9d11..09f2001 100644 --- a/src/java/io/factorhouse/kpow/StreamsRegistry.java +++ b/src/java/io/factorhouse/kpow/StreamsRegistry.java @@ -88,13 +88,7 @@ public StreamsRegistry(Properties props, MetricFilter metricsFilter) { } public StreamsRegistry(Properties props) { - this(props, StreamsRegistry.defaultMetricFilter()); - } - - public static MetricFilter defaultMetricFilter() { - return new MetricFilter() - .acceptNameStartsWith("foo") - .deny(); + this(props, MetricFilter.defaultMetricFilter()); } public StreamsAgent register(KafkaStreams streams, Topology topology, KeyStrategy keyStrategy) { diff --git a/src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java b/src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java index e0965e3..7b70726 100644 --- a/src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java +++ b/src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java @@ -17,6 +17,6 @@ public ClusterIdKeyStrategy(Properties props) throws InterruptedException, Execu @Override public Taxon getTaxon(String clientId, String applicationId) { - return new Taxon("streams", clusterId, "streams-agent-cid", clientId); + return new Taxon("cluster", clusterId, "streams-agent-cid", clientId); } } diff --git a/src/java/io/factorhouse/kpow/key/ManualKeyStrategy.java b/src/java/io/factorhouse/kpow/key/ManualKeyStrategy.java index f37d28d..86b3b3e 100644 --- a/src/java/io/factorhouse/kpow/key/ManualKeyStrategy.java +++ b/src/java/io/factorhouse/kpow/key/ManualKeyStrategy.java @@ -9,6 +9,6 @@ public ManualKeyStrategy(String envName) { @Override public Taxon getTaxon(String clientId, String applicationId) { - return new Taxon("streams", envName, "streams-agent-m", clientId); + return new Taxon("env", envName, "streams-agent-m", clientId); } } diff --git a/src/java/io/operatr/kpow/StreamsRegistry.java b/src/java/io/operatr/kpow/StreamsRegistry.java index cb9b75f..8366e56 100644 --- a/src/java/io/operatr/kpow/StreamsRegistry.java +++ b/src/java/io/operatr/kpow/StreamsRegistry.java @@ -87,14 +87,8 @@ public StreamsRegistry(Properties props, MetricFilter metricsFilter) { agent = agentFn.invoke(producer, metricsFilter); } - public static MetricFilter defaultMetricFilter() { - return new MetricFilter() - .acceptNameStartsWith("foo") - .deny(); - } - public StreamsRegistry(Properties props) { - this(props, StreamsRegistry.defaultMetricFilter()); + this(props, MetricFilter.defaultMetricFilter()); } public StreamsAgent register(KafkaStreams streams, Topology topology) { @@ -125,4 +119,4 @@ public void close() { IFn closeFn = Clojure.var("io.factorhouse.kpow.agent", "close-registry"); closeFn.invoke(agent); } -} \ No newline at end of file +} diff --git a/test/io/factorhouse/agent_test.clj b/test/io/factorhouse/agent_test.clj index 9aa88ff..b871b79 100644 --- a/test/io/factorhouse/agent_test.clj +++ b/test/io/factorhouse/agent_test.clj @@ -106,16 +106,22 @@ :topic-pattern nil}}}}, :global-stores #{}}, :state "RUNNING"}, - :snapshot/id {:domain :streams, :id "abc123"}}] + :snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}] [[:streams "abc123" :kafka/streams-agent] - {:type :observation/plan, :snapshot/id {:domain :streams, :id "abc123"}, :data {:type :observe/streams-agent}}] + {:type :observation/plan + :snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]} + :data {:type :observe/streams-agent + :agent {:metrics-summary {:id "custom" + :sent 2 + :total 3} + :version "1.0.0"}}}] [[:streams "abc123" :kafka/streams-agent] {:type :kafka/streams-agent-metrics, :application-id "xxx", :client-id "abc123", :data [{:name "first.metric", :tags {}, :value 1.0} {:name "second.metric", :tags {"client-id" "abc123"}, :value 2.0}], - :snapshot/id {:domain :streams, :id "abc123"}}]} + :snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}]} (into #{} (map (fn [record] [(.key record) (dissoc (.value record) :job/id :captured)])) @records))) @@ -159,16 +165,22 @@ :topic-pattern nil}}}}, :global-stores #{}}, :state "RUNNING"}, - :snapshot/id {:domain :streams, :id "abc123"}}] + :snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}] [[:streams "abc123" :kafka/streams-agent] - {:type :observation/plan, :snapshot/id {:domain :streams, :id "abc123"}, :data {:type :observe/streams-agent}}] + {:type :observation/plan + :snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]} + :data {:type :observe/streams-agent + :agent {:metrics-summary {:id "custom" + :sent 2 + :total 4} + :version "1.0.0"}}}] [[:streams "abc123" :kafka/streams-agent] {:type :kafka/streams-agent-metrics, :application-id "xxx", :client-id "abc123", :data [{:name "first.metric", :tags {}, :value 1.0} {:name "rocksdb.foo", :tags {"client-id" "abc123"}, :value 3.0}], - :snapshot/id {:domain :streams, :id "abc123"}}]} + :snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}]} (into #{} (map (fn [record] [(.key record) (dissoc (.value record) :job/id :captured)])) @records))) @@ -196,7 +208,7 @@ (is (deref (:latch registry) 5000 false)) - (is (= #{[[:streams "Trade Book (Staging)" :kafka/streams-agent-m "abc123"] + (is (= #{[[:env "Trade Book (Staging)" :kafka/streams-agent-m "abc123"] {:type :kafka/streams-agent, :application-id "xxx", :client-id "abc123", @@ -208,9 +220,15 @@ :topic-pattern nil}}}}, :global-stores #{}}, :state "RUNNING"}, - :snapshot/id {:domain :streams, :id "Trade Book (Staging)"}}] - [[:streams "Trade Book (Staging)" :kafka/streams-agent-m "abc123"] - {:type :observation/plan, :snapshot/id {:domain :streams, :id "Trade Book (Staging)"}, :data {:type :observe/streams-agent}}]} + :snapshot/id {:domain :streams, :id [:env "Trade Book (Staging)" :kafka/streams-agent-m "abc123"]}}] + [[:env "Trade Book (Staging)" :kafka/streams-agent-m "abc123"] + {:type :observation/plan + :snapshot/id {:domain :streams, :id [:env "Trade Book (Staging)" :kafka/streams-agent-m "abc123"]} + :data {:type :observe/streams-agent + :agent {:metrics-summary {:id "custom" + :sent 0 + :total 4} + :version "1.0.0"}}}]} (into #{} (map (fn [record] [(.key record) (dissoc (.value record) :job/id :captured)])) @records))) From 6ef14c9491f5bf2fce47bce80b6d27b3b966cca2 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Mon, 18 Nov 2024 11:44:24 +1100 Subject: [PATCH 12/29] fmtfix --- src/clojure/io/factorhouse/kpow/agent.clj | 36 +++++++++++------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/clojure/io/factorhouse/kpow/agent.clj b/src/clojure/io/factorhouse/kpow/agent.clj index d7c2a2c..30cd463 100644 --- a/src/clojure/io/factorhouse/kpow/agent.clj +++ b/src/clojure/io/factorhouse/kpow/agent.clj @@ -90,17 +90,17 @@ (defn apply-metric-filters [^MetricName metric-name filters] (reduce - (fn [acc ^MetricFilter$FilterCriteria filter-criteria] - (let [metric-filter-type (.getFilterType filter-criteria) - predicate (.getPredicate filter-criteria)] - (if (.test predicate metric-name) - (reduced - (case (.name metric-filter-type) - "ACCEPT" true - "DENY" false)) - acc))) - nil - filters)) + (fn [acc ^MetricFilter$FilterCriteria filter-criteria] + (let [metric-filter-type (.getFilterType filter-criteria) + predicate (.getPredicate filter-criteria)] + (if (.test predicate metric-name) + (reduced + (case (.name metric-filter-type) + "ACCEPT" true + "DENY" false)) + acc))) + nil + filters)) (defn numeric-metrics [metrics ^MetricFilter metrics-filter] @@ -168,18 +168,18 @@ application-id (application-id metrics) taxon (.getTaxon key-strategy client-id application-id) ctx (assoc ctx - :captured (System/currentTimeMillis) - :client-id client-id - :application-id application-id - :taxon taxon) + :captured (System/currentTimeMillis) + :client-id client-id + :application-id application-id + :taxon taxon) filtered-metrics (numeric-metrics metrics metrics-filter)] (when (nil? application-id) (throw (Exception. "Cannot infer application id from metrics returned from KafkaStreams instance. Expected metric \"application-id\" in the metrics registry."))) (when (nil? client-id) (throw (Exception. - (format "Cannot infer client id from metrics returned from KafkaStreams instance. Got: client-id %s and application-id %s" - (client-id-tag metrics) - application-id)))) + (format "Cannot infer client id from metrics returned from KafkaStreams instance. Got: client-id %s and application-id %s" + (client-id-tag metrics) + application-id)))) (snapshot-send ctx snapshot) (metrics-send ctx filtered-metrics) (assoc ctx :metrics-summary {:total (count metrics) From 34616b15d68b818f48fbb67ce3a07a1bd97a3351 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Mon, 18 Nov 2024 14:25:46 +1100 Subject: [PATCH 13/29] Pass thru agent id in observation plan --- src/clojure/io/factorhouse/kpow/agent.clj | 11 ++++++++--- test/io/factorhouse/agent_test.clj | 13 ++++++++----- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/clojure/io/factorhouse/kpow/agent.clj b/src/clojure/io/factorhouse/kpow/agent.clj index 30cd463..4fef36b 100644 --- a/src/clojure/io/factorhouse/kpow/agent.clj +++ b/src/clojure/io/factorhouse/kpow/agent.clj @@ -144,7 +144,7 @@ (log/infof "Kpow: sent [%s] streams metrics for application.id %s" (count metrics) application-id))) (defn plan-send - [{:keys [snapshot-topic producer job-id captured taxon metrics-summary]}] + [{:keys [snapshot-topic producer job-id captured taxon metrics-summary agent-id]}] (let [taxon (p/datafy taxon) plan {:type :observation/plan :captured captured @@ -152,12 +152,13 @@ :job/id job-id :data {:type :observe/streams-agent :agent {:metrics-summary metrics-summary + :id agent-id :version "1.0.0"}}} record (ProducerRecord. (:topic snapshot-topic) taxon plan)] (.get (.send producer record)))) (defn snapshot-telemetry - [{:keys [streams ^Topology topology ^MetricFilter metrics-filter ^KeyStrategy key-strategy] :as ctx}] + [{:keys [streams ^Topology topology ^MetricFilter metrics-filter ^KeyStrategy key-strategy agent-id] :as ctx}] (let [metrics (metrics streams)] (if (empty? metrics) (log/warn "KafkStreams .metrics() method returned an empty collection, no telemetry was sent. Has something mutated the global metrics registry?") @@ -195,7 +196,10 @@ :producer producer :metrics-filter metrics-filter}] (doseq [[id [streams topology key-strategy]] @registered-topologies] - (try (when-let [next-ctx (snapshot-telemetry (assoc ctx :streams streams :topology topology :key-strategy key-strategy))] + (try (when-let [next-ctx (snapshot-telemetry (assoc ctx :streams streams + :topology topology + :key-strategy key-strategy + :agent-id id))] (Thread/sleep 2000) (plan-send next-ctx)) (catch Throwable e @@ -217,6 +221,7 @@ pool (Executors/newSingleThreadScheduledExecutor thread-factory) register-fn (fn [streams topology key-strategy] (let [id (str (UUID/randomUUID))] + (log/infof "Kpow: registering new streams application with id %s" id) (swap! registered-topologies assoc id [streams topology key-strategy]) id)) latch (promise) diff --git a/test/io/factorhouse/agent_test.clj b/test/io/factorhouse/agent_test.clj index b871b79..1d83e94 100644 --- a/test/io/factorhouse/agent_test.clj +++ b/test/io/factorhouse/agent_test.clj @@ -83,14 +83,14 @@ (let [records (atom []) metrics-filter (-> (MetricFilter.) (.accept)) registry (agent/init-registry (mock-producer records) metrics-filter) - agent (agent/register registry + agent-id (agent/register registry (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0) (mock-metric "application-id" "first" "mock metric" {"client-id" "abc123"} "xxx") (mock-metric "second.metric" "first" "mock metric" {"client-id" "abc123"} 2.0)]) (test-topology) (ClientIdKeyStrategy.))] - (is agent) + (is agent-id) (is (deref (:latch registry) 5000 false)) @@ -114,6 +114,7 @@ :agent {:metrics-summary {:id "custom" :sent 2 :total 3} + :id agent-id :version "1.0.0"}}}] [[:streams "abc123" :kafka/streams-agent] {:type :kafka/streams-agent-metrics, @@ -141,7 +142,7 @@ (.acceptNameStartsWith "rocksdb") (.deny)) registry (agent/init-registry (mock-producer records) metrics-filter) - agent (agent/register registry + agent-id (agent/register registry (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0) (mock-metric "rocksdb.foo" "first" "mock metric" {"client-id" "abc123"} 3.0) (mock-metric "application-id" "first" "mock metric" {"client-id" "abc123"} "xxx") @@ -173,6 +174,7 @@ :agent {:metrics-summary {:id "custom" :sent 2 :total 4} + :id agent-id :version "1.0.0"}}}] [[:streams "abc123" :kafka/streams-agent] {:type :kafka/streams-agent-metrics, @@ -196,7 +198,7 @@ (let [records (atom []) metrics-filter (-> (MetricFilter.) (.deny)) registry (agent/init-registry (mock-producer records) metrics-filter) - agent (agent/register registry + agent-id (agent/register registry (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0) (mock-metric "rocksdb.foo" "first" "mock metric" {"client-id" "abc123"} 3.0) (mock-metric "application-id" "first" "mock metric" {"client-id" "abc123"} "xxx") @@ -204,7 +206,7 @@ (test-topology) (ManualKeyStrategy. "Trade Book (Staging)"))] - (is agent) + (is agent-id) (is (deref (:latch registry) 5000 false)) @@ -228,6 +230,7 @@ :agent {:metrics-summary {:id "custom" :sent 0 :total 4} + :id agent-id :version "1.0.0"}}}]} (into #{} (map (fn [record] [(.key record) (dissoc (.value record) :job/id :captured)])) From 690a11a0c69b1c43c0a1768efc7143d8a9d9c392 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Tue, 19 Nov 2024 11:37:59 +1100 Subject: [PATCH 14/29] kondo --- src/clojure/io/factorhouse/kpow/agent.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/clojure/io/factorhouse/kpow/agent.clj b/src/clojure/io/factorhouse/kpow/agent.clj index 4fef36b..32d8d47 100644 --- a/src/clojure/io/factorhouse/kpow/agent.clj +++ b/src/clojure/io/factorhouse/kpow/agent.clj @@ -158,7 +158,7 @@ (.get (.send producer record)))) (defn snapshot-telemetry - [{:keys [streams ^Topology topology ^MetricFilter metrics-filter ^KeyStrategy key-strategy agent-id] :as ctx}] + [{:keys [streams ^Topology topology ^MetricFilter metrics-filter ^KeyStrategy key-strategy] :as ctx}] (let [metrics (metrics streams)] (if (empty? metrics) (log/warn "KafkStreams .metrics() method returned an empty collection, no telemetry was sent. Has something mutated the global metrics registry?") From 4f138e506560b0b546ff4f7883c12490cca9099b Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Tue, 19 Nov 2024 13:52:57 +1100 Subject: [PATCH 15/29] 1.0.0-rc1 --- project.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project.clj b/project.clj index 56be5b8..cc4f4b7 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject io.factorhouse/kpow-streams-agent "0.2.13" +(defproject io.factorhouse/kpow-streams-agent "1.0.0-rc1" :description "Kpow's Kafka Streams monitoring agent" :url "https://github.com/factorhouse/kpow-streams-agent" :license {:name "Apache-2.0 License" From 4de94baba14d1e54b16d7dd16ded9aaa6caa13f9 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Thu, 21 Nov 2024 10:27:35 +1100 Subject: [PATCH 16/29] Tweak key strat --- project.clj | 2 +- src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java | 2 +- src/java/io/factorhouse/kpow/key/ManualKeyStrategy.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/project.clj b/project.clj index cc4f4b7..263ec03 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject io.factorhouse/kpow-streams-agent "1.0.0-rc1" +(defproject io.factorhouse/kpow-streams-agent "1.0.0-rc2" :description "Kpow's Kafka Streams monitoring agent" :url "https://github.com/factorhouse/kpow-streams-agent" :license {:name "Apache-2.0 License" diff --git a/src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java b/src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java index 7b70726..109c97a 100644 --- a/src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java +++ b/src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java @@ -17,6 +17,6 @@ public ClusterIdKeyStrategy(Properties props) throws InterruptedException, Execu @Override public Taxon getTaxon(String clientId, String applicationId) { - return new Taxon("cluster", clusterId, "streams-agent-cid", clientId); + return new Taxon("cluster", clusterId, "streams-agent", clientId); } } diff --git a/src/java/io/factorhouse/kpow/key/ManualKeyStrategy.java b/src/java/io/factorhouse/kpow/key/ManualKeyStrategy.java index 86b3b3e..872cd06 100644 --- a/src/java/io/factorhouse/kpow/key/ManualKeyStrategy.java +++ b/src/java/io/factorhouse/kpow/key/ManualKeyStrategy.java @@ -9,6 +9,6 @@ public ManualKeyStrategy(String envName) { @Override public Taxon getTaxon(String clientId, String applicationId) { - return new Taxon("env", envName, "streams-agent-m", clientId); + return new Taxon("env", envName, "streams-agent", clientId); } } From 599b7bd52161af57d61701cc9ea90c57465db050 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Thu, 21 Nov 2024 14:31:47 +1100 Subject: [PATCH 17/29] Affix client+application-id to plan snapshot --- src/clojure/io/factorhouse/kpow/agent.clj | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/clojure/io/factorhouse/kpow/agent.clj b/src/clojure/io/factorhouse/kpow/agent.clj index 32d8d47..c794e62 100644 --- a/src/clojure/io/factorhouse/kpow/agent.clj +++ b/src/clojure/io/factorhouse/kpow/agent.clj @@ -137,6 +137,7 @@ :client-id client-id :captured captured :data (vec data) + :job/id job-id :snapshot/id {:domain :streams :id taxon}} record (ProducerRecord. (:topic snapshot-topic) taxon value)] @@ -144,16 +145,18 @@ (log/infof "Kpow: sent [%s] streams metrics for application.id %s" (count metrics) application-id))) (defn plan-send - [{:keys [snapshot-topic producer job-id captured taxon metrics-summary agent-id]}] + [{:keys [snapshot-topic producer job-id captured taxon metrics-summary agent-id application-id client-id]}] (let [taxon (p/datafy taxon) - plan {:type :observation/plan - :captured captured - :snapshot/id {:domain :streams :id taxon} - :job/id job-id - :data {:type :observe/streams-agent - :agent {:metrics-summary metrics-summary - :id agent-id - :version "1.0.0"}}} + plan {:type :observation/plan + :captured captured + :snapshot/id {:domain :streams :id taxon} + :job/id job-id + :application-id application-id + :client-id client-id + :data {:type :observe/streams-agent + :agent {:metrics-summary metrics-summary + :id agent-id + :version "1.0.0"}}} record (ProducerRecord. (:topic snapshot-topic) taxon plan)] (.get (.send producer record)))) From b0b6836b1bc960b057275a0b560b70190928f7e7 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Thu, 21 Nov 2024 14:32:10 +1100 Subject: [PATCH 18/29] 1.0.0-rc3 --- project.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project.clj b/project.clj index 263ec03..ace9732 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject io.factorhouse/kpow-streams-agent "1.0.0-rc2" +(defproject io.factorhouse/kpow-streams-agent "1.0.0-rc3" :description "Kpow's Kafka Streams monitoring agent" :url "https://github.com/factorhouse/kpow-streams-agent" :license {:name "Apache-2.0 License" From f7e8736cb2ff6eae537347863988f50f2f322755 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Thu, 28 Nov 2024 13:29:36 +1100 Subject: [PATCH 19/29] 1.0.0-rc4 --- project.clj | 4 ++-- src/clojure/io/factorhouse/kpow/agent.clj | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/project.clj b/project.clj index ace9732..fdbfb7c 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject io.factorhouse/kpow-streams-agent "1.0.0-rc3" +(defproject io.factorhouse/kpow-streams-agent "1.0.0-rc4" :description "Kpow's Kafka Streams monitoring agent" :url "https://github.com/factorhouse/kpow-streams-agent" :license {:name "Apache-2.0 License" @@ -19,7 +19,7 @@ [:name "Derek Troy-West"] [:url "https://factorhouse.io"] [:roles - [:role "developer"] + [:role "developer"]7 [:role "maintainer"]]]]) :dependencies [[org.clojure/clojure "1.12.0"] [com.cognitect/transit-clj "1.0.333"] diff --git a/src/clojure/io/factorhouse/kpow/agent.clj b/src/clojure/io/factorhouse/kpow/agent.clj index c794e62..3f0417c 100644 --- a/src/clojure/io/factorhouse/kpow/agent.clj +++ b/src/clojure/io/factorhouse/kpow/agent.clj @@ -156,6 +156,7 @@ :data {:type :observe/streams-agent :agent {:metrics-summary metrics-summary :id agent-id + :captured captured :version "1.0.0"}}} record (ProducerRecord. (:topic snapshot-topic) taxon plan)] (.get (.send producer record)))) @@ -199,7 +200,8 @@ :producer producer :metrics-filter metrics-filter}] (doseq [[id [streams topology key-strategy]] @registered-topologies] - (try (when-let [next-ctx (snapshot-telemetry (assoc ctx :streams streams + (try (when-let [next-ctx (snapshot-telemetry (assoc ctx + :streams streams :topology topology :key-strategy key-strategy :agent-id id))] From 480b5da2e6dc7147768f61413cb53589d58a3948 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Wed, 18 Dec 2024 15:29:00 +1100 Subject: [PATCH 20/29] Flesh out static MetricFilter methods --- .../io/factorhouse/kpow/MetricFilter.java | 75 +++++++++++++++++-- 1 file changed, 70 insertions(+), 5 deletions(-) diff --git a/src/java/io/factorhouse/kpow/MetricFilter.java b/src/java/io/factorhouse/kpow/MetricFilter.java index 8d946ae..5677e9a 100644 --- a/src/java/io/factorhouse/kpow/MetricFilter.java +++ b/src/java/io/factorhouse/kpow/MetricFilter.java @@ -50,18 +50,76 @@ private MetricFilter(String id) { this.filterId = id; } + /** + * Returns the default metricsFilter used by the streams agent. + * By default, Kpow's streams agent will only send across the topology and streams state + * on each observation. + * + * @return the default metrics filter + */ public static MetricFilter defaultMetricFilter() { return new MetricFilter("default") - .acceptNameStartsWith("streams.state") .deny(); } + /** + * Returns a metrics filter that accepts all numeric metrics from the running Streams application. + * + * @return accept all metric filter + */ + public static MetricFilter acceptAllMetricFilter() { + return new MetricFilter("acceptAll").accept(); + } + + + /** + * Returns a metrics filter that includes only state store (and + * + * @return state store metrics only filter + */ + public static MetricFilter stateStoreMetricsOnlyFilter() { + Predicate stateStoreMetricsOnly = m -> + m.tags().containsKey("store") || + m.tags().containsKey("in-memory-state-id") || + m.tags().containsKey("in-memory-window-state-id") || + m.tags().containsKey("in-memory-session-state-id") || + m.tags().containsKey("rocksdb-session-state-id") || + m.tags().containsKey("rocksdb-state-id") || + m.tags().containsKey("rocksdb-window-state-id"); + return new MetricFilter("stateStoreMetricsOnly") + .accept(stateStoreMetricsOnly); + } + + public static MetricFilter essentialMetricsOnlyFilter() { + return new MetricFilter("essentialMetricsOnlyFilter") + // Lagency + .acceptNameStartsWith("commit-latency-avg") + .acceptNameStartsWith("process-latency-avg") + .acceptNameStartsWith("poll-latency-avg") + // Throughput + .acceptNameStartsWith("process-rate") + .acceptNameStartsWith("records-processed-rate") + // Lag + .acceptNameStartsWith("commit-rate") + .acceptNameStartsWith("records-lag-max") + .acceptNameStartsWith("records-lag") + // Stability + .acceptNameStartsWith("failed-stream-threads") + .acceptNameStartsWith("rebalances") + // State store health + .acceptNameStartsWith("put-rate") + .acceptNameStartsWith("get-rate") + .acceptNameStartsWith("flush-rate"); + } + public List getFilters() { return Collections.unmodifiableList(filters); } public MetricFilter accept() { - Predicate acceptPredicate = (_filter) -> { return true; }; + Predicate acceptPredicate = (_filter) -> { + return true; + }; FilterCriteria criteria = new FilterCriteria(acceptPredicate, FilterType.ACCEPT); this.filters.add(criteria); return this; @@ -74,7 +132,9 @@ public MetricFilter accept(Predicate acceptFilter) { } public MetricFilter deny() { - Predicate denyFilter = (_filter) -> { return true; }; + Predicate denyFilter = (_filter) -> { + return true; + }; FilterCriteria criteria = new FilterCriteria(denyFilter, FilterType.DENY); this.filters.add(criteria); return this; @@ -87,16 +147,21 @@ public MetricFilter deny(Predicate denyFilter) { } public MetricFilter acceptNameStartsWith(String prefix) { - Predicate acceptFilter = (metricName) -> { return metricName.name().startsWith(prefix); }; + Predicate acceptFilter = (metricName) -> { + return metricName.name().startsWith(prefix); + }; FilterCriteria criteria = new FilterCriteria(acceptFilter, FilterType.ACCEPT); this.filters.add(criteria); return this; } public MetricFilter denyNameStartsWith(String prefix) { - Predicate denyFilter = (metricName) -> { return metricName.name().startsWith(prefix); }; + Predicate denyFilter = (metricName) -> { + return metricName.name().startsWith(prefix); + }; FilterCriteria criteria = new FilterCriteria(denyFilter, FilterType.DENY); this.filters.add(criteria); return this; } } + From ea8cd23039308017e1093f702ecd685b30619e8b Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Wed, 18 Dec 2024 15:29:30 +1100 Subject: [PATCH 21/29] io.factorhouse/kpow-streams-agent "1.0.0-rc5" --- project.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project.clj b/project.clj index fdbfb7c..d4af323 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject io.factorhouse/kpow-streams-agent "1.0.0-rc4" +(defproject io.factorhouse/kpow-streams-agent "1.0.0-rc5" :description "Kpow's Kafka Streams monitoring agent" :url "https://github.com/factorhouse/kpow-streams-agent" :license {:name "Apache-2.0 License" From 27b2151bfda5ac6936bea04a847a32273803c473 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Thu, 19 Dec 2024 10:19:15 +1100 Subject: [PATCH 22/29] Better document static metric filter methods --- .../io/factorhouse/kpow/MetricFilter.java | 53 +++++++++++++------ 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/src/java/io/factorhouse/kpow/MetricFilter.java b/src/java/io/factorhouse/kpow/MetricFilter.java index 5677e9a..f870a22 100644 --- a/src/java/io/factorhouse/kpow/MetricFilter.java +++ b/src/java/io/factorhouse/kpow/MetricFilter.java @@ -50,18 +50,6 @@ private MetricFilter(String id) { this.filterId = id; } - /** - * Returns the default metricsFilter used by the streams agent. - * By default, Kpow's streams agent will only send across the topology and streams state - * on each observation. - * - * @return the default metrics filter - */ - public static MetricFilter defaultMetricFilter() { - return new MetricFilter("default") - .deny(); - } - /** * Returns a metrics filter that accepts all numeric metrics from the running Streams application. * @@ -71,9 +59,18 @@ public static MetricFilter acceptAllMetricFilter() { return new MetricFilter("acceptAll").accept(); } + /** + * Returns a metrics filter that denies all metrics, only sending across the Kafka Streams topology + state + * on every observation. + * + * @return deny all metric filter + */ + public static MetricFilter denyAllMetricFilter() { + return new MetricFilter("denyAll").deny(); + } /** - * Returns a metrics filter that includes only state store (and + * Returns a metrics filter that includes only state store metrics. * * @return state store metrics only filter */ @@ -90,9 +87,33 @@ public static MetricFilter stateStoreMetricsOnlyFilter() { .accept(stateStoreMetricsOnly); } - public static MetricFilter essentialMetricsOnlyFilter() { - return new MetricFilter("essentialMetricsOnlyFilter") - // Lagency + /** + * Returns the default metricsFilter used by the streams agent. + * By default, Kpow's streams agent will only send across a few key Kafka Streams metrics: + * Latency: + * - commit-latency-avg + * - process-latency-avg + * - poll-latecny-avg + * Throughput: + * - process-rate + * - records-processed-rate + * Lag: + * - commit-rate + * - records-lag-max + * - records-lag + * Stability: + * - failed-stream-threads + * - rebalances + * State store health: + * - put-rate + * - get-rate + * - flush-rate + * + * @return the default metrics filter + */ + public static MetricFilter defaultMetricFilter() { + return new MetricFilter("default") + // Latency .acceptNameStartsWith("commit-latency-avg") .acceptNameStartsWith("process-latency-avg") .acceptNameStartsWith("poll-latency-avg") From b6f6b021e59aa1f690d71c89c3404f9b794e385e Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Thu, 19 Dec 2024 10:31:56 +1100 Subject: [PATCH 23/29] Document all MetricFilter mutation methods --- .../io/factorhouse/kpow/MetricFilter.java | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/src/java/io/factorhouse/kpow/MetricFilter.java b/src/java/io/factorhouse/kpow/MetricFilter.java index f870a22..f90978b 100644 --- a/src/java/io/factorhouse/kpow/MetricFilter.java +++ b/src/java/io/factorhouse/kpow/MetricFilter.java @@ -137,6 +137,11 @@ public List getFilters() { return Collections.unmodifiableList(filters); } + /** + * Accepts all metrics. + * + * @return an updated MetricFilter + */ public MetricFilter accept() { Predicate acceptPredicate = (_filter) -> { return true; @@ -146,12 +151,22 @@ public MetricFilter accept() { return this; } + /** + * Accepts a metric based on the specified Predicate. + * + * @return an updated MetricFilter + */ public MetricFilter accept(Predicate acceptFilter) { FilterCriteria criteria = new FilterCriteria(acceptFilter, FilterType.ACCEPT); this.filters.add(criteria); return this; } + /** + * Denies all metrics. + * + * @return an updated MetricFilter + */ public MetricFilter deny() { Predicate denyFilter = (_filter) -> { return true; @@ -161,12 +176,22 @@ public MetricFilter deny() { return this; } + /** + * Denies a metric based on the specified Predicate. + * + * @return an updated MetricFilter + */ public MetricFilter deny(Predicate denyFilter) { FilterCriteria criteria = new FilterCriteria(denyFilter, FilterType.DENY); this.filters.add(criteria); return this; } + /** + * Accepts all metrics whose name start with the specified prefix. + * + * @return an updated MetricFilter + */ public MetricFilter acceptNameStartsWith(String prefix) { Predicate acceptFilter = (metricName) -> { return metricName.name().startsWith(prefix); @@ -176,6 +201,11 @@ public MetricFilter acceptNameStartsWith(String prefix) { return this; } + /** + * Denies all metrics whose name start with the specified prefix. + * + * @return an updated MetricFilter + */ public MetricFilter denyNameStartsWith(String prefix) { Predicate denyFilter = (metricName) -> { return metricName.name().startsWith(prefix); From 1c702e71d3be1a95124f54f9fdff934e9f87ad3a Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Thu, 19 Dec 2024 10:51:49 +1100 Subject: [PATCH 24/29] Deprecate io.operatr.kpow.StreamsRegistry --- README.md | 5 +++-- .../io/factorhouse/kpow/key/ManualKeyStrategy.java | 14 -------------- src/java/io/operatr/kpow/StreamsRegistry.java | 11 +++++++++-- 3 files changed, 12 insertions(+), 18 deletions(-) delete mode 100644 src/java/io/factorhouse/kpow/key/ManualKeyStrategy.java diff --git a/README.md b/README.md index 4259603..e19995e 100644 --- a/README.md +++ b/README.md @@ -191,11 +191,12 @@ If you have set a UI-friendly cluster name using the `ENVIRONMENT_NAME` environm ```java -import io.factorhouse.kpow.key.ManualKeyStrategy; // This sets a manual key of `Trade Book (Staging)`, the name of the clusters environment name in Kpow's UI. KeyStrategy keyStrategy = new ManualKeyStrategy("Trade Book (Staging)"); -registry.register(streams, topology, keyStrategy); +registry. + + register(streams, topology, keyStrategy); ``` ### Minimum Required ACLs diff --git a/src/java/io/factorhouse/kpow/key/ManualKeyStrategy.java b/src/java/io/factorhouse/kpow/key/ManualKeyStrategy.java deleted file mode 100644 index 872cd06..0000000 --- a/src/java/io/factorhouse/kpow/key/ManualKeyStrategy.java +++ /dev/null @@ -1,14 +0,0 @@ -package io.factorhouse.kpow.key; - -public class ManualKeyStrategy implements KeyStrategy { - private final String envName; - - public ManualKeyStrategy(String envName) { - this.envName = envName; - } - - @Override - public Taxon getTaxon(String clientId, String applicationId) { - return new Taxon("env", envName, "streams-agent", clientId); - } -} diff --git a/src/java/io/operatr/kpow/StreamsRegistry.java b/src/java/io/operatr/kpow/StreamsRegistry.java index 8366e56..087b675 100644 --- a/src/java/io/operatr/kpow/StreamsRegistry.java +++ b/src/java/io/operatr/kpow/StreamsRegistry.java @@ -3,6 +3,7 @@ import clojure.java.api.Clojure; import clojure.lang.IFn; import io.factorhouse.kpow.MetricFilter; +import io.factorhouse.kpow.key.KeyStrategy; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; @@ -11,6 +12,12 @@ import java.util.ArrayList; import java.util.Properties; + +/** + * @deprecated This class is no longer recommended for use. + * Please use {@link io.factorhouse.kpow.StreamsRegistry} instead. + */ +@Deprecated public class StreamsRegistry implements AutoCloseable { public static class StreamsAgent { @@ -91,11 +98,11 @@ public StreamsRegistry(Properties props) { this(props, MetricFilter.defaultMetricFilter()); } - public StreamsAgent register(KafkaStreams streams, Topology topology) { + public StreamsAgent register(KafkaStreams streams, Topology topology, KeyStrategy keyStrategy) { IFn require = Clojure.var("clojure.core", "require"); require.invoke(Clojure.read("io.factorhouse.kpow.agent")); IFn registerFn = Clojure.var("io.factorhouse.kpow.agent", "register"); - String id = (String) registerFn.invoke(agent, streams, topology); + String id = (String) registerFn.invoke(agent, streams, topology, keyStrategy); if (id != null) { return new StreamsAgent(id); } else { From 5419311dca34ee90576254cd128db6e0fa96c6b5 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Thu, 19 Dec 2024 11:17:45 +1100 Subject: [PATCH 25/29] Delete io.operatr.kpow.StreamsRegistry --- src/java/io/operatr/kpow/StreamsRegistry.java | 129 ------------------ 1 file changed, 129 deletions(-) delete mode 100644 src/java/io/operatr/kpow/StreamsRegistry.java diff --git a/src/java/io/operatr/kpow/StreamsRegistry.java b/src/java/io/operatr/kpow/StreamsRegistry.java deleted file mode 100644 index 087b675..0000000 --- a/src/java/io/operatr/kpow/StreamsRegistry.java +++ /dev/null @@ -1,129 +0,0 @@ -package io.operatr.kpow; - -import clojure.java.api.Clojure; -import clojure.lang.IFn; -import io.factorhouse.kpow.MetricFilter; -import io.factorhouse.kpow.key.KeyStrategy; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.Topology; - -import java.util.ArrayList; -import java.util.Properties; - - -/** - * @deprecated This class is no longer recommended for use. - * Please use {@link io.factorhouse.kpow.StreamsRegistry} instead. - */ -@Deprecated -public class StreamsRegistry implements AutoCloseable { - - public static class StreamsAgent { - private final String _id; - - StreamsAgent(String id) { - _id = id; - } - - public String getId() { - return _id; - } - } - - private final Object agent; - - public static Properties filterProperties(Properties props) { - ArrayList allowedKeys = new ArrayList<>(); - allowedKeys.add("ssl.enabled.protocols"); - allowedKeys.add("sasl.client.callback.handler.class"); - allowedKeys.add("ssl.endpoint.identification.algorithm"); - allowedKeys.add("ssl.provider"); - allowedKeys.add("ssl.truststore.location"); - allowedKeys.add("ssl.keystore.key"); - allowedKeys.add("ssl.key.password"); - allowedKeys.add("ssl.protocol"); - allowedKeys.add("ssl.keystore.password"); - allowedKeys.add("sasl.login.class"); - allowedKeys.add("ssl.trustmanager.algorithm"); - allowedKeys.add("ssl.keystore.location"); - allowedKeys.add("sasl.login.callback.handler.class"); - allowedKeys.add("ssl.truststore.certificates"); - allowedKeys.add("ssl.cipher.suites"); - allowedKeys.add("ssl.truststore.password"); - allowedKeys.add("ssl.keymanager.algorithm"); - allowedKeys.add("ssl.keystore.type"); - allowedKeys.add("ssl.secure.random.implementation"); - allowedKeys.add("ssl.truststore.type"); - allowedKeys.add("sasl.jaas.config"); - allowedKeys.add("ssl.keystore.certificate.chain"); - allowedKeys.add("sasl.mechanism"); - allowedKeys.add("sasl.oauthbearer.jwks.endpoint.url"); - allowedKeys.add("sasl.oauthbearer.token.endpoint.url"); - allowedKeys.add("sasl.kerberos.service.name"); - allowedKeys.add("security.protocol"); - allowedKeys.add("bootstrap.servers"); - - Properties nextProps = new Properties(); - for (String key : allowedKeys) { - if (props.containsKey(key)) { - nextProps.setProperty(key, String.valueOf(props.get(key))); - } - } - - String compressionType = props.getProperty("compression.type", "gzip"); - nextProps.setProperty("compression.type", compressionType); - - String idempotence = props.getProperty("enable.idempotence", "false"); - nextProps.setProperty("enable.idempotence", idempotence); - - return nextProps; - } - - public StreamsRegistry(Properties props, MetricFilter metricsFilter) { - IFn require = Clojure.var("clojure.core", "require"); - require.invoke(Clojure.read("io.factorhouse.kpow.agent")); - IFn agentFn = Clojure.var("io.factorhouse.kpow.agent", "init-registry"); - require.invoke(Clojure.read("io.factorhouse.kpow.serdes")); - IFn serdesFn = Clojure.var("io.factorhouse.kpow.serdes", "transit-json-serializer"); - Serializer keySerializer = (Serializer) serdesFn.invoke(); - Serializer valSerializer = (Serializer) serdesFn.invoke(); - Properties producerProps = filterProperties(props); - KafkaProducer producer = new KafkaProducer<>(producerProps, keySerializer, valSerializer); - agent = agentFn.invoke(producer, metricsFilter); - } - - public StreamsRegistry(Properties props) { - this(props, MetricFilter.defaultMetricFilter()); - } - - public StreamsAgent register(KafkaStreams streams, Topology topology, KeyStrategy keyStrategy) { - IFn require = Clojure.var("clojure.core", "require"); - require.invoke(Clojure.read("io.factorhouse.kpow.agent")); - IFn registerFn = Clojure.var("io.factorhouse.kpow.agent", "register"); - String id = (String) registerFn.invoke(agent, streams, topology, keyStrategy); - if (id != null) { - return new StreamsAgent(id); - } else { - return null; - } - } - - public void unregister(StreamsAgent streamsAgent) { - if (streamsAgent != null) { - IFn require = Clojure.var("clojure.core", "require"); - require.invoke(Clojure.read("io.factorhouse.kpow.agent")); - IFn unregisterFn = Clojure.var("io.factorhouse.kpow.agent", "unregister"); - unregisterFn.invoke(agent, streamsAgent.getId()); - } - } - - @Override - public void close() { - IFn require = Clojure.var("clojure.core", "require"); - require.invoke(Clojure.read("io.factorhouse.kpow.agent")); - IFn closeFn = Clojure.var("io.factorhouse.kpow.agent", "close-registry"); - closeFn.invoke(agent); - } -} From a4f8f96c76a6d61eef250938bce34047f8b39669 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Thu, 19 Dec 2024 11:24:40 +1100 Subject: [PATCH 26/29] JavaDoc for key strats --- src/java/io/factorhouse/kpow/key/ClientIdKeyStrategy.java | 7 +++++++ src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java | 4 ++++ 2 files changed, 11 insertions(+) diff --git a/src/java/io/factorhouse/kpow/key/ClientIdKeyStrategy.java b/src/java/io/factorhouse/kpow/key/ClientIdKeyStrategy.java index 9fe2602..32d171d 100644 --- a/src/java/io/factorhouse/kpow/key/ClientIdKeyStrategy.java +++ b/src/java/io/factorhouse/kpow/key/ClientIdKeyStrategy.java @@ -1,5 +1,12 @@ package io.factorhouse.kpow.key; +/** + * This key strategy relies on the client ID and application ID from the active KafkaStreams instance, eliminating the need for an AdminClient. + *

+ * However, in a multi-cluster Kpow deployment where the same application ID is used across multiple environments (e.g., staging, dev, prod), + * Kpow cannot determine which cluster the Kafka Streams instance is associated with. + *

+*/ public class ClientIdKeyStrategy implements KeyStrategy { public ClientIdKeyStrategy() {} diff --git a/src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java b/src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java index 109c97a..1b8a5ed 100644 --- a/src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java +++ b/src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java @@ -5,6 +5,10 @@ import java.util.Properties; import java.util.concurrent.ExecutionException; +/** + * The default key strategy uses the cluster ID, obtained via an {@link org.apache.kafka.clients.admin.Admin#describeClusters()} call. + * This AdminClient is created once during registry initialization and then closed. + */ public class ClusterIdKeyStrategy implements KeyStrategy { private final String clusterId; From 53adbe1114484f443be3709bf8b35b9e76a48d3a Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Thu, 19 Dec 2024 11:57:51 +1100 Subject: [PATCH 27/29] Document io.factorhouse.kpow.StreamsRegistry --- src/java/io/factorhouse/kpow/StreamsRegistry.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/java/io/factorhouse/kpow/StreamsRegistry.java b/src/java/io/factorhouse/kpow/StreamsRegistry.java index 09f2001..2e211f1 100644 --- a/src/java/io/factorhouse/kpow/StreamsRegistry.java +++ b/src/java/io/factorhouse/kpow/StreamsRegistry.java @@ -91,6 +91,11 @@ public StreamsRegistry(Properties props) { this(props, MetricFilter.defaultMetricFilter()); } + /** + * Registers a Kafka Streams application with Kpow's StreamsRegistry. + * + * @return a StreamsAgent instance for the registered application (used to unregister). + */ public StreamsAgent register(KafkaStreams streams, Topology topology, KeyStrategy keyStrategy) { IFn require = Clojure.var("clojure.core", "require"); require.invoke(Clojure.read("io.factorhouse.kpow.agent")); @@ -103,6 +108,10 @@ public StreamsAgent register(KafkaStreams streams, Topology topology, KeyStrateg } } + /** + * Unregisters a Kafka Streams application with Kpow's StreamsRegistry. Use the StreamsAgent instance returned from + * the register method. + */ public void unregister(StreamsAgent streamsAgent) { if (streamsAgent != null) { IFn require = Clojure.var("clojure.core", "require"); From 29d27d7b760b8e092081296a41d58ba07609bdc5 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Thu, 19 Dec 2024 12:06:20 +1100 Subject: [PATCH 28/29] Fix unit tests --- test/io/factorhouse/agent_test.clj | 203 ++++++++++++----------------- 1 file changed, 82 insertions(+), 121 deletions(-) diff --git a/test/io/factorhouse/agent_test.clj b/test/io/factorhouse/agent_test.clj index 1d83e94..4bf65f3 100644 --- a/test/io/factorhouse/agent_test.clj +++ b/test/io/factorhouse/agent_test.clj @@ -3,7 +3,7 @@ [clojure.test :refer :all] [io.factorhouse.kpow.agent :as agent]) (:import (io.factorhouse.kpow MetricFilter StreamsRegistry) - (io.factorhouse.kpow.key ClientIdKeyStrategy ManualKeyStrategy) + (io.factorhouse.kpow.key ClientIdKeyStrategy) (java.util Properties) (org.apache.kafka.clients.producer Producer) (org.apache.kafka.common Metric MetricName) @@ -94,41 +94,46 @@ (is (deref (:latch registry) 5000 false)) - (is (= #{[[:streams "abc123" :kafka/streams-agent] - {:type :kafka/streams-agent, - :application-id "xxx", - :client-id "abc123", - :data {:topology {:sub-topologies #{{:id 0, - :nodes #{{:name "KSTREAM-SOURCE-0000000000", - :predecessors #{}, - :successors #{}, - :topic-set #{"__oprtr_snapshot_state"}, - :topic-pattern nil}}}}, - :global-stores #{}}, - :state "RUNNING"}, - :snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}] - [[:streams "abc123" :kafka/streams-agent] - {:type :observation/plan - :snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]} - :data {:type :observe/streams-agent - :agent {:metrics-summary {:id "custom" - :sent 2 - :total 3} - :id agent-id - :version "1.0.0"}}}] - [[:streams "abc123" :kafka/streams-agent] - {:type :kafka/streams-agent-metrics, - :application-id "xxx", - :client-id "abc123", - :data [{:name "first.metric", :tags {}, :value 1.0} - {:name "second.metric", :tags {"client-id" "abc123"}, :value 2.0}], - :snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}]} - (into #{} (map (fn [record] - [(.key record) (dissoc (.value record) :job/id :captured)])) - @records))) - - (testing "consistent :captured value" - (is (= 1 (count (into #{} (map (fn [record] (-> record (.value) :captured))) @records))))) + (let [captured (into #{} (map (fn [record] (-> record (.value) :captured))) @records)] + + (testing "consistent :captured value" + (is (= 1 (count captured)))) + + (is (= #{[[:streams "abc123" :kafka/streams-agent] + {:type :kafka/streams-agent, + :application-id "xxx", + :client-id "abc123", + :data {:topology {:sub-topologies #{{:id 0, + :nodes #{{:name "KSTREAM-SOURCE-0000000000", + :predecessors #{}, + :successors #{}, + :topic-set #{"__oprtr_snapshot_state"}, + :topic-pattern nil}}}}, + :global-stores #{}}, + :state "RUNNING"}, + :snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}] + [[:streams "abc123" :kafka/streams-agent] + {:type :observation/plan + :application-id "xxx", + :client-id "abc123", + :snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]} + :data {:type :observe/streams-agent + :agent {:captured (first captured) + :metrics-summary {:id "custom" + :sent 2 + :total 3} + :id agent-id + :version "1.0.0"}}}] + [[:streams "abc123" :kafka/streams-agent] + {:type :kafka/streams-agent-metrics, + :application-id "xxx", + :client-id "abc123", + :data [{:name "first.metric", :tags {}, :value 1.0} + {:name "second.metric", :tags {"client-id" "abc123"}, :value 2.0}], + :snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}]} + (into #{} (map (fn [record] + [(.key record) (dissoc (.value record) :job/id :captured)])) + @records)))) (is (agent/unregister registry agent)) @@ -154,91 +159,47 @@ (is (deref (:latch registry) 5000 false)) - (is (= #{[[:streams "abc123" :kafka/streams-agent] - {:type :kafka/streams-agent, - :application-id "xxx", - :client-id "abc123", - :data {:topology {:sub-topologies #{{:id 0, - :nodes #{{:name "KSTREAM-SOURCE-0000000000", - :predecessors #{}, - :successors #{}, - :topic-set #{"__oprtr_snapshot_state"}, - :topic-pattern nil}}}}, - :global-stores #{}}, - :state "RUNNING"}, - :snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}] - [[:streams "abc123" :kafka/streams-agent] - {:type :observation/plan - :snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]} - :data {:type :observe/streams-agent - :agent {:metrics-summary {:id "custom" - :sent 2 - :total 4} - :id agent-id - :version "1.0.0"}}}] - [[:streams "abc123" :kafka/streams-agent] - {:type :kafka/streams-agent-metrics, - :application-id "xxx", - :client-id "abc123", - :data [{:name "first.metric", :tags {}, :value 1.0} - {:name "rocksdb.foo", :tags {"client-id" "abc123"}, :value 3.0}], - :snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}]} - (into #{} (map (fn [record] - [(.key record) (dissoc (.value record) :job/id :captured)])) - @records))) - - (testing "consistent :captured value" - (is (= 1 (count (into #{} (map (fn [record] (-> record (.value) :captured))) @records))))) + (let [captured (into #{} (map (fn [record] (-> record (.value) :captured))) @records)] + + (testing "consistent :captured value" + (is (= 1 (count captured)))) + + (is (= #{[[:streams "abc123" :kafka/streams-agent] + {:type :kafka/streams-agent, + :application-id "xxx", + :client-id "abc123", + :data {:topology {:sub-topologies #{{:id 0, + :nodes #{{:name "KSTREAM-SOURCE-0000000000", + :predecessors #{}, + :successors #{}, + :topic-set #{"__oprtr_snapshot_state"}, + :topic-pattern nil}}}}, + :global-stores #{}}, + :state "RUNNING"}, + :snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}] + [[:streams "abc123" :kafka/streams-agent] + {:type :observation/plan + :application-id "xxx", + :client-id "abc123", + :snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]} + :data {:type :observe/streams-agent + :agent {:captured (first captured) + :metrics-summary {:id "custom" + :sent 2 + :total 4} + :id agent-id + :version "1.0.0"}}}] + [[:streams "abc123" :kafka/streams-agent] + {:type :kafka/streams-agent-metrics, + :application-id "xxx", + :client-id "abc123", + :data [{:name "first.metric", :tags {}, :value 1.0} + {:name "rocksdb.foo", :tags {"client-id" "abc123"}, :value 3.0}], + :snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}]} + (into #{} (map (fn [record] + [(.key record) (dissoc (.value record) :job/id :captured)])) + @records)))) (is (agent/unregister registry agent)) - (is (empty? (agent/close-registry registry))))) - -(deftest agent-test-manual-key-strategy - (let [records (atom []) - metrics-filter (-> (MetricFilter.) (.deny)) - registry (agent/init-registry (mock-producer records) metrics-filter) - agent-id (agent/register registry - (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0) - (mock-metric "rocksdb.foo" "first" "mock metric" {"client-id" "abc123"} 3.0) - (mock-metric "application-id" "first" "mock metric" {"client-id" "abc123"} "xxx") - (mock-metric "second.metric" "first" "mock metric" {"client-id" "abc123"} 2.0)]) - (test-topology) - (ManualKeyStrategy. "Trade Book (Staging)"))] - - (is agent-id) - - (is (deref (:latch registry) 5000 false)) - - (is (= #{[[:env "Trade Book (Staging)" :kafka/streams-agent-m "abc123"] - {:type :kafka/streams-agent, - :application-id "xxx", - :client-id "abc123", - :data {:topology {:sub-topologies #{{:id 0, - :nodes #{{:name "KSTREAM-SOURCE-0000000000", - :predecessors #{}, - :successors #{}, - :topic-set #{"__oprtr_snapshot_state"}, - :topic-pattern nil}}}}, - :global-stores #{}}, - :state "RUNNING"}, - :snapshot/id {:domain :streams, :id [:env "Trade Book (Staging)" :kafka/streams-agent-m "abc123"]}}] - [[:env "Trade Book (Staging)" :kafka/streams-agent-m "abc123"] - {:type :observation/plan - :snapshot/id {:domain :streams, :id [:env "Trade Book (Staging)" :kafka/streams-agent-m "abc123"]} - :data {:type :observe/streams-agent - :agent {:metrics-summary {:id "custom" - :sent 0 - :total 4} - :id agent-id - :version "1.0.0"}}}]} - (into #{} (map (fn [record] - [(.key record) (dissoc (.value record) :job/id :captured)])) - @records))) - - (testing "consistent :captured value" - (is (= 1 (count (into #{} (map (fn [record] (-> record (.value) :captured))) @records))))) - - (is (agent/unregister registry agent)) - - (is (empty? (agent/close-registry registry))))) + (is (empty? (agent/close-registry registry))))) \ No newline at end of file From 48236ae68776ebff67df5e8eb2668d6521b5a5db Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Thu, 19 Dec 2024 12:16:25 +1100 Subject: [PATCH 29/29] io.factorhouse/kpow-streams-agent "1.0.0-rc6" --- project.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project.clj b/project.clj index d4af323..830876f 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject io.factorhouse/kpow-streams-agent "1.0.0-rc5" +(defproject io.factorhouse/kpow-streams-agent "1.0.0-rc6" :description "Kpow's Kafka Streams monitoring agent" :url "https://github.com/factorhouse/kpow-streams-agent" :license {:name "Apache-2.0 License"