pull/2386/merge
mike 9 years ago committed by GitHub
commit ce292fac64

@ -75,7 +75,7 @@ type ResourceActorFunc func(*resource.Info) error
// Create creates kubernetes resources from an io.reader
//
// Namespace will set the namespace
func (c *Client) Create(namespace string, reader io.Reader, timeout int64, shouldWait bool) error {
func (c *Client) Create(namespace string, reader io.Reader, writer io.Writer, timeout int64, shouldWait bool) error {
client, err := c.ClientSet()
if err != nil {
return err
@ -91,7 +91,7 @@ func (c *Client) Create(namespace string, reader io.Reader, timeout int64, shoul
return err
}
if shouldWait {
return c.waitForResources(time.Duration(timeout)*time.Second, infos)
return c.waitForResources(time.Duration(timeout)*time.Second, infos, writer)
}
return nil
}
@ -208,7 +208,7 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) {
// not present in the target configuration
//
// Namespace will set the namespaces
func (c *Client) Update(namespace string, originalReader, targetReader io.Reader, recreate bool, timeout int64, shouldWait bool) error {
func (c *Client) Update(namespace string, originalReader, targetReader io.Reader, writer io.Writer, recreate bool, timeout int64, shouldWait bool) error {
original, err := c.BuildUnstructured(namespace, originalReader)
if err != nil {
return fmt.Errorf("failed decoding reader into objects: %s", err)
@ -269,7 +269,7 @@ func (c *Client) Update(namespace string, originalReader, targetReader io.Reader
}
}
if shouldWait {
return c.waitForResources(time.Duration(timeout)*time.Second, target)
return c.waitForResources(time.Duration(timeout)*time.Second, target, writer)
}
return nil
}

@ -17,6 +17,8 @@ limitations under the License.
package kube // import "k8s.io/helm/pkg/kube"
import (
"fmt"
"io"
"log"
"time"
@ -25,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
@ -43,9 +46,8 @@ type deployment struct {
// waitForResources polls to get the current status of all pods, PVCs, and Services
// until all are ready or a timeout is reached
func (c *Client) waitForResources(timeout time.Duration, created Result) error {
func (c *Client) waitForResources(timeout time.Duration, created Result, writer io.Writer) error {
log.Printf("beginning wait for resources with timeout of %v", timeout)
cs, _ := c.ClientSet()
client := versionedClientsetForDeployment(cs)
return wait.Poll(2*time.Second, timeout, func() (bool, error) {
@ -132,6 +134,7 @@ func (c *Client) waitForResources(timeout time.Duration, created Result) error {
services = append(services, *svc)
}
}
writeEvents(client, writer, pods, services, pvc, deployments)
return podsReady(pods) && servicesReady(services) && volumesReady(pvc) && deploymentsReady(deployments), nil
})
}
@ -194,3 +197,48 @@ func versionedClientsetForDeployment(internalClient internalclientset.Interface)
ExtensionsV1beta1Client: extensionsclient.New(internalClient.Extensions().RESTClient()),
}
}
func writeEvents(
client clientset.Interface,
writer io.Writer,
pods []v1.Pod,
services []v1.Service,
pvcs []v1.PersistentVolumeClaim,
deployments []deployment,
) {
// Aggregate all events from k8s on each poll:
// Pods:
for _, pod := range pods {
events, _ := client.Core().Events(pod.Namespace).Search(api.Scheme, runtime.Object(&pod))
write(writer, events, "Pod")
}
// Services:
for _, service := range services {
events, _ := client.Core().Events(service.Namespace).Search(api.Scheme, runtime.Object(&service))
write(writer, events, "Service")
}
// PVCs:
for _, pvc := range pvcs {
events, _ := client.Core().Events(pvc.Namespace).Search(api.Scheme, runtime.Object(&pvc))
write(writer, events, "PVC")
}
// Deployments:
for _, deployment := range deployments {
events, _ := client.Core().Events(deployment.deployment.Namespace).Search(api.Scheme, runtime.Object(deployment.deployment))
write(writer, events, "Deployment")
}
}
func write(writer io.Writer, events *v1.EventList, resource string) {
for _, event := range events.Items {
fmt.Fprintf(
writer,
"[%s] [%s] [Type]: %s [Reason]: %s [Message]: %s\n",
time.Now().Format("2006/01/02 15:04:05"),
resource,
event.Type,
event.Reason,
event.Message,
)
}
}

@ -39,7 +39,8 @@ type Environment struct {
func (env *Environment) createTestPod(test *test) error {
b := bytes.NewBufferString(test.manifest)
if err := env.KubeClient.Create(env.Namespace, b, env.Timeout, false); err != nil {
eventsLog := bytes.NewBuffer(nil)
if err := env.KubeClient.Create(env.Namespace, b, eventsLog, env.Timeout, false); err != nil {
log.Printf(err.Error())
test.result.Info = err.Error()
test.result.Status = release.TestRun_FAILURE

@ -114,7 +114,7 @@ type KubeClient interface {
//
// reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n").
Create(namespace string, reader io.Reader, timeout int64, shouldWait bool) error
Create(namespace string, reader io.Reader, writer io.Writer, timeout int64, shouldWait bool) error
// Get gets one or more resources. Returned string hsa the format like kubectl
// provides with the column headers separating the resource types.
@ -147,7 +147,7 @@ type KubeClient interface {
//
// reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n").
Update(namespace string, originalReader, modifiedReader io.Reader, recreate bool, timeout int64, shouldWait bool) error
Update(namespace string, originalReader, modifiedReader io.Reader, writer io.Writer, recreate bool, timeout int64, shouldWait bool) error
Build(namespace string, reader io.Reader) (kube.Result, error)
BuildUnstructured(namespace string, reader io.Reader) (kube.Result, error)

@ -20,6 +20,7 @@ import (
"bytes"
"errors"
"fmt"
"io"
"log"
"path"
"regexp"
@ -324,7 +325,8 @@ func (s *ReleaseServer) performUpdate(originalRelease, updatedRelease *release.R
}
}
if err := s.performKubeUpdate(originalRelease, updatedRelease, req.Recreate, req.Timeout, req.Wait); err != nil {
eventsLog := bytes.NewBuffer(nil)
if err := s.performKubeUpdate(originalRelease, updatedRelease, eventsLog, req.Recreate, req.Timeout, req.Wait); err != nil {
msg := fmt.Sprintf("Upgrade %q failed: %s", updatedRelease.Name, err)
log.Printf("warning: %s", msg)
originalRelease.Info.Status.Code = release.Status_SUPERSEDED
@ -332,7 +334,12 @@ func (s *ReleaseServer) performUpdate(originalRelease, updatedRelease *release.R
updatedRelease.Info.Description = msg
s.recordRelease(originalRelease, true)
s.recordRelease(updatedRelease, false)
return res, err
return res, fmt.Errorf(
"\nTiller log:\n%s\nupdate %s failed: %s",
eventsLog,
originalRelease.Name,
err,
)
}
// post-upgrade hooks
@ -511,7 +518,8 @@ func (s *ReleaseServer) performRollback(currentRelease, targetRelease *release.R
}
}
if err := s.performKubeUpdate(currentRelease, targetRelease, req.Recreate, req.Timeout, req.Wait); err != nil {
eventsLog := bytes.NewBuffer(nil)
if err := s.performKubeUpdate(currentRelease, targetRelease, eventsLog, req.Recreate, req.Timeout, req.Wait); err != nil {
msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err)
log.Printf("warning: %s", msg)
currentRelease.Info.Status.Code = release.Status_SUPERSEDED
@ -519,7 +527,12 @@ func (s *ReleaseServer) performRollback(currentRelease, targetRelease *release.R
targetRelease.Info.Description = msg
s.recordRelease(currentRelease, true)
s.recordRelease(targetRelease, false)
return res, err
return res, fmt.Errorf(
"\nTiller log:\n%s\nrollback %s failed: %s",
eventsLog,
currentRelease.Name,
err,
)
}
// post-rollback hooks
@ -537,11 +550,11 @@ func (s *ReleaseServer) performRollback(currentRelease, targetRelease *release.R
return res, nil
}
func (s *ReleaseServer) performKubeUpdate(currentRelease, targetRelease *release.Release, recreate bool, timeout int64, shouldWait bool) error {
func (s *ReleaseServer) performKubeUpdate(currentRelease, targetRelease *release.Release, writer io.Writer, recreate bool, timeout int64, shouldWait bool) error {
kubeCli := s.env.KubeClient
current := bytes.NewBufferString(currentRelease.Manifest)
target := bytes.NewBufferString(targetRelease.Manifest)
return kubeCli.Update(targetRelease.Namespace, current, target, recreate, timeout, shouldWait)
return kubeCli.Update(targetRelease.Namespace, current, target, writer, recreate, timeout, shouldWait)
}
// prepareRollback finds the previous release and prepares a new release object with
@ -876,6 +889,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install
}
}
eventsLog := bytes.NewBuffer(nil)
switch h, err := s.env.Releases.History(req.Name); {
// if this is a replace operation, append to the release history
case req.ReuseName && err == nil && len(h) >= 1:
@ -893,7 +907,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install
// so as to append to the old release's history
r.Version = old.Version + 1
if err := s.performKubeUpdate(old, r, false, req.Timeout, req.Wait); err != nil {
if err := s.performKubeUpdate(old, r, eventsLog, false, req.Timeout, req.Wait); err != nil {
msg := fmt.Sprintf("Release replace %q failed: %s", r.Name, err)
log.Printf("warning: %s", msg)
old.Info.Status.Code = release.Status_SUPERSEDED
@ -901,20 +915,30 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install
r.Info.Description = msg
s.recordRelease(old, true)
s.recordRelease(r, false)
return res, err
return res, fmt.Errorf(
"\nTiller log:\n%s\nrelease %s failed: %s",
eventsLog,
r.Name,
err,
)
}
default:
// nothing to replace, create as normal
// regular manifests
b := bytes.NewBufferString(r.Manifest)
if err := s.env.KubeClient.Create(r.Namespace, b, req.Timeout, req.Wait); err != nil {
if err := s.env.KubeClient.Create(r.Namespace, b, eventsLog, req.Timeout, req.Wait); err != nil {
msg := fmt.Sprintf("Release %q failed: %s", r.Name, err)
log.Printf("warning: %s", msg)
r.Info.Status.Code = release.Status_FAILED
r.Info.Description = msg
s.recordRelease(r, false)
return res, fmt.Errorf("release %s failed: %s", r.Name, err)
return res, fmt.Errorf(
"\nTiller log:\n%s\nrelease %s failed: %s",
eventsLog,
r.Name,
err,
) // add resourse statuses here
}
}
@ -966,7 +990,7 @@ func (s *ReleaseServer) execHook(hs []*release.Hook, name, namespace, hook strin
for _, h := range executingHooks {
b := bytes.NewBufferString(h.Manifest)
if err := kubeCli.Create(namespace, b, timeout, false); err != nil {
if err := kubeCli.Create(namespace, b, nil, timeout, false); err != nil {
log.Printf("warning: Release %q %s %s failed: %s", name, hook, h.Path, err)
return err
}

Loading…
Cancel
Save