diff --git a/server/client.go b/server/client.go index b308389f7b..309eafeea3 100644 --- a/server/client.go +++ b/server/client.go @@ -262,6 +262,9 @@ type client struct { last time.Time lastIn time.Time + repliesSincePrune uint16 + lastReplyPrune time.Time + headers bool rtt time.Duration @@ -422,6 +425,7 @@ const ( pruneSize = 32 routeTargetInit = 8 replyPermLimit = 4096 + replyPruneTime = time.Second ) // Represent read cache booleans with a bitmask @@ -3634,9 +3638,11 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su // If we are tracking dynamic publish permissions that track reply subjects, // do that accounting here. We only look at client.replies which will be non-nil. - if client.replies != nil && len(reply) > 0 { + // Only reply subject permissions if the client is not already allowed to publish to the reply subject. + if client.replies != nil && len(reply) > 0 && !client.pubAllowedFullCheck(string(reply), true, true) { client.replies[string(reply)] = &resp{time.Now(), 0} - if len(client.replies) > replyPermLimit { + client.repliesSincePrune++ + if client.repliesSincePrune > replyPermLimit || time.Since(client.lastReplyPrune) > replyPruneTime { client.pruneReplyPerms() } } @@ -3760,6 +3766,9 @@ func (c *client) pruneReplyPerms() { delete(c.replies, k) } } + + c.repliesSincePrune = 0 + c.lastReplyPrune = now } // pruneDenyCache will prune the deny cache via randomly @@ -3828,7 +3837,7 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bo allowed = np == 0 } - // If we are currently not allowed but we are tracking reply subjects + // If we are tracking reply subjects // dynamically, check to see if we are allowed here but avoid pcache. // We need to acquire the lock though. if !allowed && fullCheck && c.perms.resp != nil {