-
Notifications
You must be signed in to change notification settings - Fork 0
/
mexec.go
118 lines (105 loc) · 2.63 KB
/
mexec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package main
import (
"os"
"fmt"
"time"
"bufio"
"strconv"
"strings"
"os/exec"
)
type Job struct {
cmdRaw string
command string
commandArgs []string
exitStatus error
runTimeSeconds float64
}
func main() {
if len(os.Args) != 3 {
fmt.Printf("Usage: %s jobFile|- numWorkers\n", os.Args[0])
os.Exit(1)
}
jobFilename := os.Args[1]
numWorkers := func(arg string)(ret int){
r, err := strconv.ParseInt(arg, 10, 64)
if err != nil {
fmt.Printf("Error parsing 'numWorkers'=%s, should be int\n", arg)
os.Exit(1)
}
return int(r)
}(os.Args[2]) // Execute n many jobs at once
exitOnError := false // Wait for all jobs to finish or return on first failure
fmt.Printf("jobFilename: %s, %d workers, exitOnError: %t\n",
jobFilename, numWorkers, exitOnError)
// Compute the job list
var jobReader *bufio.Reader
if jobFilename == "-" {
jobReader = bufio.NewReader(os.Stdin)
} else {
jobFile, _ := os.Open(jobFilename)
jobReader = bufio.NewReader(jobFile)
}
jobs := getJobs(jobReader)
fmt.Printf("%d jobs will be submitted for execution\n\n", len(jobs))
inbound := make(chan Job, len(jobs))
outbound := make(chan Job, 0)
// Submit all the jobs
fmt.Printf("Submitting for execution:\n")
for i, job := range jobs {
fmt.Printf("[%d/%d] Submitting job=%s\n", i+1, len(jobs), job)
inbound <- job
}
fmt.Printf("\n\nCompleting:\n");
// Start the needed number of workers
for i := 0; i < numWorkers; i++ {
go startWorker(inbound, outbound)
}
// Check the outbound channel for completed jobs
for i := 0; i < len(jobs); i++ {
job := <-outbound
if job.exitStatus != nil {
if exitOnError {
os.Exit(-1)
}
}
fmt.Printf("[%d/%d] job '%s' completed successfully=%t - (%f seconds)\n", i+1, len(jobs),
job.cmdRaw, job.exitStatus == nil, job.runTimeSeconds)
}
}
func startWorker(inbound chan Job, outbound chan Job) {
for {
job := <-inbound
startTime := time.Now()
job.exitStatus = exec.Command(job.command, job.commandArgs...).Run()
job.runTimeSeconds = time.Since(startTime).Seconds()
outbound <- job
}
}
func getJobs(reader *bufio.Reader) (jobs []Job) {
for {
line, isPrefix, err := reader.ReadLine()
if err != nil || isPrefix {
return
}
jobs = append(jobs, createJob(string(line)))
}
return
}
func createJob(cmd string) (job Job) {
job.cmdRaw = cmd
var commandParts []string
lastStart := 0
for i := 0; i < len(cmd); i++ {
if cmd[i] == ' ' || i+1 == len(cmd) {
part := strings.Trim(cmd[lastStart:i+1], "\" ")
if part != "" {
commandParts = append(commandParts, part)
}
lastStart = i
}
}
job.command = commandParts[0]
job.commandArgs = commandParts[1:]
return job
}