From 43d7f60c3b6c9a04f7aaeefa919f73428d44b2c0 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakivskyy Date: Mon, 26 Aug 2024 16:34:26 +0300 Subject: [PATCH] nfd-gc: Remove stale NRT objects Remove stale NRT objects whose creator pod does not exist anymore. Signed-off-by: Oleg Zhurakivskyy --- .../templates/topologyupdater.yaml | 4 ++ pkg/nfd-gc/nfd-gc.go | 46 +++++++++--- pkg/nfd-gc/nfd-gc_test.go | 72 ++++++++++++++++--- .../nfd-topology-updater.go | 3 + test/e2e/utils/pod/pod.go | 8 +++ 5 files changed, 114 insertions(+), 19 deletions(-) diff --git a/deployment/helm/node-feature-discovery/templates/topologyupdater.yaml b/deployment/helm/node-feature-discovery/templates/topologyupdater.yaml index a94aac8701..2c0018f023 100644 --- a/deployment/helm/node-feature-discovery/templates/topologyupdater.yaml +++ b/deployment/helm/node-feature-discovery/templates/topologyupdater.yaml @@ -55,6 +55,10 @@ spec: valueFrom: fieldRef: fieldPath: status.hostIP + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name command: - "nfd-topology-updater" args: diff --git a/pkg/nfd-gc/nfd-gc.go b/pkg/nfd-gc/nfd-gc.go index c3dc8dae37..7ffe1d6887 100644 --- a/pkg/nfd-gc/nfd-gc.go +++ b/pkg/nfd-gc/nfd-gc.go @@ -19,6 +19,7 @@ package nfdgarbagecollector import ( "context" "fmt" + "strings" "time" topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" @@ -28,12 +29,15 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/kubernetes" + k8sclient "k8s.io/client-go/kubernetes" metadataclient "k8s.io/client-go/metadata" "k8s.io/client-go/metadata/metadatainformer" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1" + "sigs.k8s.io/node-feature-discovery/pkg/nfd-topology-updater" "sigs.k8s.io/node-feature-discovery/pkg/utils" "sigs.k8s.io/node-feature-discovery/pkg/version" ) @@ -42,6 +46,7 @@ var ( gvrNF = nfdv1alpha1.SchemeGroupVersion.WithResource("nodefeatures") gvrNRT = topologyv1alpha2.SchemeGroupVersion.WithResource("noderesourcetopologies") gvrNode = corev1.SchemeGroupVersion.WithResource("nodes") + gcNRTs = map[string]string{} ) // Args are the command line arguments @@ -57,10 +62,11 @@ type NfdGarbageCollector interface { } type nfdGarbageCollector struct { - args *Args - stopChan chan struct{} - client metadataclient.Interface - factory metadatainformer.SharedInformerFactory + args *Args + stopChan chan struct{} + client metadataclient.Interface + factory metadatainformer.SharedInformerFactory + k8sClient k8sclient.Interface } func New(args *Args) (NfdGarbageCollector, error) { @@ -70,12 +76,14 @@ func New(args *Args) (NfdGarbageCollector, error) { } cli := metadataclient.NewForConfigOrDie(kubeconfig) + clientset := kubernetes.NewForConfigOrDie(kubeconfig) return &nfdGarbageCollector{ - args: args, - stopChan: make(chan struct{}), - client: cli, - factory: metadatainformer.NewSharedInformerFactory(cli, 0), + args: args, + stopChan: make(chan struct{}), + client: cli, + factory: metadatainformer.NewSharedInformerFactory(cli, 0), + k8sClient: clientset, }, nil } @@ -190,7 +198,27 @@ func (n *nfdGarbageCollector) garbageCollect() { // Handle NodeResourceTopology objects listAndHandle(gvrNRT, func(meta metav1.PartialObjectMetadata) { - if !nodeNames.Has(meta.Name) { + deleteNRT := false + + if label, ok := meta.Labels[nfdtopologyupdater.NRTOwnerPodLabel]; ok { + if s := strings.Split(label, "_"); len(s) == 2 { + ns := s[0] + pod := s[1] + _, err := n.k8sClient.CoreV1().Pods(ns).Get(context.TODO(), pod, metav1.GetOptions{}) + if errors.IsNotFound(err) { + if _, ok := gcNRTs[meta.Name]; !ok { + gcNRTs[meta.Name] = label + } else { + delete(gcNRTs, meta.Name) + deleteNRT = true + } + } else if err != nil { + klog.ErrorS(err, "failed to get Pod object") + } + } + } + + if !nodeNames.Has(meta.Name) || deleteNRT { n.deleteNRT(meta.Name) } }) diff --git a/pkg/nfd-gc/nfd-gc_test.go b/pkg/nfd-gc/nfd-gc_test.go index d93c487481..a8b61d86ce 100644 --- a/pkg/nfd-gc/nfd-gc_test.go +++ b/pkg/nfd-gc/nfd-gc_test.go @@ -27,6 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" + fakek8sclientset "k8s.io/client-go/kubernetes/fake" metadataclient "k8s.io/client-go/metadata" "k8s.io/client-go/metadata/fake" fakemetadataclient "k8s.io/client-go/metadata/fake" @@ -37,7 +38,7 @@ import ( func TestNRTGC(t *testing.T) { Convey("When theres is old NRT ", t, func() { - gc := newMockGC(nil, []string{"node1"}) + gc := newMockGC(nil, []string{"node1"}, []string{"pod1"}) errChan := make(chan error) go func() { errChan <- gc.Run() }() @@ -48,7 +49,7 @@ func TestNRTGC(t *testing.T) { So(<-errChan, ShouldBeNil) }) Convey("When theres is one old NRT and one up to date", t, func() { - gc := newMockGC([]string{"node1"}, []string{"node1", "node2"}) + gc := newMockGC([]string{"node1"}, []string{"node1", "node2"}, []string{"pod1", "pod2"}) errChan := make(chan error) go func() { errChan <- gc.Run() }() @@ -59,7 +60,7 @@ func TestNRTGC(t *testing.T) { So(<-errChan, ShouldBeNil) }) Convey("Should react to delete event", t, func() { - gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"}) + gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"}, []string{"pod1", "pod2"}) errChan := make(chan error) go func() { errChan <- gc.Run() }() @@ -71,7 +72,7 @@ func TestNRTGC(t *testing.T) { So(waitForNRT(gc.client, "node2"), ShouldBeTrue) }) Convey("periodic GC should remove obsolete NRT", t, func() { - gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"}) + gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"}, []string{"pod1", "pod2"}) // Override period to run fast gc.args.GCPeriod = 100 * time.Millisecond @@ -86,16 +87,35 @@ func TestNRTGC(t *testing.T) { So(waitForNRT(gc.client, "node1", "node2"), ShouldBeTrue) }) + Convey("periodic GC should remove stale NRT", t, func() { + gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"}, []string{"pod1", "pod2"}) + // Override period to run fast + gc.args.GCPeriod = 100 * time.Millisecond + + nrt := createPartialObjectMetadata("topology.node.k8s.io/v1alpha2", "NodeResourceTopology", "", "not-existing") + nrt.ObjectMeta.Labels = map[string]string{"owner-pod": "pod4"} + + errChan := make(chan error) + go func() { errChan <- gc.Run() }() + + gvr := topologyv1alpha2.SchemeGroupVersion.WithResource("noderesourcetopologies") + _, err := gc.client.Resource(gvr).(fake.MetadataClient).CreateFake(nrt, metav1.CreateOptions{}) + So(err, ShouldBeNil) + + So(waitForNrtPodsGC(gc.client, "pod1", "pod2"), ShouldBeTrue) + }) } -func newMockGC(nodes, nrts []string) *mockGC { +func newMockGC(nodes, nrts, pods []string) *mockGC { // Create fake objects objs := []runtime.Object{} for _, name := range nodes { objs = append(objs, createPartialObjectMetadata("v1", "Node", "", name)) } - for _, name := range nrts { - objs = append(objs, createPartialObjectMetadata("topology.node.k8s.io/v1alpha2", "NodeResourceTopology", "", name)) + for i, name := range nrts { + nrt := createPartialObjectMetadata("topology.node.k8s.io/v1alpha2", "NodeResourceTopology", "", name) + nrt.ObjectMeta.Labels = map[string]string{"owner-pod": pods[i]} + objs = append(objs, nrt) } scheme := fake.NewTestScheme() @@ -103,9 +123,10 @@ func newMockGC(nodes, nrts []string) *mockGC { cli := fakemetadataclient.NewSimpleMetadataClient(scheme, objs...) return &mockGC{ nfdGarbageCollector: nfdGarbageCollector{ - factory: metadatainformer.NewSharedInformerFactory(cli, 0), - client: cli, - stopChan: make(chan struct{}), + factory: metadatainformer.NewSharedInformerFactory(cli, 0), + client: cli, + k8sClient: fakek8sclientset.NewSimpleClientset(createFakePods(pods...)...), + stopChan: make(chan struct{}), args: &Args{ GCPeriod: 10 * time.Minute, }, @@ -114,6 +135,17 @@ func newMockGC(nodes, nrts []string) *mockGC { } } +func createFakePods(names ...string) []runtime.Object { + pods := make([]runtime.Object, len(names)) + for i, n := range names { + pods[i] = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: n, + }} + } + return pods +} + func createPartialObjectMetadata(apiVersion, kind, namespace, name string) *metav1.PartialObjectMetadata { return &metav1.PartialObjectMetadata{ TypeMeta: metav1.TypeMeta{ @@ -152,3 +184,23 @@ func waitForNRT(cli metadataclient.Interface, names ...string) bool { } return false } + +func waitForNrtPodsGC(cli metadataclient.Interface, pods ...string) bool { + podsSet := sets.NewString(pods...) + gvr := topologyv1alpha2.SchemeGroupVersion.WithResource("noderesourcetopologies") + for i := 0; i < 2; i++ { + rsp, err := cli.Resource(gvr).List(context.TODO(), metav1.ListOptions{}) + So(err, ShouldBeNil) + + nrtPods := sets.NewString() + for _, nrt := range rsp.Items { + nrtPods.Insert(nrt.Labels["owner-pod"]) + } + + if nrtPods.Equal(podsSet) { + return true + } + time.Sleep(1 * time.Second) + } + return false +} diff --git a/pkg/nfd-topology-updater/nfd-topology-updater.go b/pkg/nfd-topology-updater/nfd-topology-updater.go index e65edbea2b..c58042c7e6 100644 --- a/pkg/nfd-topology-updater/nfd-topology-updater.go +++ b/pkg/nfd-topology-updater/nfd-topology-updater.go @@ -52,6 +52,7 @@ const ( TopologyManagerPolicyAttributeName = "topologyManagerPolicy" // TopologyManagerScopeAttributeName represents an attribute which defines Topology Manager Policy Scope TopologyManagerScopeAttributeName = "topologyManagerScope" + NRTOwnerPodLabel = "nfd.node.kubernetes.io/owner-pod" ) // Args are the command line arguments @@ -294,6 +295,7 @@ func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneLi nrtNew := v1alpha2.NodeResourceTopology{ ObjectMeta: metav1.ObjectMeta{ Name: w.nodeName, + Labels: map[string]string{NRTOwnerPodLabel: fmt.Sprintf("%s_%s", w.kubernetesNamespace, os.Getenv("POD_NAME"))}, OwnerReferences: w.ownerRefs, }, Zones: zoneInfo, @@ -317,6 +319,7 @@ func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneLi nrtMutated := nrt.DeepCopy() nrtMutated.Zones = zoneInfo nrtMutated.OwnerReferences = w.ownerRefs + nrtMutated.Labels[NRTOwnerPodLabel] = fmt.Sprintf("%s_%s", w.kubernetesNamespace, os.Getenv("POD_NAME")) attributes := scanResponse.Attributes diff --git a/test/e2e/utils/pod/pod.go b/test/e2e/utils/pod/pod.go index 435405266f..8eb5906891 100644 --- a/test/e2e/utils/pod/pod.go +++ b/test/e2e/utils/pod/pod.go @@ -423,6 +423,14 @@ func NFDTopologyUpdaterSpec(kc utils.KubeletConfig, opts ...SpecOption) *corev1. }, }, }, + { + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }, }, SecurityContext: &corev1.SecurityContext{ Capabilities: &corev1.Capabilities{