Skip to content

Commit

Permalink
sync resource with the last resource version
Browse files Browse the repository at this point in the history
  • Loading branch information
Iceber committed Dec 14, 2021
1 parent 256d919 commit a5acce8
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 24 deletions.
47 changes: 39 additions & 8 deletions pkg/synchromanager/clustersynchro/informer/named_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,34 @@ type controller struct {
config cache.Config

reflectorMutex sync.RWMutex
reflector *cache.Reflector
queue cache.Queue
reflector *Reflector

lastResourceVersion string
}

func NewNamedController(name string, config *cache.Config) cache.Controller {
func NewNamedController(name string, config *cache.Config) *controller {
return &controller{
name: name,
config: *config,
}
}

func (c *controller) SetLastResourceVersion(lastResourceVersion string) {
c.reflectorMutex.Lock()
defer c.reflectorMutex.Unlock()
if c.reflector != nil {
panic("controller is running, connot set last resource version")
}
c.lastResourceVersion = lastResourceVersion
}

func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := cache.NewNamedReflector(
r := NewNamedReflector(
c.name,
c.config.ListerWatcher,
c.config.ObjectType,
Expand All @@ -42,6 +52,9 @@ func (c *controller) Run(stopCh <-chan struct{}) {
r.WatchListPageSize = c.config.WatchListPageSize

c.reflectorMutex.Lock()
if c.lastResourceVersion != "" {
r.lastSyncResourceVersion = c.lastResourceVersion
}
c.reflector = r
c.reflectorMutex.Unlock()

Expand All @@ -52,6 +65,27 @@ func (c *controller) Run(stopCh <-chan struct{}) {
wg.Wait()
}

/*
func (c *controller) setLastResourceVersionForReflector(reflector *cache.Reflector) {
if c.resourceVersionGetter == nil {
return
}
rv := c.resourceVersionGetter.LastResourceVersion()
if rv == "" || rv == "0" {
return
}
rvValue := reflect.ValueOf(rv)
field := reflect.ValueOf(reflector).Elem().FieldByName("lastSyncResourceVersion")
value := reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem()
if value.Kind() != rvValue.Kind() {
panic(fmt.Sprintf("reflector.lastSyncResourceVersion's value kind is %v", value.Kind()))
}
value.Set(rvValue)
}
*/

func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(cache.PopProcessFunc(c.config.Process))
Expand All @@ -71,10 +105,7 @@ func (c *controller) HasSynced() bool {
c.reflectorMutex.RLock()
defer c.reflectorMutex.RUnlock()

if c.queue == nil {
return false
}
return c.queue.HasSynced()
return c.config.Queue.HasSynced()
}

func (c *controller) LastSyncResourceVersion() string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import (
)

type ResourceVersionInformer interface {
Run(stopCh <-chan struct{})
Run(withLastResourceVersion bool, stopCh <-chan struct{})
HasSynced() bool
}

type resourceVersionInformer struct {
name string
storage *ResourceVersionStorage
handler ResourceEventHandler
controller cache.Controller
controller *controller
listerWatcher cache.ListerWatcher
}

Expand All @@ -24,7 +24,6 @@ func NewResourceVersionInformer(name string, lw cache.ListerWatcher, storage *Re
panic("name is required")
}

// storage: NewResourceVersionStorage(cache.DeletionHandlingMetaNamespaceKeyFunc),
informer := &resourceVersionInformer{
name: name,
listerWatcher: lw,
Expand All @@ -38,7 +37,7 @@ func NewResourceVersionInformer(name string, lw cache.ListerWatcher, storage *Re
RetryOnError: false,
Process: func(obj interface{}) error {
deltas := obj.(cache.Deltas)
return informer.HandleDeltas(deltas)
return informer.handleDeltas(deltas)
},
Queue: cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
KeyFunction: cache.DeletionHandlingMetaNamespaceKeyFunc,
Expand All @@ -54,11 +53,15 @@ func (informer *resourceVersionInformer) HasSynced() bool {
return informer.controller.HasSynced()
}

func (informer *resourceVersionInformer) Run(stopCh <-chan struct{}) {
func (informer *resourceVersionInformer) Run(withLastResourceVersion bool, stopCh <-chan struct{}) {
// TODO(iceber): It can only be run once and an error is reported if it is run a second time
if withLastResourceVersion {
informer.controller.SetLastResourceVersion(informer.storage.LastResourceVersion())
}
informer.controller.Run(stopCh)
}

func (informer *resourceVersionInformer) HandleDeltas(deltas cache.Deltas) error {
func (informer *resourceVersionInformer) handleDeltas(deltas cache.Deltas) error {
for _, d := range deltas {
switch d.Type {
case cache.Replaced, cache.Added, cache.Updated:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,55 @@
package informer

import (
"strconv"
"sync/atomic"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/client-go/tools/cache"
)

type ResourceVersionStorage struct {
keyFunc cache.KeyFunc

cacheStorage cache.ThreadSafeStore
lastResourceVersion *uint64
cacheStorage cache.ThreadSafeStore
}

var _ cache.KeyListerGetter = &ResourceVersionStorage{}

func NewResourceVersionStorage(keyFunc cache.KeyFunc) *ResourceVersionStorage {
return &ResourceVersionStorage{
cacheStorage: cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}),
keyFunc: keyFunc,
var lastResourceVersion uint64
storage := &ResourceVersionStorage{
keyFunc: keyFunc,
lastResourceVersion: &lastResourceVersion,
cacheStorage: cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}),
}
return storage
}

func (c *ResourceVersionStorage) LastResourceVersion() string {
return strconv.FormatUint(atomic.LoadUint64(c.lastResourceVersion), 10)
}

func (c *ResourceVersionStorage) Add(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return cache.KeyError{Obj: obj, Err: err}
}

accessor, err := meta.Accessor(obj)
if err != nil {
return err
}

c.cacheStorage.Add(key, accessor.GetResourceVersion())
resourceversion := accessor.GetResourceVersion()
rv, err := etcd3.Versioner.ParseResourceVersion(resourceversion)
if err != nil {
return err
}
atomic.CompareAndSwapUint64(c.lastResourceVersion, *c.lastResourceVersion, rv)
c.cacheStorage.Add(key, resourceversion)
return nil
}

Expand All @@ -39,12 +58,19 @@ func (c *ResourceVersionStorage) Update(obj interface{}) error {
if err != nil {
return cache.KeyError{Obj: obj, Err: err}
}

accessor, err := meta.Accessor(obj)
if err != nil {
return err
}

c.cacheStorage.Update(key, accessor.GetResourceVersion())
resourceversion := accessor.GetResourceVersion()
rv, err := etcd3.Versioner.ParseResourceVersion(resourceversion)
if err != nil {
return err
}
atomic.CompareAndSwapUint64(c.lastResourceVersion, *c.lastResourceVersion, rv)
c.cacheStorage.Update(key, resourceversion)
return nil
}

Expand All @@ -54,6 +80,15 @@ func (c *ResourceVersionStorage) Delete(obj interface{}) error {
return cache.KeyError{Obj: obj, Err: err}
}

if accessor, err := meta.Accessor(obj); err == nil {
resourceversion := accessor.GetResourceVersion()
rv, err := etcd3.Versioner.ParseResourceVersion(resourceversion)
if err != nil {
return err
}
atomic.CompareAndSwapUint64(c.lastResourceVersion, *c.lastResourceVersion, rv)
}

c.cacheStorage.Delete(key)
return nil
}
Expand All @@ -80,6 +115,19 @@ func (c *ResourceVersionStorage) GetByKey(key string) (item interface{}, exists
}

func (c *ResourceVersionStorage) Replace(versions map[string]interface{}) error {
var lastResourceVersion uint64
for _, version := range versions {
rv, err := etcd3.Versioner.ParseResourceVersion(version.(string))
if err != nil {
// TODO(iceber): handle err
continue
}

if rv > lastResourceVersion {
lastResourceVersion = rv
}
}
atomic.StoreUint64(c.lastResourceVersion, lastResourceVersion)
c.cacheStorage.Replace(versions, "")
return nil
}
15 changes: 11 additions & 4 deletions pkg/synchromanager/clustersynchro/resource_synchro.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ type ResourceSynchro struct {
cluster string
storageResource schema.GroupResource

queue queue.EventQueue
listerWatcher cache.ListerWatcher
cache *informer.ResourceVersionStorage
queue queue.EventQueue
listerWatcher cache.ListerWatcher
cache *informer.ResourceVersionStorage
syncWithLastResourceVersion bool

memoryVersion schema.GroupVersion
convertor runtime.ObjectConvertor
Expand Down Expand Up @@ -72,6 +73,9 @@ func newResourceSynchro(cluster string, lw cache.ListerWatcher, rvcache *informe
}
close(synchro.stoped)

// TODO(iceber): add feature gate
synchro.syncWithLastResourceVersion = true

status := clustersv1alpha1.ClusterResourceSyncCondition{
Status: clustersv1alpha1.SyncStatusPending,
LastTransitionTime: metav1.Now().Rfc3339Copy(),
Expand Down Expand Up @@ -141,7 +145,10 @@ func (synchro *ResourceSynchro) Run(stopCh <-chan struct{}) {
synchro.cache,
&unstructured.Unstructured{},
synchro,
).Run(informerStopCh)
).Run(synchro.syncWithLastResourceVersion, informerStopCh)

// next run informer with last resource version
synchro.syncWithLastResourceVersion = true

status = clustersv1alpha1.ClusterResourceSyncCondition{
Status: clustersv1alpha1.SyncStatusStop,
Expand Down

0 comments on commit a5acce8

Please sign in to comment.