|
|
|
@ -62,12 +62,13 @@ type ThreadSafeStore interface {
|
|
|
|
|
// threadSafeMap implements ThreadSafeStore
|
|
|
|
|
type threadSafeMap struct {
|
|
|
|
|
lock sync.RWMutex
|
|
|
|
|
items map[string]interface{}
|
|
|
|
|
items map[string]interface{} // 真正存储数据的地方,加入,删除,更新都是操作这里的数据
|
|
|
|
|
|
|
|
|
|
// indexers maps a name to an IndexFunc
|
|
|
|
|
indexers Indexers
|
|
|
|
|
indexers Indexers // 存储的是索引的字段到获取索引的方法。如查询Pods的时候,按照namespace来查询,namespace就是这和索引字段
|
|
|
|
|
|
|
|
|
|
// indices maps a name to an Index
|
|
|
|
|
indices Indices
|
|
|
|
|
indices Indices // 存储的是索引的字段到索引对象的方法。索引对象是从索引的具体值到存储数据的key值的映射
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *threadSafeMap) Add(key string, obj interface{}) {
|
|
|
|
@ -79,6 +80,7 @@ func (c *threadSafeMap) Update(key string, obj interface{}) {
|
|
|
|
|
defer c.lock.Unlock()
|
|
|
|
|
oldObject := c.items[key]
|
|
|
|
|
c.items[key] = obj
|
|
|
|
|
// 更新数据后,造成了索引的失效,所以需要更新
|
|
|
|
|
c.updateIndices(oldObject, obj, key)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -86,7 +88,7 @@ func (c *threadSafeMap) Delete(key string) {
|
|
|
|
|
c.lock.Lock()
|
|
|
|
|
defer c.lock.Unlock()
|
|
|
|
|
if obj, exists := c.items[key]; exists {
|
|
|
|
|
c.updateIndices(obj, nil, key)
|
|
|
|
|
c.updateIndices(obj, nil, key) // 删除造成索引变更,所以要更新索引
|
|
|
|
|
delete(c.items, key)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -120,11 +122,13 @@ func (c *threadSafeMap) ListKeys() []string {
|
|
|
|
|
return list
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 用新的数据,整个替换掉老的数据
|
|
|
|
|
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
|
|
|
|
|
c.lock.Lock()
|
|
|
|
|
defer c.lock.Unlock()
|
|
|
|
|
c.items = items
|
|
|
|
|
|
|
|
|
|
// 重建索引
|
|
|
|
|
// rebuild any index
|
|
|
|
|
c.indices = Indices{}
|
|
|
|
|
for key, item := range c.items {
|
|
|
|
@ -132,12 +136,16 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 通过索引来查询数据
|
|
|
|
|
// Index returns a list of items that match the given object on the index function.
|
|
|
|
|
// Index is thread-safe so long as you treat all items as immutable.
|
|
|
|
|
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
|
|
|
|
|
c.lock.RLock()
|
|
|
|
|
defer c.lock.RUnlock()
|
|
|
|
|
|
|
|
|
|
// key 指的是找到“索引”的key
|
|
|
|
|
// “索引”是存储 key 到 items中key的映射
|
|
|
|
|
// 通过索引名称,获取索引的值(就是key)
|
|
|
|
|
indexFunc := c.indexers[indexName]
|
|
|
|
|
if indexFunc == nil {
|
|
|
|
|
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
|
|
|
|
@ -147,8 +155,11 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{},
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 通过索引名称,获取索引(key-》对象的key)
|
|
|
|
|
index := c.indices[indexName]
|
|
|
|
|
|
|
|
|
|
// 取得要查询的对象在 items 中的 key值的集合
|
|
|
|
|
var storeKeySet sets.String
|
|
|
|
|
if len(indexedValues) == 1 {
|
|
|
|
|
// In majority of cases, there is exactly one value matching.
|
|
|
|
@ -165,6 +176,7 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 用过 key 的集合,取得数据
|
|
|
|
|
list := make([]interface{}, 0, storeKeySet.Len())
|
|
|
|
|
for storeKey := range storeKeySet {
|
|
|
|
|
list = append(list, c.items[storeKey])
|
|
|
|
@ -172,6 +184,7 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{},
|
|
|
|
|
return list, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 和Index作用相同,只是查询的参数不是 object,而是使用索引的值,这个值能在indeices中获取正确的索引对象
|
|
|
|
|
// ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
|
|
|
|
|
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
|
|
|
|
|
c.lock.RLock()
|
|
|
|
@ -193,6 +206,7 @@ func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{},
|
|
|
|
|
return list, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 仅仅返索引中的key,并没有去items中查询真正的数据
|
|
|
|
|
// IndexKeys returns a list of the Store keys of the objects whose indexed values in the given index include the given indexed value.
|
|
|
|
|
// IndexKeys is thread-safe so long as you treat all items as immutable.
|
|
|
|
|
func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) {
|
|
|
|
@ -247,6 +261,10 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 更新方法包含:
|
|
|
|
|
// 新建: 只有 newObj 值,oldObj 值为 nil
|
|
|
|
|
// 更新: newObj,oldObj 都有值
|
|
|
|
|
// 删除: 只有 oldObj,newObj 值为 nil
|
|
|
|
|
// updateIndices modifies the objects location in the managed indexes:
|
|
|
|
|
// - for create you must provide only the newObj
|
|
|
|
|
// - for update you must provide both the oldObj and the newObj
|
|
|
|
@ -259,7 +277,7 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke
|
|
|
|
|
if oldObj != nil {
|
|
|
|
|
oldIndexValues, err = indexFunc(oldObj)
|
|
|
|
|
} else {
|
|
|
|
|
oldIndexValues = oldIndexValues[:0]
|
|
|
|
|
oldIndexValues = oldIndexValues[:0] // 相当于清空
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
|
|
|
|
@ -282,6 +300,7 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke
|
|
|
|
|
|
|
|
|
|
if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
|
|
|
|
|
// We optimize for the most common case where indexFunc returns a single value which has not been changed
|
|
|
|
|
// 新老对象完全相同,就不操作
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|