Skip to content

Commit

Permalink
Implemented DescribeCluster to Discover Cluster Version (#7314)
Browse files Browse the repository at this point in the history
  • Loading branch information
edibble21 authored Nov 15, 2024
1 parent 6a940ff commit d0c8ed0
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 6 deletions.
1 change: 1 addition & 0 deletions pkg/controllers/nodeclass/status/launchtemplate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ var _ = Describe("NodeClass Launch Template CIDR Resolution Controller", func()
KubernetesNetworkConfig: &ekstypes.KubernetesNetworkConfigResponse{
ServiceIpv6Cidr: lo.ToPtr("2001:db8::/64"),
},
Version: lo.ToPtr("1.30"),
},
})
nodeClass.Spec.AMIFamily = lo.ToPtr(v1.AMIFamilyAL2023)
Expand Down
15 changes: 15 additions & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package errors
import (
"errors"

awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/aws/smithy-go"
"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -38,6 +39,9 @@ var (
alreadyExistsErrorCodes = sets.New[string](
"EntityAlreadyExists",
)
accessDeniedErrorCodes = sets.New[int](
403,
)
// unfulfillableCapacityErrorCodes signify that capacity is temporarily unable to be launched
unfulfillableCapacityErrorCodes = sets.New[string](
"InsufficientInstanceCapacity",
Expand All @@ -49,6 +53,17 @@ var (
)
)

func IsAccessDenied(err error) bool {
if err == nil {
return false
}
var awsError *awshttp.ResponseError
if errors.As(err, &awsError) {
return accessDeniedErrorCodes.Has(awsError.HTTPStatusCode())
}
return false
}

// IsNotFound returns true if the err is an AWS error (even if it's
// wrapped) and is a known to mean "not found" (as opposed to a more
// serious or unexpected error)
Expand Down
1 change: 1 addition & 0 deletions pkg/fake/eksapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (s *EKSAPI) DescribeCluster(_ context.Context, input *eks.DescribeClusterIn
KubernetesNetworkConfig: &ekstypes.KubernetesNetworkConfigResponse{
ServiceIpv4Cidr: lo.ToPtr("10.100.0.0/16"),
},
Version: lo.ToPtr("1.30"),
},
}, nil
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
ec2api,
cfg.Region,
)
versionProvider := version.NewDefaultProvider(operator.KubernetesInterface, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval))
versionProvider := version.NewDefaultProvider(operator.KubernetesInterface, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), eksapi)
ssmProvider := ssmp.NewDefaultProvider(ssm.NewFromConfig(cfg), ssmCache)
amiProvider := amifamily.NewDefaultProvider(operator.Clock, versionProvider, ssmProvider, ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval))
amiResolver := amifamily.NewDefaultResolver()
Expand Down
78 changes: 78 additions & 0 deletions pkg/providers/version/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package version_test

import (
"context"
"testing"

"sigs.k8s.io/karpenter/pkg/test/v1alpha1"

coreoptions "sigs.k8s.io/karpenter/pkg/operator/options"
coretest "sigs.k8s.io/karpenter/pkg/test"

"github.com/aws/karpenter-provider-aws/pkg/apis"
"github.com/aws/karpenter-provider-aws/pkg/fake"
"github.com/aws/karpenter-provider-aws/pkg/operator/options"
"github.com/aws/karpenter-provider-aws/pkg/test"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
. "sigs.k8s.io/karpenter/pkg/test/expectations"
. "sigs.k8s.io/karpenter/pkg/utils/testing"
)

var ctx context.Context
var stop context.CancelFunc
var env *coretest.Environment
var awsEnv *test.Environment
var fakeEKSAPI *fake.EKSAPI

func TestAWS(t *testing.T) {
ctx = TestContextWithLogger(t)
RegisterFailHandler(Fail)
RunSpecs(t, "VersionProvider")
}

var _ = BeforeSuite(func() {
env = coretest.NewEnvironment(coretest.WithCRDs(apis.CRDs...), coretest.WithCRDs(v1alpha1.CRDs...))
ctx = coreoptions.ToContext(ctx, coretest.Options())
ctx = options.ToContext(ctx, test.Options())
ctx, stop = context.WithCancel(ctx)
awsEnv = test.NewEnvironment(ctx, env)

fakeEKSAPI = &fake.EKSAPI{}
})

var _ = AfterSuite(func() {
stop()
Expect(env.Stop()).To(Succeed(), "Failed to stop environment")
})

var _ = BeforeEach(func() {
fakeEKSAPI.Reset()
})

var _ = AfterEach(func() {
ExpectCleanedUp(ctx, env.Client)
})

var _ = Describe("Operator", func() {
It("should resolve Kubernetes Version via Describe Cluster", func() {
endpoint, err := awsEnv.VersionProvider.Get(ctx)
Expect(err).ToNot(HaveOccurred())
Expect(endpoint).To(Equal("1.30"))
})
})
34 changes: 30 additions & 4 deletions pkg/providers/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ import (
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/log"

awserrors "github.com/aws/karpenter-provider-aws/pkg/errors"

"github.com/aws/aws-sdk-go-v2/service/eks"

sdk "github.com/aws/karpenter-provider-aws/pkg/aws"

"github.com/aws/karpenter-provider-aws/pkg/operator/options"

"sigs.k8s.io/karpenter/pkg/utils/pretty"
)

Expand All @@ -48,25 +56,43 @@ type DefaultProvider struct {
cache *cache.Cache
cm *pretty.ChangeMonitor
kubernetesInterface kubernetes.Interface
eksapi sdk.EKSAPI
}

func NewDefaultProvider(kubernetesInterface kubernetes.Interface, cache *cache.Cache) *DefaultProvider {
func NewDefaultProvider(kubernetesInterface kubernetes.Interface, cache *cache.Cache, eksapi sdk.EKSAPI) *DefaultProvider {
return &DefaultProvider{
cm: pretty.NewChangeMonitor(),
cache: cache,
kubernetesInterface: kubernetesInterface,
eksapi: eksapi,
}
}

func (p *DefaultProvider) Get(ctx context.Context) (string, error) {
var version string
if version, ok := p.cache.Get(kubernetesVersionCacheKey); ok {
return version.(string), nil
}
serverVersion, err := p.kubernetesInterface.Discovery().ServerVersion()
output, err := p.eksapi.DescribeCluster(ctx, &eks.DescribeClusterInput{
Name: lo.ToPtr(options.FromContext(ctx).ClusterName),
})
if err != nil {
return "", err
if !awserrors.IsAccessDenied(err) {
return "", err
}
output, err := p.kubernetesInterface.Discovery().ServerVersion()
if err != nil {
return "", fmt.Errorf("getting kubernetes version from the kubernetes API")
} else if output != nil {
version = fmt.Sprintf("%s.%s", output.Major, strings.TrimSuffix(output.Minor, "+"))
log.FromContext(ctx).Info("retrieved Kubernetes version from Kubernetes API", "version", version)
}
} else if lo.FromPtr(output.Cluster.Version) != "" {
version = *output.Cluster.Version
log.FromContext(ctx).Info("retrieved Kubernetes version from EKS DescribeCluster", "version", version)
} else {
return "", fmt.Errorf("unable to retrieve Kubernetes version from EKS DescribeCluster")
}
version := fmt.Sprintf("%s.%s", serverVersion.Major, strings.TrimSuffix(serverVersion.Minor, "+"))
p.cache.SetDefault(kubernetesVersionCacheKey, version)
if p.cm.HasChanged("kubernetes-version", version) {
log.FromContext(ctx).WithValues("version", version).V(1).Info("discovered kubernetes version")
Expand Down
2 changes: 1 addition & 1 deletion pkg/test/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment
pricingProvider := pricing.NewDefaultProvider(ctx, fakePricingAPI, ec2api, fake.DefaultRegion)
subnetProvider := subnet.NewDefaultProvider(ec2api, subnetCache, availableIPAdressCache, associatePublicIPAddressCache)
securityGroupProvider := securitygroup.NewDefaultProvider(ec2api, securityGroupCache)
versionProvider := version.NewDefaultProvider(env.KubernetesInterface, kubernetesVersionCache)
versionProvider := version.NewDefaultProvider(env.KubernetesInterface, kubernetesVersionCache, eksapi)
instanceProfileProvider := instanceprofile.NewDefaultProvider(fake.DefaultRegion, iamapi, instanceProfileCache)
ssmProvider := ssmp.NewDefaultProvider(ssmapi, ssmCache)
amiProvider := amifamily.NewDefaultProvider(clock, versionProvider, ssmProvider, ec2api, ec2Cache)
Expand Down

0 comments on commit d0c8ed0

Please sign in to comment.