feat(hip-0025): sequence install, upgrade, rollback, and uninstall

Signed-off-by: Rohit Gudi <50377477+caretak3r@users.noreply.github.com>
pull/32038/head
Rohit Gudi 3 days ago
parent 45ceb428d2
commit 95f0b469e9
No known key found for this signature in database
GPG Key ID: F9223D6BD3386ABB

@ -238,17 +238,24 @@ func splitAndDeannotate(postrendered string) (map[string]string, error) {
//
// This code has to do with writing files to disk.
// renderResources renders the chart's templates, returning the hooks, the
// combined manifest buffer, and the release notes. It delegates to
// renderResourcesWithFiles and discards the sorted manifest slice, which only
// sequencing-aware actions need.
func (cfg *Configuration) renderResources(ch *chart.Chart, values common.Values, releaseName, outputDir string, subNotes, useReleaseName, includeCrds bool, pr postrenderer.PostRenderer, interactWithRemote, enableDNS, hideSecret bool) ([]*release.Hook, *bytes.Buffer, string, error) {
	hooks, buf, notes, _, err := cfg.renderResourcesWithFiles(ch, values, releaseName, outputDir, subNotes, useReleaseName, includeCrds, pr, interactWithRemote, enableDNS, hideSecret)
	return hooks, buf, notes, err
}
// renderResourcesWithFiles is the canonical render implementation, also returning the
// sorted manifest slice for sequencing-aware actions.
func (cfg *Configuration) renderResourcesWithFiles(ch *chart.Chart, values common.Values, releaseName, outputDir string, subNotes, useReleaseName, includeCrds bool, pr postrenderer.PostRenderer, interactWithRemote, enableDNS, hideSecret bool) ([]*release.Hook, *bytes.Buffer, string, []releaseutil.Manifest, error) {
var hs []*release.Hook
b := bytes.NewBuffer(nil)
caps, err := cfg.getCapabilities()
if err != nil {
return hs, b, "", err
return hs, b, "", nil, err
}
if ch.Metadata.KubeVersion != "" {
if !chartutil.IsCompatibleRange(ch.Metadata.KubeVersion, caps.KubeVersion.String()) {
return hs, b, "", fmt.Errorf("chart requires kubeVersion: %s which is incompatible with Kubernetes %s", ch.Metadata.KubeVersion, caps.KubeVersion.Version)
return hs, b, "", nil, fmt.Errorf("chart requires kubeVersion: %s which is incompatible with Kubernetes %s", ch.Metadata.KubeVersion, caps.KubeVersion.Version)
}
}
@ -261,7 +268,7 @@ func (cfg *Configuration) renderResources(ch *chart.Chart, values common.Values,
if interactWithRemote && cfg.RESTClientGetter != nil {
restConfig, err := cfg.RESTClientGetter.ToRESTConfig()
if err != nil {
return hs, b, "", err
return hs, b, "", nil, err
}
e := engine.New(restConfig)
e.EnableDNS = enableDNS
@ -277,7 +284,7 @@ func (cfg *Configuration) renderResources(ch *chart.Chart, values common.Values,
}
if err2 != nil {
return hs, b, "", err2
return hs, b, "", nil, err2
}
// NOTES.txt gets rendered like all the other files, but because it's not a hook nor a resource,
@ -311,19 +318,19 @@ func (cfg *Configuration) renderResources(ch *chart.Chart, values common.Values,
// Merge files as stream of documents for sending to post renderer
merged, err := annotateAndMerge(files)
if err != nil {
return hs, b, notes, fmt.Errorf("error merging manifests: %w", err)
return hs, b, notes, nil, fmt.Errorf("error merging manifests: %w", err)
}
// Run the post renderer
postRendered, err := pr.Run(bytes.NewBufferString(merged))
if err != nil {
return hs, b, notes, fmt.Errorf("error while running post render on files: %w", err)
return hs, b, notes, nil, fmt.Errorf("error while running post render on files: %w", err)
}
// Use the file list and contents received from the post renderer
files, err = splitAndDeannotate(postRendered.String())
if err != nil {
return hs, b, notes, fmt.Errorf("error while parsing post rendered output: %w", err)
return hs, b, notes, nil, fmt.Errorf("error while parsing post rendered output: %w", err)
}
}
@ -343,7 +350,7 @@ func (cfg *Configuration) renderResources(ch *chart.Chart, values common.Values,
}
fmt.Fprintf(b, "---\n# Source: %s\n%s\n", name, content)
}
return hs, b, "", err
return hs, b, "", nil, err
}
// Aggregate all valid manifests into one big doc.
@ -356,7 +363,7 @@ func (cfg *Configuration) renderResources(ch *chart.Chart, values common.Values,
} else {
err = writeToFile(outputDir, crd.Filename, string(crd.File.Data[:]), fileWritten[crd.Filename])
if err != nil {
return hs, b, "", err
return hs, b, "", nil, err
}
fileWritten[crd.Filename] = true
}
@ -381,13 +388,13 @@ func (cfg *Configuration) renderResources(ch *chart.Chart, values common.Values,
// used by install or upgrade
err = writeToFile(newDir, m.Name, m.Content, fileWritten[m.Name])
if err != nil {
return hs, b, "", err
return hs, b, "", nil, err
}
fileWritten[m.Name] = true
}
}
return hs, b, notes, nil
return hs, b, notes, manifests, nil
}
// RESTClientGetter gets the rest client

@ -100,6 +100,10 @@ type Install struct {
Devel bool
DependencyUpdate bool
Timeout time.Duration
// ReadinessTimeout is the per-batch timeout when --wait=ordered is used.
// Each ordered batch waits at most this long for resources to become ready.
// Must not exceed Timeout. Defaults to 1 minute when zero.
ReadinessTimeout time.Duration
Namespace string
ReleaseName string
GenerateName bool
@ -305,6 +309,10 @@ func (i *Install) RunWithContext(ctx context.Context, ch ci.Charter, vals map[st
return nil, fmt.Errorf("release name check failed: %w", err)
}
if i.ReadinessTimeout > 0 && i.Timeout > 0 && i.ReadinessTimeout > i.Timeout {
return nil, fmt.Errorf("--readiness-timeout (%s) must not exceed --timeout (%s)", i.ReadinessTimeout, i.Timeout)
}
if err := chartutil.ProcessDependencies(chrt, vals); err != nil {
i.cfg.Logger().Error("chart dependencies processing failed", slog.Any("error", err))
return nil, fmt.Errorf("chart dependencies processing failed: %w", err)
@ -369,8 +377,13 @@ func (i *Install) RunWithContext(ctx context.Context, ch ci.Charter, vals map[st
rel := i.createRelease(chrt, vals, i.Labels)
var sortedManifests []releaseutil.Manifest
var manifestDoc *bytes.Buffer
rel.Hooks, manifestDoc, rel.Info.Notes, err = i.cfg.renderResources(chrt, valuesToRender, i.ReleaseName, i.OutputDir, i.SubNotes, i.UseReleaseName, i.IncludeCRDs, i.PostRenderer, interactWithServer(i.DryRunStrategy), i.EnableDNS, i.HideSecret)
if i.WaitStrategy == kube.OrderedWaitStrategy {
rel.Hooks, manifestDoc, rel.Info.Notes, sortedManifests, err = i.cfg.renderResourcesWithFiles(chrt, valuesToRender, i.ReleaseName, i.OutputDir, i.SubNotes, i.UseReleaseName, i.IncludeCRDs, i.PostRenderer, interactWithServer(i.DryRunStrategy), i.EnableDNS, i.HideSecret)
} else {
rel.Hooks, manifestDoc, rel.Info.Notes, err = i.cfg.renderResources(chrt, valuesToRender, i.ReleaseName, i.OutputDir, i.SubNotes, i.UseReleaseName, i.IncludeCRDs, i.PostRenderer, interactWithServer(i.DryRunStrategy), i.EnableDNS, i.HideSecret)
}
// Even for errors, attach this if available
if manifestDoc != nil {
rel.Manifest = manifestDoc.String()
@ -464,7 +477,11 @@ func (i *Install) RunWithContext(ctx context.Context, ch ci.Charter, vals map[st
return rel, err
}
rel, err = i.performInstallCtx(ctx, rel, toBeAdopted, resources)
if i.WaitStrategy == kube.OrderedWaitStrategy {
rel, err = i.performSequencedInstallCtx(ctx, chrt, rel, sortedManifests)
} else {
rel, err = i.performInstallCtx(ctx, rel, toBeAdopted, resources)
}
if err != nil {
rel, err = i.failRelease(rel, err)
}
@ -493,6 +510,89 @@ func (i *Install) performInstallCtx(ctx context.Context, rel *release.Release, t
}
}
// performSequencedInstallCtx runs performSequencedInstall in a goroutine and
// respects context cancellation, mirroring performInstallCtx.
//
// The goroutine counter is incremented before the goroutine is spawned and
// decremented via defer: incrementing inside the goroutine raced with
// getGoroutineCount (the counter could still read its old value after launch),
// and a non-deferred decrement would be skipped if the install panicked.
func (i *Install) performSequencedInstallCtx(ctx context.Context, chrt *chart.Chart, rel *release.Release, manifests []releaseutil.Manifest) (*release.Release, error) {
	type Msg struct {
		r *release.Release
		e error
	}
	// Buffered so the worker can always deliver its result and exit even when
	// the context was cancelled and nobody is receiving.
	resultChan := make(chan Msg, 1)

	i.goroutineCount.Add(1)
	go func() {
		defer i.goroutineCount.Add(-1)
		r, err := i.performSequencedInstall(ctx, chrt, rel, manifests)
		resultChan <- Msg{r, err}
	}()

	select {
	case <-ctx.Done():
		// The install goroutine keeps running to completion; its result is
		// parked in the buffered channel and discarded.
		return rel, ctx.Err()
	case msg := <-resultChan:
		return msg.r, msg.e
	}
}
// performSequencedInstall deploys chart resources in DAG-ordered batches when
// --wait=ordered is active.
//
// Flow: pre-install hooks → sequenced deployment of every chart level →
// post-install hooks → mark deployed and record the release. Any hook or
// deployment error is returned with the release in its current state so the
// caller can run its failure handling.
func (i *Install) performSequencedInstall(ctx context.Context, chrt *chart.Chart, rel *release.Release, manifests []releaseutil.Manifest) (*release.Release, error) {
	// Hooks run with the full operation timeout, not the per-batch
	// readiness timeout.
	if !i.DisableHooks {
		if err := i.cfg.execHook(rel, release.HookPreInstall, i.WaitStrategy, i.WaitOptions, i.Timeout, i.ServerSideApply); err != nil {
			return rel, fmt.Errorf("failed pre-install: %w", err)
		}
	}
	// A zero or negative ReadinessTimeout falls back to one minute per batch,
	// matching the field's documented default.
	readinessTimeout := i.ReadinessTimeout
	if readinessTimeout <= 0 {
		readinessTimeout = time.Minute
	}
	// Persist that this release was installed with sequencing so later
	// operations (e.g. rollback) can detect it from release history.
	rel.SequencingInfo = &release.SequencingInfo{
		Enabled:  true,
		Strategy: string(i.WaitStrategy),
	}
	sd := &sequencedDeployment{
		cfg:              i.cfg,
		releaseName:      rel.Name,
		releaseNamespace: rel.Namespace,
		disableOpenAPI:   i.DisableOpenAPIValidation,
		serverSideApply:  i.ServerSideApply,
		forceConflicts:   i.ForceConflicts,
		forceReplace:     i.ForceReplace,
		waitStrategy:     i.WaitStrategy,
		waitOptions:      i.WaitOptions,
		waitForJobs:      i.WaitForJobs,
		timeout:          i.Timeout,
		readinessTimeout: readinessTimeout,
		deadline:         computeDeadline(i.Timeout),
	}
	if err := sd.deployChartLevel(ctx, chrt, manifests); err != nil {
		return rel, err
	}
	if !i.DisableHooks {
		if err := i.cfg.execHook(rel, release.HookPostInstall, i.WaitStrategy, i.WaitOptions, i.Timeout, i.ServerSideApply); err != nil {
			return rel, fmt.Errorf("failed post-install: %w", err)
		}
	}
	if len(i.Description) > 0 {
		rel.SetStatus(rcommon.StatusDeployed, i.Description)
	} else {
		rel.SetStatus(rcommon.StatusDeployed, "Install complete")
	}
	// A recording failure is logged but does not fail the install; the
	// resources are already deployed at this point.
	if err := i.recordRelease(rel); err != nil {
		i.cfg.Logger().Error("failed to record the release", slog.Any("error", err))
	}
	return rel, nil
}
// getGoroutineCount return the number of running routines
func (i *Install) getGoroutineCount() int32 {
return i.goroutineCount.Load()
@ -507,6 +607,10 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource
}
}
if err := stripSequencingAnnotations(resources); err != nil {
return rel, fmt.Errorf("stripping sequencing annotations: %w", err)
}
// At this point, we can do the install. Note that before we were detecting whether to
// do an update, but it's not clear whether we WANT to do an update if the reuse is set
// to true, since that is basically an upgrade operation.

@ -18,6 +18,7 @@ package action
import (
"bytes"
"context"
"errors"
"fmt"
"time"
@ -28,6 +29,7 @@ import (
"helm.sh/helm/v4/pkg/kube"
"helm.sh/helm/v4/pkg/release/common"
release "helm.sh/helm/v4/pkg/release/v1"
releaseutil "helm.sh/helm/v4/pkg/release/v1/util"
"helm.sh/helm/v4/pkg/storage/driver"
)
@ -92,7 +94,7 @@ func (r *Rollback) Run(name string) error {
}
r.cfg.Logger().Debug("performing rollback", "name", name)
if _, err := r.performRollback(currentRelease, targetRelease, serverSideApply); err != nil {
if _, err := r.performRollback(context.Background(), currentRelease, targetRelease, serverSideApply); err != nil {
return err
}
@ -184,22 +186,27 @@ func (r *Rollback) prepareRollback(name string) (*release.Release, *release.Rele
// message here, and only override it later if we experience failure.
Description: fmt.Sprintf("Rollback to %d", previousVersion),
},
Version: currentRelease.Version + 1,
Labels: previousRelease.Labels,
Manifest: previousRelease.Manifest,
Hooks: previousRelease.Hooks,
ApplyMethod: string(determineReleaseSSApplyMethod(serverSideApply)),
Version: currentRelease.Version + 1,
Labels: previousRelease.Labels,
Manifest: previousRelease.Manifest,
Hooks: previousRelease.Hooks,
ApplyMethod: string(determineReleaseSSApplyMethod(serverSideApply)),
SequencingInfo: previousRelease.SequencingInfo,
}
return currentRelease, targetRelease, serverSideApply, nil
}
func (r *Rollback) performRollback(currentRelease, targetRelease *release.Release, serverSideApply bool) (*release.Release, error) {
func (r *Rollback) performRollback(ctx context.Context, currentRelease, targetRelease *release.Release, serverSideApply bool) (*release.Release, error) {
if isDryRun(r.DryRunStrategy) {
r.cfg.Logger().Debug("dry run", "name", targetRelease.Name)
return targetRelease, nil
}
if targetRelease.SequencingInfo != nil && targetRelease.SequencingInfo.Enabled {
return r.performSequencedRollback(ctx, currentRelease, targetRelease, serverSideApply)
}
current, err := r.cfg.KubeClient.Build(bytes.NewBufferString(currentRelease.Manifest), false)
if err != nil {
return targetRelease, fmt.Errorf("unable to build kubernetes objects from current release manifest: %w", err)
@ -304,3 +311,131 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
return targetRelease, nil
}
// failRollback records the failed rollback outcome: the current release is
// marked superseded, the target release is marked failed with the failure
// message as its description, and — when CleanupOnFail is set — resources
// created during the attempt are deleted. The original error is returned.
func (r *Rollback) failRollback(currentRelease, targetRelease *release.Release, created kube.ResourceList, err error) (*release.Release, error) {
	failureMsg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err)
	r.cfg.Logger().Warn(failureMsg)

	currentRelease.Info.Status = common.StatusSuperseded
	targetRelease.Info.Status = common.StatusFailed
	targetRelease.Info.Description = failureMsg
	r.cfg.recordRelease(currentRelease)
	r.cfg.recordRelease(targetRelease)

	if r.CleanupOnFail && len(created) > 0 {
		r.cfg.Logger().Debug("cleanup on fail set, cleaning up resources", "count", len(created))
		if _, errs := r.cfg.KubeClient.Delete(created, metav1.DeletePropagationBackground); errs != nil {
			return targetRelease, fmt.Errorf(
				"an error occurred while cleaning up resources after rollback failure: %v: %w",
				err,
				joinErrors(errs, ", "),
			)
		}
		r.cfg.Logger().Debug("resource cleanup complete")
	}
	return targetRelease, err
}
// performSequencedRollback deploys the target revision's resources in DAG-ordered batches
// when the target revision was originally deployed with --wait=ordered.
func (r *Rollback) performSequencedRollback(ctx context.Context, currentRelease, targetRelease *release.Release, serverSideApply bool) (*release.Release, error) {
	// fail funnels every error path through failRollback so statuses are
	// recorded and CleanupOnFail can remove partially created resources.
	fail := func(created kube.ResourceList, err error) (*release.Release, error) {
		return r.failRollback(currentRelease, targetRelease, created, err)
	}
	// Build resource lists for both revisions; "current" is reused below to
	// compute which resources must be deleted after the rollback deploys.
	current, err := r.cfg.KubeClient.Build(bytes.NewBufferString(currentRelease.Manifest), false)
	if err != nil {
		return fail(nil, fmt.Errorf("unable to build kubernetes objects from current release manifest: %w", err))
	}
	target, err := r.cfg.KubeClient.Build(bytes.NewBufferString(targetRelease.Manifest), false)
	if err != nil {
		return fail(nil, fmt.Errorf("unable to build kubernetes objects from target release manifest: %w", err))
	}
	// Re-derive install-ordered manifests from the stored target manifest so
	// the sequenced deployment can batch them.
	rawManifests := releaseutil.SplitManifests(targetRelease.Manifest)
	_, sortedManifests, err := releaseutil.SortManifests(rawManifests, nil, releaseutil.InstallOrder)
	if err != nil {
		return fail(nil, fmt.Errorf("parsing target release manifest for sequenced rollback: %w", err))
	}
	if !r.DisableHooks {
		if err := r.cfg.execHook(targetRelease, release.HookPreRollback, r.WaitStrategy, r.WaitOptions, r.Timeout, serverSideApply); err != nil {
			return fail(nil, err)
		}
	} else {
		r.cfg.Logger().Debug("rollback hooks disabled", "name", targetRelease.Name)
	}
	// Per-batch readiness budget: half the overall timeout with a one-minute
	// floor. NOTE(review): this heuristic differs from install, which has a
	// dedicated ReadinessTimeout field — confirm the asymmetry is intentional.
	readinessTimeout := r.Timeout / 2
	if readinessTimeout <= 0 {
		readinessTimeout = time.Minute
	}
	// upgradeMode makes the deployment use Update() against the current
	// resources rather than Create().
	sd := &sequencedDeployment{
		cfg:                    r.cfg,
		releaseName:            targetRelease.Name,
		releaseNamespace:       targetRelease.Namespace,
		serverSideApply:        serverSideApply,
		forceConflicts:         r.ForceConflicts,
		forceReplace:           r.ForceReplace,
		waitStrategy:           r.WaitStrategy,
		waitOptions:            r.WaitOptions,
		waitForJobs:            r.WaitForJobs,
		timeout:                r.Timeout,
		readinessTimeout:       readinessTimeout,
		deadline:               computeDeadline(r.Timeout),
		upgradeMode:            true,
		currentResources:       current,
		upgradeCSAFieldManager: true,
	}
	if err := sd.deployChartLevel(ctx, targetRelease.Chart, sortedManifests); err != nil {
		return fail(sd.createdResources, err)
	}
	// Delete resources present in the current revision but absent from the
	// target revision; deletions happen only after all batches deployed.
	allTargetKeys := make(map[string]bool, len(target))
	for _, resource := range target {
		allTargetKeys[objectKey(resource)] = true
	}
	var toBeDeleted kube.ResourceList
	for _, resource := range current {
		if !allTargetKeys[objectKey(resource)] {
			toBeDeleted = append(toBeDeleted, resource)
		}
	}
	if len(toBeDeleted) > 0 {
		if _, errs := r.cfg.KubeClient.Delete(toBeDeleted, metav1.DeletePropagationBackground); errs != nil {
			return fail(sd.createdResources, fmt.Errorf("deleting removed resources during rollback: %w", joinErrors(errs, ", ")))
		}
	}
	if !r.DisableHooks {
		if err := r.cfg.execHook(targetRelease, release.HookPostRollback, r.WaitStrategy, r.WaitOptions, r.Timeout, serverSideApply); err != nil {
			return fail(sd.createdResources, err)
		}
	}
	// Supersede every previously deployed revision. ErrNoDeployedReleases is
	// tolerated: there may be nothing to supersede.
	deployed, err := r.cfg.Releases.DeployedAll(currentRelease.Name)
	if err != nil && !errors.Is(err, driver.ErrNoDeployedReleases) {
		return fail(sd.createdResources, err)
	}
	for _, reli := range deployed {
		rel, err := releaserToV1Release(reli)
		if err != nil {
			return fail(sd.createdResources, err)
		}
		r.cfg.Logger().Debug("superseding previous deployment", "version", rel.Version)
		rel.Info.Status = common.StatusSuperseded
		r.cfg.recordRelease(rel)
	}
	targetRelease.Info.Status = common.StatusDeployed
	return targetRelease, nil
}

@ -0,0 +1,440 @@
/*
Copyright The Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package action
import (
"bytes"
"context"
"fmt"
"log/slog"
"strings"
"time"
chartv2 "helm.sh/helm/v4/pkg/chart/v2"
chartutil "helm.sh/helm/v4/pkg/chart/v2/util"
"helm.sh/helm/v4/pkg/kube"
releaseutil "helm.sh/helm/v4/pkg/release/v1/util"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/cli-runtime/pkg/resource"
)
// computeDeadline returns a deadline based on the given timeout duration.
// If timeout is zero or negative, it returns the zero time (no deadline).
func computeDeadline(timeout time.Duration) time.Time {
if timeout <= 0 {
return time.Time{}
}
return time.Now().Add(timeout)
}
// GroupManifestsByDirectSubchart partitions manifests by the direct subchart
// they belong to. The current chart's own manifests land under the empty
// string key ""; subchart manifests are keyed by the first segment after the
// first `<chartName>/charts/` substring in the manifest source path. Manifests
// of nested subcharts (`<chartName>/charts/sub/charts/nested/...`) are grouped
// under the direct subchart ("sub"), since nested sequencing recurses.
func GroupManifestsByDirectSubchart(manifests []releaseutil.Manifest, chartName string) map[string][]releaseutil.Manifest {
	grouped := make(map[string][]releaseutil.Manifest)

	// Without a chart name there is nothing to split on: everything is
	// attributed to the parent chart.
	if chartName == "" {
		grouped[""] = append(grouped[""], manifests...)
		return grouped
	}

	prefix := chartName + "/charts/"
	for _, m := range manifests {
		// Cut at the first occurrence of "<chartName>/charts/".
		_, rest, found := strings.Cut(m.Name, prefix)
		if !found {
			// No charts/ segment: this is a parent-chart manifest.
			grouped[""] = append(grouped[""], m)
			continue
		}
		// rest looks like "subchart1/templates/deploy.yaml" or
		// "subchart1/charts/nested/...": the first segment names the subchart.
		sub, _, hasDir := strings.Cut(rest, "/")
		if !hasDir {
			// Unlikely: a file directly under charts/ with no subdirectory.
			grouped[""] = append(grouped[""], m)
			continue
		}
		grouped[sub] = append(grouped[sub], m)
	}
	return grouped
}
// buildManifestYAML joins the Content of the given manifests into a single
// YAML document stream (separated by "---") suitable for KubeClient.Build().
func buildManifestYAML(manifests []releaseutil.Manifest) string {
	if len(manifests) == 0 {
		return ""
	}
	var sb strings.Builder
	sep := ""
	for _, m := range manifests {
		sb.WriteString(sep)
		sb.WriteString(m.Content)
		sb.WriteString("\n")
		sep = "---\n" // separator precedes every document after the first
	}
	return sb.String()
}
// sequencedDeployment performs ordered installation or upgrade of chart resources.
// It handles the two-level DAG: first subchart ordering, then resource-group
// ordering within each chart level.
type sequencedDeployment struct {
	cfg              *Configuration
	releaseName      string
	releaseNamespace string
	disableOpenAPI   bool // skip OpenAPI validation when building resource batches
	serverSideApply  bool
	forceConflicts   bool
	forceReplace     bool
	waitStrategy     kube.WaitStrategy
	waitOptions      []kube.WaitOption
	waitForJobs      bool          // also wait for Jobs when waiting on a batch
	timeout          time.Duration // overall operation timeout
	readinessTimeout time.Duration // per-batch wait budget (see waitForResources)
	deadline         time.Time     // overall operation deadline
	// Upgrade-specific fields. When upgradeMode is true, createAndWait delegates
	// to updateAndWait which calls KubeClient.Update() instead of Create().
	upgradeMode            bool
	currentResources       kube.ResourceList // full set of old (current) resources
	upgradeCSAFieldManager bool              // upgrade client-side apply field manager
	// createdResources accumulates every resource reported as created across
	// batches so callers can clean up after a failure.
	createdResources kube.ResourceList
}
// deployChartLevel deploys all resources for a single chart level in sequenced order.
// It first handles subcharts in dependency order (recursively), then deploys the
// parent chart's own resource-group batches.
//
// NOTE(review): the fallback path logs via slog.Warn directly rather than
// s.cfg.Logger() like the rest of the actions — consider unifying.
func (s *sequencedDeployment) deployChartLevel(ctx context.Context, chrt *chartv2.Chart, manifests []releaseutil.Manifest) error {
	// Group manifests by direct subchart; "" holds this chart's own manifests.
	grouped := GroupManifestsByDirectSubchart(manifests, chrt.Name())
	// Build subchart DAG and deploy in topological order.
	dag, err := chartutil.BuildSubchartDAG(chrt)
	if err != nil {
		return fmt.Errorf("building subchart DAG for %s: %w", chrt.Name(), err)
	}
	batches, err := dag.GetBatches()
	if err != nil {
		return fmt.Errorf("getting subchart batches for %s: %w", chrt.Name(), err)
	}
	// Deploy each subchart batch in order; every subchart in a batch finishes
	// before the next batch starts.
	for batchIdx, batch := range batches {
		for _, subchartName := range batch {
			subManifests := grouped[subchartName]
			if len(subManifests) == 0 {
				// Subchart rendered no manifests at this level.
				continue
			}
			// Find the subchart chart object for recursive nested sequencing.
			subChart := findSubchart(chrt, subchartName)
			if subChart == nil {
				// Subchart not found in chart object (may have been disabled or aliased differently).
				// Fall back to flat resource-group deployment for these manifests.
				slog.Warn("subchart not found in chart dependencies; deploying without subchart sequencing",
					"subchart", subchartName,
					"batch", batchIdx,
				)
				if err := s.deployResourceGroupBatches(ctx, subManifests); err != nil {
					return fmt.Errorf("deploying subchart %s resources: %w", subchartName, err)
				}
				continue
			}
			// Recursively deploy the subchart (handles its own nested subcharts and resource-groups).
			if err := s.deployChartLevel(ctx, subChart, subManifests); err != nil {
				return fmt.Errorf("deploying subchart %s: %w", subchartName, err)
			}
		}
	}
	// Deploy the parent chart's own resources only after every subchart batch
	// has completed.
	parentManifests := grouped[""]
	if len(parentManifests) > 0 {
		if err := s.deployResourceGroupBatches(ctx, parentManifests); err != nil {
			return fmt.Errorf("deploying %s own resources: %w", chrt.Name(), err)
		}
	}
	return nil
}
// batchHasCustomReadiness reports whether any manifest in the batch carries a
// custom readiness success or failure annotation in its parsed head metadata.
func batchHasCustomReadiness(manifests []releaseutil.Manifest) bool {
	for _, m := range manifests {
		head := m.Head
		if head == nil || head.Metadata == nil {
			continue
		}
		// Indexing a nil annotations map safely yields "".
		ann := head.Metadata.Annotations
		if ann[kube.AnnotationReadinessSuccess] != "" || ann[kube.AnnotationReadinessFailure] != "" {
			return true
		}
	}
	return false
}
// deployResourceGroupBatches deploys manifests for a single chart level using
// resource-group annotation DAG ordering. Resources without group annotations
// (or with invalid ones) are deployed last.
func (s *sequencedDeployment) deployResourceGroupBatches(ctx context.Context, manifests []releaseutil.Manifest) error {
	if len(manifests) == 0 {
		return nil
	}
	// Partition manifests into named resource groups plus an "unsequenced"
	// remainder. The discarded second value is unused here — NOTE(review):
	// confirm it carries nothing this caller needs.
	result, _, err := releaseutil.ParseResourceGroups(manifests)
	if err != nil {
		return fmt.Errorf("parsing resource-group annotations: %w", err)
	}
	// If there are sequenced groups, build their DAG and deploy in order.
	if len(result.Groups) > 0 {
		dag, err := releaseutil.BuildResourceGroupDAG(result)
		if err != nil {
			return fmt.Errorf("building resource-group DAG: %w", err)
		}
		batches, err := dag.GetBatches()
		if err != nil {
			return fmt.Errorf("getting resource-group batches: %w", err)
		}
		// Each batch is applied and waited on before the next starts; groups
		// within one batch are merged into a single create/wait call.
		for _, groupBatch := range batches {
			var batchManifests []releaseutil.Manifest
			for _, groupName := range groupBatch {
				batchManifests = append(batchManifests, result.Groups[groupName]...)
			}
			if err := s.createAndWait(ctx, batchManifests); err != nil {
				return err
			}
		}
	}
	// Deploy unsequenced resources last.
	if len(result.Unsequenced) > 0 {
		if err := s.createAndWait(ctx, result.Unsequenced); err != nil {
			return err
		}
	}
	return nil
}
// helmSequencingAnnotations lists annotation keys used internally by Helm for
// resource sequencing. These are stripped from resources before applying to
// Kubernetes because some (e.g. helm.sh/depends-on/resource-groups) contain
// multiple slashes, which is invalid per the K8s annotation key format.
// Stripping happens in stripSequencingAnnotations.
var helmSequencingAnnotations = []string{
	releaseutil.AnnotationDependsOnResourceGroups,
}
// stripSequencingAnnotations removes Helm-internal sequencing annotations from
// resources before they are applied to Kubernetes. This prevents API-server
// validation errors for annotation keys that are not valid Kubernetes
// annotation key syntax.
func stripSequencingAnnotations(resources kube.ResourceList) error {
	return resources.Visit(func(info *resource.Info, err error) error {
		if err != nil {
			return err
		}
		obj, accErr := meta.Accessor(info.Object)
		if accErr != nil {
			// Objects without standard metadata are left untouched.
			return nil
		}
		ann := obj.GetAnnotations()
		if len(ann) == 0 {
			return nil
		}
		removed := false
		for _, key := range helmSequencingAnnotations {
			if _, ok := ann[key]; ok {
				delete(ann, key)
				removed = true
			}
		}
		if removed {
			obj.SetAnnotations(ann)
		}
		return nil
	})
}
// createAndWait creates (or updates, in upgrade mode) a set of manifest resources
// and waits for them to be ready. It respects both the per-batch readiness timeout
// and the overall operation deadline, and honors context cancellation between
// batches.
func (s *sequencedDeployment) createAndWait(ctx context.Context, manifests []releaseutil.Manifest) error {
	// Each batch may block for up to readinessTimeout, so check for
	// cancellation before starting a new one; previously ctx was accepted but
	// never consulted.
	if err := ctx.Err(); err != nil {
		return err
	}
	if s.upgradeMode {
		return s.updateAndWait(ctx, manifests)
	}
	if len(manifests) == 0 {
		return nil
	}
	yaml := buildManifestYAML(manifests)
	resources, err := s.cfg.KubeClient.Build(bytes.NewBufferString(yaml), !s.disableOpenAPI)
	if err != nil {
		return fmt.Errorf("building resource batch: %w", err)
	}
	if len(resources) == 0 {
		return nil
	}
	// Mark the resources as owned by this release before applying.
	if err := resources.Visit(setMetadataVisitor(s.releaseName, s.releaseNamespace, true)); err != nil {
		return fmt.Errorf("setting metadata for resource batch: %w", err)
	}
	// Helm-internal sequencing annotations are not valid K8s annotation keys
	// and must be removed before the objects reach the API server.
	if err := stripSequencingAnnotations(resources); err != nil {
		return fmt.Errorf("stripping sequencing annotations: %w", err)
	}
	result, err := s.cfg.KubeClient.Create(resources, kube.ClientCreateOptionServerSideApply(s.serverSideApply, false))
	if err != nil {
		return fmt.Errorf("creating resource batch: %w", err)
	}
	// Track created resources so callers can clean up on failure.
	s.createdResources = append(s.createdResources, result.Created...)
	return s.waitForResources(resources, manifests)
}
// updateAndWait applies an upgrade batch using KubeClient.Update() and waits for
// readiness. It matches current (old) resources by objectKey to compute the
// per-batch diff, and honors context cancellation between batches.
func (s *sequencedDeployment) updateAndWait(ctx context.Context, manifests []releaseutil.Manifest) error {
	// Previously the context parameter was ignored (_); check it so a
	// cancelled upgrade stops before applying another batch.
	if err := ctx.Err(); err != nil {
		return err
	}
	if len(manifests) == 0 {
		return nil
	}
	yaml := buildManifestYAML(manifests)
	target, err := s.cfg.KubeClient.Build(bytes.NewBufferString(yaml), !s.disableOpenAPI)
	if err != nil {
		return fmt.Errorf("building resource batch: %w", err)
	}
	if len(target) == 0 {
		return nil
	}
	// Mark the resources as owned by this release before applying.
	if err := target.Visit(setMetadataVisitor(s.releaseName, s.releaseNamespace, true)); err != nil {
		return fmt.Errorf("setting metadata for resource batch: %w", err)
	}
	// Remove Helm-internal sequencing annotations (invalid K8s annotation keys).
	if err := stripSequencingAnnotations(target); err != nil {
		return fmt.Errorf("stripping sequencing annotations: %w", err)
	}
	// Find the subset of current (old) resources that are represented in this batch.
	// Update() will handle creates (target resources not in matchingCurrent) and
	// updates (resources in both). Deletions are handled separately after all batches.
	targetKeys := make(map[string]bool, len(target))
	for _, r := range target {
		targetKeys[objectKey(r)] = true
	}
	var matchingCurrent kube.ResourceList
	for _, r := range s.currentResources {
		if targetKeys[objectKey(r)] {
			matchingCurrent = append(matchingCurrent, r)
		}
	}
	result, err := s.cfg.KubeClient.Update(
		matchingCurrent,
		target,
		kube.ClientUpdateOptionForceReplace(s.forceReplace),
		kube.ClientUpdateOptionServerSideApply(s.serverSideApply, s.forceConflicts),
		kube.ClientUpdateOptionUpgradeClientSideFieldManager(s.upgradeCSAFieldManager),
	)
	if err != nil {
		return fmt.Errorf("updating resource batch: %w", err)
	}
	// Track created resources so callers can clean up on failure.
	s.createdResources = append(s.createdResources, result.Created...)
	return s.waitForResources(target, manifests)
}
// waitForResources blocks until the given resources are ready, bounded by the
// per-batch readiness timeout and by the overall operation deadline
// (whichever is sooner).
func (s *sequencedDeployment) waitForResources(resources kube.ResourceList, manifests []releaseutil.Manifest) error {
	// Effective wait budget: min(readinessTimeout, time left before deadline).
	timeout := s.readinessTimeout
	if !s.deadline.IsZero() {
		remaining := time.Until(s.deadline)
		if remaining <= 0 {
			return fmt.Errorf("overall timeout exceeded before waiting for resource batch")
		}
		if timeout <= 0 || remaining < timeout {
			timeout = remaining
		}
	}
	if timeout <= 0 {
		// Neither a readiness timeout nor an overall deadline was set.
		timeout = time.Minute // safe default
	}

	// Enable the custom readiness status reader only when the batch declares
	// custom readiness annotations.
	opts := append([]kube.WaitOption(nil), s.waitOptions...)
	if batchHasCustomReadiness(manifests) {
		opts = append(opts, kube.WithCustomReadinessStatusReader())
	}

	var (
		waiter kube.Waiter
		err    error
	)
	if c, ok := s.cfg.KubeClient.(kube.InterfaceWaitOptions); ok {
		waiter, err = c.GetWaiterWithOptions(s.waitStrategy, opts...)
	} else {
		// Older clients cannot take wait options.
		waiter, err = s.cfg.KubeClient.GetWaiter(s.waitStrategy)
	}
	if err != nil {
		return fmt.Errorf("getting waiter for resource batch: %w", err)
	}

	if s.waitForJobs {
		return waiter.WaitWithJobs(resources, timeout)
	}
	return waiter.Wait(resources, timeout)
}
// findSubchart resolves a direct dependency of chrt by its effective name: the
// alias declared in the parent's Chart.yaml when present, otherwise the
// chart's own name. Aliases live on the parent's Dependency entries, not on
// the subchart's own Metadata. Returns nil when nothing matches.
func findSubchart(chrt *chartv2.Chart, nameOrAlias string) *chartv2.Chart {
	// Map each declared dependency's chart name to its effective name.
	effectiveNames := make(map[string]string)
	if chrt.Metadata != nil {
		for _, dep := range chrt.Metadata.Dependencies {
			name := dep.Name
			if dep.Alias != "" {
				name = dep.Alias
			}
			effectiveNames[dep.Name] = name
		}
	}
	// Match either the effective (aliased) name or the raw chart name.
	for _, sub := range chrt.Dependencies() {
		effective, declared := effectiveNames[sub.Name()]
		if !declared {
			effective = sub.Name()
		}
		if effective == nameOrAlias || sub.Name() == nameOrAlias {
			return sub
		}
	}
	return nil
}

@ -25,10 +25,13 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
chartcommon "helm.sh/helm/v4/pkg/chart/common"
renderutil "helm.sh/helm/v4/pkg/chart/common/util"
chart "helm.sh/helm/v4/pkg/chart/v2"
chartutil "helm.sh/helm/v4/pkg/chart/v2/util"
"helm.sh/helm/v4/pkg/kube"
releasei "helm.sh/helm/v4/pkg/release"
"helm.sh/helm/v4/pkg/release/common"
releasecommon "helm.sh/helm/v4/pkg/release/common"
release "helm.sh/helm/v4/pkg/release/v1"
releaseutil "helm.sh/helm/v4/pkg/release/v1/util"
"helm.sh/helm/v4/pkg/storage/driver"
@ -116,7 +119,7 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error)
// TODO: Are there any cases where we want to force a delete even if it's
// already marked deleted?
if rel.Info.Status == common.StatusUninstalled {
if rel.Info.Status == releasecommon.StatusUninstalled {
if !u.KeepHistory {
if err := u.purgeReleases(rels...); err != nil {
return nil, fmt.Errorf("uninstall: Failed to purge the release: %w", err)
@ -127,7 +130,7 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error)
}
u.cfg.Logger().Debug("uninstall: deleting release", "name", name)
rel.Info.Status = common.StatusUninstalling
rel.Info.Status = releasecommon.StatusUninstalling
rel.Info.Deleted = time.Now()
rel.Info.Description = "Deletion in progress (or silently failed)"
res := &releasei.UninstallReleaseResponse{Release: rel}
@ -147,7 +150,7 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error)
u.cfg.Logger().Debug("uninstall: Failed to store updated release", slog.Any("error", err))
}
deletedResources, kept, errs := u.deleteRelease(rel)
deletedResources, kept, errs := u.deleteRelease(rel, waiter)
if errs != nil {
u.cfg.Logger().Debug("uninstall: Failed to delete release", slog.Any("error", errs))
return nil, fmt.Errorf("failed to delete release: %s", name)
@ -158,8 +161,10 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error)
}
res.Info = kept
if err := waiter.WaitForDelete(deletedResources, u.Timeout); err != nil {
errs = append(errs, err)
if !isSequencedRelease(rel) {
if err := waiter.WaitForDelete(deletedResources, u.Timeout); err != nil {
errs = append(errs, err)
}
}
if !u.DisableHooks {
@ -169,7 +174,7 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error)
}
}
rel.Info.Status = common.StatusUninstalled
rel.Info.Status = releasecommon.StatusUninstalled
if len(u.Description) > 0 {
rel.Info.Description = u.Description
} else {
@ -208,7 +213,7 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error)
}
u.cfg.Logger().Debug("superseding previous deployment", "version", rel.Version)
rel.Info.Status = common.StatusSuperseded
rel.Info.Status = releasecommon.StatusSuperseded
if err := u.cfg.Releases.Update(rel); err != nil {
u.cfg.Logger().Debug("uninstall: Failed to store updated release", slog.Any("error", err))
}
@ -220,6 +225,10 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error)
return res, nil
}
// isSequencedRelease reports whether rel carries sequencing metadata with
// sequencing explicitly enabled.
func isSequencedRelease(rel *release.Release) bool {
	info := rel.SequencingInfo
	if info == nil {
		return false
	}
	return info.Enabled
}
func (u *Uninstall) purgeReleases(rels ...*release.Release) error {
for _, rel := range rels {
if _, err := u.cfg.Releases.Delete(rel.Name, rel.Version); err != nil {
@ -253,10 +262,8 @@ func (e *joinedErrors) Unwrap() []error {
return e.errs
}
// deleteRelease deletes the release and returns list of delete resources and manifests that were kept in the deletion process
func (u *Uninstall) deleteRelease(rel *release.Release) (kube.ResourceList, string, []error) {
var errs []error
// deleteRelease deletes the release and returns list of delete resources and manifests that were kept in the deletion process.
func (u *Uninstall) deleteRelease(rel *release.Release, waiter kube.Waiter) (kube.ResourceList, string, []error) {
manifests := releaseutil.SplitManifests(rel.Manifest)
_, files, err := releaseutil.SortManifests(manifests, nil, releaseutil.UninstallOrder)
if err != nil {
@ -273,19 +280,247 @@ func (u *Uninstall) deleteRelease(rel *release.Release) (kube.ResourceList, stri
fmt.Fprintf(&kept, "[%s] %s\n", f.Head.Kind, f.Head.Metadata.Name)
}
if isSequencedRelease(rel) {
filesToDelete = u.recoverManifestPathsForSequencedUninstall(rel, filesToDelete)
deleted, errs := u.sequencedDeleteManifests(rel.Chart, filesToDelete, waiter)
return deleted, kept.String(), errs
}
resources, err := u.buildDeleteResources(filesToDelete)
if err != nil {
return nil, "", []error{err}
}
if len(resources) > 0 {
_, errs := u.cfg.KubeClient.Delete(resources, parseCascadingFlag(u.DeletionPropagation))
return resources, kept.String(), errs
}
return resources, kept.String(), nil
}
func (u *Uninstall) buildDeleteResources(manifests []releaseutil.Manifest) (kube.ResourceList, error) {
var builder strings.Builder
for _, file := range filesToDelete {
for _, file := range manifests {
builder.WriteString("\n---\n" + file.Content)
}
resources, err := u.cfg.KubeClient.Build(strings.NewReader(builder.String()), false)
if err != nil {
return nil, "", []error{fmt.Errorf("unable to build kubernetes objects for delete: %w", err)}
return nil, fmt.Errorf("unable to build kubernetes objects for delete: %w", err)
}
if len(resources) > 0 {
_, errs = u.cfg.KubeClient.Delete(resources, parseCascadingFlag(u.DeletionPropagation))
return resources, nil
}
// deleteManifestBatch builds the given manifests into Kubernetes resource
// objects, deletes them using the configured deletion propagation policy, and
// waits for the deletions to complete. It returns the resources that were
// submitted for deletion plus any errors; waiting is skipped when the delete
// call itself reported failures.
func (u *Uninstall) deleteManifestBatch(manifests []releaseutil.Manifest, waiter kube.Waiter) (kube.ResourceList, []error) {
	resources, buildErr := u.buildDeleteResources(manifests)
	if buildErr != nil {
		return nil, []error{buildErr}
	}
	if len(resources) == 0 {
		// Nothing to delete in this batch.
		return nil, nil
	}
	_, errs := u.cfg.KubeClient.Delete(resources, parseCascadingFlag(u.DeletionPropagation))
	if len(errs) > 0 {
		return resources, errs
	}
	if waitErr := waiter.WaitForDelete(resources, u.Timeout); waitErr != nil {
		errs = append(errs, waitErr)
	}
	return resources, errs
}
// sequencedDeleteManifests removes release resources in reverse dependency
// order. When the chart object is available the deletion walks subcharts via
// deleteChartLevelReverse; without a chart only the resource-group annotation
// ordering is applied.
func (u *Uninstall) sequencedDeleteManifests(chrt *chart.Chart, manifests []releaseutil.Manifest, waiter kube.Waiter) (kube.ResourceList, []error) {
	if chrt != nil {
		return u.deleteChartLevelReverse(chrt, manifests, waiter)
	}
	return u.deleteResourceGroupBatchesReverse(manifests, waiter)
}
// deleteChartLevelReverse deletes manifests belonging to chrt and its
// subcharts in reverse dependency order: the parent chart's own resources go
// first, then subchart batches from the subchart DAG in reversed batch order,
// recursing into each subchart. It returns all resources submitted for
// deletion and stops at the first batch that reports errors.
func (u *Uninstall) deleteChartLevelReverse(chrt *chart.Chart, manifests []releaseutil.Manifest, waiter kube.Waiter) (kube.ResourceList, []error) {
	// Split manifests between this chart ("" key) and its direct subcharts.
	grouped := GroupManifestsByDirectSubchart(manifests, chrt.Name())
	// Delete the parent chart's own resources before descending into subcharts.
	allDeleted, errs := u.deleteResourceGroupBatchesReverse(grouped[""], waiter)
	if len(errs) > 0 {
		return allDeleted, errs
	}
	dag, err := chartutil.BuildSubchartDAG(chrt)
	if err != nil {
		return allDeleted, []error{fmt.Errorf("building subchart DAG for sequenced uninstall: %w", err)}
	}
	batches, err := dag.GetBatches()
	if err != nil {
		return allDeleted, []error{fmt.Errorf("getting subchart batches for sequenced uninstall: %w", err)}
	}
	// Reverse the install-order batches so dependents are removed before the
	// subcharts they depend on.
	for i, j := 0, len(batches)-1; i < j; i, j = i+1, j-1 {
		batches[i], batches[j] = batches[j], batches[i]
	}
	for _, batch := range batches {
		for _, subchartName := range batch {
			subchartManifests := grouped[subchartName]
			if len(subchartManifests) == 0 {
				continue
			}
			subchart := findSubchart(chrt, subchartName)
			var deleted kube.ResourceList
			if subchart == nil {
				// No matching dependency object: fall back to annotation-only
				// ordering for this subchart's manifests.
				u.cfg.Logger().Warn("subchart not found in chart dependencies during sequenced uninstall; falling back to flat resource-group deletion", "subchart", subchartName)
				deleted, errs = u.deleteResourceGroupBatchesReverse(subchartManifests, waiter)
			} else {
				// Recurse so nested subcharts are also deleted in reverse order.
				deleted, errs = u.deleteChartLevelReverse(subchart, subchartManifests, waiter)
			}
			allDeleted = append(allDeleted, deleted...)
			if len(errs) > 0 {
				return allDeleted, errs
			}
		}
	}
	return allDeleted, nil
}
// deleteResourceGroupBatchesReverse deletes manifests honoring resource-group
// annotations in reverse DAG order: resources without a group are removed
// first, then each annotated group batch from last to first. It returns all
// resources submitted for deletion and stops at the first batch that reports
// errors.
//
// NOTE(review): a stray leftover line ("return resources, kept.String(), errs")
// referencing identifiers from deleteRelease was removed here — it did not
// belong in this function and would not compile.
func (u *Uninstall) deleteResourceGroupBatchesReverse(manifests []releaseutil.Manifest, waiter kube.Waiter) (kube.ResourceList, []error) {
	result, warnings, err := releaseutil.ParseResourceGroups(manifests)
	if err != nil {
		return nil, []error{fmt.Errorf("parsing resource-group annotations for sequenced uninstall: %w", err)}
	}
	for _, warning := range warnings {
		u.cfg.Logger().Warn("resource-group annotation warning during uninstall", "warning", warning)
	}
	var allDeleted kube.ResourceList
	// Unsequenced resources carry no ordering constraints; delete them up front.
	if len(result.Unsequenced) > 0 {
		deleted, errs := u.deleteManifestBatch(result.Unsequenced, waiter)
		allDeleted = append(allDeleted, deleted...)
		if len(errs) > 0 {
			return allDeleted, errs
		}
	}
	if len(result.Groups) == 0 {
		return allDeleted, nil
	}
	dag, err := releaseutil.BuildResourceGroupDAG(result)
	if err != nil {
		return allDeleted, []error{fmt.Errorf("building resource-group DAG for sequenced uninstall: %w", err)}
	}
	batches, err := dag.GetBatches()
	if err != nil {
		return allDeleted, []error{fmt.Errorf("getting resource-group batches for sequenced uninstall: %w", err)}
	}
	// Reverse the install-order batches so dependent groups are deleted before
	// the groups they depend on.
	for i, j := 0, len(batches)-1; i < j; i, j = i+1, j-1 {
		batches[i], batches[j] = batches[j], batches[i]
	}
	for _, batch := range batches {
		var batchManifests []releaseutil.Manifest
		for _, groupName := range batch {
			batchManifests = append(batchManifests, result.Groups[groupName]...)
		}
		deleted, errs := u.deleteManifestBatch(batchManifests, waiter)
		allDeleted = append(allDeleted, deleted...)
		if len(errs) > 0 {
			return allDeleted, errs
		}
	}
	return allDeleted, nil
}
// recoverManifestPathsForSequencedUninstall re-renders the release's chart to
// recover the rendered template paths needed for sequencing-aware grouping.
// If rendering fails for any reason it logs a warning and falls back to the
// stored manifests unchanged.
func (u *Uninstall) recoverManifestPathsForSequencedUninstall(rel *release.Release, manifests []releaseutil.Manifest) []releaseutil.Manifest {
	rendered, renderErr := u.renderReleaseManifestsWithPaths(rel)
	if renderErr != nil {
		u.cfg.Logger().Warn("unable to recover rendered chart paths for sequenced uninstall; falling back to stored manifest order", slog.Any("error", renderErr))
		return manifests
	}
	return applyRenderedManifestPaths(manifests, rendered)
}
// renderReleaseManifestsWithPaths re-renders the release's chart with its
// stored values and returns the sorted manifest slice, which carries the
// rendered template file paths. Rendering is done locally (no cluster
// interaction, no output directory, secrets not hidden).
func (u *Uninstall) renderReleaseManifestsWithPaths(rel *release.Release) ([]releaseutil.Manifest, error) {
	if rel.Chart == nil {
		return nil, errors.New("release chart is missing")
	}
	// Resolve chart dependencies against the release's stored config values.
	if err := chartutil.ProcessDependencies(rel.Chart, rel.Config); err != nil {
		return nil, fmt.Errorf("processing chart dependencies: %w", err)
	}
	caps, err := u.cfg.getCapabilities()
	if err != nil {
		return nil, fmt.Errorf("getting capabilities: %w", err)
	}
	// Rebuild the render context the release was originally rendered with:
	// same name, namespace, and revision; install vs upgrade inferred from the
	// stored revision number.
	valuesToRender, err := renderutil.ToRenderValuesWithSchemaValidation(
		rel.Chart,
		rel.Config,
		chartcommon.ReleaseOptions{
			Name:      rel.Name,
			Namespace: rel.Namespace,
			Revision:  rel.Version,
			IsInstall: rel.Version <= 1,
			IsUpgrade: rel.Version > 1,
		},
		caps,
		true,
	)
	if err != nil {
		return nil, fmt.Errorf("building render values: %w", err)
	}
	// Only the sorted manifests (with template paths) are needed; hooks,
	// manifest buffer, and notes are discarded.
	_, _, _, manifests, err := u.cfg.renderResourcesWithFiles(rel.Chart, valuesToRender, rel.Name, "", false, false, false, nil, false, false, false)
	if err != nil {
		return nil, fmt.Errorf("rendering chart manifests: %w", err)
	}
	return manifests, nil
}
// applyRenderedManifestPaths copies the rendered manifests' names (template
// paths) onto a copy of the stored manifests. Each stored manifest is matched
// first by normalized content, then by apiVersion/kind/name identity; every
// rendered name is consumed at most once so duplicate manifests pair up in
// order. Unmatched stored manifests keep their original names.
func applyRenderedManifestPaths(stored, rendered []releaseutil.Manifest) []releaseutil.Manifest {
	namesByContent := make(map[string][]string, len(rendered))
	namesByIdentity := make(map[string][]string, len(rendered))
	for _, r := range rendered {
		ck := normalizedManifestContent(r.Content)
		namesByContent[ck] = append(namesByContent[ck], r.Name)
		ik := manifestIdentity(r)
		namesByIdentity[ik] = append(namesByIdentity[ik], r.Name)
	}
	result := make([]releaseutil.Manifest, len(stored))
	copy(result, stored)
	for idx := range result {
		ck := normalizedManifestContent(result[idx].Content)
		if queue := namesByContent[ck]; len(queue) > 0 {
			result[idx].Name = queue[0]
			namesByContent[ck] = queue[1:]
			continue
		}
		ik := manifestIdentity(result[idx])
		if queue := namesByIdentity[ik]; len(queue) > 0 {
			result[idx].Name = queue[0]
			namesByIdentity[ik] = queue[1:]
		}
	}
	return result
}
// manifestIdentity derives a matching key for a manifest: apiVersion/kind/name
// when the parsed header metadata is present, otherwise its normalized content.
func manifestIdentity(manifest releaseutil.Manifest) string {
	head := manifest.Head
	if head != nil && head.Metadata != nil {
		return fmt.Sprintf("%s/%s/%s", head.Version, head.Kind, head.Metadata.Name)
	}
	return normalizedManifestContent(manifest.Content)
}
// normalizedManifestContent canonicalizes manifest text for comparison by
// stripping leading and trailing whitespace.
func normalizedManifestContent(content string) string {
	trimmed := strings.TrimSpace(content)
	return trimmed
}
func parseCascadingFlag(cascadingFlag string) v1.DeletionPropagation {

@ -70,6 +70,10 @@ type Upgrade struct {
SkipCRDs bool
// Timeout is the timeout for this operation
Timeout time.Duration
// ReadinessTimeout is the per-batch timeout when --wait=ordered is used.
// Each ordered batch waits at most this long for resources to become ready.
// Must not exceed Timeout. Defaults to 1 minute when zero.
ReadinessTimeout time.Duration
// WaitStrategy determines what type of waiting should be done
WaitStrategy kube.WaitStrategy
// WaitOptions are additional options for waiting on resources
@ -183,12 +187,16 @@ func (u *Upgrade) RunWithContext(ctx context.Context, name string, ch chart.Char
u.WaitStrategy = kube.StatusWatcherStrategy
}
if u.ReadinessTimeout > 0 && u.Timeout > 0 && u.ReadinessTimeout > u.Timeout {
return nil, fmt.Errorf("--readiness-timeout (%s) must not exceed --timeout (%s)", u.ReadinessTimeout, u.Timeout)
}
if err := chartutil.ValidateReleaseName(name); err != nil {
return nil, fmt.Errorf("release name is invalid: %s", name)
}
u.cfg.Logger().Debug("preparing upgrade", "name", name)
currentRelease, upgradedRelease, serverSideApply, err := u.prepareUpgrade(name, chrt, vals)
currentRelease, upgradedRelease, serverSideApply, sortedManifests, err := u.prepareUpgrade(name, chrt, vals)
if err != nil {
return nil, err
}
@ -196,7 +204,12 @@ func (u *Upgrade) RunWithContext(ctx context.Context, name string, ch chart.Char
u.cfg.Releases.MaxHistory = u.MaxHistory
u.cfg.Logger().Debug("performing update", "name", name)
res, err := u.performUpgrade(ctx, currentRelease, upgradedRelease, serverSideApply)
var res *release.Release
if u.WaitStrategy == kube.OrderedWaitStrategy {
res, err = u.performSequencedUpgrade(ctx, chrt, currentRelease, upgradedRelease, sortedManifests, serverSideApply)
} else {
res, err = u.performUpgrade(ctx, currentRelease, upgradedRelease, serverSideApply)
}
if err != nil {
return res, err
}
@ -213,14 +226,16 @@ func (u *Upgrade) RunWithContext(ctx context.Context, name string, ch chart.Char
}
// prepareUpgrade builds an upgraded release for an upgrade operation.
func (u *Upgrade) prepareUpgrade(name string, chart *chartv2.Chart, vals map[string]any) (*release.Release, *release.Release, bool, error) {
// When WaitStrategy is OrderedWaitStrategy, sortedManifests contains the parsed
// manifests with annotation data needed for DAG sequencing; otherwise it is nil.
func (u *Upgrade) prepareUpgrade(name string, chart *chartv2.Chart, vals map[string]any) (*release.Release, *release.Release, bool, []releaseutil.Manifest, error) {
if chart == nil {
return nil, nil, false, errMissingChart
return nil, nil, false, nil, errMissingChart
}
// HideSecret must be used with dry run. Otherwise, return an error.
if !isDryRun(u.DryRunStrategy) && u.HideSecret {
return nil, nil, false, errors.New("hiding Kubernetes secrets requires a dry-run mode")
return nil, nil, false, nil, errors.New("hiding Kubernetes secrets requires a dry-run mode")
}
// finds the last non-deleted release with the given name
@ -228,19 +243,19 @@ func (u *Upgrade) prepareUpgrade(name string, chart *chartv2.Chart, vals map[str
if err != nil {
// to keep existing behavior of returning the "%q has no deployed releases" error when an existing release does not exist
if errors.Is(err, driver.ErrReleaseNotFound) {
return nil, nil, false, driver.NewErrNoDeployedReleases(name)
return nil, nil, false, nil, driver.NewErrNoDeployedReleases(name)
}
return nil, nil, false, err
return nil, nil, false, nil, err
}
lastRelease, err := releaserToV1Release(lastReleasei)
if err != nil {
return nil, nil, false, err
return nil, nil, false, nil, err
}
// Concurrent `helm upgrade`s will either fail here with `errPending` or when creating the release with "already exists". This should act as a pessimistic lock.
if lastRelease.Info.Status.IsPending() {
return nil, nil, false, errPending
return nil, nil, false, nil, errPending
}
var currentRelease *release.Release
@ -253,14 +268,14 @@ func (u *Upgrade) prepareUpgrade(name string, chart *chartv2.Chart, vals map[str
var cerr error
currentRelease, cerr = releaserToV1Release(currentReleasei)
if cerr != nil {
return nil, nil, false, err
return nil, nil, false, nil, err
}
if err != nil {
if errors.Is(err, driver.ErrNoDeployedReleases) &&
(lastRelease.Info.Status == rcommon.StatusFailed || lastRelease.Info.Status == rcommon.StatusSuperseded) {
currentRelease = lastRelease
} else {
return nil, nil, false, err
return nil, nil, false, nil, err
}
}
@ -269,11 +284,11 @@ func (u *Upgrade) prepareUpgrade(name string, chart *chartv2.Chart, vals map[str
// determine if values will be reused
vals, err = u.reuseValues(chart, currentRelease, vals)
if err != nil {
return nil, nil, false, err
return nil, nil, false, nil, err
}
if err := chartutil.ProcessDependencies(chart, vals); err != nil {
return nil, nil, false, err
return nil, nil, false, nil, err
}
// Increment revision count. This is passed to templates, and also stored on
@ -289,25 +304,33 @@ func (u *Upgrade) prepareUpgrade(name string, chart *chartv2.Chart, vals map[str
caps, err := u.cfg.getCapabilities()
if err != nil {
return nil, nil, false, err
return nil, nil, false, nil, err
}
valuesToRender, err := util.ToRenderValuesWithSchemaValidation(chart, vals, options, caps, u.SkipSchemaValidation)
if err != nil {
return nil, nil, false, err
return nil, nil, false, nil, err
}
hooks, manifestDoc, notesTxt, err := u.cfg.renderResources(chart, valuesToRender, "", "", u.SubNotes, false, false, u.PostRenderer, interactWithServer(u.DryRunStrategy), u.EnableDNS, u.HideSecret)
var sortedManifests []releaseutil.Manifest
var hooks []*release.Hook
var manifestDoc *bytes.Buffer
var notesTxt string
if u.WaitStrategy == kube.OrderedWaitStrategy {
hooks, manifestDoc, notesTxt, sortedManifests, err = u.cfg.renderResourcesWithFiles(chart, valuesToRender, "", "", u.SubNotes, false, false, u.PostRenderer, interactWithServer(u.DryRunStrategy), u.EnableDNS, u.HideSecret)
} else {
hooks, manifestDoc, notesTxt, err = u.cfg.renderResources(chart, valuesToRender, "", "", u.SubNotes, false, false, u.PostRenderer, interactWithServer(u.DryRunStrategy), u.EnableDNS, u.HideSecret)
}
if err != nil {
return nil, nil, false, err
return nil, nil, false, nil, err
}
if driver.ContainsSystemLabels(u.Labels) {
return nil, nil, false, fmt.Errorf("user supplied labels contains system reserved label name. System labels: %+v", driver.GetSystemLabels())
return nil, nil, false, nil, fmt.Errorf("user supplied labels contains system reserved label name. System labels: %+v", driver.GetSystemLabels())
}
serverSideApply, err := getUpgradeServerSideValue(u.ServerSideApply, lastRelease.ApplyMethod)
if err != nil {
return nil, nil, false, err
return nil, nil, false, nil, err
}
u.cfg.Logger().Debug("determined release apply method", slog.Bool("server_side_apply", serverSideApply), slog.String("previous_release_apply_method", lastRelease.ApplyMethod))
@ -335,7 +358,7 @@ func (u *Upgrade) prepareUpgrade(name string, chart *chartv2.Chart, vals map[str
upgradedRelease.Info.Notes = notesTxt
}
err = validateManifest(u.cfg.KubeClient, manifestDoc.Bytes(), !u.DisableOpenAPIValidation)
return currentRelease, upgradedRelease, serverSideApply, err
return currentRelease, upgradedRelease, serverSideApply, sortedManifests, err
}
func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedRelease *release.Release, serverSideApply bool) (*release.Release, error) {
@ -462,6 +485,12 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
u.cfg.Logger().Debug("upgrade hooks disabled", "name", upgradedRelease.Name)
}
// Strip Helm-internal sequencing annotations before applying to K8s.
if err := stripSequencingAnnotations(target); err != nil {
u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, fmt.Errorf("stripping sequencing annotations: %w", err))
return
}
upgradeClientSideFieldManager := isReleaseApplyMethodClientSideApply(originalRelease.ApplyMethod) && serverSideApply // Update client-side field manager if transitioning from client-side to server-side apply
results, err := u.cfg.KubeClient.Update(
current,
@ -520,6 +549,123 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
u.reportToPerformUpgrade(c, upgradedRelease, nil, nil)
}
// performSequencedUpgrade deploys chart resources in DAG-ordered batches when
// --wait=ordered is used. It mirrors releasingUpgrade but uses sequencedDeployment
// to apply each batch and wait for readiness before proceeding to the next batch.
// On success the previous release is marked superseded and the upgraded release
// deployed; on failure the created resources are handed to failRelease.
func (u *Upgrade) performSequencedUpgrade(ctx context.Context, chrt *chartv2.Chart, currentRelease, upgradedRelease *release.Release, manifests []releaseutil.Manifest, serverSideApply bool) (*release.Release, error) {
	// Build the full set of current (old) resources for diff matching.
	current, err := u.cfg.KubeClient.Build(bytes.NewBufferString(currentRelease.Manifest), false)
	if err != nil {
		// An unrecognized kind in the stored manifest indicates the API was
		// removed from this cluster version; surface a targeted message.
		if strings.Contains(err.Error(), "unable to recognize \"\": no matches for kind") {
			return upgradedRelease, fmt.Errorf("current release manifest contains removed kubernetes api(s) for this "+
				"kubernetes version and it is therefore unable to build the kubernetes "+
				"objects for performing the diff. error from kubernetes: %w", err)
		}
		return upgradedRelease, fmt.Errorf("unable to build kubernetes objects from current release manifest: %w", err)
	}
	// Build target to compute the set of resources to delete (those removed in the new release).
	target, err := u.cfg.KubeClient.Build(bytes.NewBufferString(upgradedRelease.Manifest), !u.DisableOpenAPIValidation)
	if err != nil {
		return upgradedRelease, fmt.Errorf("unable to build kubernetes objects from new release manifest: %w", err)
	}
	// Dry run: report success without touching the cluster or release storage.
	if isDryRun(u.DryRunStrategy) {
		u.cfg.Logger().Debug("dry run for release", "name", upgradedRelease.Name)
		if len(u.Description) > 0 {
			upgradedRelease.Info.Description = u.Description
		} else {
			upgradedRelease.Info.Description = "Dry run complete"
		}
		return upgradedRelease, nil
	}
	// Persist the pending release record before mutating cluster state.
	u.cfg.Logger().Debug("creating upgraded release", "name", upgradedRelease.Name)
	if err := u.cfg.Releases.Create(upgradedRelease); err != nil {
		return nil, err
	}
	// pre-upgrade hooks
	if !u.DisableHooks {
		if err := u.cfg.execHook(upgradedRelease, release.HookPreUpgrade, u.WaitStrategy, u.WaitOptions, u.Timeout, serverSideApply); err != nil {
			return u.failRelease(upgradedRelease, nil, fmt.Errorf("pre-upgrade hooks failed: %s", err))
		}
	} else {
		u.cfg.Logger().Debug("upgrade hooks disabled", "name", upgradedRelease.Name)
	}
	// Update the client-side field manager only when transitioning from
	// client-side to server-side apply.
	upgradeCSAFieldManager := isReleaseApplyMethodClientSideApply(currentRelease.ApplyMethod) && serverSideApply
	// Per-batch readiness timeout defaults to 1 minute when unset.
	readinessTimeout := u.ReadinessTimeout
	if readinessTimeout <= 0 {
		readinessTimeout = time.Minute
	}
	sd := &sequencedDeployment{
		cfg:                    u.cfg,
		releaseName:            upgradedRelease.Name,
		releaseNamespace:       upgradedRelease.Namespace,
		disableOpenAPI:         u.DisableOpenAPIValidation,
		serverSideApply:        serverSideApply,
		forceConflicts:         u.ForceConflicts,
		forceReplace:           u.ForceReplace,
		waitStrategy:           u.WaitStrategy,
		waitOptions:            u.WaitOptions,
		waitForJobs:            u.WaitForJobs,
		timeout:                u.Timeout,
		readinessTimeout:       readinessTimeout,
		deadline:               computeDeadline(u.Timeout),
		upgradeMode:            true,
		currentResources:       current,
		upgradeCSAFieldManager: upgradeCSAFieldManager,
	}
	// Set SequencingInfo before deployment so that failure recovery can use sequenced order.
	upgradedRelease.SequencingInfo = &release.SequencingInfo{
		Enabled:  true,
		Strategy: string(u.WaitStrategy),
	}
	if err := sd.deployChartLevel(ctx, chrt, manifests); err != nil {
		return u.failRelease(upgradedRelease, sd.createdResources, err)
	}
	// Delete resources that were removed in the new release (in old but not in new).
	allNewKeys := make(map[string]bool, len(target))
	for _, r := range target {
		allNewKeys[objectKey(r)] = true
	}
	var toBeDeleted kube.ResourceList
	for _, r := range current {
		if !allNewKeys[objectKey(r)] {
			toBeDeleted = append(toBeDeleted, r)
		}
	}
	if len(toBeDeleted) > 0 {
		if _, errs := u.cfg.KubeClient.Delete(toBeDeleted, metav1.DeletePropagationBackground); errs != nil {
			return u.failRelease(upgradedRelease, sd.createdResources, fmt.Errorf("deleting removed resources: %w", joinErrors(errs, ", ")))
		}
	}
	// post-upgrade hooks
	if !u.DisableHooks {
		if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.WaitStrategy, u.WaitOptions, u.Timeout, serverSideApply); err != nil {
			return u.failRelease(upgradedRelease, sd.createdResources, fmt.Errorf("post-upgrade hooks failed: %s", err))
		}
	}
	// Record the status transition: previous release superseded, new one deployed.
	currentRelease.Info.Status = rcommon.StatusSuperseded
	u.cfg.recordRelease(currentRelease)
	upgradedRelease.Info.Status = rcommon.StatusDeployed
	if len(u.Description) > 0 {
		upgradedRelease.Info.Description = u.Description
	} else {
		upgradedRelease.Info.Description = "Upgrade complete"
	}
	u.cfg.recordRelease(upgradedRelease)
	return upgradedRelease, nil
}
func (u *Upgrade) failRelease(rel *release.Release, created kube.ResourceList, err error) (*release.Release, error) {
msg := fmt.Sprintf("Upgrade %q failed: %s", rel.Name, err)
u.cfg.Logger().Warn(

Loading…
Cancel
Save