Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: pass custom log to pubsub #3016

Open
wants to merge 2 commits into
base: v8
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1070,7 +1070,7 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
for _, node := range nodes {
_, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
if err != nil {
internal.Logger.Printf(c.Context(), "ReapStaleConns failed: %s", err)
internal.Logger.Printf("ReapStaleConns failed: %s", err)
}
}
}
Expand Down Expand Up @@ -1622,7 +1622,7 @@ func (c *ClusterClient) cmdInfo(name string) *CommandInfo {

info := cmdsInfo[name]
if info == nil {
internal.Logger.Printf(c.Context(), "info for cmd=%s not found", name)
internal.Logger.Printf("info for cmd=%s not found", name)
}
return info
}
Expand Down
2 changes: 0 additions & 2 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ func usePrecise(dur time.Duration) bool {
func formatMs(ctx context.Context, dur time.Duration) int64 {
if dur > 0 && dur < time.Millisecond {
internal.Logger.Printf(
ctx,
"specified duration is %s, but minimal supported value is %s - truncating to 1ms",
dur, time.Millisecond,
)
Expand All @@ -35,7 +34,6 @@ func formatMs(ctx context.Context, dur time.Duration) int64 {
func formatSec(ctx context.Context, dur time.Duration) int64 {
if dur > 0 && dur < time.Second {
internal.Logger.Printf(
ctx,
"specified duration is %s, but minimal supported value is %s - truncating to 1s",
dur, time.Second,
)
Expand Down
2 changes: 1 addition & 1 deletion export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (state *clusterState) IsConsistent(ctx context.Context) bool {
func GetSlavesAddrByName(ctx context.Context, c *SentinelClient, name string) []string {
addrs, err := c.Slaves(ctx, name).Result()
if err != nil {
internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s",
internal.Logger.Printf("sentinel: Slaves name=%q failed: %s",
name, err)
return []string{}
}
Expand Down
5 changes: 2 additions & 3 deletions internal/log.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
package internal

import (
"context"
"fmt"
"log"
"os"
)

type Logging interface {
Printf(ctx context.Context, format string, v ...interface{})
Printf(format string, v ...interface{})
}

type logger struct {
log *log.Logger
}

func (l *logger) Printf(ctx context.Context, format string, v ...interface{}) {
func (l *logger) Printf(format string, v ...interface{}) {
_ = l.log.Output(2, fmt.Sprintf(format, v...))
}

Expand Down
4 changes: 2 additions & 2 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func (p *ConnPool) popIdle() (*Conn, error) {

func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
if cn.rd.Buffered() > 0 {
internal.Logger.Printf(ctx, "Conn has unread data")
internal.Logger.Printf("Conn has unread data")
p.Remove(ctx, cn, BadConnError{})
return
}
Expand Down Expand Up @@ -492,7 +492,7 @@ func (p *ConnPool) reaper(frequency time.Duration) {
}
_, err := p.ReapStaleConns()
if err != nil {
internal.Logger.Printf(context.Background(), "ReapStaleConns failed: %s", err)
internal.Logger.Printf("ReapStaleConns failed: %s", err)
continue
}
case <-p.closedCh:
Expand Down
23 changes: 16 additions & 7 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (c *PubSub) closeTheCn(reason error) error {
return nil
}
if !c.closed {
internal.Logger.Printf(c.getContext(), "redis: discarding bad PubSub connection: %s", reason)
internal.Logger.Printf("redis: discarding bad PubSub connection: %s", reason)
}
err := c.closeConn(c.cn)
c.cn = nil
Expand Down Expand Up @@ -470,6 +470,12 @@ func (c *PubSub) ChannelWithSubscriptions(_ context.Context, size int) <-chan in

type ChannelOption func(c *channel)

func WithLogger(logger internal.Logging) ChannelOption {
return func(c *channel) {
c.logger = logger
}
}

// WithChannelSize specifies the Go chan size that is used to buffer incoming messages.
//
// The default is 100 messages.
Expand Down Expand Up @@ -510,6 +516,8 @@ type channel struct {
chanSize int
chanSendTimeout time.Duration
checkInterval time.Duration

logger internal.Logging
}

func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel {
Expand All @@ -519,6 +527,7 @@ func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel {
chanSize: 100,
chanSendTimeout: time.Minute,
checkInterval: 3 * time.Second,
logger: internal.Logger,
}
for _, opt := range opts {
opt(c)
Expand Down Expand Up @@ -602,12 +611,12 @@ func (c *channel) initMsgChan() {
<-timer.C
}
case <-timer.C:
internal.Logger.Printf(
ctx, "redis: %s channel is full for %s (message is dropped)",
c.logger.Printf(
"redis: %+v channel is full for %s (message is dropped)",
c, c.chanSendTimeout)
}
default:
internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
c.logger.Printf("redis: unknown message type: %T", msg)
}
}
}()
Expand Down Expand Up @@ -656,12 +665,12 @@ func (c *channel) initAllChan() {
<-timer.C
}
case <-timer.C:
internal.Logger.Printf(
ctx, "redis: %s channel is full for %s (message is dropped)",
c.logger.Printf(
"redis: %+v channel is full for %s (message is dropped)",
c, c.chanSendTimeout)
}
default:
internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
c.logger.Printf("redis: unknown message type: %T", msg)
}
}
}()
Expand Down
4 changes: 2 additions & 2 deletions ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (c *ringShards) Heartbeat(frequency time.Duration) {
err := shard.Client.Ping(ctx).Err()
isUp := err == nil || err == pool.ErrPoolTimeout
if shard.Vote(isUp) {
internal.Logger.Printf(context.Background(), "ring shard state changed: %s", shard)
internal.Logger.Printf("ring shard state changed: %s", shard)
rebalance = true
}
}
Expand Down Expand Up @@ -576,7 +576,7 @@ func (c *Ring) cmdInfo(ctx context.Context, name string) *CommandInfo {
}
info := cmdsInfo[name]
if info == nil {
internal.Logger.Printf(ctx, "info for cmd=%s not found", name)
internal.Logger.Printf("info for cmd=%s not found", name)
}
return info
}
Expand Down
16 changes: 8 additions & 8 deletions sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {

masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
if err != nil {
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s",
internal.Logger.Printf("sentinel: GetMasterAddrByName master=%q failed: %s",
c.opt.MasterName, err)
_ = sentinel.Close()
continue
Expand Down Expand Up @@ -558,7 +558,7 @@ func (c *sentinelFailover) slaveAddrs(ctx context.Context, useDisconnected bool)

slaves, err := sentinel.Slaves(ctx, c.opt.MasterName).Result()
if err != nil {
internal.Logger.Printf(ctx, "sentinel: Slaves master=%q failed: %s",
internal.Logger.Printf("sentinel: Slaves master=%q failed: %s",
c.opt.MasterName, err)
_ = sentinel.Close()
continue
Expand All @@ -584,7 +584,7 @@ func (c *sentinelFailover) slaveAddrs(ctx context.Context, useDisconnected bool)
func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) string {
addr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
if err != nil {
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName name=%q failed: %s",
internal.Logger.Printf("sentinel: GetMasterAddrByName name=%q failed: %s",
c.opt.MasterName, err)
return ""
}
Expand All @@ -594,7 +594,7 @@ func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *Sentinel
func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *SentinelClient) []string {
addrs, err := sentinel.Slaves(ctx, c.opt.MasterName).Result()
if err != nil {
internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s",
internal.Logger.Printf("sentinel: Slaves name=%q failed: %s",
c.opt.MasterName, err)
return []string{}
}
Expand Down Expand Up @@ -658,7 +658,7 @@ func (c *sentinelFailover) trySwitchMaster(ctx context.Context, addr string) {
}
c._masterAddr = addr

internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q",
internal.Logger.Printf("sentinel: new master=%q addr=%q",
c.opt.MasterName, addr)
if c.onFailover != nil {
c.onFailover(ctx, addr)
Expand All @@ -679,7 +679,7 @@ func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelCl
func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
sentinels, err := c.sentinel.Sentinels(ctx, c.opt.MasterName).Result()
if err != nil {
internal.Logger.Printf(ctx, "sentinel: Sentinels master=%q failed: %s", c.opt.MasterName, err)
internal.Logger.Printf("sentinel: Sentinels master=%q failed: %s", c.opt.MasterName, err)
return
}
for _, sentinel := range sentinels {
Expand All @@ -697,7 +697,7 @@ func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
if ip != "" && port != "" {
sentinelAddr := net.JoinHostPort(ip, port)
if !contains(c.sentinelAddrs, sentinelAddr) {
internal.Logger.Printf(ctx, "sentinel: discovered new sentinel=%q for master=%q",
internal.Logger.Printf("sentinel: discovered new sentinel=%q for master=%q",
sentinelAddr, c.opt.MasterName)
c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
}
Expand All @@ -717,7 +717,7 @@ func (c *sentinelFailover) listen(pubsub *PubSub) {
if msg.Channel == "+switch-master" {
parts := strings.Split(msg.Payload, " ")
if parts[0] != c.opt.MasterName {
internal.Logger.Printf(pubsub.getContext(), "sentinel: ignore addr for master=%q", parts[0])
internal.Logger.Printf("sentinel: ignore addr for master=%q", parts[0])
continue
}
addr := net.JoinHostPort(parts[3], parts[4])
Expand Down
Loading