Skip to content

Commit

Permalink
Merge pull request #1178 from ripienaar/priority_groups
Browse files Browse the repository at this point in the history
initial priority groups support
  • Loading branch information
ripienaar authored Nov 7, 2024
2 parents 2b10f04 + a03462f commit c6bb6c5
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 70 deletions.
167 changes: 143 additions & 24 deletions cli/consumer_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,23 +101,27 @@ type consumerCmd struct {
metadata map[string]string
pauseUntil string

dryRun bool
mgr *jsm.Manager
nc *nats.Conn
nak bool
fPull bool
fPush bool
fBound bool
fWaiting int
fAckPending int
fPending uint64
fIdle time.Duration
fCreated time.Duration
fReplicas uint
fInvert bool
fExpression string
fLeader string
interactive bool
dryRun bool
mgr *jsm.Manager
nc *nats.Conn
nak bool
fPull bool
fPush bool
fBound bool
fWaiting int
fAckPending int
fPending uint64
fIdle time.Duration
fCreated time.Duration
fReplicas uint
fInvert bool
fExpression string
fLeader string
interactive bool
pinnedGroups []string
pinnedTTL time.Duration
overflowGroups []string
groupName string
}

func configureConsumerCommand(app commandHost) {
Expand Down Expand Up @@ -171,6 +175,9 @@ func configureConsumerCommand(app commandHost) {
f.Flag("metadata", "Adds metadata to the consumer").PlaceHolder("META").IsSetByUser(&c.metadataIsSet).StringMapVar(&c.metadata)
if !edit {
f.Flag("pause", fmt.Sprintf("Pause the consumer for a duration after start or until a specific timestamp (eg %s)", time.Now().Format(time.DateTime))).StringVar(&c.pauseUntil)
f.Flag("pinned-groups", "Create a Pinned Client consumer based on these groups").StringsVar(&c.pinnedGroups)
f.Flag("pinned-ttl", "The time to allow for a client to pull before losing the pinned status").DurationVar(&c.pinnedTTL)
f.Flag("overflow-groups", "Create a Overflow consumer based on these groups").StringsVar(&c.overflowGroups)
}
}

Expand Down Expand Up @@ -267,6 +274,12 @@ func configureConsumerCommand(app commandHost) {
conPause.Arg("until", fmt.Sprintf("Pause until a specific time (eg %s)", time.Now().UTC().Format(time.DateTime))).PlaceHolder("TIME").StringVar(&c.pauseUntil)
conPause.Flag("force", "Force pause without prompting").Short('f').UnNegatableBoolVar(&c.force)

conUnpin := cons.Command("unpin", "Unpin the current Pinned Client from a Priority Group").Action(c.unpinAction)
conUnpin.Arg("stream", "Stream name").StringVar(&c.stream)
conUnpin.Arg("consumer", "Consumer name").StringVar(&c.consumer)
conUnpin.Arg("group", "The group to unpin").StringVar(&c.groupName)
conUnpin.Flag("force", "Force unpin without prompting").Short('f').UnNegatableBoolVar(&c.force)

conResume := cons.Command("resume", "Resume a paused consumer").Action(c.resumeAction)
conResume.Arg("stream", "Stream name").StringVar(&c.stream)
conResume.Arg("consumer", "Consumer name").StringVar(&c.consumer)
Expand All @@ -288,6 +301,61 @@ func init() {
registerCommand("consumer", 4, configureConsumerCommand)
}

func (c *consumerCmd) unpinAction(_ *fisk.ParseContext) error {
c.connectAndSetup(true, true)

if !c.selectedConsumer.IsPinnedClientPriority() {
return fmt.Errorf("consumer is not a pinned priority consumer")
}

nfo, err := c.selectedConsumer.State()
if err != nil {
return err
}

matched := map[string]api.PriorityGroupState{}
var groups []string
for _, v := range nfo.PriorityGroups {
if v.PinnedClientID != "" {
matched[v.Group] = v
groups = append(groups, v.Group)
}
}

if len(matched) == 0 {
return fmt.Errorf("no priority groups have pinned clients")
}

if c.groupName == "" {
err = iu.AskOne(&survey.Select{
Message: "Select a Group",
Options: groups,
PageSize: iu.SelectPageSize(len(groups)),
}, &c.groupName, survey.WithValidator(survey.Required))
if err != nil {
return err
}
}

if !c.force {
ok, err := askConfirmation(fmt.Sprintf("Really unpin client from group %s > %s > %s", c.stream, c.consumer, c.groupName), false)
fisk.FatalIfError(err, "could not obtain confirmation")

if !ok {
return nil
}
}

err = c.selectedConsumer.Unpin(c.groupName)
if err != nil {
return err
}

fmt.Printf("Unpinned client %s from Priority Group %s > %s > %s\n", matched[c.groupName].PinnedClientID, c.stream, c.consumer, c.groupName)

return nil
}

func (c *consumerCmd) findAction(_ *fisk.ParseContext) error {
var err error
var stream *jsm.Stream
Expand Down Expand Up @@ -755,6 +823,11 @@ func (c *consumerCmd) editAction(pc *fisk.ParseContext) error {
}
}

err = c.checkConfigLevel(ncfg)
if err != nil {
return err
}

cons, err := c.mgr.NewConsumerFromDefault(c.stream, *ncfg)
if err != nil {
return err
Expand Down Expand Up @@ -996,6 +1069,11 @@ func (c *consumerCmd) showInfo(config api.ConsumerConfig, state api.ConsumerInfo
} else {
cols.AddRowIf("Paused Until Deadline", fmt.Sprintf("%s (passed)", f(config.PauseUntil)), !config.PauseUntil.IsZero())
}
if config.PriorityPolicy != api.PriorityNone {
cols.AddRow("Priority Policy", config.PriorityPolicy)
cols.AddRow("Priority Groups", config.PriorityGroups)
cols.AddRowIf("Pinned TTL", config.PinnedTTL, config.PriorityPolicy == api.PriorityPinnedClient)
}

meta := iu.RemoveReservedMetadata(config.Metadata)
if len(meta) > 0 {
Expand Down Expand Up @@ -1068,6 +1146,19 @@ func (c *consumerCmd) showInfo(config api.ConsumerConfig, state api.ConsumerInfo
cols.AddRowf("Paused Until", "%s (%s remaining)", f(state.TimeStamp.Add(state.PauseRemaining)), state.PauseRemaining.Round(time.Second))
}

if len(state.PriorityGroups) > 0 && config.PriorityPolicy == api.PriorityPinnedClient {
groups := map[string]string{}
for _, v := range state.PriorityGroups {
msg := "No client"
if v.PinnedClientID != "" {
msg = fmt.Sprintf("pinned %s at %s", v.PinnedClientID, f(v.PinnedTS))
}

groups[v.Group] = msg
}
cols.AddMapStringsAsValue("Priority Groups", groups)
}

cols.Frender(os.Stdout)
}

Expand Down Expand Up @@ -1651,6 +1742,18 @@ func (c *consumerCmd) prepareConfig() (cfg *api.ConsumerConfig, err error) {
}
}

switch {
case len(c.pinnedGroups) > 0 && len(c.overflowGroups) > 0:
return nil, fmt.Errorf("setting both overflow and pinned groups are not supported")
case len(c.pinnedGroups) > 0:
cfg.PriorityPolicy = api.PriorityPinnedClient
cfg.PriorityGroups = c.pinnedGroups
cfg.PinnedTTL = c.pinnedTTL
case len(c.overflowGroups) > 0:
cfg.PriorityPolicy = api.PriorityOverflow
cfg.PriorityGroups = c.pinnedGroups
}

cfg.Metadata = iu.RemoveReservedMetadata(cfg.Metadata)

return cfg, nil
Expand Down Expand Up @@ -1679,7 +1782,7 @@ func (c *consumerCmd) parsePauseUntil(until string) (time.Time, error) {
func (c *consumerCmd) resumeAction(_ *fisk.ParseContext) error {
c.connectAndSetup(true, true)

err := iu.RequireAPILevel(c.mgr, 1, "resuming consumers requires NATS Server 2.11")
err := iu.RequireAPILevel(c.mgr, 1, "resuming Consumers requires NATS Server 2.11")
if err != nil {
return err
}
Expand Down Expand Up @@ -1713,7 +1816,7 @@ func (c *consumerCmd) resumeAction(_ *fisk.ParseContext) error {
func (c *consumerCmd) pauseAction(_ *fisk.ParseContext) error {
c.connectAndSetup(true, true)

err := iu.RequireAPILevel(c.mgr, 1, "pausing consumers requires NATS Server 2.11")
err := iu.RequireAPILevel(c.mgr, 1, "pausing Consumers requires NATS Server 2.11")
if err != nil {
return err
}
Expand Down Expand Up @@ -1866,11 +1969,9 @@ func (c *consumerCmd) createAction(pc *fisk.ParseContext) (err error) {

c.connectAndSetup(true, false)

if !cfg.PauseUntil.IsZero() {
err := iu.RequireAPILevel(c.mgr, 1, "pausing consumers requires NATS Server 2.11")
if err != nil {
return err
}
err = c.checkConfigLevel(cfg)
if err != nil {
return err
}

created, err := c.mgr.NewConsumerFromDefault(c.stream, *cfg)
Expand All @@ -1883,6 +1984,24 @@ func (c *consumerCmd) createAction(pc *fisk.ParseContext) (err error) {
return nil
}

func (c *consumerCmd) checkConfigLevel(cfg *api.ConsumerConfig) error {
if !cfg.PauseUntil.IsZero() {
err := iu.RequireAPILevel(c.mgr, 1, "pausing consumers requires NATS Server 2.11")
if err != nil {
return err
}
}

if len(cfg.PriorityGroups) > 0 || cfg.PriorityPolicy != api.PriorityNone {
err := iu.RequireAPILevel(c.mgr, 1, "Consumer Groups requires NATS Server 2.11")
if err != nil {
return err
}
}

return nil
}

func (c *consumerCmd) getNextMsgDirect(stream string, consumer string) error {
req := &api.JSApiConsumerGetNextRequest{Batch: 1, Expires: opts().Timeout}

Expand Down
24 changes: 12 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,24 @@ go 1.22.0
require (
github.com/AlecAivazis/survey/v2 v2.3.7
github.com/HdrHistogram/hdrhistogram-go v1.1.2
github.com/choria-io/fisk v0.6.2
github.com/choria-io/scaffold v0.0.2-0.20240516112801-fc127c79a1df
github.com/choria-io/fisk v0.6.4
github.com/choria-io/scaffold v0.0.2
github.com/dustin/go-humanize v1.0.1
github.com/emicklei/dot v1.6.2
github.com/expr-lang/expr v1.16.9
github.com/fatih/color v1.17.0
github.com/fatih/color v1.18.0
github.com/ghodss/yaml v1.0.0
github.com/google/go-cmp v0.6.0
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/gosuri/uiprogress v0.0.1
github.com/guptarohit/asciigraph v0.7.2
github.com/guptarohit/asciigraph v0.7.3
github.com/jedib0t/go-pretty/v6 v6.6.1
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/klauspost/compress v1.17.11
github.com/mattn/go-isatty v0.0.20
github.com/nats-io/jsm.go v0.1.1-0.20241031085745-33958a03bf6d
github.com/nats-io/jsm.go v0.1.1-0.20241107105049-59758090235c
github.com/nats-io/jwt/v2 v2.7.2
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241030181516-1ee2b8a11af8
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241107032117-dd0bedda7b6e
github.com/nats-io/nats.go v1.37.0
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
Expand All @@ -31,22 +31,22 @@ require (
github.com/synadia-io/jwt-auth-builder.go v0.0.0-20240829124321-43722a8ce3ce
github.com/tylertreat/hdrhistogram-writer v0.0.0-20210816161836-2e440612a39f
golang.org/x/crypto v0.28.0
golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
golang.org/x/term v0.25.0
gopkg.in/gizak/termui.v1 v1.0.0-20151021151108-e62b5929642a
gopkg.in/yaml.v3 v3.0.1
)

require (
dario.cat/mergo v1.0.0 // indirect
dario.cat/mergo v1.0.1 // indirect
github.com/Masterminds/goutils v1.1.1 // indirect
github.com/Masterminds/semver/v3 v3.2.1 // indirect
github.com/Masterminds/semver/v3 v3.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/google/go-tpm v0.9.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gosuri/uilive v0.0.4 // indirect
github.com/huandu/xstrings v1.4.0 // indirect
github.com/huandu/xstrings v1.5.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
Expand All @@ -55,14 +55,14 @@ require (
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/nsc/v2 v2.8.6-0.20231220104935-3f89317df670 // indirect
github.com/nats-io/nsc/v2 v2.8.6 // indirect
github.com/nsf/termbox-go v1.1.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.60.1 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/cast v1.7.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
Expand Down
Loading

0 comments on commit c6bb6c5

Please sign in to comment.