Skip to content

Commit

Permalink
Merge pull request #114 from nats-io/rework-cleanup
Browse files Browse the repository at this point in the history
Rework cleanup
  • Loading branch information
wallyqs authored Feb 22, 2023
2 parents 4bd2b10 + c58ebdc commit 6c54a4e
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 29 deletions.
5 changes: 5 additions & 0 deletions cmd/jetstream-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func run() error {
server := flag.String("s", "", "NATS Server URL")
crdConnect := flag.Bool("crd-connect", false, "If true, then NATS connections will be made from CRD config, not global config")
cleanupPeriod := flag.Duration("cleanup-period", 30*time.Second, "Period to run object cleanup")
readOnly := flag.Bool("read-only", false, "Starts the controller without causing changes to the NATS resources")
flag.Parse()

if *version {
Expand Down Expand Up @@ -115,9 +116,13 @@ func run() error {
Namespace: *namespace,
CRDConnect: *crdConnect,
CleanupPeriod: *cleanupPeriod,
ReadOnly: *readOnly,
})

klog.Infof("Starting %s v%s...", os.Args[0], Version)
if *readOnly {
klog.Infof("Running in read-only mode: JetStream state in server will not be changed")
}
go handleSignals(cancel)
return ctrl.Run()
}
Expand Down
10 changes: 7 additions & 3 deletions controllers/jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors"
k8smeta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
klog "k8s.io/klog/v2"
)

func (c *Controller) runConsumerQueue() {
Expand Down Expand Up @@ -57,8 +58,10 @@ func (c *Controller) processConsumerObject(cns *apis.Consumer, jsmc jsmClient) (
accServers []string
)
if spec.Account != "" && c.opts.CRDConnect {
// Lookup the account.
acc, err := c.accLister.Accounts(ns).Get(spec.Account)
// Lookup the account using the REST client.
ctx, done := context.WithTimeout(context.Background(), 5*time.Second)
defer done()
acc, err := c.ji.Accounts(ns).Get(ctx, spec.Account, k8smeta.GetOptions{})
if err != nil {
return err
}
Expand Down Expand Up @@ -89,6 +92,7 @@ func (c *Controller) processConsumerObject(cns *apis.Consumer, jsmc jsmClient) (
}
}
}
// FIXME: Add support for UserCredentials for consumer.
}

defer func() {
Expand Down Expand Up @@ -408,7 +412,7 @@ func deleteConsumer(ctx context.Context, c jsmClient, spec apis.ConsumerSpec) (e
}()

if spec.PreventDelete {
fmt.Printf("Consumer %q is configured to preventDelete on stream %q:\n", stream, consumer)
klog.Infof("Consumer %q is configured to preventDelete on stream %q:", stream, consumer)
return nil
}

Expand Down
84 changes: 64 additions & 20 deletions controllers/jetstream/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

k8sapi "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
k8smeta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -76,6 +77,7 @@ type Options struct {
Namespace string
CRDConnect bool
CleanupPeriod time.Duration
ReadOnly bool

Recorder record.EventRecorder
}
Expand Down Expand Up @@ -238,7 +240,7 @@ func (c *Controller) Run() error {
return nil
}

func deletedStreams(prev, cur map[string]*apis.Stream) []*apis.Stream {
func selectMissingStreamsFromList(prev, cur map[string]*apis.Stream) []*apis.Stream {
var deleted []*apis.Stream
for name, ps := range prev {
if _, ok := cur[name]; !ok {
Expand All @@ -248,7 +250,7 @@ func deletedStreams(prev, cur map[string]*apis.Stream) []*apis.Stream {
return deleted
}

func streamMap(ss []*apis.Stream) map[string]*apis.Stream {
func streamsMap(ss []*apis.Stream) map[string]*apis.Stream {
m := make(map[string]*apis.Stream)
for _, s := range ss {
m[fmt.Sprintf("%s/%s", s.Namespace, s.Name)] = s
Expand All @@ -257,9 +259,13 @@ func streamMap(ss []*apis.Stream) map[string]*apis.Stream {
}

func (c *Controller) cleanupStreams() error {
if c.opts.ReadOnly {
return nil
}
tick := time.NewTicker(c.opts.CleanupPeriod)
defer tick.Stop()

// Track the Stream CRDs that may have been created.
var prevStreams map[string]*apis.Stream
for {
select {
Expand All @@ -271,23 +277,40 @@ func (c *Controller) cleanupStreams() error {
klog.Infof("failed to list streams for cleanup: %s", err)
continue
}
sm := streamMap(streams)

for _, s := range deletedStreams(prevStreams, sm) {
t := k8smeta.NewTime(time.Now())
s.DeletionTimestamp = &t
if err := c.processStreamObject(s, &realJsmClient{jm: c.jm}); err != nil {
klog.Infof("failed to delete stream %s/%s: %s", s.Namespace, s.Name, err)
continue
sm := streamsMap(streams)
missing := selectMissingStreamsFromList(prevStreams, sm)
for _, s := range missing {
// A stream that we were tracking but that for some reason
// was not part of the latest list shared by informer.
// Need to double check whether the stream is present before
// considering deletion.
klog.Infof("stream %s/%s might be missing, looking it up...", s.Namespace, s.Name)
ctx, done := context.WithTimeout(context.Background(), 10*time.Second)
defer done()
_, err := c.ji.Streams(s.Namespace).Get(ctx, s.Name, k8smeta.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
klog.Infof("stream %s/%s was not found anymore, deleting from JetStream", s.Namespace, s.Name)
t := k8smeta.NewTime(time.Now())
s.DeletionTimestamp = &t
if err := c.processStreamObject(s, &realJsmClient{jm: c.jm}); err != nil && !k8serrors.IsNotFound(err) {
klog.Infof("failed to delete stream %s/%s: %s", s.Namespace, s.Name, err)
continue
}
klog.Infof("deleted stream %s/%s from JetStream", s.Namespace, s.Name)
} else {
klog.Warningf("error looking up stream %s/%s", s.Namespace, s.Name)
}
} else {
klog.Infof("found stream %s/%s, no further action needed", s.Namespace, s.Name)
}
klog.Infof("deleted stream %s/%s", s.Namespace, s.Name)
}
prevStreams = sm
}
}
}

func deletedConsumers(prev, cur map[string]*apis.Consumer) []*apis.Consumer {
func selectMissingConsumersFromList(prev, cur map[string]*apis.Consumer) []*apis.Consumer {
var deleted []*apis.Consumer
for name, ps := range prev {
if _, ok := cur[name]; !ok {
Expand All @@ -306,9 +329,13 @@ func consumerMap(cs []*apis.Consumer) map[string]*apis.Consumer {
}

func (c *Controller) cleanupConsumers() error {
if c.opts.ReadOnly {
return nil
}
tick := time.NewTicker(c.opts.CleanupPeriod)
defer tick.Stop()

// Track consumers that may have been deleted.
var prevConsumers map[string]*apis.Consumer
for {
select {
Expand All @@ -321,15 +348,32 @@ func (c *Controller) cleanupConsumers() error {
continue
}
cm := consumerMap(consumers)

for _, cns := range deletedConsumers(prevConsumers, cm) {
t := k8smeta.NewTime(time.Now())
cns.DeletionTimestamp = &t
if err := c.processConsumerObject(cns, &realJsmClient{jm: c.jm}); err != nil {
klog.Infof("failed to delete consumer %s/%s: %s", cns.Namespace, cns.Name, err)
continue
missing := selectMissingConsumersFromList(prevConsumers, cm)
for _, cns := range missing {
// A consumer that we were tracking but that for some reason
// was not part of the latest list shared by informer.
// Need to double check whether the consumer is present before
// considering deletion.
klog.Infof("consumer %s/%s might be missing, looking it up...", cns.Namespace, cns.Name)
ctx, done := context.WithTimeout(context.Background(), 10*time.Second)
defer done()
_, err := c.ji.Consumers(cns.Namespace).Get(ctx, cns.Name, k8smeta.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
klog.Infof("consumer %s/%s was not found anymore, deleting from JetStream", cns.Namespace, cns.Name)
t := k8smeta.NewTime(time.Now())
cns.DeletionTimestamp = &t
if err := c.processConsumerObject(cns, &realJsmClient{jm: c.jm}); err != nil && !k8serrors.IsNotFound(err) {
klog.Infof("failed to delete consumer %s/%s: %s", cns.Namespace, cns.Name, err)
continue
}
klog.Infof("deleted consumer %s/%s from JetStream", cns.Namespace, cns.Name)
} else {
klog.Warningf("error looking up consumer %s/%s", cns.Namespace, cns.Name)
}
} else {
klog.Infof("found consumer %s/%s, no further action needed", cns.Namespace, cns.Name)
}
klog.Infof("deleted consumer %s/%s", cns.Namespace, cns.Name)
}
prevConsumers = cm
}
Expand Down
20 changes: 14 additions & 6 deletions controllers/jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors"
k8smeta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
klog "k8s.io/klog/v2"
)

func (c *Controller) runStreamQueue() {
Expand Down Expand Up @@ -61,6 +62,7 @@ func (c *Controller) processStreamObject(str *apis.Stream, jsmc jsmClient) (err
spec := str.Spec
ifc := c.ji.Streams(str.Namespace)
ns := str.Namespace
readOnly := c.opts.ReadOnly

var (
remoteClientCert string
Expand All @@ -71,9 +73,10 @@ func (c *Controller) processStreamObject(str *apis.Stream, jsmc jsmClient) (err
accUserCreds string
)
if spec.Account != "" && c.opts.CRDConnect {
// Lookup the account.
var err error
acc, err = c.accLister.Accounts(ns).Get(spec.Account)
// Lookup the account using the REST client.
ctx, done := context.WithTimeout(context.Background(), 5*time.Second)
defer done()
acc, err = c.ji.Accounts(ns).Get(ctx, spec.Account, k8smeta.GetOptions{})
if err != nil {
return err
}
Expand Down Expand Up @@ -216,6 +219,10 @@ func (c *Controller) processStreamObject(str *apis.Stream, jsmc jsmClient) (err

switch {
case createOK:
if readOnly {
c.normalEvent(str, "SkipCreate", fmt.Sprintf("Skip creating stream %q", spec.Name))
return nil
}
c.normalEvent(str, "Creating", fmt.Sprintf("Creating stream %q", spec.Name))
if err := natsClientUtil(createStream); err != nil {
return err
Expand All @@ -226,7 +233,7 @@ func (c *Controller) processStreamObject(str *apis.Stream, jsmc jsmClient) (err
}
c.normalEvent(str, "Created", fmt.Sprintf("Created stream %q", spec.Name))
case updateOK:
if str.Spec.PreventUpdate {
if str.Spec.PreventUpdate || readOnly {
c.normalEvent(str, "SkipUpdate", fmt.Sprintf("Skip updating stream %q", spec.Name))
if _, err := setStreamOK(c.ctx, str, ifc); err != nil {
return err
Expand All @@ -244,7 +251,7 @@ func (c *Controller) processStreamObject(str *apis.Stream, jsmc jsmClient) (err
c.normalEvent(str, "Updated", fmt.Sprintf("Updated stream %q", spec.Name))
return nil
case deleteOK:
if str.Spec.PreventDelete {
if str.Spec.PreventDelete || readOnly {
c.normalEvent(str, "SkipDelete", fmt.Sprintf("Skip deleting stream %q", spec.Name))
if _, err := setStreamOK(c.ctx, str, ifc); err != nil {
return err
Expand Down Expand Up @@ -434,6 +441,7 @@ func updateStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e

config := jsmapi.StreamConfig{
Name: spec.Name,
Description: spec.Description,
Retention: retention,
Subjects: spec.Subjects,
MaxConsumers: spec.MaxConsumers,
Expand Down Expand Up @@ -486,7 +494,7 @@ func deleteStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e
}()

if spec.PreventDelete {
fmt.Printf("Stream %q is configured to preventDelete:\n", name)
klog.Infof("Stream %q is configured to preventDelete:\n", name)
return nil
}

Expand Down

0 comments on commit 6c54a4e

Please sign in to comment.