Skip to content

Commit

Permalink
Add graceful shutdown (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaeldiscky authored Apr 22, 2024
1 parent 579a869 commit 4d6b197
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 23 deletions.
5 changes: 5 additions & 0 deletions gapi/rpc_create_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/raphaeldiscky/simple-bank/util"
"github.com/raphaeldiscky/simple-bank/val"
"github.com/raphaeldiscky/simple-bank/worker"
"github.com/rs/zerolog/log"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -61,6 +62,9 @@ func (server *Server) CreateUser(ctx context.Context, req *pb.CreateUserRequest)
},
}

log.Info().Msg(">> creating user ...")
time.Sleep(10 * time.Second)

txResult, err := server.store.CreateUserTx(ctx, arg)
if err != nil {
if db.ErrorCode(err) == db.UniqueViolation {
Expand All @@ -73,6 +77,7 @@ func (server *Server) CreateUser(ctx context.Context, req *pb.CreateUserRequest)
User: convertUser(txResult.User),
}

log.Info().Msg(">> done creating user.")
return rsp, nil
}

Expand Down
124 changes: 101 additions & 23 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ package main

import (
"context"
"errors"
"net"
"net/http"
"os"
"os/signal"
"syscall"

"github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/database/postgres"
_ "github.com/golang-migrate/migrate/v4/source/file"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/hibiken/asynq"
"github.com/jackc/pgx/v5/pgxpool"
"golang.org/x/sync/errgroup"

"github.com/rakyll/statik/fs"
"github.com/raphaeldiscky/simple-bank/api"
Expand All @@ -29,6 +33,12 @@ import (
"google.golang.org/protobuf/encoding/protojson"
)

var interruptSignals = []os.Signal{
os.Interrupt,
syscall.SIGTERM,
syscall.SIGQUIT,
}

func main() {

config, err := util.LoadConfig(".")
Expand All @@ -40,7 +50,10 @@ func main() {
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
}

connPool, err := pgxpool.New(context.Background(), config.DBSource)
ctx, stop := signal.NotifyContext(context.Background(), interruptSignals...)
defer stop()

connPool, err := pgxpool.New(ctx, config.DBSource)
if err != nil {
log.Fatal().Err(err).Err(err).Msg("cannot connect to db:")
}
Expand All @@ -55,9 +68,15 @@ func main() {

taskDistributor := worker.NewRedisTaskDistributor(redisOpt)

go runTaskProcessor(config, redisOpt, store)
go runGatewayServer(config, store, taskDistributor)
runGrpcServer(config, store, taskDistributor)
waitGroup, ctx := errgroup.WithContext(ctx)
runTaskProcessor(ctx, waitGroup, config, redisOpt, store)
runGatewayServer(ctx, waitGroup, config, store, taskDistributor)
runGrpcServer(ctx, waitGroup, config, store, taskDistributor)

err = waitGroup.Wait()
if err != nil {
log.Fatal().Err(err).Msg("error from wait group")
}
}

func runDBMigration(migrationURL string, dbSource string) {
Expand All @@ -73,17 +92,37 @@ func runDBMigration(migrationURL string, dbSource string) {
log.Info().Msg("db migrated successfully")
}

func runTaskProcessor(config util.Config, redisOpt asynq.RedisClientOpt, store db.Store) {
func runTaskProcessor(
ctx context.Context,
waitGroup *errgroup.Group,
config util.Config,
redisOpt asynq.RedisClientOpt,
store db.Store,
) {
mailer := mail.NewGmailSender(config.EmailSenderName, config.EmailSenderAddress, config.EmailSenderPassword)
taskProcessor := worker.NewRedisTaskProcessor(redisOpt, store, mailer)
log.Info().Msg("start task processor")
err := taskProcessor.Start()
if err != nil {
log.Fatal().Err(err).Msg("failed to start task processor:")
}

waitGroup.Go(func() error {
<-ctx.Done()
log.Info().Msg("graceful shutdown task processor")
taskProcessor.Shutdown()
log.Info().Msg("task processor is stop")
return nil
})
}

func runGatewayServer(config util.Config, store db.Store, taskDistributor worker.TaskDistributor) {
func runGatewayServer(
ctx context.Context,
waitGroup *errgroup.Group,
config util.Config,
store db.Store,
taskDistributor worker.TaskDistributor,
) {
server, err := gapi.NewServer(config, store, taskDistributor)

if err != nil {
Expand All @@ -101,8 +140,7 @@ func runGatewayServer(config util.Config, store db.Store, taskDistributor worker
})

grpcMux := runtime.NewServeMux(jsonOption)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err = pb.RegisterSimpleBankHandlerServer(ctx, grpcMux, server)
if err != nil {
log.Fatal().Err(err).Msg("cannot register handler:")
Expand All @@ -122,21 +160,46 @@ func runGatewayServer(config util.Config, store db.Store, taskDistributor worker
// swaggerHandler := http.StripPrefix("/swagger/", http.FileServer(statikFS))
mux.Handle("/swagger/", http.StripPrefix("/swagger/", http.FileServer(statikFS)))

listener, err := net.Listen("tcp", config.HTTPServerAddress)
if err != nil {
log.Fatal().Err(err).Msg("cannot create listener:")
httpServer := &http.Server{
Handler: gapi.HttpLogger(mux),
Addr: config.HTTPServerAddress,
}

log.Info().Msgf("start HTTP gateway server on %s", listener.Addr().String())
handler := gapi.HttpLogger(mux)
err = http.Serve(listener, handler)
if err != nil {
log.Fatal().Err(err).Msg("cannot start HTTP gateway server:")
}
waitGroup.Go(func() error {
log.Info().Msgf("start HTTP gateway server on %s", httpServer.Addr)

err = httpServer.ListenAndServe()
if err != nil {
if errors.Is(err, http.ErrServerClosed) {
return nil
}
log.Error().Err(err).Msg("cannot start HTTP gateway server:")
return err
}
return nil
})

waitGroup.Go(func() error {
<-ctx.Done()
log.Info().Msg("graceful shutdown HTTP gateway server")
err := httpServer.Shutdown(ctx)
if err != nil {
log.Error().Err(err).Msg("failed to shutdown HTTP gateway server:")
return err
}
log.Info().Msg("HTTP gateway server is stop")
return nil
})

}

func runGrpcServer(config util.Config, store db.Store, taskDistributor worker.TaskDistributor) {
func runGrpcServer(
ctx context.Context,
waitGroup *errgroup.Group,
config util.Config,
store db.Store,
taskDistributor worker.TaskDistributor,
) {
server, err := gapi.NewServer(config, store, taskDistributor)

if err != nil {
Expand All @@ -152,11 +215,26 @@ func runGrpcServer(config util.Config, store db.Store, taskDistributor worker.Ta
log.Fatal().Err(err).Msg("cannot create listener:")
}

log.Info().Msgf("start gRPC server on %s", listener.Addr().String())
err = grpcServer.Serve(listener)
if err != nil {
log.Fatal().Err(err).Msg("cannot start grpc server:")
}
waitGroup.Go(func() error {
log.Info().Msgf("start gRPC server on %s", listener.Addr().String())
err = grpcServer.Serve(listener)
if err != nil {
if errors.Is(err, grpc.ErrServerStopped) {
return nil
}
log.Error().Err(err).Msg("gRPC server failed to serve")
return err
}
return nil
})

waitGroup.Go(func() error {
<-ctx.Done()
log.Info().Msg("graceful shutdown gRPC server...")
grpcServer.GracefulStop()
log.Info().Msg("gRPC server is stopped")
return nil
})

}

Expand Down
5 changes: 5 additions & 0 deletions worker/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (

type TaskProcessor interface {
Start() error
Shutdown()
ProcessTaskSendVerifyEmail(ctx context.Context, task *asynq.Task) error
}

Expand Down Expand Up @@ -60,3 +61,7 @@ func (processor *RedisTaskProcessor) Start() error {
mux.HandleFunc(TaskSendVerifyEmail, processor.ProcessTaskSendVerifyEmail)
return processor.server.Start(mux)
}

func (processor *RedisTaskProcessor) Shutdown() {
processor.server.Stop()
}

0 comments on commit 4d6b197

Please sign in to comment.