Skip to content

Commit

Permalink
tasks improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
gustavo-iniguez-goya committed Oct 10, 2024
1 parent c1c7138 commit e20e79d
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 24 deletions.
9 changes: 4 additions & 5 deletions daemon/tasks/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ import (
// TaskBase holds the common fields of every task.
// Warning: don't define fields in tasks with these names.
type TaskBase struct {
Ctx context.Context
Cancel context.CancelFunc
Results chan interface{}
Errors chan error
StopChan chan struct{}
Ctx context.Context
Cancel context.CancelFunc
Results chan interface{}
Errors chan error

// Stop the task if the daemon is disconnected from the GUI (server).
// Some tasks don't need to run if the daemon is not connected to the GUI,
Expand Down
17 changes: 12 additions & 5 deletions daemon/tasks/nodemonitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"syscall"
"time"
"unsafe"
Expand All @@ -24,6 +25,7 @@ type Config struct {
// NodeMonitor monitors the resources of a node (ram, swap, load avg, etc).
type NodeMonitor struct {
tasks.TaskBase
mu *sync.RWMutex
Ticker *time.Ticker

Interval string
Expand All @@ -34,17 +36,20 @@ type NodeMonitor struct {
func New(node, interval string, stopOnDisconnect bool) (string, *NodeMonitor) {
return fmt.Sprint(Name, "-", node), &NodeMonitor{
TaskBase: tasks.TaskBase{
Results: make(chan interface{}),
Errors: make(chan error),
StopChan: make(chan struct{}),
Results: make(chan interface{}),
Errors: make(chan error),
},
mu: &sync.RWMutex{},
Node: node,
Interval: interval,
}
}

// Start ...
func (pm *NodeMonitor) Start(ctx context.Context, cancel context.CancelFunc) error {
pm.mu.Lock()
defer pm.mu.Unlock()

pm.Ctx = ctx
pm.Cancel = cancel

Expand All @@ -60,8 +65,6 @@ func (pm *NodeMonitor) Start(ctx context.Context, cancel context.CancelFunc) err
var info syscall.Sysinfo_t
for {
select {
case <-pm.StopChan:
goto Exit
case <-ctx.Done():
goto Exit
case <-pm.Ticker.C:
Expand Down Expand Up @@ -102,9 +105,13 @@ func (pm *NodeMonitor) Resume() error {

// Stop ...
func (pm *NodeMonitor) Stop() error {
pm.mu.RLock()
defer pm.mu.RUnlock()

if pm.StopOnDisconnect {
return nil
}
pm.Ticker.Stop()
pm.Cancel()
close(pm.TaskBase.Results)
close(pm.TaskBase.Errors)
Expand Down
30 changes: 21 additions & 9 deletions daemon/tasks/pidmonitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"unsafe"

Expand All @@ -25,26 +26,31 @@ type Config struct {
// PIDMonitor monitors a process ID.
type PIDMonitor struct {
tasks.TaskBase
Ticker *time.Ticker
Interval string
Pid int
mu *sync.RWMutex
Ticker *time.Ticker
Interval string
Pid int
isStopped bool
}

// New returns a new PIDMonitor
func New(pid int, interval string, stopOnDisconnect bool) (string, *PIDMonitor) {
return fmt.Sprint(Name, "-", pid), &PIDMonitor{
TaskBase: tasks.TaskBase{
Results: make(chan interface{}),
Errors: make(chan error),
StopChan: make(chan struct{}),
Results: make(chan interface{}),
Errors: make(chan error),
},
mu: &sync.RWMutex{},
Pid: pid,
Interval: interval,
}
}

// Start ...
func (pm *PIDMonitor) Start(ctx context.Context, cancel context.CancelFunc) error {
pm.mu.RLock()
defer pm.mu.RUnlock()

pm.Ctx = ctx
pm.Cancel = cancel
p := &procmon.Process{}
Expand All @@ -71,8 +77,6 @@ func (pm *PIDMonitor) Start(ctx context.Context, cancel context.CancelFunc) erro
go func(ctx context.Context) {
for {
select {
case <-pm.StopChan:
goto Exit
case <-ctx.Done():
goto Exit
case <-pm.Ticker.C:
Expand All @@ -86,6 +90,9 @@ func (pm *PIDMonitor) Start(ctx context.Context, cancel context.CancelFunc) erro
pm.TaskBase.Errors <- err
continue
}
if pm.isStopped {
goto Exit
}
// ~200µs (string()) vs ~60ns
pm.TaskBase.Results <- unsafe.String(unsafe.SliceData(pJSON), len(pJSON))
}
Expand All @@ -111,12 +118,17 @@ func (pm *PIDMonitor) Resume() error {

// Stop ...
func (pm *PIDMonitor) Stop() error {
pm.mu.Lock()
defer pm.mu.Unlock()

if pm.StopOnDisconnect {
log.Debug("[task.PIDMonitor] ignoring Stop()")
return nil
}
pm.isStopped = true

log.Debug("[task.PIDMonitor] Stop()")
pm.StopChan <- struct{}{}
pm.Ticker.Stop()
pm.Cancel()
close(pm.TaskBase.Results)
close(pm.TaskBase.Errors)
Expand Down
7 changes: 2 additions & 5 deletions daemon/tasks/socketsmonitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,8 @@ func New(config interface{}, stopOnDisconnect bool) (*SocketsMonitor, error) {
}
return &SocketsMonitor{
TaskBase: tasks.TaskBase{
Results: make(chan interface{}),
Errors: make(chan error),
StopChan: make(chan struct{}),
Results: make(chan interface{}),
Errors: make(chan error),
},
mu: &sync.RWMutex{},
StopOnDisconnect: stopOnDisconnect,
Expand All @@ -98,8 +97,6 @@ func (pm *SocketsMonitor) Start(ctx context.Context, cancel context.CancelFunc)
go func(ctx context.Context) {
for {
select {
case <-pm.TaskBase.StopChan:
goto Exit
case <-ctx.Done():
goto Exit
case <-pm.Ticker.C:
Expand Down

0 comments on commit e20e79d

Please sign in to comment.