From b852af54fc47b79542a78411525b2c268b01e7ac Mon Sep 17 00:00:00 2001 From: Terry Howe Date: Tue, 14 Oct 2025 07:37:14 -0600 Subject: [PATCH 1/3] fix: duplicate upload tag Signed-off-by: Terry Howe --- pkg/registry/client.go | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/pkg/registry/client.go b/pkg/registry/client.go index 750bb9715..4efe9bf7c 100644 --- a/pkg/registry/client.go +++ b/pkg/registry/client.go @@ -713,11 +713,28 @@ func (c *Client) Push(data []byte, ref string, options ...PushOption) (*PushResu repository.PlainHTTP = c.plainHTTP repository.Client = c.authorizer - manifestDescriptor, err = oras.ExtendedCopy(ctx, memoryStore, parsedRef.String(), repository, parsedRef.String(), oras.DefaultExtendedCopyOptions) - if err != nil { + _, _ = fmt.Printf("============== parsedRef.String()=%s\n", parsedRef.String()) + _, _ = fmt.Printf("============== repository.Reference.Registry=%s\n", repository.Reference.Registry) + _, _ = fmt.Printf("============== repository.Reference.Repository=%s\n", repository.Reference.Repository) + _, _ = fmt.Printf("============== repository.Reference.Reference=%s\n", repository.Reference.Reference) + _, _ = fmt.Printf("============== repository.Reference.String=%s\n", repository.Reference.String()) + _, _ = fmt.Printf("============== manifestDescriptor.Digest=%s\n", manifestDescriptor.Digest) + + if err := oras.ExtendedCopyGraph(ctx, memoryStore, repository, manifestDescriptor, oras.DefaultExtendedCopyGraphOptions); err != nil { return nil, err } + //err = repository.Manifests().Tag(ctx, manifestDescriptor, parsedRef.String()) + //if err != nil { + // return nil, err + //} + + // (ctx context.Context, target Target, mediaType string, contentBytes []byte, reference string) (ocispec.Descriptor, error) { + //manifestDescriptor, err = oras.ExtendedCopy(ctx, memoryStore, parsedRef.String(), repository, parsedRef.String(), oras.DefaultExtendedCopyOptions) + //if err != nil { + // return nil, err + //} + chartSummary := &descriptorPushSummaryWithMeta{ Meta: meta, } From 52f64d280a9f84c109ddddaee0bd94f78c66fc4d Mon Sep 17 00:00:00 2001 From: Terry Howe Date: Tue, 14 Oct 2025 14:49:52 -0600 Subject: [PATCH 2/3] refactor: optimize oci registry calls Signed-off-by: Terry Howe --- pkg/registry/client.go | 144 ++++++++++++++++++++++++++++------------- 1 file changed, 99 insertions(+), 45 deletions(-) diff --git a/pkg/registry/client.go b/pkg/registry/client.go index 4efe9bf7c..537f2a1a2 100644 --- a/pkg/registry/client.go +++ b/pkg/registry/client.go @@ -18,6 +18,7 @@ package registry // import "helm.sh/helm/v4/pkg/registry" import ( "context" + "crypto/sha256" "crypto/tls" "crypto/x509" "encoding/json" @@ -30,8 +31,10 @@ import ( "os" "sort" "strings" + "sync" "github.com/Masterminds/semver/v3" + "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/specs-go" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras-go/v2" @@ -76,6 +79,7 @@ type ( credentialsStore credentials.Store httpClient *http.Client plainHTTP bool + wg sync.WaitGroup } // ClientOption allows specifying various settings configurable by the user for overriding the defaults @@ -664,33 +668,52 @@ func (c *Client) Push(data []byte, ref string, options ...PushOption) (*PushResu } } - ctx := context.Background() - - memoryStore := memory.New() - chartDescriptor, err := oras.PushBytes(ctx, memoryStore, ChartLayerMediaType, data) + repository, err := remote.NewRepository(parsedRef.String()) if err != nil { return nil, err } + repository.PlainHTTP = c.plainHTTP + repository.Client = c.authorizer - configData, err := json.Marshal(meta) + ctx := context.Background() + ctx = auth.AppendRepositoryScope(ctx, repository.Reference, auth.ActionPull, auth.ActionPush) + + chartBlob := newBlob(repository, ChartLayerMediaType, data) + exists, err := chartBlob.exists(ctx) if err != nil { return nil, err } - configDescriptor, err := oras.PushBytes(ctx, memoryStore, ConfigMediaType, configData) + layers := []ocispec.Descriptor{chartBlob.descriptor} + if !exists { + c.runWorker(ctx, chartBlob.push) + } + + configData, err := json.Marshal(meta) if err != nil { return nil, err } + configBlob := newBlob(repository, ConfigMediaType, configData) + c.runWorker(ctx, configBlob.pushNew) - layers := []ocispec.Descriptor{chartDescriptor} - var provDescriptor ocispec.Descriptor + var provBlob blob if operation.provData != nil { - provDescriptor, err = oras.PushBytes(ctx, memoryStore, ProvLayerMediaType, operation.provData) - if err != nil { - return nil, err - } + provBlob = newBlob(repository, ProvLayerMediaType, operation.provData) + c.runWorker(ctx, provBlob.pushNew) + } + c.wg.Wait() - layers = append(layers, provDescriptor) + if chartBlob.err != nil { + return nil, chartBlob.err + } + if configBlob.err != nil { + return nil, configBlob.err + } + if provBlob.err != nil { + return nil, provBlob.err + } + if operation.provData != nil { + layers = append(layers, provBlob.descriptor) } // sort layers for determinism, similar to how ORAS v1 does it @@ -700,54 +723,37 @@ func (c *Client) Push(data []byte, ref string, options ...PushOption) (*PushResu ociAnnotations := generateOCIAnnotations(meta, operation.creationTime) - manifestDescriptor, err := c.tagManifest(ctx, memoryStore, configDescriptor, - layers, ociAnnotations, parsedRef) - if err != nil { - return nil, err + manifest := ocispec.Manifest{ + Versioned: specs.Versioned{SchemaVersion: 2}, + Config: configBlob.descriptor, + Layers: layers, + Annotations: ociAnnotations, } - repository, err := remote.NewRepository(parsedRef.String()) + manifestData, err := json.Marshal(manifest) if err != nil { return nil, err } - repository.PlainHTTP = c.plainHTTP - repository.Client = c.authorizer - _, _ = fmt.Printf("============== parsedRef.String()=%s\n", parsedRef.String()) - _, _ = fmt.Printf("============== repository.Reference.Registry=%s\n", repository.Reference.Registry) - _, _ = fmt.Printf("============== repository.Reference.Repository=%s\n", repository.Reference.Repository) - _, _ = fmt.Printf("============== repository.Reference.Reference=%s\n", repository.Reference.Reference) - _, _ = fmt.Printf("============== repository.Reference.String=%s\n", repository.Reference.String()) - _, _ = fmt.Printf("============== manifestDescriptor.Digest=%s\n", manifestDescriptor.Digest) - - if err := oras.ExtendedCopyGraph(ctx, memoryStore, repository, manifestDescriptor, oras.DefaultExtendedCopyGraphOptions); err != nil { + manifestDescriptor, err := oras.TagBytes(ctx, repository, ocispec.MediaTypeImageManifest, + manifestData, parsedRef.String()) + if err != nil { return nil, err } - //err = repository.Manifests().Tag(ctx, manifestDescriptor, parsedRef.String()) - //if err != nil { - // return nil, err - //} - - // (ctx context.Context, target Target, mediaType string, contentBytes []byte, reference string) (ocispec.Descriptor, error) { - //manifestDescriptor, err = oras.ExtendedCopy(ctx, memoryStore, parsedRef.String(), repository, parsedRef.String(), oras.DefaultExtendedCopyOptions) - //if err != nil { - // return nil, err - //} - chartSummary := &descriptorPushSummaryWithMeta{ Meta: meta, } - chartSummary.Digest = chartDescriptor.Digest.String() - chartSummary.Size = chartDescriptor.Size + chartSummary.Digest = chartBlob.descriptor.Digest.String() + chartSummary.Size = chartBlob.descriptor.Size result := &PushResult{ Manifest: &descriptorPushSummary{ Digest: manifestDescriptor.Digest.String(), Size: manifestDescriptor.Size, }, Config: &descriptorPushSummary{ - Digest: configDescriptor.Digest.String(), - Size: configDescriptor.Size, + Digest: configBlob.descriptor.Digest.String(), + Size: configBlob.descriptor.Size, }, Chart: chartSummary, Prov: &descriptorPushSummary{}, // prevent nil references @@ -755,8 +761,8 @@ func (c *Client) Push(data []byte, ref string, options ...PushOption) (*PushResu } if operation.provData != nil { result.Prov = &descriptorPushSummary{ - Digest: provDescriptor.Digest.String(), - Size: provDescriptor.Size, + Digest: provBlob.descriptor.Digest.String(), + Size: provBlob.descriptor.Size, } } _, _ = fmt.Fprintf(c.out, "Pushed: %s\n", result.Ref) @@ -942,3 +948,51 @@ func (c *Client) tagManifest(ctx context.Context, memoryStore *memory.Store, return oras.TagBytes(ctx, memoryStore, ocispec.MediaTypeImageManifest, manifestData, parsedRef.String()) } + +func (c *Client) runWorker(ctx context.Context, worker func(context.Context)) { + c.wg.Add(1) + go func() { + defer c.wg.Done() + worker(ctx) + }() +} + +type blob struct { + mediaType string + dst *remote.Repository + data []byte + descriptor ocispec.Descriptor + err error +} + +func newBlob(dst *remote.Repository, mediaType string, data []byte) blob { + return blob{ + mediaType: mediaType, + dst: dst, + data: data, + } +} + +func (b *blob) exists(ctx context.Context) (bool, error) { + hash := sha256.Sum256(b.data) + b.descriptor.Size = int64(len(b.data)) + b.descriptor.MediaType = b.mediaType + b.descriptor.Digest = digest.NewDigestFromBytes(digest.SHA256, hash[:]) + return b.dst.Exists(ctx, b.descriptor) +} + +func (b *blob) pushNew(ctx context.Context) { + var exists bool + exists, b.err = b.exists(ctx) + if b.err != nil { + return + } + if exists { + return + } + b.descriptor, b.err = oras.PushBytes(ctx, b.dst, b.mediaType, b.data) +} + +func (b *blob) push(ctx context.Context) { + b.descriptor, b.err = oras.PushBytes(ctx, b.dst, b.mediaType, b.data) +} From a8466f849b8e7f7b7ddaf59b89e28da372f13536 Mon Sep 17 00:00:00 2001 From: Terry Howe Date: Sat, 24 Jan 2026 17:49:40 -0700 Subject: [PATCH 3/3] fix: use local WaitGroup in Push to fix concurrent call safety Address Copilot review feedback: - Move WaitGroup from Client struct field to local variable in Push() - Pass WaitGroup to runWorker as parameter instead of accessing via receiver - Add documentation comments to runWorker, blob type, and its methods - Add TestPushConcurrent to verify concurrent Push operations are safe This fixes a potential race condition if Push() is called concurrently on the same Client instance, where the shared WaitGroup could cause incorrect synchronization between unrelated Push operations. Signed-off-by: Terry Howe --- pkg/registry/client.go | 31 +++++++++--- pkg/registry/client_test.go | 99 +++++++++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+), 8 deletions(-) diff --git a/pkg/registry/client.go b/pkg/registry/client.go index 537f2a1a2..7eafe96c7 100644 --- a/pkg/registry/client.go +++ b/pkg/registry/client.go @@ -79,7 +79,6 @@ type ( credentialsStore credentials.Store httpClient *http.Client plainHTTP bool - wg sync.WaitGroup } // ClientOption allows specifying various settings configurable by the user for overriding the defaults @@ -685,8 +684,9 @@ func (c *Client) Push(data []byte, ref string, options ...PushOption) (*PushResu } layers := []ocispec.Descriptor{chartBlob.descriptor} + var wg sync.WaitGroup if !exists { - c.runWorker(ctx, chartBlob.push) + runWorker(ctx, &wg, chartBlob.push) } configData, err := json.Marshal(meta) @@ -694,14 +694,14 @@ func (c *Client) Push(data []byte, ref string, options ...PushOption) (*PushResu return nil, err } configBlob := newBlob(repository, ConfigMediaType, configData) - c.runWorker(ctx, configBlob.pushNew) + runWorker(ctx, &wg, configBlob.pushNew) var provBlob blob if operation.provData != nil { provBlob = newBlob(repository, ProvLayerMediaType, operation.provData) - c.runWorker(ctx, provBlob.pushNew) + runWorker(ctx, &wg, provBlob.pushNew) } - c.wg.Wait() + wg.Wait() if chartBlob.err != nil { return nil, chartBlob.err @@ -949,14 +949,20 @@ func (c *Client) tagManifest(ctx context.Context, memoryStore *memory.Store, manifestData, parsedRef.String()) } -func (c *Client) runWorker(ctx context.Context, worker func(context.Context)) { - c.wg.Add(1) +// runWorker spawns a goroutine to execute the worker function and tracks it +// with the provided WaitGroup. The WaitGroup counter is incremented before +// spawning and decremented when the worker completes. +func runWorker(ctx context.Context, wg *sync.WaitGroup, worker func(context.Context)) { + wg.Add(1) go func() { - defer c.wg.Done() + defer wg.Done() worker(ctx) }() } +// blob represents a content-addressable blob to be pushed to an OCI registry. +// It encapsulates the data, media type, and destination repository, and tracks +// the resulting descriptor and any error from push operations. type blob struct { mediaType string dst *remote.Repository @@ -965,6 +971,7 @@ type blob struct { err error } +// newBlob creates a new blob with the given repository, media type, and data. func newBlob(dst *remote.Repository, mediaType string, data []byte) blob { return blob{ mediaType: mediaType, @@ -973,6 +980,9 @@ func newBlob(dst *remote.Repository, mediaType string, data []byte) blob { } } +// exists checks if the blob already exists in the registry by computing its +// digest and querying the repository. It also populates the blob's descriptor +// with size, media type, and digest information. func (b *blob) exists(ctx context.Context) (bool, error) { hash := sha256.Sum256(b.data) b.descriptor.Size = int64(len(b.data)) @@ -981,6 +991,9 @@ func (b *blob) exists(ctx context.Context) (bool, error) { return b.dst.Exists(ctx, b.descriptor) } +// pushNew checks if the blob exists in the registry first, and only pushes +// if it doesn't exist. This avoids redundant uploads for blobs that are +// already present. Any error is stored in b.err. func (b *blob) pushNew(ctx context.Context) { var exists bool exists, b.err = b.exists(ctx) @@ -993,6 +1006,8 @@ func (b *blob) pushNew(ctx context.Context) { b.descriptor, b.err = oras.PushBytes(ctx, b.dst, b.mediaType, b.data) } +// push unconditionally pushes the blob to the registry without checking +// for existence first. Any error is stored in b.err. func (b *blob) push(ctx context.Context) { b.descriptor, b.err = oras.PushBytes(ctx, b.dst, b.mediaType, b.data) } diff --git a/pkg/registry/client_test.go b/pkg/registry/client_test.go index 98a8b2ea3..223d5b96b 100644 --- a/pkg/registry/client_test.go +++ b/pkg/registry/client_test.go @@ -17,11 +17,15 @@ limitations under the License. package registry import ( + "crypto/sha256" + "fmt" "io" "net/http" "net/http/httptest" + "os" "path/filepath" "strings" + "sync" "testing" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -166,3 +170,98 @@ func TestWarnIfHostHasPath(t *testing.T) { }) } } + +// TestPushConcurrent verifies that concurrent Push operations on the same Client +// do not interfere with each other. This test is designed to catch race conditions +// when run with -race flag. +func TestPushConcurrent(t *testing.T) { + t.Parallel() + + // Create a mock registry server that accepts pushes + var mu sync.Mutex + uploads := make(map[string][]byte) + var uploadCounter int + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodHead && strings.Contains(r.URL.Path, "/blobs/"): + // Blob existence check - return 404 to force upload + w.WriteHeader(http.StatusNotFound) + + case r.Method == http.MethodPost && strings.Contains(r.URL.Path, "/blobs/uploads/"): + // Start upload - return upload URL with unique ID + mu.Lock() + uploadCounter++ + uploadID := fmt.Sprintf("upload-%d", uploadCounter) + mu.Unlock() + w.Header().Set("Location", fmt.Sprintf("%s%s", r.URL.Path, uploadID)) + w.WriteHeader(http.StatusAccepted) + + case r.Method == http.MethodPut && strings.Contains(r.URL.Path, "/blobs/uploads/"): + // Complete upload - extract digest from query param + body, _ := io.ReadAll(r.Body) + digest := r.URL.Query().Get("digest") + mu.Lock() + uploads[r.URL.Path] = body + mu.Unlock() + w.Header().Set("Docker-Content-Digest", digest) + w.WriteHeader(http.StatusCreated) + + case r.Method == http.MethodPut && strings.Contains(r.URL.Path, "/manifests/"): + // Manifest push - compute actual sha256 digest of the body + body, _ := io.ReadAll(r.Body) + hash := sha256.Sum256(body) + digest := fmt.Sprintf("sha256:%x", hash) + w.Header().Set("Docker-Content-Digest", digest) + w.WriteHeader(http.StatusCreated) + + default: + w.WriteHeader(http.StatusNotFound) + } + })) + defer srv.Close() + + host := strings.TrimPrefix(srv.URL, "http://") + + // Create client + credFile := filepath.Join(t.TempDir(), "config.json") + client, err := NewClient( + ClientOptWriter(io.Discard), + ClientOptCredentialsFile(credFile), + ClientOptPlainHTTP(), + ) + require.NoError(t, err) + + // Load test chart + chartData, err := os.ReadFile("../downloader/testdata/local-subchart-0.1.0.tgz") + require.NoError(t, err, "no error loading test chart") + + meta, err := extractChartMeta(chartData) + require.NoError(t, err, "no error extracting chart meta") + + // Run concurrent pushes + const numGoroutines = 10 + var wg sync.WaitGroup + errs := make(chan error, numGoroutines) + + for i := range numGoroutines { + wg.Add(1) + go func(idx int) { + defer wg.Done() + // Each goroutine pushes to a different tag to avoid conflicts + ref := fmt.Sprintf("%s/testrepo/%s:%s-%d", host, meta.Name, meta.Version, idx) + _, err := client.Push(chartData, ref, PushOptStrictMode(false)) + if err != nil { + errs <- fmt.Errorf("goroutine %d: %w", idx, err) + } + }(i) + } + + wg.Wait() + close(errs) + + // Check for errors + for err := range errs { + t.Error(err) + } +}