Merge pull request #2 from saisrikark/context-13126-helm

modifying tests and removing leaking code
pull/13135/head
Sai Srikar Komaravolu 1 year ago committed by GitHub
commit 2f10d06fa8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -440,8 +440,7 @@ func TestInstallRelease_Wait_Interrupted(t *testing.T) {
is.Contains(res.Info.Description, "Release \"interrupted-release\" failed: context canceled")
is.Equal(res.Info.Status, release.StatusFailed)
is.Equal(goroutines+1, runtime.NumGoroutine()) // installation goroutine still is in background
time.Sleep(10 * time.Second) // wait for goroutine to finish
// since the context is cancelled all linked goroutines must also be cancelled without delay
is.Equal(goroutines, runtime.NumGoroutine())
}
func TestInstallRelease_WaitForJobs(t *testing.T) {

@ -369,18 +369,14 @@ func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedR
}
rChan := make(chan resultMessage)
ctxChan := make(chan resultMessage)
doneChan := make(chan interface{})
defer close(doneChan)
// when context is cancelled, we should terminate all the goroutines
go u.releasingUpgrade(ctx, rChan, upgradedRelease, current, target, originalRelease)
go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease)
select {
case result := <-rChan:
return result.r, result.e
case result := <-ctxChan:
return result.r, result.e
}
result := <-rChan
return result.r, result.e
}
// Function used to lock the Mutex, this is important for the case when the atomic flag is set.
@ -395,18 +391,6 @@ func (u *Upgrade) reportToPerformUpgrade(c chan<- resultMessage, rel *release.Re
u.Lock.Unlock()
}
// Setup listener for SIGINT and SIGTERM
func (u *Upgrade) handleContext(ctx context.Context, done chan interface{}, c chan<- resultMessage, upgradedRelease *release.Release) {
select {
case <-ctx.Done():
err := ctx.Err()
// when the atomic flag is set the ongoing release finish first and doesn't give time for the rollback happens.
u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, err)
case <-done:
return
}
}
func (u *Upgrade) releasingUpgrade(ctx context.Context, c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) {
// pre-upgrade hooks

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"reflect"
"runtime"
"testing"
"time"
@ -401,13 +402,15 @@ func TestUpgradeRelease_Interrupted_Wait(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
time.AfterFunc(time.Second, cancel)
goroutines := runtime.NumGoroutine()
res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
req.Error(err)
is.Contains(res.Info.Description, "Upgrade \"interrupted-release\" failed: context canceled")
is.Equal(res.Info.Status, release.StatusFailed)
// since the context is cancelled all linked goroutines must also be cancelled
is.Equal(goroutines, runtime.NumGoroutine())
}
func TestUpgradeRelease_Interrupted_Atomic(t *testing.T) {
@ -430,6 +433,7 @@ func TestUpgradeRelease_Interrupted_Atomic(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
time.AfterFunc(time.Second, cancel)
goroutines := runtime.NumGoroutine()
res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
@ -441,6 +445,8 @@ func TestUpgradeRelease_Interrupted_Atomic(t *testing.T) {
is.NoError(err)
// Should have rolled back to the previous
is.Equal(updatedRes.Info.Status, release.StatusDeployed)
// since the context is cancelled all linked goroutines must also be cancelled
is.Equal(goroutines, runtime.NumGoroutine())
}
func TestMergeCustomLabels(t *testing.T) {

@ -283,32 +283,12 @@ func getResource(info *resource.Info) (runtime.Object, error) {
// Wait waits up to the given timeout for the specified resources to be ready.
func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
cs, err := c.getKubeClient()
if err != nil {
return err
}
checker := NewReadyChecker(cs, c.Log, PausedAsReady(true))
w := waiter{
c: checker,
log: c.Log,
timeout: timeout,
}
return w.waitForResources(resources)
return c.WaitWithContext(context.Background(), resources, timeout)
}
// WaitWithJobs wait up to the given timeout for the specified resources to be ready or until the context is Done, including jobs.
func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) error {
cs, err := c.getKubeClient()
if err != nil {
return err
}
checker := NewReadyChecker(cs, c.Log, PausedAsReady(true), CheckJobs(true))
w := waiter{
c: checker,
log: c.Log,
timeout: timeout,
}
return w.waitForResources(resources)
return c.WaitWithJobsWithContext(context.Background(), resources, timeout)
}
// WaitWithContext waits up to the given timeout for the specified resources to be ready or until the context is Done.

@ -19,6 +19,7 @@ package fake
import (
"context"
"fmt"
"io"
"time"
@ -83,21 +84,36 @@ func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Dur
return f.PrintingKubeClient.WaitWithJobs(resources, d)
}
// WaitWithContext the amount of time defined on f.WaitDuration, then returns the configured error if set or prints.
// WaitWithContext waits the amount of time defined on f.WaitDuration or until context is done
// then returns the configured error if set or prints.
func (f *FailingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList, d time.Duration) error {
time.Sleep(f.WaitDuration)
select {
case <-ctx.Done():
return fmt.Errorf("context canceled")
case <-time.After(f.WaitDuration):
}
if f.WaitError != nil {
return f.WaitError
}
return f.PrintingKubeClient.WaitWithContext(ctx, resources, d)
}
// WaitWithJobsWithContext returns the configured error if set or prints
// WaitWithJobsWithContext waits the amount of time defined on f.WaitDuration or until context is done
// then returns the configured error if set or prints.
func (f *FailingKubeClient) WaitWithJobsWithContext(ctx context.Context, resources kube.ResourceList, d time.Duration) error {
select {
case <-ctx.Done():
return fmt.Errorf("context canceled")
case <-time.After(f.WaitDuration):
}
if f.WaitError != nil {
return f.WaitError
}
return f.PrintingKubeClient.WaitWithJobsWithContext(ctx, resources, d)
return f.PrintingKubeClient.WaitWithContext(ctx, resources, d)
}
// WaitForDelete returns the configured error if set or prints

@ -34,9 +34,11 @@ type Interface interface {
Create(resources ResourceList) (*Result, error)
// Wait waits up to the given timeout for the specified resources to be ready.
// deprecated and replaced by WaitWithContext due for removal in helm v4
Wait(resources ResourceList, timeout time.Duration) error
// WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs.
// deprecated and replaced by WaitWithJobsWithContext due for removal in helm v4
WaitWithJobs(resources ResourceList, timeout time.Duration) error
// WaitWithContext waits up to the given timeout for the specified resources to be ready or until the context is Done.

Loading…
Cancel
Save