adding a context to the kubeclient

pull/13134/head^2
saisrikark 1 year ago
parent 15f76cf83c
commit 31c4974644

@ -412,6 +412,8 @@ func (i *Install) performInstallCtx(ctx context.Context, rel *release.Release, t
} }
resultChan := make(chan Msg, 1) resultChan := make(chan Msg, 1)
// TODOS we are not handling context here
// figure out a way to handle the context
go func() { go func() {
rel, err := i.performInstall(rel, toBeAdopted, resources) rel, err := i.performInstall(rel, toBeAdopted, resources)
resultChan <- Msg{rel, err} resultChan <- Msg{rel, err}

@ -363,14 +363,18 @@ func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedR
return upgradedRelease, nil return upgradedRelease, nil
} }
// TODOS here
// ensure that when we do a RunWithContext and the context is cancelled, we should remove all the existing go routines
u.cfg.Log("creating upgraded release for %s", upgradedRelease.Name) u.cfg.Log("creating upgraded release for %s", upgradedRelease.Name)
if err := u.cfg.Releases.Create(upgradedRelease); err != nil { if err := u.cfg.Releases.Create(upgradedRelease); err != nil {
return nil, err return nil, err
} }
rChan := make(chan resultMessage) rChan := make(chan resultMessage)
ctxChan := make(chan resultMessage) ctxChan := make(chan resultMessage)
doneChan := make(chan interface{}) doneChan := make(chan interface{})
defer close(doneChan) defer close(doneChan)
go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease) go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease)
go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease) go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease)
select { select {

@ -83,15 +83,38 @@ type Client struct {
Namespace string Namespace string
kubeClient *kubernetes.Clientset kubeClient *kubernetes.Clientset
// context used to cancel the running task
ctx context.Context
} }
var addToScheme sync.Once var addToScheme sync.Once
type Option func(*Client)
func WithContext(ctx context.Context) Option {
return func(c *Client) {
c.ctx = ctx
}
}
// New creates a new Client. // New creates a new Client.
func New(getter genericclioptions.RESTClientGetter) *Client { func New(getter genericclioptions.RESTClientGetter, opts ...Option) *Client {
client := &Client{}
for _, opt := range opts {
opt(client)
}
if client.ctx == nil {
client.ctx = context.TODO()
}
if getter == nil { if getter == nil {
getter = genericclioptions.NewConfigFlags(true) getter = genericclioptions.NewConfigFlags(true)
} }
// Add CRDs to the scheme. They are missing by default. // Add CRDs to the scheme. They are missing by default.
addToScheme.Do(func() { addToScheme.Do(func() {
if err := apiextv1.AddToScheme(scheme.Scheme); err != nil { if err := apiextv1.AddToScheme(scheme.Scheme); err != nil {
@ -102,10 +125,11 @@ func New(getter genericclioptions.RESTClientGetter) *Client {
panic(err) panic(err)
} }
}) })
return &Client{
Factory: cmdutil.NewFactory(getter), client.Factory = cmdutil.NewFactory(getter)
Log: nopLogger, client.Log = nopLogger
}
return client
} }
var nopLogger = func(_ string, _ ...interface{}) {} var nopLogger = func(_ string, _ ...interface{}) {}
@ -307,6 +331,7 @@ func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) err
c: checker, c: checker,
log: c.Log, log: c.Log,
timeout: timeout, timeout: timeout,
ctx: c.ctx,
} }
return w.waitForResources(resources) return w.waitForResources(resources)
} }
@ -316,6 +341,7 @@ func (c *Client) WaitForDelete(resources ResourceList, timeout time.Duration) er
w := waiter{ w := waiter{
log: c.Log, log: c.Log,
timeout: timeout, timeout: timeout,
ctx: c.ctx,
} }
return w.waitForDeletedResources(resources) return w.waitForDeletedResources(resources)
} }

@ -42,6 +42,9 @@ type waiter struct {
c ReadyChecker c ReadyChecker
timeout time.Duration timeout time.Duration
log func(string, ...interface{}) log func(string, ...interface{})
// ctx used to to cancel and avoid go routines leaking
ctx context.Context
} }
// waitForResources polls to get the current status of all pods, PVCs, Services and // waitForResources polls to get the current status of all pods, PVCs, Services and
@ -49,7 +52,7 @@ type waiter struct {
func (w *waiter) waitForResources(created ResourceList) error { func (w *waiter) waitForResources(created ResourceList) error {
w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout) w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
ctx, cancel := context.WithTimeout(context.Background(), w.timeout) ctx, cancel := context.WithTimeout(w.ctx, w.timeout)
defer cancel() defer cancel()
numberOfErrors := make([]int, len(created)) numberOfErrors := make([]int, len(created))

Loading…
Cancel
Save