From 4aa43c0e6585a48237825a5fba56b40a7e23ef5b Mon Sep 17 00:00:00 2001 From: Iceber Gu Date: Fri, 10 Dec 2021 23:06:34 +0800 Subject: [PATCH] sync resource with the last resource version --- .../informer/named_controller.go | 47 ++++++++++++--- .../informer/resourceversion_informer.go | 15 +++-- .../informer/resourceversion_storage.go | 60 +++++++++++++++++-- .../clustersynchro/resource_synchro.go | 13 ++-- 4 files changed, 111 insertions(+), 24 deletions(-) diff --git a/pkg/synchromanager/clustersynchro/informer/named_controller.go b/pkg/synchromanager/clustersynchro/informer/named_controller.go index fb785148d..608081fc7 100644 --- a/pkg/synchromanager/clustersynchro/informer/named_controller.go +++ b/pkg/synchromanager/clustersynchro/informer/named_controller.go @@ -14,24 +14,34 @@ type controller struct { config cache.Config reflectorMutex sync.RWMutex - reflector *cache.Reflector - queue cache.Queue + reflector *Reflector + + lastResourceVersion string } -func NewNamedController(name string, config *cache.Config) cache.Controller { +func NewNamedController(name string, config *cache.Config) *controller { return &controller{ name: name, config: *config, } } +func (c *controller) SetLastResourceVersion(lastResourceVersion string) { + c.reflectorMutex.Lock() + defer c.reflectorMutex.Unlock() + if c.reflector != nil { + panic("controller is running, connot set last resource version") + } + c.lastResourceVersion = lastResourceVersion +} + func (c *controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() go func() { <-stopCh c.config.Queue.Close() }() - r := cache.NewNamedReflector( + r := NewNamedReflector( c.name, c.config.ListerWatcher, c.config.ObjectType, @@ -42,6 +52,9 @@ func (c *controller) Run(stopCh <-chan struct{}) { r.WatchListPageSize = c.config.WatchListPageSize c.reflectorMutex.Lock() + if c.lastResourceVersion != "" { + r.lastSyncResourceVersion = c.lastResourceVersion + } c.reflector = r c.reflectorMutex.Unlock() @@ -52,6 +65,27 @@ func (c *controller) Run(stopCh <-chan struct{}) { wg.Wait() } +/* +func (c *controller) setLastResourceVersionForReflector(reflector *cache.Reflector) { + if c.resourceVersionGetter == nil { + return + } + + rv := c.resourceVersionGetter.LastResourceVersion() + if rv == "" || rv == "0" { + return + } + rvValue := reflect.ValueOf(rv) + + field := reflect.ValueOf(reflector).Elem().FieldByName("lastSyncResourceVersion") + value := reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem() + if value.Kind() != rvValue.Kind() { + panic(fmt.Sprintf("reflector.lastSyncResourceVersion's value kind is %v", value.Kind())) + } + value.Set(rvValue) +} +*/ + func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(cache.PopProcessFunc(c.config.Process)) @@ -71,10 +105,7 @@ func (c *controller) HasSynced() bool { c.reflectorMutex.RLock() defer c.reflectorMutex.RUnlock() - if c.queue == nil { - return false - } - return c.queue.HasSynced() + return c.config.Queue.HasSynced() } func (c *controller) LastSyncResourceVersion() string { diff --git a/pkg/synchromanager/clustersynchro/informer/resourceversion_informer.go b/pkg/synchromanager/clustersynchro/informer/resourceversion_informer.go index 4c92aecc0..59c744ad6 100644 --- a/pkg/synchromanager/clustersynchro/informer/resourceversion_informer.go +++ b/pkg/synchromanager/clustersynchro/informer/resourceversion_informer.go @@ -7,7 +7,7 @@ import ( ) type ResourceVersionInformer interface { - Run(stopCh <-chan struct{}) + Run(withLastResourceVersion bool, stopCh <-chan struct{}) HasSynced() bool } @@ -15,7 +15,7 @@ type resourceVersionInformer struct { name string storage *ResourceVersionStorage handler ResourceEventHandler - controller cache.Controller + controller *controller listerWatcher cache.ListerWatcher } @@ -24,7 +24,6 @@ func NewResourceVersionInformer(name string, lw cache.ListerWatcher, storage *Re panic("name is required") } - // storage: NewResourceVersionStorage(cache.DeletionHandlingMetaNamespaceKeyFunc), informer := &resourceVersionInformer{ name: name, listerWatcher: lw, @@ -38,7 +37,7 @@ func NewResourceVersionInformer(name string, lw cache.ListerWatcher, storage *Re RetryOnError: false, Process: func(obj interface{}) error { deltas := obj.(cache.Deltas) - return informer.HandleDeltas(deltas) + return informer.handleDeltas(deltas) }, Queue: cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{ KeyFunction: cache.DeletionHandlingMetaNamespaceKeyFunc, @@ -54,11 +53,15 @@ func (informer *resourceVersionInformer) HasSynced() bool { return informer.controller.HasSynced() } -func (informer *resourceVersionInformer) Run(stopCh <-chan struct{}) { +func (informer *resourceVersionInformer) Run(withLastResourceVersion bool, stopCh <-chan struct{}) { + // TODO(iceber): It can only be run once and an error is reported if it is run a second time + if withLastResourceVersion { + informer.controller.SetLastResourceVersion(informer.storage.LastResourceVersion()) + } informer.controller.Run(stopCh) } -func (informer *resourceVersionInformer) HandleDeltas(deltas cache.Deltas) error { +func (informer *resourceVersionInformer) handleDeltas(deltas cache.Deltas) error { for _, d := range deltas { switch d.Type { case cache.Replaced, cache.Added, cache.Updated: diff --git a/pkg/synchromanager/clustersynchro/informer/resourceversion_storage.go b/pkg/synchromanager/clustersynchro/informer/resourceversion_storage.go index 77960e9a7..923aee577 100644 --- a/pkg/synchromanager/clustersynchro/informer/resourceversion_storage.go +++ b/pkg/synchromanager/clustersynchro/informer/resourceversion_storage.go @@ -1,23 +1,35 @@ package informer import ( + "strconv" + "sync/atomic" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apiserver/pkg/storage/etcd3" "k8s.io/client-go/tools/cache" ) type ResourceVersionStorage struct { keyFunc cache.KeyFunc - cacheStorage cache.ThreadSafeStore + lastResourceVersion *uint64 + cacheStorage cache.ThreadSafeStore } var _ cache.KeyListerGetter = &ResourceVersionStorage{} func NewResourceVersionStorage(keyFunc cache.KeyFunc) *ResourceVersionStorage { - return &ResourceVersionStorage{ - cacheStorage: cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}), - keyFunc: keyFunc, + var lastResourceVersion uint64 + storage := &ResourceVersionStorage{ + keyFunc: keyFunc, + lastResourceVersion: &lastResourceVersion, + cacheStorage: cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}), } + return storage +} + +func (c *ResourceVersionStorage) LastResourceVersion() string { + return strconv.FormatUint(atomic.LoadUint64(c.lastResourceVersion), 10) } func (c *ResourceVersionStorage) Add(obj interface{}) error { @@ -25,12 +37,19 @@ func (c *ResourceVersionStorage) Add(obj interface{}) error { if err != nil { return cache.KeyError{Obj: obj, Err: err} } + accessor, err := meta.Accessor(obj) if err != nil { return err } - c.cacheStorage.Add(key, accessor.GetResourceVersion()) + resourceversion := accessor.GetResourceVersion() + rv, err := etcd3.Versioner.ParseResourceVersion(resourceversion) + if err != nil { + return err + } + atomic.CompareAndSwapUint64(c.lastResourceVersion, *c.lastResourceVersion, rv) + c.cacheStorage.Add(key, resourceversion) return nil } @@ -39,12 +58,19 @@ func (c *ResourceVersionStorage) Update(obj interface{}) error { if err != nil { return cache.KeyError{Obj: obj, Err: err} } + accessor, err := meta.Accessor(obj) if err != nil { return err } - c.cacheStorage.Update(key, accessor.GetResourceVersion()) + resourceversion := accessor.GetResourceVersion() + rv, err := etcd3.Versioner.ParseResourceVersion(resourceversion) + if err != nil { + return err + } + atomic.CompareAndSwapUint64(c.lastResourceVersion, *c.lastResourceVersion, rv) + c.cacheStorage.Update(key, resourceversion) return nil } @@ -54,6 +80,15 @@ func (c *ResourceVersionStorage) Delete(obj interface{}) error { return cache.KeyError{Obj: obj, Err: err} } + if accessor, err := meta.Accessor(obj); err == nil { + resourceversion := accessor.GetResourceVersion() + rv, err := etcd3.Versioner.ParseResourceVersion(resourceversion) + if err != nil { + return err + } + atomic.CompareAndSwapUint64(c.lastResourceVersion, *c.lastResourceVersion, rv) + } + c.cacheStorage.Delete(key) return nil } @@ -80,6 +115,19 @@ func (c *ResourceVersionStorage) GetByKey(key string) (item interface{}, exists } func (c *ResourceVersionStorage) Replace(versions map[string]interface{}) error { + var lastResourceVersion uint64 + for _, version := range versions { + rv, err := etcd3.Versioner.ParseResourceVersion(version.(string)) + if err != nil { + // TODO(iceber): handle err + continue + } + + if rv > lastResourceVersion { + lastResourceVersion = rv + } + } + atomic.StoreUint64(c.lastResourceVersion, lastResourceVersion) c.cacheStorage.Replace(versions, "") return nil } diff --git a/pkg/synchromanager/clustersynchro/resource_synchro.go b/pkg/synchromanager/clustersynchro/resource_synchro.go index a5e3f9ccf..880eb7631 100644 --- a/pkg/synchromanager/clustersynchro/resource_synchro.go +++ b/pkg/synchromanager/clustersynchro/resource_synchro.go @@ -27,9 +27,10 @@ type ResourceSynchro struct { cluster string storageResource schema.GroupResource - queue queue.EventQueue - listerWatcher cache.ListerWatcher - cache *informer.ResourceVersionStorage + queue queue.EventQueue + listerWatcher cache.ListerWatcher + cache *informer.ResourceVersionStorage + syncWithLastResourceVersion bool memoryVersion schema.GroupVersion convertor runtime.ObjectConvertor @@ -69,6 +70,7 @@ func newResourceSynchro(cluster string, lw cache.ListerWatcher, rvcache *informe closed: make(chan struct{}), } close(synchro.stoped) + synchro.syncWithLastResourceVersion = true status := clustersv1alpha1.ClusterResourceSyncCondition{ Status: clustersv1alpha1.SyncStatusPending, @@ -139,7 +141,10 @@ func (synchro *ResourceSynchro) Run(stopCh <-chan struct{}) { synchro.cache, &unstructured.Unstructured{}, synchro, - ).Run(informerStopCh) + ).Run(synchro.syncWithLastResourceVersion, informerStopCh) + + // next run informer with last resource version + synchro.syncWithLastResourceVersion = true status = clustersv1alpha1.ClusterResourceSyncCondition{ Status: clustersv1alpha1.SyncStatusStop,