From 31c4974644f48367326c7cef8a948f618ce28980 Mon Sep 17 00:00:00 2001 From: saisrikark Date: Mon, 17 Jun 2024 15:12:22 +0530 Subject: [PATCH] adding a context to the kubeclient --- pkg/action/install.go | 2 ++ pkg/action/upgrade.go | 4 ++++ pkg/kube/client.go | 36 +++++++++++++++++++++++++++++++----- pkg/kube/wait.go | 5 ++++- 4 files changed, 41 insertions(+), 6 deletions(-) diff --git a/pkg/action/install.go b/pkg/action/install.go index 6dce3ccbb..dd5d4f40e 100644 --- a/pkg/action/install.go +++ b/pkg/action/install.go @@ -412,6 +412,8 @@ func (i *Install) performInstallCtx(ctx context.Context, rel *release.Release, t } resultChan := make(chan Msg, 1) + // TODOS we are not handling context here + // figure out a way to handle the context go func() { rel, err := i.performInstall(rel, toBeAdopted, resources) resultChan <- Msg{rel, err} diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go index 6d26a754e..772dd55e9 100644 --- a/pkg/action/upgrade.go +++ b/pkg/action/upgrade.go @@ -363,14 +363,18 @@ func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedR return upgradedRelease, nil } + // TODOS here + // ensure that when we do a RunWithContext and the context is cancelled, we should remove all the existing go routines u.cfg.Log("creating upgraded release for %s", upgradedRelease.Name) if err := u.cfg.Releases.Create(upgradedRelease); err != nil { return nil, err } + rChan := make(chan resultMessage) ctxChan := make(chan resultMessage) doneChan := make(chan interface{}) defer close(doneChan) + go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease) go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease) select { diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 9df833a43..5a36700b6 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -83,15 +83,38 @@ type Client struct { Namespace string kubeClient *kubernetes.Clientset + + // context used to cancel the running task + ctx context.Context } var addToScheme sync.Once +type Option func(*Client) + +func WithContext(ctx context.Context) Option { + return func(c *Client) { + c.ctx = ctx + } +} + // New creates a new Client. -func New(getter genericclioptions.RESTClientGetter) *Client { +func New(getter genericclioptions.RESTClientGetter, opts ...Option) *Client { + + client := &Client{} + + for _, opt := range opts { + opt(client) + } + + if client.ctx == nil { + client.ctx = context.TODO() + } + if getter == nil { getter = genericclioptions.NewConfigFlags(true) } + // Add CRDs to the scheme. They are missing by default. addToScheme.Do(func() { if err := apiextv1.AddToScheme(scheme.Scheme); err != nil { @@ -102,10 +125,11 @@ func New(getter genericclioptions.RESTClientGetter) *Client { panic(err) } }) - return &Client{ - Factory: cmdutil.NewFactory(getter), - Log: nopLogger, - } + + client.Factory = cmdutil.NewFactory(getter) + client.Log = nopLogger + + return client } var nopLogger = func(_ string, _ ...interface{}) {} @@ -307,6 +331,7 @@ func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) err c: checker, log: c.Log, timeout: timeout, + ctx: c.ctx, } return w.waitForResources(resources) } @@ -316,6 +341,7 @@ func (c *Client) WaitForDelete(resources ResourceList, timeout time.Duration) er w := waiter{ log: c.Log, timeout: timeout, + ctx: c.ctx, } return w.waitForDeletedResources(resources) } diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go index 36110d0de..ea2e0fe37 100644 --- a/pkg/kube/wait.go +++ b/pkg/kube/wait.go @@ -42,6 +42,9 @@ type waiter struct { c ReadyChecker timeout time.Duration log func(string, ...interface{}) + + // ctx used to to cancel and avoid go routines leaking + ctx context.Context } // waitForResources polls to get the current status of all pods, PVCs, Services and @@ -49,7 +52,7 @@ type waiter struct { func (w *waiter) waitForResources(created ResourceList) error { w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout) - ctx, cancel := context.WithTimeout(context.Background(), w.timeout) + ctx, cancel := context.WithTimeout(w.ctx, w.timeout) defer cancel() numberOfErrors := make([]int, len(created))