allow restricting Kube metadata to local node only (#1440) (#1456)
* allow restricting Kube metadata to local node only

* integration tests

* increase number of cores for test runners

* document new option

* added extra logging

* increased prometheus TSDB retention

* restore smaller GitHub Actions machines

(cherry picked from commit 3b31c2e)
mariomac authored Dec 13, 2024
1 parent aa9c245 commit 6d9088b
Showing 12 changed files with 317 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pull_request_integration_tests.yml
@@ -9,7 +9,7 @@ on:
jobs:
test:
name: test
runs-on: ubuntu-latest
runs-on: ubuntu-latest-8-cores
strategy:
matrix:
go: [ '1.23' ]
2 changes: 1 addition & 1 deletion .github/workflows/pull_request_k8s_integration_tests.yml
@@ -9,7 +9,7 @@ on:
jobs:
test:
name: test
runs-on: ubuntu-latest
runs-on: ubuntu-latest-8-cores
strategy:
matrix:
go: [ '1.23' ]
11 changes: 11 additions & 0 deletions docs/sources/configure/options.md
@@ -681,6 +681,17 @@ reduces the load of the Kubernetes API.
The Pods informer can't be disabled. For that purpose, you should disable the whole
Kubernetes metadata decoration.

| YAML | Environment variable | Type | Default |
|----------------------------|---------------------------------------|---------|---------|
| `meta_restrict_local_node` | `BEYLA_KUBE_META_RESTRICT_LOCAL_NODE` | boolean | false |

If true, Beyla stores Pod and Node metadata only from the node where the Beyla instance is running.

This option decreases the memory used to store the metadata, but some metrics
(such as network bytes or service graph metrics) will miss the metadata from destination
pods located on a different node.
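
For illustration, a minimal sketch of how this option could be set in a Beyla configuration file. The `attributes.kubernetes` nesting and the `enable` flag are assumptions drawn from the `config.Attributes.Kubernetes` path used in the code; only the option name and environment variable come from the table above:

```yaml
attributes:
  kubernetes:
    # assumed: Kubernetes metadata decoration must be enabled for the option to apply
    enable: true
    # keep Pod and Node metadata only for the node running this Beyla instance
    meta_restrict_local_node: true
```

The same switch is available through the `BEYLA_KUBE_META_RESTRICT_LOCAL_NODE` environment variable.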


| YAML | Environment variable | Type | Default |
|--------------------------|-------------------------------------|----------|---------|
| `informers_sync_timeout` | `BEYLA_KUBE_INFORMERS_SYNC_TIMEOUT` | Duration | 30s |
1 change: 1 addition & 0 deletions pkg/components/beyla.go
@@ -114,6 +114,7 @@ func buildCommonContextInfo(
DisabledInformers: config.Attributes.Kubernetes.DisableInformers,
MetaCacheAddr: config.Attributes.Kubernetes.MetaCacheAddress,
MetaSourceLabels: config.Attributes.Kubernetes.MetaSourceLabels,
RestrictLocalNode: config.Attributes.Kubernetes.MetaRestrictLocalNode,
}),
}
switch {
16 changes: 15 additions & 1 deletion pkg/internal/kube/informer_provider.go
@@ -35,6 +35,7 @@ type MetadataConfig struct {
ResyncPeriod time.Duration
MetaCacheAddr string
MetaSourceLabels MetaSourceLabels
RestrictLocalNode bool
}

type MetadataProvider struct {
@@ -43,6 +44,8 @@ type MetadataProvider struct {
metadata *Store
informer meta.Notifier

localNodeName string

cfg *MetadataConfig
}

@@ -126,6 +129,9 @@ func (mp *MetadataProvider) getInformer(ctx context.Context) (meta.Notifier, err
}

func (mp *MetadataProvider) CurrentNodeName(ctx context.Context) (string, error) {
if mp.localNodeName != "" {
return mp.localNodeName, nil
}
log := klog().With("func", "NodeName")
kubeClient, err := mp.KubeClient()
if err != nil {
@@ -153,7 +159,8 @@ func (mp *MetadataProvider) CurrentNodeName(ctx context.Context) (string, error)
" host name as node name", "nodeName", currentPod, "namespace", currentNamespace, "error", err)
return currentPod, nil
}
return pods.Items[0].Spec.NodeName, nil
mp.localNodeName = pods.Items[0].Spec.NodeName
return mp.localNodeName, nil
}

// initLocalInformers initializes an informer client that directly connects to the Node Kube API
@@ -167,6 +174,13 @@ func (mp *MetadataProvider) initLocalInformers(ctx context.Context) (*meta.Infor
meta.WaitForCacheSync(),
meta.WithCacheSyncTimeout(mp.cfg.SyncTimeout),
)
if mp.cfg.RestrictLocalNode {
localNode, err := mp.CurrentNodeName(ctx)
if err != nil {
return nil, fmt.Errorf("getting local node name: %w", err)
}
opts = append(opts, meta.RestrictNode(localNode))
}
return meta.InitInformers(ctx, opts...)
}

103 changes: 79 additions & 24 deletions pkg/kubecache/meta/informers_init.go
@@ -8,12 +8,14 @@ import (
"os"
"path"
"strings"
"sync"
"time"

"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/testing/protocmp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
@@ -44,6 +46,8 @@ type informersConfig struct {
disableNodes bool
disableServices bool

restrictNode string

// waits for cache synchronization at start
waitCacheSync bool
cacheSyncTimeout time.Duration
@@ -80,6 +84,12 @@ func WithoutServices() InformerOption {
}
}

func RestrictNode(nodeName string) InformerOption {
return func(c *informersConfig) {
c.restrictNode = nodeName
}
}

func WithKubeClient(client kubernetes.Interface) InformerOption {
return func(c *informersConfig) {
c.kubeClient = client
@@ -119,27 +129,25 @@ func InitInformers(ctx context.Context, opts ...InformerOption) (*Informers, err
}
}

informerFactory := informers.NewSharedInformerFactory(svc.config.kubeClient, svc.config.resyncPeriod)

if err := svc.initPodInformer(ctx, informerFactory); err != nil {
createdFactories, err := svc.initInformers(ctx, config)
if err != nil {
return nil, err
}
if !svc.config.disableNodes {
if err := svc.initNodeIPInformer(ctx, informerFactory); err != nil {
return nil, err
}
}
if !svc.config.disableServices {
if err := svc.initServiceIPInformer(ctx, informerFactory); err != nil {
return nil, err
}
}

svc.log.Debug("starting kubernetes informers")
informerFactory.Start(ctx.Done())
allSynced := sync.WaitGroup{}
allSynced.Add(len(createdFactories))
for _, factory := range createdFactories {
factory.Start(ctx.Done())
go func() {
factory.WaitForCacheSync(ctx.Done())
allSynced.Done()
}()
}

go func() {
svc.log.Debug("waiting for informers' syncronization")
informerFactory.WaitForCacheSync(ctx.Done())
svc.log.Debug("waiting for informers' synchronization")
allSynced.Wait()
svc.log.Debug("informers synchronized")
close(svc.waitForSync)
}()
@@ -159,6 +167,49 @@ func InitInformers(ctx context.Context, opts ...InformerOption) (*Informers, err

}

func (inf *Informers) initInformers(ctx context.Context, config *informersConfig) ([]informers.SharedInformerFactory, error) {
var informerFactory informers.SharedInformerFactory
if config.restrictNode == "" {
informerFactory = informers.NewSharedInformerFactory(inf.config.kubeClient, inf.config.resyncPeriod)
} else {
informerFactory = informers.NewSharedInformerFactoryWithOptions(inf.config.kubeClient, inf.config.resyncPeriod,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{"spec.nodeName": config.restrictNode}.String()
}))
}
createdFactories := []informers.SharedInformerFactory{informerFactory}
if err := inf.initPodInformer(ctx, informerFactory); err != nil {
return nil, err
}

if !inf.config.disableNodes {
nodeIFactory := informerFactory
if config.restrictNode != "" {
nodeIFactory = informers.NewSharedInformerFactoryWithOptions(inf.config.kubeClient, inf.config.resyncPeriod,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{"metadata.name": config.restrictNode}.String()
}))
createdFactories = append(createdFactories, nodeIFactory)
} // else: use default, unfiltered informerFactory instance
if err := inf.initNodeIPInformer(ctx, nodeIFactory); err != nil {
return nil, err
}
}
if !inf.config.disableServices {
svcIFactory := informerFactory
if config.restrictNode != "" {
// informerFactory will be initially set to a "spec.nodeName"-filtered instance, so we need
// to create an unfiltered one for global services
svcIFactory = informers.NewSharedInformerFactory(inf.config.kubeClient, inf.config.resyncPeriod)
createdFactories = append(createdFactories, svcIFactory)
}
if err := inf.initServiceIPInformer(ctx, svcIFactory); err != nil {
return nil, err
}
}
return createdFactories, nil
}

func initConfigOpts(opts []InformerOption) *informersConfig {
config := &informersConfig{}
for _, opt := range opts {
@@ -466,32 +517,34 @@ func headlessService(om *informer.ObjectMeta) bool {

func (inf *Informers) ipInfoEventHandler(ctx context.Context) *cache.ResourceEventHandlerFuncs {
metrics := instrument.FromContext(ctx)
log := inf.log.With("func", "ipInfoEventHandler")
return &cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
metrics.InformerNew()
em := obj.(*indexableEntity).EncodedMeta
log.Debug("AddFunc", "kind", em.Kind, "name", em.Name, "ips", em.Ips)
// ignore headless services from being added
if headlessService(obj.(*indexableEntity).EncodedMeta) {
return
}
inf.Notify(&informer.Event{
Type: informer.EventType_CREATED,
Resource: obj.(*indexableEntity).EncodedMeta,
Resource: em,
})
},
UpdateFunc: func(oldObj, newObj interface{}) {
metrics.InformerUpdate()
newEM := newObj.(*indexableEntity).EncodedMeta
oldEM := oldObj.(*indexableEntity).EncodedMeta
// ignore headless services from being added
if headlessService(newObj.(*indexableEntity).EncodedMeta) &&
headlessService(oldObj.(*indexableEntity).EncodedMeta) {
if headlessService(newEM) && headlessService(oldEM) {
return
}
if cmp.Equal(
oldObj.(*indexableEntity).EncodedMeta,
newObj.(*indexableEntity).EncodedMeta,
protoCmpTransform,
) {
if cmp.Equal(oldEM, newEM, protoCmpTransform) {
return
}
log.Debug("UpdateFunc", "kind", newEM.Kind, "name", newEM.Name,
"ips", newEM.Ips, "oldIps", oldEM.Ips)
inf.Notify(&informer.Event{
Type: informer.EventType_UPDATED,
Resource: newObj.(*indexableEntity).EncodedMeta,
@@ -511,6 +564,8 @@ func (inf *Informers) ipInfoEventHandler(ctx context.Context) *cache.ResourceEve
return
}
}
em := obj.(*indexableEntity).EncodedMeta
log.Debug("DeleteFunc", "kind", em.Kind, "name", em.Name, "ips", em.Ips)

metrics.InformerDelete()
inf.Notify(&informer.Event{
4 changes: 4 additions & 0 deletions pkg/transform/k8s.go
@@ -49,6 +49,10 @@ type KubernetesDecorator struct {
// MetaCacheAddress is the host:port address of the beyla-k8s-cache service instance
MetaCacheAddress string `yaml:"meta_cache_address" env:"BEYLA_KUBE_META_CACHE_ADDRESS"`

// MetaRestrictLocalNode will download only the metadata from the Pods that are located in the same
// node as the Beyla instance. It will also restrict the Node information to the local node.
MetaRestrictLocalNode bool `yaml:"meta_restrict_local_node" env:"BEYLA_KUBE_META_RESTRICT_LOCAL_NODE"`

// MetaSourceLabels allows Beyla overriding the service name and namespace of an application from
// the given labels.
// TODO Beyla 2.0. Consider defaulting to (and report as a breaking change):
@@ -25,7 +25,7 @@ spec:
- name: prometheus
image: quay.io/prometheus/prometheus:v2.53.0
args:
- --storage.tsdb.retention.time=1m
- --storage.tsdb.retention.time=10m
- --config.file=/etc/prometheus/prometheus-config.yml
- --storage.tsdb.path=/prometheus
- --web.enable-lifecycle