diff --git a/gapi/rpc_create_user.go b/gapi/rpc_create_user.go index 3fe5487..5691256 100644 --- a/gapi/rpc_create_user.go +++ b/gapi/rpc_create_user.go @@ -10,6 +10,7 @@ import ( "github.com/raphaeldiscky/simple-bank/util" "github.com/raphaeldiscky/simple-bank/val" "github.com/raphaeldiscky/simple-bank/worker" + "github.com/rs/zerolog/log" "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -61,6 +62,9 @@ func (server *Server) CreateUser(ctx context.Context, req *pb.CreateUserRequest) }, } + log.Info().Msg(">> creating user ...") + time.Sleep(10 * time.Second) + txResult, err := server.store.CreateUserTx(ctx, arg) if err != nil { if db.ErrorCode(err) == db.UniqueViolation { @@ -73,6 +77,7 @@ func (server *Server) CreateUser(ctx context.Context, req *pb.CreateUserRequest) User: convertUser(txResult.User), } + log.Info().Msg(">> done creating user.") return rsp, nil } diff --git a/main.go b/main.go index 96d150a..4f9e706 100644 --- a/main.go +++ b/main.go @@ -2,9 +2,12 @@ package main import ( "context" + "errors" "net" "net/http" "os" + "os/signal" + "syscall" "github.com/golang-migrate/migrate/v4" _ "github.com/golang-migrate/migrate/v4/database/postgres" @@ -12,6 +15,7 @@ import ( "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/hibiken/asynq" "github.com/jackc/pgx/v5/pgxpool" + "golang.org/x/sync/errgroup" "github.com/rakyll/statik/fs" "github.com/raphaeldiscky/simple-bank/api" @@ -29,6 +33,12 @@ import ( "google.golang.org/protobuf/encoding/protojson" ) +var interruptSignals = []os.Signal{ + os.Interrupt, + syscall.SIGTERM, + syscall.SIGQUIT, +} + func main() { config, err := util.LoadConfig(".") @@ -40,7 +50,10 @@ func main() { log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) } - connPool, err := pgxpool.New(context.Background(), config.DBSource) + ctx, stop := signal.NotifyContext(context.Background(), interruptSignals...) + defer stop() + + connPool, err := pgxpool.New(ctx, config.DBSource) if err != nil { log.Fatal().Err(err).Err(err).Msg("cannot connect to db:") } @@ -55,9 +68,15 @@ func main() { taskDistributor := worker.NewRedisTaskDistributor(redisOpt) - go runTaskProcessor(config, redisOpt, store) - go runGatewayServer(config, store, taskDistributor) - runGrpcServer(config, store, taskDistributor) + waitGroup, ctx := errgroup.WithContext(ctx) + runTaskProcessor(ctx, waitGroup, config, redisOpt, store) + runGatewayServer(ctx, waitGroup, config, store, taskDistributor) + runGrpcServer(ctx, waitGroup, config, store, taskDistributor) + + err = waitGroup.Wait() + if err != nil { + log.Fatal().Err(err).Msg("error from wait group") + } } func runDBMigration(migrationURL string, dbSource string) { @@ -73,7 +92,13 @@ func runDBMigration(migrationURL string, dbSource string) { log.Info().Msg("db migrated successfully") } -func runTaskProcessor(config util.Config, redisOpt asynq.RedisClientOpt, store db.Store) { +func runTaskProcessor( + ctx context.Context, + waitGroup *errgroup.Group, + config util.Config, + redisOpt asynq.RedisClientOpt, + store db.Store, +) { mailer := mail.NewGmailSender(config.EmailSenderName, config.EmailSenderAddress, config.EmailSenderPassword) taskProcessor := worker.NewRedisTaskProcessor(redisOpt, store, mailer) log.Info().Msg("start task processor") @@ -81,9 +106,23 @@ func runTaskProcessor(config util.Config, redisOpt asynq.RedisClientOpt, store d if err != nil { log.Fatal().Err(err).Msg("failed to start task processor:") } + + waitGroup.Go(func() error { + <-ctx.Done() + log.Info().Msg("graceful shutdown task processor") + taskProcessor.Shutdown() + log.Info().Msg("task processor is stop") + return nil + }) } -func runGatewayServer(config util.Config, store db.Store, taskDistributor worker.TaskDistributor) { +func runGatewayServer( + ctx context.Context, + waitGroup *errgroup.Group, + config util.Config, + store db.Store, + taskDistributor worker.TaskDistributor, +) { server, err := gapi.NewServer(config, store, taskDistributor) if err != nil { @@ -101,8 +140,7 @@ func runGatewayServer(config util.Config, store db.Store, taskDistributor worker }) grpcMux := runtime.NewServeMux(jsonOption) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + err = pb.RegisterSimpleBankHandlerServer(ctx, grpcMux, server) if err != nil { log.Fatal().Err(err).Msg("cannot register handler:") @@ -122,21 +160,46 @@ func runGatewayServer(config util.Config, store db.Store, taskDistributor worker // swaggerHandler := http.StripPrefix("/swagger/", http.FileServer(statikFS)) mux.Handle("/swagger/", http.StripPrefix("/swagger/", http.FileServer(statikFS))) - listener, err := net.Listen("tcp", config.HTTPServerAddress) - if err != nil { - log.Fatal().Err(err).Msg("cannot create listener:") + httpServer := &http.Server{ + Handler: gapi.HttpLogger(mux), + Addr: config.HTTPServerAddress, } - log.Info().Msgf("start HTTP gateway server on %s", listener.Addr().String()) - handler := gapi.HttpLogger(mux) - err = http.Serve(listener, handler) - if err != nil { - log.Fatal().Err(err).Msg("cannot start HTTP gateway server:") - } + waitGroup.Go(func() error { + log.Info().Msgf("start HTTP gateway server on %s", httpServer.Addr) + + err = httpServer.ListenAndServe() + if err != nil { + if errors.Is(err, http.ErrServerClosed) { + return nil + } + log.Error().Err(err).Msg("cannot start HTTP gateway server:") + return err + } + return nil + }) + + waitGroup.Go(func() error { + <-ctx.Done() + log.Info().Msg("graceful shutdown HTTP gateway server") + err := httpServer.Shutdown(ctx) + if err != nil { + log.Error().Err(err).Msg("failed to shutdown HTTP gateway server:") + return err + } + log.Info().Msg("HTTP gateway server is stop") + return nil + }) } -func runGrpcServer(config util.Config, store db.Store, taskDistributor worker.TaskDistributor) { +func runGrpcServer( + ctx context.Context, + waitGroup *errgroup.Group, + config util.Config, + store db.Store, + taskDistributor worker.TaskDistributor, +) { server, err := gapi.NewServer(config, store, taskDistributor) if err != nil { @@ -152,11 +215,26 @@ func runGrpcServer(config util.Config, store db.Store, taskDistributor worker.Ta log.Fatal().Err(err).Msg("cannot create listener:") } - log.Info().Msgf("start gRPC server on %s", listener.Addr().String()) - err = grpcServer.Serve(listener) - if err != nil { - log.Fatal().Err(err).Msg("cannot start grpc server:") - } + waitGroup.Go(func() error { + log.Info().Msgf("start gRPC server on %s", listener.Addr().String()) + err = grpcServer.Serve(listener) + if err != nil { + if errors.Is(err, grpc.ErrServerStopped) { + return nil + } + log.Error().Err(err).Msg("gRPC server failed to serve") + return err + } + return nil + }) + + waitGroup.Go(func() error { + <-ctx.Done() + log.Info().Msg("graceful shutdown gRPC server...") + grpcServer.GracefulStop() + log.Info().Msg("gRPC server is stopped") + return nil + }) } diff --git a/worker/processor.go b/worker/processor.go index 68a178d..21e81ed 100644 --- a/worker/processor.go +++ b/worker/processor.go @@ -16,6 +16,7 @@ const ( type TaskProcessor interface { Start() error + Shutdown() ProcessTaskSendVerifyEmail(ctx context.Context, task *asynq.Task) error } @@ -60,3 +61,7 @@ func (processor *RedisTaskProcessor) Start() error { mux.HandleFunc(TaskSendVerifyEmail, processor.ProcessTaskSendVerifyEmail) return processor.server.Start(mux) } + +func (processor *RedisTaskProcessor) Shutdown() { + processor.server.Stop() +}