Skip to content

Commit

Permalink
ring: add GetWithOptions method to adjust per call behavior
Browse files Browse the repository at this point in the history
This change adds a new method that accepts 0 or more `Option` instances
that modify the behavior of the call. These options can (currently) be
used to adjust the replication factor for a particular key or use buffers
to avoid excessive allocation.

Part of grafana/mimir#9944
  • Loading branch information
56quarters committed Nov 18, 2024
1 parent a6b453a commit 075bb36
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 5 deletions.
59 changes: 54 additions & 5 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,58 @@ const (
GetBufferSize = 5
)

type Options struct {
ReplicationFactor int
BufDescs []InstanceDesc
BufHosts []string
BufZones []string
}

type Option func(opts *Options)

func WithBuffers(bufDescs []InstanceDesc, bufHosts, bufZones []string) Option {
return func(opts *Options) {
opts.BufDescs = bufDescs
opts.BufHosts = bufHosts
opts.BufZones = bufZones
}
}

func WithReplicationFactor(replication int) Option {
return func(opts *Options) {
opts.ReplicationFactor = replication
}
}

func collectOptions(opts ...Option) *Options {
final := &Options{}
for _, opt := range opts {
opt(final)
}
return final
}

// ReadRing represents the read interface to the ring.
// Support for read-only instances requires use of ShuffleShard or ShuffleShardWithLookback prior to getting a ReplicationSet.
type ReadRing interface {
// Get returns n (or more) instances which form the replicas for the given key.
//
// bufDescs, bufHosts and bufZones are slices to be overwritten for the return value
// to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet().
Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error)

// GetWithOptions returns n (or more) instances which form the replicas for the given key
// with 0 or more options to change the behavior of the method call. This method is a superset
// of the functionality of the Get method.
GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error)

// GetAllHealthy returns all healthy instances in the ring, for the given operation.
// This function doesn't check if the quorum is honored, so doesn't fail if the number
// of unhealthy instances is greater than the tolerated max unavailable.
GetAllHealthy(op Operation) (ReplicationSet, error)

// GetReplicationSetForOperation returns all instances where the input operation should be executed.
// The resulting ReplicationSet doesn't necessarily contains all healthy instances
// The resulting ReplicationSet doesn't necessarily contain all healthy instances
// in the ring, but could contain the minimum set of instances required to execute
// the input operation.
GetReplicationSetForOperation(op Operation) (ReplicationSet, error)
Expand Down Expand Up @@ -422,13 +459,21 @@ func (r *Ring) setRingStateFromDesc(ringDesc *Desc, updateMetrics, updateRegiste

// Get returns n (or more) instances which form the replicas for the given key.
func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
return r.GetWithOptions(key, op, WithBuffers(bufDescs, bufHosts, bufZones))
}

// GetWithOptions returns n (or more) instances which form the replicas for the given key
// with 0 or more options to change the behavior of the method call.
func (r *Ring) GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error) {
options := collectOptions(opts...)

r.mtx.RLock()
defer r.mtx.RUnlock()
if r.ringDesc == nil || len(r.ringTokens) == 0 {
return ReplicationSet{}, ErrEmptyRing
}

instances, err := r.findInstancesForKey(key, op, bufDescs, bufHosts, bufZones, nil)
instances, err := r.findInstancesForKey(key, op, options.BufDescs, options.BufHosts, options.BufZones, options.ReplicationFactor, nil)
if err != nil {
return ReplicationSet{}, err
}
Expand All @@ -447,9 +492,13 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts,
// Returns instances for given key and operation. Instances are not filtered through ReplicationStrategy.
// InstanceFilter can ignore uninteresting instances that would otherwise be part of the output, and can also stop search early.
// This function needs to be called with read lock on the ring.
func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, bufZones []string, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) {
func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, bufZones []string, replicationFactor int, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) {
if replicationFactor <= 0 || replicationFactor < r.cfg.ReplicationFactor {
replicationFactor = r.cfg.ReplicationFactor
}

var (
n = r.cfg.ReplicationFactor
n = replicationFactor
instances = bufDescs[:0]
start = searchToken(r.ringTokens, key)
iterations = 0
Expand Down Expand Up @@ -1349,7 +1398,7 @@ func (r *Ring) numberOfKeysOwnedByInstance(keys []uint32, op Operation, instance

owned := 0
for _, tok := range keys {
i, err := r.findInstancesForKey(tok, op, bufDescs, bufHosts, bufZones, func(foundInstanceID string) (include, keepGoing bool) {
i, err := r.findInstancesForKey(tok, op, bufDescs, bufHosts, bufZones, 0, func(foundInstanceID string) (include, keepGoing bool) {
if foundInstanceID == instanceID {
// If we've found our instance, we can stop.
return true, false
Expand Down
5 changes: 5 additions & 0 deletions ring/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ func (r *RingMock) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHos
return args.Get(0).(ReplicationSet), args.Error(1)
}

func (r *RingMock) GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error) {
args := r.Called(key, op, opts)
return args.Get(0).(ReplicationSet), args.Error(1)
}

func (r *RingMock) GetAllHealthy(op Operation) (ReplicationSet, error) {
args := r.Called(op)
return args.Get(0).(ReplicationSet), args.Error(1)
Expand Down

0 comments on commit 075bb36

Please sign in to comment.