From e9e937e612da9abe2902db1439f75fb2a0ebc4c8 Mon Sep 17 00:00:00 2001 From: Jakub Dyszkiewicz Date: Mon, 22 Jan 2024 12:44:10 +0100 Subject: [PATCH] fix(vips): skip ignored listeners (#8937) Signed-off-by: Jakub Dyszkiewicz --- pkg/dns/vips_allocator.go | 3 + pkg/dns/vips_allocator_test.go | 24 +++ .../runtime/k8s/controllers/endpoints.go | 3 + .../resources/samples/dataplane_samples.go | 6 + .../kubernetes/graceful/change_service.go | 190 ++++++++++++++++++ 5 files changed, 226 insertions(+) create mode 100644 test/e2e_env/kubernetes/graceful/change_service.go diff --git a/pkg/dns/vips_allocator.go b/pkg/dns/vips_allocator.go index fc7a93eace49..a70f30557df3 100644 --- a/pkg/dns/vips_allocator.go +++ b/pkg/dns/vips_allocator.go @@ -195,6 +195,9 @@ func (d *VIPsAllocator) BuildVirtualOutboundMeshView(ctx context.Context, mesh s var errs error for _, dp := range dataplanes.Items { for _, inbound := range dp.Spec.GetNetworking().GetInbound() { + if inbound.State == mesh_proto.Dataplane_Networking_Inbound_Ignored { + continue + } if d.serviceVipEnabled { errs = multierr.Append(errs, addDefault(outboundSet, inbound.GetService(), 0)) } diff --git a/pkg/dns/vips_allocator_test.go b/pkg/dns/vips_allocator_test.go index c0b7f4ecb06e..4c5d022fb3dd 100644 --- a/pkg/dns/vips_allocator_test.go +++ b/pkg/dns/vips_allocator_test.go @@ -18,6 +18,7 @@ import ( "github.com/kumahq/kuma/pkg/dns" "github.com/kumahq/kuma/pkg/dns/vips" "github.com/kumahq/kuma/pkg/plugins/resources/memory" + "github.com/kumahq/kuma/pkg/test/resources/samples" ) func dpWithTags(tags ...map[string]string) *mesh_proto.Dataplane { @@ -752,6 +753,29 @@ var _ = DescribeTable("outboundView", }, }, }), + Entry("skip ignored listener", outboundViewTestCase{ + givenResources: map[model.ResourceKey]model.Resource{ + model.WithMesh("mesh", "dp-1"): samples.IgnoredDataplaneBackendBuilder().WithMesh("mesh").Build(), + model.WithMesh("mesh", "vob-1"): &mesh.VirtualOutboundResource{ + Spec: &mesh_proto.VirtualOutbound{ + Selectors: []*mesh_proto.Selector{ + {Match: map[string]string{mesh_proto.ServiceTag: "*"}}, + }, + Conf: &mesh_proto.VirtualOutbound_Conf{ + Host: "{{.srv}}.mesh", + Port: "8080", + Parameters: []*mesh_proto.VirtualOutbound_Conf_TemplateParameter{ + {Name: "srv", TagKey: mesh_proto.ServiceTag}, + {Name: "port"}, + }, + }, + }, + }, + }, + whenMesh: "mesh", + thenHostnameEntries: []vips.HostnameEntry{}, + thenOutbounds: map[vips.HostnameEntry][]vips.OutboundEntry{}, + }), ) var _ = Describe("AllocateVIPs", func() { diff --git a/pkg/plugins/runtime/k8s/controllers/endpoints.go b/pkg/plugins/runtime/k8s/controllers/endpoints.go index e4e800dad0aa..181491d5db1e 100644 --- a/pkg/plugins/runtime/k8s/controllers/endpoints.go +++ b/pkg/plugins/runtime/k8s/controllers/endpoints.go @@ -28,6 +28,9 @@ func endpointsByService(dataplanes []*core_mesh.DataplaneResource) EndpointsBySe result := EndpointsByService{} for _, other := range dataplanes { for _, inbound := range other.Spec.Networking.GetInbound() { + if inbound.State == mesh_proto.Dataplane_Networking_Inbound_Ignored { + continue + } svc, ok := inbound.GetTags()[mesh_proto.ServiceTag] if !ok { continue diff --git a/pkg/test/resources/samples/dataplane_samples.go b/pkg/test/resources/samples/dataplane_samples.go index 2f160bd1cecc..94d2de803f01 100644 --- a/pkg/test/resources/samples/dataplane_samples.go +++ b/pkg/test/resources/samples/dataplane_samples.go @@ -35,3 +35,9 @@ func GatewayDataplane() *mesh.DataplaneResource { WithBuiltInGateway("sample-gateway"). Build() } + +func IgnoredDataplaneBackendBuilder() *builders.DataplaneBuilder { + return DataplaneBackendBuilder().With(func(resource *mesh.DataplaneResource) { + resource.Spec.Networking.Inbound[0].State = mesh_proto.Dataplane_Networking_Inbound_Ignored + }) +} diff --git a/test/e2e_env/kubernetes/graceful/change_service.go b/test/e2e_env/kubernetes/graceful/change_service.go new file mode 100644 index 000000000000..d27cdc2fded9 --- /dev/null +++ b/test/e2e_env/kubernetes/graceful/change_service.go @@ -0,0 +1,190 @@ +package graceful + +import ( + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + kube_meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/kumahq/kuma/pkg/plugins/policies/meshretry/api/v1alpha1" + "github.com/kumahq/kuma/pkg/util/channels" + "github.com/kumahq/kuma/pkg/util/pointer" + . "github.com/kumahq/kuma/test/framework" + "github.com/kumahq/kuma/test/framework/client" + "github.com/kumahq/kuma/test/framework/deployments/testserver" + "github.com/kumahq/kuma/test/framework/envs/kubernetes" +) + +func ChangeService() { + const namespace = "changesvc" + const mesh = "changesvc" + + firstTestServerLabels := map[string]string{ + "app": "test-server", + "changesvc-test-label": "first", + } + + secondTestServerLabels := map[string]string{ + "app": "test-server", + "changesvc-test-label": "second", + } + + thirdTestServerLabels := map[string]string{ + "kuma.io/sidecar-injection": "disabled", + "app": "test-server", + "changesvc-test-label": "third", + } + + newSvc := func(selector map[string]string) *corev1.Service { + return &corev1.Service{ + TypeMeta: kube_meta.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: kube_meta.ObjectMeta{ + Name: "test-server", + Namespace: namespace, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "main", + Port: int32(80), + TargetPort: intstr.FromString("main"), + AppProtocol: pointer.To("htt"), + }, + }, + Selector: selector, + }, + } + } + + BeforeAll(func() { + err := NewClusterSetup(). + Install(MTLSMeshKubernetes(mesh)). + Install(MeshTrafficPermissionAllowAllKubernetes(mesh)). + Install(NamespaceWithSidecarInjection(namespace)). + Install(testserver.Install( + testserver.WithNamespace(namespace), + testserver.WithMesh(mesh), + testserver.WithName("demo-client"), + )). + Install(testserver.Install( + testserver.WithNamespace(namespace), + testserver.WithMesh(mesh), + testserver.WithName("test-server-first"), + testserver.WithEchoArgs("echo", "--instance", "test-server-first"), + testserver.WithoutService(), + testserver.WithoutWaitingToBeReady(), // WaitForPods assumes that app label is name, but we change this in WithPodLabels + testserver.WithPodLabels(firstTestServerLabels), + )). + Install(testserver.Install( + testserver.WithNamespace(namespace), + testserver.WithMesh(mesh), + testserver.WithName("test-server-second"), + testserver.WithEchoArgs("echo", "--instance", "test-server-second"), + testserver.WithoutService(), + testserver.WithoutWaitingToBeReady(), // WaitForPods assumes that app label is name, but we change this in WithPodLabels + testserver.WithPodLabels(secondTestServerLabels), + )). + Install(testserver.Install( + testserver.WithNamespace(namespace), + testserver.WithName("test-server-third"), + testserver.WithEchoArgs("echo", "--instance", "test-server-third"), + testserver.WithoutService(), + testserver.WithoutWaitingToBeReady(), // WaitForPods assumes that app label is name, but we change this in WithPodLabels + testserver.WithPodLabels(thirdTestServerLabels), + )). + Install(YamlK8sObject(newSvc(firstTestServerLabels))). + Setup(kubernetes.Cluster) + Expect(err).To(Succeed()) + + // remove retries to avoid covering failed request + Expect(DeleteMeshPolicyOrError( + kubernetes.Cluster, + v1alpha1.MeshRetryResourceTypeDescriptor, + fmt.Sprintf("mesh-retry-all-%s", mesh), + )).To(Succeed()) + }) + + E2EAfterAll(func() { + Expect(kubernetes.Cluster.TriggerDeleteNamespace(namespace)).To(Succeed()) + Expect(kubernetes.Cluster.DeleteMesh(mesh)).To(Succeed()) + }) + + doRequest := func() (string, error) { + resp, err := client.CollectEchoResponse( + kubernetes.Cluster, + "demo-client", + "test-server:80", + client.FromKubernetesPod(namespace, "demo-client"), + ) + return resp.Instance, err + } + + It("should gracefully switch to other service", func() { + // given traffic to the first server + Eventually(func(g Gomega) { + instance, err := doRequest() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(instance).To(Equal("test-server-first")) + }, "30s", "1s").Should(Succeed()) + + // and constant traffic in the background + var failedErr error + closeCh := make(chan struct{}) + defer close(closeCh) + go func() { + for { + if channels.IsClosed(closeCh) { + return + } + if _, err := doRequest(); err != nil { + failedErr = err + return + } + // add a slight delay to not overwhelm completely the host running this test and leave more resources to other tests running in parallel. + time.Sleep(50 * time.Millisecond) + } + }() + + // when + err := kubernetes.Cluster.Install(YamlK8sObject(newSvc(secondTestServerLabels))) + + // then traffic shifted + Expect(err).To(Succeed()) + Eventually(func(g Gomega) { + instance, err := doRequest() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(instance).To(Equal("test-server-second")) + }, "30s", "1s").Should(Succeed()) + + // and we did not drop a single request + Expect(failedErr).ToNot(HaveOccurred()) + }) + + It("should switch to the instance of a service that in not in the mesh", func() { + // given + Expect(kubernetes.Cluster.Install(YamlK8sObject(newSvc(firstTestServerLabels)))).To(Succeed()) + Eventually(func(g Gomega) { + instance, err := doRequest() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(instance).To(Equal("test-server-first")) + }, "30s", "1s").Should(Succeed()) + + // when + err := kubernetes.Cluster.Install(YamlK8sObject(newSvc(thirdTestServerLabels))) + + // then + Expect(err).To(Succeed()) + Eventually(func(g Gomega) { + instance, err := doRequest() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(instance).To(Equal("test-server-third")) + }, "30s", "1s").Should(Succeed()) + }) +}