Skip to content

Commit

Permalink
feat(runners): server <-> running communication works
Browse files Browse the repository at this point in the history
  • Loading branch information
fiftin committed Aug 28, 2023
1 parent 597956d commit d292c8a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 10 deletions.
5 changes: 5 additions & 0 deletions api/runners/runners.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ func UpdateRunner(w http.ResponseWriter, r *http.Request) {
for _, job := range body.Jobs {
tsk := taskPool.GetTask(job.ID)

if tsk == nil {
// TODO: log
continue
}

for _, logRecord := range job.LogRecords {
tsk.Log2(logRecord.Message, logRecord.Time)
}
Expand Down
34 changes: 24 additions & 10 deletions services/runners/JobPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type job struct {
Status db.TaskStatus
args []string
environmentVars []string
id int
}

type RunnerConfig struct {
Expand Down Expand Up @@ -145,15 +144,16 @@ func (p *runningJob) logPipe(reader *bufio.Reader) {
func (p *JobPool) Run() {
queueTicker := time.NewTicker(5 * time.Second)
requestTimer := time.NewTicker(5 * time.Second)
p.runningJobs = make(map[int]*runningJob)

defer func() {
queueTicker.Stop()
}()

for {
select {
case job := <-p.register: // new task created by API or schedule
p.queue = append(p.queue, job)
//case j := <-p.register: // new task created by API or schedule
// p.queue = append(p.queue, j)

case <-queueTicker.C: // timer 5 seconds: get task from queue and run it
if len(p.queue) == 0 {
Expand All @@ -164,22 +164,22 @@ func (p *JobPool) Run() {
if t.Status == db.TaskFailStatus {
//delete failed TaskRunner from queue
p.queue = p.queue[1:]
log.Info("Task " + strconv.Itoa(t.id) + " removed from queue")
log.Info("Task " + strconv.Itoa(t.job.Task.ID) + " removed from queue")
break
}

log.Info("Set resource locker with TaskRunner " + strconv.Itoa(t.id))
p.resourceLocker <- &resourceLock{lock: true, holder: t}
//log.Info("Set resource locker with TaskRunner " + strconv.Itoa(t.id))
//p.resourceLocker <- &resourceLock{lock: true, holder: t}

p.runningJobs[t.id] = &runningJob{}
p.runningJobs[t.job.Task.ID] = &runningJob{}

t.job.Logger = p.runningJobs[t.id]
t.job.Logger = p.runningJobs[t.job.Task.ID]
t.job.Playbook.Logger = t.job.Logger

go t.job.Run(t.username, t.incomingVersion)

p.queue = p.queue[1:]
log.Info("Task " + strconv.Itoa(t.id) + " removed from queue")
log.Info("Task " + strconv.Itoa(t.job.Task.ID) + " removed from queue")

case <-requestTimer.C:

Expand All @@ -204,6 +204,16 @@ func (p *JobPool) sendProgress() {
Jobs: nil,
}

for id, j := range p.runningJobs {
body.Jobs = append(body.Jobs, JobProgress{
ID: id,
LogRecords: j.logRecords,
Status: j.status,
})

// TODO: clean logs
}

jsonBytes, err := json.Marshal(body)

req, err := http.NewRequest("PUT", url, bytes.NewBuffer(jsonBytes))
Expand Down Expand Up @@ -343,6 +353,10 @@ func (p *JobPool) checkNewJobs() {
}

for _, newJob := range response.NewJobs {
if _, exists := p.runningJobs[newJob.Task.ID]; exists {
continue
}

taskRunner := job{
username: newJob.Username,
incomingVersion: newJob.IncomingVersion,
Expand All @@ -360,6 +374,6 @@ func (p *JobPool) checkNewJobs() {
},
}

p.register <- &taskRunner
p.queue = append(p.queue, &taskRunner)
}
}

0 comments on commit d292c8a

Please sign in to comment.