Resourcifier now processes resources in dependency order based on refs.

For now, references act only as dependencies and do not fill in the
values to which they refer. That will come in a follow-up.

Resources are still processed in a single iteration with no
parallelization.
pull/142/head
Brendan Melville 10 years ago
parent d3d57fdaee
commit f1fe4470d3

@ -206,6 +206,11 @@ func (m *manager) DeleteDeployment(name string, forget bool) (*common.Deployment
if latest != nil {
log.Printf("Deleting resources from the latest manifest")
// Clear previous state.
for _, r := range latest.ExpandedConfig.Resources {
r.State = nil
}
if _, err := m.deployer.DeleteConfiguration(latest.ExpandedConfig); err != nil {
log.Printf("Failed to delete resources from the latest manifest: %v", err)
return nil, err

@ -18,10 +18,11 @@ import (
"fmt"
"log"
"os/exec"
"regexp"
"strings"
"github.com/kubernetes/deployment-manager/common"
"github.com/ghodss/yaml"
"github.com/kubernetes/deployment-manager/common"
)
// TODO(jackgr): Define an interface and a struct type for Configurator and move initialization to the caller.
@ -73,6 +74,11 @@ func (e *Error) appendError(err error) error {
return err
}
// resource name -> set of dependencies.
type DependencyMap map[string]map[string]bool
var refRe = regexp.MustCompile("\\$\\(ref\\.([^\\.]+)\\.([^\\)]+)\\)")
// Configure passes each resource in the configuration to kubectl and performs the appropriate
// action on it (create/delete/replace) and updates the State of the resource with the resulting
// status. In case of errors with a resource, Resource.State.Errors is set.
@ -80,76 +86,176 @@ func (e *Error) appendError(err error) error {
func (a *Configurator) Configure(c *common.Configuration, o operation) (string, error) {
errors := &Error{}
var output []string
for i, resource := range c.Resources {
args := []string{o.String()}
if o == GetOperation {
args = append(args, "-o", "yaml")
if resource.Type != "" {
args = append(args, resource.Type)
if resource.Name != "" {
args = append(args, resource.Name)
}
}
deps, err := getDependencies(c, o)
if err != nil {
e := fmt.Errorf("Error generating dependencies: %s", err.Error())
return "", e
}
for {
resources := getUnprocessedResources(c)
// No more resources to process.
if len(resources) == 0 {
break
}
var y []byte
if len(resource.Properties) > 0 {
var err error
y, err = yaml.Marshal(resource.Properties)
if err != nil {
e := fmt.Errorf("yaml marshal failed for resource: %v: %v", resource.Name, err)
log.Println(errors.appendError(e))
c.Resources[i].State = &common.ResourceState{
Status: common.Aborted,
Errors: []string{e.Error()},
}
for _, r := range resources {
// Resource still has dependencies.
if len(deps[r.Name]) != 0 {
continue
}
out, err := a.configureResource(r, o)
if err != nil {
log.Println(errors.appendError(err))
abortDependants(c, deps, r.Name)
// Resource states have changed, need to recalculate unprocessed
// resources.
break
}
output = append(output, out)
removeDependencies(deps, r.Name)
}
}
return strings.Join(output, "\n"), nil
}
func (a *Configurator) configureResource(resource *common.Resource, o operation) (string, error) {
args := []string{o.String()}
if o == GetOperation {
args = append(args, "-o", "yaml")
if resource.Type != "" {
args = append(args, resource.Type)
if resource.Name != "" {
args = append(args, resource.Name)
}
}
}
if len(y) > 0 {
args = append(args, "-f", "-")
var y []byte
if len(resource.Properties) > 0 {
var err error
y, err = yaml.Marshal(resource.Properties)
if err != nil {
e := fmt.Errorf("yaml marshal failed for resource: %v: %v", resource.Name, err)
resource.State = failState(e)
return "", e
}
}
args = append(args, a.Arguments...)
cmd := exec.Command(a.KubePath, args...)
cmd.Stdin = bytes.NewBuffer(y)
if len(y) > 0 {
args = append(args, "-f", "-")
}
// Combine stdout and stderr into a single dynamically resized buffer
combined := &bytes.Buffer{}
cmd.Stdout = combined
cmd.Stderr = combined
args = append(args, a.Arguments...)
cmd := exec.Command(a.KubePath, args...)
cmd.Stdin = bytes.NewBuffer(y)
if err := cmd.Start(); err != nil {
e := fmt.Errorf("cannot start kubetcl for resource: %v: %v", resource.Name, err)
c.Resources[i].State = &common.ResourceState{
Status: common.Failed,
Errors: []string{e.Error()},
}
log.Println(errors.appendError(e))
continue
// Combine stdout and stderr into a single dynamically resized buffer
combined := &bytes.Buffer{}
cmd.Stdout = combined
cmd.Stderr = combined
if err := cmd.Start(); err != nil {
e := fmt.Errorf("cannot start kubetcl for resource: %v: %v", resource.Name, err)
resource.State = failState(e)
return "", e
}
if err := cmd.Wait(); err != nil {
// Treat delete special. If a delete is issued and a resource is not found, treat it as
// success.
if o == DeleteOperation && strings.HasSuffix(strings.TrimSpace(combined.String()), "not found") {
log.Println(resource.Name + " not found, treating as success for delete")
} else {
e := fmt.Errorf("kubetcl failed for resource: %v: %v: %v", resource.Name, err, combined.String())
resource.State = failState(e)
return "", e
}
}
log.Printf("kubectl succeeded for resource: %v: SysTime: %v UserTime: %v\n%v",
resource.Name, cmd.ProcessState.SystemTime(), cmd.ProcessState.UserTime(), combined.String())
resource.State = &common.ResourceState{Status: common.Created}
return combined.String(), nil
}
func failState(e error) *common.ResourceState {
return &common.ResourceState{
Status: common.Failed,
Errors: []string{e.Error()},
}
}
func getUnprocessedResources(c *common.Configuration) []*common.Resource {
var resources []*common.Resource
for _, r := range c.Resources {
if r.State == nil {
resources = append(resources, r)
}
}
return resources
}
// getDependencies iterates over resources and returns a map of resource name to
// the set of dependencies that resource has.
//
// Dependencies are reversed for delete operation.
func getDependencies(c *common.Configuration, o operation) (DependencyMap, error) {
deps := DependencyMap{}
// Prepopulate map. This will be used later to validate referenced resources
// actually exist.
for _, r := range c.Resources {
deps[r.Name] = make(map[string]bool)
}
for _, r := range c.Resources {
props, err := yaml.Marshal(r.Properties)
if err != nil {
return nil, fmt.Errorf("Failed to deserialize resource properties for resource %s: %v", r.Name, r.Properties)
}
if err := cmd.Wait(); err != nil {
// Treat delete special. If a delete is issued and a resource is not found, treat it as
// success.
if (o == DeleteOperation && strings.HasSuffix(strings.TrimSpace(combined.String()), "not found")) {
log.Println(resource.Name + " not found, treating as success for delete")
refs := refRe.FindAllStringSubmatch(string(props), -1)
for _, ref := range refs {
// Validate referenced resource exists in config.
if _, ok := deps[ref[1]]; !ok {
return nil, fmt.Errorf("Invalid resource name in reference: %s", ref[1])
}
// Delete dependencies should be reverse of create.
if o == DeleteOperation {
deps[ref[1]][r.Name] = true
} else {
e := fmt.Errorf("kubetcl failed for resource: %v: %v: %v", resource.Name, err, combined.String())
c.Resources[i].State = &common.ResourceState{
Status: common.Failed,
Errors: []string{e.Error()},
}
log.Println(errors.appendError(e))
continue
deps[r.Name][ref[1]] = true
}
}
}
output = append(output, combined.String())
c.Resources[i].State = &common.ResourceState{Status: common.Created}
log.Printf("kubectl succeeded for resource: %v: SysTime: %v UserTime: %v\n%v",
resource.Name, cmd.ProcessState.SystemTime(), cmd.ProcessState.UserTime(), combined.String())
return deps, nil
}
// updateDependants removes the dependency dep from the set of dependencies for
// all resource.
func removeDependencies(deps DependencyMap, dep string) {
for _, d := range deps {
delete(d, dep)
}
}
// abortDependants changes the state of all of the dependants of a resource to
// Aborted.
func abortDependants(c *common.Configuration, deps DependencyMap, dep string) {
for _, r := range c.Resources {
if _, ok := deps[r.Name][dep]; ok {
r.State = &common.ResourceState{Status: common.Aborted}
}
}
return strings.Join(output, "\n"), nil
}

Loading…
Cancel
Save