master
dongming 2 years ago
parent d062d19940
commit f34bc6bc13

@ -244,7 +244,7 @@ var internalPackages = []string{"client-go/tools/cache/"}
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {// 调用了 list & watch
if err := r.ListAndWatch(stopCh); err != nil {// 调用了 list & watch
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
@ -280,14 +280,17 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
// 执行list操作
err := r.list(stopCh)
if err != nil {
return err
}
// 执行resync操作
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
// 启动一个协程来运行 resync
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
@ -313,6 +316,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
}
}()
// 执行 watch 操作
retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
@ -380,6 +384,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// the resource version can be used for further progress notification (aka. watch).
func (r *Reflector) list(stopCh <-chan struct{}) error {
var resourceVersion string
// 创建 option主要是确定 resourceVersion
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name})
@ -389,25 +395,34 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
// 启动 goroutine 来执行 list 并接收
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
// pager 是一个拥有执行了外部list方法结果的对象
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
pager := pager.New(
pager.SimplePageFunc(
func(opts metav1.ListOptions) (runtime.Object, error) {
// 这里执行了外面传进来的对象的 list 方法
return r.listerWatcher.List(opts)
}))
switch {
case r.WatchListPageSize != 0:
// 设置了分页值,就会让这个值在执行分页的时候生效
pager.PageSize = r.WatchListPageSize
case r.paginatedResult:
// 如果已经使用用分页的结果,就不做任何操作
// We got a paginated result initially. Assume this resource and server honor
// paging requests (i.e. watch cache is probably disabled) and leave the default
// pager size set.
case options.ResourceVersion != "" && options.ResourceVersion != "0":
// 如果满足这里,说明获取到数据,并且没有分页信息
// 如果 PageSize 为 0 后面会从 cache 中获取数据
// User didn't explicitly request pagination.
//
// With ResourceVersion != "", we have a possibility to list from watch cache,
@ -436,6 +451,8 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
}
close(listCh)
}()
// 控制 goroutine 的运行
select {
case <-stopCh:
return nil
@ -463,12 +480,15 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
r.paginatedResult = true
}
// 表示list执行成功
r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
listMetaInterface, err := meta.ListAccessor(list)
// 处理resourceVersion
listMetaInterface, err := meta.ListAccessor(list) // 创建提取器
if err != nil {
return fmt.Errorf("unable to understand list result %#v: %v", list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion()
resourceVersion = listMetaInterface.GetResourceVersion() // 获取当前数据的 resourceVersion。相当于备份 resourceversion
initTrace.Step("Resource version extracted")
items, err := meta.ExtractList(list)
if err != nil {
@ -479,7 +499,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
return fmt.Errorf("unable to sync list result: %v", err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
r.setLastSyncResourceVersion(resourceVersion) // 还原 resourceVersion
initTrace.Step("Resource version updated")
return nil
}
@ -598,6 +618,7 @@ func (r *Reflector) setLastSyncResourceVersion(v string) {
r.lastSyncResourceVersion = v
}
// 确认 resourceVersion 的方法
// relistResourceVersion determines the resource version the reflector should list or relist from.
// Returns either the lastSyncResourceVersion so that this reflector will relist with a resource
// versions no older than has already been observed in relist results or watch events, or, if the last relist resulted
@ -608,16 +629,20 @@ func (r *Reflector) relistResourceVersion() string {
defer r.lastSyncResourceVersionMutex.RUnlock()
if r.isLastSyncResourceVersionUnavailable {
// 如果最近一次的sync的 resourceVersion 不可用,我们就返回一个空的 resourceVersion
// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
// if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector
// to the latest available ResourceVersion, using a consistent read from etcd.
return ""
}
if r.lastSyncResourceVersion == "" {
// 如果最近一次的sync的 resourceVersion 为字符串 空 ,则返回字符串 “0”
// For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to
// be served from the watch cache if it is enabled.
return "0"
}
// 都不满足直接返回最后一次同步的 resourceVersion
return r.lastSyncResourceVersion
}

@ -74,12 +74,13 @@ func New(fn ListPageFunc) *ListPager {
// server to reduce the impact on the server. If the chunk attempt fails, it will load
// the full list instead. The Limit field on options, if unset, will default to the page size.
func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
// 设置 page size
if options.Limit == 0 {
options.Limit = p.PageSize
}
requestedResourceVersion := options.ResourceVersion
requestedResourceVersionMatch := options.ResourceVersionMatch
var list *metainternalversion.List
var list *metainternalversion.List // 用来存储获取的结果
paginatedResult := false
for {
@ -89,8 +90,10 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
default:
}
// 真正执行外部传入的含有list对象的 list 方法
obj, err := p.PageFn(ctx, options)
if err != nil {
// 错误处理
// Only fallback to full list if an "Expired" errors is returned, FullListIfExpired is true, and
// the "Expired" error occurred in page 2 or later (since full list is intended to prevent a pager.List from
// failing when the resource versions is established by the first page request falls out of the compaction
@ -104,14 +107,18 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
options.Continue = ""
options.ResourceVersion = requestedResourceVersion
options.ResourceVersionMatch = requestedResourceVersionMatch
result, err := p.PageFn(ctx, options)
result, err := p.PageFn(ctx, options) // 失败了,最后还会执行一次
// 执行后,不管成功与否,都直接返回
return result, paginatedResult, err
}
// 创建一个提取器
// 根据获取的对象类型,来生成提取器
m, err := meta.ListAccessor(obj)
if err != nil {
return nil, paginatedResult, fmt.Errorf("returned object must be a list: %v", err)
}
// 判断结果状态,如果没有更多或者已经为空,则退出
// exit early and return the object we got if we haven't processed any pages
if len(m.GetContinue()) == 0 && list == nil {
return obj, paginatedResult, nil
@ -119,10 +126,14 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
// initialize the list and fill its contents
if list == nil {
// 进到这里,表示是第一次执行
// 使用创建的提取器初始化 list 对象
list = &metainternalversion.List{Items: make([]runtime.Object, 0, options.Limit+1)}
list.ResourceVersion = m.GetResourceVersion()
list.SelfLink = m.GetSelfLink()
}
// 处理每一个获取的对象,无论是一个列表,还是封装在一个列表对象中的列表
if err := meta.EachListItem(obj, func(obj runtime.Object) error {
list.Items = append(list.Items, obj)
return nil
@ -130,18 +141,24 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
return nil, paginatedResult, err
}
// 如果获取完成,就返回
// if we have no more items, return the list
if len(m.GetContinue()) == 0 {
return list, paginatedResult, nil
}
// 开始下一次循环的初始化设置
// set the next loop up
options.Continue = m.GetContinue()
options.Continue = m.GetContinue() // 获取了象一个对象
// 一些变量的清理工作
// Clear the ResourceVersion(Match) on the subsequent List calls to avoid the
// `specifying resource version is not allowed when using continue` error.
// See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143.
options.ResourceVersion = ""
options.ResourceVersionMatch = ""
// 如果走到这里,自然的,分页就产生了。
// At this point, result is already paginated.
paginatedResult = true
}

Loading…
Cancel
Save