From 76e5c9cfc1d663724ec1003b110f08836ebd505a Mon Sep 17 00:00:00 2001 From: dongming Date: Tue, 13 Dec 2022 22:14:31 +0800 Subject: [PATCH] l-23 --- vendor/k8s.io/client-go/tools/cache/delta_fifo.go | 9 +++++++++ vendor/k8s.io/client-go/tools/cache/reflector.go | 7 +++++++ 2 files changed, 16 insertions(+) diff --git a/vendor/k8s.io/client-go/tools/cache/delta_fifo.go b/vendor/k8s.io/client-go/tools/cache/delta_fifo.go index 0c13a41..78b28ad 100644 --- a/vendor/k8s.io/client-go/tools/cache/delta_fifo.go +++ b/vendor/k8s.io/client-go/tools/cache/delta_fifo.go @@ -404,6 +404,7 @@ func isDeletionDup(a, b *Delta) *Delta { return b } +// 将新元素加入队列的方法,并且调用前要加锁。附带作用,填充了items // queueActionLocked appends to the delta list for the object. // Caller must lock first. func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { @@ -573,10 +574,14 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { // are those listed by `f.knownObjects` and the current object of K is // what `f.knownObjects.GetByKey(K)` returns. func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { + // 调用这个函数的时候时线程安全的 f.lock.Lock() defer f.lock.Unlock() + + // 注意keys的类型是一个集合 keys := make(sets.String, len(list)) + // 是为了兼容考虑,并没有更多的意义 // keep backwards compat for old clients action := Sync if f.emitDeltaTypeReplaced { @@ -589,12 +594,16 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { if err != nil { return KeyError{item, err} } + keys.Insert(key) + if err := f.queueActionLocked(action, item); err != nil { return fmt.Errorf("couldn't enqueue object: %v", err) } } + + // 处理没有提供已知对象的方法集的情况 if f.knownObjects == nil { // Do deletion detection against our own list. queuedDeletions := 0 diff --git a/vendor/k8s.io/client-go/tools/cache/reflector.go b/vendor/k8s.io/client-go/tools/cache/reflector.go index 4e4be02..8ecc1c3 100644 --- a/vendor/k8s.io/client-go/tools/cache/reflector.go +++ b/vendor/k8s.io/client-go/tools/cache/reflector.go @@ -244,6 +244,13 @@ var internalPackages = []string{"client-go/tools/cache/"} func (r *Reflector) Run(stopCh <-chan struct{}) { klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) wait.BackoffUntil(func() { + // 在运行 List & Watch 工程中要提供下游处理的方法。提供这些方法的对象,就是delta pipo queue + // 他需要提供 + // Replace() + // Resync + // Add() + // Update() + // Delete() if err := r.ListAndWatch(stopCh); err != nil {// 调用了 list & watch, r.watchErrorHandler(r, err) }