Merge pull request #113 from vaikas-google/master

Add resource state / errors to manifest. First step to #79, #80
pull/119/head
Brendan Melville 9 years ago
commit 97e4786386

@ -42,6 +42,7 @@ var (
template_registry = flag.String("registry", "kubernetes/deployment-manager/templates", "Github based template registry (owner/repo[/path])")
service = flag.String("service", "http://localhost:8001/api/v1/proxy/namespaces/dm/services/manager-service:manager", "URL for deployment manager")
binary = flag.String("binary", "../expandybird/expansion/expansion.py", "Path to template expansion binary")
timeout = flag.Int("timeout", 10, "Time in seconds to wait for response")
)
var commands = []string{
@ -180,7 +181,12 @@ func callService(path, method, action string, reader io.ReadCloser) {
func callHttp(path, method, action string, reader io.ReadCloser) string {
request, err := http.NewRequest(method, path, reader)
request.Header.Add("Content-Type", "application/json")
response, err := http.DefaultClient.Do(request)
client := http.Client{
Timeout: time.Duration(time.Duration(*timeout) * time.Second),
}
response, err := client.Do(request)
if err != nil {
log.Fatalf("cannot %s: %s\n", action, err)
}

@ -18,6 +18,7 @@ import (
"fmt"
"io"
"io/ioutil"
"time"
"log"
"net/http"
"net/url"
@ -29,18 +30,20 @@ import (
// Deployer abstracts interactions with the expander and deployer services.
type Deployer interface {
GetConfiguration(cached *Configuration) (*Configuration, error)
CreateConfiguration(configuration *Configuration) error
DeleteConfiguration(configuration *Configuration) error
PutConfiguration(configuration *Configuration) error
CreateConfiguration(configuration *Configuration) (*Configuration, error)
DeleteConfiguration(configuration *Configuration) (*Configuration, error)
PutConfiguration(configuration *Configuration) (*Configuration, error)
}
// NewDeployer returns a new initialized Deployer.
// TODO(vaikas): Add a flag for setting the timeout.
func NewDeployer(url string) Deployer {
return &deployer{url}
return &deployer{url, 15}
}
type deployer struct {
deployerURL string
timeout int
}
func (d *deployer) getBaseURL() string {
@ -83,34 +86,47 @@ func (d *deployer) GetConfiguration(cached *Configuration) (*Configuration, erro
return actual, nil
}
// CreateConfiguration deploys the set of resources described by a configuration.
func (d *deployer) CreateConfiguration(configuration *Configuration) error {
// CreateConfiguration deploys the set of resources described by a configuration and returns
// the Configuration with status for each resource filled in.
func (d *deployer) CreateConfiguration(configuration *Configuration) (*Configuration, error) {
return d.callServiceWithConfiguration("POST", "create", configuration)
}
// DeleteConfiguration deletes the set of resources described by a configuration.
func (d *deployer) DeleteConfiguration(configuration *Configuration) error {
func (d *deployer) DeleteConfiguration(configuration *Configuration) (*Configuration, error) {
return d.callServiceWithConfiguration("DELETE", "delete", configuration)
}
// PutConfiguration replaces the set of resources described by a configuration.
func (d *deployer) PutConfiguration(configuration *Configuration) error {
// PutConfiguration replaces the set of resources described by a configuration and returns
// the Configuration with status for each resource filled in.
func (d *deployer) PutConfiguration(configuration *Configuration) (*Configuration, error) {
return d.callServiceWithConfiguration("PUT", "replace", configuration)
}
func (d *deployer) callServiceWithConfiguration(method, operation string, configuration *Configuration) error {
func (d *deployer) callServiceWithConfiguration(method, operation string, configuration *Configuration) (*Configuration, error) {
callback := func(e error) error {
return fmt.Errorf("cannot %s configuration: %s", operation, e)
}
y, err := yaml.Marshal(configuration)
if err != nil {
return callback(err)
return nil, callback(err)
}
reader := ioutil.NopCloser(bytes.NewReader(y))
_, err = d.callService(method, d.getBaseURL(), reader, callback)
return err
resp, err := d.callService(method, d.getBaseURL(), reader, callback)
if err != nil {
return nil, err
}
result := &Configuration{}
if len(resp) != 0 {
if err := yaml.Unmarshal(resp, &result); err != nil {
return nil, fmt.Errorf("cannot unmarshal response: (%v)", err)
}
}
return result, nil
}
func (d *deployer) callService(method, url string, reader io.Reader, callback formatter) ([]byte, error) {
@ -123,7 +139,9 @@ func (d *deployer) callService(method, url string, reader io.Reader, callback fo
request.Header.Add("Content-Type", "application/json")
}
response, err := http.DefaultClient.Do(request)
timeout := time.Duration(time.Duration(d.timeout) * time.Second)
client := http.Client{Timeout: timeout}
response, err := client.Do(request)
if err != nil {
return nil, callback(err)
}

@ -165,7 +165,7 @@ func TestCreateConfiguration(t *testing.T) {
defer ts.Close()
deployer := NewDeployer(ts.URL)
err := deployer.CreateConfiguration(valid)
_, err := deployer.CreateConfiguration(valid)
if err != nil {
message := err.Error()
if !strings.Contains(message, dtc.Error) {
@ -200,7 +200,7 @@ func TestDeleteConfiguration(t *testing.T) {
defer ts.Close()
deployer := NewDeployer(ts.URL)
err := deployer.DeleteConfiguration(valid)
_, err := deployer.DeleteConfiguration(valid)
if err != nil {
message := err.Error()
if !strings.Contains(message, dtc.Error) {
@ -235,7 +235,7 @@ func TestPutConfiguration(t *testing.T) {
defer ts.Close()
deployer := NewDeployer(ts.URL)
err := deployer.PutConfiguration(valid)
_, err := deployer.PutConfiguration(valid)
if err != nil {
message := err.Error()
if !strings.Contains(message, dtc.Error) {

@ -104,21 +104,39 @@ func (m *manager) CreateDeployment(t *Template) (*Deployment, error) {
return nil, err
}
err = m.repository.AddManifest(t.Name, manifest)
if err != nil {
log.Printf("AddManifest failed %v", err)
actualConfig, createErr := m.deployer.CreateConfiguration(manifest.ExpandedConfig)
if createErr != nil {
// Deployment failed, mark as failed
log.Printf("CreateConfiguration failed: %v", err)
m.repository.SetDeploymentStatus(t.Name, FailedStatus)
return nil, err
// If we failed before being able to create some of the resources, then
// return the failure as such. Otherwise, we're going to add the manifest
// and hence resource specific errors down below.
if actualConfig == nil {
return nil, createErr
}
} else {
m.repository.SetDeploymentStatus(t.Name, DeployedStatus)
}
if err := m.deployer.CreateConfiguration(manifest.ExpandedConfig); err != nil {
// Deployment failed, mark as deleted
log.Printf("CreateConfiguration failed: %v", err)
// Update the manifest with the actual state of the reified resources
manifest.ExpandedConfig = actualConfig
aErr := m.repository.AddManifest(t.Name, manifest)
if aErr != nil {
log.Printf("AddManifest failed %v", aErr)
m.repository.SetDeploymentStatus(t.Name, FailedStatus)
return nil, err
// If there's an earlier error, return that instead since it contains
// more applicable error message. Adding manifest failure is more akin
// to a check fail (either deployment doesn't exist, or a manifest with the same
// name already exists).
// TODO(vaikas): Should we combine both errors and return a nicely formatted error for both?
if createErr != nil {
return nil, createErr
} else {
return nil, aErr
}
}
m.repository.SetDeploymentStatus(t.Name, DeployedStatus)
// Finally update the type instances for this deployment.
m.addTypeInstances(t.Name, manifest.Name, manifest.Layout)
return m.repository.GetValidDeployment(t.Name)
@ -183,7 +201,7 @@ func (m *manager) DeleteDeployment(name string, forget bool) (*Deployment, error
latest := getLatestManifest(d.Manifests)
if latest != nil {
log.Printf("Deleting resources from the latest manifest")
if err := m.deployer.DeleteConfiguration(latest.ExpandedConfig); err != nil {
if _, err := m.deployer.DeleteConfiguration(latest.ExpandedConfig); err != nil {
log.Printf("Failed to delete resources from the latest manifest: %v", err)
return nil, err
}
@ -222,13 +240,15 @@ func (m *manager) PutDeployment(name string, t *Template) (*Deployment, error) {
return nil, err
}
err = m.repository.AddManifest(t.Name, manifest)
actualConfig, err := m.deployer.PutConfiguration(manifest.ExpandedConfig)
if err != nil {
m.repository.SetDeploymentStatus(name, FailedStatus)
return nil, err
}
if err := m.deployer.PutConfiguration(manifest.ExpandedConfig); err != nil {
manifest.ExpandedConfig = actualConfig
err = m.repository.AddManifest(t.Name, manifest)
if err != nil {
m.repository.SetDeploymentStatus(name, FailedStatus)
return nil, err
}

@ -28,6 +28,20 @@ var layout = Layout{
var configuration = Configuration{
Resources: []*Resource{&Resource{Name: "test", Type: "test"}},
}
var resourcesWithSuccessState = Configuration{
Resources: []*Resource{&Resource{Name: "test", Type: "test", State: &ResourceState{Status: Created}}},
}
var resourcesWithFailureState = Configuration{
Resources: []*Resource{&Resource{
Name: "test",
Type: "test",
State: &ResourceState{
Status: Failed,
Errors:[]string{"test induced error",
},
},
}},
}
var expandedConfig = ExpandedTemplate{
Config: &configuration,
Layout: &layout,
@ -73,6 +87,7 @@ type deployerStub struct {
Created []*Configuration
FailDelete bool
Deleted []*Configuration
FailCreateResource bool
}
func (deployer *deployerStub) reset() {
@ -80,6 +95,7 @@ func (deployer *deployerStub) reset() {
deployer.Created = make([]*Configuration, 0)
deployer.FailDelete = false
deployer.Deleted = make([]*Configuration, 0)
deployer.FailCreateResource = false
}
func newDeployerStub() *deployerStub {
@ -91,25 +107,28 @@ func (deployer *deployerStub) GetConfiguration(cached *Configuration) (*Configur
return nil, nil
}
func (deployer *deployerStub) CreateConfiguration(configuration *Configuration) error {
func (deployer *deployerStub) CreateConfiguration(configuration *Configuration) (*Configuration, error) {
if deployer.FailCreate {
return errTest
return nil, errTest
}
if deployer.FailCreateResource {
return &resourcesWithFailureState, errTest
}
deployer.Created = append(deployer.Created, configuration)
return nil
return &resourcesWithSuccessState, nil
}
func (deployer *deployerStub) DeleteConfiguration(configuration *Configuration) error {
func (deployer *deployerStub) DeleteConfiguration(configuration *Configuration) (*Configuration, error) {
if deployer.FailDelete {
return errTest
return nil, errTest
}
deployer.Deleted = append(deployer.Deleted, configuration)
return nil
return nil, nil
}
func (deployer *deployerStub) PutConfiguration(configuration *Configuration) error {
return nil
func (deployer *deployerStub) PutConfiguration(configuration *Configuration) (*Configuration, error) {
return nil, nil
}
type repositoryStub struct {
@ -334,18 +353,49 @@ func TestCreateDeploymentCreationFailure(t *testing.T) {
t.Error("CreateDeployment failure did not mark deployment as failed")
}
if err != errTest || d != nil {
t.Errorf("Expected a different set of response values from invoking CreateDeployment."+
"Received: %s, %s. Expected: %s, %s.", d, err, "nil", errTest)
}
if testRepository.TypeInstancesCleared {
t.Error("Unexpected change to type instances during CreateDeployment failure.")
}
}
func TestCreateDeploymentCreationResourceFailure(t *testing.T) {
testRepository.reset()
testDeployer.reset()
testDeployer.FailCreateResource = true
d, err := testManager.CreateDeployment(&template)
if testRepository.Created[0] != template.Name {
t.Errorf("Repository CreateDeployment was called with %s but expected %s.",
testRepository.Created[0], template.Name)
}
if len(testRepository.Deleted) != 0 {
t.Errorf("DeleteDeployment was called with %s but not expected",
testRepository.Created[0])
}
if testRepository.DeploymentStatuses[0] != FailedStatus {
t.Error("CreateDeployment failure did not mark deployment as failed")
}
if !strings.HasPrefix(testRepository.ManifestAdd[template.Name].Name, "manifest-") {
t.Errorf("Repository AddManifest was called with %s but expected manifest name"+
"to begin with manifest-.", testRepository.ManifestAdd[template.Name].Name)
}
if err != errTest || d != nil {
// if err != errTest || d != nil {
if d == nil {
t.Errorf("Expected a different set of response values from invoking CreateDeployment."+
"Received: %s, %s. Expected: %s, %s.", d, err, "nil", errTest)
}
if testRepository.TypeInstancesCleared {
t.Error("Unexpected change to type instances during CreateDeployment failure.")
if !testRepository.TypeInstancesCleared {
t.Error("Repository did not clear type instances during creation")
}
}

@ -169,7 +169,7 @@ type Resource struct {
Name string `json:"name"`
Type string `json:"type"`
Properties map[string]interface{} `json:"properties,omitempty"`
State ResourceState `json:"state"`
State *ResourceState `json:"state,omitempty"`
}
// TypeInstance defines the metadata for an instantiation of a template type

@ -14,6 +14,7 @@ limitations under the License.
package main
import (
"github.com/kubernetes/deployment-manager/manager/manager"
"github.com/kubernetes/deployment-manager/resourcifier/configurator"
"github.com/kubernetes/deployment-manager/util"
@ -75,8 +76,8 @@ func listConfigurationsHandlerFunc(w http.ResponseWriter, r *http.Request) {
return
}
c := &configurator.Configuration{
[]configurator.Resource{
c := &manager.Configuration{
[]*manager.Resource{
{Type: rtype},
},
}
@ -104,8 +105,8 @@ func getConfigurationHandlerFunc(w http.ResponseWriter, r *http.Request) {
return
}
c := &configurator.Configuration{
[]configurator.Resource{
c := &manager.Configuration{
[]*manager.Resource{
{Name: rname, Type: rtype},
},
}
@ -252,7 +253,7 @@ func getPathVariable(w http.ResponseWriter, r *http.Request, variable, handler s
return unescaped, nil
}
func getConfiguration(w http.ResponseWriter, r *http.Request, handler string) *configurator.Configuration {
func getConfiguration(w http.ResponseWriter, r *http.Request, handler string) *manager.Configuration {
b := io.LimitReader(r.Body, *maxLength*1024)
y, err := ioutil.ReadAll(b)
if err != nil {
@ -275,7 +276,7 @@ func getConfiguration(w http.ResponseWriter, r *http.Request, handler string) *c
return nil
}
c := &configurator.Configuration{}
c := &manager.Configuration{}
if err := json.Unmarshal(j, c); err != nil {
e := errors.New(err.Error() + "\n" + string(j))
util.LogAndReturnError(handler, http.StatusBadRequest, e, w)

@ -20,25 +20,12 @@ import (
"os/exec"
"strings"
"github.com/kubernetes/deployment-manager/manager/manager"
"github.com/ghodss/yaml"
)
// TODO(jackgr): Define an interface and a struct type for Configurator and move initialization to the caller.
// Configuration describes a configuration deserialized from a YAML or JSON file.
type Configuration struct {
Resources []Resource `json:"resources"`
}
// Resource describes a resource in a deserialized configuration. A resource has
// a name, a type and a set of properties. The properties are passed directly to
// kubectl as the definition of the resource on the server.
type Resource struct {
Name string `json:"name"`
Type string `json:"type"`
Properties map[string]interface{} `json:"properties"`
}
type Configurator struct {
KubePath string
Arguments []string
@ -86,12 +73,14 @@ func (e *Error) appendError(err error) error {
return err
}
// Configure passes the configuration in the given deployment to kubectl
// Configure passes each resource in the configuration to kubectl and performs the appropriate
// action on it (create/delete/replace) and updates the State of the resource with the resulting
// status. In case of errors with a resource, Resource.State.Errors is set.
// and then updates the deployment with the completion status and completion time.
func (a *Configurator) Configure(c *Configuration, o operation) (string, error) {
func (a *Configurator) Configure(c *manager.Configuration, o operation) (string, error) {
errors := &Error{}
var output []string
for _, resource := range c.Resources {
for i, resource := range c.Resources {
args := []string{o.String()}
if o == GetOperation {
args = append(args, "-o", "yaml")
@ -110,6 +99,10 @@ func (a *Configurator) Configure(c *Configuration, o operation) (string, error)
if err != nil {
e := fmt.Errorf("yaml marshal failed for resource: %v: %v", resource.Name, err)
log.Println(errors.appendError(e))
c.Resources[i].State = &manager.ResourceState{
Status: manager.Aborted,
Errors: []string{e.Error()},
}
continue
}
}
@ -129,6 +122,10 @@ func (a *Configurator) Configure(c *Configuration, o operation) (string, error)
if err := cmd.Start(); err != nil {
e := fmt.Errorf("cannot start kubetcl for resource: %v: %v", resource.Name, err)
c.Resources[i].State = &manager.ResourceState{
Status: manager.Failed,
Errors: []string{e.Error()},
}
log.Println(errors.appendError(e))
continue
}
@ -140,19 +137,19 @@ func (a *Configurator) Configure(c *Configuration, o operation) (string, error)
log.Println(resource.Name + " not found, treating as success for delete")
} else {
e := fmt.Errorf("kubetcl failed for resource: %v: %v: %v", resource.Name, err, combined.String())
c.Resources[i].State = &manager.ResourceState{
Status: manager.Failed,
Errors: []string{e.Error()},
}
log.Println(errors.appendError(e))
continue
}
}
output = append(output, combined.String())
c.Resources[i].State = &manager.ResourceState{Status: manager.Created}
log.Printf("kubectl succeeded for resource: %v: SysTime: %v UserTime: %v\n%v",
resource.Name, cmd.ProcessState.SystemTime(), cmd.ProcessState.UserTime(), combined.String())
}
if len(errors.errors) > 0 {
return "", errors
}
return strings.Join(output, "\n"), nil
}

Loading…
Cancel
Save