Skip to content

Commit

Permalink
feat: support Mixed Retry which integrating Failure Retry and Backup …
Browse files Browse the repository at this point in the history
…Request
  • Loading branch information
YangruiEmma committed Aug 27, 2024
1 parent d967b72 commit 6005508
Show file tree
Hide file tree
Showing 16 changed files with 2,153 additions and 1,057 deletions.
29 changes: 23 additions & 6 deletions client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,13 @@ func WithFailureRetry(p *retry.FailurePolicy) Option {
if p == nil {
return
}
di.Push(fmt.Sprintf("WithFailureRetry(%+v)", *p))
di.Push(fmt.Sprintf("WithFailureRetry(%+v)", p))
if o.RetryMethodPolicies == nil {
o.RetryMethodPolicies = make(map[string]retry.Policy)
}
if o.RetryMethodPolicies[retry.Wildcard].BackupPolicy != nil {
panic("BackupPolicy has been setup, cannot support Failure Retry at same time")
if o.RetryMethodPolicies[retry.Wildcard].MixedPolicy != nil ||
o.RetryMethodPolicies[retry.Wildcard].BackupPolicy != nil {
panic("MixedPolicy or BackupPolicy has been setup, cannot support Failure Retry at same time")
}
o.RetryMethodPolicies[retry.Wildcard] = retry.BuildFailurePolicy(p)
}}
Expand All @@ -342,17 +343,33 @@ func WithBackupRequest(p *retry.BackupPolicy) Option {
if p == nil {
return
}
di.Push(fmt.Sprintf("WithBackupRequest(%+v)", *p))
di.Push(fmt.Sprintf("WithBackupRequest(%+v)", p))
if o.RetryMethodPolicies == nil {
o.RetryMethodPolicies = make(map[string]retry.Policy)
}
if o.RetryMethodPolicies[retry.Wildcard].FailurePolicy != nil {
panic("BackupPolicy has been setup, cannot support Failure Retry at same time")
if o.RetryMethodPolicies[retry.Wildcard].MixedPolicy != nil ||
o.RetryMethodPolicies[retry.Wildcard].FailurePolicy != nil {
panic("MixedPolicy or BackupPolicy has been setup, cannot support Failure Retry at same time")
}
o.RetryMethodPolicies[retry.Wildcard] = retry.BuildBackupRequest(p)
}}
}

// WithMixedRetry sets the mixed retry policy for client, it will take effect for all methods.
func WithMixedRetry(p *retry.MixedPolicy) Option {
return Option{F: func(o *client.Options, di *utils.Slice) {
if p == nil {
return
}
di.Push(fmt.Sprintf("WithMixedRetry(%+v)", p))
if o.RetryMethodPolicies == nil {
o.RetryMethodPolicies = make(map[string]retry.Policy)
}
// no need to check if BackupPolicy or FailurePolicy are been setup, just let mixed retry replace it
o.RetryMethodPolicies[retry.Wildcard] = retry.BuildMixedPolicy(p)
}}
}

// WithRetryMethodPolicies sets the retry policy for method.
// The priority is higher than WithFailureRetry and WithBackupRequest. Only the methods which are not included by
// this config will use the policy that is configured by WithFailureRetry or WithBackupRequest .
Expand Down
121 changes: 82 additions & 39 deletions client/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package client
import (
"context"
"crypto/tls"
"errors"
"fmt"
"reflect"
"testing"
Expand Down Expand Up @@ -53,46 +54,76 @@ import (
)

func TestRetryOptionDebugInfo(t *testing.T) {
fp := retry.NewFailurePolicy()
fp.WithDDLStop()
expectPolicyStr := "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:false DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:none CfgItems:map[]} RetrySameNode:false ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
policyStr := fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)

fp.WithFixedBackOff(10)
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:false DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:fixed CfgItems:map[fix_ms:10]} RetrySameNode:false ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)

fp.WithRandomBackOff(10, 20)
fp.DisableChainRetryStop()
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:true DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:random CfgItems:map[max_ms:20 min_ms:10]} RetrySameNode:false ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)

fp.WithRetrySameNode()
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:true DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:random CfgItems:map[max_ms:20 min_ms:10]} RetrySameNode:true ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)

fp.WithSpecifiedResultRetry(&retry.ShouldResultRetry{ErrorRetry: func(err error, ri rpcinfo.RPCInfo) bool {
return false
}})
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:true DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:random CfgItems:map[max_ms:20 min_ms:10]} RetrySameNode:true ShouldResultRetry:{ErrorRetry:true, RespRetry:false}})"
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)
t.Run("FailurePolicy", func(t *testing.T) {
fp := retry.NewFailurePolicy()
fp.WithDDLStop()
expectPolicyStr := "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:false DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:none CfgItems:map[]} RetrySameNode:false ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
opt := WithFailureRetry(fp)
err := checkOneOptionDebugInfo(t, opt, expectPolicyStr)
test.Assert(t, err == nil, err)

fp.WithFixedBackOff(10)
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:false DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:fixed CfgItems:map[fix_ms:10]} RetrySameNode:false ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
opt = WithFailureRetry(fp)
err = checkOneOptionDebugInfo(t, opt, expectPolicyStr)
test.Assert(t, err == nil, err)

fp.WithRandomBackOff(10, 20)
fp.DisableChainRetryStop()
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:true DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:random CfgItems:map[max_ms:20 min_ms:10]} RetrySameNode:false ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
opt = WithFailureRetry(fp)
err = checkOneOptionDebugInfo(t, opt, expectPolicyStr)
test.Assert(t, err == nil, err)

fp.WithRetrySameNode()
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:true DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:random CfgItems:map[max_ms:20 min_ms:10]} RetrySameNode:true ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
opt = WithFailureRetry(fp)
err = checkOneOptionDebugInfo(t, opt, expectPolicyStr)
test.Assert(t, err == nil, err)

fp.WithSpecifiedResultRetry(&retry.ShouldResultRetry{ErrorRetryWithCtx: func(ctx context.Context, err error, ri rpcinfo.RPCInfo) bool {
return false
}})
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:true DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:random CfgItems:map[max_ms:20 min_ms:10]} RetrySameNode:true ShouldResultRetry:{ErrorRetry:true, RespRetry:false}})"
opt = WithFailureRetry(fp)
err = checkOneOptionDebugInfo(t, opt, expectPolicyStr)
test.Assert(t, err == nil, err)
})

bp := retry.NewBackupPolicy(20)
expectPolicyStr = "WithBackupRequest({RetryDelayMS:20 StopPolicy:{MaxRetryTimes:1 MaxDurationMS:0 DisableChainStop:false " +
"DDLStop:false CBPolicy:{ErrorRate:0.1}} RetrySameNode:false})"
policyStr = fmt.Sprintf("WithBackupRequest(%+v)", bp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)
WithBackupRequest(bp)
t.Run("FailurePolicy", func(t *testing.T) {
bp := retry.NewBackupPolicy(20)
expectPolicyStr := "WithBackupRequest({RetryDelayMS:20 StopPolicy:{MaxRetryTimes:1 MaxDurationMS:0 DisableChainStop:false " +
"DDLStop:false CBPolicy:{ErrorRate:0.1}} RetrySameNode:false})"
opt := WithBackupRequest(bp)
err := checkOneOptionDebugInfo(t, opt, expectPolicyStr)
test.Assert(t, err == nil, err)
})

t.Run("MixedPolicy", func(t *testing.T) {
mp := retry.NewMixedPolicy(100)
mp.WithDDLStop()
expectPolicyStr := "WithMixedRetry({RetryDelayMS:100 StopPolicy:{MaxRetryTimes:1 MaxDurationMS:0 DisableChainStop:false " +
"DDLStop:true CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:none CfgItems:map[]} RetrySameNode:false " +
"ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
opt := WithMixedRetry(mp)
err := checkOneOptionDebugInfo(t, opt, expectPolicyStr)
test.Assert(t, err == nil, err)

mp.WithSpecifiedResultRetry(&retry.ShouldResultRetry{ErrorRetryWithCtx: func(ctx context.Context, err error, ri rpcinfo.RPCInfo) bool {
return false
}})
expectPolicyStr = "WithMixedRetry({RetryDelayMS:100 StopPolicy:{MaxRetryTimes:1 MaxDurationMS:0 DisableChainStop:false " +
"DDLStop:true CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:none CfgItems:map[]} RetrySameNode:false " +
"ShouldResultRetry:{ErrorRetry:true, RespRetry:false}})"
opt = WithMixedRetry(mp)
err = checkOneOptionDebugInfo(t, opt, expectPolicyStr)
test.Assert(t, err == nil, err)
})
}

func TestRetryOption(t *testing.T) {
Expand Down Expand Up @@ -708,3 +739,15 @@ func TestWithGRPCTLSConfig(t *testing.T) {
opts := client.NewOptions([]client.Option{WithGRPCTLSConfig(cfg)})
test.Assert(t, opts.GRPCConnectOpts != nil)
}

func checkOneOptionDebugInfo(t *testing.T, opt Option, expectStr string) error {
o := &Options{}
o.Apply([]Option{opt})
if len(o.DebugInfo) != 1 {
return errors.New("length of DebugInfo is unexpected")
}
if o.DebugInfo[0] != expectStr {
return fmt.Errorf("DebugInfo not match with expect str:\n debugInfo=%s", o.DebugInfo[0])
}
return nil
}
39 changes: 37 additions & 2 deletions pkg/retry/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
"fmt"
)

const maxBackupRetryTimes = 2
const (
maxBackupRetryTimes = 2
defaultBackupRetryTimes = 1
)

// NewBackupPolicy init backup request policy
// the param delayMS is suggested to set as TP99
Expand All @@ -31,7 +34,7 @@ func NewBackupPolicy(delayMS uint32) *BackupPolicy {
p := &BackupPolicy{
RetryDelayMS: delayMS,
StopPolicy: StopPolicy{
MaxRetryTimes: 1,
MaxRetryTimes: defaultBackupRetryTimes,
DisableChainStop: false,
CBPolicy: CBPolicy{
ErrorRate: defaultCBErrRate,
Expand Down Expand Up @@ -71,3 +74,35 @@ func (p *BackupPolicy) WithRetrySameNode() {
func (p *BackupPolicy) String() string {
return fmt.Sprintf("{RetryDelayMS:%+v StopPolicy:%+v RetrySameNode:%+v}", p.RetryDelayMS, p.StopPolicy, p.RetrySameNode)
}

// Equals to check if BackupPolicy is equal
func (p *BackupPolicy) Equals(np *BackupPolicy) bool {
if p == nil {
return np == nil
}
if np == nil {
return false
}
if p.RetryDelayMS != np.RetryDelayMS {
return false
}
if p.StopPolicy != np.StopPolicy {
return false
}
if p.RetrySameNode != np.RetrySameNode {
return false
}

return true
}

func (p *BackupPolicy) DeepCopy() *BackupPolicy {
if p == nil {
return nil
}
return &BackupPolicy{
RetryDelayMS: p.RetryDelayMS,
StopPolicy: p.StopPolicy, // not a pointer, will copy the value here
RetrySameNode: p.RetrySameNode,
}
}
44 changes: 17 additions & 27 deletions pkg/retry/backup_retryer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/cloudwego/kitex/pkg/circuitbreak"
"github.com/cloudwego/kitex/pkg/gofunc"
"github.com/cloudwego/kitex/pkg/kerrors"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/utils"
)
Expand All @@ -48,16 +47,17 @@ func newBackupRetryer(policy Policy, cbC *cbContainer) (Retryer, error) {

type backupRetryer struct {
enable bool
retryDelay time.Duration
policy *BackupPolicy
cbContainer *cbContainer
retryDelay time.Duration
sync.RWMutex
errMsg string
}

type resultWrapper struct {
ri rpcinfo.RPCInfo
err error
ri rpcinfo.RPCInfo
resp interface{}
err error
}

// ShouldRetry implements the Retryer interface.
Expand Down Expand Up @@ -92,12 +92,13 @@ func (r *backupRetryer) Do(ctx context.Context, rpcCall RPCCallFunc, firstRI rpc
retryTimes := r.policy.StopPolicy.MaxRetryTimes
retryDelay := r.retryDelay
r.RUnlock()

var callTimes int32 = 0
var callCosts utils.StringBuilder
callCosts.RawStringBuilder().Grow(32)
var recordCostDoing int32 = 0
var abort int32 = 0
finishedCount := 0
finishedErrCount := 0
// notice: buff num of chan is very important here, it cannot less than call times, or the below chan receive will block
done := make(chan *resultWrapper, retryTimes+1)
cbKey, _ := r.cbContainer.cbCtl.GetKey(ctx, req)
Expand Down Expand Up @@ -126,7 +127,7 @@ func (r *backupRetryer) Do(ctx context.Context, rpcCall RPCCallFunc, firstRI rpc
if panicInfo := recover(); panicInfo != nil {
e = panicToErr(ctx, panicInfo, firstRI)
}
done <- &resultWrapper{cRI, e}
done <- &resultWrapper{ri: cRI, err: e}
}()
ct := atomic.AddInt32(&callTimes, 1)
callStart := time.Now()
Expand All @@ -152,7 +153,7 @@ func (r *backupRetryer) Do(ctx context.Context, rpcCall RPCCallFunc, firstRI rpc
// There will be only one request (goroutine) pass the `checkRPCState`, others will skip decoding
// and return `ErrRPCFinish`, to avoid concurrent write to response and save the cost of decoding.
// We can safely ignore this error and wait for the response of the passed goroutine.
if finishedCount++; finishedCount >= retryTimes+1 {
if finishedErrCount++; finishedErrCount >= retryTimes+1 {
// But if all requests return this error, it must be a bug, preventive panic to avoid dead loop
panic(errUnexpectedFinish)
}
Expand All @@ -178,29 +179,21 @@ func (r *backupRetryer) UpdatePolicy(rp Policy) (err error) {
r.Unlock()
return nil
}
var errMsg string
if rp.BackupPolicy == nil || rp.Type != BackupType {
errMsg = "BackupPolicy is nil or retry type not match, cannot do update in backupRetryer"
err = errors.New(errMsg)
err = errors.New("BackupPolicy is nil or retry type not match, cannot do update in backupRetryer")
}
if errMsg == "" && (rp.BackupPolicy.RetryDelayMS == 0 || rp.BackupPolicy.StopPolicy.MaxRetryTimes < 0 ||
rp.BackupPolicy.StopPolicy.MaxRetryTimes > maxBackupRetryTimes) {
errMsg = "invalid backup request delay duration or retryTimes"
err = errors.New(errMsg)
if err == nil && rp.BackupPolicy.RetryDelayMS == 0 {
err = errors.New("invalid retry delay duration in backupRetryer")
}
if errMsg == "" {
if e := checkCBErrorRate(&rp.BackupPolicy.StopPolicy.CBPolicy); e != nil {
rp.BackupPolicy.StopPolicy.CBPolicy.ErrorRate = defaultCBErrRate
errMsg = fmt.Sprintf("backupRetryer %s, use default %0.2f", e.Error(), defaultCBErrRate)
klog.Warnf(errMsg)
}
if err == nil {
err = checkStopPolicy(&rp.BackupPolicy.StopPolicy, maxBackupRetryTimes, r)
}

r.Lock()
defer r.Unlock()
r.enable = rp.Enable
if err != nil {
r.errMsg = errMsg
r.errMsg = err.Error()
return err
}
r.policy = rp.BackupPolicy
Expand All @@ -220,14 +213,11 @@ func (r *backupRetryer) AppendErrMsgIfNeeded(ctx context.Context, err error, ri
func (r *backupRetryer) Dump() map[string]interface{} {
r.RLock()
defer r.RUnlock()
dm := map[string]interface{}{"enable": r.enable, "backup_request": r.policy}
if r.errMsg != "" {
return map[string]interface{}{
"enable": r.enable,
"backupRequest": r.policy,
"errMsg": r.errMsg,
}
dm["err_msg"] = r.errMsg
}
return map[string]interface{}{"enable": r.enable, "backupRequest": r.policy}
return dm
}

// Type implements the Retryer interface.
Expand Down
Loading

0 comments on commit 6005508

Please sign in to comment.