Skip to content

Commit

Permalink
Made the async prober private (#3421)
Browse files Browse the repository at this point in the history
Signed-off-by: Calum Murray <[email protected]>
  • Loading branch information
Cali0707 authored Oct 23, 2023
1 parent 9ef55f2 commit 57c4d1e
Show file tree
Hide file tree
Showing 12 changed files with 68 additions and 75 deletions.
10 changes: 5 additions & 5 deletions control-plane/pkg/prober/async_prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
cacheExpiryTime = time.Minute * 30
)

type IPsLister func(addressable Addressable) ([]string, error)
type IPsLister func(addressable proberAddressable) ([]string, error)

type asyncProber struct {
client httpClient
Expand All @@ -47,7 +47,7 @@ type asyncProber struct {
// NewAsync creates an async Prober.
//
// It reports status changes using the provided EnqueueFunc.
func NewAsync(ctx context.Context, client httpClient, port string, IPsLister IPsLister, enqueue EnqueueFunc) Prober {
func NewAsync(ctx context.Context, client httpClient, port string, IPsLister IPsLister, enqueue EnqueueFunc) prober {
logger := logging.FromContext(ctx).Desugar().
With(zap.String("scope", "prober"))

Expand All @@ -65,15 +65,15 @@ func NewAsync(ctx context.Context, client httpClient, port string, IPsLister IPs
}
}

func NewAsyncWithTLS(ctx context.Context, port string, IPsLister IPsLister, enqueue EnqueueFunc, caCerts *string) (Prober, error) {
func NewAsyncWithTLS(ctx context.Context, port string, IPsLister IPsLister, enqueue EnqueueFunc, caCerts *string) (prober, error) {
newClient, err := makeHttpClientWithTLS(caCerts)
if err != nil {
return nil, err
}
return NewAsync(ctx, newClient, port, IPsLister, enqueue), nil
}

func (a *asyncProber) Probe(ctx context.Context, addressable Addressable, expected Status) Status {
func (a *asyncProber) probe(ctx context.Context, addressable proberAddressable, expected Status) Status {
address := addressable.Address
IPs, err := a.IPsLister(addressable)
if err != nil {
Expand Down Expand Up @@ -165,7 +165,7 @@ func (a *asyncProber) enqueueArg(_ string, arg interface{}) {
a.enqueue(arg.(types.NamespacedName))
}

func (a *asyncProber) RotateRootCaCerts(caCerts *string) error {
func (a *asyncProber) rotateRootCaCerts(caCerts *string) error {
newClient, err := makeHttpClientWithTLS(caCerts)
if err != nil {
return err
Expand Down
24 changes: 12 additions & 12 deletions control-plane/pkg/prober/async_prober_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestAsyncProber(t *testing.T) {
name string
pods []*corev1.Pod
podsLabelsSelector labels.Selector
addressable Addressable
addressable proberAddressable
responseStatusCode int
wantStatus Status
wantRequeueCountMin int
Expand All @@ -70,7 +70,7 @@ func TestAsyncProber(t *testing.T) {
name: "no pods",
pods: []*corev1.Pod{},
podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}),
addressable: Addressable{
addressable: proberAddressable{
Address: &url.URL{Scheme: "http", Path: "/b1/b1"},
ResourceKey: types.NamespacedName{Namespace: "b1", Name: "b1"},
},
Expand All @@ -92,7 +92,7 @@ func TestAsyncProber(t *testing.T) {
},
},
podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}),
addressable: Addressable{
addressable: proberAddressable{
Address: &url.URL{Scheme: "http", Path: "/b1/b1"},
ResourceKey: types.NamespacedName{Namespace: "b1", Name: "b1"},
},
Expand All @@ -115,7 +115,7 @@ func TestAsyncProber(t *testing.T) {
},
},
podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}),
addressable: Addressable{
addressable: proberAddressable{
Address: &url.URL{Scheme: "http", Path: "/b1/b1"},
ResourceKey: types.NamespacedName{Namespace: "b1", Name: "b1"},
},
Expand All @@ -138,7 +138,7 @@ func TestAsyncProber(t *testing.T) {
},
},
podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}),
addressable: Addressable{
addressable: proberAddressable{
Address: &url.URL{Scheme: "https", Path: "/b1/b1"},
ResourceKey: types.NamespacedName{Namespace: "b1", Name: "b1"},
},
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestAsyncProber(t *testing.T) {
u, _ := url.Parse(s.URL)

wantRequeueCountMin := atomic.NewInt64(int64(tc.wantRequeueCountMin))
var IPsLister IPsLister = func(addressable Addressable) ([]string, error) {
var IPsLister IPsLister = func(addressable proberAddressable) ([]string, error) {
pods, err := podinformer.Get(ctx).Lister().List(tc.podsLabelsSelector)
if err != nil {
return nil, err
Expand All @@ -196,7 +196,7 @@ func TestAsyncProber(t *testing.T) {
}
return ips, nil
}
var prober Prober
var prober prober
var err error
if tc.useTLS {
prober, err = NewAsyncWithTLS(ctx, u.Port(), IPsLister, func(key types.NamespacedName) {
Expand All @@ -210,7 +210,7 @@ func TestAsyncProber(t *testing.T) {
}

probeFunc := func() bool {
status := prober.Probe(ctx, tc.addressable, tc.wantStatus)
status := prober.probe(ctx, tc.addressable, tc.wantStatus)
return status == tc.wantStatus
}

Expand Down Expand Up @@ -258,7 +258,7 @@ func TestAsyncProberRotateCACerts(t *testing.T) {
u, err := url.Parse(addrString)
require.NoError(t, err)

addressable := Addressable{
addressable := proberAddressable{
Address: &url.URL{Scheme: "https", Path: "/b1/b1", Host: addrString},
ResourceKey: types.NamespacedName{Namespace: "b1", Name: "b1"},
}
Expand All @@ -273,7 +273,7 @@ func TestAsyncProberRotateCACerts(t *testing.T) {
}
podinformer.Get(ctx).Informer().GetStore().Add(pod)
labelSelector := labels.SelectorFromSet(map[string]string{"app": "p"})
var IPsLister IPsLister = func(addressable Addressable) ([]string, error) {
var IPsLister IPsLister = func(addressable proberAddressable) ([]string, error) {
pods, err := podinformer.Get(ctx).Lister().List(labelSelector)
if err != nil {
return nil, err
Expand All @@ -292,7 +292,7 @@ func TestAsyncProberRotateCACerts(t *testing.T) {
require.NoError(t, err)

probeFunc := func() bool {
status := prober.Probe(ctx, addressable, wantStatus)
status := prober.probe(ctx, addressable, wantStatus)
return status == wantStatus
}

Expand All @@ -302,7 +302,7 @@ func TestAsyncProberRotateCACerts(t *testing.T) {
require.Eventuallyf(tt, func() bool { return wantRequeueCountMin.Load() == 1 }, 5*time.Second, 250*time.Millisecond, "got %d, want 1", wantRequeueCountMin.Load())
})
t.Run("one pod - TLS certs after rotation", func(tt *testing.T) {
prober.RotateRootCaCerts(pointer.String(string(CA2)))
prober.rotateRootCaCerts(pointer.String(string(CA2)))
s.TLSConfig.GetCertificate = func(chi *tls.ClientHelloInfo) (*tls.Certificate, error) {
cert, err := tls.X509KeyPair(Crt2, Key2)
return &cert, err
Expand Down
14 changes: 7 additions & 7 deletions control-plane/pkg/prober/composite_prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ var (
)

type compositeProber struct {
httpProber Prober
httpsProber Prober
httpProber prober
httpsProber prober
}

// NewComposite creates a composite prober.
Expand All @@ -50,17 +50,17 @@ func NewCompositeNoTLS(ctx context.Context, httpPort string, IPsLister IPsLister
return NewComposite(ctx, httpPort, "443", IPsLister, enqueue, &emptyCaCerts)
}

func (c *compositeProber) Probe(ctx context.Context, addressable NewAddressable, expected Status) Status {
func (c *compositeProber) Probe(ctx context.Context, addressable ProberAddressable, expected Status) Status {
var status Status
for _, addr := range addressable.AddressStatus.Addresses {
oldAddressable := Addressable{
oldAddressable := proberAddressable{
ResourceKey: addressable.ResourceKey,
Address: addr.URL.URL(),
}
if addr.URL.Scheme == "https" {
status = c.httpsProber.Probe(ctx, oldAddressable, expected)
status = c.httpsProber.probe(ctx, oldAddressable, expected)
} else if addr.URL.Scheme == "http" {
status = c.httpProber.Probe(ctx, oldAddressable, expected)
status = c.httpProber.probe(ctx, oldAddressable, expected)
}
if status != expected {
return status
Expand All @@ -71,6 +71,6 @@ func (c *compositeProber) Probe(ctx context.Context, addressable NewAddressable,

func (c *compositeProber) RotateRootCaCerts(caCerts *string) error {
// we don't need to rotate the certs on the http prober as it isn't using them
err := c.httpsProber.RotateRootCaCerts(caCerts)
err := c.httpsProber.rotateRootCaCerts(caCerts)
return err
}
20 changes: 10 additions & 10 deletions control-plane/pkg/prober/composite_prober_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestCompositeProber(t *testing.T) {
name string
pods []*corev1.Pod
podsLabelsSelector labels.Selector
addressable NewAddressable
addressable ProberAddressable
responseStatusCode int
wantStatus Status
wantRequeueCountMin int
Expand All @@ -66,7 +66,7 @@ func TestCompositeProber(t *testing.T) {
},
},
podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}),
addressable: NewAddressable{
addressable: ProberAddressable{
AddressStatus: &duckv1.AddressStatus{
Address: &duckv1.Addressable{
URL: &apis.URL{Scheme: "http", Path: "/b1/b1"},
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestCompositeProber(t *testing.T) {
},
},
podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}),
addressable: NewAddressable{
addressable: ProberAddressable{
AddressStatus: &duckv1.AddressStatus{
Address: &duckv1.Addressable{
URL: &apis.URL{Scheme: "https", Path: "/b1/b1"},
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestCompositeProber(t *testing.T) {
},
},
podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}),
addressable: NewAddressable{
addressable: ProberAddressable{
AddressStatus: &duckv1.AddressStatus{
Address: &duckv1.Addressable{
URL: &apis.URL{Scheme: "http", Path: "/b1/b1"},
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestCompositeProber(t *testing.T) {
}
}

var IPsLister IPsLister = func(addressable Addressable) ([]string, error) {
var IPsLister IPsLister = func(addressable proberAddressable) ([]string, error) {
pods, err := podinformer.Get(ctx).Lister().List(tc.podsLabelsSelector)
if err != nil {
return nil, err
Expand Down Expand Up @@ -238,7 +238,7 @@ func TestCompositeProberNoTLS(t *testing.T) {
name string
pods []*corev1.Pod
podsLabelsSelector labels.Selector
addressable NewAddressable
addressable ProberAddressable
responseStatusCode int
wantStatus Status
wantRequeueCountMin int
Expand All @@ -258,7 +258,7 @@ func TestCompositeProberNoTLS(t *testing.T) {
},
},
podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}),
addressable: NewAddressable{
addressable: ProberAddressable{
AddressStatus: &duckv1.AddressStatus{
Address: &duckv1.Addressable{
URL: &apis.URL{Scheme: "http", Path: "/b1/b1"},
Expand Down Expand Up @@ -289,7 +289,7 @@ func TestCompositeProberNoTLS(t *testing.T) {
},
},
podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}),
addressable: NewAddressable{
addressable: ProberAddressable{
AddressStatus: &duckv1.AddressStatus{
Address: &duckv1.Addressable{
URL: &apis.URL{Scheme: "https", Path: "/b1/b1"},
Expand Down Expand Up @@ -320,7 +320,7 @@ func TestCompositeProberNoTLS(t *testing.T) {
},
},
podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}),
addressable: NewAddressable{
addressable: ProberAddressable{
AddressStatus: &duckv1.AddressStatus{
Address: &duckv1.Addressable{
URL: &apis.URL{Scheme: "http", Path: "/b1/b1"},
Expand Down Expand Up @@ -389,7 +389,7 @@ func TestCompositeProberNoTLS(t *testing.T) {
}
}

var IPsLister IPsLister = func(addressable Addressable) ([]string, error) {
var IPsLister IPsLister = func(addressable proberAddressable) ([]string, error) {
pods, err := podinformer.Get(ctx).Lister().List(tc.podsLabelsSelector)
if err != nil {
return nil, err
Expand Down
34 changes: 17 additions & 17 deletions control-plane/pkg/prober/prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"knative.dev/pkg/network"
)

// Addressable contains addressable resource data for the prober.
type Addressable struct {
// proberAddressable contains addressable resource data for the prober.
type proberAddressable struct {
// Addressable address.
Address *url.URL
// Resource key.
Expand All @@ -41,15 +41,15 @@ type Addressable struct {
type EnqueueFunc func(key types.NamespacedName)

// Prober probes an addressable resource.
type Prober interface {
type prober interface {
// Probe probes the provided Addressable resource and returns its Status.
Probe(ctx context.Context, addressable Addressable, expected Status) Status
probe(ctx context.Context, addressable proberAddressable, expected Status) Status
// RotateRootCaCerts rotates the CA certs used to make http requests
RotateRootCaCerts(caCerts *string) error
rotateRootCaCerts(caCerts *string) error
}

// NewAddressable contains addressable resource data for the new prober
type NewAddressable struct {
// ProberAddressable contains addressable resource data for the new prober
type ProberAddressable struct {
// Addressable status
AddressStatus *duckv1.AddressStatus
// Resource key
Expand All @@ -59,7 +59,7 @@ type NewAddressable struct {
// NewProber probes an addressable resource
type NewProber interface {
// Probe probes the provided NewAddressable resource and returns its Status
Probe(ctx context.Context, addressable NewAddressable, expected Status) Status
Probe(ctx context.Context, addressable ProberAddressable, expected Status) Status
// RotateRootCaCerts rotates the CA certs used to make http requests
RotateRootCaCerts(caCerts *string) error
}
Expand All @@ -68,21 +68,21 @@ type NewProber interface {
// ordinary functions as Prober. If f is a function
// with the appropriate signature, Func(f) is a
// Prober that calls f.
type Func func(ctx context.Context, addressable Addressable, expected Status) Status
type Func func(ctx context.Context, addressable proberAddressable, expected Status) Status

// Probe implements the Prober interface for Func.
func (p Func) Probe(ctx context.Context, addressable Addressable, expected Status) Status {
func (p Func) probe(ctx context.Context, addressable proberAddressable, expected Status) Status {
return p(ctx, addressable, expected)
}

// RotateRootCaCerts is an empty implementation to complete the Prober interface for Func.
func (p Func) RotateRootCaCerts(caCerts *string) error {
func (p Func) rotateRootCaCerts(caCerts *string) error {
return nil
}

type NewFunc func(ctx context.Context, addressable NewAddressable, expected Status) Status
type NewFunc func(ctx context.Context, addressable ProberAddressable, expected Status) Status

func (p NewFunc) Probe(ctx context.Context, addressable NewAddressable, expected Status) Status {
func (p NewFunc) Probe(ctx context.Context, addressable ProberAddressable, expected Status) Status {
return p(ctx, addressable, expected)
}

Expand Down Expand Up @@ -127,7 +127,7 @@ func probe(ctx context.Context, client httpClient, logger *zap.Logger, address s
}

func IPsListerFromService(svc types.NamespacedName) IPsLister {
return func(addressable Addressable) ([]string, error) {
return func(addressable proberAddressable) ([]string, error) {
return []string{GetIPForService(svc)}, nil
}
}
Expand All @@ -139,7 +139,7 @@ func GetIPForService(svc types.NamespacedName) string {
type IPListerWithMapping interface {
Register(svc types.NamespacedName, ip string)
Unregister(svc types.NamespacedName)
List(addressable Addressable) ([]string, error)
List(addressable proberAddressable) ([]string, error)
}

type ipListerWithMapping struct {
Expand Down Expand Up @@ -171,7 +171,7 @@ func (m *ipListerWithMapping) Unregister(svc types.NamespacedName) {
delete(m.mapping, a)
}

func (m *ipListerWithMapping) List(addressable Addressable) ([]string, error) {
func (m *ipListerWithMapping) List(addressable proberAddressable) ([]string, error) {
a := addressable.ResourceKey.String()

m.mx.RLock()
Expand All @@ -185,7 +185,7 @@ func (m *ipListerWithMapping) List(addressable Addressable) ([]string, error) {
}

func IdentityIPsLister() IPsLister {
return func(addressable Addressable) ([]string, error) {
return func(addressable proberAddressable) ([]string, error) {
return []string{addressable.Address.Host}, nil
}
}
Loading

0 comments on commit 57c4d1e

Please sign in to comment.