From 351bb78ee5a22cb0e68818a310d77d7240edc0b1 Mon Sep 17 00:00:00 2001 From: Artem Vdovin Date: Thu, 12 Jun 2025 23:33:53 +0500 Subject: [PATCH 1/4] fix index concurrency Signed-off-by: Artem Vdovin --- pkg/repo/chartrepo.go | 36 ++++++++++++++++++++++++++++++++++-- pkg/repo/index.go | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/pkg/repo/chartrepo.go b/pkg/repo/chartrepo.go index e41226fa4..bbcab2ae8 100644 --- a/pkg/repo/chartrepo.go +++ b/pkg/repo/chartrepo.go @@ -17,6 +17,7 @@ limitations under the License. package repo // import "helm.sh/helm/v4/pkg/repo" import ( + "context" "crypto/rand" "encoding/base64" "encoding/json" @@ -27,6 +28,9 @@ import ( "os" "path/filepath" "strings" + "time" + + "github.com/gofrs/flock" "helm.sh/helm/v4/pkg/getter" "helm.sh/helm/v4/pkg/helmpath" @@ -106,13 +110,41 @@ func (r *ChartRepository) DownloadIndexFile() (string, error) { for name := range indexFile.Entries { fmt.Fprintln(&charts, name) } + chartsFile := filepath.Join(r.CachePath, helmpath.CacheChartsFile(r.Config.Name)) os.MkdirAll(filepath.Dir(chartsFile), 0755) - os.WriteFile(chartsFile, []byte(charts.String()), 0644) - // Create the index file in the cache directory fname := filepath.Join(r.CachePath, helmpath.CacheIndexFile(r.Config.Name)) os.MkdirAll(filepath.Dir(fname), 0755) + + // context for lock files + lockCtx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + // lock the charts file + chLock := flock.New(filepath.Join(r.CachePath, helmpath.CacheChartsFile(r.Config.Name)+".lock")) + chLocked, err := chLock.TryLockContext(lockCtx, 500*time.Millisecond) + if err == nil && chLocked { + defer chLock.Unlock() + } + if err != nil { + return "", err + } + + // write charts files after lock + os.WriteFile(chartsFile, []byte(charts.String()), 0644) + + // lock the index file + idxLock := flock.New(filepath.Join(r.CachePath, helmpath.CacheIndexFile(r.Config.Name)+".lock")) + idxLocked, err := idxLock.TryLockContext(lockCtx, 500*time.Millisecond) + if err == nil && idxLocked { + defer idxLock.Unlock() + } + if err != nil { + return "", err + } + + // Create the index file in the cache directory return fname, os.WriteFile(fname, index, 0644) } diff --git a/pkg/repo/index.go b/pkg/repo/index.go index c26d7581c..b93816d71 100644 --- a/pkg/repo/index.go +++ b/pkg/repo/index.go @@ -18,6 +18,7 @@ package repo import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -30,6 +31,7 @@ import ( "time" "github.com/Masterminds/semver/v3" + "github.com/gofrs/flock" "sigs.k8s.io/yaml" "helm.sh/helm/v4/internal/fileutil" @@ -103,6 +105,19 @@ func NewIndexFile() *IndexFile { // LoadIndexFile takes a file at the given path and returns an IndexFile object func LoadIndexFile(path string) (*IndexFile, error) { + lockCtx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + idxLock := flock.New(filepath.Join(path + ".lock")) + + idxLocked, err := idxLock.TryRLockContext(lockCtx, 500*time.Millisecond) + if err == nil && idxLocked { + defer idxLock.Unlock() + } + if err != nil { + return nil, err + } + b, err := os.ReadFile(path) if err != nil { return nil, err @@ -225,6 +240,19 @@ func (i IndexFile) Get(name, version string) (*ChartVersion, error) { // // The mode on the file is set to 'mode'. func (i IndexFile) WriteFile(dest string, mode os.FileMode) error { + lockCtx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + idxLock := flock.New(dest + ".lock") + idxLocked, err := idxLock.TryLockContext(lockCtx, 500*time.Millisecond) + + if err == nil && idxLocked { + defer idxLock.Unlock() + } + if err != nil { + return err + } + b, err := yaml.Marshal(i) if err != nil { return err @@ -237,6 +265,19 @@ func (i IndexFile) WriteFile(dest string, mode os.FileMode) error { // // The mode on the file is set to 'mode'. func (i IndexFile) WriteJSONFile(dest string, mode os.FileMode) error { + lockCtx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + idxLock := flock.New(dest + ".lock") + idxLocked, err := idxLock.TryLockContext(lockCtx, 500*time.Millisecond) + + if err == nil && idxLocked { + defer idxLock.Unlock() + } + if err != nil { + return err + } + b, err := json.MarshalIndent(i, "", " ") if err != nil { return err From 314bd19d1159834699d5ee58fe93d8df2d416e0c Mon Sep 17 00:00:00 2001 From: Artem Vdovin Date: Sun, 13 Jul 2025 22:48:39 +0500 Subject: [PATCH 2/4] update writing index files to writeAtomicFile Signed-off-by: Artem Vdovin --- pkg/repo/chartrepo.go | 41 ++++++----------------------------------- pkg/repo/index.go | 41 ----------------------------------------- 2 files changed, 6 insertions(+), 76 deletions(-) diff --git a/pkg/repo/chartrepo.go b/pkg/repo/chartrepo.go index bbcab2ae8..ed80a71c8 100644 --- a/pkg/repo/chartrepo.go +++ b/pkg/repo/chartrepo.go @@ -17,7 +17,7 @@ limitations under the License. package repo // import "helm.sh/helm/v4/pkg/repo" import ( - "context" + "bytes" "crypto/rand" "encoding/base64" "encoding/json" @@ -28,10 +28,8 @@ import ( "os" "path/filepath" "strings" - "time" - - "github.com/gofrs/flock" + "helm.sh/helm/v4/internal/fileutil" "helm.sh/helm/v4/pkg/getter" "helm.sh/helm/v4/pkg/helmpath" ) @@ -110,42 +108,15 @@ func (r *ChartRepository) DownloadIndexFile() (string, error) { for name := range indexFile.Entries { fmt.Fprintln(&charts, name) } - chartsFile := filepath.Join(r.CachePath, helmpath.CacheChartsFile(r.Config.Name)) os.MkdirAll(filepath.Dir(chartsFile), 0755) - fname := filepath.Join(r.CachePath, helmpath.CacheIndexFile(r.Config.Name)) - os.MkdirAll(filepath.Dir(fname), 0755) - - // context for lock files - lockCtx, cancel := context.WithTimeout(context.Background(), 90*time.Second) - defer cancel() - - // lock the charts file - chLock := flock.New(filepath.Join(r.CachePath, helmpath.CacheChartsFile(r.Config.Name)+".lock")) - chLocked, err := chLock.TryLockContext(lockCtx, 500*time.Millisecond) - if err == nil && chLocked { - defer chLock.Unlock() - } - if err != nil { - return "", err - } - - // write charts files after lock - os.WriteFile(chartsFile, []byte(charts.String()), 0644) - - // lock the index file - idxLock := flock.New(filepath.Join(r.CachePath, helmpath.CacheIndexFile(r.Config.Name)+".lock")) - idxLocked, err := idxLock.TryLockContext(lockCtx, 500*time.Millisecond) - if err == nil && idxLocked { - defer idxLock.Unlock() - } - if err != nil { - return "", err - } + fileutil.AtomicWriteFile(chartsFile, bytes.NewReader([]byte(charts.String())), 0644) // Create the index file in the cache directory - return fname, os.WriteFile(fname, index, 0644) + fname := filepath.Join(r.CachePath, helmpath.CacheIndexFile(r.Config.Name)) + os.MkdirAll(filepath.Dir(fname), 0755) + return fname, fileutil.AtomicWriteFile(fname, bytes.NewReader(index), 0644) } type findChartInRepoURLOptions struct { diff --git a/pkg/repo/index.go b/pkg/repo/index.go index b93816d71..c26d7581c 100644 --- a/pkg/repo/index.go +++ b/pkg/repo/index.go @@ -18,7 +18,6 @@ package repo import ( "bytes" - "context" "encoding/json" "errors" "fmt" @@ -31,7 +30,6 @@ import ( "time" "github.com/Masterminds/semver/v3" - "github.com/gofrs/flock" "sigs.k8s.io/yaml" "helm.sh/helm/v4/internal/fileutil" @@ -105,19 +103,6 @@ func NewIndexFile() *IndexFile { // LoadIndexFile takes a file at the given path and returns an IndexFile object func LoadIndexFile(path string) (*IndexFile, error) { - lockCtx, cancel := context.WithTimeout(context.Background(), 90*time.Second) - defer cancel() - - idxLock := flock.New(filepath.Join(path + ".lock")) - - idxLocked, err := idxLock.TryRLockContext(lockCtx, 500*time.Millisecond) - if err == nil && idxLocked { - defer idxLock.Unlock() - } - if err != nil { - return nil, err - } - b, err := os.ReadFile(path) if err != nil { return nil, err @@ -240,19 +225,6 @@ func (i IndexFile) Get(name, version string) (*ChartVersion, error) { // // The mode on the file is set to 'mode'. func (i IndexFile) WriteFile(dest string, mode os.FileMode) error { - lockCtx, cancel := context.WithTimeout(context.Background(), 90*time.Second) - defer cancel() - - idxLock := flock.New(dest + ".lock") - idxLocked, err := idxLock.TryLockContext(lockCtx, 500*time.Millisecond) - - if err == nil && idxLocked { - defer idxLock.Unlock() - } - if err != nil { - return err - } - b, err := yaml.Marshal(i) if err != nil { return err @@ -265,19 +237,6 @@ func (i IndexFile) WriteFile(dest string, mode os.FileMode) error { // // The mode on the file is set to 'mode'. func (i IndexFile) WriteJSONFile(dest string, mode os.FileMode) error { - lockCtx, cancel := context.WithTimeout(context.Background(), 90*time.Second) - defer cancel() - - idxLock := flock.New(dest + ".lock") - idxLocked, err := idxLock.TryLockContext(lockCtx, 500*time.Millisecond) - - if err == nil && idxLocked { - defer idxLock.Unlock() - } - if err != nil { - return err - } - b, err := json.MarshalIndent(i, "", " ") if err != nil { return err From 118d0eb697e52b90ad48e4e09529614be7ada4bc Mon Sep 17 00:00:00 2001 From: Artem Vdovin Date: Sun, 20 Jul 2025 11:42:20 +0500 Subject: [PATCH 3/4] add concurrency test on write & load index file Signed-off-by: Artem Vdovin --- pkg/repo/chartrepo_test.go | 57 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/pkg/repo/chartrepo_test.go b/pkg/repo/chartrepo_test.go index 05e034dd8..ecb8a592e 100644 --- a/pkg/repo/chartrepo_test.go +++ b/pkg/repo/chartrepo_test.go @@ -22,8 +22,10 @@ import ( "net/http" "net/http/httptest" "os" + "path/filepath" "runtime" "strings" + "sync" "testing" "time" @@ -31,6 +33,7 @@ import ( "helm.sh/helm/v4/pkg/cli" "helm.sh/helm/v4/pkg/getter" + "helm.sh/helm/v4/pkg/helmpath" ) type CustomGetter struct { @@ -91,6 +94,60 @@ func TestIndexCustomSchemeDownload(t *testing.T) { } } +func TestConcurrenyDownloadIndex(t *testing.T) { + srv, err := startLocalServerForTests(nil) + if err != nil { + t.Fatal(err) + } + defer srv.Close() + + repo, err := NewChartRepository(&Entry{ + Name: "nginx", + URL: srv.URL, + }, getter.All(&cli.EnvSettings{})) + + if err != nil { + t.Fatalf("Problem loading chart repository from %s: %v", srv.URL, err) + } + repo.CachePath = t.TempDir() + + // initial download index + idx, err := repo.DownloadIndexFile() + if err != nil { + t.Fatalf("Failed to download index file to %s: %v", idx, err) + } + + indexFName := filepath.Join(repo.CachePath, helmpath.CacheIndexFile(repo.Config.Name)) + + var wg sync.WaitGroup + + // Simultaneously start multiple goroutines that: + // 1) download index.yaml via DownloadIndexFile (write operation), + // 2) read index.yaml via LoadIndexFile (read operation). + // This checks for race conditions and ensures correct behavior under concurrent read/write access. + for range 150 { + wg.Add(1) + + go func() { + defer wg.Done() + idx, err := repo.DownloadIndexFile() + if err != nil { + t.Fatalf("Failed to download index file to %s: %v", idx, err) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + _, err := LoadIndexFile(indexFName) + if err != nil { + t.Fatalf("Failed to load index file: %v", err) + } + }() + } + wg.Wait() +} + // startLocalServerForTests Start the local helm server func startLocalServerForTests(handler http.Handler) (*httptest.Server, error) { if handler == nil { From ae4af69b9dbd5ad01dc9621a90e5b08327a3499e Mon Sep 17 00:00:00 2001 From: Artem Vdovin Date: Sat, 13 Sep 2025 16:23:23 +0500 Subject: [PATCH 4/4] fix test Signed-off-by: Artem Vdovin --- pkg/repo/v1/chartrepo_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/repo/v1/chartrepo_test.go b/pkg/repo/v1/chartrepo_test.go index ecb8a592e..e82d72279 100644 --- a/pkg/repo/v1/chartrepo_test.go +++ b/pkg/repo/v1/chartrepo_test.go @@ -132,7 +132,7 @@ func TestConcurrenyDownloadIndex(t *testing.T) { defer wg.Done() idx, err := repo.DownloadIndexFile() if err != nil { - t.Fatalf("Failed to download index file to %s: %v", idx, err) + t.Errorf("Failed to download index file to %s: %v", idx, err) } }() @@ -141,7 +141,7 @@ func TestConcurrenyDownloadIndex(t *testing.T) { defer wg.Done() _, err := LoadIndexFile(indexFName) if err != nil { - t.Fatalf("Failed to load index file: %v", err) + t.Errorf("Failed to load index file: %v", err) } }() }