Skip to content

Commit

Permalink
[all] ordering updates for goroutine-starting; test fix by reinstatin…
Browse files Browse the repository at this point in the history
…g stale-event deletion from queue; longer duration for autoGo.
  • Loading branch information
EskoDijk committed Sep 7, 2023
1 parent 71ab9ad commit 51f2783
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 35 deletions.
3 changes: 3 additions & 0 deletions cmd/otns/otns.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"context"
"os"

"github.com/simonlingoogle/go-simplelogger"

"github.com/openthread/ot-ns/otns_main"
"github.com/openthread/ot-ns/progctx"
"github.com/openthread/ot-ns/visualize"
Expand All @@ -40,5 +42,6 @@ func main() {
otns_main.Main(ctx, func(ctx *progctx.ProgCtx, args *otns_main.MainArgs) visualize.Visualizer {
return nil
}, nil)
simplelogger.Debugf("OTNS exit.")
os.Exit(0)
}
17 changes: 13 additions & 4 deletions dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,8 +583,8 @@ func (d *Dispatcher) processNextEvent(simSpeed float64) bool {
}
}
}
} else {
simplelogger.Panicf("processNextEvent() with deleted/unknown node %v: %v", evt.NodeId, evt)
} else if evt.NodeId > 0 {
simplelogger.Warnf("processNextEvent() with deleted/unknown node %v: %v", evt.NodeId, evt)
}
}
nextAlarmTime = d.alarmMgr.NextTimestamp()
Expand Down Expand Up @@ -635,7 +635,11 @@ func (d *Dispatcher) eventsReader() {
if errors.Is(err, io.EOF) {
break
} else if err != nil {
simplelogger.Errorf("Node %d - Closing socket after read error: %+v", myNodeId, err)
d.cbHandler.OnLogMessage(LogEntry{
NodeId: myNodeId,
Level: WatchCritLevel,
Msg: fmt.Sprintf("closing socket after read error: %+v", err),
})
break
}

Expand All @@ -661,7 +665,11 @@ func (d *Dispatcher) eventsReader() {
// increase buf size when needed
if n > len(buf)/2 {
buf = make([]byte, len(buf)*2)
simplelogger.Warnf("Increasing eventsReader() buf size for node %d to: %d KB", myNodeId, len(buf)/1024)
d.cbHandler.OnLogMessage(LogEntry{
NodeId: myNodeId,
Level: WatchWarnLevel,
Msg: fmt.Sprintf("increasing eventsReader() buf size to: %d KB", len(buf)/1024),
})
}
}

Expand Down Expand Up @@ -1330,6 +1338,7 @@ func (d *Dispatcher) DeleteNode(id NodeId) {
d.energyAnalyser.DeleteNode(id)
d.vis.DeleteNode(id)
d.radioModel.DeleteNode(id)
d.eventQueue.DisableEventsForNode(id)
}

// SetNodeFailed sets the radio of the node to failed (true) or operational (false) state.
Expand Down
22 changes: 16 additions & 6 deletions dispatcher/send_queue.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2022, The OTNS Authors.
// Copyright (c) 2020-2023, The OTNS Authors.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -36,15 +36,15 @@ type sendQueue struct {
q []*Event
}

func (sq sendQueue) Len() int {
func (sq *sendQueue) Len() int {
return len(sq.q)
}

func (sq sendQueue) Less(i, j int) bool {
func (sq *sendQueue) Less(i, j int) bool {
return sq.q[i].Timestamp < sq.q[j].Timestamp
}

func (sq sendQueue) Swap(i, j int) {
func (sq *sendQueue) Swap(i, j int) {
sq.q[i], sq.q[j] = sq.q[j], sq.q[i]
}

Expand All @@ -59,15 +59,15 @@ func (sq *sendQueue) Pop() (elem interface{}) {
return
}

func (sq sendQueue) NextTimestamp() uint64 {
func (sq *sendQueue) NextTimestamp() uint64 {
if len(sq.q) > 0 {
return sq.q[0].Timestamp
} else {
return Ever
}
}

func (sq sendQueue) NextEvent() *Event {
func (sq *sendQueue) NextEvent() *Event {
if len(sq.q) > 0 {
return sq.q[0]
} else {
Expand All @@ -79,6 +79,16 @@ func (sq *sendQueue) Add(evt *Event) {
heap.Push(sq, evt)
}

// DisableEventsForNode diables all events to/from a particular nodeid from the queue.
// This is done by setting a NodeId of '0' for these events.
func (sq *sendQueue) DisableEventsForNode(nodeid NodeId) {
for _, evt := range sq.q {
if evt.NodeId == nodeid {
evt.NodeId = 0 // make the event invalid.
}
}
}

func (sq *sendQueue) PopNext() *Event {
return heap.Pop(sq).(*Event)
}
Expand Down
36 changes: 20 additions & 16 deletions otns_main/otns_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,21 @@ func parseArgs() {
defaultOtCli = simulation.DefaultExecutableConfig.Ftd // only use custom MTD, not FTD.
}

flag.StringVar(&args.Speed, "speed", "1", "set simulating speed")
flag.StringVar(&args.Speed, "speed", "1", "set simulation speed")
flag.StringVar(&args.OtCliPath, "ot-cli", defaultOtCli, "specify the OT CLI executable, for FTD and also for MTD if not configured otherwise.")
flag.StringVar(&args.OtCliMtdPath, "ot-cli-mtd", defaultOtCliMtd, "specify the OT CLI MTD executable, separately from FTD executable.")
flag.StringVar(&args.InitScriptName, "ot-script", "", "specify the OT node init script filename, to use for init of new nodes. By default an internal script is used.")
flag.BoolVar(&args.AutoGo, "autogo", true, "auto go (runs the simulation at given speed, without issuing 'go' commands.)")
flag.BoolVar(&args.ReadOnly, "readonly", false, "readonly simulation can not be manipulated")
flag.StringVar(&args.LogLevel, "log", "warn", "set logging level: debug, info, warn, error.")
flag.StringVar(&args.LogLevel, "log", "warn", "set logging level: trace, debug, info, warn, error.")
flag.StringVar(&args.WatchLevel, "watch", "off", "set default watch level for all new nodes: off, trace, debug, info, note, warn, error.")
flag.BoolVar(&args.OpenWeb, "web", true, "open web visualization")
flag.BoolVar(&args.RawMode, "raw", false, "use raw mode (skips OT node init by script)")
flag.BoolVar(&args.Real, "real", false, "use real mode (for real devices)")
flag.BoolVar(&args.Real, "real", false, "use real mode (for real devices - currently NOT SUPPORTED)")
flag.StringVar(&args.ListenAddr, "listen", fmt.Sprintf("localhost:%d", InitialDispatcherPort), "specify UDP listen address and port")
flag.BoolVar(&args.DumpPackets, "dump-packets", false, "dump packets")
flag.BoolVar(&args.NoPcap, "no-pcap", false, "do not generate PCAP file (named \"current.pcap\")")
flag.BoolVar(&args.NoReplay, "no-replay", false, "do not generate Replay file")
flag.BoolVar(&args.NoReplay, "no-replay", false, "do not generate Replay file (named \"otns_?.replay\")")

flag.Parse()
}
Expand Down Expand Up @@ -136,18 +136,19 @@ func parseListenAddr() {
}

func Main(ctx *progctx.ProgCtx, visualizerCreator func(ctx *progctx.ProgCtx, args *MainArgs) visualize.Visualizer, cliOptions *runcli.CliOptions) {
parseArgs()
//simplelogger.SetOutput([]string{"stdout", "otns.log"}) // for debug: generate a log output file.
simplelogger.SetLevel(simplelogger.ParseLevel(args.LogLevel))
parseListenAddr()
handleSignals(ctx)

rand.Seed(time.Now().UnixNano())
// run console in the main goroutine
ctx.Defer(func() {
_ = os.Stdin.Close()
})

handleSignals(ctx)
parseArgs()
//simplelogger.SetOutput([]string{"stdout", "otns.log"}) // for debug: generate a log output file.
simplelogger.SetLevel(GetSimpleloggerLevel(ParseWatchLogLevel(args.LogLevel)))
parseListenAddr()

rand.Seed(time.Now().UnixNano())

var vis visualize.Visualizer
if visualizerCreator != nil {
Expand Down Expand Up @@ -183,10 +184,6 @@ func Main(ctx *progctx.ProgCtx, visualizerCreator func(ctx *progctx.ProgCtx, arg
rt := cli.NewCmdRunner(ctx, sim)
sim.SetVisualizer(vis)
go sim.Run()
go func() {
err := cli.Run(rt, cliOptions)
ctx.Cancel(errors.Wrapf(err, "console-exit"))
}()

web.ConfigWeb(args.DispatcherHost, args.DispatcherPort-2, args.DispatcherPort-1, args.DispatcherPort-3)
simplelogger.Debugf("open web: %v", args.OpenWeb)
Expand All @@ -198,6 +195,11 @@ func Main(ctx *progctx.ProgCtx, visualizerCreator func(ctx *progctx.ProgCtx, arg
go autoGo(ctx, sim)
}

go func() {
err := cli.Run(rt, cliOptions)
ctx.Cancel(errors.Wrapf(err, "console-exit"))
}()

vis.Run() // visualize must run in the main thread
ctx.Cancel("main")

Expand All @@ -208,13 +210,14 @@ func Main(ctx *progctx.ProgCtx, visualizerCreator func(ctx *progctx.ProgCtx, arg

func handleSignals(ctx *progctx.ProgCtx) {
c := make(chan os.Signal, 1)
sigHandlerReady := make(chan struct{})
signal.Notify(c, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGHUP)
signal.Ignore(syscall.SIGALRM)

ctx.WaitAdd("handleSignals", 1)
go func() {
defer ctx.WaitDone("handleSignals")

close(sigHandlerReady)
for {
select {
case sig := <-c:
Expand All @@ -225,11 +228,12 @@ func handleSignals(ctx *progctx.ProgCtx) {
}
}
}()
<-sigHandlerReady
}

func autoGo(ctx *progctx.ProgCtx, sim *simulation.Simulation) {
for {
<-sim.Go(time.Second)
<-sim.Go(time.Second * 5)
if ctx.Err() != nil { // exit when context is Done.
return
}
Expand Down
22 changes: 13 additions & 9 deletions pylibs/unittests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,21 @@ def testDelNode(self):
self.assertTrue(len(ns.nodes()) == 1 and 1 not in ns.nodes())

def testDelManyNodes(self):
ns = self.ns
many = 32
for i in range(many):
ns.add("router", x=(i % 6) * 100, y=(i // 6) * 150)
for j in range(4):
ns = self.ns
many = 32

ns.go(10)
for i in range(1, many + 1):
ns.delete(i)
ns.go(5)
for i in range(many):
ns.add("router", x=(i % 6) * 100, y=(i // 6) * 150)

self.assertTrue(ns.nodes() == {})
ns.go(10)
for i in range(1, many + 1):
ns.delete(i)
ns.go(5)

self.assertTrue(ns.nodes() == {})
self.tearDown()
self.setUp()

def testDelNodeAndImmediatelyRecreate(self):
# repeat multiple times to catch some goroutine race conditions that only happen sometimes.
Expand Down

0 comments on commit 51f2783

Please sign in to comment.