Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus exporter operator POC #578

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions charts/newrelic-infrastructure/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ rules:
- "nodes"
- "namespaces"
verbs: [ "get", "list", "watch" ]
- apiGroups: [ "" ]
resources:
- "pods"
- "secrets"
verbs: [ "get", "list", "watch", "create"]
- nonResourceURLs: ["/metrics"]
verbs: ["get"]
{{- if .Values.rbac.pspEnabled }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{{- if .Values.kubelet.enabled -}}
---
apiVersion: v1
kind: ConfigMap
Expand All @@ -10,9 +9,5 @@ metadata:
data:
nri-kubernetes.yml: |
{{- (merge .Values.common.config (include "newrelic.integrationConfigDefaults" . | fromYaml)) | toYaml | nindent 4 }}
kubelet:
enabled: true
{{- if .Values.kubelet.config }}
{{- toYaml .Values.kubelet.config | nindent 6 }}
{{- end }}
{{- end }}
operator:
enabled: true
300 changes: 249 additions & 51 deletions cmd/nri-kubernetes/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"os"
"path"
Expand All @@ -9,22 +10,24 @@ import (
"time"

sdk "github.com/newrelic/infra-integrations-sdk/integration"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"

"github.com/newrelic/nri-kubernetes/v3/internal/config"
"github.com/newrelic/nri-kubernetes/v3/internal/discovery"
"github.com/newrelic/nri-kubernetes/v3/src/client"
"github.com/newrelic/nri-kubernetes/v3/src/controlplane"
"github.com/newrelic/nri-kubernetes/v3/src/integration"
"github.com/newrelic/nri-kubernetes/v3/src/ksm"
ksmClient "github.com/newrelic/nri-kubernetes/v3/src/ksm/client"
"github.com/newrelic/nri-kubernetes/v3/src/kubelet"
kubeletClient "github.com/newrelic/nri-kubernetes/v3/src/kubelet/client"
"github.com/newrelic/nri-kubernetes/v3/src/prometheus"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)

const (
Expand Down Expand Up @@ -75,36 +78,38 @@ func main() {
}
}

integrationOptions := []integration.OptionFunc{
integration.WithLogger(logger),
integration.WithMetadata(integration.Metadata{
Name: integrationName,
Version: integrationVersion,
}),
}
/*
integrationOptions := []integration.OptionFunc{
integration.WithLogger(logger),
integration.WithMetadata(integration.Metadata{
Name: integrationName,
Version: integrationVersion,
}),
}

switch c.Sink.Type {
case config.SinkTypeHTTP:
integrationOptions = append(integrationOptions, integration.WithHTTPSink(c.Sink.HTTP))
case config.SinkTypeStdout:
// We don't need to do anything here to sink to stdout, as it's the default behavior of integration.Wrapper.
logger.Warn("Sinking metrics to stdout")
default:
log.Errorf("Unknown sink type %s", c.Sink.Type)
os.Exit(exitConfig)
}
switch c.Sink.Type {
case config.SinkTypeHTTP:
integrationOptions = append(integrationOptions, integration.WithHTTPSink(c.Sink.HTTP))
case config.SinkTypeStdout:
// We don't need to do anything here to sink to stdout, as it's the default behavior of integration.Wrapper.
logger.Warn("Sinking metrics to stdout")
default:
log.Errorf("Unknown sink type %s", c.Sink.Type)
os.Exit(exitConfig)
}

iw, err := integration.NewWrapper(integrationOptions...)
if err != nil {
logger.Errorf("creating integration wrapper: %v", err)
os.Exit(exitIntegration)
}
iw, err := integration.NewWrapper(integrationOptions...)
if err != nil {
logger.Errorf("creating integration wrapper: %v", err)
os.Exit(exitIntegration)
}

i, err := iw.Integration()
if err != nil {
logger.Errorf("creating integration with http sink: %v", err)
os.Exit(exitIntegration)
}
i, err := iw.Integration()
if err != nil {
logger.Errorf("creating integration with http sink: %v", err)
os.Exit(exitIntegration)
}
*/

logger.Infof(
"New Relic %s integration Version: %s, Platform: %s, GoVersion: %s, GitCommit: %s, BuildDate: %s\n",
Expand All @@ -123,14 +128,16 @@ func main() {

namespaceCache := discovery.NewNamespaceInMemoryStore(logger)

var kubeletScraper *kubelet.Scraper
if c.Kubelet.Enabled {
kubeletScraper, err = setupKubelet(c, clients, namespaceCache)
if err != nil {
logger.Errorf("setting up ksm scraper: %v", err)
os.Exit(exitSetup)
/*
var kubeletScraper *kubelet.Scraper
if c.Kubelet.Enabled {
kubeletScraper, err = setupKubelet(c, clients, namespaceCache)
if err != nil {
logger.Errorf("setting up ksm scraper: %v", err)
os.Exit(exitSetup)
}
}
}
*/

var ksmScraper *ksm.Scraper
if c.KSM.Enabled {
Expand All @@ -152,24 +159,38 @@ func main() {
defer controlplaneScraper.Close()
}

var o *operator
if c.Operator.Enabled {
o = createOperator(clients.k8s)
o.log = logger
}

for {
start := time.Now()

logger.Debugf("scraping data from all the scrapers defined: KSM: %t, Kubelet: %t, ControlPlane: %t",
c.KSM.Enabled, c.Kubelet.Enabled, c.ControlPlane.Enabled)

// TODO think carefully to the signature of this function
err := runScrapers(c, ksmScraper, kubeletScraper, controlplaneScraper, i)
if err != nil {
logger.Errorf("retrieving scraper data: %v", err)
os.Exit(exitLoop)
}
/*
// TODO think carefully to the signature of this function
err := runScrapers(c, ksmScraper, kubeletScraper, controlplaneScraper, i)
if err != nil {
logger.Errorf("retrieving scraper data: %v", err)
os.Exit(exitLoop)
}
*/

logger.Debugf("publishing data")
err = i.Publish()
if err != nil {
logger.Errorf("publishing integration: %v", err)
os.Exit(exitLoop)
/*
err = i.Publish()
if err != nil {
logger.Errorf("publishing integration: %v", err)
os.Exit(exitLoop)
}
*/
if c.Operator.Enabled {
o.run()
o.log = logger
}

namespaceCache.Vacuum()
Expand Down Expand Up @@ -206,6 +227,183 @@ func runScrapers(c *config.Config, ksmScraper *ksm.Scraper, kubeletScraper *kube
return nil
}

// operator is a proof-of-concept reconciler that pairs labelled workload pods
// with integration pods: it deploys an integration pod (plus a secret) for
// each workload that lacks one and deletes integrations left without a
// workload.
type operator struct {
// lister lists pods by label selector; createOperator obtains it for
// operatorNamespace.
lister v1.PodNamespaceLister
// client is the Kubernetes API client used to create and delete the
// integration pods and their secrets.
client kubernetes.Interface
// log receives the operator's info and error messages. createOperator does
// not set it, so the caller must assign it before invoking any method.
log *log.Logger
}

// operatorNamespace is the only namespace the operator works in: pods are
// listed from it, and integration pods and secrets are created in and deleted
// from it. The value is hard-coded.
const operatorNamespace = "redis"

// createOperator builds an operator that talks to the cluster through client
// and lists pods through a lister obtained for operatorNamespace.
//
// The second value returned by discovery.NewNamespacePodListerer and by
// Lister is discarded. The log field is left unset; the caller has to assign
// it before calling run.
func createOperator(client kubernetes.Interface) *operator {
	listererConfig := discovery.PodListererConfig{
		Client:     client,
		Namespaces: []string{operatorNamespace},
	}

	podListerer, _ := discovery.NewNamespacePodListerer(listererConfig)
	namespaceLister, _ := podListerer.Lister(operatorNamespace)

	op := operator{
		client: client,
		lister: namespaceLister,
	}

	return &op
}

// run performs one reconciliation pass over the pods returned by
// listInterestingPods:
//
//  1. every workload that has a pod IP and no integration yet gets one via
//     deployIntegration;
//  2. every integration that no longer corresponds to a listed workload is
//     removed via deleteIntegration.
//
// A workload and an integration are paired by exact name: deployIntegration
// names the integration pod "<workload name>-integration". Substring matching
// is deliberately avoided, because it would treat workload "redis" as already
// monitored by "redis-2-integration" and would keep stale integrations alive
// whenever another workload's name happens to be contained in theirs.
//
// TODO: this only compares names; it should also detect that the workload's
// configuration changed and redeploy the integration in that case.
func (o operator) run() {
	// Must stay in sync with the name deployIntegration gives to the pod.
	const integrationSuffix = "-integration"

	logger.Debugf("running operator")

	listWorkLoads, listIntegrations := o.listInterestingPods()

	// Names of the integration pods that currently exist.
	existing := make(map[string]struct{}, len(listIntegrations))
	for _, i := range listIntegrations {
		existing[i.Name] = struct{}{}
	}

	logger.Debugf("CREATING INTEGRATIONS")

	// expected maps the integration name each workload should have to the
	// workload name; it drives the cleanup phase below.
	expected := make(map[string]string, len(listWorkLoads))

	for _, w := range listWorkLoads {
		integrationName := w.Name + integrationSuffix
		expected[integrationName] = w.Name

		_, found := existing[integrationName]
		if found {
			logger.Debugf("workload already monitored %s by %s", w.Name, integrationName)
		}

		// Without an IP the integration configuration cannot be rendered yet;
		// a later pass will pick the workload up once it has been assigned one.
		if w.Status.PodIP == "" {
			logger.Warnf("SKIPPIN FOR NOW %q since IP is empty", w.Name)

			continue
		}

		if !found {
			o.deployIntegration(w)
		}
	}

	logger.Debugf("CLEANING INTEGRATIONS")

	for _, i := range listIntegrations {
		if workloadName, ok := expected[i.Name]; ok {
			logger.Debugf("integration monitoring workload %s by %s", workloadName, i.Name)

			continue
		}

		o.deleteIntegration(i)
	}
}

// listInterestingPods queries the operator's lister twice, selecting pods by
// the value of their "monitoring-role" label, and returns the two result sets:
// first the workloads to monitor, then the integration pods monitoring them.
//
// A listing error is only logged; the corresponding slice is then returned as
// the lister produced it.
func (o operator) listInterestingPods() ([]*corev1.Pod, []*corev1.Pod) {
	const roleLabel = "monitoring-role"

	workloadSelector := labels.SelectorFromSet(labels.Set{roleLabel: "workload-to-monitor"})
	workloads, workloadsErr := o.lister.List(workloadSelector)
	if workloadsErr != nil {
		o.log.Errorf("listing workloads %v", workloadsErr)
	}

	logger.Debugf("found pods %d", len(workloads))

	integrationSelector := labels.SelectorFromSet(labels.Set{roleLabel: "integration-monitoring-workload"})
	integrations, integrationsErr := o.lister.List(integrationSelector)
	if integrationsErr != nil {
		o.log.Errorf("listing integrations %v", integrationsErr)
	}

	logger.Debugf("found integrations %d", len(integrations))

	return workloads, integrations
}

// deleteIntegration removes the given integration pod from operatorNamespace
// together with the secret that carries the same name as the pod.
//
// Both deletions are always attempted; a failure of either one is logged and
// otherwise ignored.
func (o operator) deleteIntegration(pod *corev1.Pod) {
	logger.Debugf("deleting integrations and secret %q", pod.Name)

	ctx := context.Background()
	name := pod.Name

	if podErr := o.client.CoreV1().Pods(operatorNamespace).Delete(ctx, name, metav1.DeleteOptions{}); podErr != nil {
		o.log.Errorf("deleting integration %v", podErr)
	}

	if secretErr := o.client.CoreV1().Secrets(operatorNamespace).Delete(ctx, name, metav1.DeleteOptions{}); secretErr != nil {
		o.log.Errorf("deleting secret %v", secretErr)
	}
}

// deployIntegration creates, in operatorNamespace, the integration pod that
// monitors the given workload pod and the secret holding its configuration.
//
// Pod and secret are both named "<workload name>-integration": the pod loads
// the secret as environment variables through envFrom, and deleteIntegration
// removes both objects by the pod's name.
//
// The secret data is rendered from the workload's "config" annotation, see
// parseIntegrationConfig. Failures of either API call are logged and
// otherwise ignored; the secret is created even if the pod creation failed.
func (o operator) deployIntegration(pod *corev1.Pod) {
	o.log.Infof("creating a new integration for %s", pod.Name)

	integrationName := pod.Name + "-integration"

	// The image is hard-coded for now; the intent is to take it from an
	// annotation on the monitored service's pod.
	containerIntegration := corev1.Container{
		Name:  "integration",
		Image: "acabanas977/nri-redis:latest",
		Env: []corev1.EnvVar{
			{
				Name:  "WORKLOAD_NODE_IP",
				Value: pod.Status.HostIP,
			},
		},
		EnvFrom: []corev1.EnvFromSource{
			{
				SecretRef: &corev1.SecretEnvSource{
					LocalObjectReference: corev1.LocalObjectReference{
						Name: integrationName,
					},
				},
			},
		},
		Resources: corev1.ResourceRequirements{},
	}

	// The exposed port can always be the same in all our prometheus-exporter
	// images, hence the fixed scrape annotations.
	p := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      integrationName,
			Namespace: operatorNamespace,
			Labels: map[string]string{
				"monitoring-role": "integration-monitoring-workload",
			},
			Annotations: map[string]string{
				"prometheus.io/scrape": "true",
				"prometheus.io/port":   "9121",
			},
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				containerIntegration,
			},
		},
	}

	_, err := o.client.CoreV1().Pods(operatorNamespace).Create(context.Background(), p, metav1.CreateOptions{})
	if err != nil {
		o.log.Errorf("creating pod %v", err)
	}

	data, malformed := parseIntegrationConfig(pod.Annotations["config"], pod.Status.PodIP)
	for _, row := range malformed {
		// Report the offending row itself rather than the whole configuration.
		o.log.Errorf("Unexpected string %q", row)
	}

	_, err = o.client.CoreV1().Secrets(operatorNamespace).Create(context.Background(), &corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name:      integrationName,
			Namespace: operatorNamespace,
		},
		Data: data,
	}, metav1.CreateOptions{})
	if err != nil {
		o.log.Errorf("creating secret %v", err)
	}
}

// parseIntegrationConfig converts config, a newline-separated list of
// "key: value" rows, into secret data after replacing every occurrence of the
// "${discovery.ip}" placeholder with podIP.
//
// Only the first ": " of a row separates key from value, so values may contain
// ": " themselves. Blank rows are skipped. Rows without a separator or with an
// empty key are not added to the data and are returned in the second result so
// the caller can report them.
func parseIntegrationConfig(config, podIP string) (map[string][]byte, []string) {
	data := map[string][]byte{}

	var malformed []string

	populated := strings.ReplaceAll(config, "${discovery.ip}", podIP)
	for _, row := range strings.Split(populated, "\n") {
		if strings.TrimSpace(row) == "" {
			continue
		}

		kv := strings.SplitN(row, ": ", 2)
		if len(kv) != 2 || kv[0] == "" {
			malformed = append(malformed, row)

			continue
		}

		data[kv[0]] = []byte(kv[1])
	}

	return data, malformed
}

func setupKSM(c *config.Config, clients *clusterClients, namespaceCache *discovery.NamespaceInMemoryStore) (*ksm.Scraper, error) {
providers := ksm.Providers{
K8s: clients.k8s,
Expand Down
Loading