From 8c9bd269e4390c26d72f25435df0a6f6866c4fd3 Mon Sep 17 00:00:00 2001
From: Shiming Zhang
Date: Mon, 2 Sep 2024 19:07:03 +0800
Subject: [PATCH] Use manage to specify resources
---
.../v1alpha1/kwok_configuration_types.go | 3 +
.../v1alpha1/kwok_manage_selector_types.go | 42 +++
.../config/v1alpha1/zz_generated.deepcopy.go | 59 +++++
.../kwok_configuration_types.go | 3 +
.../internalversion/kwok_manages_selector.go | 247 ++++++++++++++++++
.../kwok_manages_selector_node.go | 91 +++++++
.../kwok_manages_selector_test.go | 114 ++++++++
.../zz_generated.conversion.go | 42 +++
.../internalversion/zz_generated.deepcopy.go | 75 ++++++
pkg/kwok/cmd/root.go | 188 ++++++-------
pkg/kwok/controllers/controller.go | 109 ++++----
pkg/kwok/server/server.go | 36 +++
site/content/en/docs/generated/apis.md | 134 ++++++++++
site/content/en/docs/generated/kwok.md | 1 +
14 files changed, 1011 insertions(+), 133 deletions(-)
create mode 100644 pkg/apis/config/v1alpha1/kwok_manage_selector_types.go
create mode 100644 pkg/apis/internalversion/kwok_manages_selector.go
create mode 100644 pkg/apis/internalversion/kwok_manages_selector_node.go
create mode 100644 pkg/apis/internalversion/kwok_manages_selector_test.go
diff --git a/pkg/apis/config/v1alpha1/kwok_configuration_types.go b/pkg/apis/config/v1alpha1/kwok_configuration_types.go
index 16980bfbc..06319ea83 100644
--- a/pkg/apis/config/v1alpha1/kwok_configuration_types.go
+++ b/pkg/apis/config/v1alpha1/kwok_configuration_types.go
@@ -70,6 +70,9 @@ type KwokConfigurationOptions struct {
// is the default value for flag --tls-private-key-file
TLSPrivateKeyFile string `json:"tlsPrivateKeyFile,omitempty"`
+ // Manages is the option to manage an resources
+ Manages ManagesSelectors `json:"manages,omitempty"`
+
// ManageSingleNode is the option to manage a single node name.
// is the default value for flag --manage-single-node
// Note: when `manage-all-nodes` is specified as true or
diff --git a/pkg/apis/config/v1alpha1/kwok_manage_selector_types.go b/pkg/apis/config/v1alpha1/kwok_manage_selector_types.go
new file mode 100644
index 000000000..8a2bac9d5
--- /dev/null
+++ b/pkg/apis/config/v1alpha1/kwok_manage_selector_types.go
@@ -0,0 +1,42 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1alpha1
+
+// ManagesSelectors holds information about the manages selectors.
+type ManagesSelectors []ManagesSelector
+
+// ManagesSelector holds information about the manages selector.
+type ManagesSelector struct {
+ // Kind of the referent.
+ Kind string `json:"kind"`
+ // Group of the referent.
+ Group string `json:"group,omitempty"`
+ // Version of the referent.
+ Version string `json:"version,omitempty"`
+
+ // Name of the referent
+ // Only available with Node Kind.
+ Name string `json:"name,omitempty"`
+ // Labels of the referent.
+ // specify matched with labels.
+ // Only available with Node Kind.
+ Labels map[string]string `json:"labels,omitempty"`
+ // Annotations of the referent.
+ // specify matched with annotations.
+ // Only available with Node Kind.
+ Annotations map[string]string `json:"annotations,omitempty"`
+}
diff --git a/pkg/apis/config/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/config/v1alpha1/zz_generated.deepcopy.go
index 1b5f881f1..c49931952 100644
--- a/pkg/apis/config/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/config/v1alpha1/zz_generated.deepcopy.go
@@ -201,6 +201,13 @@ func (in *KwokConfigurationOptions) DeepCopyInto(out *KwokConfigurationOptions)
*out = make([]string, len(*in))
copy(*out, *in)
}
+ if in.Manages != nil {
+ in, out := &in.Manages, &out.Manages
+ *out = make(ManagesSelectors, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
if in.ManageAllNodes != nil {
in, out := &in.ManageAllNodes, &out.ManageAllNodes
*out = new(bool)
@@ -404,6 +411,58 @@ func (in *KwokctlResource) DeepCopyObject() runtime.Object {
return nil
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ManagesSelector) DeepCopyInto(out *ManagesSelector) {
+ *out = *in
+ if in.Labels != nil {
+ in, out := &in.Labels, &out.Labels
+ *out = make(map[string]string, len(*in))
+ for key, val := range *in {
+ (*out)[key] = val
+ }
+ }
+ if in.Annotations != nil {
+ in, out := &in.Annotations, &out.Annotations
+ *out = make(map[string]string, len(*in))
+ for key, val := range *in {
+ (*out)[key] = val
+ }
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ManagesSelector.
+func (in *ManagesSelector) DeepCopy() *ManagesSelector {
+ if in == nil {
+ return nil
+ }
+ out := new(ManagesSelector)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in ManagesSelectors) DeepCopyInto(out *ManagesSelectors) {
+ {
+ in := &in
+ *out = make(ManagesSelectors, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ return
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ManagesSelectors.
+func (in ManagesSelectors) DeepCopy() ManagesSelectors {
+ if in == nil {
+ return nil
+ }
+ out := new(ManagesSelectors)
+ in.DeepCopyInto(out)
+ return *out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Port) DeepCopyInto(out *Port) {
*out = *in
diff --git a/pkg/apis/internalversion/kwok_configuration_types.go b/pkg/apis/internalversion/kwok_configuration_types.go
index 40fa11653..3979fd066 100644
--- a/pkg/apis/internalversion/kwok_configuration_types.go
+++ b/pkg/apis/internalversion/kwok_configuration_types.go
@@ -52,6 +52,9 @@ type KwokConfigurationOptions struct {
// TLSPrivateKeyFile is the ile containing x509 private key
TLSPrivateKeyFile string
+ // Manages is the option to manage the resource
+ Manages ManagesSelectors
+
// ManageSingleNode is the option to manage a single node name
ManageSingleNode string
diff --git a/pkg/apis/internalversion/kwok_manages_selector.go b/pkg/apis/internalversion/kwok_manages_selector.go
new file mode 100644
index 000000000..e28990178
--- /dev/null
+++ b/pkg/apis/internalversion/kwok_manages_selector.go
@@ -0,0 +1,247 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package internalversion
+
+import (
+ "fmt"
+ "sort"
+ "strings"
+
+ "k8s.io/apimachinery/pkg/fields"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/apimachinery/pkg/selection"
+
+ "sigs.k8s.io/kwok/pkg/utils/maps"
+ "sigs.k8s.io/kwok/pkg/utils/slices"
+)
+
+// ManagesSelectors holds information about the manages selectors.
+type ManagesSelectors []ManagesSelector
+
+// ManagesSelector holds information about the manages selector.
+type ManagesSelector struct {
+ // Kind of the referent.
+ Kind string
+ // Group of the referent.
+ Group string
+ // Version of the referent.
+ Version string
+
+ // Name of the referent
+ // Only available with Node Kind.
+ Name string
+ // Labels of the referent.
+ // specify matched with labels.
+ // Only available with Node Kind.
+ Labels map[string]string
+ // Annotations of the referent.
+ // specify matched with annotations.
+ // Only available with Node Kind.
+ Annotations map[string]string
+}
+
+func (s *ManagesSelectors) Set(sel string) error {
+ p, err := parseManagesSelector(sel)
+ if err != nil {
+ return err
+ }
+ *s = append(*s, *p)
+ return nil
+}
+
+func (s ManagesSelectors) Type() string {
+ return "ManagesSelectorSlice"
+}
+
+func (s ManagesSelectors) String() string {
+ strSlice := slices.Map(s, func(t ManagesSelector) string {
+ return t.String()
+ })
+ return strings.Join(strSlice, " ")
+}
+
+func (s *ManagesSelector) Set(sel string) error {
+ p, err := parseManagesSelector(sel)
+ if err != nil {
+ return err
+ }
+ *s = *p
+ return nil
+}
+
+func (s *ManagesSelector) Type() string {
+ return "ManagesSelector"
+}
+
+func parseManagesSelector(arg string) (*ManagesSelector, error) {
+ items := strings.Split(arg, ":")
+
+ t := ManagesSelector{}
+ gvk := items[0]
+ if gvk == "" {
+ return nil, fmt.Errorf("invalid empty target resource ref")
+ }
+
+ sepVersion := strings.Index(gvk, "/")
+ if sepVersion != -1 {
+ t.Version = gvk[sepVersion+1:]
+ gvk = gvk[:sepVersion]
+ }
+
+ sepGroup := strings.Index(gvk, ".")
+ if sepGroup != -1 {
+ t.Kind = gvk[:sepGroup]
+ t.Group = gvk[sepGroup+1:]
+ } else {
+ t.Kind = gvk
+ }
+
+ for _, item := range items[1:] {
+ sel, err := fields.ParseSelector(item)
+ if err != nil {
+ return nil, err
+ }
+ for _, req := range sel.Requirements() {
+ if req.Operator != selection.Equals && req.Operator != selection.DoubleEquals {
+ return nil, fmt.Errorf("invalid selector requirements: %s", req.Operator)
+ }
+ switch req.Field {
+ case "metadata.name":
+ t.Name = req.Value
+ default:
+ sp := strings.SplitN(req.Field, ".", 3)
+ if len(sp) < 2 {
+ return nil, fmt.Errorf("error target resource ref: %s", item)
+ }
+ if sp[0] != "metadata" {
+ return nil, fmt.Errorf("error target resource ref: %s", item)
+ }
+
+ switch sp[1] {
+ case "labels":
+ if t.Labels == nil {
+ t.Labels = map[string]string{}
+ }
+ t.Labels[sp[2]] = req.Value
+ case "annotations":
+ if t.Annotations == nil {
+ t.Annotations = map[string]string{}
+ }
+ t.Annotations[sp[2]] = req.Value
+ default:
+ return nil, fmt.Errorf("error target resource ref: %s", item)
+ }
+ }
+ }
+ }
+ return &t, nil
+}
+
+func (s *ManagesSelector) String() string {
+ if s == nil {
+ return ""
+ }
+
+ buf := &strings.Builder{}
+ buf.WriteString(s.Kind)
+ if s.Group != "" {
+ buf.WriteString(fmt.Sprintf(".%s", s.Group))
+ }
+ if s.Version != "" {
+ buf.WriteString(fmt.Sprintf("/%s", s.Version))
+ }
+ if s.Name != "" {
+ buf.WriteString(fmt.Sprintf(":metadata.name=%s", s.Name))
+ }
+ if len(s.Labels) > 0 {
+ keys := maps.Keys(s.Labels)
+ sort.Strings(keys)
+ for _, k := range keys {
+ buf.WriteString(fmt.Sprintf(":metadata.labels.%s=%s", k, s.Labels[k]))
+ }
+ }
+ if len(s.Annotations) > 0 {
+ keys := maps.Keys(s.Annotations)
+ sort.Strings(keys)
+ for _, k := range keys {
+ buf.WriteString(fmt.Sprintf(":metadata.annotations.%s=%s", k, s.Annotations[k]))
+ }
+ }
+ return buf.String()
+}
+
+func (s ManagesSelectors) MatchStage(stage *Stage) bool {
+ for _, t := range s {
+ if t.MatchStage(stage) {
+ return true
+ }
+ }
+ return false
+}
+
+func (s *ManagesSelector) MatchStage(stage *Stage) bool {
+ spec := stage.Spec
+ if !s.MatchResourceRef(&spec.ResourceRef) {
+ return false
+ }
+
+ if spec.Selector != nil {
+ if len(s.Labels) != 0 {
+ ml := spec.Selector.MatchLabels
+ for k, v := range s.Labels {
+ if mv, ok := ml[k]; ok && mv != v {
+ return false
+ }
+ }
+ }
+ if len(s.Annotations) != 0 {
+ ma := spec.Selector.MatchAnnotations
+ for k, v := range s.Annotations {
+ if mv, ok := ma[k]; ok && mv != v {
+ return false
+ }
+ }
+ }
+ }
+
+ return true
+}
+
+func (s *ManagesSelector) MatchResourceRef(ref *StageResourceRef) bool {
+ if s.Kind != ref.Kind {
+ return false
+ }
+
+ gv := schema.GroupVersion{
+ Group: s.Group,
+ Version: s.Version,
+ }
+ apiGroup := gv.String()
+ if apiGroup == "" {
+ apiGroup = "v1"
+ }
+
+ if ref.APIGroup == "" {
+ ref.APIGroup = "v1"
+ }
+
+ if apiGroup != ref.APIGroup {
+ return false
+ }
+
+ return true
+}
diff --git a/pkg/apis/internalversion/kwok_manages_selector_node.go b/pkg/apis/internalversion/kwok_manages_selector_node.go
new file mode 100644
index 000000000..0a2c0fd79
--- /dev/null
+++ b/pkg/apis/internalversion/kwok_manages_selector_node.go
@@ -0,0 +1,91 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package internalversion
+
+import (
+ "fmt"
+
+ "k8s.io/apimachinery/pkg/labels"
+)
+
+// ManageNodeSelector is a struct that holds how to manage nodes.
+type ManageNodeSelector struct {
+ // ManageSingleNode is the option to manage a single node name
+ ManageSingleNode string
+
+ // Default option to manage (i.e., maintain heartbeat/liveness of) all Nodes or not.
+ ManageAllNodes bool
+
+ // Default annotations specified on Nodes to demand manage.
+ ManageNodesWithAnnotationSelector string
+
+ // Default labels specified on Nodes to demand manage.
+ ManageNodesWithLabelSelector string
+}
+
+// IsEmpty means that no node needs to be managed.
+func (n ManageNodeSelector) IsEmpty() bool {
+ return !n.ManageAllNodes &&
+ n.ManageSingleNode == "" &&
+ n.ManageNodesWithAnnotationSelector == "" &&
+ n.ManageNodesWithLabelSelector == ""
+}
+
+// NodeSelector returns the selector of nodes
+func (s ManagesSelectors) NodeSelector() (ManageNodeSelector, error) {
+ var n *ManageNodeSelector
+ for _, sel := range s {
+ // TODO: Node, Lease, Pod can be maintained separately by different controllers.
+ if sel.Kind == "Pod" &&
+ sel.Group == "" &&
+ (sel.Version == "" || sel.Version == "v1") {
+ return ManageNodeSelector{}, fmt.Errorf("unsupported pod selector type")
+ }
+ if sel.Kind == "Lease" &&
+ sel.Group == "coordination.k8s.io" &&
+ (sel.Version == "" || sel.Version == "v1") {
+ return ManageNodeSelector{}, fmt.Errorf("unsupported leases.coordination.k8s.io on kube-node-lease selector type")
+ }
+
+ if sel.Kind != "Node" || sel.Group != "" || !(sel.Version == "" || sel.Version == "v1") {
+ continue
+ }
+
+ // TODO: Support multiple nodes selector
+ if n != nil {
+ return ManageNodeSelector{}, fmt.Errorf("duplicate node selector: %v", sel)
+ }
+
+ n = &ManageNodeSelector{}
+
+ if sel.Name != "" {
+ n.ManageSingleNode = sel.Name
+ }
+ if len(sel.Labels) != 0 {
+ n.ManageNodesWithLabelSelector = labels.Set(sel.Labels).String()
+ }
+ if len(sel.Annotations) != 0 {
+ n.ManageNodesWithAnnotationSelector = labels.Set(sel.Annotations).String()
+ }
+ }
+
+ if n == nil {
+ return ManageNodeSelector{}, nil
+ }
+
+ return *n, nil
+}
diff --git a/pkg/apis/internalversion/kwok_manages_selector_test.go b/pkg/apis/internalversion/kwok_manages_selector_test.go
new file mode 100644
index 000000000..bb650219c
--- /dev/null
+++ b/pkg/apis/internalversion/kwok_manages_selector_test.go
@@ -0,0 +1,114 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package internalversion
+
+import (
+ "reflect"
+ "testing"
+)
+
+func Test_parseManagesSelector(t *testing.T) {
+ tests := []struct {
+ name string
+ args string
+ want *ManagesSelector
+ wantErr bool
+ }{
+ {
+ name: "empty",
+ args: "",
+ wantErr: true,
+ },
+ {
+ name: "pod",
+ args: "pod",
+ want: &ManagesSelector{
+ Kind: "pod",
+ },
+ },
+ {
+ name: "pod v1",
+ args: "pod/v1",
+ want: &ManagesSelector{
+ Kind: "pod",
+ Version: "v1",
+ },
+ },
+ {
+ name: "deploy.apps",
+ args: "deploy.apps",
+ want: &ManagesSelector{
+ Kind: "deploy",
+ Group: "apps",
+ },
+ },
+ {
+ name: "deploy.apps v1",
+ args: "deploy.apps/v1",
+ want: &ManagesSelector{
+ Kind: "deploy",
+ Group: "apps",
+ Version: "v1",
+ },
+ },
+ {
+ name: "pod name=po",
+ args: "pod:metadata.name=po",
+ want: &ManagesSelector{
+ Kind: "pod",
+ Name: "po",
+ },
+ },
+ {
+ name: "pod labels.apps.group=xxx",
+ args: "pod:metadata.labels.apps.group=xxx",
+ want: &ManagesSelector{
+ Kind: "pod",
+ Labels: map[string]string{
+ "apps.group": "xxx",
+ },
+ },
+ },
+ {
+ name: "pod annotations.apps.group=xxx",
+ args: "pod:metadata.annotations.apps.group=xxx",
+ want: &ManagesSelector{
+ Kind: "pod",
+ Annotations: map[string]string{
+ "apps.group": "xxx",
+ },
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, err := parseManagesSelector(tt.args)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("ParseTargetResourceRef() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("ParseTargetResourceRef() got = %v, want %v", got, tt.want)
+ }
+
+ rev := got.String()
+ if rev != tt.args {
+ t.Errorf("reverse got = %v, want %v", rev, tt.args)
+ }
+ })
+ }
+}
diff --git a/pkg/apis/internalversion/zz_generated.conversion.go b/pkg/apis/internalversion/zz_generated.conversion.go
index 65008ceaa..1da2d2d3b 100644
--- a/pkg/apis/internalversion/zz_generated.conversion.go
+++ b/pkg/apis/internalversion/zz_generated.conversion.go
@@ -410,6 +410,16 @@ func RegisterConversions(s *runtime.Scheme) error {
}); err != nil {
return err
}
+ if err := s.AddGeneratedConversionFunc((*ManagesSelector)(nil), (*configv1alpha1.ManagesSelector)(nil), func(a, b interface{}, scope conversion.Scope) error {
+ return Convert_internalversion_ManagesSelector_To_v1alpha1_ManagesSelector(a.(*ManagesSelector), b.(*configv1alpha1.ManagesSelector), scope)
+ }); err != nil {
+ return err
+ }
+ if err := s.AddGeneratedConversionFunc((*configv1alpha1.ManagesSelector)(nil), (*ManagesSelector)(nil), func(a, b interface{}, scope conversion.Scope) error {
+ return Convert_v1alpha1_ManagesSelector_To_internalversion_ManagesSelector(a.(*configv1alpha1.ManagesSelector), b.(*ManagesSelector), scope)
+ }); err != nil {
+ return err
+ }
if err := s.AddGeneratedConversionFunc((*Metric)(nil), (*v1alpha1.Metric)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_internalversion_Metric_To_v1alpha1_Metric(a.(*Metric), b.(*v1alpha1.Metric), scope)
}); err != nil {
@@ -1492,6 +1502,7 @@ func autoConvert_internalversion_KwokConfigurationOptions_To_v1alpha1_KwokConfig
out.NodePort = in.NodePort
out.TLSCertFile = in.TLSCertFile
out.TLSPrivateKeyFile = in.TLSPrivateKeyFile
+ out.Manages = *(*configv1alpha1.ManagesSelectors)(unsafe.Pointer(&in.Manages))
out.ManageSingleNode = in.ManageSingleNode
if err := v1.Convert_bool_To_Pointer_bool(&in.ManageAllNodes, &out.ManageAllNodes, s); err != nil {
return err
@@ -1533,6 +1544,7 @@ func autoConvert_v1alpha1_KwokConfigurationOptions_To_internalversion_KwokConfig
out.NodePort = in.NodePort
out.TLSCertFile = in.TLSCertFile
out.TLSPrivateKeyFile = in.TLSPrivateKeyFile
+ out.Manages = *(*ManagesSelectors)(unsafe.Pointer(&in.Manages))
out.ManageSingleNode = in.ManageSingleNode
if err := v1.Convert_Pointer_bool_To_bool(&in.ManageAllNodes, &out.ManageAllNodes, s); err != nil {
return err
@@ -2004,6 +2016,36 @@ func Convert_v1alpha1_LogsSpec_To_internalversion_LogsSpec(in *v1alpha1.LogsSpec
return autoConvert_v1alpha1_LogsSpec_To_internalversion_LogsSpec(in, out, s)
}
+func autoConvert_internalversion_ManagesSelector_To_v1alpha1_ManagesSelector(in *ManagesSelector, out *configv1alpha1.ManagesSelector, s conversion.Scope) error {
+ out.Kind = in.Kind
+ out.Group = in.Group
+ out.Version = in.Version
+ out.Name = in.Name
+ out.Labels = *(*map[string]string)(unsafe.Pointer(&in.Labels))
+ out.Annotations = *(*map[string]string)(unsafe.Pointer(&in.Annotations))
+ return nil
+}
+
+// Convert_internalversion_ManagesSelector_To_v1alpha1_ManagesSelector is an autogenerated conversion function.
+func Convert_internalversion_ManagesSelector_To_v1alpha1_ManagesSelector(in *ManagesSelector, out *configv1alpha1.ManagesSelector, s conversion.Scope) error {
+ return autoConvert_internalversion_ManagesSelector_To_v1alpha1_ManagesSelector(in, out, s)
+}
+
+func autoConvert_v1alpha1_ManagesSelector_To_internalversion_ManagesSelector(in *configv1alpha1.ManagesSelector, out *ManagesSelector, s conversion.Scope) error {
+ out.Kind = in.Kind
+ out.Group = in.Group
+ out.Version = in.Version
+ out.Name = in.Name
+ out.Labels = *(*map[string]string)(unsafe.Pointer(&in.Labels))
+ out.Annotations = *(*map[string]string)(unsafe.Pointer(&in.Annotations))
+ return nil
+}
+
+// Convert_v1alpha1_ManagesSelector_To_internalversion_ManagesSelector is an autogenerated conversion function.
+func Convert_v1alpha1_ManagesSelector_To_internalversion_ManagesSelector(in *configv1alpha1.ManagesSelector, out *ManagesSelector, s conversion.Scope) error {
+ return autoConvert_v1alpha1_ManagesSelector_To_internalversion_ManagesSelector(in, out, s)
+}
+
func autoConvert_internalversion_Metric_To_v1alpha1_Metric(in *Metric, out *v1alpha1.Metric, s conversion.Scope) error {
out.ObjectMeta = in.ObjectMeta
if err := Convert_internalversion_MetricSpec_To_v1alpha1_MetricSpec(&in.Spec, &out.Spec, s); err != nil {
diff --git a/pkg/apis/internalversion/zz_generated.deepcopy.go b/pkg/apis/internalversion/zz_generated.deepcopy.go
index 3bbf08b6a..fb56747cb 100644
--- a/pkg/apis/internalversion/zz_generated.deepcopy.go
+++ b/pkg/apis/internalversion/zz_generated.deepcopy.go
@@ -682,6 +682,13 @@ func (in *KwokConfigurationOptions) DeepCopyInto(out *KwokConfigurationOptions)
*out = make([]string, len(*in))
copy(*out, *in)
}
+ if in.Manages != nil {
+ in, out := &in.Manages, &out.Manages
+ *out = make(ManagesSelectors, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
return
}
@@ -859,6 +866,74 @@ func (in *LogsSpec) DeepCopy() *LogsSpec {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ManageNodeSelector) DeepCopyInto(out *ManageNodeSelector) {
+ *out = *in
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ManageNodeSelector.
+func (in *ManageNodeSelector) DeepCopy() *ManageNodeSelector {
+ if in == nil {
+ return nil
+ }
+ out := new(ManageNodeSelector)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ManagesSelector) DeepCopyInto(out *ManagesSelector) {
+ *out = *in
+ if in.Labels != nil {
+ in, out := &in.Labels, &out.Labels
+ *out = make(map[string]string, len(*in))
+ for key, val := range *in {
+ (*out)[key] = val
+ }
+ }
+ if in.Annotations != nil {
+ in, out := &in.Annotations, &out.Annotations
+ *out = make(map[string]string, len(*in))
+ for key, val := range *in {
+ (*out)[key] = val
+ }
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ManagesSelector.
+func (in *ManagesSelector) DeepCopy() *ManagesSelector {
+ if in == nil {
+ return nil
+ }
+ out := new(ManagesSelector)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in ManagesSelectors) DeepCopyInto(out *ManagesSelectors) {
+ {
+ in := &in
+ *out = make(ManagesSelectors, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ return
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ManagesSelectors.
+func (in ManagesSelectors) DeepCopy() ManagesSelectors {
+ if in == nil {
+ return nil
+ }
+ out := new(ManagesSelectors)
+ in.DeepCopyInto(out)
+ return *out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Metric) DeepCopyInto(out *Metric) {
*out = *in
diff --git a/pkg/kwok/cmd/root.go b/pkg/kwok/cmd/root.go
index c49ee08d6..8876dffb7 100644
--- a/pkg/kwok/cmd/root.go
+++ b/pkg/kwok/cmd/root.go
@@ -82,6 +82,7 @@ func NewCommand(ctx context.Context) *cobra.Command {
cmd.Flags().IntVar(&flags.Options.NodePort, "node-port", flags.Options.NodePort, "Port of the node")
cmd.Flags().StringVar(&flags.Options.TLSCertFile, "tls-cert-file", flags.Options.TLSCertFile, "File containing the default x509 Certificate for HTTPS")
cmd.Flags().StringVar(&flags.Options.TLSPrivateKeyFile, "tls-private-key-file", flags.Options.TLSPrivateKeyFile, "File containing the default x509 private key matching --tls-cert-file")
+ cmd.Flags().Var(&flags.Options.Manages, "manage", "Manages resources")
cmd.Flags().StringVar(&flags.Options.ManageSingleNode, "manage-single-node", flags.Options.ManageSingleNode, "Node that matches the name will be watched and managed. It's conflicted with manage-nodes-with-annotation-selector, manage-nodes-with-label-selector and manage-all-nodes.")
cmd.Flags().BoolVar(&flags.Options.ManageAllNodes, "manage-all-nodes", flags.Options.ManageAllNodes, "All nodes will be watched and managed. It's conflicted with manage-nodes-with-annotation-selector, manage-nodes-with-label-selector and manage-single-node.")
cmd.Flags().StringVar(&flags.Options.ManageNodesWithAnnotationSelector, "manage-nodes-with-annotation-selector", flags.Options.ManageNodesWithAnnotationSelector, "Nodes that match the annotation selector will be watched and managed. It's conflicted with manage-all-nodes and manage-single-node.")
@@ -215,18 +216,31 @@ func runE(ctx context.Context, flags *flagpole) error {
return err
}
- switch {
- case flags.Options.ManageSingleNode != "":
- logger.Info("Watch single node",
- "node", flags.Options.ManageSingleNode,
- )
- case flags.Options.ManageAllNodes:
- logger.Info("Watch all nodes")
- case flags.Options.ManageNodesWithAnnotationSelector != "" || flags.Options.ManageNodesWithLabelSelector != "":
- logger.Info("Watch nodes",
- "annotation", flags.Options.ManageNodesWithAnnotationSelector,
- "label", flags.Options.ManageNodesWithLabelSelector,
- )
+ manages := flags.Options.Manages
+ var nodeSel internalversion.ManageNodeSelector
+ if len(manages) != 0 {
+ nodeSel, err = manages.NodeSelector()
+ if err != nil {
+ return err
+ }
+ } else {
+ switch {
+ case flags.Options.ManageSingleNode != "":
+ logger.Info("Watch single node",
+ "node", flags.Options.ManageSingleNode,
+ )
+ nodeSel.ManageSingleNode = flags.Options.ManageSingleNode
+ case flags.Options.ManageAllNodes:
+ logger.Info("Watch all nodes")
+ nodeSel.ManageAllNodes = true
+ case flags.Options.ManageNodesWithAnnotationSelector != "" || flags.Options.ManageNodesWithLabelSelector != "":
+ logger.Info("Watch nodes",
+ "annotation", flags.Options.ManageNodesWithAnnotationSelector,
+ "label", flags.Options.ManageNodesWithLabelSelector,
+ )
+ nodeSel.ManageNodesWithLabelSelector = flags.Options.ManageNodesWithLabelSelector
+ nodeSel.ManageNodesWithAnnotationSelector = flags.Options.ManageNodesWithAnnotationSelector
+ }
}
id, err := controllers.Identity()
@@ -248,10 +262,12 @@ func runE(ctx context.Context, flags *flagpole) error {
EnableCNI: flags.Options.EnableCNI,
EnableMetrics: enableMetrics,
EnablePodCache: enableMetrics,
- ManageSingleNode: flags.Options.ManageSingleNode,
- ManageAllNodes: flags.Options.ManageAllNodes,
- ManageNodesWithAnnotationSelector: flags.Options.ManageNodesWithAnnotationSelector,
- ManageNodesWithLabelSelector: flags.Options.ManageNodesWithLabelSelector,
+ Manages: manages,
+ NoManageNode: nodeSel.IsEmpty(),
+ ManageSingleNode: nodeSel.ManageSingleNode,
+ ManageAllNodes: nodeSel.ManageAllNodes,
+ ManageNodesWithAnnotationSelector: nodeSel.ManageNodesWithAnnotationSelector,
+ ManageNodesWithLabelSelector: nodeSel.ManageNodesWithLabelSelector,
DisregardStatusWithAnnotationSelector: flags.Options.DisregardStatusWithAnnotationSelector,
DisregardStatusWithLabelSelector: flags.Options.DisregardStatusWithLabelSelector,
CIDR: flags.Options.CIDR,
@@ -274,7 +290,7 @@ func runE(ctx context.Context, flags *flagpole) error {
return err
}
- err = startServer(ctx, flags, ctr, typedKwokClient)
+ err = startServer(ctx, flags, ctr, typedKwokClient, nodeSel)
if err != nil {
return err
}
@@ -283,7 +299,7 @@ func runE(ctx context.Context, flags *flagpole) error {
return nil
}
-func startServer(ctx context.Context, flags *flagpole, ctr *controllers.Controller, typedKwokClient versioned.Interface) (err error) {
+func startServer(ctx context.Context, flags *flagpole, ctr *controllers.Controller, typedKwokClient versioned.Interface, nodeSelector internalversion.ManageNodeSelector) (err error) {
logger := log.FromContext(ctx)
serverAddress := flags.Options.ServerAddress
@@ -292,99 +308,95 @@ func startServer(ctx context.Context, flags *flagpole, ctr *controllers.Controll
}
if serverAddress != "" {
- clusterPortForwards := config.FilterWithTypeFromContext[*internalversion.ClusterPortForward](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterPortForwardKind, clusterPortForwards)
- if err != nil {
- return err
+ mangeNode := !nodeSelector.IsEmpty()
+ conf := server.Config{
+ TypedKwokClient: typedKwokClient,
+ NoManageNode: nodeSelector.IsEmpty(),
+ EnableCRDs: flags.Options.EnableCRDs,
+ DataSource: ctr,
+ NodeCacheGetter: ctr.GetNodeCache(),
+ PodCacheGetter: ctr.GetPodCache(),
}
- portForwards := config.FilterWithTypeFromContext[*internalversion.PortForward](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.PortForwardKind, portForwards)
- if err != nil {
- return err
- }
+ if mangeNode {
+ conf.ClusterPortForwards = config.FilterWithTypeFromContext[*internalversion.ClusterPortForward](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterPortForwardKind, conf.ClusterPortForwards)
+ if err != nil {
+ return err
+ }
- clusterExecs := config.FilterWithTypeFromContext[*internalversion.ClusterExec](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterExecKind, clusterExecs)
- if err != nil {
- return err
- }
+ conf.PortForwards = config.FilterWithTypeFromContext[*internalversion.PortForward](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.PortForwardKind, conf.PortForwards)
+ if err != nil {
+ return err
+ }
- execs := config.FilterWithTypeFromContext[*internalversion.Exec](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ExecKind, execs)
- if err != nil {
- return err
- }
+ conf.ClusterExecs = config.FilterWithTypeFromContext[*internalversion.ClusterExec](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterExecKind, conf.ClusterExecs)
+ if err != nil {
+ return err
+ }
- clusterLogs := config.FilterWithTypeFromContext[*internalversion.ClusterLogs](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterLogsKind, clusterLogs)
- if err != nil {
- return err
- }
+ conf.Execs = config.FilterWithTypeFromContext[*internalversion.Exec](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ExecKind, conf.Execs)
+ if err != nil {
+ return err
+ }
- logs := config.FilterWithTypeFromContext[*internalversion.Logs](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.LogsKind, logs)
- if err != nil {
- return err
- }
+ conf.ClusterLogs = config.FilterWithTypeFromContext[*internalversion.ClusterLogs](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterLogsKind, conf.ClusterLogs)
+ if err != nil {
+ return err
+ }
- clusterAttaches := config.FilterWithTypeFromContext[*internalversion.ClusterAttach](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterAttachKind, clusterAttaches)
- if err != nil {
- return err
- }
+ conf.Logs = config.FilterWithTypeFromContext[*internalversion.Logs](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.LogsKind, conf.Logs)
+ if err != nil {
+ return err
+ }
- attaches := config.FilterWithTypeFromContext[*internalversion.Attach](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.AttachKind, attaches)
- if err != nil {
- return err
- }
+ conf.ClusterAttaches = config.FilterWithTypeFromContext[*internalversion.ClusterAttach](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterAttachKind, conf.ClusterAttaches)
+ if err != nil {
+ return err
+ }
- clusterResourceUsages := config.FilterWithTypeFromContext[*internalversion.ClusterResourceUsage](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterResourceUsageKind, clusterResourceUsages)
- if err != nil {
- return err
- }
+ conf.Attaches = config.FilterWithTypeFromContext[*internalversion.Attach](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.AttachKind, conf.Attaches)
+ if err != nil {
+ return err
+ }
- resourceUsages := config.FilterWithTypeFromContext[*internalversion.ResourceUsage](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ResourceUsageKind, resourceUsages)
- if err != nil {
- return err
+ conf.ClusterResourceUsages = config.FilterWithTypeFromContext[*internalversion.ClusterResourceUsage](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterResourceUsageKind, conf.ClusterResourceUsages)
+ if err != nil {
+ return err
+ }
+
+ conf.ResourceUsages = config.FilterWithTypeFromContext[*internalversion.ResourceUsage](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ResourceUsageKind, conf.ResourceUsages)
+ if err != nil {
+ return err
+ }
}
- metrics := config.FilterWithTypeFromContext[*internalversion.Metric](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.MetricKind, metrics)
+ conf.Metrics = config.FilterWithTypeFromContext[*internalversion.Metric](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.MetricKind, conf.Metrics)
if err != nil {
return err
}
- conf := server.Config{
- TypedKwokClient: typedKwokClient,
- EnableCRDs: flags.Options.EnableCRDs,
- ClusterPortForwards: clusterPortForwards,
- PortForwards: portForwards,
- ClusterExecs: clusterExecs,
- Execs: execs,
- ClusterLogs: clusterLogs,
- Logs: logs,
- ClusterAttaches: clusterAttaches,
- Attaches: attaches,
- ClusterResourceUsages: clusterResourceUsages,
- ResourceUsages: resourceUsages,
- Metrics: metrics,
- DataSource: ctr,
- NodeCacheGetter: ctr.GetNodeCache(),
- PodCacheGetter: ctr.GetPodCache(),
- }
svc, err := server.NewServer(conf)
if err != nil {
return fmt.Errorf("failed to create server: %w", err)
}
svc.InstallHealthz()
- svc.InstallServiceDiscovery()
+ if mangeNode {
+ svc.InstallServiceDiscovery()
+ }
- if flags.Options.EnableDebuggingHandlers {
+ if mangeNode && flags.Options.EnableDebuggingHandlers {
svc.InstallDebuggingHandlers()
svc.InstallProfilingHandler(flags.Options.EnableProfilingHandler, flags.Options.EnableContentionProfiling)
} else {
diff --git a/pkg/kwok/controllers/controller.go b/pkg/kwok/controllers/controller.go
index 1b39c798c..916e4c9a8 100644
--- a/pkg/kwok/controllers/controller.go
+++ b/pkg/kwok/controllers/controller.go
@@ -76,6 +76,7 @@ type Controller struct {
onNodeUnmanagedFunc func(nodeName string)
readOnlyFunc func(nodeName string) bool
+ noManageNode bool
manageNodesWithLabelSelector string
manageNodesWithAnnotationSelector string
manageNodesWithFieldSelector string
@@ -107,6 +108,8 @@ type Config struct {
RESTMapper meta.RESTMapper
TypedClient kubernetes.Interface
TypedKwokClient versioned.Interface
+ Manages internalversion.ManagesSelectors
+ NoManageNode bool
ManageSingleNode string
ManageAllNodes bool
ManageNodesWithAnnotationSelector string
@@ -167,61 +170,65 @@ func (c *Controller) init(ctx context.Context) (err error) {
return fmt.Errorf("controller already started")
}
- switch {
- case c.conf.ManageSingleNode != "":
- c.managePodsWithFieldSelector = fields.OneTermEqualSelector("spec.nodeName", c.conf.ManageSingleNode).String()
- c.manageNodesWithFieldSelector = fields.OneTermEqualSelector("metadata.name", c.conf.ManageSingleNode).String()
- c.manageNodeLeasesWithFieldSelector = fields.OneTermEqualSelector("metadata.name", c.conf.ManageSingleNode).String()
- case c.conf.ManageAllNodes:
- c.managePodsWithFieldSelector = fields.OneTermNotEqualSelector("spec.nodeName", "").String()
- case c.conf.ManageNodesWithLabelSelector != "" || c.conf.ManageNodesWithAnnotationSelector != "":
- c.manageNodesWithLabelSelector = c.conf.ManageNodesWithLabelSelector
- c.manageNodesWithAnnotationSelector = c.conf.ManageNodesWithAnnotationSelector
- c.managePodsWithFieldSelector = fields.OneTermNotEqualSelector("spec.nodeName", "").String()
- }
-
c.broadcaster = record.NewBroadcaster()
c.recorder = c.broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "kwok_controller"})
c.broadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: c.conf.TypedClient.CoreV1().Events("")})
+ c.patchMeta = patch.NewPatchMetaFromOpenAPI3(c.conf.RESTClient)
- c.nodesChan = make(chan informer.Event[*corev1.Node], 1)
- c.podsChan = make(chan informer.Event[*corev1.Pod], 1)
+ if c.conf.NoManageNode {
+ c.noManageNode = true
+ } else {
+ c.nodesChan = make(chan informer.Event[*corev1.Node], 1)
+ c.podsChan = make(chan informer.Event[*corev1.Pod], 1)
+
+ switch {
+ case c.conf.ManageSingleNode != "":
+ c.managePodsWithFieldSelector = fields.OneTermEqualSelector("spec.nodeName", c.conf.ManageSingleNode).String()
+ c.manageNodesWithFieldSelector = fields.OneTermEqualSelector("metadata.name", c.conf.ManageSingleNode).String()
+ c.manageNodeLeasesWithFieldSelector = fields.OneTermEqualSelector("metadata.name", c.conf.ManageSingleNode).String()
+ case c.conf.ManageAllNodes:
+ c.managePodsWithFieldSelector = fields.OneTermNotEqualSelector("spec.nodeName", "").String()
+ case c.conf.ManageNodesWithLabelSelector != "" || c.conf.ManageNodesWithAnnotationSelector != "":
+ c.manageNodesWithLabelSelector = c.conf.ManageNodesWithLabelSelector
+ c.manageNodesWithAnnotationSelector = c.conf.ManageNodesWithAnnotationSelector
+ c.managePodsWithFieldSelector = fields.OneTermNotEqualSelector("spec.nodeName", "").String()
+ }
- nodesCli := c.conf.TypedClient.CoreV1().Nodes()
- c.nodesInformer = informer.NewInformer[*corev1.Node, *corev1.NodeList](nodesCli)
- c.nodeCacheGetter, err = c.nodesInformer.WatchWithCache(ctx, informer.Option{
- LabelSelector: c.manageNodesWithLabelSelector,
- AnnotationSelector: c.manageNodesWithAnnotationSelector,
- FieldSelector: c.manageNodesWithFieldSelector,
- }, c.nodesChan)
- if err != nil {
- return fmt.Errorf("failed to watch nodes: %w", err)
- }
+ nodesCli := c.conf.TypedClient.CoreV1().Nodes()
+ c.nodesInformer = informer.NewInformer[*corev1.Node, *corev1.NodeList](nodesCli)
+ c.nodeCacheGetter, err = c.nodesInformer.WatchWithCache(ctx, informer.Option{
+ LabelSelector: c.manageNodesWithLabelSelector,
+ AnnotationSelector: c.manageNodesWithAnnotationSelector,
+ FieldSelector: c.manageNodesWithFieldSelector,
+ }, c.nodesChan)
+ if err != nil {
+ return fmt.Errorf("failed to watch nodes: %w", err)
+ }
- podsCli := c.conf.TypedClient.CoreV1().Pods(corev1.NamespaceAll)
- c.podsInformer = informer.NewInformer[*corev1.Pod, *corev1.PodList](podsCli)
+ podsCli := c.conf.TypedClient.CoreV1().Pods(corev1.NamespaceAll)
+ c.podsInformer = informer.NewInformer[*corev1.Pod, *corev1.PodList](podsCli)
- podWatchOption := informer.Option{
- FieldSelector: c.managePodsWithFieldSelector,
- }
- if c.conf.EnablePodCache {
- c.podCacheGetter, err = c.podsInformer.WatchWithLazyCache(ctx, podWatchOption, c.podsChan)
- } else {
- err = c.podsInformer.Watch(ctx, podWatchOption, c.podsChan)
- }
- if err != nil {
- return fmt.Errorf("failed to watch pods: %w", err)
- }
+ podWatchOption := informer.Option{
+ FieldSelector: c.managePodsWithFieldSelector,
+ }
+ if c.conf.EnablePodCache {
+ c.podCacheGetter, err = c.podsInformer.WatchWithLazyCache(ctx, podWatchOption, c.podsChan)
+ } else {
+ err = c.podsInformer.Watch(ctx, podWatchOption, c.podsChan)
+ }
+ if err != nil {
+ return fmt.Errorf("failed to watch pods: %w", err)
+ }
- if c.conf.NodeLeaseDurationSeconds != 0 {
- nodeLeasesCli := c.conf.TypedClient.CoordinationV1().Leases(corev1.NamespaceNodeLease)
- c.nodeLeasesInformer = informer.NewInformer[*coordinationv1.Lease, *coordinationv1.LeaseList](nodeLeasesCli)
- }
+ if c.conf.NodeLeaseDurationSeconds != 0 {
+ nodeLeasesCli := c.conf.TypedClient.CoordinationV1().Leases(corev1.NamespaceNodeLease)
+ c.nodeLeasesInformer = informer.NewInformer[*coordinationv1.Lease, *coordinationv1.LeaseList](nodeLeasesCli)
+ }
- c.patchMeta = patch.NewPatchMetaFromOpenAPI3(c.conf.RESTClient)
+ c.podOnNodeManageQueue = queue.NewQueue[string]()
+ c.nodeManageQueue = queue.NewQueue[string]()
+ }
- c.podOnNodeManageQueue = queue.NewQueue[string]()
- c.nodeManageQueue = queue.NewQueue[string]()
return nil
}
@@ -363,6 +370,7 @@ func (c *Controller) startStageController(ctx context.Context, ref internalversi
func (c *Controller) initStagesManager(ctx context.Context) error {
logger := log.FromContext(ctx)
+ manages := c.conf.Manages
c.stageGetter = resources.NewDynamicGetter[
[]*internalversion.Stage,
*v1alpha1.Stage,
@@ -376,6 +384,10 @@ func (c *Controller) initStagesManager(ctx context.Context) error {
logger.Error("failed to convert to internal stage", err, "obj", obj)
return nil, false
}
+
+ if manages != nil && !manages.MatchStage(r) {
+ return nil, false
+ }
return r, true
})
},
@@ -499,6 +511,8 @@ func (c *Controller) initStageController(ctx context.Context, ref internalversio
logger.Info("watching stages", "gvr", gvr)
stageInformer := informer.NewInformer[*unstructured.Unstructured, *unstructured.UnstructuredList](c.conf.DynamicClient.Resource(gvr))
stageChan := make(chan informer.Event[*unstructured.Unstructured], 1)
+
+ // TODO: Support filters with manages
err = stageInformer.Watch(ctx, informer.Option{}, stageChan)
if err != nil {
return fmt.Errorf("failed to watch stages: %w", err)
@@ -537,7 +551,12 @@ func (c *Controller) Start(ctx context.Context) error {
}
if len(c.conf.LocalStages) != 0 {
+ manages := c.conf.Manages
+
for ref, stage := range c.conf.LocalStages {
+ if manages != nil {
+ stage = slices.Filter(stage, manages.MatchStage)
+ }
lifecycle, err := lifecycle.NewLifecycle(stage)
if err != nil {
return err
diff --git a/pkg/kwok/server/server.go b/pkg/kwok/server/server.go
index 30608a85d..f9816abe3 100644
--- a/pkg/kwok/server/server.go
+++ b/pkg/kwok/server/server.go
@@ -55,6 +55,8 @@ type Server struct {
enableCRDs []string
+ noManageNode bool
+
restfulCont *restful.Container
idleTimeout time.Duration
@@ -97,6 +99,8 @@ type Config struct {
TypedKwokClient versioned.Interface
EnableCRDs []string
+ NoManageNode bool
+
ClusterPortForwards []*internalversion.ClusterPortForward
PortForwards []*internalversion.PortForward
ClusterExecs []*internalversion.ClusterExec
@@ -125,6 +129,8 @@ func NewServer(conf Config) (*Server, error) {
idleTimeout: 1 * time.Hour,
streamCreationTimeout: remotecommandconsts.DefaultStreamCreationTimeout,
+ noManageNode: conf.NoManageNode,
+
clusterPortForwards: resources.NewStaticGetter(conf.ClusterPortForwards),
portForwards: resources.NewStaticGetter(conf.PortForwards),
clusterExecs: resources.NewStaticGetter(conf.ClusterExecs),
@@ -161,6 +167,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
for _, crd := range s.enableCRDs {
switch crd {
case v1alpha1.ClusterPortForwardKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.clusterPortForwards.Get()) != 0 {
return nil, fmt.Errorf("cluster port forwards already exists, cannot watch CRD")
}
@@ -184,6 +193,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, clusterPortForwards)
s.clusterPortForwards = clusterPortForwards
case v1alpha1.PortForwardKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.portForwards.Get()) != 0 {
return nil, fmt.Errorf("port forwards already exists, cannot watch CRD")
}
@@ -207,6 +219,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, portForwards)
s.portForwards = portForwards
case v1alpha1.ClusterExecKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.clusterExecs.Get()) != 0 {
return nil, fmt.Errorf("cluster execs already exists, cannot watch CRD")
}
@@ -230,6 +245,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, clusterExecs)
s.clusterExecs = clusterExecs
case v1alpha1.ExecKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.execs.Get()) != 0 {
return nil, fmt.Errorf("execs already exists, cannot watch CRD")
}
@@ -253,6 +271,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, execs)
s.execs = execs
case v1alpha1.ClusterLogsKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.clusterLogs.Get()) != 0 {
return nil, fmt.Errorf("cluster logs already exists, cannot watch CRD")
}
@@ -276,6 +297,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, clusterLogs)
s.clusterLogs = clusterLogs
case v1alpha1.LogsKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.logs.Get()) != 0 {
return nil, fmt.Errorf("logs already exists, cannot watch CRD")
}
@@ -299,6 +323,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, logs)
s.logs = logs
case v1alpha1.ClusterAttachKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.clusterAttaches.Get()) != 0 {
return nil, fmt.Errorf("cluster attaches already exists, cannot watch CRD")
}
@@ -322,6 +349,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, clusterAttaches)
s.clusterAttaches = clusterAttaches
case v1alpha1.AttachKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.attaches.Get()) != 0 {
return nil, fmt.Errorf("attaches already exists, cannot watch CRD")
}
@@ -345,6 +375,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, attaches)
s.attaches = attaches
case v1alpha1.ClusterResourceUsageKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.clusterResourceUsages.Get()) != 0 {
return nil, fmt.Errorf("cluster resource usage already exists, cannot watch CRD")
}
@@ -368,6 +401,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, clusterResourceUsages)
s.clusterResourceUsages = clusterResourceUsages
case v1alpha1.ResourceUsageKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.resourceUsages.Get()) != 0 {
return nil, fmt.Errorf("resource usage already exists, cannot watch CRD")
}
diff --git a/site/content/en/docs/generated/apis.md b/site/content/en/docs/generated/apis.md
index 526dc5a77..c14037390 100644
--- a/site/content/en/docs/generated/apis.md
+++ b/site/content/en/docs/generated/apis.md
@@ -141,6 +141,9 @@ Resource Types:
KwokctlResource
+
+
+ManagesSelector
KwokConfiguration
@@ -382,6 +385,112 @@ string
+
+ManagesSelector
+ #
+
+
+
ManagesSelector holds information about the manages selector.
+
+
+
+
+Field |
+Description |
+
+
+
+
+
+apiVersion
+string
+ |
+
+
+config.kwok.x-k8s.io/v1alpha1
+
+ |
+
+
+
+kind
+string
+ |
+ManagesSelector |
+
+
+
+kind
+
+string
+
+ |
+
+ Kind of the referent.
+ |
+
+
+
+group
+
+string
+
+ |
+
+ Group of the referent.
+ |
+
+
+
+version
+
+string
+
+ |
+
+ Version of the referent.
+ |
+
+
+
+name
+
+string
+
+ |
+
+ Name of the referent
+Only available with Node Kind.
+ |
+
+
+
+labels
+
+map[string]string
+
+ |
+
+ Labels of the referent.
+specify matched with labels.
+Only available with Node Kind.
+ |
+
+
+
+annotations
+
+map[string]string
+
+ |
+
+ Annotations of the referent.
+specify matched with annotations.
+Only available with Node Kind.
+ |
+
+
+
kwok.x-k8s.io/v1alpha1
#
@@ -2447,6 +2556,19 @@ is the default value for flag –tls-private-key-file
+manages
+
+
+ManagesSelectors
+
+
+ |
+
+ Manages is the option to manage an resources
+ |
+
+
+
manageSingleNode
string
@@ -3736,6 +3858,18 @@ string
|
+
+ManagesSelectors
+([]sigs.k8s.io/kwok/pkg/apis/config/v1alpha1.ManagesSelector
alias)
+ #
+
+
+Appears on:
+KwokConfigurationOptions
+
+
+
ManagesSelectors holds information about the manages selectors.
+
Port
#
diff --git a/site/content/en/docs/generated/kwok.md b/site/content/en/docs/generated/kwok.md
index 19c736fc1..2b9f986e3 100644
--- a/site/content/en/docs/generated/kwok.md
+++ b/site/content/en/docs/generated/kwok.md
@@ -14,6 +14,7 @@ kwok [flags]
--enable-crds strings List of CRDs to enable
-h, --help help for kwok
--kubeconfig string Path to the kubeconfig file to use (default "~/.kube/config")
+ --manage ManagesSelectorSlice Manages resources
--manage-all-nodes All nodes will be watched and managed. It's conflicted with manage-nodes-with-annotation-selector, manage-nodes-with-label-selector and manage-single-node.
--manage-nodes-with-annotation-selector string Nodes that match the annotation selector will be watched and managed. It's conflicted with manage-all-nodes and manage-single-node.
--manage-nodes-with-label-selector string Nodes that match the label selector will be watched and managed. It's conflicted with manage-all-nodes and manage-single-node.