Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streams agent 1.0.0 #7

Open
wants to merge 30 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
169d88f
Add io.factorhouse.kpow.MetricsFilter class
wavejumper Nov 12, 2024
15b57e1
Add allow/deny predicates to MetricFilter
wavejumper Nov 12, 2024
6aa9f90
Document the MetricFilter class
wavejumper Nov 12, 2024
ca8cb95
Document key strategy
wavejumper Nov 12, 2024
dd4dc7b
Implement KeyStrategy interface
wavejumper Nov 12, 2024
4bd2d2e
Better document KeyStrategy usage
wavejumper Nov 13, 2024
4a8708e
Key strategy package structure
wavejumper Nov 13, 2024
df2ecb3
Add unit tests for key strat + metric filters
wavejumper Nov 13, 2024
380a59a
Test deny metric filter
wavejumper Nov 13, 2024
d3af2cc
fmtfix
wavejumper Nov 13, 2024
b822adc
Add agent metadata to observation snapshot
wavejumper Nov 18, 2024
6ef14c9
fmtfix
wavejumper Nov 18, 2024
34616b1
Pass thru agent id in observation plan
wavejumper Nov 18, 2024
b2118b1
Merge branch 'main' of github.com:factorhouse/kpow-streams-agent into…
wavejumper Nov 18, 2024
690a11a
kondo
wavejumper Nov 19, 2024
4f138e5
1.0.0-rc1
wavejumper Nov 19, 2024
4de94ba
Tweak key strat
wavejumper Nov 20, 2024
599b7bd
Affix client+application-id to plan snapshot
wavejumper Nov 21, 2024
b0b6836
1.0.0-rc3
wavejumper Nov 21, 2024
f7e8736
1.0.0-rc4
wavejumper Nov 28, 2024
480b5da
Flesh out static MetricFilter methods
wavejumper Dec 18, 2024
ea8cd23
io.factorhouse/kpow-streams-agent "1.0.0-rc5"
wavejumper Dec 18, 2024
27b2151
Better document static metric filter methods
wavejumper Dec 18, 2024
b6f6b02
Document all MetricFilter mutation methods
wavejumper Dec 18, 2024
1c702e7
Deprecate io.operatr.kpow.StreamsRegistry
wavejumper Dec 18, 2024
5419311
Delete io.operatr.kpow.StreamsRegistry
wavejumper Dec 19, 2024
a4f8f96
JavaDoc for key strats
wavejumper Dec 19, 2024
53adbe1
Document io.factorhouse.kpow.StreamsRegistry
wavejumper Dec 19, 2024
29d27d7
Fix unit tests
wavejumper Dec 19, 2024
48236ae
io.factorhouse/kpow-streams-agent "1.0.0-rc6"
wavejumper Dec 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 85 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,25 @@ In your application, just before you start your KafkaStreams instance:

```java
import io.factorhouse.kpow.StreamsRegistry;
import io.factorhouse.kpow.key.ClusterIdKeyStrategy;

// Your Kafka Streams topology
Topology topology = createMyTopology();
Topology topology = createMyTopology();

// Your Kafka Streams config
Properties props = new createMyStreamProperties();

// Your Kafka Streams instance
KafkaStreams streams = new KafkaStreams(topology, props);
KafkaStreams streams = new KafkaStreams(topology, props);

// Create a Kpow StreamsRegistry
StreamsRegistry registry = new StreamsRegistry(props);

// Specify the key strategy when writing metrics to the internal Kafka topic
KeyStrategy keyStrategy = new ClusterIdKeyStrategy(props);

// Register your KafkaStreams and Topology instances with the StreamsRegistry
registry.register(streams, topology);
registry.register(streams, topology, keyStrategy);

// Start your Kafka Streams application
streams.start();
Expand All @@ -82,7 +86,39 @@ The StreamsRegistry is a *single-threaded process* that performs these actions *

The StreamsRegistry **does not talk directly to Kpow**. Kpow reads streams data from the snapshot topic.

# Configuration
# Metric filters

You can configure each streams registry with metric filters, which give you greater control over which metrics Kpow's streams agent will export.

Metric filters can be chained and added programmatically:

```java
import io.factorhouse.kpow.StreamsRegistry;
import io.factorhouse.kpow.MetricFilter;

MetricFilter metricFilter = MetricFilter().deny(); // don't send any streams metrics, just send through the Streams Topology

// ..

StreamsRegistry registry = new StreamsRegistry(props, metricFilter);
```

If you pass no metric filters to the `StreamsRegistry` constructor then the default metric filter will be used. The default metric filter will **accept** all metrics to be exported.

### Metric filter usage

Kpow's streams agent metric filters work very similar to Micrometer's [meter filters](https://github.com/micrometer-metrics/micrometer-docs/blob/main/src/docs/concepts/meter-filters.adoc).

Metric filters can either `ACCEPT` or `DENY` a metric. The filter itself is a Java predicate which takes in the [org.apache.common.MetricName](https://kafka.apache.org/0110/javadoc/org/apache/kafka/common/MetricName.html#group()) class. This allows you to filter metrics by name, tags or group.

Metric filters are applied sequentially in the order they are configured in the registry. This allows for stacking of deny and accept filters to create more complex rules:

```java
MetricFilter metricFilter = MetricFilter().acceptNameStartsWith("rocksdb").deny();
```
The above example allows all rocksdb related metrics through and denies all other types of streams metrics.

# Kafka connection

The `StreamsRegistry` `Properties` contains configuration to create the snapshot producer.

Expand Down Expand Up @@ -121,6 +157,48 @@ Producer configuration means any of the following fields:

For more details visit the [Producer](https://kafka.apache.org/documentation/#producerconfigs) section of the Apache Kafka documentation.

### Key strategy

The keying strategy for data sent from Kpow's streams agent to its internal Kafka topic is configurable. The key strategy plays an important role in enabling Kpow to align stream metrics with the UI accurately. There are many key strategies available depending on your organisation's deployment.

#### Cluster ID (recommended key strategy, requires Kpow 94.1+)

The default key strategy uses the cluster ID, obtained via an AdminClient [describeClusters](https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/admin/DescribeClusterResult.html) call. This AdminClient is created once during registry initialization and then closed. If you prefer not to have the streams registry create an AdminClient—either because your Kafka variant does not provide a cluster ID or due to security considerations—you may select an alternative key strategy from the options below.

```java
// Specify the key strategy when writing metrics to the internal Kafka topic
// props are java.util.Properties describing the Kafka Connection
KeyStrategy keyStrategy = new ClusterIDKeyStrategy(props);
// Register your KafkaStreams and Topology instances with the StreamsRegistry
registry.register(streams, topology, keyStrategy);
```

#### Client ID (default in 0.2.0 and below)

This key strategy relies on the client ID and application ID from the active KafkaStreams instance, eliminating the need for an AdminClient. However, in a multi-cluster Kpow deployment where the same application ID is used across multiple environments (e.g., staging, dev, prod), Kpow cannot determine which cluster the Kafka Streams instance is associated with.

```java

import io.factorhouse.kpow.key.ClientIdKeyStrategy;

KeyStrategy keyStrategy = new ClientIdKeyStrategy();
registry.register(streams, topology, keyStrategy);
```

#### Environment name (manual, requires Kpow 94.1+)

If you have set a UI-friendly cluster name using the `ENVIRONMENT_NAME` environment variable in Kpow, you can use this environment name as the keying strategy for the streams agent.

```java


// This sets a manual key of `Trade Book (Staging)`, the name of the clusters environment name in Kpow's UI.
KeyStrategy keyStrategy = new ManualKeyStrategy("Trade Book (Staging)");
registry.

register(streams, topology, keyStrategy);
```

### Minimum Required ACLs

If you secure your Kafka Cluster with ACLs, the user provided in the Producer configuration must have permission to write to the internal Kpow topic.
Expand All @@ -147,7 +225,7 @@ Properties streamsProps = new Properties();
KafkaStreams streams = new KafkaStreams(topology, streamsProps);

StreamsRegistry registry = new StreamsRegistry(streamsProps);
...
//...
```

### Multi-Cluster Kpow
Expand All @@ -160,7 +238,7 @@ KafkaStreams streams = new KafkaStreams(topology, streamsProps);

Properties primaryProps = createMyPrimaryClusterProducerProperties();
StreamsRegistry registry = new StreamsRegistry(primaryProps);
...
//...
```

See the [Kpow Multi-Cluster Feature Guide](https://docs.factorhouse.io/kpow-ee/config/multi-cluster/) for more information.
Expand Down
4 changes: 2 additions & 2 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject io.factorhouse/kpow-streams-agent "0.2.13"
(defproject io.factorhouse/kpow-streams-agent "1.0.0-rc6"
:description "Kpow's Kafka Streams monitoring agent"
:url "https://github.com/factorhouse/kpow-streams-agent"
:license {:name "Apache-2.0 License"
Expand All @@ -19,7 +19,7 @@
[:name "Derek Troy-West"]
[:url "https://factorhouse.io"]
[:roles
[:role "developer"]
[:role "developer"]7
[:role "maintainer"]]]])
:dependencies [[org.clojure/clojure "1.12.0"]
[com.cognitect/transit-clj "1.0.333"]
Expand Down
Loading
Loading