Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bootstrap worker nodes using kops-controller #9653

Merged
merged 13 commits into from
Aug 17, 2020
Merged
Prev Previous commit
Next Next commit
Add server code to kops-controller
  • Loading branch information
johngmyers committed Aug 15, 2020
commit 9cfa169740a0da90408c48921c24532c5cbc6513
1 change: 0 additions & 1 deletion cmd/kops-controller/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -9,7 +9,6 @@ go_library(
"//cmd/kops-controller/controllers:go_default_library",
"//cmd/kops-controller/pkg/config:go_default_library",
"//cmd/kops-controller/pkg/server:go_default_library",
"//pkg/apis/kops:go_default_library",
"//pkg/nodeidentity:go_default_library",
"//pkg/nodeidentity/aws:go_default_library",
"//pkg/nodeidentity/do:go_default_library",
10 changes: 4 additions & 6 deletions cmd/kops-controller/main.go
Original file line number Diff line number Diff line change
@@ -30,7 +30,6 @@ import (
"k8s.io/kops/cmd/kops-controller/controllers"
"k8s.io/kops/cmd/kops-controller/pkg/config"
"k8s.io/kops/cmd/kops-controller/pkg/server"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/nodeidentity"
nodeidentityaws "k8s.io/kops/pkg/nodeidentity/aws"
nodeidentitydo "k8s.io/kops/pkg/nodeidentity/do"
@@ -88,15 +87,14 @@ func main() {
if opt.Server != nil {
var verifier fi.Verifier
var err error
switch opt.Server.Provider {
case kops.CloudProviderAWS:
verifier, err = awsup.NewAWSVerifier()
if opt.Server.Provider.AWS != nil {
verifier, err = awsup.NewAWSVerifier(opt.Server.Provider.AWS)
if err != nil {
setupLog.Error(err, "unable to create verifier")
os.Exit(1)
}
default:
klog.Fatalf("server for cloud provider %s is not supported", opt.Server.Provider)
} else {
klog.Fatalf("server cloud provider config not provided")
}

srv, err := server.NewServer(&opt, verifier)
2 changes: 1 addition & 1 deletion cmd/kops-controller/pkg/config/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -5,5 +5,5 @@ go_library(
srcs = ["options.go"],
importpath = "k8s.io/kops/cmd/kops-controller/pkg/config",
visibility = ["//visibility:public"],
deps = ["//pkg/apis/kops:go_default_library"],
deps = ["//upup/pkg/fi/cloudup/awsup:go_default_library"],
)
8 changes: 6 additions & 2 deletions cmd/kops-controller/pkg/config/options.go
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@ limitations under the License.

package config

import "k8s.io/kops/pkg/apis/kops"
import "k8s.io/kops/upup/pkg/fi/cloudup/awsup"

type Options struct {
Cloud string `json:"cloud,omitempty"`
@@ -32,10 +32,14 @@ type ServerOptions struct {
Listen string

// Provider is the cloud provider.
Provider kops.CloudProviderID `json:"provider"`
Provider ServerProviderOptions `json:"provider"`

// ServerKeyPath is the path to our TLS serving private key.
ServerKeyPath string `json:"serverKeyPath,omitempty"`
// ServerCertificatePath is the path to our TLS serving certificate.
ServerCertificatePath string `json:"serverCertificatePath,omitempty"`
}

type ServerProviderOptions struct {
AWS *awsup.AWSVerifierOptions `json:"aws,omitempty"`
}
2 changes: 1 addition & 1 deletion cmd/kops-controller/pkg/server/server.go
Original file line number Diff line number Diff line change
@@ -85,7 +85,7 @@ func (s *Server) bootstrap(w http.ResponseWriter, r *http.Request) {
return
}

klog.Infof("id is %s", id) // todo do something with id
klog.Infof("id is %s", id.Instance) // todo do something with id

req := &nodeup.BootstrapRequest{}
err = json.Unmarshal(body, req)
8 changes: 7 additions & 1 deletion upup/pkg/fi/authenticate.go
Original file line number Diff line number Diff line change
@@ -21,7 +21,13 @@ type Authenticator interface {
CreateToken(body []byte) (string, error)
}

// VerifyResult is the result of a successfully verified request.
type VerifyResult struct {
// Instance is the hostname of the instance.
Instance string
johngmyers marked this conversation as resolved.
Show resolved Hide resolved
}

// Verifier verifies authentication credentials for requests.
type Verifier interface {
VerifyToken(token string, body []byte) (string, error)
VerifyToken(token string, body []byte) (*VerifyResult, error)
}
1 change: 1 addition & 0 deletions upup/pkg/fi/cloudup/awsup/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@ go_library(
"//vendor/github.com/aws/aws-sdk-go/aws:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws/awserr:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws/client:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws/ec2metadata:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws/endpoints:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws/request:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws/session:go_default_library",
111 changes: 96 additions & 15 deletions upup/pkg/fi/cloudup/awsup/aws_verifier.go
Original file line number Diff line number Diff line change
@@ -32,26 +32,55 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/sts"
"k8s.io/kops/upup/pkg/fi"
)

type AWSVerifierOptions struct {
// NodesRoles are the IAM roles that worker nodes are permitted to have.
NodesRoles []string `json:"nodesRoles"`
}

type awsVerifier struct {
accountId string
opt AWSVerifierOptions

ec2 *ec2.EC2
sts *sts.STS
client http.Client
}

var _ fi.Verifier = &awsVerifier{}

func NewAWSVerifier() (fi.Verifier, error) {
func NewAWSVerifier(opt *AWSVerifierOptions) (fi.Verifier, error) {
config := aws.NewConfig().WithCredentialsChainVerboseErrors(true)
sess, err := session.NewSession(config)
if err != nil {
return nil, err
}

stsClient := sts.New(sess)
identity, err := stsClient.GetCallerIdentity(&sts.GetCallerIdentityInput{})
if err != nil {
return nil, err
}

metadata := ec2metadata.New(sess, config)
region, err := metadata.Region()
if err != nil {
return nil, fmt.Errorf("error querying ec2 metadata service (for region): %v", err)
}

ec2Client := ec2.New(sess, config.WithRegion(region))

return &awsVerifier{
sts: sts.New(sess),
accountId: aws.StringValue(identity.Account),
opt: *opt,
ec2: ec2Client,
sts: stsClient,
client: http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
@@ -87,63 +116,115 @@ type ResponseMetadata struct {
RequestId string `xml:"RequestId"`
}

func (a awsVerifier) VerifyToken(token string, body []byte) (string, error) {
func (a awsVerifier) VerifyToken(token string, body []byte) (*fi.VerifyResult, error) {
if !strings.HasPrefix(token, AWSAuthenticationTokenPrefix) {
return "", fmt.Errorf("incorrect authorization type")
return nil, fmt.Errorf("incorrect authorization type")
}
token = strings.TrimPrefix(token, AWSAuthenticationTokenPrefix)

// We rely on the client and server using the same version of the same STS library.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we instead send the body of the STS request?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Vault does that, but they have to support arbitrary clients. I took the ability to require the client to use the same version of the AWS library to reduce the risk of allowing attackers to use kops-controller as a pass-through proxy to STS.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah - interesting point. Does this introduce a challenge for upgrading the AWS SDK though? If there's master vs node skew then I am worried that either old-version nodes or new nodes won't be able to join?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't think supporting version skew was a requirement. Old nodes would only be an issue if they were created before the ASG's spec was updated yet didn't call kops-controller until after the first master was updated. The old node would be cleaned out by rolling update (though the cluster validation failure would impede rolling update from starting). New nodes would fail only until the first master is updated.

I do have an idea of making rolling update ignore non-ready nodes that are in nodes-role instance groups other than the one being updated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I think the scenario we be most mindful of is the master being ahead of the nodes. i.e. we update the AWS SDK and somehow the body changes, we roll that out for the control-plane, but the nodes aren't yet updated. As you point out, old node configurations would fail to join in this scenario. Kubernetes itself tries very hard to ensure that the masters can be a minor version ahead of the nodes (indefinitely); we don't have to support that but we should be aware of this.

Do we have an example of what's in the body? How likely is it to change?

Copy link
Member Author

@johngmyers johngmyers Aug 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the time we run kops update cluster --yes (or terraform apply) the node ASGs will be updated. All new instances created after that point will be on the new version of the library. Similarly, all nodes that have joined by the time the first master is updated will continue to be in the cluster. The only issue would be a worker instance that is created before the kops update cluster --yes but takes longer to join the cluster than the time it takes for the first control plane node to get updated and apply the new kops-controller manifest.

Copy link
Member Author

@johngmyers johngmyers Aug 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The body usually has QWN0aW9uPUdldENhbGxlcklkZW50aXR5JlZlcnNpb249MjAxMS0wNi0xNQ== which is the base64 encoding of Action=GetCallerIdentity&Version=2011-06-15. It doesn't seem likely to change often.

The other thing we're relying on being the same from client to server is the request URL.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, that sounds reasonable. We could create a unit test to check that the body is this value; if the unit test ever failed we could deal with it (e.g constructing all allowed forms and seeing if any of them match the sha), but as you point out it seems unlikely.

stsRequest, _ := a.sts.GetCallerIdentityRequest(nil)
err := stsRequest.Sign()
if err != nil {
return "", fmt.Errorf("creating identity request: %v", err)
return nil, fmt.Errorf("creating identity request: %v", err)
}

stsRequest.HTTPRequest.Header = nil
tokenBytes, err := base64.StdEncoding.DecodeString(token)
if err != nil {
return "", fmt.Errorf("decoding authorization token: %v", err)
return nil, fmt.Errorf("decoding authorization token: %v", err)
}
err = json.Unmarshal(tokenBytes, &stsRequest.HTTPRequest.Header)
if err != nil {
return "", fmt.Errorf("unmarshalling authorization token: %v", err)
return nil, fmt.Errorf("unmarshalling authorization token: %v", err)
}

sha := sha256.Sum256(body)
if stsRequest.HTTPRequest.Header.Get("X-Kops-Request-SHA") != base64.RawStdEncoding.EncodeToString(sha[:]) {
return "", fmt.Errorf("incorrect SHA")
return nil, fmt.Errorf("incorrect SHA")
}

requestBytes, _ := ioutil.ReadAll(stsRequest.Body)
_, _ = stsRequest.Body.Seek(0, io.SeekStart)
if stsRequest.HTTPRequest.Header.Get("Content-Length") != strconv.Itoa(len(requestBytes)) {
return "", fmt.Errorf("incorrect content-length")
return nil, fmt.Errorf("incorrect content-length")
}

// TODO - implement retry?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect our client needs to retry anyway, so there might not be much reason for us to retry here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nodeup will retry, it's just a question of how long transient errors delay node startup.

response, err := a.client.Do(stsRequest.HTTPRequest)
if err != nil {
return "", fmt.Errorf("sending STS request: %v", err)
return nil, fmt.Errorf("sending STS request: %v", err)
}
if response != nil {
defer response.Body.Close()
}

responseBody, err := ioutil.ReadAll(response.Body)
if err != nil {
return "", fmt.Errorf("reading STS response: %v", err)
return nil, fmt.Errorf("reading STS response: %v", err)
}
if response.StatusCode != 200 {
return "", fmt.Errorf("received status code %d from STS: %s", response.StatusCode, string(responseBody))
return nil, fmt.Errorf("received status code %d from STS: %s", response.StatusCode, string(responseBody))
}

result := GetCallerIdentityResponse{}
err = xml.NewDecoder(bytes.NewReader(responseBody)).Decode(&result)
if err != nil {
return "", fmt.Errorf("decoding STS response: %v", err)
return nil, fmt.Errorf("decoding STS response: %v", err)
}

marshal, _ := json.Marshal(result)
return string(marshal), nil
if result.GetCallerIdentityResult[0].Account != a.accountId {
return nil, fmt.Errorf("incorrect account %s", result.GetCallerIdentityResult[0].Account)
}

arn := result.GetCallerIdentityResult[0].Arn
parts := strings.Split(arn, ":")
if len(parts) != 6 {
return nil, fmt.Errorf("arn %q contains unexpected number of colons", arn)
}
if parts[0] != "arn" {
return nil, fmt.Errorf("arn %q doesn't start with \"arn:\"", arn)
}
// parts[1] is partition
if parts[2] != "iam" && parts[2] != "sts" {
return nil, fmt.Errorf("arn %q has unrecognized service", arn)
}
// parts[3] is region
// parts[4] is account
resource := strings.Split(parts[5], "/")
if resource[0] != "assumed-role" {
return nil, fmt.Errorf("arn %q has unrecognized type", arn)
}
if len(resource) < 3 {
return nil, fmt.Errorf("arn %q contains too few slashes", arn)
}
found := false
for _, role := range a.opt.NodesRoles {
if resource[1] == role {
found = true
break
}
}
if !found {
return nil, fmt.Errorf("arn %q does not contain acceptable node role", arn)
}

instanceID := resource[2]
instances, err := a.ec2.DescribeInstances(&ec2.DescribeInstancesInput{
InstanceIds: aws.StringSlice([]string{instanceID}),
})
if err != nil {
return nil, fmt.Errorf("describing instance for arn %q", arn)
}

if len(instances.Reservations) <= 0 || len(instances.Reservations[0].Instances) <= 0 {
return nil, fmt.Errorf("missing instance id: %s", instanceID)
}
if len(instances.Reservations[0].Instances) > 1 {
return nil, fmt.Errorf("found multiple instances with instance id: %s", instanceID)
}

return &fi.VerifyResult{
Instance: aws.StringValue(instances.Reservations[0].Instances[0].PrivateDnsName),
}, nil
}
22 changes: 21 additions & 1 deletion upup/pkg/fi/cloudup/template_functions.go
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@ import (

"github.com/Masterminds/sprig"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog"
kopscontrollerconfig "k8s.io/kops/cmd/kops-controller/pkg/config"
"k8s.io/kops/pkg/apis/kops"
@@ -48,6 +49,7 @@ import (
"k8s.io/kops/pkg/resources/spotinst"
"k8s.io/kops/pkg/wellknownports"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
"k8s.io/kops/upup/pkg/fi/cloudup/gce"
"k8s.io/kops/util/pkg/env"
)
@@ -384,10 +386,28 @@ func (tf *TemplateFunctions) KopsControllerConfig() (string, error) {
pkiDir := "/etc/kubernetes/kops-controller/pki"
config.Server = &kopscontrollerconfig.ServerOptions{
Listen: fmt.Sprintf(":%d", wellknownports.KopsControllerPort),
Provider: kops.CloudProviderID(cluster.Spec.CloudProvider),
ServerCertificatePath: path.Join(pkiDir, "kops-controller.crt"),
ServerKeyPath: path.Join(pkiDir, "kops-controller.key"),
}

switch kops.CloudProviderID(cluster.Spec.CloudProvider) {
case kops.CloudProviderAWS:
nodesRoles := sets.String{}
for _, ig := range tf.InstanceGroups {
if ig.Spec.Role == kops.InstanceGroupRoleNode {
profile, err := tf.LinkToIAMInstanceProfile(ig)
if err != nil {
return "", fmt.Errorf("getting role for ig %s: %v", ig.Name, err)
}
nodesRoles.Insert(*profile.Name)
}
}
config.Server.Provider.AWS = &awsup.AWSVerifierOptions{
NodesRoles: nodesRoles.List(),
}
default:
return "", fmt.Errorf("unsupported cloud provider %s", cluster.Spec.CloudProvider)
}
}

// To avoid indentation problems, we marshal as json. json is a subset of yaml