Skip to content

Commit

Permalink
[all] added webserver to OTNS main wait-group to ensure webserver exi…
Browse files Browse the repository at this point in the history
…t is waited on; added logging for exit status; parallel exit strategy for OT nodes.
  • Loading branch information
EskoDijk committed Aug 21, 2023
1 parent b73ff09 commit fcc72c0
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 28 deletions.
4 changes: 2 additions & 2 deletions cli/runner.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020, The OTNS Authors.
// Copyright (c) 2020-2023, The OTNS Authors.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -38,7 +38,7 @@ var (
)

func Run(cr *CmdRunner, cliOptions *runcli.CliOptions) error {
defer simplelogger.Debugf("CLI exit")
defer simplelogger.Debugf("CLI exit.")

return runcli.RunCli(cr, cliOptions)
}
Expand Down
7 changes: 5 additions & 2 deletions cmd/otns-replay/otns_replay.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022, The OTNS Authors.
// Copyright (c) 2022-2023, The OTNS Authors.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -30,6 +30,7 @@ import (
"context"
"flag"
"net"
"net/http"
"os"

"github.com/openthread/ot-ns/progctx"
Expand Down Expand Up @@ -72,7 +73,9 @@ func main() {
go func() {
siteAddr := ":8997"
err := webSite.Serve(siteAddr)
simplelogger.PanicIfError(err)
if err != http.ErrServerClosed {
simplelogger.PanicIfError(err)
}
}()

go func() {
Expand Down
14 changes: 6 additions & 8 deletions dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (d *Dispatcher) Stop() {
d.GoCancel() // cancel current simulation period
d.vis.Stop()
close(d.pcapFrameChan)
simplelogger.Debugf("waiting for dispatcher threads exit ...")
simplelogger.Debugf("waiting for dispatcher threads to stop ...")
d.waitGroup.Wait()
simplelogger.Debugf("dispatcher exit.")
}
Expand Down Expand Up @@ -582,7 +582,8 @@ func (d *Dispatcher) processNextEvent(simSpeed float64) bool {

func (d *Dispatcher) eventsReader() {
defer d.waitGroup.Done()
defer os.RemoveAll(d.socketName) // delete socket file when done.
defer simplelogger.Debugf("dispatcher node socket threads stopped.")
defer os.RemoveAll(d.socketName) // delete Unix socket file when done.
defer d.udpln.Close()

simplelogger.Debugf("dispatcher listening on socket %s ...", d.socketName)
Expand Down Expand Up @@ -610,7 +611,7 @@ func (d *Dispatcher) eventsReader() {

buf := make([]byte, 65536)
myNodeId := 0
var myNode *Node = nil

for {
_ = myConn.SetReadDeadline(time.Now().Add(readTimeout))
n, err := myConn.Read(buf)
Expand All @@ -624,9 +625,6 @@ func (d *Dispatcher) eventsReader() {
continue
} else if err != nil {
simplelogger.Errorf("Node %d - Socket read error: %+v", myNodeId, err)
if myNode != nil && myNode.err == nil {
myNode.err = err
}
break
}

Expand All @@ -648,8 +646,8 @@ func (d *Dispatcher) eventsReader() {
}(conn)
}

simplelogger.Debugf("dispatcher waiting for node socket threads to exit ...")
d.waitGroupNodes.Wait() // wait for all nodes to exit before closing eventsReader.
simplelogger.Debugf("waiting for dispatcher node socket threads to stop ...")
d.waitGroupNodes.Wait() // wait for all node goroutines to stop before closing eventsReader.
}

func (d *Dispatcher) advanceNodeTime(node *Node, timestamp uint64, force bool) {
Expand Down
10 changes: 5 additions & 5 deletions otns_main/otns_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,15 @@ func Main(ctx *progctx.ProgCtx, visualizerCreator func(ctx *progctx.ProgCtx, arg
vis = visualizeGrpc.NewGrpcVisualizer(visGrpcServerAddr, replayFn)
}

ctx.WaitAdd("webserver", 1)
go func() {
defer ctx.WaitDone("webserver")
siteAddr := fmt.Sprintf("%s:%d", args.DispatcherHost, args.DispatcherPort-3)
err := webSite.Serve(siteAddr) // blocks until webSite.StopServe() called
if err != nil && ctx.Err() == nil {
simplelogger.Errorf("website stopped unexpectedly: %+v, OTNS-Web won't be available!", err)
} else if err != nil {
simplelogger.Debugf("website stopped while exiting: %+v", err)
simplelogger.Errorf("webserver stopped unexpectedly: %+v, OTNS-Web won't be available!", err)
}
}()
defer webSite.StopServe()

sim := createSimulation(ctx)
rt := cli.NewCmdRunner(ctx, sim)
Expand All @@ -202,6 +201,7 @@ func Main(ctx *progctx.ProgCtx, visualizerCreator func(ctx *progctx.ProgCtx, arg
vis.Run() // visualize must run in the main thread

simplelogger.Debugf("waiting for OTNS to stop gracefully ...")
webSite.StopServe()
ctx.Wait()
}

Expand All @@ -212,8 +212,8 @@ func handleSignals(ctx *progctx.ProgCtx) {

ctx.WaitAdd("handleSignals", 1)
go func() {
defer simplelogger.Debugf("handleSignals exit.")
defer ctx.WaitDone("handleSignals")
defer simplelogger.Debugf("waiting for handleSignals exit.")

for {
select {
Expand Down
2 changes: 1 addition & 1 deletion otnstester/OtnsTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func NewOtnsTest(t *testing.T) *OtnsTest {

go func() {
defer func() {
simplelogger.Infof("OTNS exited.")
simplelogger.Infof("OTNS exit.")
close(ot.otnsDone)
}()

Expand Down
4 changes: 2 additions & 2 deletions progctx/progctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ func (ctx *ProgCtx) Cancel(err interface{}) {
ctx.cancel()

if e, ok := err.(error); ok {
simplelogger.TraceError("program exit: %v", e)
simplelogger.TraceError("program exit requested with ctx error: %v", e)
} else {
simplelogger.Infof("program exit: %v", err)
simplelogger.Debugf("program exit requested without ctx error: %v", err)
}

for _, f := range ctx.deferred {
Expand Down
4 changes: 2 additions & 2 deletions pylibs/unittests/test_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ def testSIGINT(self):
def testSIGTERM(self):
self._test_signal_exit(signal.SIGTERM)

def testSIGTERMx500(self):
N = 500
def testSIGTERMx200(self):
N = 200
for i in range(N):
logging.info("round %d", i + 1)
self._test_signal_exit(signal.SIGTERM, 0.1 * random.random())
Expand Down
14 changes: 9 additions & 5 deletions simulation/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,18 +179,19 @@ func (node *Node) Stop() {
simplelogger.Debugf("%v - stopped, state = %s", node, node.GetState())
}

func (node *Node) SignalExit() error {
return node.cmd.Process.Signal(syscall.SIGTERM)
}

func (node *Node) Exit() error {
_ = node.cmd.Process.Signal(syscall.SIGTERM)

node.S.Dispatcher().RecvEvents() // ensure to receive any remaining events of node.

// no more events or lineReader lines should come, so we can close the virtual-UART.
// no more events or lineReader lines will be accepted, so we close the virtual-UART.
// pipes are closed to allow cmd.Wait() to be successful and not hang.
_ = node.pipeIn.Close()
_ = node.pipeErr.Close()
_ = node.pipeOut.Close()
_ = node.virtualUartReader.Close()

err := node.cmd.Wait() // wait for process end

if node.logFile != nil {
Expand Down Expand Up @@ -838,7 +839,10 @@ func (node *Node) writeToLogFile(line string) {

_, err := node.logFile.WriteString(fmt.Sprintf("%-10d ", timestamp) + line + "\n")
if err != nil {
simplelogger.Error("Couldn't write to log file of %v, closing it (%s)", node, node.logFile)
if node.S.ctx.Err() == nil {
simplelogger.Debugf("ctx.Err()=%v", node.S.ctx.Err())
simplelogger.Errorf("Couldn't write to log file of %v, closing it (%s)", node, node.logFile.Name())
}
_ = node.logFile.Close()
node.logFile = nil
}
Expand Down
10 changes: 10 additions & 0 deletions simulation/simulation.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,18 @@ func (s *Simulation) Stop() {
simplelogger.Infof("stopping simulation and exiting nodes ...")
s.stopped = true

// for faster process, signal node exit first in parallel.
for _, node := range s.nodes {
_ = node.SignalExit()
}
s.Dispatcher().RecvEvents() // meanwhile receive any events of (exiting) nodes.

// then clean up and wait for each node process to stop, sequentially.
for _, node := range s.nodes {
_ = node.Exit()
}
s.Dispatcher().RecvEvents() // ensure to receive any remaining events of exited nodes.

simplelogger.Debugf("all simulation nodes exited.")
}

Expand Down Expand Up @@ -300,6 +309,7 @@ func (s *Simulation) DeleteNode(nodeid NodeId) error {

delete(s.nodes, nodeid)
_ = node.Exit()
s.d.RecvEvents() // ensure to receive any final events of deleted node.
s.d.DeleteNode(nodeid)
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion web/site/site.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,14 @@ func Serve(listenAddr string) error {
return http.ErrServerClosed
}
httpServer = &http.Server{Addr: listenAddr, Handler: nil}
simplelogger.Infof("OTNS web serving on %s ...", listenAddr)
simplelogger.Infof("OTNS webserver now serving on %s ...", listenAddr)
defer simplelogger.Debugf("webserver exit.")
httpServerMutex.Unlock()
return httpServer.ListenAndServe()
}

func StopServe() {
simplelogger.Debugf("requesting webserver to exit ...")
httpServerMutex.Lock()
if httpServer != nil {
_ = httpServer.Close()
Expand Down

0 comments on commit fcc72c0

Please sign in to comment.