diff --git a/cmd/rudder/rudder.go b/cmd/rudder/rudder.go index 93913be91..71f5325d1 100644 --- a/cmd/rudder/rudder.go +++ b/cmd/rudder/rudder.go @@ -81,8 +81,12 @@ func (r *ReleaseModuleServiceServer) RollbackRelease(ctx context.Context, in *re return nil, nil } -// UpgradeRelease is not implemented +// UpgradeRelease upgrades manifests using kubernetes client func (r *ReleaseModuleServiceServer) UpgradeRelease(ctx context.Context, in *release.UpgradeReleaseRequest) (*release.UpgradeReleaseResponse, error) { grpclog.Print("upgrade") - return nil, nil + c := bytes.NewBufferString(in.Current.Manifest) + t := bytes.NewBufferString(in.Target.Manifest) + err := kubeClient.Update(in.Target.Namespace, c, t, in.Recreate, in.Timeout, in.Wait) + // upgrade response object should be changed to include status + return &release.UpgradeReleaseResponse{}, err } diff --git a/pkg/proto/hapi/release/modules.pb.go b/pkg/proto/hapi/release/modules.pb.go index cfe36e572..123eb378a 100644 --- a/pkg/proto/hapi/release/modules.pb.go +++ b/pkg/proto/hapi/release/modules.pb.go @@ -158,7 +158,15 @@ func (m *DeleteReleaseResponse) GetResult() *Result { } type UpgradeReleaseRequest struct { - Release *Release `protobuf:"bytes,1,opt,name=release" json:"release,omitempty"` + Current *Release `protobuf:"bytes,1,opt,name=release" json:"release,omitempty"` + Target *Release `protobuf:"bytes,1,opt,name=release" json:"release,omitempty"` + // Performs pods restart for resources if applicable + Recreate bool `protobuf:"varint,6,opt,name=recreate" json:"recreate,omitempty"` + // timeout specifies the max amount of time any kubernetes client command can run. + Timeout int64 `protobuf:"varint,7,opt,name=timeout" json:"timeout,omitempty"` + // wait, if true, will wait until all Pods, PVCs, and Services are in a ready state + // before marking the release as successful. It will wait for as long as timeout + Wait bool `protobuf:"varint,9,opt,name=wait" json:"wait,omitempty"` } func (m *UpgradeReleaseRequest) Reset() { *m = UpgradeReleaseRequest{} } @@ -168,7 +176,7 @@ func (*UpgradeReleaseRequest) Descriptor() ([]byte, []int) { return fileDescript func (m *UpgradeReleaseRequest) GetRelease() *Release { if m != nil { - return m.Release + return m.Target } return nil } diff --git a/pkg/rudder/client.go b/pkg/rudder/client.go index cc3b193ab..1583ac69b 100644 --- a/pkg/rudder/client.go +++ b/pkg/rudder/client.go @@ -40,3 +40,14 @@ func InstallRelease(rel *release.InstallReleaseRequest) (*release.InstallRelease client := release.NewReleaseModuleServiceClient(conn) return client.InstallRelease(context.Background(), rel) } + +// UpgradeReleas calls Rudder UpgradeRelease method which should perform update +func UpgradeRelease(req *release.UpgradeReleaseRequest) (*release.UpgradeReleaseResponse, error) { + conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%d", GrpcAddr), grpc.WithInsecure()) + if err != nil { + return nil, err + } + defer conn.Close() + client := release.NewReleaseModuleServiceClient(conn) + return client.UpgradeRelease(context.Background(), req) +} diff --git a/pkg/tiller/release_modules.go b/pkg/tiller/release_modules.go index 12d4cdcb2..a377527f6 100644 --- a/pkg/tiller/release_modules.go +++ b/pkg/tiller/release_modules.go @@ -28,6 +28,7 @@ import ( // ReleaseModule is an interface that allows ReleaseServer to run operations on release via either local implementation or Rudder service type ReleaseModule interface { Create(r *release.Release, req *services.InstallReleaseRequest, env *environment.Environment) error + Update(current, target *release.Release, req *services.UpdateReleaseRequest, env *environment.Environment) error } // LocalReleaseModule is a local implementation of ReleaseModule @@ -39,6 +40,12 @@ func (m *LocalReleaseModule) Create(r *release.Release, req *services.InstallRel return env.KubeClient.Create(r.Namespace, b, req.Timeout, req.Wait) } +func (m *LocalReleaseModule) Update(current, target *release.Release, req *services.UpdateReleaseRequest, env *environment.Environment) error { + c := bytes.NewBufferString(current.Manifest) + t := bytes.NewBufferString(target.Manifest) + return env.KubeClient.Update(target.Namespace, c, t, req.Recreate, req.Timeout, req.Wait) +} + // RemoteReleaseModule is a ReleaseModule which calls Rudder service to operate on a release type RemoteReleaseModule struct{} @@ -48,3 +55,16 @@ func (m *RemoteReleaseModule) Create(r *release.Release, req *services.InstallRe _, err := rudder.InstallRelease(request) return err } + +// Update calls rudder.UpgradeRelease +func (m *RemoteReleaseModule) Update(current, target *release.Release, req *services.UpdateReleaseRequest, env *environment.Environment) error { + req := &release.UpgradeReleaseRequest{ + Current: current, + Target: target, + Recreate: req.Recreate, + Timeout: req.Timeout, + Wait: req.Wait, + } + _, err := rudder.UpgradeRelease(req) + return err +} diff --git a/pkg/tiller/release_server.go b/pkg/tiller/release_server.go index 4d56465cb..ea5cafd02 100644 --- a/pkg/tiller/release_server.go +++ b/pkg/tiller/release_server.go @@ -332,8 +332,7 @@ func (s *ReleaseServer) performUpdate(originalRelease, updatedRelease *release.R return res, err } } - - if err := s.performKubeUpdate(originalRelease, updatedRelease, req.Recreate, req.Timeout, req.Wait); err != nil { + if err := s.ReleaseModule.Update(originalRelease, updatedRelease, req, s.env); err != nil { msg := fmt.Sprintf("Upgrade %q failed: %s", updatedRelease.Name, err) log.Printf("warning: %s", msg) originalRelease.Info.Status.Code = release.Status_SUPERSEDED @@ -520,7 +519,7 @@ func (s *ReleaseServer) performRollback(currentRelease, targetRelease *release.R } } - if err := s.performKubeUpdate(currentRelease, targetRelease, req.Recreate, req.Timeout, req.Wait); err != nil { + if err := s.ReleaseModule.Update(currentRelease, targetRelease, req); err != nil { msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err) log.Printf("warning: %s", msg) currentRelease.Info.Status.Code = release.Status_SUPERSEDED @@ -546,13 +545,6 @@ func (s *ReleaseServer) performRollback(currentRelease, targetRelease *release.R return res, nil } -func (s *ReleaseServer) performKubeUpdate(currentRelease, targetRelease *release.Release, recreate bool, timeout int64, shouldWait bool) error { - kubeCli := s.env.KubeClient - current := bytes.NewBufferString(currentRelease.Manifest) - target := bytes.NewBufferString(targetRelease.Manifest) - return kubeCli.Update(targetRelease.Namespace, current, target, recreate, timeout, shouldWait) -} - // prepareRollback finds the previous release and prepares a new release object with // the previous release's configuration func (s *ReleaseServer) prepareRollback(req *services.RollbackReleaseRequest) (*release.Release, *release.Release, error) { @@ -902,7 +894,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install // so as to append to the old release's history r.Version = old.Version + 1 - if err := s.performKubeUpdate(old, r, false, req.Timeout, req.Wait); err != nil { + if err := s.ReleaseModule.Update(old, r, req, s.env); err != nil { msg := fmt.Sprintf("Release replace %q failed: %s", r.Name, err) log.Printf("warning: %s", msg) old.Info.Status.Code = release.Status_SUPERSEDED