Merge pull request #142 from bmelville/refs

Resourcifier now processes resources in dependency order based on refs.
pull/148/head
vaikas-google 9 years ago
commit 376d33992a

@ -7,6 +7,9 @@ resources:
external_service: true
replicas: 3
image: gcr.io/google_containers/example-guestbook-php-redis:v3
env:
- name: redis-master
value: $(ref.redis-master.name)
- name: redis
type: github.com/kubernetes/application-dm-templates/storage/redis:v1
properties: null

@ -206,6 +206,11 @@ func (m *manager) DeleteDeployment(name string, forget bool) (*common.Deployment
if latest != nil {
log.Printf("Deleting resources from the latest manifest")
// Clear previous state.
for _, r := range latest.ExpandedConfig.Resources {
r.State = nil
}
if _, err := m.deployer.DeleteConfiguration(latest.ExpandedConfig); err != nil {
log.Printf("Failed to delete resources from the latest manifest: %v", err)
return nil, err

@ -18,10 +18,11 @@ import (
"fmt"
"log"
"os/exec"
"regexp"
"strings"
"github.com/kubernetes/deployment-manager/common"
"github.com/ghodss/yaml"
"github.com/kubernetes/deployment-manager/common"
)
// TODO(jackgr): Define an interface and a struct type for Configurator and move initialization to the caller.
@ -73,6 +74,11 @@ func (e *Error) appendError(err error) error {
return err
}
// resource name -> set of dependencies.
type DependencyMap map[string]map[string]bool
var refRe = regexp.MustCompile("\\$\\(ref\\.([^\\.]+)\\.([^\\)]+)\\)")
// Configure passes each resource in the configuration to kubectl and performs the appropriate
// action on it (create/delete/replace) and updates the State of the resource with the resulting
// status. In case of errors with a resource, Resource.State.Errors is set.
@ -80,76 +86,176 @@ func (e *Error) appendError(err error) error {
func (a *Configurator) Configure(c *common.Configuration, o operation) (string, error) {
errors := &Error{}
var output []string
for i, resource := range c.Resources {
args := []string{o.String()}
if o == GetOperation {
args = append(args, "-o", "yaml")
if resource.Type != "" {
args = append(args, resource.Type)
if resource.Name != "" {
args = append(args, resource.Name)
}
}
deps, err := getDependencies(c, o)
if err != nil {
e := fmt.Errorf("Error generating dependencies: %s", err.Error())
return "", e
}
for {
resources := getUnprocessedResources(c)
// No more resources to process.
if len(resources) == 0 {
break
}
var y []byte
if len(resource.Properties) > 0 {
var err error
y, err = yaml.Marshal(resource.Properties)
if err != nil {
e := fmt.Errorf("yaml marshal failed for resource: %v: %v", resource.Name, err)
log.Println(errors.appendError(e))
c.Resources[i].State = &common.ResourceState{
Status: common.Aborted,
Errors: []string{e.Error()},
}
for _, r := range resources {
// Resource still has dependencies.
if len(deps[r.Name]) != 0 {
continue
}
out, err := a.configureResource(r, o)
if err != nil {
log.Println(errors.appendError(err))
abortDependants(c, deps, r.Name)
// Resource states have changed, need to recalculate unprocessed
// resources.
break
}
output = append(output, out)
removeDependencies(deps, r.Name)
}
}
return strings.Join(output, "\n"), nil
}
func (a *Configurator) configureResource(resource *common.Resource, o operation) (string, error) {
args := []string{o.String()}
if o == GetOperation {
args = append(args, "-o", "yaml")
if resource.Type != "" {
args = append(args, resource.Type)
if resource.Name != "" {
args = append(args, resource.Name)
}
}
}
if len(y) > 0 {
args = append(args, "-f", "-")
var y []byte
if len(resource.Properties) > 0 {
var err error
y, err = yaml.Marshal(resource.Properties)
if err != nil {
e := fmt.Errorf("yaml marshal failed for resource: %v: %v", resource.Name, err)
resource.State = failState(e)
return "", e
}
}
args = append(args, a.Arguments...)
cmd := exec.Command(a.KubePath, args...)
cmd.Stdin = bytes.NewBuffer(y)
if len(y) > 0 {
args = append(args, "-f", "-")
}
// Combine stdout and stderr into a single dynamically resized buffer
combined := &bytes.Buffer{}
cmd.Stdout = combined
cmd.Stderr = combined
args = append(args, a.Arguments...)
cmd := exec.Command(a.KubePath, args...)
cmd.Stdin = bytes.NewBuffer(y)
if err := cmd.Start(); err != nil {
e := fmt.Errorf("cannot start kubetcl for resource: %v: %v", resource.Name, err)
c.Resources[i].State = &common.ResourceState{
Status: common.Failed,
Errors: []string{e.Error()},
}
log.Println(errors.appendError(e))
continue
// Combine stdout and stderr into a single dynamically resized buffer
combined := &bytes.Buffer{}
cmd.Stdout = combined
cmd.Stderr = combined
if err := cmd.Start(); err != nil {
e := fmt.Errorf("cannot start kubetcl for resource: %v: %v", resource.Name, err)
resource.State = failState(e)
return "", e
}
if err := cmd.Wait(); err != nil {
// Treat delete special. If a delete is issued and a resource is not found, treat it as
// success.
if o == DeleteOperation && strings.HasSuffix(strings.TrimSpace(combined.String()), "not found") {
log.Println(resource.Name + " not found, treating as success for delete")
} else {
e := fmt.Errorf("kubetcl failed for resource: %v: %v: %v", resource.Name, err, combined.String())
resource.State = failState(e)
return "", e
}
}
log.Printf("kubectl succeeded for resource: %v: SysTime: %v UserTime: %v\n%v",
resource.Name, cmd.ProcessState.SystemTime(), cmd.ProcessState.UserTime(), combined.String())
resource.State = &common.ResourceState{Status: common.Created}
return combined.String(), nil
}
func failState(e error) *common.ResourceState {
return &common.ResourceState{
Status: common.Failed,
Errors: []string{e.Error()},
}
}
func getUnprocessedResources(c *common.Configuration) []*common.Resource {
var resources []*common.Resource
for _, r := range c.Resources {
if r.State == nil {
resources = append(resources, r)
}
}
return resources
}
// getDependencies iterates over resources and returns a map of resource name to
// the set of dependencies that resource has.
//
// Dependencies are reversed for delete operation.
func getDependencies(c *common.Configuration, o operation) (DependencyMap, error) {
deps := DependencyMap{}
// Prepopulate map. This will be used later to validate referenced resources
// actually exist.
for _, r := range c.Resources {
deps[r.Name] = make(map[string]bool)
}
for _, r := range c.Resources {
props, err := yaml.Marshal(r.Properties)
if err != nil {
return nil, fmt.Errorf("Failed to deserialize resource properties for resource %s: %v", r.Name, r.Properties)
}
if err := cmd.Wait(); err != nil {
// Treat delete special. If a delete is issued and a resource is not found, treat it as
// success.
if (o == DeleteOperation && strings.HasSuffix(strings.TrimSpace(combined.String()), "not found")) {
log.Println(resource.Name + " not found, treating as success for delete")
refs := refRe.FindAllStringSubmatch(string(props), -1)
for _, ref := range refs {
// Validate referenced resource exists in config.
if _, ok := deps[ref[1]]; !ok {
return nil, fmt.Errorf("Invalid resource name in reference: %s", ref[1])
}
// Delete dependencies should be reverse of create.
if o == DeleteOperation {
deps[ref[1]][r.Name] = true
} else {
e := fmt.Errorf("kubetcl failed for resource: %v: %v: %v", resource.Name, err, combined.String())
c.Resources[i].State = &common.ResourceState{
Status: common.Failed,
Errors: []string{e.Error()},
}
log.Println(errors.appendError(e))
continue
deps[r.Name][ref[1]] = true
}
}
}
output = append(output, combined.String())
c.Resources[i].State = &common.ResourceState{Status: common.Created}
log.Printf("kubectl succeeded for resource: %v: SysTime: %v UserTime: %v\n%v",
resource.Name, cmd.ProcessState.SystemTime(), cmd.ProcessState.UserTime(), combined.String())
return deps, nil
}
// updateDependants removes the dependency dep from the set of dependencies for
// all resource.
func removeDependencies(deps DependencyMap, dep string) {
for _, d := range deps {
delete(d, dep)
}
}
// abortDependants changes the state of all of the dependants of a resource to
// Aborted.
func abortDependants(c *common.Configuration, deps DependencyMap, dep string) {
for _, r := range c.Resources {
if _, ok := deps[r.Name][dep]; ok {
r.State = &common.ResourceState{Status: common.Aborted}
}
}
return strings.Join(output, "\n"), nil
}

Loading…
Cancel
Save