Skip to content

Commit

Permalink
Add logger for asynq workers
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaeldiscky committed Apr 21, 2024
1 parent 57a17db commit f30257f
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 4 deletions.
43 changes: 43 additions & 0 deletions worker/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package worker

import (
"context"
"fmt"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

type Logger struct{}

func NewLogger() *Logger {
return &Logger{}
}

func (logger *Logger) Print(level zerolog.Level, args ...interface{}) {
log.WithLevel(level).Msg(fmt.Sprint(args...))
}

func (logger *Logger) Printf(ctx context.Context, format string, v ...interface{}) {
log.WithLevel(zerolog.DebugLevel).Msgf(format, v...)
}

func (logger *Logger) Debug(args ...interface{}) {
logger.Print(zerolog.DebugLevel, args...)
}

func (logger *Logger) Info(args ...interface{}) {
logger.Print(zerolog.InfoLevel, args...)
}

func (logger *Logger) Warn(args ...interface{}) {
logger.Print(zerolog.WarnLevel, args...)
}

func (logger *Logger) Error(args ...interface{}) {
logger.Print(zerolog.ErrorLevel, args...)
}

func (logger *Logger) Fatal(args ...interface{}) {
logger.Print(zerolog.FatalLevel, args...)
}
15 changes: 11 additions & 4 deletions worker/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/hibiken/asynq"
db "github.com/raphaeldiscky/simple-bank/db/sqlc"
"github.com/rs/zerolog/log"
)

const (
Expand All @@ -24,17 +25,23 @@ type RedisTaskProcessor struct {

// NewRedisTaskProcessor will pick up the tasks from the Redis queue and process them
func NewRedisTaskProcessor(redisOpt asynq.RedisClientOpt, store db.Store) TaskProcessor {
logger := NewLogger()
server := asynq.NewServer(
redisOpt,
asynq.Config{
// Specify how many workers you want
Concurrency: 10,
// Optionally specify multiple queues with different priority.
// If not specified, uses the default (10).
Queues: map[string]int{
QueueCritical: 6,
QueueDefault: 3,
QueueCritical: 10,
QueueDefault: 5,
},
ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) {
log.Error().Err(err).
Str("type", task.Type()).
Bytes("payload", task.Payload()).
Msg("process task failed")
}),
Logger: logger,
},
)

Expand Down

0 comments on commit f30257f

Please sign in to comment.