Merge pull request #193 from vaikas-google/kubectl_merge

Have configurator use the kubernetes_kubectl object
pull/194/head
Brendan Melville 9 years ago
commit 1331edbc8f

@ -27,11 +27,8 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"log"
"net"
"net/http" "net/http"
"net/url" "net/url"
"os"
"github.com/ghodss/yaml" "github.com/ghodss/yaml"
"github.com/gorilla/mux" "github.com/gorilla/mux"
@ -183,57 +180,20 @@ func putConfigurationHandlerFunc(w http.ResponseWriter, r *http.Request) {
} }
func getConfigurator() *configurator.Configurator { func getConfigurator() *configurator.Configurator {
if *kubePath == "" { kubernetesConfig := &util.KubernetesConfig{
log.Fatalf("kubectl path cannot be empty") KubePath: *kubePath,
} KubeService: *kubeService,
KubeServer: *kubeServer,
// If a configuration file is specified, then it will provide the server KubeInsecure: *kubeInsecure,
// address and credentials. If not, then we check for the server address KubeConfig: *kubeConfig,
// and credentials as individual flags. KubeCertAuth: *kubeCertAuth,
var args []string KubeClientCert: *kubeClientCert,
if *kubeConfig != "" { KubeClientKey: *kubeClientKey,
*kubeConfig = os.ExpandEnv(*kubeConfig) KubeToken: *kubeToken,
args = append(args, fmt.Sprintf("--kubeconfig=%s", *kubeConfig)) KubeUsername: *kubeUsername,
} else { KubePassword: *kubePassword,
if *kubeServer != "" { }
args = append(args, fmt.Sprintf("--server=https://%s", *kubeServer)) return configurator.NewConfigurator(util.NewKubernetesKubectl(kubernetesConfig))
} else if *kubeService != "" {
addrs, err := net.LookupHost(*kubeService)
if err != nil || len(addrs) < 1 {
log.Fatalf("cannot resolve DNS name: %v", *kubeService)
}
args = append(args, fmt.Sprintf("--server=https://%s", addrs[0]))
}
if *kubeInsecure {
args = append(args, fmt.Sprintf("--insecure-skip-tls-verify=%s", *kubeInsecure))
} else {
if *kubeCertAuth != "" {
args = append(args, fmt.Sprintf("--certificate-authority=%s", *kubeCertAuth))
if *kubeClientCert == "" {
args = append(args, fmt.Sprintf("--client-certificate=%s", *kubeClientCert))
}
if *kubeClientKey == "" {
args = append(args, fmt.Sprintf("--client-key=%s", *kubeClientKey))
}
}
}
if *kubeToken != "" {
args = append(args, fmt.Sprintf("--token=%s", *kubeToken))
} else {
if *kubeUsername != "" {
args = append(args, fmt.Sprintf("--username=%s", *kubeUsername))
}
if *kubePassword != "" {
args = append(args, fmt.Sprintf("--password=%s", *kubePassword))
}
}
}
return configurator.NewConfigurator(*kubePath, args)
} }
func getPathVariable(w http.ResponseWriter, r *http.Request, variable, handler string) (string, error) { func getPathVariable(w http.ResponseWriter, r *http.Request, variable, handler string) (string, error) {

@ -6,7 +6,7 @@ you may not use this file except in compliance with the License.
You may obtain a copy of the License at You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0 http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -17,26 +17,22 @@ limitations under the License.
package configurator package configurator
import ( import (
"bytes"
"fmt" "fmt"
"log" "log"
"os/exec"
"regexp" "regexp"
"strings" "strings"
"github.com/ghodss/yaml" "github.com/ghodss/yaml"
"github.com/kubernetes/deployment-manager/common" "github.com/kubernetes/deployment-manager/common"
"github.com/kubernetes/deployment-manager/util"
) )
// TODO(jackgr): Define an interface and a struct type for Configurator and move initialization to the caller.
type Configurator struct { type Configurator struct {
KubePath string k util.Kubernetes
Arguments []string
} }
func NewConfigurator(kubectlPath string, arguments []string) *Configurator { func NewConfigurator(kubernetes util.Kubernetes) *Configurator {
return &Configurator{kubectlPath, arguments} return &Configurator{kubernetes}
} }
// operation is an enumeration type for kubectl operations. // operation is an enumeration type for kubectl operations.
@ -50,10 +46,6 @@ const (
ReplaceOperation operation = "replace" ReplaceOperation operation = "replace"
) )
func (o operation) String() string {
return string(o)
}
// TODO(jackgr): Configure resources without dependencies in parallel. // TODO(jackgr): Configure resources without dependencies in parallel.
// Error is an error type that captures errors from the multiple calls to kubectl // Error is an error type that captures errors from the multiple calls to kubectl
@ -128,65 +120,64 @@ func (a *Configurator) Configure(c *common.Configuration, o operation) (string,
return strings.Join(output, "\n"), nil return strings.Join(output, "\n"), nil
} }
func (a *Configurator) configureResource(resource *common.Resource, o operation) (string, error) { func marshalResource(resource *common.Resource) (string, error) {
args := []string{o.String()}
if o == GetOperation {
args = append(args, "-o", "yaml")
if resource.Type != "" {
args = append(args, resource.Type)
if resource.Name != "" {
args = append(args, resource.Name)
}
}
}
var y []byte
if len(resource.Properties) > 0 { if len(resource.Properties) > 0 {
var err error y, err := yaml.Marshal(resource.Properties)
y, err = yaml.Marshal(resource.Properties)
if err != nil { if err != nil {
e := fmt.Errorf("yaml marshal failed for resource: %v: %v", resource.Name, err) return "", fmt.Errorf("yaml marshal failed for resource: %v: %v", resource.Name, err)
resource.State = failState(e)
return "", e
} }
return string(y), nil
} }
return "", nil
}
if len(y) > 0 { func (a *Configurator) configureResource(resource *common.Resource, o operation) (string, error) {
args = append(args, "-f", "-") ret := ""
} var err error
args = append(args, a.Arguments...)
cmd := exec.Command(a.KubePath, args...)
cmd.Stdin = bytes.NewBuffer(y)
// Combine stdout and stderr into a single dynamically resized buffer
combined := &bytes.Buffer{}
cmd.Stdout = combined
cmd.Stderr = combined
if err := cmd.Start(); err != nil {
e := fmt.Errorf("cannot start kubectl for resource: %v: %v", resource.Name, err)
resource.State = failState(e)
return "", e
}
if err := cmd.Wait(); err != nil { switch o {
// Treat delete special. If a delete is issued and a resource is not found, treat it as case CreateOperation:
// success. obj, err := marshalResource(resource)
if o == DeleteOperation && strings.HasSuffix(strings.TrimSpace(combined.String()), "not found") { if err != nil {
log.Println(resource.Name + " not found, treating as success for delete") resource.State = failState(err)
return "", err
}
ret, err = a.k.Create(obj)
if err != nil {
resource.State = failState(err)
} else { } else {
e := fmt.Errorf("kubectl failed for resource: %v: %v: %v", resource.Name, err, combined.String()) resource.State = &common.ResourceState{Status: common.Created}
resource.State = failState(e) }
return combined.String(), e return ret, nil
case ReplaceOperation:
obj, err := marshalResource(resource)
if err != nil {
resource.State = failState(err)
return "", err
} }
ret, err = a.k.Replace(obj)
if err != nil {
resource.State = failState(err)
} else {
resource.State = &common.ResourceState{Status: common.Created}
}
return ret, nil
case GetOperation:
return a.k.Get(resource.Name, resource.Type)
case DeleteOperation:
ret, err = a.k.Delete(resource.Name, resource.Type)
// Treat deleting a non-existent resource as success.
if err != nil {
if strings.HasSuffix(strings.TrimSpace(ret), "not found") {
resource.State = &common.ResourceState{Status: common.Created}
return ret, nil
}
resource.State = failState(err)
}
return ret, err
default:
return "", fmt.Errorf("invalid operation %s for resource: %v: %v", o, resource.Name, err)
} }
log.Printf("kubectl succeeded for resource: %v: SysTime: %v UserTime: %v\n%v",
resource.Name, cmd.ProcessState.SystemTime(), cmd.ProcessState.UserTime(), combined.String())
resource.State = &common.ResourceState{Status: common.Created}
return combined.String(), nil
} }
func failState(e error) *common.ResourceState { func failState(e error) *common.ResourceState {

@ -38,4 +38,5 @@ type Kubernetes interface {
Get(name string, resourceType string) (string, error) Get(name string, resourceType string) (string, error)
Create(resource string) (string, error) Create(resource string) (string, error)
Delete(name string, resourceType string) (string, error) Delete(name string, resourceType string) (string, error)
Replace(resource string) (string, error)
} }

@ -107,6 +107,11 @@ func (k *KubernetesKubectl) Delete(name string, resourceType string) (string, er
return k.execute(args, "") return k.execute(args, "")
} }
func (k *KubernetesKubectl) Replace(resource string) (string, error) {
args := []string{"replace"}
return k.execute(args, resource)
}
func (k *KubernetesKubectl) execute(args []string, input string) (string, error) { func (k *KubernetesKubectl) execute(args []string, input string) (string, error) {
if len(input) > 0 { if len(input) > 0 {
args = append(args, "-f", "-") args = append(args, "-f", "-")
@ -123,13 +128,15 @@ func (k *KubernetesKubectl) execute(args []string, input string) (string, error)
cmd.Stderr = combined cmd.Stderr = combined
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
log.Printf("cannot start kubectl %#v", err) log.Printf("cannot start kubectl %s %#v", combined.String(), err)
return combined.String(), err return combined.String(), err
} }
if err := cmd.Wait(); err != nil { if err := cmd.Wait(); err != nil {
log.Printf("kubectl failed: %#v", err) log.Printf("kubectl failed: %s %#v", combined.String(), err)
return combined.String(), err return combined.String(), err
} }
log.Printf("kubectl succeeded: SysTime: %v UserTime: %v\n%v",
cmd.ProcessState.SystemTime(), cmd.ProcessState.UserTime(), combined.String())
return combined.String(), nil return combined.String(), nil
} }

Loading…
Cancel
Save