Have configurator use the kubernetes_kubectl object instead of doing the kubectl commands directly

pull/193/head
vaikas-google 9 years ago
parent 2c7aad5f63
commit 2ac1a04987

@ -27,11 +27,8 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"os"
"github.com/ghodss/yaml"
"github.com/gorilla/mux"
@ -183,57 +180,20 @@ func putConfigurationHandlerFunc(w http.ResponseWriter, r *http.Request) {
}
func getConfigurator() *configurator.Configurator {
if *kubePath == "" {
log.Fatalf("kubectl path cannot be empty")
}
// If a configuration file is specified, then it will provide the server
// address and credentials. If not, then we check for the server address
// and credentials as individual flags.
var args []string
if *kubeConfig != "" {
*kubeConfig = os.ExpandEnv(*kubeConfig)
args = append(args, fmt.Sprintf("--kubeconfig=%s", *kubeConfig))
} else {
if *kubeServer != "" {
args = append(args, fmt.Sprintf("--server=https://%s", *kubeServer))
} else if *kubeService != "" {
addrs, err := net.LookupHost(*kubeService)
if err != nil || len(addrs) < 1 {
log.Fatalf("cannot resolve DNS name: %v", *kubeService)
}
args = append(args, fmt.Sprintf("--server=https://%s", addrs[0]))
}
if *kubeInsecure {
args = append(args, fmt.Sprintf("--insecure-skip-tls-verify=%s", *kubeInsecure))
} else {
if *kubeCertAuth != "" {
args = append(args, fmt.Sprintf("--certificate-authority=%s", *kubeCertAuth))
if *kubeClientCert == "" {
args = append(args, fmt.Sprintf("--client-certificate=%s", *kubeClientCert))
}
if *kubeClientKey == "" {
args = append(args, fmt.Sprintf("--client-key=%s", *kubeClientKey))
}
}
}
if *kubeToken != "" {
args = append(args, fmt.Sprintf("--token=%s", *kubeToken))
} else {
if *kubeUsername != "" {
args = append(args, fmt.Sprintf("--username=%s", *kubeUsername))
}
if *kubePassword != "" {
args = append(args, fmt.Sprintf("--password=%s", *kubePassword))
}
}
}
return configurator.NewConfigurator(*kubePath, args)
kubernetesConfig := &util.KubernetesConfig{
KubePath: *kubePath,
KubeService: *kubeService,
KubeServer: *kubeServer,
KubeInsecure: *kubeInsecure,
KubeConfig: *kubeConfig,
KubeCertAuth: *kubeCertAuth,
KubeClientCert: *kubeClientCert,
KubeClientKey: *kubeClientKey,
KubeToken: *kubeToken,
KubeUsername: *kubeUsername,
KubePassword: *kubePassword,
}
return configurator.NewConfigurator(util.NewKubernetesKubectl(kubernetesConfig))
}
func getPathVariable(w http.ResponseWriter, r *http.Request, variable, handler string) (string, error) {

@ -17,26 +17,22 @@ limitations under the License.
package configurator
import (
"bytes"
"fmt"
"log"
"os/exec"
"regexp"
"strings"
"github.com/ghodss/yaml"
"github.com/kubernetes/deployment-manager/common"
"github.com/kubernetes/deployment-manager/util"
)
// TODO(jackgr): Define an interface and a struct type for Configurator and move initialization to the caller.
type Configurator struct {
KubePath string
Arguments []string
k util.Kubernetes
}
func NewConfigurator(kubectlPath string, arguments []string) *Configurator {
return &Configurator{kubectlPath, arguments}
func NewConfigurator(kubernetes util.Kubernetes) *Configurator {
return &Configurator{kubernetes}
}
// operation is an enumeration type for kubectl operations.
@ -50,10 +46,6 @@ const (
ReplaceOperation operation = "replace"
)
func (o operation) String() string {
return string(o)
}
// TODO(jackgr): Configure resources without dependencies in parallel.
// Error is an error type that captures errors from the multiple calls to kubectl
@ -128,65 +120,64 @@ func (a *Configurator) Configure(c *common.Configuration, o operation) (string,
return strings.Join(output, "\n"), nil
}
func (a *Configurator) configureResource(resource *common.Resource, o operation) (string, error) {
args := []string{o.String()}
if o == GetOperation {
args = append(args, "-o", "yaml")
if resource.Type != "" {
args = append(args, resource.Type)
if resource.Name != "" {
args = append(args, resource.Name)
}
}
}
var y []byte
func marshalResource(resource *common.Resource) (string, error) {
if len(resource.Properties) > 0 {
var err error
y, err = yaml.Marshal(resource.Properties)
y, err := yaml.Marshal(resource.Properties)
if err != nil {
e := fmt.Errorf("yaml marshal failed for resource: %v: %v", resource.Name, err)
resource.State = failState(e)
return "", e
return "", fmt.Errorf("yaml marshal failed for resource: %v: %v", resource.Name, err)
}
return string(y), nil
}
return "", nil
}
if len(y) > 0 {
args = append(args, "-f", "-")
}
args = append(args, a.Arguments...)
cmd := exec.Command(a.KubePath, args...)
cmd.Stdin = bytes.NewBuffer(y)
// Combine stdout and stderr into a single dynamically resized buffer
combined := &bytes.Buffer{}
cmd.Stdout = combined
cmd.Stderr = combined
func (a *Configurator) configureResource(resource *common.Resource, o operation) (string, error) {
ret := ""
var err error
if err := cmd.Start(); err != nil {
e := fmt.Errorf("cannot start kubectl for resource: %v: %v", resource.Name, err)
resource.State = failState(e)
return "", e
switch o {
case CreateOperation:
obj, err := marshalResource(resource)
if err != nil {
resource.State = failState(err)
return "", err
}
if err := cmd.Wait(); err != nil {
// Treat delete special. If a delete is issued and a resource is not found, treat it as
// success.
if o == DeleteOperation && strings.HasSuffix(strings.TrimSpace(combined.String()), "not found") {
log.Println(resource.Name + " not found, treating as success for delete")
ret, err = a.k.Create(obj)
if err != nil {
resource.State = failState(err)
} else {
e := fmt.Errorf("kubectl failed for resource: %v: %v: %v", resource.Name, err, combined.String())
resource.State = failState(e)
return combined.String(), e
resource.State = &common.ResourceState{Status: common.Created}
}
return ret, nil
case ReplaceOperation:
obj, err := marshalResource(resource)
if err != nil {
resource.State = failState(err)
return "", err
}
log.Printf("kubectl succeeded for resource: %v: SysTime: %v UserTime: %v\n%v",
resource.Name, cmd.ProcessState.SystemTime(), cmd.ProcessState.UserTime(), combined.String())
ret, err = a.k.Replace(obj)
if err != nil {
resource.State = failState(err)
} else {
resource.State = &common.ResourceState{Status: common.Created}
}
return ret, nil
case GetOperation:
return a.k.Get(resource.Name, resource.Type)
case DeleteOperation:
ret, err = a.k.Delete(resource.Name, resource.Type)
// Treat deleting a non-existent resource as success.
if err != nil {
if strings.HasSuffix(strings.TrimSpace(ret), "not found") {
resource.State = &common.ResourceState{Status: common.Created}
return combined.String(), nil
return ret, nil
}
resource.State = failState(err)
}
return ret, err
default:
return "", fmt.Errorf("invalid operation %s for resource: %v: %v", o, resource.Name, err)
}
}
func failState(e error) *common.ResourceState {

@ -38,4 +38,5 @@ type Kubernetes interface {
Get(name string, resourceType string) (string, error)
Create(resource string) (string, error)
Delete(name string, resourceType string) (string, error)
Replace(resource string) (string, error)
}

@ -107,6 +107,11 @@ func (k *KubernetesKubectl) Delete(name string, resourceType string) (string, er
return k.execute(args, "")
}
func (k *KubernetesKubectl) Replace(resource string) (string, error) {
args := []string{"replace"}
return k.execute(args, resource)
}
func (k *KubernetesKubectl) execute(args []string, input string) (string, error) {
if len(input) > 0 {
args = append(args, "-f", "-")
@ -123,13 +128,15 @@ func (k *KubernetesKubectl) execute(args []string, input string) (string, error)
cmd.Stderr = combined
if err := cmd.Start(); err != nil {
log.Printf("cannot start kubectl %#v", err)
log.Printf("cannot start kubectl %s %#v", combined.String(), err)
return combined.String(), err
}
if err := cmd.Wait(); err != nil {
log.Printf("kubectl failed: %#v", err)
log.Printf("kubectl failed: %s %#v", combined.String(), err)
return combined.String(), err
}
log.Printf("kubectl succeeded: SysTime: %v UserTime: %v\n%v",
cmd.ProcessState.SystemTime(), cmd.ProcessState.UserTime(), combined.String())
return combined.String(), nil
}

Loading…
Cancel
Save