diff --git a/pkg/action/action.go b/pkg/action/action.go index c93950103..e1bd8d2a1 100644 --- a/pkg/action/action.go +++ b/pkg/action/action.go @@ -238,17 +238,24 @@ func splitAndDeannotate(postrendered string) (map[string]string, error) { // // This code has to do with writing files to disk. func (cfg *Configuration) renderResources(ch *chart.Chart, values common.Values, releaseName, outputDir string, subNotes, useReleaseName, includeCrds bool, pr postrenderer.PostRenderer, interactWithRemote, enableDNS, hideSecret bool) ([]*release.Hook, *bytes.Buffer, string, error) { + hs, b, notes, _, err := cfg.renderResourcesWithFiles(ch, values, releaseName, outputDir, subNotes, useReleaseName, includeCrds, pr, interactWithRemote, enableDNS, hideSecret) + return hs, b, notes, err +} + +// renderResourcesWithFiles is the canonical render implementation, also returning the +// sorted manifest slice for sequencing-aware actions. +func (cfg *Configuration) renderResourcesWithFiles(ch *chart.Chart, values common.Values, releaseName, outputDir string, subNotes, useReleaseName, includeCrds bool, pr postrenderer.PostRenderer, interactWithRemote, enableDNS, hideSecret bool) ([]*release.Hook, *bytes.Buffer, string, []releaseutil.Manifest, error) { var hs []*release.Hook b := bytes.NewBuffer(nil) caps, err := cfg.getCapabilities() if err != nil { - return hs, b, "", err + return hs, b, "", nil, err } if ch.Metadata.KubeVersion != "" { if !chartutil.IsCompatibleRange(ch.Metadata.KubeVersion, caps.KubeVersion.String()) { - return hs, b, "", fmt.Errorf("chart requires kubeVersion: %s which is incompatible with Kubernetes %s", ch.Metadata.KubeVersion, caps.KubeVersion.Version) + return hs, b, "", nil, fmt.Errorf("chart requires kubeVersion: %s which is incompatible with Kubernetes %s", ch.Metadata.KubeVersion, caps.KubeVersion.Version) } } @@ -261,7 +268,7 @@ func (cfg *Configuration) renderResources(ch *chart.Chart, values common.Values, if interactWithRemote && cfg.RESTClientGetter != nil { restConfig, err := cfg.RESTClientGetter.ToRESTConfig() if err != nil { - return hs, b, "", err + return hs, b, "", nil, err } e := engine.New(restConfig) e.EnableDNS = enableDNS @@ -277,7 +284,7 @@ func (cfg *Configuration) renderResources(ch *chart.Chart, values common.Values, } if err2 != nil { - return hs, b, "", err2 + return hs, b, "", nil, err2 } // NOTES.txt gets rendered like all the other files, but because it's not a hook nor a resource, @@ -311,19 +318,19 @@ func (cfg *Configuration) renderResources(ch *chart.Chart, values common.Values, // Merge files as stream of documents for sending to post renderer merged, err := annotateAndMerge(files) if err != nil { - return hs, b, notes, fmt.Errorf("error merging manifests: %w", err) + return hs, b, notes, nil, fmt.Errorf("error merging manifests: %w", err) } // Run the post renderer postRendered, err := pr.Run(bytes.NewBufferString(merged)) if err != nil { - return hs, b, notes, fmt.Errorf("error while running post render on files: %w", err) + return hs, b, notes, nil, fmt.Errorf("error while running post render on files: %w", err) } // Use the file list and contents received from the post renderer files, err = splitAndDeannotate(postRendered.String()) if err != nil { - return hs, b, notes, fmt.Errorf("error while parsing post rendered output: %w", err) + return hs, b, notes, nil, fmt.Errorf("error while parsing post rendered output: %w", err) } } @@ -343,7 +350,7 @@ func (cfg *Configuration) renderResources(ch *chart.Chart, values common.Values, } fmt.Fprintf(b, "---\n# Source: %s\n%s\n", name, content) } - return hs, b, "", err + return hs, b, "", nil, err } // Aggregate all valid manifests into one big doc. @@ -356,7 +363,7 @@ func (cfg *Configuration) renderResources(ch *chart.Chart, values common.Values, } else { err = writeToFile(outputDir, crd.Filename, string(crd.File.Data[:]), fileWritten[crd.Filename]) if err != nil { - return hs, b, "", err + return hs, b, "", nil, err } fileWritten[crd.Filename] = true } @@ -381,13 +388,13 @@ func (cfg *Configuration) renderResources(ch *chart.Chart, values common.Values, // used by install or upgrade err = writeToFile(newDir, m.Name, m.Content, fileWritten[m.Name]) if err != nil { - return hs, b, "", err + return hs, b, "", nil, err } fileWritten[m.Name] = true } } - return hs, b, notes, nil + return hs, b, notes, manifests, nil } // RESTClientGetter gets the rest client diff --git a/pkg/action/install.go b/pkg/action/install.go index 50df13c05..84b4e1f39 100644 --- a/pkg/action/install.go +++ b/pkg/action/install.go @@ -100,6 +100,10 @@ type Install struct { Devel bool DependencyUpdate bool Timeout time.Duration + // ReadinessTimeout is the per-batch timeout when --wait=ordered is used. + // Each ordered batch waits at most this long for resources to become ready. + // Must not exceed Timeout. Defaults to 1 minute when zero. + ReadinessTimeout time.Duration Namespace string ReleaseName string GenerateName bool @@ -305,6 +309,10 @@ func (i *Install) RunWithContext(ctx context.Context, ch ci.Charter, vals map[st return nil, fmt.Errorf("release name check failed: %w", err) } + if i.ReadinessTimeout > 0 && i.Timeout > 0 && i.ReadinessTimeout > i.Timeout { + return nil, fmt.Errorf("--readiness-timeout (%s) must not exceed --timeout (%s)", i.ReadinessTimeout, i.Timeout) + } + if err := chartutil.ProcessDependencies(chrt, vals); err != nil { i.cfg.Logger().Error("chart dependencies processing failed", slog.Any("error", err)) return nil, fmt.Errorf("chart dependencies processing failed: %w", err) @@ -369,8 +377,13 @@ func (i *Install) RunWithContext(ctx context.Context, ch ci.Charter, vals map[st rel := i.createRelease(chrt, vals, i.Labels) + var sortedManifests []releaseutil.Manifest var manifestDoc *bytes.Buffer - rel.Hooks, manifestDoc, rel.Info.Notes, err = i.cfg.renderResources(chrt, valuesToRender, i.ReleaseName, i.OutputDir, i.SubNotes, i.UseReleaseName, i.IncludeCRDs, i.PostRenderer, interactWithServer(i.DryRunStrategy), i.EnableDNS, i.HideSecret) + if i.WaitStrategy == kube.OrderedWaitStrategy { + rel.Hooks, manifestDoc, rel.Info.Notes, sortedManifests, err = i.cfg.renderResourcesWithFiles(chrt, valuesToRender, i.ReleaseName, i.OutputDir, i.SubNotes, i.UseReleaseName, i.IncludeCRDs, i.PostRenderer, interactWithServer(i.DryRunStrategy), i.EnableDNS, i.HideSecret) + } else { + rel.Hooks, manifestDoc, rel.Info.Notes, err = i.cfg.renderResources(chrt, valuesToRender, i.ReleaseName, i.OutputDir, i.SubNotes, i.UseReleaseName, i.IncludeCRDs, i.PostRenderer, interactWithServer(i.DryRunStrategy), i.EnableDNS, i.HideSecret) + } // Even for errors, attach this if available if manifestDoc != nil { rel.Manifest = manifestDoc.String() @@ -464,7 +477,11 @@ func (i *Install) RunWithContext(ctx context.Context, ch ci.Charter, vals map[st return rel, err } - rel, err = i.performInstallCtx(ctx, rel, toBeAdopted, resources) + if i.WaitStrategy == kube.OrderedWaitStrategy { + rel, err = i.performSequencedInstallCtx(ctx, chrt, rel, sortedManifests) + } else { + rel, err = i.performInstallCtx(ctx, rel, toBeAdopted, resources) + } if err != nil { rel, err = i.failRelease(rel, err) } @@ -493,6 +510,89 @@ func (i *Install) performInstallCtx(ctx context.Context, rel *release.Release, t } } +// performSequencedInstallCtx runs performSequencedInstall in a goroutine and +// respects context cancellation, mirroring performInstallCtx. +func (i *Install) performSequencedInstallCtx(ctx context.Context, chrt *chart.Chart, rel *release.Release, manifests []releaseutil.Manifest) (*release.Release, error) { + type Msg struct { + r *release.Release + e error + } + + resultChan := make(chan Msg, 1) + + go func() { + i.goroutineCount.Add(1) + rel, err := i.performSequencedInstall(ctx, chrt, rel, manifests) + resultChan <- Msg{rel, err} + i.goroutineCount.Add(-1) + }() + + select { + case <-ctx.Done(): + return rel, ctx.Err() + case msg := <-resultChan: + return msg.r, msg.e + } +} + +// performSequencedInstall deploys chart resources in DAG-ordered batches when +// --wait=ordered is active. +func (i *Install) performSequencedInstall(ctx context.Context, chrt *chart.Chart, rel *release.Release, manifests []releaseutil.Manifest) (*release.Release, error) { + if !i.DisableHooks { + if err := i.cfg.execHook(rel, release.HookPreInstall, i.WaitStrategy, i.WaitOptions, i.Timeout, i.ServerSideApply); err != nil { + return rel, fmt.Errorf("failed pre-install: %w", err) + } + } + + readinessTimeout := i.ReadinessTimeout + if readinessTimeout <= 0 { + readinessTimeout = time.Minute + } + + rel.SequencingInfo = &release.SequencingInfo{ + Enabled: true, + Strategy: string(i.WaitStrategy), + } + + sd := &sequencedDeployment{ + cfg: i.cfg, + releaseName: rel.Name, + releaseNamespace: rel.Namespace, + disableOpenAPI: i.DisableOpenAPIValidation, + serverSideApply: i.ServerSideApply, + forceConflicts: i.ForceConflicts, + forceReplace: i.ForceReplace, + waitStrategy: i.WaitStrategy, + waitOptions: i.WaitOptions, + waitForJobs: i.WaitForJobs, + timeout: i.Timeout, + readinessTimeout: readinessTimeout, + deadline: computeDeadline(i.Timeout), + } + + if err := sd.deployChartLevel(ctx, chrt, manifests); err != nil { + return rel, err + } + + if !i.DisableHooks { + if err := i.cfg.execHook(rel, release.HookPostInstall, i.WaitStrategy, i.WaitOptions, i.Timeout, i.ServerSideApply); err != nil { + return rel, fmt.Errorf("failed post-install: %w", err) + } + } + + if len(i.Description) > 0 { + rel.SetStatus(rcommon.StatusDeployed, i.Description) + } else { + rel.SetStatus(rcommon.StatusDeployed, "Install complete") + } + + if err := i.recordRelease(rel); err != nil { + i.cfg.Logger().Error("failed to record the release", slog.Any("error", err)) + } + + return rel, nil +} + // getGoroutineCount return the number of running routines func (i *Install) getGoroutineCount() int32 { return i.goroutineCount.Load() @@ -507,6 +607,10 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource } } + if err := stripSequencingAnnotations(resources); err != nil { + return rel, fmt.Errorf("stripping sequencing annotations: %w", err) + } + // At this point, we can do the install. Note that before we were detecting whether to // do an update, but it's not clear whether we WANT to do an update if the reuse is set // to true, since that is basically an upgrade operation. diff --git a/pkg/action/rollback.go b/pkg/action/rollback.go index 459569781..2d096c8af 100644 --- a/pkg/action/rollback.go +++ b/pkg/action/rollback.go @@ -18,6 +18,7 @@ package action import ( "bytes" + "context" "errors" "fmt" "time" @@ -28,6 +29,7 @@ import ( "helm.sh/helm/v4/pkg/kube" "helm.sh/helm/v4/pkg/release/common" release "helm.sh/helm/v4/pkg/release/v1" + releaseutil "helm.sh/helm/v4/pkg/release/v1/util" "helm.sh/helm/v4/pkg/storage/driver" ) @@ -92,7 +94,7 @@ func (r *Rollback) Run(name string) error { } r.cfg.Logger().Debug("performing rollback", "name", name) - if _, err := r.performRollback(currentRelease, targetRelease, serverSideApply); err != nil { + if _, err := r.performRollback(context.Background(), currentRelease, targetRelease, serverSideApply); err != nil { return err } @@ -184,22 +186,27 @@ func (r *Rollback) prepareRollback(name string) (*release.Release, *release.Rele // message here, and only override it later if we experience failure. Description: fmt.Sprintf("Rollback to %d", previousVersion), }, - Version: currentRelease.Version + 1, - Labels: previousRelease.Labels, - Manifest: previousRelease.Manifest, - Hooks: previousRelease.Hooks, - ApplyMethod: string(determineReleaseSSApplyMethod(serverSideApply)), + Version: currentRelease.Version + 1, + Labels: previousRelease.Labels, + Manifest: previousRelease.Manifest, + Hooks: previousRelease.Hooks, + ApplyMethod: string(determineReleaseSSApplyMethod(serverSideApply)), + SequencingInfo: previousRelease.SequencingInfo, } return currentRelease, targetRelease, serverSideApply, nil } -func (r *Rollback) performRollback(currentRelease, targetRelease *release.Release, serverSideApply bool) (*release.Release, error) { +func (r *Rollback) performRollback(ctx context.Context, currentRelease, targetRelease *release.Release, serverSideApply bool) (*release.Release, error) { if isDryRun(r.DryRunStrategy) { r.cfg.Logger().Debug("dry run", "name", targetRelease.Name) return targetRelease, nil } + if targetRelease.SequencingInfo != nil && targetRelease.SequencingInfo.Enabled { + return r.performSequencedRollback(ctx, currentRelease, targetRelease, serverSideApply) + } + current, err := r.cfg.KubeClient.Build(bytes.NewBufferString(currentRelease.Manifest), false) if err != nil { return targetRelease, fmt.Errorf("unable to build kubernetes objects from current release manifest: %w", err) @@ -304,3 +311,131 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas return targetRelease, nil } + +func (r *Rollback) failRollback(currentRelease, targetRelease *release.Release, created kube.ResourceList, err error) (*release.Release, error) { + msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err) + r.cfg.Logger().Warn(msg) + + currentRelease.Info.Status = common.StatusSuperseded + targetRelease.Info.Status = common.StatusFailed + targetRelease.Info.Description = msg + r.cfg.recordRelease(currentRelease) + r.cfg.recordRelease(targetRelease) + + if r.CleanupOnFail && len(created) > 0 { + r.cfg.Logger().Debug("cleanup on fail set, cleaning up resources", "count", len(created)) + _, errs := r.cfg.KubeClient.Delete(created, metav1.DeletePropagationBackground) + if errs != nil { + return targetRelease, fmt.Errorf( + "an error occurred while cleaning up resources after rollback failure: %v: %w", + err, + joinErrors(errs, ", "), + ) + } + r.cfg.Logger().Debug("resource cleanup complete") + } + + return targetRelease, err +} + +// performSequencedRollback deploys the target revision's resources in DAG-ordered batches +// when the target revision was originally deployed with --wait=ordered. +func (r *Rollback) performSequencedRollback(ctx context.Context, currentRelease, targetRelease *release.Release, serverSideApply bool) (*release.Release, error) { + fail := func(created kube.ResourceList, err error) (*release.Release, error) { + return r.failRollback(currentRelease, targetRelease, created, err) + } + + current, err := r.cfg.KubeClient.Build(bytes.NewBufferString(currentRelease.Manifest), false) + if err != nil { + return fail(nil, fmt.Errorf("unable to build kubernetes objects from current release manifest: %w", err)) + } + + target, err := r.cfg.KubeClient.Build(bytes.NewBufferString(targetRelease.Manifest), false) + if err != nil { + return fail(nil, fmt.Errorf("unable to build kubernetes objects from target release manifest: %w", err)) + } + + rawManifests := releaseutil.SplitManifests(targetRelease.Manifest) + _, sortedManifests, err := releaseutil.SortManifests(rawManifests, nil, releaseutil.InstallOrder) + if err != nil { + return fail(nil, fmt.Errorf("parsing target release manifest for sequenced rollback: %w", err)) + } + + if !r.DisableHooks { + if err := r.cfg.execHook(targetRelease, release.HookPreRollback, r.WaitStrategy, r.WaitOptions, r.Timeout, serverSideApply); err != nil { + return fail(nil, err) + } + } else { + r.cfg.Logger().Debug("rollback hooks disabled", "name", targetRelease.Name) + } + + readinessTimeout := r.Timeout / 2 + if readinessTimeout <= 0 { + readinessTimeout = time.Minute + } + + sd := &sequencedDeployment{ + cfg: r.cfg, + releaseName: targetRelease.Name, + releaseNamespace: targetRelease.Namespace, + serverSideApply: serverSideApply, + forceConflicts: r.ForceConflicts, + forceReplace: r.ForceReplace, + waitStrategy: r.WaitStrategy, + waitOptions: r.WaitOptions, + waitForJobs: r.WaitForJobs, + timeout: r.Timeout, + readinessTimeout: readinessTimeout, + deadline: computeDeadline(r.Timeout), + upgradeMode: true, + currentResources: current, + upgradeCSAFieldManager: true, + } + + if err := sd.deployChartLevel(ctx, targetRelease.Chart, sortedManifests); err != nil { + return fail(sd.createdResources, err) + } + + allTargetKeys := make(map[string]bool, len(target)) + for _, resource := range target { + allTargetKeys[objectKey(resource)] = true + } + + var toBeDeleted kube.ResourceList + for _, resource := range current { + if !allTargetKeys[objectKey(resource)] { + toBeDeleted = append(toBeDeleted, resource) + } + } + + if len(toBeDeleted) > 0 { + if _, errs := r.cfg.KubeClient.Delete(toBeDeleted, metav1.DeletePropagationBackground); errs != nil { + return fail(sd.createdResources, fmt.Errorf("deleting removed resources during rollback: %w", joinErrors(errs, ", "))) + } + } + + if !r.DisableHooks { + if err := r.cfg.execHook(targetRelease, release.HookPostRollback, r.WaitStrategy, r.WaitOptions, r.Timeout, serverSideApply); err != nil { + return fail(sd.createdResources, err) + } + } + + deployed, err := r.cfg.Releases.DeployedAll(currentRelease.Name) + if err != nil && !errors.Is(err, driver.ErrNoDeployedReleases) { + return fail(sd.createdResources, err) + } + + for _, reli := range deployed { + rel, err := releaserToV1Release(reli) + if err != nil { + return fail(sd.createdResources, err) + } + r.cfg.Logger().Debug("superseding previous deployment", "version", rel.Version) + rel.Info.Status = common.StatusSuperseded + r.cfg.recordRelease(rel) + } + + targetRelease.Info.Status = common.StatusDeployed + + return targetRelease, nil +} diff --git a/pkg/action/sequencing.go b/pkg/action/sequencing.go new file mode 100644 index 000000000..f65fb762f --- /dev/null +++ b/pkg/action/sequencing.go @@ -0,0 +1,440 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package action + +import ( + "bytes" + "context" + "fmt" + "log/slog" + "strings" + "time" + + chartv2 "helm.sh/helm/v4/pkg/chart/v2" + chartutil "helm.sh/helm/v4/pkg/chart/v2/util" + "helm.sh/helm/v4/pkg/kube" + releaseutil "helm.sh/helm/v4/pkg/release/v1/util" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/cli-runtime/pkg/resource" +) + +// computeDeadline returns a deadline based on the given timeout duration. +// If timeout is zero or negative, it returns the zero time (no deadline). +func computeDeadline(timeout time.Duration) time.Time { + if timeout <= 0 { + return time.Time{} + } + return time.Now().Add(timeout) +} + +// GroupManifestsByDirectSubchart groups manifests by the direct subchart they belong to. +// The current chart's own manifests are returned under the empty string key "". +// Subcharts are keyed by their immediate name under the first `/charts//` +// segment found in the manifest source path. +// Nested subcharts (e.g., `/charts/sub/charts/nested/`) are grouped under +// the direct subchart name ("sub"), since nested sequencing is handled recursively. +func GroupManifestsByDirectSubchart(manifests []releaseutil.Manifest, chartName string) map[string][]releaseutil.Manifest { + result := make(map[string][]releaseutil.Manifest) + if chartName == "" { + // Fallback: assign everything to parent + result[""] = append(result[""], manifests...) + return result + } + + chartsPrefix := chartName + "/charts/" + for _, m := range manifests { + idx := strings.Index(m.Name, chartsPrefix) + if idx < 0 { + // Parent chart manifest + result[""] = append(result[""], m) + continue + } + // Extract the direct subchart name (first segment after "/charts/") + rest := m.Name[idx+len(chartsPrefix):] + // rest is like "subchart1/templates/deploy.yaml" or "subchart1/charts/nested/..." + slashIdx := strings.Index(rest, "/") + if slashIdx < 0 { + // Unlikely: a file directly under charts/ with no subdirectory + result[""] = append(result[""], m) + continue + } + subchartName := rest[:slashIdx] + result[subchartName] = append(result[subchartName], m) + } + return result +} + +// buildManifestYAML concatenates the Content fields of the given manifests into a single +// YAML stream suitable for passing to KubeClient.Build(). +func buildManifestYAML(manifests []releaseutil.Manifest) string { + if len(manifests) == 0 { + return "" + } + var buf strings.Builder + for i, m := range manifests { + if i > 0 { + buf.WriteString("---\n") + } + buf.WriteString(m.Content) + buf.WriteString("\n") + } + return buf.String() +} + +// sequencedDeployment performs ordered installation or upgrade of chart resources. +// It handles the two-level DAG: first subchart ordering, then resource-group +// ordering within each chart level. +type sequencedDeployment struct { + cfg *Configuration + releaseName string + releaseNamespace string + disableOpenAPI bool + serverSideApply bool + forceConflicts bool + forceReplace bool + waitStrategy kube.WaitStrategy + waitOptions []kube.WaitOption + waitForJobs bool + timeout time.Duration + readinessTimeout time.Duration + deadline time.Time // overall operation deadline + + // Upgrade-specific fields. When upgradeMode is true, createAndWait delegates + // to updateAndWait which calls KubeClient.Update() instead of Create(). + upgradeMode bool + currentResources kube.ResourceList // full set of old (current) resources + upgradeCSAFieldManager bool // upgrade client-side apply field manager + createdResources kube.ResourceList +} + +// deployChartLevel deploys all resources for a single chart level in sequenced order. +// It first handles subcharts in dependency order (recursively), then deploys the +// parent chart's own resource-group batches. +func (s *sequencedDeployment) deployChartLevel(ctx context.Context, chrt *chartv2.Chart, manifests []releaseutil.Manifest) error { + // Group manifests by direct subchart + grouped := GroupManifestsByDirectSubchart(manifests, chrt.Name()) + + // Build subchart DAG and deploy in topological order + dag, err := chartutil.BuildSubchartDAG(chrt) + if err != nil { + return fmt.Errorf("building subchart DAG for %s: %w", chrt.Name(), err) + } + + batches, err := dag.GetBatches() + if err != nil { + return fmt.Errorf("getting subchart batches for %s: %w", chrt.Name(), err) + } + + // Deploy each subchart batch in order + for batchIdx, batch := range batches { + for _, subchartName := range batch { + subManifests := grouped[subchartName] + if len(subManifests) == 0 { + continue + } + + // Find the subchart chart object for recursive nested sequencing + subChart := findSubchart(chrt, subchartName) + if subChart == nil { + // Subchart not found in chart object (may have been disabled or aliased differently) + // Fall back to flat resource-group deployment for these manifests + slog.Warn("subchart not found in chart dependencies; deploying without subchart sequencing", + "subchart", subchartName, + "batch", batchIdx, + ) + if err := s.deployResourceGroupBatches(ctx, subManifests); err != nil { + return fmt.Errorf("deploying subchart %s resources: %w", subchartName, err) + } + continue + } + + // Recursively deploy the subchart (handles its own nested subcharts and resource-groups) + if err := s.deployChartLevel(ctx, subChart, subManifests); err != nil { + return fmt.Errorf("deploying subchart %s: %w", subchartName, err) + } + } + } + + // Deploy parent chart's own resources (after all subchart batches complete) + parentManifests := grouped[""] + if len(parentManifests) > 0 { + if err := s.deployResourceGroupBatches(ctx, parentManifests); err != nil { + return fmt.Errorf("deploying %s own resources: %w", chrt.Name(), err) + } + } + + return nil +} + +func batchHasCustomReadiness(manifests []releaseutil.Manifest) bool { + for _, manifest := range manifests { + if manifest.Head == nil || manifest.Head.Metadata == nil { + continue + } + annotations := manifest.Head.Metadata.Annotations + if annotations == nil { + continue + } + if annotations[kube.AnnotationReadinessSuccess] != "" || annotations[kube.AnnotationReadinessFailure] != "" { + return true + } + } + return false +} + +// deployResourceGroupBatches deploys manifests for a single chart level using +// resource-group annotation DAG ordering. Resources without group annotations +// (or with invalid ones) are deployed last. +func (s *sequencedDeployment) deployResourceGroupBatches(ctx context.Context, manifests []releaseutil.Manifest) error { + if len(manifests) == 0 { + return nil + } + + result, _, err := releaseutil.ParseResourceGroups(manifests) + if err != nil { + return fmt.Errorf("parsing resource-group annotations: %w", err) + } + + // If there are sequenced groups, build their DAG and deploy in order + if len(result.Groups) > 0 { + dag, err := releaseutil.BuildResourceGroupDAG(result) + if err != nil { + return fmt.Errorf("building resource-group DAG: %w", err) + } + + batches, err := dag.GetBatches() + if err != nil { + return fmt.Errorf("getting resource-group batches: %w", err) + } + + for _, groupBatch := range batches { + var batchManifests []releaseutil.Manifest + for _, groupName := range groupBatch { + batchManifests = append(batchManifests, result.Groups[groupName]...) + } + if err := s.createAndWait(ctx, batchManifests); err != nil { + return err + } + } + } + + // Deploy unsequenced resources last + if len(result.Unsequenced) > 0 { + if err := s.createAndWait(ctx, result.Unsequenced); err != nil { + return err + } + } + + return nil +} + +// helmSequencingAnnotations lists annotation keys used internally by Helm for +// resource sequencing. These are stripped from resources before applying to +// Kubernetes because some (e.g. helm.sh/depends-on/resource-groups) contain +// multiple slashes which is invalid per the K8s annotation key format. +var helmSequencingAnnotations = []string{ + releaseutil.AnnotationDependsOnResourceGroups, +} + +// stripSequencingAnnotations removes Helm-internal sequencing annotations from +// resources before they are applied to Kubernetes. This prevents K8s API +// validation errors for annotation keys that are not valid K8s label keys. +func stripSequencingAnnotations(resources kube.ResourceList) error { + return resources.Visit(func(info *resource.Info, err error) error { + if err != nil { + return err + } + acc, accErr := meta.Accessor(info.Object) + if accErr != nil { + return nil // skip non-accessible objects + } + annotations := acc.GetAnnotations() + if len(annotations) == 0 { + return nil + } + changed := false + for _, key := range helmSequencingAnnotations { + if _, exists := annotations[key]; exists { + delete(annotations, key) + changed = true + } + } + if changed { + acc.SetAnnotations(annotations) + } + return nil + }) +} + +// createAndWait creates (or updates, in upgrade mode) a set of manifest resources +// and waits for them to be ready. It respects both the per-batch readiness timeout +// and the overall operation deadline. +func (s *sequencedDeployment) createAndWait(ctx context.Context, manifests []releaseutil.Manifest) error { + if s.upgradeMode { + return s.updateAndWait(ctx, manifests) + } + + if len(manifests) == 0 { + return nil + } + + yaml := buildManifestYAML(manifests) + resources, err := s.cfg.KubeClient.Build(bytes.NewBufferString(yaml), !s.disableOpenAPI) + if err != nil { + return fmt.Errorf("building resource batch: %w", err) + } + if len(resources) == 0 { + return nil + } + + if err := resources.Visit(setMetadataVisitor(s.releaseName, s.releaseNamespace, true)); err != nil { + return fmt.Errorf("setting metadata for resource batch: %w", err) + } + + if err := stripSequencingAnnotations(resources); err != nil { + return fmt.Errorf("stripping sequencing annotations: %w", err) + } + + result, err := s.cfg.KubeClient.Create(resources, kube.ClientCreateOptionServerSideApply(s.serverSideApply, false)) + if err != nil { + return fmt.Errorf("creating resource batch: %w", err) + } + s.createdResources = append(s.createdResources, result.Created...) + + return s.waitForResources(resources, manifests) +} + +// updateAndWait applies an upgrade batch using KubeClient.Update() and waits for readiness. +// It matches current (old) resources by objectKey to compute the per-batch diff. +func (s *sequencedDeployment) updateAndWait(_ context.Context, manifests []releaseutil.Manifest) error { + if len(manifests) == 0 { + return nil + } + + yaml := buildManifestYAML(manifests) + target, err := s.cfg.KubeClient.Build(bytes.NewBufferString(yaml), !s.disableOpenAPI) + if err != nil { + return fmt.Errorf("building resource batch: %w", err) + } + if len(target) == 0 { + return nil + } + + if err := target.Visit(setMetadataVisitor(s.releaseName, s.releaseNamespace, true)); err != nil { + return fmt.Errorf("setting metadata for resource batch: %w", err) + } + + if err := stripSequencingAnnotations(target); err != nil { + return fmt.Errorf("stripping sequencing annotations: %w", err) + } + + // Find the subset of current (old) resources that are represented in this batch. + // Update() will handle creates (target resources not in matchingCurrent) and + // updates (resources in both). Deletions are handled separately after all batches. + targetKeys := make(map[string]bool, len(target)) + for _, r := range target { + targetKeys[objectKey(r)] = true + } + var matchingCurrent kube.ResourceList + for _, r := range s.currentResources { + if targetKeys[objectKey(r)] { + matchingCurrent = append(matchingCurrent, r) + } + } + + result, err := s.cfg.KubeClient.Update( + matchingCurrent, + target, + kube.ClientUpdateOptionForceReplace(s.forceReplace), + kube.ClientUpdateOptionServerSideApply(s.serverSideApply, s.forceConflicts), + kube.ClientUpdateOptionUpgradeClientSideFieldManager(s.upgradeCSAFieldManager), + ) + if err != nil { + return fmt.Errorf("updating resource batch: %w", err) + } + s.createdResources = append(s.createdResources, result.Created...) + + return s.waitForResources(target, manifests) +} + +// waitForResources waits for the given resources to become ready, +// applying the per-batch and overall deadline constraints. +func (s *sequencedDeployment) waitForResources(resources kube.ResourceList, manifests []releaseutil.Manifest) error { + // Determine effective wait timeout: min(readinessTimeout, remaining time to overall deadline) + waitTimeout := s.readinessTimeout + if !s.deadline.IsZero() { + remaining := time.Until(s.deadline) + if remaining <= 0 { + return fmt.Errorf("overall timeout exceeded before waiting for resource batch") + } + if waitTimeout <= 0 || remaining < waitTimeout { + waitTimeout = remaining + } + } + if waitTimeout <= 0 { + waitTimeout = time.Minute // safe default + } + + var err error + var waiter kube.Waiter + waitOptions := append([]kube.WaitOption(nil), s.waitOptions...) + if batchHasCustomReadiness(manifests) { + waitOptions = append(waitOptions, kube.WithCustomReadinessStatusReader()) + } + if c, ok := s.cfg.KubeClient.(kube.InterfaceWaitOptions); ok { + waiter, err = c.GetWaiterWithOptions(s.waitStrategy, waitOptions...) + } else { + waiter, err = s.cfg.KubeClient.GetWaiter(s.waitStrategy) + } + if err != nil { + return fmt.Errorf("getting waiter for resource batch: %w", err) + } + + if s.waitForJobs { + return waiter.WaitWithJobs(resources, waitTimeout) + } + return waiter.Wait(resources, waitTimeout) +} + +// findSubchart finds the subchart chart object within chrt's direct dependencies by name or alias. +// It uses the parent chart's Metadata.Dependencies to resolve aliases, since the alias is stored +// on the Dependency struct in Chart.yaml, not on the subchart's own Metadata. +func findSubchart(chrt *chartv2.Chart, nameOrAlias string) *chartv2.Chart { + // Build a map from subchart chart name → alias from the parent's dependency declarations. + aliasMap := make(map[string]string) // chart name → effective name (alias or name) + if chrt.Metadata != nil { + for _, dep := range chrt.Metadata.Dependencies { + effective := dep.Name + if dep.Alias != "" { + effective = dep.Alias + } + aliasMap[dep.Name] = effective + } + } + + for _, dep := range chrt.Dependencies() { + effective := dep.Name() + if alias, ok := aliasMap[dep.Name()]; ok { + effective = alias + } + if effective == nameOrAlias || dep.Name() == nameOrAlias { + return dep + } + } + return nil +} diff --git a/pkg/action/uninstall.go b/pkg/action/uninstall.go index 79156991c..9507b3727 100644 --- a/pkg/action/uninstall.go +++ b/pkg/action/uninstall.go @@ -25,10 +25,13 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + chartcommon "helm.sh/helm/v4/pkg/chart/common" + renderutil "helm.sh/helm/v4/pkg/chart/common/util" + chart "helm.sh/helm/v4/pkg/chart/v2" chartutil "helm.sh/helm/v4/pkg/chart/v2/util" "helm.sh/helm/v4/pkg/kube" releasei "helm.sh/helm/v4/pkg/release" - "helm.sh/helm/v4/pkg/release/common" + releasecommon "helm.sh/helm/v4/pkg/release/common" release "helm.sh/helm/v4/pkg/release/v1" releaseutil "helm.sh/helm/v4/pkg/release/v1/util" "helm.sh/helm/v4/pkg/storage/driver" @@ -116,7 +119,7 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error) // TODO: Are there any cases where we want to force a delete even if it's // already marked deleted? - if rel.Info.Status == common.StatusUninstalled { + if rel.Info.Status == releasecommon.StatusUninstalled { if !u.KeepHistory { if err := u.purgeReleases(rels...); err != nil { return nil, fmt.Errorf("uninstall: Failed to purge the release: %w", err) @@ -127,7 +130,7 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error) } u.cfg.Logger().Debug("uninstall: deleting release", "name", name) - rel.Info.Status = common.StatusUninstalling + rel.Info.Status = releasecommon.StatusUninstalling rel.Info.Deleted = time.Now() rel.Info.Description = "Deletion in progress (or silently failed)" res := &releasei.UninstallReleaseResponse{Release: rel} @@ -147,7 +150,7 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error) u.cfg.Logger().Debug("uninstall: Failed to store updated release", slog.Any("error", err)) } - deletedResources, kept, errs := u.deleteRelease(rel) + deletedResources, kept, errs := u.deleteRelease(rel, waiter) if errs != nil { u.cfg.Logger().Debug("uninstall: Failed to delete release", slog.Any("error", errs)) return nil, fmt.Errorf("failed to delete release: %s", name) @@ -158,8 +161,10 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error) } res.Info = kept - if err := waiter.WaitForDelete(deletedResources, u.Timeout); err != nil { - errs = append(errs, err) + if !isSequencedRelease(rel) { + if err := waiter.WaitForDelete(deletedResources, u.Timeout); err != nil { + errs = append(errs, err) + } } if !u.DisableHooks { @@ -169,7 +174,7 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error) } } - rel.Info.Status = common.StatusUninstalled + rel.Info.Status = releasecommon.StatusUninstalled if len(u.Description) > 0 { rel.Info.Description = u.Description } else { @@ -208,7 +213,7 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error) } u.cfg.Logger().Debug("superseding previous deployment", "version", rel.Version) - rel.Info.Status = common.StatusSuperseded + rel.Info.Status = releasecommon.StatusSuperseded if err := u.cfg.Releases.Update(rel); err != nil { u.cfg.Logger().Debug("uninstall: Failed to store updated release", slog.Any("error", err)) } @@ -220,6 +225,10 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error) return res, nil } +func isSequencedRelease(rel *release.Release) bool { + return rel.SequencingInfo != nil && rel.SequencingInfo.Enabled +} + func (u *Uninstall) purgeReleases(rels ...*release.Release) error { for _, rel := range rels { if _, err := u.cfg.Releases.Delete(rel.Name, rel.Version); err != nil { @@ -253,10 +262,8 @@ func (e *joinedErrors) Unwrap() []error { return e.errs } -// deleteRelease deletes the release and returns list of delete resources and manifests that were kept in the deletion process -func (u *Uninstall) deleteRelease(rel *release.Release) (kube.ResourceList, string, []error) { - var errs []error - +// deleteRelease deletes the release and returns list of delete resources and manifests that were kept in the deletion process. +func (u *Uninstall) deleteRelease(rel *release.Release, waiter kube.Waiter) (kube.ResourceList, string, []error) { manifests := releaseutil.SplitManifests(rel.Manifest) _, files, err := releaseutil.SortManifests(manifests, nil, releaseutil.UninstallOrder) if err != nil { @@ -273,19 +280,247 @@ func (u *Uninstall) deleteRelease(rel *release.Release) (kube.ResourceList, stri fmt.Fprintf(&kept, "[%s] %s\n", f.Head.Kind, f.Head.Metadata.Name) } + if isSequencedRelease(rel) { + filesToDelete = u.recoverManifestPathsForSequencedUninstall(rel, filesToDelete) + deleted, errs := u.sequencedDeleteManifests(rel.Chart, filesToDelete, waiter) + return deleted, kept.String(), errs + } + + resources, err := u.buildDeleteResources(filesToDelete) + if err != nil { + return nil, "", []error{err} + } + if len(resources) > 0 { + _, errs := u.cfg.KubeClient.Delete(resources, parseCascadingFlag(u.DeletionPropagation)) + return resources, kept.String(), errs + } + return resources, kept.String(), nil +} + +func (u *Uninstall) buildDeleteResources(manifests []releaseutil.Manifest) (kube.ResourceList, error) { var builder strings.Builder - for _, file := range filesToDelete { + for _, file := range manifests { builder.WriteString("\n---\n" + file.Content) } resources, err := u.cfg.KubeClient.Build(strings.NewReader(builder.String()), false) if err != nil { - return nil, "", []error{fmt.Errorf("unable to build kubernetes objects for delete: %w", err)} + return nil, fmt.Errorf("unable to build kubernetes objects for delete: %w", err) } - if len(resources) > 0 { - _, errs = u.cfg.KubeClient.Delete(resources, parseCascadingFlag(u.DeletionPropagation)) + return resources, nil +} + +func (u *Uninstall) deleteManifestBatch(manifests []releaseutil.Manifest, waiter kube.Waiter) (kube.ResourceList, []error) { + resources, err := u.buildDeleteResources(manifests) + if err != nil || len(resources) == 0 { + if err != nil { + return nil, []error{err} + } + return nil, nil + } + + _, errs := u.cfg.KubeClient.Delete(resources, parseCascadingFlag(u.DeletionPropagation)) + if len(errs) > 0 { + return resources, errs + } + if err := waiter.WaitForDelete(resources, u.Timeout); err != nil { + errs = append(errs, err) + } + return resources, errs +} + +func (u *Uninstall) sequencedDeleteManifests(chrt *chart.Chart, manifests []releaseutil.Manifest, waiter kube.Waiter) (kube.ResourceList, []error) { + if chrt == nil { + return u.deleteResourceGroupBatchesReverse(manifests, waiter) + } + return u.deleteChartLevelReverse(chrt, manifests, waiter) +} + +func (u *Uninstall) deleteChartLevelReverse(chrt *chart.Chart, manifests []releaseutil.Manifest, waiter kube.Waiter) (kube.ResourceList, []error) { + grouped := GroupManifestsByDirectSubchart(manifests, chrt.Name()) + + allDeleted, errs := u.deleteResourceGroupBatchesReverse(grouped[""], waiter) + if len(errs) > 0 { + return allDeleted, errs + } + + dag, err := chartutil.BuildSubchartDAG(chrt) + if err != nil { + return allDeleted, []error{fmt.Errorf("building subchart DAG for sequenced uninstall: %w", err)} + } + + batches, err := dag.GetBatches() + if err != nil { + return allDeleted, []error{fmt.Errorf("getting subchart batches for sequenced uninstall: %w", err)} + } + + for i, j := 0, len(batches)-1; i < j; i, j = i+1, j-1 { + batches[i], batches[j] = batches[j], batches[i] + } + + for _, batch := range batches { + for _, subchartName := range batch { + subchartManifests := grouped[subchartName] + if len(subchartManifests) == 0 { + continue + } + + subchart := findSubchart(chrt, subchartName) + var deleted kube.ResourceList + if subchart == nil { + u.cfg.Logger().Warn("subchart not found in chart dependencies during sequenced uninstall; falling back to flat resource-group deletion", "subchart", subchartName) + deleted, errs = u.deleteResourceGroupBatchesReverse(subchartManifests, waiter) + } else { + deleted, errs = u.deleteChartLevelReverse(subchart, subchartManifests, waiter) + } + allDeleted = append(allDeleted, deleted...) + if len(errs) > 0 { + return allDeleted, errs + } + } + } + + return allDeleted, nil +} + +func (u *Uninstall) deleteResourceGroupBatchesReverse(manifests []releaseutil.Manifest, waiter kube.Waiter) (kube.ResourceList, []error) { + result, warnings, err := releaseutil.ParseResourceGroups(manifests) + if err != nil { + return nil, []error{fmt.Errorf("parsing resource-group annotations for sequenced uninstall: %w", err)} + } + for _, warning := range warnings { + u.cfg.Logger().Warn("resource-group annotation warning during uninstall", "warning", warning) } - return resources, kept.String(), errs + + var allDeleted kube.ResourceList + + if len(result.Unsequenced) > 0 { + deleted, errs := u.deleteManifestBatch(result.Unsequenced, waiter) + allDeleted = append(allDeleted, deleted...) + if len(errs) > 0 { + return allDeleted, errs + } + } + + if len(result.Groups) == 0 { + return allDeleted, nil + } + + dag, err := releaseutil.BuildResourceGroupDAG(result) + if err != nil { + return allDeleted, []error{fmt.Errorf("building resource-group DAG for sequenced uninstall: %w", err)} + } + batches, err := dag.GetBatches() + if err != nil { + return allDeleted, []error{fmt.Errorf("getting resource-group batches for sequenced uninstall: %w", err)} + } + + for i, j := 0, len(batches)-1; i < j; i, j = i+1, j-1 { + batches[i], batches[j] = batches[j], batches[i] + } + + for _, batch := range batches { + var batchManifests []releaseutil.Manifest + for _, groupName := range batch { + batchManifests = append(batchManifests, result.Groups[groupName]...) + } + deleted, errs := u.deleteManifestBatch(batchManifests, waiter) + allDeleted = append(allDeleted, deleted...) + if len(errs) > 0 { + return allDeleted, errs + } + } + + return allDeleted, nil +} + +func (u *Uninstall) recoverManifestPathsForSequencedUninstall(rel *release.Release, manifests []releaseutil.Manifest) []releaseutil.Manifest { + rendered, err := u.renderReleaseManifestsWithPaths(rel) + if err != nil { + u.cfg.Logger().Warn("unable to recover rendered chart paths for sequenced uninstall; falling back to stored manifest order", slog.Any("error", err)) + return manifests + } + return applyRenderedManifestPaths(manifests, rendered) +} + +func (u *Uninstall) renderReleaseManifestsWithPaths(rel *release.Release) ([]releaseutil.Manifest, error) { + if rel.Chart == nil { + return nil, errors.New("release chart is missing") + } + if err := chartutil.ProcessDependencies(rel.Chart, rel.Config); err != nil { + return nil, fmt.Errorf("processing chart dependencies: %w", err) + } + + caps, err := u.cfg.getCapabilities() + if err != nil { + return nil, fmt.Errorf("getting capabilities: %w", err) + } + + valuesToRender, err := renderutil.ToRenderValuesWithSchemaValidation( + rel.Chart, + rel.Config, + chartcommon.ReleaseOptions{ + Name: rel.Name, + Namespace: rel.Namespace, + Revision: rel.Version, + IsInstall: rel.Version <= 1, + IsUpgrade: rel.Version > 1, + }, + caps, + true, + ) + if err != nil { + return nil, fmt.Errorf("building render values: %w", err) + } + + _, _, _, manifests, err := u.cfg.renderResourcesWithFiles(rel.Chart, valuesToRender, rel.Name, "", false, false, false, nil, false, false, false) + if err != nil { + return nil, fmt.Errorf("rendering chart manifests: %w", err) + } + return manifests, nil +} + +func applyRenderedManifestPaths(stored, rendered []releaseutil.Manifest) []releaseutil.Manifest { + byContent := make(map[string][]string, len(rendered)) + byIdentity := make(map[string][]string, len(rendered)) + for _, manifest := range rendered { + contentKey := normalizedManifestContent(manifest.Content) + byContent[contentKey] = append(byContent[contentKey], manifest.Name) + + identityKey := manifestIdentity(manifest) + byIdentity[identityKey] = append(byIdentity[identityKey], manifest.Name) + } + + out := make([]releaseutil.Manifest, len(stored)) + copy(out, stored) + + for i, manifest := range out { + contentKey := normalizedManifestContent(manifest.Content) + if names := byContent[contentKey]; len(names) > 0 { + out[i].Name = names[0] + byContent[contentKey] = names[1:] + continue + } + + identityKey := manifestIdentity(manifest) + if names := byIdentity[identityKey]; len(names) > 0 { + out[i].Name = names[0] + byIdentity[identityKey] = names[1:] + } + } + + return out +} + +func manifestIdentity(manifest releaseutil.Manifest) string { + if manifest.Head == nil || manifest.Head.Metadata == nil { + return normalizedManifestContent(manifest.Content) + } + return fmt.Sprintf("%s/%s/%s", manifest.Head.Version, manifest.Head.Kind, manifest.Head.Metadata.Name) +} + +func normalizedManifestContent(content string) string { + return strings.TrimSpace(content) } func parseCascadingFlag(cascadingFlag string) v1.DeletionPropagation { diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go index 0f360fe37..7d0c0c2f1 100644 --- a/pkg/action/upgrade.go +++ b/pkg/action/upgrade.go @@ -70,6 +70,10 @@ type Upgrade struct { SkipCRDs bool // Timeout is the timeout for this operation Timeout time.Duration + // ReadinessTimeout is the per-batch timeout when --wait=ordered is used. + // Each ordered batch waits at most this long for resources to become ready. + // Must not exceed Timeout. Defaults to 1 minute when zero. + ReadinessTimeout time.Duration // WaitStrategy determines what type of waiting should be done WaitStrategy kube.WaitStrategy // WaitOptions are additional options for waiting on resources @@ -183,12 +187,16 @@ func (u *Upgrade) RunWithContext(ctx context.Context, name string, ch chart.Char u.WaitStrategy = kube.StatusWatcherStrategy } + if u.ReadinessTimeout > 0 && u.Timeout > 0 && u.ReadinessTimeout > u.Timeout { + return nil, fmt.Errorf("--readiness-timeout (%s) must not exceed --timeout (%s)", u.ReadinessTimeout, u.Timeout) + } + if err := chartutil.ValidateReleaseName(name); err != nil { return nil, fmt.Errorf("release name is invalid: %s", name) } u.cfg.Logger().Debug("preparing upgrade", "name", name) - currentRelease, upgradedRelease, serverSideApply, err := u.prepareUpgrade(name, chrt, vals) + currentRelease, upgradedRelease, serverSideApply, sortedManifests, err := u.prepareUpgrade(name, chrt, vals) if err != nil { return nil, err } @@ -196,7 +204,12 @@ func (u *Upgrade) RunWithContext(ctx context.Context, name string, ch chart.Char u.cfg.Releases.MaxHistory = u.MaxHistory u.cfg.Logger().Debug("performing update", "name", name) - res, err := u.performUpgrade(ctx, currentRelease, upgradedRelease, serverSideApply) + var res *release.Release + if u.WaitStrategy == kube.OrderedWaitStrategy { + res, err = u.performSequencedUpgrade(ctx, chrt, currentRelease, upgradedRelease, sortedManifests, serverSideApply) + } else { + res, err = u.performUpgrade(ctx, currentRelease, upgradedRelease, serverSideApply) + } if err != nil { return res, err } @@ -213,14 +226,16 @@ func (u *Upgrade) RunWithContext(ctx context.Context, name string, ch chart.Char } // prepareUpgrade builds an upgraded release for an upgrade operation. -func (u *Upgrade) prepareUpgrade(name string, chart *chartv2.Chart, vals map[string]any) (*release.Release, *release.Release, bool, error) { +// When WaitStrategy is OrderedWaitStrategy, sortedManifests contains the parsed +// manifests with annotation data needed for DAG sequencing; otherwise it is nil. +func (u *Upgrade) prepareUpgrade(name string, chart *chartv2.Chart, vals map[string]any) (*release.Release, *release.Release, bool, []releaseutil.Manifest, error) { if chart == nil { - return nil, nil, false, errMissingChart + return nil, nil, false, nil, errMissingChart } // HideSecret must be used with dry run. Otherwise, return an error. if !isDryRun(u.DryRunStrategy) && u.HideSecret { - return nil, nil, false, errors.New("hiding Kubernetes secrets requires a dry-run mode") + return nil, nil, false, nil, errors.New("hiding Kubernetes secrets requires a dry-run mode") } // finds the last non-deleted release with the given name @@ -228,19 +243,19 @@ func (u *Upgrade) prepareUpgrade(name string, chart *chartv2.Chart, vals map[str if err != nil { // to keep existing behavior of returning the "%q has no deployed releases" error when an existing release does not exist if errors.Is(err, driver.ErrReleaseNotFound) { - return nil, nil, false, driver.NewErrNoDeployedReleases(name) + return nil, nil, false, nil, driver.NewErrNoDeployedReleases(name) } - return nil, nil, false, err + return nil, nil, false, nil, err } lastRelease, err := releaserToV1Release(lastReleasei) if err != nil { - return nil, nil, false, err + return nil, nil, false, nil, err } // Concurrent `helm upgrade`s will either fail here with `errPending` or when creating the release with "already exists". This should act as a pessimistic lock. if lastRelease.Info.Status.IsPending() { - return nil, nil, false, errPending + return nil, nil, false, nil, errPending } var currentRelease *release.Release @@ -253,14 +268,14 @@ func (u *Upgrade) prepareUpgrade(name string, chart *chartv2.Chart, vals map[str var cerr error currentRelease, cerr = releaserToV1Release(currentReleasei) if cerr != nil { - return nil, nil, false, err + return nil, nil, false, nil, err } if err != nil { if errors.Is(err, driver.ErrNoDeployedReleases) && (lastRelease.Info.Status == rcommon.StatusFailed || lastRelease.Info.Status == rcommon.StatusSuperseded) { currentRelease = lastRelease } else { - return nil, nil, false, err + return nil, nil, false, nil, err } } @@ -269,11 +284,11 @@ func (u *Upgrade) prepareUpgrade(name string, chart *chartv2.Chart, vals map[str // determine if values will be reused vals, err = u.reuseValues(chart, currentRelease, vals) if err != nil { - return nil, nil, false, err + return nil, nil, false, nil, err } if err := chartutil.ProcessDependencies(chart, vals); err != nil { - return nil, nil, false, err + return nil, nil, false, nil, err } // Increment revision count. This is passed to templates, and also stored on @@ -289,25 +304,33 @@ func (u *Upgrade) prepareUpgrade(name string, chart *chartv2.Chart, vals map[str caps, err := u.cfg.getCapabilities() if err != nil { - return nil, nil, false, err + return nil, nil, false, nil, err } valuesToRender, err := util.ToRenderValuesWithSchemaValidation(chart, vals, options, caps, u.SkipSchemaValidation) if err != nil { - return nil, nil, false, err + return nil, nil, false, nil, err } - hooks, manifestDoc, notesTxt, err := u.cfg.renderResources(chart, valuesToRender, "", "", u.SubNotes, false, false, u.PostRenderer, interactWithServer(u.DryRunStrategy), u.EnableDNS, u.HideSecret) + var sortedManifests []releaseutil.Manifest + var hooks []*release.Hook + var manifestDoc *bytes.Buffer + var notesTxt string + if u.WaitStrategy == kube.OrderedWaitStrategy { + hooks, manifestDoc, notesTxt, sortedManifests, err = u.cfg.renderResourcesWithFiles(chart, valuesToRender, "", "", u.SubNotes, false, false, u.PostRenderer, interactWithServer(u.DryRunStrategy), u.EnableDNS, u.HideSecret) + } else { + hooks, manifestDoc, notesTxt, err = u.cfg.renderResources(chart, valuesToRender, "", "", u.SubNotes, false, false, u.PostRenderer, interactWithServer(u.DryRunStrategy), u.EnableDNS, u.HideSecret) + } if err != nil { - return nil, nil, false, err + return nil, nil, false, nil, err } if driver.ContainsSystemLabels(u.Labels) { - return nil, nil, false, fmt.Errorf("user supplied labels contains system reserved label name. System labels: %+v", driver.GetSystemLabels()) + return nil, nil, false, nil, fmt.Errorf("user supplied labels contains system reserved label name. System labels: %+v", driver.GetSystemLabels()) } serverSideApply, err := getUpgradeServerSideValue(u.ServerSideApply, lastRelease.ApplyMethod) if err != nil { - return nil, nil, false, err + return nil, nil, false, nil, err } u.cfg.Logger().Debug("determined release apply method", slog.Bool("server_side_apply", serverSideApply), slog.String("previous_release_apply_method", lastRelease.ApplyMethod)) @@ -335,7 +358,7 @@ func (u *Upgrade) prepareUpgrade(name string, chart *chartv2.Chart, vals map[str upgradedRelease.Info.Notes = notesTxt } err = validateManifest(u.cfg.KubeClient, manifestDoc.Bytes(), !u.DisableOpenAPIValidation) - return currentRelease, upgradedRelease, serverSideApply, err + return currentRelease, upgradedRelease, serverSideApply, sortedManifests, err } func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedRelease *release.Release, serverSideApply bool) (*release.Release, error) { @@ -462,6 +485,12 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele u.cfg.Logger().Debug("upgrade hooks disabled", "name", upgradedRelease.Name) } + // Strip Helm-internal sequencing annotations before applying to K8s. + if err := stripSequencingAnnotations(target); err != nil { + u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, fmt.Errorf("stripping sequencing annotations: %w", err)) + return + } + upgradeClientSideFieldManager := isReleaseApplyMethodClientSideApply(originalRelease.ApplyMethod) && serverSideApply // Update client-side field manager if transitioning from client-side to server-side apply results, err := u.cfg.KubeClient.Update( current, @@ -520,6 +549,123 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele u.reportToPerformUpgrade(c, upgradedRelease, nil, nil) } +// performSequencedUpgrade deploys chart resources in DAG-ordered batches when +// --wait=ordered is used. It mirrors releasingUpgrade but uses sequencedDeployment +// to apply each batch and wait for readiness before proceeding to the next batch. +func (u *Upgrade) performSequencedUpgrade(ctx context.Context, chrt *chartv2.Chart, currentRelease, upgradedRelease *release.Release, manifests []releaseutil.Manifest, serverSideApply bool) (*release.Release, error) { + // Build the full set of current (old) resources for diff matching. + current, err := u.cfg.KubeClient.Build(bytes.NewBufferString(currentRelease.Manifest), false) + if err != nil { + if strings.Contains(err.Error(), "unable to recognize \"\": no matches for kind") { + return upgradedRelease, fmt.Errorf("current release manifest contains removed kubernetes api(s) for this "+ + "kubernetes version and it is therefore unable to build the kubernetes "+ + "objects for performing the diff. error from kubernetes: %w", err) + } + return upgradedRelease, fmt.Errorf("unable to build kubernetes objects from current release manifest: %w", err) + } + + // Build target to compute the set of resources to delete (those removed in the new release). + target, err := u.cfg.KubeClient.Build(bytes.NewBufferString(upgradedRelease.Manifest), !u.DisableOpenAPIValidation) + if err != nil { + return upgradedRelease, fmt.Errorf("unable to build kubernetes objects from new release manifest: %w", err) + } + + if isDryRun(u.DryRunStrategy) { + u.cfg.Logger().Debug("dry run for release", "name", upgradedRelease.Name) + if len(u.Description) > 0 { + upgradedRelease.Info.Description = u.Description + } else { + upgradedRelease.Info.Description = "Dry run complete" + } + return upgradedRelease, nil + } + + u.cfg.Logger().Debug("creating upgraded release", "name", upgradedRelease.Name) + if err := u.cfg.Releases.Create(upgradedRelease); err != nil { + return nil, err + } + + // pre-upgrade hooks + if !u.DisableHooks { + if err := u.cfg.execHook(upgradedRelease, release.HookPreUpgrade, u.WaitStrategy, u.WaitOptions, u.Timeout, serverSideApply); err != nil { + return u.failRelease(upgradedRelease, nil, fmt.Errorf("pre-upgrade hooks failed: %s", err)) + } + } else { + u.cfg.Logger().Debug("upgrade hooks disabled", "name", upgradedRelease.Name) + } + + upgradeCSAFieldManager := isReleaseApplyMethodClientSideApply(currentRelease.ApplyMethod) && serverSideApply + + readinessTimeout := u.ReadinessTimeout + if readinessTimeout <= 0 { + readinessTimeout = time.Minute + } + + sd := &sequencedDeployment{ + cfg: u.cfg, + releaseName: upgradedRelease.Name, + releaseNamespace: upgradedRelease.Namespace, + disableOpenAPI: u.DisableOpenAPIValidation, + serverSideApply: serverSideApply, + forceConflicts: u.ForceConflicts, + forceReplace: u.ForceReplace, + waitStrategy: u.WaitStrategy, + waitOptions: u.WaitOptions, + waitForJobs: u.WaitForJobs, + timeout: u.Timeout, + readinessTimeout: readinessTimeout, + deadline: computeDeadline(u.Timeout), + upgradeMode: true, + currentResources: current, + upgradeCSAFieldManager: upgradeCSAFieldManager, + } + + // Set SequencingInfo before deployment so that failure recovery can use sequenced order. + upgradedRelease.SequencingInfo = &release.SequencingInfo{ + Enabled: true, + Strategy: string(u.WaitStrategy), + } + + if err := sd.deployChartLevel(ctx, chrt, manifests); err != nil { + return u.failRelease(upgradedRelease, sd.createdResources, err) + } + + // Delete resources that were removed in the new release (in old but not in new). + allNewKeys := make(map[string]bool, len(target)) + for _, r := range target { + allNewKeys[objectKey(r)] = true + } + var toBeDeleted kube.ResourceList + for _, r := range current { + if !allNewKeys[objectKey(r)] { + toBeDeleted = append(toBeDeleted, r) + } + } + if len(toBeDeleted) > 0 { + if _, errs := u.cfg.KubeClient.Delete(toBeDeleted, metav1.DeletePropagationBackground); errs != nil { + return u.failRelease(upgradedRelease, sd.createdResources, fmt.Errorf("deleting removed resources: %w", joinErrors(errs, ", "))) + } + } + + // post-upgrade hooks + if !u.DisableHooks { + if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.WaitStrategy, u.WaitOptions, u.Timeout, serverSideApply); err != nil { + return u.failRelease(upgradedRelease, sd.createdResources, fmt.Errorf("post-upgrade hooks failed: %s", err)) + } + } + + currentRelease.Info.Status = rcommon.StatusSuperseded + u.cfg.recordRelease(currentRelease) + upgradedRelease.Info.Status = rcommon.StatusDeployed + if len(u.Description) > 0 { + upgradedRelease.Info.Description = u.Description + } else { + upgradedRelease.Info.Description = "Upgrade complete" + } + u.cfg.recordRelease(upgradedRelease) + return upgradedRelease, nil +} + func (u *Upgrade) failRelease(rel *release.Release, created kube.ResourceList, err error) (*release.Release, error) { msg := fmt.Sprintf("Upgrade %q failed: %s", rel.Name, err) u.cfg.Logger().Warn(