feat: support mixed retry
YangruiEmma committed Aug 25, 2024
1 parent 77727f7 commit e735af6
Showing 10 changed files with 486 additions and 123 deletions.
25 changes: 21 additions & 4 deletions client/option.go
@@ -329,8 +329,9 @@ func WithFailureRetry(p *retry.FailurePolicy) Option {
if o.RetryMethodPolicies == nil {
o.RetryMethodPolicies = make(map[string]retry.Policy)
}
if o.RetryMethodPolicies[retry.Wildcard].BackupPolicy != nil {
panic("BackupPolicy has been setup, cannot support Failure Retry at same time")
if o.RetryMethodPolicies[retry.Wildcard].MixedPolicy != nil ||
o.RetryMethodPolicies[retry.Wildcard].BackupPolicy != nil {
panic("MixedPolicy or BackupPolicy has been setup, cannot support Failure Retry at same time")
}
o.RetryMethodPolicies[retry.Wildcard] = retry.BuildFailurePolicy(p)
}}
@@ -346,13 +347,29 @@ func WithBackupRequest(p *retry.BackupPolicy) Option {
if o.RetryMethodPolicies == nil {
o.RetryMethodPolicies = make(map[string]retry.Policy)
}
if o.RetryMethodPolicies[retry.Wildcard].FailurePolicy != nil {
panic("BackupPolicy has been setup, cannot support Failure Retry at same time")
if o.RetryMethodPolicies[retry.Wildcard].MixedPolicy != nil ||
o.RetryMethodPolicies[retry.Wildcard].FailurePolicy != nil {
panic("MixedPolicy or BackupPolicy has been setup, cannot support Failure Retry at same time")
}
o.RetryMethodPolicies[retry.Wildcard] = retry.BuildBackupRequest(p)
}}
}

// WithMixedRetry sets the mixed retry policy for the client; it takes effect for all methods.
func WithMixedRetry(p *retry.MixedPolicy) Option {
return Option{F: func(o *client.Options, di *utils.Slice) {
if p == nil {
return
}
di.Push(fmt.Sprintf("WithMixedRetry(%+v)", *p))
if o.RetryMethodPolicies == nil {
o.RetryMethodPolicies = make(map[string]retry.Policy)
}
// no need to check whether BackupPolicy or FailurePolicy has been set up; just let mixed retry replace it
o.RetryMethodPolicies[retry.Wildcard] = retry.BuildMixedPolicy(p)
}}
}

// WithRetryMethodPolicies sets the retry policy per method.
// Its priority is higher than WithFailureRetry and WithBackupRequest. Only the methods that are not covered by
// this config will use the policy configured by WithFailureRetry or WithBackupRequest.
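For orientation, here is a minimal usage sketch of the new option from the caller's side. Only client.WithMixedRetry comes from the diff above; retry.NewMixedPolicy, its millisecond argument, and the promoted WithMaxRetryTimes setter are assumptions modeled on the existing NewBackupPolicy/NewFailurePolicy constructors, and the generated-client constructor named in the trailing comment is a placeholder.

package retryexample

import (
	"github.com/cloudwego/kitex/client"
	"github.com/cloudwego/kitex/pkg/retry"
)

// mixedRetryOptions builds client options that enable mixed retry for all methods.
func mixedRetryOptions() []client.Option {
	// Assumption: NewMixedPolicy takes the backup-request delay in milliseconds,
	// mirroring retry.NewBackupPolicy.
	p := retry.NewMixedPolicy(50)
	// Assumption: MixedPolicy embeds FailurePolicy, so its fluent setters are promoted.
	p.WithMaxRetryTimes(2)
	return []client.Option{
		client.WithMixedRetry(p),
	}
}

// Pass the returned options to a Kitex-generated client constructor, e.g.
// yourservice.NewClient("target-service", mixedRetryOptions()...) (placeholder names).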
40 changes: 15 additions & 25 deletions pkg/retry/backup_retryer.go
@@ -30,7 +30,6 @@ import (
"github.com/cloudwego/kitex/pkg/circuitbreak"
"github.com/cloudwego/kitex/pkg/gofunc"
"github.com/cloudwego/kitex/pkg/kerrors"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/utils"
)
@@ -48,16 +47,17 @@ func newBackupRetryer(policy Policy, cbC *cbContainer) (Retryer, error) {

type backupRetryer struct {
enable bool
retryDelay time.Duration
policy *BackupPolicy
cbContainer *cbContainer
retryDelay time.Duration
sync.RWMutex
errMsg string
}

type resultWrapper struct {
ri rpcinfo.RPCInfo
err error
ri rpcinfo.RPCInfo
resp interface{}
err error
}

// ShouldRetry implements the Retryer interface.
@@ -92,6 +92,7 @@ func (r *backupRetryer) Do(ctx context.Context, rpcCall RPCCallFunc, firstRI rpc
retryTimes := r.policy.StopPolicy.MaxRetryTimes
retryDelay := r.retryDelay
r.RUnlock()

var callTimes int32 = 0
var callCosts utils.StringBuilder
callCosts.RawStringBuilder().Grow(32)
@@ -126,7 +127,7 @@ func (r *backupRetryer) Do(ctx context.Context, rpcCall RPCCallFunc, firstRI rpc
if panicInfo := recover(); panicInfo != nil {
e = panicToErr(ctx, panicInfo, firstRI)
}
done <- &resultWrapper{cRI, e}
done <- &resultWrapper{ri: cRI, err: e}
}()
ct := atomic.AddInt32(&callTimes, 1)
callStart := time.Now()
@@ -178,29 +179,21 @@ func (r *backupRetryer) UpdatePolicy(rp Policy) (err error) {
r.Unlock()
return nil
}
var errMsg string
if rp.BackupPolicy == nil || rp.Type != BackupType {
errMsg = "BackupPolicy is nil or retry type not match, cannot do update in backupRetryer"
err = errors.New(errMsg)
err = errors.New("BackupPolicy is nil or retry type not match, cannot do update in backupRetryer")
}
if errMsg == "" && (rp.BackupPolicy.RetryDelayMS == 0 || rp.BackupPolicy.StopPolicy.MaxRetryTimes < 0 ||
rp.BackupPolicy.StopPolicy.MaxRetryTimes > maxBackupRetryTimes) {
errMsg = "invalid backup request delay duration or retryTimes"
err = errors.New(errMsg)
if err == nil && rp.BackupPolicy.RetryDelayMS == 0 {
err = errors.New("invalid retry delay duration in backupRetryer")
}
if errMsg == "" {
if e := checkCBErrorRate(&rp.BackupPolicy.StopPolicy.CBPolicy); e != nil {
rp.BackupPolicy.StopPolicy.CBPolicy.ErrorRate = defaultCBErrRate
errMsg = fmt.Sprintf("backupRetryer %s, use default %0.2f", e.Error(), defaultCBErrRate)
klog.Warnf(errMsg)
}
if err == nil {
err = checkStopPolicy(&rp.BackupPolicy.StopPolicy, maxBackupRetryTimes, r)
}

r.Lock()
defer r.Unlock()
r.enable = rp.Enable
if err != nil {
r.errMsg = errMsg
r.errMsg = err.Error()
return err
}
r.policy = rp.BackupPolicy
@@ -220,14 +213,11 @@ func (r *backupRetryer) AppendErrMsgIfNeeded(ctx context.Context, err error, ri
func (r *backupRetryer) Dump() map[string]interface{} {
r.RLock()
defer r.RUnlock()
dm := map[string]interface{}{"enable": r.enable, "backup_request": r.policy}
if r.errMsg != "" {
return map[string]interface{}{
"enable": r.enable,
"backupRequest": r.policy,
"errMsg": r.errMsg,
}
dm["err_msg"] = r.errMsg
}
return map[string]interface{}{"enable": r.enable, "backupRequest": r.policy}
return dm
}

// Type implements the Retryer interface.
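Since Dump now collects its fields into a single map and reports the failure reason under the "err_msg" key, the output can be serialized directly when debugging a retryer. A small sketch, assuming the exported retry.Retryer interface includes Dump(), as the "implements the Retryer interface" comments above indicate:

package retryexample

import (
	"encoding/json"

	"github.com/cloudwego/kitex/pkg/retry"
)

// dumpRetryer renders a retryer's state for logging, e.g. for a backup retryer:
// {"enable":true,"backup_request":{...},"err_msg":"..."}
func dumpRetryer(r retry.Retryer) (string, error) {
	b, err := json.MarshalIndent(r.Dump(), "", "  ")
	if err != nil {
		return "", err
	}
	return string(b), nil
}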
172 changes: 87 additions & 85 deletions pkg/retry/failure_retryer.go
@@ -33,19 +33,18 @@ import (
)

func newFailureRetryer(policy Policy, r *ShouldResultRetry, cbC *cbContainer) (Retryer, error) {
fr := &failureRetryer{specifiedResultRetry: r, cbContainer: cbC}
fr := &failureRetryer{failureCommon: &failureCommon{specifiedResultRetry: r}, cbContainer: cbC}
if err := fr.UpdatePolicy(policy); err != nil {
return nil, fmt.Errorf("newfailureRetryer failed, err=%w", err)
}
return fr, nil
}

type failureRetryer struct {
enable bool
policy *FailurePolicy
backOff BackOff
cbContainer *cbContainer
specifiedResultRetry *ShouldResultRetry
enable bool
*failureCommon
policy *FailurePolicy
cbContainer *cbContainer
sync.RWMutex
errMsg string
}
@@ -137,20 +136,8 @@ func (r *failureRetryer) Do(ctx context.Context, rpcCall RPCCallFunc, firstRI rp
if !r.cbContainer.enablePercentageLimit && r.cbContainer.cbStat {
circuitbreak.RecordStat(ctx, req, nil, err, cbKey, r.cbContainer.cbCtl, r.cbContainer.cbPanel)
}
if err == nil {
if r.policy.isRespRetry(ctx, resp, cRI) {
// user specified resp to do retry
continue
}
if !r.isRetryResult(ctx, cRI, resp, err, r.policy) {
break
} else {
if i == retryTimes {
// stop retry then wrap error
err = kerrors.ErrRetry.WithCause(err)
} else if !r.isRetryErr(ctx, err, cRI) {
// not timeout or user specified error won't do retry
break
}
}
}
recordRetryInfo(cRI, callTimes, callCosts.String())
@@ -168,32 +155,21 @@ func (r *failureRetryer) UpdatePolicy(rp Policy) (err error) {
r.Unlock()
return nil
}
var errMsg string
if rp.FailurePolicy == nil || rp.Type != FailureType {
errMsg = "FailurePolicy is nil or retry type not match, cannot do update in failureRetryer"
err = errors.New(errMsg)
}
rt := rp.FailurePolicy.StopPolicy.MaxRetryTimes
if errMsg == "" && (rt < 0 || rt > maxFailureRetryTimes) {
errMsg = fmt.Sprintf("invalid failure MaxRetryTimes[%d]", rt)
err = errors.New(errMsg)
err = errors.New("FailurePolicy is nil or retry type not match, cannot do update in failureRetryer")
}
if errMsg == "" {
if e := checkCBErrorRate(&rp.FailurePolicy.StopPolicy.CBPolicy); e != nil {
rp.FailurePolicy.StopPolicy.CBPolicy.ErrorRate = defaultCBErrRate
errMsg = fmt.Sprintf("failureRetryer %s, use default %0.2f", e.Error(), defaultCBErrRate)
klog.Warnf(errMsg)
}
if err == nil {
err = checkStopPolicy(&rp.FailurePolicy.StopPolicy, maxFailureRetryTimes, r)
}
r.Lock()
defer r.Unlock()
r.enable = rp.Enable
if err != nil {
r.errMsg = errMsg
r.errMsg = err.Error()
return err
}
r.policy = rp.FailurePolicy
r.setSpecifiedResultRetryIfNeeded(r.specifiedResultRetry)
r.setSpecifiedResultRetryIfNeeded(r.specifiedResultRetry, r.policy)
if bo, e := initBackOff(rp.FailurePolicy.BackOffPolicy); e != nil {
r.errMsg = fmt.Sprintf("failureRetryer update BackOffPolicy failed, err=%s", e.Error())
klog.Warnf(r.errMsg)
@@ -205,7 +181,7 @@

// AppendErrMsgIfNeeded implements the Retryer interface.
func (r *failureRetryer) AppendErrMsgIfNeeded(ctx context.Context, err error, ri rpcinfo.RPCInfo, msg string) {
if r.isRetryErr(ctx, err, ri) {
if r.isRetryErr(ctx, err, ri, r.policy) {
// Add additional reason when retry is not applied.
appendErrMsg(err, msg)
}
@@ -216,7 +192,52 @@ func (r *failureRetryer) Prepare(ctx context.Context, prevRI, retryRI rpcinfo.RP
handleRetryInstance(r.policy.RetrySameNode, prevRI, retryRI)
}

func (r *failureRetryer) isRetryErr(ctx context.Context, err error, ri rpcinfo.RPCInfo) bool {
// Type implements the Retryer interface.
func (r *failureRetryer) Type() Type {
return FailureType
}

// Dump implements the Retryer interface.
func (r *failureRetryer) Dump() map[string]interface{} {
r.RLock()
defer r.RUnlock()
dm := make(map[string]interface{})
dm["enable"] = r.enable
dm["failure_retry"] = r.policy
if r.policy != nil {
dm["specified_result_retry"] = r.dumpSpecifiedResultRetry(*r.policy)
}
if r.errMsg != "" {
dm["err_msg"] = r.errMsg
}
return dm
}

type failureCommon struct {
backOff BackOff
specifiedResultRetry *ShouldResultRetry
}

func (f *failureCommon) setSpecifiedResultRetryIfNeeded(rr *ShouldResultRetry, fp *FailurePolicy) {
if rr != nil {
// save the object specified by client.WithSpecifiedResultRetry(..)
f.specifiedResultRetry = rr
}
if fp != nil {
if f.specifiedResultRetry != nil {
// The priority of client.WithSpecifiedResultRetry(..) is higher, so always update it
// NOTE: client.WithSpecifiedResultRetry(..) will always reject a nil object
fp.ShouldResultRetry = f.specifiedResultRetry
}

// even if the rr passed to this func is nil,
// the Policy may also have ShouldResultRetry from client.WithFailureRetry or callopt.WithRetryPolicy.
// convertResultRetry is used to convert 'ErrorRetry and RespRetry' to 'ErrorRetryWithCtx and RespRetryWithCtx'
fp.convertResultRetry()
}
}
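To make the priority rule above concrete, here is a hedged sketch of supplying the user-specified predicate via client.WithSpecifiedResultRetry, the object that setSpecifiedResultRetryIfNeeded always prefers. The ErrorRetryWithCtx/RespRetryWithCtx field names come from the comment above; their exact signatures are assumptions.

package retryexample

import (
	"context"

	"github.com/cloudwego/kitex/client"
	"github.com/cloudwego/kitex/pkg/retry"
	"github.com/cloudwego/kitex/pkg/rpcinfo"
)

// specifiedResultRetryOption marks certain errors/responses as retriable.
func specifiedResultRetryOption() client.Option {
	rr := &retry.ShouldResultRetry{
		// Assumed signature: retry when the business considers the error transient.
		ErrorRetryWithCtx: func(ctx context.Context, err error, ri rpcinfo.RPCInfo) bool {
			return err != nil && err.Error() == "transient business failure"
		},
		// Assumed signature: retry when the response itself signals a retriable state.
		RespRetryWithCtx: func(ctx context.Context, resp interface{}, ri rpcinfo.RPCInfo) bool {
			return false // inspect resp here
		},
	}
	return client.WithSpecifiedResultRetry(rr)
}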

func (r *failureCommon) isRetryErr(ctx context.Context, err error, ri rpcinfo.RPCInfo, fp *FailurePolicy) bool {
if err == nil {
return false
}
@@ -225,15 +246,42 @@ func (r *failureRetryer) isRetryErr(ctx context.Context, err error, ri rpcinfo.R
// But CircuitBreak has been checked in ShouldRetry, it doesn't need to filter ServiceCircuitBreak.
// If there are some other specified errors that cannot be retried, it should be filtered here.

if r.policy.isRetryForTimeout() && kerrors.IsTimeoutError(err) {
if fp.isRetryForTimeout() && kerrors.IsTimeoutError(err) {
return true
}
if r.policy.isErrorRetry(ctx, err, ri) {
if fp.isErrorRetry(ctx, err, ri) {
return true
}
return false
}

// isRetryResult checks whether the result needs a retry.
// Version Change Note:
// < v0.11.0: if the last result still failed, the error was wrapped as RetryErr.
// >= v0.11.0: the error is no longer wrapped as RetryErr.
// Consideration: wrapping as RetryErr shows up as a retry error in monitoring, which is not friendly for troubleshooting.
func (r *failureCommon) isRetryResult(ctx context.Context, cRI rpcinfo.RPCInfo, resp interface{}, err error, fp *FailurePolicy) bool {
if err == nil {
if fp.isRespRetry(ctx, resp, cRI) {
// user specified resp to do retry
return true
}
} else if r.isRetryErr(ctx, err, cRI, fp) {
return true
}
return false
}

func (r *failureCommon) dumpSpecifiedResultRetry(fp FailurePolicy) map[string]bool {
return map[string]bool{
"error_retry": fp.isErrorRetryWithCtxNonNil(),
"resp_retry": fp.isRespRetryWithCtxNonNil(),
// keep it for some versions to confirm the correctness when troubleshooting
"old_error_retry": fp.isErrorRetryNonNil(),
"old_resp_retry": fp.isRespRetryNonNil(),
}
}

func initBackOff(policy *BackOffPolicy) (bo BackOff, err error) {
bo = NoneBackOff
if policy == nil {
@@ -268,49 +316,3 @@ func initBackOff(policy *BackOffPolicy) (bo BackOff, err error) {
}
return
}

// Type implements the Retryer interface.
func (r *failureRetryer) Type() Type {
return FailureType
}

// Dump implements the Retryer interface.
func (r *failureRetryer) Dump() map[string]interface{} {
r.RLock()
defer r.RUnlock()
dm := make(map[string]interface{})
dm["enable"] = r.enable
dm["failure_retry"] = r.policy
if r.policy != nil {
dm["specified_result_retry"] = map[string]bool{
"error_retry": r.policy.isErrorRetryWithCtxNonNil(),
"resp_retry": r.policy.isRespRetryWithCtxNonNil(),
// keep it for some versions to confirm the correctness when troubleshooting
"old_error_retry": r.policy.isErrorRetryNonNil(),
"old_resp_retry": r.policy.isRespRetryNonNil(),
}
}
if r.errMsg != "" {
dm["errMsg"] = r.errMsg
}
return dm
}

func (r *failureRetryer) setSpecifiedResultRetryIfNeeded(rr *ShouldResultRetry) {
if rr != nil {
// save the object specified by client.WithSpecifiedResultRetry(..)
r.specifiedResultRetry = rr
}
if r.policy != nil {
if r.specifiedResultRetry != nil {
// The priority of client.WithSpecifiedResultRetry(..) is higher, so always update it
// NOTE: client.WithSpecifiedResultRetry(..) will always reject a nil object
r.policy.ShouldResultRetry = r.specifiedResultRetry
}

// even though rr passed from this func is nil,
// the Policy may also have ShouldResultRetry from client.WithFailureRetry or callopt.WithRetryPolicy.
// convertResultRetry is used to convert 'ErrorRetry and RespRetry' to 'ErrorRetryWithCtx and RespRetryWithCtx'
r.policy.convertResultRetry()
}
}
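The point of extracting failureCommon is that the shared retry-decision helpers (isRetryErr, isRetryResult, backOff, and the specified-result-retry bookkeeping) are no longer tied to failureRetryer and can be embedded by other retryers in this package, including the mixed retryer added elsewhere in this commit. A purely hypothetical sketch of that embedding shape, inside package retry; the real mixedRetryer's fields are not shown in this diff.

// Hypothetical, for illustration only (package retry already imports sync and time).
type exampleMixedRetryer struct {
	enable bool
	*failureCommon // shared helpers: isRetryErr, isRetryResult, backOff
	policy      *MixedPolicy
	retryDelay  time.Duration
	cbContainer *cbContainer
	sync.RWMutex
	errMsg string
}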