fix(tiller): remove locking system from storage and rely on backend controls

Tiller currently hangs indefinitely when deadlocks arise from certain
concurrent operations. This commit removes the nested mutex locking
system from pkg/Storage and relies on resource contention controls in k8s.

Closes #2560
pull/2701/head
Justin Scott 8 years ago
parent 3cf8f2c8b3
commit fa68a6e1db

@ -173,7 +173,7 @@ func (cfgmaps *ConfigMaps) Create(key string, rls *rspb.Release) error {
// push the configmap object out into the kubiverse // push the configmap object out into the kubiverse
if _, err := cfgmaps.impl.Create(obj); err != nil { if _, err := cfgmaps.impl.Create(obj); err != nil {
if apierrors.IsAlreadyExists(err) { if apierrors.IsAlreadyExists(err) {
return ErrReleaseExists(rls.Name) return ErrReleaseExists(key)
} }
cfgmaps.Log("create: failed to create: %s", err) cfgmaps.Log("create: failed to create: %s", err)

@ -18,7 +18,6 @@ package storage // import "k8s.io/helm/pkg/storage"
import ( import (
"fmt" "fmt"
"sync"
rspb "k8s.io/helm/pkg/proto/hapi/release" rspb "k8s.io/helm/pkg/proto/hapi/release"
relutil "k8s.io/helm/pkg/releaseutil" relutil "k8s.io/helm/pkg/releaseutil"
@ -28,12 +27,6 @@ import (
// Storage represents a storage engine for a Release. // Storage represents a storage engine for a Release.
type Storage struct { type Storage struct {
driver.Driver driver.Driver
// releaseLocks are for locking releases to make sure that only one operation at a time is executed on each release
releaseLocks map[string]*sync.Mutex
// releaseLocksLock is a mutex for accessing releaseLocks
releaseLocksLock *sync.Mutex
Log func(string, ...interface{}) Log func(string, ...interface{})
} }
@ -157,53 +150,6 @@ func (s *Storage) Last(name string) (*rspb.Release, error) {
return h[0], nil return h[0], nil
} }
// LockRelease gains a mutually exclusive access to a release via a mutex.
func (s *Storage) LockRelease(name string) error {
s.Log("locking release %s", name)
s.releaseLocksLock.Lock()
defer s.releaseLocksLock.Unlock()
var lock *sync.Mutex
lock, exists := s.releaseLocks[name]
if !exists {
releases, err := s.ListReleases()
if err != nil {
return err
}
found := false
for _, release := range releases {
if release.Name == name {
found = true
}
}
if !found {
return fmt.Errorf("Unable to lock release %q: release not found", name)
}
lock = &sync.Mutex{}
s.releaseLocks[name] = lock
}
lock.Lock()
return nil
}
// UnlockRelease releases a mutually exclusive access to a release.
// If release doesn't exist or wasn't previously locked - the unlock will pass
func (s *Storage) UnlockRelease(name string) {
s.Log("unlocking release %s", name)
s.releaseLocksLock.Lock()
defer s.releaseLocksLock.Unlock()
var lock *sync.Mutex
lock, exists := s.releaseLocks[name]
if !exists {
return
}
lock.Unlock()
}
// makeKey concatenates a release name and version into // makeKey concatenates a release name and version into
// a string with format ```<release_name>#v<version>```. // a string with format ```<release_name>#v<version>```.
// This key is used to uniquely identify storage objects. // This key is used to uniquely identify storage objects.
@ -220,8 +166,6 @@ func Init(d driver.Driver) *Storage {
} }
return &Storage{ return &Storage{
Driver: d, Driver: d,
releaseLocks: make(map[string]*sync.Mutex),
releaseLocksLock: &sync.Mutex{},
Log: func(_ string, _ ...interface{}) {}, Log: func(_ string, _ ...interface{}) {},
} }
} }

@ -272,31 +272,3 @@ func assertErrNil(eh func(args ...interface{}), err error, message string) {
eh(fmt.Sprintf("%s: %q", message, err)) eh(fmt.Sprintf("%s: %q", message, err))
} }
} }
func TestReleaseLocksNotExist(t *testing.T) {
s := Init(driver.NewMemory())
err := s.LockRelease("no-such-release")
if err == nil {
t.Errorf("Exptected error when trying to lock non-existing release, got nil")
}
}
func TestReleaseLocks(t *testing.T) {
s := Init(driver.NewMemory())
releaseName := "angry-beaver"
rls := ReleaseTestData{
Name: releaseName,
Version: 1,
}.ToRelease()
s.Create(rls)
err := s.LockRelease(releaseName)
if err != nil {
t.Errorf("Exptected nil err when locking existing release")
}
s.UnlockRelease(releaseName)
}

@ -29,12 +29,6 @@ import (
// RollbackRelease rolls back to a previous version of the given release. // RollbackRelease rolls back to a previous version of the given release.
func (s *ReleaseServer) RollbackRelease(c ctx.Context, req *services.RollbackReleaseRequest) (*services.RollbackReleaseResponse, error) { func (s *ReleaseServer) RollbackRelease(c ctx.Context, req *services.RollbackReleaseRequest) (*services.RollbackReleaseResponse, error) {
err := s.env.Releases.LockRelease(req.Name)
if err != nil {
return nil, err
}
defer s.env.Releases.UnlockRelease(req.Name)
s.Log("preparing rollback of %s", req.Name) s.Log("preparing rollback of %s", req.Name)
currentRelease, targetRelease, err := s.prepareRollback(req) currentRelease, targetRelease, err := s.prepareRollback(req)
if err != nil { if err != nil {

@ -36,12 +36,6 @@ func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallR
return nil, err return nil, err
} }
err := s.env.Releases.LockRelease(req.Name)
if err != nil {
return nil, err
}
defer s.env.Releases.UnlockRelease(req.Name)
rels, err := s.env.Releases.History(req.Name) rels, err := s.env.Releases.History(req.Name)
if err != nil { if err != nil {
s.Log("uninstall: Release not loaded: %s", req.Name) s.Log("uninstall: Release not loaded: %s", req.Name)

@ -34,13 +34,6 @@ func (s *ReleaseServer) UpdateRelease(c ctx.Context, req *services.UpdateRelease
s.Log("updateRelease: Release name is invalid: %s", req.Name) s.Log("updateRelease: Release name is invalid: %s", req.Name)
return nil, err return nil, err
} }
err := s.env.Releases.LockRelease(req.Name)
if err != nil {
return nil, err
}
defer s.env.Releases.UnlockRelease(req.Name)
s.Log("preparing update for %s", req.Name) s.Log("preparing update for %s", req.Name)
currentRelease, updatedRelease, err := s.prepareUpdate(req) currentRelease, updatedRelease, err := s.prepareUpdate(req)
if err != nil { if err != nil {

Loading…
Cancel
Save