|
|
|
@ -321,9 +321,17 @@ func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedR
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
rChan := make(chan resultMessage)
|
|
|
|
|
ctxChan := make(chan resultMessage)
|
|
|
|
|
doneChan := make(chan interface{})
|
|
|
|
|
go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease)
|
|
|
|
|
go u.handleContext(ctx, rChan, upgradedRelease)
|
|
|
|
|
result := <-rChan
|
|
|
|
|
go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease)
|
|
|
|
|
var result resultMessage
|
|
|
|
|
select {
|
|
|
|
|
case result = <-rChan:
|
|
|
|
|
doneChan <- true
|
|
|
|
|
case result = <-ctxChan:
|
|
|
|
|
close(rChan)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return result.r, result.e
|
|
|
|
|
}
|
|
|
|
@ -341,13 +349,17 @@ func (u *Upgrade) reportToPerformUpgrade(c chan<- resultMessage, rel *release.Re
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Setup listener for SIGINT and SIGTERM
|
|
|
|
|
func (u *Upgrade) handleContext(ctx context.Context, c chan<- resultMessage, upgradedRelease *release.Release) {
|
|
|
|
|
|
|
|
|
|
func (u *Upgrade) handleContext(ctx context.Context, done chan interface{}, c chan<- resultMessage, upgradedRelease *release.Release) {
|
|
|
|
|
go func() {
|
|
|
|
|
<-ctx.Done()
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
err := ctx.Err()
|
|
|
|
|
|
|
|
|
|
// when the atomic flag is set the ongoing release finish first and doesn't give time for the rollback happens.
|
|
|
|
|
u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, err)
|
|
|
|
|
case <-done:
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) {
|
|
|
|
|