diff --git a/pkg/kube/client.go b/pkg/kube/client.go
index 3753998ff..8dca1c51b 100644
--- a/pkg/kube/client.go
+++ b/pkg/kube/client.go
@@ -18,7 +18,6 @@ package kube // import "helm.sh/helm/v4/pkg/kube"
 
 import (
 	"bytes"
-	"context"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -27,11 +26,9 @@ import (
 	"reflect"
 	"strings"
 	"sync"
-	"time"
 
 	jsonpatch "github.com/evanphx/json-patch"
 	"github.com/pkg/errors"
-	batch "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
 	apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
 	apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
@@ -39,23 +36,18 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
 
-	multierror "github.com/hashicorp/go-multierror"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
-	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/strategicpatch"
-	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/cli-runtime/pkg/genericclioptions"
 	"k8s.io/cli-runtime/pkg/resource"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/rest"
-	cachetools "k8s.io/client-go/tools/cache"
-	watchtools "k8s.io/client-go/tools/watch"
 	"k8s.io/client-go/util/retry"
 	cmdutil "k8s.io/kubectl/pkg/cmd/util"
 )
@@ -524,52 +516,6 @@ func rdelete(c *Client, resources ResourceList, propagation metav1.DeletionPropagation) (*Result, []error) {
 	return res, nil
 }
 
-func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error {
-	return func(info *resource.Info) error {
-		return c.watchUntilReady(t, info)
-	}
-}
-
-// WatchUntilReady watches the resources given and waits until it is ready.
-//
-// This method is mainly for hook implementations. It watches for a resource to
-// hit a particular milestone. The milestone depends on the Kind.
-//
-// For most kinds, it checks to see if the resource is marked as Added or Modified
-// by the Kubernetes event stream. For some kinds, it does more:
-//
-// - Jobs: A job is marked "Ready" when it has successfully completed. This is
-//   ascertained by watching the Status fields in a job's output.
-// - Pods: A pod is marked "Ready" when it has successfully completed. This is
-//   ascertained by watching the status.phase field in a pod's output.
-//
-// Handling for other kinds will be added as necessary.
-func (c *Client) WatchUntilReady(resources ResourceList, timeout time.Duration) error {
-	// For jobs, there's also the option to do poll c.Jobs(namespace).Get():
-	// https://github.com/adamreese/kubernetes/blob/master/test/e2e/job.go#L291-L300
-	return perform(resources, c.watchTimeout(timeout))
-}
-
-func perform(infos ResourceList, fn func(*resource.Info) error) error {
-	var result error
-
-	if len(infos) == 0 {
-		return ErrNoObjectsVisited
-	}
-
-	errs := make(chan error)
-	go batchPerform(infos, fn, errs)
-
-	for range infos {
-		err := <-errs
-		if err != nil {
-			result = multierror.Append(result, err)
-		}
-	}
-
-	return result
-}
-
 // getManagedFieldsManager returns the manager string. If one was set it will be returned.
 // Otherwise, one is calculated based on the name of the binary.
 func getManagedFieldsManager() string {
@@ -721,109 +667,6 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
 	return nil
 }
 
-func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error {
-	kind := info.Mapping.GroupVersionKind.Kind
-	switch kind {
-	case "Job", "Pod":
-	default:
-		return nil
-	}
-
-	c.Log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeout)
-
-	// Use a selector on the name of the resource. This should be unique for the
-	// given version and kind
-	selector, err := fields.ParseSelector(fmt.Sprintf("metadata.name=%s", info.Name))
-	if err != nil {
-		return err
-	}
-	lw := cachetools.NewListWatchFromClient(info.Client, info.Mapping.Resource.Resource, info.Namespace, selector)
-
-	// What we watch for depends on the Kind.
-	// - For a Job, we watch for completion.
-	// - For all else, we watch until Ready.
-	// In the future, we might want to add some special logic for types
-	// like Ingress, Volume, etc.
-
-	ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
-	defer cancel()
-	_, err = watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, nil, func(e watch.Event) (bool, error) {
-		// Make sure the incoming object is versioned as we use unstructured
-		// objects when we build manifests
-		obj := convertWithMapper(e.Object, info.Mapping)
-		switch e.Type {
-		case watch.Added, watch.Modified:
-			// For things like a secret or a config map, this is the best indicator
-			// we get. We care mostly about jobs, where what we want to see is
-			// the status go into a good state. For other types, like ReplicaSet
-			// we don't really do anything to support these as hooks.
-			c.Log("Add/Modify event for %s: %v", info.Name, e.Type)
-			switch kind {
-			case "Job":
-				return c.waitForJob(obj, info.Name)
-			case "Pod":
-				return c.waitForPodSuccess(obj, info.Name)
-			}
-			return true, nil
-		case watch.Deleted:
-			c.Log("Deleted event for %s", info.Name)
-			return true, nil
-		case watch.Error:
-			// Handle error and return with an error.
-			c.Log("Error event for %s", info.Name)
-			return true, errors.Errorf("failed to deploy %s", info.Name)
-		default:
-			return false, nil
-		}
-	})
-	return err
-}
-
-// waitForJob is a helper that waits for a job to complete.
-//
-// This operates on an event returned from a watcher.
-func (c *Client) waitForJob(obj runtime.Object, name string) (bool, error) {
-	o, ok := obj.(*batch.Job)
-	if !ok {
-		return true, errors.Errorf("expected %s to be a *batch.Job, got %T", name, obj)
-	}
-
-	for _, c := range o.Status.Conditions {
-		if c.Type == batch.JobComplete && c.Status == "True" {
-			return true, nil
-		} else if c.Type == batch.JobFailed && c.Status == "True" {
-			return true, errors.Errorf("job %s failed: %s", name, c.Reason)
-		}
-	}
-
-	c.Log("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", name, o.Status.Active, o.Status.Failed, o.Status.Succeeded)
-	return false, nil
-}
-
-// waitForPodSuccess is a helper that waits for a pod to complete.
-//
-// This operates on an event returned from a watcher.
-func (c *Client) waitForPodSuccess(obj runtime.Object, name string) (bool, error) {
-	o, ok := obj.(*v1.Pod)
-	if !ok {
-		return true, errors.Errorf("expected %s to be a *v1.Pod, got %T", name, obj)
-	}
-
-	switch o.Status.Phase {
-	case v1.PodSucceeded:
-		c.Log("Pod %s succeeded", o.Name)
-		return true, nil
-	case v1.PodFailed:
-		return true, errors.Errorf("pod %s failed", o.Name)
-	case v1.PodPending:
-		c.Log("Pod %s pending", o.Name)
-	case v1.PodRunning:
-		c.Log("Pod %s running", o.Name)
-	}
-
-	return false, nil
-}
-
 // scrubValidationError removes kubectl info from the message.
 func scrubValidationError(err error) error {
 	if err == nil {
diff --git a/pkg/kube/interface.go b/pkg/kube/interface.go
index f8e3c2ee2..0e6da1094 100644
--- a/pkg/kube/interface.go
+++ b/pkg/kube/interface.go
@@ -34,18 +34,6 @@ type Interface interface {
 	// Delete destroys one or more resources.
 	Delete(resources ResourceList) (*Result, []error)
 
-	// WatchUntilReady watches the resources given and waits until it is ready.
-	//
-	// This method is mainly for hook implementations. It watches for a resource to
-	// hit a particular milestone. The milestone depends on the Kind.
-	//
-	// For Jobs, "ready" means the Job ran to completion (exited without error).
-	// For Pods, "ready" means the Pod phase is marked "succeeded".
-	// For all other kinds, it means the kind was created or modified without
-	// error.
-	// TODO: Is watch until ready really behavior we want over the resources actually being ready?
-	WatchUntilReady(resources ResourceList, timeout time.Duration) error
-
 	// Update updates one or more resources or creates the resource
 	// if it doesn't exist.
 	Update(original, target ResourceList, force bool) (*Result, error)
@@ -72,6 +60,18 @@ type Waiter interface {
 
 	// WaitForDelete wait up to the given timeout for the specified resources to be deleted.
 	WaitForDelete(resources ResourceList, timeout time.Duration) error
+
+	// WatchUntilReady watches the given resources and waits until they are ready.
+	//
+	// This method is mainly for hook implementations. It watches for a resource to
+	// hit a particular milestone. The milestone depends on the Kind.
+	//
+	// For Jobs, "ready" means the Job ran to completion (exited without error).
+	// For Pods, "ready" means the Pod phase is marked "succeeded".
+	// For all other kinds, it means the kind was created or modified without
+	// error.
+	// TODO: Is watch until ready really behavior we want over the resources actually being ready?
+	WatchUntilReady(resources ResourceList, timeout time.Duration) error
 }
 
 // InterfaceDeletionPropagation is introduced to avoid breaking backwards compatibility for Interface implementers.
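With WatchUntilReady moved onto Waiter, hook-running callers need a Waiter rather than the full Interface. A minimal sketch of such a caller, assuming only the method shown in this diff; runHook, the hooks package name, and the five-minute timeout are hypothetical, not part of this change:

package hooks

import (
	"time"

	"helm.sh/helm/v4/pkg/kube"
)

// runHook is a hypothetical hook runner: after this change it depends on
// kube.Waiter, not kube.Interface, to watch hook resources to completion.
func runHook(w kube.Waiter, hookResources kube.ResourceList) error {
	// Blocks until every watched Job/Pod reaches its milestone or the
	// timeout expires.
	return w.WatchUntilReady(hookResources, 5*time.Minute)
}
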
diff --git a/pkg/kube/statuswait.go b/pkg/kube/statuswait.go
index 1aa424c4c..2e27917bc 100644
--- a/pkg/kube/statuswait.go
+++ b/pkg/kube/statuswait.go
@@ -40,6 +40,10 @@ type statusWaiter struct {
 	log func(string, ...interface{})
 }
 
+func (w *statusWaiter) WatchUntilReady(resources ResourceList, timeout time.Duration) error {
+	panic("todo")
+}
+
 func (w *statusWaiter) Wait(resourceList ResourceList, timeout time.Duration) error {
 	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
 	defer cancel()
diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go
index 525373e4d..fdb3c9087 100644
--- a/pkg/kube/wait.go
+++ b/pkg/kube/wait.go
@@ -22,19 +22,25 @@ import (
 	"net/http"
 	"time"
 
+	multierror "github.com/hashicorp/go-multierror"
 	"github.com/pkg/errors"
 	appsv1 "k8s.io/api/apps/v1"
 	appsv1beta1 "k8s.io/api/apps/v1beta1"
 	appsv1beta2 "k8s.io/api/apps/v1beta2"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/cli-runtime/pkg/resource"
 	"k8s.io/client-go/kubernetes"
+	cachetools "k8s.io/client-go/tools/cache"
+	watchtools "k8s.io/client-go/tools/watch"
 
 	"k8s.io/apimachinery/pkg/util/wait"
 )
@@ -177,3 +183,152 @@ func SelectorsForObject(object runtime.Object) (selector labels.Selector, err error) {
 
 	return selector, errors.Wrap(err, "invalid label selector")
 }
+
+func (hw *HelmWaiter) watchTimeout(t time.Duration) func(*resource.Info) error {
+	return func(info *resource.Info) error {
+		return hw.watchUntilReady(t, info)
+	}
+}
+
+// WatchUntilReady watches the given resources and waits until they are ready.
+//
+// This method is mainly for hook implementations. It watches for a resource to
+// hit a particular milestone. The milestone depends on the Kind.
+//
+// For most kinds, it checks to see if the resource is marked as Added or Modified
+// by the Kubernetes event stream. For some kinds, it does more:
+//
+// - Jobs: A job is marked "Ready" when it has successfully completed. This is
+//   ascertained by watching the Status fields in a job's output.
+// - Pods: A pod is marked "Ready" when it has successfully completed. This is
+//   ascertained by watching the status.phase field in a pod's output.
+//
+// Handling for other kinds will be added as necessary.
+func (hw *HelmWaiter) WatchUntilReady(resources ResourceList, timeout time.Duration) error {
+	// For jobs, there's also the option to poll c.Jobs(namespace).Get():
+	// https://github.com/adamreese/kubernetes/blob/master/test/e2e/job.go#L291-L300
+	return perform(resources, hw.watchTimeout(timeout))
+}
+
+func perform(infos ResourceList, fn func(*resource.Info) error) error {
+	var result error
+
+	if len(infos) == 0 {
+		return ErrNoObjectsVisited
+	}
+
+	errs := make(chan error)
+	go batchPerform(infos, fn, errs)
+
+	for range infos {
+		err := <-errs
+		if err != nil {
+			result = multierror.Append(result, err)
+		}
+	}
+
+	return result
+}
+
+func (hw *HelmWaiter) watchUntilReady(timeout time.Duration, info *resource.Info) error {
+	kind := info.Mapping.GroupVersionKind.Kind
+	switch kind {
+	case "Job", "Pod":
+	default:
+		return nil
+	}
+
+	hw.log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeout)
+
+	// Use a selector on the name of the resource. This should be unique for the
+	// given version and kind.
+	selector, err := fields.ParseSelector(fmt.Sprintf("metadata.name=%s", info.Name))
+	if err != nil {
+		return err
+	}
+	lw := cachetools.NewListWatchFromClient(info.Client, info.Mapping.Resource.Resource, info.Namespace, selector)
+
+	// What we watch for depends on the Kind.
+	// - For a Job, we watch for completion.
+	// - For all else, we watch until Ready.
+	// In the future, we might want to add some special logic for types
+	// like Ingress, Volume, etc.
+
+	ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
+	defer cancel()
+	_, err = watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, nil, func(e watch.Event) (bool, error) {
+		// Make sure the incoming object is versioned, as we use unstructured
+		// objects when we build manifests.
+		obj := convertWithMapper(e.Object, info.Mapping)
+		switch e.Type {
+		case watch.Added, watch.Modified:
+			// For things like a secret or a config map, this is the best indicator
+			// we get. We care mostly about jobs, where what we want to see is
+			// the status go into a good state. For other types, like ReplicaSet,
+			// we don't really do anything to support these as hooks.
+			hw.log("Add/Modify event for %s: %v", info.Name, e.Type)
+			switch kind {
+			case "Job":
+				return hw.waitForJob(obj, info.Name)
+			case "Pod":
+				return hw.waitForPodSuccess(obj, info.Name)
+			}
+			return true, nil
+		case watch.Deleted:
+			hw.log("Deleted event for %s", info.Name)
+			return true, nil
+		case watch.Error:
+			// Treat a watch error event as a deployment failure.
+			hw.log("Error event for %s", info.Name)
+			return true, errors.Errorf("failed to deploy %s", info.Name)
+		default:
+			return false, nil
+		}
+	})
+	return err
+}
+
+// waitForJob is a helper that waits for a job to complete.
+//
+// This operates on an event returned from a watcher.
+func (hw *HelmWaiter) waitForJob(obj runtime.Object, name string) (bool, error) {
+	o, ok := obj.(*batchv1.Job)
+	if !ok {
+		return true, errors.Errorf("expected %s to be a *batchv1.Job, got %T", name, obj)
+	}
+
+	for _, c := range o.Status.Conditions {
+		if c.Type == batchv1.JobComplete && c.Status == "True" {
+			return true, nil
+		} else if c.Type == batchv1.JobFailed && c.Status == "True" {
+			return true, errors.Errorf("job %s failed: %s", name, c.Reason)
+		}
+	}
+
+	hw.log("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", name, o.Status.Active, o.Status.Failed, o.Status.Succeeded)
+	return false, nil
+}
+
+// waitForPodSuccess is a helper that waits for a pod to complete.
+//
+// This operates on an event returned from a watcher.
+func (hw *HelmWaiter) waitForPodSuccess(obj runtime.Object, name string) (bool, error) {
+	o, ok := obj.(*corev1.Pod)
+	if !ok {
+		return true, errors.Errorf("expected %s to be a *corev1.Pod, got %T", name, obj)
+	}
+
+	switch o.Status.Phase {
+	case corev1.PodSucceeded:
+		hw.log("Pod %s succeeded", o.Name)
+		return true, nil
+	case corev1.PodFailed:
+		return true, errors.Errorf("pod %s failed", o.Name)
+	case corev1.PodPending:
+		hw.log("Pod %s pending", o.Name)
+	case corev1.PodRunning:
+		hw.log("Pod %s running", o.Name)
+	}
+
+	return false, nil
+}
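
The moved watchUntilReady combines a metadata.name field selector with watchtools.UntilWithSync. That pattern can be exercised on its own; a minimal sketch assuming a configured *kubernetes.Clientset, where waitForPodEvent is a hypothetical helper, not part of this diff:

package hooks

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	cachetools "k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
)

// waitForPodEvent blocks until the named Pod reports an Added or Modified
// event, mirroring the ListWatch construction in watchUntilReady.
func waitForPodEvent(ctx context.Context, cs *kubernetes.Clientset, namespace, name string) error {
	// metadata.name is unique per namespace and kind, so this selector
	// matches exactly one object.
	selector, err := fields.ParseSelector(fmt.Sprintf("metadata.name=%s", name))
	if err != nil {
		return err
	}
	lw := cachetools.NewListWatchFromClient(cs.CoreV1().RESTClient(), "pods", namespace, selector)
	_, err = watchtools.UntilWithSync(ctx, lw, &corev1.Pod{}, nil, func(e watch.Event) (bool, error) {
		// A real condition would inspect status.phase, as waitForPodSuccess does.
		return e.Type == watch.Added || e.Type == watch.Modified, nil
	})
	return err
}

Because the selector narrows the informer to a single object, the condition function only ever sees events for the named resource.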
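
Likewise, the Job-completion test in waitForJob is just a predicate over status conditions. A standalone sketch, where jobDone is a hypothetical name; it compares against corev1.ConditionTrue where the moved code uses the literal "True" (equivalent, since ConditionStatus is a string type):

package hooks

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
)

// jobDone reports whether the Job has reached a terminal condition,
// returning an error when that condition is JobFailed.
func jobDone(job *batchv1.Job) (bool, error) {
	for _, c := range job.Status.Conditions {
		if c.Type == batchv1.JobComplete && c.Status == corev1.ConditionTrue {
			return true, nil
		}
		if c.Type == batchv1.JobFailed && c.Status == corev1.ConditionTrue {
			return true, fmt.Errorf("job %s failed: %s", job.Name, c.Reason)
		}
	}
	// Neither Complete nor Failed yet; the caller should keep watching.
	return false, nil
}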