master
dongming 2 years ago
parent 6080042e26
commit 76e5c9cfc1

@ -404,6 +404,7 @@ func isDeletionDup(a, b *Delta) *Delta {
return b return b
} }
// 将新元素加入队列的方法并且调用前要加锁。附带作用填充了items
// queueActionLocked appends to the delta list for the object. // queueActionLocked appends to the delta list for the object.
// Caller must lock first. // Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
@ -573,10 +574,14 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
// are those listed by `f.knownObjects` and the current object of K is // are those listed by `f.knownObjects` and the current object of K is
// what `f.knownObjects.GetByKey(K)` returns. // what `f.knownObjects.GetByKey(K)` returns.
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
// 调用这个函数的时候时线程安全的
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
// 注意keys的类型是一个集合
keys := make(sets.String, len(list)) keys := make(sets.String, len(list))
// 是为了兼容考虑,并没有更多的意义
// keep backwards compat for old clients // keep backwards compat for old clients
action := Sync action := Sync
if f.emitDeltaTypeReplaced { if f.emitDeltaTypeReplaced {
@ -589,12 +594,16 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
if err != nil { if err != nil {
return KeyError{item, err} return KeyError{item, err}
} }
keys.Insert(key) keys.Insert(key)
if err := f.queueActionLocked(action, item); err != nil { if err := f.queueActionLocked(action, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err) return fmt.Errorf("couldn't enqueue object: %v", err)
} }
} }
// 处理没有提供已知对象的方法集的情况
if f.knownObjects == nil { if f.knownObjects == nil {
// Do deletion detection against our own list. // Do deletion detection against our own list.
queuedDeletions := 0 queuedDeletions := 0

@ -244,6 +244,13 @@ var internalPackages = []string{"client-go/tools/cache/"}
func (r *Reflector) Run(stopCh <-chan struct{}) { func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() { wait.BackoffUntil(func() {
// 在运行 List & Watch 工程中要提供下游处理的方法。提供这些方法的对象就是delta pipo queue
// 他需要提供
// Replace
// Resync
// Add
// Update()
// Delete()
if err := r.ListAndWatch(stopCh); err != nil {// 调用了 list & watch if err := r.ListAndWatch(stopCh); err != nil {// 调用了 list & watch
r.watchErrorHandler(r, err) r.watchErrorHandler(r, err)
} }

Loading…
Cancel
Save