diff --git a/pkg/registry/client.go b/pkg/registry/client.go index b5274b8f8..0dc9b1d5b 100644 --- a/pkg/registry/client.go +++ b/pkg/registry/client.go @@ -18,6 +18,7 @@ package registry // import "helm.sh/helm/v4/pkg/registry" import ( "context" + "crypto/sha256" "crypto/tls" "crypto/x509" "encoding/json" @@ -29,8 +30,10 @@ import ( "os" "sort" "strings" + "sync" "github.com/Masterminds/semver/v3" + "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/specs-go" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras-go/v2" @@ -75,6 +78,7 @@ type ( credentialsStore credentials.Store httpClient *http.Client plainHTTP bool + wg sync.WaitGroup } // ClientOption allows specifying various settings configurable by the user for overriding the defaults @@ -650,33 +654,52 @@ func (c *Client) Push(data []byte, ref string, options ...PushOption) (*PushResu } } - ctx := context.Background() - - memoryStore := memory.New() - chartDescriptor, err := oras.PushBytes(ctx, memoryStore, ChartLayerMediaType, data) + repository, err := remote.NewRepository(parsedRef.String()) if err != nil { return nil, err } + repository.PlainHTTP = c.plainHTTP + repository.Client = c.authorizer - configData, err := json.Marshal(meta) + ctx := context.Background() + ctx = auth.AppendRepositoryScope(ctx, repository.Reference, auth.ActionPull, auth.ActionPush) + + chartBlob := newBlob(repository, ChartLayerMediaType, data) + exists, err := chartBlob.exists(ctx) if err != nil { return nil, err } - configDescriptor, err := oras.PushBytes(ctx, memoryStore, ConfigMediaType, configData) + layers := []ocispec.Descriptor{chartBlob.descriptor} + if !exists { + c.runWorker(ctx, chartBlob.push) + } + + configData, err := json.Marshal(meta) if err != nil { return nil, err } + configBlob := newBlob(repository, ConfigMediaType, configData) + c.runWorker(ctx, configBlob.pushNew) - layers := []ocispec.Descriptor{chartDescriptor} - var provDescriptor ocispec.Descriptor + var provBlob blob if operation.provData != nil { - provDescriptor, err = oras.PushBytes(ctx, memoryStore, ProvLayerMediaType, operation.provData) - if err != nil { - return nil, err - } + provBlob = newBlob(repository, ProvLayerMediaType, operation.provData) + c.runWorker(ctx, provBlob.pushNew) + } + c.wg.Wait() - layers = append(layers, provDescriptor) + if chartBlob.err != nil { + return nil, chartBlob.err + } + if configBlob.err != nil { + return nil, configBlob.err + } + if provBlob.err != nil { + return nil, provBlob.err + } + if operation.provData != nil { + layers = append(layers, provBlob.descriptor) } // sort layers for determinism, similar to how ORAS v1 does it @@ -686,54 +709,37 @@ func (c *Client) Push(data []byte, ref string, options ...PushOption) (*PushResu ociAnnotations := generateOCIAnnotations(meta, operation.creationTime) - manifestDescriptor, err := c.tagManifest(ctx, memoryStore, configDescriptor, - layers, ociAnnotations, parsedRef) - if err != nil { - return nil, err + manifest := ocispec.Manifest{ + Versioned: specs.Versioned{SchemaVersion: 2}, + Config: configBlob.descriptor, + Layers: layers, + Annotations: ociAnnotations, } - repository, err := remote.NewRepository(parsedRef.String()) + manifestData, err := json.Marshal(manifest) if err != nil { return nil, err } - repository.PlainHTTP = c.plainHTTP - repository.Client = c.authorizer - _, _ = fmt.Printf("============== parsedRef.String()=%s\n", parsedRef.String()) - _, _ = fmt.Printf("============== repository.Reference.Registry=%s\n", repository.Reference.Registry) - _, _ = fmt.Printf("============== repository.Reference.Repository=%s\n", repository.Reference.Repository) - _, _ = fmt.Printf("============== repository.Reference.Reference=%s\n", repository.Reference.Reference) - _, _ = fmt.Printf("============== repository.Reference.String=%s\n", repository.Reference.String()) - _, _ = fmt.Printf("============== manifestDescriptor.Digest=%s\n", manifestDescriptor.Digest) - - if err := oras.ExtendedCopyGraph(ctx, memoryStore, repository, manifestDescriptor, oras.DefaultExtendedCopyGraphOptions); err != nil { + manifestDescriptor, err := oras.TagBytes(ctx, repository, ocispec.MediaTypeImageManifest, + manifestData, parsedRef.String()) + if err != nil { return nil, err } - //err = repository.Manifests().Tag(ctx, manifestDescriptor, parsedRef.String()) - //if err != nil { - // return nil, err - //} - - // (ctx context.Context, target Target, mediaType string, contentBytes []byte, reference string) (ocispec.Descriptor, error) { - //manifestDescriptor, err = oras.ExtendedCopy(ctx, memoryStore, parsedRef.String(), repository, parsedRef.String(), oras.DefaultExtendedCopyOptions) - //if err != nil { - // return nil, err - //} - chartSummary := &descriptorPushSummaryWithMeta{ Meta: meta, } - chartSummary.Digest = chartDescriptor.Digest.String() - chartSummary.Size = chartDescriptor.Size + chartSummary.Digest = chartBlob.descriptor.Digest.String() + chartSummary.Size = chartBlob.descriptor.Size result := &PushResult{ Manifest: &descriptorPushSummary{ Digest: manifestDescriptor.Digest.String(), Size: manifestDescriptor.Size, }, Config: &descriptorPushSummary{ - Digest: configDescriptor.Digest.String(), - Size: configDescriptor.Size, + Digest: configBlob.descriptor.Digest.String(), + Size: configBlob.descriptor.Size, }, Chart: chartSummary, Prov: &descriptorPushSummary{}, // prevent nil references @@ -741,8 +747,8 @@ func (c *Client) Push(data []byte, ref string, options ...PushOption) (*PushResu } if operation.provData != nil { result.Prov = &descriptorPushSummary{ - Digest: provDescriptor.Digest.String(), - Size: provDescriptor.Size, + Digest: provBlob.descriptor.Digest.String(), + Size: provBlob.descriptor.Size, } } fmt.Fprintf(c.out, "Pushed: %s\n", result.Ref) @@ -928,3 +934,51 @@ func (c *Client) tagManifest(ctx context.Context, memoryStore *memory.Store, return oras.TagBytes(ctx, memoryStore, ocispec.MediaTypeImageManifest, manifestData, parsedRef.String()) } + +func (c *Client) runWorker(ctx context.Context, worker func(context.Context)) { + c.wg.Add(1) + go func() { + defer c.wg.Done() + worker(ctx) + }() +} + +type blob struct { + mediaType string + dst *remote.Repository + data []byte + descriptor ocispec.Descriptor + err error +} + +func newBlob(dst *remote.Repository, mediaType string, data []byte) blob { + return blob{ + mediaType: mediaType, + dst: dst, + data: data, + } +} + +func (b *blob) exists(ctx context.Context) (bool, error) { + hash := sha256.Sum256(b.data) + b.descriptor.Size = int64(len(b.data)) + b.descriptor.MediaType = b.mediaType + b.descriptor.Digest = digest.NewDigestFromBytes(digest.SHA256, hash[:]) + return b.dst.Exists(ctx, b.descriptor) +} + +func (b *blob) pushNew(ctx context.Context) { + var exists bool + exists, b.err = b.exists(ctx) + if b.err != nil { + return + } + if exists { + return + } + b.descriptor, b.err = oras.PushBytes(ctx, b.dst, b.mediaType, b.data) +} + +func (b *blob) push(ctx context.Context) { + b.descriptor, b.err = oras.PushBytes(ctx, b.dst, b.mediaType, b.data) +}