From ebc79fa807f29b984e090f0071b640f7347937cf Mon Sep 17 00:00:00 2001 From: Davanum Srinivas Date: Fri, 30 Sep 2022 18:22:54 -0400 Subject: [PATCH] Tolerate temporary errors from etcdserver There are cases when the etcdserver is temporarily unavailable and the errors that we get back from kube-apiserver reflect that error. It looks like we bail out immediately when these errors happen currently. We should retry until timeout is reached when this sort of errors happen. Signed-off-by: Davanum Srinivas --- go.mod | 3 ++- go.sum | 2 ++ pkg/kube/wait.go | 26 +++++++++++++++++++++++++- pkg/kube/wait_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 71 insertions(+), 2 deletions(-) create mode 100644 pkg/kube/wait_test.go diff --git a/go.mod b/go.mod index 9ced3597c..5952bd29d 100644 --- a/go.mod +++ b/go.mod @@ -31,9 +31,11 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.0 github.com/xeipuuv/gojsonschema v1.2.0 + go.etcd.io/etcd/api/v3 v3.5.4 golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 golang.org/x/text v0.3.7 + google.golang.org/grpc v1.47.0 k8s.io/api v0.25.0 k8s.io/apiextensions-apiserver v0.25.0 k8s.io/apimachinery v0.25.0 @@ -149,7 +151,6 @@ require ( golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect - google.golang.org/grpc v1.47.0 // indirect google.golang.org/protobuf v1.28.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 93d35468f..5cf093a31 100644 --- a/go.sum +++ b/go.sum @@ -619,6 +619,8 @@ github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f/go github.com/ziutek/mymysql v1.5.4 h1:GB0qdRGsTwQSBVYuVShFBKaXSnSnYYC2d9knnE1LHFs= github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0= go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs= +go.etcd.io/etcd/api/v3 v3.5.4 h1:OHVyt3TopwtUQ2GKdd5wu3PmmipR4FTwCqoEjSyRdIc= +go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/v2 v2.305.0/go.mod h1:h9puh54ZTgAKtEbut2oe9P4L/oqKCVB6xsXlzd7alYQ= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go index 8928d6745..fd01e9bc7 100644 --- a/pkg/kube/wait.go +++ b/pkg/kube/wait.go @@ -22,6 +22,9 @@ import ( "time" "github.com/pkg/errors" + "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + "google.golang.org/grpc/codes" + appsv1 "k8s.io/api/apps/v1" appsv1beta1 "k8s.io/api/apps/v1beta1" appsv1beta2 "k8s.io/api/apps/v1beta2" @@ -32,7 +35,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/wait" ) @@ -42,6 +44,22 @@ type waiter struct { log func(string, ...interface{}) } +// isServiceUnavailable helps figure out if the error is caused by etcd not being available +// see https://pkg.go.dev/go.etcd.io/etcd/api/v3/v3rpc/rpctypes for `codes.Unavailable` +// we use this to check if the etcdserver is not available we should retry in case +// this is a temporary situation +func isServiceUnavailable(err error) bool { + if err != nil { + err = rpctypes.Error(err) + if ev, ok := err.(rpctypes.EtcdError); ok { + if ev.Code() == codes.Unavailable { + return true + } + } + } + return false +} + // waitForResources polls to get the current status of all pods, PVCs, Services and // Jobs(optional) until all are ready or a timeout is reached func (w *waiter) waitForResources(created ResourceList) error { @@ -54,6 +72,9 @@ func (w *waiter) waitForResources(created ResourceList) error { for _, v := range created { ready, err := w.c.IsReady(ctx, v) if !ready || err != nil { + if isServiceUnavailable(err) { + return false, nil + } return false, err } } @@ -72,6 +93,9 @@ func (w *waiter) waitForDeletedResources(deleted ResourceList) error { for _, v := range deleted { err := v.Get() if err == nil || !apierrors.IsNotFound(err) { + if isServiceUnavailable(err) { + return false, nil + } return false, err } } diff --git a/pkg/kube/wait_test.go b/pkg/kube/wait_test.go new file mode 100644 index 000000000..5f18e49ce --- /dev/null +++ b/pkg/kube/wait_test.go @@ -0,0 +1,42 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kube // import "helm.sh/helm/v3/pkg/kube" + +import ( + "errors" + "testing" + + "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" +) + +func Test_isServiceUnavailable(t *testing.T) { + tests := []struct { + err error + expect bool + }{ + {err: nil, expect: false}, + {err: errors.New("random error from somewhere"), expect: false}, + {err: rpctypes.ErrGRPCLeaderChanged, expect: true}, + {err: errors.New("etcdserver: leader changed"), expect: true}, + } + + for _, tt := range tests { + if isServiceUnavailable(tt.err) != tt.expect { + t.Errorf("failed test for %q (expect equal: %t)", tt.err, tt.expect) + } + } +}