From ed9934adfaa8d6af770058004cb14e3682fd52a8 Mon Sep 17 00:00:00 2001 From: Patrick Decat Date: Tue, 7 May 2019 01:24:17 +0200 Subject: [PATCH 1/7] Add test cases to reproduce issues with concurrent `helm repo add` commands Signed-off-by: Patrick Decat --- cmd/helm/repo_add_test.go | 113 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/cmd/helm/repo_add_test.go b/cmd/helm/repo_add_test.go index 5a458cef7..0b7650fb2 100644 --- a/cmd/helm/repo_add_test.go +++ b/cmd/helm/repo_add_test.go @@ -17,13 +17,18 @@ limitations under the License. package main import ( + "fmt" "io" "os" + "os/exec" + "strings" + "sync" "testing" "github.com/spf13/cobra" "k8s.io/helm/pkg/helm" + "k8s.io/helm/pkg/helm/helmpath" "k8s.io/helm/pkg/repo" "k8s.io/helm/pkg/repo/repotest" ) @@ -101,3 +106,111 @@ func TestRepoAdd(t *testing.T) { t.Errorf("Duplicate repository name was added") } } +func TestRepoAddConcurrentGoRoutines(t *testing.T) { + ts, thome, err := repotest.NewTempServer("testdata/testserver/*.*") + if err != nil { + t.Fatal(err) + } + + cleanup := resetEnv() + defer func() { + ts.Stop() + os.RemoveAll(thome.String()) + cleanup() + }() + + settings.Home = thome + if err := ensureTestHome(settings.Home, t); err != nil { + t.Fatal(err) + } + + var wg sync.WaitGroup + wg.Add(3) + for i := 0; i < 3; i++ { + go func(name string) { + defer wg.Done() + if err := addRepository(name, ts.URL(), "", "", settings.Home, "", "", "", true); err != nil { + t.Error(err) + } + }(fmt.Sprintf("%s-%d", testName, i)) + } + wg.Wait() + + f, err := repo.LoadRepositoriesFile(settings.Home.RepositoryFile()) + if err != nil { + t.Error(err) + } + + var name string + for i := 0; i < 3; i++ { + name = fmt.Sprintf("%s-%d", testName, i) + if !f.Has(name) { + t.Errorf("%s was not successfully inserted into %s", name, settings.Home.RepositoryFile()) + } + } +} + +// Same as TestRepoAddConcurrentGoRoutines but with repository additions in sub-processes +func TestRepoAddConcurrentSubProcesses(t *testing.T) { + goWantHelperProcess := os.Getenv("GO_WANT_HELPER_PROCESS") + if goWantHelperProcess == "" { + // parent + + ts, thome, err := repotest.NewTempServer("testdata/testserver/*.*") + if err != nil { + t.Fatal(err) + } + + settings.Home = thome + + cleanup := resetEnv() + defer func() { + ts.Stop() + os.RemoveAll(thome.String()) + cleanup() + }() + if err := ensureTestHome(settings.Home, t); err != nil { + t.Fatal(err) + } + + var wg sync.WaitGroup + wg.Add(3) + for i := 0; i < 3; i++ { + go func(name string) { + defer wg.Done() + + cmd := exec.Command(os.Args[0], "-test.run=^TestRepoAddConcurrentSubProcesses$") + cmd.Env = append(os.Environ(), fmt.Sprintf("GO_WANT_HELPER_PROCESS=%s,%s", name, ts.URL()), fmt.Sprintf("HELM_HOME=%s", settings.Home)) + out, err := cmd.CombinedOutput() + if len(out) > 0 || err != nil { + t.Fatalf("child process: %q, %v", out, err) + } + }(fmt.Sprintf("%s-%d", testName, i)) + } + wg.Wait() + + f, err := repo.LoadRepositoriesFile(settings.Home.RepositoryFile()) + if err != nil { + t.Error(err) + } + + var name string + for i := 0; i < 3; i++ { + name = fmt.Sprintf("%s-%d", testName, i) + if !f.Has(name) { + t.Errorf("%s was not successfully inserted into %s", name, settings.Home.RepositoryFile()) + } + } + } else { + // child + s := strings.Split(goWantHelperProcess, ",") + settings.Home = helmpath.Home(os.Getenv("HELM_HOME")) + repoName := s[0] + tsURL := s[1] + if err := addRepository(repoName, tsURL, "", "", settings.Home, "", "", "", true); err != nil { + t.Fatal(err) + } + + os.Exit(0) + } +} From e07dfcbc00f910bcf0ee0357b1af8c3ab7cd39c9 Mon Sep 17 00:00:00 2001 From: Patrick Decat Date: Tue, 7 May 2019 11:32:23 +0200 Subject: [PATCH 2/7] Lock the repository file for concurrent processes synchronization and re-read its content before updating it Signed-off-by: Patrick Decat --- cmd/helm/repo_add.go | 27 ++++++++++++++++++++++++++- cmd/helm/repo_add_test.go | 1 + 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/cmd/helm/repo_add.go b/cmd/helm/repo_add.go index bfb3f0174..b93222f4a 100644 --- a/cmd/helm/repo_add.go +++ b/cmd/helm/repo_add.go @@ -22,11 +22,12 @@ import ( "github.com/spf13/cobra" + "syscall" + "golang.org/x/crypto/ssh/terminal" "k8s.io/helm/pkg/getter" "k8s.io/helm/pkg/helm/helmpath" "k8s.io/helm/pkg/repo" - "syscall" ) type repoAddCmd struct { @@ -131,6 +132,30 @@ func addRepository(name, url, username, password string, home helmpath.Home, cer return fmt.Errorf("Looks like %q is not a valid chart repository or cannot be reached: %s", url, err.Error()) } + // Lock the repository file for concurrent processes synchronization and re-read its content before updating it + fd, err := syscall.Open(home.RepositoryFile(), syscall.O_CREAT|syscall.O_RDWR|syscall.O_CLOEXEC, 0) + if err != nil { + return err + } + defer syscall.Close(fd) + + flock := syscall.Flock_t{ + Type: syscall.F_WRLCK, + Start: 0, + Len: 0, + Whence: io.SeekStart, + } + + syscall.FcntlFlock(uintptr(fd), syscall.F_SETLK, &flock) + if err != nil { + return err + } + + f, err = repo.LoadRepositoriesFile(home.RepositoryFile()) + if err != nil { + return err + } + f.Update(&c) return f.WriteFile(home.RepositoryFile(), 0644) diff --git a/cmd/helm/repo_add_test.go b/cmd/helm/repo_add_test.go index 0b7650fb2..e8ad35086 100644 --- a/cmd/helm/repo_add_test.go +++ b/cmd/helm/repo_add_test.go @@ -128,6 +128,7 @@ func TestRepoAddConcurrentGoRoutines(t *testing.T) { wg.Add(3) for i := 0; i < 3; i++ { go func(name string) { + // TODO: launch repository additions in sub-processes as file locks are bound to processes, not file descriptors defer wg.Done() if err := addRepository(name, ts.URL(), "", "", settings.Home, "", "", "", true); err != nil { t.Error(err) From 7a470252a934020ca9720ba35dcc901a2a6359c6 Mon Sep 17 00:00:00 2001 From: Patrick Decat Date: Tue, 7 May 2019 12:19:05 +0200 Subject: [PATCH 3/7] Use github.com/gofrs/flock for cross platform file locking Signed-off-by: Patrick Decat --- cmd/helm/repo_add.go | 28 ++++++++++++---------------- glide.lock | 2 ++ glide.yaml | 2 ++ 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/cmd/helm/repo_add.go b/cmd/helm/repo_add.go index b93222f4a..ef9ddf223 100644 --- a/cmd/helm/repo_add.go +++ b/cmd/helm/repo_add.go @@ -17,14 +17,17 @@ limitations under the License. package main import ( + "context" "fmt" "io" - - "github.com/spf13/cobra" - "syscall" + "time" "golang.org/x/crypto/ssh/terminal" + + "github.com/gofrs/flock" + "github.com/spf13/cobra" + "k8s.io/helm/pkg/getter" "k8s.io/helm/pkg/helm/helmpath" "k8s.io/helm/pkg/repo" @@ -133,20 +136,13 @@ func addRepository(name, url, username, password string, home helmpath.Home, cer } // Lock the repository file for concurrent processes synchronization and re-read its content before updating it - fd, err := syscall.Open(home.RepositoryFile(), syscall.O_CREAT|syscall.O_RDWR|syscall.O_CLOEXEC, 0) - if err != nil { - return err + fileLock := flock.New(home.RepositoryFile()) + lockCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + locked, err := fileLock.TryLockContext(lockCtx, time.Second) + if err == nil && locked { + defer fileLock.Unlock() } - defer syscall.Close(fd) - - flock := syscall.Flock_t{ - Type: syscall.F_WRLCK, - Start: 0, - Len: 0, - Whence: io.SeekStart, - } - - syscall.FcntlFlock(uintptr(fd), syscall.F_SETLK, &flock) if err != nil { return err } diff --git a/glide.lock b/glide.lock index 4f031a502..f05bd18f1 100644 --- a/glide.lock +++ b/glide.lock @@ -114,6 +114,8 @@ imports: - syntax/lexer - util/runes - util/strings +- name: github.com/gofrs/flock + version: 392e7fae8f1b0bdbd67dad7237d23f618feb6dbb - name: github.com/gogo/protobuf version: 342cbe0a04158f6dcb03ca0079991a51a4248c02 subpackages: diff --git a/glide.yaml b/glide.yaml index c9ac54b98..fd6613b71 100644 --- a/glide.yaml +++ b/glide.yaml @@ -67,6 +67,8 @@ import: - package: github.com/jmoiron/sqlx version: ^1.2.0 - package: github.com/rubenv/sql-migrate + - package: github.com/gofrs/flock + version: v0.7.1 testImports: - package: github.com/stretchr/testify From 848d54f451fba67596adc138e1b57daca4744e7d Mon Sep 17 00:00:00 2001 From: Patrick Decat Date: Tue, 7 May 2019 17:14:42 +0200 Subject: [PATCH 4/7] Remove obsolete comment from early file locking implementation Signed-off-by: Patrick Decat --- cmd/helm/repo_add_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/helm/repo_add_test.go b/cmd/helm/repo_add_test.go index e8ad35086..0b7650fb2 100644 --- a/cmd/helm/repo_add_test.go +++ b/cmd/helm/repo_add_test.go @@ -128,7 +128,6 @@ func TestRepoAddConcurrentGoRoutines(t *testing.T) { wg.Add(3) for i := 0; i < 3; i++ { go func(name string) { - // TODO: launch repository additions in sub-processes as file locks are bound to processes, not file descriptors defer wg.Done() if err := addRepository(name, ts.URL(), "", "", settings.Home, "", "", "", true); err != nil { t.Error(err) From 3f3b5b42f80538f2814e6a00016d0a968c76c860 Mon Sep 17 00:00:00 2001 From: Patrick Decat Date: Tue, 7 May 2019 18:01:44 +0200 Subject: [PATCH 5/7] Lower repository file rewriting timeout from 5 minutes to 30 seconds Signed-off-by: Patrick Decat --- cmd/helm/repo_add.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/helm/repo_add.go b/cmd/helm/repo_add.go index ef9ddf223..0257367d6 100644 --- a/cmd/helm/repo_add.go +++ b/cmd/helm/repo_add.go @@ -137,7 +137,7 @@ func addRepository(name, url, username, password string, home helmpath.Home, cer // Lock the repository file for concurrent processes synchronization and re-read its content before updating it fileLock := flock.New(home.RepositoryFile()) - lockCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + lockCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() locked, err := fileLock.TryLockContext(lockCtx, time.Second) if err == nil && locked { From 6b800509d0a4166c83192789ea4b679fba76387a Mon Sep 17 00:00:00 2001 From: Patrick Decat Date: Tue, 7 May 2019 18:05:18 +0200 Subject: [PATCH 6/7] Add comment to explain why the repositories file is read a second time Signed-off-by: Patrick Decat --- cmd/helm/repo_add.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/helm/repo_add.go b/cmd/helm/repo_add.go index 0257367d6..ebb27e684 100644 --- a/cmd/helm/repo_add.go +++ b/cmd/helm/repo_add.go @@ -135,7 +135,7 @@ func addRepository(name, url, username, password string, home helmpath.Home, cer return fmt.Errorf("Looks like %q is not a valid chart repository or cannot be reached: %s", url, err.Error()) } - // Lock the repository file for concurrent processes synchronization and re-read its content before updating it + // Lock the repository file for concurrent goroutines or processes synchronization fileLock := flock.New(home.RepositoryFile()) lockCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -147,6 +147,8 @@ func addRepository(name, url, username, password string, home helmpath.Home, cer return err } + // Re-read the repositories file before updating it as its content may have been changed + // by a concurrent execution after the first read and before being locked f, err = repo.LoadRepositoriesFile(home.RepositoryFile()) if err != nil { return err From 79a190a6d0a0014a24213a84e281efef6c786705 Mon Sep 17 00:00:00 2001 From: Patrick Decat Date: Thu, 9 May 2019 09:35:54 +0200 Subject: [PATCH 7/7] Decrease number of concurrent subprocesses for helm repo add test case to fit within CircleCI's pipeline memory limit Signed-off-by: Patrick Decat --- cmd/helm/repo_add_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/helm/repo_add_test.go b/cmd/helm/repo_add_test.go index 0b7650fb2..7443a476a 100644 --- a/cmd/helm/repo_add_test.go +++ b/cmd/helm/repo_add_test.go @@ -174,8 +174,8 @@ func TestRepoAddConcurrentSubProcesses(t *testing.T) { } var wg sync.WaitGroup - wg.Add(3) - for i := 0; i < 3; i++ { + wg.Add(2) + for i := 0; i < 2; i++ { go func(name string) { defer wg.Done() @@ -195,7 +195,7 @@ func TestRepoAddConcurrentSubProcesses(t *testing.T) { } var name string - for i := 0; i < 3; i++ { + for i := 0; i < 2; i++ { name = fmt.Sprintf("%s-%d", testName, i) if !f.Has(name) { t.Errorf("%s was not successfully inserted into %s", name, settings.Home.RepositoryFile())